refactor: split go-pkg

This commit is contained in:
xuu
2023-07-12 17:35:02 -06:00
parent d67d768c1e
commit 0f665d3484
80 changed files with 785 additions and 3993 deletions

View File

@@ -1,465 +0,0 @@
// package diskstore provides a driver that reads and writes events to disk.
package diskstore
import (
"context"
"errors"
"fmt"
"hash/fnv"
"os"
"path/filepath"
"strings"
"github.com/tidwall/wal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.uber.org/multierr"
"go.sour.is/ev"
"go.sour.is/ev/internal/lg"
"go.sour.is/ev/pkg/cache"
"go.sour.is/ev/pkg/es/driver"
"go.sour.is/ev/pkg/es/event"
"go.sour.is/ev/pkg/locker"
"go.sour.is/ev/pkg/math"
)
const CachSize = 1000
type lockedWal = locker.Locked[wal.Log]
type openlogs struct {
logs *cache.Cache[string, *lockedWal]
}
type diskStore struct {
path string
openlogs *locker.Locked[openlogs]
m_disk_open syncint64.Counter
m_disk_evict syncint64.Counter
m_disk_read syncint64.Counter
m_disk_write syncint64.Counter
}
const AppendOnly = ev.AppendOnly
const AllEvents = ev.AllEvents
func Init(ctx context.Context) error {
ctx, span := lg.Span(ctx)
defer span.End()
d := &diskStore{}
m := lg.Meter(ctx)
var err, errs error
d.m_disk_open, err = m.SyncInt64().Counter("disk_open")
errs = multierr.Append(errs, err)
d.m_disk_evict, err = m.SyncInt64().Counter("disk_evict")
errs = multierr.Append(errs, err)
d.m_disk_read, err = m.SyncInt64().Counter("disk_read")
errs = multierr.Append(errs, err)
d.m_disk_write, err = m.SyncInt64().Counter("disk_write")
errs = multierr.Append(errs, err)
ev.Register(ctx, "file", d)
return errs
}
var _ driver.Driver = (*diskStore)(nil)
func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) {
_, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.String("args.dsn", dsn),
)
scheme, path, ok := strings.Cut(dsn, ":")
if !ok {
return nil, fmt.Errorf("expected scheme")
}
if scheme != "file" {
return nil, fmt.Errorf("expeted scheme=file, got=%s", scheme)
}
if _, err := os.Stat(path); os.IsNotExist(err) {
err = os.MkdirAll(path, 0700)
if err != nil {
span.RecordError(err)
return nil, err
}
}
c, err := cache.NewWithEvict(CachSize, func(ctx context.Context, s string, l *lockedWal) {
ctx, span := lg.Span(ctx)
defer span.End()
l.Use(ctx, func(ctx context.Context, w *wal.Log) error {
ctx, span := lg.Span(ctx)
defer span.End()
d.m_disk_evict.Add(ctx, 1)
err := w.Close()
if err != nil {
span.RecordError(err)
return err
}
return nil
})
})
if err != nil {
span.RecordError(err)
return nil, err
}
logs := &openlogs{logs: c}
return &diskStore{
path: path,
openlogs: locker.New(logs),
m_disk_open: d.m_disk_open,
m_disk_evict: d.m_disk_evict,
m_disk_read: d.m_disk_read,
m_disk_write: d.m_disk_write,
}, nil
}
func (d *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
ctx, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.String("args.streamID", streamID),
attribute.String("path", d.path),
)
el := &eventLog{streamID: streamID, diskStore: d}
return el, d.openlogs.Use(ctx, func(ctx context.Context, openlogs *openlogs) error {
ctx, span := lg.Span(ctx)
defer span.End()
if events, ok := openlogs.logs.Get(streamID); ok {
el.events = *events
return nil
}
d.m_disk_open.Add(ctx, 1)
// migrate streams into dir friendly subdirs
hashPart := mkDirName(streamID)
oldPath := filepath.Join(d.path, streamID)
newPath := filepath.Join(d.path, hashPart, streamID)
if _, err := os.Stat(oldPath); !os.IsNotExist(err) {
os.MkdirAll(filepath.Join(d.path, hashPart), 0700)
os.Rename(oldPath, newPath)
}
l, err := wal.Open(newPath, wal.DefaultOptions)
if err != nil {
span.RecordError(err)
return err
}
el.events = locker.New(l)
openlogs.logs.Add(ctx, streamID, el.events)
return nil
})
}
type eventLog struct {
streamID string
events *locker.Locked[wal.Log]
diskStore *diskStore
}
var _ driver.EventLog = (*eventLog)(nil)
func (e *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.Int("args.events", len(events)),
attribute.Int64("args.version", int64(version)),
attribute.String("streamID", e.streamID),
attribute.String("path", e.diskStore.path),
)
event.SetStreamID(e.streamID, events...)
var count uint64
err := e.events.Use(ctx, func(ctx context.Context, l *wal.Log) error {
ctx, span := lg.Span(ctx)
defer span.End()
last, err := l.LastIndex()
if err != nil {
span.RecordError(err)
return err
}
if version != AppendOnly && version != last {
err = fmt.Errorf("%w: current version wrong %d != %d", ev.ErrWrongVersion, version, last)
span.RecordError(err)
return err
}
var b []byte
batch := &wal.Batch{}
for i, e := range events {
span.AddEvent(fmt.Sprintf("append event %d of %d", i, len(events)))
b, err = event.MarshalBinary(e)
if err != nil {
span.RecordError(err)
return err
}
pos := last + uint64(i) + 1
event.SetPosition(e, pos)
batch.Write(pos, b)
}
count = uint64(len(events))
e.diskStore.m_disk_write.Add(ctx, int64(len(events)))
return l.WriteBatch(batch)
})
span.RecordError(err)
return count, err
}
func (e *eventLog) Read(ctx context.Context, after, count int64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.Int64("args.after", after),
attribute.Int64("args.count", count),
attribute.String("streamID", e.streamID),
attribute.String("path", e.diskStore.path),
)
var events event.Events
err := e.events.Use(ctx, func(ctx context.Context, stream *wal.Log) error {
ctx, span := lg.Span(ctx)
defer span.End()
first, err := stream.FirstIndex()
if err != nil {
span.RecordError(err)
return err
}
last, err := stream.LastIndex()
if err != nil {
span.RecordError(err)
return err
}
// ---
if first == 0 || last == 0 {
return nil
}
start, count := math.PagerBox(first, last, after, count)
if count == 0 {
return nil
}
span.SetAttributes(
attribute.Int64("first", int64(first)),
attribute.Int64("last", int64(last)),
attribute.Int64("start", int64(start)),
attribute.Int64("count", int64(count)),
attribute.Int64("after", int64(after)),
)
events = make([]event.Event, math.Abs(count))
for i := range events {
// ---
events[i], err = readStream(ctx, stream, start)
if err != nil {
span.RecordError(err)
return err
}
// ---
span.AddEvent(fmt.Sprintf("read event %d of %d - %d", i, len(events), events[i].EventMeta().ActualPosition))
if count > 0 {
start += 1
} else {
start -= 1
}
if start < first || start > last {
events = events[:i+1]
break
}
}
return nil
})
if err != nil {
span.RecordError(err)
return nil, err
}
event.SetStreamID(e.streamID, events...)
e.diskStore.m_disk_read.Add(ctx, int64(len(events)))
return events, nil
}
func (e *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
lis := make([]int64, len(index))
for i := range index {
lis[i] = int64(index[i])
}
span.SetAttributes(
attribute.Int64Slice("args.index", lis),
attribute.String("streamID", e.streamID),
attribute.String("path", e.diskStore.path),
)
var events event.Events
err := e.events.Use(ctx, func(ctx context.Context, stream *wal.Log) error {
var err error
events, err = readStreamN(ctx, stream, index...)
return err
})
return events, err
}
func (e *eventLog) FirstIndex(ctx context.Context) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.String("streamID", e.streamID),
attribute.String("path", e.diskStore.path),
)
var idx uint64
var err error
err = e.events.Use(ctx, func(ctx context.Context, events *wal.Log) error {
idx, err = events.FirstIndex()
return err
})
return idx, err
}
func (e *eventLog) LastIndex(ctx context.Context) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.String("streamID", e.streamID),
attribute.String("path", e.diskStore.path),
)
var idx uint64
var err error
err = e.events.Use(ctx, func(ctx context.Context, events *wal.Log) error {
idx, err = events.LastIndex()
return err
})
return idx, err
}
func (e *eventLog) Truncate(ctx context.Context, index int64) error {
ctx, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.Int64("args.index", index),
attribute.String("streamID", e.streamID),
attribute.String("path", e.diskStore.path),
)
if index == 0 {
return nil
}
return e.events.Use(ctx, func(ctx context.Context, events *wal.Log) error {
if index < 0 {
return events.TruncateBack(uint64(-index))
}
return events.TruncateFront(uint64(index))
})
}
func readStream(ctx context.Context, stream *wal.Log, index uint64) (event.Event, error) {
ctx, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.Int64("args.index", int64(index)),
)
var b []byte
var err error
b, err = stream.Read(index)
if err != nil {
if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) {
err = fmt.Errorf("%w: empty", ev.ErrNotFound)
}
span.RecordError(err)
return nil, err
}
e, err := event.UnmarshalBinary(ctx, b, index)
if err != nil {
span.RecordError(err)
return nil, err
}
return e, err
}
func readStreamN(ctx context.Context, stream *wal.Log, index ...uint64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
lis := make([]int64, len(index))
for i := range index {
lis[i] = int64(index[i])
}
span.SetAttributes(
attribute.Int64Slice("args.index", lis),
)
var b []byte
var err error
events := make(event.Events, len(index))
for i, idx := range index {
b, err = stream.Read(idx)
if err != nil {
if errors.Is(err, wal.ErrNotFound) || errors.Is(err, wal.ErrOutOfRange) {
err = fmt.Errorf("%w: empty", ev.ErrNotFound)
}
span.RecordError(err)
return nil, err
}
events[i], err = event.UnmarshalBinary(ctx, b, idx)
if err != nil {
span.RecordError(err)
return nil, err
}
}
return events, err
}
func mkDirName(name string) string {
h := fnv.New32a()
fmt.Fprint(h, name)
return fmt.Sprintf("%x/%x/%x", h.Sum32()>>24&0xff, h.Sum32()>>16&0xff, h.Sum32()&0xffff)
}

View File

@@ -1,40 +0,0 @@
// package driver defines interfaces to be used by driver implementations.
package driver
import (
"context"
"go.sour.is/ev/pkg/es/event"
)
type Driver interface {
Open(ctx context.Context, dsn string) (Driver, error)
EventLog(ctx context.Context, streamID string) (EventLog, error)
}
type EventLog interface {
Read(ctx context.Context, after, count int64) (event.Events, error)
ReadN(ctx context.Context, index ...uint64) (event.Events, error)
Append(ctx context.Context, events event.Events, version uint64) (uint64, error)
FirstIndex(context.Context) (uint64, error)
LastIndex(context.Context) (uint64, error)
}
type EventLogWithTruncate interface {
Truncate(context.Context, int64) error
}
type EventLogWithUpdate interface {
LoadForUpdate(context.Context, event.Aggregate, func(context.Context, event.Aggregate) error) (uint64, error)
}
type Subscription interface {
Recv(context.Context) <-chan bool
Events(context.Context) (event.Events, error)
Close(context.Context) error
}
type EventStream interface {
Subscribe(ctx context.Context, streamID string, start int64) (Subscription, error)
Send(ctx context.Context, streamID string, events event.Events) error
}

View File

@@ -1,248 +0,0 @@
// package memstore provides a driver that reads and writes events to memory.
package memstore
import (
"context"
"fmt"
"go.sour.is/ev"
"go.sour.is/ev/internal/lg"
"go.sour.is/ev/pkg/es/driver"
"go.sour.is/ev/pkg/es/event"
"go.sour.is/ev/pkg/locker"
"go.sour.is/ev/pkg/math"
)
type state struct {
streams map[string]*locker.Locked[event.Events]
}
type eventLog struct {
streamID string
events *locker.Locked[event.Events]
}
type memstore struct {
state *locker.Locked[state]
}
const AppendOnly = ev.AppendOnly
const AllEvents = ev.AllEvents
func Init(ctx context.Context) error {
ctx, span := lg.Span(ctx)
defer span.End()
return ev.Register(ctx, "mem", &memstore{})
}
var _ driver.Driver = (*memstore)(nil)
func (memstore) Open(ctx context.Context, name string) (driver.Driver, error) {
_, span := lg.Span(ctx)
defer span.End()
s := &state{streams: make(map[string]*locker.Locked[event.Events])}
return &memstore{locker.New(s)}, nil
}
func (m *memstore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
ctx, span := lg.Span(ctx)
defer span.End()
el := &eventLog{streamID: streamID}
err := m.state.Use(ctx, func(ctx context.Context, state *state) error {
_, span := lg.Span(ctx)
defer span.End()
l, ok := state.streams[streamID]
if !ok {
l = locker.New(&event.Events{})
state.streams[streamID] = l
}
el.events = l
return nil
})
if err != nil {
return nil, err
}
return el, err
}
var _ driver.EventLog = (*eventLog)(nil)
// Append implements driver.EventStore
func (m *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
event.SetStreamID(m.streamID, events...)
return uint64(len(events)), m.events.Use(ctx, func(ctx context.Context, stream *event.Events) error {
ctx, span := lg.Span(ctx)
defer span.End()
span.AddEvent(fmt.Sprintf(" %s %d", m.streamID, len(*stream)))
last := uint64(len(*stream))
if version != AppendOnly && version != last {
return fmt.Errorf("%w: current version wrong %d != %d", ev.ErrWrongVersion, version, last)
}
for i := range events {
span.AddEvent(fmt.Sprintf("read event %d of %d", i, len(events)))
// --- clone event
e := events[i]
b, err := event.MarshalBinary(e)
if err != nil {
return err
}
e, err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position)
if err != nil {
return err
}
// ---
pos := last + uint64(i) + 1
event.SetPosition(e, pos)
*stream = append(*stream, e)
}
return nil
})
}
// ReadOne implements readone
func (m *eventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
var events event.Events
err := m.events.Use(ctx, func(ctx context.Context, stream *event.Events) error {
var err error
events, err = readStreamN(ctx, stream, index...)
return err
})
return events, err
}
// Read implements driver.EventStore
func (m *eventLog) Read(ctx context.Context, after int64, count int64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
var events event.Events
err := m.events.Use(ctx, func(ctx context.Context, stream *event.Events) error {
ctx, span := lg.Span(ctx)
defer span.End()
span.AddEvent(fmt.Sprintf("%s %d", m.streamID, len(*stream)))
first := stream.First().EventMeta().Position
last := stream.Last().EventMeta().Position
// ---
if first == 0 || last == 0 {
return nil
}
start, count := math.PagerBox(first, last, after, count)
if count == 0 {
return nil
}
span.AddEvent(fmt.Sprint("box", first, last, after, count))
events = make([]event.Event, math.Abs(count))
for i := range events {
span.AddEvent(fmt.Sprintf("read event %d of %d", i, math.Abs(count)))
// --- clone event
var err error
events[i], err = readStream(ctx, stream, start)
if err != nil {
return err
}
// ---
if count > 0 {
start += 1
} else {
start -= 1
}
if start < first || start > last {
events = events[:i+1]
break
}
}
event.SetStreamID(m.streamID, events...)
return nil
})
if err != nil {
return nil, err
}
return events, nil
}
// FirstIndex for the streamID
func (m *eventLog) FirstIndex(ctx context.Context) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
events, err := m.events.Copy(ctx)
return events.First().EventMeta().Position, err
}
// LastIndex for the streamID
func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
events, err := m.events.Copy(ctx)
return events.Last().EventMeta().Position, err
}
func (m *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) {
panic("not implemented")
}
func readStream(ctx context.Context, stream *event.Events, index uint64) (event.Event, error) {
ctx, span := lg.Span(ctx)
defer span.End()
var b []byte
var err error
e := (*stream)[index-1]
b, err = event.MarshalBinary(e)
if err != nil {
return nil, err
}
e, err = event.UnmarshalBinary(ctx, b, e.EventMeta().ActualPosition)
if err != nil {
return nil, err
}
return e, err
}
func readStreamN(ctx context.Context, stream *event.Events, index ...uint64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
var b []byte
var err error
events := make(event.Events, len(index))
for i, index := range index {
e := (*stream)[index-1]
b, err = event.MarshalBinary(e)
if err != nil {
return nil, err
}
events[i], err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position)
if err != nil {
return nil, err
}
}
return events, err
}

View File

@@ -1,153 +0,0 @@
// package projecter provides a driver middleware to derive new events from other events.
package projecter
import (
"context"
"strings"
"go.sour.is/ev"
"go.sour.is/ev/internal/lg"
"go.sour.is/ev/pkg/es/driver"
"go.sour.is/ev/pkg/es/event"
)
type projector struct {
up driver.Driver
fns []func(event.Event) []event.Event
}
func New(_ context.Context, fns ...func(event.Event) []event.Event) *projector {
return &projector{fns: fns}
}
func (p *projector) Apply(e *ev.EventStore) {
up := e.Driver
for up != nil {
if op, ok := up.(*projector); ok {
op.AddProjections(p.fns...)
p.up = op.up
return
}
up = ev.Unwrap(up)
}
p.up = e.Driver
e.Driver = p
}
func (s *projector) Unwrap() driver.Driver {
return s.up
}
func (s *projector) Open(ctx context.Context, dsn string) (driver.Driver, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return s.up.Open(ctx, dsn)
}
func (s *projector) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
ctx, span := lg.Span(ctx)
defer span.End()
l, err := s.up.EventLog(ctx, streamID)
return &wrapper{l, s}, err
}
func (s *projector) AddProjections(fns ...func(event.Event) []event.Event) {
s.fns = append(s.fns, fns...)
}
type wrapper struct {
up driver.EventLog
projector *projector
}
var _ driver.EventLog = (*wrapper)(nil)
func (r *wrapper) Unwrap() driver.EventLog {
return r.up
}
func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.Read(ctx, after, count)
}
func (w *wrapper) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.ReadN(ctx, index...)
}
func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
i, err := w.up.Append(ctx, events, version)
if err != nil {
return i, err
}
{
ctx, span := lg.Fork(ctx)
go func() {
defer span.End()
var pevents []event.Event
for i := range events {
e := events[i]
for _, fn := range w.projector.fns {
pevents = append(
pevents,
fn(e)...,
)
}
}
for i := range pevents {
e := pevents[i]
l, err := w.projector.up.EventLog(ctx, event.StreamID(e).StreamID())
if err != nil {
span.RecordError(err)
continue
}
_, err = l.Append(ctx, event.NewEvents(e), ev.AppendOnly)
span.RecordError(err)
}
}()
}
return i, err
}
func (w *wrapper) FirstIndex(ctx context.Context) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.FirstIndex(ctx)
}
func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.LastIndex(ctx)
}
func DefaultProjection(e event.Event) []event.Event {
m := e.EventMeta()
streamID := m.StreamID
streamPos := m.Position
eventType := event.TypeOf(e)
pkg, _, _ := strings.Cut(eventType, ".")
e1 := event.NewPtr(streamID, streamPos)
event.SetStreamID("$all", e1)
e2 := event.NewPtr(streamID, streamPos)
event.SetStreamID("$type-"+eventType, e2)
e3 := event.NewPtr(streamID, streamPos)
event.SetStreamID("$pkg-"+pkg, e3)
return []event.Event{e1, e2, e3}
}

View File

@@ -1,137 +0,0 @@
package projecter_test
import (
"context"
"testing"
"github.com/matryer/is"
"go.sour.is/ev"
"go.sour.is/ev/pkg/es/driver"
"go.sour.is/ev/pkg/es/driver/projecter"
"go.sour.is/ev/pkg/es/event"
)
type mockDriver struct {
onOpen func(context.Context, string) (driver.Driver, error)
onEventLog func(context.Context, string) (driver.EventLog, error)
}
// Open implements driver.Driver
func (m *mockDriver) Open(ctx context.Context, dsn string) (driver.Driver, error) {
if m.onOpen != nil {
return m.onOpen(ctx, dsn)
}
panic("unimplemented")
}
// EventLog implements driver.Driver
func (m *mockDriver) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
if m.onEventLog != nil {
return m.onEventLog(ctx, streamID)
}
panic("unimplemented")
}
var _ driver.Driver = (*mockDriver)(nil)
type mockEventLog struct {
onAppend func(context.Context, event.Events, uint64) (uint64, error)
onFirstIndex func(context.Context) (uint64, error)
onLastIndex func(context.Context) (uint64, error)
onRead func(context.Context, int64, int64) (event.Events, error)
onReadN func(context.Context, ...uint64) (event.Events, error)
}
// Append implements driver.EventLog
func (m *mockEventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
if m.onAppend != nil {
return m.onAppend(ctx, events, version)
}
panic("unimplemented")
}
// FirstIndex implements driver.EventLog
func (m *mockEventLog) FirstIndex(ctx context.Context) (uint64, error) {
if m.onFirstIndex != nil {
return m.onFirstIndex(ctx)
}
panic("unimplemented")
}
// LastIndex implements driver.EventLog
func (m *mockEventLog) LastIndex(ctx context.Context) (uint64, error) {
if m.onLastIndex != nil {
return m.onLastIndex(ctx)
}
panic("unimplemented")
}
// Read implements driver.EventLog
func (m *mockEventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) {
if m.onRead != nil {
return m.onRead(ctx, pos, count)
}
panic("unimplemented")
}
func (m *mockEventLog) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
if m.onReadN != nil {
return m.onReadN(ctx, index...)
}
panic("unimplemented")
}
var _ driver.EventLog = (*mockEventLog)(nil)
func TestProjecter(t *testing.T) {
is := is.New(t)
ctx := context.Background()
var events []event.Event
wait := make(chan struct{})
mockEL := &mockEventLog{}
mockEL.onRead = func(ctx context.Context, i1, i2 int64) (event.Events, error) {
return event.NewEvents(), nil
}
mockEL.onAppend = func(ctx context.Context, e event.Events, u uint64) (uint64, error) {
events = append(events, e...)
if wait != nil && len(events) > 3 {
close(wait)
}
return uint64(len(e)), nil
}
mock := &mockDriver{}
mock.onOpen = func(ctx context.Context, s string) (driver.Driver, error) {
return mock, nil
}
mock.onEventLog = func(ctx context.Context, s string) (driver.EventLog, error) {
return mockEL, nil
}
ev.Init(ctx)
ev.Register(ctx, "mock", mock)
es, err := ev.Open(
ctx,
"mock:",
projecter.New(ctx, projecter.DefaultProjection),
)
is.NoErr(err)
_, err = es.Read(ctx, "test", 0, 1)
is.NoErr(err)
_, err = es.Append(ctx, "test", event.NewEvents(event.NilEvent))
is.NoErr(err)
<-wait
is.Equal(len(events), 4)
}

View File

@@ -1,174 +0,0 @@
package resolvelinks
import (
"context"
"errors"
"go.sour.is/ev"
"go.sour.is/ev/internal/lg"
"go.sour.is/ev/pkg/es/driver"
"go.sour.is/ev/pkg/es/event"
)
type resolvelinks struct {
up driver.Driver
}
func New() *resolvelinks {
return &resolvelinks{}
}
func (r *resolvelinks) Apply(es *ev.EventStore) {
r.up = es.Driver
es.Driver = r
}
func (r *resolvelinks) Unwrap() driver.Driver {
return r.up
}
func (r *resolvelinks) Open(ctx context.Context, dsn string) (driver.Driver, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return r.up.Open(ctx, dsn)
}
func (r *resolvelinks) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
ctx, span := lg.Span(ctx)
defer span.End()
l, err := r.up.EventLog(ctx, streamID)
return &wrapper{l, r}, err
}
type wrapper struct {
up driver.EventLog
resolvelinks *resolvelinks
}
func (r *wrapper) Unwrap() driver.EventLog {
return r.up
}
func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
events, err := w.up.Read(ctx, after, count)
if err != nil {
return nil, err
}
idx := make(map[string][]uint64)
ptrs := make(map[string][]int)
for i := range events {
e := events[i]
if e, ok := e.(*event.EventPtr); ok {
idx[e.StreamID] = append(idx[e.StreamID], e.Pos)
ptrs[e.StreamID] = append(ptrs[e.StreamID], i)
}
}
for streamID, ids := range idx {
d, err := w.resolvelinks.EventLog(ctx, streamID)
if err != nil {
return nil, err
}
ptr := ptrs[streamID]
lis, err := d.ReadN(ctx, ids...)
if err != nil && !errors.Is(err, ev.ErrNotFound) {
return nil, err
}
for i := range lis {
meta := lis[i].EventMeta()
actual := events[ptr[i]].EventMeta()
meta.ActualPosition = actual.Position
meta.ActualStreamID = actual.ActualStreamID
lis[i].SetEventMeta(meta)
events[i] = lis[i]
}
}
// for i, e := range events {
// switch e := e.(type) {
// case *event.EventPtr:
// d, err := w.resolvelinks.EventLog(ctx, e.StreamID)
// if err != nil {
// return nil, err
// }
// lis, err := d.ReadN(ctx, e.Pos)
// if err != nil && !errors.Is(err, es.ErrNotFound) {
// return nil, err
// }
// if ne := lis.First(); ne != event.NilEvent {
// meta := ne.EventMeta()
// actual := e.EventMeta()
// meta.ActualPosition = actual.Position
// meta.ActualStreamID = actual.ActualStreamID
// ne.SetEventMeta(meta)
// events[i] = ne
// }
// }
// }
return events, err
}
func (w *wrapper) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
events, err := w.up.ReadN(ctx, index...)
if err != nil {
return nil, err
}
for i, e := range events {
switch e := e.(type) {
case *event.EventPtr:
d, err := w.resolvelinks.EventLog(ctx, e.StreamID)
if err != nil {
return nil, err
}
lis, err := d.ReadN(ctx, e.Pos)
if err != nil {
return nil, err
}
ne := lis.First()
meta := ne.EventMeta()
actual := e.EventMeta()
meta.ActualPosition = actual.Position
meta.ActualStreamID = actual.ActualStreamID
ne.SetEventMeta(meta)
events[i] = ne
}
}
return events, err
}
func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.Append(ctx, events, version)
}
func (w *wrapper) FirstIndex(ctx context.Context) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.FirstIndex(ctx)
}
func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.LastIndex(ctx)
}

View File

@@ -1,318 +0,0 @@
// package streamer provides a driver to allow awaiting for new events to be added to a stream.
package streamer
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.sour.is/ev"
"go.sour.is/ev/internal/lg"
"go.sour.is/ev/pkg/es/driver"
"go.sour.is/ev/pkg/es/event"
"go.sour.is/ev/pkg/locker"
)
type state struct {
subscribers map[string][]*subscription
}
type streamer struct {
state *locker.Locked[state]
up driver.Driver
}
func New(ctx context.Context) *streamer {
_, span := lg.Span(ctx)
defer span.End()
return &streamer{state: locker.New(&state{subscribers: map[string][]*subscription{}})}
}
var _ ev.Option = (*streamer)(nil)
func (s *streamer) Apply(e *ev.EventStore) {
s.up = e.Driver
e.Driver = s
}
func (s *streamer) Unwrap() driver.Driver {
return s.up
}
var _ driver.Driver = (*streamer)(nil)
func (s *streamer) Open(ctx context.Context, dsn string) (driver.Driver, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return s.up.Open(ctx, dsn)
}
func (s *streamer) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
ctx, span := lg.Span(ctx)
defer span.End()
l, err := s.up.EventLog(ctx, streamID)
return &wrapper{streamID, l, s}, err
}
var _ driver.EventStream = (*streamer)(nil)
func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) (driver.Subscription, error) {
ctx, span := lg.Span(ctx)
defer span.End()
events, err := s.up.EventLog(ctx, streamID)
if err != nil {
return nil, err
}
sub := &subscription{topic: streamID, events: events}
sub.position = locker.New(&position{
idx: start,
size: ev.AllEvents,
})
sub.unsub = s.delete(streamID, sub)
return sub, s.state.Use(ctx, func(ctx context.Context, state *state) error {
state.subscribers[streamID] = append(state.subscribers[streamID], sub)
return nil
})
}
func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error {
ctx, span := lg.Span(ctx)
defer span.End()
return s.state.Use(ctx, func(ctx context.Context, state *state) error {
ctx, span := lg.Span(ctx)
defer span.End()
span.AddEvent(fmt.Sprint("subscribers=", len(state.subscribers[streamID])))
for _, sub := range state.subscribers[streamID] {
err := sub.position.Use(ctx, func(ctx context.Context, position *position) error {
ctx, span := lg.Span(ctx)
defer span.End()
span.SetAttributes(
attribute.String("streamID", streamID),
attribute.Int64("actualPosition", int64(events.Last().EventMeta().ActualPosition)),
attribute.String("actualStreamID", events.Last().EventMeta().ActualStreamID),
attribute.Int64("position", int64(events.Last().EventMeta().Position)),
)
position.size = int64(events.Last().EventMeta().ActualPosition - uint64(position.idx))
if position.wait != nil {
close(position.wait)
position.link = trace.LinkFromContext(ctx, attribute.String("src", "event"))
position.wait = nil
}
return nil
})
if err != nil {
return err
}
}
return nil
})
}
func (s *streamer) delete(streamID string, sub *subscription) func(context.Context) error {
return func(ctx context.Context) error {
ctx, span := lg.Span(ctx)
defer span.End()
if err := ctx.Err(); err != nil {
return err
}
return s.state.Use(ctx, func(ctx context.Context, state *state) error {
_, span := lg.Span(ctx)
defer span.End()
lis := state.subscribers[streamID]
for i := range lis {
if lis[i] == sub {
lis[i] = lis[len(lis)-1]
state.subscribers[streamID] = lis[:len(lis)-1]
return nil
}
}
return nil
})
}
}
type wrapper struct {
topic string
up driver.EventLog
streamer *streamer
}
var _ driver.EventLog = (*wrapper)(nil)
func (r *wrapper) Unwrap() driver.EventLog {
return r.up
}
func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.Read(ctx, after, count)
}
func (w *wrapper) ReadN(ctx context.Context, index ...uint64) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.ReadN(ctx, index...)
}
func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
i, err := w.up.Append(ctx, events, version)
if err != nil {
return i, err
}
ctx, span = lg.Fork(ctx)
go func() {
defer span.End()
err := w.streamer.Send(ctx, w.topic, events)
span.RecordError(err)
}()
return i, nil
}
func (w *wrapper) FirstIndex(ctx context.Context) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.FirstIndex(ctx)
}
func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) {
ctx, span := lg.Span(ctx)
defer span.End()
return w.up.LastIndex(ctx)
}
type position struct {
size int64
idx int64
link trace.Link
wait chan struct{}
}
type subscription struct {
topic string
position *locker.Locked[position]
events driver.EventLog
unsub func(context.Context) error
once sync.Once
}
func (s *subscription) Recv(ctx context.Context) <-chan bool {
ctx, span := lg.Span(ctx)
defer span.End()
done := make(chan bool)
go func() {
var wait func(context.Context) bool
defer close(done)
err := s.position.Use(ctx, func(ctx context.Context, position *position) error {
_, span := lg.Span(ctx)
defer span.End()
if position.size == ev.AllEvents {
return nil
}
if position.size == 0 {
position.wait = make(chan struct{})
wait = func(ctx context.Context) bool {
ctx, span := lg.Span(ctx)
defer span.End()
select {
case <-position.wait:
if position.link.SpanContext.IsValid() {
_, span := lg.Span(ctx, trace.WithLinks(position.link))
span.AddEvent("recv event")
span.End()
position.link = trace.Link{}
}
return true
case <-ctx.Done():
return false
}
}
}
position.idx += position.size
position.size = 0
return nil
})
if err != nil {
done <- false
return
}
if wait != nil {
done <- wait(ctx)
return
}
done <- true
}()
return done
}
func (s *subscription) Events(ctx context.Context) (event.Events, error) {
ctx, span := lg.Span(ctx)
defer span.End()
var events event.Events
return events, s.position.Use(ctx, func(ctx context.Context, position *position) error {
ctx, span := lg.Span(ctx)
defer span.End()
var err error
events, err = s.events.Read(ctx, position.idx, position.size)
if err != nil {
return err
}
position.size = int64(len(events))
if len(events) > 0 {
position.idx = int64(events.First().EventMeta().ActualPosition - 1)
}
span.SetAttributes(
attribute.Int64("position.idx", position.idx),
attribute.Int64("position.size", position.size),
attribute.Int64("meta.ActualPosition", int64(events.First().EventMeta().ActualPosition)),
attribute.Int64("meta.Position", int64(events.First().EventMeta().Position)),
)
return err
})
}
func (s *subscription) Close(ctx context.Context) error {
ctx, span := lg.Span(ctx)
defer span.End()
if s == nil || s.unsub == nil {
return nil
}
var err error
s.once.Do(func() { err = s.unsub(ctx) })
return err
}

View File

@@ -1,5 +1,5 @@
type Meta @goModel(model: "go.sour.is/ev/pkg/es/event.Meta") {
type Meta @goModel(model: "go.sour.is/ev/pkg/event.Meta") {
eventID: String! @goField(name: "getEventID")
streamID: String! @goField(name: "ActualStreamID")
position: Int! @goField(name: "ActualPosition")

View File

@@ -1,137 +0,0 @@
package event
import (
"errors"
"fmt"
"sync"
)
// Aggregate implements functionality for working with event store streams as an aggregate.
// When creating a new Aggregate the struct should have an ApplyEvent method and embed the AggregateRoot.
type Aggregate interface {
// ApplyEvent applies the event to the aggrigate state
ApplyEvent(...Event)
AggregateRoot
}
func Start(a Aggregate, i uint64) {
a.start(i)
}
// Raise adds new uncommitted events
func Raise(a Aggregate, lis ...Event) {
lis = NewEvents(lis...)
SetStreamID(a.StreamID(), lis...)
a.raise(lis...)
a.ApplyEvent(lis...)
}
// Append adds new committed events
func Append(a Aggregate, lis ...Event) {
a.append(lis...)
a.ApplyEvent(lis...)
}
// NotExists returns error if there are no events present.
func NotExists(a Aggregate) error {
if a.Version() != 0 {
return fmt.Errorf("%w, got version == %d", ErrShouldNotExist, a.Version())
}
return nil
}
// ShouldExists returns error if there are no events present.
func ShouldExist(a Aggregate) error {
if a.Version() == 0 {
return fmt.Errorf("%w, got version == %d", ErrShouldExist, a.Version())
}
return nil
}
type AggregateRoot interface {
// Events returns the aggregate events
// pass true for only uncommitted events
Events(bool) Events
// StreamID returns aggregate stream ID
StreamID() string
// SetStreamID sets aggregate stream ID
SetStreamID(streamID string)
// StreamVersion returns last commit events
StreamVersion() uint64
// Version returns the current aggrigate version. (committed + uncommitted)
Version() uint64
start(uint64)
raise(lis ...Event)
append(lis ...Event)
Commit()
}
var _ AggregateRoot = &IsAggregate{}
type IsAggregate struct {
events Events
streamID string
firstIndex uint64
lastIndex uint64
mu sync.RWMutex
}
func (a *IsAggregate) Commit() { a.lastIndex = uint64(len(a.events)) }
func (a *IsAggregate) StreamID() string { return a.streamID }
func (a *IsAggregate) SetStreamID(streamID string) { a.streamID = streamID }
func (a *IsAggregate) StreamVersion() uint64 { return a.lastIndex }
func (a *IsAggregate) Version() uint64 { return a.firstIndex + uint64(len(a.events)) }
func (a *IsAggregate) Events(new bool) Events {
a.mu.RLock()
defer a.mu.RUnlock()
events := a.events
if new {
events = events[a.lastIndex-a.firstIndex:]
}
lis := make(Events, len(events))
copy(lis, events)
return lis
}
func (a *IsAggregate) start(i uint64) {
a.firstIndex = i
a.lastIndex = i
}
//lint:ignore U1000 is called by embeded interface
func (a *IsAggregate) raise(lis ...Event) { //nolint
a.mu.Lock()
defer a.mu.Unlock()
a.posStartAt(lis...)
a.events = append(a.events, lis...)
}
//lint:ignore U1000 is called by embeded interface
func (a *IsAggregate) append(lis ...Event) {
a.mu.Lock()
defer a.mu.Unlock()
a.posStartAt(lis...)
a.events = append(a.events, lis...)
a.lastIndex += uint64(len(lis))
}
func (a *IsAggregate) posStartAt(lis ...Event) {
for i, e := range lis {
m := e.EventMeta()
m.Position = a.lastIndex + uint64(i) + 1
e.SetEventMeta(m)
}
}
var ErrShouldNotExist = errors.New("should not exist")
var ErrShouldExist = errors.New("should exist")

View File

@@ -1,43 +0,0 @@
package event_test
import (
"testing"
"go.sour.is/ev/pkg/es/event"
)
type Agg struct {
Value string
event.IsAggregate
}
var _ event.Aggregate = (*Agg)(nil)
func (a *Agg) streamID() string {
return "value-" + a.Value
}
// ApplyEvent applies the event to the aggrigate state
func (a *Agg) ApplyEvent(lis ...event.Event) {
for _, e := range lis {
switch e := e.(type) {
case *ValueApplied:
a.Value = e.Value
a.SetStreamID(a.streamID())
}
}
}
type ValueApplied struct {
Value string
event.IsEvent
}
var _ event.Event = (*ValueApplied)(nil)
func TestAggregate(t *testing.T) {
agg := &Agg{}
event.Append(agg, &ValueApplied{Value: "one"})
}

View File

@@ -1,265 +0,0 @@
// package event implements functionality for working with an eventstore.
package event
import (
"context"
"crypto/rand"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
ulid "github.com/oklog/ulid/v2"
)
var pool = sync.Pool{
New: func() interface{} { return ulid.Monotonic(rand.Reader, 0) },
}
func getULID() ulid.ULID {
var entropy io.Reader = rand.Reader
if e, ok := pool.Get().(io.Reader); ok {
entropy = e
defer pool.Put(e)
}
return ulid.MustNew(ulid.Now(), entropy)
}
// Event implements functionality of an individual event used with the event store. It should implement the getter/setter for EventMeta and BinaryMarshaler/BinaryUnmarshaler.
type Event interface {
EventMeta() Meta
SetEventMeta(Meta)
}
// Events is a list of events
type Events []Event
func NewEvents(lis ...Event) Events {
for i, e := range lis {
meta := e.EventMeta()
meta.Position = uint64(i)
if meta.ActualPosition == 0 {
meta.ActualPosition = uint64(i)
}
meta.EventID = getULID()
e.SetEventMeta(meta)
}
return lis
}
func (lis Events) StreamID() string {
if len(lis) == 0 {
return ""
}
return lis.First().EventMeta().StreamID
}
func (lis Events) SetStreamID(streamID string) {
SetStreamID(streamID, lis...)
}
func (lis Events) Count() int64 {
return int64(len(lis))
}
func (lis Events) First() Event {
if len(lis) == 0 {
return NilEvent
}
return lis[0]
}
func (lis Events) Rest() Events {
if len(lis) == 0 {
return nil
}
return lis[1:]
}
func (lis Events) Last() Event {
if len(lis) == 0 {
return NilEvent
}
return lis[len(lis)-1]
}
func TypeOf(e any) string {
if ie, ok := e.(interface{ UnwrapEvent() Event }); ok {
e = ie.UnwrapEvent()
}
if e, ok := e.(interface{ EventType() string }); ok {
return e.EventType()
}
// Default to printed representation for unnamed types
return strings.Trim(fmt.Sprintf("%T", e), "*")
}
type streamID string
func (s streamID) StreamID() string {
return string(s)
}
func StreamID(e Event) streamID {
return streamID(e.EventMeta().StreamID)
}
func SetStreamID(id string, lis ...Event) {
for _, e := range lis {
meta := e.EventMeta()
meta.StreamID = id
if meta.ActualStreamID == "" {
meta.ActualStreamID = id
}
e.SetEventMeta(meta)
}
}
func EventID(e Event) ulid.ULID {
return e.EventMeta().EventID
}
func SetEventID(e Event, id ulid.ULID) {
meta := e.EventMeta()
meta.EventID = id
e.SetEventMeta(meta)
}
func SetPosition(e Event, i uint64) {
meta := e.EventMeta()
meta.Position = i
meta.ActualPosition = i
e.SetEventMeta(meta)
}
type Meta struct {
EventID ulid.ULID
StreamID string
Position uint64
ActualStreamID string
ActualPosition uint64
}
func (m Meta) Created() time.Time {
return ulid.Time(m.EventID.Time())
}
func (m Meta) GetEventID() string { return m.EventID.String() }
func Init(ctx context.Context) error {
if err := Register(ctx, NilEvent, &EventPtr{}); err != nil {
return err
}
if err := RegisterName(ctx, "event.eventPtr", &EventPtr{}); err != nil {
return err
}
return nil
}
type nilEvent struct {
IsEvent
}
var NilEvent = &nilEvent{}
func (e *nilEvent) MarshalBinary() ([]byte, error) {
return nil, nil
}
func (e *nilEvent) UnmarshalBinary(b []byte) error {
return nil
}
type EventPtr struct {
StreamID string `json:"stream_id"`
Pos uint64 `json:"pos"`
IsEvent
}
var _ Event = (*EventPtr)(nil)
func NewPtr(streamID string, pos uint64) *EventPtr {
return &EventPtr{StreamID: streamID, Pos: pos}
}
// MarshalBinary implements Event
func (e *EventPtr) MarshalBinary() (data []byte, err error) {
return []byte(fmt.Sprintf("%s@%d", e.StreamID, e.Pos)), nil
}
// UnmarshalBinary implements Event
func (e *EventPtr) UnmarshalBinary(data []byte) error {
s := string(data)
idx := strings.LastIndex(s, "@")
if idx == -1 {
return fmt.Errorf("missing @ in: %s", s)
}
e.StreamID = s[:idx]
var err error
e.Pos, err = strconv.ParseUint(s[idx+1:], 10, 64)
return err
}
func (e *EventPtr) Values() any {
return struct {
StreamID string `json:"stream_id"`
Pos uint64 `json:"pos"`
}{
e.StreamID,
e.Pos,
}
}
type FeedTruncated struct {
IsEvent
}
func (e *FeedTruncated) Values() any {
return struct {
}{}
}
type property[T any] struct {
v T
}
type IsEvent = property[Meta]
func (p *property[T]) EventMeta() T {
if p == nil {
var t T
return t
}
return p.v
}
func (p *property[T]) SetEventMeta(x T) {
if p != nil {
p.v = x
}
}
func AsEvent[T any](e T) Event {
return &asEvent[T]{payload: e}
}
type asEvent [T any] struct {
payload T
IsEvent
}
func (e asEvent[T]) Payload() T {
return e.payload
}
type AGG interface{ApplyEvent(...Event)}
func AsAggregate[T AGG](e T) Aggregate {
return &asAggregate[T]{payload: e}
}
type asAggregate [T AGG] struct {
payload T
IsAggregate
}
func (e *asAggregate[T]) Payload() T {
return e.payload
}
func (e *asAggregate[T]) ApplyEvent(lis ...Event) {
e.payload.ApplyEvent(lis...)
}

View File

@@ -1,82 +0,0 @@
package event_test
import (
"bytes"
"context"
"encoding/json"
"testing"
"github.com/matryer/is"
"go.sour.is/ev/pkg/es/event"
)
type DummyEvent struct {
Value string
event.IsEvent
}
func (e *DummyEvent) MarshalBinary() ([]byte, error) {
return json.Marshal(e)
}
func (e *DummyEvent) UnmarshalBinary(b []byte) error {
return json.Unmarshal(b, e)
}
func TestEventEncode(t *testing.T) {
is := is.New(t)
ctx := context.Background()
err := event.Register(ctx, &DummyEvent{})
is.NoErr(err)
var lis event.Events = event.NewEvents(
&DummyEvent{Value: "testA"},
&DummyEvent{Value: "testB"},
&DummyEvent{Value: "testC"},
)
lis.SetStreamID("test")
blis, err := event.EncodeEvents(lis...)
is.NoErr(err)
for _, b := range blis {
sp := bytes.SplitN(b, []byte{'\t'}, 4)
is.Equal(len(sp), 4)
is.Equal(string(sp[1]), "test")
is.Equal(string(sp[2]), "event_test.DummyEvent")
}
chk, err := event.DecodeEvents(ctx, blis...)
is.NoErr(err)
for i := range chk {
is.Equal(lis[i], chk[i])
}
}
type exampleAgg struct{ value string }
func (a *exampleAgg) ApplyEvent(lis ...event.Event) {
for _, e := range lis {
switch e := e.(type) {
case interface{ Payload() exampleEvSetValue }:
a.value = e.Payload().value
}
}
}
type exampleEvSetValue struct{ value string }
func TestApplyEventGeneric(t *testing.T) {
payload := &exampleAgg{}
var agg = event.AsAggregate(payload)
agg.ApplyEvent(event.NewEvents(
event.AsEvent(exampleEvSetValue{"hello"}),
)...)
is := is.New(t)
is.Equal(payload.value, "hello")
}

View File

@@ -1,320 +0,0 @@
package event
import (
"bytes"
"context"
"encoding"
"encoding/json"
"fmt"
"net/url"
"reflect"
"strings"
"go.sour.is/ev/internal/lg"
"go.sour.is/ev/pkg/locker"
)
type config struct {
eventTypes map[string]reflect.Type
}
var (
eventTypes = locker.New(&config{eventTypes: make(map[string]reflect.Type)})
)
type UnknownEvent struct {
eventType string
values map[string]json.RawMessage
IsEvent
}
var _ Event = (*UnknownEvent)(nil)
func NewUnknownEventFromValues(eventType string, meta Meta, values url.Values) *UnknownEvent {
jsonValues := make(map[string]json.RawMessage, len(values))
for k, v := range values {
switch len(v) {
case 0:
jsonValues[k] = []byte("null")
case 1:
jsonValues[k] = embedJSON(v[0])
default:
parts := make([][]byte, len(v))
for i := range v {
parts[i] = embedJSON(v[i])
}
jsonValues[k] = append([]byte("["), bytes.Join(parts, []byte(","))...)
jsonValues[k] = append(jsonValues[k], ']')
}
}
e := &UnknownEvent{eventType: eventType, values: jsonValues}
e.SetEventMeta(meta)
return e
}
func NewUnknownEventFromRaw(eventType string, meta Meta, values map[string]json.RawMessage) *UnknownEvent {
e := &UnknownEvent{eventType: eventType, values: values}
e.SetEventMeta(meta)
return e
}
func (u UnknownEvent) EventType() string { return u.eventType }
func (u *UnknownEvent) UnmarshalBinary(b []byte) error {
return json.Unmarshal(b, &u.values)
}
func (u *UnknownEvent) MarshalBinary() ([]byte, error) {
return json.Marshal(u.values)
}
// Register a type container for Unmarshalling values into. The type must implement Event and not be a nil value.
func Register(ctx context.Context, lis ...Event) error {
ctx, span := lg.Span(ctx)
defer span.End()
for _, e := range lis {
if err := ctx.Err(); err != nil {
span.RecordError(err)
return err
}
name := TypeOf(e)
err := RegisterName(ctx, name, e)
if err != nil {
return err
}
}
return nil
}
func RegisterName(ctx context.Context, name string, e Event) error {
ctx, span := lg.Span(ctx)
defer span.End()
if e == nil {
err := fmt.Errorf("can't register event.Event of type=%T with value=%v", e, e)
span.RecordError(err)
return err
}
value := reflect.ValueOf(e)
if value.IsNil() {
err := fmt.Errorf("can't register event.Event of type=%T with value=%v", e, e)
span.RecordError(err)
return err
}
value = reflect.Indirect(value)
typ := value.Type()
span.AddEvent("register: " + name)
if err := eventTypes.Use(ctx, func(ctx context.Context, c *config) error {
_, span := lg.Span(ctx)
defer span.End()
c.eventTypes[name] = typ
return nil
}); err != nil {
span.RecordError(err)
return err
}
return nil
}
func GetContainer(ctx context.Context, s string) Event {
ctx, span := lg.Span(ctx)
defer span.End()
var e Event
eventTypes.Use(ctx, func(ctx context.Context, c *config) error {
_, span := lg.Span(ctx)
defer span.End()
typ, ok := c.eventTypes[s]
if !ok {
err := fmt.Errorf("not defined: %s", s)
span.RecordError(err)
return err
}
newType := reflect.New(typ)
newInterface := newType.Interface()
if iface, ok := newInterface.(Event); ok {
e = iface
return nil
}
err := fmt.Errorf("failed")
span.RecordError(err)
return err
})
if e == nil {
e = &UnknownEvent{eventType: s}
}
return e
}
func MarshalBinary(e Event) ([]byte, error) {
var err error
b := &bytes.Buffer{}
m := e.EventMeta()
if _, err = b.WriteString(m.EventID.String()); err != nil {
return nil, err
}
b.WriteRune('\t')
if _, err = b.WriteString(m.StreamID); err != nil {
return nil, err
}
b.WriteRune('\t')
if _, err = b.WriteString(TypeOf(e)); err != nil {
return nil, err
}
b.WriteRune('\t')
switch e := e.(type) {
case encoding.BinaryMarshaler:
var txt []byte
if txt, err = e.MarshalBinary(); err != nil {
return nil, err
}
_, err = b.Write(txt)
case encoding.TextMarshaler:
var txt []byte
if txt, err = e.MarshalText(); err != nil {
return nil, err
}
_, err = b.Write(txt)
default:
err = json.NewEncoder(b).Encode(e)
}
return b.Bytes(), err
}
func UnmarshalBinary(ctx context.Context, txt []byte, pos uint64) (e Event, err error) {
ctx, span := lg.Span(ctx)
defer span.End()
sp := bytes.SplitN(txt, []byte{'\t'}, 4)
if len(sp) != 4 {
err = fmt.Errorf("invalid format. expected=4, got=%d", len(sp))
span.RecordError(err)
return nil, err
}
m := Meta{}
if err = m.EventID.UnmarshalText(sp[0]); err != nil {
span.RecordError(err)
return nil, err
}
m.StreamID = string(sp[1])
m.Position = pos
m.ActualStreamID = string(sp[1])
m.ActualPosition = pos
eventType := string(sp[2])
e = GetContainer(ctx, eventType)
span.AddEvent(fmt.Sprintf("%s == %T", eventType, e))
switch e := e.(type) {
case encoding.BinaryUnmarshaler:
if err = e.UnmarshalBinary(sp[3]); err != nil {
span.RecordError(err)
return nil, err
}
case encoding.TextUnmarshaler:
if err = e.UnmarshalText(sp[3]); err != nil {
span.RecordError(err)
return nil, err
}
default:
if err = json.Unmarshal(sp[3], e); err != nil {
span.RecordError(err)
return nil, err
}
}
e.SetEventMeta(m)
return e, nil
}
// DecodeEvents unmarshals the byte list into Events.
func DecodeEvents(ctx context.Context, lis ...[]byte) (Events, error) {
elis := make([]Event, len(lis))
var err error
for i, txt := range lis {
elis[i], err = UnmarshalBinary(ctx, txt, uint64(i))
if err != nil {
return nil, err
}
}
return elis, nil
}
func EncodeEvents(events ...Event) (lis [][]byte, err error) {
lis = make([][]byte, len(events))
for i, txt := range events {
lis[i], err = MarshalBinary(txt)
if err != nil {
return nil, err
}
}
return lis, nil
}
func embedJSON(s string) json.RawMessage {
if len(s) > 1 && s[0] == '{' && s[len(s)-1] == '}' {
return []byte(s)
}
if len(s) > 1 && s[0] == '[' && s[len(s)-1] == ']' {
return []byte(s)
}
return []byte(fmt.Sprintf(`"%s"`, strings.Replace(s, `"`, `\"`, -1)))
}
func Values(e Event) map[string]any {
var a any = e
if e, ok := e.(interface{ Values() any }); ok {
a = e.Values()
}
m := make(map[string]any)
v := reflect.Indirect(reflect.ValueOf(a))
for _, idx := range reflect.VisibleFields(v.Type()) {
if !idx.IsExported() {
continue
}
omitempty := false
field := v.FieldByIndex(idx.Index)
name := idx.Name
if n, ok := idx.Tag.Lookup("json"); ok {
var (
opt string
found bool
)
name, opt, found = strings.Cut(n, ",")
if name == "-" {
continue
}
if found {
if strings.Contains(opt, "omitempty") {
omitempty = true
}
}
}
if omitempty && field.IsZero() {
continue
}
m[name] = field.Interface()
}
return m
}

View File

@@ -9,10 +9,11 @@ import (
"net/http"
"time"
"go.sour.is/pkg/gql"
"go.sour.is/pkg/lg"
"go.sour.is/ev"
"go.sour.is/ev/internal/lg"
"go.sour.is/ev/pkg/es/event"
"go.sour.is/ev/pkg/gql"
"go.sour.is/ev/pkg/event"
)
type EventResolver interface {