initial commit
This commit is contained in:
219
pkg/promise/promise.go
Normal file
219
pkg/promise/promise.go
Normal file
@@ -0,0 +1,219 @@
|
||||
package promise
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"go.uber.org/ratelimit"
|
||||
)
|
||||
|
||||
type Q interface {
|
||||
Key() interface{}
|
||||
Context() context.Context
|
||||
Resolve(interface{})
|
||||
Reject(error)
|
||||
|
||||
Tasker
|
||||
}
|
||||
type ResultQ interface {
|
||||
Key() interface{}
|
||||
Context() context.Context
|
||||
Result() interface{}
|
||||
|
||||
Tasker
|
||||
}
|
||||
type Fn func(Q)
|
||||
type AfterFn func(ResultQ)
|
||||
type Key interface {
|
||||
Key() interface{}
|
||||
}
|
||||
|
||||
func typ(v interface{}) string {
|
||||
return fmt.Sprintf("%T", v)
|
||||
}
|
||||
|
||||
type qTask struct {
|
||||
key Key
|
||||
|
||||
fn Fn
|
||||
ctx context.Context
|
||||
|
||||
cancel func()
|
||||
done chan struct{}
|
||||
|
||||
result interface{}
|
||||
err error
|
||||
|
||||
Tasker
|
||||
}
|
||||
|
||||
func (t *qTask) Key() interface{} { return t.key }
|
||||
func (t *qTask) Context() context.Context { return t.ctx }
|
||||
func (t *qTask) Resolve(r interface{}) { t.result = r; t.finish() }
|
||||
func (t *qTask) Reject(err error) { t.err = err; t.finish() }
|
||||
|
||||
// After runs on successful completion of the task.
|
||||
func (t *qTask) After(fn AfterFn) {
|
||||
log := log.Ctx(t.Context())
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Panic().Msgf("%v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
<-t.Await()
|
||||
if err := t.Err(); err != nil {
|
||||
return
|
||||
}
|
||||
fn(t)
|
||||
}()
|
||||
}
|
||||
func (t *qTask) Await() <-chan struct{} { return t.done }
|
||||
func (t *qTask) Cancel() { t.err = fmt.Errorf("task cancelled"); t.finish() }
|
||||
|
||||
func (t *qTask) Result() interface{} { return t.result }
|
||||
func (t *qTask) Err() error { return t.err }
|
||||
|
||||
func (t *qTask) finish() {
|
||||
if t.done == nil {
|
||||
return
|
||||
}
|
||||
|
||||
t.cancel()
|
||||
close(t.done)
|
||||
t.done = nil
|
||||
}
|
||||
|
||||
type Option interface {
|
||||
Apply(*qTask)
|
||||
}
|
||||
type OptionFn func(*qTask)
|
||||
|
||||
func (fn OptionFn) Apply(t *qTask) { fn(t) }
|
||||
|
||||
type Tasker interface {
|
||||
Run(Key, Fn, ...Option) *qTask
|
||||
}
|
||||
|
||||
type Runner struct {
|
||||
defaultOpts []Option
|
||||
queue map[interface{}]*qTask
|
||||
mu sync.RWMutex
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
pause chan struct{}
|
||||
limiter ratelimit.Limiter
|
||||
}
|
||||
|
||||
type Timeout time.Duration
|
||||
|
||||
func (d Timeout) Apply(task *qTask) {
|
||||
task.ctx, task.cancel = context.WithTimeout(task.ctx, time.Duration(d))
|
||||
}
|
||||
|
||||
func (tr *Runner) Run(key Key, fn Fn, opts ...Option) *qTask {
|
||||
log := log.Ctx(tr.ctx)
|
||||
|
||||
tr.mu.RLock()
|
||||
log.Trace().Interface(typ(key), key.Key()).Msg("task to run")
|
||||
|
||||
if task, ok := tr.queue[key.Key()]; ok {
|
||||
tr.mu.RUnlock()
|
||||
log.Trace().Interface(typ(key), key.Key()).Msg("task found running")
|
||||
|
||||
return task
|
||||
}
|
||||
tr.mu.RUnlock()
|
||||
|
||||
task := &qTask{
|
||||
key: key,
|
||||
fn: fn,
|
||||
cancel: func() {},
|
||||
ctx: tr.ctx,
|
||||
done: make(chan struct{}),
|
||||
Tasker: tr,
|
||||
}
|
||||
|
||||
for _, opt := range tr.defaultOpts {
|
||||
opt.Apply(task)
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt.Apply(task)
|
||||
}
|
||||
|
||||
tr.mu.Lock()
|
||||
tr.queue[key.Key()] = task
|
||||
tr.mu.Unlock()
|
||||
|
||||
tr.limiter.Take()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Panic().Msgf("%v", r)
|
||||
}
|
||||
|
||||
if err := task.Err(); err == nil {
|
||||
log.Trace().Interface(typ(key), key.Key()).Msg("task complete")
|
||||
} else {
|
||||
log.Debug().Interface(typ(key), key.Key()).Err(err).Msg("task Failed")
|
||||
}
|
||||
|
||||
tr.mu.Lock()
|
||||
delete(tr.queue, task.Key())
|
||||
tr.mu.Unlock()
|
||||
}()
|
||||
|
||||
log.Trace().Interface(typ(key), key.Key()).Msg("task Running")
|
||||
|
||||
task.fn(task)
|
||||
}()
|
||||
|
||||
return task
|
||||
}
|
||||
|
||||
func NewRunner(ctx context.Context, defaultOpts ...Option) *Runner {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
tr := &Runner{
|
||||
defaultOpts: defaultOpts,
|
||||
queue: make(map[interface{}]*qTask),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
pause: make(chan struct{}),
|
||||
limiter: ratelimit.New(10),
|
||||
}
|
||||
|
||||
return tr
|
||||
}
|
||||
|
||||
func (tr *Runner) List() []*qTask {
|
||||
tr.mu.RLock()
|
||||
defer tr.mu.RUnlock()
|
||||
|
||||
lis := make([]*qTask, 0, len(tr.queue))
|
||||
|
||||
for _, task := range tr.queue {
|
||||
lis = append(lis, task)
|
||||
}
|
||||
|
||||
return lis
|
||||
}
|
||||
|
||||
func (tr *Runner) Stop() {
|
||||
tr.cancel()
|
||||
}
|
||||
|
||||
func (tr *Runner) Len() int {
|
||||
tr.mu.RLock()
|
||||
defer tr.mu.RUnlock()
|
||||
|
||||
return len(tr.queue)
|
||||
}
|
||||
47
pkg/promise/with-cache.go
Normal file
47
pkg/promise/with-cache.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package promise
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/sour-is/keyproofs/pkg/cache"
|
||||
)
|
||||
|
||||
func WithCache(c cache.Cacher, expireAfter time.Duration) OptionFn {
|
||||
return func(task *qTask) {
|
||||
innerFn := task.fn
|
||||
task.fn = func(q Q) {
|
||||
log := log.Ctx(q.Context())
|
||||
|
||||
cacheKey, ok := q.Key().(cache.Key)
|
||||
if !ok {
|
||||
log.Trace().Interface(typ(q), q.Key()).Msg("not a cache key")
|
||||
innerFn(q)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if v, ok := c.Get(cacheKey); ok {
|
||||
log.Trace().Interface(typ(cacheKey), cacheKey.Key()).Msg("task result in cache")
|
||||
q.Resolve(v.Value())
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
log.Trace().Interface(typ(cacheKey), cacheKey.Key()).Msg("task not in cache")
|
||||
innerFn(q)
|
||||
|
||||
if err := task.Err(); err != nil {
|
||||
log.Err(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// expireAfter = time.Duration(rand.Int63() % int64(5*time.Second))
|
||||
result := cache.NewItem(cacheKey, task.Result(), expireAfter)
|
||||
|
||||
log.Trace().Interface(typ(cacheKey), cacheKey.Key()).Msgf("task result to cache")
|
||||
c.Add(cacheKey, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user