xt/fetcher.go
2025-03-24 17:18:46 -06:00

207 lines
4.5 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.sour.is/xt/internal/otel"
)
// Sentinel errors used to classify a fetch outcome. Fetch wraps them with %w,
// so callers should match them with errors.Is rather than ==.
var (
	// ErrUnmodified is used when the server answered 304 Not Modified.
	ErrUnmodified = errors.New("unmodified")
	// ErrPermanentlyDead is used for statuses (403, 404, 410) after which the
	// feed is treated as gone.
	ErrPermanentlyDead = errors.New("permanently dead")
	// ErrTemporarilyDead is used for transport failures and for statuses
	// (400, 406, 429, 500, 502, 503) that are expected to be transient.
	ErrTemporarilyDead = errors.New("temporarily dead")
	// ErrParseFailed marks a feed whose content could not be parsed.
	// NOTE(review): not produced anywhere in this file; confirm its users.
	ErrParseFailed = errors.New("parse failed")
)
// Response is the outcome of fetching one Feed: the feed that was requested,
// the raw HTTP response (if any) and the error that classified the result.
type Response struct {
	// Request is the feed this response belongs to.
	Request *Feed
	// Response is the embedded HTTP response. It stays nil when the request
	// could not be built or the transport failed before any response arrived.
	*http.Response
	// err is the classification of the fetch; nil means a 200 OK.
	err error
}
// ETag returns the value of the response's ETag header. It returns "" when
// the header is absent or when no HTTP response was received (the embedded
// *http.Response is nil after a build or transport failure), instead of
// panicking on the nil response.
func (r *Response) ETag() string {
	if r.Response == nil {
		return ""
	}
	return r.Header.Get("ETag")
}
// Read reads from the body of the embedded HTTP response, giving Response the
// io.Reader method signature. It must only be called when an HTTP response
// was actually received.
func (r *Response) Read(b []byte) (int, error) {
	return r.Response.Body.Read(b)
}
// Close closes the Response.Body, which is necessary to free up resources.
//
// It is a no-op when no HTTP response was received (the request could not be
// built or the transport failed), so callers may defer it unconditionally
// without risking a nil-pointer panic. The error from Body.Close is ignored,
// matching the method's signature.
func (r *Response) Close() {
	if r.Response == nil || r.Body == nil {
		return
	}
	r.Body.Close()
}
// ContentType returns the value of the response's Content-Type header. It
// returns "" when the header is absent or when no HTTP response was received,
// instead of panicking on the nil embedded *http.Response.
func (r *Response) ContentType() string {
	if r.Response == nil {
		return ""
	}
	return r.Header.Get("Content-Type")
}
// LastModified returns the time carried in the Last-Modified header.
//
// The header is parsed with http.ParseTime, which accepts every date format
// HTTP/1.1 allows (RFC 1123, RFC 850 and ANSI C asctime), not only
// http.TimeFormat. When the header is missing or unparsable, or when no HTTP
// response was received at all, the current time is returned instead.
func (r *Response) LastModified() time.Time {
	if r.Response == nil {
		return time.Now()
	}
	if lm, err := http.ParseTime(r.Header.Get("Last-Modified")); err == nil {
		return lm
	}
	return time.Now()
}
// httpFetcher retrieves feeds over HTTP and records fetch metrics.
// Construct it with NewHTTPFetcher.
type httpFetcher struct {
	// client issues every request made by Fetch.
	client *http.Client
	// m_fetch_status counts fetches, labelled by a "status" attribute.
	m_fetch_status metric.Int64Counter
	// m_fetch_second records how long each Fetch call took, in seconds.
	m_fetch_second metric.Float64Histogram
}
// NewHTTPFetcher returns an httpFetcher with its own HTTP client and the
// "xt_fetch_status_total" counter and "xt_fetch_seconds" histogram.
//
// The transport honours proxy environment variables, uses 5s dial and TLS
// handshake timeouts, keeps idle connections for 10s (100 max) and does not
// force HTTP/2. The client itself sets no overall timeout; requests are
// bounded only by their context.
func NewHTTPFetcher() *httpFetcher {
	statusCounter, _ := otel.Meter().Int64Counter("xt_fetch_status_total")
	durationHistogram, _ := otel.Meter().Float64Histogram("xt_fetch_seconds")
	// NOTE(review): instrument-creation errors are discarded above. Confirm
	// the meter never hands back a nil instrument on error, otherwise the
	// Add/Record calls in Fetch would panic.

	dialer := &net.Dialer{
		Timeout:   5 * time.Second,
		KeepAlive: 5 * time.Second,
	}
	transport := &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           dialer.DialContext,
		ForceAttemptHTTP2:     false,
		MaxIdleConns:          100,
		IdleConnTimeout:       10 * time.Second,
		TLSHandshakeTimeout:   5 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}

	fetcher := &httpFetcher{
		client:         &http.Client{Transport: transport},
		m_fetch_status: statusCounter,
		m_fetch_second: durationHistogram,
	}
	return fetcher
}
// Fetch performs the HTTP request built from request and classifies the
// outcome. The returned *Response is never nil; its err field is set when:
//
//   - the request could not be built: the error from MakeHTTPRequest, as is;
//   - the transport failed (DNS, dial, TLS, timeout, cancellation):
//     ErrTemporarilyDead wrapping the underlying error;
//   - status 304: ErrUnmodified;
//   - status 400, 406, 429, 500, 502, 503: ErrTemporarilyDead;
//   - status 403, 404, 410: ErrPermanentlyDead;
//   - any other non-200 status: an error whose text is the status line.
//
// Whenever an HTTP response was received, including error statuses, it is
// attached to the result and the caller is responsible for closing its body.
// The call duration is recorded in m_fetch_second, and every received status
// is counted in m_fetch_status.
func (f *httpFetcher) Fetch(ctx context.Context, request *Feed) *Response {
	ctx, span := otel.Span(ctx)
	defer span.End()

	start := time.Now()
	defer func() {
		f.m_fetch_second.Record(ctx, time.Since(start).Seconds())
	}()
	defer fmt.Println("fetch done", request.URI)

	response := &Response{Request: request}

	req, err := request.MakeHTTPRequest(ctx)
	if err != nil {
		response.err = err
		return response
	}

	span.AddEvent("start request")
	res, err := f.client.Do(req)
	span.AddEvent("got response")
	if err != nil {
		// Every transport-level failure is treated as transient. The cause is
		// wrapped with %w so callers can still inspect it, for example with
		// errors.As(err, new(*net.DNSError)); an errors.Is check against a
		// freshly allocated &net.DNSError{} can never match.
		response.err = fmt.Errorf("%w: %w", ErrTemporarilyDead, err)
		return response
	}
	response.Response = res

	switch res.StatusCode {
	case 200:
		f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "ok")))
	case 304:
		f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "not_modified")))
		response.err = fmt.Errorf("%w: %s", ErrUnmodified, res.Status)
	case 400, 406, 429, 500, 502, 503:
		f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "temp_fail")))
		response.err = fmt.Errorf("%w: %s", ErrTemporarilyDead, res.Status)
	case 403, 404, 410:
		f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.String("status", "perm_fail")))
		response.err = fmt.Errorf("%w: %s", ErrPermanentlyDead, res.Status)
	default:
		f.m_fetch_status.Add(ctx, 1, metric.WithAttributes(attribute.Int("status", res.StatusCode)))
		response.err = errors.New(res.Status)
	}
	return response
}
// pool is a fixed-size set of worker goroutines created by NewFuncPool.
// Values queued with Fetch travel through in to the workers, and the workers'
// results are published on out (exposed read-only via Out).
type pool[IN, OUT any] struct {
	in  chan IN  // pending requests, buffered to the pool size
	out chan OUT // worker results, buffered to the pool size
	// Err is exported but never assigned in this file.
	// NOTE(review): confirm whether any caller still relies on it.
	Err error
}
// NewFuncPool starts size worker goroutines. Each worker calls fetch for
// every value queued with (*pool).Fetch and publishes the result on
// (*pool).Out. Both channels are buffered to size; a size below 1 is treated
// as 1.
//
// Every fetch call runs under its own context, derived from ctx, with a 15
// second timeout whose cause is "GOT STUCK". A call that times out still has
// its result delivered, and the worker keeps serving. Workers stop only when
// ctx is cancelled or the input channel is closed.
//
// The returned function closes the input, waits for all workers to finish and
// then closes the output channel. Call it exactly once, after the last Fetch.
func NewFuncPool[IN, OUT any](
	ctx context.Context,
	size int,
	fetch func(ctx context.Context, request IN) OUT,
) (*pool[IN, OUT], func()) {
	ctx, span := otel.Span(ctx)
	defer span.End()

	// A negative size would panic in make/wg.Add, and zero would leave no
	// worker to drain the input, so always run at least one worker.
	size = max(size, 1)

	var wg sync.WaitGroup
	in := make(chan IN, size)
	out := make(chan OUT, size)

	// handle runs fetch for a single request under its own span and timeout
	// and forwards the result to out. It reports false when workerCtx has been
	// cancelled and the worker should stop. Keeping this in its own function
	// makes the deferred span.End fire once per request rather than piling up
	// until the worker exits.
	handle := func(workerCtx context.Context, request IN) bool {
		reqCtx, cancel := context.WithTimeoutCause(workerCtx, 15*time.Second, fmt.Errorf("GOT STUCK"))
		// cancel is deliberately not called when handle returns. The result
		// may carry an HTTP response (e.g. from httpFetcher.Fetch) whose body
		// the consumer reads after receiving it from out, and that read is
		// bound to reqCtx. Cancelling here would abort it. reqCtx is released
		// by its own deadline timer or by the cancellation of workerCtx.
		_ = cancel

		reqCtx, span := otel.Span(reqCtx)
		defer span.End()

		span.AddEvent("start fetch")
		r := fetch(reqCtx, request)
		span.AddEvent("got fetch")

		// Wait on the worker's context, not reqCtx. A request that hit its own
		// timeout must still hand over its result (which may hold an open body)
		// and must not shut the worker down.
		select {
		case <-workerCtx.Done():
			return false
		case out <- r:
			span.AddEvent("sent queue")
			return true
		}
	}

	wg.Add(size)
	for range size {
		go func() {
			defer wg.Done()

			workerCtx, span := otel.Span(ctx)
			defer span.End()

			for request := range in {
				if !handle(workerCtx, request) {
					return
				}
			}
		}()
	}

	closePool := func() {
		close(in)
		wg.Wait()
		close(out)
	}
	return &pool[IN, OUT]{in: in, out: out}, closePool
}
// Fetch queues request for one of the pool's workers. It blocks while the
// input buffer is full. Because the pool's close function closes the input
// channel, calling Fetch afterwards panics with a send on a closed channel.
func (f *pool[IN, OUT]) Fetch(request IN) {
	queue := f.in
	queue <- request
}
// Out returns the receive-only channel on which worker results arrive. With
// more than one worker, results may arrive in a different order than the
// requests were queued. The channel is closed by the pool's close function
// once every worker has finished, so ranging over it terminates.
func (f *pool[IN, OUT]) Out() <-chan OUT {
	var results <-chan OUT = f.out
	return results
}