From 3d6048e544000895c0113b35539368e9b4bcd816 Mon Sep 17 00:00:00 2001
From: xuu
Date: Wed, 20 Nov 2024 09:05:05 -0700
Subject: [PATCH] chore: split out store db

---
 feed.go    |   1 +
 fetcher.go |   6 +-
 main.go    |   4 +
 service.go | 260 +++++++++++++++++++++++++++++------------------------
 4 files changed, 152 insertions(+), 119 deletions(-)

diff --git a/feed.go b/feed.go
index d70777b..ea0be6e 100644
--- a/feed.go
+++ b/feed.go
@@ -7,6 +7,7 @@ import (
 
 type Feed struct {
 	FeedID uuid
+	FetchURI string
 	URI string
 	Nick string
 	LastScanOn sql.NullTime
diff --git a/fetcher.go b/fetcher.go
index ff45dc4..cb5ffb1 100644
--- a/fetcher.go
+++ b/fetcher.go
@@ -68,11 +68,11 @@ func NewHTTPFetcher() *httpFetcher {
 }
 
 func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) (*Response, error) {
-	if strings.Contains(request.URI, "lublin.se") {
+	if strings.Contains(request.FetchURI, "lublin.se") {
 		return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, request.URI)
 	}
 
-	req, err := http.NewRequestWithContext(ctx, "GET", request.URI, nil)
+	req, err := http.NewRequestWithContext(ctx, "GET", request.FetchURI, nil)
 	if err != nil {
 		return nil, fmt.Errorf("creating HTTP request failed: %w", err)
 	}
@@ -99,7 +99,7 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) (*Response, erro
 		if errors.Is(err, &net.DNSError{}) {
 			return nil, fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
 		}
-		return nil, fmt.Errorf("%w: %w", ErrPermanentlyDead, err)
+		return nil, fmt.Errorf("%w: %w", ErrTemporarilyDead, err)
 	}
 
 	response := &Response{
diff --git a/main.go b/main.go
index 18750b3..ac23f25 100644
--- a/main.go
+++ b/main.go
@@ -31,6 +31,8 @@ type args struct {
 	dbtype string
 	dbfile string
 	baseFeed string
+	Nick string
+	URI string
 }
 
 func env(key, def string) string {
@@ -50,6 +52,8 @@ func main() {
 		env("XT_DBTYPE", "sqlite3"),
 		env("XT_DBFILE", "file:twt.db"),
 		env("XT_BASE_FEED", "feed"),
+		env("XT_NICK", "xuu"),
+		env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"),
 	}
 
 	console.Set("args", args)
diff --git a/service.go b/service.go
index fac0a88..ab543c0 100644
--- a/service.go
+++ b/service.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"cmp"
 	"database/sql"
 	"errors"
 	"fmt"
@@ -21,30 +22,32 @@ import (
 )
 
 func run(c console) error {
-	ctx := c.Context
 	a := c.Args()
-	db, err := sql.Open(a.dbtype, a.dbfile)
+	db, err := setupDB(c)
 	if err != nil {
 		return err
 	}
 	defer db.Close()
 
-	for _, stmt := range strings.Split(initSQL, ";") {
-		_, err = db.ExecContext(ctx, stmt)
-		if err != nil {
-			return err
-		}
-	}
-
 	c.Set("db", db)
 
 	f, err := os.Open(a.baseFeed)
 	if err != nil {
 		return err
 	}
-	defer f.Close()
-	err = loadFeed(db, &types.Twter{Nick: "xuu", URI: "https://txt.sour.is/users/xuu/twtxt.txt"}, f)
+
+	twtfile, err := lextwt.ParseFile(f, &types.Twter{
+		Nick: a.Nick,
+		URI:  a.URI,
+	})
+	if err != nil {
+		return fmt.Errorf("%w: %w", ErrParseFailed, err)
+	}
+	f.Close()
+
+	err = storeFeed(db, twtfile)
 	if err != nil {
 		return err
 	}
@@ -68,6 +71,7 @@ var (
 		values (?, ?, ?, ?, ?)
 		ON CONFLICT (feed_id) DO NOTHING
 	`
+
 	insertTwt = `
 		insert into twts (feed_id, hash, conv, dt, text, mentions, tags)
@@ -97,100 +101,21 @@ var (
 	`
 )
 
-func loadFeed(db *sql.DB, twter *types.Twter, feed io.Reader) error {
-	loadTS := time.Now()
-	refreshRate := 600
-
-	f, err := lextwt.ParseFile(feed, twter)
+func setupDB(c console) (*sql.DB, error) {
+	a := c.Args()
+	db, err := sql.Open(a.dbtype, a.dbfile)
 	if err != nil {
-		return fmt.Errorf("%w: %w", ErrParseFailed, err)
+		return nil, err
 	}
-	feedID := urlNS.UUID5(coalesce(f.Twter().HashingURI, f.Twter().URI))
-
-	tx, err := db.Begin()
-	if err != nil {
-		return err
-	}
-
-	followers := f.Info().GetAll("follow")
-	followMap := make(map[string]string, len(followers))
-	for _, f := range f.Info().GetAll("follow") {
-		nick, uri, ok := strings.Cut(f.Value(), "http")
-		if !ok {
-			continue
-		}
-		nick = strings.TrimSpace(nick)
-		uri = "http" + strings.TrimSpace(uri)
-
-		if uri == "https://lublin.se/twtxt.txt" {
-			continue
-		}
-
-		if _, err := url.Parse(uri); err != nil {
-			continue
-		}
-
-		followMap[nick] = uri
-	}
-
-	defer tx.Rollback()
-
-	_, err = tx.Exec(insertFeed, feedID, f.Twter().HashingURI, f.Twter().DomainNick(), loadTS, refreshRate)
-	if err != nil {
-		return err
-	}
-
-	for _, twt := range f.Twts() {
-		mentions := make(uuids, 0, len(twt.Mentions()))
-		for _, mention := range twt.Mentions() {
-			followMap[mention.Twter().Nick] = mention.Twter().URI
-			mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
-		}
-
-		tags := make(strList, 0, len(twt.Tags()))
-		for _, tag := range twt.Tags() {
-			tags = append(tags, tag.Text())
-		}
-
-		subject := twt.Subject()
-		subjectTag := ""
-		if subject != nil {
-			if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
-				subjectTag = tag.Text()
-			}
-		}
-
-		_, err = tx.Exec(
-			insertTwt,
-			feedID,
-			twt.Hash(),
-			subjectTag,
-			twt.Created(),
-			fmt.Sprint(twt),
-			mentions.ToStrList(),
-			tags,
-		)
+	for _, stmt := range strings.Split(initSQL, ";") {
+		_, err = db.ExecContext(c, stmt)
 		if err != nil {
-			return err
+			return nil, err
 		}
 	}
 
-	for nick, uri := range followMap {
-		_, err = tx.Exec(
-			insertFeed,
-			urlNS.UUID5(uri),
-			uri,
-			nick,
-			nil,
-			refreshRate,
-		)
-		if err != nil {
-			return err
-		}
-	}
-
-	return tx.Commit()
+	return db, nil
 }
 
@@ -198,15 +123,13 @@ func refreshLoop(c console) {
 	TenYear := 3153600000 // 10 year
 	OneDay := 86400 // 1 day
-	TenMinutes := 600
+	TenMinutes := 600 // 10 mins
 
 	fetch := NewHTTPFetcher()
 
-	less := func(a, b *Feed) bool {
+	queue := FibHeap(func(a, b *Feed) bool {
 		return a.LastScanOn.Time.Before(b.LastScanOn.Time)
-	}
-
-	queue := FibHeap(less)
+	})
 
 	db := c.Get("db").(*sql.DB)
@@ -267,7 +190,28 @@ func refreshLoop(c console) {
 		cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
 		rdr := io.TeeReader(res.Body, cpy)
-		err = loadFeed(db, &types.Twter{Nick: f.Nick, URI: f.URI}, rdr)
+		twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
+		if err != nil {
+			c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err))
+			return
+		}
+
+		if prev, ok := twtfile.Info().GetN("prev", 0); ok {
+			_, part, ok := strings.Cut(prev.Value(), " ")
+			if ok {
+
+				part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
+				queue.Insert(&Feed{
+					FetchURI: part,
+					URI: f.URI,
+					Nick: f.Nick,
+					LastScanOn: f.LastScanOn,
+					RefreshRate: f.RefreshRate,
+				})
+			}
+		}
+
+		err = storeFeed(db, twtfile)
 		if err != nil {
 			c.Log(err)
 			return
 		}
@@ -287,6 +231,100 @@ func refreshLoop(c console) {
 	}
 }
 
+func storeFeed(db *sql.DB, f types.TwtFile) error {
+	loadTS := time.Now()
+	refreshRate := 600
+
+	feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
+
+	tx, err := db.Begin()
+	if err != nil {
+		return err
+	}
+
+	followers := f.Info().GetAll("follow")
+	followMap := make(map[string]string, len(followers))
+	for _, f := range f.Info().GetAll("follow") {
+		nick, uri, ok := strings.Cut(f.Value(), "http")
+		if !ok {
+			continue
+		}
+		nick = strings.TrimSpace(nick)
+		uri = "http" + strings.TrimSpace(uri)
+
+		if _, err := url.Parse(uri); err != nil {
+			continue
+		}
+
+		followMap[nick] = uri
+	}
+
+	defer tx.Rollback()
+
+	_, err = tx.Exec(
+		insertFeed,
+		feedID,
+		f.Twter().HashingURI,
+		f.Twter().DomainNick(),
+		loadTS,
+		refreshRate,
+	)
+	if err != nil {
+		return err
+	}
+
+	for _, twt := range f.Twts() {
+		mentions := make(uuids, 0, len(twt.Mentions()))
+		for _, mention := range twt.Mentions() {
+			followMap[mention.Twter().Nick] = mention.Twter().URI
+			mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
+		}
+
+		tags := make(strList, 0, len(twt.Tags()))
+		for _, tag := range twt.Tags() {
+			tags = append(tags, tag.Text())
+		}
+
+		subject := twt.Subject()
+		subjectTag := ""
+		if subject != nil {
+			if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
+				subjectTag = tag.Text()
+			}
+		}
+
+		_, err = tx.Exec(
+			insertTwt,
+			feedID,
+			twt.Hash(),
+			subjectTag,
+			twt.Created(),
+			fmt.Sprint(twt),
+			mentions.ToStrList(),
+			tags,
+		)
+		if err != nil {
+			return err
+		}
+	}
+
+	for nick, uri := range followMap {
+		_, err = tx.Exec(
+			insertFeed,
+			urlNS.UUID5(uri),
+			uri,
+			nick,
+			nil,
+			refreshRate,
+		)
+		if err != nil {
+			return err
+		}
+	}
+
+	return tx.Commit()
+}
+
 func LoadFeeds(c console) (iter.Seq[Feed], error) {
 	var err error
 	var res *sql.Rows
@@ -324,21 +362,11 @@ func LoadFeeds(c console) (iter.Seq[Feed], error) {
 			f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
 			}
 
+			f.FetchURI = f.URI
+
 			if !yield(f) {
 				return
 			}
 		}
 	}, err
 }
-
-func coalesce[T comparable](a T, values ...T) T {
-	var zero T
-
-	for _, v := range values {
-		if a == zero {
-			a = v
-		}
-	}
-
-	return a
-}
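
The subtle part of this change is the archive handling added to refreshLoop: a twtxt "prev" metadata field names the previous archive file, and the new code resolves that file name against the feed URL, then re-queues it with FetchURI pointing at the archive while URI stays on the canonical feed, so the stored feed identity (and twt hashing) keeps keying off the same URI. A minimal sketch of that resolution, assuming prev values of the usual "<hash> <filename>" form; the resolvePrev helper and the example URLs are illustrative only and not part of the patch:

package main

import (
	"fmt"
	"strings"
)

// resolvePrev mirrors the URL handling in refreshLoop: the second field of a
// "prev" entry is a file name that lives next to the current twtxt.txt, so it
// is joined onto the feed URI up to and including the last "/".
func resolvePrev(feedURI, prevValue string) (string, bool) {
	_, name, ok := strings.Cut(prevValue, " ")
	if !ok {
		return "", false
	}
	return feedURI[:strings.LastIndex(feedURI, "/")+1] + name, true
}

func main() {
	// Hypothetical feed and prev value, for illustration.
	feed := "https://example.com/user/twtxt.txt"
	if archive, ok := resolvePrev(feed, "ohmmloa twtxt-2024-10.txt"); ok {
		fmt.Println(archive) // https://example.com/user/twtxt-2024-10.txt
	}
}

Walking archives this way is also why Feed grows a separate FetchURI: the fetcher requests whichever file is next (current feed or an archive page), while storeFeed and LoadFeeds continue to attribute twts to the feed's canonical URI.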