chore: improve fetch functionality

xuu 2024-11-11 19:13:34 -07:00
parent d8b87a0072
commit 0ea449264a
Signed by: xuu
GPG Key ID: 8B3B0604F164E04F
6 changed files with 310 additions and 80 deletions

3	.gitignore vendored

@@ -1,3 +1,4 @@
-twt.db*
+*.db*
 feed
 __debug*
+feeds/

37	feed.go (new file)

@@ -0,0 +1,37 @@
package main

import (
	"context"
	"database/sql"
)

type Feed struct {
	FeedID       uuid
	URI          string
	Nick         string
	LastScanOn   sql.NullTime
	RefreshRate  int
	LastModified sql.NullTime
	LastError    sql.NullString
	ETag         sql.NullString

	DiscloseFeedURL string
	DiscloseNick    string
	Version         string
}

// Save persists the mutable bookkeeping fields (scan time, refresh rate,
// cache validators, last error) via the updateFeed statement.
func (f *Feed) Save(ctx context.Context, db *sql.DB) error {
	_, err := db.ExecContext(
		ctx,
		updateFeed,
		f.LastScanOn,
		f.RefreshRate,
		f.LastModified,
		f.ETag,
		f.LastError,
		f.FeedID,
	)
	return err
}
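
A minimal usage sketch of Save, assuming an active ctx, an open *sql.DB, and the time and log packages imported; the values are illustrative:

	// Sketch: record a successful scan. Save binds these fields to the
	// updateFeed statement defined alongside the other queries below.
	f.LastScanOn = sql.NullTime{Time: time.Now(), Valid: true}
	f.RefreshRate = 600
	if err := f.Save(ctx, db); err != nil {
		log.Println("save feed:", err)
	}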

125	fetcher.go (new file)

@@ -0,0 +1,125 @@
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"strings"
	"time"
)

var (
	ErrUnmodified      = errors.New("unmodified")
	ErrPermanentlyDead = errors.New("permanently dead")
	ErrTemporarilyDead = errors.New("temporarily dead")
	ErrParseFailed     = errors.New("parse failed")
)

type Response struct {
	*http.Response
}

func (r *Response) ETag() string {
	return r.Header.Get("ETag")
}

func (r *Response) Read(b []byte) (int, error) {
	return r.Body.Read(b)
}

// Close closes the Response.Body, which is necessary to free up resources.
func (r *Response) Close() {
	r.Body.Close()
}

func (r *Response) ContentType() string {
	return r.Header.Get("Content-Type")
}

// LastModified parses the Last-Modified header, falling back to the
// current time when the header is absent or malformed.
func (r *Response) LastModified() time.Time {
	lastModified := time.Now()
	if lm, err := time.Parse(http.TimeFormat, r.Header.Get("Last-Modified")); err == nil {
		lastModified = lm
	}
	return lastModified
}

type httpFetcher struct {
	client *http.Client
}

func NewHTTPFetcher() *httpFetcher {
	return &httpFetcher{
		client: &http.Client{
			Transport: &http.Transport{
				Proxy: http.ProxyFromEnvironment,
				DialContext: (&net.Dialer{
					Timeout:   5 * time.Second,
					KeepAlive: 5 * time.Second,
				}).DialContext,
				ForceAttemptHTTP2:     false,
				MaxIdleConns:          100,
				IdleConnTimeout:       10 * time.Second,
				TLSHandshakeTimeout:   10 * time.Second,
				ExpectContinueTimeout: 1 * time.Second,
			},
		},
	}
}
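
The transport above bounds dialing and TLS but sets no overall client timeout, so a slow body read can stall indefinitely; a caller can cap the whole fetch with a context deadline. A sketch, where the 30-second budget and the variable f (a *Feed) are illustrative:

	// Sketch: bound one fetch end-to-end via the request context.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	res, err := NewHTTPFetcher().Fetch(ctx, f)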
func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) (*Response, error) {
	// Hard-coded permaban: skip lublin.se without making a request.
	if strings.Contains(request.URI, "lublin.se") {
		return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, request.URI)
	}

	req, err := http.NewRequestWithContext(ctx, "GET", request.URI, nil)
	if err != nil {
		return nil, fmt.Errorf("creating HTTP request failed: %w", err)
	}
	req.Header.Add("Accept", "text/plain")
	// Send If-Modified-Since only when a prior Last-Modified is recorded.
	if request.LastModified.Valid {
		req.Header.Add("If-Modified-Since", request.LastModified.Time.Format(http.TimeFormat))
	}
	if request.ETag.Valid {
		req.Header.Add("If-None-Match", request.ETag.String)
	}

	if request.DiscloseFeedURL != "" && request.DiscloseNick != "" {
		req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
			request.Version, request.DiscloseFeedURL, request.DiscloseNick))
	} else {
		req.Header.Set("User-Agent", fmt.Sprintf("xt/%s", request.Version))
	}

	res, err := f.client.Do(req)
	if err != nil {
		// errors.Is cannot match a freshly constructed *net.DNSError;
		// use errors.As to detect DNS failures in the error chain.
		var dnsErr *net.DNSError
		if errors.As(err, &dnsErr) {
			return nil, fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
		}
		return nil, fmt.Errorf("%w: %w", ErrPermanentlyDead, err)
	}
	response := &Response{
		Response: res,
	}

	switch res.StatusCode {
	case 200:
		return response, nil
	case 304:
		return response, fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
	case 400, 406, 502, 503:
		return response, fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
	case 403, 404, 410:
		return response, fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
	default:
		return response, errors.New(res.Status)
	}
}
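
Because every failure wraps one of the sentinel errors, callers can classify outcomes with errors.Is; a sketch of the intended pattern, with fetch, ctx, and f assumed in scope (refreshLoop below does a variant of this):

	// Sketch: classify Fetch results by sentinel error.
	res, err := fetch.Fetch(ctx, f)
	switch {
	case err == nil:
		defer res.Close() // free the connection after reading
	case errors.Is(err, ErrUnmodified):
		// 304: cache validators matched; nothing new to parse
	case errors.Is(err, ErrTemporarilyDead):
		// DNS trouble or 5xx: back off and retry later
	case errors.Is(err, ErrPermanentlyDead):
		// 403/404/410 or permaban: stop polling this feed
	}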

2	go.mod

@@ -12,7 +12,7 @@ require (
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/sirupsen/logrus v1.9.3 // indirect
 	github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect
-	go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 // indirect
+	go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8
 	golang.org/x/crypto v0.27.0 // indirect
 	golang.org/x/sys v0.25.0 // indirect
 )

@@ -3,10 +3,12 @@ PRAGMA journal_mode=WAL;
 create table if not exists feeds (
 	feed_id blob primary key,
 	uri text,
-	nick string,
-	domain string,
+	nick text,
 	last_scan_on timestamp,
-	refresh_rate int default 600
+	refresh_rate int default 600,
+	last_modified_on timestamp,
+	last_etag text,
+	last_error text
 );
 create table if not exists twts (
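
The three new columns are nullable on purpose: they pair with the sql.Null* fields on Feed, so a never-scanned feed round-trips as Valid == false rather than as an empty string. A sketch, with the query text and variable names being illustrative:

	// Sketch: NULLs in the new columns surface as Valid == false.
	var f Feed
	err := db.QueryRowContext(ctx,
		`select last_modified_on, last_etag, last_error from feeds where feed_id = ?`,
		id,
	).Scan(&f.LastModified, &f.ETag, &f.LastError)
	if err == nil && !f.ETag.Valid {
		// no ETag recorded yet, so Fetch sends no If-None-Match header
	}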

@@ -2,11 +2,14 @@ package main
 import (
 	"database/sql"
 	"errors"
+	"fmt"
 	"io"
-	"net/http"
+	"iter"
 	"net/url"
 	"os"
+	"path/filepath"
+	"slices"
 	"strings"
 	"time"
@@ -14,6 +17,7 @@ import (
 	_ "github.com/mattn/go-sqlite3"
 	"go.yarn.social/lextwt"
+	"go.yarn.social/types"
 )
 
 func run(c console) error {
@@ -40,7 +44,7 @@ func run(c console) error {
 		return err
 	}
 	defer f.Close()
-	err = loadFeed(db, f)
+	err = loadFeed(db, &types.Twter{Nick: "xuu", URI: "https://txt.sour.is/users/xuu/twtxt.txt"}, f)
 	if err != nil {
 		return err
 	}
@@ -72,26 +76,37 @@ var (
 `
 	fetchFeeds = `
-	select feed_id, uri, nick, last_scan_on, refresh_rate from feeds
+	select
+		feed_id,
+		uri,
+		nick,
+		last_scan_on,
+		refresh_rate,
+		last_modified_on,
+		last_etag
+	from feeds
 `
 	updateFeed = `
 	update feeds set
 		last_scan_on = ?,
-		refresh_rate = ?
+		refresh_rate = ?,
+		last_modified_on = ?,
+		last_etag = ?,
+		last_error = ?
 	where feed_id = ?
 `
 )
-func loadFeed(db *sql.DB, feed io.Reader) error {
+func loadFeed(db *sql.DB, twter *types.Twter, feed io.Reader) error {
 	loadTS := time.Now()
 	refreshRate := 600
-	f, err := lextwt.ParseFile(feed, nil)
+	f, err := lextwt.ParseFile(feed, twter)
 	if err != nil {
-		return err
+		return fmt.Errorf("%w: %w", ErrParseFailed, err)
 	}
-	feedID := urlNS.UUID5(f.Twter().HashingURI)
+	feedID := urlNS.UUID5(coalesce(f.Twter().HashingURI, f.Twter().URI))
 	tx, err := db.Begin()
 	if err != nil {
@@ -102,12 +117,16 @@ func loadFeed(db *sql.DB, feed io.Reader) error {
 	followMap := make(map[string]string, len(followers))
 	for _, f := range f.Info().GetAll("follow") {
 		nick, uri, ok := strings.Cut(f.Value(), "http")
-		if !ok{
+		if !ok {
 			continue
 		}
 		nick = strings.TrimSpace(nick)
 		uri = "http" + strings.TrimSpace(uri)
+		if uri == "https://lublin.se/twtxt.txt" {
+			continue
+		}
 		if _, err := url.Parse(uri); err != nil {
 			continue
 		}
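
The follow parsing above splits each metadata value at the first "http"; a worked example of what strings.Cut yields (the feed value is illustrative):

	// Sketch: splitting a twtxt "follow" value at the first "http".
	nick, uri, ok := strings.Cut("eapl.me https://eapl.me/twtxt.txt", "http")
	// nick = "eapl.me ", uri = "s://eapl.me/twtxt.txt", ok = true
	nick = strings.TrimSpace(nick)        // "eapl.me"
	uri = "http" + strings.TrimSpace(uri) // "https://eapl.me/twtxt.txt"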
@@ -174,106 +193,152 @@ func loadFeed(db *sql.DB, feed io.Reader) error {
 	return tx.Commit()
 }
 
-type feed struct {
-	ID          uuid
-	URI         string
-	Nick        string
-	LastScanOn  sql.NullTime
-	RefreshRate int
-}
 
 func refreshLoop(c console) {
-	maxInt := 3153600000
+	defer c.abort()
-	less := func(a, b *feed) bool {
+	TenYear := 3153600000 // 10 years
+	OneDay := 86400       // 1 day
+	TenMinutes := 600
+	fetch := NewHTTPFetcher()
+	less := func(a, b *Feed) bool {
 		return a.LastScanOn.Time.Before(b.LastScanOn.Time)
 	}
 	queue := FibHeap(less)
 	db := c.Get("db").(*sql.DB)
-	res, err := db.QueryContext(c.Context, fetchFeeds)
-	if err != nil {
-		c.Log(err)
-		c.abort()
-		return
-	}
-	c.Log("load feeds")
-	for res.Next() {
-		var f feed
-		err = res.Scan(&f.ID, &f.URI, &f.Nick, &f.LastScanOn, &f.RefreshRate)
-		if err != nil {
-			c.Log(err)
-			c.abort()
-			return
-		}
-		if !f.LastScanOn.Valid {
-			f.LastScanOn.Time = time.Now()
-			f.LastScanOn.Valid = true
-		} else {
-			f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
-		}
-		queue.Insert(&f)
-	}
 	c.Log("start refresh loop")
-	for !queue.IsEmpty() {
+	for c.Err() == nil {
+		if queue.IsEmpty() {
+			it, err := LoadFeeds(c)
+			for f := range it {
+				queue.Insert(&f)
+			}
+			if err != nil {
+				c.Log(err)
+				return
+			}
+		}
 		f := queue.ExtractMin()
-		c.Log("next", f.URI, "last scan on", f.LastScanOn.Time)
+		c.Log("queue size", queue.count, "next", f.URI, "last scan on", f.LastScanOn.Time.Format(time.RFC3339))
+		if time.Until(f.LastScanOn.Time) > 2*time.Hour {
+			c.Log("too soon", f.URI)
+			continue
+		}
 		select {
 		case <-c.Done():
 			return
 		case <-time.After(time.Until(f.LastScanOn.Time)):
 			c.Log("refresh", f.URI)
 		}
-		req, err := http.NewRequestWithContext(c.Context, "GET", f.URI, nil)
+		res, err := fetch.Fetch(c.Context, f)
 		if err != nil {
+			f.LastError.String, f.LastError.Valid = err.Error(), true
+			if errors.Is(err, ErrPermanentlyDead) {
+				f.RefreshRate = TenYear
+			}
+			if errors.Is(err, ErrTemporarilyDead) {
+				f.RefreshRate = OneDay
+			}
+			if errors.Is(err, ErrUnmodified) {
+				f.RefreshRate = OneDay
+			}
 			c.Log(err)
+			err = f.Save(c.Context, db)
+			if err != nil {
+				c.Log(err)
+				return
+			}
 			continue
 		}
+		f.ETag.String, f.ETag.Valid = res.ETag(), true
+		f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
+		cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+		if err != nil {
+			c.Log(err) // don't TeeReader into a nil *os.File
+			return
+		}
+		rdr := io.TeeReader(res.Body, cpy)
+		err = loadFeed(db, &types.Twter{Nick: f.Nick, URI: f.URI}, rdr)
 		if err != nil {
 			c.Log(err)
 			c.abort()
 			return
 		}
-		resp, err := http.DefaultClient.Do(req)
-		if err != nil {
-			c.Log(err)
-			_, err = db.ExecContext(c.Context, updateFeed, f.LastScanOn, maxInt, f.ID)
-			if err != nil {
-				c.Log(err)
-				c.abort()
-				return
-			}
-			continue
-		}
-		defer resp.Body.Close()
-		err = loadFeed(db, resp.Body)
-		if err != nil {
-			_, err = db.ExecContext(c.Context, updateFeed, f.LastScanOn, maxInt, f.ID)
-			if err != nil {
-				c.Log(err)
-				c.abort()
-				return
-			}
-			continue
-		}
+		cpy.Close()
+		res.Close() // release the HTTP connection for reuse
 		f.LastScanOn.Time = time.Now()
+		f.RefreshRate = TenMinutes
+		f.LastError = sql.NullString{} // reset Valid too, not just the string
-		db.ExecContext(c.Context, updateFeed, f.LastScanOn, f.RefreshRate, f.ID)
+		f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
 		c.Log("next scan", f.URI, "on", f.LastScanOn.Time)
 		// queue.Insert(f)
+		err = f.Save(c.Context, db)
+		if err != nil {
+			c.Log(err)
+			return
+		}
 	}
 }
+
+func LoadFeeds(c console) (iter.Seq[Feed], error) {
+	var err error
+	var res *sql.Rows
+	db := c.Get("db").(*sql.DB)
+	res, err = db.QueryContext(c.Context, fetchFeeds)
+	if err != nil {
+		return slices.Values([]Feed{}), err
+	}
+	c.Log("load feeds")
+	return func(yield func(Feed) bool) {
+		defer res.Close() // also covers an early stop from !yield
+		for res.Next() {
+			var f Feed
+			f.Version = "0.0.1"
+			err = res.Scan(
+				&f.FeedID,
+				&f.URI,
+				&f.Nick,
+				&f.LastScanOn,
+				&f.RefreshRate,
+				&f.LastModified,
+				&f.ETag,
+			)
+			if err != nil {
+				// the returned err was copied when LoadFeeds returned,
+				// so log scan failures here before stopping
+				c.Log(err)
+				return
+			}
+			if !f.LastScanOn.Valid {
+				f.LastScanOn.Time = time.Now()
+				f.LastScanOn.Valid = true
+			} else {
+				f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
+			}
+			if !yield(f) {
+				return
+			}
+		}
+	}, err
+}
+
+// coalesce returns the first non-zero value among its arguments.
+func coalesce[T comparable](a T, values ...T) T {
+	var zero T
+	for _, v := range values {
+		if a == zero {
+			a = v
+		}
+	}
+	return a
+}
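
loadFeed uses coalesce to prefer the parser's HashingURI and fall back to the plain URI; a short usage sketch (values illustrative):

	// Sketch: the first non-zero argument wins.
	uri := coalesce("", "https://example.com/twtxt.txt") // falls back to the second value
	id := urlNS.UUID5(coalesce(f.Twter().HashingURI, f.Twter().URI))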