ev/api/gql_ev/resolver.go

114 lines
2.3 KiB
Go
Raw Normal View History

2022-08-07 11:55:49 -06:00
package gql_ev
import (
"context"
"fmt"
"time"
2022-08-07 11:55:49 -06:00
"github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/msgbus"
2022-08-07 11:55:49 -06:00
)
// This file will not be regenerated automatically.
//
// It serves as dependency injection for your app, add any dependencies you require here.
type Resolver struct {
es *es.EventStore
2022-08-07 11:55:49 -06:00
}
func New(es *es.EventStore) *Resolver {
2022-08-07 11:55:49 -06:00
return &Resolver{es}
}
// Posts is the resolver for the events field.
func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput) (*Connection, error) {
2022-08-07 11:55:49 -06:00
lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
if err != nil {
return nil, err
}
edges := make([]Edge, 0, len(lis))
for i := range lis {
e := lis[i]
m := e.EventMeta()
post, ok := e.(*msgbus.PostEvent)
2022-08-07 11:55:49 -06:00
if !ok {
continue
}
edges = append(edges, PostEvent{
2022-08-07 11:55:49 -06:00
ID: lis[i].EventMeta().EventID.String(),
Payload: string(post.Payload),
Tags: post.Tags,
Meta: &m,
})
}
var first, last uint64
if first, err = r.es.FirstIndex(ctx, streamID); err != nil {
return nil, err
}
if last, err = r.es.LastIndex(ctx, streamID); err != nil {
return nil, err
}
return &Connection{
Paging: &PageInfo{
Next: lis.Last().EventMeta().Position < last,
Prev: lis.First().EventMeta().Position > first,
Begin: lis.First().EventMeta().Position,
End: lis.Last().EventMeta().Position,
},
Edges: edges,
}, nil
}
2022-08-10 10:09:58 -06:00
func (r *Resolver) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) {
es := r.es.EventStream()
if es == nil {
return nil, fmt.Errorf("EventStore does not implement streaming")
}
2022-08-10 10:09:58 -06:00
sub, err := es.Subscribe(ctx, streamID, after)
if err != nil {
return nil, err
}
ch := make(chan *PostEvent)
go func() {
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
2022-08-10 10:09:58 -06:00
sub.Close(ctx)
}()
for sub.Recv(ctx) {
events, err := sub.Events(ctx)
if err != nil {
break
}
for _, e := range events {
m := e.EventMeta()
if p, ok := e.(*msgbus.PostEvent); ok {
select {
case ch <- &PostEvent{
ID: m.EventID.String(),
Payload: string(p.Payload),
Tags: p.Tags,
Meta: &m,
}:
continue
case <-ctx.Done():
return
}
}
}
}
}()
return ch, nil
}