xt/refresh-loop.go
2025-03-29 17:09:18 -06:00

296 lines
6.8 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"io"
"sort"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
const (
TenYear = 3153600000
OneMonth = OneDay * 30
OneDay = 86400
OneHour = 3600
TenMinutes = 600
TwoMinutes = 120
)
func feedRefreshProcessor(ctx context.Context, app *appState) error {
ctx, span := otel.Span(ctx)
defer span.End()
sleeping_time, _ := otel.Meter().Int64Counter("xt_feed_sleep")
queue_size, _ := otel.Meter().Int64Gauge("xt_feed_queue_size")
f := NewHTTPFetcher()
fetch, close := NewFuncPool(ctx, 40, f.Fetch)
defer close()
db, err := app.DB(ctx)
if err != nil {
span.RecordError(err)
return err
}
go processorLoop(ctx, db, fetch)
queue := app.queue
span.AddEvent("start refresh loop")
for ctx.Err() == nil {
if queue.IsEmpty() {
span.AddEvent("load feeds")
it, err := loadFeeds(ctx, db)
span.RecordError(err)
for f := range it {
queue.Insert(&f)
}
}
span.AddEvent("queue", trace.WithAttributes(attribute.Int("size", int(queue.count))))
queue_size.Record(ctx, int64(queue.count))
f := queue.ExtractMin()
if f == nil {
sleeping_time.Add(ctx, int64(TwoMinutes))
span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TwoMinutes))))
select {
case <-time.After(TwoMinutes * time.Second):
case <-ctx.Done():
return nil
}
span.End()
continue
}
span.AddEvent("next", trace.WithAttributes(
attribute.Int("size", int(queue.count)),
attribute.String("uri", f.URI),
attribute.String("last scan on", f.LastScanOn.Time.Format(time.RFC3339)),
attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
until := time.Until(f.NextScanOn.Time)
if until > 2*time.Hour {
span.AddEvent("too soon", trace.WithAttributes(attribute.String("uri", f.URI)))
span.End()
continue
}
span.AddEvent(
"till next",
trace.WithAttributes(attribute.String("time", until.String())))
sleeping_time.Add(ctx, until.Milliseconds())
select {
case <-ctx.Done():
return nil
case t := <-time.After(until):
span.AddEvent("fetch", trace.WithAttributes(
attribute.Int("size", int(queue.count)),
attribute.String("uri", f.URI),
attribute.String("timeout", t.Format(time.RFC3339)),
attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
}
fetch.Fetch(f)
}
span.RecordError(ctx.Err())
return ctx.Err()
}
func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
ctx, span := otel.Span(ctx)
defer span.End()
process_in_total, _ := otel.Meter().Int64Counter("xt_processed_in_total")
process_out_total, _ := otel.Meter().Int64Counter("xt_processed_out_total")
twts_histogram, _ := otel.Meter().Float64Histogram("xt_twt1k_bucket")
for ctx.Err() == nil {
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Minute):
refreshLastTwt(ctx, db)
case res := <-fetch.Out():
f := res.Request
span.AddEvent("got response", trace.WithAttributes(
attribute.String("uri", f.URI),
attribute.String("scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
process_in_total.Add(ctx, 1)
f.LastScanOn.Time = time.Now()
f.LastScanOn.Valid = true
err := res.err
if res.err != nil {
if errors.Is(err, ErrPermanentlyDead) {
f.State = "permanantly-dead"
f.RefreshRate = TenYear
}
if errors.Is(err, ErrTemporarilyDead) {
f.RefreshRate, f.State = tsTemp(f.LastTwtOn.Time)
}
if errors.Is(err, ErrUnmodified) {
f.RefreshRate, f.State = tsTemp(f.LastTwtOn.Time)
}
span.RecordError(err)
f.LastError.String, f.LastError.Valid = err.Error(), true
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
f.ETag.String, f.ETag.Valid = res.ETag(), true
f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
span.AddEvent("read feed")
// cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
f.LastError.String, f.LastError.Valid = err.Error(), true
f.RefreshRate = OneDay
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
var rdr io.Reader = res.Body
// rdr := io.TeeReader(rdr, cpy)
rdr = lextwt.TwtFixer(rdr)
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
//cpy.Close()
res.Body.Close()
if err != nil {
span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
f.LastError.String, f.LastError.Valid = err.Error(), true
f.RefreshRate = OneDay
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
count := twtfile.Twts().Len()
span.AddEvent("parse complete", trace.WithAttributes(attribute.Int("count", count)))
twts_histogram.Record(ctx, float64(count)/1000)
err = storeFeed(ctx, db, twtfile)
if err != nil {
span.RecordError(err)
f.LastError.String, f.LastError.Valid = err.Error(), true
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
f.RefreshRate, f.State = checkTemp(twtfile.Twts())
f.LastError.String = ""
err = f.Save(ctx, db)
span.RecordError(err)
process_out_total.Add(ctx, 1)
}
}
span.RecordError(ctx.Err())
}
func checkTemp(twts types.Twts) (int, State) {
if len(twts) < 5 {
return 7 * OneDay, "cold"
}
sort.Sort(twts)
since_first := -time.Until(twts[0].Created())
since_fifth := -time.Until(twts[4].Created())
if since_first < 2*time.Hour || since_fifth < 8*time.Hour {
return TwoMinutes, "hot"
}
if since_first < 4*time.Hour || since_fifth < 16*time.Hour {
return TenMinutes, "hot"
}
if since_first < 8*time.Hour || since_fifth < 32*time.Hour {
return 2 * TenMinutes, "warm"
}
if since_first < 16*time.Hour || since_fifth < 64*time.Hour {
return 4 * TenMinutes, "warm"
}
if since_first < 24*time.Hour || since_fifth < 128*time.Hour {
return OneDay, "cold"
}
if since_first < 48*time.Hour || since_fifth < 256*time.Hour {
return 2 * OneDay, "cold"
}
if since_first < 96*time.Hour || since_fifth < 512*time.Hour {
return 7 * OneDay, "frozen"
}
return OneMonth, "frozen"
}
func tsTemp(ts time.Time) (int, State) {
since_first := -time.Until(ts)
if since_first < 2*time.Hour {
return TwoMinutes, "hot"
}
if since_first < 4*time.Hour {
return TenMinutes, "hot"
}
if since_first < 8*time.Hour {
return 2 * TenMinutes, "warm"
}
if since_first < 16*time.Hour {
return 4 * TenMinutes, "warm"
}
if since_first < 24*time.Hour {
return OneDay, "cold"
}
if since_first < 48*time.Hour {
return 2 * OneDay, "cold"
}
if since_first < 96*time.Hour {
return 7 * OneDay, "frozen"
}
return OneMonth, "frozen"
}