diff --git a/app/webfinger/events.go b/app/webfinger/events.go new file mode 100644 index 0000000..88987c3 --- /dev/null +++ b/app/webfinger/events.go @@ -0,0 +1,117 @@ +package webfinger + +import ( + "encoding/json" + + "github.com/sour-is/ev/pkg/es/event" +) + +type SubjectSet struct { + Subject string `json:"subject"` + Aliases []string `json:"aliases,omitempty"` + Properties map[string]*string `json:"properties,omitempty"` + + eventMeta event.Meta +} + +func (e *SubjectSet) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *SubjectSet) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *SubjectSet) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *SubjectSet) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} + +var _ event.Event = (*SubjectSet)(nil) + +type SubjectDeleted struct { + Subject string `json:"subject"` + + eventMeta event.Meta +} + +func (e *SubjectDeleted) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *SubjectDeleted) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *SubjectDeleted) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *SubjectDeleted) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} + +var _ event.Event = (*SubjectDeleted)(nil) + +type LinkSet struct { + Rel string `json:"rel"` + Type string `json:"type,omitempty"` + HRef string `json:"href,omitempty"` + Titles map[string]string `json:"titles,omitempty"` + Properties map[string]*string `json:"properties,omitempty"` + + eventMeta event.Meta +} + +func (e *LinkSet) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *LinkSet) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *LinkSet) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *LinkSet) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} + +var _ event.Event = (*LinkSet)(nil) + +type LinkDeleted struct { + Rel string `json:"rel"` + + eventMeta event.Meta +} + +func (e *LinkDeleted) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *LinkDeleted) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *LinkDeleted) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *LinkDeleted) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} + +var _ event.Event = (*LinkDeleted)(nil) diff --git a/app/webfinger/jrd.go b/app/webfinger/jrd.go new file mode 100644 index 0000000..1e7d2b7 --- /dev/null +++ b/app/webfinger/jrd.go @@ -0,0 +1,303 @@ +package webfinger + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "hash/fnv" + "sort" + + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/slice" +) + +func StreamID(subject string) string { + h := fnv.New128a() + h.Write([]byte(subject)) + return "webfinger." + base64.RawURLEncoding.EncodeToString(h.Sum(nil)) +} + +// JRD is a JSON Resource Descriptor, specifying properties and related links +// for a resource. +type JRD struct { + Subject string `json:"subject,omitempty"` + Aliases []string `json:"aliases,omitempty"` + Properties map[string]*string `json:"properties,omitempty"` + Links Links `json:"links,omitempty"` + + deleted bool + event.AggregateRoot +} + +var _ event.Aggregate = (*JRD)(nil) + +// Link is a link to a related resource. +type Link struct { + Rel string `json:"rel,omitempty"` + Type string `json:"type,omitempty"` + HRef string `json:"href,omitempty"` + Titles map[string]string `json:"titles,omitempty"` + Properties map[string]*string `json:"properties,omitempty"` +} + +type Links []*Link + +// Len is the number of elements in the collection. +func (l Links) Len() int { + if l == nil { + return 0 + } + return len(l) +} + +// Less reports whether the element with index i +func (l Links) Less(i int, j int) bool { + if l[i] == nil || l[j] == nil { + return false + } + return l[i].Rel < l[j].Rel +} + +// Swap swaps the elements with indexes i and j. +func (l Links) Swap(i int, j int) { + if l == nil { + return + } + l[i], l[j] = l[j], l[i] +} + +// ParseJRD parses the JRD using json.Unmarshal. +func ParseJRD(blob []byte) (*JRD, error) { + jrd := JRD{} + err := json.Unmarshal(blob, &jrd) + if err != nil { + return nil, err + } + return &jrd, nil +} + +// GetLinkByRel returns the first *Link with the specified rel value. +func (jrd *JRD) GetLinkByRel(rel string) *Link { + for _, link := range jrd.Links { + if link.Rel == rel { + return link + } + } + return nil +} + +// GetProperty Returns the property value as a string. +// Per spec a property value can be null, empty string is returned in this case. +func (jrd *JRD) GetProperty(uri string) string { + if jrd.Properties[uri] == nil { + return "" + } + return *jrd.Properties[uri] +} + +// GetProperty Returns the property value as a string. +// Per spec a property value can be null, empty string is returned in this case. +func (link *Link) GetProperty(uri string) string { + if link.Properties[uri] == nil { + return "" + } + return *link.Properties[uri] +} +func (link *Link) SetProperty(name string, value *string) { + if link.Properties == nil { + link.Properties = make(map[string]*string) + } + link.Properties[name] = value +} + +// ApplyEvent implements event.Aggregate +func (a *JRD) ApplyEvent(events ...event.Event) { + for _, e := range events { + switch e := e.(type) { + case *SubjectSet: + a.Subject = e.Subject + a.Aliases = e.Aliases + a.Properties = e.Properties + + case *SubjectDeleted: + a.deleted = true + + a.Subject = "" + a.Aliases = a.Aliases[:0] + a.Links = a.Links[:0] + a.Properties = map[string]*string{} + + case *LinkSet: + link, ok := slice.FindFn(func(l *Link) bool { return l.Rel == e.Rel }, a.Links...) + if !ok { + link = &Link{} + link.Rel = e.Rel + a.Links = append(a.Links, link) + } + + link.HRef = e.HRef + link.Type = e.Type + link.Titles = e.Titles + link.Properties = e.Properties + + case *LinkDeleted: + a.Links = slice.FilterFn(func(link *Link) bool { return link.Rel != e.Rel }, a.Links...) + } + } +} + +const NSpubkey = "https://sour.is/ns/pub" + +func (a *JRD) OnClaims(method, pubkey string, jrd *JRD) error { + if a.Version() > 0 { + if v, ok := a.Properties[NSpubkey]; ok && v != nil && *v == pubkey { + // pubkey matches! + } else { + return fmt.Errorf("pubkey does not match") + } + if a.Subject != jrd.Subject { + return fmt.Errorf("subject does not match") + } + + if method == "DELETE" { + event.Raise(a, &SubjectDeleted{Subject: jrd.Subject}) + return nil + } + } + + jrd.SetProperty(NSpubkey, &pubkey) + + err := a.OnSubjectSet(jrd.Subject, jrd.Aliases, jrd.Properties) + if err != nil { + return err + } + + sort.Sort(jrd.Links) + sort.Sort(a.Links) + for _, z := range slice.Align( + jrd.Links, + a.Links, + func(l, r *Link) bool { return l.Rel < r.Rel }, + ) { + // Not in new == delete + if z.Key == nil { + link := *z.Value + event.Raise(a, &LinkDeleted{Rel: link.Rel}) + continue + } + + // Not in old == create + if z.Value == nil { + link := *z.Key + event.Raise(a, &LinkSet{ + Rel: link.Rel, + Type: link.Type, + HRef: link.HRef, + Titles: link.Titles, + Properties: link.Properties, + }) + continue + } + + // in both == compare + a.OnLinkSet(*z.Key, *z.Value) + } + + return nil +} + +func (a *JRD) OnSubjectSet(subject string, aliases []string, props map[string]*string) error { + modified := false + e := &SubjectSet{ + Subject: subject, + Aliases: aliases, + Properties: props, + } + + if subject != a.Subject { + modified = true + } + + sort.Strings(aliases) + sort.Strings(a.Aliases) + for _, z := range slice.Zip(aliases, a.Aliases) { + if z.Key != z.Value { + modified = true + break + } + } + + for _, z := range slice.Zip( + slice.Zip(slice.FromMap(props)), + slice.Zip(slice.FromMap(a.Properties)), + ) { + if z.Key != z.Value { + modified = true + break + } + } + + if modified { + event.Raise(a, e) + } + + return nil +} + +func (a *JRD) OnLinkSet(o, n *Link) error { + modified := false + e := &LinkSet{ + Rel: n.Rel, + Type: n.Type, + HRef: n.HRef, + Titles: n.Titles, + Properties: n.Properties, + } + + if n.Rel != o.Rel { + modified = true + } + if n.Type != o.Type { + modified = true + } + if n.HRef != o.HRef { + modified = true + } + + for _, z := range slice.Zip( + slice.Zip(slice.FromMap(n.Titles)), + slice.Zip(slice.FromMap(o.Titles)), + ) { + if z.Key != z.Value { + modified = true + break + } + } + + for _, z := range slice.Zip( + slice.Zip(slice.FromMap(n.Properties)), + slice.Zip(slice.FromMap(o.Properties)), + ) { + if z.Key != z.Value { + modified = true + break + } + } + + if modified { + event.Raise(a, e) + } + + return nil +} + +func (a *JRD) IsDeleted() bool { + return a.deleted +} + +func (a *JRD) SetProperty(name string, value *string) { + if a.Properties == nil { + a.Properties = make(map[string]*string) + } + a.Properties[name] = value +} diff --git a/app/webfinger/jrd_test.go b/app/webfinger/jrd_test.go new file mode 100644 index 0000000..345a365 --- /dev/null +++ b/app/webfinger/jrd_test.go @@ -0,0 +1,266 @@ +package webfinger_test + +import ( + "context" + "crypto/ed25519" + "encoding/base64" + "encoding/json" + "fmt" + "strings" + "testing" + "time" + + jwt "github.com/golang-jwt/jwt/v4" + "github.com/matryer/is" + "go.uber.org/multierr" + + "github.com/sour-is/ev" + "github.com/sour-is/ev/app/webfinger" + memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" + "github.com/sour-is/ev/pkg/es/driver/projecter" + "github.com/sour-is/ev/pkg/es/driver/streamer" + "github.com/sour-is/ev/pkg/es/event" +) + +func TestParseJRD(t *testing.T) { + + // Adapted from spec http://tools.ietf.org/html/rfc6415#appendix-A + blob := ` + { + "subject":"http://blog.example.com/article/id/314", + "aliases":[ + "http://blog.example.com/cool_new_thing", + "http://blog.example.com/steve/article/7"], + "properties":{ + "http://blgx.example.net/ns/version":"1.3", + "http://blgx.example.net/ns/ext":null + }, + "links":[ + { + "rel":"author", + "type":"text/html", + "href":"http://blog.example.com/author/steve", + "titles":{ + "default":"About the Author", + "en-us":"Author Information" + }, + "properties":{ + "http://example.com/role":"editor" + } + }, + { + "rel":"author", + "href":"http://example.com/author/john", + "titles":{ + "default":"The other author" + } + }, + { + "rel":"copyright" + } + ] + } + ` + obj, err := webfinger.ParseJRD([]byte(blob)) + if err != nil { + t.Fatal(err) + } + if got, want := obj.Subject, "http://blog.example.com/article/id/314"; got != want { + t.Errorf("JRD.Subject is %q, want %q", got, want) + } + if got, want := obj.GetProperty("http://blgx.example.net/ns/version"), "1.3"; got != want { + t.Errorf("obj.GetProperty('http://blgx.example.net/ns/version') returned %q, want %q", got, want) + } + if got, want := obj.GetProperty("http://blgx.example.net/ns/ext"), ""; got != want { + t.Errorf("obj.GetProperty('http://blgx.example.net/ns/ext') returned %q, want %q", got, want) + } + if obj.GetLinkByRel("copyright") == nil { + t.Error("obj.GetLinkByRel('copyright') returned nil, want non-nil value") + } + if got, want := obj.GetLinkByRel("author").Titles["default"], "About the Author"; got != want { + t.Errorf("obj.GetLinkByRel('author').Titles['default'] returned %q, want %q", got, want) + } + if got, want := obj.GetLinkByRel("author").GetProperty("http://example.com/role"), "editor"; got != want { + t.Errorf("obj.GetLinkByRel('author').GetProperty('http://example.com/role') returned %q, want %q", got, want) + } +} + +func TestEncodeJRD(t *testing.T) { + s, err := json.Marshal(&webfinger.JRD{ + Subject: "test", + Properties: map[string]*string{ + "https://sour.is/ns/prop1": nil, + }, + }) + if err != nil { + t.Fatal(err) + } + if string(s) != `{"subject":"test","properties":{"https://sour.is/ns/prop1":null}}` { + t.Fatal("output does not match") + } +} + +// { "properties":{"https://sour.is/ns/pubkey":"kex1d330ama4vnu3vll5dgwjv3k0pcxsccc5k2xy3j8khndggkszsmsq3hl4ru"},"links":[{"rel":"salty:public","type":"application/json+salty","href":"https://ev.sour.is/inbox/01GAEMKXYJ4857JQP1MJGD61Z5","properties":{"pub":"kex1r8zshlvkc787pxvauaq7hd6awa9kmheddxjj9k80qmenyxk6284s50uvpw"}}]} +//!= {"subject":"acct:me@sour.is","properties":{"https://sour.is/ns/pubkey":"kex1d330ama4vnu3vll5dgwjv3k0pcxsccc5k2xy3j8khndggkszsmsq3hl4ru"},"links":[{"rel":"salty:public","type":"application/json+salty","href":"https://ev.sour.is/inbox/01GAEMKXYJ4857JQP1MJGD61Z5","properties":{"pub":"kex1r8zshlvkc787pxvauaq7hd6awa9kmheddxjj9k80qmenyxk6284s50uvpw"}}]} + +func TestApplyEvents(t *testing.T) { + is := is.New(t) + + events := event.NewEvents( + &webfinger.SubjectSet{ + Subject: "acct:me@sour.is", + Properties: map[string]*string{ + "https://sour.is/ns/pubkey": ptr("kex1d330ama4vnu3vll5dgwjv3k0pcxsccc5k2xy3j8khndggkszsmsq3hl4ru"), + }, + }, + &webfinger.LinkSet{ + Rel: "salty:public", + Type: "application/json+salty", + }, + &webfinger.LinkSet{ + Rel: "salty:private", + Type: "application/json+salty", + }, + &webfinger.LinkSet{ + Rel: "salty:public", + Type: "application/json+salty", + HRef: "https://ev.sour.is/inbox/01GAEMKXYJ4857JQP1MJGD61Z5", + Properties: map[string]*string{ + "pub": ptr("kex1r8zshlvkc787pxvauaq7hd6awa9kmheddxjj9k80qmenyxk6284s50uvpw"), + }, + }, + &webfinger.LinkDeleted{ + Rel: "salty:private", + }, + ) + event.SetStreamID(webfinger.StreamID("acct:me@sour.is"), events...) + + jrd := &webfinger.JRD{} + jrd.ApplyEvent(events...) + + s, err := json.Marshal(jrd) + if err != nil { + t.Fatal(err) + } + + is.Equal(string(s), `{"subject":"acct:me@sour.is","properties":{"https://sour.is/ns/pubkey":"kex1d330ama4vnu3vll5dgwjv3k0pcxsccc5k2xy3j8khndggkszsmsq3hl4ru"},"links":[{"rel":"salty:public","type":"application/json+salty","href":"https://ev.sour.is/inbox/01GAEMKXYJ4857JQP1MJGD61Z5","properties":{"pub":"kex1r8zshlvkc787pxvauaq7hd6awa9kmheddxjj9k80qmenyxk6284s50uvpw"}}]}`) + + events = event.NewEvents( + &webfinger.SubjectDeleted{}, + ) + event.SetStreamID(webfinger.StreamID("acct:me@sour.is"), events...) + + jrd.ApplyEvent(events...) + + s, err = json.Marshal(jrd) + if err != nil { + t.Fatal(err) + } + + t.Log(string(s)) + if string(s) != `{}` { + t.Fatal("output does not match") + } +} + +func TestCommands(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + pub, priv, err := ed25519.GenerateKey(nil) + is.NoErr(err) + + // fmt.Println(base64.RawURLEncoding.EncodeToString(key)) + token := jwt.NewWithClaims(jwt.SigningMethodEdDSA, jwt.MapClaims{ + "sub": "acct:me@sour.is", + "pub": enc(pub), + "aliases": []string{"acct:xuu@sour.is"}, + "properties": map[string]*string{ + "https://example.com/ns/asdf": nil, + }, + "links": []map[string]any{{ + "rel": "salty:public", + "type": "application/json+salty", + "href": "https://ev.sour.is", + "titles": map[string]string{"default": "Jon Lundy"}, + "properties": map[string]*string{ + "pub": ptr("kex140fwaena9t0mrgnjeare5zuknmmvl0vc7agqy5yr938vusxfh9ys34vd2p"), + }, + }}, + "exp": time.Now().Add(30 * time.Second).Unix(), + }) + aToken, err := token.SignedString(priv) + is.NoErr(err) + + es, err := ev.Open(ctx, "mem:", streamer.New(ctx), projecter.New(ctx)) + is.NoErr(err) + + type claims struct { + Subject string `json:"sub"` + PubKey string `json:"pub"` + *webfinger.JRD + jwt.StandardClaims + } + + token, err = jwt.ParseWithClaims( + aToken, + &claims{}, + func(tok *jwt.Token) (any, error) { + c, ok := tok.Claims.(*claims) + if !ok { + return nil, fmt.Errorf("wrong type of claim") + } + + c.JRD.Subject = c.Subject + c.StandardClaims.Subject = c.Subject + + pub, err := dec(c.PubKey) + return ed25519.PublicKey(pub), err + }, + jwt.WithValidMethods([]string{"EdDSA"}), + jwt.WithJSONNumber(), + ) + is.NoErr(err) + + c, ok := token.Claims.(*claims) + is.True(ok) + + t.Logf("%#v", c) + a, err := ev.Upsert(ctx, es, webfinger.StreamID(c.Subject), func(ctx context.Context, a *webfinger.JRD) error { + a.OnClaims("POST", c.PubKey, c.JRD) + return nil + }) + is.NoErr(err) + + for _, e := range a.Events(false) { + t.Log(e) + } +} + +func ptr[T any](v T) *T { + return &v +} +func enc(b []byte) string { + return base64.RawURLEncoding.EncodeToString(b) +} +func dec(s string) ([]byte, error) { + s = strings.TrimSpace(s) + return base64.RawURLEncoding.DecodeString(s) +} + +func TestMain(m *testing.M) { + ctx, stop := context.WithCancel(context.Background()) + defer stop() + + err := multierr.Combine( + ev.Init(ctx), + event.Init(ctx), + memstore.Init(ctx), + ) + if err != nil { + fmt.Println(err) + return + } + + m.Run() +} diff --git a/app/webfinger/webfinger.go b/app/webfinger/webfinger.go index a6a1ab2..6173389 100644 --- a/app/webfinger/webfinger.go +++ b/app/webfinger/webfinger.go @@ -1 +1,187 @@ package webfinger + +import ( + "context" + "crypto/ed25519" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + + "github.com/golang-jwt/jwt/v4" + "github.com/sour-is/ev" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/es/event" +) + +type service struct { + es *ev.EventStore +} + +func New(ctx context.Context, es *ev.EventStore) (*service, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + if err := event.Register( + ctx, + &SubjectSet{}, + &SubjectDeleted{}, + &LinkSet{}, + &LinkDeleted{}, + ); err != nil { + return nil, err + } + svc := &service{es: es} + + return svc, nil +} + +func (s *service) RegisterHTTP(mux *http.ServeMux) {} +func (s *service) RegisterWellKnown(mux *http.ServeMux) { + mux.Handle("/webfinger", lg.Htrace(s, "webfinger")) +} +func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + ctx, span := lg.Span(ctx) + defer span.End() + + if r.URL.Path != "/webfinger" { + w.WriteHeader(http.StatusNotFound) + fmt.Fprint(w, http.StatusText(http.StatusNotFound)) + return + + } + + switch r.Method { + case http.MethodPut, http.MethodDelete: + if r.ContentLength > 4096 { + w.WriteHeader(http.StatusRequestEntityTooLarge) + fmt.Fprint(w, http.StatusText(http.StatusRequestEntityTooLarge)) + span.AddEvent("request too large") + + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, 4096)) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, http.StatusText(http.StatusInternalServerError)) + span.RecordError(err) + + return + } + r.Body.Close() + + type claims struct { + Subject string `json:"sub"` + PubKey string `json:"pub"` + *JRD + jwt.StandardClaims + } + + token, err := jwt.ParseWithClaims( + string(body), + &claims{}, + func(tok *jwt.Token) (any, error) { + c, ok := tok.Claims.(*claims) + if !ok { + return nil, fmt.Errorf("wrong type of claim") + } + + c.JRD.Subject = c.Subject + c.StandardClaims.Subject = c.Subject + + pub, err := dec(c.PubKey) + return ed25519.PublicKey(pub), err + }, + jwt.WithValidMethods([]string{"EdDSA"}), + jwt.WithJSONNumber(), + ) + if err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + fmt.Fprint(w, http.StatusText(http.StatusUnprocessableEntity), ": ", err.Error()) + span.RecordError(err) + + return + } + + c, ok := token.Claims.(*claims) + if !ok { + w.WriteHeader(http.StatusUnprocessableEntity) + fmt.Fprint(w, http.StatusText(http.StatusUnprocessableEntity)) + span.AddEvent("not a claim") + + return + } + + a, err := ev.Upsert(ctx, s.es, StreamID(c.Subject), func(ctx context.Context, a *JRD) error { + a.OnClaims(r.Method, c.PubKey, c.JRD) + return nil + }) + + if err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + fmt.Fprint(w, http.StatusText(http.StatusUnprocessableEntity), ": ", err.Error()) + span.RecordError(err) + + return + } + + w.Header().Set("Content-Type", "application/jrd+json") + w.WriteHeader(http.StatusCreated) + + j := json.NewEncoder(w) + j.SetIndent("", " ") + err = j.Encode(a) + span.RecordError(err) + + case http.MethodGet: + resource := r.URL.Query().Get("resource") + + a := &JRD{} + a.SetStreamID(StreamID(resource)) + err := s.es.Load(ctx, a) + if err != nil { + span.RecordError(err) + + if errors.Is(err, ev.ErrNotFound) { + w.WriteHeader(http.StatusNotFound) + fmt.Fprint(w, http.StatusText(http.StatusNotFound)) + return + } + + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, http.StatusText(http.StatusInternalServerError)) + return + } + + if a.IsDeleted() { + w.WriteHeader(http.StatusGone) + fmt.Fprint(w, http.StatusText(http.StatusGone)) + span.AddEvent("is deleted") + + return + } + + w.Header().Set("Content-Type", "application/jrd+json") + w.WriteHeader(http.StatusOK) + + j := json.NewEncoder(w) + j.SetIndent("", " ") + err = j.Encode(a) + span.RecordError(err) + + default: + w.Header().Set("Allow", "GET, PUT, DELETE, OPTIONS") + w.WriteHeader(http.StatusMethodNotAllowed) + fmt.Fprint(w, http.StatusText(http.StatusMethodNotAllowed)) + span.AddEvent("method not allow: " + r.Method) + } +} + +func dec(s string) ([]byte, error) { + s = strings.TrimSpace(s) + return base64.RawURLEncoding.DecodeString(s) +} diff --git a/cmd/ev/app.webfinger.go b/cmd/ev/app.webfinger.go new file mode 100644 index 0000000..a84cfba --- /dev/null +++ b/cmd/ev/app.webfinger.go @@ -0,0 +1,32 @@ +package main + +import ( + "context" + "fmt" + + "github.com/sour-is/ev" + "github.com/sour-is/ev/app/webfinger" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/service" + "github.com/sour-is/ev/pkg/slice" +) + +var _ = apps.Register(50, func(ctx context.Context, svc *service.Harness) error { + ctx, span := lg.Span(ctx) + defer span.End() + + span.AddEvent("Enable WebFinger") + eventstore, ok := slice.Find[*ev.EventStore](svc.Services...) + if !ok { + return fmt.Errorf("*es.EventStore not found in services") + } + + wf, err := webfinger.New(ctx, eventstore) + if err != nil { + span.RecordError(err) + return err + } + svc.Add(wf) + + return nil +}) diff --git a/cmd/webfinger/app.webfinger.go b/cmd/webfinger/app.webfinger.go new file mode 100644 index 0000000..a84cfba --- /dev/null +++ b/cmd/webfinger/app.webfinger.go @@ -0,0 +1,32 @@ +package main + +import ( + "context" + "fmt" + + "github.com/sour-is/ev" + "github.com/sour-is/ev/app/webfinger" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/service" + "github.com/sour-is/ev/pkg/slice" +) + +var _ = apps.Register(50, func(ctx context.Context, svc *service.Harness) error { + ctx, span := lg.Span(ctx) + defer span.End() + + span.AddEvent("Enable WebFinger") + eventstore, ok := slice.Find[*ev.EventStore](svc.Services...) + if !ok { + return fmt.Errorf("*es.EventStore not found in services") + } + + wf, err := webfinger.New(ctx, eventstore) + if err != nil { + span.RecordError(err) + return err + } + svc.Add(wf) + + return nil +}) diff --git a/cmd/webfinger/main.go b/cmd/webfinger/main.go new file mode 100644 index 0000000..5df79d0 --- /dev/null +++ b/cmd/webfinger/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + "errors" + "log" + "net/http" + "os" + "os/signal" + + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/service" +) + +var apps service.Apps +var appName, version = service.AppName() + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + go func() { + <-ctx.Done() + defer cancel() // restore interrupt function + }() + + svc := &service.Harness{} + + ctx, stop := lg.Init(ctx, appName) + svc.OnStop(stop) + svc.Add(lg.NewHTTP(ctx)) + + svc.Setup(ctx, apps.Apps()...) + + // Run application + if err := svc.Run(ctx, appName, version); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatal(err) + } +} diff --git a/cmd/webfinger/svc.es.go b/cmd/webfinger/svc.es.go new file mode 100644 index 0000000..00cec22 --- /dev/null +++ b/cmd/webfinger/svc.es.go @@ -0,0 +1,53 @@ +package main + +import ( + "context" + + "github.com/sour-is/ev" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/env" + "github.com/sour-is/ev/pkg/es" + diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" + memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" + "github.com/sour-is/ev/pkg/es/driver/projecter" + resolvelinks "github.com/sour-is/ev/pkg/es/driver/resolve-links" + "github.com/sour-is/ev/pkg/es/driver/streamer" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/service" + "go.uber.org/multierr" +) + +var _ = apps.Register(10, func(ctx context.Context, svc *service.Harness) error { + ctx, span := lg.Span(ctx) + defer span.End() + + // setup eventstore + err := multierr.Combine( + ev.Init(ctx), + event.Init(ctx), + diskstore.Init(ctx), + memstore.Init(ctx), + ) + if err != nil { + span.RecordError(err) + return err + } + + eventstore, err := ev.Open( + ctx, + env.Default("EV_DATA", "mem:"), + resolvelinks.New(), + streamer.New(ctx), + projecter.New( + ctx, + projecter.DefaultProjection, + ), + ) + if err != nil { + span.RecordError(err) + return err + } + svc.Add(eventstore, &es.EventStore{EventStore: eventstore}) + + return nil +}) diff --git a/cmd/webfinger/svc.http.go b/cmd/webfinger/svc.http.go new file mode 100644 index 0000000..b350f99 --- /dev/null +++ b/cmd/webfinger/svc.http.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "log" + "net/http" + "strings" + + "github.com/rs/cors" + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/env" + "github.com/sour-is/ev/pkg/mux" + "github.com/sour-is/ev/pkg/service" + "github.com/sour-is/ev/pkg/slice" +) + +var _ = apps.Register(20, func(ctx context.Context, svc *service.Harness) error { + s := &http.Server{} + svc.Add(s) + + mux := mux.New() + s.Handler = cors.AllowAll().Handler(mux) + + // s.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // log.Println(r.URL.Path) + // mux.ServeHTTP(w, r) + // }) + + s.Addr = env.Default("EV_HTTP", ":8080") + if strings.HasPrefix(s.Addr, ":") { + s.Addr = "[::]" + s.Addr + } + svc.OnStart(func(ctx context.Context) error { + _, span := lg.Span(ctx) + defer span.End() + + log.Print("Listen on ", s.Addr) + span.AddEvent("begin listen and serve on " + s.Addr) + + mux.Add(slice.FilterType[interface{ RegisterHTTP(*http.ServeMux) }](svc.Services...)...) + return s.ListenAndServe() + }) + svc.OnStop(s.Shutdown) + + return nil +}) diff --git a/ev.go b/ev.go index dbd1ce5..6898879 100644 --- a/ev.go +++ b/ev.go @@ -164,6 +164,9 @@ func (es *EventStore) Load(ctx context.Context, agg event.Aggregate) error { if err != nil { return err } + if len(events) == 0 { + return ErrNotFound + } Mes_load.Add(ctx, events.Count()) event.Append(agg, events...) @@ -395,7 +398,7 @@ func Upsert[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string attribute.String("agg.streamID", streamID), ) - if err = es.Load(ctx, agg); err != nil { + if err = es.Load(ctx, agg); err != nil && !errors.Is(err, ErrNotFound) { return } diff --git a/go.mod b/go.mod index 30fc50c..c54985c 100644 --- a/go.mod +++ b/go.mod @@ -100,6 +100,7 @@ require ( ) require ( + github.com/golang-jwt/jwt/v4 v4.4.3 github.com/keys-pub/keys v0.1.22 github.com/matryer/is v1.4.0 github.com/oklog/ulid/v2 v2.1.0 diff --git a/go.sum b/go.sum index de07fc2..835e00b 100644 --- a/go.sum +++ b/go.sum @@ -153,6 +153,8 @@ github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/godbus/dbus v4.1.0+incompatible/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang-jwt/jwt/v4 v4.4.3 h1:Hxl6lhQFj4AnOX6MLrsCb/+7tCj7DxP7VA+2rDIq5AU= +github.com/golang-jwt/jwt/v4 v4.4.3/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= diff --git a/pkg/set/set.go b/pkg/set/set.go index b948140..b31439f 100644 --- a/pkg/set/set.go +++ b/pkg/set/set.go @@ -34,6 +34,22 @@ func (s Set[T]) Delete(items ...T) Set[T] { return s } +func (s Set[T]) Equal(e Set[T]) bool { + for k := range s { + if _, ok := e[k]; !ok{ + return false + } + } + + for k := range e { + if _, ok := s[k]; !ok { + return false + } + } + + return true +} + func (s Set[T]) String() string { if s == nil { return "set()" diff --git a/pkg/slice/slice.go b/pkg/slice/slice.go index 538b028..c835575 100644 --- a/pkg/slice/slice.go +++ b/pkg/slice/slice.go @@ -1,5 +1,9 @@ package slice +import ( + "github.com/sour-is/ev/pkg/math" +) + // FilterType returns a subset that matches the type. func FilterType[T any](in ...any) []T { lis := make([]T, 0, len(in)) @@ -11,11 +15,25 @@ func FilterType[T any](in ...any) []T { return lis } +func FilterFn[T any](fn func(T) bool, in ...T) []T { + lis := make([]T, 0, len(in)) + for _, t := range in { + if fn(t) { + lis = append(lis, t) + } + } + return lis +} + // Find returns the first of type found. or false if not found. func Find[T any](in ...any) (T, bool) { return First(FilterType[T](in...)...) } +func FindFn[T any](fn func(T) bool, in ...T) (T, bool) { + return First(FilterFn(fn, in...)...) +} + // First returns the first element in a slice. func First[T any](in ...T) (T, bool) { if len(in) == 0 { @@ -26,10 +44,101 @@ func First[T any](in ...T) (T, bool) { } // Map applys func to each element s and returns results as slice. -func Map[T, U any](s []T, f func(T) U) []U { - r := make([]U, len(s)) - for i, v := range s { - r[i] = f(v) +func Map[T, U any](f func(T) U) func(...T) []U { + return func(lis ...T) []U { + r := make([]U, len(lis)) + for i, v := range lis { + r[i] = f(v) + } + return r } - return r +} + +func Reduce[T, R any](r R, fn func(T, R, int) R) func(...T) R { + return func(lis ...T) R { + for i, t := range lis { + r = fn(t, r, i) + } + return r + } +} + +type Pair[K comparable, V any] struct { + Key K + Value V +} + +func FromMap[K comparable, V any](m map[K]V) (keys []K, values []V) { + if m == nil { + return nil, nil + } + + keys = make([]K, 0, len(m)) + values = make([]V, 0, len(m)) + + for k := range m { + keys = append(keys, k) + } + for _, k := range keys { + values = append(values, m[k]) + } + + return keys, values +} + +func ToMap[K comparable, V any](keys []K, values []V) (m map[K]V) { + m = make(map[K]V, len(keys)) + + for i := range keys { + if len(values) < i { + break + } + m[keys[i]] = values[i] + } + + return m +} + +func Zip[K comparable, V any](k []K, v []V) []Pair[K, V] { + lis := make([]Pair[K, V], math.Max(len(k), len(v))) + for i := range lis { + if k != nil && len(k) > i { + lis[i].Key = k[i] + } + + if v != nil && len(v) > i { + lis[i].Value = v[i] + } + } + return lis +} + +func Align[T any](k []T, v []T, less func(T, T) bool) []Pair[*T, *T] { + lis := make([]Pair[*T, *T], 0, math.Max(len(k), len(v))) + + var j int + + for i := 0; i < len(k); { + if j >= len(v) || less(k[i], v[j]) { + lis = append(lis, Pair[*T, *T]{&k[i], nil}) + i++ + continue + } + + if less(v[j], k[i]) { + lis = append(lis, Pair[*T, *T]{nil, &v[j]}) + j++ + continue + } + + lis = append(lis, Pair[*T, *T]{&k[i], &v[j]}) + i++ + j++ + } + for ; j < len(v); j++ { + lis = append(lis, Pair[*T, *T]{nil, &v[j]}) + } + + return lis + } diff --git a/pkg/slice/slice_test.go b/pkg/slice/slice_test.go new file mode 100644 index 0000000..2ba81c7 --- /dev/null +++ b/pkg/slice/slice_test.go @@ -0,0 +1,54 @@ +package slice_test + +import ( + "testing" + + "github.com/matryer/is" + "github.com/sour-is/ev/pkg/slice" +) + +func TestAlign(t *testing.T) { + type testCase struct { + left, right []string + combined []slice.Pair[*string, *string] + } + + tests := []testCase{ + { + left: []string{"1", "3", "5"}, + right: []string{"2", "3", "4"}, + combined: []slice.Pair[*string, *string]{ + {ptr("1"), nil}, + {nil, ptr("2")}, + {ptr("3"), ptr("3")}, + {nil, ptr("4")}, + {ptr("5"), nil}, + }, + }, + + { + left: []string{"2", "3", "4"}, + right: []string{"1", "3", "5"}, + combined: []slice.Pair[*string, *string]{ + {nil, ptr("1")}, + {ptr("2"), nil}, + {ptr("3"), ptr("3")}, + {ptr("4"), nil}, + {nil, ptr("5")}, + }, + }, + + } + + is := is.New(t) + + for _, tt := range tests { + combined := slice.Align(tt.left, tt.right, func(l, r string) bool { return l < r }) + is.Equal(len(combined), len(tt.combined)) + for i := range combined { + is.Equal(combined[i], tt.combined[i]) + } + } +} + +func ptr[T any](v T) *T { return &v }