Skip to content

Commit

Permalink
[v1.21.x] prov/efa: Add tracepoints for rx pkt processing events.
Browse files Browse the repository at this point in the history
Added tracepoints to record the following events:

    - rx_pke_proc_matched_msg_begin(end): processing a rx pkt entry
    that matches a received msg.
    - rx_pkt_copy_payload_begin(end): copying the data from rx pkt
    entry to the ope (application buffer).

Signed-off-by: Shi Jin <[email protected]>
(cherry picked from commit 3254970)
  • Loading branch information
shijin-aws committed Jul 22, 2024
1 parent f7d2c2a commit 50f3952
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 2 deletions.
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,7 @@ int efa_rdm_rxe_post_local_read_or_queue(struct efa_rdm_ope *rxe,
struct fi_msg_rma msg_rma;
struct efa_rdm_ope *txe;

efa_rdm_tracepoint(rx_pke_local_read_copy_payload_begin, (size_t) pkt_entry, pkt_entry->payload_size, rxe->msg_id, (size_t) rxe->cq_entry.op_context, rxe->total_len);
/* setup rma_iov, which is pointing to buffer in the packet entry */
rma_iov.addr = (uint64_t)pkt_data;
rma_iov.len = data_size;
Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_pke.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "efa_rdm_pke_rtm.h"
#include "efa_rdm_pke_nonreq.h"
#include "efa_rdm_pke_req.h"
#include "efa_rdm_tracepoint.h"

/**
* @brief allocate a packet entry
Expand Down
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "efa_rdm_pke_utils.h"
#include "efa_rdm_pke_nonreq.h"
#include "efa_rdm_pke_req.h"
#include "efa_rdm_tracepoint.h"

/* Handshake wait timeout in microseconds */
#define EFA_RDM_HANDSHAKE_WAIT_TIMEOUT 1000000
Expand Down Expand Up @@ -328,6 +329,7 @@ void efa_rdm_pke_handle_data_copied(struct efa_rdm_pke *pkt_entry)
assert(ep);

ope->bytes_copied += pkt_entry->payload_size;
efa_rdm_tracepoint(rx_pke_proc_matched_msg_end, (size_t) pkt_entry, pkt_entry->payload_size, ope->msg_id, (size_t) ope->cq_entry.op_context, ope->total_len);
efa_rdm_pke_release_rx(pkt_entry);

if (ope->total_len == ope->bytes_copied) {
Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_pke_nonreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ void efa_rdm_pke_handle_rma_read_completion(struct efa_rdm_pke *context_pkt_entr
assert(txe->ep->efa_rx_pkts_held > 0);
txe->ep->efa_rx_pkts_held--;
}
efa_rdm_tracepoint(rx_pke_local_read_copy_payload_end, (size_t) data_pkt_entry, data_pkt_entry->payload_size, data_pkt_entry->ope->msg_id, (size_t) data_pkt_entry->ope->cq_entry.op_context, data_pkt_entry->ope->total_len);
efa_rdm_pke_handle_data_copied(data_pkt_entry);
} else {
assert(txe && txe->cq_entry.flags & FI_READ);
Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_pke_rtm.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ ssize_t efa_rdm_pke_proc_matched_rtm(struct efa_rdm_pke *pkt_entry)
rxe = pkt_entry->ope;
assert(rxe && rxe->state == EFA_RDM_RXE_MATCHED);

efa_rdm_tracepoint(rx_pke_proc_matched_msg_begin, (size_t) pkt_entry, pkt_entry->payload_size, rxe->msg_id, (size_t) rxe->cq_entry.op_context, rxe->total_len);
if (!rxe->peer) {
rxe->addr = pkt_entry->addr;
rxe->peer = efa_rdm_ep_get_peer(ep, rxe->addr);
Expand Down
7 changes: 7 additions & 0 deletions prov/efa/src/rdm/efa_rdm_pke_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "efa_rdm_pkt_type.h"
#include "efa_rdm_protocol.h"
#include "efa_rdm_pke_req.h"
#include "efa_rdm_tracepoint.h"

/**
* @brief initialize the payload, payload_size, payload_mr and pkt_size of an outgoing packet
Expand Down Expand Up @@ -150,6 +151,7 @@ int efa_rdm_ep_flush_queued_blocking_copy_to_hmem(struct efa_rdm_ep *ep)
desc = rxe->desc[0];
assert(desc && desc->peer.iface != FI_HMEM_SYSTEM);

efa_rdm_tracepoint(rx_pke_blocking_copy_payload_begin, (size_t) pkt_entry, pkt_entry->payload_size, rxe->msg_id, (size_t) rxe->cq_entry.op_context, rxe->total_len);
if (desc->peer.flags & OFI_HMEM_DATA_DEV_REG_HANDLE) {
assert(desc->peer.hmem_data);
bytes_copied[i] = ofi_dev_reg_copy_to_hmem_iov(
Expand All @@ -165,6 +167,7 @@ int efa_rdm_ep_flush_queued_blocking_copy_to_hmem(struct efa_rdm_ep *ep)
segment_offset + ep->msg_prefix_size,
data, pkt_entry->payload_size);
}
efa_rdm_tracepoint(rx_pke_blocking_copy_payload_end, (size_t) pkt_entry, pkt_entry->payload_size, rxe->msg_id, (size_t) rxe->cq_entry.op_context, rxe->total_len);
}

for (i = 0; i < ep->queued_copy_num; ++i) {
Expand Down Expand Up @@ -332,10 +335,12 @@ int efa_rdm_pke_copy_payload_to_cuda(struct efa_rdm_pke *pke,
*/
if (rxe->bytes_copied + pke->payload_size == rxe->total_len) {
assert(desc->peer.hmem_data);
efa_rdm_tracepoint(rx_pke_blocking_copy_payload_begin, (size_t) pke, pke->payload_size, rxe->msg_id, (size_t) rxe->cq_entry.op_context, rxe->total_len);
ofi_dev_reg_copy_to_hmem_iov(FI_HMEM_CUDA, (uint64_t)desc->peer.hmem_data,
rxe->iov, rxe->iov_count,
segment_offset + ep->msg_prefix_size,
pke->payload, pke->payload_size);
efa_rdm_tracepoint(rx_pke_blocking_copy_payload_end, (size_t) pke, pke->payload_size, rxe->msg_id, (size_t) rxe->cq_entry.op_context, rxe->total_len);
efa_rdm_pke_handle_data_copied(pke);
return 0;
}
Expand Down Expand Up @@ -443,9 +448,11 @@ ssize_t efa_rdm_pke_copy_payload_to_ope(struct efa_rdm_pke *pke,
return efa_rdm_pke_queued_copy_payload_to_hmem(pke, ope);

assert( !desc || desc->peer.iface == FI_HMEM_SYSTEM);
efa_rdm_tracepoint(rx_pke_blocking_copy_payload_begin, (size_t) pke, pke->payload_size, ope->msg_id, (size_t) ope->cq_entry.op_context, ope->total_len);
bytes_copied = ofi_copy_to_iov(ope->iov, ope->iov_count,
segment_offset + ep->msg_prefix_size,
pke->payload, pke->payload_size);
efa_rdm_tracepoint(rx_pke_blocking_copy_payload_end, (size_t) pke, pke->payload_size, ope->msg_id, (size_t) ope->cq_entry.op_context, ope->total_len);

if (bytes_copied != MIN(pke->payload_size, ope->cq_entry.len - segment_offset)) {
EFA_WARN(FI_LOG_CQ, "wrong size! bytes_copied: %ld\n",
Expand Down
53 changes: 51 additions & 2 deletions prov/efa/src/rdm/efa_rdm_tracepoint_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,61 @@ LTTNG_UST_TRACEPOINT_EVENT(EFA_RDM_TP_PROV,
)
LTTNG_UST_TRACEPOINT_LOGLEVEL(EFA_RDM_TP_PROV, read_completed, LTTNG_UST_TRACEPOINT_LOGLEVEL_INFO)

#define PKE_ARGS \
size_t, wr_id

#define PKE_FIELDS \
lttng_ust_field_integer_hex(size_t, wr_id, wr_id)

LTTNG_UST_TRACEPOINT_EVENT(EFA_RDM_TP_PROV,
poll_cq,
LTTNG_UST_TP_ARGS(size_t, wr_id),
LTTNG_UST_TP_FIELDS(lttng_ust_field_integer_hex(size_t, wr_id, wr_id)))
LTTNG_UST_TP_ARGS(PKE_ARGS),
LTTNG_UST_TP_FIELDS(PKE_FIELDS))
LTTNG_UST_TRACEPOINT_LOGLEVEL(EFA_RDM_TP_PROV, poll_cq, LTTNG_UST_TRACEPOINT_LOGLEVEL_INFO)

#define PKE_OPE_ARGS \
PKE_ARGS , \
int, size, \
X_ENTRY_ARGS

#define PKE_OPE_FIELDS \
PKE_FIELDS \
lttng_ust_field_integer(int, size, size) \
X_ENTRY_FIELDS

LTTNG_UST_TRACEPOINT_EVENT_CLASS(EFA_RDM_TP_PROV, pke_ope,
LTTNG_UST_TP_ARGS(PKE_OPE_ARGS),
LTTNG_UST_TP_FIELDS(PKE_OPE_FIELDS))

LTTNG_UST_TRACEPOINT_EVENT_INSTANCE(EFA_RDM_TP_PROV, pke_ope, EFA_RDM_TP_PROV,
rx_pke_proc_matched_msg_begin,
LTTNG_UST_TP_ARGS(PKE_OPE_ARGS))
LTTNG_UST_TRACEPOINT_LOGLEVEL(EFA_RDM_TP_PROV, rx_pke_proc_matched_msg_begin, LTTNG_UST_TRACEPOINT_LOGLEVEL_INFO)

LTTNG_UST_TRACEPOINT_EVENT_INSTANCE(EFA_RDM_TP_PROV, pke_ope, EFA_RDM_TP_PROV,
rx_pke_proc_matched_msg_end,
LTTNG_UST_TP_ARGS(PKE_OPE_ARGS))
LTTNG_UST_TRACEPOINT_LOGLEVEL(EFA_RDM_TP_PROV, rx_pke_proc_matched_msg_end, LTTNG_UST_TRACEPOINT_LOGLEVEL_INFO)

LTTNG_UST_TRACEPOINT_EVENT_INSTANCE(EFA_RDM_TP_PROV, pke_ope, EFA_RDM_TP_PROV,
rx_pke_blocking_copy_payload_begin,
LTTNG_UST_TP_ARGS(PKE_OPE_ARGS))
LTTNG_UST_TRACEPOINT_LOGLEVEL(EFA_RDM_TP_PROV, rx_pke_blocking_copy_payload_begin, LTTNG_UST_TRACEPOINT_LOGLEVEL_INFO)

LTTNG_UST_TRACEPOINT_EVENT_INSTANCE(EFA_RDM_TP_PROV, pke_ope, EFA_RDM_TP_PROV,
rx_pke_blocking_copy_payload_end,
LTTNG_UST_TP_ARGS(PKE_OPE_ARGS))
LTTNG_UST_TRACEPOINT_LOGLEVEL(EFA_RDM_TP_PROV, rx_pke_blocking_copy_payload_end, LTTNG_UST_TRACEPOINT_LOGLEVEL_INFO)

LTTNG_UST_TRACEPOINT_EVENT_INSTANCE(EFA_RDM_TP_PROV, pke_ope, EFA_RDM_TP_PROV,
rx_pke_local_read_copy_payload_begin,
LTTNG_UST_TP_ARGS(PKE_OPE_ARGS))
LTTNG_UST_TRACEPOINT_LOGLEVEL(EFA_RDM_TP_PROV, rx_pke_local_read_copy_payload_begin, LTTNG_UST_TRACEPOINT_LOGLEVEL_INFO)

LTTNG_UST_TRACEPOINT_EVENT_INSTANCE(EFA_RDM_TP_PROV, pke_ope, EFA_RDM_TP_PROV,
rx_pke_local_read_copy_payload_end,
LTTNG_UST_TP_ARGS(PKE_OPE_ARGS))
LTTNG_UST_TRACEPOINT_LOGLEVEL(EFA_RDM_TP_PROV, rx_pke_local_read_copy_payload_end, LTTNG_UST_TRACEPOINT_LOGLEVEL_INFO)

#endif /* _EFA_RDM_TP_DEF_H */

Expand Down

0 comments on commit 50f3952

Please sign in to comment.