feat: updates to peerfinder app

This commit is contained in:
Jon Lundy 2022-11-20 10:28:50 -07:00
parent 5e31d27c54
commit bbb45c8854
Signed by untrusted user who does not match committer: xuu
GPG Key ID: C63E6D61F3035024
12 changed files with 1029 additions and 134 deletions

View File

@ -0,0 +1,8 @@
extend type Query{
keys(namespace: String!) [String!]!
get(namespace: String! keys: [String!]) [String]!
}
extend type Mutation{
set(namespace: String! key: String! value: String): Bool!
}

1
app/mercury/service.go Normal file
View File

@ -0,0 +1 @@
package mercury

View File

@ -1,21 +0,0 @@
<div class="form-group"><label class="col-sm-2 control-label" >Peer Name</label><div class='col-sm-10'><input class="form-control" type=text name=peer_name value="{$o.peer_name|default:''|escape}"/></div></div>
<div class="form-group"><label class="col-sm-2 control-label" >IRC Nick</label><div class='col-sm-10'><input class="form-control" type=text name=peer_nick value="{$o.peer_nick|default:''|escape}"/></div></div>
<div class="form-group"><label class="col-sm-2 control-label" >Note</label><div class='col-sm-10'><input class="form-control" type=text name=peer_note value="{$o.peer_note|default:''|escape}"/></div></div>
<div class="form-group"><label class="col-sm-2 control-label" >Country</label><div class='col-sm-2'><input class="form-control" type=text name=peer_country maxlength=3 value="{$o.peer_country|default:''|escape}"/></div></div>
<div class="form-group"><label class="col-sm-2 control-label" >VPN Types</label><div class='col-sm-10'><select class="form-control" size=12 multiple name="peer_type[]">
<option {$types['openvpn']|default:''} value="openvpn">openvpn</option>
<option {$types['gre/ipsec']|default:''} value="gre/ipsec">gre/ipsec</option>
<option {$types['gre/plain']|default:''} value="gre/plain">gre/plain</option>
<option {$types['fastd']|default:''} value="fastd">fastd</option>
<option {$types['tinc']|default:''} value="tinc">tinc</option>
<option {$types['zerotier']|default:''} value="zerotier">zerotier</option>
<option {$types['wireguard']|default:''} value="wireguard">wireguard</option>
<option {$types['pptp']|default:''} value="pptp">pptp</option>
<option {$types['l2tp']|default:''} value="l2tp">l2tp</option>
<option {$types['other']|default:''} value="other">other</option>
</select></div></div>
<div class="form-group"><label class="col-sm-2 control-label" >Address Family</label><div class='col-sm-10'>
<label><input type="radio" name="peer_family" value="1" {$fam[0]|default:''} /> ipv4 </label>
<label><input type="radio" name="peer_family" value="2" {$fam[1]|default:''} /> ipv6 </label>
<label><input type="radio" name="peer_family" value="3" {$fam[2]|default:''} /> both </label>
</div></div>

View File

@ -18,9 +18,11 @@
<nav> <nav>
<ul class="nav nav-pills pull-right"> <ul class="nav nav-pills pull-right">
<li role="presentation"><a href="/peers">Home</a></li> <li role="presentation"><a href="/peers">Home</a></li>
<li role="presentation"><a href="/peers/status">Status</a></li> <!--
<li role="presentation"><a href="/peers/status">Status</a></li>
-->
<li role="presentation"><a href="//util.sour.is/peer">Sign up/Manage</a></li> <li role="presentation"><a href="//util.sour.is/peer">Sign up/Manage</a></li>
<li role="presentation"><a href="//git.dn42.us/dn42/pingfinder/src/master/clients">Scripts</a></li> <li role="presentation"><a href="https://git.dn42.dev/dn42/pingfinder/src/branch/master/clients">Scripts</a></li>
</ul> </ul>
</nav> </nav>
<h3 class="text-muted">DN42 PeerFinder</h3> <h3 class="text-muted">DN42 PeerFinder</h3>
@ -31,11 +33,6 @@
{{template "content" .}} {{template "content" .}}
</div> </div>
<div class=container>
<h2>JSON Output</h2>
<pre style="background:#222; color:#ddd; height: 20em; font-size: 65%">{$o|json_encode:128|escape}</pre>
</div>
</body> </body>
</html> </html>
{{end}} {{end}}

View File

@ -30,9 +30,9 @@
<form class="form-inline" method="POST" action="/peers/req"> <form class="form-inline" method="POST" action="/peers/req">
<label>Ping IP Address [Check Hidden?]:</label> <label>Ping IP Address [Check Hidden?]:</label>
<div class="input-group input-group-sm"> <div class="input-group input-group-sm">
<input class="form-control" type="text" name="req_ip" placeholder="{$rq->remote_ip}"> <input class="form-control" type="text" name="req_ip" placeholder="{{ .RemoteIP }}">
<span class="input-group-addon"> <span class="input-group-addon">
<input type="checkbox" name="req_hidden" value=1 aria-label="Hidden?"> <input type="checkbox" name="req_hidden" disabled value=1 aria-label="Hidden?">
</span> </span>
</div> </div>
<button class="btn btn-default" type="submit">Submit</button> <button class="btn btn-default" type="submit">Submit</button>
@ -40,4 +40,21 @@
<p>If you mark your measurement as hidden, it will not be displayed on the <p>If you mark your measurement as hidden, it will not be displayed on the
page below. Note that the IP addresses of the target will be shown alongside the result.</p> page below. Note that the IP addresses of the target will be shown alongside the result.</p>
<div class=row>
<h2>Results</h2>
{{ with $args := . }}
{{range $req := .Requests}}
<div class="panel panel-primary">
<div class="panel-heading">
<a href="/peers/req/{{ $req.RequestID }}">{{ $req.RequestIP }} on {{ $req.Created }}</a>
<div style='float:right'><a href="/peers/req/{{ $req.RequestID }}" class='btn btn-success'>{{ countResponses $req }} / {{ $args.CountPeers }} </a></div>
</div>
<div class="panel-body"><b>Request ID:</b> {{ $req.RequestID }}</div>
</div>
{{end}}
{{end}}
</div>
{{end}} {{end}}

View File

@ -0,0 +1,52 @@
{{template "main" .}}
{{define "meta"}}
<meta http-equiv="refresh" content="30">
{{end}}
{{define "content"}}
{{range .Requests}}
<h2>Results to {{.RequestIP}}</h2>
{{with (orderByPeer .)}}
{{range .}}
<div class="panel panel-primary" id="peer-{{.Nick}}">
<div class="panel-heading">
<b> {{.Country}} :: {{.Name}} :: {{.Nick}} </b>
<div style='float:right'>
<a class='btn btn-success' href="#peer-{{.Nick}}">{{printf "%0.3f ms" .Latency}}</a>
</div>
</div>
<div class="panel-body">
<b>Note:</b> {{.Note}}<br/>
<b>VPN Types:</b> {{range .VPNTypes}} {{.}} {{end}}<br/>
<b>IRC:</b> {{.Nick}}
<h4>Other Results</h4>
<table class="table table-striped">
<thead>
<tr>
<th>Peer Name</th>
<th>Country</th>
<th>Latency</th>
<th>Jitter</th>
</tr>
</thead>
<tbody>
{{range .Results}}
<tr>
<th>{{.Name}}</th>
<td>{{.Country}}</td>
<td>{{printf "%0.3f ms" .Latency}}</td>
<td>{{ if eq .Jitter 0.0 }}&mdash;{{ else }}{{ printf "%0.3f ms" .Jitter }}{{ end }}</td>
</tr>
{{end}}
</tbody>
</table>
</div>
</div>
{{end}}
{{end}}
{{end}}
{{end}}

View File

@ -4,43 +4,149 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net"
"net/netip" "net/netip"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/tj/go-semver"
"github.com/oklog/ulid/v2"
"github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/es/event"
) )
type Request struct{ type Request struct {
event.AggregateRoot event.AggregateRoot
RequestIP string `json:"req_ip"` RequestID string `json:"req_id"`
Hidden bool `json:"hide,omitempty"` RequestIP string `json:"req_ip"`
Hidden bool `json:"hide,omitempty"`
Created time.Time `json:"req_created"`
Responses []Response `json:"responses"` Responses []Response `json:"responses"`
} }
var _ event.Aggregate = (*Request)(nil) var _ event.Aggregate = (*Request)(nil)
func (a *Request) ApplyEvent(lis ...event.Event) { func (a *Request) ApplyEvent(lis ...event.Event) {
for _, e := range lis { for _, e := range lis {
switch e:=e.(type) { switch e := e.(type) {
case *RequestSubmitted: case *RequestSubmitted:
a.RequestID = e.eventMeta.EventID.String()
a.RequestIP = e.RequestIP a.RequestIP = e.RequestIP
a.Hidden = e.Hidden a.Hidden = e.Hidden
a.Created = ulid.Time(e.EventMeta().EventID.Time())
case *ResultSubmitted: case *ResultSubmitted:
a.Responses = append(a.Responses, Response{ a.Responses = append(a.Responses, Response{
PeerID: e.PeerID, PeerID: e.PeerID,
PeerVersion: e.PeerVersion, ScriptVersion: e.PeerVersion,
Latency: e.Latency, Latency: e.Latency,
}) Jitter: e.Jitter,
MinRTT: e.MinRTT,
MaxRTT: e.MaxRTT,
Sent: e.Sent,
Received: e.Received,
Unreachable: e.Unreachable,
Created: ulid.Time(e.EventMeta().EventID.Time()),
})
} }
} }
} }
type Time time.Time
func (t *Time) UnmarshalJSON(b []byte) error {
time, err := time.Parse(`"2006-01-02 15:04:05"`, string(b))
*t = Time(time)
return err
}
func (t *Time) MarshalJSON() ([]byte, error) {
if t == nil {
return nil, nil
}
i := *t
return time.Time(i).MarshalJSON()
}
type ipFamily string
const (
ipFamilyV4 ipFamily = "IPv4"
ipFamilyV6 ipFamily = "IPv6"
ipFamilyBoth ipFamily = "both"
ipFamilyNone ipFamily = "none"
)
func (t *ipFamily) UnmarshalJSON(b []byte) error {
i, err := strconv.Atoi(strings.Trim(string(b), `"`))
switch i {
case 1:
*t = ipFamilyV4
case 2:
*t = ipFamilyV6
case 3:
*t = ipFamilyBoth
default:
*t = ipFamilyNone
}
return err
}
type peerType []string
func (t *peerType) UnmarshalJSON(b []byte) error {
*t = strings.Split(strings.Trim(string(b), `"`), ",")
return nil
}
type Peer struct {
ID string `json:"peer_id,omitempty"`
Owner string `json:"peer_owner"`
Nick string `json:"peer_nick"`
Name string `json:"peer_name"`
Country string `json:"peer_country"`
Note string `json:"peer_note"`
Family ipFamily `json:"peer_family"`
Type peerType `json:"peer_type"`
Created Time `json:"peer_created"`
}
func (p *Peer) CanSupport(ip string) bool {
addr := net.ParseIP(ip)
if addr == nil {
return false
}
if !addr.IsGlobalUnicast() {
return false
}
switch p.Family {
case ipFamilyV4:
return addr.To4() != nil
case ipFamilyV6:
return addr.To16() != nil
case ipFamilyNone:
return false
}
return true
}
type Response struct { type Response struct {
PeerID string `json:"peer_id"` Peer *Peer `json:"peer"`
PeerVersion string `json:"peer_version"` PeerID string `json:"-"`
Latency float64 `json:"latency,omitempty"` ScriptVersion string `json:"peer_scriptver"`
Latency float64 `json:"res_latency"`
Jitter float64 `json:"res_jitter,omitempty"`
MaxRTT float64 `json:"res_maxrtt,omitempty"`
MinRTT float64 `json:"res_minrtt,omitempty"`
Sent int `json:"res_sent,omitempty"`
Received int `json:"res_recv,omitempty"`
Unreachable bool `json:"unreachable,omitempty"`
Created time.Time `json:"res_created"`
} }
type RequestSubmitted struct { type RequestSubmitted struct {
@ -131,6 +237,12 @@ type ResultSubmitted struct {
PeerID string `json:"peer_id"` PeerID string `json:"peer_id"`
PeerVersion string `json:"peer_version"` PeerVersion string `json:"peer_version"`
Latency float64 `json:"latency,omitempty"` Latency float64 `json:"latency,omitempty"`
Jitter float64 `json:"jitter,omitempty"`
MaxRTT float64 `json:"maxrtt,omitempty"`
MinRTT float64 `json:"minrtt,omitempty"`
Sent int `json:"res_sent,omitempty"`
Received int `json:"res_recv,omitempty"`
Unreachable bool `json:"unreachable,omitempty"`
} }
func (r *ResultSubmitted) Created() time.Time { func (r *ResultSubmitted) Created() time.Time {
@ -185,10 +297,17 @@ func (a *Info) MarshalEnviron() ([]byte, error) {
return b.Bytes(), nil return b.Bytes(), nil
} }
func (a *Info) OnCreate() error { func (a *Info) OnUpsert() error {
if a.StreamVersion() == 0 { if a.StreamVersion() == 0 {
event.Raise(a, &VersionChanged{ScriptVersion: initVersion}) event.Raise(a, &VersionChanged{ScriptVersion: initVersion})
} }
current, _ := semver.Parse(initVersion)
previous, _ := semver.Parse(a.ScriptVersion)
if current.Compare(previous) > 0 {
event.Raise(a, &VersionChanged{ScriptVersion: initVersion})
}
return nil return nil
} }

View File

@ -4,33 +4,41 @@ import (
"context" "context"
"embed" "embed"
"encoding/json" "encoding/json"
"errors"
"fmt"
"html/template" "html/template"
"io" "io"
"io/fs" "io/fs"
"log" "log"
"net" "net"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/oklog/ulid"
contentnegotiation "gitlab.com/jamietanna/content-negotiation-go" contentnegotiation "gitlab.com/jamietanna/content-negotiation-go"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"github.com/oklog/ulid"
"github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/internal/lg"
"github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es"
"github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/es/event"
"github.com/sour-is/ev/pkg/locker" "github.com/sour-is/ev/pkg/locker"
"github.com/sour-is/ev/pkg/math"
"github.com/sour-is/ev/pkg/set"
) )
const ( const (
aggInfo = "pf-info" aggInfo = "pf-info"
queueRequests = "pf-requests" queueRequests = "pf-requests"
queueResponses = "pf-request-" queueResults = "pf-results"
queuePeers = "pf-peer-" initVersion = "1.2.1"
initVersion = "1.1.0"
) )
func aggRequest(id string) string { return "pf-request-" + id }
func aggPeer(id string) string { return "pf-peer-" + id }
var ( var (
//go:embed pages/* layouts/* assets/* //go:embed pages/* layouts/* assets/*
files embed.FS files embed.FS
@ -38,33 +46,33 @@ var (
) )
type service struct { type service struct {
es *es.EventStore es *es.EventStore
statusURL string
State locker.Locked[state] state *locker.Locked[state]
} }
type state struct { type state struct {
Version string peers map[string]*Peer
Requests []Request
} }
func New(ctx context.Context, es *es.EventStore) (*service, error) { func New(ctx context.Context, es *es.EventStore, statusURL string) (*service, error) {
ctx, span := lg.Span(ctx) ctx, span := lg.Span(ctx)
defer span.End() defer span.End()
loadTemplates()
if err := event.Register(ctx, &RequestSubmitted{}, &ResultSubmitted{}, &VersionChanged{}); err != nil { if err := event.Register(ctx, &RequestSubmitted{}, &ResultSubmitted{}, &VersionChanged{}); err != nil {
span.RecordError(err) span.RecordError(err)
return nil, err return nil, err
} }
svc := &service{es: es} svc := &service{es: es, statusURL: statusURL, state: locker.New(&state{peers: make(map[string]*Peer)})}
return svc, nil return svc, nil
} }
func (s *service) RegisterHTTP(mux *http.ServeMux) { func (s *service) RegisterHTTP(mux *http.ServeMux) {
loadTemplates() a, _ := fs.Sub(files, "assets")
a, err := fs.Sub(files, "assets")
log.Println(err)
assets := http.StripPrefix("/peers/assets/", http.FileServer(http.FS(a))) assets := http.StripPrefix("/peers/assets/", http.FileServer(http.FS(a)))
mux.Handle("/peers/assets/", lg.Htrace(assets, "peer-assets")) mux.Handle("/peers/assets/", lg.Htrace(assets, "peer-assets"))
@ -73,7 +81,7 @@ func (s *service) RegisterHTTP(mux *http.ServeMux) {
func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
_, span := lg.Span(ctx) ctx, span := lg.Span(ctx)
defer span.End() defer span.End()
r = r.WithContext(ctx) r = r.WithContext(ctx)
@ -86,12 +94,20 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
case strings.HasPrefix(r.URL.Path, "/peers/req/"): case strings.HasPrefix(r.URL.Path, "/peers/req/"):
s.getResults(w, r, strings.TrimPrefix(r.URL.Path, "/peers/req/")) s.getResultsForRequest(w, r, strings.TrimPrefix(r.URL.Path, "/peers/req/"))
return return
case strings.HasPrefix(r.URL.Path, "/peers/status"):
s.state.Modify(r.Context(), func(ctx context.Context, state *state) error {
for id, p := range state.peers {
fmt.Fprintln(w, "PEER: ", id, p.Owner, p.Name)
}
return nil
})
default: default:
t := templates["home.tpl"] s.getResults(w, r)
t.Execute(w, nil)
return return
} }
case http.MethodPost: case http.MethodPost:
@ -122,8 +138,23 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string
attribute.String("uuid", uuid), attribute.String("uuid", uuid),
) )
info, err := es.Upsert(ctx, s.es, "pf-info", func(ctx context.Context, agg *Info) error { var peer *Peer
return agg.OnCreate() // initialize if not exists err := s.state.Modify(ctx, func(ctx context.Context, state *state) error {
var ok bool
if peer, ok = state.peers[uuid]; !ok {
return fmt.Errorf("peer not found: %s", uuid)
}
return nil
})
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusNotFound)
return
}
info, err := es.Upsert(ctx, s.es, aggInfo, func(ctx context.Context, agg *Info) error {
return agg.OnUpsert() // initialize if not exists
}) })
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
@ -138,14 +169,16 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string
return return
} }
responses, err := s.es.Read(ctx, queuePeers+uuid, -1, -30) responses, err := s.es.Read(ctx, aggPeer(uuid), -1, -30)
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
req := filter(requests, responses) span.AddEvent(fmt.Sprintf("req = %d, res = %d", len(requests), len(responses)))
req := filter(peer, requests, responses)
negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/plain", "text/html") negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/plain", "text/html")
negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept")) negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept"))
@ -183,7 +216,41 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string
} }
span.RecordError(err) span.RecordError(err)
} }
func (s *service) getResults(w http.ResponseWriter, r *http.Request, uuid string) { func (s *service) getResults(w http.ResponseWriter, r *http.Request) {
ctx, span := lg.Span(r.Context())
defer span.End()
events, err := s.es.Read(ctx, queueRequests, -1, -30)
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
requests := make([]*Request, len(events))
for i, req := range events {
if req, ok := req.(*RequestSubmitted); ok {
requests[i], err = s.loadResult(ctx, req.RequestID())
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
}
args := requestArgs(r)
args.Requests = requests
s.state.Modify(ctx, func(ctx context.Context, state *state) error {
args.CountPeers = len(state.peers)
return nil
})
t := templates["home.tpl"]
t.Execute(w, args)
}
func (s *service) getResultsForRequest(w http.ResponseWriter, r *http.Request, uuid string) {
ctx, span := lg.Span(r.Context()) ctx, span := lg.Span(r.Context())
defer span.End() defer span.End()
@ -191,23 +258,33 @@ func (s *service) getResults(w http.ResponseWriter, r *http.Request, uuid string
attribute.String("uuid", uuid), attribute.String("uuid", uuid),
) )
responses, err := s.es.Read(ctx, queueResponses+uuid, -1, es.AllEvents) request, err := s.loadResult(ctx, uuid)
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/csv", "text/plain", "text/html") negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/csv", "text/plain", "text/html")
negotiated, _, err := negotiator.Negotiate("application/json") negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept"))
if err != nil { if err != nil {
w.WriteHeader(http.StatusNotAcceptable) w.WriteHeader(http.StatusNotAcceptable)
return return
} }
span.AddEvent(negotiated.String())
switch negotiated.String() { switch negotiated.String() {
// case "text/environment": // case "text/environment":
// encodeTo(w, responses.MarshalBinary) // encodeTo(w, responses.MarshalBinary)
case "application/json": case "application/json":
json.NewEncoder(w).Encode(responses) json.NewEncoder(w).Encode(request)
return
default:
args := requestArgs(r)
args.Requests = append(args.Requests, request)
span.AddEvent(fmt.Sprint(args))
err := renderTo(w, "req.tpl", args)
span.RecordError(err)
return
} }
} }
func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { func (s *service) postRequest(w http.ResponseWriter, r *http.Request) {
@ -219,7 +296,14 @@ func (s *service) postRequest(w http.ResponseWriter, r *http.Request) {
return return
} }
ip := net.ParseIP(r.Form.Get("req_ip")) args := requestArgs(r)
requestIP := args.RemoteIP
if ip := r.Form.Get("req_ip"); ip != "" {
requestIP = ip
}
ip := net.ParseIP(requestIP)
if ip == nil { if ip == nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
@ -236,6 +320,8 @@ func (s *service) postRequest(w http.ResponseWriter, r *http.Request) {
) )
s.es.Append(ctx, queueRequests, event.NewEvents(req)) s.es.Append(ctx, queueRequests, event.NewEvents(req))
http.Redirect(w, r, "/peers/req/"+req.RequestID(), http.StatusSeeOther)
} }
func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) { func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) {
ctx, span := lg.Span(r.Context()) ctx, span := lg.Span(r.Context())
@ -255,10 +341,26 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string)
return return
} }
peerID := r.Form.Get("peer_id")
err := s.state.Modify(ctx, func(ctx context.Context, state *state) error {
var ok bool
if _, ok = state.peers[peerID]; !ok {
// fmt.Printf("peer not found: %s\n", peerID)
return fmt.Errorf("peer not found: %s", peerID)
}
return nil
})
if err != nil {
span.RecordError(err)
w.WriteHeader(http.StatusNotFound)
return
}
var unreach bool
latency, err := strconv.ParseFloat(r.Form.Get("res_latency"), 64) latency, err := strconv.ParseFloat(r.Form.Get("res_latency"), 64)
if err != nil { if err != nil {
w.WriteHeader(http.StatusUnprocessableEntity) unreach = true
return
} }
req := &ResultSubmitted{ req := &ResultSubmitted{
@ -266,13 +368,35 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string)
PeerID: r.Form.Get("peer_id"), PeerID: r.Form.Get("peer_id"),
PeerVersion: r.Form.Get("peer_version"), PeerVersion: r.Form.Get("peer_version"),
Latency: latency, Latency: latency,
Unreachable: unreach,
}
if jitter, err := strconv.ParseFloat(r.Form.Get("res_jitter"), 64); err == nil {
req.Jitter = jitter
} else {
span.RecordError(err)
}
if minrtt, err := strconv.ParseFloat(r.Form.Get("res_minrtt"), 64); err == nil {
req.MinRTT = minrtt
} else {
span.RecordError(err)
}
if maxrtt, err := strconv.ParseFloat(r.Form.Get("res_maxrtt"), 64); err == nil {
req.MaxRTT = maxrtt
} else {
span.RecordError(err)
} }
span.SetAttributes( span.SetAttributes(
attribute.Stringer("result", req), attribute.Stringer("result", req),
) )
idx, err := s.es.LastIndex(ctx, queueResponses+id) s.state.Modify(ctx, func(ctx context.Context, state *state) error {
return nil
})
idx, err := s.es.LastIndex(ctx, aggRequest(id))
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
@ -282,16 +406,219 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string)
return return
} }
s.es.Append(ctx, queueRequests, event.NewEvents(req)) s.es.Append(ctx, queueResults, event.NewEvents(req))
}
func (s *service) RefreshJob(ctx context.Context, _ time.Time) error {
ctx, span := lg.Span(ctx)
defer span.End()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.statusURL, nil)
span.RecordError(err)
if err != nil {
return err
}
req.Header.Set("Accept", "application/json")
res, err := http.DefaultClient.Do(req)
span.RecordError(err)
if err != nil {
return err
}
defer res.Body.Close()
var peers []*Peer
err = json.NewDecoder(res.Body).Decode(&peers)
span.RecordError(err)
if err != nil {
return err
}
span.AddEvent(fmt.Sprintf("processed %d peers", len(peers)))
err = s.state.Modify(ctx, func(ctx context.Context, t *state) error {
for _, peer := range peers {
t.peers[peer.ID] = peer
}
return nil
})
span.RecordError(err)
return err
}
func (s *service) CleanJob(ctx context.Context, now time.Time) error {
ctx, span := lg.Span(ctx)
defer span.End()
fmt.Println("clear peerfinder requests")
span.AddEvent("clear peerfinder requests")
endRequestID, err := s.cleanRequests(ctx, now)
if err != nil {
return err
}
if err = s.cleanResults(ctx, endRequestID); err != nil {
return err
}
return s.cleanPeerJobs(ctx)
}
func (s *service) cleanPeerJobs(ctx context.Context) error {
ctx, span := lg.Span(ctx)
defer span.End()
peers := set.New[string]()
err := s.state.Modify(ctx, func(ctx context.Context, state *state) error {
for id := range state.peers {
peers.Add(id)
}
return nil
})
if err != nil {
return err
}
// trunctate all the peer streams to last 30
for streamID := range peers {
streamID = aggPeer(streamID)
first, err := s.es.FirstIndex(ctx, streamID)
if err != nil {
return err
}
last, err := s.es.LastIndex(ctx, streamID)
if err != nil {
return err
}
newFirst := math.Max(math.Max(int64(last-30), int64(first)), 1)
if last == 0 || newFirst == int64(first) {
// fmt.Println("SKIP", streamID, first, newFirst, last)
span.AddEvent(fmt.Sprint("SKIP", streamID, first, newFirst, last))
continue
}
// fmt.Println("TRUNC", streamID, first, newFirst, last)
span.AddEvent(fmt.Sprint("TRUNC", streamID, first, newFirst, last))
err = s.es.Truncate(ctx, streamID, int64(newFirst))
if err != nil {
return err
}
}
return nil
}
func (s *service) cleanRequests(ctx context.Context, now time.Time) (string, error) {
ctx, span := lg.Span(ctx)
defer span.End()
var streamIDs []string
var endPosition uint64
var endRequestID string
last, err := s.es.LastIndex(ctx, queueRequests)
if err != nil {
return "", err
}
end:
for {
events, err := s.es.Read(ctx, queueRequests, int64(endPosition), 1000) // read 1000 from the top each loop.
if err != nil && !errors.Is(err, es.ErrNotFound) {
span.RecordError(err)
return "", err
}
if len(events) == 0 {
break
}
endPosition = events.Last().EventMeta().ActualPosition
for _, event := range events {
switch e := event.(type) {
case *RequestSubmitted:
if e.eventMeta.ActualPosition < last-30 || e.Created().Before(now.Add(-24*time.Hour)) {
streamIDs = append(streamIDs, aggRequest(e.RequestID()))
} else {
endRequestID = e.RequestID()
endPosition = e.eventMeta.ActualPosition
break end
}
}
}
}
// truncate all reqs to found end position
// fmt.Println("TRUNC", queueRequests, int64(endPosition), last)
span.AddEvent(fmt.Sprint("TRUNC", queueRequests, int64(endPosition), last))
err = s.es.Truncate(ctx, queueRequests, int64(endPosition))
if err != nil {
return "", err
}
// truncate all the request streams
for _, streamID := range streamIDs {
last, err := s.es.LastIndex(ctx, streamID)
if err != nil {
return "", err
}
fmt.Println("TRUNC", streamID, last)
span.AddEvent(fmt.Sprint("TRUNC", streamID, last))
err = s.es.Truncate(ctx, streamID, int64(last))
if err != nil {
return "", err
}
}
return endRequestID, nil
}
func (s *service) cleanResults(ctx context.Context, endRequestID string) error {
ctx, span := lg.Span(ctx)
defer span.End()
var endPosition uint64
done := false
for !done {
events, err := s.es.Read(ctx, queueResults, int64(endPosition), 1000) // read 30 from the top each loop.
if err != nil {
return err
}
if len(events) == 0 {
done = true
continue
}
endPosition = events.Last().EventMeta().ActualPosition
for _, event := range events {
switch e := event.(type) {
case *ResultSubmitted:
if e.RequestID == endRequestID {
done = true
endPosition = e.eventMeta.ActualPosition
}
}
}
}
// truncate all reqs to found end position
// fmt.Println("TRUNC", queueResults, int64(endPosition), last)
span.AddEvent(fmt.Sprint("TRUNC", queueResults, int64(endPosition)))
err := s.es.Truncate(ctx, queueResults, int64(endPosition))
if err != nil {
return err
}
return nil
} }
func filter(requests, responses event.Events) *RequestSubmitted { func filter(peer *Peer, requests, responses event.Events) *RequestSubmitted {
have := make(map[string]struct{}, len(responses)) have := make(map[string]struct{}, len(responses))
for _, res := range toList[ResultSubmitted](responses...) { for _, res := range toList[ResultSubmitted](responses...) {
have[res.RequestID] = struct{}{} have[res.RequestID] = struct{}{}
} }
for _, req := range reverse(toList[RequestSubmitted](requests...)...) { for _, req := range reverse(toList[RequestSubmitted](requests...)...) {
if _, ok := have[req.RequestID()]; !ok { if _, ok := have[req.RequestID()]; !ok {
if !peer.CanSupport(req.RequestIP) {
continue
}
return req return req
} }
} }
@ -345,12 +672,14 @@ func loadTemplates() error {
if tmpl.IsDir() { if tmpl.IsDir() {
continue continue
} }
log.Println(tmpl.Name()) pt := template.New(tmpl.Name())
pt, err := template.ParseFS(files, "pages/"+tmpl.Name(), "layouts/*.tpl") pt.Funcs(funcMap)
pt, err = pt.ParseFS(files, "pages/"+tmpl.Name(), "layouts/*.tpl")
if err != nil { if err != nil {
log.Println(err)
return err return err
} }
templates[tmpl.Name()] = pt templates[tmpl.Name()] = pt
} }
return nil return nil
@ -364,17 +693,144 @@ func Projector(e event.Event) []event.Event {
switch e := e.(type) { switch e := e.(type) {
case *RequestSubmitted: case *RequestSubmitted:
e1 := event.NewPtr(streamID, streamPos) e1 := event.NewPtr(streamID, streamPos)
event.SetStreamID(queueResponses+e.RequestID(), e1) event.SetStreamID(aggRequest(e.RequestID()), e1)
return []event.Event{e1} return []event.Event{e1}
case *ResultSubmitted: case *ResultSubmitted:
e1 := event.NewPtr(streamID, streamPos) e1 := event.NewPtr(streamID, streamPos)
event.SetStreamID(queueResponses+e.RequestID, e1) event.SetStreamID(aggRequest(e.RequestID), e1)
e2 := event.NewPtr(streamID, streamPos) e2 := event.NewPtr(streamID, streamPos)
event.SetStreamID(queuePeers+e.PeerID, e2) event.SetStreamID(aggPeer(e.PeerID), e2)
return []event.Event{e1, e2} return []event.Event{e1, e2}
} }
return nil return nil
} }
type Args struct {
RemoteIP string
Requests []*Request
CountPeers int
}
func requestArgs(r *http.Request) Args {
remoteIP, _, _ := strings.Cut(r.RemoteAddr, ":")
if s := r.Header.Get("X-Forwarded-For"); s != "" {
remoteIP = s
}
return Args{
RemoteIP: remoteIP,
}
}
func renderTo(w io.Writer, name string, args any) (err error) {
defer func() {
if p := recover(); p != nil {
err = fmt.Errorf("panic: %s", p)
}
if err != nil {
fmt.Fprint(w, err)
}
}()
t, ok := templates[name]
if !ok || t == nil {
return fmt.Errorf("missing template")
}
return t.Execute(w, args)
}
type ListResponse []Response
func (lis ListResponse) Len() int {
return len(lis)
}
func (lis ListResponse) Less(i, j int) bool {
return lis[i].Latency < lis[j].Latency
}
func (lis ListResponse) Swap(i, j int) {
lis[i], lis[j] = lis[j], lis[i]
}
func fnOrderByPeer(rq *Request) any {
type peerResult struct {
Name string
Country string
Latency float64
Jitter float64
}
type peer struct {
Name string
Note string
Nick string
Country string
Latency float64
Jitter float64
VPNTypes []string
Results []peerResult
}
peers := make(map[string]peer)
sort.Sort(ListResponse(rq.Responses))
for _, rs := range rq.Responses {
p, ok := peers[rs.Peer.Owner]
if !ok {
p.Country = rs.Peer.Country
p.Name = rs.Peer.Name
p.Nick = rs.Peer.Nick
p.Note = rs.Peer.Note
p.Latency = rs.Latency
p.Jitter = rs.Jitter
p.VPNTypes = rs.Peer.Type
}
p.Results = append(p.Results, peerResult{
Name: rs.Peer.Name,
Country: rs.Peer.Country,
Latency: rs.Latency,
Jitter: rs.Jitter,
})
peers[rs.Peer.Owner] = p
}
return peers
}
func fnCountResponses(rq *Request) int {
count := 0
for _, res := range rq.Responses {
if !res.Unreachable {
count++
}
}
return count
}
var funcMap = map[string]any{
"orderByPeer": fnOrderByPeer,
"countResponses": fnCountResponses,
}
func (s *service) loadResult(ctx context.Context, uuid string) (*Request, error) {
request := &Request{}
request.SetStreamID(aggRequest(uuid))
err := s.es.Load(ctx, request)
if err != nil {
return nil, err
}
return request, s.state.Modify(ctx, func(ctx context.Context, t *state) error {
for i := range request.Responses {
res := &request.Responses[i]
if peer, ok := t.peers[res.PeerID]; ok {
res.Peer = peer
res.Peer.ID = ""
}
}
return nil
})
}

2
go.mod
View File

@ -22,6 +22,8 @@ require (
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
) )
require github.com/tj/go-semver v1.0.0
require ( require (
git.mills.io/prologic/msgbus v0.1.19 // indirect git.mills.io/prologic/msgbus v0.1.19 // indirect
github.com/ScaleFT/sshkeys v0.0.0-20200327173127-6142f742bca5 // indirect github.com/ScaleFT/sshkeys v0.0.0-20200327173127-6142f742bca5 // indirect

4
go.sum
View File

@ -409,6 +409,10 @@ github.com/tidwall/wal v1.1.7 h1:emc1TRjIVsdKKSnpwGBAcsAGg0767SvUk8+ygx7Bb+4=
github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E= github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E=
github.com/timewasted/go-accept-headers v0.0.0-20130320203746-c78f304b1b09 h1:QVxbx5l/0pzciWYOynixQMtUhPYC3YKD6EcUlOsgGqw= github.com/timewasted/go-accept-headers v0.0.0-20130320203746-c78f304b1b09 h1:QVxbx5l/0pzciWYOynixQMtUhPYC3YKD6EcUlOsgGqw=
github.com/timewasted/go-accept-headers v0.0.0-20130320203746-c78f304b1b09/go.mod h1:Uy/Rnv5WKuOO+PuDhuYLEpUiiKIZtss3z519uk67aF0= github.com/timewasted/go-accept-headers v0.0.0-20130320203746-c78f304b1b09/go.mod h1:Uy/Rnv5WKuOO+PuDhuYLEpUiiKIZtss3z519uk67aF0=
github.com/tj/assert v0.0.0-20190920132354-ee03d75cd160 h1:NSWpaDaurcAJY7PkL8Xt0PhZE7qpvbZl5ljd8r6U0bI=
github.com/tj/assert v0.0.0-20190920132354-ee03d75cd160/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0=
github.com/tj/go-semver v1.0.0 h1:vpn6Jmn6Hi3QSmrP1PzYcqScop9IZiGCVOSn18wzu8w=
github.com/tj/go-semver v1.0.0/go.mod h1:YZuwVc013rh7KDV0k6tPbWrFeEHBHcp8amfJL+nHzjM=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8= github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8=

View File

@ -58,25 +58,28 @@ type ComplexityRoot struct {
} }
Event struct { Event struct {
Bytes func(childComplexity int) int Bytes func(childComplexity int) int
Created func(childComplexity int) int Created func(childComplexity int) int
EventID func(childComplexity int) int EventID func(childComplexity int) int
ID func(childComplexity int) int ID func(childComplexity int) int
Linked func(childComplexity int) int Linked func(childComplexity int) int
Meta func(childComplexity int) int Meta func(childComplexity int) int
Type func(childComplexity int) int Position func(childComplexity int) int
Values func(childComplexity int) int StreamID func(childComplexity int) int
Type func(childComplexity int) int
Values func(childComplexity int) int
} }
Meta struct { Meta struct {
Created func(childComplexity int) int ActualPosition func(childComplexity int) int
GetEventID func(childComplexity int) int ActualStreamID func(childComplexity int) int
Position func(childComplexity int) int Created func(childComplexity int) int
StreamID func(childComplexity int) int GetEventID func(childComplexity int) int
} }
Mutation struct { Mutation struct {
CreateSaltyUser func(childComplexity int, nick string, pubkey string) int CreateSaltyUser func(childComplexity int, nick string, pubkey string) int
TruncateStream func(childComplexity int, streamID string, index int64) int
} }
PageInfo struct { PageInfo struct {
@ -118,6 +121,7 @@ type ComplexityRoot struct {
} }
type MutationResolver interface { type MutationResolver interface {
TruncateStream(ctx context.Context, streamID string, index int64) (bool, error)
CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error)
} }
type QueryResolver interface { type QueryResolver interface {
@ -201,6 +205,20 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Event.Meta(childComplexity), true return e.complexity.Event.Meta(childComplexity), true
case "Event.position":
if e.complexity.Event.Position == nil {
break
}
return e.complexity.Event.Position(childComplexity), true
case "Event.streamID":
if e.complexity.Event.StreamID == nil {
break
}
return e.complexity.Event.StreamID(childComplexity), true
case "Event.type": case "Event.type":
if e.complexity.Event.Type == nil { if e.complexity.Event.Type == nil {
break break
@ -215,6 +233,20 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Event.Values(childComplexity), true return e.complexity.Event.Values(childComplexity), true
case "Meta.position":
if e.complexity.Meta.ActualPosition == nil {
break
}
return e.complexity.Meta.ActualPosition(childComplexity), true
case "Meta.streamID":
if e.complexity.Meta.ActualStreamID == nil {
break
}
return e.complexity.Meta.ActualStreamID(childComplexity), true
case "Meta.created": case "Meta.created":
if e.complexity.Meta.Created == nil { if e.complexity.Meta.Created == nil {
break break
@ -229,20 +261,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Meta.GetEventID(childComplexity), true return e.complexity.Meta.GetEventID(childComplexity), true
case "Meta.position":
if e.complexity.Meta.Position == nil {
break
}
return e.complexity.Meta.Position(childComplexity), true
case "Meta.streamID":
if e.complexity.Meta.StreamID == nil {
break
}
return e.complexity.Meta.StreamID(childComplexity), true
case "Mutation.createSaltyUser": case "Mutation.createSaltyUser":
if e.complexity.Mutation.CreateSaltyUser == nil { if e.complexity.Mutation.CreateSaltyUser == nil {
break break
@ -255,6 +273,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.Mutation.CreateSaltyUser(childComplexity, args["nick"].(string), args["pubkey"].(string)), true return e.complexity.Mutation.CreateSaltyUser(childComplexity, args["nick"].(string), args["pubkey"].(string)), true
case "Mutation.truncateStream":
if e.complexity.Mutation.TruncateStream == nil {
break
}
args, err := ec.field_Mutation_truncateStream_args(context.TODO(), rawArgs)
if err != nil {
return 0, false
}
return e.complexity.Mutation.TruncateStream(childComplexity, args["streamID"].(string), args["index"].(int64)), true
case "PageInfo.begin": case "PageInfo.begin":
if e.complexity.PageInfo.Begin == nil { if e.complexity.PageInfo.Begin == nil {
break break
@ -502,14 +532,17 @@ var sources = []*ast.Source{
{Name: "../../../pkg/es/es.graphqls", Input: ` {Name: "../../../pkg/es/es.graphqls", Input: `
type Meta @goModel(model: "github.com/sour-is/ev/pkg/es/event.Meta") { type Meta @goModel(model: "github.com/sour-is/ev/pkg/es/event.Meta") {
eventID: String! @goField(name: "getEventID") eventID: String! @goField(name: "getEventID")
streamID: String! streamID: String! @goField(name: "ActualStreamID")
position: Int! @goField(name: "ActualPosition")
created: Time! created: Time!
position: Int!
} }
extend type Query { extend type Query {
events(streamID: String! paging: PageInput): Connection! events(streamID: String! paging: PageInput): Connection!
} }
extend type Mutation {
truncateStream(streamID: String! index:Int!): Boolean!
}
extend type Subscription { extend type Subscription {
"""after == 0 start from begining, after == -1 start from end""" """after == 0 start from begining, after == -1 start from end"""
eventAdded(streamID: String! after: Int! = -1): Event eventAdded(streamID: String! after: Int! = -1): Event
@ -519,6 +552,9 @@ type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEven
id: ID! id: ID!
eventID: String! eventID: String!
streamID: String!
position: Int!
values: Map! values: Map!
bytes: String! bytes: String!
type: String! type: String!
@ -644,6 +680,30 @@ func (ec *executionContext) field_Mutation_createSaltyUser_args(ctx context.Cont
return args, nil return args, nil
} }
func (ec *executionContext) field_Mutation_truncateStream_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) {
var err error
args := map[string]interface{}{}
var arg0 string
if tmp, ok := rawArgs["streamID"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("streamID"))
arg0, err = ec.unmarshalNString2string(ctx, tmp)
if err != nil {
return nil, err
}
}
args["streamID"] = arg0
var arg1 int64
if tmp, ok := rawArgs["index"]; ok {
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("index"))
arg1, err = ec.unmarshalNInt2int64(ctx, tmp)
if err != nil {
return nil, err
}
}
args["index"] = arg1
return args, nil
}
func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) {
var err error var err error
args := map[string]interface{}{} args := map[string]interface{}{}
@ -994,6 +1054,94 @@ func (ec *executionContext) fieldContext_Event_eventID(ctx context.Context, fiel
return fc, nil return fc, nil
} }
func (ec *executionContext) _Event_streamID(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Event_streamID(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.StreamID(), nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(string)
fc.Result = res
return ec.marshalNString2string(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Event_streamID(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Event",
Field: field,
IsMethod: true,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type String does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _Event_position(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Event_position(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.Position(), nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(uint64)
fc.Result = res
return ec.marshalNInt2uint64(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Event_position(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Event",
Field: field,
IsMethod: true,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _Event_values(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { func (ec *executionContext) _Event_values(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Event_values(ctx, field) fc, err := ec.fieldContext_Event_values(ctx, field)
if err != nil { if err != nil {
@ -1213,10 +1361,10 @@ func (ec *executionContext) fieldContext_Event_meta(ctx context.Context, field g
return ec.fieldContext_Meta_eventID(ctx, field) return ec.fieldContext_Meta_eventID(ctx, field)
case "streamID": case "streamID":
return ec.fieldContext_Meta_streamID(ctx, field) return ec.fieldContext_Meta_streamID(ctx, field)
case "created":
return ec.fieldContext_Meta_created(ctx, field)
case "position": case "position":
return ec.fieldContext_Meta_position(ctx, field) return ec.fieldContext_Meta_position(ctx, field)
case "created":
return ec.fieldContext_Meta_created(ctx, field)
} }
return nil, fmt.Errorf("no field named %q was found under type Meta", field.Name) return nil, fmt.Errorf("no field named %q was found under type Meta", field.Name)
}, },
@ -1264,6 +1412,10 @@ func (ec *executionContext) fieldContext_Event_linked(ctx context.Context, field
return ec.fieldContext_Event_id(ctx, field) return ec.fieldContext_Event_id(ctx, field)
case "eventID": case "eventID":
return ec.fieldContext_Event_eventID(ctx, field) return ec.fieldContext_Event_eventID(ctx, field)
case "streamID":
return ec.fieldContext_Event_streamID(ctx, field)
case "position":
return ec.fieldContext_Event_position(ctx, field)
case "values": case "values":
return ec.fieldContext_Event_values(ctx, field) return ec.fieldContext_Event_values(ctx, field)
case "bytes": case "bytes":
@ -1341,7 +1493,7 @@ func (ec *executionContext) _Meta_streamID(ctx context.Context, field graphql.Co
}() }()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children ctx = rctx // use context from middleware stack in children
return obj.StreamID, nil return obj.ActualStreamID, nil
}) })
if err != nil { if err != nil {
ec.Error(ctx, err) ec.Error(ctx, err)
@ -1371,6 +1523,50 @@ func (ec *executionContext) fieldContext_Meta_streamID(ctx context.Context, fiel
return fc, nil return fc, nil
} }
func (ec *executionContext) _Meta_position(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Meta_position(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.ActualPosition, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(uint64)
fc.Result = res
return ec.marshalNInt2uint64(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_Meta_position(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Meta",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _Meta_created(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) { func (ec *executionContext) _Meta_created(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Meta_created(ctx, field) fc, err := ec.fieldContext_Meta_created(ctx, field)
if err != nil { if err != nil {
@ -1415,8 +1611,8 @@ func (ec *executionContext) fieldContext_Meta_created(ctx context.Context, field
return fc, nil return fc, nil
} }
func (ec *executionContext) _Meta_position(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) { func (ec *executionContext) _Mutation_truncateStream(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_Meta_position(ctx, field) fc, err := ec.fieldContext_Mutation_truncateStream(ctx, field)
if err != nil { if err != nil {
return graphql.Null return graphql.Null
} }
@ -1429,7 +1625,7 @@ func (ec *executionContext) _Meta_position(ctx context.Context, field graphql.Co
}() }()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children ctx = rctx // use context from middleware stack in children
return obj.Position, nil return ec.resolvers.Mutation().TruncateStream(rctx, fc.Args["streamID"].(string), fc.Args["index"].(int64))
}) })
if err != nil { if err != nil {
ec.Error(ctx, err) ec.Error(ctx, err)
@ -1441,21 +1637,32 @@ func (ec *executionContext) _Meta_position(ctx context.Context, field graphql.Co
} }
return graphql.Null return graphql.Null
} }
res := resTmp.(uint64) res := resTmp.(bool)
fc.Result = res fc.Result = res
return ec.marshalNInt2uint64(ctx, field.Selections, res) return ec.marshalNBoolean2bool(ctx, field.Selections, res)
} }
func (ec *executionContext) fieldContext_Meta_position(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { func (ec *executionContext) fieldContext_Mutation_truncateStream(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{ fc = &graphql.FieldContext{
Object: "Meta", Object: "Mutation",
Field: field, Field: field,
IsMethod: false, IsMethod: true,
IsResolver: false, IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields") return nil, errors.New("field of type Boolean does not have child fields")
}, },
} }
defer func() {
if r := recover(); r != nil {
err = ec.Recover(ctx, r)
ec.Error(ctx, err)
}
}()
ctx = graphql.WithFieldContext(ctx, fc)
if fc.Args, err = ec.field_Mutation_truncateStream_args(ctx, field.ArgumentMap(ec.Variables)); err != nil {
ec.Error(ctx, err)
return
}
return fc, nil return fc, nil
} }
@ -1914,10 +2121,10 @@ func (ec *executionContext) fieldContext_PostEvent_meta(ctx context.Context, fie
return ec.fieldContext_Meta_eventID(ctx, field) return ec.fieldContext_Meta_eventID(ctx, field)
case "streamID": case "streamID":
return ec.fieldContext_Meta_streamID(ctx, field) return ec.fieldContext_Meta_streamID(ctx, field)
case "created":
return ec.fieldContext_Meta_created(ctx, field)
case "position": case "position":
return ec.fieldContext_Meta_position(ctx, field) return ec.fieldContext_Meta_position(ctx, field)
case "created":
return ec.fieldContext_Meta_created(ctx, field)
} }
return nil, fmt.Errorf("no field named %q was found under type Meta", field.Name) return nil, fmt.Errorf("no field named %q was found under type Meta", field.Name)
}, },
@ -2470,6 +2677,10 @@ func (ec *executionContext) fieldContext_Subscription_eventAdded(ctx context.Con
return ec.fieldContext_Event_id(ctx, field) return ec.fieldContext_Event_id(ctx, field)
case "eventID": case "eventID":
return ec.fieldContext_Event_eventID(ctx, field) return ec.fieldContext_Event_eventID(ctx, field)
case "streamID":
return ec.fieldContext_Event_streamID(ctx, field)
case "position":
return ec.fieldContext_Event_position(ctx, field)
case "values": case "values":
return ec.fieldContext_Event_values(ctx, field) return ec.fieldContext_Event_values(ctx, field)
case "bytes": case "bytes":
@ -4526,6 +4737,20 @@ func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, ob
out.Values[i] = ec._Event_eventID(ctx, field, obj) out.Values[i] = ec._Event_eventID(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&invalids, 1)
}
case "streamID":
out.Values[i] = ec._Event_streamID(ctx, field, obj)
if out.Values[i] == graphql.Null {
atomic.AddUint32(&invalids, 1)
}
case "position":
out.Values[i] = ec._Event_position(ctx, field, obj)
if out.Values[i] == graphql.Null { if out.Values[i] == graphql.Null {
atomic.AddUint32(&invalids, 1) atomic.AddUint32(&invalids, 1)
} }
@ -4616,16 +4841,16 @@ func (ec *executionContext) _Meta(ctx context.Context, sel ast.SelectionSet, obj
if out.Values[i] == graphql.Null { if out.Values[i] == graphql.Null {
invalids++ invalids++
} }
case "created": case "position":
out.Values[i] = ec._Meta_created(ctx, field, obj) out.Values[i] = ec._Meta_position(ctx, field, obj)
if out.Values[i] == graphql.Null { if out.Values[i] == graphql.Null {
invalids++ invalids++
} }
case "position": case "created":
out.Values[i] = ec._Meta_position(ctx, field, obj) out.Values[i] = ec._Meta_created(ctx, field, obj)
if out.Values[i] == graphql.Null { if out.Values[i] == graphql.Null {
invalids++ invalids++
@ -4660,6 +4885,15 @@ func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet)
switch field.Name { switch field.Name {
case "__typename": case "__typename":
out.Values[i] = graphql.MarshalString("Mutation") out.Values[i] = graphql.MarshalString("Mutation")
case "truncateStream":
out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) {
return ec._Mutation_truncateStream(ctx, field)
})
if out.Values[i] == graphql.Null {
invalids++
}
case "createSaltyUser": case "createSaltyUser":
out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) { out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) {

36
main.go
View File

@ -7,9 +7,11 @@ import (
"net/url" "net/url"
"os" "os"
"os/signal" "os/signal"
"runtime/debug"
"strings" "strings"
"time" "time"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/multierr" "go.uber.org/multierr"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -18,6 +20,7 @@ import (
"github.com/sour-is/ev/app/peerfinder" "github.com/sour-is/ev/app/peerfinder"
"github.com/sour-is/ev/app/salty" "github.com/sour-is/ev/app/salty"
"github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/internal/lg"
"github.com/sour-is/ev/pkg/cron"
"github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es"
diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store"
memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store"
@ -28,8 +31,6 @@ import (
"github.com/sour-is/ev/pkg/set" "github.com/sour-is/ev/pkg/set"
) )
const AppName string = "sour.is-ev"
func main() { func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
go func() { go func() {
@ -37,7 +38,7 @@ func main() {
defer cancel() defer cancel()
}() }()
ctx, stop := lg.Init(ctx, AppName) ctx, stop := lg.Init(ctx, appName)
defer stop() defer stop()
if err := run(ctx); err != nil && err != http.ErrServerClosed { if err := run(ctx); err != nil && err != http.ErrServerClosed {
@ -47,9 +48,17 @@ func main() {
func run(ctx context.Context) error { func run(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx) g, ctx := errgroup.WithContext(ctx)
cron := cron.New(cron.DefaultGranularity)
{ {
ctx, span := lg.Span(ctx) ctx, span := lg.Span(ctx)
log.Println(appName, version)
span.SetAttributes(
attribute.String("app", appName),
attribute.String("version", version),
)
err := multierr.Combine( err := multierr.Combine(
es.Init(ctx), es.Init(ctx),
event.Init(ctx), event.Init(ctx),
@ -64,13 +73,13 @@ func run(ctx context.Context) error {
es, err := es.Open( es, err := es.Open(
ctx, ctx,
env("EV_DATA", "mem:"), env("EV_DATA", "mem:"),
resolvelinks.New(),
streamer.New(ctx), streamer.New(ctx),
projecter.New( projecter.New(
ctx, ctx,
projecter.DefaultProjection, projecter.DefaultProjection,
peerfinder.Projector, peerfinder.Projector,
), ),
resolvelinks.New(),
) )
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
@ -118,12 +127,16 @@ func run(ctx context.Context) error {
if enable.Has("peers") { if enable.Has("peers") {
span.AddEvent("Enable Peers") span.AddEvent("Enable Peers")
peers, err := peerfinder.New(ctx, es) peers, err := peerfinder.New(ctx, es, env("PEER_STATUS", ""))
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
return err return err
} }
svcs = append(svcs, peers) svcs = append(svcs, peers)
cron.Once(ctx, peers.RefreshJob)
cron.NewJob("0,15,30,45", peers.RefreshJob)
cron.Once(ctx, peers.CleanJob)
cron.NewJob("0 1", peers.CleanJob)
} }
if enable.Has("gql") { if enable.Has("gql") {
@ -162,6 +175,8 @@ func run(ctx context.Context) error {
span.End() span.End()
} }
g.Go(func() error { return cron.Run(ctx) })
if err := g.Wait(); err != nil && err != http.ErrServerClosed { if err := g.Wait(); err != nil && err != http.ErrServerClosed {
return err return err
} }
@ -177,3 +192,14 @@ func env(name, defaultValue string) string {
log.Println("#", name, "=", defaultValue, "(default)") log.Println("#", name, "=", defaultValue, "(default)")
return defaultValue return defaultValue
} }
var appName, version = func() (string, string) {
if info, ok := debug.ReadBuildInfo(); ok {
_, name, _ := strings.Cut(info.Main.Path, "/")
name = strings.Replace(name, "-", ".", -1)
name = strings.Replace(name, "/", "-", -1)
return name, info.Main.Version
}
return "sour.is-ev", "(devel)"
}()