chore: add apps from go.sour.is/ev

This commit is contained in:
xuu
2023-09-29 10:31:25 -06:00
parent 976ce36be2
commit bec2c14d51
80 changed files with 13030 additions and 439 deletions

37
app/gql/common.graphqls Normal file
View File

@@ -0,0 +1,37 @@
scalar Time
scalar Map
type Connection @goModel(model: "go.sour.is/pkg/gql.Connection") {
paging: PageInfo!
edges: [Edge!]!
}
input PageInput @goModel(model: "go.sour.is/pkg/gql.PageInput") {
after: Int = 0
before: Int
count: Int = 30
}
type PageInfo @goModel(model: "go.sour.is/pkg/gql.PageInfo") {
next: Boolean!
prev: Boolean!
begin: Int!
end: Int!
}
interface Edge @goModel(model: "go.sour.is/pkg/gql.Edge"){
id: ID!
}
directive @goModel(
model: String
models: [String!]
) on OBJECT | INPUT_OBJECT | SCALAR | ENUM | INTERFACE | UNION
directive @goField(
forceResolver: Boolean
name: String
) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION
directive @goTag(
key: String!
value: String
) on INPUT_FIELD_DEFINITION | FIELD_DEFINITION

View File

@@ -0,0 +1,34 @@
type Meta @goModel(model: "go.sour.is/ev/pkg/event.Meta") {
eventID: String! @goField(name: "getEventID")
streamID: String! @goField(name: "ActualStreamID")
position: Int! @goField(name: "ActualPosition")
created: Time!
}
extend type Query {
events(streamID: String! paging: PageInput): Connection!
}
extend type Mutation {
truncateStream(streamID: String! index:Int!): Boolean!
}
extend type Subscription {
"""after == 0 start from begining, after == -1 start from end"""
eventAdded(streamID: String! after: Int! = -1): Event
}
type Event implements Edge @goModel(model: "go.sour.is/ev/pkg/gql.Event") {
id: ID!
eventID: String!
streamID: String!
position: Int!
values: Map!
bytes: String!
type: String!
created: Time!
meta: Meta!
linked: Event
}

66
app/gql/resolver.go Normal file
View File

@@ -0,0 +1,66 @@
package gql
import (
"context"
"github.com/99designs/gqlgen/graphql"
"go.sour.is/pkg/gql"
"go.sour.is/pkg/gql/resolver"
gql_es "go.sour.is/ev/gql"
"go.sour.is/tools/app/msgbus"
"go.sour.is/tools/app/salty"
"go.sour.is/tools/internal/graph/generated"
)
type Resolver struct {
msgbus.MsgbusResolver
salty.SaltyResolver
gql_es.EventResolver
}
// Query returns generated.QueryResolver implementation.
func (r *Resolver) Query() generated.QueryResolver { return r }
// Query returns generated.QueryResolver implementation.
func (r *Resolver) Mutation() generated.MutationResolver { return r }
// Subscription returns generated.SubscriptionResolver implementation.
func (r *Resolver) Subscription() generated.SubscriptionResolver { return r }
// func (r *Resolver) isResolver() {}
func (r *Resolver) ExecutableSchema() graphql.ExecutableSchema {
return generated.NewExecutableSchema(generated.Config{Resolvers: r})
}
func (r *Resolver) BaseResolver() resolver.IsResolver {
return &noop{}
}
type noop struct{}
var _ msgbus.MsgbusResolver = (*noop)(nil)
var _ salty.SaltyResolver = (*noop)(nil)
var _ gql_es.EventResolver = (*noop)(nil)
func (*noop) IsResolver() {}
func (*noop) CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) {
panic("not implemented")
}
func (*noop) Posts(ctx context.Context, name, tag string, paging *gql.PageInput) (*gql.Connection, error) {
panic("not implemented")
}
func (*noop) SaltyUser(ctx context.Context, nick string) (*salty.SaltyUser, error) {
panic("not implemented")
}
func (*noop) PostAdded(ctx context.Context, name, tag string, after int64) (<-chan *msgbus.PostEvent, error) {
panic("not implemented")
}
func (*noop) Events(ctx context.Context, streamID string, paging *gql.PageInput) (*gql.Connection, error) {
panic("not implemented")
}
func (*noop) EventAdded(ctx context.Context, streamID string, after int64) (<-chan *gql_es.Event, error) {
panic("not implemented")
}
func (*noop) TruncateStream(ctx context.Context, streamID string, index int64) (bool, error) {
panic("not implemented")
}

View File

@@ -0,0 +1,9 @@
# extend type Query{
# namespaces: [String!]!
# keys(namespace: String!) [String!]!
# get(namespace: String! keys: [String!]) [String]!
# }
# extend type Mutation{
# set(namespace: String! key: String! value: String): Bool!
# }

1
app/mercury/service.go Normal file
View File

@@ -0,0 +1 @@
package mercury

View File

@@ -0,0 +1,16 @@
extend type Query {
posts(name: String!, tag: String! = "", paging: PageInput): Connection!
}
extend type Subscription {
"""after == 0 start from begining, after == -1 start from end"""
postAdded(name: String!, tag: String! = "", after: Int! = -1): PostEvent
}
type PostEvent implements Edge @goModel(model: "go.sour.is/tools/app/msgbus.PostEvent") {
id: ID!
payload: String!
payloadJSON: Map!
tags: [String!]!
meta: Meta!
}

597
app/msgbus/service.go Normal file
View File

@@ -0,0 +1,597 @@
package msgbus
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"hash/fnv"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/gorilla/websocket"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"go.sour.is/ev"
"go.sour.is/ev/event"
"go.sour.is/pkg/gql"
"go.sour.is/pkg/lg"
)
type service struct {
es *ev.EventStore
m_gql_posts metric.Int64Counter
m_gql_post_added metric.Int64Counter
m_gql_post_added_event metric.Int64Counter
m_req_time metric.Int64Histogram
}
type MsgbusResolver interface {
Posts(ctx context.Context, name, tag string, paging *gql.PageInput) (*gql.Connection, error)
PostAdded(ctx context.Context, name, tag string, after int64) (<-chan *PostEvent, error)
IsResolver()
}
func New(ctx context.Context, es *ev.EventStore) (*service, error) {
ctx, span := lg.Span(ctx)
defer span.End()
if err := event.Register(ctx, &PostEvent{}); err != nil {
return nil, err
}
if err := event.RegisterName(ctx, "domain.PostEvent", &PostEvent{}); err != nil {
return nil, err
}
m := lg.Meter(ctx)
svc := &service{es: es}
var err, errs error
svc.m_gql_posts, err = m.Int64Counter("msgbus_posts",
metric.WithDescription("msgbus graphql posts requests"),
)
errs = multierr.Append(errs, err)
svc.m_gql_post_added, err = m.Int64Counter("msgbus_post_added",
metric.WithDescription("msgbus graphql post added subcription requests"),
)
errs = multierr.Append(errs, err)
svc.m_gql_post_added_event, err = m.Int64Counter("msgbus_post_event",
metric.WithDescription("msgbus graphql post added subscription events"),
)
errs = multierr.Append(errs, err)
svc.m_req_time, err = m.Int64Histogram("msgbus_request_time",
metric.WithDescription("msgbus graphql post added subscription events"),
metric.WithUnit("ns"),
)
errs = multierr.Append(errs, err)
span.RecordError(err)
return svc, errs
}
var upgrader = websocket.Upgrader{
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func (s *service) IsResolver() {}
func (s *service) RegisterHTTP(mux *http.ServeMux) {
mux.Handle("/inbox/", lg.Htrace(http.StripPrefix("/inbox/", s), "inbox"))
}
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, span := lg.Span(ctx)
defer span.End()
r = r.WithContext(ctx)
switch r.Method {
case http.MethodGet:
if r.Header.Get("Upgrade") == "websocket" {
s.websocket(w, r)
return
}
s.get(w, r)
case http.MethodPost, http.MethodPut:
s.post(w, r)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}
// Posts is the resolver for the events field.
func (s *service) Posts(ctx context.Context, name, tag string, paging *gql.PageInput) (*gql.Connection, error) {
ctx, span := lg.Span(ctx)
defer span.End()
s.m_gql_posts.Add(ctx, 1)
start := time.Now()
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
streamID := withTag("post-"+name, tag)
lis, err := s.es.Read(ctx, streamID, paging.GetIdx(0), paging.GetCount(30))
if err != nil {
span.RecordError(err)
return nil, err
}
edges := make([]gql.Edge, 0, len(lis))
for i := range lis {
span.AddEvent(fmt.Sprint("post ", i, " of ", len(lis)))
e := lis[i]
post, ok := e.(*PostEvent)
if !ok {
continue
}
edges = append(edges, post)
}
var first, last uint64
if first, err = s.es.FirstIndex(ctx, streamID); err != nil {
span.RecordError(err)
return nil, err
}
if last, err = s.es.LastIndex(ctx, streamID); err != nil {
span.RecordError(err)
return nil, err
}
return &gql.Connection{
Paging: &gql.PageInfo{
Next: lis.Last().EventMeta().Position < last,
Prev: lis.First().EventMeta().Position > first,
Begin: lis.First().EventMeta().Position,
End: lis.Last().EventMeta().Position,
},
Edges: edges,
}, nil
}
func (r *service) PostAdded(ctx context.Context, name, tag string, after int64) (<-chan *PostEvent, error) {
ctx, span := lg.Span(ctx)
defer span.End()
r.m_gql_post_added.Add(ctx, 1)
es := r.es.EventStream()
if es == nil {
return nil, fmt.Errorf("EventStore does not implement streaming")
}
streamID := withTag("post-"+name, tag)
sub, err := es.Subscribe(ctx, streamID, after)
if err != nil {
span.RecordError(err)
return nil, err
}
ch := make(chan *PostEvent)
go func() {
ctx, span := lg.Span(ctx)
defer span.End()
{
ctx, span := lg.Fork(ctx)
defer func() {
defer span.End()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
err := sub.Close(ctx)
span.RecordError(err)
}()
}
for <-sub.Recv(ctx) {
events, err := sub.Events(ctx)
if err != nil {
span.RecordError(err)
break
}
span.AddEvent(fmt.Sprintf("received %d events", len(events)))
r.m_gql_post_added_event.Add(ctx, int64(len(events)))
for _, e := range events {
if p, ok := e.(*PostEvent); ok {
select {
case ch <- p:
continue
case <-ctx.Done():
return
}
}
}
}
}()
return ch, nil
}
func (s *service) get(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, span := lg.Span(ctx)
defer span.End()
start := time.Now()
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
name, tag, _ := strings.Cut(r.URL.Path, "/")
if name == "" {
w.WriteHeader(http.StatusNotFound)
return
}
streamID := withTag("post-"+name, tag)
var first event.Event = event.NilEvent
if lis, err := s.es.Read(ctx, streamID, 0, 1); err == nil && len(lis) > 0 {
first = lis[0]
}
var pos, count int64 = 0, ev.AllEvents
qry := r.URL.Query()
if i, err := strconv.ParseInt(qry.Get("index"), 10, 64); err == nil && i > 1 {
pos = i - 1
}
if i, err := strconv.ParseInt(qry.Get("pos"), 10, 64); err == nil {
pos = i
}
if i, err := strconv.ParseInt(qry.Get("n"), 10, 64); err == nil {
count = i
}
span.AddEvent(fmt.Sprint("GET topic=", streamID, " idx=", pos, " n=", count))
events, err := s.es.Read(ctx, streamID, pos, count)
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if strings.Contains(r.Header.Get("Accept"), "application/json") {
w.Header().Add("Content-Type", "application/json")
if err = encodeJSON(w, first, events...); err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
return
}
for i := range events {
fmt.Fprintln(w, events[i])
}
}
func (s *service) post(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, span := lg.Span(ctx)
defer span.End()
start := time.Now()
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
name, tags, _ := strings.Cut(r.URL.Path, "/")
if name == "" {
w.WriteHeader(http.StatusNotFound)
return
}
var first event.Event = event.NilEvent
if lis, err := s.es.Read(ctx, "post-"+name, 0, 1); err == nil && len(lis) > 0 {
first = lis[0]
}
b, err := io.ReadAll(io.LimitReader(r.Body, 64*1024))
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusBadRequest)
return
}
r.Body.Close()
if name == "" {
w.WriteHeader(http.StatusNotFound)
return
}
events := event.NewEvents(&PostEvent{
payload: b,
tags: fields(tags),
})
_, err = s.es.Append(ctx, "post-"+name, events)
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if first == event.NilEvent {
first = events.First()
}
m := events.First().EventMeta()
span.AddEvent(fmt.Sprint("POST topic=", name, " tags=", tags, " idx=", m.Position, " id=", m.EventID))
w.WriteHeader(http.StatusAccepted)
if strings.Contains(r.Header.Get("Accept"), "application/json") {
w.Header().Add("Content-Type", "application/json")
if err = encodeJSON(w, first, events...); err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
return
}
span.AddEvent("finish response")
w.Header().Add("Content-Type", "text/plain")
fmt.Fprintf(w, "OK %d %s", m.Position, m.EventID)
}
func (s *service) websocket(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, span := lg.Span(ctx)
defer span.End()
name, tag, _ := strings.Cut(r.URL.Path, "/")
if name == "" {
w.WriteHeader(http.StatusNotFound)
return
}
streamID := withTag("post-"+name, tag)
var first event.Event = event.NilEvent
if lis, err := s.es.Read(ctx, streamID, 0, 1); err == nil && len(lis) > 0 {
first = lis[0]
}
var pos int64 = 0
qry := r.URL.Query()
if i, err := strconv.ParseInt(qry.Get("index"), 10, 64); err == nil && i > 0 {
pos = i - 1
}
span.AddEvent(fmt.Sprint("WS topic=", streamID, " idx=", pos))
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
span.RecordError(err)
return
}
defer c.Close()
ctx, cancel := context.WithCancel(ctx)
c.SetCloseHandler(func(code int, text string) error {
cancel()
return nil
})
go func() {
for {
if err := ctx.Err(); err != nil {
return
}
mt, message, err := c.ReadMessage()
if err != nil {
span.RecordError(err)
return
}
span.AddEvent(fmt.Sprintf("recv: %d %s", mt, message))
}
}()
es := s.es.EventStream()
if es == nil {
span.AddEvent("EventStore does not implement streaming")
w.WriteHeader(http.StatusInternalServerError)
return
}
sub, err := es.Subscribe(ctx, streamID, pos)
if err != nil {
span.RecordError(err)
return
}
{
ctx, span := lg.Fork(ctx)
defer func() {
defer span.End()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
err := sub.Close(ctx)
span.RecordError(err)
}()
}
span.AddEvent("start ws")
for <-sub.Recv(ctx) {
events, err := sub.Events(ctx)
if err != nil {
break
}
span.AddEvent(fmt.Sprint("got events ", len(events)))
for i := range events {
e, ok := events[i].(*PostEvent)
if !ok {
continue
}
span.AddEvent(fmt.Sprint("send", i, e.String()))
var b bytes.Buffer
if err = encodeJSON(&b, first, e); err != nil {
span.RecordError(err)
}
err = c.WriteMessage(websocket.TextMessage, b.Bytes())
if err != nil {
span.RecordError(err)
break
}
}
}
}
type PostEvent struct {
payload []byte
tags []string
event.IsEvent
}
func (e *PostEvent) Values() any {
if e == nil {
return nil
}
return struct {
Payload []byte `json:"payload"`
Tags []string `json:"tags,omitempty"`
}{
Payload: e.payload,
Tags: e.tags,
}
}
func (e *PostEvent) MarshalBinary() ([]byte, error) {
j := e.Values()
return json.Marshal(&j)
}
func (e *PostEvent) UnmarshalBinary(b []byte) error {
j := struct {
Payload []byte
Tags []string
}{}
err := json.Unmarshal(b, &j)
e.payload = j.Payload
e.tags = j.Tags
return err
}
func (e *PostEvent) MarshalJSON() ([]byte, error) { return e.MarshalBinary() }
func (e *PostEvent) UnmarshalJSON(b []byte) error { return e.UnmarshalBinary(b) }
func (e *PostEvent) ID() string { return e.EventMeta().GetEventID() }
func (e *PostEvent) Tags() []string { return e.tags }
func (e *PostEvent) Payload() string { return string(e.payload) }
func (e *PostEvent) PayloadJSON(ctx context.Context) (m map[string]interface{}, err error) {
err = json.Unmarshal([]byte(e.payload), &m)
return
}
func (e *PostEvent) Meta() event.Meta { return e.EventMeta() }
func (e *PostEvent) IsEdge() {}
func (e *PostEvent) String() string {
var b bytes.Buffer
b.WriteString(strconv.FormatUint(e.EventMeta().Position, 10))
b.WriteRune('\t')
b.WriteString(e.EventMeta().EventID.String())
b.WriteRune('\t')
b.WriteString(string(e.payload))
if len(e.tags) > 0 {
b.WriteRune('\t')
b.WriteString(strings.Join(e.tags, ","))
}
return b.String()
}
func fields(s string) []string {
if s == "" {
return nil
}
return strings.Split(s, ";")
}
func encodeJSON(w io.Writer, first event.Event, events ...event.Event) error {
out := make([]struct {
ID uint64 `json:"id"`
Payload []byte `json:"payload"`
Created string `json:"created"`
Tags []string `json:"tags"`
Topic struct {
Name string `json:"name"`
TTL uint64 `json:"ttl"`
Seq uint64 `json:"seq"`
Created string `json:"created"`
} `json:"topic"`
}, len(events))
for i := range events {
e, ok := events[i].(*PostEvent)
if !ok {
continue
}
out[i].ID = e.EventMeta().Position
out[i].Created = e.EventMeta().Created().Format(time.RFC3339Nano)
out[i].Payload = e.payload
out[i].Tags = e.tags
out[i].Topic.Name = strings.TrimPrefix(e.EventMeta().StreamID, "post-")
out[i].Topic.Created = first.EventMeta().Created().Format(time.RFC3339Nano)
out[i].Topic.Seq = e.EventMeta().Position
}
if len(out) == 1 {
return json.NewEncoder(w).Encode(out[0])
}
return json.NewEncoder(w).Encode(out)
}
func Projector(e event.Event) []event.Event {
m := e.EventMeta()
streamID := m.StreamID
streamPos := m.Position
switch e := e.(type) {
case *PostEvent:
lis := make([]event.Event, len(e.tags))
for i := range lis {
tag := e.tags[i]
ne := event.NewPtr(streamID, streamPos)
event.SetStreamID(withTag(streamID, tag), ne)
lis[i] = ne
}
return lis
}
return nil
}
func withTag(id, tag string) string {
if tag == "" {
return id
}
h := fnv.New128a()
fmt.Fprint(h, tag)
return id + "-" + base64.RawURLEncoding.EncodeToString(h.Sum(nil))
}

View File

@@ -0,0 +1,25 @@
package msgbus
import (
"encoding/json"
"testing"
)
func TestUnmarshal(t *testing.T) {
m := &PostEvent{}
s := `{"Payload":"QkVHSU4gU0FMVFBBQ0sgRU5DUllQVEVEIE1FU1NBR0UuIGtlRElETVFXWXZWUjU4QiBGVGZUZURRTkhzT2ZuZWMgWUV1dkNYSTNoVDBYZzZKIDJxeXBJcmdsT3ZlZjlqciA0dHVQaWpJRVgxdlpoTEkgUTVKTzNvYVY5cnNna01BIEFOeTJwYjg2Qkd6N0JGMCA4MXJuZk9OV2RRM0VldFAgSmU0ZFlHeUI4NkRydkVrIGNqcFpoajNmcEJUcDdiZiBpMktwRDJQM1kzNVJBQU8gWmIyZGZtOVpneHZNSVJ2IDJsVVRCWTQxVEtZNkJhTyB2NGVIeXF1MENjQkR4dW8gSEZIekxJd3BBb3ZoRGt1IGFJdXRZYzdhZ3puMUxvNCBZQWFyUDZxVVVtTVlrQXAgYkdSYTZLZWVOa3ZzTDdMIHFoMWd6WUlnS2l6cW51eCB1SVQ0QTdaU1BscWxlR1IgbTk3M2ZoNUduWEZTM3MwIDJzQ2FvclpmN2c1RUo5TiBlS1hkZkFSMWF6TVRBek8gSmNEM1hDNDBwVTRpaG9mIE8wYnB2RU1UOVlUb3ZOWCBobVUxZWZ6enpyMUFDdXcgWExwcUhlVXNXdEtGcXRnIHdyWEZleExBYU50T21jRSBOeFFDUi4gRU5EIFNBTFRQQUNLIEVOQ1JZUFRFRCBNRVNTQUdFLgo=","Tags":null}`
err := json.Unmarshal([]byte(s), m)
t.Log(err)
}
func TestMarshal(t *testing.T) {
m := &PostEvent{
payload: []byte("QkVHSU4gU0FMVFBBQ0sgRU5DUllQVEVEIE1FU1NBR0UuIGtlRElETVFXWXZWUjU4QiBGVGZUZURRTkhzT2ZuZWMgWUV1dkNYSTNoVDBYZzZKIDJxeXBJcmdsT3ZlZjlqciA0dHVQaWpJRVgxdlpoTEkgUTVKTzNvYVY5cnNna01BIEFOeTJwYjg2Qkd6N0JGMCA4MXJuZk9OV2RRM0VldFAgSmU0ZFlHeUI4NkRydkVrIGNqcFpoajNmcEJUcDdiZiBpMktwRDJQM1kzNVJBQU8gWmIyZGZtOVpneHZNSVJ2IDJsVVRCWTQxVEtZNkJhTyB2NGVIeXF1MENjQkR4dW8gSEZIekxJd3BBb3ZoRGt1IGFJdXRZYzdhZ3puMUxvNCBZQWFyUDZxVVVtTVlrQXAgYkdSYTZLZWVOa3ZzTDdMIHFoMWd6WUlnS2l6cW51eCB1SVQ0QTdaU1BscWxlR1IgbTk3M2ZoNUduWEZTM3MwIDJzQ2FvclpmN2c1RUo5TiBlS1hkZkFSMWF6TVRBek8gSmNEM1hDNDBwVTRpaG9mIE8wYnB2RU1UOVlUb3ZOWCBobVUxZWZ6enpyMUFDdXcgWExwcUhlVXNXdEtGcXRnIHdyWEZleExBYU50T21jRSBOeFFDUi4gRU5EIFNBTFRQQUNLIEVOQ1JZUFRFRCBNRVNTQUdFLgo="),
}
b, err := json.Marshal(m)
t.Log(err)
err = json.Unmarshal(b, m)
t.Log(err)
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,58 @@
/* Space out content a bit */
body {
padding-top: 20px;
padding-bottom: 20px;
}
/* Everything but the jumbotron gets side spacing for mobile first views */
.header,
.footer {
padding-right: 15px;
padding-left: 15px;
}
/* Custom page header */
.header {
padding-bottom: 20px;
border-bottom: 1px solid #e5e5e5;
}
/* Make the masthead heading the same height as the navigation */
.header h3 {
margin-top: 0;
margin-bottom: 0;
line-height: 40px;
}
/* Custom page footer */
.footer {
padding-top: 19px;
color: #777;
border-top: 1px solid #e5e5e5;
}
.panel-heading a {
color: white;
font-weight: bold;
}
.container-narrow > hr {
margin: 30px 0;
}
.table tbody tr th {
width: 70%
}
@media (prefers-color-scheme: dark) {
body, .panel-body {
color: white;
background-color: #121212;
}
.table-striped > tbody > tr:nth-of-type(2n+1) {
background-color: darkslategray;
}
}
@media (prefers-color-scheme: light) {
}

56
app/peerfinder/ev-info.go Normal file
View File

@@ -0,0 +1,56 @@
package peerfinder
import (
"bytes"
"github.com/tj/go-semver"
"go.sour.is/ev/event"
)
type Info struct {
ScriptVersion string `json:"script_version"`
event.IsAggregate
}
var _ event.Aggregate = (*Info)(nil)
func (a *Info) ApplyEvent(lis ...event.Event) {
for _, e := range lis {
switch e := e.(type) {
case *VersionChanged:
a.ScriptVersion = e.ScriptVersion
}
}
}
func (a *Info) MarshalEnviron() ([]byte, error) {
var b bytes.Buffer
b.WriteString("SCRIPT_VERSION=")
b.WriteString(a.ScriptVersion)
b.WriteRune('\n')
return b.Bytes(), nil
}
func (a *Info) OnUpsert() error {
if a.StreamVersion() == 0 {
event.Raise(a, &VersionChanged{ScriptVersion: initVersion})
}
current, _ := semver.Parse(initVersion)
previous, _ := semver.Parse(a.ScriptVersion)
if current.Compare(previous) > 0 {
event.Raise(a, &VersionChanged{ScriptVersion: initVersion})
}
return nil
}
type VersionChanged struct {
ScriptVersion string `json:"script_version"`
event.IsEvent
}
var _ event.Event = (*VersionChanged)(nil)

111
app/peerfinder/ev-peer.go Normal file
View File

@@ -0,0 +1,111 @@
package peerfinder
import (
"net"
"strconv"
"strings"
"time"
"github.com/keys-pub/keys/json"
"go.sour.is/pkg/set"
"go.sour.is/ev/event"
)
type Time time.Time
func (t *Time) UnmarshalJSON(b []byte) error {
time, err := time.Parse(`"2006-01-02 15:04:05"`, string(b))
*t = Time(time)
return err
}
func (t *Time) MarshalJSON() ([]byte, error) {
if t == nil {
return nil, nil
}
i := *t
return time.Time(i).MarshalJSON()
}
type ipFamily string
const (
ipFamilyV4 ipFamily = "IPv4"
ipFamilyV6 ipFamily = "IPv6"
ipFamilyBoth ipFamily = "both"
ipFamilyNone ipFamily = "none"
)
func (t *ipFamily) UnmarshalJSON(b []byte) error {
i, err := strconv.Atoi(strings.Trim(string(b), `"`))
switch i {
case 1:
*t = ipFamilyV4
case 2:
*t = ipFamilyV6
case 3:
*t = ipFamilyBoth
default:
*t = ipFamilyNone
}
return err
}
type peerType []string
func (t *peerType) UnmarshalJSON(b []byte) error {
var bs string
json.Unmarshal(b, &bs)
*t = strings.Split(bs, ",")
return nil
}
type Peer struct {
ID string `json:"peer_id,omitempty"`
Owner string `json:"peer_owner"`
Nick string `json:"peer_nick"`
Name string `json:"peer_name"`
Country string `json:"peer_country"`
Note string `json:"peer_note"`
Family ipFamily `json:"peer_family"`
Type peerType `json:"peer_type"`
Created Time `json:"peer_created"`
}
func (p *Peer) CanSupport(ip string) bool {
addr := net.ParseIP(ip)
if addr == nil {
return false
}
if !(addr.IsGlobalUnicast() || addr.IsLoopback() || addr.IsPrivate()) {
return false
}
switch p.Family {
case ipFamilyV4:
return addr.To4() != nil
case ipFamilyV6:
return addr.To16() != nil
case ipFamilyNone:
return false
}
return true
}
type PeerResults struct {
set.Set[string]
event.IsAggregate
}
func (p *PeerResults) ApplyEvent(lis ...event.Event) {
for _, e := range lis {
switch e := e.(type) {
case *ResultSubmitted:
if p.Set == nil {
p.Set = set.New[string]()
}
p.Set.Add(e.RequestID)
}
}
}

View File

@@ -0,0 +1,230 @@
package peerfinder
import (
"bytes"
"encoding/json"
"fmt"
"net/netip"
"strconv"
"time"
"github.com/oklog/ulid"
"go.sour.is/ev/event"
"go.sour.is/pkg/set"
)
type Request struct {
event.IsAggregate
RequestID string `json:"req_id"`
RequestIP string `json:"req_ip"`
Hidden bool `json:"hide,omitempty"`
Created time.Time `json:"req_created"`
Family int `json:"family"`
Responses []*Response `json:"responses"`
peers set.Set[string]
initial *RequestSubmitted
}
var _ event.Aggregate = (*Request)(nil)
func (a *Request) ApplyEvent(lis ...event.Event) {
for _, e := range lis {
switch e := e.(type) {
case *RequestSubmitted:
a.RequestID = e.EventMeta().EventID.String()
a.RequestIP = e.RequestIP
a.Hidden = e.Hidden
a.Created = ulid.Time(e.EventMeta().EventID.Time())
a.Family = e.Family()
a.initial = e
case *ResultSubmitted:
if a.peers == nil {
a.peers = set.New[string]()
}
if a.peers.Has(e.PeerID) {
continue
}
a.peers.Add(e.PeerID)
a.Responses = append(a.Responses, &Response{
PeerID: e.PeerID,
ScriptVersion: e.PeerVersion,
Latency: e.Latency,
Jitter: e.Jitter,
MinRTT: e.MinRTT,
MaxRTT: e.MaxRTT,
Sent: e.Sent,
Received: e.Received,
Unreachable: e.Unreachable,
Created: ulid.Time(e.EventMeta().EventID.Time()),
})
}
}
}
func (a *Request) MarshalEnviron() ([]byte, error) {
return a.initial.MarshalEnviron()
}
func (a *Request) CreatedString() string {
return a.Created.Format("2006-01-02 15:04:05")
}
type ListRequest []*Request
func (lis ListRequest) Len() int {
return len(lis)
}
func (lis ListRequest) Less(i, j int) bool {
return lis[i].Created.Before(lis[j].Created)
}
func (lis ListRequest) Swap(i, j int) {
lis[i], lis[j] = lis[j], lis[i]
}
type Response struct {
Peer *Peer `json:"peer"`
PeerID string `json:"-"`
ScriptVersion string `json:"peer_scriptver"`
Latency float64 `json:"res_latency"`
Jitter float64 `json:"res_jitter,omitempty"`
MaxRTT float64 `json:"res_maxrtt,omitempty"`
MinRTT float64 `json:"res_minrtt,omitempty"`
Sent int `json:"res_sent,omitempty"`
Received int `json:"res_recv,omitempty"`
Unreachable bool `json:"unreachable,omitempty"`
Created time.Time `json:"res_created"`
}
type ListResponse []*Response
func (lis ListResponse) Len() int {
return len(lis)
}
func (lis ListResponse) Less(i, j int) bool {
if lis[j].Latency == 0.0 && lis[i].Latency > 0.0 {
return true
}
if lis[i].Latency == 0.0 && lis[j].Latency > 0.0 {
return false
}
return lis[j].Latency >= lis[i].Latency
}
func (lis ListResponse) Swap(i, j int) {
lis[i], lis[j] = lis[j], lis[i]
}
type RequestSubmitted struct {
event.IsEvent
RequestIP string `json:"req_ip"`
Hidden bool `json:"hide,omitempty"`
}
func (r *RequestSubmitted) StreamID() string {
return r.EventMeta().GetEventID()
}
func (r *RequestSubmitted) RequestID() string {
return r.EventMeta().GetEventID()
}
func (r *RequestSubmitted) Created() time.Time {
return r.EventMeta().Created()
}
func (r *RequestSubmitted) CreatedString() string {
return r.Created().Format("2006-01-02 15:04:05")
}
func (r *RequestSubmitted) Family() int {
if r == nil {
return 0
}
ip, err := netip.ParseAddr(r.RequestIP)
switch {
case err != nil:
return 0
case ip.Is4():
return 1
default:
return 2
}
}
func (r *RequestSubmitted) String() string {
return fmt.Sprint(r.EventMeta().EventID, r.RequestIP, r.Hidden, r.CreatedString())
}
var _ event.Event = (*RequestSubmitted)(nil)
func (e *RequestSubmitted) MarshalBinary() (text []byte, err error) {
return json.Marshal(e)
}
func (e *RequestSubmitted) UnmarshalBinary(b []byte) error {
return json.Unmarshal(b, e)
}
func (e *RequestSubmitted) MarshalEnviron() ([]byte, error) {
if e == nil {
return nil, nil
}
var b bytes.Buffer
b.WriteString("REQ_ID=")
b.WriteString(e.RequestID())
b.WriteRune('\n')
b.WriteString("REQ_IP=")
b.WriteString(e.RequestIP)
b.WriteRune('\n')
b.WriteString("REQ_FAMILY=")
if family := e.Family(); family > 0 {
b.WriteString(strconv.Itoa(family))
}
b.WriteRune('\n')
b.WriteString("REQ_CREATED=")
b.WriteString(e.CreatedString())
b.WriteRune('\n')
return b.Bytes(), nil
}
type ResultSubmitted struct {
event.IsEvent
RequestID string `json:"req_id"`
PeerID string `json:"peer_id"`
PeerVersion string `json:"peer_version"`
Latency float64 `json:"latency,omitempty"`
Jitter float64 `json:"jitter,omitempty"`
MaxRTT float64 `json:"maxrtt,omitempty"`
MinRTT float64 `json:"minrtt,omitempty"`
Sent int `json:"res_sent,omitempty"`
Received int `json:"res_recv,omitempty"`
Unreachable bool `json:"unreachable,omitempty"`
}
func (r *ResultSubmitted) Created() time.Time {
return r.EventMeta().Created()
}
var _ event.Event = (*ResultSubmitted)(nil)
func (e *ResultSubmitted) String() string {
return fmt.Sprintf("id: %s\npeer: %s\nversion: %s\nlatency: %0.4f", e.RequestID, e.PeerID, e.PeerVersion, e.Latency)
}
type RequestTruncated struct {
RequestID string
event.IsEvent
}
var _ event.Event = (*RequestTruncated)(nil)
func (e *RequestTruncated) String() string {
return fmt.Sprintf("request truncated id: %s\n", e.RequestID)
}

713
app/peerfinder/http.go Normal file
View File

@@ -0,0 +1,713 @@
package peerfinder
import (
"context"
"embed"
"encoding/json"
"errors"
"fmt"
"html/template"
"io"
"io/fs"
"log"
"net"
"net/http"
"sort"
"strconv"
"strings"
"github.com/oklog/ulid/v2"
contentnegotiation "gitlab.com/jamietanna/content-negotiation-go"
"go.opentelemetry.io/otel/attribute"
"go.sour.is/pkg/lg"
"go.sour.is/ev"
"go.sour.is/ev/event"
)
var (
//go:embed pages/* layouts/* assets/*
files embed.FS
templates map[string]*template.Template
)
// Args passed to templates
type Args struct {
RemoteIP string
Requests []*Request
CountPeers int
}
// requestArgs builds args from http.Request
func requestArgs(r *http.Request) Args {
remoteIP, _, _ := strings.Cut(r.RemoteAddr, ":")
if s := r.Header.Get("X-Forwarded-For"); s != "" {
s, _, _ = strings.Cut(s, ", ")
remoteIP = s
}
return Args{
RemoteIP: remoteIP,
}
}
// RegisterHTTP adds handler paths to the ServeMux
func (s *service) RegisterHTTP(mux *http.ServeMux) {
a, _ := fs.Sub(files, "assets")
assets := http.StripPrefix("/peers/assets/", http.FileServer(http.FS(a)))
mux.Handle("/peers/assets/", lg.Htrace(assets, "peer-assets"))
mux.Handle("/peers/", lg.Htrace(s, "peers"))
}
func (s *service) Setup() error {
return nil
}
// ServeHTTP handle HTTP requests
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, span := lg.Span(ctx)
defer span.End()
r = r.WithContext(ctx)
if !s.up.Load() {
w.WriteHeader(http.StatusFailedDependency)
fmt.Fprint(w, "Starting up...")
return
}
switch r.Method {
case http.MethodGet:
switch {
case strings.HasPrefix(r.URL.Path, "/peers/pending/"):
s.getPending(w, r, strings.TrimPrefix(r.URL.Path, "/peers/pending/"))
return
case strings.HasPrefix(r.URL.Path, "/peers/req/"):
s.getResultsForRequest(w, r, strings.TrimPrefix(r.URL.Path, "/peers/req/"))
return
case strings.HasPrefix(r.URL.Path, "/peers/status"):
var pickID string
if strings.HasPrefix(r.URL.Path, "/peers/status/") {
pickID = strings.TrimPrefix(r.URL.Path, "/peers/status/")
}
var requests []*Request
s.state.Use(r.Context(), func(ctx context.Context, state *state) error {
for id, p := range state.peers {
fmt.Fprintln(w, "PEER:", id[24:], p.Owner, p.Name)
}
if pickID != "" {
if rq, ok := state.requests[pickID]; ok {
requests = append(requests, rq)
}
} else {
requests = make([]*Request, 0, len(state.requests))
for i := range state.requests {
rq := state.requests[i]
requests = append(requests, rq)
}
}
for i := range requests {
rq := requests[i]
for i := range rq.Responses {
res := rq.Responses[i]
if peer, ok := state.peers[res.PeerID]; ok {
res.Peer = peer
res.Peer.ID = ""
}
}
}
return nil
})
for i, rq := range requests {
fmt.Fprintln(w, "REQ: ", i, rq.RequestIP, len(rq.Responses))
for i, peer := range fnOrderByPeer(rq) {
fmt.Fprintln(w, " PEER: ", i, peer.RequestID, peer.Name, peer.Latency, peer.Jitter)
for i, res := range peer.Results {
fmt.Fprintln(w, " RES: ", i, res.RequestID, res.Latency, res.Jitter)
}
}
}
default:
s.getResults(w, r)
return
}
case http.MethodPost:
switch {
case strings.HasPrefix(r.URL.Path, "/peers/req/"):
s.postResult(w, r, strings.TrimPrefix(r.URL.Path, "/peers/req/"))
return
case strings.HasPrefix(r.URL.Path, "/peers/req"):
s.postRequest(w, r)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}
func (s *service) getPending(w http.ResponseWriter, r *http.Request, peerID string) {
ctx, span := lg.Span(r.Context())
defer span.End()
span.SetAttributes(
attribute.String("peerID", peerID),
)
var peer *Peer
err := s.state.Use(ctx, func(ctx context.Context, state *state) error {
var ok bool
if peer, ok = state.peers[peerID]; !ok {
return fmt.Errorf("peer not found: %s", peerID)
}
return nil
})
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusNotFound)
return
}
info, err := ev.Upsert(ctx, s.es, aggInfo, func(ctx context.Context, agg *Info) error {
return agg.OnUpsert() // initialize if not exists
})
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
requests, err := s.es.Read(ctx, queueRequests, -1, -30)
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
peerResults := &PeerResults{}
peerResults.SetStreamID(aggPeer(peerID))
err = s.es.Load(ctx, peerResults)
if err != nil && !errors.Is(err, ev.ErrNotFound) {
span.RecordError(fmt.Errorf("peer not found: %w", err))
w.WriteHeader(http.StatusNotFound)
}
var req *Request
for _, e := range requests {
r := &Request{}
r.ApplyEvent(e)
if !peerResults.Has(r.RequestID) {
if !peer.CanSupport(r.RequestIP) {
continue
}
req = r
}
}
if req == nil {
span.RecordError(fmt.Errorf("request not found"))
w.WriteHeader(http.StatusNoContent)
}
negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/plain", "text/html")
negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept"))
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusNotAcceptable)
return
}
span.AddEvent(negotiated.String())
mime := negotiated.String()
switch mime {
case "text/environment":
w.Header().Set("content-type", negotiated.String())
_, err = encodeTo(w, info.MarshalEnviron, req.MarshalEnviron)
case "application/json":
w.Header().Set("content-type", negotiated.String())
var out interface{} = info
if req != nil {
out = struct {
ScriptVersion string `json:"script_version"`
RequestID string `json:"req_id"`
RequestIP string `json:"req_ip"`
Family string `json:"req_family"`
Created string `json:"req_created"`
}{
info.ScriptVersion,
req.RequestID,
req.RequestIP,
strconv.Itoa(req.Family),
req.CreatedString(),
}
}
err = json.NewEncoder(w).Encode(out)
}
span.RecordError(err)
}
func (s *service) getResults(w http.ResponseWriter, r *http.Request) {
ctx, span := lg.Span(r.Context())
defer span.End()
// events, err := s.es.Read(ctx, queueRequests, -1, -30)
// if err != nil {
// span.RecordError(err)
// w.WriteHeader(http.StatusInternalServerError)
// return
// }
// requests := make([]*Request, len(events))
// for i, req := range events {
// if req, ok := req.(*RequestSubmitted); ok {
// requests[i], err = s.loadResult(ctx, req.RequestID())
// if err != nil {
// span.RecordError(err)
// w.WriteHeader(http.StatusInternalServerError)
// return
// }
// }
// }
var requests ListRequest
s.state.Use(ctx, func(ctx context.Context, state *state) error {
requests = make([]*Request, 0, len(state.requests))
for _, req := range state.requests {
if req.RequestID == "" {
continue
}
if req.Hidden {
continue
}
requests = append(requests, req)
}
return nil
})
sort.Sort(sort.Reverse(requests))
args := requestArgs(r)
args.Requests = requests[:maxResults]
s.state.Use(ctx, func(ctx context.Context, state *state) error {
args.CountPeers = len(state.peers)
return nil
})
t := templates["home.go.tpl"]
t.Execute(w, args)
}
func (s *service) getResultsForRequest(w http.ResponseWriter, r *http.Request, uuid string) {
ctx, span := lg.Span(r.Context())
defer span.End()
span.SetAttributes(
attribute.String("uuid", uuid),
)
var request *Request
err := s.state.Use(ctx, func(ctx context.Context, state *state) error {
request = state.requests[uuid]
return nil
})
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
request, err = s.loadResult(ctx, request)
// request, err := s.loadResult(ctx, uuid)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/csv", "text/plain", "text/html")
negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept"))
if err != nil {
w.WriteHeader(http.StatusNotAcceptable)
return
}
span.AddEvent(negotiated.String())
switch negotiated.String() {
// case "text/environment":
// encodeTo(w, responses.MarshalBinary)
case "application/json":
json.NewEncoder(w).Encode(request)
return
default:
args := requestArgs(r)
args.Requests = append(args.Requests, request)
span.AddEvent(fmt.Sprint(args))
err := renderTo(w, "req.go.tpl", args)
span.RecordError(err)
return
}
}
func (s *service) postRequest(w http.ResponseWriter, r *http.Request) {
ctx, span := lg.Span(r.Context())
defer span.End()
if err := r.ParseForm(); err != nil {
w.WriteHeader(http.StatusUnprocessableEntity)
return
}
args := requestArgs(r)
requestIP := args.RemoteIP
if ip := r.Form.Get("req_ip"); ip != "" {
requestIP = ip
}
ip := net.ParseIP(requestIP)
if ip == nil {
w.WriteHeader(http.StatusBadRequest)
return
}
req := &RequestSubmitted{
RequestIP: ip.String(),
}
if hidden, err := strconv.ParseBool(r.Form.Get("req_hidden")); err == nil {
req.Hidden = hidden
}
span.SetAttributes(
attribute.Stringer("req_ip", ip),
)
s.es.Append(ctx, queueRequests, event.NewEvents(req))
http.Redirect(w, r, "/peers/req/"+req.RequestID(), http.StatusSeeOther)
}
func (s *service) postResult(w http.ResponseWriter, r *http.Request, reqID string) {
ctx, span := lg.Span(r.Context())
defer span.End()
span.SetAttributes(
attribute.String("id", reqID),
)
if _, err := ulid.ParseStrict(reqID); err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
if err := r.ParseForm(); err != nil {
w.WriteHeader(http.StatusUnprocessableEntity)
return
}
form := make([]string, 0, len(r.Form))
for k, vals := range r.Form {
for _, v := range vals {
form = append(form, fmt.Sprint(k, v))
}
}
span.SetAttributes(
attribute.StringSlice("form", form),
)
peerID := r.Form.Get("peer_id")
err := s.state.Use(ctx, func(ctx context.Context, state *state) error {
var ok bool
if _, ok = state.peers[peerID]; !ok {
log.Printf("peer not found: %s\n", peerID)
return fmt.Errorf("peer not found: %s", peerID)
}
return nil
})
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusNotFound)
return
}
peerResults := &PeerResults{}
peerResults.SetStreamID(aggPeer(peerID))
err = s.es.Load(ctx, peerResults)
if err != nil {
span.RecordError(fmt.Errorf("peer not found: %w", err))
w.WriteHeader(http.StatusNotFound)
}
if peerResults.Has(reqID) {
span.RecordError(fmt.Errorf("request previously recorded: req=%v peer=%v", reqID, peerID))
w.WriteHeader(http.StatusAlreadyReported)
return
}
var unreach bool
latency, err := strconv.ParseFloat(r.Form.Get("res_latency"), 64)
if err != nil {
unreach = true
}
req := &ResultSubmitted{
RequestID: reqID,
PeerID: r.Form.Get("peer_id"),
PeerVersion: r.Form.Get("peer_version"),
Latency: latency,
Unreachable: unreach,
}
if jitter, err := strconv.ParseFloat(r.Form.Get("res_jitter"), 64); err == nil {
req.Jitter = jitter
} else {
span.RecordError(err)
}
if minrtt, err := strconv.ParseFloat(r.Form.Get("res_minrtt"), 64); err == nil {
req.MinRTT = minrtt
} else {
span.RecordError(err)
}
if maxrtt, err := strconv.ParseFloat(r.Form.Get("res_maxrtt"), 64); err == nil {
req.MaxRTT = maxrtt
} else {
span.RecordError(err)
}
span.SetAttributes(
attribute.Stringer("result", req),
)
log.Printf("record result: %v", req)
s.es.Append(ctx, queueResults, event.NewEvents(req))
}
func renderTo(w io.Writer, name string, args any) (err error) {
defer func() {
if p := recover(); p != nil {
err = fmt.Errorf("panic: %s", p)
}
if err != nil {
fmt.Fprint(w, err)
}
}()
t, ok := templates[name]
if !ok || t == nil {
return fmt.Errorf("missing template")
}
return t.Execute(w, args)
}
func encodeTo(w io.Writer, fns ...func() ([]byte, error)) (int, error) {
i := 0
for _, fn := range fns {
b, err := fn()
if err != nil {
return i, err
}
j, err := w.Write(b)
i += j
if err != nil {
return i, err
}
}
return i, nil
}
func loadTemplates() error {
if templates != nil {
return nil
}
templates = make(map[string]*template.Template)
tmplFiles, err := fs.ReadDir(files, "pages")
if err != nil {
return err
}
for _, tmpl := range tmplFiles {
if tmpl.IsDir() {
continue
}
pt := template.New(tmpl.Name())
pt.Funcs(funcMap)
pt, err = pt.ParseFS(files, "pages/"+tmpl.Name(), "layouts/*.go.tpl")
if err != nil {
log.Println(err)
return err
}
templates[tmpl.Name()] = pt
}
return nil
}
var funcMap = map[string]any{
"orderByPeer": fnOrderByPeer,
"countResponses": fnCountResponses,
}
type peerResult struct {
RequestID string
Name string
Country string
Latency float64
Jitter float64
}
type peer struct {
RequestID string
Name string
Note string
Nick string
Country string
Latency float64
Jitter float64
VPNTypes []string
Results peerResults
}
type listPeer []peer
func (lis listPeer) Len() int {
return len(lis)
}
func (lis listPeer) Less(i, j int) bool {
if lis[j].Latency == 0.0 && lis[i].Latency > 0.0 {
return true
}
if lis[i].Latency == 0.0 && lis[j].Latency > 0.0 {
return false
}
return lis[i].Latency < lis[j].Latency
}
func (lis listPeer) Swap(i, j int) {
lis[i], lis[j] = lis[j], lis[i]
}
type peerResults []peerResult
func (lis peerResults) Len() int {
return len(lis)
}
func (lis peerResults) Less(i, j int) bool {
if lis[j].Latency == 0.0 && lis[i].Latency > 0.0 {
return true
}
if lis[i].Latency == 0.0 && lis[j].Latency > 0.0 {
return false
}
return lis[i].Latency < lis[j].Latency
}
func (lis peerResults) Swap(i, j int) {
lis[i], lis[j] = lis[j], lis[i]
}
func fnOrderByPeer(rq *Request) listPeer {
peers := make(map[string]peer)
for i := range rq.Responses {
if rq.Responses[i] == nil || rq.Responses[i].Peer == nil {
continue
}
rs := rq.Responses[i]
p, ok := peers[rs.Peer.Owner]
if !ok {
p.RequestID = rq.RequestID
p.Country = rs.Peer.Country
p.Name = rs.Peer.Name
p.Nick = rs.Peer.Nick
p.Note = rs.Peer.Note
p.Latency = rs.Latency
p.Jitter = rs.Jitter
p.VPNTypes = rs.Peer.Type
}
p.Results = append(p.Results, peerResult{
RequestID: rq.RequestID,
Name: rs.Peer.Name,
Country: rs.Peer.Country,
Latency: rs.Latency,
Jitter: rs.Jitter,
})
peers[rs.Peer.Owner] = p
}
peerList := make(listPeer, 0, len(peers))
for i := range peers {
v := peers[i]
sort.Sort(v.Results)
v.Name = v.Results[0].Name
v.Country = v.Results[0].Country
v.Latency = v.Results[0].Latency
v.Jitter = v.Results[0].Jitter
peerList = append(peerList, v)
}
sort.Sort(peerList)
return peerList
}
func fnCountResponses(rq *Request) int {
count := 0
for _, res := range rq.Responses {
if !res.Unreachable {
count++
}
}
return count
}
// func filter(peer *Peer, requests, responses event.Events) *RequestSubmitted {
// have := make(map[string]struct{}, len(responses))
// for _, res := range toList[ResultSubmitted](responses...) {
// have[res.RequestID] = struct{}{}
// }
// for _, req := range reverse(toList[RequestSubmitted](requests...)...) {
// if _, ok := have[req.RequestID()]; !ok {
// if !peer.CanSupport(req.RequestIP) {
// continue
// }
// return req
// }
// }
// return nil
// }
// func toList[E any, T es.PE[E]](lis ...event.Event) []T {
// newLis := make([]T, 0, len(lis))
// for i := range lis {
// if e, ok := lis[i].(T); ok {
// newLis = append(newLis, e)
// }
// }
// return newLis
// }
// func reverse[T any](s ...T) []T {
// first, last := 0, len(s)-1
// for first < last {
// s[first], s[last] = s[last], s[first]
// first++
// last--
// }
// return s
// }

216
app/peerfinder/jobs.go Normal file
View File

@@ -0,0 +1,216 @@
package peerfinder
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"time"
"go.sour.is/ev"
"go.sour.is/ev/event"
"go.sour.is/pkg/lg"
"go.sour.is/pkg/set"
)
// RefreshJob retrieves peer info from the peerdb
func (s *service) RefreshJob(ctx context.Context, _ time.Time) error {
ctx, span := lg.Span(ctx)
defer span.End()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.statusURL, nil)
span.RecordError(err)
if err != nil {
return err
}
req.Header.Set("Accept", "application/json")
res, err := http.DefaultClient.Do(req)
span.RecordError(err)
if err != nil {
return err
}
defer res.Body.Close()
var peers []*Peer
err = json.NewDecoder(res.Body).Decode(&peers)
span.RecordError(err)
if err != nil {
return err
}
err = s.state.Use(ctx, func(ctx context.Context, t *state) error {
for _, peer := range peers {
t.peers[peer.ID] = peer
}
return nil
})
span.RecordError(err)
if err != nil {
return err
}
log.Printf("processed %d peers", len(peers))
span.AddEvent(fmt.Sprintf("processed %d peers", len(peers)))
s.up.Store(true)
err = s.cleanPeerJobs(ctx)
span.RecordError(err)
return err
}
const maxResults = 30
// CleanJob truncates streams old request data
func (s *service) CleanJob(ctx context.Context, now time.Time) error {
ctx, span := lg.Span(ctx)
defer span.End()
span.AddEvent("clear peerfinder requests")
err := s.cleanRequests(ctx, now)
if err != nil {
return err
}
// if err = s.cleanResults(ctx, endRequestID); err != nil {
// return err
// }
return s.cleanPeerJobs(ctx)
}
func (s *service) cleanPeerJobs(ctx context.Context) error {
ctx, span := lg.Span(ctx)
defer span.End()
peers := set.New[string]()
err := s.state.Use(ctx, func(ctx context.Context, state *state) error {
for id := range state.peers {
peers.Add(id)
}
return nil
})
if err != nil {
return err
}
// trunctate all the peer streams to last 30
for streamID := range peers {
streamID = aggPeer(streamID)
first, err := s.es.FirstIndex(ctx, streamID)
if err != nil {
return err
}
last, err := s.es.LastIndex(ctx, streamID)
if err != nil {
return err
}
if last-first < maxResults {
fmt.Println("SKIP", streamID, first, last)
continue
}
newFirst := int64(last - 30)
// fmt.Println("TRUNC", streamID, first, newFirst, last)
span.AddEvent(fmt.Sprint("TRUNC", streamID, first, newFirst, last))
err = s.es.Truncate(ctx, streamID, int64(newFirst))
if err != nil {
return err
}
}
return nil
}
func (s *service) cleanRequests(ctx context.Context, now time.Time) error {
ctx, span := lg.Span(ctx)
defer span.End()
var streamIDs []string
var startPosition, endPosition int64
first, err := s.es.FirstIndex(ctx, queueRequests)
if err != nil {
return err
}
last, err := s.es.LastIndex(ctx, queueRequests)
if err != nil {
return err
}
if last-first < maxResults {
// fmt.Println("SKIP", queueRequests, first, last)
return nil
}
startPosition = int64(first - 1)
endPosition = int64(last - maxResults)
for {
events, err := s.es.Read(ctx, queueRequests, startPosition, 1000) // read 1000 from the top each loop.
if err != nil && !errors.Is(err, ev.ErrNotFound) {
span.RecordError(err)
return err
}
if len(events) == 0 {
break
}
startPosition = int64(events.Last().EventMeta().ActualPosition)
for _, event := range events {
switch e := event.(type) {
case *RequestSubmitted:
if e.EventMeta().ActualPosition < last-maxResults {
streamIDs = append(streamIDs, e.RequestID())
}
}
}
}
// truncate all reqs to found end position
// fmt.Println("TRUNC", queueRequests, int64(endPosition), last)
span.AddEvent(fmt.Sprint("TRUNC", queueRequests, int64(endPosition), last))
err = s.es.Truncate(ctx, queueRequests, int64(endPosition))
if err != nil {
return err
}
// truncate all the request streams
for _, streamID := range streamIDs {
s.state.Use(ctx, func(ctx context.Context, state *state) error {
return state.ApplyEvents(event.NewEvents(&RequestTruncated{
RequestID: streamID,
}))
})
err := s.cleanResult(ctx, streamID)
if err != nil {
return err
}
}
return nil
}
func (s *service) cleanResult(ctx context.Context, requestID string) error {
ctx, span := lg.Span(ctx)
defer span.End()
streamID := aggRequest(requestID)
last, err := s.es.LastIndex(ctx, streamID)
if err != nil {
return err
}
// truncate all reqs to found end position
// fmt.Println("TRUNC", streamID, last)
span.AddEvent(fmt.Sprint("TRUNC", streamID, last))
err = s.es.Truncate(ctx, streamID, int64(last))
if err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,38 @@
{{define "main"}}
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
{{template "meta" .}}
<title>DN42 PingFinder</title>
<link href="/peers/assets/bootstrap.min.css" rel="stylesheet" integrity="sha384-1q8mTJOASx8j1Au+a5WDVnPi2lkFfwwEAa8hDDdjZlpLegxhjVME1fgjWPGmkzs7" crossorigin="anonymous">
<link href="/peers/assets/peerfinder.css" rel="stylesheet" crossorigin="anonymous">
</head>
<body>
<div class="container-fluid">
<div class="header clearfix">
<nav>
<ul class="nav nav-pills pull-right">
<li role="presentation"><a href="/peers">Home</a></li>
<!--
<li role="presentation"><a href="/peers/status">Status</a></li>
-->
<li role="presentation"><a href="//util.sour.is/peer">Sign up/Manage</a></li>
<li role="presentation"><a href="https://git.dn42.dev/dn42/pingfinder/src/branch/master/clients">Scripts</a></li>
</ul>
</nav>
<h3 class="text-muted">DN42 PeerFinder</h3>
</div>
</div>
<div class=container>
{{template "content" .}}
</div>
</body>
</html>
{{end}}

View File

@@ -0,0 +1,65 @@
{{template "main" .}}
{{define "meta"}}
<meta http-equiv="refresh" content="30">
{{end}}
{{define "content"}}
<h2>What is this?</h2>
<p>This tool allows you to find "good" peerings
for <a href="https://dn42.net">dn42</a>, by measuring the latency from
various points in the network towards you.</p>
<p>If you don't know what dn42 is,
read <a href="https://dn42.net/Home">the website</a> and in particular
the <a href="https://dn42.net/Getting-started-with-dn42">Getting Started
guide</a>.</p>
<h2>How does it work?</h2>
<p>
<ol>
<li>You enter your (Internet) IP address</li>
<li>Various routers participating in dn42 will ping you over the Internet</li>
<li>After a short while, you get back all the latency results</li>
<li>You can then peer with people close to you (low latency)</li>
</ol>
</p>
<form class="form-inline" method="POST" action="/peers/req">
<label>Ping IP Address [Check Hidden?]:</label>
<div class="input-group input-group-sm">
<input class="form-control" type="text" name="req_ip" placeholder="{{ .RemoteIP }}">
<span class="input-group-addon">
<input type="checkbox" name="req_hidden" value=1 aria-label="Hidden?">
</span>
</div>
<button class="btn btn-default" type="submit">Submit</button>
</form>
<p>If you mark your measurement as hidden, it will not be displayed on the
page below. Note that the IP addresses of the target will be shown alongside the result.</p>
<div class=row>
<h2>Results</h2>
{{ with $args := . }}
{{ range $req := .Requests }}
{{ if ne $req.RequestID "" }}
<div class="panel panel-primary">
<div class="panel-heading">
<a href="/peers/req/{{ $req.RequestID }}">
{{ $req.RequestIP }} on {{ $req.Created.Format "02 Jan 06 15:04 MST" }}
</a> &mdash; <b>Request ID:</b> {{ $req.RequestID }}
<div style='float:right'>
<a href="/peers/req/{{ $req.RequestID }}" class='btn btn-success'>{{ countResponses $req }} / {{ $args.CountPeers }} </a>
</div>
</div>
</div>
{{end}}
{{end}}
{{end}}
</div>
{{end}}

View File

@@ -0,0 +1,50 @@
{{template "main" .}}
{{define "meta"}}
<meta http-equiv="refresh" content="30">
{{end}}
{{define "content"}}
{{range .Requests}}
<h2>Results to {{.RequestIP}}{{if .Hidden}} 👁️{{end}}</h2>
{{range orderByPeer .}}
<div class="panel panel-primary" id="peer-{{.Nick}}">
<div class="panel-heading">
<b> {{.Country}} :: {{.Name}} :: {{.Nick}} </b>
<div style='float:right'>
<a class='btn btn-success' href="#peer-{{.Nick}}">{{ if eq .Latency 0.0 }}&mdash;{{ else }}{{printf "%0.3f ms" .Latency}}{{ end }}</a>
</div>
</div>
<div class="panel-body">
<b>Note:</b> {{.Note}}<br/>
<b>VPN Types:</b> {{range .VPNTypes}} {{.}} {{end}}<br/>
<b>IRC:</b> {{.Nick}}
<h4>Other Results</h4>
<table class="table table-striped">
<thead>
<tr>
<th>Peer Name</th>
<th>Country</th>
<th>Latency</th>
<th>Jitter</th>
</tr>
</thead>
<tbody>
{{range .Results}}
<tr>
<th>{{.Name}}</th>
<td>{{.Country}}</td>
<td>{{ if eq .Latency 0.0 }}&mdash;{{ else }}{{printf "%0.3f ms" .Latency}}{{ end }}</td>
<td>{{ if eq .Jitter 0.0 }}&mdash;{{ else }}{{ printf "%0.3f ms" .Jitter }}{{ end }}</td>
</tr>
{{end}}
</tbody>
</table>
</div>
</div>
{{end}}
{{end}}
{{end}}

179
app/peerfinder/service.go Normal file
View File

@@ -0,0 +1,179 @@
package peerfinder
import (
"context"
"fmt"
"sync/atomic"
"time"
"go.sour.is/pkg/lg"
"go.sour.is/pkg/locker"
"go.uber.org/multierr"
"go.sour.is/ev"
"go.sour.is/ev/event"
)
const (
aggInfo = "pf-info"
queueRequests = "pf-requests"
queueResults = "pf-results"
initVersion = "1.2.1"
)
func aggRequest(id string) string { return "pf-request-" + id }
func aggPeer(id string) string { return "pf-peer-" + id }
type service struct {
es *ev.EventStore
statusURL string
state *locker.Locked[*state]
up atomic.Bool
stop func()
}
type state struct {
peers map[string]*Peer
requests map[string]*Request
}
func New(ctx context.Context, es *ev.EventStore, statusURL string) (*service, error) {
ctx, span := lg.Span(ctx)
defer span.End()
loadTemplates()
if err := event.Register(ctx, &RequestSubmitted{}, &ResultSubmitted{}, &VersionChanged{}); err != nil {
span.RecordError(err)
return nil, err
}
svc := &service{
es: es,
statusURL: statusURL,
state: locker.New(&state{
peers: make(map[string]*Peer),
requests: make(map[string]*Request),
})}
return svc, nil
}
func (s *service) loadResult(ctx context.Context, request *Request) (*Request, error) {
if request == nil {
return request, nil
}
return request, s.state.Use(ctx, func(ctx context.Context, t *state) error {
for i := range request.Responses {
res := request.Responses[i]
if peer, ok := t.peers[res.PeerID]; ok {
res.Peer = peer
res.Peer.ID = ""
}
}
return nil
})
}
func (s *service) Run(ctx context.Context) (err error) {
var errs error
ctx, span := lg.Span(ctx)
defer span.End()
ctx, s.stop = context.WithCancel(ctx)
subReq, e := s.es.EventStream().Subscribe(ctx, queueRequests, 0)
errs = multierr.Append(errs, e)
subRes, e := s.es.EventStream().Subscribe(ctx, queueResults, 0)
errs = multierr.Append(errs, e)
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err = multierr.Combine(subReq.Close(ctx), subRes.Close(ctx), err)
}()
if errs != nil {
return errs
}
for {
var events event.Events
select {
case <-ctx.Done():
return nil
case ok := <-subReq.Recv(ctx):
if ok {
events, err = subReq.Events(ctx)
}
case ok := <-subRes.Recv(ctx):
if ok {
events, err = subRes.Events(ctx)
}
}
s.state.Use(ctx, func(ctx context.Context, state *state) error {
return state.ApplyEvents(events)
})
events = events[:0]
}
}
func (s *service) Stop(ctx context.Context) (err error) {
defer func() {
if p := recover(); p != nil {
err = fmt.Errorf("PANIC: %v", p)
}
}()
s.stop()
return err
}
func (s *state) ApplyEvents(events event.Events) error {
for _, e := range events {
switch e := e.(type) {
case *RequestSubmitted:
if _, ok := s.requests[e.RequestID()]; !ok {
s.requests[e.RequestID()] = &Request{}
}
s.requests[e.RequestID()].ApplyEvent(e)
case *ResultSubmitted:
if _, ok := s.requests[e.RequestID]; !ok {
s.requests[e.RequestID] = &Request{}
}
s.requests[e.RequestID].ApplyEvent(e)
case *RequestTruncated:
delete(s.requests, e.RequestID)
}
}
return nil
}
func Projector(e event.Event) []event.Event {
m := e.EventMeta()
streamID := m.StreamID
streamPos := m.Position
switch e := e.(type) {
case *RequestSubmitted:
e1 := event.NewPtr(streamID, streamPos)
event.SetStreamID(aggRequest(e.RequestID()), e1)
return []event.Event{e1}
case *ResultSubmitted:
e1 := event.NewPtr(streamID, streamPos)
event.SetStreamID(aggRequest(e.RequestID), e1)
e2 := event.NewPtr(streamID, streamPos)
event.SetStreamID(aggPeer(e.PeerID), e2)
return []event.Event{e1, e2}
}
return nil
}

241
app/salty/blobs.go Normal file
View File

@@ -0,0 +1,241 @@
package salty
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"go.opentelemetry.io/otel/metric"
"go.sour.is/pkg/authreq"
"go.sour.is/pkg/lg"
"go.uber.org/multierr"
)
var (
ErrAddressExists = errors.New("error: address already exists")
ErrBlobNotFound = errors.New("error: blob not found")
)
func WithBlobStore(path string) *withBlobStore {
return &withBlobStore{path: path}
}
type withBlobStore struct {
path string
m_get_blob metric.Int64Counter
m_put_blob metric.Int64Counter
m_delete_blob metric.Int64Counter
}
func (o *withBlobStore) ApplySalty(s *service) {}
func (o *withBlobStore) Setup(ctx context.Context) error {
ctx, span := lg.Span(ctx)
defer span.End()
var err, errs error
err = os.MkdirAll(o.path, 0700)
if err != nil {
return err
}
m := lg.Meter(ctx)
o.m_get_blob, err = m.Int64Counter("salty_get_blob",
metric.WithDescription("salty get blob called"),
)
errs = multierr.Append(errs, err)
o.m_put_blob, err = m.Int64Counter("salty_put_blob",
metric.WithDescription("salty put blob called"),
)
errs = multierr.Append(errs, err)
o.m_delete_blob, err = m.Int64Counter("salty_delete_blob",
metric.WithDescription("salty delete blob called"),
)
errs = multierr.Append(errs, err)
return errs
}
func (o *withBlobStore) RegisterAPIv1(mux *http.ServeMux) {
mux.Handle("/blob/", authreq.Authorization(o))
}
func (o *withBlobStore) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx, span := lg.Span(r.Context())
defer span.End()
claims := authreq.FromContext(ctx)
if claims == nil {
httpError(w, http.StatusUnauthorized)
return
}
signer := claims.Issuer
key := strings.TrimPrefix(r.URL.Path, "/blob/")
switch r.Method {
case http.MethodDelete:
if err := deleteBlob(o.path, key, signer); err != nil {
if errors.Is(err, ErrBlobNotFound) {
httpError(w, http.StatusNotFound)
return
}
span.RecordError(fmt.Errorf("%w: getting blob %s for %s", err, key, signer))
httpError(w, http.StatusInternalServerError)
return
}
http.Error(w, "Blob Deleted", http.StatusOK)
case http.MethodGet, http.MethodHead:
blob, err := getBlob(o.path, key, signer)
if err != nil {
if errors.Is(err, ErrBlobNotFound) {
httpError(w, http.StatusNotFound)
return
}
span.RecordError(fmt.Errorf("%w: getting blob %s for %s", err, key, signer))
httpError(w, http.StatusInternalServerError)
return
}
defer blob.Close()
blob.SetHeaders(r)
if r.Method == http.MethodGet {
_, _ = io.Copy(w, blob)
}
case http.MethodPut:
data, err := io.ReadAll(r.Body)
if err != nil {
httpError(w, http.StatusInternalServerError)
return
}
defer r.Body.Close()
if err := putBlob(o.path, key, data, signer); err != nil {
span.RecordError(fmt.Errorf("%w: putting blob %s for %s", err, key, signer))
httpError(w, http.StatusInternalServerError)
return
}
http.Error(w, "Blob Created", http.StatusCreated)
default:
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
}
}
func putBlob(path string, key string, data []byte, signer string) error {
p := filepath.Join(path, signer, key)
if err := os.MkdirAll(p, 0700); err != nil {
return fmt.Errorf("error creating blobs paths %s: %w", p, err)
}
fn := filepath.Join(p, "content")
if err := os.WriteFile(fn, data, os.FileMode(0600)); err != nil {
return fmt.Errorf("error writing blob %s: %w", fn, err)
}
return nil
}
func getBlob(path string, key string, signer string) (*Blob, error) {
p := filepath.Join(path, signer, key)
if err := os.MkdirAll(p, 0755); err != nil {
return nil, fmt.Errorf("error creating blobs paths %s: %w", p, err)
}
fn := filepath.Join(p, "content")
if !FileExists(fn) {
return nil, ErrBlobNotFound
}
return OpenBlob(fn)
}
func deleteBlob(path string, key string, signer string) error {
p := filepath.Join(path, signer, key)
if !FileExists(p) {
return ErrBlobNotFound
}
return os.RemoveAll(p)
}
// FileExists returns true if the given file exists
func FileExists(name string) bool {
if _, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
func httpError(w http.ResponseWriter, code int) {
http.Error(w, http.StatusText(code), code)
}
// Blob defines the type, filename and whether or not a blob is publicly accessible or not.
// A Blob also holds zero or more properties as a map of key/value pairs of string interpreted
// by the client.
type Blob struct {
r io.ReadSeekCloser `json:"-"`
Type string `json:"type"`
Public bool `json:"public"`
Filename string `json:"-"`
Properties map[string]string `json:"props"`
}
// Close closes the blob and the underlying io.ReadSeekCloser
func (b *Blob) Close() error { return b.r.Close() }
// Read reads data from the blob from the underlying io.ReadSeekCloser
func (b *Blob) Read(p []byte) (n int, err error) { return b.r.Read(p) }
// SetHeaders sets HTTP headers on the net/http.Request object based on the blob's type, filename
// and various other properties (if any).
func (b *Blob) SetHeaders(r *http.Request) {
// TODO: Implement this...
}
// OpenBlob opens a blob at the given path and returns a Blob object
func OpenBlob(fn string) (*Blob, error) {
f, err := os.Open(fn)
if err != nil {
return nil, fmt.Errorf("%w: opening blob %s", err, fn)
}
b := &Blob{r: f, Filename: fn}
props := filepath.Join(filepath.Dir(fn), "props.json")
if FileExists(filepath.Dir(props)) {
pf, err := os.Open(props)
if err != nil {
return b, fmt.Errorf("%w: opening blob props %s", err, props)
}
err = json.NewDecoder(pf).Decode(b)
if err != nil {
return b, fmt.Errorf("%w: opening blob props %s", err, props)
}
}
return b, nil
}

156
app/salty/salty-addr.go Normal file
View File

@@ -0,0 +1,156 @@
package salty
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"github.com/keys-pub/keys"
"go.sour.is/pkg/lg"
)
// Config represents a Salty Config for a User which at a minimum is required
// to have an Endpoint and Key (Public Key)
type Config struct {
Endpoint string `json:"endpoint"`
Key string `json:"key"`
}
type Capabilities struct {
AcceptEncoding string
}
func (c Capabilities) String() string {
if c.AcceptEncoding == "" {
return "<nil>"
}
return fmt.Sprint("accept-encoding: ", c.AcceptEncoding)
}
type Addr struct {
User string
Domain string
capabilities Capabilities
discoveredDomain string
dns DNSResolver
endpoint *url.URL
key *keys.EdX25519PublicKey
}
// ParseAddr parsers a Salty Address for a user into it's user and domain
// parts and returns an Addr object with the User and Domain and a method
// for returning the expected User's Well-Known URI
func (s *service) ParseAddr(addr string) (*Addr, error) {
parts := strings.Split(strings.ToLower(addr), "@")
if len(parts) != 2 {
return nil, fmt.Errorf("expected nick@domain found %q", addr)
}
return &Addr{User: parts[0], Domain: parts[1], dns: s.dns}, nil
}
func (a *Addr) String() string {
return fmt.Sprintf("%s@%s", a.User, a.Domain)
}
// Hash returns the Hex(SHA256Sum()) of the Address
func (a *Addr) Hash() string {
return fmt.Sprintf("%x", sha256.Sum256([]byte(strings.ToLower(a.String()))))
}
// URI returns the Well-Known URI for this Addr
func (a *Addr) URI() string {
return fmt.Sprintf("https://%s/.well-known/salty/%s.json", a.DiscoveredDomain(), a.User)
}
// HashURI returns the Well-Known HashURI for this Addr
func (a *Addr) HashURI() string {
return fmt.Sprintf("https://%s/.well-known/salty/%s.json", a.DiscoveredDomain(), a.Hash())
}
// DiscoveredDomain returns the discovered domain (if any) of fallbacks to the Domain
func (a *Addr) DiscoveredDomain() string {
if a.discoveredDomain != "" {
return a.discoveredDomain
}
return a.Domain
}
func (a *Addr) Refresh(ctx context.Context) error {
ctx, span := lg.Span(ctx)
defer span.End()
span.AddEvent(fmt.Sprintf("Looking up SRV record for _salty._tcp.%s", a.Domain))
if _, srv, err := a.dns.LookupSRV(ctx, "salty", "tcp", a.Domain); err == nil {
if len(srv) > 0 {
a.discoveredDomain = strings.TrimSuffix(srv[0].Target, ".")
}
span.AddEvent(fmt.Sprintf("Discovered salty services %s", a.discoveredDomain))
} else if err != nil {
span.RecordError(fmt.Errorf("error looking up SRV record for _salty._tcp.%s : %s", a.Domain, err))
}
config, cap, err := fetchConfig(ctx, a.HashURI())
if err != nil {
// Fallback to plain user nick
span.RecordError(err)
config, cap, err = fetchConfig(ctx, a.URI())
}
if err != nil {
err = fmt.Errorf("error looking up user %s: %w", a, err)
span.RecordError(err)
return err
}
key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(config.Key))
if err != nil {
err = fmt.Errorf("error parsing public key %s: %w", config.Key, err)
span.RecordError(err)
return err
}
a.key = key
u, err := url.Parse(config.Endpoint)
if err != nil {
err = fmt.Errorf("error parsing endpoint %s: %w", config.Endpoint, err)
span.RecordError(err)
return err
}
a.endpoint = u
a.capabilities = cap
span.AddEvent(fmt.Sprintf("Discovered endpoint: %v", a.endpoint))
span.AddEvent(fmt.Sprintf("Discovered capability: %v", a.capabilities))
return nil
}
func fetchConfig(ctx context.Context, addr string) (config Config, cap Capabilities, err error) {
ctx, span := lg.Span(ctx)
defer span.End()
var req *http.Request
req, err = http.NewRequestWithContext(ctx, http.MethodGet, addr, nil)
if err != nil {
return
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return
}
if err = json.NewDecoder(res.Body).Decode(&config); err != nil {
return
}
cap.AcceptEncoding = res.Header.Get("Accept-Encoding")
return
}

94
app/salty/salty-user.go Normal file
View File

@@ -0,0 +1,94 @@
package salty
import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"log"
"net/url"
"strings"
"github.com/keys-pub/keys"
"github.com/oklog/ulid/v2"
"go.sour.is/ev/event"
"go.sour.is/pkg/gql"
)
type SaltyUser struct {
pubkey *keys.EdX25519PublicKey
inbox ulid.ULID
event.IsAggregate
}
var _ event.Aggregate = (*SaltyUser)(nil)
// ApplyEvent applies the event to the aggrigate state
func (a *SaltyUser) ApplyEvent(lis ...event.Event) {
for _, e := range lis {
switch e := e.(type) {
case *UserRegistered:
// a.name = e.Name
a.pubkey = e.Pubkey
a.inbox = e.EventMeta().EventID
// a.SetStreamID(a.streamID())
default:
log.Printf("unknown event %T", e)
}
}
}
func (a *SaltyUser) OnUserRegister(pubkey *keys.EdX25519PublicKey) error {
if err := event.NotExists(a); err != nil {
return err
}
event.Raise(a, &UserRegistered{Pubkey: pubkey})
return nil
}
func (a *SaltyUser) Inbox() string { return a.inbox.String() }
func (a *SaltyUser) Pubkey() string { return a.pubkey.String() }
func (s *SaltyUser) Endpoint(ctx context.Context) (string, error) {
svc := gql.FromContext[contextKey, *service](ctx, saltyKey)
return url.JoinPath(svc.BaseURL(), s.inbox.String())
}
type UserRegistered struct {
Name string
Pubkey *keys.EdX25519PublicKey
event.IsEvent
}
var _ event.Event = (*UserRegistered)(nil)
func (e *UserRegistered) MarshalBinary() (text []byte, err error) {
var b bytes.Buffer
b.WriteString(e.Name)
b.WriteRune('\t')
b.WriteString(e.Pubkey.String())
return b.Bytes(), nil
}
func (e *UserRegistered) UnmarshalBinary(b []byte) error {
name, pub, ok := bytes.Cut(b, []byte{'\t'})
if !ok {
return fmt.Errorf("parse error")
}
var err error
e.Name = string(name)
e.Pubkey, err = keys.NewEdX25519PublicKeyFromID(keys.ID(pub))
return err
}
func NickToStreamID(nick string) string {
return fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
}
func HashToStreamID(hash string) string {
return fmt.Sprint("saltyuser-", hash)
}

13
app/salty/salty.graphqls Normal file
View File

@@ -0,0 +1,13 @@
extend type Query {
saltyUser(nick: String!): SaltyUser
}
extend type Mutation {
createSaltyUser(nick: String! pubkey: String!): SaltyUser
}
type SaltyUser @goModel(model: "go.sour.is/tools/app/salty.SaltyUser"){
pubkey: String!
inbox: String!
endpoint: String!
}

397
app/salty/service.go Normal file
View File

@@ -0,0 +1,397 @@
package salty
import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/keys-pub/keys"
"go.mills.io/saltyim"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"go.sour.is/ev"
"go.sour.is/ev/event"
"go.sour.is/pkg/gql"
"go.sour.is/pkg/lg"
)
type DNSResolver interface {
LookupSRV(ctx context.Context, service, proto, name string) (string, []*net.SRV, error)
}
type service struct {
baseURL string
es *ev.EventStore
dns DNSResolver
m_create_user metric.Int64Counter
m_get_user metric.Int64Counter
m_api_ping metric.Int64Counter
m_api_register metric.Int64Counter
m_api_lookup metric.Int64Counter
m_api_send metric.Int64Counter
m_req_time metric.Int64Histogram
opts []Option
}
type Option interface {
ApplySalty(*service)
}
type WithBaseURL string
func (o WithBaseURL) ApplySalty(s *service) {
s.baseURL = string(o)
}
type contextKey struct {
name string
}
var saltyKey = contextKey{"salty"}
type SaltyResolver interface {
CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error)
SaltyUser(ctx context.Context, nick string) (*SaltyUser, error)
IsResolver()
}
func New(ctx context.Context, es *ev.EventStore, opts ...Option) (*service, error) {
ctx, span := lg.Span(ctx)
defer span.End()
if err := event.Register(ctx, &UserRegistered{}); err != nil {
return nil, err
}
if err := event.RegisterName(ctx, "domain.UserRegistered", &UserRegistered{}); err != nil {
return nil, err
}
m := lg.Meter(ctx)
svc := &service{opts: opts, es: es, dns: net.DefaultResolver}
for _, o := range opts {
o.ApplySalty(svc)
if o, ok := o.(interface{ Setup(context.Context) error }); ok {
if err := o.Setup(ctx); err != nil {
return nil, err
}
}
}
var err, errs error
svc.m_create_user, err = m.Int64Counter("salty_create_user",
metric.WithDescription("salty create user graphql called"),
)
errs = multierr.Append(errs, err)
svc.m_get_user, err = m.Int64Counter("salty_get_user",
metric.WithDescription("salty get user graphql called"),
)
errs = multierr.Append(errs, err)
svc.m_api_ping, err = m.Int64Counter("salty_api_ping",
metric.WithDescription("salty api ping called"),
)
errs = multierr.Append(errs, err)
svc.m_api_register, err = m.Int64Counter("salty_api_register",
metric.WithDescription("salty api register"),
)
errs = multierr.Append(errs, err)
svc.m_api_lookup, err = m.Int64Counter("salty_api_lookup",
metric.WithDescription("salty api ping lookup"),
)
errs = multierr.Append(errs, err)
svc.m_api_send, err = m.Int64Counter("salty_api_send",
metric.WithDescription("salty api ping send"),
)
errs = multierr.Append(errs, err)
svc.m_req_time, err = m.Int64Histogram("salty_request_time",
metric.WithDescription("histogram of requests"),
metric.WithUnit("ns"),
)
errs = multierr.Append(errs, err)
span.RecordError(err)
return svc, errs
}
func (s *service) BaseURL() string {
if s == nil {
return "http://missing.context/"
}
return s.baseURL
}
func (s *service) RegisterHTTP(mux *http.ServeMux) {
for _, o := range s.opts {
if o, ok := o.(interface{ RegisterHTTP(mux *http.ServeMux) }); ok {
o.RegisterHTTP(mux)
}
}
}
func (s *service) RegisterAPIv1(mux *http.ServeMux) {
mux.HandleFunc("/ping", s.apiv1)
mux.HandleFunc("/register", s.apiv1)
mux.HandleFunc("/lookup/", s.apiv1)
mux.HandleFunc("/send", s.apiv1)
for _, o := range s.opts {
if o, ok := o.(interface{ RegisterAPIv1(mux *http.ServeMux) }); ok {
o.RegisterAPIv1(mux)
}
}
}
func (s *service) RegisterWellKnown(mux *http.ServeMux) {
mux.Handle("/salty/", lg.Htrace(s, "lookup"))
for _, o := range s.opts {
if o, ok := o.(interface{ RegisterWellKnown(mux *http.ServeMux) }); ok {
o.RegisterWellKnown(mux)
}
}
}
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, span := lg.Span(ctx)
defer span.End()
start := time.Now()
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
addr := "saltyuser-" + strings.TrimPrefix(r.URL.Path, "/salty/")
addr = strings.TrimSuffix(addr, ".json")
span.AddEvent(fmt.Sprint("find ", addr))
a, err := ev.Update(ctx, s.es, addr, func(ctx context.Context, agg *SaltyUser) error { return nil })
switch {
case errors.Is(err, event.ErrShouldExist):
span.RecordError(err)
w.WriteHeader(http.StatusNotFound)
return
case err != nil:
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
basePath, _ := url.JoinPath(s.baseURL, a.inbox.String())
err = json.NewEncoder(w).Encode(
struct {
Endpoint string `json:"endpoint"`
Key string `json:"key"`
}{
Endpoint: basePath,
Key: a.pubkey.ID().String(),
})
if err != nil {
span.RecordError(err)
}
}
func (s *service) IsResolver() {}
func (s *service) GetMiddleware() func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(gql.ToContext(r.Context(), saltyKey, s))
next.ServeHTTP(w, r)
})
}
}
func (s *service) CreateSaltyUser(ctx context.Context, nick string, pub string) (*SaltyUser, error) {
ctx, span := lg.Span(ctx)
defer span.End()
s.m_create_user.Add(ctx, 1)
start := time.Now()
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
streamID := NickToStreamID(nick)
span.AddEvent(streamID)
return s.createSaltyUser(ctx, streamID, pub)
}
func (s *service) createSaltyUser(ctx context.Context, streamID, pub string) (*SaltyUser, error) {
ctx, span := lg.Span(ctx)
defer span.End()
key, err := keys.NewEdX25519PublicKeyFromID(keys.ID(pub))
if err != nil {
span.RecordError(err)
return nil, err
}
a, err := ev.Create(ctx, s.es, streamID, func(ctx context.Context, agg *SaltyUser) error {
return agg.OnUserRegister(key)
})
switch {
case errors.Is(err, ev.ErrShouldNotExist):
span.RecordError(err)
return nil, fmt.Errorf("user exists: %w", err)
case err != nil:
span.RecordError(err)
return nil, fmt.Errorf("internal error: %w", err)
}
return a, nil
}
func (s *service) SaltyUser(ctx context.Context, nick string) (*SaltyUser, error) {
ctx, span := lg.Span(ctx)
defer span.End()
s.m_get_user.Add(ctx, 1)
start := time.Now()
defer s.m_req_time.Record(ctx, time.Since(start).Milliseconds())
streamID := fmt.Sprintf("saltyuser-%x", sha256.Sum256([]byte(strings.ToLower(nick))))
span.AddEvent(streamID)
a, err := ev.Update(ctx, s.es, streamID, func(ctx context.Context, agg *SaltyUser) error { return nil })
switch {
case errors.Is(err, ev.ErrShouldExist):
span.RecordError(err)
return nil, fmt.Errorf("user not found")
case err != nil:
span.RecordError(err)
return nil, fmt.Errorf("%w internal error", err)
}
return a, err
}
func (s *service) apiv1(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, span := lg.Span(ctx)
defer span.End()
start := time.Now()
defer s.m_req_time.Record(ctx, time.Since(start).Nanoseconds())
switch r.Method {
case http.MethodGet:
switch {
case r.URL.Path == "/ping":
s.m_api_ping.Add(ctx, 1)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{}`))
case strings.HasPrefix(r.URL.Path, "/lookup/"):
s.m_api_lookup.Add(ctx, 1)
addr, err := s.ParseAddr(strings.TrimPrefix(r.URL.Path, "/lookup/"))
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusBadRequest)
return
}
err = addr.Refresh(ctx)
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusBadRequest)
return
}
err = json.NewEncoder(w).Encode(addr)
span.RecordError(err)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
case http.MethodPost:
switch r.URL.Path {
case "/register":
s.m_api_register.Add(ctx, 1)
req, signer, err := saltyim.NewRegisterRequest(r.Body)
if err != nil {
span.RecordError(fmt.Errorf("error parsing register request: %w", err))
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
if signer != req.Key {
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
_, err = s.createSaltyUser(ctx, HashToStreamID(req.Hash), req.Key)
if errors.Is(err, event.ErrShouldNotExist) {
http.Error(w, "Already Exists", http.StatusConflict)
return
} else if err != nil {
http.Error(w, "Error", http.StatusInternalServerError)
return
}
http.Error(w, "Endpoint Created", http.StatusCreated)
return
case "/send":
s.m_api_send.Add(ctx, 1)
req, signer, err := saltyim.NewSendRequest(r.Body)
if err != nil {
span.RecordError(fmt.Errorf("error parsing send request: %w", err))
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
// TODO: Do something with signer?
span.AddEvent(fmt.Sprintf("request signed by %s", signer))
u, err := url.Parse(req.Endpoint)
if err != nil {
span.RecordError(fmt.Errorf("error parsing endpoint %s: %w", req.Endpoint, err))
http.Error(w, "Bad Endpoint", http.StatusBadRequest)
return
}
if !u.IsAbs() {
span.RecordError(fmt.Errorf("endpoint %s is not an absolute uri: %w", req.Endpoint, err))
http.Error(w, "Bad Endpoint", http.StatusBadRequest)
return
}
// TODO: Queue up an internal retry and return immediately on failure?
if err := saltyim.Send(req.Endpoint, req.Message, req.Capabilities); err != nil {
span.RecordError(fmt.Errorf("error sending message to %s: %w", req.Endpoint, err))
http.Error(w, "Send Error", http.StatusInternalServerError)
return
}
http.Error(w, "Message Accepted", http.StatusAccepted)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}

1
app/twtxt/twtxt.go Normal file
View File

@@ -0,0 +1 @@
package twtxt

35
app/webfinger/addr.go Normal file
View File

@@ -0,0 +1,35 @@
package webfinger
import (
"net/url"
"strings"
)
type Addr struct {
prefix []string
URL *url.URL
}
func Parse(s string) *Addr {
addr := &Addr{}
addr.URL, _ = url.Parse(s)
if addr.URL.Opaque == "" {
return addr
}
var hasPfx = true
pfx := addr.URL.Scheme
for hasPfx {
addr.prefix = append(addr.prefix, pfx)
pfx, addr.URL.Opaque, hasPfx = strings.Cut(addr.URL.Opaque, ":")
}
user, host, _ := strings.Cut(pfx, "@")
addr.URL.User = url.User(user)
addr.URL.Host = host
return addr
}

46
app/webfinger/client.go Normal file
View File

@@ -0,0 +1,46 @@
package webfinger
import (
"crypto/ed25519"
"encoding/base64"
"time"
"github.com/golang-jwt/jwt/v4"
"github.com/oklog/ulid/v2"
)
var (
defaultExpire = 30 * time.Minute
defaultIssuer = "sour.is/webfinger"
defaultAudience = "sour.is/webfinger"
)
func NewSignedRequest(jrd *JRD, key ed25519.PrivateKey) (string, error) {
type claims struct {
PubKey string `json:"pub"`
*JRD
jwt.RegisteredClaims
}
pub := []byte(key.Public().(ed25519.PublicKey))
j := claims{
PubKey: enc(pub),
JRD: jrd.CloneValues(),
RegisteredClaims: jwt.RegisteredClaims{
ID: ulid.Make().String(),
Subject: jrd.Subject,
Audience: jwt.ClaimStrings{defaultAudience},
ExpiresAt: jwt.NewNumericDate(time.Now().Add(defaultExpire)),
IssuedAt: jwt.NewNumericDate(time.Now()),
Issuer: defaultIssuer,
},
}
j.JRD.Subject = "" // move subject into registered claims.
token := jwt.NewWithClaims(jwt.SigningMethodEdDSA, &j)
return token.SignedString(key)
}
func enc(b []byte) string {
return base64.RawURLEncoding.EncodeToString(b)
}

44
app/webfinger/events.go Normal file
View File

@@ -0,0 +1,44 @@
package webfinger
import (
"go.sour.is/ev/event"
)
type SubjectSet struct {
Subject string `json:"subject"`
Aliases []string `json:"aliases,omitempty"`
Properties map[string]*string `json:"properties,omitempty"`
event.IsEvent `json:"-"`
}
type SubjectDeleted struct {
Subject string `json:"subject"`
event.IsEvent `json:"-"`
}
var _ event.Event = (*SubjectDeleted)(nil)
type LinkSet struct {
Index uint64 `json:"idx"`
Rel string `json:"rel"`
Type string `json:"type,omitempty"`
HRef string `json:"href,omitempty"`
Titles map[string]string `json:"titles,omitempty"`
Properties map[string]*string `json:"properties,omitempty"`
Template string `json:"template,omitempty"`
event.IsEvent `json:"-"`
}
var _ event.Event = (*LinkSet)(nil)
type LinkDeleted struct {
Index uint64 `json:"idx"`
Rel string `json:"rel"`
event.IsEvent `json:"-"`
}
var _ event.Event = (*LinkDeleted)(nil)

426
app/webfinger/jrd.go Normal file
View File

@@ -0,0 +1,426 @@
package webfinger
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"hash/fnv"
"sort"
"go.sour.is/pkg/set"
"go.sour.is/pkg/slice"
"gopkg.in/yaml.v3"
"go.sour.is/ev/event"
)
func StreamID(subject string) string {
h := fnv.New128a()
h.Write([]byte(subject))
return "webfinger." + base64.RawURLEncoding.EncodeToString(h.Sum(nil))
}
// JRD is a JSON Resource Descriptor, specifying properties and related links
// for a resource.
type JRD struct {
Subject string `json:"subject,omitempty" yaml:"subject,omitempty"`
Aliases []string `json:"aliases,omitempty" yaml:"aliases,omitempty"`
Properties map[string]*string `json:"properties,omitempty" yaml:"properties,omitempty"`
Links Links `json:"links,omitempty" yaml:"links,omitempty"`
deleted bool
event.IsAggregate `json:"-" yaml:"-"`
}
func (a *JRD) CloneValues() *JRD {
m := make(map[string]*string, len(a.Properties))
for k, v := range a.Properties {
m[k] = v
}
return &JRD{
Subject: a.Subject,
Aliases: append([]string{}, a.Aliases...),
Properties: m,
Links: append([]*Link{}, a.Links...),
}
}
var _ event.Aggregate = (*JRD)(nil)
// Link is a link to a related resource.
type Link struct {
Index uint64 `json:"-" yaml:"-"`
Rel string `json:"rel,omitempty"`
Type string `json:"type,omitempty"`
HRef string `json:"href,omitempty"`
Titles map[string]string `json:"titles,omitempty"`
Properties map[string]*string `json:"properties,omitempty"`
Template string `json:"template,omitempty"`
}
type Links []*Link
// Len is the number of elements in the collection.
func (l Links) Len() int {
if l == nil {
return 0
}
return len(l)
}
// Less reports whether the element with index i
func (l Links) Less(i int, j int) bool {
if l[i] == nil || l[j] == nil {
return false
}
if l[i].Rel == l[j].Rel {
return l[i].Type < l[j].Type
}
return l[i].Rel < l[j].Rel
}
// Swap swaps the elements with indexes i and j.
func (l Links) Swap(i int, j int) {
if l == nil {
return
}
l[i], l[j] = l[j], l[i]
}
// ParseJRD parses the JRD using json.Unmarshal.
func ParseJRD(blob []byte) (*JRD, error) {
jrd := JRD{}
err := json.Unmarshal(blob, &jrd)
if err != nil {
return nil, err
}
for i := range jrd.Links {
jrd.Links[i].Index = uint64(i)
}
return &jrd, nil
}
// GetLinkByRel returns the first *Link with the specified rel value.
func (jrd *JRD) GetLinkByRel(rel string) *Link {
for _, link := range jrd.Links {
if link.Rel == rel {
return link
}
}
return nil
}
// GetLinksByRel returns each *Link with the specified rel value.
func (jrd *JRD) GetLinksByRel(rel ...string) []*Link {
var lis []*Link
rels := set.New(rel...)
for _, link := range jrd.Links {
if rels.Has(link.Rel) {
lis = append(lis, link)
}
}
return lis
}
// GetProperty Returns the property value as a string.
// Per spec a property value can be null, empty string is returned in this case.
func (jrd *JRD) GetProperty(uri string) string {
if jrd.Properties[uri] == nil {
return ""
}
return *jrd.Properties[uri]
}
func (a *JRD) SetProperty(name string, value *string) {
if a.Properties == nil {
a.Properties = make(map[string]*string)
}
a.Properties[name] = value
}
func (a *JRD) DeleteProperty(name string) {
if a.Properties == nil {
return
}
delete(a.Properties, name)
}
func (a *JRD) IsDeleted() bool {
return a.deleted
}
// GetProperty Returns the property value as a string.
// Per spec a property value can be null, empty string is returned in this case.
func (link *Link) GetProperty(uri string) string {
if link.Properties[uri] == nil {
return ""
}
return *link.Properties[uri]
}
func (link *Link) SetProperty(name string, value *string) {
if link.Properties == nil {
link.Properties = make(map[string]*string)
}
link.Properties[name] = value
}
func (link *Link) DeleteProperty(name string) {
if link.Properties == nil {
return
}
delete(link.Properties, name)
}
// ApplyEvent implements event.Aggregate
func (a *JRD) ApplyEvent(events ...event.Event) {
for _, e := range events {
switch e := e.(type) {
case *SubjectSet:
a.deleted = false
a.Subject = e.Subject
a.Aliases = e.Aliases
a.Properties = e.Properties
case *SubjectDeleted:
a.deleted = true
a.Subject = e.Subject
a.Aliases = a.Aliases[:0]
a.Links = a.Links[:0]
a.Properties = map[string]*string{}
case *LinkSet:
link, ok := slice.FindFn(func(l *Link) bool { return l.Index == e.Index }, a.Links...)
if !ok {
link = &Link{}
link.Index = uint64(len(a.Links))
a.Links = append(a.Links, link)
}
link.Rel = e.Rel
link.HRef = e.HRef
link.Type = e.Type
link.Titles = e.Titles
link.Properties = e.Properties
link.Template = e.Template
case *LinkDeleted:
a.Links = slice.FilterFn(func(link *Link) bool { return link.Index != e.Index }, a.Links...)
}
}
}
const NSauth = "https://sour.is/ns/auth"
const NSpubkey = "https://sour.is/ns/pubkey"
const NSredirect = "https://sour.is/rel/redirect"
func (a *JRD) OnAuth(claim, auth *JRD) error {
pubkey := claim.Properties[NSpubkey]
if v, ok := auth.Properties[NSpubkey]; ok && v != nil && cmpPtr(v, pubkey) {
// pubkey matches!
} else {
return fmt.Errorf("pubkey does not match")
}
if a.Version() > 0 && !a.IsDeleted() && a.Subject != claim.Subject {
return fmt.Errorf("subject does not match")
}
if auth.Subject == claim.Subject {
claim.SetProperty(NSpubkey, pubkey)
} else {
claim.SetProperty(NSauth, &auth.Subject)
claim.DeleteProperty(NSpubkey)
}
return nil
}
func (a *JRD) OnDelete(jrd *JRD) error {
if a.Version() == 0 || a.IsDeleted() {
return nil
}
event.Raise(a, &SubjectDeleted{Subject: jrd.Subject})
return nil
}
func (a *JRD) OnClaims(jrd *JRD) error {
err := a.OnSubjectSet(jrd.Subject, jrd.Aliases, jrd.Properties)
if err != nil {
return err
}
for _, z := range slice.Align(
a.Links, // old
jrd.Links, // new
func(l, r *Link) bool { return l.Index < r.Index },
) {
// Not in new == delete
if z.Key == nil {
link := *z.Value
event.Raise(a, &LinkDeleted{Index: link.Index, Rel: link.Rel})
continue
}
// Not in old == create
if z.Value == nil {
link := *z.Key
event.Raise(a, &LinkSet{
Index: link.Index,
Rel: link.Rel,
Type: link.Type,
HRef: link.HRef,
Titles: link.Titles,
Properties: link.Properties,
Template: link.Template,
})
continue
}
// in both == compare
a.OnLinkSet(*z.Key, *z.Value)
}
return nil
}
func (a *JRD) OnSubjectSet(subject string, aliases []string, props map[string]*string) error {
modified := false
e := &SubjectSet{
Subject: subject,
Aliases: aliases,
Properties: props,
}
if subject != a.Subject {
modified = true
}
sort.Strings(aliases)
sort.Strings(a.Aliases)
for _, z := range slice.Zip(aliases, a.Aliases) {
if z.Key != z.Value {
modified = true
break
}
}
for _, z := range slice.Zip(
slice.Zip(slice.FromMap(props)),
slice.Zip(slice.FromMap(a.Properties)),
) {
newValue := z.Key
curValue := z.Value
if newValue.Key != curValue.Key {
modified = true
break
}
if !cmpPtr(newValue.Value, curValue.Value) {
modified = true
break
}
}
if modified {
event.Raise(a, e)
}
return nil
}
func (a *JRD) OnLinkSet(o, n *Link) error {
modified := false
e := &LinkSet{
Index: n.Index,
Rel: n.Rel,
Type: n.Type,
HRef: n.HRef,
Titles: n.Titles,
Properties: n.Properties,
Template: n.Template,
}
if n.Rel != o.Rel {
modified = true
}
if n.Type != o.Type {
modified = true
}
if n.HRef != o.HRef {
modified = true
}
if n.Template != o.Template {
fmt.Println(360, n.Template, o.Template, e.Template)
modified = true
}
nKeys := slice.FromMapKeys(n.Properties)
sort.Strings(nKeys)
oKeys := slice.FromMapKeys(o.Properties)
sort.Strings(oKeys)
for _, z := range slice.Zip(
slice.Zip(nKeys, slice.FromMapValues(n.Titles, nKeys)),
slice.Zip(oKeys, slice.FromMapValues(o.Titles, oKeys)),
) {
if z.Key != z.Value {
modified = true
break
}
}
nKeys = slice.FromMapKeys(n.Properties)
sort.Strings(nKeys)
oKeys = slice.FromMapKeys(o.Properties)
sort.Strings(oKeys)
for _, z := range slice.Zip(
slice.Zip(nKeys, slice.FromMapValues(n.Properties, nKeys)),
slice.Zip(oKeys, slice.FromMapValues(o.Properties, oKeys)),
) {
newValue := z.Key
curValue := z.Value
if newValue.Key != curValue.Key {
modified = true
break
}
if !cmpPtr(newValue.Value, curValue.Value) {
modified = true
break
}
}
if modified {
event.Raise(a, e)
}
return nil
}
func cmpPtr[T comparable](l, r *T) bool {
if l == nil {
return r == nil
}
if r == nil {
return l == nil
}
return *l == *r
}
func (a *JRD) String() string {
b := &bytes.Buffer{}
y := yaml.NewEncoder(b)
_ = y.Encode(a)
return b.String()
}

310
app/webfinger/jrd_test.go Normal file
View File

@@ -0,0 +1,310 @@
package webfinger_test
import (
"context"
"crypto/ed25519"
"encoding/base64"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
jwt "github.com/golang-jwt/jwt/v4"
"github.com/matryer/is"
"go.sour.is/ev"
"go.uber.org/multierr"
"go.sour.is/tools/app/webfinger"
memstore "go.sour.is/ev/driver/mem-store"
"go.sour.is/ev/driver/projecter"
"go.sour.is/ev/driver/streamer"
"go.sour.is/ev/event"
)
func TestParseJRD(t *testing.T) {
// Adapted from spec http://tools.ietf.org/html/rfc6415#appendix-A
blob := `
{
"subject":"http://blog.example.com/article/id/314",
"aliases":[
"http://blog.example.com/cool_new_thing",
"http://blog.example.com/steve/article/7"],
"properties":{
"http://blgx.example.net/ns/version":"1.3",
"http://blgx.example.net/ns/ext":null
},
"links":[
{
"rel":"author",
"type":"text/html",
"href":"http://blog.example.com/author/steve",
"titles":{
"default":"About the Author",
"en-us":"Author Information"
},
"properties":{
"http://example.com/role":"editor"
}
},
{
"rel":"author",
"href":"http://example.com/author/john",
"titles":{
"default":"The other author"
}
},
{
"rel":"copyright"
}
]
}
`
obj, err := webfinger.ParseJRD([]byte(blob))
if err != nil {
t.Fatal(err)
}
if got, want := obj.Subject, "http://blog.example.com/article/id/314"; got != want {
t.Errorf("JRD.Subject is %q, want %q", got, want)
}
if got, want := obj.GetProperty("http://blgx.example.net/ns/version"), "1.3"; got != want {
t.Errorf("obj.GetProperty('http://blgx.example.net/ns/version') returned %q, want %q", got, want)
}
if got, want := obj.GetProperty("http://blgx.example.net/ns/ext"), ""; got != want {
t.Errorf("obj.GetProperty('http://blgx.example.net/ns/ext') returned %q, want %q", got, want)
}
if obj.GetLinkByRel("copyright") == nil {
t.Error("obj.GetLinkByRel('copyright') returned nil, want non-nil value")
}
if got, want := obj.GetLinkByRel("author").Titles["default"], "About the Author"; got != want {
t.Errorf("obj.GetLinkByRel('author').Titles['default'] returned %q, want %q", got, want)
}
if got, want := obj.GetLinkByRel("author").GetProperty("http://example.com/role"), "editor"; got != want {
t.Errorf("obj.GetLinkByRel('author').GetProperty('http://example.com/role') returned %q, want %q", got, want)
}
}
func TestEncodeJRD(t *testing.T) {
s, err := json.Marshal(&webfinger.JRD{
Subject: "test",
Properties: map[string]*string{
"https://sour.is/ns/prop1": nil,
},
})
if err != nil {
t.Fatal(err)
}
if string(s) != `{"subject":"test","properties":{"https://sour.is/ns/prop1":null}}` {
t.Fatal("output does not match")
}
}
func TestApplyEvents(t *testing.T) {
is := is.New(t)
events := event.NewEvents(
&webfinger.SubjectSet{
Subject: "acct:me@sour.is",
Properties: map[string]*string{
"https://sour.is/ns/pubkey": ptr("kex1d330ama4vnu3vll5dgwjv3k0pcxsccc5k2xy3j8khndggkszsmsq3hl4ru"),
},
},
&webfinger.LinkSet{
Index: 0,
Rel: "salty:public",
Type: "application/json+salty",
},
&webfinger.LinkSet{
Index: 1,
Rel: "salty:private",
Type: "application/json+salty",
},
&webfinger.LinkSet{
Index: 0,
Rel: "salty:public",
Type: "application/json+salty",
HRef: "https://ev.sour.is/inbox/01GAEMKXYJ4857JQP1MJGD61Z5",
Properties: map[string]*string{
"pub": ptr("kex1r8zshlvkc787pxvauaq7hd6awa9kmheddxjj9k80qmenyxk6284s50uvpw"),
},
},
&webfinger.LinkDeleted{
Index: 1,
Rel: "salty:private",
},
)
event.SetStreamID(webfinger.StreamID("acct:me@sour.is"), events...)
jrd := &webfinger.JRD{}
jrd.ApplyEvent(events...)
s, err := json.Marshal(jrd)
if err != nil {
t.Fatal(err)
}
is.Equal(string(s), `{"subject":"acct:me@sour.is","properties":{"https://sour.is/ns/pubkey":"kex1d330ama4vnu3vll5dgwjv3k0pcxsccc5k2xy3j8khndggkszsmsq3hl4ru"},"links":[{"rel":"salty:public","type":"application/json+salty","href":"https://ev.sour.is/inbox/01GAEMKXYJ4857JQP1MJGD61Z5","properties":{"pub":"kex1r8zshlvkc787pxvauaq7hd6awa9kmheddxjj9k80qmenyxk6284s50uvpw"}}]}`)
events = event.NewEvents(
&webfinger.SubjectDeleted{},
)
event.SetStreamID(webfinger.StreamID("acct:me@sour.is"), events...)
jrd.ApplyEvent(events...)
s, err = json.Marshal(jrd)
if err != nil {
t.Fatal(err)
}
t.Log(string(s))
if string(s) != `{}` {
t.Fatal("output does not match")
}
}
func TestCommands(t *testing.T) {
is := is.New(t)
ctx := context.Background()
pub, priv, err := ed25519.GenerateKey(nil)
is.NoErr(err)
token := jwt.NewWithClaims(jwt.SigningMethodEdDSA, jwt.MapClaims{
"sub": "acct:me@sour.is",
"pub": enc(pub),
"aliases": []string{"acct:xuu@sour.is"},
"properties": map[string]*string{
"https://example.com/ns/asdf": nil,
webfinger.NSpubkey: ptr(enc(pub)),
},
"links": []map[string]any{{
"rel": "salty:public",
"type": "application/json+salty",
"href": "https://ev.sour.is",
"titles": map[string]string{"default": "Jon Lundy"},
"properties": map[string]*string{
"pub": ptr("kex140fwaena9t0mrgnjeare5zuknmmvl0vc7agqy5yr938vusxfh9ys34vd2p"),
},
}},
"exp": time.Now().Add(30 * time.Second).Unix(),
})
aToken, err := token.SignedString(priv)
is.NoErr(err)
es, err := ev.Open(ctx, "mem:", streamer.New(ctx), projecter.New(ctx))
is.NoErr(err)
type claims struct {
Subject string `json:"sub"`
PubKey string `json:"pub"`
*webfinger.JRD
jwt.StandardClaims
}
token, err = jwt.ParseWithClaims(
aToken,
&claims{},
func(tok *jwt.Token) (any, error) {
c, ok := tok.Claims.(*claims)
if !ok {
return nil, fmt.Errorf("wrong type of claim")
}
c.JRD.Subject = c.Subject
c.StandardClaims.Subject = c.Subject
c.SetProperty(webfinger.NSpubkey, &c.PubKey)
pub, err := dec(c.PubKey)
return ed25519.PublicKey(pub), err
},
jwt.WithValidMethods([]string{"EdDSA"}),
jwt.WithJSONNumber(),
)
is.NoErr(err)
c, ok := token.Claims.(*claims)
is.True(ok)
t.Logf("%#v", c)
a, err := ev.Upsert(ctx, es, webfinger.StreamID(c.Subject), func(ctx context.Context, a *webfinger.JRD) error {
var auth *webfinger.JRD
// does the target have a pubkey for self auth?
if _, ok := a.Properties[webfinger.NSpubkey]; ok {
auth = a
}
// Check current version for auth.
if authID, ok := a.Properties[webfinger.NSauth]; ok && authID != nil && auth == nil {
auth = &webfinger.JRD{}
auth.SetStreamID(webfinger.StreamID(*authID))
err := es.Load(ctx, auth)
if err != nil {
return err
}
}
if a.Version() == 0 || a.IsDeleted() {
// else does the new object claim auth from another object?
if authID, ok := c.Properties[webfinger.NSauth]; ok && authID != nil && auth == nil {
auth = &webfinger.JRD{}
auth.SetStreamID(webfinger.StreamID(*authID))
err := es.Load(ctx, auth)
if err != nil {
return err
}
}
// fall back to use auth from submitted claims
if auth == nil {
auth = c.JRD
}
}
if auth == nil {
return fmt.Errorf("auth not found")
}
err = a.OnAuth(c.JRD, auth)
if err != nil {
return err
}
return a.OnClaims(c.JRD)
})
is.NoErr(err)
for _, e := range a.Events(false) {
t.Log(e)
}
}
func ptr[T any](v T) *T {
return &v
}
func enc(b []byte) string {
return base64.RawURLEncoding.EncodeToString(b)
}
func dec(s string) ([]byte, error) {
s = strings.TrimSpace(s)
return base64.RawURLEncoding.DecodeString(s)
}
func TestMain(m *testing.M) {
ctx, stop := context.WithCancel(context.Background())
defer stop()
err := multierr.Combine(
ev.Init(ctx),
event.Init(ctx),
memstore.Init(ctx),
)
if err != nil {
fmt.Println(err)
return
}
m.Run()
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,95 @@
/* Space out content a bit */
body {
padding-top: 20px;
padding-bottom: 20px;
}
/* Everything but the jumbotron gets side spacing for mobile first views */
.header,
.footer {
padding-right: 15px;
padding-left: 15px;
}
/* Custom page header */
.header {
padding-bottom: 20px;
border-bottom: 1px solid #e5e5e5;
}
/* Make the masthead heading the same height as the navigation */
.header h3 {
margin-top: 0;
margin-bottom: 0;
line-height: 40px;
}
/* Custom page footer */
.footer {
padding-top: 19px;
color: #777;
border-top: 1px solid #e5e5e5;
}
.panel-heading a {
color: white;
font-weight: bold;
}
.container-narrow > hr {
margin: 30px 0;
}
@media (prefers-color-scheme: dark) {
body, .panel-body {
color: white;
background-color: #222;
}
nav.navbar-default {
background-color: rgb(35, 29, 71);
}
.navbar-default .navbar-brand {
color: white;
}
.panel-primary, .list-group, .list-group-item {
color: white;
background-color: #16181c;
}
.table > tbody > tr.active > th, .table > tbody > tr.active > td {
background-color: rgb(35, 29, 71);
}
.table-striped > tbody > tr:nth-of-type(2n+1) {
background-color: rgb(35, 29, 71);
}
.panel pre {
color: white;
background-color: #16181c;
}
.panel .panel-primary > .panel-heading {
background-color: rgb(35, 29, 71);
}
.panel a {
color: cornflowerblue;
}
code {
color: white;
background-color: #282b32;
}
}
@import url(https://cdn.jsdelivr.net/npm/firacode@6.2.0/distr/fira_code.css);
code { font-family: 'Fira Code', monospace; }
@media (min-width: 100) {
.truncate {
width: 750px;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.container {
width: 750px;
}
}

View File

@@ -0,0 +1,28 @@
{{define "main"}}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
{{template "meta" .}}
<title>👉 Webfinger 👈</title>
<link href="/webfinger/assets/bootstrap.min.css" rel="stylesheet" integrity="sha384-1q8mTJOASx8j1Au+a5WDVnPi2lkFfwwEAa8hDDdjZlpLegxhjVME1fgjWPGmkzs7" crossorigin="anonymous">
<link href="/webfinger/assets/webfinger.css" rel="stylesheet" crossorigin="anonymous">
</head>
<body>
<nav class="navbar navbar-default">
<div class="container-fluid">
<a class="navbar-brand" href="/webfinger">👉 Webfinger 👈</a>
</div>
</div>
</nav>
<div class=container>
{{template "content" .}}
</div>
</body>
</html>
{{end}}

View File

@@ -0,0 +1,131 @@
{{template "main" .}}
{{define "meta"}}{{end}}
{{define "content"}}
<form method="GET">
<div class="input-group">
<span class="input-group-addon" id="basic-addon1">resource</span>
<input name="resource" class="form-control" placeholder="acct:..."/>
<span class="input-group-btn">
<button class="btn btn-default" type="submit">Go!</button>
</span>
</div>
</form>
<br/>
{{ if ne .Err nil }}
<div class="alert alert-danger" role="alert">
{{ .Err }}
</div>
{{ end }}
{{ if ne .JRD nil }}
<div class="panel panel-primary">
<div class="panel-heading">Webfinger Result</div>
<table class="table">
<tr>
<th style="width:98px">Subject</th>
<td>
<div class="media">
<div class="media-body">
{{ .JRD.Subject }}
</div>
{{ with .JRD.GetLinkByRel "http://webfinger.net/rel/avatar" }}
{{ if ne . nil }}
<div class="media-left media-middle">
<div class="panel panel-default">
<div class="panel-body">
<img src="{{ .HRef }}" />
</div>
</div>
</div>
{{ end }}
{{ end }}
</div>
</td>
</tr>
{{if ne (len .JRD.Aliases) 0}}
<tr>
<th>Aliases</th>
<td>
<ul class="list-group">
{{ range .JRD.Aliases }}<li class="list-group-item">{{ . }}</li>
{{ end }}
</ul>
</td>
</tr>
{{ end }}
{{ if ne (len .JRD.Properties) 0 }}
<tr>
<th>Properties</th>
<td>
<div class="list-group truncate">
{{ range $key, $value := .JRD.Properties }}<div class="list-group-item">
<h5 class="list-group-item-heading" title="{{ $key }}">{{ propName $key }}</h5>
<code class="list-group-item-text">{{ $value }}</code>
</div>
{{ end }}
</div>
</td>
</tr>
{{ end }}
{{ if ne (len .JRD.Links) 0 }}
{{ range .JRD.Links }}
<tr class="active">
{{ if ne (len .Template) 0 }}
<th> Template </th>
<td>{{ .Template }}</td>
{{ else }}
<th> Link </th>
<td>{{ if ne (len .HRef) 0 }}<a href="{{ .HRef }}" target="_blank">{{ .HRef }}</a>{{ end }}</td>
{{ end }}
<tr>
<tr>
<th> Properties </th>
<td>
<div class="list-group">
<div class="list-group-item truncate">
<h5 class="list-group-item-heading">rel<h5>
<code class="list-group-item-text">{{ .Rel }}</code>
</div>
{{ if ne (len .Type) 0 }}<div class="list-group-item truncate">
<h5 class="list-group-item-heading">type</h5>
<code class="list-group-item-text">{{ .Type }}</code>
</div>
{{ end }}
{{ range $key, $value := .Properties }}<div class="list-group-item truncate">
<h5 class="list-group-item-heading" title="{{ $key }}">{{ propName $key }}</h5>
<code class="list-group-item-text">{{ $value }}</code>
</div>
{{ end }}
</div>
</td>
</tr>
{{ end }}
{{ end }}
</table>
</div>
{{ end }}
<div class="panel panel-primary">
<div class="panel-heading">Raw JRD</div>
<pre style="height: 15em; overflow-y: auto; border: 0px">
Status: {{ .Status }}
{{ .Body | printf "%s" }}
</pre>
</div>
{{end}}

416
app/webfinger/webfinger.go Normal file
View File

@@ -0,0 +1,416 @@
package webfinger
import (
"context"
"crypto/ed25519"
"embed"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"html"
"html/template"
"io"
"io/fs"
"log"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"github.com/golang-jwt/jwt/v4"
"go.sour.is/pkg/lg"
"go.sour.is/pkg/set"
"go.sour.is/ev"
"go.sour.is/ev/event"
)
var (
//go:embed ui/*/*
files embed.FS
templates map[string]*template.Template
)
type service struct {
es *ev.EventStore
self set.Set[string]
cache func(string) bool
}
type Option interface {
ApplyWebfinger(s *service)
}
type WithHostnames []string
func (o WithHostnames) ApplyWebfinger(s *service) {
s.self = set.New(o...)
}
type WithCache func(string) bool
func (o WithCache) ApplyWebfinger(s *service) {
s.cache = o
}
func New(ctx context.Context, es *ev.EventStore, opts ...Option) (*service, error) {
ctx, span := lg.Span(ctx)
defer span.End()
if err := event.Register(
ctx,
&SubjectSet{},
&SubjectDeleted{},
&LinkSet{},
&LinkDeleted{},
); err != nil {
return nil, err
}
svc := &service{es: es}
for _, o := range opts {
o.ApplyWebfinger(svc)
}
return svc, nil
}
func (s *service) RegisterHTTP(mux *http.ServeMux) {
a, _ := fs.Sub(files, "ui/assets")
assets := http.StripPrefix("/webfinger/assets/", http.FileServer(http.FS(a)))
mux.Handle("/webfinger", s.ui())
mux.Handle("/webfinger/assets/", assets)
}
func (s *service) RegisterWellKnown(mux *http.ServeMux) {
mux.Handle("/webfinger", lg.Htrace(s, "webfinger"))
}
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, span := lg.Span(ctx)
defer span.End()
if r.URL.Path != "/webfinger" {
w.WriteHeader(http.StatusNotFound)
fmt.Fprint(w, http.StatusText(http.StatusNotFound))
return
}
switch r.Method {
case http.MethodPut, http.MethodDelete:
if r.ContentLength > 4096 {
w.WriteHeader(http.StatusRequestEntityTooLarge)
fmt.Fprint(w, http.StatusText(http.StatusRequestEntityTooLarge))
span.AddEvent("request too large")
return
}
body, err := io.ReadAll(io.LimitReader(r.Body, 4096))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, http.StatusText(http.StatusInternalServerError))
span.RecordError(err)
return
}
r.Body.Close()
type claims struct {
PubKey string `json:"pub"`
*JRD
jwt.RegisteredClaims
}
token, err := jwt.ParseWithClaims(
string(body),
&claims{},
func(tok *jwt.Token) (any, error) {
c, ok := tok.Claims.(*claims)
if !ok {
return nil, fmt.Errorf("wrong type of claim")
}
if c.JRD == nil {
c.JRD = &JRD{}
}
c.JRD.Subject = c.RegisteredClaims.Subject
c.SetProperty(NSpubkey, &c.PubKey)
pub, err := dec(c.PubKey)
return ed25519.PublicKey(pub), err
},
jwt.WithValidMethods([]string{"EdDSA"}),
jwt.WithJSONNumber(),
)
if err != nil {
w.WriteHeader(http.StatusUnprocessableEntity)
fmt.Fprint(w, http.StatusText(http.StatusUnprocessableEntity), ": ", err.Error())
span.RecordError(err)
return
}
c, ok := token.Claims.(*claims)
if !ok {
w.WriteHeader(http.StatusUnprocessableEntity)
fmt.Fprint(w, http.StatusText(http.StatusUnprocessableEntity))
span.AddEvent("not a claim")
return
}
if c.ID != "" && s.cache != nil {
if ok := s.cache(c.ID); ok {
w.WriteHeader(http.StatusAlreadyReported)
fmt.Fprint(w, http.StatusText(http.StatusAlreadyReported))
span.AddEvent("already seen ID")
return
}
}
json.NewEncoder(os.Stdout).Encode(c.JRD)
for i := range c.JRD.Links {
c.JRD.Links[i].Index = uint64(i)
}
a, err := ev.Upsert(ctx, s.es, StreamID(c.JRD.Subject), func(ctx context.Context, a *JRD) error {
var auth *JRD
for i := range a.Links {
a.Links[i].Index = uint64(i)
}
// does the target have a pubkey for self auth?
if _, ok := a.Properties[NSpubkey]; ok {
auth = a
}
// Check current version for auth.
if authID, ok := a.Properties[NSauth]; ok && authID != nil && auth == nil {
auth = &JRD{}
auth.SetStreamID(StreamID(*authID))
err := s.es.Load(ctx, auth)
if err != nil {
return err
}
}
if a.Version() == 0 || a.IsDeleted() {
// else does the new object claim auth from another object?
if authID, ok := c.Properties[NSauth]; ok && authID != nil && auth == nil {
auth = &JRD{}
auth.SetStreamID(StreamID(*authID))
err := s.es.Load(ctx, auth)
if err != nil {
return err
}
}
// fall back to use auth from submitted claims
if auth == nil {
auth = c.JRD
}
}
if auth == nil {
return fmt.Errorf("auth not found")
}
err = a.OnAuth(c.JRD, auth)
if err != nil {
return err
}
if r.Method == http.MethodDelete {
return a.OnDelete(c.JRD)
}
return a.OnClaims(c.JRD)
})
if err != nil {
w.WriteHeader(http.StatusUnprocessableEntity)
fmt.Fprint(w, http.StatusText(http.StatusUnprocessableEntity), ": ", err.Error())
span.RecordError(err)
return
}
if version := a.Version(); r.Method == http.MethodDelete && version > 0 {
err = s.es.Truncate(ctx, a.StreamID(), int64(version))
span.RecordError(err)
}
w.Header().Set("Content-Type", "application/jrd+json")
if r.Method == http.MethodDelete {
w.WriteHeader(http.StatusNoContent)
} else {
w.WriteHeader(http.StatusCreated)
}
j := json.NewEncoder(w)
j.SetIndent("", " ")
err = j.Encode(a)
span.RecordError(err)
case http.MethodGet:
resource := r.URL.Query().Get("resource")
rels := r.URL.Query()["rel"]
if resource == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
if u := Parse(resource); u != nil && !s.self.Has(u.URL.Host) {
redirect := &url.URL{}
redirect.Scheme = "https"
redirect.Host = u.URL.Host
redirect.RawQuery = r.URL.RawQuery
redirect.Path = "/.well-known/webfinger"
w.Header().Set("location", redirect.String())
w.WriteHeader(http.StatusSeeOther)
return
}
a := &JRD{}
a.SetStreamID(StreamID(resource))
err := s.es.Load(ctx, a)
if err != nil {
span.RecordError(err)
if errors.Is(err, ev.ErrNotFound) {
w.WriteHeader(http.StatusNotFound)
fmt.Fprint(w, http.StatusText(http.StatusNotFound))
return
}
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, http.StatusText(http.StatusInternalServerError))
return
}
if a.IsDeleted() {
w.WriteHeader(http.StatusGone)
fmt.Fprint(w, http.StatusText(http.StatusGone))
span.AddEvent("is deleted")
return
}
if len(rels) > 0 {
a.Links = a.GetLinksByRel(rels...)
}
if a.Properties != nil {
if redirect, ok := a.Properties[NSredirect]; ok && redirect != nil {
w.Header().Set("location", *redirect)
w.WriteHeader(http.StatusSeeOther)
return
}
}
w.Header().Set("Content-Type", "application/jrd+json")
w.WriteHeader(http.StatusOK)
j := json.NewEncoder(w)
j.SetIndent("", " ")
err = j.Encode(a)
span.RecordError(err)
default:
w.Header().Set("Allow", "GET, PUT, DELETE, OPTIONS")
w.WriteHeader(http.StatusMethodNotAllowed)
fmt.Fprint(w, http.StatusText(http.StatusMethodNotAllowed))
span.AddEvent("method not allow: " + r.Method)
}
}
func (s *service) ui() http.HandlerFunc {
loadTemplates()
return func(w http.ResponseWriter, r *http.Request) {
args := struct {
Req *http.Request
Status int
Body []byte
JRD *JRD
Err error
}{Status: http.StatusOK}
if r.URL.Query().Has("resource") {
args.Req, args.Err = http.NewRequestWithContext(r.Context(), http.MethodGet, r.URL.String(), nil)
if args.Err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
wr := httptest.NewRecorder()
s.ServeHTTP(wr, args.Req)
args.Status = wr.Code
switch wr.Code {
case http.StatusSeeOther:
res, err := http.DefaultClient.Get(wr.Header().Get("location"))
args.Err = err
if err == nil {
args.Status = res.StatusCode
args.Body, args.Err = io.ReadAll(res.Body)
}
case http.StatusOK:
args.Body, args.Err = io.ReadAll(wr.Body)
if args.Err == nil {
args.JRD, args.Err = ParseJRD(args.Body)
}
}
if args.Err == nil && args.Body != nil {
args.JRD, args.Err = ParseJRD(args.Body)
}
}
t := templates["home.go.tpl"]
err := t.Execute(w, args)
if err != nil {
log.Println(err)
}
}
}
func dec(s string) ([]byte, error) {
s = strings.TrimSpace(s)
return base64.RawURLEncoding.DecodeString(s)
}
var funcMap = map[string]any{
"propName": func(in string) string { return in[strings.LastIndex(in, "/")+1:] },
"escape": html.EscapeString,
}
func loadTemplates() error {
if templates != nil {
return nil
}
templates = make(map[string]*template.Template)
tmplFiles, err := fs.ReadDir(files, "ui/pages")
if err != nil {
return err
}
for _, tmpl := range tmplFiles {
if tmpl.IsDir() {
continue
}
pt := template.New(tmpl.Name())
pt.Funcs(funcMap)
pt, err = pt.ParseFS(files, "ui/pages/"+tmpl.Name(), "ui/layouts/*.go.tpl")
if err != nil {
log.Println(err)
return err
}
templates[tmpl.Name()] = pt
}
return nil
}