Skip to content

Commit

Permalink
Improve list query
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 committed Jul 24, 2024
1 parent aac7502 commit a0f5644
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 99 deletions.
149 changes: 79 additions & 70 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,56 +62,56 @@ var (
columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"

revSQL = `
SELECT MAX(rkv.id) AS id
FROM kine AS rkv`

SELECT MAX(id) AS id
FROM kine`

// listSQL query looks for the latest version of every row in
// the range and returns all columns from it.
// The search for the "latest id" (table `maxkv` in the query)
// can be carried on quickly with a covering index (kine_name_index).
// The `deleted <= ?` is used to select deleted rows:
// - when the argument is 0 (false), the only rows selected are
// those with deleted = 0 (i.e. alive)
// - when the argument is 1 (true), all rows will be selected,
// including deleted ones.
// Unfortunately, using a normal JOIN operation will confuse
// SQLite planner and insert a SORT temp table at the end of
// the plan, forcing SQLite to load and sort the entire set
// before returning it (and making the cost of a paginated
// query very high) and returning an unsorted set would make
// pagination impossible.
// To workaround this silly misplan, a ORDER by in the first
// table forces ordering of `maxkv` (without paying for it
// as it is the same order as the index) and CROSS JOIN is
// used as it forces SQLite to keep the outer-loop order
// when joining tables. See https://www.sqlite.org/optoverview.html#crossjoin
// for more details.
listSQL = fmt.Sprintf(`
SELECT %s
FROM kine kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
WITH maxkv AS (
SELECT MAX(id) AS id
FROM kine
WHERE
mkv.name >= ? AND mkv.name < ?
name >= ? AND name < ?
%%s
GROUP BY mkv.name) maxkv
GROUP BY name
HAVING deleted <= ?
ORDER BY name
)
SELECT %s
FROM maxkv CROSS JOIN kine kv
ON maxkv.id = kv.id
WHERE
(kv.deleted = 0 OR ?)
ORDER BY kv.name ASC, kv.id ASC
`, columns)

revisionAfterSQL = fmt.Sprintf(`
SELECT *
FROM (
SELECT %s
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) AS id
FROM kine AS mkv
WHERE mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE
? OR kv.deleted = 0
) AS lkv
ORDER BY lkv.name ASC, lkv.theid ASC
`, columns)

revisionIntervalSQL = `
SELECT (
SELECT crkv.prev_revision
FROM kine AS crkv
WHERE crkv.name = 'compact_rev_key'
SELECT prev_revision
FROM kine
WHERE name = 'compact_rev_key'
ORDER BY prev_revision
DESC LIMIT 1
) AS low, (
SELECT id
SELECT MAX(id)
FROM kine
ORDER BY id
DESC LIMIT 1
) AS high`
)

Expand All @@ -138,7 +138,6 @@ type Generic struct {
GetRevisionSQL string
RevisionSQL string
ListRevisionStartSQL string
GetRevisionAfterSQL string
CountCurrentSQL string
CountRevisionSQL string
AfterSQLPrefix string
Expand Down Expand Up @@ -224,40 +223,54 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter
DB: prepared.New(db, 100),

GetRevisionSQL: q(fmt.Sprintf(`
SELECT
%s
FROM kine kv
WHERE kv.id = ?`, columns), paramCharacter, numbered),
SELECT %s
FROM kine AS kv
WHERE id = ?`, columns), paramCharacter, numbered),

GetCurrentSQL: q(fmt.Sprintf(listSQL, ""), paramCharacter, numbered),
ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND mkv.id <= ?"), paramCharacter, numbered),
GetRevisionAfterSQL: q(revisionAfterSQL, paramCharacter, numbered),
ListRevisionStartSQL: q(fmt.Sprintf(listSQL, "AND id <= ?"), paramCharacter, numbered),

CountCurrentSQL: q(fmt.Sprintf(`
SELECT (%s), COUNT(*)
CountCurrentSQL: q(`
SELECT (
SELECT COALESCE(MAX(id), 0) AS id
FROM kine
), COUNT(*)
FROM (
%s
) c`, revSQL, fmt.Sprintf(listSQL, "")), paramCharacter, numbered),

CountRevisionSQL: q(fmt.Sprintf(`
SELECT (%s), COUNT(c.theid)
SELECT MAX(id) AS id
FROM kine
WHERE
name >= ? AND name < ?
GROUP BY name
HAVING deleted = 0
) c`, paramCharacter, numbered),

CountRevisionSQL: q(`
SELECT (
SELECT COALESCE(MAX(id), 0) AS id
FROM kine
), COUNT(*)
FROM (
%s
) c`, revSQL, fmt.Sprintf(listSQL, "AND mkv.id <= ?")), paramCharacter, numbered),
SELECT MAX(id) AS id
FROM kine
WHERE
name >= ? AND name < ?
AND id <= ?
GROUP BY name
HAVING deleted = 0
) c`, paramCharacter, numbered),

AfterSQLPrefix: q(fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE
kv.name >= ? AND kv.name < ?
AND kv.id > ?
ORDER BY kv.id ASC`, columns), paramCharacter, numbered),
WHERE name >= ? AND name < ?
AND id > ?
ORDER BY id ASC`, columns), paramCharacter, numbered),

AfterSQL: q(fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE kv.id > ?
ORDER BY kv.id ASC
FROM kine AS kv
WHERE id > ?
ORDER BY id ASC
`, columns), paramCharacter, numbered),

DeleteSQL: q(`
Expand Down Expand Up @@ -408,7 +421,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i
if startKey != "" {
start = startKey + "\x01"
}
rows, err := d.query(ctx, "count_revision", d.CountRevisionSQL, start, end, revision, false)
rows, err := d.query(ctx, "count_revision", d.CountRevisionSQL, start, end, revision)
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -528,19 +541,15 @@ func (d *Generic) ListCurrent(ctx context.Context, prefix, startKey string, limi

func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) {
start, end := getPrefixRange(prefix)
if startKey == "" {
sql := d.ListRevisionStartSQL
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
}
return d.query(ctx, "list_revision_start_sql", sql, start, end, revision, includeDeleted)
if startKey != "" {
start = startKey + "\x01"
}

sql := d.GetRevisionAfterSQL
sql := d.ListRevisionStartSQL
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
}
return d.query(ctx, "get_revision_after_sql", sql, startKey+"\x01", end, revision, includeDeleted)
return d.query(ctx, "list_revision_start_sql", sql, start, end, revision, includeDeleted)
}

func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
Expand Down
15 changes: 14 additions & 1 deletion pkg/kine/drivers/sqlite/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// present anymore, and unexpired rows of the key_value table with
// the latest revisions must have been recorded in the Kine table
// already
const databaseSchemaVersion = 1
const databaseSchemaVersion = 2

// applySchemaV1 moves the schema from version 0 to version 1,
// taking into account the possible unversioned schema from
Expand Down Expand Up @@ -61,6 +61,19 @@ CREATE TABLE kine
return nil
}

// applySchemaV2 moves the schema from version 1 to version 2
func applySchemaV2(ctx context.Context, txn *sql.Tx) error {
if _, err := txn.ExecContext(ctx, `DROP INDEX kine_name_index`); err != nil {
return err
}

if _, err := txn.ExecContext(ctx, `CREATE UNIQUE INDEX kine_name_index ON kine(name, id, deleted)`); err != nil {
return err
}

return nil
}

// hasTable checks if a table exists.
func hasTable(ctx context.Context, txn *sql.Tx, tableName string) (bool, error) {
// FIXME: why we can't use `pragma_table_list()`? Is dqlite/sqlite using
Expand Down
7 changes: 4 additions & 3 deletions pkg/kine/drivers/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,12 @@ func migrate(ctx context.Context, txn *sql.Tx) error {
return err
}
fallthrough
case 1:
if err := applySchemaV2(ctx, txn); err != nil {
return err
}
case databaseSchemaVersion:
break
default:
// FIXME this needs better handling
return errors.Errorf("unsupported version: %d", userVersion)
}

setUserVersionSQL := fmt.Sprintf(`PRAGMA user_version = %d`, databaseSchemaVersion)
Expand Down
3 changes: 0 additions & 3 deletions test/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,14 @@ func BenchmarkList(b *testing.B) {
if err := insertMany(ctx, tx, "key", payloadSize, n); err != nil {
return err
}
b.Log("insert", n)

if err := updateMany(ctx, tx, "key", payloadSize, n/2); err != nil {
return err
}
b.Log("update", n)

if err := deleteMany(ctx, tx, "key", n/2); err != nil {
return err
}
b.Log("delete", n)
return nil
}
backends := []string{endpoint.SQLiteBackend, endpoint.DQLiteBackend}
Expand Down
46 changes: 24 additions & 22 deletions test/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,42 +212,44 @@ FROM gen_id, revision`

func updateMany(ctx context.Context, tx *sql.Tx, prefix string, valueSize, n int) error {
updateManyQuery := fmt.Sprintf(`
WITH maxkv AS (
SELECT MAX(id) AS id
FROM kine
WHERE
?||'/' <= name AND name < ?||'0'
GROUP BY name
HAVING deleted = 0
ORDER BY name
)
INSERT INTO kine(
name, created, deleted, create_revision, prev_revision, lease, value, old_value
)
SELECT kv.name, 0, 0, kv.create_revision, kv.id, 0, randomblob(?), kv.value
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
WHERE ?||'/' <= mkv.name AND mkv.name < ?||'0'
GROUP BY mkv.name
) maxkv ON maxkv.id = kv.id
WHERE kv.deleted = 0
ORDER BY kv.name
LIMIT %d
`, n)
FROM maxkv CROSS JOIN kine kv
ON maxkv.id = kv.id
LIMIT %d`, n)
_, err := tx.ExecContext(ctx, updateManyQuery, valueSize, prefix, prefix)
return err
}

func deleteMany(ctx context.Context, tx *sql.Tx, prefix string, n int) error {
deleteManyQuery := fmt.Sprintf(`
WITH maxkv AS (
SELECT MAX(id) AS id
FROM kine
WHERE
?||'/' <= name AND name < ?||'0'
GROUP BY name
HAVING deleted = 0
ORDER BY name
)
INSERT INTO kine(
name, created, deleted, create_revision, prev_revision, lease, value, old_value
)
SELECT kv.name, 0, 1, kv.create_revision, kv.id, 0, kv.value, kv.value
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine mkv
WHERE ?||'/' <= mkv.name AND mkv.name < ?||'0'
GROUP BY mkv.name
) maxkv ON maxkv.id = kv.id
WHERE kv.deleted = 0
ORDER BY kv.name
LIMIT %d
`, n)
FROM maxkv CROSS JOIN kine kv
ON maxkv.id = kv.id
LIMIT %d`, n)
_, err := tx.ExecContext(ctx, deleteManyQuery, prefix, prefix)
return err
}

0 comments on commit a0f5644

Please sign in to comment.