Compare commits

..

45 Commits

Author SHA1 Message Date
xuu
cec0bc81f1 fix 2025-09-22 16:43:30 -06:00
xuu
0a6ba26a0f fix 2025-09-22 16:43:20 -06:00
xuu
e54869e4a1 chore: assorted fixes 2025-04-07 13:17:44 -06:00
xuu
be614cb67e chore: fix 2025-04-06 19:47:53 -06:00
xuu
88b7197d17 chore: add video style 2025-04-06 19:45:08 -06:00
xuu
6499ba09cf chore: add deps 2025-04-04 14:20:38 -06:00
xuu
d6f811bd8f chore: add deps 2025-04-04 14:10:45 -06:00
xuu
922a1b1327 chore: guard on nil 2025-04-04 11:56:18 -06:00
xuu
f52827d62a chore: update deps 2025-04-04 11:49:17 -06:00
xuu
13be3d0688 chore: update deps 2025-04-04 11:30:09 -06:00
xuu
19ff8dbd8f chore: make handling more sensitive 2025-04-04 11:28:45 -06:00
xuu
fc200cf84f chore: fix usage bug 2025-04-02 16:09:11 -06:00
xuu
73eaac5bc6 chore: fix panic 2025-04-02 15:29:33 -06:00
xuu
7d336285bf chore: add view thread 2025-04-02 14:28:12 -06:00
xuu
14e1c4176d chore: more responsive 2025-04-02 14:22:10 -06:00
xuu
1ae8680f43 chore: use browser locale 2025-04-02 13:59:09 -06:00
xuu
4490a89f73 chore: remove font 2025-04-02 13:41:52 -06:00
xuu
c4191ec6cd chore: format html better 2025-04-02 13:20:04 -06:00
xuu
74fa69274d chore: add date formatting 2025-04-02 11:55:56 -06:00
xuu
6b8ad143fe chore: reverse twts for html 2025-04-02 11:17:46 -06:00
xuu
a7009dcb56 chore: add twt html formatting 2025-04-02 10:47:30 -06:00
xuu
ef65b115b7 chore: refactor 2025-03-31 17:39:24 -06:00
xuu
cd2c9abd1b chore: refactor http out 2025-03-31 15:23:40 -06:00
xuu
db93108d0b chore: docs 2025-03-31 11:57:30 -06:00
xuu
9db54a0ad9 chore: fix refresh query 2025-03-31 11:09:13 -06:00
xuu
e58cd8e3f1 chore: fix twts 2025-03-31 10:52:26 -06:00
xuu
69755e14d2 chore: fixes 2025-03-31 10:49:36 -06:00
xuu
aedc9245e5 chore: add twt_mentions table 2025-03-30 21:32:49 -06:00
xuu
fc762f3bf3 chore: fix twt hash 2025-03-30 21:14:11 -06:00
xuu
fb957ed25d chore: add mentions 2025-03-30 13:50:39 -06:00
xuu
6579c50c09 chore: add twt help 2025-03-30 12:12:10 -06:00
xuu
2bb2eec993 chore: filter out perma dead 2025-03-29 22:18:43 -06:00
xuu
07aba6d14a chore: adjust timing 2025-03-29 19:48:06 -06:00
xuu
dab5a115cf chore: fixes and import 2025-03-29 17:09:18 -06:00
xuu
84c3099be6 chore: show last scan for child feeds 2025-03-28 22:16:43 -06:00
xuu
d85dd56ac9 chore: fix twt filter 2025-03-28 17:39:55 -06:00
xuu
b863a2786e chore: helper function for preamble 2025-03-28 07:21:34 -06:00
xuu
00c97eb011 chore: fix twt query 2025-03-27 16:55:52 -06:00
xuu
62229deec5 chore: use XT_URI for seed 2025-03-27 16:52:01 -06:00
xuu
a3e6fc0c0f chore: refactor 2025-03-27 16:35:05 -06:00
xuu
303ca5a2db Merge pull request 'add-otel' (#3) from add-otel into main
Reviewed-on: #3
2025-03-26 20:49:41 -06:00
xuu
22d77d6aef chore: fix 2025-03-26 19:03:20 -06:00
xuu
a0614435ff chore: deps 2025-03-26 18:58:59 -06:00
xuu
fe28b7c2ad chore: go fmt 2025-03-26 18:57:09 -06:00
xuu
b34c9bc99f chore: changes to feed 2025-03-26 18:54:44 -06:00
17 changed files with 1352 additions and 683 deletions

2
.gitignore vendored
View File

@@ -5,4 +5,4 @@ feeds/
/xt
.env
*.txt
*.txt.xz
*.xz

89
app.go
View File

@@ -4,12 +4,12 @@ import (
"context"
"database/sql"
"fmt"
"iter"
"io"
"net/http"
"os"
"runtime/debug"
"strconv"
"strings"
"sync"
_ "embed"
@@ -18,14 +18,15 @@ import (
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/console"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt"
"go.yarn.social/types"
"golang.org/x/sync/errgroup"
)
func run(c *console) error {
ctx, span := otel.Span(c.Context)
func run(ctx context.Context, c *console.C[args]) error {
ctx, span := otel.Span(ctx)
defer span.End()
bi, _ := debug.ReadBuildInfo()
@@ -33,8 +34,8 @@ func run(c *console) error {
a := c.Args()
app := &appState{
args: a,
feeds: sync.Map{},
args: a,
C: c,
queue: FibHeap(func(a, b *Feed) bool {
return a.NextScanOn.Time.Before(b.NextScanOn.Time)
}),
@@ -69,52 +70,68 @@ func run(c *console) error {
ctx, span := otel.Span(ctx)
defer span.End()
f, err := os.Open(a.baseFeed)
if err != nil {
return err
}
defer f.Close()
twtfile, err := lextwt.ParseFile(f, &types.Twter{
Nick: a.Nick,
URI: a.URI,
})
if err != nil {
return fmt.Errorf("%w: %w", ErrParseFailed, err)
}
db, err := app.DB(ctx)
if err != nil {
return err
}
defer db.Close()
return storeFeed(ctx, db, twtfile)
var inFile io.Reader
if a.baseFeed != "" {
f, err := os.Open(a.baseFeed)
if err != nil {
return err
}
defer f.Close()
err = storeRegistry(ctx, db, f)
if err != nil {
return err
}
}
if a.URI != "" {
res, err := http.Get(a.URI)
if err != nil {
return err
}
inFile = res.Body
defer res.Body.Close()
twtfile, err := lextwt.ParseFile(inFile, &types.Twter{
Nick: a.Nick,
URI: a.URI,
})
if err != nil {
return fmt.Errorf("%w: %w", ErrParseFailed, err)
}
return storeFeed(ctx, db, twtfile)
}
return nil
}(ctx)
if err != nil {
return err
}
wg, ctx := errgroup.WithContext(ctx)
c.Context = ctx
wg.Go(func() error {
return feedRefreshProcessor(c, app)
return feedRefreshProcessor(ctx, app)
})
go httpServer(c, app)
go httpServer(ctx, app)
err = wg.Wait()
if err != nil {
return err
}
return c.Context.Err()
return ctx.Err()
}
type appState struct {
args args
feeds sync.Map
queue *fibHeap[Feed]
*console.C[args]
}
type db struct {
@@ -173,23 +190,3 @@ func (app *appState) DB(ctx context.Context) (db, error) {
return db, err
}
func (app *appState) Feed(feedID string) *Feed {
return nil
}
func (app *appState) Feeds() iter.Seq2[string, *Feed] {
return func(yield func(string, *Feed) bool) {
app.feeds.Range(func(k, v any) bool {
key, _ := k.(string)
value, ok := v.(*Feed)
if !ok {
return false
}
if !yield(key, value) {
return false
}
return true
})
}
}

View File

@@ -1,92 +0,0 @@
package main
import (
"context"
"database/sql"
"fmt"
"hash/fnv"
"iter"
"os"
"github.com/oklog/ulid/v2"
"github.com/uptrace/opentelemetry-go-extra/otelsql"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
func main() {
in := os.Stdin
if len(os.Args) != 2 {
fmt.Fprint(os.Stderr, "usage: ", os.Args[0], "[db file]")
}
db, err := DB(context.Background(), os.Args[1])
if err != nil {
panic(err)
}
_ = db
for line := range lextwt.IterRegistry(in) {
_ = line
}
}
const MaxVariableNumber = 32766
func DB(ctx context.Context, cnx string) (*sql.DB, error) {
// return sql.Open(app.args.dbtype, app.args.dbfile)
db, err := otelsql.Open("sqlite", cnx)
if err != nil {
return db, err
}
return db, err
}
func makeULID(twt types.Twt) ulid.ULID {
h64 := fnv.New64a()
h16 := fnv.New32a()
text := []byte(fmt.Sprintf("%+l", twt))
b := make([]byte, 10)
copy(b, h16.Sum(text)[:2])
copy(b[2:], h64.Sum(text))
u := ulid.ULID{}
u.SetTime(ulid.Timestamp(twt.Created()))
u.SetEntropy(b)
return u
}
func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
_, size := qry(1)
itemsPerIter := maxArgs / size
if len(args) < size {
return func(yield func(string, []any) bool) {}
}
if len(args) < maxArgs {
return func(yield func(string, []any) bool) {
query, _ := qry(len(args) / size)
yield(query, args)
}
}
return func(yield func(string, []any) bool) {
for len(args) > 0 {
if len(args) > maxArgs {
query, size := qry(itemsPerIter)
if !yield(query, args[:size]) {
return
}
args = args[size:]
continue
}
query, _ := qry(len(args) / size)
yield(query, args)
return
}
}
}

596
feed.go
View File

@@ -3,10 +3,11 @@ package main
import (
"cmp"
"context"
"crypto/sha3"
"database/sql"
"database/sql/driver"
"fmt"
"hash/fnv"
"io"
"iter"
"net/http"
"net/url"
@@ -17,14 +18,17 @@ import (
_ "embed"
"github.com/oklog/ulid/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.sour.is/xt/internal/otel"
"go.sour.is/xt/internal/uuid"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
type Feed struct {
FeedID uuid
ParentID uuid
FeedID uuid.UUID
ParentID uuid.UUID
HashURI string
URI string
Nick string
@@ -74,10 +78,12 @@ var (
refresh_rate
)
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
ON CONFLICT (feed_id) DO NOTHING`, r * 7
ON CONFLICT (feed_id) DO NOTHING
`, r * 7
}
updateFeed = `
update feeds set
nick = ?,
state = ?,
last_scan_on = ?,
refresh_rate = ?,
@@ -93,10 +99,20 @@ var (
repeat = strings.Repeat(", (?, ?, ?, ?, ?, ?, ?)", r-1)
}
return `
insert into twts
(feed_id, ulid, text, hash, conv, mentions, tags)
insert into twts (
feed_id,
ulid,
text,
hash,
conv,
mentions,
tags
)
values (?, ?, ?, ?, ?, ?, ?)` + repeat + `
ON CONFLICT (feed_id, ulid) DO NOTHING`, r * 7
ON CONFLICT (feed_id, ulid) DO UPDATE SET
conv = excluded.conv,
hash = excluded.hash
`, r * 7
}
fetchFeeds = `
@@ -118,10 +134,10 @@ var (
last_modified_on,
last_etag
from feeds
left join (
select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts group by feed_id
) using (feed_id)
left join (
select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts group by feed_id
) using (feed_id)
left join (
select
feed_id parent_id,
@@ -134,26 +150,12 @@ var (
'+'||abs(refresh_rate+cast(random()%30 as int))||' seconds'
) < datetime(current_timestamp, '+3 minutes')
`
permaban = []string{
"//lublin.se/",
"//enotty.dk/",
}
)
func (f *Feed) Create(ctx context.Context, db db) error {
ctx, span := otel.Span(ctx)
defer span.End()
query, _ := insertFeed(1)
_, err := db.ExecContext(
ctx,
query,
f.FeedID, // feed_id
f.ParentID, // parent_id
f.Nick, // nick
f.URI, // uri
f.State, // state
f.LastScanOn, // last_scan_on
f.RefreshRate, // refresh_rate
)
return err
}
func (f *Feed) Save(ctx context.Context, db db) error {
ctx, span := otel.Span(ctx)
defer span.End()
@@ -161,6 +163,7 @@ func (f *Feed) Save(ctx context.Context, db db) error {
_, err := db.ExecContext(
ctx,
updateFeed,
f.Nick,
f.State, // state
f.LastScanOn, // last_scan_on
f.RefreshRate, // refresh_rate
@@ -234,7 +237,13 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
loadTS := time.Now()
refreshRate := 600
feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI))
feedURI, _ := f.Info().GetN("uri", 0)
feedID := uuid.UrlNS.UUID5(cmp.Or(
feedURI.Value(),
f.Twter().HashingURI,
f.Twter().URI,
))
tx, err := db.BeginTx(ctx, nil)
if err != nil {
@@ -244,18 +253,18 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
followers := f.Info().GetAll("follow")
followMap := make(map[string]string, len(followers))
for _, f := range f.Info().GetAll("follow") {
nick, uri, ok := strings.Cut(f.Value(), "http")
nick, uri, ok := strings.Cut(f.Value(), " ")
if !ok {
continue
}
nick = strings.TrimSpace(nick)
uri = "http" + strings.TrimSpace(uri)
uri = strings.TrimSpace(uri)
if _, err := url.Parse(uri); err != nil {
continue
}
followMap[nick] = uri
followMap[uri] = nick
}
defer tx.Rollback()
@@ -265,13 +274,15 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
args := make([]any, 0, size)
for _, twt := range twts {
mentions := make(uuids, 0, len(twt.Mentions()))
twtID := makeULID(twt)
mentions := make(uuid.UUIDs, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() {
followMap[mention.Twter().Nick] = mention.Twter().URI
mentions = append(mentions, urlNS.UUID5(mention.Twter().URI))
followMap[mention.Twter().URI] = mention.Twter().Nick
mentions = append(mentions, uuid.UrlNS.UUID5(mention.Twter().URI))
}
tags := make(strList, 0, len(twt.Tags()))
tags := make(uuid.List, 0, len(twt.Tags()))
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
@@ -283,13 +294,14 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
subjectTag = tag.Text()
}
}
args = append(
args,
feedID, // feed_id
makeULID(twt), // ulid
twtID, // ulid
fmt.Sprintf("%+l", twt), // text
subjectTag, // conv
twt.Hash(), // hash
subjectTag, // conv
mentions.ToStrList(), // mentions
tags, // tags
)
@@ -322,9 +334,16 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
_, part, ok := strings.Cut(prev.Value(), " ")
if ok {
uri := f.Twter().URI
part = uri[:strings.LastIndex(uri, "/")+1] + part
childID := urlNS.UUID5(part)
if u, ok := f.Info().GetN("url", 0); ok {
uri = u.Value()
}
if u, ok := f.Info().GetN("uri", 0); ok {
uri = u.Value()
}
part = uri[:strings.LastIndex(uri, "/")+1] + part
childID := uuid.UrlNS.UUID5(part)
fmt.Println("found prev", uri, part)
args = append(args,
childID, // feed_id
feedID, // parent_id
@@ -332,20 +351,20 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
part, // uri
"once", // state
nil, // last_scan_on
refreshRate, // refresh_rate
0, // refresh_rate
)
}
}
for nick, uri := range followMap {
for uri, nick := range followMap {
args = append(args,
urlNS.UUID5(uri), // feed_id
nil, // parent_id
nick, // nick
uri, // uri
"warm", // state
nil, // last_scan_on
refreshRate, // refresh_rate
uuid.UrlNS.UUID5(uri), // feed_id
nil, // parent_id
nick, // nick
uri, // uri
"warm", // state
nil, // last_scan_on
refreshRate, // refresh_rate
)
}
for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) {
@@ -361,13 +380,138 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
return tx.Commit()
}
func storeRegistry(ctx context.Context, db db, in io.Reader) error {
ctx, span := otel.Span(ctx)
defer span.End()
twters := make(map[string]string)
args := make([]any, 0, 1024*16)
i := 0
for line := range lextwt.IterRegistry(in) {
twt, ok := line.(*lextwt.Twt)
if !ok {
continue
}
nick := twt.Twter().DomainNick()
uri := twt.Twter().URI
feedID := uuid.UrlNS.UUID5(uri)
twtID := makeULID(twt)
text := fmt.Sprintf("%+l", twt)
// if !strings.HasPrefix(uri, "http") {
// fmt.Println("skip bad uri ", nick, uri)
// continue
// }
// if strings.HasPrefix(nick, "http") {
// fmt.Println("skip bad nick", nick, uri)
// continue
// }
twters[uri] = nick
mentions := make(uuid.UUIDs, 0, len(twt.Mentions()))
for _, mention := range twt.Mentions() {
twters[uri] = nick
mentions = append(mentions, uuid.UrlNS.UUID5(mention.Twter().URI))
}
tags := make(uuid.List, 0, len(twt.Tags()))
for _, tag := range twt.Tags() {
tags = append(tags, tag.Text())
}
subject := twt.Subject()
subjectTag := ""
if tag, ok := subject.Tag().(*lextwt.Tag); ok && tag != nil {
subjectTag = tag.Text()
}
args = append(
args,
feedID, // feed_id
twtID, // ulid
text, // text
twt.Hash(), // hash
subjectTag, // conv
mentions.ToStrList(), // mentions
tags, // tags
)
if len(args) >= 16*1022 {
i += len(args)
fmt.Println("store", i/7, i)
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
for query, args := range chunk(args, insertTwt, db.MaxVariableNumber) {
// fmt.Println("store", len(args))
_, err = tx.ExecContext(
ctx,
query,
args...,
)
if err != nil {
return err
}
}
args = args[:0]
for uri, nick := range twters {
// if !strings.HasPrefix(uri, "http") {
// fmt.Println("skip", nick, uri)
// continue
// }
// if strings.HasPrefix(nick, "http") {
// fmt.Println("skip bad nick", nick, uri)
// continue
// }
feedID := uuid.UrlNS.UUID5(uri)
args = append(args,
feedID, // feed_id
nil, // parent_id
nick, // nick
uri, // uri
PermanentlyDead, // state
nil, // last_scan_on
TenYear, // refresh_rate
)
}
for query, args := range chunk(args, insertFeed, db.MaxVariableNumber) {
_, err = tx.ExecContext(
ctx,
query,
args...,
)
if err != nil {
return err
}
}
args = args[:0]
err = tx.Commit()
if err != nil {
return err
}
}
}
return refreshLastTwt(ctx, db)
}
func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
if strings.Contains(feed.URI, "lublin.se") {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
}
if strings.Contains(feed.URI, "enotty.dk") {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
for _, host := range permaban {
if strings.Contains(feed.URI, host) {
return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
}
}
req, err := http.NewRequestWithContext(ctx, "GET", feed.URI, nil)
@@ -385,6 +529,7 @@ func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
req.Header.Add("If-None-Match", feed.ETag.String)
}
// TODO: this is probably not needed.
if feed.DiscloseFeedURL != "" && feed.DiscloseNick != "" {
req.Header.Set("User-Agent", fmt.Sprintf("xt/%s (+%s; @%s)",
feed.Version, feed.DiscloseFeedURL, feed.DiscloseNick))
@@ -409,8 +554,8 @@ func (n *TwtTime) Scan(value any) error {
n.Time, n.Valid = time.Time{}, false
return nil
case string:
n.Valid = true
n.Time, err = time.Parse(time.RFC3339, value)
n.Valid = err == nil
case time.Time:
n.Valid = true
n.Time = value
@@ -427,15 +572,10 @@ func (n TwtTime) Value() (driver.Value, error) {
}
func makeULID(twt types.Twt) ulid.ULID {
h64 := fnv.New64a()
h16 := fnv.New32a()
text := []byte(fmt.Sprintf("%+l", twt))
b := make([]byte, 10)
copy(b, h16.Sum(text)[:2])
copy(b[2:], h64.Sum(text))
text := fmt.Appendf(nil, "%s\t%+l", twt.Twter().URI, twt)
u := ulid.ULID{}
u.SetTime(ulid.Timestamp(twt.Created()))
u.SetEntropy(b)
u.SetEntropy(sha3.SumSHAKE128(text, 10))
return u
}
@@ -472,3 +612,333 @@ func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[strin
}
}
}
func refreshLastTwt(ctx context.Context, db db) error {
qry := `
delete from last_twt_on;
insert into last_twt_on (feed_id, last_twt_on)
select distinct
feed_id,
max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts
group by feed_id;
delete from twt_mentions;
insert into twt_mentions (ulid, feed_id)
select distinct
ulid,
unhex(replace(trim(value,'{}'),'-','')) feed_id
from twts, json_each(mentions);
`
var err error
for _, stmt := range strings.Split(qry, ";") {
_, err = db.ExecContext(ctx, stmt)
if err != nil {
return err
}
}
return err
}
func fetchTwts(ctx context.Context, db db, uri string, limit int, offset int64) ([]types.Twt, int64, int64, error) {
ctx, span := otel.Span(ctx)
defer span.End()
args := make([]any, 0, 3)
where := `where feed_id in (select feed_id from feeds where state != 'permanantly-dead')`
if uri != "" {
feed_id := uuid.UrlNS.UUID5(uri)
where = "where feed_id = ?"
args = append(args, feed_id)
}
var end int64
err := db.QueryRowContext(ctx, `
select count(*) n from twts `+where+``, args...).Scan(&end)
span.RecordError(err)
if err != nil {
return nil, 0, 0, err
}
if offset < 1 {
offset += end
}
offset = max(1, offset)
args = append(args, limit, offset-int64(limit))
span.AddEvent("twts", trace.WithAttributes(
attribute.Int("limit", limit),
attribute.Int64("offset-end", offset),
attribute.Int64("offset-start", offset-int64(limit)),
attribute.Int64("max", end),
))
qry := `
SELECT
feed_id,
hash,
conv,
coalesce(nick, 'nobody') nick,
coalesce(uri, 'https://empty.txt') uri,
text
FROM twts
join (
select feed_id, nick, uri
from feeds
) using (feed_id)
where rowid in (
select rowid from twts
` + where + `
order by ulid asc
limit ?
offset ?
)
order by ulid asc`
fmt.Println(qry, args)
rows, err := db.QueryContext(
ctx, qry, args...,
)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
twter := types.NewTwter(o.Nick, o.URI)
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
return twts, offset, end, err
}
func fetchUsers(ctx context.Context, db db, uri, q string) ([]types.Twt, error) {
ctx, span := otel.Span(ctx)
defer span.End()
where := `where parent_id is null and state not in ('permanantly-dead', 'frozen') and last_twt_on is not null`
args := make([]any, 0)
if uri != "" {
where = `where feed_id = ? or parent_id = ?`
feed_id := uuid.UrlNS.UUID5(uri)
args = append(args, feed_id, feed_id)
} else if q != "" {
where = `where nick like ?`
args = append(args, "%"+q+"%")
}
qry := `
SELECT
feed_id,
uri,
nick,
last_scan_on,
coalesce(last_twt_on, last_scan_on) last_twt_on
FROM feeds
left join last_twt_on using (feed_id)
` + where + `
order by nick, uri
`
fmt.Println(qry, args)
rows, err := db.QueryContext(ctx, qry, args...)
if err != nil {
span.RecordError(err)
return nil, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
URI string
Nick string
Dt TwtTime
LastTwtOn TwtTime
}
err = rows.Scan(&o.FeedID, &o.URI, &o.Nick, &o.Dt, &o.LastTwtOn)
if err != nil {
span.RecordError(err)
return nil, err
}
twts = append(twts, lextwt.NewTwt(
types.NewTwter(o.Nick, o.URI),
lextwt.NewDateTime(o.Dt.Time, o.LastTwtOn.Time.Format(time.RFC3339)),
nil,
))
}
return twts, nil
}
func fetchMentions(ctx context.Context, db db, mention uuid.UUID, limit int, offset int64) ([]types.Twt, int64, int64, error) {
ctx, span := otel.Span(ctx)
defer span.End()
args := make([]any, 0, 3)
args = append(args, mention)
var end int64
err := db.QueryRowContext(ctx, `
select count(*) n from twt_mentions where feed_id = ?`, args...).Scan(&end)
span.RecordError(err)
if err != nil {
return nil, 0, 0, err
}
fmt.Println(mention.MarshalText(), end, err)
if offset < 1 {
offset += end
}
limit = min(100, max(1, limit))
offset = max(1, offset)
args = append(args, limit, offset-int64(limit))
qry := `
SELECT
feed_id,
hash,
conv,
coalesce(nick, 'nobody') nick,
coalesce(uri, 'https://empty.txt') uri,
text
FROM twts
join (
select feed_id, nick, uri
from feeds
) using (feed_id)
where rowid in (
select rowid from twts
where ulid in (select ulid from twt_mentions where feed_id = ?)
order by ulid asc
limit ?
offset ?
)
order by ulid asc
`
fmt.Println(qry, args)
rows, err := db.QueryContext(
ctx, qry, args...,
)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
twter := types.NewTwter(o.Nick, o.URI)
// o.Text = strings.ReplaceAll(o.Text, "\n", "\u2028")
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
if err := rows.Err(); err != nil {
fmt.Println(err)
return nil, 0, 0, err
}
return twts, offset, end, err
}
func fetchConv(ctx context.Context, db db, hash string, _ int, offset int64) ([]types.Twt, int64, int64, error) {
ctx, span := otel.Span(ctx)
defer span.End()
var end int64
err := db.QueryRowContext(ctx, `
select count(*) n from twts where hash = $1 or conv = $1`, hash).Scan(&end)
span.RecordError(err)
if err != nil {
return nil, 0, 0, err
}
rows, err := db.QueryContext(
ctx, `
SELECT
feed_id,
hash,
conv,
nick,
uri,
text
FROM twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
) using (feed_id)
WHERE
hash = $1 or
conv = $1
order by ulid asc`, hash,
)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return nil, 0, 0, err
}
twter := types.NewTwter(o.Nick, o.URI)
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
err = rows.Err()
return twts, offset, end, err
}

View File

@@ -74,6 +74,7 @@ func NewHTTPFetcher() *httpFetcher {
ForceAttemptHTTP2: false,
MaxIdleConns: 100,
IdleConnTimeout: 10 * time.Second,
ResponseHeaderTimeout: 5 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},

13
go.mod
View File

@@ -1,11 +1,11 @@
module go.sour.is/xt
go 1.23.2
go 1.24.1
require (
github.com/mattn/go-sqlite3 v1.14.24
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0
go.yarn.social/lextwt v0.1.5-0.20250327005027-02d9b44de4dd
go.yarn.social/lextwt v0.1.5-0.20250406192339-cf769bfa2521
)
require (
@@ -21,7 +21,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/protobuf v1.36.5 // indirect
@@ -51,14 +51,13 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0
go.opentelemetry.io/otel/exporters/prometheus v0.57.0
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0
go.opentelemetry.io/otel/sdk v1.35.0
go.opentelemetry.io/otel/sdk/log v0.10.0
go.opentelemetry.io/otel/sdk/metric v1.35.0
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/sync v0.12.0
golang.org/x/sys v0.31.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sync v0.13.0
golang.org/x/sys v0.32.0 // indirect
google.golang.org/grpc v1.70.0 // indirect
)

52
go.sum
View File

@@ -14,8 +14,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
@@ -35,8 +35,6 @@ github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBW
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
@@ -46,8 +44,6 @@ github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
@@ -66,8 +62,6 @@ go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJyS
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0 h1:N+78eXSlu09kii5nkiM+01YbtWe01oZLPPLhNlEKhus=
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0/go.mod h1:/2KhfLAhtQpgnhIk1f+dftA3fuuMcZjiz//Dc9yfaEs=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 h1:QSKmLBzbFULSyHzOdO9JsN9lpE4zkrz1byYGmJecdVE=
@@ -80,67 +74,45 @@ go.opentelemetry.io/otel/exporters/prometheus v0.57.0 h1:AHh/lAP1BHrY5gBwk8ncc25
go.opentelemetry.io/otel/exporters/prometheus v0.57.0/go.mod h1:QpFWz1QxqevfjwzYdbMb4Y1NnlJvqSGwyuU0B4iuc9c=
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0 h1:GKCEAZLEpEf78cUvudQdTg0aET2ObOZRB2HtXA0qPAI=
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0/go.mod h1:9/zqSWLCmHT/9Jo6fYeUDRRogOLL60ABLsHWS99lF8s=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0 h1:czJDQwFrMbOr9Kk+BPo1y8WZIIFIK58SA1kykuVeiOU=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0/go.mod h1:lT7bmsxOe58Tq+JIOkTQMCGXdu47oA+VJKLZHbaBKbs=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 h1:jBpDk4HAUsrnVO1FsfCfCOTEc/MkInJmvfCHYLFiT80=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0/go.mod h1:H9LUIM1daaeZaz91vZcfeM0fejXPmgCYE8ZhzqfJuiU=
go.opentelemetry.io/otel/log v0.10.0 h1:1CXmspaRITvFcjA4kyVszuG4HjA61fPDxMb7q3BuyF0=
go.opentelemetry.io/otel/log v0.10.0/go.mod h1:PbVdm9bXKku/gL0oFfUF4wwsQsOPlpo4VEqjvxih+FM=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
go.opentelemetry.io/otel/sdk/log v0.10.0 h1:lR4teQGWfeDVGoute6l0Ou+RpFqQ9vaPdrNJlST0bvw=
go.opentelemetry.io/otel/sdk/log v0.10.0/go.mod h1:A+V1UTWREhWAittaQEG4bYm4gAZa6xnvVu+xKrIRkzo=
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yarn.social/lextwt v0.0.0-20250213063805-7adc6ca07564 h1:z+IAMtxNKWcLNm9nLzJwHw6OPkV5JoQYmmFohaUvcKI=
go.yarn.social/lextwt v0.0.0-20250213063805-7adc6ca07564/go.mod h1:JOPCOh+3bHv+BMaFZpKzw6soiXbIlZD5b2f7YKDDjqk=
go.yarn.social/lextwt v0.1.5-0.20250327005027-02d9b44de4dd h1:Np3zWtQ0GB9WhRFCPblaItLVtdy8Y35QKL+PUvRR/t8=
go.yarn.social/lextwt v0.1.5-0.20250327005027-02d9b44de4dd/go.mod h1:P36NPegLbhbFa1A0JOLsDyIQcdM0zdmx8kPKACXry4A=
go.yarn.social/lextwt v0.1.5-0.20250406192339-cf769bfa2521 h1:mqjKr+llWXgYtAWNsRx1+S45bhr6AgI0eLFhMhSYds4=
go.yarn.social/lextwt v0.1.5-0.20250406192339-cf769bfa2521/go.mod h1:P36NPegLbhbFa1A0JOLsDyIQcdM0zdmx8kPKACXry4A=
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede h1:XV9tuDQ605xxH4qIQPRHM1bOa7k0rJZ2RqA5kz2Nun4=
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c=
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50=
google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

231
http-api.go Normal file
View File

@@ -0,0 +1,231 @@
package main
import (
"fmt"
"net/http"
"slices"
"sort"
"strconv"
"strings"
"time"
"go.sour.is/xt/internal/otel"
"go.sour.is/xt/internal/uuid"
"go.yarn.social/lextwt"
)
type API struct {
app *appState
db db
hostname string
}
func (a *API) plain(w http.ResponseWriter, r *http.Request) {
reg := lextwt.NewTwtRegistry(mkPreambleDocs(a.hostname), nil)
reg.WriteTo(w)
}
func (a *API) conv(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
hash := r.PathValue("hash")
// if (len(hash) < 6 || len(hash) > 8) && !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") {
// w.WriteHeader(http.StatusBadRequest)
// return
// }
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
var offset int64 = 0
if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v
}
twts, offset, end, err := fetchConv(ctx, a.db, hash, limit, offset)
span.RecordError(err)
if err != nil {
http.Error(w, "ERR", 500)
return
}
preamble := mkPreamble(a.hostname, "", "/api/plain/conv/"+hash, limit, int64(len(twts)), offset, end)
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
}
func (a *API) mentions(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
uri := r.URL.Query().Get("uri")
if uri == "" {
reg := lextwt.NewTwtRegistry(mkPreambleDocs(a.hostname), nil)
reg.WriteTo(w)
return
}
mention := uuid.UrlNS.UUID5(uri)
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
var offset int64 = 0
if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v
}
twts, offset, end, err := fetchMentions(ctx, a.db, mention, limit, offset)
span.RecordError(err)
if err != nil {
http.Error(w, "ERR", 500)
return
}
preamble := mkPreamble(a.hostname, uri, "/api/plain/mentions", limit, int64(len(twts)), offset, end)
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
}
func (a *API) twt(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
uri := r.URL.Query().Get("uri")
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
limit = min(100, max(1, limit))
var offset int64 = 0
if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v
}
twts, offset, end, err := fetchTwts(ctx, a.db, uri, limit, offset)
span.RecordError(err)
if err != nil {
http.Error(w, "ERR", 500)
return
}
preamble := mkPreamble(a.hostname, uri, "/api/plain/twt", limit, int64(len(twts)), offset, end)
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
}
func (a *API) users(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
uri := r.URL.Query().Get("uri")
q := r.URL.Query().Get("q")
twts, err := fetchUsers(ctx, a.db, uri, q)
if err != nil {
http.Error(w, "ERR", 500)
return
}
reg := lextwt.NewTwtRegistry(mkPreambleDocs(a.hostname), twts)
reg.WriteTo(w)
}
func (a *API) queue(w http.ResponseWriter, r *http.Request) {
lis := slices.Collect(a.app.queue.Iter())
sort.Slice(lis, func(i, j int) bool {
return lis[i].NextScanOn.Time.Before(lis[j].LastScanOn.Time)
})
for _, feed := range lis {
fmt.Fprintln(w, feed.State, feed.NextScanOn.Time.Format(time.RFC3339), feed.Nick, feed.URI)
}
}
func notAny(s string, chars string) bool {
for _, c := range s {
if !strings.ContainsRune(chars, c) {
return false
}
}
return true
}
func mkqry(uri string, limit int, offset int64) string {
qry := make([]string, 0, 3)
if uri != "" {
qry = append(qry, "uri="+uri)
}
if limit != 100 {
qry = append(qry, fmt.Sprint("limit=", limit))
}
if offset != 0 {
qry = append(qry, fmt.Sprint("offset=", offset))
}
if len(qry) == 0 {
return ""
}
return "?" + strings.Join(qry, "&")
}
func add(preamble lextwt.Comments, text string, v ...any) lextwt.Comments {
if len(v) > 0 {
text = fmt.Sprintf(text, v...)
}
return append(preamble, lextwt.NewComment("# "+text))
}
func addKey(preamble lextwt.Comments, key, value string, v ...any) lextwt.Comments {
if len(v) > 0 {
value = fmt.Sprintf(value, v...)
}
comment := fmt.Sprintf("# %s = %s", key, value)
return append(preamble, lextwt.NewCommentValue(comment, key, value))
}
func mkPreamble(hostname, uri, path string, limit int, length, offset, end int64) lextwt.Comments {
preamble := addKey(mkPreambleDocs(hostname), "twt range", "1 %d", end)
preamble = addKey(preamble, "self", "%s%s%s", hostname, path, mkqry(uri, limit, offset))
if next := offset + length; next < end {
preamble = addKey(preamble, "next", "%s%s%s", hostname, path, mkqry(uri, limit, next))
}
if prev := offset - int64(limit); prev > 0 {
preamble = addKey(preamble, "prev", "%s%s%s", hostname, path, mkqry(uri, limit, prev))
}
return preamble
}
var mkPreambleDocs = func(hostname string) lextwt.Comments {
c := add(nil, iAmTheWatcher)
c = add(c, "")
c = add(c, "Usage:")
c = add(c, " %s/api/plain/users View list of users and latest twt date.", hostname)
c = add(c, " %s/api/plain/twt View all twts.", hostname)
c = add(c, " %s/api/plain/mentions?uri=:uri View all mentions for uri.", hostname)
c = add(c, " %s/api/plain/conv/:hash View all twts for a conversation subject.", hostname)
c = add(c, "")
c = add(c, "Options:")
c = add(c, " uri Filter to show a specific users twts.")
c = add(c, " offset Start index for quey.")
c = add(c, " limit Count of items to return (going back in time).")
return add(c, "")
}

266
http-html.go Normal file
View File

@@ -0,0 +1,266 @@
package main
import (
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"go.sour.is/xt/internal/otel"
"go.yarn.social/lextwt"
"go.yarn.social/types"
)
type HTML struct {
app *appState
db db
hostname string
}
func (a *HTML) healthcheck(w http.ResponseWriter, r *http.Request) {
_, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte("ok"))
}
func (a *HTML) home(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/html; charset=utf-8")
uri := r.URL.Query().Get("uri")
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
limit = min(100, max(1, limit))
var offset int64 = 0
if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v
}
twts, offset, end, err := fetchTwts(ctx, a.db, uri, limit, offset)
span.RecordError(err)
if err != nil {
http.Error(w, "ERR", 500)
return
}
preamble := mkPreamble(a.hostname, uri, "", limit, int64(len(twts)), offset, end)
reg := &HTWriter{
lextwt.NewTwtRegistry(preamble, reverse(twts)),
}
reg.WriteTo(w)
}
func (a *HTML) conv(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/html; charset=utf-8")
hash := r.PathValue("hash")
// if (len(hash) < 6 || len(hash) > 8) && !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") {
// w.WriteHeader(http.StatusBadRequest)
// return
// }
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
var offset int64 = 0
if v, ok := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64); ok == nil {
offset = v
}
twts, offset, end, err := fetchConv(ctx, a.db, hash, limit, offset)
span.RecordError(err)
if err != nil {
http.Error(w, "ERR", 500)
return
}
preamble := mkPreamble(a.hostname, "", "/conv/"+hash, limit, int64(len(twts)), offset, end)
reg := &HTWriter{
lextwt.NewTwtRegistry(preamble, twts),
}
reg.WriteTo(w)
}
type reg interface {
Preamble() lextwt.Comments
Twts() types.Twts
}
type HTWriter struct {
reg
}
func (r *HTWriter) WriteTo(w io.Writer) (int64, error) {
var output int64
i, err := fmt.Fprintln(w, `<!DOCTYPE html>
<html>
<head>
<title>The Watcher</title>
<style>
@media screen and (max-width: 500px) {
body { width: 100%; margin: 0; }
}
@media screen and (min-width: 500px) and (max-width: 940px) {
body { width: 90%; margin: auto; }
.h-card { columns: 2; }
}
@media screen and (min-width: 940px) {
body { width: 70%; margin: auto; }
.h-card { columns: 2; }
}
body { font-family: sans-serif; background: black; color: white; }
a { color: cornflowerblue; text-decoration: none; }
main { }
pre { white-space: pre-wrap; }
pre.preamble { color: green; }
article { background-color: #333; border: 1px solid green; border-radius: 4px; padding: 4px; margin: 2px; }
article pre { color: orange; }
.h-card .author { display: flex; }
.h-card .icon { width: 36px; margin: 4px; }
.h-card .u-photo { width: 32px; }
.p-org a { color: darkgrey; }
.h-card .date { text-align: right;}
video { width: 100%; }
section { padding: 1em; border: 1px solid darkgreen; background-color: #111; }
section img { max-width: 100%; }
</style>
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body onload="setTimestamps()">
<pre class='preamble'>
`)
output += int64(i)
if err != nil {
return output, err
}
for _, c := range r.Preamble() {
if key := c.Key(); key != "" {
value := mkValue(c.Value())
i, err = fmt.Fprintf(w, "# %s = %s\n", key, value)
} else {
i, err = fmt.Fprintln(w, c.Text())
}
output += int64(i)
if err != nil {
return output, err
}
}
i, err = fmt.Fprintln(w, "</pre><main>")
output += int64(i)
for _, twt := range r.Twts() {
twter := twt.Twter()
uri, err := url.Parse(twter.URI)
if err != nil {
uri = &url.URL{
Scheme: "HTTPS",
Host: "unknown.txt",
}
}
subject := ""
if s := twt.Subject(); s != nil {
if tag, ok := s.Tag().(*lextwt.Tag); ok && tag != nil {
subject = tag.Text()
}
}
i, err = fmt.Fprintf(w, `
<article>
<header class="u-author h-card">
<div class="author">
<div class="icon">
<a href="%s" class="u-url">
<img class="avatar u-photo" src="%s" alt="" loading="lazy">
</a>
</div>
<div class="author-name">
<div class="p-name">
<a href="%s">%s</a>
</div>
<div class="p-org">
<a target="_blank" href="%s">%s</a>
</div>
</div>
</div>
<div class="date">
<div><a class="u-url" href="%s">%s</a></div>
<div><small><a href="%s"> View Thread</a></small></div>
</div>
</header>
<section>
%-h
</section>
</div>
</article>
`, "/?uri="+twter.URI, twter.Avatar,
"/?uri="+twter.URI, twter.Nick,
twter.URI, uri.Host,
"/conv/"+subject, fmt.Sprintf("<time datetime='%s'>%s</time>", twt.Created().Format(time.RFC3339), twt.Created().Format(time.RFC822)),
"/conv/"+subject,
twt,
)
output += int64(i)
if err != nil {
return output, err
}
}
i, err = fmt.Fprintln(w, `</main>
<script>
function setTimestamps() {
document.querySelectorAll("time").forEach((e) => {
const OneDay = new Date(new Date().getTime() - (1 * 24 * 60 * 60 * 1000))
const d = new Date(e.hasAttributes() && e.attributes.getNamedItem('datetime'). value);
if (d > OneDay)
e.innerHTML = d.toLocaleTimeString(navigator.language, { hour: '2-digit', minute: '2-digit' });
else
e.innerHTML = d.toLocaleDateString(navigator.language, { hour: '2-digit', minute: '2-digit' });
});
}
</script>
</body>`)
output += int64(i)
return output, err
}
func mkValue(v string) string {
if strings.HasPrefix(v, "http") {
return fmt.Sprintf(`<a href="%s">%s</a>`, v, v)
}
return v
}
func reverse[T any](s []T) []T {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
return s
}

270
http.go
View File

@@ -1,23 +1,18 @@
package main
import (
"context"
"errors"
"fmt"
"net/http"
"slices"
"sort"
"strconv"
"strings"
"time"
"go.yarn.social/lextwt"
"go.yarn.social/types"
"go.sour.is/xt/internal/otel"
)
func httpServer(c *console, app *appState) error {
ctx, span := otel.Span(c)
const iAmTheWatcher = "I am the Watcher. I am your guide through this vast new twtiverse."
func httpServer(ctx context.Context, app *appState) error {
ctx, span := otel.Span(ctx)
defer span.End()
span.AddEvent("start http server")
@@ -28,241 +23,37 @@ func httpServer(c *console, app *appState) error {
return err
}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, span := otel.Span(r.Context())
defer span.End()
api := API{
app: app,
db: db,
hostname: app.args.Hostname,
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Write([]byte("ok"))
})
html := HTML{
app: app,
db: db,
hostname: app.args.Hostname,
}
http.HandleFunc("/api/plain/conv/{hash}", func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
http.HandleFunc("/health", html.healthcheck)
hash := r.PathValue("hash")
if (len(hash) < 6 || len(hash) > 8) && !notAny(hash, "abcdefghijklmnopqrstuvwxyz234567") {
w.WriteHeader(http.StatusBadRequest)
return
}
http.HandleFunc("/", html.home)
http.HandleFunc("/conv/{hash}", html.conv)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
rows, err := db.QueryContext(
ctx,
`SELECT
feed_id,
hash,
conv,
nick,
uri,
text
FROM twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
) using (feed_id)
WHERE
hash = $1 or
conv = $1
order by ulid asc`,
hash,
)
if err != nil {
span.RecordError(err)
return
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return
}
twter := types.NewTwter(o.Nick, o.URI)
o.Text = strings.ReplaceAll(o.Text, "\n", "\u2028")
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
var preamble lextwt.Comments
preamble = append(preamble, lextwt.NewComment("# self = /conv/"+hash))
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
})
http.HandleFunc("/api/plain/twt", func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
limit := 100
if v, ok := strconv.Atoi(r.URL.Query().Get("limit")); ok == nil {
limit = v
}
offset := 0
if v, ok := strconv.Atoi(r.URL.Query().Get("offset")); ok == nil {
offset = v
}
args := []any{limit, offset}
uriqry := ""
if u := r.URL.Query().Get("uri"); u != "" {
feed_id := urlNS.UUID5(u)
uriqry = "and feed_id = ?"
args = append([]any{feed_id}, args...)
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
rows, err := db.QueryContext(
ctx,
`SELECT
feed_id,
hash,
conv,
nick,
uri,
text
FROM twts
JOIN (
SELECT
feed_id,
nick,
uri
FROM feeds
where state not in ('frozen', 'permanantly-dead')
`+uriqry+`
) using (feed_id)
order by ulid desc
limit ?
offset ?
`, args...,
)
if err != nil {
span.RecordError(err)
return
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
Hash string
Conv string
Dt string
Nick string
URI string
Text string
}
err = rows.Scan(&o.FeedID, &o.Hash, &o.Conv, &o.Nick, &o.URI, &o.Text)
if err != nil {
span.RecordError(err)
return
}
twter := types.NewTwter(o.Nick, o.URI)
o.Text = strings.ReplaceAll(o.Text, "\n", "\u2028")
twt, _ := lextwt.ParseLine(o.Text, &twter)
twts = append(twts, twt)
}
var preamble lextwt.Comments
preamble = append(preamble, lextwt.NewComment("# self = /api/plain/twts"))
reg := lextwt.NewTwtRegistry(preamble, twts)
reg.WriteTo(w)
})
http.HandleFunc("/api/plain/users", func(w http.ResponseWriter, r *http.Request) {
ctx, span := otel.Span(r.Context())
defer span.End()
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
where := `where parent_id is null and state not in ('permanantly-dead', 'frozen') and last_twt_on is not null`
args := make([]any, 0)
if uri := r.URL.Query().Get("uri"); uri != "" {
where = `where feed_id = ? or parent_id = ?`
feed_id := urlNS.UUID5(uri)
args = append(args, feed_id, feed_id)
}
rows, err := db.QueryContext(
ctx,
`SELECT
feed_id,
uri,
nick,
last_scan_on,
last_twt_on
FROM feeds
left join (
select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, ' ')-1)))) last_twt_on
from twts group by feed_id
) using (feed_id)
`+where+`
order by nick, uri
`,args...,
)
if err != nil {
span.RecordError(err)
return
}
defer rows.Close()
var twts []types.Twt
for rows.Next() {
var o struct {
FeedID string
URI string
Nick string
Dt TwtTime
LastTwtOn TwtTime
}
err = rows.Scan(&o.FeedID, &o.URI, &o.Nick, &o.Dt, &o.LastTwtOn)
if err != nil {
span.RecordError(err)
return
}
twts = append(twts, lextwt.NewTwt(
types.NewTwter(o.Nick, o.URI),
lextwt.NewDateTime(o.Dt.Time, o.LastTwtOn.Time.Format(time.RFC3339)),
nil,
))
}
reg := lextwt.NewTwtRegistry(nil, twts)
reg.WriteTo(w)
})
http.HandleFunc("/api/plain/queue", func(w http.ResponseWriter, r *http.Request) {
lis := slices.Collect(app.queue.Iter())
sort.Slice(lis, func(i, j int) bool {
return lis[i].NextScanOn.Time.Before(lis[j].LastScanOn.Time)
})
for _, feed := range lis {
fmt.Fprintln(w, feed.State, feed.NextScanOn.Time.Format(time.RFC3339), feed.Nick, feed.URI)
}
})
http.HandleFunc("/api/plain", api.plain)
http.HandleFunc("/api/plain/conv/{hash}", api.conv)
http.HandleFunc("/api/plain/mentions", api.mentions)
http.HandleFunc("/api/plain/twt", api.twt)
http.HandleFunc("/api/plain/tweets", api.twt)
http.HandleFunc("/api/plain/users", api.users)
http.HandleFunc("/api/plain/queue", api.queue)
srv := &http.Server{
Addr: app.args.Listen,
Handler: http.DefaultServeMux,
}
c.AddCancel(srv.Shutdown)
app.AddCancel(srv.Shutdown)
err = srv.ListenAndServe()
if !errors.Is(err, http.ErrServerClosed) {
span.RecordError(err)
@@ -271,12 +62,3 @@ func httpServer(c *console, app *appState) error {
return nil
}
func notAny(s string, chars string) bool {
for _, c := range s {
if !strings.ContainsRune(chars, c) {
return false
}
}
return true
}

View File

@@ -24,3 +24,18 @@ create table if not exists twts (
primary key (feed_id, ulid)
);
create table if not exists last_twt_on(
feed_id blob,
last_twt_on,
primary key (feed_id)
);
CREATE INDEX if not exists twt_time on twts (ulid asc);
create table if not exists twt_mentions(
feed_id blob,
ulid blob,
primary key (feed_id, ulid)
);
CREATE INDEX if not exists twt_mention on twt_mentions (ulid asc);

View File

@@ -0,0 +1,61 @@
package console
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"time"
)
type C[A any] struct {
io.Reader
io.Writer
err io.Writer
args A
abort func()
cancelfns []func(context.Context) error
}
func New[A any](args A) (context.Context, *C[A]) {
ctx := context.Background()
ctx, abort := context.WithCancel(ctx)
ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
go func() { <-ctx.Done(); stop() }() // restore interrupt function
console := &C[A]{Reader: os.Stdin, Writer: os.Stdout, err: os.Stderr, args: args, abort: abort}
return ctx, console
}
func (c *C[A]) Args() A {
return c.args
}
func (c *C[A]) Shutdown() error {
fmt.Fprintln(c.err, "shutting down ", len(c.cancelfns), " cancel functions...")
defer fmt.Fprintln(c.err, "done")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c.abort()
var err error
for _, fn := range c.cancelfns {
err = errors.Join(err, fn(ctx))
}
return err
}
func (c *C[A]) AddCancel(fn func(context.Context) error) { c.cancelfns = append(c.cancelfns, fn) }
func (c *C[A]) IfFatal(err error) {
if err == nil {
return
}
fmt.Fprintln(c.err, err)
err = c.Shutdown()
if err != nil {
fmt.Fprintln(c.err, err)
}
os.Exit(1)
}

54
internal/env/env.go vendored Normal file
View File

@@ -0,0 +1,54 @@
package env
import (
"bufio"
"os"
"strings"
)
func Default(key, def string) string {
if v, ok := os.LookupEnv(key); ok {
return v
}
return def
}
type secret struct {
value string
}
func (s secret) Secret() string {
return s.value
}
func (s secret) String() string {
return "***"
}
func Secret(key, def string) secret {
if v, ok := os.LookupEnv(key); ok {
return secret{v}
}
return secret{def}
}
func DotEnv() {
fd, err := os.Open(".env")
if err != nil {
return
}
scan := bufio.NewScanner(fd)
for scan.Scan() {
line := scan.Text()
if strings.HasPrefix(line, "#") {
continue
}
key, val, ok := strings.Cut(line, "=")
if !ok {
continue
}
os.Setenv(strings.TrimSpace(key), strings.TrimSpace(val))
}
}

View File

@@ -189,7 +189,25 @@ func newTraceProvider(ctx context.Context, name string) (func(context.Context) e
return nil, err
}
if v := env("XT_TRACER", ""); v != "" {
if v := env("XT_TRACER", ""); v == "stdout" {
traceExporter, err := stdouttrace.New(
stdouttrace.WithWriter(os.Stderr),
stdouttrace.WithPrettyPrint(),
)
if err != nil {
return nil, err
}
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithResource(r),
sdktrace.WithBatcher(traceExporter,
// Default is 5s. Set to 1s for demonstrative purposes.
sdktrace.WithBatchTimeout(time.Second)),
)
otel.SetTracerProvider(tracerProvider)
return tracerProvider.Shutdown, nil
} else if v != "" {
fmt.Println("use tracer", v)
exp, err := otlptracegrpc.New(
ctx,
@@ -210,26 +228,11 @@ func newTraceProvider(ctx context.Context, name string) (func(context.Context) e
}, nil
}
traceExporter, err := stdouttrace.New(
stdouttrace.WithWriter(os.Stderr),
stdouttrace.WithPrettyPrint(),
)
if err != nil {
return nil, err
}
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithResource(r),
sdktrace.WithBatcher(traceExporter,
// Default is 5s. Set to 1s for demonstrative purposes.
sdktrace.WithBatchTimeout(time.Second)),
)
otel.SetTracerProvider(tracerProvider)
return tracerProvider.Shutdown, nil
return func(ctx context.Context) error { return nil }, nil
}
func newMeterProvider(ctx context.Context, name string) (func(context.Context) error, error) {
_, _ = ctx, name
// metricExporter, err := stdoutmetric.New()
// if err != nil {
// return nil, err
@@ -281,7 +284,25 @@ func newLoggerProvider(ctx context.Context, name string) (func(context.Context)
return nil, err
}
if v := env("XT_LOGGER", ""); v != "" {
if v := env("XT_LOGGER", ""); v == "stdout" {
logExporter, err := stdoutlog.New(
stdoutlog.WithPrettyPrint(),
stdoutlog.WithWriter(os.Stderr),
)
if err != nil {
return nil, err
}
loggerProvider := log.NewLoggerProvider(
log.WithProcessor(
log.NewBatchProcessor(logExporter),
),
log.WithResource(r),
)
global.SetLoggerProvider(loggerProvider)
return loggerProvider.Shutdown, nil
} else if v != "" {
fmt.Println("use logger", v)
exp, err := otlploghttp.New(
@@ -301,28 +322,9 @@ func newLoggerProvider(ctx context.Context, name string) (func(context.Context)
global.SetLoggerProvider(provider)
return processor.Shutdown, nil
}
// return func(ctx context.Context) error { return nil }, nil
logExporter, err := stdoutlog.New(
stdoutlog.WithPrettyPrint(),
stdoutlog.WithWriter(os.Stderr),
)
if err != nil {
return nil, err
}
loggerProvider := log.NewLoggerProvider(
log.WithProcessor(
log.NewBatchProcessor(logExporter),
),
log.WithResource(r),
)
global.SetLoggerProvider(loggerProvider)
return loggerProvider.Shutdown, nil
return func(ctx context.Context) error { return nil }, nil
}
func env(key, def string) string {

View File

@@ -1,4 +1,4 @@
package main
package uuid
import (
"crypto/sha1"
@@ -8,54 +8,54 @@ import (
"strings"
)
type uuid [16]byte
type UUID [16]byte
var urlNS = uuid{0x6b, 0xa7, 0xb8, 0x10, 0x9d, 0xad, 0x11, 0xd1, 0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8}
var UrlNS = UUID{0x6b, 0xa7, 0xb8, 0x10, 0x9d, 0xad, 0x11, 0xd1, 0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8}
func (u uuid) UUID5(value string) uuid {
func (u UUID) UUID5(value string) UUID {
h := sha1.New()
h.Write(u[:])
h.Write([]byte(value))
return uuid(h.Sum(nil))
return UUID(h.Sum(nil))
}
func (u *uuid) UnmarshalText(data string) error {
func (u *UUID) UnmarshalText(data string) error {
data = strings.Trim(data, "{}")
data = strings.ReplaceAll(data, "-", "")
_, err := hex.Decode(u[:], []byte(data))
return err
}
func (u uuid) MarshalText() string {
func (u UUID) MarshalText() string {
s := hex.EncodeToString(u[:])
return fmt.Sprintf("{%s-%s-%s-%s-%s}", s[:8], s[8:12], s[12:16], s[16:20], s[20:])
}
func (u uuid) Value() (driver.Value, error) {
func (u UUID) Value() (driver.Value, error) {
return u[:], nil
}
func (u *uuid) Scan(value any) error {
func (u *UUID) Scan(value any) error {
if value == nil {
return nil
}
*u = uuid(value.([]byte))
*u = UUID(value.([]byte))
return nil
}
type uuids []uuid
type UUIDs []UUID
func (lis uuids) ToStrList() strList {
arr := make(strList, len(lis))
func (lis UUIDs) ToStrList() List {
arr := make(List, len(lis))
for i, v := range lis {
arr[i] = v.MarshalText()
}
return arr
}
type strList []string
type List []string
func (l *strList) Scan(value any) error {
func (l *List) Scan(value any) error {
s := value.(string)
s = strings.Trim(s, "[]")
for _, v := range strings.Split(s, ",") {
@@ -67,7 +67,7 @@ func (l *strList) Scan(value any) error {
return nil
}
func (l strList) Value() (driver.Value, error) {
func (l List) Value() (driver.Value, error) {
arr := make([]string, len(l))
for i, v := range l {
arr[i] = "\"" + v + "\""

143
main.go
View File

@@ -1,111 +1,16 @@
package main
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"strings"
"go.opentelemetry.io/otel/metric"
"go.sour.is/xt/internal/console"
"go.sour.is/xt/internal/env"
"go.sour.is/xt/internal/otel"
)
const name = "go.sour.is/xt"
var m_up metric.Int64Gauge
func main() {
dotEnv() // load .env
ctx, console := newConsole(args{
dbtype: env("XT_DBTYPE", "sqlite3"),
dbfile: env("XT_DBFILE", "file:twt.db"),
baseFeed: env("XT_BASE_FEED", "feed"),
Nick: env("XT_NICK", "xuu"),
URI: env("XT_URI", "https://txt.sour.is/user/xuu/twtxt.txt"),
Listen: env("XT_LISTEN", ":8080"),
})
finish, err := otel.Init(ctx, name)
console.IfFatal(err)
console.AddCancel(finish)
m_up, err = otel.Meter().Int64Gauge("up")
console.IfFatal(err)
m_up.Record(ctx, 1)
defer m_up.Record(context.Background(), 0)
err = run(console)
if !errors.Is(err, context.Canceled) {
console.IfFatal(err)
}
}
type console struct {
io.Reader
io.Writer
err io.Writer
context.Context
abort func()
cancelfns []func(context.Context) error
}
func newConsole(args args) (context.Context, *console) {
ctx := context.Background()
ctx, abort := context.WithCancel(ctx)
ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
go func() { <-ctx.Done(); stop() }() // restore interrupt function
console := &console{Reader: os.Stdin, Writer: os.Stdout, err: os.Stderr, Context: ctx, abort: abort}
console.Set("console", console)
console.Set("args", args)
return ctx, console
}
func (c *console) Args() args {
v, ok := c.Get("args").(args)
if !ok {
return args{}
}
return v
}
func (c *console) Shutdown() error {
fmt.Fprintln(c.err, "shutting down ", len(c.cancelfns), " cancel functions...")
defer fmt.Fprintln(c.err, "done")
c.abort()
var err error
for _, fn := range c.cancelfns {
err = errors.Join(err, fn(c.Context))
}
return err
}
func (c *console) AddCancel(fn func(context.Context) error) { c.cancelfns = append(c.cancelfns, fn) }
func (c *console) IfFatal(err error) {
if err == nil {
return
}
fmt.Fprintln(c.err, err)
c.abort()
os.Exit(1)
}
type contextKey struct{ name string }
func (c *console) Set(name string, value any) {
c.Context = context.WithValue(c.Context, contextKey{name}, value)
}
func (c *console) Get(name string) any {
return c.Context.Value(contextKey{name})
}
type args struct {
dbtype string
dbfile string
@@ -113,34 +18,34 @@ type args struct {
Nick string
URI string
Listen string
Hostname string
}
func env(key, def string) string {
if v, ok := os.LookupEnv(key); ok {
return v
}
return def
}
func main() {
env.DotEnv() // load .env
func dotEnv() {
fd, err := os.Open(".env")
if err != nil {
return
}
ctx, console := console.New(args{
dbtype: env.Default("XT_DBTYPE", "sqlite3"),
dbfile: env.Default("XT_DBFILE", "file:twt.db"),
baseFeed: env.Default("XT_BASE_FEED", ""),
Nick: env.Default("XT_NICK", "xuu"),
URI: env.Default("XT_URI", "https://txt.sour.is/user/xuu/twtxt.txt"),
Listen: env.Default("XT_LISTEN", ":8080"),
Hostname: env.Default("XT_HOSTNAME", "https://watcher.sour.is"),
})
scan := bufio.NewScanner(fd)
finish, err := otel.Init(ctx, name)
console.IfFatal(err)
console.AddCancel(finish)
for scan.Scan() {
line := scan.Text()
m_up, err := otel.Meter().Int64Gauge("up")
console.IfFatal(err)
if strings.HasPrefix(line, "#") {
continue
}
key, val, ok := strings.Cut(line, "=")
if !ok {
continue
}
m_up.Record(ctx, 1)
defer m_up.Record(context.Background(), 0)
os.Setenv(strings.TrimSpace(key), strings.TrimSpace(val))
err = run(ctx, console)
if !errors.Is(err, context.Canceled) {
console.IfFatal(err)
}
}

View File

@@ -5,8 +5,6 @@ import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"time"
@@ -26,8 +24,8 @@ const (
TwoMinutes = 120
)
func feedRefreshProcessor(c *console, app *appState) error {
ctx, span := otel.Span(c.Context)
func feedRefreshProcessor(ctx context.Context, app *appState) error {
ctx, span := otel.Span(ctx)
defer span.End()
sleeping_time, _ := otel.Meter().Int64Counter("xt_feed_sleep")
@@ -38,7 +36,7 @@ func feedRefreshProcessor(c *console, app *appState) error {
fetch, close := NewFuncPool(ctx, 40, f.Fetch)
defer close()
db, err := app.DB(c)
db, err := app.DB(ctx)
if err != nil {
span.RecordError(err)
return err
@@ -69,7 +67,7 @@ func feedRefreshProcessor(c *console, app *appState) error {
span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TwoMinutes))))
select {
case <-time.After(TwoMinutes * time.Second):
case <-c.Done():
case <-ctx.Done():
return nil
}
span.End()
@@ -127,6 +125,8 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Minute):
refreshLastTwt(ctx, db)
case res := <-fetch.Out():
f := res.Request
span.AddEvent("got response", trace.WithAttributes(
@@ -162,7 +162,7 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true
span.AddEvent("read feed")
cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
// cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
@@ -174,10 +174,12 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
continue
}
rdr := io.TeeReader(res.Body, cpy)
var rdr io.Reader = res.Body
// rdr := io.TeeReader(rdr, cpy)
rdr = lextwt.TwtFixer(rdr)
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
cpy.Close()
//cpy.Close()
res.Body.Close()
if err != nil {
span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
@@ -206,6 +208,10 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
continue
}
if nick := twtfile.Twter().Nick; nick != "" {
f.Nick = nick
}
f.RefreshRate, f.State = checkTemp(twtfile.Twts())
f.LastError.String = ""
@@ -227,31 +233,31 @@ func checkTemp(twts types.Twts) (int, State) {
since_first := -time.Until(twts[0].Created())
since_fifth := -time.Until(twts[4].Created())
if since_first < 2*time.Hour || since_fifth < 8*time.Hour {
if since_first < 24*time.Hour || since_fifth < 32*time.Hour {
return TwoMinutes, "hot"
}
if since_first < 4*time.Hour || since_fifth < 16*time.Hour {
if since_first < 48*time.Hour || since_fifth < 64*time.Hour {
return TenMinutes, "hot"
}
if since_first < 8*time.Hour || since_fifth < 32*time.Hour {
if since_first < 96*time.Hour || since_fifth < 128*time.Hour {
return 2 * TenMinutes, "warm"
}
if since_first < 16*time.Hour || since_fifth < 64*time.Hour {
if since_first < 192*time.Hour || since_fifth < 256*time.Hour {
return 4 * TenMinutes, "warm"
}
if since_first < 24*time.Hour || since_fifth < 128*time.Hour {
if since_first < 384*time.Hour || since_fifth < 512*time.Hour {
return OneDay, "cold"
}
if since_first < 48*time.Hour || since_fifth < 256*time.Hour {
if since_first < 768*time.Hour || since_fifth < 1024*time.Hour {
return 2 * OneDay, "cold"
}
if since_first < 96*time.Hour || since_fifth < 512*time.Hour {
if since_first < 1536*time.Hour || since_fifth < 2048*time.Hour {
return 7 * OneDay, "frozen"
}