Skip to content

Commit

Permalink
Add TTL query to speed up startup
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 committed Nov 27, 2024
1 parent c0f5db4 commit 9552496
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 27 deletions.
44 changes: 29 additions & 15 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ func init() {
}

var (
columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"

revSQL = `
SELECT MAX(rkv.id) AS id
FROM kine AS rkv`
Expand Down Expand Up @@ -128,20 +126,32 @@ var (
ON maxkv.id = kv.id
WHERE kv.deleted = 0`

afterSQLPrefix = fmt.Sprintf(`
SELECT %s
afterSQLPrefix = `
SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value
FROM kine
WHERE name >= ? AND name < ?
AND id > ?
ORDER BY id ASC`

afterSQL = `
SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value
FROM kine
WHERE id > ?
ORDER BY id ASC`

ttlSQL = `
SELECT kv.id,
name,
lease
FROM kine AS kv
WHERE
kv.name >= ? AND kv.name < ?
AND kv.id > ?
ORDER BY kv.id ASC`, columns)

afterSQL = fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE kv.id > ?
ORDER BY kv.id ASC
`, columns)
JOIN (
SELECT MAX(mkv.id) as id
FROM kine AS mkv
WHERE mkv.id <= ?
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE kv.deleted = 0 AND kv.lease != 0`

deleteRevSQL = `
DELETE FROM kine
Expand Down Expand Up @@ -673,6 +683,10 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi
return d.query(ctx, "list_revision_start_sql", sql, start, end, revision)
}

func (d *Generic) ListTTL(ctx context.Context, revision int64) (*sql.Rows, error) {
return d.query(ctx, "ttl_sql", ttlSQL, revision)
}

func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
var id int64
var err error
Expand Down
33 changes: 21 additions & 12 deletions pkg/kine/sqllog/sqllog.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func init() {

type Dialect interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64) (*sql.Rows, error)
ListTTL(ctx context.Context, revision int64) (*sql.Rows, error)
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, error)
CurrentRevision(ctx context.Context) (int64, error)
AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
Expand Down Expand Up @@ -313,23 +314,31 @@ func (s *SQLLog) ttl(ctx context.Context) {
go func() {
defer s.wg.Done()

rev, kvs, err := s.List(ctx, "/", "", 1000, 0)
for len(kvs) > 0 {
if err != nil {
startRevision, err := s.d.CurrentRevision(ctx)
if err != nil {
logrus.Errorf("failed to read old events for ttl: %v", err)
return
}

rows, err := s.d.ListTTL(ctx, startRevision)
if err != nil {
logrus.Errorf("failed to read old events for ttl: %v", err)
return
}

var (
key string
revision, lease int64
)
for rows.Next() {
if err := rows.Scan(&revision, &key, &lease); err != nil {
logrus.Errorf("failed to read old events for ttl: %v", err)
return
}

for _, kv := range kvs {
if kv.Lease > 0 {
go run(ctx, kv.Key, kv.ModRevision, time.Duration(kv.Lease)*time.Second)
}
}

_, kvs, err = s.List(ctx, "/", kvs[len(kvs)-1].Key, 1000, rev)
go run(ctx, key, revision, time.Duration(lease)*time.Second)
}

watchCh, err := s.Watch(ctx, "/", rev)
watchCh, err := s.Watch(ctx, "/", startRevision)
if err != nil {
logrus.Errorf("failed to watch events for ttl: %v", err)
return
Expand Down

0 comments on commit 9552496

Please sign in to comment.