Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NBS] Asynchronous disks allocation #2763

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cloud/blockstore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1074,4 +1074,7 @@ message TStorageServiceConfig

// Enabling direct copying of data between disk agents.
optional bool UseDirectCopyRange = 394;

// Maximum number of pending allocation requests per disk.
optional uint32 MaxNonReplicatedDiskAllocationRequests = 395;
}
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ TDuration MSeconds(ui32 value)
xxx(NonReplicatedDontSuspendDevices, bool, false )\
xxx(AddClientRetryTimeoutIncrement, TDuration, MSeconds(100) )\
xxx(MaxNonReplicatedDiskDeallocationRequests, ui32, 16 )\
xxx(MaxNonReplicatedDiskAllocationRequests, ui32, 16 )\
xxx(BalancerActionDelayInterval, TDuration, Seconds(3) )\
\
xxx(UseMirrorResync, bool, false )\
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ class TStorageConfig
TDuration GetDiskRegistryBackupPeriod() const;
TString GetDiskRegistryBackupDirPath() const;

ui32 GetMaxNonReplicatedDiskAllocationRequests() const;
ui32 GetMaxNonReplicatedDiskDeallocationRequests() const;

TDuration GetDiskRegistryMetricsCachePeriod() const;
Expand Down
15 changes: 15 additions & 0 deletions cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class TDiskRegistryActor final
// Pending requests
TDeque<TPendingRequest> PendingRequests;

THashMap<TDiskId, TVector<TRequestInfoPtr>> PendingDiskAllocationRequests;
THashMap<TDiskId, TVector<TRequestInfoPtr>> PendingDiskDeallocationRequests;

bool BrokenDisksDestructionInProgress = false;
Expand Down Expand Up @@ -227,6 +228,20 @@ class TDiskRegistryActor final
TDiskRegistryDatabase& db,
TDiskRegistryStateSnapshot& args);

// Queues an allocation request for diskId so it can be answered later
// instead of immediately. If too many requests are already queued for the
// disk (see MaxNonReplicatedDiskAllocationRequests), the queued ones are
// rejected with E_REJECTED before the new request is added.
void AddPendingAllocation(
const NActors::TActorContext& ctx,
const TString& diskId,
TRequestInfoPtr requestInfoPtr);

// Answers every allocation request queued for diskId with S_OK and drops
// the disk's queue. Does nothing if no requests are queued for diskId.
void ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
const TString& diskId);

// Sends a TEvAllocateDiskResponse built from `error` to each request in
// requestInfos and then clears requestInfos.
void ReplyToPendingAllocations(
const NActors::TActorContext& ctx,
TVector<TRequestInfoPtr>& requestInfos,
NProto::TError error);

void AddPendingDeallocation(
const NActors::TActorContext& ctx,
const TString& diskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ void TDiskRegistryActor::ExecuteAddDisk(
&result);

args.Devices = std::move(result.Devices);
args.DirtyDevices = std::move(result.DirtyDevices);
args.DeviceMigrations = std::move(result.Migrations);
args.Replicas = std::move(result.Replicas);
args.DeviceReplacementUUIDs = std::move(result.DeviceReplacementIds);
Expand All @@ -157,6 +158,9 @@ void TDiskRegistryActor::CompleteAddDisk(
TStringBuilder devices;
OutputDevices(args.Devices, devices);

TStringBuilder dirtyDevices;
OutputDevices(args.DirtyDevices, dirtyDevices);

TStringBuilder replicas;
replicas << "[";
if (!args.Replicas.empty()) {
Expand All @@ -180,11 +184,12 @@ void TDiskRegistryActor::CompleteAddDisk(
migrations << "]";

LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
"[%lu] AddDisk success. DiskId=%s Devices=%s Replicas=%s"
" Migrations=%s",
"[%lu] AddDisk success. DiskId=%s Devices=%s DirtyDevices=%s"
" Replicas=%s Migrations=%s",
TabletID(),
args.DiskId.Quote().c_str(),
devices.c_str(),
dirtyDevices.c_str(),
replicas.c_str(),
migrations.c_str()
);
Expand Down Expand Up @@ -240,12 +245,70 @@ void TDiskRegistryActor::CompleteAddDisk(
response->Record.SetMuteIOErrors(args.MuteIOErrors);
}

NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
if (HasError(args.Error) || args.DirtyDevices.empty()) {
NCloud::Reply(ctx, *args.RequestInfo, std::move(response));
} else {
AddPendingAllocation(ctx, args.DiskId, args.RequestInfo);
SecureErase(ctx);
}

DestroyBrokenDisks(ctx);
NotifyUsers(ctx);
}

// Queues `requestInfo` as a pending allocation request for `diskId`; the
// request is answered later via ReplyToPendingAllocations.
//
// The per-disk queue is bounded by MaxNonReplicatedDiskAllocationRequests:
// when the queue is already full, every queued request is rejected with
// E_REJECTED and the queue is emptied before the new request is added.
void TDiskRegistryActor::AddPendingAllocation(
    const NActors::TActorContext& ctx,
    const TString& diskId,
    TRequestInfoPtr requestInfo)
{
    // operator[] creates an empty queue on the first request for this disk.
    auto& requestInfos = PendingDiskAllocationRequests[diskId];

    // `>=` (not `>`) so the queue never holds more than the configured
    // maximum once the new request is appended below.
    if (requestInfos.size() >=
        Config->GetMaxNonReplicatedDiskAllocationRequests())
    {
        LOG_WARN(ctx, TBlockStoreComponents::DISK_REGISTRY,
            "Too many pending allocation requests (%lu) for disk %s. "
            "Reject all requests.",
            requestInfos.size(),
            diskId.Quote().c_str());

        // Replies to and clears the queued requests; the reference stays
        // valid because the map entry itself is not erased.
        ReplyToPendingAllocations(ctx, requestInfos, MakeError(E_REJECTED));
    }

    requestInfos.emplace_back(std::move(requestInfo));
}

// Replies to each request in `requestInfos` with a TEvAllocateDiskResponse
// constructed from `error`, then empties `requestInfos`.
void TDiskRegistryActor::ReplyToPendingAllocations(
    const NActors::TActorContext& ctx,
    TVector<TRequestInfoPtr>& requestInfos,
    NProto::TError error)
{
    using TResponse = TEvDiskRegistry::TEvAllocateDiskResponse;

    for (auto& pending: requestInfos) {
        // A fresh response object is needed per request: Reply takes
        // ownership of the one it is given.
        auto response = std::make_unique<TResponse>(error);
        NCloud::Reply(ctx, *pending, std::move(response));
    }

    requestInfos.clear();
}

// Completes all allocation requests queued for `diskId` with S_OK and
// removes the disk's entry from PendingDiskAllocationRequests. A disk with
// no queued requests is silently ignored.
//
// NOTE(review): the replies are built from the error alone; confirm that
// requesters do not expect other response fields to be filled in here.
void TDiskRegistryActor::ReplyToPendingAllocations(
    const NActors::TActorContext& ctx,
    const TString& diskId)
{
    const auto pending = PendingDiskAllocationRequests.find(diskId);
    if (pending != PendingDiskAllocationRequests.end()) {
        auto& waiters = pending->second;
        const int waiterCount = static_cast<int>(waiters.size());

        LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
            "Reply to pending allocation requests. DiskId=%s PendingRequests=%d",
            diskId.Quote().c_str(),
            waiterCount);

        ReplyToPendingAllocations(ctx, waiters, MakeError(S_OK));

        PendingDiskAllocationRequests.erase(pending);
    }
}

////////////////////////////////////////////////////////////////////////////////

void TDiskRegistryActor::HandleDeallocateDisk(
Expand Down Expand Up @@ -379,7 +442,7 @@ void TDiskRegistryActor::ReplyToPendingDeallocations(
}

LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
"Reply to pending deallocation requests. DiskId=%s PendingRquests=%d",
"Reply to pending deallocation requests. DiskId=%s PendingRequests=%d",
diskId.Quote().c_str(),
static_cast<int>(it->second.size()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ void TDiskRegistryActor::ExecuteCleanupDevices(
TTxDiskRegistry::TCleanupDevices& args)
{
TDiskRegistryDatabase db(tx.DB);
args.SyncDeallocatedDisks =
State->MarkDevicesAsClean(ctx.Now(), db, args.Devices);
std::tie(args.SyncAllocatedDisks, args.SyncDeallocatedDisks) =
std::move(State->MarkDevicesAsClean(ctx.Now(), db, args.Devices));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

без move

}

void TDiskRegistryActor::CompleteCleanupDevices(
Expand All @@ -293,6 +293,10 @@ void TDiskRegistryActor::CompleteCleanupDevices(
for (const auto& diskId: args.SyncDeallocatedDisks) {
ReplyToPendingDeallocations(ctx, diskId);
}

for (const auto& diskId: args.SyncAllocatedDisks) {
ReplyToPendingAllocations(ctx, diskId);
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
107 changes: 90 additions & 17 deletions cloud/blockstore/libs/storage/disk_registry/disk_registry_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ void TDiskRegistryState::ProcessDirtyDevices(TVector<TDirtyDevice> dirtyDevices)
{
for (auto&& [uuid, diskId]: dirtyDevices) {
if (!diskId.empty()) {
auto error = PendingCleanup.Insert(diskId, std::move(uuid));
auto error = PendingCleanup.Insert(diskId, std::move(uuid), /*allocation=*/false);
if (HasError(error)) {
ReportDiskRegistryInsertToPendingCleanupFailed(
TStringBuilder()
Expand Down Expand Up @@ -1299,16 +1299,23 @@ NProto::TError TDiskRegistryState::ReplaceDeviceWithoutDiskStateUpdate(
}

if (!manual && !deviceReplacementId.empty()) {
auto cleaningDiskId =
auto [allocating, deallocating] =
PendingCleanup.FindDiskId(deviceReplacementId);
if (!cleaningDiskId.empty() && cleaningDiskId != diskId) {
if (!allocating || *allocating != diskId) {
allocating = std::nullopt;
}
if (!deallocating || *deallocating != diskId) {
deallocating = std::nullopt;
}
auto owningDisk = allocating ? allocating : deallocating;
if (owningDisk) {
return MakeError(
E_ARGUMENT,
TStringBuilder()
<< "can't allocate specific device "
<< deviceReplacementId.Quote() << " for disk " << diskId
<< " since it is in pending cleanup for disk "
<< cleaningDiskId);
<< *owningDisk);
}
}

Expand Down Expand Up @@ -1414,7 +1421,7 @@ NProto::TError TDiskRegistryState::ReplaceDeviceWithoutDiskStateUpdate(
UpdatePlacementGroup(db, diskId, disk, "ReplaceDevice");
UpdateAndReallocateDisk(db, diskId, disk);

error = PendingCleanup.Insert(diskId, deviceId);
error = PendingCleanup.Insert(diskId, deviceId, /*allocation=*/false);
if (HasError(error)) {
ReportDiskRegistryInsertToPendingCleanupFailed(
TStringBuilder() << "An error occurred while replacing device: "
Expand Down Expand Up @@ -2756,6 +2763,37 @@ NProto::TError TDiskRegistryState::AllocateSimpleDisk(
params.BlockSize << " bytes");
}

// check that we can secure erase each dirty allocated device
TVector<TString> dirtyDevices;
for (const auto& device: allocatedDevices) {
const auto& uuid = device.GetDeviceUUID();
if (!IsDirtyDevice(uuid)) {
continue;
}
// if we can't secure erase one of allocated disk's dirty devices, we can't allocate the disk
if (!CanSecureErase(uuid)) {
onError();

return MakeError(E_BS_DISK_ALLOCATION_FAILED, TStringBuilder() <<
"can't secure erase device " << uuid);
}
dirtyDevices.push_back(uuid);
result->DirtyDevices.push_back(device);
}

if (dirtyDevices) {
NProto::TError cleanupError = PendingCleanup.Insert(params.DiskId, std::move(dirtyDevices), /*allocation=*/true);
// if the dirty devices can't be registered in PendingCleanup, we can't allocate the disk
if (HasError(cleanupError)) {
onError();

ReportDiskRegistryInsertToPendingCleanupFailed(
TStringBuilder() << "An error occurred while allocating disk: "
<< FormatError(cleanupError));
return cleanupError;
}
}

for (const auto& device: allocatedDevices) {
disk.Devices.push_back(device.GetDeviceUUID());
}
Expand Down Expand Up @@ -2851,7 +2889,7 @@ NProto::TError TDiskRegistryState::DeallocateDisk(
JoinSeq(", ", devicesAllowedToBeCleaned).c_str());

auto error =
PendingCleanup.Insert(diskId, std::move(devicesAllowedToBeCleaned));
PendingCleanup.Insert(diskId, std::move(devicesAllowedToBeCleaned), /*allocation=*/false);
if (HasError(error)) {
ReportDiskRegistryInsertToPendingCleanupFailed(
TStringBuilder() << "An error occurred while deallocating disk: "
Expand Down Expand Up @@ -2998,6 +3036,11 @@ auto TDiskRegistryState::DeallocateSimpleDisk(
db.UpdateDirtyDevice(uuid, diskId);
}

auto agents = FindDiskDevicesAgents(disk);
for (const auto* agent: agents) {
DeviceList.UpdateDevices(*agent, DevicePoolConfigs);
}

DeleteAllDeviceMigrations(diskId);
DeleteDisk(db, diskId);

Expand Down Expand Up @@ -3855,16 +3898,20 @@ bool TDiskRegistryState::MarkDeviceAsDirty(
return true;
}

TDiskRegistryState::TDiskId TDiskRegistryState::MarkDeviceAsClean(
TDiskRegistryState::TOpt2Disk TDiskRegistryState::MarkDeviceAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TDeviceId& uuid)
{
auto ret = MarkDevicesAsClean(now, db, TVector<TDeviceId>{uuid});
return ret.empty() ? "" : ret[0];
auto [alloc, dealloc] = MarkDevicesAsClean(now, db, TVector<TDeviceId>{uuid});
return {
alloc.empty() ? std::nullopt : std::make_optional(std::move(alloc[0])),
dealloc.empty() ? std::nullopt
: std::make_optional(std::move(dealloc[0]))};
}

TVector<TDiskRegistryState::TDiskId> TDiskRegistryState::MarkDevicesAsClean(
std::pair<TDiskRegistryState::TAllocatedDisksList, TDiskRegistryState::TDellocatedDisksList>
TDiskRegistryState::MarkDevicesAsClean(
TInstant now,
TDiskRegistryDatabase& db,
const TVector<TDeviceId>& uuids)
Expand All @@ -3878,14 +3925,19 @@ TVector<TDiskRegistryState::TDiskId> TDiskRegistryState::MarkDevicesAsClean(
}
}

TVector<TDiskId> ret;
TAllocatedDisksList allocatedDisks;
TDellocatedDisksList dellocatedDisks;
for (const auto& uuid: TryUpdateDevices(now, db, uuids)) {
if (auto diskId = PendingCleanup.EraseDevice(uuid); !diskId.empty()) {
ret.push_back(std::move(diskId));
auto [allocatedDisk, deallocatedDisk] = PendingCleanup.EraseDevice(uuid);
if (allocatedDisk) {
allocatedDisks.push_back(std::move(*allocatedDisk));
}
if (deallocatedDisk) {
dellocatedDisks.push_back(std::move(*deallocatedDisk));
}
}

return ret;
return {std::move(allocatedDisks), std::move(dellocatedDisks)};
}

bool TDiskRegistryState::TryUpdateDevice(
Expand Down Expand Up @@ -4811,7 +4863,7 @@ void TDiskRegistryState::RemoveFinishedMigrations(

DeviceList.ReleaseDevice(m.DeviceId);
db.UpdateDirtyDevice(m.DeviceId, diskId);
auto error = PendingCleanup.Insert(diskId, m.DeviceId);
auto error = PendingCleanup.Insert(diskId, m.DeviceId, /*allocation=*/false);
if (HasError(error)) {
ReportDiskRegistryInsertToPendingCleanupFailed(
TStringBuilder()
Expand Down Expand Up @@ -5072,8 +5124,9 @@ bool TDiskRegistryState::HasDependentSsdDisks(
continue;
}

auto [allocating, deallocating] = PendingCleanup.FindDiskId(d.GetDeviceUUID());
if (d.GetPoolKind() == NProto::DEVICE_POOL_KIND_LOCAL &&
PendingCleanup.FindDiskId(d.GetDeviceUUID()))
(allocating || deallocating))
{
return true;
}
Expand Down Expand Up @@ -5598,6 +5651,24 @@ auto TDiskRegistryState::FindDeviceLocation(const TDeviceId& deviceId) const
return const_cast<TDiskRegistryState*>(this)->FindDeviceLocation(deviceId);
}

// Returns the distinct agent configs that host the devices of `disk`.
//
// Devices are first mapped to agent ids (deduplicated), then each id is
// resolved to its agent config. Ids that do not resolve to an agent are
// skipped, so the returned set never contains nullptr and callers may
// dereference every element.
//
// NOTE(review): reviewers suggested THashSet instead of std::set for the id
// set and for the result. The result type is part of this function's
// signature, so it is left unchanged here.
auto TDiskRegistryState::FindDiskDevicesAgents(const TDiskState& disk) const
    -> std::set<const NProto::TAgentConfig*>
{
    std::set<TString> diskAgentIds;
    for (const auto& uuid: disk.Devices) {
        // NOTE(review): what FindAgentId returns for an unknown device is
        // not visible here; an unresolvable id is filtered out below.
        diskAgentIds.insert(DeviceList.FindAgentId(uuid));
    }

    std::set<const NProto::TAgentConfig*> agents;
    for (const auto& agentId: diskAgentIds) {
        const auto* agent = AgentList.FindAgent(agentId);
        if (agent) {
            agents.insert(agent);
        }
    }

    // Return the local by name: wrapping it in std::move would inhibit
    // copy elision.
    return agents;
}

auto TDiskRegistryState::FindDeviceLocation(const TDeviceId& deviceId)
-> std::pair<NProto::TAgentConfig*, NProto::TDeviceConfig*>
{
Expand Down Expand Up @@ -6354,7 +6425,9 @@ NProto::TDiskRegistryStateBackup TDiskRegistryState::BackupState() const
transform(GetDirtyDevices(), backup.MutableDirtyDevices(), [this] (auto& x) {
NProto::TDiskRegistryStateBackup::TDirtyDevice dd;
dd.SetId(x.GetDeviceUUID());
dd.SetDiskId(PendingCleanup.FindDiskId(x.GetDeviceUUID()));
auto [allocating, deallocating] = PendingCleanup.FindDiskId(x.GetDeviceUUID()); // TODO: need to backup
Y_UNUSED(allocating);
dd.SetDiskId(*deallocating);

return dd;
});
Expand Down
Loading
Loading