feat: add histograms for request time
This commit is contained in:
parent
6425fcadcd
commit
9103de501b
|
@ -16,16 +16,19 @@ import (
|
||||||
"github.com/sour-is/ev/pkg/es"
|
"github.com/sour-is/ev/pkg/es"
|
||||||
"github.com/sour-is/ev/pkg/es/event"
|
"github.com/sour-is/ev/pkg/es/event"
|
||||||
"github.com/sour-is/ev/pkg/gql"
|
"github.com/sour-is/ev/pkg/gql"
|
||||||
|
"go.opentelemetry.io/otel/metric/instrument"
|
||||||
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
||||||
|
"go.opentelemetry.io/otel/metric/unit"
|
||||||
"go.uber.org/multierr"
|
"go.uber.org/multierr"
|
||||||
)
|
)
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
es *es.EventStore
|
es *es.EventStore
|
||||||
|
|
||||||
Mresolver_posts syncint64.Counter
|
m_gql_posts syncint64.Counter
|
||||||
Mresolver_post_added syncint64.Counter
|
m_gql_post_added syncint64.Counter
|
||||||
Mresolver_post_added_event syncint64.Counter
|
m_gql_post_added_event syncint64.Counter
|
||||||
|
m_req_time syncint64.Histogram
|
||||||
}
|
}
|
||||||
|
|
||||||
type MsgbusResolver interface {
|
type MsgbusResolver interface {
|
||||||
|
@ -50,13 +53,25 @@ func New(ctx context.Context, es *es.EventStore) (*service, error) {
|
||||||
svc := &service{es: es}
|
svc := &service{es: es}
|
||||||
|
|
||||||
var err, errs error
|
var err, errs error
|
||||||
svc.Mresolver_posts, err = m.SyncInt64().Counter("resolver_posts")
|
svc.m_gql_posts, err = m.SyncInt64().Counter("msgbus_posts",
|
||||||
|
instrument.WithDescription("msgbus graphql posts requests"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.Mresolver_post_added, err = m.SyncInt64().Counter("resolver_post_added")
|
svc.m_gql_post_added, err = m.SyncInt64().Counter("msgbus_post_added",
|
||||||
|
instrument.WithDescription("msgbus graphql post added subcription requests"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.Mresolver_post_added_event, err = m.SyncInt64().Counter("resolver_post_added")
|
svc.m_gql_post_added_event, err = m.SyncInt64().Counter("msgbus_post_event",
|
||||||
|
instrument.WithDescription("msgbus graphql post added subscription events"),
|
||||||
|
)
|
||||||
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
|
svc.m_req_time, err = m.SyncInt64().Histogram("msgbus_request_time",
|
||||||
|
instrument.WithDescription("msgbus graphql post added subscription events"),
|
||||||
|
instrument.WithUnit(unit.Unit("ns")),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
|
@ -76,8 +91,10 @@ func (s *service) RegisterHTTP(mux *http.ServeMux) {
|
||||||
}
|
}
|
||||||
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
r = r.WithContext(ctx)
|
r = r.WithContext(ctx)
|
||||||
|
|
||||||
switch r.Method {
|
switch r.Method {
|
||||||
|
@ -96,13 +113,16 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Posts is the resolver for the events field.
|
// Posts is the resolver for the events field.
|
||||||
func (r *service) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) {
|
func (s *service) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) {
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
r.Mresolver_posts.Add(ctx, 1)
|
s.m_gql_posts.Add(ctx, 1)
|
||||||
|
|
||||||
lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
|
lis, err := s.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -122,11 +142,11 @@ func (r *service) Posts(ctx context.Context, streamID string, paging *gql.PageIn
|
||||||
}
|
}
|
||||||
|
|
||||||
var first, last uint64
|
var first, last uint64
|
||||||
if first, err = r.es.FirstIndex(ctx, streamID); err != nil {
|
if first, err = s.es.FirstIndex(ctx, streamID); err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if last, err = r.es.LastIndex(ctx, streamID); err != nil {
|
if last, err = s.es.LastIndex(ctx, streamID); err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -146,7 +166,7 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
r.Mresolver_post_added.Add(ctx, 1)
|
r.m_gql_post_added.Add(ctx, 1)
|
||||||
|
|
||||||
es := r.es.EventStream()
|
es := r.es.EventStream()
|
||||||
if es == nil {
|
if es == nil {
|
||||||
|
@ -183,7 +203,7 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
span.AddEvent(fmt.Sprintf("received %d events", len(events)))
|
span.AddEvent(fmt.Sprintf("received %d events", len(events)))
|
||||||
r.Mresolver_post_added_event.Add(ctx, int64(len(events)))
|
r.m_gql_post_added_event.Add(ctx, int64(len(events)))
|
||||||
|
|
||||||
for _, e := range events {
|
for _, e := range events {
|
||||||
if p, ok := e.(*PostEvent); ok {
|
if p, ok := e.(*PostEvent); ok {
|
||||||
|
@ -203,9 +223,13 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (
|
||||||
|
|
||||||
func (s *service) get(w http.ResponseWriter, r *http.Request) {
|
func (s *service) get(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
name, _, _ := strings.Cut(r.URL.Path, "/")
|
name, _, _ := strings.Cut(r.URL.Path, "/")
|
||||||
if name == "" {
|
if name == "" {
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
@ -260,6 +284,9 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
name, tags, _ := strings.Cut(r.URL.Path, "/")
|
name, tags, _ := strings.Cut(r.URL.Path, "/")
|
||||||
if name == "" {
|
if name == "" {
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
|
|
@ -17,7 +17,9 @@ import (
|
||||||
"github.com/sour-is/ev/pkg/es"
|
"github.com/sour-is/ev/pkg/es"
|
||||||
"github.com/sour-is/ev/pkg/es/event"
|
"github.com/sour-is/ev/pkg/es/event"
|
||||||
"github.com/sour-is/ev/pkg/gql"
|
"github.com/sour-is/ev/pkg/gql"
|
||||||
|
"go.opentelemetry.io/otel/metric/instrument"
|
||||||
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
||||||
|
"go.opentelemetry.io/otel/metric/unit"
|
||||||
"go.uber.org/multierr"
|
"go.uber.org/multierr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -66,25 +68,40 @@ func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, erro
|
||||||
svc := &service{baseURL: baseURL, es: es, dns: net.DefaultResolver}
|
svc := &service{baseURL: baseURL, es: es, dns: net.DefaultResolver}
|
||||||
|
|
||||||
var err, errs error
|
var err, errs error
|
||||||
svc.m_create_user, err = m.SyncInt64().Counter("salty_create_user")
|
svc.m_create_user, err = m.SyncInt64().Counter("salty_create_user",
|
||||||
|
instrument.WithDescription("salty create user graphql called"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_get_user, err = m.SyncInt64().Counter("salty_get_user")
|
svc.m_get_user, err = m.SyncInt64().Counter("salty_get_user",
|
||||||
|
instrument.WithDescription("salty get user graphql called"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_api_ping, err = m.SyncInt64().Counter("salty_api_ping")
|
svc.m_api_ping, err = m.SyncInt64().Counter("salty_api_ping",
|
||||||
|
instrument.WithDescription("salty api ping called"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_api_register, err = m.SyncInt64().Counter("salty_api_register")
|
svc.m_api_register, err = m.SyncInt64().Counter("salty_api_register",
|
||||||
|
instrument.WithDescription("salty api register"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_api_lookup, err = m.SyncInt64().Counter("salty_api_lookup")
|
svc.m_api_lookup, err = m.SyncInt64().Counter("salty_api_lookup",
|
||||||
|
instrument.WithDescription("salty api ping lookup"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_api_send, err = m.SyncInt64().Counter("salty_api_send")
|
svc.m_api_send, err = m.SyncInt64().Counter("salty_api_send",
|
||||||
|
instrument.WithDescription("salty api ping send"),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
svc.m_req_time, err = m.SyncInt64().Histogram("salty_request_time")
|
svc.m_req_time, err = m.SyncInt64().Histogram("salty_request_time",
|
||||||
|
instrument.WithDescription("histogram of requests"),
|
||||||
|
instrument.WithUnit(unit.Unit("ns")),
|
||||||
|
)
|
||||||
errs = multierr.Append(errs, err)
|
errs = multierr.Append(errs, err)
|
||||||
|
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
|
@ -113,7 +130,7 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer s.m_req_time.Record(ctx, int64(time.Since(start)))
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/")
|
addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/")
|
||||||
addr = strings.TrimSuffix(addr, ".json")
|
addr = strings.TrimSuffix(addr, ".json")
|
||||||
|
@ -152,6 +169,8 @@ func (s *service) CreateSaltyUser(ctx context.Context, nick string, pub string)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
s.m_create_user.Add(ctx, 1)
|
s.m_create_user.Add(ctx, 1)
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
|
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
|
||||||
span.AddEvent(streamID)
|
span.AddEvent(streamID)
|
||||||
|
@ -182,6 +201,8 @@ func (s *service) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
s.m_get_user.Add(ctx, 1)
|
s.m_get_user.Add(ctx, 1)
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||||
|
|
||||||
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
|
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
|
||||||
span.AddEvent(streamID)
|
span.AddEvent(streamID)
|
||||||
|
@ -214,6 +235,9 @@ func (s *service) apiv1(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, span := lg.Span(ctx)
|
ctx, span := lg.Span(ctx)
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
defer s.m_req_time.Record(ctx, time.Since(start).Nanoseconds())
|
||||||
|
|
||||||
switch r.Method {
|
switch r.Method {
|
||||||
case http.MethodGet:
|
case http.MethodGet:
|
||||||
switch {
|
switch {
|
||||||
|
|
|
@ -49,7 +49,9 @@ func initMetrics(ctx context.Context, name string) (context.Context, func() erro
|
||||||
}
|
}
|
||||||
|
|
||||||
config := prometheus.Config{
|
config := prometheus.Config{
|
||||||
DefaultHistogramBoundaries: []float64{1, 2, 5, 10, 20, 50},
|
DefaultHistogramBoundaries: []float64{
|
||||||
|
2 << 6, 2 << 8, 2 << 10, 2 << 12, 2 << 14, 2 << 16, 2 << 18, 2 << 20, 2 << 22, 2 << 24, 2 << 26, 2 << 28,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
cont := controller.New(
|
cont := controller.New(
|
||||||
processor.NewFactory(
|
processor.NewFactory(
|
||||||
|
|
Loading…
Reference in New Issue
Block a user