prov/efa: Extend efa_ep interface
Extend the efa ep interface to cover all the applicable
features that the efa-rdm ep interface supports today.

It also refactors and moves several internal efa_rdm_ep functions
to efa_base_ep.c so they can serve both the efa_direct and efa_rdm
endpoints.

Signed-off-by: Shi Jin <[email protected]>
shijin-aws committed Jan 16, 2025
1 parent efbf69f commit bfd0575
Showing 16 changed files with 889 additions and 381 deletions.
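
For context, a hedged application-side sketch of the kind of knob the extended interface covers. The fi_setopt signature is standard libfabric; FI_OPT_MAX_MSG_SIZE is assumed from the libfabric 2.x fi_endpoint(3) API, and the helper name is illustrative:

#include <rdma/fabric.h>
#include <rdma/fi_endpoint.h>

/* Illustrative helper: shrink an endpoint's maximum message size
 * before enabling it. The provider rejects values above the device
 * limit with -FI_EINVAL (see EFA_EP_SETOPT_THRESHOLD in efa.h below). */
static int shrink_max_msg_size(struct fid_ep *ep, size_t new_size)
{
	return fi_setopt(&ep->fid, FI_OPT_ENDPOINT, FI_OPT_MAX_MSG_SIZE,
			 &new_size, sizeof new_size);
}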
17 changes: 17 additions & 0 deletions prov/efa/src/efa.h
@@ -227,4 +227,21 @@ bool efa_use_unsolicited_write_recv()
return efa_env.use_unsolicited_write_recv && efa_device_support_unsolicited_write_recv();
}

/**
* Convenience macro for setopt with an enforced threshold
*/
#define EFA_EP_SETOPT_THRESHOLD(opt, field, threshold) { \
size_t _val = *(size_t *) optval; \
if (optlen != sizeof field) \
return -FI_EINVAL; \
if (_val > threshold) { \
EFA_WARN(FI_LOG_EP_CTRL, \
"Requested size of %zu for FI_OPT_" #opt " " \
"exceeds the maximum (%zu)\n", \
_val, threshold); \
return -FI_EINVAL; \
} \
field = _val; \
}

#endif /* EFA_H */
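
A minimal usage sketch for the macro above, assuming a hypothetical setopt handler in which optval and optlen are in scope (the macro references both by name). The threshold mirrors the device limit used for the base-ep default in efa_base_ep.c:

/* Hypothetical fi_setopt handler fragment. The macro validates optlen,
 * enforces the threshold, and assigns the validated value to `field`. */
static int efa_ep_setopt_sketch(struct efa_base_ep *ep, int optname,
				const void *optval, size_t optlen)
{
	switch (optname) {
	case FI_OPT_MAX_MSG_SIZE:
		EFA_EP_SETOPT_THRESHOLD(MAX_MSG_SIZE, ep->max_msg_size,
					(size_t) ep->domain->device->ibv_port_attr.max_msg_sz);
		return FI_SUCCESS;
	default:
		return -FI_ENOPROTOOPT;
	}
}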
218 changes: 215 additions & 3 deletions prov/efa/src/efa_base_ep.c
@@ -5,6 +5,7 @@
#include "efa.h"
#include "efa_av.h"
#include "efa_cq.h"
#include "efa_cntr.h"
#include "rdm/efa_rdm_protocol.h"

int efa_base_ep_bind_av(struct efa_base_ep *base_ep, struct efa_av *av)
@@ -366,9 +367,10 @@ int efa_base_ep_construct(struct efa_base_ep *base_ep,
base_ep->qp = NULL;
base_ep->user_recv_qp = NULL;

base_ep->max_msg_size = info->ep_attr->max_msg_size;
base_ep->max_rma_size = info->ep_attr->max_msg_size;
base_ep->inject_msg_size = info->tx_attr->inject_size;
/* Use the device's native limits as the default values of the base ep */
base_ep->max_msg_size = (size_t) base_ep->domain->device->ibv_port_attr.max_msg_sz;
base_ep->max_rma_size = (size_t) base_ep->domain->device->max_rdma_size;
base_ep->inject_msg_size = (size_t) base_ep->domain->device->efa_attr.inline_buf_size;
/* TODO: update inject_rma_size to inline size after firmware
* supports inline rdma write */
base_ep->inject_rma_size = 0;
@@ -531,3 +533,213 @@ struct efa_cq *efa_base_ep_get_rx_cq(struct efa_base_ep *ep)
{
return ep->util_ep.rx_cq ? container_of(ep->util_ep.rx_cq, struct efa_cq, util_cq) : NULL;
}

/**
* @brief Construct the ibv qp init attr for a given ep and cq
*
* @param ep a ptr to the efa_base_ep
* @param attr_ex the constructed qp attr
* @param tx_cq tx cq
* @param rx_cq rx cq
*/
static inline
void efa_base_ep_construct_ibv_qp_init_attr_ex(struct efa_base_ep *ep,
struct ibv_qp_init_attr_ex *attr_ex,
struct ibv_cq_ex *tx_cq,
struct ibv_cq_ex *rx_cq)
{
struct fi_info *info;

if (ep->info->ep_attr->type == FI_EP_RDM) {
attr_ex->qp_type = IBV_QPT_DRIVER;
info = ep->domain->device->rdm_info;
} else {
assert(ep->info->ep_attr->type == FI_EP_DGRAM);
attr_ex->qp_type = IBV_QPT_UD;
info = ep->domain->device->dgram_info;
}
attr_ex->cap.max_send_wr = info->tx_attr->size;
attr_ex->cap.max_send_sge = info->tx_attr->iov_limit;
attr_ex->cap.max_recv_wr = info->rx_attr->size;
attr_ex->cap.max_recv_sge = info->rx_attr->iov_limit;
attr_ex->cap.max_inline_data = ep->domain->device->efa_attr.inline_buf_size;
attr_ex->pd = ep->domain->ibv_pd;
attr_ex->qp_context = ep;
attr_ex->sq_sig_all = 1;

attr_ex->send_cq = ibv_cq_ex_to_cq(tx_cq);
attr_ex->recv_cq = ibv_cq_ex_to_cq(rx_cq);
}

/**
* @brief Check in-order aligned 128-byte support for a given ibv_wr_op code
*
* @param ep efa_base_ep
* @param op_code ibv wr op code
* @return int 0 if in-order aligned 128-byte mode is supported, -FI_EOPNOTSUPP
* if it is not supported, any other negative integer on other errors.
*/
int efa_base_ep_check_qp_in_order_aligned_128_bytes(struct efa_base_ep *ep,
enum ibv_wr_opcode op_code)
{
struct efa_qp *qp = NULL;
struct ibv_qp_init_attr_ex attr_ex = {0};
int ret, retv;
struct ibv_cq_ex *ibv_cq_ex = NULL;
enum ibv_cq_ex_type ibv_cq_ex_type;
struct fi_cq_attr cq_attr = {0};

ret = efa_cq_ibv_cq_ex_open(&cq_attr, ep->domain->device->ibv_ctx, &ibv_cq_ex, &ibv_cq_ex_type);
if (ret) {
EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ: %d\n", ret);
ret = -FI_EINVAL;
goto out;
}

/* Create a dummy qp for query only */
efa_base_ep_construct_ibv_qp_init_attr_ex(ep, &attr_ex, ibv_cq_ex, ibv_cq_ex);

ret = efa_qp_create(&qp, &attr_ex, FI_TC_UNSPEC);
if (ret)
goto out;

if (!efa_qp_support_op_in_order_aligned_128_bytes(qp, op_code))
ret = -FI_EOPNOTSUPP;

out:
if (qp)
efa_qp_destruct(qp);

if (ibv_cq_ex) {
retv = -ibv_destroy_cq(ibv_cq_ex_to_cq(ibv_cq_ex));
if (retv)
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close ibv cq: %s\n",
fi_strerror(-retv));
}
return ret;
}

/**
* @brief Insert the tx/rx cq into the cntrs the ep is bound to
*
* @param ep efa_base_ep
* @return int 0 on success, negative integer on failure
*/
int efa_base_ep_insert_cntr_ibv_cq_poll_list(struct efa_base_ep *ep)
{
int i, ret;
struct efa_cntr *efa_cntr;
struct util_cntr *util_cntr;
struct efa_cq *tx_cq, *rx_cq;

tx_cq = efa_base_ep_get_tx_cq(ep);
rx_cq = efa_base_ep_get_rx_cq(ep);

for (i = 0; i < CNTR_CNT; i++) {
util_cntr = ep->util_ep.cntrs[i];
if (util_cntr) {
efa_cntr = container_of(util_cntr, struct efa_cntr, util_cntr);
if (tx_cq) {
ret = efa_ibv_cq_poll_list_insert(&efa_cntr->ibv_cq_poll_list, &efa_cntr->util_cntr.ep_list_lock, &tx_cq->ibv_cq);
if (ret)
return ret;
}
if (rx_cq) {
ret = efa_ibv_cq_poll_list_insert(&efa_cntr->ibv_cq_poll_list, &efa_cntr->util_cntr.ep_list_lock, &rx_cq->ibv_cq);
if (ret)
return ret;
}
ofi_genlock_lock(&efa_cntr->util_cntr.ep_list_lock);
efa_cntr->need_to_scan_ep_list = true;
ofi_genlock_unlock(&efa_cntr->util_cntr.ep_list_lock);
}
}

return FI_SUCCESS;
}

/**
* @brief Remove the tx/rx cq from the cntrs the ep is bound to
*
* @param ep efa_base_ep
*/
void efa_base_ep_remove_cntr_ibv_cq_poll_list(struct efa_base_ep *ep)
{
int i;
struct efa_cntr *efa_cntr;
struct util_cntr *util_cntr;
struct efa_cq *tx_cq, *rx_cq;

tx_cq = efa_base_ep_get_tx_cq(ep);
rx_cq = efa_base_ep_get_rx_cq(ep);

for (i = 0; i < CNTR_CNT; i++) {
util_cntr = ep->util_ep.cntrs[i];
if (util_cntr) {
efa_cntr = container_of(util_cntr, struct efa_cntr, util_cntr);
if (tx_cq && !ofi_atomic_get32(&tx_cq->util_cq.ref))
efa_ibv_cq_poll_list_remove(&efa_cntr->ibv_cq_poll_list, &efa_cntr->util_cntr.ep_list_lock, &tx_cq->ibv_cq);

if (rx_cq && !ofi_atomic_get32(&rx_cq->util_cq.ref))
efa_ibv_cq_poll_list_remove(&efa_cntr->ibv_cq_poll_list, &efa_cntr->util_cntr.ep_list_lock, &rx_cq->ibv_cq);
}
}
}

/**
* @brief Create and enable the IBV QP that backs the EP
*
* @param ep efa_base_ep
* @param create_user_recv_qp whether to create the user_recv_qp. This boolean
* is only true for the zero copy recv mode in the efa-rdm endpoint
*
* @return int 0 on success, negative integer on failure
*/
int efa_base_ep_create_and_enable_qp(struct efa_base_ep *ep, bool create_user_recv_qp)
{
struct ibv_qp_init_attr_ex attr_ex = { 0 };
struct efa_cq *scq, *rcq;
struct ibv_cq_ex *tx_ibv_cq, *rx_ibv_cq;
int err;

scq = efa_base_ep_get_tx_cq(ep);
rcq = efa_base_ep_get_rx_cq(ep);

if (!scq && !rcq) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a send or receive completion queue\n");
return -FI_ENOCQ;
}

if (!scq && ofi_needs_tx(ep->info->caps)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a send completion queue when it has transmit capabilities enabled (FI_SEND).\n");
return -FI_ENOCQ;
}

if (!rcq && ofi_needs_rx(ep->info->caps)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a receive completion queue when it has receive capabilities enabled. (FI_RECV)\n");
return -FI_ENOCQ;
}

tx_ibv_cq = scq ? scq->ibv_cq.ibv_cq_ex : rcq->ibv_cq.ibv_cq_ex;
rx_ibv_cq = rcq ? rcq->ibv_cq.ibv_cq_ex : scq->ibv_cq.ibv_cq_ex;

efa_base_ep_construct_ibv_qp_init_attr_ex(ep, &attr_ex, tx_ibv_cq, rx_ibv_cq);

err = efa_base_ep_create_qp(ep, &attr_ex);
if (err)
return err;

if (create_user_recv_qp) {
err = efa_qp_create(&ep->user_recv_qp, &attr_ex, ep->info->tx_attr->tclass);
if (err) {
efa_base_ep_destruct_qp(ep);
return err;
}
ep->user_recv_qp->base_ep = ep;
}

return efa_base_ep_enable(ep);
}
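
A usage note: a hedged sketch of how the in-order probe above could back a boolean endpoint query, e.g. EFA's FI_OPT_EFA_WRITE_IN_ORDER_ALIGNED_128_BYTES getopt. The wrapper name is hypothetical; IBV_WR_RDMA_WRITE is the verbs opcode for RDMA write:

/* Hypothetical wrapper: fold the probe's tri-state result into a
 * boolean. Only a genuine probe failure (QP/CQ creation error) is
 * propagated to the caller. */
static int efa_ep_write_in_order_sketch(struct efa_base_ep *ep, bool *in_order)
{
	int err = efa_base_ep_check_qp_in_order_aligned_128_bytes(ep, IBV_WR_RDMA_WRITE);

	if (err == 0)
		*in_order = true;
	else if (err == -FI_EOPNOTSUPP)
		*in_order = false;
	else
		return err;
	return 0;
}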
9 changes: 9 additions & 0 deletions prov/efa/src/efa_base_ep.h
@@ -117,4 +117,13 @@ struct efa_cq *efa_base_ep_get_tx_cq(struct efa_base_ep *ep);

struct efa_cq *efa_base_ep_get_rx_cq(struct efa_base_ep *ep);

int efa_base_ep_check_qp_in_order_aligned_128_bytes(struct efa_base_ep *base_ep,
enum ibv_wr_opcode op_code);

int efa_base_ep_insert_cntr_ibv_cq_poll_list(struct efa_base_ep *ep);

void efa_base_ep_remove_cntr_ibv_cq_poll_list(struct efa_base_ep *ep);

int efa_base_ep_create_and_enable_qp(struct efa_base_ep *ep, bool create_user_recv_qp);

#endif
16 changes: 7 additions & 9 deletions prov/efa/src/efa_cntr.c
@@ -180,18 +180,16 @@ static void efa_rdm_cntr_progress(struct util_cntr *cntr)

static void efa_cntr_progress(struct util_cntr *cntr)
{
struct util_ep *ep;
struct fid_list_entry *fid_entry;
struct dlist_entry *item;
struct efa_ibv_cq_poll_list_entry *poll_list_entry;
struct efa_cntr *efa_cntr;

efa_cntr = container_of(cntr, struct efa_cntr, util_cntr);

ofi_genlock_lock(&cntr->ep_list_lock);
dlist_foreach(&cntr->ep_list, item) {
fid_entry = container_of(item, struct fid_list_entry, entry);
ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
if (ep->tx_cq)
efa_cq_progress(ep->tx_cq);
if (ep->rx_cq && ep->rx_cq != ep->tx_cq)
efa_cq_progress(ep->rx_cq);
dlist_foreach(&efa_cntr->ibv_cq_poll_list, item) {
poll_list_entry = container_of(item, struct efa_ibv_cq_poll_list_entry, entry);
efa_cq_poll_ibv_cq(efa_env.efa_cq_read_size, poll_list_entry->cq);
}
ofi_genlock_unlock(&cntr->ep_list_lock);
}
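
The counter progress loop above now drives the CQs on its poll list, so an endpoint must populate that list when it enables. A hedged sketch of that pairing, built only on the helpers this commit adds to efa_base_ep.c (the enable wrapper name is hypothetical):

/* Hypothetical enable path: register the ep's CQs with its bound
 * counters, then create the QP; roll back the registration on failure. */
static int efa_ep_enable_sketch(struct efa_base_ep *ep)
{
	int err = efa_base_ep_insert_cntr_ibv_cq_poll_list(ep);

	if (err)
		return err;

	err = efa_base_ep_create_and_enable_qp(ep, false);
	if (err)
		efa_base_ep_remove_cntr_ibv_cq_poll_list(ep);
	return err;
}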
8 changes: 5 additions & 3 deletions prov/efa/src/efa_cq.c
@@ -243,7 +243,7 @@ efa_cq_proc_ibv_recv_rdma_with_imm_completion(struct efa_base_ep *base_ep,
* A negative number means to poll until cq empty.
* @param[in] util_cq util_cq
*/
void efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct util_cq *util_cq)
void efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
{
bool should_end_poll = false;
struct efa_base_ep *base_ep;
@@ -260,7 +260,7 @@ void efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct util_cq *util_cq)
*/
struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0};

cq = container_of(util_cq, struct efa_cq, util_cq);
cq = container_of(ibv_cq, struct efa_cq, ibv_cq);
efa_domain = container_of(cq->util_cq.domain, struct efa_domain, util_domain);

/* Call ibv_start_poll only once */
@@ -381,7 +381,9 @@ static struct fi_ops_cq efa_cq_ops = {

void efa_cq_progress(struct util_cq *cq)
{
efa_cq_poll_ibv_cq(efa_env.efa_cq_read_size, cq);
struct efa_cq *efa_cq = container_of(cq, struct efa_cq, util_cq);

efa_cq_poll_ibv_cq(efa_env.efa_cq_read_size, &efa_cq->ibv_cq);
}

static int efa_cq_close(fid_t fid)
7 changes: 7 additions & 0 deletions prov/efa/src/efa_cq.h
@@ -1,6 +1,9 @@
/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */

#ifndef _EFA_CQ_H
#define _EFA_CQ_H

#include "efa.h"

enum ibv_cq_ex_type {
@@ -269,3 +272,7 @@ static inline int efa_write_error_msg(struct efa_base_ep *ep, fi_addr_t addr,

return 0;
}

void efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq);

#endif /* end of _EFA_CQ_H */
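
Per the doc comment on efa_cq_poll_ibv_cq, a negative cqe_to_process polls until the CQ is empty. A one-line drain helper built on that contract (the name is illustrative):

/* Illustrative helper: drain all pending completions from an EFA CQ. */
static inline void efa_cq_drain_sketch(struct efa_cq *cq)
{
	efa_cq_poll_ibv_cq(-1, &cq->ibv_cq);
}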