From 717790f29e5891f0a69604a1f3cdb16de8cce35d Mon Sep 17 00:00:00 2001 From: 1lann Date: Sat, 5 Aug 2017 19:40:13 +1000 Subject: [PATCH] Update for uint64 counters and concurrent indexing --- count_test.go | 2 +- error_test.go | 2 ++ filter_test.go | 10 ++++----- index.go | 37 ++++++++++++-------------------- range.go | 20 +++++++++--------- table.go | 57 ++++++++++++++++++++++++++++---------------------- 6 files changed, 63 insertions(+), 65 deletions(-) diff --git a/count_test.go b/count_test.go index 51e0f76..c8d644f 100644 --- a/count_test.go +++ b/count_test.go @@ -141,7 +141,7 @@ func testCounting(t *testing.T, compression bool) { countError := errors.New("cete testing: count error") - r := newRange(func() (string, []byte, int, error) { + r := newRange(func() (string, []byte, uint64, error) { return "", nil, 0, countError }, func() {}, nil) diff --git a/error_test.go b/error_test.go index 65aac1b..7189054 100644 --- a/error_test.go +++ b/error_test.go @@ -156,6 +156,8 @@ func TestBadFiles(t *testing.T) { panicNotNil(os.Rename(dir+"/data/6572726f725f74657374696e67/old", dir+"/data/6572726f725f74657374696e67/data")) + panicNotNil(os.RemoveAll(dir + "/data")) + db, err = Open(dir + "/data") panicNotNil(err) diff --git a/filter_test.go b/filter_test.go index 10e2849..ccad778 100644 --- a/filter_test.go +++ b/filter_test.go @@ -62,7 +62,7 @@ func TestDo(t *testing.T) { } var sum int32 - panicNotNil(db.Table("do_testing").All().Do(func(key string, counter int, doc Document) error { + panicNotNil(db.Table("do_testing").All().Do(func(key string, counter uint64, doc Document) error { atomic.AddInt32(&sum, 1) return nil })) @@ -73,7 +73,7 @@ func TestDo(t *testing.T) { sum = 0 - panicNotNil(db.Table("do_testing").All().Do(func(key string, counter int, doc Document) error { + panicNotNil(db.Table("do_testing").All().Do(func(key string, counter uint64, doc Document) error { sum++ return nil }, 1)) @@ -85,7 +85,7 @@ func TestDo(t *testing.T) { sum = 0 testError := errors.New("cete testing: test do") - err = db.Table("do_testing").All().Do(func(key string, counter int, doc Document) error { + err = db.Table("do_testing").All().Do(func(key string, counter uint64, doc Document) error { if key == "ben" { time.Sleep(time.Millisecond * 100) return testError @@ -102,11 +102,11 @@ func TestDo(t *testing.T) { t.Fatal("sum should be 2, but isn't") } - r := newRange(func() (string, []byte, int, error) { + r := newRange(func() (string, []byte, uint64, error) { return "", nil, 0, testError }, func() {}, nil) - err = r.Do(func(key string, counter int, doc Document) error { + err = r.Do(func(key string, counter uint64, doc Document) error { t.Fatal("do should not run, but does") return nil }) diff --git a/index.go b/index.go index 12aad7c..3f77c80 100644 --- a/index.go +++ b/index.go @@ -86,32 +86,21 @@ func (t *Table) NewIndex(name string) error { } func (i *Index) indexValues(name string) error { - r := i.table.Between(MinValue, MaxValue) - - var entry bufferEntry - var results []interface{} - var err error - - for { - entry = <-r.buffer - if entry.err == ErrEndOfRange { - break - } else if entry.err != nil { - return entry.err - } - - results, err = i.indexQuery(entry.data, name) + i.table.Between(MinValue, MaxValue).Do(func(key string, counter uint64, doc Document) error { + results, err := i.indexQuery(doc.data, name) if err != nil { - continue + return nil } for _, result := range results { - err = i.addToIndex(valueToBytes(result), entry.key) + err = i.addToIndex(valueToBytes(result), key) if err != nil { log.Println("cete: index error for index \""+name+"\":", err) } } - } + + return nil + }, 20) return nil } @@ -158,7 +147,7 @@ func (i *Index) indexQuery(data []byte, query string) ([]interface{}, error) { // must either be a pointer or nil if you would like to only get the key/counter // and check for existence. Note that indexes are non-unique, a single index key // can map to multiple values. Use GetAll to get all such matching values. -func (i *Index) One(key interface{}, dst interface{}) (string, int, error) { +func (i *Index) One(key interface{}, dst interface{}) (string, uint64, error) { r, err := i.GetAll(key) if err != nil { return "", 0, err @@ -207,7 +196,7 @@ func (i *Index) getAllValues(indexValue []byte) (*Range, error) { var value []byte var item badger.KVItem - return newRange(func() (string, []byte, int, error) { + return newRange(func() (string, []byte, uint64, error) { for { if c >= len(keys) { return "", nil, 0, ErrEndOfRange @@ -227,7 +216,7 @@ func (i *Index) getAllValues(indexValue []byte) (*Range, error) { copy(value, item.Value()) c++ - return keys[c-1], value, int(item.Counter()), nil + return keys[c-1], value, item.Counter(), nil } }, func() {}, i.table), nil } @@ -243,7 +232,7 @@ func (i *Index) getAllValues(indexValue []byte) (*Range, error) { // bound values. func (i *Index) Between(lower, upper interface{}, reverse ...bool) *Range { if lower == MaxValue || upper == MinValue { - return newRange(func() (string, []byte, int, error) { + return newRange(func() (string, []byte, uint64, error) { return "", nil, 0, ErrEndOfRange }, func() {}, nil) } @@ -342,13 +331,13 @@ func decodeArrayCount(header []byte) int64 { func (i *Index) betweenNext(it *badger.Iterator, lastRange *Range, shouldReverse bool, lower, - upper interface{}) func() (string, []byte, int, error) { + upper interface{}) func() (string, []byte, uint64, error) { upperBytes := valueToBytes(upper) lowerBytes := valueToBytes(lower) var entry bufferEntry - return func() (string, []byte, int, error) { + return func() (string, []byte, uint64, error) { if lastRange != nil { entry = <-lastRange.buffer if entry.err != ErrEndOfRange { diff --git a/range.go b/range.go index 00ac8c7..2d23673 100644 --- a/range.go +++ b/range.go @@ -14,7 +14,7 @@ const bufferSize = 100 type bufferEntry struct { key string data []byte - counter int + counter uint64 err error } @@ -22,7 +22,7 @@ type bufferEntry struct { // by index/key. type Range struct { buffer chan bufferEntry - next func() (string, []byte, int, error) + next func() (string, []byte, uint64, error) close func() closed int32 @@ -32,7 +32,7 @@ type Range struct { // Next stores the next item in the range into dst. dst must be a pointer // to a value, or nil. If dst is nil then the value will be discarded, but // the counter and key will still be returned. -func (r *Range) Next(dst interface{}) (string, int, error) { +func (r *Range) Next(dst interface{}) (string, uint64, error) { entry, more := <-r.buffer if !more { return "", 0, ErrEndOfRange @@ -121,7 +121,7 @@ func (r *Range) All(dst interface{}) error { // Limit limits the number of documents that can be read from the range. // When this limit is reached, ErrEndOfRange will be returned. func (r *Range) Limit(n int64) *Range { - return newRange(func() (string, []byte, int, error) { + return newRange(func() (string, []byte, uint64, error) { entry := <-r.buffer if n <= 0 { @@ -141,7 +141,7 @@ func (r *Range) Close() { } } -func newRange(next func() (string, []byte, int, error), closer func(), +func newRange(next func() (string, []byte, uint64, error), closer func(), table *Table) *Range { r := &Range{ buffer: make(chan bufferEntry, bufferSize), @@ -214,7 +214,7 @@ func (r *Range) Filter(filter func(doc Document) (bool, error), readFromWorker := 0 var entry *bufferEntry - return newRange(func() (string, []byte, int, error) { + return newRange(func() (string, []byte, uint64, error) { for { entry = <-outboxes[readFromWorker] readFromWorker = (readFromWorker + 1) % numWorkers @@ -276,7 +276,7 @@ func filterWorker(filter func(doc Document) (bool, error), // // You can optionally specify the number of workers to concurrently operate // on. By default the number of workers is 10. -func (r *Range) Do(operation func(key string, counter int, doc Document) error, +func (r *Range) Do(operation func(key string, counter uint64, doc Document) error, workers ...int) error { numWorkers := 10 @@ -336,7 +336,7 @@ func (r *Range) Do(operation func(key string, counter int, doc Document) error, return result } -func doWorker(wg *sync.WaitGroup, operation func(key string, counter int, +func doWorker(wg *sync.WaitGroup, operation func(key string, counter uint64, doc Document) error, table *Table, inbox chan *bufferEntry, completion chan error) { var entry *bufferEntry @@ -379,7 +379,7 @@ func (r *Range) Skip(n int) *Range { for i := 0; i < n; i++ { entry = <-r.buffer if entry.err != nil { - return newRange(func() (string, []byte, int, error) { + return newRange(func() (string, []byte, uint64, error) { return "", nil, 0, entry.err }, func() {}, nil) } @@ -417,7 +417,7 @@ func (r *Range) Unique() *Range { var entry bufferEntry seen := make(map[string]bool) - return newRange(func() (string, []byte, int, error) { + return newRange(func() (string, []byte, uint64, error) { for { entry = <-r.buffer diff --git a/table.go b/table.go index f8f1119..20c5716 100644 --- a/table.go +++ b/table.go @@ -110,7 +110,7 @@ func (t *Table) Drop() error { // Get retrieves a value from a table with its primary key. dst must either be // a pointer or nil if you only want to get the counter or check for existence. -func (t *Table) Get(key string, dst interface{}) (int, error) { +func (t *Table) Get(key string, dst interface{}) (uint64, error) { var item badger.KVItem err := t.data.Get([]byte(key), &item) if err != nil { @@ -122,19 +122,20 @@ func (t *Table) Get(key string, dst interface{}) (int, error) { } if dst == nil { - return int(item.Counter()), nil + return item.Counter(), nil } if t.keyToCompressed != nil { - return int(item.Counter()), msgpack.UnmarshalCompressed(t.cToKey, item.Value(), dst) + return item.Counter(), msgpack.UnmarshalCompressed(t.cToKey, item.Value(), dst) } - return int(item.Counter()), msgpack.Unmarshal(item.Value(), dst) + return item.Counter(), msgpack.Unmarshal(item.Value(), dst) } // Set sets a value in the table. An optional counter value can be provided -// to only set the value if the counter value is the same. -func (t *Table) Set(key string, value interface{}, counter ...int) error { +// to only set the value if the counter value is the same. A counter value +// of 0 is valid and represents a key that doesn't exist. +func (t *Table) Set(key string, value interface{}, counter ...uint64) error { var item badger.KVItem err := t.data.Get([]byte(key), &item) if err != nil { @@ -142,7 +143,7 @@ func (t *Table) Set(key string, value interface{}, counter ...int) error { } if len(counter) > 0 { - if item.Counter() != uint16(counter[0]) { + if item.Counter() != counter[0] { return ErrCounterChanged } } @@ -158,12 +159,16 @@ func (t *Table) Set(key string, value interface{}, counter ...int) error { } if len(counter) > 0 { - err = t.data.CompareAndSet([]byte(key), data, uint16(counter[0])) + if counter[0] == 0 { + err = t.data.SetIfAbsent([]byte(key), data, 0) + } else { + err = t.data.CompareAndSet([]byte(key), data, counter[0]) + } } else { - err = t.data.Set([]byte(key), data) + err = t.data.Set([]byte(key), data, 0) } - if err == badger.CasMismatch { + if err == badger.CasMismatch || err == badger.KeyExists { return ErrCounterChanged } @@ -357,17 +362,17 @@ func (i *Index) addToIndex(indexKey []byte, key string) error { } if item.Value() == nil { + err = i.index.SetIfAbsent(indexKey, data, 0) + if err == badger.KeyExists { + continue + } + } else { err = i.index.CompareAndSet(indexKey, data, item.Counter()) if err == badger.CasMismatch { continue } } - err = i.index.CompareAndSet(indexKey, data, item.Counter()) - if err == badger.CasMismatch { - continue - } - return err } } @@ -384,7 +389,7 @@ func (i *Index) name() string { // Delete deletes the key from the table. An optional counter value can be // provided to only delete the document if the counter value is the same. -func (t *Table) Delete(key string, counter ...int) error { +func (t *Table) Delete(key string, counter ...uint64) error { var item badger.KVItem err := t.data.Get([]byte(key), &item) if err != nil { @@ -396,11 +401,11 @@ func (t *Table) Delete(key string, counter ...int) error { } if len(counter) > 0 { - if int(item.Counter()) != counter[0] { + if item.Counter() != counter[0] { return ErrCounterChanged } - err = t.data.CompareAndDelete([]byte(key), uint16(counter[0])) + err = t.data.CompareAndDelete([]byte(key), counter[0]) } else { err = t.data.Delete([]byte(key)) } @@ -431,6 +436,8 @@ func (t *Table) Index(index string) *Index { // whether or not the update should be aborted, and will be returned back from // Update. // +// ErrNotFound will be returned if the document does not exist. +// // The modifier function will be continuously called until the counter at the // beginning of handler matches the counter when the document is updated. // This allows for safe updates on a single document, such as incrementing a @@ -466,7 +473,7 @@ func (t *Table) Update(key string, handler interface{}) error { return result[1].Interface().(error) } - err = t.Set(key, result[0].Interface(), counter) + err = t.Set(key, result[0].Interface(), counter, 0) if err == ErrCounterChanged { continue } @@ -498,7 +505,7 @@ func (t *Table) name() string { func (t *Table) Between(lower interface{}, upper interface{}, reverse ...bool) *Range { if lower == MaxValue || upper == MinValue { - return newRange(func() (string, []byte, int, error) { + return newRange(func() (string, []byte, uint64, error) { return "", nil, 0, ErrEndOfRange }, func() {}, nil) } @@ -516,7 +523,7 @@ func (t *Table) Between(lower interface{}, upper interface{}, log.Println("cete: warning: lower and upper bounds of " + "table.Between must be a string or Bounds. An empty range has " + "been returned instead") - return newRange(func() (string, []byte, int, error) { + return newRange(func() (string, []byte, uint64, error) { return "", nil, 0, ErrEndOfRange }, func() {}, nil) } @@ -527,7 +534,7 @@ func (t *Table) Between(lower interface{}, upper interface{}, log.Println("cete: warning: lower and upper bounds of " + "table.Between must be a string or Bounds. An empty range has " + "been returned instead") - return newRange(func() (string, []byte, int, error) { + return newRange(func() (string, []byte, uint64, error) { return "", nil, 0, ErrEndOfRange }, func() {}, nil) } @@ -550,10 +557,10 @@ func (t *Table) Between(lower interface{}, upper interface{}, } var key string - var counter int + var counter uint64 var value []byte - return newRange(func() (string, []byte, int, error) { + return newRange(func() (string, []byte, uint64, error) { for it.Valid() { if !shouldReverse && upper != MaxValue && bytes.Compare(it.Item().Key(), upperBytes) > 0 { @@ -564,7 +571,7 @@ func (t *Table) Between(lower interface{}, upper interface{}, } key = string(it.Item().Key()) - counter = int(it.Item().Counter()) + counter = it.Item().Counter() value = make([]byte, len(it.Item().Value())) copy(value, it.Item().Value()) it.Next()