diff --git a/man/fi_cxi.7.md b/man/fi_cxi.7.md index c2cbffe2b52..0109d19e9ed 100644 --- a/man/fi_cxi.7.md +++ b/man/fi_cxi.7.md @@ -80,7 +80,9 @@ The CXI provider supports FI_THREAD_SAFE and FI_THREAD_DOMAIN threading models. The CXI provider supports FI_WAIT_FD and FI_WAIT_POLLFD CQ wait object types. FI_WAIT_UNSPEC will default to FI_WAIT_FD. However FI_WAIT_NONE should achieve -the lowest latency and reduce interrupt overhead. +the lowest latency and reduce interrupt overhead. NOTE: A process may return +from a epoll_wait/poll when provider progress is required and a CQ event may +not be available. ## Additional Features diff --git a/prov/cxi/include/cxip.h b/prov/cxi/include/cxip.h index 0a73fc90582..34a4a9d242c 100644 --- a/prov/cxi/include/cxip.h +++ b/prov/cxi/include/cxip.h @@ -1420,8 +1420,8 @@ struct cxip_cq { */ struct ofi_genlock ep_list_lock; - /* Internal CXI wait object allocated only if required. */ - struct cxil_wait_obj *priv_wait; + /* CXI CQ wait object EPs are maintained in epoll FD */ + int ep_fd; /* CXI specific fields. */ struct cxip_domain *domain; @@ -2428,6 +2428,10 @@ struct cxip_ep_obj { struct cxip_txc *txc; struct cxip_rxc *rxc; + /* Internal support for CQ wait object */ + struct cxil_wait_obj *priv_wait; + int wait_fd; + /* ASIC version associated with EP/Domain */ enum cassini_version asic_ver; @@ -3148,7 +3152,8 @@ static inline bool cxip_cmdq_match(struct cxip_cmdq *cmdq, uint16_t vni, } int cxip_evtq_init(struct cxip_evtq *evtq, struct cxip_cq *cq, - size_t num_events, size_t num_fc_events); + size_t num_events, size_t num_fc_events, + struct cxil_wait_obj *priv_wait); void cxip_evtq_fini(struct cxip_evtq *eq); int cxip_domain(struct fid_fabric *fabric, struct fi_info *info, @@ -3228,6 +3233,9 @@ int cxip_cq_req_complete_addr(struct cxip_req *req, fi_addr_t src); int cxip_cq_req_error(struct cxip_req *req, size_t olen, int err, int prov_errno, void *err_data, size_t err_data_size, fi_addr_t src_addr); +int cxip_cq_add_wait_fd(struct cxip_cq *cq, int wait_fd, int events); +void cxip_cq_del_wait_fd(struct cxip_cq *cq, int wait_fd); + int proverr2errno(int err); struct cxip_req *cxip_evtq_req_alloc(struct cxip_evtq *evtq, int remap, void *req_ctx); @@ -3235,9 +3243,9 @@ void cxip_evtq_req_free(struct cxip_req *req); void cxip_evtq_progress(struct cxip_evtq *evtq); void cxip_ep_progress(struct fid *fid); -int cxip_ep_peek(struct fid *fid); void cxip_ep_flush_trig_reqs(struct cxip_ep_obj *ep_obj); +int cxip_cq_trywait(struct cxip_cq *cq); void cxip_cq_progress(struct cxip_cq *cq); void cxip_util_cq_progress(struct util_cq *util_cq); int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, @@ -3266,8 +3274,7 @@ void cxip_ep_tgt_ctrl_progress(struct cxip_ep_obj *ep_obj); void cxip_ep_tgt_ctrl_progress_locked(struct cxip_ep_obj *ep_obj); int cxip_ep_ctrl_init(struct cxip_ep_obj *ep_obj); void cxip_ep_ctrl_fini(struct cxip_ep_obj *ep_obj); -void cxip_ep_ctrl_del_wait(struct cxip_ep_obj *ep_obj); -int cxip_ep_ctrl_trywait(void *arg); +int cxip_ep_trywait(struct cxip_ep_obj *ep_obj, struct cxip_cq *cq); int cxip_av_set(struct fid_av *av, struct fi_av_set_attr *attr, struct fid_av_set **av_set_fid, void * context); diff --git a/prov/cxi/src/cxip_cq.c b/prov/cxi/src/cxip_cq.c index 1c6504c90bc..f55eb27141f 100644 --- a/prov/cxi/src/cxip_cq.c +++ b/prov/cxi/src/cxip_cq.c @@ -184,37 +184,33 @@ static const char *cxip_cq_strerror(struct fid_cq *cq, int prov_errno, return errmsg; } -/* - * cxip_cq_trywait - Return success if able to block waiting for CQ events. - */ -static int cxip_cq_trywait(void *arg) +int cxip_cq_trywait(struct cxip_cq *cq) { - struct cxip_cq *cq = (struct cxip_cq *)arg; struct fid_list_entry *fid_entry; struct dlist_entry *item; + struct cxip_ep *ep; - assert(cq->util_cq.wait); - - if (!cq->priv_wait) { + if (cq->ep_fd < 0) { CXIP_WARN("No CXI wait object\n"); return -FI_EINVAL; } + ofi_genlock_lock(&cq->util_cq.cq_lock); + if (!ofi_cirque_isempty(cq->util_cq.cirq)) { + ofi_genlock_unlock(&cq->util_cq.cq_lock); + return -FI_EAGAIN; + } + ofi_genlock_unlock(&cq->util_cq.cq_lock); + ofi_genlock_lock(&cq->ep_list_lock); dlist_foreach(&cq->util_cq.ep_list, item) { fid_entry = container_of(item, struct fid_list_entry, entry); - if (cxip_ep_peek(fid_entry->fid)) { - ofi_genlock_unlock(&cq->ep_list_lock); + ep = container_of(fid_entry->fid, struct cxip_ep, ep.fid); - return -FI_EAGAIN; - } - } + if (!ep->ep_obj->priv_wait) + continue; - /* Clear wait, and check for any events */ - cxil_clear_wait_obj(cq->priv_wait); - dlist_foreach(&cq->util_cq.ep_list, item) { - fid_entry = container_of(item, struct fid_list_entry, entry); - if (cxip_ep_peek(fid_entry->fid)) { + if (cxip_ep_trywait(ep->ep_obj, cq)) { ofi_genlock_unlock(&cq->ep_list_lock); return -FI_EAGAIN; @@ -256,21 +252,12 @@ static int cxip_cq_close(struct fid *fid) { struct cxip_cq *cq = container_of(fid, struct cxip_cq, util_cq.cq_fid.fid); - int ret; if (ofi_atomic_get32(&cq->util_cq.ref)) return -FI_EBUSY; - if (cq->priv_wait) { - ret = ofi_wait_del_fd(cq->util_cq.wait, - cxil_get_wait_obj_fd(cq->priv_wait)); - if (ret) - CXIP_WARN("Wait FD delete error: %d\n", ret); - - ret = cxil_destroy_wait_obj(cq->priv_wait); - if (ret) - CXIP_WARN("Release CXI wait object failed: %d\n", ret); - } + if (cq->ep_fd >= 0) + close(cq->ep_fd); ofi_cq_cleanup(&cq->util_cq); ofi_genlock_destroy(&cq->ep_list_lock); @@ -281,14 +268,116 @@ static int cxip_cq_close(struct fid *fid) return 0; } +static int cxip_cq_signal(struct fid_cq *cq_fid) +{ + return -FI_ENOSYS; +} + +static int cxip_cq_control(fid_t fid, int command, void *arg) +{ + struct cxip_cq *cq = container_of(fid, struct cxip_cq, util_cq.cq_fid); + struct fi_wait_pollfd *pollfd; + int ret; + + switch (command) { + case FI_GETWAIT: + if (cq->ep_fd < 0) { + ret = -FI_ENODATA; + break; + } + if (cq->attr.wait_obj == FI_WAIT_FD) { + *(int *) arg = cq->ep_fd; + return FI_SUCCESS; + } + + pollfd = arg; + if (pollfd->nfds >= 1) { + pollfd->fd[0].fd = cq->ep_fd; + pollfd->fd[0].events = POLLIN; + pollfd->nfds = 1; + + ret = FI_SUCCESS; + } else { + ret = -FI_ETOOSMALL; + } + break; + case FI_GETWAITOBJ: + *(enum fi_wait_obj *) arg = cq->attr.wait_obj; + ret = FI_SUCCESS; + break; + default: + ret = -FI_ENOSYS; + break; + } + + return ret; +} + +static ssize_t cxip_cq_sreadfrom(struct fid_cq *cq_fid, void *buf, + size_t count, fi_addr_t *src_addr, + const void *cond, int timeout) +{ + struct cxip_cq *cq = container_of(cq_fid, struct cxip_cq, + util_cq.cq_fid); + struct epoll_event ev; + uint64_t endtime; + ssize_t ret; + + if (!cq->attr.wait_obj) + return -FI_EINVAL; + + endtime = ofi_timeout_time(timeout); + + do { + ret = fi_cq_readfrom(cq_fid, buf, count, src_addr); + if (ret != -FI_EAGAIN) + break; + + if (ofi_adjust_timeout(endtime, &timeout)) + return -FI_EAGAIN; + + ret = cxip_cq_trywait(cq); + if (ret == -FI_EAGAIN) { + ret = 0; + continue; + } + assert(ret == FI_SUCCESS); + + memset(&ev, 0, sizeof(ev)); + ret = epoll_wait(cq->ep_fd, &ev, 1, timeout); + if (ret > 0) + ret = 0; + + } while (!ret); + + return ret == -FI_ETIMEDOUT ? -FI_EAGAIN : ret; +} + +static ssize_t cxip_cq_sread(struct fid_cq *cq_fid, void *buf, size_t count, + const void *cond, int timeout) +{ + return cxip_cq_sreadfrom(cq_fid, buf, count, NULL, cond, timeout); +} + static struct fi_ops cxip_cq_fi_ops = { .size = sizeof(struct fi_ops), .close = cxip_cq_close, .bind = fi_no_bind, - .control = ofi_cq_control, + .control = cxip_cq_control, .ops_open = fi_no_ops_open, }; +static struct fi_ops_cq cxip_cq_ops = { + .size = sizeof(struct fi_ops_cq), + .read = ofi_cq_read, + .readfrom = ofi_cq_readfrom, + .readerr = ofi_cq_readerr, + .sread = cxip_cq_sread, + .sreadfrom = cxip_cq_sreadfrom, + .signal = cxip_cq_signal, + .strerror = ofi_cq_strerror, +}; + static struct fi_cq_attr cxip_cq_def_attr = { .flags = 0, .format = FI_CQ_FORMAT_CONTEXT, @@ -348,50 +437,35 @@ static int cxip_cq_verify_attr(struct fi_cq_attr *attr) return FI_SUCCESS; } -/* - * cxip_cq_alloc_priv_wait - Allocate an internal wait channel for the CQ. - */ -static int cxip_cq_alloc_priv_wait(struct cxip_cq *cq) +/* EP adds wait FD to the CQ epoll FD */ +int cxip_cq_add_wait_fd(struct cxip_cq *cq, int wait_fd, int events) { + struct epoll_event ev = { + .events = events, + }; int ret; - int wait_fd; - - assert(cq->domain); - - /* Not required or already created */ - if (!cq->util_cq.wait || cq->priv_wait) - return FI_SUCCESS; - - ret = cxil_alloc_wait_obj(cq->domain->lni->lni, &cq->priv_wait); - if (ret) { - CXIP_WARN("Allocation of internal wait object failed %d\n", - ret); - return ret; - } - wait_fd = cxil_get_wait_obj_fd(cq->priv_wait); - ret = fi_fd_nonblock(wait_fd); - if (ret) { - CXIP_WARN("Unable to set CQ wait non-blocking mode: %d\n", ret); - goto destroy_wait; - } + ret = epoll_ctl(cq->ep_fd, EPOLL_CTL_ADD, wait_fd, &ev); + if (ret < 0) { + ret = errno; + CXIP_WARN("EP wait FD add to CQ failed %d\n", ret); - ret = ofi_wait_add_fd(cq->util_cq.wait, wait_fd, POLLIN, - cxip_cq_trywait, cq, &cq->util_cq.cq_fid.fid); - if (ret) { - CXIP_WARN("Add FD of internal wait object failed: %d\n", ret); - goto destroy_wait; + return -FI_EINVAL; } - CXIP_DBG("Add CQ private wait object, CQ intr FD: %d\n", wait_fd); - return FI_SUCCESS; +} -destroy_wait: - cxil_destroy_wait_obj(cq->priv_wait); - cq->priv_wait = NULL; +/* EP deletes wait FD from the CQ epoll FD */ +void cxip_cq_del_wait_fd(struct cxip_cq *cq, int wait_fd) +{ + int ret; - return ret; + ret = epoll_ctl(cq->ep_fd, EPOLL_CTL_DEL, wait_fd, NULL); + if (ret < 0) { + ret = errno; + CXIP_WARN("EP wait FD delete from CQ failed %d\n", ret); + } } /* @@ -402,6 +476,7 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, { struct cxip_domain *cxi_dom; struct cxip_cq *cxi_cq; + struct fi_cq_attr temp_attr; int ret; if (!domain || !cq) @@ -425,7 +500,10 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, cxi_cq->attr = *attr; } - ret = ofi_cq_init(&cxip_prov, domain, &cxi_cq->attr, &cxi_cq->util_cq, + /* CXI does not use common code internal wait object */ + temp_attr = cxi_cq->attr; + temp_attr.wait_obj = FI_WAIT_NONE; + ret = ofi_cq_init(&cxip_prov, domain, &temp_attr, &cxi_cq->util_cq, cxip_util_cq_progress, context); if (ret != FI_SUCCESS) { CXIP_WARN("ofi_cq_init() failed: %d\n", ret); @@ -434,9 +512,10 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, cxi_cq->util_cq.cq_fid.ops->strerror = &cxip_cq_strerror; cxi_cq->util_cq.cq_fid.fid.ops = &cxip_cq_fi_ops; - + cxi_cq->util_cq.cq_fid.ops = &cxip_cq_ops; cxi_cq->domain = cxi_dom; cxi_cq->ack_batch_size = cxip_env.eq_ack_batch_size; + cxi_cq->ep_fd = -1; /* Optimize locking when possible */ if (cxi_dom->util_domain.threading == FI_THREAD_DOMAIN || @@ -445,11 +524,11 @@ int cxip_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, else ofi_genlock_init(&cxi_cq->ep_list_lock, OFI_LOCK_SPINLOCK); - if (cxi_cq->util_cq.wait) { - ret = cxip_cq_alloc_priv_wait(cxi_cq); - if (ret != FI_SUCCESS) { - CXIP_WARN("Unable to allocate CXI wait obj: %d\n", - ret); + if (cxi_cq->attr.wait_obj) { + cxi_cq->ep_fd = epoll_create1(0); + if (cxi_cq->ep_fd < 0) { + CXIP_WARN("Unable to open epoll FD: %s\n", + strerror(errno)); goto err_wait_alloc; } } diff --git a/prov/cxi/src/cxip_ctrl.c b/prov/cxi/src/cxip_ctrl.c index bb543b6409a..03b117b7ef4 100644 --- a/prov/cxi/src/cxip_ctrl.c +++ b/prov/cxi/src/cxip_ctrl.c @@ -406,36 +406,6 @@ void cxip_ep_tgt_ctrl_progress_locked(struct cxip_ep_obj *ep_obj) cxip_ep_ctrl_eq_progress(ep_obj, ep_obj->ctrl.tgt_evtq, false, true); } -/* - * cxip_ep_ctrl_trywait() - Return 0 if no events need to be progressed. - */ -int cxip_ep_ctrl_trywait(void *arg) -{ - struct cxip_ep_obj *ep_obj = (struct cxip_ep_obj *)arg; - - if (!ep_obj->ctrl.wait) { - CXIP_WARN("No CXI ep_obj wait object\n"); - return -FI_EINVAL; - } - - if (cxi_eq_peek_event(ep_obj->ctrl.tgt_evtq) || - cxi_eq_peek_event(ep_obj->ctrl.tx_evtq)) - return -FI_EAGAIN; - - ofi_genlock_lock(&ep_obj->lock); - cxil_clear_wait_obj(ep_obj->ctrl.wait); - - if (cxi_eq_peek_event(ep_obj->ctrl.tgt_evtq) || - cxi_eq_peek_event(ep_obj->ctrl.tx_evtq)) { - ofi_genlock_unlock(&ep_obj->lock); - - return -FI_EAGAIN; - } - ofi_genlock_unlock(&ep_obj->lock); - - return FI_SUCCESS; -} - static void cxip_eq_ctrl_eq_free(void *eq_buf, struct cxi_md *eq_md, struct cxi_eq *eq) { @@ -484,7 +454,7 @@ static int cxip_ep_ctrl_eq_alloc(struct cxip_ep_obj *ep_obj, size_t len, /* ep_obj->ctrl.wait will be NULL if not required */ ret = cxil_alloc_evtq(ep_obj->domain->lni->lni, *eq_md, &eq_attr, - ep_obj->ctrl.wait, NULL, eq); + ep_obj->priv_wait, NULL, eq); if (ret) goto err_free_eq_md; @@ -500,107 +470,6 @@ static int cxip_ep_ctrl_eq_alloc(struct cxip_ep_obj *ep_obj, size_t len, return ret; } -/* - * cxip_ep_wait_required() - return true if base EP wait object is required. - */ -static bool cxip_ctrl_wait_required(struct cxip_ep_obj *ep_obj) -{ - if (ep_obj->rxc->recv_cq && ep_obj->rxc->recv_cq->priv_wait) - return true; - - if (ep_obj->txc->send_cq && ep_obj->txc->send_cq->priv_wait) - return true; - - return false; -} - -/* - * cxip_ep_ctrl_del_wait() - Delete control FD object - */ -void cxip_ep_ctrl_del_wait(struct cxip_ep_obj *ep_obj) -{ - int wait_fd; - - wait_fd = cxil_get_wait_obj_fd(ep_obj->ctrl.wait); - - if (ep_obj->txc->send_cq) { - ofi_wait_del_fd(ep_obj->txc->send_cq->util_cq.wait, wait_fd); - CXIP_DBG("Deleted control HW EQ FD: %d from CQ: %p\n", - wait_fd, ep_obj->txc->send_cq); - } - - if (ep_obj->rxc->recv_cq && - ep_obj->rxc->recv_cq != ep_obj->txc->send_cq) { - ofi_wait_del_fd(ep_obj->rxc->recv_cq->util_cq.wait, wait_fd); - CXIP_DBG("Deleted control HW EQ FD: %d from CQ %p\n", - wait_fd, ep_obj->rxc->recv_cq); - } -} - -/* - * cxip_ep_ctrl_add_wait() - Add control FD to CQ object - */ -int cxip_ep_ctrl_add_wait(struct cxip_ep_obj *ep_obj) -{ - struct cxip_cq *cq; - int wait_fd; - int ret; - - ret = cxil_alloc_wait_obj(ep_obj->domain->lni->lni, - &ep_obj->ctrl.wait); - if (ret) { - CXIP_WARN("Control wait object allocation failed: %d\n", ret); - return -FI_ENOMEM; - } - - wait_fd = cxil_get_wait_obj_fd(ep_obj->ctrl.wait); - ret = fi_fd_nonblock(wait_fd); - if (ret) { - CXIP_WARN("Unable to set control wait non-blocking: %d, %s\n", - ret, fi_strerror(-ret)); - goto err; - } - - cq = ep_obj->txc->send_cq; - if (cq) { - ret = ofi_wait_add_fd(cq->util_cq.wait, wait_fd, - POLLIN, cxip_ep_ctrl_trywait, ep_obj, - &cq->util_cq.cq_fid.fid); - if (ret) { - CXIP_WARN("TX CQ add FD failed: %d, %s\n", - ret, fi_strerror(-ret)); - goto err; - } - } - - if (ep_obj->rxc->recv_cq && ep_obj->rxc->recv_cq != cq) { - cq = ep_obj->rxc->recv_cq; - - ret = ofi_wait_add_fd(cq->util_cq.wait, wait_fd, - POLLIN, cxip_ep_ctrl_trywait, ep_obj, - &cq->util_cq.cq_fid.fid); - if (ret) { - CXIP_WARN("RX CQ add FD failed: %d, %s\n", - ret, fi_strerror(-ret)); - goto err_add_fd; - } - } - - CXIP_DBG("Added control EQ private wait object, intr FD: %d\n", - wait_fd); - - return FI_SUCCESS; - -err_add_fd: - if (ep_obj->txc->send_cq) - ofi_wait_del_fd(ep_obj->txc->send_cq->util_cq.wait, wait_fd); -err: - cxil_destroy_wait_obj(ep_obj->ctrl.wait); - ep_obj->ctrl.wait = NULL; - - return ret; -} - /* * cxip_ep_ctrl_init() - Initialize endpoint control resources. * @@ -624,20 +493,6 @@ int cxip_ep_ctrl_init(struct cxip_ep_obj *ep_obj) if (ep_obj->domain->mr_match_events) pt_opts.en_event_match = 1; - /* If CQ(s) are using a wait object, then control event - * queues need to unblock CQ poll as well. CQ will add the - * associated FD to the CQ FD list. - */ - if (cxip_ctrl_wait_required(ep_obj)) { - ret = cxil_alloc_wait_obj(ep_obj->domain->lni->lni, - &ep_obj->ctrl.wait); - if (ret) { - CXIP_WARN("EP ctrl wait object alloc failed: %d\n", - ret); - return ret; - } - } - ret = cxip_ep_ctrl_eq_alloc(ep_obj, 4 * sc_page_size, &ep_obj->ctrl.tx_evtq_buf, &ep_obj->ctrl.tx_evtq_buf_md, diff --git a/prov/cxi/src/cxip_ep.c b/prov/cxi/src/cxip_ep.c index aebec245ef7..48333d02ae2 100644 --- a/prov/cxi/src/cxip_ep.c +++ b/prov/cxi/src/cxip_ep.c @@ -187,26 +187,6 @@ void cxip_ep_progress(struct fid *fid) } } -/* - * cxip_ep_peek() - Peek at EP event queues - * - * Return whether the associated EP event queues are empty. - */ -int cxip_ep_peek(struct fid *fid) -{ - struct cxip_ep *ep = container_of(fid, struct cxip_ep, ep.fid); - struct cxip_ep_obj *ep_obj = ep->ep_obj; - - if (ep_obj->txc->tx_evtq.eq && - cxi_eq_peek_event(ep_obj->txc->tx_evtq.eq)) - return -FI_EAGAIN; - if (ep_obj->rxc->rx_evtq.eq && - cxi_eq_peek_event(ep_obj->rxc->rx_evtq.eq)) - return -FI_EAGAIN; - - return FI_SUCCESS; -} - /* * fi_ep_get_unexpected_msgs() - Get unexpected message information, exposed * via domain open ops. @@ -491,6 +471,134 @@ ssize_t cxip_ep_cancel(fid_t fid, void *context) return ret; } +/* + * cxip_ep_destroy_priv_wait - Free an internal wait channel for the EP. + */ +static void cxip_ep_destroy_priv_wait(struct cxip_ep_obj *ep_obj) +{ + assert(ep_obj->priv_wait); + + if (ep_obj->txc->send_cq && ep_obj->txc->send_cq->attr.wait_obj) + cxip_cq_del_wait_fd(ep_obj->txc->send_cq, ep_obj->wait_fd); + + if (ep_obj->rxc->recv_cq && ep_obj->rxc->recv_cq->attr.wait_obj && + ep_obj->rxc->recv_cq != ep_obj->txc->send_cq) + cxip_cq_del_wait_fd(ep_obj->rxc->recv_cq, ep_obj->wait_fd); + + cxil_destroy_wait_obj(ep_obj->priv_wait); + + ep_obj->priv_wait = NULL; + ep_obj->wait_fd = -1; +} + +/* + * cxip_ep_alloc_priv_wait - Allocate an internal wait channel for the EP. + */ +static int cxip_ep_alloc_priv_wait(struct cxip_ep_obj *ep_obj) +{ + bool tx_cq_added = false; + int ret; + + assert(ep_obj->priv_wait == NULL); + + ret = cxil_alloc_wait_obj(ep_obj->domain->lni->lni, &ep_obj->priv_wait); + if (ret) { + CXIP_WARN("Alloc of EP internal wait object failed %d\n", + ret); + return ret; + } + + ep_obj->wait_fd = cxil_get_wait_obj_fd(ep_obj->priv_wait); + ret = fi_fd_nonblock(ep_obj->wait_fd); + if (ret) { + CXIP_WARN("Unable to set EP wait non-blocking mode: %d\n", ret); + goto destroy_wait; + } + + if (ep_obj->txc->send_cq && ep_obj->txc->send_cq->attr.wait_obj) { + ret = cxip_cq_add_wait_fd(ep_obj->txc->send_cq, ep_obj->wait_fd, + EPOLLPRI | POLLERR); + if (ret) + goto destroy_wait; + + tx_cq_added = true; + } + + if (ep_obj->rxc->recv_cq && ep_obj->rxc->recv_cq->attr.wait_obj && + ep_obj->rxc->recv_cq != ep_obj->txc->send_cq) { + ret = cxip_cq_add_wait_fd(ep_obj->rxc->recv_cq, ep_obj->wait_fd, + EPOLLPRI | POLLERR); + if (ret) { + if (tx_cq_added) + cxip_cq_del_wait_fd(ep_obj->txc->send_cq, + ep_obj->wait_fd); + goto destroy_wait; + } + } + + CXIP_DBG("Add EP private wait object, EP intr FD: %d\n", + ep_obj->wait_fd); + + return FI_SUCCESS; + +destroy_wait: + cxil_destroy_wait_obj(ep_obj->priv_wait); + ep_obj->priv_wait = NULL; + ep_obj->wait_fd = -1; + + return ret; +} + +/* + * cxip_ep_trywait() - Determine if hardware events are waiting to be processed + * for EP based on CQ. + */ +int cxip_ep_trywait(struct cxip_ep_obj *ep_obj, struct cxip_cq *cq) +{ + assert(ep_obj->priv_wait); + + ofi_genlock_lock(&ep_obj->lock); + cxil_clear_wait_obj(ep_obj->priv_wait); + + /* Enable any currently disabled EQ interrupts, if events are + * ready shortcut and return. + */ + if ((ep_obj->txc->send_cq == cq || + ep_obj->rxc->recv_cq == cq) && ep_obj->txc->tx_evtq.eq) { + cxi_eq_int_enable(ep_obj->txc->tx_evtq.eq); + ep_obj->txc->tx_evtq.unacked_events = 0; + + if (cxi_eq_peek_event(ep_obj->txc->tx_evtq.eq)) + goto ready; + } + + if (ep_obj->rxc->recv_cq == cq && ep_obj->rxc->rx_evtq.eq) { + cxi_eq_int_enable(ep_obj->rxc->rx_evtq.eq); + ep_obj->rxc->rx_evtq.unacked_events = 0; + + if (cxi_eq_peek_event(ep_obj->rxc->rx_evtq.eq)) + goto ready; + } + + /* Side band control messages can also require progress */ + cxi_eq_int_enable(ep_obj->ctrl.tx_evtq); + if (cxi_eq_peek_event(ep_obj->ctrl.tx_evtq)) + goto ready; + + cxi_eq_int_enable(ep_obj->ctrl.tgt_evtq); + if (cxi_eq_peek_event(ep_obj->ctrl.tgt_evtq)) + goto ready; + + ofi_genlock_unlock(&ep_obj->lock); + + return FI_SUCCESS; + +ready: + ofi_genlock_unlock(&ep_obj->lock); + + return -FI_EAGAIN; +} + /* * cxip_ep_enable() - Enable standard EP. */ @@ -504,10 +612,23 @@ static int cxip_ep_enable(struct fid_ep *fid_ep) if (ep_obj->enabled) goto unlock; + /* Allocate an EP internal wait object if a CQ is bound with a + * wait object specified. + */ + if ((ep_obj->txc->send_cq && ep_obj->txc->send_cq->attr.wait_obj) || + (ep_obj->rxc->recv_cq && ep_obj->rxc->recv_cq->attr.wait_obj)) { + ret = cxip_ep_alloc_priv_wait(ep_obj); + if (ret) { + CXIP_WARN("EP internal wait alloc failed %s\n", + fi_strerror(-ret)); + goto unlock; + } + } + if (!ep_obj->av) { CXIP_WARN("Endpoint must be bound to an AV\n"); ret = -FI_ENOAV; - goto unlock; + goto free_wait; } assert(ep_obj->domain->enabled); @@ -517,7 +638,7 @@ static int cxip_ep_enable(struct fid_ep *fid_ep) ret = cxip_av_auth_key_get_vnis(ep_obj->av, &ep_obj->vnis, &ep_obj->vni_count); if (ret) - goto unlock; + goto free_wait; ret = cxip_portals_table_alloc(ep_obj->domain->lni, ep_obj->vnis, ep_obj->vni_count, @@ -541,7 +662,7 @@ static int cxip_ep_enable(struct fid_ep *fid_ep) if (ret != FI_SUCCESS) { CXIP_WARN("Failed to allocate portals table: %d\n", ret); - goto unlock; + goto free_wait; } } @@ -625,6 +746,10 @@ static int cxip_ep_enable(struct fid_ep *fid_ep) ep_obj->vni_count); ep_obj->vnis = NULL; } +free_wait: + if (ep_obj->priv_wait) + cxip_ep_destroy_priv_wait(ep_obj); + unlock: ofi_genlock_unlock(&ep_obj->lock); @@ -688,6 +813,8 @@ int cxip_free_endpoint(struct cxip_ep *ep) cxip_txc_close(ep); cxip_rxc_close(ep); cxip_ep_disable(ep_obj); + if (ep_obj->priv_wait) + cxip_ep_destroy_priv_wait(ep_obj); ofi_genlock_unlock(&ep_obj->lock); ofi_atomic_dec32(&ep_obj->domain->ref); @@ -695,6 +822,7 @@ int cxip_free_endpoint(struct cxip_ep *ep) cxip_txc_free(ep_obj->txc); cxip_rxc_free(ep_obj->rxc); + free(ep_obj); ep->ep_obj = NULL; @@ -1277,6 +1405,7 @@ int cxip_alloc_endpoint(struct cxip_domain *cxip_dom, struct fi_info *hints, ep_obj->tgq_size = hints->rx_attr->size; ep_obj->tx_attr = *hints->tx_attr; ep_obj->rx_attr = *hints->rx_attr; + ep_obj->wait_fd = -1; ep_obj->asic_ver = cxip_dom->iface->info->cassini_version; diff --git a/prov/cxi/src/cxip_evtq.c b/prov/cxi/src/cxip_evtq.c index e4c90c31980..42384ca85a8 100644 --- a/prov/cxi/src/cxip_evtq.c +++ b/prov/cxi/src/cxip_evtq.c @@ -457,7 +457,8 @@ static size_t cxip_evtq_get_queue_size(struct cxip_cq *cq, size_t num_events) #define MAP_HUGE_2MB (21 << MAP_HUGE_SHIFT) int cxip_evtq_init(struct cxip_evtq *evtq, struct cxip_cq *cq, - size_t num_events, size_t num_fc_events) + size_t num_events, size_t num_fc_events, + struct cxil_wait_obj *priv_wait) { struct cxi_eq_attr eq_attr = { .reserved_slots = num_fc_events, @@ -561,7 +562,7 @@ int cxip_evtq_init(struct cxip_evtq *evtq, struct cxip_cq *cq, /* cq->priv_wait is NULL if not backed by wait object */ ret = cxil_alloc_evtq(cq->domain->lni->lni, evtq->md, &eq_attr, - cq->priv_wait, NULL, &evtq->eq); + priv_wait, NULL, &evtq->eq); if (ret) { CXIP_WARN("Failed to allocated EQ: %d\n", ret); goto err_unmap_eq_buf; diff --git a/prov/cxi/src/cxip_fabric.c b/prov/cxi/src/cxip_fabric.c index c8528cf829c..b9eede784a4 100644 --- a/prov/cxi/src/cxip_fabric.c +++ b/prov/cxi/src/cxip_fabric.c @@ -24,13 +24,41 @@ int cxip_eq_def_sz = CXIP_EQ_DEF_SZ; static int read_default_params; +static int cxip_trywait(struct fid_fabric *fabric, struct fid **fids, + int count) +{ + struct cxip_cq *cq; + int ret; + int i; + + for (i = 0; i < count; i++) { + switch (fids[i]->fclass) { + case FI_CLASS_CQ: + cq = container_of(fids[i], struct cxip_cq, + util_cq.cq_fid.fid); + ret = cxip_cq_trywait(cq); + if (ret) + return ret; + break; + case FI_CLASS_EQ: + case FI_CLASS_CNTR: + case FI_CLASS_WAIT: + return -FI_ENOSYS; + default: + return -FI_EINVAL; + } + } + + return FI_SUCCESS; +} + static struct fi_ops_fabric cxip_fab_ops = { .size = sizeof(struct fi_ops_fabric), .domain = cxip_domain, .passive_ep = fi_no_passive_ep, .eq_open = cxip_eq_open, - .wait_open = ofi_wait_fd_open, - .trywait = ofi_trywait, + .wait_open = fi_no_wait_open, + .trywait = cxip_trywait, }; static int cxip_fabric_close(fid_t fid) diff --git a/prov/cxi/src/cxip_rxc.c b/prov/cxi/src/cxip_rxc.c index cc3f3e9f91a..8051ccdcade 100644 --- a/prov/cxi/src/cxip_rxc.c +++ b/prov/cxi/src/cxip_rxc.c @@ -127,7 +127,8 @@ static int rxc_msg_init(struct cxip_rxc *rxc) /* Base message initialization */ num_events = cxip_rxc_get_num_events(rxc); - ret = cxip_evtq_init(&rxc->rx_evtq, rxc->recv_cq, num_events, 1); + ret = cxip_evtq_init(&rxc->rx_evtq, rxc->recv_cq, num_events, 1, + rxc->ep_obj->priv_wait); if (ret) { CXIP_WARN("Failed to initialize RXC event queue: %d, %s\n", ret, fi_strerror(-ret)); diff --git a/prov/cxi/src/cxip_txc.c b/prov/cxi/src/cxip_txc.c index fdbd64af604..49d19fd6b58 100644 --- a/prov/cxi/src/cxip_txc.c +++ b/prov/cxi/src/cxip_txc.c @@ -328,7 +328,8 @@ int cxip_txc_enable(struct cxip_txc *txc) num_events = cxip_txc_get_num_events(txc); - ret = cxip_evtq_init(&txc->tx_evtq, txc->send_cq, num_events, 0); + ret = cxip_evtq_init(&txc->tx_evtq, txc->send_cq, num_events, 0, + txc->ep_obj->priv_wait); if (ret) { CXIP_WARN("Failed to initialize TX event queue: %d, %s\n", ret, fi_strerror(-ret)); diff --git a/prov/cxi/test/cxip_test_common.c b/prov/cxi/test/cxip_test_common.c index b0fe3ccd622..fd3fec4b5c5 100644 --- a/prov/cxi/test/cxip_test_common.c +++ b/prov/cxi/test/cxip_test_common.c @@ -774,6 +774,7 @@ void cxit_setup_enabled_ep_fd(void) cxit_fi_hints->domain_attr->data_progress = FI_PROGRESS_MANUAL; cxit_fi_hints->domain_attr->data_progress = FI_PROGRESS_MANUAL; + cxit_fi_hints->domain_attr->threading = FI_THREAD_SAFE; cxit_setup_ep(); diff --git a/prov/cxi/test/tagged.c b/prov/cxi/test/tagged.c index e711767f308..b486a340f60 100644 --- a/prov/cxi/test/tagged.c +++ b/prov/cxi/test/tagged.c @@ -5475,7 +5475,262 @@ Test(tagged_src_err, addr) TestSuite(tagged_cq_wait, .init = cxit_setup_rma_fd, .fini = cxit_teardown_rma_fd, - .timeout = CXIT_DEFAULT_TIMEOUT); + .timeout = 20); + +Test(tagged_cq_wait, timeout_poll) +{ + struct fid *fids[1]; + int cq_fd; + int ret; + struct pollfd fds; + int timeout = 100; + uint64_t end_ms; + uint64_t start_ms; + + sleep(1); + + ret = fi_control(&cxit_rx_cq->fid, FI_GETWAIT, &cq_fd); + cr_assert_eq(ret, FI_SUCCESS, "Get RX CQ wait FD %d", ret); + + fids[0] = &cxit_rx_cq->fid; + ret = fi_trywait(cxit_fabric, fids, 1); + cr_assert_eq(ret, FI_SUCCESS, "Unexpected fi_trywait return %d\n", + ret); + + fds.fd = cq_fd; + fds.events = POLLIN; + start_ms = ofi_gettime_ms(); + ret = poll(&fds, 1, timeout); + cr_assert_eq(ret, 0, "Poll did not timed out, %d", ret); + end_ms = ofi_gettime_ms(); + cr_assert(end_ms >= start_ms + timeout, + "Timeout too short %ld ms asked for %d ms", + end_ms - start_ms, timeout); +} + +Test(tagged_cq_wait, timeout_epoll) +{ + struct epoll_event ev = { + .events = EPOLLIN, + .data.u32 = 0, + }; + int ret; + int epfd; + int waitfd; + struct fid *fids[1]; + int timeout = 100; + uint64_t end_ms; + uint64_t start_ms; + + sleep(1); + + epfd = epoll_create1(0); + cr_assert(epfd >= 0, "epoll_create1() failed %s\n", + strerror(errno)); + + ret = fi_control(&cxit_tx_cq->fid, FI_GETWAIT, &waitfd); + cr_assert(ret == FI_SUCCESS, "get FD for wait object failed %s\n", + strerror(errno)); + + ret = epoll_ctl(epfd, EPOLL_CTL_ADD, waitfd, &ev); + cr_assert(ret == 0, "epoll_ctl failed %s\n", strerror(errno)); + + fids[0] = &cxit_tx_cq->fid; + ret = fi_trywait(cxit_fabric, fids, 1); + cr_assert(ret == FI_SUCCESS, "fi_trywait failed %s\n", + fi_strerror(-ret)); + + /* Ensure timeout since events should not be outsanding */ + memset(&ev, 0, sizeof(ev)); + start_ms = ofi_gettime_ms(); + ret = epoll_wait(epfd, &ev, 1, timeout); + cr_assert(ret == 0, "epoll_wait did not timeout\n"); + end_ms = ofi_gettime_ms(); + cr_assert(end_ms >= start_ms + timeout, + "Timeout too short %ld ms asked for %d ms", + end_ms - start_ms, timeout); + + close(epfd); +} + +Test(tagged_cq_wait, timeout_sread) +{ + int ret; + int timeout = 100; + struct fi_cq_tagged_entry rx_cqe; + uint64_t end_ms; + uint64_t start_ms = ofi_gettime_ms(); + + /* No events should be available. Timeout returns -FI_EAGAIN. */ + ret = fi_cq_sread(cxit_rx_cq, &rx_cqe, 1, NULL, timeout); + cr_assert_eq(ret, -FI_EAGAIN, "Poll did not timed out, %s", + fi_strerror(ret)); + end_ms = ofi_gettime_ms(); + cr_assert(end_ms >= start_ms + timeout, + "Timeout too short %ld ms asked for %d ms", + end_ms - start_ms, timeout); +} + +struct simple_rx_wait { + bool epoll; + bool ux_msg; +}; + +static void *simple_rx_worker(void *data) +{ + struct simple_rx_wait *arg = (struct simple_rx_wait *) data; + struct fid *fids[1]; + int ret; + int recv_len = 64; + uint8_t *recv_buf; + struct fi_cq_tagged_entry rx_cqe; + fi_addr_t from; + int cq_fd; + struct epoll_event ev = { + .events = EPOLLIN, + .data.u32 = 0, + }; + int epfd; + struct pollfd fds; + int tries = 0; + + recv_buf = aligned_alloc(s_page_size, recv_len); + cr_assert(recv_buf); + memset(recv_buf, 0, recv_len); + + ret = fi_recv(cxit_ep, recv_buf, recv_len, NULL, + FI_ADDR_UNSPEC, NULL); + cr_assert_eq(ret, FI_SUCCESS, "fi_recv failed %d", ret); + + ret = fi_control(&cxit_rx_cq->fid, FI_GETWAIT, &cq_fd); + cr_assert_eq(ret, FI_SUCCESS, "Get CQ wait FD %d", cq_fd); + + fids[0] = &cxit_rx_cq->fid; + + /* We want to block waiting for the recv event */ + if (arg->epoll) { + epfd = epoll_create1(0); + cr_assert(epfd >= 0, "epoll_create1() failed %s", + strerror(errno)); + + ev.data.fd = cq_fd; + ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cq_fd, &ev); + cr_assert_eq(ret, 0, "epoll_ctl() failed %s", strerror(errno)); + } + + /* For UX message tests, trywait should return -FI_EAGAIN */ +cqe_not_ready: + ret = fi_trywait(cxit_fabric, fids, 1); + if (arg->ux_msg) { + cr_assert_eq(ret, -FI_EAGAIN, "UX event not ready, ret %s\n", + fi_strerror(-ret)); + do { + ret = fi_cq_readfrom(cxit_rx_cq, &rx_cqe, 1, &from); + } while (ret == -FI_EAGAIN); + cr_assert_eq(ret, 1, "UX message not received\n"); + goto done; + } + + /* No event should be pending, nothing sent yet */ + if (tries == 0) + cr_assert_eq(ret, FI_SUCCESS, "RX CQ event pending ret %d", ret); + + /* Wait for message */ + if (ret == FI_SUCCESS) { + if (arg->epoll) { + struct epoll_event evs[1] = {}; + + ret = epoll_wait(epfd, evs, 1, 5000); + } else { + fds.fd = cq_fd; + fds.events = POLLIN; + ret = poll(&fds, 1, 5000); + } + cr_assert(ret != 0, "RX poll timed out, ret %d\n", ret); + cr_assert(ret > 0, "Unexpected poll error %d\n", ret); + } + + /* We can get woken up for the send event, so -FI_EAGAIN + * is possible. Make sure no more than two wakeups occur. + */ + ret = fi_cq_readfrom(cxit_rx_cq, &rx_cqe, 1, &from); + if (ret == -FI_EAGAIN && ++tries < 2) + goto cqe_not_ready; + + cr_assert_eq(ret, 1, "fi_cq_read unexpected value %d", ret); + +done: + free(recv_buf); + pthread_exit(NULL); +} + +void simple_rx_wait(bool epoll, bool ux_msg) +{ + pthread_t rx_thread; + pthread_attr_t attr = {}; + int ret; + int i; + int send_len = 64; + uint8_t *send_buf; + struct fi_cq_tagged_entry tx_cqe; + struct simple_rx_wait arg = { + .epoll = epoll, + .ux_msg = ux_msg, + }; + + send_buf = aligned_alloc(s_page_size, send_len); + cr_assert(send_buf); + + for (i = 0; i < send_len; i++) + send_buf[i] = i + 0xa0; + + if (!arg.ux_msg) { + /* Start processing receives */ + ret = pthread_create(&rx_thread, &attr, simple_rx_worker, &arg); + cr_assert_eq(ret, 0, "Receive thread create failed %d", ret); + + /* Make sure receive is posted and thread is polling */ + sleep(1); + } + + /* Send 64 byte message to self */ + ret = fi_send(cxit_ep, send_buf, send_len, NULL, cxit_ep_fi_addr, NULL); + cr_assert_eq(ret, FI_SUCCESS, "fi_send failed %d", ret); + + if (arg.ux_msg) { + /* Start processing receives */ + ret = pthread_create(&rx_thread, &attr, simple_rx_worker, &arg); + cr_assert_eq(ret, 0, "Receive thread create failed %d", ret); + } + + ret = pthread_join(rx_thread, NULL); + + /* Wait for async event indicating data has been sent */ + ret = cxit_await_completion(cxit_tx_cq, &tx_cqe); + cr_assert_eq(ret, 1, "fi_cq_read unexpected value %d", ret); + + free(send_buf); +} + +Test(tagged_cq_wait, simple_rx_epoll) +{ + simple_rx_wait(true, false); +} + +Test(tagged_cq_wait, simple_rx_epoll_ux) +{ + simple_rx_wait(true, true); +} + +Test(tagged_cq_wait, simple_rx_poll) +{ + simple_rx_wait(false, false); +} + +Test(tagged_cq_wait, simple_rx_poll_ux) +{ + simple_rx_wait(false, true); +} struct fd_params { size_t length; @@ -5500,36 +5755,54 @@ static void *tagged_cq_wait_evt_worker(void *data) struct fid *fids[1]; int cq_fd; size_t completions = 0; + struct epoll_event ev = { + .events = EPOLLIN, + .data.u32 = 0, + }; + int epfd; args = (struct tagged_cq_wait_event_args *)data; if (args->poll) { + epfd = epoll_create1(0); + cr_assert(epfd >= 0, "epoll_create1() failed %s", + strerror(errno)); + ret = fi_control(&args->cq->fid, FI_GETWAIT, &cq_fd); cr_assert_eq(ret, FI_SUCCESS, "Get CQ wait FD %d", ret); - fids[0] = &args->cq->fid; + + ev.data.fd = cq_fd; + ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cq_fd, &ev); + cr_assert_eq(ret, 0, "epoll_ctl() failed %s", + strerror(errno)); } while (completions < args->io_num) { if (args->poll) { + fids[0] = &args->cq->fid; ret = fi_trywait(cxit_fabric, fids, 1); if (ret == FI_SUCCESS) { - struct pollfd fds; - - fds.fd = cq_fd; - fds.events = POLLIN; + struct epoll_event evs[1] = {}; - ret = poll(&fds, 1, args->timeout); - cr_assert_neq(ret, 0, "Poll timed out"); + ret = epoll_wait(epfd, evs, 1, args->timeout); + cr_assert_neq(ret, 0, "%s CQ poll timed out", + args->cq == cxit_tx_cq ? + "TX" : "RX"); cr_assert_eq(ret, 1, "Poll error"); } + ret = fi_cq_read(args->cq, &args->cqe[completions], 1); if (ret == 1) completions++; + + sched_yield(); } else { ret = fi_cq_sread(args->cq, &args->cqe[completions], 1, NULL, args->timeout); - cr_assert_eq(ret, 1, "Completion not received\n"); + cr_assert_eq(ret, 1, + "%s completion not received ret %d\n", + args->cq == cxit_tx_cq ? "TX" : "RX", ret); completions++; } } @@ -5577,7 +5850,7 @@ void do_cq_wait(struct fd_params *param) struct tagged_thread_args *rx_args; pthread_t tx_thread; pthread_t rx_thread; - pthread_attr_t attr; + pthread_attr_t attr = {}; struct tagged_cq_wait_event_args tx_evt_args = { .cq = cxit_tx_cq, .io_num = param->num_ios, @@ -5650,14 +5923,14 @@ void do_cq_wait(struct fd_params *param) /* Sends last for expected messaging */ if (!param->ux_msg) { - /* Make sure receive has blocked */ + /* Make RX process first */ sleep(1); - cq_wait_post_sends(tx_args, param); /* Start processing Send events */ ret = pthread_create(&tx_thread, &attr, tagged_cq_wait_evt_worker, (void *)&tx_evt_args); + cq_wait_post_sends(tx_args, param); } /* Wait for the RX/TX event threads to complete */ @@ -5689,11 +5962,13 @@ void do_cq_wait(struct fd_params *param) free(rx_args); } +/* Test multiple threads using poll or sread on both CQ */ ParameterizedTestParameters(tagged_cq_wait, wait_fd) { size_t param_sz; static struct fd_params params[] = { + /* Test direct FI_WAIT_FD polling */ {.length = 1024, .num_ios = 4, .timeout = 5000, @@ -5702,6 +5977,7 @@ ParameterizedTestParameters(tagged_cq_wait, wait_fd) .num_ios = 4, .timeout = 5000, .poll = true}, + /* Test indirect FI_WAIT_FD polling via fi_cq_sread */ {.length = 1024, .num_ios = 4, .timeout = 5000,