Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/cxi: Fix CQ wait FD logic #10681

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion man/fi_cxi.7.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ The CXI provider supports FI_THREAD_SAFE and FI_THREAD_DOMAIN threading models.

The CXI provider supports FI_WAIT_FD and FI_WAIT_POLLFD CQ wait object types.
FI_WAIT_UNSPEC will default to FI_WAIT_FD. However, FI_WAIT_NONE should achieve
the lowest latency and reduce interrupt overhead.
the lowest latency and reduce interrupt overhead. NOTE: A process may return
from an epoll_wait/poll when provider progress is required and a CQ event may
not be available.

## Additional Features

Expand Down
19 changes: 13 additions & 6 deletions prov/cxi/include/cxip.h
Original file line number Diff line number Diff line change
Expand Up @@ -1420,8 +1420,8 @@ struct cxip_cq {
*/
struct ofi_genlock ep_list_lock;

/* Internal CXI wait object allocated only if required. */
struct cxil_wait_obj *priv_wait;
/* CXI CQ wait object EPs are maintained in epoll FD */
int ep_fd;

/* CXI specific fields. */
struct cxip_domain *domain;
Expand Down Expand Up @@ -2428,6 +2428,10 @@ struct cxip_ep_obj {
struct cxip_txc *txc;
struct cxip_rxc *rxc;

/* Internal support for CQ wait object */
struct cxil_wait_obj *priv_wait;
int wait_fd;

/* ASIC version associated with EP/Domain */
enum cassini_version asic_ver;

Expand Down Expand Up @@ -3148,7 +3152,8 @@ static inline bool cxip_cmdq_match(struct cxip_cmdq *cmdq, uint16_t vni,
}

int cxip_evtq_init(struct cxip_evtq *evtq, struct cxip_cq *cq,
size_t num_events, size_t num_fc_events);
size_t num_events, size_t num_fc_events,
struct cxil_wait_obj *priv_wait);
void cxip_evtq_fini(struct cxip_evtq *eq);

int cxip_domain(struct fid_fabric *fabric, struct fi_info *info,
Expand Down Expand Up @@ -3228,16 +3233,19 @@ int cxip_cq_req_complete_addr(struct cxip_req *req, fi_addr_t src);
int cxip_cq_req_error(struct cxip_req *req, size_t olen,
int err, int prov_errno, void *err_data,
size_t err_data_size, fi_addr_t src_addr);
int cxip_cq_add_wait_fd(struct cxip_cq *cq, int wait_fd, int events);
void cxip_cq_del_wait_fd(struct cxip_cq *cq, int wait_fd);

int proverr2errno(int err);
struct cxip_req *cxip_evtq_req_alloc(struct cxip_evtq *evtq,
int remap, void *req_ctx);
void cxip_evtq_req_free(struct cxip_req *req);
void cxip_evtq_progress(struct cxip_evtq *evtq);

void cxip_ep_progress(struct fid *fid);
int cxip_ep_peek(struct fid *fid);
void cxip_ep_flush_trig_reqs(struct cxip_ep_obj *ep_obj);

int cxip_cq_trywait(struct cxip_cq *cq);
void cxip_cq_progress(struct cxip_cq *cq);
void cxip_util_cq_progress(struct util_cq *util_cq);
int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
Expand Down Expand Up @@ -3266,8 +3274,7 @@ void cxip_ep_tgt_ctrl_progress(struct cxip_ep_obj *ep_obj);
void cxip_ep_tgt_ctrl_progress_locked(struct cxip_ep_obj *ep_obj);
int cxip_ep_ctrl_init(struct cxip_ep_obj *ep_obj);
void cxip_ep_ctrl_fini(struct cxip_ep_obj *ep_obj);
void cxip_ep_ctrl_del_wait(struct cxip_ep_obj *ep_obj);
int cxip_ep_ctrl_trywait(void *arg);
int cxip_ep_trywait(struct cxip_ep_obj *ep_obj, struct cxip_cq *cq);

int cxip_av_set(struct fid_av *av, struct fi_av_set_attr *attr,
struct fid_av_set **av_set_fid, void * context);
Expand Down
223 changes: 151 additions & 72 deletions prov/cxi/src/cxip_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,37 +184,33 @@ static const char *cxip_cq_strerror(struct fid_cq *cq, int prov_errno,
return errmsg;
}

/*
* cxip_cq_trywait - Return success if able to block waiting for CQ events.
*/
static int cxip_cq_trywait(void *arg)
int cxip_cq_trywait(struct cxip_cq *cq)
{
struct cxip_cq *cq = (struct cxip_cq *)arg;
struct fid_list_entry *fid_entry;
struct dlist_entry *item;
struct cxip_ep *ep;

assert(cq->util_cq.wait);

if (!cq->priv_wait) {
if (cq->ep_fd < 0) {
CXIP_WARN("No CXI wait object\n");
return -FI_EINVAL;
}

ofi_genlock_lock(&cq->util_cq.cq_lock);
if (!ofi_cirque_isempty(cq->util_cq.cirq)) {
ofi_genlock_unlock(&cq->util_cq.cq_lock);
return -FI_EAGAIN;
}
ofi_genlock_unlock(&cq->util_cq.cq_lock);

ofi_genlock_lock(&cq->ep_list_lock);
dlist_foreach(&cq->util_cq.ep_list, item) {
fid_entry = container_of(item, struct fid_list_entry, entry);
if (cxip_ep_peek(fid_entry->fid)) {
ofi_genlock_unlock(&cq->ep_list_lock);
ep = container_of(fid_entry->fid, struct cxip_ep, ep.fid);

return -FI_EAGAIN;
}
}
if (!ep->ep_obj->priv_wait)
continue;

/* Clear wait, and check for any events */
cxil_clear_wait_obj(cq->priv_wait);
dlist_foreach(&cq->util_cq.ep_list, item) {
fid_entry = container_of(item, struct fid_list_entry, entry);
if (cxip_ep_peek(fid_entry->fid)) {
if (cxip_ep_trywait(ep->ep_obj, cq)) {
ofi_genlock_unlock(&cq->ep_list_lock);

return -FI_EAGAIN;
Expand Down Expand Up @@ -256,21 +252,12 @@ static int cxip_cq_close(struct fid *fid)
{
struct cxip_cq *cq = container_of(fid, struct cxip_cq,
util_cq.cq_fid.fid);
int ret;

if (ofi_atomic_get32(&cq->util_cq.ref))
return -FI_EBUSY;

if (cq->priv_wait) {
ret = ofi_wait_del_fd(cq->util_cq.wait,
cxil_get_wait_obj_fd(cq->priv_wait));
if (ret)
CXIP_WARN("Wait FD delete error: %d\n", ret);

ret = cxil_destroy_wait_obj(cq->priv_wait);
if (ret)
CXIP_WARN("Release CXI wait object failed: %d\n", ret);
}
if (cq->ep_fd >= 0)
close(cq->ep_fd);

ofi_cq_cleanup(&cq->util_cq);
ofi_genlock_destroy(&cq->ep_list_lock);
Expand All @@ -281,14 +268,116 @@ static int cxip_cq_close(struct fid *fid)
return 0;
}

/* cxip_cq_signal() - fi_cq_signal() is not supported by the CXI provider;
 * wakeup is done through the CQ's native epoll FD instead of a signalable
 * util wait object.
 */
static int cxip_cq_signal(struct fid_cq *cq_fid)
{
	return -FI_ENOSYS;
}

/* cxip_cq_control() - CQ fid control operations.
 *
 * FI_GETWAIT returns the CQ epoll FD, either directly (FI_WAIT_FD) or
 * packaged in a struct fi_wait_pollfd (FI_WAIT_POLLFD). FI_GETWAITOBJ
 * reports the configured wait object type. All other commands are
 * unsupported.
 */
static int cxip_cq_control(fid_t fid, int command, void *arg)
{
	struct cxip_cq *cq = container_of(fid, struct cxip_cq, util_cq.cq_fid);
	struct fi_wait_pollfd *pollfd;

	switch (command) {
	case FI_GETWAIT:
		/* No epoll FD means the CQ was opened without a wait object */
		if (cq->ep_fd < 0)
			return -FI_ENODATA;

		if (cq->attr.wait_obj == FI_WAIT_FD) {
			*(int *) arg = cq->ep_fd;
			return FI_SUCCESS;
		}

		/* FI_WAIT_POLLFD: caller supplies a pollfd array */
		pollfd = arg;
		if (pollfd->nfds < 1)
			return -FI_ETOOSMALL;

		pollfd->fd[0].fd = cq->ep_fd;
		pollfd->fd[0].events = POLLIN;
		pollfd->nfds = 1;

		return FI_SUCCESS;
	case FI_GETWAITOBJ:
		*(enum fi_wait_obj *) arg = cq->attr.wait_obj;
		return FI_SUCCESS;
	default:
		return -FI_ENOSYS;
	}
}

/* cxip_cq_sreadfrom() - Blocking (timed) CQ read with source address.
 *
 * Polls the CQ for completions; when none are ready, blocks on the CQ
 * epoll FD (which aggregates the EP wait FDs) until an interrupt fires
 * or the timeout expires. Requires the CQ to have a wait object
 * (attr.wait_obj != FI_WAIT_NONE).
 *
 * Returns the number of completions read, or -FI_EAGAIN if none became
 * available within the timeout.
 */
static ssize_t cxip_cq_sreadfrom(struct fid_cq *cq_fid, void *buf,
				 size_t count, fi_addr_t *src_addr,
				 const void *cond, int timeout)
{
	struct cxip_cq *cq = container_of(cq_fid, struct cxip_cq,
					  util_cq.cq_fid);
	struct epoll_event ev;
	uint64_t endtime;
	ssize_t ret;

	/* FI_WAIT_NONE (0): blocking reads are not possible */
	if (!cq->attr.wait_obj)
		return -FI_EINVAL;

	endtime = ofi_timeout_time(timeout);

	do {
		/* Non-blocking read also drives provider progress */
		ret = fi_cq_readfrom(cq_fid, buf, count, src_addr);
		if (ret != -FI_EAGAIN)
			break;

		/* Stop once the caller's timeout budget is exhausted */
		if (ofi_adjust_timeout(endtime, &timeout))
			return -FI_EAGAIN;

		/* -FI_EAGAIN from trywait means events may already be
		 * pending; skip sleeping and poll the CQ again.
		 */
		ret = cxip_cq_trywait(cq);
		if (ret == -FI_EAGAIN) {
			ret = 0;
			continue;
		}
		assert(ret == FI_SUCCESS);

		/* Sleep until an EP wait FD fires or the timeout lapses.
		 * ret > 0 (FD ready) maps to 0 so the loop re-polls;
		 * ret == 0 (epoll timeout) also re-polls and is then caught
		 * by ofi_adjust_timeout(); ret < 0 exits the loop.
		 */
		memset(&ev, 0, sizeof(ev));
		ret = epoll_wait(cq->ep_fd, &ev, 1, timeout);
		if (ret > 0)
			ret = 0;

	} while (!ret);

	/* Normalize timeout to -FI_EAGAIN per fi_cq_sread() semantics */
	return ret == -FI_ETIMEDOUT ? -FI_EAGAIN : ret;
}

/* cxip_cq_sread() - Blocking CQ read without source address reporting.
 * Thin wrapper delegating to cxip_cq_sreadfrom() with src_addr == NULL.
 */
static ssize_t cxip_cq_sread(struct fid_cq *cq_fid, void *buf, size_t count,
			     const void *cond, int timeout)
{
	return cxip_cq_sreadfrom(cq_fid, buf, count, NULL, cond, timeout);
}

/* CQ fid operations. control is provider-specific (cxip_cq_control) so
 * FI_GETWAIT can export the CQ epoll FD rather than a util wait object.
 *
 * NOTE: the scraped diff left both the old (.control = ofi_cq_control)
 * and new initializers in place; only the new one is kept, since a
 * duplicate designated initializer for .control is invalid/misleading.
 */
static struct fi_ops cxip_cq_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = cxip_cq_close,
	.bind = fi_no_bind,
	.control = cxip_cq_control,
	.ops_open = fi_no_ops_open,
};

/* CQ operations. Non-blocking reads use the common util_cq
 * implementations; blocking reads (sread/sreadfrom) are provider-specific
 * and built on the CQ epoll FD. signal is unsupported (-FI_ENOSYS).
 */
static struct fi_ops_cq cxip_cq_ops = {
	.size = sizeof(struct fi_ops_cq),
	.read = ofi_cq_read,
	.readfrom = ofi_cq_readfrom,
	.readerr = ofi_cq_readerr,
	.sread = cxip_cq_sread,
	.sreadfrom = cxip_cq_sreadfrom,
	.signal = cxip_cq_signal,
	.strerror = ofi_cq_strerror,
};

static struct fi_cq_attr cxip_cq_def_attr = {
.flags = 0,
.format = FI_CQ_FORMAT_CONTEXT,
Expand Down Expand Up @@ -348,50 +437,35 @@ static int cxip_cq_verify_attr(struct fi_cq_attr *attr)
return FI_SUCCESS;
}

/*
* cxip_cq_alloc_priv_wait - Allocate an internal wait channel for the CQ.
*/
static int cxip_cq_alloc_priv_wait(struct cxip_cq *cq)
/* EP adds wait FD to the CQ epoll FD */
int cxip_cq_add_wait_fd(struct cxip_cq *cq, int wait_fd, int events)
{
struct epoll_event ev = {
.events = events,
};
int ret;
int wait_fd;

assert(cq->domain);

/* Not required or already created */
if (!cq->util_cq.wait || cq->priv_wait)
return FI_SUCCESS;

ret = cxil_alloc_wait_obj(cq->domain->lni->lni, &cq->priv_wait);
if (ret) {
CXIP_WARN("Allocation of internal wait object failed %d\n",
ret);
return ret;
}

wait_fd = cxil_get_wait_obj_fd(cq->priv_wait);
ret = fi_fd_nonblock(wait_fd);
if (ret) {
CXIP_WARN("Unable to set CQ wait non-blocking mode: %d\n", ret);
goto destroy_wait;
}
ret = epoll_ctl(cq->ep_fd, EPOLL_CTL_ADD, wait_fd, &ev);
if (ret < 0) {
ret = errno;
CXIP_WARN("EP wait FD add to CQ failed %d\n", ret);

ret = ofi_wait_add_fd(cq->util_cq.wait, wait_fd, POLLIN,
cxip_cq_trywait, cq, &cq->util_cq.cq_fid.fid);
if (ret) {
CXIP_WARN("Add FD of internal wait object failed: %d\n", ret);
goto destroy_wait;
return -FI_EINVAL;
}

CXIP_DBG("Add CQ private wait object, CQ intr FD: %d\n", wait_fd);

return FI_SUCCESS;
}

destroy_wait:
cxil_destroy_wait_obj(cq->priv_wait);
cq->priv_wait = NULL;
/* cxip_cq_del_wait_fd() - Remove an EP wait FD from the CQ epoll FD.
 *
 * NOTE: the scraped diff left a stray "return ret;" (old-code residue)
 * before the epoll_ctl() call, which would make the delete unreachable;
 * it is removed here. Failure is logged but not propagated since this is
 * teardown-path cleanup.
 *
 * @cq: CQ whose epoll FD (cq->ep_fd) holds the registration.
 * @wait_fd: EP interrupt FD to delete.
 */
void cxip_cq_del_wait_fd(struct cxip_cq *cq, int wait_fd)
{
	int ret;

	ret = epoll_ctl(cq->ep_fd, EPOLL_CTL_DEL, wait_fd, NULL);
	if (ret < 0) {
		ret = errno;
		CXIP_WARN("EP wait FD delete from CQ failed %d\n", ret);
	}
}

/*
Expand All @@ -402,6 +476,7 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
{
struct cxip_domain *cxi_dom;
struct cxip_cq *cxi_cq;
struct fi_cq_attr temp_attr;
int ret;

if (!domain || !cq)
Expand All @@ -425,7 +500,10 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
cxi_cq->attr = *attr;
}

ret = ofi_cq_init(&cxip_prov, domain, &cxi_cq->attr, &cxi_cq->util_cq,
/* CXI does not use common code internal wait object */
temp_attr = cxi_cq->attr;
temp_attr.wait_obj = FI_WAIT_NONE;
ret = ofi_cq_init(&cxip_prov, domain, &temp_attr, &cxi_cq->util_cq,
cxip_util_cq_progress, context);
if (ret != FI_SUCCESS) {
CXIP_WARN("ofi_cq_init() failed: %d\n", ret);
Expand All @@ -434,9 +512,10 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,

cxi_cq->util_cq.cq_fid.ops->strerror = &cxip_cq_strerror;
cxi_cq->util_cq.cq_fid.fid.ops = &cxip_cq_fi_ops;

cxi_cq->util_cq.cq_fid.ops = &cxip_cq_ops;
cxi_cq->domain = cxi_dom;
cxi_cq->ack_batch_size = cxip_env.eq_ack_batch_size;
cxi_cq->ep_fd = -1;

/* Optimize locking when possible */
if (cxi_dom->util_domain.threading == FI_THREAD_DOMAIN ||
Expand All @@ -445,11 +524,11 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
else
ofi_genlock_init(&cxi_cq->ep_list_lock, OFI_LOCK_SPINLOCK);

if (cxi_cq->util_cq.wait) {
ret = cxip_cq_alloc_priv_wait(cxi_cq);
if (ret != FI_SUCCESS) {
CXIP_WARN("Unable to allocate CXI wait obj: %d\n",
ret);
if (cxi_cq->attr.wait_obj) {
cxi_cq->ep_fd = epoll_create1(0);
if (cxi_cq->ep_fd < 0) {
CXIP_WARN("Unable to open epoll FD: %s\n",
strerror(errno));
goto err_wait_alloc;
}
}
Expand Down
Loading
Loading