From f65e9ca666d5509c80626cea89c33c7c115871a0 Mon Sep 17 00:00:00 2001 From: Jon Lundy Date: Sun, 4 Sep 2022 08:34:22 -0600 Subject: [PATCH] feat: updates --- app/gql/resolver.go | 15 ++- app/msgbus/service.go | 11 +- app/salty/salty-addr.go | 142 ++++++++++++++++++++++++++ app/salty/service.go | 93 +++++++++++++++-- internal/graph/generated/generated.go | 140 +++++++++++++++++-------- internal/graph/model/models_gen.go | 13 --- main.go | 28 ++++- pkg/es/es.graphqls | 5 +- pkg/es/event/reflect.go | 23 +++-- pkg/es/graph.go | 128 +++++++++++++++++++++++ 10 files changed, 523 insertions(+), 75 deletions(-) create mode 100644 app/salty/salty-addr.go create mode 100644 pkg/es/graph.go diff --git a/app/gql/resolver.go b/app/gql/resolver.go index e720beb..7b701af 100644 --- a/app/gql/resolver.go +++ b/app/gql/resolver.go @@ -15,6 +15,7 @@ import ( "github.com/sour-is/ev/app/salty" "github.com/sour-is/ev/internal/graph/generated" "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/gql" "github.com/vektah/gqlparser/v2/gqlerror" ) @@ -22,6 +23,7 @@ import ( type Resolver struct { msgbus.MsgbusResolver salty.SaltyResolver + es.EventResolver } func New(ctx context.Context, resolvers ...interface{ RegisterHTTP(*http.ServeMux) }) (*Resolver, error) { @@ -43,13 +45,12 @@ outer: if field.IsNil() && rs.Type().Implements(field.Type()) { span.AddEvent(fmt.Sprint("found ", field.Type().Name())) field.Set(rs) - break outer + continue outer } } + span.AddEvent(fmt.Sprint("default ", field.Type().Name())) - field.Set(noop) - } return r, nil @@ -107,6 +108,7 @@ func NoopRecover(ctx context.Context, err interface{}) error { var _ msgbus.MsgbusResolver = (*noop)(nil) var _ salty.SaltyResolver = (*noop)(nil) +var _ es.EventResolver = (*noop)(nil) func (*noop) CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) { panic("not implemented") @@ -120,4 +122,11 @@ func (*noop) SaltyUser(ctx context.Context, nick string) (*salty.SaltyUser, erro func (*noop) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *msgbus.PostEvent, error) { panic("not implemented") } +func (*noop) Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { + panic("not implemented") +} +func (*noop) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *es.GQLEvent, error) { + panic("not implemented") +} + func (*noop) RegisterHTTP(*http.ServeMux) {} diff --git a/app/msgbus/service.go b/app/msgbus/service.go index 62b99c6..b07c58b 100644 --- a/app/msgbus/service.go +++ b/app/msgbus/service.go @@ -435,14 +435,21 @@ func (e *PostEvent) SetEventMeta(eventMeta event.Meta) { } e.eventMeta = eventMeta } -func (e *PostEvent) MarshalBinary() ([]byte, error) { - j := struct { +func (e *PostEvent) Values() any { + if e == nil { + return nil + } + + return struct { Payload []byte Tags []string }{ Payload: e.payload, Tags: e.tags, } +} +func (e *PostEvent) MarshalBinary() ([]byte, error) { + j := e.Values() return json.Marshal(&j) } func (e *PostEvent) UnmarshalBinary(b []byte) error { diff --git a/app/salty/salty-addr.go b/app/salty/salty-addr.go new file mode 100644 index 0000000..62d2dcf --- /dev/null +++ b/app/salty/salty-addr.go @@ -0,0 +1,142 @@ +package salty + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/keys-pub/keys" + "github.com/sour-is/ev/internal/logz" +) + +// Config represents a Salty Config for a User which at a minimum is required +// to have an Endpoint and Key (Public Key) +type Config struct { + Endpoint string `json:"endpoint"` + Key string `json:"key"` +} + +type Capabilities struct { + AcceptEncoding string +} + +func (c Capabilities) String() string { + return fmt.Sprint("accept-encoding: ", c.AcceptEncoding) +} + +type Addr struct { + User string + Domain string + + capabilities Capabilities + discoveredDomain string + dns DNSResolver + endpoint *url.URL + key *keys.EdX25519PublicKey +} + +// ParseAddr parsers a Salty Address for a user into it's user and domain +// parts and returns an Addr object with the User and Domain and a method +// for returning the expected User's Well-Known URI +func (s *service) ParseAddr(addr string) (*Addr, error) { + parts := strings.Split(strings.ToLower(addr), "@") + if len(parts) != 2 { + return nil, fmt.Errorf("expected nick@domain found %q", addr) + } + + return &Addr{User: parts[0], Domain: parts[1], dns: s.dns}, nil +} + +func (a *Addr) String() string { + return fmt.Sprintf("%s@%s", a.User, a.Domain) +} + +// Hash returns the Hex(SHA256Sum()) of the Address +func (a *Addr) Hash() string { + return fmt.Sprintf("%x", sha256.Sum256([]byte(strings.ToLower(a.String())))) +} + +// URI returns the Well-Known URI for this Addr +func (a *Addr) URI() string { + return fmt.Sprintf("https://%s/.well-known/salty/%s.json", a.DiscoveredDomain(), a.User) +} + +// HashURI returns the Well-Known HashURI for this Addr +func (a *Addr) HashURI() string { + return fmt.Sprintf("https://%s/.well-known/salty/%s.json", a.DiscoveredDomain(), a.Hash()) +} + +// DiscoveredDomain returns the discovered domain (if any) of fallbacks to the Domain +func (a *Addr) DiscoveredDomain() string { + if a.discoveredDomain != "" { + return a.discoveredDomain + } + return a.Domain +} + +func (a *Addr) Refresh(ctx context.Context) error { + ctx, span := logz.Span(ctx) + defer span.End() + + span.AddEvent(fmt.Sprintf("Looking up SRV record for _salty._tcp.%s", a.Domain)) + if target, _, err := a.dns.LookupSRV(ctx, "salty", "tcp", a.Domain); err == nil { + a.discoveredDomain = target + span.AddEvent(fmt.Sprintf("Discovered salty services %s", a.discoveredDomain)) + } else if err != nil { + span.AddEvent(fmt.Sprintf("error looking up SRV record for _salty._tcp.%s : %s", a.Domain, err)) + } + + config, cap, err := fetchConfig(ctx, a.HashURI()) + if err != nil { + // Fallback to plain user nick + config, cap, err = fetchConfig(ctx, a.URI()) + } + if err != nil { + return fmt.Errorf("error looking up user %s: %w", a, err) + } + key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(config.Key)) + if err != nil { + return fmt.Errorf("error parsing public key %s: %w", config.Key, err) + } + a.key = key + + u, err := url.Parse(config.Endpoint) + if err != nil { + return fmt.Errorf("error parsing endpoint %s: %w", config.Endpoint, err) + } + a.endpoint = u + a.capabilities = cap + + span.AddEvent(fmt.Sprintf("Discovered endpoint: %v", a.endpoint)) + span.AddEvent(fmt.Sprintf("Discovered capability: %v", a.capabilities)) + + return nil +} + +func fetchConfig(ctx context.Context, addr string) (config Config, cap Capabilities, err error) { + ctx, span := logz.Span(ctx) + defer span.End() + + var req *http.Request + req, err = http.NewRequestWithContext(ctx, http.MethodGet, addr, nil) + if err != nil { + return + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return + } + + if err = json.NewDecoder(res.Body).Decode(&config); err != nil { + return + } + + cap.AcceptEncoding = res.Header.Get("Accept-Encoding") + + return +} diff --git a/app/salty/service.go b/app/salty/service.go index 306d7e7..3bc626e 100644 --- a/app/salty/service.go +++ b/app/salty/service.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "net" "net/http" "path" "strings" @@ -19,12 +20,21 @@ import ( "go.uber.org/multierr" ) +type DNSResolver interface { + LookupSRV(ctx context.Context, service, proto, name string) (string, []*net.SRV, error) +} + type service struct { baseURL string es *es.EventStore + dns DNSResolver - Mresolver_create_salty_user syncint64.Counter - Mresolver_salty_user syncint64.Counter + m_create_user syncint64.Counter + m_get_user syncint64.Counter + m_api_ping syncint64.Counter + m_api_register syncint64.Counter + m_api_lookup syncint64.Counter + m_api_send syncint64.Counter } type contextKey struct { name string @@ -51,13 +61,25 @@ func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, erro m := logz.Meter(ctx) - svc := &service{baseURL: baseURL, es: es} + svc := &service{baseURL: baseURL, es: es, dns: net.DefaultResolver} var err, errs error - svc.Mresolver_create_salty_user, err = m.SyncInt64().Counter("resolver_create_salty_user") + svc.m_create_user, err = m.SyncInt64().Counter("salty_create_user") errs = multierr.Append(errs, err) - svc.Mresolver_salty_user, err = m.SyncInt64().Counter("resolver_salty_user") + svc.m_get_user, err = m.SyncInt64().Counter("salty_get_user") + errs = multierr.Append(errs, err) + + svc.m_api_ping, err = m.SyncInt64().Counter("salty_api_ping") + errs = multierr.Append(errs, err) + + svc.m_api_register, err = m.SyncInt64().Counter("salty_api_register") + errs = multierr.Append(errs, err) + + svc.m_api_lookup, err = m.SyncInt64().Counter("salty_api_lookup") + errs = multierr.Append(errs, err) + + svc.m_api_send, err = m.SyncInt64().Counter("salty_api_send") errs = multierr.Append(errs, err) span.RecordError(err) @@ -73,6 +95,12 @@ func (s *service) BaseURL() string { func (s *service) RegisterHTTP(mux *http.ServeMux) { mux.Handle("/.well-known/salty/", logz.Htrace(s, "lookup")) } +func (s *service) RegisterAPIv1(mux *http.ServeMux) { + mux.HandleFunc("/ping", s.apiv1) + mux.HandleFunc("/register", s.apiv1) + mux.HandleFunc("/lookup/", s.apiv1) + mux.HandleFunc("/send", s.apiv1) +} func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() ctx, span := logz.Span(ctx) @@ -112,7 +140,7 @@ func (s *service) CreateSaltyUser(ctx context.Context, nick string, pub string) ctx, span := logz.Span(ctx) defer span.End() - s.Mresolver_create_salty_user.Add(ctx, 1) + s.m_create_user.Add(ctx, 1) streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) span.AddEvent(streamID) @@ -142,7 +170,7 @@ func (s *service) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error ctx, span := logz.Span(ctx) defer span.End() - s.Mresolver_salty_user.Add(ctx, 1) + s.m_get_user.Add(ctx, 1) streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) span.AddEvent(streamID) @@ -168,3 +196,54 @@ func (s *service) GetMiddleware() func(http.Handler) http.Handler { }) } } + +func (s *service) apiv1(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + ctx, span := logz.Span(ctx) + defer span.End() + + switch r.Method { + case http.MethodGet: + switch { + case r.URL.Path == "/ping": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{}`)) + + case strings.HasPrefix(r.URL.Path, "/lookup/"): + addr, err := s.ParseAddr(strings.TrimPrefix(r.URL.Path, "/lookup/")) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusBadRequest) + return + } + err = addr.Refresh(ctx) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + json.NewEncoder(w).Encode(addr) + return + + default: + w.WriteHeader(http.StatusNotFound) + return + } + + case http.MethodPost: + switch r.URL.Path { + case "/register": + + case "/send": + + default: + w.WriteHeader(http.StatusNotFound) + return + } + default: + w.WriteHeader(http.StatusMethodNotAllowed) + return + } +} diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index d464231..cad5da7 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -18,7 +18,7 @@ import ( "github.com/99designs/gqlgen/plugin/federation/fedruntime" "github.com/sour-is/ev/app/msgbus" "github.com/sour-is/ev/app/salty" - "github.com/sour-is/ev/internal/graph/model" + "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/gql" gqlparser "github.com/vektah/gqlparser/v2" @@ -58,10 +58,11 @@ type ComplexityRoot struct { } Event struct { - EventID func(childComplexity int) int - EventMeta func(childComplexity int) int - ID func(childComplexity int) int - Values func(childComplexity int) int + Bytes func(childComplexity int) int + EventID func(childComplexity int) int + ID func(childComplexity int) int + Meta func(childComplexity int) int + Values func(childComplexity int) int } Meta struct { @@ -123,7 +124,7 @@ type QueryResolver interface { SaltyUser(ctx context.Context, nick string) (*salty.SaltyUser, error) } type SubscriptionResolver interface { - EventAdded(ctx context.Context, streamID string, after int64) (<-chan *model.Event, error) + EventAdded(ctx context.Context, streamID string, after int64) (<-chan *es.GQLEvent, error) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *msgbus.PostEvent, error) } @@ -156,6 +157,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Connection.Paging(childComplexity), true + case "Event.bytes": + if e.complexity.Event.Bytes == nil { + break + } + + return e.complexity.Event.Bytes(childComplexity), true + case "Event.eventID": if e.complexity.Event.EventID == nil { break @@ -163,13 +171,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Event.EventID(childComplexity), true - case "Event.eventMeta": - if e.complexity.Event.EventMeta == nil { - break - } - - return e.complexity.Event.EventMeta(childComplexity), true - case "Event.id": if e.complexity.Event.ID == nil { break @@ -177,6 +178,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Event.ID(childComplexity), true + case "Event.meta": + if e.complexity.Event.Meta == nil { + break + } + + return e.complexity.Event.Meta(childComplexity), true + case "Event.values": if e.complexity.Event.Values == nil { break @@ -491,12 +499,13 @@ extend type Subscription { eventAdded(streamID: String! after: Int! = -1): Event } -type Event implements Edge { +type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEvent") { id: ID! eventID: String! values: Map! - eventMeta: Meta! + bytes: String! + meta: Meta! }`, BuiltIn: false}, {Name: "../../../pkg/gql/common.graphqls", Input: `scalar Time scalar Map @@ -877,7 +886,7 @@ func (ec *executionContext) fieldContext_Connection_edges(ctx context.Context, f return fc, nil } -func (ec *executionContext) _Event_id(ctx context.Context, field graphql.CollectedField, obj *model.Event) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_id(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_id(ctx, field) if err != nil { return graphql.Null @@ -891,7 +900,7 @@ func (ec *executionContext) _Event_id(ctx context.Context, field graphql.Collect }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.ID, nil + return obj.ID(), nil }) if err != nil { ec.Error(ctx, err) @@ -912,7 +921,7 @@ func (ec *executionContext) fieldContext_Event_id(ctx context.Context, field gra fc = &graphql.FieldContext{ Object: "Event", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type ID does not have child fields") @@ -921,7 +930,7 @@ func (ec *executionContext) fieldContext_Event_id(ctx context.Context, field gra return fc, nil } -func (ec *executionContext) _Event_eventID(ctx context.Context, field graphql.CollectedField, obj *model.Event) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_eventID(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_eventID(ctx, field) if err != nil { return graphql.Null @@ -935,7 +944,7 @@ func (ec *executionContext) _Event_eventID(ctx context.Context, field graphql.Co }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.EventID, nil + return obj.EventID(), nil }) if err != nil { ec.Error(ctx, err) @@ -956,7 +965,7 @@ func (ec *executionContext) fieldContext_Event_eventID(ctx context.Context, fiel fc = &graphql.FieldContext{ Object: "Event", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type String does not have child fields") @@ -965,7 +974,7 @@ func (ec *executionContext) fieldContext_Event_eventID(ctx context.Context, fiel return fc, nil } -func (ec *executionContext) _Event_values(ctx context.Context, field graphql.CollectedField, obj *model.Event) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_values(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_values(ctx, field) if err != nil { return graphql.Null @@ -979,7 +988,7 @@ func (ec *executionContext) _Event_values(ctx context.Context, field graphql.Col }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.Values, nil + return obj.Values(), nil }) if err != nil { ec.Error(ctx, err) @@ -1000,7 +1009,7 @@ func (ec *executionContext) fieldContext_Event_values(ctx context.Context, field fc = &graphql.FieldContext{ Object: "Event", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { return nil, errors.New("field of type Map does not have child fields") @@ -1009,8 +1018,8 @@ func (ec *executionContext) fieldContext_Event_values(ctx context.Context, field return fc, nil } -func (ec *executionContext) _Event_eventMeta(ctx context.Context, field graphql.CollectedField, obj *model.Event) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Event_eventMeta(ctx, field) +func (ec *executionContext) _Event_bytes(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_bytes(ctx, field) if err != nil { return graphql.Null } @@ -1023,7 +1032,51 @@ func (ec *executionContext) _Event_eventMeta(ctx context.Context, field graphql. }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.EventMeta, nil + return obj.Bytes() + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_bytes(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Event_meta(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_meta(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Meta(), nil }) if err != nil { ec.Error(ctx, err) @@ -1040,11 +1093,11 @@ func (ec *executionContext) _Event_eventMeta(ctx context.Context, field graphql. return ec.marshalNMeta2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚋeventᚐMeta(ctx, field.Selections, res) } -func (ec *executionContext) fieldContext_Event_eventMeta(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_Event_meta(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ Object: "Event", Field: field, - IsMethod: false, + IsMethod: true, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { switch field.Name { @@ -2269,7 +2322,7 @@ func (ec *executionContext) _Subscription_eventAdded(ctx context.Context, field } return func(ctx context.Context) graphql.Marshaler { select { - case res, ok := <-resTmp.(<-chan *model.Event): + case res, ok := <-resTmp.(<-chan *es.GQLEvent): if !ok { return nil } @@ -2277,7 +2330,7 @@ func (ec *executionContext) _Subscription_eventAdded(ctx context.Context, field w.Write([]byte{'{'}) graphql.MarshalString(field.Alias).MarshalGQL(w) w.Write([]byte{':'}) - ec.marshalOEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋinternalᚋgraphᚋmodelᚐEvent(ctx, field.Selections, res).MarshalGQL(w) + ec.marshalOEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚐGQLEvent(ctx, field.Selections, res).MarshalGQL(w) w.Write([]byte{'}'}) }) case <-ctx.Done(): @@ -2300,8 +2353,10 @@ func (ec *executionContext) fieldContext_Subscription_eventAdded(ctx context.Con return ec.fieldContext_Event_eventID(ctx, field) case "values": return ec.fieldContext_Event_values(ctx, field) - case "eventMeta": - return ec.fieldContext_Event_eventMeta(ctx, field) + case "bytes": + return ec.fieldContext_Event_bytes(ctx, field) + case "meta": + return ec.fieldContext_Event_meta(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type Event", field.Name) }, @@ -4263,9 +4318,7 @@ func (ec *executionContext) _Edge(ctx context.Context, sel ast.SelectionSet, obj switch obj := (obj).(type) { case nil: return graphql.Null - case model.Event: - return ec._Event(ctx, sel, &obj) - case *model.Event: + case *es.GQLEvent: if obj == nil { return graphql.Null } @@ -4321,7 +4374,7 @@ func (ec *executionContext) _Connection(ctx context.Context, sel ast.SelectionSe var eventImplementors = []string{"Event", "Edge"} -func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, obj *model.Event) graphql.Marshaler { +func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, obj *es.GQLEvent) graphql.Marshaler { fields := graphql.CollectFields(ec.OperationContext, sel, eventImplementors) out := graphql.NewFieldSet(fields) var invalids uint32 @@ -4350,9 +4403,16 @@ func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, ob if out.Values[i] == graphql.Null { invalids++ } - case "eventMeta": + case "bytes": - out.Values[i] = ec._Event_eventMeta(ctx, field, obj) + out.Values[i] = ec._Event_bytes(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + case "meta": + + out.Values[i] = ec._Event_meta(ctx, field, obj) if out.Values[i] == graphql.Null { invalids++ @@ -5658,7 +5718,7 @@ func (ec *executionContext) marshalOBoolean2ᚖbool(ctx context.Context, sel ast return res } -func (ec *executionContext) marshalOEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋinternalᚋgraphᚋmodelᚐEvent(ctx context.Context, sel ast.SelectionSet, v *model.Event) graphql.Marshaler { +func (ec *executionContext) marshalOEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚐGQLEvent(ctx context.Context, sel ast.SelectionSet, v *es.GQLEvent) graphql.Marshaler { if v == nil { return graphql.Null } diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 4461ebe..8e0d251 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -1,16 +1,3 @@ // Code generated by github.com/99designs/gqlgen, DO NOT EDIT. package model - -import ( - "github.com/sour-is/ev/pkg/es/event" -) - -type Event struct { - ID string `json:"id"` - EventID string `json:"eventID"` - Values map[string]interface{} `json:"values"` - EventMeta *event.Meta `json:"eventMeta"` -} - -func (Event) IsEdge() {} diff --git a/main.go b/main.go index 44f28ab..3e863e5 100644 --- a/main.go +++ b/main.go @@ -68,6 +68,8 @@ func run(ctx context.Context) error { enable := set(strings.Fields(env("EV_ENABLE", "salty msgbus gql peers"))...) var svcs []interface{ RegisterHTTP(*http.ServeMux) } + svcs = append(svcs, es) + if enable.Has("salty") { span.AddEvent("Enable Salty") salty, err := salty.New(ctx, es, path.Join(env("EV_BASE_URL", "http://"+s.Addr), "inbox")) @@ -148,10 +150,15 @@ func env(name, defaultValue string) string { return defaultValue } func httpMux(fns ...interface{ RegisterHTTP(*http.ServeMux) }) http.Handler { - mux := http.NewServeMux() + mux := newMux() for _, fn := range fns { - fn.RegisterHTTP(mux) + fn.RegisterHTTP(mux.ServeMux) + + if fn, ok := fn.(interface{ RegisterAPIv1(*http.ServeMux) }); ok { + fn.RegisterAPIv1(mux.api) + } } + return cors.AllowAll().Handler(mux) } @@ -183,3 +190,20 @@ func (s Set[T]) String() string { b.WriteString(")") return b.String() } + +type mux struct { + *http.ServeMux + api *http.ServeMux +} +func newMux() *mux { + mux := &mux{ + api: http.NewServeMux(), + ServeMux: http.NewServeMux(), + } + mux.Handle("/api/v1/", http.StripPrefix("/api/v1/", mux.api)) + + return mux +} +func (m mux) HandleAPIv1(pattern string, handler http.Handler) { + m.api.Handle(pattern, handler) +} \ No newline at end of file diff --git a/pkg/es/es.graphqls b/pkg/es/es.graphqls index 0ebafc6..59fc253 100644 --- a/pkg/es/es.graphqls +++ b/pkg/es/es.graphqls @@ -14,10 +14,11 @@ extend type Subscription { eventAdded(streamID: String! after: Int! = -1): Event } -type Event implements Edge { +type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEvent") { id: ID! eventID: String! values: Map! - eventMeta: Meta! + bytes: String! + meta: Meta! } \ No newline at end of file diff --git a/pkg/es/event/reflect.go b/pkg/es/event/reflect.go index 636fd85..c1813bb 100644 --- a/pkg/es/event/reflect.go +++ b/pkg/es/event/reflect.go @@ -247,11 +247,22 @@ func embedJSON(s string) json.RawMessage { return []byte(fmt.Sprintf(`"%s"`, strings.Replace(s, `"`, `\"`, -1))) } -func values(e Event) map[string]any { - m := make(map[string]any) - v := reflect.ValueOf(e) - for _, idx := range reflect.VisibleFields(v) { - field := v.FieldByIndex(idx.Index) - m[field.] +func Values(e Event) map[string]any { + var a any = e + + if e, ok := e.(interface{ Values() any }); ok { + a = e.Values() } + + m := make(map[string]any) + v := reflect.Indirect(reflect.ValueOf(a)) + for _, idx := range reflect.VisibleFields(v.Type()) { + if !idx.IsExported() { + continue + } + + field := v.FieldByIndex(idx.Index) + m[idx.Name] = field.Interface() + } + return m } diff --git a/pkg/es/graph.go b/pkg/es/graph.go new file mode 100644 index 0000000..03c3173 --- /dev/null +++ b/pkg/es/graph.go @@ -0,0 +1,128 @@ +package es + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/gql" +) + +type EventResolver interface { + Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) + EventAdded(ctx context.Context, streamID string, after int64) (<-chan *GQLEvent, error) +} + +func (es *EventStore) Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { + ctx, span := logz.Span(ctx) + defer span.End() + + lis, err := es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30)) + if err != nil { + span.RecordError(err) + return nil, err + } + + edges := make([]gql.Edge, 0, len(lis)) + for i := range lis { + span.AddEvent(fmt.Sprint("event ", i, " of ", len(lis))) + edges = append(edges, &GQLEvent{lis[i]}) + } + + var first, last uint64 + if first, err = es.FirstIndex(ctx, streamID); err != nil { + span.RecordError(err) + return nil, err + } + if last, err = es.LastIndex(ctx, streamID); err != nil { + span.RecordError(err) + return nil, err + } + + return &gql.Connection{ + Paging: &gql.PageInfo{ + Next: lis.Last().EventMeta().Position < last, + Prev: lis.First().EventMeta().Position > first, + Begin: lis.First().EventMeta().Position, + End: lis.Last().EventMeta().Position, + }, + Edges: edges, + }, nil +} +func (e *EventStore) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *GQLEvent, error) { + ctx, span := logz.Span(ctx) + defer span.End() + + es := e.EventStream() + if es == nil { + return nil, fmt.Errorf("EventStore does not implement streaming") + } + + sub, err := es.Subscribe(ctx, streamID, after) + if err != nil { + span.RecordError(err) + return nil, err + } + + ch := make(chan *GQLEvent) + + go func() { + ctx, span := logz.Span(ctx) + defer span.End() + + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err := sub.Close(ctx) + span.RecordError(err) + }() + + for sub.Recv(ctx) { + events, err := sub.Events(ctx) + if err != nil { + span.RecordError(err) + break + } + span.AddEvent(fmt.Sprintf("received %d events", len(events))) + + for i := range events { + select { + case ch <- &GQLEvent{events[i]}: + continue + case <-ctx.Done(): + return + } + + } + } + }() + + return ch, nil +} +func (*EventStore) RegisterHTTP(*http.ServeMux) {} + +type GQLEvent struct { + e event.Event +} + +func (e *GQLEvent) ID() string { + return "Event/" + e.e.EventMeta().GetEventID() +} +func (e *GQLEvent) EventID() string { + return e.e.EventMeta().GetEventID() +} +func (e *GQLEvent) Values() map[string]interface{} { + return event.Values(e.e) +} +func (e *GQLEvent) Bytes() (string, error) { + b, err := e.e.MarshalBinary() + return string(b), err +} +func (e *GQLEvent) Meta() *event.Meta { + meta := e.e.EventMeta() + return &meta +} +func (e *GQLEvent) IsEdge() {}