feat: make projector extendable
This commit is contained in:
parent
9878ed4a79
commit
129968d179
7
main.go
7
main.go
|
@ -60,7 +60,12 @@ func run(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
es, err := es.Open(ctx, env("EV_DATA", "mem:"), streamer.New(ctx), projecter.New(ctx))
|
es, err := es.Open(
|
||||||
|
ctx,
|
||||||
|
env("EV_DATA", "mem:"),
|
||||||
|
streamer.New(ctx),
|
||||||
|
projecter.New(ctx, projecter.DefaultProjection),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -111,10 +111,12 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
|
||||||
}
|
}
|
||||||
logs := &openlogs{logs: c}
|
logs := &openlogs{logs: c}
|
||||||
return &diskStore{
|
return &diskStore{
|
||||||
path: path,
|
path: path,
|
||||||
openlogs: locker.New(logs),
|
openlogs: locker.New(logs),
|
||||||
m_disk_open: d.m_disk_open,
|
m_disk_open: d.m_disk_open,
|
||||||
m_disk_evict: d.m_disk_evict,
|
m_disk_evict: d.m_disk_evict,
|
||||||
|
m_disk_read: d.m_disk_read,
|
||||||
|
m_disk_write: d.m_disk_write,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
|
func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
|
||||||
|
@ -147,9 +149,9 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventLog struct {
|
type eventLog struct {
|
||||||
streamID string
|
streamID string
|
||||||
events *locker.Locked[wal.Log]
|
events *locker.Locked[wal.Log]
|
||||||
diskStore *diskStore
|
diskStore *diskStore
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ driver.EventLog = (*eventLog)(nil)
|
var _ driver.EventLog = (*eventLog)(nil)
|
||||||
|
|
|
@ -11,11 +11,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type projector struct {
|
type projector struct {
|
||||||
up driver.Driver
|
up driver.Driver
|
||||||
|
fns []func(event.Event) []event.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ctx context.Context) *projector {
|
func New(ctx context.Context, fns ...func(event.Event) []event.Event) *projector {
|
||||||
return &projector{}
|
return &projector{fns: fns}
|
||||||
}
|
}
|
||||||
func (p *projector) Apply(e *es.EventStore) {
|
func (p *projector) Apply(e *es.EventStore) {
|
||||||
p.up = e.Driver
|
p.up = e.Driver
|
||||||
|
@ -70,27 +71,13 @@ func (w *wrapper) Append(ctx context.Context, events event.Events, version uint6
|
||||||
|
|
||||||
for i := range events {
|
for i := range events {
|
||||||
e := events[i]
|
e := events[i]
|
||||||
eventType := event.TypeOf(e)
|
|
||||||
m := e.EventMeta()
|
|
||||||
streamID := m.StreamID
|
|
||||||
streamPos := m.Position
|
|
||||||
|
|
||||||
e1 := event.NewPtr(streamID, streamPos)
|
for _, fn := range w.projector.fns {
|
||||||
event.SetStreamID("$all", e1)
|
pevents = append(
|
||||||
|
pevents,
|
||||||
e2 := event.NewPtr(streamID, streamPos)
|
fn(e)...,
|
||||||
event.SetStreamID("$type-"+eventType, e2)
|
)
|
||||||
|
}
|
||||||
e3 := event.NewPtr(streamID, streamPos)
|
|
||||||
pkg, _, _ := strings.Cut(eventType, ".")
|
|
||||||
event.SetStreamID("$pkg-"+pkg, e3)
|
|
||||||
|
|
||||||
pevents = append(
|
|
||||||
pevents,
|
|
||||||
e1,
|
|
||||||
e2,
|
|
||||||
e3,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range pevents {
|
for i := range pevents {
|
||||||
|
@ -126,3 +113,22 @@ func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(
|
||||||
|
|
||||||
return w.up.LoadForUpdate(ctx, a, fn)
|
return w.up.LoadForUpdate(ctx, a, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func DefaultProjection(e event.Event) []event.Event {
|
||||||
|
eventType := event.TypeOf(e)
|
||||||
|
m := e.EventMeta()
|
||||||
|
streamID := m.StreamID
|
||||||
|
streamPos := m.Position
|
||||||
|
|
||||||
|
e1 := event.NewPtr(streamID, streamPos)
|
||||||
|
event.SetStreamID("$all", e1)
|
||||||
|
|
||||||
|
e2 := event.NewPtr(streamID, streamPos)
|
||||||
|
event.SetStreamID("$type-"+eventType, e2)
|
||||||
|
|
||||||
|
e3 := event.NewPtr(streamID, streamPos)
|
||||||
|
pkg, _, _ := strings.Cut(eventType, ".")
|
||||||
|
event.SetStreamID("$pkg-"+pkg, e3)
|
||||||
|
|
||||||
|
return []event.Event{e1, e2, e3}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user