Skip to content

Commit

Permalink
#356: [Disk Manager] add large tests for proxy overlay disks (#848)
Browse files Browse the repository at this point in the history
* add large tests for proxy overlay disks

* mv to nbs client tests

* rebase

* fix issues
  • Loading branch information
BarkovBG authored Apr 3, 2024
1 parent 66cd19a commit b36e985
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 195 deletions.
21 changes: 21 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,27 @@ type Client interface {
fillGeneration uint64,
) error

// Used in tests.
FillDisk(
ctx context.Context,
diskID string,
contentSize uint64,
) (DiskContentInfo, error)

// Used in tests.
FillEncryptedDisk(
ctx context.Context,
diskID string,
contentSize uint64,
encryption *types.EncryptionDesc,
) (DiskContentInfo, error)

// Used in tests.
GoWriteRandomBlocksToNbsDisk(
ctx context.Context,
diskID string,
) (func() error, error)

// Used in tests.
ValidateCrc32(
ctx context.Context,
Expand Down
29 changes: 29 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,35 @@ func (c *ClientMock) Stat(

////////////////////////////////////////////////////////////////////////////////

func (c *ClientMock) FillDisk(
ctx context.Context,
diskID string,
contentSize uint64,
) (nbs.DiskContentInfo, error) {

return c.FillEncryptedDisk(ctx, diskID, contentSize, nil)
}

func (c *ClientMock) FillEncryptedDisk(
ctx context.Context,
diskID string,
contentSize uint64,
encryption *types.EncryptionDesc,
) (nbs.DiskContentInfo, error) {

args := c.Called(ctx, diskID, contentSize, encryption)
return args.Get(0).(nbs.DiskContentInfo), args.Error(1)
}

func (c *ClientMock) GoWriteRandomBlocksToNbsDisk(
ctx context.Context,
diskID string,
) (func() error, error) {

args := c.Called(ctx, diskID)
return args.Get(0).(func() error), args.Error(1)
}

func (c *ClientMock) ValidateCrc32(
ctx context.Context,
diskID string,
Expand Down
173 changes: 173 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/testing_client.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,191 @@
package nbs

import (
"bytes"
"context"
"fmt"
"hash/crc32"
"math/rand"
"time"

"github.com/ydb-platform/nbs/cloud/blockstore/public/api/protos"
nbs_client "github.com/ydb-platform/nbs/cloud/blockstore/public/sdk/go/client"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/types"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
"golang.org/x/sync/errgroup"
)

////////////////////////////////////////////////////////////////////////////////

func (c *client) FillDisk(
ctx context.Context,
diskID string,
contentSize uint64,
) (DiskContentInfo, error) {

return c.FillEncryptedDisk(ctx, diskID, contentSize, nil)
}

func (c *client) FillEncryptedDisk(
ctx context.Context,
diskID string,
contentSize uint64,
encryption *types.EncryptionDesc,
) (DiskContentInfo, error) {

session, err := c.MountRW(
ctx,
diskID,
0, // fillGeneration
0, // fillSeqNumber
encryption,
)
if err != nil {
return DiskContentInfo{}, err
}
defer session.Close(ctx)

chunkSize := uint64(1024 * 4096) // 4 MiB
blockSize := uint64(session.BlockSize())
blocksInChunk := uint32(chunkSize / blockSize)
storageSize := uint64(0)
zeroes := make([]byte, chunkSize)

rand.Seed(time.Now().UnixNano())

acc := crc32.NewIEEE()
blockCrc32s := []uint32{}
for offset := uint64(0); offset < contentSize; offset += chunkSize {
startIndex := offset / blockSize
data := make([]byte, chunkSize)
dice := rand.Intn(3)

var err error
switch dice {
case 0:
rand.Read(data)
if bytes.Equal(data, zeroes) {
logging.Debug(ctx, "rand generated all zeroes")
}

err = session.Write(ctx, startIndex, data)
storageSize += chunkSize
case 1:
err = session.Zero(ctx, startIndex, blocksInChunk)
}
if err != nil {
return DiskContentInfo{}, err
}

_, err = acc.Write(data)
if err != nil {
return DiskContentInfo{}, err
}

for blockIndex := uint32(0); blockIndex < blocksInChunk; blockIndex++ {
blockAcc := crc32.NewIEEE()

startOffset := uint64(blockIndex) * blockSize
endOffset := uint64(blockIndex+1) * blockSize
blockData := data[startOffset:endOffset]

_, err = blockAcc.Write(blockData)
if err != nil {
return DiskContentInfo{}, err
}

logging.Debug(
ctx,
"%v block with index %v crc32 now is %v",
diskID,
startIndex+uint64(blockIndex),
blockAcc.Sum32(),
)
blockCrc32s = append(blockCrc32s, blockAcc.Sum32())
}
}

return DiskContentInfo{
ContentSize: contentSize,
StorageSize: storageSize,
Crc32: acc.Sum32(),
BlockCrc32s: blockCrc32s,
}, nil
}

func (c *client) GoWriteRandomBlocksToNbsDisk(
ctx context.Context,
diskID string,
) (func() error, error) {

session, err := c.MountRW(
ctx,
diskID,
0, // fillGeneration
0, // fillSeqNumber
nil, // encryption
)
if err != nil {
return nil, err
}

errGroup := new(errgroup.Group)

errGroup.Go(func() error {
defer session.Close(ctx)

writeCount := uint32(1000)

blockSize := session.BlockSize()
blocksCount := session.BlockCount()
zeroes := make([]byte, blockSize)

rand.Seed(time.Now().UnixNano())

for i := uint32(0); i < writeCount; i++ {
blockIndex := uint64(rand.Int63n(int64(blocksCount)))
dice := rand.Intn(2)

var err error
blockAcc := crc32.NewIEEE()
data := make([]byte, blockSize)

switch dice {
case 0:
rand.Read(data)
if bytes.Equal(data, zeroes) {
logging.Debug(ctx, "rand generated all zeroes")
}

err = session.Write(ctx, blockIndex, data)
case 1:
err = session.Zero(ctx, blockIndex, 1)
}

if err != nil {
return err
}

_, err = blockAcc.Write(data)
if err != nil {
return err
}

logging.Debug(
ctx,
"%v block with index %v crc32 now is %v",
diskID,
blockIndex,
blockAcc.Sum32(),
)
}

return nil
})

return errGroup.Wait, nil
}

func (c *client) CalculateCrc32(
diskID string,
contentSize uint64,
Expand Down
99 changes: 99 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/tests/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,3 +1429,102 @@ func TestGetChangedBlocksForLightCheckpoints(t *testing.T) {
err = client.DeleteCheckpoint(ctx, diskID, "checkpoint_3")
require.NoError(t, err)
}

func TestReadFromProxyOverlayDisk(t *testing.T) {
ctx := newContext()
client := newClient(t, ctx)

diskID := t.Name()
diskSize := int64(1024 * 4096)

err := client.Create(ctx, nbs.CreateDiskParams{
ID: diskID,
BlocksCount: 1024,
BlockSize: 4096,
Kind: types.DiskKind_DISK_KIND_SSD,
PartitionsCount: 1,
})
require.NoError(t, err)

diskContentInfo, err := client.FillDisk(
ctx,
diskID,
uint64(diskSize),
)
require.NoError(t, err)

err = client.CreateCheckpoint(ctx, nbs.CheckpointParams{
DiskID: diskID,
CheckpointID: "cp",
CheckpointType: nbs.CheckpointTypeNormal,
})
require.NoError(t, err)

proxyOverlayDiskID := "proxy_" + diskID
created, err := client.CreateProxyOverlayDisk(
ctx,
proxyOverlayDiskID,
diskID,
"cp",
)
require.True(t, created)
require.NoError(t, err)

err = client.ValidateCrc32(ctx, proxyOverlayDiskID, diskContentInfo)
require.NoError(t, err)
}

func TestReadFromProxyOverlayDiskWithMultipartitionBaseDisk(t *testing.T) {
ctx := newContext()
client := newClient(t, ctx)

diskID := t.Name()
diskSize := int64(1024 * 4096)

err := client.Create(ctx, nbs.CreateDiskParams{
ID: diskID,
BlocksCount: 1024,
BlockSize: 4096,
Kind: types.DiskKind_DISK_KIND_SSD,
PartitionsCount: 2,
})
require.NoError(t, err)

diskContentInfo, err := client.FillDisk(ctx, diskID, uint64(diskSize))
require.NoError(t, err)

err = client.CreateCheckpoint(ctx, nbs.CheckpointParams{
DiskID: diskID,
CheckpointID: "cp",
CheckpointType: nbs.CheckpointTypeNormal,
})
require.NoError(t, err)

proxyOverlayDiskID := "proxy_" + diskID
created, err := client.CreateProxyOverlayDisk(
ctx,
proxyOverlayDiskID,
diskID,
"cp",
)
// Now it is not allowed to create proxy overlay disks for multipartition
// disks.
require.False(t, created)
require.NoError(t, err)

// We need to create proxy overlay disk manually.
err = client.Create(ctx, nbs.CreateDiskParams{
ID: proxyOverlayDiskID,
BaseDiskID: diskID,
BaseDiskCheckpointID: "cp",
BlocksCount: 1024,
BlockSize: 4096,
Kind: types.DiskKind_DISK_KIND_SSD,
PartitionsCount: 1,
IsSystem: true,
})
require.NoError(t, err)

err = client.ValidateCrc32(ctx, proxyOverlayDiskID, diskContentInfo)
require.NoError(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func setupMigrationTest(

if params.FillDisk {
nbsClient := testcommon.NewNbsClient(t, ctx, params.SrcZoneID)
_, err = testcommon.FillDisk(
nbsClient,
_, err = nbsClient.FillDisk(
ctx,
params.DiskID,
uint64(params.DiskSize),
)
Expand All @@ -80,9 +80,8 @@ func successfullyMigrateDisk(
srcZoneNBSClient := testcommon.NewNbsClient(t, ctx, params.SrcZoneID)

// Writing some additional data to disk in parallel with migration.
waitForWrite, err := testcommon.GoWriteRandomBlocksToNbsDisk(
waitForWrite, err := srcZoneNBSClient.GoWriteRandomBlocksToNbsDisk(
ctx,
srcZoneNBSClient,
params.DiskID,
)
require.NoError(t, err)
Expand Down Expand Up @@ -288,9 +287,8 @@ func migrateDiskInParallel(
srcZoneNBSClient := testcommon.NewNbsClient(t, ctx, params.SrcZoneID)

// Writing some additional data to disk in parallel with migrations.
waitForWrite, err := testcommon.GoWriteRandomBlocksToNbsDisk(
waitForWrite, err := srcZoneNBSClient.GoWriteRandomBlocksToNbsDisk(
ctx,
srcZoneNBSClient,
params.DiskID,
)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit b36e985

Please sign in to comment.