Skip to content

Commit

Permalink
prov/efa: Handle recv for pkts without hdr
Browse files Browse the repository at this point in the history
This patch includes the following changes

1. Introduce efa_rdm_pke_proc_received_no_hdr() to process the pkts
   without hdr separately.
2. For pkts without hdrs, retrieve the cq data from ibv wc directly.
3. Modify the peer address lookup method to handle pkts without hdrs.

Signed-off-by: Shi Jin <[email protected]>
  • Loading branch information
shijin-aws committed Jun 26, 2024
1 parent caafe7b commit 5ac5286
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 104 deletions.
10 changes: 7 additions & 3 deletions prov/efa/src/efa_av.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,13 @@ fi_addr_t efa_av_reverse_lookup_rdm(struct efa_av *av, uint16_t ahn, uint16_t qp
if (OFI_UNLIKELY(!cur_entry))
return FI_ADDR_NOTAVAIL;

if (!pkt_entry) {
/* There is no packet entry to extract connid from when we get an
IBV_WC_RECV_RDMA_WITH_IMM completion from rdma-core. */
if (!pkt_entry || (pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL)) {
/**
* There is no packet entry to extract connid from when we get an
* IBV_WC_RECV_RDMA_WITH_IMM completion from rdma-core.
* Or the pkt_entry is allocated from a buffer user posted that
* doesn't expect any pkt hdr.
*/
return cur_entry->conn->fi_addr;
}

Expand Down
116 changes: 104 additions & 12 deletions prov/efa/src/rdm/efa_rdm_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,109 @@ fi_addr_t efa_rdm_cq_determine_addr_from_ibv_cq(struct ibv_cq_ex *ibv_cqx, enum
}
#endif

/**
* @brief handle a received packet
*
* @param ep[in,out] endpoint
* @param pkt_entry[in,out] received packet, will be released by this function
*/
static void efa_rdm_cq_handle_recv_completion(struct efa_ibv_cq *ibv_cq, struct efa_rdm_pke *pkt_entry, struct efa_rdm_ep *ep)
{
int pkt_type;
struct efa_rdm_peer *peer;
struct efa_rdm_base_hdr *base_hdr;
struct efa_av *efa_av = ep->base_ep.av;
uint32_t imm_data = 0;
bool has_imm_data = false;
struct ibv_cq_ex *ibv_cq_ex = ibv_cq->ibv_cq_ex;

if (pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL) {
assert(ep->user_rx_pkts_posted > 0);
ep->user_rx_pkts_posted--;
} else {
assert(ep->efa_rx_pkts_posted > 0);
ep->efa_rx_pkts_posted--;
}

pkt_entry->addr = efa_av_reverse_lookup_rdm(efa_av, ibv_wc_read_slid(ibv_cq_ex),
ibv_wc_read_src_qp(ibv_cq_ex), pkt_entry);

if (pkt_entry->addr == FI_ADDR_NOTAVAIL) {
pkt_entry->addr = efa_rdm_cq_determine_addr_from_ibv_cq(ibv_cq_ex, ibv_cq->ibv_cq_ex_type);
}

pkt_entry->pkt_size = ibv_wc_read_byte_len(ibv_cq_ex);
assert(pkt_entry->pkt_size > 0);
if (ibv_wc_read_wc_flags(ibv_cq_ex) & IBV_WC_WITH_IMM) {
has_imm_data = true;
imm_data = ibv_wc_read_imm_data(ibv_cq_ex);
}

/*
* Ignore packet if peer address cannot be determined. This ususally happens if
* we had prior communication with the peer, but
* application called fi_av_remove() to remove the address
* from address vector.
*/
if (pkt_entry->addr == FI_ADDR_NOTAVAIL) {
EFA_WARN(FI_LOG_CQ,
"Warning: ignoring a received packet from a removed address. packet type: %" PRIu8
", packet flags: %x\n",
efa_rdm_pke_get_base_hdr(pkt_entry)->type,
efa_rdm_pke_get_base_hdr(pkt_entry)->flags);
efa_rdm_pke_release_rx(pkt_entry);
return;
}

#if ENABLE_DEBUG
dlist_remove(&pkt_entry->dbg_entry);
dlist_insert_tail(&pkt_entry->dbg_entry, &ep->rx_pkt_list);
#ifdef ENABLE_EFA_RDM_PKE_DUMP
efa_rdm_pke_print(pkt_entry, "Received");
#endif
#endif
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
assert(peer);
if (peer->is_local) {
/*
* This happens when the peer is on same instance, but chose to
* use EFA device to communicate with me. In this case, we respect
* that and will not use shm with the peer.
* TODO: decide whether to use shm through handshake packet.
*/
peer->is_local = 0;
}

efa_rdm_ep_post_handshake_or_queue(ep, peer);

/**
* Data is already delivered to user posted pkt without pkt hdrs.
*/
if (pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL) {
assert(ep->base_ep.user_recv_qp);
/* User recv pkts are only posted to the user recv qp */
assert(ibv_wc_read_qp_num(ibv_cq->ibv_cq_ex) == ep->base_ep.user_recv_qp->qp_num);
return efa_rdm_pke_proc_received_no_hdr(pkt_entry, has_imm_data, imm_data);
}

/* Proc receives with pkt hdrs (posted to ctrl QPs)*/
base_hdr = efa_rdm_pke_get_base_hdr(pkt_entry);
pkt_type = base_hdr->type;
if (pkt_type >= EFA_RDM_EXTRA_REQ_PKT_END) {
EFA_WARN(FI_LOG_CQ,
"Peer %d is requesting feature %d, which this EP does not support.\n",
(int)pkt_entry->addr, base_hdr->type);

assert(0 && "invalid REQ packet type");
efa_base_ep_write_eq_error(&ep->base_ep, FI_EIO, FI_EFA_ERR_INVALID_PKT_TYPE);
efa_rdm_pke_release_rx(pkt_entry);
return;
}

efa_rdm_pke_proc_received(pkt_entry);
}


/**
* @brief Get the vendor error code for an endpoint's CQ
*
Expand Down Expand Up @@ -328,7 +431,6 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
* EFA expects .comp_mask = 0, or otherwise returns EINVAL.
*/
struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0};
struct efa_av *efa_av;
struct efa_rdm_pke *pkt_entry;
ssize_t err;
int opcode;
Expand Down Expand Up @@ -381,17 +483,7 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
efa_rdm_pke_handle_send_completion(pkt_entry);
break;
case IBV_WC_RECV:
efa_av = ep->base_ep.av;
pkt_entry->addr = efa_av_reverse_lookup_rdm(efa_av, ibv_wc_read_slid(ibv_cq->ibv_cq_ex),
ibv_wc_read_src_qp(ibv_cq->ibv_cq_ex), pkt_entry);

if (pkt_entry->addr == FI_ADDR_NOTAVAIL) {
pkt_entry->addr = efa_rdm_cq_determine_addr_from_ibv_cq(ibv_cq->ibv_cq_ex, ibv_cq->ibv_cq_ex_type);
}

pkt_entry->pkt_size = ibv_wc_read_byte_len(ibv_cq->ibv_cq_ex);
assert(pkt_entry->pkt_size > 0);
efa_rdm_pke_handle_recv_completion(pkt_entry);
efa_rdm_cq_handle_recv_completion(ibv_cq, pkt_entry, ep);
#if ENABLE_DEBUG
ep->recv_comps++;
#endif
Expand Down
2 changes: 0 additions & 2 deletions prov/efa/src/rdm/efa_rdm_pke.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ void efa_rdm_pke_release_rx(struct efa_rdm_pke *pkt_entry)
assert(pkt_entry->next == NULL);
ep = pkt_entry->ep;
assert(ep);
if (ep->use_zcpy_rx && pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL)
return;

if (pkt_entry->alloc_type == EFA_RDM_PKE_FROM_EFA_RX_POOL) {
ep->efa_rx_pkts_to_post++;
Expand Down
108 changes: 21 additions & 87 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,27 @@ fi_addr_t efa_rdm_pke_insert_addr(struct efa_rdm_pke *pkt_entry, void *raw_addr)
return rdm_addr;
}

void efa_rdm_pke_proc_received_no_hdr(struct efa_rdm_pke *pkt_entry, bool has_imm_data, uint32_t imm_data)
{
struct efa_rdm_ope *rxe = pkt_entry->ope;

assert(pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL);
assert(rxe);

if (has_imm_data) {
rxe->cq_entry.flags |= FI_REMOTE_CQ_DATA;
rxe->cq_entry.data = imm_data;
}

rxe->addr = pkt_entry->addr;
rxe->total_len = pkt_entry->pkt_size;
rxe->cq_entry.len = pkt_entry->pkt_size;

efa_rdm_rxe_report_completion(rxe);
efa_rdm_rxe_release(rxe);
efa_rdm_pke_release_rx(pkt_entry);
}

/**
* @brief process a received packet
*
Expand Down Expand Up @@ -897,93 +918,6 @@ fi_addr_t efa_rdm_pke_determine_addr(struct efa_rdm_pke *pkt_entry)
return FI_ADDR_NOTAVAIL;
}

/**
* @brief handle a received packet
*
* @param ep[in,out] endpoint
* @param pkt_entry[in,out] received packet, will be released by this function
*/
void efa_rdm_pke_handle_recv_completion(struct efa_rdm_pke *pkt_entry)
{
int pkt_type;
struct efa_rdm_ep *ep;
struct efa_rdm_peer *peer;
struct efa_rdm_base_hdr *base_hdr;
struct efa_rdm_ope *zcpy_rxe = NULL;

ep = pkt_entry->ep;
assert(ep);

assert(ep->efa_rx_pkts_posted > 0);
ep->efa_rx_pkts_posted--;

base_hdr = efa_rdm_pke_get_base_hdr(pkt_entry);
pkt_type = base_hdr->type;
if (pkt_type >= EFA_RDM_EXTRA_REQ_PKT_END) {
EFA_WARN(FI_LOG_CQ,
"Peer %d is requesting feature %d, which this EP does not support.\n",
(int)pkt_entry->addr, base_hdr->type);

assert(0 && "invalid REQ packet type");
efa_base_ep_write_eq_error(&ep->base_ep, FI_EIO, FI_EFA_ERR_INVALID_PKT_TYPE);
efa_rdm_pke_release_rx(pkt_entry);
return;
}

/*
* Ignore packet if peer address cannot be determined. This ususally happens if
* we had prior communication with the peer, but
* application called fi_av_remove() to remove the address
* from address vector.
*/
if (pkt_entry->addr == FI_ADDR_NOTAVAIL) {
EFA_WARN(FI_LOG_CQ,
"Warning: ignoring a received packet from a removed address. packet type: %" PRIu8
", packet flags: %x\n",
efa_rdm_pke_get_base_hdr(pkt_entry)->type,
efa_rdm_pke_get_base_hdr(pkt_entry)->flags);
efa_rdm_pke_release_rx(pkt_entry);
return;
}

#if ENABLE_DEBUG
if (!ep->use_zcpy_rx) {
dlist_remove(&pkt_entry->dbg_entry);
dlist_insert_tail(&pkt_entry->dbg_entry, &ep->rx_pkt_list);
}
#ifdef ENABLE_EFA_RDM_PKE_DUMP
efa_rdm_pke_print(pkt_entry, "Received");
#endif
#endif
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
assert(peer);
if (peer->is_local) {
/*
* This happens when the peer is on same instance, but chose to
* use EFA device to communicate with me. In this case, we respect
* that and will not use shm with the peer.
* TODO: decide whether to use shm through handshake packet.
*/
peer->is_local = 0;
}

efa_rdm_ep_post_handshake_or_queue(ep, peer);


if (pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL) {
assert(pkt_entry->ope);
zcpy_rxe = pkt_entry->ope;
}

efa_rdm_pke_proc_received(pkt_entry);

if (zcpy_rxe && pkt_type != EFA_RDM_EAGER_MSGRTM_PKT) {
/* user buffer was not matched with a message,
* therefore reposting the buffer */
efa_rdm_ep_post_user_recv_buf(ep, zcpy_rxe, 0);
}
}

#if ENABLE_DEBUG

/*
Expand Down
4 changes: 4 additions & 0 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ void efa_rdm_pke_handle_rx_error(struct efa_rdm_pke *pkt_entry, int prov_errno);

void efa_rdm_pke_handle_recv_completion(struct efa_rdm_pke *pkt_entry);

void efa_rdm_pke_proc_received(struct efa_rdm_pke *pkt_entry);

void efa_rdm_pke_proc_received_no_hdr(struct efa_rdm_pke *pkt_entry, bool has_imm_data, uint32_t imm_data);

#if ENABLE_DEBUG
void efa_rdm_pke_print(struct efa_rdm_pke *pkt_entry, char *prefix);
#endif
Expand Down
2 changes: 2 additions & 0 deletions prov/efa/test/efa_unit_test_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ static void test_impl_ibv_cq_ex_read_unknow_peer_ah(struct efa_resource *resourc
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_byte_len = &efa_mock_ibv_read_byte_len_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_opcode = &efa_mock_ibv_read_opcode_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_qp_num = &efa_mock_ibv_read_qp_num_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_wc_flags = &efa_mock_ibv_read_wc_flags_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_src_qp = &efa_mock_ibv_read_src_qp_return_mock;

if (support_efadv_cq) {
Expand All @@ -581,6 +582,7 @@ static void test_impl_ibv_cq_ex_read_unknow_peer_ah(struct efa_resource *resourc
will_return(efa_mock_ibv_read_byte_len_return_mock, pkt_entry->pkt_size);
will_return_maybe(efa_mock_ibv_read_opcode_return_mock, IBV_WC_RECV);
will_return_maybe(efa_mock_ibv_read_qp_num_return_mock, 0);
will_return_maybe(efa_mock_ibv_read_wc_flags_return_mock, 0);
will_return_maybe(efa_mock_ibv_read_src_qp_return_mock, raw_addr.qpn);

/* Post receive buffer */
Expand Down
2 changes: 2 additions & 0 deletions prov/efa/test/efa_unit_test_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ void test_efa_rdm_ep_handshake_exchange_host_id(struct efa_resource **state, uin
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_slid = &efa_mock_ibv_read_slid_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_src_qp = &efa_mock_ibv_read_src_qp_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_qp_num = &efa_mock_ibv_read_qp_num_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_wc_flags = &efa_mock_ibv_read_wc_flags_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->read_vendor_err = &efa_mock_ibv_read_vendor_err_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->start_poll = &efa_mock_ibv_start_poll_return_mock;
efa_rdm_cq->ibv_cq.ibv_cq_ex->status = IBV_WC_SUCCESS;
Expand All @@ -185,6 +186,7 @@ void test_efa_rdm_ep_handshake_exchange_host_id(struct efa_resource **state, uin
will_return(efa_mock_ibv_read_byte_len_return_mock, pkt_entry->pkt_size);
will_return(efa_mock_ibv_read_opcode_return_mock, IBV_WC_RECV);
will_return(efa_mock_ibv_read_qp_num_return_mock, 0);
will_return(efa_mock_ibv_read_wc_flags_return_mock, 0);
will_return(efa_mock_ibv_read_slid_return_mock, efa_rdm_ep_get_peer_ahn(efa_rdm_ep, peer_addr));
will_return(efa_mock_ibv_read_src_qp_return_mock, raw_addr.qpn);
will_return(efa_mock_ibv_start_poll_return_mock, IBV_WC_SUCCESS);
Expand Down
5 changes: 5 additions & 0 deletions prov/efa/test/efa_unit_test_mocks.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ uint32_t efa_mock_ibv_read_qp_num_return_mock(struct ibv_cq_ex *current)
return mock();
}

uint32_t efa_mock_ibv_read_wc_flags_return_mock(struct ibv_cq_ex *current)
{
return mock();
}

int g_ofi_copy_from_hmem_iov_call_counter;
ssize_t efa_mock_ofi_copy_from_hmem_iov_inc_counter(void *dest, size_t size,
enum fi_hmem_iface hmem_iface, uint64_t device,
Expand Down
2 changes: 2 additions & 0 deletions prov/efa/test/efa_unit_test_mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ uint32_t efa_mock_ibv_read_vendor_err_return_mock(struct ibv_cq_ex *current);

uint32_t efa_mock_ibv_read_qp_num_return_mock(struct ibv_cq_ex *current);

uint32_t efa_mock_ibv_read_wc_flags_return_mock(struct ibv_cq_ex *current);

ssize_t __real_ofi_copy_from_hmem_iov(void *dest, size_t size,
enum fi_hmem_iface hmem_iface, uint64_t device,
const struct iovec *hmem_iov,
Expand Down

0 comments on commit 5ac5286

Please sign in to comment.