Skip to content

Commit

Permalink
Fix shadow disk notification (#2619)
Browse files Browse the repository at this point in the history
  • Loading branch information
drbasic committed Dec 5, 2024
1 parent 5d2893d commit 0a64e1d
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,16 @@ void TDiskRegistryState::AllowNotifications(
{
// currently we don't want to notify users about mirrored disks since they are not
// supposed to break

if (disk.MasterDiskId) {
return;
}

// Do not notify the user about shadow disk failures.
if (disk.CheckpointReplica.GetCheckpointId()) {
return;
}


Y_DEBUG_ABORT_UNLESS(IsDiskRegistryMediaKind(disk.MediaKind));
if (!IsReliableDiskRegistryMediaKind(disk.MediaKind)) {
NotificationSystem.AllowNotifications(diskId);
Expand Down Expand Up @@ -608,6 +613,11 @@ void TDiskRegistryState::AddMigration(
const TString& diskId,
const TString& sourceDeviceId)
{
if (disk.CheckpointReplica.GetCheckpointId()) {
// Don't start migrations for shadow disks.
return;
}

if (IsDiskRegistryLocalMediaKind(disk.MediaKind) ||
disk.MediaKind == NProto::STORAGE_MEDIA_HDD_NONREPLICATED)
{
Expand Down Expand Up @@ -4742,6 +4752,9 @@ ui64 TDiskRegistryState::AddReallocateRequest(
if (disk && disk->MasterDiskId) {
diskId = disk->MasterDiskId;
}
if (disk && disk->CheckpointReplica.GetCheckpointId()) {
diskId = disk->CheckpointReplica.GetSourceDiskId();
}

return NotificationSystem.AddReallocateRequest(db, diskId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ class TDiskRegistryState

ui32 ReplicaCount = 0;
TString MasterDiskId;

// Filled if the disk is a shadow disk for the checkpoint.
NProto::TCheckpointReplica CheckpointReplica;

TVector<TDeviceId> DeviceReplacementIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,26 @@ TDiskRegistryState MakeDiskRegistryState()
.Build();
}

// Test helper: moves the agent identified by `config.GetAgentId()` to
// `newState` through `state.UpdateAgentState`, asserts that the call reported
// S_OK, and returns the list of disk ids the call filled in as affected.
auto ChangeAgentState(
    TDiskRegistryState& state,
    TDiskRegistryDatabase db,
    const NProto::TAgentConfig& config,
    NProto::EAgentState newState)
{
    TVector<TString> affected;

    const auto timestamp = TInstant::Now();
    const auto result = state.UpdateAgentState(
        db,
        config.GetAgentId(),
        newState,
        timestamp,
        "test",   // reason recorded with the state change
        affected);

    UNIT_ASSERT_VALUES_EQUAL(S_OK, result.GetCode());
    return affected;
}

} // namespace

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -608,6 +628,203 @@ Y_UNIT_TEST_SUITE(TDiskRegistryStateCheckpointTest)
UNIT_ASSERT_VALUES_EQUAL(E_NOT_FOUND, error.GetCode());
});
}

// Checks that putting an agent into the warning state affects the shadow disk
// but starts no migrations for it, and that the reallocate request is
// registered for the source disk rather than for the shadow disk itself.
Y_UNIT_TEST(ShouldNotMigrateShadowDiskDevices)
{
    TTestExecutor executor;
    executor.WriteTx([&](TDiskRegistryDatabase db) { db.InitSchema(); });

    TDiskRegistryState state = MakeDiskRegistryState();

    // Create the source disk.
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            TVector<TDeviceConfig> devices;
            auto error = AllocateDisk(
                db,
                state,
                "disk-1",
                "",   // placementGroupId
                0,    // placementPartitionIndex
                40_GB,
                devices);
            UNIT_ASSERT_SUCCESS(error);
        });

    // Create the checkpoint (this allocates the shadow disk).
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            TString shadowDiskId;
            TVector<TDeviceConfig> devices;
            auto error = AllocateCheckpoint(
                Now(),
                db,
                state,
                "disk-1",
                "checkpoint-1",
                &shadowDiskId,
                &devices);
            UNIT_ASSERT_SUCCESS(error);
        });

    const auto checkpointId =
        TCheckpointInfo::MakeId("disk-1", "checkpoint-1");

    // Copy the agent config (as the sibling tests do) rather than keeping a
    // reference obtained from `state`, which the next transaction updates.
    const auto secondAgent = state.GetAgents()[1];

    // Switch the second agent to the warning state; the assertions below
    // establish that the shadow disk is the only disk affected by it.
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            auto affectedDisks = ChangeAgentState(
                state,
                db,
                secondAgent,
                NProto::AGENT_STATE_WARNING);

            // Only the shadow disk is reported as affected.
            UNIT_ASSERT_VALUES_EQUAL(1, affectedDisks.size());
            UNIT_ASSERT_VALUES_EQUAL(checkpointId, affectedDisks[0]);
        });

    // No migrations were started.
    UNIT_ASSERT(state.IsMigrationListEmpty());

    // The source disk is scheduled for reallocation; the shadow disk is not.
    UNIT_ASSERT(state.GetDisksToReallocate().FindPtr("disk-1"));
    UNIT_ASSERT(!state.GetDisksToReallocate().FindPtr(checkpointId));
}

// Checks that re-registering the second agent with a different NodeId
// produces a reallocate request for the source disk and none for the shadow
// disk id.
Y_UNIT_TEST(ShouldNotifySourceDiskWhenNodeIdForShadowDiskChanged)
{
    TTestExecutor executor;
    executor.WriteTx([&](TDiskRegistryDatabase db) { db.InitSchema(); });

    TDiskRegistryState state = MakeDiskRegistryState();

    // Create the source disk.
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            TVector<TDeviceConfig> devices;
            auto error = AllocateDisk(
                db,
                state,
                "disk-1",
                "",   // placementGroupId
                0,    // placementPartitionIndex
                40_GB,
                devices);
            UNIT_ASSERT_SUCCESS(error);
        });

    // Create the checkpoint (this allocates the shadow disk).
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            TString shadowDiskId;
            TVector<TDeviceConfig> devices;
            auto error = AllocateCheckpoint(
                Now(),
                db,
                state,
                "disk-1",
                "checkpoint-1",
                &shadowDiskId,
                &devices);
            UNIT_ASSERT_SUCCESS(error);
        });

    const auto checkpointId =
        TCheckpointInfo::MakeId("disk-1", "checkpoint-1");

    // Re-register the second agent with a new NodeId. A copy of the config is
    // modified, so the change reaches `state` only through RegisterAgent.
    auto secondAgent = state.GetAgents()[1];
    secondAgent.SetNodeId(42);
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            UNIT_ASSERT_SUCCESS(
                state.RegisterAgent(db, secondAgent, Now()).GetError());
        });

    // The source disk is scheduled for reallocation; the shadow disk is not.
    UNIT_ASSERT(state.GetDisksToReallocate().FindPtr("disk-1"));
    UNIT_ASSERT(!state.GetDisksToReallocate().FindPtr(checkpointId));
}

// Uses a registry with a single known agent exposing two devices, allocates a
// source disk and a checkpoint on it, then re-registers the agent with a new
// NodeId and checks that the reallocate request is keyed by the source disk
// id and not by the shadow disk id.
Y_UNIT_TEST(ShouldNotifyVolumeOnce)
{
    TTestExecutor executor;
    executor.WriteTx([&](TDiskRegistryDatabase db) { db.InitSchema(); });

    auto agentConfig = AgentConfig(
        1,
        {
            Device("dev-1", "uuid-1", "rack-1"),
            Device("dev-2", "uuid-2", "rack-1"),
        });

    TDiskRegistryState state =
        TDiskRegistryStateBuilder().WithKnownAgents({agentConfig}).Build();

    // Create the source disk.
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            TVector<TDeviceConfig> devices;
            auto error = AllocateDisk(
                db,
                state,
                "disk-1",
                "",   // placementGroupId
                0,    // placementPartitionIndex
                10_GB,
                devices);
            UNIT_ASSERT_SUCCESS(error);
        });

    // Create the checkpoint (this allocates the shadow disk).
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            TString shadowDiskId;
            TVector<TDeviceConfig> devices;
            auto error = AllocateCheckpoint(
                Now(),
                db,
                state,
                "disk-1",
                "checkpoint-1",
                &shadowDiskId,
                &devices);
            UNIT_ASSERT_SUCCESS(error);
        });

    const auto checkpointId =
        TCheckpointInfo::MakeId("disk-1", "checkpoint-1");

    // Re-register the only agent with a new NodeId. A copy of the config is
    // modified, so the change reaches `state` only through RegisterAgent.
    auto agent = state.GetAgents()[0];
    agent.SetNodeId(42);
    executor.WriteTx(
        [&](TDiskRegistryDatabase db)
        {
            UNIT_ASSERT_SUCCESS(
                state.RegisterAgent(db, agent, Now()).GetError());
        });

    // The source disk is scheduled for reallocation; the shadow disk is not.
    // NOTE(review): these checks only cover presence/absence of the ids; they
    // do not by themselves verify that the notification was issued exactly
    // once - consider also asserting the size of the reallocate list.
    UNIT_ASSERT(state.GetDisksToReallocate().FindPtr("disk-1"));
    UNIT_ASSERT(!state.GetDisksToReallocate().FindPtr(checkpointId));
}
}

} // namespace NCloud::NBlockStore::NStorage

0 comments on commit 0a64e1d

Please sign in to comment.