feat: add logs/metrics/tracing

This commit is contained in:
Jon Lundy
2022-08-12 15:53:16 -06:00
parent fd97f2ff17
commit ea2186a034
12 changed files with 2238 additions and 15 deletions

View File

@@ -2,7 +2,6 @@ package streamer
import (
"context"
"log"
"github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/es/driver"
@@ -56,19 +55,15 @@ func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64)
size: es.AllEvents,
})
sub.unsub = s.delete(streamID, sub)
log.Println("start ", sub)
return sub, s.state.Modify(ctx, func(state *state) error {
state.subscribers[streamID] = append(state.subscribers[streamID], sub)
log.Println("add ", len(state.subscribers[streamID]))
return nil
})
}
func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error {
return s.state.Modify(ctx, func(state *state) error {
log.Println("trigger ", len(state.subscribers[streamID]))
for _, sub := range state.subscribers[streamID] {
log.Println("trigg ", sub)
err := sub.position.Modify(ctx, func(position *position) error {
position.size = int64(events.Last().EventMeta().Position - uint64(position.idx))
@@ -78,7 +73,9 @@ func (s *streamer) Send(ctx context.Context, streamID string, events event.Event
}
return nil
})
if err != nil { return err }
if err != nil {
return err
}
}
return nil
})
@@ -89,7 +86,6 @@ func (s *streamer) delete(streamID string, sub *subscription) func(context.Conte
if err := ctx.Err(); err != nil {
return err
}
log.Println("unsub ", s)
return s.state.Modify(ctx, func(state *state) error {
lis := state.subscribers[streamID]
for i := range lis {
@@ -150,8 +146,6 @@ type subscription struct {
func (s *subscription) Recv(ctx context.Context) bool {
var wait func(context.Context) bool
log.Println("wait ", s)
defer log.Println("recv ", s)
err := s.position.Modify(ctx, func(position *position) error {
if position.size == es.AllEvents {
return nil
@@ -187,13 +181,10 @@ func (s *subscription) Events(ctx context.Context) (event.Events, error) {
var events event.Events
return events, s.position.Modify(ctx, func(position *position) error {
var err error
log.Println("pos=", position, s)
events, err = s.events.Read(ctx, position.idx, position.size)
if err != nil {
log.Println(err, s)
return err
}
log.Println("events=", len(events), s)
position.size = int64(len(events))
if len(events) > 0 {
position.idx = int64(events.First().EventMeta().Position - 1)