diff --git a/.gitignore b/.gitignore
index 1949f0b..df69e5e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,5 @@ __debug*
 feeds/
 /xt
 .env
+*.txt
+*.txt.xz
\ No newline at end of file
diff --git a/app.go b/app.go
index 1688f82..791c01b 100644
--- a/app.go
+++ b/app.go
@@ -99,7 +99,7 @@ func run(c *console) error {
 	c.Context = ctx
 
 	wg.Go(func() error {
-		return refreshLoop(c, app)
+		return feedRefreshProcessor(c, app)
 	})
 
 	go httpServer(c, app)
@@ -132,7 +132,7 @@ func (app *appState) DB(ctx context.Context) (db, error) {
 	db := db{Params: make(map[string]string)}
 	db.DB, err = otelsql.Open(app.args.dbtype, app.args.dbfile,
 		otelsql.WithAttributes(semconv.DBSystemSqlite),
-		otelsql.WithDBName("mydb"))
+		otelsql.WithDBName("xt"))
 	if err != nil {
 		return db, err
 	}
diff --git a/cmd/load/main.go b/cmd/load/main.go
new file mode 100644
index 0000000..4ba59ef
--- /dev/null
+++ b/cmd/load/main.go
@@ -0,0 +1,92 @@
+package main
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"hash/fnv"
+	"iter"
+	"os"
+
+	"github.com/oklog/ulid/v2"
+	"github.com/uptrace/opentelemetry-go-extra/otelsql"
+	"go.yarn.social/lextwt"
+	"go.yarn.social/types"
+
+	// Assumption: a database/sql driver registering the "sqlite" name used
+	// below must be blank-imported; modernc.org/sqlite is one such driver.
+	_ "modernc.org/sqlite"
+)
+
+func main() {
+	in := os.Stdin
+	if len(os.Args) != 2 {
+		fmt.Fprintln(os.Stderr, "usage:", os.Args[0], "[db file]")
+		os.Exit(1)
+	}
+
+	db, err := DB(context.Background(), os.Args[1])
+	if err != nil {
+		panic(err)
+	}
+	_ = db
+
+	for line := range lextwt.IterRegistry(in) {
+		_ = line
+	}
+}
+
+const MaxVariableNumber = 32766
+
+func DB(ctx context.Context, cnx string) (*sql.DB, error) {
+	// return sql.Open(app.args.dbtype, app.args.dbfile)
+
+	db, err := otelsql.Open("sqlite", cnx)
+	if err != nil {
+		return db, err
+	}
+
+	return db, err
+}
+
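+// makeULID builds a deterministic ULID for a twt: the time bits come from the
+// twt's creation timestamp and the 80 entropy bits are filled from FNV-1a
+// hashes of the twt's formatted text, so re-loading the same twt always
+// yields the same ID.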
+func makeULID(twt types.Twt) ulid.ULID {
+	h64 := fnv.New64a()
+	h16 := fnv.New32a()
+	text := []byte(fmt.Sprintf("%+l", twt))
+	h64.Write(text)
+	h16.Write(text)
+
+	b := make([]byte, 10)
+	copy(b, h16.Sum(nil)[:2])
+	copy(b[2:], h64.Sum(nil))
+	u := ulid.ULID{}
+	u.SetTime(ulid.Timestamp(twt.Created()))
+	u.SetEntropy(b)
+
+	return u
+}
+
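+// chunk yields (query, args) batches sized to stay under maxArgs bound
+// variables (SQLite caps host parameters at MaxVariableNumber). qry(n) must
+// return the SQL for n items together with the number of placeholders that
+// SQL consumes. Illustrative only — an assumed two-column insert:
+//
+//	qry := func(n int) (string, int) {
+//		q := "INSERT INTO twts (id, text) VALUES " +
+//			strings.TrimSuffix(strings.Repeat("(?, ?),", n), ",")
+//		return q, 2 * n
+//	}
+//	for q, batch := range chunk(args, qry, MaxVariableNumber) {
+//		if _, err := db.ExecContext(ctx, q, batch...); err != nil {
+//			return err
+//		}
+//	}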
+func chunk(args []any, qry func(int) (string, int), maxArgs int) iter.Seq2[string, []any] {
+	_, size := qry(1)
+	itemsPerIter := maxArgs / size
+
+	if len(args) < size {
+		return func(yield func(string, []any) bool) {}
+	}
+
+	if len(args) < maxArgs {
+		return func(yield func(string, []any) bool) {
+			query, _ := qry(len(args) / size)
+			yield(query, args)
+		}
+	}
+
+	return func(yield func(string, []any) bool) {
+		for len(args) > 0 {
+			if len(args) > maxArgs {
+				query, size := qry(itemsPerIter)
+				if !yield(query, args[:size]) {
+					return
+				}
+				args = args[size:]
+				continue
+			}
+
+			query, _ := qry(len(args) / size)
+			yield(query, args)
+			return
+		}
+	}
+}
diff --git a/fetcher.go b/fetcher.go
index 58e0cf1..1ec41a0 100644
--- a/fetcher.go
+++ b/fetcher.go
@@ -9,6 +9,8 @@ import (
 	"sync"
 	"time"
 
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
 	"go.sour.is/xt/internal/otel"
 )
 
@@ -50,10 +52,18 @@ func (r *Response) LastModified() time.Time {
 
 type httpFetcher struct {
 	client *http.Client
+
+	m_fetch_status metric.Int64Counter
+	m_fetch_second metric.Float64Histogram
 }
 
 func NewHTTPFetcher() *httpFetcher {
+	fetch_total, _ := otel.Meter().Int64Counter("xt_fetch_status_total")
+	fetch_second, _ := otel.Meter().Float64Histogram("xt_fetch_seconds")
+
 	return &httpFetcher{
+		m_fetch_status: fetch_total,
+		m_fetch_second: fetch_second,
 		client: &http.Client{
 			Transport: &http.Transport{
 				Proxy: http.ProxyFromEnvironment,
@@ -75,6 +85,12 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
 	ctx, span := otel.Span(ctx)
 	defer span.End()
 
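+	// Time the whole fetch; the deferred Record runs on every exit path, so
+	// slow failures land in the latency histogram as well.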
+	start := time.Now()
+	defer func() {
+		since := time.Since(start)
+		f.m_fetch_second.Record(ctx, since.Seconds())
+	}()
+	defer fmt.Println("fetch done", request.URI)
 
 	response := &Response{
 		Request: request,
@@ -100,17 +116,24 @@ func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
 	response.Response = res
 
 	switch res.StatusCode {
 	case 200:
+		f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "ok")))
 	case 304:
+		f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "not_modified")))
 		response.err = fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
 	case 400, 406, 429, 502, 503:
+		f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "temp_fail")))
 		response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
 	case 403, 404, 410:
+		f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "perm_fail")))
+
 		response.err = fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
 	default:
+		f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.Int("status", res.StatusCode)))
+
 		response.err = errors.New(res.Status)
 	}
 
@@ -134,7 +157,7 @@ func NewFuncPool[IN, OUT any](
 	var wg sync.WaitGroup
 
 	in := make(chan IN, size)
-	out := make(chan OUT)
+	out := make(chan OUT, size)
 
 	wg.Add(size)
 	for range size {
@@ -159,10 +182,6 @@ func NewFuncPool[IN, OUT any](
 					return
 				case out <- r:
 					span.AddEvent("sent queue")
-				case <-time.After(20 * time.Second):
-					fmt.Println("GOT STUCK", request)
-					span.AddEvent("GOT STUCK")
-					cancel()
 				}
 			}
 		}()
diff --git a/go.mod b/go.mod
index 05e88af..32d1163 100644
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,7 @@ require (
 	github.com/klauspost/compress v1.17.9 // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/prometheus/client_model v0.6.1 // indirect
-	github.com/prometheus/common v0.55.0 // indirect
+	github.com/prometheus/common v0.62.0 // indirect
 	github.com/prometheus/procfs v0.15.1 // indirect
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect
 	go.opentelemetry.io/proto/otlp v1.5.0 // indirect
@@ -24,7 +24,7 @@ require (
 	golang.org/x/text v0.22.0 // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
-	google.golang.org/protobuf v1.36.3 // indirect
+	google.golang.org/protobuf v1.36.5 // indirect
 )
 
 require (
@@ -33,8 +33,8 @@ require (
 	github.com/google/uuid v1.6.0 // indirect
 	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
 	go.opentelemetry.io/otel/log v0.10.0
-	go.opentelemetry.io/otel/metric v1.34.0
-	go.opentelemetry.io/otel/trace v1.34.0
+	go.opentelemetry.io/otel/metric v1.35.0
+	go.opentelemetry.io/otel/trace v1.35.0
 )
 
 require (
@@ -47,14 +47,15 @@ require (
 	github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2
 	github.com/writeas/go-strip-markdown/v2 v2.1.1 // indirect
 	go.opentelemetry.io/contrib/bridges/otelslog v0.9.0
-	go.opentelemetry.io/otel v1.34.0
+	go.opentelemetry.io/otel v1.35.0
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0
+	go.opentelemetry.io/otel/exporters/prometheus v0.57.0
 	go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0
 	go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0
 	go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0
-	go.opentelemetry.io/otel/sdk v1.34.0
+	go.opentelemetry.io/otel/sdk v1.35.0
 	go.opentelemetry.io/otel/sdk/log v0.10.0
-	go.opentelemetry.io/otel/sdk/metric v1.34.0
+	go.opentelemetry.io/otel/sdk/metric v1.35.0
 	go.yarn.social/types v0.0.0-20250108134258-ed75fa653ede
 	golang.org/x/crypto v0.33.0 // indirect
 	golang.org/x/sync v0.11.0
diff --git a/go.sum b/go.sum
index 247ae07..6199cb5 100644
--- a/go.sum
+++ b/go.sum
@@ -48,6 +48,8 @@ github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p
 github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
 github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
 github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
+github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
+github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
 github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
 github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
 github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
@@ -66,12 +68,16 @@ go.opentelemetry.io/contrib/bridges/otelslog v0.9.0 h1:N+78eXSlu09kii5nkiM+01Ybt
 go.opentelemetry.io/contrib/bridges/otelslog v0.9.0/go.mod h1:/2KhfLAhtQpgnhIk1f+dftA3fuuMcZjiz//Dc9yfaEs=
 go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
 go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
+go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
+go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 h1:QSKmLBzbFULSyHzOdO9JsN9lpE4zkrz1byYGmJecdVE=
 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0/go.mod h1:sTQ/NH8Yrirf0sJ5rWqVu+oT82i4zL9FaF6rWcqnptM=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0uaNS4c98WRNUEx5U3aDlrDOI5Rs+1Vifcw4DJ8U=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE=
+go.opentelemetry.io/otel/exporters/prometheus v0.57.0 h1:AHh/lAP1BHrY5gBwk8ncc25FXWm/gmmY3BX258z5nuk=
+go.opentelemetry.io/otel/exporters/prometheus v0.57.0/go.mod h1:QpFWz1QxqevfjwzYdbMb4Y1NnlJvqSGwyuU0B4iuc9c=
 go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0 h1:GKCEAZLEpEf78cUvudQdTg0aET2ObOZRB2HtXA0qPAI=
 go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.10.0/go.mod h1:9/zqSWLCmHT/9Jo6fYeUDRRogOLL60ABLsHWS99lF8s=
 go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.34.0 h1:czJDQwFrMbOr9Kk+BPo1y8WZIIFIK58SA1kykuVeiOU=
@@ -82,14 +88,22 @@ go.opentelemetry.io/otel/log v0.10.0 h1:1CXmspaRITvFcjA4kyVszuG4HjA61fPDxMb7q3Bu
 go.opentelemetry.io/otel/log v0.10.0/go.mod h1:PbVdm9bXKku/gL0oFfUF4wwsQsOPlpo4VEqjvxih+FM=
 go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
 go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
+go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
+go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
 go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
 go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
+go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
+go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
 go.opentelemetry.io/otel/sdk/log v0.10.0 h1:lR4teQGWfeDVGoute6l0Ou+RpFqQ9vaPdrNJlST0bvw=
 go.opentelemetry.io/otel/sdk/log v0.10.0/go.mod h1:A+V1UTWREhWAittaQEG4bYm4gAZa6xnvVu+xKrIRkzo=
 go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
 go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
+go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
+go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
 go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
 go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
+go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
+go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
 go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
 go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
 go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
@@ -117,6 +131,8 @@ google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
 google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
 google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
 google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
+google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
+google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
diff --git a/go.work.sum b/go.work.sum
index cc7321d..063b17d 100644
--- a/go.work.sum
+++ b/go.work.sum
@@ -82,6 +82,7 @@ github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l
 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
 github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
 github.com/google/go-dap v0.12.0 h1:rVcjv3SyMIrpaOoTAdFDyHs99CwVOItIJGKLQFQhNeM=
 github.com/google/go-dap v0.12.0/go.mod h1:tNjCASCm5cqePi/RVXXWEVqtnNLV1KTWtYOqu6rZNzc=
 github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
diff --git a/internal/otel/otel.go b/internal/otel/otel.go
index 99fde23..6162517 100644
--- a/internal/otel/otel.go
+++ b/internal/otel/otel.go
@@ -10,21 +10,25 @@ import (
 	"runtime"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"go.opentelemetry.io/contrib/bridges/otelslog"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
+	otelprom "go.opentelemetry.io/otel/exporters/prometheus"
 	"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
 	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
-	 "go.opentelemetry.io/otel/log/global"
+	"go.opentelemetry.io/otel/log/global"
 	"go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/propagation"
 	"go.opentelemetry.io/otel/sdk/log"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
 	"go.opentelemetry.io/otel/sdk/resource"
 	sdktrace "go.opentelemetry.io/otel/sdk/trace"
 	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
+
 	"go.opentelemetry.io/otel/trace"
 )
@@ -43,6 +47,7 @@ func Init(ctx context.Context, name string) (shutdown func(context.Context) erro
 }
 
 func Meter() metric.Meter { return meter }
+
 // func Error(err error, v ...any) {
 // 	if err == nil {
 // 		return
@@ -52,33 +57,37 @@ func Meter() metric.Meter { return meter }
 // }
 // func Info(msg string, v ...any) { fmt.Println(append([]any{msg}, v...)); logger.Info(msg, v...) }
 
-type spanny struct{
+type spanny struct {
 	trace.Span
 }
-func (s *spanny) RecordError(err error, options ...trace.EventOption) { 
+
+func (s *spanny) RecordError(err error, options ...trace.EventOption) {
 	if err == nil {
 		return
 	}
 	ec := trace.NewEventConfig(options...)
-	
+
 	attrs := make([]any, len(ec.Attributes()))
 	for i, v := range ec.Attributes() {
 		attrs[i] = v
 	}
-	fmt.Println(append([]any{"ERR:", err}, attrs...)...) 
-	logger.Error(err.Error(), attrs...) 
+	fmt.Println(append([]any{"ERR:", err}, attrs...)...)
+	logger.Error(err.Error(), attrs...)
 
 	s.Span.RecordError(err, options...)
 }
-func (s *spanny) AddEvent(name string, options ...trace.EventOption) { 
+func (s *spanny) AddEvent(name string, options ...trace.EventOption) {
 	ec := trace.NewEventConfig(options...)
-	
-	attrs := make([]any, len(ec.Attributes()))
+
+	attrs := make([]any, 2*len(ec.Attributes()))
 	for i, v := range ec.Attributes() {
-		attrs[i] = v
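+		// slog-style loggers take alternating key/value arguments, so each
+		// attribute expands into two slots rather than one KeyValue struct.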
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" - "go.opentelemetry.io/otel/log/global" + "go.opentelemetry.io/otel/log/global" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/log" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" ) @@ -43,6 +47,7 @@ func Init(ctx context.Context, name string) (shutdown func(context.Context) erro } func Meter() metric.Meter { return meter } + // func Error(err error, v ...any) { // if err == nil { // return @@ -52,33 +57,37 @@ func Meter() metric.Meter { return meter } // } // func Info(msg string, v ...any) { fmt.Println(append([]any{msg}, v...)); logger.Info(msg, v...) } -type spanny struct{ +type spanny struct { trace.Span } -func (s *spanny) RecordError(err error, options ...trace.EventOption) { + +func (s *spanny) RecordError(err error, options ...trace.EventOption) { if err == nil { return } ec := trace.NewEventConfig(options...) - + attrs := make([]any, len(ec.Attributes())) for i, v := range ec.Attributes() { attrs[i] = v } - fmt.Println(append([]any{"ERR:", err}, attrs...)...) - logger.Error(err.Error(), attrs...) + fmt.Println(append([]any{"ERR:", err}, attrs...)...) + logger.Error(err.Error(), attrs...) s.Span.RecordError(err, options...) } -func (s *spanny) AddEvent(name string, options ...trace.EventOption) { +func (s *spanny) AddEvent(name string, options ...trace.EventOption) { ec := trace.NewEventConfig(options...) - - attrs := make([]any, len(ec.Attributes())) + + attrs := make([]any, 2*len(ec.Attributes())) for i, v := range ec.Attributes() { - attrs[i] = v + attrs[2*i] = v.Key + attrs[2*i+1] = v.Value.Emit() } - fmt.Println(append([]any{name}, attrs...)...) - logger.Info(name, attrs...) + fmt.Println(append([]any{name}, attrs...)...) + logger.Info(name, attrs...) + + s.Span.AddEvent(name, options...) } func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, trace.Span) { @@ -233,8 +242,31 @@ func newMeterProvider(ctx context.Context, name string) (func(context.Context) e // ) // otel.SetMeterProvider(meterProvider) + metricExporter, err := otelprom.New( + otelprom.WithRegisterer(prometheus.DefaultRegisterer), + + // OTEL default buckets assume you're using milliseconds. Substitute defaults + // appropriate for units of seconds. 
+	p := sdkmetric.NewMeterProvider(
+		sdkmetric.WithReader(metricExporter),
+	)
+
+	otel.SetMeterProvider(p)
 	http.Handle("/metrics", promhttp.Handler())
 
-	return func(ctx context.Context) error { return nil }, nil
+	return func(ctx context.Context) error { return nil }, err
 }
 
 func newLoggerProvider(ctx context.Context, name string) (func(context.Context) error, error) {
diff --git a/refresh-loop.go b/refresh-loop.go
index 2625c53..c9d4b25 100644
--- a/refresh-loop.go
+++ b/refresh-loop.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"sort"
 	"time"
 
 	"go.opentelemetry.io/otel/attribute"
@@ -24,12 +25,16 @@ const (
 	TenMinutes
 	TwoMinutes = 60
 )
 
-func refreshLoop(c *console, app *appState) error {
+func feedRefreshProcessor(c *console, app *appState) error {
 	ctx, span := otel.Span(c.Context)
 	defer span.End()
 
+	sleeping_time, _ := otel.Meter().Int64Counter("xt_feed_sleep")
+
+	queue_size, _ := otel.Meter().Int64Gauge("xt_feed_queue_size")
+
 	f := NewHTTPFetcher()
-	fetch, close := NewFuncPool(ctx, 25, f.Fetch)
+	fetch, close := NewFuncPool(ctx, 40, f.Fetch)
 	defer close()
 
 	db, err := app.DB(c)
@@ -54,29 +59,40 @@
 				queue.Insert(&f)
 			}
 		}
-		span.AddEvent("queue size", trace.WithAttributes(attribute.Int("size", int(queue.count))))
+		span.AddEvent("queue", trace.WithAttributes(attribute.Int("size", int(queue.count))))
+		queue_size.Record(ctx, int64(queue.count))
 
 		f := queue.ExtractMin()
 		if f == nil {
+			sleeping_time.Add(ctx, int64(TenMinutes))
 			span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TenMinutes))))
 			select {
 			case <-time.After(TenMinutes * time.Second):
 			case <-c.Done():
 				return nil
 			}
+			span.End()
+
 			continue
 		}
 
 		span.AddEvent("next", trace.WithAttributes(
 			attribute.Int("size", int(queue.count)),
 			attribute.String("uri", f.URI),
-			attribute.String("scan on", f.LastScanOn.Time.Format(time.RFC3339)),
+			attribute.String("last scan on", f.LastScanOn.Time.Format(time.RFC3339)),
+			attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
 		))
 
-		if time.Until(f.NextScanOn.Time) > 2*time.Hour {
+		until := time.Until(f.NextScanOn.Time)
+
+		if until > 2*time.Hour {
 			span.AddEvent("too soon", trace.WithAttributes(attribute.String("uri", f.URI)))
+			span.End()
+
 			continue
 		}
+
+		sleeping_time.Add(ctx, int64(until.Seconds()))
 		select {
 		case <-ctx.Done():
 			return nil
@@ -85,12 +101,11 @@
 			attribute.Int("size", int(queue.count)),
 			attribute.String("uri", f.URI),
 			attribute.String("timeout", t.Format(time.RFC3339)),
-			attribute.String("scan on", f.NextScanOn.Time.Format(time.RFC3339)),
+			attribute.String("next scan on", f.NextScanOn.Time.Format(time.RFC3339)),
 		))
-
-			fetch.Fetch(f)
 		}
+		fetch.Fetch(f)
 	}
 
 	span.RecordError(ctx.Err())
@@ -101,6 +116,10 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
 	ctx, span := otel.Span(ctx)
 	defer span.End()
 
+	process_in_total, _ := otel.Meter().Int64Counter("xt_processed_in_total")
+	process_out_total, _ := otel.Meter().Int64Counter("xt_processed_out_total")
+	twts_histogram, _ := otel.Meter().Float64Histogram("xt_twt1k_bucket")
+
 	for ctx.Err() == nil {
 		select {
 		case <-ctx.Done():
 			return
@@ -112,6 +131,7 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
 				attribute.String("scan on", f.NextScanOn.Time.Format(time.RFC3339)),
 			))
 
+			process_in_total.Add(ctx, 1)
 			f.LastScanOn.Time = time.Now()
 			f.LastScanOn.Valid = true
 			err := res.err
@@ -153,6 +173,8 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
 			rdr := io.TeeReader(res.Body, cpy)
 			rdr = lextwt.TwtFixer(rdr)
 			twtfile, err := lextwt.ParseFile(rdr, &types.Twter{Nick: f.Nick, URI: f.URI})
+			cpy.Close()
+
 			if err != nil {
 				span.RecordError(fmt.Errorf("%w: %w", ErrParseFailed, err))
@@ -164,8 +186,10 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
 				continue
 			}
 
-			cpy.Close()
-			span.AddEvent("parse complete", trace.WithAttributes(attribute.Int("count", twtfile.Twts().Len())))
+
+			count := twtfile.Twts().Len()
+			span.AddEvent("parse complete", trace.WithAttributes(attribute.Int("count", count)))
+			twts_histogram.Record(ctx, float64(count)/1000)
 
 			err = storeFeed(ctx, db, twtfile)
 			if err != nil {
@@ -178,13 +202,40 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
 				continue
 			}
 
-			f.RefreshRate = TenMinutes
+			f.RefreshRate = checkTemp(twtfile.Twts())
 			f.LastError.String = ""
 
 			err = f.Save(ctx, db)
 			span.RecordError(err)
+			process_out_total.Add(ctx, 1)
 		}
 	}
 
 	span.RecordError(ctx.Err())
 }
+
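+// checkTemp derives the next refresh interval from how recently a feed has
+// been active: the ages of the newest and fifth-newest twts pick between
+// two-minute, ten-minute, and daily rescans.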
+func checkTemp(twts types.Twts) int {
+	if len(twts) < 5 {
+		return OneDay
+	}
+	sort.Sort(sort.Reverse(twts))
+
+	now := time.Now()
+
+	since_first := now.Sub(twts[0].Created())
+	since_fifth := now.Sub(twts[4].Created())
+
+	if since_first < 2*time.Hour || since_fifth < 8*time.Hour {
+		return TwoMinutes
+	}
+
+	if since_first < 8*time.Hour || since_fifth < 32*time.Hour {
+		return TenMinutes
+	}
+
+	return OneDay
+}