package main import ( "cmp" "context" "crypto/sha3" "database/sql" "database/sql/driver" "fmt" "io" "iter" "net/http" "net/url" "slices" "strings" "time" _ "embed" "github.com/oklog/ulid/v2" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.sour.is/xt/internal/otel" "go.yarn.social/lextwt" "go.yarn.social/types" ) type Feed struct { FeedID uuid ParentID uuid HashURI string URI string Nick string State State LastScanOn TwtTime RefreshRate int NextScanOn TwtTime LastTwtOn TwtTime LastModified TwtTime LastError sql.NullString ETag sql.NullString Version string DiscloseFeedURL string DiscloseNick string } type State string const ( PermanentlyDead State = "permanantly-dead" Frozen State = "frozen" Cold State = "cold" Warm State = "warm" Hot State = "hot" Once State = "once" ) var ( //go:embed init.sql initSQL string insertFeed = func(r int) (string, int) { repeat := "" if r > 1 { repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1) } return ` insert into feeds ( feed_id, parent_id, nick, uri, state, last_scan_on, refresh_rate ) values (?, ?, ?, ?, ?, ?, ?)` + repeat + ` ON CONFLICT (feed_id) DO NOTHING `, r * 7 } updateFeed = ` update feeds set nick = ?, state = ?, last_scan_on = ?, refresh_rate = ?, last_modified_on = ?, last_etag = ?, last_error = ? where feed_id = ? ` insertTwt = func(r int) (string, int) { repeat := "" if r > 1 { repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1) } return ` insert into twts ( feed_id, ulid, text, hash, conv, mentions, tags ) values (?, ?, ?, ?, ?, ?, ?)` + repeat + ` ON CONFLICT (feed_id, ulid) DO UPDATE SET conv = excluded.conv, hash = excluded.hash `, r * 7 } fetchFeeds = ` select feed_id, parent_id, coalesce(hashing_uri, uri) hash_uri, uri, nick, state, last_scan_on, strftime( '%Y-%m-%dT%H:%M:%fZ', coalesce(last_scan_on, '1901-01-01'), '+'||abs(refresh_rate + cast(random() % 30 as int))||' seconds' ) next_scan_on, coalesce(last_twt_on, '1901-01-01T00:00:00Z') last_twt_on, refresh_rate, last_modified_on, last_etag from feeds left join ( select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on from twts group by feed_id ) using (feed_id) left join ( select feed_id parent_id, uri hashing_uri from feeds where parent_id is null ) using (parent_id) where datetime( coalesce(last_scan_on, '1901-01-01'), '+'||abs(refresh_rate+cast(random()%30 as int))||' seconds' ) < datetime(current_timestamp, '+3 minutes') ` permaban = []string{ "//lublin.se/", "//enotty.dk/", } ) func (f *Feed) Save(ctx context.Context, db db) error { ctx, span := otel.Span(ctx) defer span.End() _, err := db.ExecContext( ctx, updateFeed, f.Nick, f.State, // state f.LastScanOn, // last_scan_on f.RefreshRate, // refresh_rate f.LastModified, // last_modified_on f.ETag, // last_etag f.LastError, // last_error f.FeedID, // feed_id ) return err } func (f *Feed) Scan(res interface{ Scan(...any) error }) error { f.State = "load" var err error f.Version = "0.0.1" err = res.Scan( &f.FeedID, &f.ParentID, &f.HashURI, &f.URI, &f.Nick, &f.State, &f.LastScanOn, &f.NextScanOn, &f.LastTwtOn, &f.RefreshRate, &f.LastModified, &f.ETag, ) if err != nil { return err } return err } func loadFeeds(ctx context.Context, db db) (iter.Seq[Feed], error) { ctx, span := otel.Span(ctx) var err error var res *sql.Rows res, err = db.QueryContext(ctx, fetchFeeds) if err != nil { return slices.Values([]Feed{}), err } return func(yield func(Feed) bool) { defer span.End() for res.Next() { var f Feed err = f.Scan(res) if err != nil { span.RecordError(err) return } if !yield(f) { return } } }, err } func storeFeed(ctx context.Context, db db, f types.TwtFile) error { ctx, span := otel.Span(ctx) defer span.End() loadTS := time.Now() refreshRate := 600 feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI)) tx, err := db.BeginTx(ctx, nil) if err != nil { return err } followers := f.Info().GetAll("follow") followMap := make(map[string]string, len(followers)) for _, f := range f.Info().GetAll("follow") { nick, uri, ok := strings.Cut(f.Value(), " ") if !ok { continue } nick = strings.TrimSpace(nick) uri = strings.TrimSpace(uri) if _, err := url.Parse(uri); err != nil { continue } followMap[uri] = nick } defer tx.Rollback() twts := f.Twts() _, size := insertTwt(len(twts)) args := make([]any, 0, size) for _, twt := range twts { twtID := makeULID(twt) mentions := make(uuids, 0, len(twt.Mentions())) for _, mention := range twt.Mentions() { followMap[mention.Twter().URI] = mention.Twter().Nick mentions = append(mentions, urlNS.UUID5(mention.Twter().URI)) } tags := make(strList, 0, len(twt.Tags())) for _, tag := range twt.Tags() { tags = append(tags, tag.Text()) } subject := twt.Subject() subjectTag := "" if subject != nil { if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil { subjectTag = tag.Text() } } args = append( args, feedID, // feed_id twtID, // ulid fmt.Sprintf("%+l", twt), // text twt.Hash(), // hash subjectTag, // conv mentions.ToStrList(), // mentions tags, // tags ) } for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) { fmt.Println("store", f.Twter().URI, len(args)) _, err = tx.ExecContext( ctx, query, args..., ) if err != nil { return err } } args = args[:0] args = append(args, feedID, // feed_id nil, // parent_id f.Twter().DomainNick(), // nick f.Twter().URI, // uri "warm", // state TwtTime{Time: loadTS, Valid: true}, // last_scan_on refreshRate, // refresh_rate ) if prev, ok := f.Info().GetN("prev", 0); ok { _, part, ok := strings.Cut(prev.Value(), " ") if ok { uri := f.Twter().URI if u, ok := f.Info().GetN("url", 0); ok { uri = u.Value() } if u, ok := f.Info().GetN("uri", 0); ok { uri = u.Value() } part = uri[:strings.LastIndex(uri, "/")+1] + part childID := urlNS.UUID5(part) fmt.Println("found prev", uri, part) args = append(args, childID, // feed_id feedID, // parent_id f.Twter().DomainNick(), // nick part, // uri "once", // state nil, // last_scan_on 0, // refresh_rate ) } } for uri, nick := range followMap { args = append(args, urlNS.UUID5(uri), // feed_id nil, // parent_id nick, // nick uri, // uri "warm", // state nil, // last_scan_on refreshRate, // refresh_rate ) } for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) { _, err = tx.ExecContext( ctx, query, args..., ) if err != nil { return err } } return tx.Commit() } func storeRegistry(ctx context.Context, db db, in io.Reader) error { ctx, span := otel.Span(ctx) defer span.End() twters := make(map[string]string) args := make([]any, 0, 1024*16) i := 0 for line := range lextwt.IterRegistry(in) { twt, ok := line.(*lextwt.Twt) if !ok { continue } nick := twt.Twter().DomainNick() uri := twt.Twter().URI feedID := urlNS.UUID5(uri) twtID := makeULID(twt) text := fmt.Sprintf("%+l", twt) // if !strings.HasPrefix(uri, "http") { // fmt.Println("skip bad uri ", nick, uri) // continue // } // if strings.HasPrefix(nick, "http") { // fmt.Println("skip bad nick", nick, uri) // continue // } twters[uri] = nick mentions := make(uuids, 0, len(twt.Mentions())) for _, mention := range twt.Mentions() { twters[uri] = nick mentions = append(mentions, urlNS.UUID5(mention.Twter().URI)) } tags := make(strList, 0, len(twt.Tags())) for _, tag := range twt.Tags() { tags = append(tags, tag.Text()) } subject := twt.Subject() subjectTag := "" if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil { subjectTag = tag.Text() } args = append( args, feedID, // feed_id twtID, // ulid text, // text twt.Hash(), // hash subjectTag, // conv mentions.ToStrList(), // mentions tags, // tags ) if len(args) >= 16*1022 { i += len(args) fmt.Println("store", i/7, i) tx, err := db.BeginTx(ctx, nil) if err != nil { return err } for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) { // fmt.Println("store", len(args)) _, err = tx.ExecContext( ctx, query, args..., ) if err != nil { return err } } args = args[:0] for uri, nick := range twters { // if !strings.HasPrefix(uri, "http") { // fmt.Println("skip", nick, uri) // continue // } // if strings.HasPrefix(nick, "http") { // fmt.Println("skip bad nick", nick, uri) // continue // } feedID := urlNS.UUID5(uri) args = append(args, feedID, // feed_id nil, // parent_id nick, // nick uri, // uri PermanentlyDead, // state nil, // last_scan_on TenYear, // refresh_rate ) } for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) { _, err = tx.ExecContext( ctx, query, args..., ) if err != nil { return err } } args = args[:0] err = tx.Commit() if err != nil { return err } } } return refreshLastTwt(ctx, db) } func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) { for _, host := range permaban { if strings.Contains(feed.URI, host) { return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI) } } req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil) if err != nil { return nil, fmt.Errorf("creating HTTP request failed: %w", err) } req.Header.Add("Accept", "text/plain") if !feed.LastModified.Valid { req.Header.Add("If-Modified-Since", feed.LastModified.Time.Format(http.TimeFormat)) } if feed.ETag.Valid { req.Header.Add("If-None-Match", feed.ETag.String) } // TODO: this is probably not needed. if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" { req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)", feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick)) } else { req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", feed.Version)) } return req, nil } type TwtTime struct { Time time.Time Valid bool // Valid is true if Time is not NULL } // Scan implements the [Scanner] interface. func (n *TwtTime) Scan(value any) error { var err error switch value := value.(type) { case nil: n.Time, n.Valid = time.Time{}, false return nil case string: n.Valid = true n.Time, err = time.Parse(time.RFC3339, value) case time.Time: n.Valid = true n.Time = value } return err } // Value implements the [driver.Valuer] interface. func (n TwtTime) Value() (driver.Value, error) { if !n.Valid { return nil, nil } return n.Time.Format(time.RFC3339), nil } func makeULID(twt types.Twt) ulid.ULID { text := fmt.Appendf(nil, "%s\t%+l", twt.Twter().URI, twt) u := ulid.ULID{} u.SetTime(ulid.Timestamp(twt.Created())) u.SetEntropy(sha3.SumSHAKE128(text, 10)) return u } func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] { _, size := qry(1) itemsPerIter := maxArgs / size if len(args) < size { return func(yield func(string, []any) bool) {} } if len(args) < maxArgs { return func(yield func(string, []any) bool) { query, _ := qry(len(args) / size) yield(query, args) } } return func(yield func(string, []any) bool) { for len(args) > 0 { if len(args) > maxArgs { query, size := qry(itemsPerIter) if !yield(query, args[:size]) { return } args = args[size:] continue } query, _ := qry(len(args) / size) yield(query, args) return } } } func refreshLastTwt(ctx context.Context, db db) error { qry := ` delete from last_twt_on; insert into last_twt_on (feed_id, last_twt_on) select distinct feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on from twts group by feed_id; delete from twt_mentions; insert into twt_mentions (ulid, feed_id) select distinct ulid, unhex(replace(trim(value,'{}'),'-','')) feed_id from twts, json_each(mentions); ` var err error for _, stmt := range strings.Split(qry, ";") { _, err = db.ExecContext(ctx, stmt) if err != nil { return err } } return err } func fetchTwts(ctx context.Context, db db, uri string, limit int, offset int64) ([]types.Twt, int64, int64, error) { ctx, span := otel.Span(ctx) defer span.End() args := make([]any, 0, 3) where := `where feed_id in (select feed_id from feeds where state != 'permanantly-dead')` if uri != "" { feed_id := urlNS.UUID5(uri) where = "where feed_id = ?" args = append(args, feed_id) } var end int64 err := db.QueryRowContext(ctx, ` select count(*) n from twts `+where+``, args...).Scan(&end) span.RecordError(err) if err != nil { return nil, 0, 0, err } if offset < 1 { offset += end } offset = max(1, offset) args = append(args, limit, offset-int64(limit)) span.AddEvent("twts", trace.WithAttributes( attribute.Int("limit", limit), attribute.Int64("offset-end", offset), attribute.Int64("offset-start", offset-int64(limit)), attribute.Int64("max", end), )) qry := ` SELECT feed_id, hash, conv, coalesce(nick, 'nobody') nick, coalesce(uri, 'https://empty.txt') uri, text FROM twts join ( select feed_id, nick, uri from feeds ) using (feed_id) where rowid in ( select rowid from twts ` + where + ` order by ulid asc limit ? offset ? ) order by ulid asc` fmt.Println(qry, args) rows, err := db.QueryContext( ctx, qry, args..., ) if err != nil { span.RecordError(err) return nil, 0, 0, err } defer rows.Close() var twts []types.Twt for rows.Next() { var o struct { FeedID string Hash string Conv string Dt string Nick string URI string Text string } err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text) if err != nil { span.RecordError(err) return nil, 0, 0, err } twter := types.NewTwter(o.Nick, o.URI) twt, _ := lextwt.ParseLine(o.Text, &twter) twts = append(twts, twt) } return twts, offset, end, err } func fetchUsers(ctx context.Context, db db, uri, q string) ([]types.Twt, error) { ctx, span := otel.Span(ctx) defer span.End() where := `where parent_id is null and state not in ('permanantly-dead', 'frozen') and last_twt_on is not null` args := make([]any, 0) if uri != "" { where = `where feed_id = ? or parent_id = ?` feed_id := urlNS.UUID5(uri) args = append(args, feed_id, feed_id) } else if q != "" { where = `where nick like ?` args = append(args, "%"+q+"%") } qry := ` SELECT feed_id, uri, nick, last_scan_on, coalesce(last_twt_on, last_scan_on) last_twt_on FROM feeds left join last_twt_on using (feed_id) ` + where + ` order by nick, uri ` fmt.Println(qry, args) rows, err := db.QueryContext(ctx, qry, args...) if err != nil { span.RecordError(err) return nil, err } defer rows.Close() var twts []types.Twt for rows.Next() { var o struct { FeedID string URI string Nick string Dt TwtTime LastTwtOn TwtTime } err = rows.Scan(&o.FeedID, &o.URI, &o.Nick, &o.Dt, &o.LastTwtOn) if err != nil { span.RecordError(err) return nil, err } twts = append(twts, lextwt.NewTwt( types.NewTwter(o.Nick, o.URI), lextwt.NewDateTime(o.Dt.Time, o.LastTwtOn.Time.Format(time.RFC3339)), nil, )) } return twts, nil } func fetchMentions(ctx context.Context, db db, mention uuid, limit int, offset int64) ([]types.Twt, int64, int64, error) { ctx, span := otel.Span(ctx) defer span.End() args := make([]any, 0, 3) args = append(args, mention) var end int64 err := db.QueryRowContext(ctx, ` select count(*) n from twt_mentions where feed_id = ?`, args...).Scan(&end) span.RecordError(err) if err != nil { return nil, 0, 0, err } fmt.Println(mention.MarshalText(), end, err) if offset < 1 { offset += end } limit = min(100, max(1, limit)) offset = max(1, offset) args = append(args, limit, offset-int64(limit)) qry := ` SELECT feed_id, hash, conv, coalesce(nick, 'nobody') nick, coalesce(uri, 'https://empty.txt') uri, text FROM twts join ( select feed_id, nick, uri from feeds ) using (feed_id) where rowid in ( select rowid from twts where ulid in (select ulid from twt_mentions where feed_id = ?) order by ulid asc limit ? offset ? ) order by ulid asc ` fmt.Println(qry, args) rows, err := db.QueryContext( ctx, qry, args..., ) if err != nil { span.RecordError(err) return nil, 0, 0, err } defer rows.Close() var twts []types.Twt for rows.Next() { var o struct { FeedID string Hash string Conv string Dt string Nick string URI string Text string } err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text) if err != nil { span.RecordError(err) return nil, 0, 0, err } twter := types.NewTwter(o.Nick, o.URI) // o.Text = strings.ReplaceAll(o.Text, "\n", "\u2028") twt, _ := lextwt.ParseLine(o.Text, &twter) twts = append(twts, twt) } if err := rows.Err(); err != nil { fmt.Println(err) return nil, 0, 0, err } return twts, offset, end, err } func fetchConv(ctx context.Context, db db, hash string, limit int, offset int64) ([]types.Twt, int64, int64, error) { ctx, span := otel.Span(ctx) defer span.End() var end int64 err := db.QueryRowContext(ctx, ` select count(*) n from twts where hash = $1 or conv = $1`, hash).Scan(&end) span.RecordError(err) if err != nil { return nil, 0, 0, err } rows, err := db.QueryContext( ctx, ` SELECT feed_id, hash, conv, nick, uri, text FROM twts JOIN ( SELECT feed_id, nick, uri FROM feeds ) using (feed_id) WHERE hash = $1 or conv = $1 order by ulid asc`, hash, ) if err != nil { span.RecordError(err) return nil, 0, 0, err } defer rows.Close() var twts []types.Twt for rows.Next() { var o struct { FeedID string Hash string Conv string Dt string Nick string URI string Text string } err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text) if err != nil { span.RecordError(err) return nil, 0,0,err } twter := types.NewTwter(o.Nick, o.URI) twt, _ := lextwt.ParseLine(o.Text, &twter) twts = append(twts, twt) } err = rows.Err() return twts, offset, end, err }