Skip to content

Commit

Permalink
Improve queries after LogStructured removal (#195)
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 authored Nov 27, 2024
1 parent c67af60 commit 40bd618
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 59 deletions.
87 changes: 55 additions & 32 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ func init() {
}

var (
columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"

revSQL = `
SELECT MAX(rkv.id) AS id
FROM kine AS rkv`

listSQL = fmt.Sprintf(`
SELECT %s
listSQL = `
SELECT kv.id,
name,
CASE WHEN kv.created THEN kv.id ELSE kv.create_revision END AS create_revision,
lease,
value
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
Expand All @@ -97,9 +99,8 @@ var (
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE (kv.deleted = 0 OR ?)
ORDER BY kv.name ASC, kv.id ASC
`, columns)
WHERE kv.deleted = 0
ORDER BY kv.name ASC, kv.id ASC`

revisionIntervalSQL = `
SELECT (
Expand All @@ -111,28 +112,46 @@ var (
FROM kine
) AS high`

listRevisionStartSQL = listSQL

countRevisionSQL = fmt.Sprintf(`
countRevisionSQL = `
SELECT COUNT(*)
FROM (
%s
)`, listSQL)

afterSQLPrefix = fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE
kv.name >= ? AND kv.name < ?
AND kv.id > ?
ORDER BY kv.id ASC`, columns)

afterSQL = fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE kv.id > ?
ORDER BY kv.id ASC
`, columns)
JOIN (
SELECT MAX(mkv.id) as id
FROM kine AS mkv
WHERE
mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE kv.deleted = 0`

afterSQLPrefix = `
SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value
FROM kine
WHERE name >= ? AND name < ?
AND id > ?
ORDER BY id ASC`

afterSQL = `
SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value
FROM kine
WHERE id > ?
ORDER BY id ASC`

ttlSQL = `
SELECT kv.id,
name,
lease
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine AS mkv
WHERE mkv.id <= ?
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE kv.deleted = 0 AND kv.lease != 0`

deleteRevSQL = `
DELETE FROM kine
Expand Down Expand Up @@ -408,7 +427,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i
if startKey != "" {
start = startKey + "\x01"
}
rows, err := d.query(ctx, "count_revision", countRevisionSQL, start, end, revision, false)
rows, err := d.query(ctx, "count_revision", countRevisionSQL, start, end, revision)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -651,17 +670,21 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error {
return err
}

func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) {
func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64) (*sql.Rows, error) {
start, end := getPrefixRange(prefix)
if startKey != "" {
start = startKey + "\x01"
}
sql := listRevisionStartSQL
sql := listSQL
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT ?", sql)
return d.query(ctx, "list_revision_start_sql_limit", sql, start, end, revision, includeDeleted, limit)
return d.query(ctx, "list_revision_start_sql_limit", sql, start, end, revision, limit)
}
return d.query(ctx, "list_revision_start_sql", sql, start, end, revision, includeDeleted)
return d.query(ctx, "list_revision_start_sql", sql, start, end, revision)
}

func (d *Generic) ListTTL(ctx context.Context, revision int64) (*sql.Rows, error) {
return d.query(ctx, "ttl_sql", ttlSQL, revision)
}

func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
Expand Down
50 changes: 23 additions & 27 deletions pkg/kine/sqllog/sqllog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func init() {
}

type Dialect interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64) (*sql.Rows, error)
ListTTL(ctx context.Context, revision int64) (*sql.Rows, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, error)
CurrentRevision(ctx context.Context) (int64, error)
AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
Expand Down Expand Up @@ -286,7 +287,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
startKey = ""
}

rows, err := s.d.List(ctx, prefix, startKey, limit, revision, false)
rows, err := s.d.List(ctx, prefix, startKey, limit, revision)
if err != nil {
return 0, nil, err
}
Expand All @@ -313,23 +314,31 @@ func (s *SQLLog) ttl(ctx context.Context) {
go func() {
defer s.wg.Done()

rev, kvs, err := s.List(ctx, "/", "", 1000, 0)
for len(kvs) > 0 {
if err != nil {
startRevision, err := s.d.CurrentRevision(ctx)
if err != nil {
logrus.Errorf("failed to read old events for ttl: %v", err)
return
}

rows, err := s.d.ListTTL(ctx, startRevision)
if err != nil {
logrus.Errorf("failed to read old events for ttl: %v", err)
return
}

var (
key string
revision, lease int64
)
for rows.Next() {
if err := rows.Scan(&revision, &key, &lease); err != nil {
logrus.Errorf("failed to read old events for ttl: %v", err)
return
}

for _, kv := range kvs {
if kv.Lease > 0 {
go run(ctx, kv.Key, kv.ModRevision, time.Duration(kv.Lease)*time.Second)
}
}

_, kvs, err = s.List(ctx, "/", kvs[len(kvs)-1].Key, 1000, rev)
go run(ctx, key, revision, time.Duration(lease)*time.Second)
}

watchCh, err := s.Watch(ctx, "/", rev)
watchCh, err := s.Watch(ctx, "/", startRevision)
if err != nil {
logrus.Errorf("failed to watch events for ttl: %v", err)
return
Expand Down Expand Up @@ -654,29 +663,16 @@ func ScanAll[T any](rows *sql.Rows, scanOne func(*sql.Rows) (T, error)) ([]T, er

func scanKeyValue(rows *sql.Rows) (*server.KeyValue, error) {
kv := &server.KeyValue{}
var create, delete bool
var prevRevision int64
var prevValue []byte

err := rows.Scan(
&kv.ModRevision,
&kv.Key,
&create,
&delete,
&kv.CreateRevision,
&prevRevision,
&kv.Lease,
&kv.Value,
&prevValue,
)
if err != nil {
return nil, err
}

if create {
kv.CreateRevision = kv.ModRevision
}

return kv, nil
}

Expand Down

0 comments on commit 40bd618

Please sign in to comment.