diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go index 6f2e6c4a..27c9b6d1 100644 --- a/pkg/kine/drivers/generic/generic.go +++ b/pkg/kine/drivers/generic/generic.go @@ -24,14 +24,17 @@ var ( listSQL = fmt.Sprintf(` SELECT %s - FROM kine AS kv - LEFT JOIN kine kv2 - ON kv.name = kv2.name - AND kv.id < kv2.id - WHERE kv2.name IS NULL - AND kv.name >= ? AND kv.name < ? - AND (? OR kv.deleted = 0) - %%s + FROM kine kv + JOIN ( + SELECT MAX(mkv.id) as id + FROM kine mkv + WHERE + mkv.name >= ? AND mkv.name < ? + %%s + GROUP BY mkv.name) maxkv + ON maxkv.id = kv.id + WHERE + (kv.deleted = 0 OR ?) ORDER BY kv.id ASC `, columns) @@ -200,7 +203,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter WHERE kv.id = ?`, columns), paramCharacter, numbered), GetCurrentSQL: q(fmt.Sprintf(listSQL, ""), paramCharacter, numbered), - ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND kv.id <= ?"), paramCharacter, numbered), + ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered), GetRevisionAfterSQL: q(revisionAfterSQL, paramCharacter, numbered), CountSQL: q(fmt.Sprintf(` diff --git a/test/update_test.go b/test/update_test.go index 1daaf7a2..5190e549 100644 --- a/test/update_test.go +++ b/test/update_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" . "github.com/onsi/gomega" clientv3 "go.etcd.io/etcd/client/v3" @@ -77,6 +78,24 @@ func TestUpdate(t *testing.T) { g.Expect(resp.Kvs[0].ModRevision).To(BeNumerically(">", resp.Kvs[0].CreateRevision)) } }) + t.Run("UpdateSameKeyLinearity", func(t *testing.T) { + g := NewWithT(t) + + // Add a large number of entries, two times and + // it should take approximately same amout of time + numAddEntries := 1000 + + startFirst := time.Now() + addSameEntries(ctx, g, client, numAddEntries, true) + durationFirstBatch := time.Since(startFirst) + + startSecond := time.Now() + addSameEntries(ctx, g, client, numAddEntries, false) + durationSecondBatch := time.Since(startSecond) + + g.Expect(durationSecondBatch <= durationFirstBatch*2).To(BeTrue()) + + }) // Trying to update an old revision(in compare) should fail t.Run("UpdateOldRevisionFails", func(t *testing.T) { @@ -125,6 +144,37 @@ func TestUpdate(t *testing.T) { } +func addSameEntries(ctx context.Context, g Gomega, client *clientv3.Client, numEntries int, create_first bool) { + for i := 0; i < numEntries; i++ { + key := "testkey-same" + value := fmt.Sprintf("value-%d", i) + + if i != 0 || !create_first { + updateEntry(ctx, g, client, key, value) + } else { + addEntry(ctx, g, client, key, value) + } + } +} + +func updateEntry(ctx context.Context, g Gomega, client *clientv3.Client, key string, value string) { + + resp, err := client.Get(ctx, key, clientv3.WithRange("")) + + g.Expect(err).To(BeNil()) + g.Expect(resp.Kvs).To(HaveLen(1)) + + resp2, err2 := client.Txn(ctx). + If(clientv3.Compare(clientv3.ModRevision(key), "=", resp.Kvs[0].ModRevision)). + Then(clientv3.OpPut(key, value)). + Else(clientv3.OpGet(key, clientv3.WithRange(""))). + Commit() + + g.Expect(err2).To(BeNil()) + g.Expect(resp2.Succeeded).To(BeTrue()) + +} + // BenchmarkUpdate is a benchmark for the Update operation. func BenchmarkUpdate(b *testing.B) { ctx := context.Background()