From c18e19aaf3279819e6c4545267932f91aa8fef71 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Mon, 13 Jan 2025 05:58:18 +0000 Subject: [PATCH 01/10] issue-2841: add lock on StartEndpointImpl method call --- .../libs/daemon/common/bootstrap.cpp | 10 ++-- .../libs/endpoints/endpoint_manager.cpp | 59 ++++++++++++++++++- .../libs/endpoints/session_manager.cpp | 9 +-- 3 files changed, 67 insertions(+), 11 deletions(-) diff --git a/cloud/blockstore/libs/daemon/common/bootstrap.cpp b/cloud/blockstore/libs/daemon/common/bootstrap.cpp index c52bc578c17..f06be81fef5 100644 --- a/cloud/blockstore/libs/daemon/common/bootstrap.cpp +++ b/cloud/blockstore/libs/daemon/common/bootstrap.cpp @@ -895,11 +895,6 @@ void TBootstrapBase::Start() // order START_COMMON_COMPONENT(Scheduler); - if (Configs->Options->MemLock) { - LockProcessMemory(Log); - STORAGE_INFO("Process memory locked"); - } - auto restoreFuture = EndpointManager->RestoreEndpoints(); if (!Configs->Options->TemporaryServer) { auto balancerSwitch = VolumeBalancerSwitch; @@ -910,6 +905,11 @@ void TBootstrapBase::Start() } STORAGE_INFO("Started endpoints restoring"); + if (Configs->Options->MemLock) { + LockProcessMemory(Log); + STORAGE_INFO("Process memory locked"); + } + #undef START_COMMON_COMPONENT #undef START_KIKIMR_COMPONENT } diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index 1146dd98078..6eae0632085 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -2,6 +2,7 @@ #include "endpoint_events.h" #include "endpoint_listener.h" +#include "library/cpp/threading/future/async_semaphore.h" #include "session_manager.h" #include @@ -427,6 +428,8 @@ class TEndpointManager final THashMap Endpoints; + THashMap EndpointsLock; + NClient::IMetricClientPtr RestoringClient; TSet RestoringEndpoints; @@ -567,6 +570,11 @@ class TEndpointManager final return false; } + 
NProto::TStartEndpointResponse StartEndpointImplWithLock( + TCallContextPtr ctx, + std::shared_ptr request, + bool restoring); + NProto::TStartEndpointResponse StartEndpointImpl( TCallContextPtr ctx, std::shared_ptr request, @@ -734,7 +742,7 @@ TFuture TEndpointManager::RestoreSingleEndpoint( return TErrorResponse(E_FAIL, "EndpointManager is destroyed"); } - return self->StartEndpointImpl( + return self->StartEndpointImplWithLock( std::move(ctx), std::move(request), true); @@ -755,13 +763,60 @@ NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint( return promise.ExtractValue(); } - auto response = StartEndpointImpl(std::move(ctx), std::move(request), false); + auto response = + StartEndpointImplWithLock(std::move(ctx), std::move(request), false); promise.SetValue(response); RemoveProcessingSocket(socketPath); return response; } +// See issue-2841 +// There are two maps Endpoints: one in the TEndpointManager, another in +// TSessionManager. If we try to CreateSession with endpoint, for which session +// was already created (endpoint was already inserted to the +// TSessionManager::Endpoints), we will crash. Thats why we have map Endpoints +// in TEndpointManager, and at the start of method StartEndpointImpl we check if +// endpoint exists in that map. If it is already exists we call AlterEndpoint. +// But we insert new element to the TEndpointManager::Endpoints only at the end +// of method StartEndpointImpl and between inserting to +// TSessionManager::Endpoints and inserting to TEndpointManager::Endpoints there +// are few async waits, which causes race condition: first routine calls +// StartEndpointImpl, insert endpoint to TSessionManager::Endpoints and fall +// asleep in some wait before inserting TEndpointManager::Endpoints, second +// routine calls StartEndpointImpl, then calls SessionManager::CreateSession and +// crash process. Thats why we call StartEndpointImpl with lock. 
+NProto::TStartEndpointResponse TEndpointManager::StartEndpointImplWithLock( + TCallContextPtr ctx, + std::shared_ptr request, + bool restoring) +{ + const auto& socketPath = request->GetUnixSocketPath(); + auto lock_it = EndpointsLock.find(socketPath); + auto lock = [&] () { + if (lock_it == EndpointsLock.end()) { + return EndpointsLock.emplace( + std::make_pair(socketPath, TAsyncSemaphore::Make(1))).first->second; + } + return lock_it->second; + }(); + Y_DEFER { + // 2 because one pointer in the map, second on the stack. + if (lock.RefCount() == 2) { + EndpointsLock.erase(socketPath); + } + }; + + NProto::TStartEndpointResponse result; + + auto futureLock = lock->AcquireAsync(); + Executor->WaitFor(futureLock.IgnoreResult()); + + auto deferGuard = lock->MakeAutoRelease(); + + return StartEndpointImpl(std::move(ctx), std::move(request), restoring); +} + NProto::TStartEndpointResponse TEndpointManager::StartEndpointImpl( TCallContextPtr ctx, std::shared_ptr request, diff --git a/cloud/blockstore/libs/endpoints/session_manager.cpp b/cloud/blockstore/libs/endpoints/session_manager.cpp index b42c5cb329e..d23a8fb3807 100644 --- a/cloud/blockstore/libs/endpoints/session_manager.cpp +++ b/cloud/blockstore/libs/endpoints/session_manager.cpp @@ -428,10 +428,11 @@ TSessionManager::TSessionOrError TSessionManager::CreateSessionImpl( auto [it, inserted] = Endpoints.emplace( request.GetUnixSocketPath(), endpoint); - STORAGE_VERIFY( - inserted, - TWellKnownEntityTypes::ENDPOINT, - request.GetUnixSocketPath()); + if (!inserted) { + return TErrorResponse( + E_REJECTED, + "Session with this socket path was created"); + } } return TSessionInfo { From f945c4cdc6bcf2765c64ce10ddbef3d8e4e95be2 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Mon, 13 Jan 2025 08:04:39 +0000 Subject: [PATCH 02/10] issue-2841: fix use after free --- cloud/blockstore/libs/endpoints/endpoint_manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index 6eae0632085..a33e9e15748 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -791,7 +791,7 @@ NProto::TStartEndpointResponse TEndpointManager::StartEndpointImplWithLock( std::shared_ptr request, bool restoring) { - const auto& socketPath = request->GetUnixSocketPath(); + const TString socketPath = request->GetUnixSocketPath(); auto lock_it = EndpointsLock.find(socketPath); auto lock = [&] () { if (lock_it == EndpointsLock.end()) { From a3b13e1844225166c25ec846d24b76dbf52c1df7 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Tue, 14 Jan 2025 10:34:05 +0000 Subject: [PATCH 03/10] issue-2841: correct comment --- .../libs/endpoints/endpoint_manager.cpp | 30 ++++++++----------- .../libs/endpoints/session_manager.cpp | 9 +++--- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index a33e9e15748..2d799033fbb 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -772,20 +772,18 @@ NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint( } // See issue-2841 -// There are two maps Endpoints: one in the TEndpointManager, another in -// TSessionManager. If we try to CreateSession with endpoint, for which session -// was already created (endpoint was already inserted to the -// TSessionManager::Endpoints), we will crash. Thats why we have map Endpoints -// in TEndpointManager, and at the start of method StartEndpointImpl we check if -// endpoint exists in that map. If it is already exists we call AlterEndpoint. 
-// But we insert new element to the TEndpointManager::Endpoints only at the end -// of method StartEndpointImpl and between inserting to -// TSessionManager::Endpoints and inserting to TEndpointManager::Endpoints there -// are few async waits, which causes race condition: first routine calls -// StartEndpointImpl, insert endpoint to TSessionManager::Endpoints and fall -// asleep in some wait before inserting TEndpointManager::Endpoints, second -// routine calls StartEndpointImpl, then calls SessionManager::CreateSession and -// crash process. Thats why we call StartEndpointImpl with lock. +// There are two maps Endpoints: one in the TEndpointManager, the other in +// TSessionManager. If we try to call TSessionManager::CreateSession method +// using an endpoint, for which a session has already been created (endpoint has +// already been inserted into TSessionManager::Endpoints), a crash will occur. +// To prevent this, we use the TEndpointManager::Endpoints map and check for the +// existence of an endpoint at the beginning of the StartEndpointImpl method. +// If the endpoint already exists, we call AlterEndpoint. However, insertion +// into TEndpointManager::Endpoints is delayed until the end of StartEndpointImpl. +// In the meantime, multiple asynchronous operations are performed, which can +// lead to race conditions. To mitigate this, we invoke StartEndpointImpl with a +// lock, ensuring synchronized access to the maps and preventing concurrent +// issues. NProto::TStartEndpointResponse TEndpointManager::StartEndpointImplWithLock( TCallContextPtr ctx, std::shared_ptr request, @@ -801,14 +799,12 @@ NProto::TStartEndpointResponse TEndpointManager::StartEndpointImplWithLock( return lock_it->second; }(); Y_DEFER { - // 2 because one pointer in the map, second on the stack. + // 2 because the first pointer is in the map, the second is on the stack. 
if (lock.RefCount() == 2) { EndpointsLock.erase(socketPath); } }; - NProto::TStartEndpointResponse result; - auto futureLock = lock->AcquireAsync(); Executor->WaitFor(futureLock.IgnoreResult()); diff --git a/cloud/blockstore/libs/endpoints/session_manager.cpp b/cloud/blockstore/libs/endpoints/session_manager.cpp index d23a8fb3807..b42c5cb329e 100644 --- a/cloud/blockstore/libs/endpoints/session_manager.cpp +++ b/cloud/blockstore/libs/endpoints/session_manager.cpp @@ -428,11 +428,10 @@ TSessionManager::TSessionOrError TSessionManager::CreateSessionImpl( auto [it, inserted] = Endpoints.emplace( request.GetUnixSocketPath(), endpoint); - if (!inserted) { - return TErrorResponse( - E_REJECTED, - "Session with this socket path was created"); - } + STORAGE_VERIFY( + inserted, + TWellKnownEntityTypes::ENDPOINT, + request.GetUnixSocketPath()); } return TSessionInfo { From 59631f914d0c1deb377b54462e6d975f777d7046 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Thu, 16 Jan 2025 12:02:59 +0000 Subject: [PATCH 04/10] issue-2841: correct issues and add test --- .../libs/endpoints/endpoint_manager.cpp | 100 +++++++----------- .../libs/endpoints/endpoint_manager.h | 5 + .../libs/endpoints/endpoint_manager_ut.cpp | 43 ++++++++ 3 files changed, 87 insertions(+), 61 deletions(-) diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index 2d799033fbb..85c6591819f 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -554,6 +554,10 @@ class TEndpointManager final }); } + TFuture RestoreSingleEndpointForTesting( + TCallContextPtr ctx, + std::shared_ptr request) override; + TFuture RestoreSingleEndpoint( TCallContextPtr ctx, std::shared_ptr request); @@ -570,10 +574,6 @@ class TEndpointManager final return false; } - NProto::TStartEndpointResponse StartEndpointImplWithLock( - TCallContextPtr ctx, - std::shared_ptr request, - bool 
restoring); NProto::TStartEndpointResponse StartEndpointImpl( TCallContextPtr ctx, @@ -728,25 +728,45 @@ class TEndpointManager final //////////////////////////////////////////////////////////////////////////////// +TFuture +TEndpointManager::RestoreSingleEndpointForTesting( + TCallContextPtr ctx, + std::shared_ptr request) +{ + return RestoreSingleEndpoint(ctx, std::move(request)); +} + TFuture TEndpointManager::RestoreSingleEndpoint( TCallContextPtr ctx, std::shared_ptr request) { - return Executor->Execute([ - weakSelf = weak_from_this(), - ctx, - request] () mutable -> NProto::TStartEndpointResponse - { - auto self = weakSelf.lock(); - if (!self) { - return TErrorResponse(E_FAIL, "EndpointManager is destroyed"); - } + return Executor->Execute( + [weakSelf = weak_from_this(), + ctx, + request]() mutable -> NProto::TStartEndpointResponse + { + auto self = weakSelf.lock(); + if (!self) { + return TErrorResponse(E_FAIL, "EndpointManager is destroyed"); + } - return self->StartEndpointImplWithLock( - std::move(ctx), - std::move(request), - true); - }); + auto promise = + self->AddProcessingSocket(*request); + if (promise.HasValue()) { + return promise.ExtractValue(); + } + + auto socketPath = request->GetUnixSocketPath(); + + auto response = self->StartEndpointImpl( + std::move(ctx), + std::move(request), + true); + promise.SetValue(response); + + self->RemoveProcessingSocket(socketPath); + return response; + }); } NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint( @@ -764,55 +784,13 @@ NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint( } auto response = - StartEndpointImplWithLock(std::move(ctx), std::move(request), false); + StartEndpointImpl(std::move(ctx), std::move(request), false); promise.SetValue(response); RemoveProcessingSocket(socketPath); return response; } -// See issue-2841 -// There are two maps Endpoints: one in the TEndpointManager, the other in -// TSessionManager. 
If we try to call TSessionManager::CreateSession method -// using an endpoint, for which a session has already been created (endpoint has -// already been inserted into TSessionManager::Endpoints), a crash will occur. -// To prevent this, we use the TEndpointManager::Endpoints map and check for the -// existence of an endpoint at the beginning of the StartEndpointImpl method. -// If the endpoint already exists, we call AlterEndpoint. However, insertion -// into TEndpointManager::Endpoints is delayed until the end of StartEndpointImpl. -// In the meantime, multiple asynchronous operations are performed, which can -// lead to race conditions. To mitigate this, we invoke StartEndpointImpl with a -// lock, ensuring synchronized access to the maps and preventing concurrent -// issues. -NProto::TStartEndpointResponse TEndpointManager::StartEndpointImplWithLock( - TCallContextPtr ctx, - std::shared_ptr request, - bool restoring) -{ - const TString socketPath = request->GetUnixSocketPath(); - auto lock_it = EndpointsLock.find(socketPath); - auto lock = [&] () { - if (lock_it == EndpointsLock.end()) { - return EndpointsLock.emplace( - std::make_pair(socketPath, TAsyncSemaphore::Make(1))).first->second; - } - return lock_it->second; - }(); - Y_DEFER { - // 2 because the first pointer is in the map, the second is on the stack. 
- if (lock.RefCount() == 2) { - EndpointsLock.erase(socketPath); - } - }; - - auto futureLock = lock->AcquireAsync(); - Executor->WaitFor(futureLock.IgnoreResult()); - - auto deferGuard = lock->MakeAutoRelease(); - - return StartEndpointImpl(std::move(ctx), std::move(request), restoring); -} - NProto::TStartEndpointResponse TEndpointManager::StartEndpointImpl( TCallContextPtr ctx, std::shared_ptr request, diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.h b/cloud/blockstore/libs/endpoints/endpoint_manager.h index 1717f72223e..df0f1e0b40d 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.h +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.h @@ -37,6 +37,11 @@ struct IEndpointManager #undef ENDPOINT_DECLARE_METHOD virtual NThreading::TFuture RestoreEndpoints() = 0; + + virtual NThreading::TFuture + RestoreSingleEndpointForTesting( + TCallContextPtr ctx, + std::shared_ptr request) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp index 97256f4d466..0ef7bd66284 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp @@ -1939,6 +1939,49 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) UNIT_ASSERT(!HasError(response)); } } + + Y_UNIT_TEST(ShouldNotCrashRestoreEndpointWhileStartingEndpoint) + { + TTempDir dir; + auto socketPath = dir.Path() / "testSocket"; + TString diskId = "testDiskId"; + auto ipcType = NProto::IPC_GRPC; + + TBootstrap bootstrap; + TMap mountedVolumes; + bootstrap.Service = CreateTestService(mountedVolumes); + + auto grpcListener = + CreateSocketEndpointListener(bootstrap.Logging, 16, MODE0660); + grpcListener->SetClientStorageFactory(CreateClientStorageFactoryStub()); + bootstrap.EndpointListeners = {{NProto::IPC_GRPC, grpcListener}}; + + auto manager = CreateEndpointManager(bootstrap); + 
bootstrap.Start(); + + NProto::TStartEndpointRequest request; + SetDefaultHeaders(request); + request.SetUnixSocketPath(socketPath.GetPath()); + request.SetDiskId(diskId); + request.SetClientId(TestClientId); + request.SetIpcType(ipcType); + + socketPath.DeleteIfExists(); + UNIT_ASSERT(!socketPath.Exists()); + + { + auto futureStartEndpoint = StartEndpoint(*manager, request); + auto futureRestoreEndpoint = + manager->RestoreSingleEndpointForTesting( + MakeIntrusive(), + std::make_shared(request)); + + UNIT_ASSERT( + !HasError(futureStartEndpoint.GetValue(TDuration::Seconds(5)))); + UNIT_ASSERT(!HasError( + futureRestoreEndpoint.GetValue(TDuration::Seconds(5)))); + } + } } } // namespace NCloud::NBlockStore::NServer From fab4a17c7803553179d4435dafa4da2f0e3dec20 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Thu, 16 Jan 2025 12:08:30 +0000 Subject: [PATCH 05/10] issue-2841: remove locks and fmt --- cloud/blockstore/libs/endpoints/endpoint_manager.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index 85c6591819f..69ae626866d 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -2,7 +2,6 @@ #include "endpoint_events.h" #include "endpoint_listener.h" -#include "library/cpp/threading/future/async_semaphore.h" #include "session_manager.h" #include @@ -428,8 +427,6 @@ class TEndpointManager final THashMap Endpoints; - THashMap EndpointsLock; - NClient::IMetricClientPtr RestoringClient; TSet RestoringEndpoints; @@ -574,7 +571,6 @@ class TEndpointManager final return false; } - NProto::TStartEndpointResponse StartEndpointImpl( TCallContextPtr ctx, std::shared_ptr request, From f79eb0ea839ec81896d6d8d2e80cd7ba5c2cb359 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Fri, 17 Jan 2025 07:37:04 +0000 Subject: [PATCH 06/10] issue-2841: rm method from interface and refactor 
test --- .../libs/endpoints/endpoint_manager.cpp | 12 -- .../libs/endpoints/endpoint_manager.h | 5 - .../libs/endpoints/endpoint_manager_ut.cpp | 118 +++++++++++++++--- 3 files changed, 104 insertions(+), 31 deletions(-) diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index 69ae626866d..3376530c0e3 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -551,10 +551,6 @@ class TEndpointManager final }); } - TFuture RestoreSingleEndpointForTesting( - TCallContextPtr ctx, - std::shared_ptr request) override; - TFuture RestoreSingleEndpoint( TCallContextPtr ctx, std::shared_ptr request); @@ -724,14 +720,6 @@ class TEndpointManager final //////////////////////////////////////////////////////////////////////////////// -TFuture -TEndpointManager::RestoreSingleEndpointForTesting( - TCallContextPtr ctx, - std::shared_ptr request) -{ - return RestoreSingleEndpoint(ctx, std::move(request)); -} - TFuture TEndpointManager::RestoreSingleEndpoint( TCallContextPtr ctx, std::shared_ptr request) diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.h b/cloud/blockstore/libs/endpoints/endpoint_manager.h index df0f1e0b40d..1717f72223e 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.h +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.h @@ -37,11 +37,6 @@ struct IEndpointManager #undef ENDPOINT_DECLARE_METHOD virtual NThreading::TFuture RestoreEndpoints() = 0; - - virtual NThreading::TFuture - RestoreSingleEndpointForTesting( - TCallContextPtr ctx, - std::shared_ptr request) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp index 0ef7bd66284..6e2deaa5fcd 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp +++ 
b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp @@ -165,10 +165,78 @@ struct TMockSessionManager final: public ISessionManager //////////////////////////////////////////////////////////////////////////////// +class TSlowDevice final: public NBD::IDevice +{ +public: + explicit TSlowDevice(std::weak_ptr<::NCloud::TExecutor> executor) + : Executor(std::move(executor)) + {} + + TExecutorPtr GetExecutor() + { + if (Executor.expired()) { + return {}; + } + + return Executor.lock(); + } + + NThreading::TFuture StubFunction() + { + auto executor = GetExecutor(); + if (!executor) { + return NThreading::MakeFuture(MakeError(S_OK)); + } + + return executor->Execute( + [weakExec = Executor]() + { + + auto executor = weakExec.lock(); + if (!executor) { + return MakeError(S_OK); + } + auto stubFuture = + executor->Execute([]() {}); + + executor->WaitFor(stubFuture); + return MakeError(S_OK); + }); + } + + NThreading::TFuture Start() override + { + return StubFunction(); + } + + NThreading::TFuture Stop(bool deleteDevice) override + { + Y_UNUSED(deleteDevice); + + return StubFunction(); + } + + NThreading::TFuture Resize(ui64 deviceSizeInBytes) override + { + Y_UNUSED(deviceSizeInBytes); + + return StubFunction(); + } + +private: + std::weak_ptr<::NCloud::TExecutor> Executor; +}; + struct TTestDeviceFactory : public NBD::IDeviceFactory { TVector Devices; + std::weak_ptr<::NCloud::TExecutor> Executor; + + void SetExecutor(std::weak_ptr<::NCloud::TExecutor> executor) + { + Executor = std::move(executor); + } NBD::IDevicePtr Create( const TNetworkAddress& connectAddress, @@ -180,7 +248,7 @@ struct TTestDeviceFactory Y_UNUSED(blockCount); Y_UNUSED(blockSize); Devices.push_back(deviceName); - return NBD::CreateDeviceStub(); + return std::make_shared(Executor); } }; @@ -1942,22 +2010,39 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) Y_UNIT_TEST(ShouldNotCrashRestoreEndpointWhileStartingEndpoint) { + TString nbdDevPrefix = CreateGuidAsString() + "_nbd"; + auto nbdDevice = 
nbdDevPrefix + "0"; + auto nbdDeviceAnother = nbdDevPrefix + "1"; + TFsPath(nbdDevice).Touch(); + TFsPath(nbdDeviceAnother).Touch(); + Y_DEFER { + TFsPath(nbdDevice).DeleteIfExists(); + TFsPath(nbdDeviceAnother).DeleteIfExists(); + }; TTempDir dir; auto socketPath = dir.Path() / "testSocket"; TString diskId = "testDiskId"; - auto ipcType = NProto::IPC_GRPC; + auto ipcType = NProto::IPC_NBD; TBootstrap bootstrap; TMap mountedVolumes; bootstrap.Service = CreateTestService(mountedVolumes); - auto grpcListener = - CreateSocketEndpointListener(bootstrap.Logging, 16, MODE0660); - grpcListener->SetClientStorageFactory(CreateClientStorageFactoryStub()); - bootstrap.EndpointListeners = {{NProto::IPC_GRPC, grpcListener}}; + bootstrap.Options.NbdDevicePrefix = nbdDevPrefix; + + auto deviceFactory = std::make_shared(); + deviceFactory->SetExecutor(bootstrap.Executor); + bootstrap.NbdDeviceFactory = deviceFactory; + + auto listener = std::make_shared(); + bootstrap.EndpointListeners = {{ NProto::IPC_NBD, listener }}; auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); + Y_DEFER + { + bootstrap.Stop(); + }; NProto::TStartEndpointRequest request; SetDefaultHeaders(request); @@ -1965,21 +2050,26 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) request.SetDiskId(diskId); request.SetClientId(TestClientId); request.SetIpcType(ipcType); + request.SetNbdDeviceFile(nbdDevice); + request.SetPersistent(true); + auto [str, error] = SerializeEndpoint(request); + UNIT_ASSERT_C(!HasError(error), error); + + bootstrap.EndpointStorage->AddEndpoint(socketPath.GetPath(), str); + + request.SetNbdDeviceFile(nbdDeviceAnother); socketPath.DeleteIfExists(); UNIT_ASSERT(!socketPath.Exists()); { auto futureStartEndpoint = StartEndpoint(*manager, request); - auto futureRestoreEndpoint = - manager->RestoreSingleEndpointForTesting( - MakeIntrusive(), - std::make_shared(request)); + bootstrap.Executor + ->Execute([manager = manager.get()]() mutable + { manager->RestoreEndpoints(); }) + .Wait(); - 
UNIT_ASSERT( - !HasError(futureStartEndpoint.GetValue(TDuration::Seconds(5)))); - UNIT_ASSERT(!HasError( - futureRestoreEndpoint.GetValue(TDuration::Seconds(5)))); + futureStartEndpoint.Wait(); } } } From 73c9b4c782e5bc7ddf15710b23c67e87894d9fc7 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Fri, 17 Jan 2025 07:47:37 +0000 Subject: [PATCH 07/10] issue:2841 add comment --- .../blockstore/libs/endpoints/endpoint_manager.cpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index 3376530c0e3..c571ee7d1c0 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -775,6 +775,19 @@ NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint( return response; } +// There are two maps Endpoints: one in the TEndpointManager, the other in +// TSessionManager. If we try to call TSessionManager::CreateSession method +// using an endpoint, for which a session has already been created (endpoint has +// already been inserted into TSessionManager::Endpoints), a crash will occur. +// To prevent this, we use the TEndpointManager::Endpoints map and check for the +// existence of an endpoint at the beginning of the StartEndpointImpl method. +// If the endpoint already exists, we call AlterEndpoint. However, insertion +// into TEndpointManager::Endpoints is delayed until the end of +// StartEndpointImpl. In the meantime, multiple asynchronous operations are +// performed, which can lead to race conditions. To mitigate this, we must call +// this method with the AddProcessingSocket/RemoveProcessingSocket +// synchronization primitive or guarantee that this method is not called +// concurrently. 
NProto::TStartEndpointResponse TEndpointManager::StartEndpointImpl( TCallContextPtr ctx, std::shared_ptr request, From 420a80c4d1e76b03d78301af7e6c446cca6b426a Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Sun, 19 Jan 2025 16:18:03 +0000 Subject: [PATCH 08/10] issue-2841: correct test --- .../libs/endpoints/endpoint_manager.cpp | 16 +-- .../libs/endpoints/endpoint_manager_ut.cpp | 99 +++++++++---------- 2 files changed, 49 insertions(+), 66 deletions(-) diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index c571ee7d1c0..d1f2199dfda 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -762,6 +762,9 @@ NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint( return TErrorResponse(E_REJECTED, "endpoint is restoring now"); } + // We can have concurrent StartEndpoint and RestoreEndpoint call when the + // process starts. AddProcessingSocket should protect us from this race and + // delay the StartEndpoint call. auto promise = AddProcessingSocket(*request); if (promise.HasValue()) { return promise.ExtractValue(); @@ -775,19 +778,6 @@ NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint( return response; } -// There are two maps Endpoints: one in the TEndpointManager, the other in -// TSessionManager. If we try to call TSessionManager::CreateSession method -// using an endpoint, for which a session has already been created (endpoint has -// already been inserted into TSessionManager::Endpoints), a crash will occur. -// To prevent this, we use the TEndpointManager::Endpoints map and check for the -// existence of an endpoint at the beginning of the StartEndpointImpl method. -// If the endpoint already exists, we call AlterEndpoint. However, insertion -// into TEndpointManager::Endpoints is delayed until the end of -// StartEndpointImpl. 
In the meantime, multiple asynchronous operations are -// performed, which can lead to race conditions. To mitigate this, we must call -// this method with the AddProcessingSocket/RemoveProcessingSocket -// synchronization primitive or guarantee that this method is not called -// concurrently. NProto::TStartEndpointResponse TEndpointManager::StartEndpointImpl( TCallContextPtr ctx, std::shared_ptr request, diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp index 6e2deaa5fcd..873fe6b2f12 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp @@ -165,78 +165,66 @@ struct TMockSessionManager final: public ISessionManager //////////////////////////////////////////////////////////////////////////////// -class TSlowDevice final: public NBD::IDevice +struct TTestDeviceFactory + : public NBD::IDeviceFactory { -public: - explicit TSlowDevice(std::weak_ptr<::NCloud::TExecutor> executor) - : Executor(std::move(executor)) - {} + TVector Devices; - TExecutorPtr GetExecutor() + NBD::IDevicePtr Create( + const TNetworkAddress& connectAddress, + TString deviceName, + ui64 blockCount, + ui32 blockSize) override { - if (Executor.expired()) { - return {}; - } - - return Executor.lock(); + Y_UNUSED(connectAddress); + Y_UNUSED(blockCount); + Y_UNUSED(blockSize); + Devices.push_back(deviceName); + return NBD::CreateDeviceStub(); } +}; - NThreading::TFuture StubFunction() - { - auto executor = GetExecutor(); - if (!executor) { - return NThreading::MakeFuture(MakeError(S_OK)); - } - - return executor->Execute( - [weakExec = Executor]() - { - - auto executor = weakExec.lock(); - if (!executor) { - return MakeError(S_OK); - } - auto stubFuture = - executor->Execute([]() {}); +//////////////////////////////////////////////////////////////////////////////// - executor->WaitFor(stubFuture); - return MakeError(S_OK); - }); - } +class 
TControlledDevice final: public NBD::IDevice +{ +public: + explicit TControlledDevice( + std::function()> deviceStubFunction) + : DeviceStubFunction(std::move(deviceStubFunction)) + {} NThreading::TFuture Start() override { - return StubFunction(); + return DeviceStubFunction(); } NThreading::TFuture Stop(bool deleteDevice) override { Y_UNUSED(deleteDevice); - return StubFunction(); + return DeviceStubFunction(); } NThreading::TFuture Resize(ui64 deviceSizeInBytes) override { Y_UNUSED(deviceSizeInBytes); - return StubFunction(); + return DeviceStubFunction(); } private: - std::weak_ptr<::NCloud::TExecutor> Executor; + std::function()> DeviceStubFunction; }; -struct TTestDeviceFactory - : public NBD::IDeviceFactory +struct TTestControlledDeviceFactory: public NBD::IDeviceFactory { - TVector Devices; - std::weak_ptr<::NCloud::TExecutor> Executor; + std::function()> DeviceStubFunction; - void SetExecutor(std::weak_ptr<::NCloud::TExecutor> executor) - { - Executor = std::move(executor); - } + explicit TTestControlledDeviceFactory( + std::function()> deviceStubFunction) + : DeviceStubFunction(std::move(deviceStubFunction)) + {} NBD::IDevicePtr Create( const TNetworkAddress& connectAddress, @@ -245,10 +233,10 @@ struct TTestDeviceFactory ui32 blockSize) override { Y_UNUSED(connectAddress); + Y_UNUSED(deviceName); Y_UNUSED(blockCount); Y_UNUSED(blockSize); - Devices.push_back(deviceName); - return std::make_shared(Executor); + return std::make_shared(DeviceStubFunction); } }; @@ -2030,8 +2018,10 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) bootstrap.Options.NbdDevicePrefix = nbdDevPrefix; - auto deviceFactory = std::make_shared(); - deviceFactory->SetExecutor(bootstrap.Executor); + auto promise = NewPromise(); + + auto deviceFactory = std::make_shared( + [&]() { return promise.GetFuture(); }); bootstrap.NbdDeviceFactory = deviceFactory; auto listener = std::make_shared(); @@ -2064,12 +2054,15 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) { auto futureStartEndpoint = 
StartEndpoint(*manager, request); - bootstrap.Executor - ->Execute([manager = manager.get()]() mutable - { manager->RestoreEndpoints(); }) - .Wait(); + auto restoreEndpoints = + bootstrap.Executor->Execute([manager = manager.get()]() mutable + { manager->RestoreEndpoints(); }); - futureStartEndpoint.Wait(); + Sleep(TDuration::MilliSeconds(100)); + promise.SetValue(MakeError(S_OK, {})); + + UNIT_ASSERT( + !HasError(futureStartEndpoint.GetValue(TDuration::Seconds(1)))); } } } From 4507bbf3155c1f684fb7686124ab5b2f33f62e89 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Mon, 20 Jan 2025 04:29:30 +0000 Subject: [PATCH 09/10] issue-2841: rename variable and add wait --- cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp index 873fe6b2f12..0a194b42e50 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp @@ -2053,8 +2053,8 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) UNIT_ASSERT(!socketPath.Exists()); { - auto futureStartEndpoint = StartEndpoint(*manager, request); - auto restoreEndpoints = + auto startEndpointFuture = StartEndpoint(*manager, request); + auto restoreEndpointsFuture = bootstrap.Executor->Execute([manager = manager.get()]() mutable { manager->RestoreEndpoints(); }); @@ -2062,7 +2062,8 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) promise.SetValue(MakeError(S_OK, {})); UNIT_ASSERT( - !HasError(futureStartEndpoint.GetValue(TDuration::Seconds(1)))); + !HasError(startEndpointFuture.GetValue(TDuration::Seconds(1)))); + restoreEndpointsFuture.Wait(TDuration::Seconds(1)); } } } From 0c4360de576e758fcce5669becabce9b75274555 Mon Sep 17 00:00:00 2001 From: Stepanyuk Vladislav Date: Tue, 21 Jan 2025 07:28:33 +0000 Subject: [PATCH 10/10] issue-2841: rewrite test --- 
.../libs/endpoints/endpoint_manager_ut.cpp | 27 ++++++++++++++----- .../core/libs/common/scheduler_test.cpp | 14 ++++++++++ .../storage/core/libs/common/scheduler_test.h | 4 +++ 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp index 0a194b42e50..6700df7361b 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp @@ -2054,16 +2054,29 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) { auto startEndpointFuture = StartEndpoint(*manager, request); - auto restoreEndpointsFuture = - bootstrap.Executor->Execute([manager = manager.get()]() mutable - { manager->RestoreEndpoints(); }); - Sleep(TDuration::MilliSeconds(100)); + auto waitForTaskScheduleFuture = + bootstrap.Scheduler->WaitForTaskSchedule(); + auto restoreEndpointsFuture = bootstrap.Executor->Execute( + [manager = manager.get(), + executor = bootstrap.Executor.get()]() mutable + { + auto future = manager->RestoreEndpoints(); + executor->WaitFor(future); + }); + + // We start RestoreEndpoints with different args and it executes + // concurrently with the StartEndpoint request, which means that + // RestoreSingleEndpoint should be rejected until the StartEndpoint + // request processing ends. On rejection, the restoring client will + // schedule a task to retry the request.
+ waitForTaskScheduleFuture.Wait(); + promise.SetValue(MakeError(S_OK, {})); - UNIT_ASSERT( - !HasError(startEndpointFuture.GetValue(TDuration::Seconds(1)))); - restoreEndpointsFuture.Wait(TDuration::Seconds(1)); + startEndpointFuture.Wait(); + bootstrap.Scheduler->RunAllScheduledTasks(); + restoreEndpointsFuture.Wait(); } } } diff --git a/cloud/storage/core/libs/common/scheduler_test.cpp b/cloud/storage/core/libs/common/scheduler_test.cpp index e32a7273239..e195d4338f7 100644 --- a/cloud/storage/core/libs/common/scheduler_test.cpp +++ b/cloud/storage/core/libs/common/scheduler_test.cpp @@ -14,6 +14,10 @@ void TTestScheduler::Schedule( with_lock (CallbacksLock) { Callbacks.push_back(callback); + if (GotNewCallback) { + GotNewCallback->SetValue(); + } + GotNewCallback = std::nullopt; } } @@ -29,4 +33,14 @@ void TTestScheduler::RunAllScheduledTasks() } } +NThreading::TFuture TTestScheduler::WaitForTaskSchedule() +{ + with_lock (CallbacksLock) { + if (!GotNewCallback) { + GotNewCallback = NThreading::NewPromise(); + } + return GotNewCallback->GetFuture(); + } +} + } // namespace NCloud diff --git a/cloud/storage/core/libs/common/scheduler_test.h b/cloud/storage/core/libs/common/scheduler_test.h index 7493702c9e6..746a20f1950 100644 --- a/cloud/storage/core/libs/common/scheduler_test.h +++ b/cloud/storage/core/libs/common/scheduler_test.h @@ -2,6 +2,7 @@ #include "public.h" +#include "library/cpp/threading/future/core/future.h" #include "scheduler.h" #include @@ -17,6 +18,7 @@ class TTestScheduler final private: TMutex CallbacksLock; TVector Callbacks; + std::optional> GotNewCallback; public: void Start() override {} @@ -28,6 +30,8 @@ class TTestScheduler final TCallback callback) override; void RunAllScheduledTasks(); + + NThreading::TFuture WaitForTaskSchedule(); }; } // namespace NCloud