Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-2841: add lock on StartEndpointImpl method call #2842

Merged
merged 10 commits into from
Jan 21, 2025
10 changes: 5 additions & 5 deletions cloud/blockstore/libs/daemon/common/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,11 +895,6 @@ void TBootstrapBase::Start()
// order
START_COMMON_COMPONENT(Scheduler);

if (Configs->Options->MemLock) {
LockProcessMemory(Log);
STORAGE_INFO("Process memory locked");
}

auto restoreFuture = EndpointManager->RestoreEndpoints();
if (!Configs->Options->TemporaryServer) {
auto balancerSwitch = VolumeBalancerSwitch;
Expand All @@ -910,6 +905,11 @@ void TBootstrapBase::Start()
}
STORAGE_INFO("Started endpoints restoring");

if (Configs->Options->MemLock) {
komarevtsev-d marked this conversation as resolved.
Show resolved Hide resolved
LockProcessMemory(Log);
STORAGE_INFO("Process memory locked");
}

#undef START_COMMON_COMPONENT
#undef START_KIKIMR_COMPONENT
}
Expand Down
46 changes: 31 additions & 15 deletions cloud/blockstore/libs/endpoints/endpoint_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,21 +724,33 @@ TFuture<NProto::TStartEndpointResponse> TEndpointManager::RestoreSingleEndpoint(
TCallContextPtr ctx,
std::shared_ptr<NProto::TStartEndpointRequest> request)
{
return Executor->Execute([
weakSelf = weak_from_this(),
ctx,
request] () mutable -> NProto::TStartEndpointResponse
{
auto self = weakSelf.lock();
if (!self) {
return TErrorResponse(E_FAIL, "EndpointManager is destroyed");
}
return Executor->Execute(
[weakSelf = weak_from_this(),
ctx,
request]() mutable -> NProto::TStartEndpointResponse
{
auto self = weakSelf.lock();
if (!self) {
return TErrorResponse(E_FAIL, "EndpointManager is destroyed");
}

return self->StartEndpointImpl(
std::move(ctx),
std::move(request),
true);
});
auto promise =
self->AddProcessingSocket<TStartEndpointMethod>(*request);
if (promise.HasValue()) {
return promise.ExtractValue();
}
komarevtsev-d marked this conversation as resolved.
Show resolved Hide resolved

auto socketPath = request->GetUnixSocketPath();

auto response = self->StartEndpointImpl(
std::move(ctx),
std::move(request),
true);
promise.SetValue(response);

self->RemoveProcessingSocket(socketPath);
return response;
});
}

NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint(
Expand All @@ -750,12 +762,16 @@ NProto::TStartEndpointResponse TEndpointManager::DoStartEndpoint(
return TErrorResponse(E_REJECTED, "endpoint is restoring now");
}

// We can have concurrent StartEndpoint and RestoreEndpoint calls when the
// process starts. AddProcessingSocket should protect us from this race and
// delay the StartEndpoint call.
auto promise = AddProcessingSocket<TStartEndpointMethod>(*request);
if (promise.HasValue()) {
return promise.ExtractValue();
}

auto response = StartEndpointImpl(std::move(ctx), std::move(request), false);
auto response =
StartEndpointImpl(std::move(ctx), std::move(request), false);
promise.SetValue(response);

RemoveProcessingSocket(socketPath);
Expand Down
126 changes: 126 additions & 0 deletions cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,62 @@ struct TTestDeviceFactory

////////////////////////////////////////////////////////////////////////////////

class TControlledDevice final: public NBD::IDevice
{
public:
explicit TControlledDevice(
std::function<NThreading::TFuture<NProto::TError>()> deviceStubFunction)
: DeviceStubFunction(std::move(deviceStubFunction))
{}

NThreading::TFuture<NProto::TError> Start() override
{
return DeviceStubFunction();
}

NThreading::TFuture<NProto::TError> Stop(bool deleteDevice) override
{
Y_UNUSED(deleteDevice);

return DeviceStubFunction();
}

NThreading::TFuture<NProto::TError> Resize(ui64 deviceSizeInBytes) override
{
Y_UNUSED(deviceSizeInBytes);

return DeviceStubFunction();
}

private:
std::function<NThreading::TFuture<NProto::TError>()> DeviceStubFunction;
};

struct TTestControlledDeviceFactory: public NBD::IDeviceFactory
{
std::function<NThreading::TFuture<NProto::TError>()> DeviceStubFunction;

explicit TTestControlledDeviceFactory(
std::function<NThreading::TFuture<NProto::TError>()> deviceStubFunction)
: DeviceStubFunction(std::move(deviceStubFunction))
{}

NBD::IDevicePtr Create(
const TNetworkAddress& connectAddress,
TString deviceName,
ui64 blockCount,
ui32 blockSize) override
{
Y_UNUSED(connectAddress);
Y_UNUSED(deviceName);
Y_UNUSED(blockCount);
Y_UNUSED(blockSize);
return std::make_shared<TControlledDevice>(DeviceStubFunction);
}
};

////////////////////////////////////////////////////////////////////////////////

struct TTestEndpoint
{
NProto::TStartEndpointRequest Request;
Expand Down Expand Up @@ -1939,6 +1995,76 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest)
UNIT_ASSERT(!HasError(response));
}
}

// Issues StartEndpoint and RestoreEndpoints for the same unix socket path
// while every NBD device operation is blocked on a test-controlled promise,
// then releases the devices and checks that StartEndpoint completes without
// an error.
Y_UNIT_TEST(ShouldNotCrashRestoreEndpointWhileStartingEndpoint)
{
    TString nbdDevPrefix = CreateGuidAsString() + "_nbd";
    auto nbdDevice = nbdDevPrefix + "0";
    auto nbdDeviceAnother = nbdDevPrefix + "1";
    TFsPath(nbdDevice).Touch();
    TFsPath(nbdDeviceAnother).Touch();
    Y_DEFER {
        TFsPath(nbdDevice).DeleteIfExists();
        TFsPath(nbdDeviceAnother).DeleteIfExists();
    };
    TTempDir dir;
    auto socketPath = dir.Path() / "testSocket";
    TString diskId = "testDiskId";
    auto ipcType = NProto::IPC_NBD;

    TBootstrap bootstrap;
    TMap<TString, NProto::TMountVolumeRequest> mountedVolumes;
    bootstrap.Service = CreateTestService(mountedVolumes);

    bootstrap.Options.NbdDevicePrefix = nbdDevPrefix;

    // Every device operation returns this promise's future, so device calls
    // stay pending until the test sets the value below.
    auto promise = NewPromise<NProto::TError>();

    auto deviceFactory = std::make_shared<TTestControlledDeviceFactory>(
        [&]() { return promise.GetFuture(); });
    bootstrap.NbdDeviceFactory = deviceFactory;

    auto listener = std::make_shared<TTestEndpointListener>();
    bootstrap.EndpointListeners = {{ NProto::IPC_NBD, listener }};

    auto manager = CreateEndpointManager(bootstrap);
    bootstrap.Start();
    Y_DEFER
    {
        bootstrap.Stop();
    };

    NProto::TStartEndpointRequest request;
    SetDefaultHeaders(request);
    request.SetUnixSocketPath(socketPath.GetPath());
    request.SetDiskId(diskId);
    request.SetClientId(TestClientId);
    request.SetIpcType(ipcType);
    request.SetNbdDeviceFile(nbdDevice);
    request.SetPersistent(true);

    auto [str, error] = SerializeEndpoint(request);
    UNIT_ASSERT_C(!HasError(error), error);

    // Store the serialized endpoint under the socket path; presumably this is
    // what RestoreEndpoints picks up later.
    bootstrap.EndpointStorage->AddEndpoint(socketPath.GetPath(), str);

    // The explicit StartEndpoint request uses the same socket path as the
    // stored endpoint but a different NBD device file.
    request.SetNbdDeviceFile(nbdDeviceAnother);
    socketPath.DeleteIfExists();
    UNIT_ASSERT(!socketPath.Exists());

    {
        auto futureStartEndpoint = StartEndpoint(*manager, request);
        auto restoreEndpoints =
            bootstrap.Executor->Execute([manager = manager.get()]() mutable
                                        { manager->RestoreEndpoints(); });

        // NOTE(review): timing-based; presumably gives both calls time to
        // reach the blocked device before it is released.
        Sleep(TDuration::MilliSeconds(100));
        promise.SetValue(MakeError(S_OK, {}));

        UNIT_ASSERT(
            !HasError(futureStartEndpoint.GetValue(TDuration::Seconds(1))));
    }
}
}

} // namespace NCloud::NBlockStore::NServer
Loading