package main import ( "cmp" "context" "database/sql" "database/sql/driver" "fmt" "hash/fnv" "io" "iter" "net/http" "net/url" "slices" "strings" "time" _ "embed" "github.com/oklog/ulid/v2" "go.sour.is/xt/internal/otel" "go.yarn.social/lextwt" "go.yarn.social/types" ) type Feed struct { FeedID uuid ParentID uuid HashURI string URI string Nick string State State LastScanOn TwtTime RefreshRate int NextScanOn TwtTime LastTwtOn TwtTime LastModified TwtTime LastError sql.NullString ETag sql.NullString Version string DiscloseFeedURL string DiscloseNick string } type State string const ( PermanentlyDead State = "permanantly-dead" Frozen State = "frozen" Cold State = "cold" Warm State = "warm" Hot State = "hot" Once State = "once" ) var ( //go:embed init.sql initSQL string insertFeed = func(r int) (string, int) { repeat := "" if r > 1 { repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1) } return ` insert into feeds ( feed_id, parent_id, nick, uri, state, last_scan_on, refresh_rate ) values (?, ?, ?, ?, ?, ?, ?)` + repeat + ` ON CONFLICT (feed_id) DO NOTHING `, r * 7 } updateFeed = ` update feeds set nick = ?, state = ?, last_scan_on = ?, refresh_rate = ?, last_modified_on = ?, last_etag = ?, last_error = ? where feed_id = ? ` insertTwt = func(r int) (string, int) { repeat := "" if r > 1 { repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1) } return ` insert into twts ( feed_id, ulid, text, hash, conv, mentions, tags ) values (?, ?, ?, ?, ?, ?, ?)` + repeat + ` ON CONFLICT (feed_id, ulid) DO UPDATE SET conv = excluded.conv, hash = excluded.hash `, r * 7 } fetchFeeds = ` select feed_id, parent_id, coalesce(hashing_uri, uri) hash_uri, uri, nick, state, last_scan_on, strftime( '%Y-%m-%dT%H:%M:%fZ', coalesce(last_scan_on, '1901-01-01'), '+'||abs(refresh_rate + cast(random() % 30 as int))||' seconds' ) next_scan_on, coalesce(last_twt_on, '1901-01-01T00:00:00Z') last_twt_on, refresh_rate, last_modified_on, last_etag from feeds left join ( select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on from twts group by feed_id ) using (feed_id) left join ( select feed_id parent_id, uri hashing_uri from feeds where parent_id is null ) using (parent_id) where datetime( coalesce(last_scan_on, '1901-01-01'), '+'||abs(refresh_rate+cast(random()%30 as int))||' seconds' ) < datetime(current_timestamp, '+3 minutes') ` permaban = []string{ "//lublin.se/", "//enotty.dk/", } ) func (f *Feed) Save(ctx context.Context, db db) error { ctx, span := otel.Span(ctx) defer span.End() _, err := db.ExecContext( ctx, updateFeed, f.Nick, f.State, // state f.LastScanOn, // last_scan_on f.RefreshRate, // refresh_rate f.LastModified, // last_modified_on f.ETag, // last_etag f.LastError, // last_error f.FeedID, // feed_id ) return err } func (f *Feed) Scan(res interface{ Scan(...any) error }) error { f.State = "load" var err error f.Version = "0.0.1" err = res.Scan( &f.FeedID, &f.ParentID, &f.HashURI, &f.URI, &f.Nick, &f.State, &f.LastScanOn, &f.NextScanOn, &f.LastTwtOn, &f.RefreshRate, &f.LastModified, &f.ETag, ) if err != nil { return err } return err } func loadFeeds(ctx context.Context, db db) (iter.Seq[Feed], error) { ctx, span := otel.Span(ctx) var err error var res *sql.Rows res, err = db.QueryContext(ctx, fetchFeeds) if err != nil { return slices.Values([]Feed{}), err } return func(yield func(Feed) bool) { defer span.End() for res.Next() { var f Feed err = f.Scan(res) if err != nil { span.RecordError(err) return } if !yield(f) { return } } }, err } func storeFeed(ctx context.Context, db db, f types.TwtFile) error { ctx, span := otel.Span(ctx) defer span.End() loadTS := time.Now() refreshRate := 600 feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI)) tx, err := db.BeginTx(ctx, nil) if err != nil { return err } followers := f.Info().GetAll("follow") followMap := make(map[string]string, len(followers)) for _, f := range f.Info().GetAll("follow") { nick, uri, ok := strings.Cut(f.Value(), " ") if !ok { continue } nick = strings.TrimSpace(nick) uri = strings.TrimSpace(uri) if _, err := url.Parse(uri); err != nil { continue } followMap[uri] = nick } defer tx.Rollback() twts := f.Twts() _, size := insertTwt(len(twts)) args := make([]any, 0, size) for _, twt := range twts { twtID := makeULID(twt) mentions := make(uuids, 0, len(twt.Mentions())) for _, mention := range twt.Mentions() { followMap[mention.Twter().URI] = mention.Twter().Nick mentions = append(mentions, urlNS.UUID5(mention.Twter().URI)) } tags := make(strList, 0, len(twt.Tags())) for _, tag := range twt.Tags() { tags = append(tags, tag.Text()) } subject := twt.Subject() subjectTag := "" if subject != nil { if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil { subjectTag = tag.Text() } } args = append( args, feedID, // feed_id twtID, // ulid fmt.Sprintf("%+l", twt), // text twt.Hash(), // hash subjectTag, // conv mentions.ToStrList(), // mentions tags, // tags ) } for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) { fmt.Println("store", f.Twter().URI, len(args)) _, err = tx.ExecContext( ctx, query, args..., ) if err != nil { return err } } args = args[:0] args = append(args, feedID, // feed_id nil, // parent_id f.Twter().DomainNick(), // nick f.Twter().URI, // uri "warm", // state TwtTime{Time: loadTS, Valid: true}, // last_scan_on refreshRate, // refresh_rate ) if prev, ok := f.Info().GetN("prev", 0); ok { _, part, ok := strings.Cut(prev.Value(), " ") if ok { uri := f.Twter().URI if u, ok := f.Info().GetN("url", 0); ok { uri = u.Value() } if u, ok := f.Info().GetN("uri", 0); ok { uri = u.Value() } part = uri[:strings.LastIndex(uri, "/")+1] + part childID := urlNS.UUID5(part) fmt.Println("found prev", uri, part) args = append(args, childID, // feed_id feedID, // parent_id f.Twter().DomainNick(), // nick part, // uri "once", // state nil, // last_scan_on 0, // refresh_rate ) } } for uri, nick := range followMap { args = append(args, urlNS.UUID5(uri), // feed_id nil, // parent_id nick, // nick uri, // uri "warm", // state nil, // last_scan_on refreshRate, // refresh_rate ) } for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) { _, err = tx.ExecContext( ctx, query, args..., ) if err != nil { return err } } return tx.Commit() } func storeRegistry(ctx context.Context, db db, in io.Reader) error { ctx, span := otel.Span(ctx) defer span.End() twters := make(map[string]string) args := make([]any, 0, 1024*16) for line := range lextwt.IterRegistry(in) { twt, ok := line.(*lextwt.Twt) if !ok { continue } nick := twt.Twter().DomainNick() uri := twt.Twter().URI feedID := urlNS.UUID5(uri) twtID := makeULID(twt) text := fmt.Sprintf("%+l", twt) // if !strings.HasPrefix(uri, "http") { // fmt.Println("skip bad uri ", nick, uri) // continue // } // if strings.HasPrefix(nick, "http") { // fmt.Println("skip bad nick", nick, uri) // continue // } twters[uri] = nick mentions := make(uuids, 0, len(twt.Mentions())) for _, mention := range twt.Mentions() { twters[uri] = nick mentions = append(mentions, urlNS.UUID5(mention.Twter().URI)) } tags := make(strList, 0, len(twt.Tags())) for _, tag := range twt.Tags() { tags = append(tags, tag.Text()) } subject := twt.Subject() subjectTag := "" if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil { subjectTag = tag.Text() } args = append( args, feedID, // feed_id twtID, // ulid text, // text twt.Hash(), // hash subjectTag, // conv mentions.ToStrList(), // mentions tags, // tags ) if len(args) >= 16*1022 { tx, err := db.BeginTx(ctx, nil) if err != nil { return err } for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) { // fmt.Println("store", len(args)) _, err = tx.ExecContext( ctx, query, args..., ) if err != nil { return err } } args = args[:0] for uri, nick := range twters { // if !strings.HasPrefix(uri, "http") { // fmt.Println("skip", nick, uri) // continue // } // if strings.HasPrefix(nick, "http") { // fmt.Println("skip bad nick", nick, uri) // continue // } feedID := urlNS.UUID5(uri) args = append(args, feedID, // feed_id nil, // parent_id nick, // nick uri, // uri PermanentlyDead, // state nil, // last_scan_on TenYear, // refresh_rate ) } for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) { _, err = tx.ExecContext( ctx, query, args..., ) if err != nil { return err } } args = args[:0] err = tx.Commit() if err != nil { return err } } } return refreshLastTwt(ctx, db) } func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) { for _, host := range permaban { if strings.Contains(feed.URI, host) { return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI) } } req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil) if err != nil { return nil, fmt.Errorf("creating HTTP request failed: %w", err) } req.Header.Add("Accept", "text/plain") if !feed.LastModified.Valid { req.Header.Add("If-Modified-Since", feed.LastModified.Time.Format(http.TimeFormat)) } if feed.ETag.Valid { req.Header.Add("If-None-Match", feed.ETag.String) } // TODO: this is probably not needed. if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" { req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)", feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick)) } else { req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", feed.Version)) } return req, nil } type TwtTime struct { Time time.Time Valid bool // Valid is true if Time is not NULL } // Scan implements the [Scanner] interface. func (n *TwtTime) Scan(value any) error { var err error switch value := value.(type) { case nil: n.Time, n.Valid = time.Time{}, false return nil case string: n.Valid = true n.Time, err = time.Parse(time.RFC3339, value) case time.Time: n.Valid = true n.Time = value } return err } // Value implements the [driver.Valuer] interface. func (n TwtTime) Value() (driver.Value, error) { if !n.Valid { return nil, nil } return n.Time.Format(time.RFC3339), nil } func makeULID(twt types.Twt) ulid.ULID { h64 := fnv.New64a() h16 := fnv.New32a() text := []byte(fmt.Sprintf("%+l", twt)) b := make([]byte, 10) copy(b, h16.Sum(text)[:2]) copy(b[2:], h64.Sum(text)) u := ulid.ULID{} u.SetTime(ulid.Timestamp(twt.Created())) u.SetEntropy(b) return u } func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] { _, size := qry(1) itemsPerIter := maxArgs / size if len(args) < size { return func(yield func(string, []any) bool) {} } if len(args) < maxArgs { return func(yield func(string, []any) bool) { query, _ := qry(len(args) / size) yield(query, args) } } return func(yield func(string, []any) bool) { for len(args) > 0 { if len(args) > maxArgs { query, size := qry(itemsPerIter) if !yield(query, args[:size]) { return } args = args[size:] continue } query, _ := qry(len(args) / size) yield(query, args) return } } } func refreshLastTwt(ctx context.Context, db db) error { _, err := db.ExecContext(ctx, ` insert into last_twt_on select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on from twts group by feed_id on conflict do update set last_twt_on = excluded.last_twt_on; `) return err }