chore: add fetching queue

This commit is contained in:
xuu 2024-11-10 13:23:00 -07:00
parent 9ae1b7e67e
commit 7c0df508f8
Signed by: xuu
GPG Key ID: 8B3B0604F164E04F
8 changed files with 651 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
twt.db*
feed

185
fibheap.go Normal file
View File

@ -0,0 +1,185 @@
package main
import "math/bits"
type fibTree[T any] struct {
value *T
parent *fibTree[T]
child []*fibTree[T]
mark bool
}
func (t *fibTree[T]) Value() *T { return t.value }
func (t *fibTree[T]) addAtEnd(n *fibTree[T]) {
n.parent = t
t.child = append(t.child, n)
}
type fibHeap[T any] struct {
trees []*fibTree[T]
least *fibTree[T]
count uint
less func(a, b *T) bool
}
func FibHeap[T any](less func(a, b *T) bool) *fibHeap[T] {
return &fibHeap[T]{less: less}
}
func (h *fibHeap[T]) GetMin() *T {
return h.least.value
}
func (h *fibHeap[T]) IsEmpty() bool { return h.least == nil }
func (h *fibHeap[T]) Insert(v *T) {
ntree := &fibTree[T]{value: v}
h.trees = append(h.trees, ntree)
if h.least == nil || h.less(v, h.least.value) {
h.least = ntree
}
h.count++
}
func (h *fibHeap[T]) ExtractMin() *T {
smallest := h.least
if smallest != nil {
// Remove smallest from root trees.
for i := range h.trees {
pos := h.trees[i]
if pos == smallest {
h.trees[i] = h.trees[len(h.trees)-1]
h.trees = h.trees[:len(h.trees)-1]
break
}
}
// Add children to root
h.trees = append(h.trees, smallest.child...)
smallest.child = smallest.child[:0]
h.least = nil
if len(h.trees) > 0 {
h.consolidate()
}
h.count--
return smallest.value
}
return nil
}
func (h *fibHeap[T]) consolidate() {
aux := make([]*fibTree[T], bits.Len(h.count)+1)
for _, x := range h.trees {
order := len(x.child)
// consolidate the larger roots under smaller roots of same order until we have at most one tree per order.
for aux[order] != nil {
y := aux[order]
if h.less(y.value, x.value) {
x, y = y, x
}
x.addAtEnd(y)
aux[order] = nil
order++
}
aux[order] = x
}
h.trees = h.trees[:0]
// move ordered trees to root and find least node.
for _, k := range aux {
if k != nil {
k.parent = nil
h.trees = append(h.trees, k)
if h.least == nil || h.less(k.value, h.least.value) {
h.least = k
}
}
}
}
func (h *fibHeap[T]) Merge(a *fibHeap[T]) {
h.trees = append(h.trees, a.trees...)
h.count += a.count
if h.least == nil || a.least != nil && h.less(a.least.value, h.least.value) {
h.least = a.least
}
}
func (h *fibHeap[T]) find(fn func(*T) bool) *fibTree[T] {
var st []*fibTree[T]
st = append(st, h.trees...)
var tr *fibTree[T]
for len(st) > 0 {
tr, st = st[0], st[1:]
ro := *tr.value
if fn(&ro) {
break
}
st = append(st, tr.child...)
}
return tr
}
func (h *fibHeap[T]) Find(fn func(*T) bool) *T {
if needle := h.find(fn); needle != nil {
return needle.value
}
return nil
}
func (h *fibHeap[T]) DecreaseKey(find func(*T) bool, decrease func(*T)) {
needle := h.find(find)
if needle == nil {
return
}
decrease(needle.value)
if h.less(needle.value, h.least.value) {
h.least = needle
}
if parent := needle.parent; parent != nil {
if h.less(needle.value, parent.value) {
h.cut(needle)
h.cascadingCut(parent)
}
}
}
func (h *fibHeap[T]) cut(x *fibTree[T]) {
parent := x.parent
for i := range parent.child {
pos := parent.child[i]
if pos == x {
parent.child[i] = parent.child[len(parent.child)-1]
parent.child = parent.child[:len(parent.child)-1]
break
}
}
x.parent = nil
x.mark = false
h.trees = append(h.trees, x)
if h.less(x.value, h.least.value) {
h.least = x
}
}
func (h *fibHeap[T]) cascadingCut(y *fibTree[T]) {
if y.parent != nil {
if !y.mark {
y.mark = true
return
}
h.cut(y)
h.cascadingCut(y.parent)
}
}

15
go.mod
View File

@ -1,3 +1,18 @@
module go.sour.is/xt
go 1.23.2
require (
github.com/mattn/go-sqlite3 v1.14.24
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51
)
require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sys v0.25.0 // indirect
)

27
go.sum Normal file
View File

@ -0,0 +1,27 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/writeas/go-strip-markdown/v2 v2.1.1 h1:hAxUM21Uhznf/FnbVGiJciqzska6iLei22Ijc3q2e28=
github.com/writeas/go-strip-markdown/v2 v2.1.1/go.mod h1:UvvgPJgn1vvN8nWuE5e7v/+qmDu3BSVnKAB6Gl7hFzA=
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51 h1:XEjx0jSNv1h22gwGfQBfMypWv/YZXWGTRbqh3B8xfIs=
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51/go.mod h1:CWAZuBHZfGaqa0FreSeLG+pzK3rHP2TNAG7Zh6QlRiM=
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 h1:zfnniiSO/WO65mSpdQzGYJ9pM0rYg/BKgrSm8h2mTyA=
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

22
init.sql Normal file
View File

@ -0,0 +1,22 @@
PRAGMA journal_mode=WAL;
create table if not exists feeds (
feed_id blob primary key,
uri text,
nick string,
domain string,
last_scan_on timestamp,
refresh_rate int default 600
);
create table if not exists twts (
feed_id blob,
hash text,
conv text,
dt text, -- timestamp with timezone
text text,
mentions text, -- json
tags text, -- json
primary key (feed_id, hash)
);

61
main.go Normal file
View File

@ -0,0 +1,61 @@
package main
import (
"context"
"fmt"
"io"
"os"
"os/signal"
)
type contextKey struct{ name string }
type console struct {
io.Reader
io.Writer
err io.Writer
context.Context
abort func()
}
func (c console) Log(v ...any) { fmt.Fprintln(c.err, v...) }
func (c console) Args() args { return c.Get("args").(args) }
func (c *console) Set(name string, value any) {
c.Context = context.WithValue(c.Context, contextKey{name}, value)
}
func (c console) Get(name string) any {
return c.Context.Value(contextKey{name})
}
type args struct {
dbtype string
dbfile string
baseFeed string
}
func env(key, def string) string {
if v, ok := os.LookupEnv(key); ok {
return v
}
return def
}
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
console := console{os.Stdin, os.Stdout, os.Stderr, ctx, stop}
go func() { <-ctx.Done(); console.Log("shutdown"); stop() }()
args := args{
env("XT_DBTYPE", "sqlite3"),
env("XT_DBFILE", "file:twt.db"),
env("XT_BASE_FEED", "feed"),
}
console.Set("args", args)
if err := run(console); err != nil {
fmt.Println(err)
os.Exit(1)
}
}

262
service.go Normal file
View File

@ -0,0 +1,262 @@
package main
import (
"database/sql"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
_ "embed"
_ "github.com/mattn/go-sqlite3"
"go.yarn.social/lextwt"
)
func run(c console) error {
ctx := c.Context
a := c.Args()
db, err := sql.Open(a.dbtype, a.dbfile)
if err != nil {
return err
}
defer db.Close()
for _, stmt := range strings.Split(initSQL, ";") {
_, err = db.ExecContext(ctx, stmt)
if err != nil {
return err
}
}
c.Set("db", db)
f, err := os.Open(a.baseFeed)
if err != nil {
return err
}
defer f.Close()
err = loadFeed(db, f)
if err != nil {
return err
}
c.Log("ready")
go refreshLoop(c)
<-c.Done()
return nil
}
var (
//go:embed init.sql
initSQL string
insertFeed = `
insert into feeds
(feed_id, uri, nick, last_scan_on, refresh_rate)
values (?, ?, ?, ?, ?)
ON CONFLICT (feed_id) DO NOTHING
`
insertTwt = `
insert into twts
(feed_id, hash, conv, dt, text, mentions, tags)
values (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (feed_id, hash) DO NOTHING
`
fetchFeeds = `
select feed_id, uri, nick, last_scan_on, refresh_rate from feeds
`
updateFeed = `
update feeds set
last_scan_on = ?,
refresh_rate = ?
where feed_id = ?
`
)
func loadFeed(db *sql.DB, feed io.Reader) error {
loadTS := time.Now()
refreshRate := 600
f, err := lextwt.ParseFile(feed, nil)
if err != nil {
return err
}
feedID := urlNS.UUID5(f.Twter().HashingURI)
tx, err := db.Begin()
if err != nil {
return err
}
followers := f.Info().GetAll("follow")
followMap := make(map[string]string, len(followers))
for _, f := range f.Info().GetAll("follow") {
nick, uri, _ := strings.Cut(f.Value(), " ")
followMap[nick] = uri
}
defer tx.Rollback()
_, err = tx.Exec(insertFeed, feedID, f.Twter().HashingURI, f.Twter().DomainNick(), loadTS, refreshRate)
if err != nil {
return err
}
for _, twt := range f.Twts() {
mentions := make(uuids, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() {
followMap[mention.Twter().Nick] = mention.Twter().URI
mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
}
tags := make(strList, 0, len(twt.Tags()))
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
subject := twt.Subject()
subjectTag := ""
if subject != nil {
if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
subjectTag = tag.Text()
}
}
_, err = tx.Exec(
insertTwt,
feedID,
twt.Hash(),
subjectTag,
twt.Created(),
fmt.Sprint(twt),
mentions.ToStrList(),
tags,
)
if err != nil {
return err
}
}
for nick, uri := range followMap {
_, err = tx.Exec(
insertFeed,
urlNS.UUID5(uri),
uri,
nick,
nil,
refreshRate,
)
if err != nil {
return err
}
}
return tx.Commit()
}
type feed struct {
ID uuid
URI string
Nick string
LastScanOn sql.NullTime
RefreshRate int
}
func refreshLoop(c console) {
maxInt := int(^uint(0) >> 1)
less := func(a, b *feed) bool {
return a.LastScanOn.Time.Before(b.LastScanOn.Time)
}
queue := FibHeap(less)
db := c.Get("db").(*sql.DB)
res, err := db.QueryContext(c.Context, fetchFeeds)
if err != nil {
c.Log(err)
c.abort()
return
}
c.Log("load feeds")
for res.Next() {
var f feed
err = res.Scan(&f.ID, &f.URI, &f.Nick, &f.LastScanOn, &f.RefreshRate)
if err != nil {
c.Log(err)
c.abort()
return
}
if !f.LastScanOn.Valid {
f.LastScanOn.Time = time.Now()
f.LastScanOn.Valid = true
}
f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
queue.Insert(&f)
}
c.Log("start refresh loop")
for !queue.IsEmpty() {
f := queue.ExtractMin()
select {
case <-c.Done():
return
case <-time.After(f.LastScanOn.Time.Sub(time.Now())):
c.Log("refresh", f.URI)
}
req, err := http.NewRequestWithContext(c.Context, "GET", f.URI, nil)
if err != nil {
c.Log(err)
c.abort()
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
c.Log(err)
_, err = db.ExecContext(c.Context, updateFeed, f.LastScanOn, maxInt, f.ID)
if err != nil {
c.Log(err)
c.abort()
return
}
continue
}
defer resp.Body.Close()
err = loadFeed(db, resp.Body)
if err != nil {
_, err = db.ExecContext(c.Context, updateFeed, f.LastScanOn, maxInt, f.ID)
if err != nil {
c.Log(err)
c.abort()
return
}
continue
}
f.LastScanOn.Time = time.Now()
db.ExecContext(c.Context, updateFeed, f.LastScanOn, f.RefreshRate, f.ID)
f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
queue.Insert(f)
}
}

77
uuid.go Normal file
View File

@ -0,0 +1,77 @@
package main
import (
"crypto/sha1"
"database/sql/driver"
"encoding/hex"
"fmt"
"strings"
)
type uuid [16]byte
var urlNS = uuid{0x6b, 0xa7, 0xb8, 0x10, 0x9d, 0xad, 0x11, 0xd1, 0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8}
func (u uuid) UUID5(value string) uuid {
h := sha1.New()
h.Write(u[:])
h.Write([]byte(value))
return uuid(h.Sum(nil))
}
func (u *uuid) UnmarshalText(data string) error {
data = strings.Trim(data, "{}")
data = strings.ReplaceAll(data, "-", "")
_, err := hex.Decode(u[:], []byte(data))
return err
}
func (u uuid) MarshalText() string {
s := hex.EncodeToString(u[:])
return fmt.Sprintf("{%s-%s-%s-%s-%s}", s[:8], s[8:12], s[12:16], s[16:20], s[20:])
}
func (u uuid) Value() (driver.Value, error) {
return u[:], nil
}
func (u *uuid) Scan(value any) error {
if value == nil {
return nil
}
*u = uuid(value.([]byte))
return nil
}
type uuids []uuid
func (lis uuids) ToStrList() strList {
arr := make(strList, len(lis))
for i, v := range lis {
arr[i] = v.MarshalText()
}
return arr
}
type strList []string
func (l *strList) Scan(value any) error {
s := value.(string)
s = strings.Trim(s, "[]")
for _, v := range strings.Split(s, ",") {
v = strings.TrimSpace(v)
v = strings.Trim(v, "\",")
*l = append(*l, v)
}
return nil
}
func (l strList) Value() (driver.Value, error) {
arr := make([]string, len(l))
for i, v := range l {
arr[i] = "\""+v+"\""
}
return "["+strings.Join(arr, ",") +"]", nil
}