ev/api/gql_ev/resolver.go
2022-08-13 13:34:13 -06:00

141 lines
3.0 KiB
Go

package gql_ev
import (
"context"
"fmt"
"time"
"github.com/sour-is/ev/internal/logz"
"github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/msgbus"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.uber.org/multierr"
)
type Resolver struct {
es *es.EventStore
Mresolver_posts syncint64.Counter
Mresolver_post_added syncint64.Counter
Mresolver_post_added_event syncint64.Counter
}
func New(es *es.EventStore) (*Resolver, error) {
m := logz.Meter(context.Background())
var errs error
Mresolver_posts, err := m.SyncInt64().Counter("resolver_posts")
errs = multierr.Append(errs, err)
Mresolver_post_added, err := m.SyncInt64().Counter("resolver_post_added")
errs = multierr.Append(errs, err)
Mresolver_post_added_event, err := m.SyncInt64().Counter("resolver_post_added")
errs = multierr.Append(errs, err)
return &Resolver{
es,
Mresolver_posts,
Mresolver_post_added,
Mresolver_post_added_event,
}, errs
}
// Posts is the resolver for the events field.
func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput) (*Connection, error) {
r.Mresolver_posts.Add(ctx, 1)
lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
if err != nil {
return nil, err
}
edges := make([]Edge, 0, len(lis))
for i := range lis {
e := lis[i]
m := e.EventMeta()
post, ok := e.(*msgbus.PostEvent)
if !ok {
continue
}
edges = append(edges, PostEvent{
ID: lis[i].EventMeta().EventID.String(),
Payload: string(post.Payload),
Tags: post.Tags,
Meta: &m,
})
}
var first, last uint64
if first, err = r.es.FirstIndex(ctx, streamID); err != nil {
return nil, err
}
if last, err = r.es.LastIndex(ctx, streamID); err != nil {
return nil, err
}
return &Connection{
Paging: &PageInfo{
Next: lis.Last().EventMeta().Position < last,
Prev: lis.First().EventMeta().Position > first,
Begin: lis.First().EventMeta().Position,
End: lis.Last().EventMeta().Position,
},
Edges: edges,
}, nil
}
func (r *Resolver) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) {
r.Mresolver_post_added.Add(ctx, 1)
es := r.es.EventStream()
if es == nil {
return nil, fmt.Errorf("EventStore does not implement streaming")
}
sub, err := es.Subscribe(ctx, streamID, after)
if err != nil {
return nil, err
}
ch := make(chan *PostEvent)
go func() {
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
sub.Close(ctx)
}()
for sub.Recv(ctx) {
events, err := sub.Events(ctx)
if err != nil {
break
}
r.Mresolver_post_added_event.Add(ctx, int64(len(events)))
for _, e := range events {
m := e.EventMeta()
if p, ok := e.(*msgbus.PostEvent); ok {
select {
case ch <- &PostEvent{
ID: m.EventID.String(),
Payload: string(p.Payload),
Tags: p.Tags,
Meta: &m,
}:
continue
case <-ctx.Done():
return
}
}
}
}
}()
return ch, nil
}