diff --git a/app/gql/resolver.go b/app/gql/resolver.go index 658e391..92cd2d4 100644 --- a/app/gql/resolver.go +++ b/app/gql/resolver.go @@ -10,13 +10,13 @@ import ( "go.sour.is/ev/app/msgbus" "go.sour.is/ev/app/salty" "go.sour.is/ev/internal/graph/generated" - "go.sour.is/ev/pkg/es" + gql_es "go.sour.is/ev/pkg/gql" ) type Resolver struct { msgbus.MsgbusResolver salty.SaltyResolver - es.EventResolver + gql_es.EventResolver } // Query returns generated.QueryResolver implementation. @@ -40,7 +40,7 @@ type noop struct{} var _ msgbus.MsgbusResolver = (*noop)(nil) var _ salty.SaltyResolver = (*noop)(nil) -var _ es.EventResolver = (*noop)(nil) +var _ gql_es.EventResolver = (*noop)(nil) func (*noop) IsResolver() {} func (*noop) CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) { @@ -58,7 +58,7 @@ func (*noop) PostAdded(ctx context.Context, name, tag string, after int64) (<-ch func (*noop) Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) { panic("not implemented") } -func (*noop) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *es.GQLEvent, error) { +func (*noop) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *gql_es.Event, error) { panic("not implemented") } func (*noop) TruncateStream(ctx context.Context, streamID string, index int64) (bool, error) { diff --git a/app/twtxt/twtxt.go b/app/twtxt/twtxt.go index 0dc0402..93bed82 100644 --- a/app/twtxt/twtxt.go +++ b/app/twtxt/twtxt.go @@ -1 +1 @@ -package twtxt \ No newline at end of file +package twtxt diff --git a/cmd/ev/svc.es.go b/cmd/ev/svc.es.go index 19d0101..9d808b7 100644 --- a/cmd/ev/svc.es.go +++ b/cmd/ev/svc.es.go @@ -14,8 +14,8 @@ import ( "go.sour.is/ev/pkg/driver/projecter" resolvelinks "go.sour.is/ev/pkg/driver/resolve-links" "go.sour.is/ev/pkg/driver/streamer" - "go.sour.is/ev/pkg/es" "go.sour.is/ev/pkg/event" + gql_ev "go.sour.is/ev/pkg/gql" ) var _ = apps.Register(10, func(ctx context.Context, svc *service.Harness) error { @@ -48,7 +48,7 @@ var _ = apps.Register(10, func(ctx context.Context, svc *service.Harness) error span.RecordError(err) return err } - svc.Add(eventstore, &es.EventStore{EventStore: eventstore}) + svc.Add(eventstore, &gql_ev.EventStore{EventStore: eventstore}) return nil }) diff --git a/cmd/webfinger/webfinger_e2e_test.go b/cmd/webfinger/webfinger_e2e_test.go index 5441bd2..dd370cb 100644 --- a/cmd/webfinger/webfinger_e2e_test.go +++ b/cmd/webfinger/webfinger_e2e_test.go @@ -1,4 +1,5 @@ //go:build ignore + package main import ( diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index 5e2067b..f4ad281 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -20,8 +20,8 @@ import ( "github.com/vektah/gqlparser/v2/ast" "go.sour.is/ev/app/msgbus" "go.sour.is/ev/app/salty" - "go.sour.is/ev/pkg/es" "go.sour.is/ev/pkg/event" + gql_ev "go.sour.is/ev/pkg/gql" "go.sour.is/pkg/gql" ) @@ -130,7 +130,7 @@ type QueryResolver interface { SaltyUser(ctx context.Context, nick string) (*salty.SaltyUser, error) } type SubscriptionResolver interface { - EventAdded(ctx context.Context, streamID string, after int64) (<-chan *es.GQLEvent, error) + EventAdded(ctx context.Context, streamID string, after int64) (<-chan *gql_ev.Event, error) PostAdded(ctx context.Context, name string, tag string, after int64) (<-chan *msgbus.PostEvent, error) } @@ -566,7 +566,7 @@ func (ec *executionContext) introspectType(name string) (*introspection.Type, er } var sources = []*ast.Source{ - {Name: "../../../pkg/es/es.graphqls", Input: ` + {Name: "../../../pkg/gql/eventstore.graphqls", Input: ` type Meta @goModel(model: "go.sour.is/ev/pkg/event.Meta") { eventID: String! @goField(name: "getEventID") streamID: String! @goField(name: "ActualStreamID") @@ -585,7 +585,7 @@ extend type Subscription { eventAdded(streamID: String! after: Int! = -1): Event } -type Event implements Edge @goModel(model: "go.sour.is/ev/pkg/es.GQLEvent") { +type Event implements Edge @goModel(model: "go.sour.is/ev/pkg/gql.Event") { id: ID! eventID: String! @@ -1027,7 +1027,7 @@ func (ec *executionContext) fieldContext_Connection_edges(ctx context.Context, f return fc, nil } -func (ec *executionContext) _Event_id(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_id(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_id(ctx, field) if err != nil { return graphql.Null @@ -1071,7 +1071,7 @@ func (ec *executionContext) fieldContext_Event_id(ctx context.Context, field gra return fc, nil } -func (ec *executionContext) _Event_eventID(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_eventID(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_eventID(ctx, field) if err != nil { return graphql.Null @@ -1115,7 +1115,7 @@ func (ec *executionContext) fieldContext_Event_eventID(ctx context.Context, fiel return fc, nil } -func (ec *executionContext) _Event_streamID(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_streamID(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_streamID(ctx, field) if err != nil { return graphql.Null @@ -1159,7 +1159,7 @@ func (ec *executionContext) fieldContext_Event_streamID(ctx context.Context, fie return fc, nil } -func (ec *executionContext) _Event_position(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_position(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_position(ctx, field) if err != nil { return graphql.Null @@ -1203,7 +1203,7 @@ func (ec *executionContext) fieldContext_Event_position(ctx context.Context, fie return fc, nil } -func (ec *executionContext) _Event_values(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_values(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_values(ctx, field) if err != nil { return graphql.Null @@ -1247,7 +1247,7 @@ func (ec *executionContext) fieldContext_Event_values(ctx context.Context, field return fc, nil } -func (ec *executionContext) _Event_bytes(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_bytes(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_bytes(ctx, field) if err != nil { return graphql.Null @@ -1291,7 +1291,7 @@ func (ec *executionContext) fieldContext_Event_bytes(ctx context.Context, field return fc, nil } -func (ec *executionContext) _Event_type(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_type(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_type(ctx, field) if err != nil { return graphql.Null @@ -1335,7 +1335,7 @@ func (ec *executionContext) fieldContext_Event_type(ctx context.Context, field g return fc, nil } -func (ec *executionContext) _Event_created(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_created(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_created(ctx, field) if err != nil { return graphql.Null @@ -1379,7 +1379,7 @@ func (ec *executionContext) fieldContext_Event_created(ctx context.Context, fiel return fc, nil } -func (ec *executionContext) _Event_meta(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_meta(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_meta(ctx, field) if err != nil { return graphql.Null @@ -1433,7 +1433,7 @@ func (ec *executionContext) fieldContext_Event_meta(ctx context.Context, field g return fc, nil } -func (ec *executionContext) _Event_linked(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { +func (ec *executionContext) _Event_linked(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_linked(ctx, field) if err != nil { return graphql.Null @@ -1456,9 +1456,9 @@ func (ec *executionContext) _Event_linked(ctx context.Context, field graphql.Col if resTmp == nil { return graphql.Null } - res := resTmp.(*es.GQLEvent) + res := resTmp.(*gql_ev.Event) fc.Result = res - return ec.marshalOEvent2ᚖgoᚗsourᚗisᚋevᚋpkgᚋesᚐGQLEvent(ctx, field.Selections, res) + return ec.marshalOEvent2ᚖgoᚗsourᚗisᚋevᚋpkgᚋgqlᚐEvent(ctx, field.Selections, res) } func (ec *executionContext) fieldContext_Event_linked(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { @@ -2709,7 +2709,7 @@ func (ec *executionContext) _Subscription_eventAdded(ctx context.Context, field } return func(ctx context.Context) graphql.Marshaler { select { - case res, ok := <-resTmp.(<-chan *es.GQLEvent): + case res, ok := <-resTmp.(<-chan *gql_ev.Event): if !ok { return nil } @@ -2717,7 +2717,7 @@ func (ec *executionContext) _Subscription_eventAdded(ctx context.Context, field w.Write([]byte{'{'}) graphql.MarshalString(field.Alias).MarshalGQL(w) w.Write([]byte{':'}) - ec.marshalOEvent2ᚖgoᚗsourᚗisᚋevᚋpkgᚋesᚐGQLEvent(ctx, field.Selections, res).MarshalGQL(w) + ec.marshalOEvent2ᚖgoᚗsourᚗisᚋevᚋpkgᚋgqlᚐEvent(ctx, field.Selections, res).MarshalGQL(w) w.Write([]byte{'}'}) }) case <-ctx.Done(): @@ -4726,7 +4726,7 @@ func (ec *executionContext) _Edge(ctx context.Context, sel ast.SelectionSet, obj switch obj := (obj).(type) { case nil: return graphql.Null - case *es.GQLEvent: + case *gql_ev.Event: if obj == nil { return graphql.Null } @@ -4791,7 +4791,7 @@ func (ec *executionContext) _Connection(ctx context.Context, sel ast.SelectionSe var eventImplementors = []string{"Event", "Edge"} -func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, obj *es.GQLEvent) graphql.Marshaler { +func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, obj *gql_ev.Event) graphql.Marshaler { fields := graphql.CollectFields(ec.OperationContext, sel, eventImplementors) out := graphql.NewFieldSet(fields) @@ -6284,7 +6284,7 @@ func (ec *executionContext) marshalOBoolean2ᚖbool(ctx context.Context, sel ast return res } -func (ec *executionContext) marshalOEvent2ᚖgoᚗsourᚗisᚋevᚋpkgᚋesᚐGQLEvent(ctx context.Context, sel ast.SelectionSet, v *es.GQLEvent) graphql.Marshaler { +func (ec *executionContext) marshalOEvent2ᚖgoᚗsourᚗisᚋevᚋpkgᚋgqlᚐEvent(ctx context.Context, sel ast.SelectionSet, v *gql_ev.Event) graphql.Marshaler { if v == nil { return graphql.Null } diff --git a/internal/graph/resolver/resolver.go b/internal/graph/resolver/resolver.go index 769ef98..d5495e5 100644 --- a/internal/graph/resolver/resolver.go +++ b/internal/graph/resolver/resolver.go @@ -10,7 +10,7 @@ import ( "go.sour.is/ev/app/msgbus" "go.sour.is/ev/app/salty" "go.sour.is/ev/internal/graph/generated" - "go.sour.is/ev/pkg/es" + gql_es "go.sour.is/ev/pkg/gql" ) type Resolver struct{} @@ -41,7 +41,7 @@ func (r *queryResolver) SaltyUser(ctx context.Context, nick string) (*salty.Salt } // // foo -func (r *subscriptionResolver) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *es.GQLEvent, error) { +func (r *subscriptionResolver) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *gql_es.Event, error) { panic("not implemented") } diff --git a/pkg/driver/disk-store/disk-store.go b/pkg/driver/disk-store/disk-store.go index afc690d..e405cb3 100644 --- a/pkg/driver/disk-store/disk-store.go +++ b/pkg/driver/disk-store/disk-store.go @@ -19,7 +19,6 @@ import ( "go.sour.is/pkg/cache" "go.sour.is/pkg/lg" "go.sour.is/pkg/locker" - "go.sour.is/pkg/math" "go.sour.is/ev" "go.sour.is/ev/pkg/driver" @@ -28,6 +27,9 @@ import ( const CachSize = 1000 +const AppendOnly = ev.AppendOnly +const AllEvents = ev.AllEvents + type lockedWal = locker.Locked[wal.Log] type openlogs struct { logs *cache.Cache[string, *lockedWal] @@ -42,8 +44,7 @@ type diskStore struct { m_disk_write metric.Int64Counter } -const AppendOnly = ev.AppendOnly -const AllEvents = ev.AllEvents +var _ driver.Driver = (*diskStore)(nil) func Init(ctx context.Context) error { ctx, span := lg.Span(ctx) @@ -71,8 +72,6 @@ func Init(ctx context.Context) error { return errs } -var _ driver.Driver = (*diskStore)(nil) - func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) { _, span := lg.Span(ctx) defer span.End() @@ -237,84 +236,6 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint return count, err } -func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, error) { - ctx, span := lg.Span(ctx) - defer span.End() - span.SetAttributes( - attribute.Int64("args.after", after), - attribute.Int64("args.count", count), - attribute.String("streamID", e.streamID), - attribute.String("path", e.diskStore.path), - ) - - var events event.Events - - err := e.events.Use(ctx, func(ctx context.Context, stream *wal.Log) error { - ctx, span := lg.Span(ctx) - defer span.End() - - first, err := stream.FirstIndex() - if err != nil { - span.RecordError(err) - return err - } - last, err := stream.LastIndex() - if err != nil { - span.RecordError(err) - return err - } - // --- - if first == 0 || last == 0 { - return nil - } - - start, count := math.PagerBox(first, last, after, count) - if count == 0 { - return nil - } - - span.SetAttributes( - attribute.Int64("first", int64(first)), - attribute.Int64("last", int64(last)), - attribute.Int64("start", int64(start)), - attribute.Int64("count", int64(count)), - attribute.Int64("after", int64(after)), - ) - - events = make([]event.Event, math.Abs(count)) - for i := range events { - - // --- - events[i], err = readStream(ctx, stream, start) - if err != nil { - span.RecordError(err) - return err - } - // --- - span.AddEvent(fmt.Sprintf("read event %d of %d - %d", i, len(events), events[i].EventMeta().ActualPosition)) - - if count > 0 { - start += 1 - } else { - start -= 1 - } - if start < first || start > last { - events = events[:i+1] - break - } - } - return nil - }) - if err != nil { - span.RecordError(err) - return nil, err - } - - event.SetStreamID(e.streamID, events...) - e.diskStore.m_disk_read.Add(ctx, int64(len(events))) - - return events, nil -} func (e *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -341,6 +262,46 @@ func (e *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, er return events, err } +func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, error) { + ctx, span := lg.Span(ctx) + defer span.End() + span.SetAttributes( + attribute.Int64("args.after", after), + attribute.Int64("args.count", count), + attribute.String("streamID", e.streamID), + attribute.String("path", e.diskStore.path), + ) + + var events event.Events + err := e.events.Use(ctx, func(ctx context.Context, stream *wal.Log) error { + ctx, span := lg.Span(ctx) + defer span.End() + + first, err := stream.FirstIndex() + if err != nil { + return err + } + last, err := stream.LastIndex() + if err != nil { + return err + } + streamIDs, err := driver.GenerateStreamIDs(first, last, after, count) + if err != nil { + return err + } + events, err = readStreamN(ctx, stream, streamIDs...) + event.SetStreamID(e.streamID, events...) + return err + }) + if err != nil { + span.RecordError(err) + return nil, err + } + + e.diskStore.m_disk_read.Add(ctx, int64(len(events))) + + return events, nil +} func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -399,32 +360,6 @@ func (e *eventLog) Truncate(ctx context.Context, index int64) error { return events.TruncateFront(uint64(index)) }) } -func readStream(ctx context.Context, stream *wal.Log, index uint64) (event.Event, error) { - ctx, span := lg.Span(ctx) - defer span.End() - - span.SetAttributes( - attribute.Int64("args.index", int64(index)), - ) - - var b []byte - var err error - b, err = stream.Read(index) - if err != nil { - if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) { - err = fmt.Errorf("%w: empty", ev.ErrNotFound) - } - - span.RecordError(err) - return nil, err - } - e, err := event.UnmarshalBinary(ctx, b, index) - if err != nil { - span.RecordError(err) - return nil, err - } - return e, err -} func readStreamN(ctx context.Context, stream *wal.Log, index ...uint64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -442,6 +377,7 @@ func readStreamN(ctx context.Context, stream *wal.Log, index ...uint64) (event.E var err error events := make(event.Events, len(index)) for i, idx := range index { + span.AddEvent(fmt.Sprintf("read event %d of %d - %d", i, len(events), events[i].EventMeta().ActualPosition)) b, err = stream.Read(idx) if err != nil { if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) { diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index b0ba282..41c0259 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -5,6 +5,7 @@ import ( "context" "go.sour.is/ev/pkg/event" + "go.sour.is/pkg/math" ) type Driver interface { @@ -38,3 +39,31 @@ type EventStream interface { Subscribe(ctx context.Context, streamID string, start int64) (Subscription, error) Send(ctx context.Context, streamID string, events event.Events) error } + +func GenerateStreamIDs(first, last uint64, after, count int64) ([]uint64, error) { + // --- + if first == 0 || last == 0 { + return nil, nil + } + + start, count := math.PagerBox(first, last, after, count) + if count == 0 { + return nil, nil + } + + streamIDs := make([]uint64, math.Abs(count)) + for i := range streamIDs { + streamIDs[i] = start + + if count > 0 { + start += 1 + } else { + start -= 1 + } + if start < first || start > last { + streamIDs = streamIDs[:i+1] + break + } + } + return streamIDs, nil +} diff --git a/pkg/driver/mem-store/mem-store.go b/pkg/driver/mem-store/mem-store.go index cf450de..982e208 100644 --- a/pkg/driver/mem-store/mem-store.go +++ b/pkg/driver/mem-store/mem-store.go @@ -5,28 +5,26 @@ import ( "context" "fmt" + "go.opentelemetry.io/otel/attribute" "go.sour.is/pkg/lg" "go.sour.is/pkg/locker" - "go.sour.is/pkg/math" "go.sour.is/ev" "go.sour.is/ev/pkg/driver" "go.sour.is/ev/pkg/event" ) +const AppendOnly = ev.AppendOnly +const AllEvents = ev.AllEvents + type state struct { streams map[string]*locker.Locked[event.Events] } -type eventLog struct { - streamID string - events *locker.Locked[event.Events] -} type memstore struct { state *locker.Locked[state] } -const AppendOnly = ev.AppendOnly -const AllEvents = ev.AllEvents +var _ driver.Driver = (*memstore)(nil) func Init(ctx context.Context) error { ctx, span := lg.Span(ctx) @@ -35,8 +33,6 @@ func Init(ctx context.Context) error { return ev.Register(ctx, "mem", &memstore{}) } -var _ driver.Driver = (*memstore)(nil) - func (memstore) Open(ctx context.Context, name string) (driver.Driver, error) { _, span := lg.Span(ctx) defer span.End() @@ -68,6 +64,11 @@ func (m *memstore) EventLog(ctx context.Context, streamID string) (driver.EventL return el, err } +type eventLog struct { + streamID string + events *locker.Locked[event.Events] +} + var _ driver.EventLog = (*eventLog)(nil) // Append implements driver.EventStore @@ -117,6 +118,16 @@ func (m *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, er ctx, span := lg.Span(ctx) defer span.End() + lis := make([]int64, len(index)) + for i := range index { + lis[i] = int64(index[i]) + } + + span.SetAttributes( + attribute.Int64Slice("args.index", lis), + attribute.String("streamID", m.streamID), + ) + var events event.Events err := m.events.Use(ctx, func(ctx context.Context, stream *event.Events) error { var err error @@ -133,54 +144,30 @@ func (m *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, er func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( + attribute.Int64("args.after", after), + attribute.Int64("args.count", count), + attribute.String("streamID", m.streamID), + ) var events event.Events - err := m.events.Use(ctx, func(ctx context.Context, stream *event.Events) error { ctx, span := lg.Span(ctx) defer span.End() - span.AddEvent(fmt.Sprintf("%s %d", m.streamID, len(*stream))) - first := stream.First().EventMeta().Position last := stream.Last().EventMeta().Position - // --- - if first == 0 || last == 0 { - return nil - } - start, count := math.PagerBox(first, last, after, count) - if count == 0 { - return nil - } - span.AddEvent(fmt.Sprint("box", first, last, after, count)) - events = make([]event.Event, math.Abs(count)) - for i := range events { - span.AddEvent(fmt.Sprintf("read event %d of %d", i, math.Abs(count))) - - // --- clone event - var err error - events[i], err = readStream(ctx, stream, start) - if err != nil { - return err - } - // --- - - if count > 0 { - start += 1 - } else { - start -= 1 - } - if start < first || start > last { - events = events[:i+1] - break - } + streamIDs, err := driver.GenerateStreamIDs(first, last, after, count) + if err != nil { + return err } + events, err = readStreamN(ctx, stream, streamIDs...) event.SetStreamID(m.streamID, events...) - - return nil + return err }) if err != nil { + span.RecordError(err) return nil, err } @@ -205,26 +192,26 @@ func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) { return events.Last().EventMeta().Position, err } -func (m *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { - panic("not implemented") -} - -func readStream(ctx context.Context, stream *event.Events, index uint64) (event.Event, error) { +func (e *eventLog) Truncate(ctx context.Context, index int64) error { ctx, span := lg.Span(ctx) defer span.End() - var b []byte - var err error - e := (*stream)[index-1] - b, err = event.MarshalBinary(e) - if err != nil { - return nil, err + span.SetAttributes( + attribute.Int64("args.index", index), + attribute.String("streamID", e.streamID), + ) + + if index == 0 { + return nil } - e, err = event.UnmarshalBinary(ctx, b, e.EventMeta().ActualPosition) - if err != nil { - return nil, err - } - return e, err + return e.events.Use(ctx, func(ctx context.Context, events *event.Events) error { + if index < 0 { + *events = (*events)[:index] + return nil + } + *events = (*events)[index:] + return nil + }) } func readStreamN(ctx context.Context, stream *event.Events, index ...uint64) (event.Events, error) { ctx, span := lg.Span(ctx) @@ -233,8 +220,11 @@ func readStreamN(ctx context.Context, stream *event.Events, index ...uint64) (ev var b []byte var err error - events := make(event.Events, len(index)) + count := len(index) + events := make(event.Events, count) for i, index := range index { + span.AddEvent(fmt.Sprintf("read event %d of %d", i, count)) + e := (*stream)[index-1] b, err = event.MarshalBinary(e) if err != nil { diff --git a/pkg/driver/projecter/projecter.go b/pkg/driver/projecter/projecter.go index 4364241..76490d5 100644 --- a/pkg/driver/projecter/projecter.go +++ b/pkg/driver/projecter/projecter.go @@ -21,7 +21,6 @@ func New(_ context.Context, fns ...func(event.Event) []event.Event) *projector { return &projector{fns: fns} } func (p *projector) Apply(e *ev.EventStore) { - up := e.Driver for up != nil { if op, ok := up.(*projector); ok { diff --git a/pkg/driver/projecter/projector_test.go b/pkg/driver/projecter/projector_test.go index 7793462..b5f255d 100644 --- a/pkg/driver/projecter/projector_test.go +++ b/pkg/driver/projecter/projector_test.go @@ -134,5 +134,4 @@ func TestProjecter(t *testing.T) { <-wait is.Equal(len(events), 4) - } diff --git a/pkg/driver/resolve-links/resolve-links.go b/pkg/driver/resolve-links/resolve-links.go index e68addb..6e16d78 100644 --- a/pkg/driver/resolve-links/resolve-links.go +++ b/pkg/driver/resolve-links/resolve-links.go @@ -43,8 +43,8 @@ func (r *resolvelinks) EventLog(ctx context.Context, streamID string) (driver.Ev } type wrapper struct { - up driver.EventLog - resolvelinks *resolvelinks + up driver.EventLog + resolver *resolvelinks } func (r *wrapper) Unwrap() driver.EventLog { @@ -60,60 +60,7 @@ func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Eve return nil, err } - idx := make(map[string][]uint64) - ptrs := make(map[string][]int) - for i := range events { - e := events[i] - if e, ok := e.(*event.EventPtr); ok { - idx[e.StreamID] = append(idx[e.StreamID], e.Pos) - ptrs[e.StreamID] = append(ptrs[e.StreamID], i) - } - } - - for streamID, ids := range idx { - d, err := w.resolvelinks.EventLog(ctx, streamID) - if err != nil { - return nil, err - } - ptr := ptrs[streamID] - lis, err := d.ReadN(ctx, ids...) - if err != nil && !errors.Is(err, ev.ErrNotFound) { - return nil, err - } - - for i := range lis { - meta := lis[i].EventMeta() - actual := events[ptr[i]].EventMeta() - meta.ActualPosition = actual.Position - meta.ActualStreamID = actual.ActualStreamID - lis[i].SetEventMeta(meta) - events[i] = lis[i] - } - } - - // for i, e := range events { - // switch e := e.(type) { - // case *event.EventPtr: - // d, err := w.resolvelinks.EventLog(ctx, e.StreamID) - // if err != nil { - // return nil, err - // } - // lis, err := d.ReadN(ctx, e.Pos) - // if err != nil && !errors.Is(err, es.ErrNotFound) { - // return nil, err - // } - - // if ne := lis.First(); ne != event.NilEvent { - // meta := ne.EventMeta() - // actual := e.EventMeta() - // meta.ActualPosition = actual.Position - // meta.ActualStreamID = actual.ActualStreamID - // ne.SetEventMeta(meta) - // events[i] = ne - // } - // } - // } - + err = w.resolvelinks(ctx, events) return events, err } @@ -126,29 +73,7 @@ func (w *wrapper) ReadN(ctx context.Context, index ...uint64) (event.Events, err return nil, err } - for i, e := range events { - switch e := e.(type) { - case *event.EventPtr: - d, err := w.resolvelinks.EventLog(ctx, e.StreamID) - if err != nil { - return nil, err - } - lis, err := d.ReadN(ctx, e.Pos) - if err != nil { - return nil, err - } - - ne := lis.First() - meta := ne.EventMeta() - actual := e.EventMeta() - meta.ActualPosition = actual.Position - meta.ActualStreamID = actual.ActualStreamID - ne.SetEventMeta(meta) - - events[i] = ne - } - } - + err = w.resolvelinks(ctx, events) return events, err } @@ -172,3 +97,37 @@ func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) { return w.up.LastIndex(ctx) } + +func (w *wrapper) resolvelinks(ctx context.Context, events event.Events) error { + idx := make(map[string][]uint64) + ptrs := make(map[string][]int) + for i := range events { + e := events[i] + if e, ok := e.(*event.EventPtr); ok { + idx[e.StreamID] = append(idx[e.StreamID], e.Pos) + ptrs[e.StreamID] = append(ptrs[e.StreamID], i) + } + } + + for streamID, ids := range idx { + d, err := w.resolver.EventLog(ctx, streamID) + if err != nil { + return err + } + ptr := ptrs[streamID] + lis, err := d.ReadN(ctx, ids...) + if err != nil && !errors.Is(err, ev.ErrNotFound) { + return err + } + + for i := range lis { + meta := lis[i].EventMeta() + actual := events[ptr[i]].EventMeta() + meta.ActualPosition = actual.Position + meta.ActualStreamID = actual.ActualStreamID + lis[i].SetEventMeta(meta) + events[i] = lis[i] + } + } + return nil +} diff --git a/pkg/driver/streamer/streamer.go b/pkg/driver/streamer/streamer.go index 4e572b6..a1b4c8b 100644 --- a/pkg/driver/streamer/streamer.go +++ b/pkg/driver/streamer/streamer.go @@ -60,6 +60,8 @@ func (s *streamer) EventLog(ctx context.Context, streamID string) (driver.EventL var _ driver.EventStream = (*streamer)(nil) +// Subscribe sets up a subscription that tracks the last HWM. when new events arrive they are notified via Send. +// Call Close when done to clean up allocation. func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) (driver.Subscription, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -81,6 +83,8 @@ func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) return nil }) } + +// Send will notify all Recv-ing subscribers that new events are ready. func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error { ctx, span := lg.Span(ctx) defer span.End() @@ -218,6 +222,7 @@ type subscription struct { once sync.Once } +// Recv checks for new events. Returned chan will block until an event is ready or cancelled. func (s *subscription) Recv(ctx context.Context) <-chan bool { ctx, span := lg.Span(ctx) defer span.End() @@ -275,6 +280,8 @@ func (s *subscription) Recv(ctx context.Context) <-chan bool { return done } + +// Events returns a batch of events since last call. Updates position of HWM to end of stream. Call Recv to be notified when more are ready. func (s *subscription) Events(ctx context.Context) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -303,6 +310,8 @@ func (s *subscription) Events(ctx context.Context) (event.Events, error) { return err }) } + +// Close unsubscribes from a stream func (s *subscription) Close(ctx context.Context) error { ctx, span := lg.Span(ctx) defer span.End() diff --git a/pkg/event/events.go b/pkg/event/events.go index d251399..f0abadc 100644 --- a/pkg/event/events.go +++ b/pkg/event/events.go @@ -238,28 +238,29 @@ func AsEvent[T any](e T) Event { return &asEvent[T]{payload: e} } -type asEvent [T any] struct { +type asEvent[T any] struct { payload T IsEvent } + func (e asEvent[T]) Payload() T { return e.payload } - -type AGG interface{ApplyEvent(...Event)} +type AGG interface{ ApplyEvent(...Event) } func AsAggregate[T AGG](e T) Aggregate { return &asAggregate[T]{payload: e} } -type asAggregate [T AGG] struct { +type asAggregate[T AGG] struct { payload T IsAggregate } + func (e *asAggregate[T]) Payload() T { return e.payload } func (e *asAggregate[T]) ApplyEvent(lis ...Event) { e.payload.ApplyEvent(lis...) -} \ No newline at end of file +} diff --git a/pkg/es/es.graphqls b/pkg/gql/eventstore.graphqls similarity index 90% rename from pkg/es/es.graphqls rename to pkg/gql/eventstore.graphqls index 4f095c3..6d04475 100644 --- a/pkg/es/es.graphqls +++ b/pkg/gql/eventstore.graphqls @@ -17,7 +17,7 @@ extend type Subscription { eventAdded(streamID: String! after: Int! = -1): Event } -type Event implements Edge @goModel(model: "go.sour.is/ev/pkg/es.GQLEvent") { +type Event implements Edge @goModel(model: "go.sour.is/ev/pkg/gql.Event") { id: ID! eventID: String! diff --git a/pkg/es/graph.go b/pkg/gql/graph.go similarity index 84% rename from pkg/es/graph.go rename to pkg/gql/graph.go index 04bea8d..1e0747d 100644 --- a/pkg/es/graph.go +++ b/pkg/gql/graph.go @@ -1,4 +1,4 @@ -package es +package gql_ev import ( "context" @@ -18,7 +18,7 @@ import ( type EventResolver interface { Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) - EventAdded(ctx context.Context, streamID string, after int64) (<-chan *GQLEvent, error) + EventAdded(ctx context.Context, streamID string, after int64) (<-chan *Event, error) TruncateStream(ctx context.Context, streamID string, index int64) (bool, error) } type contextKey struct { @@ -45,7 +45,7 @@ func (es *EventStore) Events(ctx context.Context, streamID string, paging *gql.P edges := make([]gql.Edge, 0, len(lis)) for i := range lis { span.AddEvent(fmt.Sprint("event ", i, " of ", len(lis))) - edges = append(edges, &GQLEvent{lis[i]}) + edges = append(edges, &Event{lis[i]}) } var first, last uint64 @@ -68,7 +68,7 @@ func (es *EventStore) Events(ctx context.Context, streamID string, paging *gql.P Edges: edges, }, nil } -func (e *EventStore) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *GQLEvent, error) { +func (e *EventStore) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *Event, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -83,7 +83,7 @@ func (e *EventStore) EventAdded(ctx context.Context, streamID string, after int6 return nil, err } - ch := make(chan *GQLEvent) + ch := make(chan *Event) go func() { ctx, span := lg.Span(ctx) @@ -110,7 +110,7 @@ func (e *EventStore) EventAdded(ctx context.Context, streamID string, after int6 for i := range events { select { - case ch <- &GQLEvent{events[i]}: + case ch <- &Event{events[i]}: continue case <-ctx.Done(): return @@ -141,32 +141,32 @@ func (e *EventStore) GetMiddleware() func(http.Handler) http.Handler { } } -type GQLEvent struct { +type Event struct { e event.Event } -func (e *GQLEvent) ID() string { +func (e *Event) ID() string { return fmt.Sprint(e.e.EventMeta().StreamID, "@", e.e.EventMeta().Position) } -func (e *GQLEvent) EventID() string { +func (e *Event) EventID() string { return e.e.EventMeta().GetEventID() } -func (e *GQLEvent) StreamID() string { +func (e *Event) StreamID() string { return e.e.EventMeta().StreamID } -func (e *GQLEvent) Position() uint64 { +func (e *Event) Position() uint64 { return e.e.EventMeta().Position } -func (e *GQLEvent) Type() string { +func (e *Event) Type() string { return event.TypeOf(e.e) } -func (e *GQLEvent) Created() time.Time { +func (e *Event) Created() time.Time { return e.e.EventMeta().Created() } -func (e *GQLEvent) Values() map[string]interface{} { +func (e *Event) Values() map[string]interface{} { return event.Values(e.e) } -func (e *GQLEvent) Bytes() (string, error) { +func (e *Event) Bytes() (string, error) { switch e := e.e.(type) { case encoding.BinaryMarshaler: b, err := e.MarshalBinary() @@ -179,11 +179,11 @@ func (e *GQLEvent) Bytes() (string, error) { return string(b), err } } -func (e *GQLEvent) Meta() *event.Meta { +func (e *Event) Meta() *event.Meta { meta := e.e.EventMeta() return &meta } -func (e *GQLEvent) Linked(ctx context.Context) (*GQLEvent, error) { +func (e *Event) Linked(ctx context.Context) (*Event, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -198,6 +198,6 @@ func (e *GQLEvent) Linked(ctx context.Context) (*GQLEvent, error) { } events, err := gql.FromContext[contextKey, *EventStore](ctx, esKey).ReadN(ctx, streamID, pos) - return &GQLEvent{e: events.First()}, err + return &Event{e: events.First()}, err } -func (e *GQLEvent) IsEdge() {} +func (e *Event) IsEdge() {}