diff --git a/main.go b/main.go index 84a4a19..d9be497 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,7 @@ import ( "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" - ds_driver "github.com/sour-is/ev/pkg/es/driver/disk-store" + "github.com/sour-is/ev/pkg/es/driver/disk-store" "github.com/sour-is/ev/pkg/es/event" ) @@ -35,7 +35,7 @@ func main() { func run(ctx context.Context) error { event.Register(&PostEvent{}) - ds_driver.Init(ctx) + diskstore.Init(ctx) es, err := es.Open(ctx, "file:data") if err != nil { diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 5f30c3d..b50d8bb 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -1,4 +1,4 @@ -package ds_driver +package diskstore import ( "context" diff --git a/pkg/es/driver/mem-store/mem-store.go b/pkg/es/driver/mem-store/mem-store.go new file mode 100644 index 0000000..2843ff8 --- /dev/null +++ b/pkg/es/driver/mem-store/mem-store.go @@ -0,0 +1,142 @@ +package memstore + +import ( + "context" + "fmt" + + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/es/driver" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/locker" + "github.com/sour-is/ev/pkg/math" +) + +type state struct { + streams map[string]event.Events +} + +type memstore struct { + state *locker.Locked[state] +} + +var _ driver.Driver = (*memstore)(nil) + +func Init(ctx context.Context) { + es.Register(ctx, "mem", &memstore{}) +} + +func (memstore) Open(name string) (driver.EventStore, error) { + s := &state{streams: make(map[string]event.Events)} + return &memstore{locker.New(s)}, nil +} + +// Append implements driver.EventStore +func (m *memstore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) { + event.SetStreamID(streamID, events...) + + return uint64(len(events)), m.state.Modify(ctx, func(state *state) error { + stream := state.streams[streamID] + last := uint64(len(stream)) + for i := range events { + pos := last + uint64(i) + 1 + event.SetPosition(events[i], pos) + stream = append(stream, events[i]) + state.streams[streamID] = stream + } + + return nil + }) +} + +// Load implements driver.EventStore +func (m *memstore) Load(ctx context.Context, agg event.Aggregate) error { + return m.state.Modify(ctx, func(state *state) error { + events := state.streams[agg.StreamID()] + event.SetStreamID(agg.StreamID(), events...) + agg.ApplyEvent(events...) + return nil + }) +} + +// Read implements driver.EventStore +func (m *memstore) Read(ctx context.Context, streamID string, pos int64, count int64) (event.Events, error) { + events := make([]event.Event, math.Abs(count)) + + err := m.state.Modify(ctx, func(state *state) error { + stream := state.streams[streamID] + + var first, last, start uint64 + first = stream.First().EventMeta().Position + last = stream.Last().EventMeta().Position + + if first == 0 || last == 0 { + events = events[:0] + + return nil + } + + switch { + case pos >= 0: + start = first + uint64(pos) + if pos == 0 && count < 0 { + count = -count // if pos=0 assume forward count. + } + case pos < 0: + start = uint64(int64(last) + pos + 1) + if pos == -1 && count > 0 { + count = -count // if pos=-1 assume backward count. + } + } + + for i := range events { + events[i] = stream[start-1] + + if count > 0 { + start += 1 + } else { + start -= 1 + } + if start < first || start > last { + events = events[:i+1] + break + } + } + + return nil + }) + if err != nil { + return nil, err + } + + return events, nil +} + +// Save implements driver.EventStore +func (m *memstore) Save(ctx context.Context, agg event.Aggregate) (uint64, error) { + events := agg.Events(true) + event.SetStreamID(agg.StreamID(), events...) + + err := m.state.Modify(ctx, func(state *state) error { + stream := state.streams[agg.StreamID()] + + last := uint64(len(stream)) + if agg.StreamVersion() != last { + return fmt.Errorf("current version wrong %d != %d", agg.StreamVersion(), last) + } + + for i := range events { + pos := last + uint64(i) + 1 + event.SetPosition(events[i], pos) + stream = append(stream, events[i]) + } + + state.streams[agg.StreamID()] = stream + return nil + }) + if err != nil { + return 0, err + } + agg.Commit() + + return uint64(len(events)), nil +} diff --git a/pkg/es/es_test.go b/pkg/es/es_test.go index 6788aaf..a95c6ee 100644 --- a/pkg/es/es_test.go +++ b/pkg/es/es_test.go @@ -7,7 +7,7 @@ import ( "time" "github.com/sour-is/ev/pkg/es" - ds_driver "github.com/sour-is/ev/pkg/es/driver/disk-store" + memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" "github.com/sour-is/ev/pkg/es/event" ) @@ -16,9 +16,9 @@ func TestES(t *testing.T) { event.Register(&ValueSet{}) - ds_driver.Init(ctx) + memstore.Init(ctx) - es, err := es.Open(ctx, "file:data") + es, err := es.Open(ctx, "mem:") if err != nil { t.Fatal(err) } diff --git a/pkg/es/event/events.go b/pkg/es/event/events.go index 44bb48f..ff27a7a 100644 --- a/pkg/es/event/events.go +++ b/pkg/es/event/events.go @@ -54,7 +54,7 @@ func (lis Events) SetStreamID(streamID string) { } func (lis Events) First() Event { if len(lis) == 0 { - return nil + return nilEvent } return lis[0] } @@ -64,6 +64,12 @@ func (lis Events) Rest() Events { } return lis[1:] } +func (lis Events) Last() Event { + if len(lis) == 0 { + return nilEvent + } + return lis[len(lis)-1] +} func (lis Events) MarshalText() ([]byte, error) { b := &bytes.Buffer{} for i := range lis { @@ -118,6 +124,7 @@ func SetPosition(e Event, i uint64) { meta.Position = i e.SetEventMeta(meta) } + type Meta struct { EventID ulid.ULID StreamID string @@ -127,3 +134,12 @@ type Meta struct { func (m Meta) Time() time.Time { return ulid.Time(m.EventID.Time()) } + +type _nilEvent struct{} + +func (_nilEvent) EventMeta() Meta { + return Meta{} +} +func (_nilEvent) SetEventMeta(eventMeta Meta) {} + +var nilEvent _nilEvent