Files
xt/feed.go

945 lines
19 KiB
Go
Raw Permalink Normal View History

2024-11-11 19:13:34 -07:00
package main
import (
2025-02-15 16:12:42 -07:00
"cmp"
2024-11-11 19:13:34 -07:00
"context"
2025-03-30 21:14:11 -06:00
"crypto/sha3"
2024-11-11 19:13:34 -07:00
"database/sql"
2025-03-13 22:36:24 -06:00
"database/sql/driver"
2025-02-15 16:12:42 -07:00
"fmt"
2025-03-29 17:09:18 -06:00
"io"
2025-02-15 16:12:42 -07:00
"iter"
"net/http"
"net/url"
"slices"
"strings"
"time"
_ "embed"
2025-03-13 22:36:24 -06:00
"github.com/oklog/ulid/v2"
2025-03-31 15:23:40 -06:00
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
2025-02-24 17:28:09 -07:00
"go.sour.is/xt/internal/otel"
2025-03-31 17:39:24 -06:00
"go.sour.is/xt/internal/uuid"
2025-02-15 16:12:42 -07:00
"go.yarn.social/lextwt"
"go.yarn.social/types"
2024-11-11 19:13:34 -07:00
)
type Feed struct {
2025-03-31 17:39:24 -06:00
FeedID uuid.UUID
ParentID uuid.UUID
2025-03-13 22:36:24 -06:00
HashURI string
2024-11-11 19:13:34 -07:00
URI string
Nick string
2025-03-13 22:36:24 -06:00
State State
LastScanOn TwtTime
2024-11-11 19:13:34 -07:00
RefreshRate int
2025-03-13 22:36:24 -06:00
NextScanOn TwtTime
2025-03-26 15:38:25 -06:00
LastTwtOn TwtTime
2024-11-11 19:13:34 -07:00
2025-03-13 22:36:24 -06:00
LastModified TwtTime
2024-11-11 19:13:34 -07:00
LastError sql.NullString
ETag sql.NullString
2025-02-15 16:12:42 -07:00
Version string
2024-11-11 19:13:34 -07:00
DiscloseFeedURL string
DiscloseNick string
}
2025-02-15 16:12:42 -07:00
// State is the value stored in the feeds.state column.
type State string

// Known feed states. The literal values are persisted in the database and
// matched inside SQL text, so they must not be altered; that includes the
// historical spelling of "permanantly-dead".
const (
	// Hot, Warm and Cold are scan states; Warm is what storeFeed assigns to
	// newly stored feeds.
	Hot  State = "hot"
	Warm State = "warm"
	Cold State = "cold"
	// Once is assigned by storeFeed to archive ("prev") feeds.
	Once State = "once"
	// Frozen feeds are left out of the default user listing.
	Frozen State = "frozen"
	// PermanentlyDead feeds are left out of the default twt and user
	// listings.
	PermanentlyDead State = "permanantly-dead"
)
var (
//go:embed init.sql
initSQL string
2025-03-13 22:36:24 -06:00
insertFeed = func(r int) (string, int) {
repeat := ""
if r > 1 {
repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
}
return `
insert into feeds (
feed_id,
parent_id,
nick,
uri,
state,
last_scan_on,
refresh_rate
2025-03-24 22:29:18 -06:00
)
2025-03-13 22:36:24 -06:00
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
2025-03-30 12:12:10 -06:00
ON CONFLICT (feed_id) DO NOTHING
`, r * 7
2025-03-13 22:36:24 -06:00
}
updateFeed = `
update feeds set
2025-03-30 12:12:10 -06:00
nick = ?,
2025-03-13 22:36:24 -06:00
state = ?,
last_scan_on = ?,
refresh_rate = ?,
last_modified_on = ?,
last_etag = ?,
last_error = ?
where feed_id = ?
2025-02-15 16:12:42 -07:00
`
2025-03-13 22:36:24 -06:00
insertTwt = func(r int) (string, int) {
repeat := ""
if r > 1 {
repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
}
return `
2025-03-29 17:09:18 -06:00
insert into twts (
2025-03-29 19:48:06 -06:00
feed_id,
ulid,
text,
hash,
conv,
mentions,
2025-03-29 17:09:18 -06:00
tags
)
2025-03-13 22:36:24 -06:00
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
2025-03-30 12:12:10 -06:00
ON CONFLICT (feed_id, ulid) DO UPDATE SET
conv = excluded.conv,
hash = excluded.hash
`, r * 7
2025-03-13 22:36:24 -06:00
}
2025-02-15 16:12:42 -07:00
fetchFeeds = `
2025-03-24 22:29:18 -06:00
select
2025-03-13 22:36:24 -06:00
feed_id,
2025-03-24 22:29:18 -06:00
parent_id,
2025-03-25 17:05:21 -06:00
coalesce(hashing_uri, uri) hash_uri,
2025-03-13 22:36:24 -06:00
uri,
nick,
2025-03-24 22:29:18 -06:00
state,
2025-03-13 22:36:24 -06:00
last_scan_on,
strftime(
2025-03-25 17:05:21 -06:00
'%Y-%m-%dT%H:%M:%fZ',
2025-03-24 22:29:18 -06:00
coalesce(last_scan_on, '1901-01-01'),
2025-03-25 17:05:21 -06:00
'+'||abs(refresh_rate + cast(random() % 30 as int))||' seconds'
) next_scan_on,
coalesce(last_twt_on, '1901-01-01T00:00:00Z') last_twt_on,
2025-02-15 16:12:42 -07:00
refresh_rate,
last_modified_on,
last_etag
from feeds
2025-09-22 16:43:20 -06:00
left join (
select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts group by feed_id
) using (feed_id)
2025-03-13 22:36:24 -06:00
left join (
select
feed_id parent_id,
uri hashing_uri
from feeds
where parent_id is null
) using (parent_id)
2025-02-24 17:28:09 -07:00
where datetime(
2025-03-24 22:29:18 -06:00
coalesce(last_scan_on, '1901-01-01'),
2025-03-25 17:05:21 -06:00
'+'||abs(refresh_rate+cast(random()%30 as int))||' seconds'
) < datetime(current_timestamp, '+3 minutes')
2025-02-15 16:12:42 -07:00
`
2025-03-27 16:35:05 -06:00
permaban = []string{
"//lublin.se/",
"//enotty.dk/",
}
2025-02-15 16:12:42 -07:00
)
2025-03-13 22:36:24 -06:00
func (f *Feed) Save(ctx context.Context, db db) error {
2025-02-24 17:28:09 -07:00
ctx, span := otel.Span(ctx)
defer span.End()
2024-11-11 19:13:34 -07:00
_, err := db.ExecContext(
ctx,
updateFeed,
2025-03-30 12:12:10 -06:00
f.Nick,
2025-03-13 22:36:24 -06:00
f.State, // state
f.LastScanOn, // last_scan_on
f.RefreshRate, // refresh_rate
f.LastModified, // last_modified_on
f.ETag, // last_etag
f.LastError, // last_error
f.FeedID, // feed_id
2024-11-11 19:13:34 -07:00
)
return err
}
2025-02-15 16:12:42 -07:00
func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
f.State = "load"
var err error
f.Version = "0.0.1"
err = res.Scan(
&f.FeedID,
2025-03-13 22:36:24 -06:00
&f.ParentID,
&f.HashURI,
2025-02-15 16:12:42 -07:00
&f.URI,
&f.Nick,
2025-03-13 22:36:24 -06:00
&f.State,
2025-02-15 16:12:42 -07:00
&f.LastScanOn,
2025-03-13 22:36:24 -06:00
&f.NextScanOn,
2025-03-25 17:05:21 -06:00
&f.LastTwtOn,
2025-02-15 16:12:42 -07:00
&f.RefreshRate,
&f.LastModified,
&f.ETag,
)
if err != nil {
return err
}
return err
}
2025-03-13 22:36:24 -06:00
func loadFeeds(ctx context.Context, db db) (iter.Seq[Feed], error) {
2025-02-24 17:28:09 -07:00
ctx, span := otel.Span(ctx)
2025-02-15 16:12:42 -07:00
var err error
var res *sql.Rows
res, err = db.QueryContext(ctx, fetchFeeds)
if err != nil {
return slices.Values([]Feed{}), err
}
return func(yield func(Feed) bool) {
2025-02-24 17:28:09 -07:00
defer span.End()
2025-02-15 16:12:42 -07:00
for res.Next() {
var f Feed
err = f.Scan(res)
if err != nil {
2025-03-13 22:36:24 -06:00
span.RecordError(err)
2025-02-15 16:12:42 -07:00
return
}
if !yield(f) {
return
}
}
}, err
}
2025-03-13 22:36:24 -06:00
func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
2025-02-24 17:28:09 -07:00
ctx, span := otel.Span(ctx)
defer span.End()
2025-02-15 16:12:42 -07:00
loadTS := time.Now()
refreshRate := 600
2025-04-07 13:11:09 -06:00
feedURI, _ := f.Info().GetN("uri", 0)
feedID := uuid.UrlNS.UUID5(cmp.Or(
feedURI.Value(),
f.Twter().HashingURI,
f.Twter().URI,
))
2025-02-15 16:12:42 -07:00
2025-02-24 17:28:09 -07:00
tx, err := db.BeginTx(ctx, nil)
2025-02-15 16:12:42 -07:00
if err != nil {
return err
}
followers := f.Info().GetAll("follow")
followMap := make(map[string]string, len(followers))
for _, f := range f.Info().GetAll("follow") {
2025-03-29 17:09:18 -06:00
nick, uri, ok := strings.Cut(f.Value(), " ")
2025-02-15 16:12:42 -07:00
if !ok {
continue
}
nick = strings.TrimSpace(nick)
2025-03-29 19:48:06 -06:00
uri = strings.TrimSpace(uri)
2025-02-15 16:12:42 -07:00
if _, err := url.Parse(uri); err != nil {
continue
}
2025-03-29 17:09:18 -06:00
followMap[uri] = nick
2025-02-15 16:12:42 -07:00
}
defer tx.Rollback()
2025-03-13 22:36:24 -06:00
twts := f.Twts()
_, size := insertTwt(len(twts))
args := make([]any, 0, size)
2025-02-15 16:12:42 -07:00
2025-03-13 22:36:24 -06:00
for _, twt := range twts {
2025-03-29 17:09:18 -06:00
twtID := makeULID(twt)
2025-03-31 17:39:24 -06:00
mentions := make(uuid.UUIDs, 0, len(twt.Mentions()))
2025-02-15 16:12:42 -07:00
for _, mention := range twt.Mentions() {
2025-03-29 17:09:18 -06:00
followMap[mention.Twter().URI] = mention.Twter().Nick
2025-03-31 17:39:24 -06:00
mentions = append(mentions, uuid.UrlNS.UUID5(mention.Twter().URI))
2025-02-15 16:12:42 -07:00
}
2025-03-31 17:39:24 -06:00
tags := make(uuid.List, 0, len(twt.Tags()))
2025-02-15 16:12:42 -07:00
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
subject := twt.Subject()
subjectTag := ""
if subject != nil {
if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
subjectTag = tag.Text()
}
}
2025-03-29 17:09:18 -06:00
2025-03-13 22:36:24 -06:00
args = append(
args,
feedID, // feed_id
2025-03-29 17:09:18 -06:00
twtID, // ulid
2025-03-13 22:36:24 -06:00
fmt.Sprintf("%+l", twt), // text
twt.Hash(), // hash
2025-03-29 19:48:06 -06:00
subjectTag, // conv
2025-03-13 22:36:24 -06:00
mentions.ToStrList(), // mentions
tags, // tags
)
}
for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) {
fmt.Println("store", f.Twter().URI, len(args))
2025-02-15 16:12:42 -07:00
2025-02-24 17:28:09 -07:00
_, err = tx.ExecContext(
ctx,
2025-03-13 22:36:24 -06:00
query,
args...,
2025-02-15 16:12:42 -07:00
)
if err != nil {
return err
}
}
2025-03-13 22:36:24 -06:00
args = args[:0]
args = append(args,
feedID, // feed_id
nil, // parent_id
f.Twter().DomainNick(), // nick
f.Twter().URI, // uri
"warm", // state
TwtTime{Time: loadTS, Valid: true}, // last_scan_on
refreshRate, // refresh_rate
)
2025-03-24 22:29:18 -06:00
2025-03-13 22:36:24 -06:00
if prev, ok := f.Info().GetN("prev", 0); ok {
_, part, ok := strings.Cut(prev.Value(), " ")
if ok {
2025-03-26 15:38:25 -06:00
uri := f.Twter().URI
2025-03-26 18:54:44 -06:00
if u, ok := f.Info().GetN("url", 0); ok {
uri = u.Value()
}
if u, ok := f.Info().GetN("uri", 0); ok {
uri = u.Value()
}
2025-03-13 22:36:24 -06:00
part = uri[:strings.LastIndex(uri, "/")+1] + part
2025-03-31 17:39:24 -06:00
childID := uuid.UrlNS.UUID5(part)
2025-03-26 18:54:44 -06:00
fmt.Println("found prev", uri, part)
2025-03-13 22:36:24 -06:00
args = append(args,
2025-03-26 15:38:25 -06:00
childID, // feed_id
feedID, // parent_id
2025-03-13 22:36:24 -06:00
f.Twter().DomainNick(), // nick
2025-03-26 15:38:25 -06:00
part, // uri
"once", // state
nil, // last_scan_on
0, // refresh_rate
2025-03-13 22:36:24 -06:00
)
}
}
2025-03-29 17:09:18 -06:00
for uri, nick := range followMap {
2025-03-13 22:36:24 -06:00
args = append(args,
2025-03-31 17:39:24 -06:00
uuid.UrlNS.UUID5(uri), // feed_id
nil, // parent_id
nick, // nick
uri, // uri
"warm", // state
nil, // last_scan_on
refreshRate, // refresh_rate
2025-03-13 22:36:24 -06:00
)
}
for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) {
2025-02-24 17:28:09 -07:00
_, err = tx.ExecContext(
ctx,
2025-03-13 22:36:24 -06:00
query,
args...,
2025-02-15 16:12:42 -07:00
)
if err != nil {
return err
}
}
return tx.Commit()
}
2025-03-29 17:09:18 -06:00
func storeRegistry(ctx context.Context, db db, in io.Reader) error {
ctx, span := otel.Span(ctx)
defer span.End()
twters := make(map[string]string)
args := make([]any, 0, 1024*16)
2025-03-30 21:14:11 -06:00
i := 0
2025-03-29 17:09:18 -06:00
for line := range lextwt.IterRegistry(in) {
twt, ok := line.(*lextwt.Twt)
if !ok {
continue
}
nick := twt.Twter().DomainNick()
uri := twt.Twter().URI
2025-03-31 17:39:24 -06:00
feedID := uuid.UrlNS.UUID5(uri)
2025-03-29 17:09:18 -06:00
twtID := makeULID(twt)
text := fmt.Sprintf("%+l", twt)
// if !strings.HasPrefix(uri, "http") {
// fmt.Println("skip bad uri ", nick, uri)
// continue
// }
// if strings.HasPrefix(nick, "http") {
// fmt.Println("skip bad nick", nick, uri)
// continue
// }
twters[uri] = nick
2025-03-31 17:39:24 -06:00
mentions := make(uuid.UUIDs, 0, len(twt.Mentions()))
2025-03-29 17:09:18 -06:00
for _, mention := range twt.Mentions() {
twters[uri] = nick
2025-03-31 17:39:24 -06:00
mentions = append(mentions, uuid.UrlNS.UUID5(mention.Twter().URI))
2025-03-29 17:09:18 -06:00
}
2025-03-31 17:39:24 -06:00
tags := make(uuid.List, 0, len(twt.Tags()))
2025-03-29 17:09:18 -06:00
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
subject := twt.Subject()
subjectTag := ""
if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
subjectTag = tag.Text()
}
args = append(
args,
feedID, // feed_id
twtID, // ulid
text, // text
twt.Hash(), // hash
subjectTag, // conv
mentions.ToStrList(), // mentions
tags, // tags
)
if len(args) >= 16*1022 {
2025-03-31 10:49:36 -06:00
i += len(args)
2025-03-30 21:14:11 -06:00
fmt.Println("store", i/7, i)
2025-03-29 17:09:18 -06:00
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) {
// fmt.Println("store", len(args))
_, err = tx.ExecContext(
ctx,
query,
args...,
)
if err != nil {
return err
}
}
args = args[:0]
for uri, nick := range twters {
// if !strings.HasPrefix(uri, "http") {
// fmt.Println("skip", nick, uri)
// continue
// }
// if strings.HasPrefix(nick, "http") {
// fmt.Println("skip bad nick", nick, uri)
// continue
// }
2025-03-31 17:39:24 -06:00
feedID := uuid.UrlNS.UUID5(uri)
2025-03-29 17:09:18 -06:00
args = append(args,
feedID, // feed_id
nil, // parent_id
nick, // nick
uri, // uri
PermanentlyDead, // state
nil, // last_scan_on
TenYear, // refresh_rate
)
}
for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) {
_, err = tx.ExecContext(
ctx,
query,
args...,
)
if err != nil {
return err
}
}
args = args[:0]
err = tx.Commit()
if err != nil {
return err
}
}
}
return refreshLastTwt(ctx, db)
}
2025-02-15 16:12:42 -07:00
func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
2025-03-27 16:35:05 -06:00
for _, host := range permaban {
if strings.Contains(feed.URI, host) {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
}
2025-03-26 15:38:25 -06:00
}
2025-02-15 16:12:42 -07:00
2025-03-13 22:36:24 -06:00
req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil)
2025-02-15 16:12:42 -07:00
if err != nil {
return nil, fmt.Errorf("creating HTTP request failed: %w", err)
}
req.Header.Add("Accept", "text/plain")
if !feed.LastModified.Valid {
req.Header.Add("If-Modified-Since", feed.LastModified.Time.Format(http.TimeFormat))
}
if feed.ETag.Valid {
req.Header.Add("If-None-Match", feed.ETag.String)
}
2025-03-27 16:35:05 -06:00
// TODO: this is probably not needed.
2025-02-15 16:12:42 -07:00
if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" {
req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick))
} else {
req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", feed.Version))
}
return req, nil
}
2025-03-13 22:36:24 -06:00
// TwtTime is a nullable timestamp that is exchanged with the database as an
// RFC 3339 string.
type TwtTime struct {
	Time  time.Time
	Valid bool // Valid is true if Time is not NULL
}

// Scan implements the [sql.Scanner] interface.
//
// NULL yields an invalid (zero) TwtTime. Strings and byte slices are parsed
// as RFC 3339; time.Time values are taken as they are. A value that cannot
// be parsed, or a value of any other type, leaves the receiver invalid and
// returns an error.
func (n *TwtTime) Scan(value any) error {
	// Some drivers hand text columns over as raw bytes.
	if raw, ok := value.([]byte); ok {
		value = string(raw)
	}

	switch v := value.(type) {
	case nil:
		n.Time, n.Valid = time.Time{}, false
		return nil
	case time.Time:
		n.Time, n.Valid = v, true
		return nil
	case string:
		var err error
		n.Time, err = time.Parse(time.RFC3339, v)
		n.Valid = err == nil
		return err
	default:
		// Never keep a previously scanned value around for a column that
		// could not be read.
		n.Time, n.Valid = time.Time{}, false
		return fmt.Errorf("scanning TwtTime: unsupported type %T", value)
	}
}

// Value implements the [driver.Valuer] interface. An invalid TwtTime is
// stored as NULL, a valid one as an RFC 3339 string.
func (n TwtTime) Value() (driver.Value, error) {
	if !n.Valid {
		return nil, nil
	}
	return n.Time.Format(time.RFC3339), nil
}
func makeULID(twt types.Twt) ulid.ULID {
2025-03-31 10:49:36 -06:00
text := fmt.Appendf(nil, "%s\t%+l", twt.Twter().URI, twt)
2025-03-13 22:36:24 -06:00
u := ulid.ULID{}
u.SetTime(ulid.Timestamp(twt.Created()))
2025-03-30 21:14:11 -06:00
u.SetEntropy(sha3.SumSHAKE128(text, 10))
2025-03-13 22:36:24 -06:00
return u
}
func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
_, size := qry(1)
itemsPerIter := maxArgs / size
if len(args) < size {
return func(yield func(string, []any) bool) {}
}
if len(args) < maxArgs {
return func(yield func(string, []any) bool) {
query, _ := qry(len(args) / size)
yield(query, args)
}
}
return func(yield func(string, []any) bool) {
for len(args) > 0 {
if len(args) > maxArgs {
query, size := qry(itemsPerIter)
if !yield(query, args[:size]) {
return
}
args = args[size:]
continue
}
query, _ := qry(len(args) / size)
yield(query, args)
return
}
}
}
2025-03-29 17:09:18 -06:00
func refreshLastTwt(ctx context.Context, db db) error {
2025-03-30 21:32:49 -06:00
qry := `
delete from last_twt_on;
2025-03-31 11:09:13 -06:00
insert into last_twt_on (feed_id, last_twt_on)
2025-03-31 10:49:36 -06:00
select distinct
2025-03-29 17:09:18 -06:00
feed_id,
max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts
2025-03-31 11:09:13 -06:00
group by feed_id;
2025-03-30 21:32:49 -06:00
delete from twt_mentions;
2025-03-31 11:09:13 -06:00
insert into twt_mentions (ulid, feed_id)
2025-03-31 10:49:36 -06:00
select distinct
ulid,
unhex(replace(trim(value,'{}'),'-','')) feed_id
from twts, json_each(mentions);
2025-03-30 21:32:49 -06:00
`
var err error
for _, stmt := range strings.Split(qry, ";") {
_, err = db.ExecContext(ctx, stmt)
if err != nil {
return err
}
}
2025-03-29 17:09:18 -06:00
return err
}
2025-03-31 15:23:40 -06:00
func fetchTwts(ctx context.Context, db db, uri string, limit int, offset int64) ([]types.Twt, int64, int64, error) {
ctx, span := otel.Span(ctx)
defer span.End()
args := make([]any, 0, 3)
where := `where feed_id in (select feed_id from feeds where state != 'permanantly-dead')`
if uri != "" {
2025-03-31 17:39:24 -06:00
feed_id := uuid.UrlNS.UUID5(uri)
2025-03-31 15:23:40 -06:00
where = "where feed_id = ?"
args = append(args, feed_id)
}
var end int64
err := db.QueryRowContext(ctx, `
select count(*) n from twts `+where+``, args...).Scan(&end)
span.RecordError(err)
if err != nil {
return nil, 0, 0, err
}
if offset < 1 {
offset += end
}
offset = max(1, offset)
args = append(args, limit, offset-int64(limit))
span.AddEvent("twts", trace.WithAttributes(
attribute.Int("limit", limit),
attribute.Int64("offset-end", offset),
attribute.Int64("offset-start", offset-int64(limit)),
attribute.Int64("max", end),
))
qry := `
SELECT
feed_id,
hash,
conv,
coalesce(nick, 'nobody') nick,
coalesce(uri, 'https://empty.txt') uri,
text
FROM twts
join (
select feed_id, nick, uri
from feeds
) using (feed_id)
where rowid in (
select rowid from twts
` + where + `
order by ulid asc
limit ?
offset ?
)
order by ulid asc`
fmt.Println(qry, args)
rows, err := db.QueryContext(
ctx, qry, args...,
)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
twter := types.NewTwter(o.Nick, o.URI)
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
return twts, offset, end, err
}
func fetchUsers(ctx context.Context, db db, uri, q string) ([]types.Twt, error) {
ctx, span := otel.Span(ctx)
defer span.End()
where := `where parent_id is null and state not in ('permanantly-dead', 'frozen') and last_twt_on is not null`
args := make([]any, 0)
if uri != "" {
where = `where feed_id = ? or parent_id = ?`
2025-03-31 17:39:24 -06:00
feed_id := uuid.UrlNS.UUID5(uri)
2025-03-31 15:23:40 -06:00
args = append(args, feed_id, feed_id)
} else if q != "" {
where = `where nick like ?`
args = append(args, "%"+q+"%")
}
qry := `
SELECT
feed_id,
uri,
nick,
last_scan_on,
coalesce(last_twt_on, last_scan_on) last_twt_on
FROM feeds
left join last_twt_on using (feed_id)
` + where + `
order by nick, uri
`
fmt.Println(qry, args)
rows, err := db.QueryContext(ctx, qry, args...)
if err != nil {
span.RecordError(err)
return nil, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
URI string
Nick string
Dt TwtTime
LastTwtOn TwtTime
}
err = rows.Scan(&o.FeedID, &o.URI, &o.Nick, &o.Dt, &o.LastTwtOn)
if err != nil {
span.RecordError(err)
return nil, err
}
twts = append(twts, lextwt.NewTwt(
types.NewTwter(o.Nick, o.URI),
lextwt.NewDateTime(o.Dt.Time, o.LastTwtOn.Time.Format(time.RFC3339)),
nil,
))
}
return twts, nil
}
2025-03-31 17:39:24 -06:00
func fetchMentions(ctx context.Context, db db, mention uuid.UUID, limit int, offset int64) ([]types.Twt, int64, int64, error) {
2025-03-31 15:23:40 -06:00
ctx, span := otel.Span(ctx)
defer span.End()
args := make([]any, 0, 3)
args = append(args, mention)
var end int64
err := db.QueryRowContext(ctx, `
select count(*) n from twt_mentions where feed_id = ?`, args...).Scan(&end)
span.RecordError(err)
if err != nil {
return nil, 0, 0, err
}
fmt.Println(mention.MarshalText(), end, err)
if offset < 1 {
offset += end
}
limit = min(100, max(1, limit))
offset = max(1, offset)
args = append(args, limit, offset-int64(limit))
qry := `
SELECT
feed_id,
hash,
conv,
coalesce(nick, 'nobody') nick,
coalesce(uri, 'https://empty.txt') uri,
text
FROM twts
join (
select feed_id, nick, uri
from feeds
) using (feed_id)
where rowid in (
select rowid from twts
where ulid in (select ulid from twt_mentions where feed_id = ?)
order by ulid asc
limit ?
offset ?
)
order by ulid asc
`
fmt.Println(qry, args)
rows, err := db.QueryContext(
ctx, qry, args...,
)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
twter := types.NewTwter(o.Nick, o.URI)
// o.Text = strings.ReplaceAll(o.Text, "\n", "\u2028")
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
if err := rows.Err(); err != nil {
fmt.Println(err)
return nil, 0, 0, err
}
return twts, offset, end, err
}
2025-03-31 17:39:24 -06:00
func fetchConv(ctx context.Context, db db, hash string, _ int, offset int64) ([]types.Twt, int64, int64, error) {
2025-03-31 15:23:40 -06:00
ctx, span := otel.Span(ctx)
defer span.End()
var end int64
err := db.QueryRowContext(ctx, `
select count(*) n from twts where hash = $1 or conv = $1`, hash).Scan(&end)
span.RecordError(err)
if err != nil {
return nil, 0, 0, err
}
rows, err := db.QueryContext(
ctx, `
SELECT
feed_id,
hash,
conv,
nick,
uri,
text
FROM twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
) using (feed_id)
WHERE
hash = $1 or
conv = $1
order by ulid asc`, hash,
)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
2025-03-31 17:39:24 -06:00
return nil, 0, 0, err
2025-03-31 15:23:40 -06:00
}
twter := types.NewTwter(o.Nick, o.URI)
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
err = rows.Err()
return twts, offset, end, err
2025-03-31 17:39:24 -06:00
}