From 9dd9443bc96c3105b4af99e49c0ad49c04033434 Mon Sep 17 00:00:00 2001 From: Jon Lundy Date: Tue, 25 Oct 2022 20:15:57 -0600 Subject: [PATCH] chore: make connection paging more like standard --- app/msgbus/service.go | 4 +- internal/graph/generated/generated.go | 247 +++++++++++++++++++++++-- pkg/es/driver/disk-store/disk-store.go | 9 +- pkg/es/driver/driver.go | 2 +- pkg/es/driver/mem-store/mem-store.go | 6 +- pkg/es/driver/projecter/projecter.go | 4 +- pkg/es/driver/streamer/streamer.go | 4 +- pkg/es/es.go | 25 ++- pkg/es/es.graphqls | 4 + pkg/es/event/events.go | 3 + pkg/es/event/reflect.go | 21 ++- pkg/es/graph.go | 36 +++- pkg/gql/common.graphqls | 5 +- pkg/gql/connection.go | 21 ++- 14 files changed, 350 insertions(+), 41 deletions(-) diff --git a/app/msgbus/service.go b/app/msgbus/service.go index 5b8a5e0..c9b12fc 100644 --- a/app/msgbus/service.go +++ b/app/msgbus/service.go @@ -476,8 +476,8 @@ func (e *PostEvent) Values() any { } return struct { - Payload []byte - Tags []string + Payload []byte `json:"payload"` + Tags []string `json:"tags,omitempty"` }{ Payload: e.payload, Tags: e.tags, diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index 8d2efc3..c8d6943 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -59,9 +59,12 @@ type ComplexityRoot struct { Event struct { Bytes func(childComplexity int) int + Created func(childComplexity int) int EventID func(childComplexity int) int ID func(childComplexity int) int + Linked func(childComplexity int) int Meta func(childComplexity int) int + Type func(childComplexity int) int Values func(childComplexity int) int } @@ -163,6 +166,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Event.Bytes(childComplexity), true + case "Event.created": + if e.complexity.Event.Created == nil { + break + } + + return e.complexity.Event.Created(childComplexity), true + case "Event.eventID": if e.complexity.Event.EventID == nil { break @@ -177,6 +187,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Event.ID(childComplexity), true + case "Event.linked": + if e.complexity.Event.Linked == nil { + break + } + + return e.complexity.Event.Linked(childComplexity), true + case "Event.meta": if e.complexity.Event.Meta == nil { break @@ -184,6 +201,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Event.Meta(childComplexity), true + case "Event.type": + if e.complexity.Event.Type == nil { + break + } + + return e.complexity.Event.Type(childComplexity), true + case "Event.values": if e.complexity.Event.Values == nil { break @@ -497,7 +521,11 @@ type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEven eventID: String! values: Map! bytes: String! + type: String! + created: Time! meta: Meta! + + linked: Event }`, BuiltIn: false}, {Name: "../../../pkg/gql/common.graphqls", Input: `scalar Time scalar Map @@ -507,8 +535,9 @@ type Connection @goModel(model: "github.com/sour-is/ev/pkg/gql.Connection") { edges: [Edge!]! } input PageInput @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInput") { - idx: Int = 0 - count: Int = 30 + after: Int = 0 + before: Int + count: Int = 30 } type PageInfo @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInfo") { next: Boolean! @@ -1053,6 +1082,94 @@ func (ec *executionContext) fieldContext_Event_bytes(ctx context.Context, field return fc, nil } +func (ec *executionContext) _Event_type(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_type(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Type(), nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_type(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Event_created(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_created(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Created(), nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(time.Time) + fc.Result = res + return ec.marshalNTime2timeᚐTime(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_created(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Time does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _Event_meta(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_meta(ctx, field) if err != nil { @@ -1107,6 +1224,65 @@ func (ec *executionContext) fieldContext_Event_meta(ctx context.Context, field g return fc, nil } +func (ec *executionContext) _Event_linked(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_linked(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Linked(ctx) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*es.GQLEvent) + fc.Result = res + return ec.marshalOEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚐGQLEvent(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_linked(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "id": + return ec.fieldContext_Event_id(ctx, field) + case "eventID": + return ec.fieldContext_Event_eventID(ctx, field) + case "values": + return ec.fieldContext_Event_values(ctx, field) + case "bytes": + return ec.fieldContext_Event_bytes(ctx, field) + case "type": + return ec.fieldContext_Event_type(ctx, field) + case "created": + return ec.fieldContext_Event_created(ctx, field) + case "meta": + return ec.fieldContext_Event_meta(ctx, field) + case "linked": + return ec.fieldContext_Event_linked(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type Event", field.Name) + }, + } + return fc, nil +} + func (ec *executionContext) _Meta_eventID(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Meta_eventID(ctx, field) if err != nil { @@ -2298,8 +2474,14 @@ func (ec *executionContext) fieldContext_Subscription_eventAdded(ctx context.Con return ec.fieldContext_Event_values(ctx, field) case "bytes": return ec.fieldContext_Event_bytes(ctx, field) + case "type": + return ec.fieldContext_Event_type(ctx, field) + case "created": + return ec.fieldContext_Event_created(ctx, field) case "meta": return ec.fieldContext_Event_meta(ctx, field) + case "linked": + return ec.fieldContext_Event_linked(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type Event", field.Name) }, @@ -4217,25 +4399,33 @@ func (ec *executionContext) unmarshalInputPageInput(ctx context.Context, obj int asMap[k] = v } - if _, present := asMap["idx"]; !present { - asMap["idx"] = 0 + if _, present := asMap["after"]; !present { + asMap["after"] = 0 } if _, present := asMap["count"]; !present { asMap["count"] = 30 } - fieldsInOrder := [...]string{"idx", "count"} + fieldsInOrder := [...]string{"after", "before", "count"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { continue } switch k { - case "idx": + case "after": var err error - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("idx")) - it.Idx, err = ec.unmarshalOInt2ᚖint64(ctx, v) + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("after")) + it.After, err = ec.unmarshalOInt2ᚖint64(ctx, v) + if err != nil { + return it, err + } + case "before": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("before")) + it.Before, err = ec.unmarshalOInt2ᚖint64(ctx, v) if err != nil { return it, err } @@ -4330,36 +4520,67 @@ func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, ob out.Values[i] = ec._Event_id(ctx, field, obj) if out.Values[i] == graphql.Null { - invalids++ + atomic.AddUint32(&invalids, 1) } case "eventID": out.Values[i] = ec._Event_eventID(ctx, field, obj) if out.Values[i] == graphql.Null { - invalids++ + atomic.AddUint32(&invalids, 1) } case "values": out.Values[i] = ec._Event_values(ctx, field, obj) if out.Values[i] == graphql.Null { - invalids++ + atomic.AddUint32(&invalids, 1) } case "bytes": out.Values[i] = ec._Event_bytes(ctx, field, obj) if out.Values[i] == graphql.Null { - invalids++ + atomic.AddUint32(&invalids, 1) + } + case "type": + + out.Values[i] = ec._Event_type(ctx, field, obj) + + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + case "created": + + out.Values[i] = ec._Event_created(ctx, field, obj) + + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) } case "meta": out.Values[i] = ec._Event_meta(ctx, field, obj) if out.Values[i] == graphql.Null { - invalids++ + atomic.AddUint32(&invalids, 1) } + case "linked": + field := field + + innerFunc := func(ctx context.Context) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Event_linked(ctx, field, obj) + return res + } + + out.Concurrently(i, func() graphql.Marshaler { + return innerFunc(ctx) + + }) default: panic("unknown field " + strconv.Quote(field.Name)) } diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 8b5730a..0dd165d 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -4,6 +4,7 @@ package diskstore import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -208,7 +209,7 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint return count, err } -func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, error) { +func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, error) { _, span := lg.Span(ctx) defer span.End() @@ -233,7 +234,7 @@ func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, er return nil } - start, count := math.PagerBox(first, last, pos, count) + start, count := math.PagerBox(first, last, after, count) if count == 0 { return nil } @@ -246,6 +247,10 @@ func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, er var b []byte b, err = stream.Read(start) if err != nil { + if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) { + err = fmt.Errorf("%w: empty", es.ErrNotFound) + } + span.RecordError(err) return err } diff --git a/pkg/es/driver/driver.go b/pkg/es/driver/driver.go index d00a178..acdf108 100644 --- a/pkg/es/driver/driver.go +++ b/pkg/es/driver/driver.go @@ -13,7 +13,7 @@ type Driver interface { } type EventLog interface { - Read(ctx context.Context, pos, count int64) (event.Events, error) + Read(ctx context.Context, after, count int64) (event.Events, error) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) FirstIndex(context.Context) (uint64, error) LastIndex(context.Context) (uint64, error) diff --git a/pkg/es/driver/mem-store/mem-store.go b/pkg/es/driver/mem-store/mem-store.go index e7ff1c2..c9af2c4 100644 --- a/pkg/es/driver/mem-store/mem-store.go +++ b/pkg/es/driver/mem-store/mem-store.go @@ -112,7 +112,7 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint } // Read implements driver.EventStore -func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { +func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -131,11 +131,11 @@ func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Even return nil } - start, count := math.PagerBox(first, last, pos, count) + start, count := math.PagerBox(first, last, after, count) if count == 0 { return nil } - span.AddEvent(fmt.Sprint("box", first, last, pos, count)) + span.AddEvent(fmt.Sprint("box", first, last, after, count)) events = make([]event.Event, math.Abs(count)) for i := range events { span.AddEvent(fmt.Sprintf("read event %d of %d", i, math.Abs(count))) diff --git a/pkg/es/driver/projecter/projecter.go b/pkg/es/driver/projecter/projecter.go index 5974727..8a4b7ce 100644 --- a/pkg/es/driver/projecter/projecter.go +++ b/pkg/es/driver/projecter/projecter.go @@ -47,11 +47,11 @@ type wrapper struct { var _ driver.EventLog = (*wrapper)(nil) -func (w *wrapper) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { +func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() - return w.up.Read(ctx, pos, count) + return w.up.Read(ctx, after, count) } func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { ctx, span := lg.Span(ctx) diff --git a/pkg/es/driver/streamer/streamer.go b/pkg/es/driver/streamer/streamer.go index 156a3f8..206b417 100644 --- a/pkg/es/driver/streamer/streamer.go +++ b/pkg/es/driver/streamer/streamer.go @@ -142,11 +142,11 @@ type wrapper struct { var _ driver.EventLog = (*wrapper)(nil) -func (w *wrapper) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { +func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() - return w.up.Read(ctx, pos, count) + return w.up.Read(ctx, after, count) } func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { ctx, span := lg.Span(ctx) diff --git a/pkg/es/es.go b/pkg/es/es.go index 5259791..45bc69d 100644 --- a/pkg/es/es.go +++ b/pkg/es/es.go @@ -121,7 +121,6 @@ func (es *EventStore) Save(ctx context.Context, agg event.Aggregate) (uint64, er return 0, nil } - Mes_save.Add(ctx, 1) span.SetAttributes( attribute.String("agg.type", event.TypeOf(agg)), attribute.String("agg.streamID", agg.StreamID()), @@ -137,6 +136,7 @@ func (es *EventStore) Save(ctx context.Context, agg event.Aggregate) (uint64, er if err != nil { return 0, err } + Mes_save.Add(ctx, int64(count)) agg.Commit() return count, err @@ -145,8 +145,6 @@ func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error { ctx, span := lg.Span(ctx) defer span.End() - Mes_load.Add(ctx, 1) - span.SetAttributes( attribute.String("agg.type", event.TypeOf(agg)), attribute.String("agg.streamID", agg.StreamID()), @@ -162,6 +160,7 @@ func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error { return err } + Mes_load.Add(ctx, events.Count()) event.Append(agg, events...) span.SetAttributes( @@ -170,20 +169,25 @@ func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error { return nil } -func (es *EventStore) Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) { +func (es *EventStore) Read(ctx context.Context, streamID string, after, count int64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() - Mes_read.Add(ctx, 1) span.SetAttributes( - attribute.String("ev.streamID", streamID), + attribute.String("streamID", streamID), + attribute.Int64("after", after), + attribute.Int64("count", count), ) l, err := es.Driver.EventLog(ctx, streamID) if err != nil { return nil, err } - return l.Read(ctx, pos, count) + + events, err := l.Read(ctx, after, count) + Mes_read.Add(ctx, events.Count()) + + return events, err } func (es *EventStore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) { ctx, span := lg.Span(ctx) @@ -203,6 +207,9 @@ func (es *EventStore) Append(ctx context.Context, streamID string, events event. func (es *EventStore) FirstIndex(ctx context.Context, streamID string) (uint64, error) { ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( + attribute.String("ev.streamID", streamID), + ) l, err := es.Driver.EventLog(ctx, streamID) if err != nil { @@ -213,6 +220,9 @@ func (es *EventStore) FirstIndex(ctx context.Context, streamID string) (uint64, func (es *EventStore) LastIndex(ctx context.Context, streamID string) (uint64, error) { ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( + attribute.String("ev.streamID", streamID), + ) l, err := es.Driver.EventLog(ctx, streamID) if err != nil { @@ -248,6 +258,7 @@ var ErrNoDriver = errors.New("no driver") var ErrWrongVersion = errors.New("wrong version") var ErrShouldExist = event.ErrShouldExist var ErrShouldNotExist = event.ErrShouldNotExist +var ErrNotFound = errors.New("not found") type PA[T any] interface { event.Aggregate diff --git a/pkg/es/es.graphqls b/pkg/es/es.graphqls index 59fc253..40dd28d 100644 --- a/pkg/es/es.graphqls +++ b/pkg/es/es.graphqls @@ -20,5 +20,9 @@ type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEven eventID: String! values: Map! bytes: String! + type: String! + created: Time! meta: Meta! + + linked: Event } \ No newline at end of file diff --git a/pkg/es/event/events.go b/pkg/es/event/events.go index 740d05e..87f65ef 100644 --- a/pkg/es/event/events.go +++ b/pkg/es/event/events.go @@ -60,6 +60,9 @@ func (lis Events) StreamID() string { func (lis Events) SetStreamID(streamID string) { SetStreamID(streamID, lis...) } +func (lis Events) Count() int64 { + return int64(len(lis)) +} func (lis Events) First() Event { if len(lis) == 0 { return NilEvent diff --git a/pkg/es/event/reflect.go b/pkg/es/event/reflect.go index 2610d57..85a5094 100644 --- a/pkg/es/event/reflect.go +++ b/pkg/es/event/reflect.go @@ -261,11 +261,30 @@ func Values(e Event) map[string]any { continue } + omitempty := false field := v.FieldByIndex(idx.Index) name := idx.Name if n, ok := idx.Tag.Lookup("json"); ok { - name = n + var ( + opt string + found bool + ) + + name, opt, found = strings.Cut(n, ",") + if name == "-" { + continue + } + + if found { + if strings.Contains(opt, "omitempty") { + omitempty = true + } + } + } + + if omitempty && field.IsZero() { + continue } m[name] = field.Interface() diff --git a/pkg/es/graph.go b/pkg/es/graph.go index 68f4ded..9d5b9b7 100644 --- a/pkg/es/graph.go +++ b/pkg/es/graph.go @@ -2,6 +2,7 @@ package es import ( "context" + "errors" "fmt" "net/http" "time" @@ -15,13 +16,18 @@ type EventResolver interface { Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *GQLEvent, error) } +type contextKey struct { + name string +} + +var esKey = contextKey{"event-store"} func (es *EventStore) Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { ctx, span := lg.Span(ctx) defer span.End() lis, err := es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30)) - if err != nil { + if err != nil && !errors.Is(err, ErrNotFound) { span.RecordError(err) return nil, err } @@ -107,6 +113,14 @@ func (e *EventStore) EventAdded(ctx context.Context, streamID string, after int6 return ch, nil } func (*EventStore) RegisterHTTP(*http.ServeMux) {} +func (e *EventStore) GetMiddleware() func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r = r.WithContext(gql.ToContext(r.Context(), esKey, e)) + next.ServeHTTP(w, r) + }) + } +} type GQLEvent struct { e event.Event @@ -118,6 +132,12 @@ func (e *GQLEvent) ID() string { func (e *GQLEvent) EventID() string { return e.e.EventMeta().GetEventID() } +func (e *GQLEvent) Type() string { + return event.TypeOf(e.e) +} +func (e *GQLEvent) Created() time.Time { + return e.e.EventMeta().Created() +} func (e *GQLEvent) Values() map[string]interface{} { return event.Values(e.e) } @@ -129,4 +149,18 @@ func (e *GQLEvent) Meta() *event.Meta { meta := e.e.EventMeta() return &meta } +func (e *GQLEvent) Linked(ctx context.Context) (*GQLEvent, error) { + values := event.Values(e.e) + streamID, ok := values["stream_id"].(string) + if !ok { + return nil, nil + } + pos, ok := values["pos"].(uint64) + if !ok { + return nil, nil + } + + events, err := gql.FromContext[contextKey, *EventStore](ctx, esKey).Read(ctx, streamID, int64(pos)-1, 1) + return &GQLEvent{e: events.First()}, err +} func (e *GQLEvent) IsEdge() {} diff --git a/pkg/gql/common.graphqls b/pkg/gql/common.graphqls index f81640f..ee5dd00 100644 --- a/pkg/gql/common.graphqls +++ b/pkg/gql/common.graphqls @@ -6,8 +6,9 @@ type Connection @goModel(model: "github.com/sour-is/ev/pkg/gql.Connection") { edges: [Edge!]! } input PageInput @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInput") { - idx: Int = 0 - count: Int = 30 + after: Int = 0 + before: Int + count: Int = 30 } type PageInfo @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInfo") { next: Boolean! diff --git a/pkg/gql/connection.go b/pkg/gql/connection.go index 5923450..df9fb22 100644 --- a/pkg/gql/connection.go +++ b/pkg/gql/connection.go @@ -38,19 +38,30 @@ type PageInfo struct { } type PageInput struct { - Idx *int64 `json:"idx"` - Count *int64 `json:"count"` + After *int64 `json:"after"` + Before *int64 `json:"before"` + Count *int64 `json:"count"` } func (p *PageInput) GetIdx(v int64) int64 { - if p == nil || p.Idx == nil { - return v + if p == nil { + // pass + } else if p.Before != nil { + return (*p.Before) + } else if p.After != nil { + return *p.After } - return *p.Idx + + return v } func (p *PageInput) GetCount(v int64) int64 { if p == nil || p.Count == nil { return v + } else if p.Before != nil { + return -(*p.Count) + } else if p.After != nil { + return *p.Count } + return *p.Count }