373 lines
6.4 KiB
Go
373 lines
6.4 KiB
Go
package main
|
|
|
|
import (
|
|
"cmp"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"iter"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"slices"
|
|
"strings"
|
|
"time"
|
|
|
|
_ "embed"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
"go.yarn.social/lextwt"
|
|
"go.yarn.social/types"
|
|
)
|
|
|
|
func run(c console) error {
|
|
a := c.Args()
|
|
|
|
db, err := setupDB(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer db.Close()
|
|
|
|
c.Set("db", db)
|
|
|
|
f, err := os.Open(a.baseFeed)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
twtfile, err := lextwt.ParseFile(f, &types.Twter{
|
|
Nick: a.Nick,
|
|
URI: a.URI,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("%w: %w", ErrParseFailed, err)
|
|
}
|
|
f.Close()
|
|
|
|
|
|
err = storeFeed(db, twtfile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.Log("ready")
|
|
|
|
go refreshLoop(c)
|
|
|
|
<-c.Done()
|
|
|
|
return nil
|
|
}
|
|
|
|
// SQL used by this file. Every statement uses positional "?" placeholders;
// arguments must be supplied in the column order listed in the statement.
var (
	// initSQL is the content of init.sql, embedded at build time. setupDB
	// splits it on ";" and executes the pieces one by one.
	//go:embed init.sql
	initSQL string

	// insertFeed adds a row to feeds. A row whose feed_id already exists is
	// left untouched (ON CONFLICT ... DO NOTHING).
	// Arguments: feed_id, uri, nick, last_scan_on, refresh_rate.
	insertFeed = `
		insert into feeds
			(feed_id, uri, nick, last_scan_on, refresh_rate)
		values (?, ?, ?, ?, ?)
		ON CONFLICT (feed_id) DO NOTHING
	`

	// insertTwt adds a row to twts. A row with the same (feed_id, hash) pair
	// is left untouched.
	// Arguments: feed_id, hash, conv, dt, text, mentions, tags.
	insertTwt = `
		insert into twts
			(feed_id, hash, conv, dt, text, mentions, tags)
		values (?, ?, ?, ?, ?, ?, ?)
		ON CONFLICT (feed_id, hash) DO NOTHING
	`

	// fetchFeeds selects every row of feeds. LoadFeeds scans the columns in
	// exactly this order, so the two must be kept in sync.
	fetchFeeds = `
		select
			feed_id,
			uri,
			nick,
			last_scan_on,
			refresh_rate,
			last_modified_on,
			last_etag
		from feeds
	`
	// updateFeed rewrites the scan bookkeeping columns of one feed.
	// Arguments: last_scan_on, refresh_rate, last_modified_on, last_etag,
	// last_error, feed_id.
	// NOTE(review): not referenced in the visible part of this file —
	// presumably executed by Feed.Save; confirm.
	updateFeed = `
		update feeds set
			last_scan_on = ?,
			refresh_rate = ?,
			last_modified_on = ?,
			last_etag = ?,
			last_error = ?
		where feed_id = ?
	`
)
|
|
|
|
func setupDB(c console) (*sql.DB, error) {
|
|
a := c.Args()
|
|
db, err := sql.Open(a.dbtype, a.dbfile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, stmt := range strings.Split(initSQL, ";") {
|
|
_, err = db.ExecContext(c, stmt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
func refreshLoop(c console) {
|
|
defer c.abort()
|
|
|
|
TenYear := 3153600000 // 10 year
|
|
OneDay := 86400 // 1 day
|
|
TenMinutes := 600 // 10 mins
|
|
|
|
fetch := NewHTTPFetcher()
|
|
|
|
queue := FibHeap(func(a, b *Feed) bool {
|
|
return a.LastScanOn.Time.Before(b.LastScanOn.Time)
|
|
})
|
|
|
|
db := c.Get("db").(*sql.DB)
|
|
|
|
c.Log("start refresh loop")
|
|
for c.Err() == nil {
|
|
if queue.IsEmpty() {
|
|
it, err := LoadFeeds(c)
|
|
for f := range it {
|
|
queue.Insert(&f)
|
|
}
|
|
if err != nil {
|
|
c.Log(err)
|
|
return
|
|
}
|
|
}
|
|
|
|
f := queue.ExtractMin()
|
|
|
|
c.Log("queue size", queue.count, "next", f.URI, "last scan on", f.LastScanOn.Time.Format(time.RFC3339))
|
|
|
|
if time.Until(f.LastScanOn.Time) > 2*time.Hour {
|
|
c.Log("too soon", f.URI)
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-c.Done():
|
|
return
|
|
case <-time.After(time.Until(f.LastScanOn.Time)):
|
|
}
|
|
|
|
res, err := fetch.Fetch(c.Context, f)
|
|
if err != nil {
|
|
f.LastError.String, f.LastError.Valid = err.Error(), true
|
|
if errors.Is(err, ErrPermanentlyDead) {
|
|
f.RefreshRate = TenYear
|
|
}
|
|
if errors.Is(err, ErrTemporarilyDead) {
|
|
f.RefreshRate = OneDay
|
|
}
|
|
if errors.Is(err, ErrUnmodified) {
|
|
f.RefreshRate = OneDay
|
|
}
|
|
|
|
c.Log(err)
|
|
err = f.Save(c.Context, db)
|
|
if err != nil {
|
|
c.Log(err)
|
|
return
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
f.ETag.String, f.ETag.Valid = res.ETag(), true
|
|
f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
|
|
|
|
cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
|
|
rdr := io.TeeReader(res.Body, cpy)
|
|
|
|
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
|
|
if err != nil {
|
|
c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err))
|
|
return
|
|
}
|
|
|
|
if prev, ok :=twtfile.Info().GetN("prev", 0); ok {
|
|
_, part, ok := strings.Cut(prev.Value(), " ")
|
|
if ok {
|
|
|
|
part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
|
|
queue.Insert(&Feed{
|
|
FetchURI: part,
|
|
URI: f.URI,
|
|
Nick: f.Nick,
|
|
LastScanOn: f.LastScanOn,
|
|
RefreshRate: f.RefreshRate,
|
|
})
|
|
}
|
|
}
|
|
|
|
err = storeFeed(db, twtfile)
|
|
if err != nil {
|
|
c.Log(err)
|
|
return
|
|
}
|
|
|
|
cpy.Close()
|
|
|
|
f.LastScanOn.Time = time.Now()
|
|
f.RefreshRate = TenMinutes
|
|
f.LastError.String = ""
|
|
|
|
err = f.Save(c.Context, db)
|
|
if err != nil {
|
|
c.Log(err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func storeFeed(db *sql.DB, f types.TwtFile) error {
|
|
loadTS := time.Now()
|
|
refreshRate := 600
|
|
|
|
feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
|
|
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
followers := f.Info().GetAll("follow")
|
|
followMap := make(map[string]string, len(followers))
|
|
for _, f := range f.Info().GetAll("follow") {
|
|
nick, uri, ok := strings.Cut(f.Value(), "http")
|
|
if !ok {
|
|
continue
|
|
}
|
|
nick = strings.TrimSpace(nick)
|
|
uri = "http" + strings.TrimSpace(uri)
|
|
|
|
if _, err := url.Parse(uri); err != nil {
|
|
continue
|
|
}
|
|
|
|
followMap[nick] = uri
|
|
}
|
|
|
|
defer tx.Rollback()
|
|
|
|
_, err = tx.Exec(
|
|
insertFeed,
|
|
feedID,
|
|
f.Twter().HashingURI,
|
|
f.Twter().DomainNick(),
|
|
loadTS,
|
|
refreshRate,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, twt := range f.Twts() {
|
|
mentions := make(uuids, 0, len(twt.Mentions()))
|
|
for _, mention := range twt.Mentions() {
|
|
followMap[mention.Twter().Nick] = mention.Twter().URI
|
|
mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
|
|
}
|
|
|
|
tags := make(strList, 0, len(twt.Tags()))
|
|
for _, tag := range twt.Tags() {
|
|
tags = append(tags, tag.Text())
|
|
}
|
|
|
|
subject := twt.Subject()
|
|
subjectTag := ""
|
|
if subject != nil {
|
|
if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
|
|
subjectTag = tag.Text()
|
|
}
|
|
}
|
|
|
|
_, err = tx.Exec(
|
|
insertTwt,
|
|
feedID,
|
|
twt.Hash(),
|
|
subjectTag,
|
|
twt.Created(),
|
|
fmt.Sprint(twt),
|
|
mentions.ToStrList(),
|
|
tags,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for nick, uri := range followMap {
|
|
_, err = tx.Exec(
|
|
insertFeed,
|
|
urlNS.UUID5(uri),
|
|
uri,
|
|
nick,
|
|
nil,
|
|
refreshRate,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func LoadFeeds(c console) (iter.Seq[Feed], error) {
|
|
var err error
|
|
var res *sql.Rows
|
|
|
|
db := c.Get("db").(*sql.DB)
|
|
res, err = db.QueryContext(c.Context, fetchFeeds)
|
|
|
|
if err != nil {
|
|
return slices.Values([]Feed{}), err
|
|
}
|
|
|
|
c.Log("load feeds")
|
|
|
|
return func(yield func(Feed) bool) {
|
|
for res.Next() {
|
|
var f Feed
|
|
f.Version = "0.0.1"
|
|
err = res.Scan(
|
|
&f.FeedID,
|
|
&f.URI,
|
|
&f.Nick,
|
|
&f.LastScanOn,
|
|
&f.RefreshRate,
|
|
&f.LastModified,
|
|
&f.ETag,
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if !f.LastScanOn.Valid {
|
|
f.LastScanOn.Time = time.Now()
|
|
f.LastScanOn.Valid = true
|
|
} else {
|
|
f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
|
|
}
|
|
|
|
f.FetchURI = f.URI
|
|
|
|
if !yield(f) {
|
|
return
|
|
}
|
|
}
|
|
}, err
|
|
}
|