From 814c974e93885f7cb98bf1b841f99bf58f63b8cb Mon Sep 17 00:00:00 2001 From: Jon Lundy Date: Fri, 19 Aug 2022 12:26:42 -0600 Subject: [PATCH] refactor: move graphql into individual services --- Makefile | 13 +- api/gql_ev/docs.go | 1 - api/gql_ev/resolver.go | 238 +++--------------- {api/gql_ev => app/msgbus}/msgbus.graphqls | 2 +- {pkg => app}/msgbus/service.go | 233 ++++++++++++----- app/msgbus/service_test.go | 25 ++ {pkg => app}/playground/playground.go | 0 {pkg/domain => app/salty}/salty-user.go | 31 ++- {api/gql_ev => app/salty}/salty.graphqls | 2 +- app/salty/service.go | 166 ++++++++++++ gqlgen.yml | 13 +- internal/graph/generated/generated.go | 172 ++++++------- internal/graph/resolver.go | 55 ---- main.go | 52 ++-- pkg/es/driver/disk-store/disk-store.go | 1 - pkg/es/es.graphqls | 7 + pkg/es/event/reflect.go | 84 +++++-- {api/gql_ev => pkg/gql}/common.graphqls | 15 +- api/gql_ev/models.go => pkg/gql/connection.go | 12 +- pkg/gql/context.go | 14 ++ 20 files changed, 633 insertions(+), 503 deletions(-) delete mode 100644 api/gql_ev/docs.go rename {api/gql_ev => app/msgbus}/msgbus.graphqls (77%) rename {pkg => app}/msgbus/service.go (63%) create mode 100644 app/msgbus/service_test.go rename {pkg => app}/playground/playground.go (100%) rename {pkg/domain => app/salty}/salty-user.go (73%) rename {api/gql_ev => app/salty}/salty.graphqls (75%) create mode 100644 app/salty/service.go delete mode 100644 internal/graph/resolver.go create mode 100644 pkg/es/es.graphqls rename {api/gql_ev => pkg/gql}/common.graphqls (62%) rename api/gql_ev/models.go => pkg/gql/connection.go (80%) create mode 100644 pkg/gql/context.go diff --git a/Makefile b/Makefile index 129615d..6dc96ed 100644 --- a/Makefile +++ b/Makefile @@ -4,17 +4,24 @@ export EV_HTTP=:8080 export EV_TRACE_SAMPLE=always -include local.mk -run: gen +air: gen ifeq (, $(shell which air)) go install github.com/cosmtrek/air@latest endif air + +run: + go run . + test: go test -cover -race ./... -GQLDIR=api/gql_ev -GQLS=$(wildcard $(GQLDIR)/*.go) $(wildcard $(GQLDIR)/*.graphqls) gqlgen.yml +GQLS=gqlgen.yml +GQLS:=$(GQLS) $(wildcard api/gql_ev/*.go) +GQLS:=$(GQLS) $(wildcard pkg/*/*.graphqls) +GQLS:=$(GQLS) $(wildcard app/*/*.graphqls) +GQLS:=$(GQLS) $(wildcard app/*/*.go) GQLSRC=internal/graph/generated/generated.go gen: gql diff --git a/api/gql_ev/docs.go b/api/gql_ev/docs.go deleted file mode 100644 index 455f0e0..0000000 --- a/api/gql_ev/docs.go +++ /dev/null @@ -1 +0,0 @@ -package gql_ev diff --git a/api/gql_ev/resolver.go b/api/gql_ev/resolver.go index af3b886..e44e8f3 100644 --- a/api/gql_ev/resolver.go +++ b/api/gql_ev/resolver.go @@ -2,232 +2,54 @@ package gql_ev import ( "context" - "crypto/sha256" - "errors" - "fmt" - "strings" - "time" + "net/http" + "reflect" - "github.com/keys-pub/keys" + "github.com/sour-is/ev/app/msgbus" + "github.com/sour-is/ev/app/salty" + "github.com/sour-is/ev/internal/graph/generated" "github.com/sour-is/ev/internal/logz" - "github.com/sour-is/ev/pkg/domain" - "github.com/sour-is/ev/pkg/es" - "github.com/sour-is/ev/pkg/msgbus" - "go.opentelemetry.io/otel/metric/instrument/syncint64" - "go.uber.org/multierr" ) type Resolver struct { - es *es.EventStore - - Mresolver_posts syncint64.Counter - Mresolver_post_added syncint64.Counter - Mresolver_post_added_event syncint64.Counter - Mresolver_create_salty_user syncint64.Counter - Mresolver_salty_user syncint64.Counter + msgbus.MsgbusResolver + salty.SaltyResolver } -func New(ctx context.Context, es *es.EventStore) (*Resolver, error) { - ctx, span := logz.Span(ctx) +func New(ctx context.Context, m msgbus.MsgbusResolver, s salty.SaltyResolver) (*Resolver, error) { + _, span := logz.Span(ctx) defer span.End() - m := logz.Meter(ctx) + r := &Resolver{m, s} - var err, errs error - - r := &Resolver{es: es} - - r.Mresolver_posts, err = m.SyncInt64().Counter("resolver_posts") - errs = multierr.Append(errs, err) - - r.Mresolver_post_added, err = m.SyncInt64().Counter("resolver_post_added") - errs = multierr.Append(errs, err) - - r.Mresolver_post_added_event, err = m.SyncInt64().Counter("resolver_post_added") - errs = multierr.Append(errs, err) - - r.Mresolver_create_salty_user, err = m.SyncInt64().Counter("resolver_create_salty_user") - errs = multierr.Append(errs, err) - - r.Mresolver_salty_user, err = m.SyncInt64().Counter("resolver_salty_user") - errs = multierr.Append(errs, err) - - span.RecordError(err) - return r, errs + return r, nil } -// Posts is the resolver for the events field. -func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput) (*Connection, error) { - ctx, span := logz.Span(ctx) - defer span.End() +// Query returns generated.QueryResolver implementation. +func (r *Resolver) Query() generated.QueryResolver { return r } - r.Mresolver_posts.Add(ctx, 1) +// Query returns generated.QueryResolver implementation. +func (r *Resolver) Mutation() generated.MutationResolver { return r } - lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30)) - if err != nil { - span.RecordError(err) - return nil, err - } +// Subscription returns generated.SubscriptionResolver implementation. +func (r *Resolver) Subscription() generated.SubscriptionResolver { return r } - edges := make([]Edge, 0, len(lis)) - for i := range lis { - span.AddEvent(fmt.Sprint("post ", i, " of ", len(lis))) - e := lis[i] - m := e.EventMeta() - - post, ok := e.(*msgbus.PostEvent) - if !ok { +// ChainMiddlewares will check all embeded resolvers for a GetMiddleware func and add to handler. +func (r *Resolver) ChainMiddlewares(h http.Handler) http.Handler { + v := reflect.ValueOf(r) // Get reflected value of *Resolver + v = reflect.Indirect(v) // Get the pointed value (returns a zero value on nil) + n := v.NumField() // Get number of fields to iterate over. + for i := 0; i < n; i++ { + f := v.Field(i) + if !f.CanInterface() { // Skip non-interface types. continue } - - edges = append(edges, PostEvent{ - ID: lis[i].EventMeta().EventID.String(), - Payload: string(post.Payload), - Tags: post.Tags, - Meta: &m, - }) - } - - var first, last uint64 - if first, err = r.es.FirstIndex(ctx, streamID); err != nil { - span.RecordError(err) - return nil, err - } - if last, err = r.es.LastIndex(ctx, streamID); err != nil { - span.RecordError(err) - return nil, err - } - - return &Connection{ - Paging: &PageInfo{ - Next: lis.Last().EventMeta().Position < last, - Prev: lis.First().EventMeta().Position > first, - Begin: lis.First().EventMeta().Position, - End: lis.Last().EventMeta().Position, - }, - Edges: edges, - }, nil -} - -func (r *Resolver) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) { - ctx, span := logz.Span(ctx) - defer span.End() - - r.Mresolver_post_added.Add(ctx, 1) - - es := r.es.EventStream() - if es == nil { - return nil, fmt.Errorf("EventStore does not implement streaming") - } - - sub, err := es.Subscribe(ctx, streamID, after) - if err != nil { - span.RecordError(err) - return nil, err - } - - ch := make(chan *PostEvent) - - go func() { - ctx, span := logz.Span(ctx) - defer span.End() - - defer func() { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - err := sub.Close(ctx) - span.RecordError(err) - }() - - for sub.Recv(ctx) { - events, err := sub.Events(ctx) - if err != nil { - span.RecordError(err) - break - } - span.AddEvent(fmt.Sprintf("received %d events", len(events))) - r.Mresolver_post_added_event.Add(ctx, int64(len(events))) - - for _, e := range events { - m := e.EventMeta() - if p, ok := e.(*msgbus.PostEvent); ok { - select { - case ch <- &PostEvent{ - ID: m.EventID.String(), - Payload: string(p.Payload), - Tags: p.Tags, - Meta: &m, - }: - continue - case <-ctx.Done(): - return - } - } - } + if iface, ok := f.Interface().(interface { + GetMiddleware() func(http.Handler) http.Handler + }); ok { + h = iface.GetMiddleware()(h) // Append only items that fulfill the interface. } - }() - - return ch, nil -} - -func (r *Resolver) CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) { - ctx, span := logz.Span(ctx) - defer span.End() - - r.Mresolver_create_salty_user.Add(ctx, 1) - - streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) - span.AddEvent(streamID) - - key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(pub)) - if err != nil { - span.RecordError(err) - return nil, err } - a, err := es.Create(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error { - return agg.OnUserRegister(nick, key) - }) - switch { - case errors.Is(err, es.ErrShouldNotExist): - span.RecordError(err) - return nil, fmt.Errorf("user exists") - - case err != nil: - span.RecordError(err) - return nil, fmt.Errorf("internal error") - } - - return &SaltyUser{ - Nick: nick, - Pubkey: pub, - Inbox: a.Inbox.String(), - }, nil -} - -func (r *Resolver) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error) { - ctx, span := logz.Span(ctx) - defer span.End() - - r.Mresolver_salty_user.Add(ctx, 1) - - streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) - span.AddEvent(streamID) - - a, err := es.Update(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error { return nil }) - switch { - case errors.Is(err, es.ErrShouldExist): - span.RecordError(err) - return nil, fmt.Errorf("user not found") - - case err != nil: - span.RecordError(err) - return nil, fmt.Errorf("%w internal error", err) - } - - return &SaltyUser{ - Nick: nick, - Pubkey: a.Pubkey.String(), - Inbox: a.Inbox.String(), - }, err + return h } diff --git a/api/gql_ev/msgbus.graphqls b/app/msgbus/msgbus.graphqls similarity index 77% rename from api/gql_ev/msgbus.graphqls rename to app/msgbus/msgbus.graphqls index 53c0f3d..43358cc 100644 --- a/api/gql_ev/msgbus.graphqls +++ b/app/msgbus/msgbus.graphqls @@ -5,7 +5,7 @@ extend type Subscription { """after == 0 start from begining, after == -1 start from end""" postAdded(streamID: String! after: Int! = -1): PostEvent } -type PostEvent implements Edge { +type PostEvent implements Edge @goModel(model: "github.com/sour-is/ev/app/msgbus.PostEvent") { id: ID! payload: String! diff --git a/pkg/msgbus/service.go b/app/msgbus/service.go similarity index 63% rename from pkg/msgbus/service.go rename to app/msgbus/service.go index 33c3c55..852a186 100644 --- a/pkg/msgbus/service.go +++ b/app/msgbus/service.go @@ -4,35 +4,63 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "net/http" - "path" "strconv" "strings" "time" "github.com/gorilla/websocket" "github.com/sour-is/ev/internal/logz" - "github.com/sour-is/ev/pkg/domain" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/gql" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.uber.org/multierr" ) type service struct { - baseURL string - es *es.EventStore + es *es.EventStore + + Mresolver_posts syncint64.Counter + Mresolver_post_added syncint64.Counter + Mresolver_post_added_event syncint64.Counter } -func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, error) { +type MsgbusResolver interface { + Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) + PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) +} + +func New(ctx context.Context, es *es.EventStore) (*service, error) { ctx, span := logz.Span(ctx) defer span.End() if err := event.Register(ctx, &PostEvent{}); err != nil { return nil, err } - return &service{baseURL, es}, nil + if err := event.RegisterName(ctx, "domain.PostEvent", &PostEvent{}); err != nil { + return nil, err + } + + m := logz.Meter(ctx) + + svc := &service{es: es} + + var err, errs error + svc.Mresolver_posts, err = m.SyncInt64().Counter("resolver_posts") + errs = multierr.Append(errs, err) + + svc.Mresolver_post_added, err = m.SyncInt64().Counter("resolver_post_added") + errs = multierr.Append(errs, err) + + svc.Mresolver_post_added_event, err = m.SyncInt64().Counter("resolver_post_added") + errs = multierr.Append(errs, err) + + span.RecordError(err) + + return svc, errs } var upgrader = websocket.Upgrader{ @@ -54,10 +82,6 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.websocket(w, r) return } - if strings.HasPrefix(r.URL.Path, "/.well-known/salty") { - s.getUser(w, r) - return - } s.get(w, r) case http.MethodPost, http.MethodPut: @@ -67,6 +91,108 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +// Posts is the resolver for the events field. +func (r *service) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { + ctx, span := logz.Span(ctx) + defer span.End() + + r.Mresolver_posts.Add(ctx, 1) + + lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30)) + if err != nil { + span.RecordError(err) + return nil, err + } + + edges := make([]gql.Edge, 0, len(lis)) + for i := range lis { + span.AddEvent(fmt.Sprint("post ", i, " of ", len(lis))) + e := lis[i] + + post, ok := e.(*PostEvent) + if !ok { + continue + } + + edges = append(edges, post) + } + + var first, last uint64 + if first, err = r.es.FirstIndex(ctx, streamID); err != nil { + span.RecordError(err) + return nil, err + } + if last, err = r.es.LastIndex(ctx, streamID); err != nil { + span.RecordError(err) + return nil, err + } + + return &gql.Connection{ + Paging: &gql.PageInfo{ + Next: lis.Last().EventMeta().Position < last, + Prev: lis.First().EventMeta().Position > first, + Begin: lis.First().EventMeta().Position, + End: lis.Last().EventMeta().Position, + }, + Edges: edges, + }, nil +} + +func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) { + ctx, span := logz.Span(ctx) + defer span.End() + + r.Mresolver_post_added.Add(ctx, 1) + + es := r.es.EventStream() + if es == nil { + return nil, fmt.Errorf("EventStore does not implement streaming") + } + + sub, err := es.Subscribe(ctx, streamID, after) + if err != nil { + span.RecordError(err) + return nil, err + } + + ch := make(chan *PostEvent) + + go func() { + ctx, span := logz.Span(ctx) + defer span.End() + + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err := sub.Close(ctx) + span.RecordError(err) + }() + + for sub.Recv(ctx) { + events, err := sub.Events(ctx) + if err != nil { + span.RecordError(err) + break + } + span.AddEvent(fmt.Sprintf("received %d events", len(events))) + r.Mresolver_post_added_event.Add(ctx, int64(len(events))) + + for _, e := range events { + if p, ok := e.(*PostEvent); ok { + select { + case ch <- p: + continue + case <-ctx.Done(): + return + } + } + } + } + }() + + return ch, nil +} + func (s *service) get(w http.ResponseWriter, r *http.Request) { ctx := r.Context() ctx, span := logz.Span(ctx) @@ -120,41 +246,6 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, events[i]) } } -func (s *service) getUser(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - ctx, span := logz.Span(ctx) - defer span.End() - - addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/") - addr = strings.TrimSuffix(addr, ".json") - - span.AddEvent(fmt.Sprint("find ", addr)) - a, err := es.Update(ctx, s.es, addr, func(ctx context.Context, agg *domain.SaltyUser) error { return nil }) - switch { - case errors.Is(err, event.ErrShouldExist): - span.RecordError(err) - - w.WriteHeader(http.StatusNotFound) - return - case err != nil: - span.RecordError(err) - - w.WriteHeader(http.StatusInternalServerError) - return - } - - err = json.NewEncoder(w).Encode( - struct { - Endpoint string `json:"endpoint"` - Key string `json:"key"` - }{ - Endpoint: path.Join(s.baseURL, a.Inbox.String()), - Key: a.Pubkey.ID().String(), - }) - if err != nil { - span.RecordError(err) - } -} func (s *service) post(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -187,8 +278,8 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) { } events := event.NewEvents(&PostEvent{ - Payload: b, - Tags: fields(tags), + payload: b, + tags: fields(tags), }) _, err = s.es.Append(ctx, "post-"+name, events) @@ -322,8 +413,8 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) { } type PostEvent struct { - Payload []byte - Tags []string + payload []byte + tags []string eventMeta event.Meta } @@ -341,11 +432,38 @@ func (e *PostEvent) SetEventMeta(eventMeta event.Meta) { e.eventMeta = eventMeta } func (e *PostEvent) MarshalBinary() ([]byte, error) { - return json.Marshal(e) + j := struct { + Payload []byte + Tags []string + }{ + Payload: e.payload, + Tags: e.tags, + } + return json.Marshal(&j) } func (e *PostEvent) UnmarshalBinary(b []byte) error { - return json.Unmarshal(b, e) + j := struct { + Payload []byte + Tags []string + }{} + err := json.Unmarshal(b, &j) + e.payload = j.Payload + e.tags = j.Tags + + return err } +func (e *PostEvent) MarshalJSON() ([]byte, error) { return e.MarshalBinary() } +func (e *PostEvent) UnmarshalJSON(b []byte) error { return e.UnmarshalBinary(b) } + +func (e *PostEvent) ID() string { return e.eventMeta.GetEventID() } +func (e *PostEvent) Tags() []string { return e.tags } +func (e *PostEvent) Payload() string { return string(e.payload) } +func (e *PostEvent) PayloadJSON(ctx context.Context) (m map[string]interface{}, err error) { + err = json.Unmarshal([]byte(e.payload), &m) + return +} +func (e *PostEvent) Meta() *event.Meta { return &e.eventMeta } +func (e *PostEvent) IsEdge() {} func (e *PostEvent) String() string { var b bytes.Buffer @@ -357,14 +475,15 @@ func (e *PostEvent) String() string { b.WriteString(e.eventMeta.EventID.String()) b.WriteRune('\t') - b.WriteString(string(e.Payload)) - if len(e.Tags) > 0 { + b.WriteString(string(e.payload)) + if len(e.tags) > 0 { b.WriteRune('\t') - b.WriteString(strings.Join(e.Tags, ",")) + b.WriteString(strings.Join(e.tags, ",")) } return b.String() } + func fields(s string) []string { if s == "" { return nil @@ -393,8 +512,8 @@ func encodeJSON(w io.Writer, first event.Event, events ...event.Event) error { } out[i].ID = e.EventMeta().Position out[i].Created = e.EventMeta().Created().Format(time.RFC3339Nano) - out[i].Payload = e.Payload - out[i].Tags = e.Tags + out[i].Payload = e.payload + out[i].Tags = e.tags out[i].Topic.Name = strings.TrimPrefix(e.EventMeta().StreamID, "post-") out[i].Topic.Created = first.EventMeta().Created().Format(time.RFC3339Nano) out[i].Topic.Seq = e.EventMeta().Position diff --git a/app/msgbus/service_test.go b/app/msgbus/service_test.go new file mode 100644 index 0000000..5e4620c --- /dev/null +++ b/app/msgbus/service_test.go @@ -0,0 +1,25 @@ +package msgbus + +import ( + "encoding/json" + "testing" +) + +func TestUnmarshal(t *testing.T) { + m := &PostEvent{} + s := `{"Payload":"QkVHSU4gU0FMVFBBQ0sgRU5DUllQVEVEIE1FU1NBR0UuIGtlRElETVFXWXZWUjU4QiBGVGZUZURRTkhzT2ZuZWMgWUV1dkNYSTNoVDBYZzZKIDJxeXBJcmdsT3ZlZjlqciA0dHVQaWpJRVgxdlpoTEkgUTVKTzNvYVY5cnNna01BIEFOeTJwYjg2Qkd6N0JGMCA4MXJuZk9OV2RRM0VldFAgSmU0ZFlHeUI4NkRydkVrIGNqcFpoajNmcEJUcDdiZiBpMktwRDJQM1kzNVJBQU8gWmIyZGZtOVpneHZNSVJ2IDJsVVRCWTQxVEtZNkJhTyB2NGVIeXF1MENjQkR4dW8gSEZIekxJd3BBb3ZoRGt1IGFJdXRZYzdhZ3puMUxvNCBZQWFyUDZxVVVtTVlrQXAgYkdSYTZLZWVOa3ZzTDdMIHFoMWd6WUlnS2l6cW51eCB1SVQ0QTdaU1BscWxlR1IgbTk3M2ZoNUduWEZTM3MwIDJzQ2FvclpmN2c1RUo5TiBlS1hkZkFSMWF6TVRBek8gSmNEM1hDNDBwVTRpaG9mIE8wYnB2RU1UOVlUb3ZOWCBobVUxZWZ6enpyMUFDdXcgWExwcUhlVXNXdEtGcXRnIHdyWEZleExBYU50T21jRSBOeFFDUi4gRU5EIFNBTFRQQUNLIEVOQ1JZUFRFRCBNRVNTQUdFLgo=","Tags":null}` + + err := json.Unmarshal([]byte(s), m) + t.Log(err) +} + +func TestMarshal(t *testing.T) { + m := &PostEvent{ + payload: []byte("QkVHSU4gU0FMVFBBQ0sgRU5DUllQVEVEIE1FU1NBR0UuIGtlRElETVFXWXZWUjU4QiBGVGZUZURRTkhzT2ZuZWMgWUV1dkNYSTNoVDBYZzZKIDJxeXBJcmdsT3ZlZjlqciA0dHVQaWpJRVgxdlpoTEkgUTVKTzNvYVY5cnNna01BIEFOeTJwYjg2Qkd6N0JGMCA4MXJuZk9OV2RRM0VldFAgSmU0ZFlHeUI4NkRydkVrIGNqcFpoajNmcEJUcDdiZiBpMktwRDJQM1kzNVJBQU8gWmIyZGZtOVpneHZNSVJ2IDJsVVRCWTQxVEtZNkJhTyB2NGVIeXF1MENjQkR4dW8gSEZIekxJd3BBb3ZoRGt1IGFJdXRZYzdhZ3puMUxvNCBZQWFyUDZxVVVtTVlrQXAgYkdSYTZLZWVOa3ZzTDdMIHFoMWd6WUlnS2l6cW51eCB1SVQ0QTdaU1BscWxlR1IgbTk3M2ZoNUduWEZTM3MwIDJzQ2FvclpmN2c1RUo5TiBlS1hkZkFSMWF6TVRBek8gSmNEM1hDNDBwVTRpaG9mIE8wYnB2RU1UOVlUb3ZOWCBobVUxZWZ6enpyMUFDdXcgWExwcUhlVXNXdEtGcXRnIHdyWEZleExBYU50T21jRSBOeFFDUi4gRU5EIFNBTFRQQUNLIEVOQ1JZUFRFRCBNRVNTQUdFLgo="), + } + b, err := json.Marshal(m) + t.Log(err) + + err = json.Unmarshal(b, m) + t.Log(err) +} diff --git a/pkg/playground/playground.go b/app/playground/playground.go similarity index 100% rename from pkg/playground/playground.go rename to app/playground/playground.go diff --git a/pkg/domain/salty-user.go b/app/salty/salty-user.go similarity index 73% rename from pkg/domain/salty-user.go rename to app/salty/salty-user.go index 6223157..10bc1bd 100644 --- a/pkg/domain/salty-user.go +++ b/app/salty/salty-user.go @@ -1,4 +1,4 @@ -package domain +package salty import ( "bytes" @@ -6,21 +6,19 @@ import ( "crypto/sha256" "fmt" "log" + "path" "strings" "github.com/keys-pub/keys" "github.com/oklog/ulid/v2" "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/gql" ) -func Init(ctx context.Context) error { - return event.Register(ctx, &UserRegistered{}) -} - type SaltyUser struct { - Name string - Pubkey *keys.EdX25519PublicKey - Inbox ulid.ULID + name string + pubkey *keys.EdX25519PublicKey + inbox ulid.ULID event.AggregateRoot } @@ -32,9 +30,9 @@ func (a *SaltyUser) ApplyEvent(lis ...event.Event) { for _, e := range lis { switch e := e.(type) { case *UserRegistered: - a.Name = e.Name - a.Pubkey = e.Pubkey - a.Inbox = e.EventMeta().EventID + a.name = e.Name + a.pubkey = e.Pubkey + a.inbox = e.EventMeta().EventID a.SetStreamID(a.streamID()) default: log.Printf("unknown event %T", e) @@ -43,7 +41,7 @@ func (a *SaltyUser) ApplyEvent(lis ...event.Event) { } func (a *SaltyUser) streamID() string { - return fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(a.Name)))) + return fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(a.name)))) } func (a *SaltyUser) OnUserRegister(name string, pubkey *keys.EdX25519PublicKey) error { @@ -51,6 +49,14 @@ func (a *SaltyUser) OnUserRegister(name string, pubkey *keys.EdX25519PublicKey) return nil } +func (a *SaltyUser) Nick() string { return a.name } +func (a *SaltyUser) Inbox() string { return a.inbox.String() } +func (a *SaltyUser) Pubkey() string { return a.pubkey.String() } +func (s *SaltyUser) Endpoint(ctx context.Context) string { + svc := gql.FromContext[contextKey, *service](ctx, saltyKey) + return path.Join(svc.BaseURL(), s.inbox.String()) +} + type UserRegistered struct { Name string Pubkey *keys.EdX25519PublicKey @@ -66,7 +72,6 @@ func (e *UserRegistered) EventMeta() event.Meta { } return e.eventMeta } - func (e *UserRegistered) SetEventMeta(m event.Meta) { if e != nil { e.eventMeta = m diff --git a/api/gql_ev/salty.graphqls b/app/salty/salty.graphqls similarity index 75% rename from api/gql_ev/salty.graphqls rename to app/salty/salty.graphqls index 0088be2..37bb5c9 100644 --- a/api/gql_ev/salty.graphqls +++ b/app/salty/salty.graphqls @@ -6,7 +6,7 @@ extend type Mutation { createSaltyUser(nick: String! pubkey: String!): SaltyUser } -type SaltyUser { +type SaltyUser @goModel(model: "github.com/sour-is/ev/app/salty.SaltyUser"){ nick: String! pubkey: String! inbox: String! diff --git a/app/salty/service.go b/app/salty/service.go new file mode 100644 index 0000000..197cb98 --- /dev/null +++ b/app/salty/service.go @@ -0,0 +1,166 @@ +package salty + +import ( + "context" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + "net/http" + "path" + "strings" + + "github.com/keys-pub/keys" + "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/gql" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.uber.org/multierr" +) + +type service struct { + baseURL string + es *es.EventStore + + Mresolver_create_salty_user syncint64.Counter + Mresolver_salty_user syncint64.Counter +} +type contextKey struct { + name string +} + +var saltyKey = contextKey{"salty"} + +type SaltyResolver interface { + CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) + SaltyUser(ctx context.Context, nick string) (*SaltyUser, error) +} + +func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, error) { + ctx, span := logz.Span(ctx) + defer span.End() + + if err := event.Register(ctx, &UserRegistered{}); err != nil { + return nil, err + } + if err := event.RegisterName(ctx, "domain.UserRegistered", &UserRegistered{}); err != nil { + return nil, err + } + + m := logz.Meter(ctx) + + svc := &service{baseURL: baseURL, es: es} + + var err, errs error + svc.Mresolver_create_salty_user, err = m.SyncInt64().Counter("resolver_create_salty_user") + errs = multierr.Append(errs, err) + + svc.Mresolver_salty_user, err = m.SyncInt64().Counter("resolver_salty_user") + errs = multierr.Append(errs, err) + span.RecordError(err) + + return svc, errs +} + +func (s *service) BaseURL() string { + if s == nil { + return "http://missing.context/" + } + return s.baseURL +} +func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + ctx, span := logz.Span(ctx) + defer span.End() + + addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/") + addr = strings.TrimSuffix(addr, ".json") + + span.AddEvent(fmt.Sprint("find ", addr)) + a, err := es.Update(ctx, s.es, addr, func(ctx context.Context, agg *SaltyUser) error { return nil }) + switch { + case errors.Is(err, event.ErrShouldExist): + span.RecordError(err) + + w.WriteHeader(http.StatusNotFound) + return + case err != nil: + span.RecordError(err) + + w.WriteHeader(http.StatusInternalServerError) + return + } + + err = json.NewEncoder(w).Encode( + struct { + Endpoint string `json:"endpoint"` + Key string `json:"key"` + }{ + Endpoint: path.Join(s.baseURL, a.inbox.String()), + Key: a.pubkey.ID().String(), + }) + if err != nil { + span.RecordError(err) + } +} +func (s *service) CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) { + ctx, span := logz.Span(ctx) + defer span.End() + + s.Mresolver_create_salty_user.Add(ctx, 1) + + streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) + span.AddEvent(streamID) + + key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(pub)) + if err != nil { + span.RecordError(err) + return nil, err + } + + a, err := es.Create(ctx, s.es, streamID, func(ctx context.Context, agg *SaltyUser) error { + return agg.OnUserRegister(nick, key) + }) + switch { + case errors.Is(err, es.ErrShouldNotExist): + span.RecordError(err) + return nil, fmt.Errorf("user exists") + + case err != nil: + span.RecordError(err) + return nil, fmt.Errorf("internal error") + } + + return a, nil +} +func (s *service) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error) { + ctx, span := logz.Span(ctx) + defer span.End() + + s.Mresolver_salty_user.Add(ctx, 1) + + streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) + span.AddEvent(streamID) + + a, err := es.Update(ctx, s.es, streamID, func(ctx context.Context, agg *SaltyUser) error { return nil }) + switch { + case errors.Is(err, es.ErrShouldExist): + span.RecordError(err) + return nil, fmt.Errorf("user not found") + + case err != nil: + span.RecordError(err) + return nil, fmt.Errorf("%w internal error", err) + } + + return a, err +} +func (s *service) GetMiddleware() func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r = r.WithContext(gql.ToContext(r.Context(), saltyKey, s)) + next.ServeHTTP(w, r) + }) + } +} diff --git a/gqlgen.yml b/gqlgen.yml index f0329d0..19d5081 100644 --- a/gqlgen.yml +++ b/gqlgen.yml @@ -1,6 +1,7 @@ # Where are all the schema files located? globs are supported eg src/**/*.graphqls schema: - - api/gql_ev/*.graphqls + - pkg/*/*.graphqls + - app/*/*.graphqls # Where should the generated server code go? exec: @@ -34,8 +35,8 @@ model: # gqlgen will search for any type names in the schema in these go packages # if they match it will use them, otherwise it will generate them. -autobind: - - "github.com/sour-is/ev/api/gql_ev" +# autobind: +# - "github.com/sour-is/ev/pkg/gql" # This section declares type mapping between the GraphQL and go type systems # @@ -60,10 +61,4 @@ models: - github.com/99designs/gqlgen/graphql.Uint64 - github.com/99designs/gqlgen/graphql.Uint32 - github.com/99designs/gqlgen/graphql.Uint - Time: - model: - - github.com/99designs/gqlgen/graphql.Time - Meta: - model: - - github.com/sour-is/ev/pkg/es/event.Meta diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index 956f69a..b5c81de 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -16,8 +16,10 @@ import ( "github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql/introspection" "github.com/99designs/gqlgen/plugin/federation/fedruntime" - "github.com/sour-is/ev/api/gql_ev" + "github.com/sour-is/ev/app/msgbus" + "github.com/sour-is/ev/app/salty" "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/gql" gqlparser "github.com/vektah/gqlparser/v2" "github.com/vektah/gqlparser/v2/ast" ) @@ -81,7 +83,7 @@ type ComplexityRoot struct { } Query struct { - Posts func(childComplexity int, streamID string, paging *gql_ev.PageInput) int + Posts func(childComplexity int, streamID string, paging *gql.PageInput) int SaltyUser func(childComplexity int, nick string) int __resolve__service func(childComplexity int) int } @@ -103,14 +105,14 @@ type ComplexityRoot struct { } type MutationResolver interface { - CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*gql_ev.SaltyUser, error) + CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) } type QueryResolver interface { - Posts(ctx context.Context, streamID string, paging *gql_ev.PageInput) (*gql_ev.Connection, error) - SaltyUser(ctx context.Context, nick string) (*gql_ev.SaltyUser, error) + Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) + SaltyUser(ctx context.Context, nick string) (*salty.SaltyUser, error) } type SubscriptionResolver interface { - PostAdded(ctx context.Context, streamID string, after int64) (<-chan *gql_ev.PostEvent, error) + PostAdded(ctx context.Context, streamID string, after int64) (<-chan *msgbus.PostEvent, error) } type executableSchema struct { @@ -255,7 +257,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return 0, false } - return e.complexity.Query.Posts(childComplexity, args["streamID"].(string), args["paging"].(*gql_ev.PageInput)), true + return e.complexity.Query.Posts(childComplexity, args["streamID"].(string), args["paging"].(*gql.PageInput)), true case "Query.saltyUser": if e.complexity.Query.SaltyUser == nil { @@ -409,35 +411,35 @@ func (ec *executionContext) introspectType(name string) (*introspection.Type, er } var sources = []*ast.Source{ - {Name: "../../../api/gql_ev/common.graphqls", Input: `scalar Time + {Name: "../../../pkg/es/es.graphqls", Input: ` +type Meta @goModel(model: "github.com/sour-is/ev/pkg/es/event.Meta") { + eventID: String! @goField(name: "getEventID") + streamID: String! + created: Time! + position: Int! +}`, BuiltIn: false}, + {Name: "../../../pkg/gql/common.graphqls", Input: `scalar Time scalar Map -type Connection { +type Connection @goModel(model: "github.com/sour-is/ev/pkg/gql.Connection") { paging: PageInfo! edges: [Edge!]! } -input PageInput { +input PageInput @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInput") { idx: Int = 0 count: Int = 30 } -type PageInfo { +type PageInfo @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInfo") { next: Boolean! prev: Boolean! begin: Int! end: Int! } -interface Edge { +interface Edge @goModel(model: "github.com/sour-is/ev/pkg/gql.Edge"){ id: ID! } -type Meta { - eventID: String! @goField(name: "getEventID") - streamID: String! - created: Time! - position: Int! -} - directive @goModel( model: String models: [String!] @@ -452,14 +454,14 @@ directive @goTag( key: String! value: String ) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION`, BuiltIn: false}, - {Name: "../../../api/gql_ev/msgbus.graphqls", Input: `extend type Query { + {Name: "../../../app/msgbus/msgbus.graphqls", Input: `extend type Query { posts(streamID: String! paging: PageInput): Connection! } extend type Subscription { """after == 0 start from begining, after == -1 start from end""" postAdded(streamID: String! after: Int! = -1): PostEvent } -type PostEvent implements Edge { +type PostEvent implements Edge @goModel(model: "github.com/sour-is/ev/app/msgbus.PostEvent") { id: ID! payload: String! @@ -468,7 +470,7 @@ type PostEvent implements Edge { meta: Meta! }`, BuiltIn: false}, - {Name: "../../../api/gql_ev/salty.graphqls", Input: `extend type Query { + {Name: "../../../app/salty/salty.graphqls", Input: `extend type Query { saltyUser(nick: String!): SaltyUser } @@ -476,7 +478,7 @@ extend type Mutation { createSaltyUser(nick: String! pubkey: String!): SaltyUser } -type SaltyUser { +type SaltyUser @goModel(model: "github.com/sour-is/ev/app/salty.SaltyUser"){ nick: String! pubkey: String! inbox: String! @@ -560,10 +562,10 @@ func (ec *executionContext) field_Query_posts_args(ctx context.Context, rawArgs } } args["streamID"] = arg0 - var arg1 *gql_ev.PageInput + var arg1 *gql.PageInput if tmp, ok := rawArgs["paging"]; ok { ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("paging")) - arg1, err = ec.unmarshalOPageInput2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐPageInput(ctx, tmp) + arg1, err = ec.unmarshalOPageInput2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐPageInput(ctx, tmp) if err != nil { return nil, err } @@ -649,7 +651,7 @@ func (ec *executionContext) field___Type_fields_args(ctx context.Context, rawArg // region **************************** field.gotpl ***************************** -func (ec *executionContext) _Connection_paging(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Connection) (ret graphql.Marshaler) { +func (ec *executionContext) _Connection_paging(ctx context.Context, field graphql.CollectedField, obj *gql.Connection) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Connection_paging(ctx, field) if err != nil { return graphql.Null @@ -675,9 +677,9 @@ func (ec *executionContext) _Connection_paging(ctx context.Context, field graphq } return graphql.Null } - res := resTmp.(*gql_ev.PageInfo) + res := resTmp.(*gql.PageInfo) fc.Result = res - return ec.marshalNPageInfo2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐPageInfo(ctx, field.Selections, res) + return ec.marshalNPageInfo2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐPageInfo(ctx, field.Selections, res) } func (ec *executionContext) fieldContext_Connection_paging(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { @@ -703,7 +705,7 @@ func (ec *executionContext) fieldContext_Connection_paging(ctx context.Context, return fc, nil } -func (ec *executionContext) _Connection_edges(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Connection) (ret graphql.Marshaler) { +func (ec *executionContext) _Connection_edges(ctx context.Context, field graphql.CollectedField, obj *gql.Connection) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Connection_edges(ctx, field) if err != nil { return graphql.Null @@ -729,9 +731,9 @@ func (ec *executionContext) _Connection_edges(ctx context.Context, field graphql } return graphql.Null } - res := resTmp.([]gql_ev.Edge) + res := resTmp.([]gql.Edge) fc.Result = res - return ec.marshalNEdge2ᚕgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐEdgeᚄ(ctx, field.Selections, res) + return ec.marshalNEdge2ᚕgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐEdgeᚄ(ctx, field.Selections, res) } func (ec *executionContext) fieldContext_Connection_edges(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { @@ -946,9 +948,9 @@ func (ec *executionContext) _Mutation_createSaltyUser(ctx context.Context, field if resTmp == nil { return graphql.Null } - res := resTmp.(*gql_ev.SaltyUser) + res := resTmp.(*salty.SaltyUser) fc.Result = res - return ec.marshalOSaltyUser2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐSaltyUser(ctx, field.Selections, res) + return ec.marshalOSaltyUser2ᚖgithubᚗcomᚋsourᚑisᚋevᚋappᚋsaltyᚐSaltyUser(ctx, field.Selections, res) } func (ec *executionContext) fieldContext_Mutation_createSaltyUser(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { @@ -985,7 +987,7 @@ func (ec *executionContext) fieldContext_Mutation_createSaltyUser(ctx context.Co return fc, nil } -func (ec *executionContext) _PageInfo_next(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PageInfo) (ret graphql.Marshaler) { +func (ec *executionContext) _PageInfo_next(ctx context.Context, field graphql.CollectedField, obj *gql.PageInfo) (ret graphql.Marshaler) { fc, err := ec.fieldContext_PageInfo_next(ctx, field) if err != nil { return graphql.Null @@ -1029,7 +1031,7 @@ func (ec *executionContext) fieldContext_PageInfo_next(ctx context.Context, fiel return fc, nil } -func (ec *executionContext) _PageInfo_prev(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PageInfo) (ret graphql.Marshaler) { +func (ec *executionContext) _PageInfo_prev(ctx context.Context, field graphql.CollectedField, obj *gql.PageInfo) (ret graphql.Marshaler) { fc, err := ec.fieldContext_PageInfo_prev(ctx, field) if err != nil { return graphql.Null @@ -1073,7 +1075,7 @@ func (ec *executionContext) fieldContext_PageInfo_prev(ctx context.Context, fiel return fc, nil } -func (ec *executionContext) _PageInfo_begin(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PageInfo) (ret graphql.Marshaler) { +func (ec *executionContext) _PageInfo_begin(ctx context.Context, field graphql.CollectedField, obj *gql.PageInfo) (ret graphql.Marshaler) { fc, err := ec.fieldContext_PageInfo_begin(ctx, field) if err != nil { return graphql.Null @@ -1117,7 +1119,7 @@ func (ec *executionContext) fieldContext_PageInfo_begin(ctx context.Context, fie return fc, nil } -func (ec *executionContext) _PageInfo_end(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PageInfo) (ret graphql.Marshaler) { +func (ec *executionContext) _PageInfo_end(ctx context.Context, field graphql.CollectedField, obj *gql.PageInfo) (ret graphql.Marshaler) { fc, err := ec.fieldContext_PageInfo_end(ctx, field) if err != nil { return graphql.Null @@ -1161,7 +1163,7 @@ func (ec *executionContext) fieldContext_PageInfo_end(ctx context.Context, field return fc, nil } -func (ec *executionContext) _PostEvent_id(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PostEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _PostEvent_id(ctx context.Context, field graphql.CollectedField, obj *msgbus.PostEvent) (ret graphql.Marshaler) { fc, err := ec.fieldContext_PostEvent_id(ctx, field) if err != nil { return graphql.Null @@ -1175,7 +1177,7 @@ func (ec *executionContext) _PostEvent_id(ctx context.Context, field graphql.Col }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.ID, nil + return obj.ID(), nil }) if err != nil { ec.Error(ctx, err) @@ -1196,7 +1198,7 @@ func (ec *executionContext) fieldContext_PostEvent_id(ctx context.Context, field fc = &graphql.FieldContext{ Object: "PostEvent", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type ID does not have child fields") @@ -1205,7 +1207,7 @@ func (ec *executionContext) fieldContext_PostEvent_id(ctx context.Context, field return fc, nil } -func (ec *executionContext) _PostEvent_payload(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PostEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _PostEvent_payload(ctx context.Context, field graphql.CollectedField, obj *msgbus.PostEvent) (ret graphql.Marshaler) { fc, err := ec.fieldContext_PostEvent_payload(ctx, field) if err != nil { return graphql.Null @@ -1219,7 +1221,7 @@ func (ec *executionContext) _PostEvent_payload(ctx context.Context, field graphq }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.Payload, nil + return obj.Payload(), nil }) if err != nil { ec.Error(ctx, err) @@ -1240,7 +1242,7 @@ func (ec *executionContext) fieldContext_PostEvent_payload(ctx context.Context, fc = &graphql.FieldContext{ Object: "PostEvent", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type String does not have child fields") @@ -1249,7 +1251,7 @@ func (ec *executionContext) fieldContext_PostEvent_payload(ctx context.Context, return fc, nil } -func (ec *executionContext) _PostEvent_payloadJSON(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PostEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _PostEvent_payloadJSON(ctx context.Context, field graphql.CollectedField, obj *msgbus.PostEvent) (ret graphql.Marshaler) { fc, err := ec.fieldContext_PostEvent_payloadJSON(ctx, field) if err != nil { return graphql.Null @@ -1293,7 +1295,7 @@ func (ec *executionContext) fieldContext_PostEvent_payloadJSON(ctx context.Conte return fc, nil } -func (ec *executionContext) _PostEvent_tags(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PostEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _PostEvent_tags(ctx context.Context, field graphql.CollectedField, obj *msgbus.PostEvent) (ret graphql.Marshaler) { fc, err := ec.fieldContext_PostEvent_tags(ctx, field) if err != nil { return graphql.Null @@ -1307,7 +1309,7 @@ func (ec *executionContext) _PostEvent_tags(ctx context.Context, field graphql.C }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.Tags, nil + return obj.Tags(), nil }) if err != nil { ec.Error(ctx, err) @@ -1328,7 +1330,7 @@ func (ec *executionContext) fieldContext_PostEvent_tags(ctx context.Context, fie fc = &graphql.FieldContext{ Object: "PostEvent", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type String does not have child fields") @@ -1337,7 +1339,7 @@ func (ec *executionContext) fieldContext_PostEvent_tags(ctx context.Context, fie return fc, nil } -func (ec *executionContext) _PostEvent_meta(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PostEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _PostEvent_meta(ctx context.Context, field graphql.CollectedField, obj *msgbus.PostEvent) (ret graphql.Marshaler) { fc, err := ec.fieldContext_PostEvent_meta(ctx, field) if err != nil { return graphql.Null @@ -1351,7 +1353,7 @@ func (ec *executionContext) _PostEvent_meta(ctx context.Context, field graphql.C }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.Meta, nil + return obj.Meta(), nil }) if err != nil { ec.Error(ctx, err) @@ -1372,7 +1374,7 @@ func (ec *executionContext) fieldContext_PostEvent_meta(ctx context.Context, fie fc = &graphql.FieldContext{ Object: "PostEvent", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { switch field.Name { @@ -1405,7 +1407,7 @@ func (ec *executionContext) _Query_posts(ctx context.Context, field graphql.Coll }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().Posts(rctx, fc.Args["streamID"].(string), fc.Args["paging"].(*gql_ev.PageInput)) + return ec.resolvers.Query().Posts(rctx, fc.Args["streamID"].(string), fc.Args["paging"].(*gql.PageInput)) }) if err != nil { ec.Error(ctx, err) @@ -1417,9 +1419,9 @@ func (ec *executionContext) _Query_posts(ctx context.Context, field graphql.Coll } return graphql.Null } - res := resTmp.(*gql_ev.Connection) + res := resTmp.(*gql.Connection) fc.Result = res - return ec.marshalNConnection2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐConnection(ctx, field.Selections, res) + return ec.marshalNConnection2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐConnection(ctx, field.Selections, res) } func (ec *executionContext) fieldContext_Query_posts(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { @@ -1475,9 +1477,9 @@ func (ec *executionContext) _Query_saltyUser(ctx context.Context, field graphql. if resTmp == nil { return graphql.Null } - res := resTmp.(*gql_ev.SaltyUser) + res := resTmp.(*salty.SaltyUser) fc.Result = res - return ec.marshalOSaltyUser2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐSaltyUser(ctx, field.Selections, res) + return ec.marshalOSaltyUser2ᚖgithubᚗcomᚋsourᚑisᚋevᚋappᚋsaltyᚐSaltyUser(ctx, field.Selections, res) } func (ec *executionContext) fieldContext_Query_saltyUser(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { @@ -1691,7 +1693,7 @@ func (ec *executionContext) fieldContext_Query___schema(ctx context.Context, fie return fc, nil } -func (ec *executionContext) _SaltyUser_nick(ctx context.Context, field graphql.CollectedField, obj *gql_ev.SaltyUser) (ret graphql.Marshaler) { +func (ec *executionContext) _SaltyUser_nick(ctx context.Context, field graphql.CollectedField, obj *salty.SaltyUser) (ret graphql.Marshaler) { fc, err := ec.fieldContext_SaltyUser_nick(ctx, field) if err != nil { return graphql.Null @@ -1705,7 +1707,7 @@ func (ec *executionContext) _SaltyUser_nick(ctx context.Context, field graphql.C }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.Nick, nil + return obj.Nick(), nil }) if err != nil { ec.Error(ctx, err) @@ -1726,7 +1728,7 @@ func (ec *executionContext) fieldContext_SaltyUser_nick(ctx context.Context, fie fc = &graphql.FieldContext{ Object: "SaltyUser", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type String does not have child fields") @@ -1735,7 +1737,7 @@ func (ec *executionContext) fieldContext_SaltyUser_nick(ctx context.Context, fie return fc, nil } -func (ec *executionContext) _SaltyUser_pubkey(ctx context.Context, field graphql.CollectedField, obj *gql_ev.SaltyUser) (ret graphql.Marshaler) { +func (ec *executionContext) _SaltyUser_pubkey(ctx context.Context, field graphql.CollectedField, obj *salty.SaltyUser) (ret graphql.Marshaler) { fc, err := ec.fieldContext_SaltyUser_pubkey(ctx, field) if err != nil { return graphql.Null @@ -1749,7 +1751,7 @@ func (ec *executionContext) _SaltyUser_pubkey(ctx context.Context, field graphql }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.Pubkey, nil + return obj.Pubkey(), nil }) if err != nil { ec.Error(ctx, err) @@ -1770,7 +1772,7 @@ func (ec *executionContext) fieldContext_SaltyUser_pubkey(ctx context.Context, f fc = &graphql.FieldContext{ Object: "SaltyUser", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type String does not have child fields") @@ -1779,7 +1781,7 @@ func (ec *executionContext) fieldContext_SaltyUser_pubkey(ctx context.Context, f return fc, nil } -func (ec *executionContext) _SaltyUser_inbox(ctx context.Context, field graphql.CollectedField, obj *gql_ev.SaltyUser) (ret graphql.Marshaler) { +func (ec *executionContext) _SaltyUser_inbox(ctx context.Context, field graphql.CollectedField, obj *salty.SaltyUser) (ret graphql.Marshaler) { fc, err := ec.fieldContext_SaltyUser_inbox(ctx, field) if err != nil { return graphql.Null @@ -1793,7 +1795,7 @@ func (ec *executionContext) _SaltyUser_inbox(ctx context.Context, field graphql. }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.Inbox, nil + return obj.Inbox(), nil }) if err != nil { ec.Error(ctx, err) @@ -1814,7 +1816,7 @@ func (ec *executionContext) fieldContext_SaltyUser_inbox(ctx context.Context, fi fc = &graphql.FieldContext{ Object: "SaltyUser", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type String does not have child fields") @@ -1823,7 +1825,7 @@ func (ec *executionContext) fieldContext_SaltyUser_inbox(ctx context.Context, fi return fc, nil } -func (ec *executionContext) _SaltyUser_endpoint(ctx context.Context, field graphql.CollectedField, obj *gql_ev.SaltyUser) (ret graphql.Marshaler) { +func (ec *executionContext) _SaltyUser_endpoint(ctx context.Context, field graphql.CollectedField, obj *salty.SaltyUser) (ret graphql.Marshaler) { fc, err := ec.fieldContext_SaltyUser_endpoint(ctx, field) if err != nil { return graphql.Null @@ -1892,7 +1894,7 @@ func (ec *executionContext) _Subscription_postAdded(ctx context.Context, field g } return func(ctx context.Context) graphql.Marshaler { select { - case res, ok := <-resTmp.(<-chan *gql_ev.PostEvent): + case res, ok := <-resTmp.(<-chan *msgbus.PostEvent): if !ok { return nil } @@ -1900,7 +1902,7 @@ func (ec *executionContext) _Subscription_postAdded(ctx context.Context, field g w.Write([]byte{'{'}) graphql.MarshalString(field.Alias).MarshalGQL(w) w.Write([]byte{':'}) - ec.marshalOPostEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐPostEvent(ctx, field.Selections, res).MarshalGQL(w) + ec.marshalOPostEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋappᚋmsgbusᚐPostEvent(ctx, field.Selections, res).MarshalGQL(w) w.Write([]byte{'}'}) }) case <-ctx.Done(): @@ -3759,8 +3761,8 @@ func (ec *executionContext) fieldContext___Type_specifiedByURL(ctx context.Conte // region **************************** input.gotpl ***************************** -func (ec *executionContext) unmarshalInputPageInput(ctx context.Context, obj interface{}) (gql_ev.PageInput, error) { - var it gql_ev.PageInput +func (ec *executionContext) unmarshalInputPageInput(ctx context.Context, obj interface{}) (gql.PageInput, error) { + var it gql.PageInput asMap := map[string]interface{}{} for k, v := range obj.(map[string]interface{}) { asMap[k] = v @@ -3806,13 +3808,11 @@ func (ec *executionContext) unmarshalInputPageInput(ctx context.Context, obj int // region ************************** interface.gotpl *************************** -func (ec *executionContext) _Edge(ctx context.Context, sel ast.SelectionSet, obj gql_ev.Edge) graphql.Marshaler { +func (ec *executionContext) _Edge(ctx context.Context, sel ast.SelectionSet, obj gql.Edge) graphql.Marshaler { switch obj := (obj).(type) { case nil: return graphql.Null - case gql_ev.PostEvent: - return ec._PostEvent(ctx, sel, &obj) - case *gql_ev.PostEvent: + case *msgbus.PostEvent: if obj == nil { return graphql.Null } @@ -3828,7 +3828,7 @@ func (ec *executionContext) _Edge(ctx context.Context, sel ast.SelectionSet, obj var connectionImplementors = []string{"Connection"} -func (ec *executionContext) _Connection(ctx context.Context, sel ast.SelectionSet, obj *gql_ev.Connection) graphql.Marshaler { +func (ec *executionContext) _Connection(ctx context.Context, sel ast.SelectionSet, obj *gql.Connection) graphql.Marshaler { fields := graphql.CollectFields(ec.OperationContext, sel, connectionImplementors) out := graphql.NewFieldSet(fields) var invalids uint32 @@ -3948,7 +3948,7 @@ func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet) var pageInfoImplementors = []string{"PageInfo"} -func (ec *executionContext) _PageInfo(ctx context.Context, sel ast.SelectionSet, obj *gql_ev.PageInfo) graphql.Marshaler { +func (ec *executionContext) _PageInfo(ctx context.Context, sel ast.SelectionSet, obj *gql.PageInfo) graphql.Marshaler { fields := graphql.CollectFields(ec.OperationContext, sel, pageInfoImplementors) out := graphql.NewFieldSet(fields) var invalids uint32 @@ -3997,7 +3997,7 @@ func (ec *executionContext) _PageInfo(ctx context.Context, sel ast.SelectionSet, var postEventImplementors = []string{"PostEvent", "Edge"} -func (ec *executionContext) _PostEvent(ctx context.Context, sel ast.SelectionSet, obj *gql_ev.PostEvent) graphql.Marshaler { +func (ec *executionContext) _PostEvent(ctx context.Context, sel ast.SelectionSet, obj *msgbus.PostEvent) graphql.Marshaler { fields := graphql.CollectFields(ec.OperationContext, sel, postEventImplementors) out := graphql.NewFieldSet(fields) var invalids uint32 @@ -4174,7 +4174,7 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr var saltyUserImplementors = []string{"SaltyUser"} -func (ec *executionContext) _SaltyUser(ctx context.Context, sel ast.SelectionSet, obj *gql_ev.SaltyUser) graphql.Marshaler { +func (ec *executionContext) _SaltyUser(ctx context.Context, sel ast.SelectionSet, obj *salty.SaltyUser) graphql.Marshaler { fields := graphql.CollectFields(ec.OperationContext, sel, saltyUserImplementors) out := graphql.NewFieldSet(fields) var invalids uint32 @@ -4612,11 +4612,11 @@ func (ec *executionContext) marshalNBoolean2bool(ctx context.Context, sel ast.Se return res } -func (ec *executionContext) marshalNConnection2githubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐConnection(ctx context.Context, sel ast.SelectionSet, v gql_ev.Connection) graphql.Marshaler { +func (ec *executionContext) marshalNConnection2githubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐConnection(ctx context.Context, sel ast.SelectionSet, v gql.Connection) graphql.Marshaler { return ec._Connection(ctx, sel, &v) } -func (ec *executionContext) marshalNConnection2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐConnection(ctx context.Context, sel ast.SelectionSet, v *gql_ev.Connection) graphql.Marshaler { +func (ec *executionContext) marshalNConnection2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐConnection(ctx context.Context, sel ast.SelectionSet, v *gql.Connection) graphql.Marshaler { if v == nil { if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { ec.Errorf(ctx, "the requested element is null which the schema does not allow") @@ -4626,7 +4626,7 @@ func (ec *executionContext) marshalNConnection2ᚖgithubᚗcomᚋsourᚑisᚋev return ec._Connection(ctx, sel, v) } -func (ec *executionContext) marshalNEdge2githubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐEdge(ctx context.Context, sel ast.SelectionSet, v gql_ev.Edge) graphql.Marshaler { +func (ec *executionContext) marshalNEdge2githubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐEdge(ctx context.Context, sel ast.SelectionSet, v gql.Edge) graphql.Marshaler { if v == nil { if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { ec.Errorf(ctx, "the requested element is null which the schema does not allow") @@ -4636,7 +4636,7 @@ func (ec *executionContext) marshalNEdge2githubᚗcomᚋsourᚑisᚋevᚋapiᚋg return ec._Edge(ctx, sel, v) } -func (ec *executionContext) marshalNEdge2ᚕgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐEdgeᚄ(ctx context.Context, sel ast.SelectionSet, v []gql_ev.Edge) graphql.Marshaler { +func (ec *executionContext) marshalNEdge2ᚕgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐEdgeᚄ(ctx context.Context, sel ast.SelectionSet, v []gql.Edge) graphql.Marshaler { ret := make(graphql.Array, len(v)) var wg sync.WaitGroup isLen1 := len(v) == 1 @@ -4660,7 +4660,7 @@ func (ec *executionContext) marshalNEdge2ᚕgithubᚗcomᚋsourᚑisᚋevᚋapi if !isLen1 { defer wg.Done() } - ret[i] = ec.marshalNEdge2githubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐEdge(ctx, sel, v[i]) + ret[i] = ec.marshalNEdge2githubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐEdge(ctx, sel, v[i]) } if isLen1 { f(i) @@ -4756,7 +4756,7 @@ func (ec *executionContext) marshalNMeta2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkg return ec._Meta(ctx, sel, v) } -func (ec *executionContext) marshalNPageInfo2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐPageInfo(ctx context.Context, sel ast.SelectionSet, v *gql_ev.PageInfo) graphql.Marshaler { +func (ec *executionContext) marshalNPageInfo2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐPageInfo(ctx context.Context, sel ast.SelectionSet, v *gql.PageInfo) graphql.Marshaler { if v == nil { if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { ec.Errorf(ctx, "the requested element is null which the schema does not allow") @@ -5142,7 +5142,7 @@ func (ec *executionContext) marshalOInt2ᚖint64(ctx context.Context, sel ast.Se return res } -func (ec *executionContext) unmarshalOPageInput2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐPageInput(ctx context.Context, v interface{}) (*gql_ev.PageInput, error) { +func (ec *executionContext) unmarshalOPageInput2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐPageInput(ctx context.Context, v interface{}) (*gql.PageInput, error) { if v == nil { return nil, nil } @@ -5150,14 +5150,14 @@ func (ec *executionContext) unmarshalOPageInput2ᚖgithubᚗcomᚋsourᚑisᚋev return &res, graphql.ErrorOnPath(ctx, err) } -func (ec *executionContext) marshalOPostEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐPostEvent(ctx context.Context, sel ast.SelectionSet, v *gql_ev.PostEvent) graphql.Marshaler { +func (ec *executionContext) marshalOPostEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋappᚋmsgbusᚐPostEvent(ctx context.Context, sel ast.SelectionSet, v *msgbus.PostEvent) graphql.Marshaler { if v == nil { return graphql.Null } return ec._PostEvent(ctx, sel, v) } -func (ec *executionContext) marshalOSaltyUser2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐSaltyUser(ctx context.Context, sel ast.SelectionSet, v *gql_ev.SaltyUser) graphql.Marshaler { +func (ec *executionContext) marshalOSaltyUser2ᚖgithubᚗcomᚋsourᚑisᚋevᚋappᚋsaltyᚐSaltyUser(ctx context.Context, sel ast.SelectionSet, v *salty.SaltyUser) graphql.Marshaler { if v == nil { return graphql.Null } diff --git a/internal/graph/resolver.go b/internal/graph/resolver.go deleted file mode 100644 index f253ebf..0000000 --- a/internal/graph/resolver.go +++ /dev/null @@ -1,55 +0,0 @@ -package graph - -import ( - "net/http" - "reflect" - - "github.com/sour-is/ev/api/gql_ev" - "github.com/sour-is/ev/internal/graph/generated" -) - -// This file will not be regenerated automatically. -// -// It serves as dependency injection for your app, add any dependencies you require here. - -type Resolver struct { - *gql_ev.Resolver -} - -func New(r *gql_ev.Resolver) *Resolver { - return &Resolver{r} -} - -// Query returns generated.QueryResolver implementation. -func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } - -// Query returns generated.QueryResolver implementation. -func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} } - -// Subscription returns generated.SubscriptionResolver implementation. -func (r *Resolver) Subscription() generated.SubscriptionResolver { return &subscriptionResolver{r} } - -type queryResolver struct{ *Resolver } - -type mutationResolver struct{ *Resolver } - -type subscriptionResolver struct{ *Resolver } - -func (r *Resolver) ChainMiddlewares(h http.Handler) http.Handler { - v := reflect.ValueOf(r) // Get reflected value of *Resolver - v = reflect.Indirect(v) // Get the pointed value (returns a zero value on nil) - n := v.NumField() // Get number of fields to iterate over. - for i := 0; i < n; i++ { - f := v.Field(i) - if !f.CanInterface() { // Skip non-interface types. - continue - } - if iface, ok := f.Interface().(interface { - GetMiddleware() func(http.Handler) http.Handler - }); ok { - h = iface.GetMiddleware()(h) // Append only items that fulfill the interface. - } - } - - return h -} diff --git a/main.go b/main.go index 52c4fc2..e9dab38 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "os/signal" + "path" "time" "github.com/99designs/gqlgen/graphql/handler" @@ -14,16 +15,15 @@ import ( "golang.org/x/sync/errgroup" "github.com/sour-is/ev/api/gql_ev" - "github.com/sour-is/ev/internal/graph" + "github.com/sour-is/ev/app/msgbus" + "github.com/sour-is/ev/app/playground" + "github.com/sour-is/ev/app/salty" "github.com/sour-is/ev/internal/graph/generated" "github.com/sour-is/ev/internal/logz" - "github.com/sour-is/ev/pkg/domain" "github.com/sour-is/ev/pkg/es" diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" "github.com/sour-is/ev/pkg/es/driver/streamer" - "github.com/sour-is/ev/pkg/msgbus" - "github.com/sour-is/ev/pkg/playground" ) const AppName string = "sour.is-ev" @@ -56,10 +56,6 @@ func run(ctx context.Context) error { diskstore.Init(ctx) memstore.Init(ctx) - if err := domain.Init(ctx); err != nil { - span.RecordError(err) - return err - } es, err := es.Open(ctx, env("EV_DATA", "file:data"), streamer.New(ctx)) if err != nil { @@ -67,33 +63,39 @@ func run(ctx context.Context) error { return err } - svc, err := msgbus.New(ctx, es, env("EV_BASE_URL", "https://ev.sour.is/inbox/")) - if err != nil { - span.RecordError(err) - return err - } - - r, err := gql_ev.New(ctx, es) - if err != nil { - span.RecordError(err) - return err - } - res := graph.New(r) - gql := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: res})) - gql.Use(otelgqlgen.Middleware()) - s := http.Server{ Addr: env("EV_HTTP", ":8080"), } + saltySVC, err := salty.New(ctx, es, path.Join(env("EV_BASE_URL", "http://localhost" + s.Addr), "inbox")) + if err != nil { + span.RecordError(err) + return err + } + + msgbusSVC, err := msgbus.New(ctx, es) + if err != nil { + span.RecordError(err) + return err + } + + res, err := gql_ev.New(ctx, msgbusSVC, saltySVC) + if err != nil { + span.RecordError(err) + return err + } + gql := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: res})) + gql.Use(otelgqlgen.Middleware()) + + mux := http.NewServeMux() mux.Handle("/", playground.Handler("GraphQL playground", "/gql")) mux.Handle("/gql", logz.Htrace(res.ChainMiddlewares(gql), "gql")) mux.Handle("/metrics", logz.PromHTTP(ctx)) - mux.Handle("/inbox/", logz.Htrace(http.StripPrefix("/inbox/", svc), "inbox")) - mux.Handle("/.well-known/salty/", logz.Htrace(svc, "lookup")) + mux.Handle("/inbox/", logz.Htrace(http.StripPrefix("/inbox/", msgbusSVC), "inbox")) + mux.Handle("/.well-known/salty/", logz.Htrace(saltySVC, "lookup")) s.Handler = cors.AllowAll().Handler(mux) diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index d61bb83..43e7057 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -215,7 +215,6 @@ func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, er } start, count := math.PagerBox(first, last, pos, count) - span.AddEvent(fmt.Sprint("reading", first, last, pos, count, start)) if count == 0 { return nil } diff --git a/pkg/es/es.graphqls b/pkg/es/es.graphqls new file mode 100644 index 0000000..836253c --- /dev/null +++ b/pkg/es/es.graphqls @@ -0,0 +1,7 @@ + +type Meta @goModel(model: "github.com/sour-is/ev/pkg/es/event.Meta") { + eventID: String! @goField(name: "getEventID") + streamID: String! + created: Time! + position: Int! +} \ No newline at end of file diff --git a/pkg/es/event/reflect.go b/pkg/es/event/reflect.go index 97de8c4..f6faef5 100644 --- a/pkg/es/event/reflect.go +++ b/pkg/es/event/reflect.go @@ -9,6 +9,7 @@ import ( "reflect" "strings" + "github.com/sour-is/ev/internal/logz" "github.com/sour-is/ev/pkg/locker" ) @@ -66,41 +67,72 @@ func (u *UnknownEvent) MarshalBinary() ([]byte, error) { // Register a type container for Unmarshalling values into. The type must implement Event and not be a nil value. func Register(ctx context.Context, lis ...Event) error { + _, span := logz.Span(ctx) + defer span.End() + for _, e := range lis { if err := ctx.Err(); err != nil { + span.RecordError(err) return err } - if e == nil { - return fmt.Errorf("can't register event.Event of type=%T with value=%v", e, e) - } - - value := reflect.ValueOf(e) - - if value.IsNil() { - return fmt.Errorf("can't register event.Event of type=%T with value=%v", e, e) - } - - value = reflect.Indirect(value) - name := TypeOf(e) - typ := value.Type() - - if err := eventTypes.Modify(ctx, func(c *config) error { - c.eventTypes[name] = typ - return nil - }); err != nil { + err := RegisterName(ctx, name, e) + if err != nil { return err } } return nil } +func RegisterName(ctx context.Context, name string, e Event) error { + _, span := logz.Span(ctx) + defer span.End() + + if e == nil { + err := fmt.Errorf("can't register event.Event of type=%T with value=%v", e, e) + span.RecordError(err) + return err + } + + value := reflect.ValueOf(e) + + if value.IsNil() { + err := fmt.Errorf("can't register event.Event of type=%T with value=%v", e, e) + span.RecordError(err) + return err + } + value = reflect.Indirect(value) + + typ := value.Type() + + span.AddEvent("register: " + name) + + if err := eventTypes.Modify(ctx, func(c *config) error { + _, span := logz.Span(ctx) + defer span.End() + + c.eventTypes[name] = typ + return nil + }); err != nil { + span.RecordError(err) + return err + } + return nil +} func GetContainer(ctx context.Context, s string) Event { + _, span := logz.Span(ctx) + defer span.End() + var e Event eventTypes.Modify(ctx, func(c *config) error { + _, span := logz.Span(ctx) + defer span.End() + typ, ok := c.eventTypes[s] if !ok { - return fmt.Errorf("not defined") + err := fmt.Errorf("not defined: %s", s) + span.RecordError(err) + return err } newType := reflect.New(typ) newInterface := newType.Interface() @@ -108,7 +140,9 @@ func GetContainer(ctx context.Context, s string) Event { e = iface return nil } - return fmt.Errorf("failed") + err := fmt.Errorf("failed") + span.RecordError(err) + return err }) if e == nil { e = &UnknownEvent{eventType: s} @@ -142,13 +176,19 @@ func MarshalBinary(e Event) (txt []byte, err error) { } func UnmarshalBinary(ctx context.Context, txt []byte, pos uint64) (e Event, err error) { + _, span := logz.Span(ctx) + defer span.End() + sp := bytes.SplitN(txt, []byte{'\t'}, 4) if len(sp) != 4 { - return nil, fmt.Errorf("invalid format. expected=4, got=%d", len(sp)) + err = fmt.Errorf("invalid format. expected=4, got=%d", len(sp)) + span.RecordError(err) + return nil, err } m := Meta{} if err = m.EventID.UnmarshalText(sp[0]); err != nil { + span.RecordError(err) return nil, err } @@ -157,8 +197,10 @@ func UnmarshalBinary(ctx context.Context, txt []byte, pos uint64) (e Event, err eventType := string(sp[2]) e = GetContainer(ctx, eventType) + span.AddEvent(fmt.Sprintf("%s == %T", eventType, e)) if err = e.UnmarshalBinary(sp[3]); err != nil { + span.RecordError(err) return nil, err } diff --git a/api/gql_ev/common.graphqls b/pkg/gql/common.graphqls similarity index 62% rename from api/gql_ev/common.graphqls rename to pkg/gql/common.graphqls index 89a5502..f81640f 100644 --- a/api/gql_ev/common.graphqls +++ b/pkg/gql/common.graphqls @@ -1,32 +1,25 @@ scalar Time scalar Map -type Connection { +type Connection @goModel(model: "github.com/sour-is/ev/pkg/gql.Connection") { paging: PageInfo! edges: [Edge!]! } -input PageInput { +input PageInput @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInput") { idx: Int = 0 count: Int = 30 } -type PageInfo { +type PageInfo @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInfo") { next: Boolean! prev: Boolean! begin: Int! end: Int! } -interface Edge { +interface Edge @goModel(model: "github.com/sour-is/ev/pkg/gql.Edge"){ id: ID! } -type Meta { - eventID: String! @goField(name: "getEventID") - streamID: String! - created: Time! - position: Int! -} - directive @goModel( model: String models: [String!] diff --git a/api/gql_ev/models.go b/pkg/gql/connection.go similarity index 80% rename from api/gql_ev/models.go rename to pkg/gql/connection.go index 9da9d82..5923450 100644 --- a/api/gql_ev/models.go +++ b/pkg/gql/connection.go @@ -1,4 +1,4 @@ -package gql_ev +package gql import ( "context" @@ -54,13 +54,3 @@ func (p *PageInput) GetCount(v int64) int64 { } return *p.Count } - -type SaltyUser struct { - Nick string `json:"nick"` - Pubkey string `json:"pubkey"` - Inbox string `json:"inbox"` -} - -func (s SaltyUser) Endpoint(ctx context.Context) string { - return "https://ev.sour.is/inbox/" + s.Inbox -} diff --git a/pkg/gql/context.go b/pkg/gql/context.go new file mode 100644 index 0000000..072fa7d --- /dev/null +++ b/pkg/gql/context.go @@ -0,0 +1,14 @@ +package gql + +import "context" + +func ToContext[K comparable, V any](ctx context.Context, key K, value V) context.Context { + return context.WithValue(ctx, key, value) +} +func FromContext[K comparable, V any](ctx context.Context, key K) V { + var empty V + if v, ok := ctx.Value(key).(V); ok { + return v + } + return empty +}