From 69755e14d253f7eaf25e6f1913445198421fecb1 Mon Sep 17 00:00:00 2001 From: xuu Date: Mon, 31 Mar 2025 10:49:36 -0600 Subject: [PATCH] chore: fixes --- app.go | 4 ++-- feed.go | 14 +++++++++----- http.go | 37 ++++++++++++++++--------------------- init.sql | 2 +- internal/otel/otel.go | 14 +++++++------- 5 files changed, 35 insertions(+), 36 deletions(-) diff --git a/app.go b/app.go index a047501..be27ac5 100644 --- a/app.go +++ b/app.go @@ -89,7 +89,7 @@ func run(ctx context.Context, c *console.C[args]) error { return err } } - + if a.URI != "" { res, err := http.Get(a.URI) if err != nil { @@ -104,7 +104,7 @@ func run(ctx context.Context, c *console.C[args]) error { if err != nil { return fmt.Errorf("%w: %w", ErrParseFailed, err) } - + return storeFeed(ctx, db, twtfile) } return nil diff --git a/feed.go b/feed.go index db98cf9..4ef37a0 100644 --- a/feed.go +++ b/feed.go @@ -432,7 +432,7 @@ func storeRegistry(ctx context.Context, db db, in io.Reader) error { ) if len(args) >= 16*1022 { - i+=len(args) + i += len(args) fmt.Println("store", i/7, i) tx, err := db.BeginTx(ctx, nil) if err != nil { @@ -563,7 +563,7 @@ func (n TwtTime) Value() (driver.Value, error) { } func makeULID(twt types.Twt) ulid.ULID { - text := fmt.Appendf(nil, "%s\t%+l", cmp.Or(twt.Twter().HashingURI, twt.Twter().URI), twt) + text := fmt.Appendf(nil, "%s\t%+l", twt.Twter().URI, twt) u := ulid.ULID{} u.SetTime(ulid.Timestamp(twt.Created())) u.SetEntropy(sha3.SumSHAKE128(text, 10)) @@ -608,14 +608,18 @@ func refreshLastTwt(ctx context.Context, db db) error { qry := ` delete from last_twt_on; insert into last_twt_on - select + select distinct feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on from twts group by feed_id - on conflict(feed_id) do update set last_twt_on = excluded.last_twt_on; + on conflict(feed_id); delete from twt_mentions; - insert into twt_mentions select ulid, unhex(replace(trim(value,'{}'),'-','')) feed_id from twts, json_each(mentions); + insert into twt_mentions + select distinct + ulid, + unhex(replace(trim(value,'{}'),'-','')) feed_id + from twts, json_each(mentions); ` var err error for _, stmt := range strings.Split(qry, ";") { diff --git a/http.go b/http.go index a211b10..56e4363 100644 --- a/http.go +++ b/http.go @@ -25,21 +25,19 @@ const hostname = "https://watcher.sour.is" var PREAMBLE_DOCS = func() lextwt.Comments { c := add(nil, iAmTheWatcher) c = add(c, "") - c = add(c,"Usage:") - c = add(c," %s/api/plain/users View list of users and latest twt date.", hostname) - c = add(c," %s/api/plain/twt View all twts in decending order.", hostname) - c = add(c," %s/api/plain/mentions?uri=:uri View all mentions for uri in decending order.", hostname) - c = add(c," %s/api/plain/conv/:hash View all twts for a conversation subject.", hostname) - c = add(c,"") - c = add(c,"Options:") - c = add(c," uri Filter to show a specific users twts.") - c = add(c," offset Start index for quey.") - c = add(c," limit Count of items to return (going back in time).") - return add(c,"") + c = add(c, "Usage:") + c = add(c, " %s/api/plain/users View list of users and latest twt date.", hostname) + c = add(c, " %s/api/plain/twt View all twts in decending order.", hostname) + c = add(c, " %s/api/plain/mentions?uri=:uri View all mentions for uri in decending order.", hostname) + c = add(c, " %s/api/plain/conv/:hash View all twts for a conversation subject.", hostname) + c = add(c, "") + c = add(c, "Options:") + c = add(c, " uri Filter to show a specific users twts.") + c = add(c, " offset Start index for quey.") + c = add(c, " limit Count of items to return (going back in time).") + return add(c, "") }() - - func httpServer(ctx context.Context, app *appState) error { ctx, span := otel.Span(ctx) defer span.End() @@ -139,10 +137,10 @@ func httpServer(ctx context.Context, app *appState) error { w.Header().Set("Content-Type", "text/plain; charset=utf-8") uri := r.URL.Query().Get("uri") - if uri == ""{ + if uri == "" { reg := lextwt.NewTwtRegistry(PREAMBLE_DOCS, nil) reg.WriteTo(w) - + return } mention := urlNS.UUID5(uri) @@ -150,7 +148,6 @@ func httpServer(ctx context.Context, app *appState) error { args := make([]any, 0, 3) args = append(args, mention) - limit := 100 if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil { limit = v @@ -200,7 +197,6 @@ func httpServer(ctx context.Context, app *appState) error { ` fmt.Println(qry, args) - rows, err := db.QueryContext( ctx, qry, args..., ) @@ -249,7 +245,6 @@ func httpServer(ctx context.Context, app *appState) error { reg.WriteTo(w) }) - http.HandleFunc("/api/plain/twt", func(w http.ResponseWriter, r *http.Request) { ctx, span := otel.Span(r.Context()) defer span.End() @@ -266,7 +261,7 @@ func httpServer(ctx context.Context, app *appState) error { offset = v } - twts, end, err := func(uri string, limit int, offset int64, ) ([]types.Twt, int64, error) { + twts, end, err := func(uri string, limit int, offset int64) ([]types.Twt, int64, error) { args := make([]any, 0, 3) where := `where feed_id in (select feed_id from feeds where state != 'permanantly-dead')` @@ -311,7 +306,7 @@ func httpServer(ctx context.Context, app *appState) error { ) using (feed_id) where rowid in ( select rowid from twts - ` + where +` + ` + where + ` order by ulid asc limit ? offset ? @@ -351,7 +346,7 @@ func httpServer(ctx context.Context, app *appState) error { } return twts, end, err - } (uri, limit, offset) + }(uri, limit, offset) span.RecordError(err) preamble := add(PREAMBLE_DOCS, "twt range = 1 %d", end) diff --git a/init.sql b/init.sql index 1b6ceef..1e48dea 100644 --- a/init.sql +++ b/init.sql @@ -35,7 +35,7 @@ CREATE INDEX if not exists twt_time on twts (ulid asc); create table if not exists twt_mentions( feed_id blob, ulid blob, - primary key (feed_id) + primary key (feed_id, ulid) ); CREATE INDEX if not exists twt_mention on twt_mentions (ulid asc); diff --git a/internal/otel/otel.go b/internal/otel/otel.go index 50370d7..1997f24 100644 --- a/internal/otel/otel.go +++ b/internal/otel/otel.go @@ -197,7 +197,7 @@ func newTraceProvider(ctx context.Context, name string) (func(context.Context) e if err != nil { return nil, err } - + tracerProvider := sdktrace.NewTracerProvider( sdktrace.WithResource(r), sdktrace.WithBatcher(traceExporter, @@ -205,7 +205,7 @@ func newTraceProvider(ctx context.Context, name string) (func(context.Context) e sdktrace.WithBatchTimeout(time.Second)), ) otel.SetTracerProvider(tracerProvider) - + return tracerProvider.Shutdown, nil } else if v != "" { fmt.Println("use tracer", v) @@ -226,9 +226,9 @@ func newTraceProvider(ctx context.Context, name string) (func(context.Context) e return func(ctx context.Context) error { return tracerProvider.Shutdown(ctx) }, nil - } + } - return func(ctx context.Context) error {return nil}, nil + return func(ctx context.Context) error { return nil }, nil } func newMeterProvider(ctx context.Context, name string) (func(context.Context) error, error) { @@ -292,7 +292,7 @@ func newLoggerProvider(ctx context.Context, name string) (func(context.Context) if err != nil { return nil, err } - + loggerProvider := log.NewLoggerProvider( log.WithProcessor( log.NewBatchProcessor(logExporter), @@ -300,8 +300,8 @@ func newLoggerProvider(ctx context.Context, name string) (func(context.Context) log.WithResource(r), ) global.SetLoggerProvider(loggerProvider) - - return loggerProvider.Shutdown, nil + + return loggerProvider.Shutdown, nil } else if v != "" { fmt.Println("use logger", v)