Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewritten list query, test and fix [#82] #83

Merged
merged 4 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ var (

listSQL = fmt.Sprintf(`
SELECT %s
FROM kine AS kv
LEFT JOIN kine kv2
ON kv.name = kv2.name
AND kv.id < kv2.id
WHERE kv2.name IS NULL
AND kv.name >= ? AND kv.name < ?
AND (? OR kv.deleted = 0)
%%s
FROM kine kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
WHERE
mkv.name >= ? AND mkv.name < ?
%%s
GROUP BY mkv.name) maxkv
ON maxkv.id = kv.id
WHERE
(kv.deleted = 0 OR ?)
ORDER BY kv.id ASC
`, columns)

Expand Down Expand Up @@ -200,7 +203,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter
WHERE kv.id = ?`, columns), paramCharacter, numbered),

GetCurrentSQL: q(fmt.Sprintf(listSQL, ""), paramCharacter, numbered),
ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND kv.id <= ?"), paramCharacter, numbered),
ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered),
GetRevisionAfterSQL: q(revisionAfterSQL, paramCharacter, numbered),

CountSQL: q(fmt.Sprintf(`
Expand Down
50 changes: 50 additions & 0 deletions test/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"

. "github.com/onsi/gomega"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -77,6 +78,24 @@ func TestUpdate(t *testing.T) {
g.Expect(resp.Kvs[0].ModRevision).To(BeNumerically(">", resp.Kvs[0].CreateRevision))
}
})
t.Run("UpdateSameKeyLinearity", func(t *testing.T) {
g := NewWithT(t)

// Add a large number of entries, two times and
// it should take approximately same amout of time
numAddEntries := 1000

startFirst := time.Now()
addSameEntries(ctx, g, client, numAddEntries, true)
durationFirstBatch := time.Since(startFirst)

startSecond := time.Now()
addSameEntries(ctx, g, client, numAddEntries, false)
durationSecondBatch := time.Since(startSecond)

g.Expect(durationSecondBatch <= durationFirstBatch*2).To(BeTrue())

})

// Trying to update an old revision(in compare) should fail
t.Run("UpdateOldRevisionFails", func(t *testing.T) {
Expand Down Expand Up @@ -125,6 +144,37 @@ func TestUpdate(t *testing.T) {

}

func addSameEntries(ctx context.Context, g Gomega, client *clientv3.Client, numEntries int, create_first bool) {
for i := 0; i < numEntries; i++ {
key := "testkey-same"
value := fmt.Sprintf("value-%d", i)

if i != 0 || !create_first {
updateEntry(ctx, g, client, key, value)
} else {
addEntry(ctx, g, client, key, value)
}
}
}

func updateEntry(ctx context.Context, g Gomega, client *clientv3.Client, key string, value string) {

resp, err := client.Get(ctx, key, clientv3.WithRange(""))

g.Expect(err).To(BeNil())
g.Expect(resp.Kvs).To(HaveLen(1))

resp2, err2 := client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", resp.Kvs[0].ModRevision)).
Then(clientv3.OpPut(key, value)).
Else(clientv3.OpGet(key, clientv3.WithRange(""))).
Commit()

g.Expect(err2).To(BeNil())
g.Expect(resp2.Succeeded).To(BeTrue())

}

// BenchmarkUpdate is a benchmark for the Update operation.
func BenchmarkUpdate(b *testing.B) {
ctx := context.Background()
Expand Down
Loading