ev/driver/driver.go

70 lines
1.7 KiB
Go
Raw Permalink Normal View History

2022-10-13 15:32:25 -06:00
// package driver defines interfaces to be used by driver implementations.
2022-08-04 14:37:51 -06:00
package driver
import (
"context"
2023-09-29 10:07:24 -06:00
"go.sour.is/ev/event"
"go.sour.is/pkg/math"
2022-08-04 14:37:51 -06:00
)
type Driver interface {
Open(ctx context.Context, dsn string) (Driver, error)
EventLog(ctx context.Context, streamID string) (EventLog, error)
2022-08-04 14:37:51 -06:00
}
type EventLog interface {
Read(ctx context.Context, after, count int64) (event.Events, error)
2022-10-30 09:18:08 -06:00
ReadN(ctx context.Context, index ...uint64) (event.Events, error)
Append(ctx context.Context, events event.Events, version uint64) (uint64, error)
2022-08-14 10:04:15 -06:00
FirstIndex(context.Context) (uint64, error)
LastIndex(context.Context) (uint64, error)
}
2022-12-19 10:50:38 -07:00
type EventLogWithTruncate interface {
Truncate(context.Context, int64) error
}
type EventLogWithUpdate interface {
2022-08-14 10:04:15 -06:00
LoadForUpdate(context.Context, event.Aggregate, func(context.Context, event.Aggregate) error) (uint64, error)
}
type Subscription interface {
Recv(context.Context) <-chan bool
Events(context.Context) (event.Events, error)
Close(context.Context) error
}
type EventStream interface {
2022-08-10 10:09:58 -06:00
Subscribe(ctx context.Context, streamID string, start int64) (Subscription, error)
Send(ctx context.Context, streamID string, events event.Events) error
2022-08-04 14:37:51 -06:00
}
func GenerateStreamIDs(first, last uint64, after, count int64) ([]uint64, error) {
// ---
if first == 0 || last == 0 {
return nil, nil
}
start, count := math.PagerBox(first, last, after, count)
if count == 0 {
return nil, nil
}
streamIDs := make([]uint64, math.Abs(count))
for i := range streamIDs {
streamIDs[i] = start
if count > 0 {
start += 1
} else {
start -= 1
}
if start < first || start > last {
streamIDs = streamIDs[:i+1]
break
}
}
return streamIDs, nil
}