package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"

	"go.sour.is/xt/internal/otel"
	"go.yarn.social/lextwt"
	"go.yarn.social/types"
)

// Refresh intervals, expressed in seconds.
const (
	// TenYear effectively retires a feed (used for permanently dead feeds).
	// NOTE(review): 3153600000 s is ~100 years; ten years is ~315360000.
	// Confirm whether the name or the value is intended before changing —
	// other code may rely on the magnitude.
	TenYear    = 3153600000
	OneDay     = 86400
	OneHour    = 3600
	TenMinutes = 600
	// NOTE(review): 60 s is one minute, not two — confirm name vs value.
	TwoMinutes = 60
)

// refreshLoop runs the feed-refresh worker: it drains the priority queue of
// feeds, fetches each feed when its scheduled scan time arrives, tees the
// response body to disk while parsing it, stores the parsed twts, and
// reschedules the feed. It returns nil on context cancellation, or the first
// fatal error (missing DB, feed-load failure, or storage/save failure).
func refreshLoop(c *console, app *appState) error {
	ctx, span := otel.Span(c.Context)
	defer span.End()

	f := NewHTTPFetcher()
	// closePool: renamed from `close` to avoid shadowing the builtin.
	fetch, closePool := NewFuncPool(c.Context, 25, f.Fetch)
	defer closePool()

	db, err := app.DB()
	if err != nil {
		otel.Error(err, "missing db")
		return err
	}
	queue := app.queue

	otel.Info("start refresh loop")
	for c.Context.Err() == nil {
		// Refill the queue from storage when it runs dry.
		if queue.IsEmpty() {
			otel.Info("load feeds")
			it, err := loadFeeds(c.Context, db)
			for f := range it {
				f := f // defensive per-iteration copy before taking its address
				queue.Insert(&f)
			}
			if err != nil {
				otel.Error(err)
				return err
			}
		}

		f := queue.ExtractMin()
		if f == nil {
			// Queue still empty after a reload attempt; idle before retrying.
			otel.Info("sleeping for ", TenMinutes*time.Second)
			select {
			case <-time.After(TenMinutes * time.Second):
			case <-c.Done():
				return nil
			}
			continue
		}
		otel.Info("queue size", queue.count, "next", f.URI, "next scan on", f.LastScanOn.Time.Format(time.RFC3339))

		// Skip feeds whose scheduled scan is more than two hours out.
		// NOTE(review): the skipped feed is not reinserted, so it is lost
		// until the queue next drains — confirm this is intended.
		if time.Until(f.LastScanOn.Time) > 2*time.Hour {
			otel.Info("too soon", f.URI)
			continue
		}

		select {
		case <-c.Done():
			return nil

		case t := <-time.After(time.Until(f.LastScanOn.Time)):
			otel.Info("fetch", t.Format(time.RFC3339), f.Nick, f.URI)
			fetch.Fetch(f)

		case res := <-fetch.Out():
			// A response for an earlier fetch arrived before this feed's
			// timer fired. FIX: reinsert the extracted-but-unfetched feed so
			// it is not silently dropped from the schedule.
			queue.Insert(f)

			otel.Info("got response:", res.Request.URI)
			f := res.Request
			f.LastScanOn.Time = time.Now()

			if err := res.err; err != nil {
				// Back off according to failure kind. Checks are kept as
				// independent ifs (last match wins) to preserve the original
				// precedence for errors matching multiple sentinels.
				if errors.Is(err, ErrPermanentlyDead) {
					f.RefreshRate = TenYear
				}
				if errors.Is(err, ErrTemporarilyDead) {
					f.RefreshRate = OneDay
				}
				if errors.Is(err, ErrUnmodified) {
					f.RefreshRate = OneDay
				}
				otel.Error(err)
				f.LastError.String, f.LastError.Valid = err.Error(), true
				if err := f.Save(c.Context, db); err != nil {
					otel.Error(err)
				}
				continue
			}

			f.ETag.String, f.ETag.Valid = res.ETag(), true
			f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true

			// Derive a stable on-disk filename from the feed URI.
			// FIX: MarshalText returns ([]byte, error) and cannot be passed
			// directly to filepath.Join — the original did not compile.
			name, merr := urlNS.UUID5(f.URI).MarshalText()
			if merr != nil {
				otel.Error(merr)
				continue
			}
			cpy, err := os.OpenFile(filepath.Join("feeds", string(name)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
			if err != nil {
				otel.Error(fmt.Errorf("%w: %w", ErrParseFailed, err))
				f.LastError.String, f.LastError.Valid = err.Error(), true
				f.RefreshRate = OneDay
				if err := f.Save(c.Context, db); err != nil {
					otel.Error(err)
				}
				continue
			}

			// Tee the body to the on-disk copy while parsing it.
			// NOTE(review): res.Body is never closed here — confirm the
			// fetcher/pool owns and closes it.
			rdr := io.TeeReader(res.Body, cpy)
			rdr = lextwt.TwtFixer(rdr)
			twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
			// ParseFile has consumed the reader, so the tee'd copy is complete.
			// FIX: close here, in every path — the original leaked cpy on
			// parse and store errors.
			cpy.Close()
			if err != nil {
				otel.Error(fmt.Errorf("%w: %w", ErrParseFailed, err))
				f.LastError.String, f.LastError.Valid = err.Error(), true
				f.RefreshRate = OneDay
				if err := f.Save(c.Context, db); err != nil {
					otel.Error(err)
				}
				continue
			}

			// On first fetch, follow the archive "prev" link ("<hash> <file>")
			// so older pages of the feed get queued as well.
			if prev, ok := twtfile.Info().GetN("prev", 0); f.FirstFetch && ok {
				if _, part, ok := strings.Cut(prev.Value(), " "); ok {
					part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
					queue.Insert(&Feed{
						FetchURI:    part,
						URI:         f.URI,
						Nick:        f.Nick,
						LastScanOn:  f.LastScanOn,
						RefreshRate: f.RefreshRate,
					})
				}
			}

			if err := storeFeed(ctx, db, twtfile); err != nil {
				otel.Error(err)
				f.LastError.String, f.LastError.Valid = err.Error(), true
				if serr := f.Save(c.Context, db); serr != nil {
					otel.Error(serr)
				}
				// FIX: return the storeFeed error. The original returned the
				// result of f.Save, so a successful save swallowed the
				// storage failure and exited the loop with nil.
				return err
			}

			// Success: reschedule and clear the recorded error text.
			// NOTE(review): LastError.Valid is left true with an empty
			// string — confirm it shouldn't be set to false (SQL NULL).
			f.LastScanOn.Time = time.Now()
			f.RefreshRate = TenMinutes
			f.LastError.String = ""
			if err := f.Save(c.Context, db); err != nil {
				otel.Error(err)
				return err
			}
		}
	}
	return c.Context.Err()
}