chore: refactor

This commit is contained in:
xuu 2025-03-27 16:35:05 -06:00
parent 303ca5a2db
commit a3e6fc0c0f
Signed by: xuu
GPG Key ID: 8B3B0604F164E04F
7 changed files with 210 additions and 179 deletions

40
app.go
View File

@ -4,12 +4,10 @@ import (
"context" "context"
"database/sql" "database/sql"
"fmt" "fmt"
"iter"
"os" "os"
"runtime/debug" "runtime/debug"
"strconv" "strconv"
"strings" "strings"
"sync"
_ "embed" _ "embed"
@ -18,14 +16,15 @@ import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/console"
"go.sour.is/xt/internal/otel" "go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt" "go.yarn.social/lextwt"
"go.yarn.social/types" "go.yarn.social/types"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
func run(c *console) error { func run(ctx context.Context, c *console.C[args]) error {
ctx, span := otel.Span(c.Context) ctx, span := otel.Span(ctx)
defer span.End() defer span.End()
bi, _ := debug.ReadBuildInfo() bi, _ := debug.ReadBuildInfo()
@ -33,8 +32,8 @@ func run(c *console) error {
a := c.Args() a := c.Args()
app := &appState{ app := &appState{
args: a, args: a,
feeds: sync.Map{}, C: c,
queue: FibHeap(func(a, b *Feed) bool { queue: FibHeap(func(a, b *Feed) bool {
return a.NextScanOn.Time.Before(b.NextScanOn.Time) return a.NextScanOn.Time.Before(b.NextScanOn.Time)
}), }),
@ -96,25 +95,24 @@ func run(c *console) error {
} }
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
c.Context = ctx
wg.Go(func() error { wg.Go(func() error {
return feedRefreshProcessor(c, app) return feedRefreshProcessor(ctx, app)
}) })
go httpServer(c, app) go httpServer(ctx, app)
err = wg.Wait() err = wg.Wait()
if err != nil { if err != nil {
return err return err
} }
return c.Context.Err() return ctx.Err()
} }
type appState struct { type appState struct {
args args args args
feeds sync.Map
queue *fibHeap[Feed] queue *fibHeap[Feed]
*console.C[args]
} }
type db struct { type db struct {
@ -173,23 +171,3 @@ func (app *appState) DB(ctx context.Context) (db, error) {
return db, err return db, err
} }
func (app *appState) Feed(feedID string) *Feed {
return nil
}
func (app *appState) Feeds() iter.Seq2[string, *Feed] {
return func(yield func(string, *Feed) bool) {
app.feeds.Range(func(k, v any) bool {
key, _ := k.(string)
value, ok := v.(*Feed)
if !ok {
return false
}
if !yield(key, value) {
return false
}
return true
})
}
}

14
feed.go
View File

@ -134,6 +134,10 @@ var (
'+'||abs(refresh_rate+cast(random()%30 as int))||' seconds' '+'||abs(refresh_rate+cast(random()%30 as int))||' seconds'
) < datetime(current_timestamp, '+3 minutes') ) < datetime(current_timestamp, '+3 minutes')
` `
permaban = []string{
"//lublin.se/",
"//enotty.dk/",
}
) )
func (f *Feed) Create(ctx context.Context, db db) error { func (f *Feed) Create(ctx context.Context, db db) error {
@ -370,11 +374,10 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
} }
func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) { func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
if strings.Contains(feed.URI, "lublin.se") { for _, host := range permaban {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI) if strings.Contains(feed.URI, host) {
} return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
if strings.Contains(feed.URI, "enotty.dk") { }
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
} }
req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil) req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil)
@ -392,6 +395,7 @@ func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
req.Header.Add("If-None-Match", feed.ETag.String) req.Header.Add("If-None-Match", feed.ETag.String)
} }
// TODO: this is probably not needed.
if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" { if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" {
req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)", req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick)) feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick))

71
http.go
View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -10,14 +11,16 @@ import (
"strings" "strings"
"time" "time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.yarn.social/lextwt" "go.yarn.social/lextwt"
"go.yarn.social/types" "go.yarn.social/types"
"go.sour.is/xt/internal/otel" "go.sour.is/xt/internal/otel"
) )
func httpServer(c *console, app *appState) error { func httpServer(ctx context.Context, app *appState) error {
ctx, span := otel.Span(c) ctx, span := otel.Span(ctx)
defer span.End() defer span.End()
span.AddEvent("start http server") span.AddEvent("start http server")
@ -110,11 +113,11 @@ func httpServer(c *console, app *appState) error {
defer span.End() defer span.End()
args := make([]any, 0, 3) args := make([]any, 0, 3)
uriarg := "" uriarg := "1 = 1"
uri := r.URL.Query().Get("uri") uri := r.URL.Query().Get("uri")
if uri != "" { if uri != "" {
feed_id := urlNS.UUID5(uri) feed_id := urlNS.UUID5(uri)
uriarg = "and feed_id = ?" uriarg = "feed_id = ?"
args = append(args, feed_id) args = append(args, feed_id)
} }
@ -123,11 +126,39 @@ func httpServer(c *console, app *appState) error {
limit = v limit = v
} }
offset := 0 var offset int64 = 0
if v, ok := strconv.Atoi(r.URL.Query().Get("offset")); ok == nil { if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v offset = v
} }
args = append(args, limit, offset)
var end int64
err = db.QueryRowContext(ctx, `
select count(*) n from twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
where state not in ('frozen', 'permanantly-dead') and `+uriarg+`
) using (feed_id)
where `+uriarg, args...).Scan(&end)
span.RecordError(err)
if offset < 1 {
offset += end
}
limit = min(100, max(1, limit))
offset = max(1, offset)
args = append(args, limit, offset-int64(limit))
span.AddEvent("twts", trace.WithAttributes(
attribute.Int("limit", limit),
attribute.Int64("offset-end", offset),
attribute.Int64("offset-start", offset-int64(limit)),
attribute.Int64("max", end),
))
w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("Content-Type", "text/plain; charset=utf-8")
rows, err := db.QueryContext( rows, err := db.QueryContext(
ctx, ctx,
@ -145,10 +176,9 @@ func httpServer(c *console, app *appState) error {
nick, nick,
uri uri
FROM feeds FROM feeds
where state not in ('frozen', 'permanantly-dead') where state not in ('frozen', 'permanantly-dead') and `+uriarg+`
`+uriarg+`
) using (feed_id) ) using (feed_id)
order by ulid desc order by ulid asc
limit ? limit ?
offset ? offset ?
`, args..., `, args...,
@ -182,10 +212,14 @@ func httpServer(c *console, app *appState) error {
} }
var preamble lextwt.Comments var preamble lextwt.Comments
preamble = append(preamble, lextwt.NewComment("# I am the Watcher. I am your guide through this vast new twtiverse.")) preamble = append(preamble, lextwt.NewComment("# I am the Watcher. I am your guide through this vast new twtiverse."))
preamble = append(preamble, lextwt.NewComment(fmt.Sprint("# range = 1 ", end)))
preamble = append(preamble, lextwt.NewComment("# self = /api/plain/twts"+mkqry(uri, limit, offset))) preamble = append(preamble, lextwt.NewComment("# self = /api/plain/twts"+mkqry(uri, limit, offset)))
preamble = append(preamble, lextwt.NewComment("# next = /api/plain/twts"+mkqry(uri, limit, offset+len(twts)))) if next := offset + int64(len(twts)); next < end {
if offset > 0 { preamble = append(preamble, lextwt.NewComment("# next = /api/plain/twts"+mkqry(uri, limit, next)))
preamble = append(preamble, lextwt.NewComment("# prev = /api/plain/twts"+mkqry(uri, limit, offset-limit))) }
if prev := offset - int64(limit); prev > 0 {
preamble = append(preamble, lextwt.NewComment("# prev = /api/plain/twts"+mkqry(uri, limit, prev)))
} }
reg := lextwt.NewTwtRegistry(preamble, twts) reg := lextwt.NewTwtRegistry(preamble, twts)
@ -221,7 +255,7 @@ func httpServer(c *console, app *appState) error {
) using (feed_id) ) using (feed_id)
`+where+` `+where+`
order by nick, uri order by nick, uri
`,args..., `, args...,
) )
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
@ -268,7 +302,7 @@ func httpServer(c *console, app *appState) error {
Handler: http.DefaultServeMux, Handler: http.DefaultServeMux,
} }
c.AddCancel(srv.Shutdown) app.AddCancel(srv.Shutdown)
err = srv.ListenAndServe() err = srv.ListenAndServe()
if !errors.Is(err, http.ErrServerClosed) { if !errors.Is(err, http.ErrServerClosed) {
span.RecordError(err) span.RecordError(err)
@ -287,20 +321,17 @@ func notAny(s string, chars string) bool {
return true return true
} }
func mkqry(uri string, limit int, offset int64) string {
func mkqry(uri string, limit, offset int) string {
qry := make([]string, 0, 3) qry := make([]string, 0, 3)
if uri != "" { if uri != "" {
qry = append(qry, "uri=" + uri) qry = append(qry, "uri="+uri)
} }
limit = min(100, max(1, limit))
if limit != 100 { if limit != 100 {
qry = append(qry, fmt.Sprint("limit=", limit)) qry = append(qry, fmt.Sprint("limit=", limit))
} }
offset = max(0, offset)
if offset != 0 { if offset != 0 {
qry = append(qry, fmt.Sprint("offset=", offset)) qry = append(qry, fmt.Sprint("offset=", offset))
} }

View File

@ -0,0 +1,61 @@
package console
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"time"
)
type C[A any] struct {
io.Reader
io.Writer
err io.Writer
args A
abort func()
cancelfns []func(context.Context) error
}
func New[A any](args A) (context.Context, *C[A]) {
ctx := context.Background()
ctx, abort := context.WithCancel(ctx)
ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
go func() { <-ctx.Done(); stop() }() // restore interrupt function
console := &C[A]{Reader: os.Stdin, Writer: os.Stdout, err: os.Stderr, args: args, abort: abort}
return ctx, console
}
func (c *C[A]) Args() A {
return c.args
}
func (c *C[A]) Shutdown() error {
fmt.Fprintln(c.err, "shutting down ", len(c.cancelfns), " cancel functions...")
defer fmt.Fprintln(c.err, "done")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c.abort()
var err error
for _, fn := range c.cancelfns {
err = errors.Join(err, fn(ctx))
}
return err
}
func (c *C[A]) AddCancel(fn func(context.Context) error) { c.cancelfns = append(c.cancelfns, fn) }
func (c *C[A]) IfFatal(err error) {
if err == nil {
return
}
fmt.Fprintln(c.err, err)
err = c.Shutdown()
if err != nil {
fmt.Fprintln(c.err, err)
}
os.Exit(1)
}

54
internal/env/env.go vendored Normal file
View File

@ -0,0 +1,54 @@
package env
import (
"bufio"
"os"
"strings"
)
func Default(key, def string) string {
if v, ok := os.LookupEnv(key); ok {
return v
}
return def
}
type secret struct {
value string
}
func (s secret) Secret() string {
return s.value
}
func (s secret) String() string {
return "***"
}
func Secret(key, def string) secret {
if v, ok := os.LookupEnv(key); ok {
return secret{v}
}
return secret{def}
}
func DotEnv() {
fd, err := os.Open(".env")
if err != nil {
return
}
scan := bufio.NewScanner(fd)
for scan.Scan() {
line := scan.Text()
if strings.HasPrefix(line, "#") {
continue
}
key, val, ok := strings.Cut(line, "=")
if !ok {
continue
}
os.Setenv(strings.TrimSpace(key), strings.TrimSpace(val))
}
}

141
main.go
View File

@ -1,111 +1,16 @@
package main package main
import ( import (
"bufio"
"context" "context"
"errors" "errors"
"fmt"
"io"
"os"
"os/signal"
"strings"
"go.opentelemetry.io/otel/metric" "go.sour.is/xt/internal/console"
"go.sour.is/xt/internal/env"
"go.sour.is/xt/internal/otel" "go.sour.is/xt/internal/otel"
) )
const name = "go.sour.is/xt" const name = "go.sour.is/xt"
var m_up metric.Int64Gauge
func main() {
dotEnv() // load .env
ctx, console := newConsole(args{
dbtype: env("XT_DBTYPE", "sqlite3"),
dbfile: env("XT_DBFILE", "file:twt.db"),
baseFeed: env("XT_BASE_FEED", "feed"),
Nick: env("XT_NICK", "xuu"),
URI: env("XT_URI", "https://txt.sour.is/user/xuu/twtxt.txt"),
Listen: env("XT_LISTEN", ":8080"),
})
finish, err := otel.Init(ctx, name)
console.IfFatal(err)
console.AddCancel(finish)
m_up, err = otel.Meter().Int64Gauge("up")
console.IfFatal(err)
m_up.Record(ctx, 1)
defer m_up.Record(context.Background(), 0)
err = run(console)
if !errors.Is(err, context.Canceled) {
console.IfFatal(err)
}
}
type console struct {
io.Reader
io.Writer
err io.Writer
context.Context
abort func()
cancelfns []func(context.Context) error
}
func newConsole(args args) (context.Context, *console) {
ctx := context.Background()
ctx, abort := context.WithCancel(ctx)
ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
go func() { <-ctx.Done(); stop() }() // restore interrupt function
console := &console{Reader: os.Stdin, Writer: os.Stdout, err: os.Stderr, Context: ctx, abort: abort}
console.Set("console", console)
console.Set("args", args)
return ctx, console
}
func (c *console) Args() args {
v, ok := c.Get("args").(args)
if !ok {
return args{}
}
return v
}
func (c *console) Shutdown() error {
fmt.Fprintln(c.err, "shutting down ", len(c.cancelfns), " cancel functions...")
defer fmt.Fprintln(c.err, "done")
c.abort()
var err error
for _, fn := range c.cancelfns {
err = errors.Join(err, fn(c.Context))
}
return err
}
func (c *console) AddCancel(fn func(context.Context) error) { c.cancelfns = append(c.cancelfns, fn) }
func (c *console) IfFatal(err error) {
if err == nil {
return
}
fmt.Fprintln(c.err, err)
c.abort()
os.Exit(1)
}
type contextKey struct{ name string }
func (c *console) Set(name string, value any) {
c.Context = context.WithValue(c.Context, contextKey{name}, value)
}
func (c *console) Get(name string) any {
return c.Context.Value(contextKey{name})
}
type args struct { type args struct {
dbtype string dbtype string
dbfile string dbfile string
@ -115,32 +20,30 @@ type args struct {
Listen string Listen string
} }
func env(key, def string) string { func main() {
if v, ok := os.LookupEnv(key); ok { env.DotEnv() // load .env
return v
}
return def
}
func dotEnv() { ctx, console := console.New(args{
fd, err := os.Open(".env") dbtype: env.Default("XT_DBTYPE", "sqlite3"),
if err != nil { dbfile: env.Default("XT_DBFILE", "file:twt.db"),
return baseFeed: env.Default("XT_BASE_FEED", "feed"),
} Nick: env.Default("XT_NICK", "xuu"),
URI: env.Default("XT_URI", "https://txt.sour.is/user/xuu/twtxt.txt"),
Listen: env.Default("XT_LISTEN", ":8080"),
})
scan := bufio.NewScanner(fd) finish, err := otel.Init(ctx, name)
console.IfFatal(err)
console.AddCancel(finish)
for scan.Scan() { m_up, err := otel.Meter().Int64Gauge("up")
line := scan.Text() console.IfFatal(err)
if strings.HasPrefix(line, "#") { m_up.Record(ctx, 1)
continue defer m_up.Record(context.Background(), 0)
}
key, val, ok := strings.Cut(line, "=")
if !ok {
continue
}
os.Setenv(strings.TrimSpace(key), strings.TrimSpace(val)) err = run(ctx, console)
if !errors.Is(err, context.Canceled) {
console.IfFatal(err)
} }
} }

View File

@ -26,8 +26,8 @@ const (
TwoMinutes = 120 TwoMinutes = 120
) )
func feedRefreshProcessor(c *console, app *appState) error { func feedRefreshProcessor(ctx context.Context, app *appState) error {
ctx, span := otel.Span(c.Context) ctx, span := otel.Span(ctx)
defer span.End() defer span.End()
sleeping_time, _ := otel.Meter().Int64Counter("xt_feed_sleep") sleeping_time, _ := otel.Meter().Int64Counter("xt_feed_sleep")
@ -38,7 +38,7 @@ func feedRefreshProcessor(c *console, app *appState) error {
fetch, close := NewFuncPool(ctx, 40, f.Fetch) fetch, close := NewFuncPool(ctx, 40, f.Fetch)
defer close() defer close()
db, err := app.DB(c) db, err := app.DB(ctx)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
return err return err
@ -69,7 +69,7 @@ func feedRefreshProcessor(c *console, app *appState) error {
span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TwoMinutes)))) span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TwoMinutes))))
select { select {
case <-time.After(TwoMinutes * time.Second): case <-time.After(TwoMinutes * time.Second):
case <-c.Done(): case <-ctx.Done():
return nil return nil
} }
span.End() span.End()