go-tools/app/peerfinder/jobs.go

217 lines
4.6 KiB
Go

package peerfinder
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"time"
"go.sour.is/ev"
"go.sour.is/ev/event"
"go.sour.is/pkg/lg"
"go.sour.is/pkg/set"
)
// RefreshJob retrieves peer info from the peerdb
func (s *service) RefreshJob(ctx context.Context, _ time.Time) error {
ctx, span := lg.Span(ctx)
defer span.End()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.statusURL, nil)
span.RecordError(err)
if err != nil {
return err
}
req.Header.Set("Accept", "application/json")
res, err := http.DefaultClient.Do(req)
span.RecordError(err)
if err != nil {
return err
}
defer res.Body.Close()
var peers []*Peer
err = json.NewDecoder(res.Body).Decode(&peers)
span.RecordError(err)
if err != nil {
return err
}
err = s.state.Use(ctx, func(ctx context.Context, t *state) error {
for _, peer := range peers {
t.peers[peer.ID] = peer
}
return nil
})
span.RecordError(err)
if err != nil {
return err
}
log.Printf("processed %d peers", len(peers))
span.AddEvent(fmt.Sprintf("processed %d peers", len(peers)))
s.up.Store(true)
err = s.cleanPeerJobs(ctx)
span.RecordError(err)
return err
}
const maxResults = 30
// CleanJob truncates streams old request data
func (s *service) CleanJob(ctx context.Context, now time.Time) error {
ctx, span := lg.Span(ctx)
defer span.End()
span.AddEvent("clear peerfinder requests")
err := s.cleanRequests(ctx, now)
if err != nil {
return err
}
// if err = s.cleanResults(ctx, endRequestID); err != nil {
// return err
// }
return s.cleanPeerJobs(ctx)
}
func (s *service) cleanPeerJobs(ctx context.Context) error {
ctx, span := lg.Span(ctx)
defer span.End()
peers := set.New[string]()
err := s.state.Use(ctx, func(ctx context.Context, state *state) error {
for id := range state.peers {
peers.Add(id)
}
return nil
})
if err != nil {
return err
}
// trunctate all the peer streams to last 30
for streamID := range peers {
streamID = aggPeer(streamID)
first, err := s.es.FirstIndex(ctx, streamID)
if err != nil {
return err
}
last, err := s.es.LastIndex(ctx, streamID)
if err != nil {
return err
}
if last-first < maxResults {
fmt.Println("SKIP", streamID, first, last)
continue
}
newFirst := int64(last - 30)
// fmt.Println("TRUNC", streamID, first, newFirst, last)
span.AddEvent(fmt.Sprint("TRUNC", streamID, first, newFirst, last))
err = s.es.Truncate(ctx, streamID, int64(newFirst))
if err != nil {
return err
}
}
return nil
}
func (s *service) cleanRequests(ctx context.Context, now time.Time) error {
ctx, span := lg.Span(ctx)
defer span.End()
var streamIDs []string
var startPosition, endPosition int64
first, err := s.es.FirstIndex(ctx, queueRequests)
if err != nil {
return err
}
last, err := s.es.LastIndex(ctx, queueRequests)
if err != nil {
return err
}
if last-first < maxResults {
// fmt.Println("SKIP", queueRequests, first, last)
return nil
}
startPosition = int64(first - 1)
endPosition = int64(last - maxResults)
for {
events, err := s.es.Read(ctx, queueRequests, startPosition, 1000) // read 1000 from the top each loop.
if err != nil && !errors.Is(err, ev.ErrNotFound) {
span.RecordError(err)
return err
}
if len(events) == 0 {
break
}
startPosition = int64(events.Last().EventMeta().ActualPosition)
for _, event := range events {
switch e := event.(type) {
case *RequestSubmitted:
if e.EventMeta().ActualPosition < last-maxResults {
streamIDs = append(streamIDs, e.RequestID())
}
}
}
}
// truncate all reqs to found end position
// fmt.Println("TRUNC", queueRequests, int64(endPosition), last)
span.AddEvent(fmt.Sprint("TRUNC", queueRequests, int64(endPosition), last))
err = s.es.Truncate(ctx, queueRequests, int64(endPosition))
if err != nil {
return err
}
// truncate all the request streams
for _, streamID := range streamIDs {
s.state.Use(ctx, func(ctx context.Context, state *state) error {
return state.ApplyEvents(event.NewEvents(&RequestTruncated{
RequestID: streamID,
}))
})
err := s.cleanResult(ctx, streamID)
if err != nil {
return err
}
}
return nil
}
func (s *service) cleanResult(ctx context.Context, requestID string) error {
ctx, span := lg.Span(ctx)
defer span.End()
streamID := aggRequest(requestID)
last, err := s.es.LastIndex(ctx, streamID)
if err != nil {
return err
}
// truncate all reqs to found end position
// fmt.Println("TRUNC", streamID, last)
span.AddEvent(fmt.Sprint("TRUNC", streamID, last))
err = s.es.Truncate(ctx, streamID, int64(last))
if err != nil {
return err
}
return nil
}