Compare commits

..

14 Commits

Author SHA1 Message Date
xuu
303ca5a2db Merge pull request 'add-otel' (#3) from add-otel into main
Reviewed-on: #3
2025-03-26 20:49:41 -06:00
xuu
22d77d6aef
chore: fix 2025-03-26 19:03:20 -06:00
xuu
a0614435ff
chore: deps 2025-03-26 18:58:59 -06:00
xuu
fe28b7c2ad
chore: go fmt 2025-03-26 18:57:09 -06:00
xuu
b34c9bc99f
chore: changes to feed 2025-03-26 18:54:44 -06:00
xuu
5c97bfb182
chore: add twt endpoints 2025-03-25 18:52:30 -06:00
xuu
2fdc43b7de
chore: add last twt on support 2025-03-25 17:05:21 -06:00
xuu
2de06ec4d9
chore: adjust timing 2025-03-24 22:29:18 -06:00
xuu
dae57540e3
chore: refine checkTemp 2025-03-24 18:28:57 -06:00
xuu
7ab409403f
chore: add 500 status 2025-03-24 17:18:46 -06:00
xuu
8ae61d4a27
chore: fix queue sort, add feed temp 2025-03-24 17:01:50 -06:00
xuu
42a9b26b22
chore: changes for otel and fixes to loop 2025-03-24 16:51:18 -06:00
xuu
42fe9176b7
chore: many fixes to code 2025-03-13 22:36:24 -06:00
xuu
ed5b43300b
feat: add otel 2025-02-24 17:28:09 -07:00
16 changed files with 1680 additions and 450 deletions

4
.gitignore vendored
View File

@ -2,3 +2,7 @@
feed feed
__debug* __debug*
feeds/ feeds/
/xt
.env
*.txt
*.txt.xz

View File

@ -1,84 +0,0 @@
.ce
Preamble
.sp
We, the people of the United States, in order
to form a more perfect Union, establish justice, insure
domestic tranquility, provide for the common defense, promote
the general welfare,
and secure the blessings of liberty to ourselves and our
posterity do ordain and establish this Constitution for the
United States of America.
.sp
.nr aR 0 1
.af aR I
.de AR
.ce
.ps 16
.ft B
Article \\n+(aR
.nr sE 0 1
.af sE 1
.ps 12
.ft P
..
.de SE
.sp
.ft B
\\s-2SECTION \\n+(sE:\\s+2
.ft P
.nr pP 0 1
.af pP 1
..
.de PP
.sp
.ft I
\\s-3Paragraph \\n+(pP:\\s+3
.ft P
..
.AR
.SE
Legislative powers; in whom vested:
.PP
All legislative powers herein granted shall be vested in a
Congress of the United States, which shall consist of a Senate
and a House of Representatives.
.SE
House of Representatives, how and by whom chosen, Qualifications
of a Representative. Representatives and direct taxes, how
apportioned. Enumeration. Vacancies to be filled. Power of
choosing officers and of impeachment.
.PP
The House of Representatives shall be composed of members chosen
every second year by the people of the several states, and the
electors in each State shall have the qualifications requisite
for electors of the most numerous branch of the State Legislature.
.PP
No person shall be a Representative who shall not have attained
to the age of twenty-five years, and been seven years a citizen
of the United States, and who shall not, when elected, be an
inhabitant of that State in which he shall be chosen.
.PP
Representatives and direct taxes shall be apportioned among the
several States which may be included within this Union, according
to their respective numbers, which shall be determined by adding
to the whole number of free persons, including those bound for
service for a term of years, and excluding Indians not taxed,
three-fifths of all other persons. The actual enumeration shall
be made within three years after the first meeting of the
Congress of the United States, and within every subsequent term
of ten years, in such manner as they shall by law direct. The
number of Representatives shall not exceed one for every thirty
thousand, but each State shall have at least one Representative;
and until such enumeration shall be made, the State of New
Hampshire shall be entitled to choose three, Massachusetts eight,
Rhode Island and Providence Plantations one, Connecticut
five, New York six, New Jersey four, Pennsylvania eight,
Delaware one, Maryland six, Virginia ten, North Carolina five,
South Carolina five, and Georgia three.
.PP
When vacancies happen in the representation from any State, the
Executive Authority thereof shall issue writs of election to fill
such vacancies.
.PP
The House of Representatives shall choose their Speaker and other
officers; and shall have the sole power of impeachment.

195
app.go Normal file
View File

@ -0,0 +1,195 @@
package main
import (
"context"
"database/sql"
"fmt"
"iter"
"os"
"runtime/debug"
"strconv"
"strings"
"sync"
_ "embed"
_ "github.com/mattn/go-sqlite3"
"github.com/uptrace/opentelemetry-go-extra/otelsql"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt"
"go.yarn.social/types"
"golang.org/x/sync/errgroup"
)
// run wires the application together. It applies the embedded schema
// (initSQL) to the database, seeds the database from the feed file named by
// the baseFeed argument, then runs feedRefreshProcessor inside an errgroup
// and starts httpServer in a background goroutine.
//
// It returns the first error from setup or from the refresh processor;
// otherwise it returns the error of the console context.
func run(c *console) error {
	ctx, span := otel.Span(c.Context)
	defer span.End()

	// ReadBuildInfo reports ok=false (with a nil info) when the binary
	// carries no module build information, so guard before dereferencing.
	version := "unknown"
	if bi, ok := debug.ReadBuildInfo(); ok && bi != nil {
		version = bi.Main.Version
	}
	span.AddEvent(name, trace.WithAttributes(attribute.String("version", version)))

	a := c.Args()
	app := &appState{
		args:  a,
		feeds: sync.Map{},
		// The feed with the earliest NextScanOn sorts first.
		queue: FibHeap(func(a, b *Feed) bool {
			return a.NextScanOn.Time.Before(b.NextScanOn.Time)
		}),
	}

	// Setup DB
	err := func(ctx context.Context) error {
		ctx, span := otel.Span(ctx)
		defer span.End()

		db, err := app.DB(ctx)
		if err != nil {
			return err
		}
		defer db.Close()

		// initSQL is executed one ";"-separated statement at a time.
		for _, stmt := range strings.Split(initSQL, ";") {
			_, err = db.ExecContext(ctx, stmt)
			if err != nil {
				return err
			}
		}
		return nil
	}(ctx)
	if err != nil {
		return err
	}

	// Seed File
	err = func(ctx context.Context) error {
		ctx, span := otel.Span(ctx)
		defer span.End()

		f, err := os.Open(a.baseFeed)
		if err != nil {
			return err
		}
		defer f.Close()

		twtfile, err := lextwt.ParseFile(f, &types.Twter{
			Nick: a.Nick,
			URI:  a.URI,
		})
		if err != nil {
			return fmt.Errorf("%w: %w", ErrParseFailed, err)
		}

		db, err := app.DB(ctx)
		if err != nil {
			return err
		}
		defer db.Close()

		return storeFeed(ctx, db, twtfile)
	}(ctx)
	if err != nil {
		return err
	}

	wg, ctx := errgroup.WithContext(ctx)
	c.Context = ctx

	wg.Go(func() error {
		return feedRefreshProcessor(c, app)
	})

	// NOTE(review): httpServer runs outside the errgroup, so wg.Wait does
	// not wait for it and does not observe its failures.
	go httpServer(c, app)

	err = wg.Wait()
	if err != nil {
		return err
	}
	return c.Context.Err()
}
// appState is the shared runtime state that run builds and hands to
// feedRefreshProcessor and httpServer.
type appState struct {
// args is the value returned by c.Args() in run.
args args
// feeds is read back by Feeds as string keys mapped to *Feed values.
feeds sync.Map
// queue is created in run with a comparator that orders feeds by
// NextScanOn, earliest first.
queue *fibHeap[Feed]
}
// db bundles an open *sql.DB with metadata about the SQLite build behind it,
// as collected by appState.DB.
type db struct {
*sql.DB
// Version is the result of `select sqlite_version()`.
Version string
// Params holds the `pragma compile_options` entries, each split on the
// first "=" into key and value.
Params map[string]string
// MaxLength is the MAX_LENGTH compile option; it stays 0 when the option
// is absent or not numeric.
MaxLength int
// MaxVariableNumber is the MAX_VARIABLE_NUMBER compile option; it stays 0
// when the option is absent or not numeric.
MaxVariableNumber int
}
// DB opens a traced database handle using the configured driver type and
// file, then records the SQLite version and compile options on the result.
//
// If reading that metadata fails, the freshly opened handle is closed before
// the error is returned, so callers only own a handle when err is nil.
func (app *appState) DB(ctx context.Context) (db, error) {
	conn := db{Params: make(map[string]string)}

	handle, err := otelsql.Open(app.args.dbtype, app.args.dbfile,
		otelsql.WithAttributes(semconv.DBSystemSqlite),
		otelsql.WithDBName("xt"))
	if err != nil {
		return conn, err
	}
	conn.DB = handle

	if err := conn.loadSQLiteInfo(ctx); err != nil {
		// Callers bail out on error without closing, so release it here.
		handle.Close()
		return conn, err
	}
	return conn, nil
}

// loadSQLiteInfo fills Version, Params, MaxVariableNumber and MaxLength from
// the connected SQLite library.
func (d *db) loadSQLiteInfo(ctx context.Context) error {
	// QueryRowContext releases its connection once Scan returns; the
	// previous Query/Next pair left the result set open.
	if err := d.DB.QueryRowContext(ctx, `select sqlite_version()`).Scan(&d.Version); err != nil {
		return err
	}

	rows, err := d.DB.QueryContext(ctx, `pragma compile_options`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var option string
		if err := rows.Scan(&option); err != nil {
			return err
		}
		// Options without "=" are stored with an empty value.
		key, value, _ := strings.Cut(option, "=")
		d.Params[key] = value
	}
	if err := rows.Err(); err != nil {
		return err
	}

	// Atoi errors are ignored on purpose: a non-numeric option leaves the
	// corresponding limit at zero.
	if m, ok := d.Params["MAX_VARIABLE_NUMBER"]; ok {
		d.MaxVariableNumber, _ = strconv.Atoi(m)
	}
	if m, ok := d.Params["MAX_LENGTH"]; ok {
		d.MaxLength, _ = strconv.Atoi(m)
	}
	return nil
}
// Feed is a placeholder lookup: it currently ignores feedID and always
// returns nil.
func (app *appState) Feed(feedID string) *Feed {
return nil
}
// Feeds returns an iterator over the entries of app.feeds as
// (key, *Feed) pairs. Iteration ends when the consumer stops, or at the
// first entry whose value is not a *Feed. A key that is not a string is
// yielded as the empty string.
func (app *appState) Feeds() iter.Seq2[string, *Feed] {
	return func(yield func(string, *Feed) bool) {
		app.feeds.Range(func(rawKey, rawValue any) bool {
			feed, isFeed := rawValue.(*Feed)
			if !isFeed {
				return false
			}
			id, _ := rawKey.(string)
			// Range continues exactly as long as the consumer wants more.
			return yield(id, feed)
		})
	}
}

92
cmd/load/main.go Normal file
View File

@ -0,0 +1,92 @@
package main
import (
"context"
"database/sql"
"fmt"
"hash/fnv"
"iter"
"os"
"github.com/oklog/ulid/v2"
"github.com/uptrace/opentelemetry-go-extra/otelsql"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
// main expects exactly one argument, the database file. It opens the
// database and then reads registry lines from standard input; the lines are
// currently discarded.
func main() {
	in := os.Stdin

	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage:", os.Args[0], "[db file]")
		// Stop here: indexing os.Args[1] below would panic otherwise.
		os.Exit(2)
	}

	db, err := DB(context.Background(), os.Args[1])
	if err != nil {
		panic(err)
	}
	defer db.Close()

	for line := range lextwt.IterRegistry(in) {
		_ = line
	}
}
// MaxVariableNumber is a fixed cap of 32766. Presumably it mirrors SQLite's
// limit on bound parameters per statement and is meant as the maxArgs
// argument to chunk; TODO confirm, nothing in this file uses it yet.
const MaxVariableNumber = 32766
// DB opens a traced database handle for the connection string cnx and
// returns whatever otelsql.Open reports. ctx is accepted but not used.
//
// NOTE(review): the driver name is "sqlite" and no driver import is visible
// in this file; confirm a driver is registered under that name.
func DB(ctx context.Context, cnx string) (*sql.DB, error) {
	return otelsql.Open("sqlite", cnx)
}
// makeULID derives a ULID for twt. The time component comes from
// twt.Created(); the 10 entropy bytes are the first 2 bytes of the FNV-1a
// 32-bit digest followed by the 8 bytes of the FNV-1a 64-bit digest, both
// taken over the twt rendered with "%+l".
//
// NOTE(review): earlier versions called Sum(text) without writing text to
// the hash, which only copied the leading bytes of the text into the
// entropy. IDs produced by this version therefore differ from IDs stored by
// the old code.
func makeULID(twt types.Twt) ulid.ULID {
	text := []byte(fmt.Sprintf("%+l", twt))

	// hash.Hash.Write never returns an error. Sum(nil) returns the digest
	// of everything written so far.
	h32 := fnv.New32a()
	h32.Write(text)
	h64 := fnv.New64a()
	h64.Write(text)

	entropy := make([]byte, 10)
	copy(entropy, h32.Sum(nil)[:2])
	copy(entropy[2:], h64.Sum(nil))

	u := ulid.ULID{}
	// Errors are ignored as before: the entropy is always 10 bytes, and an
	// out-of-range timestamp leaves the time component at zero.
	_ = u.SetTime(ulid.Timestamp(twt.Created()))
	_ = u.SetEntropy(entropy)
	return u
}
func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
_, size := qry(1)
itemsPerIter := maxArgs / size
if len(args) < size {
return func(yield func(string, []any) bool) {}
}
if len(args) < maxArgs {
return func(yield func(string, []any) bool) {
query, _ := qry(len(args) / size)
yield(query, args)
}
}
return func(yield func(string, []any) bool) {
for len(args) > 0 {
if len(args) > maxArgs {
query, size := qry(itemsPerIter)
if !yield(query, args[:size]) {
return
}
args = args[size:]
continue
}
query, _ := qry(len(args) / size)
yield(query, args)
return
}
}
}

357
feed.go
View File

@ -4,7 +4,9 @@ import (
"cmp" "cmp"
"context" "context"
"database/sql" "database/sql"
"database/sql/driver"
"fmt" "fmt"
"hash/fnv"
"iter" "iter"
"net/http" "net/http"
"net/url" "net/url"
@ -14,28 +16,31 @@ import (
_ "embed" _ "embed"
"github.com/oklog/ulid/v2"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt" "go.yarn.social/lextwt"
"go.yarn.social/types" "go.yarn.social/types"
) )
type Feed struct { type Feed struct {
FeedID uuid FeedID uuid
FetchURI string ParentID uuid
HashURI string
URI string URI string
Nick string Nick string
LastScanOn sql.NullTime State State
LastScanOn TwtTime
RefreshRate int RefreshRate int
NextScanOn TwtTime
LastTwtOn TwtTime
LastModified sql.NullTime LastModified TwtTime
LastError sql.NullString LastError sql.NullString
ETag sql.NullString ETag sql.NullString
Version string Version string
DiscloseFeedURL string DiscloseFeedURL string
DiscloseNick string DiscloseNick string
FirstFetch bool
State State
} }
type State string type State string
@ -46,40 +51,34 @@ const (
Cold State = "cold" Cold State = "cold"
Warm State = "warm" Warm State = "warm"
Hot State = "hot" Hot State = "hot"
Once State = "once"
) )
var ( var (
//go:embed init.sql //go:embed init.sql
initSQL string initSQL string
insertFeed = ` insertFeed = func(r int) (string, int) {
insert into feeds repeat := ""
(feed_id, uri, nick, last_scan_on, refresh_rate) if r > 1 {
values (?, ?, ?, ?, ?) repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
ON CONFLICT (feed_id) DO NOTHING }
` return `
insert into feeds (
insertTwt = `
insert into twts
(feed_id, hash, conv, dt, text, mentions, tags)
values (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (feed_id, hash) DO NOTHING
`
fetchFeeds = `
select
feed_id, feed_id,
uri, parent_id,
nick, nick,
uri,
state,
last_scan_on, last_scan_on,
refresh_rate, refresh_rate
last_modified_on, )
last_etag values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
from feeds ON CONFLICT (feed_id) DO NOTHING`, r * 7
where datetime(last_scan_on, '+'||refresh_rate||' seconds') < datetime(current_timestamp, '+10 minutes') }
`
updateFeed = ` updateFeed = `
update feeds set update feeds set
state = ?,
last_scan_on = ?, last_scan_on = ?,
refresh_rate = ?, refresh_rate = ?,
last_modified_on = ?, last_modified_on = ?,
@ -87,19 +86,88 @@ var (
last_error = ? last_error = ?
where feed_id = ? where feed_id = ?
` `
insertTwt = func(r int) (string, int) {
repeat := ""
if r > 1 {
repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
}
return `
insert into twts
(feed_id, ulid, text, hash, conv, mentions, tags)
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
ON CONFLICT (feed_id, ulid) DO NOTHING`, r * 7
}
fetchFeeds = `
select
feed_id,
parent_id,
coalesce(hashing_uri, uri) hash_uri,
uri,
nick,
state,
last_scan_on,
strftime(
'%Y-%m-%dT%H:%M:%fZ',
coalesce(last_scan_on, '1901-01-01'),
'+'||abs(refresh_rate + cast(random() % 30 as int))||' seconds'
) next_scan_on,
coalesce(last_twt_on, '1901-01-01T00:00:00Z') last_twt_on,
refresh_rate,
last_modified_on,
last_etag
from feeds
left join (
select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts group by feed_id
) using (feed_id)
left join (
select
feed_id parent_id,
uri hashing_uri
from feeds
where parent_id is null
) using (parent_id)
where datetime(
coalesce(last_scan_on, '1901-01-01'),
'+'||abs(refresh_rate+cast(random()%30 as int))||' seconds'
) < datetime(current_timestamp, '+3 minutes')
`
) )
func (f *Feed) Save(ctx context.Context, db *sql.DB) error { func (f *Feed) Create(ctx context.Context, db db) error {
fmt.Println(f.FetchURI, " ", f.LastModified, " ", f.LastError) ctx, span := otel.Span(ctx)
defer span.End()
query, _ := insertFeed(1)
_, err := db.ExecContext(
ctx,
query,
f.FeedID, // feed_id
f.ParentID, // parent_id
f.Nick, // nick
f.URI, // uri
f.State, // state
f.LastScanOn, // last_scan_on
f.RefreshRate, // refresh_rate
)
return err
}
func (f *Feed) Save(ctx context.Context, db db) error {
ctx, span := otel.Span(ctx)
defer span.End()
_, err := db.ExecContext( _, err := db.ExecContext(
ctx, ctx,
updateFeed, updateFeed,
f.LastScanOn, f.State, // state
f.RefreshRate, f.LastScanOn, // last_scan_on
f.LastModified, f.RefreshRate, // refresh_rate
f.ETag, f.LastModified, // last_modified_on
f.LastError, f.ETag, // last_etag
f.FeedID, f.LastError, // last_error
f.FeedID, // feed_id
) )
return err return err
} }
@ -111,9 +179,14 @@ func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
f.Version = "0.0.1" f.Version = "0.0.1"
err = res.Scan( err = res.Scan(
&f.FeedID, &f.FeedID,
&f.ParentID,
&f.HashURI,
&f.URI, &f.URI,
&f.Nick, &f.Nick,
&f.State,
&f.LastScanOn, &f.LastScanOn,
&f.NextScanOn,
&f.LastTwtOn,
&f.RefreshRate, &f.RefreshRate,
&f.LastModified, &f.LastModified,
&f.ETag, &f.ETag,
@ -122,19 +195,12 @@ func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
return err return err
} }
if !f.LastScanOn.Valid {
f.FirstFetch = true
f.LastScanOn.Time = time.Now()
f.LastScanOn.Valid = true
} else {
f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
}
f.FetchURI = f.URI
return err return err
} }
func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) { func loadFeeds(ctx context.Context, db db) (iter.Seq[Feed], error) {
ctx, span := otel.Span(ctx)
var err error var err error
var res *sql.Rows var res *sql.Rows
@ -145,10 +211,13 @@ func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) {
} }
return func(yield func(Feed) bool) { return func(yield func(Feed) bool) {
defer span.End()
for res.Next() { for res.Next() {
var f Feed var f Feed
err = f.Scan(res) err = f.Scan(res)
if err != nil { if err != nil {
span.RecordError(err)
return return
} }
if !yield(f) { if !yield(f) {
@ -158,13 +227,16 @@ func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) {
}, err }, err
} }
func storeFeed(db *sql.DB, f types.TwtFile) error { func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
ctx, span := otel.Span(ctx)
defer span.End()
loadTS := time.Now() loadTS := time.Now()
refreshRate := 600 refreshRate := 600
feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI)) feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
tx, err := db.Begin() tx, err := db.BeginTx(ctx, nil)
if err != nil { if err != nil {
return err return err
} }
@ -188,19 +260,11 @@ func storeFeed(db *sql.DB, f types.TwtFile) error {
defer tx.Rollback() defer tx.Rollback()
_, err = tx.Exec( twts := f.Twts()
insertFeed, _, size := insertTwt(len(twts))
feedID, args := make([]any, 0, size)
f.Twter().HashingURI,
f.Twter().DomainNick(),
loadTS,
refreshRate,
)
if err != nil {
return err
}
for _, twt := range f.Twts() { for _, twt := range twts {
mentions := make(uuids, 0, len(twt.Mentions())) mentions := make(uuids, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() { for _, mention := range twt.Mentions() {
followMap[mention.Twter().Nick] = mention.Twter().URI followMap[mention.Twter().Nick] = mention.Twter().URI
@ -219,30 +283,83 @@ func storeFeed(db *sql.DB, f types.TwtFile) error {
subjectTag = tag.Text() subjectTag = tag.Text()
} }
} }
args = append(
args,
feedID, // feed_id
makeULID(twt), // ulid
fmt.Sprintf("%+l", twt), // text
subjectTag, // conv
twt.Hash(), // hash
mentions.ToStrList(), // mentions
tags, // tags
)
}
for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) {
fmt.Println("store", f.Twter().URI, len(args))
_, err = tx.Exec( _, err = tx.ExecContext(
insertTwt, ctx,
feedID, query,
twt.Hash(), args...,
subjectTag,
twt.Created(),
fmt.Sprint(twt),
mentions.ToStrList(),
tags,
) )
if err != nil { if err != nil {
return err return err
} }
} }
args = args[:0]
args = append(args,
feedID, // feed_id
nil, // parent_id
f.Twter().DomainNick(), // nick
f.Twter().URI, // uri
"warm", // state
TwtTime{Time: loadTS, Valid: true}, // last_scan_on
refreshRate, // refresh_rate
)
if prev, ok := f.Info().GetN("prev", 0); ok {
_, part, ok := strings.Cut(prev.Value(), " ")
if ok {
uri := f.Twter().URI
if u, ok := f.Info().GetN("url", 0); ok {
uri = u.Value()
}
if u, ok := f.Info().GetN("uri", 0); ok {
uri = u.Value()
}
part = uri[:strings.LastIndex(uri, "/")+1] + part
childID := urlNS.UUID5(part)
fmt.Println("found prev", uri, part)
args = append(args,
childID, // feed_id
feedID, // parent_id
f.Twter().DomainNick(), // nick
part, // uri
"once", // state
nil, // last_scan_on
0, // refresh_rate
)
}
}
for nick, uri := range followMap { for nick, uri := range followMap {
_, err = tx.Exec( args = append(args,
insertFeed, urlNS.UUID5(uri), // feed_id
urlNS.UUID5(uri), nil, // parent_id
uri, nick, // nick
nick, uri, // uri
nil, "warm", // state
refreshRate, nil, // last_scan_on
refreshRate, // refresh_rate
)
}
for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) {
_, err = tx.ExecContext(
ctx,
query,
args...,
) )
if err != nil { if err != nil {
return err return err
@ -253,12 +370,14 @@ func storeFeed(db *sql.DB, f types.TwtFile) error {
} }
func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) { func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
feed.State = "fetch" if strings.Contains(feed.URI, "lublin.se") {
if strings.Contains(feed.FetchURI, "lublin.se") { return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
}
if strings.Contains(feed.URI, "enotty.dk") {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI) return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
} }
req, err := http.NewRequestWithContext(ctx, "GET", feed.FetchURI, nil) req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("creating HTTP request failed: %w", err) return nil, fmt.Errorf("creating HTTP request failed: %w", err)
} }
@ -282,3 +401,81 @@ func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
return req, nil return req, nil
} }
// TwtTime is a nullable timestamp that is written to the database as an
// RFC 3339 string and can be read back from either a string or a time.Time.
type TwtTime struct {
	Time  time.Time
	Valid bool // Valid is true if Time is not NULL
}

// Scan implements the [Scanner] interface.
//
// nil resets the value to NULL. A time.Time is stored as is. A string or
// []byte is parsed as RFC 3339. Any other type, or text that does not
// parse, returns an error and leaves the value NULL, so Valid is true only
// when Time holds a real timestamp.
func (n *TwtTime) Scan(value any) error {
	var text string
	switch v := value.(type) {
	case nil:
		n.Time, n.Valid = time.Time{}, false
		return nil
	case time.Time:
		n.Time, n.Valid = v, true
		return nil
	case string:
		text = v
	case []byte:
		text = string(v)
	default:
		n.Time, n.Valid = time.Time{}, false
		return fmt.Errorf("unsupported Scan, storing driver.Value type %T into type *TwtTime", value)
	}

	parsed, err := time.Parse(time.RFC3339, text)
	if err != nil {
		n.Time, n.Valid = time.Time{}, false
		return err
	}
	n.Time, n.Valid = parsed, true
	return nil
}

// Value implements the [driver.Valuer] interface.
//
// It returns nil for a NULL value and otherwise the time formatted as
// RFC 3339 (second precision).
func (n TwtTime) Value() (driver.Value, error) {
	if !n.Valid {
		return nil, nil
	}
	return n.Time.Format(time.RFC3339), nil
}
// makeULID derives a ULID for twt. The time component comes from
// twt.Created(); the 10 entropy bytes are the first 2 bytes of the FNV-1a
// 32-bit digest followed by the 8 bytes of the FNV-1a 64-bit digest, both
// taken over the twt rendered with "%+l".
//
// NOTE(review): earlier versions called Sum(text) without writing text to
// the hash, which only copied the leading bytes of the text into the
// entropy. IDs produced by this version therefore differ from IDs stored by
// the old code.
func makeULID(twt types.Twt) ulid.ULID {
	text := []byte(fmt.Sprintf("%+l", twt))

	// hash.Hash.Write never returns an error. Sum(nil) returns the digest
	// of everything written so far.
	h32 := fnv.New32a()
	h32.Write(text)
	h64 := fnv.New64a()
	h64.Write(text)

	entropy := make([]byte, 10)
	copy(entropy, h32.Sum(nil)[:2])
	copy(entropy[2:], h64.Sum(nil))

	u := ulid.ULID{}
	// Errors are ignored as before: the entropy is always 10 bytes, and an
	// out-of-range timestamp leaves the time component at zero.
	_ = u.SetTime(ulid.Timestamp(twt.Created()))
	_ = u.SetEntropy(entropy)
	return u
}
func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
_, size := qry(1)
itemsPerIter := maxArgs / size
if len(args) < size {
return func(yield func(string, []any) bool) {}
}
if len(args) < maxArgs {
return func(yield func(string, []any) bool) {
query, _ := qry(len(args) / size)
yield(query, args)
}
}
return func(yield func(string, []any) bool) {
for len(args) > 0 {
if len(args) > maxArgs {
query, size := qry(itemsPerIter)
if !yield(query, args[:size]) {
return
}
args = args[size:]
continue
}
query, _ := qry(len(args) / size)
yield(query, args)
return
}
}
}

View File

@ -8,6 +8,10 @@ import (
"net/http" "net/http"
"sync" "sync"
"time" "time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.sour.is/xt/internal/otel"
) )
var ( var (
@ -48,10 +52,18 @@ func (r *Response) LastModified() time.Time {
type httpFetcher struct { type httpFetcher struct {
client *http.Client client *http.Client
m_fetch_status metric.Int64Counter
m_fetch_second metric.Float64Histogram
} }
func NewHTTPFetcher() *httpFetcher { func NewHTTPFetcher() *httpFetcher {
fetch_total, _ := otel.Meter().Int64Counter("xt_fetch_status_total")
fetch_second, _ := otel.Meter().Float64Histogram("xt_fetch_seconds")
return &httpFetcher{ return &httpFetcher{
m_fetch_status: fetch_total,
m_fetch_second: fetch_second,
client: &http.Client{ client: &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
@ -62,7 +74,7 @@ func NewHTTPFetcher() *httpFetcher {
ForceAttemptHTTP2: false, ForceAttemptHTTP2: false,
MaxIdleConns: 100, MaxIdleConns: 100,
IdleConnTimeout: 10 * time.Second, IdleConnTimeout: 10 * time.Second,
TLSHandshakeTimeout: 10 * time.Second, TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second, ExpectContinueTimeout: 1 * time.Second,
}, },
}, },
@ -70,6 +82,16 @@ func NewHTTPFetcher() *httpFetcher {
} }
func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response { func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
ctx, span := otel.Span(ctx)
defer span.End()
start := time.Now()
defer func() {
since := time.Since(start)
f.m_fetch_second.Record(ctx, since.Seconds())
}()
defer fmt.Println("fetch done", request.URI)
response := &Response{ response := &Response{
Request: request, Request: request,
} }
@ -79,8 +101,9 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
response.err = err response.err = err
return response return response
} }
span.AddEvent("start request")
res, err := f.client.Do(req) res, err := f.client.Do(req)
span.AddEvent("got response")
if err != nil { if err != nil {
if errors.Is(err, &net.DNSError{}) { if errors.Is(err, &net.DNSError{}) {
response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, err) response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
@ -93,17 +116,24 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
response.Response = res response.Response = res
switch res.StatusCode { switch res.StatusCode {
case 200: case 200:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "ok")))
case 304: case 304:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "not_modified")))
response.err = fmt.Errorf("%w: %s", ErrUnmodified, res.Status) response.err = fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
case 400, 406, 502, 503: case 400, 406, 429, 500, 502, 503:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "temp_fail")))
response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status) response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
case 403, 404, 410: case 403, 404, 410:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "perm_fail")))
response.err = fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status) response.err = fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
default: default:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.Int("status", res.StatusCode)))
response.err = errors.New(res.Status) response.err = errors.New(res.Status)
} }
@ -121,6 +151,9 @@ func NewFuncPool[IN, OUT any](
size int, size int,
fetch func(ctx context.Context, request IN) OUT, fetch func(ctx context.Context, request IN) OUT,
) (*pool[IN, OUT], func()) { ) (*pool[IN, OUT], func()) {
ctx, span := otel.Span(ctx)
defer span.End()
var wg sync.WaitGroup var wg sync.WaitGroup
in := make(chan IN, size) in := make(chan IN, size)
@ -129,12 +162,26 @@ func NewFuncPool[IN, OUT any](
wg.Add(size) wg.Add(size)
for range size { for range size {
go func() { go func() {
ctx, span := otel.Span(ctx)
defer span.End()
defer wg.Done() defer wg.Done()
for request := range in { for request := range in {
ctx, cancel := context.WithTimeoutCause(ctx, 15*time.Second, fmt.Errorf("GOT STUCK"))
defer cancel()
ctx, span := otel.Span(ctx)
defer span.End()
span.AddEvent("start fetch")
r := fetch(ctx, request)
span.AddEvent("got fetch")
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case out <- fetch(ctx, request): case out <- r:
span.AddEvent("sent queue")
} }
} }
}() }()
@ -143,7 +190,11 @@ func NewFuncPool[IN, OUT any](
return &pool[IN, OUT]{ return &pool[IN, OUT]{
in: in, in: in,
out: out, out: out,
}, func() { close(in); wg.Wait(); close(out) } }, func() {
close(in)
wg.Wait()
close(out)
}
} }
func (f *pool[IN, OUT]) Fetch(request IN) { func (f *pool[IN, OUT]) Fetch(request IN) {

50
go.mod
View File

@ -4,25 +4,61 @@ go 1.23.2
require ( require (
github.com/mattn/go-sqlite3 v1.14.24 github.com/mattn/go-sqlite3 v1.14.24
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0
go.yarn.social/lextwt v0.1.5-0.20250327005027-02d9b44de4dd
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/protobuf v1.36.5 // indirect
) )
require ( require (
github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/log v0.10.0
go.opentelemetry.io/otel/trace v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.35.0
go.opentelemetry.io/otel/trace v1.35.0
) )
require ( require (
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/matryer/is v1.4.1 github.com/matryer/is v1.4.1
github.com/oklog/ulid/v2 v2.1.0
github.com/prometheus/client_golang v1.20.5
github.com/sirupsen/logrus v1.9.3 // indirect github.com/sirupsen/logrus v1.9.3 // indirect
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2
github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect
go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/contrib/bridges/otelslog v0.9.0
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 go.opentelemetry.io/otel v1.35.0
golang.org/x/crypto v0.27.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0
golang.org/x/sys v0.25.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.57.0
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0
go.opentelemetry.io/otel/sdk v1.35.0
go.opentelemetry.io/otel/sdk/log v0.10.0
go.opentelemetry.io/otel/sdk/metric v1.35.0
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/sync v0.12.0
golang.org/x/sys v0.31.0 // indirect
google.golang.org/grpc v1.70.0 // indirect
) )

127
go.sum
View File

@ -1,46 +1,149 @@
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ= github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ=
github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2 h1:ZjUj9BLYf9PEqBn8W/OapxhPjVRdC6CsXTdULHsyk5c=
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2/go.mod h1:O8bHQfyinKwTXKkiKNGmLQS7vRsqRxIQTFZpYpHK3IQ=
github.com/writeas/go-strip-markdown/v2 v2.1.1 h1:hAxUM21Uhznf/FnbVGiJciqzska6iLei22Ijc3q2e28= github.com/writeas/go-strip-markdown/v2 v2.1.1 h1:hAxUM21Uhznf/FnbVGiJciqzska6iLei22Ijc3q2e28=
github.com/writeas/go-strip-markdown/v2 v2.1.1/go.mod h1:UvvgPJgn1vvN8nWuE5e7v/+qmDu3BSVnKAB6Gl7hFzA= github.com/writeas/go-strip-markdown/v2 v2.1.1/go.mod h1:UvvgPJgn1vvN8nWuE5e7v/+qmDu3BSVnKAB6Gl7hFzA=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0 h1:N+78eXSlu09kii5nkiM+01YbtWe01oZLPPLhNlEKhus=
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0/go.mod h1:/2KhfLAhtQpgnhIk1f+dftA3fuuMcZjiz//Dc9yfaEs=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 h1:QSKmLBzbFULSyHzOdO9JsN9lpE4zkrz1byYGmJecdVE=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0/go.mod h1:sTQ/NH8Yrirf0sJ5rWqVu+oT82i4zL9FaF6rWcqnptM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0uaNS4c98WRNUEx5U3aDlrDOI5Rs+1Vifcw4DJ8U=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE=
go.opentelemetry.io/otel/exporters/prometheus v0.57.0 h1:AHh/lAP1BHrY5gBwk8ncc25FXWm/gmmY3BX258z5nuk=
go.opentelemetry.io/otel/exporters/prometheus v0.57.0/go.mod h1:QpFWz1QxqevfjwzYdbMb4Y1NnlJvqSGwyuU0B4iuc9c=
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0 h1:GKCEAZLEpEf78cUvudQdTg0aET2ObOZRB2HtXA0qPAI=
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0/go.mod h1:9/zqSWLCmHT/9Jo6fYeUDRRogOLL60ABLsHWS99lF8s=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0 h1:czJDQwFrMbOr9Kk+BPo1y8WZIIFIK58SA1kykuVeiOU=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0/go.mod h1:lT7bmsxOe58Tq+JIOkTQMCGXdu47oA+VJKLZHbaBKbs=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 h1:jBpDk4HAUsrnVO1FsfCfCOTEc/MkInJmvfCHYLFiT80=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0/go.mod h1:H9LUIM1daaeZaz91vZcfeM0fejXPmgCYE8ZhzqfJuiU=
go.opentelemetry.io/otel/log v0.10.0 h1:1CXmspaRITvFcjA4kyVszuG4HjA61fPDxMb7q3BuyF0=
go.opentelemetry.io/otel/log v0.10.0/go.mod h1:PbVdm9bXKku/gL0oFfUF4wwsQsOPlpo4VEqjvxih+FM=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
go.opentelemetry.io/otel/sdk/log v0.10.0 h1:lR4teQGWfeDVGoute6l0Ou+RpFqQ9vaPdrNJlST0bvw=
go.opentelemetry.io/otel/sdk/log v0.10.0/go.mod h1:A+V1UTWREhWAittaQEG4bYm4gAZa6xnvVu+xKrIRkzo=
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0= go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk= go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51 h1:XEjx0jSNv1h22gwGfQBfMypWv/YZXWGTRbqh3B8xfIs= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51/go.mod h1:CWAZuBHZfGaqa0FreSeLG+pzK3rHP2TNAG7Zh6QlRiM= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 h1:zfnniiSO/WO65mSpdQzGYJ9pM0rYg/BKgrSm8h2mTyA= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= go.yarn.social/lextwt v0.0.0-20250213063805-7adc6ca07564 h1:z+IAMtxNKWcLNm9nLzJwHw6OPkV5JoQYmmFohaUvcKI=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= go.yarn.social/lextwt v0.0.0-20250213063805-7adc6ca07564/go.mod h1:JOPCOh+3bHv+BMaFZpKzw6soiXbIlZD5b2f7YKDDjqk=
go.yarn.social/lextwt v0.1.5-0.20250327005027-02d9b44de4dd h1:Np3zWtQ0GB9WhRFCPblaItLVtdy8Y35QKL+PUvRR/t8=
go.yarn.social/lextwt v0.1.5-0.20250327005027-02d9b44de4dd/go.mod h1:P36NPegLbhbFa1A0JOLsDyIQcdM0zdmx8kPKACXry4A=
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede h1:XV9tuDQ605xxH4qIQPRHM1bOa7k0rJZ2RqA5kz2Nun4=
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c=
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50=
google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

276
http.go
View File

@ -1,70 +1,265 @@
package main package main
import ( import (
"context" "errors"
"fmt" "fmt"
"net/http" "net/http"
"slices" "slices"
"sort" "sort"
"strconv"
"strings" "strings"
"time" "time"
"go.yarn.social/lextwt"
"go.yarn.social/types"
"go.sour.is/xt/internal/otel"
) )
func httpServer(c console, app *appState) { func httpServer(c *console, app *appState) error {
c.Log("start http server") ctx, span := otel.Span(c)
defer span.End()
db, err := app.DB() span.AddEvent("start http server")
db, err := app.DB(ctx)
if err != nil { if err != nil {
c.Log("missing db", err) span.RecordError(fmt.Errorf("%w: missing db", err))
c.abort() return err
return
} }
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8") _, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte("ok")) w.Write([]byte("ok"))
}) })
http.HandleFunc("/conv/{hash}", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/api/plain/conv/{hash}", func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
hash := r.PathValue("hash") hash := r.PathValue("hash")
if (len(hash) < 6 || len(hash) > 8) && !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") { if (len(hash) < 6 || len(hash) > 8) && !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte(hash))
rows, err := db.QueryContext(r.Context(), "SELECT feed_id, hash, conv, dt, text FROM twt WHERE hash = $1 or conv = $1", hash) rows, err := db.QueryContext(
ctx,
`SELECT
feed_id,
hash,
conv,
nick,
uri,
text
FROM twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
) using (feed_id)
WHERE
hash = $1 or
conv = $1
order by ulid asc`,
hash,
)
if err != nil { if err != nil {
c.Log(err) span.RecordError(err)
return return
} }
defer rows.Close() defer rows.Close()
var twts []types.Twt
for rows.Next() { for rows.Next() {
var twt struct { var o struct {
FeedID string FeedID string
Hash string Hash string
Conv string Conv string
Dt time.Time Dt string
Nick string
URI string
Text string Text string
} }
err = rows.Scan(&twt.FeedID, &twt.Hash, &twt.Conv, &twt.Dt, &twt.Text) err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil { if err != nil {
c.Log(err) span.RecordError(err)
return return
} }
twter := types.NewTwter(o.Nick, o.URI)
o.Text = strings.ReplaceAll(o.Text, "\n", "\u2028")
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
} }
var preamble lextwt.Comments
preamble = append(preamble, lextwt.NewComment("# self = /conv/"+hash))
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
}) })
http.HandleFunc("/feeds", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/api/plain/twt", func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
args := make([]any, 0, 3)
uriarg := ""
uri := r.URL.Query().Get("uri")
if uri != "" {
feed_id := urlNS.UUID5(uri)
uriarg = "and feed_id = ?"
args = append(args, feed_id)
}
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
offset := 0
if v, ok := strconv.Atoi(r.URL.Query().Get("offset")); ok == nil {
offset = v
}
args = append(args, limit, offset)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
rows, err := db.QueryContext(
ctx,
`SELECT
feed_id,
hash,
conv,
nick,
uri,
text
FROM twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
where state not in ('frozen', 'permanantly-dead')
`+uriarg+`
) using (feed_id)
order by ulid desc
limit ?
offset ?
`, args...,
)
if err != nil {
span.RecordError(err)
return
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return
}
twter := types.NewTwter(o.Nick, o.URI)
o.Text = strings.ReplaceAll(o.Text, "\n", "\u2028")
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
var preamble lextwt.Comments
preamble = append(preamble, lextwt.NewComment("# I am the Watcher. I am your guide through this vast new twtiverse."))
preamble = append(preamble, lextwt.NewComment("# self = /api/plain/twts"+mkqry(uri, limit, offset)))
preamble = append(preamble, lextwt.NewComment("# next = /api/plain/twts"+mkqry(uri, limit, offset+len(twts))))
if offset > 0 {
preamble = append(preamble, lextwt.NewComment("# prev = /api/plain/twts"+mkqry(uri, limit, offset-limit)))
}
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
})
http.HandleFunc("/api/plain/users", func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
where := `where parent_id is null and state not in ('permanantly-dead', 'frozen') and last_twt_on is not null`
args := make([]any, 0)
if uri := r.URL.Query().Get("uri"); uri != "" {
where = `where feed_id = ? or parent_id = ?`
feed_id := urlNS.UUID5(uri)
args = append(args, feed_id, feed_id)
}
rows, err := db.QueryContext(
ctx,
`SELECT
feed_id,
uri,
nick,
last_scan_on,
last_twt_on
FROM feeds
left join (
select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts group by feed_id
) using (feed_id)
`+where+`
order by nick, uri
`,args...,
)
if err != nil {
span.RecordError(err)
return
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
URI string
Nick string
Dt TwtTime
LastTwtOn TwtTime
}
err = rows.Scan(&o.FeedID, &o.URI, &o.Nick, &o.Dt, &o.LastTwtOn)
if err != nil {
span.RecordError(err)
return
}
twts = append(twts, lextwt.NewTwt(
types.NewTwter(o.Nick, o.URI),
lextwt.NewDateTime(o.Dt.Time, o.LastTwtOn.Time.Format(time.RFC3339)),
nil,
))
}
reg := lextwt.NewTwtRegistry(nil, twts)
reg.WriteTo(w)
})
http.HandleFunc("/api/plain/queue", func(w http.ResponseWriter, r *http.Request) {
lis := slices.Collect(app.queue.Iter()) lis := slices.Collect(app.queue.Iter())
sort.Slice(lis, func(i, j int) bool { sort.Slice(lis, func(i, j int) bool {
return lis[i].LastScanOn.Time.Before(lis[j].LastScanOn.Time) return lis[i].NextScanOn.Time.Before(lis[j].LastScanOn.Time)
}) })
for _, feed := range lis { for _, feed := range lis {
fmt.Fprintln(w, feed.State, feed.LastScanOn.Time.Format(time.RFC3339), feed.Nick, feed.URI) fmt.Fprintln(w, feed.State, feed.NextScanOn.Time.Format(time.RFC3339), feed.Nick, feed.URI)
} }
}) })
@ -73,18 +268,14 @@ func httpServer(c console, app *appState) {
Handler: http.DefaultServeMux, Handler: http.DefaultServeMux,
} }
go func() { c.AddCancel(srv.Shutdown)
<-c.Done()
c.Log("stop http server")
srv.Shutdown(context.Background())
}()
err = srv.ListenAndServe() err = srv.ListenAndServe()
if err != nil { if !errors.Is(err, http.ErrServerClosed) {
c.Log(err) span.RecordError(err)
c.abort() return err
return
} }
return nil
} }
func notAny(s string, chars string) bool { func notAny(s string, chars string) bool {
@ -95,3 +286,28 @@ func notAny(s string, chars string) bool {
} }
return true return true
} }
// mkqry renders the query-string suffix ("?a=b&c=d") used in the feed
// preamble links. Parameters that hold their default value are omitted:
// an empty uri, a limit of 100 and an offset of 0. limit is clamped to
// the range [1, 100] and offset to a minimum of 0 before they are compared
// against those defaults. When nothing remains, the empty string is returned.
func mkqry(uri string, limit, offset int) string {
	var parts []string

	if uri != "" {
		parts = append(parts, "uri="+uri)
	}

	if limit = min(100, max(1, limit)); limit != 100 {
		parts = append(parts, fmt.Sprint("limit=", limit))
	}

	if offset = max(0, offset); offset != 0 {
		parts = append(parts, fmt.Sprint("offset=", offset))
	}

	if len(parts) > 0 {
		return "?" + strings.Join(parts, "&")
	}
	return ""
}

View File

@ -2,8 +2,10 @@ PRAGMA journal_mode=WAL;
create table if not exists feeds ( create table if not exists feeds (
feed_id blob primary key, feed_id blob primary key,
uri text, parent_id blob,
nick text, nick text,
uri text,
state string,
last_scan_on timestamp, last_scan_on timestamp,
refresh_rate int default 600, refresh_rate int default 600,
last_modified_on timestamp, last_modified_on timestamp,
@ -13,12 +15,12 @@ create table if not exists feeds (
create table if not exists twts ( create table if not exists twts (
feed_id blob, feed_id blob,
hash text, ulid blob,
conv text,
dt text, -- timestamp with timezone
text text, text text,
conv text,
hash text,
mentions text, -- json mentions text, -- json
tags text, -- json tags text, -- json
primary key (feed_id, hash) primary key (feed_id, ulid)
); );

333
internal/otel/otel.go Normal file
View File

@ -0,0 +1,333 @@
package otel
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"runtime"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
)
// Package-level telemetry handles. All three are assigned by Init and are
// nil until Init has been called.
var (
// tracer starts the spans handed out by Span.
tracer trace.Tracer
// meter is the instrument factory returned by Meter.
meter metric.Meter
// logger receives the log records mirrored by spanny's methods.
logger *slog.Logger
)
func Init(ctx context.Context, name string) (shutdown func(context.Context) error, err error) {
tracer = otel.Tracer(name)
meter = otel.Meter(name)
logger = otelslog.NewLogger(name)
return setupOTelSDK(ctx, name)
}
// Meter returns the package-level meter assigned by Init.
func Meter() metric.Meter {
	return meter
}
// func Error(err error, v ...any) {
// if err == nil {
// return
// }
// fmt.Println("ERR:", append([]any{err}, v...))
// logger.Error(err.Error(), v...)
// }
// func Info(msg string, v ...any) { fmt.Println(append([]any{msg}, v...)); logger.Info(msg, v...) }
// spanny wraps a trace.Span so that RecordError and AddEvent can also echo
// what they record to stdout and to the package logger. All other Span
// methods are promoted unchanged from the embedded value.
type spanny struct {
trace.Span
}
// RecordError records err on the wrapped span and mirrors it to stdout and
// to the package logger. A nil err is ignored.
//
// Attributes carried by options (for example trace.WithAttributes) are
// flattened into alternating key/value arguments. slog only recognises a
// key when it is a plain string, so each key is converted with string(...)
// and each value is rendered with Value.Emit; passing attribute.KeyValue
// values directly would make slog file every one of them under "!BADKEY".
func (s *spanny) RecordError(err error, options ...trace.EventOption) {
	if err == nil {
		return
	}

	ec := trace.NewEventConfig(options...)
	kvs := ec.Attributes()

	attrs := make([]any, 0, 2*len(kvs))
	for _, kv := range kvs {
		attrs = append(attrs, string(kv.Key), kv.Value.Emit())
	}

	fmt.Println(append([]any{"ERR:", err}, attrs...)...)
	logger.Error(err.Error(), attrs...)
	s.Span.RecordError(err, options...)
}
// AddEvent adds the named event to the wrapped span and mirrors it to
// stdout and to the package logger.
//
// Attributes carried by options are flattened into alternating key/value
// arguments. The key must be converted to a plain string: attribute.Key is
// a distinct named type, which slog does not treat as a key, so passing it
// as-is breaks the key/value pairing and produces "!BADKEY" entries.
func (s *spanny) AddEvent(name string, options ...trace.EventOption) {
	ec := trace.NewEventConfig(options...)
	kvs := ec.Attributes()

	attrs := make([]any, 0, 2*len(kvs))
	for _, kv := range kvs {
		attrs = append(attrs, string(kv.Key), kv.Value.Emit())
	}

	fmt.Println(append([]any{name}, attrs...)...)
	logger.Info(name, attrs...)
	s.Span.AddEvent(name, options...)
}
// Span starts a span on the package tracer, named after the function that
// called Span and tagged with the caller attributes reported by Attrs. The
// returned span is wrapped in spanny so RecordError and AddEvent are also
// echoed to stdout and the package logger.
//
// Attrs must be called directly from this function: it inspects a fixed
// stack depth to locate Span's caller.
func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
	spanName, callerAttrs := Attrs()

	spanCtx, inner := tracer.Start(ctx, spanName, opts...)
	inner.SetAttributes(callerAttrs...)

	return spanCtx, &spanny{Span: inner}
}
// Attrs describes the frame two levels above itself on the call stack
// (runtime.Caller(2)): when invoked from Span, that is the function that
// called Span. It returns that function's name (empty if it cannot be
// resolved) together with "pc", "file" and "line" attributes. If the frame
// cannot be obtained, it returns an empty name and a nil slice.
func Attrs() (string, []attribute.KeyValue) {
	pc, file, line, ok := runtime.Caller(2)
	if !ok {
		return "", nil
	}

	var name string
	if fn := runtime.FuncForPC(pc); fn != nil {
		name = fn.Name()
	}

	return name, []attribute.KeyValue{
		attribute.String("pc", fmt.Sprintf("%v", pc)),
		attribute.String("file", file),
		attribute.Int("line", line),
	}
}
// setupOTelSDK bootstraps the OpenTelemetry pipeline.
// If it does not return an error, make sure to call shutdown for proper cleanup.
func setupOTelSDK(ctx context.Context, name string) (shutdown func(context.Context) error, err error) {
var shutdownFuncs []func(context.Context) error
// shutdown calls cleanup functions registered via shutdownFuncs.
// The errors from the calls are joined.
// Each registered cleanup will be invoked once.
shutdown = func(ctx context.Context) error {
fmt.Println("shutdown")
var err error
for _, fn := range shutdownFuncs {
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
return err
}
// playShutdown := otelplay.ConfigureOpentelemetry(ctx)
// shutdownFuncs = append(shutdownFuncs, func(ctx context.Context) error { playShutdown(); return nil })
// handleErr calls shutdown for cleanup and makes sure that all errors are returned.
handleErr := func(inErr error) {
err = errors.Join(inErr, shutdown(ctx))
}
// Set up propagator.
prop := newPropagator()
otel.SetTextMapPropagator(prop)
// Set up trace provider.
tracerShutdown, err := newTraceProvider(ctx, name)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, tracerShutdown)
// Set up meter provider.
meterShutdown, err := newMeterProvider(ctx, name)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, meterShutdown)
// Set up logger provider.
loggerShutdown, err := newLoggerProvider(ctx, name)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, loggerShutdown)
return
}
// newPropagator builds the composite propagator that carries both trace
// context and baggage.
func newPropagator() propagation.TextMapPropagator {
	var (
		traceCtx propagation.TraceContext
		baggage  propagation.Baggage
	)
	return propagation.NewCompositeTextMapPropagator(traceCtx, baggage)
}
// newTraceProvider creates a tracer provider whose resource carries the
// service name, installs it as the global provider and returns its Shutdown
// function.
//
// When env("XT_TRACER", "") yields a non-empty value, spans are batched to
// that endpoint over insecure OTLP/gRPC. Otherwise they are pretty-printed
// to stderr with a one-second batch timeout.
func newTraceProvider(ctx context.Context, name string) (func(context.Context) error, error) {
	res, err := resource.Merge(
		resource.Default(),
		resource.NewWithAttributes(semconv.SchemaURL, semconv.ServiceName(name)),
	)
	if err != nil {
		return nil, err
	}

	endpoint := env("XT_TRACER", "")
	if endpoint == "" {
		// No collector configured: fall back to the stdout exporter.
		stdoutExp, err := stdouttrace.New(
			stdouttrace.WithWriter(os.Stderr),
			stdouttrace.WithPrettyPrint(),
		)
		if err != nil {
			return nil, err
		}

		provider := sdktrace.NewTracerProvider(
			sdktrace.WithResource(res),
			// Default is 5s. Set to 1s for demonstrative purposes.
			sdktrace.WithBatcher(stdoutExp, sdktrace.WithBatchTimeout(time.Second)),
		)
		otel.SetTracerProvider(provider)
		return provider.Shutdown, nil
	}

	fmt.Println("use tracer", endpoint)

	otlpExp, err := otlptracegrpc.New(
		ctx,
		otlptracegrpc.WithEndpoint(endpoint),
		otlptracegrpc.WithInsecure(),
	)
	if err != nil {
		return nil, err
	}

	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(otlpExp),
		sdktrace.WithResource(res),
	)
	otel.SetTracerProvider(provider)
	return provider.Shutdown, nil
}
// newMeterProvider installs a global meter provider backed by a Prometheus
// exporter registered with prometheus.DefaultRegisterer, and registers the
// Prometheus handler at /metrics on the default HTTP mux.
//
// ctx and name are currently unused. The returned shutdown function is a
// no-op. If the exporter cannot be created, nothing is installed and the
// error is returned with a nil shutdown function.
func newMeterProvider(ctx context.Context, name string) (func(context.Context) error, error) {
	// metricExporter, err := stdoutmetric.New()
	// if err != nil {
	// 	return nil, err
	// }
	// meterProvider := sdkmetric.NewMeterProvider(
	// 	sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter,
	// 		// Default is 1m. Set to 3s for demonstrative purposes.
	// 		sdkmetric.WithInterval(3*time.Second))),
	// )
	// otel.SetMeterProvider(meterProvider)

	metricExporter, err := otelprom.New(
		otelprom.WithRegisterer(prometheus.DefaultRegisterer),
		// OTEL default buckets assume you're using milliseconds. Substitute defaults
		// appropriate for units of seconds.
		otelprom.WithAggregationSelector(func(ik sdkmetric.InstrumentKind) sdkmetric.Aggregation {
			switch ik {
			case sdkmetric.InstrumentKindHistogram:
				return sdkmetric.AggregationExplicitBucketHistogram{
					Boundaries: prometheus.DefBuckets,
					NoMinMax:   false,
				}
			default:
				return sdkmetric.DefaultAggregationSelector(ik)
			}
		}),
	)
	if err != nil {
		// Bail out before wrapping a failed exporter in a provider and
		// installing it globally.
		return nil, err
	}

	p := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(metricExporter),
	)
	otel.SetMeterProvider(p)

	http.Handle("/metrics", promhttp.Handler())

	return func(ctx context.Context) error { return nil }, nil
}
// newLoggerProvider installs the global OpenTelemetry logger provider for the
// service called name and returns the function that shuts it down.
//
// When the XT_LOGGER environment variable is non-empty its value is used as
// the endpoint URL of an insecure OTLP/HTTP log exporter. Otherwise records
// are pretty-printed to stderr. Both variants use a batch processor.
func newLoggerProvider(ctx context.Context, name string) (func(context.Context) error, error) {
	r, err := resource.Merge(
		resource.Default(),
		resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceName(name),
		),
	)
	if err != nil {
		return nil, err
	}

	if v := env("XT_LOGGER", ""); v != "" {
		fmt.Println("use logger", v)
		exp, err := otlploghttp.New(
			ctx,
			otlploghttp.WithInsecure(),
			otlploghttp.WithEndpointURL(v),
		)
		if err != nil {
			return nil, err
		}

		provider := log.NewLoggerProvider(
			log.WithProcessor(log.NewBatchProcessor(exp)),
			log.WithResource(r),
		)
		global.SetLoggerProvider(provider)

		// Shut down the provider rather than only its processor, so both
		// branches of this function hand back the same kind of shutdown.
		return provider.Shutdown, nil
	}

	logExporter, err := stdoutlog.New(
		stdoutlog.WithPrettyPrint(),
		stdoutlog.WithWriter(os.Stderr),
	)
	if err != nil {
		return nil, err
	}

	loggerProvider := log.NewLoggerProvider(
		log.WithProcessor(
			log.NewBatchProcessor(logExporter),
		),
		log.WithResource(r),
	)
	global.SetLoggerProvider(loggerProvider)

	return loggerProvider.Shutdown, nil
}
// env returns the value of the environment variable key, or def when the
// variable is not set. A variable that is set to the empty string is returned
// as "" rather than replaced by def.
func env(key, def string) string {
	value, found := os.LookupEnv(key)
	if !found {
		return def
	}
	return value
}

115
main.go
View File

@ -1,24 +1,50 @@
package main package main
import ( import (
"bufio"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"os" "os"
"os/signal" "os/signal"
"strings"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric"
"go.sour.is/xt/internal/otel"
) )
const name = "go.sour.is/xt" const name = "go.sour.is/xt"
var ( var m_up metric.Int64Gauge
tracer = otel.Tracer(name)
meter = otel.Meter(name)
)
type contextKey struct{ name string } func main() {
dotEnv() // load .env
ctx, console := newConsole(args{
dbtype: env("XT_DBTYPE", "sqlite3"),
dbfile: env("XT_DBFILE", "file:twt.db"),
baseFeed: env("XT_BASE_FEED", "feed"),
Nick: env("XT_NICK", "xuu"),
URI: env("XT_URI", "https://txt.sour.is/user/xuu/twtxt.txt"),
Listen: env("XT_LISTEN", ":8080"),
})
finish, err := otel.Init(ctx, name)
console.IfFatal(err)
console.AddCancel(finish)
m_up, err = otel.Meter().Int64Gauge("up")
console.IfFatal(err)
m_up.Record(ctx, 1)
defer m_up.Record(context.Background(), 0)
err = run(console)
if !errors.Is(err, context.Canceled) {
console.IfFatal(err)
}
}
type console struct { type console struct {
io.Reader io.Reader
@ -26,20 +52,57 @@ type console struct {
err io.Writer err io.Writer
context.Context context.Context
abort func() abort func()
cancelfns []func(context.Context) error
} }
func (c console) Log(v ...any) { fmt.Fprintln(c.err, v...) } func newConsole(args args) (context.Context, *console) {
func (c console) Args() args { ctx := context.Background()
ctx, abort := context.WithCancel(ctx)
ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
go func() { <-ctx.Done(); stop() }() // restore interrupt function
console := &console{Reader: os.Stdin, Writer: os.Stdout, err: os.Stderr, Context: ctx, abort: abort}
console.Set("console", console)
console.Set("args", args)
return ctx, console
}
func (c *console) Args() args {
v, ok := c.Get("args").(args) v, ok := c.Get("args").(args)
if !ok { if !ok {
return args{} return args{}
} }
return v return v
} }
// Shutdown aborts the console and then runs every registered cancel function
// in registration order, returning their errors joined together (nil when all
// succeed).
//
// abort cancels the context that c.Context is derived from (see newConsole),
// so handing c.Context to the cancel functions would give them an
// already-cancelled context. They are called with a context that keeps
// c.Context's values but is detached from its cancellation.
func (c *console) Shutdown() error {
	fmt.Fprintln(c.err, "shutting down ", len(c.cancelfns), " cancel functions...")
	defer fmt.Fprintln(c.err, "done")

	c.abort()

	ctx := context.WithoutCancel(c.Context)

	var err error
	for _, fn := range c.cancelfns {
		err = errors.Join(err, fn(ctx))
	}
	return err
}
// AddCancel appends fn to the list of cancel functions held by the console.
func (c *console) AddCancel(fn func(context.Context) error) {
	registered := append(c.cancelfns, fn)
	c.cancelfns = registered
}
// IfFatal does nothing when err is nil. Otherwise it writes err to the
// console's error writer, aborts the console and exits the process with
// status 1.
func (c *console) IfFatal(err error) {
	if err != nil {
		fmt.Fprintln(c.err, err)
		c.abort()
		os.Exit(1)
	}
}
type contextKey struct{ name string }
func (c *console) Set(name string, value any) { func (c *console) Set(name string, value any) {
c.Context = context.WithValue(c.Context, contextKey{name}, value) c.Context = context.WithValue(c.Context, contextKey{name}, value)
} }
func (c console) Get(name string) any {
func (c *console) Get(name string) any {
return c.Context.Value(contextKey{name}) return c.Context.Value(contextKey{name})
} }
@ -59,25 +122,25 @@ func env(key, def string) string {
return def return def
} }
func main() { func dotEnv() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) fd, err := os.Open(".env")
console := console{os.Stdin, os.Stdout, os.Stderr, ctx, stop} if err != nil {
return
go func() { <-ctx.Done(); console.Log("shutdown"); stop() }()
args := args{
dbtype: env("XT_DBTYPE", "sqlite3"),
dbfile: env("XT_DBFILE", "file:twt.db"),
baseFeed: env("XT_BASE_FEED", "feed"),
Nick: env("XT_NICK", "xuu"),
URI: env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"),
Listen: env("XT_LISTEN", ":8040"),
} }
console.Set("args", args) scan := bufio.NewScanner(fd)
if err := run(console); err != nil && !errors.Is(err, context.Canceled) { for scan.Scan() {
fmt.Println(err) line := scan.Text()
os.Exit(1)
if strings.HasPrefix(line, "#") {
continue
}
key, val, ok := strings.Cut(line, "=")
if !ok {
continue
}
os.Setenv(strings.TrimSpace(key), strings.TrimSpace(val))
} }
} }

View File

@ -1,105 +1,159 @@
package main package main
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"strings" "sort"
"time" "time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt" "go.yarn.social/lextwt"
"go.yarn.social/types" "go.yarn.social/types"
) )
const ( const (
TenYear = 3153600000 TenYear = 3153600000
OneMonth = OneDay * 30
OneDay = 86400 OneDay = 86400
OneHour = 3600 OneHour = 3600
TenMinutes = 600 TenMinutes = 600
TwoMinutes = 60 TwoMinutes = 120
) )
func refreshLoop(c console, app *appState) { func feedRefreshProcessor(c *console, app *appState) error {
defer c.abort() ctx, span := otel.Span(c.Context)
defer span.End()
sleeping_time, _ := otel.Meter().Int64Counter("xt_feed_sleep")
queue_size, _ := otel.Meter().Int64Gauge("xt_feed_queue_size")
f := NewHTTPFetcher() f := NewHTTPFetcher()
fetch, close := NewFuncPool(c.Context, 25, f.Fetch) fetch, close := NewFuncPool(ctx, 40, f.Fetch)
defer close() defer close()
db, err := app.DB() db, err := app.DB(c)
if err != nil { if err != nil {
c.Log("missing db") span.RecordError(err)
c.abort() return err
return
} }
go processorLoop(ctx, db, fetch)
queue := app.queue queue := app.queue
c.Log("start refresh loop") span.AddEvent("start refresh loop")
for c.Err() == nil {
if queue.IsEmpty() {
c.Log("load feeds")
it, err := loadFeeds(c.Context, db) for ctx.Err() == nil {
if queue.IsEmpty() {
span.AddEvent("load feeds")
it, err := loadFeeds(ctx, db)
span.RecordError(err)
for f := range it { for f := range it {
queue.Insert(&f) queue.Insert(&f)
} }
if err != nil {
c.Log(err)
return
}
} }
span.AddEvent("queue", trace.WithAttributes(attribute.Int("size", int(queue.count))))
queue_size.Record(ctx, int64(queue.count))
f := queue.ExtractMin() f := queue.ExtractMin()
if f == nil { if f == nil {
c.Log("sleeping for ", TenMinutes*time.Second) sleeping_time.Add(ctx, int64(TwoMinutes))
span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TwoMinutes))))
select { select {
case <-time.After(TenMinutes * time.Second): case <-time.After(TwoMinutes * time.Second):
case <-c.Done(): case <-c.Done():
return return nil
} }
span.End()
continue continue
} }
c.Log("queue size", queue.count, "next", f.URI, "next scan on", f.LastScanOn.Time.Format(time.RFC3339)) span.AddEvent("next", trace.WithAttributes(
attribute.Int("size", int(queue.count)),
attribute.String("uri", f.URI),
attribute.String("last scan on", f.LastScanOn.Time.Format(time.RFC3339)),
attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
until := time.Until(f.NextScanOn.Time)
if until > 2*time.Hour {
span.AddEvent("too soon", trace.WithAttributes(attribute.String("uri", f.URI)))
span.End()
if time.Until(f.LastScanOn.Time) > 2*time.Hour {
c.Log("too soon", f.URI)
continue continue
} }
span.AddEvent(
"till next",
trace.WithAttributes(attribute.String("time", until.String())))
sleeping_time.Add(ctx, until.Milliseconds())
select { select {
case <-c.Done(): case <-ctx.Done():
return return nil
case t := <-time.After(time.Until(f.LastScanOn.Time)): case t := <-time.After(until):
c.Log("fetch", t.Format(time.RFC3339), f.Nick, f.URI) span.AddEvent("fetch", trace.WithAttributes(
attribute.Int("size", int(queue.count)),
attribute.String("uri", f.URI),
attribute.String("timeout", t.Format(time.RFC3339)),
attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
}
fetch.Fetch(f) fetch.Fetch(f)
}
span.RecordError(ctx.Err())
return ctx.Err()
}
func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
ctx, span := otel.Span(ctx)
defer span.End()
process_in_total, _ := otel.Meter().Int64Counter("xt_processed_in_total")
process_out_total, _ := otel.Meter().Int64Counter("xt_processed_out_total")
twts_histogram, _ := otel.Meter().Float64Histogram("xt_twt1k_bucket")
for ctx.Err() == nil {
select {
case <-ctx.Done():
return
case res := <-fetch.Out(): case res := <-fetch.Out():
c.Log("got response:", res.Request.URI)
f := res.Request f := res.Request
span.AddEvent("got response", trace.WithAttributes(
attribute.String("uri", f.URI),
attribute.String("scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
process_in_total.Add(ctx, 1)
f.LastScanOn.Time = time.Now() f.LastScanOn.Time = time.Now()
f.LastScanOn.Valid = true
err := res.err err := res.err
if res.err != nil { if res.err != nil {
f.LastError.String, f.LastError.Valid = err.Error(), true
if errors.Is(err, ErrPermanentlyDead) { if errors.Is(err, ErrPermanentlyDead) {
f.State = "permanantly-dead"
f.RefreshRate = TenYear f.RefreshRate = TenYear
} }
if errors.Is(err, ErrTemporarilyDead) { if errors.Is(err, ErrTemporarilyDead) {
f.RefreshRate = OneDay f.RefreshRate, f.State = tsTemp(f.LastTwtOn.Time)
} }
if errors.Is(err, ErrUnmodified) { if errors.Is(err, ErrUnmodified) {
f.RefreshRate = OneDay f.RefreshRate, f.State = tsTemp(f.LastTwtOn.Time)
} }
c.Log(err) span.RecordError(err)
err = f.Save(c.Context, db) f.LastError.String, f.LastError.Valid = err.Error(), true
if err != nil { err = f.Save(ctx, db)
c.Log(err) span.RecordError(err)
return
}
continue continue
} }
@ -107,50 +161,133 @@ func refreshLoop(c console, app *appState) {
f.ETag.String, f.ETag.Valid = res.ETag(), true f.ETag.String, f.ETag.Valid = res.ETag(), true
f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
span.AddEvent("read feed")
cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
rdr := io.TeeReader(res.Body, cpy)
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
if err != nil { if err != nil {
c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err)) span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
f.LastError.String, f.LastError.Valid = err.Error(), true
f.RefreshRate = OneDay f.RefreshRate = OneDay
return
}
if prev, ok := twtfile.Info().GetN("prev", 0); f.FirstFetch && ok { err = f.Save(ctx, db)
_, part, ok := strings.Cut(prev.Value(), " ") span.RecordError(err)
if ok {
part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
queue.Insert(&Feed{
FetchURI: part,
URI: f.URI,
Nick: f.Nick,
LastScanOn: f.LastScanOn,
RefreshRate: f.RefreshRate,
})
}
}
err = storeFeed(db, twtfile) continue
if err != nil {
c.Log(err)
err = f.Save(c.Context, db)
c.Log(err)
return
} }
rdr := io.TeeReader(res.Body, cpy)
rdr = lextwt.TwtFixer(rdr)
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
cpy.Close() cpy.Close()
f.LastScanOn.Time = time.Now() if err != nil {
f.RefreshRate = TenMinutes span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
f.LastError.String, f.LastError.Valid = err.Error(), true
f.RefreshRate = OneDay
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
count := twtfile.Twts().Len()
span.AddEvent("parse complete", trace.WithAttributes(attribute.Int("count", count)))
twts_histogram.Record(ctx, float64(count)/1000)
err = storeFeed(ctx, db, twtfile)
if err != nil {
span.RecordError(err)
f.LastError.String, f.LastError.Valid = err.Error(), true
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
f.RefreshRate, f.State = checkTemp(twtfile.Twts())
f.LastError.String = "" f.LastError.String = ""
err = f.Save(c.Context, db) err = f.Save(ctx, db)
if err != nil { span.RecordError(err)
c.Log(err) process_out_total.Add(ctx, 1)
return
}
} }
} }
span.RecordError(ctx.Err())
}
// checkTemp derives a feed's refresh rate (in the units of the OneDay and
// TenMinutes constants) and its temperature State from how recently it has
// posted.
//
// Feeds with fewer than five twts are "cold" with a rate of 7*OneDay.
// Otherwise twts is sorted in place with sort.Sort, and the ages of the first
// and fifth entries select the tier; either age falling under its threshold is
// enough to qualify.
// NOTE(review): this presumes types.Twts sorts newest first — confirm.
func checkTemp(twts types.Twts) (int, State) {
	if len(twts) < 5 {
		return 7 * OneDay, "cold"
	}

	sort.Sort(twts)

	// time.Since saturates at the maximum Duration for very old timestamps.
	// The previous -time.Until(t) saturated at the minimum Duration, whose
	// negation overflows back to a negative value and classified such feeds
	// as "hot".
	sinceFirst := time.Since(twts[0].Created())
	sinceFifth := time.Since(twts[4].Created())

	switch {
	case sinceFirst < 2*time.Hour || sinceFifth < 8*time.Hour:
		return TwoMinutes, "hot"
	case sinceFirst < 4*time.Hour || sinceFifth < 16*time.Hour:
		return TenMinutes, "hot"
	case sinceFirst < 8*time.Hour || sinceFifth < 32*time.Hour:
		return 2 * TenMinutes, "warm"
	case sinceFirst < 16*time.Hour || sinceFifth < 64*time.Hour:
		return 4 * TenMinutes, "warm"
	case sinceFirst < 24*time.Hour || sinceFifth < 128*time.Hour:
		return OneDay, "cold"
	case sinceFirst < 48*time.Hour || sinceFifth < 256*time.Hour:
		return 2 * OneDay, "cold"
	case sinceFirst < 96*time.Hour || sinceFifth < 512*time.Hour:
		return 7 * OneDay, "frozen"
	}
	return OneMonth, "frozen"
}
func tsTemp(ts time.Time) (int, State) {
since_first := -time.Until(ts)
if since_first < 2*time.Hour {
return TwoMinutes, "hot"
}
if since_first < 4*time.Hour {
return TenMinutes, "hot"
}
if since_first < 8*time.Hour {
return 2 * TenMinutes, "warm"
}
if since_first < 16*time.Hour {
return 4 * TenMinutes, "warm"
}
if since_first < 24*time.Hour {
return OneDay, "cold"
}
if since_first < 48*time.Hour {
return 2 * OneDay, "cold"
}
if since_first < 96*time.Hour {
return 7 * OneDay, "frozen"
}
return OneMonth, "frozen"
} }

View File

@ -1,115 +0,0 @@
package main
import (
"context"
"database/sql"
"fmt"
"iter"
"os"
"strings"
"sync"
_ "embed"
_ "github.com/mattn/go-sqlite3"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
// run wires the application together: it executes the initSQL statements
// against the database, stores the twts parsed from the base feed file, then
// starts the refresh loop and the HTTP server in the background and blocks
// until the console context is done, returning the context's error.
func run(c console) error {
	cfg := c.Args()

	// Feeds are ordered in the queue by their LastScanOn time.
	byLastScan := func(a, b *Feed) bool {
		return a.LastScanOn.Time.Before(b.LastScanOn.Time)
	}
	app := &appState{
		args:  cfg,
		feeds: sync.Map{},
		queue: FibHeap(byLastScan),
	}

	// Setup DB
	initSchema := func(ctx context.Context) error {
		db, err := app.DB()
		if err != nil {
			return err
		}
		defer db.Close()

		for _, stmt := range strings.Split(initSQL, ";") {
			if _, err = db.ExecContext(ctx, stmt); err != nil {
				return err
			}
		}
		return nil
	}
	if err := initSchema(c.Context); err != nil {
		return err
	}

	// Seed File
	seedFromFile := func() error {
		f, err := os.Open(cfg.baseFeed)
		if err != nil {
			return err
		}
		defer f.Close()

		twter := &types.Twter{Nick: cfg.Nick, URI: cfg.URI}
		twtfile, err := lextwt.ParseFile(f, twter)
		if err != nil {
			return fmt.Errorf("%w: %w", ErrParseFailed, err)
		}

		db, err := app.DB()
		if err != nil {
			return err
		}
		defer db.Close()

		return storeFeed(db, twtfile)
	}
	if err := seedFromFile(); err != nil {
		return err
	}

	go refreshLoop(c, app)
	go httpServer(c, app)

	<-c.Done()
	return c.Err()
}
// appState is the state shared by the refresh loop and the HTTP server
// started from run.
type appState struct {
// args holds the configuration returned by the console's Args method.
args args
// feeds is ranged over by Feeds, which expects string keys and *Feed values.
feeds sync.Map
// queue is the heap of feeds; run builds it ordered by LastScanOn.
queue *fibHeap[Feed]
}
// DB opens a database handle using the driver name and data source taken from
// the application arguments. Each call goes through sql.Open again.
func (app *appState) DB() (*sql.DB, error) {
	driver, source := app.args.dbtype, app.args.dbfile
	return sql.Open(driver, source)
}
// Feed always returns nil; feedID is currently ignored.
// NOTE(review): looks like an unimplemented lookup stub — confirm intent.
func (app *appState) Feed(feedID string) *Feed {
return nil
}
// Feeds returns an iterator over the entries of the feeds map, yielding each
// key as a string together with its *Feed value. Iteration stops at the first
// value that is not a *Feed, or as soon as the consumer's yield returns false.
// A key that is not a string is yielded as "".
func (app *appState) Feeds() iter.Seq2[string, *Feed] {
	return func(yield func(string, *Feed) bool) {
		app.feeds.Range(func(k, v any) bool {
			feed, isFeed := v.(*Feed)
			if !isFeed {
				return false
			}
			id, _ := k.(string)
			return yield(id, feed)
		})
	}
}

View File

@ -70,8 +70,8 @@ func (l *strList) Scan(value any) error {
func (l strList) Value() (driver.Value, error) { func (l strList) Value() (driver.Value, error) {
arr := make([]string, len(l)) arr := make([]string, len(l))
for i, v := range l { for i, v := range l {
arr[i] = "\""+v+"\"" arr[i] = "\"" + v + "\""
} }
return "["+strings.Join(arr, ",") +"]", nil return "[" + strings.Join(arr, ",") + "]", nil
} }