From 250395d6b38d75b6d30661dabcab279b0c2527f0 Mon Sep 17 00:00:00 2001 From: Jon Lundy Date: Mon, 19 Dec 2022 10:50:38 -0700 Subject: [PATCH] fix: request cleanup jobs --- app/peerfinder/assets/peerfinder.css | 18 ++ app/peerfinder/ev-peer.go | 28 +- app/peerfinder/ev-request.go | 75 ++++- app/peerfinder/http.go | 299 +++++++++++------- app/peerfinder/jobs.go | 129 ++++---- .../layouts/{main.tpl => main.go.tpl} | 0 .../pages/{home.tpl => home.go.tpl} | 11 +- app/peerfinder/pages/{req.tpl => req.go.tpl} | 2 +- app/peerfinder/service.go | 16 +- main.go | 7 +- pkg/es/driver/driver.go | 1 + pkg/es/driver/projecter/projecter.go | 17 +- pkg/es/driver/resolve-links/resolve-links.go | 75 +++-- pkg/es/es.go | 16 +- pkg/es/es_test.go | 53 +++- pkg/es/event/events.go | 28 ++ pkg/set/set.go | 13 +- pkg/set/set_test.go | 14 + 18 files changed, 575 insertions(+), 227 deletions(-) rename app/peerfinder/layouts/{main.tpl => main.go.tpl} (100%) rename app/peerfinder/pages/{home.tpl => home.go.tpl} (77%) rename app/peerfinder/pages/{req.tpl => req.go.tpl} (95%) diff --git a/app/peerfinder/assets/peerfinder.css b/app/peerfinder/assets/peerfinder.css index 2c7b4c4..f6ed2d1 100644 --- a/app/peerfinder/assets/peerfinder.css +++ b/app/peerfinder/assets/peerfinder.css @@ -37,4 +37,22 @@ body { .container-narrow > hr { margin: 30px 0; +} + +.table tbody tr th { + width: 70% +} + +@media (prefers-color-scheme: dark) { + body, .panel-body { + color: white; + background-color: #121212; + } + .table-striped > tbody > tr:nth-of-type(2n+1) { + background-color: darkslategray; + } +} + +@media (prefers-color-scheme: light) { + } \ No newline at end of file diff --git a/app/peerfinder/ev-peer.go b/app/peerfinder/ev-peer.go index a56ca45..c2bd8aa 100644 --- a/app/peerfinder/ev-peer.go +++ b/app/peerfinder/ev-peer.go @@ -5,6 +5,10 @@ import ( "strconv" "strings" "time" + + "github.com/keys-pub/keys/json" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/set" ) type Time time.Time @@ -49,7 +53,9 @@ func (t *ipFamily) UnmarshalJSON(b []byte) error { type peerType []string func (t *peerType) UnmarshalJSON(b []byte) error { - *t = strings.Split(strings.Trim(string(b), `"`), ",") + var bs string + json.Unmarshal(b, &bs) + *t = strings.Split(bs, ",") return nil } @@ -70,7 +76,7 @@ func (p *Peer) CanSupport(ip string) bool { if addr == nil { return false } - if !addr.IsGlobalUnicast() { + if !(addr.IsGlobalUnicast() || addr.IsLoopback() || addr.IsPrivate()) { return false } @@ -85,3 +91,21 @@ func (p *Peer) CanSupport(ip string) bool { return true } + +type PeerResults struct { + set.Set[string] + event.AggregateRoot +} + +func (p *PeerResults) ApplyEvent(lis ...event.Event) { + for _, e := range lis { + switch e := e.(type) { + case *ResultSubmitted: + if p.Set == nil { + p.Set = set.New[string]() + } + p.Set.Add(e.RequestID) + } + } +} + diff --git a/app/peerfinder/ev-request.go b/app/peerfinder/ev-request.go index 35b77ae..0d5484b 100644 --- a/app/peerfinder/ev-request.go +++ b/app/peerfinder/ev-request.go @@ -10,6 +10,7 @@ import ( "github.com/oklog/ulid" "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/set" ) type Request struct { @@ -19,8 +20,11 @@ type Request struct { RequestIP string `json:"req_ip"` Hidden bool `json:"hide,omitempty"` Created time.Time `json:"req_created"` + Family int `json:"family"` - Responses []Response `json:"responses"` + Responses []*Response `json:"responses"` + peers set.Set[string] + initial *RequestSubmitted } var _ event.Aggregate = (*Request)(nil) @@ -33,8 +37,19 @@ func (a *Request) ApplyEvent(lis ...event.Event) { a.RequestIP = e.RequestIP a.Hidden = e.Hidden a.Created = ulid.Time(e.EventMeta().EventID.Time()) + a.Family = e.Family() + + a.initial = e case *ResultSubmitted: - a.Responses = append(a.Responses, Response{ + if a.peers == nil { + a.peers = set.New[string]() + } + if a.peers.Has(e.PeerID) { + continue + } + + a.peers.Add(e.PeerID) + a.Responses = append(a.Responses, &Response{ PeerID: e.PeerID, ScriptVersion: e.PeerVersion, Latency: e.Latency, @@ -50,6 +65,25 @@ func (a *Request) ApplyEvent(lis ...event.Event) { } } +func (a *Request) MarshalEnviron() ([]byte, error) { + return a.initial.MarshalEnviron() +} +func (a *Request) CreatedString() string { + return a.Created.Format("2006-01-02 15:04:05") +} + +type ListRequest []*Request + +func (lis ListRequest) Len() int { + return len(lis) +} +func (lis ListRequest) Less(i, j int) bool { + return lis[i].Created.Before(lis[j].Created) +} +func (lis ListRequest) Swap(i, j int) { + lis[i], lis[j] = lis[j], lis[i] +} + type Response struct { Peer *Peer `json:"peer"` PeerID string `json:"-"` @@ -66,12 +100,15 @@ type Response struct { Created time.Time `json:"res_created"` } -type ListResponse []Response +type ListResponse []*Response func (lis ListResponse) Len() int { return len(lis) } func (lis ListResponse) Less(i, j int) bool { + if lis[i].Latency == 0.0 { + return false + } return lis[i].Latency < lis[j].Latency } func (lis ListResponse) Swap(i, j int) { @@ -112,6 +149,9 @@ func (r *RequestSubmitted) Family() int { return 2 } } +func (r *RequestSubmitted) String() string { + return fmt.Sprint(r.eventMeta.EventID, r.RequestIP, r.Hidden, r.CreatedString()) +} var _ event.Event = (*RequestSubmitted)(nil) @@ -200,3 +240,32 @@ func (e *ResultSubmitted) UnmarshalBinary(b []byte) error { func (e *ResultSubmitted) String() string { return fmt.Sprintf("id: %s\npeer: %s\nversion: %s\nlatency: %0.4f", e.RequestID, e.PeerID, e.PeerVersion, e.Latency) } + +type RequestTruncated struct { + RequestID string + + eventMeta event.Meta +} + +var _ event.Event = (*RequestTruncated)(nil) + +func (e *RequestTruncated) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *RequestTruncated) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *RequestTruncated) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *RequestTruncated) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} +func (e *RequestTruncated) String() string { + return fmt.Sprintf("request truncated id: %s\n", e.RequestID) +} diff --git a/app/peerfinder/http.go b/app/peerfinder/http.go index 7a67a6f..6868be8 100644 --- a/app/peerfinder/http.go +++ b/app/peerfinder/http.go @@ -15,7 +15,7 @@ import ( "strconv" "strings" - "github.com/oklog/ulid" + "github.com/oklog/ulid/v2" contentnegotiation "gitlab.com/jamietanna/content-negotiation-go" "go.opentelemetry.io/otel/attribute" @@ -58,6 +58,10 @@ func (s *service) RegisterHTTP(mux *http.ServeMux) { mux.Handle("/peers/", lg.Htrace(s, "peers")) } +func (s *service) Setup() error { + return nil +} + // ServeHTTP handle HTTP requests func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -67,6 +71,12 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { r = r.WithContext(ctx) + if !s.up.Load() { + w.WriteHeader(http.StatusFailedDependency) + fmt.Fprint(w, "Starting up...") + return + } + switch r.Method { case http.MethodGet: switch { @@ -81,11 +91,14 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { case strings.HasPrefix(r.URL.Path, "/peers/status"): s.state.Modify(r.Context(), func(ctx context.Context, state *state) error { - for id, p := range state.requests { - fmt.Fprintln(w, "REQ: ", id, p.RequestIP, len(p.Responses)) - for id, r := range p.Responses { + for id, rq := range state.requests { + fmt.Fprintln(w, "REQ: ", id, rq.RequestIP, len(rq.Responses)) + for id, r := range rq.Responses { fmt.Fprintln(w, " RES: ", id, r.PeerID[24:], r.Latency, r.Jitter) } + for p := range rq.peers { + fmt.Fprintln(w, " PEER: ", p[24:]) + } } return nil @@ -114,19 +127,19 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } } -func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string) { +func (s *service) getPending(w http.ResponseWriter, r *http.Request, peerID string) { ctx, span := lg.Span(r.Context()) defer span.End() span.SetAttributes( - attribute.String("uuid", uuid), + attribute.String("peerID", peerID), ) var peer *Peer err := s.state.Modify(ctx, func(ctx context.Context, state *state) error { var ok bool - if peer, ok = state.peers[uuid]; !ok { - return fmt.Errorf("peer not found: %s", uuid) + if peer, ok = state.peers[peerID]; !ok { + return fmt.Errorf("peer not found: %s", peerID) } return nil @@ -153,16 +166,30 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string return } - responses, err := s.es.Read(ctx, aggPeer(uuid), -1, -30) + peerResults := &PeerResults{} + peerResults.SetStreamID(aggPeer(peerID)) + err = s.es.Load(ctx, peerResults) if err != nil { - span.RecordError(err) - w.WriteHeader(http.StatusInternalServerError) - return + span.RecordError(fmt.Errorf("peer not found: %w", err)) + w.WriteHeader(http.StatusNotFound) } - span.AddEvent(fmt.Sprintf("req = %d, res = %d", len(requests), len(responses))) + var req *Request + for _, e := range requests { + r := &Request{} + r.ApplyEvent(e) - req := filter(peer, requests, responses) + if !peerResults.Has(r.RequestID) { + if !peer.CanSupport(r.RequestIP) { + continue + } + req = r + } + } + if req == nil { + span.RecordError(fmt.Errorf("request not found")) + w.WriteHeader(http.StatusNoContent) + } negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/plain", "text/html") negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept")) @@ -190,9 +217,9 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string Created string `json:"req_created"` }{ info.ScriptVersion, - req.RequestID(), + req.RequestID, req.RequestIP, - strconv.Itoa(req.Family()), + strconv.Itoa(req.Family), req.CreatedString(), } } @@ -204,34 +231,53 @@ func (s *service) getResults(w http.ResponseWriter, r *http.Request) { ctx, span := lg.Span(r.Context()) defer span.End() - events, err := s.es.Read(ctx, queueRequests, -1, -30) - if err != nil { - span.RecordError(err) - w.WriteHeader(http.StatusInternalServerError) - return - } + // events, err := s.es.Read(ctx, queueRequests, -1, -30) + // if err != nil { + // span.RecordError(err) + // w.WriteHeader(http.StatusInternalServerError) + // return + // } - requests := make([]*Request, len(events)) - for i, req := range events { - if req, ok := req.(*RequestSubmitted); ok { - requests[i], err = s.loadResult(ctx, req.RequestID()) - if err != nil { - span.RecordError(err) - w.WriteHeader(http.StatusInternalServerError) - return + // requests := make([]*Request, len(events)) + // for i, req := range events { + // if req, ok := req.(*RequestSubmitted); ok { + // requests[i], err = s.loadResult(ctx, req.RequestID()) + // if err != nil { + // span.RecordError(err) + // w.WriteHeader(http.StatusInternalServerError) + // return + // } + // } + // } + + var requests ListRequest + s.state.Modify(ctx, func(ctx context.Context, state *state) error { + requests = make([]*Request, 0, len(state.requests)) + + for _, req := range state.requests { + if req.RequestID == "" { + continue } + if req.Hidden { + continue + } + + requests = append(requests, req) } - } + + return nil + }) + sort.Sort(sort.Reverse(requests)) args := requestArgs(r) - args.Requests = requests + args.Requests = requests[:maxResults] s.state.Modify(ctx, func(ctx context.Context, state *state) error { args.CountPeers = len(state.peers) return nil }) - t := templates["home.tpl"] + t := templates["home.go.tpl"] t.Execute(w, args) } func (s *service) getResultsForRequest(w http.ResponseWriter, r *http.Request, uuid string) { @@ -242,7 +288,20 @@ func (s *service) getResultsForRequest(w http.ResponseWriter, r *http.Request, u attribute.String("uuid", uuid), ) - request, err := s.loadResult(ctx, uuid) + var request *Request + err := s.state.Modify(ctx, func(ctx context.Context, state *state) error { + request = state.requests[uuid] + + return nil + }) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + + request, err = s.loadResult(ctx, request) + + // request, err := s.loadResult(ctx, uuid) if err != nil { w.WriteHeader(http.StatusInternalServerError) return @@ -265,7 +324,7 @@ func (s *service) getResultsForRequest(w http.ResponseWriter, r *http.Request, u args := requestArgs(r) args.Requests = append(args.Requests, request) span.AddEvent(fmt.Sprint(args)) - err := renderTo(w, "req.tpl", args) + err := renderTo(w, "req.go.tpl", args) span.RecordError(err) return @@ -295,7 +354,7 @@ func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { req := &RequestSubmitted{ RequestIP: ip.String(), } - if hidden, err := strconv.ParseBool(r.Form.Get("req_hidden")); err != nil { + if hidden, err := strconv.ParseBool(r.Form.Get("req_hidden")); err == nil { req.Hidden = hidden } @@ -307,15 +366,15 @@ func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/peers/req/"+req.RequestID(), http.StatusSeeOther) } -func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) { +func (s *service) postResult(w http.ResponseWriter, r *http.Request, reqID string) { ctx, span := lg.Span(r.Context()) defer span.End() span.SetAttributes( - attribute.String("id", id), + attribute.String("id", reqID), ) - if _, err := ulid.ParseStrict(id); err != nil { + if _, err := ulid.ParseStrict(reqID); err != nil { w.WriteHeader(http.StatusNotFound) return } @@ -336,10 +395,11 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) ) peerID := r.Form.Get("peer_id") + err := s.state.Modify(ctx, func(ctx context.Context, state *state) error { var ok bool if _, ok = state.peers[peerID]; !ok { - // fmt.Printf("peer not found: %s\n", peerID) + log.Printf("peer not found: %s\n", peerID) return fmt.Errorf("peer not found: %s", peerID) } @@ -351,6 +411,20 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) return } + peerResults := &PeerResults{} + peerResults.SetStreamID(aggPeer(peerID)) + err = s.es.Load(ctx, peerResults) + if err != nil { + span.RecordError(fmt.Errorf("peer not found: %w", err)) + w.WriteHeader(http.StatusNotFound) + } + + if peerResults.Has(reqID) { + span.RecordError(fmt.Errorf("request previously recorded: req=%v peer=%v", reqID, peerID)) + w.WriteHeader(http.StatusAlreadyReported) + return + } + var unreach bool latency, err := strconv.ParseFloat(r.Form.Get("res_latency"), 64) if err != nil { @@ -358,7 +432,7 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) } req := &ResultSubmitted{ - RequestID: id, + RequestID: reqID, PeerID: r.Form.Get("peer_id"), PeerVersion: r.Form.Get("peer_version"), Latency: latency, @@ -385,21 +459,7 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) attribute.Stringer("result", req), ) - s.state.Modify(ctx, func(ctx context.Context, state *state) error { - - return nil - }) - - idx, err := s.es.LastIndex(ctx, aggRequest(id)) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - if idx == 0 { - w.WriteHeader(http.StatusNotFound) - return - } - + log.Printf("record result: %v", req) s.es.Append(ctx, queueResults, event.NewEvents(req)) } @@ -452,7 +512,7 @@ func loadTemplates() error { } pt := template.New(tmpl.Name()) pt.Funcs(funcMap) - pt, err = pt.ParseFS(files, "pages/"+tmpl.Name(), "layouts/*.tpl") + pt, err = pt.ParseFS(files, "pages/"+tmpl.Name(), "layouts/*.go.tpl") if err != nil { log.Println(err) @@ -468,27 +528,44 @@ var funcMap = map[string]any{ "countResponses": fnCountResponses, } -func fnOrderByPeer(rq *Request) any { - type peerResult struct { - Name string - Country string - Latency float64 - Jitter float64 - } - type peer struct { - Name string - Note string - Nick string - Country string - Latency float64 - Jitter float64 - VPNTypes []string +type peerResult struct { + Name string + Country string + Latency float64 + Jitter float64 +} +type peer struct { + Name string + Note string + Nick string + Country string + Latency float64 + Jitter float64 + VPNTypes []string - Results []peerResult + Results []peerResult +} +type listPeer []peer + +func (lis listPeer) Len() int { + return len(lis) +} +func (lis listPeer) Less(i, j int) bool { + if lis[i].Latency == 0.0 { + return false } + return lis[i].Latency < lis[j].Latency +} +func (lis listPeer) Swap(i, j int) { + lis[i], lis[j] = lis[j], lis[i] +} + +func fnOrderByPeer(rq *Request) any { peers := make(map[string]peer) + sort.Sort(ListResponse(rq.Responses)) + for _, rs := range rq.Responses { p, ok := peers[rs.Peer.Owner] @@ -512,7 +589,14 @@ func fnOrderByPeer(rq *Request) any { peers[rs.Peer.Owner] = p } - return peers + peerList := make(listPeer, 0, len(peers)) + for _, v := range peers { + peerList = append(peerList, v) + } + + sort.Sort(peerList) + + return peerList } func fnCountResponses(rq *Request) int { count := 0 @@ -523,37 +607,38 @@ func fnCountResponses(rq *Request) int { } return count } -func filter(peer *Peer, requests, responses event.Events) *RequestSubmitted { - have := make(map[string]struct{}, len(responses)) - for _, res := range toList[ResultSubmitted](responses...) { - have[res.RequestID] = struct{}{} - } - for _, req := range reverse(toList[RequestSubmitted](requests...)...) { - if _, ok := have[req.RequestID()]; !ok { - if !peer.CanSupport(req.RequestIP) { - continue - } - return req - } - } - return nil -} -func toList[E any, T es.PE[E]](lis ...event.Event) []T { - newLis := make([]T, 0, len(lis)) - for i := range lis { - if e, ok := lis[i].(T); ok { - newLis = append(newLis, e) - } - } - return newLis -} -func reverse[T any](s ...T) []T { - first, last := 0, len(s)-1 - for first < last { - s[first], s[last] = s[last], s[first] - first++ - last-- - } - return s -} +// func filter(peer *Peer, requests, responses event.Events) *RequestSubmitted { +// have := make(map[string]struct{}, len(responses)) +// for _, res := range toList[ResultSubmitted](responses...) { +// have[res.RequestID] = struct{}{} +// } +// for _, req := range reverse(toList[RequestSubmitted](requests...)...) { +// if _, ok := have[req.RequestID()]; !ok { +// if !peer.CanSupport(req.RequestIP) { +// continue +// } + +// return req +// } +// } +// return nil +// } +// func toList[E any, T es.PE[E]](lis ...event.Event) []T { +// newLis := make([]T, 0, len(lis)) +// for i := range lis { +// if e, ok := lis[i].(T); ok { +// newLis = append(newLis, e) +// } +// } +// return newLis +// } +// func reverse[T any](s ...T) []T { +// first, last := 0, len(s)-1 +// for first < last { +// s[first], s[last] = s[last], s[first] +// first++ +// last-- +// } +// return s +// } diff --git a/app/peerfinder/jobs.go b/app/peerfinder/jobs.go index bf818f3..a010a3d 100644 --- a/app/peerfinder/jobs.go +++ b/app/peerfinder/jobs.go @@ -5,12 +5,13 @@ import ( "encoding/json" "errors" "fmt" + "log" "net/http" "time" "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" - "github.com/sour-is/ev/pkg/math" + "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/set" ) @@ -40,8 +41,6 @@ func (s *service) RefreshJob(ctx context.Context, _ time.Time) error { return err } - span.AddEvent(fmt.Sprintf("processed %d peers", len(peers))) - err = s.state.Modify(ctx, func(ctx context.Context, t *state) error { for _, peer := range peers { t.peers[peer.ID] = peer @@ -49,10 +48,24 @@ func (s *service) RefreshJob(ctx context.Context, _ time.Time) error { return nil }) + span.RecordError(err) + if err != nil { + return err + } + + log.Printf("processed %d peers", len(peers)) + span.AddEvent(fmt.Sprintf("processed %d peers", len(peers))) + + s.up.Store(true) + + err = s.cleanPeerJobs(ctx) + span.RecordError(err) return err } +const maxResults = 30 + // CleanJob truncates streams old request data func (s *service) CleanJob(ctx context.Context, now time.Time) error { ctx, span := lg.Span(ctx) @@ -60,13 +73,13 @@ func (s *service) CleanJob(ctx context.Context, now time.Time) error { span.AddEvent("clear peerfinder requests") - endRequestID, err := s.cleanRequests(ctx, now) + err := s.cleanRequests(ctx, now) if err != nil { return err } - if err = s.cleanResults(ctx, endRequestID); err != nil { - return err - } + // if err = s.cleanResults(ctx, endRequestID); err != nil { + // return err + // } return s.cleanPeerJobs(ctx) } @@ -96,12 +109,12 @@ func (s *service) cleanPeerJobs(ctx context.Context) error { if err != nil { return err } - newFirst := math.Max(int64(last-30), int64(first)) - if last == 0 || newFirst == int64(first) { - // fmt.Println("SKIP", streamID, first, newFirst, last) - span.AddEvent(fmt.Sprint("SKIP", streamID, first, newFirst, last)) + if last-first < maxResults { + fmt.Println("SKIP", streamID, first, last) continue } + + newFirst := int64(last - 30) // fmt.Println("TRUNC", streamID, first, newFirst, last) span.AddEvent(fmt.Sprint("TRUNC", streamID, first, newFirst, last)) err = s.es.Truncate(ctx, streamID, int64(newFirst)) @@ -112,41 +125,47 @@ func (s *service) cleanPeerJobs(ctx context.Context) error { return nil } -func (s *service) cleanRequests(ctx context.Context, now time.Time) (string, error) { +func (s *service) cleanRequests(ctx context.Context, now time.Time) error { ctx, span := lg.Span(ctx) defer span.End() var streamIDs []string - var endPosition uint64 - var endRequestID string + var startPosition, endPosition int64 + first, err := s.es.FirstIndex(ctx, queueRequests) + if err != nil { + return err + } last, err := s.es.LastIndex(ctx, queueRequests) if err != nil { - return "", err + return err } -end: + if last-first < maxResults { + // fmt.Println("SKIP", queueRequests, first, last) + return nil + } + + startPosition = int64(first - 1) + endPosition = int64(last - maxResults) + for { - events, err := s.es.Read(ctx, queueRequests, int64(endPosition), 1000) // read 1000 from the top each loop. + events, err := s.es.Read(ctx, queueRequests, startPosition, 1000) // read 1000 from the top each loop. if err != nil && !errors.Is(err, es.ErrNotFound) { span.RecordError(err) - return "", err + return err } if len(events) == 0 { break } - endPosition = events.Last().EventMeta().ActualPosition + startPosition = int64(events.Last().EventMeta().ActualPosition) for _, event := range events { switch e := event.(type) { case *RequestSubmitted: - if e.eventMeta.ActualPosition < last-30 { - streamIDs = append(streamIDs, aggRequest(e.RequestID())) - } else { - endRequestID = e.RequestID() - endPosition = e.eventMeta.ActualPosition - break end + if e.eventMeta.ActualPosition < last-maxResults { + streamIDs = append(streamIDs, e.RequestID()) } } } @@ -157,59 +176,39 @@ end: span.AddEvent(fmt.Sprint("TRUNC", queueRequests, int64(endPosition), last)) err = s.es.Truncate(ctx, queueRequests, int64(endPosition)) if err != nil { - return "", err + return err } // truncate all the request streams for _, streamID := range streamIDs { - last, err := s.es.LastIndex(ctx, streamID) - if err != nil { - return "", err - } - // fmt.Println("TRUNC", streamID, last) - span.AddEvent(fmt.Sprint("TRUNC", streamID, last)) - err = s.es.Truncate(ctx, streamID, int64(last)) - if err != nil { - return "", err - } - } + s.state.Modify(ctx, func(ctx context.Context, state *state) error { + return state.ApplyEvents(event.NewEvents(&RequestTruncated{ + RequestID: streamID, + })) + }) - return endRequestID, nil -} -func (s *service) cleanResults(ctx context.Context, endRequestID string) error { - ctx, span := lg.Span(ctx) - defer span.End() - - var endPosition uint64 - - done := false - for !done { - events, err := s.es.Read(ctx, queueResults, int64(endPosition), 1000) // read 30 from the top each loop. + err := s.cleanResult(ctx, streamID) if err != nil { return err } + } - if len(events) == 0 { - done = true - continue - } + return nil +} +func (s *service) cleanResult(ctx context.Context, requestID string) error { + ctx, span := lg.Span(ctx) + defer span.End() - endPosition = events.Last().EventMeta().ActualPosition + streamID := aggRequest(requestID) - for _, event := range events { - switch e := event.(type) { - case *ResultSubmitted: - if e.RequestID == endRequestID { - done = true - endPosition = e.eventMeta.ActualPosition - } - } - } + last, err := s.es.LastIndex(ctx, streamID) + if err != nil { + return err } // truncate all reqs to found end position - // fmt.Println("TRUNC", queueResults, int64(endPosition), last) - span.AddEvent(fmt.Sprint("TRUNC", queueResults, int64(endPosition))) - err := s.es.Truncate(ctx, queueResults, int64(endPosition)) + // fmt.Println("TRUNC", streamID, last) + span.AddEvent(fmt.Sprint("TRUNC", streamID, last)) + err = s.es.Truncate(ctx, streamID, int64(last)) if err != nil { return err } diff --git a/app/peerfinder/layouts/main.tpl b/app/peerfinder/layouts/main.go.tpl similarity index 100% rename from app/peerfinder/layouts/main.tpl rename to app/peerfinder/layouts/main.go.tpl diff --git a/app/peerfinder/pages/home.tpl b/app/peerfinder/pages/home.go.tpl similarity index 77% rename from app/peerfinder/pages/home.tpl rename to app/peerfinder/pages/home.go.tpl index 89f131d..b7072ac 100644 --- a/app/peerfinder/pages/home.tpl +++ b/app/peerfinder/pages/home.go.tpl @@ -32,7 +32,7 @@
- +
@@ -48,10 +48,13 @@ {{range $req := .Requests}}
-
Request ID: {{ $req.RequestID }}
{{end}} {{end}} diff --git a/app/peerfinder/pages/req.tpl b/app/peerfinder/pages/req.go.tpl similarity index 95% rename from app/peerfinder/pages/req.tpl rename to app/peerfinder/pages/req.go.tpl index bc396d0..f3f1289 100644 --- a/app/peerfinder/pages/req.tpl +++ b/app/peerfinder/pages/req.go.tpl @@ -7,7 +7,7 @@ {{define "content"}} {{range .Requests}} -

Results to {{.RequestIP}}

+

Results to {{.RequestIP}}{{if .Hidden}} 👁️{{end}}

{{with (orderByPeer .)}} {{range .}} diff --git a/app/peerfinder/service.go b/app/peerfinder/service.go index 502259e..2c06c97 100644 --- a/app/peerfinder/service.go +++ b/app/peerfinder/service.go @@ -3,6 +3,7 @@ package peerfinder import ( "context" "fmt" + "sync/atomic" "time" "github.com/sour-is/ev/internal/lg" @@ -27,6 +28,7 @@ type service struct { statusURL string state *locker.Locked[state] + up atomic.Bool stop func() } @@ -56,17 +58,15 @@ func New(ctx context.Context, es *es.EventStore, statusURL string) (*service, er return svc, nil } -func (s *service) loadResult(ctx context.Context, uuid string) (*Request, error) { - request := &Request{} - request.SetStreamID(aggRequest(uuid)) - err := s.es.Load(ctx, request) - if err != nil { - return nil, err +func (s *service) loadResult(ctx context.Context, request *Request) (*Request, error) { + if request == nil { + return request, nil } return request, s.state.Modify(ctx, func(ctx context.Context, t *state) error { + for i := range request.Responses { - res := &request.Responses[i] + res := request.Responses[i] if peer, ok := t.peers[res.PeerID]; ok { res.Peer = peer res.Peer.ID = "" @@ -146,6 +146,8 @@ func (s *state) ApplyEvents(events event.Events) error { s.requests[e.RequestID] = &Request{} } s.requests[e.RequestID].ApplyEvent(e) + case *RequestTruncated: + delete(s.requests, e.RequestID) } } diff --git a/main.go b/main.go index 40e0cfe..c749a42 100644 --- a/main.go +++ b/main.go @@ -83,7 +83,6 @@ func run(ctx context.Context) error { projecter.New( ctx, projecter.DefaultProjection, - peerfinder.Projector, ), ) if err != nil { @@ -135,6 +134,8 @@ func run(ctx context.Context) error { if enable.Has("peers") { span.AddEvent("Enable Peers") + es.Option(projecter.New(ctx, peerfinder.Projector)) + peers, err := peerfinder.New(ctx, es, env("PEER_STATUS", "")) if err != nil { span.RecordError(err) @@ -221,6 +222,10 @@ var appName, version = func() (string, string) { return "sour.is-ev", "(devel)" }() +type application interface { + Setup(context.Context) error +} + type stopFns struct { fns []func(context.Context) error } diff --git a/pkg/es/driver/driver.go b/pkg/es/driver/driver.go index 10ac089..8387de6 100644 --- a/pkg/es/driver/driver.go +++ b/pkg/es/driver/driver.go @@ -19,6 +19,7 @@ type EventLog interface { FirstIndex(context.Context) (uint64, error) LastIndex(context.Context) (uint64, error) } + type EventLogWithTruncate interface { Truncate(context.Context, int64) error } diff --git a/pkg/es/driver/projecter/projecter.go b/pkg/es/driver/projecter/projecter.go index ea1ea45..39f62fe 100644 --- a/pkg/es/driver/projecter/projecter.go +++ b/pkg/es/driver/projecter/projecter.go @@ -16,10 +16,22 @@ type projector struct { fns []func(event.Event) []event.Event } -func New(ctx context.Context, fns ...func(event.Event) []event.Event) *projector { +func New(_ context.Context, fns ...func(event.Event) []event.Event) *projector { return &projector{fns: fns} } func (p *projector) Apply(e *es.EventStore) { + + up := e.Driver + for up != nil { + if op, ok := up.(*projector); ok { + op.AddProjections(p.fns...) + p.up = op.up + return + } + + up = es.Unwrap(up) + } + p.up = e.Driver e.Driver = p } @@ -39,6 +51,9 @@ func (s *projector) EventLog(ctx context.Context, streamID string) (driver.Event l, err := s.up.EventLog(ctx, streamID) return &wrapper{l, s}, err } +func (s *projector) AddProjections(fns ...func(event.Event) []event.Event) { + s.fns = append(s.fns, fns...) +} type wrapper struct { up driver.EventLog diff --git a/pkg/es/driver/resolve-links/resolve-links.go b/pkg/es/driver/resolve-links/resolve-links.go index 2c21098..7ec91cc 100644 --- a/pkg/es/driver/resolve-links/resolve-links.go +++ b/pkg/es/driver/resolve-links/resolve-links.go @@ -17,13 +17,16 @@ type resolvelinks struct { func New() *resolvelinks { return &resolvelinks{} } + func (r *resolvelinks) Apply(es *es.EventStore) { r.up = es.Driver es.Driver = r } + func (r *resolvelinks) Unwrap() driver.Driver { return r.up } + func (r *resolvelinks) Open(ctx context.Context, dsn string) (driver.Driver, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -47,6 +50,7 @@ type wrapper struct { func (r *wrapper) Unwrap() driver.EventLog { return r.up } + func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Events, error) { ctx, span := lg.Span(ctx) defer span.End() @@ -56,29 +60,60 @@ func (w *wrapper) Read(ctx context.Context, after int64, count int64) (event.Eve return nil, err } - for i, e := range events { - switch e := e.(type) { - case *event.EventPtr: - d, err := w.resolvelinks.EventLog(ctx, e.StreamID) - if err != nil { - return nil, err - } - lis, err := d.ReadN(ctx, e.Pos) - if err != nil && !errors.Is(err, es.ErrNotFound) { - return nil, err - } - - if ne := lis.First(); ne != event.NilEvent { - meta := ne.EventMeta() - actual := e.EventMeta() - meta.ActualPosition = actual.Position - meta.ActualStreamID = actual.ActualStreamID - ne.SetEventMeta(meta) - events[i] = ne - } + idx := make(map[string][]uint64) + ptrs := make(map[string][]int) + for i := range events { + e := events[i] + if e, ok := e.(*event.EventPtr); ok { + idx[e.StreamID] = append(idx[e.StreamID], e.Pos) + ptrs[e.StreamID] = append(ptrs[e.StreamID], i) } } + for streamID, ids := range idx { + d, err := w.resolvelinks.EventLog(ctx, streamID) + if err != nil { + return nil, err + } + ptr := ptrs[streamID] + lis, err := d.ReadN(ctx, ids...) + if err != nil && !errors.Is(err, es.ErrNotFound) { + return nil, err + } + + for i := range lis { + meta := lis[i].EventMeta() + actual := events[ptr[i]].EventMeta() + meta.ActualPosition = actual.Position + meta.ActualStreamID = actual.ActualStreamID + lis[i].SetEventMeta(meta) + events[i] = lis[i] + } + } + + // for i, e := range events { + // switch e := e.(type) { + // case *event.EventPtr: + // d, err := w.resolvelinks.EventLog(ctx, e.StreamID) + // if err != nil { + // return nil, err + // } + // lis, err := d.ReadN(ctx, e.Pos) + // if err != nil && !errors.Is(err, es.ErrNotFound) { + // return nil, err + // } + + // if ne := lis.First(); ne != event.NilEvent { + // meta := ne.EventMeta() + // actual := e.EventMeta() + // meta.ActualPosition = actual.Position + // meta.ActualStreamID = actual.ActualStreamID + // ne.SetEventMeta(meta) + // events[i] = ne + // } + // } + // } + return events, err } diff --git a/pkg/es/es.go b/pkg/es/es.go index 70204d0..edf5d99 100644 --- a/pkg/es/es.go +++ b/pkg/es/es.go @@ -100,15 +100,19 @@ func Open(ctx context.Context, dsn string, options ...Option) (*EventStore, erro conn, err := d.Open(ctx, dsn) es := &EventStore{Driver: conn} - for _, o := range options { - o.Apply(es) - } + es.Option(options...) Mes_open.Add(ctx, 1) return es, err } +func (es *EventStore) Option(options ...Option) { + for _, o := range options { + o.Apply(es) + } +} + type Option interface { Apply(*EventStore) } @@ -282,10 +286,14 @@ func (es *EventStore) Truncate(ctx context.Context, streamID string, index int64 for up != nil { if up, ok := up.(driver.EventLogWithTruncate); ok { - return up.Truncate(ctx, index) + err = up.Truncate(ctx, index) + if err != nil { + return err + } } up = Unwrap(up) } + return ErrNoDriver } diff --git a/pkg/es/es_test.go b/pkg/es/es_test.go index 7300415..d5bd3ff 100644 --- a/pkg/es/es_test.go +++ b/pkg/es/es_test.go @@ -9,10 +9,13 @@ import ( "time" "github.com/matryer/is" + "go.uber.org/multierr" + "github.com/sour-is/ev/app/peerfinder" "github.com/sour-is/ev/pkg/es" memstore "github.com/sour-is/ev/pkg/es/driver/mem-store" "github.com/sour-is/ev/pkg/es/driver/projecter" + resolvelinks "github.com/sour-is/ev/pkg/es/driver/resolve-links" "github.com/sour-is/ev/pkg/es/driver/streamer" "github.com/sour-is/ev/pkg/es/event" ) @@ -29,9 +32,6 @@ type Thing struct { event.AggregateRoot } -// func (a *Thing) StreamID() string { -// return fmt.Sprintf("thing-%s", a.Name) -// } func (a *Thing) ApplyEvent(lis ...event.Event) { for _, e := range lis { switch e := e.(type) { @@ -78,9 +78,6 @@ func TestES(t *testing.T) { err := event.Register(ctx, &ValueSet{}) is.NoErr(err) - es.Init(ctx) - memstore.Init(ctx) - { store, err := es.Open(ctx, "mem") is.True(errors.Is(err, es.ErrNoDriver)) @@ -138,9 +135,6 @@ func TestESOperations(t *testing.T) { is := is.New(t) ctx := context.Background() - es.Init(ctx) - memstore.Init(ctx) - store, err := es.Open(ctx, "mem:", streamer.New(ctx), projecter.New(ctx)) is.NoErr(err) @@ -187,3 +181,44 @@ func TestUnwrap(t *testing.T) { is.Equal(es.Unwrap(werr), err) is.Equal(es.Unwrap("test"), "") } + +func TestUnwrapProjector(t *testing.T) { + is := is.New(t) + + ctx, stop := context.WithCancel(context.Background()) + defer stop() + + es, err := es.Open( + ctx, + "mem:", + resolvelinks.New(), + streamer.New(ctx), + projecter.New( + ctx, + projecter.DefaultProjection, + peerfinder.Projector, + ), + ) + is.NoErr(err) + + stream := es.EventStream() + is.True(stream != nil) + +} + +func TestMain(m *testing.M) { + ctx, stop := context.WithCancel(context.Background()) + defer stop() + + err := multierr.Combine( + es.Init(ctx), + event.Init(ctx), + memstore.Init(ctx), + ) + if err != nil { + fmt.Println(err) + return + } + + m.Run() +} diff --git a/pkg/es/event/events.go b/pkg/es/event/events.go index 334bfc0..15a99b6 100644 --- a/pkg/es/event/events.go +++ b/pkg/es/event/events.go @@ -45,6 +45,9 @@ func NewEvents(lis ...Event) Events { for i, e := range lis { meta := e.EventMeta() meta.Position = uint64(i) + if meta.ActualPosition == 0 { + meta.ActualPosition = uint64(i) + } meta.EventID = getULID() e.SetEventMeta(meta) } @@ -224,3 +227,28 @@ func (e *EventPtr) Values() any { e.Pos, } } + +type FeedTruncated struct { + eventMeta Meta +} + +// EventMeta implements Event +func (e *FeedTruncated) EventMeta() Meta { + if e == nil { + return Meta{} + } + return e.eventMeta +} + +// SetEventMeta implements Event +func (e *FeedTruncated) SetEventMeta(m Meta) { + if e == nil { + return + } + e.eventMeta = m +} + +func (e *FeedTruncated) Values() any { + return struct { + }{} +} diff --git a/pkg/set/set.go b/pkg/set/set.go index 305cc83..b948140 100644 --- a/pkg/set/set.go +++ b/pkg/set/set.go @@ -62,11 +62,13 @@ type BoundSet[T ordered] struct { } func NewBoundSet[T ordered](min, max T, items ...T) *BoundSet[T] { - return &BoundSet[T]{ + b := &BoundSet[T]{ min: min, max: max, - s: New(items...), + s: New[T](), } + b.Add(items...) + return b } func (l *BoundSet[T]) Add(items ...T) *BoundSet[T] { n := 0 @@ -110,5 +112,10 @@ func (l *BoundSet[T]) String() string { n++ } sort.Strings(lis) - return strings.Join(lis, ",") + + var b strings.Builder + b.WriteString("set(") + b.WriteString(strings.Join(lis, ",")) + b.WriteString(")") + return b.String() } diff --git a/pkg/set/set_test.go b/pkg/set/set_test.go index 8c8ce4b..f126e1d 100644 --- a/pkg/set/set_test.go +++ b/pkg/set/set_test.go @@ -23,3 +23,17 @@ func TestStringSet(t *testing.T) { var n set.Set[string] is.Equal(n.String(), "set()") } + +func TestBoundSet(t *testing.T) { + is := is.New(t) + + s := set.NewBoundSet(1, 100, 1, 2, 3, 100, 1001) + + is.True(s.Has(1)) + is.True(s.Has(2)) + is.True(s.Has(3)) + is.True(!s.Has(1001)) + + is.Equal(set.NewBoundSet(1, 100, 1).String(), "set(1)") + +}