diff --git a/Makefile b/Makefile index 02299ea..129615d 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,14 @@ -export EV_DATA = mem: -export EV_HTTP = :8080 +export PATH:=$(shell go env GOPATH)/bin:$(PATH) +export EV_DATA=mem: +export EV_HTTP=:8080 +export EV_TRACE_SAMPLE=always -include local.mk run: gen - go run . +ifeq (, $(shell which air)) + go install github.com/cosmtrek/air@latest +endif + air test: go test -cover -race ./... diff --git a/internal/logz/init.go b/internal/logz/init.go index 9de0fcc..bfef13d 100644 --- a/internal/logz/init.go +++ b/internal/logz/init.go @@ -3,6 +3,7 @@ package logz import ( "context" "log" + "os" "go.uber.org/multierr" ) @@ -31,3 +32,11 @@ func Init(ctx context.Context, name string) (context.Context, func() error) { return multierr.Combine(errs...) } } + +func env(name, defaultValue string) string { + if v := os.Getenv(name); v != "" { + log.Println("# ", name, " = ", v) + return v + } + return defaultValue +} diff --git a/internal/logz/metric.go b/internal/logz/metric.go index e7047b6..b1df359 100644 --- a/internal/logz/metric.go +++ b/internal/logz/metric.go @@ -8,14 +8,12 @@ import ( "runtime/debug" "time" - // metricsExporter "github.com/logzio/go-metrics-sdk" "go.opentelemetry.io/contrib/instrumentation/runtime" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/global" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" - "go.opentelemetry.io/otel/sdk/metric/controller/basic" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" @@ -61,7 +59,7 @@ func initMetrics(ctx context.Context, name string) (context.Context, func() erro aggregation.CumulativeTemporalitySelector(), processor.WithMemory(true), ), - basic.WithResource( + controller.WithResource( resource.NewWithAttributes( semconv.SchemaURL, attribute.String("app", name), diff --git a/internal/logz/tracer.go b/internal/logz/tracer.go index dc1485a..d1925c4 100644 --- a/internal/logz/tracer.go +++ b/internal/logz/tracer.go @@ -5,7 +5,6 @@ import ( "fmt" "log" "net/http" - "os" "runtime" "strconv" "time" @@ -54,6 +53,13 @@ func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, return ctx, span } +type SampleRate string + +const ( + SampleAlways SampleRate = "always" + SampleNever SampleRate = "never" +) + func initTracing(ctx context.Context, name string) (context.Context, func() error) { res, err := resource.New(ctx, resource.WithAttributes( @@ -76,15 +82,15 @@ func initTracing(ctx context.Context, name string) (context.Context, func() erro bsp := sdktrace.NewBatchSpanProcessor(traceExporter) var sample sdktrace.TracerProviderOption - sampleRate := env("LOGZIO_TRACE_SAMPLE", "always") + sampleRate := SampleRate(env("EV_TRACE_SAMPLE", string(SampleNever))) switch sampleRate { case "always": sample = sdktrace.WithSampler(sdktrace.AlwaysSample()) case "never": sample = sdktrace.WithSampler(sdktrace.NeverSample()) default: - if v, err := strconv.Atoi(sampleRate); err != nil { - sample = sdktrace.WithSampler(sdktrace.AlwaysSample()) + if v, err := strconv.Atoi(string(sampleRate)); err != nil { + sample = sdktrace.WithSampler(sdktrace.NeverSample()) } else { sample = sdktrace.WithSampler(sdktrace.TraceIDRatioBased(float64(v) * 0.01)) } @@ -122,13 +128,6 @@ func reverse[T any](s []T) { last-- } } -func env(name, defaultValue string) string { - if v := os.Getenv(name); v != "" { - log.Println("# ", name, " = ", v) - return v - } - return defaultValue -} func Htrace(h http.Handler, name string) http.Handler { return otelhttp.NewHandler(h, name) diff --git a/main.go b/main.go index eb42e13..e9c9834 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "fmt" "log" "net/http" "os" @@ -82,18 +81,9 @@ func run(ctx context.Context) error { mux.Handle("/", playground.Handler("GraphQL playground", "/gql")) mux.Handle("/gql", logz.Htrace(res.ChainMiddlewares(gql), "gql")) mux.Handle("/inbox/", logz.Htrace(http.StripPrefix("/inbox/", svc), "inbox")) + mux.Handle("/.well-known/salty/", logz.Htrace(svc, "lookup")) mux.Handle("/metrics", logz.PromHTTP(ctx)) - wk := http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, `{ - "endpoint": "https://ev.sour.is/inbox/01GA4Q3NDX4TPAZ2EZ8E92CQE6", - "key": "kex1pqwqatj6sge7qaqrsvk4u4yhue4x3vej8znetkwj6a5k0xds2fmqqe3plh" -}`) - }, - ) - mux.Handle("/.well-known/salty/0ce550020ce36a9932b286b141edd515d33c2b0f51c715445de89ae106345993.json", wk) - s.Handler = cors.AllowAll().Handler(mux) log.Print("Listen on ", s.Addr) diff --git a/pkg/msgbus/service.go b/pkg/msgbus/service.go index 402ecc7..968cee3 100644 --- a/pkg/msgbus/service.go +++ b/pkg/msgbus/service.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -13,6 +14,7 @@ import ( "github.com/gorilla/websocket" "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/pkg/domain" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" ) @@ -50,6 +52,11 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.websocket(w, r) return } + if strings.HasPrefix(r.URL.Path, "/.well-known/salty") { + s.getUser(w, r) + return + } + s.get(w, r) case http.MethodPost, http.MethodPut: s.post(w, r) @@ -108,6 +115,41 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, events[i]) } } +func (s *service) getUser(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + ctx, span := logz.Span(ctx) + defer span.End() + + addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/") + addr = strings.TrimSuffix(addr, ".json") + + span.AddEvent(fmt.Sprint("find ", addr)) + a, err := es.Update(ctx, s.es, addr, func(ctx context.Context, agg *domain.SaltyUser) error { return nil }) + switch { + case errors.Is(err, event.ErrShouldExist): + span.RecordError(err) + + w.WriteHeader(http.StatusNotFound) + return + case err != nil: + span.RecordError(err) + + w.WriteHeader(http.StatusInternalServerError) + return + } + + err = json.NewEncoder(w).Encode( + struct { + Endpoint string `json:"endpoint"` + Key string `json:"key"` + }{ + Endpoint: a.Inbox.String(), + Key: a.Pubkey.ID().String(), + }) + if err != nil { + span.RecordError(err) + } +} func (s *service) post(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -229,7 +271,7 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) { es := s.es.EventStream() if es == nil { - span.AddEvent(fmt.Sprint("EventStore does not implement streaming")) + span.AddEvent("EventStore does not implement streaming") w.WriteHeader(http.StatusInternalServerError) return } @@ -242,11 +284,11 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) { defer func() { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - span.AddEvent(fmt.Sprint("stop ws")) + span.AddEvent("stop ws") sub.Close(ctx) }() - span.AddEvent(fmt.Sprint("start ws")) + span.AddEvent("start ws") for sub.Recv(ctx) { events, err := sub.Events(ctx) if err != nil {