diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 020e10d..b502c02 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -6,11 +6,13 @@ import ( "context" "errors" "fmt" + "hash/fnv" "os" "path/filepath" "strings" "github.com/tidwall/wal" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/instrument/syncint64" "go.uber.org/multierr" @@ -43,7 +45,7 @@ const AppendOnly = es.AppendOnly const AllEvents = es.AllEvents func Init(ctx context.Context) error { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() d := &diskStore{} @@ -74,6 +76,10 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) _, span := lg.Span(ctx) defer span.End() + span.SetAttributes( + attribute.String("args.dsn", dsn), + ) + scheme, path, ok := strings.Cut(dsn, ":") if !ok { return nil, fmt.Errorf("expected scheme") @@ -91,11 +97,11 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) } } c, err := cache.NewWithEvict(CachSize, func(ctx context.Context, s string, l *lockedWal) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() l.Modify(ctx, func(ctx context.Context, w *wal.Log) error { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() d.m_disk_evict.Add(ctx, 1) @@ -123,13 +129,18 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) }, nil } func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( + attribute.String("args.streamID", streamID), + attribute.String("path", d.path), + ) + el := &eventLog{streamID: streamID, diskStore: d} return el, d.openlogs.Modify(ctx, func(ctx context.Context, openlogs *openlogs) error { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() if events, ok := openlogs.logs.Get(streamID); ok { @@ -139,7 +150,16 @@ func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.Event d.m_disk_open.Add(ctx, 1) - l, err := wal.Open(filepath.Join(d.path, streamID), wal.DefaultOptions) + // migrate streams into dir friendly subdirs + hashPart := mkDirName(streamID) + oldPath := filepath.Join(d.path, streamID) + newPath := filepath.Join(d.path, hashPart, streamID) + if _, err := os.Stat(oldPath); !os.IsNotExist(err) { + os.MkdirAll(filepath.Join(d.path, hashPart), 0700) + os.Rename(oldPath, newPath) + } + + l, err := wal.Open(newPath, wal.DefaultOptions) if err != nil { span.RecordError(err) return err @@ -160,14 +180,22 @@ type eventLog struct { var _ driver.EventLog = (*eventLog)(nil) func (e *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( + attribute.Int("args.events", len(events)), + attribute.Int64("args.version", int64(version)), + attribute.String("streamID", e.streamID), + attribute.String("path", e.diskStore.path), + + ) + event.SetStreamID(e.streamID, events...) var count uint64 err := e.events.Modify(ctx, func(ctx context.Context, l *wal.Log) error { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() last, err := l.LastIndex() @@ -210,13 +238,19 @@ func (e *eventLog) Append(ctx context.Context, events event.Events, version uint return count, err } func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( + attribute.Int64("args.after", after), + attribute.Int64("args.count", count), + attribute.String("streamID", e.streamID), + attribute.String("path", e.diskStore.path), + ) var events event.Events err := e.events.Modify(ctx, func(ctx context.Context, stream *wal.Log) error { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() first, err := stream.FirstIndex() @@ -239,9 +273,17 @@ func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, return nil } + span.SetAttributes( + attribute.Int64("first", int64(first)), + attribute.Int64("last", int64(last)), + attribute.Int64("start", int64(start)), + attribute.Int64("count", int64(count)), + attribute.Int64("after", int64(after)), + + ) + events = make([]event.Event, math.Abs(count)) for i := range events { - span.AddEvent(fmt.Sprintf("read event %d of %d", i, len(events))) // --- events[i], err = readStream(ctx, stream, start) @@ -250,6 +292,7 @@ func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, return err } // --- + span.AddEvent(fmt.Sprintf("read event %d of %d - %d", i, len(events), events[i].EventMeta().ActualPosition)) if count > 0 { start += 1 @@ -274,9 +317,20 @@ func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, return events, nil } func (e *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() + lis := make([]int64, len(index)) + for i := range index { + lis[i]=int64(index[i]) + } + + span.SetAttributes( + attribute.Int64Slice("args.index", lis), + attribute.String("streamID", e.streamID), + attribute.String("path", e.diskStore.path), + ) + var events event.Events err := e.events.Modify(ctx, func(ctx context.Context, stream *wal.Log) error { var err error @@ -289,8 +343,13 @@ func (e *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, er return events, err } func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() + + span.SetAttributes( + attribute.String("streamID", e.streamID), + attribute.String("path", e.diskStore.path), + ) var idx uint64 var err error @@ -303,9 +362,14 @@ func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) { return idx, err } func (e *eventLog) LastIndex(ctx context.Context) (uint64, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( + attribute.String("streamID", e.streamID), + attribute.String("path", e.diskStore.path), + ) + var idx uint64 var err error @@ -316,13 +380,34 @@ func (e *eventLog) LastIndex(ctx context.Context) (uint64, error) { return idx, err } -func (e *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { - panic("not implemented") +func (e *eventLog) Truncate(ctx context.Context, index int64) error { + ctx, span := lg.Span(ctx) + defer span.End() + + span.SetAttributes( + attribute.Int64("args.index", index), + attribute.String("streamID", e.streamID), + attribute.String("path", e.diskStore.path), + ) + + if index == 0 { + return nil + } + return e.events.Modify(ctx, func(ctx context.Context, events *wal.Log) error { + if index < 0 { + return events.TruncateBack(uint64(-index)) + } + return events.TruncateFront(uint64(index)) + }) } func readStream(ctx context.Context, stream *wal.Log, index uint64) (event.Event, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( + attribute.Int64("args.index", int64(index)), + ) + var b []byte var err error b, err = stream.Read(index) @@ -342,9 +427,18 @@ func readStream(ctx context.Context, stream *wal.Log, index uint64) (event.Event return e, err } func readStreamN(ctx context.Context, stream *wal.Log, index ...uint64) (event.Events, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() + lis := make([]int64, len(index)) + for i := range index { + lis[i]=int64(index[i]) + } + + span.SetAttributes( + attribute.Int64Slice("args.index", lis), + ) + var b []byte var err error events := make(event.Events, len(index)) @@ -365,4 +459,9 @@ func readStreamN(ctx context.Context, stream *wal.Log, index ...uint64) (event.E } } return events, err -} \ No newline at end of file +} +func mkDirName(name string) string { + h := fnv.New32a() + fmt.Fprint(h, name) + return fmt.Sprintf("%x/%x/%x", h.Sum32()>>24&0xff, h.Sum32()>>16&0xff, h.Sum32()&0xffff) +} diff --git a/pkg/es/driver/driver.go b/pkg/es/driver/driver.go index 96deccd..382a952 100644 --- a/pkg/es/driver/driver.go +++ b/pkg/es/driver/driver.go @@ -19,6 +19,10 @@ type EventLog interface { FirstIndex(context.Context) (uint64, error) LastIndex(context.Context) (uint64, error) } +type EventLogWithTruncate interface { + Truncate(context.Context, int64) error +} + type EventLogWithUpdate interface { LoadForUpdate(context.Context, event.Aggregate, func(context.Context, event.Aggregate) error) (uint64, error) } diff --git a/pkg/es/driver/mem-store/mem-store.go b/pkg/es/driver/mem-store/mem-store.go index eeaf78e..370f476 100644 --- a/pkg/es/driver/mem-store/mem-store.go +++ b/pkg/es/driver/mem-store/mem-store.go @@ -77,7 +77,7 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint event.SetStreamID(m.streamID, events...) return uint64(len(events)), m.events.Modify(ctx, func(ctx context.Context, stream *event.Events) error { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() span.AddEvent(fmt.Sprintf(" %s %d", m.streamID, len(*stream))) @@ -113,7 +113,7 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint // ReadOne implements readone func (m *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() var events event.Events @@ -136,7 +136,7 @@ func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Ev var events event.Events err := m.events.Modify(ctx, func(ctx context.Context, stream *event.Events) error { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() span.AddEvent(fmt.Sprintf("%s %d", m.streamID, len(*stream))) @@ -188,7 +188,7 @@ func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Ev // FirstIndex for the streamID func (m *eventLog) FirstIndex(ctx context.Context) (uint64, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() events, err := m.events.Copy(ctx) @@ -197,7 +197,7 @@ func (m *eventLog) FirstIndex(ctx context.Context) (uint64, error) { // LastIndex for the streamID func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() events, err := m.events.Copy(ctx) @@ -209,7 +209,7 @@ func (m *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func } func readStream(ctx context.Context, stream *event.Events, index uint64) (event.Event, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() var b []byte @@ -219,14 +219,14 @@ func readStream(ctx context.Context, stream *event.Events, index uint64) (event. if err != nil { return nil, err } - e, err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position) + e, err = event.UnmarshalBinary(ctx, b, e.EventMeta().ActualPosition) if err != nil { return nil, err } return e, err } func readStreamN(ctx context.Context, stream *event.Events, index ...uint64) (event.Events, error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() var b []byte diff --git a/pkg/es/driver/projecter/projecter.go b/pkg/es/driver/projecter/projecter.go index 8809ab9..414d652 100644 --- a/pkg/es/driver/projecter/projecter.go +++ b/pkg/es/driver/projecter/projecter.go @@ -47,6 +47,9 @@ type wrapper struct { var _ driver.EventLog = (*wrapper)(nil) +func (r *wrapper) Unwrap() driver.EventLog { + return r.up +} func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -114,19 +117,7 @@ func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) { return w.up.LastIndex(ctx) } -func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { - ctx, span := lg.Span(ctx) - defer span.End() - up := w.up - for up != nil { - if up, ok := up.(driver.EventLogWithUpdate); ok { - return up.LoadForUpdate(ctx, a, fn) - } - up = es.Unwrap(up) - } - return 0, es.ErrNoDriver -} func DefaultProjection(e event.Event) []event.Event { m := e.EventMeta() diff --git a/pkg/es/driver/resolve-links/resolve-links.go b/pkg/es/driver/resolve-links/resolve-links.go index 96358da..2c21098 100644 --- a/pkg/es/driver/resolve-links/resolve-links.go +++ b/pkg/es/driver/resolve-links/resolve-links.go @@ -2,6 +2,7 @@ package resolvelinks import ( "context" + "errors" "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" @@ -43,6 +44,9 @@ type wrapper struct { resolvelinks *resolvelinks } +func (r *wrapper) Unwrap() driver.EventLog { + return r.up +} func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -60,11 +64,18 @@ func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Eve return nil, err } lis, err := d.ReadN(ctx, e.Pos) - if err != nil { + if err != nil && !errors.Is(err, es.ErrNotFound) { return nil, err } - events[i] = lis.First() + if ne := lis.First(); ne != event.NilEvent { + meta := ne.EventMeta() + actual := e.EventMeta() + meta.ActualPosition = actual.Position + meta.ActualStreamID = actual.ActualStreamID + ne.SetEventMeta(meta) + events[i] = ne + } } } @@ -92,7 +103,14 @@ func (w *wrapper) ReadN(ctx context.Context, index ...uint64) (event.Events, err return nil, err } - events[i] = lis.First() + ne := lis.First() + meta := ne.EventMeta() + actual := e.EventMeta() + meta.ActualPosition = actual.Position + meta.ActualStreamID = actual.ActualStreamID + ne.SetEventMeta(meta) + + events[i] = ne } } diff --git a/pkg/es/driver/streamer/streamer.go b/pkg/es/driver/streamer/streamer.go index 5865b43..13b3552 100644 --- a/pkg/es/driver/streamer/streamer.go +++ b/pkg/es/driver/streamer/streamer.go @@ -5,13 +5,14 @@ import ( "context" "fmt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/locker" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) type state struct { @@ -89,10 +90,16 @@ func (s *streamer) Send(ctx context.Context, streamID string, events event.Event for _, sub := range state.subscribers[streamID] { err := sub.position.Modify(ctx, func(ctx context.Context, position *position) error { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( + attribute.String("streamID", streamID), + attribute.Int64("actualPosition", int64(events.Last().EventMeta().ActualPosition)), + attribute.String("actualStreamID", events.Last().EventMeta().ActualStreamID), + attribute.Int64("position", int64(events.Last().EventMeta().Position)), + ) - position.size = int64(events.Last().EventMeta().Position - uint64(position.idx)) + position.size = int64(events.Last().EventMeta().ActualPosition - uint64(position.idx)) if position.wait != nil { close(position.wait) @@ -142,6 +149,9 @@ type wrapper struct { var _ driver.EventLog = (*wrapper)(nil) +func (r *wrapper) Unwrap() driver.EventLog { + return r.up +} func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -185,19 +195,6 @@ func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) { return w.up.LastIndex(ctx) } -func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { - ctx, span := lg.Span(ctx) - defer span.End() - - up := w.up - for up != nil { - if up, ok := up.(driver.EventLogWithUpdate); ok { - return up.LoadForUpdate(ctx, a, fn) - } - up = es.Unwrap(up) - } - return 0, es.ErrNoDriver -} type position struct { size int64 @@ -280,8 +277,15 @@ func (s *subscription) Events(ctx context.Context) (event.Events, error) { } position.size = int64(len(events)) if len(events) > 0 { - position.idx = int64(events.First().EventMeta().Position - 1) + position.idx = int64(events.First().EventMeta().ActualPosition - 1) } + span.SetAttributes( + attribute.Int64("position.idx", position.idx), + attribute.Int64("position.size", position.size), + attribute.Int64("meta.ActualPosition", int64(events.First().EventMeta().ActualPosition)), + attribute.Int64("meta.Position", int64(events.First().EventMeta().Position)), + ) + return err }) } diff --git a/pkg/es/es.go b/pkg/es/es.go index 9e3b10c..db994e6 100644 --- a/pkg/es/es.go +++ b/pkg/es/es.go @@ -7,13 +7,14 @@ import ( "fmt" "strings" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.uber.org/multierr" + "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/locker" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/instrument/syncint64" - "go.uber.org/multierr" ) type config struct { @@ -194,7 +195,7 @@ func (es *EventStore) ReadN(ctx context.Context, streamID string, index ...uint6 defer span.End() lis := make([]int64, len(index)) - for i,j :=range index { + for i, j := range index { lis[i] = int64(j) } @@ -231,6 +232,7 @@ func (es *EventStore) Append(ctx context.Context, streamID string, events event. func (es *EventStore) FirstIndex(ctx context.Context, streamID string) (uint64, error) { ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( attribute.String("ev.streamID", streamID), ) @@ -244,6 +246,7 @@ func (es *EventStore) FirstIndex(ctx context.Context, streamID string) (uint64, func (es *EventStore) LastIndex(ctx context.Context, streamID string) (uint64, error) { ctx, span := lg.Span(ctx) defer span.End() + span.SetAttributes( attribute.String("ev.streamID", streamID), ) @@ -268,6 +271,23 @@ func (es *EventStore) EventStream() driver.EventStream { } return nil } +func (es *EventStore) Truncate(ctx context.Context, streamID string, index int64) error { + ctx, span := lg.Span(ctx) + defer span.End() + + up, err := es.Driver.EventLog(ctx, streamID) + if err != nil { + return err + } + + for up != nil { + if up, ok := up.(driver.EventLogWithTruncate); ok { + return up.Truncate(ctx, index) + } + up = Unwrap(up) + } + return ErrNoDriver +} func Unwrap[T any](t T) T { if unwrap, ok := any(t).(interface{ Unwrap() T }); ok { diff --git a/pkg/es/es.graphqls b/pkg/es/es.graphqls index 40dd28d..a98d369 100644 --- a/pkg/es/es.graphqls +++ b/pkg/es/es.graphqls @@ -1,14 +1,17 @@ type Meta @goModel(model: "github.com/sour-is/ev/pkg/es/event.Meta") { eventID: String! @goField(name: "getEventID") - streamID: String! + streamID: String! @goField(name: "ActualStreamID") + position: Int! @goField(name: "ActualPosition") created: Time! - position: Int! } extend type Query { events(streamID: String! paging: PageInput): Connection! } +extend type Mutation { + truncateStream(streamID: String! index:Int!): Boolean! +} extend type Subscription { """after == 0 start from begining, after == -1 start from end""" eventAdded(streamID: String! after: Int! = -1): Event @@ -18,6 +21,9 @@ type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEven id: ID! eventID: String! + streamID: String! + position: Int! + values: Map! bytes: String! type: String! diff --git a/pkg/es/event/events.go b/pkg/es/event/events.go index 160ad38..334bfc0 100644 --- a/pkg/es/event/events.go +++ b/pkg/es/event/events.go @@ -107,6 +107,9 @@ func SetStreamID(id string, lis ...Event) { for _, e := range lis { meta := e.EventMeta() meta.StreamID = id + if meta.ActualStreamID == "" { + meta.ActualStreamID = id + } e.SetEventMeta(meta) } } @@ -122,13 +125,16 @@ func SetEventID(e Event, id ulid.ULID) { func SetPosition(e Event, i uint64) { meta := e.EventMeta() meta.Position = i + meta.ActualPosition = i e.SetEventMeta(meta) } type Meta struct { - EventID ulid.ULID - StreamID string - Position uint64 + EventID ulid.ULID + StreamID string + Position uint64 + ActualStreamID string + ActualPosition uint64 } func (m Meta) Created() time.Time { @@ -146,11 +152,10 @@ func Init(ctx context.Context) error { return nil } -type nilEvent struct{} - -func (*nilEvent) EventMeta() Meta { - return Meta{} +type nilEvent struct { } + +func (*nilEvent) EventMeta() Meta { return Meta{} } func (*nilEvent) SetEventMeta(eventMeta Meta) {} var NilEvent = &nilEvent{} diff --git a/pkg/es/event/reflect.go b/pkg/es/event/reflect.go index 7922178..a433592 100644 --- a/pkg/es/event/reflect.go +++ b/pkg/es/event/reflect.go @@ -67,7 +67,7 @@ func (u *UnknownEvent) MarshalBinary() ([]byte, error) { // Register a type container for Unmarshalling values into. The type must implement Event and not be a nil value. func Register(ctx context.Context, lis ...Event) error { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() for _, e := range lis { @@ -84,7 +84,7 @@ func Register(ctx context.Context, lis ...Event) error { return nil } func RegisterName(ctx context.Context, name string, e Event) error { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() if e == nil { @@ -119,7 +119,7 @@ func RegisterName(ctx context.Context, name string, e Event) error { return nil } func GetContainer(ctx context.Context, s string) Event { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() var e Event @@ -176,7 +176,7 @@ func MarshalBinary(e Event) (txt []byte, err error) { } func UnmarshalBinary(ctx context.Context, txt []byte, pos uint64) (e Event, err error) { - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() sp := bytes.SplitN(txt, []byte{'\t'}, 4) @@ -194,6 +194,8 @@ func UnmarshalBinary(ctx context.Context, txt []byte, pos uint64) (e Event, err m.StreamID = string(sp[1]) m.Position = pos + m.ActualStreamID = string(sp[1]) + m.ActualPosition = pos eventType := string(sp[2]) e = GetContainer(ctx, eventType) diff --git a/pkg/es/graph.go b/pkg/es/graph.go index 74faf2b..a8301ad 100644 --- a/pkg/es/graph.go +++ b/pkg/es/graph.go @@ -15,6 +15,7 @@ import ( type EventResolver interface { Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *GQLEvent, error) + TruncateStream(ctx context.Context, streamID string, index int64) (bool, error) } type contextKey struct { name string @@ -112,11 +113,21 @@ func (e *EventStore) EventAdded(ctx context.Context, streamID string, after int6 return ch, nil } +func (es *EventStore) TruncateStream(ctx context.Context, streamID string, index int64) (bool, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + err := es.Truncate(ctx, streamID, index) + return err == nil, err +} func (*EventStore) RegisterHTTP(*http.ServeMux) {} func (e *EventStore) GetMiddleware() func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - r = r.WithContext(gql.ToContext(r.Context(), esKey, e)) + ctx, span := lg.Span(r.Context()) + defer span.End() + + r = r.WithContext(gql.ToContext(ctx, esKey, e)) next.ServeHTTP(w, r) }) } @@ -132,6 +143,12 @@ func (e *GQLEvent) ID() string { func (e *GQLEvent) EventID() string { return e.e.EventMeta().GetEventID() } +func (e *GQLEvent) StreamID() string { + return e.e.EventMeta().StreamID +} +func (e *GQLEvent) Position() uint64 { + return e.e.EventMeta().Position +} func (e *GQLEvent) Type() string { return event.TypeOf(e.e) } @@ -150,6 +167,9 @@ func (e *GQLEvent) Meta() *event.Meta { return &meta } func (e *GQLEvent) Linked(ctx context.Context) (*GQLEvent, error) { + ctx, span := lg.Span(ctx) + defer span.End() + values := event.Values(e.e) streamID, ok := values["stream_id"].(string) if !ok {