From 40bd618b3ee997a3eefadab617c4965e90fea561 Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Wed, 27 Nov 2024 18:07:20 +0100 Subject: [PATCH] Improve queries after LogStructured removal (#195) --- pkg/kine/drivers/generic/generic.go | 87 ++++++++++++++++++----------- pkg/kine/sqllog/sqllog.go | 50 ++++++++--------- 2 files changed, 78 insertions(+), 59 deletions(-) diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go index e75323f6..27dfe581 100644 --- a/pkg/kine/drivers/generic/generic.go +++ b/pkg/kine/drivers/generic/generic.go @@ -79,14 +79,16 @@ func init() { } var ( - columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value" - revSQL = ` SELECT MAX(rkv.id) AS id FROM kine AS rkv` - listSQL = fmt.Sprintf(` - SELECT %s + listSQL = ` + SELECT kv.id, + name, + CASE WHEN kv.created THEN kv.id ELSE kv.create_revision END AS create_revision, + lease, + value FROM kine AS kv JOIN ( SELECT MAX(mkv.id) as id @@ -97,9 +99,8 @@ var ( GROUP BY mkv.name ) AS maxkv ON maxkv.id = kv.id - WHERE (kv.deleted = 0 OR ?) - ORDER BY kv.name ASC, kv.id ASC - `, columns) + WHERE kv.deleted = 0 + ORDER BY kv.name ASC, kv.id ASC` revisionIntervalSQL = ` SELECT ( @@ -111,28 +112,46 @@ var ( FROM kine ) AS high` - listRevisionStartSQL = listSQL - - countRevisionSQL = fmt.Sprintf(` + countRevisionSQL = ` SELECT COUNT(*) - FROM ( - %s - )`, listSQL) - - afterSQLPrefix = fmt.Sprintf(` - SELECT %s FROM kine AS kv - WHERE - kv.name >= ? AND kv.name < ? - AND kv.id > ? - ORDER BY kv.id ASC`, columns) - - afterSQL = fmt.Sprintf(` - SELECT %s - FROM kine AS kv - WHERE kv.id > ? - ORDER BY kv.id ASC - `, columns) + JOIN ( + SELECT MAX(mkv.id) as id + FROM kine AS mkv + WHERE + mkv.name >= ? AND mkv.name < ? + AND mkv.id <= ? + GROUP BY mkv.name + ) AS maxkv + ON maxkv.id = kv.id + WHERE kv.deleted = 0` + + afterSQLPrefix = ` + SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value + FROM kine + WHERE name >= ? AND name < ? + AND id > ? + ORDER BY id ASC` + + afterSQL = ` + SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value + FROM kine + WHERE id > ? + ORDER BY id ASC` + + ttlSQL = ` + SELECT kv.id, + name, + lease + FROM kine AS kv + JOIN ( + SELECT MAX(mkv.id) as id + FROM kine AS mkv + WHERE mkv.id <= ? + GROUP BY mkv.name + ) AS maxkv + ON maxkv.id = kv.id + WHERE kv.deleted = 0 AND kv.lease != 0` deleteRevSQL = ` DELETE FROM kine @@ -408,7 +427,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i if startKey != "" { start = startKey + "\x01" } - rows, err := d.query(ctx, "count_revision", countRevisionSQL, start, end, revision, false) + rows, err := d.query(ctx, "count_revision", countRevisionSQL, start, end, revision) if err != nil { return 0, err } @@ -651,17 +670,21 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error { return err } -func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) { +func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64) (*sql.Rows, error) { start, end := getPrefixRange(prefix) if startKey != "" { start = startKey + "\x01" } - sql := listRevisionStartSQL + sql := listSQL if limit > 0 { sql = fmt.Sprintf("%s LIMIT ?", sql) - return d.query(ctx, "list_revision_start_sql_limit", sql, start, end, revision, includeDeleted, limit) + return d.query(ctx, "list_revision_start_sql_limit", sql, start, end, revision, limit) } - return d.query(ctx, "list_revision_start_sql", sql, start, end, revision, includeDeleted) + return d.query(ctx, "list_revision_start_sql", sql, start, end, revision) +} + +func (d *Generic) ListTTL(ctx context.Context, revision int64) (*sql.Rows, error) { + return d.query(ctx, "ttl_sql", ttlSQL, revision) } func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) { diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index 83468230..9cb9b8b7 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -42,7 +42,8 @@ func init() { } type Dialect interface { - List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) + List(ctx context.Context, prefix, startKey string, limit, revision int64) (*sql.Rows, error) + ListTTL(ctx context.Context, revision int64) (*sql.Rows, error) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, error) CurrentRevision(ctx context.Context) (int64, error) AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) @@ -286,7 +287,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis startKey = "" } - rows, err := s.d.List(ctx, prefix, startKey, limit, revision, false) + rows, err := s.d.List(ctx, prefix, startKey, limit, revision) if err != nil { return 0, nil, err } @@ -313,23 +314,31 @@ func (s *SQLLog) ttl(ctx context.Context) { go func() { defer s.wg.Done() - rev, kvs, err := s.List(ctx, "/", "", 1000, 0) - for len(kvs) > 0 { - if err != nil { + startRevision, err := s.d.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("failed to read old events for ttl: %v", err) + return + } + + rows, err := s.d.ListTTL(ctx, startRevision) + if err != nil { + logrus.Errorf("failed to read old events for ttl: %v", err) + return + } + + var ( + key string + revision, lease int64 + ) + for rows.Next() { + if err := rows.Scan(&revision, &key, &lease); err != nil { logrus.Errorf("failed to read old events for ttl: %v", err) return } - - for _, kv := range kvs { - if kv.Lease > 0 { - go run(ctx, kv.Key, kv.ModRevision, time.Duration(kv.Lease)*time.Second) - } - } - - _, kvs, err = s.List(ctx, "/", kvs[len(kvs)-1].Key, 1000, rev) + go run(ctx, key, revision, time.Duration(lease)*time.Second) } - watchCh, err := s.Watch(ctx, "/", rev) + watchCh, err := s.Watch(ctx, "/", startRevision) if err != nil { logrus.Errorf("failed to watch events for ttl: %v", err) return @@ -654,29 +663,16 @@ func ScanAll[T any](rows *sql.Rows, scanOne func(*sql.Rows) (T, error)) ([]T, er func scanKeyValue(rows *sql.Rows) (*server.KeyValue, error) { kv := &server.KeyValue{} - var create, delete bool - var prevRevision int64 - var prevValue []byte - err := rows.Scan( &kv.ModRevision, &kv.Key, - &create, - &delete, &kv.CreateRevision, - &prevRevision, &kv.Lease, &kv.Value, - &prevValue, ) if err != nil { return nil, err } - - if create { - kv.CreateRevision = kv.ModRevision - } - return kv, nil }