Skip to content

Commit

Permalink
refactor using futures
Browse files Browse the repository at this point in the history
  • Loading branch information
juliaqiuxy committed Jun 5, 2021
1 parent 87dc58e commit c158328
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 43 deletions.
76 changes: 33 additions & 43 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"sync"
"fmt"
"time"

"github.com/thoas/go-funk"
Expand All @@ -31,11 +31,7 @@ type StorageMaker func() (Storage, error)
type StartStorageOp func(ctx context.Context, opName string) interface{}
type FinishStorageOp func(interface{})
type Cache struct {
storages []Storage

makers []StorageMaker
initOnce sync.Once
initErr error
storages Future

startOperation StartStorageOp
finishOperation FinishStorageOp
Expand Down Expand Up @@ -66,61 +62,47 @@ func CreateWithHooks(sop StartStorageOp, fop FinishStorageOp, maker StorageMaker
c = &Cache{
startOperation: sop,
finishOperation: fop,
makers: makers,

storages: Promise(func() (interface{}, error) {
return initializeStorages(c, makers)
}),
}

return c, nil
}

func (c *Cache) ensureStorages() error {
c.initOnce.Do(func() {
c.storages = make([]Storage, 0, len(c.makers))
for _, makeStorage := range c.makers {
storage, serr := makeStorage()
if serr != nil {
c.initErr = serr
// TODO(juliaqiuxy) log error
break
}

c.storages = append(c.storages, storage)
func initializeStorages(c *Cache, makers []StorageMaker) ([]Storage, error) {
storages := make([]Storage, 0, len(makers))
for _, makeStorage := range makers {
storage, err := makeStorage()
if err != nil {
return nil, fmt.Errorf(errWFCacheInitialize, err)
}
})

if c.initErr != nil {
return c.initErr
storages = append(storages, storage)
}

// Due to how once.Do works, if the first go routine to acquire
// the lock takes a while to execute, other go routines trying to
// ensure may temporarily get an empty storages slice to work with.
// Since this isn't ideal, we ask to retry
// TODO(juliaqiuxy) instead of an error, we could wait on once to
// finish
if len(c.storages) != len(c.makers) {
return errors.New("cache not ready, try again soon")
}

return nil
return storages, nil
}

func (c *Cache) Get(key string) (*CacheItem, error) {
return c.GetWithContext(context.Background(), key)
}

func (c *Cache) GetWithContext(ctx context.Context, key string) (*CacheItem, error) {
err := c.ensureStorages()
ss, err := c.storages.Await()
if err != nil {
return nil, err
}
storages := ss.([]Storage)

so := c.startOperation(ctx, "Get")
defer c.finishOperation(so)

missingKeyByStorage := map[Storage]string{}

// start waterfall
for _, storage := range c.storages {
for _, storage := range storages {
cacheItem := storage.Get(ctx, key)

if cacheItem == nil {
Expand Down Expand Up @@ -153,10 +135,11 @@ func (c *Cache) BatchGet(keys []string) ([]*CacheItem, error) {
func (c *Cache) BatchGetWithContext(ctx context.Context, keys []string) ([]*CacheItem, error) {
// TODO(juliaqiuxy) Detect dupes, empty keys, then bail

err := c.ensureStorages()
ss, err := c.storages.Await()
if err != nil {
return nil, err
}
storages := ss.([]Storage)

so := c.startOperation(ctx, "BatchGet")
defer c.finishOperation(so)
Expand All @@ -172,7 +155,7 @@ func (c *Cache) BatchGetWithContext(ctx context.Context, keys []string) ([]*Cach
missingKeysByStorage := map[Storage][]string{}

// start waterfall
for _, storage := range c.storages {
for _, storage := range storages {
mds := storage.BatchGet(ctx, missingKeys)

if len(mds) != 0 {
Expand Down Expand Up @@ -233,10 +216,11 @@ func (c *Cache) Set(key string, value interface{}) error {
}

func (c *Cache) SetWithContext(ctx context.Context, key string, value interface{}) error {
err := c.ensureStorages()
ss, err := c.storages.Await()
if err != nil {
return err
}
storages := ss.([]Storage)

so := c.startOperation(ctx, "Set")
defer c.finishOperation(so)
Expand All @@ -246,7 +230,7 @@ func (c *Cache) SetWithContext(ctx context.Context, key string, value interface{
return err
}

for _, storage := range c.storages {
for _, storage := range storages {
err := storage.Set(ctx, key, v)
if err != nil {
return err
Expand All @@ -261,10 +245,11 @@ func (c *Cache) BatchSet(pairs map[string]interface{}) error {
}

func (c *Cache) BatchSetWithContext(ctx context.Context, pairs map[string]interface{}) error {
err := c.ensureStorages()
ss, err := c.storages.Await()
if err != nil {
return err
}
storages := ss.([]Storage)

so := c.startOperation(ctx, "BatchSet")
defer c.finishOperation(so)
Expand All @@ -279,7 +264,7 @@ func (c *Cache) BatchSetWithContext(ctx context.Context, pairs map[string]interf
vPairs[key] = v
}

for _, storage := range c.storages {
for _, storage := range storages {
err := storage.BatchSet(ctx, vPairs)
if err != nil {
return err
Expand All @@ -294,15 +279,16 @@ func (c *Cache) Del(key string) error {
}

func (c *Cache) DelWithContext(ctx context.Context, key string) error {
err := c.ensureStorages()
ss, err := c.storages.Await()
if err != nil {
return err
}
storages := ss.([]Storage)

so := c.startOperation(ctx, "Del")
defer c.finishOperation(so)

for _, storage := range c.storages {
for _, storage := range storages {
err := storage.Del(ctx, key)
if err != nil {
return err
Expand All @@ -311,3 +297,7 @@ func (c *Cache) DelWithContext(ctx context.Context, key string) error {

return nil
}

const errWFCacheInitialize = `error: %s
wfcache failed to initialize`
47 changes: 47 additions & 0 deletions promise.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package wfcache

import (
"context"
)

type Future interface {
Await() (interface{}, error)
AwaitWithContext(context.Context) (interface{}, error)
}

type promise struct {
await func(ctx context.Context) (interface{}, error)
}

func (f promise) Await() (interface{}, error) {
return f.await(context.Background())
}

func (f promise) AwaitWithContext(ctx context.Context) (interface{}, error) {
return f.await(ctx)
}

func Promise(f func() (interface{}, error)) Future {
var val interface{}
var err error

c := make(chan struct{})
go func() {
defer close(c)
val, err = f()
}()

return promise{
await: func(ctx context.Context) (interface{}, error) {
select {
case <-c:
if err != nil {
return nil, err
}
return val, nil
case <-ctx.Done():
return nil, ctx.Err()
}
},
}
}

0 comments on commit c158328

Please sign in to comment.