feat: add create salty user

This commit is contained in:
Jon Lundy
2022-08-14 10:04:15 -06:00
parent 5ab185df21
commit ad57c89945
18 changed files with 969 additions and 54 deletions

74
pkg/domain/salty-user.go Normal file
View File

@@ -0,0 +1,74 @@
package domain
import (
"context"
"crypto/sha256"
"fmt"
"log"
"strings"
"github.com/keys-pub/keys"
"github.com/oklog/ulid/v2"
"github.com/sour-is/ev/pkg/es/event"
)
func Init(ctx context.Context) error {
return event.Register(ctx, &UserRegistered{})
}
type SaltyUser struct {
Name string
Pubkey *keys.EdX25519PublicKey
Inbox ulid.ULID
event.AggregateRoot
}
var _ event.Aggregate = (*SaltyUser)(nil)
// ApplyEvent applies the event to the aggrigate state
func (a *SaltyUser) ApplyEvent(lis ...event.Event) {
for _, e := range lis {
switch e := e.(type) {
case *UserRegistered:
a.Name = e.Name
a.Pubkey = e.Pubkey
a.Inbox = e.EventMeta().EventID
a.SetStreamID(a.streamID())
default:
log.Printf("unknown event %T", e)
}
}
}
func (a *SaltyUser) streamID() string {
return fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(a.Name))))
}
func (a *SaltyUser) OnUserRegister(name string, pubkey *keys.EdX25519PublicKey) error {
event.Raise(a, &UserRegistered{Name: name, Pubkey: pubkey})
return nil
}
type UserRegistered struct {
Name string
Pubkey *keys.EdX25519PublicKey
Endpoint ulid.ULID
eventMeta event.Meta
}
var _ event.Event = (*UserRegistered)(nil)
func (e *UserRegistered) EventMeta() event.Meta {
if e == nil {
return event.Meta{}
}
return e.eventMeta
}
func (e *UserRegistered) SetEventMeta(m event.Meta) {
if e != nil {
e.eventMeta = m
}
}

View File

@@ -3,7 +3,6 @@ package diskstore
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"strings"
@@ -94,12 +93,14 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error)
err := w.Close()
if err != nil {
log.Print(err)
span.RecordError(err)
return err
}
return nil
})
})
if err != nil {
span.RecordError(err)
return nil, err
}
logs := &openlogs{logs: c}
@@ -127,6 +128,7 @@ func (ds *diskStore) EventLog(ctx context.Context, streamID string) (driver.Even
l, err := wal.Open(filepath.Join(ds.path, streamID), wal.DefaultOptions)
if err != nil {
span.RecordError(err)
return err
}
el.events = locker.New(l)
@@ -155,6 +157,7 @@ func (es *eventLog) Append(ctx context.Context, events event.Events, version uin
last, err := l.LastIndex()
if err != nil {
span.RecordError(err)
return err
}
@@ -170,6 +173,8 @@ func (es *eventLog) Append(ctx context.Context, events event.Events, version uin
b, err = event.MarshalText(e)
if err != nil {
span.RecordError(err)
return err
}
pos := last + uint64(i) + 1
@@ -196,10 +201,12 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e
first, err := stream.FirstIndex()
if err != nil {
span.RecordError(err)
return err
}
last, err := stream.LastIndex()
if err != nil {
span.RecordError(err)
return err
}
// ---
@@ -208,7 +215,7 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e
}
start, count := math.PagerBox(first, last, pos, count)
log.Println("reading", first, last, pos, count, start)
span.AddEvent(fmt.Sprint("reading", first, last, pos, count, start))
if count == 0 {
return nil
}
@@ -221,10 +228,12 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e
var b []byte
b, err = stream.Read(start)
if err != nil {
span.RecordError(err)
return err
}
events[i], err = event.UnmarshalText(ctx, b, start)
if err != nil {
span.RecordError(err)
return err
}
// ---
@@ -242,12 +251,13 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e
return nil
})
if err != nil {
span.RecordError(err)
return nil, err
}
event.SetStreamID(es.streamID, events...)
return events, err
return events, nil
}
func (es *eventLog) FirstIndex(ctx context.Context) (uint64, error) {
_, span := logz.Span(ctx)
@@ -277,3 +287,6 @@ func (es *eventLog) LastIndex(ctx context.Context) (uint64, error) {
return idx, err
}
func (es *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) {
panic("not implemented")
}

View File

@@ -14,8 +14,10 @@ type Driver interface {
type EventLog interface {
Read(ctx context.Context, pos, count int64) (event.Events, error)
Append(ctx context.Context, events event.Events, version uint64) (uint64, error)
FirstIndex(ctx context.Context) (uint64, error)
LastIndex(ctx context.Context) (uint64, error)
FirstIndex(context.Context) (uint64, error)
LastIndex(context.Context) (uint64, error)
LoadForUpdate(context.Context, event.Aggregate, func(context.Context, event.Aggregate) error) (uint64, error)
}
type Subscription interface {

View File

@@ -79,6 +79,8 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint
_, span := logz.Span(ctx)
defer span.End()
span.AddEvent(fmt.Sprintf(" %s %#v %d", m.streamID, stream, len(*stream)))
last := uint64(len(*stream))
if version != AppendOnly && version != last {
return fmt.Errorf("current version wrong %d != %d", version, last)
@@ -97,16 +99,18 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint
}
// Read implements driver.EventStore
func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) {
func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) {
ctx, span := logz.Span(ctx)
defer span.End()
var events event.Events
err := es.events.Modify(ctx, func(stream *event.Events) error {
err := m.events.Modify(ctx, func(stream *event.Events) error {
_, span := logz.Span(ctx)
defer span.End()
span.AddEvent(fmt.Sprintf(" %s %#v %d", m.streamID, stream, len(*stream)))
first := stream.First().EventMeta().Position
last := stream.Last().EventMeta().Position
// ---
@@ -118,7 +122,7 @@ func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Eve
if count == 0 {
return nil
}
span.AddEvent(fmt.Sprint("box", first, last, pos, count))
events = make([]event.Event, math.Abs(count))
for i := range events {
span.AddEvent(fmt.Sprintf("read event %d of %d", i, math.Abs(count)))
@@ -143,7 +147,7 @@ func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Eve
return nil, err
}
event.SetStreamID(es.streamID, events...)
event.SetStreamID(m.streamID, events...)
return events, nil
}
@@ -165,3 +169,7 @@ func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) {
events, err := m.events.Copy(ctx)
return events.Last().EventMeta().Position, err
}
func (m *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) {
panic("not implemented")
}

View File

@@ -169,6 +169,10 @@ func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) {
return w.up.LastIndex(ctx)
}
func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) {
return w.up.LoadForUpdate(ctx, a, fn)
}
type position struct {
size int64
idx int64

View File

@@ -110,13 +110,17 @@ func (es *EventStore) Save(ctx context.Context, agg event.Aggregate) (uint64, er
ctx, span := logz.Span(ctx)
defer span.End()
events := agg.Events(true)
if len(events) == 0 {
return 0, nil
}
Mes_save.Add(ctx, 1)
l, err := es.EventLog(ctx, agg.StreamID())
if err != nil {
return 0, err
}
events := agg.Events(true)
count, err := l.Append(ctx, events, agg.StreamVersion())
if err != nil {
@@ -141,6 +145,7 @@ func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error {
if err != nil {
return err
}
event.Append(agg, events...)
return nil
@@ -189,7 +194,6 @@ func (es *EventStore) LastIndex(ctx context.Context, streamID string) (uint64, e
}
return l.LastIndex(ctx)
}
func (es *EventStore) EventStream() driver.EventStream {
d := es.Driver
for d != nil {
@@ -212,3 +216,65 @@ func Unwrap[T any](t T) T {
}
var ErrNoDriver = errors.New("no driver")
type PA[T any] interface {
event.Aggregate
*T
}
// Create uses fn to create a new aggregate and store in db.
func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) {
ctx, span := logz.Span(ctx)
defer span.End()
agg = new(A)
agg.SetStreamID(streamID)
if err = es.Load(ctx, agg); err != nil {
return
}
if err = event.NotExists(agg); err != nil {
return
}
if err = fn(ctx, agg); err != nil {
return
}
var i uint64
if i, err = es.Save(ctx, agg); err != nil {
return
}
span.AddEvent(fmt.Sprint("wrote events = ", i))
return
}
// Update uses fn to update an exsisting aggregate and store in db.
func Update[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) {
ctx, span := logz.Span(ctx)
defer span.End()
agg = new(A)
agg.SetStreamID(streamID)
if err = es.Load(ctx, agg); err != nil {
return
}
if err = event.ShouldExist(agg); err != nil {
return
}
if err = fn(ctx, agg); err != nil {
return
}
if _, err = es.Save(ctx, agg); err != nil {
return
}
return
}

View File

@@ -9,7 +9,6 @@ import (
type Aggregate interface {
// ApplyEvent applies the event to the aggrigate state
ApplyEvent(...Event)
StreamID() string
AggregateRootInterface
}
@@ -28,14 +27,6 @@ func Append(a Aggregate, lis ...Event) {
a.ApplyEvent(lis...)
}
// CheckVersion returns an error if the version does not match.
func CheckVersion(a Aggregate, version uint64) error {
if version != uint64(a.StreamVersion()) {
return fmt.Errorf("version wrong, got (proposed) %d != (expected) %d", version, a.StreamVersion())
}
return nil
}
// NotExists returns error if there are no events present.
func NotExists(a Aggregate) error {
if a.StreamVersion() != 0 {
@@ -44,10 +35,22 @@ func NotExists(a Aggregate) error {
return nil
}
// ShouldExists returns error if there are no events present.
func ShouldExist(a Aggregate) error {
if a.StreamVersion() == 0 {
return fmt.Errorf("%w, got version == %d", ErrShouldExist, a.StreamVersion())
}
return nil
}
type AggregateRootInterface interface {
// Events returns the aggrigate events
// Events returns the aggregate events
// pass true for only uncommitted events
Events(bool) Events
// StreamID returns aggregate stream ID
StreamID() string
// SetStreamID sets aggregate stream ID
SetStreamID(streamID string)
// StreamVersion returns last commit events
StreamVersion() uint64
// Version returns the current aggrigate version. (committed + uncommitted)
@@ -62,18 +65,17 @@ var _ AggregateRootInterface = &AggregateRoot{}
type AggregateRoot struct {
events Events
streamID string
streamVersion uint64
mu sync.RWMutex
}
func (a *AggregateRoot) Commit() {
a.streamVersion = uint64(len(a.events))
}
func (a *AggregateRoot) StreamVersion() uint64 {
return a.streamVersion
}
func (a *AggregateRoot) Commit() { a.streamVersion = uint64(len(a.events)) }
func (a *AggregateRoot) StreamID() string { return a.streamID }
func (a *AggregateRoot) SetStreamID(streamID string) { a.streamID = streamID }
func (a *AggregateRoot) StreamVersion() uint64 { return a.streamVersion }
func (a *AggregateRoot) Version() uint64 { return uint64(len(a.events)) }
func (a *AggregateRoot) Events(new bool) Events {
a.mu.RLock()
defer a.mu.RUnlock()
@@ -88,9 +90,6 @@ func (a *AggregateRoot) Events(new bool) Events {
return lis
}
func (a *AggregateRoot) Version() uint64 {
return uint64(len(a.events))
}
//lint:ignore U1000 is called by embeded interface
func (a *AggregateRoot) raise(lis ...Event) { //nolint
@@ -122,3 +121,4 @@ func (a *AggregateRoot) posStartAt(lis ...Event) {
}
var ErrShouldNotExist = errors.New("should not exist")
var ErrShouldExist = errors.New("should exist")

View File

@@ -0,0 +1,56 @@
package event_test
import (
"testing"
"github.com/sour-is/ev/pkg/es/event"
)
type Agg struct {
Value string
event.AggregateRoot
}
var _ event.Aggregate = (*Agg)(nil)
func (a *Agg) streamID() string {
return "value-" + a.Value
}
// ApplyEvent applies the event to the aggrigate state
func (a *Agg) ApplyEvent(lis ...event.Event) {
for _, e := range lis {
switch e := e.(type) {
case *ValueApplied:
a.Value = e.Value
a.SetStreamID(a.streamID())
}
}
}
type ValueApplied struct {
Value string
eventMeta event.Meta
}
var _ event.Event = (*ValueApplied)(nil)
func (e *ValueApplied) EventMeta() event.Meta {
if e == nil {
return event.Meta{}
}
return e.eventMeta
}
func (e *ValueApplied) SetEventMeta(m event.Meta) {
if e != nil {
e.eventMeta = m
}
}
func TestAggregate(t *testing.T) {
agg := &Agg{}
event.Append(agg, &ValueApplied{Value: "one"})
}

View File

@@ -1,7 +1,6 @@
package math_test
import (
"log"
"testing"
"github.com/matryer/is"
@@ -82,9 +81,9 @@ func TestPagerBox(t *testing.T) {
for _, tt := range tests {
start, count := math.PagerBox(tt.first, tt.last, tt.pos, tt.n)
if count > 0 {
log.Print(tt, "|", start, count, int64(start)+count-1)
t.Log(tt, "|", start, count, int64(start)+count-1)
} else {
log.Print(tt, "|", start, count, int64(start)+count+1)
t.Log(tt, "|", start, count, int64(start)+count+1)
}
is.Equal(start, tt.start)

View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strconv"
"strings"
@@ -85,10 +84,10 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) {
count = i
}
log.Print("GET topic=", name, " idx=", pos, " n=", count)
span.AddEvent(fmt.Sprint("GET topic=", name, " idx=", pos, " n=", count))
events, err := s.es.Read(ctx, "post-"+name, pos, count)
if err != nil {
log.Print(err)
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
@@ -97,7 +96,7 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
if err = encodeJSON(w, first, events...); err != nil {
log.Print(err)
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
@@ -159,7 +158,6 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) {
m := events.First().EventMeta()
span.AddEvent(fmt.Sprint("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID))
// log.Print("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID)
w.WriteHeader(http.StatusAccepted)
if strings.Contains(r.Header.Get("Accept"), "application/json") {
@@ -201,11 +199,11 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
pos = i - 1
}
log.Print("WS topic=", name, " idx=", pos)
span.AddEvent(fmt.Sprint("WS topic=", name, " idx=", pos))
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
span.RecordError(err)
return
}
defer c.Close()
@@ -222,54 +220,54 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
}
mt, message, err := c.ReadMessage()
if err != nil {
log.Println("read:", err)
span.RecordError(err)
return
}
log.Printf("recv: %d %s", mt, message)
span.AddEvent(fmt.Sprintf("recv: %d %s", mt, message))
}
}()
es := s.es.EventStream()
if es == nil {
log.Println("EventStore does not implement streaming")
span.AddEvent(fmt.Sprint("EventStore does not implement streaming"))
w.WriteHeader(http.StatusInternalServerError)
return
}
sub, err := es.Subscribe(ctx, "post-"+name, pos)
if err != nil {
log.Println(err)
span.RecordError(err)
return
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
log.Println("stop ws")
span.AddEvent(fmt.Sprint("stop ws"))
sub.Close(ctx)
}()
log.Println("start ws")
span.AddEvent(fmt.Sprint("start ws"))
for sub.Recv(ctx) {
events, err := sub.Events(ctx)
if err != nil {
break
}
log.Println("got events ", len(events))
span.AddEvent(fmt.Sprint("got events ", len(events)))
for i := range events {
e, ok := events[i].(*PostEvent)
if !ok {
continue
}
log.Println("send", e.String())
span.AddEvent(fmt.Sprint("send", i, e.String()))
var b bytes.Buffer
if err = encodeJSON(&b, first, e); err != nil {
log.Print(err)
span.RecordError(err)
}
err = c.WriteMessage(websocket.TextMessage, b.Bytes())
if err != nil {
log.Println("write:", err)
span.RecordError(err)
break
}
}