xt/feed.go
2025-04-07 13:17:44 -06:00

945 lines
19 KiB
Go

package main
import (
"cmp"
"context"
"crypto/sha3"
"database/sql"
"database/sql/driver"
"fmt"
"io"
"iter"
"net/http"
"net/url"
"slices"
"strings"
"time"
_ "embed"
"github.com/oklog/ulid/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/otel"
"go.sour.is/xt/internal/uuid"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
// Feed is one row of the feeds table together with the values the fetchFeeds
// query derives for it. It is populated by Scan and written back by Save.
type Feed struct {
// FeedID is the feed_id key; Save uses it to select the row to update.
FeedID uuid.UUID
// ParentID is the parent_id column.
ParentID uuid.UUID
// HashURI is the joined parent row's uri when fetchFeeds finds one,
// otherwise this feed's own uri.
HashURI string
// URI is the address requested by MakeHTTPRequest.
URI string
Nick string
State State
// LastScanOn is the last_scan_on column.
LastScanOn TwtTime
// RefreshRate is added, in seconds, to last_scan_on by fetchFeeds.
RefreshRate int
// NextScanOn is computed by fetchFeeds as last_scan_on + refresh_rate plus
// a random offset; it is not written back by Save.
NextScanOn TwtTime
// LastTwtOn is derived by fetchFeeds from the stored twts of this feed.
LastTwtOn TwtTime
// LastModified is the last_modified_on column.
LastModified TwtTime
// LastError is written to last_error by Save; Scan does not load it.
LastError sql.NullString
// ETag is the last_etag column, sent as If-None-Match when valid.
ETag sql.NullString
// Version is set to "0.0.1" by Scan and reported in the User-Agent header.
Version string
// DiscloseFeedURL and DiscloseNick are added to the User-Agent header by
// MakeHTTPRequest when both are non-empty.
DiscloseFeedURL string
DiscloseNick string
}
// State is the value stored in the feeds.state column.
type State string
// Known feed states. The string values are persisted as-is. The spelling of
// the PermanentlyDead value ("permanantly-dead") is also hard-coded in the SQL
// of fetchTwts and fetchUsers, so it must not be corrected here in isolation.
const (
PermanentlyDead State = "permanantly-dead"
Frozen State = "frozen"
Cold State = "cold"
Warm State = "warm"
Hot State = "hot"
Once State = "once"
)
var (
// initSQL holds the contents of init.sql, embedded at build time.
//go:embed init.sql
initSQL string
// insertFeed returns a multi-row INSERT into feeds and the number of bind
// arguments expected for r rows (7 per row). The statement always contains
// at least one value group, while the returned count is r*7. Rows whose
// feed_id already exists are left untouched.
insertFeed = func(r int) (string, int) {
repeat := ""
if r > 1 {
repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
}
return `
insert into feeds (
feed_id,
parent_id,
nick,
uri,
state,
last_scan_on,
refresh_rate
)
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
ON CONFLICT (feed_id) DO NOTHING
`, r * 7
}
// updateFeed updates the mutable columns of one feed row; the bind order is
// nick, state, last_scan_on, refresh_rate, last_modified_on, last_etag,
// last_error, feed_id.
updateFeed = `
update feeds set
nick = ?,
state = ?,
last_scan_on = ?,
refresh_rate = ?,
last_modified_on = ?,
last_etag = ?,
last_error = ?
where feed_id = ?
`
// insertTwt returns a multi-row INSERT into twts and the number of bind
// arguments expected for r rows (7 per row). When (feed_id, ulid) already
// exists only conv and hash are overwritten.
insertTwt = func(r int) (string, int) {
repeat := ""
if r > 1 {
repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
}
return `
insert into twts (
feed_id,
ulid,
text,
hash,
conv,
mentions,
tags
)
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
ON CONFLICT (feed_id, ulid) DO UPDATE SET
conv = excluded.conv,
hash = excluded.hash
`, r * 7
}
// fetchFeeds selects the feeds whose next scan time (last_scan_on +
// refresh_rate + a random()%30 second offset) falls before "now + 3
// minutes". The column order matches Feed.Scan. Note that the offset in the
// select list and the one in the where clause come from separate random()
// calls, so the reported next_scan_on is not the exact value that was
// filtered on.
fetchFeeds = `
select
feed_id,
parent_id,
coalesce(hashing_uri, uri) hash_uri,
uri,
nick,
state,
last_scan_on,
strftime(
'%Y-%m-%dT%H:%M:%fZ',
coalesce(last_scan_on, '1901-01-01'),
'+'||abs(refresh_rate + cast(random() % 30 as int))||' seconds'
) next_scan_on,
coalesce(last_twt_on, '1901-01-01T00:00:00Z') last_twt_on,
refresh_rate,
last_modified_on,
last_etag
from feeds
left join (
select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts group by feed_id
) using (feed_id)
left join (
select
feed_id parent_id,
uri hashing_uri
from feeds
where parent_id is null
) using (parent_id)
where datetime(
coalesce(last_scan_on, '1901-01-01'),
'+'||abs(refresh_rate+cast(random()%30 as int))||' seconds'
) < datetime(current_timestamp, '+3 minutes')
`
// permaban lists URI substrings for which MakeHTTPRequest refuses to build
// a request and returns ErrPermanentlyDead.
permaban = []string{
"//lublin.se/",
"//enotty.dk/",
}
)
// Save writes the feed's mutable columns back to its row in the feeds table
// using the updateFeed statement. The row is selected by f.FeedID.
func (f *Feed) Save(ctx context.Context, db db) error {
	ctx, span := otel.Span(ctx)
	defer span.End()

	// Order must match the placeholders in updateFeed.
	params := []any{
		f.Nick,         // nick
		f.State,        // state
		f.LastScanOn,   // last_scan_on
		f.RefreshRate,  // refresh_rate
		f.LastModified, // last_modified_on
		f.ETag,         // last_etag
		f.LastError,    // last_error
		f.FeedID,       // feed_id
	}

	if _, err := db.ExecContext(ctx, updateFeed, params...); err != nil {
		return err
	}
	return nil
}
// Scan fills f from one result row of the fetchFeeds query. State is preset to
// "load" and Version to "0.0.1" before the row is read; State is then
// overwritten by the scanned column.
func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
	f.State, f.Version = "load", "0.0.1"

	// Destination order must match the fetchFeeds select list.
	dest := []any{
		&f.FeedID,
		&f.ParentID,
		&f.HashURI,
		&f.URI,
		&f.Nick,
		&f.State,
		&f.LastScanOn,
		&f.NextScanOn,
		&f.LastTwtOn,
		&f.RefreshRate,
		&f.LastModified,
		&f.ETag,
	}
	return res.Scan(dest...)
}
// loadFeeds runs the fetchFeeds query and returns an iterator over the feeds
// that are due for a scan.
//
// If the query itself fails, an empty iterator and the error are returned.
// Otherwise the returned iterator owns the result set and the tracing span:
// both are released when iteration finishes, whether it runs to completion,
// the consumer stops early, or a row fails to scan. Row-level errors end the
// iteration and are recorded on the span only. The iterator is single-use, and
// the result set is only released if it is ranged over at least once.
func loadFeeds(ctx context.Context, db db) (iter.Seq[Feed], error) {
	ctx, span := otel.Span(ctx)

	res, err := db.QueryContext(ctx, fetchFeeds)
	if err != nil {
		// No iterator will run to end the span, so close it here.
		span.RecordError(err)
		span.End()
		return slices.Values([]Feed{}), err
	}

	return func(yield func(Feed) bool) {
		defer span.End()
		// Release the connection even when the consumer breaks out early.
		defer res.Close()

		for res.Next() {
			var f Feed
			if err := f.Scan(res); err != nil {
				span.RecordError(err)
				return
			}
			if !yield(f) {
				return
			}
		}
		// Next returning false may mean an error rather than end of data.
		if err := res.Err(); err != nil {
			span.RecordError(err)
		}
	}, nil
}
// storeFeed persists a parsed twtxt file in a single transaction.
//
// It writes every twt of the file into twts, then inserts (without
// overwriting existing rows) feed records for:
//   - the feed itself, as "warm" with last_scan_on set to now;
//   - the archive feed named by a "prev" metadata entry, as a "once" child;
//   - every feed listed in "follow" metadata or mentioned in a twt, as "warm".
//
// The feed id is the UUIDv5 of the first non-empty of: the "uri" metadata
// value, the twter's hashing URI, the twter's URI.
func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
	ctx, span := otel.Span(ctx)
	defer span.End()

	loadTS := time.Now()
	refreshRate := 600

	// Only dereference the metadata value when it was actually found.
	var declaredURI string
	if v, ok := f.Info().GetN("uri", 0); ok {
		declaredURI = v.Value()
	}
	feedID := uuid.UrlNS.UUID5(cmp.Or(
		declaredURI,
		f.Twter().HashingURI,
		f.Twter().URI,
	))

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback after a successful Commit is a no-op.
	defer tx.Rollback()

	// uri -> nick of every feed this file references.
	follows := f.Info().GetAll("follow")
	followMap := make(map[string]string, len(follows))
	for _, follow := range follows {
		nick, uri, ok := strings.Cut(follow.Value(), " ")
		if !ok {
			continue
		}
		nick = strings.TrimSpace(nick)
		uri = strings.TrimSpace(uri)
		if _, err := url.Parse(uri); err != nil {
			continue
		}
		followMap[uri] = nick
	}

	twts := f.Twts()
	_, size := insertTwt(len(twts))
	args := make([]any, 0, size)

	for _, twt := range twts {
		twtID := makeULID(twt)

		mentions := make(uuid.UUIDs, 0, len(twt.Mentions()))
		for _, mention := range twt.Mentions() {
			followMap[mention.Twter().URI] = mention.Twter().Nick
			mentions = append(mentions, uuid.UrlNS.UUID5(mention.Twter().URI))
		}

		tags := make(uuid.List, 0, len(twt.Tags()))
		for _, tag := range twt.Tags() {
			tags = append(tags, tag.Text())
		}

		subjectTag := ""
		if subject := twt.Subject(); subject != nil {
			if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
				subjectTag = tag.Text()
			}
		}

		args = append(
			args,
			feedID,                  // feed_id
			twtID,                   // ulid
			fmt.Sprintf("%+l", twt), // text
			twt.Hash(),              // hash
			subjectTag,              // conv
			mentions.ToStrList(),    // mentions
			tags,                    // tags
		)
	}

	for query, batch := range chunk(args, insertTwt, db.MaxVariableNumber) {
		fmt.Println("store", f.Twter().URI, len(batch))
		if _, err := tx.ExecContext(ctx, query, batch...); err != nil {
			return err
		}
	}

	// Reuse the backing array for the feed rows.
	args = args[:0]
	args = append(args,
		feedID,                             // feed_id
		nil,                                // parent_id
		f.Twter().DomainNick(),             // nick
		f.Twter().URI,                      // uri
		Warm,                               // state
		TwtTime{Time: loadTS, Valid: true}, // last_scan_on
		refreshRate,                        // refresh_rate
	)

	if prev, ok := f.Info().GetN("prev", 0); ok {
		// The value is "<hash> <file>"; only the file part is used.
		if _, part, ok := strings.Cut(prev.Value(), " "); ok {
			uri := f.Twter().URI
			if u, ok := f.Info().GetN("url", 0); ok {
				uri = u.Value()
			}
			if u, ok := f.Info().GetN("uri", 0); ok {
				uri = u.Value()
			}

			// Resolve the archive file name against the feed's directory.
			part = uri[:strings.LastIndex(uri, "/")+1] + part
			childID := uuid.UrlNS.UUID5(part)
			fmt.Println("found prev", uri, part)

			args = append(args,
				childID,                // feed_id
				feedID,                 // parent_id
				f.Twter().DomainNick(), // nick
				part,                   // uri
				Once,                   // state
				nil,                    // last_scan_on
				0,                      // refresh_rate
			)
		}
	}

	for uri, nick := range followMap {
		args = append(args,
			uuid.UrlNS.UUID5(uri), // feed_id
			nil,                   // parent_id
			nick,                  // nick
			uri,                   // uri
			Warm,                  // state
			nil,                   // last_scan_on
			refreshRate,           // refresh_rate
		)
	}

	for query, batch := range chunk(args, insertFeed, db.MaxVariableNumber) {
		if _, err := tx.ExecContext(ctx, query, batch...); err != nil {
			return err
		}
	}

	return tx.Commit()
}
// storeRegistry imports a registry dump read from in.
//
// Every twt line is inserted into twts, and every twter seen is inserted into
// feeds as PermanentlyDead with a TenYear refresh rate (existing feed rows are
// not overwritten). Rows are written in batches, one transaction per batch,
// including a final partial batch once the input is exhausted. After the last
// batch the last_twt_on and twt_mentions tables are rebuilt.
func storeRegistry(ctx context.Context, db db, in io.Reader) error {
	ctx, span := otel.Span(ctx)
	defer span.End()

	// Number of buffered bind arguments that triggers a write.
	const flushAt = 16 * 1022

	twters := make(map[string]string) // uri -> nick, pending since last flush
	args := make([]any, 0, 1024*16)   // pending twt rows, 7 values each
	stored := 0

	// flush writes the pending twts and twters in one transaction and resets
	// both buffers on success.
	flush := func() error {
		if len(args) == 0 {
			return nil
		}

		stored += len(args)
		fmt.Println("store", stored/7, stored)

		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		// Rollback after a successful Commit is a no-op.
		defer tx.Rollback()

		for query, batch := range chunk(args, insertTwt, db.MaxVariableNumber) {
			if _, err := tx.ExecContext(ctx, query, batch...); err != nil {
				return err
			}
		}

		feedArgs := make([]any, 0, len(twters)*7)
		for uri, nick := range twters {
			feedArgs = append(feedArgs,
				uuid.UrlNS.UUID5(uri), // feed_id
				nil,                   // parent_id
				nick,                  // nick
				uri,                   // uri
				PermanentlyDead,       // state
				nil,                   // last_scan_on
				TenYear,               // refresh_rate
			)
		}
		for query, batch := range chunk(feedArgs, insertFeed, db.MaxVariableNumber) {
			if _, err := tx.ExecContext(ctx, query, batch...); err != nil {
				return err
			}
		}

		if err := tx.Commit(); err != nil {
			return err
		}

		args = args[:0]
		// Committed twters need not be re-inserted with every later batch.
		clear(twters)
		return nil
	}

	for line := range lextwt.IterRegistry(in) {
		twt, ok := line.(*lextwt.Twt)
		if !ok {
			continue
		}

		nick := twt.Twter().DomainNick()
		uri := twt.Twter().URI
		feedID := uuid.UrlNS.UUID5(uri)
		twtID := makeULID(twt)
		text := fmt.Sprintf("%+l", twt)

		twters[uri] = nick

		// NOTE(review): the original loop also re-assigned twters[uri] = nick
		// here, which is a no-op. storeFeed records the *mentioned* twter at
		// this point; confirm whether the registry import should do the same.
		mentions := make(uuid.UUIDs, 0, len(twt.Mentions()))
		for _, mention := range twt.Mentions() {
			mentions = append(mentions, uuid.UrlNS.UUID5(mention.Twter().URI))
		}

		tags := make(uuid.List, 0, len(twt.Tags()))
		for _, tag := range twt.Tags() {
			tags = append(tags, tag.Text())
		}

		subjectTag := ""
		if subject := twt.Subject(); subject != nil {
			if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
				subjectTag = tag.Text()
			}
		}

		args = append(
			args,
			feedID,               // feed_id
			twtID,                // ulid
			text,                 // text
			twt.Hash(),           // hash
			subjectTag,           // conv
			mentions.ToStrList(), // mentions
			tags,                 // tags
		)

		if len(args) >= flushAt {
			if err := flush(); err != nil {
				return err
			}
		}
	}

	// Write whatever is left after the last full batch.
	if err := flush(); err != nil {
		return err
	}

	return refreshLastTwt(ctx, db)
}
// MakeHTTPRequest builds the conditional GET request used to fetch the feed.
//
// It returns an error wrapping ErrPermanentlyDead when the feed URI contains
// one of the permaban entries. Otherwise the request asks for text/plain and
// carries If-Modified-Since and If-None-Match when the feed has a stored
// last-modified time or ETag, so an unchanged feed can be answered without a
// body. The User-Agent includes the disclosed feed URL and nick only when both
// are set.
func (f *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
	for _, host := range permaban {
		if strings.Contains(f.URI, host) {
			return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, f.URI)
		}
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, f.URI, nil)
	if err != nil {
		return nil, fmt.Errorf("creating HTTP request failed: %w", err)
	}

	req.Header.Add("Accept", "text/plain")

	// Only send the validator when we actually have one; the zero time would
	// otherwise be sent as a bogus date.
	if f.LastModified.Valid {
		req.Header.Add("If-Modified-Since", f.LastModified.Time.Format(http.TimeFormat))
	}
	if f.ETag.Valid {
		req.Header.Add("If-None-Match", f.ETag.String)
	}

	// TODO: this is probably not needed.
	if f.DiscloseFeedURL != "" && f.DiscloseNick != "" {
		req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
			f.Version, f.DiscloseFeedURL, f.DiscloseNick))
	} else {
		req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", f.Version))
	}

	return req, nil
}
// TwtTime is a nullable timestamp that is written to the database as an
// RFC 3339 string and read back from either a string or a native time value.
type TwtTime struct {
	Time  time.Time
	Valid bool // Valid is true if Time is not NULL
}

// Scan implements the [sql.Scanner] interface.
//
// nil yields an invalid (NULL) value. A time.Time is stored as-is. A string or
// []byte is parsed as RFC 3339 (fractional seconds are accepted); on a parse
// failure the value is left invalid and the parse error is returned. Any other
// source type leaves the value invalid and returns an error instead of
// silently keeping whatever the receiver held before.
func (n *TwtTime) Scan(value any) error {
	var text string

	switch v := value.(type) {
	case nil:
		n.Time, n.Valid = time.Time{}, false
		return nil
	case time.Time:
		n.Time, n.Valid = v, true
		return nil
	case string:
		text = v
	case []byte:
		// Some drivers hand TEXT columns back as raw bytes.
		text = string(v)
	default:
		n.Time, n.Valid = time.Time{}, false
		return fmt.Errorf("TwtTime: unsupported scan type %T", value)
	}

	t, err := time.Parse(time.RFC3339, text)
	n.Time, n.Valid = t, err == nil
	return err
}

// Value implements the [driver.Valuer] interface. An invalid value is stored
// as NULL; otherwise the time is formatted as RFC 3339 with second precision.
func (n TwtTime) Value() (driver.Value, error) {
	if !n.Valid {
		return nil, nil
	}
	return n.Time.Format(time.RFC3339), nil
}
// makeULID derives a deterministic ULID for a twt: the time component is the
// twt's creation time and the 10 entropy bytes are a SHAKE128 digest of the
// twter URI, a tab, and the twt's "%+l" rendering.
func makeULID(twt types.Twt) ulid.ULID {
	var id ulid.ULID

	id.SetTime(ulid.Timestamp(twt.Created()))

	seed := fmt.Appendf(nil, "%s\t%+l", twt.Twter().URI, twt)
	id.SetEntropy(sha3.SumSHAKE128(seed, 10))

	return id
}
func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
_, size := qry(1)
itemsPerIter := maxArgs / size
if len(args) < size {
return func(yield func(string, []any) bool) {}
}
if len(args) < maxArgs {
return func(yield func(string, []any) bool) {
query, _ := qry(len(args) / size)
yield(query, args)
}
}
return func(yield func(string, []any) bool) {
for len(args) > 0 {
if len(args) > maxArgs {
query, size := qry(itemsPerIter)
if !yield(query, args[:size]) {
return
}
args = args[size:]
continue
}
query, _ := qry(len(args) / size)
yield(query, args)
return
}
}
}
// refreshLastTwt rebuilds the two derived tables from the current contents of
// twts: last_twt_on (newest twt timestamp per feed) and twt_mentions (one row
// per twt and mentioned feed). Each table is emptied and refilled.
//
// The statements are executed one at a time, not in a transaction, so readers
// can observe the tables while they are empty. Execution stops at the first
// error.
func refreshLastTwt(ctx context.Context, db db) error {
	qry := `
delete from last_twt_on;
insert into last_twt_on (feed_id, last_twt_on)
select distinct
feed_id,
max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts
group by feed_id;
delete from twt_mentions;
insert into twt_mentions (ulid, feed_id)
select distinct
ulid,
unhex(replace(trim(value,'{}'),'-','')) feed_id
from twts, json_each(mentions);
`
	for _, stmt := range strings.Split(qry, ";") {
		// Splitting leaves a blank piece after the final ';'; do not send
		// empty statements to the database.
		stmt = strings.TrimSpace(stmt)
		if stmt == "" {
			continue
		}
		if _, err := db.ExecContext(ctx, stmt); err != nil {
			return err
		}
	}
	return nil
}
// fetchTwts returns one page of stored twts in ascending ulid order.
//
// When uri is non-empty only twts of that feed are considered; otherwise twts
// of every feed whose state is not 'permanantly-dead'. offset is the position
// just past the page; a value below 1 is taken relative to the total count, so
// 0 selects the newest page. The page covers the limit rows ending at offset.
//
// It returns the twts, the effective end offset of the page, and the total
// number of matching twts.
//
// NOTE(review): limit is used as given (fetchMentions clamps it to 1..100),
// and the computed start offset can be negative for the first page — confirm
// the callers' expectations before changing either.
func fetchTwts(ctx context.Context, db db, uri string, limit int, offset int64) ([]types.Twt, int64, int64, error) {
	ctx, span := otel.Span(ctx)
	defer span.End()

	args := make([]any, 0, 3)
	where := `where feed_id in (select feed_id from feeds where state != 'permanantly-dead')`
	if uri != "" {
		feedID := uuid.UrlNS.UUID5(uri)
		where = "where feed_id = ?"
		args = append(args, feedID)
	}

	var end int64
	err := db.QueryRowContext(ctx, `
select count(*) n from twts `+where+``, args...).Scan(&end)
	if err != nil {
		span.RecordError(err)
		return nil, 0, 0, err
	}

	if offset < 1 {
		offset += end
	}
	offset = max(1, offset)

	args = append(args, limit, offset-int64(limit))
	span.AddEvent("twts", trace.WithAttributes(
		attribute.Int("limit", limit),
		attribute.Int64("offset-end", offset),
		attribute.Int64("offset-start", offset-int64(limit)),
		attribute.Int64("max", end),
	))

	qry := `
SELECT
feed_id,
hash,
conv,
coalesce(nick, 'nobody') nick,
coalesce(uri, 'https://empty.txt') uri,
text
FROM twts
join (
select feed_id, nick, uri
from feeds
) using (feed_id)
where rowid in (
select rowid from twts
` + where + `
order by ulid asc
limit ?
offset ?
)
order by ulid asc`
	fmt.Println(qry, args)

	rows, err := db.QueryContext(ctx, qry, args...)
	if err != nil {
		span.RecordError(err)
		return nil, 0, 0, err
	}
	defer rows.Close()

	var twts []types.Twt
	for rows.Next() {
		var o struct {
			FeedID string
			Hash   string
			Conv   string
			Nick   string
			URI    string
			Text   string
		}
		if err := rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text); err != nil {
			span.RecordError(err)
			return nil, 0, 0, err
		}

		twter := types.NewTwter(o.Nick, o.URI)
		// Parse errors are ignored, as before: the line was stored by us.
		twt, _ := lextwt.ParseLine(o.Text, &twter)
		twts = append(twts, twt)
	}
	// A failed iteration must not be reported as a short but valid page.
	if err := rows.Err(); err != nil {
		span.RecordError(err)
		return nil, 0, 0, err
	}

	return twts, offset, end, nil
}
// fetchUsers lists feeds as synthetic twts, ordered by nick and uri.
//
// With a non-empty uri the feed with that URI and its child feeds are
// returned. Otherwise, with a non-empty q, feeds whose nick contains q. With
// neither, all top-level feeds that are not 'permanantly-dead' or 'frozen' and
// have at least one recorded twt.
//
// Each returned twt carries the feed's twter, the feed's last scan time as its
// timestamp, and the last twt time (falling back to the last scan time) as the
// timestamp's literal text.
func fetchUsers(ctx context.Context, db db, uri, q string) ([]types.Twt, error) {
	ctx, span := otel.Span(ctx)
	defer span.End()

	where := `where parent_id is null and state not in ('permanantly-dead', 'frozen') and last_twt_on is not null`
	args := make([]any, 0)
	if uri != "" {
		where = `where feed_id = ? or parent_id = ?`
		feedID := uuid.UrlNS.UUID5(uri)
		args = append(args, feedID, feedID)
	} else if q != "" {
		where = `where nick like ?`
		args = append(args, "%"+q+"%")
	}

	qry := `
SELECT
feed_id,
uri,
nick,
last_scan_on,
coalesce(last_twt_on, last_scan_on) last_twt_on
FROM feeds
left join last_twt_on using (feed_id)
` + where + `
order by nick, uri
`
	fmt.Println(qry, args)

	rows, err := db.QueryContext(ctx, qry, args...)
	if err != nil {
		span.RecordError(err)
		return nil, err
	}
	defer rows.Close()

	var twts []types.Twt
	for rows.Next() {
		var o struct {
			FeedID    string
			URI       string
			Nick      string
			Dt        TwtTime
			LastTwtOn TwtTime
		}
		if err := rows.Scan(&o.FeedID, &o.URI, &o.Nick, &o.Dt, &o.LastTwtOn); err != nil {
			span.RecordError(err)
			return nil, err
		}

		twts = append(twts, lextwt.NewTwt(
			types.NewTwter(o.Nick, o.URI),
			lextwt.NewDateTime(o.Dt.Time, o.LastTwtOn.Time.Format(time.RFC3339)),
			nil,
		))
	}
	// A failed iteration must not be reported as a short but valid list.
	if err := rows.Err(); err != nil {
		span.RecordError(err)
		return nil, err
	}

	return twts, nil
}
// fetchMentions returns one page of the twts that mention the given feed, in
// ascending ulid order.
//
// limit is clamped to 1..100. An offset below 1 is taken relative to the total
// number of mentions. The function returns the twts, the effective end offset
// of the page, and the total number of mentions.
func fetchMentions(ctx context.Context, db db, mention uuid.UUID, limit int, offset int64) ([]types.Twt, int64, int64, error) {
	ctx, span := otel.Span(ctx)
	defer span.End()

	const pageQuery = `
SELECT
feed_id,
hash,
conv,
coalesce(nick, 'nobody') nick,
coalesce(uri, 'https://empty.txt') uri,
text
FROM twts
join (
select feed_id, nick, uri
from feeds
) using (feed_id)
where rowid in (
select rowid from twts
where ulid in (select ulid from twt_mentions where feed_id = ?)
order by ulid asc
limit ?
offset ?
)
order by ulid asc
`

	params := append(make([]any, 0, 3), mention)

	// Total number of mentions, used to resolve relative offsets.
	var total int64
	err := db.QueryRowContext(ctx, `
select count(*) n from twt_mentions where feed_id = ?`, params...).Scan(&total)
	span.RecordError(err)
	if err != nil {
		return nil, 0, 0, err
	}
	fmt.Println(mention.MarshalText(), total, err)

	if offset < 1 {
		offset += total
	}
	limit = min(100, max(1, limit))
	offset = max(1, offset)
	params = append(params, limit, offset-int64(limit))

	fmt.Println(pageQuery, params)
	rows, err := db.QueryContext(ctx, pageQuery, params...)
	if err != nil {
		span.RecordError(err)
		return nil, 0, 0, err
	}
	defer rows.Close()

	var twts []types.Twt
	for rows.Next() {
		var feedID, hash, conv, nick, uri, text string
		if scanErr := rows.Scan(&feedID, &hash, &conv, &nick, &uri, &text); scanErr != nil {
			span.RecordError(scanErr)
			return nil, 0, 0, scanErr
		}

		twter := types.NewTwter(nick, uri)
		// Parse errors are ignored; the stored line is used as-is.
		twt, _ := lextwt.ParseLine(text, &twter)
		twts = append(twts, twt)
	}

	if iterErr := rows.Err(); iterErr != nil {
		fmt.Println(iterErr)
		return nil, 0, 0, iterErr
	}

	return twts, offset, total, nil
}
// fetchConv returns every twt of a conversation: the twt whose hash equals
// hash and all twts whose conv equals it, in ascending ulid order.
//
// The limit argument is ignored and offset is returned unchanged; the third
// result is the number of twts in the conversation. If row iteration fails the
// twts read so far are returned together with the error.
func fetchConv(ctx context.Context, db db, hash string, _ int, offset int64) ([]types.Twt, int64, int64, error) {
	ctx, span := otel.Span(ctx)
	defer span.End()

	const (
		countQuery = `
select count(*) n from twts where hash = $1 or conv = $1`
		convQuery = `
SELECT
feed_id,
hash,
conv,
nick,
uri,
text
FROM twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
) using (feed_id)
WHERE
hash = $1 or
conv = $1
order by ulid asc`
	)

	var total int64
	err := db.QueryRowContext(ctx, countQuery, hash).Scan(&total)
	span.RecordError(err)
	if err != nil {
		return nil, 0, 0, err
	}

	rows, err := db.QueryContext(ctx, convQuery, hash)
	if err != nil {
		span.RecordError(err)
		return nil, 0, 0, err
	}
	defer rows.Close()

	var twts []types.Twt
	for rows.Next() {
		var feedID, twtHash, conv, nick, uri, text string
		if scanErr := rows.Scan(&feedID, &twtHash, &conv, &nick, &uri, &text); scanErr != nil {
			span.RecordError(scanErr)
			return nil, 0, 0, scanErr
		}

		twter := types.NewTwter(nick, uri)
		// Parse errors are ignored; the stored line is used as-is.
		twt, _ := lextwt.ParseLine(text, &twter)
		twts = append(twts, twt)
	}

	return twts, offset, total, rows.Err()
}