add-otel #3
							
								
								
									
										20
									
								
								feed.go
									
									
									
									
									
								
							
							
						
						
									
										20
									
								
								feed.go
									
									
									
									
									
								
							@ -32,6 +32,7 @@ type Feed struct {
 | 
			
		||||
	LastScanOn  TwtTime
 | 
			
		||||
	RefreshRate int
 | 
			
		||||
	NextScanOn  TwtTime
 | 
			
		||||
        LastTwtOn   TwtTime
 | 
			
		||||
 | 
			
		||||
	LastModified TwtTime
 | 
			
		||||
	LastError    sql.NullString
 | 
			
		||||
@ -102,20 +103,25 @@ var (
 | 
			
		||||
		select
 | 
			
		||||
			feed_id,
 | 
			
		||||
			parent_id,
 | 
			
		||||
			coalesce(hashing_uri, uri)         hash_uri,
 | 
			
		||||
			coalesce(hashing_uri, uri) hash_uri,
 | 
			
		||||
			uri,
 | 
			
		||||
			nick,
 | 
			
		||||
			state,
 | 
			
		||||
			last_scan_on,
 | 
			
		||||
			strftime(
 | 
			
		||||
			    '%Y-%m-%dT%H:%M:%fZ',
 | 
			
		||||
				'%Y-%m-%dT%H:%M:%fZ',
 | 
			
		||||
				coalesce(last_scan_on, '1901-01-01'),
 | 
			
		||||
				'+'||refresh_rate||' seconds'
 | 
			
		||||
			)                           next_scan_on,
 | 
			
		||||
				'+'||abs(refresh_rate + cast(random() % 30 as int))||' seconds'
 | 
			
		||||
			) next_scan_on,
 | 
			
		||||
			coalesce(last_twt_on, '1901-01-01T00:00:00Z') last_twt_on,
 | 
			
		||||
			refresh_rate,
 | 
			
		||||
			last_modified_on,
 | 
			
		||||
			last_etag
 | 
			
		||||
		from feeds
 | 
			
		||||
                left join (
 | 
			
		||||
                      select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, '	')-1)))) last_twt_on
 | 
			
		||||
                      from twts group by feed_id
 | 
			
		||||
                ) using (feed_id)
 | 
			
		||||
		left join (
 | 
			
		||||
			select
 | 
			
		||||
				feed_id parent_id,
 | 
			
		||||
@ -125,8 +131,8 @@ var (
 | 
			
		||||
		) using (parent_id)
 | 
			
		||||
		where datetime(
 | 
			
		||||
			coalesce(last_scan_on, '1901-01-01'),
 | 
			
		||||
			'+'||refresh_rate||' seconds'
 | 
			
		||||
		) < datetime(current_timestamp, '+2 minutes')
 | 
			
		||||
			'+'||abs(refresh_rate+cast(random()%30 as int))||' seconds'
 | 
			
		||||
		) < datetime(current_timestamp, '+3 minutes')
 | 
			
		||||
	`
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@ -180,6 +186,7 @@ func (f *Feed) Scan(res interface{ Scan(...any) error }) error {
 | 
			
		||||
		&f.State,
 | 
			
		||||
		&f.LastScanOn,
 | 
			
		||||
		&f.NextScanOn,
 | 
			
		||||
		&f.LastTwtOn,
 | 
			
		||||
		&f.RefreshRate,
 | 
			
		||||
		&f.LastModified,
 | 
			
		||||
		&f.ETag,
 | 
			
		||||
@ -356,7 +363,6 @@ func storeFeed(ctx context.Context, db db, f types.TwtFile) error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (feed *Feed) MakeHTTPRequest(ctx context.Context) (*http.Request, error) {
 | 
			
		||||
	feed.State = "fetch"
 | 
			
		||||
	if strings.Contains(feed.URI, "lublin.se") {
 | 
			
		||||
		return nil, fmt.Errorf("%w: permaban: %s", ErrPermanentlyDead, feed.URI)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										31
									
								
								http.go
									
									
									
									
									
								
							
							
						
						
									
										31
									
								
								http.go
									
									
									
									
									
								
							@ -31,11 +31,11 @@ func httpServer(c *console, app *appState) error {
 | 
			
		||||
		_, span := otel.Span(r.Context())
 | 
			
		||||
		defer span.End()
 | 
			
		||||
 | 
			
		||||
		w.Header().Set("Content-Type", "text/html; charset=utf-8")
 | 
			
		||||
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
 | 
			
		||||
		w.Write([]byte("ok"))
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	http.HandleFunc("/conv/{hash}", func(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
	http.HandleFunc("/api/plain/conv/{hash}", func(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
		ctx, span := otel.Span(r.Context())
 | 
			
		||||
		defer span.End()
 | 
			
		||||
 | 
			
		||||
@ -104,7 +104,7 @@ func httpServer(c *console, app *appState) error {
 | 
			
		||||
		reg.WriteTo(w)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	http.HandleFunc("/users", func(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
	http.HandleFunc("/api/plain/users", func(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
		ctx, span := otel.Span(r.Context())
 | 
			
		||||
		defer span.End()
 | 
			
		||||
 | 
			
		||||
@ -112,14 +112,20 @@ func httpServer(c *console, app *appState) error {
 | 
			
		||||
 | 
			
		||||
		rows, err := db.QueryContext(
 | 
			
		||||
			ctx,
 | 
			
		||||
			`SELECT 
 | 
			
		||||
						feed_id, 
 | 
			
		||||
						uri, 
 | 
			
		||||
			`SELECT
 | 
			
		||||
						feed_id,
 | 
			
		||||
						uri,
 | 
			
		||||
						nick,
 | 
			
		||||
						last_scan_on 
 | 
			
		||||
						last_scan_on,
 | 
			
		||||
						last_twt_on
 | 
			
		||||
					FROM feeds
 | 
			
		||||
					where parent_id is null
 | 
			
		||||
					order by nick, uri`,
 | 
			
		||||
					left join (
 | 
			
		||||
						select feed_id, max(strftime('%Y-%m-%dT%H:%M:%fZ', (substring(text, 1, instr(text, '	')-1)))) last_twt_on
 | 
			
		||||
			                	from twts group by feed_id
 | 
			
		||||
					) using (feed_id)
 | 
			
		||||
					where parent_id is null and state not in ('permanantly-dead', 'frozen') and last_twt_on is not null
 | 
			
		||||
					order by nick, uri
 | 
			
		||||
                        `,
 | 
			
		||||
		)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			span.RecordError(err)
 | 
			
		||||
@ -134,15 +140,16 @@ func httpServer(c *console, app *appState) error {
 | 
			
		||||
				URI    string
 | 
			
		||||
				Nick   string
 | 
			
		||||
				Dt     TwtTime
 | 
			
		||||
				LastTwtOn TwtTime
 | 
			
		||||
			}
 | 
			
		||||
			err = rows.Scan(&o.FeedID, &o.URI, &o.Nick, &o.Dt)
 | 
			
		||||
			err = rows.Scan(&o.FeedID, &o.URI, &o.Nick, &o.Dt, &o.LastTwtOn)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				span.RecordError(err)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			twts = append(twts, lextwt.NewTwt(
 | 
			
		||||
				types.NewTwter(o.Nick, o.URI),
 | 
			
		||||
				lextwt.NewDateTime(o.Dt.Time, o.Dt.Time.Format(time.RFC3339)),
 | 
			
		||||
				lextwt.NewDateTime(o.Dt.Time, o.LastTwtOn.Time.Format(time.RFC3339)),
 | 
			
		||||
				nil,
 | 
			
		||||
			))
 | 
			
		||||
		}
 | 
			
		||||
@ -150,7 +157,7 @@ func httpServer(c *console, app *appState) error {
 | 
			
		||||
		reg.WriteTo(w)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	http.HandleFunc("/queue", func(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
	http.HandleFunc("/api/plain/queue", func(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
		lis := slices.Collect(app.queue.Iter())
 | 
			
		||||
		sort.Slice(lis, func(i, j int) bool {
 | 
			
		||||
			return lis[i].NextScanOn.Time.Before(lis[j].LastScanOn.Time)
 | 
			
		||||
 | 
			
		||||
@ -23,7 +23,7 @@ const (
 | 
			
		||||
	OneDay     = 86400
 | 
			
		||||
	OneHour    = 3600
 | 
			
		||||
	TenMinutes = 600
 | 
			
		||||
	TwoMinutes = 60
 | 
			
		||||
	TwoMinutes = 120
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func feedRefreshProcessor(c *console, app *appState) error {
 | 
			
		||||
@ -66,9 +66,9 @@ func feedRefreshProcessor(c *console, app *appState) error {
 | 
			
		||||
		f := queue.ExtractMin()
 | 
			
		||||
		if f == nil {
 | 
			
		||||
			sleeping_time.Add(ctx, int64(TwoMinutes))
 | 
			
		||||
			span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TenMinutes))))
 | 
			
		||||
			span.AddEvent("sleeping for ", trace.WithAttributes(attribute.Int("seconds", int(TwoMinutes))))
 | 
			
		||||
			select {
 | 
			
		||||
			case <-time.After(TenMinutes * time.Second):
 | 
			
		||||
			case <-time.After(TwoMinutes * time.Second):
 | 
			
		||||
			case <-c.Done():
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
@ -93,7 +93,7 @@ func feedRefreshProcessor(c *console, app *appState) error {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		span.AddEvent(
 | 
			
		||||
			"till next", 
 | 
			
		||||
			"till next",
 | 
			
		||||
			trace.WithAttributes(attribute.String("time", until.String())))
 | 
			
		||||
		sleeping_time.Add(ctx, until.Milliseconds())
 | 
			
		||||
		select {
 | 
			
		||||
@ -140,13 +140,14 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
 | 
			
		||||
			err := res.err
 | 
			
		||||
			if res.err != nil {
 | 
			
		||||
				if errors.Is(err, ErrPermanentlyDead) {
 | 
			
		||||
                                        f.State = "permanantly-dead"
 | 
			
		||||
					f.RefreshRate = TenYear
 | 
			
		||||
				}
 | 
			
		||||
				if errors.Is(err, ErrTemporarilyDead) {
 | 
			
		||||
					f.RefreshRate = OneDay
 | 
			
		||||
					f.RefreshRate, f.State = tsTemp(f.LastTwtOn.Time)
 | 
			
		||||
				}
 | 
			
		||||
				if errors.Is(err, ErrUnmodified) {
 | 
			
		||||
					f.RefreshRate = OneDay
 | 
			
		||||
					f.RefreshRate, f.State = tsTemp(f.LastTwtOn.Time)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				span.RecordError(err)
 | 
			
		||||
@ -205,7 +206,7 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			f.RefreshRate = checkTemp(twtfile.Twts())
 | 
			
		||||
			f.RefreshRate, f.State = checkTemp(twtfile.Twts())
 | 
			
		||||
			f.LastError.String = ""
 | 
			
		||||
 | 
			
		||||
			err = f.Save(ctx, db)
 | 
			
		||||
@ -217,9 +218,9 @@ func processorLoop(ctx context.Context, db db, fetch *pool[*Feed, *Response]) {
 | 
			
		||||
	span.RecordError(ctx.Err())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func checkTemp(twts types.Twts) int {
 | 
			
		||||
func checkTemp(twts types.Twts) (int, State) {
 | 
			
		||||
	if len(twts) < 5 {
 | 
			
		||||
		return 7*OneDay
 | 
			
		||||
		return 7*OneDay, "cold"
 | 
			
		||||
	}
 | 
			
		||||
	sort.Sort(twts)
 | 
			
		||||
 | 
			
		||||
@ -227,32 +228,66 @@ func checkTemp(twts types.Twts) int {
 | 
			
		||||
	since_fifth := -time.Until(twts[4].Created())
 | 
			
		||||
 | 
			
		||||
	if since_first < 2 * time.Hour || since_fifth < 8 * time.Hour {
 | 
			
		||||
		return TwoMinutes
 | 
			
		||||
		return TwoMinutes, "hot"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 4 * time.Hour || since_fifth < 16 * time.Hour{
 | 
			
		||||
		return TenMinutes
 | 
			
		||||
		return TenMinutes, "hot"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 8 * time.Hour || since_fifth < 32 * time.Hour{
 | 
			
		||||
		return 2*TenMinutes
 | 
			
		||||
		return 2*TenMinutes, "warm"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 16 * time.Hour || since_fifth < 64 * time.Hour{
 | 
			
		||||
		return 4*TenMinutes
 | 
			
		||||
		return 4*TenMinutes, "warm"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 24 * time.Hour || since_fifth < 128 * time.Hour{
 | 
			
		||||
		return OneDay
 | 
			
		||||
		return OneDay, "cold"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 48 * time.Hour || since_fifth < 256 * time.Hour{
 | 
			
		||||
		return 2*OneDay
 | 
			
		||||
		return 2*OneDay, "cold"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 96 * time.Hour || since_fifth < 512 * time.Hour{
 | 
			
		||||
		return 7*OneDay
 | 
			
		||||
		return 7*OneDay, "frozen"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return OneMonth
 | 
			
		||||
	return OneMonth, "frozen"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func tsTemp(ts time.Time) (int, State) {
 | 
			
		||||
	since_first := -time.Until(ts)
 | 
			
		||||
 | 
			
		||||
	if since_first < 2 * time.Hour {
 | 
			
		||||
		return TwoMinutes, "hot"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 4 * time.Hour {
 | 
			
		||||
		return TenMinutes, "hot"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 8 * time.Hour {
 | 
			
		||||
		return 2*TenMinutes, "warm"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 16 * time.Hour {
 | 
			
		||||
		return 4*TenMinutes, "warm"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 24 * time.Hour {
 | 
			
		||||
		return OneDay, "cold"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 48 * time.Hour {
 | 
			
		||||
		return 2*OneDay, "cold"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since_first < 96 * time.Hour {
 | 
			
		||||
		return 7*OneDay, "frozen"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return OneMonth, "frozen"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user