From 0ea449264ab1c852ee2cec7596b0347076323256 Mon Sep 17 00:00:00 2001
From: xuu
Date: Mon, 11 Nov 2024 19:13:34 -0700
Subject: [PATCH] chore: improve fetch functionality

Add conditional requests (If-Modified-Since/If-None-Match), classify
fetch failures into temporary and permanent backoff states, keep an
on-disk copy of each fetched feed, and load feeds from the database
through an iterator instead of an up-front scan.
---
 .gitignore |   3 +-
 feed.go    |  39 ++++++
 fetcher.go | 130 ++++++++++++++++
 go.mod     |   2 +-
 init.sql   |   8 +-
 service.go | 266 ++++++++++++++++++++++++++++++++++-------------------
 6 files changed, 354 insertions(+), 94 deletions(-)
 create mode 100644 feed.go
 create mode 100644 fetcher.go

diff --git a/.gitignore b/.gitignore
index efaa1a5..24a5a9f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
-twt.db*
+*.db*
 feed
 __debug*
+feeds/
diff --git a/feed.go b/feed.go
new file mode 100644
index 0000000..d70777b
--- /dev/null
+++ b/feed.go
@@ -0,0 +1,39 @@
+package main
+
+import (
+	"context"
+	"database/sql"
+)
+
+// Feed is a row of the feeds table.
+type Feed struct {
+	FeedID      uuid
+	URI         string
+	Nick        string
+	LastScanOn  sql.NullTime
+	RefreshRate int
+
+	LastModified sql.NullTime
+	LastError    sql.NullString
+	ETag         sql.NullString
+
+	DiscloseFeedURL string
+	DiscloseNick    string
+
+	Version string
+}
+
+// Save persists the fetch bookkeeping columns through the updateFeed statement.
+func (f *Feed) Save(ctx context.Context, db *sql.DB) error {
+	_, err := db.ExecContext(
+		ctx,
+		updateFeed,
+		f.LastScanOn,
+		f.RefreshRate,
+		f.LastModified,
+		f.ETag,
+		f.LastError,
+		f.FeedID,
+	)
+	return err
+}
diff --git a/fetcher.go b/fetcher.go
new file mode 100644
index 0000000..ff45dc4
--- /dev/null
+++ b/fetcher.go
@@ -0,0 +1,130 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"net/http"
+	"strings"
+	"time"
+)
+
+// Classification errors used by refreshLoop to pick a retry backoff.
+var (
+	ErrUnmodified      = errors.New("unmodified")
+	ErrPermanentlyDead = errors.New("permanently dead")
+	ErrTemporarilyDead = errors.New("temporarily dead")
+	ErrParseFailed     = errors.New("parse failed")
+)
+
+type Response struct {
+	*http.Response
+}
+
+func (r *Response) ETag() string {
+	return r.Header.Get("ETag")
+}
+func (r *Response) Read(b []byte) (int, error) {
+	return r.Body.Read(b)
+}
+
+// Close closes the Response.Body, which is necessary to free up resources.
+func (r *Response) Close() {
+	r.Body.Close()
+}
+func (r *Response) ContentType() string {
+	return r.Header.Get("Content-Type")
+}
+// LastModified parses the Last-Modified header, falling back to the current time.
+func (r *Response) LastModified() time.Time {
+	lastModified := time.Now()
+	if lm, err := time.Parse(http.TimeFormat, r.Header.Get("Last-Modified")); err == nil {
+		lastModified = lm
+	}
+
+	return lastModified
+}
+
+type httpFetcher struct {
+	client *http.Client
+}
+
+func NewHTTPFetcher() *httpFetcher {
+	return &httpFetcher{
+		client: &http.Client{
+			Transport: &http.Transport{
+				Proxy: http.ProxyFromEnvironment,
+				DialContext: (&net.Dialer{
+					Timeout:   5 * time.Second,
+					KeepAlive: 5 * time.Second,
+				}).DialContext,
+				ForceAttemptHTTP2:     false,
+				MaxIdleConns:          100,
+				IdleConnTimeout:       10 * time.Second,
+				TLSHandshakeTimeout:   10 * time.Second,
+				ExpectContinueTimeout: 1 * time.Second,
+			},
+		},
+	}
+}
+
+// Fetch issues a conditional GET for the feed and maps the outcome onto
+// the classification errors above.
+func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) (*Response, error) {
+	if strings.Contains(request.URI, "lublin.se") {
+		return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, request.URI)
+	}
+
+	req, err := http.NewRequestWithContext(ctx, "GET", request.URI, nil)
+	if err != nil {
+		return nil, fmt.Errorf("creating HTTP request failed: %w", err)
+	}
+
+	req.Header.Add("Accept", "text/plain")
+
+	if request.LastModified.Valid {
+		req.Header.Add("If-Modified-Since", request.LastModified.Time.Format(http.TimeFormat))
+	}
+
+	if request.ETag.Valid {
+		req.Header.Add("If-None-Match", request.ETag.String)
+	}
+
+	if request.DiscloseFeedURL != "" && request.DiscloseNick != "" {
+		req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
+			request.Version, request.DiscloseFeedURL, request.DiscloseNick))
+	} else {
+		req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", request.Version))
+	}
+
+	res, err := f.client.Do(req)
+	if err != nil {
+		var dnsErr *net.DNSError
+		if errors.As(err, &dnsErr) {
+			return nil, fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
+		}
+		return nil, fmt.Errorf("%w: %w", ErrPermanentlyDead, err)
+	}
+
+	response := &Response{
+		Response: res,
+	}
+
+	switch res.StatusCode {
+	case 200:
+		return response, nil
+
+	case 304:
+		return response, fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
+
+	case 400, 406, 502, 503:
+		return response, fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
+
+	case 403, 404, 410:
+		return response, fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
+
+	default:
+		return response, errors.New(res.Status)
+	}
+}
diff --git a/go.mod b/go.mod
index 70e21c4..ee229bd 100644
--- a/go.mod
+++ b/go.mod
@@ -12,7 +12,7 @@ require (
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/sirupsen/logrus v1.9.3 // indirect
 	github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect
-	go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 // indirect
+	go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8
 	golang.org/x/crypto v0.27.0 // indirect
 	golang.org/x/sys v0.25.0 // indirect
 )
diff --git a/init.sql b/init.sql
index 8f0e250..10dfc70 100644
--- a/init.sql
+++ b/init.sql
@@ -3,10 +3,12 @@
 create table if not exists feeds (
 	feed_id blob primary key,
 	uri text,
-	nick string,
-	domain string,
+	nick text,
 	last_scan_on timestamp,
-	refresh_rate int default 600
+	refresh_rate int default 600,
+	last_modified_on timestamp,
+	last_etag text,
+	last_error text
 );
 
 create table if not exists twts (
diff --git a/service.go b/service.go
index 2e5cf4e..fac0a88 100644
--- a/service.go
+++ b/service.go
@@ -2,11 +2,14 @@ package main
 
 import (
 	"database/sql"
+	"errors"
 	"fmt"
 	"io"
-	"net/http"
+	"iter"
 	"net/url"
 	"os"
+	"path/filepath"
+	"slices"
 	"strings"
 	"time"
 
@@ -14,6 +17,7 @@ import (
 	_ "github.com/mattn/go-sqlite3"
 
 	"go.yarn.social/lextwt"
+	"go.yarn.social/types"
 )
 
 func run(c console) error {
@@ -40,7 +44,7 @@ func run(c console) error {
 		return err
 	}
 	defer f.Close()
-	err = loadFeed(db, f)
+	err = loadFeed(db, &types.Twter{Nick: "xuu", URI: "https://txt.sour.is/users/xuu/twtxt.txt"}, f)
 	if err != nil {
 		return err
 	}
@@ -72,27 +76,38 @@ var (
 	`
 
 	fetchFeeds = `
-	select feed_id, uri, nick, last_scan_on, refresh_rate from feeds
+	select
+		feed_id,
+		uri,
+		nick,
+		last_scan_on,
+		refresh_rate,
+		last_modified_on,
+		last_etag
+	from feeds
 	`
 
 	updateFeed = `
 	update feeds set
 		last_scan_on = ?,
-		refresh_rate = ?
+		refresh_rate = ?,
+		last_modified_on = ?,
+		last_etag = ?,
+		last_error = ?
 	where feed_id = ?
 	`
 )
 
-func loadFeed(db *sql.DB, feed io.Reader) error {
+func loadFeed(db *sql.DB, twter *types.Twter, feed io.Reader) error {
 	loadTS := time.Now()
 	refreshRate := 600
 
-	f, err := lextwt.ParseFile(feed, nil)
+	f, err := lextwt.ParseFile(feed, twter)
 	if err != nil {
-		return err
+		return fmt.Errorf("%w: %w", ErrParseFailed, err)
 	}
 
-	feedID := urlNS.UUID5(f.Twter().HashingURI)
+	feedID := urlNS.UUID5(coalesce(f.Twter().HashingURI, f.Twter().URI))
 
 	tx, err := db.Begin()
 	if err != nil {
@@ -102,12 +117,16 @@ func loadFeed(db *sql.DB, feed io.Reader) error {
 	followMap := make(map[string]string, len(followers))
 	for _, f := range f.Info().GetAll("follow") {
 		nick, uri, ok := strings.Cut(f.Value(), "http")
-		if !ok{
+		if !ok {
 			continue
 		}
 		nick = strings.TrimSpace(nick)
 		uri = "http" + strings.TrimSpace(uri)
 
+		if uri == "https://lublin.se/twtxt.txt" {
+			continue
+		}
+
 		if _, err := url.Parse(uri); err != nil {
 			continue
 		}
@@ -174,100 +193,169 @@ func loadFeed(db *sql.DB, feed io.Reader) error {
 	return tx.Commit()
 }
 
-type feed struct {
-	ID          uuid
-	URI         string
-	Nick        string
-	LastScanOn  sql.NullTime
-	RefreshRate int
-}
-
 func refreshLoop(c console) {
-	maxInt := 3153600000
+	defer c.abort()
 
-	less := func(a, b *feed) bool {
+	tenYears := 315360000 // backoff for permanently dead feeds, in seconds
+	oneDay := 86400       // backoff for temporarily dead or unmodified feeds
+	tenMinutes := 600     // default refresh rate for healthy feeds
+
+	fetch := NewHTTPFetcher()
+
+	less := func(a, b *Feed) bool {
 		return a.LastScanOn.Time.Before(b.LastScanOn.Time)
 	}
 	queue := FibHeap(less)
 
 	db := c.Get("db").(*sql.DB)
-	res, err := db.QueryContext(c.Context, fetchFeeds)
-
-	if err != nil {
-		c.Log(err)
-		c.abort()
-		return
-	}
-
-	c.Log("load feeds")
-	for res.Next() {
-		var f feed
-		err = res.Scan(&f.ID, &f.URI, &f.Nick, &f.LastScanOn, &f.RefreshRate)
-		if err != nil {
-			c.Log(err)
-			c.abort()
-			return
-		}
-
-		if !f.LastScanOn.Valid {
-			f.LastScanOn.Time = time.Now()
-			f.LastScanOn.Valid = true
-		} else {
-			f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
-		}
-
-		queue.Insert(&f)
-	}
-
-	c.Log("start refresh loop")
-	for !queue.IsEmpty() {
+	for c.Err() == nil {
+		if queue.IsEmpty() {
+			it, err := LoadFeeds(c)
+			if err != nil {
+				c.Log(err)
+				return
+			}
+			for f := range it {
+				queue.Insert(&f)
+			}
+		}
+		if queue.IsEmpty() {
+			// no feeds in the database yet; wait before polling again
+			select {
+			case <-c.Done():
+			case <-time.After(time.Minute):
+			}
+			continue
+		}
+
 		f := queue.ExtractMin()
-		c.Log("next", f.URI, "last scan on", f.LastScanOn.Time)
+		c.Log("queue size", queue.count, "next", f.URI, "last scan on", f.LastScanOn.Time.Format(time.RFC3339))
+
+		if time.Until(f.LastScanOn.Time) > 2*time.Hour {
+			c.Log("too soon", f.URI)
+			continue
+		}
 
 		select {
 		case <-c.Done():
 			return
 		case <-time.After(time.Until(f.LastScanOn.Time)):
-			c.Log("refresh", f.URI)
 		}
 
-		req, err := http.NewRequestWithContext(c.Context, "GET", f.URI, nil)
-		if err != nil {
-			c.Log(err)
-			c.abort()
-			return
-		}
-
-		resp, err := http.DefaultClient.Do(req)
-		if err != nil {
-			c.Log(err)
-			_, err = db.ExecContext(c.Context, updateFeed, f.LastScanOn, maxInt, f.ID)
-			if err != nil {
-				c.Log(err)
-				c.abort()
-				return
-			}
-
-			continue
-		}
-		defer resp.Body.Close()
-
-		err = loadFeed(db, resp.Body)
-		if err != nil {
-			_, err = db.ExecContext(c.Context, updateFeed, f.LastScanOn, maxInt, f.ID)
-			if err != nil {
-				c.Log(err)
-				c.abort()
-				return
-			}
-
-			continue
-		}
-
-		db.ExecContext(c.Context, updateFeed, f.LastScanOn, f.RefreshRate, f.ID)
-
-		f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
-		c.Log("next scan", f.URI, "on", f.LastScanOn.Time)
-		// queue.Insert(f)
+		res, err := fetch.Fetch(c.Context, f)
+		if err != nil {
+			f.LastError.String, f.LastError.Valid = err.Error(), true
+			f.LastScanOn.Time, f.LastScanOn.Valid = time.Now(), true
+			if errors.Is(err, ErrPermanentlyDead) {
+				f.RefreshRate = tenYears
+			}
+			if errors.Is(err, ErrTemporarilyDead) {
+				f.RefreshRate = oneDay
+			}
+			if errors.Is(err, ErrUnmodified) {
+				f.RefreshRate = oneDay
+			}
+			if res != nil {
+				res.Close()
+			}
+
+			c.Log(err)
+			err = f.Save(c.Context, db)
+			if err != nil {
+				c.Log(err)
+				return
+			}
+
+			continue
+		}
+
+		f.ETag.String, f.ETag.Valid = res.ETag(), true
+		f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
+
+		// keep a copy of the raw feed on disk while parsing it
+		cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()),
+			os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+		if err != nil {
+			c.Log(err)
+			res.Close()
+			continue
+		}
+		rdr := io.TeeReader(res.Body, cpy)
+
+		err = loadFeed(db, &types.Twter{Nick: f.Nick, URI: f.URI}, rdr)
+		cpy.Close()
+		res.Close()
+
+		f.LastScanOn.Time, f.LastScanOn.Valid = time.Now(), true
+		if err != nil {
+			c.Log(err)
+			f.LastError.String, f.LastError.Valid = err.Error(), true
+			f.RefreshRate = oneDay
+		} else {
+			f.RefreshRate = tenMinutes
+			f.LastError = sql.NullString{}
+		}
+
+		err = f.Save(c.Context, db)
+		if err != nil {
+			c.Log(err)
+			return
+		}
 	}
 }
+
+func LoadFeeds(c console) (iter.Seq[Feed], error) {
+	db := c.Get("db").(*sql.DB)
+	res, err := db.QueryContext(c.Context, fetchFeeds)
+	if err != nil {
+		return slices.Values([]Feed{}), err
+	}
+
+	c.Log("load feeds")
+
+	return func(yield func(Feed) bool) {
+		defer res.Close()
+
+		for res.Next() {
+			var f Feed
+			f.Version = "0.0.1"
+			if err := res.Scan(
+				&f.FeedID,
+				&f.URI,
+				&f.Nick,
+				&f.LastScanOn,
+				&f.RefreshRate,
+				&f.LastModified,
+				&f.ETag,
+			); err != nil {
+				c.Log(err)
+				return
+			}
+
+			if !f.LastScanOn.Valid {
+				f.LastScanOn.Time = time.Now()
+				f.LastScanOn.Valid = true
+			} else {
+				f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
+			}
+
+			if !yield(f) {
+				return
+			}
+		}
+	}, nil
+}
+
+// coalesce returns the first non-zero value among its arguments.
+func coalesce[T comparable](a T, values ...T) T {
+	var zero T
+
+	for _, v := range values {
+		if a != zero {
+			break
+		}
+		a = v
+	}
+
+	return a
+}
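
Review note (not part of the patch): a minimal sketch of how the new fetcher
is meant to be driven, using only symbols introduced above. The URI and nick
values are illustrative, and a freshly discovered Feed is assumed, so its
ETag/Last-Modified fields start NULL and the first Fetch is a plain GET;
later calls send If-None-Match/If-Modified-Since from the stored columns.

	f := &Feed{
		URI:     "https://example.com/twtxt.txt", // illustrative
		Nick:    "example",                       // illustrative
		Version: "0.0.1",
	}
	res, err := NewHTTPFetcher().Fetch(context.Background(), f)
	switch {
	case errors.Is(err, ErrUnmodified):
		// 304: keep the stored ETag/Last-Modified and back off for a day
	case errors.Is(err, ErrPermanentlyDead):
		// permaban, a non-DNS transport error, or 403/404/410:
		// park the feed for ten years
	case err != nil:
		// DNS failure or 400/406/502/503: temporarily dead, retry in a day
	default:
		defer res.Close()
		// res wraps *http.Response and satisfies io.Reader, so it can be
		// handed to loadFeed, optionally through io.TeeReader as above
	}

Note that res can be non-nil on the error paths (the status-code cases), which
is why refreshLoop closes it before continuing.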