diff --git a/app/peerfinder/assets/peerfinder.css b/app/peerfinder/assets/peerfinder.css index 2c7b4c4..f6ed2d1 100644 --- a/app/peerfinder/assets/peerfinder.css +++ b/app/peerfinder/assets/peerfinder.css @@ -37,4 +37,22 @@ body { .container-narrow > hr { margin: 30px 0; +} + +.table tbody tr th { + width: 70% +} + +@media (prefers-color-scheme: dark) { + body, .panel-body { + color: white; + background-color: #121212; + } + .table-striped > tbody > tr:nth-of-type(2n+1) { + background-color: darkslategray; + } +} + +@media (prefers-color-scheme: light) { + } \ No newline at end of file diff --git a/app/peerfinder/ev-peer.go b/app/peerfinder/ev-peer.go index a56ca45..c2bd8aa 100644 --- a/app/peerfinder/ev-peer.go +++ b/app/peerfinder/ev-peer.go @@ -5,6 +5,10 @@ import ( "strconv" "strings" "time" + + "github.com/keys-pub/keys/json" + "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/set" ) type Time time.Time @@ -49,7 +53,9 @@ func (t *ipFamily) UnmarshalJSON(b []byte) error { type peerType []string func (t *peerType) UnmarshalJSON(b []byte) error { - *t = strings.Split(strings.Trim(string(b), `"`), ",") + var bs string + json.Unmarshal(b, &bs) + *t = strings.Split(bs, ",") return nil } @@ -70,7 +76,7 @@ func (p *Peer) CanSupport(ip string) bool { if addr == nil { return false } - if !addr.IsGlobalUnicast() { + if !(addr.IsGlobalUnicast() || addr.IsLoopback() || addr.IsPrivate()) { return false } @@ -85,3 +91,21 @@ func (p *Peer) CanSupport(ip string) bool { return true } + +type PeerResults struct { + set.Set[string] + event.AggregateRoot +} + +func (p *PeerResults) ApplyEvent(lis ...event.Event) { + for _, e := range lis { + switch e := e.(type) { + case *ResultSubmitted: + if p.Set == nil { + p.Set = set.New[string]() + } + p.Set.Add(e.RequestID) + } + } +} + diff --git a/app/peerfinder/ev-request.go b/app/peerfinder/ev-request.go index 35b77ae..0d5484b 100644 --- a/app/peerfinder/ev-request.go +++ b/app/peerfinder/ev-request.go @@ -10,6 +10,7 @@ import ( "github.com/oklog/ulid" "github.com/sour-is/ev/pkg/es/event" + "github.com/sour-is/ev/pkg/set" ) type Request struct { @@ -19,8 +20,11 @@ type Request struct { RequestIP string `json:"req_ip"` Hidden bool `json:"hide,omitempty"` Created time.Time `json:"req_created"` + Family int `json:"family"` - Responses []Response `json:"responses"` + Responses []*Response `json:"responses"` + peers set.Set[string] + initial *RequestSubmitted } var _ event.Aggregate = (*Request)(nil) @@ -33,8 +37,19 @@ func (a *Request) ApplyEvent(lis ...event.Event) { a.RequestIP = e.RequestIP a.Hidden = e.Hidden a.Created = ulid.Time(e.EventMeta().EventID.Time()) + a.Family = e.Family() + + a.initial = e case *ResultSubmitted: - a.Responses = append(a.Responses, Response{ + if a.peers == nil { + a.peers = set.New[string]() + } + if a.peers.Has(e.PeerID) { + continue + } + + a.peers.Add(e.PeerID) + a.Responses = append(a.Responses, &Response{ PeerID: e.PeerID, ScriptVersion: e.PeerVersion, Latency: e.Latency, @@ -50,6 +65,25 @@ func (a *Request) ApplyEvent(lis ...event.Event) { } } +func (a *Request) MarshalEnviron() ([]byte, error) { + return a.initial.MarshalEnviron() +} +func (a *Request) CreatedString() string { + return a.Created.Format("2006-01-02 15:04:05") +} + +type ListRequest []*Request + +func (lis ListRequest) Len() int { + return len(lis) +} +func (lis ListRequest) Less(i, j int) bool { + return lis[i].Created.Before(lis[j].Created) +} +func (lis ListRequest) Swap(i, j int) { + lis[i], lis[j] = lis[j], lis[i] +} + type Response struct { Peer *Peer `json:"peer"` PeerID string `json:"-"` @@ -66,12 +100,15 @@ type Response struct { Created time.Time `json:"res_created"` } -type ListResponse []Response +type ListResponse []*Response func (lis ListResponse) Len() int { return len(lis) } func (lis ListResponse) Less(i, j int) bool { + if lis[i].Latency == 0.0 { + return false + } return lis[i].Latency < lis[j].Latency } func (lis ListResponse) Swap(i, j int) { @@ -112,6 +149,9 @@ func (r *RequestSubmitted) Family() int { return 2 } } +func (r *RequestSubmitted) String() string { + return fmt.Sprint(r.eventMeta.EventID, r.RequestIP, r.Hidden, r.CreatedString()) +} var _ event.Event = (*RequestSubmitted)(nil) @@ -200,3 +240,32 @@ func (e *ResultSubmitted) UnmarshalBinary(b []byte) error { func (e *ResultSubmitted) String() string { return fmt.Sprintf("id: %s\npeer: %s\nversion: %s\nlatency: %0.4f", e.RequestID, e.PeerID, e.PeerVersion, e.Latency) } + +type RequestTruncated struct { + RequestID string + + eventMeta event.Meta +} + +var _ event.Event = (*RequestTruncated)(nil) + +func (e *RequestTruncated) EventMeta() event.Meta { + if e == nil { + return event.Meta{} + } + return e.eventMeta +} +func (e *RequestTruncated) SetEventMeta(m event.Meta) { + if e != nil { + e.eventMeta = m + } +} +func (e *RequestTruncated) MarshalBinary() (text []byte, err error) { + return json.Marshal(e) +} +func (e *RequestTruncated) UnmarshalBinary(b []byte) error { + return json.Unmarshal(b, e) +} +func (e *RequestTruncated) String() string { + return fmt.Sprintf("request truncated id: %s\n", e.RequestID) +} diff --git a/app/peerfinder/http.go b/app/peerfinder/http.go index 7a67a6f..6868be8 100644 --- a/app/peerfinder/http.go +++ b/app/peerfinder/http.go @@ -15,7 +15,7 @@ import ( "strconv" "strings" - "github.com/oklog/ulid" + "github.com/oklog/ulid/v2" contentnegotiation "gitlab.com/jamietanna/content-negotiation-go" "go.opentelemetry.io/otel/attribute" @@ -58,6 +58,10 @@ func (s *service) RegisterHTTP(mux *http.ServeMux) { mux.Handle("/peers/", lg.Htrace(s, "peers")) } +func (s *service) Setup() error { + return nil +} + // ServeHTTP handle HTTP requests func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -67,6 +71,12 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { r = r.WithContext(ctx) + if !s.up.Load() { + w.WriteHeader(http.StatusFailedDependency) + fmt.Fprint(w, "Starting up...") + return + } + switch r.Method { case http.MethodGet: switch { @@ -81,11 +91,14 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { case strings.HasPrefix(r.URL.Path, "/peers/status"): s.state.Modify(r.Context(), func(ctx context.Context, state *state) error { - for id, p := range state.requests { - fmt.Fprintln(w, "REQ: ", id, p.RequestIP, len(p.Responses)) - for id, r := range p.Responses { + for id, rq := range state.requests { + fmt.Fprintln(w, "REQ: ", id, rq.RequestIP, len(rq.Responses)) + for id, r := range rq.Responses { fmt.Fprintln(w, " RES: ", id, r.PeerID[24:], r.Latency, r.Jitter) } + for p := range rq.peers { + fmt.Fprintln(w, " PEER: ", p[24:]) + } } return nil @@ -114,19 +127,19 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } } -func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string) { +func (s *service) getPending(w http.ResponseWriter, r *http.Request, peerID string) { ctx, span := lg.Span(r.Context()) defer span.End() span.SetAttributes( - attribute.String("uuid", uuid), + attribute.String("peerID", peerID), ) var peer *Peer err := s.state.Modify(ctx, func(ctx context.Context, state *state) error { var ok bool - if peer, ok = state.peers[uuid]; !ok { - return fmt.Errorf("peer not found: %s", uuid) + if peer, ok = state.peers[peerID]; !ok { + return fmt.Errorf("peer not found: %s", peerID) } return nil @@ -153,16 +166,30 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string return } - responses, err := s.es.Read(ctx, aggPeer(uuid), -1, -30) + peerResults := &PeerResults{} + peerResults.SetStreamID(aggPeer(peerID)) + err = s.es.Load(ctx, peerResults) if err != nil { - span.RecordError(err) - w.WriteHeader(http.StatusInternalServerError) - return + span.RecordError(fmt.Errorf("peer not found: %w", err)) + w.WriteHeader(http.StatusNotFound) } - span.AddEvent(fmt.Sprintf("req = %d, res = %d", len(requests), len(responses))) + var req *Request + for _, e := range requests { + r := &Request{} + r.ApplyEvent(e) - req := filter(peer, requests, responses) + if !peerResults.Has(r.RequestID) { + if !peer.CanSupport(r.RequestIP) { + continue + } + req = r + } + } + if req == nil { + span.RecordError(fmt.Errorf("request not found")) + w.WriteHeader(http.StatusNoContent) + } negotiator := contentnegotiation.NewNegotiator("application/json", "text/environment", "text/plain", "text/html") negotiated, _, err := negotiator.Negotiate(r.Header.Get("Accept")) @@ -190,9 +217,9 @@ func (s *service) getPending(w http.ResponseWriter, r *http.Request, uuid string Created string `json:"req_created"` }{ info.ScriptVersion, - req.RequestID(), + req.RequestID, req.RequestIP, - strconv.Itoa(req.Family()), + strconv.Itoa(req.Family), req.CreatedString(), } } @@ -204,34 +231,53 @@ func (s *service) getResults(w http.ResponseWriter, r *http.Request) { ctx, span := lg.Span(r.Context()) defer span.End() - events, err := s.es.Read(ctx, queueRequests, -1, -30) - if err != nil { - span.RecordError(err) - w.WriteHeader(http.StatusInternalServerError) - return - } + // events, err := s.es.Read(ctx, queueRequests, -1, -30) + // if err != nil { + // span.RecordError(err) + // w.WriteHeader(http.StatusInternalServerError) + // return + // } - requests := make([]*Request, len(events)) - for i, req := range events { - if req, ok := req.(*RequestSubmitted); ok { - requests[i], err = s.loadResult(ctx, req.RequestID()) - if err != nil { - span.RecordError(err) - w.WriteHeader(http.StatusInternalServerError) - return + // requests := make([]*Request, len(events)) + // for i, req := range events { + // if req, ok := req.(*RequestSubmitted); ok { + // requests[i], err = s.loadResult(ctx, req.RequestID()) + // if err != nil { + // span.RecordError(err) + // w.WriteHeader(http.StatusInternalServerError) + // return + // } + // } + // } + + var requests ListRequest + s.state.Modify(ctx, func(ctx context.Context, state *state) error { + requests = make([]*Request, 0, len(state.requests)) + + for _, req := range state.requests { + if req.RequestID == "" { + continue } + if req.Hidden { + continue + } + + requests = append(requests, req) } - } + + return nil + }) + sort.Sort(sort.Reverse(requests)) args := requestArgs(r) - args.Requests = requests + args.Requests = requests[:maxResults] s.state.Modify(ctx, func(ctx context.Context, state *state) error { args.CountPeers = len(state.peers) return nil }) - t := templates["home.tpl"] + t := templates["home.go.tpl"] t.Execute(w, args) } func (s *service) getResultsForRequest(w http.ResponseWriter, r *http.Request, uuid string) { @@ -242,7 +288,20 @@ func (s *service) getResultsForRequest(w http.ResponseWriter, r *http.Request, u attribute.String("uuid", uuid), ) - request, err := s.loadResult(ctx, uuid) + var request *Request + err := s.state.Modify(ctx, func(ctx context.Context, state *state) error { + request = state.requests[uuid] + + return nil + }) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + + request, err = s.loadResult(ctx, request) + + // request, err := s.loadResult(ctx, uuid) if err != nil { w.WriteHeader(http.StatusInternalServerError) return @@ -265,7 +324,7 @@ func (s *service) getResultsForRequest(w http.ResponseWriter, r *http.Request, u args := requestArgs(r) args.Requests = append(args.Requests, request) span.AddEvent(fmt.Sprint(args)) - err := renderTo(w, "req.tpl", args) + err := renderTo(w, "req.go.tpl", args) span.RecordError(err) return @@ -295,7 +354,7 @@ func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { req := &RequestSubmitted{ RequestIP: ip.String(), } - if hidden, err := strconv.ParseBool(r.Form.Get("req_hidden")); err != nil { + if hidden, err := strconv.ParseBool(r.Form.Get("req_hidden")); err == nil { req.Hidden = hidden } @@ -307,15 +366,15 @@ func (s *service) postRequest(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/peers/req/"+req.RequestID(), http.StatusSeeOther) } -func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) { +func (s *service) postResult(w http.ResponseWriter, r *http.Request, reqID string) { ctx, span := lg.Span(r.Context()) defer span.End() span.SetAttributes( - attribute.String("id", id), + attribute.String("id", reqID), ) - if _, err := ulid.ParseStrict(id); err != nil { + if _, err := ulid.ParseStrict(reqID); err != nil { w.WriteHeader(http.StatusNotFound) return } @@ -336,10 +395,11 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) ) peerID := r.Form.Get("peer_id") + err := s.state.Modify(ctx, func(ctx context.Context, state *state) error { var ok bool if _, ok = state.peers[peerID]; !ok { - // fmt.Printf("peer not found: %s\n", peerID) + log.Printf("peer not found: %s\n", peerID) return fmt.Errorf("peer not found: %s", peerID) } @@ -351,6 +411,20 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) return } + peerResults := &PeerResults{} + peerResults.SetStreamID(aggPeer(peerID)) + err = s.es.Load(ctx, peerResults) + if err != nil { + span.RecordError(fmt.Errorf("peer not found: %w", err)) + w.WriteHeader(http.StatusNotFound) + } + + if peerResults.Has(reqID) { + span.RecordError(fmt.Errorf("request previously recorded: req=%v peer=%v", reqID, peerID)) + w.WriteHeader(http.StatusAlreadyReported) + return + } + var unreach bool latency, err := strconv.ParseFloat(r.Form.Get("res_latency"), 64) if err != nil { @@ -358,7 +432,7 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) } req := &ResultSubmitted{ - RequestID: id, + RequestID: reqID, PeerID: r.Form.Get("peer_id"), PeerVersion: r.Form.Get("peer_version"), Latency: latency, @@ -385,21 +459,7 @@ func (s *service) postResult(w http.ResponseWriter, r *http.Request, id string) attribute.Stringer("result", req), ) - s.state.Modify(ctx, func(ctx context.Context, state *state) error { - - return nil - }) - - idx, err := s.es.LastIndex(ctx, aggRequest(id)) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - if idx == 0 { - w.WriteHeader(http.StatusNotFound) - return - } - + log.Printf("record result: %v", req) s.es.Append(ctx, queueResults, event.NewEvents(req)) } @@ -452,7 +512,7 @@ func loadTemplates() error { } pt := template.New(tmpl.Name()) pt.Funcs(funcMap) - pt, err = pt.ParseFS(files, "pages/"+tmpl.Name(), "layouts/*.tpl") + pt, err = pt.ParseFS(files, "pages/"+tmpl.Name(), "layouts/*.go.tpl") if err != nil { log.Println(err) @@ -468,27 +528,44 @@ var funcMap = map[string]any{ "countResponses": fnCountResponses, } -func fnOrderByPeer(rq *Request) any { - type peerResult struct { - Name string - Country string - Latency float64 - Jitter float64 - } - type peer struct { - Name string - Note string - Nick string - Country string - Latency float64 - Jitter float64 - VPNTypes []string +type peerResult struct { + Name string + Country string + Latency float64 + Jitter float64 +} +type peer struct { + Name string + Note string + Nick string + Country string + Latency float64 + Jitter float64 + VPNTypes []string - Results []peerResult + Results []peerResult +} +type listPeer []peer + +func (lis listPeer) Len() int { + return len(lis) +} +func (lis listPeer) Less(i, j int) bool { + if lis[i].Latency == 0.0 { + return false } + return lis[i].Latency < lis[j].Latency +} +func (lis listPeer) Swap(i, j int) { + lis[i], lis[j] = lis[j], lis[i] +} + +func fnOrderByPeer(rq *Request) any { peers := make(map[string]peer) + sort.Sort(ListResponse(rq.Responses)) + for _, rs := range rq.Responses { p, ok := peers[rs.Peer.Owner] @@ -512,7 +589,14 @@ func fnOrderByPeer(rq *Request) any { peers[rs.Peer.Owner] = p } - return peers + peerList := make(listPeer, 0, len(peers)) + for _, v := range peers { + peerList = append(peerList, v) + } + + sort.Sort(peerList) + + return peerList } func fnCountResponses(rq *Request) int { count := 0 @@ -523,37 +607,38 @@ func fnCountResponses(rq *Request) int { } return count } -func filter(peer *Peer, requests, responses event.Events) *RequestSubmitted { - have := make(map[string]struct{}, len(responses)) - for _, res := range toList[ResultSubmitted](responses...) { - have[res.RequestID] = struct{}{} - } - for _, req := range reverse(toList[RequestSubmitted](requests...)...) { - if _, ok := have[req.RequestID()]; !ok { - if !peer.CanSupport(req.RequestIP) { - continue - } - return req - } - } - return nil -} -func toList[E any, T es.PE[E]](lis ...event.Event) []T { - newLis := make([]T, 0, len(lis)) - for i := range lis { - if e, ok := lis[i].(T); ok { - newLis = append(newLis, e) - } - } - return newLis -} -func reverse[T any](s ...T) []T { - first, last := 0, len(s)-1 - for first < last { - s[first], s[last] = s[last], s[first] - first++ - last-- - } - return s -} +// func filter(peer *Peer, requests, responses event.Events) *RequestSubmitted { +// have := make(map[string]struct{}, len(responses)) +// for _, res := range toList[ResultSubmitted](responses...) { +// have[res.RequestID] = struct{}{} +// } +// for _, req := range reverse(toList[RequestSubmitted](requests...)...) { +// if _, ok := have[req.RequestID()]; !ok { +// if !peer.CanSupport(req.RequestIP) { +// continue +// } + +// return req +// } +// } +// return nil +// } +// func toList[E any, T es.PE[E]](lis ...event.Event) []T { +// newLis := make([]T, 0, len(lis)) +// for i := range lis { +// if e, ok := lis[i].(T); ok { +// newLis = append(newLis, e) +// } +// } +// return newLis +// } +// func reverse[T any](s ...T) []T { +// first, last := 0, len(s)-1 +// for first < last { +// s[first], s[last] = s[last], s[first] +// first++ +// last-- +// } +// return s +// } diff --git a/app/peerfinder/jobs.go b/app/peerfinder/jobs.go index bf818f3..a010a3d 100644 --- a/app/peerfinder/jobs.go +++ b/app/peerfinder/jobs.go @@ -5,12 +5,13 @@ import ( "encoding/json" "errors" "fmt" + "log" "net/http" "time" "github.com/sour-is/ev/internal/lg" "github.com/sour-is/ev/pkg/es" - "github.com/sour-is/ev/pkg/math" + "github.com/sour-is/ev/pkg/es/event" "github.com/sour-is/ev/pkg/set" ) @@ -40,8 +41,6 @@ func (s *service) RefreshJob(ctx context.Context, _ time.Time) error { return err } - span.AddEvent(fmt.Sprintf("processed %d peers", len(peers))) - err = s.state.Modify(ctx, func(ctx context.Context, t *state) error { for _, peer := range peers { t.peers[peer.ID] = peer @@ -49,10 +48,24 @@ func (s *service) RefreshJob(ctx context.Context, _ time.Time) error { return nil }) + span.RecordError(err) + if err != nil { + return err + } + + log.Printf("processed %d peers", len(peers)) + span.AddEvent(fmt.Sprintf("processed %d peers", len(peers))) + + s.up.Store(true) + + err = s.cleanPeerJobs(ctx) + span.RecordError(err) return err } +const maxResults = 30 + // CleanJob truncates streams old request data func (s *service) CleanJob(ctx context.Context, now time.Time) error { ctx, span := lg.Span(ctx) @@ -60,13 +73,13 @@ func (s *service) CleanJob(ctx context.Context, now time.Time) error { span.AddEvent("clear peerfinder requests") - endRequestID, err := s.cleanRequests(ctx, now) + err := s.cleanRequests(ctx, now) if err != nil { return err } - if err = s.cleanResults(ctx, endRequestID); err != nil { - return err - } + // if err = s.cleanResults(ctx, endRequestID); err != nil { + // return err + // } return s.cleanPeerJobs(ctx) } @@ -96,12 +109,12 @@ func (s *service) cleanPeerJobs(ctx context.Context) error { if err != nil { return err } - newFirst := math.Max(int64(last-30), int64(first)) - if last == 0 || newFirst == int64(first) { - // fmt.Println("SKIP", streamID, first, newFirst, last) - span.AddEvent(fmt.Sprint("SKIP", streamID, first, newFirst, last)) + if last-first < maxResults { + fmt.Println("SKIP", streamID, first, last) continue } + + newFirst := int64(last - 30) // fmt.Println("TRUNC", streamID, first, newFirst, last) span.AddEvent(fmt.Sprint("TRUNC", streamID, first, newFirst, last)) err = s.es.Truncate(ctx, streamID, int64(newFirst)) @@ -112,41 +125,47 @@ func (s *service) cleanPeerJobs(ctx context.Context) error { return nil } -func (s *service) cleanRequests(ctx context.Context, now time.Time) (string, error) { +func (s *service) cleanRequests(ctx context.Context, now time.Time) error { ctx, span := lg.Span(ctx) defer span.End() var streamIDs []string - var endPosition uint64 - var endRequestID string + var startPosition, endPosition int64 + first, err := s.es.FirstIndex(ctx, queueRequests) + if err != nil { + return err + } last, err := s.es.LastIndex(ctx, queueRequests) if err != nil { - return "", err + return err } -end: + if last-first < maxResults { + // fmt.Println("SKIP", queueRequests, first, last) + return nil + } + + startPosition = int64(first - 1) + endPosition = int64(last - maxResults) + for { - events, err := s.es.Read(ctx, queueRequests, int64(endPosition), 1000) // read 1000 from the top each loop. + events, err := s.es.Read(ctx, queueRequests, startPosition, 1000) // read 1000 from the top each loop. if err != nil && !errors.Is(err, es.ErrNotFound) { span.RecordError(err) - return "", err + return err } if len(events) == 0 { break } - endPosition = events.Last().EventMeta().ActualPosition + startPosition = int64(events.Last().EventMeta().ActualPosition) for _, event := range events { switch e := event.(type) { case *RequestSubmitted: - if e.eventMeta.ActualPosition < last-30 { - streamIDs = append(streamIDs, aggRequest(e.RequestID())) - } else { - endRequestID = e.RequestID() - endPosition = e.eventMeta.ActualPosition - break end + if e.eventMeta.ActualPosition < last-maxResults { + streamIDs = append(streamIDs, e.RequestID()) } } } @@ -157,59 +176,39 @@ end: span.AddEvent(fmt.Sprint("TRUNC", queueRequests, int64(endPosition), last)) err = s.es.Truncate(ctx, queueRequests, int64(endPosition)) if err != nil { - return "", err + return err } // truncate all the request streams for _, streamID := range streamIDs { - last, err := s.es.LastIndex(ctx, streamID) - if err != nil { - return "", err - } - // fmt.Println("TRUNC", streamID, last) - span.AddEvent(fmt.Sprint("TRUNC", streamID, last)) - err = s.es.Truncate(ctx, streamID, int64(last)) - if err != nil { - return "", err - } - } + s.state.Modify(ctx, func(ctx context.Context, state *state) error { + return state.ApplyEvents(event.NewEvents(&RequestTruncated{ + RequestID: streamID, + })) + }) - return endRequestID, nil -} -func (s *service) cleanResults(ctx context.Context, endRequestID string) error { - ctx, span := lg.Span(ctx) - defer span.End() - - var endPosition uint64 - - done := false - for !done { - events, err := s.es.Read(ctx, queueResults, int64(endPosition), 1000) // read 30 from the top each loop. + err := s.cleanResult(ctx, streamID) if err != nil { return err } + } - if len(events) == 0 { - done = true - continue - } + return nil +} +func (s *service) cleanResult(ctx context.Context, requestID string) error { + ctx, span := lg.Span(ctx) + defer span.End() - endPosition = events.Last().EventMeta().ActualPosition + streamID := aggRequest(requestID) - for _, event := range events { - switch e := event.(type) { - case *ResultSubmitted: - if e.RequestID == endRequestID { - done = true - endPosition = e.eventMeta.ActualPosition - } - } - } + last, err := s.es.LastIndex(ctx, streamID) + if err != nil { + return err } // truncate all reqs to found end position - // fmt.Println("TRUNC", queueResults, int64(endPosition), last) - span.AddEvent(fmt.Sprint("TRUNC", queueResults, int64(endPosition))) - err := s.es.Truncate(ctx, queueResults, int64(endPosition)) + // fmt.Println("TRUNC", streamID, last) + span.AddEvent(fmt.Sprint("TRUNC", streamID, last)) + err = s.es.Truncate(ctx, streamID, int64(last)) if err != nil { return err } diff --git a/app/peerfinder/layouts/main.tpl b/app/peerfinder/layouts/main.go.tpl similarity index 100% rename from app/peerfinder/layouts/main.tpl rename to app/peerfinder/layouts/main.go.tpl diff --git a/app/peerfinder/pages/home.tpl b/app/peerfinder/pages/home.go.tpl similarity index 77% rename from app/peerfinder/pages/home.tpl rename to app/peerfinder/pages/home.go.tpl index 89f131d..b7072ac 100644 --- a/app/peerfinder/pages/home.tpl +++ b/app/peerfinder/pages/home.go.tpl @@ -32,7 +32,7 @@