From 1010657a02dc318c57443b4dc8205172b241d8ed Mon Sep 17 00:00:00 2001 From: Jon Lundy Date: Thu, 4 Aug 2022 14:37:51 -0600 Subject: [PATCH] initial commit --- .gitignore | 2 + go.mod | 17 ++ go.sum | 17 ++ main.go | 185 ++++++++++++++++++++ pkg/es/driver/disk-store/disk-store.go | 232 +++++++++++++++++++++++++ pkg/es/driver/driver.go | 18 ++ pkg/es/es.go | 50 ++++++ pkg/es/es_test.go | 101 +++++++++++ pkg/es/event/aggregate.go | 124 +++++++++++++ pkg/es/event/events.go | 129 ++++++++++++++ pkg/es/event/events_test.go | 59 +++++++ pkg/es/event/reflect.go | 201 +++++++++++++++++++++ pkg/locker/locker.go | 28 +++ pkg/math/math.go | 36 ++++ 14 files changed, 1199 insertions(+) create mode 100644 .gitignore create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 pkg/es/driver/disk-store/disk-store.go create mode 100644 pkg/es/driver/driver.go create mode 100644 pkg/es/es.go create mode 100644 pkg/es/es_test.go create mode 100644 pkg/es/event/aggregate.go create mode 100644 pkg/es/event/events.go create mode 100644 pkg/es/event/events_test.go create mode 100644 pkg/es/event/reflect.go create mode 100644 pkg/locker/locker.go create mode 100644 pkg/math/math.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bf24f70 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.vscode/ +data/ diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..bda14f0 --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module github.com/sour-is/ev + +go 1.18 + +require ( + github.com/tidwall/wal v1.1.7 + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 +) + +require ( + github.com/matryer/is v1.4.0 + github.com/oklog/ulid/v2 v2.1.0 + github.com/tidwall/gjson v1.10.2 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/tinylru v1.1.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f8626f9 --- /dev/null +++ b/go.sum @@ -0,0 +1,17 @@ +github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE= +github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= +github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= +github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= +github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= +github.com/tidwall/gjson v1.10.2 h1:APbLGOM0rrEkd8WBw9C24nllro4ajFuJu0Sc9hRz8Bo= +github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/tinylru v1.1.0 h1:XY6IUfzVTU9rpwdhKUF6nQdChgCdGjkMfLzbWyiau6I= +github.com/tidwall/tinylru v1.1.0/go.mod h1:3+bX+TJ2baOLMWTnlyNWHh4QMnFyARg2TLTQ6OFbzw8= +github.com/tidwall/wal v1.1.7 h1:emc1TRjIVsdKKSnpwGBAcsAGg0767SvUk8+ygx7Bb+4= +github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..84a4a19 --- /dev/null +++ b/main.go @@ -0,0 +1,185 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "net/http" + "os" + "os/signal" + "strconv" + "strings" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/es/driver" + ds_driver "github.com/sour-is/ev/pkg/es/driver/disk-store" + "github.com/sour-is/ev/pkg/es/event" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + go func() { + <-ctx.Done() + defer cancel() + }() + + if err := run(ctx); err != nil { + log.Fatal(err) + } +} + +func run(ctx context.Context) error { + event.Register(&PostEvent{}) + ds_driver.Init(ctx) + + es, err := es.Open(ctx, "file:data") + if err != nil { + return err + } + + svc := &service{ + es: es, + } + + s := http.Server{ + Addr: ":8080", + } + + http.HandleFunc("/event/", svc.event) + + log.Print("Listen on ", s.Addr) + g, ctx := errgroup.WithContext(ctx) + + g.Go(s.ListenAndServe) + + g.Go(func() error { + <-ctx.Done() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + log.Print("shutdown http") + return s.Shutdown(ctx) + }) + + return g.Wait() +} + +type service struct { + es driver.EventStore +} + +func (s *service) event(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + var name, tags string + if strings.HasPrefix(r.URL.Path, "/event/") { + name = strings.TrimPrefix(r.URL.Path, "/event/") + name, tags, _ = strings.Cut(name, "/") + } else { + w.WriteHeader(http.StatusNotFound) + return + } + + if r.Method == http.MethodGet { + if name == "" { + w.WriteHeader(http.StatusNotFound) + return + } + + var pos, count int64 = -1, -99 + qry := r.URL.Query() + + if i, err := strconv.ParseInt(qry.Get("pos"), 10, 64); err == nil { + pos = i + } + if i, err := strconv.ParseInt(qry.Get("n"), 10, 64); err == nil { + count = i + } + + log.Print("name=", name, ", pos=", pos, ", n=", count) + events, err := s.es.Read(ctx, "post-"+name, pos, count) + if err != nil { + log.Print(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + for i := range events { + fmt.Fprintln(w, events[i]) + } + + return + } + + b, err := io.ReadAll(io.LimitReader(r.Body, 64*1024)) + if err != nil { + log.Print(err) + w.WriteHeader(http.StatusBadRequest) + return + } + r.Body.Close() + + if name == "" { + w.WriteHeader(http.StatusNotFound) + return + } + + log.Print(name, tags) + events := event.NewEvents(&PostEvent{ + Payload: b, + Tags: strings.Split(tags, "/"), + }) + _, err = s.es.Append(r.Context(), "post-"+name, events) + if err != nil { + log.Print(err) + + w.WriteHeader(http.StatusInternalServerError) + return + } + + m := events.First().EventMeta() + w.WriteHeader(http.StatusAccepted) + fmt.Fprintf(w, "OK %d %s", m.Position, m.EventID) +} + +type PostEvent struct { + Payload []byte + Tags []string + + eventMeta event.Meta +} + +func (e *PostEvent) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *PostEvent) SetEventMeta(eventMeta event.Meta) { + if e == nil { + return + } + e.eventMeta = eventMeta +} +func (e *PostEvent) String() string { + var b bytes.Buffer + + // b.WriteString(e.eventMeta.StreamID) + // b.WriteRune('@') + b.WriteString(strconv.FormatUint(e.eventMeta.Position, 10)) + b.WriteRune('\t') + + b.WriteString(e.eventMeta.EventID.String()) + b.WriteRune('\t') + b.WriteString(string(e.Payload)) + if len(e.Tags) > 0 { + b.WriteRune('\t') + b.WriteString(strings.Join(e.Tags, ",")) + } + + return b.String() +} diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go new file mode 100644 index 0000000..5f30c3d --- /dev/null +++ b/pkg/es/driver/disk-store/disk-store.go @@ -0,0 +1,232 @@ +package ds_driver + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/tidwall/wal" + + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/es/driver" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/math" +) + +type diskStore struct { + path string +} + +var _ driver.Driver = (*diskStore)(nil) + +func Init(ctx context.Context) { + es.Register(ctx, "file", &diskStore{}) +} + +func (diskStore) Open(dsn string) (driver.EventStore, error) { + scheme, path, ok := strings.Cut(dsn, ":") + if !ok { + return nil, fmt.Errorf("expected scheme") + } + + if scheme != "file" { + return nil, fmt.Errorf("expeted scheme=file, got=%s", scheme) + } + + if _, err := os.Stat(path); os.IsNotExist(err) { + err = os.MkdirAll(path, 0700) + if err != nil { + return nil, err + } + } + + return &diskStore{path: path}, nil +} + +func (es *diskStore) Save(ctx context.Context, agg event.Aggregate) (uint64, error) { + l, err := es.readLog(agg.StreamID()) + if err != nil { + return 0, err + } + + var last uint64 + + if last, err = l.w.LastIndex(); err != nil { + return 0, err + } + + if agg.StreamVersion() != last { + return 0, fmt.Errorf("current version wrong %d != %d", agg.StreamVersion(), last) + } + + events := agg.Events(true) + + var b []byte + batch := &wal.Batch{} + for _, e := range events { + b, err = event.MarshalText(e) + if err != nil { + return 0, err + } + + batch.Write(e.EventMeta().Position, b) + } + + err = l.w.WriteBatch(batch) + if err != nil { + return 0, err + } + agg.Commit() + + return uint64(len(events)), nil +} +func (es *diskStore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) { + event.SetStreamID(streamID, events...) + + l, err := es.readLog(streamID) + if err != nil { + return 0, err + } + + var last uint64 + + if last, err = l.w.LastIndex(); err != nil { + return 0, err + } + + var b []byte + + batch := &wal.Batch{} + for i, e := range events { + b, err = event.MarshalText(e) + if err != nil { + return 0, err + } + pos := last + uint64(i) + 1 + event.SetPosition(e, pos) + + batch.Write(pos, b) + } + + err = l.w.WriteBatch(batch) + if err != nil { + return 0, err + } + return uint64(len(events)), nil +} +func (es *diskStore) Load(ctx context.Context, agg event.Aggregate) error { + l, err := es.readLog(agg.StreamID()) + if err != nil { + return err + } + + var i, first, last uint64 + + if first, err = l.w.FirstIndex(); err != nil { + return err + } + if last, err = l.w.LastIndex(); err != nil { + return err + } + + if first == 0 || last == 0 { + return nil + } + + var b []byte + events := make([]event.Event, last-i) + + for i = 0; first+i <= last; i++ { + b, err = l.w.Read(first + i) + if err != nil { + return err + } + events[i], err = event.UnmarshalText(b, first+i) + if err != nil { + return err + } + } + + event.Append(agg, events...) + + return nil +} +func (es *diskStore) Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) { + l, err := es.readLog(streamID) + if err != nil { + return nil, err + } + + var first, last, start uint64 + + if first, err = l.w.FirstIndex(); err != nil { + return nil, err + } + if last, err = l.w.LastIndex(); err != nil { + return nil, err + } + + if first == 0 || last == 0 { + return nil, nil + } + + switch { + case pos >= 0: + start = first + uint64(pos) + if pos == 0 && count < 0 { + count = -count // if pos=0 assume forward count. + } + case pos < 0: + start = uint64(int64(last) + pos + 1) + if pos == -1 && count > 0 { + count = -count // if pos=-1 assume backward count. + } + } + + events := make([]event.Event, math.Abs(count)) + for i := range events { + var b []byte + + b, err = l.w.Read(start) + if err != nil { + return events, err + } + events[i], err = event.UnmarshalText(b, start) + if err != nil { + return events, err + } + + if count > 0 { + start += 1 + } else { + start -= 1 + } + if start < first || start > last { + events = events[:i+1] + break + } + } + + event.SetStreamID(streamID, events...) + + return events, nil +} + +func (es *diskStore) readLog(name string) (*eventLog, error) { + return newEventLog(name, filepath.Join(es.path, name)) +} + +type eventLog struct { + name string + path string + w *wal.Log +} + +func newEventLog(name, path string) (*eventLog, error) { + var err error + el := &eventLog{name: name, path: path} + el.w, err = wal.Open(path, wal.DefaultOptions) + return el, err +} diff --git a/pkg/es/driver/driver.go b/pkg/es/driver/driver.go new file mode 100644 index 0000000..796c8a3 --- /dev/null +++ b/pkg/es/driver/driver.go @@ -0,0 +1,18 @@ +package driver + +import ( + "context" + + "github.com/sour-is/ev/pkg/es/event" +) + +type Driver interface { + Open(string) (EventStore, error) +} + +type EventStore interface { + Save(ctx context.Context, agg event.Aggregate) (uint64, error) + Load(ctx context.Context, agg event.Aggregate) error + Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) + Append(ctx context.Context, streamID string, events event.Events) (uint64, error) +} diff --git a/pkg/es/es.go b/pkg/es/es.go new file mode 100644 index 0000000..aa8267f --- /dev/null +++ b/pkg/es/es.go @@ -0,0 +1,50 @@ +package es + +import ( + "context" + "errors" + "fmt" + "strings" + + "github.com/sour-is/ev/pkg/es/driver" + "github.com/sour-is/ev/pkg/locker" +) + +type config struct { + drivers map[string]driver.Driver +} + +var ( + Config = locker.New(&config{drivers: make(map[string]driver.Driver)}) +) + +func Register(ctx context.Context, name string, d driver.Driver) { + Config.Modify(ctx, func(c *config) error { + if _, set := c.drivers[name]; set { + return fmt.Errorf("driver %s already set", name) + } + c.drivers[name] = d + return nil + }) +} + +func Open(ctx context.Context, dsn string) (driver.EventStore, error) { + name, _, ok := strings.Cut(dsn, ":") + if !ok { + return nil, fmt.Errorf("%w: no scheme", ErrNoDriver) + } + + var d driver.Driver + Config.Modify(ctx,func(c *config) error { + var ok bool + d, ok = c.drivers[name] + if !ok { + return fmt.Errorf("%w: %s not registered", ErrNoDriver, name) + } + return nil + }) + + return d.Open(dsn) +} + +var ErrNoDriver = errors.New("no driver") diff --git a/pkg/es/es_test.go b/pkg/es/es_test.go new file mode 100644 index 0000000..6788aaf --- /dev/null +++ b/pkg/es/es_test.go @@ -0,0 +1,101 @@ +package es_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/sour-is/ev/pkg/es" + ds_driver "github.com/sour-is/ev/pkg/es/driver/disk-store" + "github.com/sour-is/ev/pkg/es/event" +) + +func TestES(t *testing.T) { + ctx := context.Background() + + event.Register(&ValueSet{}) + + ds_driver.Init(ctx) + + es, err := es.Open(ctx, "file:data") + if err != nil { + t.Fatal(err) + } + + thing := &Thing{Name: "time"} + err = es.Load(ctx, thing) + if err != nil { + t.Fatal(err) + } + t.Log(thing.StreamVersion(), thing.Name, thing.Value) + + err = thing.OnSetValue(time.Now().String()) + if err != nil { + t.Fatal(err) + } + + i, err := es.Save(ctx, thing) + if err != nil { + t.Fatal(err) + } + + t.Log(thing.StreamVersion(), thing.Name, thing.Value) + t.Log("Wrote: ", i) + + events, err := es.Read(ctx, "thing-time", -1, -11) + if err != nil { + t.Fatal(err) + } + + for i, e := range events { + t.Logf("event %d %d - %v\n", i, e.EventMeta().Position, e) + } +} + +type Thing struct { + Name string + Value string + + event.AggregateRoot +} + +func (a *Thing) StreamID() string { + return fmt.Sprintf("thing-%s", a.Name) +} +func (a *Thing) ApplyEvent(lis ...event.Event) { + for _, e := range lis { + switch e := e.(type) { + case *ValueSet: + a.Value = e.Value + } + } +} +func (a *Thing) OnSetValue(value string) error { + event.Raise(a, &ValueSet{Value: value}) + return nil +} + +type ValueSet struct { + Value string + + eventMeta event.Meta +} + +func (e *ValueSet) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *ValueSet) SetEventMeta(eventMeta event.Meta) { + if e == nil { + return + } + e.eventMeta = eventMeta +} + +var ( + _ event.Event = (*ValueSet)(nil) + _ event.Aggregate = (*Thing)(nil) +) diff --git a/pkg/es/event/aggregate.go b/pkg/es/event/aggregate.go new file mode 100644 index 0000000..51b3f07 --- /dev/null +++ b/pkg/es/event/aggregate.go @@ -0,0 +1,124 @@ +package event + +import ( + "errors" + "fmt" + "sync" +) + +type Aggregate interface { + // ApplyEvent applies the event to the aggrigate state + ApplyEvent(...Event) + StreamID() string + + AggregateRootInterface +} + +// Raise adds new uncommitted events +func Raise(a Aggregate, lis ...Event) { + lis = NewEvents(lis...) + SetStreamID(a.StreamID(), lis...) + a.raise(lis...) + a.ApplyEvent(lis...) +} + +// Append adds new committed events +func Append(a Aggregate, lis ...Event) { + a.append(lis...) + a.ApplyEvent(lis...) +} + +// CheckVersion returns an error if the version does not match. +func CheckVersion(a Aggregate, version uint64) error { + if version != uint64(a.StreamVersion()) { + return fmt.Errorf("version wrong, got (proposed) %d != (expected) %d", version, a.StreamVersion()) + } + return nil +} + +// NotExists returns error if there are no events present. +func NotExists(a Aggregate) error { + if a.StreamVersion() != 0 { + return fmt.Errorf("%w, got version == %d", ErrShouldNotExist, a.StreamVersion()) + } + return nil +} + +type AggregateRootInterface interface { + // Events returns the aggrigate events + // pass true for only uncommitted events + Events(bool) Events + // StreamVersion returns last commit events + StreamVersion() uint64 + // Version returns the current aggrigate version. (committed + uncommitted) + Version() uint64 + + raise(lis ...Event) + append(lis ...Event) + Commit() +} + +var _ AggregateRootInterface = &AggregateRoot{} + +type AggregateRoot struct { + events Events + streamVersion uint64 + + mu sync.RWMutex +} + +func (a *AggregateRoot) Commit() { + a.streamVersion = uint64(len(a.events)) +} + +func (a *AggregateRoot) StreamVersion() uint64 { + return a.streamVersion +} +func (a *AggregateRoot) Events(new bool) Events { + a.mu.RLock() + defer a.mu.RUnlock() + + events := a.events + if new { + events = events[a.streamVersion:] + } + + lis := make(Events, len(events)) + copy(lis, events) + + return lis +} +func (a *AggregateRoot) Version() uint64 { + return uint64(len(a.events)) +} + +//lint:ignore U1000 is called by embeded interface +func (a *AggregateRoot) raise(lis ...Event) { //nolint + a.mu.Lock() + defer a.mu.Unlock() + + a.posStartAt(lis...) + + a.events = append(a.events, lis...) +} + +//lint:ignore U1000 is called by embeded interface +func (a *AggregateRoot) append(lis ...Event) { + a.mu.Lock() + defer a.mu.Unlock() + + a.posStartAt(lis...) + + a.events = append(a.events, lis...) + a.streamVersion += uint64(len(lis)) +} + +func (a *AggregateRoot) posStartAt(lis ...Event) { + for i, e := range lis { + m := e.EventMeta() + m.Position = a.streamVersion + uint64(i) + 1 + e.SetEventMeta(m) + } +} + +var ErrShouldNotExist = errors.New("should not exist") diff --git a/pkg/es/event/events.go b/pkg/es/event/events.go new file mode 100644 index 0000000..44bb48f --- /dev/null +++ b/pkg/es/event/events.go @@ -0,0 +1,129 @@ +package event + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "strings" + "sync" + "time" + + ulid "github.com/oklog/ulid/v2" +) + +var pool = sync.Pool{ + New: func() interface{} { return ulid.Monotonic(rand.Reader, 0) }, +} + +func getULID() ulid.ULID { + var entropy io.Reader = rand.Reader + if e, ok := pool.Get().(io.Reader); ok { + entropy = e + defer pool.Put(e) + } + return ulid.MustNew(ulid.Now(), entropy) +} + +type Event interface { + EventMeta() Meta + SetEventMeta(Meta) +} + +// Events is a list of events +type Events []Event + +func NewEvents(lis ...Event) Events { + for i, e := range lis { + meta := e.EventMeta() + meta.Position = uint64(i) + meta.EventID = getULID() + e.SetEventMeta(meta) + } + return lis +} + +func (lis Events) StreamID() string { + if len(lis) == 0 { + return "" + } + return lis.First().EventMeta().StreamID +} +func (lis Events) SetStreamID(streamID string) { + SetStreamID(streamID, lis...) +} +func (lis Events) First() Event { + if len(lis) == 0 { + return nil + } + return lis[0] +} +func (lis Events) Rest() Events { + if len(lis) == 0 { + return nil + } + return lis[1:] +} +func (lis Events) MarshalText() ([]byte, error) { + b := &bytes.Buffer{} + for i := range lis { + txt, err := MarshalText(lis[i]) + if err != nil { + return nil, err + } + b.Write(txt) + } + return b.Bytes(), nil +} + +func TypeOf(e Event) string { + if ie, ok := e.(interface{ UnwrapEvent() Event }); ok { + e = ie.UnwrapEvent() + } + if e, ok := e.(interface{ EventType() string }); ok { + return e.EventType() + } + + // Default to printed representation for unnamed types + return strings.Trim(fmt.Sprintf("%T", e), "*") +} + +type streamID string + +func (s streamID) StreamID() string { + return string(s) +} + +func StreamID(e Event) streamID { + return streamID(e.EventMeta().StreamID) +} +func SetStreamID(id string, lis ...Event) { + for _, e := range lis { + meta := e.EventMeta() + meta.StreamID = id + e.SetEventMeta(meta) + } +} + +func EventID(e Event) ulid.ULID { + return e.EventMeta().EventID +} +func SetEventID(e Event, id ulid.ULID) { + meta := e.EventMeta() + meta.EventID = id + e.SetEventMeta(meta) +} +func SetPosition(e Event, i uint64) { + meta := e.EventMeta() + meta.Position = i + e.SetEventMeta(meta) +} +type Meta struct { + EventID ulid.ULID + StreamID string + Position uint64 +} + +func (m Meta) Time() time.Time { + return ulid.Time(m.EventID.Time()) +} diff --git a/pkg/es/event/events_test.go b/pkg/es/event/events_test.go new file mode 100644 index 0000000..0935ab4 --- /dev/null +++ b/pkg/es/event/events_test.go @@ -0,0 +1,59 @@ +package event_test + +import ( + "bytes" + "testing" + + "github.com/matryer/is" + + "github.com/sour-is/ev/pkg/es/event" +) + +type DummyEvent struct { + Value string + + eventMeta event.Meta +} + +func (e *DummyEvent) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *DummyEvent) SetEventMeta(eventMeta event.Meta) { + if e == nil { + return + } + e.eventMeta = eventMeta +} + +func TestEventEncode(t *testing.T) { + is := is.New(t) + + event.Register(&DummyEvent{}) + + var lis event.Events = event.NewEvents( + &DummyEvent{Value: "testA"}, + &DummyEvent{Value: "testB"}, + &DummyEvent{Value: "testC"}, + ) + lis.SetStreamID("test") + + blis, err := event.EncodeEvents(lis...) + is.NoErr(err) + + for _, b := range blis { + sp := bytes.SplitN(b, []byte{'\t'}, 4) + is.Equal(len(sp), 4) + is.Equal(string(sp[1]), "test") + is.Equal(string(sp[2]), "event_test.DummyEvent") + } + + chk, err := event.DecodeEvents(blis...) + is.NoErr(err) + + for i := range chk { + is.Equal(lis[i], chk[i]) + } +} diff --git a/pkg/es/event/reflect.go b/pkg/es/event/reflect.go new file mode 100644 index 0000000..1bb33d7 --- /dev/null +++ b/pkg/es/event/reflect.go @@ -0,0 +1,201 @@ +package event + +import ( + "bytes" + "encoding" + "encoding/json" + "fmt" + "io" + "net/url" + "reflect" + "strings" + "sync" +) + +var ( + eventTypes sync.Map +) + +type UnknownEvent struct { + eventType string + values map[string]json.RawMessage + + eventMeta Meta +} + +var _ Event = (*UnknownEvent)(nil) +var _ json.Marshaler = (*UnknownEvent)(nil) +var _ json.Unmarshaler = (*UnknownEvent)(nil) + +func NewUnknownEventFromValues(eventType string, meta Meta, values url.Values) *UnknownEvent { + jsonValues := make(map[string]json.RawMessage, len(values)) + for k, v := range values { + switch len(v) { + case 0: + jsonValues[k] = []byte("null") + case 1: + jsonValues[k] = embedJSON(v[0]) + default: + parts := make([][]byte, len(v)) + for i := range v { + parts[i] = embedJSON(v[i]) + } + jsonValues[k] = append([]byte("["), bytes.Join(parts, []byte(","))...) + jsonValues[k] = append(jsonValues[k], ']') + } + } + + return &UnknownEvent{eventType: eventType, eventMeta: meta, values: jsonValues} +} +func NewUnknownEventFromRaw(eventType string, meta Meta, values map[string]json.RawMessage) *UnknownEvent { + return &UnknownEvent{eventType: eventType, eventMeta: meta, values: values} +} +func (u UnknownEvent) EventMeta() Meta { return u.eventMeta } +func (u UnknownEvent) EventType() string { return u.eventType } +func (u *UnknownEvent) SetEventMeta(em Meta) { + u.eventMeta = em +} +func (u *UnknownEvent) UnmarshalJSON(b []byte) error { + return json.Unmarshal(b, &u.values) +} +func (u *UnknownEvent) MarshalJSON() ([]byte, error) { + return json.Marshal(u.values) +} + +// Register a type container for Unmarshalling values into. The type must implement Event and not be a nil value. +func Register(lis ...Event) { + for _, e := range lis { + if e == nil { + panic(fmt.Sprintf("can't register event.Event of type=%T with value=%v", e, e)) + } + + value := reflect.ValueOf(e) + + if value.IsNil() { + panic(fmt.Sprintf("can't register event.Event of type=%T with value=%v", e, e)) + } + + value = reflect.Indirect(value) + typ := value.Type() + + eventTypes.LoadOrStore(TypeOf(e), typ) + } +} +func GetContainer(s string) Event { + if typ, ok := eventTypes.Load(s); ok { + if typ, ok := typ.(reflect.Type); ok { + newType := reflect.New(typ) + newInterface := newType.Interface() + if typ, ok := newInterface.(Event); ok { + return typ + } + } + } + return &UnknownEvent{eventType: s} +} + +func MarshalText(e Event) (txt []byte, err error) { + b := &bytes.Buffer{} + + if _, err = writeMarshaler(b, e.EventMeta().EventID); err != nil { + return nil, err + } + b.WriteRune('\t') + if _, err = b.WriteString(e.EventMeta().StreamID); err != nil { + return nil, err + } + b.WriteRune('\t') + if _, err = b.WriteString(TypeOf(e)); err != nil { + return nil, err + } + b.WriteRune('\t') + if enc, ok := e.(encoding.TextMarshaler); ok { + if txt, err = enc.MarshalText(); err != nil { + return nil, err + } + } else { + if txt, err = json.Marshal(e); err != nil { + return nil, err + } + } + _, err = b.Write(txt) + + return b.Bytes(), err +} + +func UnmarshalText(txt []byte, pos uint64) (e Event, err error) { + sp := bytes.SplitN(txt, []byte{'\t'}, 4) + if len(sp) != 4 { + return nil, fmt.Errorf("invalid format. expected=4, got=%d", len(sp)) + } + + m := Meta{} + if err = m.EventID.UnmarshalText(sp[0]); err != nil { + return nil, err + } + + m.StreamID = string(sp[1]) + m.Position = pos + + eventType := string(sp[2]) + e = GetContainer(eventType) + + if enc, ok := e.(encoding.TextUnmarshaler); ok { + if err = enc.UnmarshalText(sp[3]); err != nil { + return nil, err + } + } else { + if err = json.Unmarshal(sp[3], e); err != nil { + return nil, err + } + } + e.SetEventMeta(m) + + return e, nil +} + +func writeMarshaler(out io.Writer, in encoding.TextMarshaler) (int, error) { + if b, err := in.MarshalText(); err != nil { + return 0, err + } else { + return out.Write(b) + } +} + +// DecodeEvents unmarshals the byte list into Events. +func DecodeEvents(lis ...[]byte) (Events, error) { + elis := make([]Event, len(lis)) + + var err error + for i, txt := range lis { + elis[i], err = UnmarshalText(txt, uint64(i)) + if err != nil { + return nil, err + } + } + + return elis, nil +} + +func EncodeEvents(events ...Event) (lis [][]byte, err error) { + lis = make([][]byte, len(events)) + + for i, txt := range events { + lis[i], err = MarshalText(txt) + if err != nil { + return nil, err + } + } + + return lis, nil +} + +func embedJSON(s string) json.RawMessage { + if len(s) > 1 && s[0] == '{' && s[len(s)-1] == '}' { + return []byte(s) + } + if len(s) > 1 && s[0] == '[' && s[len(s)-1] == ']' { + return []byte(s) + } + return []byte(fmt.Sprintf(`"%s"`, strings.Replace(s, `"`, `\"`, -1))) +} diff --git a/pkg/locker/locker.go b/pkg/locker/locker.go new file mode 100644 index 0000000..e467b67 --- /dev/null +++ b/pkg/locker/locker.go @@ -0,0 +1,28 @@ +package locker + +import "context" + +type Locked[T any] struct { + state chan *T +} + +func New[T any](initial *T) *Locked[T] { + s := &Locked[T]{} + s.state = make(chan *T, 1) + s.state <- initial + return s +} + +func (s *Locked[T]) Modify(ctx context.Context, fn func(*T) error) error { + if ctx.Err() != nil { + return ctx.Err() + } + + select { + case state := <-s.state: + defer func() { s.state <- state }() + return fn(state) + case <-ctx.Done(): + return ctx.Err() + } +} \ No newline at end of file diff --git a/pkg/math/math.go b/pkg/math/math.go new file mode 100644 index 0000000..3566347 --- /dev/null +++ b/pkg/math/math.go @@ -0,0 +1,36 @@ +package math + +type signed interface { + ~int | ~int8 | ~int16 | ~int32 | ~int64 +} +type unsigned interface { + ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr +} +type integer interface { + signed | unsigned +} +type float interface { + ~float32 | ~float64 +} +type ordered interface { + integer | float | ~string +} + +func Abs[T signed](i T) T { + if i > 0 { + return i + } + return -i +} +func Max[T ordered](i, j T) T { + if i > j { + return i + } + return j +} +func Min[T ordered](i, j T) T { + if i < j { + return i + } + return j +}