Skip to content

Commit

Permalink
non-blocking ep cache finalization
Browse files Browse the repository at this point in the history
  • Loading branch information
JaeseungYeom committed Feb 15, 2024
1 parent c4cd6d5 commit ec9f48c
Showing 1 changed file with 58 additions and 7 deletions.
65 changes: 58 additions & 7 deletions src/dyad/dtl/ucx_ep_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,65 @@ dyad_rc_t dyad_ucx_ep_cache_finalize (const dyad_ctx_t *ctx, ucx_ep_cache_h* cac
if (cache == nullptr || *cache == nullptr) {
return DYAD_RC_OK;
}
cache_type* cpp_cache = static_cast<cache_type*> (*cache);
for (cache_type::iterator it = cpp_cache->begin (); it != cpp_cache->end ();) {
ucx_disconnect (ctx, worker, it->second);
//it = cache_remove_impl (ctx, cpp_cache, it, worker);

dyad_rc_t rc = DYAD_RC_OK;
cache_type& cpp_cache = *(static_cast<cache_type*> (*cache));
std::vector<ucs_status_ptr_t> stat_ptrs (cpp_cache.size ());
auto it_stat = stat_ptrs.begin ();

#if UCP_API_VERSION >= UCP_VERSION(1, 10)
ucp_request_param_t close_params;
close_params.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS;
close_params.flags = UCP_EP_CLOSE_FLAG_FORCE;
#endif

for (cache_type::iterator it = cpp_cache.begin (); it != cpp_cache.end ();) {
ucp_ep_h& ep = it->second;
if (ep != NULL) {
// ucp_tag_send_sync_nbx is the prefered version of this send
// since UCX 1.9 However, some systems (e.g., Lassen) may have
// an older verison This conditional compilation will use
// ucp_tag_send_sync_nbx if using UCX 1.9+, and it will use the
// deprecated ucp_tag_send_sync_nb if using UCX < 1.9.
#if UCP_API_VERSION >= UCP_VERSION(1, 10)
*(it_stat++) = ucp_ep_close_nbx (ep, &close_params);
#else
// TODO change to FORCE if we decide to enable err handleing
// mode
*(it_stat++) = ucp_ep_close_nb (ep, UCP_EP_CLOSE_MODE_FORCE);
#endif
}
}
cpp_cache->clear ();
delete cpp_cache;

for (auto& stat_ptr : stat_ptrs)
{
ucs_status_t status = UCS_OK;
// Don't use dyad_ucx_request_wait here because ep_close behaves
// differently than other UCX calls
if (stat_ptr != NULL) {
if (UCS_PTR_IS_PTR (stat_ptr)) {
// Endpoint close is in-progress.
// Wait until finished
do {
ucp_worker_progress (worker);
status = ucp_request_check_status (stat_ptr);
} while (status == UCS_INPROGRESS);
ucp_request_free (stat_ptr);
} else {
// An error occurred during endpoint closure
// However, the endpoint can no longer be used
// Get the status code for reporting
status = UCS_PTR_STATUS (stat_ptr);
}
if (UCX_STATUS_FAIL (status)) {
rc = DYAD_RC_UCXEP_FAIL;
}
}
}

cpp_cache.clear ();
delete &cpp_cache;
*cache = nullptr;
DYAD_C_FUNCTION_END();
return DYAD_RC_OK;
return rc;
}

0 comments on commit ec9f48c

Please sign in to comment.