diff --git a/.github/actions/prepare/action.yaml b/.github/actions/prepare/action.yaml index f9488a6d533..0385a480ba9 100644 --- a/.github/actions/prepare/action.yaml +++ b/.github/actions/prepare/action.yaml @@ -16,7 +16,7 @@ runs: sudo apt-get update sudo apt-get install -y --no-install-recommends git wget gnupg lsb-release curl xz-utils tzdata cmake \ python3-dev python3-pip ninja-build antlr3 m4 libidn11-dev libaio1 libaio-dev make clang-14 lld-14 llvm-14 file \ - distcc strace qemu-kvm qemu-utils dpkg-dev atop pigz pbzip2 xz-utils pixz + distcc strace qemu-kvm qemu-utils dpkg-dev atop pigz pbzip2 xz-utils pixz libnl-3-dev libnl-genl-3-dev sudo apt-get remove -y unattended-upgrades sudo pip3 install conan==1.59 pytest==7.1.3 pytest-timeout pytest-xdist==3.3.1 setproctitle==1.3.2 grpcio grpcio-tools \ PyHamcrest tornado xmltodict pyarrow boto3 moto[server] psutil yandexcloud==0.258.0 PyGithub==2.2.0 pyinstaller==5.13.2 \ diff --git a/.github/scripts/github-runner.sh b/.github/scripts/github-runner.sh index e19db7a0695..0ead1326990 100644 --- a/.github/scripts/github-runner.sh +++ b/.github/scripts/github-runner.sh @@ -55,7 +55,7 @@ sudo apt-get install -y --no-install-recommends \ dpkg-dev docker-ce docker-ce-cli containerd.io \ docker-buildx-plugin docker-compose-plugin jq \ aria2 jq tree tmux atop awscli iftop htop \ - pixz pigz pbzip2 xz-utils + pixz pigz pbzip2 xz-utils libnl-3-dev libnl-genl-3-dev cat << EOF > /tmp/requirements.txt conan==1.59 pytest==7.1.3 diff --git a/cloud/blockstore/libs/endpoint_proxy/server/server.cpp b/cloud/blockstore/libs/endpoint_proxy/server/server.cpp index d132499127d..c5c2c88780b 100644 --- a/cloud/blockstore/libs/endpoint_proxy/server/server.cpp +++ b/cloud/blockstore/libs/endpoint_proxy/server/server.cpp @@ -584,15 +584,24 @@ struct TServer: IEndpointProxyServer if (ep.NbdDevicePath) { if (Config.Netlink) { #ifdef NETLINK - ep.NbdDevice = NBD::CreateNetlinkDevice( - Logging, - *ep.ListenAddress, - request.GetNbdDevice(), - TDuration::Minutes(1), // request timeout - TDuration::Days(1), // connection timeout - true); // reconfigure device if exists + try { + ep.NbdDevice = NBD::CreateNetlinkDevice( + Logging, + *ep.ListenAddress, + request.GetNbdDevice(), + TDuration::Minutes(1), // request timeout + TDuration::Days(1), // connection timeout + true); // reconfigure existing device + + } catch (const std::exception& e) { + STORAGE_ERROR(request.ShortDebugString().Quote() + << " - Unable to create netlink device: " << e.what() + << ", falling back to ioctl"); + } #else - STORAGE_ERROR("built without netlink support, falling back to ioctl"); + STORAGE_ERROR(request.ShortDebugString().Quote() + << " - Built without netlink support" + << ", falling back to ioctl"); #endif } if (ep.NbdDevice == nullptr) { @@ -602,10 +611,17 @@ struct TServer: IEndpointProxyServer request.GetNbdDevice(), TDuration::Days(1)); // request timeout } - ep.NbdDevice->Start(); - STORAGE_INFO(request.ShortDebugString().Quote() - << " - Started NBD device connection"); + auto start = ep.NbdDevice->Start(); + const auto& value = start.GetValue(); + if (HasError(value)) { + STORAGE_ERROR(request.ShortDebugString().Quote() + << " - Unable to start nbd device: " + << value.GetMessage()); + } else { + STORAGE_INFO(request.ShortDebugString().Quote() + << " - Started NBD device connection"); + } } else { STORAGE_WARN(request.ShortDebugString().Quote() << " - NbdDevice missing - no nbd connection with the" diff --git a/cloud/blockstore/libs/nbd/netlink_device.cpp b/cloud/blockstore/libs/nbd/netlink_device.cpp index 60d716470c7..bded54ec156 100644 --- a/cloud/blockstore/libs/nbd/netlink_device.cpp +++ b/cloud/blockstore/libs/nbd/netlink_device.cpp @@ -17,6 +17,8 @@ namespace NCloud::NBlockStore::NBD { namespace { +using namespace NThreading; + //////////////////////////////////////////////////////////////////////////////// constexpr TStringBuf NBD_DEVICE_SUFFIX = "/dev/nbd"; @@ -131,7 +133,8 @@ class TNetlinkMessage void Put(int attribute, T data) { if (nla_put(Message, attribute, sizeof(T), &data) < 0) { - throw TServiceError(E_FAIL) << "unable to put attribute " << attribute; + throw TServiceError(E_FAIL) << "unable to put attribute " + << attribute; } } @@ -155,7 +158,18 @@ class TNetlinkMessage class TNetlinkDevice final : public IDevice + , public std::enable_shared_from_this { +private: + struct THandlerContext + { + std::shared_ptr Device; + + THandlerContext(std::shared_ptr device) + : Device(std::move(device)) + {} + }; + private: const ILoggingServicePtr Logging; const TNetworkAddress ConnectAddress; @@ -168,9 +182,11 @@ class TNetlinkDevice final IClientHandlerPtr Handler; TSocket Socket; ui32 DeviceIndex; - TAtomic ShouldStop = 0; + TPromise StartResult = NewPromise(); + TPromise StopResult = NewPromise(); + public: TNetlinkDevice( ILoggingServicePtr logging, @@ -197,7 +213,8 @@ class TNetlinkDevice final } try { - TMemoryInput stream(DeviceName.data() + pos + NBD_DEVICE_SUFFIX.size()); + TMemoryInput stream( + DeviceName.data() + pos + NBD_DEVICE_SUFFIX.size()); stream >> DeviceIndex; } catch (...) { throw TServiceError(E_ARGUMENT) @@ -207,29 +224,42 @@ class TNetlinkDevice final ~TNetlinkDevice() { - Stop(false); + Stop(false).GetValueSync(); } - NThreading::TFuture Start() override + TFuture Start() override { ConnectSocket(); ConnectDevice(); - return NThreading::MakeFuture(MakeError(S_OK)); + // will be set asynchronously in Connect > HandleStatus > DoConnect + return StartResult.GetFuture(); } - NThreading::TFuture Stop(bool deleteDevice) override + TFuture Stop(bool deleteDevice) override { if (AtomicSwap(&ShouldStop, 1) == 1) { - return NThreading::MakeFuture(MakeError(S_OK)); + return StopResult.GetFuture(); + } + + if (!deleteDevice) { + StopResult.SetValue(MakeError(S_OK)); + return StopResult.GetFuture(); } - if (deleteDevice) { + try { DisconnectDevice(); DisconnectSocket(); + StopResult.SetValue(MakeError(S_OK)); + + } catch (const TServiceError& e) { + StopResult.SetValue(MakeError( + E_FAIL, + TStringBuilder() << "unable to disconnect " << DeviceName + << ": " << e.what())); } - return NThreading::MakeFuture(MakeError(S_OK)); + return StopResult.GetFuture(); } private: @@ -290,7 +320,9 @@ void TNetlinkDevice::DoConnectDevice(bool connected) const auto& info = Handler->GetExportInfo(); message.Put(NBD_ATTR_INDEX, DeviceIndex); message.Put(NBD_ATTR_SIZE_BYTES, static_cast(info.Size)); - message.Put(NBD_ATTR_BLOCK_SIZE_BYTES, static_cast(info.MinBlockSize)); + message.Put( + NBD_ATTR_BLOCK_SIZE_BYTES, + static_cast(info.MinBlockSize)); message.Put(NBD_ATTR_SERVER_FLAGS, static_cast(info.Flags)); if (Timeout) { @@ -298,7 +330,9 @@ void TNetlinkDevice::DoConnectDevice(bool connected) } if (DeadConnectionTimeout) { - message.Put(NBD_ATTR_DEAD_CONN_TIMEOUT, DeadConnectionTimeout.Seconds()); + message.Put( + NBD_ATTR_DEAD_CONN_TIMEOUT, + DeadConnectionTimeout.Seconds()); } { @@ -308,8 +342,13 @@ void TNetlinkDevice::DoConnectDevice(bool connected) } message.Send(socket); + StartResult.SetValue(MakeError(S_OK)); + } catch (const TServiceError& e) { - STORAGE_ERROR("unable to configure " << DeviceName << ": " << e.what()); + StartResult.SetValue(MakeError( + e.GetCode(), + TStringBuilder() + << "unable to configure " << DeviceName << ": " << e.what())); } } @@ -317,14 +356,10 @@ void TNetlinkDevice::DisconnectDevice() { STORAGE_INFO("disconnect " << DeviceName); - try { - TNetlinkSocket socket; - TNetlinkMessage message(socket.GetFamily(), NBD_CMD_DISCONNECT); - message.Put(NBD_ATTR_INDEX, DeviceIndex); - message.Send(socket); - } catch (const TServiceError& e) { - STORAGE_ERROR("unable to disconnect " << DeviceName << ": " << e.what()); - } + TNetlinkSocket socket; + TNetlinkMessage message(socket.GetFamily(), NBD_CMD_DISCONNECT); + message.Put(NBD_ATTR_INDEX, DeviceIndex); + message.Send(socket); } // queries device status and registers callback that will connect @@ -333,32 +368,33 @@ void TNetlinkDevice::ConnectDevice() { try { TNetlinkSocket socket; + auto context = std::make_unique(shared_from_this()); + nl_socket_modify_cb( socket, NL_CB_VALID, NL_CB_CUSTOM, TNetlinkDevice::StatusHandler, - this); - - // TODO: use proper context containing device pointer and a socket - // send_sync waits for the response and invokes callback immediately - // even before returning, so it's technically okay to pass 'this' as - // an argument, but it still looks flimsy + context.release()); // libnl doesn't throw TNetlinkMessage message(socket.GetFamily(), NBD_CMD_STATUS); message.Put(NBD_ATTR_INDEX, DeviceIndex); message.Send(socket); + } catch (const TServiceError& e) { - throw TServiceError(e.GetCode()) - << "unable to configure " << DeviceName << ": " << e.what(); + StartResult.SetValue(MakeError( + e.GetCode(), + TStringBuilder() + << "unable to configure " << DeviceName << ": " << e.what())); } } int TNetlinkDevice::StatusHandler(nl_msg* message, void* argument) { auto* header = static_cast(nlmsg_data(nlmsg_hdr(message))); - auto* conn = static_cast(argument); - auto Log = conn->Log; + auto context = std::unique_ptr( + static_cast(argument)); + nlattr* attr[NBD_ATTR_MAX + 1] = {}; nlattr* deviceItem[NBD_DEVICE_ITEM_MAX + 1] = {}; nlattr* device[NBD_DEVICE_ATTR_MAX + 1] = {}; @@ -377,12 +413,17 @@ int TNetlinkDevice::StatusHandler(nl_msg* message, void* argument) genlmsg_attrlen(header, 0), NULL)) { - STORAGE_ERROR("unable to parse NBD_CMD_STATUS response: " << err); + context->Device->StartResult.SetValue(MakeError( + E_FAIL, + TStringBuilder() + << "unable to parse NBD_CMD_STATUS response: " << err)); return NL_STOP; } if (!attr[NBD_ATTR_DEVICE_LIST]) { - STORAGE_ERROR("did not receive NBD_ATTR_DEVICE_LIST"); + context->Device->StartResult.SetValue(MakeError( + E_FAIL, + "did not receive NBD_ATTR_DEVICE_LIST")); return NL_STOP; } @@ -392,12 +433,17 @@ int TNetlinkDevice::StatusHandler(nl_msg* message, void* argument) attr[NBD_ATTR_DEVICE_LIST], deviceItemPolicy)) { - STORAGE_ERROR("unable to parse NBD_ATTR_DEVICE_LIST: " << err); + context->Device->StartResult.SetValue(MakeError( + E_FAIL, + TStringBuilder() + << "unable to parse NBD_ATTR_DEVICE_LIST: " << err)); return NL_STOP; } if (!deviceItem[NBD_DEVICE_ITEM]) { - STORAGE_ERROR("did not receive NBD_DEVICE_ITEM"); + context->Device->StartResult.SetValue(MakeError( + E_FAIL, + "did not receive NBD_DEVICE_ITEM")); return NL_STOP; } @@ -407,16 +453,21 @@ int TNetlinkDevice::StatusHandler(nl_msg* message, void* argument) deviceItem[NBD_DEVICE_ITEM], devicePolicy)) { - STORAGE_ERROR("unable to parse NBD_DEVICE_ITEM: " << err); + context->Device->StartResult.SetValue(MakeError( + E_FAIL, + TStringBuilder() + << "unable to parse NBD_DEVICE_ITEM: " << err)); return NL_STOP; } if (!device[NBD_DEVICE_CONNECTED]) { - STORAGE_ERROR("did not receive NBD_DEVICE_CONNECTED"); + context->Device->StartResult.SetValue(MakeError( + E_FAIL, + "did not receive NBD_DEVICE_CONNECTED")); return NL_STOP; } - conn->DoConnectDevice(nla_get_u8(device[NBD_DEVICE_CONNECTED])); + context->Device->DoConnectDevice(nla_get_u8(device[NBD_DEVICE_CONNECTED])); return NL_OK; }