diff --git a/cloud/blockstore/libs/daemon/common/bootstrap.cpp b/cloud/blockstore/libs/daemon/common/bootstrap.cpp
index c52bc578c17..f06be81fef5 100644
--- a/cloud/blockstore/libs/daemon/common/bootstrap.cpp
+++ b/cloud/blockstore/libs/daemon/common/bootstrap.cpp
@@ -895,11 +895,6 @@ void TBootstrapBase::Start()
     // order
     START_COMMON_COMPONENT(Scheduler);
 
-    if (Configs->Options->MemLock) {
-        LockProcessMemory(Log);
-        STORAGE_INFO("Process memory locked");
-    }
-
     auto restoreFuture = EndpointManager->RestoreEndpoints();
     if (!Configs->Options->TemporaryServer) {
         auto balancerSwitch = VolumeBalancerSwitch;
@@ -910,6 +905,11 @@ void TBootstrapBase::Start()
     }
     STORAGE_INFO("Started endpoints restoring");
 
+    if (Configs->Options->MemLock) {
+        LockProcessMemory(Log);
+        STORAGE_INFO("Process memory locked");
+    }
+
 #undef START_COMMON_COMPONENT
 #undef START_KIKIMR_COMPONENT
 }
diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp
index 1146dd98078..d1f2199dfda 100644
--- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp
+++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp
@@ -724,21 +724,33 @@ TFuture<NProto::TStartEndpointResponse> TEndpointManager::RestoreSingleEndpoint(
     TCallContextPtr ctx,
     std::shared_ptr<NProto::TStartEndpointRequest> request)
 {
-    return Executor->Execute([
-        weakSelf = weak_from_this(),
-        ctx,
-        request] () mutable -> NProto::TStartEndpointResponse
-    {
-        auto self = weakSelf.lock();
-        if (!self) {
-            return TErrorResponse(E_FAIL, "EndpointManager is destroyed");
-        }
+    return Executor->Execute(
+        [weakSelf = weak_from_this(),
+         ctx,
+         request]() mutable -> NProto::TStartEndpointResponse
+        {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return TErrorResponse(E_FAIL, "EndpointManager is destroyed");
+            }
 
-        return self->StartEndpointImpl(
-            std::move(ctx),
-            std::move(request),
-            true);
-    });
+            auto promise =
+                self->AddProcessingSocket(*request);
+            if (promise.HasValue()) {
+                return promise.ExtractValue();
+            }
+
+            auto socketPath = request->GetUnixSocketPath();
+
+            auto response = self->StartEndpointImpl(
+                std::move(ctx),
+                std::move(request),
+                true);
+            promise.SetValue(response);
+
+            self->RemoveProcessingSocket(socketPath);
+            return response;
+        });
 }
 
 NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint(
@@ -750,12 +762,16 @@ NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint(
         return TErrorResponse(E_REJECTED, "endpoint is restoring now");
     }
 
+    // We can have concurrent StartEndpoint and RestoreEndpoints calls when
+    // the process starts. AddProcessingSocket should protect us from this
+    // race and delay the StartEndpoint call.
     auto promise = AddProcessingSocket(*request);
     if (promise.HasValue()) {
         return promise.ExtractValue();
     }
 
-    auto response = StartEndpointImpl(std::move(ctx), std::move(request), false);
+    auto response =
+        StartEndpointImpl(std::move(ctx), std::move(request), false);
     promise.SetValue(response);
 
     RemoveProcessingSocket(socketPath);
diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
index 97256f4d466..6700df7361b 100644
--- a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
+++ b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
@@ -186,6 +186,62 @@ struct TTestDeviceFactory
 
 ////////////////////////////////////////////////////////////////////////////////
 
+class TControlledDevice final: public NBD::IDevice
+{
+public:
+    explicit TControlledDevice(
+            std::function<NThreading::TFuture<NProto::TError>()> deviceStubFunction)
+        : DeviceStubFunction(std::move(deviceStubFunction))
+    {}
+
+    NThreading::TFuture<NProto::TError> Start() override
+    {
+        return DeviceStubFunction();
+    }
+
+    NThreading::TFuture<NProto::TError> Stop(bool deleteDevice) override
+    {
+        Y_UNUSED(deleteDevice);
+
+        return DeviceStubFunction();
+    }
+
+    NThreading::TFuture<NProto::TError> Resize(ui64 deviceSizeInBytes) override
+    {
+        Y_UNUSED(deviceSizeInBytes);
+
+        return DeviceStubFunction();
+    }
+
+private:
+    std::function<NThreading::TFuture<NProto::TError>()> DeviceStubFunction;
+};
+
+struct TTestControlledDeviceFactory: public NBD::IDeviceFactory
+{
+    std::function<NThreading::TFuture<NProto::TError>()> DeviceStubFunction;
+
+    explicit TTestControlledDeviceFactory(
+            std::function<NThreading::TFuture<NProto::TError>()> deviceStubFunction)
+        : DeviceStubFunction(std::move(deviceStubFunction))
+    {}
+
+    NBD::IDevicePtr Create(
+        const TNetworkAddress& connectAddress,
+        TString deviceName,
+        ui64 blockCount,
+        ui32 blockSize) override
+    {
+        Y_UNUSED(connectAddress);
+        Y_UNUSED(deviceName);
+        Y_UNUSED(blockCount);
+        Y_UNUSED(blockSize);
+        return std::make_shared<TControlledDevice>(DeviceStubFunction);
+    }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
 struct TTestEndpoint
 {
     NProto::TStartEndpointRequest Request;
@@ -1939,6 +1995,90 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest)
             UNIT_ASSERT(!HasError(response));
         }
     }
+
+    Y_UNIT_TEST(ShouldNotCrashRestoreEndpointWhileStartingEndpoint)
+    {
+        TString nbdDevPrefix = CreateGuidAsString() + "_nbd";
+        auto nbdDevice = nbdDevPrefix + "0";
+        auto nbdDeviceAnother = nbdDevPrefix + "1";
+        TFsPath(nbdDevice).Touch();
+        TFsPath(nbdDeviceAnother).Touch();
+        Y_DEFER {
+            TFsPath(nbdDevice).DeleteIfExists();
+            TFsPath(nbdDeviceAnother).DeleteIfExists();
+        };
+        TTempDir dir;
+        auto socketPath = dir.Path() / "testSocket";
+        TString diskId = "testDiskId";
+        auto ipcType = NProto::IPC_NBD;
+
+        TBootstrap bootstrap;
+        TMap<TString, NProto::TMountVolumeRequest> mountedVolumes;
+        bootstrap.Service = CreateTestService(mountedVolumes);
+
+        bootstrap.Options.NbdDevicePrefix = nbdDevPrefix;
+
+        auto promise = NewPromise<NProto::TError>();
+
+        auto deviceFactory = std::make_shared<TTestControlledDeviceFactory>(
+            [&]() { return promise.GetFuture(); });
+        bootstrap.NbdDeviceFactory = deviceFactory;
+
+        auto listener = std::make_shared<TTestEndpointListener>();
+        bootstrap.EndpointListeners = {{ NProto::IPC_NBD, listener }};
+
+        auto manager = CreateEndpointManager(bootstrap);
+        bootstrap.Start();
+        Y_DEFER
+        {
+            bootstrap.Stop();
+        };
+
+        NProto::TStartEndpointRequest request;
+        SetDefaultHeaders(request);
+        request.SetUnixSocketPath(socketPath.GetPath());
+        request.SetDiskId(diskId);
+        request.SetClientId(TestClientId);
+        request.SetIpcType(ipcType);
+        request.SetNbdDeviceFile(nbdDevice);
+        request.SetPersistent(true);
+
+        auto [str, error] = SerializeEndpoint(request);
+        UNIT_ASSERT_C(!HasError(error), error);
+
+        bootstrap.EndpointStorage->AddEndpoint(socketPath.GetPath(), str);
+
+        request.SetNbdDeviceFile(nbdDeviceAnother);
+        socketPath.DeleteIfExists();
+        UNIT_ASSERT(!socketPath.Exists());
+
+        {
+            auto startEndpointFuture = StartEndpoint(*manager, request);
+
+            auto waitForTaskScheduleFuture =
+                bootstrap.Scheduler->WaitForTaskSchedule();
+            auto restoreEndpointsFuture = bootstrap.Executor->Execute(
+                [manager = manager.get(),
+                 executor = bootstrap.Executor.get()]() mutable
+                {
+                    auto future = manager->RestoreEndpoints();
+                    executor->WaitFor(future);
+                });
+
+            // We start RestoreEndpoints with different args and it executes
+            // concurrently with the StartEndpoint request, so
+            // RestoreSingleEndpoint should be rejected until StartEndpoint
+            // request processing ends. On rejection the restoring client
+            // schedules a task to retry the request.
+            waitForTaskScheduleFuture.Wait();
+
+            promise.SetValue(MakeError(S_OK, {}));
+
+            startEndpointFuture.Wait();
+            bootstrap.Scheduler->RunAllScheduledTasks();
+            restoreEndpointsFuture.Wait();
+        }
+    }
 }
 
 } // namespace NCloud::NBlockStore::NServer
diff --git a/cloud/storage/core/libs/common/scheduler_test.cpp b/cloud/storage/core/libs/common/scheduler_test.cpp
index e32a7273239..e195d4338f7 100644
--- a/cloud/storage/core/libs/common/scheduler_test.cpp
+++ b/cloud/storage/core/libs/common/scheduler_test.cpp
@@ -14,6 +14,10 @@ void TTestScheduler::Schedule(
 
     with_lock (CallbacksLock) {
         Callbacks.push_back(callback);
+        if (GotNewCallback) {
+            GotNewCallback->SetValue();
+        }
+        GotNewCallback = std::nullopt;
     }
 }
 
@@ -29,4 +33,14 @@ void TTestScheduler::RunAllScheduledTasks()
     }
 }
 
+NThreading::TFuture<void> TTestScheduler::WaitForTaskSchedule()
+{
+    with_lock (CallbacksLock) {
+        if (!GotNewCallback) {
+            GotNewCallback = NThreading::NewPromise();
+        }
+        return GotNewCallback->GetFuture();
+    }
+}
+
 } // namespace NCloud
diff --git a/cloud/storage/core/libs/common/scheduler_test.h b/cloud/storage/core/libs/common/scheduler_test.h
index 7493702c9e6..746a20f1950 100644
--- a/cloud/storage/core/libs/common/scheduler_test.h
+++ b/cloud/storage/core/libs/common/scheduler_test.h
@@ -2,6 +2,7 @@
 
 #include "public.h"
 
+#include "library/cpp/threading/future/core/future.h"
 #include "scheduler.h"
 
 #include
@@ -17,6 +18,7 @@ class TTestScheduler final
 private:
     TMutex CallbacksLock;
     TVector<TCallback> Callbacks;
+    std::optional<NThreading::TPromise<void>> GotNewCallback;
 
 public:
     void Start() override {}
@@ -28,6 +30,8 @@ class TTestScheduler final
         TCallback callback) override;
 
     void RunAllScheduledTasks();
+
+    NThreading::TFuture<void> WaitForTaskSchedule();
 };
 
 } // namespace NCloud
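
Note (not part of the patch): DoStartEndpoint and the reworked RestoreSingleEndpoint both lean on AddProcessingSocket/RemoveProcessingSocket, whose implementation is not shown in this diff. The sketch below illustrates one way such a per-socket guard can work, assuming all calls run on the single TEndpointManager executor thread; the names TProcessingSocketGuard, Add, Remove and BusyResponse are illustrative, not the actual members of endpoint_manager.cpp.

#include <library/cpp/threading/future/future.h>

#include <util/generic/hash.h>
#include <util/generic/string.h>

#include <utility>

// Illustrative sketch only. The first caller for a socket gets an empty
// promise, does the work, sets the value and removes the socket; a concurrent
// caller for the same socket gets an already-completed "busy" promise, so it
// returns immediately (in the restore path this is the rejection that the new
// test waits to see retried).
template <typename TResponse>
class TProcessingSocketGuard
{
private:
    // Accessed only from the single executor thread, so no lock is needed.
    THashMap<TString, NThreading::TPromise<TResponse>> Sockets;
    TResponse BusyResponse;

public:
    explicit TProcessingSocketGuard(TResponse busyResponse)
        : BusyResponse(std::move(busyResponse))
    {}

    NThreading::TPromise<TResponse> Add(const TString& socketPath)
    {
        if (Sockets.count(socketPath)) {
            // Socket is already being processed: hand back a completed promise.
            auto busy = NThreading::NewPromise<TResponse>();
            busy.SetValue(BusyResponse);
            return busy;
        }
        auto promise = NThreading::NewPromise<TResponse>();
        Sockets.emplace(socketPath, promise);
        return promise;
    }

    void Remove(const TString& socketPath)
    {
        Sockets.erase(socketPath);
    }
};

The HasValue()/ExtractValue() branch in DoStartEndpoint corresponds to the "busy" case above; the success path sets the value and calls RemoveProcessingSocket, which is the same sequence the new RestoreSingleEndpoint body now follows.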
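
Note (not part of the patch): the new TTestScheduler::WaitForTaskSchedule() lets a test block until the code under test schedules its next task (for example, the retry scheduled after an E_REJECTED response) and then run that task deterministically with RunAllScheduledTasks(). A minimal usage sketch, where the direct Schedule() call stands in for the component under test and assumes the IScheduler::Schedule(ITaskQueue*, TInstant, TCallback) signature from the storage core scheduler interface:

#include <cloud/storage/core/libs/common/scheduler_test.h>

#include <library/cpp/testing/unittest/registar.h>

#include <util/datetime/base.h>

using namespace NCloud;

Y_UNIT_TEST_SUITE(TTestSchedulerWaitExample)
{
    Y_UNIT_TEST(ShouldSeeScheduledTask)
    {
        TTestScheduler scheduler;

        // Subscribe before anything is scheduled: the next Schedule() call
        // fulfills the promise created here.
        auto scheduled = scheduler.WaitForTaskSchedule();

        bool taskRan = false;
        scheduler.Schedule(nullptr, TInstant::Zero(), [&] { taskRan = true; });

        UNIT_ASSERT(scheduled.HasValue());   // fulfilled synchronously by Schedule()
        UNIT_ASSERT(!taskRan);               // the callback itself has not run yet

        scheduler.RunAllScheduledTasks();
        UNIT_ASSERT(taskRan);
    }
}

Note that WaitForTaskSchedule() must be called before the task is scheduled, and Schedule() resets the stored promise, so each future observes only the next scheduled task; this is why the sketch above (and ShouldNotCrashRestoreEndpointWhileStartingEndpoint) obtains the future before kicking off the work.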