package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"

	"go.sour.is/xt/internal/otel"
	"go.yarn.social/lextwt"
	"go.yarn.social/types"
)

// Refresh intervals, in seconds.
const (
	TenYear    = 3153600000
	OneDay     = 86400
	OneHour    = 3600
	TenMinutes = 600
	TwoMinutes = 60
)

// refreshLoop drains the feed queue, waiting until each feed is due and
// handing it to the fetch pool. Responses are handled by processorLoop.
func refreshLoop(c *console, app *appState) error {
	ctx, span := otel.Span(c.Context)
	defer span.End()

	f := NewHTTPFetcher()
	fetch, close := NewFuncPool(ctx, 25, f.Fetch)
	defer close()

	db, err := app.DB(c)
	if err != nil {
		span.RecordError(err)
		return err
	}

	go processorLoop(ctx, db, fetch)

	queue := app.queue
	span.AddEvent("start refresh loop")

	for ctx.Err() == nil {
		// Refill the queue from the database when it runs dry.
		if queue.IsEmpty() {
			span.AddEvent("load feeds")
			it, err := loadFeeds(ctx, db)
			span.RecordError(err)
			for f := range it {
				queue.Insert(&f)
			}
		}

		span.AddEvent("queue size", trace.WithAttributes(attribute.Int("size", int(queue.count))))

		f := queue.ExtractMin()
		if f == nil {
			// Nothing queued; back off before checking again.
			span.AddEvent("sleeping for", trace.WithAttributes(attribute.Int("seconds", int(TenMinutes))))
			select {
			case <-time.After(TenMinutes * time.Second):
			case <-c.Done():
				return nil
			}
			continue
		}

		span.AddEvent("next", trace.WithAttributes(
			attribute.Int("size", int(queue.count)),
			attribute.String("uri", f.URI),
			attribute.String("scan on", f.LastScanOn.Time.Format(time.RFC3339)),
		))

		// Skip feeds that are not due for more than two hours.
		if time.Until(f.NextScanOn.Time) > 2*time.Hour {
			span.AddEvent("too soon", trace.WithAttributes(attribute.String("uri", f.URI)))
			continue
		}

		select {
		case <-ctx.Done():
			return nil
		case t := <-time.After(time.Until(f.NextScanOn.Time)):
			span.AddEvent("fetch", trace.WithAttributes(
				attribute.Int("size", int(queue.count)),
				attribute.String("uri", f.URI),
				attribute.String("timeout", t.Format(time.RFC3339)),
				attribute.String("scan on", f.NextScanOn.Time.Format(time.RFC3339)),
			))
			fetch.Fetch(f)
		}
	}

	span.RecordError(ctx.Err())
	return ctx.Err()
}

// processorLoop consumes responses from the fetch pool, mirrors each fetched
// feed to disk, parses it, and stores the resulting twts in the database.
func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
	ctx, span := otel.Span(ctx)
	defer span.End()

	for ctx.Err() == nil {
		select {
		case <-ctx.Done():
			return

		case res := <-fetch.Out():
			f := res.Request

			span.AddEvent("got response", trace.WithAttributes(
				attribute.String("uri", f.URI),
				attribute.String("scan on", f.NextScanOn.Time.Format(time.RFC3339)),
			))

			f.LastScanOn.Time = time.Now()
			f.LastScanOn.Valid = true

			// Fetch errors adjust the refresh rate before the feed state is saved.
			err := res.err
			if res.err != nil {
				if errors.Is(err, ErrPermanentlyDead) {
					f.RefreshRate = TenYear
				}
				if errors.Is(err, ErrTemporarilyDead) {
					f.RefreshRate = OneDay
				}
				if errors.Is(err, ErrUnmodified) {
					f.RefreshRate = OneDay
				}
				span.RecordError(err)

				f.LastError.String, f.LastError.Valid = err.Error(), true
				err = f.Save(ctx, db)
				span.RecordError(err)
				continue
			}

			f.ETag.String, f.ETag.Valid = res.ETag(), true
			f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true

			span.AddEvent("read feed")

			// Mirror the raw feed to disk while it is being parsed.
			cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
			if err != nil {
				span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))

				f.LastError.String, f.LastError.Valid = err.Error(), true
				f.RefreshRate = OneDay
				err = f.Save(ctx, db)
				span.RecordError(err)
				continue
			}

			rdr := io.TeeReader(res.Body, cpy)
			rdr = lextwt.TwtFixer(rdr)

			twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
			if err != nil {
				cpy.Close()
				span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))

				f.LastError.String, f.LastError.Valid = err.Error(), true
				f.RefreshRate = OneDay
				err = f.Save(ctx, db)
				span.RecordError(err)
				continue
			}
			cpy.Close()

			span.AddEvent("parse complete", trace.WithAttributes(attribute.Int("count", twtfile.Twts().Len())))

			err = storeFeed(ctx, db, twtfile)
			if err != nil {
				span.RecordError(err)

				f.LastError.String, f.LastError.Valid = err.Error(), true
				err = f.Save(ctx, db)
				span.RecordError(err)
				continue
			}

			// Successful refresh: clear the error and scan again in ten minutes.
			f.RefreshRate = TenMinutes
			f.LastError.String = ""
			err = f.Save(ctx, db)
			span.RecordError(err)
		}
	}

	span.RecordError(ctx.Err())
}