From bbb45c8854c44a6e6022426a061214befa3a4aa2 Mon Sep 17 00:00:00 2001 From: Jon Lundy Date: Sun, 20 Nov 2022 10:28:50 -0700 Subject: [PATCH] feat: updates to peerfinder app --- app/mercury/mercury.graphqls | 8 + app/mercury/service.go | 1 + app/peerfinder/layouts/form.tpl | 21 - app/peerfinder/layouts/main.tpl | 11 +- app/peerfinder/pages/home.tpl | 21 +- app/peerfinder/pages/req.tpl | 52 +++ app/peerfinder/peer.go | 145 ++++++- app/peerfinder/service.go | 534 ++++++++++++++++++++++++-- go.mod | 2 + go.sum | 4 + internal/graph/generated/generated.go | 328 +++++++++++++--- main.go | 36 +- 12 files changed, 1029 insertions(+), 134 deletions(-) create mode 100644 app/mercury/mercury.graphqls create mode 100644 app/mercury/service.go delete mode 100644 app/peerfinder/layouts/form.tpl create mode 100644 app/peerfinder/pages/req.tpl diff --git a/app/mercury/mercury.graphqls b/app/mercury/mercury.graphqls new file mode 100644 index 0000000..d35001c --- /dev/null +++ b/app/mercury/mercury.graphqls @@ -0,0 +1,8 @@ +extend type Query{ + keys(namespace: String!) [String!]! + get(namespace: String! keys: [String!]) [String]! +} + +extend type Mutation{ + set(namespace: String! key: String! value: String): Bool! +} \ No newline at end of file diff --git a/app/mercury/service.go b/app/mercury/service.go new file mode 100644 index 0000000..22da966 --- /dev/null +++ b/app/mercury/service.go @@ -0,0 +1 @@ +package mercury diff --git a/app/peerfinder/layouts/form.tpl b/app/peerfinder/layouts/form.tpl deleted file mode 100644 index 33963e0..0000000 --- a/app/peerfinder/layouts/form.tpl +++ /dev/null @@ -1,21 +0,0 @@ -
-
-
-
-
-
- - - -
\ No newline at end of file diff --git a/app/peerfinder/layouts/main.tpl b/app/peerfinder/layouts/main.tpl index ca52e77..a880a57 100644 --- a/app/peerfinder/layouts/main.tpl +++ b/app/peerfinder/layouts/main.tpl @@ -18,9 +18,11 @@

DN42 PeerFinder

@@ -31,11 +33,6 @@ {{template "content" .}} -
-

JSON Output

-
{$o|json_encode:128|escape}
-
- {{end}} diff --git a/app/peerfinder/pages/home.tpl b/app/peerfinder/pages/home.tpl index 81eb800..e3dfdd4 100644 --- a/app/peerfinder/pages/home.tpl +++ b/app/peerfinder/pages/home.tpl @@ -30,9 +30,9 @@
- + - +
@@ -40,4 +40,21 @@

If you mark your measurement as hidden, it will not be displayed on the page below. Note that the IP addresses of the target will be shown alongside the result.

+ + +
+

Results

+ {{ with $args := . }} + {{range $req := .Requests}} + + {{end}} + {{end}} +
+ {{end}} diff --git a/app/peerfinder/pages/req.tpl b/app/peerfinder/pages/req.tpl new file mode 100644 index 0000000..d67341c --- /dev/null +++ b/app/peerfinder/pages/req.tpl @@ -0,0 +1,52 @@ +{{template "main" .}} + +{{define "meta"}} + +{{end}} + + +{{define "content"}} + {{range .Requests}} +

Results to {{.RequestIP}}

+ + {{with (orderByPeer .)}} + {{range .}} +
+
+ {{.Country}} :: {{.Name}} :: {{.Nick}} +
+ {{printf "%0.3f ms" .Latency}} +
+
+
+ Note: {{.Note}}
+ VPN Types: {{range .VPNTypes}} {{.}} {{end}}
+ IRC: {{.Nick}} +

Other Results

+ + + + + + + + + + + + {{range .Results}} + + + + + + + {{end}} + +
Peer NameCountryLatencyJitter
{{.Name}}{{.Country}}{{printf "%0.3f ms" .Latency}}{{ if eq .Jitter 0.0 }}—{{ else }}{{ printf "%0.3f ms" .Jitter }}{{ end }}
+
+
+ {{end}} + {{end}} + {{end}} +{{end}} \ No newline at end of file diff --git a/app/peerfinder/peer.go b/app/peerfinder/peer.go index 333ca3e..036de1e 100644 --- a/app/peerfinder/peer.go +++ b/app/peerfinder/peer.go @@ -4,43 +4,149 @@ import ( "bytes" "encoding/json" "fmt" + "net" "net/netip" "strconv" + "strings" "time" + "github.com/tj/go-semver" + + "github.com/oklog/ulid/v2" "github.com/sour-is/ev/pkg/es/event" ) -type Request struct{ +type Request struct { event.AggregateRoot - RequestIP string `json:"req_ip"` - Hidden bool `json:"hide,omitempty"` + RequestID string `json:"req_id"` + RequestIP string `json:"req_ip"` + Hidden bool `json:"hide,omitempty"` + Created time.Time `json:"req_created"` Responses []Response `json:"responses"` } var _ event.Aggregate = (*Request)(nil) + func (a *Request) ApplyEvent(lis ...event.Event) { for _, e := range lis { - switch e:=e.(type) { + switch e := e.(type) { case *RequestSubmitted: + a.RequestID = e.eventMeta.EventID.String() a.RequestIP = e.RequestIP a.Hidden = e.Hidden + a.Created = ulid.Time(e.EventMeta().EventID.Time()) case *ResultSubmitted: - a.Responses = append(a.Responses, Response{ - PeerID: e.PeerID, - PeerVersion: e.PeerVersion, - Latency: e.Latency, - }) + a.Responses = append(a.Responses, Response{ + PeerID: e.PeerID, + ScriptVersion: e.PeerVersion, + Latency: e.Latency, + Jitter: e.Jitter, + MinRTT: e.MinRTT, + MaxRTT: e.MaxRTT, + Sent: e.Sent, + Received: e.Received, + Unreachable: e.Unreachable, + Created: ulid.Time(e.EventMeta().EventID.Time()), + }) } } } +type Time time.Time + +func (t *Time) UnmarshalJSON(b []byte) error { + time, err := time.Parse(`"2006-01-02 15:04:05"`, string(b)) + *t = Time(time) + return err +} +func (t *Time) MarshalJSON() ([]byte, error) { + if t == nil { + return nil, nil + } + i := *t + return time.Time(i).MarshalJSON() +} + +type ipFamily string + +const ( + ipFamilyV4 ipFamily = "IPv4" + ipFamilyV6 ipFamily = "IPv6" + ipFamilyBoth ipFamily = "both" + ipFamilyNone ipFamily = "none" +) + +func (t *ipFamily) UnmarshalJSON(b []byte) error { + i, err := strconv.Atoi(strings.Trim(string(b), `"`)) + switch i { + case 1: + *t = ipFamilyV4 + case 2: + *t = ipFamilyV6 + case 3: + *t = ipFamilyBoth + default: + *t = ipFamilyNone + } + return err +} + +type peerType []string + +func (t *peerType) UnmarshalJSON(b []byte) error { + *t = strings.Split(strings.Trim(string(b), `"`), ",") + return nil +} + +type Peer struct { + ID string `json:"peer_id,omitempty"` + Owner string `json:"peer_owner"` + Nick string `json:"peer_nick"` + Name string `json:"peer_name"` + Country string `json:"peer_country"` + Note string `json:"peer_note"` + Family ipFamily `json:"peer_family"` + Type peerType `json:"peer_type"` + Created Time `json:"peer_created"` +} + +func (p *Peer) CanSupport(ip string) bool { + addr := net.ParseIP(ip) + if addr == nil { + return false + } + if !addr.IsGlobalUnicast() { + return false + } + + switch p.Family { + case ipFamilyV4: + return addr.To4() != nil + case ipFamilyV6: + return addr.To16() != nil + case ipFamilyNone: + return false + } + + return true +} + type Response struct { - PeerID string `json:"peer_id"` - PeerVersion string `json:"peer_version"` - Latency float64 `json:"latency,omitempty"` + Peer *Peer `json:"peer"` + PeerID string `json:"-"` + ScriptVersion string `json:"peer_scriptver"` + + Latency float64 `json:"res_latency"` + Jitter float64 `json:"res_jitter,omitempty"` + MaxRTT float64 `json:"res_maxrtt,omitempty"` + MinRTT float64 `json:"res_minrtt,omitempty"` + Sent int `json:"res_sent,omitempty"` + Received int `json:"res_recv,omitempty"` + Unreachable bool `json:"unreachable,omitempty"` + + Created time.Time `json:"res_created"` } type RequestSubmitted struct { @@ -131,6 +237,12 @@ type ResultSubmitted struct { PeerID string `json:"peer_id"` PeerVersion string `json:"peer_version"` Latency float64 `json:"latency,omitempty"` + Jitter float64 `json:"jitter,omitempty"` + MaxRTT float64 `json:"maxrtt,omitempty"` + MinRTT float64 `json:"minrtt,omitempty"` + Sent int `json:"res_sent,omitempty"` + Received int `json:"res_recv,omitempty"` + Unreachable bool `json:"unreachable,omitempty"` } func (r *ResultSubmitted) Created() time.Time { @@ -185,10 +297,17 @@ func (a *Info) MarshalEnviron() ([]byte, error) { return b.Bytes(), nil } -func (a *Info) OnCreate() error { +func (a *Info) OnUpsert() error { if a.StreamVersion() == 0 { event.Raise(a, &VersionChanged{ScriptVersion: initVersion}) } + current, _ := semver.Parse(initVersion) + previous, _ := semver.Parse(a.ScriptVersion) + + if current.Compare(previous) > 0 { + event.Raise(a, &VersionChanged{ScriptVersion: initVersion}) + } + return nil } diff --git a/app/peerfinder/service.go b/app/peerfinder/service.go index fdc086d..b7ddec6 100644 --- a/app/peerfinder/service.go +++ b/app/peerfinder/service.go @@ -4,33 +4,41 @@ import ( "context" "embed" "encoding/json" + "errors" + "fmt" "html/template" "io" "io/fs" "log" "net" "net/http" + "sort" "strconv" "strings" + "time" + "github.com/oklog/ulid" contentnegotiation "gitlab.com/jamietanna/content-negotiation-go" "go.opentelemetry.io/otel/attribute" - "github.com/oklog/ulid" "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/locker" + "github.com/sour-is/ev/pkg/math" + "github.com/sour-is/ev/pkg/set" ) const ( - aggInfo = "pf-info" - queueRequests = "pf-requests" - queueResponses = "pf-request-" - queuePeers = "pf-peer-" - initVersion = "1.1.0" + aggInfo = "pf-info" + queueRequests = "pf-requests" + queueResults = "pf-results" + initVersion = "1.2.1" ) +func aggRequest(id string) string { return "pf-request-" + id } +func aggPeer(id string) string { return "pf-peer-" + id } + var ( //go:embed pages/* layouts/* assets/* files embed.FS @@ -38,33 +46,33 @@ var ( ) type service struct { - es *es.EventStore + es *es.EventStore + statusURL string - State locker.Locked[state] + state *locker.Locked[state] } type state struct { - Version string - Requests []Request + peers map[string]*Peer } -func New(ctx context.Context, es *es.EventStore) (*service, error) { +func New(ctx context.Context, es *es.EventStore, statusURL string) (*service, error) { ctx, span := lg.Span(ctx) defer span.End() + loadTemplates() + if err := event.Register(ctx, &RequestSubmitted{}, &ResultSubmitted{}, &VersionChanged{}); err != nil { span.RecordError(err) return nil, err } - svc := &service{es: es} + svc := &service{es: es, statusURL: statusURL, state: locker.New(&state{peers: make(map[string]*Peer)})} return svc, nil } func (s *service) RegisterHTTP(mux *http.ServeMux) { - loadTemplates() - a, err := fs.Sub(files, "assets") - log.Println(err) + a, _ := fs.Sub(files, "assets") assets := http.StripPrefix("/peers/assets/", http.FileServer(http.FS(a))) mux.Handle("/peers/assets/", lg.Htrace(assets, "peer-assets")) @@ -73,7 +81,7 @@ func (s *service) RegisterHTTP(mux *http.ServeMux) { func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - _, span := lg.Span(ctx) + ctx, span := lg.Span(ctx) defer span.End() r = r.WithContext(ctx) @@ -86,12 +94,20 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return case strings.HasPrefix(r.URL.Path, "/peers/req/"): - s.getResults(w, r, strings.TrimPrefix(r.URL.Path, "/peers/req/")) + s.getResultsForRequest(w, r, strings.TrimPrefix(r.URL.Path, "/peers/req/")) return + case strings.HasPrefix(r.URL.Path, "/peers/status"): + s.state.Modify(r.Context(), func(ctx context.Context, state *state) error { + for id, p := range state.peers { + fmt.Fprintln(w, "PEER: ", id, p.Owner, p.Name) + } + + return nil + }) + default: - t := templates["home.tpl"] - t.Execute(w, nil) + s.getResults(w, r) return } case http.MethodPost: @@ -122,8 +138,23 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string attribute.String("uuid", uuid), ) - info, err := es.Upsert(ctx, s.es, "pf-info", func(ctx context.Context, agg *Info) error { - return agg.OnCreate() // initialize if not exists + var peer *Peer + err := s.state.Modify(ctx, func(ctx context.Context, state *state) error { + var ok bool + if peer, ok = state.peers[uuid]; !ok { + return fmt.Errorf("peer not found: %s", uuid) + } + + return nil + }) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusNotFound) + return + } + + info, err := es.Upsert(ctx, s.es, aggInfo, func(ctx context.Context, agg *Info) error { + return agg.OnUpsert() // initialize if not exists }) if err != nil { span.RecordError(err) @@ -138,14 +169,16 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string return } - responses, err := s.es.Read(ctx, queuePeers+uuid, -1, -30) + responses, err := s.es.Read(ctx, aggPeer(uuid), -1, -30) if err != nil { span.RecordError(err) w.WriteHeader(http.StatusInternalServerError) return } - req := filter(requests, responses) + span.AddEvent(fmt.Sprintf("req = %d, res = %d", len(requests), len(responses))) + + req := filter(peer, requests, responses) negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/plain", "text/html") negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept")) @@ -183,7 +216,41 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string } span.RecordError(err) } -func (s *service) getResults(w http.ResponseWriter, r *http.Request, uuid string) { +func (s *service) getResults(w http.ResponseWriter, r *http.Request) { + ctx, span := lg.Span(r.Context()) + defer span.End() + + events, err := s.es.Read(ctx, queueRequests, -1, -30) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + requests := make([]*Request, len(events)) + for i, req := range events { + if req, ok := req.(*RequestSubmitted); ok { + requests[i], err = s.loadResult(ctx, req.RequestID()) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + } + } + + args := requestArgs(r) + args.Requests = requests + + s.state.Modify(ctx, func(ctx context.Context, state *state) error { + args.CountPeers = len(state.peers) + return nil + }) + + t := templates["home.tpl"] + t.Execute(w, args) +} +func (s *service) getResultsForRequest(w http.ResponseWriter, r *http.Request, uuid string) { ctx, span := lg.Span(r.Context()) defer span.End() @@ -191,23 +258,33 @@ func (s *service) getResults(w http.ResponseWriter, r *http.Request, uuid string attribute.String("uuid", uuid), ) - responses, err := s.es.Read(ctx, queueResponses+uuid, -1, es.AllEvents) + request, err := s.loadResult(ctx, uuid) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/csv", "text/plain", "text/html") - negotiated, _, err := negotiator.Negotiate("application/json") + negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept")) if err != nil { w.WriteHeader(http.StatusNotAcceptable) return } + span.AddEvent(negotiated.String()) switch negotiated.String() { // case "text/environment": // encodeTo(w, responses.MarshalBinary) case "application/json": - json.NewEncoder(w).Encode(responses) + json.NewEncoder(w).Encode(request) + return + default: + args := requestArgs(r) + args.Requests = append(args.Requests, request) + span.AddEvent(fmt.Sprint(args)) + err := renderTo(w, "req.tpl", args) + span.RecordError(err) + + return } } func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { @@ -219,7 +296,14 @@ func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { return } - ip := net.ParseIP(r.Form.Get("req_ip")) + args := requestArgs(r) + requestIP := args.RemoteIP + + if ip := r.Form.Get("req_ip"); ip != "" { + requestIP = ip + } + + ip := net.ParseIP(requestIP) if ip == nil { w.WriteHeader(http.StatusBadRequest) return @@ -236,6 +320,8 @@ func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { ) s.es.Append(ctx, queueRequests, event.NewEvents(req)) + + http.Redirect(w, r, "/peers/req/"+req.RequestID(), http.StatusSeeOther) } func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) { ctx, span := lg.Span(r.Context()) @@ -255,10 +341,26 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) return } + peerID := r.Form.Get("peer_id") + err := s.state.Modify(ctx, func(ctx context.Context, state *state) error { + var ok bool + if _, ok = state.peers[peerID]; !ok { + // fmt.Printf("peer not found: %s\n", peerID) + return fmt.Errorf("peer not found: %s", peerID) + } + + return nil + }) + if err != nil { + span.RecordError(err) + w.WriteHeader(http.StatusNotFound) + return + } + + var unreach bool latency, err := strconv.ParseFloat(r.Form.Get("res_latency"), 64) if err != nil { - w.WriteHeader(http.StatusUnprocessableEntity) - return + unreach = true } req := &ResultSubmitted{ @@ -266,13 +368,35 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) PeerID: r.Form.Get("peer_id"), PeerVersion: r.Form.Get("peer_version"), Latency: latency, + Unreachable: unreach, + } + + if jitter, err := strconv.ParseFloat(r.Form.Get("res_jitter"), 64); err == nil { + req.Jitter = jitter + } else { + span.RecordError(err) + } + if minrtt, err := strconv.ParseFloat(r.Form.Get("res_minrtt"), 64); err == nil { + req.MinRTT = minrtt + } else { + span.RecordError(err) + } + if maxrtt, err := strconv.ParseFloat(r.Form.Get("res_maxrtt"), 64); err == nil { + req.MaxRTT = maxrtt + } else { + span.RecordError(err) } span.SetAttributes( attribute.Stringer("result", req), ) - idx, err := s.es.LastIndex(ctx, queueResponses+id) + s.state.Modify(ctx, func(ctx context.Context, state *state) error { + + return nil + }) + + idx, err := s.es.LastIndex(ctx, aggRequest(id)) if err != nil { w.WriteHeader(http.StatusInternalServerError) return @@ -282,16 +406,219 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) return } - s.es.Append(ctx, queueRequests, event.NewEvents(req)) + s.es.Append(ctx, queueResults, event.NewEvents(req)) +} +func (s *service) RefreshJob(ctx context.Context, _ time.Time) error { + ctx, span := lg.Span(ctx) + defer span.End() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.statusURL, nil) + span.RecordError(err) + if err != nil { + return err + } + req.Header.Set("Accept", "application/json") + + res, err := http.DefaultClient.Do(req) + span.RecordError(err) + if err != nil { + return err + } + + defer res.Body.Close() + var peers []*Peer + err = json.NewDecoder(res.Body).Decode(&peers) + span.RecordError(err) + if err != nil { + return err + } + + span.AddEvent(fmt.Sprintf("processed %d peers", len(peers))) + + err = s.state.Modify(ctx, func(ctx context.Context, t *state) error { + for _, peer := range peers { + t.peers[peer.ID] = peer + } + + return nil + }) + span.RecordError(err) + return err +} +func (s *service) CleanJob(ctx context.Context, now time.Time) error { + ctx, span := lg.Span(ctx) + defer span.End() + + fmt.Println("clear peerfinder requests") + span.AddEvent("clear peerfinder requests") + + endRequestID, err := s.cleanRequests(ctx, now) + if err != nil { + return err + } + if err = s.cleanResults(ctx, endRequestID); err != nil { + return err + } + + return s.cleanPeerJobs(ctx) +} +func (s *service) cleanPeerJobs(ctx context.Context) error { + ctx, span := lg.Span(ctx) + defer span.End() + + peers := set.New[string]() + err := s.state.Modify(ctx, func(ctx context.Context, state *state) error { + for id := range state.peers { + peers.Add(id) + } + return nil + }) + if err != nil { + return err + } + + // trunctate all the peer streams to last 30 + for streamID := range peers { + streamID = aggPeer(streamID) + first, err := s.es.FirstIndex(ctx, streamID) + if err != nil { + return err + } + last, err := s.es.LastIndex(ctx, streamID) + if err != nil { + return err + } + newFirst := math.Max(math.Max(int64(last-30), int64(first)), 1) + if last == 0 || newFirst == int64(first) { + // fmt.Println("SKIP", streamID, first, newFirst, last) + span.AddEvent(fmt.Sprint("SKIP", streamID, first, newFirst, last)) + continue + } + // fmt.Println("TRUNC", streamID, first, newFirst, last) + span.AddEvent(fmt.Sprint("TRUNC", streamID, first, newFirst, last)) + err = s.es.Truncate(ctx, streamID, int64(newFirst)) + if err != nil { + return err + } + } + + return nil +} +func (s *service) cleanRequests(ctx context.Context, now time.Time) (string, error) { + ctx, span := lg.Span(ctx) + defer span.End() + + var streamIDs []string + var endPosition uint64 + var endRequestID string + + last, err := s.es.LastIndex(ctx, queueRequests) + if err != nil { + return "", err + } + +end: + for { + events, err := s.es.Read(ctx, queueRequests, int64(endPosition), 1000) // read 1000 from the top each loop. + if err != nil && !errors.Is(err, es.ErrNotFound) { + span.RecordError(err) + return "", err + } + + if len(events) == 0 { + break + } + + endPosition = events.Last().EventMeta().ActualPosition + for _, event := range events { + switch e := event.(type) { + case *RequestSubmitted: + if e.eventMeta.ActualPosition < last-30 || e.Created().Before(now.Add(-24*time.Hour)) { + streamIDs = append(streamIDs, aggRequest(e.RequestID())) + } else { + endRequestID = e.RequestID() + endPosition = e.eventMeta.ActualPosition + break end + } + } + } + } + + // truncate all reqs to found end position + // fmt.Println("TRUNC", queueRequests, int64(endPosition), last) + span.AddEvent(fmt.Sprint("TRUNC", queueRequests, int64(endPosition), last)) + err = s.es.Truncate(ctx, queueRequests, int64(endPosition)) + if err != nil { + return "", err + } + + // truncate all the request streams + for _, streamID := range streamIDs { + last, err := s.es.LastIndex(ctx, streamID) + if err != nil { + return "", err + } + fmt.Println("TRUNC", streamID, last) + span.AddEvent(fmt.Sprint("TRUNC", streamID, last)) + err = s.es.Truncate(ctx, streamID, int64(last)) + if err != nil { + return "", err + } + } + + return endRequestID, nil +} +func (s *service) cleanResults(ctx context.Context, endRequestID string) error { + ctx, span := lg.Span(ctx) + defer span.End() + + var endPosition uint64 + + done := false + for !done { + events, err := s.es.Read(ctx, queueResults, int64(endPosition), 1000) // read 30 from the top each loop. + if err != nil { + return err + } + + if len(events) == 0 { + done = true + continue + } + + endPosition = events.Last().EventMeta().ActualPosition + + for _, event := range events { + switch e := event.(type) { + case *ResultSubmitted: + if e.RequestID == endRequestID { + done = true + endPosition = e.eventMeta.ActualPosition + } + } + } + } + // truncate all reqs to found end position + // fmt.Println("TRUNC", queueResults, int64(endPosition), last) + span.AddEvent(fmt.Sprint("TRUNC", queueResults, int64(endPosition))) + err := s.es.Truncate(ctx, queueResults, int64(endPosition)) + if err != nil { + return err + } + return nil } -func filter(requests, responses event.Events) *RequestSubmitted { +func filter(peer *Peer, requests, responses event.Events) *RequestSubmitted { have := make(map[string]struct{}, len(responses)) for _, res := range toList[ResultSubmitted](responses...) { have[res.RequestID] = struct{}{} } for _, req := range reverse(toList[RequestSubmitted](requests...)...) { if _, ok := have[req.RequestID()]; !ok { + if !peer.CanSupport(req.RequestIP) { + continue + } + return req } } @@ -345,12 +672,14 @@ func loadTemplates() error { if tmpl.IsDir() { continue } - log.Println(tmpl.Name()) - pt, err := template.ParseFS(files, "pages/"+tmpl.Name(), "layouts/*.tpl") + pt := template.New(tmpl.Name()) + pt.Funcs(funcMap) + pt, err = pt.ParseFS(files, "pages/"+tmpl.Name(), "layouts/*.tpl") if err != nil { + log.Println(err) + return err } - templates[tmpl.Name()] = pt } return nil @@ -364,17 +693,144 @@ func Projector(e event.Event) []event.Event { switch e := e.(type) { case *RequestSubmitted: e1 := event.NewPtr(streamID, streamPos) - event.SetStreamID(queueResponses+e.RequestID(), e1) + event.SetStreamID(aggRequest(e.RequestID()), e1) return []event.Event{e1} case *ResultSubmitted: e1 := event.NewPtr(streamID, streamPos) - event.SetStreamID(queueResponses+e.RequestID, e1) + event.SetStreamID(aggRequest(e.RequestID), e1) e2 := event.NewPtr(streamID, streamPos) - event.SetStreamID(queuePeers+e.PeerID, e2) + event.SetStreamID(aggPeer(e.PeerID), e2) return []event.Event{e1, e2} } return nil } + +type Args struct { + RemoteIP string + Requests []*Request + CountPeers int +} + +func requestArgs(r *http.Request) Args { + remoteIP, _, _ := strings.Cut(r.RemoteAddr, ":") + if s := r.Header.Get("X-Forwarded-For"); s != "" { + remoteIP = s + } + return Args{ + RemoteIP: remoteIP, + } +} + +func renderTo(w io.Writer, name string, args any) (err error) { + defer func() { + if p := recover(); p != nil { + err = fmt.Errorf("panic: %s", p) + } + if err != nil { + fmt.Fprint(w, err) + } + }() + + t, ok := templates[name] + if !ok || t == nil { + return fmt.Errorf("missing template") + } + return t.Execute(w, args) +} + +type ListResponse []Response + +func (lis ListResponse) Len() int { + return len(lis) +} +func (lis ListResponse) Less(i, j int) bool { + return lis[i].Latency < lis[j].Latency +} +func (lis ListResponse) Swap(i, j int) { + lis[i], lis[j] = lis[j], lis[i] +} + +func fnOrderByPeer(rq *Request) any { + type peerResult struct { + Name string + Country string + Latency float64 + Jitter float64 + } + type peer struct { + Name string + Note string + Nick string + Country string + Latency float64 + Jitter float64 + VPNTypes []string + + Results []peerResult + } + + peers := make(map[string]peer) + sort.Sort(ListResponse(rq.Responses)) + for _, rs := range rq.Responses { + p, ok := peers[rs.Peer.Owner] + + if !ok { + p.Country = rs.Peer.Country + p.Name = rs.Peer.Name + p.Nick = rs.Peer.Nick + p.Note = rs.Peer.Note + p.Latency = rs.Latency + p.Jitter = rs.Jitter + p.VPNTypes = rs.Peer.Type + } + + p.Results = append(p.Results, peerResult{ + Name: rs.Peer.Name, + Country: rs.Peer.Country, + Latency: rs.Latency, + Jitter: rs.Jitter, + }) + + peers[rs.Peer.Owner] = p + } + + return peers +} +func fnCountResponses(rq *Request) int { + count := 0 + for _, res := range rq.Responses { + if !res.Unreachable { + count++ + } + } + return count +} + +var funcMap = map[string]any{ + "orderByPeer": fnOrderByPeer, + "countResponses": fnCountResponses, +} + +func (s *service) loadResult(ctx context.Context, uuid string) (*Request, error) { + request := &Request{} + request.SetStreamID(aggRequest(uuid)) + err := s.es.Load(ctx, request) + if err != nil { + return nil, err + } + + return request, s.state.Modify(ctx, func(ctx context.Context, t *state) error { + for i := range request.Responses { + res := &request.Responses[i] + if peer, ok := t.peers[res.PeerID]; ok { + res.Peer = peer + res.Peer.ID = "" + } + } + + return nil + }) +} diff --git a/go.mod b/go.mod index 2314b08..30fc50c 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,8 @@ require ( golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 ) +require github.com/tj/go-semver v1.0.0 + require ( git.mills.io/prologic/msgbus v0.1.19 // indirect github.com/ScaleFT/sshkeys v0.0.0-20200327173127-6142f742bca5 // indirect diff --git a/go.sum b/go.sum index b59e7c2..de07fc2 100644 --- a/go.sum +++ b/go.sum @@ -409,6 +409,10 @@ github.com/tidwall/wal v1.1.7 h1:emc1TRjIVsdKKSnpwGBAcsAGg0767SvUk8+ygx7Bb+4= github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E= github.com/timewasted/go-accept-headers v0.0.0-20130320203746-c78f304b1b09 h1:QVxbx5l/0pzciWYOynixQMtUhPYC3YKD6EcUlOsgGqw= github.com/timewasted/go-accept-headers v0.0.0-20130320203746-c78f304b1b09/go.mod h1:Uy/Rnv5WKuOO+PuDhuYLEpUiiKIZtss3z519uk67aF0= +github.com/tj/assert v0.0.0-20190920132354-ee03d75cd160 h1:NSWpaDaurcAJY7PkL8Xt0PhZE7qpvbZl5ljd8r6U0bI= +github.com/tj/assert v0.0.0-20190920132354-ee03d75cd160/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0= +github.com/tj/go-semver v1.0.0 h1:vpn6Jmn6Hi3QSmrP1PzYcqScop9IZiGCVOSn18wzu8w= +github.com/tj/go-semver v1.0.0/go.mod h1:YZuwVc013rh7KDV0k6tPbWrFeEHBHcp8amfJL+nHzjM= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8= diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index c8d6943..82bd7be 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -58,25 +58,28 @@ type ComplexityRoot struct { } Event struct { - Bytes func(childComplexity int) int - Created func(childComplexity int) int - EventID func(childComplexity int) int - ID func(childComplexity int) int - Linked func(childComplexity int) int - Meta func(childComplexity int) int - Type func(childComplexity int) int - Values func(childComplexity int) int + Bytes func(childComplexity int) int + Created func(childComplexity int) int + EventID func(childComplexity int) int + ID func(childComplexity int) int + Linked func(childComplexity int) int + Meta func(childComplexity int) int + Position func(childComplexity int) int + StreamID func(childComplexity int) int + Type func(childComplexity int) int + Values func(childComplexity int) int } Meta struct { - Created func(childComplexity int) int - GetEventID func(childComplexity int) int - Position func(childComplexity int) int - StreamID func(childComplexity int) int + ActualPosition func(childComplexity int) int + ActualStreamID func(childComplexity int) int + Created func(childComplexity int) int + GetEventID func(childComplexity int) int } Mutation struct { CreateSaltyUser func(childComplexity int, nick string, pubkey string) int + TruncateStream func(childComplexity int, streamID string, index int64) int } PageInfo struct { @@ -118,6 +121,7 @@ type ComplexityRoot struct { } type MutationResolver interface { + TruncateStream(ctx context.Context, streamID string, index int64) (bool, error) CreateSaltyUser(ctx context.Context, nick string, pubkey string) (*salty.SaltyUser, error) } type QueryResolver interface { @@ -201,6 +205,20 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Event.Meta(childComplexity), true + case "Event.position": + if e.complexity.Event.Position == nil { + break + } + + return e.complexity.Event.Position(childComplexity), true + + case "Event.streamID": + if e.complexity.Event.StreamID == nil { + break + } + + return e.complexity.Event.StreamID(childComplexity), true + case "Event.type": if e.complexity.Event.Type == nil { break @@ -215,6 +233,20 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Event.Values(childComplexity), true + case "Meta.position": + if e.complexity.Meta.ActualPosition == nil { + break + } + + return e.complexity.Meta.ActualPosition(childComplexity), true + + case "Meta.streamID": + if e.complexity.Meta.ActualStreamID == nil { + break + } + + return e.complexity.Meta.ActualStreamID(childComplexity), true + case "Meta.created": if e.complexity.Meta.Created == nil { break @@ -229,20 +261,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Meta.GetEventID(childComplexity), true - case "Meta.position": - if e.complexity.Meta.Position == nil { - break - } - - return e.complexity.Meta.Position(childComplexity), true - - case "Meta.streamID": - if e.complexity.Meta.StreamID == nil { - break - } - - return e.complexity.Meta.StreamID(childComplexity), true - case "Mutation.createSaltyUser": if e.complexity.Mutation.CreateSaltyUser == nil { break @@ -255,6 +273,18 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Mutation.CreateSaltyUser(childComplexity, args["nick"].(string), args["pubkey"].(string)), true + case "Mutation.truncateStream": + if e.complexity.Mutation.TruncateStream == nil { + break + } + + args, err := ec.field_Mutation_truncateStream_args(context.TODO(), rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Mutation.TruncateStream(childComplexity, args["streamID"].(string), args["index"].(int64)), true + case "PageInfo.begin": if e.complexity.PageInfo.Begin == nil { break @@ -502,14 +532,17 @@ var sources = []*ast.Source{ {Name: "../../../pkg/es/es.graphqls", Input: ` type Meta @goModel(model: "github.com/sour-is/ev/pkg/es/event.Meta") { eventID: String! @goField(name: "getEventID") - streamID: String! + streamID: String! @goField(name: "ActualStreamID") + position: Int! @goField(name: "ActualPosition") created: Time! - position: Int! } extend type Query { events(streamID: String! paging: PageInput): Connection! } +extend type Mutation { + truncateStream(streamID: String! index:Int!): Boolean! +} extend type Subscription { """after == 0 start from begining, after == -1 start from end""" eventAdded(streamID: String! after: Int! = -1): Event @@ -519,6 +552,9 @@ type Event implements Edge @goModel(model: "github.com/sour-is/ev/pkg/es.GQLEven id: ID! eventID: String! + streamID: String! + position: Int! + values: Map! bytes: String! type: String! @@ -644,6 +680,30 @@ func (ec *executionContext) field_Mutation_createSaltyUser_args(ctx context.Cont return args, nil } +func (ec *executionContext) field_Mutation_truncateStream_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 string + if tmp, ok := rawArgs["streamID"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("streamID")) + arg0, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["streamID"] = arg0 + var arg1 int64 + if tmp, ok := rawArgs["index"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("index")) + arg1, err = ec.unmarshalNInt2int64(ctx, tmp) + if err != nil { + return nil, err + } + } + args["index"] = arg1 + return args, nil +} + func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -994,6 +1054,94 @@ func (ec *executionContext) fieldContext_Event_eventID(ctx context.Context, fiel return fc, nil } +func (ec *executionContext) _Event_streamID(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_streamID(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.StreamID(), nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_streamID(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Event_position(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Event_position(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Position(), nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(uint64) + fc.Result = res + return ec.marshalNInt2uint64(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Event_position(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Event", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _Event_values(ctx context.Context, field graphql.CollectedField, obj *es.GQLEvent) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Event_values(ctx, field) if err != nil { @@ -1213,10 +1361,10 @@ func (ec *executionContext) fieldContext_Event_meta(ctx context.Context, field g return ec.fieldContext_Meta_eventID(ctx, field) case "streamID": return ec.fieldContext_Meta_streamID(ctx, field) - case "created": - return ec.fieldContext_Meta_created(ctx, field) case "position": return ec.fieldContext_Meta_position(ctx, field) + case "created": + return ec.fieldContext_Meta_created(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type Meta", field.Name) }, @@ -1264,6 +1412,10 @@ func (ec *executionContext) fieldContext_Event_linked(ctx context.Context, field return ec.fieldContext_Event_id(ctx, field) case "eventID": return ec.fieldContext_Event_eventID(ctx, field) + case "streamID": + return ec.fieldContext_Event_streamID(ctx, field) + case "position": + return ec.fieldContext_Event_position(ctx, field) case "values": return ec.fieldContext_Event_values(ctx, field) case "bytes": @@ -1341,7 +1493,7 @@ func (ec *executionContext) _Meta_streamID(ctx context.Context, field graphql.Co }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.StreamID, nil + return obj.ActualStreamID, nil }) if err != nil { ec.Error(ctx, err) @@ -1371,6 +1523,50 @@ func (ec *executionContext) fieldContext_Meta_streamID(ctx context.Context, fiel return fc, nil } +func (ec *executionContext) _Meta_position(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Meta_position(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ActualPosition, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(uint64) + fc.Result = res + return ec.marshalNInt2uint64(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Meta_position(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Meta", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _Meta_created(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Meta_created(ctx, field) if err != nil { @@ -1415,8 +1611,8 @@ func (ec *executionContext) fieldContext_Meta_created(ctx context.Context, field return fc, nil } -func (ec *executionContext) _Meta_position(ctx context.Context, field graphql.CollectedField, obj *event.Meta) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Meta_position(ctx, field) +func (ec *executionContext) _Mutation_truncateStream(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Mutation_truncateStream(ctx, field) if err != nil { return graphql.Null } @@ -1429,7 +1625,7 @@ func (ec *executionContext) _Meta_position(ctx context.Context, field graphql.Co }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.Position, nil + return ec.resolvers.Mutation().TruncateStream(rctx, fc.Args["streamID"].(string), fc.Args["index"].(int64)) }) if err != nil { ec.Error(ctx, err) @@ -1441,21 +1637,32 @@ func (ec *executionContext) _Meta_position(ctx context.Context, field graphql.Co } return graphql.Null } - res := resTmp.(uint64) + res := resTmp.(bool) fc.Result = res - return ec.marshalNInt2uint64(ctx, field.Selections, res) + return ec.marshalNBoolean2bool(ctx, field.Selections, res) } -func (ec *executionContext) fieldContext_Meta_position(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_Mutation_truncateStream(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ - Object: "Meta", + Object: "Mutation", Field: field, - IsMethod: false, - IsResolver: false, + IsMethod: true, + IsResolver: true, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type Int does not have child fields") + return nil, errors.New("field of type Boolean does not have child fields") }, } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Mutation_truncateStream_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return + } return fc, nil } @@ -1914,10 +2121,10 @@ func (ec *executionContext) fieldContext_PostEvent_meta(ctx context.Context, fie return ec.fieldContext_Meta_eventID(ctx, field) case "streamID": return ec.fieldContext_Meta_streamID(ctx, field) - case "created": - return ec.fieldContext_Meta_created(ctx, field) case "position": return ec.fieldContext_Meta_position(ctx, field) + case "created": + return ec.fieldContext_Meta_created(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type Meta", field.Name) }, @@ -2470,6 +2677,10 @@ func (ec *executionContext) fieldContext_Subscription_eventAdded(ctx context.Con return ec.fieldContext_Event_id(ctx, field) case "eventID": return ec.fieldContext_Event_eventID(ctx, field) + case "streamID": + return ec.fieldContext_Event_streamID(ctx, field) + case "position": + return ec.fieldContext_Event_position(ctx, field) case "values": return ec.fieldContext_Event_values(ctx, field) case "bytes": @@ -4526,6 +4737,20 @@ func (ec *executionContext) _Event(ctx context.Context, sel ast.SelectionSet, ob out.Values[i] = ec._Event_eventID(ctx, field, obj) + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + case "streamID": + + out.Values[i] = ec._Event_streamID(ctx, field, obj) + + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + case "position": + + out.Values[i] = ec._Event_position(ctx, field, obj) + if out.Values[i] == graphql.Null { atomic.AddUint32(&invalids, 1) } @@ -4616,16 +4841,16 @@ func (ec *executionContext) _Meta(ctx context.Context, sel ast.SelectionSet, obj if out.Values[i] == graphql.Null { invalids++ } - case "created": + case "position": - out.Values[i] = ec._Meta_created(ctx, field, obj) + out.Values[i] = ec._Meta_position(ctx, field, obj) if out.Values[i] == graphql.Null { invalids++ } - case "position": + case "created": - out.Values[i] = ec._Meta_position(ctx, field, obj) + out.Values[i] = ec._Meta_created(ctx, field, obj) if out.Values[i] == graphql.Null { invalids++ @@ -4660,6 +4885,15 @@ func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet) switch field.Name { case "__typename": out.Values[i] = graphql.MarshalString("Mutation") + case "truncateStream": + + out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) { + return ec._Mutation_truncateStream(ctx, field) + }) + + if out.Values[i] == graphql.Null { + invalids++ + } case "createSaltyUser": out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) { diff --git a/main.go b/main.go index 9893f57..8b876c9 100644 --- a/main.go +++ b/main.go @@ -7,9 +7,11 @@ import ( "net/url" "os" "os/signal" + "runtime/debug" "strings" "time" + "go.opentelemetry.io/otel/attribute" "go.uber.org/multierr" "golang.org/x/sync/errgroup" @@ -18,6 +20,7 @@ import ( "github.com/sour-is/ev/app/peerfinder" "github.com/sour-is/ev/app/salty" "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/cron" "github.com/sour-is/ev/pkg/es" diskstore "github.com/sour-is/ev/pkg/es/driver/disk-store" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" @@ -28,8 +31,6 @@ import ( "github.com/sour-is/ev/pkg/set" ) -const AppName string = "sour.is-ev" - func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) go func() { @@ -37,7 +38,7 @@ func main() { defer cancel() }() - ctx, stop := lg.Init(ctx, AppName) + ctx, stop := lg.Init(ctx, appName) defer stop() if err := run(ctx); err != nil && err != http.ErrServerClosed { @@ -47,9 +48,17 @@ func main() { func run(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) + cron := cron.New(cron.DefaultGranularity) + { ctx, span := lg.Span(ctx) + log.Println(appName, version) + span.SetAttributes( + attribute.String("app", appName), + attribute.String("version", version), + ) + err := multierr.Combine( es.Init(ctx), event.Init(ctx), @@ -64,13 +73,13 @@ func run(ctx context.Context) error { es, err := es.Open( ctx, env("EV_DATA", "mem:"), + resolvelinks.New(), streamer.New(ctx), projecter.New( ctx, projecter.DefaultProjection, peerfinder.Projector, ), - resolvelinks.New(), ) if err != nil { span.RecordError(err) @@ -118,12 +127,16 @@ func run(ctx context.Context) error { if enable.Has("peers") { span.AddEvent("Enable Peers") - peers, err := peerfinder.New(ctx, es) + peers, err := peerfinder.New(ctx, es, env("PEER_STATUS", "")) if err != nil { span.RecordError(err) return err } svcs = append(svcs, peers) + cron.Once(ctx, peers.RefreshJob) + cron.NewJob("0,15,30,45", peers.RefreshJob) + cron.Once(ctx, peers.CleanJob) + cron.NewJob("0 1", peers.CleanJob) } if enable.Has("gql") { @@ -162,6 +175,8 @@ func run(ctx context.Context) error { span.End() } + g.Go(func() error { return cron.Run(ctx) }) + if err := g.Wait(); err != nil && err != http.ErrServerClosed { return err } @@ -177,3 +192,14 @@ func env(name, defaultValue string) string { log.Println("#", name, "=", defaultValue, "(default)") return defaultValue } + +var appName, version = func() (string, string) { + if info, ok := debug.ReadBuildInfo(); ok { + _, name, _ := strings.Cut(info.Main.Path, "/") + name = strings.Replace(name, "-", ".", -1) + name = strings.Replace(name, "/", "-", -1) + return name, info.Main.Version + } + + return "sour.is-ev", "(devel)" +}()