From 5458d35be0e8e71d894e39831ae94410129a12ea Mon Sep 17 00:00:00 2001 From: Jon Lundy Date: Mon, 15 Aug 2022 08:05:04 -0600 Subject: [PATCH] fix: event marshalling --- pkg/domain/salty-user.go | 6 ++--- pkg/es/driver/disk-store/disk-store.go | 4 ++-- pkg/es/es_test.go | 4 ++-- pkg/es/event/aggregate_test.go | 4 ++-- pkg/es/event/events.go | 20 ++++------------- pkg/es/event/events_test.go | 4 ++-- pkg/es/event/reflect.go | 31 +++++++++----------------- pkg/msgbus/service.go | 4 ++-- 8 files changed, 27 insertions(+), 50 deletions(-) diff --git a/pkg/domain/salty-user.go b/pkg/domain/salty-user.go index 41a5744..6223157 100644 --- a/pkg/domain/salty-user.go +++ b/pkg/domain/salty-user.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "crypto/sha256" - "encoding" "fmt" "log" "strings" @@ -60,7 +59,6 @@ type UserRegistered struct { } var _ event.Event = (*UserRegistered)(nil) -var _ encoding.TextMarshaler = (*UserRegistered)(nil) func (e *UserRegistered) EventMeta() event.Meta { if e == nil { @@ -74,7 +72,7 @@ func (e *UserRegistered) SetEventMeta(m event.Meta) { e.eventMeta = m } } -func (e *UserRegistered) MarshalText() (text []byte, err error) { +func (e *UserRegistered) MarshalBinary() (text []byte, err error) { var b bytes.Buffer b.WriteString(e.Name) b.WriteRune('\t') @@ -82,7 +80,7 @@ func (e *UserRegistered) MarshalText() (text []byte, err error) { return b.Bytes(), nil } -func (e *UserRegistered) UnmarshalText(b []byte) error { +func (e *UserRegistered) UnmarshalBinary(b []byte) error { name, pub, ok := bytes.Cut(b, []byte{'\t'}) if !ok { return fmt.Errorf("parse error") diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 51c44b1..04657e1 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -171,7 +171,7 @@ func (es *eventLog) Append(ctx context.Context, events event.Events, version uin for i, e := range events { span.AddEvent(fmt.Sprintf("append event %d of %d", i, len(events))) - b, err = event.MarshalText(e) + b, err = event.MarshalBinary(e) if err != nil { span.RecordError(err) @@ -231,7 +231,7 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e span.RecordError(err) return err } - events[i], err = event.UnmarshalText(ctx, b, start) + events[i], err = event.UnmarshalBinary(ctx, b, start) if err != nil { span.RecordError(err) return err diff --git a/pkg/es/es_test.go b/pkg/es/es_test.go index 3e01783..2572260 100644 --- a/pkg/es/es_test.go +++ b/pkg/es/es_test.go @@ -111,10 +111,10 @@ func (e *ValueSet) SetEventMeta(eventMeta event.Meta) { } e.eventMeta = eventMeta } -func (e *ValueSet) MarshalText() ([]byte, error) { +func (e *ValueSet) MarshalBinary() ([]byte, error) { return json.Marshal(e) } -func (e *ValueSet) UnmarshalText(b []byte) error { +func (e *ValueSet) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, e) } diff --git a/pkg/es/event/aggregate_test.go b/pkg/es/event/aggregate_test.go index a917388..ed16082 100644 --- a/pkg/es/event/aggregate_test.go +++ b/pkg/es/event/aggregate_test.go @@ -51,10 +51,10 @@ func (e *ValueApplied) SetEventMeta(m event.Meta) { } } -func (e *ValueApplied) MarshalText() ([]byte, error) { +func (e *ValueApplied) MarshalBinary() ([]byte, error) { return json.Marshal(e) } -func (e *ValueApplied) UnmarshalText(b []byte) error { +func (e *ValueApplied) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, e) } diff --git a/pkg/es/event/events.go b/pkg/es/event/events.go index 281990f..be42a2d 100644 --- a/pkg/es/event/events.go +++ b/pkg/es/event/events.go @@ -1,7 +1,6 @@ package event import ( - "bytes" "crypto/rand" "encoding" "encoding/json" @@ -31,8 +30,8 @@ type Event interface { EventMeta() Meta SetEventMeta(Meta) - encoding.TextMarshaler - encoding.TextUnmarshaler + encoding.BinaryMarshaler + encoding.BinaryUnmarshaler } // Events is a list of events @@ -75,17 +74,6 @@ func (lis Events) Last() Event { } return lis[len(lis)-1] } -func (lis Events) MarshalText() ([]byte, error) { - b := &bytes.Buffer{} - for i := range lis { - txt, err := MarshalText(lis[i]) - if err != nil { - return nil, err - } - b.Write(txt) - } - return b.Bytes(), nil -} func TypeOf(e Event) string { if ie, ok := e.(interface{ UnwrapEvent() Event }); ok { @@ -150,9 +138,9 @@ func (*nilEvent) SetEventMeta(eventMeta Meta) {} var NilEvent = &nilEvent{} -func (e *nilEvent) MarshalText() ([]byte, error) { +func (e *nilEvent) MarshalBinary() ([]byte, error) { return json.Marshal(e) } -func (e *nilEvent) UnmarshalText(b []byte) error { +func (e *nilEvent) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, e) } diff --git a/pkg/es/event/events_test.go b/pkg/es/event/events_test.go index 4fae20b..bee5fed 100644 --- a/pkg/es/event/events_test.go +++ b/pkg/es/event/events_test.go @@ -29,10 +29,10 @@ func (e *DummyEvent) SetEventMeta(eventMeta event.Meta) { } e.eventMeta = eventMeta } -func (e *DummyEvent) MarshalText() ([]byte, error) { +func (e *DummyEvent) MarshalBinary() ([]byte, error) { return json.Marshal(e) } -func (e *DummyEvent) UnmarshalText(b []byte) error { +func (e *DummyEvent) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, e) } diff --git a/pkg/es/event/reflect.go b/pkg/es/event/reflect.go index bbce699..97de8c4 100644 --- a/pkg/es/event/reflect.go +++ b/pkg/es/event/reflect.go @@ -3,10 +3,8 @@ package event import ( "bytes" "context" - "encoding" "encoding/json" "fmt" - "io" "net/url" "reflect" "strings" @@ -59,10 +57,10 @@ func (u UnknownEvent) EventType() string { return u.eventType } func (u *UnknownEvent) SetEventMeta(em Meta) { u.eventMeta = em } -func (u *UnknownEvent) UnmarshalText(b []byte) error { +func (u *UnknownEvent) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, &u.values) } -func (u *UnknownEvent) MarshalText() ([]byte, error) { +func (u *UnknownEvent) MarshalBinary() ([]byte, error) { return json.Marshal(u.values) } @@ -119,14 +117,15 @@ func GetContainer(ctx context.Context, s string) Event { return e } -func MarshalText(e Event) (txt []byte, err error) { +func MarshalBinary(e Event) (txt []byte, err error) { b := &bytes.Buffer{} - if _, err = writeMarshaler(b, e.EventMeta().EventID); err != nil { + m := e.EventMeta() + if _, err = b.WriteString(m.EventID.String()); err != nil { return nil, err } b.WriteRune('\t') - if _, err = b.WriteString(e.EventMeta().StreamID); err != nil { + if _, err = b.WriteString(m.StreamID); err != nil { return nil, err } b.WriteRune('\t') @@ -134,7 +133,7 @@ func MarshalText(e Event) (txt []byte, err error) { return nil, err } b.WriteRune('\t') - if txt, err = e.MarshalText(); err != nil { + if txt, err = e.MarshalBinary(); err != nil { return nil, err } _, err = b.Write(txt) @@ -142,7 +141,7 @@ func MarshalText(e Event) (txt []byte, err error) { return b.Bytes(), err } -func UnmarshalText(ctx context.Context, txt []byte, pos uint64) (e Event, err error) { +func UnmarshalBinary(ctx context.Context, txt []byte, pos uint64) (e Event, err error) { sp := bytes.SplitN(txt, []byte{'\t'}, 4) if len(sp) != 4 { return nil, fmt.Errorf("invalid format. expected=4, got=%d", len(sp)) @@ -159,7 +158,7 @@ func UnmarshalText(ctx context.Context, txt []byte, pos uint64) (e Event, err er eventType := string(sp[2]) e = GetContainer(ctx, eventType) - if err = e.UnmarshalText(sp[3]); err != nil { + if err = e.UnmarshalBinary(sp[3]); err != nil { return nil, err } @@ -168,21 +167,13 @@ func UnmarshalText(ctx context.Context, txt []byte, pos uint64) (e Event, err er return e, nil } -func writeMarshaler(out io.Writer, in encoding.TextMarshaler) (int, error) { - if b, err := in.MarshalText(); err != nil { - return 0, err - } else { - return out.Write(b) - } -} - // DecodeEvents unmarshals the byte list into Events. func DecodeEvents(ctx context.Context, lis ...[]byte) (Events, error) { elis := make([]Event, len(lis)) var err error for i, txt := range lis { - elis[i], err = UnmarshalText(ctx, txt, uint64(i)) + elis[i], err = UnmarshalBinary(ctx, txt, uint64(i)) if err != nil { return nil, err } @@ -195,7 +186,7 @@ func EncodeEvents(events ...Event) (lis [][]byte, err error) { lis = make([][]byte, len(events)) for i, txt := range events { - lis[i], err = MarshalText(txt) + lis[i], err = MarshalBinary(txt) if err != nil { return nil, err } diff --git a/pkg/msgbus/service.go b/pkg/msgbus/service.go index bcecc1f..d3ce35b 100644 --- a/pkg/msgbus/service.go +++ b/pkg/msgbus/service.go @@ -337,10 +337,10 @@ func (e *PostEvent) SetEventMeta(eventMeta event.Meta) { } e.eventMeta = eventMeta } -func (e *PostEvent) MarshalText() ([]byte, error) { +func (e *PostEvent) MarshalBinary() ([]byte, error) { return json.Marshal(e) } -func (e *PostEvent) UnmarshalText(b []byte) error { +func (e *PostEvent) UnmarshalBinary(b []byte) error { return json.Unmarshal(b, e) }