From 6569c58e37645a35f3fffd3f4607b2485847b26c Mon Sep 17 00:00:00 2001 From: Jon Lundy Date: Sun, 30 Oct 2022 09:18:08 -0600 Subject: [PATCH] feat: add resolvelinks --- app/gql/resolver.go | 44 ++++++++- app/peerfinder/peer.go | 72 ++++++++++---- app/peerfinder/service.go | 115 ++++++++++++++++++---- go.mod | 2 +- httpmux.go | 5 + main.go | 18 +++- pkg/es/driver/disk-store/disk-store.go | 86 ++++++++++++---- pkg/es/driver/driver.go | 1 + pkg/es/driver/mem-store/mem-store.go | 70 +++++++++++-- pkg/es/driver/projecter/projecter.go | 10 +- pkg/es/driver/projecter/projector_test.go | 8 ++ pkg/es/driver/streamer/streamer.go | 18 ++-- pkg/es/es.go | 26 ++++- pkg/es/event/events.go | 34 +++---- pkg/es/event/reflect.go | 4 +- pkg/es/graph.go | 2 +- pkg/locker/locker.go | 6 +- pkg/locker/locker_test.go | 6 +- 18 files changed, 421 insertions(+), 106 deletions(-) diff --git a/app/gql/resolver.go b/app/gql/resolver.go index a62737d..4dcba3f 100644 --- a/app/gql/resolver.go +++ b/app/gql/resolver.go @@ -7,9 +7,16 @@ import ( "os" "reflect" "runtime/debug" + "time" + "github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql/handler" + "github.com/99designs/gqlgen/graphql/handler/extension" + "github.com/99designs/gqlgen/graphql/handler/lru" + "github.com/99designs/gqlgen/graphql/handler/transport" + "github.com/gorilla/websocket" "github.com/ravilushqa/otelgqlgen" + "github.com/sour-is/ev/app/gql/graphiql" "github.com/sour-is/ev/app/gql/playground" "github.com/sour-is/ev/app/msgbus" "github.com/sour-is/ev/app/salty" @@ -86,11 +93,13 @@ func (r *Resolver) ChainMiddlewares(h http.Handler) http.Handler { } func (r *Resolver) RegisterHTTP(mux *http.ServeMux) { - gql := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: r})) + gql := NewServer(generated.NewExecutableSchema(generated.Config{Resolvers: r})) + gql.SetRecoverFunc(NoopRecover) gql.Use(otelgqlgen.Middleware()) - mux.Handle("/", playground.Handler("GraphQL playground", "/gql")) + mux.Handle("/graphiql", graphiql.Handler("GraphiQL playground", "/gql")) mux.Handle("/gql", lg.Htrace(r.ChainMiddlewares(gql), "gql")) + mux.Handle("/playground", playground.Handler("GraphQL playground", "/gql")) } type noop struct{} @@ -130,3 +139,34 @@ func (*noop) EventAdded(ctx context.Context, streamID string, after int64) (<-ch } func (*noop) RegisterHTTP(*http.ServeMux) {} + +func NewServer(es graphql.ExecutableSchema) *handler.Server { + srv := handler.New(es) + + srv.AddTransport(transport.Websocket{ + Upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + switch r.Header.Get("Origin") { + case "https://ev.sour.is", "https://www.graphqlbin.com": + return true + default: + return false + } + }, + }, + KeepAlivePingInterval: 10 * time.Second, + }) + srv.AddTransport(transport.Options{}) + srv.AddTransport(transport.GET{}) + srv.AddTransport(transport.POST{}) + srv.AddTransport(transport.MultipartForm{}) + + srv.SetQueryCache(lru.New(1000)) + + srv.Use(extension.Introspection{}) + srv.Use(extension.AutomaticPersistedQuery{ + Cache: lru.New(100), + }) + + return srv +} diff --git a/app/peerfinder/peer.go b/app/peerfinder/peer.go index 78954e6..333ca3e 100644 --- a/app/peerfinder/peer.go +++ b/app/peerfinder/peer.go @@ -11,26 +11,58 @@ import ( "github.com/sour-is/ev/pkg/es/event" ) -type Request struct { +type Request struct{ + event.AggregateRoot + + RequestIP string `json:"req_ip"` + Hidden bool `json:"hide,omitempty"` + + Responses []Response `json:"responses"` +} + +var _ event.Aggregate = (*Request)(nil) +func (a *Request) ApplyEvent(lis ...event.Event) { + for _, e := range lis { + switch e:=e.(type) { + case *RequestSubmitted: + a.RequestIP = e.RequestIP + a.Hidden = e.Hidden + case *ResultSubmitted: + a.Responses = append(a.Responses, Response{ + PeerID: e.PeerID, + PeerVersion: e.PeerVersion, + Latency: e.Latency, + }) + } + } +} + +type Response struct { + PeerID string `json:"peer_id"` + PeerVersion string `json:"peer_version"` + Latency float64 `json:"latency,omitempty"` +} + +type RequestSubmitted struct { eventMeta event.Meta RequestIP string `json:"req_ip"` Hidden bool `json:"hide,omitempty"` } -func (r *Request) StreamID() string { +func (r *RequestSubmitted) StreamID() string { return r.EventMeta().GetEventID() } -func (r *Request) RequestID() string { +func (r *RequestSubmitted) RequestID() string { return r.EventMeta().GetEventID() } -func (r *Request) Created() time.Time { +func (r *RequestSubmitted) Created() time.Time { return r.EventMeta().Created() } -func (r *Request) CreatedString() string { +func (r *RequestSubmitted) CreatedString() string { return r.Created().Format("2006-01-02 15:04:05") } -func (r *Request) Family() int { +func (r *RequestSubmitted) Family() int { if r == nil { return 0 } @@ -46,26 +78,26 @@ func (r *Request) Family() int { } } -var _ event.Event = (*Request)(nil) +var _ event.Event = (*RequestSubmitted)(nil) -func (e *Request) EventMeta() event.Meta { +func (e *RequestSubmitted) EventMeta() event.Meta { if e == nil { return event.Meta{} } return e.eventMeta } -func (e *Request) SetEventMeta(m event.Meta) { +func (e *RequestSubmitted) SetEventMeta(m event.Meta) { if e != nil { e.eventMeta = m } } -func (e *Request) MarshalBinary() (text []byte, err error) { +func (e *RequestSubmitted) MarshalBinary() (text []byte, err error) { return json.Marshal(e) } -func (e *Request) UnmarshalBinary(b []byte) error { +func (e *RequestSubmitted) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, e) } -func (e *Request) MarshalEnviron() ([]byte, error) { +func (e *RequestSubmitted) MarshalEnviron() ([]byte, error) { if e == nil { return nil, nil } @@ -92,7 +124,7 @@ func (e *Request) MarshalEnviron() ([]byte, error) { return b.Bytes(), nil } -type Result struct { +type ResultSubmitted struct { eventMeta event.Meta RequestID string `json:"req_id"` @@ -101,30 +133,30 @@ type Result struct { Latency float64 `json:"latency,omitempty"` } -func (r *Result) Created() time.Time { +func (r *ResultSubmitted) Created() time.Time { return r.eventMeta.Created() } -var _ event.Event = (*Result)(nil) +var _ event.Event = (*ResultSubmitted)(nil) -func (e *Result) EventMeta() event.Meta { +func (e *ResultSubmitted) EventMeta() event.Meta { if e == nil { return event.Meta{} } return e.eventMeta } -func (e *Result) SetEventMeta(m event.Meta) { +func (e *ResultSubmitted) SetEventMeta(m event.Meta) { if e != nil { e.eventMeta = m } } -func (e *Result) MarshalBinary() (text []byte, err error) { +func (e *ResultSubmitted) MarshalBinary() (text []byte, err error) { return json.Marshal(e) } -func (e *Result) UnmarshalBinary(b []byte) error { +func (e *ResultSubmitted) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, e) } -func (e *Result) String() string { +func (e *ResultSubmitted) String() string { return fmt.Sprintf("id: %s\npeer: %s\nversion: %s\nlatency: %0.4f", e.RequestID, e.PeerID, e.PeerVersion, e.Latency) } diff --git a/app/peerfinder/service.go b/app/peerfinder/service.go index eaa4627..fdc086d 100644 --- a/app/peerfinder/service.go +++ b/app/peerfinder/service.go @@ -2,8 +2,12 @@ package peerfinder import ( "context" + "embed" "encoding/json" + "html/template" "io" + "io/fs" + "log" "net" "net/http" "strconv" @@ -16,24 +20,39 @@ import ( "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/locker" ) const ( - queueRequests = "pf-requests" - queueResponses = "pf-response-" aggInfo = "pf-info" + queueRequests = "pf-requests" + queueResponses = "pf-request-" + queuePeers = "pf-peer-" initVersion = "1.1.0" ) +var ( + //go:embed pages/* layouts/* assets/* + files embed.FS + templates map[string]*template.Template +) + type service struct { es *es.EventStore + + State locker.Locked[state] +} + +type state struct { + Version string + Requests []Request } func New(ctx context.Context, es *es.EventStore) (*service, error) { ctx, span := lg.Span(ctx) defer span.End() - if err := event.Register(ctx, &Request{}, &Result{}, &VersionChanged{}); err != nil { + if err := event.Register(ctx, &RequestSubmitted{}, &ResultSubmitted{}, &VersionChanged{}); err != nil { span.RecordError(err) return nil, err } @@ -43,8 +62,13 @@ func New(ctx context.Context, es *es.EventStore) (*service, error) { return svc, nil } func (s *service) RegisterHTTP(mux *http.ServeMux) { - mux.Handle("/peers/", lg.Htrace(s, "peers")) + loadTemplates() + a, err := fs.Sub(files, "assets") + log.Println(err) + assets := http.StripPrefix("/peers/assets/", http.FileServer(http.FS(a))) + mux.Handle("/peers/assets/", lg.Htrace(assets, "peer-assets")) + mux.Handle("/peers/", lg.Htrace(s, "peers")) } func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -66,7 +90,8 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return default: - w.WriteHeader(http.StatusNotFound) + t := templates["home.tpl"] + t.Execute(w, nil) return } case http.MethodPost: @@ -113,7 +138,7 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string return } - responses, err := s.es.Read(ctx, queueResponses+uuid, -1, -30) + responses, err := s.es.Read(ctx, queuePeers+uuid, -1, -30) if err != nil { span.RecordError(err) w.WriteHeader(http.StatusInternalServerError) @@ -199,19 +224,17 @@ func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } - - req := &Request{ + req := &RequestSubmitted{ RequestIP: ip.String(), } + if hidden, err := strconv.ParseBool(r.Form.Get("req_hidden")); err != nil { + req.Hidden = hidden + } span.SetAttributes( attribute.Stringer("req_ip", ip), ) - if hidden, err := strconv.ParseBool(r.Form.Get("req_hidden")); err != nil { - req.Hidden = hidden - } - s.es.Append(ctx, queueRequests, event.NewEvents(req)) } func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) { @@ -238,25 +261,36 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) return } - req := &Result{ + req := &ResultSubmitted{ RequestID: id, PeerID: r.Form.Get("peer_id"), PeerVersion: r.Form.Get("peer_version"), Latency: latency, } + span.SetAttributes( attribute.Stringer("result", req), ) - s.es.Append(ctx, queueResponses+id, event.NewEvents(req)) + idx, err := s.es.LastIndex(ctx, queueResponses+id) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + if idx == 0 { + w.WriteHeader(http.StatusNotFound) + return + } + + s.es.Append(ctx, queueRequests, event.NewEvents(req)) } -func filter(requests, responses event.Events) *Request { +func filter(requests, responses event.Events) *RequestSubmitted { have := make(map[string]struct{}, len(responses)) - for _, res := range toList[Result](responses...) { + for _, res := range toList[ResultSubmitted](responses...) { have[res.RequestID] = struct{}{} } - for _, req := range reverse(toList[Request](requests...)...) { + for _, req := range reverse(toList[RequestSubmitted](requests...)...) { if _, ok := have[req.RequestID()]; !ok { return req } @@ -297,3 +331,50 @@ func encodeTo(w io.Writer, fns ...func() ([]byte, error)) (int, error) { } return i, nil } +func loadTemplates() error { + if templates != nil { + return nil + } + templates = make(map[string]*template.Template) + tmplFiles, err := fs.ReadDir(files, "pages") + if err != nil { + return err + } + + for _, tmpl := range tmplFiles { + if tmpl.IsDir() { + continue + } + log.Println(tmpl.Name()) + pt, err := template.ParseFS(files, "pages/"+tmpl.Name(), "layouts/*.tpl") + if err != nil { + return err + } + + templates[tmpl.Name()] = pt + } + return nil +} + +func Projector(e event.Event) []event.Event { + m := e.EventMeta() + streamID := m.StreamID + streamPos := m.Position + + switch e := e.(type) { + case *RequestSubmitted: + e1 := event.NewPtr(streamID, streamPos) + event.SetStreamID(queueResponses+e.RequestID(), e1) + + return []event.Event{e1} + case *ResultSubmitted: + e1 := event.NewPtr(streamID, streamPos) + event.SetStreamID(queueResponses+e.RequestID, e1) + + e2 := event.NewPtr(streamID, streamPos) + event.SetStreamID(queuePeers+e.PeerID, e2) + + return []event.Event{e1, e2} + } + return nil +} diff --git a/go.mod b/go.mod index 599e50c..2314b08 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/logrusorgru/aurora v2.0.3+incompatible // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/oklog/ulid v1.3.1 // indirect + github.com/oklog/ulid v1.3.1 github.com/onsi/ginkgo v1.14.0 // indirect github.com/onsi/gomega v1.10.3 // indirect github.com/petermattis/goid v0.0.0-20220331194723-8ee3e6ded87a // indirect diff --git a/httpmux.go b/httpmux.go index a86cd18..811ac58 100644 --- a/httpmux.go +++ b/httpmux.go @@ -34,3 +34,8 @@ func newMux() *mux { return mux } + +type RegisterHTTP func(*http.ServeMux) +func (fn RegisterHTTP) RegisterHTTP(mux *http.ServeMux) { + fn(mux) +} \ No newline at end of file diff --git a/main.go b/main.go index 8c1adbe..9893f57 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ import ( diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" "github.com/sour-is/ev/pkg/es/driver/projecter" + resolvelinks "github.com/sour-is/ev/pkg/es/driver/resolve-links" "github.com/sour-is/ev/pkg/es/driver/streamer" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/set" @@ -61,10 +62,15 @@ func run(ctx context.Context) error { } es, err := es.Open( - ctx, - env("EV_DATA", "mem:"), - streamer.New(ctx), - projecter.New(ctx, projecter.DefaultProjection), + ctx, + env("EV_DATA", "mem:"), + streamer.New(ctx), + projecter.New( + ctx, + projecter.DefaultProjection, + peerfinder.Projector, + ), + resolvelinks.New(), ) if err != nil { span.RecordError(err) @@ -129,7 +135,9 @@ func run(ctx context.Context) error { } svcs = append(svcs, gql) } - svcs = append(svcs, lg.NewHTTP(ctx)) + svcs = append(svcs, lg.NewHTTP(ctx), RegisterHTTP(func(mux *http.ServeMux) { + mux.Handle("/", http.RedirectHandler("/playground", http.StatusTemporaryRedirect)) + })) s.Handler = httpMux(svcs...) diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 0dd165d..020e10d 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -94,7 +94,7 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) _, span := lg.Span(ctx) defer span.End() - l.Modify(ctx, func(w *wal.Log) error { + l.Modify(ctx, func(ctx context.Context, w *wal.Log) error { _, span := lg.Span(ctx) defer span.End() @@ -128,7 +128,7 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event el := &eventLog{streamID: streamID, diskStore: d} - return el, d.openlogs.Modify(ctx, func(openlogs *openlogs) error { + return el, d.openlogs.Modify(ctx, func(ctx context.Context, openlogs *openlogs) error { _, span := lg.Span(ctx) defer span.End() @@ -166,7 +166,7 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint event.SetStreamID(e.streamID, events...) var count uint64 - err := e.events.Modify(ctx, func(l *wal.Log) error { + err := e.events.Modify(ctx, func(ctx context.Context, l *wal.Log) error { _, span := lg.Span(ctx) defer span.End() @@ -215,7 +215,7 @@ func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, var events event.Events - err := e.events.Modify(ctx, func(stream *wal.Log) error { + err := e.events.Modify(ctx, func(ctx context.Context, stream *wal.Log) error { _, span := lg.Span(ctx) defer span.End() @@ -244,17 +244,7 @@ func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, span.AddEvent(fmt.Sprintf("read event %d of %d", i, len(events))) // --- - var b []byte - b, err = stream.Read(start) - if err != nil { - if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) { - err = fmt.Errorf("%w: empty", es.ErrNotFound) - } - - span.RecordError(err) - return err - } - events[i], err = event.UnmarshalBinary(ctx, b, start) + events[i], err = readStream(ctx, stream, start) if err != nil { span.RecordError(err) return err @@ -283,6 +273,21 @@ func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, return events, nil } +func (e *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) { + _, span := lg.Span(ctx) + defer span.End() + + var events event.Events + err := e.events.Modify(ctx, func(ctx context.Context, stream *wal.Log) error { + var err error + + events, err = readStreamN(ctx, stream, index...) + + return err + }) + + return events, err +} func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) { _, span := lg.Span(ctx) defer span.End() @@ -290,7 +295,7 @@ func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) { var idx uint64 var err error - err = e.events.Modify(ctx, func(events *wal.Log) error { + err = e.events.Modify(ctx, func(ctx context.Context, events *wal.Log) error { idx, err = events.FirstIndex() return err }) @@ -304,7 +309,7 @@ func (e *eventLog) LastIndex(ctx context.Context) (uint64, error) { var idx uint64 var err error - err = e.events.Modify(ctx, func(events *wal.Log) error { + err = e.events.Modify(ctx, func(ctx context.Context, events *wal.Log) error { idx, err = events.LastIndex() return err }) @@ -314,3 +319,50 @@ func (e *eventLog) LastIndex(ctx context.Context) (uint64, error) { func (e *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { panic("not implemented") } +func readStream(ctx context.Context, stream *wal.Log, index uint64) (event.Event, error) { + _, span := lg.Span(ctx) + defer span.End() + + var b []byte + var err error + b, err = stream.Read(index) + if err != nil { + if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) { + err = fmt.Errorf("%w: empty", es.ErrNotFound) + } + + span.RecordError(err) + return nil, err + } + e, err := event.UnmarshalBinary(ctx, b, index) + if err != nil { + span.RecordError(err) + return nil, err + } + return e, err +} +func readStreamN(ctx context.Context, stream *wal.Log, index ...uint64) (event.Events, error) { + _, span := lg.Span(ctx) + defer span.End() + + var b []byte + var err error + events := make(event.Events, len(index)) + for i, idx := range index { + b, err = stream.Read(idx) + if err != nil { + if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) { + err = fmt.Errorf("%w: empty", es.ErrNotFound) + } + + span.RecordError(err) + return nil, err + } + events[i], err = event.UnmarshalBinary(ctx, b, idx) + if err != nil { + span.RecordError(err) + return nil, err + } + } + return events, err +} \ No newline at end of file diff --git a/pkg/es/driver/driver.go b/pkg/es/driver/driver.go index acdf108..96deccd 100644 --- a/pkg/es/driver/driver.go +++ b/pkg/es/driver/driver.go @@ -14,6 +14,7 @@ type Driver interface { type EventLog interface { Read(ctx context.Context, after, count int64) (event.Events, error) + ReadN(ctx context.Context, index ...uint64) (event.Events, error) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) FirstIndex(context.Context) (uint64, error) LastIndex(context.Context) (uint64, error) diff --git a/pkg/es/driver/mem-store/mem-store.go b/pkg/es/driver/mem-store/mem-store.go index c9af2c4..eeaf78e 100644 --- a/pkg/es/driver/mem-store/mem-store.go +++ b/pkg/es/driver/mem-store/mem-store.go @@ -49,7 +49,7 @@ func (m *memstore) EventLog(ctx context.Context, streamID string) (driver.EventL el := &eventLog{streamID: streamID} - err := m.state.Modify(ctx, func(state *state) error { + err := m.state.Modify(ctx, func(ctx context.Context, state *state) error { _, span := lg.Span(ctx) defer span.End() @@ -76,7 +76,7 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint event.SetStreamID(m.streamID, events...) - return uint64(len(events)), m.events.Modify(ctx, func(stream *event.Events) error { + return uint64(len(events)), m.events.Modify(ctx, func(ctx context.Context, stream *event.Events) error { _, span := lg.Span(ctx) defer span.End() @@ -111,6 +111,23 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint }) } +// ReadOne implements readone +func (m *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) { + _, span := lg.Span(ctx) + defer span.End() + + var events event.Events + err := m.events.Modify(ctx, func(ctx context.Context, stream *event.Events) error { + var err error + + events, err = readStreamN(ctx, stream, index...) + + return err + }) + + return events, err +} + // Read implements driver.EventStore func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Events, error) { ctx, span := lg.Span(ctx) @@ -118,7 +135,7 @@ func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Ev var events event.Events - err := m.events.Modify(ctx, func(stream *event.Events) error { + err := m.events.Modify(ctx, func(ctx context.Context, stream *event.Events) error { _, span := lg.Span(ctx) defer span.End() @@ -141,12 +158,8 @@ func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Ev span.AddEvent(fmt.Sprintf("read event %d of %d", i, math.Abs(count))) // --- clone event - e := (*stream)[start-1] - b, err := event.MarshalBinary(e) - if err != nil { - return err - } - events[i], err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position) + var err error + events[i], err = readStream(ctx, stream, start) if err != nil { return err } @@ -194,3 +207,42 @@ func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) { func (m *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { panic("not implemented") } + +func readStream(ctx context.Context, stream *event.Events, index uint64) (event.Event, error) { + _, span := lg.Span(ctx) + defer span.End() + + var b []byte + var err error + e := (*stream)[index-1] + b, err = event.MarshalBinary(e) + if err != nil { + return nil, err + } + e, err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position) + if err != nil { + return nil, err + } + return e, err +} +func readStreamN(ctx context.Context, stream *event.Events, index ...uint64) (event.Events, error) { + _, span := lg.Span(ctx) + defer span.End() + + var b []byte + var err error + + events := make(event.Events, len(index)) + for i, index := range index { + e := (*stream)[index-1] + b, err = event.MarshalBinary(e) + if err != nil { + return nil, err + } + events[i], err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position) + if err != nil { + return nil, err + } + } + return events, err +} diff --git a/pkg/es/driver/projecter/projecter.go b/pkg/es/driver/projecter/projecter.go index 8a4b7ce..8809ab9 100644 --- a/pkg/es/driver/projecter/projecter.go +++ b/pkg/es/driver/projecter/projecter.go @@ -53,6 +53,12 @@ func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Eve return w.up.Read(ctx, after, count) } +func (w *wrapper) ReadN(ctx context.Context, index ...uint64) (event.Events, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + return w.up.ReadN(ctx, index...) +} func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -123,10 +129,11 @@ func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func( } func DefaultProjection(e event.Event) []event.Event { - eventType := event.TypeOf(e) m := e.EventMeta() streamID := m.StreamID streamPos := m.Position + eventType := event.TypeOf(e) + pkg, _, _ := strings.Cut(eventType, ".") e1 := event.NewPtr(streamID, streamPos) event.SetStreamID("$all", e1) @@ -135,7 +142,6 @@ func DefaultProjection(e event.Event) []event.Event { event.SetStreamID("$type-"+eventType, e2) e3 := event.NewPtr(streamID, streamPos) - pkg, _, _ := strings.Cut(eventType, ".") event.SetStreamID("$pkg-"+pkg, e3) return []event.Event{e1, e2, e3} diff --git a/pkg/es/driver/projecter/projector_test.go b/pkg/es/driver/projecter/projector_test.go index c9deec2..942b621 100644 --- a/pkg/es/driver/projecter/projector_test.go +++ b/pkg/es/driver/projecter/projector_test.go @@ -39,6 +39,7 @@ type mockEventLog struct { onFirstIndex func(context.Context) (uint64, error) onLastIndex func(context.Context) (uint64, error) onRead func(context.Context, int64, int64) (event.Events, error) + onReadN func(context.Context, ...uint64) (event.Events, error) } // Append implements driver.EventLog @@ -73,6 +74,13 @@ func (m *mockEventLog) Read(ctx context.Context, pos int64, count int64) (event. panic("unimplemented") } +func (m *mockEventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) { + if m.onReadN != nil { + return m.onReadN(ctx, index...) + } + + panic("unimplemented") +} var _ driver.EventLog = (*mockEventLog)(nil) diff --git a/pkg/es/driver/streamer/streamer.go b/pkg/es/driver/streamer/streamer.go index 206b417..5865b43 100644 --- a/pkg/es/driver/streamer/streamer.go +++ b/pkg/es/driver/streamer/streamer.go @@ -73,7 +73,7 @@ func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) }) sub.unsub = s.delete(streamID, sub) - return sub, s.state.Modify(ctx, func(state *state) error { + return sub, s.state.Modify(ctx, func(ctx context.Context, state *state) error { state.subscribers[streamID] = append(state.subscribers[streamID], sub) return nil }) @@ -82,13 +82,13 @@ func (s *streamer) Send(ctx context.Context, streamID string, events event.Event ctx, span := lg.Span(ctx) defer span.End() - return s.state.Modify(ctx, func(state *state) error { + return s.state.Modify(ctx, func(ctx context.Context, state *state) error { ctx, span := lg.Span(ctx) defer span.End() span.AddEvent(fmt.Sprint("subscribers=", len(state.subscribers[streamID]))) for _, sub := range state.subscribers[streamID] { - err := sub.position.Modify(ctx, func(position *position) error { + err := sub.position.Modify(ctx, func(ctx context.Context, position *position) error { _, span := lg.Span(ctx) defer span.End() @@ -116,7 +116,7 @@ func (s *streamer) delete(streamID string, sub *subscription) func(context.Conte if err := ctx.Err(); err != nil { return err } - return s.state.Modify(ctx, func(state *state) error { + return s.state.Modify(ctx, func(ctx context.Context, state *state) error { _, span := lg.Span(ctx) defer span.End() @@ -148,6 +148,12 @@ func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Eve return w.up.Read(ctx, after, count) } +func (w *wrapper) ReadN(ctx context.Context, index ...uint64) (event.Events, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + return w.up.ReadN(ctx, index...) +} func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -215,7 +221,7 @@ func (s *subscription) Recv(ctx context.Context) bool { var wait func(context.Context) bool - err := s.position.Modify(ctx, func(position *position) error { + err := s.position.Modify(ctx, func(ctx context.Context, position *position) error { _, span := lg.Span(ctx) defer span.End() @@ -263,7 +269,7 @@ func (s *subscription) Events(ctx context.Context) (event.Events, error) { defer span.End() var events event.Events - return events, s.position.Modify(ctx, func(position *position) error { + return events, s.position.Modify(ctx, func(ctx context.Context, position *position) error { ctx, span := lg.Span(ctx) defer span.End() diff --git a/pkg/es/es.go b/pkg/es/es.go index 45bc69d..9e3b10c 100644 --- a/pkg/es/es.go +++ b/pkg/es/es.go @@ -53,7 +53,7 @@ func Register(ctx context.Context, name string, d driver.Driver) error { ctx, span := lg.Span(ctx) defer span.End() - return drivers.Modify(ctx, func(c *config) error { + return drivers.Modify(ctx, func(ctx context.Context, c *config) error { if _, set := c.drivers[name]; set { return fmt.Errorf("driver %s already set", name) } @@ -189,6 +189,30 @@ func (es *EventStore) Read(ctx context.Context, streamID string, after, count in return events, err } +func (es *EventStore) ReadN(ctx context.Context, streamID string, index ...uint64) (event.Events, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + lis := make([]int64, len(index)) + for i,j :=range index { + lis[i] = int64(j) + } + + span.SetAttributes( + attribute.String("streamID", streamID), + attribute.Int64Slice("index", lis), + ) + + l, err := es.Driver.EventLog(ctx, streamID) + if err != nil { + return nil, err + } + + events, err := l.ReadN(ctx, index...) + Mes_read.Add(ctx, events.Count()) + + return events, err +} func (es *EventStore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) { ctx, span := lg.Span(ctx) defer span.End() diff --git a/pkg/es/event/events.go b/pkg/es/event/events.go index 87f65ef..a3afd13 100644 --- a/pkg/es/event/events.go +++ b/pkg/es/event/events.go @@ -137,7 +137,7 @@ func (m Meta) Created() time.Time { func (m Meta) GetEventID() string { return m.EventID.String() } func Init(ctx context.Context) error { - return Register(ctx, NilEvent, &eventPtr{}) + return Register(ctx, NilEvent, &EventPtr{}) } type nilEvent struct{} @@ -156,40 +156,40 @@ func (e *nilEvent) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, e) } -type eventPtr struct { - streamID string - pos uint64 +type EventPtr struct { + StreamID string `json:"stream_id"` + Pos uint64 `json:"pos"` eventMeta Meta } -var _ Event = (*eventPtr)(nil) +var _ Event = (*EventPtr)(nil) -func NewPtr(streamID string, pos uint64) *eventPtr { - return &eventPtr{streamID: streamID, pos: pos} +func NewPtr(streamID string, pos uint64) *EventPtr { + return &EventPtr{StreamID: streamID, Pos: pos} } // MarshalBinary implements Event -func (e *eventPtr) MarshalBinary() (data []byte, err error) { - return []byte(fmt.Sprintf("%s@%d", e.streamID, e.pos)), nil +func (e *EventPtr) MarshalBinary() (data []byte, err error) { + return []byte(fmt.Sprintf("%s@%d", e.StreamID, e.Pos)), nil } // UnmarshalBinary implements Event -func (e *eventPtr) UnmarshalBinary(data []byte) error { +func (e *EventPtr) UnmarshalBinary(data []byte) error { s := string(data) idx := strings.LastIndex(s, "@") if idx == -1 { return fmt.Errorf("missing @ in: %s", s) } - e.streamID = s[:idx] + e.StreamID = s[:idx] var err error - e.pos, err = strconv.ParseUint(s[idx+1:], 10, 64) + e.Pos, err = strconv.ParseUint(s[idx+1:], 10, 64) return err } // EventMeta implements Event -func (e *eventPtr) EventMeta() Meta { +func (e *EventPtr) EventMeta() Meta { if e == nil { return Meta{} } @@ -197,19 +197,19 @@ func (e *eventPtr) EventMeta() Meta { } // SetEventMeta implements Event -func (e *eventPtr) SetEventMeta(m Meta) { +func (e *EventPtr) SetEventMeta(m Meta) { if e == nil { return } e.eventMeta = m } -func (e *eventPtr) Values() any { +func (e *EventPtr) Values() any { return struct { StreamID string `json:"stream_id"` Pos uint64 `json:"pos"` }{ - e.streamID, - e.pos, + e.StreamID, + e.Pos, } } diff --git a/pkg/es/event/reflect.go b/pkg/es/event/reflect.go index 85a5094..7922178 100644 --- a/pkg/es/event/reflect.go +++ b/pkg/es/event/reflect.go @@ -106,7 +106,7 @@ func RegisterName(ctx context.Context, name string, e Event) error { span.AddEvent("register: " + name) - if err := eventTypes.Modify(ctx, func(c *config) error { + if err := eventTypes.Modify(ctx, func(ctx context.Context, c *config) error { _, span := lg.Span(ctx) defer span.End() @@ -124,7 +124,7 @@ func GetContainer(ctx context.Context, s string) Event { var e Event - eventTypes.Modify(ctx, func(c *config) error { + eventTypes.Modify(ctx, func(ctx context.Context,c *config) error { _, span := lg.Span(ctx) defer span.End() diff --git a/pkg/es/graph.go b/pkg/es/graph.go index 9d5b9b7..74faf2b 100644 --- a/pkg/es/graph.go +++ b/pkg/es/graph.go @@ -160,7 +160,7 @@ func (e *GQLEvent) Linked(ctx context.Context) (*GQLEvent, error) { return nil, nil } - events, err := gql.FromContext[contextKey, *EventStore](ctx, esKey).Read(ctx, streamID, int64(pos)-1, 1) + events, err := gql.FromContext[contextKey, *EventStore](ctx, esKey).ReadN(ctx, streamID, pos) return &GQLEvent{e: events.First()}, err } func (e *GQLEvent) IsEdge() {} diff --git a/pkg/locker/locker.go b/pkg/locker/locker.go index 521b596..b9ef6de 100644 --- a/pkg/locker/locker.go +++ b/pkg/locker/locker.go @@ -19,7 +19,7 @@ func New[T any](initial *T) *Locked[T] { } // Modify will call the function with the locked value -func (s *Locked[T]) Modify(ctx context.Context, fn func(*T) error) error { +func (s *Locked[T]) Modify(ctx context.Context, fn func(context.Context, *T) error) error { _, span := lg.Span(ctx) defer span.End() @@ -30,7 +30,7 @@ func (s *Locked[T]) Modify(ctx context.Context, fn func(*T) error) error { select { case state := <-s.state: defer func() { s.state <- state }() - return fn(state) + return fn(ctx, state) case <-ctx.Done(): return ctx.Err() } @@ -40,7 +40,7 @@ func (s *Locked[T]) Modify(ctx context.Context, fn func(*T) error) error { func (s *Locked[T]) Copy(ctx context.Context) (T, error) { var t T - err := s.Modify(ctx, func(c *T) error { + err := s.Modify(ctx, func(ctx context.Context, c *T) error { if c != nil { t = *c } diff --git a/pkg/locker/locker_test.go b/pkg/locker/locker_test.go index bad312b..387c348 100644 --- a/pkg/locker/locker_test.go +++ b/pkg/locker/locker_test.go @@ -22,7 +22,7 @@ func TestLocker(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := value.Modify(ctx, func(c *config) error { + err := value.Modify(ctx, func(ctx context.Context, c *config) error { c.Value = "one" c.Counter++ return nil @@ -37,7 +37,7 @@ func TestLocker(t *testing.T) { wait := make(chan struct{}) - go value.Modify(ctx, func(c *config) error { + go value.Modify(ctx, func(ctx context.Context, c *config) error { c.Value = "two" c.Counter++ close(wait) @@ -47,7 +47,7 @@ func TestLocker(t *testing.T) { <-wait cancel() - err = value.Modify(ctx, func(c *config) error { + err = value.Modify(ctx, func(ctx context.Context, c *config) error { c.Value = "three" c.Counter++ return nil