Skip to content

Commit

Permalink
properly set future returned by Start/Stop methods; issue-1164 (#1443)
Browse files Browse the repository at this point in the history
properly set future returned by Start/Stop methods; issue-1164
  • Loading branch information
tpashkin authored Jun 20, 2024
1 parent 4edd26c commit 7dbf27a
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .github/actions/prepare/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ runs:
sudo apt-get update
sudo apt-get install -y --no-install-recommends git wget gnupg lsb-release curl xz-utils tzdata cmake \
python3-dev python3-pip ninja-build antlr3 m4 libidn11-dev libaio1 libaio-dev make clang-14 lld-14 llvm-14 file \
distcc strace qemu-kvm qemu-utils dpkg-dev atop pigz pbzip2 xz-utils pixz
distcc strace qemu-kvm qemu-utils dpkg-dev atop pigz pbzip2 xz-utils pixz libnl-3-dev libnl-genl-3-dev
sudo apt-get remove -y unattended-upgrades
sudo pip3 install conan==1.59 pytest==7.1.3 pytest-timeout pytest-xdist==3.3.1 setproctitle==1.3.2 grpcio grpcio-tools \
PyHamcrest tornado xmltodict pyarrow boto3 moto[server] psutil yandexcloud==0.258.0 PyGithub==2.2.0 pyinstaller==5.13.2 \
Expand Down
2 changes: 1 addition & 1 deletion .github/scripts/github-runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ sudo apt-get install -y --no-install-recommends \
dpkg-dev docker-ce docker-ce-cli containerd.io \
docker-buildx-plugin docker-compose-plugin jq \
aria2 jq tree tmux atop awscli iftop htop \
pixz pigz pbzip2 xz-utils
pixz pigz pbzip2 xz-utils libnl-3-dev libnl-genl-3-dev
cat << EOF > /tmp/requirements.txt
conan==1.59
pytest==7.1.3
Expand Down
38 changes: 27 additions & 11 deletions cloud/blockstore/libs/endpoint_proxy/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,15 +584,24 @@ struct TServer: IEndpointProxyServer
if (ep.NbdDevicePath) {
if (Config.Netlink) {
#ifdef NETLINK
ep.NbdDevice = NBD::CreateNetlinkDevice(
Logging,
*ep.ListenAddress,
request.GetNbdDevice(),
TDuration::Minutes(1), // request timeout
TDuration::Days(1), // connection timeout
true); // reconfigure device if exists
try {
ep.NbdDevice = NBD::CreateNetlinkDevice(
Logging,
*ep.ListenAddress,
request.GetNbdDevice(),
TDuration::Minutes(1), // request timeout
TDuration::Days(1), // connection timeout
true); // reconfigure existing device

} catch (const std::exception& e) {
STORAGE_ERROR(request.ShortDebugString().Quote()
<< " - Unable to create netlink device: " << e.what()
<< ", falling back to ioctl");
}
#else
STORAGE_ERROR("built without netlink support, falling back to ioctl");
STORAGE_ERROR(request.ShortDebugString().Quote()
<< " - Built without netlink support"
<< ", falling back to ioctl");
#endif
}
if (ep.NbdDevice == nullptr) {
Expand All @@ -602,10 +611,17 @@ struct TServer: IEndpointProxyServer
request.GetNbdDevice(),
TDuration::Days(1)); // request timeout
}
ep.NbdDevice->Start();

STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Started NBD device connection");
auto start = ep.NbdDevice->Start();
const auto& value = start.GetValue();
if (HasError(value)) {
STORAGE_ERROR(request.ShortDebugString().Quote()
<< " - Unable to start nbd device: "
<< value.GetMessage());
} else {
STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Started NBD device connection");
}
} else {
STORAGE_WARN(request.ShortDebugString().Quote()
<< " - NbdDevice missing - no nbd connection with the"
Expand Down
127 changes: 89 additions & 38 deletions cloud/blockstore/libs/nbd/netlink_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace NCloud::NBlockStore::NBD {

namespace {

using namespace NThreading;

////////////////////////////////////////////////////////////////////////////////

constexpr TStringBuf NBD_DEVICE_SUFFIX = "/dev/nbd";
Expand Down Expand Up @@ -131,7 +133,8 @@ class TNetlinkMessage
void Put(int attribute, T data)
{
if (nla_put(Message, attribute, sizeof(T), &data) < 0) {
throw TServiceError(E_FAIL) << "unable to put attribute " << attribute;
throw TServiceError(E_FAIL) << "unable to put attribute "
<< attribute;
}
}

Expand All @@ -155,7 +158,18 @@ class TNetlinkMessage

class TNetlinkDevice final
: public IDevice
, public std::enable_shared_from_this<TNetlinkDevice>
{
private:
struct THandlerContext
{
std::shared_ptr<TNetlinkDevice> Device;

THandlerContext(std::shared_ptr<TNetlinkDevice> device)
: Device(std::move(device))
{}
};

private:
const ILoggingServicePtr Logging;
const TNetworkAddress ConnectAddress;
Expand All @@ -168,9 +182,11 @@ class TNetlinkDevice final
IClientHandlerPtr Handler;
TSocket Socket;
ui32 DeviceIndex;

TAtomic ShouldStop = 0;

TPromise<NProto::TError> StartResult = NewPromise<NProto::TError>();
TPromise<NProto::TError> StopResult = NewPromise<NProto::TError>();

public:
TNetlinkDevice(
ILoggingServicePtr logging,
Expand All @@ -197,7 +213,8 @@ class TNetlinkDevice final
}

try {
TMemoryInput stream(DeviceName.data() + pos + NBD_DEVICE_SUFFIX.size());
TMemoryInput stream(
DeviceName.data() + pos + NBD_DEVICE_SUFFIX.size());
stream >> DeviceIndex;
} catch (...) {
throw TServiceError(E_ARGUMENT)
Expand All @@ -207,29 +224,42 @@ class TNetlinkDevice final

~TNetlinkDevice()
{
Stop(false);
Stop(false).GetValueSync();
}

NThreading::TFuture<NProto::TError> Start() override
TFuture<NProto::TError> Start() override
{
ConnectSocket();
ConnectDevice();

return NThreading::MakeFuture(MakeError(S_OK));
// will be set asynchronously in Connect > HandleStatus > DoConnect
return StartResult.GetFuture();
}

NThreading::TFuture<NProto::TError> Stop(bool deleteDevice) override
TFuture<NProto::TError> Stop(bool deleteDevice) override
{
if (AtomicSwap(&ShouldStop, 1) == 1) {
return NThreading::MakeFuture(MakeError(S_OK));
return StopResult.GetFuture();
}

if (!deleteDevice) {
StopResult.SetValue(MakeError(S_OK));
return StopResult.GetFuture();
}

if (deleteDevice) {
try {
DisconnectDevice();
DisconnectSocket();
StopResult.SetValue(MakeError(S_OK));

} catch (const TServiceError& e) {
StopResult.SetValue(MakeError(
E_FAIL,
TStringBuilder() << "unable to disconnect " << DeviceName
<< ": " << e.what()));
}

return NThreading::MakeFuture(MakeError(S_OK));
return StopResult.GetFuture();
}

private:
Expand Down Expand Up @@ -290,15 +320,19 @@ void TNetlinkDevice::DoConnectDevice(bool connected)
const auto& info = Handler->GetExportInfo();
message.Put(NBD_ATTR_INDEX, DeviceIndex);
message.Put(NBD_ATTR_SIZE_BYTES, static_cast<ui64>(info.Size));
message.Put(NBD_ATTR_BLOCK_SIZE_BYTES, static_cast<ui64>(info.MinBlockSize));
message.Put(
NBD_ATTR_BLOCK_SIZE_BYTES,
static_cast<ui64>(info.MinBlockSize));
message.Put(NBD_ATTR_SERVER_FLAGS, static_cast<ui64>(info.Flags));

if (Timeout) {
message.Put(NBD_ATTR_TIMEOUT, Timeout.Seconds());
}

if (DeadConnectionTimeout) {
message.Put(NBD_ATTR_DEAD_CONN_TIMEOUT, DeadConnectionTimeout.Seconds());
message.Put(
NBD_ATTR_DEAD_CONN_TIMEOUT,
DeadConnectionTimeout.Seconds());
}

{
Expand All @@ -308,23 +342,24 @@ void TNetlinkDevice::DoConnectDevice(bool connected)
}

message.Send(socket);
StartResult.SetValue(MakeError(S_OK));

} catch (const TServiceError& e) {
STORAGE_ERROR("unable to configure " << DeviceName << ": " << e.what());
StartResult.SetValue(MakeError(
e.GetCode(),
TStringBuilder()
<< "unable to configure " << DeviceName << ": " << e.what()));
}
}

void TNetlinkDevice::DisconnectDevice()
{
STORAGE_INFO("disconnect " << DeviceName);

try {
TNetlinkSocket socket;
TNetlinkMessage message(socket.GetFamily(), NBD_CMD_DISCONNECT);
message.Put(NBD_ATTR_INDEX, DeviceIndex);
message.Send(socket);
} catch (const TServiceError& e) {
STORAGE_ERROR("unable to disconnect " << DeviceName << ": " << e.what());
}
TNetlinkSocket socket;
TNetlinkMessage message(socket.GetFamily(), NBD_CMD_DISCONNECT);
message.Put(NBD_ATTR_INDEX, DeviceIndex);
message.Send(socket);
}

// queries device status and registers callback that will connect
Expand All @@ -333,32 +368,33 @@ void TNetlinkDevice::ConnectDevice()
{
try {
TNetlinkSocket socket;
auto context = std::make_unique<THandlerContext>(shared_from_this());

nl_socket_modify_cb(
socket,
NL_CB_VALID,
NL_CB_CUSTOM,
TNetlinkDevice::StatusHandler,
this);

// TODO: use proper context containing device pointer and a socket
// send_sync waits for the response and invokes callback immediately
// even before returning, so it's technically okay to pass 'this' as
// an argument, but it still looks flimsy
context.release()); // libnl doesn't throw

TNetlinkMessage message(socket.GetFamily(), NBD_CMD_STATUS);
message.Put(NBD_ATTR_INDEX, DeviceIndex);
message.Send(socket);

} catch (const TServiceError& e) {
throw TServiceError(e.GetCode())
<< "unable to configure " << DeviceName << ": " << e.what();
StartResult.SetValue(MakeError(
e.GetCode(),
TStringBuilder()
<< "unable to configure " << DeviceName << ": " << e.what()));
}
}

int TNetlinkDevice::StatusHandler(nl_msg* message, void* argument)
{
auto* header = static_cast<genlmsghdr*>(nlmsg_data(nlmsg_hdr(message)));
auto* conn = static_cast<TNetlinkDevice*>(argument);
auto Log = conn->Log;
auto context = std::unique_ptr<THandlerContext>(
static_cast<THandlerContext*>(argument));

nlattr* attr[NBD_ATTR_MAX + 1] = {};
nlattr* deviceItem[NBD_DEVICE_ITEM_MAX + 1] = {};
nlattr* device[NBD_DEVICE_ATTR_MAX + 1] = {};
Expand All @@ -377,12 +413,17 @@ int TNetlinkDevice::StatusHandler(nl_msg* message, void* argument)
genlmsg_attrlen(header, 0),
NULL))
{
STORAGE_ERROR("unable to parse NBD_CMD_STATUS response: " << err);
context->Device->StartResult.SetValue(MakeError(
E_FAIL,
TStringBuilder()
<< "unable to parse NBD_CMD_STATUS response: " << err));
return NL_STOP;
}

if (!attr[NBD_ATTR_DEVICE_LIST]) {
STORAGE_ERROR("did not receive NBD_ATTR_DEVICE_LIST");
context->Device->StartResult.SetValue(MakeError(
E_FAIL,
"did not receive NBD_ATTR_DEVICE_LIST"));
return NL_STOP;
}

Expand All @@ -392,12 +433,17 @@ int TNetlinkDevice::StatusHandler(nl_msg* message, void* argument)
attr[NBD_ATTR_DEVICE_LIST],
deviceItemPolicy))
{
STORAGE_ERROR("unable to parse NBD_ATTR_DEVICE_LIST: " << err);
context->Device->StartResult.SetValue(MakeError(
E_FAIL,
TStringBuilder()
<< "unable to parse NBD_ATTR_DEVICE_LIST: " << err));
return NL_STOP;
}

if (!deviceItem[NBD_DEVICE_ITEM]) {
STORAGE_ERROR("did not receive NBD_DEVICE_ITEM");
context->Device->StartResult.SetValue(MakeError(
E_FAIL,
"did not receive NBD_DEVICE_ITEM"));
return NL_STOP;
}

Expand All @@ -407,16 +453,21 @@ int TNetlinkDevice::StatusHandler(nl_msg* message, void* argument)
deviceItem[NBD_DEVICE_ITEM],
devicePolicy))
{
STORAGE_ERROR("unable to parse NBD_DEVICE_ITEM: " << err);
context->Device->StartResult.SetValue(MakeError(
E_FAIL,
TStringBuilder()
<< "unable to parse NBD_DEVICE_ITEM: " << err));
return NL_STOP;
}

if (!device[NBD_DEVICE_CONNECTED]) {
STORAGE_ERROR("did not receive NBD_DEVICE_CONNECTED");
context->Device->StartResult.SetValue(MakeError(
E_FAIL,
"did not receive NBD_DEVICE_CONNECTED"));
return NL_STOP;
}

conn->DoConnectDevice(nla_get_u8(device[NBD_DEVICE_CONNECTED]));
context->Device->DoConnectDevice(nla_get_u8(device[NBD_DEVICE_CONNECTED]));

return NL_OK;
}
Expand Down

0 comments on commit 7dbf27a

Please sign in to comment.