ev/pkg/es/event/events.go

227 lines
4.4 KiB
Go
Raw Normal View History

2022-10-13 15:32:25 -06:00
// Package event implements functionality for working with an event store.
2022-08-04 14:37:51 -06:00
package event
import (
"context"
2022-08-04 14:37:51 -06:00
"crypto/rand"
2022-08-14 10:56:00 -06:00
"encoding"
"encoding/json"
2022-08-04 14:37:51 -06:00
"fmt"
"io"
"strconv"
2022-08-04 14:37:51 -06:00
"strings"
"sync"
"time"
ulid "github.com/oklog/ulid/v2"
)
// pool hands out reusable entropy sources for ULID generation. A new entry is
// created with ulid.Monotonic over crypto/rand's Reader, passing 0 as the
// increment argument.
var pool = sync.Pool{
	New: func() any {
		return ulid.Monotonic(rand.Reader, 0)
	},
}
// getULID returns a new ULID stamped with the current time. The entropy
// source is borrowed from pool and handed back when the function returns; if
// the pooled value is not an io.Reader, crypto/rand's Reader is used directly
// and nothing is returned to the pool. ulid.MustNew is used, so a failure to
// generate the ID results in a panic.
func getULID() ulid.ULID {
	pooled, ok := pool.Get().(io.Reader)
	if !ok {
		return ulid.MustNew(ulid.Now(), rand.Reader)
	}
	defer pool.Put(pooled)
	return ulid.MustNew(ulid.Now(), pooled)
}
2022-10-13 15:32:25 -06:00
// Event implements functionality of an individual event used with the event store. It should implement the getter/setter for EventMeta and BinaryMarshaler/BinaryUnmarshaler.
2022-08-04 14:37:51 -06:00
type Event interface {
EventMeta() Meta
SetEventMeta(Meta)
2022-08-14 10:56:00 -06:00
2022-08-15 08:05:04 -06:00
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
2022-08-04 14:37:51 -06:00
}
// Events is a slice of Event values. Its methods treat the slice as an
// ordered list (First, Rest, Last) and read the stream ID from the first
// element.
type Events []Event
// NewEvents returns the supplied events as an Events list. Each event's
// metadata is rewritten in place: Position becomes the event's zero-based
// index in the argument list and EventID is replaced with a freshly generated
// ULID. All other metadata fields are carried over unchanged.
func NewEvents(lis ...Event) Events {
	for idx := range lis {
		ev := lis[idx]
		m := ev.EventMeta()
		m.Position = uint64(idx)
		m.EventID = getULID()
		ev.SetEventMeta(m)
	}
	return Events(lis)
}
// StreamID reports the stream ID recorded in the metadata of the first event,
// or the empty string when the list has no events.
func (lis Events) StreamID() string {
	if len(lis) > 0 {
		return lis[0].EventMeta().StreamID
	}
	return ""
}
// SetStreamID applies streamID to every event in the list by delegating to the
// package-level SetStreamID function.
func (lis Events) SetStreamID(streamID string) {
	SetStreamID(streamID, []Event(lis)...)
}
// Count returns the number of events in the list as an int64.
func (lis Events) Count() int64 {
	n := len(lis)
	return int64(n)
}
2022-08-04 14:37:51 -06:00
func (lis Events) First() Event {
if len(lis) == 0 {
return NilEvent
2022-08-04 14:37:51 -06:00
}
return lis[0]
}
// Rest returns every event after the first one. It returns nil for an empty
// list and an empty (non-nil) slice for a list holding a single event.
func (lis Events) Rest() Events {
	if len(lis) > 0 {
		return lis[1:]
	}
	return nil
}
2022-08-04 21:07:10 -06:00
func (lis Events) Last() Event {
if len(lis) == 0 {
return NilEvent
2022-08-04 21:07:10 -06:00
}
return lis[len(lis)-1]
}
2022-08-04 14:37:51 -06:00
2022-10-25 16:07:46 -06:00
func TypeOf(e any) string {
2022-08-04 14:37:51 -06:00
if ie, ok := e.(interface{ UnwrapEvent() Event }); ok {
e = ie.UnwrapEvent()
}
if e, ok := e.(interface{ EventType() string }); ok {
return e.EventType()
}
// Default to printed representation for unnamed types
return strings.Trim(fmt.Sprintf("%T", e), "*")
}
// streamID is a string-backed identifier that exposes its value through a
// StreamID method.
type streamID string

// StreamID returns the identifier as a plain string.
func (id streamID) StreamID() string { return string(id) }
// StreamID reads the stream ID from e's metadata and returns it wrapped in the
// streamID type.
func StreamID(e Event) streamID {
	meta := e.EventMeta()
	return streamID(meta.StreamID)
}
func SetStreamID(id string, lis ...Event) {
for _, e := range lis {
meta := e.EventMeta()
meta.StreamID = id
if meta.ActualStreamID == "" {
meta.ActualStreamID = id
}
2022-08-04 14:37:51 -06:00
e.SetEventMeta(meta)
}
}
// EventID returns the ULID stored in e's metadata.
func EventID(e Event) ulid.ULID {
	meta := e.EventMeta()
	return meta.EventID
}
// SetEventID stores id as the EventID in e's metadata, leaving the other
// metadata fields as they were.
func SetEventID(e Event, id ulid.ULID) {
	updated := e.EventMeta()
	updated.EventID = id
	e.SetEventMeta(updated)
}
func SetPosition(e Event, i uint64) {
meta := e.EventMeta()
meta.Position = i
meta.ActualPosition = i
2022-08-04 14:37:51 -06:00
e.SetEventMeta(meta)
}
2022-08-04 21:07:10 -06:00
2022-08-04 14:37:51 -06:00
type Meta struct {
EventID ulid.ULID
StreamID string
Position uint64
ActualStreamID string
ActualPosition uint64
2022-08-04 14:37:51 -06:00
}
2022-08-07 11:55:49 -06:00
func (m Meta) Created() time.Time {
2022-08-04 14:37:51 -06:00
return ulid.Time(m.EventID.Time())
}
// GetEventID returns the event's ULID in its string form.
func (m Meta) GetEventID() string {
	id := m.EventID
	return id.String()
}
2022-08-04 21:07:10 -06:00
func Init(ctx context.Context) error {
2022-10-30 10:00:53 -06:00
if err := Register(ctx, NilEvent, &EventPtr{}); err != nil {
return err
}
if err := RegisterName(ctx, "event.eventPtr", &EventPtr{}); err != nil {
return err
}
return nil
}
type nilEvent struct {
2022-08-04 21:07:10 -06:00
}
func (*nilEvent) EventMeta() Meta { return Meta{} }
2022-08-14 10:56:00 -06:00
func (*nilEvent) SetEventMeta(eventMeta Meta) {}
var NilEvent = &nilEvent{}
2022-08-04 21:07:10 -06:00
2022-08-15 08:05:04 -06:00
func (e *nilEvent) MarshalBinary() ([]byte, error) {
2022-08-14 10:56:00 -06:00
return json.Marshal(e)
}
2022-08-15 08:05:04 -06:00
func (e *nilEvent) UnmarshalBinary(b []byte) error {
2022-08-14 10:56:00 -06:00
return json.Unmarshal(b, e)
}
2022-10-30 09:18:08 -06:00
type EventPtr struct {
StreamID string `json:"stream_id"`
Pos uint64 `json:"pos"`
eventMeta Meta
}
2022-10-30 09:18:08 -06:00
var _ Event = (*EventPtr)(nil)
2022-10-30 09:18:08 -06:00
// NewPtr returns a new EventPtr holding streamID and pos. Its metadata is left
// at the zero value.
func NewPtr(streamID string, pos uint64) *EventPtr {
	ptr := new(EventPtr)
	ptr.StreamID = streamID
	ptr.Pos = pos
	return ptr
}
// MarshalBinary implements Event
2022-10-30 09:18:08 -06:00
func (e *EventPtr) MarshalBinary() (data []byte, err error) {
return []byte(fmt.Sprintf("%s@%d", e.StreamID, e.Pos)), nil
}
// UnmarshalBinary implements Event
2022-10-30 09:18:08 -06:00
func (e *EventPtr) UnmarshalBinary(data []byte) error {
s := string(data)
idx := strings.LastIndex(s, "@")
if idx == -1 {
return fmt.Errorf("missing @ in: %s", s)
}
2022-10-30 09:18:08 -06:00
e.StreamID = s[:idx]
var err error
2022-10-30 09:18:08 -06:00
e.Pos, err = strconv.ParseUint(s[idx+1:], 10, 64)
return err
}
// EventMeta implements Event
2022-10-30 09:18:08 -06:00
func (e *EventPtr) EventMeta() Meta {
if e == nil {
return Meta{}
}
return e.eventMeta
}
// SetEventMeta implements Event
2022-10-30 09:18:08 -06:00
func (e *EventPtr) SetEventMeta(m Meta) {
if e == nil {
return
}
e.eventMeta = m
}
2022-10-30 09:18:08 -06:00
func (e *EventPtr) Values() any {
return struct {
StreamID string `json:"stream_id"`
Pos uint64 `json:"pos"`
}{
2022-10-30 09:18:08 -06:00
e.StreamID,
e.Pos,
}
}