From 359a7a9b84c286a130dbb396b9b6b697c2655e81 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Sat, 21 Jul 2018 01:22:16 +0200 Subject: [PATCH 1/3] Fix trimming prefix for brokers in Consul - forward slash from key prefix was trimmed before listing keys/values, but returned key-value pairs were not prepended back, causing iterators to fail to include their prefix (broker namespace) Signed-off-by: Ondrej Fabry --- db/keyval/consul/consul.go | 35 ++++--- db/keyval/consul/consul_integration_test.go | 105 ++++++++++++++++++-- 2 files changed, 120 insertions(+), 20 deletions(-) diff --git a/db/keyval/consul/consul.go b/db/keyval/consul/consul.go index 6c4621357..6e3c94dfa 100644 --- a/db/keyval/consul/consul.go +++ b/db/keyval/consul/consul.go @@ -67,7 +67,7 @@ func NewClient(cfg *api.Config) (store *Client, err error) { // Put stores given data for the key. func (c *Client) Put(key string, data []byte, opts ...datasync.PutOption) error { - consulLogger.Debugf("put: %q\n", key) + consulLogger.Debugf("Put: %q", key) p := &api.KVPair{Key: transformKey(key), Value: data} _, err := c.client.KV().Put(p, nil) if err != nil { @@ -86,7 +86,7 @@ func (c *Client) NewTxn() keyval.BytesTxn { // GetValue returns data for the given key. func (c *Client) GetValue(key string) (data []byte, found bool, revision int64, err error) { - consulLogger.Debugf("get value: %q\n", key) + consulLogger.Debugf("GetValue: %q", key) pair, _, err := c.client.KV().Get(transformKey(key), nil) if err != nil { return nil, false, 0, err @@ -119,7 +119,7 @@ func (c *Client) ListKeys(prefix string) (keyval.BytesKeyIterator, error) { // Delete deletes given key. func (c *Client) Delete(key string, opts ...datasync.DelOption) (existed bool, err error) { - consulLogger.Debugf("delete: %q\n", key) + consulLogger.Debugf("Delete: %q", key) if _, err := c.client.KV().Delete(transformKey(key), nil); err != nil { return false, err } @@ -171,18 +171,20 @@ func (resp *watchResp) GetRevision() int64 { } func (c *Client) watch(resp func(watchResp keyval.BytesWatchResp), closeCh chan string, prefix string) error { - consulLogger.Debug("WATCH:", prefix) + consulLogger.Debug("watch:", prefix) ctx, cancel := context.WithCancel(context.Background()) recvChan := c.watchPrefix(ctx, prefix) go func(regPrefix string) { + defer cancel() for { select { case wr, ok := <-recvChan: if !ok { - consulLogger.WithField("prefix", prefix).Debug("Watch recv chan was closed") + consulLogger.WithField("prefix", prefix). + Debug("Watch recv chan was closed") return } for _, ev := range wr.Events { @@ -211,8 +213,8 @@ func (c *Client) watch(resp func(watchResp keyval.BytesWatchResp), closeCh chan } case closeVal, ok := <-closeCh: if !ok || closeVal == regPrefix { - consulLogger.WithField("prefix", prefix).Debug("Watch ended") - cancel() + consulLogger.WithField("prefix", prefix). + Debug("Watch ended") return } } @@ -252,9 +254,9 @@ func (c *Client) watchPrefix(ctx context.Context, prefix string) <-chan watchRes oldIndex := qm.LastIndex oldPairsMap := make(map[string]*api.KVPair) - consulLogger.Debugf("..retrieved: %v old pairs (old index: %v)", len(oldPairs), oldIndex) + consulLogger.Debugf("prefix %v listing %v pairs (last index: %v)", prefix, len(oldPairs), oldIndex) for _, pair := range oldPairs { - consulLogger.Debugf(" - key: %q create: %v modify: %v", pair.Key, pair.CreateIndex, pair.ModifyIndex) + consulLogger.Debugf(" - key: %q create: %v modify: %v value: %v", pair.Key, pair.CreateIndex, pair.ModifyIndex, len(pair.Value)) oldPairsMap[pair.Key] = pair } @@ -279,9 +281,9 @@ func (c *Client) watchPrefix(ctx context.Context, prefix string) <-chan watchRes continue } - consulLogger.Debugf("prefix %q: %v new pairs (new index: %v) %+v", prefix, len(newPairs), newIndex, qm) + consulLogger.Debugf("prefix %q: listing %v new pairs, new index: %v (old index: %v)", prefix, len(newPairs), newIndex, oldIndex) for _, pair := range newPairs { - consulLogger.Debugf(" + key: %q create: %v modify: %v", pair.Key, pair.CreateIndex, pair.ModifyIndex) + consulLogger.Debugf(" + key: %q create: %v modify: %v value: %v", pair.Key, pair.CreateIndex, pair.ModifyIndex, len(pair.Value)) } var evs []*watchEvent @@ -293,6 +295,7 @@ func (c *Client) watchPrefix(ctx context.Context, prefix string) <-chan watchRes if oldPair, ok := oldPairsMap[pair.Key]; ok { prevVal = oldPair.Value } + consulLogger.Warnf(" * modified key: %v prevValue: %v prevModify: %v", pair.Key, len(pair.Value), len(prevVal)) evs = append(evs, &watchEvent{ Type: datasync.Put, Key: pair.Key, @@ -392,7 +395,7 @@ func (pdb *BrokerWatcher) GetValue(key string) (data []byte, found bool, revisio // KeyPrefix defined in constructor is prepended to the key argument. // The prefix is removed from the keys of the returned values. func (pdb *BrokerWatcher) ListValues(key string) (keyval.BytesKeyValIterator, error) { - pairs, _, err := pdb.client.KV().List(pdb.prefixKey(transformKey(key)), nil) + pairs, _, err := pdb.client.KV().List(pdb.prefixKey(key), nil) if err != nil { return nil, err } @@ -403,7 +406,7 @@ func (pdb *BrokerWatcher) ListValues(key string) (keyval.BytesKeyValIterator, er // ListKeys calls 'ListKeys' function of the underlying BytesConnectionEtcd. // KeyPrefix defined in constructor is prepended to the argument. func (pdb *BrokerWatcher) ListKeys(prefix string) (keyval.BytesKeyIterator, error) { - keys, qm, err := pdb.client.KV().Keys(pdb.prefixKey(transformKey(prefix)), "", nil) + keys, qm, err := pdb.client.KV().Keys(pdb.prefixKey(prefix), "", nil) if err != nil { return nil, err } @@ -451,6 +454,9 @@ func (it *bytesKeyIterator) GetNext() (key string, rev int64, stop bool) { } key = string(it.keys[it.index]) + if !strings.HasPrefix(key, "/") && strings.HasPrefix(it.prefix, "/") { + key = "/" + key + } if it.prefix != "" { key = strings.TrimPrefix(key, it.prefix) } @@ -483,6 +489,9 @@ func (it *bytesKeyValIterator) GetNext() (val keyval.BytesKeyVal, stop bool) { } key := string(it.pairs[it.index].Key) + if !strings.HasPrefix(key, "/") && strings.HasPrefix(it.prefix, "/") { + key = "/" + key + } if it.prefix != "" { key = strings.TrimPrefix(key, it.prefix) } diff --git a/db/keyval/consul/consul_integration_test.go b/db/keyval/consul/consul_integration_test.go index 1b262fc60..3e7051b9c 100644 --- a/db/keyval/consul/consul_integration_test.go +++ b/db/keyval/consul/consul_integration_test.go @@ -149,13 +149,43 @@ func TestListKeysPrefixed(t *testing.T) { } } +func TestListKeysPrefixedSlash(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.testSrv.PopulateKV(t, map[string][]byte{ + "myprefix/key/1": []byte("val1"), + "myprefix/key/2": []byte("val2"), + "myprefix/anb/7": []byte("xxx"), + "key/x": []byte("valx"), + }) + + client := ctx.client.NewBroker("/myprefix/") + kvi, err := client.ListKeys("key/") + Expect(err).ToNot(HaveOccurred()) + Expect(kvi).NotTo(BeNil()) + + expectedKeys := []string{"key/1", "key/2"} + for i := 0; i <= len(expectedKeys); i++ { + key, _, all := kvi.GetNext() + if i == len(expectedKeys) { + Expect(all).To(BeTrue()) + break + } + Expect(all).To(BeFalse()) + // verify that prefix of BytesBrokerWatcherEtcd is trimmed + Expect(key).To(BeEquivalentTo(expectedKeys[i])) + } +} + func TestListValues(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() ctx.testSrv.PopulateKV(t, map[string][]byte{ - "key/1": []byte("val1"), - "key/2": []byte("val2"), + "key/1": []byte("val1"), + "key/2": []byte("val2"), + "foo/22": []byte("bar33"), }) kvi, err := ctx.client.ListValues("key") @@ -176,28 +206,89 @@ func TestListValues(t *testing.T) { } } +func TestListValuesSlash(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.testSrv.PopulateKV(t, map[string][]byte{ + "key/1": []byte("val1"), + "key/2": []byte("val2"), + "foo/22": []byte("bar33"), + }) + + kvi, err := ctx.client.ListValues("/key") + Expect(err).ToNot(HaveOccurred()) + Expect(kvi).NotTo(BeNil()) + + expectedKeys := []string{"key/1", "key/2"} + for i := 0; i <= len(expectedKeys); i++ { + kv, all := kvi.GetNext() + if i == len(expectedKeys) { + Expect(all).To(BeTrue()) + break + } + Expect(kv).NotTo(BeNil()) + Expect(all).To(BeFalse()) + // verify that prefix of BytesBrokerWatcherEtcd is trimmed + Expect(kv.GetKey()).To(BeEquivalentTo(expectedKeys[i])) + } +} + func TestListValuesPrefixed(t *testing.T) { ctx := setupTest(t) defer ctx.teardownTest() ctx.testSrv.PopulateKV(t, map[string][]byte{ - "myprefix/key/1": []byte("val1"), - "myprefix/key/2": []byte("val2"), - "key/x": []byte("valx"), + "myprefix/key/at/1": []byte("val1"), + "myprefix/key/at/2": []byte("val2"), + "myprefix/key/bt/3": []byte("val3"), + "key/x": []byte("valx"), }) client := ctx.client.NewBroker("myprefix/") - kvi, err := client.ListValues("key") + kvi, err := client.ListValues("key/at/") Expect(err).ToNot(HaveOccurred()) Expect(kvi).NotTo(BeNil()) - expectedKeys := []string{"key/1", "key/2"} + expectedKeys := []string{"key/at/1", "key/at/2"} + for i := 0; i <= len(expectedKeys); i++ { + kv, all := kvi.GetNext() + if i == len(expectedKeys) { + Expect(all).To(BeTrue()) + break + } + t.Logf("%+v", kv.GetKey()) + Expect(kv).NotTo(BeNil()) + Expect(all).To(BeFalse()) + // verify that prefix of BytesBrokerWatcherEtcd is trimmed + Expect(kv.GetKey()).To(BeEquivalentTo(expectedKeys[i])) + } +} + +func TestListValuesPrefixedSlash(t *testing.T) { + ctx := setupTest(t) + defer ctx.teardownTest() + + ctx.testSrv.PopulateKV(t, map[string][]byte{ + "myprefix/key/at/1": []byte("val1"), + "myprefix/key/at/2": []byte("val2"), + "myprefix/key/bt/3": []byte("val3"), + "key/x": []byte("valx"), + }) + + client := ctx.client.NewBroker("/myprefix/") + kvi, err := client.ListValues("key/at/") + Expect(err).ToNot(HaveOccurred()) + Expect(kvi).NotTo(BeNil()) + + expectedKeys := []string{"key/at/1", "key/at/2"} for i := 0; i <= len(expectedKeys); i++ { kv, all := kvi.GetNext() if i == len(expectedKeys) { Expect(all).To(BeTrue()) break } + t.Logf("%+v", kv.GetKey()) Expect(kv).NotTo(BeNil()) Expect(all).To(BeFalse()) // verify that prefix of BytesBrokerWatcherEtcd is trimmed From 5e663698aac8cd809684bffa6630ac334fb2802f Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Mon, 23 Jul 2018 10:47:02 +0200 Subject: [PATCH 2/3] Fix log level Signed-off-by: Ondrej Fabry --- db/keyval/consul/consul.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/keyval/consul/consul.go b/db/keyval/consul/consul.go index 6e3c94dfa..b3d01b543 100644 --- a/db/keyval/consul/consul.go +++ b/db/keyval/consul/consul.go @@ -295,7 +295,7 @@ func (c *Client) watchPrefix(ctx context.Context, prefix string) <-chan watchRes if oldPair, ok := oldPairsMap[pair.Key]; ok { prevVal = oldPair.Value } - consulLogger.Warnf(" * modified key: %v prevValue: %v prevModify: %v", pair.Key, len(pair.Value), len(prevVal)) + consulLogger.Debugf(" * modified key: %v prevValue: %v prevModify: %v", pair.Key, len(pair.Value), len(prevVal)) evs = append(evs, &watchEvent{ Type: datasync.Put, Key: pair.Key, From 3058b3bfb91c88646447564a733ff639d5e11f85 Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Mon, 23 Jul 2018 10:52:15 +0200 Subject: [PATCH 3/3] Update CHANGELOG Signed-off-by: Ondrej Fabry --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4636ba6ff..46a9a2fab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# Release v1.4.1 (2018-07-23) + +## Bugfix + * Fixed issue in Consul client that caused brokers to incorrectly + trim prefixes and thus storing invalid revisions for resync. + # Release v1.4 (2018-07-16) ## Breaking Changes