feat: add resolvelinks

This commit is contained in:
Jon Lundy
2022-10-30 09:18:08 -06:00
parent 5bf052580f
commit 6569c58e37
18 changed files with 421 additions and 106 deletions

View File

@@ -94,7 +94,7 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
_, span := lg.Span(ctx)
defer span.End()
l.Modify(ctx, func(w *wal.Log) error {
l.Modify(ctx, func(ctx context.Context, w *wal.Log) error {
_, span := lg.Span(ctx)
defer span.End()
@@ -128,7 +128,7 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event
el := &eventLog{streamID: streamID, diskStore: d}
return el, d.openlogs.Modify(ctx, func(openlogs *openlogs) error {
return el, d.openlogs.Modify(ctx, func(ctx context.Context, openlogs *openlogs) error {
_, span := lg.Span(ctx)
defer span.End()
@@ -166,7 +166,7 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint
event.SetStreamID(e.streamID, events...)
var count uint64
err := e.events.Modify(ctx, func(l *wal.Log) error {
err := e.events.Modify(ctx, func(ctx context.Context, l *wal.Log) error {
_, span := lg.Span(ctx)
defer span.End()
@@ -215,7 +215,7 @@ func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events,
var events event.Events
err := e.events.Modify(ctx, func(stream *wal.Log) error {
err := e.events.Modify(ctx, func(ctx context.Context, stream *wal.Log) error {
_, span := lg.Span(ctx)
defer span.End()
@@ -244,17 +244,7 @@ func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events,
span.AddEvent(fmt.Sprintf("read event %d of %d", i, len(events)))
// ---
var b []byte
b, err = stream.Read(start)
if err != nil {
if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) {
err = fmt.Errorf("%w: empty", es.ErrNotFound)
}
span.RecordError(err)
return err
}
events[i], err = event.UnmarshalBinary(ctx, b, start)
events[i], err = readStream(ctx, stream, start)
if err != nil {
span.RecordError(err)
return err
@@ -283,6 +273,21 @@ func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events,
return events, nil
}
func (e *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
_, span := lg.Span(ctx)
defer span.End()
var events event.Events
err := e.events.Modify(ctx, func(ctx context.Context, stream *wal.Log) error {
var err error
events, err = readStreamN(ctx, stream, index...)
return err
})
return events, err
}
func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) {
_, span := lg.Span(ctx)
defer span.End()
@@ -290,7 +295,7 @@ func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) {
var idx uint64
var err error
err = e.events.Modify(ctx, func(events *wal.Log) error {
err = e.events.Modify(ctx, func(ctx context.Context, events *wal.Log) error {
idx, err = events.FirstIndex()
return err
})
@@ -304,7 +309,7 @@ func (e *eventLog) LastIndex(ctx context.Context) (uint64, error) {
var idx uint64
var err error
err = e.events.Modify(ctx, func(events *wal.Log) error {
err = e.events.Modify(ctx, func(ctx context.Context, events *wal.Log) error {
idx, err = events.LastIndex()
return err
})
@@ -314,3 +319,50 @@ func (e *eventLog) LastIndex(ctx context.Context) (uint64, error) {
func (e *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) {
panic("not implemented")
}
func readStream(ctx context.Context, stream *wal.Log, index uint64) (event.Event, error) {
_, span := lg.Span(ctx)
defer span.End()
var b []byte
var err error
b, err = stream.Read(index)
if err != nil {
if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) {
err = fmt.Errorf("%w: empty", es.ErrNotFound)
}
span.RecordError(err)
return nil, err
}
e, err := event.UnmarshalBinary(ctx, b, index)
if err != nil {
span.RecordError(err)
return nil, err
}
return e, err
}
func readStreamN(ctx context.Context, stream *wal.Log, index ...uint64) (event.Events, error) {
_, span := lg.Span(ctx)
defer span.End()
var b []byte
var err error
events := make(event.Events, len(index))
for i, idx := range index {
b, err = stream.Read(idx)
if err != nil {
if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) {
err = fmt.Errorf("%w: empty", es.ErrNotFound)
}
span.RecordError(err)
return nil, err
}
events[i], err = event.UnmarshalBinary(ctx, b, idx)
if err != nil {
span.RecordError(err)
return nil, err
}
}
return events, err
}

View File

@@ -14,6 +14,7 @@ type Driver interface {
type EventLog interface {
Read(ctx context.Context, after, count int64) (event.Events, error)
ReadN(ctx context.Context, index ...uint64) (event.Events, error)
Append(ctx context.Context, events event.Events, version uint64) (uint64, error)
FirstIndex(context.Context) (uint64, error)
LastIndex(context.Context) (uint64, error)

View File

@@ -49,7 +49,7 @@ func (m *memstore) EventLog(ctx context.Context, streamID string) (driver.EventL
el := &eventLog{streamID: streamID}
err := m.state.Modify(ctx, func(state *state) error {
err := m.state.Modify(ctx, func(ctx context.Context, state *state) error {
_, span := lg.Span(ctx)
defer span.End()
@@ -76,7 +76,7 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint
event.SetStreamID(m.streamID, events...)
return uint64(len(events)), m.events.Modify(ctx, func(stream *event.Events) error {
return uint64(len(events)), m.events.Modify(ctx, func(ctx context.Context, stream *event.Events) error {
_, span := lg.Span(ctx)
defer span.End()
@@ -111,6 +111,23 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint
})
}
// ReadOne implements readone
func (m *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
_, span := lg.Span(ctx)
defer span.End()
var events event.Events
err := m.events.Modify(ctx, func(ctx context.Context, stream *event.Events) error {
var err error
events, err = readStreamN(ctx, stream, index...)
return err
})
return events, err
}
// Read implements driver.EventStore
func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Events, error) {
ctx, span := lg.Span(ctx)
@@ -118,7 +135,7 @@ func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Ev
var events event.Events
err := m.events.Modify(ctx, func(stream *event.Events) error {
err := m.events.Modify(ctx, func(ctx context.Context, stream *event.Events) error {
_, span := lg.Span(ctx)
defer span.End()
@@ -141,12 +158,8 @@ func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Ev
span.AddEvent(fmt.Sprintf("read event %d of %d", i, math.Abs(count)))
// --- clone event
e := (*stream)[start-1]
b, err := event.MarshalBinary(e)
if err != nil {
return err
}
events[i], err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position)
var err error
events[i], err = readStream(ctx, stream, start)
if err != nil {
return err
}
@@ -194,3 +207,42 @@ func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) {
func (m *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) {
panic("not implemented")
}
func readStream(ctx context.Context, stream *event.Events, index uint64) (event.Event, error) {
_, span := lg.Span(ctx)
defer span.End()
var b []byte
var err error
e := (*stream)[index-1]
b, err = event.MarshalBinary(e)
if err != nil {
return nil, err
}
e, err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position)
if err != nil {
return nil, err
}
return e, err
}
func readStreamN(ctx context.Context, stream *event.Events, index ...uint64) (event.Events, error) {
_, span := lg.Span(ctx)
defer span.End()
var b []byte
var err error
events := make(event.Events, len(index))
for i, index := range index {
e := (*stream)[index-1]
b, err = event.MarshalBinary(e)
if err != nil {
return nil, err
}
events[i], err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position)
if err != nil {
return nil, err
}
}
return events, err
}

View File

@@ -53,6 +53,12 @@ func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Eve
return w.up.Read(ctx, after, count)
}
func (w *wrapper) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.ReadN(ctx, index...)
}
func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
@@ -123,10 +129,11 @@ func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(
}
func DefaultProjection(e event.Event) []event.Event {
eventType := event.TypeOf(e)
m := e.EventMeta()
streamID := m.StreamID
streamPos := m.Position
eventType := event.TypeOf(e)
pkg, _, _ := strings.Cut(eventType, ".")
e1 := event.NewPtr(streamID, streamPos)
event.SetStreamID("$all", e1)
@@ -135,7 +142,6 @@ func DefaultProjection(e event.Event) []event.Event {
event.SetStreamID("$type-"+eventType, e2)
e3 := event.NewPtr(streamID, streamPos)
pkg, _, _ := strings.Cut(eventType, ".")
event.SetStreamID("$pkg-"+pkg, e3)
return []event.Event{e1, e2, e3}

View File

@@ -39,6 +39,7 @@ type mockEventLog struct {
onFirstIndex func(context.Context) (uint64, error)
onLastIndex func(context.Context) (uint64, error)
onRead func(context.Context, int64, int64) (event.Events, error)
onReadN func(context.Context, ...uint64) (event.Events, error)
}
// Append implements driver.EventLog
@@ -73,6 +74,13 @@ func (m *mockEventLog) Read(ctx context.Context, pos int64, count int64) (event.
panic("unimplemented")
}
func (m *mockEventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
if m.onReadN != nil {
return m.onReadN(ctx, index...)
}
panic("unimplemented")
}
var _ driver.EventLog = (*mockEventLog)(nil)

View File

@@ -73,7 +73,7 @@ func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64)
})
sub.unsub = s.delete(streamID, sub)
return sub, s.state.Modify(ctx, func(state *state) error {
return sub, s.state.Modify(ctx, func(ctx context.Context, state *state) error {
state.subscribers[streamID] = append(state.subscribers[streamID], sub)
return nil
})
@@ -82,13 +82,13 @@ func (s *streamer) Send(ctx context.Context, streamID string, events event.Event
ctx, span := lg.Span(ctx)
defer span.End()
return s.state.Modify(ctx, func(state *state) error {
return s.state.Modify(ctx, func(ctx context.Context, state *state) error {
ctx, span := lg.Span(ctx)
defer span.End()
span.AddEvent(fmt.Sprint("subscribers=", len(state.subscribers[streamID])))
for _, sub := range state.subscribers[streamID] {
err := sub.position.Modify(ctx, func(position *position) error {
err := sub.position.Modify(ctx, func(ctx context.Context, position *position) error {
_, span := lg.Span(ctx)
defer span.End()
@@ -116,7 +116,7 @@ func (s *streamer) delete(streamID string, sub *subscription) func(context.Conte
if err := ctx.Err(); err != nil {
return err
}
return s.state.Modify(ctx, func(state *state) error {
return s.state.Modify(ctx, func(ctx context.Context, state *state) error {
_, span := lg.Span(ctx)
defer span.End()
@@ -148,6 +148,12 @@ func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Eve
return w.up.Read(ctx, after, count)
}
func (w *wrapper) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.ReadN(ctx, index...)
}
func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
@@ -215,7 +221,7 @@ func (s *subscription) Recv(ctx context.Context) bool {
var wait func(context.Context) bool
err := s.position.Modify(ctx, func(position *position) error {
err := s.position.Modify(ctx, func(ctx context.Context, position *position) error {
_, span := lg.Span(ctx)
defer span.End()
@@ -263,7 +269,7 @@ func (s *subscription) Events(ctx context.Context) (event.Events, error) {
defer span.End()
var events event.Events
return events, s.position.Modify(ctx, func(position *position) error {
return events, s.position.Modify(ctx, func(ctx context.Context, position *position) error {
ctx, span := lg.Span(ctx)
defer span.End()