diff --git a/LICENSE.md b/LICENSE.md
new file mode 100644
index 0000000..04d201a
--- /dev/null
+++ b/LICENSE.md
@@ -0,0 +1,9 @@
+The MIT License (MIT)
+
+Copyright (c) 2021 Julia Qiu
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..53fcf46
--- /dev/null
+++ b/README.md
@@ -0,0 +1,84 @@
+# Waterfall Cache
+
+[![GoDoc](https://godoc.org/github.com/juliaqiuxy/wfcache?status.svg)](https://godoc.org/github.com/juliaqiuxy/wfcache) [![License](https://img.shields.io/github/license/juliaqiuxy/wfcache.svg?style=flat-square)](https://github.com/juliaqiuxy/wfcache/blob/master/LICENSE.md)
+
+wfcache is a multi-layered cache with waterfall hit propagation and built-in storage adapters for DynamoDB, Redis, and BigCache (in-memory).
+
+> This project is under active development. Use at your own risk.
+
+wfcache is effective for read-heavy workloads, and it can be used either as a side-cache or as a read-through/write-through cache.
+
+## Built-in Storage Adapters
+
+| Package | Description | Eviction strategy |
+| --- | --- | --- |
+| [Basic](basic/basic.go) | Basic in-memory storage | TTL (enforced on get) |
+| [BigCache](https://github.com/allegro/bigcache) | BigCache | TTL/LRU |
+| [Redis](https://github.com/go-redis/redis) | Redis | TTL/LRU |
+| [DynamoDB](https://docs.aws.amazon.com/sdk-for-go/api/service/dynamodb) | DynamoDB | [TTL](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html) |
+
+## Installation
+
+To install wfcache, run:
+
+```sh
+$ go get github.com/juliaqiuxy/wfcache
+```
+
+To update to the latest version, run:
+
+```sh
+$ go get -u github.com/juliaqiuxy/wfcache
+```
+
+### Usage
+
+```go
+import (
+  "time"
+
+  "github.com/juliaqiuxy/wfcache"
+  basicAdapter "github.com/juliaqiuxy/wfcache/basic"
+  bigCacheAdapter "github.com/juliaqiuxy/wfcache/bigcache"
+  dynamodbAdapter "github.com/juliaqiuxy/wfcache/dynamodb"
+)
+
+cache, err := wfcache.Create(
+  onStartOperation,
+  onFinishOperation,
+  basicAdapter.Create(time.Minute),
+  bigCacheAdapter.Create(time.Hour),
+  dynamodbAdapter.Create(dynamodbClient, "my-cache-table", 3, 3, 24 * time.Hour),
+)
+```
+
+## How it works
+
+The following steps outline how reads from wfcache work:
+
+- When getting a value, wfcache tries to read it from the first storage layer (e.g. BigCache).
+- If the storage layer is not populated with the requested key-value pair (cache miss), transparent to the application, wfcache notes the missing key and moves on to the next layer. This continues until all configured storage options are exhausted.
+- When there is a cache hit, wfcache then primes each storage layer that previously reported a cache miss, so the data is available for any subsequent reads.
+- wfcache returns the key-value pair back to the application.
+
+If you want to use wfcache as a read-through cache, you can implement a [custom adapter](#implementing-custom-adapters) for your source database and configure it as the last storage layer. In this setup, a cache miss only ever happens in intermediate storage layers (which are then primed as your source storage resolves values), but wfcache will always yield data.
+
+When mutating wfcache, key-value pairs are written to and removed from all storage layers. To mutate a specific storage layer in isolation, you can keep a reference to it. However, this is not recommended as the interface is subject to change.
+
+### Cache eviction
+
+wfcache leaves it up to each storage layer to implement its own eviction strategy. Built-in adapters use a combination of Time-to-Live (TTL) and Least Recently Used (LRU) algorithms to decide which items to evict.
+
+Also note that the built-in Basic storage is not meant for production use, as its TTL enforcement only happens if and when a "stale" item is requested from the storage layer.
+
+## Implementing Custom Adapters
+
+For use cases where:
+
+- you require a storage adapter which is not [included](#built-in-storage-adapters) in wfcache, or
+- you want to use wfcache as a read-through/write-through cache
+
+it is trivial to extend wfcache by implementing the following adapter interface:
+
+```go
+type Storage interface {
+	TimeToLive() time.Duration
+
+	Get(ctx context.Context, key string) *CacheItem
+	BatchGet(ctx context.Context, keys []string) []*CacheItem
+	Set(ctx context.Context, key string, value []byte) error
+	BatchSet(ctx context.Context, pairs map[string][]byte) error
+	Del(ctx context.Context, key string) error
+}
+```
\ No newline at end of file
diff --git a/basic/basic.go b/basic/basic.go
new file mode 100644
index 0000000..02c75b0
--- /dev/null
+++ b/basic/basic.go
@@ -0,0 +1,107 @@
+package basic
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"github.com/juliaqiuxy/wfcache"
+)
+
+const NoTTL time.Duration = -1
+
+type BasicStorage struct {
+	pairs map[string]*wfcache.CacheItem
+	ttl   time.Duration
+
+	mutex sync.RWMutex
+}
+
+func Create(ttl time.Duration) wfcache.StorageMaker {
+	return func() (wfcache.Storage, error) {
+		if ttl == 0 {
+			return nil, errors.New("basic: storage requires a ttl")
+		}
+
+		s := &BasicStorage{
+			pairs: make(map[string]*wfcache.CacheItem),
+			ttl:   ttl,
+		}
+
+		return s, nil
+	}
+}
+
+func (s *BasicStorage) TimeToLive() time.Duration {
+	return s.ttl
+}
+
+func (s *BasicStorage) Get(ctx context.Context, key string) *wfcache.CacheItem {
+	s.mutex.RLock()
+	m, found := s.pairs[key]
+	s.mutex.RUnlock()
+
+	if found {
+		if s.ttl == NoTTL || time.Now().UTC().Before(time.Unix(m.ExpiresAt, 0)) {
+			return m
+		}
+		// the item is stale: evict it now that the read lock has been released,
+		// so Del can take the write lock without deadlocking
+		s.Del(ctx, key)
+	}
+
+	return nil
+}
+
+func (s *BasicStorage) BatchGet(ctx context.Context, keys []string) (results []*wfcache.CacheItem) {
+	for _, key := range keys {
+		m := s.Get(ctx, key)
+
+		if m != nil {
+			results = append(results, m)
+		}
+	}
+
+	return results
+}
+
+func (s *BasicStorage) Set(ctx context.Context, key string, data []byte) error {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	s.pairs[key] = &wfcache.CacheItem{
+		Key:       key,
+		Value:     data,
+		ExpiresAt: time.Now().UTC().Add(s.ttl).Unix(),
+	}
+
+	return nil
+}
+
+func (s *BasicStorage) 
BatchSet(ctx context.Context, pairs map[string][]byte) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + for key, data := range pairs { + s.pairs[key] = &wfcache.CacheItem{ + Key: key, + Value: data, + ExpiresAt: time.Now().UTC().Add(s.ttl).Unix(), + } + } + + return nil +} + +func (s *BasicStorage) Del(ctx context.Context, key string) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + delete(s.pairs, key) + + return nil +} diff --git a/bigcache/bigcache.go b/bigcache/bigcache.go new file mode 100644 index 0000000..77933a6 --- /dev/null +++ b/bigcache/bigcache.go @@ -0,0 +1,44 @@ +package bigcache + +// TODO(juliaqiuxy) Implement using https://github.com/allegro/bigcache + +import ( + "context" + "time" + + "github.com/juliaqiuxy/wfcache" +) + +type BigCacheStorage struct{} + +func Create(ttl time.Duration) wfcache.StorageMaker { + return func() (wfcache.Storage, error) { + s := &BigCacheStorage{} + + return s, nil + } +} + +func (s *BigCacheStorage) TimeToLive() time.Duration { + return 0 +} + +func (s *BigCacheStorage) Get(ctx context.Context, key string) *wfcache.CacheItem { + panic("bigcache: unimplemented") +} + +func (s *BigCacheStorage) BatchGet(ctx context.Context, keys []string) (results []*wfcache.CacheItem) { + panic("bigcache: unimplemented") +} + +func (s *BigCacheStorage) Set(ctx context.Context, key string, data []byte) error { + panic("bigcache: unimplemented") +} + +func (s *BigCacheStorage) BatchSet(ctx context.Context, pairs map[string][]byte) error { + panic("bigcache: unimplemented") +} + +func (s *BigCacheStorage) Del(ctx context.Context, key string) error { + panic("bigcache: unimplemented") +} diff --git a/cache.go b/cache.go new file mode 100644 index 0000000..f9ee1ea --- /dev/null +++ b/cache.go @@ -0,0 +1,245 @@ +package wfcache + +import ( + "context" + "encoding/json" + "errors" + "time" + + "github.com/thoas/go-funk" +) + +type CacheItem struct { + Key string `json:"key"` + Value []byte `json:"value"` + ExpiresAt int64 `json:"expiresAt"` +} + +type Storage interface { + TimeToLive() time.Duration + + Get(ctx context.Context, key string) *CacheItem + BatchGet(ctx context.Context, keys []string) []*CacheItem + Set(ctx context.Context, key string, value []byte) error + BatchSet(ctx context.Context, pairs map[string][]byte) error + Del(ctx context.Context, key string) error +} + +type StorageMaker func() (Storage, error) + +type StartOperation func() interface{} +type FinishOperation func(interface{}) +type Cache struct { + storages []Storage + + startOperation StartOperation + finishOperation FinishOperation +} + +var ( + ErrNotFulfilled = errors.New("look up not fulfilled") + ErrPartiallyFulfilled = errors.New("look up only partially fulfilled") +) + +func Create(startOperation StartOperation, finishOperation FinishOperation, maker StorageMaker, otherMakers ...StorageMaker) (*Cache, error) { + makers := append([]StorageMaker{maker}, otherMakers...) 
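+	// storages are resolved in the order their makers are given: reads
+	// waterfall from the first (typically fastest) layer down to the last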
+
+	c := Cache{
+		startOperation:  startOperation,
+		finishOperation: finishOperation,
+	}
+	c.storages = make([]Storage, len(makers))
+	for i, makeStorage := range makers {
+		storage, err := makeStorage()
+		if err != nil {
+			return nil, err
+		}
+
+		c.storages[i] = storage
+	}
+
+	return &c, nil
+}
+
+func (c *Cache) Get(key string) (*CacheItem, error) {
+	return c.GetWithContext(context.Background(), key)
+}
+
+func (c *Cache) GetWithContext(ctx context.Context, key string) (*CacheItem, error) {
+	so := c.startOperation()
+	defer c.finishOperation(so)
+
+	missingKeyByStorage := map[Storage]string{}
+
+	// start waterfall
+	for _, storage := range c.storages {
+		cacheItem := storage.Get(ctx, key)
+
+		if cacheItem == nil {
+			missingKeyByStorage[storage] = key
+			continue
+		} else {
+			// prime previous storages
+			for s := range missingKeyByStorage {
+				s.Set(ctx, key, cacheItem.Value)
+			}
+		}
+
+		// value := interface{}
+		// err := json.Unmarshal(cacheItem.Value, value)
+
+		// if err != nil {
+		// 	return nil, err
+		// }
+
+		return cacheItem, nil
+	}
+
+	return nil, ErrNotFulfilled
+}
+
+func (c *Cache) BatchGet(keys []string) ([]*CacheItem, error) {
+	return c.BatchGetWithContext(context.Background(), keys)
+}
+
+func (c *Cache) BatchGetWithContext(ctx context.Context, keys []string) ([]*CacheItem, error) {
+	// TODO(juliaqiuxy) Detect dupes, empty keys, then bail
+
+	so := c.startOperation()
+	defer c.finishOperation(so)
+
+	if len(keys) == 0 {
+		return nil, errors.New("at least one key is required")
+	}
+
+	missingKeys := keys
+
+	cacheItems := []*CacheItem{}
+
+	missingKeysByStorage := map[Storage][]string{}
+
+	// start waterfall
+	for _, storage := range c.storages {
+		md := storage.BatchGet(ctx, missingKeys)
+
+		if len(md) != 0 {
+			resolvedKeys := funk.Map(md, func(md *CacheItem) string {
+				return md.Key
+			}).([]string)
+			mKeys1, mKeys2 := funk.DifferenceString(resolvedKeys, missingKeys)
+			missingKeys = append(mKeys1, mKeys2...)
+
+			cacheItems = append(cacheItems, md...)
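+
+			// at this point missingKeys should only contain the keys this layer
+			// could not resolve; they cascade down to the next storage layer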
+ } + + if len(missingKeys) == 0 { + break + } + + missingKeysByStorage[storage] = missingKeys + } + + // for _, cacheItem := range cacheItems { + // if cacheItem != nil { + // var m interface{} + // json.Unmarshal(cacheItem.Value, &m) + // *values = append(*values, m) + // } + // } + + if len(cacheItems) == 0 { + return nil, ErrNotFulfilled + } + + // prime previous storages + for s, misses := range missingKeysByStorage { + missedValues := map[string][]byte{} + + missedCacheItems := funk.Filter(cacheItems, func(md *CacheItem) bool { + return funk.ContainsString(misses, md.Key) + }).([]*CacheItem) + + for _, m := range missedCacheItems { + missedValues[m.Key] = m.Value + } + + if len(missedValues) != 0 { + s.BatchSet(ctx, missedValues) + } + } + + if len(missingKeys) != 0 { + return cacheItems, ErrPartiallyFulfilled + } + + return cacheItems, nil +} + +func (c *Cache) Set(key string, value interface{}) error { + return c.SetWithContext(context.Background(), key, value) +} + +func (c *Cache) SetWithContext(ctx context.Context, key string, value interface{}) error { + so := c.startOperation() + defer c.finishOperation(so) + + v, err := json.Marshal(value) + if err != nil { + return err + } + + for _, storage := range c.storages { + err := storage.Set(ctx, key, v) + if err != nil { + return err + } + } + + return nil +} + +func (c *Cache) BatchSet(pairs map[string]interface{}) error { + return c.BatchSetWithContext(context.Background(), pairs) +} + +func (c *Cache) BatchSetWithContext(ctx context.Context, pairs map[string]interface{}) error { + so := c.startOperation() + defer c.finishOperation(so) + + vPairs := map[string][]byte{} + for key, value := range pairs { + v, err := json.Marshal(value) + if err != nil { + return err + } + + vPairs[key] = v + } + + for _, storage := range c.storages { + err := storage.BatchSet(ctx, vPairs) + if err != nil { + return err + } + } + + return nil +} + +func (c *Cache) Del(key string) error { + return c.DelWithContext(context.Background(), key) +} + +func (c *Cache) DelWithContext(ctx context.Context, key string) error { + so := c.startOperation() + defer c.finishOperation(so) + + for _, storage := range c.storages { + err := storage.Del(ctx, key) + if err != nil { + return err + } + } + + return nil +} diff --git a/dynamodb/dynamodb.go b/dynamodb/dynamodb.go new file mode 100644 index 0000000..421ca09 --- /dev/null +++ b/dynamodb/dynamodb.go @@ -0,0 +1,393 @@ +package dynamodb + +import ( + "context" + "errors" + "fmt" + "math" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/cenkalti/backoff/v4" + "github.com/juliaqiuxy/wfcache" + "github.com/thoas/go-funk" +) + +type DynamoDbStorage struct { + dynamodbClient dynamodbiface.DynamoDBAPI + tableName string + ttl time.Duration +} + +const maxReadOps = 100 +const maxWriteOps = 25 + +func prepareDynamoDbTableIfNotExists(dynamodbClient dynamodbiface.DynamoDBAPI, tableName string, readCapacityUnits int64, writeCapacityUnits int64) error { + _, err := dynamodbClient.CreateTable(&dynamodb.CreateTableInput{ + AttributeDefinitions: []*dynamodb.AttributeDefinition{ + { + AttributeName: aws.String("key"), + AttributeType: aws.String("S"), + }, + }, + KeySchema: []*dynamodb.KeySchemaElement{ + { + AttributeName: aws.String("key"), + KeyType: 
aws.String("HASH"), + }, + }, + ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + TableName: aws.String(tableName), + }) + + if err != nil { + if aerr, ok := err.(awserr.Error); ok { + if aerr.Code() != dynamodb.ErrCodeResourceInUseException { + return err + } + } + } else { + // TODO(juliaqiuxy) perform describe on the table to ensure configuration passed + // by other procs are consistent, or panic with a good explanation as to why e.g. + // Underlying dynamodb table can't be shared across different stroage configuration + dynamodbClient.UpdateTimeToLive(&dynamodb.UpdateTimeToLiveInput{ + TableName: aws.String(tableName), + TimeToLiveSpecification: &dynamodb.TimeToLiveSpecification{ + Enabled: aws.Bool(true), + AttributeName: aws.String("expiresAt"), + }, + }) + } + + return nil +} + +func Create(dynamodbClient dynamodbiface.DynamoDBAPI, tableName string, readCapacityUnits int64, writeCapacityUnits int64, ttl time.Duration) wfcache.StorageMaker { + return func() (wfcache.Storage, error) { + if dynamodbClient == nil { + return nil, errors.New("dynamodb requires a client") + } + if tableName == "" { + return nil, errors.New("dynamodb storage requires a table name") + } + if ttl == 0 { + return nil, errors.New("dynamodb storage requires a ttl") + } + + s := &DynamoDbStorage{ + dynamodbClient: dynamodbClient, + tableName: tableName, + ttl: ttl, + } + + _, err := dynamodbClient.DescribeTable(&dynamodb.DescribeTableInput{ + TableName: aws.String(s.tableName), + }) + + if err != nil { + if awserr, ok := err.(awserr.Error); ok { + switch awserr.Code() { + case dynamodb.ErrCodeResourceNotFoundException: + err = prepareDynamoDbTableIfNotExists( + dynamodbClient, + tableName, + readCapacityUnits, + writeCapacityUnits, + ) + if err != nil { + return nil, err + } + } + } + } + + return s, nil + } +} + +func (s *DynamoDbStorage) TimeToLive() time.Duration { + return s.ttl +} + +func (s *DynamoDbStorage) Get(ctx context.Context, key string) *wfcache.CacheItem { + result, err := s.dynamodbClient.GetItem(&dynamodb.GetItemInput{ + TableName: aws.String(s.tableName), + Key: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String(key), + }, + }, + }) + + if err != nil || result.Item == nil { + return nil + } + + cacheItem := wfcache.CacheItem{} + err = dynamodbattribute.UnmarshalMap(result.Item, &cacheItem) + + if err != nil { + return nil + } + + return &cacheItem +} + +// If you request more than 100 items, BatchGetItem returns a ValidationException +// with the message "Too many items requested for the BatchGetItem call." 
+func (s *DynamoDbStorage) BatchGet(ctx context.Context, keys []string) (results []*wfcache.CacheItem) {
+	queue := keys
+
+process:
+	maxItems := int(math.Min(maxReadOps, float64(len(queue))))
+	next := queue[0:maxItems]
+	queue = queue[maxItems:]
+
+	mapOfAttrKeys := []map[string]*dynamodb.AttributeValue{}
+	for _, key := range next {
+		mapOfAttrKeys = append(mapOfAttrKeys, map[string]*dynamodb.AttributeValue{
+			"key": {
+				S: aws.String(key),
+			},
+		})
+	}
+
+	var result *dynamodb.BatchGetItemOutput
+	err := withRetry(ctx, func() error {
+		var err error
+
+		// TODO use BatchGetItemWithContext
+		result, err = s.dynamodbClient.BatchGetItem(&dynamodb.BatchGetItemInput{
+			RequestItems: map[string]*dynamodb.KeysAndAttributes{
+				s.tableName: {
+					Keys: mapOfAttrKeys,
+				},
+			},
+		})
+
+		return err
+	})
+
+	if err != nil {
+		// TODO(juliaqiuxy) log debug
+		return nil
+	}
+
+	for _, table := range result.Responses {
+		for _, item := range table {
+			cacheItem := wfcache.CacheItem{}
+			err = dynamodbattribute.UnmarshalMap(item, &cacheItem)
+
+			if err != nil {
+				// TODO(juliaqiuxy) log debug
+				return nil
+			}
+
+			results = append(results, &cacheItem)
+		}
+	}
+
+	// if the response exceeds 16MB, put the unprocessed keys back in the queue
+	unprocessedTable := result.UnprocessedKeys[s.tableName]
+
+	if unprocessedTable != nil && unprocessedTable.Keys != nil {
+		unprocessedKeys := funk.Map(unprocessedTable.Keys, func(item map[string]*dynamodb.AttributeValue) string {
+			return *item["key"].S
+		}).([]string)
+
+		queue = append(queue, unprocessedKeys...)
+	}
+
+	if len(queue) != 0 {
+		goto process
+	}
+
+	return results
+}
+
+func (s *DynamoDbStorage) Set(ctx context.Context, key string, data []byte) error {
+	item, err := dynamodbattribute.MarshalMap(wfcache.CacheItem{
+		Key:       key,
+		Value:     data,
+		ExpiresAt: time.Now().UTC().Add(s.ttl).Unix(),
+	})
+
+	if err != nil {
+		return err
+	}
+
+	_, err = s.dynamodbClient.PutItem(&dynamodb.PutItemInput{
+		TableName: aws.String(s.tableName),
+		Item:      item,
+	})
+
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *DynamoDbStorage) BatchSet(ctx context.Context, pairs map[string][]byte) error {
+
+	queue := funk.Keys(pairs).([]string)
+
+process:
+	maxItems := int(math.Min(maxWriteOps, float64(len(queue))))
+	next := queue[0:maxItems]
+	queue = queue[maxItems:]
+
+	mapOfAttrKeys := []*dynamodb.WriteRequest{}
+	for _, key := range next {
+		item, err := dynamodbattribute.MarshalMap(wfcache.CacheItem{
+			Key:       key,
+			Value:     pairs[key],
+			ExpiresAt: time.Now().UTC().Add(s.ttl).Unix(),
+		})
+
+		if err != nil {
+			return err
+		}
+
+		mapOfAttrKeys = append(
+			mapOfAttrKeys,
+			&dynamodb.WriteRequest{
+				PutRequest: &dynamodb.PutRequest{
+					Item: item,
+				},
+			},
+		)
+	}
+
+	var result *dynamodb.BatchWriteItemOutput
+	err := withRetry(ctx, func() error {
+		var err error
+
+		// TODO use BatchWriteItemWithContext
+		result, err = s.dynamodbClient.BatchWriteItem(&dynamodb.BatchWriteItemInput{
+			RequestItems: map[string][]*dynamodb.WriteRequest{
+				s.tableName: mapOfAttrKeys,
+			},
+		})
+		return err
+	})
+
+	if err != nil {
+		return fmt.Errorf(errDynamodbBatchWrite, err)
+	}
+
+	// if we have unprocessed items due to dynamodb limits,
+	// put them back in the queue
+	unprocessedItems := result.UnprocessedItems[s.tableName]
+
+	if unprocessedItems != nil {
+		unprocessedKeys := funk.Map(unprocessedItems, func(item *dynamodb.WriteRequest) string {
+			return *item.PutRequest.Item["key"].S
+		}).([]string)
+
+		queue = append(queue, unprocessedKeys...)
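+		// re-queued keys are written again on the next pass through the process loop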
+ } + + if len(queue) != 0 { + goto process + } + + return nil +} + +func (s *DynamoDbStorage) Del(ctx context.Context, key string) error { + _, err := s.dynamodbClient.DeleteItem(&dynamodb.DeleteItemInput{ + TableName: aws.String(s.tableName), + Key: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String(key), + }, + }, + }) + + if err != nil { + return err + } + + return nil +} + +func withRetry(ctx aws.Context, fn func() error) (err error) { + var wait time.Duration + + b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) + + for { + err = fn() + + if err == nil { + return nil + } + + if !isRetriable(err) { + return err + } + + wait = b.NextBackOff() + + if wait == backoff.Stop { + return err + } + + err = aws.SleepWithContext(ctx, wait) + if err != nil { + return err + } + } +} + +func isRetriable(err error) bool { + if aerr, ok := err.(awserr.RequestFailure); ok { + switch aerr.StatusCode() { + case + 429, // error caused due to too many requests + 500, // DynamoDB could not process, retry + 502, // Bad Gateway error should be throttled + 503, // caused when service is unavailable + 504: // error occurred due to gateway timeout + return true + } + } + + if request.IsErrorThrottle(err) || request.IsErrorRetryable(err) { + return true + } + + return false +} + +const errDynamodbBatchWrite = `error: %s + +DynamoDB rejects the entire batch write operation when one or more of the following is true: + +* One or more tables specified in the BatchWriteItem request does not +exist. + +* Primary key attributes specified on an item in the request do not match +those in the corresponding table's primary key schema. + +* You try to perform multiple operations on the same item in the same +BatchWriteItem request. For example, you cannot put and delete the same +item in the same BatchWriteItem request. + +* Your request contains at least two items with identical hash and range +keys (which essentially is two put operations). + +* There are more than 25 requests in the batch. + +* Any individual item in a batch exceeds 400 KB. 
+ +* The total request size exceeds 16 MB` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2201252 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module github.com/juliaqiuxy/wfcache + +go 1.16 + +require ( + github.com/aws/aws-sdk-go v1.38.51 + github.com/cenkalti/backoff/v4 v4.1.0 + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/stretchr/testify v1.7.0 // indirect + github.com/thoas/go-funk v0.8.0 + gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7014676 --- /dev/null +++ b/go.sum @@ -0,0 +1,38 @@ +github.com/aws/aws-sdk-go v1.38.51 h1:aKQmbVbwOCuQSd8+fm/MR3bq0QOsu9Q7S+/QEND36oQ= +github.com/aws/aws-sdk-go v1.38.51/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc= +github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/thoas/go-funk v0.8.0 h1:JP9tKSvnpFVclYgDM0Is7FD9M4fhPvqA0s0BsXmzSRQ= +github.com/thoas/go-funk v0.8.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/redis/redis.go b/redis/redis.go new file mode 100644 index 0000000..bb13d15 --- /dev/null +++ b/redis/redis.go @@ -0,0 +1,42 @@ +package redis + +import ( + "context" + "time" + + "github.com/juliaqiuxy/wfcache" +) + +type RedisStorage struct{} + +func Create(ttl time.Duration) wfcache.StorageMaker { + return func() (wfcache.Storage, error) { + s := &RedisStorage{} + + return s, nil + } +} + +func (s *RedisStorage) TimeToLive() time.Duration { + return 0 +} + +func (s *RedisStorage) Get(ctx context.Context, key string) *wfcache.CacheItem { + panic("redis: unimplemented") +} + +func (s *RedisStorage) BatchGet(ctx context.Context, keys []string) (results []*wfcache.CacheItem) { + panic("redis: unimplemented") +} + +func (s *RedisStorage) Set(ctx context.Context, key string, data []byte) error { + panic("redis: unimplemented") +} + +func (s *RedisStorage) BatchSet(ctx context.Context, pairs map[string][]byte) error { + panic("redis: unimplemented") +} + +func (s *RedisStorage) Del(ctx context.Context, key string) error { + panic("redis: unimplemented") +}