diff --git a/.gitignore b/.gitignore index df69e5e..502590c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,4 @@ feeds/ /xt .env *.txt -*.txt.xz \ No newline at end of file +*.xz \ No newline at end of file diff --git a/app.go b/app.go index f23f805..a047501 100644 --- a/app.go +++ b/app.go @@ -70,6 +70,12 @@ func run(ctx context.Context, c *console.C[args]) error { ctx, span := otel.Span(ctx) defer span.End() + db, err := app.DB(ctx) + if err != nil { + return err + } + defer db.Close() + var inFile io.Reader if a.baseFeed != "" { @@ -78,30 +84,30 @@ func run(ctx context.Context, c *console.C[args]) error { return err } defer f.Close() - } else { + err = storeRegistry(ctx, db, f) + if err != nil { + return err + } + } + + if a.URI != "" { res, err := http.Get(a.URI) if err != nil { return err } inFile = res.Body defer res.Body.Close() + twtfile, err := lextwt.ParseFile(inFile, &types.Twter{ + Nick: a.Nick, + URI: a.URI, + }) + if err != nil { + return fmt.Errorf("%w: %w", ErrParseFailed, err) + } + + return storeFeed(ctx, db, twtfile) } - - twtfile, err := lextwt.ParseFile(inFile, &types.Twter{ - Nick: a.Nick, - URI: a.URI, - }) - if err != nil { - return fmt.Errorf("%w: %w", ErrParseFailed, err) - } - - db, err := app.DB(ctx) - if err != nil { - return err - } - defer db.Close() - - return storeFeed(ctx, db, twtfile) + return nil }(ctx) if err != nil { return err diff --git a/feed.go b/feed.go index 4076222..baf03bb 100644 --- a/feed.go +++ b/feed.go @@ -7,6 +7,7 @@ import ( "database/sql/driver" "fmt" "hash/fnv" + "io" "iter" "net/http" "net/url" @@ -93,8 +94,15 @@ var ( repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1) } return ` - insert into twts - (feed_id, ulid, text, hash, conv, mentions, tags) + insert into twts ( + feed_id, + ulid, + text, + hash, + conv, + mentions, + tags + ) values (?, ?, ?, ?, ?, ?, ?)` + repeat + ` ON CONFLICT (feed_id, ulid) DO NOTHING`, r * 7 } @@ -140,24 +148,6 @@ var ( } ) -func (f *Feed) Create(ctx context.Context, db db) error { - ctx, span := otel.Span(ctx) - defer span.End() - query, _ := insertFeed(1) - _, err := db.ExecContext( - ctx, - query, - f.FeedID, // feed_id - f.ParentID, // parent_id - f.Nick, // nick - f.URI, // uri - f.State, // state - f.LastScanOn, // last_scan_on - f.RefreshRate, // refresh_rate - ) - return err -} - func (f *Feed) Save(ctx context.Context, db db) error { ctx, span := otel.Span(ctx) defer span.End() @@ -248,7 +238,7 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error { followers := f.Info().GetAll("follow") followMap := make(map[string]string, len(followers)) for _, f := range f.Info().GetAll("follow") { - nick, uri, ok := strings.Cut(f.Value(), "http") + nick, uri, ok := strings.Cut(f.Value(), " ") if !ok { continue } @@ -259,7 +249,7 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error { continue } - followMap[nick] = uri + followMap[uri] = nick } defer tx.Rollback() @@ -269,9 +259,11 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error { args := make([]any, 0, size) for _, twt := range twts { + twtID := makeULID(twt) + mentions := make(uuids, 0, len(twt.Mentions())) for _, mention := range twt.Mentions() { - followMap[mention.Twter().Nick] = mention.Twter().URI + followMap[mention.Twter().URI] = mention.Twter().Nick mentions = append(mentions, urlNS.UUID5(mention.Twter().URI)) } @@ -287,10 +279,11 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error { subjectTag = tag.Text() } } + args = append( args, feedID, // feed_id - makeULID(twt), // ulid + twtID, // ulid fmt.Sprintf("%+l", twt), // text subjectTag, // conv twt.Hash(), // hash @@ -348,7 +341,7 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error { } } - for nick, uri := range followMap { + for uri, nick := range followMap { args = append(args, urlNS.UUID5(uri), // feed_id nil, // parent_id @@ -372,6 +365,129 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error { return tx.Commit() } +func storeRegistry(ctx context.Context, db db, in io.Reader) error { + ctx, span := otel.Span(ctx) + defer span.End() + + twters := make(map[string]string) + args := make([]any, 0, 1024*16) + + for line := range lextwt.IterRegistry(in) { + twt, ok := line.(*lextwt.Twt) + if !ok { + continue + } + + nick := twt.Twter().DomainNick() + uri := twt.Twter().URI + feedID := urlNS.UUID5(uri) + twtID := makeULID(twt) + text := fmt.Sprintf("%+l", twt) + + // if !strings.HasPrefix(uri, "http") { + // fmt.Println("skip bad uri ", nick, uri) + // continue + // } + + // if strings.HasPrefix(nick, "http") { + // fmt.Println("skip bad nick", nick, uri) + // continue + // } + + twters[uri] = nick + + mentions := make(uuids, 0, len(twt.Mentions())) + for _, mention := range twt.Mentions() { + twters[uri] = nick + mentions = append(mentions, urlNS.UUID5(mention.Twter().URI)) + } + + tags := make(strList, 0, len(twt.Tags())) + for _, tag := range twt.Tags() { + tags = append(tags, tag.Text()) + } + + subject := twt.Subject() + subjectTag := "" + if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil { + subjectTag = tag.Text() + } + + args = append( + args, + feedID, // feed_id + twtID, // ulid + text, // text + twt.Hash(), // hash + subjectTag, // conv + mentions.ToStrList(), // mentions + tags, // tags + ) + + if len(args) >= 16*1022 { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) { + // fmt.Println("store", len(args)) + + _, err = tx.ExecContext( + ctx, + query, + args..., + ) + if err != nil { + return err + } + } + + args = args[:0] + + for uri, nick := range twters { + // if !strings.HasPrefix(uri, "http") { + // fmt.Println("skip", nick, uri) + // continue + // } + // if strings.HasPrefix(nick, "http") { + // fmt.Println("skip bad nick", nick, uri) + // continue + // } + + feedID := urlNS.UUID5(uri) + + args = append(args, + feedID, // feed_id + nil, // parent_id + nick, // nick + uri, // uri + PermanentlyDead, // state + nil, // last_scan_on + TenYear, // refresh_rate + ) + } + for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) { + _, err = tx.ExecContext( + ctx, + query, + args..., + ) + if err != nil { + return err + } + } + args = args[:0] + + err = tx.Commit() + if err != nil { + return err + } + } + } + + return refreshLastTwt(ctx, db) +} func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) { for _, host := range permaban { @@ -483,3 +599,16 @@ func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[strin } } } + +func refreshLastTwt(ctx context.Context, db db) error { + _, err := db.ExecContext(ctx, ` + insert into last_twt_on + select + feed_id, + max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on + from twts + group by feed_id + on conflict do update set last_twt_on = excluded.last_twt_on; +`) + return err +} diff --git a/http.go b/http.go index e0e0f8d..995021c 100644 --- a/http.go +++ b/http.go @@ -114,11 +114,12 @@ func httpServer(ctx context.Context, app *appState) error { defer span.End() args := make([]any, 0, 3) - uriarg := "state not in ('frozen', 'permanantly-dead')" + where := `` + uri := r.URL.Query().Get("uri") if uri != "" { feed_id := urlNS.UUID5(uri) - uriarg = "feed_id = ?" + where = "where feed_id = ?" args = append(args, feed_id) } @@ -134,15 +135,7 @@ func httpServer(ctx context.Context, app *appState) error { var end int64 err = db.QueryRowContext(ctx, ` - select count(*) n from twts - JOIN ( - SELECT - feed_id, - nick, - uri - FROM feeds - where `+uriarg+` - ) using (feed_id)`, args...).Scan(&end) + select count(*) n from twts `+where, args...).Scan(&end) span.RecordError(err) if offset < 1 { @@ -159,27 +152,29 @@ func httpServer(ctx context.Context, app *appState) error { attribute.Int64("offset-start", offset-int64(limit)), attribute.Int64("max", end), )) + qry := ` SELECT feed_id, hash, conv, - nick, - uri, + coalesce(nick, 'nobody') nick, + coalesce(uri, 'https://empty.txt') uri, text FROM twts - JOIN ( - SELECT - feed_id, - nick, - uri - FROM feeds - where ` + uriarg + ` + left join ( + select feed_id, nick, uri + from feeds ) using (feed_id) - order by ulid asc - limit ? - offset ? - ` + where rowid in ( + select rowid from twts + ` + where +` + order by ulid asc + limit ? + offset ? + ) + order by ulid asc` + fmt.Println(qry, args) w.Header().Set("Content-Type", "text/plain; charset=utf-8") rows, err := db.QueryContext( @@ -249,13 +244,7 @@ func httpServer(ctx context.Context, app *appState) error { last_scan_on, coalesce(last_twt_on, last_scan_on) last_twt_on FROM feeds - left join ( - select - feed_id, - max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on - from twts - group by feed_id - ) using (feed_id) + left join last_twt_on using (feed_id) ` + where + ` order by nick, uri ` diff --git a/init.sql b/init.sql index 30e2cc5..cd279b4 100644 --- a/init.sql +++ b/init.sql @@ -24,3 +24,10 @@ create table if not exists twts ( primary key (feed_id, ulid) ); +create table if not exists last_twt_on( + feed_id blob, + last_twt_on, + primary key (feed_id) +); + +CREATE INDEX if not exists twt_time on twts (ulid asc); diff --git a/internal/otel/otel.go b/internal/otel/otel.go index 95bcca0..50370d7 100644 --- a/internal/otel/otel.go +++ b/internal/otel/otel.go @@ -189,7 +189,25 @@ func newTraceProvider(ctx context.Context, name string) (func(context.Context) e return nil, err } - if v := env("XT_TRACER", ""); v != "" { + if v := env("XT_TRACER", ""); v == "stdout" { + traceExporter, err := stdouttrace.New( + stdouttrace.WithWriter(os.Stderr), + stdouttrace.WithPrettyPrint(), + ) + if err != nil { + return nil, err + } + + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithResource(r), + sdktrace.WithBatcher(traceExporter, + // Default is 5s. Set to 1s for demonstrative purposes. + sdktrace.WithBatchTimeout(time.Second)), + ) + otel.SetTracerProvider(tracerProvider) + + return tracerProvider.Shutdown, nil + } else if v != "" { fmt.Println("use tracer", v) exp, err := otlptracegrpc.New( ctx, @@ -208,28 +226,13 @@ func newTraceProvider(ctx context.Context, name string) (func(context.Context) e return func(ctx context.Context) error { return tracerProvider.Shutdown(ctx) }, nil - } + } - traceExporter, err := stdouttrace.New( - stdouttrace.WithWriter(os.Stderr), - stdouttrace.WithPrettyPrint(), - ) - if err != nil { - return nil, err - } - - tracerProvider := sdktrace.NewTracerProvider( - sdktrace.WithResource(r), - sdktrace.WithBatcher(traceExporter, - // Default is 5s. Set to 1s for demonstrative purposes. - sdktrace.WithBatchTimeout(time.Second)), - ) - otel.SetTracerProvider(tracerProvider) - - return tracerProvider.Shutdown, nil + return func(ctx context.Context) error {return nil}, nil } func newMeterProvider(ctx context.Context, name string) (func(context.Context) error, error) { + _, _ = ctx, name // metricExporter, err := stdoutmetric.New() // if err != nil { // return nil, err @@ -281,7 +284,25 @@ func newLoggerProvider(ctx context.Context, name string) (func(context.Context) return nil, err } - if v := env("XT_LOGGER", ""); v != "" { + if v := env("XT_LOGGER", ""); v == "stdout" { + logExporter, err := stdoutlog.New( + stdoutlog.WithPrettyPrint(), + stdoutlog.WithWriter(os.Stderr), + ) + if err != nil { + return nil, err + } + + loggerProvider := log.NewLoggerProvider( + log.WithProcessor( + log.NewBatchProcessor(logExporter), + ), + log.WithResource(r), + ) + global.SetLoggerProvider(loggerProvider) + + return loggerProvider.Shutdown, nil + } else if v != "" { fmt.Println("use logger", v) exp, err := otlploghttp.New( @@ -301,28 +322,9 @@ func newLoggerProvider(ctx context.Context, name string) (func(context.Context) global.SetLoggerProvider(provider) return processor.Shutdown, nil - } - // return func(ctx context.Context) error { return nil }, nil - - logExporter, err := stdoutlog.New( - stdoutlog.WithPrettyPrint(), - stdoutlog.WithWriter(os.Stderr), - ) - if err != nil { - return nil, err - } - - loggerProvider := log.NewLoggerProvider( - log.WithProcessor( - log.NewBatchProcessor(logExporter), - ), - log.WithResource(r), - ) - global.SetLoggerProvider(loggerProvider) - - return loggerProvider.Shutdown, nil + return func(ctx context.Context) error { return nil }, nil } func env(key, def string) string { diff --git a/refresh-loop.go b/refresh-loop.go index 73c3ec7..3e9c3ed 100644 --- a/refresh-loop.go +++ b/refresh-loop.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" "io" - "os" - "path/filepath" "sort" "time" @@ -127,6 +125,8 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) { select { case <-ctx.Done(): return + case <-time.After(10 * time.Minute): + refreshLastTwt(ctx, db) case res := <-fetch.Out(): f := res.Request span.AddEvent("got response", trace.WithAttributes( @@ -162,7 +162,7 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) { f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true span.AddEvent("read feed") - cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + // cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) if err != nil { span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err)) @@ -174,10 +174,12 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) { continue } - rdr := io.TeeReader(res.Body, cpy) + var rdr io.Reader = res.Body + // rdr := io.TeeReader(rdr, cpy) rdr = lextwt.TwtFixer(rdr) twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI}) - cpy.Close() + //cpy.Close() + res.Body.Close() if err != nil { span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))