diff --git a/app/salty/salty-addr.go b/app/salty/salty-addr.go index 0fb6d24..3db2630 100644 --- a/app/salty/salty-addr.go +++ b/app/salty/salty-addr.go @@ -25,6 +25,9 @@ type Capabilities struct { } func (c Capabilities) String() string { + if c.AcceptEncoding == "" { + return "" + } return fmt.Sprint("accept-encoding: ", c.AcceptEncoding) } @@ -83,30 +86,40 @@ func (a *Addr) Refresh(ctx context.Context) error { defer span.End() span.AddEvent(fmt.Sprintf("Looking up SRV record for _salty._tcp.%s", a.Domain)) - if target, _, err := a.dns.LookupSRV(ctx, "salty", "tcp", a.Domain); err == nil { - a.discoveredDomain = target + if _, srv, err := a.dns.LookupSRV(ctx, "salty", "tcp", a.Domain); err == nil { + if len(srv) > 0 { + a.discoveredDomain = strings.TrimSuffix(srv[0].Target, ".") + } span.AddEvent(fmt.Sprintf("Discovered salty services %s", a.discoveredDomain)) } else if err != nil { - span.AddEvent(fmt.Sprintf("error looking up SRV record for _salty._tcp.%s : %s", a.Domain, err)) + span.RecordError(fmt.Errorf("error looking up SRV record for _salty._tcp.%s : %s", a.Domain, err)) } config, cap, err := fetchConfig(ctx, a.HashURI()) if err != nil { // Fallback to plain user nick + span.RecordError(err) + config, cap, err = fetchConfig(ctx, a.URI()) } if err != nil { - return fmt.Errorf("error looking up user %s: %w", a, err) + err = fmt.Errorf("error looking up user %s: %w", a, err) + span.RecordError(err) + return err } key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(config.Key)) if err != nil { - return fmt.Errorf("error parsing public key %s: %w", config.Key, err) + err = fmt.Errorf("error parsing public key %s: %w", config.Key, err) + span.RecordError(err) + return err } a.key = key u, err := url.Parse(config.Endpoint) if err != nil { - return fmt.Errorf("error parsing endpoint %s: %w", config.Endpoint, err) + err = fmt.Errorf("error parsing endpoint %s: %w", config.Endpoint, err) + span.RecordError(err) + return err } a.endpoint = u a.capabilities = cap diff --git a/app/salty/service.go b/app/salty/service.go index 3cf4922..d67cf10 100644 --- a/app/salty/service.go +++ b/app/salty/service.go @@ -10,6 +10,7 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/keys-pub/keys" "github.com/sour-is/ev/internal/lg" @@ -35,6 +36,7 @@ type service struct { m_api_register syncint64.Counter m_api_lookup syncint64.Counter m_api_send syncint64.Counter + m_req_time syncint64.Histogram } type contextKey struct { name string @@ -81,6 +83,10 @@ func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, erro svc.m_api_send, err = m.SyncInt64().Counter("salty_api_send") errs = multierr.Append(errs, err) + + svc.m_req_time, err = m.SyncInt64().Histogram("salty_request_time") + errs = multierr.Append(errs, err) + span.RecordError(err) return svc, errs @@ -106,6 +112,9 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx, span := lg.Span(ctx) defer span.End() + start := time.Now() + defer s.m_req_time.Record(ctx, int64(time.Since(start))) + addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/") addr = strings.TrimSuffix(addr, ".json") @@ -209,10 +218,14 @@ func (s *service) apiv1(w http.ResponseWriter, r *http.Request) { case http.MethodGet: switch { case r.URL.Path == "/ping": + s.m_api_ping.Add(ctx, 1) + w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{}`)) case strings.HasPrefix(r.URL.Path, "/lookup/"): + s.m_api_lookup.Add(ctx, 1) + addr, err := s.ParseAddr(strings.TrimPrefix(r.URL.Path, "/lookup/")) if err != nil { span.RecordError(err) @@ -226,7 +239,8 @@ func (s *service) apiv1(w http.ResponseWriter, r *http.Request) { return } - json.NewEncoder(w).Encode(addr) + err = json.NewEncoder(w).Encode(addr) + span.RecordError(err) return default: @@ -237,8 +251,14 @@ func (s *service) apiv1(w http.ResponseWriter, r *http.Request) { case http.MethodPost: switch r.URL.Path { case "/register": + s.m_api_register.Add(ctx, 1) + notImplemented(w) + return case "/send": + s.m_api_send.Add(ctx, 1) + notImplemented(w) + return default: w.WriteHeader(http.StatusNotFound) @@ -249,3 +269,7 @@ func (s *service) apiv1(w http.ResponseWriter, r *http.Request) { return } } + +func notImplemented(w http.ResponseWriter) { + w.WriteHeader(http.StatusNotImplemented) +} diff --git a/httpmux.go b/httpmux.go index a19f204..a86cd18 100644 --- a/httpmux.go +++ b/httpmux.go @@ -1,6 +1,7 @@ package main import ( + "log" "net/http" "github.com/rs/cors" @@ -17,6 +18,7 @@ func httpMux(fns ...interface{ RegisterHTTP(*http.ServeMux) }) http.Handler { fn.RegisterHTTP(mux.ServeMux) if fn, ok := fn.(interface{ RegisterAPIv1(*http.ServeMux) }); ok { + log.Printf("register api %T", fn) fn.RegisterAPIv1(mux.api) } } @@ -28,10 +30,7 @@ func newMux() *mux { api: http.NewServeMux(), ServeMux: http.NewServeMux(), } - mux.Handle("/api/v1/", http.StripPrefix("/api/v1/", mux.api)) + mux.Handle("/api/v1/", http.StripPrefix("/api/v1", mux.api)) return mux } -func (m mux) HandleAPIv1(pattern string, handler http.Handler) { - m.api.Handle(pattern, handler) -} diff --git a/httpmux_test.go b/httpmux_test.go new file mode 100644 index 0000000..53b580e --- /dev/null +++ b/httpmux_test.go @@ -0,0 +1,40 @@ +package main + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/matryer/is" +) + +type mockHTTP struct { + onServeHTTP func() +} + +func (m *mockHTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) { + m.onServeHTTP() +} + +func (h *mockHTTP) RegisterHTTP(mux *http.ServeMux) { + mux.Handle("/", h) +} +func (h *mockHTTP) RegisterAPIv1(mux *http.ServeMux) { + mux.Handle("/ping", h) +} + +func TestHttpMux(t *testing.T) { + is := is.New(t) + + called := false + + mux := httpMux(&mockHTTP{func() { called = true }}) + + is.True(mux != nil) + + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/api/v1/ping", nil) + mux.ServeHTTP(w, r) + + is.True(called) +} diff --git a/internal/lg/metric.go b/internal/lg/metric.go index 4a1d36f..1667693 100644 --- a/internal/lg/metric.go +++ b/internal/lg/metric.go @@ -48,7 +48,9 @@ func initMetrics(ctx context.Context, name string) (context.Context, func() erro host = h } - config := prometheus.Config{} + config := prometheus.Config{ + DefaultHistogramBoundaries: []float64{1, 2, 5, 10, 20, 50}, + } cont := controller.New( processor.NewFactory( selector.NewWithHistogramDistribution( diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 368e0c5..29cf5d9 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -139,7 +139,7 @@ func (c *Cache[K, V]) ContainsOrAdd(ctx context.Context, key K, value V) (ok, ev // PeekOrAdd checks if a key is in the cache without updating the // recent-ness or deleting it for being stale, and if not, adds the value. // Returns whether found and whether an eviction occurred. -func (c *Cache[K, V]) PeekOrAdd(ctx context.Context, key K, value V) (previous interface{}, ok, evicted bool) { +func (c *Cache[K, V]) PeekOrAdd(ctx context.Context, key K, value V) (previous *V, ok, evicted bool) { var k K var v V c.lock.Lock() diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go new file mode 100644 index 0000000..298f406 --- /dev/null +++ b/pkg/cache/cache_test.go @@ -0,0 +1,131 @@ +package cache_test + +import ( + "context" + "testing" + + "github.com/matryer/is" + "github.com/sour-is/ev/pkg/cache" +) + +func TestCache(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + c, err := cache.NewCache[string, int](1) + is.NoErr(err) + + evicted := c.Add(ctx, "one", 1) + is.True(!evicted) + + is.True(c.Contains("one")) + _, ok := c.Peek("one") + is.True(ok) + + ok, evicted = c.ContainsOrAdd(ctx, "two", 2) + is.True(!ok) + is.True(evicted) + + is.True(!c.Contains("one")) + is.True(c.Contains("two")) + + is.Equal(c.Len(), 1) + is.Equal(c.Keys(), []string{"two"}) + + v, ok := c.Get("two") + is.True(ok) + is.Equal(*v, 2) + + evictCount := c.Resize(ctx, 100) + is.True(evictCount == 0) + + c.Add(ctx, "one", 1) + + prev, ok, evicted := c.PeekOrAdd(ctx, "three", 3) + is.True(!ok) + is.True(!evicted) + is.Equal(prev, nil) + + key, value, ok := c.GetOldest() + is.True(ok) + is.Equal(*key, "two") + is.Equal(*value, 2) + + key, value, ok = c.RemoveOldest(ctx) + is.True(ok) + is.Equal(*key, "two") + is.Equal(*value, 2) + + c.Remove(ctx, "one") + + c.Purge(ctx) + is.True(!c.Contains("three")) +} + +func TestCacheWithEvict(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + evictions := 0 + + c, err := cache.NewWithEvict(1, func(ctx context.Context, s string, i int) { evictions++ }) + is.NoErr(err) + + key, value, ok := c.GetOldest() + is.True(!ok) + is.Equal(key, nil) + is.Equal(value, nil) + + key, value, ok = c.RemoveOldest(ctx) + is.True(!ok) + is.Equal(key, nil) + is.Equal(value, nil) + + evicted := c.Add(ctx, "one", 1) + is.True(!evicted) + + is.True(c.Contains("one")) + _, ok = c.Peek("one") + is.True(ok) + + ok, evicted = c.ContainsOrAdd(ctx, "two", 2) + is.True(!ok) + is.True(evicted) + + is.True(!c.Contains("one")) + is.True(c.Contains("two")) + + is.Equal(c.Len(), 1) + is.Equal(c.Keys(), []string{"two"}) + + v, ok := c.Get("two") + is.True(ok) + is.Equal(*v, 2) + + evictCount := c.Resize(ctx, 100) + is.True(evictCount == 0) + + c.Add(ctx, "one", 1) + + prev, ok, evicted := c.PeekOrAdd(ctx, "three", 3) + is.True(!ok) + is.True(!evicted) + is.Equal(prev, nil) + + key, value, ok = c.GetOldest() + is.True(ok) + is.Equal(*key, "two") + is.Equal(*value, 2) + + key, value, ok = c.RemoveOldest(ctx) + is.True(ok) + is.Equal(*key, "two") + is.Equal(*value, 2) + + c.Resize(ctx, 1) + + c.Purge(ctx) + is.True(!c.Contains("three")) + + is.Equal(evictions, 4) +} diff --git a/pkg/es/driver/projecter/projector_test.go b/pkg/es/driver/projecter/projector_test.go new file mode 100644 index 0000000..9aa2ffb --- /dev/null +++ b/pkg/es/driver/projecter/projector_test.go @@ -0,0 +1,29 @@ +package projecter_test + +import ( + "context" + "testing" + + "github.com/sour-is/ev/pkg/es/driver" +) + +type mockDriver struct { + onOpen func() + onEventLog func() +} + +// EventLog implements driver.Driver +func (*mockDriver) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { + panic("unimplemented") +} + +// Open implements driver.Driver +func (*mockDriver) Open(ctx context.Context, dsn string) (driver.Driver, error) { + panic("unimplemented") +} + +var _ driver.Driver = (*mockDriver)(nil) + +func TestProjecter(t *testing.T) { + +}