Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#831: [Disk Manager] validate crc32 for each block; log new crc32 for each block after writing in tests; #836

Merged
merged 4 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions cloud/disk_manager/internal/pkg/clients/nbs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ type CheckpointParams struct {
CheckpointType CheckpointType
}

// Used in tests.
type DiskContentInfo struct {
ContentSize uint64 // The coordinate of the last non-zero byte.
StorageSize uint64
Crc32 uint32
BlockCrc32s []uint32
}

////////////////////////////////////////////////////////////////////////////////

type Client interface {
Expand Down Expand Up @@ -294,25 +302,29 @@ type Client interface {
) error

// Used in tests.
ValidateCrc32(diskID string, contentSize uint64, expectedCrc32 uint32) error
ValidateCrc32(
ctx context.Context,
diskID string,
expectedDiskContentInfo DiskContentInfo,
) error

// Used in tests.
ValidateCrc32WithEncryption(
ctx context.Context,
diskID string,
contentSize uint64,
expectedDiskContentInfo DiskContentInfo,
encryption *types.EncryptionDesc,
expectedCrc32 uint32,
) error

// Used in tests.
CalculateCrc32(diskID string, contentSize uint64) (uint32, error)
CalculateCrc32(diskID string, contentSize uint64) (DiskContentInfo, error)

// Used in tests.
CalculateCrc32WithEncryption(
diskID string,
contentSize uint64,
encryption *types.EncryptionDesc,
) (uint32, error)
) (DiskContentInfo, error)

// Used in tests.
MountForReadWrite(diskID string) (func(), error)
Expand Down
23 changes: 14 additions & 9 deletions cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,29 +323,34 @@ func (c *ClientMock) Stat(
////////////////////////////////////////////////////////////////////////////////

func (c *ClientMock) ValidateCrc32(
ctx context.Context,
diskID string,
contentSize uint64,
expectedCrc32 uint32,
expectedDiskContentInfo nbs.DiskContentInfo,
) error {

return c.ValidateCrc32WithEncryption(diskID, contentSize, nil, expectedCrc32)
return c.ValidateCrc32WithEncryption(
ctx,
diskID,
expectedDiskContentInfo,
nil,
)
}

func (c *ClientMock) ValidateCrc32WithEncryption(
ctx context.Context,
diskID string,
contentSize uint64,
expectedDiskContentInfo nbs.DiskContentInfo,
encryption *types.EncryptionDesc,
expectedCrc32 uint32,
) error {

args := c.Called(diskID, contentSize, encryption, expectedCrc32)
args := c.Called(ctx, diskID, expectedDiskContentInfo, encryption)
return args.Error(0)
}

func (c *ClientMock) CalculateCrc32(
diskID string,
contentSize uint64,
) (uint32, error) {
) (nbs.DiskContentInfo, error) {

return c.CalculateCrc32WithEncryption(diskID, contentSize, nil)
}
Expand All @@ -354,10 +359,10 @@ func (c *ClientMock) CalculateCrc32WithEncryption(
diskID string,
contentSize uint64,
encryption *types.EncryptionDesc,
) (uint32, error) {
) (nbs.DiskContentInfo, error) {

args := c.Called(diskID, contentSize, encryption)
return args.Get(0).(uint32), args.Error(1)
return args.Get(0).(nbs.DiskContentInfo), args.Error(1)
}

func (c *ClientMock) MountForReadWrite(
Expand Down
93 changes: 72 additions & 21 deletions cloud/disk_manager/internal/pkg/clients/nbs/testing_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"github.com/ydb-platform/nbs/cloud/blockstore/public/api/protos"
nbs_client "github.com/ydb-platform/nbs/cloud/blockstore/public/sdk/go/client"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/types"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
)

////////////////////////////////////////////////////////////////////////////////

func (c *client) CalculateCrc32(
diskID string,
contentSize uint64,
) (uint32, error) {
) (DiskContentInfo, error) {

return c.CalculateCrc32WithEncryption(diskID, contentSize, nil)
}
Expand All @@ -24,13 +25,13 @@ func (c *client) CalculateCrc32WithEncryption(
diskID string,
contentSize uint64,
encryption *types.EncryptionDesc,
) (uint32, error) {
) (DiskContentInfo, error) {

ctx := setupStderrLogger(context.Background())

nbsClient, _, err := c.nbs.DiscoverInstance(ctx)
if err != nil {
return 0, err
return DiskContentInfo{}, err
}
defer nbsClient.Close()

Expand All @@ -42,7 +43,7 @@ func (c *client) CalculateCrc32WithEncryption(

encryptionSpec, err := getEncryptionSpec(encryption)
if err != nil {
return 0, err
return DiskContentInfo{}, err
}

opts := nbs_client.MountVolumeOpts{
Expand All @@ -54,22 +55,22 @@ func (c *client) CalculateCrc32WithEncryption(
}
err = session.MountVolume(ctx, diskID, &opts)
if err != nil {
return 0, err
return DiskContentInfo{}, err
}
defer session.UnmountVolume(ctx)

volume := session.Volume()

volumeBlockSize := uint64(volume.BlockSize)
if volumeBlockSize == 0 {
return 0, fmt.Errorf(
return DiskContentInfo{}, fmt.Errorf(
"%v volume block size should not be zero",
diskID,
)
}

if contentSize%volumeBlockSize != 0 {
return 0, fmt.Errorf(
return DiskContentInfo{}, fmt.Errorf(
"%v contentSize %v should be multiple of volumeBlockSize %v",
diskID,
contentSize,
Expand All @@ -81,7 +82,7 @@ func (c *client) CalculateCrc32WithEncryption(
volumeSize := volume.BlocksCount * volumeBlockSize

if contentSize > volumeSize {
return 0, fmt.Errorf(
return DiskContentInfo{}, fmt.Errorf(
"%v contentSize %v should not be greater than volumeSize %v",
diskID,
contentSize,
Expand All @@ -92,12 +93,13 @@ func (c *client) CalculateCrc32WithEncryption(
chunkSize := uint64(4 * 1024 * 1024)
blocksInChunk := chunkSize / volumeBlockSize
acc := crc32.NewIEEE()
blockCrc32s := []uint32{}

for offset := uint64(0); offset < contentBlocksCount; offset += blocksInChunk {
blocksToRead := min(contentBlocksCount-offset, blocksInChunk)
buffers, err := session.ReadBlocks(ctx, offset, uint32(blocksToRead), "")
if err != nil {
return 0, fmt.Errorf(
return DiskContentInfo{}, fmt.Errorf(
"%v read blocks at (%v, %v) failed: %w",
diskID,
offset,
Expand All @@ -107,14 +109,22 @@ func (c *client) CalculateCrc32WithEncryption(
}

for _, buffer := range buffers {
blockAcc := crc32.NewIEEE()
if len(buffer) == 0 {
buffer = make([]byte, volumeBlockSize)
}

_, err := acc.Write(buffer)
if err != nil {
return 0, err
return DiskContentInfo{}, err
}

_, err = blockAcc.Write(buffer)
if err != nil {
return DiskContentInfo{}, err
}

blockCrc32s = append(blockCrc32s, blockAcc.Sum32())
}
}

Expand All @@ -123,7 +133,7 @@ func (c *client) CalculateCrc32WithEncryption(
blocksToRead := min(volume.BlocksCount-offset, blocksInChunk)
buffers, err := session.ReadBlocks(ctx, offset, uint32(blocksToRead), "")
if err != nil {
return 0, fmt.Errorf(
return DiskContentInfo{}, fmt.Errorf(
"%v read blocks at (%v, %v) failed: %w",
diskID,
offset,
Expand All @@ -136,7 +146,7 @@ func (c *client) CalculateCrc32WithEncryption(
if len(buffer) != 0 {
for j, b := range buffer {
if b != 0 {
return 0, fmt.Errorf(
return DiskContentInfo{}, fmt.Errorf(
"%v non zero byte %v detected at (%v, %v)",
diskID,
b,
Expand All @@ -149,33 +159,74 @@ func (c *client) CalculateCrc32WithEncryption(
}
}

return acc.Sum32(), nil
return DiskContentInfo{
ContentSize: contentSize,
Crc32: acc.Sum32(),
BlockCrc32s: blockCrc32s,
}, nil
}

func (c *client) ValidateCrc32(
ctx context.Context,
diskID string,
contentSize uint64,
expectedCrc32 uint32,
expectedDiskContentInfo DiskContentInfo,
) error {

return c.ValidateCrc32WithEncryption(diskID, contentSize, nil, expectedCrc32)
return c.ValidateCrc32WithEncryption(
ctx,
diskID,
expectedDiskContentInfo,
nil,
)
}

func (c *client) ValidateCrc32WithEncryption(
ctx context.Context,
diskID string,
contentSize uint64,
expectedDiskContentInfo DiskContentInfo,
encryption *types.EncryptionDesc,
expectedCrc32 uint32,
) error {

actualCrc32, err := c.CalculateCrc32WithEncryption(diskID, contentSize, encryption)
actualDiskContentInfo, err := c.CalculateCrc32WithEncryption(
diskID,
expectedDiskContentInfo.ContentSize,
encryption,
)
if err != nil {
return err
}

if expectedCrc32 != actualCrc32 {
actualCrc32 := actualDiskContentInfo.Crc32
expectedCrc32 := expectedDiskContentInfo.Crc32
actualBlockCrc32s := actualDiskContentInfo.BlockCrc32s
expectedBlockCrc32s := expectedDiskContentInfo.BlockCrc32s

if len(actualBlockCrc32s) != len(expectedBlockCrc32s) {
logging.Debug(
ctx,
"%v blocksCrc32 length doesn't match, expected %v, actual %v",
diskID,
len(expectedBlockCrc32s),
len(actualBlockCrc32s),
)
} else {
for i := range expectedDiskContentInfo.BlockCrc32s {
if actualBlockCrc32s[i] != expectedBlockCrc32s[i] {
logging.Debug(
ctx,
"%v block with index %v crc32 doesn't match, expected %v, actual %v",
diskID,
i,
expectedBlockCrc32s[i],
actualBlockCrc32s[i],
)
}
}
}

if actualCrc32 != expectedCrc32 {
return fmt.Errorf(
"%v crc32 doesn't match, expected=%v, actual=%v",
"%v crc32 doesn't match, expected %v, actual %v",
diskID,
expectedCrc32,
actualCrc32,
Expand Down
Loading