ev/pkg/es/event/aggregate.go

138 lines
3.2 KiB
Go
Raw Normal View History

2022-08-04 14:37:51 -06:00
package event
import (
"errors"
"fmt"
"sync"
)
2022-10-13 15:32:25 -06:00
// Aggregate implements functionality for working with event store streams as an aggregate.
// When creating a new Aggregate the struct should have an ApplyEvent method and embed the AggregateRoot.
2022-08-04 14:37:51 -06:00
type Aggregate interface {
// ApplyEvent applies the event to the aggrigate state
ApplyEvent(...Event)
2023-04-02 21:00:22 -06:00
AggregateRoot
2022-08-04 14:37:51 -06:00
}
// Start hands the index i to the aggregate's unexported start method. For an
// aggregate built on IsAggregate this sets both the first index and the last
// committed index to i.
func Start(a Aggregate, i uint64) { a.start(i) }
2022-08-04 14:37:51 -06:00
// Raise records new, uncommitted events on the aggregate.
//
// The events are first passed through NewEvents, then handed to SetStreamID
// together with the aggregate's stream ID (both helpers are defined elsewhere
// in this package), stored through the aggregate's raise method, and finally
// applied to the aggregate state with ApplyEvent.
func Raise(a Aggregate, lis ...Event) {
	raised := NewEvents(lis...)
	SetStreamID(a.StreamID(), raised...)
	a.raise(raised...)
	a.ApplyEvent(raised...)
}
// Append records already-committed events on the aggregate: they are stored
// through the aggregate's append method and then applied to the aggregate
// state with ApplyEvent.
func Append(a Aggregate, lis ...Event) {
	a.append(lis...)
	a.ApplyEvent(lis...)
}
// NotExists returns error if there are no events present.
func NotExists(a Aggregate) error {
2022-08-23 21:24:13 -06:00
if a.Version() != 0 {
return fmt.Errorf("%w, got version == %d", ErrShouldNotExist, a.Version())
2022-08-04 14:37:51 -06:00
}
return nil
}
2022-08-14 10:04:15 -06:00
// ShouldExists returns error if there are no events present.
func ShouldExist(a Aggregate) error {
2022-08-23 21:24:13 -06:00
if a.Version() == 0 {
return fmt.Errorf("%w, got version == %d", ErrShouldExist, a.Version())
2022-08-14 10:04:15 -06:00
}
return nil
}
2023-04-02 21:00:22 -06:00
type AggregateRoot interface {
2022-08-14 10:04:15 -06:00
// Events returns the aggregate events
2022-08-04 14:37:51 -06:00
// pass true for only uncommitted events
Events(bool) Events
2022-08-14 10:04:15 -06:00
// StreamID returns aggregate stream ID
StreamID() string
// SetStreamID sets aggregate stream ID
SetStreamID(streamID string)
2022-08-04 14:37:51 -06:00
// StreamVersion returns last commit events
StreamVersion() uint64
// Version returns the current aggrigate version. (committed + uncommitted)
Version() uint64
start(uint64)
2022-08-04 14:37:51 -06:00
raise(lis ...Event)
append(lis ...Event)
Commit()
}
2023-04-02 21:00:22 -06:00
// Compile-time check that *IsAggregate satisfies AggregateRoot.
var _ AggregateRoot = (*IsAggregate)(nil)
2022-08-04 14:37:51 -06:00
2023-04-02 21:00:22 -06:00
type IsAggregate struct {
events Events
streamID string
firstIndex uint64
lastIndex uint64
2022-08-04 14:37:51 -06:00
mu sync.RWMutex
}
2023-04-02 21:00:22 -06:00
// Commit marks every event currently held by the aggregate as committed.
//
// lastIndex is kept on the same scale as Version, i.e. offset by firstIndex:
// start sets both indexes to the same value and Events(true) slices at
// lastIndex-firstIndex. The committed index therefore has to include
// firstIndex; using the bare event count would move lastIndex below
// firstIndex for any aggregate started at a non-zero index and make the
// subtraction in Events underflow.
func (a *IsAggregate) Commit() {
	a.lastIndex = a.firstIndex + uint64(len(a.events))
}
// StreamID returns the stream ID stored on the aggregate.
func (a *IsAggregate) StreamID() string {
	return a.streamID
}
// SetStreamID replaces the stream ID stored on the aggregate.
func (a *IsAggregate) SetStreamID(streamID string) {
	a.streamID = streamID
}
// StreamVersion returns the index of the last committed event.
func (a *IsAggregate) StreamVersion() uint64 {
	return a.lastIndex
}
// Version returns the current aggregate version: the index the aggregate was
// started at plus the number of events it holds (committed and uncommitted).
func (a *IsAggregate) Version() uint64 {
	held := uint64(len(a.events))
	return a.firstIndex + held
}
func (a *IsAggregate) Events(new bool) Events {
2022-08-04 14:37:51 -06:00
a.mu.RLock()
defer a.mu.RUnlock()
events := a.events
if new {
events = events[a.lastIndex-a.firstIndex:]
2022-08-04 14:37:51 -06:00
}
lis := make(Events, len(events))
copy(lis, events)
return lis
}
2023-04-02 21:00:22 -06:00
// start sets the index the aggregate begins at: both the first index and the
// last committed index become i.
func (a *IsAggregate) start(i uint64) {
	a.firstIndex, a.lastIndex = i, i
}
2022-08-04 14:37:51 -06:00
//lint:ignore U1000 is called by embeded interface
2023-04-02 21:00:22 -06:00
func (a *IsAggregate) raise(lis ...Event) { //nolint
2022-08-04 14:37:51 -06:00
a.mu.Lock()
defer a.mu.Unlock()
a.posStartAt(lis...)
a.events = append(a.events, lis...)
}
//lint:ignore U1000 is called by embeded interface
2023-04-02 21:00:22 -06:00
func (a *IsAggregate) append(lis ...Event) {
2022-08-04 14:37:51 -06:00
a.mu.Lock()
defer a.mu.Unlock()
a.posStartAt(lis...)
a.events = append(a.events, lis...)
a.lastIndex += uint64(len(lis))
2022-08-04 14:37:51 -06:00
}
2023-04-02 21:00:22 -06:00
func (a *IsAggregate) posStartAt(lis ...Event) {
2022-08-04 14:37:51 -06:00
for i, e := range lis {
m := e.EventMeta()
m.Position = a.lastIndex + uint64(i) + 1
2022-08-04 14:37:51 -06:00
e.SetEventMeta(m)
}
}
// Sentinel errors wrapped by NotExists and ShouldExist; match them with
// errors.Is.
var (
	// ErrShouldNotExist is wrapped by NotExists when the aggregate version
	// is not 0.
	ErrShouldNotExist = errors.New("should not exist")
	// ErrShouldExist is wrapped by ShouldExist when the aggregate version
	// is 0.
	ErrShouldExist = errors.New("should exist")
)