feat: add histograms for request time
This commit is contained in:
		
							parent
							
								
									fb72d4bc8c
								
							
						
					
					
						commit
						be8a318ca3
					
				@ -16,16 +16,19 @@ import (
 | 
				
			|||||||
	"github.com/sour-is/ev/pkg/es"
 | 
						"github.com/sour-is/ev/pkg/es"
 | 
				
			||||||
	"github.com/sour-is/ev/pkg/es/event"
 | 
						"github.com/sour-is/ev/pkg/es/event"
 | 
				
			||||||
	"github.com/sour-is/ev/pkg/gql"
 | 
						"github.com/sour-is/ev/pkg/gql"
 | 
				
			||||||
 | 
						"go.opentelemetry.io/otel/metric/instrument"
 | 
				
			||||||
	"go.opentelemetry.io/otel/metric/instrument/syncint64"
 | 
						"go.opentelemetry.io/otel/metric/instrument/syncint64"
 | 
				
			||||||
 | 
						"go.opentelemetry.io/otel/metric/unit"
 | 
				
			||||||
	"go.uber.org/multierr"
 | 
						"go.uber.org/multierr"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type service struct {
 | 
					type service struct {
 | 
				
			||||||
	es *es.EventStore
 | 
						es *es.EventStore
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	Mresolver_posts            syncint64.Counter
 | 
						m_gql_posts            syncint64.Counter
 | 
				
			||||||
	Mresolver_post_added       syncint64.Counter
 | 
						m_gql_post_added       syncint64.Counter
 | 
				
			||||||
	Mresolver_post_added_event syncint64.Counter
 | 
						m_gql_post_added_event syncint64.Counter
 | 
				
			||||||
 | 
						m_req_time             syncint64.Histogram
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type MsgbusResolver interface {
 | 
					type MsgbusResolver interface {
 | 
				
			||||||
@ -50,13 +53,25 @@ func New(ctx context.Context, es *es.EventStore) (*service, error) {
 | 
				
			|||||||
	svc := &service{es: es}
 | 
						svc := &service{es: es}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var err, errs error
 | 
						var err, errs error
 | 
				
			||||||
	svc.Mresolver_posts, err = m.SyncInt64().Counter("resolver_posts")
 | 
						svc.m_gql_posts, err = m.SyncInt64().Counter("msgbus_posts",
 | 
				
			||||||
 | 
							instrument.WithDescription("msgbus graphql posts requests"),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	errs = multierr.Append(errs, err)
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	svc.Mresolver_post_added, err = m.SyncInt64().Counter("resolver_post_added")
 | 
						svc.m_gql_post_added, err = m.SyncInt64().Counter("msgbus_post_added",
 | 
				
			||||||
 | 
							instrument.WithDescription("msgbus graphql post added subcription requests"),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	errs = multierr.Append(errs, err)
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	svc.Mresolver_post_added_event, err = m.SyncInt64().Counter("resolver_post_added")
 | 
						svc.m_gql_post_added_event, err = m.SyncInt64().Counter("msgbus_post_event",
 | 
				
			||||||
 | 
							instrument.WithDescription("msgbus graphql post added subscription events"),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						svc.m_req_time, err = m.SyncInt64().Histogram("msgbus_request_time",
 | 
				
			||||||
 | 
							instrument.WithDescription("msgbus graphql post added subscription events"),
 | 
				
			||||||
 | 
							instrument.WithUnit(unit.Unit("ns")),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	errs = multierr.Append(errs, err)
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	span.RecordError(err)
 | 
						span.RecordError(err)
 | 
				
			||||||
@ -76,8 +91,10 @@ func (s *service) RegisterHTTP(mux *http.ServeMux) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | 
					func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | 
				
			||||||
	ctx := r.Context()
 | 
						ctx := r.Context()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ctx, span := lg.Span(ctx)
 | 
						ctx, span := lg.Span(ctx)
 | 
				
			||||||
	defer span.End()
 | 
						defer span.End()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r = r.WithContext(ctx)
 | 
						r = r.WithContext(ctx)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	switch r.Method {
 | 
						switch r.Method {
 | 
				
			||||||
@ -96,13 +113,16 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Posts is the resolver for the events field.
 | 
					// Posts is the resolver for the events field.
 | 
				
			||||||
func (r *service) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) {
 | 
					func (s *service) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) {
 | 
				
			||||||
	ctx, span := lg.Span(ctx)
 | 
						ctx, span := lg.Span(ctx)
 | 
				
			||||||
	defer span.End()
 | 
						defer span.End()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.Mresolver_posts.Add(ctx, 1)
 | 
						s.m_gql_posts.Add(ctx, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
 | 
						start := time.Now()
 | 
				
			||||||
 | 
						defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						lis, err := s.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		span.RecordError(err)
 | 
							span.RecordError(err)
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
@ -122,11 +142,11 @@ func (r *service) Posts(ctx context.Context, streamID string, paging *gql.PageIn
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var first, last uint64
 | 
						var first, last uint64
 | 
				
			||||||
	if first, err = r.es.FirstIndex(ctx, streamID); err != nil {
 | 
						if first, err = s.es.FirstIndex(ctx, streamID); err != nil {
 | 
				
			||||||
		span.RecordError(err)
 | 
							span.RecordError(err)
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if last, err = r.es.LastIndex(ctx, streamID); err != nil {
 | 
						if last, err = s.es.LastIndex(ctx, streamID); err != nil {
 | 
				
			||||||
		span.RecordError(err)
 | 
							span.RecordError(err)
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@ -146,7 +166,7 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (
 | 
				
			|||||||
	ctx, span := lg.Span(ctx)
 | 
						ctx, span := lg.Span(ctx)
 | 
				
			||||||
	defer span.End()
 | 
						defer span.End()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.Mresolver_post_added.Add(ctx, 1)
 | 
						r.m_gql_post_added.Add(ctx, 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	es := r.es.EventStream()
 | 
						es := r.es.EventStream()
 | 
				
			||||||
	if es == nil {
 | 
						if es == nil {
 | 
				
			||||||
@ -183,7 +203,7 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (
 | 
				
			|||||||
				break
 | 
									break
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			span.AddEvent(fmt.Sprintf("received %d events", len(events)))
 | 
								span.AddEvent(fmt.Sprintf("received %d events", len(events)))
 | 
				
			||||||
			r.Mresolver_post_added_event.Add(ctx, int64(len(events)))
 | 
								r.m_gql_post_added_event.Add(ctx, int64(len(events)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			for _, e := range events {
 | 
								for _, e := range events {
 | 
				
			||||||
				if p, ok := e.(*PostEvent); ok {
 | 
									if p, ok := e.(*PostEvent); ok {
 | 
				
			||||||
@ -203,9 +223,13 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func (s *service) get(w http.ResponseWriter, r *http.Request) {
 | 
					func (s *service) get(w http.ResponseWriter, r *http.Request) {
 | 
				
			||||||
	ctx := r.Context()
 | 
						ctx := r.Context()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ctx, span := lg.Span(ctx)
 | 
						ctx, span := lg.Span(ctx)
 | 
				
			||||||
	defer span.End()
 | 
						defer span.End()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						start := time.Now()
 | 
				
			||||||
 | 
						defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	name, _, _ := strings.Cut(r.URL.Path, "/")
 | 
						name, _, _ := strings.Cut(r.URL.Path, "/")
 | 
				
			||||||
	if name == "" {
 | 
						if name == "" {
 | 
				
			||||||
		w.WriteHeader(http.StatusNotFound)
 | 
							w.WriteHeader(http.StatusNotFound)
 | 
				
			||||||
@ -260,6 +284,9 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) {
 | 
				
			|||||||
	ctx, span := lg.Span(ctx)
 | 
						ctx, span := lg.Span(ctx)
 | 
				
			||||||
	defer span.End()
 | 
						defer span.End()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						start := time.Now()
 | 
				
			||||||
 | 
						defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	name, tags, _ := strings.Cut(r.URL.Path, "/")
 | 
						name, tags, _ := strings.Cut(r.URL.Path, "/")
 | 
				
			||||||
	if name == "" {
 | 
						if name == "" {
 | 
				
			||||||
		w.WriteHeader(http.StatusNotFound)
 | 
							w.WriteHeader(http.StatusNotFound)
 | 
				
			||||||
 | 
				
			|||||||
@ -17,7 +17,9 @@ import (
 | 
				
			|||||||
	"github.com/sour-is/ev/pkg/es"
 | 
						"github.com/sour-is/ev/pkg/es"
 | 
				
			||||||
	"github.com/sour-is/ev/pkg/es/event"
 | 
						"github.com/sour-is/ev/pkg/es/event"
 | 
				
			||||||
	"github.com/sour-is/ev/pkg/gql"
 | 
						"github.com/sour-is/ev/pkg/gql"
 | 
				
			||||||
 | 
						"go.opentelemetry.io/otel/metric/instrument"
 | 
				
			||||||
	"go.opentelemetry.io/otel/metric/instrument/syncint64"
 | 
						"go.opentelemetry.io/otel/metric/instrument/syncint64"
 | 
				
			||||||
 | 
						"go.opentelemetry.io/otel/metric/unit"
 | 
				
			||||||
	"go.uber.org/multierr"
 | 
						"go.uber.org/multierr"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -66,25 +68,40 @@ func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, erro
 | 
				
			|||||||
	svc := &service{baseURL: baseURL, es: es, dns: net.DefaultResolver}
 | 
						svc := &service{baseURL: baseURL, es: es, dns: net.DefaultResolver}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var err, errs error
 | 
						var err, errs error
 | 
				
			||||||
	svc.m_create_user, err = m.SyncInt64().Counter("salty_create_user")
 | 
						svc.m_create_user, err = m.SyncInt64().Counter("salty_create_user",
 | 
				
			||||||
 | 
							instrument.WithDescription("salty create user graphql called"),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	errs = multierr.Append(errs, err)
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	svc.m_get_user, err = m.SyncInt64().Counter("salty_get_user")
 | 
						svc.m_get_user, err = m.SyncInt64().Counter("salty_get_user",
 | 
				
			||||||
 | 
							instrument.WithDescription("salty get user graphql called"),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	errs = multierr.Append(errs, err)
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	svc.m_api_ping, err = m.SyncInt64().Counter("salty_api_ping")
 | 
						svc.m_api_ping, err = m.SyncInt64().Counter("salty_api_ping",
 | 
				
			||||||
 | 
							instrument.WithDescription("salty api ping called"),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	errs = multierr.Append(errs, err)
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	svc.m_api_register, err = m.SyncInt64().Counter("salty_api_register")
 | 
						svc.m_api_register, err = m.SyncInt64().Counter("salty_api_register",
 | 
				
			||||||
 | 
							instrument.WithDescription("salty api register"),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	errs = multierr.Append(errs, err)
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	svc.m_api_lookup, err = m.SyncInt64().Counter("salty_api_lookup")
 | 
						svc.m_api_lookup, err = m.SyncInt64().Counter("salty_api_lookup",
 | 
				
			||||||
 | 
							instrument.WithDescription("salty api ping lookup"),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	errs = multierr.Append(errs, err)
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	svc.m_api_send, err = m.SyncInt64().Counter("salty_api_send")
 | 
						svc.m_api_send, err = m.SyncInt64().Counter("salty_api_send",
 | 
				
			||||||
 | 
							instrument.WithDescription("salty api ping send"),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	errs = multierr.Append(errs, err)
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	svc.m_req_time, err = m.SyncInt64().Histogram("salty_request_time")
 | 
						svc.m_req_time, err = m.SyncInt64().Histogram("salty_request_time",
 | 
				
			||||||
 | 
							instrument.WithDescription("histogram of requests"),
 | 
				
			||||||
 | 
							instrument.WithUnit(unit.Unit("ns")),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
	errs = multierr.Append(errs, err)
 | 
						errs = multierr.Append(errs, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	span.RecordError(err)
 | 
						span.RecordError(err)
 | 
				
			||||||
@ -113,7 +130,7 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | 
				
			|||||||
	defer span.End()
 | 
						defer span.End()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	start := time.Now()
 | 
						start := time.Now()
 | 
				
			||||||
	defer s.m_req_time.Record(ctx, int64(time.Since(start)))
 | 
						defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/")
 | 
						addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/")
 | 
				
			||||||
	addr = strings.TrimSuffix(addr, ".json")
 | 
						addr = strings.TrimSuffix(addr, ".json")
 | 
				
			||||||
@ -152,6 +169,8 @@ func (s *service) CreateSaltyUser(ctx context.Context, nick string, pub string)
 | 
				
			|||||||
	defer span.End()
 | 
						defer span.End()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	s.m_create_user.Add(ctx, 1)
 | 
						s.m_create_user.Add(ctx, 1)
 | 
				
			||||||
 | 
						start := time.Now()
 | 
				
			||||||
 | 
						defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
 | 
						streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
 | 
				
			||||||
	span.AddEvent(streamID)
 | 
						span.AddEvent(streamID)
 | 
				
			||||||
@ -182,6 +201,8 @@ func (s *service) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error
 | 
				
			|||||||
	defer span.End()
 | 
						defer span.End()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	s.m_get_user.Add(ctx, 1)
 | 
						s.m_get_user.Add(ctx, 1)
 | 
				
			||||||
 | 
						start := time.Now()
 | 
				
			||||||
 | 
						defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
 | 
						streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
 | 
				
			||||||
	span.AddEvent(streamID)
 | 
						span.AddEvent(streamID)
 | 
				
			||||||
@ -214,6 +235,9 @@ func (s *service) apiv1(w http.ResponseWriter, r *http.Request) {
 | 
				
			|||||||
	ctx, span := lg.Span(ctx)
 | 
						ctx, span := lg.Span(ctx)
 | 
				
			||||||
	defer span.End()
 | 
						defer span.End()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						start := time.Now()
 | 
				
			||||||
 | 
						defer s.m_req_time.Record(ctx, time.Since(start).Nanoseconds())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	switch r.Method {
 | 
						switch r.Method {
 | 
				
			||||||
	case http.MethodGet:
 | 
						case http.MethodGet:
 | 
				
			||||||
		switch {
 | 
							switch {
 | 
				
			||||||
 | 
				
			|||||||
@ -49,7 +49,9 @@ func initMetrics(ctx context.Context, name string) (context.Context, func() erro
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	config := prometheus.Config{
 | 
						config := prometheus.Config{
 | 
				
			||||||
		DefaultHistogramBoundaries: []float64{1, 2, 5, 10, 20, 50},
 | 
							DefaultHistogramBoundaries: []float64{
 | 
				
			||||||
 | 
								2 << 6, 2 << 8, 2 << 10, 2 << 12, 2 << 14, 2 << 16, 2 << 18, 2 << 20, 2 << 22, 2 << 24, 2 << 26, 2 << 28,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	cont := controller.New(
 | 
						cont := controller.New(
 | 
				
			||||||
		processor.NewFactory(
 | 
							processor.NewFactory(
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user