Skip to content

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
gy2411 committed Jan 16, 2025
1 parent e5ab782 commit bc484cf
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (t *createSnapshotFromDiskTask) run(
return nil
}

if t.state.FinalCheckpointID == "" {
if t.state.CheckpointID == "" {
err = t.updateCheckpoint(ctx, nbsClient)
if err != nil {
return err
Expand All @@ -96,7 +96,7 @@ func (t *createSnapshotFromDiskTask) run(
return err
}

t.state.FinalCheckpointID = t.getCurrentCheckpointID()
t.state.CheckpointID = t.getCurrentCheckpointID()
err = execCtx.SaveState(ctx)
if err != nil {
return err
Expand All @@ -110,7 +110,7 @@ func (t *createSnapshotFromDiskTask) run(
disk.ZoneId,
&dataplane_protos.CreateSnapshotFromDiskRequest{
SrcDisk: disk,
SrcDiskCheckpointId: t.state.FinalCheckpointID,
SrcDiskCheckpointId: t.state.CheckpointID,
DstSnapshotId: t.request.DstSnapshotId,
UseS3: t.request.UseS3,
UseProxyOverlayDisk: t.request.UseProxyOverlayDisk,
Expand Down Expand Up @@ -152,7 +152,7 @@ func (t *createSnapshotFromDiskTask) run(
return t.storage.SnapshotCreated(
ctx,
t.request.DstSnapshotId,
t.state.FinalCheckpointID,
t.state.CheckpointID,
time.Now(),
uint64(t.state.SnapshotSize),
uint64(t.state.SnapshotStorageSize),
Expand All @@ -178,8 +178,8 @@ func (t *createSnapshotFromDiskTask) Run(

err = nbsClient.DeleteCheckpointData(
ctx,
t.request.GetSrcDisk().DiskId,
t.state.FinalCheckpointID,
disk.DiskId,
t.state.CheckpointID,
)
if err != nil {
return err
Expand All @@ -200,16 +200,7 @@ func (t *createSnapshotFromDiskTask) Cancel(
return err
}

err = nbsClient.DeleteCheckpoint(
ctx,
t.request.GetSrcDisk().DiskId,
t.getCurrentCheckpointID(),
)
if err != nil {
return err
}

err = t.deletePreviousCheckpoint(ctx, nbsClient)
err = t.cleanupCheckpoints(ctx, nbsClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -316,7 +307,7 @@ func (t *createSnapshotFromDiskTask) ensureCheckpointReady(
return errors.NewInterruptExecutionError()

case nbs.CheckpointStatusError:
t.state.FailedCheckpointsCount++
t.state.CheckpointIteration++
err = execCtx.SaveState(ctx)
if err != nil {
return err
Expand All @@ -332,6 +323,38 @@ func (t *createSnapshotFromDiskTask) ensureCheckpointReady(

////////////////////////////////////////////////////////////////////////////////

func (t *createSnapshotFromDiskTask) makeCheckpointID(index int) string {
if index == 0 {
return t.request.DstSnapshotId
}
return fmt.Sprintf("%v_%v", t.request.DstSnapshotId, index)
}

func (t *createSnapshotFromDiskTask) getCurrentCheckpointID() string {
return t.makeCheckpointID(int(t.state.CheckpointIteration))
}

func (t *createSnapshotFromDiskTask) deletePreviousCheckpoint(
ctx context.Context,
nbsClient nbs.Client,
) error {

if t.state.CheckpointIteration == 0 {
// No previous checkpoint, nothing to do.
return nil
}

checkpointID := t.makeCheckpointID(
int(t.state.CheckpointIteration) - 1,
)

return nbsClient.DeleteCheckpoint(
ctx,
t.request.SrcDisk.DiskId,
checkpointID,
)
}

func (t *createSnapshotFromDiskTask) updateCheckpoint(
ctx context.Context,
nbsClient nbs.Client,
Expand All @@ -345,39 +368,25 @@ func (t *createSnapshotFromDiskTask) updateCheckpoint(
return nbsClient.CreateCheckpoint(
ctx,
nbs.CheckpointParams{
DiskID: t.request.GetSrcDisk().DiskId,
DiskID: t.request.SrcDisk.DiskId,
CheckpointID: t.getCurrentCheckpointID(),
},
)
}

func (t *createSnapshotFromDiskTask) deletePreviousCheckpoint(
func (t *createSnapshotFromDiskTask) cleanupCheckpoints(
ctx context.Context,
nbsClient nbs.Client,
) error {
if t.state.FailedCheckpointsCount == 0 {
// No previous checkpoint, nothing to do.
return nil
}

checkpointID := t.makeCheckpointID(
int(t.state.FailedCheckpointsCount) - 1,
)
err := t.deletePreviousCheckpoint(ctx, nbsClient)
if err != nil {
return err
}

return nbsClient.DeleteCheckpoint(
ctx,
t.request.GetSrcDisk().DiskId,
checkpointID,
t.request.SrcDisk.DiskId,
t.getCurrentCheckpointID(),
)
}

func (t *createSnapshotFromDiskTask) getCurrentCheckpointID() string {
return t.makeCheckpointID(int(t.state.FailedCheckpointsCount))
}

func (t *createSnapshotFromDiskTask) makeCheckpointID(index int) string {
if index == 0 {
return t.request.DstSnapshotId
}
return fmt.Sprintf("%v_%v", t.request.DstSnapshotId, index)
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) {
state: &protos.CreateSnapshotFromDiskTaskState{},
}

require.Equal(t, int32(0), task.state.FailedCheckpointsCount)
require.Empty(t, task.state.FinalCheckpointID)
require.Equal(t, int32(0), task.state.CheckpointIteration)
require.Empty(t, task.state.CheckpointID)

// Create first checkpoint and get checkpoint status 'Error'.
execCtx.On("GetTaskID").Return("create_snapshot_from_disk_task")
Expand All @@ -81,8 +81,8 @@ func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) {
err := task.Run(ctx, execCtx)
require.Error(t, err)
require.True(t, errors.CanRetry(err))
require.Equal(t, int32(1), task.state.FailedCheckpointsCount)
require.Empty(t, task.state.FinalCheckpointID)
require.Equal(t, int32(1), task.state.CheckpointIteration)
require.Empty(t, task.state.CheckpointID)
mock.AssertExpectationsForObjects(t, scheduler, storage, nbsFactory, nbsClient, execCtx)

// Create second checkpoint and get checkpoint status 'Ready'.
Expand Down Expand Up @@ -126,8 +126,8 @@ func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) {
err = task.Run(ctx, execCtx)
require.Error(t, err)
require.True(t, errors.Is(err, errors.NewInterruptExecutionError()))
require.Equal(t, int32(1), task.state.FailedCheckpointsCount)
require.Equal(t, task.state.FinalCheckpointID, "snapshotID_1")
require.Equal(t, int32(1), task.state.CheckpointIteration)
require.Equal(t, task.state.CheckpointID, "snapshotID_1")
mock.AssertExpectationsForObjects(t, scheduler, storage, nbsFactory, nbsClient, execCtx)

// Finish waiting for the dataplane task, finish creating snapshot.
Expand All @@ -154,8 +154,8 @@ func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) {

err = task.Run(ctx, execCtx)
require.NoError(t, err)
require.Equal(t, int32(1), task.state.FailedCheckpointsCount)
require.Equal(t, task.state.FinalCheckpointID, "snapshotID_1")
require.Equal(t, int32(1), task.state.CheckpointIteration)
require.Equal(t, task.state.CheckpointID, "snapshotID_1")
mock.AssertExpectationsForObjects(t, scheduler, storage, nbsFactory, nbsClient, execCtx)

// Cancel the task.
Expand Down Expand Up @@ -185,8 +185,8 @@ func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) {

err = task.Cancel(ctx, execCtx)
require.NoError(t, err)
require.Equal(t, int32(1), task.state.FailedCheckpointsCount)
require.Equal(t, task.state.FinalCheckpointID, "snapshotID_1")
require.Equal(t, int32(1), task.state.CheckpointIteration)
require.Equal(t, task.state.CheckpointID, "snapshotID_1")
mock.AssertExpectationsForObjects(
t,
scheduler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ message CreateSnapshotFromDiskTaskState {
string DataplaneTaskID = 6;

// Needed for shadow disk based checkpoints.
int32 FailedCheckpointsCount = 7;
string FinalCheckpointID = 8;
int32 CheckpointIteration = 7;
string CheckpointID = 8;
}

0 comments on commit bc484cf

Please sign in to comment.