diff --git a/.air.toml b/.air.toml index 498b348..b69c73a 100644 --- a/.air.toml +++ b/.air.toml @@ -15,9 +15,9 @@ tmp_dir = "tmp" full_bin = "" include_dir = [] include_ext = ["go", "tpl", "tmpl", "html"] - kill_delay = "0s" + kill_delay = "1s" log = "build-errors.log" - send_interrupt = false + send_interrupt = true stop_on_error = true [color] diff --git a/app/playground/playground.go b/app/gql/playground/playground.go similarity index 100% rename from app/playground/playground.go rename to app/gql/playground/playground.go diff --git a/app/gql/resolver.go b/app/gql/resolver.go new file mode 100644 index 0000000..e720beb --- /dev/null +++ b/app/gql/resolver.go @@ -0,0 +1,123 @@ +package gql + +import ( + "context" + "fmt" + "net/http" + "os" + "reflect" + "runtime/debug" + + "github.com/99designs/gqlgen/graphql/handler" + "github.com/ravilushqa/otelgqlgen" + "github.com/sour-is/ev/app/gql/playground" + "github.com/sour-is/ev/app/msgbus" + "github.com/sour-is/ev/app/salty" + "github.com/sour-is/ev/internal/graph/generated" + "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/pkg/gql" + "github.com/vektah/gqlparser/v2/gqlerror" +) + +type Resolver struct { + msgbus.MsgbusResolver + salty.SaltyResolver +} + +func New(ctx context.Context, resolvers ...interface{ RegisterHTTP(*http.ServeMux) }) (*Resolver, error) { + _, span := logz.Span(ctx) + defer span.End() + + r := &Resolver{} + v := reflect.ValueOf(r) + v = reflect.Indirect(v) + noop := reflect.ValueOf(&noop{}) + +outer: + for _, idx := range reflect.VisibleFields(v.Type()) { + field := v.FieldByIndex(idx.Index) + + for i := range resolvers { + rs := reflect.ValueOf(resolvers[i]) + + if field.IsNil() && rs.Type().Implements(field.Type()) { + span.AddEvent(fmt.Sprint("found ", field.Type().Name())) + field.Set(rs) + break outer + } + } + span.AddEvent(fmt.Sprint("default ", field.Type().Name())) + + field.Set(noop) + + } + + return r, nil +} + +// Query returns generated.QueryResolver implementation. +func (r *Resolver) Query() generated.QueryResolver { return r } + +// Query returns generated.QueryResolver implementation. +func (r *Resolver) Mutation() generated.MutationResolver { return r } + +// Subscription returns generated.SubscriptionResolver implementation. +func (r *Resolver) Subscription() generated.SubscriptionResolver { return r } + +// ChainMiddlewares will check all embeded resolvers for a GetMiddleware func and add to handler. +func (r *Resolver) ChainMiddlewares(h http.Handler) http.Handler { + v := reflect.ValueOf(r) // Get reflected value of *Resolver + v = reflect.Indirect(v) // Get the pointed value (returns a zero value on nil) + n := v.NumField() // Get number of fields to iterate over. + for i := 0; i < n; i++ { + f := v.Field(i) + if !f.CanInterface() { // Skip non-interface types. + continue + } + if iface, ok := f.Interface().(interface { + GetMiddleware() func(http.Handler) http.Handler + }); ok { + h = iface.GetMiddleware()(h) // Append only items that fulfill the interface. + } + } + + return h +} + +func (r *Resolver) RegisterHTTP(mux *http.ServeMux) { + gql := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: r})) + gql.SetRecoverFunc(NoopRecover) + gql.Use(otelgqlgen.Middleware()) + mux.Handle("/", playground.Handler("GraphQL playground", "/gql")) + mux.Handle("/gql", logz.Htrace(r.ChainMiddlewares(gql), "gql")) +} + +type noop struct{} + +func NoopRecover(ctx context.Context, err interface{}) error { + if err, ok := err.(string); ok && err == "not implemented" { + return gqlerror.Errorf("not implemented") + } + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr) + debug.PrintStack() + + return gqlerror.Errorf("internal system error") +} + +var _ msgbus.MsgbusResolver = (*noop)(nil) +var _ salty.SaltyResolver = (*noop)(nil) + +func (*noop) CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) { + panic("not implemented") +} +func (*noop) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { + panic("not implemented") +} +func (*noop) SaltyUser(ctx context.Context, nick string) (*salty.SaltyUser, error) { + panic("not implemented") +} +func (*noop) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *msgbus.PostEvent, error) { + panic("not implemented") +} +func (*noop) RegisterHTTP(*http.ServeMux) {} diff --git a/app/msgbus/service.go b/app/msgbus/service.go index c2e9aff..62b99c6 100644 --- a/app/msgbus/service.go +++ b/app/msgbus/service.go @@ -31,6 +31,7 @@ type service struct { type MsgbusResolver interface { Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) + RegisterHTTP(mux *http.ServeMux) } func New(ctx context.Context, es *es.EventStore) (*service, error) { diff --git a/app/peerfinder/peer.go b/app/peerfinder/peer.go new file mode 100644 index 0000000..84962c6 --- /dev/null +++ b/app/peerfinder/peer.go @@ -0,0 +1,183 @@ +package peerfinder + +import ( + "bytes" + "encoding/json" + "net/netip" + "strconv" + "time" + + "github.com/sour-is/ev/pkg/es/event" +) + +type Request struct { + eventMeta event.Meta + + RequestIP string `json:"req_ip"` + Hidden bool `json:"hide,omitempty"` +} + +func (r *Request) StreamID() string { + return r.EventMeta().GetEventID() +} +func (r *Request) RequestID() string { + return r.EventMeta().GetEventID() +} +func (r *Request) Created() time.Time { + return r.EventMeta().Created() +} +func (r *Request) CreatedString() string { + return r.Created().Format("2006-01-02 15:04:05") +} +func (r *Request) Family() int { + if r == nil { + return 0 + } + + ip, err := netip.ParseAddr(r.RequestIP) + switch { + case err != nil: + return 0 + case ip.Is4(): + return 1 + default: + return 2 + } +} + +var _ event.Event = (*Request)(nil) + +func (e *Request) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *Request) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *Request) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *Request) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} +func (e *Request) MarshalEnviron() ([]byte, error) { + if e == nil { + return nil, nil + } + + var b bytes.Buffer + b.WriteString("REQ_ID=") + b.WriteString(e.RequestID()) + b.WriteRune('\n') + + b.WriteString("REQ_IP=") + b.WriteString(e.RequestIP) + b.WriteRune('\n') + + b.WriteString("REQ_FAMILY=") + if family := e.Family(); family > 0 { + b.WriteString(strconv.Itoa(family)) + } + b.WriteRune('\n') + + b.WriteString("REQ_CREATED=") + b.WriteString(e.CreatedString()) + b.WriteRune('\n') + + return b.Bytes(), nil +} + +type Result struct { + eventMeta event.Meta + + RequestID string `json:"req_id"` + PeerID string `json:"peer_id"` + PeerVersion string `json:"peer_version"` + Latency float64 `json:"latency,omitempty"` +} + +func (r *Result) Created() time.Time { + return r.eventMeta.Created() +} + +var _ event.Event = (*Result)(nil) + +func (e *Result) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *Result) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *Result) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *Result) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} + +type Info struct { + ScriptVersion string `json:"script_version"` + + event.AggregateRoot +} + +var _ event.Aggregate = (*Info)(nil) + +func (a *Info) ApplyEvent(lis ...event.Event) { + for _, e := range lis { + switch e := e.(type) { + case *VersionChanged: + a.ScriptVersion = e.ScriptVersion + } + } +} +func (a *Info) MarshalEnviron() ([]byte, error) { + var b bytes.Buffer + + b.WriteString("SCRIPT_VERSION=") + b.WriteString(a.ScriptVersion) + b.WriteRune('\n') + + return b.Bytes(), nil +} +func (a *Info) OnCreate() error { + if a.StreamVersion() == 0 { + event.Raise(a, &VersionChanged{ScriptVersion: initVersion}) + } + return nil +} + +type VersionChanged struct { + ScriptVersion string `json:"script_version"` + + eventMeta event.Meta +} + +var _ event.Event = (*VersionChanged)(nil) + +func (e *VersionChanged) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *VersionChanged) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *VersionChanged) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *VersionChanged) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} diff --git a/app/peerfinder/service.go b/app/peerfinder/service.go new file mode 100644 index 0000000..1f1bb2b --- /dev/null +++ b/app/peerfinder/service.go @@ -0,0 +1,268 @@ +package peerfinder + +import ( + "context" + "encoding/json" + "io" + "net/http" + "strconv" + "strings" + + ulid "github.com/oklog/ulid/v2" + contentnegotiation "gitlab.com/jamietanna/content-negotiation-go" + + "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/es/event" +) + +const ( + queueRequests = "pf-requests" + queueResponses = "pf-response-" + aggInfo = "pf-info" + initVersion = "1.1.0" +) + +type service struct { + es *es.EventStore +} + +func New(ctx context.Context, es *es.EventStore) (*service, error) { + ctx, span := logz.Span(ctx) + defer span.End() + + if err := event.Register(ctx, &Request{}, &Result{}, &VersionChanged{}); err != nil { + span.RecordError(err) + return nil, err + } + + svc := &service{es: es} + + return svc, nil +} +func (s *service) RegisterHTTP(mux *http.ServeMux) { + mux.Handle("/peers/", logz.Htrace(s, "peers")) + +} +func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + _, span := logz.Span(ctx) + defer span.End() + + r = r.WithContext(ctx) + + switch r.Method { + case http.MethodGet: + switch { + case strings.HasPrefix(r.URL.Path, "/peers/pending/"): + s.getPending(w, r, strings.TrimPrefix(r.URL.Path, "/peers/pending/")) + return + + case strings.HasPrefix(r.URL.Path, "/peers/req/"): + s.getResults(w, r, strings.TrimPrefix(r.URL.Path, "/peers/req/")) + return + + default: + w.WriteHeader(http.StatusNotFound) + return + } + case http.MethodPost: + switch { + case strings.HasPrefix(r.URL.Path, "/peers/req/"): + s.postResult(w, r, strings.TrimPrefix(r.URL.Path, "/peers/req/")) + return + + case strings.HasPrefix(r.URL.Path, "/peers/req"): + s.postRequest(w, r) + return + + default: + w.WriteHeader(http.StatusNotFound) + return + } + default: + w.WriteHeader(http.StatusMethodNotAllowed) + return + } +} + +func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + + _, span := logz.Span(ctx) + defer span.End() + + info, err := es.Upsert(ctx, s.es, "pf-info", func(ctx context.Context, agg *Info) error { + return agg.OnCreate() // initialize if not exists + }) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + requests, err := s.es.Read(ctx, queueRequests, -1, -30) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + responses, err := s.es.Read(ctx, queueResponses+uuid, -1, -30) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + req := filter(requests, responses) + + negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/plain", "text/html") + negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept")) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusNotAcceptable) + return + } + + span.AddEvent(negotiated.String()) + switch negotiated.String() { + case "text/environment": + _, err = encodeTo(w, info.MarshalEnviron, req.MarshalEnviron) + case "application/json": + var out interface{} = info + if req != nil { + out = struct { + ScriptVersion string `json:"script_version"` + RequestID string `json:"req_id"` + RequestIP string `json:"req_ip"` + Family string `json:"req_family"` + Created string `json:"req_created"` + }{ + info.ScriptVersion, + req.RequestID(), + req.RequestIP, + strconv.Itoa(req.Family()), + req.CreatedString(), + } + } + err = json.NewEncoder(w).Encode(out) + } + span.RecordError(err) +} +func (s *service) getResults(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + + responses, err := s.es.Read(ctx, queueResponses+uuid, -1, es.AllEvents) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/csv", "text/plain", "text/html") + negotiated, _, err := negotiator.Negotiate("application/json") + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + return + } + switch negotiated.String() { + // case "text/environment": + // encodeTo(w, responses.MarshalBinary) + case "application/json": + json.NewEncoder(w).Encode(responses) + } +} +func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + if err := r.ParseForm(); err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + return + } + + req := &Request{ + RequestIP: r.Form.Get("req_ip"), + } + + if hidden, err := strconv.ParseBool(r.Form.Get("req_hidden")); err != nil { + req.Hidden = hidden + } + + s.es.Append(ctx, queueRequests, event.NewEvents(req)) +} +func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) { + ctx := r.Context() + + if _, err := ulid.ParseStrict(id); err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + + if err := r.ParseForm(); err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + return + } + + latency, err := strconv.ParseFloat(r.Form.Get("res_latency"), 64) + if err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + return + } + + req := &Result{ + RequestID: id, + PeerID: r.Form.Get("peer_id"), + PeerVersion: r.Form.Get("peer_version"), + Latency: latency, + } + + s.es.Append(ctx, queueResponses+id, event.NewEvents(req)) +} + +func filter(requests, responses event.Events) *Request { + have := make(map[string]struct{}, len(responses)) + for _, res := range toList[Result](responses...) { + have[res.RequestID] = struct{}{} + } + for _, req := range reverse(toList[Request](requests...)...) { + if _, ok := have[req.RequestID()]; !ok { + return req + } + } + return nil +} +func toList[E any, T es.PE[E]](lis ...event.Event) []T { + newLis := make([]T, 0, len(lis)) + for i := range lis { + if e, ok := lis[i].(T); ok { + newLis = append(newLis, e) + } + } + return newLis +} +func reverse[T any](s ...T) []T { + first, last := 0, len(s)-1 + for first < last { + s[first], s[last] = s[last], s[first] + first++ + last-- + } + return s +} +func encodeTo(w io.Writer, fns ...func() ([]byte, error)) (int, error) { + i := 0 + for _, fn := range fns { + b, err := fn() + if err != nil { + return i, err + } + + j, err := w.Write(b) + i += j + if err != nil { + return i, err + } + } + return i, nil +} diff --git a/app/salty/service.go b/app/salty/service.go index 5b796d6..306d7e7 100644 --- a/app/salty/service.go +++ b/app/salty/service.go @@ -35,6 +35,7 @@ var saltyKey = contextKey{"salty"} type SaltyResolver interface { CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error) + RegisterHTTP(mux *http.ServeMux) } func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, error) { @@ -69,7 +70,7 @@ func (s *service) BaseURL() string { } return s.baseURL } -func(s *service) RegisterHTTP(mux *http.ServeMux) { +func (s *service) RegisterHTTP(mux *http.ServeMux) { mux.Handle("/.well-known/salty/", logz.Htrace(s, "lookup")) } func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/env b/env new file mode 100644 index 0000000..660c109 --- /dev/null +++ b/env @@ -0,0 +1,5 @@ +SCRIPT_VERSION=1.1.0 +REQ_ID=01GB0GDHCEAPMZFVES2KBCN7KF +REQ_IP=1.1.1.1 +REQ_FAMILY=1 +REQ_CREATED=2022-08-21 09:30:22 diff --git a/go.mod b/go.mod index f173970..1b451d2 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,7 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/tinylru v1.1.0 // indirect + gitlab.com/jamietanna/content-negotiation-go v0.2.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.9.0 go.uber.org/multierr v1.8.0 ) diff --git a/go.sum b/go.sum index acfe328..537851e 100644 --- a/go.sum +++ b/go.sum @@ -351,6 +351,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +gitlab.com/jamietanna/content-negotiation-go v0.2.0 h1:vT0OLEPQ6DYRG3/1F7joXSNjVQHGivJ6+JzODlJfjWw= +gitlab.com/jamietanna/content-negotiation-go v0.2.0/go.mod h1:n4ZZ8/X5TstnjYRnjEtR/fC7MCTe+aRKM7PQlLBH3PQ= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index b5c81de..d464231 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -18,6 +18,7 @@ import ( "github.com/99designs/gqlgen/plugin/federation/fedruntime" "github.com/sour-is/ev/app/msgbus" "github.com/sour-is/ev/app/salty" + "github.com/sour-is/ev/internal/graph/model" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/gql" gqlparser "github.com/vektah/gqlparser/v2" @@ -56,6 +57,13 @@ type ComplexityRoot struct { Paging func(childComplexity int) int } + Event struct { + EventID func(childComplexity int) int + EventMeta func(childComplexity int) int + ID func(childComplexity int) int + Values func(childComplexity int) int + } + Meta struct { Created func(childComplexity int) int GetEventID func(childComplexity int) int @@ -83,6 +91,7 @@ type ComplexityRoot struct { } Query struct { + Events func(childComplexity int, streamID string, paging *gql.PageInput) int Posts func(childComplexity int, streamID string, paging *gql.PageInput) int SaltyUser func(childComplexity int, nick string) int __resolve__service func(childComplexity int) int @@ -96,7 +105,8 @@ type ComplexityRoot struct { } Subscription struct { - PostAdded func(childComplexity int, streamID string, after int64) int + EventAdded func(childComplexity int, streamID string, after int64) int + PostAdded func(childComplexity int, streamID string, after int64) int } _Service struct { @@ -108,10 +118,12 @@ type MutationResolver interface { CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) } type QueryResolver interface { + Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) SaltyUser(ctx context.Context, nick string) (*salty.SaltyUser, error) } type SubscriptionResolver interface { + EventAdded(ctx context.Context, streamID string, after int64) (<-chan *model.Event, error) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *msgbus.PostEvent, error) } @@ -144,6 +156,34 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Connection.Paging(childComplexity), true + case "Event.eventID": + if e.complexity.Event.EventID == nil { + break + } + + return e.complexity.Event.EventID(childComplexity), true + + case "Event.eventMeta": + if e.complexity.Event.EventMeta == nil { + break + } + + return e.complexity.Event.EventMeta(childComplexity), true + + case "Event.id": + if e.complexity.Event.ID == nil { + break + } + + return e.complexity.Event.ID(childComplexity), true + + case "Event.values": + if e.complexity.Event.Values == nil { + break + } + + return e.complexity.Event.Values(childComplexity), true + case "Meta.created": if e.complexity.Meta.Created == nil { break @@ -247,6 +287,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.PostEvent.Tags(childComplexity), true + case "Query.events": + if e.complexity.Query.Events == nil { + break + } + + args, err := ec.field_Query_events_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.Events(childComplexity, args["streamID"].(string), args["paging"].(*gql.PageInput)), true + case "Query.posts": if e.complexity.Query.Posts == nil { break @@ -306,6 +358,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.SaltyUser.Pubkey(childComplexity), true + case "Subscription.eventAdded": + if e.complexity.Subscription.EventAdded == nil { + break + } + + args, err := ec.field_Subscription_eventAdded_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Subscription.EventAdded(childComplexity, args["streamID"].(string), args["after"].(int64)), true + case "Subscription.postAdded": if e.complexity.Subscription.PostAdded == nil { break @@ -417,6 +481,22 @@ type Meta @goModel(model: "github.com/sour-is/ev/pkg/es/event.Meta") { streamID: String! created: Time! position: Int! +} + +extend type Query { + events(streamID: String! paging: PageInput): Connection! +} +extend type Subscription { + """after == 0 start from begining, after == -1 start from end""" + eventAdded(streamID: String! after: Int! = -1): Event +} + +type Event implements Edge { + id: ID! + + eventID: String! + values: Map! + eventMeta: Meta! }`, BuiltIn: false}, {Name: "../../../pkg/gql/common.graphqls", Input: `scalar Time scalar Map @@ -550,6 +630,30 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_events_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["streamID"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("streamID")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["streamID"] = arg0 + var arg1 *gql.PageInput + if tmp, ok := rawArgs["paging"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("paging")) + arg1, err = ec.unmarshalOPageInput2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐPageInput(ctx, tmp) + if err != nil { + return nil, err + } + } + args["paging"] = arg1 + return args, nil +} + func (ec *executionContext) field_Query_posts_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -589,6 +693,30 @@ func (ec *executionContext) field_Query_saltyUser_args(ctx context.Context, rawA return args, nil } +func (ec *executionContext) field_Subscription_eventAdded_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["streamID"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("streamID")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["streamID"] = arg0 + var arg1 int64 + if tmp, ok := rawArgs["after"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("after")) + arg1, err = ec.unmarshalNInt2int64(ctx, tmp) + if err != nil { + return nil, err + } + } + args["after"] = arg1 + return args, nil +} + func (ec *executionContext) field_Subscription_postAdded_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -749,6 +877,192 @@ func (ec *executionContext) fieldContext_Connection_edges(ctx context.Context, f return fc, nil } +func (ec *executionContext) _Event_id(ctx context.Context, field graphql.CollectedField, obj *model.Event) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_id(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ID, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNID2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_id(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type ID does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Event_eventID(ctx context.Context, field graphql.CollectedField, obj *model.Event) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_eventID(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.EventID, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_eventID(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Event_values(ctx context.Context, field graphql.CollectedField, obj *model.Event) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_values(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Values, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(map[string]interface{}) + fc.Result = res + return ec.marshalNMap2map(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_values(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Map does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Event_eventMeta(ctx context.Context, field graphql.CollectedField, obj *model.Event) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_eventMeta(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.EventMeta, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(*event.Meta) + fc.Result = res + return ec.marshalNMeta2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚋeventᚐMeta(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_eventMeta(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "eventID": + return ec.fieldContext_Meta_eventID(ctx, field) + case "streamID": + return ec.fieldContext_Meta_streamID(ctx, field) + case "created": + return ec.fieldContext_Meta_created(ctx, field) + case "position": + return ec.fieldContext_Meta_position(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type Meta", field.Name) + }, + } + return fc, nil +} + func (ec *executionContext) _Meta_eventID(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Meta_eventID(ctx, field) if err != nil { @@ -1393,6 +1707,67 @@ func (ec *executionContext) fieldContext_PostEvent_meta(ctx context.Context, fie return fc, nil } +func (ec *executionContext) _Query_events(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_events(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().Events(rctx, fc.Args["streamID"].(string), fc.Args["paging"].(*gql.PageInput)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(*gql.Connection) + fc.Result = res + return ec.marshalNConnection2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐConnection(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Query_events(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "paging": + return ec.fieldContext_Connection_paging(ctx, field) + case "edges": + return ec.fieldContext_Connection_edges(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type Connection", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_events_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return + } + return fc, nil +} + func (ec *executionContext) _Query_posts(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Query_posts(ctx, field) if err != nil { @@ -1869,6 +2244,82 @@ func (ec *executionContext) fieldContext_SaltyUser_endpoint(ctx context.Context, return fc, nil } +func (ec *executionContext) _Subscription_eventAdded(ctx context.Context, field graphql.CollectedField) (ret func(ctx context.Context) graphql.Marshaler) { + fc, err := ec.fieldContext_Subscription_eventAdded(ctx, field) + if err != nil { + return nil + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Subscription().EventAdded(rctx, fc.Args["streamID"].(string), fc.Args["after"].(int64)) + }) + if err != nil { + ec.Error(ctx, err) + return nil + } + if resTmp == nil { + return nil + } + return func(ctx context.Context) graphql.Marshaler { + select { + case res, ok := <-resTmp.(<-chan *model.Event): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋinternalᚋgraphᚋmodelᚐEvent(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): + return nil + } + } +} + +func (ec *executionContext) fieldContext_Subscription_eventAdded(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Subscription", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "id": + return ec.fieldContext_Event_id(ctx, field) + case "eventID": + return ec.fieldContext_Event_eventID(ctx, field) + case "values": + return ec.fieldContext_Event_values(ctx, field) + case "eventMeta": + return ec.fieldContext_Event_eventMeta(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type Event", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Subscription_eventAdded_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return + } + return fc, nil +} + func (ec *executionContext) _Subscription_postAdded(ctx context.Context, field graphql.CollectedField) (ret func(ctx context.Context) graphql.Marshaler) { fc, err := ec.fieldContext_Subscription_postAdded(ctx, field) if err != nil { @@ -3812,6 +4263,13 @@ func (ec *executionContext) _Edge(ctx context.Context, sel ast.SelectionSet, obj switch obj := (obj).(type) { case nil: return graphql.Null + case model.Event: + return ec._Event(ctx, sel, &obj) + case *model.Event: + if obj == nil { + return graphql.Null + } + return ec._Event(ctx, sel, obj) case *msgbus.PostEvent: if obj == nil { return graphql.Null @@ -3861,6 +4319,55 @@ func (ec *executionContext) _Connection(ctx context.Context, sel ast.SelectionSe return out } +var eventImplementors = []string{"Event", "Edge"} + +func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, obj *model.Event) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, eventImplementors) + out := graphql.NewFieldSet(fields) + var invalids uint32 + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("Event") + case "id": + + out.Values[i] = ec._Event_id(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + case "eventID": + + out.Values[i] = ec._Event_eventID(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + case "values": + + out.Values[i] = ec._Event_values(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + case "eventMeta": + + out.Values[i] = ec._Event_eventMeta(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch() + if invalids > 0 { + return graphql.Null + } + return out +} + var metaImplementors = []string{"Meta"} func (ec *executionContext) _Meta(ctx context.Context, sel ast.SelectionSet, obj *event.Meta) graphql.Marshaler { @@ -4083,6 +4590,29 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr switch field.Name { case "__typename": out.Values[i] = graphql.MarshalString("Query") + case "events": + field := field + + innerFunc := func(ctx context.Context) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_events(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, innerFunc) + } + + out.Concurrently(i, func() graphql.Marshaler { + return rrm(innerCtx) + }) case "posts": field := field @@ -4247,6 +4777,8 @@ func (ec *executionContext) _Subscription(ctx context.Context, sel ast.Selection } switch fields[0].Name { + case "eventAdded": + return ec._Subscription_eventAdded(ctx, fields[0]) case "postAdded": return ec._Subscription_postAdded(ctx, fields[0]) default: @@ -5126,6 +5658,13 @@ func (ec *executionContext) marshalOBoolean2ᚖbool(ctx context.Context, sel ast return res } +func (ec *executionContext) marshalOEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋinternalᚋgraphᚋmodelᚐEvent(ctx context.Context, sel ast.SelectionSet, v *model.Event) graphql.Marshaler { + if v == nil { + return graphql.Null + } + return ec._Event(ctx, sel, v) +} + func (ec *executionContext) unmarshalOInt2ᚖint64(ctx context.Context, v interface{}) (*int64, error) { if v == nil { return nil, nil diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 8e0d251..4461ebe 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -1,3 +1,16 @@ // Code generated by github.com/99designs/gqlgen, DO NOT EDIT. package model + +import ( + "github.com/sour-is/ev/pkg/es/event" +) + +type Event struct { + ID string `json:"id"` + EventID string `json:"eventID"` + Values map[string]interface{} `json:"values"` + EventMeta *event.Meta `json:"eventMeta"` +} + +func (Event) IsEdge() {} diff --git a/internal/logz/init.go b/internal/logz/init.go index bfef13d..f2a097f 100644 --- a/internal/logz/init.go +++ b/internal/logz/init.go @@ -4,6 +4,7 @@ import ( "context" "log" "os" + "strings" "go.uber.org/multierr" ) @@ -34,9 +35,34 @@ func Init(ctx context.Context, name string) (context.Context, func() error) { } func env(name, defaultValue string) string { - if v := os.Getenv(name); v != "" { - log.Println("# ", name, " = ", v) + name = strings.TrimSpace(name) + defaultValue = strings.TrimSpace(defaultValue) + if v := strings.TrimSpace(os.Getenv(name)); v != "" { + log.Println("# ", name, "=", v) return v } + log.Println("# ", name, "=", defaultValue, "(default)") return defaultValue } + +type secret string + +func (s secret) String() string { + if s == "" { + return "(nil)" + } + return "***" +} +func (s secret) Secret() string { + return string(s) +} +func envSecret(name, defaultValue string) secret { + name = strings.TrimSpace(name) + defaultValue = strings.TrimSpace(defaultValue) + if v := strings.TrimSpace(os.Getenv(name)); v != "" { + log.Println("# ", name, "=", secret(v)) + return secret(v) + } + log.Println("# ", name, "=", secret(defaultValue), "(default)") + return secret(defaultValue) +} diff --git a/internal/logz/logger.go b/internal/logz/logger.go index 05c0f26..3c1a7c9 100644 --- a/internal/logz/logger.go +++ b/internal/logz/logger.go @@ -64,13 +64,13 @@ func initLogger(name string) func() error { log.SetPrefix("[" + name + "] ") log.SetFlags(log.LstdFlags&^(log.Ldate|log.Ltime) | log.Lshortfile) - token := env("LOGZIO_LOG_TOKEN", "") + token := envSecret("LOGZIO_LOG_TOKEN", "") if token == "" { return nil } l, err := logzio.New( - token, + token.Secret(), // logzio.SetDebug(os.Stderr), logzio.SetUrl(env("LOGZIO_LOG_URL", "https://listener.logz.io:8071")), logzio.SetDrainDuration(time.Second*5), diff --git a/internal/logz/tracer.go b/internal/logz/tracer.go index d1925c4..6188140 100644 --- a/internal/logz/tracer.go +++ b/internal/logz/tracer.go @@ -71,9 +71,13 @@ func initTracing(ctx context.Context, name string) (context.Context, func() erro return ctx, nil } + exporterAddr := env("EV_TRACE_ENDPOINT", "") + if exporterAddr == "" { + return ctx, nil + } traceExporter, err := otlptracehttp.New(ctx, otlptracehttp.WithInsecure(), - otlptracehttp.WithEndpoint("localhost:4318"), + otlptracehttp.WithEndpoint(exporterAddr), ) if err != nil { log.Println(wrap(err, "failed to create trace exporter")) diff --git a/main.go b/main.go index f598e27..44f28ab 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,15 @@ package main import ( + "bytes" "context" + "fmt" "log" "net/http" "os" "os/signal" "path" + "strings" "time" "github.com/rs/cors" @@ -14,6 +17,7 @@ import ( "github.com/sour-is/ev/app/gql" "github.com/sour-is/ev/app/msgbus" + "github.com/sour-is/ev/app/peerfinder" "github.com/sour-is/ev/app/salty" "github.com/sour-is/ev/internal/logz" "github.com/sour-is/ev/pkg/es" @@ -57,25 +61,55 @@ func run(ctx context.Context) error { Addr: env("EV_HTTP", ":8080"), } - salty, err := salty.New(ctx, es, path.Join(env("EV_BASE_URL", "http://localhost"+s.Addr), "inbox")) - if err != nil { - span.RecordError(err) - return err + if strings.HasPrefix(s.Addr, ":") { + s.Addr = "[::]" + s.Addr } - msgbus, err := msgbus.New(ctx, es) - if err != nil { - span.RecordError(err) - return err + enable := set(strings.Fields(env("EV_ENABLE", "salty msgbus gql peers"))...) + var svcs []interface{ RegisterHTTP(*http.ServeMux) } + + if enable.Has("salty") { + span.AddEvent("Enable Salty") + salty, err := salty.New(ctx, es, path.Join(env("EV_BASE_URL", "http://"+s.Addr), "inbox")) + if err != nil { + span.RecordError(err) + return err + } + svcs = append(svcs, salty) } - gql, err := gql.New(ctx, msgbus, salty) - if err != nil { - span.RecordError(err) - return err + if enable.Has("msgbus") { + span.AddEvent("Enable Msgbus") + msgbus, err := msgbus.New(ctx, es) + if err != nil { + span.RecordError(err) + return err + } + svcs = append(svcs, msgbus) } - s.Handler = httpMux(logz.NewHTTP(ctx), msgbus, salty, gql) + if enable.Has("peers") { + span.AddEvent("Enable Peers") + peers, err := peerfinder.New(ctx, es) + if err != nil { + span.RecordError(err) + return err + } + svcs = append(svcs, peers) + } + + if enable.Has("gql") { + span.AddEvent("Enable GraphQL") + gql, err := gql.New(ctx, svcs...) + if err != nil { + span.RecordError(err) + return err + } + svcs = append(svcs, gql) + } + svcs = append(svcs, logz.NewHTTP(ctx)) + + s.Handler = httpMux(svcs...) log.Print("Listen on ", s.Addr) span.AddEvent("begin listen and serve") @@ -104,10 +138,13 @@ func run(ctx context.Context) error { return nil } func env(name, defaultValue string) string { - if v := os.Getenv(name); v != "" { - log.Println("# ", name, " = ", v) + name = strings.TrimSpace(name) + defaultValue = strings.TrimSpace(defaultValue) + if v := strings.TrimSpace(os.Getenv(name)); v != "" { + log.Println("#", name, "=", v) return v } + log.Println("#", name, "=", defaultValue, "(default)") return defaultValue } func httpMux(fns ...interface{ RegisterHTTP(*http.ServeMux) }) http.Handler { @@ -117,3 +154,32 @@ func httpMux(fns ...interface{ RegisterHTTP(*http.ServeMux) }) http.Handler { } return cors.AllowAll().Handler(mux) } + +type Set[T comparable] map[T]struct{} + +func set[T comparable](items ...T) Set[T] { + s := make(map[T]struct{}, len(items)) + for i := range items { + s[items[i]] = struct{}{} + } + return s +} +func (s Set[T]) Has(v T) bool { + _, ok := (s)[v] + return ok +} +func (s Set[T]) String() string { + if s == nil { + return "set()" + } + lis := make([]string, 0, len(s)) + for k := range s { + lis = append(lis, fmt.Sprint(k)) + } + + var b bytes.Buffer + b.WriteString("set(") + b.WriteString(strings.Join(lis, ",")) + b.WriteString(")") + return b.String() +} diff --git a/pkg/es/es.go b/pkg/es/es.go index 35ae4db..78e2ef3 100644 --- a/pkg/es/es.go +++ b/pkg/es/es.go @@ -224,6 +224,10 @@ type PA[T any] interface { event.Aggregate *T } +type PE[T any] interface { + event.Event + *T +} // Create uses fn to create a new aggregate and store in db. func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) { @@ -281,3 +285,30 @@ func Update[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string return } + +// Update uses fn to update an exsisting aggregate and store in db. +func Upsert[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) { + ctx, span := logz.Span(ctx) + defer span.End() + + agg = new(A) + agg.SetStreamID(streamID) + + if err = es.Load(ctx, agg); err != nil { + return + } + + if err = fn(ctx, agg); err != nil { + return + } + + if err = event.ShouldExist(agg); err != nil { + return + } + + if _, err = es.Save(ctx, agg); err != nil { + return + } + + return +} diff --git a/pkg/es/es.graphqls b/pkg/es/es.graphqls index 836253c..0ebafc6 100644 --- a/pkg/es/es.graphqls +++ b/pkg/es/es.graphqls @@ -4,4 +4,20 @@ type Meta @goModel(model: "github.com/sour-is/ev/pkg/es/event.Meta") { streamID: String! created: Time! position: Int! +} + +extend type Query { + events(streamID: String! paging: PageInput): Connection! +} +extend type Subscription { + """after == 0 start from begining, after == -1 start from end""" + eventAdded(streamID: String! after: Int! = -1): Event +} + +type Event implements Edge { + id: ID! + + eventID: String! + values: Map! + eventMeta: Meta! } \ No newline at end of file diff --git a/pkg/es/event/aggregate.go b/pkg/es/event/aggregate.go index 6b0e9f3..1e00436 100644 --- a/pkg/es/event/aggregate.go +++ b/pkg/es/event/aggregate.go @@ -29,16 +29,16 @@ func Append(a Aggregate, lis ...Event) { // NotExists returns error if there are no events present. func NotExists(a Aggregate) error { - if a.StreamVersion() != 0 { - return fmt.Errorf("%w, got version == %d", ErrShouldNotExist, a.StreamVersion()) + if a.Version() != 0 { + return fmt.Errorf("%w, got version == %d", ErrShouldNotExist, a.Version()) } return nil } // ShouldExists returns error if there are no events present. func ShouldExist(a Aggregate) error { - if a.StreamVersion() == 0 { - return fmt.Errorf("%w, got version == %d", ErrShouldExist, a.StreamVersion()) + if a.Version() == 0 { + return fmt.Errorf("%w, got version == %d", ErrShouldExist, a.Version()) } return nil } diff --git a/pkg/es/event/reflect.go b/pkg/es/event/reflect.go index f6faef5..636fd85 100644 --- a/pkg/es/event/reflect.go +++ b/pkg/es/event/reflect.go @@ -246,3 +246,12 @@ func embedJSON(s string) json.RawMessage { } return []byte(fmt.Sprintf(`"%s"`, strings.Replace(s, `"`, `\"`, -1))) } + +func values(e Event) map[string]any { + m := make(map[string]any) + v := reflect.ValueOf(e) + for _, idx := range reflect.VisibleFields(v) { + field := v.FieldByIndex(idx.Index) + m[field.] + } +}