diff --git a/.air.toml b/.air.toml index 498b348..b69c73a 100644 --- a/.air.toml +++ b/.air.toml @@ -15,9 +15,9 @@ tmp_dir = "tmp" full_bin = "" include_dir = [] include_ext = ["go", "tpl", "tmpl", "html"] - kill_delay = "0s" + kill_delay = "1s" log = "build-errors.log" - send_interrupt = false + send_interrupt = true stop_on_error = true [color] diff --git a/Makefile b/Makefile index 6dc96ed..f0d6d54 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ export PATH:=$(shell go env GOPATH)/bin:$(PATH) export EV_DATA=mem: export EV_HTTP=:8080 export EV_TRACE_SAMPLE=always +export EV_TRACE_ENDPOINT=localhost:4318 -include local.mk air: gen diff --git a/README.md b/README.md index 8b9a00c..d102203 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ -# ev +# ev - Eventsourcing Playground -Eventsourcing Playground +This project is learnings in building an eventstore and applications around it. + +Feel free to explore. \ No newline at end of file diff --git a/app/playground/playground.go b/app/gql/playground/playground.go similarity index 100% rename from app/playground/playground.go rename to app/gql/playground/playground.go diff --git a/app/gql/resolver.go b/app/gql/resolver.go new file mode 100644 index 0000000..a62737d --- /dev/null +++ b/app/gql/resolver.go @@ -0,0 +1,132 @@ +package gql + +import ( + "context" + "fmt" + "net/http" + "os" + "reflect" + "runtime/debug" + + "github.com/99designs/gqlgen/graphql/handler" + "github.com/ravilushqa/otelgqlgen" + "github.com/sour-is/ev/app/gql/playground" + "github.com/sour-is/ev/app/msgbus" + "github.com/sour-is/ev/app/salty" + "github.com/sour-is/ev/internal/graph/generated" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/gql" + "github.com/vektah/gqlparser/v2/gqlerror" +) + +type Resolver struct { + msgbus.MsgbusResolver + salty.SaltyResolver + es.EventResolver +} + +func New(ctx context.Context, resolvers ...interface{ RegisterHTTP(*http.ServeMux) }) (*Resolver, error) { + _, span := lg.Span(ctx) + defer span.End() + + r := &Resolver{} + v := reflect.ValueOf(r) + v = reflect.Indirect(v) + noop := reflect.ValueOf(&noop{}) + +outer: + for _, idx := range reflect.VisibleFields(v.Type()) { + field := v.FieldByIndex(idx.Index) + + for i := range resolvers { + rs := reflect.ValueOf(resolvers[i]) + + if field.IsNil() && rs.Type().Implements(field.Type()) { + span.AddEvent(fmt.Sprint("found ", field.Type().Name())) + field.Set(rs) + continue outer + } + } + + span.AddEvent(fmt.Sprint("default ", field.Type().Name())) + field.Set(noop) + } + + return r, nil +} + +// Query returns generated.QueryResolver implementation. +func (r *Resolver) Query() generated.QueryResolver { return r } + +// Query returns generated.QueryResolver implementation. +func (r *Resolver) Mutation() generated.MutationResolver { return r } + +// Subscription returns generated.SubscriptionResolver implementation. +func (r *Resolver) Subscription() generated.SubscriptionResolver { return r } + +// ChainMiddlewares will check all embeded resolvers for a GetMiddleware func and add to handler. +func (r *Resolver) ChainMiddlewares(h http.Handler) http.Handler { + v := reflect.ValueOf(r) // Get reflected value of *Resolver + v = reflect.Indirect(v) // Get the pointed value (returns a zero value on nil) + n := v.NumField() // Get number of fields to iterate over. + for i := 0; i < n; i++ { + f := v.Field(i) + if !f.CanInterface() { // Skip non-interface types. + continue + } + if iface, ok := f.Interface().(interface { + GetMiddleware() func(http.Handler) http.Handler + }); ok { + h = iface.GetMiddleware()(h) // Append only items that fulfill the interface. + } + } + + return h +} + +func (r *Resolver) RegisterHTTP(mux *http.ServeMux) { + gql := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: r})) + gql.SetRecoverFunc(NoopRecover) + gql.Use(otelgqlgen.Middleware()) + mux.Handle("/", playground.Handler("GraphQL playground", "/gql")) + mux.Handle("/gql", lg.Htrace(r.ChainMiddlewares(gql), "gql")) +} + +type noop struct{} + +func NoopRecover(ctx context.Context, err interface{}) error { + if err, ok := err.(string); ok && err == "not implemented" { + return gqlerror.Errorf("not implemented") + } + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr) + debug.PrintStack() + + return gqlerror.Errorf("internal system error") +} + +var _ msgbus.MsgbusResolver = (*noop)(nil) +var _ salty.SaltyResolver = (*noop)(nil) +var _ es.EventResolver = (*noop)(nil) + +func (*noop) CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) { + panic("not implemented") +} +func (*noop) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { + panic("not implemented") +} +func (*noop) SaltyUser(ctx context.Context, nick string) (*salty.SaltyUser, error) { + panic("not implemented") +} +func (*noop) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *msgbus.PostEvent, error) { + panic("not implemented") +} +func (*noop) Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { + panic("not implemented") +} +func (*noop) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *es.GQLEvent, error) { + panic("not implemented") +} + +func (*noop) RegisterHTTP(*http.ServeMux) {} diff --git a/app/msgbus/service.go b/app/msgbus/service.go index c2e9aff..70653a6 100644 --- a/app/msgbus/service.go +++ b/app/msgbus/service.go @@ -12,7 +12,7 @@ import ( "time" "github.com/gorilla/websocket" - "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/gql" @@ -31,10 +31,11 @@ type service struct { type MsgbusResolver interface { Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) + RegisterHTTP(mux *http.ServeMux) } func New(ctx context.Context, es *es.EventStore) (*service, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() if err := event.Register(ctx, &PostEvent{}); err != nil { @@ -44,7 +45,7 @@ func New(ctx context.Context, es *es.EventStore) (*service, error) { return nil, err } - m := logz.Meter(ctx) + m := lg.Meter(ctx) svc := &service{es: es} @@ -71,11 +72,11 @@ var upgrader = websocket.Upgrader{ } func (s *service) RegisterHTTP(mux *http.ServeMux) { - mux.Handle("/inbox/", logz.Htrace(http.StripPrefix("/inbox/", s), "inbox")) + mux.Handle("/inbox/", lg.Htrace(http.StripPrefix("/inbox/", s), "inbox")) } func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() r = r.WithContext(ctx) @@ -96,7 +97,7 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Posts is the resolver for the events field. func (r *service) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() r.Mresolver_posts.Add(ctx, 1) @@ -142,7 +143,7 @@ func (r *service) Posts(ctx context.Context, streamID string, paging *gql.PageIn } func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() r.Mresolver_post_added.Add(ctx, 1) @@ -161,15 +162,19 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) ( ch := make(chan *PostEvent) go func() { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() - defer func() { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - err := sub.Close(ctx) - span.RecordError(err) - }() + { + ctx, span := lg.Fork(ctx) + defer func() { + defer span.End() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + err := sub.Close(ctx) + span.RecordError(err) + }() + } for sub.Recv(ctx) { events, err := sub.Events(ctx) @@ -198,7 +203,7 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) ( func (s *service) get(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() name, _, _ := strings.Cut(r.URL.Path, "/") @@ -252,7 +257,7 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) { func (s *service) post(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() name, tags, _ := strings.Cut(r.URL.Path, "/") @@ -319,7 +324,7 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) { } func (s *service) websocket(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() name, _, _ := strings.Cut(r.URL.Path, "/") @@ -380,12 +385,16 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) { span.RecordError(err) return } - defer func() { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - span.AddEvent("stop ws") - sub.Close(ctx) - }() + { + ctx, span := lg.Fork(ctx) + defer func() { + defer span.End() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + err := sub.Close(ctx) + span.RecordError(err) + }() + } span.AddEvent("start ws") for sub.Recv(ctx) { @@ -434,14 +443,21 @@ func (e *PostEvent) SetEventMeta(eventMeta event.Meta) { } e.eventMeta = eventMeta } -func (e *PostEvent) MarshalBinary() ([]byte, error) { - j := struct { +func (e *PostEvent) Values() any { + if e == nil { + return nil + } + + return struct { Payload []byte Tags []string }{ Payload: e.payload, Tags: e.tags, } +} +func (e *PostEvent) MarshalBinary() ([]byte, error) { + j := e.Values() return json.Marshal(&j) } func (e *PostEvent) UnmarshalBinary(b []byte) error { diff --git a/app/peerfinder/peer.go b/app/peerfinder/peer.go new file mode 100644 index 0000000..84962c6 --- /dev/null +++ b/app/peerfinder/peer.go @@ -0,0 +1,183 @@ +package peerfinder + +import ( + "bytes" + "encoding/json" + "net/netip" + "strconv" + "time" + + "github.com/sour-is/ev/pkg/es/event" +) + +type Request struct { + eventMeta event.Meta + + RequestIP string `json:"req_ip"` + Hidden bool `json:"hide,omitempty"` +} + +func (r *Request) StreamID() string { + return r.EventMeta().GetEventID() +} +func (r *Request) RequestID() string { + return r.EventMeta().GetEventID() +} +func (r *Request) Created() time.Time { + return r.EventMeta().Created() +} +func (r *Request) CreatedString() string { + return r.Created().Format("2006-01-02 15:04:05") +} +func (r *Request) Family() int { + if r == nil { + return 0 + } + + ip, err := netip.ParseAddr(r.RequestIP) + switch { + case err != nil: + return 0 + case ip.Is4(): + return 1 + default: + return 2 + } +} + +var _ event.Event = (*Request)(nil) + +func (e *Request) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *Request) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *Request) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *Request) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} +func (e *Request) MarshalEnviron() ([]byte, error) { + if e == nil { + return nil, nil + } + + var b bytes.Buffer + b.WriteString("REQ_ID=") + b.WriteString(e.RequestID()) + b.WriteRune('\n') + + b.WriteString("REQ_IP=") + b.WriteString(e.RequestIP) + b.WriteRune('\n') + + b.WriteString("REQ_FAMILY=") + if family := e.Family(); family > 0 { + b.WriteString(strconv.Itoa(family)) + } + b.WriteRune('\n') + + b.WriteString("REQ_CREATED=") + b.WriteString(e.CreatedString()) + b.WriteRune('\n') + + return b.Bytes(), nil +} + +type Result struct { + eventMeta event.Meta + + RequestID string `json:"req_id"` + PeerID string `json:"peer_id"` + PeerVersion string `json:"peer_version"` + Latency float64 `json:"latency,omitempty"` +} + +func (r *Result) Created() time.Time { + return r.eventMeta.Created() +} + +var _ event.Event = (*Result)(nil) + +func (e *Result) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *Result) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *Result) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *Result) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} + +type Info struct { + ScriptVersion string `json:"script_version"` + + event.AggregateRoot +} + +var _ event.Aggregate = (*Info)(nil) + +func (a *Info) ApplyEvent(lis ...event.Event) { + for _, e := range lis { + switch e := e.(type) { + case *VersionChanged: + a.ScriptVersion = e.ScriptVersion + } + } +} +func (a *Info) MarshalEnviron() ([]byte, error) { + var b bytes.Buffer + + b.WriteString("SCRIPT_VERSION=") + b.WriteString(a.ScriptVersion) + b.WriteRune('\n') + + return b.Bytes(), nil +} +func (a *Info) OnCreate() error { + if a.StreamVersion() == 0 { + event.Raise(a, &VersionChanged{ScriptVersion: initVersion}) + } + return nil +} + +type VersionChanged struct { + ScriptVersion string `json:"script_version"` + + eventMeta event.Meta +} + +var _ event.Event = (*VersionChanged)(nil) + +func (e *VersionChanged) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *VersionChanged) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *VersionChanged) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *VersionChanged) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} diff --git a/app/peerfinder/service.go b/app/peerfinder/service.go new file mode 100644 index 0000000..366b209 --- /dev/null +++ b/app/peerfinder/service.go @@ -0,0 +1,268 @@ +package peerfinder + +import ( + "context" + "encoding/json" + "io" + "net/http" + "strconv" + "strings" + + ulid "github.com/oklog/ulid/v2" + contentnegotiation "gitlab.com/jamietanna/content-negotiation-go" + + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/es/event" +) + +const ( + queueRequests = "pf-requests" + queueResponses = "pf-response-" + aggInfo = "pf-info" + initVersion = "1.1.0" +) + +type service struct { + es *es.EventStore +} + +func New(ctx context.Context, es *es.EventStore) (*service, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + if err := event.Register(ctx, &Request{}, &Result{}, &VersionChanged{}); err != nil { + span.RecordError(err) + return nil, err + } + + svc := &service{es: es} + + return svc, nil +} +func (s *service) RegisterHTTP(mux *http.ServeMux) { + mux.Handle("/peers/", lg.Htrace(s, "peers")) + +} +func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + _, span := lg.Span(ctx) + defer span.End() + + r = r.WithContext(ctx) + + switch r.Method { + case http.MethodGet: + switch { + case strings.HasPrefix(r.URL.Path, "/peers/pending/"): + s.getPending(w, r, strings.TrimPrefix(r.URL.Path, "/peers/pending/")) + return + + case strings.HasPrefix(r.URL.Path, "/peers/req/"): + s.getResults(w, r, strings.TrimPrefix(r.URL.Path, "/peers/req/")) + return + + default: + w.WriteHeader(http.StatusNotFound) + return + } + case http.MethodPost: + switch { + case strings.HasPrefix(r.URL.Path, "/peers/req/"): + s.postResult(w, r, strings.TrimPrefix(r.URL.Path, "/peers/req/")) + return + + case strings.HasPrefix(r.URL.Path, "/peers/req"): + s.postRequest(w, r) + return + + default: + w.WriteHeader(http.StatusNotFound) + return + } + default: + w.WriteHeader(http.StatusMethodNotAllowed) + return + } +} + +func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + + _, span := lg.Span(ctx) + defer span.End() + + info, err := es.Upsert(ctx, s.es, "pf-info", func(ctx context.Context, agg *Info) error { + return agg.OnCreate() // initialize if not exists + }) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + requests, err := s.es.Read(ctx, queueRequests, -1, -30) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + responses, err := s.es.Read(ctx, queueResponses+uuid, -1, -30) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + req := filter(requests, responses) + + negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/plain", "text/html") + negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept")) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusNotAcceptable) + return + } + + span.AddEvent(negotiated.String()) + switch negotiated.String() { + case "text/environment": + _, err = encodeTo(w, info.MarshalEnviron, req.MarshalEnviron) + case "application/json": + var out interface{} = info + if req != nil { + out = struct { + ScriptVersion string `json:"script_version"` + RequestID string `json:"req_id"` + RequestIP string `json:"req_ip"` + Family string `json:"req_family"` + Created string `json:"req_created"` + }{ + info.ScriptVersion, + req.RequestID(), + req.RequestIP, + strconv.Itoa(req.Family()), + req.CreatedString(), + } + } + err = json.NewEncoder(w).Encode(out) + } + span.RecordError(err) +} +func (s *service) getResults(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + + responses, err := s.es.Read(ctx, queueResponses+uuid, -1, es.AllEvents) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/csv", "text/plain", "text/html") + negotiated, _, err := negotiator.Negotiate("application/json") + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + return + } + switch negotiated.String() { + // case "text/environment": + // encodeTo(w, responses.MarshalBinary) + case "application/json": + json.NewEncoder(w).Encode(responses) + } +} +func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + if err := r.ParseForm(); err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + return + } + + req := &Request{ + RequestIP: r.Form.Get("req_ip"), + } + + if hidden, err := strconv.ParseBool(r.Form.Get("req_hidden")); err != nil { + req.Hidden = hidden + } + + s.es.Append(ctx, queueRequests, event.NewEvents(req)) +} +func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) { + ctx := r.Context() + + if _, err := ulid.ParseStrict(id); err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + + if err := r.ParseForm(); err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + return + } + + latency, err := strconv.ParseFloat(r.Form.Get("res_latency"), 64) + if err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + return + } + + req := &Result{ + RequestID: id, + PeerID: r.Form.Get("peer_id"), + PeerVersion: r.Form.Get("peer_version"), + Latency: latency, + } + + s.es.Append(ctx, queueResponses+id, event.NewEvents(req)) +} + +func filter(requests, responses event.Events) *Request { + have := make(map[string]struct{}, len(responses)) + for _, res := range toList[Result](responses...) { + have[res.RequestID] = struct{}{} + } + for _, req := range reverse(toList[Request](requests...)...) { + if _, ok := have[req.RequestID()]; !ok { + return req + } + } + return nil +} +func toList[E any, T es.PE[E]](lis ...event.Event) []T { + newLis := make([]T, 0, len(lis)) + for i := range lis { + if e, ok := lis[i].(T); ok { + newLis = append(newLis, e) + } + } + return newLis +} +func reverse[T any](s ...T) []T { + first, last := 0, len(s)-1 + for first < last { + s[first], s[last] = s[last], s[first] + first++ + last-- + } + return s +} +func encodeTo(w io.Writer, fns ...func() ([]byte, error)) (int, error) { + i := 0 + for _, fn := range fns { + b, err := fn() + if err != nil { + return i, err + } + + j, err := w.Write(b) + i += j + if err != nil { + return i, err + } + } + return i, nil +} diff --git a/app/salty/salty-addr.go b/app/salty/salty-addr.go new file mode 100644 index 0000000..0fb6d24 --- /dev/null +++ b/app/salty/salty-addr.go @@ -0,0 +1,142 @@ +package salty + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/keys-pub/keys" + "github.com/sour-is/ev/internal/lg" +) + +// Config represents a Salty Config for a User which at a minimum is required +// to have an Endpoint and Key (Public Key) +type Config struct { + Endpoint string `json:"endpoint"` + Key string `json:"key"` +} + +type Capabilities struct { + AcceptEncoding string +} + +func (c Capabilities) String() string { + return fmt.Sprint("accept-encoding: ", c.AcceptEncoding) +} + +type Addr struct { + User string + Domain string + + capabilities Capabilities + discoveredDomain string + dns DNSResolver + endpoint *url.URL + key *keys.EdX25519PublicKey +} + +// ParseAddr parsers a Salty Address for a user into it's user and domain +// parts and returns an Addr object with the User and Domain and a method +// for returning the expected User's Well-Known URI +func (s *service) ParseAddr(addr string) (*Addr, error) { + parts := strings.Split(strings.ToLower(addr), "@") + if len(parts) != 2 { + return nil, fmt.Errorf("expected nick@domain found %q", addr) + } + + return &Addr{User: parts[0], Domain: parts[1], dns: s.dns}, nil +} + +func (a *Addr) String() string { + return fmt.Sprintf("%s@%s", a.User, a.Domain) +} + +// Hash returns the Hex(SHA256Sum()) of the Address +func (a *Addr) Hash() string { + return fmt.Sprintf("%x", sha256.Sum256([]byte(strings.ToLower(a.String())))) +} + +// URI returns the Well-Known URI for this Addr +func (a *Addr) URI() string { + return fmt.Sprintf("https://%s/.well-known/salty/%s.json", a.DiscoveredDomain(), a.User) +} + +// HashURI returns the Well-Known HashURI for this Addr +func (a *Addr) HashURI() string { + return fmt.Sprintf("https://%s/.well-known/salty/%s.json", a.DiscoveredDomain(), a.Hash()) +} + +// DiscoveredDomain returns the discovered domain (if any) of fallbacks to the Domain +func (a *Addr) DiscoveredDomain() string { + if a.discoveredDomain != "" { + return a.discoveredDomain + } + return a.Domain +} + +func (a *Addr) Refresh(ctx context.Context) error { + ctx, span := lg.Span(ctx) + defer span.End() + + span.AddEvent(fmt.Sprintf("Looking up SRV record for _salty._tcp.%s", a.Domain)) + if target, _, err := a.dns.LookupSRV(ctx, "salty", "tcp", a.Domain); err == nil { + a.discoveredDomain = target + span.AddEvent(fmt.Sprintf("Discovered salty services %s", a.discoveredDomain)) + } else if err != nil { + span.AddEvent(fmt.Sprintf("error looking up SRV record for _salty._tcp.%s : %s", a.Domain, err)) + } + + config, cap, err := fetchConfig(ctx, a.HashURI()) + if err != nil { + // Fallback to plain user nick + config, cap, err = fetchConfig(ctx, a.URI()) + } + if err != nil { + return fmt.Errorf("error looking up user %s: %w", a, err) + } + key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(config.Key)) + if err != nil { + return fmt.Errorf("error parsing public key %s: %w", config.Key, err) + } + a.key = key + + u, err := url.Parse(config.Endpoint) + if err != nil { + return fmt.Errorf("error parsing endpoint %s: %w", config.Endpoint, err) + } + a.endpoint = u + a.capabilities = cap + + span.AddEvent(fmt.Sprintf("Discovered endpoint: %v", a.endpoint)) + span.AddEvent(fmt.Sprintf("Discovered capability: %v", a.capabilities)) + + return nil +} + +func fetchConfig(ctx context.Context, addr string) (config Config, cap Capabilities, err error) { + ctx, span := lg.Span(ctx) + defer span.End() + + var req *http.Request + req, err = http.NewRequestWithContext(ctx, http.MethodGet, addr, nil) + if err != nil { + return + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return + } + + if err = json.NewDecoder(res.Body).Decode(&config); err != nil { + return + } + + cap.AcceptEncoding = res.Header.Get("Accept-Encoding") + + return +} diff --git a/app/salty/service.go b/app/salty/service.go index 5b796d6..c442ee9 100644 --- a/app/salty/service.go +++ b/app/salty/service.go @@ -6,12 +6,13 @@ import ( "encoding/json" "errors" "fmt" + "net" "net/http" "path" "strings" "github.com/keys-pub/keys" - "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/gql" @@ -19,12 +20,21 @@ import ( "go.uber.org/multierr" ) +type DNSResolver interface { + LookupSRV(ctx context.Context, service, proto, name string) (string, []*net.SRV, error) +} + type service struct { baseURL string es *es.EventStore + dns DNSResolver - Mresolver_create_salty_user syncint64.Counter - Mresolver_salty_user syncint64.Counter + m_create_user syncint64.Counter + m_get_user syncint64.Counter + m_api_ping syncint64.Counter + m_api_register syncint64.Counter + m_api_lookup syncint64.Counter + m_api_send syncint64.Counter } type contextKey struct { name string @@ -35,10 +45,11 @@ var saltyKey = contextKey{"salty"} type SaltyResolver interface { CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error) + RegisterHTTP(mux *http.ServeMux) } func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() if err := event.Register(ctx, &UserRegistered{}); err != nil { @@ -48,15 +59,27 @@ func New(ctx context.Context, es *es.EventStore, baseURL string) (*service, erro return nil, err } - m := logz.Meter(ctx) + m := lg.Meter(ctx) - svc := &service{baseURL: baseURL, es: es} + svc := &service{baseURL: baseURL, es: es, dns: net.DefaultResolver} var err, errs error - svc.Mresolver_create_salty_user, err = m.SyncInt64().Counter("resolver_create_salty_user") + svc.m_create_user, err = m.SyncInt64().Counter("salty_create_user") errs = multierr.Append(errs, err) - svc.Mresolver_salty_user, err = m.SyncInt64().Counter("resolver_salty_user") + svc.m_get_user, err = m.SyncInt64().Counter("salty_get_user") + errs = multierr.Append(errs, err) + + svc.m_api_ping, err = m.SyncInt64().Counter("salty_api_ping") + errs = multierr.Append(errs, err) + + svc.m_api_register, err = m.SyncInt64().Counter("salty_api_register") + errs = multierr.Append(errs, err) + + svc.m_api_lookup, err = m.SyncInt64().Counter("salty_api_lookup") + errs = multierr.Append(errs, err) + + svc.m_api_send, err = m.SyncInt64().Counter("salty_api_send") errs = multierr.Append(errs, err) span.RecordError(err) @@ -69,12 +92,18 @@ func (s *service) BaseURL() string { } return s.baseURL } -func(s *service) RegisterHTTP(mux *http.ServeMux) { - mux.Handle("/.well-known/salty/", logz.Htrace(s, "lookup")) +func (s *service) RegisterHTTP(mux *http.ServeMux) { + mux.Handle("/.well-known/salty/", lg.Htrace(s, "lookup")) +} +func (s *service) RegisterAPIv1(mux *http.ServeMux) { + mux.HandleFunc("/ping", s.apiv1) + mux.HandleFunc("/register", s.apiv1) + mux.HandleFunc("/lookup/", s.apiv1) + mux.HandleFunc("/send", s.apiv1) } func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/.well-known/salty/") @@ -108,10 +137,10 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } func (s *service) CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() - s.Mresolver_create_salty_user.Add(ctx, 1) + s.m_create_user.Add(ctx, 1) streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) span.AddEvent(streamID) @@ -138,10 +167,10 @@ func (s *service) CreateSaltyUser(ctx context.Context, nick string, pub string) return a, nil } func (s *service) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() - s.Mresolver_salty_user.Add(ctx, 1) + s.m_get_user.Add(ctx, 1) streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) span.AddEvent(streamID) @@ -167,3 +196,54 @@ func (s *service) GetMiddleware() func(http.Handler) http.Handler { }) } } + +func (s *service) apiv1(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + ctx, span := lg.Span(ctx) + defer span.End() + + switch r.Method { + case http.MethodGet: + switch { + case r.URL.Path == "/ping": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{}`)) + + case strings.HasPrefix(r.URL.Path, "/lookup/"): + addr, err := s.ParseAddr(strings.TrimPrefix(r.URL.Path, "/lookup/")) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusBadRequest) + return + } + err = addr.Refresh(ctx) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + json.NewEncoder(w).Encode(addr) + return + + default: + w.WriteHeader(http.StatusNotFound) + return + } + + case http.MethodPost: + switch r.URL.Path { + case "/register": + + case "/send": + + default: + w.WriteHeader(http.StatusNotFound) + return + } + default: + w.WriteHeader(http.StatusMethodNotAllowed) + return + } +} diff --git a/go.mod b/go.mod index f173970..1b451d2 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,7 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/tinylru v1.1.0 // indirect + gitlab.com/jamietanna/content-negotiation-go v0.2.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.9.0 go.uber.org/multierr v1.8.0 ) diff --git a/go.sum b/go.sum index acfe328..537851e 100644 --- a/go.sum +++ b/go.sum @@ -351,6 +351,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +gitlab.com/jamietanna/content-negotiation-go v0.2.0 h1:vT0OLEPQ6DYRG3/1F7joXSNjVQHGivJ6+JzODlJfjWw= +gitlab.com/jamietanna/content-negotiation-go v0.2.0/go.mod h1:n4ZZ8/X5TstnjYRnjEtR/fC7MCTe+aRKM7PQlLBH3PQ= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/httpmux.go b/httpmux.go new file mode 100644 index 0000000..a19f204 --- /dev/null +++ b/httpmux.go @@ -0,0 +1,37 @@ +package main + +import ( + "net/http" + + "github.com/rs/cors" +) + +type mux struct { + *http.ServeMux + api *http.ServeMux +} + +func httpMux(fns ...interface{ RegisterHTTP(*http.ServeMux) }) http.Handler { + mux := newMux() + for _, fn := range fns { + fn.RegisterHTTP(mux.ServeMux) + + if fn, ok := fn.(interface{ RegisterAPIv1(*http.ServeMux) }); ok { + fn.RegisterAPIv1(mux.api) + } + } + + return cors.AllowAll().Handler(mux) +} +func newMux() *mux { + mux := &mux{ + api: http.NewServeMux(), + ServeMux: http.NewServeMux(), + } + mux.Handle("/api/v1/", http.StripPrefix("/api/v1/", mux.api)) + + return mux +} +func (m mux) HandleAPIv1(pattern string, handler http.Handler) { + m.api.Handle(pattern, handler) +} diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index b5c81de..cad5da7 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -18,6 +18,7 @@ import ( "github.com/99designs/gqlgen/plugin/federation/fedruntime" "github.com/sour-is/ev/app/msgbus" "github.com/sour-is/ev/app/salty" + "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/gql" gqlparser "github.com/vektah/gqlparser/v2" @@ -56,6 +57,14 @@ type ComplexityRoot struct { Paging func(childComplexity int) int } + Event struct { + Bytes func(childComplexity int) int + EventID func(childComplexity int) int + ID func(childComplexity int) int + Meta func(childComplexity int) int + Values func(childComplexity int) int + } + Meta struct { Created func(childComplexity int) int GetEventID func(childComplexity int) int @@ -83,6 +92,7 @@ type ComplexityRoot struct { } Query struct { + Events func(childComplexity int, streamID string, paging *gql.PageInput) int Posts func(childComplexity int, streamID string, paging *gql.PageInput) int SaltyUser func(childComplexity int, nick string) int __resolve__service func(childComplexity int) int @@ -96,7 +106,8 @@ type ComplexityRoot struct { } Subscription struct { - PostAdded func(childComplexity int, streamID string, after int64) int + EventAdded func(childComplexity int, streamID string, after int64) int + PostAdded func(childComplexity int, streamID string, after int64) int } _Service struct { @@ -108,10 +119,12 @@ type MutationResolver interface { CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) } type QueryResolver interface { + Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) SaltyUser(ctx context.Context, nick string) (*salty.SaltyUser, error) } type SubscriptionResolver interface { + EventAdded(ctx context.Context, streamID string, after int64) (<-chan *es.GQLEvent, error) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *msgbus.PostEvent, error) } @@ -144,6 +157,41 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Connection.Paging(childComplexity), true + case "Event.bytes": + if e.complexity.Event.Bytes == nil { + break + } + + return e.complexity.Event.Bytes(childComplexity), true + + case "Event.eventID": + if e.complexity.Event.EventID == nil { + break + } + + return e.complexity.Event.EventID(childComplexity), true + + case "Event.id": + if e.complexity.Event.ID == nil { + break + } + + return e.complexity.Event.ID(childComplexity), true + + case "Event.meta": + if e.complexity.Event.Meta == nil { + break + } + + return e.complexity.Event.Meta(childComplexity), true + + case "Event.values": + if e.complexity.Event.Values == nil { + break + } + + return e.complexity.Event.Values(childComplexity), true + case "Meta.created": if e.complexity.Meta.Created == nil { break @@ -247,6 +295,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.PostEvent.Tags(childComplexity), true + case "Query.events": + if e.complexity.Query.Events == nil { + break + } + + args, err := ec.field_Query_events_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.Events(childComplexity, args["streamID"].(string), args["paging"].(*gql.PageInput)), true + case "Query.posts": if e.complexity.Query.Posts == nil { break @@ -306,6 +366,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.SaltyUser.Pubkey(childComplexity), true + case "Subscription.eventAdded": + if e.complexity.Subscription.EventAdded == nil { + break + } + + args, err := ec.field_Subscription_eventAdded_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Subscription.EventAdded(childComplexity, args["streamID"].(string), args["after"].(int64)), true + case "Subscription.postAdded": if e.complexity.Subscription.PostAdded == nil { break @@ -417,6 +489,23 @@ type Meta @goModel(model: "github.com/sour-is/ev/pkg/es/event.Meta") { streamID: String! created: Time! position: Int! +} + +extend type Query { + events(streamID: String! paging: PageInput): Connection! +} +extend type Subscription { + """after == 0 start from begining, after == -1 start from end""" + eventAdded(streamID: String! after: Int! = -1): Event +} + +type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEvent") { + id: ID! + + eventID: String! + values: Map! + bytes: String! + meta: Meta! }`, BuiltIn: false}, {Name: "../../../pkg/gql/common.graphqls", Input: `scalar Time scalar Map @@ -550,6 +639,30 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_events_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["streamID"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("streamID")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["streamID"] = arg0 + var arg1 *gql.PageInput + if tmp, ok := rawArgs["paging"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("paging")) + arg1, err = ec.unmarshalOPageInput2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐPageInput(ctx, tmp) + if err != nil { + return nil, err + } + } + args["paging"] = arg1 + return args, nil +} + func (ec *executionContext) field_Query_posts_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -589,6 +702,30 @@ func (ec *executionContext) field_Query_saltyUser_args(ctx context.Context, rawA return args, nil } +func (ec *executionContext) field_Subscription_eventAdded_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["streamID"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("streamID")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["streamID"] = arg0 + var arg1 int64 + if tmp, ok := rawArgs["after"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("after")) + arg1, err = ec.unmarshalNInt2int64(ctx, tmp) + if err != nil { + return nil, err + } + } + args["after"] = arg1 + return args, nil +} + func (ec *executionContext) field_Subscription_postAdded_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -749,6 +886,236 @@ func (ec *executionContext) fieldContext_Connection_edges(ctx context.Context, f return fc, nil } +func (ec *executionContext) _Event_id(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_id(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ID(), nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNID2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_id(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type ID does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Event_eventID(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_eventID(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.EventID(), nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_eventID(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Event_values(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_values(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Values(), nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(map[string]interface{}) + fc.Result = res + return ec.marshalNMap2map(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_values(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Map does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Event_bytes(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_bytes(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Bytes() + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_bytes(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Event_meta(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_meta(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Meta(), nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(*event.Meta) + fc.Result = res + return ec.marshalNMeta2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚋeventᚐMeta(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_meta(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "eventID": + return ec.fieldContext_Meta_eventID(ctx, field) + case "streamID": + return ec.fieldContext_Meta_streamID(ctx, field) + case "created": + return ec.fieldContext_Meta_created(ctx, field) + case "position": + return ec.fieldContext_Meta_position(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type Meta", field.Name) + }, + } + return fc, nil +} + func (ec *executionContext) _Meta_eventID(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Meta_eventID(ctx, field) if err != nil { @@ -1393,6 +1760,67 @@ func (ec *executionContext) fieldContext_PostEvent_meta(ctx context.Context, fie return fc, nil } +func (ec *executionContext) _Query_events(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_events(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().Events(rctx, fc.Args["streamID"].(string), fc.Args["paging"].(*gql.PageInput)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(*gql.Connection) + fc.Result = res + return ec.marshalNConnection2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋgqlᚐConnection(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Query_events(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "paging": + return ec.fieldContext_Connection_paging(ctx, field) + case "edges": + return ec.fieldContext_Connection_edges(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type Connection", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_events_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return + } + return fc, nil +} + func (ec *executionContext) _Query_posts(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Query_posts(ctx, field) if err != nil { @@ -1869,6 +2297,84 @@ func (ec *executionContext) fieldContext_SaltyUser_endpoint(ctx context.Context, return fc, nil } +func (ec *executionContext) _Subscription_eventAdded(ctx context.Context, field graphql.CollectedField) (ret func(ctx context.Context) graphql.Marshaler) { + fc, err := ec.fieldContext_Subscription_eventAdded(ctx, field) + if err != nil { + return nil + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Subscription().EventAdded(rctx, fc.Args["streamID"].(string), fc.Args["after"].(int64)) + }) + if err != nil { + ec.Error(ctx, err) + return nil + } + if resTmp == nil { + return nil + } + return func(ctx context.Context) graphql.Marshaler { + select { + case res, ok := <-resTmp.(<-chan *es.GQLEvent): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚐGQLEvent(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): + return nil + } + } +} + +func (ec *executionContext) fieldContext_Subscription_eventAdded(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Subscription", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "id": + return ec.fieldContext_Event_id(ctx, field) + case "eventID": + return ec.fieldContext_Event_eventID(ctx, field) + case "values": + return ec.fieldContext_Event_values(ctx, field) + case "bytes": + return ec.fieldContext_Event_bytes(ctx, field) + case "meta": + return ec.fieldContext_Event_meta(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type Event", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Subscription_eventAdded_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return + } + return fc, nil +} + func (ec *executionContext) _Subscription_postAdded(ctx context.Context, field graphql.CollectedField) (ret func(ctx context.Context) graphql.Marshaler) { fc, err := ec.fieldContext_Subscription_postAdded(ctx, field) if err != nil { @@ -3812,6 +4318,11 @@ func (ec *executionContext) _Edge(ctx context.Context, sel ast.SelectionSet, obj switch obj := (obj).(type) { case nil: return graphql.Null + case *es.GQLEvent: + if obj == nil { + return graphql.Null + } + return ec._Event(ctx, sel, obj) case *msgbus.PostEvent: if obj == nil { return graphql.Null @@ -3861,6 +4372,62 @@ func (ec *executionContext) _Connection(ctx context.Context, sel ast.SelectionSe return out } +var eventImplementors = []string{"Event", "Edge"} + +func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, obj *es.GQLEvent) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, eventImplementors) + out := graphql.NewFieldSet(fields) + var invalids uint32 + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("Event") + case "id": + + out.Values[i] = ec._Event_id(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + case "eventID": + + out.Values[i] = ec._Event_eventID(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + case "values": + + out.Values[i] = ec._Event_values(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + case "bytes": + + out.Values[i] = ec._Event_bytes(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + case "meta": + + out.Values[i] = ec._Event_meta(ctx, field, obj) + + if out.Values[i] == graphql.Null { + invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch() + if invalids > 0 { + return graphql.Null + } + return out +} + var metaImplementors = []string{"Meta"} func (ec *executionContext) _Meta(ctx context.Context, sel ast.SelectionSet, obj *event.Meta) graphql.Marshaler { @@ -4083,6 +4650,29 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr switch field.Name { case "__typename": out.Values[i] = graphql.MarshalString("Query") + case "events": + field := field + + innerFunc := func(ctx context.Context) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_events(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, innerFunc) + } + + out.Concurrently(i, func() graphql.Marshaler { + return rrm(innerCtx) + }) case "posts": field := field @@ -4247,6 +4837,8 @@ func (ec *executionContext) _Subscription(ctx context.Context, sel ast.Selection } switch fields[0].Name { + case "eventAdded": + return ec._Subscription_eventAdded(ctx, fields[0]) case "postAdded": return ec._Subscription_postAdded(ctx, fields[0]) default: @@ -5126,6 +5718,13 @@ func (ec *executionContext) marshalOBoolean2ᚖbool(ctx context.Context, sel ast return res } +func (ec *executionContext) marshalOEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚐGQLEvent(ctx context.Context, sel ast.SelectionSet, v *es.GQLEvent) graphql.Marshaler { + if v == nil { + return graphql.Null + } + return ec._Event(ctx, sel, v) +} + func (ec *executionContext) unmarshalOInt2ᚖint64(ctx context.Context, v interface{}) (*int64, error) { if v == nil { return nil, nil diff --git a/internal/lg/init.go b/internal/lg/init.go new file mode 100644 index 0000000..9a2a572 --- /dev/null +++ b/internal/lg/init.go @@ -0,0 +1,68 @@ +package lg + +import ( + "context" + "log" + "os" + "strings" + + "go.uber.org/multierr" +) + +func Init(ctx context.Context, name string) (context.Context, func() error) { + ctx, span := Span(ctx) + defer span.End() + + stop := [3]func() error{ + initLogger(name), + } + ctx, stop[1] = initMetrics(ctx, name) + ctx, stop[2] = initTracing(ctx, name) + + reverse(stop[:]) + + return ctx, func() error { + log.Println("flushing logs...") + errs := make([]error, len(stop)) + for i, fn := range stop { + if fn != nil { + errs[i] = fn() + } + } + log.Println("all stopped.") + return multierr.Combine(errs...) + } +} + +func env(name, defaultValue string) string { + name = strings.TrimSpace(name) + defaultValue = strings.TrimSpace(defaultValue) + if v := strings.TrimSpace(os.Getenv(name)); v != "" { + log.Println("# ", name, "=", v) + return v + } + log.Println("# ", name, "=", defaultValue, "(default)") + return defaultValue +} + +type secret string + +func (s secret) String() string { + if s == "" { + return "(nil)" + } + return "***" +} +func (s secret) Secret() string { + return string(s) +} +func envSecret(name, defaultValue string) secret { + name = strings.TrimSpace(name) + defaultValue = strings.TrimSpace(defaultValue) + if v := strings.TrimSpace(os.Getenv(name)); v != "" { + log.Println("# ", name, "=", secret(v)) + return secret(v) + } + log.Println("# ", name, "=", secret(defaultValue), "(default)") + return secret(defaultValue) +} diff --git a/internal/logz/logger.go b/internal/lg/logger.go similarity index 93% rename from internal/logz/logger.go rename to internal/lg/logger.go index 05c0f26..80cb813 100644 --- a/internal/logz/logger.go +++ b/internal/lg/logger.go @@ -1,4 +1,4 @@ -package logz +package lg import ( "bytes" @@ -64,15 +64,15 @@ func initLogger(name string) func() error { log.SetPrefix("[" + name + "] ") log.SetFlags(log.LstdFlags&^(log.Ldate|log.Ltime) | log.Lshortfile) - token := env("LOGZIO_LOG_TOKEN", "") + token := envSecret("LOGZIO_LOG_TOKEN", "") if token == "" { return nil } l, err := logzio.New( - token, + token.Secret(), // logzio.SetDebug(os.Stderr), - logzio.SetUrl(env("LOGZIO_LOG_URL", "https://listener.logz.io:8071")), + logzio.SetUrl(env("LOGZIO_LOG_URL", "https://listener.lg.io:8071")), logzio.SetDrainDuration(time.Second*5), logzio.SetTempDirectory(env("LOGZIO_DIR", os.TempDir())), logzio.SetCheckDiskSpace(true), diff --git a/internal/logz/metric.go b/internal/lg/metric.go similarity index 99% rename from internal/logz/metric.go rename to internal/lg/metric.go index e72b2cb..4a1d36f 100644 --- a/internal/logz/metric.go +++ b/internal/lg/metric.go @@ -1,4 +1,4 @@ -package logz +package lg import ( "context" diff --git a/internal/logz/tracer.go b/internal/lg/tracer.go similarity index 81% rename from internal/logz/tracer.go rename to internal/lg/tracer.go index d1925c4..a5c9639 100644 --- a/internal/logz/tracer.go +++ b/internal/lg/tracer.go @@ -1,4 +1,4 @@ -package logz +package lg import ( "context" @@ -33,10 +33,10 @@ func Tracer(ctx context.Context) trace.Tracer { return otel.Tracer("") } -func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, trace.Span) { +func attrs(ctx context.Context) (string, []attribute.KeyValue) { var attrs []attribute.KeyValue var name string - if pc, file, line, ok := runtime.Caller(1); ok { + if pc, file, line, ok := runtime.Caller(2); ok { if fn := runtime.FuncForPC(pc); fn != nil { name = fn.Name() } @@ -47,12 +47,29 @@ func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, attribute.String("name", name), ) } + return name, attrs +} + +func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + name, attrs := attrs(ctx) ctx, span := Tracer(ctx).Start(ctx, name, opts...) span.SetAttributes(attrs...) return ctx, span } +func Fork(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + name, attrs := attrs(ctx) + childCTX, childSpan := Tracer(ctx).Start(context.Background(), name, append(opts, trace.WithLinks(trace.LinkFromContext(ctx)))...) + childSpan.SetAttributes(attrs...) + + _, span := Tracer(ctx).Start(ctx, name, append(opts, trace.WithLinks(trace.LinkFromContext(childCTX)))...) + span.SetAttributes(attrs...) + defer span.End() + + return childCTX, childSpan +} + type SampleRate string const ( @@ -71,9 +88,13 @@ func initTracing(ctx context.Context, name string) (context.Context, func() erro return ctx, nil } + exporterAddr := env("EV_TRACE_ENDPOINT", "") + if exporterAddr == "" { + return ctx, nil + } traceExporter, err := otlptracehttp.New(ctx, otlptracehttp.WithInsecure(), - otlptracehttp.WithEndpoint("localhost:4318"), + otlptracehttp.WithEndpoint(exporterAddr), ) if err != nil { log.Println(wrap(err, "failed to create trace exporter")) diff --git a/internal/logz/init.go b/internal/logz/init.go deleted file mode 100644 index bfef13d..0000000 --- a/internal/logz/init.go +++ /dev/null @@ -1,42 +0,0 @@ -package logz - -import ( - "context" - "log" - "os" - - "go.uber.org/multierr" -) - -func Init(ctx context.Context, name string) (context.Context, func() error) { - ctx, span := Span(ctx) - defer span.End() - - stop := [3]func() error{ - initLogger(name), - } - ctx, stop[1] = initMetrics(ctx, name) - ctx, stop[2] = initTracing(ctx, name) - - reverse(stop[:]) - - return ctx, func() error { - log.Println("flushing logs...") - errs := make([]error, len(stop)) - for i, fn := range stop { - if fn != nil { - errs[i] = fn() - } - } - log.Println("all stopped.") - return multierr.Combine(errs...) - } -} - -func env(name, defaultValue string) string { - if v := os.Getenv(name); v != "" { - log.Println("# ", name, " = ", v) - return v - } - return defaultValue -} diff --git a/main.go b/main.go index f598e27..f837534 100644 --- a/main.go +++ b/main.go @@ -7,19 +7,24 @@ import ( "os" "os/signal" "path" + "strings" "time" - "github.com/rs/cors" + "go.uber.org/multierr" "golang.org/x/sync/errgroup" "github.com/sour-is/ev/app/gql" "github.com/sour-is/ev/app/msgbus" + "github.com/sour-is/ev/app/peerfinder" "github.com/sour-is/ev/app/salty" - "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" + "github.com/sour-is/ev/pkg/es/driver/projecter" "github.com/sour-is/ev/pkg/es/driver/streamer" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/set" ) const AppName string = "sour.is-ev" @@ -31,7 +36,7 @@ func main() { defer cancel() }() - ctx, stop := logz.Init(ctx, AppName) + ctx, stop := lg.Init(ctx, AppName) defer stop() if err := run(ctx); err != nil && err != http.ErrServerClosed { @@ -42,12 +47,20 @@ func run(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) - diskstore.Init(ctx) - memstore.Init(ctx) + err := multierr.Combine( + es.Init(ctx), + event.Init(ctx), + diskstore.Init(ctx), + memstore.Init(ctx), + ) + if err != nil { + span.RecordError(err) + return err + } - es, err := es.Open(ctx, env("EV_DATA", "file:data"), streamer.New(ctx)) + es, err := es.Open(ctx, env("EV_DATA", "mem:"), streamer.New(ctx), projecter.New(ctx)) if err != nil { span.RecordError(err) return err @@ -57,30 +70,62 @@ func run(ctx context.Context) error { Addr: env("EV_HTTP", ":8080"), } - salty, err := salty.New(ctx, es, path.Join(env("EV_BASE_URL", "http://localhost"+s.Addr), "inbox")) - if err != nil { - span.RecordError(err) - return err + if strings.HasPrefix(s.Addr, ":") { + s.Addr = "[::]" + s.Addr } - msgbus, err := msgbus.New(ctx, es) - if err != nil { - span.RecordError(err) - return err + enable := set.New(strings.Fields(env("EV_ENABLE", "salty msgbus gql peers"))...) + var svcs []interface{ RegisterHTTP(*http.ServeMux) } + + svcs = append(svcs, es) + + if enable.Has("salty") { + span.AddEvent("Enable Salty") + salty, err := salty.New(ctx, es, path.Join(env("EV_BASE_URL", "http://"+s.Addr), "inbox")) + if err != nil { + span.RecordError(err) + return err + } + svcs = append(svcs, salty) } - gql, err := gql.New(ctx, msgbus, salty) - if err != nil { - span.RecordError(err) - return err + if enable.Has("msgbus") { + span.AddEvent("Enable Msgbus") + msgbus, err := msgbus.New(ctx, es) + if err != nil { + span.RecordError(err) + return err + } + svcs = append(svcs, msgbus) } - s.Handler = httpMux(logz.NewHTTP(ctx), msgbus, salty, gql) + if enable.Has("peers") { + span.AddEvent("Enable Peers") + peers, err := peerfinder.New(ctx, es) + if err != nil { + span.RecordError(err) + return err + } + svcs = append(svcs, peers) + } + + if enable.Has("gql") { + span.AddEvent("Enable GraphQL") + gql, err := gql.New(ctx, svcs...) + if err != nil { + span.RecordError(err) + return err + } + svcs = append(svcs, gql) + } + svcs = append(svcs, lg.NewHTTP(ctx)) + + s.Handler = httpMux(svcs...) log.Print("Listen on ", s.Addr) - span.AddEvent("begin listen and serve") + span.AddEvent("begin listen and serve on " + s.Addr) - Mup, err := logz.Meter(ctx).SyncInt64().UpDownCounter("up") + Mup, err := lg.Meter(ctx).SyncInt64().UpDownCounter("up") if err != nil { return err } @@ -104,16 +149,12 @@ func run(ctx context.Context) error { return nil } func env(name, defaultValue string) string { - if v := os.Getenv(name); v != "" { - log.Println("# ", name, " = ", v) + name = strings.TrimSpace(name) + defaultValue = strings.TrimSpace(defaultValue) + if v := strings.TrimSpace(os.Getenv(name)); v != "" { + log.Println("#", name, "=", v) return v } + log.Println("#", name, "=", defaultValue, "(default)") return defaultValue } -func httpMux(fns ...interface{ RegisterHTTP(*http.ServeMux) }) http.Handler { - mux := http.NewServeMux() - for _, fn := range fns { - fn.RegisterHTTP(mux) - } - return cors.AllowAll().Handler(mux) -} diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 43e7057..874fd53 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -11,7 +11,7 @@ import ( "go.opentelemetry.io/otel/metric/instrument/syncint64" "go.uber.org/multierr" - "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/cache" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" @@ -38,10 +38,10 @@ const AppendOnly = es.AppendOnly const AllEvents = es.AllEvents func Init(ctx context.Context) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() - m := logz.Meter(ctx) + m := lg.Meter(ctx) var err, errs error Mdisk_open, err := m.SyncInt64().Counter("disk_open") @@ -61,7 +61,7 @@ func Init(ctx context.Context) error { var _ driver.Driver = (*diskStore)(nil) func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() d.Mdisk_open.Add(ctx, 1) @@ -82,11 +82,11 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) } } c, err := cache.NewWithEvict(CachSize, func(ctx context.Context, s string, l *lockedWal) { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() l.Modify(ctx, func(w *wal.Log) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() d.Mdisk_evict.Add(ctx, 1) @@ -112,13 +112,13 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) }, nil } func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() el := &eventLog{streamID: streamID} return el, d.openlogs.Modify(ctx, func(openlogs *openlogs) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() if events, ok := openlogs.logs.Get(streamID); ok { @@ -145,14 +145,14 @@ type eventLog struct { var _ driver.EventLog = (*eventLog)(nil) func (e *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() event.SetStreamID(e.streamID, events...) var count uint64 err := e.events.Modify(ctx, func(l *wal.Log) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() last, err := l.LastIndex() @@ -190,13 +190,13 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint return count, err } func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, error) { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() var events event.Events err := e.events.Modify(ctx, func(stream *wal.Log) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() first, err := stream.FirstIndex() @@ -259,7 +259,7 @@ func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, er return events, nil } func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() var idx uint64 @@ -273,7 +273,7 @@ func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) { return idx, err } func (e *eventLog) LastIndex(ctx context.Context) (uint64, error) { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() var idx uint64 diff --git a/pkg/es/driver/mem-store/mem-store.go b/pkg/es/driver/mem-store/mem-store.go index 70f4a72..b4378ab 100644 --- a/pkg/es/driver/mem-store/mem-store.go +++ b/pkg/es/driver/mem-store/mem-store.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" @@ -26,30 +26,30 @@ type memstore struct { const AppendOnly = es.AppendOnly const AllEvents = es.AllEvents -func Init(ctx context.Context) { - ctx, span := logz.Span(ctx) +func Init(ctx context.Context) error { + ctx, span := lg.Span(ctx) defer span.End() - es.Register(ctx, "mem", &memstore{}) + return es.Register(ctx, "mem", &memstore{}) } var _ driver.Driver = (*memstore)(nil) func (memstore) Open(ctx context.Context, name string) (driver.Driver, error) { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() s := &state{streams: make(map[string]*locker.Locked[event.Events])} return &memstore{locker.New(s)}, nil } func (m *memstore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() el := &eventLog{streamID: streamID} err := m.state.Modify(ctx, func(state *state) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() l, ok := state.streams[streamID] @@ -70,16 +70,16 @@ var _ driver.EventLog = (*eventLog)(nil) // Append implements driver.EventStore func (m *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() event.SetStreamID(m.streamID, events...) return uint64(len(events)), m.events.Modify(ctx, func(stream *event.Events) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() - span.AddEvent(fmt.Sprintf(" %s %#v %d", m.streamID, stream, len(*stream))) + span.AddEvent(fmt.Sprintf(" %s %d", m.streamID, len(*stream))) last := uint64(len(*stream)) if version != AppendOnly && version != last { @@ -100,16 +100,16 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint // Read implements driver.EventStore func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() var events event.Events err := m.events.Modify(ctx, func(stream *event.Events) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() - span.AddEvent(fmt.Sprintf(" %s %#v %d", m.streamID, stream, len(*stream))) + span.AddEvent(fmt.Sprintf("%s %d", m.streamID, len(*stream))) first := stream.First().EventMeta().Position last := stream.Last().EventMeta().Position @@ -154,7 +154,7 @@ func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Even // FirstIndex for the streamID func (m *eventLog) FirstIndex(ctx context.Context) (uint64, error) { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() events, err := m.events.Copy(ctx) @@ -163,7 +163,7 @@ func (m *eventLog) FirstIndex(ctx context.Context) (uint64, error) { // LastIndex for the streamID func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() events, err := m.events.Copy(ctx) diff --git a/pkg/es/driver/projecter/projecter.go b/pkg/es/driver/projecter/projecter.go new file mode 100644 index 0000000..28a5309 --- /dev/null +++ b/pkg/es/driver/projecter/projecter.go @@ -0,0 +1,127 @@ +package projecter + +import ( + "context" + "strings" + + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/es/driver" + "github.com/sour-is/ev/pkg/es/event" +) + +type projector struct { + up driver.Driver +} + +func New(ctx context.Context) *projector { + return &projector{} +} +func (p *projector) Apply(e *es.EventStore) { + p.up = e.Driver + e.Driver = p +} +func (s *projector) Unwrap() driver.Driver { + return s.up +} +func (s *projector) Open(ctx context.Context, dsn string) (driver.Driver, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + return s.up.Open(ctx, dsn) +} +func (s *projector) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + l, err := s.up.EventLog(ctx, streamID) + return &wrapper{l, s}, err +} + +type wrapper struct { + up driver.EventLog + projector *projector +} + +var _ driver.EventLog = (*wrapper)(nil) + +func (w *wrapper) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + return w.up.Read(ctx, pos, count) +} +func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + i, err := w.up.Append(ctx, events, version) + if err != nil { + return i, err + } + + { + ctx, span := lg.Fork(ctx) + + go func() { + defer span.End() + + var pevents []event.Event + + for i := range events { + e := events[i] + eventType := event.TypeOf(e) + streamID := e.EventMeta().StreamID + streamPos := e.EventMeta().Position + + e1 := event.NewPtr(streamID, streamPos) + event.SetStreamID("$all", e1) + + e2 := event.NewPtr(streamID, streamPos) + event.SetStreamID("$type-"+eventType, e2) + + e3 := event.NewPtr(streamID, streamPos) + pkg, _, _ := strings.Cut(eventType, ".") + event.SetStreamID("$pkg-"+pkg, e3) + + pevents = append( + pevents, + e1, + e2, + e3, + ) + } + + for i := range pevents { + e := pevents[i] + l, err := w.projector.up.EventLog(ctx, event.StreamID(e).StreamID()) + if err != nil { + span.RecordError(err) + continue + } + _, err = l.Append(ctx, event.NewEvents(e), es.AppendOnly) + span.RecordError(err) + } + }() + } + + return i, err +} +func (w *wrapper) FirstIndex(ctx context.Context) (uint64, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + return w.up.FirstIndex(ctx) +} +func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + return w.up.LastIndex(ctx) +} +func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + return w.up.LoadForUpdate(ctx, a, fn) +} diff --git a/pkg/es/driver/streamer/streamer.go b/pkg/es/driver/streamer/streamer.go index ccc6507..5faa065 100644 --- a/pkg/es/driver/streamer/streamer.go +++ b/pkg/es/driver/streamer/streamer.go @@ -2,12 +2,15 @@ package streamer import ( "context" + "fmt" - "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/locker" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type state struct { @@ -20,7 +23,7 @@ type streamer struct { } func New(ctx context.Context) *streamer { - ctx, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() return &streamer{state: locker.New(&state{subscribers: map[string][]*subscription{}})} @@ -39,13 +42,13 @@ func (s *streamer) Unwrap() driver.Driver { var _ driver.Driver = (*streamer)(nil) func (s *streamer) Open(ctx context.Context, dsn string) (driver.Driver, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() return s.up.Open(ctx, dsn) } func (s *streamer) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() l, err := s.up.EventLog(ctx, streamID) @@ -55,7 +58,7 @@ func (s *streamer) EventLog(ctx context.Context, streamID string) (driver.EventL var _ driver.EventStream = (*streamer)(nil) func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) (driver.Subscription, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() events, err := s.up.EventLog(ctx, streamID) @@ -75,22 +78,24 @@ func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) }) } func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() return s.state.Modify(ctx, func(state *state) error { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() + span.AddEvent(fmt.Sprint("subscribers=", len(state.subscribers[streamID]))) for _, sub := range state.subscribers[streamID] { err := sub.position.Modify(ctx, func(position *position) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() position.size = int64(events.Last().EventMeta().Position - uint64(position.idx)) if position.wait != nil { close(position.wait) + position.link = trace.LinkFromContext(ctx, attribute.String("src", "event")) position.wait = nil } return nil @@ -102,17 +107,16 @@ func (s *streamer) Send(ctx context.Context, streamID string, events event.Event return nil }) } - func (s *streamer) delete(streamID string, sub *subscription) func(context.Context) error { return func(ctx context.Context) error { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() if err := ctx.Err(); err != nil { return err } return s.state.Modify(ctx, func(state *state) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() lis := state.subscribers[streamID] @@ -138,44 +142,53 @@ type wrapper struct { var _ driver.EventLog = (*wrapper)(nil) func (w *wrapper) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() return w.up.Read(ctx, pos, count) } - func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() i, err := w.up.Append(ctx, events, version) if err != nil { return i, err } - return i, w.streamer.Send(ctx, w.topic, events) -} + ctx, span = lg.Fork(ctx) + go func() { + defer span.End() + + err := w.streamer.Send(ctx, w.topic, events) + span.RecordError(err) + }() + + return i, nil +} func (w *wrapper) FirstIndex(ctx context.Context) (uint64, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() return w.up.FirstIndex(ctx) } - func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() return w.up.LastIndex(ctx) } - func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { + ctx, span := lg.Span(ctx) + defer span.End() + return w.up.LoadForUpdate(ctx, a, fn) } type position struct { size int64 idx int64 + link trace.Link wait chan struct{} } @@ -189,13 +202,13 @@ type subscription struct { } func (s *subscription) Recv(ctx context.Context) bool { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() var wait func(context.Context) bool err := s.position.Modify(ctx, func(position *position) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() if position.size == es.AllEvents { @@ -204,11 +217,17 @@ func (s *subscription) Recv(ctx context.Context) bool { if position.size == 0 { position.wait = make(chan struct{}) wait = func(ctx context.Context) bool { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() select { case <-position.wait: + if position.link.SpanContext.IsValid() { + _, span := lg.Span(ctx, trace.WithLinks(position.link)) + span.AddEvent("recv event") + span.End() + position.link = trace.Link{} + } return true case <-ctx.Done(): @@ -232,12 +251,12 @@ func (s *subscription) Recv(ctx context.Context) bool { return true } func (s *subscription) Events(ctx context.Context) (event.Events, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() var events event.Events return events, s.position.Modify(ctx, func(position *position) error { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() var err error @@ -253,7 +272,7 @@ func (s *subscription) Events(ctx context.Context) (event.Events, error) { }) } func (s *subscription) Close(ctx context.Context) error { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() return s.unsub(ctx) diff --git a/pkg/es/es.go b/pkg/es/es.go index 35ae4db..e5604eb 100644 --- a/pkg/es/es.go +++ b/pkg/es/es.go @@ -6,7 +6,7 @@ import ( "fmt" "strings" - "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/locker" @@ -25,11 +25,8 @@ var ( drivers = locker.New(&config{drivers: make(map[string]driver.Driver)}) ) -func Register(ctx context.Context, name string, d driver.Driver) error { - ctx, span := logz.Span(ctx) - defer span.End() - - m := logz.Meter(ctx) +func Init(ctx context.Context) error { + m := lg.Meter(ctx) var err, errs error Mes_open, err = m.SyncInt64().Counter("es_open") @@ -47,15 +44,20 @@ func Register(ctx context.Context, name string, d driver.Driver) error { Mes_append, err = m.SyncInt64().Counter("es_append") errs = multierr.Append(errs, err) - err = drivers.Modify(ctx, func(c *config) error { + return errs +} + +func Register(ctx context.Context, name string, d driver.Driver) error { + ctx, span := lg.Span(ctx) + defer span.End() + + return drivers.Modify(ctx, func(c *config) error { if _, set := c.drivers[name]; set { return fmt.Errorf("driver %s already set", name) } c.drivers[name] = d return nil }) - - return multierr.Append(errs, err) } type EventStore struct { @@ -71,7 +73,7 @@ var ( ) func Open(ctx context.Context, dsn string, options ...Option) (*EventStore, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() Mes_open.Add(ctx, 1) @@ -107,7 +109,7 @@ type Option interface { } func (es *EventStore) Save(ctx context.Context, agg event.Aggregate) (uint64, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() events := agg.Events(true) @@ -131,7 +133,7 @@ func (es *EventStore) Save(ctx context.Context, agg event.Aggregate) (uint64, er return count, err } func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() Mes_load.Add(ctx, 1) @@ -151,7 +153,7 @@ func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error { return nil } func (es *EventStore) Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() Mes_read.Add(ctx, 1) @@ -163,7 +165,7 @@ func (es *EventStore) Read(ctx context.Context, streamID string, pos, count int6 return l.Read(ctx, pos, count) } func (es *EventStore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() Mes_append.Add(ctx, 1) @@ -175,7 +177,7 @@ func (es *EventStore) Append(ctx context.Context, streamID string, events event. return l.Append(ctx, events, AppendOnly) } func (es *EventStore) FirstIndex(ctx context.Context, streamID string) (uint64, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() l, err := es.Driver.EventLog(ctx, streamID) @@ -185,7 +187,7 @@ func (es *EventStore) FirstIndex(ctx context.Context, streamID string) (uint64, return l.FirstIndex(ctx) } func (es *EventStore) LastIndex(ctx context.Context, streamID string) (uint64, error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() l, err := es.Driver.EventLog(ctx, streamID) @@ -224,10 +226,14 @@ type PA[T any] interface { event.Aggregate *T } +type PE[T any] interface { + event.Event + *T +} // Create uses fn to create a new aggregate and store in db. func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() agg = new(A) @@ -257,7 +263,7 @@ func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string // Update uses fn to update an exsisting aggregate and store in db. func Update[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) { - ctx, span := logz.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() agg = new(A) @@ -281,3 +287,30 @@ func Update[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string return } + +// Update uses fn to update an exsisting aggregate and store in db. +func Upsert[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) { + ctx, span := lg.Span(ctx) + defer span.End() + + agg = new(A) + agg.SetStreamID(streamID) + + if err = es.Load(ctx, agg); err != nil { + return + } + + if err = fn(ctx, agg); err != nil { + return + } + + if err = event.ShouldExist(agg); err != nil { + return + } + + if _, err = es.Save(ctx, agg); err != nil { + return + } + + return +} diff --git a/pkg/es/es.graphqls b/pkg/es/es.graphqls index 836253c..59fc253 100644 --- a/pkg/es/es.graphqls +++ b/pkg/es/es.graphqls @@ -4,4 +4,21 @@ type Meta @goModel(model: "github.com/sour-is/ev/pkg/es/event.Meta") { streamID: String! created: Time! position: Int! +} + +extend type Query { + events(streamID: String! paging: PageInput): Connection! +} +extend type Subscription { + """after == 0 start from begining, after == -1 start from end""" + eventAdded(streamID: String! after: Int! = -1): Event +} + +type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEvent") { + id: ID! + + eventID: String! + values: Map! + bytes: String! + meta: Meta! } \ No newline at end of file diff --git a/pkg/es/event/aggregate.go b/pkg/es/event/aggregate.go index 6b0e9f3..1e00436 100644 --- a/pkg/es/event/aggregate.go +++ b/pkg/es/event/aggregate.go @@ -29,16 +29,16 @@ func Append(a Aggregate, lis ...Event) { // NotExists returns error if there are no events present. func NotExists(a Aggregate) error { - if a.StreamVersion() != 0 { - return fmt.Errorf("%w, got version == %d", ErrShouldNotExist, a.StreamVersion()) + if a.Version() != 0 { + return fmt.Errorf("%w, got version == %d", ErrShouldNotExist, a.Version()) } return nil } // ShouldExists returns error if there are no events present. func ShouldExist(a Aggregate) error { - if a.StreamVersion() == 0 { - return fmt.Errorf("%w, got version == %d", ErrShouldExist, a.StreamVersion()) + if a.Version() == 0 { + return fmt.Errorf("%w, got version == %d", ErrShouldExist, a.Version()) } return nil } diff --git a/pkg/es/event/events.go b/pkg/es/event/events.go index be42a2d..b81a57f 100644 --- a/pkg/es/event/events.go +++ b/pkg/es/event/events.go @@ -1,11 +1,13 @@ package event import ( + "context" "crypto/rand" "encoding" "encoding/json" "fmt" "io" + "strconv" "strings" "sync" "time" @@ -129,6 +131,10 @@ func (m Meta) Created() time.Time { } func (m Meta) GetEventID() string { return m.EventID.String() } +func Init(ctx context.Context) error { + return Register(ctx, NilEvent, &eventPtr{}) +} + type nilEvent struct{} func (*nilEvent) EventMeta() Meta { @@ -144,3 +150,61 @@ func (e *nilEvent) MarshalBinary() ([]byte, error) { func (e *nilEvent) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, e) } + +type eventPtr struct { + streamID string + pos uint64 + + eventMeta Meta +} + +var _ Event = (*eventPtr)(nil) + +func NewPtr(streamID string, pos uint64) *eventPtr { + return &eventPtr{streamID: streamID, pos: pos} +} + +// MarshalBinary implements Event +func (e *eventPtr) MarshalBinary() (data []byte, err error) { + return []byte(fmt.Sprintf("%s@%d", e.streamID, e.pos)), nil +} + +// UnmarshalBinary implements Event +func (e *eventPtr) UnmarshalBinary(data []byte) error { + s := string(data) + idx := strings.LastIndex(s, "@") + if idx == -1 { + return fmt.Errorf("missing @ in: %s", s) + } + e.streamID = s[:idx] + var err error + e.pos, err = strconv.ParseUint(s[idx+1:], 10, 64) + + return err +} + +// EventMeta implements Event +func (e *eventPtr) EventMeta() Meta { + if e == nil { + return Meta{} + } + return e.eventMeta +} + +// SetEventMeta implements Event +func (e *eventPtr) SetEventMeta(m Meta) { + if e == nil { + return + } + e.eventMeta = m +} + +func (e *eventPtr) Values() any { + return struct { + StreamID string `json:"stream_id"` + Pos uint64 `json:"pos"` + }{ + e.streamID, + e.pos, + } +} diff --git a/pkg/es/event/reflect.go b/pkg/es/event/reflect.go index f6faef5..2610d57 100644 --- a/pkg/es/event/reflect.go +++ b/pkg/es/event/reflect.go @@ -9,7 +9,7 @@ import ( "reflect" "strings" - "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/locker" ) @@ -67,7 +67,7 @@ func (u *UnknownEvent) MarshalBinary() ([]byte, error) { // Register a type container for Unmarshalling values into. The type must implement Event and not be a nil value. func Register(ctx context.Context, lis ...Event) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() for _, e := range lis { @@ -84,7 +84,7 @@ func Register(ctx context.Context, lis ...Event) error { return nil } func RegisterName(ctx context.Context, name string, e Event) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() if e == nil { @@ -107,7 +107,7 @@ func RegisterName(ctx context.Context, name string, e Event) error { span.AddEvent("register: " + name) if err := eventTypes.Modify(ctx, func(c *config) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() c.eventTypes[name] = typ @@ -119,13 +119,13 @@ func RegisterName(ctx context.Context, name string, e Event) error { return nil } func GetContainer(ctx context.Context, s string) Event { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() var e Event eventTypes.Modify(ctx, func(c *config) error { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() typ, ok := c.eventTypes[s] @@ -176,7 +176,7 @@ func MarshalBinary(e Event) (txt []byte, err error) { } func UnmarshalBinary(ctx context.Context, txt []byte, pos uint64) (e Event, err error) { - _, span := logz.Span(ctx) + _, span := lg.Span(ctx) defer span.End() sp := bytes.SplitN(txt, []byte{'\t'}, 4) @@ -246,3 +246,29 @@ func embedJSON(s string) json.RawMessage { } return []byte(fmt.Sprintf(`"%s"`, strings.Replace(s, `"`, `\"`, -1))) } + +func Values(e Event) map[string]any { + var a any = e + + if e, ok := e.(interface{ Values() any }); ok { + a = e.Values() + } + + m := make(map[string]any) + v := reflect.Indirect(reflect.ValueOf(a)) + for _, idx := range reflect.VisibleFields(v.Type()) { + if !idx.IsExported() { + continue + } + + field := v.FieldByIndex(idx.Index) + + name := idx.Name + if n, ok := idx.Tag.Lookup("json"); ok { + name = n + } + + m[name] = field.Interface() + } + return m +} diff --git a/pkg/es/graph.go b/pkg/es/graph.go new file mode 100644 index 0000000..68f4ded --- /dev/null +++ b/pkg/es/graph.go @@ -0,0 +1,132 @@ +package es + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/gql" +) + +type EventResolver interface { + Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) + EventAdded(ctx context.Context, streamID string, after int64) (<-chan *GQLEvent, error) +} + +func (es *EventStore) Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + lis, err := es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30)) + if err != nil { + span.RecordError(err) + return nil, err + } + + edges := make([]gql.Edge, 0, len(lis)) + for i := range lis { + span.AddEvent(fmt.Sprint("event ", i, " of ", len(lis))) + edges = append(edges, &GQLEvent{lis[i]}) + } + + var first, last uint64 + if first, err = es.FirstIndex(ctx, streamID); err != nil { + span.RecordError(err) + return nil, err + } + if last, err = es.LastIndex(ctx, streamID); err != nil { + span.RecordError(err) + return nil, err + } + + return &gql.Connection{ + Paging: &gql.PageInfo{ + Next: lis.Last().EventMeta().Position < last, + Prev: lis.First().EventMeta().Position > first, + Begin: lis.First().EventMeta().Position, + End: lis.Last().EventMeta().Position, + }, + Edges: edges, + }, nil +} +func (e *EventStore) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *GQLEvent, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + es := e.EventStream() + if es == nil { + return nil, fmt.Errorf("EventStore does not implement streaming") + } + + sub, err := es.Subscribe(ctx, streamID, after) + if err != nil { + span.RecordError(err) + return nil, err + } + + ch := make(chan *GQLEvent) + + go func() { + ctx, span := lg.Span(ctx) + defer span.End() + + { + ctx, span := lg.Fork(ctx) + defer func() { + defer span.End() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + err := sub.Close(ctx) + span.RecordError(err) + }() + } + + for sub.Recv(ctx) { + events, err := sub.Events(ctx) + if err != nil { + span.RecordError(err) + break + } + span.AddEvent(fmt.Sprintf("received %d events", len(events))) + + for i := range events { + select { + case ch <- &GQLEvent{events[i]}: + continue + case <-ctx.Done(): + return + } + + } + } + }() + + return ch, nil +} +func (*EventStore) RegisterHTTP(*http.ServeMux) {} + +type GQLEvent struct { + e event.Event +} + +func (e *GQLEvent) ID() string { + return fmt.Sprint(e.e.EventMeta().StreamID, "@", e.e.EventMeta().Position) +} +func (e *GQLEvent) EventID() string { + return e.e.EventMeta().GetEventID() +} +func (e *GQLEvent) Values() map[string]interface{} { + return event.Values(e.e) +} +func (e *GQLEvent) Bytes() (string, error) { + b, err := e.e.MarshalBinary() + return string(b), err +} +func (e *GQLEvent) Meta() *event.Meta { + meta := e.e.EventMeta() + return &meta +} +func (e *GQLEvent) IsEdge() {} diff --git a/pkg/locker/locker.go b/pkg/locker/locker.go index b7bf629..521b596 100644 --- a/pkg/locker/locker.go +++ b/pkg/locker/locker.go @@ -2,6 +2,8 @@ package locker import ( "context" + + "github.com/sour-is/ev/internal/lg" ) type Locked[T any] struct { @@ -18,6 +20,9 @@ func New[T any](initial *T) *Locked[T] { // Modify will call the function with the locked value func (s *Locked[T]) Modify(ctx context.Context, fn func(*T) error) error { + _, span := lg.Span(ctx) + defer span.End() + if ctx.Err() != nil { return ctx.Err() } diff --git a/pkg/set/set.go b/pkg/set/set.go new file mode 100644 index 0000000..cbd238b --- /dev/null +++ b/pkg/set/set.go @@ -0,0 +1,35 @@ +package set + +import ( + "fmt" + "strings" +) + +type Set[T comparable] map[T]struct{} + +func New[T comparable](items ...T) Set[T] { + s := make(map[T]struct{}, len(items)) + for i := range items { + s[items[i]] = struct{}{} + } + return s +} +func (s Set[T]) Has(v T) bool { + _, ok := (s)[v] + return ok +} +func (s Set[T]) String() string { + if s == nil { + return "set()" + } + lis := make([]string, 0, len(s)) + for k := range s { + lis = append(lis, fmt.Sprint(k)) + } + + var b strings.Builder + b.WriteString("set(") + b.WriteString(strings.Join(lis, ",")) + b.WriteString(")") + return b.String() +}