2022-08-04 14:37:51 -06:00
|
|
|
package event
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
2022-08-06 09:52:36 -06:00
|
|
|
"context"
|
2022-08-04 14:37:51 -06:00
|
|
|
"encoding"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"net/url"
|
|
|
|
"reflect"
|
|
|
|
"strings"
|
2022-08-06 09:52:36 -06:00
|
|
|
|
|
|
|
"github.com/sour-is/ev/pkg/locker"
|
2022-08-04 14:37:51 -06:00
|
|
|
)
|
|
|
|
|
2022-08-06 09:52:36 -06:00
|
|
|
type config struct {
|
|
|
|
eventTypes map[string]reflect.Type
|
|
|
|
}
|
|
|
|
|
2022-08-04 14:37:51 -06:00
|
|
|
var (
|
2022-08-06 09:52:36 -06:00
|
|
|
eventTypes = locker.New(&config{eventTypes: make(map[string]reflect.Type)})
|
2022-08-04 14:37:51 -06:00
|
|
|
)
|
|
|
|
|
|
|
|
type UnknownEvent struct {
|
|
|
|
eventType string
|
|
|
|
values map[string]json.RawMessage
|
|
|
|
|
|
|
|
eventMeta Meta
|
|
|
|
}
|
|
|
|
|
|
|
|
var _ Event = (*UnknownEvent)(nil)
|
|
|
|
|
|
|
|
func NewUnknownEventFromValues(eventType string, meta Meta, values url.Values) *UnknownEvent {
|
|
|
|
jsonValues := make(map[string]json.RawMessage, len(values))
|
|
|
|
for k, v := range values {
|
|
|
|
switch len(v) {
|
|
|
|
case 0:
|
|
|
|
jsonValues[k] = []byte("null")
|
|
|
|
case 1:
|
|
|
|
jsonValues[k] = embedJSON(v[0])
|
|
|
|
default:
|
|
|
|
parts := make([][]byte, len(v))
|
|
|
|
for i := range v {
|
|
|
|
parts[i] = embedJSON(v[i])
|
|
|
|
}
|
|
|
|
jsonValues[k] = append([]byte("["), bytes.Join(parts, []byte(","))...)
|
|
|
|
jsonValues[k] = append(jsonValues[k], ']')
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return &UnknownEvent{eventType: eventType, eventMeta: meta, values: jsonValues}
|
|
|
|
}
|
|
|
|
func NewUnknownEventFromRaw(eventType string, meta Meta, values map[string]json.RawMessage) *UnknownEvent {
|
|
|
|
return &UnknownEvent{eventType: eventType, eventMeta: meta, values: values}
|
|
|
|
}
|
|
|
|
func (u UnknownEvent) EventMeta() Meta { return u.eventMeta }
|
|
|
|
func (u UnknownEvent) EventType() string { return u.eventType }
|
|
|
|
func (u *UnknownEvent) SetEventMeta(em Meta) {
|
|
|
|
u.eventMeta = em
|
|
|
|
}
|
2022-08-14 10:56:00 -06:00
|
|
|
func (u *UnknownEvent) UnmarshalText(b []byte) error {
|
2022-08-04 14:37:51 -06:00
|
|
|
return json.Unmarshal(b, &u.values)
|
|
|
|
}
|
2022-08-14 10:56:00 -06:00
|
|
|
func (u *UnknownEvent) MarshalText() ([]byte, error) {
|
2022-08-04 14:37:51 -06:00
|
|
|
return json.Marshal(u.values)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Register a type container for Unmarshalling values into. The type must implement Event and not be a nil value.
|
2022-08-06 09:52:36 -06:00
|
|
|
func Register(ctx context.Context, lis ...Event) error {
|
2022-08-04 14:37:51 -06:00
|
|
|
for _, e := range lis {
|
2022-08-06 09:52:36 -06:00
|
|
|
if err := ctx.Err(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-08-04 14:37:51 -06:00
|
|
|
if e == nil {
|
2022-08-06 09:52:36 -06:00
|
|
|
return fmt.Errorf("can't register event.Event of type=%T with value=%v", e, e)
|
2022-08-04 14:37:51 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
value := reflect.ValueOf(e)
|
|
|
|
|
|
|
|
if value.IsNil() {
|
2022-08-06 09:52:36 -06:00
|
|
|
return fmt.Errorf("can't register event.Event of type=%T with value=%v", e, e)
|
2022-08-04 14:37:51 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
value = reflect.Indirect(value)
|
2022-08-06 09:52:36 -06:00
|
|
|
|
|
|
|
name := TypeOf(e)
|
2022-08-04 14:37:51 -06:00
|
|
|
typ := value.Type()
|
|
|
|
|
2022-08-06 09:52:36 -06:00
|
|
|
if err := eventTypes.Modify(ctx, func(c *config) error {
|
|
|
|
c.eventTypes[name] = typ
|
|
|
|
return nil
|
|
|
|
}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2022-08-04 14:37:51 -06:00
|
|
|
}
|
2022-08-06 09:52:36 -06:00
|
|
|
return nil
|
2022-08-04 14:37:51 -06:00
|
|
|
}
|
2022-08-06 09:52:36 -06:00
|
|
|
func GetContainer(ctx context.Context, s string) Event {
|
|
|
|
var e Event
|
|
|
|
|
|
|
|
eventTypes.Modify(ctx, func(c *config) error {
|
2022-08-14 10:56:00 -06:00
|
|
|
typ, ok := c.eventTypes[s]
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("not defined")
|
|
|
|
}
|
|
|
|
newType := reflect.New(typ)
|
|
|
|
newInterface := newType.Interface()
|
|
|
|
if iface, ok := newInterface.(Event); ok {
|
|
|
|
e = iface
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return fmt.Errorf("failed")
|
2022-08-06 09:52:36 -06:00
|
|
|
})
|
|
|
|
if e == nil {
|
|
|
|
e = &UnknownEvent{eventType: s}
|
2022-08-04 14:37:51 -06:00
|
|
|
}
|
2022-08-06 09:52:36 -06:00
|
|
|
|
|
|
|
return e
|
2022-08-04 14:37:51 -06:00
|
|
|
}
|
|
|
|
|
|
|
|
func MarshalText(e Event) (txt []byte, err error) {
|
|
|
|
b := &bytes.Buffer{}
|
|
|
|
|
|
|
|
if _, err = writeMarshaler(b, e.EventMeta().EventID); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
b.WriteRune('\t')
|
|
|
|
if _, err = b.WriteString(e.EventMeta().StreamID); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
b.WriteRune('\t')
|
|
|
|
if _, err = b.WriteString(TypeOf(e)); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
b.WriteRune('\t')
|
2022-08-14 10:56:00 -06:00
|
|
|
if txt, err = e.MarshalText(); err != nil {
|
|
|
|
return nil, err
|
2022-08-04 14:37:51 -06:00
|
|
|
}
|
|
|
|
_, err = b.Write(txt)
|
|
|
|
|
|
|
|
return b.Bytes(), err
|
|
|
|
}
|
|
|
|
|
2022-08-06 09:52:36 -06:00
|
|
|
func UnmarshalText(ctx context.Context, txt []byte, pos uint64) (e Event, err error) {
|
2022-08-04 14:37:51 -06:00
|
|
|
sp := bytes.SplitN(txt, []byte{'\t'}, 4)
|
|
|
|
if len(sp) != 4 {
|
|
|
|
return nil, fmt.Errorf("invalid format. expected=4, got=%d", len(sp))
|
|
|
|
}
|
|
|
|
|
|
|
|
m := Meta{}
|
|
|
|
if err = m.EventID.UnmarshalText(sp[0]); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
m.StreamID = string(sp[1])
|
|
|
|
m.Position = pos
|
|
|
|
|
|
|
|
eventType := string(sp[2])
|
2022-08-06 09:52:36 -06:00
|
|
|
e = GetContainer(ctx, eventType)
|
2022-08-04 14:37:51 -06:00
|
|
|
|
2022-08-14 10:56:00 -06:00
|
|
|
if err = e.UnmarshalText(sp[3]); err != nil {
|
|
|
|
return nil, err
|
2022-08-04 14:37:51 -06:00
|
|
|
}
|
2022-08-14 10:56:00 -06:00
|
|
|
|
2022-08-04 14:37:51 -06:00
|
|
|
e.SetEventMeta(m)
|
|
|
|
|
|
|
|
return e, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func writeMarshaler(out io.Writer, in encoding.TextMarshaler) (int, error) {
|
|
|
|
if b, err := in.MarshalText(); err != nil {
|
|
|
|
return 0, err
|
|
|
|
} else {
|
|
|
|
return out.Write(b)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// DecodeEvents unmarshals the byte list into Events.
|
2022-08-06 09:52:36 -06:00
|
|
|
func DecodeEvents(ctx context.Context, lis ...[]byte) (Events, error) {
|
2022-08-04 14:37:51 -06:00
|
|
|
elis := make([]Event, len(lis))
|
|
|
|
|
|
|
|
var err error
|
|
|
|
for i, txt := range lis {
|
2022-08-06 09:52:36 -06:00
|
|
|
elis[i], err = UnmarshalText(ctx, txt, uint64(i))
|
2022-08-04 14:37:51 -06:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return elis, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func EncodeEvents(events ...Event) (lis [][]byte, err error) {
|
|
|
|
lis = make([][]byte, len(events))
|
|
|
|
|
|
|
|
for i, txt := range events {
|
|
|
|
lis[i], err = MarshalText(txt)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return lis, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func embedJSON(s string) json.RawMessage {
|
|
|
|
if len(s) > 1 && s[0] == '{' && s[len(s)-1] == '}' {
|
|
|
|
return []byte(s)
|
|
|
|
}
|
|
|
|
if len(s) > 1 && s[0] == '[' && s[len(s)-1] == ']' {
|
|
|
|
return []byte(s)
|
|
|
|
}
|
|
|
|
return []byte(fmt.Sprintf(`"%s"`, strings.Replace(s, `"`, `\"`, -1)))
|
|
|
|
}
|