ev/pkg/es/event/events.go

147 lines
2.6 KiB
Go
Raw Normal View History

2022-08-04 14:37:51 -06:00
package event
import (
"crypto/rand"
2022-08-14 10:56:00 -06:00
"encoding"
"encoding/json"
2022-08-04 14:37:51 -06:00
"fmt"
"io"
"strings"
"sync"
"time"
ulid "github.com/oklog/ulid/v2"
)
// pool holds monotonic ULID entropy sources backed by crypto/rand. getULID
// borrows one per call and returns it afterwards, so sources are reused
// rather than rebuilt for every ID.
var pool = sync.Pool{
	New: func() interface{} {
		return ulid.Monotonic(rand.Reader, 0)
	},
}
// getULID returns a new ULID stamped with the current time. Entropy comes
// from a pooled source when the pool yields an io.Reader; otherwise
// crypto/rand is read directly. ulid.MustNew panics if the ID cannot be
// generated.
func getULID() ulid.ULID {
	src, pooled := pool.Get().(io.Reader)
	if !pooled {
		// Nothing usable came out of the pool, so there is nothing to
		// hand back either.
		return ulid.MustNew(ulid.Now(), rand.Reader)
	}
	// Return the borrowed source to the pool once the ID has been built.
	defer pool.Put(src)

	return ulid.MustNew(ulid.Now(), src)
}
type Event interface {
EventMeta() Meta
SetEventMeta(Meta)
2022-08-14 10:56:00 -06:00
2022-08-15 08:05:04 -06:00
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
2022-08-04 14:37:51 -06:00
}
// Events is an ordered list of Event values.
type Events []Event
// NewEvents prepares the given events as a list. Each event's metadata is
// read, its EventID is replaced with a newly generated ULID and its Position
// with the event's zero-based index in lis, and the result is written back to
// the event. The returned Events is the same slice that was passed in.
func NewEvents(lis ...Event) Events {
	for idx := range lis {
		ev := lis[idx]

		m := ev.EventMeta()
		m.EventID = getULID()
		m.Position = uint64(idx)
		ev.SetEventMeta(m)
	}

	return Events(lis)
}
// StreamID reports the stream ID recorded in the metadata of the first event,
// or the empty string when the list holds no events.
func (lis Events) StreamID() string {
	if len(lis) > 0 {
		return lis[0].EventMeta().StreamID
	}
	return ""
}
// SetStreamID assigns streamID to every event in the list by delegating to
// the package-level SetStreamID.
func (lis Events) SetStreamID(streamID string) {
	all := []Event(lis)
	SetStreamID(streamID, all...)
}
func (lis Events) First() Event {
if len(lis) == 0 {
return NilEvent
2022-08-04 14:37:51 -06:00
}
return lis[0]
}
// Rest returns the events after the first one. An empty list yields nil; a
// single-element list yields an empty, non-nil list.
func (lis Events) Rest() Events {
	if len(lis) > 0 {
		return lis[1:]
	}
	return nil
}
2022-08-04 21:07:10 -06:00
func (lis Events) Last() Event {
if len(lis) == 0 {
return NilEvent
2022-08-04 21:07:10 -06:00
}
return lis[len(lis)-1]
}
2022-08-04 14:37:51 -06:00
// TypeOf reports the type name of an event.
//
// If e has an UnwrapEvent method, the event that method returns is examined
// in place of e. An EventType method on the examined event supplies the name.
// Otherwise the name is the event's Go type as printed by the %T verb, with
// any "*" characters trimmed from both ends.
func TypeOf(e Event) string {
	type unwrapper interface{ UnwrapEvent() Event }
	type typer interface{ EventType() string }

	if w, wrapped := e.(unwrapper); wrapped {
		e = w.UnwrapEvent()
	}

	switch named := e.(type) {
	case typer:
		return named.EventType()
	default:
		// Default to the printed representation for unnamed types.
		return strings.Trim(fmt.Sprintf("%T", e), "*")
	}
}
// streamID is a string-backed stream identifier that exposes its value
// through a StreamID method.
type streamID string

// StreamID returns the identifier as a plain string.
func (id streamID) StreamID() string { return string(id) }
// StreamID returns the stream ID stored in e's metadata, wrapped in the
// streamID type.
func StreamID(e Event) streamID {
	meta := e.EventMeta()
	return streamID(meta.StreamID)
}
// SetStreamID writes id into the StreamID field of every event in lis. Each
// event's metadata is read, updated, and stored back through SetEventMeta.
func SetStreamID(id string, lis ...Event) {
	for i := range lis {
		m := lis[i].EventMeta()
		m.StreamID = id
		lis[i].SetEventMeta(m)
	}
}
// EventID returns the ULID stored in e's metadata.
func EventID(e Event) ulid.ULID {
	meta := e.EventMeta()
	return meta.EventID
}
// SetEventID stores id as the EventID in e's metadata. The metadata is read,
// updated, and written back through SetEventMeta.
func SetEventID(e Event, id ulid.ULID) {
	m := e.EventMeta()
	m.EventID = id
	e.SetEventMeta(m)
}
// SetPosition stores i as the Position in e's metadata. The metadata is read,
// updated, and written back through SetEventMeta.
func SetPosition(e Event, i uint64) {
	m := e.EventMeta()
	m.Position = i
	e.SetEventMeta(m)
}
2022-08-04 21:07:10 -06:00
2022-08-04 14:37:51 -06:00
// Meta is the metadata carried by an event.
type Meta struct {
	// EventID is the event's ULID; Created derives its timestamp from it.
	EventID ulid.ULID
	// StreamID is the identifier of the event's stream.
	StreamID string
	// Position is the event's index. NewEvents sets it to the zero-based
	// index of the event in its argument list.
	Position uint64
}
2022-08-07 11:55:49 -06:00
func (m Meta) Created() time.Time {
2022-08-04 14:37:51 -06:00
return ulid.Time(m.EventID.Time())
}
// GetEventID returns the event's ULID in its string form.
func (m Meta) GetEventID() string {
	return m.EventID.String()
}
2022-08-04 21:07:10 -06:00
// nilEvent is an empty Event implementation: it reports a zero Meta and
// ignores any Meta it is given.
type nilEvent struct{}
2022-08-14 10:56:00 -06:00
func (*nilEvent) EventMeta() Meta {
2022-08-04 21:07:10 -06:00
return Meta{}
}
2022-08-14 10:56:00 -06:00
// SetEventMeta discards the given metadata.
func (*nilEvent) SetEventMeta(eventMeta Meta) {
	// Intentionally empty: a nilEvent has no fields to store metadata in.
}
// NilEvent is the placeholder event that Events.First and Events.Last return
// for an empty list.
var NilEvent = &nilEvent{}
2022-08-04 21:07:10 -06:00
2022-08-15 08:05:04 -06:00
func (e *nilEvent) MarshalBinary() ([]byte, error) {
2022-08-14 10:56:00 -06:00
return json.Marshal(e)
}
2022-08-15 08:05:04 -06:00
func (e *nilEvent) UnmarshalBinary(b []byte) error {
2022-08-14 10:56:00 -06:00
return json.Unmarshal(b, e)
}