feat: add projection layer to eventstore

This commit is contained in:
Jon Lundy
2022-09-06 10:35:14 -06:00
parent f65e9ca666
commit 063dca997f
24 changed files with 493 additions and 220 deletions

View File

@@ -1,11 +1,13 @@
package event
import (
"context"
"crypto/rand"
"encoding"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
@@ -129,6 +131,10 @@ func (m Meta) Created() time.Time {
}
func (m Meta) GetEventID() string { return m.EventID.String() }
func Init(ctx context.Context) error {
return Register(ctx, NilEvent, &eventPtr{})
}
type nilEvent struct{}
func (*nilEvent) EventMeta() Meta {
@@ -144,3 +150,61 @@ func (e *nilEvent) MarshalBinary() ([]byte, error) {
func (e *nilEvent) UnmarshalBinary(b []byte) error {
return json.Unmarshal(b, e)
}
type eventPtr struct {
streamID string
pos uint64
eventMeta Meta
}
var _ Event = (*eventPtr)(nil)
func NewPtr(streamID string, pos uint64) *eventPtr {
return &eventPtr{streamID: streamID, pos: pos}
}
// MarshalBinary implements Event
func (e *eventPtr) MarshalBinary() (data []byte, err error) {
return []byte(fmt.Sprintf("%s@%d", e.streamID, e.pos)), nil
}
// UnmarshalBinary implements Event
func (e *eventPtr) UnmarshalBinary(data []byte) error {
s := string(data)
idx := strings.LastIndex(s, "@")
if idx == -1 {
return fmt.Errorf("missing @ in: %s", s)
}
e.streamID = s[:idx]
var err error
e.pos, err = strconv.ParseUint(s[idx+1:], 10, 64)
return err
}
// EventMeta implements Event
func (e *eventPtr) EventMeta() Meta {
if e == nil {
return Meta{}
}
return e.eventMeta
}
// SetEventMeta implements Event
func (e *eventPtr) SetEventMeta(m Meta) {
if e == nil {
return
}
e.eventMeta = m
}
func (e *eventPtr) Values() any {
return struct {
StreamID string `json:"stream_id"`
Pos uint64 `json:"pos"`
}{
e.streamID,
e.pos,
}
}

View File

@@ -9,7 +9,7 @@ import (
"reflect"
"strings"
"github.com/sour-is/ev/internal/logz"
"github.com/sour-is/ev/internal/lg"
"github.com/sour-is/ev/pkg/locker"
)
@@ -67,7 +67,7 @@ func (u *UnknownEvent) MarshalBinary() ([]byte, error) {
// Register a type container for Unmarshalling values into. The type must implement Event and not be a nil value.
func Register(ctx context.Context, lis ...Event) error {
_, span := logz.Span(ctx)
_, span := lg.Span(ctx)
defer span.End()
for _, e := range lis {
@@ -84,7 +84,7 @@ func Register(ctx context.Context, lis ...Event) error {
return nil
}
func RegisterName(ctx context.Context, name string, e Event) error {
_, span := logz.Span(ctx)
_, span := lg.Span(ctx)
defer span.End()
if e == nil {
@@ -107,7 +107,7 @@ func RegisterName(ctx context.Context, name string, e Event) error {
span.AddEvent("register: " + name)
if err := eventTypes.Modify(ctx, func(c *config) error {
_, span := logz.Span(ctx)
_, span := lg.Span(ctx)
defer span.End()
c.eventTypes[name] = typ
@@ -119,13 +119,13 @@ func RegisterName(ctx context.Context, name string, e Event) error {
return nil
}
func GetContainer(ctx context.Context, s string) Event {
_, span := logz.Span(ctx)
_, span := lg.Span(ctx)
defer span.End()
var e Event
eventTypes.Modify(ctx, func(c *config) error {
_, span := logz.Span(ctx)
_, span := lg.Span(ctx)
defer span.End()
typ, ok := c.eventTypes[s]
@@ -176,7 +176,7 @@ func MarshalBinary(e Event) (txt []byte, err error) {
}
func UnmarshalBinary(ctx context.Context, txt []byte, pos uint64) (e Event, err error) {
_, span := logz.Span(ctx)
_, span := lg.Span(ctx)
defer span.End()
sp := bytes.SplitN(txt, []byte{'\t'}, 4)
@@ -262,7 +262,13 @@ func Values(e Event) map[string]any {
}
field := v.FieldByIndex(idx.Index)
m[idx.Name] = field.Interface()
name := idx.Name
if n, ok := idx.Tag.Lookup("json"); ok {
name = n
}
m[name] = field.Interface()
}
return m
}