From c3d1cd4cca73debb87e96a84bae68fe79edb5dc4 Mon Sep 17 00:00:00 2001 From: Seth Zegelstein Date: Thu, 16 Jan 2025 01:28:41 +0000 Subject: [PATCH] prov/efa: Make DGRAM provider use new av_entry struct Update the DGRAM provider to use the new efa_base_av_entry structure. This change splits the AV logic for the DGRAM and the RDM providers. Splitting the av logic for DGRAM makes DGRAM's AV logic simpler and faster in a few ways. 1. This change removes the efa_con struct from the av_entry because the DGRAM protocol does not maintain an idea of a connection to the endpoint receiving the messages. It does not matter if the remote endpoint we were previously communicating with exits, and a new QP opens with the same AH (NIC), and QPN (Endpoint). As long as someone is there to post recv buffers for our messages, DGRAM provider is happy. 2. This changes gets rid of the current/prev av reverse lookup map (ahn/qpn -> av_entry), and creates a single reverse look up map because the dgram provider does not care about old connections. Signed-off-by: Seth Zegelstein --- prov/efa/src/efa_av.c | 365 ++++++++++++++++++++++++++++++- prov/efa/src/efa_av.h | 17 ++ prov/efa/src/efa_msg.c | 10 +- prov/efa/test/efa_unit_test_cq.c | 2 +- 4 files changed, 378 insertions(+), 16 deletions(-) diff --git a/prov/efa/src/efa_av.c b/prov/efa/src/efa_av.c index 9c574c54121..c0f7d31f57d 100644 --- a/prov/efa/src/efa_av.c +++ b/prov/efa/src/efa_av.c @@ -65,6 +65,29 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr) return efa_av_entry->conn.ep_addr ? &efa_av_entry->conn : NULL; } +/* + * @brief find efa_base_av_entry struct using fi_addr + * + * @param[in] av efa av + * @param[in] fi_addr index into av + * @return if address is valid, return pointer to efa_base_av_entry struct + * otherwise, return NULL + */ +struct efa_base_av_entry *efa_av_addr_to_base_av_entry(struct efa_av *av, fi_addr_t fi_addr) +{ + struct util_av_entry *util_av_entry; + + if (OFI_UNLIKELY(fi_addr == FI_ADDR_UNSPEC || fi_addr == FI_ADDR_NOTAVAIL)) + return NULL; + + assert(av->type == FI_AV_TABLE); + util_av_entry = ofi_bufpool_get_ibuf(av->util_av.av_entry_pool, fi_addr); + if (OFI_UNLIKELY(!util_av_entry)) + return NULL; + + return (struct efa_base_av_entry *) util_av_entry->data; +} + /** * @brief find fi_addr for efa endpoint * @@ -76,15 +99,15 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr) */ fi_addr_t efa_av_reverse_lookup(struct efa_av *av, uint16_t ahn, uint16_t qpn) { - struct efa_cur_reverse_av *cur_entry; - struct efa_cur_reverse_av_key cur_key; + struct efa_dgram_reverse_av *entry = NULL; + struct efa_cur_reverse_av_key key; - memset(&cur_key, 0, sizeof(cur_key)); - cur_key.ahn = ahn; - cur_key.qpn = qpn; - HASH_FIND(hh, av->cur_reverse_av, &cur_key, sizeof(cur_key), cur_entry); + memset(&entry, 0, sizeof(key)); + key.ahn = ahn; + key.qpn = qpn; + HASH_FIND(hh, av->dgram_reverse_av, &key, sizeof(key), entry); - return (OFI_LIKELY(!!cur_entry)) ? cur_entry->conn->fi_addr : FI_ADDR_NOTAVAIL; + return (OFI_LIKELY(!!entry)) ? entry->av_entry->fi_addr : FI_ADDR_NOTAVAIL; } /** @@ -570,6 +593,146 @@ void efa_conn_release(struct efa_av *av, struct efa_conn *conn) av->used--; } +/* + * @brief update reverse_av for dgram when inserting an new address into the AV + * + * @param[in,out] av efa AV + * @param[in] raw_addr raw address + * @param[in] av_entry efa_base_av_entry + * @return On success, return 0. + * Otherwise, return a negative libfabric error code + */ +static +int efa_dgram_av_update_reverse_av(struct efa_av *av, struct efa_ep_addr *raw_addr, + struct efa_base_av_entry *av_entry) +{ + struct efa_dgram_reverse_av *entry = NULL; + struct efa_cur_reverse_av_key key; + + memset(&key, 0, sizeof(key)); + key.ahn = av_entry->ah->ahn; + key.qpn = raw_addr->qpn; + entry = NULL; + + HASH_FIND(hh, av->dgram_reverse_av, &key, sizeof(key), entry); + if (!entry) { + entry = malloc(sizeof(*entry)); + if (!entry) { + EFA_WARN(FI_LOG_AV, "Cannot allocate memory for cur_reverse_av entry\n"); + return -FI_ENOMEM; + } + + entry->key.ahn = key.ahn; + entry->key.qpn = key.qpn; + entry->av_entry = av_entry; + HASH_ADD(hh, av->dgram_reverse_av, key, sizeof(key), entry); + return 0; + } + + EFA_WARN(FI_LOG_AV, "unexpected reverse av entry already exists."); + assert(0); + return -FI_EINVAL; +} + +/** + * @brief insert one address into address vector (AV) with base av_entry + * + * @param[in] av address vector + * @param[in] addr raw address, in the format of gid:qpn:qkey + * @param[out] fi_addr pointer to the output fi address. This address is used by fi_send + * @param[in] flags flags user passed to fi_av_insert. + * @param[in] context context user passed to fi_av_insert + * @return 0 on success, a negative error code on failure + */ +int efa_dgram_av_insert_one(struct efa_av *av, struct efa_ep_addr *addr, + fi_addr_t *fi_addr, uint64_t flags, void *context) +{ + struct util_av_entry *util_av_entry = NULL; + struct efa_base_av_entry *av_entry = NULL; + char raw_gid_str[INET6_ADDRSTRLEN]; + fi_addr_t tmp_fiaddr; + int ret = 0; + + if (av->ep_type == FI_EP_DGRAM) + addr->qkey = EFA_DGRAM_CONNID; + + ofi_mutex_lock(&av->util_av.lock); + memset(raw_gid_str, 0, sizeof(raw_gid_str)); + if (!inet_ntop(AF_INET6, addr->raw, raw_gid_str, INET6_ADDRSTRLEN)) { + EFA_WARN(FI_LOG_AV, "cannot convert address to string. errno: %d\n", errno); + ret = -FI_EFAULT; + *fi_addr = FI_ADDR_NOTAVAIL; + goto out; + } + + EFA_INFO(FI_LOG_AV, "Inserting address GID[%s] QP[%u] QKEY[%u] to AV ....\n", + raw_gid_str, addr->qpn, addr->qkey); + + /* + * Check if this address already has been inserted, if so set *fi_addr to existing address, + * and return 0 for success. + */ + tmp_fiaddr = ofi_av_lookup_fi_addr_unsafe(&av->util_av, addr); + if (tmp_fiaddr != FI_ADDR_NOTAVAIL) { + *fi_addr = tmp_fiaddr; + EFA_INFO(FI_LOG_AV, "Found existing AV entry pointing to this address! fi_addr: %ld\n", *fi_addr); + ret = 0; + goto out; + } + + if (flags & FI_SYNC_ERR) + memset(context, 0, sizeof(int)); + + if (!efa_av_is_valid_address(addr)) { + EFA_WARN(FI_LOG_AV, "Failed to insert bad addr\n"); + goto err_release; + } + + ret = ofi_av_insert_addr(&av->util_av, addr, &tmp_fiaddr); + if (ret) { + EFA_WARN(FI_LOG_AV, "ofi_av_insert_addr failed! Error message: %s\n", + fi_strerror(ret)); + goto err_release; + } + + util_av_entry = ofi_bufpool_get_ibuf(av->util_av.av_entry_pool, + tmp_fiaddr); + av_entry = (struct efa_base_av_entry *)util_av_entry->data; + assert(efa_is_same_addr(addr, &av_entry->ep_addr)); + + av_entry->fi_addr = tmp_fiaddr; + av_entry->ah = efa_ah_alloc(av, addr->raw); + if (!av_entry->ah) + goto err_release; + + ret = efa_dgram_av_update_reverse_av(av, addr, av_entry); + if (ret) + goto err_release; + + av->used++; + + *fi_addr = av_entry->fi_addr; + EFA_INFO(FI_LOG_AV, "Successfully inserted address GID[%s] QP[%u] QKEY[%u] to AV. fi_addr: %ld\n", + raw_gid_str, addr->qpn, addr->qkey, *fi_addr); + + goto out; + +err_release: + if (av_entry && av_entry->ah) + efa_ah_release(av, av_entry->ah); + + ret = ofi_av_remove_addr(&av->util_av, tmp_fiaddr); + if (ret) + EFA_WARN(FI_LOG_AV, "While processing previous failure, ofi_av_remove_addr failed! err=%d\n", + ret); + + ret = -FI_EINVAL; + *fi_addr = FI_ADDR_NOTAVAIL; +out: + ofi_mutex_unlock(&av->util_av.lock); + return ret; +} + /** * @brief insert one address into address vector (AV) * @@ -659,7 +822,11 @@ int efa_av_insert(struct fid_av *av_fid, const void *addr, for (i = 0; i < count; i++) { addr_i = (struct efa_ep_addr *) ((uint8_t *)addr + i * EFA_EP_ADDR_LEN); - ret = efa_av_insert_one(av, addr_i, &fi_addr_res, flags, context, true); + if (av->ep_type == FI_EP_RDM) + ret = efa_av_insert_one(av, addr_i, &fi_addr_res, flags, context, true); + else + ret = efa_dgram_av_insert_one(av, addr_i, &fi_addr_res, flags, context); + if (ret) { EFA_WARN(FI_LOG_AV, "insert raw_addr to av failed! ret=%d\n", ret); @@ -702,6 +869,31 @@ static int efa_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr, return 0; } +static int efa_dgram_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr, + void *addr, size_t *addrlen) +{ + struct util_av_entry *util_av_entry = NULL; + struct efa_base_av_entry *av_entry = NULL; + struct efa_av *av = container_of(av_fid, struct efa_av, util_av.av_fid); + + if (av->type != FI_AV_TABLE) + return -FI_EINVAL; + + if (fi_addr == FI_ADDR_NOTAVAIL) + return -FI_EINVAL; + + util_av_entry = ofi_bufpool_get_ibuf(av->util_av.av_entry_pool, fi_addr); + if (!util_av_entry) + return FI_EINVAL; + + av_entry = (struct efa_base_av_entry *) util_av_entry->data; + + memcpy(addr, (void *)&av_entry->ep_addr, MIN(EFA_EP_ADDR_LEN, *addrlen)); + if (*addrlen > EFA_EP_ADDR_LEN) + *addrlen = EFA_EP_ADDR_LEN; + return 0; +} + /* * @brief remove a set of addresses from AV and release its resources * @@ -762,6 +954,100 @@ static int efa_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr, return err; } +/** + * @brief release a base_av_entry object + * Caller of this function must obtain av->util_av.lock + * + * @param[in] av address vector + * @param[in] conn av_entry object pointer + */ +static +void efa_dgram_base_av_entry_release(struct efa_av *av, struct efa_base_av_entry *av_entry) +{ + struct efa_dgram_reverse_av *reverse_av_entry; + struct efa_cur_reverse_av_key key; + char gidstr[INET6_ADDRSTRLEN]; + int err; + + memset(&key, 0, sizeof(key)); + key.ahn = av_entry->ah->ahn; + key.qpn = av_entry->ep_addr.qpn; + HASH_FIND(hh, av->dgram_reverse_av, &key, sizeof(key), reverse_av_entry); + if (reverse_av_entry) { + HASH_DEL(av->dgram_reverse_av, reverse_av_entry); + free(reverse_av_entry); + } + + efa_ah_release(av, av_entry->ah); + + err = ofi_av_remove_addr(&av->util_av, av_entry->fi_addr); + if (err) { + EFA_WARN(FI_LOG_AV, "ofi_av_remove_addr failed! err=%d\n", err); + } + + inet_ntop(AF_INET6, av_entry->ep_addr.raw, gidstr, INET6_ADDRSTRLEN); + EFA_INFO(FI_LOG_AV, "efa_base_av_entry released! efa_base_av_entry[%p] GID[%s] QP[%u]\n", + av_entry, gidstr, av_entry->ep_addr.qpn); + + memset(&av_entry->ep_addr, 0, EFA_EP_ADDR_LEN); + + av->used--; +} + +/* + * @brief remove a set of addresses from AV and release its resources + * + * This function implements fi_av_remove() for EFA DGRAM provider. + * + * Note that even after an address was removed from AV, it is still + * possible to get TX and RX completion for the address. Per libfabric + * standard, these completions should be ignored. + * + * To help TX completion handler to identify such a TX completion, + * when removing an address, all its outstanding TX packet's addr + * was set to FI_ADDR_NOTAVAIL. The TX completion handler will + * ignore TX packet whose address is FI_ADDR_NOTAVAIL. + * + * Meanwhile, lower provider will set a packet's address to + * FI_ADDR_NOTAVAIL from it is from a removed address. RX completion + * handler will ignore such packets. + * + * @param[in] av_fid fid of AV (address vector) + * @param[in] fi_addr pointer to an array of libfabric addresses + * @param[in] counter number of libfabric addresses in the array + * @param[in] flags flags + * @return 0 if all addresses have been removed successfully, + * negative libfabric error code if error was encountered. + */ +static int efa_dgram_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr, + size_t count, uint64_t flags) +{ + size_t i; + struct efa_av *av; + struct efa_base_av_entry *av_entry = NULL; + struct util_av_entry *util_av_entry = NULL; + + if (!fi_addr) + return -FI_EINVAL; + + av = container_of(av_fid, struct efa_av, util_av.av_fid); + if (av->type != FI_AV_TABLE) + return -FI_EINVAL; + + ofi_mutex_lock(&av->util_av.lock); + for (i = 0; i < count; i++) { + util_av_entry = ofi_bufpool_get_ibuf(av->util_av.av_entry_pool, fi_addr[i]); + if (!util_av_entry) + return -FI_EINVAL; + + av_entry = (struct efa_base_av_entry *) util_av_entry->data; + efa_dgram_base_av_entry_release(av, av_entry); + } + + ofi_mutex_unlock(&av->util_av.lock); + return 0; +} + static const char *efa_av_straddr(struct fid_av *av_fid, const void *addr, char *buf, size_t *len) { @@ -778,6 +1064,16 @@ static struct fi_ops_av efa_av_ops = { .straddr = efa_av_straddr }; +static struct fi_ops_av efa_dgram_av_ops = { + .size = sizeof(struct fi_ops_av), + .insert = efa_av_insert, + .insertsvc = fi_no_av_insertsvc, + .insertsym = fi_no_av_insertsym, + .remove = efa_dgram_av_remove, + .lookup = efa_dgram_av_lookup, + .straddr = efa_av_straddr +}; + static void efa_av_close_reverse_av(struct efa_av *av) { struct efa_cur_reverse_av *cur_entry, *curtmp; @@ -824,6 +1120,36 @@ static int efa_av_close(struct fid *fid) return err; } +static void efa_dgram_av_close_reverse_av(struct efa_av *av) +{ + struct efa_dgram_reverse_av *entry = NULL, *tmp_entry = NULL; + + ofi_mutex_lock(&av->util_av.lock); + + HASH_ITER(hh, av->dgram_reverse_av, entry, tmp_entry) { + efa_dgram_base_av_entry_release(av, entry->av_entry); + } + + ofi_mutex_unlock(&av->util_av.lock); +} + +static int efa_dgram_av_close(struct fid *fid) +{ + struct efa_av *av; + int err = 0; + + av = container_of(fid, struct efa_av, util_av.av_fid.fid); + + efa_dgram_av_close_reverse_av(av); + + err = ofi_av_close(&av->util_av); + if (OFI_UNLIKELY(err)) + EFA_WARN(FI_LOG_AV, "Failed to close av: %s\n", fi_strerror(err)); + + free(av); + return err; +} + static struct fi_ops efa_av_fi_ops = { .size = sizeof(struct fi_ops), .close = efa_av_close, @@ -832,6 +1158,14 @@ static struct fi_ops efa_av_fi_ops = { .ops_open = fi_no_ops_open, }; +static struct fi_ops efa_dgram_av_fi_ops = { + .size = sizeof(struct fi_ops), + .close = efa_dgram_av_close, + .bind = fi_no_bind, + .control = fi_no_control, + .ops_open = fi_no_ops_open, +}; + /** * @brief initialize the util_av field in efa_av * @@ -854,8 +1188,12 @@ int efa_av_init_util_av(struct efa_domain *efa_domain, &universe_size) == FI_SUCCESS) attr->count = MAX(attr->count, universe_size); + if (EFA_EP_TYPE_IS_RDM(efa_domain->info)) + util_attr.context_len = sizeof(struct efa_av_entry) - EFA_EP_ADDR_LEN; + else + util_attr.context_len = sizeof(struct efa_base_av_entry) - EFA_EP_ADDR_LEN; + util_attr.addrlen = EFA_EP_ADDR_LEN; - util_attr.context_len = sizeof(struct efa_av_entry) - EFA_EP_ADDR_LEN; util_attr.flags = 0; return ofi_av_init(&efa_domain->util_domain, attr, &util_attr, util_av, context); @@ -946,6 +1284,14 @@ int efa_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, (*av_fid)->fid.ops = &efa_av_fi_ops; (*av_fid)->ops = &efa_av_ops; + if (av->ep_type == FI_EP_RDM) { + (*av_fid)->fid.ops = &efa_av_fi_ops; + (*av_fid)->ops = &efa_av_ops; + } else { + (*av_fid)->fid.ops = &efa_dgram_av_fi_ops; + (*av_fid)->ops = &efa_dgram_av_ops; + } + return 0; err_close_util_av: @@ -957,4 +1303,3 @@ int efa_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, free(av); return ret; } - diff --git a/prov/efa/src/efa_av.h b/prov/efa/src/efa_av.h index bd4d4a2d74e..7df772c7169 100644 --- a/prov/efa/src/efa_av.h +++ b/prov/efa/src/efa_av.h @@ -7,6 +7,7 @@ #include #include "rdm/efa_rdm_protocol.h" #include "rdm/efa_rdm_peer.h" +#include "efa_base_ep.h" #define EFA_MIN_AV_SIZE (16384) #define EFA_SHM_MAX_AV_COUNT (256) @@ -31,6 +32,12 @@ struct efa_av_entry { struct efa_conn conn; }; +struct efa_base_av_entry { + struct efa_ep_addr ep_addr; + struct efa_ah *ah; + fi_addr_t fi_addr; +}; + struct efa_cur_reverse_av_key { uint16_t ahn; uint16_t qpn; @@ -42,6 +49,12 @@ struct efa_cur_reverse_av { UT_hash_handle hh; }; +struct efa_dgram_reverse_av { + struct efa_cur_reverse_av_key key; + struct efa_base_av_entry *av_entry; + UT_hash_handle hh; +}; + struct efa_prv_reverse_av_key { uint16_t ahn; uint16_t qpn; @@ -67,6 +80,7 @@ struct efa_av { */ struct efa_cur_reverse_av *cur_reverse_av; struct efa_prv_reverse_av *prv_reverse_av; + struct efa_dgram_reverse_av *dgram_reverse_av; struct efa_ah *ah_map; struct util_av util_av; enum fi_ep_type ep_type; @@ -81,6 +95,9 @@ int efa_av_insert_one(struct efa_av *av, struct efa_ep_addr *addr, struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr); + +struct efa_base_av_entry *efa_av_addr_to_base_av_entry(struct efa_av *av, fi_addr_t fi_addr); + fi_addr_t efa_av_reverse_lookup_rdm(struct efa_av *av, uint16_t ahn, uint16_t qpn, struct efa_rdm_pke *pkt_entry); fi_addr_t efa_av_reverse_lookup(struct efa_av *av, uint16_t ahn, uint16_t qpn); diff --git a/prov/efa/src/efa_msg.c b/prov/efa/src/efa_msg.c index c2af757e112..96baa19bd2e 100644 --- a/prov/efa/src/efa_msg.c +++ b/prov/efa/src/efa_msg.c @@ -194,7 +194,7 @@ static ssize_t efa_ep_recvv(struct fid_ep *ep_fid, const struct iovec *iov, void static inline ssize_t efa_post_send(struct efa_base_ep *base_ep, const struct fi_msg *msg, uint64_t flags) { struct efa_qp *qp = base_ep->qp; - struct efa_conn *conn; + struct efa_base_av_entry *av_entry; struct ibv_sge sg_list[2]; /* efa device support up to 2 iov */ struct ibv_data_buf inline_data_list[2]; size_t len, i; @@ -204,8 +204,8 @@ static inline ssize_t efa_post_send(struct efa_base_ep *base_ep, const struct fi dump_msg(msg, "send"); - conn = efa_av_addr_to_conn(base_ep->av, msg->addr); - assert(conn && conn->ep_addr); + av_entry = efa_av_addr_to_base_av_entry(base_ep->av, msg->addr); + assert(av_entry); assert(msg->iov_count <= base_ep->info->tx_attr->iov_limit); @@ -262,8 +262,8 @@ static inline ssize_t efa_post_send(struct efa_base_ep *base_ep, const struct fi ibv_wr_set_sge_list(qp->ibv_qp_ex, msg->iov_count, sg_list); } - ibv_wr_set_ud_addr(qp->ibv_qp_ex, conn->ah->ibv_ah, conn->ep_addr->qpn, - conn->ep_addr->qkey); + ibv_wr_set_ud_addr(qp->ibv_qp_ex, av_entry->ah->ibv_ah, av_entry->ep_addr.qpn, + av_entry->ep_addr.qkey); efa_tracepoint(post_send, qp->ibv_qp_ex->wr_id, (uintptr_t)msg->context); diff --git a/prov/efa/test/efa_unit_test_cq.c b/prov/efa/test/efa_unit_test_cq.c index e69fb8b432e..152e86ad2eb 100644 --- a/prov/efa/test/efa_unit_test_cq.c +++ b/prov/efa/test/efa_unit_test_cq.c @@ -874,7 +874,7 @@ static void test_efa_cq_read(struct efa_resource *resource, fi_addr_t *addr, ibv_cqx->read_src_qp = &efa_mock_ibv_read_src_qp_return_mock; ibv_cqx->read_wc_flags = &efa_mock_ibv_read_wc_flags_return_mock; will_return_maybe(efa_mock_ibv_read_byte_len_return_mock, 4096); - will_return_maybe(efa_mock_ibv_read_slid_return_mock, efa_av_addr_to_conn(base_ep->av, *addr)->ah->ahn); + will_return_maybe(efa_mock_ibv_read_slid_return_mock, efa_av_addr_to_base_av_entry(base_ep->av, *addr)->ah->ahn); will_return_maybe(efa_mock_ibv_read_src_qp_return_mock, raw_addr.qpn); will_return_maybe(efa_mock_ibv_read_wc_flags_return_mock, 0); #endif