diff --git a/pkg/cron/cron.go b/pkg/cron/cron.go new file mode 100644 index 0000000..b7e58ec --- /dev/null +++ b/pkg/cron/cron.go @@ -0,0 +1,160 @@ +package cron + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "strings" + "time" + + "github.com/sour-is/ev/internal/lg" + "github.com/sour-is/ev/pkg/locker" + "github.com/sour-is/ev/pkg/set" + "go.opentelemetry.io/otel/attribute" + "golang.org/x/sync/errgroup" +) + +type task func(context.Context, time.Time) error +type job struct { + Month, Weekday, Day, + Hour, Minute, Second *set.BoundSet[int8] + Task task +} + +var DefaultGranularity = time.Minute + +type state struct { + queue []task +} +type cron struct { + jobs []job + state *locker.Locked[state] + granularity time.Duration +} + +func New(granularity time.Duration) *cron { + return &cron{granularity: granularity, state: locker.New(&state{})} +} + +func parseInto(c string, s *set.BoundSet[int8]) *set.BoundSet[int8] { + if c == "*" || c == "" { + s.AddRange(0, 100) + } + for _, split := range strings.Split(c, ",") { + minmax := strings.SplitN(split, "-", 2) + switch len(minmax) { + case 2: + min, _ := strconv.ParseInt(minmax[0], 10, 8) + max, _ := strconv.ParseInt(minmax[1], 10, 8) + s.AddRange(int8(min), int8(max)) + default: + min, _ := strconv.ParseInt(minmax[0], 10, 8) + s.Add(int8(min)) + } + } + return s +} + +// This function creates a new job that occurs at the given day and the given +// 24hour time. Any of the values may be -1 as an "any" match, so passing in +// a day of -1, the event occurs every day; passing in a second value of -1, the +// event will fire every second that the other parameters match. +func (c *cron) NewJob(expr string, task task) { + sp := append(strings.Fields(expr), make([]string, 5)...)[:5] + + job := job{ + Month: parseInto(sp[4], set.NewBoundSet[int8](1, 12)), + Weekday: parseInto(sp[3], set.NewBoundSet[int8](0, 6)), + Day: parseInto(sp[2], set.NewBoundSet[int8](1, 31)), + Hour: parseInto(sp[1], set.NewBoundSet[int8](0, 23)), + Minute: parseInto(sp[0], set.NewBoundSet[int8](0, 59)), + Task: task, + } + c.jobs = append(c.jobs, job) +} +func (c *cron) Once(ctx context.Context, once task) { + c.state.Modify(ctx, func(ctx context.Context, state *state) error { + state.queue = append(state.queue, once) + return nil + }) +} + +func (cj job) Matches(t time.Time) (ok bool) { + return cj.Month.Has(int8(t.Month())) && + cj.Day.Has(int8(t.Day())) && + cj.Weekday.Has(int8(t.Weekday()%7)) && + cj.Hour.Has(int8(t.Hour())) && + cj.Minute.Has(int8(t.Minute())) +} + +func (cj job) String() string { + return fmt.Sprintf("job[\n m:%s\n h:%s\n d:%s\n w:%s\n M:%s\n]", cj.Minute, cj.Hour, cj.Day, cj.Weekday, cj.Month) +} + +func (c *cron) Run(ctx context.Context) error { + tick := time.NewTicker(c.granularity) + defer tick.Stop() + + go c.run(ctx, time.Now()) + + for { + select { + case <-ctx.Done(): + return nil + case now := <-tick.C: + // fmt.Println(now.Second(), now.Hour(), now.Day(), int8(now.Weekday()), uint8(now.Month())) + go c.run(ctx, now) + } + } +} + +func (c *cron) run(ctx context.Context, now time.Time) { + var run []task + ctx, span := lg.Span(ctx) + defer span.End() + + // Add Jitter + timer := time.NewTimer(time.Duration(rand.Intn(300)) * time.Millisecond) + select { + case <-ctx.Done(): + timer.Stop() + return + case <-timer.C: + } + + span.AddEvent("Cron Run: " + now.Format(time.RFC822)) + // fmt.Println("Cron Run: ", now.Format(time.RFC822)) + + c.state.Modify(ctx, func(ctx context.Context, state *state) error { + run = append(run, state.queue...) + state.queue = state.queue[:0] + + return nil + }) + + for _, j := range c.jobs { + if j.Matches(now) { + span.AddEvent(j.String()) + run = append(run, j.Task) + } + } + + if len(run) == 0 { + return + } + + wg, _ := errgroup.WithContext(ctx) + + for i := range run { + fn := run[i] + wg.Go(func() error { return fn(ctx, now) }) + } + span.SetAttributes( + attribute.String("tick", now.String()), + attribute.Int("count", len(run)), + ) + + err := wg.Wait() + span.RecordError(err) +}