feat: eventMeta -> event.IsEvent
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
extend type Query {
|
||||
posts(streamID: String! paging: PageInput): Connection!
|
||||
posts(name: String!, tag: String! = "", paging: PageInput): Connection!
|
||||
}
|
||||
extend type Subscription {
|
||||
"""after == 0 start from begining, after == -1 start from end"""
|
||||
postAdded(streamID: String! after: Int! = -1): PostEvent
|
||||
postAdded(name: String!, tag: String! = "", after: Int! = -1): PostEvent
|
||||
}
|
||||
type PostEvent implements Edge @goModel(model: "go.sour.is/ev/app/msgbus.PostEvent") {
|
||||
id: ID!
|
||||
|
||||
@@ -3,8 +3,10 @@ package msgbus
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
@@ -33,8 +35,8 @@ type service struct {
|
||||
}
|
||||
|
||||
type MsgbusResolver interface {
|
||||
Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error)
|
||||
PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error)
|
||||
Posts(ctx context.Context, name, tag string, paging *gql.PageInput) (*gql.Connection, error)
|
||||
PostAdded(ctx context.Context, name, tag string, after int64) (<-chan *PostEvent, error)
|
||||
IsResolver()
|
||||
}
|
||||
|
||||
@@ -115,7 +117,7 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// Posts is the resolver for the events field.
|
||||
func (s *service) Posts(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) {
|
||||
func (s *service) Posts(ctx context.Context, name, tag string, paging *gql.PageInput) (*gql.Connection, error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
@@ -124,6 +126,7 @@ func (s *service) Posts(ctx context.Context, streamID string, paging *gql.PageIn
|
||||
start := time.Now()
|
||||
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||
|
||||
streamID := withTag("post-"+name, tag)
|
||||
lis, err := s.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
@@ -164,7 +167,7 @@ func (s *service) Posts(ctx context.Context, streamID string, paging *gql.PageIn
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) {
|
||||
func (r *service) PostAdded(ctx context.Context, name, tag string, after int64) (<-chan *PostEvent, error) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
@@ -175,6 +178,8 @@ func (r *service) PostAdded(ctx context.Context, streamID string, after int64) (
|
||||
return nil, fmt.Errorf("EventStore does not implement streaming")
|
||||
}
|
||||
|
||||
streamID := withTag("post-"+name, tag)
|
||||
|
||||
sub, err := es.Subscribe(ctx, streamID, after)
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
@@ -232,14 +237,15 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
|
||||
|
||||
name, _, _ := strings.Cut(r.URL.Path, "/")
|
||||
name, tag, _ := strings.Cut(r.URL.Path, "/")
|
||||
if name == "" {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
streamID := withTag("post-"+name, tag)
|
||||
|
||||
var first event.Event = event.NilEvent
|
||||
if lis, err := s.es.Read(ctx, "post-"+name, 0, 1); err == nil && len(lis) > 0 {
|
||||
if lis, err := s.es.Read(ctx, streamID, 0, 1); err == nil && len(lis) > 0 {
|
||||
first = lis[0]
|
||||
}
|
||||
|
||||
@@ -256,8 +262,8 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) {
|
||||
count = i
|
||||
}
|
||||
|
||||
span.AddEvent(fmt.Sprint("GET topic=", name, " idx=", pos, " n=", count))
|
||||
events, err := s.es.Read(ctx, "post-"+name, pos, count)
|
||||
span.AddEvent(fmt.Sprint("GET topic=", streamID, " idx=", pos, " n=", count))
|
||||
events, err := s.es.Read(ctx, streamID, pos, count)
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
@@ -356,14 +362,16 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, span := lg.Span(ctx)
|
||||
defer span.End()
|
||||
|
||||
name, _, _ := strings.Cut(r.URL.Path, "/")
|
||||
name, tag, _ := strings.Cut(r.URL.Path, "/")
|
||||
if name == "" {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
streamID := withTag("post-"+name, tag)
|
||||
|
||||
var first event.Event = event.NilEvent
|
||||
if lis, err := s.es.Read(ctx, "post-"+name, 0, 1); err == nil && len(lis) > 0 {
|
||||
if lis, err := s.es.Read(ctx, streamID, 0, 1); err == nil && len(lis) > 0 {
|
||||
first = lis[0]
|
||||
}
|
||||
|
||||
@@ -374,7 +382,7 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
|
||||
pos = i - 1
|
||||
}
|
||||
|
||||
span.AddEvent(fmt.Sprint("WS topic=", name, " idx=", pos))
|
||||
span.AddEvent(fmt.Sprint("WS topic=", streamID, " idx=", pos))
|
||||
|
||||
c, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
@@ -409,7 +417,7 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
sub, err := es.Subscribe(ctx, "post-"+name, pos)
|
||||
sub, err := es.Subscribe(ctx, streamID, pos)
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
return
|
||||
@@ -457,21 +465,9 @@ type PostEvent struct {
|
||||
payload []byte
|
||||
tags []string
|
||||
|
||||
eventMeta event.Meta
|
||||
event.IsEvent
|
||||
}
|
||||
|
||||
func (e *PostEvent) EventMeta() event.Meta {
|
||||
if e == nil {
|
||||
return event.Meta{}
|
||||
}
|
||||
return e.eventMeta
|
||||
}
|
||||
func (e *PostEvent) SetEventMeta(eventMeta event.Meta) {
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
e.eventMeta = eventMeta
|
||||
}
|
||||
func (e *PostEvent) Values() any {
|
||||
if e == nil {
|
||||
return nil
|
||||
@@ -503,25 +499,23 @@ func (e *PostEvent) UnmarshalBinary(b []byte) error {
|
||||
func (e *PostEvent) MarshalJSON() ([]byte, error) { return e.MarshalBinary() }
|
||||
func (e *PostEvent) UnmarshalJSON(b []byte) error { return e.UnmarshalBinary(b) }
|
||||
|
||||
func (e *PostEvent) ID() string { return e.eventMeta.GetEventID() }
|
||||
func (e *PostEvent) ID() string { return e.EventMeta().GetEventID() }
|
||||
func (e *PostEvent) Tags() []string { return e.tags }
|
||||
func (e *PostEvent) Payload() string { return string(e.payload) }
|
||||
func (e *PostEvent) PayloadJSON(ctx context.Context) (m map[string]interface{}, err error) {
|
||||
err = json.Unmarshal([]byte(e.payload), &m)
|
||||
return
|
||||
}
|
||||
func (e *PostEvent) Meta() *event.Meta { return &e.eventMeta }
|
||||
func (e *PostEvent) IsEdge() {}
|
||||
func (e *PostEvent) Meta() event.Meta { return e.EventMeta() }
|
||||
func (e *PostEvent) IsEdge() {}
|
||||
|
||||
func (e *PostEvent) String() string {
|
||||
var b bytes.Buffer
|
||||
|
||||
// b.WriteString(e.eventMeta.StreamID)
|
||||
// b.WriteRune('@')
|
||||
b.WriteString(strconv.FormatUint(e.eventMeta.Position, 10))
|
||||
b.WriteString(strconv.FormatUint(e.EventMeta().Position, 10))
|
||||
b.WriteRune('\t')
|
||||
|
||||
b.WriteString(e.eventMeta.EventID.String())
|
||||
b.WriteString(e.EventMeta().EventID.String())
|
||||
b.WriteRune('\t')
|
||||
b.WriteString(string(e.payload))
|
||||
if len(e.tags) > 0 {
|
||||
@@ -536,7 +530,7 @@ func fields(s string) []string {
|
||||
if s == "" {
|
||||
return nil
|
||||
}
|
||||
return strings.Split(s, "/")
|
||||
return strings.Split(s, ";")
|
||||
}
|
||||
|
||||
func encodeJSON(w io.Writer, first event.Event, events ...event.Event) error {
|
||||
@@ -573,3 +567,32 @@ func encodeJSON(w io.Writer, first event.Event, events ...event.Event) error {
|
||||
|
||||
return json.NewEncoder(w).Encode(out)
|
||||
}
|
||||
|
||||
func Projector(e event.Event) []event.Event {
|
||||
m := e.EventMeta()
|
||||
streamID := m.StreamID
|
||||
streamPos := m.Position
|
||||
|
||||
switch e := e.(type) {
|
||||
case *PostEvent:
|
||||
lis := make([]event.Event, len(e.tags))
|
||||
for i := range lis {
|
||||
tag := e.tags[i]
|
||||
ne := event.NewPtr(streamID, streamPos)
|
||||
event.SetStreamID(withTag(streamID, tag), ne)
|
||||
lis[i] = ne
|
||||
}
|
||||
|
||||
return lis
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func withTag(id, tag string) string {
|
||||
if tag == "" {
|
||||
return id
|
||||
}
|
||||
|
||||
h := fnv.New128a()
|
||||
fmt.Fprint(h, tag)
|
||||
return id + "-" + base64.RawURLEncoding.EncodeToString(h.Sum(nil))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user