From 1574c48429b9f2c30a013371928f5e1e5518715d Mon Sep 17 00:00:00 2001
From: xuu
Date: Sat, 15 Feb 2025 16:12:42 -0700
Subject: [PATCH] chore: refactor app harness

---
 about.me        |  84 ++++++++++
 feed.go         | 248 ++++++++++++++++++++++++++++-
 fetcher.go      |  98 ++++++++----
 fibheap.go      |  31 +++-
 go.mod          |  10 ++
 go.sum          |  19 +++
 http.go         |  97 ++++++++++++
 main.go         |  40 +++--
 refresh-loop.go | 156 +++++++++++++++
 service.go      | 405 +++++++++----------------------------------
 service_test.go |  14 ++
 11 files changed, 824 insertions(+), 378 deletions(-)
 create mode 100644 about.me
 create mode 100644 http.go
 create mode 100644 refresh-loop.go
 create mode 100644 service_test.go

diff --git a/about.me b/about.me
new file mode 100644
index 0000000..dcf15fa
--- /dev/null
+++ b/about.me
@@ -0,0 +1,84 @@
+.ce
+Preamble
+.sp
+We, the people of the United States, in order
+to form a more perfect Union, establish justice, insure
+domestic tranquility, provide for the common defense, promote
+the general welfare,
+and secure the blessing of liberty to ourselves and our
+posterity do ordain and establish this Constitution for the
+United States of America.
+.sp
+.nr aR 0 1
+.af aR I
+.de AR
+.ce
+.ps 16
+.ft B
+Article \\n+(aR
+.nr sE 0 1
+.af sE 1
+.ps 12
+.ft P
+..
+.de SE
+.sp
+.ft B
+\\s-2SECTION \\n+(sE:\\s+2
+.ft P
+.nr pP 0 1
+.af pP 1
+..
+.de PP
+.sp
+.ft I
+\\s-3Paragraph \\n+(pP:\\s+3
+.ft P
+..
+.AR
+.SE
+Legislative powers; in whom vested:
+.PP
+All legislative powers herein granted shall be vested in a
+Congress of the United States, which shall consist of a Senate
+and a House of Representatives.
+.SE
+House of Representatives, how and by whom chosen, Qualifications
+of a Representative. Representatives and direct taxes, how
+apportioned. Enumeration. Vacancies to be filled. Power of
+choosing officers and of impeachment.
+.PP
+The House of Representatives shall be composed of members chosen
+every second year by the people of the several states, and the
+electors in each State shall have the qualifications requisite
+for electors of the most numerous branch of the State Legislature.
+.PP
+No person shall be a Representative who shall not have attained
+to the age of twenty-five years, and been seven years a citizen
+of the United States, and who shall not, when elected, be an
+inhabitant of that State in which he shall be chosen.
+.PP
+Representatives and direct taxes shall be apportioned among the
+several States which maybe included within this Union, according
+to their respective numbers, which shall be determined by adding
+to the whole number of free persons, including those bound for
+service for a term of years, and excluding Indians not taxed,
+three-fifths of all other persons. The actual enumeration shall
+be made within three years after the first meeting of the
+Congress of the United States, and within every subsequent term
+of ten years, in such manner as they shall by law direct. The
+number of Representatives shall not exceed one for every thirty
+thousand, but each State shall have at least one Representative;
+and until such enumeration shall be made, the State of New
+Hampshire shall be entitled to choose three, Massachusetts eight,
+Rhode Island and Providence Plantations one, Connecticut
+five, New York six, New Jersey four, Pennsylvania eight,
+Delaware one, Maryland six, Virginia ten, North Carolina five,
+South Carolina five, and Georgia three.
+.PP
+When vacancies happen in the representation from any State, the
+Executive Authority thereof shall issue writs of election to fill
+such vacancies.
+.PP
+The House of Representatives shall choose their Speaker and other
+officers; and shall have the sole power of impeachment.
\ No newline at end of file
diff --git a/feed.go b/feed.go
index ea0be6e..51a0d8a 100644
--- a/feed.go
+++ b/feed.go
@@ -1,8 +1,21 @@
 package main

 import (
+    "cmp"
     "context"
     "database/sql"
+    "fmt"
+    "iter"
+    "net/http"
+    "net/url"
+    "slices"
+    "strings"
+    "time"
+
+    _ "embed"
+
+    "go.yarn.social/lextwt"
+    "go.yarn.social/types"
 )

 type Feed struct {
@@ -17,13 +30,67 @@ type Feed struct {
     LastError sql.NullString
     ETag      sql.NullString

+    Version         string
     DiscloseFeedURL string
     DiscloseNick    string
+    FirstFetch      bool

-    Version string
+    State State
 }

+type State string
+
+const (
+    PermanentlyDead State = "permanently-dead"
+    Frozen          State = "frozen"
+    Cold            State = "cold"
+    Warm            State = "warm"
+    Hot             State = "hot"
+)
+
+var (
+    //go:embed init.sql
+    initSQL string
+
+    insertFeed = `
+    insert into feeds
+    (feed_id, uri, nick, last_scan_on, refresh_rate)
+    values (?, ?, ?, ?, ?)
+    ON CONFLICT (feed_id) DO NOTHING
+    `
+
+    insertTwt = `
+    insert into twts
+    (feed_id, hash, conv, dt, text, mentions, tags)
+    values (?, ?, ?, ?, ?, ?, ?)
+    ON CONFLICT (feed_id, hash) DO NOTHING
+    `
+
+    fetchFeeds = `
+    select
+      feed_id,
+      uri,
+      nick,
+      last_scan_on,
+      refresh_rate,
+      last_modified_on,
+      last_etag
+    from feeds
+    where datetime(last_scan_on, '+'||refresh_rate||' seconds') < datetime(current_timestamp, '+10 minutes')
+    `
+    updateFeed = `
+    update feeds set
+      last_scan_on = ?,
+      refresh_rate = ?,
+      last_modified_on = ?,
+      last_etag = ?,
+      last_error = ?
+    where feed_id = ?
+    `
+)
+
 func (f *Feed) Save(ctx context.Context, db *sql.DB) error {
+    fmt.Println(f.FetchURI, " ", f.LastModified, " ", f.LastError)
     _, err := db.ExecContext(
         ctx,
         updateFeed,
@@ -36,3 +103,182 @@ func (f *Feed) Save(ctx context.Context, db *sql.DB) error {
     )
     return err
 }
+
+func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
+    f.State = "load"
+    var err error
+
+    f.Version = "0.0.1"
+    err = res.Scan(
+        &f.FeedID,
+        &f.URI,
+        &f.Nick,
+        &f.LastScanOn,
+        &f.RefreshRate,
+        &f.LastModified,
+        &f.ETag,
+    )
+    if err != nil {
+        return err
+    }
+
+    if !f.LastScanOn.Valid {
+        f.FirstFetch = true
+        f.LastScanOn.Time = time.Now()
+        f.LastScanOn.Valid = true
+    } else {
+        f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
+    }
+
+    f.FetchURI = f.URI
+    return err
+}
+
+func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) {
+    var err error
+    var res *sql.Rows
+
+    res, err = db.QueryContext(ctx, fetchFeeds)
+    if err != nil {
+        return slices.Values([]Feed{}), err
+    }
+
+    return func(yield func(Feed) bool) {
+        for res.Next() {
+            var f Feed
+            err = f.Scan(res)
+            if err != nil {
+                return
+            }
+            if !yield(f) {
+                return
+            }
+        }
+    }, err
+}
+
+func storeFeed(db *sql.DB, f types.TwtFile) error {
+    loadTS := time.Now()
+    refreshRate := 600
+
+    feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
+
+    tx, err := db.Begin()
+    if err != nil {
+        return err
+    }
+
+    followers := f.Info().GetAll("follow")
+    followMap := make(map[string]string, len(followers))
+    for _, f := range f.Info().GetAll("follow") {
+        nick, uri, ok := strings.Cut(f.Value(), "http")
+        if !ok {
+            continue
+        }
+        nick = strings.TrimSpace(nick)
+        uri = "http" + strings.TrimSpace(uri)
+
+        if _, err := url.Parse(uri); err != nil {
+            continue
+        }
+
+        followMap[nick] = uri
+    }
+
+    defer tx.Rollback()
+
+    _, err = tx.Exec(
+        insertFeed,
+        feedID,
+        f.Twter().HashingURI,
+        f.Twter().DomainNick(),
+        loadTS,
+        refreshRate,
+    )
+    if err != nil {
+        return err
+    }
+
+    for _, twt := range f.Twts() {
+        mentions := make(uuids, 0, len(twt.Mentions()))
+        for _, mention := range twt.Mentions() {
+            followMap[mention.Twter().Nick] = mention.Twter().URI
+            mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
+        }
+
+        tags := make(strList, 0, len(twt.Tags()))
+        for _, tag := range twt.Tags() {
+            tags = append(tags, tag.Text())
+        }
+
+        subject := twt.Subject()
+        subjectTag := ""
+        if subject != nil {
+            if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
+                subjectTag = tag.Text()
+            }
+        }
+
+        _, err = tx.Exec(
+            insertTwt,
+            feedID,
+            twt.Hash(),
+            subjectTag,
+            twt.Created(),
+            fmt.Sprint(twt),
+            mentions.ToStrList(),
+            tags,
+        )
+        if err != nil {
+            return err
+        }
+    }
+
+    for nick, uri := range followMap {
+        _, err = tx.Exec(
+            insertFeed,
+            urlNS.UUID5(uri),
+            uri,
+            nick,
+            nil,
+            refreshRate,
+        )
+        if err != nil {
+            return err
+        }
+    }
+
+    return tx.Commit()
+}
+
+func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
+    feed.State = "fetch"
+    if strings.Contains(feed.FetchURI, "lublin.se") {
+        return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
+    }
+
+    req, err := http.NewRequestWithContext(ctx, "GET", feed.FetchURI, nil)
+    if err != nil {
+        return nil, fmt.Errorf("creating HTTP request failed: %w", err)
+    }
+
+    req.Header.Add("Accept", "text/plain")
+
+    if feed.LastModified.Valid {
+        req.Header.Add("If-Modified-Since", feed.LastModified.Time.Format(http.TimeFormat))
+    }
+
+    if feed.ETag.Valid {
+        req.Header.Add("If-None-Match", feed.ETag.String)
+    }
+
+    if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" {
+        req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
+            feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick))
+    } else {
+        req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", feed.Version))
+    }
+
+    return req, nil
+}
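Usage sketch (not part of the patch): how the conditional-request headers built by MakeHTTPRequest come out once a feed has been fetched before. It assumes Feed.LastModified exposes .Time/.Valid as used above; the URI, ETag, and times are made up.

// sketch_feed_test.go — illustrative only.
package main

import (
	"context"
	"database/sql"
	"testing"
	"time"

	"github.com/matryer/is"
)

// TestMakeHTTPRequestConditional shows a cached ETag and Last-Modified value
// turning into If-None-Match / If-Modified-Since headers. A 304 answer to
// such a request is mapped to ErrUnmodified by the fetcher.
func TestMakeHTTPRequestConditional(t *testing.T) {
	is := is.New(t)

	f := &Feed{Version: "0.0.1", FetchURI: "https://example.com/twtxt.txt"}
	f.ETag = sql.NullString{String: `"abc123"`, Valid: true}
	f.LastModified.Time, f.LastModified.Valid = time.Now().Add(-time.Hour), true

	req, err := f.MakeHTTPRequest(context.Background())
	is.NoErr(err)
	is.Equal(req.Header.Get("If-None-Match"), `"abc123"`) // cached validator is replayed
	is.True(req.Header.Get("If-Modified-Since") != "")    // formatted as an HTTP-date
}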
diff --git a/fetcher.go b/fetcher.go
index cb5ffb1..f4118ff 100644
--- a/fetcher.go
+++ b/fetcher.go
@@ -6,7 +6,7 @@ import (
     "fmt"
     "net"
     "net/http"
-    "strings"
+    "sync"
     "time"
 )

@@ -18,7 +18,9 @@ var (
 )

 type Response struct {
+    Request *Feed
     *http.Response
+    err error
 }

 func (r *Response) ETag() string {
@@ -67,59 +69,87 @@ func NewHTTPFetcher() *httpFetcher {
     }
 }

-func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) (*Response, error) {
-    if strings.Contains(request.FetchURI, "lublin.se") {
-        return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, request.URI)
+func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
+    response := &Response{
+        Request: request,
     }

-    req, err := http.NewRequestWithContext(ctx, "GET", request.FetchURI, nil)
+    req, err := request.MakeHTTPRequest(ctx)
     if err != nil {
-        return nil, fmt.Errorf("creating HTTP request failed: %w", err)
-    }
-
-    req.Header.Add("Accept", "text/plain")
-
-    if !request.LastModified.Valid {
-        req.Header.Add("If-Modified-Since", request.LastModified.Time.Format(http.TimeFormat))
-    }
-
-    if request.ETag.Valid {
-        req.Header.Add("If-None-Match", request.ETag.String)
-    }
-
-    if request.DiscloseFeedURL != "" && request.DiscloseNick != "" {
-        req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
-            request.Version, request.DiscloseFeedURL, request.DiscloseNick))
-    } else {
-        req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", request.Version))
+        response.err = err
+        return response
     }

     res, err := f.client.Do(req)
     if err != nil {
         if errors.Is(err, &net.DNSError{}) {
-            return nil, fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
+            response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
+            return response
         }

-        return nil, fmt.Errorf("%w: %w", ErrTemporarilyDead, err)
-    }
-
-    response := &Response{
-        Response: res,
+        response.err = fmt.Errorf("%w: %w", ErrTemporarilyDead, err)
+        return response
     }

+    response.Response = res
     switch res.StatusCode {
     case 200:
-        return response, nil
     case 304:
-        return response, fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
+        response.err = fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
     case 400, 406, 502, 503:
-        return response, fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
+        response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
     case 403, 404, 410:
-        return response, fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
+        response.err = fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
     default:
-        return response, errors.New(res.Status)
+        response.err = errors.New(res.Status)
     }
+
+    return response
+}
+
+type pool[IN, OUT any] struct {
+    in  chan IN
+    out chan OUT
+    Err error
+}
+
+func NewFuncPool[IN, OUT any](
+    ctx context.Context,
+    size int,
+    fetch func(ctx context.Context, request IN) OUT,
+) (*pool[IN, OUT], func()) {
+    var wg sync.WaitGroup
+
+    in := make(chan IN, size)
+    out := make(chan OUT, size)
+
+    wg.Add(size)
+    for range size {
+        go func() {
+            defer wg.Done()
+            for request := range in {
+                select {
+                case <-ctx.Done():
+                    return
+                case out <- fetch(ctx, request):
+                }
+            }
+        }()
+    }
+
+    return &pool[IN, OUT]{
+        in:  in,
+        out: out,
+    }, func() { close(in); wg.Wait(); close(out) }
+}
+
+func (f *pool[IN, OUT]) Fetch(request IN) {
+    f.in <- request
+}
+
+func (f *pool[IN, OUT]) Out() <-chan OUT {
+    return f.out
+}
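Usage sketch (not part of the patch): how a caller of the reworked Fetch is expected to consume the error now carried inside Response, using errors.Is against the sentinel errors to pick a back-off, the same way refresh-loop.go does. The feed value is hypothetical.

// sketch_fetch_errors.go — illustrative only.
package main

import (
	"context"
	"errors"
	"fmt"
)

// sketchClassify maps a fetch result onto the refresh-rate decisions used by
// the refresh loop.
func sketchClassify(ctx context.Context, f *httpFetcher, feed *Feed) {
	res := f.Fetch(ctx, feed)

	switch {
	case res.err == nil:
		fmt.Println("fresh body: parse and store it")
	case errors.Is(res.err, ErrUnmodified):
		fmt.Println("304: cached copy still valid, back off for a day")
	case errors.Is(res.err, ErrTemporarilyDead):
		fmt.Println("transient failure, back off for a day")
	case errors.Is(res.err, ErrPermanentlyDead):
		fmt.Println("gone for good, park the feed for ten years")
	default:
		fmt.Println("unexpected status:", res.err)
	}
}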
diff --git a/fibheap.go b/fibheap.go
index c353e73..01446e9 100644
--- a/fibheap.go
+++ b/fibheap.go
@@ -1,6 +1,9 @@
 package main

-import "math/bits"
+import (
+    "iter"
+    "math/bits"
+)

 type fibTree[T any] struct {
     value *T
@@ -14,6 +17,20 @@ func (t *fibTree[T]) addAtEnd(n *fibTree[T]) {
     n.parent = t
     t.child = append(t.child, n)
 }
+func (t *fibTree[T]) Iter() iter.Seq[*T] {
+    return func(yield func(*T) bool) {
+        if !yield(t.value) {
+            return
+        }
+        for _, tree := range t.child {
+            for v := range tree.Iter() {
+                if !yield(v) {
+                    return
+                }
+            }
+        }
+    }
+}

 type fibHeap[T any] struct {
     trees []*fibTree[T]
@@ -183,3 +200,15 @@ func (h *fibHeap[T]) cascadingCut(y *fibTree[T]) {
         h.cascadingCut(y.parent)
     }
 }
+
+func (b *fibHeap[T]) Iter() iter.Seq[*T] {
+    return func(yield func(*T) bool) {
+        for _, tree := range b.trees {
+            for v := range tree.Iter() {
+                if !yield(v) {
+                    return
+                }
+            }
+        }
+    }
+}
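Usage sketch (not part of the patch): how the refresh loop and the /feeds handler use the Fibonacci heap as a priority queue keyed on the next scan time, and how the new Iter method walks whatever is still queued. It assumes Feed.LastScanOn exposes .Time/.Valid as used elsewhere; the URIs are made up.

// sketch_heap.go — illustrative only.
package main

import (
	"fmt"
	"time"
)

func sketchQueue() {
	// Order feeds by when they are next due, soonest first.
	queue := FibHeap(func(a, b *Feed) bool {
		return a.LastScanOn.Time.Before(b.LastScanOn.Time)
	})

	for i, uri := range []string{"https://example.com/a.txt", "https://example.com/b.txt"} {
		f := &Feed{URI: uri}
		f.LastScanOn.Time = time.Now().Add(time.Duration(i) * time.Minute)
		f.LastScanOn.Valid = true
		queue.Insert(f)
	}

	next := queue.ExtractMin() // the most overdue feed
	fmt.Println("next:", next.URI)

	for f := range queue.Iter() { // everything still queued, in heap order
		fmt.Println("queued:", f.URI)
	}
}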
diff --git a/go.mod b/go.mod
index ee229bd..358000c 100644
--- a/go.mod
+++ b/go.mod
@@ -7,11 +7,21 @@ require (
     go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51
 )

+require (
+    github.com/go-logr/logr v1.4.2 // indirect
+    github.com/go-logr/stdr v1.2.2 // indirect
+    go.opentelemetry.io/auto/sdk v1.1.0 // indirect
+    go.opentelemetry.io/otel/metric v1.34.0 // indirect
+    go.opentelemetry.io/otel/trace v1.34.0 // indirect
+)
+
 require (
     github.com/hashicorp/errwrap v1.1.0 // indirect
     github.com/hashicorp/go-multierror v1.1.1 // indirect
+    github.com/matryer/is v1.4.1
     github.com/sirupsen/logrus v1.9.3 // indirect
     github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect
+    go.opentelemetry.io/otel v1.34.0
     go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8
     golang.org/x/crypto v0.27.0 // indirect
     golang.org/x/sys v0.25.0 // indirect
diff --git a/go.sum b/go.sum
index 8cd4feb..0f9aef5 100644
--- a/go.sum
+++ b/go.sum
@@ -1,10 +1,19 @@
+github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
+github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
+github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
 github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
 github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
 github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
+github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ=
+github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
 github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
 github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -14,6 +23,16 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/writeas/go-strip-markdown/v2 v2.1.1 h1:hAxUM21Uhznf/FnbVGiJciqzska6iLei22Ijc3q2e28=
 github.com/writeas/go-strip-markdown/v2 v2.1.1/go.mod h1:UvvgPJgn1vvN8nWuE5e7v/+qmDu3BSVnKAB6Gl7hFzA=
+go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
+go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
+go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
+go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
+go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
+go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
+go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
+go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
+go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
+go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
 go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51 h1:XEjx0jSNv1h22gwGfQBfMypWv/YZXWGTRbqh3B8xfIs=
 go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51/go.mod h1:CWAZuBHZfGaqa0FreSeLG+pzK3rHP2TNAG7Zh6QlRiM=
 go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 h1:zfnniiSO/WO65mSpdQzGYJ9pM0rYg/BKgrSm8h2mTyA=
diff --git a/http.go b/http.go
new file mode 100644
index 0000000..93514a9
--- /dev/null
+++ b/http.go
@@ -0,0 +1,97 @@
+package main
+
+import (
+    "context"
+    "fmt"
+    "net/http"
+    "slices"
+    "sort"
+    "strings"
+    "time"
+)
+
+func httpServer(c console, app *appState) {
+    c.Log("start http server")
+
+    db, err := app.DB()
+    if err != nil {
+        c.Log("missing db", err)
+        c.abort()
+        return
+    }
+
+    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+        w.Header().Set("Content-Type", "text/html; charset=utf-8")
+        w.Write([]byte("ok"))
+    })
+
+    http.HandleFunc("/conv/{hash}", func(w http.ResponseWriter, r *http.Request) {
+        hash := r.PathValue("hash")
+        if (len(hash) < 6 || len(hash) > 8) && !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") {
+            w.WriteHeader(http.StatusBadRequest)
+            return
+        }
+
+        w.Header().Set("Content-Type", "text/html; charset=utf-8")
+        w.Write([]byte(hash))
+
+        rows, err := db.QueryContext(r.Context(), "SELECT feed_id, hash, conv, dt, text FROM twts WHERE hash = $1 or conv = $1", hash)
+        if err != nil {
+            c.Log(err)
+            return
+        }
+        defer rows.Close()
+
+        for rows.Next() {
+            var twt struct {
+                FeedID string
+                Hash   string
+                Conv   string
+                Dt     time.Time
+                Text   string
+            }
+            err = rows.Scan(&twt.FeedID, &twt.Hash, &twt.Conv, &twt.Dt, &twt.Text)
+            if err != nil {
+                c.Log(err)
+                return
+            }
+        }
+    })
+
+    http.HandleFunc("/feeds", func(w http.ResponseWriter, r *http.Request) {
+        lis := slices.Collect(app.queue.Iter())
+        sort.Slice(lis, func(i, j int) bool {
+            return lis[i].LastScanOn.Time.Before(lis[j].LastScanOn.Time)
+        })
+        for _, feed := range lis {
+            fmt.Fprintln(w, feed.State, feed.LastScanOn.Time.Format(time.RFC3339), feed.Nick, feed.URI)
+        }
+    })
+
+    srv := &http.Server{
+        Addr:    app.args.Listen,
+        Handler: http.DefaultServeMux,
+    }
+
+    go func() {
+        <-c.Done()
+        c.Log("stop http server")
+        srv.Shutdown(context.Background())
+    }()
+
+    err = srv.ListenAndServe()
+    if err != nil {
+        c.Log(err)
+        c.abort()
+        return
+    }
+}
+
+func notAny(s string, chars string) bool {
+    for _, c := range s {
+        if !strings.ContainsRune(chars, c) {
+            return false
+        }
+    }
+    return true
+}
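Usage sketch (not part of the patch): the Go 1.22 ServeMux feature the /conv/{hash} handler relies on. A "{hash}" segment in the route pattern becomes a named path value readable via r.PathValue. The mux, route, and hash below are hypothetical and use only the standard library.

// sketch_http.go — illustrative only.
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

func sketchPathValue() {
	mux := http.NewServeMux()
	mux.HandleFunc("/conv/{hash}", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "hash =", r.PathValue("hash"))
	})

	req := httptest.NewRequest("GET", "/conv/abcdefg", nil)
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, req)
	fmt.Print(rec.Body.String()) // hash = abcdefg
}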
diff --git a/main.go b/main.go
index ac23f25..26e95ef 100644
--- a/main.go
+++ b/main.go
@@ -2,10 +2,20 @@ package main

 import (
     "context"
+    "errors"
     "fmt"
     "io"
     "os"
     "os/signal"
+
+    "go.opentelemetry.io/otel"
+)
+
+const name = "go.sour.is/xt"
+
+var (
+    tracer = otel.Tracer(name)
+    meter  = otel.Meter(name)
 )

 type contextKey struct{ name string }
@@ -19,7 +29,13 @@ type console struct {
 }

 func (c console) Log(v ...any) { fmt.Fprintln(c.err, v...) }
-func (c console) Args() args   { return c.Get("args").(args) }
+func (c console) Args() args {
+    v, ok := c.Get("args").(args)
+    if !ok {
+        return args{}
+    }
+    return v
+}
 func (c *console) Set(name string, value any) {
     c.Context = context.WithValue(c.Context, contextKey{name}, value)
 }
@@ -28,11 +44,12 @@ func (c console) Get(name string) any {
 }

 type args struct {
-    dbtype   string
-    dbfile   string
+    dbtype   string
+    dbfile   string
     baseFeed string
-    Nick     string
-    URI      string
+    Nick     string
+    URI      string
+    Listen   string
 }

 func env(key, def string) string {
@@ -49,16 +66,17 @@ func main() {
     go func() { <-ctx.Done(); console.Log("shutdown"); stop() }()

     args := args{
-        env("XT_DBTYPE", "sqlite3"),
-        env("XT_DBFILE", "file:twt.db"),
-        env("XT_BASE_FEED", "feed"),
-        env("XT_NICK", "xuu"),
-        env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"),
+        dbtype:   env("XT_DBTYPE", "sqlite3"),
+        dbfile:   env("XT_DBFILE", "file:twt.db"),
+        baseFeed: env("XT_BASE_FEED", "feed"),
+        Nick:     env("XT_NICK", "xuu"),
+        URI:      env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"),
+        Listen:   env("XT_LISTEN", ":8040"),
     }

     console.Set("args", args)

-    if err := run(console); err != nil {
+    if err := run(console); err != nil && !errors.Is(err, context.Canceled) {
         fmt.Println(err)
         os.Exit(1)
     }
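Usage sketch (not part of the patch): the value-passing pattern main.go uses around console. Keys are wrapped in the private contextKey struct so they cannot collide with keys from other packages, and the hardened Args() now falls back to a zero args value instead of panicking. The values below are made up.

// sketch_args.go — illustrative only.
package main

import (
	"context"
	"fmt"
)

func sketchArgs(ctx context.Context) {
	// What console.Set does under the hood.
	ctx = context.WithValue(ctx, contextKey{"args"}, args{Nick: "xuu", Listen: ":8040"})

	// What the new console.Args does: a checked assertion with a zero-value fallback.
	a, ok := ctx.Value(contextKey{"args"}).(args)
	if !ok {
		a = args{}
	}
	fmt.Println(a.Nick, a.Listen)
}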
diff --git a/refresh-loop.go b/refresh-loop.go
new file mode 100644
index 0000000..1fe5e30
--- /dev/null
+++ b/refresh-loop.go
@@ -0,0 +1,156 @@
+package main
+
+import (
+    "errors"
+    "fmt"
+    "io"
+    "os"
+    "path/filepath"
+    "strings"
+    "time"
+
+    "go.yarn.social/lextwt"
+    "go.yarn.social/types"
+)
+
+const (
+    TenYear    = 3153600000
+    OneDay     = 86400
+    OneHour    = 3600
+    TenMinutes = 600
+    TwoMinutes = 60
+)
+
+func refreshLoop(c console, app *appState) {
+    defer c.abort()
+
+    f := NewHTTPFetcher()
+    fetch, close := NewFuncPool(c.Context, 25, f.Fetch)
+    defer close()
+
+    db, err := app.DB()
+    if err != nil {
+        c.Log("missing db")
+        c.abort()
+        return
+    }
+
+    queue := app.queue
+
+    c.Log("start refresh loop")
+    for c.Err() == nil {
+        if queue.IsEmpty() {
+            c.Log("load feeds")
+
+            it, err := loadFeeds(c.Context, db)
+            for f := range it {
+                queue.Insert(&f)
+            }
+            if err != nil {
+                c.Log(err)
+                return
+            }
+        }
+
+        f := queue.ExtractMin()
+        if f == nil {
+            c.Log("sleeping for ", TenMinutes*time.Second)
+            select {
+            case <-time.After(TenMinutes * time.Second):
+
+            case <-c.Done():
+                return
+            }
+            continue
+        }
+
+        c.Log("queue size", queue.count, "next", f.URI, "next scan on", f.LastScanOn.Time.Format(time.RFC3339))
+
+        if time.Until(f.LastScanOn.Time) > 2*time.Hour {
+            c.Log("too soon", f.URI)
+            continue
+        }
+
+        select {
+        case <-c.Done():
+            return
+        case t := <-time.After(time.Until(f.LastScanOn.Time)):
+            c.Log("fetch", t.Format(time.RFC3339), f.Nick, f.URI)
+            fetch.Fetch(f)
+        case res := <-fetch.Out():
+            c.Log("got response:", res.Request.URI)
+            f := res.Request
+            f.LastScanOn.Time = time.Now()
+            err := res.err
+            if res.err != nil {
+                f.LastError.String, f.LastError.Valid = err.Error(), true
+                if errors.Is(err, ErrPermanentlyDead) {
+                    f.RefreshRate = TenYear
+                }
+                if errors.Is(err, ErrTemporarilyDead) {
+                    f.RefreshRate = OneDay
+                }
+                if errors.Is(err, ErrUnmodified) {
+                    f.RefreshRate = OneDay
+                }
+
+                c.Log(err)
+                err = f.Save(c.Context, db)
+                if err != nil {
+                    c.Log(err)
+                    return
+                }
+
+                continue
+            }
+
+            f.ETag.String, f.ETag.Valid = res.ETag(), true
+            f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
+
+            cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+            rdr := io.TeeReader(res.Body, cpy)
+
+            twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
+            if err != nil {
+                c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err))
+                f.RefreshRate = OneDay
+                return
+            }
+
+            if prev, ok := twtfile.Info().GetN("prev", 0); f.FirstFetch && ok {
+                _, part, ok := strings.Cut(prev.Value(), " ")
+                if ok {
+                    part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
+                    queue.Insert(&Feed{
+                        FetchURI:    part,
+                        URI:         f.URI,
+                        Nick:        f.Nick,
+                        LastScanOn:  f.LastScanOn,
+                        RefreshRate: f.RefreshRate,
+                    })
+                }
+            }
+
+            err = storeFeed(db, twtfile)
+            if err != nil {
+                c.Log(err)
+
+                err = f.Save(c.Context, db)
+                c.Log(err)
+                return
+            }
+
+            cpy.Close()
+
+            f.LastScanOn.Time = time.Now()
+            f.RefreshRate = TenMinutes
+            f.LastError.String = ""
+
+            err = f.Save(c.Context, db)
+            if err != nil {
+                c.Log(err)
+                return
+            }
+        }
+    }
+}
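Usage sketch (not part of the patch): the URL arithmetic the refresh loop applies to a twtxt "prev" metadata field on first fetch. The second token of the field is resolved relative to the directory of the main feed URL and queued as an extra Feed. The field value and URI here are made up.

// sketch_prev.go — illustrative only.
package main

import (
	"fmt"
	"strings"
)

func sketchPrevArchive() {
	uri := "https://example.com/user/twtxt.txt"
	prev := "p4x6qwj twtxt-2024.txt" // hypothetical "prev = <hash> <file>" value

	_, part, ok := strings.Cut(prev, " ")
	if ok {
		// Replace the last path segment of the feed URI with the archive file name.
		part = uri[:strings.LastIndex(uri, "/")+1] + part
	}
	fmt.Println(part) // https://example.com/user/twtxt-2024.txt
}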
diff --git a/service.go b/service.go
index ab543c0..a63978e 100644
--- a/service.go
+++ b/service.go
@@ -1,18 +1,13 @@
 package main

 import (
-    "cmp"
+    "context"
     "database/sql"
-    "errors"
     "fmt"
-    "io"
     "iter"
-    "net/url"
     "os"
-    "path/filepath"
-    "slices"
     "strings"
-    "time"
+    "sync"

     _ "embed"

@@ -23,350 +18,98 @@ import (

 func run(c console) error {
     a := c.Args()
-
-    db, err := setupDB(c)
-    if err != nil {
-        return err
+    app := &appState{
+        args:  a,
+        feeds: sync.Map{},
+        queue: FibHeap(func(a, b *Feed) bool {
+            return a.LastScanOn.Time.Before(b.LastScanOn.Time)
+        }),
     }
-    defer db.Close()
-    c.Set("db", db)

+    // Setup DB
+    err := func(ctx context.Context) error {
+        db, err := app.DB()
+        if err != nil {
+            return err
+        }
+        defer db.Close()

-    f, err := os.Open(a.baseFeed)
+        for _, stmt := range strings.Split(initSQL, ";") {
+            _, err = db.ExecContext(ctx, stmt)
+            if err != nil {
+                return err
+            }
+        }
+
+        return nil
+    }(c.Context)
     if err != nil {
         return err
     }

-    twtfile, err := lextwt.ParseFile(f, &types.Twter{
-        Nick: a.Nick,
-        URI:  a.URI,
-    })
-    if err != nil {
-        return fmt.Errorf("%w: %w", ErrParseFailed, err)
-    }
-    f.Close()
+    // Seed File
+    err = func() error {
+        f, err := os.Open(a.baseFeed)
+        if err != nil {
+            return err
+        }
+        defer f.Close()

+        twtfile, err := lextwt.ParseFile(f, &types.Twter{
+            Nick: a.Nick,
+            URI:  a.URI,
+        })
+        if err != nil {
+            return fmt.Errorf("%w: %w", ErrParseFailed, err)
+        }

-    err = storeFeed(db, twtfile)
+        db, err := app.DB()
+        if err != nil {
+            return err
+        }
+        defer db.Close()
+
+        return storeFeed(db, twtfile)
+    }()
     if err != nil {
         return err
     }

-    c.Log("ready")
-
-    go refreshLoop(c)
+    go refreshLoop(c, app)
+    go httpServer(c, app)

     <-c.Done()
+    return c.Err()
+}

+type appState struct {
+    args  args
+    feeds sync.Map
+    queue *fibHeap[Feed]
+}
+
+func (app *appState) DB() (*sql.DB, error) {
+    return sql.Open(app.args.dbtype, app.args.dbfile)
+}
+
+func (app *appState) Feed(feedID string) *Feed {
     return nil
 }

+func (app *appState) Feeds() iter.Seq2[string, *Feed] {
+    return func(yield func(string, *Feed) bool) {
+        app.feeds.Range(func(k, v any) bool {
+            key, _ := k.(string)
+            value, ok := v.(*Feed)
+            if !ok {
+                return false
+            }
+            if !yield(key, value) {
+                return false
+            }
+            return true
+        })
+    }
+}
+
-var (
-    //go:embed init.sql
-    initSQL string
-
-    insertFeed = `
-    insert into feeds
-    (feed_id, uri, nick, last_scan_on, refresh_rate)
-    values (?, ?, ?, ?, ?)
-    ON CONFLICT (feed_id) DO NOTHING
-    `
-
-    insertTwt = `
-    insert into twts
-    (feed_id, hash, conv, dt, text, mentions, tags)
-    values (?, ?, ?, ?, ?, ?, ?)
-    ON CONFLICT (feed_id, hash) DO NOTHING
-    `
-
-    fetchFeeds = `
-    select
-      feed_id,
-      uri,
-      nick,
-      last_scan_on,
-      refresh_rate,
-      last_modified_on,
-      last_etag
-    from feeds
-    `
-    updateFeed = `
-    update feeds set
-      last_scan_on = ?,
-      refresh_rate = ?,
-      last_modified_on = ?,
-      last_etag = ?,
-      last_error = ?
-    where feed_id = ?
-    `
-)
-
-func setupDB(c console) (*sql.DB, error) {
-    a := c.Args()
-    db, err := sql.Open(a.dbtype, a.dbfile)
-    if err != nil {
-        return nil, err
-    }
-
-    for _, stmt := range strings.Split(initSQL, ";") {
-        _, err = db.ExecContext(c, stmt)
-        if err != nil {
-            return nil, err
-        }
-    }
-
-    return db, nil
-}
-
-func refreshLoop(c console) {
-    defer c.abort()
-
-    TenYear := 3153600000 // 10 year
-    OneDay := 86400       // 1 day
-    TenMinutes := 600     // 10 mins
-
-    fetch := NewHTTPFetcher()
-
-    queue := FibHeap(func(a, b *Feed) bool {
-        return a.LastScanOn.Time.Before(b.LastScanOn.Time)
-    })
-
-    db := c.Get("db").(*sql.DB)
-
-    c.Log("start refresh loop")
-    for c.Err() == nil {
-        if queue.IsEmpty() {
-            it, err := LoadFeeds(c)
-            for f := range it {
-                queue.Insert(&f)
-            }
-            if err != nil {
-                c.Log(err)
-                return
-            }
-        }
-
-        f := queue.ExtractMin()
-
-        c.Log("queue size", queue.count, "next", f.URI, "last scan on", f.LastScanOn.Time.Format(time.RFC3339))
-
-        if time.Until(f.LastScanOn.Time) > 2*time.Hour {
-            c.Log("too soon", f.URI)
-            continue
-        }
-
-        select {
-        case <-c.Done():
-            return
-        case <-time.After(time.Until(f.LastScanOn.Time)):
-        }
-
-        res, err := fetch.Fetch(c.Context, f)
-        if err != nil {
-            f.LastError.String, f.LastError.Valid = err.Error(), true
-            if errors.Is(err, ErrPermanentlyDead) {
-                f.RefreshRate = TenYear
-            }
-            if errors.Is(err, ErrTemporarilyDead) {
-                f.RefreshRate = OneDay
-            }
-            if errors.Is(err, ErrUnmodified) {
-                f.RefreshRate = OneDay
-            }
-
-            c.Log(err)
-            err = f.Save(c.Context, db)
-            if err != nil {
-                c.Log(err)
-                return
-            }
-
-            continue
-        }
-
-        f.ETag.String, f.ETag.Valid = res.ETag(), true
-        f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
-
-        cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
-        rdr := io.TeeReader(res.Body, cpy)
-
-        twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
-        if err != nil {
-            c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err))
-            return
-        }
-
-        if prev, ok := twtfile.Info().GetN("prev", 0); ok {
-            _, part, ok := strings.Cut(prev.Value(), " ")
-            if ok {
-
-                part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
-                queue.Insert(&Feed{
-                    FetchURI:    part,
-                    URI:         f.URI,
-                    Nick:        f.Nick,
-                    LastScanOn:  f.LastScanOn,
-                    RefreshRate: f.RefreshRate,
-                })
-            }
-        }
-
-        err = storeFeed(db, twtfile)
-        if err != nil {
-            c.Log(err)
-            return
-        }
-
-        cpy.Close()
-
-        f.LastScanOn.Time = time.Now()
-        f.RefreshRate = TenMinutes
-        f.LastError.String = ""
-
-        err = f.Save(c.Context, db)
-        if err != nil {
-            c.Log(err)
-            return
-        }
-    }
-}
-
-func storeFeed(db *sql.DB, f types.TwtFile) error {
-    loadTS := time.Now()
-    refreshRate := 600
-
-    feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
-
-    tx, err := db.Begin()
-    if err != nil {
-        return err
-    }
-
-    followers := f.Info().GetAll("follow")
-    followMap := make(map[string]string, len(followers))
-    for _, f := range f.Info().GetAll("follow") {
-        nick, uri, ok := strings.Cut(f.Value(), "http")
-        if !ok {
-            continue
-        }
-        nick = strings.TrimSpace(nick)
-        uri = "http" + strings.TrimSpace(uri)
-
-        if _, err := url.Parse(uri); err != nil {
-            continue
-        }
-
-        followMap[nick] = uri
-    }
-
-    defer tx.Rollback()
-
-    _, err = tx.Exec(
-        insertFeed,
-        feedID,
-        f.Twter().HashingURI,
-        f.Twter().DomainNick(),
-        loadTS,
-        refreshRate,
-    )
-    if err != nil {
-        return err
-    }
-
-    for _, twt := range f.Twts() {
-        mentions := make(uuids, 0, len(twt.Mentions()))
-        for _, mention := range twt.Mentions() {
-            followMap[mention.Twter().Nick] = mention.Twter().URI
-            mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
-        }
-
-        tags := make(strList, 0, len(twt.Tags()))
-        for _, tag := range twt.Tags() {
-            tags = append(tags, tag.Text())
-        }
-
-        subject := twt.Subject()
-        subjectTag := ""
-        if subject != nil {
-            if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
-                subjectTag = tag.Text()
-            }
-        }
-
-        _, err = tx.Exec(
-            insertTwt,
-            feedID,
-            twt.Hash(),
-            subjectTag,
-            twt.Created(),
-            fmt.Sprint(twt),
-            mentions.ToStrList(),
-            tags,
-        )
-        if err != nil {
-            return err
-        }
-    }
-
-    for nick, uri := range followMap {
-        _, err = tx.Exec(
-            insertFeed,
-            urlNS.UUID5(uri),
-            uri,
-            nick,
-            nil,
-            refreshRate,
-        )
-        if err != nil {
-            return err
-        }
-    }
-
-    return tx.Commit()
-}
-
-func LoadFeeds(c console) (iter.Seq[Feed], error) {
-    var err error
-    var res *sql.Rows
-
-    db := c.Get("db").(*sql.DB)
-    res, err = db.QueryContext(c.Context, fetchFeeds)
-
-    if err != nil {
-        return slices.Values([]Feed{}), err
-    }
-
-    c.Log("load feeds")
-
-    return func(yield func(Feed) bool) {
-        for res.Next() {
-            var f Feed
-            f.Version = "0.0.1"
-            err = res.Scan(
-                &f.FeedID,
-                &f.URI,
-                &f.Nick,
-                &f.LastScanOn,
-                &f.RefreshRate,
-                &f.LastModified,
-                &f.ETag,
-            )
-            if err != nil {
-                return
-            }
-
-            if !f.LastScanOn.Valid {
-                f.LastScanOn.Time = time.Now()
-                f.LastScanOn.Valid = true
-            } else {
-                f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
-            }
-
-            f.FetchURI = f.URI
-
-            if !yield(f) {
-                return
-            }
-        }
-    }, err
-}
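Usage sketch (not part of the patch): the intended use of the new appState.Feeds iterator. Feeds stored in the sync.Map can be walked with a plain range loop thanks to iter.Seq2, without exposing the map itself. The ID and feed values are hypothetical; nothing in this patch populates app.feeds yet.

// sketch_appstate.go — illustrative only.
package main

import "fmt"

func sketchFeeds(app *appState) {
	app.feeds.Store("feed-id-1", &Feed{Nick: "xuu", URI: "https://example.com/twtxt.txt"})

	for id, feed := range app.Feeds() {
		fmt.Println(id, feed.Nick, feed.URI)
	}
}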
diff --git a/service_test.go b/service_test.go
new file mode 100644
index 0000000..3e1973d
--- /dev/null
+++ b/service_test.go
@@ -0,0 +1,14 @@
+package main
+
+import (
+    "testing"
+
+    "github.com/matryer/is"
+)
+
+func TestNotAny(t *testing.T) {
+    is := is.New(t)
+
+    is.True(!notAny("asdf1asdf", "abcdefghijklmnopqrstuvwxyz234567"))
+    is.True(notAny("asdf2asdf", "abcdefghijklmnopqrstuvwxyz234567"))
+}
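Additional test sketch (not part of the patch): exercising the NewFuncPool helper the same way the refresh loop does, but with a trivial worker instead of the HTTP fetcher, in the style of service_test.go.

// sketch_pool_test.go — illustrative only.
package main

import (
	"context"
	"testing"

	"github.com/matryer/is"
)

func TestFuncPoolSketch(t *testing.T) {
	is := is.New(t)

	// Two workers, buffered in/out channels sized to match.
	pool, done := NewFuncPool(context.Background(), 2, func(_ context.Context, n int) int {
		return n + 1
	})

	pool.Fetch(1)
	pool.Fetch(2)
	done() // close the input, wait for workers, close the output

	sum := 0
	for v := range pool.Out() {
		sum += v
	}
	is.Equal(sum, 5) // (1+1) + (2+1)
}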