feat: add well known for user lookup
This commit is contained in:
parent
353b15ce61
commit
01285ebc9a
7
Makefile
7
Makefile
|
@ -1,9 +1,14 @@
|
||||||
|
export PATH:=$(shell go env GOPATH)/bin:$(PATH)
|
||||||
export EV_DATA=mem:
|
export EV_DATA=mem:
|
||||||
export EV_HTTP=:8080
|
export EV_HTTP=:8080
|
||||||
|
export EV_TRACE_SAMPLE=always
|
||||||
-include local.mk
|
-include local.mk
|
||||||
|
|
||||||
run: gen
|
run: gen
|
||||||
go run .
|
ifeq (, $(shell which air))
|
||||||
|
go install github.com/cosmtrek/air@latest
|
||||||
|
endif
|
||||||
|
air
|
||||||
test:
|
test:
|
||||||
go test -cover -race ./...
|
go test -cover -race ./...
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package logz
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
"log"
|
||||||
|
"os"
|
||||||
|
|
||||||
"go.uber.org/multierr"
|
"go.uber.org/multierr"
|
||||||
)
|
)
|
||||||
|
@ -31,3 +32,11 @@ func Init(ctx context.Context, name string) (context.Context, func() error) {
|
||||||
return multierr.Combine(errs...)
|
return multierr.Combine(errs...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func env(name, defaultValue string) string {
|
||||||
|
if v := os.Getenv(name); v != "" {
|
||||||
|
log.Println("# ", name, " = ", v)
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
|
|
@ -8,14 +8,12 @@ import (
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
// metricsExporter "github.com/logzio/go-metrics-sdk"
|
|
||||||
"go.opentelemetry.io/contrib/instrumentation/runtime"
|
"go.opentelemetry.io/contrib/instrumentation/runtime"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/exporters/prometheus"
|
"go.opentelemetry.io/otel/exporters/prometheus"
|
||||||
"go.opentelemetry.io/otel/metric"
|
"go.opentelemetry.io/otel/metric"
|
||||||
"go.opentelemetry.io/otel/metric/global"
|
"go.opentelemetry.io/otel/metric/global"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/controller/basic"
|
|
||||||
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
|
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
|
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
|
||||||
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||||
|
@ -61,7 +59,7 @@ func initMetrics(ctx context.Context, name string) (context.Context, func() erro
|
||||||
aggregation.CumulativeTemporalitySelector(),
|
aggregation.CumulativeTemporalitySelector(),
|
||||||
processor.WithMemory(true),
|
processor.WithMemory(true),
|
||||||
),
|
),
|
||||||
basic.WithResource(
|
controller.WithResource(
|
||||||
resource.NewWithAttributes(
|
resource.NewWithAttributes(
|
||||||
semconv.SchemaURL,
|
semconv.SchemaURL,
|
||||||
attribute.String("app", name),
|
attribute.String("app", name),
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
@ -54,6 +53,13 @@ func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context,
|
||||||
return ctx, span
|
return ctx, span
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SampleRate string
|
||||||
|
|
||||||
|
const (
|
||||||
|
SampleAlways SampleRate = "always"
|
||||||
|
SampleNever SampleRate = "never"
|
||||||
|
)
|
||||||
|
|
||||||
func initTracing(ctx context.Context, name string) (context.Context, func() error) {
|
func initTracing(ctx context.Context, name string) (context.Context, func() error) {
|
||||||
res, err := resource.New(ctx,
|
res, err := resource.New(ctx,
|
||||||
resource.WithAttributes(
|
resource.WithAttributes(
|
||||||
|
@ -76,15 +82,15 @@ func initTracing(ctx context.Context, name string) (context.Context, func() erro
|
||||||
bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
|
bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
|
||||||
|
|
||||||
var sample sdktrace.TracerProviderOption
|
var sample sdktrace.TracerProviderOption
|
||||||
sampleRate := env("LOGZIO_TRACE_SAMPLE", "always")
|
sampleRate := SampleRate(env("EV_TRACE_SAMPLE", string(SampleNever)))
|
||||||
switch sampleRate {
|
switch sampleRate {
|
||||||
case "always":
|
case "always":
|
||||||
sample = sdktrace.WithSampler(sdktrace.AlwaysSample())
|
sample = sdktrace.WithSampler(sdktrace.AlwaysSample())
|
||||||
case "never":
|
case "never":
|
||||||
sample = sdktrace.WithSampler(sdktrace.NeverSample())
|
sample = sdktrace.WithSampler(sdktrace.NeverSample())
|
||||||
default:
|
default:
|
||||||
if v, err := strconv.Atoi(sampleRate); err != nil {
|
if v, err := strconv.Atoi(string(sampleRate)); err != nil {
|
||||||
sample = sdktrace.WithSampler(sdktrace.AlwaysSample())
|
sample = sdktrace.WithSampler(sdktrace.NeverSample())
|
||||||
} else {
|
} else {
|
||||||
sample = sdktrace.WithSampler(sdktrace.TraceIDRatioBased(float64(v) * 0.01))
|
sample = sdktrace.WithSampler(sdktrace.TraceIDRatioBased(float64(v) * 0.01))
|
||||||
}
|
}
|
||||||
|
@ -122,13 +128,6 @@ func reverse[T any](s []T) {
|
||||||
last--
|
last--
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func env(name, defaultValue string) string {
|
|
||||||
if v := os.Getenv(name); v != "" {
|
|
||||||
log.Println("# ", name, " = ", v)
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
return defaultValue
|
|
||||||
}
|
|
||||||
|
|
||||||
func Htrace(h http.Handler, name string) http.Handler {
|
func Htrace(h http.Handler, name string) http.Handler {
|
||||||
return otelhttp.NewHandler(h, name)
|
return otelhttp.NewHandler(h, name)
|
||||||
|
|
12
main.go
12
main.go
|
@ -2,7 +2,6 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -82,18 +81,9 @@ func run(ctx context.Context) error {
|
||||||
mux.Handle("/", playground.Handler("GraphQL playground", "/gql"))
|
mux.Handle("/", playground.Handler("GraphQL playground", "/gql"))
|
||||||
mux.Handle("/gql", logz.Htrace(res.ChainMiddlewares(gql), "gql"))
|
mux.Handle("/gql", logz.Htrace(res.ChainMiddlewares(gql), "gql"))
|
||||||
mux.Handle("/inbox/", logz.Htrace(http.StripPrefix("/inbox/", svc), "inbox"))
|
mux.Handle("/inbox/", logz.Htrace(http.StripPrefix("/inbox/", svc), "inbox"))
|
||||||
|
mux.Handle("/.well-known/salty/", logz.Htrace(svc, "lookup"))
|
||||||
mux.Handle("/metrics", logz.PromHTTP(ctx))
|
mux.Handle("/metrics", logz.PromHTTP(ctx))
|
||||||
|
|
||||||
wk := http.HandlerFunc(
|
|
||||||
func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
fmt.Fprintln(w, `{
|
|
||||||
"endpoint": "https://ev.sour.is/inbox/01GA4Q3NDX4TPAZ2EZ8E92CQE6",
|
|
||||||
"key": "kex1pqwqatj6sge7qaqrsvk4u4yhue4x3vej8znetkwj6a5k0xds2fmqqe3plh"
|
|
||||||
}`)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
mux.Handle("/.well-known/salty/0ce550020ce36a9932b286b141edd515d33c2b0f51c715445de89ae106345993.json", wk)
|
|
||||||
|
|
||||||
s.Handler = cors.AllowAll().Handler(mux)
|
s.Handler = cors.AllowAll().Handler(mux)
|
||||||
|
|
||||||
log.Print("Listen on ", s.Addr)
|
log.Print("Listen on ", s.Addr)
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -13,6 +14,7 @@ import (
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/sour-is/ev/internal/logz"
|
"github.com/sour-is/ev/internal/logz"
|
||||||
|
"github.com/sour-is/ev/pkg/domain"
|
||||||
"github.com/sour-is/ev/pkg/es"
|
"github.com/sour-is/ev/pkg/es"
|
||||||
"github.com/sour-is/ev/pkg/es/event"
|
"github.com/sour-is/ev/pkg/es/event"
|
||||||
)
|
)
|
||||||
|
@ -50,6 +52,11 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
s.websocket(w, r)
|
s.websocket(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if strings.HasPrefix(r.URL.Path, "/.well-known/salty") {
|
||||||
|
s.getUser(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
s.get(w, r)
|
s.get(w, r)
|
||||||
case http.MethodPost, http.MethodPut:
|
case http.MethodPost, http.MethodPut:
|
||||||
s.post(w, r)
|
s.post(w, r)
|
||||||
|
@ -108,6 +115,41 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) {
|
||||||
fmt.Fprintln(w, events[i])
|
fmt.Fprintln(w, events[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
func (s *service) getUser(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
ctx, span := logz.Span(ctx)
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/")
|
||||||
|
addr = strings.TrimSuffix(addr, ".json")
|
||||||
|
|
||||||
|
span.AddEvent(fmt.Sprint("find ", addr))
|
||||||
|
a, err := es.Update(ctx, s.es, addr, func(ctx context.Context, agg *domain.SaltyUser) error { return nil })
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, event.ErrShouldExist):
|
||||||
|
span.RecordError(err)
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
return
|
||||||
|
case err != nil:
|
||||||
|
span.RecordError(err)
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.NewEncoder(w).Encode(
|
||||||
|
struct {
|
||||||
|
Endpoint string `json:"endpoint"`
|
||||||
|
Key string `json:"key"`
|
||||||
|
}{
|
||||||
|
Endpoint: a.Inbox.String(),
|
||||||
|
Key: a.Pubkey.ID().String(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
func (s *service) post(w http.ResponseWriter, r *http.Request) {
|
func (s *service) post(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
|
||||||
|
@ -229,7 +271,7 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
es := s.es.EventStream()
|
es := s.es.EventStream()
|
||||||
if es == nil {
|
if es == nil {
|
||||||
span.AddEvent(fmt.Sprint("EventStore does not implement streaming"))
|
span.AddEvent("EventStore does not implement streaming")
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -242,11 +284,11 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
|
||||||
defer func() {
|
defer func() {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
span.AddEvent(fmt.Sprint("stop ws"))
|
span.AddEvent("stop ws")
|
||||||
sub.Close(ctx)
|
sub.Close(ctx)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
span.AddEvent(fmt.Sprint("start ws"))
|
span.AddEvent("start ws")
|
||||||
for sub.Recv(ctx) {
|
for sub.Recv(ctx) {
|
||||||
events, err := sub.Events(ctx)
|
events, err := sub.Events(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user