feat: add memstore driver

This commit is contained in:
Jon Lundy 2022-08-04 21:07:10 -06:00
parent 1010657a02
commit 189cb5c968
Signed by untrusted user who does not match committer: xuu
GPG Key ID: C63E6D61F3035024
5 changed files with 165 additions and 7 deletions

View File

@ -17,7 +17,7 @@ import (
"github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/driver"
ds_driver "github.com/sour-is/ev/pkg/es/driver/disk-store" "github.com/sour-is/ev/pkg/es/driver/disk-store"
"github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/es/event"
) )
@ -35,7 +35,7 @@ func main() {
func run(ctx context.Context) error { func run(ctx context.Context) error {
event.Register(&PostEvent{}) event.Register(&PostEvent{})
ds_driver.Init(ctx) diskstore.Init(ctx)
es, err := es.Open(ctx, "file:data") es, err := es.Open(ctx, "file:data")
if err != nil { if err != nil {

View File

@ -1,4 +1,4 @@
package ds_driver package diskstore
import ( import (
"context" "context"

View File

@ -0,0 +1,142 @@
package memstore
import (
"context"
"fmt"
"github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/es/driver"
"github.com/sour-is/ev/pkg/es/event"
"github.com/sour-is/ev/pkg/locker"
"github.com/sour-is/ev/pkg/math"
)
type state struct {
streams map[string]event.Events
}
type memstore struct {
state *locker.Locked[state]
}
var _ driver.Driver = (*memstore)(nil)
func Init(ctx context.Context) {
es.Register(ctx, "mem", &memstore{})
}
func (memstore) Open(name string) (driver.EventStore, error) {
s := &state{streams: make(map[string]event.Events)}
return &memstore{locker.New(s)}, nil
}
// Append implements driver.EventStore
func (m *memstore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) {
event.SetStreamID(streamID, events...)
return uint64(len(events)), m.state.Modify(ctx, func(state *state) error {
stream := state.streams[streamID]
last := uint64(len(stream))
for i := range events {
pos := last + uint64(i) + 1
event.SetPosition(events[i], pos)
stream = append(stream, events[i])
state.streams[streamID] = stream
}
return nil
})
}
// Load implements driver.EventStore
func (m *memstore) Load(ctx context.Context, agg event.Aggregate) error {
return m.state.Modify(ctx, func(state *state) error {
events := state.streams[agg.StreamID()]
event.SetStreamID(agg.StreamID(), events...)
agg.ApplyEvent(events...)
return nil
})
}
// Read implements driver.EventStore
func (m *memstore) Read(ctx context.Context, streamID string, pos int64, count int64) (event.Events, error) {
events := make([]event.Event, math.Abs(count))
err := m.state.Modify(ctx, func(state *state) error {
stream := state.streams[streamID]
var first, last, start uint64
first = stream.First().EventMeta().Position
last = stream.Last().EventMeta().Position
if first == 0 || last == 0 {
events = events[:0]
return nil
}
switch {
case pos >= 0:
start = first + uint64(pos)
if pos == 0 && count < 0 {
count = -count // if pos=0 assume forward count.
}
case pos < 0:
start = uint64(int64(last) + pos + 1)
if pos == -1 && count > 0 {
count = -count // if pos=-1 assume backward count.
}
}
for i := range events {
events[i] = stream[start-1]
if count > 0 {
start += 1
} else {
start -= 1
}
if start < first || start > last {
events = events[:i+1]
break
}
}
return nil
})
if err != nil {
return nil, err
}
return events, nil
}
// Save implements driver.EventStore
func (m *memstore) Save(ctx context.Context, agg event.Aggregate) (uint64, error) {
events := agg.Events(true)
event.SetStreamID(agg.StreamID(), events...)
err := m.state.Modify(ctx, func(state *state) error {
stream := state.streams[agg.StreamID()]
last := uint64(len(stream))
if agg.StreamVersion() != last {
return fmt.Errorf("current version wrong %d != %d", agg.StreamVersion(), last)
}
for i := range events {
pos := last + uint64(i) + 1
event.SetPosition(events[i], pos)
stream = append(stream, events[i])
}
state.streams[agg.StreamID()] = stream
return nil
})
if err != nil {
return 0, err
}
agg.Commit()
return uint64(len(events)), nil
}

View File

@ -7,7 +7,7 @@ import (
"time" "time"
"github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es"
ds_driver "github.com/sour-is/ev/pkg/es/driver/disk-store" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store"
"github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/es/event"
) )
@ -16,9 +16,9 @@ func TestES(t *testing.T) {
event.Register(&ValueSet{}) event.Register(&ValueSet{})
ds_driver.Init(ctx) memstore.Init(ctx)
es, err := es.Open(ctx, "file:data") es, err := es.Open(ctx, "mem:")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -54,7 +54,7 @@ func (lis Events) SetStreamID(streamID string) {
} }
func (lis Events) First() Event { func (lis Events) First() Event {
if len(lis) == 0 { if len(lis) == 0 {
return nil return nilEvent
} }
return lis[0] return lis[0]
} }
@ -64,6 +64,12 @@ func (lis Events) Rest() Events {
} }
return lis[1:] return lis[1:]
} }
func (lis Events) Last() Event {
if len(lis) == 0 {
return nilEvent
}
return lis[len(lis)-1]
}
func (lis Events) MarshalText() ([]byte, error) { func (lis Events) MarshalText() ([]byte, error) {
b := &bytes.Buffer{} b := &bytes.Buffer{}
for i := range lis { for i := range lis {
@ -118,6 +124,7 @@ func SetPosition(e Event, i uint64) {
meta.Position = i meta.Position = i
e.SetEventMeta(meta) e.SetEventMeta(meta)
} }
type Meta struct { type Meta struct {
EventID ulid.ULID EventID ulid.ULID
StreamID string StreamID string
@ -127,3 +134,12 @@ type Meta struct {
func (m Meta) Time() time.Time { func (m Meta) Time() time.Time {
return ulid.Time(m.EventID.Time()) return ulid.Time(m.EventID.Time())
} }
type _nilEvent struct{}
func (_nilEvent) EventMeta() Meta {
return Meta{}
}
func (_nilEvent) SetEventMeta(eventMeta Meta) {}
var nilEvent _nilEvent