diff --git a/main.go b/main.go index 2a86fad..8c1adbe 100644 --- a/main.go +++ b/main.go @@ -60,7 +60,12 @@ func run(ctx context.Context) error { return err } - es, err := es.Open(ctx, env("EV_DATA", "mem:"), streamer.New(ctx), projecter.New(ctx)) + es, err := es.Open( + ctx, + env("EV_DATA", "mem:"), + streamer.New(ctx), + projecter.New(ctx, projecter.DefaultProjection), + ) if err != nil { span.RecordError(err) return err diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 36472ef..1cd44bd 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -111,10 +111,12 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) } logs := &openlogs{logs: c} return &diskStore{ - path: path, - openlogs: locker.New(logs), + path: path, + openlogs: locker.New(logs), m_disk_open: d.m_disk_open, m_disk_evict: d.m_disk_evict, + m_disk_read: d.m_disk_read, + m_disk_write: d.m_disk_write, }, nil } func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { @@ -147,9 +149,9 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event } type eventLog struct { - streamID string - events *locker.Locked[wal.Log] - diskStore *diskStore + streamID string + events *locker.Locked[wal.Log] + diskStore *diskStore } var _ driver.EventLog = (*eventLog)(nil) diff --git a/pkg/es/driver/projecter/projecter.go b/pkg/es/driver/projecter/projecter.go index 0d619c0..287b081 100644 --- a/pkg/es/driver/projecter/projecter.go +++ b/pkg/es/driver/projecter/projecter.go @@ -11,11 +11,12 @@ import ( ) type projector struct { - up driver.Driver + up driver.Driver + fns []func(event.Event) []event.Event } -func New(ctx context.Context) *projector { - return &projector{} +func New(ctx context.Context, fns ...func(event.Event) []event.Event) *projector { + return &projector{fns: fns} } func (p *projector) Apply(e *es.EventStore) { p.up = e.Driver @@ -70,27 +71,13 @@ func (w *wrapper) Append(ctx context.Context, events event.Events, version uint6 for i := range events { e := events[i] - eventType := event.TypeOf(e) - m := e.EventMeta() - streamID := m.StreamID - streamPos := m.Position - e1 := event.NewPtr(streamID, streamPos) - event.SetStreamID("$all", e1) - - e2 := event.NewPtr(streamID, streamPos) - event.SetStreamID("$type-"+eventType, e2) - - e3 := event.NewPtr(streamID, streamPos) - pkg, _, _ := strings.Cut(eventType, ".") - event.SetStreamID("$pkg-"+pkg, e3) - - pevents = append( - pevents, - e1, - e2, - e3, - ) + for _, fn := range w.projector.fns { + pevents = append( + pevents, + fn(e)..., + ) + } } for i := range pevents { @@ -126,3 +113,22 @@ func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func( return w.up.LoadForUpdate(ctx, a, fn) } + +func DefaultProjection(e event.Event) []event.Event { + eventType := event.TypeOf(e) + m := e.EventMeta() + streamID := m.StreamID + streamPos := m.Position + + e1 := event.NewPtr(streamID, streamPos) + event.SetStreamID("$all", e1) + + e2 := event.NewPtr(streamID, streamPos) + event.SetStreamID("$type-"+eventType, e2) + + e3 := event.NewPtr(streamID, streamPos) + pkg, _, _ := strings.Cut(eventType, ".") + event.SetStreamID("$pkg-"+pkg, e3) + + return []event.Event{e1, e2, e3} +}