fix: improve paging and subscriptions

This commit is contained in:
Jon Lundy
2022-08-10 10:09:58 -06:00
parent 0642879c07
commit 72e7d5f265
11 changed files with 118 additions and 81 deletions

View File

@@ -2,7 +2,6 @@ package streamer
import (
"context"
"log"
"github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/es/driver"
@@ -45,26 +44,24 @@ func (s *streamer) EventLog(ctx context.Context, streamID string) (driver.EventL
var _ driver.EventStream = (*streamer)(nil)
func (s *streamer) Subscribe(ctx context.Context, streamID string) (driver.Subscription, error) {
log.Println("subscribe", streamID)
func (s *streamer) Subscribe(ctx context.Context, streamID string, start int64) (driver.Subscription, error) {
events, err := s.up.EventLog(ctx, streamID)
if err != nil {
return nil, err
}
sub := &subscription{topic: streamID, events: events}
sub.position = locker.New(&position{
idx: start,
size: es.AllEvents,
})
sub.unsub = s.delete(streamID, sub)
return sub, s.state.Modify(ctx, func(state *state) error {
state.subscribers[streamID] = append(state.subscribers[streamID], sub)
log.Println("subs=", len(state.subscribers[streamID]))
return nil
})
}
func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error {
log.Println("send", streamID, len(events))
return s.state.Modify(ctx, func(state *state) error {
for _, sub := range state.subscribers[streamID] {
return sub.position.Modify(ctx, func(position *position) error {
@@ -83,7 +80,6 @@ func (s *streamer) Send(ctx context.Context, streamID string, events event.Event
func (s *streamer) delete(streamID string, sub *subscription) func(context.Context) error {
return func(ctx context.Context) error {
log.Println("unsub", streamID)
if err := ctx.Err(); err != nil {
return err
}
@@ -93,7 +89,6 @@ func (s *streamer) delete(streamID string, sub *subscription) func(context.Conte
if lis[i] == sub {
lis[i] = lis[len(lis)-1]
state.subscribers[streamID] = lis[:len(lis)-1]
log.Println("subs=", len(state.subscribers[streamID]))
return nil
}
@@ -148,7 +143,6 @@ type subscription struct {
func (s *subscription) Recv(ctx context.Context) bool {
var wait func(context.Context) bool
log.Println("recv more")
err := s.position.Modify(ctx, func(position *position) error {
if position.size == es.AllEvents {
return nil
@@ -156,14 +150,11 @@ func (s *subscription) Recv(ctx context.Context) bool {
if position.size == 0 {
position.wait = make(chan struct{})
wait = func(ctx context.Context) bool {
log.Println("waiting", s.topic)
select {
case <-position.wait:
log.Println("got some")
return true
case <-ctx.Done():
log.Println("got cancel")
return false
}
@@ -185,15 +176,12 @@ func (s *subscription) Recv(ctx context.Context) bool {
}
func (s *subscription) Events(ctx context.Context) (event.Events, error) {
var events event.Events
log.Println("get events")
return events, s.position.Modify(ctx, func(position *position) error {
var err error
events, err = s.events.Read(ctx, int64(position.idx), position.size)
log.Printf("got events=%d %#v", len(events), position)
position.size = int64(len(events))
if len(events) > 0 {
position.idx = int64(events.First().EventMeta().Position - 1)
log.Println(position, events.First())
}
return err
})