chore: make connection paging more like standard

This commit is contained in:
Jon Lundy 2022-10-25 20:15:57 -06:00
parent 7ae2a8ad25
commit 9dd9443bc9
Signed by untrusted user who does not match committer: xuu
GPG Key ID: C63E6D61F3035024
14 changed files with 350 additions and 41 deletions

View File

@ -476,8 +476,8 @@ func (e *PostEvent) Values() any {
}
return struct {
Payload []byte
Tags []string
Payload []byte `json:"payload"`
Tags []string `json:"tags,omitempty"`
}{
Payload: e.payload,
Tags: e.tags,

View File

@ -59,9 +59,12 @@ type ComplexityRoot struct {
Event struct {
Bytes func(childComplexity int) int
Created func(childComplexity int) int
EventID func(childComplexity int) int
ID func(childComplexity int) int
Linked func(childComplexity int) int
Meta func(childComplexity int) int
Type func(childComplexity int) int
Values func(childComplexity int) int
}
@ -163,6 +166,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Event.Bytes(childComplexity), true
case "Event.created":
if e.complexity.Event.Created == nil {
break
}
return e.complexity.Event.Created(childComplexity), true
case "Event.eventID":
if e.complexity.Event.EventID == nil {
break
@ -177,6 +187,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Event.ID(childComplexity), true
case "Event.linked":
if e.complexity.Event.Linked == nil {
break
}
return e.complexity.Event.Linked(childComplexity), true
case "Event.meta":
if e.complexity.Event.Meta == nil {
break
@ -184,6 +201,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Event.Meta(childComplexity), true
case "Event.type":
if e.complexity.Event.Type == nil {
break
}
return e.complexity.Event.Type(childComplexity), true
case "Event.values":
if e.complexity.Event.Values == nil {
break
@ -497,7 +521,11 @@ type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEven
eventID: String!
values: Map!
bytes: String!
type: String!
created: Time!
meta: Meta!
linked: Event
}`, BuiltIn: false},
{Name: "../../../pkg/gql/common.graphqls", Input: `scalar Time
scalar Map
@ -507,7 +535,8 @@ type Connection @goModel(model: "github.com/sour-is/ev/pkg/gql.Connection") {
edges: [Edge!]!
}
input PageInput @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInput") {
idx: Int = 0
after: Int = 0
before: Int
count: Int = 30
}
type PageInfo @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInfo") {
@ -1053,6 +1082,94 @@ func (ec *executionContext) fieldContext_Event_bytes(ctx context.Context, field
return fc, nil
}
func (ec *executionContext) _Event_type(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Event_type(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.Type(), nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(string)
fc.Result = res
return ec.marshalNString2string(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Event_type(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Event",
Field: field,
IsMethod: true,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type String does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _Event_created(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Event_created(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.Created(), nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(time.Time)
fc.Result = res
return ec.marshalNTime2timeᚐTime(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Event_created(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Event",
Field: field,
IsMethod: true,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Time does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _Event_meta(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Event_meta(ctx, field)
if err != nil {
@ -1107,6 +1224,65 @@ func (ec *executionContext) fieldContext_Event_meta(ctx context.Context, field g
return fc, nil
}
func (ec *executionContext) _Event_linked(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Event_linked(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.Linked(ctx)
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
return graphql.Null
}
res := resTmp.(*es.GQLEvent)
fc.Result = res
return ec.marshalOEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚐGQLEvent(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Event_linked(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Event",
Field: field,
IsMethod: true,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
switch field.Name {
case "id":
return ec.fieldContext_Event_id(ctx, field)
case "eventID":
return ec.fieldContext_Event_eventID(ctx, field)
case "values":
return ec.fieldContext_Event_values(ctx, field)
case "bytes":
return ec.fieldContext_Event_bytes(ctx, field)
case "type":
return ec.fieldContext_Event_type(ctx, field)
case "created":
return ec.fieldContext_Event_created(ctx, field)
case "meta":
return ec.fieldContext_Event_meta(ctx, field)
case "linked":
return ec.fieldContext_Event_linked(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type Event", field.Name)
},
}
return fc, nil
}
func (ec *executionContext) _Meta_eventID(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Meta_eventID(ctx, field)
if err != nil {
@ -2298,8 +2474,14 @@ func (ec *executionContext) fieldContext_Subscription_eventAdded(ctx context.Con
return ec.fieldContext_Event_values(ctx, field)
case "bytes":
return ec.fieldContext_Event_bytes(ctx, field)
case "type":
return ec.fieldContext_Event_type(ctx, field)
case "created":
return ec.fieldContext_Event_created(ctx, field)
case "meta":
return ec.fieldContext_Event_meta(ctx, field)
case "linked":
return ec.fieldContext_Event_linked(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type Event", field.Name)
},
@ -4217,25 +4399,33 @@ func (ec *executionContext) unmarshalInputPageInput(ctx context.Context, obj int
asMap[k] = v
}
if _, present := asMap["idx"]; !present {
asMap["idx"] = 0
if _, present := asMap["after"]; !present {
asMap["after"] = 0
}
if _, present := asMap["count"]; !present {
asMap["count"] = 30
}
fieldsInOrder := [...]string{"idx", "count"}
fieldsInOrder := [...]string{"after", "before", "count"}
for _, k := range fieldsInOrder {
v, ok := asMap[k]
if !ok {
continue
}
switch k {
case "idx":
case "after":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("idx"))
it.Idx, err = ec.unmarshalOInt2ᚖint64(ctx, v)
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("after"))
it.After, err = ec.unmarshalOInt2ᚖint64(ctx, v)
if err != nil {
return it, err
}
case "before":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("before"))
it.Before, err = ec.unmarshalOInt2ᚖint64(ctx, v)
if err != nil {
return it, err
}
@ -4330,36 +4520,67 @@ func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, ob
out.Values[i] = ec._Event_id(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
atomic.AddUint32(&invalids, 1)
}
case "eventID":
out.Values[i] = ec._Event_eventID(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
atomic.AddUint32(&invalids, 1)
}
case "values":
out.Values[i] = ec._Event_values(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
atomic.AddUint32(&invalids, 1)
}
case "bytes":
out.Values[i] = ec._Event_bytes(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
atomic.AddUint32(&invalids, 1)
}
case "type":
out.Values[i] = ec._Event_type(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&invalids, 1)
}
case "created":
out.Values[i] = ec._Event_created(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&invalids, 1)
}
case "meta":
out.Values[i] = ec._Event_meta(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
atomic.AddUint32(&invalids, 1)
}
case "linked":
field := field
innerFunc := func(ctx context.Context) (res graphql.Marshaler) {
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
}
}()
res = ec._Event_linked(ctx, field, obj)
return res
}
out.Concurrently(i, func() graphql.Marshaler {
return innerFunc(ctx)
})
default:
panic("unknown field " + strconv.Quote(field.Name))
}

View File

@ -4,6 +4,7 @@ package diskstore
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@ -208,7 +209,7 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint
return count, err
}
func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, error) {
func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, error) {
_, span := lg.Span(ctx)
defer span.End()
@ -233,7 +234,7 @@ func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, er
return nil
}
start, count := math.PagerBox(first, last, pos, count)
start, count := math.PagerBox(first, last, after, count)
if count == 0 {
return nil
}
@ -246,6 +247,10 @@ func (e *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, er
var b []byte
b, err = stream.Read(start)
if err != nil {
if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) {
err = fmt.Errorf("%w: empty", es.ErrNotFound)
}
span.RecordError(err)
return err
}

View File

@ -13,7 +13,7 @@ type Driver interface {
}
type EventLog interface {
Read(ctx context.Context, pos, count int64) (event.Events, error)
Read(ctx context.Context, after, count int64) (event.Events, error)
Append(ctx context.Context, events event.Events, version uint64) (uint64, error)
FirstIndex(context.Context) (uint64, error)
LastIndex(context.Context) (uint64, error)

View File

@ -112,7 +112,7 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint
}
// Read implements driver.EventStore
func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) {
func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
@ -131,11 +131,11 @@ func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Even
return nil
}
start, count := math.PagerBox(first, last, pos, count)
start, count := math.PagerBox(first, last, after, count)
if count == 0 {
return nil
}
span.AddEvent(fmt.Sprint("box", first, last, pos, count))
span.AddEvent(fmt.Sprint("box", first, last, after, count))
events = make([]event.Event, math.Abs(count))
for i := range events {
span.AddEvent(fmt.Sprintf("read event %d of %d", i, math.Abs(count)))

View File

@ -47,11 +47,11 @@ type wrapper struct {
var _ driver.EventLog = (*wrapper)(nil)
func (w *wrapper) Read(ctx context.Context, pos int64, count int64) (event.Events, error) {
func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.Read(ctx, pos, count)
return w.up.Read(ctx, after, count)
}
func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
ctx, span := lg.Span(ctx)

View File

@ -142,11 +142,11 @@ type wrapper struct {
var _ driver.EventLog = (*wrapper)(nil)
func (w *wrapper) Read(ctx context.Context, pos int64, count int64) (event.Events, error) {
func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.Read(ctx, pos, count)
return w.up.Read(ctx, after, count)
}
func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
ctx, span := lg.Span(ctx)

View File

@ -121,7 +121,6 @@ func (es *EventStore) Save(ctx context.Context, agg event.Aggregate) (uint64, er
return 0, nil
}
Mes_save.Add(ctx, 1)
span.SetAttributes(
attribute.String("agg.type", event.TypeOf(agg)),
attribute.String("agg.streamID", agg.StreamID()),
@ -137,6 +136,7 @@ func (es *EventStore) Save(ctx context.Context, agg event.Aggregate) (uint64, er
if err != nil {
return 0, err
}
Mes_save.Add(ctx, int64(count))
agg.Commit()
return count, err
@ -145,8 +145,6 @@ func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error {
ctx, span := lg.Span(ctx)
defer span.End()
Mes_load.Add(ctx, 1)
span.SetAttributes(
attribute.String("agg.type", event.TypeOf(agg)),
attribute.String("agg.streamID", agg.StreamID()),
@ -162,6 +160,7 @@ func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error {
return err
}
Mes_load.Add(ctx, events.Count())
event.Append(agg, events...)
span.SetAttributes(
@ -170,20 +169,25 @@ func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error {
return nil
}
func (es *EventStore) Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) {
func (es *EventStore) Read(ctx context.Context, streamID string, after, count int64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
Mes_read.Add(ctx, 1)
span.SetAttributes(
attribute.String("ev.streamID", streamID),
attribute.String("streamID", streamID),
attribute.Int64("after", after),
attribute.Int64("count", count),
)
l, err := es.Driver.EventLog(ctx, streamID)
if err != nil {
return nil, err
}
return l.Read(ctx, pos, count)
events, err := l.Read(ctx, after, count)
Mes_read.Add(ctx, events.Count())
return events, err
}
func (es *EventStore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) {
ctx, span := lg.Span(ctx)
@ -203,6 +207,9 @@ func (es *EventStore) Append(ctx context.Context, streamID string, events event.
func (es *EventStore) FirstIndex(ctx context.Context, streamID string) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.String("ev.streamID", streamID),
)
l, err := es.Driver.EventLog(ctx, streamID)
if err != nil {
@ -213,6 +220,9 @@ func (es *EventStore) FirstIndex(ctx context.Context, streamID string) (uint64,
func (es *EventStore) LastIndex(ctx context.Context, streamID string) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.String("ev.streamID", streamID),
)
l, err := es.Driver.EventLog(ctx, streamID)
if err != nil {
@ -248,6 +258,7 @@ var ErrNoDriver = errors.New("no driver")
var ErrWrongVersion = errors.New("wrong version")
var ErrShouldExist = event.ErrShouldExist
var ErrShouldNotExist = event.ErrShouldNotExist
var ErrNotFound = errors.New("not found")
type PA[T any] interface {
event.Aggregate

View File

@ -20,5 +20,9 @@ type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEven
eventID: String!
values: Map!
bytes: String!
type: String!
created: Time!
meta: Meta!
linked: Event
}

View File

@ -60,6 +60,9 @@ func (lis Events) StreamID() string {
func (lis Events) SetStreamID(streamID string) {
SetStreamID(streamID, lis...)
}
func (lis Events) Count() int64 {
return int64(len(lis))
}
func (lis Events) First() Event {
if len(lis) == 0 {
return NilEvent

View File

@ -261,11 +261,30 @@ func Values(e Event) map[string]any {
continue
}
omitempty := false
field := v.FieldByIndex(idx.Index)
name := idx.Name
if n, ok := idx.Tag.Lookup("json"); ok {
name = n
var (
opt string
found bool
)
name, opt, found = strings.Cut(n, ",")
if name == "-" {
continue
}
if found {
if strings.Contains(opt, "omitempty") {
omitempty = true
}
}
}
if omitempty && field.IsZero() {
continue
}
m[name] = field.Interface()

View File

@ -2,6 +2,7 @@ package es
import (
"context"
"errors"
"fmt"
"net/http"
"time"
@ -15,13 +16,18 @@ type EventResolver interface {
Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error)
EventAdded(ctx context.Context, streamID string, after int64) (<-chan *GQLEvent, error)
}
type contextKey struct {
name string
}
var esKey = contextKey{"event-store"}
func (es *EventStore) Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) {
ctx, span := lg.Span(ctx)
defer span.End()
lis, err := es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
if err != nil {
if err != nil && !errors.Is(err, ErrNotFound) {
span.RecordError(err)
return nil, err
}
@ -107,6 +113,14 @@ func (e *EventStore) EventAdded(ctx context.Context, streamID string, after int6
return ch, nil
}
func (*EventStore) RegisterHTTP(*http.ServeMux) {}
func (e *EventStore) GetMiddleware() func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(gql.ToContext(r.Context(), esKey, e))
next.ServeHTTP(w, r)
})
}
}
type GQLEvent struct {
e event.Event
@ -118,6 +132,12 @@ func (e *GQLEvent) ID() string {
func (e *GQLEvent) EventID() string {
return e.e.EventMeta().GetEventID()
}
func (e *GQLEvent) Type() string {
return event.TypeOf(e.e)
}
func (e *GQLEvent) Created() time.Time {
return e.e.EventMeta().Created()
}
func (e *GQLEvent) Values() map[string]interface{} {
return event.Values(e.e)
}
@ -129,4 +149,18 @@ func (e *GQLEvent) Meta() *event.Meta {
meta := e.e.EventMeta()
return &meta
}
func (e *GQLEvent) Linked(ctx context.Context) (*GQLEvent, error) {
values := event.Values(e.e)
streamID, ok := values["stream_id"].(string)
if !ok {
return nil, nil
}
pos, ok := values["pos"].(uint64)
if !ok {
return nil, nil
}
events, err := gql.FromContext[contextKey, *EventStore](ctx, esKey).Read(ctx, streamID, int64(pos)-1, 1)
return &GQLEvent{e: events.First()}, err
}
func (e *GQLEvent) IsEdge() {}

View File

@ -6,7 +6,8 @@ type Connection @goModel(model: "github.com/sour-is/ev/pkg/gql.Connection") {
edges: [Edge!]!
}
input PageInput @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInput") {
idx: Int = 0
after: Int = 0
before: Int
count: Int = 30
}
type PageInfo @goModel(model: "github.com/sour-is/ev/pkg/gql.PageInfo") {

View File

@ -38,19 +38,30 @@ type PageInfo struct {
}
type PageInput struct {
Idx *int64 `json:"idx"`
After *int64 `json:"after"`
Before *int64 `json:"before"`
Count *int64 `json:"count"`
}
func (p *PageInput) GetIdx(v int64) int64 {
if p == nil || p.Idx == nil {
return v
if p == nil {
// pass
} else if p.Before != nil {
return (*p.Before)
} else if p.After != nil {
return *p.After
}
return *p.Idx
return v
}
func (p *PageInput) GetCount(v int64) int64 {
if p == nil || p.Count == nil {
return v
}
} else if p.Before != nil {
return -(*p.Count)
} else if p.After != nil {
return *p.Count
}
return *p.Count
}