refactor: move graphql into individual services

This commit is contained in:
Jon Lundy
2022-08-19 12:26:42 -06:00
parent d06e0bac3e
commit 814c974e93
20 changed files with 633 additions and 503 deletions

View File

@@ -1,43 +0,0 @@
scalar Time
scalar Map
type Connection {
paging: PageInfo!
edges: [Edge!]!
}
input PageInput {
idx: Int = 0
count: Int = 30
}
type PageInfo {
next: Boolean!
prev: Boolean!
begin: Int!
end: Int!
}
interface Edge {
id: ID!
}
type Meta {
eventID: String! @goField(name: "getEventID")
streamID: String!
created: Time!
position: Int!
}
directive @goModel(
model: String
models: [String!]
) on OBJECT | INPUT_OBJECT | SCALAR | ENUM | INTERFACE | UNION
directive @goField(
forceResolver: Boolean
name: String
) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION
directive @goTag(
key: String!
value: String
) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION

View File

@@ -1 +0,0 @@
package gql_ev

View File

@@ -1,66 +0,0 @@
package gql_ev
import (
"context"
"encoding/json"
"github.com/sour-is/ev/pkg/es/event"
)
type Edge interface {
IsEdge()
}
type Connection struct {
Paging *PageInfo `json:"paging"`
Edges []Edge `json:"edges"`
}
type PostEvent struct {
ID string `json:"id"`
Payload string `json:"payload"`
Tags []string `json:"tags"`
Meta *event.Meta `json:"meta"`
}
func (PostEvent) IsEdge() {}
func (e *PostEvent) PayloadJSON(ctx context.Context) (m map[string]interface{}, err error) {
err = json.Unmarshal([]byte(e.Payload), &m)
return
}
type PageInfo struct {
Next bool `json:"next"`
Prev bool `json:"prev"`
Begin uint64 `json:"begin"`
End uint64 `json:"end"`
}
type PageInput struct {
Idx *int64 `json:"idx"`
Count *int64 `json:"count"`
}
func (p *PageInput) GetIdx(v int64) int64 {
if p == nil || p.Idx == nil {
return v
}
return *p.Idx
}
func (p *PageInput) GetCount(v int64) int64 {
if p == nil || p.Count == nil {
return v
}
return *p.Count
}
type SaltyUser struct {
Nick string `json:"nick"`
Pubkey string `json:"pubkey"`
Inbox string `json:"inbox"`
}
func (s SaltyUser) Endpoint(ctx context.Context) string {
return "https://ev.sour.is/inbox/" + s.Inbox
}

View File

@@ -1,16 +0,0 @@
extend type Query {
posts(streamID: String! paging: PageInput): Connection!
}
extend type Subscription {
"""after == 0 start from begining, after == -1 start from end"""
postAdded(streamID: String! after: Int! = -1): PostEvent
}
type PostEvent implements Edge {
id: ID!
payload: String!
payloadJSON: Map!
tags: [String!]!
meta: Meta!
}

View File

@@ -2,232 +2,54 @@ package gql_ev
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"strings"
"time"
"net/http"
"reflect"
"github.com/keys-pub/keys"
"github.com/sour-is/ev/app/msgbus"
"github.com/sour-is/ev/app/salty"
"github.com/sour-is/ev/internal/graph/generated"
"github.com/sour-is/ev/internal/logz"
"github.com/sour-is/ev/pkg/domain"
"github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/msgbus"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.uber.org/multierr"
)
type Resolver struct {
es *es.EventStore
Mresolver_posts syncint64.Counter
Mresolver_post_added syncint64.Counter
Mresolver_post_added_event syncint64.Counter
Mresolver_create_salty_user syncint64.Counter
Mresolver_salty_user syncint64.Counter
msgbus.MsgbusResolver
salty.SaltyResolver
}
func New(ctx context.Context, es *es.EventStore) (*Resolver, error) {
ctx, span := logz.Span(ctx)
func New(ctx context.Context, m msgbus.MsgbusResolver, s salty.SaltyResolver) (*Resolver, error) {
_, span := logz.Span(ctx)
defer span.End()
m := logz.Meter(ctx)
r := &Resolver{m, s}
var err, errs error
r := &Resolver{es: es}
r.Mresolver_posts, err = m.SyncInt64().Counter("resolver_posts")
errs = multierr.Append(errs, err)
r.Mresolver_post_added, err = m.SyncInt64().Counter("resolver_post_added")
errs = multierr.Append(errs, err)
r.Mresolver_post_added_event, err = m.SyncInt64().Counter("resolver_post_added")
errs = multierr.Append(errs, err)
r.Mresolver_create_salty_user, err = m.SyncInt64().Counter("resolver_create_salty_user")
errs = multierr.Append(errs, err)
r.Mresolver_salty_user, err = m.SyncInt64().Counter("resolver_salty_user")
errs = multierr.Append(errs, err)
span.RecordError(err)
return r, errs
return r, nil
}
// Posts is the resolver for the events field.
func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput) (*Connection, error) {
ctx, span := logz.Span(ctx)
defer span.End()
// Query returns generated.QueryResolver implementation.
func (r *Resolver) Query() generated.QueryResolver { return r }
r.Mresolver_posts.Add(ctx, 1)
// Query returns generated.QueryResolver implementation.
func (r *Resolver) Mutation() generated.MutationResolver { return r }
lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
if err != nil {
span.RecordError(err)
return nil, err
}
// Subscription returns generated.SubscriptionResolver implementation.
func (r *Resolver) Subscription() generated.SubscriptionResolver { return r }
edges := make([]Edge, 0, len(lis))
for i := range lis {
span.AddEvent(fmt.Sprint("post ", i, " of ", len(lis)))
e := lis[i]
m := e.EventMeta()
post, ok := e.(*msgbus.PostEvent)
if !ok {
// ChainMiddlewares will check all embeded resolvers for a GetMiddleware func and add to handler.
func (r *Resolver) ChainMiddlewares(h http.Handler) http.Handler {
v := reflect.ValueOf(r) // Get reflected value of *Resolver
v = reflect.Indirect(v) // Get the pointed value (returns a zero value on nil)
n := v.NumField() // Get number of fields to iterate over.
for i := 0; i < n; i++ {
f := v.Field(i)
if !f.CanInterface() { // Skip non-interface types.
continue
}
edges = append(edges, PostEvent{
ID: lis[i].EventMeta().EventID.String(),
Payload: string(post.Payload),
Tags: post.Tags,
Meta: &m,
})
}
var first, last uint64
if first, err = r.es.FirstIndex(ctx, streamID); err != nil {
span.RecordError(err)
return nil, err
}
if last, err = r.es.LastIndex(ctx, streamID); err != nil {
span.RecordError(err)
return nil, err
}
return &Connection{
Paging: &PageInfo{
Next: lis.Last().EventMeta().Position < last,
Prev: lis.First().EventMeta().Position > first,
Begin: lis.First().EventMeta().Position,
End: lis.Last().EventMeta().Position,
},
Edges: edges,
}, nil
}
func (r *Resolver) PostAdded(ctx context.Context, streamID string, after int64) (<-chan *PostEvent, error) {
ctx, span := logz.Span(ctx)
defer span.End()
r.Mresolver_post_added.Add(ctx, 1)
es := r.es.EventStream()
if es == nil {
return nil, fmt.Errorf("EventStore does not implement streaming")
}
sub, err := es.Subscribe(ctx, streamID, after)
if err != nil {
span.RecordError(err)
return nil, err
}
ch := make(chan *PostEvent)
go func() {
ctx, span := logz.Span(ctx)
defer span.End()
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := sub.Close(ctx)
span.RecordError(err)
}()
for sub.Recv(ctx) {
events, err := sub.Events(ctx)
if err != nil {
span.RecordError(err)
break
}
span.AddEvent(fmt.Sprintf("received %d events", len(events)))
r.Mresolver_post_added_event.Add(ctx, int64(len(events)))
for _, e := range events {
m := e.EventMeta()
if p, ok := e.(*msgbus.PostEvent); ok {
select {
case ch <- &PostEvent{
ID: m.EventID.String(),
Payload: string(p.Payload),
Tags: p.Tags,
Meta: &m,
}:
continue
case <-ctx.Done():
return
}
}
}
if iface, ok := f.Interface().(interface {
GetMiddleware() func(http.Handler) http.Handler
}); ok {
h = iface.GetMiddleware()(h) // Append only items that fulfill the interface.
}
}()
return ch, nil
}
func (r *Resolver) CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) {
ctx, span := logz.Span(ctx)
defer span.End()
r.Mresolver_create_salty_user.Add(ctx, 1)
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
span.AddEvent(streamID)
key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(pub))
if err != nil {
span.RecordError(err)
return nil, err
}
a, err := es.Create(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error {
return agg.OnUserRegister(nick, key)
})
switch {
case errors.Is(err, es.ErrShouldNotExist):
span.RecordError(err)
return nil, fmt.Errorf("user exists")
case err != nil:
span.RecordError(err)
return nil, fmt.Errorf("internal error")
}
return &SaltyUser{
Nick: nick,
Pubkey: pub,
Inbox: a.Inbox.String(),
}, nil
}
func (r *Resolver) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error) {
ctx, span := logz.Span(ctx)
defer span.End()
r.Mresolver_salty_user.Add(ctx, 1)
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
span.AddEvent(streamID)
a, err := es.Update(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error { return nil })
switch {
case errors.Is(err, es.ErrShouldExist):
span.RecordError(err)
return nil, fmt.Errorf("user not found")
case err != nil:
span.RecordError(err)
return nil, fmt.Errorf("%w internal error", err)
}
return &SaltyUser{
Nick: nick,
Pubkey: a.Pubkey.String(),
Inbox: a.Inbox.String(),
}, err
return h
}

View File

@@ -1,14 +0,0 @@
extend type Query {
saltyUser(nick: String!): SaltyUser
}
extend type Mutation {
createSaltyUser(nick: String! pubkey: String!): SaltyUser
}
type SaltyUser {
nick: String!
pubkey: String!
inbox: String!
endpoint: String!
}