From ad57c899457b58011e7006347e874ff895a16bb0 Mon Sep 17 00:00:00 2001 From: Jon Lundy Date: Sun, 14 Aug 2022 10:04:15 -0600 Subject: [PATCH] feat: add create salty user --- api/gql_ev/models.go | 10 + api/gql_ev/resolver.go | 41 ++ api/gql_ev/salty.graphqls | 14 + go.mod | 13 + go.sum | 46 ++ internal/graph/generated/generated.go | 562 +++++++++++++++++++++++++ internal/graph/resolver.go | 5 + main.go | 8 +- pkg/domain/salty-user.go | 74 ++++ pkg/es/driver/disk-store/disk-store.go | 21 +- pkg/es/driver/driver.go | 6 +- pkg/es/driver/mem-store/mem-store.go | 16 +- pkg/es/driver/streamer/streamer.go | 4 + pkg/es/es.go | 70 ++- pkg/es/event/aggregate.go | 40 +- pkg/es/event/aggregate_test.go | 56 +++ pkg/math/math_test.go | 5 +- pkg/msgbus/service.go | 32 +- 18 files changed, 969 insertions(+), 54 deletions(-) create mode 100644 api/gql_ev/salty.graphqls create mode 100644 pkg/domain/salty-user.go create mode 100644 pkg/es/event/aggregate_test.go diff --git a/api/gql_ev/models.go b/api/gql_ev/models.go index ed78c4c..9da9d82 100644 --- a/api/gql_ev/models.go +++ b/api/gql_ev/models.go @@ -54,3 +54,13 @@ func (p *PageInput) GetCount(v int64) int64 { } return *p.Count } + +type SaltyUser struct { + Nick string `json:"nick"` + Pubkey string `json:"pubkey"` + Inbox string `json:"inbox"` +} + +func (s SaltyUser) Endpoint(ctx context.Context) string { + return "https://ev.sour.is/inbox/" + s.Inbox +} diff --git a/api/gql_ev/resolver.go b/api/gql_ev/resolver.go index 1f148a0..e9260de 100644 --- a/api/gql_ev/resolver.go +++ b/api/gql_ev/resolver.go @@ -2,10 +2,14 @@ package gql_ev import ( "context" + "crypto/sha256" "fmt" + "strings" "time" + "github.com/keys-pub/keys" "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/pkg/domain" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/msgbus" "go.opentelemetry.io/otel/metric/instrument/syncint64" @@ -145,3 +149,40 @@ func (r *Resolver) PostAdded(ctx context.Context, streamID string, after int64) return ch, nil } + +func (r *Resolver) CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) { + streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) + + key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(pub)) + if err != nil { + return nil, err + } + + a, err := es.Create(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error { + return agg.OnUserRegister(nick, key) + }) + if err != nil { + return nil, err + } + + return &SaltyUser{ + Nick: nick, + Pubkey: pub, + Inbox: a.Inbox.String(), + }, err +} + +func (r *Resolver) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error) { + streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick)))) + + a, err := es.Update(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error { return nil }) + if err != nil { + return nil, err + } + + return &SaltyUser{ + Nick: nick, + Pubkey: a.Pubkey.String(), + Inbox: a.Inbox.String(), + }, err +} diff --git a/api/gql_ev/salty.graphqls b/api/gql_ev/salty.graphqls new file mode 100644 index 0000000..0088be2 --- /dev/null +++ b/api/gql_ev/salty.graphqls @@ -0,0 +1,14 @@ +extend type Query { + saltyUser(nick: String!): SaltyUser +} + +extend type Mutation { + createSaltyUser(nick: String! pubkey: String!): SaltyUser +} + +type SaltyUser { + nick: String! + pubkey: String! + inbox: String! + endpoint: String! +} \ No newline at end of file diff --git a/go.mod b/go.mod index 0c73899..f173970 100644 --- a/go.mod +++ b/go.mod @@ -23,22 +23,29 @@ require ( ) require ( + github.com/ScaleFT/sshkeys v0.0.0-20200327173127-6142f742bca5 // indirect github.com/agnivade/levenshtein v1.1.1 // indirect github.com/beeker1121/goque v2.1.0+incompatible // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dchest/bcrypt_pbkdf v0.0.0-20150205184540-83f37f9c154a // indirect + github.com/dchest/blake2b v1.0.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/go-cmp v0.5.8 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/keybase/saltpack v0.0.0-20200430135328-e19b1910c0c5 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/onsi/ginkgo v1.14.0 // indirect github.com/onsi/gomega v1.10.3 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.12.2 // indirect github.com/prometheus/client_model v0.2.0 // indirect @@ -46,21 +53,27 @@ require ( github.com/prometheus/procfs v0.7.3 // indirect github.com/shirou/gopsutil/v3 v3.22.3 // indirect github.com/syndtr/goleveldb v1.0.0 // indirect + github.com/tyler-smith/go-bip39 v1.1.0 // indirect + github.com/vmihailenco/msgpack/v4 v4.3.12 // indirect + github.com/vmihailenco/tagparser v0.1.2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.opentelemetry.io/contrib v1.9.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0 // indirect go.opentelemetry.io/proto/otlp v0.18.0 // indirect go.uber.org/atomic v1.9.0 // indirect + golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 // indirect golang.org/x/text v0.3.7 // indirect + google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect google.golang.org/grpc v1.46.2 // indirect google.golang.org/protobuf v1.28.0 // indirect ) require ( + github.com/keys-pub/keys v0.1.22 github.com/matryer/is v1.4.0 github.com/oklog/ulid/v2 v2.1.0 github.com/tidwall/gjson v1.10.2 // indirect diff --git a/go.sum b/go.sum index e86c7eb..acfe328 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/ScaleFT/sshkeys v0.0.0-20200327173127-6142f742bca5 h1:VauE2GcJNZFun2Och6tIT2zJZK1v6jxALQDA9BIji/E= +github.com/ScaleFT/sshkeys v0.0.0-20200327173127-6142f742bca5/go.mod h1:gxOHeajFfvGQh/fxlC8oOKBe23xnnJTif00IFFbiT+o= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8= github.com/agnivade/levenshtein v1.1.1/go.mod h1:veldBMzWxcCG2ZvUTKD2kJNRdCk5hVbJomOvKkmgYbo= @@ -77,9 +79,15 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/danieljoos/wincred v1.1.0/go.mod h1:XYlo+eRTsVA9aHGp7NGjFkPla4m+DCL7hqDjlFjiygg= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/bcrypt_pbkdf v0.0.0-20150205184540-83f37f9c154a h1:saTgr5tMLFnmy/yg3qDTft4rE5DY2uJ/cCxCe3q0XTU= +github.com/dchest/bcrypt_pbkdf v0.0.0-20150205184540-83f37f9c154a/go.mod h1:Bw9BbhOJVNR+t0jCqx2GC6zv0TGBsShs56Y3gfSCvl0= +github.com/dchest/blake2b v1.0.0 h1:KK9LimVmE0MjRl9095XJmKqZ+iLxWATvlcpVFRtaw6s= +github.com/dchest/blake2b v1.0.0/go.mod h1:U034kXgbJpCle2wSk5ybGIVhOSHCVLMDqOzcPEA0F7s= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -92,6 +100,7 @@ github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go. github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -113,6 +122,7 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/godbus/dbus v4.1.0+incompatible/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= @@ -142,6 +152,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -161,6 +172,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -196,13 +208,25 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kevinmbeaulieu/eq-go v1.0.0/go.mod h1:G3S8ajA56gKBZm4UB9AOyoOS37JO3roToPzKNM8dtdM= +github.com/keybase/go-codec v0.0.0-20180928230036-164397562123/go.mod h1:r/eVVWCngg6TsFV/3HuS9sWhDkAzGG8mXhiuYA+Z/20= +github.com/keybase/go-keychain v0.0.0-20201121013009-976c83ec27a6/go.mod h1:N83iQ9rnnzi2KZuTu+0xBcD1JNWn1jSN140ggAF7HeE= +github.com/keybase/go.dbus v0.0.0-20200324223359-a94be52c0b03/go.mod h1:a8clEhrrGV/d76/f9r2I41BwANMihfZYV9C223vaxqE= +github.com/keybase/saltpack v0.0.0-20200430135328-e19b1910c0c5 h1:X6nYzCVURqxDv0GuyptaCcRFTXPM0rSGNUrTeQ2NKUQ= +github.com/keybase/saltpack v0.0.0-20200430135328-e19b1910c0c5/go.mod h1:FNSq71OhXv/Z1W9M37nnHxJVhXitc03z6qshCbAten8= +github.com/keys-pub/keys v0.1.22 h1:bO0nx7c3HuC8dqjmjZ8njC8DzpuKWnOZQ5njaO5+A+o= +github.com/keys-pub/keys v0.1.22/go.mod h1:+41yREqLkYyGfGf4OkhUn/ljwe/+kwhrlTq1/46Jj8c= +github.com/keys-pub/secretservice v0.0.0-20200519003656-26e44b8df47f/go.mod h1:YRHMiVbZqh7u8xRm77CvwJNAZdDlNXwWvQ4DK0N9mYg= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/logrusorgru/aurora/v3 v3.0.0/go.mod h1:vsR12bk5grlLvLXAYrBsb5Oc/N+LxAlxggSjiwMnCUc= github.com/logzio/logzio-go v1.0.6 h1:BIVu5TWDZc0vlEkwSDjoxPlV/aMJV2LdM3k+CjdzFDg= github.com/logzio/logzio-go v1.0.6/go.mod h1:ljlI3Zfi3hntJiHqCqWSUPT9cZP6yvDHUzDl5ZLGYRE= @@ -242,6 +266,7 @@ github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDs github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -286,6 +311,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -307,10 +333,17 @@ github.com/tidwall/wal v1.1.7 h1:emc1TRjIVsdKKSnpwGBAcsAGg0767SvUk8+ygx7Bb+4= github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= +github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8= +github.com/tyler-smith/go-bip39 v1.1.0/go.mod h1:gUYDtqQw1JS3ZJ8UWVcGTGqqr6YIN3CWg+kkNaLt55U= github.com/urfave/cli/v2 v2.8.1/go.mod h1:Z41J9TPoffeoqP0Iza0YbAhGvymRdZAd2uPmZ5JxRdY= github.com/vektah/gqlparser/v2 v2.4.6/go.mod h1:flJWIR04IMQPGz+BXLrORkrARBxv/rtyIAFvd/MceW0= github.com/vektah/gqlparser/v2 v2.4.7 h1:yub2WLoSIr+chP1zMv6bjrsgTasfubxGZJeC8ISEpgE= github.com/vektah/gqlparser/v2 v2.4.7/go.mod h1:flJWIR04IMQPGz+BXLrORkrARBxv/rtyIAFvd/MceW0= +github.com/vmihailenco/msgpack/v4 v4.3.12 h1:07s4sz9IReOgdikxLTKNbBdqDMLsjPKXwvCazn8G65U= +github.com/vmihailenco/msgpack/v4 v4.3.12/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4= +github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= +github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vbd1qPqc= +github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -360,7 +393,11 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200427165652-729f1e841bcc/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -427,6 +464,7 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210326060303-6b1517762897/go.mod h1:uSPa2vr4CLtc/ILN5odXGNXS6mhrKVzTaCXzk9m6W3k= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= @@ -475,6 +513,7 @@ golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200219091948-cb0a6d8edb6c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -492,6 +531,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -505,6 +545,8 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 h1:nhht2DYV/Sn3qOayu8lM+cU1ii9sTLUeBQwQQfUHtrs= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210317153231-de623e64d2a6 h1:EC6+IGYTjPpRfv9a2b/6Puw0W+hLtAhkV1tPsXhutqs= +golang.org/x/term v0.0.0-20210317153231-de623e64d2a6/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -586,6 +628,8 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -655,6 +699,8 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index b7419b8..956f69a 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -40,6 +40,7 @@ type Config struct { } type ResolverRoot interface { + Mutation() MutationResolver Query() QueryResolver Subscription() SubscriptionResolver } @@ -60,6 +61,10 @@ type ComplexityRoot struct { StreamID func(childComplexity int) int } + Mutation struct { + CreateSaltyUser func(childComplexity int, nick string, pubkey string) int + } + PageInfo struct { Begin func(childComplexity int) int End func(childComplexity int) int @@ -77,9 +82,17 @@ type ComplexityRoot struct { Query struct { Posts func(childComplexity int, streamID string, paging *gql_ev.PageInput) int + SaltyUser func(childComplexity int, nick string) int __resolve__service func(childComplexity int) int } + SaltyUser struct { + Endpoint func(childComplexity int) int + Inbox func(childComplexity int) int + Nick func(childComplexity int) int + Pubkey func(childComplexity int) int + } + Subscription struct { PostAdded func(childComplexity int, streamID string, after int64) int } @@ -89,8 +102,12 @@ type ComplexityRoot struct { } } +type MutationResolver interface { + CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*gql_ev.SaltyUser, error) +} type QueryResolver interface { Posts(ctx context.Context, streamID string, paging *gql_ev.PageInput) (*gql_ev.Connection, error) + SaltyUser(ctx context.Context, nick string) (*gql_ev.SaltyUser, error) } type SubscriptionResolver interface { PostAdded(ctx context.Context, streamID string, after int64) (<-chan *gql_ev.PostEvent, error) @@ -153,6 +170,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Meta.StreamID(childComplexity), true + case "Mutation.createSaltyUser": + if e.complexity.Mutation.CreateSaltyUser == nil { + break + } + + args, err := ec.field_Mutation_createSaltyUser_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Mutation.CreateSaltyUser(childComplexity, args["nick"].(string), args["pubkey"].(string)), true + case "PageInfo.begin": if e.complexity.PageInfo.Begin == nil { break @@ -228,6 +257,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Query.Posts(childComplexity, args["streamID"].(string), args["paging"].(*gql_ev.PageInput)), true + case "Query.saltyUser": + if e.complexity.Query.SaltyUser == nil { + break + } + + args, err := ec.field_Query_saltyUser_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.SaltyUser(childComplexity, args["nick"].(string)), true + case "Query._service": if e.complexity.Query.__resolve__service == nil { break @@ -235,6 +276,34 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Query.__resolve__service(childComplexity), true + case "SaltyUser.endpoint": + if e.complexity.SaltyUser.Endpoint == nil { + break + } + + return e.complexity.SaltyUser.Endpoint(childComplexity), true + + case "SaltyUser.inbox": + if e.complexity.SaltyUser.Inbox == nil { + break + } + + return e.complexity.SaltyUser.Inbox(childComplexity), true + + case "SaltyUser.nick": + if e.complexity.SaltyUser.Nick == nil { + break + } + + return e.complexity.SaltyUser.Nick(childComplexity), true + + case "SaltyUser.pubkey": + if e.complexity.SaltyUser.Pubkey == nil { + break + } + + return e.complexity.SaltyUser.Pubkey(childComplexity), true + case "Subscription.postAdded": if e.complexity.Subscription.PostAdded == nil { break @@ -278,6 +347,21 @@ func (e *executableSchema) Exec(ctx context.Context) graphql.ResponseHandler { var buf bytes.Buffer data.MarshalGQL(&buf) + return &graphql.Response{ + Data: buf.Bytes(), + } + } + case ast.Mutation: + return func(ctx context.Context) *graphql.Response { + if !first { + return nil + } + first = false + ctx = graphql.WithUnmarshalerMap(ctx, inputUnmarshalMap) + data := ec._Mutation(ctx, rc.Operation.SelectionSet) + var buf bytes.Buffer + data.MarshalGQL(&buf) + return &graphql.Response{ Data: buf.Bytes(), } @@ -383,6 +467,20 @@ type PostEvent implements Edge { tags: [String!]! meta: Meta! +}`, BuiltIn: false}, + {Name: "../../../api/gql_ev/salty.graphqls", Input: `extend type Query { + saltyUser(nick: String!): SaltyUser +} + +extend type Mutation { + createSaltyUser(nick: String! pubkey: String!): SaltyUser +} + +type SaltyUser { + nick: String! + pubkey: String! + inbox: String! + endpoint: String! }`, BuiltIn: false}, {Name: "../../../federation/directives.graphql", Input: ` scalar _Any @@ -411,6 +509,30 @@ var parsedSchema = gqlparser.MustLoadSchema(sources...) // region ***************************** args.gotpl ***************************** +func (ec *executionContext) field_Mutation_createSaltyUser_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["nick"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("nick")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["nick"] = arg0 + var arg1 string + if tmp, ok := rawArgs["pubkey"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("pubkey")) + arg1, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["pubkey"] = arg1 + return args, nil +} + func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -450,6 +572,21 @@ func (ec *executionContext) field_Query_posts_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_saltyUser_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["nick"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("nick")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["nick"] = arg0 + return args, nil +} + func (ec *executionContext) field_Subscription_postAdded_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -786,6 +923,68 @@ func (ec *executionContext) fieldContext_Meta_position(ctx context.Context, fiel return fc, nil } +func (ec *executionContext) _Mutation_createSaltyUser(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Mutation_createSaltyUser(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Mutation().CreateSaltyUser(rctx, fc.Args["nick"].(string), fc.Args["pubkey"].(string)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*gql_ev.SaltyUser) + fc.Result = res + return ec.marshalOSaltyUser2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐSaltyUser(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Mutation_createSaltyUser(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Mutation", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "nick": + return ec.fieldContext_SaltyUser_nick(ctx, field) + case "pubkey": + return ec.fieldContext_SaltyUser_pubkey(ctx, field) + case "inbox": + return ec.fieldContext_SaltyUser_inbox(ctx, field) + case "endpoint": + return ec.fieldContext_SaltyUser_endpoint(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type SaltyUser", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Mutation_createSaltyUser_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return + } + return fc, nil +} + func (ec *executionContext) _PageInfo_next(ctx context.Context, field graphql.CollectedField, obj *gql_ev.PageInfo) (ret graphql.Marshaler) { fc, err := ec.fieldContext_PageInfo_next(ctx, field) if err != nil { @@ -1253,6 +1452,68 @@ func (ec *executionContext) fieldContext_Query_posts(ctx context.Context, field return fc, nil } +func (ec *executionContext) _Query_saltyUser(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_saltyUser(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().SaltyUser(rctx, fc.Args["nick"].(string)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*gql_ev.SaltyUser) + fc.Result = res + return ec.marshalOSaltyUser2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐSaltyUser(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Query_saltyUser(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "nick": + return ec.fieldContext_SaltyUser_nick(ctx, field) + case "pubkey": + return ec.fieldContext_SaltyUser_pubkey(ctx, field) + case "inbox": + return ec.fieldContext_SaltyUser_inbox(ctx, field) + case "endpoint": + return ec.fieldContext_SaltyUser_endpoint(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type SaltyUser", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_saltyUser_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return + } + return fc, nil +} + func (ec *executionContext) _Query__service(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Query__service(ctx, field) if err != nil { @@ -1430,6 +1691,182 @@ func (ec *executionContext) fieldContext_Query___schema(ctx context.Context, fie return fc, nil } +func (ec *executionContext) _SaltyUser_nick(ctx context.Context, field graphql.CollectedField, obj *gql_ev.SaltyUser) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_SaltyUser_nick(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Nick, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_SaltyUser_nick(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "SaltyUser", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _SaltyUser_pubkey(ctx context.Context, field graphql.CollectedField, obj *gql_ev.SaltyUser) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_SaltyUser_pubkey(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Pubkey, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_SaltyUser_pubkey(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "SaltyUser", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _SaltyUser_inbox(ctx context.Context, field graphql.CollectedField, obj *gql_ev.SaltyUser) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_SaltyUser_inbox(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Inbox, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_SaltyUser_inbox(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "SaltyUser", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _SaltyUser_endpoint(ctx context.Context, field graphql.CollectedField, obj *gql_ev.SaltyUser) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_SaltyUser_endpoint(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Endpoint(ctx), nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_SaltyUser_endpoint(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "SaltyUser", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _Subscription_postAdded(ctx context.Context, field graphql.CollectedField) (ret func(ctx context.Context) graphql.Marshaler) { fc, err := ec.fieldContext_Subscription_postAdded(ctx, field) if err != nil { @@ -3473,6 +3910,42 @@ func (ec *executionContext) _Meta(ctx context.Context, sel ast.SelectionSet, obj return out } +var mutationImplementors = []string{"Mutation"} + +func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, mutationImplementors) + ctx = graphql.WithFieldContext(ctx, &graphql.FieldContext{ + Object: "Mutation", + }) + + out := graphql.NewFieldSet(fields) + var invalids uint32 + for i, field := range fields { + innerCtx := graphql.WithRootFieldContext(ctx, &graphql.RootFieldContext{ + Object: field.Name, + Field: field, + }) + + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("Mutation") + case "createSaltyUser": + + out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) { + return ec._Mutation_createSaltyUser(ctx, field) + }) + + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch() + if invalids > 0 { + return graphql.Null + } + return out +} + var pageInfoImplementors = []string{"PageInfo"} func (ec *executionContext) _PageInfo(ctx context.Context, sel ast.SelectionSet, obj *gql_ev.PageInfo) graphql.Marshaler { @@ -3630,6 +4103,26 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr return ec.OperationContext.RootResolverMiddleware(ctx, innerFunc) } + out.Concurrently(i, func() graphql.Marshaler { + return rrm(innerCtx) + }) + case "saltyUser": + field := field + + innerFunc := func(ctx context.Context) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_saltyUser(ctx, field) + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, innerFunc) + } + out.Concurrently(i, func() graphql.Marshaler { return rrm(innerCtx) }) @@ -3679,6 +4172,68 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr return out } +var saltyUserImplementors = []string{"SaltyUser"} + +func (ec *executionContext) _SaltyUser(ctx context.Context, sel ast.SelectionSet, obj *gql_ev.SaltyUser) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, saltyUserImplementors) + out := graphql.NewFieldSet(fields) + var invalids uint32 + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("SaltyUser") + case "nick": + + out.Values[i] = ec._SaltyUser_nick(ctx, field, obj) + + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + case "pubkey": + + out.Values[i] = ec._SaltyUser_pubkey(ctx, field, obj) + + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + case "inbox": + + out.Values[i] = ec._SaltyUser_inbox(ctx, field, obj) + + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + case "endpoint": + field := field + + innerFunc := func(ctx context.Context) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._SaltyUser_endpoint(ctx, field, obj) + if res == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + return res + } + + out.Concurrently(i, func() graphql.Marshaler { + return innerFunc(ctx) + + }) + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch() + if invalids > 0 { + return graphql.Null + } + return out +} + var subscriptionImplementors = []string{"Subscription"} func (ec *executionContext) _Subscription(ctx context.Context, sel ast.SelectionSet) func(ctx context.Context) graphql.Marshaler { @@ -4602,6 +5157,13 @@ func (ec *executionContext) marshalOPostEvent2ᚖgithubᚗcomᚋsourᚑisᚋev return ec._PostEvent(ctx, sel, v) } +func (ec *executionContext) marshalOSaltyUser2ᚖgithubᚗcomᚋsourᚑisᚋevᚋapiᚋgql_evᚐSaltyUser(ctx context.Context, sel ast.SelectionSet, v *gql_ev.SaltyUser) graphql.Marshaler { + if v == nil { + return graphql.Null + } + return ec._SaltyUser(ctx, sel, v) +} + func (ec *executionContext) unmarshalOString2string(ctx context.Context, v interface{}) (string, error) { res, err := graphql.UnmarshalString(v) return res, graphql.ErrorOnPath(ctx, err) diff --git a/internal/graph/resolver.go b/internal/graph/resolver.go index f884591..f253ebf 100644 --- a/internal/graph/resolver.go +++ b/internal/graph/resolver.go @@ -23,11 +23,16 @@ func New(r *gql_ev.Resolver) *Resolver { // Query returns generated.QueryResolver implementation. func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } +// Query returns generated.QueryResolver implementation. +func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} } + // Subscription returns generated.SubscriptionResolver implementation. func (r *Resolver) Subscription() generated.SubscriptionResolver { return &subscriptionResolver{r} } type queryResolver struct{ *Resolver } +type mutationResolver struct{ *Resolver } + type subscriptionResolver struct{ *Resolver } func (r *Resolver) ChainMiddlewares(h http.Handler) http.Handler { diff --git a/main.go b/main.go index 6914df0..eb42e13 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "github.com/sour-is/ev/internal/graph" "github.com/sour-is/ev/internal/graph/generated" "github.com/sour-is/ev/internal/logz" + "github.com/sour-is/ev/pkg/domain" "github.com/sour-is/ev/pkg/es" diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" @@ -44,13 +45,16 @@ func main() { } Mup.Add(ctx, 1) - if err := run(ctx); err != nil { - log.Println(err) + if err := run(ctx); err != nil && err != http.ErrServerClosed { + log.Fatal(err) } } func run(ctx context.Context) error { diskstore.Init(ctx) memstore.Init(ctx) + if err := domain.Init(ctx); err != nil { + return err + } es, err := es.Open(ctx, env("EV_DATA", "file:data"), streamer.New(ctx)) if err != nil { diff --git a/pkg/domain/salty-user.go b/pkg/domain/salty-user.go new file mode 100644 index 0000000..b7203e9 --- /dev/null +++ b/pkg/domain/salty-user.go @@ -0,0 +1,74 @@ +package domain + +import ( + "context" + "crypto/sha256" + "fmt" + "log" + "strings" + + "github.com/keys-pub/keys" + "github.com/oklog/ulid/v2" + "github.com/sour-is/ev/pkg/es/event" +) + +func Init(ctx context.Context) error { + return event.Register(ctx, &UserRegistered{}) +} + +type SaltyUser struct { + Name string + Pubkey *keys.EdX25519PublicKey + Inbox ulid.ULID + + event.AggregateRoot +} + +var _ event.Aggregate = (*SaltyUser)(nil) + +// ApplyEvent applies the event to the aggrigate state +func (a *SaltyUser) ApplyEvent(lis ...event.Event) { + for _, e := range lis { + switch e := e.(type) { + case *UserRegistered: + a.Name = e.Name + a.Pubkey = e.Pubkey + a.Inbox = e.EventMeta().EventID + a.SetStreamID(a.streamID()) + default: + log.Printf("unknown event %T", e) + } + } +} + +func (a *SaltyUser) streamID() string { + return fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(a.Name)))) +} + +func (a *SaltyUser) OnUserRegister(name string, pubkey *keys.EdX25519PublicKey) error { + event.Raise(a, &UserRegistered{Name: name, Pubkey: pubkey}) + return nil +} + +type UserRegistered struct { + Name string + Pubkey *keys.EdX25519PublicKey + Endpoint ulid.ULID + + eventMeta event.Meta +} + +var _ event.Event = (*UserRegistered)(nil) + +func (e *UserRegistered) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} + +func (e *UserRegistered) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} diff --git a/pkg/es/driver/disk-store/disk-store.go b/pkg/es/driver/disk-store/disk-store.go index ccafb14..51c44b1 100644 --- a/pkg/es/driver/disk-store/disk-store.go +++ b/pkg/es/driver/disk-store/disk-store.go @@ -3,7 +3,6 @@ package diskstore import ( "context" "fmt" - "log" "os" "path/filepath" "strings" @@ -94,12 +93,14 @@ func (d *diskStore) Open(ctx context.Context, dsn string) (driver.Driver, error) err := w.Close() if err != nil { - log.Print(err) + span.RecordError(err) + return err } return nil }) }) if err != nil { + span.RecordError(err) return nil, err } logs := &openlogs{logs: c} @@ -127,6 +128,7 @@ func (ds *diskStore) EventLog(ctx context.Context, streamID string) (driver.Even l, err := wal.Open(filepath.Join(ds.path, streamID), wal.DefaultOptions) if err != nil { + span.RecordError(err) return err } el.events = locker.New(l) @@ -155,6 +157,7 @@ func (es *eventLog) Append(ctx context.Context, events event.Events, version uin last, err := l.LastIndex() if err != nil { + span.RecordError(err) return err } @@ -170,6 +173,8 @@ func (es *eventLog) Append(ctx context.Context, events event.Events, version uin b, err = event.MarshalText(e) if err != nil { + span.RecordError(err) + return err } pos := last + uint64(i) + 1 @@ -196,10 +201,12 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e first, err := stream.FirstIndex() if err != nil { + span.RecordError(err) return err } last, err := stream.LastIndex() if err != nil { + span.RecordError(err) return err } // --- @@ -208,7 +215,7 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e } start, count := math.PagerBox(first, last, pos, count) - log.Println("reading", first, last, pos, count, start) + span.AddEvent(fmt.Sprint("reading", first, last, pos, count, start)) if count == 0 { return nil } @@ -221,10 +228,12 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e var b []byte b, err = stream.Read(start) if err != nil { + span.RecordError(err) return err } events[i], err = event.UnmarshalText(ctx, b, start) if err != nil { + span.RecordError(err) return err } // --- @@ -242,12 +251,13 @@ func (es *eventLog) Read(ctx context.Context, pos, count int64) (event.Events, e return nil }) if err != nil { + span.RecordError(err) return nil, err } event.SetStreamID(es.streamID, events...) - return events, err + return events, nil } func (es *eventLog) FirstIndex(ctx context.Context) (uint64, error) { _, span := logz.Span(ctx) @@ -277,3 +287,6 @@ func (es *eventLog) LastIndex(ctx context.Context) (uint64, error) { return idx, err } +func (es *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { + panic("not implemented") +} diff --git a/pkg/es/driver/driver.go b/pkg/es/driver/driver.go index 625baf6..175970e 100644 --- a/pkg/es/driver/driver.go +++ b/pkg/es/driver/driver.go @@ -14,8 +14,10 @@ type Driver interface { type EventLog interface { Read(ctx context.Context, pos, count int64) (event.Events, error) Append(ctx context.Context, events event.Events, version uint64) (uint64, error) - FirstIndex(ctx context.Context) (uint64, error) - LastIndex(ctx context.Context) (uint64, error) + FirstIndex(context.Context) (uint64, error) + LastIndex(context.Context) (uint64, error) + + LoadForUpdate(context.Context, event.Aggregate, func(context.Context, event.Aggregate) error) (uint64, error) } type Subscription interface { diff --git a/pkg/es/driver/mem-store/mem-store.go b/pkg/es/driver/mem-store/mem-store.go index e5d7d89..ddec1c9 100644 --- a/pkg/es/driver/mem-store/mem-store.go +++ b/pkg/es/driver/mem-store/mem-store.go @@ -79,6 +79,8 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint _, span := logz.Span(ctx) defer span.End() + span.AddEvent(fmt.Sprintf(" %s %#v %d", m.streamID, stream, len(*stream))) + last := uint64(len(*stream)) if version != AppendOnly && version != last { return fmt.Errorf("current version wrong %d != %d", version, last) @@ -97,16 +99,18 @@ func (m *eventLog) Append(ctx context.Context, events event.Events, version uint } // Read implements driver.EventStore -func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { +func (m *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Events, error) { ctx, span := logz.Span(ctx) defer span.End() var events event.Events - err := es.events.Modify(ctx, func(stream *event.Events) error { + err := m.events.Modify(ctx, func(stream *event.Events) error { _, span := logz.Span(ctx) defer span.End() + span.AddEvent(fmt.Sprintf(" %s %#v %d", m.streamID, stream, len(*stream))) + first := stream.First().EventMeta().Position last := stream.Last().EventMeta().Position // --- @@ -118,7 +122,7 @@ func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Eve if count == 0 { return nil } - + span.AddEvent(fmt.Sprint("box", first, last, pos, count)) events = make([]event.Event, math.Abs(count)) for i := range events { span.AddEvent(fmt.Sprintf("read event %d of %d", i, math.Abs(count))) @@ -143,7 +147,7 @@ func (es *eventLog) Read(ctx context.Context, pos int64, count int64) (event.Eve return nil, err } - event.SetStreamID(es.streamID, events...) + event.SetStreamID(m.streamID, events...) return events, nil } @@ -165,3 +169,7 @@ func (m *eventLog) LastIndex(ctx context.Context) (uint64, error) { events, err := m.events.Copy(ctx) return events.Last().EventMeta().Position, err } + +func (m *eventLog) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { + panic("not implemented") +} diff --git a/pkg/es/driver/streamer/streamer.go b/pkg/es/driver/streamer/streamer.go index 380ee9b..ccc6507 100644 --- a/pkg/es/driver/streamer/streamer.go +++ b/pkg/es/driver/streamer/streamer.go @@ -169,6 +169,10 @@ func (w *wrapper) LastIndex(ctx context.Context) (uint64, error) { return w.up.LastIndex(ctx) } +func (w *wrapper) LoadForUpdate(ctx context.Context, a event.Aggregate, fn func(context.Context, event.Aggregate) error) (uint64, error) { + return w.up.LoadForUpdate(ctx, a, fn) +} + type position struct { size int64 idx int64 diff --git a/pkg/es/es.go b/pkg/es/es.go index 902c91b..639b935 100644 --- a/pkg/es/es.go +++ b/pkg/es/es.go @@ -110,13 +110,17 @@ func (es *EventStore) Save(ctx context.Context, agg event.Aggregate) (uint64, er ctx, span := logz.Span(ctx) defer span.End() + events := agg.Events(true) + if len(events) == 0 { + return 0, nil + } + Mes_save.Add(ctx, 1) l, err := es.EventLog(ctx, agg.StreamID()) if err != nil { return 0, err } - events := agg.Events(true) count, err := l.Append(ctx, events, agg.StreamVersion()) if err != nil { @@ -141,6 +145,7 @@ func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error { if err != nil { return err } + event.Append(agg, events...) return nil @@ -189,7 +194,6 @@ func (es *EventStore) LastIndex(ctx context.Context, streamID string) (uint64, e } return l.LastIndex(ctx) } - func (es *EventStore) EventStream() driver.EventStream { d := es.Driver for d != nil { @@ -212,3 +216,65 @@ func Unwrap[T any](t T) T { } var ErrNoDriver = errors.New("no driver") + +type PA[T any] interface { + event.Aggregate + *T +} + +// Create uses fn to create a new aggregate and store in db. +func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) { + ctx, span := logz.Span(ctx) + defer span.End() + + agg = new(A) + agg.SetStreamID(streamID) + + if err = es.Load(ctx, agg); err != nil { + return + } + + if err = event.NotExists(agg); err != nil { + return + } + + if err = fn(ctx, agg); err != nil { + return + } + + var i uint64 + if i, err = es.Save(ctx, agg); err != nil { + return + } + + span.AddEvent(fmt.Sprint("wrote events = ", i)) + + return +} + +// Update uses fn to update an exsisting aggregate and store in db. +func Update[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) { + ctx, span := logz.Span(ctx) + defer span.End() + + agg = new(A) + agg.SetStreamID(streamID) + + if err = es.Load(ctx, agg); err != nil { + return + } + + if err = event.ShouldExist(agg); err != nil { + return + } + + if err = fn(ctx, agg); err != nil { + return + } + + if _, err = es.Save(ctx, agg); err != nil { + return + } + + return +} diff --git a/pkg/es/event/aggregate.go b/pkg/es/event/aggregate.go index 51b3f07..6b0e9f3 100644 --- a/pkg/es/event/aggregate.go +++ b/pkg/es/event/aggregate.go @@ -9,7 +9,6 @@ import ( type Aggregate interface { // ApplyEvent applies the event to the aggrigate state ApplyEvent(...Event) - StreamID() string AggregateRootInterface } @@ -28,14 +27,6 @@ func Append(a Aggregate, lis ...Event) { a.ApplyEvent(lis...) } -// CheckVersion returns an error if the version does not match. -func CheckVersion(a Aggregate, version uint64) error { - if version != uint64(a.StreamVersion()) { - return fmt.Errorf("version wrong, got (proposed) %d != (expected) %d", version, a.StreamVersion()) - } - return nil -} - // NotExists returns error if there are no events present. func NotExists(a Aggregate) error { if a.StreamVersion() != 0 { @@ -44,10 +35,22 @@ func NotExists(a Aggregate) error { return nil } +// ShouldExists returns error if there are no events present. +func ShouldExist(a Aggregate) error { + if a.StreamVersion() == 0 { + return fmt.Errorf("%w, got version == %d", ErrShouldExist, a.StreamVersion()) + } + return nil +} + type AggregateRootInterface interface { - // Events returns the aggrigate events + // Events returns the aggregate events // pass true for only uncommitted events Events(bool) Events + // StreamID returns aggregate stream ID + StreamID() string + // SetStreamID sets aggregate stream ID + SetStreamID(streamID string) // StreamVersion returns last commit events StreamVersion() uint64 // Version returns the current aggrigate version. (committed + uncommitted) @@ -62,18 +65,17 @@ var _ AggregateRootInterface = &AggregateRoot{} type AggregateRoot struct { events Events + streamID string streamVersion uint64 mu sync.RWMutex } -func (a *AggregateRoot) Commit() { - a.streamVersion = uint64(len(a.events)) -} - -func (a *AggregateRoot) StreamVersion() uint64 { - return a.streamVersion -} +func (a *AggregateRoot) Commit() { a.streamVersion = uint64(len(a.events)) } +func (a *AggregateRoot) StreamID() string { return a.streamID } +func (a *AggregateRoot) SetStreamID(streamID string) { a.streamID = streamID } +func (a *AggregateRoot) StreamVersion() uint64 { return a.streamVersion } +func (a *AggregateRoot) Version() uint64 { return uint64(len(a.events)) } func (a *AggregateRoot) Events(new bool) Events { a.mu.RLock() defer a.mu.RUnlock() @@ -88,9 +90,6 @@ func (a *AggregateRoot) Events(new bool) Events { return lis } -func (a *AggregateRoot) Version() uint64 { - return uint64(len(a.events)) -} //lint:ignore U1000 is called by embeded interface func (a *AggregateRoot) raise(lis ...Event) { //nolint @@ -122,3 +121,4 @@ func (a *AggregateRoot) posStartAt(lis ...Event) { } var ErrShouldNotExist = errors.New("should not exist") +var ErrShouldExist = errors.New("should exist") diff --git a/pkg/es/event/aggregate_test.go b/pkg/es/event/aggregate_test.go new file mode 100644 index 0000000..ddc634b --- /dev/null +++ b/pkg/es/event/aggregate_test.go @@ -0,0 +1,56 @@ +package event_test + +import ( + "testing" + + "github.com/sour-is/ev/pkg/es/event" +) + +type Agg struct { + Value string + + event.AggregateRoot +} + +var _ event.Aggregate = (*Agg)(nil) + +func (a *Agg) streamID() string { + return "value-" + a.Value +} + +// ApplyEvent applies the event to the aggrigate state +func (a *Agg) ApplyEvent(lis ...event.Event) { + for _, e := range lis { + switch e := e.(type) { + case *ValueApplied: + a.Value = e.Value + a.SetStreamID(a.streamID()) + } + } +} + +type ValueApplied struct { + Value string + + eventMeta event.Meta +} + +var _ event.Event = (*ValueApplied)(nil) + +func (e *ValueApplied) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} + +func (e *ValueApplied) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} + +func TestAggregate(t *testing.T) { + agg := &Agg{} + event.Append(agg, &ValueApplied{Value: "one"}) +} diff --git a/pkg/math/math_test.go b/pkg/math/math_test.go index 83cd4fc..2ae1d90 100644 --- a/pkg/math/math_test.go +++ b/pkg/math/math_test.go @@ -1,7 +1,6 @@ package math_test import ( - "log" "testing" "github.com/matryer/is" @@ -82,9 +81,9 @@ func TestPagerBox(t *testing.T) { for _, tt := range tests { start, count := math.PagerBox(tt.first, tt.last, tt.pos, tt.n) if count > 0 { - log.Print(tt, "|", start, count, int64(start)+count-1) + t.Log(tt, "|", start, count, int64(start)+count-1) } else { - log.Print(tt, "|", start, count, int64(start)+count+1) + t.Log(tt, "|", start, count, int64(start)+count+1) } is.Equal(start, tt.start) diff --git a/pkg/msgbus/service.go b/pkg/msgbus/service.go index 961fddb..a73499d 100644 --- a/pkg/msgbus/service.go +++ b/pkg/msgbus/service.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "log" "net/http" "strconv" "strings" @@ -85,10 +84,10 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) { count = i } - log.Print("GET topic=", name, " idx=", pos, " n=", count) + span.AddEvent(fmt.Sprint("GET topic=", name, " idx=", pos, " n=", count)) events, err := s.es.Read(ctx, "post-"+name, pos, count) if err != nil { - log.Print(err) + span.RecordError(err) w.WriteHeader(http.StatusInternalServerError) return } @@ -97,7 +96,7 @@ func (s *service) get(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") if err = encodeJSON(w, first, events...); err != nil { - log.Print(err) + span.RecordError(err) w.WriteHeader(http.StatusInternalServerError) return @@ -159,7 +158,6 @@ func (s *service) post(w http.ResponseWriter, r *http.Request) { m := events.First().EventMeta() span.AddEvent(fmt.Sprint("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID)) - // log.Print("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID) w.WriteHeader(http.StatusAccepted) if strings.Contains(r.Header.Get("Accept"), "application/json") { @@ -201,11 +199,11 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) { pos = i - 1 } - log.Print("WS topic=", name, " idx=", pos) + span.AddEvent(fmt.Sprint("WS topic=", name, " idx=", pos)) c, err := upgrader.Upgrade(w, r, nil) if err != nil { - log.Print("upgrade:", err) + span.RecordError(err) return } defer c.Close() @@ -222,54 +220,54 @@ func (s *service) websocket(w http.ResponseWriter, r *http.Request) { } mt, message, err := c.ReadMessage() if err != nil { - log.Println("read:", err) + span.RecordError(err) return } - log.Printf("recv: %d %s", mt, message) + span.AddEvent(fmt.Sprintf("recv: %d %s", mt, message)) } }() es := s.es.EventStream() if es == nil { - log.Println("EventStore does not implement streaming") + span.AddEvent(fmt.Sprint("EventStore does not implement streaming")) w.WriteHeader(http.StatusInternalServerError) return } sub, err := es.Subscribe(ctx, "post-"+name, pos) if err != nil { - log.Println(err) + span.RecordError(err) return } defer func() { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - log.Println("stop ws") + span.AddEvent(fmt.Sprint("stop ws")) sub.Close(ctx) }() - log.Println("start ws") + span.AddEvent(fmt.Sprint("start ws")) for sub.Recv(ctx) { events, err := sub.Events(ctx) if err != nil { break } - log.Println("got events ", len(events)) + span.AddEvent(fmt.Sprint("got events ", len(events))) for i := range events { e, ok := events[i].(*PostEvent) if !ok { continue } - log.Println("send", e.String()) + span.AddEvent(fmt.Sprint("send", i, e.String())) var b bytes.Buffer if err = encodeJSON(&b, first, e); err != nil { - log.Print(err) + span.RecordError(err) } err = c.WriteMessage(websocket.TextMessage, b.Bytes()) if err != nil { - log.Println("write:", err) + span.RecordError(err) break } }