From 560c54fccba853ec6af8190eb8c8f26428866969 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Thu, 16 Jan 2025 11:23:03 +0300 Subject: [PATCH] minor fixes --- .../create_snapshot_from_disk_task.go | 91 +++++++++++-------- .../create_snapshot_from_disk_task_test.go | 20 ++-- .../create_snapshot_from_disk_task.proto | 4 +- 3 files changed, 63 insertions(+), 52 deletions(-) diff --git a/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go b/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go index c52604b2760..aa5d4d3dfbd 100644 --- a/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go +++ b/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go @@ -79,7 +79,7 @@ func (t *createSnapshotFromDiskTask) run( return nil } - if t.state.FinalCheckpointID == "" { + if t.state.CheckpointID == "" { err = t.updateCheckpoint(ctx, nbsClient) if err != nil { return err @@ -96,7 +96,7 @@ func (t *createSnapshotFromDiskTask) run( return err } - t.state.FinalCheckpointID = t.getCurrentCheckpointID() + t.state.CheckpointID = t.getCurrentCheckpointID() err = execCtx.SaveState(ctx) if err != nil { return err @@ -110,7 +110,7 @@ func (t *createSnapshotFromDiskTask) run( disk.ZoneId, &dataplane_protos.CreateSnapshotFromDiskRequest{ SrcDisk: disk, - SrcDiskCheckpointId: t.state.FinalCheckpointID, + SrcDiskCheckpointId: t.state.CheckpointID, DstSnapshotId: t.request.DstSnapshotId, UseS3: t.request.UseS3, UseProxyOverlayDisk: t.request.UseProxyOverlayDisk, @@ -152,7 +152,7 @@ func (t *createSnapshotFromDiskTask) run( return t.storage.SnapshotCreated( ctx, t.request.DstSnapshotId, - t.state.FinalCheckpointID, + t.state.CheckpointID, time.Now(), uint64(t.state.SnapshotSize), uint64(t.state.SnapshotStorageSize), @@ -178,8 +178,8 @@ func (t *createSnapshotFromDiskTask) Run( err = nbsClient.DeleteCheckpointData( ctx, - t.request.GetSrcDisk().DiskId, - t.state.FinalCheckpointID, + disk.DiskId, + t.state.CheckpointID, ) if err != nil { return err @@ -200,16 +200,7 @@ func (t *createSnapshotFromDiskTask) Cancel( return err } - err = nbsClient.DeleteCheckpoint( - ctx, - t.request.GetSrcDisk().DiskId, - t.getCurrentCheckpointID(), - ) - if err != nil { - return err - } - - err = t.deletePreviousCheckpoint(ctx, nbsClient) + err = t.cleanupCheckpoints(ctx, nbsClient) if err != nil { return err } @@ -316,12 +307,14 @@ func (t *createSnapshotFromDiskTask) ensureCheckpointReady( return errors.NewInterruptExecutionError() case nbs.CheckpointStatusError: - t.state.FailedCheckpointsCount++ + t.state.CheckpointIteration++ err = execCtx.SaveState(ctx) if err != nil { return err } - return errors.NewRetriableErrorf("Filling the NRD disk replica ended with an error.") + return errors.NewRetriableErrorf( + "Filling the NRD disk replica ended with an error.", + ) case nbs.CheckpointStatusReady: // Nothing to do. @@ -332,6 +325,38 @@ func (t *createSnapshotFromDiskTask) ensureCheckpointReady( //////////////////////////////////////////////////////////////////////////////// +func (t *createSnapshotFromDiskTask) makeCheckpointID(index int) string { + if index == 0 { + return t.request.DstSnapshotId + } + return fmt.Sprintf("%v_%v", t.request.DstSnapshotId, index) +} + +func (t *createSnapshotFromDiskTask) getCurrentCheckpointID() string { + return t.makeCheckpointID(int(t.state.CheckpointIteration)) +} + +func (t *createSnapshotFromDiskTask) deletePreviousCheckpoint( + ctx context.Context, + nbsClient nbs.Client, +) error { + + if t.state.CheckpointIteration == 0 { + // No previous checkpoint, nothing to do. + return nil + } + + checkpointID := t.makeCheckpointID( + int(t.state.CheckpointIteration) - 1, + ) + + return nbsClient.DeleteCheckpoint( + ctx, + t.request.SrcDisk.DiskId, + checkpointID, + ) +} + func (t *createSnapshotFromDiskTask) updateCheckpoint( ctx context.Context, nbsClient nbs.Client, @@ -345,39 +370,25 @@ func (t *createSnapshotFromDiskTask) updateCheckpoint( return nbsClient.CreateCheckpoint( ctx, nbs.CheckpointParams{ - DiskID: t.request.GetSrcDisk().DiskId, + DiskID: t.request.SrcDisk.DiskId, CheckpointID: t.getCurrentCheckpointID(), }, ) } -func (t *createSnapshotFromDiskTask) deletePreviousCheckpoint( +func (t *createSnapshotFromDiskTask) cleanupCheckpoints( ctx context.Context, nbsClient nbs.Client, ) error { - if t.state.FailedCheckpointsCount == 0 { - // No previous checkpoint, nothing to do. - return nil - } - checkpointID := t.makeCheckpointID( - int(t.state.FailedCheckpointsCount) - 1, - ) + err := t.deletePreviousCheckpoint(ctx, nbsClient) + if err != nil { + return err + } return nbsClient.DeleteCheckpoint( ctx, - t.request.GetSrcDisk().DiskId, - checkpointID, + t.request.SrcDisk.DiskId, + t.getCurrentCheckpointID(), ) } - -func (t *createSnapshotFromDiskTask) getCurrentCheckpointID() string { - return t.makeCheckpointID(int(t.state.FailedCheckpointsCount)) -} - -func (t *createSnapshotFromDiskTask) makeCheckpointID(index int) string { - if index == 0 { - return t.request.DstSnapshotId - } - return fmt.Sprintf("%v_%v", t.request.DstSnapshotId, index) -} diff --git a/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task_test.go b/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task_test.go index 090dcfcec1c..14865c5436b 100644 --- a/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task_test.go +++ b/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task_test.go @@ -55,8 +55,8 @@ func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) { state: &protos.CreateSnapshotFromDiskTaskState{}, } - require.Equal(t, int32(0), task.state.FailedCheckpointsCount) - require.Empty(t, task.state.FinalCheckpointID) + require.Equal(t, int32(0), task.state.CheckpointIteration) + require.Empty(t, task.state.CheckpointID) // Create first checkpoint and get checkpoint status 'Error'. execCtx.On("GetTaskID").Return("create_snapshot_from_disk_task") @@ -81,8 +81,8 @@ func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) { err := task.Run(ctx, execCtx) require.Error(t, err) require.True(t, errors.CanRetry(err)) - require.Equal(t, int32(1), task.state.FailedCheckpointsCount) - require.Empty(t, task.state.FinalCheckpointID) + require.Equal(t, int32(1), task.state.CheckpointIteration) + require.Empty(t, task.state.CheckpointID) mock.AssertExpectationsForObjects(t, scheduler, storage, nbsFactory, nbsClient, execCtx) // Create second checkpoint and get checkpoint status 'Ready'. @@ -126,8 +126,8 @@ func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) { err = task.Run(ctx, execCtx) require.Error(t, err) require.True(t, errors.Is(err, errors.NewInterruptExecutionError())) - require.Equal(t, int32(1), task.state.FailedCheckpointsCount) - require.Equal(t, task.state.FinalCheckpointID, "snapshotID_1") + require.Equal(t, int32(1), task.state.CheckpointIteration) + require.Equal(t, task.state.CheckpointID, "snapshotID_1") mock.AssertExpectationsForObjects(t, scheduler, storage, nbsFactory, nbsClient, execCtx) // Finish waiting for the dataplane task, finish creating snapshot. @@ -154,8 +154,8 @@ func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) { err = task.Run(ctx, execCtx) require.NoError(t, err) - require.Equal(t, int32(1), task.state.FailedCheckpointsCount) - require.Equal(t, task.state.FinalCheckpointID, "snapshotID_1") + require.Equal(t, int32(1), task.state.CheckpointIteration) + require.Equal(t, task.state.CheckpointID, "snapshotID_1") mock.AssertExpectationsForObjects(t, scheduler, storage, nbsFactory, nbsClient, execCtx) // Cancel the task. @@ -185,8 +185,8 @@ func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) { err = task.Cancel(ctx, execCtx) require.NoError(t, err) - require.Equal(t, int32(1), task.state.FailedCheckpointsCount) - require.Equal(t, task.state.FinalCheckpointID, "snapshotID_1") + require.Equal(t, int32(1), task.state.CheckpointIteration) + require.Equal(t, task.state.CheckpointID, "snapshotID_1") mock.AssertExpectationsForObjects( t, scheduler, diff --git a/cloud/disk_manager/internal/pkg/services/snapshots/protos/create_snapshot_from_disk_task.proto b/cloud/disk_manager/internal/pkg/services/snapshots/protos/create_snapshot_from_disk_task.proto index bc88d8a0bc2..d77af2a4335 100644 --- a/cloud/disk_manager/internal/pkg/services/snapshots/protos/create_snapshot_from_disk_task.proto +++ b/cloud/disk_manager/internal/pkg/services/snapshots/protos/create_snapshot_from_disk_task.proto @@ -26,6 +26,6 @@ message CreateSnapshotFromDiskTaskState { string DataplaneTaskID = 6; // Needed for shadow disk based checkpoints. - int32 FailedCheckpointsCount = 7; - string FinalCheckpointID = 8; + int32 CheckpointIteration = 7; + string CheckpointID = 8; }