217 lines
4.6 KiB
Go
217 lines
4.6 KiB
Go
package peerfinder
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"time"
|
|
|
|
"go.sour.is/ev"
|
|
"go.sour.is/ev/internal/lg"
|
|
"go.sour.is/ev/pkg/es/event"
|
|
"go.sour.is/ev/pkg/set"
|
|
)
|
|
|
|
// RefreshJob retrieves peer info from the peerdb
|
|
func (s *service) RefreshJob(ctx context.Context, _ time.Time) error {
|
|
ctx, span := lg.Span(ctx)
|
|
defer span.End()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.statusURL, nil)
|
|
span.RecordError(err)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Accept", "application/json")
|
|
|
|
res, err := http.DefaultClient.Do(req)
|
|
span.RecordError(err)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer res.Body.Close()
|
|
var peers []*Peer
|
|
err = json.NewDecoder(res.Body).Decode(&peers)
|
|
span.RecordError(err)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.state.Use(ctx, func(ctx context.Context, t *state) error {
|
|
for _, peer := range peers {
|
|
t.peers[peer.ID] = peer
|
|
}
|
|
|
|
return nil
|
|
})
|
|
span.RecordError(err)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Printf("processed %d peers", len(peers))
|
|
span.AddEvent(fmt.Sprintf("processed %d peers", len(peers)))
|
|
|
|
s.up.Store(true)
|
|
|
|
err = s.cleanPeerJobs(ctx)
|
|
|
|
span.RecordError(err)
|
|
return err
|
|
}
|
|
|
|
const maxResults = 30
|
|
|
|
// CleanJob truncates streams old request data
|
|
func (s *service) CleanJob(ctx context.Context, now time.Time) error {
|
|
ctx, span := lg.Span(ctx)
|
|
defer span.End()
|
|
|
|
span.AddEvent("clear peerfinder requests")
|
|
|
|
err := s.cleanRequests(ctx, now)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// if err = s.cleanResults(ctx, endRequestID); err != nil {
|
|
// return err
|
|
// }
|
|
|
|
return s.cleanPeerJobs(ctx)
|
|
}
|
|
func (s *service) cleanPeerJobs(ctx context.Context) error {
|
|
ctx, span := lg.Span(ctx)
|
|
defer span.End()
|
|
|
|
peers := set.New[string]()
|
|
err := s.state.Use(ctx, func(ctx context.Context, state *state) error {
|
|
for id := range state.peers {
|
|
peers.Add(id)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// trunctate all the peer streams to last 30
|
|
for streamID := range peers {
|
|
streamID = aggPeer(streamID)
|
|
first, err := s.es.FirstIndex(ctx, streamID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
last, err := s.es.LastIndex(ctx, streamID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if last-first < maxResults {
|
|
fmt.Println("SKIP", streamID, first, last)
|
|
continue
|
|
}
|
|
|
|
newFirst := int64(last - 30)
|
|
// fmt.Println("TRUNC", streamID, first, newFirst, last)
|
|
span.AddEvent(fmt.Sprint("TRUNC", streamID, first, newFirst, last))
|
|
err = s.es.Truncate(ctx, streamID, int64(newFirst))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
func (s *service) cleanRequests(ctx context.Context, now time.Time) error {
|
|
ctx, span := lg.Span(ctx)
|
|
defer span.End()
|
|
|
|
var streamIDs []string
|
|
var startPosition, endPosition int64
|
|
|
|
first, err := s.es.FirstIndex(ctx, queueRequests)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
last, err := s.es.LastIndex(ctx, queueRequests)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if last-first < maxResults {
|
|
// fmt.Println("SKIP", queueRequests, first, last)
|
|
return nil
|
|
}
|
|
|
|
startPosition = int64(first - 1)
|
|
endPosition = int64(last - maxResults)
|
|
|
|
for {
|
|
events, err := s.es.Read(ctx, queueRequests, startPosition, 1000) // read 1000 from the top each loop.
|
|
if err != nil && !errors.Is(err, ev.ErrNotFound) {
|
|
span.RecordError(err)
|
|
return err
|
|
}
|
|
|
|
if len(events) == 0 {
|
|
break
|
|
}
|
|
|
|
startPosition = int64(events.Last().EventMeta().ActualPosition)
|
|
for _, event := range events {
|
|
switch e := event.(type) {
|
|
case *RequestSubmitted:
|
|
if e.eventMeta.ActualPosition < last-maxResults {
|
|
streamIDs = append(streamIDs, e.RequestID())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// truncate all reqs to found end position
|
|
// fmt.Println("TRUNC", queueRequests, int64(endPosition), last)
|
|
span.AddEvent(fmt.Sprint("TRUNC", queueRequests, int64(endPosition), last))
|
|
err = s.es.Truncate(ctx, queueRequests, int64(endPosition))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// truncate all the request streams
|
|
for _, streamID := range streamIDs {
|
|
s.state.Use(ctx, func(ctx context.Context, state *state) error {
|
|
return state.ApplyEvents(event.NewEvents(&RequestTruncated{
|
|
RequestID: streamID,
|
|
}))
|
|
})
|
|
|
|
err := s.cleanResult(ctx, streamID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
func (s *service) cleanResult(ctx context.Context, requestID string) error {
|
|
ctx, span := lg.Span(ctx)
|
|
defer span.End()
|
|
|
|
streamID := aggRequest(requestID)
|
|
|
|
last, err := s.es.LastIndex(ctx, streamID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// truncate all reqs to found end position
|
|
// fmt.Println("TRUNC", streamID, last)
|
|
span.AddEvent(fmt.Sprint("TRUNC", streamID, last))
|
|
err = s.es.Truncate(ctx, streamID, int64(last))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|