diff --git a/cloud/disk_manager/internal/pkg/clients/nbs/client.go b/cloud/disk_manager/internal/pkg/clients/nbs/client.go index b05ca92de13..ab8b850cf83 100644 --- a/cloud/disk_manager/internal/pkg/clients/nbs/client.go +++ b/cloud/disk_manager/internal/pkg/clients/nbs/client.go @@ -865,6 +865,46 @@ func (c *client) DeleteCheckpointData( return err } +func (c *client) EnsureCheckpointReady( + ctx context.Context, + diskID string, + checkpointID string, +) (err error) { + + defer c.metrics.StatRequest("EnsureCheckpointReady")(&err) + + status, err := c.GetCheckpointStatus(ctx, diskID, checkpointID) + if err != nil { + return err + } + + logging.Debug( + ctx, + "Current status of checkpoint with id %v for disk %v is %v", + checkpointID, + diskID, + status, + ) + + switch status { + case CheckpointStatusNotReady: + return errors.NewInterruptExecutionError() + + case CheckpointStatusError: + _ = c.DeleteCheckpoint(ctx, diskID, checkpointID) + return errors.NewRetriableErrorf( + "creating checkpoint with id %v for disk %v ended with an error", + checkpointID, + diskID, + ) + + case CheckpointStatusReady: + // Nothing to do. + } + + return nil +} + func (c *client) Resize( ctx context.Context, checkpoint func() error, diff --git a/cloud/disk_manager/internal/pkg/clients/nbs/interface.go b/cloud/disk_manager/internal/pkg/clients/nbs/interface.go index c99d73f7f18..700b9d0d32f 100644 --- a/cloud/disk_manager/internal/pkg/clients/nbs/interface.go +++ b/cloud/disk_manager/internal/pkg/clients/nbs/interface.go @@ -161,6 +161,12 @@ type Client interface { checkpointID string, ) error + EnsureCheckpointReady( + ctx context.Context, + diskID string, + checkpointID string, + ) error + Resize( ctx context.Context, saveState func() error, diff --git a/cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go b/cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go index 86096a2ce88..a174e056727 100644 --- a/cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go +++ b/cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go @@ -106,6 +106,16 @@ func (c *ClientMock) DeleteCheckpointData( return args.Error(0) } +func (c *ClientMock) EnsureCheckpointReady( + ctx context.Context, + diskID string, + checkpointID string, +) error { + + args := c.Called(ctx, diskID, checkpointID) + return args.Error(0) +} + func (c *ClientMock) Resize( ctx context.Context, checkpoint func() error, diff --git a/cloud/disk_manager/internal/pkg/facade/image_service_test/image_service_test.go b/cloud/disk_manager/internal/pkg/facade/image_service_test/image_service_test.go index 84a4f0d0f0d..4458dd3edb6 100644 --- a/cloud/disk_manager/internal/pkg/facade/image_service_test/image_service_test.go +++ b/cloud/disk_manager/internal/pkg/facade/image_service_test/image_service_test.go @@ -176,6 +176,91 @@ func checkUnencryptedImage( require.ErrorContains(t, err, "KeyPath should contain path to encryption key") } +func testImageServiceCreateImageFromDiskWithKind( + t *testing.T, + diskKind disk_manager.DiskKind, + diskSize uint64, +) { + + ctx := testcommon.NewContext() + + client, err := testcommon.NewClient(ctx) + require.NoError(t, err) + defer client.Close() + + diskID := t.Name() + + reqCtx := testcommon.GetRequestContext(t, ctx) + operation, err := client.CreateDisk(reqCtx, &disk_manager.CreateDiskRequest{ + Src: &disk_manager.CreateDiskRequest_SrcEmpty{ + SrcEmpty: &empty.Empty{}, + }, + Size: int64(diskSize), + Kind: diskKind, + DiskId: &disk_manager.DiskId{ + ZoneId: "zone-a", + DiskId: diskID, + }, + }) + require.NoError(t, err) + require.NotEmpty(t, operation) + err = internal_client.WaitOperation(ctx, client, operation.Id) + require.NoError(t, err) + + nbsClient := testcommon.NewNbsClient(t, ctx, "zone-a") + diskContentInfo, err := nbsClient.FillDisk(ctx, diskID, diskSize) + require.NoError(t, err) + + imageID := t.Name() + + reqCtx = testcommon.GetRequestContext(t, ctx) + operation, err = client.CreateImage(reqCtx, &disk_manager.CreateImageRequest{ + Src: &disk_manager.CreateImageRequest_SrcDiskId{ + SrcDiskId: &disk_manager.DiskId{ + ZoneId: "zone-a", + DiskId: diskID, + }, + }, + DstImageId: imageID, + FolderId: "folder", + Pooled: true, + }) + require.NoError(t, err) + require.NotEmpty(t, operation) + + response := disk_manager.CreateImageResponse{} + err = internal_client.WaitResponse(ctx, client, operation.Id, &response) + require.NoError(t, err) + require.Equal(t, int64(diskSize), response.Size) + + meta := disk_manager.CreateImageMetadata{} + err = internal_client.GetOperationMetadata(ctx, client, operation.Id, &meta) + require.NoError(t, err) + require.Equal(t, float64(1), meta.Progress) + + testcommon.RequireCheckpointsAreEmpty(t, ctx, diskID) + + checkUnencryptedImage( + t, + client, + ctx, + imageID, + int64(diskSize), + diskContentInfo.Crc32, + ) + + reqCtx = testcommon.GetRequestContext(t, ctx) + operation, err = client.DeleteImage(reqCtx, &disk_manager.DeleteImageRequest{ + ImageId: imageID, + }) + require.NoError(t, err) + require.NotEmpty(t, operation) + err = internal_client.WaitOperation(ctx, client, operation.Id) + require.NoError(t, err) + + testcommon.CheckConsistency(t, ctx) +} + //////////////////////////////////////////////////////////////////////////////// func TestImageServiceCreateImageFromImage(t *testing.T) { @@ -906,84 +991,19 @@ func TestImageServiceCancelCreateImageFromSnapshot(t *testing.T) { } func TestImageServiceCreateImageFromDisk(t *testing.T) { - ctx := testcommon.NewContext() - - client, err := testcommon.NewClient(ctx) - require.NoError(t, err) - defer client.Close() - - diskID := t.Name() - diskSize := uint64(4194304) - - reqCtx := testcommon.GetRequestContext(t, ctx) - operation, err := client.CreateDisk(reqCtx, &disk_manager.CreateDiskRequest{ - Src: &disk_manager.CreateDiskRequest_SrcEmpty{ - SrcEmpty: &empty.Empty{}, - }, - Size: int64(diskSize), - Kind: disk_manager.DiskKind_DISK_KIND_SSD, - DiskId: &disk_manager.DiskId{ - ZoneId: "zone-a", - DiskId: diskID, - }, - }) - require.NoError(t, err) - require.NotEmpty(t, operation) - err = internal_client.WaitOperation(ctx, client, operation.Id) - require.NoError(t, err) - - nbsClient := testcommon.NewNbsClient(t, ctx, "zone-a") - diskContentInfo, err := nbsClient.FillDisk(ctx, diskID, diskSize) - require.NoError(t, err) - - imageID := t.Name() - - reqCtx = testcommon.GetRequestContext(t, ctx) - operation, err = client.CreateImage(reqCtx, &disk_manager.CreateImageRequest{ - Src: &disk_manager.CreateImageRequest_SrcDiskId{ - SrcDiskId: &disk_manager.DiskId{ - ZoneId: "zone-a", - DiskId: diskID, - }, - }, - DstImageId: imageID, - FolderId: "folder", - Pooled: true, - }) - require.NoError(t, err) - require.NotEmpty(t, operation) - - response := disk_manager.CreateImageResponse{} - err = internal_client.WaitResponse(ctx, client, operation.Id, &response) - require.NoError(t, err) - require.Equal(t, int64(diskSize), response.Size) - - meta := disk_manager.CreateImageMetadata{} - err = internal_client.GetOperationMetadata(ctx, client, operation.Id, &meta) - require.NoError(t, err) - require.Equal(t, float64(1), meta.Progress) - - testcommon.RequireCheckpointsAreEmpty(t, ctx, diskID) - - checkUnencryptedImage( + testImageServiceCreateImageFromDiskWithKind( t, - client, - ctx, - imageID, - int64(diskSize), - diskContentInfo.Crc32, + disk_manager.DiskKind_DISK_KIND_SSD, + uint64(4194304), ) +} - reqCtx = testcommon.GetRequestContext(t, ctx) - operation, err = client.DeleteImage(reqCtx, &disk_manager.DeleteImageRequest{ - ImageId: imageID, - }) - require.NoError(t, err) - require.NotEmpty(t, operation) - err = internal_client.WaitOperation(ctx, client, operation.Id) - require.NoError(t, err) - - testcommon.CheckConsistency(t, ctx) +func TestImageServiceCreateImageFromNonReplicatedDisk(t *testing.T) { + testImageServiceCreateImageFromDiskWithKind( + t, + disk_manager.DiskKind_DISK_KIND_SSD_NONREPLICATED, + uint64(1073741824), + ) } func TestImageServiceCancelCreateImageFromDisk(t *testing.T) { diff --git a/cloud/disk_manager/internal/pkg/services/images/create_image_from_disk_task.go b/cloud/disk_manager/internal/pkg/services/images/create_image_from_disk_task.go index 4909a7bc2a4..67aebbe3d23 100644 --- a/cloud/disk_manager/internal/pkg/services/images/create_image_from_disk_task.go +++ b/cloud/disk_manager/internal/pkg/services/images/create_image_from_disk_task.go @@ -93,6 +93,11 @@ func (t *createImageFromDiskTask) run( return err } + err = nbsClient.EnsureCheckpointReady(ctx, disk.DiskId, checkpointID) + if err != nil { + return err + } + taskID, err := t.scheduler.ScheduleZonalTask( headers.SetIncomingIdempotencyKey(ctx, selfTaskID+"_run"), "dataplane.CreateSnapshotFromDisk", diff --git a/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go b/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go index 57ea4321a02..e14a13a870e 100644 --- a/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go +++ b/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go @@ -15,7 +15,6 @@ import ( "github.com/ydb-platform/nbs/cloud/tasks" "github.com/ydb-platform/nbs/cloud/tasks/errors" "github.com/ydb-platform/nbs/cloud/tasks/headers" - "github.com/ydb-platform/nbs/cloud/tasks/logging" ) //////////////////////////////////////////////////////////////////////////////// @@ -91,7 +90,7 @@ func (t *createSnapshotFromDiskTask) run( return err } - err = t.ensureCheckpointReady(ctx, nbsClient, disk.DiskId, checkpointID) + err = nbsClient.EnsureCheckpointReady(ctx, disk.DiskId, checkpointID) if err != nil { return err } @@ -273,38 +272,3 @@ func (t *createSnapshotFromDiskTask) GetResponse() proto.Message { StorageSize: t.state.SnapshotStorageSize, } } - -//////////////////////////////////////////////////////////////////////////////// - -func (t *createSnapshotFromDiskTask) ensureCheckpointReady( - ctx context.Context, - nbsClient nbs.Client, - diskID string, - checkpointID string, -) error { - - status, err := nbsClient.GetCheckpointStatus(ctx, diskID, checkpointID) - if err != nil { - return err - } - - logging.Info( - ctx, - "Current CheckpointStatus: %v", - status, - ) - - switch status { - case nbs.CheckpointStatusNotReady: - return errors.NewInterruptExecutionError() - - case nbs.CheckpointStatusError: - _ = nbsClient.DeleteCheckpoint(ctx, diskID, checkpointID) - return errors.NewRetriableErrorf("Filling the NRD disk replica ended with an error.") - - case nbs.CheckpointStatusReady: - // Nothing to do. - } - - return nil -}