diff --git a/Makefile b/Makefile index 27f4b3a..c7912bb 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ export EV_DATA = mem: -# export EV_HTTP = :8080 +export EV_HTTP = :8080 -run: +run: gen go run . test: go test -cover -race ./... @@ -9,10 +9,15 @@ test: GQLDIR=api/gql_ev GQLS=$(wildcard $(GQLDIR)/*.go) $(wildcard $(GQLDIR)/*.graphqls) gqlgen.yml -GQLSRC=internal/ev/graph/generated/generated.go +GQLSRC=internal/graph/generated/generated.go gen: gql gql: $(GQLSRC) $(GQLSRC): $(GQLS) - go get github.com/99designs/gqlgen@latest - go run github.com/99designs/gqlgen \ No newline at end of file +ifeq (, $(shell which gqlgen)) + go install github.com/99designs/gqlgen@latest +endif + gqlgen + +load: + watch -n .1 "http POST localhost:8080/event/asdf/test a=b one=1 two:='{\"v\":2}' | jq" \ No newline at end of file diff --git a/api/gql_ev/schema.graphqls b/api/gql_ev/common.graphqls similarity index 61% rename from api/gql_ev/schema.graphqls rename to api/gql_ev/common.graphqls index a13c974..89a5502 100644 --- a/api/gql_ev/schema.graphqls +++ b/api/gql_ev/common.graphqls @@ -1,6 +1,5 @@ -extend type Query { - events(streamID: String! paging: PageInput): Connection! -} +scalar Time +scalar Map type Connection { paging: PageInfo! @@ -21,26 +20,24 @@ interface Edge { id: ID! } -type Event implements Edge { - id: ID! - - payload: String! - tags: [String!]! - - meta: Meta! -} - type Meta { - id: String! - + eventID: String! @goField(name: "getEventID") streamID: String! created: Time! position: Int! } -scalar Time +directive @goModel( + model: String + models: [String!] +) on OBJECT | INPUT_OBJECT | SCALAR | ENUM | INTERFACE | UNION directive @goField( forceResolver: Boolean name: String ) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION + +directive @goTag( + key: String! + value: String +) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION \ No newline at end of file diff --git a/api/gql_ev/models.go b/api/gql_ev/models.go index 8f4819e..ed78c4c 100644 --- a/api/gql_ev/models.go +++ b/api/gql_ev/models.go @@ -1,6 +1,11 @@ package gql_ev -import "github.com/sour-is/ev/pkg/es/event" +import ( + "context" + "encoding/json" + + "github.com/sour-is/ev/pkg/es/event" +) type Edge interface { IsEdge() @@ -11,14 +16,19 @@ type Connection struct { Edges []Edge `json:"edges"` } -type Event struct { +type PostEvent struct { ID string `json:"id"` Payload string `json:"payload"` Tags []string `json:"tags"` Meta *event.Meta `json:"meta"` } -func (Event) IsEdge() {} +func (PostEvent) IsEdge() {} + +func (e *PostEvent) PayloadJSON(ctx context.Context) (m map[string]interface{}, err error) { + err = json.Unmarshal([]byte(e.Payload), &m) + return +} type PageInfo struct { Next bool `json:"next"` diff --git a/api/gql_ev/msgbus.graphqls b/api/gql_ev/msgbus.graphqls new file mode 100644 index 0000000..8c1c59c --- /dev/null +++ b/api/gql_ev/msgbus.graphqls @@ -0,0 +1,15 @@ +extend type Query { + posts(streamID: String! paging: PageInput): Connection! +} +extend type Subscription { + postAdded(streamID: String!): PostEvent +} +type PostEvent implements Edge { + id: ID! + + payload: String! + payloadJSON: Map! + tags: [String!]! + + meta: Meta! +} \ No newline at end of file diff --git a/api/gql_ev/resolver.go b/api/gql_ev/resolver.go index 614cfe4..a75aa66 100644 --- a/api/gql_ev/resolver.go +++ b/api/gql_ev/resolver.go @@ -2,9 +2,12 @@ package gql_ev import ( "context" + "fmt" + "log" + "time" - "github.com/sour-is/ev/pkg/es/driver" - "github.com/sour-is/ev/pkg/es/service" + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/msgbus" ) // This file will not be regenerated automatically. @@ -12,15 +15,15 @@ import ( // It serves as dependency injection for your app, add any dependencies you require here. type Resolver struct { - es driver.EventStore + es *es.EventStore } -func New(es driver.EventStore) *Resolver { +func New(es *es.EventStore) *Resolver { return &Resolver{es} } -// Events is the resolver for the events field. -func (r *Resolver) Events(ctx context.Context, streamID string, paging *PageInput) (*Connection, error) { +// Posts is the resolver for the events field. +func (r *Resolver) Posts(ctx context.Context, streamID string, paging *PageInput) (*Connection, error) { lis, err := r.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30)) if err != nil { return nil, err @@ -31,12 +34,12 @@ func (r *Resolver) Events(ctx context.Context, streamID string, paging *PageInpu e := lis[i] m := e.EventMeta() - post, ok := e.(*service.PostEvent) + post, ok := e.(*msgbus.PostEvent) if !ok { continue } - edges = append(edges, Event{ + edges = append(edges, PostEvent{ ID: lis[i].EventMeta().EventID.String(), Payload: string(post.Payload), Tags: post.Tags, @@ -62,3 +65,50 @@ func (r *Resolver) Events(ctx context.Context, streamID string, paging *PageInpu Edges: edges, }, nil } + +func (r *Resolver) PostAdded(ctx context.Context, streamID string) (<-chan *PostEvent, error) { + es := r.es.EventStream() + if es == nil { + return nil, fmt.Errorf("EventStore does not implement streaming") + } + + sub, err := es.Subscribe(ctx, streamID) + if err != nil { + return nil, err + } + + ch := make(chan *PostEvent) + + go func() { + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + log.Print(sub.Close(ctx)) + }() + + for sub.Recv(ctx) { + events, err := sub.Events(ctx) + if err != nil { + break + } + for _, e := range events { + m := e.EventMeta() + if p, ok := e.(*msgbus.PostEvent); ok { + select { + case ch <- &PostEvent{ + ID: m.EventID.String(), + Payload: string(p.Payload), + Tags: p.Tags, + Meta: &m, + }: + continue + case <-ctx.Done(): + return + } + } + } + } + }() + + return ch, nil +} diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index a64e596..693a1a1 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "io" "strconv" "sync" "sync/atomic" @@ -40,6 +41,7 @@ type Config struct { type ResolverRoot interface { Query() QueryResolver + Subscription() SubscriptionResolver } type DirectiveRoot struct { @@ -51,18 +53,11 @@ type ComplexityRoot struct { Paging func(childComplexity int) int } - Event struct { - ID func(childComplexity int) int - Meta func(childComplexity int) int - Payload func(childComplexity int) int - Tags func(childComplexity int) int - } - Meta struct { - Created func(childComplexity int) int - ID func(childComplexity int) int - Position func(childComplexity int) int - StreamID func(childComplexity int) int + Created func(childComplexity int) int + GetEventID func(childComplexity int) int + Position func(childComplexity int) int + StreamID func(childComplexity int) int } PageInfo struct { @@ -72,18 +67,33 @@ type ComplexityRoot struct { Prev func(childComplexity int) int } + PostEvent struct { + ID func(childComplexity int) int + Meta func(childComplexity int) int + Payload func(childComplexity int) int + PayloadJSON func(childComplexity int) int + Tags func(childComplexity int) int + } + Query struct { - Events func(childComplexity int, streamID string, paging *gql_ev.PageInput) int + Posts func(childComplexity int, streamID string, paging *gql_ev.PageInput) int __resolve__service func(childComplexity int) int } + Subscription struct { + PostAdded func(childComplexity int, streamID string) int + } + _Service struct { SDL func(childComplexity int) int } } type QueryResolver interface { - Events(ctx context.Context, streamID string, paging *gql_ev.PageInput) (*gql_ev.Connection, error) + Posts(ctx context.Context, streamID string, paging *gql_ev.PageInput) (*gql_ev.Connection, error) +} +type SubscriptionResolver interface { + PostAdded(ctx context.Context, streamID string) (<-chan *gql_ev.PostEvent, error) } type executableSchema struct { @@ -115,34 +125,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Connection.Paging(childComplexity), true - case "Event.id": - if e.complexity.Event.ID == nil { - break - } - - return e.complexity.Event.ID(childComplexity), true - - case "Event.meta": - if e.complexity.Event.Meta == nil { - break - } - - return e.complexity.Event.Meta(childComplexity), true - - case "Event.payload": - if e.complexity.Event.Payload == nil { - break - } - - return e.complexity.Event.Payload(childComplexity), true - - case "Event.tags": - if e.complexity.Event.Tags == nil { - break - } - - return e.complexity.Event.Tags(childComplexity), true - case "Meta.created": if e.complexity.Meta.Created == nil { break @@ -150,12 +132,12 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Meta.Created(childComplexity), true - case "Meta.id": - if e.complexity.Meta.ID == nil { + case "Meta.eventID": + if e.complexity.Meta.GetEventID == nil { break } - return e.complexity.Meta.ID(childComplexity), true + return e.complexity.Meta.GetEventID(childComplexity), true case "Meta.position": if e.complexity.Meta.Position == nil { @@ -199,17 +181,52 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.PageInfo.Prev(childComplexity), true - case "Query.events": - if e.complexity.Query.Events == nil { + case "PostEvent.id": + if e.complexity.PostEvent.ID == nil { break } - args, err := ec.field_Query_events_args(context.TODO(), rawArgs) + return e.complexity.PostEvent.ID(childComplexity), true + + case "PostEvent.meta": + if e.complexity.PostEvent.Meta == nil { + break + } + + return e.complexity.PostEvent.Meta(childComplexity), true + + case "PostEvent.payload": + if e.complexity.PostEvent.Payload == nil { + break + } + + return e.complexity.PostEvent.Payload(childComplexity), true + + case "PostEvent.payloadJSON": + if e.complexity.PostEvent.PayloadJSON == nil { + break + } + + return e.complexity.PostEvent.PayloadJSON(childComplexity), true + + case "PostEvent.tags": + if e.complexity.PostEvent.Tags == nil { + break + } + + return e.complexity.PostEvent.Tags(childComplexity), true + + case "Query.posts": + if e.complexity.Query.Posts == nil { + break + } + + args, err := ec.field_Query_posts_args(context.TODO(), rawArgs) if err != nil { return 0, false } - return e.complexity.Query.Events(childComplexity, args["streamID"].(string), args["paging"].(*gql_ev.PageInput)), true + return e.complexity.Query.Posts(childComplexity, args["streamID"].(string), args["paging"].(*gql_ev.PageInput)), true case "Query._service": if e.complexity.Query.__resolve__service == nil { @@ -218,6 +235,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Query.__resolve__service(childComplexity), true + case "Subscription.postAdded": + if e.complexity.Subscription.PostAdded == nil { + break + } + + args, err := ec.field_Subscription_postAdded_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Subscription.PostAdded(childComplexity, args["streamID"].(string)), true + case "_Service.sdl": if e.complexity._Service.SDL == nil { break @@ -249,6 +278,23 @@ func (e *executableSchema) Exec(ctx context.Context) graphql.ResponseHandler { var buf bytes.Buffer data.MarshalGQL(&buf) + return &graphql.Response{ + Data: buf.Bytes(), + } + } + case ast.Subscription: + next := ec._Subscription(ctx, rc.Operation.SelectionSet) + + var buf bytes.Buffer + return func(ctx context.Context) *graphql.Response { + buf.Reset() + data := next(ctx) + + if data == nil { + return nil + } + data.MarshalGQL(&buf) + return &graphql.Response{ Data: buf.Bytes(), } @@ -279,9 +325,8 @@ func (ec *executionContext) introspectType(name string) (*introspection.Type, er } var sources = []*ast.Source{ - {Name: "../../../api/gql_ev/schema.graphqls", Input: `extend type Query { - events(streamID: String! paging: PageInput): Connection! -} + {Name: "../../../api/gql_ev/common.graphqls", Input: `scalar Time +scalar Map type Connection { paging: PageInfo! @@ -302,30 +347,42 @@ interface Edge { id: ID! } -type Event implements Edge { - id: ID! - - payload: String! - tags: [String!]! - - meta: Meta! -} - type Meta { - id: String! - + eventID: String! @goField(name: "getEventID") streamID: String! created: Time! position: Int! } -scalar Time +directive @goModel( + model: String + models: [String!] +) on OBJECT | INPUT_OBJECT | SCALAR | ENUM | INTERFACE | UNION directive @goField( forceResolver: Boolean name: String ) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION -`, BuiltIn: false}, + +directive @goTag( + key: String! + value: String +) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION`, BuiltIn: false}, + {Name: "../../../api/gql_ev/msgbus.graphqls", Input: `extend type Query { + posts(streamID: String! paging: PageInput): Connection! +} +extend type Subscription { + postAdded(streamID: String!): PostEvent +} +type PostEvent implements Edge { + id: ID! + + payload: String! + payloadJSON: Map! + tags: [String!]! + + meta: Meta! +}`, BuiltIn: false}, {Name: "../../../federation/directives.graphql", Input: ` scalar _Any scalar _FieldSet @@ -368,7 +425,7 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } -func (ec *executionContext) field_Query_events_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { +func (ec *executionContext) field_Query_posts_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} var arg0 string @@ -392,6 +449,21 @@ func (ec *executionContext) field_Query_events_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Subscription_postAdded_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["streamID"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("streamID")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["streamID"] = arg0 + return args, nil +} + func (ec *executionContext) field___Type_enumValues_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -528,8 +600,8 @@ func (ec *executionContext) fieldContext_Connection_edges(ctx context.Context, f return fc, nil } -func (ec *executionContext) _Event_id(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Event_id(ctx, field) +func (ec *executionContext) _Meta_eventID(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Meta_eventID(ctx, field) if err != nil { return graphql.Null } @@ -542,51 +614,7 @@ func (ec *executionContext) _Event_id(ctx context.Context, field graphql.Collect }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.ID, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(string) - fc.Result = res - return ec.marshalNID2string(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_Event_id(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "Event", - Field: field, - IsMethod: false, - IsResolver: false, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type ID does not have child fields") - }, - } - return fc, nil -} - -func (ec *executionContext) _Event_payload(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Event_payload(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.Payload, nil + return obj.GetEventID(), nil }) if err != nil { ec.Error(ctx, err) @@ -603,149 +631,7 @@ func (ec *executionContext) _Event_payload(ctx context.Context, field graphql.Co return ec.marshalNString2string(ctx, field.Selections, res) } -func (ec *executionContext) fieldContext_Event_payload(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "Event", - Field: field, - IsMethod: false, - IsResolver: false, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type String does not have child fields") - }, - } - return fc, nil -} - -func (ec *executionContext) _Event_tags(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Event_tags(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.Tags, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.([]string) - fc.Result = res - return ec.marshalNString2ᚕstringᚄ(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_Event_tags(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "Event", - Field: field, - IsMethod: false, - IsResolver: false, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type String does not have child fields") - }, - } - return fc, nil -} - -func (ec *executionContext) _Event_meta(ctx context.Context, field graphql.CollectedField, obj *gql_ev.Event) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Event_meta(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.Meta, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(*event.Meta) - fc.Result = res - return ec.marshalNMeta2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚋeventᚐMeta(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_Event_meta(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "Event", - Field: field, - IsMethod: false, - IsResolver: false, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - switch field.Name { - case "id": - return ec.fieldContext_Meta_id(ctx, field) - case "streamID": - return ec.fieldContext_Meta_streamID(ctx, field) - case "created": - return ec.fieldContext_Meta_created(ctx, field) - case "position": - return ec.fieldContext_Meta_position(ctx, field) - } - return nil, fmt.Errorf("no field named %q was found under type Meta", field.Name) - }, - } - return fc, nil -} - -func (ec *executionContext) _Meta_id(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Meta_id(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.ID(), nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(string) - fc.Result = res - return ec.marshalNString2string(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_Meta_id(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_Meta_eventID(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ Object: "Meta", Field: field, @@ -1066,8 +952,8 @@ func (ec *executionContext) fieldContext_PageInfo_end(ctx context.Context, field return fc, nil } -func (ec *executionContext) _Query_events(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Query_events(ctx, field) +func (ec *executionContext) _PostEvent_id(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PostEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_PostEvent_id(ctx, field) if err != nil { return graphql.Null } @@ -1080,7 +966,237 @@ func (ec *executionContext) _Query_events(ctx context.Context, field graphql.Col }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().Events(rctx, fc.Args["streamID"].(string), fc.Args["paging"].(*gql_ev.PageInput)) + return obj.ID, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNID2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_PostEvent_id(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "PostEvent", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type ID does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _PostEvent_payload(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PostEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_PostEvent_payload(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Payload, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_PostEvent_payload(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "PostEvent", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _PostEvent_payloadJSON(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PostEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_PostEvent_payloadJSON(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.PayloadJSON(ctx) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(map[string]interface{}) + fc.Result = res + return ec.marshalNMap2map(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_PostEvent_payloadJSON(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "PostEvent", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Map does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _PostEvent_tags(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PostEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_PostEvent_tags(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Tags, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.([]string) + fc.Result = res + return ec.marshalNString2ᚕstringᚄ(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_PostEvent_tags(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "PostEvent", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _PostEvent_meta(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PostEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_PostEvent_meta(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Meta, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(*event.Meta) + fc.Result = res + return ec.marshalNMeta2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚋeventᚐMeta(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_PostEvent_meta(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "PostEvent", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "eventID": + return ec.fieldContext_Meta_eventID(ctx, field) + case "streamID": + return ec.fieldContext_Meta_streamID(ctx, field) + case "created": + return ec.fieldContext_Meta_created(ctx, field) + case "position": + return ec.fieldContext_Meta_position(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type Meta", field.Name) + }, + } + return fc, nil +} + +func (ec *executionContext) _Query_posts(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_posts(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().Posts(rctx, fc.Args["streamID"].(string), fc.Args["paging"].(*gql_ev.PageInput)) }) if err != nil { ec.Error(ctx, err) @@ -1097,7 +1213,7 @@ func (ec *executionContext) _Query_events(ctx context.Context, field graphql.Col return ec.marshalNConnection2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐConnection(ctx, field.Selections, res) } -func (ec *executionContext) fieldContext_Query_events(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_Query_posts(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ Object: "Query", Field: field, @@ -1120,7 +1236,7 @@ func (ec *executionContext) fieldContext_Query_events(ctx context.Context, field } }() ctx = graphql.WithFieldContext(ctx, fc) - if fc.Args, err = ec.field_Query_events_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + if fc.Args, err = ec.field_Query_posts_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { ec.Error(ctx, err) return } @@ -1304,6 +1420,84 @@ func (ec *executionContext) fieldContext_Query___schema(ctx context.Context, fie return fc, nil } +func (ec *executionContext) _Subscription_postAdded(ctx context.Context, field graphql.CollectedField) (ret func(ctx context.Context) graphql.Marshaler) { + fc, err := ec.fieldContext_Subscription_postAdded(ctx, field) + if err != nil { + return nil + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Subscription().PostAdded(rctx, fc.Args["streamID"].(string)) + }) + if err != nil { + ec.Error(ctx, err) + return nil + } + if resTmp == nil { + return nil + } + return func(ctx context.Context) graphql.Marshaler { + select { + case res, ok := <-resTmp.(<-chan *gql_ev.PostEvent): + if !ok { + return nil + } + return graphql.WriterFunc(func(w io.Writer) { + w.Write([]byte{'{'}) + graphql.MarshalString(field.Alias).MarshalGQL(w) + w.Write([]byte{':'}) + ec.marshalOPostEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐPostEvent(ctx, field.Selections, res).MarshalGQL(w) + w.Write([]byte{'}'}) + }) + case <-ctx.Done(): + return nil + } + } +} + +func (ec *executionContext) fieldContext_Subscription_postAdded(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Subscription", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "id": + return ec.fieldContext_PostEvent_id(ctx, field) + case "payload": + return ec.fieldContext_PostEvent_payload(ctx, field) + case "payloadJSON": + return ec.fieldContext_PostEvent_payloadJSON(ctx, field) + case "tags": + return ec.fieldContext_PostEvent_tags(ctx, field) + case "meta": + return ec.fieldContext_PostEvent_meta(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type PostEvent", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Subscription_postAdded_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return + } + return fc, nil +} + func (ec *executionContext) __Service_sdl(ctx context.Context, field graphql.CollectedField, obj *fedruntime.Service) (ret graphql.Marshaler) { fc, err := ec.fieldContext__Service_sdl(ctx, field) if err != nil { @@ -3169,13 +3363,13 @@ func (ec *executionContext) _Edge(ctx context.Context, sel ast.SelectionSet, obj switch obj := (obj).(type) { case nil: return graphql.Null - case gql_ev.Event: - return ec._Event(ctx, sel, &obj) - case *gql_ev.Event: + case gql_ev.PostEvent: + return ec._PostEvent(ctx, sel, &obj) + case *gql_ev.PostEvent: if obj == nil { return graphql.Null } - return ec._Event(ctx, sel, obj) + return ec._PostEvent(ctx, sel, obj) default: panic(fmt.Errorf("unexpected type %T", obj)) } @@ -3220,55 +3414,6 @@ func (ec *executionContext) _Connection(ctx context.Context, sel ast.SelectionSe return out } -var eventImplementors = []string{"Event", "Edge"} - -func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, obj *gql_ev.Event) graphql.Marshaler { - fields := graphql.CollectFields(ec.OperationContext, sel, eventImplementors) - out := graphql.NewFieldSet(fields) - var invalids uint32 - for i, field := range fields { - switch field.Name { - case "__typename": - out.Values[i] = graphql.MarshalString("Event") - case "id": - - out.Values[i] = ec._Event_id(ctx, field, obj) - - if out.Values[i] == graphql.Null { - invalids++ - } - case "payload": - - out.Values[i] = ec._Event_payload(ctx, field, obj) - - if out.Values[i] == graphql.Null { - invalids++ - } - case "tags": - - out.Values[i] = ec._Event_tags(ctx, field, obj) - - if out.Values[i] == graphql.Null { - invalids++ - } - case "meta": - - out.Values[i] = ec._Event_meta(ctx, field, obj) - - if out.Values[i] == graphql.Null { - invalids++ - } - default: - panic("unknown field " + strconv.Quote(field.Name)) - } - } - out.Dispatch() - if invalids > 0 { - return graphql.Null - } - return out -} - var metaImplementors = []string{"Meta"} func (ec *executionContext) _Meta(ctx context.Context, sel ast.SelectionSet, obj *event.Meta) graphql.Marshaler { @@ -3279,9 +3424,9 @@ func (ec *executionContext) _Meta(ctx context.Context, sel ast.SelectionSet, obj switch field.Name { case "__typename": out.Values[i] = graphql.MarshalString("Meta") - case "id": + case "eventID": - out.Values[i] = ec._Meta_id(ctx, field, obj) + out.Values[i] = ec._Meta_eventID(ctx, field, obj) if out.Values[i] == graphql.Null { invalids++ @@ -3367,6 +3512,75 @@ func (ec *executionContext) _PageInfo(ctx context.Context, sel ast.SelectionSet, return out } +var postEventImplementors = []string{"PostEvent", "Edge"} + +func (ec *executionContext) _PostEvent(ctx context.Context, sel ast.SelectionSet, obj *gql_ev.PostEvent) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, postEventImplementors) + out := graphql.NewFieldSet(fields) + var invalids uint32 + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("PostEvent") + case "id": + + out.Values[i] = ec._PostEvent_id(ctx, field, obj) + + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + case "payload": + + out.Values[i] = ec._PostEvent_payload(ctx, field, obj) + + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + case "payloadJSON": + field := field + + innerFunc := func(ctx context.Context) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._PostEvent_payloadJSON(ctx, field, obj) + if res == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + return res + } + + out.Concurrently(i, func() graphql.Marshaler { + return innerFunc(ctx) + + }) + case "tags": + + out.Values[i] = ec._PostEvent_tags(ctx, field, obj) + + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + case "meta": + + out.Values[i] = ec._PostEvent_meta(ctx, field, obj) + + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch() + if invalids > 0 { + return graphql.Null + } + return out +} + var queryImplementors = []string{"Query"} func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) graphql.Marshaler { @@ -3386,7 +3600,7 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr switch field.Name { case "__typename": out.Values[i] = graphql.MarshalString("Query") - case "events": + case "posts": field := field innerFunc := func(ctx context.Context) (res graphql.Marshaler) { @@ -3395,7 +3609,7 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr ec.Error(ctx, ec.Recover(ctx, r)) } }() - res = ec._Query_events(ctx, field) + res = ec._Query_posts(ctx, field) if res == graphql.Null { atomic.AddUint32(&invalids, 1) } @@ -3455,6 +3669,26 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr return out } +var subscriptionImplementors = []string{"Subscription"} + +func (ec *executionContext) _Subscription(ctx context.Context, sel ast.SelectionSet) func(ctx context.Context) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, subscriptionImplementors) + ctx = graphql.WithFieldContext(ctx, &graphql.FieldContext{ + Object: "Subscription", + }) + if len(fields) != 1 { + ec.Errorf(ctx, "must subscribe to exactly one stream") + return nil + } + + switch fields[0].Name { + case "postAdded": + return ec._Subscription_postAdded(ctx, fields[0]) + default: + panic("unknown field " + strconv.Quote(fields[0].Name)) + } +} + var _ServiceImplementors = []string{"_Service"} func (ec *executionContext) __Service(ctx context.Context, sel ast.SelectionSet, obj *fedruntime.Service) graphql.Marshaler { @@ -3911,6 +4145,27 @@ func (ec *executionContext) marshalNInt2uint64(ctx context.Context, sel ast.Sele return res } +func (ec *executionContext) unmarshalNMap2map(ctx context.Context, v interface{}) (map[string]interface{}, error) { + res, err := graphql.UnmarshalMap(v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalNMap2map(ctx context.Context, sel ast.SelectionSet, v map[string]interface{}) graphql.Marshaler { + if v == nil { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "the requested element is null which the schema does not allow") + } + return graphql.Null + } + res := graphql.MarshalMap(v) + if res == graphql.Null { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "the requested element is null which the schema does not allow") + } + } + return res +} + func (ec *executionContext) marshalNMeta2ᚖgithubᚗcomᚋsourᚑisᚋevᚋpkgᚋesᚋeventᚐMeta(ctx context.Context, sel ast.SelectionSet, v *event.Meta) graphql.Marshaler { if v == nil { if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { @@ -4315,6 +4570,13 @@ func (ec *executionContext) unmarshalOPageInput2ᚖgithubᚗcomᚋsourᚑisᚋev return &res, graphql.ErrorOnPath(ctx, err) } +func (ec *executionContext) marshalOPostEvent2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐPostEvent(ctx context.Context, sel ast.SelectionSet, v *gql_ev.PostEvent) graphql.Marshaler { + if v == nil { + return graphql.Null + } + return ec._PostEvent(ctx, sel, v) +} + func (ec *executionContext) unmarshalOString2string(ctx context.Context, v interface{}) (string, error) { res, err := graphql.UnmarshalString(v) return res, graphql.ErrorOnPath(ctx, err) @@ -4325,6 +4587,44 @@ func (ec *executionContext) marshalOString2string(ctx context.Context, sel ast.S return res } +func (ec *executionContext) unmarshalOString2ᚕstringᚄ(ctx context.Context, v interface{}) ([]string, error) { + if v == nil { + return nil, nil + } + var vSlice []interface{} + if v != nil { + vSlice = graphql.CoerceList(v) + } + var err error + res := make([]string, len(vSlice)) + for i := range vSlice { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithIndex(i)) + res[i], err = ec.unmarshalNString2string(ctx, vSlice[i]) + if err != nil { + return nil, err + } + } + return res, nil +} + +func (ec *executionContext) marshalOString2ᚕstringᚄ(ctx context.Context, sel ast.SelectionSet, v []string) graphql.Marshaler { + if v == nil { + return graphql.Null + } + ret := make(graphql.Array, len(v)) + for i := range v { + ret[i] = ec.marshalNString2string(ctx, sel, v[i]) + } + + for _, e := range ret { + if e == graphql.Null { + return graphql.Null + } + } + + return ret +} + func (ec *executionContext) unmarshalOString2ᚖstring(ctx context.Context, v interface{}) (*string, error) { if v == nil { return nil, nil diff --git a/internal/graph/resolver.go b/internal/graph/resolver.go index c7f35cc..f884591 100644 --- a/internal/graph/resolver.go +++ b/internal/graph/resolver.go @@ -24,11 +24,11 @@ func New(r *gql_ev.Resolver) *Resolver { func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } // Subscription returns generated.SubscriptionResolver implementation. -// func (r *Resolver) Subscription() generated.SubscriptionResolver { return &subscriptionResolver{r} } +func (r *Resolver) Subscription() generated.SubscriptionResolver { return &subscriptionResolver{r} } type queryResolver struct{ *Resolver } -// type subscriptionResolver struct{ *Resolver } +type subscriptionResolver struct{ *Resolver } func (r *Resolver) ChainMiddlewares(h http.Handler) http.Handler { v := reflect.ValueOf(r) // Get reflected value of *Resolver diff --git a/main.go b/main.go index 90f37f7..28ba878 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,8 @@ import ( "github.com/sour-is/ev/pkg/es" diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" - "github.com/sour-is/ev/pkg/es/service" + "github.com/sour-is/ev/pkg/es/driver/streamer" + "github.com/sour-is/ev/pkg/msgbus" "github.com/sour-is/ev/pkg/playground" ) @@ -36,12 +37,12 @@ func run(ctx context.Context) error { diskstore.Init(ctx) memstore.Init(ctx) - es, err := es.Open(ctx, env("EV_DATA", "file:data")) + es, err := es.Open(ctx, env("EV_DATA", "file:data"), streamer.New(ctx)) if err != nil { return err } - svc, err := service.New(ctx, es) + svc, err := msgbus.New(ctx, es) if err != nil { return err } diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index b1b1fe1..cb6743a 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -12,6 +12,7 @@ import ( "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/driver" "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/locker" "github.com/sour-is/ev/pkg/math" ) @@ -19,14 +20,17 @@ type diskStore struct { path string } -var _ driver.Driver = (*diskStore)(nil) +const AppendOnly = es.AppendOnly +const AllEvents = es.AllEvents func Init(ctx context.Context) error { es.Register(ctx, "file", &diskStore{}) return nil } -func (diskStore) Open(dsn string) (driver.EventStore, error) { +var _ driver.Driver = (*diskStore)(nil) + +func (diskStore) Open(_ context.Context, dsn string) (driver.Driver, error) { scheme, path, ok := strings.Cut(dsn, ":") if !ok { return nil, fmt.Errorf("expected scheme") @@ -45,181 +49,151 @@ func (diskStore) Open(dsn string) (driver.EventStore, error) { return &diskStore{path: path}, nil } - -func (es *diskStore) Save(ctx context.Context, agg event.Aggregate) (uint64, error) { - l, err := es.readLog(agg.StreamID()) - if err != nil { - return 0, err - } - - var last uint64 - if last, err = l.LastIndex(); err != nil { - return 0, err - } - if agg.StreamVersion() != last { - return 0, fmt.Errorf("current version wrong %d != %d", agg.StreamVersion(), last) - } - - events := agg.Events(true) - - var b []byte - batch := &wal.Batch{} - for _, e := range events { - b, err = event.MarshalText(e) - if err != nil { - return 0, err - } - - batch.Write(e.EventMeta().Position, b) - } - - err = l.WriteBatch(batch) - if err != nil { - return 0, err - } - agg.Commit() - - return uint64(len(events)), nil +func (ds *diskStore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { + el := &eventLog{streamID: streamID} + l, err := wal.Open(filepath.Join(ds.path, streamID), wal.DefaultOptions) + el.events = locker.New(l) + return el, err } -func (es *diskStore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) { - event.SetStreamID(streamID, events...) - l, err := es.readLog(streamID) - if err != nil { - return 0, err - } - - var last uint64 - if last, err = l.LastIndex(); err != nil { - return 0, err - } - - var b []byte - - batch := &wal.Batch{} - for i, e := range events { - b, err = event.MarshalText(e) - if err != nil { - return 0, err - } - pos := last + uint64(i) + 1 - event.SetPosition(e, pos) - - batch.Write(pos, b) - } - - err = l.WriteBatch(batch) - if err != nil { - return 0, err - } - return uint64(len(events)), nil +type eventLog struct { + streamID string + events *locker.Locked[wal.Log] } -func (es *diskStore) Load(ctx context.Context, agg event.Aggregate) error { - l, err := es.readLog(agg.StreamID()) - if err != nil { - return err - } - var i, first, last uint64 +var _ driver.EventLog = (*eventLog)(nil) - if first, err = l.FirstIndex(); err != nil { - return err - } - if last, err = l.LastIndex(); err != nil { - return err - } - if first == 0 || last == 0 { - return nil - } +func (es *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { + event.SetStreamID(es.streamID, events...) - var b []byte - events := make([]event.Event, last-i) - for i = 0; first+i <= last; i++ { - b, err = l.Read(first + i) + var count uint64 + err := es.events.Modify(ctx, func(l *wal.Log) error { + last, err := l.LastIndex() if err != nil { return err } - events[i], err = event.UnmarshalText(ctx, b, first+i) - if err != nil { - return err + + if version != AppendOnly && version != last { + return fmt.Errorf("current version wrong %d != %d", version, last) } - } - event.Append(agg, events...) - return nil -} -func (es *diskStore) Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) { - l, err := es.readLog(streamID) - if err != nil { - return nil, err - } - - var first, last, start uint64 - if first, err = l.FirstIndex(); err != nil { - return nil, err - } - if last, err = l.LastIndex(); err != nil { - return nil, err - } - if first == 0 || last == 0 { - return nil, nil - } - - switch { - case pos >= 0: - start = first + uint64(pos) - if pos == 0 && count < 0 { - count = -count // if pos=0 assume forward count. - } - case pos < 0: - start = uint64(int64(last) + pos + 1) - if pos == -1 && count > 0 { - count = -count // if pos=-1 assume backward count. - } - } - - events := make([]event.Event, math.Abs(count)) - for i := range events { var b []byte - b, err = l.Read(start) + batch := &wal.Batch{} + for i, e := range events { + b, err = event.MarshalText(e) + if err != nil { + return err + } + pos := last + uint64(i) + 1 + event.SetPosition(e, pos) + + batch.Write(pos, b) + } + + count = uint64(len(events)) + return l.WriteBatch(batch) + }) + + return count, err +} +func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, error) { + var events event.Events + + err := es.events.Modify(ctx, func(stream *wal.Log) error { + first, err := stream.FirstIndex() if err != nil { - return events, err + return err } - events[i], err = event.UnmarshalText(ctx, b, start) + last, err := stream.LastIndex() if err != nil { - return events, err + return err + } + // --- + if first == 0 || last == 0 { + return nil } - if count > 0 { - start += 1 - } else { - start -= 1 + if count == AllEvents { + count = int64(first - last) } - if start < first || start > last { - events = events[:i+1] - break - } - } - event.SetStreamID(streamID, events...) - return events, nil -} -func (es *diskStore) FirstIndex(ctx context.Context, streamID string) (uint64, error) { - l, err := es.readLog(streamID) + var start uint64 + + switch { + case pos >= 0 && count > 0: + start = first + uint64(pos) + case pos < 0 && count > 0: + start = uint64(int64(last) + pos + 1) + + case pos >= 0 && count < 0: + start = first + uint64(pos) + if pos > 1 { + start -= 2 // if pos is positive and count negative start before + } + if pos <= 1 { + return nil // if pos is one or zero and negative count nothing to return + } + case pos < 0 && count < 0: + start = uint64(int64(last) + pos) + } + if start >= last { + return nil // if start is after last and positive count nothing to return + } + + events = make([]event.Event, math.Abs(count)) + for i := range events { + // --- + var b []byte + b, err = stream.Read(start) + if err != nil { + return err + } + events[i], err = event.UnmarshalText(ctx, b, start) + if err != nil { + return err + } + // --- + + if count > 0 { + start += 1 + } else { + start -= 1 + } + if start < first || start > last { + events = events[:i+1] + break + } + } + return nil + }) if err != nil { - return 0, err + return nil, err } - return l.FirstIndex() -} -func (es *diskStore) LastIndex(ctx context.Context, streamID string) (uint64, error) { - l, err := es.readLog(streamID) - if err != nil { - return 0, err - } - return l.LastIndex() -} -func (es *diskStore) readLog(name string) (*wal.Log, error) { - return wal.Open(filepath.Join(es.path, name), wal.DefaultOptions) + event.SetStreamID(es.streamID, events...) + + return events, err +} +func (es *eventLog) FirstIndex(ctx context.Context) (uint64, error) { + var idx uint64 + var err error + + err = es.events.Modify(ctx, func(events *wal.Log) error { + idx, err = events.FirstIndex() + return err + }) + + return idx, err +} +func (es *eventLog) LastIndex(ctx context.Context) (uint64, error) { + var idx uint64 + var err error + + err = es.events.Modify(ctx, func(events *wal.Log) error { + idx, err = events.LastIndex() + return err + }) + + return idx, err } diff --git a/pkg/es/driver/driver.go b/pkg/es/driver/driver.go index abe184b..ddaf696 100644 --- a/pkg/es/driver/driver.go +++ b/pkg/es/driver/driver.go @@ -7,14 +7,24 @@ import ( ) type Driver interface { - Open(string) (EventStore, error) + Open(ctx context.Context, dsn string) (Driver, error) + EventLog(ctx context.Context, streamID string) (EventLog, error) } -type EventStore interface { - Save(ctx context.Context, agg event.Aggregate) (uint64, error) - Load(ctx context.Context, agg event.Aggregate) error - Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) - Append(ctx context.Context, streamID string, events event.Events) (uint64, error) - FirstIndex(ctx context.Context, streamID string) (uint64, error) - LastIndex(ctx context.Context, streamID string) (uint64, error) +type EventLog interface { + Read(ctx context.Context, pos, count int64) (event.Events, error) + Append(ctx context.Context, events event.Events, version uint64) (uint64, error) + FirstIndex(ctx context.Context) (uint64, error) + LastIndex(ctx context.Context) (uint64, error) +} + +type Subscription interface { + Recv(context.Context) bool + Events(context.Context) (event.Events, error) + Close(context.Context) error +} + +type EventStream interface { + Subscribe(ctx context.Context, streamID string) (Subscription, error) + Send(ctx context.Context, streamID string, events event.Events) error } diff --git a/pkg/es/driver/mem-store/mem-store.go b/pkg/es/driver/mem-store/mem-store.go index 66656cb..13d0869 100644 --- a/pkg/es/driver/mem-store/mem-store.go +++ b/pkg/es/driver/mem-store/mem-store.go @@ -12,84 +12,113 @@ import ( ) type state struct { - streams map[string]event.Events + streams map[string]*locker.Locked[event.Events] +} +type eventLog struct { + streamID string + events *locker.Locked[event.Events] } - type memstore struct { state *locker.Locked[state] } -var _ driver.Driver = (*memstore)(nil) +const AppendOnly = es.AppendOnly +const AllEvents = es.AllEvents func Init(ctx context.Context) { es.Register(ctx, "mem", &memstore{}) } -func (memstore) Open(name string) (driver.EventStore, error) { - s := &state{streams: make(map[string]event.Events)} +var _ driver.Driver = (*memstore)(nil) + +func (memstore) Open(_ context.Context, name string) (driver.Driver, error) { + s := &state{streams: make(map[string]*locker.Locked[event.Events])} return &memstore{locker.New(s)}, nil } +func (m *memstore) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { + el := &eventLog{streamID: streamID} + + err := m.state.Modify(ctx, func(state *state) error { + l, ok := state.streams[streamID] + if !ok { + l = locker.New(&event.Events{}) + state.streams[streamID] = l + } + el.events = l + return nil + }) + if err != nil { + return nil, err + } + return el, err +} + +var _ driver.EventLog = (*eventLog)(nil) // Append implements driver.EventStore -func (m *memstore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) { - event.SetStreamID(streamID, events...) +func (m *eventLog) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { + event.SetStreamID(m.streamID, events...) + + return uint64(len(events)), m.events.Modify(ctx, func(stream *event.Events) error { + last := uint64(len(*stream)) + if version != AppendOnly && version != last { + return fmt.Errorf("current version wrong %d != %d", version, last) + } - return uint64(len(events)), m.state.Modify(ctx, func(state *state) error { - stream := state.streams[streamID] - last := uint64(len(stream)) for i := range events { pos := last + uint64(i) + 1 event.SetPosition(events[i], pos) - stream = append(stream, events[i]) - state.streams[streamID] = stream + *stream = append(*stream, events[i]) } return nil }) } -// Load implements driver.EventStore -func (m *memstore) Load(ctx context.Context, agg event.Aggregate) error { - return m.state.Modify(ctx, func(state *state) error { - events := state.streams[agg.StreamID()] - event.SetStreamID(agg.StreamID(), events...) - agg.ApplyEvent(events...) - return nil - }) -} - // Read implements driver.EventStore -func (m *memstore) Read(ctx context.Context, streamID string, pos int64, count int64) (event.Events, error) { - events := make([]event.Event, math.Abs(count)) - - err := m.state.Modify(ctx, func(state *state) error { - stream := state.streams[streamID] - - var first, last, start uint64 - first = stream.First().EventMeta().Position - last = stream.Last().EventMeta().Position +func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { + var events event.Events + err := es.events.Modify(ctx, func(stream *event.Events) error { + first := stream.First().EventMeta().Position + last := stream.Last().EventMeta().Position + // --- if first == 0 || last == 0 { - events = events[:0] - return nil } - switch { - case pos >= 0: - start = first + uint64(pos) - if pos == 0 && count < 0 { - count = -count // if pos=0 assume forward count. - } - case pos < 0: - start = uint64(int64(last) + pos + 1) - if pos == -1 && count > 0 { - count = -count // if pos=-1 assume backward count. - } + if count == AllEvents { + count = int64(first - last) } + var start uint64 + + switch { + case pos >= 0 && count > 0: + start = first + uint64(pos) + case pos < 0 && count > 0: + start = uint64(int64(last) + pos + 1) + + case pos >= 0 && count < 0: + start = first + uint64(pos) + if pos > 1 { + start -= 2 // if pos is positive and count negative start before + } + if pos <= 1 { + return nil // if pos is one or zero and negative count nothing to return + } + case pos < 0 && count < 0: + start = uint64(int64(last) + pos) + } + if start >= last { + return nil // if start is after last and positive count nothing to return + } + + events = make([]event.Event, math.Abs(count)) for i := range events { - events[i] = stream[start-1] + // --- + events[i] = (*stream)[start-1] + // --- if count > 0 { start += 1 @@ -108,51 +137,19 @@ func (m *memstore) Read(ctx context.Context, streamID string, pos int64, count i return nil, err } + event.SetStreamID(es.streamID, events...) + return events, nil } -// Save implements driver.EventStore -func (m *memstore) Save(ctx context.Context, agg event.Aggregate) (uint64, error) { - events := agg.Events(true) - event.SetStreamID(agg.StreamID(), events...) - - err := m.state.Modify(ctx, func(state *state) error { - stream := state.streams[agg.StreamID()] - - last := uint64(len(stream)) - if agg.StreamVersion() != last { - return fmt.Errorf("current version wrong %d != %d", agg.StreamVersion(), last) - } - - for i := range events { - pos := last + uint64(i) + 1 - event.SetPosition(events[i], pos) - stream = append(stream, events[i]) - } - - state.streams[agg.StreamID()] = stream - return nil - }) - if err != nil { - return 0, err - } - agg.Commit() - - return uint64(len(events)), nil +// FirstIndex for the streamID +func (m *eventLog) FirstIndex(ctx context.Context) (uint64, error) { + events, err := m.events.Copy(ctx) + return events.First().EventMeta().Position, err } -func (m *memstore) FirstIndex(ctx context.Context, streamID string) (uint64, error) { - stream, err := m.state.Copy(ctx) - if err != nil { - return 0, err - } - return stream.streams[streamID].First().EventMeta().Position, nil -} -func (m *memstore) LastIndex(ctx context.Context, streamID string) (uint64, error) { - stream, err := m.state.Copy(ctx) - if err != nil { - return 0, err - } - return stream.streams[streamID].Last().EventMeta().Position, nil - +// LastIndex for the streamID +func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) { + events, err := m.events.Copy(ctx) + return events.Last().EventMeta().Position, err } diff --git a/pkg/es/driver/streamer/streamer.go b/pkg/es/driver/streamer/streamer.go new file mode 100644 index 0000000..3342c7a --- /dev/null +++ b/pkg/es/driver/streamer/streamer.go @@ -0,0 +1,203 @@ +package streamer + +import ( + "context" + "log" + + "github.com/sour-is/ev/pkg/es" + "github.com/sour-is/ev/pkg/es/driver" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/locker" +) + +type state struct { + subscribers map[string][]*subscription +} + +type streamer struct { + state *locker.Locked[state] + up driver.Driver +} + +func New(ctx context.Context) *streamer { + return &streamer{state: locker.New(&state{subscribers: map[string][]*subscription{}})} +} + +var _ es.Option = (*streamer)(nil) + +func (s *streamer) Apply(e *es.EventStore) { + s.up = e.Driver + e.Driver = s +} +func (s *streamer) Unwrap() driver.Driver { + return s.up +} + +var _ driver.Driver = (*streamer)(nil) + +func (s *streamer) Open(ctx context.Context, dsn string) (driver.Driver, error) { + return s.up.Open(ctx, dsn) +} +func (s *streamer) EventLog(ctx context.Context, streamID string) (driver.EventLog, error) { + l, err := s.up.EventLog(ctx, streamID) + return &wrapper{streamID, l, s}, err +} + +var _ driver.EventStream = (*streamer)(nil) + +func (s *streamer) Subscribe(ctx context.Context, streamID string) (driver.Subscription, error) { + log.Println("subscribe", streamID) + events, err := s.up.EventLog(ctx, streamID) + if err != nil { + return nil, err + } + sub := &subscription{topic: streamID, events: events} + sub.position = locker.New(&position{ + size: es.AllEvents, + }) + sub.unsub = s.delete(streamID, sub) + + return sub, s.state.Modify(ctx, func(state *state) error { + state.subscribers[streamID] = append(state.subscribers[streamID], sub) + log.Println("subs=", len(state.subscribers[streamID])) + return nil + }) +} +func (s *streamer) Send(ctx context.Context, streamID string, events event.Events) error { + log.Println("send", streamID, len(events)) + return s.state.Modify(ctx, func(state *state) error { + for _, sub := range state.subscribers[streamID] { + return sub.position.Modify(ctx, func(position *position) error { + position.size = int64(events.Last().EventMeta().Position - uint64(position.idx)) + + if position.wait != nil { + close(position.wait) + position.wait = nil + } + return nil + }) + } + return nil + }) +} + +func (s *streamer) delete(streamID string, sub *subscription) func(context.Context) error { + return func(ctx context.Context) error { + log.Println("unsub", streamID) + if err := ctx.Err(); err != nil { + return err + } + return s.state.Modify(ctx, func(state *state) error { + lis := state.subscribers[streamID] + for i := range lis { + if lis[i] == sub { + lis[i] = lis[len(lis)-1] + state.subscribers[streamID] = lis[:len(lis)-1] + log.Println("subs=", len(state.subscribers[streamID])) + + return nil + } + } + return nil + }) + } +} + +type wrapper struct { + topic string + up driver.EventLog + streamer *streamer +} + +var _ driver.EventLog = (*wrapper)(nil) + +func (w *wrapper) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { + return w.up.Read(ctx, pos, count) +} + +func (w *wrapper) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) { + i, err := w.up.Append(ctx, events, version) + if err != nil { + return i, err + } + return i, w.streamer.Send(ctx, w.topic, events) +} + +func (w *wrapper) FirstIndex(ctx context.Context) (uint64, error) { + return w.up.FirstIndex(ctx) +} + +func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) { + return w.up.LastIndex(ctx) +} + +type position struct { + size int64 + idx int64 + wait chan struct{} +} + +type subscription struct { + topic string + + position *locker.Locked[position] + + events driver.EventLog + unsub func(context.Context) error +} + +func (s *subscription) Recv(ctx context.Context) bool { + var wait func(context.Context) bool + log.Println("recv more") + err := s.position.Modify(ctx, func(position *position) error { + if position.size == es.AllEvents { + return nil + } + if position.size == 0 { + position.wait = make(chan struct{}) + wait = func(ctx context.Context) bool { + log.Println("waiting", s.topic) + select { + case <-position.wait: + log.Println("got some") + + return true + case <-ctx.Done(): + log.Println("got cancel") + + return false + } + } + } + position.idx += position.size + position.size = 0 + return nil + }) + if err != nil { + return false + } + + if wait != nil { + return wait(ctx) + } + + return true +} +func (s *subscription) Events(ctx context.Context) (event.Events, error) { + var events event.Events + log.Println("get events") + return events, s.position.Modify(ctx, func(position *position) error { + var err error + events, err = s.events.Read(ctx, int64(position.idx), position.size) + log.Printf("got events=%d %#v", len(events), position) + position.size = int64(len(events)) + if len(events) > 0 { + position.idx = int64(events.First().EventMeta().Position - 1) + log.Println(position, events.First()) + } + return err + }) +} +func (s *subscription) Close(ctx context.Context) error { + return s.unsub(ctx) +} diff --git a/pkg/es/es.go b/pkg/es/es.go index 0acb99a..4d10ed5 100644 --- a/pkg/es/es.go +++ b/pkg/es/es.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/sour-is/ev/pkg/es/driver" + "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/locker" ) @@ -14,6 +15,9 @@ type config struct { drivers map[string]driver.Driver } +const AppendOnly = ^uint64(0) +const AllEvents = int64(AppendOnly >> 1) + var ( drivers = locker.New(&config{drivers: make(map[string]driver.Driver)}) ) @@ -28,23 +32,118 @@ func Register(ctx context.Context, name string, d driver.Driver) error { }) } -func Open(ctx context.Context, dsn string) (driver.EventStore, error) { +type EventStore struct { + driver.Driver +} + +func Open(ctx context.Context, dsn string, options ...Option) (*EventStore, error) { name, _, ok := strings.Cut(dsn, ":") if !ok { return nil, fmt.Errorf("%w: no scheme", ErrNoDriver) } var d driver.Driver - drivers.Modify(ctx,func(c *config) error { - var ok bool - d, ok = c.drivers[name] - if !ok { - return fmt.Errorf("%w: %s not registered", ErrNoDriver, name) - } - return nil - }) + c, err := drivers.Copy(ctx) + if err != nil { + return nil, err + } - return d.Open(dsn) + d, ok = c.drivers[name] + if !ok { + return nil, fmt.Errorf("%w: %s not registered", ErrNoDriver, name) + } + + conn, err := d.Open(ctx, dsn) + + es := &EventStore{Driver: conn} + for _, o := range options { + o.Apply(es) + } + + return es, err +} + +type Option interface { + Apply(*EventStore) +} + +func (es *EventStore) Save(ctx context.Context, agg event.Aggregate) (uint64, error) { + l, err := es.EventLog(ctx, agg.StreamID()) + if err != nil { + return 0, err + } + events := agg.Events(true) + + count, err := l.Append(ctx, events, agg.StreamVersion()) + if err != nil { + return 0, err + } + + agg.Commit() + return count, err +} +func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error { + l, err := es.Driver.EventLog(ctx, agg.StreamID()) + if err != nil { + return err + } + + events, err := l.Read(ctx, 0, AllEvents) + if err != nil { + return err + } + event.Append(agg, events...) + + return nil +} +func (es *EventStore) Read(ctx context.Context, streamID string, pos, count int64) (event.Events, error) { + l, err := es.Driver.EventLog(ctx, streamID) + if err != nil { + return nil, err + } + return l.Read(ctx, pos, count) +} +func (es *EventStore) Append(ctx context.Context, streamID string, events event.Events) (uint64, error) { + l, err := es.Driver.EventLog(ctx, streamID) + if err != nil { + return 0, err + } + return l.Append(ctx, events, AppendOnly) +} +func (es *EventStore) FirstIndex(ctx context.Context, streamID string) (uint64, error) { + l, err := es.Driver.EventLog(ctx, streamID) + if err != nil { + return 0, err + } + return l.FirstIndex(ctx) +} +func (es *EventStore) LastIndex(ctx context.Context, streamID string) (uint64, error) { + l, err := es.Driver.EventLog(ctx, streamID) + if err != nil { + return 0, err + } + return l.LastIndex(ctx) +} + +func (es *EventStore) EventStream() driver.EventStream { + d := es.Driver + for d != nil { + if d, ok := d.(driver.EventStream); ok { + return d + } + + d = Unwrap(d) + } + return nil +} + +func Unwrap[T any](t T) T { + if unwrap, ok := any(t).(interface{Unwrap() T}); ok { + return unwrap.Unwrap() + } else { + var zero T + return zero + } } var ErrNoDriver = errors.New("no driver") diff --git a/pkg/es/event/events.go b/pkg/es/event/events.go index 100ace9..05ba02a 100644 --- a/pkg/es/event/events.go +++ b/pkg/es/event/events.go @@ -54,7 +54,7 @@ func (lis Events) SetStreamID(streamID string) { } func (lis Events) First() Event { if len(lis) == 0 { - return nilEvent + return NilEvent } return lis[0] } @@ -66,7 +66,7 @@ func (lis Events) Rest() Events { } func (lis Events) Last() Event { if len(lis) == 0 { - return nilEvent + return NilEvent } return lis[len(lis)-1] } @@ -134,13 +134,14 @@ type Meta struct { func (m Meta) Created() time.Time { return ulid.Time(m.EventID.Time()) } -func (m Meta) ID() string { return m.EventID.String() } +func (m Meta) GetEventID() string { return m.EventID.String() } -type _nilEvent struct{} -func (_nilEvent) EventMeta() Meta { +type nilEvent struct{} + +func (nilEvent) EventMeta() Meta { return Meta{} } -func (_nilEvent) SetEventMeta(eventMeta Meta) {} +func (nilEvent) SetEventMeta(eventMeta Meta) {} -var nilEvent _nilEvent +var NilEvent nilEvent diff --git a/pkg/locker/locker.go b/pkg/locker/locker.go index 5610c7e..6495e44 100644 --- a/pkg/locker/locker.go +++ b/pkg/locker/locker.go @@ -1,6 +1,9 @@ package locker -import "context" +import ( + "context" + "log" +) type Locked[T any] struct { state chan *T @@ -23,6 +26,9 @@ func (s *Locked[T]) Modify(ctx context.Context, fn func(*T) error) error { select { case state := <-s.state: defer func() { s.state <- state }() + log.Printf("locker %T to %p", state, fn) + defer log.Printf("locker %T from %p", state, fn) + return fn(state) case <-ctx.Done(): return ctx.Err() diff --git a/pkg/es/service/service.go b/pkg/msgbus/service.go similarity index 57% rename from pkg/es/service/service.go rename to pkg/msgbus/service.go index 9e92ff2..c140402 100644 --- a/pkg/es/service/service.go +++ b/pkg/msgbus/service.go @@ -1,24 +1,26 @@ -package service +package msgbus import ( "bytes" "context" + "encoding/json" "fmt" "io" "log" "net/http" "strconv" "strings" + "time" - "github.com/sour-is/ev/pkg/es/driver" + "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" ) type service struct { - es driver.EventStore + es *es.EventStore } -func New(ctx context.Context, es driver.EventStore) (*service, error) { +func New(ctx context.Context, es *es.EventStore) (*service, error) { if err := event.Register(ctx, &PostEvent{}); err != nil { return nil, err } @@ -34,6 +36,11 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + var first event.Event = event.NilEvent + if lis, err := s.es.Read(ctx, "post-"+name, 0, 1); err == nil && len(lis) > 0 { + first = lis[0] + } + switch r.Method { case http.MethodGet: var pos, count int64 = -1, -99 @@ -54,6 +61,18 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + if strings.Contains(r.Header.Get("Accept"), "application/json") { + w.Header().Add("Content-Type", "application/json") + + if err = encodeJSON(w, first, events); err != nil { + log.Print(err) + + w.WriteHeader(http.StatusInternalServerError) + return + } + return + } + for i := range events { fmt.Fprintln(w, events[i]) } @@ -85,10 +104,29 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + if first == event.NilEvent { + first = events.First() + } + m := events.First().EventMeta() - w.WriteHeader(http.StatusAccepted) log.Print("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID) + + w.WriteHeader(http.StatusAccepted) + if strings.Contains(r.Header.Get("Accept"), "application/json") { + w.Header().Add("Content-Type", "application/json") + if err = encodeJSON(w, first, events); err != nil { + log.Print(err) + + w.WriteHeader(http.StatusInternalServerError) + return + } + + return + } + + w.Header().Add("Content-Type", "text/plain") fmt.Fprintf(w, "OK %d %s", m.Position, m.EventID) + return default: w.WriteHeader(http.StatusMethodNotAllowed) } @@ -137,3 +175,38 @@ func fields(s string) []string { } return strings.Split(s, "/") } + +func encodeJSON(w io.Writer, first event.Event, events event.Events) error { + out := make([]struct { + ID uint64 `json:"id"` + Payload []byte `json:"payload"` + Created string `json:"created"` + Tags []string `json:"tags"` + Topic struct { + Name string `json:"name"` + TTL uint64 `json:"ttl"` + Seq uint64 `json:"seq"` + Created string `json:"created"` + } `json:"topic"` + }, len(events)) + + for i := range events { + e, ok := events[i].(*PostEvent) + if !ok { + continue + } + out[i].ID = e.EventMeta().Position + out[i].Created = e.EventMeta().Created().Format(time.RFC3339Nano) + out[i].Payload = e.Payload + out[i].Tags = e.Tags + out[i].Topic.Name = e.EventMeta().StreamID + out[i].Topic.Created = first.EventMeta().Created().Format(time.RFC3339Nano) + out[i].Topic.Seq = e.EventMeta().Position + } + + if len(out) == 1 { + return json.NewEncoder(w).Encode(out[0]) + } + + return json.NewEncoder(w).Encode(out) +}