// fibTree is one node of a Fibonacci heap: the stored element plus the links
// to its parent and children.
type fibTree[T any] struct {
	value  *T
	parent *fibTree[T]
	child  []*fibTree[T]
	// mark records that the node has already lost one child while being a
	// child itself; losing a second child makes cascadingCut cut it as well.
	mark bool
}

// Value returns the element stored in the node.
func (t *fibTree[T]) Value() *T { return t.value }

// addAtEnd links n under t as its last child.
func (t *fibTree[T]) addAtEnd(n *fibTree[T]) {
	n.parent = t
	t.child = append(t.child, n)
}

// fibHeap is a min-heap ordered by less. The zero value is not usable because
// less is nil; build one with FibHeap.
type fibHeap[T any] struct {
	trees []*fibTree[T] // root list
	least *fibTree[T]   // root holding the minimum, nil when the heap is empty
	count uint          // number of stored elements
	less  func(a, b *T) bool
}

// FibHeap returns an empty heap ordered by less.
func FibHeap[T any](less func(a, b *T) bool) *fibHeap[T] {
	return &fibHeap[T]{less: less}
}

// GetMin returns the smallest element without removing it, or nil when the
// heap is empty.
func (h *fibHeap[T]) GetMin() *T {
	if h.least == nil {
		return nil
	}
	return h.least.value
}

// IsEmpty reports whether the heap holds no elements.
func (h *fibHeap[T]) IsEmpty() bool { return h.least == nil }

// Insert adds v to the heap as a new single-node root.
func (h *fibHeap[T]) Insert(v *T) {
	node := &fibTree[T]{value: v}
	h.trees = append(h.trees, node)
	if h.least == nil || h.less(v, h.least.value) {
		h.least = node
	}
	h.count++
}

// ExtractMin removes and returns the smallest element, or nil when the heap
// is empty.
func (h *fibHeap[T]) ExtractMin() *T {
	smallest := h.least
	if smallest == nil {
		return nil
	}

	// Swap-remove the minimum from the root list; root order is irrelevant.
	for i, root := range h.trees {
		if root == smallest {
			last := len(h.trees) - 1
			h.trees[i] = h.trees[last]
			h.trees[last] = nil
			h.trees = h.trees[:last]
			break
		}
	}

	// Promote the children of the removed node to roots.
	for _, c := range smallest.child {
		c.parent = nil
	}
	h.trees = append(h.trees, smallest.child...)
	smallest.child = nil

	h.least = nil
	if len(h.trees) > 0 {
		h.consolidate()
	}

	h.count--
	return smallest.value
}

// consolidate links roots of equal degree until every degree occurs at most
// once, then rebuilds the root list and recomputes the minimum.
func (h *fibHeap[T]) consolidate() {
	// bits.Len is only a sizing hint: after cuts the degree bound is
	// log_phi(n) rather than log2(n), so aux grows on demand below.
	aux := make([]*fibTree[T], bits.Len(h.count)+1)
	for _, x := range h.trees {
		order := len(x.child)
		for {
			for order >= len(aux) {
				aux = append(aux, nil)
			}
			y := aux[order]
			if y == nil {
				break
			}
			// Keep the smaller root on top.
			if h.less(y.value, x.value) {
				x, y = y, x
			}
			y.mark = false // a freshly linked child starts unmarked
			x.addAtEnd(y)
			aux[order] = nil
			order++
		}
		aux[order] = x
	}

	h.trees = h.trees[:0]
	h.least = nil
	for _, k := range aux {
		if k == nil {
			continue
		}
		k.parent = nil
		h.trees = append(h.trees, k)
		if h.least == nil || h.less(k.value, h.least.value) {
			h.least = k
		}
	}
}

// Merge moves every tree of a into h. The nodes are shared, not copied, so a
// must not be used afterwards. Merging nil or h itself is a no-op.
func (h *fibHeap[T]) Merge(a *fibHeap[T]) {
	if a == nil || a == h {
		return
	}
	h.trees = append(h.trees, a.trees...)
	h.count += a.count
	if h.least == nil || a.least != nil && h.less(a.least.value, h.least.value) {
		h.least = a.least
	}
}

// find walks the heap breadth-first and returns the first node whose value
// satisfies fn, or nil when no node matches. fn receives a copy of the value,
// so it cannot modify heap contents.
func (h *fibHeap[T]) find(fn func(*T) bool) *fibTree[T] {
	queue := append([]*fibTree[T](nil), h.trees...)
	for len(queue) > 0 {
		node := queue[0]
		queue = queue[1:]

		probe := *node.value
		if fn(&probe) {
			return node
		}
		queue = append(queue, node.child...)
	}
	return nil
}

// Find returns the first element satisfying fn, or nil when none does.
func (h *fibHeap[T]) Find(fn func(*T) bool) *T {
	if needle := h.find(fn); needle != nil {
		return needle.value
	}
	return nil
}

// DecreaseKey locates the first element matching find, applies decrease to it
// and restores heap order. decrease must not make the element larger. It does
// nothing when no element matches.
func (h *fibHeap[T]) DecreaseKey(find func(*T) bool, decrease func(*T)) {
	needle := h.find(find)
	if needle == nil {
		return
	}
	decrease(needle.value)

	if parent := needle.parent; parent != nil && h.less(needle.value, parent.value) {
		h.cut(needle)
		h.cascadingCut(parent)
	}

	// Only roots may become the minimum pointer.
	if needle.parent == nil && h.less(needle.value, h.least.value) {
		h.least = needle
	}
}

// cut detaches x from its parent and makes it a root. It is a no-op for nodes
// that are already roots.
func (h *fibHeap[T]) cut(x *fibTree[T]) {
	parent := x.parent
	if parent == nil {
		return
	}
	for i, c := range parent.child {
		if c == x {
			last := len(parent.child) - 1
			parent.child[i] = parent.child[last]
			parent.child[last] = nil
			parent.child = parent.child[:last]
			break
		}
	}

	x.parent = nil
	x.mark = false
	h.trees = append(h.trees, x)

	if h.less(x.value, h.least.value) {
		h.least = x
	}
}

// cascadingCut is called on a node that has just lost a child. An unmarked
// non-root is marked; a marked one is cut too and the process continues with
// its former parent.
func (h *fibHeap[T]) cascadingCut(y *fibTree[T]) {
	for y != nil && y.parent != nil {
		if !y.mark {
			y.mark = true
			return
		}
		// cut clears y.parent, so remember it first.
		parent := y.parent
		h.cut(y)
		y = parent
	}
}
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= +github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/writeas/go-strip-markdown/v2 v2.1.1 h1:hAxUM21Uhznf/FnbVGiJciqzska6iLei22Ijc3q2e28= +github.com/writeas/go-strip-markdown/v2 v2.1.1/go.mod h1:UvvgPJgn1vvN8nWuE5e7v/+qmDu3BSVnKAB6Gl7hFzA= +go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51 h1:XEjx0jSNv1h22gwGfQBfMypWv/YZXWGTRbqh3B8xfIs= +go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51/go.mod h1:CWAZuBHZfGaqa0FreSeLG+pzK3rHP2TNAG7Zh6QlRiM= +go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 h1:zfnniiSO/WO65mSpdQzGYJ9pM0rYg/BKgrSm8h2mTyA= +go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/init.sql b/init.sql new file mode 100644 index 0000000..8f0e250 --- /dev/null +++ b/init.sql @@ -0,0 +1,22 @@ +PRAGMA journal_mode=WAL; + +create table if not exists feeds ( + feed_id blob primary key, + uri text, + nick string, + domain string, + last_scan_on timestamp, + refresh_rate int default 600 +); + +create table if not exists twts ( + feed_id blob, + hash text, + conv text, + dt text, -- timestamp with timezone + text text, + mentions text, -- json + tags text, -- json + primary key (feed_id, hash) +); + diff --git a/main.go b/main.go new file mode 100644 index 0000000..18750b3 --- /dev/null +++ b/main.go @@ -0,0 +1,61 @@ +package main + +import ( + "context" + "fmt" + "io" + "os" + "os/signal" +) + +type contextKey struct{ name string } + +type console struct { + io.Reader + io.Writer + err io.Writer + context.Context + abort func() +} + +func (c console) Log(v ...any) { fmt.Fprintln(c.err, v...) 
} +func (c console) Args() args { return c.Get("args").(args) } +func (c *console) Set(name string, value any) { + c.Context = context.WithValue(c.Context, contextKey{name}, value) +} +func (c console) Get(name string) any { + return c.Context.Value(contextKey{name}) +} + +type args struct { + dbtype string + dbfile string + baseFeed string +} + +func env(key, def string) string { + if v, ok := os.LookupEnv(key); ok { + return v + } + return def +} + +func main() { + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + console := console{os.Stdin, os.Stdout, os.Stderr, ctx, stop} + + go func() { <-ctx.Done(); console.Log("shutdown"); stop() }() + + args := args{ + env("XT_DBTYPE", "sqlite3"), + env("XT_DBFILE", "file:twt.db"), + env("XT_BASE_FEED", "feed"), + } + + console.Set("args", args) + + if err := run(console); err != nil { + fmt.Println(err) + os.Exit(1) + } +} diff --git a/service.go b/service.go new file mode 100644 index 0000000..ff9caba --- /dev/null +++ b/service.go @@ -0,0 +1,262 @@ +package main + +import ( + "database/sql" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" + + _ "embed" + + _ "github.com/mattn/go-sqlite3" + "go.yarn.social/lextwt" +) + +func run(c console) error { + ctx := c.Context + a := c.Args() + + db, err := sql.Open(a.dbtype, a.dbfile) + if err != nil { + return err + } + defer db.Close() + + for _, stmt := range strings.Split(initSQL, ";") { + _, err = db.ExecContext(ctx, stmt) + if err != nil { + return err + } + } + + c.Set("db", db) + + f, err := os.Open(a.baseFeed) + if err != nil { + return err + } + defer f.Close() + err = loadFeed(db, f) + if err != nil { + return err + } + + c.Log("ready") + + go refreshLoop(c) + + <-c.Done() + + return nil +} + +var ( + //go:embed init.sql + initSQL string + + insertFeed = ` + insert into feeds + (feed_id, uri, nick, last_scan_on, refresh_rate) + values (?, ?, ?, ?, ?) 
+ ON CONFLICT (feed_id) DO NOTHING + ` + insertTwt = ` + insert into twts + (feed_id, hash, conv, dt, text, mentions, tags) + values (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (feed_id, hash) DO NOTHING + ` + + fetchFeeds = ` + select feed_id, uri, nick, last_scan_on, refresh_rate from feeds + ` + updateFeed = ` + update feeds set + last_scan_on = ?, + refresh_rate = ? + where feed_id = ? + ` +) + +func loadFeed(db *sql.DB, feed io.Reader) error { + loadTS := time.Now() + refreshRate := 600 + + f, err := lextwt.ParseFile(feed, nil) + if err != nil { + return err + } + + feedID := urlNS.UUID5(f.Twter().HashingURI) + + tx, err := db.Begin() + if err != nil { + return err + } + + followers := f.Info().GetAll("follow") + followMap := make(map[string]string, len(followers)) + for _, f := range f.Info().GetAll("follow") { + nick, uri, _ := strings.Cut(f.Value(), " ") + followMap[nick] = uri + } + + defer tx.Rollback() + + _, err = tx.Exec(insertFeed, feedID, f.Twter().HashingURI, f.Twter().DomainNick(), loadTS, refreshRate) + if err != nil { + return err + } + + for _, twt := range f.Twts() { + mentions := make(uuids, 0, len(twt.Mentions())) + for _, mention := range twt.Mentions() { + followMap[mention.Twter().Nick] = mention.Twter().URI + mentions = append(mentions, urlNS.UUID5(mention.Twter().URI)) + } + + tags := make(strList, 0, len(twt.Tags())) + for _, tag := range twt.Tags() { + tags = append(tags, tag.Text()) + } + + subject := twt.Subject() + subjectTag := "" + if subject != nil { + if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil { + subjectTag = tag.Text() + } + } + + _, err = tx.Exec( + insertTwt, + feedID, + twt.Hash(), + subjectTag, + twt.Created(), + fmt.Sprint(twt), + mentions.ToStrList(), + tags, + ) + if err != nil { + return err + } + } + + for nick, uri := range followMap { + _, err = tx.Exec( + insertFeed, + urlNS.UUID5(uri), + uri, + nick, + nil, + refreshRate, + ) + if err != nil { + return err + } + } + + return tx.Commit() +} + +type feed 
struct { + ID uuid + URI string + Nick string + LastScanOn sql.NullTime + RefreshRate int +} + +func refreshLoop(c console) { + maxInt := int(^uint(0) >> 1) + + less := func(a, b *feed) bool { + return a.LastScanOn.Time.Before(b.LastScanOn.Time) + } + + queue := FibHeap(less) + + db := c.Get("db").(*sql.DB) + res, err := db.QueryContext(c.Context, fetchFeeds) + + if err != nil { + c.Log(err) + c.abort() + return + } + + c.Log("load feeds") + for res.Next() { + var f feed + err = res.Scan(&f.ID, &f.URI, &f.Nick, &f.LastScanOn, &f.RefreshRate) + if err != nil { + c.Log(err) + c.abort() + return + } + + if !f.LastScanOn.Valid { + f.LastScanOn.Time = time.Now() + f.LastScanOn.Valid = true + } + + f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second) + queue.Insert(&f) + } + + c.Log("start refresh loop") + for !queue.IsEmpty() { + f := queue.ExtractMin() + + select { + case <-c.Done(): + return + case <-time.After(f.LastScanOn.Time.Sub(time.Now())): + c.Log("refresh", f.URI) + } + + req, err := http.NewRequestWithContext(c.Context, "GET", f.URI, nil) + if err != nil { + c.Log(err) + c.abort() + return + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + c.Log(err) + _, err = db.ExecContext(c.Context, updateFeed, f.LastScanOn, maxInt, f.ID) + if err != nil { + c.Log(err) + c.abort() + return + } + + continue + } + defer resp.Body.Close() + + err = loadFeed(db, resp.Body) + if err != nil { + _, err = db.ExecContext(c.Context, updateFeed, f.LastScanOn, maxInt, f.ID) + if err != nil { + c.Log(err) + c.abort() + return + } + + continue + } + + f.LastScanOn.Time = time.Now() + + db.ExecContext(c.Context, updateFeed, f.LastScanOn, f.RefreshRate, f.ID) + + f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second) + queue.Insert(f) + } +} diff --git a/uuid.go b/uuid.go new file mode 100644 index 0000000..bd3dc87 --- /dev/null +++ b/uuid.go @@ -0,0 +1,77 @@ +package main + +import ( + "crypto/sha1" + "database/sql/driver" + "encoding/hex" + 
// uuid is a 16-byte identifier stored in the database as a blob.
type uuid [16]byte

// urlNS is the namespace value from which feed identifiers are derived.
var urlNS = uuid{0x6b, 0xa7, 0xb8, 0x10, 0x9d, 0xad, 0x11, 0xd1, 0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8}

// UUID5 derives an identifier from the namespace u and value: the first 16
// bytes of SHA-1(u || value). No version or variant bits are set, so existing
// identifiers stay stable.
func (u uuid) UUID5(value string) uuid {
	h := sha1.New()
	h.Write(u[:])
	h.Write([]byte(value))

	var out uuid
	copy(out[:], h.Sum(nil)) // SHA-1 yields 20 bytes; keep the first 16
	return out
}

// UnmarshalText parses the textual form produced by MarshalText. Surrounding
// braces and dashes are optional. The receiver is only modified on success.
func (u *uuid) UnmarshalText(data string) error {
	data = strings.Trim(data, "{}")
	data = strings.ReplaceAll(data, "-", "")

	if len(data) != hex.EncodedLen(len(u)) {
		return fmt.Errorf("uuid: expected %d hex digits, got %d", hex.EncodedLen(len(u)), len(data))
	}

	var out uuid
	if _, err := hex.Decode(out[:], []byte(data)); err != nil {
		return err
	}
	*u = out
	return nil
}

// MarshalText renders the identifier as {xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx}.
func (u uuid) MarshalText() string {
	s := hex.EncodeToString(u[:])
	return fmt.Sprintf("{%s-%s-%s-%s-%s}", s[:8], s[8:12], s[12:16], s[16:20], s[20:])
}

// Value implements driver.Valuer; the identifier is stored as its raw bytes.
func (u uuid) Value() (driver.Value, error) {
	return u[:], nil
}

// Scan implements sql.Scanner. nil leaves the receiver unchanged; a byte
// slice supplies the identifier from its first 16 bytes. Anything else, or a
// slice shorter than 16 bytes, is an error.
func (u *uuid) Scan(value any) error {
	switch v := value.(type) {
	case nil:
		return nil
	case []byte:
		if len(v) < len(u) {
			return fmt.Errorf("uuid: need %d bytes, got %d", len(u), len(v))
		}
		copy(u[:], v)
		return nil
	default:
		return fmt.Errorf("uuid: cannot scan %T", value)
	}
}

// uuids is a list of identifiers.
type uuids []uuid

// ToStrList returns the textual form of every identifier, in order.
func (lis uuids) ToStrList() strList {
	arr := make(strList, len(lis))
	for i, v := range lis {
		arr[i] = v.MarshalText()
	}
	return arr
}

// strList is a list of strings stored in the database as a bracketed,
// comma-separated list of double-quoted items. Items are not escaped, so
// values containing commas or double quotes do not round-trip.
type strList []string

// Scan implements sql.Scanner. It replaces the receiver with the items found
// in a string or byte-slice value; nil and "[]" produce an empty list.
func (l *strList) Scan(value any) error {
	var s string
	switch v := value.(type) {
	case nil:
		*l = nil
		return nil
	case string:
		s = v
	case []byte:
		s = string(v)
	default:
		return fmt.Errorf("strList: cannot scan %T", value)
	}

	*l = nil
	s = strings.Trim(s, "[]")
	if strings.TrimSpace(s) == "" {
		return nil
	}
	for _, v := range strings.Split(s, ",") {
		v = strings.TrimSpace(v)
		v = strings.Trim(v, "\",")
		*l = append(*l, v)
	}

	return nil
}

// Value implements driver.Valuer, rendering the list as ["a","b"].
func (l strList) Value() (driver.Value, error) {
	arr := make([]string, len(l))
	for i, v := range l {
		arr[i] = "\"" + v + "\""
	}

	return "[" + strings.Join(arr, ",") + "]", nil
}