package main

import (
	"context"
	"database/sql"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"strings"
	"time"

	_ "embed"

	_ "github.com/mattn/go-sqlite3"
	"go.yarn.social/lextwt"
)

// run wires up the database, applies the schema, loads the base feed,
// and starts the background refresh loop. It blocks until the console
// context is done.
func run(c console) error {
	ctx := c.Context
	a := c.Args()

	db, err := sql.Open(a.dbtype, a.dbfile)
	if err != nil {
		return fmt.Errorf("opening database %q: %w", a.dbfile, err)
	}
	defer db.Close()

	// init.sql may contain several statements; database/sql only runs
	// one at a time, so split on ';' and execute each in turn.
	for _, stmt := range strings.Split(initSQL, ";") {
		if strings.TrimSpace(stmt) == "" {
			continue // skip the empty fragment after the final ';'
		}
		if _, err := db.ExecContext(ctx, stmt); err != nil {
			return fmt.Errorf("running init statement: %w", err)
		}
	}
	c.Set("db", db)

	f, err := os.Open(a.baseFeed)
	if err != nil {
		return err
	}
	defer f.Close()

	if err := loadFeed(db, f); err != nil {
		return err
	}

	c.Log("ready")
	go refreshLoop(c)
	<-c.Done()
	return nil
}

var (
	//go:embed init.sql
	initSQL string

	insertFeed = `
	insert into feeds (feed_id, uri, nick, last_scan_on, refresh_rate)
	values (?, ?, ?, ?, ?)
	ON CONFLICT (feed_id) DO NOTHING
	`

	insertTwt = `
	insert into twts (feed_id, hash, conv, dt, text, mentions, tags)
	values (?, ?, ?, ?, ?, ?, ?)
	ON CONFLICT (feed_id, hash) DO NOTHING
	`

	fetchFeeds = `
	select feed_id, uri, nick, last_scan_on, refresh_rate from feeds
	`

	updateFeed = `
	update feeds set last_scan_on = ?, refresh_rate = ? where feed_id = ?
	`
)

// loadFeed parses a twtxt feed and stores the feed, its twts, and any
// feeds it follows or mentions into the database in one transaction.
func loadFeed(db *sql.DB, feed io.Reader) error {
	loadTS := time.Now()
	refreshRate := 600 // seconds; default scan interval for newly seen feeds

	f, err := lextwt.ParseFile(feed, nil)
	if err != nil {
		return err
	}
	feedID := urlNS.UUID5(f.Twter().HashingURI)

	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op after a successful Commit

	// Collect followed feeds from the preamble; a "follow" value looks
	// like "nick http://example.com/twtxt.txt", so cut at "http".
	followers := f.Info().GetAll("follow")
	followMap := make(map[string]string, len(followers))
	for _, fol := range followers {
		nick, uri, ok := strings.Cut(fol.Value(), "http")
		if !ok {
			continue
		}
		nick = strings.TrimSpace(nick)
		uri = "http" + strings.TrimSpace(uri)
		if _, err := url.Parse(uri); err != nil {
			continue // skip unparseable URIs
		}
		followMap[nick] = uri
	}

	_, err = tx.Exec(insertFeed, feedID, f.Twter().HashingURI, f.Twter().DomainNick(), loadTS, refreshRate)
	if err != nil {
		return err
	}

	for _, twt := range f.Twts() {
		mentions := make(uuids, 0, len(twt.Mentions()))
		for _, mention := range twt.Mentions() {
			// Mentioned feeds are also candidates for future scanning.
			followMap[mention.Twter().Nick] = mention.Twter().URI
			mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
		}

		tags := make(strList, 0, len(twt.Tags()))
		for _, tag := range twt.Tags() {
			tags = append(tags, tag.Text())
		}

		// The conversation id is the subject's tag text, when present.
		subjectTag := ""
		if subject := twt.Subject(); subject != nil {
			if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
				subjectTag = tag.Text()
			}
		}

		_, err = tx.Exec(
			insertTwt,
			feedID,
			twt.Hash(),
			subjectTag,
			twt.Created(),
			fmt.Sprint(twt),
			mentions.ToStrList(),
			tags,
		)
		if err != nil {
			return err
		}
	}

	// Register discovered feeds; last_scan_on stays NULL until the
	// refresh loop scans them for the first time.
	for nick, uri := range followMap {
		if _, err := tx.Exec(insertFeed, urlNS.UUID5(uri), uri, nick, nil, refreshRate); err != nil {
			return err
		}
	}

	return tx.Commit()
}

// feed is one row of the feeds table.
type feed struct {
	ID          uuid
	URI         string
	Nick        string
	LastScanOn  sql.NullTime // NULL until the feed has been scanned once
	RefreshRate int          // seconds between scans
}

// refreshLoop re-fetches known feeds ordered by next scan time using a
// min-heap keyed on LastScanOn. It runs until the console context is
// done or the queue drains.
func refreshLoop(c console) {
	// Refresh rate (in seconds) used to park feeds that fail to fetch
	// or parse so they are effectively never retried (~100 years).
	const disabledRefreshRate = 3153600000

	less := func(a, b *feed) bool { return a.LastScanOn.Time.Before(b.LastScanOn.Time) }
	queue := FibHeap(less)

	db := c.Get("db").(*sql.DB)

	res, err := db.QueryContext(c.Context, fetchFeeds)
	if err != nil {
		c.Log(err)
		c.abort()
		return
	}
	defer res.Close()

	c.Log("load feeds")
	for res.Next() {
		var f feed
		if err := res.Scan(&f.ID, &f.URI, &f.Nick, &f.LastScanOn, &f.RefreshRate); err != nil {
			c.Log(err)
			c.abort()
			return
		}
		if !f.LastScanOn.Valid {
			// Never scanned: schedule immediately.
			f.LastScanOn.Time = time.Now()
			f.LastScanOn.Valid = true
		} else {
			// Schedule one refresh interval after the last scan.
			f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
		}
		queue.Insert(&f)
	}
	if err := res.Err(); err != nil {
		c.Log(err)
		c.abort()
		return
	}

	c.Log("start refresh loop")
	for !queue.IsEmpty() {
		f := queue.ExtractMin()
		c.Log("next", f.URI, "last scan on", f.LastScanOn.Time)

		select {
		case <-c.Done():
			return
		case <-time.After(time.Until(f.LastScanOn.Time)):
			c.Log("refresh", f.URI)
		}

		if err := fetchAndLoad(c.Context, db, f.URI); err != nil {
			// Fetch or parse failed: park the feed so it is not retried.
			c.Log(err)
			if _, err := db.ExecContext(c.Context, updateFeed, f.LastScanOn, disabledRefreshRate, f.ID); err != nil {
				c.Log(err)
				c.abort()
				return
			}
			continue
		}

		f.LastScanOn.Time = time.Now()
		if _, err := db.ExecContext(c.Context, updateFeed, f.LastScanOn, f.RefreshRate, f.ID); err != nil {
			c.Log(err)
		}
		f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
		c.Log("next scan", f.URI, "on", f.LastScanOn.Time)
		// NOTE(review): re-queueing is disabled, so each feed is scanned
		// at most once and the loop drains; presumably this should be
		// re-enabled for continuous refreshing — confirm intent.
		// queue.Insert(f)
	}
}

// fetchAndLoad GETs a feed URI and stores its contents via loadFeed.
// The response body is closed before returning, so it is safe to call
// from a loop without leaking connections.
func fetchAndLoad(ctx context.Context, db *sql.DB, uri string) error {
	req, err := http.NewRequestWithContext(ctx, "GET", uri, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return loadFeed(db, resp.Body)
}