diff --git a/.gitignore b/.gitignore index 24a5a9f..1949f0b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ feed __debug* feeds/ +/xt +.env diff --git a/feed.go b/feed.go index 51a0d8a..27b04bd 100644 --- a/feed.go +++ b/feed.go @@ -14,6 +14,7 @@ import ( _ "embed" + "go.sour.is/xt/internal/otel" "go.yarn.social/lextwt" "go.yarn.social/types" ) @@ -76,7 +77,10 @@ var ( last_modified_on, last_etag from feeds - where datetime(last_scan_on, '+'||refresh_rate||' seconds') < datetime(current_timestamp, '+10 minutes') + where datetime( + coalesce(last_scan_on, '1901-01-01'), + '+'||refresh_rate||' seconds' + ) < datetime(current_timestamp, '+10 minutes') ` updateFeed = ` update feeds set @@ -90,7 +94,9 @@ var ( ) func (f *Feed) Save(ctx context.Context, db *sql.DB) error { - fmt.Println(f.FetchURI, " ", f.LastModified, " ", f.LastError) + ctx, span := otel.Span(ctx) + defer span.End() + _, err := db.ExecContext( ctx, updateFeed, @@ -135,6 +141,8 @@ func (f *Feed) Scan(res interface{ Scan(...any) error }) error { } func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) { + ctx, span := otel.Span(ctx) + var err error var res *sql.Rows @@ -145,6 +153,8 @@ func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) { } return func(yield func(Feed) bool) { + defer span.End() + for res.Next() { var f Feed err = f.Scan(res) @@ -158,13 +168,16 @@ func loadFeeds(ctx context.Context, db *sql.DB) (iter.Seq[Feed], error) { }, err } -func storeFeed(db *sql.DB, f types.TwtFile) error { +func storeFeed(ctx context.Context, db *sql.DB, f types.TwtFile) error { + ctx, span := otel.Span(ctx) + defer span.End() + loadTS := time.Now() refreshRate := 600 feedID := urlNS.UUID5(cmp.Or(f.Twter().HashingURI, f.Twter().URI)) - tx, err := db.Begin() + tx, err := db.BeginTx(ctx, nil) if err != nil { return err } @@ -188,7 +201,8 @@ func storeFeed(db *sql.DB, f types.TwtFile) error { defer tx.Rollback() - _, err = tx.Exec( + _, err = tx.ExecContext( + ctx, insertFeed, feedID, f.Twter().HashingURI, @@ -220,7 +234,8 @@ func storeFeed(db *sql.DB, f types.TwtFile) error { } } - _, err = tx.Exec( + _, err = tx.ExecContext( + ctx, insertTwt, feedID, twt.Hash(), @@ -236,7 +251,8 @@ func storeFeed(db *sql.DB, f types.TwtFile) error { } for nick, uri := range followMap { - _, err = tx.Exec( + _, err = tx.ExecContext( + ctx, insertFeed, urlNS.UUID5(uri), uri, diff --git a/go.mod b/go.mod index 358000c..f453882 100644 --- a/go.mod +++ b/go.mod @@ -4,25 +4,59 @@ go 1.23.2 require ( github.com/mattn/go-sqlite3 v1.14.24 - go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51 + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 + go.yarn.social/lextwt v0.0.0-20250213063805-7adc6ca07564 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect + go.opentelemetry.io/proto/otlp v1.5.0 // indirect + golang.org/x/net v0.34.0 // indirect + golang.org/x/text v0.22.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect + google.golang.org/protobuf v1.36.3 // indirect ) require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/otel/metric v1.34.0 // indirect - go.opentelemetry.io/otel/trace v1.34.0 // indirect + go.opentelemetry.io/otel/log v0.10.0 + go.opentelemetry.io/otel/metric v1.34.0 + go.opentelemetry.io/otel/trace v1.34.0 ) require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/matryer/is v1.4.1 + github.com/prometheus/client_golang v1.20.5 github.com/sirupsen/logrus v1.9.3 // indirect + github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2 github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect + go.opentelemetry.io/contrib/bridges/otelslog v0.9.0 go.opentelemetry.io/otel v1.34.0 - go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 - golang.org/x/crypto v0.27.0 // indirect - golang.org/x/sys v0.25.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 + go.opentelemetry.io/otel/sdk v1.34.0 + go.opentelemetry.io/otel/sdk/log v0.10.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect + go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede + golang.org/x/crypto v0.33.0 // indirect + golang.org/x/sync v0.11.0 + golang.org/x/sys v0.30.0 // indirect + google.golang.org/grpc v1.70.0 // indirect ) diff --git a/go.sum b/go.sum index 0f9aef5..2ca56a6 100644 --- a/go.sum +++ b/go.sum @@ -1,46 +1,118 @@ -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= -github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ= github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2 h1:ZjUj9BLYf9PEqBn8W/OapxhPjVRdC6CsXTdULHsyk5c= +github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2/go.mod h1:O8bHQfyinKwTXKkiKNGmLQS7vRsqRxIQTFZpYpHK3IQ= github.com/writeas/go-strip-markdown/v2 v2.1.1 h1:hAxUM21Uhznf/FnbVGiJciqzska6iLei22Ijc3q2e28= github.com/writeas/go-strip-markdown/v2 v2.1.1/go.mod h1:UvvgPJgn1vvN8nWuE5e7v/+qmDu3BSVnKAB6Gl7hFzA= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/bridges/otelslog v0.9.0 h1:N+78eXSlu09kii5nkiM+01YbtWe01oZLPPLhNlEKhus= +go.opentelemetry.io/contrib/bridges/otelslog v0.9.0/go.mod h1:/2KhfLAhtQpgnhIk1f+dftA3fuuMcZjiz//Dc9yfaEs= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 h1:QSKmLBzbFULSyHzOdO9JsN9lpE4zkrz1byYGmJecdVE= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0/go.mod h1:sTQ/NH8Yrirf0sJ5rWqVu+oT82i4zL9FaF6rWcqnptM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0uaNS4c98WRNUEx5U3aDlrDOI5Rs+1Vifcw4DJ8U= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0 h1:GKCEAZLEpEf78cUvudQdTg0aET2ObOZRB2HtXA0qPAI= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0/go.mod h1:9/zqSWLCmHT/9Jo6fYeUDRRogOLL60ABLsHWS99lF8s= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0 h1:czJDQwFrMbOr9Kk+BPo1y8WZIIFIK58SA1kykuVeiOU= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0/go.mod h1:lT7bmsxOe58Tq+JIOkTQMCGXdu47oA+VJKLZHbaBKbs= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 h1:jBpDk4HAUsrnVO1FsfCfCOTEc/MkInJmvfCHYLFiT80= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0/go.mod h1:H9LUIM1daaeZaz91vZcfeM0fejXPmgCYE8ZhzqfJuiU= +go.opentelemetry.io/otel/log v0.10.0 h1:1CXmspaRITvFcjA4kyVszuG4HjA61fPDxMb7q3BuyF0= +go.opentelemetry.io/otel/log v0.10.0/go.mod h1:PbVdm9bXKku/gL0oFfUF4wwsQsOPlpo4VEqjvxih+FM= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/log v0.10.0 h1:lR4teQGWfeDVGoute6l0Ou+RpFqQ9vaPdrNJlST0bvw= +go.opentelemetry.io/otel/sdk/log v0.10.0/go.mod h1:A+V1UTWREhWAittaQEG4bYm4gAZa6xnvVu+xKrIRkzo= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= -go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0= -go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk= -go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51 h1:XEjx0jSNv1h22gwGfQBfMypWv/YZXWGTRbqh3B8xfIs= -go.yarn.social/lextwt v0.0.0-20240908172157-7b9ae633db51/go.mod h1:CWAZuBHZfGaqa0FreSeLG+pzK3rHP2TNAG7Zh6QlRiM= -go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8 h1:zfnniiSO/WO65mSpdQzGYJ9pM0rYg/BKgrSm8h2mTyA= -go.yarn.social/types v0.0.0-20230305013457-e4d91e351ac8/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= +go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yarn.social/lextwt v0.0.0-20250213063805-7adc6ca07564 h1:z+IAMtxNKWcLNm9nLzJwHw6OPkV5JoQYmmFohaUvcKI= +go.yarn.social/lextwt v0.0.0-20250213063805-7adc6ca07564/go.mod h1:JOPCOh+3bHv+BMaFZpKzw6soiXbIlZD5b2f7YKDDjqk= +go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede h1:XV9tuDQ605xxH4qIQPRHM1bOa7k0rJZ2RqA5kz2Nun4= +go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede/go.mod h1:+xnDkQ0T0S8emxWIsvxlCAoyF8gBaj0q81hr/VrKc0c= +golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA= +google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= +google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/http.go b/http.go index 93514a9..0069cee 100644 --- a/http.go +++ b/http.go @@ -1,23 +1,24 @@ package main import ( - "context" + "errors" "fmt" "net/http" "slices" "sort" "strings" "time" + + "go.sour.is/xt/internal/otel" ) -func httpServer(c console, app *appState) { - c.Log("start http server") +func httpServer(c *console, app *appState) error { + otel.Info("start http server") db, err := app.DB() if err != nil { - c.Log("missing db", err) - c.abort() - return + otel.Info("missing db", err) + return err } http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -37,7 +38,7 @@ func httpServer(c console, app *appState) { rows, err := db.QueryContext(r.Context(), "SELECT feed_id, hash, conv, dt, text FROM twt WHERE hash = $1 or conv = $1", hash) if err != nil { - c.Log(err) + otel.Info("error", err) return } defer rows.Close() @@ -52,7 +53,7 @@ func httpServer(c console, app *appState) { } err = rows.Scan(&twt.FeedID, &twt.Hash, &twt.Conv, &twt.Dt, &twt.Text) if err != nil { - c.Log(err) + otel.Error(err) return } } @@ -73,18 +74,14 @@ func httpServer(c console, app *appState) { Handler: http.DefaultServeMux, } - go func() { - <-c.Done() - c.Log("stop http server") - srv.Shutdown(context.Background()) - }() - + c.AddCancel(srv.Shutdown) err = srv.ListenAndServe() - if err != nil { - c.Log(err) - c.abort() - return + if !errors.Is(err, http.ErrServerClosed) { + otel.Error(err) + return err } + + return nil } func notAny(s string, chars string) bool { diff --git a/internal/otel/otel.go b/internal/otel/otel.go new file mode 100644 index 0000000..a35e945 --- /dev/null +++ b/internal/otel/otel.go @@ -0,0 +1,272 @@ +package otel + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net/http" + "os" + "runtime" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/bridges/otelslog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/log/global" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/log" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" +) + +var ( + tracer trace.Tracer + meter metric.Meter + logger *slog.Logger +) + +func Init(ctx context.Context, name string) (shutdown func(context.Context) error, err error) { + tracer = otel.Tracer(name) + meter = otel.Meter(name) + logger = otelslog.NewLogger(name) + + return setupOTelSDK(ctx, name) +} + +func Meter() metric.Meter { return meter } +func Error(err error, v ...any) { + if err == nil { + return + } + logger.Error(err.Error(), v...) +} +func Info(msg string, v ...any) { logger.Info(msg, v...) } +func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + name, attrs := Attrs() + ctx, span := tracer.Start(ctx, name, opts...) + span.SetAttributes(attrs...) + + return ctx, span +} +func Attrs() (string, []attribute.KeyValue) { + var attrs []attribute.KeyValue + var name string + if pc, file, line, ok := runtime.Caller(2); ok { + if fn := runtime.FuncForPC(pc); fn != nil { + name = fn.Name() + } + attrs = append(attrs, + attribute.String("pc", fmt.Sprintf("%v", pc)), + attribute.String("file", file), + attribute.Int("line", line), + ) + } + return name, attrs +} + +// setupOTelSDK bootstraps the OpenTelemetry pipeline. +// If it does not return an error, make sure to call shutdown for proper cleanup. +func setupOTelSDK(ctx context.Context, name string) (shutdown func(context.Context) error, err error) { + var shutdownFuncs []func(context.Context) error + + // shutdown calls cleanup functions registered via shutdownFuncs. + // The errors from the calls are joined. + // Each registered cleanup will be invoked once. + shutdown = func(ctx context.Context) error { + fmt.Println("shutdown") + var err error + for _, fn := range shutdownFuncs { + err = errors.Join(err, fn(ctx)) + } + shutdownFuncs = nil + return err + } + + // playShutdown := otelplay.ConfigureOpentelemetry(ctx) + // shutdownFuncs = append(shutdownFuncs, func(ctx context.Context) error { playShutdown(); return nil }) + + // handleErr calls shutdown for cleanup and makes sure that all errors are returned. + handleErr := func(inErr error) { + err = errors.Join(inErr, shutdown(ctx)) + } + + // Set up propagator. + prop := newPropagator() + otel.SetTextMapPropagator(prop) + + // Set up trace provider. + tracerShutdown, err := newTraceProvider(ctx, name) + if err != nil { + handleErr(err) + return + } + shutdownFuncs = append(shutdownFuncs, tracerShutdown) + + // Set up meter provider. + meterShutdown, err := newMeterProvider(ctx, name) + if err != nil { + handleErr(err) + return + } + shutdownFuncs = append(shutdownFuncs, meterShutdown) + + // Set up logger provider. + loggerShutdown, err := newLoggerProvider(ctx, name) + if err != nil { + handleErr(err) + return + } + shutdownFuncs = append(shutdownFuncs, loggerShutdown) + + return +} + +func newPropagator() propagation.TextMapPropagator { + return propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) +} + +func newTraceProvider(ctx context.Context, name string) (func(context.Context) error, error) { + r, err := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(name), + ), + ) + if err != nil { + return nil, err + } + + if v := env("XT_TRACER", ""); v != "" { + fmt.Println("use tracer", v) + exp, err := otlptracegrpc.New( + ctx, + otlptracegrpc.WithEndpoint(v), + otlptracegrpc.WithInsecure(), + ) + if err != nil { + return nil, err + } + + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + sdktrace.WithResource(r), + ) + otel.SetTracerProvider(tracerProvider) + return func(ctx context.Context) error { + return tracerProvider.Shutdown(ctx) + }, nil + } + + traceExporter, err := stdouttrace.New( + stdouttrace.WithWriter(os.Stderr), + stdouttrace.WithPrettyPrint(), + ) + if err != nil { + return nil, err + } + + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithResource(r), + sdktrace.WithBatcher(traceExporter, + // Default is 5s. Set to 1s for demonstrative purposes. + sdktrace.WithBatchTimeout(time.Second)), + ) + otel.SetTracerProvider(tracerProvider) + + return tracerProvider.Shutdown, nil +} + +func newMeterProvider(ctx context.Context, name string) (func(context.Context) error, error) { + metricExporter, err := stdoutmetric.New() + if err != nil { + return nil, err + } + + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, + // Default is 1m. Set to 3s for demonstrative purposes. + sdkmetric.WithInterval(3*time.Second))), + ) + otel.SetMeterProvider(meterProvider) + + http.Handle("/metrics", promhttp.Handler()) + return func(ctx context.Context) error { return nil }, nil +} + +func newLoggerProvider(ctx context.Context, name string) (func(context.Context) error, error) { + r, err := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(name), + ), + ) + if err != nil { + return nil, err + } + + if v := env("XT_LOGGER", ""); v != "" { + fmt.Println("use logger", v) + + exp, err := otlploghttp.New( + ctx, + otlploghttp.WithInsecure(), + otlploghttp.WithEndpointURL(v), + ) + if err != nil { + return nil, err + } + + processor := log.NewBatchProcessor(exp) + provider := log.NewLoggerProvider( + log.WithProcessor(processor), + log.WithResource(r), + ) + global.SetLoggerProvider(provider) + + return processor.Shutdown, nil + + } + + // return func(ctx context.Context) error { return nil }, nil + + logExporter, err := stdoutlog.New( + stdoutlog.WithPrettyPrint(), + stdoutlog.WithWriter(os.Stderr), + ) + if err != nil { + return nil, err + } + + loggerProvider := log.NewLoggerProvider( + log.WithProcessor( + log.NewBatchProcessor(logExporter), + ), + log.WithResource(r), + ) + global.SetLoggerProvider(loggerProvider) + + return loggerProvider.Shutdown, nil +} + +func env(key, def string) string { + if v, ok := os.LookupEnv(key); ok { + return v + } + return def +} diff --git a/main.go b/main.go index 26e95ef..03c0f6d 100644 --- a/main.go +++ b/main.go @@ -1,45 +1,112 @@ package main import ( + "bufio" "context" "errors" "fmt" "io" "os" "os/signal" + "runtime/debug" + "strings" - "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.sour.is/xt/internal/otel" ) const name = "go.sour.is/xt" -var ( - tracer = otel.Tracer(name) - meter = otel.Meter(name) -) +var m_up metric.Int64Gauge -type contextKey struct{ name string } +func main() { + dotEnv() // load .env + + ctx, console := newConsole(args{ + dbtype: env("XT_DBTYPE", "sqlite3"), + dbfile: env("XT_DBFILE", "file:twt.db"), + baseFeed: env("XT_BASE_FEED", "feed"), + Nick: env("XT_NICK", "xuu"), + URI: env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"), + Listen: env("XT_LISTEN", ":8080"), + }) + + finish, err := otel.Init(ctx, name) + console.IfFatal(err) + console.AddCancel(finish) + + m_up, err = otel.Meter().Int64Gauge("up") + console.IfFatal(err) + + m_up.Record(ctx, 1) + defer m_up.Record(context.Background(), 0) + + bi, _ := debug.ReadBuildInfo() + otel.Info(name, "version", bi.Main.Version) + + err = run(console) + if !errors.Is(err, context.Canceled) { + console.IfFatal(err) + } +} type console struct { io.Reader io.Writer err io.Writer context.Context - abort func() + abort func() + cancelfns []func(context.Context) error } -func (c console) Log(v ...any) { fmt.Fprintln(c.err, v...) } -func (c console) Args() args { +func newConsole(args args) (context.Context, *console) { + ctx := context.Background() + ctx, abort := context.WithCancel(ctx) + ctx, stop := signal.NotifyContext(ctx, os.Interrupt) + go func() { <-ctx.Done(); stop() }() // restore interrupt function + + console := &console{Reader: os.Stdin, Writer: os.Stdout, err: os.Stderr, Context: ctx, abort: abort} + console.Set("console", console) + console.Set("args", args) + return ctx, console +} + +func (c *console) Args() args { v, ok := c.Get("args").(args) if !ok { return args{} } return v } +func (c *console) Shutdown() error { + fmt.Fprintln(c.err, "shutting down ", len(c.cancelfns), " cancel functions...") + defer fmt.Fprintln(c.err, "done") + + c.abort() + var err error + for _, fn := range c.cancelfns { + err = errors.Join(err, fn(c.Context)) + } + return err +} +func (c *console) AddCancel(fn func(context.Context) error) { c.cancelfns = append(c.cancelfns, fn) } + +func (c *console) IfFatal(err error) { + if err == nil { + return + } + fmt.Fprintln(c.err, err) + c.abort() + os.Exit(1) +} + +type contextKey struct{ name string } + func (c *console) Set(name string, value any) { c.Context = context.WithValue(c.Context, contextKey{name}, value) } -func (c console) Get(name string) any { + +func (c *console) Get(name string) any { return c.Context.Value(contextKey{name}) } @@ -59,25 +126,25 @@ func env(key, def string) string { return def } -func main() { - ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) - console := console{os.Stdin, os.Stdout, os.Stderr, ctx, stop} - - go func() { <-ctx.Done(); console.Log("shutdown"); stop() }() - - args := args{ - dbtype: env("XT_DBTYPE", "sqlite3"), - dbfile: env("XT_DBFILE", "file:twt.db"), - baseFeed: env("XT_BASE_FEED", "feed"), - Nick: env("XT_NICK", "xuu"), - URI: env("XT_URI", "https://txt.sour.is/users/xuu/twtxt.txt"), - Listen: env("XT_LISTEN", ":8040"), +func dotEnv() { + fd, err := os.Open(".env") + if err != nil { + return } - console.Set("args", args) + scan := bufio.NewScanner(fd) - if err := run(console); err != nil && !errors.Is(err, context.Canceled) { - fmt.Println(err) - os.Exit(1) + for scan.Scan() { + line := scan.Text() + + if strings.HasPrefix(line, "#") { + continue + } + key, val, ok := strings.Cut(line, "=") + if !ok { + continue + } + + os.Setenv(strings.TrimSpace(key), strings.TrimSpace(val)) } } diff --git a/otel.go b/otel.go new file mode 100644 index 0000000..c9ecbf5 --- /dev/null +++ b/otel.go @@ -0,0 +1,2 @@ +package main + diff --git a/refresh-loop.go b/refresh-loop.go index 1fe5e30..9905502 100644 --- a/refresh-loop.go +++ b/refresh-loop.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "go.sour.is/xt/internal/otel" "go.yarn.social/lextwt" "go.yarn.social/types" ) @@ -21,8 +22,9 @@ const ( TwoMinutes = 60 ) -func refreshLoop(c console, app *appState) { - defer c.abort() +func refreshLoop(c *console, app *appState) error { + ctx, span := otel.Span(c.Context) + defer span.End() f := NewHTTPFetcher() fetch, close := NewFuncPool(c.Context, 25, f.Fetch) @@ -30,60 +32,58 @@ func refreshLoop(c console, app *appState) { db, err := app.DB() if err != nil { - c.Log("missing db") - c.abort() - return + otel.Error(err, "missing db") + return err } queue := app.queue - c.Log("start refresh loop") - for c.Err() == nil { + otel.Info("start refresh loop") + for c.Context.Err() == nil { if queue.IsEmpty() { - c.Log("load feeds") + otel.Info("load feeds") it, err := loadFeeds(c.Context, db) for f := range it { queue.Insert(&f) } if err != nil { - c.Log(err) - return + otel.Error(err) + return err } } f := queue.ExtractMin() if f == nil { - c.Log("sleeping for ", TenMinutes*time.Second) + otel.Info("sleeping for ", TenMinutes*time.Second) select { case <-time.After(TenMinutes * time.Second): case <-c.Done(): - return + return nil } continue } - c.Log("queue size", queue.count, "next", f.URI, "next scan on", f.LastScanOn.Time.Format(time.RFC3339)) + otel.Info("queue size", queue.count, "next", f.URI, "next scan on", f.LastScanOn.Time.Format(time.RFC3339)) if time.Until(f.LastScanOn.Time) > 2*time.Hour { - c.Log("too soon", f.URI) + otel.Info("too soon", f.URI) continue } select { case <-c.Done(): - return + return nil case t := <-time.After(time.Until(f.LastScanOn.Time)): - c.Log("fetch", t.Format(time.RFC3339), f.Nick, f.URI) + otel.Info("fetch", t.Format(time.RFC3339), f.Nick, f.URI) fetch.Fetch(f) case res := <-fetch.Out(): - c.Log("got response:", res.Request.URI) + otel.Info("got response:", res.Request.URI) f := res.Request f.LastScanOn.Time = time.Now() err := res.err if res.err != nil { - f.LastError.String, f.LastError.Valid = err.Error(), true if errors.Is(err, ErrPermanentlyDead) { f.RefreshRate = TenYear } @@ -94,11 +94,11 @@ func refreshLoop(c console, app *appState) { f.RefreshRate = OneDay } - c.Log(err) + otel.Error(err) + f.LastError.String, f.LastError.Valid = err.Error(), true err = f.Save(c.Context, db) if err != nil { - c.Log(err) - return + otel.Error(err) } continue @@ -108,13 +108,30 @@ func refreshLoop(c console, app *appState) { f.LastModified.Time, f.LastModified.Valid = res.LastModified(), true cpy, err := os.OpenFile(filepath.Join("feeds", urlNS.UUID5(f.URI).MarshalText()), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) - rdr := io.TeeReader(res.Body, cpy) + if err != nil { + otel.Error(fmt.Errorf("%w: %w", ErrParseFailed, err)) + f.LastError.String, f.LastError.Valid = err.Error(), true + f.RefreshRate = OneDay + + err = f.Save(c.Context, db) + otel.Error(err) + + continue + } + rdr := io.TeeReader(res.Body, cpy) + rdr = lextwt.TwtFixer(rdr) twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI}) if err != nil { - c.Log(fmt.Errorf("%w: %w", ErrParseFailed, err)) + otel.Error(fmt.Errorf("%w: %w", ErrParseFailed, err)) + + f.LastError.String, f.LastError.Valid = err.Error(), true f.RefreshRate = OneDay - return + + err = f.Save(c.Context, db) + otel.Error(err) + + continue } if prev, ok := twtfile.Info().GetN("prev", 0); f.FirstFetch && ok { @@ -131,13 +148,15 @@ func refreshLoop(c console, app *appState) { } } - err = storeFeed(db, twtfile) + err = storeFeed(ctx, db, twtfile) if err != nil { - c.Log(err) + otel.Error(err) + f.LastError.String, f.LastError.Valid = err.Error(), true err = f.Save(c.Context, db) - c.Log(err) - return + + otel.Error(err) + return err } cpy.Close() @@ -148,9 +167,11 @@ func refreshLoop(c console, app *appState) { err = f.Save(c.Context, db) if err != nil { - c.Log(err) - return + otel.Error(err) + return err } } } + + return c.Context.Err() } diff --git a/service.go b/service.go index a63978e..6344876 100644 --- a/service.go +++ b/service.go @@ -12,11 +12,18 @@ import ( _ "embed" _ "github.com/mattn/go-sqlite3" + "github.com/uptrace/opentelemetry-go-extra/otelsql" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.sour.is/xt/internal/otel" "go.yarn.social/lextwt" "go.yarn.social/types" + "golang.org/x/sync/errgroup" ) -func run(c console) error { +func run(c *console) error { + ctx, span := otel.Span(c.Context) + defer span.End() + a := c.Args() app := &appState{ args: a, @@ -28,6 +35,9 @@ func run(c console) error { // Setup DB err := func(ctx context.Context) error { + ctx, span := otel.Span(ctx) + defer span.End() + db, err := app.DB() if err != nil { return err @@ -42,13 +52,16 @@ func run(c console) error { } return nil - }(c.Context) + }(ctx) if err != nil { return err } // Seed File - err = func() error { + err = func(ctx context.Context) error { + ctx, span := otel.Span(ctx) + defer span.End() + f, err := os.Open(a.baseFeed) if err != nil { return err @@ -69,29 +82,34 @@ func run(c console) error { } defer db.Close() - return storeFeed(db, twtfile) - }() + return storeFeed(ctx, db, twtfile) + }(ctx) if err != nil { return err } - go refreshLoop(c, app) + wg, ctx := errgroup.WithContext(ctx) + c.Context = ctx + + wg.Go(func() error { return refreshLoop(c, app) }) go httpServer(c, app) - <-c.Done() - return c.Err() + wg.Wait() + return c.Context.Err() } type appState struct { args args feeds sync.Map queue *fibHeap[Feed] - - } func (app *appState) DB() (*sql.DB, error) { - return sql.Open(app.args.dbtype, app.args.dbfile) + // return sql.Open(app.args.dbtype, app.args.dbfile) + + return otelsql.Open(app.args.dbtype, app.args.dbfile, + otelsql.WithAttributes(semconv.DBSystemSqlite), + otelsql.WithDBName("mydb")) } func (app *appState) Feed(feedID string) *Feed {