Skip to content

Commit

Permalink
Add timeout to cancelInflightRequest()
Browse files Browse the repository at this point in the history
  • Loading branch information
pentschev committed Nov 13, 2023
1 parent 53486f9 commit 5bda348
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 31 deletions.
20 changes: 16 additions & 4 deletions cpp/include/ucxx/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,21 @@ class Endpoint : public Component {
* This is usually executed by `close()`, when pending requests will no longer be able
* to complete.
*
* If the parent worker is running a progress thread, a maximum timeout may be specified
* for which the cancel operation will wait. This can be particularly important for cases
* where the progress thread might be attempting to acquire a resource (e.g., the Python
* GIL) while the current thread owns that resource. In particular for Python,
* `~Endpoint()` ends up calling this method at a point where the GIL cannot be released,
* namely when the garbage collector runs and destroys the object.
*
* @param[in] period maximum period, in nanoseconds, to wait for each generic pre/post
* progress thread operation to complete.
* @param[in] maxAttempts maximum number of attempts to cancel inflight requests, only
* applicable if worker is running a progress thread and `period > 0`.
*
* @returns Number of requests that were canceled.
*/
size_t cancelInflightRequests();
size_t cancelInflightRequests(uint64_t period = 0, uint64_t maxAttempts = 1);

/**
* @brief Register a user-defined callback to call when endpoint closes.
Expand Down Expand Up @@ -516,10 +528,10 @@ class Endpoint : public Component {
* collector runs and destroys the object.
*
* @param[in] period maximum period to wait for a generic pre/post progress thread
operation will wait for.
* operation will wait for.
* @param[in] maxAttempts maximum number of attempts to close endpoint, only applicable
if worker is running a progress thread and `period > 0`.
* if worker is running a progress thread and `period > 0`.
*
*/
void close(uint64_t period = 0, uint64_t maxAttempts = 1);
};
Expand Down
14 changes: 13 additions & 1 deletion cpp/include/ucxx/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,21 @@ class Worker : public Component {
* Cancel inflight requests, returning the total number of requests that were canceled.
* This is usually executed during the progress loop.
*
* If this worker is running a progress thread, a maximum timeout may be specified
* for which the cancel operation will wait. This can be particularly important for cases
* where the progress thread might be attempting to acquire a resource (e.g., the Python
* GIL) while the current thread owns that resource. In particular for Python,
* `~Worker()` calls this method at a point where the GIL cannot be released, namely
* when the garbage collector runs and destroys the object.
*
* @param[in] period maximum period, in nanoseconds, to wait for each generic pre/post
* progress thread operation to complete.
* @param[in] maxAttempts maximum number of attempts to cancel inflight requests, only
* applicable if worker is running a progress thread and `period > 0`.
*
* @returns Number of requests that were canceled.
*/
size_t cancelInflightRequests();
size_t cancelInflightRequests(uint64_t period = 0, uint64_t maxAttempts = 1);

/**
* @brief Schedule cancelation of inflight requests.
Expand Down
34 changes: 22 additions & 12 deletions cpp/src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@ std::shared_ptr<Endpoint> createEndpointFromWorkerAddress(std::shared_ptr<Worker

Endpoint::~Endpoint()
{
close(10000000);
close(10000000000 /* 10s */);
ucxx_trace("Endpoint destroyed: %p, UCP handle: %p", this, _originalHandle);
}

void Endpoint::close(uint64_t period, uint64_t maxAttempts)
{
if (_handle == nullptr) return;

size_t canceled = cancelInflightRequests();
size_t canceled = cancelInflightRequests(3000000000 /* 3s */, 3);
ucxx_debug("Endpoint %p canceled %lu requests", _handle, canceled);

// Close the endpoint
Expand Down Expand Up @@ -267,7 +267,7 @@ void Endpoint::removeInflightRequest(const Request* const request)
_inflightRequests->remove(request);
}

size_t Endpoint::cancelInflightRequests()
size_t Endpoint::cancelInflightRequests(uint64_t period, uint64_t maxAttempts)
{
auto worker = ::ucxx::getWorker(this->_parent);
size_t canceled = 0;
Expand All @@ -276,15 +276,25 @@ size_t Endpoint::cancelInflightRequests()
canceled = _inflightRequests->cancelAll();
worker->progress();
} else if (worker->isProgressThreadRunning()) {
utils::CallbackNotifier callbackNotifierPre{};
worker->registerGenericPre([this, &callbackNotifierPre, &canceled]() {
canceled = _inflightRequests->cancelAll();
callbackNotifierPre.set();
});
callbackNotifierPre.wait();
utils::CallbackNotifier callbackNotifierPost{};
worker->registerGenericPost([&callbackNotifierPost]() { callbackNotifierPost.set(); });
callbackNotifierPost.wait();
bool cancelSuccess = false;
for (uint64_t i = 0; i < maxAttempts; ++i) {
utils::CallbackNotifier callbackNotifierPre{};
worker->registerGenericPre([this, &callbackNotifierPre, &canceled]() {
canceled = _inflightRequests->cancelAll();
callbackNotifierPre.set();
});
if (!callbackNotifierPre.wait(period)) continue;

utils::CallbackNotifier callbackNotifierPost{};
worker->registerGenericPost([&callbackNotifierPost]() { callbackNotifierPost.set(); });
if (!callbackNotifierPost.wait(period)) continue;

cancelSuccess = true;
}
if (!cancelSuccess)
ucxx_error("All attempts to cancel inflight requests failed on endpoint: %p, UCP handle: %p",
this,
_handle);
} else {
canceled = _inflightRequests->cancelAll();
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ Listener::~Listener()
ucp_listener_destroy(_handle);
callbackNotifierPre.set();
});
callbackNotifierPre.wait(10000000);
callbackNotifierPre.wait(10000000000 /* 10s */);

utils::CallbackNotifier callbackNotifierPost{};
worker->registerGenericPost([&callbackNotifierPost]() { callbackNotifierPost.set(); });
callbackNotifierPost.wait(10000000);
callbackNotifierPost.wait(10000000000 /* 10s */);
} else {
ucp_listener_destroy(_handle);
worker->progress();
Expand Down
34 changes: 22 additions & 12 deletions cpp/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ std::shared_ptr<Worker> createWorker(std::shared_ptr<Context> context,

Worker::~Worker()
{
size_t canceled = cancelInflightRequests();
size_t canceled = cancelInflightRequests(3000000000 /* 3s */, 3);
ucxx_debug("Worker %p canceled %lu requests", _handle, canceled);

stopProgressThreadNoWarn();
Expand Down Expand Up @@ -266,7 +266,7 @@ bool Worker::progress()
if (progressScheduledCancel) ret |= progressPending();

// Requests that were not completed now must be canceled.
if (cancelInflightRequests() > 0) ret |= progressPending();
if (cancelInflightRequests(3000000000 /* 3s */, 3) > 0) ret |= progressPending();

return ret;
}
Expand Down Expand Up @@ -399,7 +399,7 @@ bool Worker::isProgressThreadRunning() { return _progressThread != nullptr; }

std::thread::id Worker::getProgressThreadId() { return _progressThreadId; }

size_t Worker::cancelInflightRequests()
size_t Worker::cancelInflightRequests(uint64_t period, uint64_t maxAttempts)
{
size_t canceled = 0;

Expand All @@ -413,16 +413,26 @@ size_t Worker::cancelInflightRequests()
canceled = inflightRequestsToCancel->cancelAll();
progressPending();
} else if (isProgressThreadRunning()) {
utils::CallbackNotifier callbackNotifierPre{};
registerGenericPre([&callbackNotifierPre, &canceled, &inflightRequestsToCancel]() {
canceled = inflightRequestsToCancel->cancelAll();
callbackNotifierPre.set();
});
callbackNotifierPre.wait();
bool cancelSuccess = false;
for (uint64_t i = 0; i < maxAttempts; ++i) {
utils::CallbackNotifier callbackNotifierPre{};
registerGenericPre([&callbackNotifierPre, &canceled, &inflightRequestsToCancel]() {
canceled = inflightRequestsToCancel->cancelAll();
callbackNotifierPre.set();
});
if (!callbackNotifierPre.wait(period)) continue;

utils::CallbackNotifier callbackNotifierPost{};
registerGenericPost([&callbackNotifierPost]() { callbackNotifierPost.set(); });
if (!callbackNotifierPost.wait(period)) continue;

cancelSuccess = true;
}

utils::CallbackNotifier callbackNotifierPost{};
registerGenericPost([&callbackNotifierPost]() { callbackNotifierPost.set(); });
callbackNotifierPost.wait();
if (!cancelSuccess)
ucxx_error("All attempts to cancel inflight requests failed on worker: %p, UCP handle: %p",
this,
_handle);
} else {
canceled = inflightRequestsToCancel->cancelAll();
}
Expand Down

0 comments on commit 5bda348

Please sign in to comment.