chore: refactor app harness

This commit is contained in:
xuu 2025-02-15 16:12:42 -07:00
parent 3d6048e544
commit 1574c48429
Signed by: xuu
GPG Key ID: 8B3B0604F164E04F
11 changed files with 824 additions and 378 deletions

84
about.me Normal file
View File

@ -0,0 +1,84 @@
.ce
Preamble
.sp
We, the people of the United States, in order
to form a more perfect Union, establish justice, insure
domestic tranquility, provide for the common defense, promote
the general welfare,
and secure the blessings of liberty to ourselves and our
posterity, do ordain and establish this Constitution for the
United States of America.
.sp
.nr aR 0 1
.af aR I
.de AR
.ce
.ps 16
.ft B
Article \\n+(aR
.nr sE 0 1
.af sE 1
.ps 12
.ft P
..
.de SE
.sp
.ft B
\\s-2SECTION \\n+(sE:\\s+2
.ft P
.nr pP 0 1
.af pP 1
..
.de PP
.sp
.ft I
\\s-3Paragraph \\n+(pP:\\s+3
.ft P
..
.AR
.SE
Legislative powers; in whom vested:
.PP
All legislative powers herein granted shall be vested in a
Congress of the United States, which shall consist of a Senate
and a House of Representatives.
.SE
House of Representatives, how and by whom chosen, Qualifications
of a Representative. Representatives and direct taxes, how
apportioned. Enumeration. Vacancies to be filled. Power of
choosing officers and of impeachment.
.PP
The House of Representatives shall be composed of members chosen
every second year by the people of the several states, and the
electors in each State shall have the qualifications requisite
for electors of the most numerous branch of the State Legislature.
.PP
No person shall be a Representative who shall not have attained
to the age of twenty-five years, and been seven years a citizen
of the United States, and who shall not, when elected, be an
inhabitant of that State in which he shall be chosen.
.PP
Representatives and direct taxes shall be apportioned among the
several States which may be included within this Union, according
to their respective numbers, which shall be determined by adding
to the whole number of free persons, including those bound to
service for a term of years, and excluding Indians not taxed,
three-fifths of all other persons. The actual enumeration shall
be made within three years after the first meeting of the
Congress of the United States, and within every subsequent term
of ten years, in such manner as they shall by law direct. The
number of Representatives shall not exceed one for every thirty
thousand, but each State shall have at least one Representative;
and until such enumeration shall be made, the State of New
Hampshire shall be entitled to choose three, Massachusetts eight,
Rhode Island and Providence Plantations one, Connecticut
five, New York six, New Jersey four, Pennsylvania eight,
Delaware one, Maryland six, Virginia ten, North Carolina five,
South Carolina five, and Georgia three.
.PP
When vacancies happen in the representation from any State, the
Executive Authority thereof shall issue writs of election to fill
such vacancies.
.PP
The House of Representatives shall choose their Speaker and other
officers; and shall have the sole power of impeachment.

248
feed.go
View File

@ -1,8 +1,21 @@
package main
import (
"cmp"
"context"
"database/sql"
"fmt"
"iter"
"net/http"
"net/url"
"slices"
"strings"
"time"
_ "embed"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
type Feed struct {
@ -17,13 +30,67 @@ type Feed struct {
LastError sql.NullString
ETag sql.NullString
Version string
DiscloseFeedURL string
DiscloseNick string
FirstFetch bool
Version string
State State
}
type State string
const (
PermanentlyDead State = "permanently-dead"
Frozen State = "frozen"
Cold State = "cold"
Warm State = "warm"
Hot State = "hot"
)
var (
//go:embed init.sql
initSQL string
insertFeed = `
insert into feeds
(feed_id, uri, nick, last_scan_on, refresh_rate)
values (?, ?, ?, ?, ?)
ON CONFLICT (feed_id) DO NOTHING
`
insertTwt = `
insert into twts
(feed_id, hash, conv, dt, text, mentions, tags)
values (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (feed_id, hash) DO NOTHING
`
fetchFeeds = `
select
feed_id,
uri,
nick,
last_scan_on,
refresh_rate,
last_modified_on,
last_etag
from feeds
where datetime(last_scan_on, '+'||refresh_rate||' seconds') < datetime(current_timestamp, '+10 minutes')
`
updateFeed = `
update feeds set
last_scan_on = ?,
refresh_rate = ?,
last_modified_on = ?,
last_etag = ?,
last_error = ?
where feed_id = ?
`
)
func (f *Feed) Save(ctx context.Context, db *sql.DB) error {
fmt.Println(f.FetchURI, " ", f.LastModified, " ", f.LastError)
_, err := db.ExecContext(
ctx,
updateFeed,
@ -36,3 +103,182 @@ func (f *Feed) Save(ctx context.Context, db *sql.DB) error {
)
return err
}
func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
f.State = "load"
var err error
f.Version = "0.0.1"
err = res.Scan(
&f.FeedID,
&f.URI,
&f.Nick,
&f.LastScanOn,
&f.RefreshRate,
&f.LastModified,
&f.ETag,
)
if err != nil {
return err
}
if !f.LastScanOn.Valid {
f.FirstFetch = true
f.LastScanOn.Time = time.Now()
f.LastScanOn.Valid = true
} else {
f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
}
f.FetchURI = f.URI
return err
}
func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) {
var err error
var res *sql.Rows
res, err = db.QueryContext(ctx, fetchFeeds)
if err != nil {
return slices.Values([]Feed{}), err
}
return func(yield func(Feed) bool) {
for res.Next() {
var f Feed
err = f.Scan(res)
if err != nil {
return
}
if !yield(f) {
return
}
}
}, err
}
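For reference, a minimal consumption sketch of the iterator (illustrative only; queueAllFeeds is a hypothetical helper, and the fibHeap queue comes from elsewhere in this commit). Note that the error returned up front only covers the initial query; a Scan failure mid-iteration ends the loop silently.
func queueAllFeeds(ctx context.Context, db *sql.DB, queue *fibHeap[Feed]) error {
	feeds, err := loadFeeds(ctx, db)
	if err != nil {
		return err // the query itself failed before any rows were produced
	}
	for f := range feeds { // Go 1.23 range-over-func iteration
		queue.Insert(&f)
	}
	return nil
}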
func storeFeed(db *sql.DB, f types.TwtFile) error {
loadTS := time.Now()
refreshRate := 600
feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
tx, err := db.Begin()
if err != nil {
return err
}
followers := f.Info().GetAll("follow")
followMap := make(map[string]string, len(followers))
for _, f := range f.Info().GetAll("follow") {
nick, uri, ok := strings.Cut(f.Value(), "http")
if !ok {
continue
}
nick = strings.TrimSpace(nick)
uri = "http" + strings.TrimSpace(uri)
if _, err := url.Parse(uri); err != nil {
continue
}
followMap[nick] = uri
}
defer tx.Rollback()
_, err = tx.Exec(
insertFeed,
feedID,
f.Twter().HashingURI,
f.Twter().DomainNick(),
loadTS,
refreshRate,
)
if err != nil {
return err
}
for _, twt := range f.Twts() {
mentions := make(uuids, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() {
followMap[mention.Twter().Nick] = mention.Twter().URI
mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
}
tags := make(strList, 0, len(twt.Tags()))
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
subject := twt.Subject()
subjectTag := ""
if subject != nil {
if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
subjectTag = tag.Text()
}
}
_, err = tx.Exec(
insertTwt,
feedID,
twt.Hash(),
subjectTag,
twt.Created(),
fmt.Sprint(twt),
mentions.ToStrList(),
tags,
)
if err != nil {
return err
}
}
for nick, uri := range followMap {
_, err = tx.Exec(
insertFeed,
urlNS.UUID5(uri),
uri,
nick,
nil,
refreshRate,
)
if err != nil {
return err
}
}
return tx.Commit()
}
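For reference, the follow handling above splits each metadata value at the first literal "http" and then re-attaches it to rebuild the URI; a small illustrative sketch (parseFollow is a hypothetical helper, not part of the commit):
// parseFollow mirrors the follow parsing in storeFeed.
func parseFollow(value string) (nick, uri string, ok bool) {
	nick, uri, ok = strings.Cut(value, "http") // e.g. "xuu https://..." -> "xuu ", "s://..."
	if !ok {
		return "", "", false
	}
	nick = strings.TrimSpace(nick)
	uri = "http" + strings.TrimSpace(uri) // re-attach the separator to rebuild the URI
	return nick, uri, true
}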
func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
feed.State = "fetch"
if strings.Contains(feed.FetchURI, "lublin.se") {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
}
req, err := http.NewRequestWithContext(ctx, "GET", feed.FetchURI, nil)
if err != nil {
return nil, fmt.Errorf("creating HTTP request failed: %w", err)
}
req.Header.Add("Accept", "text/plain")
if feed.LastModified.Valid {
req.Header.Add("If-Modified-Since", feed.LastModified.Time.Format(http.TimeFormat))
}
if feed.ETag.Valid {
req.Header.Add("If-None-Match", feed.ETag.String)
}
if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" {
req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick))
} else {
req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", feed.Version))
}
return req, nil
}
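A rough sketch of the conditional request this builds for a feed seen before (illustrative; the ETag value is made up, and the example assumes the context, database/sql, and fmt imports):
func ExampleFeed_MakeHTTPRequest() {
	feed := &Feed{
		FetchURI: "https://txt.sour.is/users/xuu/twtxt.txt",
		Version:  "0.0.1",
		ETag:     sql.NullString{String: `"abc123"`, Valid: true},
	}
	req, _ := feed.MakeHTTPRequest(context.Background())
	// Accept: text/plain is always set; If-None-Match comes from the stored ETag,
	// and the bare xt/<version> User-Agent is used when no disclosure fields are set.
	fmt.Println(req.Header.Get("User-Agent"), req.Header.Get("If-None-Match"))
	// Output: xt/0.0.1 "abc123"
}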

View File

@ -6,7 +6,7 @@ import (
"fmt"
"net"
"net/http"
"strings"
"sync"
"time"
)
@ -18,7 +18,9 @@ var (
)
type Response struct {
Request *Feed
*http.Response
err error
}
func (r *Response) ETag() string {
@ -67,59 +69,87 @@ func NewHTTPFetcher() *httpFetcher {
}
}
func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) (*Response, error) {
if strings.Contains(request.FetchURI, "lublin.se") {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, request.URI)
func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
response := &Response{
Request: request,
}
req, err := http.NewRequestWithContext(ctx, "GET", request.FetchURI, nil)
req, err := request.MakeHTTPRequest(ctx)
if err != nil {
return nil, fmt.Errorf("creating HTTP request failed: %w", err)
}
req.Header.Add("Accept", "text/plain")
if !request.LastModified.Valid {
req.Header.Add("If-Modified-Since", request.LastModified.Time.Format(http.TimeFormat))
}
if request.ETag.Valid {
req.Header.Add("If-None-Match", request.ETag.String)
}
if request.DiscloseFeedURL != "" && request.DiscloseNick != "" {
req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
request.Version, request.DiscloseFeedURL, request.DiscloseNick))
} else {
req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", request.Version))
response.err = err
return response
}
res, err := f.client.Do(req)
if err != nil {
if errors.Is(err, &net.DNSError{}) {
return nil, fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
return response
}
return nil, fmt.Errorf("%w: %w", ErrTemporarilyDead, err)
}
response := &Response{
Response: res,
response.err = fmt.Errorf("%w: %w", ErrTemporarilyDead, err)
return response
}
response.Response = res
switch res.StatusCode {
case 200:
return response, nil
case 304:
return response, fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
response.err = fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
case 400, 406, 502, 503:
return response, fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
case 403, 404, 410:
return response, fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
response.err = fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
default:
return response, errors.New(res.Status)
response.err = errors.New(res.Status)
}
return response
}
type pool[IN, OUT any] struct {
in chan IN
out chan OUT
Err error
}
func NewFuncPool[IN, OUT any](
ctx context.Context,
size int,
fetch func(ctx context.Context, request IN) OUT,
) (*pool[IN, OUT], func()) {
var wg sync.WaitGroup
in := make(chan IN, size)
out := make(chan OUT, size)
wg.Add(size)
for range size {
go func() {
defer wg.Done()
for request := range in {
select {
case <-ctx.Done():
return
case out <- fetch(ctx, request):
}
}
}()
}
return &pool[IN, OUT]{
in: in,
out: out,
}, func() { close(in); wg.Wait(); close(out) }
}
func (f *pool[IN, OUT]) Fetch(request IN) {
f.in <- request
}
func (f *pool[IN, OUT]) Out() <-chan OUT {
return f.out
}
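A minimal usage sketch of the generic worker pool (illustrative; fetchOne is a hypothetical helper that mirrors the wiring refresh-loop.go uses):
// fetchOne runs a single request through the pool.
func fetchOne(ctx context.Context, feed *Feed) *Response {
	fetcher := NewHTTPFetcher()
	fetch, stop := NewFuncPool(ctx, 25, fetcher.Fetch) // IN=*Feed, OUT=*Response inferred from fetcher.Fetch
	defer stop()                                       // close the input, wait for workers, close the output

	fetch.Fetch(feed)    // enqueue work
	return <-fetch.Out() // block until a worker yields the response
}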

View File

@ -1,6 +1,9 @@
package main
import "math/bits"
import (
"iter"
"math/bits"
)
type fibTree[T any] struct {
value *T
@ -14,6 +17,20 @@ func (t *fibTree[T]) addAtEnd(n *fibTree[T]) {
n.parent = t
t.child = append(t.child, n)
}
func (t *fibTree[T]) Iter() iter.Seq[*T] {
return func(yield func(*T) bool) {
if !yield(t.value) {
return
}
for _, tree := range t.child {
for v := range tree.Iter() {
if !yield(v) {
return
}
}
}
}
}
type fibHeap[T any] struct {
trees []*fibTree[T]
@ -183,3 +200,15 @@ func (h *fibHeap[T]) cascadingCut(y *fibTree[T]) {
h.cascadingCut(y.parent)
}
}
func (b *fibHeap[T]) Iter() iter.Seq[*T] {
return func(yield func(*T) bool) {
for _, tree := range b.trees {
for v := range tree.Iter() {
if !yield(v) {
return
}
}
}
}
}
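A small illustrative sketch of the new iterator, assuming the FibHeap constructor and Insert used elsewhere in this commit (heapValues is a hypothetical helper):
func heapValues() []int {
	h := FibHeap(func(a, b *int) bool { return *a < *b })
	for _, v := range []int{5, 3, 8} {
		h.Insert(&v)
	}
	var out []int
	for v := range h.Iter() { // walks every tree; order is structural, not sorted
		out = append(out, *v)
	}
	return out
}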

10
go.mod
View File

@ -7,11 +7,21 @@ require (
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51
)
require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
)
require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/matryer/is v1.4.1
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect
go.opentelemetry.io/otel v1.34.0
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sys v0.25.0 // indirect

19
go.sum
View File

@ -1,10 +1,19 @@
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ=
github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@ -14,6 +23,16 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/writeas/go-strip-markdown/v2 v2.1.1 h1:hAxUM21Uhznf/FnbVGiJciqzska6iLei22Ijc3q2e28=
github.com/writeas/go-strip-markdown/v2 v2.1.1/go.mod h1:UvvgPJgn1vvN8nWuE5e7v/+qmDu3BSVnKAB6Gl7hFzA=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51 h1:XEjx0jSNv1h22gwGfQBfMypWv/YZXWGTRbqh3B8xfIs=
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51/go.mod h1:CWAZuBHZfGaqa0FreSeLG+pzK3rHP2TNAG7Zh6QlRiM=
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 h1:zfnniiSO/WO65mSpdQzGYJ9pM0rYg/BKgrSm8h2mTyA=

97
http.go Normal file
View File

@ -0,0 +1,97 @@
package main
import (
"context"
"fmt"
"net/http"
"slices"
"sort"
"strings"
"time"
)
func httpServer(c console, app *appState) {
c.Log("start http server")
db, err := app.DB()
if err != nil {
c.Log("missing db", err)
c.abort()
return
}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write([]byte("ok"))
})
http.HandleFunc("/conv/{hash}", func(w http.ResponseWriter, r *http.Request) {
hash := r.PathValue("hash")
if len(hash) < 6 || len(hash) > 8 || !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") {
w.WriteHeader(http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write([]byte(hash))
rows, err := db.QueryContext(r.Context(), "SELECT feed_id, hash, conv, dt, text FROM twts WHERE hash = $1 or conv = $1", hash)
if err != nil {
c.Log(err)
return
}
defer rows.Close()
for rows.Next() {
var twt struct {
FeedID string
Hash string
Conv string
Dt time.Time
Text string
}
err = rows.Scan(&twt.FeedID, &twt.Hash, &twt.Conv, &twt.Dt, &twt.Text)
if err != nil {
c.Log(err)
return
}
}
})
http.HandleFunc("/feeds", func(w http.ResponseWriter, r *http.Request) {
lis := slices.Collect(app.queue.Iter())
sort.Slice(lis, func(i, j int) bool {
return lis[i].LastScanOn.Time.Before(lis[j].LastScanOn.Time)
})
for _, feed := range lis {
fmt.Fprintln(w, feed.State, feed.LastScanOn.Time.Format(time.RFC3339), feed.Nick, feed.URI)
}
})
srv := &http.Server{
Addr: app.args.Listen,
Handler: http.DefaultServeMux,
}
go func() {
<-c.Done()
c.Log("stop http server")
srv.Shutdown(context.Background())
}()
err = srv.ListenAndServe()
if err != nil {
c.Log(err)
c.abort()
return
}
}
// notAny reports whether s contains no rune outside chars.
func notAny(s string, chars string) bool {
for _, c := range s {
if !strings.ContainsRune(chars, c) {
return false
}
}
return true
}

32
main.go
View File

@ -2,10 +2,20 @@ package main
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"go.opentelemetry.io/otel"
)
const name = "go.sour.is/xt"
var (
tracer = otel.Tracer(name)
meter = otel.Meter(name)
)
type contextKey struct{ name string }
@ -19,7 +29,13 @@ type console struct {
}
func (c console) Log(v ...any) { fmt.Fprintln(c.err, v...) }
func (c console) Args() args { return c.Get("args").(args) }
func (c console) Args() args {
v, ok := c.Get("args").(args)
if !ok {
return args{}
}
return v
}
func (c *console) Set(name string, value any) {
c.Context = context.WithValue(c.Context, contextKey{name}, value)
}
@ -33,6 +49,7 @@ type args struct {
baseFeed string
Nick string
URI string
Listen string
}
func env(key, def string) string {
@ -49,16 +66,17 @@ func main() {
go func() { <-ctx.Done(); console.Log("shutdown"); stop() }()
args := args{
env("XT_DBTYPE", "sqlite3"),
env("XT_DBFILE", "file:twt.db"),
env("XT_BASE_FEED", "feed"),
env("XT_NICK", "xuu"),
env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"),
dbtype: env("XT_DBTYPE", "sqlite3"),
dbfile: env("XT_DBFILE", "file:twt.db"),
baseFeed: env("XT_BASE_FEED", "feed"),
Nick: env("XT_NICK", "xuu"),
URI: env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"),
Listen: env("XT_LISTEN", ":8040"),
}
console.Set("args", args)
if err := run(console); err != nil {
if err := run(console); err != nil && !errors.Is(err, context.Canceled) {
fmt.Println(err)
os.Exit(1)
}

156
refresh-loop.go Normal file
View File

@ -0,0 +1,156 @@
package main
import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
const (
TenYear = 315360000 // 10 years in seconds
OneDay = 86400
OneHour = 3600
TenMinutes = 600
TwoMinutes = 120
)
func refreshLoop(c console, app *appState) {
defer c.abort()
f := NewHTTPFetcher()
fetch, close := NewFuncPool(c.Context, 25, f.Fetch)
defer close()
db, err := app.DB()
if err != nil {
c.Log("missing db")
c.abort()
return
}
queue := app.queue
c.Log("start refresh loop")
for c.Err() == nil {
if queue.IsEmpty() {
c.Log("load feeds")
it, err := loadFeeds(c.Context, db)
for f := range it {
queue.Insert(&f)
}
if err != nil {
c.Log(err)
return
}
}
f := queue.ExtractMin()
if f == nil {
c.Log("sleeping for ", TenMinutes*time.Second)
select {
case <-time.After(TenMinutes * time.Second):
case <-c.Done():
return
}
continue
}
c.Log("queue size", queue.count, "next", f.URI, "next scan on", f.LastScanOn.Time.Format(time.RFC3339))
if time.Until(f.LastScanOn.Time) > 2*time.Hour {
c.Log("too soon", f.URI)
continue
}
select {
case <-c.Done():
return
case t := <-time.After(time.Until(f.LastScanOn.Time)):
c.Log("fetch", t.Format(time.RFC3339), f.Nick, f.URI)
fetch.Fetch(f)
case res := <-fetch.Out():
c.Log("got response:", res.Request.URI)
f := res.Request
f.LastScanOn.Time = time.Now()
err := res.err
if res.err != nil {
f.LastError.String, f.LastError.Valid = err.Error(), true
if errors.Is(err, ErrPermanentlyDead) {
f.RefreshRate = TenYear
}
if errors.Is(err, ErrTemporarilyDead) {
f.RefreshRate = OneDay
}
if errors.Is(err, ErrUnmodified) {
f.RefreshRate = OneDay
}
c.Log(err)
err = f.Save(c.Context, db)
if err != nil {
c.Log(err)
return
}
continue
}
f.ETag.String, f.ETag.Valid = res.ETag(), true
f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
c.Log(err)
return
}
rdr := io.TeeReader(res.Body, cpy)
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
if err != nil {
cpy.Close()
c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err))
f.RefreshRate = OneDay
return
}
if prev, ok := twtfile.Info().GetN("prev", 0); f.FirstFetch && ok {
_, part, ok := strings.Cut(prev.Value(), " ")
if ok {
part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
queue.Insert(&Feed{
FetchURI: part,
URI: f.URI,
Nick: f.Nick,
LastScanOn: f.LastScanOn,
RefreshRate: f.RefreshRate,
})
}
}
err = storeFeed(db, twtfile)
if err != nil {
c.Log(err)
err = f.Save(c.Context, db)
c.Log(err)
return
}
cpy.Close()
f.LastScanOn.Time = time.Now()
f.RefreshRate = TenMinutes
f.LastError.String, f.LastError.Valid = "", false
err = f.Save(c.Context, db)
if err != nil {
c.Log(err)
return
}
}
}
}

View File

@ -1,18 +1,13 @@
package main
import (
"cmp"
"context"
"database/sql"
"errors"
"fmt"
"io"
"iter"
"net/url"
"os"
"path/filepath"
"slices"
"strings"
"time"
"sync"
_ "embed"
@ -23,19 +18,42 @@ import (
func run(c console) error {
a := c.Args()
app := &appState{
args: a,
feeds: sync.Map{},
queue: FibHeap(func(a, b *Feed) bool {
return a.LastScanOn.Time.Before(b.LastScanOn.Time)
}),
}
db, err := setupDB(c)
// Setup DB
err := func(ctx context.Context) error {
db, err := app.DB()
if err != nil {
return err
}
defer db.Close()
c.Set("db", db)
for _, stmt := range strings.Split(initSQL, ";") {
_, err = db.ExecContext(ctx, stmt)
if err != nil {
return err
}
}
return nil
}(c.Context)
if err != nil {
return err
}
// Seed File
err = func() error {
f, err := os.Open(a.baseFeed)
if err != nil {
return err
}
defer f.Close()
twtfile, err := lextwt.ParseFile(f, &types.Twter{
Nick: a.Nick,
@ -44,329 +62,54 @@ func run(c console) error {
if err != nil {
return fmt.Errorf("%w: %w", ErrParseFailed, err)
}
f.Close()
db, err := app.DB()
if err != nil {
return err
}
defer db.Close()
err = storeFeed(db, twtfile)
return storeFeed(db, twtfile)
}()
if err != nil {
return err
}
c.Log("ready")
go refreshLoop(c)
go refreshLoop(c, app)
go httpServer(c, app)
<-c.Done()
return c.Err()
}
type appState struct {
args args
feeds sync.Map
queue *fibHeap[Feed]
}
func (app *appState) DB() (*sql.DB, error) {
return sql.Open(app.args.dbtype, app.args.dbfile)
}
func (app *appState) Feed(feedID string) *Feed {
return nil
}
var (
//go:embed init.sql
initSQL string
insertFeed = `
insert into feeds
(feed_id, uri, nick, last_scan_on, refresh_rate)
values (?, ?, ?, ?, ?)
ON CONFLICT (feed_id) DO NOTHING
`
insertTwt = `
insert into twts
(feed_id, hash, conv, dt, text, mentions, tags)
values (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (feed_id, hash) DO NOTHING
`
fetchFeeds = `
select
feed_id,
uri,
nick,
last_scan_on,
refresh_rate,
last_modified_on,
last_etag
from feeds
`
updateFeed = `
update feeds set
last_scan_on = ?,
refresh_rate = ?,
last_modified_on = ?,
last_etag = ?,
last_error = ?
where feed_id = ?
`
)
func setupDB(c console) (*sql.DB, error) {
a := c.Args()
db, err := sql.Open(a.dbtype, a.dbfile)
if err != nil {
return nil, err
}
for _, stmt := range strings.Split(initSQL, ";") {
_, err = db.ExecContext(c, stmt)
if err != nil {
return nil, err
}
}
return db, nil
}
func refreshLoop(c console) {
defer c.abort()
TenYear := 3153600000 // 10 year
OneDay := 86400 // 1 day
TenMinutes := 600 // 10 mins
fetch := NewHTTPFetcher()
queue := FibHeap(func(a, b *Feed) bool {
return a.LastScanOn.Time.Before(b.LastScanOn.Time)
})
db := c.Get("db").(*sql.DB)
c.Log("start refresh loop")
for c.Err() == nil {
if queue.IsEmpty() {
it, err := LoadFeeds(c)
for f := range it {
queue.Insert(&f)
}
if err != nil {
c.Log(err)
return
}
}
f := queue.ExtractMin()
c.Log("queue size", queue.count, "next", f.URI, "last scan on", f.LastScanOn.Time.Format(time.RFC3339))
if time.Until(f.LastScanOn.Time) > 2*time.Hour {
c.Log("too soon", f.URI)
continue
}
select {
case <-c.Done():
return
case <-time.After(time.Until(f.LastScanOn.Time)):
}
res, err := fetch.Fetch(c.Context, f)
if err != nil {
f.LastError.String, f.LastError.Valid = err.Error(), true
if errors.Is(err, ErrPermanentlyDead) {
f.RefreshRate = TenYear
}
if errors.Is(err, ErrTemporarilyDead) {
f.RefreshRate = OneDay
}
if errors.Is(err, ErrUnmodified) {
f.RefreshRate = OneDay
}
c.Log(err)
err = f.Save(c.Context, db)
if err != nil {
c.Log(err)
return
}
continue
}
f.ETag.String, f.ETag.Valid = res.ETag(), true
f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
rdr := io.TeeReader(res.Body, cpy)
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
if err != nil {
c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err))
return
}
if prev, ok :=twtfile.Info().GetN("prev", 0); ok {
_, part, ok := strings.Cut(prev.Value(), " ")
if ok {
part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
queue.Insert(&Feed{
FetchURI: part,
URI: f.URI,
Nick: f.Nick,
LastScanOn: f.LastScanOn,
RefreshRate: f.RefreshRate,
})
}
}
err = storeFeed(db, twtfile)
if err != nil {
c.Log(err)
return
}
cpy.Close()
f.LastScanOn.Time = time.Now()
f.RefreshRate = TenMinutes
f.LastError.String = ""
err = f.Save(c.Context, db)
if err != nil {
c.Log(err)
return
}
}
}
func storeFeed(db *sql.DB, f types.TwtFile) error {
loadTS := time.Now()
refreshRate := 600
feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
tx, err := db.Begin()
if err != nil {
return err
}
followers := f.Info().GetAll("follow")
followMap := make(map[string]string, len(followers))
for _, f := range f.Info().GetAll("follow") {
nick, uri, ok := strings.Cut(f.Value(), "http")
func (app *appState) Feeds() iter.Seq2[string, *Feed] {
return func(yield func(string, *Feed) bool) {
app.feeds.Range(func(k, v any) bool {
key, _ := k.(string)
value, ok := v.(*Feed)
if !ok {
continue
return false
}
nick = strings.TrimSpace(nick)
uri = "http" + strings.TrimSpace(uri)
if _, err := url.Parse(uri); err != nil {
continue
if !yield(key, value) {
return false
}
followMap[nick] = uri
return true
})
}
defer tx.Rollback()
_, err = tx.Exec(
insertFeed,
feedID,
f.Twter().HashingURI,
f.Twter().DomainNick(),
loadTS,
refreshRate,
)
if err != nil {
return err
}
for _, twt := range f.Twts() {
mentions := make(uuids, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() {
followMap[mention.Twter().Nick] = mention.Twter().URI
mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
}
tags := make(strList, 0, len(twt.Tags()))
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
subject := twt.Subject()
subjectTag := ""
if subject != nil {
if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
subjectTag = tag.Text()
}
}
_, err = tx.Exec(
insertTwt,
feedID,
twt.Hash(),
subjectTag,
twt.Created(),
fmt.Sprint(twt),
mentions.ToStrList(),
tags,
)
if err != nil {
return err
}
}
for nick, uri := range followMap {
_, err = tx.Exec(
insertFeed,
urlNS.UUID5(uri),
uri,
nick,
nil,
refreshRate,
)
if err != nil {
return err
}
}
return tx.Commit()
}
func LoadFeeds(c console) (iter.Seq[Feed], error) {
var err error
var res *sql.Rows
db := c.Get("db").(*sql.DB)
res, err = db.QueryContext(c.Context, fetchFeeds)
if err != nil {
return slices.Values([]Feed{}), err
}
c.Log("load feeds")
return func(yield func(Feed) bool) {
for res.Next() {
var f Feed
f.Version = "0.0.1"
err = res.Scan(
&f.FeedID,
&f.URI,
&f.Nick,
&f.LastScanOn,
&f.RefreshRate,
&f.LastModified,
&f.ETag,
)
if err != nil {
return
}
if !f.LastScanOn.Valid {
f.LastScanOn.Time = time.Now()
f.LastScanOn.Valid = true
} else {
f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
}
f.FetchURI = f.URI
if !yield(f) {
return
}
}
}, err
}

14
service_test.go Normal file
View File

@ -0,0 +1,14 @@
package main
import (
"testing"
"github.com/matryer/is"
)
func TestNotAny(t *testing.T) {
is := is.New(t)
is.True(!notAny("asdf1asdf", "abcdefghijklmnopqrstuvwxyz234567"))
is.True(notAny("asdf2asdf", "abcdefghijklmnopqrstuvwxyz234567"))
}