chore: split out store db

This commit is contained in:
xuu 2024-11-20 09:05:05 -07:00
parent 0ea449264a
commit 3d6048e544
Signed by: xuu
GPG Key ID: 8B3B0604F164E04F
4 changed files with 152 additions and 119 deletions

View File

@ -7,6 +7,7 @@ import (
type Feed struct { type Feed struct {
FeedID uuid FeedID uuid
FetchURI string
URI string URI string
Nick string Nick string
LastScanOn sql.NullTime LastScanOn sql.NullTime

View File

@ -68,11 +68,11 @@ func NewHTTPFetcher() *httpFetcher {
} }
func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) (*Response, error) { func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) (*Response, error) {
if strings.Contains(request.URI, "lublin.se") { if strings.Contains(request.FetchURI, "lublin.se") {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, request.URI) return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, request.URI)
} }
req, err := http.NewRequestWithContext(ctx, "GET", request.URI, nil) req, err := http.NewRequestWithContext(ctx, "GET", request.FetchURI, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("creating HTTP request failed: %w", err) return nil, fmt.Errorf("creating HTTP request failed: %w", err)
} }
@ -99,7 +99,7 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) (*Response, erro
if errors.Is(err, &net.DNSError{}) { if errors.Is(err, &net.DNSError{}) {
return nil, fmt.Errorf("%w: %s", ErrTemporarilyDead, err) return nil, fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
} }
return nil, fmt.Errorf("%w: %w", ErrPermanentlyDead, err) return nil, fmt.Errorf("%w: %w", ErrTemporarilyDead, err)
} }
response := &Response{ response := &Response{

View File

@ -31,6 +31,8 @@ type args struct {
dbtype string dbtype string
dbfile string dbfile string
baseFeed string baseFeed string
Nick string
URI string
} }
func env(key, def string) string { func env(key, def string) string {
@ -50,6 +52,8 @@ func main() {
env("XT_DBTYPE", "sqlite3"), env("XT_DBTYPE", "sqlite3"),
env("XT_DBFILE", "file:twt.db"), env("XT_DBFILE", "file:twt.db"),
env("XT_BASE_FEED", "feed"), env("XT_BASE_FEED", "feed"),
env("XT_NICK", "xuu"),
env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"),
} }
console.Set("args", args) console.Set("args", args)

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"cmp"
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
@ -21,30 +22,32 @@ import (
) )
func run(c console) error { func run(c console) error {
ctx := c.Context
a := c.Args() a := c.Args()
db, err := sql.Open(a.dbtype, a.dbfile) db, err := setupDB(c)
if err != nil { if err != nil {
return err return err
} }
defer db.Close() defer db.Close()
for _, stmt := range strings.Split(initSQL, ";") {
_, err = db.ExecContext(ctx, stmt)
if err != nil {
return err
}
}
c.Set("db", db) c.Set("db", db)
f, err := os.Open(a.baseFeed) f, err := os.Open(a.baseFeed)
if err != nil { if err != nil {
return err return err
} }
defer f.Close()
err = loadFeed(db, &types.Twter{Nick: "xuu", URI: "https://txt.sour.is/users/xuu/twtxt.txt"}, f) twtfile, err := lextwt.ParseFile(f, &types.Twter{
Nick: a.Nick,
URI: a.URI,
})
if err != nil {
return fmt.Errorf("%w: %w", ErrParseFailed, err)
}
f.Close()
err = storeFeed(db, twtfile)
if err != nil { if err != nil {
return err return err
} }
@ -68,6 +71,7 @@ var (
values (?, ?, ?, ?, ?) values (?, ?, ?, ?, ?)
ON CONFLICT (feed_id) DO NOTHING ON CONFLICT (feed_id) DO NOTHING
` `
insertTwt = ` insertTwt = `
insert into twts insert into twts
(feed_id, hash, conv, dt, text, mentions, tags) (feed_id, hash, conv, dt, text, mentions, tags)
@ -97,100 +101,21 @@ var (
` `
) )
func loadFeed(db *sql.DB, twter *types.Twter, feed io.Reader) error { func setupDB(c console) (*sql.DB, error) {
loadTS := time.Now() a := c.Args()
refreshRate := 600 db, err := sql.Open(a.dbtype, a.dbfile)
f, err := lextwt.ParseFile(feed, twter)
if err != nil { if err != nil {
return fmt.Errorf("%w: %w", ErrParseFailed, err) return nil, err
} }
feedID := urlNS.UUID5(coalesce(f.Twter().HashingURI, f.Twter().URI)) for _, stmt := range strings.Split(initSQL, ";") {
_, err = db.ExecContext(c, stmt)
tx, err := db.Begin()
if err != nil { if err != nil {
return err return nil, err
}
followers := f.Info().GetAll("follow")
followMap := make(map[string]string, len(followers))
for _, f := range f.Info().GetAll("follow") {
nick, uri, ok := strings.Cut(f.Value(), "http")
if !ok {
continue
}
nick = strings.TrimSpace(nick)
uri = "http" + strings.TrimSpace(uri)
if uri == "https://lublin.se/twtxt.txt" {
continue
}
if _, err := url.Parse(uri); err != nil {
continue
}
followMap[nick] = uri
}
defer tx.Rollback()
_, err = tx.Exec(insertFeed, feedID, f.Twter().HashingURI, f.Twter().DomainNick(), loadTS, refreshRate)
if err != nil {
return err
}
for _, twt := range f.Twts() {
mentions := make(uuids, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() {
followMap[mention.Twter().Nick] = mention.Twter().URI
mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
}
tags := make(strList, 0, len(twt.Tags()))
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
subject := twt.Subject()
subjectTag := ""
if subject != nil {
if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
subjectTag = tag.Text()
} }
} }
_, err = tx.Exec( return db, nil
insertTwt,
feedID,
twt.Hash(),
subjectTag,
twt.Created(),
fmt.Sprint(twt),
mentions.ToStrList(),
tags,
)
if err != nil {
return err
}
}
for nick, uri := range followMap {
_, err = tx.Exec(
insertFeed,
urlNS.UUID5(uri),
uri,
nick,
nil,
refreshRate,
)
if err != nil {
return err
}
}
return tx.Commit()
} }
func refreshLoop(c console) { func refreshLoop(c console) {
@ -198,15 +123,13 @@ func refreshLoop(c console) {
TenYear := 3153600000 // 10 year TenYear := 3153600000 // 10 year
OneDay := 86400 // 1 day OneDay := 86400 // 1 day
TenMinutes := 600 TenMinutes := 600 // 10 mins
fetch := NewHTTPFetcher() fetch := NewHTTPFetcher()
less := func(a, b *Feed) bool { queue := FibHeap(func(a, b *Feed) bool {
return a.LastScanOn.Time.Before(b.LastScanOn.Time) return a.LastScanOn.Time.Before(b.LastScanOn.Time)
} })
queue := FibHeap(less)
db := c.Get("db").(*sql.DB) db := c.Get("db").(*sql.DB)
@ -267,7 +190,28 @@ func refreshLoop(c console) {
cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
rdr := io.TeeReader(res.Body, cpy) rdr := io.TeeReader(res.Body, cpy)
err = loadFeed(db, &types.Twter{Nick: f.Nick, URI: f.URI}, rdr) twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
if err != nil {
c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err))
return
}
if prev, ok :=twtfile.Info().GetN("prev", 0); ok {
_, part, ok := strings.Cut(prev.Value(), " ")
if ok {
part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
queue.Insert(&Feed{
FetchURI: part,
URI: f.URI,
Nick: f.Nick,
LastScanOn: f.LastScanOn,
RefreshRate: f.RefreshRate,
})
}
}
err = storeFeed(db, twtfile)
if err != nil { if err != nil {
c.Log(err) c.Log(err)
return return
@ -287,6 +231,100 @@ func refreshLoop(c console) {
} }
} }
func storeFeed(db *sql.DB, f types.TwtFile) error {
loadTS := time.Now()
refreshRate := 600
feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
tx, err := db.Begin()
if err != nil {
return err
}
followers := f.Info().GetAll("follow")
followMap := make(map[string]string, len(followers))
for _, f := range f.Info().GetAll("follow") {
nick, uri, ok := strings.Cut(f.Value(), "http")
if !ok {
continue
}
nick = strings.TrimSpace(nick)
uri = "http" + strings.TrimSpace(uri)
if _, err := url.Parse(uri); err != nil {
continue
}
followMap[nick] = uri
}
defer tx.Rollback()
_, err = tx.Exec(
insertFeed,
feedID,
f.Twter().HashingURI,
f.Twter().DomainNick(),
loadTS,
refreshRate,
)
if err != nil {
return err
}
for _, twt := range f.Twts() {
mentions := make(uuids, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() {
followMap[mention.Twter().Nick] = mention.Twter().URI
mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
}
tags := make(strList, 0, len(twt.Tags()))
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
subject := twt.Subject()
subjectTag := ""
if subject != nil {
if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
subjectTag = tag.Text()
}
}
_, err = tx.Exec(
insertTwt,
feedID,
twt.Hash(),
subjectTag,
twt.Created(),
fmt.Sprint(twt),
mentions.ToStrList(),
tags,
)
if err != nil {
return err
}
}
for nick, uri := range followMap {
_, err = tx.Exec(
insertFeed,
urlNS.UUID5(uri),
uri,
nick,
nil,
refreshRate,
)
if err != nil {
return err
}
}
return tx.Commit()
}
func LoadFeeds(c console) (iter.Seq[Feed], error) { func LoadFeeds(c console) (iter.Seq[Feed], error) {
var err error var err error
var res *sql.Rows var res *sql.Rows
@ -324,21 +362,11 @@ func LoadFeeds(c console) (iter.Seq[Feed], error) {
f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second) f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
} }
f.FetchURI = f.URI
if !yield(f) { if !yield(f) {
return return
} }
} }
}, err }, err
} }
func coalesce[T comparable](a T, values ...T) T {
var zero T
for _, v := range values {
if a == zero {
a = v
}
}
return a
}