diff --git a/app.go b/app.go index 791c01b..d76d9b4 100644 --- a/app.go +++ b/app.go @@ -4,12 +4,10 @@ import ( "context" "database/sql" "fmt" - "iter" "os" "runtime/debug" "strconv" "strings" - "sync" _ "embed" @@ -18,14 +16,15 @@ import ( "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" + "go.sour.is/xt/internal/console" "go.sour.is/xt/internal/otel" "go.yarn.social/lextwt" "go.yarn.social/types" "golang.org/x/sync/errgroup" ) -func run(c *console) error { - ctx, span := otel.Span(c.Context) +func run(ctx context.Context, c *console.C[args]) error { + ctx, span := otel.Span(ctx) defer span.End() bi, _ := debug.ReadBuildInfo() @@ -33,8 +32,8 @@ func run(c *console) error { a := c.Args() app := &appState{ - args: a, - feeds: sync.Map{}, + args: a, + C: c, queue: FibHeap(func(a, b *Feed) bool { return a.NextScanOn.Time.Before(b.NextScanOn.Time) }), @@ -96,25 +95,24 @@ func run(c *console) error { } wg, ctx := errgroup.WithContext(ctx) - c.Context = ctx wg.Go(func() error { - return feedRefreshProcessor(c, app) + return feedRefreshProcessor(ctx, app) }) - go httpServer(c, app) + go httpServer(ctx, app) err = wg.Wait() if err != nil { return err } - return c.Context.Err() + return ctx.Err() } type appState struct { args args - feeds sync.Map queue *fibHeap[Feed] + *console.C[args] } type db struct { @@ -173,23 +171,3 @@ func (app *appState) DB(ctx context.Context) (db, error) { return db, err } - -func (app *appState) Feed(feedID string) *Feed { - return nil -} - -func (app *appState) Feeds() iter.Seq2[string, *Feed] { - return func(yield func(string, *Feed) bool) { - app.feeds.Range(func(k, v any) bool { - key, _ := k.(string) - value, ok := v.(*Feed) - if !ok { - return false - } - if !yield(key, value) { - return false - } - return true - }) - } -} diff --git a/feed.go b/feed.go index 120475e..4076222 100644 --- a/feed.go +++ b/feed.go @@ -134,6 +134,10 @@ var ( '+'||abs(refresh_rate+cast(random()%30 as int))||' seconds' ) < datetime(current_timestamp, '+3 minutes') ` + permaban = []string{ + "//lublin.se/", + "//enotty.dk/", + } ) func (f *Feed) Create(ctx context.Context, db db) error { @@ -370,11 +374,10 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error { } func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) { - if strings.Contains(feed.URI, "lublin.se") { - return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI) - } - if strings.Contains(feed.URI, "enotty.dk") { - return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI) + for _, host := range permaban { + if strings.Contains(feed.URI, host) { + return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI) + } } req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil) @@ -392,6 +395,7 @@ func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) { req.Header.Add("If-None-Match", feed.ETag.String) } + // TODO: this is probably not needed. if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" { req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)", feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick)) diff --git a/http.go b/http.go index 1a2a236..12c63bc 100644 --- a/http.go +++ b/http.go @@ -1,6 +1,7 @@ package main import ( + "context" "errors" "fmt" "net/http" @@ -10,14 +11,16 @@ import ( "strings" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.yarn.social/lextwt" "go.yarn.social/types" "go.sour.is/xt/internal/otel" ) -func httpServer(c *console, app *appState) error { - ctx, span := otel.Span(c) +func httpServer(ctx context.Context, app *appState) error { + ctx, span := otel.Span(ctx) defer span.End() span.AddEvent("start http server") @@ -110,11 +113,11 @@ func httpServer(c *console, app *appState) error { defer span.End() args := make([]any, 0, 3) - uriarg := "" + uriarg := "1 = 1" uri := r.URL.Query().Get("uri") if uri != "" { feed_id := urlNS.UUID5(uri) - uriarg = "and feed_id = ?" + uriarg = "feed_id = ?" args = append(args, feed_id) } @@ -123,11 +126,39 @@ func httpServer(c *console, app *appState) error { limit = v } - offset := 0 - if v, ok := strconv.Atoi(r.URL.Query().Get("offset")); ok == nil { + var offset int64 = 0 + if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil { offset = v } - args = append(args, limit, offset) + + var end int64 + err = db.QueryRowContext(ctx, ` + select count(*) n from twts + JOIN ( + SELECT + feed_id, + nick, + uri + FROM feeds + where state not in ('frozen', 'permanantly-dead') and `+uriarg+` + ) using (feed_id) + where `+uriarg, args...).Scan(&end) + span.RecordError(err) + + if offset < 1 { + offset += end + } + + limit = min(100, max(1, limit)) + offset = max(1, offset) + + args = append(args, limit, offset-int64(limit)) + span.AddEvent("twts", trace.WithAttributes( + attribute.Int("limit", limit), + attribute.Int64("offset-end", offset), + attribute.Int64("offset-start", offset-int64(limit)), + attribute.Int64("max", end), + )) w.Header().Set("Content-Type", "text/plain; charset=utf-8") rows, err := db.QueryContext( ctx, @@ -145,10 +176,9 @@ func httpServer(c *console, app *appState) error { nick, uri FROM feeds - where state not in ('frozen', 'permanantly-dead') - `+uriarg+` + where state not in ('frozen', 'permanantly-dead') and `+uriarg+` ) using (feed_id) - order by ulid desc + order by ulid asc limit ? offset ? `, args..., @@ -182,10 +212,14 @@ func httpServer(c *console, app *appState) error { } var preamble lextwt.Comments preamble = append(preamble, lextwt.NewComment("# I am the Watcher. I am your guide through this vast new twtiverse.")) + + preamble = append(preamble, lextwt.NewComment(fmt.Sprint("# range = 1 ", end))) preamble = append(preamble, lextwt.NewComment("# self = /api/plain/twts"+mkqry(uri, limit, offset))) - preamble = append(preamble, lextwt.NewComment("# next = /api/plain/twts"+mkqry(uri, limit, offset+len(twts)))) - if offset > 0 { - preamble = append(preamble, lextwt.NewComment("# prev = /api/plain/twts"+mkqry(uri, limit, offset-limit))) + if next := offset + int64(len(twts)); next < end { + preamble = append(preamble, lextwt.NewComment("# next = /api/plain/twts"+mkqry(uri, limit, next))) + } + if prev := offset - int64(limit); prev > 0 { + preamble = append(preamble, lextwt.NewComment("# prev = /api/plain/twts"+mkqry(uri, limit, prev))) } reg := lextwt.NewTwtRegistry(preamble, twts) @@ -221,7 +255,7 @@ func httpServer(c *console, app *appState) error { ) using (feed_id) `+where+` order by nick, uri - `,args..., + `, args..., ) if err != nil { span.RecordError(err) @@ -268,7 +302,7 @@ func httpServer(c *console, app *appState) error { Handler: http.DefaultServeMux, } - c.AddCancel(srv.Shutdown) + app.AddCancel(srv.Shutdown) err = srv.ListenAndServe() if !errors.Is(err, http.ErrServerClosed) { span.RecordError(err) @@ -287,20 +321,17 @@ func notAny(s string, chars string) bool { return true } - -func mkqry(uri string, limit, offset int) string { +func mkqry(uri string, limit int, offset int64) string { qry := make([]string, 0, 3) if uri != "" { - qry = append(qry, "uri=" + uri) + qry = append(qry, "uri="+uri) } - limit = min(100, max(1, limit)) if limit != 100 { qry = append(qry, fmt.Sprint("limit=", limit)) } - offset = max(0, offset) if offset != 0 { qry = append(qry, fmt.Sprint("offset=", offset)) } diff --git a/internal/console/console.go b/internal/console/console.go new file mode 100644 index 0000000..aca099e --- /dev/null +++ b/internal/console/console.go @@ -0,0 +1,61 @@ +package console + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "os/signal" + "time" +) + +type C[A any] struct { + io.Reader + io.Writer + err io.Writer + args A + abort func() + cancelfns []func(context.Context) error +} + +func New[A any](args A) (context.Context, *C[A]) { + ctx := context.Background() + ctx, abort := context.WithCancel(ctx) + ctx, stop := signal.NotifyContext(ctx, os.Interrupt) + go func() { <-ctx.Done(); stop() }() // restore interrupt function + + console := &C[A]{Reader: os.Stdin, Writer: os.Stdout, err: os.Stderr, args: args, abort: abort} + return ctx, console +} + +func (c *C[A]) Args() A { + return c.args +} +func (c *C[A]) Shutdown() error { + fmt.Fprintln(c.err, "shutting down ", len(c.cancelfns), " cancel functions...") + defer fmt.Fprintln(c.err, "done") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + c.abort() + var err error + for _, fn := range c.cancelfns { + err = errors.Join(err, fn(ctx)) + } + return err +} +func (c *C[A]) AddCancel(fn func(context.Context) error) { c.cancelfns = append(c.cancelfns, fn) } + +func (c *C[A]) IfFatal(err error) { + if err == nil { + return + } + fmt.Fprintln(c.err, err) + err = c.Shutdown() + if err != nil { + fmt.Fprintln(c.err, err) + } + os.Exit(1) +} diff --git a/internal/env/env.go b/internal/env/env.go new file mode 100644 index 0000000..de2b7cd --- /dev/null +++ b/internal/env/env.go @@ -0,0 +1,54 @@ +package env + +import ( + "bufio" + "os" + "strings" +) + + +func Default(key, def string) string { + if v, ok := os.LookupEnv(key); ok { + return v + } + return def +} + +type secret struct { + value string +} +func (s secret) Secret() string { + return s.value +} +func (s secret) String() string { + return "***" +} +func Secret(key, def string) secret { + if v, ok := os.LookupEnv(key); ok { + return secret{v} + } + return secret{def} +} + +func DotEnv() { + fd, err := os.Open(".env") + if err != nil { + return + } + + scan := bufio.NewScanner(fd) + + for scan.Scan() { + line := scan.Text() + + if strings.HasPrefix(line, "#") { + continue + } + key, val, ok := strings.Cut(line, "=") + if !ok { + continue + } + + os.Setenv(strings.TrimSpace(key), strings.TrimSpace(val)) + } +} diff --git a/main.go b/main.go index 679e345..1398f9f 100644 --- a/main.go +++ b/main.go @@ -1,111 +1,16 @@ package main import ( - "bufio" "context" "errors" - "fmt" - "io" - "os" - "os/signal" - "strings" - "go.opentelemetry.io/otel/metric" + "go.sour.is/xt/internal/console" + "go.sour.is/xt/internal/env" "go.sour.is/xt/internal/otel" ) const name = "go.sour.is/xt" -var m_up metric.Int64Gauge - -func main() { - dotEnv() // load .env - - ctx, console := newConsole(args{ - dbtype: env("XT_DBTYPE", "sqlite3"), - dbfile: env("XT_DBFILE", "file:twt.db"), - baseFeed: env("XT_BASE_FEED", "feed"), - Nick: env("XT_NICK", "xuu"), - URI: env("XT_URI", "https://txt.sour.is/user/xuu/twtxt.txt"), - Listen: env("XT_LISTEN", ":8080"), - }) - - finish, err := otel.Init(ctx, name) - console.IfFatal(err) - console.AddCancel(finish) - - m_up, err = otel.Meter().Int64Gauge("up") - console.IfFatal(err) - - m_up.Record(ctx, 1) - defer m_up.Record(context.Background(), 0) - - err = run(console) - if !errors.Is(err, context.Canceled) { - console.IfFatal(err) - } -} - -type console struct { - io.Reader - io.Writer - err io.Writer - context.Context - abort func() - cancelfns []func(context.Context) error -} - -func newConsole(args args) (context.Context, *console) { - ctx := context.Background() - ctx, abort := context.WithCancel(ctx) - ctx, stop := signal.NotifyContext(ctx, os.Interrupt) - go func() { <-ctx.Done(); stop() }() // restore interrupt function - - console := &console{Reader: os.Stdin, Writer: os.Stdout, err: os.Stderr, Context: ctx, abort: abort} - console.Set("console", console) - console.Set("args", args) - return ctx, console -} - -func (c *console) Args() args { - v, ok := c.Get("args").(args) - if !ok { - return args{} - } - return v -} -func (c *console) Shutdown() error { - fmt.Fprintln(c.err, "shutting down ", len(c.cancelfns), " cancel functions...") - defer fmt.Fprintln(c.err, "done") - - c.abort() - var err error - for _, fn := range c.cancelfns { - err = errors.Join(err, fn(c.Context)) - } - return err -} -func (c *console) AddCancel(fn func(context.Context) error) { c.cancelfns = append(c.cancelfns, fn) } - -func (c *console) IfFatal(err error) { - if err == nil { - return - } - fmt.Fprintln(c.err, err) - c.abort() - os.Exit(1) -} - -type contextKey struct{ name string } - -func (c *console) Set(name string, value any) { - c.Context = context.WithValue(c.Context, contextKey{name}, value) -} - -func (c *console) Get(name string) any { - return c.Context.Value(contextKey{name}) -} - type args struct { dbtype string dbfile string @@ -115,32 +20,30 @@ type args struct { Listen string } -func env(key, def string) string { - if v, ok := os.LookupEnv(key); ok { - return v - } - return def -} +func main() { + env.DotEnv() // load .env -func dotEnv() { - fd, err := os.Open(".env") - if err != nil { - return - } + ctx, console := console.New(args{ + dbtype: env.Default("XT_DBTYPE", "sqlite3"), + dbfile: env.Default("XT_DBFILE", "file:twt.db"), + baseFeed: env.Default("XT_BASE_FEED", "feed"), + Nick: env.Default("XT_NICK", "xuu"), + URI: env.Default("XT_URI", "https://txt.sour.is/user/xuu/twtxt.txt"), + Listen: env.Default("XT_LISTEN", ":8080"), + }) - scan := bufio.NewScanner(fd) + finish, err := otel.Init(ctx, name) + console.IfFatal(err) + console.AddCancel(finish) - for scan.Scan() { - line := scan.Text() + m_up, err := otel.Meter().Int64Gauge("up") + console.IfFatal(err) - if strings.HasPrefix(line, "#") { - continue - } - key, val, ok := strings.Cut(line, "=") - if !ok { - continue - } + m_up.Record(ctx, 1) + defer m_up.Record(context.Background(), 0) - os.Setenv(strings.TrimSpace(key), strings.TrimSpace(val)) + err = run(ctx, console) + if !errors.Is(err, context.Canceled) { + console.IfFatal(err) } } diff --git a/refresh-loop.go b/refresh-loop.go index 186e643..73c3ec7 100644 --- a/refresh-loop.go +++ b/refresh-loop.go @@ -26,8 +26,8 @@ const ( TwoMinutes = 120 ) -func feedRefreshProcessor(c *console, app *appState) error { - ctx, span := otel.Span(c.Context) +func feedRefreshProcessor(ctx context.Context, app *appState) error { + ctx, span := otel.Span(ctx) defer span.End() sleeping_time, _ := otel.Meter().Int64Counter("xt_feed_sleep") @@ -38,7 +38,7 @@ func feedRefreshProcessor(c *console, app *appState) error { fetch, close := NewFuncPool(ctx, 40, f.Fetch) defer close() - db, err := app.DB(c) + db, err := app.DB(ctx) if err != nil { span.RecordError(err) return err @@ -69,7 +69,7 @@ func feedRefreshProcessor(c *console, app *appState) error { span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TwoMinutes)))) select { case <-time.After(TwoMinutes * time.Second): - case <-c.Done(): + case <-ctx.Done(): return nil } span.End()