Skip to content

Commit

Permalink
Merge pull request #7189 from raffenet/ucx-am-recv-decl
Browse files Browse the repository at this point in the history
ch4/ucx: Reorder function definitions

Approved-by: Ken Raffenetti <[email protected]>
  • Loading branch information
raffenet authored Oct 23, 2024
2 parents 49bb9c1 + 1354596 commit 41def54
Showing 1 changed file with 43 additions and 43 deletions.
86 changes: 43 additions & 43 deletions src/mpid/ch4/netmod/ucx/ucx_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,49 @@ void MPIDI_UCX_am_send_callback(void *request, ucs_status_t status)
}

#ifdef HAVE_UCP_AM_NBX
/* Called when recv buffer is posted */
int MPIDI_UCX_do_am_recv(MPIR_Request * rreq)
{
void *recv_buf;
bool is_contig;
MPI_Aint data_sz, in_data_sz;
int vci = MPIDI_Request_get_vci(rreq);

MPIDIG_get_recv_buffer(&recv_buf, &data_sz, &is_contig, &in_data_sz, rreq);
if (!is_contig || in_data_sz > data_sz) {
/* non-contig datatype, need receive into pack buffer */
/* ucx will error out if buffer size is less than the promised data size,
* also use a pack buffer in this case */
recv_buf = MPL_malloc(in_data_sz, MPL_MEM_OTHER);
MPIR_Assert(recv_buf);
MPIDI_UCX_AM_RECV_REQUEST(rreq, pack_buffer) = recv_buf;
} else {
MPIDI_UCX_AM_RECV_REQUEST(rreq, pack_buffer) = NULL;
}

MPIDI_UCX_ucp_request_t *ucp_request;
size_t received_length;
ucp_request_param_t param = {
.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_RECV_INFO,
.cb.recv_am = &MPIDI_UCX_am_recv_callback_nbx,
.recv_info.length = &received_length,
};
void *data_desc = MPIDI_UCX_AM_RECV_REQUEST(rreq, data_desc);
/* note: use in_data_sz to match promised data size */
ucp_request = ucp_am_recv_data_nbx(MPIDI_UCX_global.ctx[vci].worker,
data_desc, recv_buf, in_data_sz, &param);
if (ucp_request == NULL) {
/* completed immediately */
MPIDI_UCX_ucp_request_t tmp_ucp_request;
tmp_ucp_request.req = rreq;
MPIDI_UCX_am_recv_callback_nbx(&tmp_ucp_request, UCS_OK, received_length, NULL);
} else {
ucp_request->req = rreq;
}

return MPI_SUCCESS;
}

/* Am handler for messages sent from ucp_am_send_nbx. Registered with
* ucp_worker_set_am_recv_handler.
*/
Expand Down Expand Up @@ -116,49 +159,6 @@ ucs_status_t MPIDI_UCX_am_nbx_handler(void *arg, const void *header, size_t head
}
}

/* Called when recv buffer is posted */
int MPIDI_UCX_do_am_recv(MPIR_Request * rreq)
{
void *recv_buf;
bool is_contig;
MPI_Aint data_sz, in_data_sz;
int vci = MPIDI_Request_get_vci(rreq);

MPIDIG_get_recv_buffer(&recv_buf, &data_sz, &is_contig, &in_data_sz, rreq);
if (!is_contig || in_data_sz > data_sz) {
/* non-contig datatype, need receive into pack buffer */
/* ucx will error out if buffer size is less than the promised data size,
* also use a pack buffer in this case */
recv_buf = MPL_malloc(in_data_sz, MPL_MEM_OTHER);
MPIR_Assert(recv_buf);
MPIDI_UCX_AM_RECV_REQUEST(rreq, pack_buffer) = recv_buf;
} else {
MPIDI_UCX_AM_RECV_REQUEST(rreq, pack_buffer) = NULL;
}

MPIDI_UCX_ucp_request_t *ucp_request;
size_t received_length;
ucp_request_param_t param = {
.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_RECV_INFO,
.cb.recv_am = &MPIDI_UCX_am_recv_callback_nbx,
.recv_info.length = &received_length,
};
void *data_desc = MPIDI_UCX_AM_RECV_REQUEST(rreq, data_desc);
/* note: use in_data_sz to match promised data size */
ucp_request = ucp_am_recv_data_nbx(MPIDI_UCX_global.ctx[vci].worker,
data_desc, recv_buf, in_data_sz, &param);
if (ucp_request == NULL) {
/* completed immediately */
MPIDI_UCX_ucp_request_t tmp_ucp_request;
tmp_ucp_request.req = rreq;
MPIDI_UCX_am_recv_callback_nbx(&tmp_ucp_request, UCS_OK, received_length, NULL);
} else {
ucp_request->req = rreq;
}

return MPI_SUCCESS;
}

/* callback for ucp_am_recv_data_nbx */
void MPIDI_UCX_am_recv_callback_nbx(void *request, ucs_status_t status, size_t length,
void *user_data)
Expand Down

0 comments on commit 41def54

Please sign in to comment.