package main

import (
	"cmp"
	"context"
	"database/sql"
	"database/sql/driver"
	"fmt"
	"hash/fnv"
	"iter"
	"net/http"
	"net/url"
	"slices"
	"strings"
	"time"

	_ "embed"

	"github.com/oklog/ulid/v2"

	"go.sour.is/xt/internal/otel"
	"go.yarn.social/lextwt"
	"go.yarn.social/types"
)

type Feed struct {
	FeedID          uuid
	ParentID        uuid
	HashURI         string
	URI             string
	Nick            string
	State           State
	LastScanOn      TwtTime
	RefreshRate     int
	NextScanOn      TwtTime
	LastTwtOn       TwtTime
	LastModified    TwtTime
	LastError       sql.NullString
	ETag            sql.NullString
	Version         string
	DiscloseFeedURL string
	DiscloseNick    string
}

type State string

const (
	PermanentlyDead State = "permanantly-dead"
	Frozen          State = "frozen"
	Cold            State = "cold"
	Warm            State = "warm"
	Hot             State = "hot"
	Once            State = "once"
)

var (
	//go:embed init.sql
	initSQL string

	insertFeed = func(r int) (string, int) {
		repeat := ""
		if r > 1 {
			repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
		}
		return `
		insert into feeds (
			feed_id, parent_id, nick, uri, state, last_scan_on, refresh_rate
		) values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
		ON CONFLICT (feed_id) DO NOTHING`, r * 7
	}

	updateFeed = `
		update feeds set
			state = ?,
			last_scan_on = ?,
			refresh_rate = ?,
			last_modified_on = ?,
			last_etag = ?,
			last_error = ?
		where feed_id = ?
	`

	insertTwt = func(r int) (string, int) {
		repeat := ""
		if r > 1 {
			repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
		}
		return `
		insert into twts (feed_id, ulid, text, hash, conv, mentions, tags)
		values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
		ON CONFLICT (feed_id, ulid) DO NOTHING`, r * 7
	}

	fetchFeeds = `
		select
			feed_id,
			parent_id,
			coalesce(hashing_uri, uri) hash_uri,
			uri,
			nick,
			state,
			last_scan_on,
			strftime(
				'%Y-%m-%dT%H:%M:%fZ',
				coalesce(last_scan_on, '1901-01-01'),
				'+'||abs(refresh_rate + cast(random() % 30 as int))||' seconds'
			) next_scan_on,
			coalesce(last_twt_on, '1901-01-01T00:00:00Z') last_twt_on,
			refresh_rate,
			last_modified_on,
			last_etag
		from feeds
		left join (
			select
				feed_id,
				max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
			from twts
			group by feed_id
		) using (feed_id)
		left join (
			select feed_id parent_id, uri hashing_uri
			from feeds
			where parent_id is null
		) using (parent_id)
		where datetime(
			coalesce(last_scan_on, '1901-01-01'),
			'+'||abs(refresh_rate+cast(random()%30 as int))||' seconds'
		) < datetime(current_timestamp, '+3 minutes')
	`
)
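
// Usage note: insertFeed and insertTwt return both the SQL text and the
// number of bind variables that text expects, so callers can size their
// argument slices before batching. A minimal sketch (the two-row count
// below is illustrative, not taken from this file):
//
//	query, n := insertFeed(2) // placeholders for two rows
//	// n == 14 (2 rows x 7 columns)
//	args := make([]any, 0, n)
//	// append 7 values per feed in column order, then:
//	// _, err := db.ExecContext(ctx, query, args...)
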
func (f *Feed) Create(ctx context.Context, db db) error {
	ctx, span := otel.Span(ctx)
	defer span.End()

	query, _ := insertFeed(1)
	_, err := db.ExecContext(
		ctx,
		query,
		f.FeedID,      // feed_id
		f.ParentID,    // parent_id
		f.Nick,        // nick
		f.URI,         // uri
		f.State,       // state
		f.LastScanOn,  // last_scan_on
		f.RefreshRate, // refresh_rate
	)
	return err
}

func (f *Feed) Save(ctx context.Context, db db) error {
	ctx, span := otel.Span(ctx)
	defer span.End()

	_, err := db.ExecContext(
		ctx,
		updateFeed,
		f.State,        // state
		f.LastScanOn,   // last_scan_on
		f.RefreshRate,  // refresh_rate
		f.LastModified, // last_modified_on
		f.ETag,         // last_etag
		f.LastError,    // last_error
		f.FeedID,       // feed_id
	)
	return err
}

func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
	f.State = "load"
	f.Version = "0.0.1"

	return res.Scan(
		&f.FeedID,
		&f.ParentID,
		&f.HashURI,
		&f.URI,
		&f.Nick,
		&f.State,
		&f.LastScanOn,
		&f.NextScanOn,
		&f.LastTwtOn,
		&f.RefreshRate,
		&f.LastModified,
		&f.ETag,
	)
}

// loadFeeds streams the feeds selected by fetchFeeds, i.e. those due to be
// scanned within the next three minutes.
func loadFeeds(ctx context.Context, db db) (iter.Seq[Feed], error) {
	ctx, span := otel.Span(ctx)

	var err error
	var res *sql.Rows
	res, err = db.QueryContext(ctx, fetchFeeds)
	if err != nil {
		span.End()
		return slices.Values([]Feed{}), err
	}

	return func(yield func(Feed) bool) {
		defer span.End()

		for res.Next() {
			var f Feed
			err = f.Scan(res)
			if err != nil {
				span.RecordError(err)
				return
			}
			if !yield(f) {
				return
			}
		}
	}, err
}

// storeFeed writes a parsed twtxt file into the database: its twts, the feed
// row itself, any archived "prev" feed, and feeds discovered through follow
// metadata and mentions, all within one transaction.
func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
	ctx, span := otel.Span(ctx)
	defer span.End()

	loadTS := time.Now()
	refreshRate := 600
	feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Collect followed feeds from the "follow" metadata fields.
	followers := f.Info().GetAll("follow")
	followMap := make(map[string]string, len(followers))
	for _, follow := range followers {
		nick, uri, ok := strings.Cut(follow.Value(), "http")
		if !ok {
			continue
		}
		nick = strings.TrimSpace(nick)
		uri = "http" + strings.TrimSpace(uri)
		if _, err := url.Parse(uri); err != nil {
			continue
		}
		followMap[nick] = uri
	}

	twts := f.Twts()
	_, size := insertTwt(len(twts))
	args := make([]any, 0, size)

	for _, twt := range twts {
		mentions := make(uuids, 0, len(twt.Mentions()))
		for _, mention := range twt.Mentions() {
			followMap[mention.Twter().Nick] = mention.Twter().URI
			mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
		}

		tags := make(strList, 0, len(twt.Tags()))
		for _, tag := range twt.Tags() {
			tags = append(tags, tag.Text())
		}

		subject := twt.Subject()
		subjectTag := ""
		if subject != nil {
			if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
				subjectTag = tag.Text()
			}
		}

		// Values in insertTwt column order.
		args = append(args,
			feedID,                  // feed_id
			makeULID(twt),           // ulid
			fmt.Sprintf("%+l", twt), // text
			twt.Hash(),              // hash
			subjectTag,              // conv
			mentions.ToStrList(),    // mentions
			tags,                    // tags
		)
	}

	for query, batch := range chunk(args, insertTwt, db.MaxVariableNumber) {
		fmt.Println("store", f.Twter().URI, len(batch))
		_, err = tx.ExecContext(ctx, query, batch...)
		if err != nil {
			return err
		}
	}

	// Reuse args for the feed rows: the feed itself, its archive, and follows.
	args = args[:0]
	args = append(args,
		feedID,                             // feed_id
		nil,                                // parent_id
		f.Twter().DomainNick(),             // nick
		f.Twter().URI,                      // uri
		"warm",                             // state
		TwtTime{Time: loadTS, Valid: true}, // last_scan_on
		refreshRate,                        // refresh_rate
	)

	if prev, ok := f.Info().GetN("prev", 0); ok {
		_, part, ok := strings.Cut(prev.Value(), " ")
		if ok {
			uri := f.Twter().URI
			if u, ok := f.Info().GetN("url", 0); ok {
				uri = u.Value()
			}
			if u, ok := f.Info().GetN("uri", 0); ok {
				uri = u.Value()
			}
			part = uri[:strings.LastIndex(uri, "/")+1] + part
			childID := urlNS.UUID5(part)
			fmt.Println("found prev", uri, part)

			args = append(args,
				childID,                // feed_id
				feedID,                 // parent_id
				f.Twter().DomainNick(), // nick
				part,                   // uri
				"once",                 // state
				nil,                    // last_scan_on
				0,                      // refresh_rate
			)
		}
	}

	for nick, uri := range followMap {
		args = append(args,
			urlNS.UUID5(uri), // feed_id
			nil,              // parent_id
			nick,             // nick
			uri,              // uri
			"warm",           // state
			nil,              // last_scan_on
			refreshRate,      // refresh_rate
		)
	}

	for query, batch := range chunk(args, insertFeed, db.MaxVariableNumber) {
		_, err = tx.ExecContext(ctx, query, batch...)
		if err != nil {
			return err
		}
	}

	return tx.Commit()
}

// MakeHTTPRequest builds a conditional GET request for the feed, refusing
// hosts that are permanently banned.
func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
	if strings.Contains(feed.URI, "lublin.se") {
		return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
	}
	if strings.Contains(feed.URI, "enotty.dk") {
		return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
	}

	req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil)
	if err != nil {
		return nil, fmt.Errorf("creating HTTP request failed: %w", err)
	}

	req.Header.Add("Accept", "text/plain")

	if feed.LastModified.Valid {
		req.Header.Add("If-Modified-Since", feed.LastModified.Time.Format(http.TimeFormat))
	}
	if feed.ETag.Valid {
		req.Header.Add("If-None-Match", feed.ETag.String)
	}

	if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" {
		req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)", feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick))
	} else {
		req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", feed.Version))
	}

	return req, nil
}
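
// Usage note: callers are expected to pair MakeHTTPRequest with conditional
// GET handling. A minimal sketch, assuming the caller owns the http.Client
// and the 304 handling (neither is defined in this file):
//
//	req, err := feed.MakeHTTPRequest(ctx)
//	if err != nil {
//		return err
//	}
//	res, err := http.DefaultClient.Do(req)
//	if err != nil {
//		return err
//	}
//	defer res.Body.Close()
//	if res.StatusCode == http.StatusNotModified {
//		return nil // unchanged since the last scan; nothing to parse
//	}
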
// TwtTime is a nullable RFC 3339 timestamp for database scanning and storage.
type TwtTime struct {
	Time  time.Time
	Valid bool // Valid is true if Time is not NULL
}

// Scan implements the [sql.Scanner] interface.
func (n *TwtTime) Scan(value any) error {
	var err error
	switch value := value.(type) {
	case nil:
		n.Time, n.Valid = time.Time{}, false
		return nil
	case string:
		n.Valid = true
		n.Time, err = time.Parse(time.RFC3339, value)
	case time.Time:
		n.Valid = true
		n.Time = value
	}
	return err
}

// Value implements the [driver.Valuer] interface.
func (n TwtTime) Value() (driver.Value, error) {
	if !n.Valid {
		return nil, nil
	}
	return n.Time.Format(time.RFC3339), nil
}

// makeULID derives a ULID from the twt's creation time, with entropy taken
// from FNV hashes of its text so the same twt always maps to the same ULID.
func makeULID(twt types.Twt) ulid.ULID {
	h64 := fnv.New64a()
	h16 := fnv.New32a()
	text := []byte(fmt.Sprintf("%+l", twt))

	// Hash the text, then build 10 bytes of entropy: 2 bytes from the
	// 32-bit hash and 8 bytes from the 64-bit hash.
	h64.Write(text)
	h16.Write(text)

	b := make([]byte, 10)
	copy(b, h16.Sum(nil)[:2])
	copy(b[2:], h64.Sum(nil))

	u := ulid.ULID{}
	u.SetTime(ulid.Timestamp(twt.Created()))
	u.SetEntropy(b)
	return u
}

// chunk yields query/argument batches sized to stay under the driver's
// bind-variable limit (maxArgs). qry is one of the insert builders above.
func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
	_, size := qry(1)
	itemsPerIter := maxArgs / size

	// Nothing to insert.
	if len(args) < size {
		return func(yield func(string, []any) bool) {}
	}

	// Everything fits in a single statement.
	if len(args) < maxArgs {
		return func(yield func(string, []any) bool) {
			query, _ := qry(len(args) / size)
			yield(query, args)
		}
	}

	// Emit full-size batches, then a final statement for the remainder.
	return func(yield func(string, []any) bool) {
		for len(args) > 0 {
			if len(args) > maxArgs {
				query, size := qry(itemsPerIter)
				if !yield(query, args[:size]) {
					return
				}
				args = args[size:]
				continue
			}

			query, _ := qry(len(args) / size)
			yield(query, args)
			return
		}
	}
}
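
// Usage note: chunk is what keeps storeFeed under the driver's bind-variable
// limit. A minimal sketch, assuming an illustrative limit of 14 variables
// (the real value comes from db.MaxVariableNumber):
//
//	// 3 feeds x 7 columns = 21 args; a limit of 14 allows 2 rows per
//	// statement, so chunk yields a 2-row insert followed by a 1-row insert.
//	for query, batch := range chunk(args, insertFeed, 14) {
//		if _, err := tx.ExecContext(ctx, query, batch...); err != nil {
//			return err
//		}
//	}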