ev/pkg/es/event/events.go
2022-09-06 10:35:14 -06:00

211 lines
3.8 KiB
Go

package event
import (
"context"
"crypto/rand"
"encoding"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
ulid "github.com/oklog/ulid/v2"
)
var pool = sync.Pool{
New: func() interface{} { return ulid.Monotonic(rand.Reader, 0) },
}
func getULID() ulid.ULID {
var entropy io.Reader = rand.Reader
if e, ok := pool.Get().(io.Reader); ok {
entropy = e
defer pool.Put(e)
}
return ulid.MustNew(ulid.Now(), entropy)
}
type Event interface {
EventMeta() Meta
SetEventMeta(Meta)
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}
// Events is a list of events
type Events []Event
func NewEvents(lis ...Event) Events {
for i, e := range lis {
meta := e.EventMeta()
meta.Position = uint64(i)
meta.EventID = getULID()
e.SetEventMeta(meta)
}
return lis
}
func (lis Events) StreamID() string {
if len(lis) == 0 {
return ""
}
return lis.First().EventMeta().StreamID
}
func (lis Events) SetStreamID(streamID string) {
SetStreamID(streamID, lis...)
}
func (lis Events) First() Event {
if len(lis) == 0 {
return NilEvent
}
return lis[0]
}
func (lis Events) Rest() Events {
if len(lis) == 0 {
return nil
}
return lis[1:]
}
func (lis Events) Last() Event {
if len(lis) == 0 {
return NilEvent
}
return lis[len(lis)-1]
}
func TypeOf(e Event) string {
if ie, ok := e.(interface{ UnwrapEvent() Event }); ok {
e = ie.UnwrapEvent()
}
if e, ok := e.(interface{ EventType() string }); ok {
return e.EventType()
}
// Default to printed representation for unnamed types
return strings.Trim(fmt.Sprintf("%T", e), "*")
}
type streamID string
func (s streamID) StreamID() string {
return string(s)
}
func StreamID(e Event) streamID {
return streamID(e.EventMeta().StreamID)
}
func SetStreamID(id string, lis ...Event) {
for _, e := range lis {
meta := e.EventMeta()
meta.StreamID = id
e.SetEventMeta(meta)
}
}
func EventID(e Event) ulid.ULID {
return e.EventMeta().EventID
}
func SetEventID(e Event, id ulid.ULID) {
meta := e.EventMeta()
meta.EventID = id
e.SetEventMeta(meta)
}
func SetPosition(e Event, i uint64) {
meta := e.EventMeta()
meta.Position = i
e.SetEventMeta(meta)
}
type Meta struct {
EventID ulid.ULID
StreamID string
Position uint64
}
func (m Meta) Created() time.Time {
return ulid.Time(m.EventID.Time())
}
func (m Meta) GetEventID() string { return m.EventID.String() }
func Init(ctx context.Context) error {
return Register(ctx, NilEvent, &eventPtr{})
}
type nilEvent struct{}
func (*nilEvent) EventMeta() Meta {
return Meta{}
}
func (*nilEvent) SetEventMeta(eventMeta Meta) {}
var NilEvent = &nilEvent{}
func (e *nilEvent) MarshalBinary() ([]byte, error) {
return json.Marshal(e)
}
func (e *nilEvent) UnmarshalBinary(b []byte) error {
return json.Unmarshal(b, e)
}
type eventPtr struct {
streamID string
pos uint64
eventMeta Meta
}
var _ Event = (*eventPtr)(nil)
func NewPtr(streamID string, pos uint64) *eventPtr {
return &eventPtr{streamID: streamID, pos: pos}
}
// MarshalBinary implements Event
func (e *eventPtr) MarshalBinary() (data []byte, err error) {
return []byte(fmt.Sprintf("%s@%d", e.streamID, e.pos)), nil
}
// UnmarshalBinary implements Event
func (e *eventPtr) UnmarshalBinary(data []byte) error {
s := string(data)
idx := strings.LastIndex(s, "@")
if idx == -1 {
return fmt.Errorf("missing @ in: %s", s)
}
e.streamID = s[:idx]
var err error
e.pos, err = strconv.ParseUint(s[idx+1:], 10, 64)
return err
}
// EventMeta implements Event
func (e *eventPtr) EventMeta() Meta {
if e == nil {
return Meta{}
}
return e.eventMeta
}
// SetEventMeta implements Event
func (e *eventPtr) SetEventMeta(m Meta) {
if e == nil {
return
}
e.eventMeta = m
}
func (e *eventPtr) Values() any {
return struct {
StreamID string `json:"stream_id"`
Pos uint64 `json:"pos"`
}{
e.streamID,
e.pos,
}
}