Skip to content

Commit

Permalink
Fix query list with revision
Browse files Browse the repository at this point in the history
  • Loading branch information
neoaggelos committed Mar 4, 2024
1 parent c2f7aef commit 615dd19
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 31 deletions.
46 changes: 18 additions & 28 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,34 +38,24 @@ var (
ORDER BY kv.name ASC, kv.id ASC
`, columns)

// FIXME this query doesn't seem sound.
revisionAfterSQL = fmt.Sprintf(`
SELECT *
FROM (
SELECT %s
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) AS id
FROM kine AS mkv
WHERE mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
AND mkv.id > (
SELECT ikv.id
FROM kine AS ikv
WHERE
ikv.name = ? AND
ikv.id <= ?
ORDER BY ikv.id DESC
LIMIT 1
)
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE
? OR kv.deleted = 0
) AS lkv
ORDER BY lkv.theid ASC
`, columns)
SELECT *
FROM (
SELECT %s
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) AS id
FROM kine AS mkv
WHERE mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE
? OR kv.deleted = 0
) AS lkv
ORDER BY lkv.name ASC, lkv.theid ASC
`, columns)

revisionIntervalSQL = `
SELECT (
Expand Down Expand Up @@ -537,7 +527,7 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
}
return d.query(ctx, "get_revision_after_sql", sql, start, end, revision, startKey, revision, includeDeleted)
return d.query(ctx, "get_revision_after_sql", sql, startKey+"\x01", end, revision, includeDeleted)
}

func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
Expand Down
26 changes: 23 additions & 3 deletions test/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package test
import (
"context"
"fmt"
"math/rand"
"testing"

. "github.com/onsi/gomega"
Expand All @@ -18,7 +19,7 @@ func TestList(t *testing.T) {
g := NewWithT(t)

// Create some keys
keys := []string{"/key/2", "/key/1", "/key/3", "/key/4", "/key/5"}
keys := shuffleList([]string{"/key/1", "/key/2", "/key/3", "/key/4", "/key/5"})
for _, key := range keys {
resp, err := client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)).
Expand Down Expand Up @@ -46,6 +47,7 @@ func TestList(t *testing.T) {
})

t.Run("ListAllLimit", func(t *testing.T) {
var revision int64
t.Run("FirstPage", func(t *testing.T) {
g := NewWithT(t)

Expand All @@ -58,14 +60,16 @@ func TestList(t *testing.T) {
g.Expect(resp.Header.Revision).ToNot(BeZero())
g.Expect(resp.Kvs[0].Key).To(Equal([]byte("/key/1")))
g.Expect(resp.Kvs[1].Key).To(Equal([]byte("/key/2")))

revision = resp.Header.Revision
})

t.Run("SecondPage", func(t *testing.T) {
g := NewWithT(t)

// Inspired from https://github.com/kubernetes/kubernetes/blob/3f4d3b67682335db510f85deb65b322127a3a0a1/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L788-L793
// Key is "last_key" + "\x00", and we use the prefix range end
resp, err := client.Get(ctx, "/key/2\x00", clientv3.WithRange(clientv3.GetPrefixRangeEnd("/key")), clientv3.WithLimit(2))
resp, err := client.Get(ctx, "/key/2\x00", clientv3.WithRange(clientv3.GetPrefixRangeEnd("/key")), clientv3.WithLimit(2), clientv3.WithRev(revision))

g.Expect(err).To(BeNil())
g.Expect(resp.Kvs).To(HaveLen(2))
Expand All @@ -74,13 +78,15 @@ func TestList(t *testing.T) {
g.Expect(resp.Header.Revision).ToNot(BeZero())
g.Expect(resp.Kvs[0].Key).To(Equal([]byte("/key/3")))
g.Expect(resp.Kvs[1].Key).To(Equal([]byte("/key/4")))

revision = resp.Header.Revision
})

t.Run("ThirdPage", func(t *testing.T) {
g := NewWithT(t)

// Get a list of all the keys
resp, err := client.Get(ctx, "/key/4\x00", clientv3.WithRange(clientv3.GetPrefixRangeEnd("/key")), clientv3.WithLimit(2))
resp, err := client.Get(ctx, "/key/4\x00", clientv3.WithRange(clientv3.GetPrefixRangeEnd("/key")), clientv3.WithLimit(2), clientv3.WithRev(revision))

g.Expect(err).To(BeNil())
g.Expect(resp.Kvs).To(HaveLen(1))
Expand Down Expand Up @@ -247,3 +253,17 @@ func BenchmarkList(b *testing.B) {
}
})
}

func shuffleList[T any](vals []T) []T {
if len(vals) == 0 {
return vals
}

perm := rand.Perm(len(vals))
shuffled := make([]T, 0, len(vals))
for _, i := range perm {
shuffled = append(shuffled, vals[perm[i]])
}

return shuffled
}

0 comments on commit 615dd19

Please sign in to comment.