2022-10-13 15:32:25 -06:00
// package event implements functionality for working with an eventstore.
2022-08-04 14:37:51 -06:00
package event
import (
2022-09-06 10:35:14 -06:00
"context"
2022-08-04 14:37:51 -06:00
"crypto/rand"
"fmt"
"io"
2022-09-06 10:35:14 -06:00
"strconv"
2022-08-04 14:37:51 -06:00
"strings"
"sync"
"time"
ulid "github.com/oklog/ulid/v2"
)
var pool = sync . Pool {
New : func ( ) interface { } { return ulid . Monotonic ( rand . Reader , 0 ) } ,
}
func getULID ( ) ulid . ULID {
var entropy io . Reader = rand . Reader
if e , ok := pool . Get ( ) . ( io . Reader ) ; ok {
entropy = e
defer pool . Put ( e )
}
return ulid . MustNew ( ulid . Now ( ) , entropy )
}
2022-10-13 15:32:25 -06:00
// Event implements functionality of an individual event used with the event store. It should implement the getter/setter for EventMeta and BinaryMarshaler/BinaryUnmarshaler.
2022-08-04 14:37:51 -06:00
type Event interface {
EventMeta ( ) Meta
SetEventMeta ( Meta )
}
// Events is a list of events
type Events [ ] Event
func NewEvents ( lis ... Event ) Events {
for i , e := range lis {
meta := e . EventMeta ( )
meta . Position = uint64 ( i )
2022-12-19 10:50:38 -07:00
if meta . ActualPosition == 0 {
meta . ActualPosition = uint64 ( i )
}
2022-08-04 14:37:51 -06:00
meta . EventID = getULID ( )
e . SetEventMeta ( meta )
}
return lis
}
func ( lis Events ) StreamID ( ) string {
if len ( lis ) == 0 {
return ""
}
return lis . First ( ) . EventMeta ( ) . StreamID
}
func ( lis Events ) SetStreamID ( streamID string ) {
SetStreamID ( streamID , lis ... )
}
2022-10-25 20:15:57 -06:00
func ( lis Events ) Count ( ) int64 {
return int64 ( len ( lis ) )
}
2022-08-04 14:37:51 -06:00
func ( lis Events ) First ( ) Event {
if len ( lis ) == 0 {
2022-08-09 16:23:33 -06:00
return NilEvent
2022-08-04 14:37:51 -06:00
}
return lis [ 0 ]
}
func ( lis Events ) Rest ( ) Events {
if len ( lis ) == 0 {
return nil
}
return lis [ 1 : ]
}
2022-08-04 21:07:10 -06:00
func ( lis Events ) Last ( ) Event {
if len ( lis ) == 0 {
2022-08-09 16:23:33 -06:00
return NilEvent
2022-08-04 21:07:10 -06:00
}
return lis [ len ( lis ) - 1 ]
}
2022-08-04 14:37:51 -06:00
2022-10-25 16:07:46 -06:00
func TypeOf ( e any ) string {
2022-08-04 14:37:51 -06:00
if ie , ok := e . ( interface { UnwrapEvent ( ) Event } ) ; ok {
e = ie . UnwrapEvent ( )
}
if e , ok := e . ( interface { EventType ( ) string } ) ; ok {
return e . EventType ( )
}
// Default to printed representation for unnamed types
return strings . Trim ( fmt . Sprintf ( "%T" , e ) , "*" )
}
type streamID string
func ( s streamID ) StreamID ( ) string {
return string ( s )
}
func StreamID ( e Event ) streamID {
return streamID ( e . EventMeta ( ) . StreamID )
}
func SetStreamID ( id string , lis ... Event ) {
for _ , e := range lis {
meta := e . EventMeta ( )
meta . StreamID = id
2022-11-20 10:15:51 -07:00
if meta . ActualStreamID == "" {
meta . ActualStreamID = id
}
2022-08-04 14:37:51 -06:00
e . SetEventMeta ( meta )
}
}
func EventID ( e Event ) ulid . ULID {
return e . EventMeta ( ) . EventID
}
func SetEventID ( e Event , id ulid . ULID ) {
meta := e . EventMeta ( )
meta . EventID = id
e . SetEventMeta ( meta )
}
func SetPosition ( e Event , i uint64 ) {
meta := e . EventMeta ( )
meta . Position = i
2022-11-20 10:15:51 -07:00
meta . ActualPosition = i
2022-08-04 14:37:51 -06:00
e . SetEventMeta ( meta )
}
2022-08-04 21:07:10 -06:00
2022-08-04 14:37:51 -06:00
type Meta struct {
2022-11-20 10:15:51 -07:00
EventID ulid . ULID
StreamID string
Position uint64
ActualStreamID string
ActualPosition uint64
2022-08-04 14:37:51 -06:00
}
2022-08-07 11:55:49 -06:00
func ( m Meta ) Created ( ) time . Time {
2022-08-04 14:37:51 -06:00
return ulid . Time ( m . EventID . Time ( ) )
}
2022-08-09 16:23:33 -06:00
func ( m Meta ) GetEventID ( ) string { return m . EventID . String ( ) }
2022-08-04 21:07:10 -06:00
2022-09-06 10:35:14 -06:00
func Init ( ctx context . Context ) error {
2022-10-30 10:00:53 -06:00
if err := Register ( ctx , NilEvent , & EventPtr { } ) ; err != nil {
return err
}
if err := RegisterName ( ctx , "event.eventPtr" , & EventPtr { } ) ; err != nil {
return err
}
return nil
2022-09-06 10:35:14 -06:00
}
2022-11-20 10:15:51 -07:00
type nilEvent struct {
2023-04-02 16:45:17 -06:00
IsEvent
2022-08-04 21:07:10 -06:00
}
2022-11-20 10:15:51 -07:00
2022-08-14 10:56:00 -06:00
var NilEvent = & nilEvent { }
2022-08-04 21:07:10 -06:00
2022-08-15 08:05:04 -06:00
func ( e * nilEvent ) MarshalBinary ( ) ( [ ] byte , error ) {
2023-04-02 16:45:17 -06:00
return nil , nil
2022-08-14 10:56:00 -06:00
}
2022-08-15 08:05:04 -06:00
func ( e * nilEvent ) UnmarshalBinary ( b [ ] byte ) error {
2023-04-02 16:45:17 -06:00
return nil
2022-08-14 10:56:00 -06:00
}
2022-09-06 10:35:14 -06:00
2022-10-30 09:18:08 -06:00
type EventPtr struct {
StreamID string ` json:"stream_id" `
Pos uint64 ` json:"pos" `
2022-09-06 10:35:14 -06:00
2023-04-02 16:45:17 -06:00
IsEvent
2022-09-06 10:35:14 -06:00
}
2022-10-30 09:18:08 -06:00
var _ Event = ( * EventPtr ) ( nil )
2022-09-06 10:35:14 -06:00
2022-10-30 09:18:08 -06:00
func NewPtr ( streamID string , pos uint64 ) * EventPtr {
return & EventPtr { StreamID : streamID , Pos : pos }
2022-09-06 10:35:14 -06:00
}
// MarshalBinary implements Event
2022-10-30 09:18:08 -06:00
func ( e * EventPtr ) MarshalBinary ( ) ( data [ ] byte , err error ) {
return [ ] byte ( fmt . Sprintf ( "%s@%d" , e . StreamID , e . Pos ) ) , nil
2022-09-06 10:35:14 -06:00
}
// UnmarshalBinary implements Event
2022-10-30 09:18:08 -06:00
func ( e * EventPtr ) UnmarshalBinary ( data [ ] byte ) error {
2022-09-06 10:35:14 -06:00
s := string ( data )
idx := strings . LastIndex ( s , "@" )
if idx == - 1 {
return fmt . Errorf ( "missing @ in: %s" , s )
}
2022-10-30 09:18:08 -06:00
e . StreamID = s [ : idx ]
2022-09-06 10:35:14 -06:00
var err error
2022-10-30 09:18:08 -06:00
e . Pos , err = strconv . ParseUint ( s [ idx + 1 : ] , 10 , 64 )
2022-09-06 10:35:14 -06:00
return err
}
2022-10-30 09:18:08 -06:00
func ( e * EventPtr ) Values ( ) any {
2022-09-06 10:35:14 -06:00
return struct {
StreamID string ` json:"stream_id" `
Pos uint64 ` json:"pos" `
} {
2022-10-30 09:18:08 -06:00
e . StreamID ,
e . Pos ,
2022-09-06 10:35:14 -06:00
}
}
2022-12-19 10:50:38 -07:00
type FeedTruncated struct {
2023-04-02 16:45:17 -06:00
IsEvent
2022-12-19 10:50:38 -07:00
}
2023-04-02 16:45:17 -06:00
func ( e * FeedTruncated ) Values ( ) any {
return struct {
} { }
2022-12-19 10:50:38 -07:00
}
2023-04-02 16:45:17 -06:00
type property [ T any ] struct {
v T
}
type IsEvent = property [ Meta ]
func ( p * property [ T ] ) EventMeta ( ) T {
if p == nil {
var t T
return t
2022-12-19 10:50:38 -07:00
}
2023-04-02 16:45:17 -06:00
return p . v
2022-12-19 10:50:38 -07:00
}
2023-04-02 16:45:17 -06:00
func ( p * property [ T ] ) SetEventMeta ( x T ) {
if p != nil {
p . v = x
}
2022-12-19 10:50:38 -07:00
}
2023-05-29 09:48:20 -06:00
func AsEvent [ T any ] ( e T ) Event {
return & asEvent [ T ] { payload : e }
}
2023-07-22 08:52:15 -06:00
type asEvent [ T any ] struct {
2023-05-29 09:48:20 -06:00
payload T
IsEvent
}
2023-07-22 08:52:15 -06:00
2023-05-29 09:48:20 -06:00
func ( e asEvent [ T ] ) Payload ( ) T {
return e . payload
}
2023-07-22 08:52:15 -06:00
type AGG interface { ApplyEvent ( ... Event ) }
2023-05-29 09:48:20 -06:00
func AsAggregate [ T AGG ] ( e T ) Aggregate {
return & asAggregate [ T ] { payload : e }
}
2023-07-22 08:52:15 -06:00
type asAggregate [ T AGG ] struct {
2023-05-29 09:48:20 -06:00
payload T
IsAggregate
}
2023-07-22 08:52:15 -06:00
2023-05-29 09:48:20 -06:00
func ( e * asAggregate [ T ] ) Payload ( ) T {
return e . payload
}
func ( e * asAggregate [ T ] ) ApplyEvent ( lis ... Event ) {
e . payload . ApplyEvent ( lis ... )
2023-07-22 08:52:15 -06:00
}