diff --git a/api/gql_ev/resolver.go b/api/gql_ev/resolver.go index e9260de..af3b886 100644 --- a/api/gql_ev/resolver.go +++ b/api/gql_ev/resolver.go @@ -3,6 +3,7 @@ package gql_ev import ( "context" "crypto/sha256" + "errors" "fmt" "strings" "time" @@ -19,31 +20,40 @@ import ( type Resolver struct { es *es.EventStore - Mresolver_posts syncint64.Counter - Mresolver_post_added syncint64.Counter - Mresolver_post_added_event syncint64.Counter + Mresolver_posts syncint64.Counter + Mresolver_post_added syncint64.Counter + Mresolver_post_added_event syncint64.Counter + Mresolver_create_salty_user syncint64.Counter + Mresolver_salty_user syncint64.Counter } -func New(es *es.EventStore) (*Resolver, error) { - m := logz.Meter(context.Background()) +func New(ctx context.Context, es *es.EventStore) (*Resolver, error) { + ctx, span := logz.Span(ctx) + defer span.End() - var errs error + m := logz.Meter(ctx) - Mresolver_posts, err := m.SyncInt64().Counter("resolver_posts") + var err, errs error + + r := &Resolver{es: es} + + r.Mresolver_posts, err = m.SyncInt64().Counter("resolver_posts") errs = multierr.Append(errs, err) - Mresolver_post_added, err := m.SyncInt64().Counter("resolver_post_added") + r.Mresolver_post_added, err = m.SyncInt64().Counter("resolver_post_added") errs = multierr.Append(errs, err) - Mresolver_post_added_event, err := m.SyncInt64().Counter("resolver_post_added") + r.Mresolver_post_added_event, err = m.SyncInt64().Counter("resolver_post_added") errs = multierr.Append(errs, err) - return &Resolver{ - es, - Mresolver_posts, - Mresolver_post_added, - Mresolver_post_added_event, - }, errs + r.Mresolver_create_salty_user, err = m.SyncInt64().Counter("resolver_create_salty_user") + errs = multierr.Append(errs, err) + + r.Mresolver_salty_user, err = m.SyncInt64().Counter("resolver_salty_user") + errs = multierr.Append(errs, err) + + span.RecordError(err) + return r, errs } // Posts is the resolver for the events field. @@ -55,6 +65,7 @@ func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30)) if err != nil { + span.RecordError(err) return nil, err } @@ -79,9 +90,11 @@ func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput var first, last uint64 if first, err = r.es.FirstIndex(ctx, streamID); err != nil { + span.RecordError(err) return nil, err } if last, err = r.es.LastIndex(ctx, streamID); err != nil { + span.RecordError(err) return nil, err } @@ -109,23 +122,30 @@ func (r *Resolver) PostAdded(ctx context.Context, streamID string, after int64) sub, err := es.Subscribe(ctx, streamID, after) if err != nil { + span.RecordError(err) return nil, err } ch := make(chan *PostEvent) go func() { + ctx, span := logz.Span(ctx) + defer span.End() + defer func() { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - sub.Close(ctx) + err := sub.Close(ctx) + span.RecordError(err) }() for sub.Recv(ctx) { events, err := sub.Events(ctx) if err != nil { + span.RecordError(err) break } + span.AddEvent(fmt.Sprintf("received %d events", len(events))) r.Mresolver_post_added_event.Add(ctx, int64(len(events))) for _, e := range events { @@ -151,33 +171,58 @@ func (r *Resolver) PostAdded(ctx context.Context, streamID string, after int64) } func (r *Resolver) CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) { + ctx, span := logz.Span(ctx) + defer span.End() + + r.Mresolver_create_salty_user.Add(ctx, 1) + streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) + span.AddEvent(streamID) key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(pub)) if err != nil { + span.RecordError(err) return nil, err } a, err := es.Create(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error { return agg.OnUserRegister(nick, key) }) - if err != nil { - return nil, err + switch { + case errors.Is(err, es.ErrShouldNotExist): + span.RecordError(err) + return nil, fmt.Errorf("user exists") + + case err != nil: + span.RecordError(err) + return nil, fmt.Errorf("internal error") } return &SaltyUser{ Nick: nick, Pubkey: pub, Inbox: a.Inbox.String(), - }, err + }, nil } func (r *Resolver) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error) { + ctx, span := logz.Span(ctx) + defer span.End() + + r.Mresolver_salty_user.Add(ctx, 1) + streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) + span.AddEvent(streamID) a, err := es.Update(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error { return nil }) - if err != nil { - return nil, err + switch { + case errors.Is(err, es.ErrShouldExist): + span.RecordError(err) + return nil, fmt.Errorf("user not found") + + case err != nil: + span.RecordError(err) + return nil, fmt.Errorf("%w internal error", err) } return &SaltyUser{ diff --git a/main.go b/main.go index 9efd86d..52c4fc2 100644 --- a/main.go +++ b/main.go @@ -49,58 +49,68 @@ func main() { } } func run(ctx context.Context) error { - ctx, span := logz.Span(ctx) - diskstore.Init(ctx) - memstore.Init(ctx) - if err := domain.Init(ctx); err != nil { - return err - } - - es, err := es.Open(ctx, env("EV_DATA", "file:data"), streamer.New(ctx)) - if err != nil { - return err - } - - svc, err := msgbus.New(ctx, es, env("EV_BASE_URL", "https://ev.sour.is/inbox/")) - if err != nil { - return err - } - - r, err := gql_ev.New(es) - if err != nil { - return err - } - res := graph.New(r) - gql := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: res})) - gql.Use(otelgqlgen.Middleware()) - - s := http.Server{ - Addr: env("EV_HTTP", ":8080"), - } - mux := http.NewServeMux() - - mux.Handle("/", playground.Handler("GraphQL playground", "/gql")) - mux.Handle("/gql", logz.Htrace(res.ChainMiddlewares(gql), "gql")) - mux.Handle("/inbox/", logz.Htrace(http.StripPrefix("/inbox/", svc), "inbox")) - mux.Handle("/.well-known/salty/", logz.Htrace(svc, "lookup")) - mux.Handle("/metrics", logz.PromHTTP(ctx)) - - s.Handler = cors.AllowAll().Handler(mux) - - log.Print("Listen on ", s.Addr) g, ctx := errgroup.WithContext(ctx) - g.Go(s.ListenAndServe) + { + ctx, span := logz.Span(ctx) - g.Go(func() error { - <-ctx.Done() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - return s.Shutdown(ctx) - }) + diskstore.Init(ctx) + memstore.Init(ctx) + if err := domain.Init(ctx); err != nil { + span.RecordError(err) + return err + } - span.End() + es, err := es.Open(ctx, env("EV_DATA", "file:data"), streamer.New(ctx)) + if err != nil { + span.RecordError(err) + return err + } + svc, err := msgbus.New(ctx, es, env("EV_BASE_URL", "https://ev.sour.is/inbox/")) + if err != nil { + span.RecordError(err) + return err + } + + r, err := gql_ev.New(ctx, es) + if err != nil { + span.RecordError(err) + return err + } + res := graph.New(r) + gql := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: res})) + gql.Use(otelgqlgen.Middleware()) + + s := http.Server{ + Addr: env("EV_HTTP", ":8080"), + } + + mux := http.NewServeMux() + + mux.Handle("/", playground.Handler("GraphQL playground", "/gql")) + mux.Handle("/gql", logz.Htrace(res.ChainMiddlewares(gql), "gql")) + mux.Handle("/metrics", logz.PromHTTP(ctx)) + + mux.Handle("/inbox/", logz.Htrace(http.StripPrefix("/inbox/", svc), "inbox")) + mux.Handle("/.well-known/salty/", logz.Htrace(svc, "lookup")) + + s.Handler = cors.AllowAll().Handler(mux) + + log.Print("Listen on ", s.Addr) + span.AddEvent("begin listen and serve") + + g.Go(s.ListenAndServe) + + g.Go(func() error { + <-ctx.Done() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return s.Shutdown(ctx) + }) + + span.End() + } return g.Wait() } func env(name, defaultValue string) string { diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 04657e1..d61bb83 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -111,13 +111,13 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) Mdisk_evict: d.Mdisk_evict, }, nil } -func (ds *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { +func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { _, span := logz.Span(ctx) defer span.End() el := &eventLog{streamID: streamID} - return el, ds.openlogs.Modify(ctx, func(openlogs *openlogs) error { + return el, d.openlogs.Modify(ctx, func(openlogs *openlogs) error { _, span := logz.Span(ctx) defer span.End() @@ -126,7 +126,7 @@ func (ds *diskStore) EventLog(ctx context.Context, streamID string) (driver.Even return nil } - l, err := wal.Open(filepath.Join(ds.path, streamID), wal.DefaultOptions) + l, err := wal.Open(filepath.Join(d.path, streamID), wal.DefaultOptions) if err != nil { span.RecordError(err) return err @@ -144,14 +144,14 @@ type eventLog struct { var _ driver.EventLog = (*eventLog)(nil) -func (es *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { +func (e *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { _, span := logz.Span(ctx) defer span.End() - event.SetStreamID(es.streamID, events...) + event.SetStreamID(e.streamID, events...) var count uint64 - err := es.events.Modify(ctx, func(l *wal.Log) error { + err := e.events.Modify(ctx, func(l *wal.Log) error { _, span := logz.Span(ctx) defer span.End() @@ -162,7 +162,7 @@ func (es *eventLog) Append(ctx context.Context, events event.Events, version uin } if version != AppendOnly && version != last { - return fmt.Errorf("current version wrong %d != %d", version, last) + return fmt.Errorf("%w: current version wrong %d != %d", es.ErrWrongVersion, version, last) } var b []byte @@ -189,13 +189,13 @@ func (es *eventLog) Append(ctx context.Context, events event.Events, version uin return count, err } -func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, error) { +func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, error) { _, span := logz.Span(ctx) defer span.End() var events event.Events - err := es.events.Modify(ctx, func(stream *wal.Log) error { + err := e.events.Modify(ctx, func(stream *wal.Log) error { _, span := logz.Span(ctx) defer span.End() @@ -255,38 +255,38 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e return nil, err } - event.SetStreamID(es.streamID, events...) + event.SetStreamID(e.streamID, events...) return events, nil } -func (es *eventLog) FirstIndex(ctx context.Context) (uint64, error) { +func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) { _, span := logz.Span(ctx) defer span.End() var idx uint64 var err error - err = es.events.Modify(ctx, func(events *wal.Log) error { + err = e.events.Modify(ctx, func(events *wal.Log) error { idx, err = events.FirstIndex() return err }) return idx, err } -func (es *eventLog) LastIndex(ctx context.Context) (uint64, error) { +func (e *eventLog) LastIndex(ctx context.Context) (uint64, error) { _, span := logz.Span(ctx) defer span.End() var idx uint64 var err error - err = es.events.Modify(ctx, func(events *wal.Log) error { + err = e.events.Modify(ctx, func(events *wal.Log) error { idx, err = events.LastIndex() return err }) return idx, err } -func (es *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { +func (e *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { panic("not implemented") } diff --git a/pkg/es/driver/mem-store/mem-store.go b/pkg/es/driver/mem-store/mem-store.go index ddec1c9..70f4a72 100644 --- a/pkg/es/driver/mem-store/mem-store.go +++ b/pkg/es/driver/mem-store/mem-store.go @@ -83,7 +83,7 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint last := uint64(len(*stream)) if version != AppendOnly && version != last { - return fmt.Errorf("current version wrong %d != %d", version, last) + return fmt.Errorf("%w: current version wrong %d != %d", es.ErrWrongVersion, version, last) } for i := range events { diff --git a/pkg/es/es.go b/pkg/es/es.go index 639b935..35ae4db 100644 --- a/pkg/es/es.go +++ b/pkg/es/es.go @@ -216,6 +216,9 @@ func Unwrap[T any](t T) T { } var ErrNoDriver = errors.New("no driver") +var ErrWrongVersion = errors.New("wrong version") +var ErrShouldExist = event.ErrShouldExist +var ErrShouldNotExist = event.ErrShouldNotExist type PA[T any] interface { event.Aggregate