Skip to content

Commit

Permalink
prov/efa: Create 1:1 relationship between libfabric CQs and IBV CQs
Browse files Browse the repository at this point in the history
Today, each endpoint has its own completion queue with the EFA provider, even if all endpoints bind the same CQ.
This caused applications to poll many more hardware completion queues than expected, at significantly reduced performance.
This patch fixes this issue by moving the IBV cq creation to fi_cq_open,
and bind it with ep explicitly via fi_ep_bind.

The ibv cq poll are isolated from the efa_rdm_ep_progress and made a separate call
made by the fi_cq_read and fi_cntr_read. Because applications may create separate
tx and rx ibv cqs, both of them should be polled during the cq progress due to EFA's
protocol requirements. This patch creates a ibv_cq_poll_list structure to track
the ibv cqs that requires polling.

As a consequence of this change, the real QP creation is moved to fi_ep_enable
because the ibv cq must be bound with qp during creation, which is guaranteed
during the ep enable.

Signed-off-by: Shi Jin <[email protected]>
  • Loading branch information
shijin-aws committed Mar 6, 2024
1 parent 5e0191d commit d48fb80
Show file tree
Hide file tree
Showing 20 changed files with 1,159 additions and 536 deletions.
4 changes: 4 additions & 0 deletions prov/efa/Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ if HAVE_EFADV_QUERY_MR
prov_efa_test_efa_unit_test_LDFLAGS += -Wl,--wrap=efadv_query_mr
endif HAVE_EFADV_QUERY_MR

if HAVE_EFA_DATA_IN_ORDER_ALIGNED_128_BYTES
prov_efa_test_efa_unit_test_LDFLAGS += -Wl,--wrap=ibv_query_qp_data_in_order
endif

prov_efa_test_efa_unit_test_LIBS = $(efa_LIBS) $(linkback)

endif ENABLE_EFA_UNIT_TEST
Expand Down
1 change: 1 addition & 0 deletions prov/efa/configure.m4
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ AC_DEFUN([FI_EFA_CONFIGURE],[
AM_CONDITIONAL([HAVE_EFADV_CQ_EX], [ test $efadv_support_extended_cq = 1])
AM_CONDITIONAL([HAVE_EFADV_QUERY_MR], [ test $have_efadv_query_mr = 1])
AM_CONDITIONAL([HAVE_EFA_DATA_IN_ORDER_ALIGNED_128_BYTES], [ test $efa_support_data_in_order_aligned_128_byte = 1])
AM_CONDITIONAL([ENABLE_EFA_UNIT_TEST], [ test x"$enable_efa_unit_test" != xno])
AC_SUBST(efa_CPPFLAGS)
Expand Down
60 changes: 43 additions & 17 deletions prov/efa/src/efa_base_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ int efa_base_ep_bind_av(struct efa_base_ep *base_ep, struct efa_av *av)
return 0;
}

static int efa_base_ep_destruct_qp(struct efa_base_ep *base_ep)
int efa_base_ep_destruct_qp(struct efa_base_ep *base_ep)
{
struct efa_domain *domain;
struct efa_qp *qp = base_ep->qp;
Expand All @@ -48,23 +48,31 @@ static int efa_base_ep_destruct_qp(struct efa_base_ep *base_ep)
EFA_INFO(FI_LOG_CORE, "destroy qp[%u] failed!\n", qp->qp_num);

free(qp);
base_ep->qp = NULL;
}

return 0;
}

int efa_base_ep_destruct(struct efa_base_ep *base_ep)
void efa_base_ep_close_util_ep(struct efa_base_ep *base_ep)
{
int err;

/* We need to free the util_ep first to avoid race conditions
* with other threads progressing the cq. */
if (base_ep->util_ep_initialized) {
err = ofi_endpoint_close(&base_ep->util_ep);
if (err)
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close util EP\n");
base_ep->util_ep_initialized = false;
}
}

int efa_base_ep_destruct(struct efa_base_ep *base_ep)
{
int err;

/* We need to free the util_ep first to avoid race conditions
* with other threads progressing the cq. */
efa_base_ep_close_util_ep(base_ep);

fi_freeinfo(base_ep->info);

Expand Down Expand Up @@ -149,34 +157,52 @@ static int efa_base_ep_modify_qp_rst2rts(struct efa_base_ep *base_ep,
IBV_QP_STATE | IBV_QP_SQ_PSN);
}

int efa_base_ep_create_qp(struct efa_base_ep *base_ep,
struct ibv_qp_init_attr_ex *init_attr_ex)
/**
* @brief Create a efa_qp
*
* @param qp double pointer of struct efa_qp
* @param init_attr_ex ibv_qp_init_attr_ex
* @return int 0 on success, negative integer on failure
*/
int efa_qp_create(struct efa_qp **qp, struct ibv_qp_init_attr_ex *init_attr_ex)
{
struct efa_qp *qp;
struct efadv_qp_init_attr efa_attr = { 0 };

qp = calloc(1, sizeof(*qp));
if (!qp)
*qp = calloc(1, sizeof(struct efa_qp));
if (!*qp)
return -FI_ENOMEM;

if (init_attr_ex->qp_type == IBV_QPT_UD) {
qp->ibv_qp = ibv_create_qp_ex(init_attr_ex->pd->context,
(*qp)->ibv_qp = ibv_create_qp_ex(init_attr_ex->pd->context,
init_attr_ex);
} else {
assert(init_attr_ex->qp_type == IBV_QPT_DRIVER);
efa_attr.driver_qp_type = EFADV_QP_DRIVER_TYPE_SRD;
qp->ibv_qp = efadv_create_qp_ex(
(*qp)->ibv_qp = efadv_create_qp_ex(
init_attr_ex->pd->context, init_attr_ex, &efa_attr,
sizeof(struct efadv_qp_init_attr));
}

if (!qp->ibv_qp) {
if (!(*qp)->ibv_qp) {
EFA_WARN(FI_LOG_EP_CTRL, "ibv_create_qp failed. errno: %d\n", errno);
free(qp);
free(*qp);
return -errno;
}

qp->ibv_qp_ex = ibv_qp_to_qp_ex(qp->ibv_qp);
(*qp)->ibv_qp_ex = ibv_qp_to_qp_ex((*qp)->ibv_qp);
return FI_SUCCESS;
}

int efa_base_ep_create_qp(struct efa_base_ep *base_ep,
struct ibv_qp_init_attr_ex *init_attr_ex)
{
struct efa_qp *qp;
int ret;

ret = efa_qp_create(&qp, init_attr_ex);
if (ret)
return ret;

base_ep->qp = qp;
qp->base_ep = base_ep;
return 0;
Expand Down Expand Up @@ -333,17 +359,17 @@ int efa_base_ep_getname(fid_t fid, void *addr, size_t *addrlen)
* in-order operation.
*/
#if HAVE_EFA_DATA_IN_ORDER_ALIGNED_128_BYTES
bool efa_base_ep_support_op_in_order_aligned_128_bytes(struct efa_base_ep *base_ep, enum ibv_wr_opcode op)
bool efa_qp_support_op_in_order_aligned_128_bytes(struct efa_qp *qp, enum ibv_wr_opcode op)
{
int caps;

caps = ibv_query_qp_data_in_order(base_ep->qp->ibv_qp, op,
caps = ibv_query_qp_data_in_order(qp->ibv_qp, op,
IBV_QUERY_QP_DATA_IN_ORDER_RETURN_CAPS);

return !!(caps & IBV_QUERY_QP_DATA_IN_ORDER_ALIGNED_128_BYTES);
}
#else
bool efa_base_ep_support_op_in_order_aligned_128_bytes(struct efa_base_ep *base_ep, enum ibv_wr_opcode op)
bool efa_qp_support_op_in_order_aligned_128_bytes(struct efa_qp *qp, enum ibv_wr_opcode op)
{
return false;
}
Expand Down
8 changes: 7 additions & 1 deletion prov/efa/src/efa_base_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,16 @@ int efa_base_ep_construct(struct efa_base_ep *base_ep,

int efa_base_ep_getname(fid_t fid, void *addr, size_t *addrlen);

int efa_qp_create(struct efa_qp **qp, struct ibv_qp_init_attr_ex *init_attr_ex);

int efa_base_ep_create_qp(struct efa_base_ep *base_ep,
struct ibv_qp_init_attr_ex *init_attr_ex);

bool efa_base_ep_support_op_in_order_aligned_128_bytes(struct efa_base_ep *base_ep,
void efa_base_ep_close_util_ep(struct efa_base_ep *base_ep);

int efa_base_ep_destruct_qp(struct efa_base_ep *base_ep);

bool efa_qp_support_op_in_order_aligned_128_bytes(struct efa_qp *qp,
enum ibv_wr_opcode op);

void efa_base_ep_write_eq_error(struct efa_base_ep *ep,
Expand Down
33 changes: 31 additions & 2 deletions prov/efa/src/efa_cntr.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "ofi_util.h"
#include "efa.h"
#include "efa_cntr.h"
#include "rdm/efa_rdm_cq.h"

static int efa_cntr_wait(struct fid_cntr *cntr_fid, uint64_t threshold, int timeout)
{
Expand Down Expand Up @@ -146,6 +147,28 @@ static struct fi_ops efa_cntr_fi_ops = {
.ops_open = fi_no_ops_open,
};

static void efa_rdm_cntr_progress(struct util_cntr *cntr)
{
struct util_ep *ep;
struct fid_list_entry *fid_entry;
struct dlist_entry *item;
struct efa_cntr *efa_cntr;
struct efa_ibv_cq_poll_list_entry *poll_list_entry;

ofi_genlock_lock(&cntr->ep_list_lock);
efa_cntr = container_of(cntr, struct efa_cntr, util_cntr);
dlist_foreach(&efa_cntr->ibv_cq_poll_list, item) {
poll_list_entry = container_of(item, struct efa_ibv_cq_poll_list_entry, entry);
efa_rdm_cq_poll_ibv_cq(efa_env.efa_cq_read_size, poll_list_entry->cq);
}
dlist_foreach(&cntr->ep_list, item) {
fid_entry = container_of(item, struct fid_list_entry, entry);
ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
ep->progress(ep);
}
ofi_genlock_unlock(&cntr->ep_list_lock);
}

int efa_cntr_open(struct fid_domain *domain, struct fi_cntr_attr *attr,
struct fid_cntr **cntr_fid, void *context)
{
Expand All @@ -159,11 +182,17 @@ int efa_cntr_open(struct fid_domain *domain, struct fi_cntr_attr *attr,
if (!cntr)
return -FI_ENOMEM;

dlist_init(&cntr->ibv_cq_poll_list);
efa_domain = container_of(domain, struct efa_domain,
util_domain.domain_fid);

ret = ofi_cntr_init(&efa_prov, domain, attr, &cntr->util_cntr,
&ofi_cntr_progress, context);
if (efa_domain->info->ep_attr->type == FI_EP_RDM)
ret = ofi_cntr_init(&efa_prov, domain, attr, &cntr->util_cntr,
&efa_rdm_cntr_progress, context);
else
ret = ofi_cntr_init(&efa_prov, domain, attr, &cntr->util_cntr,
&ofi_cntr_progress, context);

if (ret)
goto free;

Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/efa_cntr.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
struct efa_cntr {
struct util_cntr util_cntr;
struct fid_cntr *shm_cntr;
struct dlist_entry ibv_cq_poll_list;
};

int efa_cntr_open(struct fid_domain *domain, struct fi_cntr_attr *attr,
Expand Down
77 changes: 77 additions & 0 deletions prov/efa/src/efa_cq.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,83 @@

#include "efa.h"

enum ibv_cq_ex_type {
IBV_CQ,
EFADV_CQ
};

struct efa_ibv_cq {
struct ibv_cq_ex *ibv_cq_ex;
enum ibv_cq_ex_type ibv_cq_ex_type;
/* dlist entry inserted to cq/cntr's poll_list */
struct dlist_entry entry;
};

struct efa_ibv_cq_poll_list_entry {
struct dlist_entry entry;
struct efa_ibv_cq *cq;
};

void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq);

static inline
int efa_ibv_cq_poll_list_match(struct dlist_entry *entry, const void *cq)
{
struct efa_ibv_cq_poll_list_entry *item;
item = container_of(entry, struct efa_ibv_cq_poll_list_entry, entry);
return (item->cq == cq);
}

/* Serialization must be provided by the caller. */
static inline
int efa_ibv_cq_poll_list_search(struct dlist_entry *poll_list, struct efa_ibv_cq *cq)
{
struct dlist_entry *entry;
struct efa_ibv_cq_poll_list_entry *item;

entry = dlist_find_first_match(poll_list, efa_ibv_cq_poll_list_match, cq);
if (entry)
return -FI_EALREADY;

item = calloc(1, sizeof(*item));
if (!item)
return -FI_ENOMEM;

item->cq = cq;
dlist_insert_tail(&item->entry, poll_list);
//printf("efa_ibv_cq_poll_list_search: poll_list: %p, entry:%p, prev: %p, next: %p\n", (void *) poll_list, (void *) &cq->entry, (void *) &cq->entry.prev, (void *) &cq->entry.next);
return 0;
}

static inline
int efa_ibv_cq_poll_list_insert(struct dlist_entry *poll_list, struct ofi_genlock *lock, struct efa_ibv_cq *cq)
{
int ret = 0;

ofi_genlock_lock(lock);
ret = efa_ibv_cq_poll_list_search(poll_list, cq);
ofi_genlock_unlock(lock);

return (!ret || (ret == -FI_EALREADY)) ? 0 : ret;
}

static inline
void efa_ibv_cq_poll_list_remove(struct dlist_entry *poll_list, struct ofi_genlock *lock,
struct efa_ibv_cq *cq)
{
struct efa_ibv_cq_poll_list_entry *item;
struct dlist_entry *entry;

ofi_genlock_lock(lock);
entry = dlist_remove_first_match(poll_list, efa_ibv_cq_poll_list_match, cq);
ofi_genlock_unlock(lock);

if (entry) {
item = container_of(entry, struct efa_ibv_cq_poll_list_entry, entry);
free(item);
}
}

/**
* @brief Create ibv_cq_ex by calling ibv_create_cq_ex
*
Expand Down
Loading

0 comments on commit d48fb80

Please sign in to comment.