diff --git a/api/gql_ev/resolver.go b/api/gql_ev/resolver.go index f4cdbaf..1f148a0 100644 --- a/api/gql_ev/resolver.go +++ b/api/gql_ev/resolver.go @@ -44,6 +44,9 @@ func New(es *es.EventStore) (*Resolver, error) { // Posts is the resolver for the events field. func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput) (*Connection, error) { + ctx, span := logz.Span(ctx) + defer span.End() + r.Mresolver_posts.Add(ctx, 1) lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30)) @@ -53,6 +56,7 @@ func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput edges := make([]Edge, 0, len(lis)) for i := range lis { + span.AddEvent(fmt.Sprint("post ", i, " of ", len(lis))) e := lis[i] m := e.EventMeta() @@ -89,6 +93,9 @@ func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput } func (r *Resolver) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) { + ctx, span := logz.Span(ctx) + defer span.End() + r.Mresolver_post_added.Add(ctx, 1) es := r.es.EventStream() diff --git a/internal/logz/init.go b/internal/logz/init.go index 5e6e7db..9de0fcc 100644 --- a/internal/logz/init.go +++ b/internal/logz/init.go @@ -8,6 +8,9 @@ import ( ) func Init(ctx context.Context, name string) (context.Context, func() error) { + ctx, span := Span(ctx) + defer span.End() + stop := [3]func() error{ initLogger(name), } @@ -20,7 +23,9 @@ func Init(ctx context.Context, name string) (context.Context, func() error) { log.Println("flushing logs...") errs := make([]error, len(stop)) for i, fn := range stop { - errs[i] = fn() + if fn != nil { + errs[i] = fn() + } } log.Println("all stopped.") return multierr.Combine(errs...) diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index b26802d..ccafb14 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -39,6 +39,9 @@ const AppendOnly = es.AppendOnly const AllEvents = es.AllEvents func Init(ctx context.Context) error { + _, span := logz.Span(ctx) + defer span.End() + m := logz.Meter(ctx) var err, errs error @@ -80,8 +83,14 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) } } c, err := cache.NewWithEvict(CachSize, func(ctx context.Context, s string, l *lockedWal) { + _, span := logz.Span(ctx) + defer span.End() + l.Modify(ctx, func(w *wal.Log) error { - // logz.Mdisk_evict.Add(ctx, 1) + _, span := logz.Span(ctx) + defer span.End() + + d.Mdisk_evict.Add(ctx, 1) err := w.Close() if err != nil { @@ -102,9 +111,15 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) }, nil } func (ds *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { + _, span := logz.Span(ctx) + defer span.End() + el := &eventLog{streamID: streamID} return el, ds.openlogs.Modify(ctx, func(openlogs *openlogs) error { + _, span := logz.Span(ctx) + defer span.End() + if events, ok := openlogs.logs.Get(streamID); ok { el.events = *events return nil @@ -128,10 +143,16 @@ type eventLog struct { var _ driver.EventLog = (*eventLog)(nil) func (es *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { + _, span := logz.Span(ctx) + defer span.End() + event.SetStreamID(es.streamID, events...) var count uint64 err := es.events.Modify(ctx, func(l *wal.Log) error { + _, span := logz.Span(ctx) + defer span.End() + last, err := l.LastIndex() if err != nil { return err @@ -145,6 +166,8 @@ func (es *eventLog) Append(ctx context.Context, events event.Events, version uin batch := &wal.Batch{} for i, e := range events { + span.AddEvent(fmt.Sprintf("append event %d of %d", i, len(events))) + b, err = event.MarshalText(e) if err != nil { return err @@ -162,9 +185,15 @@ func (es *eventLog) Append(ctx context.Context, events event.Events, version uin return count, err } func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, error) { + _, span := logz.Span(ctx) + defer span.End() + var events event.Events err := es.events.Modify(ctx, func(stream *wal.Log) error { + _, span := logz.Span(ctx) + defer span.End() + first, err := stream.FirstIndex() if err != nil { return err @@ -186,6 +215,8 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e events = make([]event.Event, math.Abs(count)) for i := range events { + span.AddEvent(fmt.Sprintf("read event %d of %d", i, len(events))) + // --- var b []byte b, err = stream.Read(start) @@ -219,6 +250,9 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e return events, err } func (es *eventLog) FirstIndex(ctx context.Context) (uint64, error) { + _, span := logz.Span(ctx) + defer span.End() + var idx uint64 var err error @@ -230,6 +264,9 @@ func (es *eventLog) FirstIndex(ctx context.Context) (uint64, error) { return idx, err } func (es *eventLog) LastIndex(ctx context.Context) (uint64, error) { + _, span := logz.Span(ctx) + defer span.End() + var idx uint64 var err error diff --git a/pkg/es/driver/mem-store/mem-store.go b/pkg/es/driver/mem-store/mem-store.go index 9192b7e..e5d7d89 100644 --- a/pkg/es/driver/mem-store/mem-store.go +++ b/pkg/es/driver/mem-store/mem-store.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/sour-is/ev/internal/logz" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" @@ -26,19 +27,31 @@ const AppendOnly = es.AppendOnly const AllEvents = es.AllEvents func Init(ctx context.Context) { + ctx, span := logz.Span(ctx) + defer span.End() + es.Register(ctx, "mem", &memstore{}) } var _ driver.Driver = (*memstore)(nil) -func (memstore) Open(_ context.Context, name string) (driver.Driver, error) { +func (memstore) Open(ctx context.Context, name string) (driver.Driver, error) { + _, span := logz.Span(ctx) + defer span.End() + s := &state{streams: make(map[string]*locker.Locked[event.Events])} return &memstore{locker.New(s)}, nil } func (m *memstore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { + ctx, span := logz.Span(ctx) + defer span.End() + el := &eventLog{streamID: streamID} err := m.state.Modify(ctx, func(state *state) error { + _, span := logz.Span(ctx) + defer span.End() + l, ok := state.streams[streamID] if !ok { l = locker.New(&event.Events{}) @@ -57,15 +70,23 @@ var _ driver.EventLog = (*eventLog)(nil) // Append implements driver.EventStore func (m *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { + ctx, span := logz.Span(ctx) + defer span.End() + event.SetStreamID(m.streamID, events...) return uint64(len(events)), m.events.Modify(ctx, func(stream *event.Events) error { + _, span := logz.Span(ctx) + defer span.End() + last := uint64(len(*stream)) if version != AppendOnly && version != last { return fmt.Errorf("current version wrong %d != %d", version, last) } for i := range events { + span.AddEvent(fmt.Sprintf("read event %d of %d", i, len(events))) + pos := last + uint64(i) + 1 event.SetPosition(events[i], pos) *stream = append(*stream, events[i]) @@ -77,9 +98,15 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint // Read implements driver.EventStore func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { + ctx, span := logz.Span(ctx) + defer span.End() + var events event.Events err := es.events.Modify(ctx, func(stream *event.Events) error { + _, span := logz.Span(ctx) + defer span.End() + first := stream.First().EventMeta().Position last := stream.Last().EventMeta().Position // --- @@ -94,6 +121,7 @@ func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Eve events = make([]event.Event, math.Abs(count)) for i := range events { + span.AddEvent(fmt.Sprintf("read event %d of %d", i, math.Abs(count))) // --- events[i] = (*stream)[start-1] // --- @@ -122,12 +150,18 @@ func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Eve // FirstIndex for the streamID func (m *eventLog) FirstIndex(ctx context.Context) (uint64, error) { + _, span := logz.Span(ctx) + defer span.End() + events, err := m.events.Copy(ctx) return events.First().EventMeta().Position, err } // LastIndex for the streamID func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) { + _, span := logz.Span(ctx) + defer span.End() + events, err := m.events.Copy(ctx) return events.Last().EventMeta().Position, err } diff --git a/pkg/es/driver/streamer/streamer.go b/pkg/es/driver/streamer/streamer.go index 23d67aa..380ee9b 100644 --- a/pkg/es/driver/streamer/streamer.go +++ b/pkg/es/driver/streamer/streamer.go @@ -3,6 +3,7 @@ package streamer import ( "context" + "github.com/sour-is/ev/internal/logz" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" @@ -19,6 +20,9 @@ type streamer struct { } func New(ctx context.Context) *streamer { + ctx, span := logz.Span(ctx) + defer span.End() + return &streamer{state: locker.New(&state{subscribers: map[string][]*subscription{}})} } @@ -35,9 +39,15 @@ func (s *streamer) Unwrap() driver.Driver { var _ driver.Driver = (*streamer)(nil) func (s *streamer) Open(ctx context.Context, dsn string) (driver.Driver, error) { + ctx, span := logz.Span(ctx) + defer span.End() + return s.up.Open(ctx, dsn) } func (s *streamer) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { + ctx, span := logz.Span(ctx) + defer span.End() + l, err := s.up.EventLog(ctx, streamID) return &wrapper{streamID, l, s}, err } @@ -45,6 +55,9 @@ func (s *streamer) EventLog(ctx context.Context, streamID string) (driver.EventL var _ driver.EventStream = (*streamer)(nil) func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) (driver.Subscription, error) { + ctx, span := logz.Span(ctx) + defer span.End() + events, err := s.up.EventLog(ctx, streamID) if err != nil { return nil, err @@ -62,9 +75,18 @@ func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) }) } func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error { + ctx, span := logz.Span(ctx) + defer span.End() + return s.state.Modify(ctx, func(state *state) error { + ctx, span := logz.Span(ctx) + defer span.End() + for _, sub := range state.subscribers[streamID] { err := sub.position.Modify(ctx, func(position *position) error { + _, span := logz.Span(ctx) + defer span.End() + position.size = int64(events.Last().EventMeta().Position - uint64(position.idx)) if position.wait != nil { @@ -83,10 +105,16 @@ func (s *streamer) Send(ctx context.Context, streamID string, events event.Event func (s *streamer) delete(streamID string, sub *subscription) func(context.Context) error { return func(ctx context.Context) error { + ctx, span := logz.Span(ctx) + defer span.End() + if err := ctx.Err(); err != nil { return err } return s.state.Modify(ctx, func(state *state) error { + _, span := logz.Span(ctx) + defer span.End() + lis := state.subscribers[streamID] for i := range lis { if lis[i] == sub { @@ -110,10 +138,16 @@ type wrapper struct { var _ driver.EventLog = (*wrapper)(nil) func (w *wrapper) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { + ctx, span := logz.Span(ctx) + defer span.End() + return w.up.Read(ctx, pos, count) } func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { + ctx, span := logz.Span(ctx) + defer span.End() + i, err := w.up.Append(ctx, events, version) if err != nil { return i, err @@ -122,10 +156,16 @@ func (w *wrapper) Append(ctx context.Context, events event.Events, version uint6 } func (w *wrapper) FirstIndex(ctx context.Context) (uint64, error) { + ctx, span := logz.Span(ctx) + defer span.End() + return w.up.FirstIndex(ctx) } func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) { + ctx, span := logz.Span(ctx) + defer span.End() + return w.up.LastIndex(ctx) } @@ -145,14 +185,24 @@ type subscription struct { } func (s *subscription) Recv(ctx context.Context) bool { + ctx, span := logz.Span(ctx) + defer span.End() + var wait func(context.Context) bool + err := s.position.Modify(ctx, func(position *position) error { + _, span := logz.Span(ctx) + defer span.End() + if position.size == es.AllEvents { return nil } if position.size == 0 { position.wait = make(chan struct{}) wait = func(ctx context.Context) bool { + ctx, span := logz.Span(ctx) + defer span.End() + select { case <-position.wait: @@ -178,8 +228,14 @@ func (s *subscription) Recv(ctx context.Context) bool { return true } func (s *subscription) Events(ctx context.Context) (event.Events, error) { + ctx, span := logz.Span(ctx) + defer span.End() + var events event.Events return events, s.position.Modify(ctx, func(position *position) error { + ctx, span := logz.Span(ctx) + defer span.End() + var err error events, err = s.events.Read(ctx, position.idx, position.size) if err != nil { @@ -193,5 +249,8 @@ func (s *subscription) Events(ctx context.Context) (event.Events, error) { }) } func (s *subscription) Close(ctx context.Context) error { + ctx, span := logz.Span(ctx) + defer span.End() + return s.unsub(ctx) } diff --git a/pkg/msgbus/service.go b/pkg/msgbus/service.go index 5c4dd44..961fddb 100644 --- a/pkg/msgbus/service.go +++ b/pkg/msgbus/service.go @@ -111,6 +111,7 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) { } func (s *service) post(w http.ResponseWriter, r *http.Request) { ctx := r.Context() + ctx, span := logz.Span(ctx) defer span.End() @@ -127,7 +128,8 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) { b, err := io.ReadAll(io.LimitReader(r.Body, 64*1024)) if err != nil { - log.Print(err) + span.RecordError(err) + w.WriteHeader(http.StatusBadRequest) return } @@ -142,9 +144,10 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) { Payload: b, Tags: fields(tags), }) - _, err = s.es.Append(r.Context(), "post-"+name, events) + + _, err = s.es.Append(ctx, "post-"+name, events) if err != nil { - log.Print(err) + span.RecordError(err) w.WriteHeader(http.StatusInternalServerError) return @@ -155,13 +158,14 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) { } m := events.First().EventMeta() - log.Print("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID) + span.AddEvent(fmt.Sprint("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID)) + // log.Print("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID) w.WriteHeader(http.StatusAccepted) if strings.Contains(r.Header.Get("Accept"), "application/json") { w.Header().Add("Content-Type", "application/json") if err = encodeJSON(w, first, events...); err != nil { - log.Print(err) + span.RecordError(err) w.WriteHeader(http.StatusInternalServerError) return @@ -169,6 +173,7 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) { return } + span.AddEvent("finish response") w.Header().Add("Content-Type", "text/plain") fmt.Fprintf(w, "OK %d %s", m.Position, m.EventID)