Compare commits

..

54 Commits

Author | SHA1 | Message | Date
xuu | cec0bc81f1 | fix | 2025-09-22 16:43:30 -06:00
xuu | 0a6ba26a0f | fix | 2025-09-22 16:43:20 -06:00
xuu | e54869e4a1 | chore: assorted fixes | 2025-04-07 13:17:44 -06:00
xuu | be614cb67e | chore: fix | 2025-04-06 19:47:53 -06:00
xuu | 88b7197d17 | chore: add video style | 2025-04-06 19:45:08 -06:00
xuu | 6499ba09cf | chore: add deps | 2025-04-04 14:20:38 -06:00
xuu | d6f811bd8f | chore: add deps | 2025-04-04 14:10:45 -06:00
xuu | 922a1b1327 | chore: guard on nil | 2025-04-04 11:56:18 -06:00
xuu | f52827d62a | chore: update deps | 2025-04-04 11:49:17 -06:00
xuu | 13be3d0688 | chore: update deps | 2025-04-04 11:30:09 -06:00
xuu | 19ff8dbd8f | chore: make handling more sensitive | 2025-04-04 11:28:45 -06:00
xuu | fc200cf84f | chore: fix usage bug | 2025-04-02 16:09:11 -06:00
xuu | 73eaac5bc6 | chore: fix panic | 2025-04-02 15:29:33 -06:00
xuu | 7d336285bf | chore: add view thread | 2025-04-02 14:28:12 -06:00
xuu | 14e1c4176d | chore: more responsive | 2025-04-02 14:22:10 -06:00
xuu | 1ae8680f43 | chore: use browser locale | 2025-04-02 13:59:09 -06:00
xuu | 4490a89f73 | chore: remove font | 2025-04-02 13:41:52 -06:00
xuu | c4191ec6cd | chore: format html better | 2025-04-02 13:20:04 -06:00
xuu | 74fa69274d | chore: add date formatting | 2025-04-02 11:55:56 -06:00
xuu | 6b8ad143fe | chore: reverse twts for html | 2025-04-02 11:17:46 -06:00
xuu | a7009dcb56 | chore: add twt html formatting | 2025-04-02 10:47:30 -06:00
xuu | ef65b115b7 | chore: refactor | 2025-03-31 17:39:24 -06:00
xuu | cd2c9abd1b | chore: refactor http out | 2025-03-31 15:23:40 -06:00
xuu | db93108d0b | chore: docs | 2025-03-31 11:57:30 -06:00
xuu | 9db54a0ad9 | chore: fix refresh query | 2025-03-31 11:09:13 -06:00
xuu | e58cd8e3f1 | chore: fix twts | 2025-03-31 10:52:26 -06:00
xuu | 69755e14d2 | chore: fixes | 2025-03-31 10:49:36 -06:00
xuu | aedc9245e5 | chore: add twt_mentions table | 2025-03-30 21:32:49 -06:00
xuu | fc762f3bf3 | chore: fix twt hash | 2025-03-30 21:14:11 -06:00
xuu | fb957ed25d | chore: add mentions | 2025-03-30 13:50:39 -06:00
xuu | 6579c50c09 | chore: add twt help | 2025-03-30 12:12:10 -06:00
xuu | 2bb2eec993 | chore: filter out perma dead | 2025-03-29 22:18:43 -06:00
xuu | 07aba6d14a | chore: adjust timing | 2025-03-29 19:48:06 -06:00
xuu | dab5a115cf | chore: fixes and import | 2025-03-29 17:09:18 -06:00
xuu | 84c3099be6 | chore: show last scan for child feeds | 2025-03-28 22:16:43 -06:00
xuu | d85dd56ac9 | chore: fix twt filter | 2025-03-28 17:39:55 -06:00
xuu | b863a2786e | chore: helper function for preamble | 2025-03-28 07:21:34 -06:00
xuu | 00c97eb011 | chore: fix twt query | 2025-03-27 16:55:52 -06:00
xuu | 62229deec5 | chore: use XT_URI for seed | 2025-03-27 16:52:01 -06:00
xuu | a3e6fc0c0f | chore: refactor | 2025-03-27 16:35:05 -06:00
xuu | 303ca5a2db | Merge pull request 'add-otel' (#3) from add-otel into main (Reviewed-on: #3) | 2025-03-26 20:49:41 -06:00
xuu | 22d77d6aef | chore: fix | 2025-03-26 19:03:20 -06:00
xuu | a0614435ff | chore: deps | 2025-03-26 18:58:59 -06:00
xuu | fe28b7c2ad | chore: go fmt | 2025-03-26 18:57:09 -06:00
xuu | b34c9bc99f | chore: changes to feed | 2025-03-26 18:54:44 -06:00
xuu | 5c97bfb182 | chore: add twt endpoints | 2025-03-25 18:52:30 -06:00
xuu | 2fdc43b7de | chore: add last twt on support | 2025-03-25 17:05:21 -06:00
xuu | 2de06ec4d9 | chore: adjust timing | 2025-03-24 22:29:18 -06:00
xuu | dae57540e3 | chore: refine checkTemp | 2025-03-24 18:28:57 -06:00
xuu | 7ab409403f | chore: add 500 status | 2025-03-24 17:18:46 -06:00
xuu | 8ae61d4a27 | chore: fix queue sort, add feed temp | 2025-03-24 17:01:50 -06:00
xuu | 42a9b26b22 | chore: changes for otel and fixes to loop | 2025-03-24 16:51:18 -06:00
xuu | 42fe9176b7 | chore: many fixes to code | 2025-03-13 22:36:24 -06:00
xuu | ed5b43300b | feat: add otel | 2025-02-24 17:28:09 -07:00
19 changed files with 2417 additions and 556 deletions

4
.gitignore vendored

@@ -2,3 +2,7 @@
feed
__debug*
feeds/
/xt
.env
*.txt
*.xz

View File

@@ -1,84 +0,0 @@
.ce
Preamble
.sp
We, the people of the United States, in order
to form a more perfect Union, establish justice, insure
domestic tranquility, provide for the common defense, promote
the general welfare,
and secure the blessing of liberty to ourselves and our
posterity do ordain and establish this Constitution for the
United States of America.
.sp
.nr aR 0 1
.af aR I
.de AR
.ce
.ps 16
.ft B
Article \\n+(aR
.nr sE 0 1
.af sE 1
.ps 12
.ft P
..
.de SE
.sp
.ft B
\\s-2SECTION \\n+(sE:\\s+2
.ft P
.nr pP 0 1
.af pP 1
..
.de PP
.sp
.ft I
\\s-3Paragraph \\n+(pP:\\s+3
.ft P
..
.AR
.SE
Legislative powers; in whom vested:
.PP
All legislative powers herein granted shall be vested in a
Congress of the United States, which shall consist of a Senate
and a House of Representatives.
.SE
House of Representatives, how and by whom chosen, Qualifications
of a Representative. Representatives and direct taxes, how
apportioned. Enumeration. Vacancies to be filled. Power of
choosing officers and of impeachment.
.PP
The House of Representatives shall be composed of members chosen
every second year by the people of the several states, and the
electors in each State shall have the qualifications requisite
for electors of the most numerous branch of the State Legislature.
.PP
No person shall be a Representative who shall not have attained
to the age of twenty-five years, and been seven years a citizen
of the United States, and who shall not, when elected, be an
inhabitant of that State in which he shall be chosen.
.PP
Representatives and direct taxes shall be apportioned among the
several States which maybe included within this Union, according
to their respective numbers, which shall be determined by adding
to the whole number of free persons, including those bound for
service for a term of years, and excluding Indians not taxed,
three-fifths of all other persons. The actual enumeration shall
be made within three years after the first meeting of the
Congress of the United States, and within every subsequent term
of ten years, in such manner as they shall by law direct. The
number of Representatives shall not exceed one for every thirty
thousand, but each State shall have at least one Representative;
and until such enumeration shall be made, the State of New
Hampshire shall be entitled to choose three, Massachusetts eight,
Rhode Island and Providence Plantations one, Connecticut
five, New York six, New Jersey four, Pennsylvania eight,
Delaware one, Maryland six, Virginia ten, North Carolina five,
South Carolina five, and Georgia three.
.PP
When vacancies happen in the representation from any State, the
Executive Authority thereof shall issue writs of election to fill
such vacancies.
.PP
The House of Representatives shall choose their Speaker and other
officers; and shall have the sole power of impeachment.

192
app.go Normal file

@@ -0,0 +1,192 @@
package main
import (
"context"
"database/sql"
"fmt"
"io"
"net/http"
"os"
"runtime/debug"
"strconv"
"strings"
_ "embed"
_ "github.com/mattn/go-sqlite3"
"github.com/uptrace/opentelemetry-go-extra/otelsql"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/console"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt"
"go.yarn.social/types"
"golang.org/x/sync/errgroup"
)
func run(ctx context.Context, c *console.C[args]) error {
ctx, span := otel.Span(ctx)
defer span.End()
bi, _ := debug.ReadBuildInfo()
span.AddEvent(name, trace.WithAttributes(attribute.String("version", bi.Main.Version)))
a := c.Args()
app := &appState{
args: a,
C: c,
queue: FibHeap(func(a, b *Feed) bool {
return a.NextScanOn.Time.Before(b.NextScanOn.Time)
}),
}
// Setup DB
err := func(ctx context.Context) error {
ctx, span := otel.Span(ctx)
defer span.End()
db, err := app.DB(ctx)
if err != nil {
return err
}
defer db.Close()
for _, stmt := range strings.Split(initSQL, ";") {
_, err = db.ExecContext(ctx, stmt)
if err != nil {
return err
}
}
return nil
}(ctx)
if err != nil {
return err
}
// Seed File
err = func(ctx context.Context) error {
ctx, span := otel.Span(ctx)
defer span.End()
db, err := app.DB(ctx)
if err != nil {
return err
}
defer db.Close()
var inFile io.Reader
if a.baseFeed != "" {
f, err := os.Open(a.baseFeed)
if err != nil {
return err
}
defer f.Close()
err = storeRegistry(ctx, db, f)
if err != nil {
return err
}
}
if a.URI != "" {
res, err := http.Get(a.URI)
if err != nil {
return err
}
inFile = res.Body
defer res.Body.Close()
twtfile, err := lextwt.ParseFile(inFile, &types.Twter{
Nick: a.Nick,
URI: a.URI,
})
if err != nil {
return fmt.Errorf("%w: %w", ErrParseFailed, err)
}
return storeFeed(ctx, db, twtfile)
}
return nil
}(ctx)
if err != nil {
return err
}
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
return feedRefreshProcessor(ctx, app)
})
go httpServer(ctx, app)
err = wg.Wait()
if err != nil {
return err
}
return ctx.Err()
}
type appState struct {
args args
queue *fibHeap[Feed]
*console.C[args]
}
type db struct {
*sql.DB
Version string
Params map[string]string
MaxLength int
MaxVariableNumber int
}
func (app *appState) DB(ctx context.Context) (db, error) {
// return sql.Open(app.args.dbtype, app.args.dbfile)
var err error
db := db{Params: make(map[string]string)}
db.DB, err = otelsql.Open(app.args.dbtype, app.args.dbfile,
otelsql.WithAttributes(semconv.DBSystemSqlite),
otelsql.WithDBName("xt"))
if err != nil {
return db, err
}
rows, err := db.DB.QueryContext(ctx, `select sqlite_version()`)
if err != nil {
return db, err
}
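if rows.Next() {
// Surface scan failures instead of silently leaving Version empty.
if err = rows.Scan(&db.Version); err != nil {
rows.Close()
return db, err
}
}
// Release the result set before the next query reuses the variable.
rows.Close()
if err = rows.Err(); err != nil {
return db, err
}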
rows, err = db.DB.QueryContext(ctx, `pragma compile_options`)
if err != nil {
return db, err
}
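for rows.Next() {
var key, value string
// Each row is a single option such as "MAX_VARIABLE_NUMBER=250000".
if err = rows.Scan(&key); err != nil {
rows.Close()
return db, err
}
key, value, _ = strings.Cut(key, "=")
db.Params[key] = value
}
// Release the result set once all options are read.
rows.Close()
if err = rows.Err(); err != nil {
return db, err
}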
if m, ok := db.Params["MAX_VARIABLE_NUMBER"]; ok {
db.MaxVariableNumber, _ = strconv.Atoi(m)
}
if m, ok := db.Params["MAX_LENGTH"]; ok {
db.MaxLength, _ = strconv.Atoi(m)
}
return db, err
}
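
The option handling at the end of `DB` can be sketched in isolation. This is a minimal sketch, not the project's code: `parseCompileOptions` and `intParam` are hypothetical helper names, and the fallback values are assumptions. It shows how rows from `pragma compile_options` split into key/value pairs, and how a missing option could fall back to a default rather than zero (a zero `MaxVariableNumber` would later be used as a batch limit).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCompileOptions turns rows such as "MAX_VARIABLE_NUMBER=250000"
// into a key/value map. Flags without "=" (for example "ENABLE_FTS5")
// map to the empty string.
func parseCompileOptions(rows []string) map[string]string {
	params := make(map[string]string, len(rows))
	for _, row := range rows {
		key, value, _ := strings.Cut(row, "=")
		params[key] = value
	}
	return params
}

// intParam returns the integer value stored under key, or fallback when the
// key is missing or not a number. The fallback is an assumption of this sketch.
func intParam(params map[string]string, key string, fallback int) int {
	if v, ok := params[key]; ok {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
	}
	return fallback
}

func main() {
	params := parseCompileOptions([]string{"ENABLE_FTS5", "MAX_VARIABLE_NUMBER=250000"})
	fmt.Println(intParam(params, "MAX_VARIABLE_NUMBER", 999))
	fmt.Println(intParam(params, "MAX_LENGTH", 1000000))
}
```

Whether a fallback is appropriate depends on how the SQLite build in use reports its limits; the numbers above are placeholders.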

852
feed.go

@@ -3,8 +3,11 @@ package main
import (
"cmp"
"context"
"crypto/sha3"
"database/sql"
"database/sql/driver"
"fmt"
"io"
"iter"
"net/http"
"net/url"
@@ -14,28 +17,34 @@ import (
_ "embed"
"github.com/oklog/ulid/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/otel"
"go.sour.is/xt/internal/uuid"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
type Feed struct {
FeedID uuid
FetchURI string
FeedID uuid.UUID
ParentID uuid.UUID
HashURI string
URI string
Nick string
LastScanOn sql.NullTime
State State
LastScanOn TwtTime
RefreshRate int
NextScanOn TwtTime
LastTwtOn TwtTime
LastModified sql.NullTime
LastModified TwtTime
LastError sql.NullString
ETag sql.NullString
Version string
DiscloseFeedURL string
DiscloseNick string
FirstFetch bool
State State
}
type State string
@@ -46,40 +55,36 @@ const (
Cold State = "cold"
Warm State = "warm"
Hot State = "hot"
Once State = "once"
)
var (
//go:embed init.sql
initSQL string
insertFeed = `
insert into feeds
(feed_id, uri, nick, last_scan_on, refresh_rate)
values (?, ?, ?, ?, ?)
ON CONFLICT (feed_id) DO NOTHING
`
insertTwt = `
insert into twts
(feed_id, hash, conv, dt, text, mentions, tags)
values (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (feed_id, hash) DO NOTHING
`
fetchFeeds = `
select
feed_id,
uri,
nick,
last_scan_on,
refresh_rate,
last_modified_on,
last_etag
from feeds
where datetime(last_scan_on, '+'||refresh_rate||' seconds') < datetime(current_timestamp, '+10 minutes')
`
insertFeed = func(r int) (string, int) {
repeat := ""
if r > 1 {
repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
}
return `
insert into feeds (
feed_id,
parent_id,
nick,
uri,
state,
last_scan_on,
refresh_rate
)
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
ON CONFLICT (feed_id) DO NOTHING
`, r * 7
}
updateFeed = `
update feeds set
update feeds set
nick = ?,
state = ?,
last_scan_on = ?,
refresh_rate = ?,
last_modified_on = ?,
@@ -87,19 +92,85 @@ var (
last_error = ?
where feed_id = ?
`
insertTwt = func(r int) (string, int) {
repeat := ""
if r > 1 {
repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
}
return `
insert into twts (
feed_id,
ulid,
text,
hash,
conv,
mentions,
tags
)
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
ON CONFLICT (feed_id, ulid) DO UPDATE SET
conv = excluded.conv,
hash = excluded.hash
`, r * 7
}
fetchFeeds = `
select
feed_id,
parent_id,
coalesce(hashing_uri, uri) hash_uri,
uri,
nick,
state,
last_scan_on,
strftime(
'%Y-%m-%dT%H:%M:%fZ',
coalesce(last_scan_on, '1901-01-01'),
'+'||abs(refresh_rate + cast(random() % 30 as int))||' seconds'
) next_scan_on,
coalesce(last_twt_on, '1901-01-01T00:00:00Z') last_twt_on,
refresh_rate,
last_modified_on,
last_etag
from feeds
left join (
select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts group by feed_id
) using (feed_id)
left join (
select
feed_id parent_id,
uri hashing_uri
from feeds
where parent_id is null
) using (parent_id)
where datetime(
coalesce(last_scan_on, '1901-01-01'),
'+'||abs(refresh_rate+cast(random()%30 as int))||' seconds'
) < datetime(current_timestamp, '+3 minutes')
`
permaban = []string{
"//lublin.se/",
"//enotty.dk/",
}
)
func (f *Feed) Save(ctx context.Context, db *sql.DB) error {
fmt.Println(f.FetchURI, " ", f.LastModified, " ", f.LastError)
func (f *Feed) Save(ctx context.Context, db db) error {
ctx, span := otel.Span(ctx)
defer span.End()
_, err := db.ExecContext(
ctx,
updateFeed,
f.LastScanOn,
f.RefreshRate,
f.LastModified,
f.ETag,
f.LastError,
f.FeedID,
f.Nick,
f.State, // state
f.LastScanOn, // last_scan_on
f.RefreshRate, // refresh_rate
f.LastModified, // last_modified_on
f.ETag, // last_etag
f.LastError, // last_error
f.FeedID, // feed_id
)
return err
}
@@ -111,9 +182,14 @@ func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
f.Version = "0.0.1"
err = res.Scan(
&f.FeedID,
&f.ParentID,
&f.HashURI,
&f.URI,
&f.Nick,
&f.State,
&f.LastScanOn,
&f.NextScanOn,
&f.LastTwtOn,
&f.RefreshRate,
&f.LastModified,
&f.ETag,
@@ -122,19 +198,12 @@ func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
return err
}
if !f.LastScanOn.Valid {
f.FirstFetch = true
f.LastScanOn.Time = time.Now()
f.LastScanOn.Valid = true
} else {
f.LastScanOn.Time = f.LastScanOn.Time.Add(time.Duration(f.RefreshRate) * time.Second)
}
f.FetchURI = f.URI
return err
}
func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) {
func loadFeeds(ctx context.Context, db db) (iter.Seq[Feed], error) {
ctx, span := otel.Span(ctx)
var err error
var res *sql.Rows
@@ -145,10 +214,13 @@ func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) {
}
return func(yield func(Feed) bool) {
defer span.End()
for res.Next() {
var f Feed
err = f.Scan(res)
if err != nil {
span.RecordError(err)
return
}
if !yield(f) {
@@ -158,13 +230,22 @@ func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) {
}, err
}
func storeFeed(db *sql.DB, f types.TwtFile) error {
func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
ctx, span := otel.Span(ctx)
defer span.End()
loadTS := time.Now()
refreshRate := 600
feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
feedURI, _ := f.Info().GetN("uri", 0)
tx, err := db.Begin()
feedID := uuid.UrlNS.UUID5(cmp.Or(
feedURI.Value(),
f.Twter().HashingURI,
f.Twter().URI,
))
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
@@ -172,42 +253,36 @@ func storeFeed(db *sql.DB, f types.TwtFile) error {
followers := f.Info().GetAll("follow")
followMap := make(map[string]string, len(followers))
for _, f := range f.Info().GetAll("follow") {
nick, uri, ok := strings.Cut(f.Value(), "http")
nick, uri, ok := strings.Cut(f.Value(), " ")
if !ok {
continue
}
nick = strings.TrimSpace(nick)
uri = "http" + strings.TrimSpace(uri)
uri = strings.TrimSpace(uri)
if _, err := url.Parse(uri); err != nil {
continue
}
followMap[nick] = uri
followMap[uri] = nick
}
defer tx.Rollback()
_, err = tx.Exec(
insertFeed,
feedID,
f.Twter().HashingURI,
f.Twter().DomainNick(),
loadTS,
refreshRate,
)
if err != nil {
return err
}
twts := f.Twts()
_, size := insertTwt(len(twts))
args := make([]any, 0, size)
for _, twt := range f.Twts() {
mentions := make(uuids, 0, len(twt.Mentions()))
for _, twt := range twts {
twtID := makeULID(twt)
mentions := make(uuid.UUIDs, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() {
followMap[mention.Twter().Nick] = mention.Twter().URI
mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
followMap[mention.Twter().URI] = mention.Twter().Nick
mentions = append(mentions, uuid.UrlNS.UUID5(mention.Twter().URI))
}
tags := make(strList, 0, len(twt.Tags()))
tags := make(uuid.List, 0, len(twt.Tags()))
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
@@ -220,29 +295,83 @@ func storeFeed(db *sql.DB, f types.TwtFile) error {
}
}
_, err = tx.Exec(
insertTwt,
feedID,
twt.Hash(),
subjectTag,
twt.Created(),
fmt.Sprint(twt),
mentions.ToStrList(),
tags,
args = append(
args,
feedID, // feed_id
twtID, // ulid
fmt.Sprintf("%+l", twt), // text
twt.Hash(), // hash
subjectTag, // conv
mentions.ToStrList(), // mentions
tags, // tags
)
}
for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) {
fmt.Println("store", f.Twter().URI, len(args))
_, err = tx.ExecContext(
ctx,
query,
args...,
)
if err != nil {
return err
}
}
for nick, uri := range followMap {
_, err = tx.Exec(
insertFeed,
urlNS.UUID5(uri),
uri,
nick,
nil,
refreshRate,
args = args[:0]
args = append(args,
feedID, // feed_id
nil, // parent_id
f.Twter().DomainNick(), // nick
f.Twter().URI, // uri
"warm", // state
TwtTime{Time: loadTS, Valid: true}, // last_scan_on
refreshRate, // refresh_rate
)
if prev, ok := f.Info().GetN("prev", 0); ok {
_, part, ok := strings.Cut(prev.Value(), " ")
if ok {
uri := f.Twter().URI
if u, ok := f.Info().GetN("url", 0); ok {
uri = u.Value()
}
if u, ok := f.Info().GetN("uri", 0); ok {
uri = u.Value()
}
part = uri[:strings.LastIndex(uri, "/")+1] + part
childID := uuid.UrlNS.UUID5(part)
fmt.Println("found prev", uri, part)
args = append(args,
childID, // feed_id
feedID, // parent_id
f.Twter().DomainNick(), // nick
part, // uri
"once", // state
nil, // last_scan_on
0, // refresh_rate
)
}
}
for uri, nick := range followMap {
args = append(args,
uuid.UrlNS.UUID5(uri), // feed_id
nil, // parent_id
nick, // nick
uri, // uri
"warm", // state
nil, // last_scan_on
refreshRate, // refresh_rate
)
}
for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) {
_, err = tx.ExecContext(
ctx,
query,
args...,
)
if err != nil {
return err
@@ -251,14 +380,141 @@ func storeFeed(db *sql.DB, f types.TwtFile) error {
return tx.Commit()
}
func storeRegistry(ctx context.Context, db db, in io.Reader) error {
ctx, span := otel.Span(ctx)
defer span.End()
func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
feed.State = "fetch"
if strings.Contains(feed.FetchURI, "lublin.se") {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
twters := make(map[string]string)
args := make([]any, 0, 1024*16)
i := 0
for line := range lextwt.IterRegistry(in) {
twt, ok := line.(*lextwt.Twt)
if !ok {
continue
}
nick := twt.Twter().DomainNick()
uri := twt.Twter().URI
feedID := uuid.UrlNS.UUID5(uri)
twtID := makeULID(twt)
text := fmt.Sprintf("%+l", twt)
// if !strings.HasPrefix(uri, "http") {
// fmt.Println("skip bad uri ", nick, uri)
// continue
// }
// if strings.HasPrefix(nick, "http") {
// fmt.Println("skip bad nick", nick, uri)
// continue
// }
twters[uri] = nick
mentions := make(uuid.UUIDs, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() {
twters[mention.Twter().URI] = mention.Twter().Nick
mentions = append(mentions, uuid.UrlNS.UUID5(mention.Twter().URI))
}
tags := make(uuid.List, 0, len(twt.Tags()))
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
subject := twt.Subject()
subjectTag := ""
if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
subjectTag = tag.Text()
}
args = append(
args,
feedID, // feed_id
twtID, // ulid
text, // text
twt.Hash(), // hash
subjectTag, // conv
mentions.ToStrList(), // mentions
tags, // tags
)
if len(args) >= 16*1022 {
i += len(args)
fmt.Println("store", i/7, i)
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) {
// fmt.Println("store", len(args))
_, err = tx.ExecContext(
ctx,
query,
args...,
)
if err != nil {
return err
}
}
args = args[:0]
for uri, nick := range twters {
// if !strings.HasPrefix(uri, "http") {
// fmt.Println("skip", nick, uri)
// continue
// }
// if strings.HasPrefix(nick, "http") {
// fmt.Println("skip bad nick", nick, uri)
// continue
// }
feedID := uuid.UrlNS.UUID5(uri)
args = append(args,
feedID, // feed_id
nil, // parent_id
nick, // nick
uri, // uri
PermanentlyDead, // state
nil, // last_scan_on
TenYear, // refresh_rate
)
}
for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) {
_, err = tx.ExecContext(
ctx,
query,
args...,
)
if err != nil {
return err
}
}
args = args[:0]
err = tx.Commit()
if err != nil {
return err
}
}
}
req, err := http.NewRequestWithContext(ctx, "GET", feed.FetchURI, nil)
return refreshLastTwt(ctx, db)
}
func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
for _, host := range permaban {
if strings.Contains(feed.URI, host) {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
}
}
req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil)
if err != nil {
return nil, fmt.Errorf("creating HTTP request failed: %w", err)
}
@@ -273,6 +529,7 @@ func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
req.Header.Add("If-None-Match", feed.ETag.String)
}
// TODO: this is probably not needed.
if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" {
req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick))
@@ -282,3 +539,406 @@ func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
return req, nil
}
type TwtTime struct {
Time time.Time
Valid bool // Valid is true if Time is not NULL
}
// Scan implements the [Scanner] interface.
func (n *TwtTime) Scan(value any) error {
var err error
switch value := value.(type) {
case nil:
n.Time, n.Valid = time.Time{}, false
return nil
case string:
n.Time, err = time.Parse(time.RFC3339, value)
n.Valid = err == nil
case time.Time:
n.Valid = true
n.Time = value
}
return err
}
// Value implements the [driver.Valuer] interface.
func (n TwtTime) Value() (driver.Value, error) {
if !n.Valid {
return nil, nil
}
return n.Time.Format(time.RFC3339), nil
}
func makeULID(twt types.Twt) ulid.ULID {
text := fmt.Appendf(nil, "%s\t%+l", twt.Twter().URI, twt)
u := ulid.ULID{}
u.SetTime(ulid.Timestamp(twt.Created()))
u.SetEntropy(sha3.SumSHAKE128(text, 10))
return u
}
func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
_, size := qry(1)
itemsPerIter := maxArgs / size
if len(args) < size {
return func(yield func(string, []any) bool) {}
}
if len(args) < maxArgs {
return func(yield func(string, []any) bool) {
query, _ := qry(len(args) / size)
yield(query, args)
}
}
return func(yield func(string, []any) bool) {
for len(args) > 0 {
if len(args) > maxArgs {
query, size := qry(itemsPerIter)
if !yield(query, args[:size]) {
return
}
args = args[size:]
continue
}
query, _ := qry(len(args) / size)
yield(query, args)
return
}
}
}
func refreshLastTwt(ctx context.Context, db db) error {
qry := `
delete from last_twt_on;
insert into last_twt_on (feed_id, last_twt_on)
select distinct
feed_id,
max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts
group by feed_id;
delete from twt_mentions;
insert into twt_mentions (ulid, feed_id)
select distinct
ulid,
unhex(replace(trim(value,'{}'),'-','')) feed_id
from twts, json_each(mentions);
`
var err error
for _, stmt := range strings.Split(qry, ";") {
_, err = db.ExecContext(ctx, stmt)
if err != nil {
return err
}
}
return err
}
func fetchTwts(ctx context.Context, db db, uri string, limit int, offset int64) ([]types.Twt, int64, int64, error) {
ctx, span := otel.Span(ctx)
defer span.End()
args := make([]any, 0, 3)
where := `where feed_id in (select feed_id from feeds where state != 'permanantly-dead')`
if uri != "" {
feed_id := uuid.UrlNS.UUID5(uri)
where = "where feed_id = ?"
args = append(args, feed_id)
}
var end int64
err := db.QueryRowContext(ctx, `
select count(*) n from twts `+where+``, args...).Scan(&end)
span.RecordError(err)
if err != nil {
return nil, 0, 0, err
}
if offset < 1 {
offset += end
}
offset = max(1, offset)
args = append(args, limit, offset-int64(limit))
span.AddEvent("twts", trace.WithAttributes(
attribute.Int("limit", limit),
attribute.Int64("offset-end", offset),
attribute.Int64("offset-start", offset-int64(limit)),
attribute.Int64("max", end),
))
qry := `
SELECT
feed_id,
hash,
conv,
coalesce(nick, 'nobody') nick,
coalesce(uri, 'https://empty.txt') uri,
text
FROM twts
join (
select feed_id, nick, uri
from feeds
) using (feed_id)
where rowid in (
select rowid from twts
` + where + `
order by ulid asc
limit ?
offset ?
)
order by ulid asc`
fmt.Println(qry, args)
rows, err := db.QueryContext(
ctx, qry, args...,
)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
twter := types.NewTwter(o.Nick, o.URI)
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
err = rows.Err()
return twts, offset, end, err
}
func fetchUsers(ctx context.Context, db db, uri, q string) ([]types.Twt, error) {
ctx, span := otel.Span(ctx)
defer span.End()
where := `where parent_id is null and state not in ('permanantly-dead', 'frozen') and last_twt_on is not null`
args := make([]any, 0)
if uri != "" {
where = `where feed_id = ? or parent_id = ?`
feed_id := uuid.UrlNS.UUID5(uri)
args = append(args, feed_id, feed_id)
} else if q != "" {
where = `where nick like ?`
args = append(args, "%"+q+"%")
}
qry := `
SELECT
feed_id,
uri,
nick,
last_scan_on,
coalesce(last_twt_on, last_scan_on) last_twt_on
FROM feeds
left join last_twt_on using (feed_id)
` + where + `
order by nick, uri
`
fmt.Println(qry, args)
rows, err := db.QueryContext(ctx, qry, args...)
if err != nil {
span.RecordError(err)
return nil, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
URI string
Nick string
Dt TwtTime
LastTwtOn TwtTime
}
err = rows.Scan(&o.FeedID, &o.URI, &o.Nick, &o.Dt, &o.LastTwtOn)
if err != nil {
span.RecordError(err)
return nil, err
}
twts = append(twts, lextwt.NewTwt(
types.NewTwter(o.Nick, o.URI),
lextwt.NewDateTime(o.Dt.Time, o.LastTwtOn.Time.Format(time.RFC3339)),
nil,
))
}
return twts, rows.Err()
}
func fetchMentions(ctx context.Context, db db, mention uuid.UUID, limit int, offset int64) ([]types.Twt, int64, int64, error) {
ctx, span := otel.Span(ctx)
defer span.End()
args := make([]any, 0, 3)
args = append(args, mention)
var end int64
err := db.QueryRowContext(ctx, `
select count(*) n from twt_mentions where feed_id = ?`, args...).Scan(&end)
span.RecordError(err)
if err != nil {
return nil, 0, 0, err
}
fmt.Println(mention.MarshalText(), end, err)
if offset < 1 {
offset += end
}
limit = min(100, max(1, limit))
offset = max(1, offset)
args = append(args, limit, offset-int64(limit))
qry := `
SELECT
feed_id,
hash,
conv,
coalesce(nick, 'nobody') nick,
coalesce(uri, 'https://empty.txt') uri,
text
FROM twts
join (
select feed_id, nick, uri
from feeds
) using (feed_id)
where rowid in (
select rowid from twts
where ulid in (select ulid from twt_mentions where feed_id = ?)
order by ulid asc
limit ?
offset ?
)
order by ulid asc
`
fmt.Println(qry, args)
rows, err := db.QueryContext(
ctx, qry, args...,
)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
twter := types.NewTwter(o.Nick, o.URI)
// o.Text = strings.ReplaceAll(o.Text, "\n", "\u2028")
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
if err := rows.Err(); err != nil {
fmt.Println(err)
return nil, 0, 0, err
}
return twts, offset, end, err
}
func fetchConv(ctx context.Context, db db, hash string, _ int, offset int64) ([]types.Twt, int64, int64, error) {
ctx, span := otel.Span(ctx)
defer span.End()
var end int64
err := db.QueryRowContext(ctx, `
select count(*) n from twts where hash = $1 or conv = $1`, hash).Scan(&end)
span.RecordError(err)
if err != nil {
return nil, 0, 0, err
}
rows, err := db.QueryContext(
ctx, `
SELECT
feed_id,
hash,
conv,
nick,
uri,
text
FROM twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
) using (feed_id)
WHERE
hash = $1 or
conv = $1
order by ulid asc`, hash,
)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
twter := types.NewTwter(o.Nick, o.URI)
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
err = rows.Err()
return twts, offset, end, err
}
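
The batching idea behind `chunk`, `insertFeed` and `insertTwt` can be illustrated with a simplified, standard-library-only sketch. It is not the project's implementation: the table `t`, its columns, and the helper names `insertSQL` and `batches` are hypothetical, and it uses three placeholders per row where the diff uses seven. It splits a flat argument list into whole rows per statement so that no statement binds more than a given number of parameters, and it guards against a limit of zero, a case the diff does not discuss.

```go
package main

import (
	"fmt"
	"strings"
)

const cols = 3 // placeholders per row in this sketch

// insertSQL builds a multi-row insert with n placeholder groups.
// The table and column names are hypothetical.
func insertSQL(n int) string {
	groups := make([]string, n)
	for i := range groups {
		groups[i] = "(?, ?, ?)"
	}
	return "insert into t (a, b, c) values " + strings.Join(groups, ", ")
}

// batches splits a flat argument list into slices that hold whole rows and,
// when maxArgs >= cols, stay at or below maxArgs bound parameters each.
func batches(args []any, maxArgs int) [][]any {
	perBatch := maxArgs / cols * cols // largest multiple of cols <= maxArgs
	if perBatch == 0 {
		perBatch = cols // assumption: fall back to one row per statement
	}
	var out [][]any
	for len(args) >= cols {
		n := len(args) / cols * cols // ignore any incomplete trailing row
		if n > perBatch {
			n = perBatch
		}
		out = append(out, args[:n])
		args = args[n:]
	}
	return out
}

func main() {
	args := make([]any, 0, 7*cols)
	for i := 0; i < 7*cols; i++ {
		args = append(args, i)
	}
	for _, b := range batches(args, 10) {
		fmt.Println(len(b), insertSQL(len(b)/cols))
	}
}
```

Returning slices instead of an `iter.Seq2` keeps the sketch short; an iterator, as in the diff, avoids building the list of batches up front.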

View File

@@ -8,6 +8,10 @@ import (
"net/http"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.sour.is/xt/internal/otel"
)
var (
@@ -48,10 +52,18 @@ func (r *Response) LastModified() time.Time {
type httpFetcher struct {
client *http.Client
m_fetch_status metric.Int64Counter
m_fetch_second metric.Float64Histogram
}
func NewHTTPFetcher() *httpFetcher {
fetch_total, _ := otel.Meter().Int64Counter("xt_fetch_status_total")
fetch_second, _ := otel.Meter().Float64Histogram("xt_fetch_seconds")
return &httpFetcher{
m_fetch_status: fetch_total,
m_fetch_second: fetch_second,
client: &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
@@ -62,7 +74,8 @@ func NewHTTPFetcher() *httpFetcher {
ForceAttemptHTTP2: false,
MaxIdleConns: 100,
IdleConnTimeout: 10 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 5 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
},
@@ -70,6 +83,16 @@ func NewHTTPFetcher() *httpFetcher {
}
func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
ctx, span := otel.Span(ctx)
defer span.End()
start := time.Now()
defer func() {
since := time.Since(start)
f.m_fetch_second.Record(ctx, since.Seconds())
}()
defer fmt.Println("fetch done", request.URI)
response := &Response{
Request: request,
}
@@ -79,8 +102,9 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
response.err = err
return response
}
span.AddEvent("start request")
res, err := f.client.Do(req)
span.AddEvent("got response")
if err != nil {
var dnsErr *net.DNSError
if errors.As(err, &dnsErr) {
response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, err)
@@ -93,17 +117,24 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
response.Response = res
switch res.StatusCode {
case 200:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "ok")))
case 304:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "not_modified")))
response.err = fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
case 400, 406, 502, 503:
case 400, 406, 429, 500, 502, 503:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "temp_fail")))
response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
case 403, 404, 410:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "perm_fail")))
response.err = fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
default:
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.Int("status", res.StatusCode)))
response.err = errors.New(res.Status)
}
@@ -121,6 +152,9 @@ func NewFuncPool[IN, OUT any](
size int,
fetch func(ctx context.Context, request IN) OUT,
) (*pool[IN, OUT], func()) {
ctx, span := otel.Span(ctx)
defer span.End()
var wg sync.WaitGroup
in := make(chan IN, size)
@@ -129,21 +163,39 @@ func NewFuncPool[IN, OUT any](
wg.Add(size)
for range size {
go func() {
ctx, span := otel.Span(ctx)
defer span.End()
defer wg.Done()
for request := range in {
ctx, cancel := context.WithTimeoutCause(ctx, 15*time.Second, fmt.Errorf("GOT STUCK"))
defer cancel()
ctx, span := otel.Span(ctx)
defer span.End()
span.AddEvent("start fetch")
r := fetch(ctx, request)
span.AddEvent("got fetch")
select {
case <-ctx.Done():
return
case out <- fetch(ctx, request):
case out <- r:
span.AddEvent("sent queue")
}
}
}()
}
return &pool[IN, OUT]{
in: in,
out: out,
}, func() { close(in); wg.Wait(); close(out) }
in: in,
out: out,
}, func() {
close(in)
wg.Wait()
close(out)
}
}
func (f *pool[IN, OUT]) Fetch(request IN) {

51
go.mod

@@ -1,28 +1,63 @@
module go.sour.is/xt
go 1.23.2
go 1.24.1
require (
github.com/mattn/go-sqlite3 v1.14.24
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0
go.yarn.social/lextwt v0.1.5-0.20250406192339-cf769bfa2521
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/protobuf v1.36.5 // indirect
)
require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
go.opentelemetry.io/otel/log v0.10.0
go.opentelemetry.io/otel/metric v1.35.0
go.opentelemetry.io/otel/trace v1.35.0
)
require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/matryer/is v1.4.1
github.com/oklog/ulid/v2 v2.1.0
github.com/prometheus/client_golang v1.20.5
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2
github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect
go.opentelemetry.io/otel v1.34.0
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sys v0.25.0 // indirect
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0
go.opentelemetry.io/otel v1.35.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0
go.opentelemetry.io/otel/exporters/prometheus v0.57.0
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0
go.opentelemetry.io/otel/sdk v1.35.0
go.opentelemetry.io/otel/sdk/log v0.10.0
go.opentelemetry.io/otel/sdk/metric v1.35.0
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sync v0.13.0
golang.org/x/sys v0.32.0 // indirect
google.golang.org/grpc v1.70.0 // indirect
)

111
go.sum

@@ -1,46 +1,121 @@
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ=
github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2 h1:ZjUj9BLYf9PEqBn8W/OapxhPjVRdC6CsXTdULHsyk5c=
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2/go.mod h1:O8bHQfyinKwTXKkiKNGmLQS7vRsqRxIQTFZpYpHK3IQ=
github.com/writeas/go-strip-markdown/v2 v2.1.1 h1:hAxUM21Uhznf/FnbVGiJciqzska6iLei22Ijc3q2e28=
github.com/writeas/go-strip-markdown/v2 v2.1.1/go.mod h1:UvvgPJgn1vvN8nWuE5e7v/+qmDu3BSVnKAB6Gl7hFzA=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51 h1:XEjx0jSNv1h22gwGfQBfMypWv/YZXWGTRbqh3B8xfIs=
go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51/go.mod h1:CWAZuBHZfGaqa0FreSeLG+pzK3rHP2TNAG7Zh6QlRiM=
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 h1:zfnniiSO/WO65mSpdQzGYJ9pM0rYg/BKgrSm8h2mTyA=
go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0 h1:N+78eXSlu09kii5nkiM+01YbtWe01oZLPPLhNlEKhus=
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0/go.mod h1:/2KhfLAhtQpgnhIk1f+dftA3fuuMcZjiz//Dc9yfaEs=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 h1:QSKmLBzbFULSyHzOdO9JsN9lpE4zkrz1byYGmJecdVE=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0/go.mod h1:sTQ/NH8Yrirf0sJ5rWqVu+oT82i4zL9FaF6rWcqnptM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0uaNS4c98WRNUEx5U3aDlrDOI5Rs+1Vifcw4DJ8U=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE=
go.opentelemetry.io/otel/exporters/prometheus v0.57.0 h1:AHh/lAP1BHrY5gBwk8ncc25FXWm/gmmY3BX258z5nuk=
go.opentelemetry.io/otel/exporters/prometheus v0.57.0/go.mod h1:QpFWz1QxqevfjwzYdbMb4Y1NnlJvqSGwyuU0B4iuc9c=
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0 h1:GKCEAZLEpEf78cUvudQdTg0aET2ObOZRB2HtXA0qPAI=
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0/go.mod h1:9/zqSWLCmHT/9Jo6fYeUDRRogOLL60ABLsHWS99lF8s=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 h1:jBpDk4HAUsrnVO1FsfCfCOTEc/MkInJmvfCHYLFiT80=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0/go.mod h1:H9LUIM1daaeZaz91vZcfeM0fejXPmgCYE8ZhzqfJuiU=
go.opentelemetry.io/otel/log v0.10.0 h1:1CXmspaRITvFcjA4kyVszuG4HjA61fPDxMb7q3BuyF0=
go.opentelemetry.io/otel/log v0.10.0/go.mod h1:PbVdm9bXKku/gL0oFfUF4wwsQsOPlpo4VEqjvxih+FM=
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
go.opentelemetry.io/otel/sdk/log v0.10.0 h1:lR4teQGWfeDVGoute6l0Ou+RpFqQ9vaPdrNJlST0bvw=
go.opentelemetry.io/otel/sdk/log v0.10.0/go.mod h1:A+V1UTWREhWAittaQEG4bYm4gAZa6xnvVu+xKrIRkzo=
go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yarn.social/lextwt v0.1.5-0.20250406192339-cf769bfa2521 h1:mqjKr+llWXgYtAWNsRx1+S45bhr6AgI0eLFhMhSYds4=
go.yarn.social/lextwt v0.1.5-0.20250406192339-cf769bfa2521/go.mod h1:P36NPegLbhbFa1A0JOLsDyIQcdM0zdmx8kPKACXry4A=
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede h1:XV9tuDQ605xxH4qIQPRHM1bOa7k0rJZ2RqA5kz2Nun4=
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50=
google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

231
http-api.go Normal file

@@ -0,0 +1,231 @@
package main
import (
"fmt"
"net/http"
"slices"
"sort"
"strconv"
"strings"
"time"
"go.sour.is/xt/internal/otel"
"go.sour.is/xt/internal/uuid"
"go.yarn.social/lextwt"
)
type API struct {
app *appState
db db
hostname string
}
func (a *API) plain(w http.ResponseWriter, r *http.Request) {
reg := lextwt.NewTwtRegistry(mkPreambleDocs(a.hostname), nil)
reg.WriteTo(w)
}
func (a *API) conv(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
hash := r.PathValue("hash")
// if (len(hash) < 6 || len(hash) > 8) && !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") {
// w.WriteHeader(http.StatusBadRequest)
// return
// }
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
var offset int64 = 0
if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v
}
twts, offset, end, err := fetchConv(ctx, a.db, hash, limit, offset)
span.RecordError(err)
if err != nil {
http.Error(w, "ERR", 500)
return
}
preamble := mkPreamble(a.hostname, "", "/api/plain/conv/"+hash, limit, int64(len(twts)), offset, end)
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
}
func (a *API) mentions(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
uri := r.URL.Query().Get("uri")
if uri == "" {
reg := lextwt.NewTwtRegistry(mkPreambleDocs(a.hostname), nil)
reg.WriteTo(w)
return
}
mention := uuid.UrlNS.UUID5(uri)
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
var offset int64 = 0
if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v
}
twts, offset, end, err := fetchMentions(ctx, a.db, mention, limit, offset)
span.RecordError(err)
if err != nil {
http.Error(w, "ERR", 500)
return
}
preamble := mkPreamble(a.hostname, uri, "/api/plain/mentions", limit, int64(len(twts)), offset, end)
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
}
func (a *API) twt(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
uri := r.URL.Query().Get("uri")
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
limit = min(100, max(1, limit))
var offset int64 = 0
if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v
}
twts, offset, end, err := fetchTwts(ctx, a.db, uri, limit, offset)
span.RecordError(err)
if err != nil {
http.Error(w, "ERR", 500)
return
}
preamble := mkPreamble(a.hostname, uri, "/api/plain/twt", limit, int64(len(twts)), offset, end)
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
}
func (a *API) users(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
uri := r.URL.Query().Get("uri")
q := r.URL.Query().Get("q")
twts, err := fetchUsers(ctx, a.db, uri, q)
if err != nil {
http.Error(w, "ERR", 500)
return
}
reg := lextwt.NewTwtRegistry(mkPreambleDocs(a.hostname), twts)
reg.WriteTo(w)
}
func (a *API) queue(w http.ResponseWriter, r *http.Request) {
lis := slices.Collect(a.app.queue.Iter())
sort.Slice(lis, func(i, j int) bool {
return lis[i].NextScanOn.Time.Before(lis[j].NextScanOn.Time)
})
for _, feed := range lis {
fmt.Fprintln(w, feed.State, feed.NextScanOn.Time.Format(time.RFC3339), feed.Nick, feed.URI)
}
}
func notAny(s string, chars string) bool {
for _, c := range s {
if !strings.ContainsRune(chars, c) {
return false
}
}
return true
}
func mkqry(uri string, limit int, offset int64) string {
qry := make([]string, 0, 3)
if uri != "" {
qry = append(qry, "uri="+uri)
}
if limit != 100 {
qry = append(qry, fmt.Sprint("limit=", limit))
}
if offset != 0 {
qry = append(qry, fmt.Sprint("offset=", offset))
}
if len(qry) == 0 {
return ""
}
return "?" + strings.Join(qry, "&")
}
func add(preamble lextwt.Comments, text string, v ...any) lextwt.Comments {
if len(v) > 0 {
text = fmt.Sprintf(text, v...)
}
return append(preamble, lextwt.NewComment("# "+text))
}
func addKey(preamble lextwt.Comments, key, value string, v ...any) lextwt.Comments {
if len(v) > 0 {
value = fmt.Sprintf(value, v...)
}
comment := fmt.Sprintf("# %s = %s", key, value)
return append(preamble, lextwt.NewCommentValue(comment, key, value))
}
func mkPreamble(hostname, uri, path string, limit int, length, offset, end int64) lextwt.Comments {
preamble := addKey(mkPreambleDocs(hostname), "twt range", "1 %d", end)
preamble = addKey(preamble, "self", "%s%s%s", hostname, path, mkqry(uri, limit, offset))
if next := offset + length; next < end {
preamble = addKey(preamble, "next", "%s%s%s", hostname, path, mkqry(uri, limit, next))
}
if prev := offset - int64(limit); prev > 0 {
preamble = addKey(preamble, "prev", "%s%s%s", hostname, path, mkqry(uri, limit, prev))
}
return preamble
}
var mkPreambleDocs = func(hostname string) lextwt.Comments {
c := add(nil, iAmTheWatcher)
c = add(c, "")
c = add(c, "Usage:")
c = add(c, " %s/api/plain/users View list of users and latest twt date.", hostname)
c = add(c, " %s/api/plain/twt View all twts.", hostname)
c = add(c, " %s/api/plain/mentions?uri=:uri View all mentions for uri.", hostname)
c = add(c, " %s/api/plain/conv/:hash View all twts for a conversation subject.", hostname)
c = add(c, "")
c = add(c, "Options:")
c = add(c, " uri Filter to show a specific users twts.")
c = add(c, " offset Start index for query.")
c = add(c, " limit Count of items to return (going back in time).")
return add(c, "")
}
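Review note: mkqry concatenates the uri value into the query string as-is, so a feed URI that itself contains `?` or `&` would produce an ambiguous self/next/prev link. A minimal sketch of the same logic using net/url for escaping follows. buildQuery is a hypothetical name, and the parameter order in the output differs from mkqry because url.Values.Encode sorts by key.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// buildQuery is a hypothetical variant of mkqry that query-escapes its values.
// Like mkqry, it omits limit when it is the default (100) and offset when it is 0.
func buildQuery(uri string, limit int, offset int64) string {
	q := url.Values{}
	if uri != "" {
		q.Set("uri", uri)
	}
	if limit != 100 {
		q.Set("limit", strconv.Itoa(limit))
	}
	if offset != 0 {
		q.Set("offset", strconv.FormatInt(offset, 10))
	}
	if len(q) == 0 {
		return ""
	}
	// Encode sorts keys alphabetically and percent-encodes each value.
	return "?" + q.Encode()
}

func main() {
	fmt.Println(buildQuery("https://example.com/twtxt.txt", 50, 100))
	// prints "?limit=50&offset=100&uri=https%3A%2F%2Fexample.com%2Ftwtxt.txt"
}
```

The handlers already read these parameters through r.URL.Query(), which decodes them, so escaped links would round-trip without further changes.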

266
http-html.go Normal file

@@ -0,0 +1,266 @@
package main
import (
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
type HTML struct {
app *appState
db db
hostname string
}
func (a *HTML) healthcheck(w http.ResponseWriter, r *http.Request) {
_, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte("ok"))
}
func (a *HTML) home(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/html; charset=utf-8")
uri := r.URL.Query().Get("uri")
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
limit = min(100, max(1, limit))
var offset int64 = 0
if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v
}
twts, offset, end, err := fetchTwts(ctx, a.db, uri, limit, offset)
span.RecordError(err)
if err != nil {
http.Error(w, "ERR", 500)
return
}
preamble := mkPreamble(a.hostname, uri, "", limit, int64(len(twts)), offset, end)
reg := &HTWriter{
lextwt.NewTwtRegistry(preamble, reverse(twts)),
}
reg.WriteTo(w)
}
func (a *HTML) conv(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/html; charset=utf-8")
hash := r.PathValue("hash")
// if (len(hash) < 6 || len(hash) > 8) && !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") {
// w.WriteHeader(http.StatusBadRequest)
// return
// }
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
var offset int64 = 0
if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v
}
twts, offset, end, err := fetchConv(ctx, a.db, hash, limit, offset)
span.RecordError(err)
if err != nil {
http.Error(w, "ERR", 500)
return
}
preamble := mkPreamble(a.hostname, "", "/conv/"+hash, limit, int64(len(twts)), offset, end)
reg := &HTWriter{
lextwt.NewTwtRegistry(preamble, twts),
}
reg.WriteTo(w)
}
type reg interface {
Preamble() lextwt.Comments
Twts() types.Twts
}
type HTWriter struct {
reg
}
func (r *HTWriter) WriteTo(w io.Writer) (int64, error) {
var output int64
i, err := fmt.Fprintln(w, `<!DOCTYPE html>
<html>
<head>
<title>The Watcher</title>
<style>
@media screen and (max-width: 500px) {
body { width: 100%; margin: 0; }
}
@media screen and (min-width: 500px) and (max-width: 940px) {
body { width: 90%; margin: auto; }
.h-card { columns: 2; }
}
@media screen and (min-width: 940px) {
body { width: 70%; margin: auto; }
.h-card { columns: 2; }
}
body { font-family: sans-serif; background: black; color: white; }
a { color: cornflowerblue; text-decoration: none; }
main { }
pre { white-space: pre-wrap; }
pre.preamble { color: green; }
article { background-color: #333; border: 1px solid green; border-radius: 4px; padding: 4px; margin: 2px; }
article pre { color: orange; }
.h-card .author { display: flex; }
.h-card .icon { width: 36px; margin: 4px; }
.h-card .u-photo { width: 32px; }
.p-org a { color: darkgrey; }
.h-card .date { text-align: right;}
video { width: 100%; }
section { padding: 1em; border: 1px solid darkgreen; background-color: #111; }
section img { max-width: 100%; }
</style>
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body onload="setTimestamps()">
<pre class='preamble'>
`)
output += int64(i)
if err != nil {
return output, err
}
for _, c := range r.Preamble() {
if key := c.Key(); key != "" {
value := mkValue(c.Value())
i, err = fmt.Fprintf(w, "# %s = %s\n", key, value)
} else {
i, err = fmt.Fprintln(w, c.Text())
}
output += int64(i)
if err != nil {
return output, err
}
}
i, err = fmt.Fprintln(w, "</pre><main>")
output += int64(i)
if err != nil {
return output, err
}
for _, twt := range r.Twts() {
twter := twt.Twter()
uri, err := url.Parse(twter.URI)
if err != nil {
uri = &url.URL{
Scheme: "HTTPS",
Host: "unknown.txt",
}
}
subject := ""
if s := twt.Subject(); s != nil {
if tag, ok := s.Tag().(*lextwt.Tag); ok && tag != nil {
subject = tag.Text()
}
}
i, err = fmt.Fprintf(w, `
<article>
<header class="u-author h-card">
<div class="author">
<div class="icon">
<a href="%s" class="u-url">
<img class="avatar u-photo" src="%s" alt="" loading="lazy">
</a>
</div>
<div class="author-name">
<div class="p-name">
<a href="%s">%s</a>
</div>
<div class="p-org">
<a target="_blank" href="%s">%s</a>
</div>
</div>
</div>
<div class="date">
<div><a class="u-url" href="%s">%s</a></div>
<div><small><a href="%s"> View Thread</a></small></div>
</div>
</header>
<section>
%-h
</section>
</article>
`, "/?uri="+twter.URI, twter.Avatar,
"/?uri="+twter.URI, twter.Nick,
twter.URI, uri.Host,
"/conv/"+subject, fmt.Sprintf("<time datetime='%s'>%s</time>", twt.Created().Format(time.RFC3339), twt.Created().Format(time.RFC822)),
"/conv/"+subject,
twt,
)
output += int64(i)
if err != nil {
return output, err
}
}
i, err = fmt.Fprintln(w, `</main>
<script>
function setTimestamps() {
document.querySelectorAll("time").forEach((e) => {
const OneDay = new Date(new Date().getTime() - (1 * 24 * 60 * 60 * 1000))
const d = new Date(e.hasAttributes() && e.attributes.getNamedItem('datetime').value);
if (d > OneDay)
e.innerHTML = d.toLocaleTimeString(navigator.language, { hour: '2-digit', minute: '2-digit' });
else
e.innerHTML = d.toLocaleDateString(navigator.language, { hour: '2-digit', minute: '2-digit' });
});
}
</script>
</body>
</html>`)
output += int64(i)
return output, err
}
func mkValue(v string) string {
if strings.HasPrefix(v, "http") {
return fmt.Sprintf(`<a href="%s">%s</a>`, v, v)
}
return v
}
func reverse[T any](s []T) []T {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
return s
}
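Review note: mkValue and the article template write feed-supplied strings (preamble values, nick, avatar, URI) into the page without escaping. A minimal sketch of an escaped variant of mkValue, using html.EscapeString from the standard library, is below. linkValue is a hypothetical name, and the stricter scheme check is an assumption rather than the behaviour in this diff.

```go
package main

import (
	"fmt"
	"html"
	"strings"
)

// linkValue is a hypothetical escaped variant of mkValue.
// It escapes the value for HTML and only links http:// and https:// URLs.
func linkValue(v string) string {
	safe := html.EscapeString(v)
	if strings.HasPrefix(v, "http://") || strings.HasPrefix(v, "https://") {
		return fmt.Sprintf(`<a href="%s">%s</a>`, safe, safe)
	}
	return safe
}

func main() {
	fmt.Println(linkValue("1 < 2"))
	fmt.Println(linkValue(`https://example.com/?q="x"`))
}
```

For the larger article template, html/template would handle contextual escaping of attributes and text in one place, at the cost of moving the markup out of the Fprintf call.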

109
http.go

@@ -2,96 +2,63 @@ package main
import (
"context"
"errors"
"fmt"
"net/http"
"slices"
"sort"
"strings"
"time"
"go.sour.is/xt/internal/otel"
)
func httpServer(c console, app *appState) {
c.Log("start http server")
const iAmTheWatcher = "I am the Watcher. I am your guide through this vast new twtiverse."
db, err := app.DB()
func httpServer(ctx context.Context, app *appState) error {
ctx, span := otel.Span(ctx)
defer span.End()
span.AddEvent("start http server")
db, err := app.DB(ctx)
if err != nil {
c.Log("missing db", err)
c.abort()
return
span.RecordError(fmt.Errorf("%w: missing db", err))
return err
}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write([]byte("ok"))
})
api := API{
app: app,
db: db,
hostname: app.args.Hostname,
}
http.HandleFunc("/conv/{hash}", func(w http.ResponseWriter, r *http.Request) {
hash := r.PathValue("hash")
if (len(hash) < 6 || len(hash) > 8) && !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") {
w.WriteHeader(http.StatusBadRequest)
return
}
html := HTML{
app: app,
db: db,
hostname: app.args.Hostname,
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write([]byte(hash))
http.HandleFunc("/health", html.healthcheck)
rows, err := db.QueryContext(r.Context(), "SELECT feed_id, hash, conv, dt, text FROM twt WHERE hash = $1 or conv = $1", hash)
if err != nil {
c.Log(err)
return
}
defer rows.Close()
http.HandleFunc("/", html.home)
http.HandleFunc("/conv/{hash}", html.conv)
for rows.Next() {
var twt struct {
FeedID string
Hash string
Conv string
Dt time.Time
Text string
}
err = rows.Scan(&twt.FeedID, &twt.Hash, &twt.Conv, &twt.Dt, &twt.Text)
if err != nil {
c.Log(err)
return
}
}
})
http.HandleFunc("/feeds", func(w http.ResponseWriter, r *http.Request) {
lis := slices.Collect(app.queue.Iter())
sort.Slice(lis, func(i, j int) bool {
return lis[i].LastScanOn.Time.Before(lis[j].LastScanOn.Time)
})
for _, feed := range lis {
fmt.Fprintln(w, feed.State, feed.LastScanOn.Time.Format(time.RFC3339), feed.Nick, feed.URI)
}
})
http.HandleFunc("/api/plain", api.plain)
http.HandleFunc("/api/plain/conv/{hash}", api.conv)
http.HandleFunc("/api/plain/mentions", api.mentions)
http.HandleFunc("/api/plain/twt", api.twt)
http.HandleFunc("/api/plain/tweets", api.twt)
http.HandleFunc("/api/plain/users", api.users)
http.HandleFunc("/api/plain/queue", api.queue)
srv := &http.Server{
Addr: app.args.Listen,
Handler: http.DefaultServeMux,
}
go func() {
<-c.Done()
c.Log("stop http server")
srv.Shutdown(context.Background())
}()
app.AddCancel(srv.Shutdown)
err = srv.ListenAndServe()
if err != nil {
c.Log(err)
c.abort()
return
if !errors.Is(err, http.ErrServerClosed) {
span.RecordError(err)
return err
}
}
func notAny(s string, chars string) bool {
for _, c := range s {
if !strings.ContainsRune(chars, c) {
return false
}
}
return true
return nil
}


@@ -2,8 +2,10 @@ PRAGMA journal_mode=WAL;
create table if not exists feeds (
feed_id blob primary key,
uri text,
parent_id blob,
nick text,
uri text,
state string,
last_scan_on timestamp,
refresh_rate int default 600,
last_modified_on timestamp,
@@ -13,12 +15,27 @@ create table if not exists feeds (
create table if not exists twts (
feed_id blob,
hash text,
conv text,
dt text, -- timestamp with timezone
ulid blob,
text text,
conv text,
hash text,
mentions text, -- json
tags text, -- json
primary key (feed_id, hash)
primary key (feed_id, ulid)
);
create table if not exists last_twt_on(
feed_id blob,
last_twt_on,
primary key (feed_id)
);
CREATE INDEX if not exists twt_time on twts (ulid asc);
create table if not exists twt_mentions(
feed_id blob,
ulid blob,
primary key (feed_id, ulid)
);
CREATE INDEX if not exists twt_mention on twt_mentions (ulid asc);


@@ -0,0 +1,61 @@
package console
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"time"
)
type C[A any] struct {
io.Reader
io.Writer
err io.Writer
args A
abort func()
cancelfns []func(context.Context) error
}
func New[A any](args A) (context.Context, *C[A]) {
ctx := context.Background()
ctx, abort := context.WithCancel(ctx)
ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
go func() { <-ctx.Done(); stop() }() // restore interrupt function
console := &C[A]{Reader: os.Stdin, Writer: os.Stdout, err: os.Stderr, args: args, abort: abort}
return ctx, console
}
func (c *C[A]) Args() A {
return c.args
}
func (c *C[A]) Shutdown() error {
fmt.Fprintln(c.err, "shutting down ", len(c.cancelfns), " cancel functions...")
defer fmt.Fprintln(c.err, "done")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c.abort()
var err error
for _, fn := range c.cancelfns {
err = errors.Join(err, fn(ctx))
}
return err
}
func (c *C[A]) AddCancel(fn func(context.Context) error) { c.cancelfns = append(c.cancelfns, fn) }
func (c *C[A]) IfFatal(err error) {
if err == nil {
return
}
fmt.Fprintln(c.err, err)
err = c.Shutdown()
if err != nil {
fmt.Fprintln(c.err, err)
}
os.Exit(1)
}
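
The `Shutdown` method above runs every registered cancel function under one timeout and joins the errors, so a single failure does not stop the remaining cleanups. A minimal sketch of that pattern outside the `C[A]` type follows; `shutdownAll`, `demoTimeout`, `errDemo`, `okCloser` and `failCloser` are hypothetical names used for illustration and are not part of the diff.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// demoTimeout, errDemo and the two closers are illustration-only
// stand-ins for real cleanup functions such as http.Server.Shutdown.
const demoTimeout = 5 * time.Second

var errDemo = errors.New("demo failure")

func okCloser(context.Context) error   { return nil }
func failCloser(context.Context) error { return errDemo }

// shutdownAll calls every cleanup function with a shared deadline.
// All functions run even if an earlier one fails; the failures are
// combined with errors.Join, which returns nil when nothing failed.
func shutdownAll(timeout time.Duration, fns ...func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	var err error
	for _, fn := range fns {
		err = errors.Join(err, fn(ctx))
	}
	return err
}
```

Calling `shutdownAll(demoTimeout, okCloser, failCloser, okCloser)` still runs the third closer and returns an error wrapping `errDemo`, so `errors.Is(err, errDemo)` holds.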

54
internal/env/env.go vendored Normal file

@@ -0,0 +1,54 @@
package env
import (
"bufio"
"os"
"strings"
)
func Default(key, def string) string {
if v, ok := os.LookupEnv(key); ok {
return v
}
return def
}
type secret struct {
value string
}
func (s secret) Secret() string {
return s.value
}
func (s secret) String() string {
return "***"
}
func Secret(key, def string) secret {
if v, ok := os.LookupEnv(key); ok {
return secret{v}
}
return secret{def}
}
func DotEnv() {
fd, err := os.Open(".env")
if err != nil {
return
}
scan := bufio.NewScanner(fd)
for scan.Scan() {
line := scan.Text()
if strings.HasPrefix(line, "#") {
continue
}
key, val, ok := strings.Cut(line, "=")
if !ok {
continue
}
os.Setenv(strings.TrimSpace(key), strings.TrimSpace(val))
}
}
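
`DotEnv` above skips lines starting with `#`, splits the rest on the first `=`, and trims whitespace from both halves. The same parsing rule can be sketched as a pure function that returns a map instead of calling `os.Setenv`, which makes it easier to test. `parseDotEnv` and `parseDotEnvString` are hypothetical helpers and do not exist in the diff; unlike the original, the sketch also reports the scanner error.

```go
package main

import (
	"bufio"
	"io"
	"strings"
)

// parseDotEnv reads KEY=VALUE lines from r. Lines that start with '#'
// or contain no '=' are skipped, mirroring DotEnv in the diff.
// Only the first '=' splits, so "A=b=c" yields key "A", value "b=c".
func parseDotEnv(r io.Reader) (map[string]string, error) {
	out := make(map[string]string)
	scan := bufio.NewScanner(r)
	for scan.Scan() {
		line := scan.Text()
		if strings.HasPrefix(line, "#") {
			continue
		}
		key, val, ok := strings.Cut(line, "=")
		if !ok {
			continue
		}
		out[strings.TrimSpace(key)] = strings.TrimSpace(val)
	}
	return out, scan.Err()
}

// parseDotEnvString is a convenience wrapper for in-memory input.
func parseDotEnvString(s string) (map[string]string, error) {
	return parseDotEnv(strings.NewReader(s))
}
```

A caller that wants the original behaviour could range over the returned map and call `os.Setenv` for each pair.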

335
internal/otel/otel.go Normal file

@@ -0,0 +1,335 @@
package otel
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"runtime"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
)
var (
tracer trace.Tracer
meter metric.Meter
logger *slog.Logger
)
func Init(ctx context.Context, name string) (shutdown func(context.Context) error, err error) {
tracer = otel.Tracer(name)
meter = otel.Meter(name)
logger = otelslog.NewLogger(name)
return setupOTelSDK(ctx, name)
}
func Meter() metric.Meter { return meter }
// func Error(err error, v ...any) {
// if err == nil {
// return
// }
// fmt.Println("ERR:", append([]any{err}, v...))
// logger.Error(err.Error(), v...)
// }
// func Info(msg string, v ...any) { fmt.Println(append([]any{msg}, v...)); logger.Info(msg, v...) }
type spanny struct {
trace.Span
}
func (s *spanny) RecordError(err error, options ...trace.EventOption) {
if err == nil {
return
}
ec := trace.NewEventConfig(options...)
attrs := make([]any, len(ec.Attributes()))
for i, v := range ec.Attributes() {
attrs[i] = v
}
fmt.Println(append([]any{"ERR:", err}, attrs...)...)
logger.Error(err.Error(), attrs...)
s.Span.RecordError(err, options...)
}
func (s *spanny) AddEvent(name string, options ...trace.EventOption) {
ec := trace.NewEventConfig(options...)
attrs := make([]any, 2*len(ec.Attributes()))
for i, v := range ec.Attributes() {
attrs[2*i] = v.Key
attrs[2*i+1] = v.Value.Emit()
}
fmt.Println(append([]any{name}, attrs...)...)
logger.Info(name, attrs...)
s.Span.AddEvent(name, options...)
}
func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
name, attrs := Attrs()
ctx, span := tracer.Start(ctx, name, opts...)
span.SetAttributes(attrs...)
return ctx, &spanny{span}
}
func Attrs() (string, []attribute.KeyValue) {
var attrs []attribute.KeyValue
var name string
if pc, file, line, ok := runtime.Caller(2); ok {
if fn := runtime.FuncForPC(pc); fn != nil {
name = fn.Name()
}
attrs = append(attrs,
attribute.String("pc", fmt.Sprintf("%v", pc)),
attribute.String("file", file),
attribute.Int("line", line),
)
}
return name, attrs
}
// setupOTelSDK bootstraps the OpenTelemetry pipeline.
// If it does not return an error, make sure to call shutdown for proper cleanup.
func setupOTelSDK(ctx context.Context, name string) (shutdown func(context.Context) error, err error) {
var shutdownFuncs []func(context.Context) error
// shutdown calls cleanup functions registered via shutdownFuncs.
// The errors from the calls are joined.
// Each registered cleanup will be invoked once.
shutdown = func(ctx context.Context) error {
fmt.Println("shutdown")
var err error
for _, fn := range shutdownFuncs {
err = errors.Join(err, fn(ctx))
}
shutdownFuncs = nil
return err
}
// playShutdown := otelplay.ConfigureOpentelemetry(ctx)
// shutdownFuncs = append(shutdownFuncs, func(ctx context.Context) error { playShutdown(); return nil })
// handleErr calls shutdown for cleanup and makes sure that all errors are returned.
handleErr := func(inErr error) {
err = errors.Join(inErr, shutdown(ctx))
}
// Set up propagator.
prop := newPropagator()
otel.SetTextMapPropagator(prop)
// Set up trace provider.
tracerShutdown, err := newTraceProvider(ctx, name)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, tracerShutdown)
// Set up meter provider.
meterShutdown, err := newMeterProvider(ctx, name)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, meterShutdown)
// Set up logger provider.
loggerShutdown, err := newLoggerProvider(ctx, name)
if err != nil {
handleErr(err)
return
}
shutdownFuncs = append(shutdownFuncs, loggerShutdown)
return
}
func newPropagator() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
}
func newTraceProvider(ctx context.Context, name string) (func(context.Context) error, error) {
r, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(name),
),
)
if err != nil {
return nil, err
}
if v := env("XT_TRACER", ""); v == "stdout" {
traceExporter, err := stdouttrace.New(
stdouttrace.WithWriter(os.Stderr),
stdouttrace.WithPrettyPrint(),
)
if err != nil {
return nil, err
}
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithResource(r),
sdktrace.WithBatcher(traceExporter,
// Default is 5s. Set to 1s for demonstrative purposes.
sdktrace.WithBatchTimeout(time.Second)),
)
otel.SetTracerProvider(tracerProvider)
return tracerProvider.Shutdown, nil
} else if v != "" {
fmt.Println("use tracer", v)
exp, err := otlptracegrpc.New(
ctx,
otlptracegrpc.WithEndpoint(v),
otlptracegrpc.WithInsecure(),
)
if err != nil {
return nil, err
}
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(r),
)
otel.SetTracerProvider(tracerProvider)
return func(ctx context.Context) error {
return tracerProvider.Shutdown(ctx)
}, nil
}
return func(ctx context.Context) error { return nil }, nil
}
func newMeterProvider(ctx context.Context, name string) (func(context.Context) error, error) {
_, _ = ctx, name
// metricExporter, err := stdoutmetric.New()
// if err != nil {
// return nil, err
// }
// meterProvider := sdkmetric.NewMeterProvider(
// sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter,
// // Default is 1m. Set to 3s for demonstrative purposes.
// sdkmetric.WithInterval(3*time.Second))),
// )
// otel.SetMeterProvider(meterProvider)
metricExporter, err := otelprom.New(
otelprom.WithRegisterer(prometheus.DefaultRegisterer),
// OTEL default buckets assume you're using milliseconds. Substitute defaults
// appropriate for units of seconds.
otelprom.WithAggregationSelector(func(ik sdkmetric.InstrumentKind) sdkmetric.Aggregation {
switch ik {
case sdkmetric.InstrumentKindHistogram:
return sdkmetric.AggregationExplicitBucketHistogram{
Boundaries: prometheus.DefBuckets,
NoMinMax: false,
}
default:
return sdkmetric.DefaultAggregationSelector(ik)
}
}),
)
p := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(metricExporter),
)
otel.SetMeterProvider(p)
http.Handle("/metrics", promhttp.Handler())
return func(ctx context.Context) error { return nil }, err
}
func newLoggerProvider(ctx context.Context, name string) (func(context.Context) error, error) {
r, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(name),
),
)
if err != nil {
return nil, err
}
if v := env("XT_LOGGER", ""); v == "stdout" {
logExporter, err := stdoutlog.New(
stdoutlog.WithPrettyPrint(),
stdoutlog.WithWriter(os.Stderr),
)
if err != nil {
return nil, err
}
loggerProvider := log.NewLoggerProvider(
log.WithProcessor(
log.NewBatchProcessor(logExporter),
),
log.WithResource(r),
)
global.SetLoggerProvider(loggerProvider)
return loggerProvider.Shutdown, nil
} else if v != "" {
fmt.Println("use logger", v)
exp, err := otlploghttp.New(
ctx,
otlploghttp.WithInsecure(),
otlploghttp.WithEndpointURL(v),
)
if err != nil {
return nil, err
}
processor := log.NewBatchProcessor(exp)
provider := log.NewLoggerProvider(
log.WithProcessor(processor),
log.WithResource(r),
)
global.SetLoggerProvider(provider)
return processor.Shutdown, nil
}
return func(ctx context.Context) error { return nil }, nil
}
func env(key, def string) string {
if v, ok := os.LookupEnv(key); ok {
return v
}
return def
}
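
`Attrs` in this file names each span after the calling function by walking the stack with `runtime.Caller(2)`: frame 0 is `Attrs`, frame 1 is `Span`, and frame 2 is the code that called `Span`. A minimal sketch of the same lookup follows. It uses `runtime.Callers` with `runtime.CallersFrames`, which the runtime documentation recommends when inlining may be involved. `callerName` and `whoCalled` are hypothetical names and are not in the diff.

```go
package main

import "runtime"

// callerName returns the fully qualified name of the function that is
// skip frames above the caller of callerName (skip 0 = direct caller).
func callerName(skip int) string {
	pcs := make([]uintptr, 8)
	// +2 skips runtime.Callers itself and callerName.
	n := runtime.Callers(skip+2, pcs)
	if n == 0 {
		return ""
	}
	frame, _ := runtime.CallersFrames(pcs[:n]).Next()
	return frame.Function
}

// whoCalled demonstrates the lookup: it reports its own name.
func whoCalled() string { return callerName(0) }
```

In a wrapper such as `Span`, the equivalent call would be `callerName(1)`, so that the span is named after the wrapper's caller and not the wrapper.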


@@ -1,4 +1,4 @@
package main
package uuid
import (
"crypto/sha1"
@@ -8,54 +8,54 @@ import (
"strings"
)
type uuid [16]byte
type UUID [16]byte
var urlNS = uuid{0x6b, 0xa7, 0xb8, 0x10, 0x9d, 0xad, 0x11, 0xd1, 0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8}
var UrlNS = UUID{0x6b, 0xa7, 0xb8, 0x10, 0x9d, 0xad, 0x11, 0xd1, 0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8}
func (u uuid) UUID5(value string) uuid {
func (u UUID) UUID5(value string) UUID {
h := sha1.New()
h.Write(u[:])
h.Write([]byte(value))
return uuid(h.Sum(nil))
return UUID(h.Sum(nil))
}
func (u *uuid) UnmarshalText(data string) error {
func (u *UUID) UnmarshalText(data string) error {
data = strings.Trim(data, "{}")
data = strings.ReplaceAll(data, "-", "")
_, err := hex.Decode(u[:], []byte(data))
return err
}
func (u uuid) MarshalText() string {
func (u UUID) MarshalText() string {
s := hex.EncodeToString(u[:])
return fmt.Sprintf("{%s-%s-%s-%s-%s}", s[:8], s[8:12], s[12:16], s[16:20], s[20:])
}
func (u uuid) Value() (driver.Value, error) {
func (u UUID) Value() (driver.Value, error) {
return u[:], nil
}
func (u *uuid) Scan(value any) error {
func (u *UUID) Scan(value any) error {
if value == nil {
return nil
}
*u = uuid(value.([]byte))
*u = UUID(value.([]byte))
return nil
}
type uuids []uuid
type UUIDs []UUID
func (lis uuids) ToStrList() strList {
arr := make(strList, len(lis))
func (lis UUIDs) ToStrList() List {
arr := make(List, len(lis))
for i, v := range lis {
arr[i] = v.MarshalText()
}
return arr
}
type strList []string
type List []string
func (l *strList) Scan(value any) error {
func (l *List) Scan(value any) error {
s := value.(string)
s = strings.Trim(s, "[]")
for _, v := range strings.Split(s, ",") {
@@ -67,11 +67,11 @@ func (l *strList) Scan(value any) error {
return nil
}
func (l strList) Value() (driver.Value, error) {
func (l List) Value() (driver.Value, error) {
arr := make([]string, len(l))
for i, v := range l {
arr[i] = "\""+v+"\""
arr[i] = "\"" + v + "\""
}
return "["+strings.Join(arr, ",") +"]", nil
}
return "[" + strings.Join(arr, ",") + "]", nil
}
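
`UUID5` above hashes the namespace and value with SHA-1 and keeps the first 16 bytes of the digest, but it does not set the version and variant bits that RFC 4122 requires for a version 5 UUID. The bytes in `UrlNS` also match the RFC 4122 DNS namespace (`6ba7b810-...`), not the URL namespace (`6ba7b811-...`). The sketch below shows a spec-conforming variant, on the assumption that the URL namespace was intended. `NamespaceURL`, `V5` and `String` are illustrative names, and switching an existing database to this scheme would change every stored `feed_id`.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

type UUID [16]byte

// NamespaceURL is the RFC 4122 namespace for URLs
// (6ba7b811-9dad-11d1-80b4-00c04fd430c8).
var NamespaceURL = UUID{0x6b, 0xa7, 0xb8, 0x11, 0x9d, 0xad, 0x11, 0xd1,
	0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8}

// V5 derives a name-based UUID: SHA-1 over namespace || name,
// truncated to 16 bytes, with version and variant bits overwritten.
func (ns UUID) V5(name string) UUID {
	h := sha1.New()
	h.Write(ns[:])
	h.Write([]byte(name))

	var u UUID
	copy(u[:], h.Sum(nil))      // keep the first 16 of 20 digest bytes
	u[6] = (u[6] & 0x0f) | 0x50 // version 5
	u[8] = (u[8] & 0x3f) | 0x80 // RFC 4122 variant (binary 10xxxxxx)
	return u
}

// String renders the canonical 8-4-4-4-12 hex form.
func (u UUID) String() string {
	s := hex.EncodeToString(u[:])
	return fmt.Sprintf("%s-%s-%s-%s-%s", s[:8], s[8:12], s[12:16], s[16:20], s[20:])
}
```

The result is deterministic: the same namespace and name always produce the same identifier, which is the property the feed table relies on.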

82
main.go

@@ -3,46 +3,14 @@ package main
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"go.opentelemetry.io/otel"
"go.sour.is/xt/internal/console"
"go.sour.is/xt/internal/env"
"go.sour.is/xt/internal/otel"
)
const name = "go.sour.is/xt"
var (
tracer = otel.Tracer(name)
meter = otel.Meter(name)
)
type contextKey struct{ name string }
type console struct {
io.Reader
io.Writer
err io.Writer
context.Context
abort func()
}
func (c console) Log(v ...any) { fmt.Fprintln(c.err, v...) }
func (c console) Args() args {
v, ok := c.Get("args").(args)
if !ok {
return args{}
}
return v
}
func (c *console) Set(name string, value any) {
c.Context = context.WithValue(c.Context, contextKey{name}, value)
}
func (c console) Get(name string) any {
return c.Context.Value(contextKey{name})
}
type args struct {
dbtype string
dbfile string
@@ -50,34 +18,34 @@ type args struct {
Nick string
URI string
Listen string
}
func env(key, def string) string {
if v, ok := os.LookupEnv(key); ok {
return v
}
return def
Hostname string
}
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
console := console{os.Stdin, os.Stdout, os.Stderr, ctx, stop}
env.DotEnv() // load .env
go func() { <-ctx.Done(); console.Log("shutdown"); stop() }()
ctx, console := console.New(args{
dbtype: env.Default("XT_DBTYPE", "sqlite3"),
dbfile: env.Default("XT_DBFILE", "file:twt.db"),
baseFeed: env.Default("XT_BASE_FEED", ""),
Nick: env.Default("XT_NICK", "xuu"),
URI: env.Default("XT_URI", "https://txt.sour.is/user/xuu/twtxt.txt"),
Listen: env.Default("XT_LISTEN", ":8080"),
Hostname: env.Default("XT_HOSTNAME", "https://watcher.sour.is"),
})
args := args{
dbtype: env("XT_DBTYPE", "sqlite3"),
dbfile: env("XT_DBFILE", "file:twt.db"),
baseFeed: env("XT_BASE_FEED", "feed"),
Nick: env("XT_NICK", "xuu"),
URI: env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"),
Listen: env("XT_LISTEN", ":8040"),
}
finish, err := otel.Init(ctx, name)
console.IfFatal(err)
console.AddCancel(finish)
console.Set("args", args)
m_up, err := otel.Meter().Int64Gauge("up")
console.IfFatal(err)
if err := run(console); err != nil && !errors.Is(err, context.Canceled) {
fmt.Println(err)
os.Exit(1)
m_up.Record(ctx, 1)
defer m_up.Record(context.Background(), 0)
err = run(ctx, console)
if !errors.Is(err, context.Canceled) {
console.IfFatal(err)
}
}


@@ -1,105 +1,159 @@
package main
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sort"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
const (
TenYear = 3153600000
OneMonth = OneDay * 30
OneDay = 86400
OneHour = 3600
TenMinutes = 600
TwoMinutes = 60
TwoMinutes = 120
)
func refreshLoop(c console, app *appState) {
defer c.abort()
func feedRefreshProcessor(ctx context.Context, app *appState) error {
ctx, span := otel.Span(ctx)
defer span.End()
sleeping_time, _ := otel.Meter().Int64Counter("xt_feed_sleep")
queue_size, _ := otel.Meter().Int64Gauge("xt_feed_queue_size")
f := NewHTTPFetcher()
fetch, close := NewFuncPool(c.Context, 25, f.Fetch)
fetch, close := NewFuncPool(ctx, 40, f.Fetch)
defer close()
db, err := app.DB()
db, err := app.DB(ctx)
if err != nil {
c.Log("missing db")
c.abort()
return
span.RecordError(err)
return err
}
go processorLoop(ctx, db, fetch)
queue := app.queue
c.Log("start refresh loop")
for c.Err() == nil {
if queue.IsEmpty() {
c.Log("load feeds")
span.AddEvent("start refresh loop")
it, err := loadFeeds(c.Context, db)
for ctx.Err() == nil {
if queue.IsEmpty() {
span.AddEvent("load feeds")
it, err := loadFeeds(ctx, db)
span.RecordError(err)
for f := range it {
queue.Insert(&f)
}
if err != nil {
c.Log(err)
return
}
}
span.AddEvent("queue", trace.WithAttributes(attribute.Int("size", int(queue.count))))
queue_size.Record(ctx, int64(queue.count))
f := queue.ExtractMin()
if f == nil {
c.Log("sleeping for ", TenMinutes*time.Second)
sleeping_time.Add(ctx, int64(TwoMinutes))
span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TwoMinutes))))
select {
case <-time.After(TenMinutes * time.Second):
case <-c.Done():
return
case <-time.After(TwoMinutes * time.Second):
case <-ctx.Done():
return nil
}
span.End()
continue
}
c.Log("queue size", queue.count, "next", f.URI, "next scan on", f.LastScanOn.Time.Format(time.RFC3339))
span.AddEvent("next", trace.WithAttributes(
attribute.Int("size", int(queue.count)),
attribute.String("uri", f.URI),
attribute.String("last scan on", f.LastScanOn.Time.Format(time.RFC3339)),
attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
until := time.Until(f.NextScanOn.Time)
if until > 2*time.Hour {
span.AddEvent("too soon", trace.WithAttributes(attribute.String("uri", f.URI)))
span.End()
if time.Until(f.LastScanOn.Time) > 2*time.Hour {
c.Log("too soon", f.URI)
continue
}
span.AddEvent(
"till next",
trace.WithAttributes(attribute.String("time", until.String())))
sleeping_time.Add(ctx, until.Milliseconds())
select {
case <-c.Done():
case <-ctx.Done():
return nil
case t := <-time.After(until):
span.AddEvent("fetch", trace.WithAttributes(
attribute.Int("size", int(queue.count)),
attribute.String("uri", f.URI),
attribute.String("timeout", t.Format(time.RFC3339)),
attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
}
fetch.Fetch(f)
}
span.RecordError(ctx.Err())
return ctx.Err()
}
func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
ctx, span := otel.Span(ctx)
defer span.End()
process_in_total, _ := otel.Meter().Int64Counter("xt_processed_in_total")
process_out_total, _ := otel.Meter().Int64Counter("xt_processed_out_total")
twts_histogram, _ := otel.Meter().Float64Histogram("xt_twt1k_bucket")
for ctx.Err() == nil {
select {
case <-ctx.Done():
return
case t := <-time.After(time.Until(f.LastScanOn.Time)):
c.Log("fetch", t.Format(time.RFC3339), f.Nick, f.URI)
fetch.Fetch(f)
case <-time.After(10 * time.Minute):
refreshLastTwt(ctx, db)
case res := <-fetch.Out():
c.Log("got response:", res.Request.URI)
f := res.Request
span.AddEvent("got response", trace.WithAttributes(
attribute.String("uri", f.URI),
attribute.String("scan on", f.NextScanOn.Time.Format(time.RFC3339)),
))
process_in_total.Add(ctx, 1)
f.LastScanOn.Time = time.Now()
f.LastScanOn.Valid = true
err := res.err
if res.err != nil {
f.LastError.String, f.LastError.Valid = err.Error(), true
if errors.Is(err, ErrPermanentlyDead) {
f.State = "permanantly-dead"
f.RefreshRate = TenYear
}
if errors.Is(err, ErrTemporarilyDead) {
f.RefreshRate = OneDay
f.RefreshRate, f.State = tsTemp(f.LastTwtOn.Time)
}
if errors.Is(err, ErrUnmodified) {
f.RefreshRate = OneDay
f.RefreshRate, f.State = tsTemp(f.LastTwtOn.Time)
}
c.Log(err)
err = f.Save(c.Context, db)
if err != nil {
c.Log(err)
return
}
span.RecordError(err)
f.LastError.String, f.LastError.Valid = err.Error(), true
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
@@ -107,50 +161,139 @@ func refreshLoop(c console, app *appState) {
f.ETag.String, f.ETag.Valid = res.ETag(), true
f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
rdr := io.TeeReader(res.Body, cpy)
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
span.AddEvent("read feed")
// cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err))
span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
f.LastError.String, f.LastError.Valid = err.Error(), true
f.RefreshRate = OneDay
return
}
if prev, ok := twtfile.Info().GetN("prev", 0); f.FirstFetch && ok {
_, part, ok := strings.Cut(prev.Value(), " ")
if ok {
part = f.URI[:strings.LastIndex(f.URI, "/")+1] + part
queue.Insert(&Feed{
FetchURI: part,
URI: f.URI,
Nick: f.Nick,
LastScanOn: f.LastScanOn,
RefreshRate: f.RefreshRate,
})
}
}
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
var rdr io.Reader = res.Body
// rdr := io.TeeReader(rdr, cpy)
rdr = lextwt.TwtFixer(rdr)
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
//cpy.Close()
res.Body.Close()
err = storeFeed(db, twtfile)
if err != nil {
c.Log(err)
span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
err = f.Save(c.Context, db)
c.Log(err)
return
f.LastError.String, f.LastError.Valid = err.Error(), true
f.RefreshRate = OneDay
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
cpy.Close()
count := twtfile.Twts().Len()
span.AddEvent("parse complete", trace.WithAttributes(attribute.Int("count", count)))
twts_histogram.Record(ctx, float64(count)/1000)
f.LastScanOn.Time = time.Now()
f.RefreshRate = TenMinutes
err = storeFeed(ctx, db, twtfile)
if err != nil {
span.RecordError(err)
f.LastError.String, f.LastError.Valid = err.Error(), true
err = f.Save(ctx, db)
span.RecordError(err)
continue
}
if nick := twtfile.Twter().Nick; nick != "" {
f.Nick = nick
}
f.RefreshRate, f.State = checkTemp(twtfile.Twts())
f.LastError.String = ""
err = f.Save(c.Context, db)
if err != nil {
c.Log(err)
return
}
err = f.Save(ctx, db)
span.RecordError(err)
process_out_total.Add(ctx, 1)
}
}
span.RecordError(ctx.Err())
}
func checkTemp(twts types.Twts) (int, State) {
if len(twts) < 5 {
return 7 * OneDay, "cold"
}
sort.Sort(twts)
since_first := -time.Until(twts[0].Created())
since_fifth := -time.Until(twts[4].Created())
if since_first < 24*time.Hour || since_fifth < 32*time.Hour {
return TwoMinutes, "hot"
}
if since_first < 48*time.Hour || since_fifth < 64*time.Hour {
return TenMinutes, "hot"
}
if since_first < 96*time.Hour || since_fifth < 128*time.Hour {
return 2 * TenMinutes, "warm"
}
if since_first < 192*time.Hour || since_fifth < 256*time.Hour {
return 4 * TenMinutes, "warm"
}
if since_first < 384*time.Hour || since_fifth < 512*time.Hour {
return OneDay, "cold"
}
if since_first < 768*time.Hour || since_fifth < 1024*time.Hour {
return 2 * OneDay, "cold"
}
if since_first < 1536*time.Hour || since_fifth < 2048*time.Hour {
return 7 * OneDay, "frozen"
}
return OneMonth, "frozen"
}
func tsTemp(ts time.Time) (int, State) {
since_first := -time.Until(ts)
if since_first < 2*time.Hour {
return TwoMinutes, "hot"
}
if since_first < 4*time.Hour {
return TenMinutes, "hot"
}
if since_first < 8*time.Hour {
return 2 * TenMinutes, "warm"
}
if since_first < 16*time.Hour {
return 4 * TenMinutes, "warm"
}
if since_first < 24*time.Hour {
return OneDay, "cold"
}
if since_first < 48*time.Hour {
return 2 * OneDay, "cold"
}
if since_first < 96*time.Hour {
return 7 * OneDay, "frozen"
}
return OneMonth, "frozen"
}


@@ -1,115 +0,0 @@
package main
import (
"context"
"database/sql"
"fmt"
"iter"
"os"
"strings"
"sync"
_ "embed"
_ "github.com/mattn/go-sqlite3"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
func run(c console) error {
a := c.Args()
app := &appState{
args: a,
feeds: sync.Map{},
queue: FibHeap(func(a, b *Feed) bool {
return a.LastScanOn.Time.Before(b.LastScanOn.Time)
}),
}
// Setup DB
err := func(ctx context.Context) error {
db, err := app.DB()
if err != nil {
return err
}
defer db.Close()
for _, stmt := range strings.Split(initSQL, ";") {
_, err = db.ExecContext(ctx, stmt)
if err != nil {
return err
}
}
return nil
}(c.Context)
if err != nil {
return err
}
// Seed File
err = func() error {
f, err := os.Open(a.baseFeed)
if err != nil {
return err
}
defer f.Close()
twtfile, err := lextwt.ParseFile(f, &types.Twter{
Nick: a.Nick,
URI: a.URI,
})
if err != nil {
return fmt.Errorf("%w: %w", ErrParseFailed, err)
}
db, err := app.DB()
if err != nil {
return err
}
defer db.Close()
return storeFeed(db, twtfile)
}()
if err != nil {
return err
}
go refreshLoop(c, app)
go httpServer(c, app)
<-c.Done()
return c.Err()
}
type appState struct {
args args
feeds sync.Map
queue *fibHeap[Feed]
}
func (app *appState) DB() (*sql.DB, error) {
return sql.Open(app.args.dbtype, app.args.dbfile)
}
func (app *appState) Feed(feedID string) *Feed {
return nil
}
func (app *appState) Feeds() iter.Seq2[string, *Feed] {
return func(yield func(string, *Feed) bool) {
app.feeds.Range(func(k, v any) bool {
key, _ := k.(string)
value, ok := v.(*Feed)
if !ok {
return false
}
if !yield(key, value) {
return false
}
return true
})
}
}