Skip to content

Commit

Permalink
Make DoCompact operation interruptible
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 committed Jul 1, 2024
1 parent e858e19 commit 02585d2
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 24 deletions.
6 changes: 3 additions & 3 deletions pkg/kine/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Log interface {
Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error)
Append(ctx context.Context, event *server.Event) (int64, error)
DbSize(ctx context.Context) (int64, error)
DoCompact() error
DoCompact(ctx context.Context) error
}

type LogStructured struct {
Expand All @@ -31,8 +31,8 @@ func New(log Log) *LogStructured {
}
}

func (l *LogStructured) DoCompact() error {
return l.log.DoCompact()
func (l *LogStructured) DoCompact(ctx context.Context) error {
return l.log.DoCompact(ctx)
}

func (l *LogStructured) Start(ctx context.Context) error {
Expand Down
34 changes: 17 additions & 17 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/sirupsen/logrus"
)

const supersededCount = 100

type SQLLog struct {
d Dialect
broadcaster broadcaster.Broadcaster
Expand Down Expand Up @@ -100,25 +102,25 @@ func (s *SQLLog) compactStart(ctx context.Context) error {

// DoCompact makes a single compaction run when called. It is intended to be called
// from test functions that have access to the backend.
func (s *SQLLog) DoCompact() error {
if err := s.compactStart(s.ctx); err != nil {
func (s *SQLLog) DoCompact(ctx context.Context) error {
if err := s.compactStart(ctx); err != nil {
return fmt.Errorf("failed to initialise compaction: %v", err)
}

nextEnd, _ := s.d.CurrentRevision(s.ctx)
_, err := s.compactor(nextEnd)
nextEnd, _ := s.d.CurrentRevision(ctx)
_, err := s.compactor(ctx, nextEnd)

return err
}

func (s *SQLLog) compactor(nextEnd int64) (int64, error) {
currentRev, err := s.d.CurrentRevision(s.ctx)
func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {
currentRev, err := s.d.CurrentRevision(ctx)
if err != nil {
logrus.Errorf("failed to get current revision: %v", err)
return nextEnd, fmt.Errorf("failed to get current revision: %v", err)
}

cursor, _, err := s.d.GetCompactRevision(s.ctx)
cursor, _, err := s.d.GetCompactRevision(ctx)
if err != nil {
logrus.Errorf("failed to get compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to get compact revision: %v", err)
Expand All @@ -131,15 +133,13 @@ func (s *SQLLog) compactor(nextEnd int64) (int64, error) {
// This is because of low activity, where the created list is part of the last 1000 revisions and is not compacted.
// Link to failing test: https://github.com/kubernetes/kubernetes/blob/f2cfbf44b1fb482671aedbfff820ae2af256a389/test/e2e/apimachinery/chunking.go#L144
// To address this, we only ignore the last 100 revisions instead

// end = end - 1000
end = end - 100
end = end - supersededCount

savedCursor := cursor
// Purposefully start at the current and redo the current as
// it could have failed before actually compacting
for ; cursor <= end; cursor++ {
rows, err := s.d.GetRevision(s.ctx, cursor)
rows, err := s.d.GetRevision(ctx, cursor)
if err != nil {
logrus.Errorf("failed to get revision %d: %v", cursor, err)
return nextEnd, fmt.Errorf("failed to get revision %d: %v", cursor, err)
Expand All @@ -165,38 +165,38 @@ func (s *SQLLog) compactor(nextEnd int64) (int64, error) {
setRev := false
if event.PrevKV != nil && event.PrevKV.ModRevision != 0 {
if savedCursor != cursor {
if err := s.d.SetCompactRevision(s.ctx, cursor); err != nil {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
savedCursor = cursor
setRev = true
}

if err := s.d.DeleteRevision(s.ctx, event.PrevKV.ModRevision); err != nil {
if err := s.d.DeleteRevision(ctx, event.PrevKV.ModRevision); err != nil {
logrus.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err)
return nextEnd, fmt.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err)
}
}

if event.Delete {
if !setRev && savedCursor != cursor {
if err := s.d.SetCompactRevision(s.ctx, cursor); err != nil {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
savedCursor = cursor
}

if err := s.d.DeleteRevision(s.ctx, cursor); err != nil {
if err := s.d.DeleteRevision(ctx, cursor); err != nil {
logrus.Errorf("failed to delete current revision %d: %v", cursor, err)
return nextEnd, fmt.Errorf("failed to delete current revision %d: %v", cursor, err)
}
}
}

if savedCursor != cursor {
if err := s.d.SetCompactRevision(s.ctx, cursor); err != nil {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
Expand All @@ -218,7 +218,7 @@ func (s *SQLLog) compact() {
case <-t.C:
}

nextEnd, _ = s.compactor(nextEnd)
nextEnd, _ = s.compactor(s.ctx, nextEnd)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kine/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Backend interface {
Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error)
Watch(ctx context.Context, key string, revision int64) <-chan []*Event
DbSize(ctx context.Context) (int64, error)
DoCompact() error
DoCompact(ctx context.Context) error
}

type KeyValue struct {
Expand Down
6 changes: 3 additions & 3 deletions test/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestCompaction(t *testing.T) {
initialSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

err = backend.DoCompact()
err = backend.DoCompact(ctx)
g.Expect(err).To(BeNil())

finalSize, err := backend.DbSize(ctx)
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestCompaction(t *testing.T) {
initialSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

err = backend.DoCompact()
err = backend.DoCompact(ctx)
g.Expect(err).To(BeNil())

finalSize, err := backend.DbSize(ctx)
Expand Down Expand Up @@ -120,7 +120,7 @@ func BenchmarkCompaction(b *testing.B) {
initialSize, err := backend.DbSize(ctx)
g.Expect(err).To(BeNil())

err = backend.DoCompact()
err = backend.DoCompact(ctx)
g.Expect(err).To(BeNil())

finalSize, err := backend.DbSize(ctx)
Expand Down

0 comments on commit 02585d2

Please sign in to comment.