Skip to content

Commit

Permalink
memtask
Browse files Browse the repository at this point in the history
  • Loading branch information
dahernan committed Jul 24, 2019
1 parent c77ec98 commit ab114ff
Show file tree
Hide file tree
Showing 3 changed files with 296 additions and 0 deletions.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Memtask

Simple async in memory task execution.

Ideal for small project or webapp.

Usage:

```
m := memtask.NewManager(5 * time.Minute)
id := m.Run(ctx, func(ctx context.Context, task Task) error {
// do some work
time.Sleep(1 * time.Second)
// store the results of the work
task.Data = "raw data"
task.Store()
return nil
})
// check the results
task, ok := m.Get(id)
if !ok {...}
if !task.IsFinished() {
// not finished
}
// results here
task.Data
```
144 changes: 144 additions & 0 deletions memtask.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Simple memory task manager, to run task async on memory and track the status of the task
// the tasks are deleted from memory when they finished after the expiration time
// so you can store the results of the task to be collected
package memtask

import (
"context"
"crypto/rand"
"fmt"
"sort"
"sync"
"time"
)

const (
TaskStatusProcessing = "processing"
TaskStatusComplete = "complete"
TaskStatusFailed = "failed"
)

type Manager struct {
tasks sync.Map
expireTime time.Duration
}

type Task struct {
ID string `json:"id,omitempty"`
Status string `json:"status,omitempty"`
Err error `json:"-"`
ErrorMessage string `json:"error"`
Started time.Time `json:"started,omitempty"`
Finished time.Time `json:"finished,omitempty"`
Data interface{} `json:"-"`

manager *Manager
}

func NewManager(expireTime time.Duration) *Manager {
return &Manager{
expireTime: expireTime,
}
}

func (m *Manager) Run(ctx context.Context, fn func(ctx context.Context, task Task) error) string {
id := timestampShortUUID()
task := Task{
ID: id,
Status: TaskStatusProcessing,
Started: time.Now(),
manager: m,
}
m.Store(task)
go func() {
err := fn(ctx, task)
// refresh the task (it may be changes)
task, ok := m.Get(task.ID)
if !ok {
return
}
task.Status = TaskStatusComplete
task.Finished = time.Now()
if err != nil {
task.Err = err
task.ErrorMessage = err.Error()
task.Status = TaskStatusFailed
}
m.Store(task)
}()
return id
}

func (m *Manager) Get(ID string) (Task, bool) {
v, ok := m.tasks.Load(ID)
if !ok {
return Task{}, ok
}
return v.(Task), ok
}

func (m *Manager) Delete(ID string) {
_, ok := m.tasks.Load(ID)
if !ok {
return
}
m.tasks.Delete(ID)
}

func (m *Manager) GetAll() []Task {
var taskKeys []string
m.tasks.Range(func(key, value interface{}) bool {
taskKeys = append(taskKeys, key.(string))
return true
})
tasks := []Task{}
sort.Slice(taskKeys, func(i, j int) bool {
return taskKeys[i] > taskKeys[j]
})
for i := range taskKeys {
taskObj, ok := m.tasks.Load(taskKeys[i])
if !ok {
continue
}
task := taskObj.(Task)
// if expired -> delete
if task.Status != TaskStatusProcessing && task.Finished.Add(m.expireTime).Before(time.Now()) {
m.tasks.Delete(task.ID)
} else {
tasks = append(tasks, task)
}
}
return tasks
}

func (m *Manager) Store(task Task) {
m.tasks.Store(task.ID, task)
}

func (tk Task) Store() {
tk.manager.Store(tk)
}

func (tk Task) IsFinished() bool {
if tk.Status == TaskStatusComplete {
return true
}
if tk.Status == TaskStatusFailed {
return true
}
return false
}

func timestampShortUUID() string {
now := uint32(time.Now().UTC().Unix())

b := make([]byte, 4)
count, err := rand.Read(b)
if err != nil {
panic(err)
}
if count != len(b) {
panic("not enough random bytes")
}
return fmt.Sprintf("%08x%x", now, b)
}
123 changes: 123 additions & 0 deletions memtask_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package memtask

import (
"errors"
"testing"
"time"

"github.com/matryer/is"
"golang.org/x/net/context"
)

func TestRun(t *testing.T) {
is := is.New(t)

ctx := context.Background()

now := time.Now()
m := NewManager(4 * time.Second)

id := m.Run(ctx, func(ctx context.Context, task Task) error {
time.Sleep(1 * time.Second)
task.Data = "raw data"
task.Store()
return nil
})

task, ok := m.Get(id)
is.True(ok)
is.True(task.Started.After(now))

// wait to finish
wait(m, id)

task, ok = m.Get(id)
is.True(ok)
is.Equal(task.Status, TaskStatusComplete)
is.Equal(task.Err, nil)
is.Equal(task.Data, "raw data")
is.True(task.Finished.After(task.Started))

}

func TestError(t *testing.T) {
is := is.New(t)

ctx := context.Background()

now := time.Now()
m := NewManager(4 * time.Second)

id := m.Run(ctx, func(ctx context.Context, task Task) error {
task.Data = "raw data"
task.Store()
return errors.New("some error here")
})

task, ok := m.Get(id)
is.True(ok)
is.True(task.Started.After(now))

// wait to finish
wait(m, id)

task, ok = m.Get(id)
is.True(ok)
is.Equal(task.Status, TaskStatusFailed)
is.Equal(task.Err.Error(), "some error here")
is.Equal(task.Data, "raw data")
is.True(task.Finished.After(task.Started))
}

func TestMultiRun(t *testing.T) {
is := is.New(t)

ctx := context.Background()

m := NewManager(1 * time.Second)

fn := func(ctx context.Context, task Task) error {
task.Data = task.ID
task.Store()
return nil
}

id1 := m.Run(ctx, fn)
id2 := m.Run(ctx, fn)
id3 := m.Run(ctx, fn)

tasks := m.GetAll()
is.Equal(len(tasks), 3)
is.Equal(tasks[0].Status, TaskStatusProcessing)

// wait to finish
wait(m, id1)
wait(m, id2)
wait(m, id3)

tasks = m.GetAll()
is.Equal(len(tasks), 3)
is.Equal(tasks[0].Status, TaskStatusComplete)
is.Equal(tasks[0].Err, nil)
is.Equal(tasks[0].Data, tasks[0].ID)

// wait expiration
time.Sleep(2 * time.Second)
tasks = m.GetAll()
is.Equal(len(tasks), 0)

}

// poll waiting for testing
func wait(m *Manager, taskID string) {
for {
task, ok := m.Get(taskID)
if !ok {
return
}
if task.IsFinished() {
return
}
time.Sleep(500 * time.Millisecond)
}
}

0 comments on commit ab114ff

Please sign in to comment.