Skip to content

Commit

Permalink
fix client wr leak; issue-2690 (#2706)
Browse files Browse the repository at this point in the history
  • Loading branch information
tpashkin authored Dec 19, 2024
1 parent c67f1bf commit 6d14024
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 26 deletions.
36 changes: 30 additions & 6 deletions cloud/blockstore/libs/rdma/impl/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,7 @@ class TClientEndpoint final
bool IsWorkRequestValid(const TWorkRequestId& id) const;
void HandleFlush(const TWorkRequestId& id) noexcept;
void SendRequest(TRequestPtr req, TSendWr* send);
void SendRequestCompleted(
TSendWr* send, ibv_wc_status status) noexcept;
void SendRequestCompleted(TSendWr* send, ibv_wc_status status) noexcept;
void RecvResponse(TRecvWr* recv);
void RecvResponseCompleted(TRecvWr* recv, ibv_wc_status status);
void AbortRequest(TRequestPtr req, ui32 err, const TString& msg) noexcept;
Expand Down Expand Up @@ -952,7 +951,21 @@ void TClientEndpoint::SendRequest(TRequestPtr req, TSendWr* send)
requestMsg->Out = req->OutBuffer;

RDMA_TRACE("SEND " << TWorkRequestId(send->wr.wr_id));
Verbs->PostSend(Connection->qp, &send->wr);

try {
Verbs->PostSend(Connection->qp, &send->wr);
} catch (const TServiceError& e) {
RDMA_ERROR(
"SEND " << TWorkRequestId(send->wr.wr_id) << ": " << e.what());
SendQueue.Push(send);
ReportRdmaError();
Disconnect();

Counters->RequestEnqueued();
QueuedRequests.Enqueue(std::move(req));

return;
}

LWTRACK(
SendRequestStarted,
Expand Down Expand Up @@ -1013,7 +1026,17 @@ void TClientEndpoint::RecvResponse(TRecvWr* recv)
Zero(*responseMsg);

RDMA_TRACE("RECV " << TWorkRequestId(recv->wr.wr_id));
Verbs->PostRecv(Connection->qp, &recv->wr);

try {
Verbs->PostRecv(Connection->qp, &recv->wr);
} catch (const TServiceError& e) {
RDMA_ERROR(
"RECV " << TWorkRequestId(recv->wr.wr_id) << ": " << e.what());
RecvQueue.Push(recv);
ReportRdmaError();
Disconnect();
return;
}

Counters->RecvResponseStarted();
}
Expand All @@ -1027,7 +1050,7 @@ void TClientEndpoint::RecvResponseCompleted(
<< ": " << NVerbs::GetStatusString(wc_status));

Counters->RecvResponseError();
RecvResponse(recv);
RecvQueue.Push(recv);
ReportRdmaError();
Disconnect();
return;
Expand All @@ -1037,7 +1060,8 @@ void TClientEndpoint::RecvResponseCompleted(
int version = ParseMessageHeader(msg);
if (version != RDMA_PROTO_VERSION) {
RDMA_ERROR("RECV " << TWorkRequestId(recv->wr.wr_id)
<< ": unrecognized protocol version")
<< ": incompatible protocol version "
<< version << ", expected " << int(RDMA_PROTO_VERSION));

Counters->RecvResponseError();
RecvResponse(recv);
Expand Down
57 changes: 41 additions & 16 deletions cloud/blockstore/libs/rdma/impl/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ Y_UNIT_TEST_SUITE(TRdmaClientTest)
// wait for receive queue to initialize 2nd time after reconnect
ui64 recv;
do {
recv = AtomicGet(testContext->PostRecv);
recv = AtomicGet(testContext->PostRecvCounter);
} while (recv != 2 * clientConfig->QueueSize);

struct TResponse
Expand Down Expand Up @@ -387,7 +387,7 @@ Y_UNIT_TEST_SUITE(TRdmaClientTest)
UNIT_ASSERT_VALUES_EQUAL(0, response.Status);
}

Y_UNIT_TEST(ShouldIgnoreMalformedCompletions)
Y_UNIT_TEST(ShouldHandleErrors)
{
auto context = MakeIntrusive<NVerbs::TTestContext>();
context->AllowConnect = true;
Expand Down Expand Up @@ -421,42 +421,67 @@ Y_UNIT_TEST_SUITE(TRdmaClientTest)

auto unexpected = counters->GetCounter("UnexpectedCompletions");
auto active = counters->GetCounter("ActiveRecv");
auto errors = counters->GetCounter("RecvErrors");
auto unexpectedOld = unexpected->Val();
auto activeOld = active->Val();
auto errorsOld = errors->Val();

ibv_recv_wr wr;
ibv_recv_wr* wr = context->RecvEvents.back();
auto completion = TVector<ibv_wc>();

with_lock(context->CompletionLock) {
// emulate IBV_QPS_ERR
context->PostRecv = [](auto* qp, auto* wr) {
Y_UNUSED(qp);
Y_UNUSED(wr);
throw TServiceError(ENODEV) << "ibv_post_recv error";
};
// good id, good opcode, error status
completion.push_back({
.wr_id = wr->wr_id,
.status = IBV_WC_RETRY_EXC_ERR,
.opcode = IBV_WC_RECV,
});
// good id, good opcode, success status, bad message
completion.push_back({
.wr_id = wr->wr_id,
.opcode = IBV_WC_RECV,
});
// bad id, good opcode
completion.push_back({
.wr_id = Max<ui64>(),
.opcode = IBV_WC_RECV,
});
// good id, bad opcode
completion.push_back({
.wr_id = context->RecvEvents.back()->wr_id,
.wr_id = wr->wr_id,
.opcode = IBV_WC_RECV_RDMA_WITH_IMM,
});
context->HandleCompletionEvent = [&](ibv_wc* wc) {
static int i = 0;
*wc = completion[i++];
};
// actual wr doesn't matter, only its address
context->ProcessedRecvEvents.push_back(&wr);
context->ProcessedRecvEvents.push_back(&wr);
// HandleCompletionEvent will override completions, but we still
// need to pass a valid request pointer here
for (size_t i = 0; i < completion.size(); i++) {
context->ProcessedRecvEvents.push_back(wr);
}
context->CompletionHandle.Set();
}

auto start = GetCycleCount();
while (unexpected->Val() != unexpectedOld + 2) {
auto now = GetCycleCount();
if (CyclesToDurationSafe(now - start) > TDuration::Seconds(5)) {
UNIT_FAIL("should increment counter");
auto wait = [](auto& counter, auto value) {
auto start = GetCycleCount();
while (counter->Val() != value) {
auto now = GetCycleCount();
if (CyclesToDurationSafe(now - start) > TDuration::Seconds(5)) {
UNIT_FAIL("timed out waiting for counter");
}
SpinLockPause();
}
SpinLockPause();
}
UNIT_ASSERT_EQUAL(active->Val(), activeOld);
};

wait(unexpected, unexpectedOld + 2);
wait(errors, errorsOld + 2);
wait(active, 8);
}
};

Expand Down
6 changes: 4 additions & 2 deletions cloud/blockstore/libs/rdma/impl/test_verbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,14 @@ struct TTestVerbs

// Test implementation of posting a receive work request.
//
// If the test installed a TestContext->PostRecv hook, it is invoked first
// with the same arguments; a hook may throw (e.g. to emulate a failing
// ibv_post_recv), in which case the request is neither recorded nor counted.
// Otherwise the request pointer is appended to TestContext->RecvEvents while
// holding CompletionLock, and PostRecvCounter is incremented so tests can
// observe how many receives were posted.
void PostRecv(ibv_qp* qp, ibv_recv_wr* wr) override
{
    // Optional per-test hook; an empty std::function means "no override".
    if (TestContext->PostRecv) {
        TestContext->PostRecv(qp, wr);
    }

    auto g = Guard(TestContext->CompletionLock);
    TestContext->RecvEvents.push_back(wr);

    // PostRecv is the hook above; the counter lives in PostRecvCounter.
    AtomicIncrement(TestContext->PostRecvCounter);
}

// connection manager
Expand Down
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/rdma/impl/test_verbs.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ struct TTestContext: TAtomicRefCount<TTestContext>
TDeque<ibv_recv_wr*> RecvEvents;
TDeque<ibv_recv_wr*> ProcessedRecvEvents;
TSpinLock CompletionLock;
TAtomic PostRecv = 0;
TAtomic PostRecvCounter = 0;

std::function<void(ibv_qp* qp, ibv_recv_wr* wr)> PostRecv;
std::function<void(rdma_cm_id* id, int backlog)> Listen;
std::function<void(ibv_wc* wc)> HandleCompletionEvent;
};
Expand Down
2 changes: 1 addition & 1 deletion cloud/blockstore/libs/rdma/impl/verbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ TString PrintConnectionParams(const rdma_conn_param* conn)
TString PrintCompletion(ibv_wc* wc)
{
return TStringBuilder()
<< "[id=" << TWorkRequestId(wc->wr_id)
<< "[wr_id=" << TWorkRequestId(wc->wr_id)
<< " opcode=" << GetOpcodeName(wc->opcode)
<< " status=" << GetStatusString(wc->status)
<< "]";
Expand Down

0 comments on commit 6d14024

Please sign in to comment.