Skip to content

Commit

Permalink
Merge pull request #314 from ondrej-fabry/fix-consul
Browse files Browse the repository at this point in the history
Fix trimming prefix for brokers in Consul
  • Loading branch information
VladoLavor authored Jul 23, 2018
2 parents 99b1f3e + 3058b3b commit 22c14be
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 20 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# Release v1.4.1 (2018-07-23)

## Bugfix
* Fixed issue in Consul client that caused brokers to incorrectly
trim prefixes and thus storing invalid revisions for resync.

# Release v1.4 (2018-07-16)

## Breaking Changes
Expand Down
35 changes: 22 additions & 13 deletions db/keyval/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewClient(cfg *api.Config) (store *Client, err error) {

// Put stores given data for the key.
func (c *Client) Put(key string, data []byte, opts ...datasync.PutOption) error {
consulLogger.Debugf("put: %q\n", key)
consulLogger.Debugf("Put: %q", key)
p := &api.KVPair{Key: transformKey(key), Value: data}
_, err := c.client.KV().Put(p, nil)
if err != nil {
Expand All @@ -86,7 +86,7 @@ func (c *Client) NewTxn() keyval.BytesTxn {

// GetValue returns data for the given key.
func (c *Client) GetValue(key string) (data []byte, found bool, revision int64, err error) {
consulLogger.Debugf("get value: %q\n", key)
consulLogger.Debugf("GetValue: %q", key)
pair, _, err := c.client.KV().Get(transformKey(key), nil)
if err != nil {
return nil, false, 0, err
Expand Down Expand Up @@ -119,7 +119,7 @@ func (c *Client) ListKeys(prefix string) (keyval.BytesKeyIterator, error) {

// Delete deletes given key.
func (c *Client) Delete(key string, opts ...datasync.DelOption) (existed bool, err error) {
consulLogger.Debugf("delete: %q\n", key)
consulLogger.Debugf("Delete: %q", key)
if _, err := c.client.KV().Delete(transformKey(key), nil); err != nil {
return false, err
}
Expand Down Expand Up @@ -171,18 +171,20 @@ func (resp *watchResp) GetRevision() int64 {
}

func (c *Client) watch(resp func(watchResp keyval.BytesWatchResp), closeCh chan string, prefix string) error {
consulLogger.Debug("WATCH:", prefix)
consulLogger.Debug("watch:", prefix)

ctx, cancel := context.WithCancel(context.Background())

recvChan := c.watchPrefix(ctx, prefix)

go func(regPrefix string) {
defer cancel()
for {
select {
case wr, ok := <-recvChan:
if !ok {
consulLogger.WithField("prefix", prefix).Debug("Watch recv chan was closed")
consulLogger.WithField("prefix", prefix).
Debug("Watch recv chan was closed")
return
}
for _, ev := range wr.Events {
Expand Down Expand Up @@ -211,8 +213,8 @@ func (c *Client) watch(resp func(watchResp keyval.BytesWatchResp), closeCh chan
}
case closeVal, ok := <-closeCh:
if !ok || closeVal == regPrefix {
consulLogger.WithField("prefix", prefix).Debug("Watch ended")
cancel()
consulLogger.WithField("prefix", prefix).
Debug("Watch ended")
return
}
}
Expand Down Expand Up @@ -252,9 +254,9 @@ func (c *Client) watchPrefix(ctx context.Context, prefix string) <-chan watchRes
oldIndex := qm.LastIndex
oldPairsMap := make(map[string]*api.KVPair)

consulLogger.Debugf("..retrieved: %v old pairs (old index: %v)", len(oldPairs), oldIndex)
consulLogger.Debugf("prefix %v listing %v pairs (last index: %v)", prefix, len(oldPairs), oldIndex)
for _, pair := range oldPairs {
consulLogger.Debugf(" - key: %q create: %v modify: %v", pair.Key, pair.CreateIndex, pair.ModifyIndex)
consulLogger.Debugf(" - key: %q create: %v modify: %v value: %v", pair.Key, pair.CreateIndex, pair.ModifyIndex, len(pair.Value))
oldPairsMap[pair.Key] = pair
}

Expand All @@ -279,9 +281,9 @@ func (c *Client) watchPrefix(ctx context.Context, prefix string) <-chan watchRes
continue
}

consulLogger.Debugf("prefix %q: %v new pairs (new index: %v) %+v", prefix, len(newPairs), newIndex, qm)
consulLogger.Debugf("prefix %q: listing %v new pairs, new index: %v (old index: %v)", prefix, len(newPairs), newIndex, oldIndex)
for _, pair := range newPairs {
consulLogger.Debugf(" + key: %q create: %v modify: %v", pair.Key, pair.CreateIndex, pair.ModifyIndex)
consulLogger.Debugf(" + key: %q create: %v modify: %v value: %v", pair.Key, pair.CreateIndex, pair.ModifyIndex, len(pair.Value))
}

var evs []*watchEvent
Expand All @@ -293,6 +295,7 @@ func (c *Client) watchPrefix(ctx context.Context, prefix string) <-chan watchRes
if oldPair, ok := oldPairsMap[pair.Key]; ok {
prevVal = oldPair.Value
}
consulLogger.Debugf(" * modified key: %v prevValue: %v prevModify: %v", pair.Key, len(pair.Value), len(prevVal))
evs = append(evs, &watchEvent{
Type: datasync.Put,
Key: pair.Key,
Expand Down Expand Up @@ -392,7 +395,7 @@ func (pdb *BrokerWatcher) GetValue(key string) (data []byte, found bool, revisio
// KeyPrefix defined in constructor is prepended to the key argument.
// The prefix is removed from the keys of the returned values.
func (pdb *BrokerWatcher) ListValues(key string) (keyval.BytesKeyValIterator, error) {
pairs, _, err := pdb.client.KV().List(pdb.prefixKey(transformKey(key)), nil)
pairs, _, err := pdb.client.KV().List(pdb.prefixKey(key), nil)
if err != nil {
return nil, err
}
Expand All @@ -403,7 +406,7 @@ func (pdb *BrokerWatcher) ListValues(key string) (keyval.BytesKeyValIterator, er
// ListKeys calls 'ListKeys' function of the underlying BytesConnectionEtcd.
// KeyPrefix defined in constructor is prepended to the argument.
func (pdb *BrokerWatcher) ListKeys(prefix string) (keyval.BytesKeyIterator, error) {
keys, qm, err := pdb.client.KV().Keys(pdb.prefixKey(transformKey(prefix)), "", nil)
keys, qm, err := pdb.client.KV().Keys(pdb.prefixKey(prefix), "", nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -451,6 +454,9 @@ func (it *bytesKeyIterator) GetNext() (key string, rev int64, stop bool) {
}

key = string(it.keys[it.index])
if !strings.HasPrefix(key, "/") && strings.HasPrefix(it.prefix, "/") {
key = "/" + key
}
if it.prefix != "" {
key = strings.TrimPrefix(key, it.prefix)
}
Expand Down Expand Up @@ -483,6 +489,9 @@ func (it *bytesKeyValIterator) GetNext() (val keyval.BytesKeyVal, stop bool) {
}

key := string(it.pairs[it.index].Key)
if !strings.HasPrefix(key, "/") && strings.HasPrefix(it.prefix, "/") {
key = "/" + key
}
if it.prefix != "" {
key = strings.TrimPrefix(key, it.prefix)
}
Expand Down
105 changes: 98 additions & 7 deletions db/keyval/consul/consul_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,43 @@ func TestListKeysPrefixed(t *testing.T) {
}
}

func TestListKeysPrefixedSlash(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()

ctx.testSrv.PopulateKV(t, map[string][]byte{
"myprefix/key/1": []byte("val1"),
"myprefix/key/2": []byte("val2"),
"myprefix/anb/7": []byte("xxx"),
"key/x": []byte("valx"),
})

client := ctx.client.NewBroker("/myprefix/")
kvi, err := client.ListKeys("key/")
Expect(err).ToNot(HaveOccurred())
Expect(kvi).NotTo(BeNil())

expectedKeys := []string{"key/1", "key/2"}
for i := 0; i <= len(expectedKeys); i++ {
key, _, all := kvi.GetNext()
if i == len(expectedKeys) {
Expect(all).To(BeTrue())
break
}
Expect(all).To(BeFalse())
// verify that prefix of BytesBrokerWatcherEtcd is trimmed
Expect(key).To(BeEquivalentTo(expectedKeys[i]))
}
}

func TestListValues(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()

ctx.testSrv.PopulateKV(t, map[string][]byte{
"key/1": []byte("val1"),
"key/2": []byte("val2"),
"key/1": []byte("val1"),
"key/2": []byte("val2"),
"foo/22": []byte("bar33"),
})

kvi, err := ctx.client.ListValues("key")
Expand All @@ -176,28 +206,89 @@ func TestListValues(t *testing.T) {
}
}

func TestListValuesSlash(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()

ctx.testSrv.PopulateKV(t, map[string][]byte{
"key/1": []byte("val1"),
"key/2": []byte("val2"),
"foo/22": []byte("bar33"),
})

kvi, err := ctx.client.ListValues("/key")
Expect(err).ToNot(HaveOccurred())
Expect(kvi).NotTo(BeNil())

expectedKeys := []string{"key/1", "key/2"}
for i := 0; i <= len(expectedKeys); i++ {
kv, all := kvi.GetNext()
if i == len(expectedKeys) {
Expect(all).To(BeTrue())
break
}
Expect(kv).NotTo(BeNil())
Expect(all).To(BeFalse())
// verify that prefix of BytesBrokerWatcherEtcd is trimmed
Expect(kv.GetKey()).To(BeEquivalentTo(expectedKeys[i]))
}
}

func TestListValuesPrefixed(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()

ctx.testSrv.PopulateKV(t, map[string][]byte{
"myprefix/key/1": []byte("val1"),
"myprefix/key/2": []byte("val2"),
"key/x": []byte("valx"),
"myprefix/key/at/1": []byte("val1"),
"myprefix/key/at/2": []byte("val2"),
"myprefix/key/bt/3": []byte("val3"),
"key/x": []byte("valx"),
})

client := ctx.client.NewBroker("myprefix/")
kvi, err := client.ListValues("key")
kvi, err := client.ListValues("key/at/")
Expect(err).ToNot(HaveOccurred())
Expect(kvi).NotTo(BeNil())

expectedKeys := []string{"key/1", "key/2"}
expectedKeys := []string{"key/at/1", "key/at/2"}
for i := 0; i <= len(expectedKeys); i++ {
kv, all := kvi.GetNext()
if i == len(expectedKeys) {
Expect(all).To(BeTrue())
break
}
t.Logf("%+v", kv.GetKey())
Expect(kv).NotTo(BeNil())
Expect(all).To(BeFalse())
// verify that prefix of BytesBrokerWatcherEtcd is trimmed
Expect(kv.GetKey()).To(BeEquivalentTo(expectedKeys[i]))
}
}

func TestListValuesPrefixedSlash(t *testing.T) {
ctx := setupTest(t)
defer ctx.teardownTest()

ctx.testSrv.PopulateKV(t, map[string][]byte{
"myprefix/key/at/1": []byte("val1"),
"myprefix/key/at/2": []byte("val2"),
"myprefix/key/bt/3": []byte("val3"),
"key/x": []byte("valx"),
})

client := ctx.client.NewBroker("/myprefix/")
kvi, err := client.ListValues("key/at/")
Expect(err).ToNot(HaveOccurred())
Expect(kvi).NotTo(BeNil())

expectedKeys := []string{"key/at/1", "key/at/2"}
for i := 0; i <= len(expectedKeys); i++ {
kv, all := kvi.GetNext()
if i == len(expectedKeys) {
Expect(all).To(BeTrue())
break
}
t.Logf("%+v", kv.GetKey())
Expect(kv).NotTo(BeNil())
Expect(all).To(BeFalse())
// verify that prefix of BytesBrokerWatcherEtcd is trimmed
Expand Down

0 comments on commit 22c14be

Please sign in to comment.