chore: changes for otel and fixes to loop
This commit is contained in:
parent
42fe9176b7
commit
42a9b26b22
2
.gitignore
vendored
2
.gitignore
vendored
@ -4,3 +4,5 @@ __debug*
|
||||
feeds/
|
||||
/xt
|
||||
.env
|
||||
*.txt
|
||||
*.txt.xz
|
4
app.go
4
app.go
@ -99,7 +99,7 @@ func run(c *console) error {
|
||||
c.Context = ctx
|
||||
|
||||
wg.Go(func() error {
|
||||
return refreshLoop(c, app)
|
||||
return feedRefreshProcessor(c, app)
|
||||
})
|
||||
go httpServer(c, app)
|
||||
|
||||
@ -132,7 +132,7 @@ func (app *appState) DB(ctx context.Context) (db, error) {
|
||||
db := db{Params: make(map[string]string)}
|
||||
db.DB, err = otelsql.Open(app.args.dbtype, app.args.dbfile,
|
||||
otelsql.WithAttributes(semconv.DBSystemSqlite),
|
||||
otelsql.WithDBName("mydb"))
|
||||
otelsql.WithDBName("xt"))
|
||||
if err != nil {
|
||||
return db, err
|
||||
}
|
||||
|
92
cmd/load/main.go
Normal file
92
cmd/load/main.go
Normal file
@ -0,0 +1,92 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"iter"
|
||||
"os"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
"github.com/uptrace/opentelemetry-go-extra/otelsql"
|
||||
"go.yarn.social/lextwt"
|
||||
"go.yarn.social/types"
|
||||
)
|
||||
|
||||
func main() {
|
||||
in := os.Stdin
|
||||
if len(os.Args) != 2 {
|
||||
fmt.Fprint(os.Stderr, "usage: ", os.Args[0], "[db file]")
|
||||
}
|
||||
|
||||
db, err := DB(context.Background(), os.Args[1])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_ = db
|
||||
|
||||
for line := range lextwt.IterRegistry(in) {
|
||||
_ = line
|
||||
}
|
||||
}
|
||||
|
||||
const MaxVariableNumber = 32766
|
||||
|
||||
func DB(ctx context.Context, cnx string) (*sql.DB, error) {
|
||||
// return sql.Open(app.args.dbtype, app.args.dbfile)
|
||||
|
||||
db, err := otelsql.Open("sqlite", cnx)
|
||||
if err != nil {
|
||||
return db, err
|
||||
}
|
||||
|
||||
return db, err
|
||||
}
|
||||
|
||||
func makeULID(twt types.Twt) ulid.ULID {
|
||||
h64 := fnv.New64a()
|
||||
h16 := fnv.New32a()
|
||||
text := []byte(fmt.Sprintf("%+l", twt))
|
||||
b := make([]byte, 10)
|
||||
copy(b, h16.Sum(text)[:2])
|
||||
copy(b[2:], h64.Sum(text))
|
||||
u := ulid.ULID{}
|
||||
u.SetTime(ulid.Timestamp(twt.Created()))
|
||||
u.SetEntropy(b)
|
||||
|
||||
return u
|
||||
}
|
||||
|
||||
func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
|
||||
_, size := qry(1)
|
||||
itemsPerIter := maxArgs / size
|
||||
|
||||
if len(args) < size {
|
||||
return func(yield func(string, []any) bool) {}
|
||||
}
|
||||
|
||||
if len(args) < maxArgs {
|
||||
return func(yield func(string, []any) bool) {
|
||||
query, _ := qry(len(args) / size)
|
||||
yield(query, args)
|
||||
}
|
||||
}
|
||||
|
||||
return func(yield func(string, []any) bool) {
|
||||
for len(args) > 0 {
|
||||
if len(args) > maxArgs {
|
||||
query, size := qry(itemsPerIter)
|
||||
if !yield(query, args[:size]) {
|
||||
return
|
||||
}
|
||||
args = args[size:]
|
||||
continue
|
||||
}
|
||||
|
||||
query, _ := qry(len(args) / size)
|
||||
yield(query, args)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
29
fetcher.go
29
fetcher.go
@ -9,6 +9,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.sour.is/xt/internal/otel"
|
||||
)
|
||||
|
||||
@ -50,10 +52,18 @@ func (r *Response) LastModified() time.Time {
|
||||
|
||||
type httpFetcher struct {
|
||||
client *http.Client
|
||||
|
||||
m_fetch_status metric.Int64Counter
|
||||
m_fetch_second metric.Float64Histogram
|
||||
}
|
||||
|
||||
func NewHTTPFetcher() *httpFetcher {
|
||||
fetch_total, _ := otel.Meter().Int64Counter("xt_fetch_status_total")
|
||||
fetch_second, _ := otel.Meter().Float64Histogram("xt_fetch_seconds")
|
||||
|
||||
return &httpFetcher{
|
||||
m_fetch_status: fetch_total,
|
||||
m_fetch_second: fetch_second,
|
||||
client: &http.Client{
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
@ -75,6 +85,12 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
|
||||
ctx, span := otel.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
since := time.Since(start)
|
||||
f.m_fetch_second.Record(ctx, since.Seconds())
|
||||
}()
|
||||
|
||||
defer fmt.Println("fetch done", request.URI)
|
||||
response := &Response{
|
||||
Request: request,
|
||||
@ -100,17 +116,24 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
|
||||
response.Response = res
|
||||
switch res.StatusCode {
|
||||
case 200:
|
||||
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "ok")))
|
||||
|
||||
case 304:
|
||||
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "not_modified")))
|
||||
response.err = fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
|
||||
|
||||
case 400, 406, 429, 502, 503:
|
||||
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "temp_fail")))
|
||||
response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
|
||||
|
||||
case 403, 404, 410:
|
||||
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "perm_fail")))
|
||||
|
||||
response.err = fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
|
||||
|
||||
default:
|
||||
f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.Int("status", res.StatusCode)))
|
||||
|
||||
response.err = errors.New(res.Status)
|
||||
}
|
||||
|
||||
@ -134,7 +157,7 @@ func NewFuncPool[IN, OUT any](
|
||||
var wg sync.WaitGroup
|
||||
|
||||
in := make(chan IN, size)
|
||||
out := make(chan OUT)
|
||||
out := make(chan OUT, size)
|
||||
|
||||
wg.Add(size)
|
||||
for range size {
|
||||
@ -159,10 +182,6 @@ func NewFuncPool[IN, OUT any](
|
||||
return
|
||||
case out <- r:
|
||||
span.AddEvent("sent queue")
|
||||
case <-time.After(20 * time.Second):
|
||||
fmt.Println("GOT STUCK", request)
|
||||
span.AddEvent("GOT STUCK")
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
15
go.mod
15
go.mod
@ -16,7 +16,7 @@ require (
|
||||
github.com/klauspost/compress v1.17.9 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/prometheus/client_model v0.6.1 // indirect
|
||||
github.com/prometheus/common v0.55.0 // indirect
|
||||
github.com/prometheus/common v0.62.0 // indirect
|
||||
github.com/prometheus/procfs v0.15.1 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
|
||||
@ -24,7 +24,7 @@ require (
|
||||
golang.org/x/text v0.22.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
|
||||
google.golang.org/protobuf v1.36.3 // indirect
|
||||
google.golang.org/protobuf v1.36.5 // indirect
|
||||
)
|
||||
|
||||
require (
|
||||
@ -33,8 +33,8 @@ require (
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/otel/log v0.10.0
|
||||
go.opentelemetry.io/otel/metric v1.34.0
|
||||
go.opentelemetry.io/otel/trace v1.34.0
|
||||
go.opentelemetry.io/otel/metric v1.35.0
|
||||
go.opentelemetry.io/otel/trace v1.35.0
|
||||
)
|
||||
|
||||
require (
|
||||
@ -47,14 +47,15 @@ require (
|
||||
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2
|
||||
github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect
|
||||
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0
|
||||
go.opentelemetry.io/otel v1.34.0
|
||||
go.opentelemetry.io/otel v1.35.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.57.0
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0
|
||||
go.opentelemetry.io/otel/sdk v1.34.0
|
||||
go.opentelemetry.io/otel/sdk v1.35.0
|
||||
go.opentelemetry.io/otel/sdk/log v0.10.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.34.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.35.0
|
||||
go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede
|
||||
golang.org/x/crypto v0.33.0 // indirect
|
||||
golang.org/x/sync v0.11.0
|
||||
|
16
go.sum
16
go.sum
@ -48,6 +48,8 @@ github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p
|
||||
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
|
||||
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
|
||||
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
|
||||
github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
|
||||
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
|
||||
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
|
||||
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
@ -66,12 +68,16 @@ go.opentelemetry.io/contrib/bridges/otelslog v0.9.0 h1:N+78eXSlu09kii5nkiM+01Ybt
|
||||
go.opentelemetry.io/contrib/bridges/otelslog v0.9.0/go.mod h1:/2KhfLAhtQpgnhIk1f+dftA3fuuMcZjiz//Dc9yfaEs=
|
||||
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
|
||||
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
|
||||
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
|
||||
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 h1:QSKmLBzbFULSyHzOdO9JsN9lpE4zkrz1byYGmJecdVE=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0/go.mod h1:sTQ/NH8Yrirf0sJ5rWqVu+oT82i4zL9FaF6rWcqnptM=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0uaNS4c98WRNUEx5U3aDlrDOI5Rs+1Vifcw4DJ8U=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE=
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.57.0 h1:AHh/lAP1BHrY5gBwk8ncc25FXWm/gmmY3BX258z5nuk=
|
||||
go.opentelemetry.io/otel/exporters/prometheus v0.57.0/go.mod h1:QpFWz1QxqevfjwzYdbMb4Y1NnlJvqSGwyuU0B4iuc9c=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0 h1:GKCEAZLEpEf78cUvudQdTg0aET2ObOZRB2HtXA0qPAI=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0/go.mod h1:9/zqSWLCmHT/9Jo6fYeUDRRogOLL60ABLsHWS99lF8s=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0 h1:czJDQwFrMbOr9Kk+BPo1y8WZIIFIK58SA1kykuVeiOU=
|
||||
@ -82,14 +88,22 @@ go.opentelemetry.io/otel/log v0.10.0 h1:1CXmspaRITvFcjA4kyVszuG4HjA61fPDxMb7q3Bu
|
||||
go.opentelemetry.io/otel/log v0.10.0/go.mod h1:PbVdm9bXKku/gL0oFfUF4wwsQsOPlpo4VEqjvxih+FM=
|
||||
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
|
||||
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
|
||||
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
|
||||
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
|
||||
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
|
||||
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
|
||||
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
|
||||
go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
|
||||
go.opentelemetry.io/otel/sdk/log v0.10.0 h1:lR4teQGWfeDVGoute6l0Ou+RpFqQ9vaPdrNJlST0bvw=
|
||||
go.opentelemetry.io/otel/sdk/log v0.10.0/go.mod h1:A+V1UTWREhWAittaQEG4bYm4gAZa6xnvVu+xKrIRkzo=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
|
||||
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
|
||||
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
|
||||
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
|
||||
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
|
||||
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
|
||||
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
@ -117,6 +131,8 @@ google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
|
||||
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
|
||||
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
|
||||
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
|
||||
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
|
@ -82,6 +82,7 @@ github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/go-dap v0.12.0 h1:rVcjv3SyMIrpaOoTAdFDyHs99CwVOItIJGKLQFQhNeM=
|
||||
github.com/google/go-dap v0.12.0/go.mod h1:tNjCASCm5cqePi/RVXXWEVqtnNLV1KTWtYOqu6rZNzc=
|
||||
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
|
||||
|
@ -10,21 +10,25 @@ import (
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"go.opentelemetry.io/contrib/bridges/otelslog"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
||||
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
|
||||
"go.opentelemetry.io/otel/log/global"
|
||||
"go.opentelemetry.io/otel/log/global"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/log"
|
||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
||||
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
@ -43,6 +47,7 @@ func Init(ctx context.Context, name string) (shutdown func(context.Context) erro
|
||||
}
|
||||
|
||||
func Meter() metric.Meter { return meter }
|
||||
|
||||
// func Error(err error, v ...any) {
|
||||
// if err == nil {
|
||||
// return
|
||||
@ -52,33 +57,37 @@ func Meter() metric.Meter { return meter }
|
||||
// }
|
||||
// func Info(msg string, v ...any) { fmt.Println(append([]any{msg}, v...)); logger.Info(msg, v...) }
|
||||
|
||||
type spanny struct{
|
||||
type spanny struct {
|
||||
trace.Span
|
||||
}
|
||||
func (s *spanny) RecordError(err error, options ...trace.EventOption) {
|
||||
|
||||
func (s *spanny) RecordError(err error, options ...trace.EventOption) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
ec := trace.NewEventConfig(options...)
|
||||
|
||||
|
||||
attrs := make([]any, len(ec.Attributes()))
|
||||
for i, v := range ec.Attributes() {
|
||||
attrs[i] = v
|
||||
}
|
||||
|
||||
fmt.Println(append([]any{"ERR:", err}, attrs...)...)
|
||||
logger.Error(err.Error(), attrs...)
|
||||
fmt.Println(append([]any{"ERR:", err}, attrs...)...)
|
||||
logger.Error(err.Error(), attrs...)
|
||||
s.Span.RecordError(err, options...)
|
||||
}
|
||||
func (s *spanny) AddEvent(name string, options ...trace.EventOption) {
|
||||
func (s *spanny) AddEvent(name string, options ...trace.EventOption) {
|
||||
ec := trace.NewEventConfig(options...)
|
||||
|
||||
attrs := make([]any, len(ec.Attributes()))
|
||||
|
||||
attrs := make([]any, 2*len(ec.Attributes()))
|
||||
for i, v := range ec.Attributes() {
|
||||
attrs[i] = v
|
||||
attrs[2*i] = v.Key
|
||||
attrs[2*i+1] = v.Value.Emit()
|
||||
}
|
||||
fmt.Println(append([]any{name}, attrs...)...)
|
||||
logger.Info(name, attrs...)
|
||||
fmt.Println(append([]any{name}, attrs...)...)
|
||||
logger.Info(name, attrs...)
|
||||
|
||||
s.Span.AddEvent(name, options...)
|
||||
}
|
||||
|
||||
func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
|
||||
@ -233,8 +242,31 @@ func newMeterProvider(ctx context.Context, name string) (func(context.Context) e
|
||||
// )
|
||||
// otel.SetMeterProvider(meterProvider)
|
||||
|
||||
metricExporter, err := otelprom.New(
|
||||
otelprom.WithRegisterer(prometheus.DefaultRegisterer),
|
||||
|
||||
// OTEL default buckets assume you're using milliseconds. Substitute defaults
|
||||
// appropriate for units of seconds.
|
||||
otelprom.WithAggregationSelector(func(ik sdkmetric.InstrumentKind) sdkmetric.Aggregation {
|
||||
switch ik {
|
||||
case sdkmetric.InstrumentKindHistogram:
|
||||
return sdkmetric.AggregationExplicitBucketHistogram{
|
||||
Boundaries: prometheus.DefBuckets,
|
||||
NoMinMax: false,
|
||||
}
|
||||
default:
|
||||
return sdkmetric.DefaultAggregationSelector(ik)
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
p := sdkmetric.NewMeterProvider(
|
||||
sdkmetric.WithReader(metricExporter),
|
||||
)
|
||||
|
||||
otel.SetMeterProvider(p)
|
||||
http.Handle("/metrics", promhttp.Handler())
|
||||
return func(ctx context.Context) error { return nil }, nil
|
||||
return func(ctx context.Context) error { return nil }, err
|
||||
}
|
||||
|
||||
func newLoggerProvider(ctx context.Context, name string) (func(context.Context) error, error) {
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
@ -24,12 +25,16 @@ const (
|
||||
TwoMinutes = 60
|
||||
)
|
||||
|
||||
func refreshLoop(c *console, app *appState) error {
|
||||
func feedRefreshProcessor(c *console, app *appState) error {
|
||||
ctx, span := otel.Span(c.Context)
|
||||
defer span.End()
|
||||
|
||||
sleeping_time, _ := otel.Meter().Int64Counter("xt_feed_sleep")
|
||||
|
||||
queue_size, _ := otel.Meter().Int64Gauge("xt_feed_queue_size")
|
||||
|
||||
f := NewHTTPFetcher()
|
||||
fetch, close := NewFuncPool(ctx, 25, f.Fetch)
|
||||
fetch, close := NewFuncPool(ctx, 40, f.Fetch)
|
||||
defer close()
|
||||
|
||||
db, err := app.DB(c)
|
||||
@ -54,29 +59,40 @@ func refreshLoop(c *console, app *appState) error {
|
||||
queue.Insert(&f)
|
||||
}
|
||||
}
|
||||
span.AddEvent("queue size", trace.WithAttributes(attribute.Int("size", int(queue.count))))
|
||||
span.AddEvent("queue", trace.WithAttributes(attribute.Int("size", int(queue.count))))
|
||||
queue_size.Record(ctx, int64(queue.count))
|
||||
|
||||
f := queue.ExtractMin()
|
||||
if f == nil {
|
||||
sleeping_time.Add(ctx, int64(TenMinutes))
|
||||
span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TenMinutes))))
|
||||
select {
|
||||
case <-time.After(TenMinutes * time.Second):
|
||||
case <-c.Done():
|
||||
return nil
|
||||
}
|
||||
span.End()
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
span.AddEvent("next", trace.WithAttributes(
|
||||
attribute.Int("size", int(queue.count)),
|
||||
attribute.String("uri", f.URI),
|
||||
attribute.String("scan on", f.LastScanOn.Time.Format(time.RFC3339)),
|
||||
attribute.String("last scan on", f.LastScanOn.Time.Format(time.RFC3339)),
|
||||
attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
|
||||
))
|
||||
|
||||
if time.Until(f.NextScanOn.Time) > 2*time.Hour {
|
||||
until := time.Until(f.NextScanOn.Time)
|
||||
|
||||
if until > 2*time.Hour {
|
||||
span.AddEvent("too soon", trace.WithAttributes(attribute.String("uri", f.URI)))
|
||||
span.End()
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
sleeping_time.Add(ctx, until.Milliseconds())
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
@ -85,12 +101,11 @@ func refreshLoop(c *console, app *appState) error {
|
||||
attribute.Int("size", int(queue.count)),
|
||||
attribute.String("uri", f.URI),
|
||||
attribute.String("timeout", t.Format(time.RFC3339)),
|
||||
attribute.String("scan on", f.NextScanOn.Time.Format(time.RFC3339)),
|
||||
attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
|
||||
))
|
||||
|
||||
fetch.Fetch(f)
|
||||
}
|
||||
|
||||
fetch.Fetch(f)
|
||||
}
|
||||
span.RecordError(ctx.Err())
|
||||
|
||||
@ -101,6 +116,10 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
|
||||
ctx, span := otel.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
process_in_total, _ := otel.Meter().Int64Counter("xt_processed_in_total")
|
||||
process_out_total, _ := otel.Meter().Int64Counter("xt_processed_out_total")
|
||||
twts_histogram, _ := otel.Meter().Float64Histogram("xt_twt1k_bucket")
|
||||
|
||||
for ctx.Err() == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -112,6 +131,7 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
|
||||
attribute.String("scan on", f.NextScanOn.Time.Format(time.RFC3339)),
|
||||
))
|
||||
|
||||
process_in_total.Add(ctx, 1)
|
||||
f.LastScanOn.Time = time.Now()
|
||||
f.LastScanOn.Valid = true
|
||||
err := res.err
|
||||
@ -153,6 +173,8 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
|
||||
rdr := io.TeeReader(res.Body, cpy)
|
||||
rdr = lextwt.TwtFixer(rdr)
|
||||
twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
|
||||
cpy.Close()
|
||||
|
||||
if err != nil {
|
||||
span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
|
||||
|
||||
@ -164,8 +186,10 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
|
||||
|
||||
continue
|
||||
}
|
||||
cpy.Close()
|
||||
span.AddEvent("parse complete", trace.WithAttributes(attribute.Int("count", twtfile.Twts().Len())))
|
||||
|
||||
count := twtfile.Twts().Len()
|
||||
span.AddEvent("parse complete", trace.WithAttributes(attribute.Int("count", count)))
|
||||
twts_histogram.Record(ctx, float64(count)/1000)
|
||||
|
||||
err = storeFeed(ctx, db, twtfile)
|
||||
if err != nil {
|
||||
@ -178,13 +202,40 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
|
||||
continue
|
||||
}
|
||||
|
||||
f.RefreshRate = TenMinutes
|
||||
f.RefreshRate = checkTemp(twtfile.Twts())
|
||||
f.LastError.String = ""
|
||||
|
||||
err = f.Save(ctx, db)
|
||||
span.RecordError(err)
|
||||
process_out_total.Add(ctx, 1)
|
||||
}
|
||||
}
|
||||
|
||||
span.RecordError(ctx.Err())
|
||||
}
|
||||
|
||||
func checkTemp(twts types.Twts) int {
|
||||
if len(twts) < 5 {
|
||||
return OneDay
|
||||
}
|
||||
sort.Sort(sort.Reverse(twts))
|
||||
|
||||
now := time.Now()
|
||||
|
||||
since_first := now.Sub(twts[0].Created())
|
||||
since_fifth := now.Sub(twts[4].Created())
|
||||
|
||||
if since_first < 2 * time.Hour || since_fifth < 8 * time.Hour {
|
||||
return TwoMinutes
|
||||
}
|
||||
|
||||
if since_first < 4 * time.Hour || since_fifth < 16 * time.Hour{
|
||||
return TenMinutes
|
||||
}
|
||||
|
||||
if since_first < 8 * time.Hour || since_fifth < 32 * time.Hour{
|
||||
return TenMinutes
|
||||
}
|
||||
|
||||
return OneDay
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user