tests: add testing around set and es aggregate
This commit is contained in:
parent
063dca997f
commit
ba3c302dc4
4
Makefile
4
Makefile
|
@ -33,8 +33,10 @@ ifeq (, $(shell which gqlgen))
|
|||
endif
|
||||
gqlgen
|
||||
|
||||
|
||||
EV_HOST?=localhost:8080
|
||||
load:
|
||||
watch -n .1 "http POST localhost:8080/inbox/asdf/test a=b one=1 two:='{\"v\":2}' | jq"
|
||||
watch -n .1 "http POST $(EV_HOST)/inbox/asdf/test a=b one=1 two:='{\"v\":2}' | jq"
|
||||
|
||||
bi:
|
||||
go build .
|
||||
|
|
BIN
local.mk.secret
BIN
local.mk.secret
Binary file not shown.
|
@ -89,9 +89,21 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint
|
|||
for i := range events {
|
||||
span.AddEvent(fmt.Sprintf("read event %d of %d", i, len(events)))
|
||||
|
||||
// --- clone event
|
||||
e := events[i]
|
||||
b, err := event.MarshalBinary(e)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e, err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// ---
|
||||
|
||||
pos := last + uint64(i) + 1
|
||||
event.SetPosition(events[i], pos)
|
||||
*stream = append(*stream, events[i])
|
||||
event.SetPosition(e, pos)
|
||||
*stream = append(*stream, e)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -126,8 +138,17 @@ func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Even
|
|||
events = make([]event.Event, math.Abs(count))
|
||||
for i := range events {
|
||||
span.AddEvent(fmt.Sprintf("read event %d of %d", i, math.Abs(count)))
|
||||
// ---
|
||||
events[i] = (*stream)[start-1]
|
||||
|
||||
// --- clone event
|
||||
e := (*stream)[start-1]
|
||||
b, err := event.MarshalBinary(e)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events[i], err = event.UnmarshalBinary(ctx, b, e.EventMeta().Position)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// ---
|
||||
|
||||
if count > 0 {
|
||||
|
@ -140,6 +161,7 @@ func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Even
|
|||
break
|
||||
}
|
||||
}
|
||||
event.SetStreamID(m.streamID, events...)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
@ -147,8 +169,6 @@ func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Even
|
|||
return nil, err
|
||||
}
|
||||
|
||||
event.SetStreamID(m.streamID, events...)
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -71,8 +71,9 @@ func (w *wrapper) Append(ctx context.Context, events event.Events, version uint6
|
|||
for i := range events {
|
||||
e := events[i]
|
||||
eventType := event.TypeOf(e)
|
||||
streamID := e.EventMeta().StreamID
|
||||
streamPos := e.EventMeta().Position
|
||||
m := e.EventMeta()
|
||||
streamID := m.StreamID
|
||||
streamPos := m.Position
|
||||
|
||||
e1 := event.NewPtr(streamID, streamPos)
|
||||
event.SetStreamID("$all", e1)
|
||||
|
|
|
@ -76,8 +76,6 @@ func Open(ctx context.Context, dsn string, options ...Option) (*EventStore, erro
|
|||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
Mes_open.Add(ctx, 1)
|
||||
|
||||
name, _, ok := strings.Cut(dsn, ":")
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: no scheme", ErrNoDriver)
|
||||
|
@ -101,6 +99,8 @@ func Open(ctx context.Context, dsn string, options ...Option) (*EventStore, erro
|
|||
o.Apply(es)
|
||||
}
|
||||
|
||||
Mes_open.Add(ctx, 1)
|
||||
|
||||
return es, err
|
||||
}
|
||||
|
||||
|
@ -197,6 +197,9 @@ func (es *EventStore) LastIndex(ctx context.Context, streamID string) (uint64, e
|
|||
return l.LastIndex(ctx)
|
||||
}
|
||||
func (es *EventStore) EventStream() driver.EventStream {
|
||||
if es == nil {
|
||||
return nil
|
||||
}
|
||||
d := es.Driver
|
||||
for d != nil {
|
||||
if d, ok := d.(driver.EventStream); ok {
|
||||
|
|
|
@ -12,63 +12,15 @@ import (
|
|||
|
||||
"github.com/sour-is/ev/pkg/es"
|
||||
memstore "github.com/sour-is/ev/pkg/es/driver/mem-store"
|
||||
"github.com/sour-is/ev/pkg/es/driver/projecter"
|
||||
"github.com/sour-is/ev/pkg/es/driver/streamer"
|
||||
"github.com/sour-is/ev/pkg/es/event"
|
||||
)
|
||||
|
||||
func TestES(t *testing.T) {
|
||||
is := is.New(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
err := event.Register(ctx, &ValueSet{})
|
||||
is.NoErr(err)
|
||||
|
||||
memstore.Init(ctx)
|
||||
|
||||
_, err = es.Open(ctx, "mem")
|
||||
is.True(errors.Is(err, es.ErrNoDriver))
|
||||
|
||||
_, err = es.Open(ctx, "bogo:")
|
||||
is.True(errors.Is(err, es.ErrNoDriver))
|
||||
|
||||
es, err := es.Open(ctx, "mem:")
|
||||
is.NoErr(err)
|
||||
|
||||
thing := &Thing{Name: "time"}
|
||||
err = es.Load(ctx, thing)
|
||||
is.NoErr(err)
|
||||
|
||||
t.Log(thing.StreamVersion(), thing.Name, thing.Value)
|
||||
|
||||
err = thing.OnSetValue(time.Now().String())
|
||||
is.NoErr(err)
|
||||
|
||||
i, err := es.Save(ctx, thing)
|
||||
is.NoErr(err)
|
||||
|
||||
t.Log(thing.StreamVersion(), thing.Name, thing.Value)
|
||||
t.Log("Wrote: ", i)
|
||||
|
||||
i, err = es.Append(ctx, "thing-time", event.NewEvents(&ValueSet{Value: "xxx"}))
|
||||
is.NoErr(err)
|
||||
is.Equal(i, uint64(1))
|
||||
|
||||
events, err := es.Read(ctx, "thing-time", -1, -11)
|
||||
is.NoErr(err)
|
||||
|
||||
for i, e := range events {
|
||||
t.Logf("event %d %d - %v\n", i, e.EventMeta().Position, e)
|
||||
}
|
||||
|
||||
first, err := es.FirstIndex(ctx, "thing-time")
|
||||
is.NoErr(err)
|
||||
is.Equal(first, uint64(1))
|
||||
|
||||
last, err := es.LastIndex(ctx, "thing-time")
|
||||
is.NoErr(err)
|
||||
is.Equal(last, uint64(2))
|
||||
|
||||
}
|
||||
var (
|
||||
_ event.Event = (*ValueSet)(nil)
|
||||
_ event.Aggregate = (*Thing)(nil)
|
||||
)
|
||||
|
||||
type Thing struct {
|
||||
Name string
|
||||
|
@ -77,9 +29,9 @@ type Thing struct {
|
|||
event.AggregateRoot
|
||||
}
|
||||
|
||||
func (a *Thing) StreamID() string {
|
||||
return fmt.Sprintf("thing-%s", a.Name)
|
||||
}
|
||||
// func (a *Thing) StreamID() string {
|
||||
// return fmt.Sprintf("thing-%s", a.Name)
|
||||
// }
|
||||
func (a *Thing) ApplyEvent(lis ...event.Event) {
|
||||
for _, e := range lis {
|
||||
switch e := e.(type) {
|
||||
|
@ -118,7 +70,120 @@ func (e *ValueSet) UnmarshalBinary(b []byte) error {
|
|||
return json.Unmarshal(b, e)
|
||||
}
|
||||
|
||||
var (
|
||||
_ event.Event = (*ValueSet)(nil)
|
||||
_ event.Aggregate = (*Thing)(nil)
|
||||
)
|
||||
func TestES(t *testing.T) {
|
||||
is := is.New(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
err := event.Register(ctx, &ValueSet{})
|
||||
is.NoErr(err)
|
||||
|
||||
es.Init(ctx)
|
||||
memstore.Init(ctx)
|
||||
|
||||
{
|
||||
store, err := es.Open(ctx, "mem")
|
||||
is.True(errors.Is(err, es.ErrNoDriver))
|
||||
is.True(store.EventStream() == nil)
|
||||
}
|
||||
|
||||
{
|
||||
_, err := es.Open(ctx, "bogo:")
|
||||
is.True(errors.Is(err, es.ErrNoDriver))
|
||||
}
|
||||
|
||||
store, err := es.Open(ctx, "mem:", streamer.New(ctx), projecter.New(ctx))
|
||||
is.NoErr(err)
|
||||
|
||||
thing := &Thing{Name: "time"}
|
||||
err = store.Load(ctx, thing)
|
||||
is.NoErr(err)
|
||||
|
||||
t.Log(thing.StreamVersion(), thing.Name, thing.Value)
|
||||
|
||||
err = thing.OnSetValue(time.Now().String())
|
||||
is.NoErr(err)
|
||||
|
||||
thing.SetStreamID("thing-time")
|
||||
i, err := store.Save(ctx, thing)
|
||||
is.NoErr(err)
|
||||
|
||||
t.Log(thing.StreamVersion(), thing.Name, thing.Value)
|
||||
t.Log("Wrote: ", i)
|
||||
|
||||
i, err = store.Append(ctx, "thing-time", event.NewEvents(&ValueSet{Value: "xxx"}))
|
||||
is.NoErr(err)
|
||||
is.Equal(i, uint64(1))
|
||||
|
||||
events, err := store.Read(ctx, "thing-time", -1, -11)
|
||||
is.NoErr(err)
|
||||
|
||||
for i, e := range events {
|
||||
t.Logf("event %d %d - %v\n", i, e.EventMeta().Position, e)
|
||||
}
|
||||
|
||||
first, err := store.FirstIndex(ctx, "thing-time")
|
||||
is.NoErr(err)
|
||||
is.Equal(first, uint64(1))
|
||||
|
||||
last, err := store.LastIndex(ctx, "thing-time")
|
||||
is.NoErr(err)
|
||||
is.Equal(last, uint64(2))
|
||||
|
||||
stream := store.EventStream()
|
||||
is.True(stream != nil)
|
||||
}
|
||||
|
||||
func TestESOperations(t *testing.T) {
|
||||
is := is.New(t)
|
||||
ctx := context.Background()
|
||||
|
||||
es.Init(ctx)
|
||||
memstore.Init(ctx)
|
||||
|
||||
store, err := es.Open(ctx, "mem:", streamer.New(ctx), projecter.New(ctx))
|
||||
is.NoErr(err)
|
||||
|
||||
thing, err := es.Create(ctx, store, "thing-1", func(ctx context.Context, agg *Thing) error {
|
||||
return agg.OnSetValue("foo")
|
||||
})
|
||||
|
||||
is.NoErr(err)
|
||||
is.Equal(thing.Version(), uint64(1))
|
||||
is.Equal(thing.Value, "foo")
|
||||
|
||||
thing, err = es.Update(ctx, store, "thing-1", func(ctx context.Context, agg *Thing) error {
|
||||
return agg.OnSetValue("bar")
|
||||
})
|
||||
|
||||
is.NoErr(err)
|
||||
is.Equal(thing.Version(), uint64(2))
|
||||
is.Equal(thing.Value, "bar")
|
||||
|
||||
thing, err = es.Upsert(ctx, store, "thing-2", func(ctx context.Context, agg *Thing) error {
|
||||
return agg.OnSetValue("bin")
|
||||
})
|
||||
|
||||
is.NoErr(err)
|
||||
is.Equal(thing.Version(), uint64(1))
|
||||
is.Equal(thing.Value, "bin")
|
||||
|
||||
thing, err = es.Upsert(ctx, store, "thing-2", func(ctx context.Context, agg *Thing) error {
|
||||
return agg.OnSetValue("baz")
|
||||
})
|
||||
|
||||
is.NoErr(err)
|
||||
is.Equal(thing.Version(), uint64(2))
|
||||
is.Equal(thing.Value, "baz")
|
||||
|
||||
}
|
||||
|
||||
func TestUnwrap(t *testing.T) {
|
||||
is := is.New(t)
|
||||
|
||||
err := errors.New("foo")
|
||||
werr := fmt.Errorf("wrap: %w", err)
|
||||
|
||||
is.Equal(es.Unwrap(werr), err)
|
||||
is.Equal(es.Unwrap("test"), "")
|
||||
}
|
||||
|
|
25
pkg/set/set_test.go
Normal file
25
pkg/set/set_test.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
package set_test
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/matryer/is"
|
||||
"github.com/sour-is/ev/pkg/set"
|
||||
)
|
||||
|
||||
func TestStringSet(t *testing.T) {
|
||||
is := is.New(t)
|
||||
|
||||
s := set.New(strings.Fields("one two three")...)
|
||||
|
||||
is.True(s.Has("one"))
|
||||
is.True(s.Has("two"))
|
||||
is.True(s.Has("three"))
|
||||
is.True(!s.Has("four"))
|
||||
|
||||
is.Equal(set.New("one").String(), "set(one)")
|
||||
|
||||
var n set.Set[string]
|
||||
is.Equal(n.String(), "set(<nil>)")
|
||||
}
|
Loading…
Reference in New Issue
Block a user