Skip to content

Commit

Permalink
831: [Disk Manager] validate crc32 for each block
Browse files Browse the repository at this point in the history
  • Loading branch information
BarkovBG committed Mar 28, 2024
1 parent d376706 commit 5f559d3
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 66 deletions.
14 changes: 11 additions & 3 deletions cloud/disk_manager/internal/pkg/clients/nbs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,25 +294,33 @@ type Client interface {
) error

// Used in tests.
ValidateCrc32(diskID string, contentSize uint64, expectedCrc32 uint32) error
ValidateCrc32(
ctx context.Context,
diskID string,
contentSize uint64,
expectedCrc32 uint32,
expectedBlocksCrc32 []uint32,
) error

// Used in tests.
ValidateCrc32WithEncryption(
ctx context.Context,
diskID string,
contentSize uint64,
encryption *types.EncryptionDesc,
expectedCrc32 uint32,
expectedBlocksCrc32 []uint32,
) error

// Used in tests.
CalculateCrc32(diskID string, contentSize uint64) (uint32, error)
CalculateCrc32(diskID string, contentSize uint64) (uint32, []uint32, error)

// Used in tests.
CalculateCrc32WithEncryption(
diskID string,
contentSize uint64,
encryption *types.EncryptionDesc,
) (uint32, error)
) (uint32, []uint32, error)

// Used in tests.
MountForReadWrite(diskID string) (func(), error)
Expand Down
80 changes: 64 additions & 16 deletions cloud/disk_manager/internal/pkg/clients/nbs/testing_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"github.com/ydb-platform/nbs/cloud/blockstore/public/api/protos"
nbs_client "github.com/ydb-platform/nbs/cloud/blockstore/public/sdk/go/client"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/types"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
)

////////////////////////////////////////////////////////////////////////////////

func (c *client) CalculateCrc32(
diskID string,
contentSize uint64,
) (uint32, error) {
) (uint32, []uint32, error) {

return c.CalculateCrc32WithEncryption(diskID, contentSize, nil)
}
Expand All @@ -24,13 +25,13 @@ func (c *client) CalculateCrc32WithEncryption(
diskID string,
contentSize uint64,
encryption *types.EncryptionDesc,
) (uint32, error) {
) (uint32, []uint32, error) {

ctx := setupStderrLogger(context.Background())

nbsClient, _, err := c.nbs.DiscoverInstance(ctx)
if err != nil {
return 0, err
return 0, []uint32{}, err
}
defer nbsClient.Close()

Expand All @@ -42,7 +43,7 @@ func (c *client) CalculateCrc32WithEncryption(

encryptionSpec, err := getEncryptionSpec(encryption)
if err != nil {
return 0, err
return 0, []uint32{}, err
}

opts := nbs_client.MountVolumeOpts{
Expand All @@ -54,22 +55,22 @@ func (c *client) CalculateCrc32WithEncryption(
}
err = session.MountVolume(ctx, diskID, &opts)
if err != nil {
return 0, err
return 0, []uint32{}, err
}
defer session.UnmountVolume(ctx)

volume := session.Volume()

volumeBlockSize := uint64(volume.BlockSize)
if volumeBlockSize == 0 {
return 0, fmt.Errorf(
return 0, []uint32{}, fmt.Errorf(
"%v volume block size should not be zero",
diskID,
)
}

if contentSize%volumeBlockSize != 0 {
return 0, fmt.Errorf(
return 0, []uint32{}, fmt.Errorf(
"%v contentSize %v should be multiple of volumeBlockSize %v",
diskID,
contentSize,
Expand All @@ -81,7 +82,7 @@ func (c *client) CalculateCrc32WithEncryption(
volumeSize := volume.BlocksCount * volumeBlockSize

if contentSize > volumeSize {
return 0, fmt.Errorf(
return 0, []uint32{}, fmt.Errorf(
"%v contentSize %v should not be greater than volumeSize %v",
diskID,
contentSize,
Expand All @@ -92,12 +93,13 @@ func (c *client) CalculateCrc32WithEncryption(
chunkSize := uint64(4 * 1024 * 1024)
blocksInChunk := chunkSize / volumeBlockSize
acc := crc32.NewIEEE()
blocksCrc32 := []uint32{}

for offset := uint64(0); offset < contentBlocksCount; offset += blocksInChunk {
blocksToRead := min(contentBlocksCount-offset, blocksInChunk)
buffers, err := session.ReadBlocks(ctx, offset, uint32(blocksToRead), "")
if err != nil {
return 0, fmt.Errorf(
return 0, []uint32{}, fmt.Errorf(
"%v read blocks at (%v, %v) failed: %w",
diskID,
offset,
Expand All @@ -107,14 +109,22 @@ func (c *client) CalculateCrc32WithEncryption(
}

for _, buffer := range buffers {
blockAcc := crc32.NewIEEE()
if len(buffer) == 0 {
buffer = make([]byte, volumeBlockSize)
}

_, err := acc.Write(buffer)
if err != nil {
return 0, err
return 0, []uint32{}, err
}

_, err = blockAcc.Write(buffer)
if err != nil {
return 0, []uint32{}, err
}

blocksCrc32 = append(blocksCrc32, blockAcc.Sum32())
}
}

Expand All @@ -123,7 +133,7 @@ func (c *client) CalculateCrc32WithEncryption(
blocksToRead := min(volume.BlocksCount-offset, blocksInChunk)
buffers, err := session.ReadBlocks(ctx, offset, uint32(blocksToRead), "")
if err != nil {
return 0, fmt.Errorf(
return 0, []uint32{}, fmt.Errorf(
"%v read blocks at (%v, %v) failed: %w",
diskID,
offset,
Expand All @@ -136,7 +146,7 @@ func (c *client) CalculateCrc32WithEncryption(
if len(buffer) != 0 {
for j, b := range buffer {
if b != 0 {
return 0, fmt.Errorf(
return 0, []uint32{}, fmt.Errorf(
"%v non zero byte %v detected at (%v, %v)",
diskID,
b,
Expand All @@ -149,33 +159,71 @@ func (c *client) CalculateCrc32WithEncryption(
}
}

return acc.Sum32(), nil
return acc.Sum32(), blocksCrc32, nil
}

func (c *client) ValidateCrc32(
ctx context.Context,
diskID string,
contentSize uint64,
expectedCrc32 uint32,
expectedBlocksCrc32 []uint32,
) error {

return c.ValidateCrc32WithEncryption(diskID, contentSize, nil, expectedCrc32)
return c.ValidateCrc32WithEncryption(
ctx,
diskID,
contentSize,
nil,
expectedCrc32,
expectedBlocksCrc32,
)
}

func (c *client) ValidateCrc32WithEncryption(
ctx context.Context,
diskID string,
contentSize uint64,
encryption *types.EncryptionDesc,
expectedCrc32 uint32,
expectedBlocksCrc32 []uint32,
) error {

actualCrc32, err := c.CalculateCrc32WithEncryption(diskID, contentSize, encryption)
actualCrc32, actualBlocksCrc32, err := c.CalculateCrc32WithEncryption(
diskID,
contentSize,
encryption,
)
if err != nil {
return err
}

if len(actualBlocksCrc32) != len(expectedBlocksCrc32) {
logging.Debug(
ctx,
"%v blocksCrc32 length doesn't match, expected %v, actual %v",
diskID,
len(expectedBlocksCrc32),
len(actualBlocksCrc32),
)
} else {
for i, _ := range expectedBlocksCrc32 {
if actualBlocksCrc32[i] != expectedBlocksCrc32[i] {
logging.Debug(
ctx,
"%v block with index %v crc32 doesn't match, expected %v, actual %v",
diskID,
i,
expectedBlocksCrc32[i],
actualBlocksCrc32[i],
)
}
}
}

if expectedCrc32 != actualCrc32 {
return fmt.Errorf(
"%v crc32 doesn't match, expected=%v, actual=%v",
"%v crc32 doesn't match, expected %v, actual %v",
diskID,
expectedCrc32,
actualCrc32,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func setupMigrationTest(

if params.FillDisk {
nbsClient := testcommon.NewNbsClient(t, ctx, params.SrcZoneID)
_, _, err = testcommon.FillDisk(
_, _, _, err = testcommon.FillDisk(
nbsClient,
params.DiskID,
uint64(params.DiskSize),
Expand Down Expand Up @@ -113,7 +113,7 @@ func successfullyMigrateDisk(

diskSize := uint64(params.DiskSize)

srcCrc32, err := srcZoneNBSClient.CalculateCrc32(params.DiskID, diskSize)
srcCrc32, srcBlocksCrc32, err := srcZoneNBSClient.CalculateCrc32(params.DiskID, diskSize)
require.NoError(t, err)

err = client.SendMigrationSignal(ctx, &disk_manager.SendMigrationSignalRequest{
Expand Down Expand Up @@ -145,9 +145,15 @@ func successfullyMigrateDisk(
require.ErrorContains(t, err, "Path not found")

dstZoneNBSClient := testcommon.NewNbsClient(t, ctx, params.DstZoneID)
dstCrc32, err := dstZoneNBSClient.CalculateCrc32(params.DiskID, diskSize)

err = dstZoneNBSClient.ValidateCrc32(
ctx,
params.DiskID,
diskSize,
srcCrc32,
srcBlocksCrc32,
)
require.NoError(t, err)
require.Equal(t, srcCrc32, dstCrc32)
}

func startAndCancelMigration(
Expand Down Expand Up @@ -325,7 +331,7 @@ func migrateDiskInParallel(

diskSize := uint64(params.DiskSize)

srcCrc32, err := srcZoneNBSClient.CalculateCrc32(params.DiskID, diskSize)
srcCrc32, srcBlocksCrc32, err := srcZoneNBSClient.CalculateCrc32(params.DiskID, diskSize)
require.NoError(t, err)

for _, operation := range operations {
Expand Down Expand Up @@ -378,14 +384,22 @@ func migrateDiskInParallel(
require.ErrorContains(t, err, "Path not found")

dstZoneNBSClient := testcommon.NewNbsClient(t, ctx, dstZoneID)
dstCrc32, err := dstZoneNBSClient.CalculateCrc32(params.DiskID, diskSize)
err := dstZoneNBSClient.ValidateCrc32(
params.DiskID,
diskSize,
srcCrc32,
srcBlocksCrc32,
)
require.NoError(t, err)
require.Equal(t, srcCrc32, dstCrc32)
} else {
// All migrations are cancelled. Check that src disk is not affected.
crc32, err := srcZoneNBSClient.CalculateCrc32(params.DiskID, diskSize)
err := srcZoneNBSClient.ValidateCrc32(
params.DiskID,
diskSize,
srcCrc32,
srcBlocksCrc32,
)
require.NoError(t, err)
require.Equal(t, srcCrc32, crc32)
}
}

Expand Down Expand Up @@ -421,7 +435,7 @@ func successfullyMigrateEmptyDisk(

diskSize := uint64(migrationTestsDiskSize)

srcCrc32, err := srcZoneNBSClient.CalculateCrc32(params.DiskID, diskSize)
srcCrc32, blocksCrc32, err := srcZoneNBSClient.CalculateCrc32(params.DiskID, diskSize)
require.NoError(t, err)

metadata := &disk_manager.MigrateDiskMetadata{}
Expand All @@ -443,9 +457,13 @@ func successfullyMigrateEmptyDisk(
require.ErrorContains(t, err, "Path not found")

dstZoneNBSClient := testcommon.NewNbsClient(t, ctx, params.DstZoneID)
dstCrc32, err := dstZoneNBSClient.CalculateCrc32(params.DiskID, diskSize)
err = dstZoneNBSClient.ValidateCrc32(
params.DiskID,
diskSize,
srcCrc32,
blocksCrc32,
)
require.NoError(t, err)
require.Equal(t, srcCrc32, dstCrc32)
}

func waitForMigrationStatus(
Expand Down Expand Up @@ -703,7 +721,7 @@ func TestDiskServiceMigrateOverlayDisk(t *testing.T) {
imageID := t.Name()
diskSize := migrationTestsDiskSize
imageSize := diskSize / 2
_, _ = testcommon.CreateImage(
_, _, _ = testcommon.CreateImage(
t,
ctx,
imageID,
Expand Down
Loading

0 comments on commit 5f559d3

Please sign in to comment.