Skip to content

Commit

Permalink
Update for uint64 counters and concurrent indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
1lann committed Aug 5, 2017
1 parent 75d2c06 commit 717790f
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 65 deletions.
2 changes: 1 addition & 1 deletion count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func testCounting(t *testing.T, compression bool) {

countError := errors.New("cete testing: count error")

r := newRange(func() (string, []byte, int, error) {
r := newRange(func() (string, []byte, uint64, error) {
return "", nil, 0, countError
}, func() {}, nil)

Expand Down
2 changes: 2 additions & 0 deletions error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ func TestBadFiles(t *testing.T) {
panicNotNil(os.Rename(dir+"/data/6572726f725f74657374696e67/old",
dir+"/data/6572726f725f74657374696e67/data"))

panicNotNil(os.RemoveAll(dir + "/data"))

db, err = Open(dir + "/data")
panicNotNil(err)

Expand Down
10 changes: 5 additions & 5 deletions filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestDo(t *testing.T) {
}

var sum int32
panicNotNil(db.Table("do_testing").All().Do(func(key string, counter int, doc Document) error {
panicNotNil(db.Table("do_testing").All().Do(func(key string, counter uint64, doc Document) error {
atomic.AddInt32(&sum, 1)
return nil
}))
Expand All @@ -73,7 +73,7 @@ func TestDo(t *testing.T) {

sum = 0

panicNotNil(db.Table("do_testing").All().Do(func(key string, counter int, doc Document) error {
panicNotNil(db.Table("do_testing").All().Do(func(key string, counter uint64, doc Document) error {
sum++
return nil
}, 1))
Expand All @@ -85,7 +85,7 @@ func TestDo(t *testing.T) {
sum = 0
testError := errors.New("cete testing: test do")

err = db.Table("do_testing").All().Do(func(key string, counter int, doc Document) error {
err = db.Table("do_testing").All().Do(func(key string, counter uint64, doc Document) error {
if key == "ben" {
time.Sleep(time.Millisecond * 100)
return testError
Expand All @@ -102,11 +102,11 @@ func TestDo(t *testing.T) {
t.Fatal("sum should be 2, but isn't")
}

r := newRange(func() (string, []byte, int, error) {
r := newRange(func() (string, []byte, uint64, error) {
return "", nil, 0, testError
}, func() {}, nil)

err = r.Do(func(key string, counter int, doc Document) error {
err = r.Do(func(key string, counter uint64, doc Document) error {
t.Fatal("do should not run, but does")
return nil
})
Expand Down
37 changes: 13 additions & 24 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,32 +86,21 @@ func (t *Table) NewIndex(name string) error {
}

func (i *Index) indexValues(name string) error {
r := i.table.Between(MinValue, MaxValue)

var entry bufferEntry
var results []interface{}
var err error

for {
entry = <-r.buffer
if entry.err == ErrEndOfRange {
break
} else if entry.err != nil {
return entry.err
}

results, err = i.indexQuery(entry.data, name)
i.table.Between(MinValue, MaxValue).Do(func(key string, counter uint64, doc Document) error {
results, err := i.indexQuery(doc.data, name)
if err != nil {
continue
return nil
}

for _, result := range results {
err = i.addToIndex(valueToBytes(result), entry.key)
err = i.addToIndex(valueToBytes(result), key)
if err != nil {
log.Println("cete: index error for index \""+name+"\":", err)
}
}
}

return nil
}, 20)

return nil
}
Expand Down Expand Up @@ -158,7 +147,7 @@ func (i *Index) indexQuery(data []byte, query string) ([]interface{}, error) {
// must either be a pointer or nil if you would like to only get the key/counter
// and check for existence. Note that indexes are non-unique, a single index key
// can map to multiple values. Use GetAll to get all such matching values.
func (i *Index) One(key interface{}, dst interface{}) (string, int, error) {
func (i *Index) One(key interface{}, dst interface{}) (string, uint64, error) {
r, err := i.GetAll(key)
if err != nil {
return "", 0, err
Expand Down Expand Up @@ -207,7 +196,7 @@ func (i *Index) getAllValues(indexValue []byte) (*Range, error) {
var value []byte
var item badger.KVItem

return newRange(func() (string, []byte, int, error) {
return newRange(func() (string, []byte, uint64, error) {
for {
if c >= len(keys) {
return "", nil, 0, ErrEndOfRange
Expand All @@ -227,7 +216,7 @@ func (i *Index) getAllValues(indexValue []byte) (*Range, error) {
copy(value, item.Value())

c++
return keys[c-1], value, int(item.Counter()), nil
return keys[c-1], value, item.Counter(), nil
}
}, func() {}, i.table), nil
}
Expand All @@ -243,7 +232,7 @@ func (i *Index) getAllValues(indexValue []byte) (*Range, error) {
// bound values.
func (i *Index) Between(lower, upper interface{}, reverse ...bool) *Range {
if lower == MaxValue || upper == MinValue {
return newRange(func() (string, []byte, int, error) {
return newRange(func() (string, []byte, uint64, error) {
return "", nil, 0, ErrEndOfRange
}, func() {}, nil)
}
Expand Down Expand Up @@ -342,13 +331,13 @@ func decodeArrayCount(header []byte) int64 {

func (i *Index) betweenNext(it *badger.Iterator, lastRange *Range,
shouldReverse bool, lower,
upper interface{}) func() (string, []byte, int, error) {
upper interface{}) func() (string, []byte, uint64, error) {
upperBytes := valueToBytes(upper)
lowerBytes := valueToBytes(lower)

var entry bufferEntry

return func() (string, []byte, int, error) {
return func() (string, []byte, uint64, error) {
if lastRange != nil {
entry = <-lastRange.buffer
if entry.err != ErrEndOfRange {
Expand Down
20 changes: 10 additions & 10 deletions range.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ const bufferSize = 100
type bufferEntry struct {
key string
data []byte
counter int
counter uint64
err error
}

// Range represents a result with multiple values in it and is usually sorted
// by index/key.
type Range struct {
buffer chan bufferEntry
next func() (string, []byte, int, error)
next func() (string, []byte, uint64, error)
close func()
closed int32

Expand All @@ -32,7 +32,7 @@ type Range struct {
// Next stores the next item in the range into dst. dst must be a pointer
// to a value, or nil. If dst is nil then the value will be discarded, but
// the counter and key will still be returned.
func (r *Range) Next(dst interface{}) (string, int, error) {
func (r *Range) Next(dst interface{}) (string, uint64, error) {
entry, more := <-r.buffer
if !more {
return "", 0, ErrEndOfRange
Expand Down Expand Up @@ -121,7 +121,7 @@ func (r *Range) All(dst interface{}) error {
// Limit limits the number of documents that can be read from the range.
// When this limit is reached, ErrEndOfRange will be returned.
func (r *Range) Limit(n int64) *Range {
return newRange(func() (string, []byte, int, error) {
return newRange(func() (string, []byte, uint64, error) {
entry := <-r.buffer

if n <= 0 {
Expand All @@ -141,7 +141,7 @@ func (r *Range) Close() {
}
}

func newRange(next func() (string, []byte, int, error), closer func(),
func newRange(next func() (string, []byte, uint64, error), closer func(),
table *Table) *Range {
r := &Range{
buffer: make(chan bufferEntry, bufferSize),
Expand Down Expand Up @@ -214,7 +214,7 @@ func (r *Range) Filter(filter func(doc Document) (bool, error),
readFromWorker := 0
var entry *bufferEntry

return newRange(func() (string, []byte, int, error) {
return newRange(func() (string, []byte, uint64, error) {
for {
entry = <-outboxes[readFromWorker]
readFromWorker = (readFromWorker + 1) % numWorkers
Expand Down Expand Up @@ -276,7 +276,7 @@ func filterWorker(filter func(doc Document) (bool, error),
//
// You can optionally specify the number of workers to concurrently operate
// on. By default the number of workers is 10.
func (r *Range) Do(operation func(key string, counter int, doc Document) error,
func (r *Range) Do(operation func(key string, counter uint64, doc Document) error,
workers ...int) error {

numWorkers := 10
Expand Down Expand Up @@ -336,7 +336,7 @@ func (r *Range) Do(operation func(key string, counter int, doc Document) error,
return result
}

func doWorker(wg *sync.WaitGroup, operation func(key string, counter int,
func doWorker(wg *sync.WaitGroup, operation func(key string, counter uint64,
doc Document) error, table *Table, inbox chan *bufferEntry,
completion chan error) {
var entry *bufferEntry
Expand Down Expand Up @@ -379,7 +379,7 @@ func (r *Range) Skip(n int) *Range {
for i := 0; i < n; i++ {
entry = <-r.buffer
if entry.err != nil {
return newRange(func() (string, []byte, int, error) {
return newRange(func() (string, []byte, uint64, error) {
return "", nil, 0, entry.err
}, func() {}, nil)
}
Expand Down Expand Up @@ -417,7 +417,7 @@ func (r *Range) Unique() *Range {
var entry bufferEntry
seen := make(map[string]bool)

return newRange(func() (string, []byte, int, error) {
return newRange(func() (string, []byte, uint64, error) {
for {
entry = <-r.buffer

Expand Down
Loading

0 comments on commit 717790f

Please sign in to comment.