138 lines
3.2 KiB
Go
138 lines
3.2 KiB
Go
package event
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
)
|
|
|
|
// Aggregate implements functionality for working with event store streams as an aggregate.
// When creating a new Aggregate the struct should have an ApplyEvent method and embed the AggregateRoot.
//
// AggregateRoot contains unexported methods, so a type declared outside this
// package can only satisfy it by embedding an implementation from this
// package, such as IsAggregate.
type Aggregate interface {
	// ApplyEvent applies the events to the aggregate state.
	// It is called by Raise and Append after the events have been recorded
	// on the aggregate root.
	ApplyEvent(...Event)

	AggregateRoot
}
|
|
|
|
func Start(a Aggregate, i uint64) {
|
|
a.start(i)
|
|
}
|
|
|
|
// Raise adds new uncommitted events
|
|
func Raise(a Aggregate, lis ...Event) {
|
|
lis = NewEvents(lis...)
|
|
SetStreamID(a.StreamID(), lis...)
|
|
a.raise(lis...)
|
|
a.ApplyEvent(lis...)
|
|
}
|
|
|
|
// Append adds new committed events to the aggregate and then applies them to
// the aggregate state.
//
// Unlike Raise, the events are handed to the aggregate as given: they are not
// passed through NewEvents or SetStreamID first.
func Append(a Aggregate, lis ...Event) {
	// Record the events on the aggregate root before mutating state with them.
	a.append(lis...)
	a.ApplyEvent(lis...)
}
|
|
|
|
// NotExists returns error if there are no events present.
|
|
func NotExists(a Aggregate) error {
|
|
if a.Version() != 0 {
|
|
return fmt.Errorf("%w, got version == %d", ErrShouldNotExist, a.Version())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ShouldExists returns error if there are no events present.
|
|
func ShouldExist(a Aggregate) error {
|
|
if a.Version() == 0 {
|
|
return fmt.Errorf("%w, got version == %d", ErrShouldExist, a.Version())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AggregateRoot is the bookkeeping part of an Aggregate: it exposes the
// stream ID, the events held by the aggregate and its committed and current
// versions.
//
// The interface contains unexported methods, so types outside this package
// can only satisfy it by embedding an implementation such as IsAggregate.
type AggregateRoot interface {
	// Events returns the aggregate events.
	// Pass true to get only the uncommitted events.
	Events(bool) Events
	// StreamID returns the aggregate stream ID.
	StreamID() string
	// SetStreamID sets the aggregate stream ID.
	SetStreamID(streamID string)
	// StreamVersion returns the stream version as of the last committed event.
	StreamVersion() uint64
	// Version returns the current aggregate version (committed + uncommitted).
	Version() uint64

	// start sets the index the aggregate begins at; it is called by Start.
	start(uint64)
	// raise records new uncommitted events; it is called by Raise.
	raise(lis ...Event)
	// append records already committed events; it is called by Append.
	append(lis ...Event)
	// Commit marks the events held by the aggregate as committed.
	Commit()
}
|
|
|
|
// Compile-time assertion that *IsAggregate satisfies AggregateRoot.
var _ AggregateRoot = (*IsAggregate)(nil)
|
|
|
|
// IsAggregate is the AggregateRoot implementation provided by this package.
// It keeps the aggregate's events in memory together with the indexes needed
// to tell committed events from uncommitted ones.
type IsAggregate struct {
	// events holds the aggregate's events in order. Events(true) treats the
	// first lastIndex-firstIndex entries as committed and the rest as
	// uncommitted.
	events Events

	// streamID is the value returned by StreamID and replaced by SetStreamID.
	streamID string

	// firstIndex is the index the aggregate was started at (set by start).
	// Version adds it to the number of held events.
	firstIndex uint64

	// lastIndex is the value reported by StreamVersion. It is set by start,
	// advanced by append and reassigned by Commit.
	lastIndex uint64

	// mu is held by Events (read lock) and by raise and append (write lock).
	mu sync.RWMutex
}
|
|
|
|
// Commit marks every event currently held by the aggregate as committed by
// moving the stream version (lastIndex) up to the aggregate version.
//
// lastIndex is an absolute stream index: start sets it to the same offset as
// firstIndex, and Events subtracts firstIndex from it to find the first
// uncommitted event. The offset therefore has to be included here. Without
// it, an aggregate started at a non-zero index would end up with
// lastIndex < firstIndex and Events(true) would slice out of range.
func (a *IsAggregate) Commit() { a.lastIndex = a.firstIndex + uint64(len(a.events)) }
|
|
// StreamID returns the stream ID stored on the aggregate.
func (a *IsAggregate) StreamID() string {
	return a.streamID
}
|
|
// SetStreamID replaces the stream ID stored on the aggregate.
func (a *IsAggregate) SetStreamID(streamID string) {
	a.streamID = streamID
}
|
|
// StreamVersion returns lastIndex, the index that start sets, append advances
// and Commit reassigns.
func (a *IsAggregate) StreamVersion() uint64 {
	return a.lastIndex
}
|
|
// Version returns the current aggregate version: the index the aggregate was
// started at plus the number of events it holds, committed or not.
func (a *IsAggregate) Version() uint64 {
	held := uint64(len(a.events))
	return a.firstIndex + held
}
|
|
func (a *IsAggregate) Events(new bool) Events {
|
|
a.mu.RLock()
|
|
defer a.mu.RUnlock()
|
|
|
|
events := a.events
|
|
if new {
|
|
events = events[a.lastIndex-a.firstIndex:]
|
|
}
|
|
|
|
lis := make(Events, len(events))
|
|
copy(lis, events)
|
|
|
|
return lis
|
|
}
|
|
|
|
func (a *IsAggregate) start(i uint64) {
|
|
a.firstIndex = i
|
|
a.lastIndex = i
|
|
}
|
|
|
|
// raise records lis as uncommitted events: the events are appended to
// a.events while lastIndex is left untouched, so Events(true) keeps
// returning them.
//
//lint:ignore U1000 is called by embedded interface
func (a *IsAggregate) raise(lis ...Event) { //nolint
	a.mu.Lock()
	defer a.mu.Unlock()

	// Stamp positions before the events are stored.
	a.posStartAt(lis...)

	a.events = append(a.events, lis...)
}
|
|
|
|
//lint:ignore U1000 is called by embeded interface
|
|
func (a *IsAggregate) append(lis ...Event) {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
|
|
a.posStartAt(lis...)
|
|
|
|
a.events = append(a.events, lis...)
|
|
a.lastIndex += uint64(len(lis))
|
|
}
|
|
|
|
func (a *IsAggregate) posStartAt(lis ...Event) {
|
|
for i, e := range lis {
|
|
m := e.EventMeta()
|
|
m.Position = a.lastIndex + uint64(i) + 1
|
|
e.SetEventMeta(m)
|
|
}
|
|
}
|
|
|
|
// Sentinel errors wrapped by NotExists and ShouldExist.
var (
	// ErrShouldNotExist is wrapped by NotExists when the aggregate version is non-zero.
	ErrShouldNotExist = errors.New("should not exist")
	// ErrShouldExist is wrapped by ShouldExist when the aggregate version is zero.
	ErrShouldExist = errors.New("should exist")
)
|