Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Disk Manager] retry with new checkpoint id when create snapshot if shadow disk failed during filling #2691

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
62 changes: 62 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/disk_registry_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package nbs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

предлагаю упростить и если уж тащить внутренности dr в дм, то только в тестового клиента (и я бы даже делал это без тестов)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А что ты понимаешь под "упростить"?

Хм, а в чём принципиальная разница между тестовым клиентом и отдельным файликом disk_registry_state.go? Они ведь всё равно в одном модуле находятся.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если не писать тесты на методы работы с disk registry, то появляется опасение, что выключение девайса не сработает. И тогда интеграционный тест будет работать вхолостую -- он будет завершаться успехом, но при этом по факту никогда не будет выключать девайс. Хочется обезопаситься от этого.

Было бы здорово в самом интеграционном тесте как-то проверить, что девайс действительно был выключен и что действительно был налит новый чекпоинт. Но ведь так происходит не всегда -- из-за рандома с таймингами могут быть сценарии, когда девайс не ломался.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

то, что написаны тесты - никогда не плохо

но такой большой кусок кода, особенно не используемый ДМом в проде тащить в nbs клиента неправильно - мы вообще в DA не ходим

в клиенте иметь этот код также будет причиной вопросов что это, зачем это, давайте сделаем по аналогии и тп

мы в целом уже давно хотели это исправить и мне кажется этот момент настал, тк этот код нетривиальный (тк это внутренности DA) #892


type diskRegistryCheckpointReplica struct {
CheckpointID string `json:"CheckpointId"`
SourceDiskID string `json:"SourceDiskId"`
}

type diskRegistryDisk struct {
DiskID string `json:"DiskId"`
DeviceUUIDs []string `json:"DeviceUUIDs"`
CheckpointReplica diskRegistryCheckpointReplica `json:"CheckpointReplica"`
}

type diskRegistryDevice struct {
DeviceUUID string `json:"DeviceUUID"`
}

type diskRegistryAgent struct {
Devices []diskRegistryDevice `json:"Devices"`
AgentID string `json:"AgentId"`
}

type DiskRegistryBackup struct {
Disks []diskRegistryDisk `json:"Disks"`
Agents []diskRegistryAgent `json:"Agents"`
}

type diskRegistryState struct {
Backup DiskRegistryBackup `json:"Backup"`
}

func (b *DiskRegistryBackup) GetDevicesOfDisk(diskID string) []string {
for _, disk := range b.Disks {
if disk.DiskID == diskID {
return disk.DeviceUUIDs
}
}
return nil
}

func (b *DiskRegistryBackup) GetDevicesOfShadowDisk(
originalDiskID string,
) []string {

for _, disk := range b.Disks {
if disk.CheckpointReplica.SourceDiskID == originalDiskID {
return disk.DeviceUUIDs
}
}
return nil
}

func (b *DiskRegistryBackup) GetAgentIDByDeviceUUId(deviceUUID string) string {
for _, agent := range b.Agents {
for _, device := range agent.Devices {
if device.DeviceUUID == deviceUUID {
return agent.AgentID
}
}
}
return ""
}
11 changes: 11 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,17 @@ type Client interface {

// Used in tests.
List(ctx context.Context) ([]string, error)

// Used in tests.
BackupDiskRegistryState(ctx context.Context) (*DiskRegistryBackup, error)

// Used in tests.
DisableDevices(
ctx context.Context,
agentID string,
deviceUUIDs []string,
message string,
) error
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
19 changes: 19 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,25 @@ func (c *ClientMock) FinishFillDisk(
return args.Error(0)
}

func (c *ClientMock) BackupDiskRegistryState(
ctx context.Context,
) (*nbs.DiskRegistryBackup, error) {

args := c.Called(ctx)
return args.Get(0).(*nbs.DiskRegistryBackup), args.Error(1)
}

func (c *ClientMock) DisableDevices(
ctx context.Context,
agentID string,
deviceUUIDs []string,
message string,
) error {

args := c.Called(ctx, agentID, deviceUUIDs, message)
return args.Error(0)
}

////////////////////////////////////////////////////////////////////////////////

func NewClientMock() *ClientMock {
Expand Down
51 changes: 51 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/testing_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nbs
import (
"bytes"
"context"
"encoding/json"
"fmt"
"hash/crc32"
"math/rand"
Expand Down Expand Up @@ -513,6 +514,56 @@ func (c *client) Write(
return nil
}

func (c *client) BackupDiskRegistryState(
ctx context.Context,
) (*DiskRegistryBackup, error) {

output, err := c.nbs.ExecuteAction(ctx, "backupdiskregistrystate", []byte("{}"))
if err != nil {
return nil, wrapError(err)
}

var state diskRegistryState
err = json.Unmarshal(output, &state)
if err != nil {
return nil, err
}

return &state.Backup, nil
}

func (c *client) DisableDevices(
ctx context.Context,
agentID string,
deviceUUIDs []string,
message string,
) error {

if len(deviceUUIDs) == 0 {
return fmt.Errorf("list of devices to disable should contain at least one device")
}

deviceUUIDsField, err := json.Marshal(deviceUUIDs)
if err != nil {
return nil
}

input := fmt.Sprintf(
"{\"DisableAgent\":{\"AgentId\":\"%v\",\"DeviceUUIDs\":%v},\"Message\":\"%v\"}",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Хотя ручка и называется "DisableAgent", она не будет ломать весь агент, если ей передать непустой спасок девайсов. Она сломает только девайсы из этого списка.

Сломает -- значит, девайсы начнут отдавать ошибку в ответ на все запросы чтения и записи.

agentID,
string(deviceUUIDsField),
message,
)

_, err = c.nbs.ExecuteAction(
ctx,
"diskregistrychangestate",
[]byte(input),
)

return wrapError(err)
}

////////////////////////////////////////////////////////////////////////////////

type checkpoint struct {
Expand Down
139 changes: 139 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/tests/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1530,3 +1530,142 @@ func TestReadFromProxyOverlayDiskWithMultipartitionBaseDisk(t *testing.T) {
err = client.ValidateCrc32(ctx, proxyOverlayDiskID, diskContentInfo)
require.NoError(t, err)
}

func TestDiskRegistryState(t *testing.T) {
ctx := newContext()
client := newClient(t, ctx)

diskID := t.Name()

err := client.Create(ctx, nbs.CreateDiskParams{
ID: diskID,
BlocksCount: 2 * 262144,
BlockSize: 4096,
Kind: types.DiskKind_DISK_KIND_SSD_NONREPLICATED,
})
require.NoError(t, err)

backup, err := client.BackupDiskRegistryState(ctx)
require.NoError(t, err)

deviceUUIDs := backup.GetDevicesOfDisk(diskID)
require.Equal(t, len(deviceUUIDs), 2)

agentID := backup.GetAgentIDByDeviceUUId(deviceUUIDs[0])
require.NotEmpty(t, agentID)
agentID = backup.GetAgentIDByDeviceUUId(deviceUUIDs[1])
require.NotEmpty(t, agentID)

deviceUUIDs = backup.GetDevicesOfDisk("nonExistingDiskID")
require.Nil(t, deviceUUIDs)
agentID = backup.GetAgentIDByDeviceUUId("nonExistingDeviceID")
require.Empty(t, agentID)

err = client.Delete(ctx, diskID)
require.NoError(t, err)
}

func TestDiskRegistryDisableDevices(t *testing.T) {
ctx := newContext()
client := newClient(t, ctx)

diskID := t.Name()

err := client.Create(ctx, nbs.CreateDiskParams{
ID: diskID,
BlocksCount: 262144,
BlockSize: 4096,
Kind: types.DiskKind_DISK_KIND_SSD_NONREPLICATED,
})
require.NoError(t, err)

writeBlocks(t, ctx, client, diskID, 0 /* startIndex */, 1 /* blockCount */)

backup, err := client.BackupDiskRegistryState(ctx)
require.NoError(t, err)
deviceUUIDs := backup.GetDevicesOfDisk(diskID)
require.Equal(t, len(deviceUUIDs), 1)
agentID := backup.GetAgentIDByDeviceUUId(deviceUUIDs[0])
require.NotEmpty(t, agentID)

err = client.DisableDevices(ctx, agentID, deviceUUIDs, t.Name())
require.NoError(t, err)

session, err := client.MountRW(
ctx,
diskID,
0, // fillGeneration
0, // fillSeqNumber
nil, // encryption
)
require.NoError(t, err)
require.NotNil(t, session)
defer session.Close(ctx)

data := make([]byte, 4096)
rand.Read(data)

// Device is disabled, all read and write requests should return error.
err = session.Write(ctx, 0, data)
require.Error(t, err)
zero := false
err = session.Read(ctx, 0, 1, "", data, &zero)
require.Error(t, err)

err = client.Delete(ctx, diskID)
require.NoError(t, err)
}

func TestDiskRegistryFindDevicesOfShadowDisk(t *testing.T) {
ctx := newContext()
client := newClient(t, ctx)

diskID := t.Name()

err := client.Create(ctx, nbs.CreateDiskParams{
ID: diskID,
BlocksCount: 262144,
BlockSize: 4096,
Kind: types.DiskKind_DISK_KIND_SSD_NONREPLICATED,
})
require.NoError(t, err)

backup, err := client.BackupDiskRegistryState(ctx)
require.NoError(t, err)
deviceUUIDs := backup.GetDevicesOfShadowDisk(diskID)
// Shadow disk should not exist because checkpoint is not created yet.
require.Nil(t, deviceUUIDs)

checkpointID := "checkpointID"
err = client.CreateCheckpoint(ctx, nbs.CheckpointParams{
DiskID: diskID,
CheckpointID: checkpointID,
CheckpointType: nbs.CheckpointTypeNormal,
})
require.NoError(t, err)

retries := 0
for {
// Waiting for shadow disk to be created.
time.Sleep(time.Second)

backup, err := client.BackupDiskRegistryState(ctx)
require.NoError(t, err)
deviceUUIDs := backup.GetDevicesOfShadowDisk(diskID)

if len(deviceUUIDs) > 0 {
require.Equal(t, len(deviceUUIDs), 1)
break
}

retries++
if retries == 10 {
require.Fail(t, "Shadow disk has not appeared in disk registry state")
}
}

err = client.DeleteCheckpoint(ctx, diskID, checkpointID)
require.NoError(t, err)
err = client.Delete(ctx, diskID)
require.NoError(t, err)
}
1 change: 1 addition & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/tests/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ GO_TEST_FOR(cloud/disk_manager/internal/pkg/clients/nbs)

SET_APPEND(RECIPE_ARGS --nbs-only)
SET_APPEND(RECIPE_ARGS --multiple-nbs)
SET_APPEND(RECIPE_ARGS --disk-agent-count 3)
BarkovBG marked this conversation as resolved.
Show resolved Hide resolved
INCLUDE(${ARCADIA_ROOT}/cloud/disk_manager/test/recipe/recipe.inc)

GO_XTEST_SRCS(
Expand Down
1 change: 1 addition & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ GO_LIBRARY()

SRCS(
client.go
disk_registry_state.go
factory.go
interface.go
metrics.go
Expand Down
Loading
Loading