From e7df4cc479bf9232eb8a9b15b996c58be00c44dc Mon Sep 17 00:00:00 2001 From: Jon Lundy Date: Fri, 30 Sep 2022 14:56:10 -0600 Subject: [PATCH] chore: move LoadForUpdate to seporate interface --- pkg/es/driver/driver.go | 3 +- pkg/es/driver/projecter/projecter.go | 9 +- pkg/es/driver/projecter/projector_test.go | 101 +++++++++++++++++++--- pkg/es/driver/streamer/streamer.go | 9 +- 4 files changed, 109 insertions(+), 13 deletions(-) diff --git a/pkg/es/driver/driver.go b/pkg/es/driver/driver.go index 175970e..8d63249 100644 --- a/pkg/es/driver/driver.go +++ b/pkg/es/driver/driver.go @@ -16,7 +16,8 @@ type EventLog interface { Append(ctx context.Context, events event.Events, version uint64) (uint64, error) FirstIndex(context.Context) (uint64, error) LastIndex(context.Context) (uint64, error) - +} +type EventLogWithUpdate interface { LoadForUpdate(context.Context, event.Aggregate, func(context.Context, event.Aggregate) error) (uint64, error) } diff --git a/pkg/es/driver/projecter/projecter.go b/pkg/es/driver/projecter/projecter.go index 287b081..94ea378 100644 --- a/pkg/es/driver/projecter/projecter.go +++ b/pkg/es/driver/projecter/projecter.go @@ -111,7 +111,14 @@ func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func( ctx, span := lg.Span(ctx) defer span.End() - return w.up.LoadForUpdate(ctx, a, fn) + up := w.up + for up != nil { + if up, ok := up.(driver.EventLogWithUpdate); ok { + return up.LoadForUpdate(ctx, a, fn) + } + up = es.Unwrap(up) + } + return 0, es.ErrNoDriver } func DefaultProjection(e event.Event) []event.Event { diff --git a/pkg/es/driver/projecter/projector_test.go b/pkg/es/driver/projecter/projector_test.go index 9aa2ffb..ba26b3e 100644 --- a/pkg/es/driver/projecter/projector_test.go +++ b/pkg/es/driver/projecter/projector_test.go @@ -4,26 +4,107 @@ import ( "context" "testing" + "github.com/matryer/is" + "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" + "github.com/sour-is/ev/pkg/es/driver/projecter" + "github.com/sour-is/ev/pkg/es/event" ) type mockDriver struct { - onOpen func() - onEventLog func() -} - -// EventLog implements driver.Driver -func (*mockDriver) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { - panic("unimplemented") + onOpen func(context.Context, string) (driver.Driver, error) + onEventLog func(context.Context, string) (driver.EventLog, error) } // Open implements driver.Driver -func (*mockDriver) Open(ctx context.Context, dsn string) (driver.Driver, error) { +func (m *mockDriver) Open(ctx context.Context, dsn string) (driver.Driver, error) { + if m.onOpen != nil { + return m.onOpen(ctx, dsn) + } + panic("unimplemented") +} + +// EventLog implements driver.Driver +func (m *mockDriver) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { + if m.onEventLog != nil { + return m.onEventLog(ctx, streamID) + } panic("unimplemented") } var _ driver.Driver = (*mockDriver)(nil) -func TestProjecter(t *testing.T) { - +type mockEventLog struct { + onAppend func(context.Context, event.Events, uint64) (uint64, error) + onFirstIndex func(context.Context) (uint64, error) + onLastIndex func(context.Context) (uint64, error) + onRead func(context.Context, int64, int64) (event.Events, error) +} + +// Append implements driver.EventLog +func (m *mockEventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { + if m.onAppend != nil { + return m.onAppend(ctx, events, version) + } + panic("unimplemented") +} + +// FirstIndex implements driver.EventLog +func (m *mockEventLog) FirstIndex(ctx context.Context) (uint64, error) { + if m.onFirstIndex != nil { + return m.onFirstIndex(ctx) + } + panic("unimplemented") +} + +// LastIndex implements driver.EventLog +func (m *mockEventLog) LastIndex(ctx context.Context) (uint64, error) { + if m.onLastIndex != nil { + return m.onLastIndex(ctx) + } + panic("unimplemented") +} + +// Read implements driver.EventLog +func (m *mockEventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { + if m.onRead != nil { + return m.onRead(ctx, pos, count) + } + + panic("unimplemented") +} + +var _ driver.EventLog = (*mockEventLog)(nil) + +func TestProjecter(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + mockEL := &mockEventLog{} + mockEL.onRead = func(ctx context.Context, i1, i2 int64) (event.Events, error) { + return event.NewEvents(), nil + } + + mock := &mockDriver{} + mock.onOpen = func(ctx context.Context, s string) (driver.Driver, error) { + return mock, nil + } + mock.onEventLog = func(ctx context.Context, s string) (driver.EventLog, error) { + return mockEL, nil + } + + es.Init(ctx) + es.Register(ctx, "mock", mock) + + es, err := es.Open( + ctx, + "mock:", + projecter.New(ctx, projecter.DefaultProjection), + ) + + is.NoErr(err) + + _, err = es.Read(ctx, "test", 0, 1) + + is.NoErr(err) } diff --git a/pkg/es/driver/streamer/streamer.go b/pkg/es/driver/streamer/streamer.go index 5faa065..ad5b27a 100644 --- a/pkg/es/driver/streamer/streamer.go +++ b/pkg/es/driver/streamer/streamer.go @@ -182,7 +182,14 @@ func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func( ctx, span := lg.Span(ctx) defer span.End() - return w.up.LoadForUpdate(ctx, a, fn) + up := w.up + for up != nil { + if up, ok := up.(driver.EventLogWithUpdate); ok { + return up.LoadForUpdate(ctx, a, fn) + } + up = es.Unwrap(up) + } + return 0, es.ErrNoDriver } type position struct {