go-paste/src/pkg/promise/promise.go
2020-10-22 21:17:16 -06:00

224 lines
4.1 KiB
Go

// Package promise runs keyed tasks in background goroutines and lets each
// task report its outcome promise-style through Resolve or Reject.
package promise
import (
"context"
"fmt"
"sync"
"time"
"go.uber.org/ratelimit"
"sour.is/x/paste/src/pkg/cache"
"sour.is/x/toolbox/log"
)
// Q is the handle a task's Fn receives. Through it the function can read the
// task's key and context, settle the task with Resolve or Reject, and (via
// the embedded Tasker) start further tasks.
type Q interface {
// Key returns the key object the task was started with.
Key() interface{}
// Context returns the context the task runs under.
Context() context.Context
// Resolve settles the task successfully with the given result.
Resolve(interface{})
// Reject settles the task with the given error.
Reject(error)
Tasker
}
// Fn is the body of a task. It receives the task's Q handle and reports its
// outcome by calling Resolve or Reject on it.
type Fn func(Q)
// Key identifies a task. Runner.Run indexes its task map by the value that
// Key() returns, so that value must be usable as a Go map key.
type Key interface {
Key() interface{}
}
// qTask is the Runner's record of one task. It implements Q, so the same
// value is handed to the task's Fn as its completion handle.
//
// A task settles exactly once: the first call to Resolve, Reject or Cancel
// records the outcome, releases the task's context and closes the channel
// returned by Await. Later settlement attempts are ignored, and settling is
// safe to attempt from several goroutines at once.
type qTask struct {
	key Key
	fn  Fn
	ctx context.Context
	// cancel releases ctx; it is invoked once, when the task settles.
	cancel func()
	// done is closed when the task settles. It is never reassigned, so a
	// channel obtained from Await before or after settlement behaves the same.
	done chan struct{}
	// once guards the single settlement of the task.
	once   sync.Once
	result interface{}
	err    error

	Tasker
}

// Key returns the key object the task was started with (not key.Key()).
func (t *qTask) Key() interface{} { return t.key }

// Context returns the context the task runs under.
func (t *qTask) Context() context.Context { return t.ctx }

// Resolve settles the task with result r. It is a no-op if the task has
// already settled.
func (t *qTask) Resolve(r interface{}) { t.settle(r, nil) }

// Reject settles the task with err. It is a no-op if the task has already
// settled.
func (t *qTask) Reject(err error) { t.settle(nil, err) }

// Await returns a channel that is closed once the task has settled. The
// channel stays valid (and closed) after settlement, so late callers do not
// block.
func (t *qTask) Await() <-chan struct{} { return t.done }

// Cancel settles the task with a "task cancelled" error. It is a no-op if
// the task has already settled.
func (t *qTask) Cancel() { t.settle(nil, fmt.Errorf("task cancelled")) }

// Result returns the resolved value. Read it only after Await's channel has
// been closed.
func (t *qTask) Result() interface{} { return t.result }

// Err returns the rejection error, or nil. Read it only after Await's
// channel has been closed.
func (t *qTask) Err() error { return t.err }

// settle records the outcome and releases waiters; only the first call has
// any effect. Writing result/err before close(done) means receivers that
// wake on Await observe the final values.
func (t *qTask) settle(result interface{}, err error) {
	t.once.Do(func() {
		t.result, t.err = result, err
		t.release()
	})
}

// finish marks the task as settled without changing result or err. It is
// idempotent.
func (t *qTask) finish() { t.once.Do(t.release) }

// release frees the task's context and closes done. It must only be called
// from inside t.once.
func (t *qTask) release() {
	if t.cancel != nil {
		t.cancel()
	}
	if t.done != nil {
		close(t.done)
	}
}
// Option customises a task before it starts. Runner.Run calls Apply on the
// freshly built task for each default option and then each per-call option.
type Option interface {
Apply(*qTask)
}
// OptionFn lets a plain function be used wherever an Option is expected.
type OptionFn func(*qTask)

// Apply satisfies Option by invoking the function itself on the task.
func (apply OptionFn) Apply(task *qTask) {
	apply(task)
}
// Tasker is anything that can start a task for a key. Runner implements it,
// and every task embeds one so a running Fn can start further tasks.
type Tasker interface {
// Run starts fn as the task for the given key, configured by the options,
// and returns the task handle.
Run(Key, Fn, ...Option) *qTask
}
// Runner starts tasks in goroutines and tracks them by key. When Run is
// asked for a key that is already in the queue it returns the registered
// task instead of starting another.
type Runner struct {
// defaultOpts are applied to every task before the options passed to Run.
defaultOpts []Option
// queue holds the registered tasks, indexed by the value of Key.Key().
queue map[interface{}]*qTask
// mu guards queue.
mu sync.RWMutex
// ctx is the base context every task starts from; cancel cancels it.
ctx context.Context
cancel func()
// pause is allocated by NewRunner; no other use is visible in this file.
pause chan struct{}
// limiter is consulted (Take) by Run before each task's goroutine starts.
limiter ratelimit.Limiter
}
// Timeout is an Option that bounds how long a task's context stays alive.
type Timeout time.Duration

// Apply replaces the task's context with one derived from it that expires
// after d, and installs a cancel func that releases it.
//
// Any cancel func already installed on the task (for example by an earlier
// Timeout among the default options) is chained rather than discarded, so
// settling the task releases every context layered onto it.
func (d Timeout) Apply(task *qTask) {
	prev := task.cancel
	ctx, cancel := context.WithTimeout(task.ctx, time.Duration(d))
	task.ctx = ctx
	task.cancel = func() {
		// Innermost context first, then whatever was installed before it.
		cancel()
		if prev != nil {
			prev()
		}
	}
}
// Run starts fn as the task identified by key and returns its handle. If a
// task is already registered under key.Key(), that task is returned instead
// and fn is not started.
//
// The default options are applied first, then opts. Before the goroutine is
// launched Run calls the limiter's Take, so the caller may block here.
//
// The task is removed from the queue when fn returns or panics. A panic in
// fn is recovered and turned into a rejection so that goroutines waiting on
// Await are released.
func (tr *Runner) Run(key Key, fn Fn, opts ...Option) *qTask {
	id := key.Key()
	keyType := fmt.Sprintf("%T", key)
	log.Infos("task to run", keyType, id)

	// Fast path: an already-registered task needs no allocation at all.
	tr.mu.RLock()
	running, ok := tr.queue[id]
	tr.mu.RUnlock()
	if ok {
		log.Infos("task found running", keyType, id)
		return running
	}

	// Build the task outside the write lock: options are caller-supplied code
	// and must not run while tr.mu is held.
	task := &qTask{
		key:    key,
		fn:     fn,
		cancel: func() {},
		ctx:    tr.ctx,
		done:   make(chan struct{}),
		Tasker: tr,
	}
	for _, opt := range tr.defaultOpts {
		opt.Apply(task)
	}
	for _, opt := range opts {
		opt.Apply(task)
	}

	// Check again under the write lock so that check-and-insert is atomic;
	// otherwise two concurrent callers could both register the same key.
	tr.mu.Lock()
	if running, ok := tr.queue[id]; ok {
		tr.mu.Unlock()
		task.cancel() // release any context the options attached to the loser
		log.Infos("task found running", keyType, id)
		return running
	}
	tr.queue[id] = task
	tr.mu.Unlock()

	tr.limiter.Take()
	go tr.runTask(id, task)
	return task
}

// runTask executes task.fn and guarantees cleanup: the queue entry stored
// under id is removed and the outcome is logged whether fn returns or panics.
func (tr *Runner) runTask(id interface{}, task *qTask) {
	defer func() {
		var panicErr error
		if r := recover(); r != nil {
			panicErr = fmt.Errorf("PANIC: %v", r)
			// Settle the task so nobody stays blocked on Await.
			task.Reject(panicErr)
		}

		// Remove the entry under the same value it was inserted with.
		tr.mu.Lock()
		if tr.queue[id] == task {
			delete(tr.queue, id)
		}
		tr.mu.Unlock()

		err := task.Err()
		if panicErr != nil {
			err = panicErr
		}
		if err != nil {
			log.Errors("task Failed", fmt.Sprintf("%T", task.Key()), task.Key(), "err", err)
			return
		}
		log.Infos("task complete", fmt.Sprintf("%T", task.Key()), task.Key())
	}()

	log.Infos("task Running", fmt.Sprintf("%T", task.Key()), task.Key())
	task.fn(task)
}
// NewRunner builds a Runner whose tasks derive their contexts from a
// cancellable child of ctx. defaultOpts are applied to every task started
// through the Runner. The rate limiter is created with ratelimit.New(1).
func NewRunner(ctx context.Context, defaultOpts ...Option) *Runner {
	runCtx, stop := context.WithCancel(ctx)

	return &Runner{
		defaultOpts: defaultOpts,
		queue:       map[interface{}]*qTask{},
		ctx:         runCtx,
		cancel:      stop,
		pause:       make(chan struct{}),
		limiter:     ratelimit.New(1),
	}
}
// List returns a snapshot of the tasks currently in the queue, taken under
// the read lock. The order follows map iteration and is therefore
// unspecified; the returned slice is never nil.
func (tr *Runner) List() []*qTask {
	tr.mu.RLock()
	defer tr.mu.RUnlock()

	snapshot := make([]*qTask, len(tr.queue))
	i := 0
	for _, t := range tr.queue {
		snapshot[i] = t
		i++
	}
	return snapshot
}
// Stop cancels the Runner's base context. It neither waits for tasks nor
// removes them from the queue.
func (tr *Runner) Stop() {
	stop := tr.cancel
	stop()
}
// Len reports how many tasks are currently in the queue, read under the
// read lock.
func (tr *Runner) Len() int {
	tr.mu.RLock()
	n := len(tr.queue)
	tr.mu.RUnlock()
	return n
}
// WithCache returns an option that wraps the task's function with a cache
// lookup in c.
//
// When the task's key is not a cache.Key the wrapped function runs
// unchanged. On a cache hit the task is resolved with the cached value and
// the wrapped function is skipped. On a miss the wrapped function runs;
// if the task then carries no error, its result is stored in c with the
// given expireAfter.
//
// The option wraps whatever task.fn is at the time it is applied.
func WithCache(c cache.Cacher, expireAfter time.Duration) OptionFn {
	return func(task *qTask) {
		next := task.fn

		task.fn = func(q Q) {
			ck, cacheable := q.Key().(cache.Key)
			if !cacheable {
				log.Infos("not a cache key", fmt.Sprintf("%T", q.Key()), q.Key())
				next(q)
				return
			}

			keyType := fmt.Sprintf("%T", ck)

			hit, found := c.Get(ck)
			if found {
				log.Infos("value in cache", keyType, ck.Key())
				q.Resolve(hit.Value())
				return
			}

			log.Infos("not in cache", keyType, ck.Key())
			next(q)

			// A failed task is reported and never cached.
			err := task.Err()
			if err != nil {
				log.Error(err)
				return
			}

			item := cache.NewItem(ck, task.Result(), expireAfter)
			log.Infos("result to cache", keyType, ck.Key())
			c.Add(ck, item)
		}
	}
}