diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index 7f151d7..f9a586c 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -3,6 +3,7 @@ package diskstore import ( "context" "fmt" + "log" "os" "path/filepath" "strings" @@ -115,6 +116,7 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e } start, count := math.PagerBox(first, last, pos, count) + log.Println("reading", first, last, pos, count, start) if count == 0 { return nil } diff --git a/pkg/es/driver/streamer/streamer.go b/pkg/es/driver/streamer/streamer.go index 26e362e..6209c04 100644 --- a/pkg/es/driver/streamer/streamer.go +++ b/pkg/es/driver/streamer/streamer.go @@ -2,6 +2,7 @@ package streamer import ( "context" + "log" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" @@ -55,16 +56,20 @@ func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) size: es.AllEvents, }) sub.unsub = s.delete(streamID, sub) + log.Println("start ", sub) return sub, s.state.Modify(ctx, func(state *state) error { state.subscribers[streamID] = append(state.subscribers[streamID], sub) + log.Println("add ", len(state.subscribers[streamID])) return nil }) } func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error { return s.state.Modify(ctx, func(state *state) error { + log.Println("trigger ", len(state.subscribers[streamID])) for _, sub := range state.subscribers[streamID] { - return sub.position.Modify(ctx, func(position *position) error { + log.Println("trigg ", sub) + err := sub.position.Modify(ctx, func(position *position) error { position.size = int64(events.Last().EventMeta().Position - uint64(position.idx)) if position.wait != nil { @@ -73,6 +78,7 @@ func (s *streamer) Send(ctx context.Context, streamID string, events event.Event } return nil }) + if err != nil { return err } } return nil }) @@ -83,6 +89,7 @@ func (s *streamer) delete(streamID string, sub *subscription) func(context.Conte if err := ctx.Err(); err != nil { return err } + log.Println("unsub ", s) return s.state.Modify(ctx, func(state *state) error { lis := state.subscribers[streamID] for i := range lis { @@ -143,6 +150,8 @@ type subscription struct { func (s *subscription) Recv(ctx context.Context) bool { var wait func(context.Context) bool + log.Println("wait ", s) + defer log.Println("recv ", s) err := s.position.Modify(ctx, func(position *position) error { if position.size == es.AllEvents { return nil @@ -178,7 +187,13 @@ func (s *subscription) Events(ctx context.Context) (event.Events, error) { var events event.Events return events, s.position.Modify(ctx, func(position *position) error { var err error - events, err = s.events.Read(ctx, int64(position.idx), position.size) + log.Println("pos=", position, s) + events, err = s.events.Read(ctx, position.idx, position.size) + if err != nil { + log.Println(err, s) + return err + } + log.Println("events=", len(events), s) position.size = int64(len(events)) if len(events) > 0 { position.idx = int64(events.First().EventMeta().Position - 1) diff --git a/pkg/msgbus/service.go b/pkg/msgbus/service.go index 4a6da6e..5603281 100644 --- a/pkg/msgbus/service.go +++ b/pkg/msgbus/service.go @@ -85,7 +85,7 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) { if strings.Contains(r.Header.Get("Accept"), "application/json") { w.Header().Add("Content-Type", "application/json") - if err = encodeJSON(w, first, events); err != nil { + if err = encodeJSON(w, first, events...); err != nil { log.Print(err) w.WriteHeader(http.StatusInternalServerError) @@ -147,7 +147,7 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusAccepted) if strings.Contains(r.Header.Get("Accept"), "application/json") { w.Header().Add("Content-Type", "application/json") - if err = encodeJSON(w, first, events); err != nil { + if err = encodeJSON(w, first, events...); err != nil { log.Print(err) w.WriteHeader(http.StatusInternalServerError) @@ -178,7 +178,7 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) { qry := r.URL.Query() if i, err := strconv.ParseInt(qry.Get("index"), 10, 64); err == nil { - pos = i + pos = i - 1 } log.Print("WS topic=", name, " idx=", pos) @@ -235,15 +235,15 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) { break } log.Println("got events ", len(events)) - for _, e := range events { - e, ok := e.(*PostEvent) + for i := range events { + e, ok := events[i].(*PostEvent) if !ok { continue } log.Println("send", e.String()) var b bytes.Buffer - if err = encodeJSON(&b, first, events); err != nil { + if err = encodeJSON(&b, first, e); err != nil { log.Print(err) } @@ -300,7 +300,7 @@ func fields(s string) []string { return strings.Split(s, "/") } -func encodeJSON(w io.Writer, first event.Event, events event.Events) error { +func encodeJSON(w io.Writer, first event.Event, events ...event.Event) error { out := make([]struct { ID uint64 `json:"id"` Payload []byte `json:"payload"`