feat: add subscriptions and rework interfaces

This commit is contained in:
Jon Lundy 2022-08-09 16:23:33 -06:00
parent 82f23ae323
commit 0642879c07
Signed by untrusted user who does not match committer: xuu
GPG Key ID: C63E6D61F3035024
16 changed files with 1368 additions and 627 deletions

View File

@ -1,7 +1,7 @@
export EV_DATA = mem: export EV_DATA = mem:
# export EV_HTTP = :8080 export EV_HTTP = :8080
run: run: gen
go run . go run .
test: test:
go test -cover -race ./... go test -cover -race ./...
@ -9,10 +9,15 @@ test:
GQLDIR=api/gql_ev GQLDIR=api/gql_ev
GQLS=$(wildcard $(GQLDIR)/*.go) $(wildcard $(GQLDIR)/*.graphqls) gqlgen.yml GQLS=$(wildcard $(GQLDIR)/*.go) $(wildcard $(GQLDIR)/*.graphqls) gqlgen.yml
GQLSRC=internal/ev/graph/generated/generated.go GQLSRC=internal/graph/generated/generated.go
gen: gql gen: gql
gql: $(GQLSRC) gql: $(GQLSRC)
$(GQLSRC): $(GQLS) $(GQLSRC): $(GQLS)
go get github.com/99designs/gqlgen@latest ifeq (, $(shell which gqlgen))
go run github.com/99designs/gqlgen go install github.com/99designs/gqlgen@latest
endif
gqlgen
load:
watch -n .1 "http POST localhost:8080/event/asdf/test a=b one=1 two:='{\"v\":2}' | jq"

View File

@ -1,6 +1,5 @@
extend type Query { scalar Time
events(streamID: String! paging: PageInput): Connection! scalar Map
}
type Connection { type Connection {
paging: PageInfo! paging: PageInfo!
@ -21,26 +20,24 @@ interface Edge {
id: ID! id: ID!
} }
type Event implements Edge {
id: ID!
payload: String!
tags: [String!]!
meta: Meta!
}
type Meta { type Meta {
id: String! eventID: String! @goField(name: "getEventID")
streamID: String! streamID: String!
created: Time! created: Time!
position: Int! position: Int!
} }
scalar Time directive @goModel(
model: String
models: [String!]
) on OBJECT | INPUT_OBJECT | SCALAR | ENUM | INTERFACE | UNION
directive @goField( directive @goField(
forceResolver: Boolean forceResolver: Boolean
name: String name: String
) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION ) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION
directive @goTag(
key: String!
value: String
) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION

View File

@ -1,6 +1,11 @@
package gql_ev package gql_ev
import "github.com/sour-is/ev/pkg/es/event" import (
"context"
"encoding/json"
"github.com/sour-is/ev/pkg/es/event"
)
type Edge interface { type Edge interface {
IsEdge() IsEdge()
@ -11,14 +16,19 @@ type Connection struct {
Edges []Edge `json:"edges"` Edges []Edge `json:"edges"`
} }
type Event struct { type PostEvent struct {
ID string `json:"id"` ID string `json:"id"`
Payload string `json:"payload"` Payload string `json:"payload"`
Tags []string `json:"tags"` Tags []string `json:"tags"`
Meta *event.Meta `json:"meta"` Meta *event.Meta `json:"meta"`
} }
func (Event) IsEdge() {} func (PostEvent) IsEdge() {}
func (e *PostEvent) PayloadJSON(ctx context.Context) (m map[string]interface{}, err error) {
err = json.Unmarshal([]byte(e.Payload), &m)
return
}
type PageInfo struct { type PageInfo struct {
Next bool `json:"next"` Next bool `json:"next"`

View File

@ -0,0 +1,15 @@
extend type Query {
posts(streamID: String! paging: PageInput): Connection!
}
extend type Subscription {
postAdded(streamID: String!): PostEvent
}
type PostEvent implements Edge {
id: ID!
payload: String!
payloadJSON: Map!
tags: [String!]!
meta: Meta!
}

View File

@ -2,9 +2,12 @@ package gql_ev
import ( import (
"context" "context"
"fmt"
"log"
"time"
"github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/es/service" "github.com/sour-is/ev/pkg/msgbus"
) )
// This file will not be regenerated automatically. // This file will not be regenerated automatically.
@ -12,15 +15,15 @@ import (
// It serves as dependency injection for your app, add any dependencies you require here. // It serves as dependency injection for your app, add any dependencies you require here.
type Resolver struct { type Resolver struct {
es driver.EventStore es *es.EventStore
} }
func New(es driver.EventStore) *Resolver { func New(es *es.EventStore) *Resolver {
return &Resolver{es} return &Resolver{es}
} }
// Events is the resolver for the events field. // Posts is the resolver for the events field.
func (r *Resolver) Events(ctx context.Context, streamID string, paging *PageInput) (*Connection, error) { func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput) (*Connection, error) {
lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30)) lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
if err != nil { if err != nil {
return nil, err return nil, err
@ -31,12 +34,12 @@ func (r *Resolver) Events(ctx context.Context, streamID string, paging *PageInpu
e := lis[i] e := lis[i]
m := e.EventMeta() m := e.EventMeta()
post, ok := e.(*service.PostEvent) post, ok := e.(*msgbus.PostEvent)
if !ok { if !ok {
continue continue
} }
edges = append(edges, Event{ edges = append(edges, PostEvent{
ID: lis[i].EventMeta().EventID.String(), ID: lis[i].EventMeta().EventID.String(),
Payload: string(post.Payload), Payload: string(post.Payload),
Tags: post.Tags, Tags: post.Tags,
@ -62,3 +65,50 @@ func (r *Resolver) Events(ctx context.Context, streamID string, paging *PageInpu
Edges: edges, Edges: edges,
}, nil }, nil
} }
func (r *Resolver) PostAdded(ctx context.Context, streamID string) (<-chan *PostEvent, error) {
es := r.es.EventStream()
if es == nil {
return nil, fmt.Errorf("EventStore does not implement streaming")
}
sub, err := es.Subscribe(ctx, streamID)
if err != nil {
return nil, err
}
ch := make(chan *PostEvent)
go func() {
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
log.Print(sub.Close(ctx))
}()
for sub.Recv(ctx) {
events, err := sub.Events(ctx)
if err != nil {
break
}
for _, e := range events {
m := e.EventMeta()
if p, ok := e.(*msgbus.PostEvent); ok {
select {
case ch <- &PostEvent{
ID: m.EventID.String(),
Payload: string(p.Payload),
Tags: p.Tags,
Meta: &m,
}:
continue
case <-ctx.Done():
return
}
}
}
}
}()
return ch, nil
}

File diff suppressed because it is too large Load Diff

View File

@ -24,11 +24,11 @@ func New(r *gql_ev.Resolver) *Resolver {
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
// Subscription returns generated.SubscriptionResolver implementation. // Subscription returns generated.SubscriptionResolver implementation.
// func (r *Resolver) Subscription() generated.SubscriptionResolver { return &subscriptionResolver{r} } func (r *Resolver) Subscription() generated.SubscriptionResolver { return &subscriptionResolver{r} }
type queryResolver struct{ *Resolver } type queryResolver struct{ *Resolver }
// type subscriptionResolver struct{ *Resolver } type subscriptionResolver struct{ *Resolver }
func (r *Resolver) ChainMiddlewares(h http.Handler) http.Handler { func (r *Resolver) ChainMiddlewares(h http.Handler) http.Handler {
v := reflect.ValueOf(r) // Get reflected value of *Resolver v := reflect.ValueOf(r) // Get reflected value of *Resolver

View File

@ -17,7 +17,8 @@ import (
"github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es"
diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store"
memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store"
"github.com/sour-is/ev/pkg/es/service" "github.com/sour-is/ev/pkg/es/driver/streamer"
"github.com/sour-is/ev/pkg/msgbus"
"github.com/sour-is/ev/pkg/playground" "github.com/sour-is/ev/pkg/playground"
) )
@ -36,12 +37,12 @@ func run(ctx context.Context) error {
diskstore.Init(ctx) diskstore.Init(ctx)
memstore.Init(ctx) memstore.Init(ctx)
es, err := es.Open(ctx, env("EV_DATA", "file:data")) es, err := es.Open(ctx, env("EV_DATA", "file:data"), streamer.New(ctx))
if err != nil { if err != nil {
return err return err
} }
svc, err := service.New(ctx, es) svc, err := msgbus.New(ctx, es)
if err != nil { if err != nil {
return err return err
} }

View File

@ -12,6 +12,7 @@ import (
"github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/driver"
"github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/es/event"
"github.com/sour-is/ev/pkg/locker"
"github.com/sour-is/ev/pkg/math" "github.com/sour-is/ev/pkg/math"
) )
@ -19,14 +20,17 @@ type diskStore struct {
path string path string
} }
var _ driver.Driver = (*diskStore)(nil) const AppendOnly = es.AppendOnly
const AllEvents = es.AllEvents
func Init(ctx context.Context) error { func Init(ctx context.Context) error {
es.Register(ctx, "file", &diskStore{}) es.Register(ctx, "file", &diskStore{})
return nil return nil
} }
func (diskStore) Open(dsn string) (driver.EventStore, error) { var _ driver.Driver = (*diskStore)(nil)
func (diskStore) Open(_ context.Context, dsn string) (driver.Driver, error) {
scheme, path, ok := strings.Cut(dsn, ":") scheme, path, ok := strings.Cut(dsn, ":")
if !ok { if !ok {
return nil, fmt.Errorf("expected scheme") return nil, fmt.Errorf("expected scheme")
@ -45,181 +49,151 @@ func (diskStore) Open(dsn string) (driver.EventStore, error) {
return &diskStore{path: path}, nil return &diskStore{path: path}, nil
} }
func (ds *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
func (es *diskStore) Save(ctx context.Context, agg event.Aggregate) (uint64, error) { el := &eventLog{streamID: streamID}
l, err := es.readLog(agg.StreamID()) l, err := wal.Open(filepath.Join(ds.path, streamID), wal.DefaultOptions)
if err != nil { el.events = locker.New(l)
return 0, err return el, err
}
var last uint64
if last, err = l.LastIndex(); err != nil {
return 0, err
}
if agg.StreamVersion() != last {
return 0, fmt.Errorf("current version wrong %d != %d", agg.StreamVersion(), last)
}
events := agg.Events(true)
var b []byte
batch := &wal.Batch{}
for _, e := range events {
b, err = event.MarshalText(e)
if err != nil {
return 0, err
}
batch.Write(e.EventMeta().Position, b)
}
err = l.WriteBatch(batch)
if err != nil {
return 0, err
}
agg.Commit()
return uint64(len(events)), nil
} }
func (es *diskStore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) {
event.SetStreamID(streamID, events...)
l, err := es.readLog(streamID) type eventLog struct {
if err != nil { streamID string
return 0, err events *locker.Locked[wal.Log]
}
var last uint64
if last, err = l.LastIndex(); err != nil {
return 0, err
}
var b []byte
batch := &wal.Batch{}
for i, e := range events {
b, err = event.MarshalText(e)
if err != nil {
return 0, err
}
pos := last + uint64(i) + 1
event.SetPosition(e, pos)
batch.Write(pos, b)
}
err = l.WriteBatch(batch)
if err != nil {
return 0, err
}
return uint64(len(events)), nil
} }
func (es *diskStore) Load(ctx context.Context, agg event.Aggregate) error {
l, err := es.readLog(agg.StreamID())
if err != nil {
return err
}
var i, first, last uint64 var _ driver.EventLog = (*eventLog)(nil)
if first, err = l.FirstIndex(); err != nil { func (es *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
return err event.SetStreamID(es.streamID, events...)
}
if last, err = l.LastIndex(); err != nil {
return err
}
if first == 0 || last == 0 {
return nil
}
var b []byte var count uint64
events := make([]event.Event, last-i) err := es.events.Modify(ctx, func(l *wal.Log) error {
for i = 0; first+i <= last; i++ { last, err := l.LastIndex()
b, err = l.Read(first + i)
if err != nil { if err != nil {
return err return err
} }
events[i], err = event.UnmarshalText(ctx, b, first+i)
if err != nil { if version != AppendOnly && version != last {
return err return fmt.Errorf("current version wrong %d != %d", version, last)
} }
}
event.Append(agg, events...)
return nil
}
func (es *diskStore) Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) {
l, err := es.readLog(streamID)
if err != nil {
return nil, err
}
var first, last, start uint64
if first, err = l.FirstIndex(); err != nil {
return nil, err
}
if last, err = l.LastIndex(); err != nil {
return nil, err
}
if first == 0 || last == 0 {
return nil, nil
}
switch {
case pos >= 0:
start = first + uint64(pos)
if pos == 0 && count < 0 {
count = -count // if pos=0 assume forward count.
}
case pos < 0:
start = uint64(int64(last) + pos + 1)
if pos == -1 && count > 0 {
count = -count // if pos=-1 assume backward count.
}
}
events := make([]event.Event, math.Abs(count))
for i := range events {
var b []byte var b []byte
b, err = l.Read(start) batch := &wal.Batch{}
for i, e := range events {
b, err = event.MarshalText(e)
if err != nil {
return err
}
pos := last + uint64(i) + 1
event.SetPosition(e, pos)
batch.Write(pos, b)
}
count = uint64(len(events))
return l.WriteBatch(batch)
})
return count, err
}
func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, error) {
var events event.Events
err := es.events.Modify(ctx, func(stream *wal.Log) error {
first, err := stream.FirstIndex()
if err != nil { if err != nil {
return events, err return err
} }
events[i], err = event.UnmarshalText(ctx, b, start) last, err := stream.LastIndex()
if err != nil { if err != nil {
return events, err return err
}
// ---
if first == 0 || last == 0 {
return nil
} }
if count > 0 { if count == AllEvents {
start += 1 count = int64(first - last)
} else {
start -= 1
} }
if start < first || start > last {
events = events[:i+1]
break
}
}
event.SetStreamID(streamID, events...)
return events, nil var start uint64
}
func (es *diskStore) FirstIndex(ctx context.Context, streamID string) (uint64, error) { switch {
l, err := es.readLog(streamID) case pos >= 0 && count > 0:
start = first + uint64(pos)
case pos < 0 && count > 0:
start = uint64(int64(last) + pos + 1)
case pos >= 0 && count < 0:
start = first + uint64(pos)
if pos > 1 {
start -= 2 // if pos is positive and count negative start before
}
if pos <= 1 {
return nil // if pos is one or zero and negative count nothing to return
}
case pos < 0 && count < 0:
start = uint64(int64(last) + pos)
}
if start >= last {
return nil // if start is after last and positive count nothing to return
}
events = make([]event.Event, math.Abs(count))
for i := range events {
// ---
var b []byte
b, err = stream.Read(start)
if err != nil {
return err
}
events[i], err = event.UnmarshalText(ctx, b, start)
if err != nil {
return err
}
// ---
if count > 0 {
start += 1
} else {
start -= 1
}
if start < first || start > last {
events = events[:i+1]
break
}
}
return nil
})
if err != nil { if err != nil {
return 0, err return nil, err
} }
return l.FirstIndex()
}
func (es *diskStore) LastIndex(ctx context.Context, streamID string) (uint64, error) {
l, err := es.readLog(streamID)
if err != nil {
return 0, err
}
return l.LastIndex()
}
func (es *diskStore) readLog(name string) (*wal.Log, error) { event.SetStreamID(es.streamID, events...)
return wal.Open(filepath.Join(es.path, name), wal.DefaultOptions)
return events, err
}
func (es *eventLog) FirstIndex(ctx context.Context) (uint64, error) {
var idx uint64
var err error
err = es.events.Modify(ctx, func(events *wal.Log) error {
idx, err = events.FirstIndex()
return err
})
return idx, err
}
func (es *eventLog) LastIndex(ctx context.Context) (uint64, error) {
var idx uint64
var err error
err = es.events.Modify(ctx, func(events *wal.Log) error {
idx, err = events.LastIndex()
return err
})
return idx, err
} }

View File

@ -7,14 +7,24 @@ import (
) )
type Driver interface { type Driver interface {
Open(string) (EventStore, error) Open(ctx context.Context, dsn string) (Driver, error)
EventLog(ctx context.Context, streamID string) (EventLog, error)
} }
type EventStore interface { type EventLog interface {
Save(ctx context.Context, agg event.Aggregate) (uint64, error) Read(ctx context.Context, pos, count int64) (event.Events, error)
Load(ctx context.Context, agg event.Aggregate) error Append(ctx context.Context, events event.Events, version uint64) (uint64, error)
Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) FirstIndex(ctx context.Context) (uint64, error)
Append(ctx context.Context, streamID string, events event.Events) (uint64, error) LastIndex(ctx context.Context) (uint64, error)
FirstIndex(ctx context.Context, streamID string) (uint64, error) }
LastIndex(ctx context.Context, streamID string) (uint64, error)
type Subscription interface {
Recv(context.Context) bool
Events(context.Context) (event.Events, error)
Close(context.Context) error
}
type EventStream interface {
Subscribe(ctx context.Context, streamID string) (Subscription, error)
Send(ctx context.Context, streamID string, events event.Events) error
} }

View File

@ -12,84 +12,113 @@ import (
) )
type state struct { type state struct {
streams map[string]event.Events streams map[string]*locker.Locked[event.Events]
}
type eventLog struct {
streamID string
events *locker.Locked[event.Events]
} }
type memstore struct { type memstore struct {
state *locker.Locked[state] state *locker.Locked[state]
} }
var _ driver.Driver = (*memstore)(nil) const AppendOnly = es.AppendOnly
const AllEvents = es.AllEvents
func Init(ctx context.Context) { func Init(ctx context.Context) {
es.Register(ctx, "mem", &memstore{}) es.Register(ctx, "mem", &memstore{})
} }
func (memstore) Open(name string) (driver.EventStore, error) { var _ driver.Driver = (*memstore)(nil)
s := &state{streams: make(map[string]event.Events)}
func (memstore) Open(_ context.Context, name string) (driver.Driver, error) {
s := &state{streams: make(map[string]*locker.Locked[event.Events])}
return &memstore{locker.New(s)}, nil return &memstore{locker.New(s)}, nil
} }
func (m *memstore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
el := &eventLog{streamID: streamID}
err := m.state.Modify(ctx, func(state *state) error {
l, ok := state.streams[streamID]
if !ok {
l = locker.New(&event.Events{})
state.streams[streamID] = l
}
el.events = l
return nil
})
if err != nil {
return nil, err
}
return el, err
}
var _ driver.EventLog = (*eventLog)(nil)
// Append implements driver.EventStore // Append implements driver.EventStore
func (m *memstore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) { func (m *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
event.SetStreamID(streamID, events...) event.SetStreamID(m.streamID, events...)
return uint64(len(events)), m.events.Modify(ctx, func(stream *event.Events) error {
last := uint64(len(*stream))
if version != AppendOnly && version != last {
return fmt.Errorf("current version wrong %d != %d", version, last)
}
return uint64(len(events)), m.state.Modify(ctx, func(state *state) error {
stream := state.streams[streamID]
last := uint64(len(stream))
for i := range events { for i := range events {
pos := last + uint64(i) + 1 pos := last + uint64(i) + 1
event.SetPosition(events[i], pos) event.SetPosition(events[i], pos)
stream = append(stream, events[i]) *stream = append(*stream, events[i])
state.streams[streamID] = stream
} }
return nil return nil
}) })
} }
// Load implements driver.EventStore
func (m *memstore) Load(ctx context.Context, agg event.Aggregate) error {
return m.state.Modify(ctx, func(state *state) error {
events := state.streams[agg.StreamID()]
event.SetStreamID(agg.StreamID(), events...)
agg.ApplyEvent(events...)
return nil
})
}
// Read implements driver.EventStore // Read implements driver.EventStore
func (m *memstore) Read(ctx context.Context, streamID string, pos int64, count int64) (event.Events, error) { func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) {
events := make([]event.Event, math.Abs(count)) var events event.Events
err := m.state.Modify(ctx, func(state *state) error {
stream := state.streams[streamID]
var first, last, start uint64
first = stream.First().EventMeta().Position
last = stream.Last().EventMeta().Position
err := es.events.Modify(ctx, func(stream *event.Events) error {
first := stream.First().EventMeta().Position
last := stream.Last().EventMeta().Position
// ---
if first == 0 || last == 0 { if first == 0 || last == 0 {
events = events[:0]
return nil return nil
} }
switch { if count == AllEvents {
case pos >= 0: count = int64(first - last)
start = first + uint64(pos)
if pos == 0 && count < 0 {
count = -count // if pos=0 assume forward count.
}
case pos < 0:
start = uint64(int64(last) + pos + 1)
if pos == -1 && count > 0 {
count = -count // if pos=-1 assume backward count.
}
} }
var start uint64
switch {
case pos >= 0 && count > 0:
start = first + uint64(pos)
case pos < 0 && count > 0:
start = uint64(int64(last) + pos + 1)
case pos >= 0 && count < 0:
start = first + uint64(pos)
if pos > 1 {
start -= 2 // if pos is positive and count negative start before
}
if pos <= 1 {
return nil // if pos is one or zero and negative count nothing to return
}
case pos < 0 && count < 0:
start = uint64(int64(last) + pos)
}
if start >= last {
return nil // if start is after last and positive count nothing to return
}
events = make([]event.Event, math.Abs(count))
for i := range events { for i := range events {
events[i] = stream[start-1] // ---
events[i] = (*stream)[start-1]
// ---
if count > 0 { if count > 0 {
start += 1 start += 1
@ -108,51 +137,19 @@ func (m *memstore) Read(ctx context.Context, streamID string, pos int64, count i
return nil, err return nil, err
} }
event.SetStreamID(es.streamID, events...)
return events, nil return events, nil
} }
// Save implements driver.EventStore // FirstIndex for the streamID
func (m *memstore) Save(ctx context.Context, agg event.Aggregate) (uint64, error) { func (m *eventLog) FirstIndex(ctx context.Context) (uint64, error) {
events := agg.Events(true) events, err := m.events.Copy(ctx)
event.SetStreamID(agg.StreamID(), events...) return events.First().EventMeta().Position, err
err := m.state.Modify(ctx, func(state *state) error {
stream := state.streams[agg.StreamID()]
last := uint64(len(stream))
if agg.StreamVersion() != last {
return fmt.Errorf("current version wrong %d != %d", agg.StreamVersion(), last)
}
for i := range events {
pos := last + uint64(i) + 1
event.SetPosition(events[i], pos)
stream = append(stream, events[i])
}
state.streams[agg.StreamID()] = stream
return nil
})
if err != nil {
return 0, err
}
agg.Commit()
return uint64(len(events)), nil
} }
func (m *memstore) FirstIndex(ctx context.Context, streamID string) (uint64, error) { // LastIndex for the streamID
stream, err := m.state.Copy(ctx) func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) {
if err != nil { events, err := m.events.Copy(ctx)
return 0, err return events.Last().EventMeta().Position, err
}
return stream.streams[streamID].First().EventMeta().Position, nil
}
func (m *memstore) LastIndex(ctx context.Context, streamID string) (uint64, error) {
stream, err := m.state.Copy(ctx)
if err != nil {
return 0, err
}
return stream.streams[streamID].Last().EventMeta().Position, nil
} }

View File

@ -0,0 +1,203 @@
package streamer
import (
"context"
"log"
"github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/es/driver"
"github.com/sour-is/ev/pkg/es/event"
"github.com/sour-is/ev/pkg/locker"
)
type state struct {
subscribers map[string][]*subscription
}
type streamer struct {
state *locker.Locked[state]
up driver.Driver
}
func New(ctx context.Context) *streamer {
return &streamer{state: locker.New(&state{subscribers: map[string][]*subscription{}})}
}
var _ es.Option = (*streamer)(nil)
func (s *streamer) Apply(e *es.EventStore) {
s.up = e.Driver
e.Driver = s
}
func (s *streamer) Unwrap() driver.Driver {
return s.up
}
var _ driver.Driver = (*streamer)(nil)
func (s *streamer) Open(ctx context.Context, dsn string) (driver.Driver, error) {
return s.up.Open(ctx, dsn)
}
func (s *streamer) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) {
l, err := s.up.EventLog(ctx, streamID)
return &wrapper{streamID, l, s}, err
}
var _ driver.EventStream = (*streamer)(nil)
func (s *streamer) Subscribe(ctx context.Context, streamID string) (driver.Subscription, error) {
log.Println("subscribe", streamID)
events, err := s.up.EventLog(ctx, streamID)
if err != nil {
return nil, err
}
sub := &subscription{topic: streamID, events: events}
sub.position = locker.New(&position{
size: es.AllEvents,
})
sub.unsub = s.delete(streamID, sub)
return sub, s.state.Modify(ctx, func(state *state) error {
state.subscribers[streamID] = append(state.subscribers[streamID], sub)
log.Println("subs=", len(state.subscribers[streamID]))
return nil
})
}
func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error {
log.Println("send", streamID, len(events))
return s.state.Modify(ctx, func(state *state) error {
for _, sub := range state.subscribers[streamID] {
return sub.position.Modify(ctx, func(position *position) error {
position.size = int64(events.Last().EventMeta().Position - uint64(position.idx))
if position.wait != nil {
close(position.wait)
position.wait = nil
}
return nil
})
}
return nil
})
}
func (s *streamer) delete(streamID string, sub *subscription) func(context.Context) error {
return func(ctx context.Context) error {
log.Println("unsub", streamID)
if err := ctx.Err(); err != nil {
return err
}
return s.state.Modify(ctx, func(state *state) error {
lis := state.subscribers[streamID]
for i := range lis {
if lis[i] == sub {
lis[i] = lis[len(lis)-1]
state.subscribers[streamID] = lis[:len(lis)-1]
log.Println("subs=", len(state.subscribers[streamID]))
return nil
}
}
return nil
})
}
}
type wrapper struct {
topic string
up driver.EventLog
streamer *streamer
}
var _ driver.EventLog = (*wrapper)(nil)
func (w *wrapper) Read(ctx context.Context, pos int64, count int64) (event.Events, error) {
return w.up.Read(ctx, pos, count)
}
func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) {
i, err := w.up.Append(ctx, events, version)
if err != nil {
return i, err
}
return i, w.streamer.Send(ctx, w.topic, events)
}
func (w *wrapper) FirstIndex(ctx context.Context) (uint64, error) {
return w.up.FirstIndex(ctx)
}
func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) {
return w.up.LastIndex(ctx)
}
type position struct {
size int64
idx int64
wait chan struct{}
}
type subscription struct {
topic string
position *locker.Locked[position]
events driver.EventLog
unsub func(context.Context) error
}
func (s *subscription) Recv(ctx context.Context) bool {
var wait func(context.Context) bool
log.Println("recv more")
err := s.position.Modify(ctx, func(position *position) error {
if position.size == es.AllEvents {
return nil
}
if position.size == 0 {
position.wait = make(chan struct{})
wait = func(ctx context.Context) bool {
log.Println("waiting", s.topic)
select {
case <-position.wait:
log.Println("got some")
return true
case <-ctx.Done():
log.Println("got cancel")
return false
}
}
}
position.idx += position.size
position.size = 0
return nil
})
if err != nil {
return false
}
if wait != nil {
return wait(ctx)
}
return true
}
func (s *subscription) Events(ctx context.Context) (event.Events, error) {
var events event.Events
log.Println("get events")
return events, s.position.Modify(ctx, func(position *position) error {
var err error
events, err = s.events.Read(ctx, int64(position.idx), position.size)
log.Printf("got events=%d %#v", len(events), position)
position.size = int64(len(events))
if len(events) > 0 {
position.idx = int64(events.First().EventMeta().Position - 1)
log.Println(position, events.First())
}
return err
})
}
func (s *subscription) Close(ctx context.Context) error {
return s.unsub(ctx)
}

View File

@ -7,6 +7,7 @@ import (
"strings" "strings"
"github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/driver"
"github.com/sour-is/ev/pkg/es/event"
"github.com/sour-is/ev/pkg/locker" "github.com/sour-is/ev/pkg/locker"
) )
@ -14,6 +15,9 @@ type config struct {
drivers map[string]driver.Driver drivers map[string]driver.Driver
} }
const AppendOnly = ^uint64(0)
const AllEvents = int64(AppendOnly >> 1)
var ( var (
drivers = locker.New(&config{drivers: make(map[string]driver.Driver)}) drivers = locker.New(&config{drivers: make(map[string]driver.Driver)})
) )
@ -28,23 +32,118 @@ func Register(ctx context.Context, name string, d driver.Driver) error {
}) })
} }
func Open(ctx context.Context, dsn string) (driver.EventStore, error) { type EventStore struct {
driver.Driver
}
func Open(ctx context.Context, dsn string, options ...Option) (*EventStore, error) {
name, _, ok := strings.Cut(dsn, ":") name, _, ok := strings.Cut(dsn, ":")
if !ok { if !ok {
return nil, fmt.Errorf("%w: no scheme", ErrNoDriver) return nil, fmt.Errorf("%w: no scheme", ErrNoDriver)
} }
var d driver.Driver var d driver.Driver
drivers.Modify(ctx,func(c *config) error { c, err := drivers.Copy(ctx)
var ok bool if err != nil {
d, ok = c.drivers[name] return nil, err
if !ok { }
return fmt.Errorf("%w: %s not registered", ErrNoDriver, name)
}
return nil
})
return d.Open(dsn) d, ok = c.drivers[name]
if !ok {
return nil, fmt.Errorf("%w: %s not registered", ErrNoDriver, name)
}
conn, err := d.Open(ctx, dsn)
es := &EventStore{Driver: conn}
for _, o := range options {
o.Apply(es)
}
return es, err
}
type Option interface {
Apply(*EventStore)
}
func (es *EventStore) Save(ctx context.Context, agg event.Aggregate) (uint64, error) {
l, err := es.EventLog(ctx, agg.StreamID())
if err != nil {
return 0, err
}
events := agg.Events(true)
count, err := l.Append(ctx, events, agg.StreamVersion())
if err != nil {
return 0, err
}
agg.Commit()
return count, err
}
func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error {
l, err := es.Driver.EventLog(ctx, agg.StreamID())
if err != nil {
return err
}
events, err := l.Read(ctx, 0, AllEvents)
if err != nil {
return err
}
event.Append(agg, events...)
return nil
}
func (es *EventStore) Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) {
l, err := es.Driver.EventLog(ctx, streamID)
if err != nil {
return nil, err
}
return l.Read(ctx, pos, count)
}
func (es *EventStore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) {
l, err := es.Driver.EventLog(ctx, streamID)
if err != nil {
return 0, err
}
return l.Append(ctx, events, AppendOnly)
}
func (es *EventStore) FirstIndex(ctx context.Context, streamID string) (uint64, error) {
l, err := es.Driver.EventLog(ctx, streamID)
if err != nil {
return 0, err
}
return l.FirstIndex(ctx)
}
func (es *EventStore) LastIndex(ctx context.Context, streamID string) (uint64, error) {
l, err := es.Driver.EventLog(ctx, streamID)
if err != nil {
return 0, err
}
return l.LastIndex(ctx)
}
func (es *EventStore) EventStream() driver.EventStream {
d := es.Driver
for d != nil {
if d, ok := d.(driver.EventStream); ok {
return d
}
d = Unwrap(d)
}
return nil
}
func Unwrap[T any](t T) T {
if unwrap, ok := any(t).(interface{Unwrap() T}); ok {
return unwrap.Unwrap()
} else {
var zero T
return zero
}
} }
var ErrNoDriver = errors.New("no driver") var ErrNoDriver = errors.New("no driver")

View File

@ -54,7 +54,7 @@ func (lis Events) SetStreamID(streamID string) {
} }
func (lis Events) First() Event { func (lis Events) First() Event {
if len(lis) == 0 { if len(lis) == 0 {
return nilEvent return NilEvent
} }
return lis[0] return lis[0]
} }
@ -66,7 +66,7 @@ func (lis Events) Rest() Events {
} }
func (lis Events) Last() Event { func (lis Events) Last() Event {
if len(lis) == 0 { if len(lis) == 0 {
return nilEvent return NilEvent
} }
return lis[len(lis)-1] return lis[len(lis)-1]
} }
@ -134,13 +134,14 @@ type Meta struct {
func (m Meta) Created() time.Time { func (m Meta) Created() time.Time {
return ulid.Time(m.EventID.Time()) return ulid.Time(m.EventID.Time())
} }
func (m Meta) ID() string { return m.EventID.String() } func (m Meta) GetEventID() string { return m.EventID.String() }
type _nilEvent struct{}
func (_nilEvent) EventMeta() Meta { type nilEvent struct{}
func (nilEvent) EventMeta() Meta {
return Meta{} return Meta{}
} }
func (_nilEvent) SetEventMeta(eventMeta Meta) {} func (nilEvent) SetEventMeta(eventMeta Meta) {}
var nilEvent _nilEvent var NilEvent nilEvent

View File

@ -1,6 +1,9 @@
package locker package locker
import "context" import (
"context"
"log"
)
type Locked[T any] struct { type Locked[T any] struct {
state chan *T state chan *T
@ -23,6 +26,9 @@ func (s *Locked[T]) Modify(ctx context.Context, fn func(*T) error) error {
select { select {
case state := <-s.state: case state := <-s.state:
defer func() { s.state <- state }() defer func() { s.state <- state }()
log.Printf("locker %T to %p", state, fn)
defer log.Printf("locker %T from %p", state, fn)
return fn(state) return fn(state)
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()

View File

@ -1,24 +1,26 @@
package service package msgbus
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"io" "io"
"log" "log"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/es/event"
) )
type service struct { type service struct {
es driver.EventStore es *es.EventStore
} }
func New(ctx context.Context, es driver.EventStore) (*service, error) { func New(ctx context.Context, es *es.EventStore) (*service, error) {
if err := event.Register(ctx, &PostEvent{}); err != nil { if err := event.Register(ctx, &PostEvent{}); err != nil {
return nil, err return nil, err
} }
@ -34,6 +36,11 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
var first event.Event = event.NilEvent
if lis, err := s.es.Read(ctx, "post-"+name, 0, 1); err == nil && len(lis) > 0 {
first = lis[0]
}
switch r.Method { switch r.Method {
case http.MethodGet: case http.MethodGet:
var pos, count int64 = -1, -99 var pos, count int64 = -1, -99
@ -54,6 +61,18 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
if strings.Contains(r.Header.Get("Accept"), "application/json") {
w.Header().Add("Content-Type", "application/json")
if err = encodeJSON(w, first, events); err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
return
}
for i := range events { for i := range events {
fmt.Fprintln(w, events[i]) fmt.Fprintln(w, events[i])
} }
@ -85,10 +104,29 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
if first == event.NilEvent {
first = events.First()
}
m := events.First().EventMeta() m := events.First().EventMeta()
w.WriteHeader(http.StatusAccepted)
log.Print("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID) log.Print("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID)
w.WriteHeader(http.StatusAccepted)
if strings.Contains(r.Header.Get("Accept"), "application/json") {
w.Header().Add("Content-Type", "application/json")
if err = encodeJSON(w, first, events); err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
return
}
w.Header().Add("Content-Type", "text/plain")
fmt.Fprintf(w, "OK %d %s", m.Position, m.EventID) fmt.Fprintf(w, "OK %d %s", m.Position, m.EventID)
return
default: default:
w.WriteHeader(http.StatusMethodNotAllowed) w.WriteHeader(http.StatusMethodNotAllowed)
} }
@ -137,3 +175,38 @@ func fields(s string) []string {
} }
return strings.Split(s, "/") return strings.Split(s, "/")
} }
func encodeJSON(w io.Writer, first event.Event, events event.Events) error {
out := make([]struct {
ID uint64 `json:"id"`
Payload []byte `json:"payload"`
Created string `json:"created"`
Tags []string `json:"tags"`
Topic struct {
Name string `json:"name"`
TTL uint64 `json:"ttl"`
Seq uint64 `json:"seq"`
Created string `json:"created"`
} `json:"topic"`
}, len(events))
for i := range events {
e, ok := events[i].(*PostEvent)
if !ok {
continue
}
out[i].ID = e.EventMeta().Position
out[i].Created = e.EventMeta().Created().Format(time.RFC3339Nano)
out[i].Payload = e.Payload
out[i].Tags = e.Tags
out[i].Topic.Name = e.EventMeta().StreamID
out[i].Topic.Created = first.EventMeta().Created().Format(time.RFC3339Nano)
out[i].Topic.Seq = e.EventMeta().Position
}
if len(out) == 1 {
return json.NewEncoder(w).Encode(out[0])
}
return json.NewEncoder(w).Encode(out)
}