Merge pull request 'add-otel' (#3) from add-otel into main

Reviewed-on: #3
This commit is contained in:
xuu 2025-03-26 20:49:41 -06:00
commit 303ca5a2db
16 changed files with 1680 additions and 450 deletions

4
.gitignore vendored
View File

@ -2,3 +2,7 @@
feed
__debug*
feeds/
/xt
.env
*.txt
*.txt.xz

View File

@ -1,84 +0,0 @@
.ce
Preamble
.sp
We, the people of the United States, in order
to form a more perfect Union, establish justice, insure
domestic tranquility, provide for the common defense, promote
the general welfare,
and secure the blessing of liberty to ourselves and our
posterity do ordain and establish this Constitution for the
United States of America.
.sp
.nr aR 0 1
.af aR I
.de AR
.ce
.ps 16
.ft B
Article \\n+(aR
.nr sE 0 1
.af sE 1
.ps 12
.ft P
..
.de SE
.sp
.ft B
\\s-2SECTION \\n+(sE:\\s+2
.ft P
.nr pP 0 1
.af pP 1
..
.de PP
.sp
.ft I
\\s-3Paragraph \\n+(pP:\\s+3
.ft P
..
.AR
.SE
Legislative powers; in whom vested:
.PP
All legislative powers herein granted shall be vested in a
Congress of the United States, which shall consist of a Senate
and a House of Representatives.
.SE
House of Representatives, how and by whom chosen, Qualifications
of a Representative. Representatives and direct taxes, how
apportioned. Enumeration. Vacancies to be filled. Power of
choosing officers and of impeachment.
.PP
The House of Representatives shall be composed of members chosen
every second year by the people of the several states, and the
electors in each State shall have the qualifications requisite
for electors of the most numerous branch of the State Legislature.
.PP
No person shall be a Representative who shall not have attained
to the age of twenty-five years, and been seven years a citizen
of the United States, and who shall not, when elected, be an
inhabitant of that State in which he shall be chosen.
.PP
Representatives and direct taxes shall be apportioned among the
several States which maybe included within this Union, according
to their respective numbers, which shall be determined by adding
to the whole number of free persons, including those bound for
service for a term of years, and excluding Indians not taxed,
three-fifths of all other persons. The actual enumeration shall
be made within three years after the first meeting of the
Congress of the United States, and within every subsequent term
of ten years, in such manner as they shall by law direct. The
number of Representatives shall not exceed one for every thirty
thousand, but each State shall have at least one Representative;
and until such enumeration shall be made, the State of New
Hampshire shall be entitled to choose three, Massachusetts eight,
Rhode Island and Providence Plantations one, Connecticut
five, New York six, New Jersey four, Pennsylvania eight,
Delaware one, Maryland six, Virginia ten, North Carolina five,
South Carolina five, and Georgia three.
.PP
When vacancies happen in the representation from any State, the
Executive Authority thereof shall issue writs of election to fill
such vacancies.
.PP
The House of Representatives shall choose their Speaker and other
officers; and shall have the sole power of impeachment.

195
app.go Normal file
View File

@ -0,0 +1,195 @@
package main
import (
"context"
"database/sql"
"fmt"
"iter"
"os"
"runtime/debug"
"strconv"
"strings"
"sync"
_ "embed"
_ "github.com/mattn/go-sqlite3"
"github.com/uptrace/opentelemetry-go-extra/otelsql"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt"
"go.yarn.social/types"
"golang.org/x/sync/errgroup"
)
func run(c *console) error {
ctx, span := otel.Span(c.Context)
defer span.End()
bi, _ := debug.ReadBuildInfo()
span.AddEvent(name, trace.WithAttributes(attribute.String("version", bi.Main.Version)))
a := c.Args()
app := &appState{
args: a,
feeds: sync.Map{},
queue: FibHeap(func(a, b *Feed) bool {
return a.NextScanOn.Time.Before(b.NextScanOn.Time)
}),
}
// Setup DB
err := func(ctx context.Context) error {
ctx, span := otel.Span(ctx)
defer span.End()
db, err := app.DB(ctx)
if err != nil {
return err
}
defer db.Close()
for _, stmt := range strings.Split(initSQL, ";") {
_, err = db.ExecContext(ctx, stmt)
if err != nil {
return err
}
}
return nil
}(ctx)
if err != nil {
return err
}
// Seed File
err = func(ctx context.Context) error {
ctx, span := otel.Span(ctx)
defer span.End()
f, err := os.Open(a.baseFeed)
if err != nil {
return err
}
defer f.Close()
twtfile, err := lextwt.ParseFile(f, &types.Twter{
Nick: a.Nick,
URI: a.URI,
})
if err != nil {
return fmt.Errorf("%w: %w", ErrParseFailed, err)
}
db, err := app.DB(ctx)
if err != nil {
return err
}
defer db.Close()
return storeFeed(ctx, db, twtfile)
}(ctx)
if err != nil {
return err
}
wg, ctx := errgroup.WithContext(ctx)
c.Context = ctx
wg.Go(func() error {
return feedRefreshProcessor(c, app)
})
go httpServer(c, app)
err = wg.Wait()
if err != nil {
return err
}
return c.Context.Err()
}
type appState struct {
args args
feeds sync.Map
queue *fibHeap[Feed]
}
type db struct {
*sql.DB
Version string
Params map[string]string
MaxLength int
MaxVariableNumber int
}
func (app *appState) DB(ctx context.Context) (db, error) {
// return sql.Open(app.args.dbtype, app.args.dbfile)
var err error
db := db{Params: make(map[string]string)}
db.DB, err = otelsql.Open(app.args.dbtype, app.args.dbfile,
otelsql.WithAttributes(semconv.DBSystemSqlite),
otelsql.WithDBName("xt"))
if err != nil {
return db, err
}
rows, err := db.DB.QueryContext(ctx, `select sqlite_version()`)
if err != nil {
return db, err
}
if rows.Next() {
rows.Scan(&db.Version)
}
if err = rows.Err(); err != nil {
return db, err
}
rows, err = db.DB.QueryContext(ctx, `pragma compile_options`)
if err != nil {
return db, err
}
for rows.Next() {
var key, value string
rows.Scan(&key)
key, value, _ = strings.Cut(key, "=")
db.Params[key] = value
}
if err = rows.Err(); err != nil {
return db, err
}
if m, ok := db.Params["MAX_VARIABLE_NUMBER"]; ok {
db.MaxVariableNumber, _ = strconv.Atoi(m)
}
if m, ok := db.Params["MAX_LENGTH"]; ok {
db.MaxLength, _ = strconv.Atoi(m)
}
return db, err
}
func (app *appState) Feed(feedID string) *Feed {
return nil
}
func (app *appState) Feeds() iter.Seq2[string, *Feed] {
return func(yield func(string, *Feed) bool) {
app.feeds.Range(func(k, v any) bool {
key, _ := k.(string)
value, ok := v.(*Feed)
if !ok {
return false
}
if !yield(key, value) {
return false
}
return true
})
}
}

92
cmd/load/main.go Normal file
View File

@ -0,0 +1,92 @@
package main
import (
"context"
"database/sql"
"fmt"
"hash/fnv"
"iter"
"os"
"github.com/oklog/ulid/v2"
"github.com/uptrace/opentelemetry-go-extra/otelsql"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
func main() {
in := os.Stdin
if len(os.Args) != 2 {
fmt.Fprint(os.Stderr, "usage: ", os.Args[0], "[db file]")
}
db, err := DB(context.Background(), os.Args[1])
if err != nil {
panic(err)
}
_ = db
for line := range lextwt.IterRegistry(in) {
_ = line
}
}
const MaxVariableNumber = 32766
func DB(ctx context.Context, cnx string) (*sql.DB, error) {
// return sql.Open(app.args.dbtype, app.args.dbfile)
db, err := otelsql.Open("sqlite", cnx)
if err != nil {
return db, err
}
return db, err
}
func makeULID(twt types.Twt) ulid.ULID {
h64 := fnv.New64a()
h16 := fnv.New32a()
text := []byte(fmt.Sprintf("%+l", twt))
b := make([]byte, 10)
copy(b, h16.Sum(text)[:2])
copy(b[2:], h64.Sum(text))
u := ulid.ULID{}
u.SetTime(ulid.Timestamp(twt.Created()))
u.SetEntropy(b)
return u
}
func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
_, size := qry(1)
itemsPerIter := maxArgs / size
if len(args) < size {
return func(yield func(string, []any) bool) {}
}
if len(args) < maxArgs {
return func(yield func(string, []any) bool) {
query, _ := qry(len(args) / size)
yield(query, args)
}
}
return func(yield func(string, []any) bool) {
for len(args) > 0 {
if len(args) > maxArgs {
query, size := qry(itemsPerIter)
if !yield(query, args[:size]) {
return
}
args = args[size:]
continue
}
query, _ := qry(len(args) / size)
yield(query, args)
return
}
}
}

365
feed.go
View File

@ -4,7 +4,9 @@ import (
"cmp"
"context"
"database/sql"
"database/sql/driver"
"fmt"
"hash/fnv"
"iter"
"net/http"
"net/url"
@ -14,28 +16,31 @@ import (
_ "embed"
"github.com/oklog/ulid/v2"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
type Feed struct {
FeedID uuid
FetchURI string
ParentID uuid
HashURI string
URI string
Nick string
LastScanOn sql.NullTime
State State
LastScanOn TwtTime
RefreshRate int
NextScanOn TwtTime
LastTwtOn TwtTime
LastModified sql.NullTime
LastModified TwtTime
LastError sql.NullString
ETag sql.NullString
Version string
DiscloseFeedURL string
DiscloseNick string
FirstFetch bool
State State
}
type State string
@ -46,40 +51,34 @@ const (
Cold State = "cold"
Warm State = "warm"
Hot State = "hot"
Once State = "once"
)
var (
//go:embed init.sql
initSQL string
insertFeed = `
insert into feeds
(feed_id, uri, nick, last_scan_on, refresh_rate)
values (?, ?, ?, ?, ?)
ON CONFLICT (feed_id) DO NOTHING
`
insertTwt = `
insert into twts
(feed_id, hash, conv, dt, text, mentions, tags)
values (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (feed_id, hash) DO NOTHING
`
fetchFeeds = `
select
feed_id,
uri,
nick,
last_scan_on,
refresh_rate,
last_modified_on,
last_etag
from feeds
where datetime(last_scan_on, '+'||refresh_rate||' seconds') < datetime(current_timestamp, '+10 minutes')
`
insertFeed = func(r int) (string, int) {
repeat := ""
if r > 1 {
repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
}
return `
insert into feeds (
feed_id,
parent_id,
nick,
uri,
state,
last_scan_on,
refresh_rate
)
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
ON CONFLICT (feed_id) DO NOTHING`, r * 7
}
updateFeed = `
update feeds set
update feeds set
state = ?,
last_scan_on = ?,
refresh_rate = ?,
last_modified_on = ?,
@ -87,19 +86,88 @@ var (
last_error = ?
where feed_id = ?
`
insertTwt = func(r int) (string, int) {
repeat := ""
if r > 1 {
repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
}
return `
insert into twts
(feed_id, ulid, text, hash, conv, mentions, tags)
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
ON CONFLICT (feed_id, ulid) DO NOTHING`, r * 7
}
fetchFeeds = `
select
feed_id,
parent_id,
coalesce(hashing_uri, uri) hash_uri,
uri,
nick,
state,
last_scan_on,
strftime(
'%Y-%m-%dT%H:%M:%fZ',
coalesce(last_scan_on, '1901-01-01'),
'+'||abs(refresh_rate + cast(random() % 30 as int))||' seconds'
) next_scan_on,
coalesce(last_twt_on, '1901-01-01T00:00:00Z') last_twt_on,
refresh_rate,
last_modified_on,
last_etag
from feeds
left join (
select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts group by feed_id
) using (feed_id)
left join (
select
feed_id parent_id,
uri hashing_uri
from feeds
where parent_id is null
) using (parent_id)
where datetime(
coalesce(last_scan_on, '1901-01-01'),
'+'||abs(refresh_rate+cast(random()%30 as int))||' seconds'
) < datetime(current_timestamp, '+3 minutes')
`
)
func (f *Feed) Save(ctx context.Context, db *sql.DB) error {
fmt.Println(f.FetchURI, " ", f.LastModified, " ", f.LastError)
func (f *Feed) Create(ctx context.Context, db db) error {
ctx, span := otel.Span(ctx)
defer span.End()
query, _ := insertFeed(1)
_, err := db.ExecContext(
ctx,
query,
f.FeedID, // feed_id
f.ParentID, // parent_id
f.Nick, // nick
f.URI, // uri
f.State, // state
f.LastScanOn, // last_scan_on
f.RefreshRate, // refresh_rate
)
return err
}
func (f *Feed) Save(ctx context.Context, db db) error {
ctx, span := otel.Span(ctx)
defer span.End()
_, err := db.ExecContext(
ctx,
updateFeed,
f.LastScanOn,
f.RefreshRate,
f.LastModified,
f.ETag,
f.LastError,
f.FeedID,
f.State, // state
f.LastScanOn, // last_scan_on
f.RefreshRate, // refresh_rate
f.LastModified, // last_modified_on
f.ETag, // last_etag
f.LastError, // last_error
f.FeedID, // feed_id
)
return err
}
@ -111,9 +179,14 @@ func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
f.Version = "0.0.1"
err = res.Scan(
&f.FeedID,
&f.ParentID,
&f.HashURI,
&f.URI,
&f.Nick,
&f.State,
&f.LastScanOn,
&f.NextScanOn,
&f.LastTwtOn,
&f.RefreshRate,
&f.LastModified,
&f.ETag,
@ -122,19 +195,12 @@ func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
return err
}
if !f.LastScanOn.Valid {
f.FirstFetch = true
f.LastScanOn.Time = time.Now()
f.LastScanOn.Valid = true
} else {
f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
}
f.FetchURI = f.URI
return err
}
func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) {
func loadFeeds(ctx context.Context, db db) (iter.Seq[Feed], error) {
ctx, span := otel.Span(ctx)
var err error
var res *sql.Rows
@ -145,10 +211,13 @@ func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) {
}
return func(yield func(Feed) bool) {
defer span.End()
for res.Next() {
var f Feed
err = f.Scan(res)
if err != nil {
span.RecordError(err)
return
}
if !yield(f) {
@ -158,13 +227,16 @@ func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) {
}, err
}
func storeFeed(db *sql.DB, f types.TwtFile) error {
func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
ctx, span := otel.Span(ctx)
defer span.End()
loadTS := time.Now()
refreshRate := 600
feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
tx, err := db.Begin()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
@ -188,19 +260,11 @@ func storeFeed(db *sql.DB, f types.TwtFile) error {
defer tx.Rollback()
_, err = tx.Exec(
insertFeed,
feedID,
f.Twter().HashingURI,
f.Twter().DomainNick(),
loadTS,
refreshRate,
)
if err != nil {
return err
}
twts := f.Twts()
_, size := insertTwt(len(twts))
args := make([]any, 0, size)
for _, twt := range f.Twts() {
for _, twt := range twts {
mentions := make(uuids, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() {
followMap[mention.Twter().Nick] = mention.Twter().URI
@ -219,30 +283,83 @@ func storeFeed(db *sql.DB, f types.TwtFile) error {
subjectTag = tag.Text()
}
}
args = append(
args,
feedID, // feed_id
makeULID(twt), // ulid
fmt.Sprintf("%+l", twt), // text
subjectTag, // conv
twt.Hash(), // hash
mentions.ToStrList(), // mentions
tags, // tags
)
}
for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) {
fmt.Println("store", f.Twter().URI, len(args))
_, err = tx.Exec(
insertTwt,
feedID,
twt.Hash(),
subjectTag,
twt.Created(),
fmt.Sprint(twt),
mentions.ToStrList(),
tags,
_, err = tx.ExecContext(
ctx,
query,
args...,
)
if err != nil {
return err
}
}
args = args[:0]
args = append(args,
feedID, // feed_id
nil, // parent_id
f.Twter().DomainNick(), // nick
f.Twter().URI, // uri
"warm", // state
TwtTime{Time: loadTS, Valid: true}, // last_scan_on
refreshRate, // refresh_rate
)
if prev, ok := f.Info().GetN("prev", 0); ok {
_, part, ok := strings.Cut(prev.Value(), " ")
if ok {
uri := f.Twter().URI
if u, ok := f.Info().GetN("url", 0); ok {
uri = u.Value()
}
if u, ok := f.Info().GetN("uri", 0); ok {
uri = u.Value()
}
part = uri[:strings.LastIndex(uri, "/")+1] + part
childID := urlNS.UUID5(part)
fmt.Println("found prev", uri, part)
args = append(args,
childID, // feed_id
feedID, // parent_id
f.Twter().DomainNick(), // nick
part, // uri
"once", // state
nil, // last_scan_on
0, // refresh_rate
)
}
}
for nick, uri := range followMap {
_, err = tx.Exec(
insertFeed,
urlNS.UUID5(uri),
uri,
nick,
nil,
refreshRate,
args = append(args,
urlNS.UUID5(uri), // feed_id
nil, // parent_id
nick, // nick
uri, // uri
"warm", // state
nil, // last_scan_on
refreshRate, // refresh_rate
)
}
for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) {
_, err = tx.ExecContext(
ctx,
query,
args...,
)
if err != nil {
return err
@ -253,12 +370,14 @@ func storeFeed(db *sql.DB, f types.TwtFile) error {
}
func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
feed.State = "fetch"
if strings.Contains(feed.FetchURI, "lublin.se") {
if strings.Contains(feed.URI, "lublin.se") {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
}
if strings.Contains(feed.URI, "enotty.dk") {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
}
req, err := http.NewRequestWithContext(ctx, "GET", feed.FetchURI, nil)
req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil)
if err != nil {
return nil, fmt.Errorf("creating HTTP request failed: %w", err)
}
@ -282,3 +401,81 @@ func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
return req, nil
}
type TwtTime struct {
Time time.Time
Valid bool // Valid is true if Time is not NULL
}
// Scan implements the [Scanner] interface.
func (n *TwtTime) Scan(value any) error {
var err error
switch value := value.(type) {
case nil:
n.Time, n.Valid = time.Time{}, false
return nil
case string:
n.Valid = true
n.Time, err = time.Parse(time.RFC3339, value)
case time.Time:
n.Valid = true
n.Time = value
}
return err
}
// Value implements the [driver.Valuer] interface.
func (n TwtTime) Value() (driver.Value, error) {
if !n.Valid {
return nil, nil
}
return n.Time.Format(time.RFC3339), nil
}
func makeULID(twt types.Twt) ulid.ULID {
h64 := fnv.New64a()
h16 := fnv.New32a()
text := []byte(fmt.Sprintf("%+l", twt))
b := make([]byte, 10)
copy(b, h16.Sum(text)[:2])
copy(b[2:], h64.Sum(text))
u := ulid.ULID{}
u.SetTime(ulid.Timestamp(twt.Created()))
u.SetEntropy(b)
return u
}
func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
_, size := qry(1)
itemsPerIter := maxArgs / size
if len(args) < size {
return func(yield func(string, []any) bool) {}
}
if len(args) < maxArgs {
return func(yield func(string, []any) bool) {
query, _ := qry(len(args) / size)
yield(query, args)
}
}
return func(yield func(string, []any) bool) {
for len(args) > 0 {
if len(args) > maxArgs {
query, size := qry(itemsPerIter)
if !yield(query, args[:size]) {
return
}
args = args[size:]
continue
}
query, _ := qry(len(args) / size)
yield(query, args)
return
}
}
}

View File

@ -8,6 +8,10 @@ import (
"net/http"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.sour.is/xt/internal/otel"
)
var (
@ -48,10 +52,18 @@ func (r *Response) LastModified() time.Time {
type httpFetcher struct {
client *http.Client
m_fetch_status metric.Int64Counter
m_fetch_second metric.Float64Histogram
}
func NewHTTPFetcher() *httpFetcher {
fetch_total, _ := otel.Meter().Int64Counter("xt_fetch_status_total")
fetch_second, _ := otel.Meter().Float64Histogram("xt_fetch_seconds")
return &httpFetcher{
m_fetch_status: fetch_total,
m_fetch_second: fetch_second,
client: &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
@ -62,7 +74,7 @@ func NewHTTPFetcher() *httpFetcher {
ForceAttemptHTTP2: false,
MaxIdleConns: 100,
IdleConnTimeout: 10 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
},
@ -70,6 +82,16 @@ func NewHTTPFetcher() *httpFetcher {
}
func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
ctx, span := otel.Span(ctx)
defer span.End()
start := time.Now()
defer func() {
since := time.Since(start)
f.m_fetch_second.Record(ctx, since.Seconds())
}()
defer fmt.Println("fetch done", request.URI)
response := &Response{
Request: request,
}
@ -79,8 +101,9 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
response.err = err
return response
}
span.AddEvent("start request")
res, err := f.client.Do(req)
span.AddEvent("got response")
if err != nil {
if errors.Is(err, &net.DNSError{}) {
response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
@ -93,17 +116,24 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
response.Response = res
switch res.StatusCode {
case 200:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "ok")))
case 304:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "not_modified")))
response.err = fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
case 400, 406, 502, 503:
case 400, 406, 429, 500, 502, 503:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "temp_fail")))
response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
case 403, 404, 410:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "perm_fail")))
response.err = fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
default:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.Int("status", res.StatusCode)))
response.err = errors.New(res.Status)
}
@ -121,6 +151,9 @@ func NewFuncPool[IN, OUT any](
size int,
fetch func(ctx context.Context, request IN) OUT,
) (*pool[IN, OUT], func()) {
ctx, span := otel.Span(ctx)
defer span.End()
var wg sync.WaitGroup
in := make(chan IN, size)
@ -129,21 +162,39 @@ func NewFuncPool[IN, OUT any](
wg.Add(size)
for range size {
go func() {
ctx, span := otel.Span(ctx)
defer span.End()
defer wg.Done()
for request := range in {
ctx, cancel := context.WithTimeoutCause(ctx, 15*time.Second, fmt.Errorf("GOT STUCK"))
defer cancel()
ctx, span := otel.Span(ctx)
defer span.End()
span.AddEvent("start fetch")
r := fetch(ctx, request)
span.AddEvent("got fetch")
select {
case <-ctx.Done():
return
case out <- fetch(ctx, request):
case out <- r:
span.AddEvent("sent queue")
}
}
}()
}
return &pool[IN, OUT]{
in: in,
out: out,
}, func() { close(in); wg.Wait(); close(out) }
in: in,
out: out,
}, func() {
close(in)
wg.Wait()
close(out)
}
}
func (f *pool[IN, OUT]) Fetch(request IN) {

50
go.mod
View File

@ -4,25 +4,61 @@ go 1.23.2
require (
github.com/mattn/go-sqlite3 v1.14.24
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0
go.yarn.social/lextwt v0.1.5-0.20250327005027-02d9b44de4dd
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/protobuf v1.36.5 // indirect
)
require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
go.opentelemetry.io/otel/log v0.10.0
go.opentelemetry.io/otel/metric v1.35.0
go.opentelemetry.io/otel/trace v1.35.0
)
require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/matryer/is v1.4.1
github.com/oklog/ulid/v2 v2.1.0
github.com/prometheus/client_golang v1.20.5
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2
github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect
go.opentelemetry.io/otel v1.34.0
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sys v0.25.0 // indirect
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0
go.opentelemetry.io/otel v1.35.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0
go.opentelemetry.io/otel/exporters/prometheus v0.57.0
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0
go.opentelemetry.io/otel/sdk v1.35.0
go.opentelemetry.io/otel/sdk/log v0.10.0
go.opentelemetry.io/otel/sdk/metric v1.35.0
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/sync v0.12.0
golang.org/x/sys v0.31.0 // indirect
google.golang.org/grpc v1.70.0 // indirect
)

127
go.sum
View File

@ -1,46 +1,149 @@
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ=
github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2 h1:ZjUj9BLYf9PEqBn8W/OapxhPjVRdC6CsXTdULHsyk5c=
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2/go.mod h1:O8bHQfyinKwTXKkiKNGmLQS7vRsqRxIQTFZpYpHK3IQ=
github.com/writeas/go-strip-markdown/v2 v2.1.1 h1:hAxUM21Uhznf/FnbVGiJciqzska6iLei22Ijc3q2e28=
github.com/writeas/go-strip-markdown/v2 v2.1.1/go.mod h1:UvvgPJgn1vvN8nWuE5e7v/+qmDu3BSVnKAB6Gl7hFzA=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0 h1:N+78eXSlu09kii5nkiM+01YbtWe01oZLPPLhNlEKhus=
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0/go.mod h1:/2KhfLAhtQpgnhIk1f+dftA3fuuMcZjiz//Dc9yfaEs=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 h1:QSKmLBzbFULSyHzOdO9JsN9lpE4zkrz1byYGmJecdVE=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0/go.mod h1:sTQ/NH8Yrirf0sJ5rWqVu+oT82i4zL9FaF6rWcqnptM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0uaNS4c98WRNUEx5U3aDlrDOI5Rs+1Vifcw4DJ8U=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE=
go.opentelemetry.io/otel/exporters/prometheus v0.57.0 h1:AHh/lAP1BHrY5gBwk8ncc25FXWm/gmmY3BX258z5nuk=
go.opentelemetry.io/otel/exporters/prometheus v0.57.0/go.mod h1:QpFWz1QxqevfjwzYdbMb4Y1NnlJvqSGwyuU0B4iuc9c=
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0 h1:GKCEAZLEpEf78cUvudQdTg0aET2ObOZRB2HtXA0qPAI=
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0/go.mod h1:9/zqSWLCmHT/9Jo6fYeUDRRogOLL60ABLsHWS99lF8s=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0 h1:czJDQwFrMbOr9Kk+BPo1y8WZIIFIK58SA1kykuVeiOU=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0/go.mod h1:lT7bmsxOe58Tq+JIOkTQMCGXdu47oA+VJKLZHbaBKbs=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 h1:jBpDk4HAUsrnVO1FsfCfCOTEc/MkInJmvfCHYLFiT80=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0/go.mod h1:H9LUIM1daaeZaz91vZcfeM0fejXPmgCYE8ZhzqfJuiU=
go.opentelemetry.io/otel/log v0.10.0 h1:1CXmspaRITvFcjA4kyVszuG4HjA61fPDxMb7q3BuyF0=
go.opentelemetry.io/otel/log v0.10.0/go.mod h1:PbVdm9bXKku/gL0oFfUF4wwsQsOPlpo4VEqjvxih+FM=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
go.opentelemetry.io/otel/sdk/log v0.10.0 h1:lR4teQGWfeDVGoute6l0Ou+RpFqQ9vaPdrNJlST0bvw=
go.opentelemetry.io/otel/sdk/log v0.10.0/go.mod h1:A+V1UTWREhWAittaQEG4bYm4gAZa6xnvVu+xKrIRkzo=
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51 h1:XEjx0jSNv1h22gwGfQBfMypWv/YZXWGTRbqh3B8xfIs=
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51/go.mod h1:CWAZuBHZfGaqa0FreSeLG+pzK3rHP2TNAG7Zh6QlRiM=
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 h1:zfnniiSO/WO65mSpdQzGYJ9pM0rYg/BKgrSm8h2mTyA=
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yarn.social/lextwt v0.0.0-20250213063805-7adc6ca07564 h1:z+IAMtxNKWcLNm9nLzJwHw6OPkV5JoQYmmFohaUvcKI=
go.yarn.social/lextwt v0.0.0-20250213063805-7adc6ca07564/go.mod h1:JOPCOh+3bHv+BMaFZpKzw6soiXbIlZD5b2f7YKDDjqk=
go.yarn.social/lextwt v0.1.5-0.20250327005027-02d9b44de4dd h1:Np3zWtQ0GB9WhRFCPblaItLVtdy8Y35QKL+PUvRR/t8=
go.yarn.social/lextwt v0.1.5-0.20250327005027-02d9b44de4dd/go.mod h1:P36NPegLbhbFa1A0JOLsDyIQcdM0zdmx8kPKACXry4A=
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede h1:XV9tuDQ605xxH4qIQPRHM1bOa7k0rJZ2RqA5kz2Nun4=
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c=
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50=
google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

276
http.go
View File

@ -1,70 +1,265 @@
package main
import (
"context"
"errors"
"fmt"
"net/http"
"slices"
"sort"
"strconv"
"strings"
"time"
"go.yarn.social/lextwt"
"go.yarn.social/types"
"go.sour.is/xt/internal/otel"
)
func httpServer(c console, app *appState) {
c.Log("start http server")
func httpServer(c *console, app *appState) error {
ctx, span := otel.Span(c)
defer span.End()
db, err := app.DB()
span.AddEvent("start http server")
db, err := app.DB(ctx)
if err != nil {
c.Log("missing db", err)
c.abort()
return
span.RecordError(fmt.Errorf("%w: missing db", err))
return err
}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
_, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte("ok"))
})
http.HandleFunc("/conv/{hash}", func(w http.ResponseWriter, r *http.Request) {
http.HandleFunc("/api/plain/conv/{hash}", func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
hash := r.PathValue("hash")
if (len(hash) < 6 || len(hash) > 8) && !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") {
w.WriteHeader(http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write([]byte(hash))
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
rows, err := db.QueryContext(r.Context(), "SELECT feed_id, hash, conv, dt, text FROM twt WHERE hash = $1 or conv = $1", hash)
rows, err := db.QueryContext(
ctx,
`SELECT
feed_id,
hash,
conv,
nick,
uri,
text
FROM twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
) using (feed_id)
WHERE
hash = $1 or
conv = $1
order by ulid asc`,
hash,
)
if err != nil {
c.Log(err)
span.RecordError(err)
return
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var twt struct {
var o struct {
FeedID string
Hash string
Conv string
Dt time.Time
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&twt.FeedID, &twt.Hash, &twt.Conv, &twt.Dt, &twt.Text)
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
c.Log(err)
span.RecordError(err)
return
}
twter := types.NewTwter(o.Nick, o.URI)
o.Text = strings.ReplaceAll(o.Text, "\n", "\u2028")
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
var preamble lextwt.Comments
preamble = append(preamble, lextwt.NewComment("# self = /conv/"+hash))
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
})
http.HandleFunc("/feeds", func(w http.ResponseWriter, r *http.Request) {
http.HandleFunc("/api/plain/twt", func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
args := make([]any, 0, 3)
uriarg := ""
uri := r.URL.Query().Get("uri")
if uri != "" {
feed_id := urlNS.UUID5(uri)
uriarg = "and feed_id = ?"
args = append(args, feed_id)
}
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
offset := 0
if v, ok := strconv.Atoi(r.URL.Query().Get("offset")); ok == nil {
offset = v
}
args = append(args, limit, offset)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
rows, err := db.QueryContext(
ctx,
`SELECT
feed_id,
hash,
conv,
nick,
uri,
text
FROM twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
where state not in ('frozen', 'permanantly-dead')
`+uriarg+`
) using (feed_id)
order by ulid desc
limit ?
offset ?
`, args...,
)
if err != nil {
span.RecordError(err)
return
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return
}
twter := types.NewTwter(o.Nick, o.URI)
o.Text = strings.ReplaceAll(o.Text, "\n", "\u2028")
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
var preamble lextwt.Comments
preamble = append(preamble, lextwt.NewComment("# I am the Watcher. I am your guide through this vast new twtiverse."))
preamble = append(preamble, lextwt.NewComment("# self = /api/plain/twts"+mkqry(uri, limit, offset)))
preamble = append(preamble, lextwt.NewComment("# next = /api/plain/twts"+mkqry(uri, limit, offset+len(twts))))
if offset > 0 {
preamble = append(preamble, lextwt.NewComment("# prev = /api/plain/twts"+mkqry(uri, limit, offset-limit)))
}
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
})
http.HandleFunc("/api/plain/users", func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
where := `where parent_id is null and state not in ('permanantly-dead', 'frozen') and last_twt_on is not null`
args := make([]any, 0)
if uri := r.URL.Query().Get("uri"); uri != "" {
where = `where feed_id = ? or parent_id = ?`
feed_id := urlNS.UUID5(uri)
args = append(args, feed_id, feed_id)
}
rows, err := db.QueryContext(
ctx,
`SELECT
feed_id,
uri,
nick,
last_scan_on,
last_twt_on
FROM feeds
left join (
select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts group by feed_id
) using (feed_id)
`+where+`
order by nick, uri
`,args...,
)
if err != nil {
span.RecordError(err)
return
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
URI string
Nick string
Dt TwtTime
LastTwtOn TwtTime
}
err = rows.Scan(&o.FeedID, &o.URI, &o.Nick, &o.Dt, &o.LastTwtOn)
if err != nil {
span.RecordError(err)
return
}
twts = append(twts, lextwt.NewTwt(
types.NewTwter(o.Nick, o.URI),
lextwt.NewDateTime(o.Dt.Time, o.LastTwtOn.Time.Format(time.RFC3339)),
nil,
))
}
reg := lextwt.NewTwtRegistry(nil, twts)
reg.WriteTo(w)
})
http.HandleFunc("/api/plain/queue", func(w http.ResponseWriter, r *http.Request) {
lis := slices.Collect(app.queue.Iter())
sort.Slice(lis, func(i, j int) bool {
return lis[i].LastScanOn.Time.Before(lis[j].LastScanOn.Time)
return lis[i].NextScanOn.Time.Before(lis[j].LastScanOn.Time)
})
for _, feed := range lis {
fmt.Fprintln(w, feed.State, feed.LastScanOn.Time.Format(time.RFC3339), feed.Nick, feed.URI)
fmt.Fprintln(w, feed.State, feed.NextScanOn.Time.Format(time.RFC3339), feed.Nick, feed.URI)
}
})
@ -73,18 +268,14 @@ func httpServer(c console, app *appState) {
Handler: http.DefaultServeMux,
}
go func() {
<-c.Done()
c.Log("stop http server")
srv.Shutdown(context.Background())
}()
c.AddCancel(srv.Shutdown)
err = srv.ListenAndServe()
if err != nil {
c.Log(err)
c.abort()
return
if !errors.Is(err, http.ErrServerClosed) {
span.RecordError(err)
return err
}
return nil
}
func notAny(s string, chars string) bool {
@ -95,3 +286,28 @@ func notAny(s string, chars string) bool {
}
return true
}
func mkqry(uri string, limit, offset int) string {
qry := make([]string, 0, 3)
if uri != "" {
qry = append(qry, "uri=" + uri)
}
limit = min(100, max(1, limit))
if limit != 100 {
qry = append(qry, fmt.Sprint("limit=", limit))
}
offset = max(0, offset)
if offset != 0 {
qry = append(qry, fmt.Sprint("offset=", offset))
}
if len(qry) == 0 {
return ""
}
return "?" + strings.Join(qry, "&")
}

View File

@ -2,8 +2,10 @@ PRAGMA journal_mode=WAL;
create table if not exists feeds (
feed_id blob primary key,
uri text,
parent_id blob,
nick text,
uri text,
state string,
last_scan_on timestamp,
refresh_rate int default 600,
last_modified_on timestamp,
@ -13,12 +15,12 @@ create table if not exists feeds (
create table if not exists twts (
feed_id blob,
hash text,
conv text,
dt text, -- timestamp with timezone
ulid blob,
text text,
conv text,
hash text,
mentions text, -- json
tags text, -- json
primary key (feed_id, hash)
primary key (feed_id, ulid)
);

333
internal/otel/otel.go Normal file
View File

@ -0,0 +1,333 @@
package otel
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"runtime"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
)
var (
tracer trace.Tracer
meter metric.Meter
logger *slog.Logger
)
func Init(ctx context.Context, name string) (shutdown func(context.Context) error, err error) {
tracer = otel.Tracer(name)
meter = otel.Meter(name)
logger = otelslog.NewLogger(name)
return setupOTelSDK(ctx, name)
}
func Meter() metric.Meter { return meter }
// func Error(err error, v ...any) {
// if err == nil {
// return
// }
// fmt.Println("ERR:", append([]any{err}, v...))
// logger.Error(err.Error(), v...)
// }
// func Info(msg string, v ...any) { fmt.Println(append([]any{msg}, v...)); logger.Info(msg, v...) }
type spanny struct {
trace.Span
}
func (s *spanny) RecordError(err error, options ...trace.EventOption) {
if err == nil {
return
}
ec := trace.NewEventConfig(options...)
attrs := make([]any, len(ec.Attributes()))
for i, v := range ec.Attributes() {
attrs[i] = v
}
fmt.Println(append([]any{"ERR:", err}, attrs...)...)
logger.Error(err.Error(), attrs...)
s.Span.RecordError(err, options...)
}
func (s *spanny) AddEvent(name string, options ...trace.EventOption) {
ec := trace.NewEventConfig(options...)
attrs := make([]any, 2*len(ec.Attributes()))
for i, v := range ec.Attributes() {
attrs[2*i] = v.Key
attrs[2*i+1] = v.Value.Emit()
}
fmt.Println(append([]any{name}, attrs...)...)
logger.Info(name, attrs...)
s.Span.AddEvent(name, options...)
}
func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
name, attrs := Attrs()
ctx, span := tracer.Start(ctx, name, opts...)
span.SetAttributes(attrs...)
return ctx, &spanny{span}
}
func Attrs() (string, []attribute.KeyValue) {
var attrs []attribute.KeyValue
var name string
if pc, file, line, ok := runtime.Caller(2); ok {
if fn := runtime.FuncForPC(pc); fn != nil {
name = fn.Name()
}
attrs = append(attrs,
attribute.String("pc", fmt.Sprintf("%v", pc)),
attribute.String("file", file),
attribute.Int("line", line),
)
}
return name, attrs
}
// setupOTelSDK bootstraps the OpenTelemetry pipeline.
// If it does not return an error, make sure to call shutdown for proper cleanup.
func setupOTelSDK(ctx context.Context, name string) (shutdown func(context.Context) error, err error) {
var shutdownFuncs []func(context.Context) error
// shutdown calls cleanup functions registered via shutdownFuncs.
// The errors from the calls are joined.
// Each registered cleanup will be invoked once.
shutdown = func(ctx context.Context) error {
fmt.Println("shutdown")
var err error
for _, fn := range shutdownFuncs {
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
return err
}
// playShutdown := otelplay.ConfigureOpentelemetry(ctx)
// shutdownFuncs = append(shutdownFuncs, func(ctx context.Context) error { playShutdown(); return nil })
// handleErr calls shutdown for cleanup and makes sure that all errors are returned.
handleErr := func(inErr error) {
err = errors.Join(inErr, shutdown(ctx))
}
// Set up propagator.
prop := newPropagator()
otel.SetTextMapPropagator(prop)
// Set up trace provider.
tracerShutdown, err := newTraceProvider(ctx, name)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, tracerShutdown)
// Set up meter provider.
meterShutdown, err := newMeterProvider(ctx, name)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, meterShutdown)
// Set up logger provider.
loggerShutdown, err := newLoggerProvider(ctx, name)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, loggerShutdown)
return
}
func newPropagator() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
}
func newTraceProvider(ctx context.Context, name string) (func(context.Context) error, error) {
r, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(name),
),
)
if err != nil {
return nil, err
}
if v := env("XT_TRACER", ""); v != "" {
fmt.Println("use tracer", v)
exp, err := otlptracegrpc.New(
ctx,
otlptracegrpc.WithEndpoint(v),
otlptracegrpc.WithInsecure(),
)
if err != nil {
return nil, err
}
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(r),
)
otel.SetTracerProvider(tracerProvider)
return func(ctx context.Context) error {
return tracerProvider.Shutdown(ctx)
}, nil
}
traceExporter, err := stdouttrace.New(
stdouttrace.WithWriter(os.Stderr),
stdouttrace.WithPrettyPrint(),
)
if err != nil {
return nil, err
}
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithResource(r),
sdktrace.WithBatcher(traceExporter,
// Default is 5s. Set to 1s for demonstrative purposes.
sdktrace.WithBatchTimeout(time.Second)),
)
otel.SetTracerProvider(tracerProvider)
return tracerProvider.Shutdown, nil
}
func newMeterProvider(ctx context.Context, name string) (func(context.Context) error, error) {
// metricExporter, err := stdoutmetric.New()
// if err != nil {
// return nil, err
// }
// meterProvider := sdkmetric.NewMeterProvider(
// sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter,
// // Default is 1m. Set to 3s for demonstrative purposes.
// sdkmetric.WithInterval(3*time.Second))),
// )
// otel.SetMeterProvider(meterProvider)
metricExporter, err := otelprom.New(
otelprom.WithRegisterer(prometheus.DefaultRegisterer),
// OTEL default buckets assume you're using milliseconds. Substitute defaults
// appropriate for units of seconds.
otelprom.WithAggregationSelector(func(ik sdkmetric.InstrumentKind) sdkmetric.Aggregation {
switch ik {
case sdkmetric.InstrumentKindHistogram:
return sdkmetric.AggregationExplicitBucketHistogram{
Boundaries: prometheus.DefBuckets,
NoMinMax: false,
}
default:
return sdkmetric.DefaultAggregationSelector(ik)
}
}),
)
p := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(metricExporter),
)
otel.SetMeterProvider(p)
http.Handle("/metrics", promhttp.Handler())
return func(ctx context.Context) error { return nil }, err
}
func newLoggerProvider(ctx context.Context, name string) (func(context.Context) error, error) {
r, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(name),
),
)
if err != nil {
return nil, err
}
if v := env("XT_LOGGER", ""); v != "" {
fmt.Println("use logger", v)
exp, err := otlploghttp.New(
ctx,
otlploghttp.WithInsecure(),
otlploghttp.WithEndpointURL(v),
)
if err != nil {
return nil, err
}
processor := log.NewBatchProcessor(exp)
provider := log.NewLoggerProvider(
log.WithProcessor(processor),
log.WithResource(r),
)
global.SetLoggerProvider(provider)
return processor.Shutdown, nil
}
// return func(ctx context.Context) error { return nil }, nil
logExporter, err := stdoutlog.New(
stdoutlog.WithPrettyPrint(),
stdoutlog.WithWriter(os.Stderr),
)
if err != nil {
return nil, err
}
loggerProvider := log.NewLoggerProvider(
log.WithProcessor(
log.NewBatchProcessor(logExporter),
),
log.WithResource(r),
)
global.SetLoggerProvider(loggerProvider)
return loggerProvider.Shutdown, nil
}
func env(key, def string) string {
if v, ok := os.LookupEnv(key); ok {
return v
}
return def
}

117
main.go
View File

@ -1,45 +1,108 @@
package main
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"strings"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.sour.is/xt/internal/otel"
)
const name = "go.sour.is/xt"
var (
tracer = otel.Tracer(name)
meter = otel.Meter(name)
)
var m_up metric.Int64Gauge
type contextKey struct{ name string }
func main() {
dotEnv() // load .env
ctx, console := newConsole(args{
dbtype: env("XT_DBTYPE", "sqlite3"),
dbfile: env("XT_DBFILE", "file:twt.db"),
baseFeed: env("XT_BASE_FEED", "feed"),
Nick: env("XT_NICK", "xuu"),
URI: env("XT_URI", "https://txt.sour.is/user/xuu/twtxt.txt"),
Listen: env("XT_LISTEN", ":8080"),
})
finish, err := otel.Init(ctx, name)
console.IfFatal(err)
console.AddCancel(finish)
m_up, err = otel.Meter().Int64Gauge("up")
console.IfFatal(err)
m_up.Record(ctx, 1)
defer m_up.Record(context.Background(), 0)
err = run(console)
if !errors.Is(err, context.Canceled) {
console.IfFatal(err)
}
}
type console struct {
io.Reader
io.Writer
err io.Writer
context.Context
abort func()
abort func()
cancelfns []func(context.Context) error
}
func (c console) Log(v ...any) { fmt.Fprintln(c.err, v...) }
func (c console) Args() args {
func newConsole(args args) (context.Context, *console) {
ctx := context.Background()
ctx, abort := context.WithCancel(ctx)
ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
go func() { <-ctx.Done(); stop() }() // restore interrupt function
console := &console{Reader: os.Stdin, Writer: os.Stdout, err: os.Stderr, Context: ctx, abort: abort}
console.Set("console", console)
console.Set("args", args)
return ctx, console
}
func (c *console) Args() args {
v, ok := c.Get("args").(args)
if !ok {
return args{}
}
return v
}
func (c *console) Shutdown() error {
fmt.Fprintln(c.err, "shutting down ", len(c.cancelfns), " cancel functions...")
defer fmt.Fprintln(c.err, "done")
c.abort()
var err error
for _, fn := range c.cancelfns {
err = errors.Join(err, fn(c.Context))
}
return err
}
func (c *console) AddCancel(fn func(context.Context) error) { c.cancelfns = append(c.cancelfns, fn) }
func (c *console) IfFatal(err error) {
if err == nil {
return
}
fmt.Fprintln(c.err, err)
c.abort()
os.Exit(1)
}
type contextKey struct{ name string }
func (c *console) Set(name string, value any) {
c.Context = context.WithValue(c.Context, contextKey{name}, value)
}
func (c console) Get(name string) any {
func (c *console) Get(name string) any {
return c.Context.Value(contextKey{name})
}
@ -59,25 +122,25 @@ func env(key, def string) string {
return def
}
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
console := console{os.Stdin, os.Stdout, os.Stderr, ctx, stop}
go func() { <-ctx.Done(); console.Log("shutdown"); stop() }()
args := args{
dbtype: env("XT_DBTYPE", "sqlite3"),
dbfile: env("XT_DBFILE", "file:twt.db"),
baseFeed: env("XT_BASE_FEED", "feed"),
Nick: env("XT_NICK", "xuu"),
URI: env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"),
Listen: env("XT_LISTEN", ":8040"),
func dotEnv() {
fd, err := os.Open(".env")
if err != nil {
return
}
console.Set("args", args)
scan := bufio.NewScanner(fd)
if err := run(console); err != nil && !errors.Is(err, context.Canceled) {
fmt.Println(err)
os.Exit(1)
for scan.Scan() {
line := scan.Text()
if strings.HasPrefix(line, "#") {
continue
}
key, val, ok := strings.Cut(line, "=")
if !ok {
continue
}
os.Setenv(strings.TrimSpace(key), strings.TrimSpace(val))
}
}

View File

@ -1,105 +1,159 @@
package main
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sort"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
const (
TenYear = 3153600000
OneMonth = OneDay * 30
OneDay = 86400
OneHour = 3600
TenMinutes = 600
TwoMinutes = 60
TwoMinutes = 120
)
func refreshLoop(c console, app *appState) {
defer c.abort()
func feedRefreshProcessor(c *console, app *appState) error {
ctx, span := otel.Span(c.Context)
defer span.End()
sleeping_time, _ := otel.Meter().Int64Counter("xt_feed_sleep")
queue_size, _ := otel.Meter().Int64Gauge("xt_feed_queue_size")
f := NewHTTPFetcher()
fetch, close := NewFuncPool(c.Context, 25, f.Fetch)
fetch, close := NewFuncPool(ctx, 40, f.Fetch)
defer close()
db, err := app.DB()
db, err := app.DB(c)
if err != nil {
c.Log("missing db")
c.abort()
return
span.RecordError(err)
return err
}
go processorLoop(ctx, db, fetch)
queue := app.queue
c.Log("start refresh loop")
for c.Err() == nil {
if queue.IsEmpty() {
c.Log("load feeds")
span.AddEvent("start refresh loop")
it, err := loadFeeds(c.Context, db)
for ctx.Err() == nil {
if queue.IsEmpty() {
span.AddEvent("load feeds")
it, err := loadFeeds(ctx, db)
span.RecordError(err)
for f := range it {
queue.Insert(&f)
}
if err != nil {
c.Log(err)
return
}
}
span.AddEvent("queue", trace.WithAttributes(attribute.Int("size", int(queue.count))))
queue_size.Record(ctx, int64(queue.count))
f := queue.ExtractMin()
if f == nil {
c.Log("sleeping for ", TenMinutes*time.Second)
sleeping_time.Add(ctx, int64(TwoMinutes))
span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TwoMinutes))))
select {
case <-time.After(TenMinutes * time.Second):
case <-time.After(TwoMinutes * time.Second):
case <-c.Done():
return
return nil
}
span.End()
continue
}
c.Log("queue size", queue.count, "next", f.URI, "next scan on", f.LastScanOn.Time.Format(time.RFC3339))
span.AddEvent("next", trace.WithAttributes(
attribute.Int("size", int(queue.count)),
attribute.String("uri", f.URI),
attribute.String("last scan on", f.LastScanOn.Time.Format(time.RFC3339)),
attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
until := time.Until(f.NextScanOn.Time)
if until > 2*time.Hour {
span.AddEvent("too soon", trace.WithAttributes(attribute.String("uri", f.URI)))
span.End()
if time.Until(f.LastScanOn.Time) > 2*time.Hour {
c.Log("too soon", f.URI)
continue
}
span.AddEvent(
"till next",
trace.WithAttributes(attribute.String("time", until.String())))
sleeping_time.Add(ctx, until.Milliseconds())
select {
case <-c.Done():
case <-ctx.Done():
return nil
case t := <-time.After(until):
span.AddEvent("fetch", trace.WithAttributes(
attribute.Int("size", int(queue.count)),
attribute.String("uri", f.URI),
attribute.String("timeout", t.Format(time.RFC3339)),
attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
}
fetch.Fetch(f)
}
span.RecordError(ctx.Err())
return ctx.Err()
}
func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
ctx, span := otel.Span(ctx)
defer span.End()
process_in_total, _ := otel.Meter().Int64Counter("xt_processed_in_total")
process_out_total, _ := otel.Meter().Int64Counter("xt_processed_out_total")
twts_histogram, _ := otel.Meter().Float64Histogram("xt_twt1k_bucket")
for ctx.Err() == nil {
select {
case <-ctx.Done():
return
case t := <-time.After(time.Until(f.LastScanOn.Time)):
c.Log("fetch", t.Format(time.RFC3339), f.Nick, f.URI)
fetch.Fetch(f)
case res := <-fetch.Out():
c.Log("got response:", res.Request.URI)
f := res.Request
span.AddEvent("got response", trace.WithAttributes(
attribute.String("uri", f.URI),
attribute.String("scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
process_in_total.Add(ctx, 1)
f.LastScanOn.Time = time.Now()
f.LastScanOn.Valid = true
err := res.err
if res.err != nil {
f.LastError.String, f.LastError.Valid = err.Error(), true
if errors.Is(err, ErrPermanentlyDead) {
f.State = "permanantly-dead"
f.RefreshRate = TenYear
}
if errors.Is(err, ErrTemporarilyDead) {
f.RefreshRate = OneDay
f.RefreshRate, f.State = tsTemp(f.LastTwtOn.Time)
}
if errors.Is(err, ErrUnmodified) {
f.RefreshRate = OneDay
f.RefreshRate, f.State = tsTemp(f.LastTwtOn.Time)
}
c.Log(err)
err = f.Save(c.Context, db)
if err != nil {
c.Log(err)
return
}
span.RecordError(err)
f.LastError.String, f.LastError.Valid = err.Error(), true
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
@ -107,50 +161,133 @@ func refreshLoop(c console, app *appState) {
f.ETag.String, f.ETag.Valid = res.ETag(), true
f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
span.AddEvent("read feed")
cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
rdr := io.TeeReader(res.Body, cpy)
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
if err != nil {
c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err))
span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
f.LastError.String, f.LastError.Valid = err.Error(), true
f.RefreshRate = OneDay
return
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
if prev, ok := twtfile.Info().GetN("prev", 0); f.FirstFetch && ok {
_, part, ok := strings.Cut(prev.Value(), " ")
if ok {
part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
queue.Insert(&Feed{
FetchURI: part,
URI: f.URI,
Nick: f.Nick,
LastScanOn: f.LastScanOn,
RefreshRate: f.RefreshRate,
})
}
}
err = storeFeed(db, twtfile)
if err != nil {
c.Log(err)
err = f.Save(c.Context, db)
c.Log(err)
return
}
rdr := io.TeeReader(res.Body, cpy)
rdr = lextwt.TwtFixer(rdr)
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
cpy.Close()
f.LastScanOn.Time = time.Now()
f.RefreshRate = TenMinutes
if err != nil {
span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
f.LastError.String, f.LastError.Valid = err.Error(), true
f.RefreshRate = OneDay
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
count := twtfile.Twts().Len()
span.AddEvent("parse complete", trace.WithAttributes(attribute.Int("count", count)))
twts_histogram.Record(ctx, float64(count)/1000)
err = storeFeed(ctx, db, twtfile)
if err != nil {
span.RecordError(err)
f.LastError.String, f.LastError.Valid = err.Error(), true
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
f.RefreshRate, f.State = checkTemp(twtfile.Twts())
f.LastError.String = ""
err = f.Save(c.Context, db)
if err != nil {
c.Log(err)
return
}
err = f.Save(ctx, db)
span.RecordError(err)
process_out_total.Add(ctx, 1)
}
}
span.RecordError(ctx.Err())
}
func checkTemp(twts types.Twts) (int, State) {
if len(twts) < 5 {
return 7 * OneDay, "cold"
}
sort.Sort(twts)
since_first := -time.Until(twts[0].Created())
since_fifth := -time.Until(twts[4].Created())
if since_first < 2*time.Hour || since_fifth < 8*time.Hour {
return TwoMinutes, "hot"
}
if since_first < 4*time.Hour || since_fifth < 16*time.Hour {
return TenMinutes, "hot"
}
if since_first < 8*time.Hour || since_fifth < 32*time.Hour {
return 2 * TenMinutes, "warm"
}
if since_first < 16*time.Hour || since_fifth < 64*time.Hour {
return 4 * TenMinutes, "warm"
}
if since_first < 24*time.Hour || since_fifth < 128*time.Hour {
return OneDay, "cold"
}
if since_first < 48*time.Hour || since_fifth < 256*time.Hour {
return 2 * OneDay, "cold"
}
if since_first < 96*time.Hour || since_fifth < 512*time.Hour {
return 7 * OneDay, "frozen"
}
return OneMonth, "frozen"
}
func tsTemp(ts time.Time) (int, State) {
since_first := -time.Until(ts)
if since_first < 2*time.Hour {
return TwoMinutes, "hot"
}
if since_first < 4*time.Hour {
return TenMinutes, "hot"
}
if since_first < 8*time.Hour {
return 2 * TenMinutes, "warm"
}
if since_first < 16*time.Hour {
return 4 * TenMinutes, "warm"
}
if since_first < 24*time.Hour {
return OneDay, "cold"
}
if since_first < 48*time.Hour {
return 2 * OneDay, "cold"
}
if since_first < 96*time.Hour {
return 7 * OneDay, "frozen"
}
return OneMonth, "frozen"
}

View File

@ -1,115 +0,0 @@
package main
import (
"context"
"database/sql"
"fmt"
"iter"
"os"
"strings"
"sync"
_ "embed"
_ "github.com/mattn/go-sqlite3"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
func run(c console) error {
a := c.Args()
app := &appState{
args: a,
feeds: sync.Map{},
queue: FibHeap(func(a, b *Feed) bool {
return a.LastScanOn.Time.Before(b.LastScanOn.Time)
}),
}
// Setup DB
err := func(ctx context.Context) error {
db, err := app.DB()
if err != nil {
return err
}
defer db.Close()
for _, stmt := range strings.Split(initSQL, ";") {
_, err = db.ExecContext(ctx, stmt)
if err != nil {
return err
}
}
return nil
}(c.Context)
if err != nil {
return err
}
// Seed File
err = func() error {
f, err := os.Open(a.baseFeed)
if err != nil {
return err
}
defer f.Close()
twtfile, err := lextwt.ParseFile(f, &types.Twter{
Nick: a.Nick,
URI: a.URI,
})
if err != nil {
return fmt.Errorf("%w: %w", ErrParseFailed, err)
}
db, err := app.DB()
if err != nil {
return err
}
defer db.Close()
return storeFeed(db, twtfile)
}()
if err != nil {
return err
}
go refreshLoop(c, app)
go httpServer(c, app)
<-c.Done()
return c.Err()
}
type appState struct {
args args
feeds sync.Map
queue *fibHeap[Feed]
}
func (app *appState) DB() (*sql.DB, error) {
return sql.Open(app.args.dbtype, app.args.dbfile)
}
func (app *appState) Feed(feedID string) *Feed {
return nil
}
func (app *appState) Feeds() iter.Seq2[string, *Feed] {
return func(yield func(string, *Feed) bool) {
app.feeds.Range(func(k, v any) bool {
key, _ := k.(string)
value, ok := v.(*Feed)
if !ok {
return false
}
if !yield(key, value) {
return false
}
return true
})
}
}

View File

@ -70,8 +70,8 @@ func (l *strList) Scan(value any) error {
func (l strList) Value() (driver.Value, error) {
arr := make([]string, len(l))
for i, v := range l {
arr[i] = "\""+v+"\""
arr[i] = "\"" + v + "\""
}
return "["+strings.Join(arr, ",") +"]", nil
}
return "[" + strings.Join(arr, ",") + "]", nil
}