diff --git a/.github/prod-cons/dyad_producer.sh b/.github/prod-cons/dyad_producer.sh index 447dc41c..ccf01bea 100755 --- a/.github/prod-cons/dyad_producer.sh +++ b/.github/prod-cons/dyad_producer.sh @@ -8,8 +8,8 @@ export DYAD_PATH_PRODUCER=${DYAD_PATH}_producer mkdir -p ${DYAD_PATH_PRODUCER} echo "Loading DYAD module" -echo flux module load ${DYAD_INSTALL_PREFIX}/lib/dyad.so $DYAD_PATH_PRODUCER $DYAD_DTL_MODE -flux module load ${DYAD_INSTALL_PREFIX}/lib/dyad.so $DYAD_PATH_PRODUCER $DYAD_DTL_MODE +echo flux module load ${DYAD_INSTALL_PREFIX}/lib/dyad.so --dtl_mode=$DYAD_DTL_MODE $DYAD_PATH_PRODUCER +flux module load ${DYAD_INSTALL_PREFIX}/lib/dyad.so --dtl_mode=$DYAD_DTL_MODE $DYAD_PATH_PRODUCER if [[ "$mode" == "${valid_modes[0]}" ]]; then echo ${GITHUB_WORKSPACE}/docs/demos/ecp_feb_2023/c_prod 10 $DYAD_PATH_PRODUCER diff --git a/.github/workflows/compile_test.yaml b/.github/workflows/compile_test.yaml index e26804ad..0602cc91 100644 --- a/.github/workflows/compile_test.yaml +++ b/.github/workflows/compile_test.yaml @@ -12,7 +12,8 @@ jobs: fail-fast: false matrix: flux: [ 0.52.0, 0.49.0] - mode: ["FLUX_RPC", "UCX"] + dtl_mode: ["FLUX_RPC", "UCX"] + profiler_mode: ["CALIPER", "NONE"] test_mode: ["c", "cpp"] #, "python"] runs-on: ubuntu-20.04 # Docker-based jobs must run on Ubuntu env: @@ -20,7 +21,8 @@ jobs: SPACK_DIR: "/home/runner/work/spack" DYAD_INSTALL_PREFIX: "/home/runner/work/dyad/install" DYAD_KVS_NAMESPACE: "test" - DYAD_DTL_MODE: ${{ matrix.mode }} + DYAD_DTL_MODE: ${{ matrix.dtl_mode }} + DYAD_PROFILER_MODE: ${{ matrix.profiler_mode }} DYAD_PATH: "/home/runner/work/dyad/temp" DYAD_TEST_MODE: ${{ matrix.test_mode }} steps: @@ -191,11 +193,17 @@ jobs: if [[ $DYAD_DTL_MODE == 'UCX' ]]; then spack install -j4 ucx@1.13.1 fi + if [[ $DYAD_PROFILER_MODE == 'CALIPER' ]]; then + spack install -j4 caliper + fi mkdir -p ${DYAD_INSTALL_PREFIX} spack view --verbose symlink ${DYAD_INSTALL_PREFIX} flux-core@${FLUX_VERSION} if [[ $DYAD_DTL_MODE == 'UCX' ]]; then spack view --verbose symlink ${DYAD_INSTALL_PREFIX} ucx@1.13.1 fi + if [[ $DYAD_PROFILER_MODE == 'CALIPER' ]]; then + spack view --verbose symlink ${DYAD_INSTALL_PREFIX} caliper + fi - name: Compile DYAD run: | echo "Activating spack" @@ -207,9 +215,14 @@ jobs: mkdir build cd build export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:${DYAD_INSTALL_PREFIX}/lib/pkgconfig - CONFIGURE_FLAGS="-DDYAD_CONTROL_PLANE=FLUX_RPC -DDYAD_DATA_PLANE=FLUX_RPC -DDYAD_PROFILER=NONE" + CONFIGURE_FLAGS="-DDYAD_CONTROL_PLANE=FLUX_RPC -DDYAD_DATA_PLANE=FLUX_RPC" if [[ $DYAD_DTL_MODE == 'UCX' ]]; then - CONFIGURE_FLAGS="-DDYAD_CONTROL_PLANE=FLUX_RPC -DDYAD_DATA_PLANE=UCX -DDYAD_PROFILER=NONE" + CONFIGURE_FLAGS="-DDYAD_CONTROL_PLANE=FLUX_RPC -DDYAD_DATA_PLANE=UCX" + fi + if [[ $DYAD_PROFILER_MODE == 'CALIPER' ]]; then + CONFIGURE_FLAGS="${CONFIGURE_FLAGS} -DDYAD_PROFILER=CALIPER" + else + CONFIGURE_FLAGS="${CONFIGURE_FLAGS} -DDYAD_PROFILER=NONE" fi cmake -DCMAKE_INSTALL_PREFIX=${DYAD_INSTALL_PREFIX} ${CONFIGURE_FLAGS} -DENABLE_DYAD_DEBUG=ON .. make install -j diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e9f4bce..0660fa6a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -158,6 +158,21 @@ if(DYAD_PROFILER STREQUAL "PERFFLOW_ASPECT") message(FATAL_ERROR "-- [${PROJECT_NAME}] ucx is needed for ${PROJECT_NAME} build") endif () endif() +if(DYAD_PROFILER STREQUAL "CALIPER") + find_package(caliper REQUIRED) + if (${caliper_FOUND}) + # For some reason, Caliper sets things a little oddly. The main things to be aware of are: + # * caliper_INCLUDE_DIR (singular) is used instead of caliper_INCLUDE_DIRS + # * caliper_LIB_DIR is used instead of caliper_LIBRARIES + # * There is no variable from caliper-config.cmake that points to the actual library files. + # Caliper expects us to use CMake targets for actual linking + message(STATUS "[${PROJECT_NAME}] found caliper at ${caliper_INCLUDE_DIR}") + include_directories(${caliper_INCLUDE_DIR}) + set(DEPENDENCY_LIB ${DEPENDENCY_LIB} ${caliper_LID_DIR}) + else () + message(FATAL_ERROR "-- [${PROJECT_NAME}] caliper is needed for ${PROJECT_NAME} build") + endif () +endif() if(DYAD_DATA_PLANE STREQUAL "UCX") find_package(ucx 1.6 REQUIRED) if (${ucx_FOUND}) diff --git a/configure.ac b/configure.ac index d50132bf..aff3f7a1 100644 --- a/configure.ac +++ b/configure.ac @@ -81,6 +81,12 @@ AC_ARG_ENABLE([ucx], [enable_ucx=no] ) AM_CONDITIONAL([UCX], [test "x$enable_ucx" = "xyes"]) +AC_ARG_ENABLE([caliper], + [AS_HELP_STRING([--enable-caliper], + [enable performance measurement with Caliper])], + [enable_caliper=$withval], + [enable_caliper=no] +) ############################################# # Define PKG_CHECK_VAR if it does not exist # @@ -120,9 +126,10 @@ AX_PERFFLOW_ASPECT([PERFFLOW], [pkg_check_perfflow_found=yes], [pkg_check_perfflow_found=no] ) -if test "x$enable_perfflow" = "xyes" && test "x$pkg_check_perfflow_found" = "xno"; then +if test "x$with_perfflow" = "xyes" && test "x$pkg_check_perfflow_found" = "xno"; then AC_MSG_ERROR([requested PerfFlow Aspect support, but cannot find PerfFlow Aspect with pkg-config]) fi +DYAD_MOD_RPATH="" # Check for UCX v1.6.0 or higher PKG_CHECK_MODULES([UCX], [ucx >= 1.6.0], @@ -143,11 +150,52 @@ if test "x$enable_ucx" = "xyes"; then [AC_MSG_FAILURE([check_var succeeded, but value is incorrect])] ) AS_IF([test "x$enable_ucx" = "xyes"], - [DYAD_MOD_RPATH="-Wl,-rpath,$UCX_LIBDIR"], - [DYAD_MOD_RPATH=""] + [ + if test -z "$DYAD_MOD_RPATH"; then + DYAD_MOD_RPATH="$UCX_LIBDIR" + else + DYAD_MOD_RPATH="$DYAD_MOD_RPATH:$UCX_LIBDIR" + fi + ], + [] ) - AC_SUBST([DYAD_MOD_RPATH]) fi +PKG_CHECK_MODULES([CALIPER], + [caliper], + [pkg_check_caliper_found=yes], + [pkg_check_caliper_found=no] +) +if test "x$enable_caliper" = "xyes" && test "x$pkg_check_caliper_found" = "xno"; then + AC_MSG_ERROR([requested Caliper support, but cannot find Caliper with pkg-config]) +fi +if test "x$enable_caliper" = "xyes"; then + PKG_CHECK_VAR([CALIPER_LIBDIR], + [caliper], + [libdir], + [], + [AC_MSG_FAILURE([Could not find libdir for Caliper])] + ) + AS_IF([test "x$CALIPER_LIBDIR" = "x"], + [AC_MSG_FAILURE([check_var succeeded, but value is incorrect])] + ) + AS_IF([test "x$enable_caliper" = "xyes"], + [ + if test -z "$DYAD_MOD_RPATH"; then + DYAD_MOD_RPATH="$CALIPER_LIBDIR" + else + DYAD_MOD_RPATH="$DYAD_MOD_RPATH:$CALIPER_LIBDIR" + fi + ], + [] + ) +fi +AM_CONDITIONAL([WITH_CALIPER], [test "x$enable_caliper" = "xyes"]) + +AS_IF([test -z "$DYAD_MOD_RPATH"], + [], + [DYAD_MOD_RPATH="-Wl,-rpath,$DYAD_MOD_RPATH"] +) +AC_SUBST([DYAD_MOD_RPATH]) ########################### # Checks for header files # @@ -207,6 +255,7 @@ fi ######################## AC_CONFIG_FILES([Makefile src/Makefile + src/perf/Makefile src/utils/Makefile src/utils/base64/Makefile src/utils/libtap/Makefile diff --git a/src/dyad/CMakeLists.txt b/src/dyad/CMakeLists.txt index 7a5f5655..e1ea3a36 100644 --- a/src/dyad/CMakeLists.txt +++ b/src/dyad/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(utils) +add_subdirectory(perf) add_subdirectory(dtl) add_subdirectory(core) add_subdirectory(modules) diff --git a/src/dyad/Makefile.am b/src/dyad/Makefile.am index 626467d4..c9e3e449 100644 --- a/src/dyad/Makefile.am +++ b/src/dyad/Makefile.am @@ -1 +1 @@ -SUBDIRS = utils dtl modules core wrapper stream +SUBDIRS = utils perf dtl modules core wrapper stream diff --git a/src/dyad/common/dyad_rc.h b/src/dyad/common/dyad_rc.h index eb2ec30f..0cf1ec05 100644 --- a/src/dyad/common/dyad_rc.h +++ b/src/dyad/common/dyad_rc.h @@ -7,6 +7,8 @@ #define DYAD_DLL_EXPORTED #endif +#define DYAD_DLL_HIDDEN __attribute__ ((__visibility__ ("default"))) + #if DYAD_PERFFLOW #define DYAD_PFA_ANNOTATE __attribute__ ((annotate ("@critical_path()"))) #else @@ -42,6 +44,9 @@ enum dyad_core_return_codes { DYAD_RC_BAD_B64DECODE = -18, // Decoding of data w/ base64 failed DYAD_RC_BAD_COMM_MODE = -19, // Invalid comm mode provided to DTL DYAD_RC_UNTRACKED = -20, // Provided path is not tracked by DYAD + DYAD_RC_PERF_INIT_FAIL = -21, // Performance measurement initialization failed + DYAD_RC_BAD_CLI_ARG_DEF = -22, // Trying to define a CLI argument failed + DYAD_RC_BAD_CLI_PARSE = -23, // Trying to parse CLI arguments failed }; typedef enum dyad_core_return_codes dyad_rc_t; diff --git a/src/dyad/core/Makefile.am b/src/dyad/core/Makefile.am index 8df09abd..337714d8 100644 --- a/src/dyad/core/Makefile.am +++ b/src/dyad/core/Makefile.am @@ -9,12 +9,14 @@ libdyad_core_la_CFLAGS = \ $(AM_CFLAGS) \ -I$(top_srcdir)/src/utils \ -I$(top_srcdir)/src/utils/base64 \ + -I$(top_srcdir)/src/common \ -I$(top_srcdir)/src/dtl \ - $(JANSSON_CFLAGS) \ + -I$(top_srcdir)/src/perf \ + $(JANSSON_CFLAGS) \ $(FLUX_CORE_CFLAGS) \ -DBUILDING_DYAD=1 \ -fvisibility=hidden -libdyad_core_la_CPPFLAGS = +libdyad_core_la_CPPFLAGS = if UCX libdyad_core_la_LIBADD += $(UCX_LIBS) libdyad_core_la_CFLAGS += $(UCX_CFLAGS) @@ -24,5 +26,12 @@ libdyad_core_la_LIBADD += $(PERFFLOW_LIBS) libdyad_core_la_CFLAGS += $(PERFFLOW_CFLAGS) -DDYAD_PERFFLOW=1 libdyad_core_la_CPPFLAGS += $(PERFFLOW_PLUGIN_CPPFLAGS) endif +if WITH_CALIPER +libdyad_core_la_CFLAGS += \ + -DWITH_CALIPER=1 \ + $(CALIPER_CFLAGS) +libdyad_core_la_LIBADD += \ + $(CALIPER_LIBS) +endif include_HEADERS = dyad_core.h dyad_envs.h diff --git a/src/dyad/core/dyad_core.c b/src/dyad/core/dyad_core.c index 803b2092..74a674f7 100644 --- a/src/dyad/core/dyad_core.c +++ b/src/dyad/core/dyad_core.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -28,6 +29,7 @@ const struct dyad_ctx dyad_ctx_default = { NULL, // h NULL, // dtl_handle + NULL, // perf_handle false, // debug false, // check false, // reenter @@ -89,20 +91,26 @@ static int gen_path_key (const char* str, DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_commit (const dyad_ctx_t* ctx, flux_kvs_txn_t* txn) { flux_future_t* f = NULL; + dyad_rc_t rc = DYAD_RC_OK; + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_kvs_commit"); DYAD_LOG_INFO (ctx, "Committing transaction to KVS\n"); // Commit the transaction to the Flux KVS f = flux_kvs_commit (ctx->h, ctx->kvs_namespace, 0, txn); // If the commit failed, log an error and return DYAD_BADCOMMIT if (f == NULL) { DYAD_LOG_ERR (ctx, "Could not commit transaction to Flux KVS\n"); - return DYAD_RC_BADCOMMIT; + rc = DYAD_RC_BADCOMMIT; + goto kvs_commit_region_finish; } // If the commit is pending, wait for it to complete flux_future_wait_for (f, -1.0); // Once the commit is complete, destroy the future and transaction flux_future_destroy (f); f = NULL; - return DYAD_RC_OK; + rc = DYAD_RC_OK; +kvs_commit_region_finish: + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_kvs_commit"); + return rc; } DYAD_CORE_FUNC_MODS dyad_rc_t publish_via_flux (const dyad_ctx_t* restrict ctx, @@ -122,18 +130,20 @@ DYAD_CORE_FUNC_MODS dyad_rc_t publish_via_flux (const dyad_ctx_t* restrict ctx, // The transaction will contain a single key-value pair // with the previously generated key as the key and the // producer's rank as the value + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_kvs_transaction_create_and_pack"); DYAD_LOG_INFO (ctx, "Creating KVS transaction under the key %s\n", topic); txn = flux_kvs_txn_create (); if (txn == NULL) { DYAD_LOG_ERR (ctx, "Could not create Flux KVS transaction\n"); rc = DYAD_RC_FLUXFAIL; - goto publish_done; + goto kvs_transaction_create_and_pack_region_finish; } if (flux_kvs_txn_pack (txn, 0, topic, "i", ctx->rank) < 0) { DYAD_LOG_ERR (ctx, "Could not pack Flux KVS transaction\n"); rc = DYAD_RC_FLUXFAIL; - goto publish_done; + goto kvs_transaction_create_and_pack_region_finish; } + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_kvs_transaction_create_and_pack"); // Call dyad_kvs_commit to commit the transaction into the Flux KVS rc = dyad_kvs_commit (ctx, txn); // If dyad_kvs_commit failed, log an error and forward the return code @@ -147,6 +157,9 @@ publish_done:; flux_kvs_txn_destroy (txn); } return rc; +kvs_transaction_create_and_pack_region_finish: + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_kvs_transaction_create_and_pack"); + goto publish_done; } DYAD_CORE_FUNC_MODS dyad_rc_t dyad_commit (dyad_ctx_t* restrict ctx, @@ -182,7 +195,8 @@ commit_done:; } DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_read (const dyad_ctx_t* restrict ctx, - const char* restrict topic, + const char* topic, + const char* upath, bool should_wait, dyad_metadata_t** mdata) { @@ -201,11 +215,13 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_read (const dyad_ctx_t* restrict ctx, // made available if (should_wait) kvs_lookup_flags = FLUX_KVS_WAITCREATE; + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_kvs_lookup"); DYAD_LOG_INFO (ctx, "Retrieving information from KVS under the key %s\n", topic); f = flux_kvs_lookup (ctx->h, ctx->kvs_namespace, kvs_lookup_flags, topic); // If the KVS lookup failed, log an error and return DYAD_BADLOOKUP if (f == NULL) { DYAD_LOG_ERR (ctx, "KVS lookup failed!\n"); + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_kvs_lookup"); rc = DYAD_RC_NOTFOUND; goto kvs_read_end; } @@ -221,15 +237,15 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_read (const dyad_ctx_t* restrict ctx, goto kvs_read_end; } } - size_t topic_len = strlen (topic); - (*mdata)->fpath = (char*)malloc (topic_len + 1); + size_t upath_len = strlen (upath); + (*mdata)->fpath = (char*)malloc (upath_len + 1); if ((*mdata)->fpath == NULL) { DYAD_LOG_ERR (ctx, "Cannot allocate memory for fpath in metadata object"); rc = DYAD_RC_SYSFAIL; goto kvs_read_end; } - memset ((*mdata)->fpath, '\0', topic_len + 1); - strncpy ((*mdata)->fpath, topic, topic_len); + memset ((*mdata)->fpath, '\0', upath_len + 1); + strncpy ((*mdata)->fpath, upath, upath_len); rc = flux_kvs_lookup_get_unpack (f, "i", &((*mdata)->owner_rank)); // If the extraction did not work, log an error and return DYAD_BADFETCH if (rc < 0) { @@ -247,6 +263,7 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_read (const dyad_ctx_t* restrict ctx, flux_future_destroy (f); f = NULL; } + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_kvs_lookup"); return rc; } @@ -276,7 +293,7 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_fetch (const dyad_ctx_t* restrict ctx, DYAD_LOG_INFO (ctx, "Generated KVS key for consumer: %s\n", topic); // Call dyad_kvs_read to retrieve infromation about the file // from the Flux KVS - rc = dyad_kvs_read (ctx, topic, true, mdata); + rc = dyad_kvs_read (ctx, topic, upath, true, mdata); // If an error occured in dyad_kvs_read, log it and propagate the return // code if (DYAD_IS_ERROR (rc)) { @@ -315,6 +332,7 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_get_data (const dyad_ctx_t* ctx, dyad_rc_t final_rc = DYAD_RC_OK; flux_future_t* f; json_t* rpc_payload; + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_get_data"); DYAD_LOG_INFO (ctx, "Packing payload for RPC to DYAD module"); rc = ctx->dtl_handle->rpc_pack (ctx->dtl_handle, mdata->fpath, @@ -327,12 +345,14 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_get_data (const dyad_ctx_t* ctx, goto get_done; } DYAD_LOG_INFO (ctx, "Sending payload for RPC to DYAD module"); + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_send_rpc"); f = flux_rpc_pack (ctx->h, DYAD_DTL_RPC_NAME, mdata->owner_rank, FLUX_RPC_STREAMING, "o", rpc_payload); + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_send_rpc"); if (f == NULL) { DYAD_LOG_ERR (ctx, "Cannot send RPC to producer module\n"); rc = DYAD_RC_BADRPC; @@ -375,9 +395,8 @@ get_done:; // well in the module, this last message will set errno to ENODATA (i.e., // end of stream). Otherwise, something went wrong, so we'll return // DYAD_RC_BADRPC. - DYAD_LOG_INFO (ctx, - "Wait for end-of-stream message from module (current RC = %d)\n", - rc); + DYAD_LOG_INFO (ctx, "Wait for end-of-stream message from module (current RC = %d)\n", rc); + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_wait_for_rpc_end"); if (rc != DYAD_RC_RPC_FINISHED && rc != DYAD_RC_BADRPC) { if (!(flux_rpc_get (f, NULL) < 0 && errno == ENODATA)) { DYAD_LOG_ERR (ctx, @@ -388,8 +407,10 @@ get_done:; rc = DYAD_RC_BADRPC; } } + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_wait_for_rpc_end"); DYAD_LOG_INFO (ctx, "Destroy the Flux future for the RPC\n"); flux_future_destroy (f); + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_get_data"); return rc; } @@ -416,6 +437,7 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_pull (const dyad_ctx_t* restrict ctx, } // Build the full path to the file being consumed + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_write_data_for_consumer"); strncpy (file_path, ctx->cons_managed_path, PATH_MAX - 1); concat_str (file_path, mdata->fpath, "/", PATH_MAX); strncpy (file_path_copy, file_path, PATH_MAX); // dirname modifies the arg @@ -454,6 +476,7 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_pull (const dyad_ctx_t* restrict ctx, rc = DYAD_RC_OK; pull_done: + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_write_data_for_consumer"); if (file_data != NULL) { free ((void*)file_data); } @@ -476,6 +499,12 @@ dyad_rc_t dyad_init (bool debug, dyad_ctx_t** ctx) { dyad_rc_t rc = DYAD_RC_OK; + dyad_perf_t* perf_handle = NULL; + rc = dyad_perf_init (&perf_handle, false, NULL); + if (DYAD_IS_ERROR (rc)) { + goto init_region_finish; + } + DYAD_PERF_REGION_BEGIN (perf_handle, "dyad_init"); // If ctx is NULL, we won't be able to return a dyad_ctx_t // to the user. In that case, print an error and return // immediately with DYAD_NOCTX. @@ -484,7 +513,8 @@ dyad_rc_t dyad_init (bool debug, "'ctx' argument to dyad_init is NULL! This prevents us from " "returning " "a dyad_ctx_t object!\n"); - return DYAD_RC_NOCTX; + rc = DYAD_RC_NOCTX; + goto init_region_finish; } // Check if the actual dyad_ctx_t object is not NULL. // If it is not NULL, that means the dyad_ctx_t object @@ -495,7 +525,8 @@ dyad_rc_t dyad_init (bool debug, if ((*ctx)->initialized) { // TODO Indicate already initialized DPRINTF ((*ctx), "DYAD context already initialized\n"); - return DYAD_RC_OK; + rc = DYAD_RC_OK; + goto init_region_finish; } } else { // Allocate the dyad_ctx_t object and make sure the allocation @@ -503,7 +534,8 @@ dyad_rc_t dyad_init (bool debug, *ctx = (dyad_ctx_t*)malloc (sizeof (struct dyad_ctx)); if (*ctx == NULL) { fprintf (stderr, "Could not allocate DYAD context!\n"); - return DYAD_RC_NOCTX; + rc = DYAD_RC_NOCTX; + goto init_region_finish; } } // Set the initial contents of the dyad_ctx_t object @@ -515,9 +547,11 @@ dyad_rc_t dyad_init (bool debug, fprintf (stderr, "Warning: no managed path provided! DYAD will not do " "anything!\n"); - return DYAD_RC_OK; + rc = DYAD_RC_OK; + goto init_region_finish; } // Set the values in dyad_ctx_t that don't need allocation + (*ctx)->perf_handle = perf_handle; (*ctx)->debug = debug; (*ctx)->check = check; (*ctx)->shared_storage = shared_storage; @@ -528,21 +562,24 @@ dyad_rc_t dyad_init (bool debug, (*ctx)->h = flux_open (NULL, 0); if ((*ctx)->h == NULL) { fprintf (stderr, "Could not open Flux handle!\n"); - return DYAD_RC_FLUXFAIL; + rc = DYAD_RC_FLUXFAIL; + goto init_region_finish; } // Get the rank of the Flux broker corresponding // to the handle. If this fails, return DYAD_FLUXFAIL FLUX_LOG_INFO ((*ctx)->h, "DYAD_CORE: getting Flux rank"); if (flux_get_rank ((*ctx)->h, &((*ctx)->rank)) < 0) { FLUX_LOG_ERR ((*ctx)->h, "Could not get Flux rank!\n"); - return DYAD_RC_FLUXFAIL; + rc = DYAD_RC_FLUXFAIL; + goto init_region_finish; } // If the namespace is provided, copy it into the dyad_ctx_t object FLUX_LOG_INFO ((*ctx)->h, "DYAD_CORE: saving KVS namespace"); if (kvs_namespace == NULL) { FLUX_LOG_ERR ((*ctx)->h, "No KVS namespace provided!\n"); // TODO see if we want a different return val - return DYAD_RC_NOCTX; + rc = DYAD_RC_NOCTX; + goto init_region_finish; } const size_t namespace_len = strlen (kvs_namespace); (*ctx)->kvs_namespace = (char*)malloc (namespace_len + 1); @@ -550,16 +587,17 @@ dyad_rc_t dyad_init (bool debug, FLUX_LOG_ERR ((*ctx)->h, "Could not allocate buffer for KVS namespace!\n"); free (*ctx); *ctx = NULL; - return DYAD_RC_NOCTX; + rc = DYAD_RC_NOCTX; + goto init_region_finish; } strncpy ((*ctx)->kvs_namespace, kvs_namespace, namespace_len + 1); // Initialize the DTL based on the value of dtl_mode // If an error occurs, log it and return an error FLUX_LOG_INFO ((*ctx)->h, "DYAD_CORE: inintializing DYAD DTL"); - rc = dyad_dtl_init (&(*ctx)->dtl_handle, dtl_mode, (*ctx)->h, (*ctx)->debug); + rc = dyad_dtl_init (&(*ctx)->dtl_handle, dtl_mode, (*ctx)->h, (*ctx)->debug, (*ctx)->perf_handle); if (DYAD_IS_ERROR (rc)) { FLUX_LOG_ERR ((*ctx)->h, "Cannot initialize the DTL\n"); - return rc; + goto init_region_finish; } // If the producer-managed path is provided, copy it into // the dyad_ctx_t object @@ -576,7 +614,8 @@ dyad_rc_t dyad_init (bool debug, free ((*ctx)->kvs_namespace); free (*ctx); *ctx = NULL; - return DYAD_RC_NOCTX; + rc = DYAD_RC_NOCTX; + goto init_region_finish; } strncpy ((*ctx)->prod_managed_path, prod_managed_path, prod_path_len + 1); } @@ -596,7 +635,8 @@ dyad_rc_t dyad_init (bool debug, free ((*ctx)->prod_managed_path); free (*ctx); *ctx = NULL; - return DYAD_RC_NOCTX; + rc = DYAD_RC_NOCTX; + goto init_region_finish; } strncpy ((*ctx)->cons_managed_path, cons_managed_path, cons_path_len + 1); } @@ -605,7 +645,11 @@ dyad_rc_t dyad_init (bool debug, (*ctx)->reenter = true; (*ctx)->initialized = true; // TODO Print logging info - return DYAD_RC_OK; + rc = DYAD_RC_OK; + +init_region_finish: + DYAD_PERF_REGION_END ((*ctx)->perf_handle, "dyad_init"); + return rc; } dyad_rc_t dyad_init_env (dyad_ctx_t** ctx) @@ -755,7 +799,7 @@ dyad_rc_t dyad_get_metadata (dyad_ctx_t* ctx, } DYAD_LOG_INFO (ctx, "Generating KVS key: %s", topic); gen_path_key (upath, topic, topic_len, ctx->key_depth, ctx->key_bins); - rc = dyad_kvs_read (ctx, topic, should_wait, mdata); + rc = dyad_kvs_read (ctx, topic, upath, should_wait, mdata); if (DYAD_IS_ERROR (rc)) { DYAD_LOG_ERR (ctx, "Could not read data from the KVS"); goto get_metadata_done; @@ -847,11 +891,15 @@ consume_done:; return rc; } -int dyad_finalize (dyad_ctx_t** ctx) +dyad_rc_t dyad_finalize (dyad_ctx_t** ctx) { + dyad_rc_t rc = DYAD_RC_OK; if (ctx == NULL || *ctx == NULL) { - return DYAD_RC_OK; + rc = DYAD_RC_OK; + goto finalize_region_finish; } + dyad_perf_t* perf_handle = (*ctx)->perf_handle; + DYAD_PERF_REGION_BEGIN (perf_handle, "dyad_finalize"); dyad_dtl_finalize (&(*ctx)->dtl_handle); if ((*ctx)->h != NULL) { flux_close ((*ctx)->h); @@ -871,7 +919,11 @@ int dyad_finalize (dyad_ctx_t** ctx) } free (*ctx); *ctx = NULL; - return DYAD_RC_OK; + rc = DYAD_RC_OK; +finalize_region_finish: + DYAD_PERF_REGION_END (perf_handle, "dyad_finalize"); + dyad_perf_finalize (&perf_handle); + return rc; } #if DYAD_SYNC_DIR diff --git a/src/dyad/core/dyad_core.h b/src/dyad/core/dyad_core.h index 0351b0ae..01f34ede 100644 --- a/src/dyad/core/dyad_core.h +++ b/src/dyad/core/dyad_core.h @@ -30,23 +30,26 @@ extern "C" { #endif +struct dyad_perf; + /** * @struct dyad_ctx */ struct dyad_ctx { - flux_t* h; // the Flux handle for DYAD - struct dyad_dtl* dtl_handle; // Opaque handle to DTL info - bool debug; // if true, perform debug logging - bool check; // if true, perform some check logging - bool reenter; // if false, do not recursively enter DYAD - bool initialized; // if true, DYAD is initialized - bool shared_storage; // if true, the managed path is shared - unsigned int key_depth; // Depth of bins for the Flux KVS - unsigned int key_bins; // Number of bins for the Flux KVS - uint32_t rank; // Flux rank for DYAD - char* kvs_namespace; // Flux KVS namespace for DYAD - char* prod_managed_path; // producer path managed by DYAD - char* cons_managed_path; // consumer path managed by DYAD + flux_t* h; // the Flux handle for DYAD + struct dyad_dtl* dtl_handle; // Opaque handle to DTL info + struct dyad_perf* perf_handle; // Opaque handle to performance measurement tools + bool debug; // if true, perform debug logging + bool check; // if true, perform some check logging + bool reenter; // if false, do not recursively enter DYAD + bool initialized; // if true, DYAD is initialized + bool shared_storage; // if true, the managed path is shared + unsigned int key_depth; // Depth of bins for the Flux KVS + unsigned int key_bins; // Number of bins for the Flux KVS + uint32_t rank; // Flux rank for DYAD + char* kvs_namespace; // Flux KVS namespace for DYAD + char* prod_managed_path; // producer path managed by DYAD + char* cons_managed_path; // consumer path managed by DYAD }; DYAD_DLL_EXPORTED extern const struct dyad_ctx dyad_ctx_default; typedef struct dyad_ctx dyad_ctx_t; diff --git a/src/dyad/dtl/CMakeLists.txt b/src/dyad/dtl/CMakeLists.txt index 90fffd45..2648ecfb 100644 --- a/src/dyad/dtl/CMakeLists.txt +++ b/src/dyad/dtl/CMakeLists.txt @@ -1,7 +1,6 @@ # DTL Interface set(DTL_SRC ${CMAKE_CURRENT_SOURCE_DIR}/dyad_dtl_impl.c) -set(DTL_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/dyad_dtl_impl.h - ${CMAKE_CURRENT_SOURCE_DIR}/dyad_rc.h) +set(DTL_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/dyad_dtl_impl.h) set(DTL_PUBLIC_HEADERS ) # Flux implementation for DTL @@ -24,7 +23,7 @@ set(DTL_PRIVATE_HEADERS ${DTL_PRIVATE_HEADERS} ${FLUX_PRIVATE_HEADERS}) set(DTL_PUBLIC_HEADERS ${DTL_PUBLIC_HEADERS} ${FLUX_PUBLIC_HEADERS}) add_library(${PROJECT_NAME}_dtl SHARED ${DTL_SRC}) -target_link_libraries(${PROJECT_NAME}_dtl PUBLIC ${PROJECT_NAME}_utils Jansson::Jansson flux::core) +target_link_libraries(${PROJECT_NAME}_dtl PUBLIC ${PROJECT_NAME}_utils Jansson::Jansson flux::core ${PROJECT_NAME}_perf) if(DYAD_DATA_PLANE STREQUAL "UCX") target_link_libraries(${PROJECT_NAME}_dtl PUBLIC ucx::ucp ucx::ucs) diff --git a/src/dyad/dtl/Makefile.am b/src/dyad/dtl/Makefile.am index 9877a412..25e4c9c6 100644 --- a/src/dyad/dtl/Makefile.am +++ b/src/dyad/dtl/Makefile.am @@ -8,15 +8,25 @@ libdyad_dtl_la_SOURCES = \ libdyad_dtl_la_LIBADD = \ $(top_builddir)/src/utils/libutils.la \ $(top_builddir)/src/utils/libmurmur3.la \ + $(top_builddir)/src/perf/libdyad_perf.la \ $(JANSSON_LIBS) \ $(FLUX_CORE_LIBS) libdyad_dtl_la_CFLAGS = \ - $(AM_CFLAGS) \ - -I$(top_srcdir)/src/utils \ - -I$(top_srcdir)/src/utils/base64 \ - $(JANSSON_CFLAGS) \ - $(FLUX_CORE_CLFAGS) \ - -fvisibility=hidden + $(AM_CFLAGS) \ + -I$(top_srcdir)/src/utils \ + -I$(top_srcdir)/src/utils/base64 \ + -I$(top_srcdir)/src/common \ + -I$(top_srcdir)/src/perf \ + $(JANSSON_CFLAGS) \ + $(FLUX_CORE_CLFAGS) \ + -fvisibility=hidden +if WITH_CALIPER +libdyad_dtl_la_CFLAGS += \ + -DWITH_CALIPER=1 \ + $(CALIPER_CFLAGS) +libdyad_dtl_la_LIBADD += \ + $(CALIPER_LIBS) +endif if UCX libdyad_dtl_la_SOURCES += \ @@ -26,4 +36,4 @@ libdyad_dtl_la_LIBADD += $(UCX_LIBS) libdyad_dtl_la_CFLAGS += $(UCX_CFLAGS) -DDYAD_ENABLE_UCX=1 endif -include_HEADERS = dyad_rc.h dyad_flux_log.h dyad_dtl.h \ No newline at end of file +include_HEADERS = dyad_dtl.h diff --git a/src/dyad/dtl/dyad_dtl_impl.c b/src/dyad/dtl/dyad_dtl_impl.c index 2dbf07de..8c0393d4 100644 --- a/src/dyad/dtl/dyad_dtl_impl.c +++ b/src/dyad/dtl/dyad_dtl_impl.c @@ -1,6 +1,7 @@ #include #include +#include #if DYAD_ENABLE_UCX #include "ucx_dtl.h" @@ -9,15 +10,18 @@ dyad_rc_t dyad_dtl_init (dyad_dtl_t **dtl_handle, dyad_dtl_mode_t mode, flux_t *h, - bool debug) + bool debug, + dyad_perf_t *perf_handle) { dyad_rc_t rc = DYAD_RC_OK; + DYAD_PERF_REGION_BEGIN (perf_handle, "dyad_dtl_init"); *dtl_handle = malloc (sizeof (struct dyad_dtl)); if (*dtl_handle == NULL) { rc = DYAD_RC_SYSFAIL; goto dtl_init_done; } (*dtl_handle)->mode = mode; + (*dtl_handle)->perf_handle = perf_handle; #if DYAD_ENABLE_UCX if (mode == DYAD_DTL_UCX) { rc = dyad_dtl_ucx_init (*dtl_handle, mode, h, debug); @@ -39,6 +43,7 @@ dyad_rc_t dyad_dtl_init (dyad_dtl_t **dtl_handle, rc = DYAD_RC_OK; dtl_init_done: + DYAD_PERF_REGION_END (perf_handle, "dyad_dtl_init"); return rc; } @@ -53,6 +58,8 @@ dyad_rc_t dyad_dtl_finalize (dyad_dtl_t **dtl_handle) rc = DYAD_RC_OK; goto dtl_finalize_done; } + dyad_perf_t *perf_handle = (*dtl_handle)->perf_handle; + DYAD_PERF_REGION_BEGIN (perf_handle, "dyad_dtl_finalize"); #if DYAD_ENABLE_UCX if ((*dtl_handle)->mode == DYAD_DTL_UCX) { if ((*dtl_handle)->private.ucx_dtl_handle != NULL) { @@ -80,5 +87,6 @@ dyad_rc_t dyad_dtl_finalize (dyad_dtl_t **dtl_handle) dtl_finalize_done: free (*dtl_handle); *dtl_handle = NULL; + DYAD_PERF_REGION_END (perf_handle, "dyad_dtl_finalize"); return rc; } diff --git a/src/dyad/dtl/dyad_dtl_impl.h b/src/dyad/dtl/dyad_dtl_impl.h index aa682f52..9cdeaa75 100644 --- a/src/dyad/dtl/dyad_dtl_impl.h +++ b/src/dyad/dtl/dyad_dtl_impl.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #ifdef __cplusplus #include @@ -41,6 +42,7 @@ typedef enum dyad_dtl_comm_mode dyad_dtl_comm_mode_t; struct dyad_dtl { dyad_dtl_private_t private; dyad_dtl_mode_t mode; + dyad_perf_t* perf_handle; dyad_rc_t (*rpc_pack) (struct dyad_dtl* restrict self, const char* restrict upath, uint32_t producer_rank, @@ -61,7 +63,8 @@ typedef struct dyad_dtl dyad_dtl_t; dyad_rc_t dyad_dtl_init (dyad_dtl_t** dtl_handle, dyad_dtl_mode_t mode, flux_t* h, - bool debug); + bool debug, + dyad_perf_t* perf_handle); dyad_rc_t dyad_dtl_finalize (dyad_dtl_t** dtl_handle); diff --git a/src/dyad/dtl/flux_dtl.c b/src/dyad/dtl/flux_dtl.c index e579b989..c780047d 100644 --- a/src/dyad/dtl/flux_dtl.c +++ b/src/dyad/dtl/flux_dtl.c @@ -5,10 +5,13 @@ dyad_rc_t dyad_dtl_flux_init (dyad_dtl_t* self, flux_t* h, bool debug) { + dyad_rc_t rc = DYAD_RC_OK; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_flux_init"); self->private.flux_dtl_handle = malloc (sizeof (struct dyad_dtl_flux)); if (self->private.flux_dtl_handle == NULL) { FLUX_LOG_ERR (h, "Cannot allocate the Flux DTL handle\n"); - return DYAD_RC_SYSFAIL; + rc = DYAD_RC_SYSFAIL; + goto dtl_flux_init_region_finish; } self->private.flux_dtl_handle->h = h; self->private.flux_dtl_handle->debug = debug; @@ -24,7 +27,10 @@ dyad_rc_t dyad_dtl_flux_init (dyad_dtl_t* self, self->recv = dyad_dtl_flux_recv; self->close_connection = dyad_dtl_flux_close_connection; - return DYAD_RC_OK; + rc = DYAD_RC_OK; +dtl_flux_init_region_finish: + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_flux_init"); + return rc; } dyad_rc_t dyad_dtl_flux_rpc_pack (dyad_dtl_t* restrict self, @@ -32,27 +38,39 @@ dyad_rc_t dyad_dtl_flux_rpc_pack (dyad_dtl_t* restrict self, uint32_t producer_rank, json_t** restrict packed_obj) { + dyad_rc_t rc = DYAD_RC_OK; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_flux_rpc_pack"); dyad_dtl_flux_t* dtl_handle = self->private.flux_dtl_handle; *packed_obj = json_pack ("{s:s}", "upath", upath); if (*packed_obj == NULL) { FLUX_LOG_ERR (dtl_handle->h, "Could not pack upath for Flux DTL\n"); - return DYAD_RC_BADPACK; + rc = DYAD_RC_BADPACK; + goto dtl_flux_rpc_pack; } - return DYAD_RC_OK; + rc = DYAD_RC_OK; +dtl_flux_rpc_pack: + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_flux_rpc_pack"); + return rc; } dyad_rc_t dyad_dtl_flux_rpc_unpack (dyad_dtl_t* self, const flux_msg_t* msg, char** upath) { int rc = 0; + dyad_rc_t dyad_rc = DYAD_RC_OK; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_flux_rpc_unpack"); rc = flux_request_unpack (msg, NULL, "{s:s}", "upath", upath); if (FLUX_IS_ERROR (rc)) { FLUX_LOG_ERR (self->private.flux_dtl_handle->h, "Could not unpack Flux message from consumer\n"); // TODO create new RC for this - return DYAD_RC_BADUNPACK; + dyad_rc = DYAD_RC_BADUNPACK; + goto dtl_flux_rpc_unpack_region_finish; } self->private.flux_dtl_handle->msg = (flux_msg_t*)msg; - return DYAD_RC_OK; + dyad_rc = DYAD_RC_OK; +dtl_flux_rpc_unpack_region_finish: + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_flux_rpc_unpack"); + return dyad_rc; } dyad_rc_t dyad_dtl_flux_rpc_respond (dyad_dtl_t* self, const flux_msg_t* orig_msg) @@ -75,6 +93,8 @@ dyad_rc_t dyad_dtl_flux_establish_connection (dyad_dtl_t* self, dyad_rc_t dyad_dtl_flux_send (dyad_dtl_t* self, void* buf, size_t buflen) { int rc = 0; + dyad_rc_t dyad_rc = DYAD_RC_OK; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_flux_send"); FLUX_LOG_INFO (self->private.flux_dtl_handle->h, "Send data to consumer using a Flux RPC response"); rc = flux_respond_raw (self->private.flux_dtl_handle->h, @@ -85,13 +105,17 @@ dyad_rc_t dyad_dtl_flux_send (dyad_dtl_t* self, void* buf, size_t buflen) FLUX_LOG_ERR (self->private.flux_dtl_handle->h, "Could not send Flux RPC response containing file " "contents\n"); - return DYAD_RC_FLUXFAIL; + dyad_rc = DYAD_RC_FLUXFAIL; + goto dtl_flux_send_region_finish; } if (self->private.flux_dtl_handle->debug) { FLUX_LOG_INFO (self->private.flux_dtl_handle->h, "Successfully sent file contents to consumer\n"); } - return DYAD_RC_OK; + dyad_rc = DYAD_RC_OK; +dtl_flux_send_region_finish: + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_flux_send"); + return dyad_rc; } dyad_rc_t dyad_dtl_flux_recv (dyad_dtl_t* self, void** buf, size_t* buflen) @@ -100,11 +124,13 @@ dyad_rc_t dyad_dtl_flux_recv (dyad_dtl_t* self, void** buf, size_t* buflen) dyad_rc_t dyad_rc = DYAD_RC_OK; errno = 0; dyad_dtl_flux_t* dtl_handle = self->private.flux_dtl_handle; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_flux_recv"); FLUX_LOG_INFO (dtl_handle->h, "Get file contents from module using Flux RPC\n"); if (dtl_handle->f == NULL) { FLUX_LOG_ERR (dtl_handle->h, "Cannot get data using RPC without a Flux future\n"); // TODO create new RC for this - return DYAD_RC_FLUXFAIL; + dyad_rc = DYAD_RC_FLUXFAIL; + goto finish_recv; } void* tmp_buf; size_t tmp_buflen; @@ -129,7 +155,9 @@ dyad_rc_t dyad_dtl_flux_recv (dyad_dtl_t* self, void** buf, size_t* buflen) memcpy (*buf, tmp_buf, tmp_buflen); dyad_rc = DYAD_RC_OK; finish_recv: - flux_future_reset (dtl_handle->f); + if (dtl_handle->f != NULL) + flux_future_reset (dtl_handle->f); + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_flux_recv"); return dyad_rc; } @@ -144,12 +172,16 @@ dyad_rc_t dyad_dtl_flux_close_connection (dyad_dtl_t* self) dyad_rc_t dyad_dtl_flux_finalize (dyad_dtl_t** self) { - if (self == NULL || *self == NULL) + if (self == NULL || *self == NULL) { return DYAD_RC_OK; + } + dyad_perf_t* perf_handle = (*self)->perf_handle; + DYAD_PERF_REGION_BEGIN (perf_handle, "dyad_dtl_flux_finalize"); (*self)->private.flux_dtl_handle->h = NULL; (*self)->private.flux_dtl_handle->f = NULL; (*self)->private.flux_dtl_handle->msg = NULL; free ((*self)->private.flux_dtl_handle); (*self)->private.flux_dtl_handle = NULL; + DYAD_PERF_REGION_END (perf_handle, "dyad_dtl_flux_finalize"); return DYAD_RC_OK; } \ No newline at end of file diff --git a/src/dyad/dtl/flux_dtl.h b/src/dyad/dtl/flux_dtl.h index f58580b6..4105ee12 100644 --- a/src/dyad/dtl/flux_dtl.h +++ b/src/dyad/dtl/flux_dtl.h @@ -3,7 +3,7 @@ #include -#include "dyad_dtl_impl.h" +#include struct dyad_dtl_flux { flux_t* h; diff --git a/src/dyad/dtl/ucx_dtl.c b/src/dyad/dtl/ucx_dtl.c index aa03d1e4..6c04f745 100644 --- a/src/dyad/dtl/ucx_dtl.c +++ b/src/dyad/dtl/ucx_dtl.c @@ -1,5 +1,6 @@ #include - +#include +#include #include #include #include @@ -68,9 +69,11 @@ void dyad_ucx_ep_err_handler (void* arg, ucp_ep_h ep, ucs_status_t status) // Simple function used to wait on the async receive static ucs_status_t dyad_ucx_request_wait (dyad_dtl_ucx_t* dtl_handle, - dyad_ucx_request_t* request) + dyad_ucx_request_t* request, + dyad_perf_t* perf_handle) { ucs_status_t final_request_status = UCS_OK; + DYAD_PERF_REGION_BEGIN (perf_handle, "dyad_dtl_ucx_request_wait"); // If 'request' is actually a request handle, this means the communication // operation is scheduled, but not yet completed. if (UCS_PTR_IS_PTR (request)) { @@ -88,22 +91,22 @@ static ucs_status_t dyad_ucx_request_wait (dyad_dtl_ucx_t* dtl_handle, } while (final_request_status == UCS_INPROGRESS); // Free and deallocate the request object ucp_request_free (request); - return final_request_status; + goto dtl_ucx_request_wait_region_finish; } // If 'request' is actually a UCX error, this means the communication // operation immediately failed. In that case, we simply grab the // 'ucs_status_t' object for the error. else if (UCS_PTR_IS_ERR (request)) { - return UCS_PTR_STATUS (request); + final_request_status = UCS_PTR_STATUS (request); + goto dtl_ucx_request_wait_region_finish; } // If 'request' is neither a request handle nor an error, then // the communication operation immediately completed successfully. // So, we simply set the status to UCS_OK - return UCS_OK; -} - -static inline dyad_rc_t dyad_dtl_ucx_finalize_impl (dyad_dtl_ucx_t** dtl_handle) -{ + final_request_status = UCS_OK; +dtl_ucx_request_wait_region_finish: + DYAD_PERF_REGION_END (perf_handle, "dyad_dtl_ucx_request_wait"); + return final_request_status; } dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self, @@ -118,9 +121,12 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self, ucp_worker_attr_t worker_attrs; dyad_dtl_ucx_t* dtl_handle = NULL; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_init"); + self->private.ucx_dtl_handle = malloc (sizeof (struct dyad_dtl_ucx)); if (self->private.ucx_dtl_handle == NULL) { FLUX_LOG_ERR (h, "Could not allocate UCX DTL context\n"); + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_init"); return DYAD_RC_SYSFAIL; } dtl_handle = self->private.ucx_dtl_handle; @@ -218,12 +224,14 @@ dyad_rc_t dyad_dtl_ucx_init (dyad_dtl_t* self, self->recv = dyad_dtl_ucx_recv; self->close_connection = dyad_dtl_ucx_close_connection; + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_init"); return DYAD_RC_OK; error:; // If an error occured, finalize the DTL handle and // return a failing error code dyad_dtl_ucx_finalize (&self); + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_init"); return DYAD_RC_UCXINIT_FAIL; } @@ -232,13 +240,16 @@ dyad_rc_t dyad_dtl_ucx_rpc_pack (dyad_dtl_t* restrict self, uint32_t producer_rank, json_t** restrict packed_obj) { + dyad_rc_t rc = DYAD_RC_OK; size_t enc_len = 0; char* enc_buf = NULL; ssize_t enc_size = 0; dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_rpc_pack"); if (dtl_handle->consumer_address == NULL) { // TODO log error - return DYAD_RC_BADPACK; + rc = DYAD_RC_BADPACK; + goto dtl_ucx_rpc_pack_region_finish; } FLUX_LOG_INFO (dtl_handle->h, "Encode UCP address using base64\n"); enc_len = base64_encoded_length (dtl_handle->addr_len); @@ -247,7 +258,8 @@ dyad_rc_t dyad_dtl_ucx_rpc_pack (dyad_dtl_t* restrict self, enc_buf = malloc (enc_len + 1); if (enc_buf == NULL) { FLUX_LOG_ERR (dtl_handle->h, "Could not allocate buffer for packed address\n"); - return DYAD_RC_SYSFAIL; + rc = DYAD_RC_SYSFAIL; + goto dtl_ucx_rpc_pack_region_finish; } // consumer_address is casted to const char* to avoid warnings // This is valid because it is a pointer to an opaque struct, @@ -260,7 +272,8 @@ dyad_rc_t dyad_dtl_ucx_rpc_pack (dyad_dtl_t* restrict self, if (enc_size < 0) { // TODO log error free (enc_buf); - return DYAD_RC_BADPACK; + rc = DYAD_RC_BADPACK; + goto dtl_ucx_rpc_pack_region_finish; } FLUX_LOG_INFO (dtl_handle->h, "Creating UCP tag for tag matching\n"); // Because we're using tag-matching send/recv for communication, @@ -270,7 +283,8 @@ dyad_rc_t dyad_dtl_ucx_rpc_pack (dyad_dtl_t* restrict self, uint32_t consumer_rank = 0; if (flux_get_rank (dtl_handle->h, &consumer_rank) < 0) { FLUX_LOG_ERR (dtl_handle->h, "Cannot get consumer rank\n"); - return DYAD_RC_FLUXFAIL; + rc = DYAD_RC_FLUXFAIL; + goto dtl_ucx_rpc_pack_region_finish; } // The tag is a 64 bit unsigned integer consisting of the // 32-bit rank of the producer followed by the 32-bit rank @@ -293,13 +307,18 @@ dyad_rc_t dyad_dtl_ucx_rpc_pack (dyad_dtl_t* restrict self, // If the packing failed, log an error if (*packed_obj == NULL) { FLUX_LOG_ERR (dtl_handle->h, "Could not pack upath and UCX address for RPC\n"); - return DYAD_RC_BADPACK; + rc = DYAD_RC_BADPACK; + goto dtl_ucx_rpc_pack_region_finish; } - return DYAD_RC_OK; + rc = DYAD_RC_OK; +dtl_ucx_rpc_pack_region_finish: + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_rpc_pack"); + return rc; } dyad_rc_t dyad_dtl_ucx_rpc_unpack (dyad_dtl_t* self, const flux_msg_t* msg, char** upath) { + dyad_rc_t rc = DYAD_RC_OK; char* enc_addr = NULL; size_t enc_addr_len = 0; int errcode = 0; @@ -307,6 +326,7 @@ dyad_rc_t dyad_dtl_ucx_rpc_unpack (dyad_dtl_t* self, const flux_msg_t* msg, char uint32_t tag_cons = 0; ssize_t decoded_len = 0; dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_rpc_unpack"); FLUX_LOG_INFO (dtl_handle->h, "Unpacking RPC payload\n"); errcode = flux_request_unpack (msg, NULL, @@ -322,7 +342,8 @@ dyad_rc_t dyad_dtl_ucx_rpc_unpack (dyad_dtl_t* self, const flux_msg_t* msg, char &enc_addr_len); if (errcode < 0) { FLUX_LOG_ERR (dtl_handle->h, "Could not unpack Flux message from consumer!\n"); - return DYAD_RC_BADUNPACK; + rc = DYAD_RC_BADUNPACK; + goto dtl_ucx_rpc_unpack_region_finish; } dtl_handle->comm_tag = ((uint64_t)tag_prod << 32) | (uint64_t)tag_cons; FLUX_LOG_INFO (dtl_handle->h, "Obtained upath from RPC payload: %s\n", upath); @@ -334,7 +355,8 @@ dyad_rc_t dyad_dtl_ucx_rpc_unpack (dyad_dtl_t* self, const flux_msg_t* msg, char dtl_handle->consumer_address = (ucp_address_t*)malloc (dtl_handle->addr_len); if (dtl_handle->consumer_address == NULL) { FLUX_LOG_ERR (dtl_handle->h, "Could not allocate memory for consumer address"); - return DYAD_RC_SYSFAIL; + rc = DYAD_RC_SYSFAIL; + goto dtl_ucx_rpc_unpack_region_finish; } decoded_len = base64_decode_using_maps (&base64_maps_rfc4648, (char*)dtl_handle->consumer_address, @@ -346,9 +368,13 @@ dyad_rc_t dyad_dtl_ucx_rpc_unpack (dyad_dtl_t* self, const flux_msg_t* msg, char free (dtl_handle->consumer_address); dtl_handle->consumer_address = NULL; dtl_handle->addr_len = 0; - return DYAD_RC_BAD_B64DECODE; + rc = DYAD_RC_BAD_B64DECODE; + goto dtl_ucx_rpc_unpack_region_finish; } - return DYAD_RC_OK; + rc = DYAD_RC_OK; +dtl_ucx_rpc_unpack_region_finish: + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_rpc_unpack"); + return rc; } dyad_rc_t dyad_dtl_ucx_rpc_respond (dyad_dtl_t* self, const flux_msg_t* orig_msg) @@ -364,10 +390,12 @@ dyad_rc_t dyad_dtl_ucx_rpc_recv_response (dyad_dtl_t* self, flux_future_t* f) dyad_rc_t dyad_dtl_ucx_establish_connection (dyad_dtl_t* self, dyad_dtl_comm_mode_t comm_mode) { + dyad_rc_t rc = DYAD_RC_OK; ucp_ep_params_t params; ucs_status_t status = UCS_OK; dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; dtl_handle->curr_comm_mode = comm_mode; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_establish_connection"); if (comm_mode == DYAD_COMM_SEND) { params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE @@ -383,41 +411,53 @@ dyad_rc_t dyad_dtl_ucx_establish_connection (dyad_dtl_t* self, FLUX_LOG_ERR (dtl_handle->h, "ucp_ep_create failed with status %d\n", (int)status); - return DYAD_RC_UCXCOMM_FAIL; + rc = DYAD_RC_UCXCOMM_FAIL; + goto dtl_ucx_establish_connection_region_finish; } if (dtl_handle->debug) { ucp_ep_print_info (dtl_handle->ep, stderr); } - return DYAD_RC_OK; + rc = DYAD_RC_OK; + goto dtl_ucx_establish_connection_region_finish; } else if (comm_mode == DYAD_COMM_RECV) { FLUX_LOG_INFO (dtl_handle->h, "No explicit connection establishment needed for UCX " "receiver\n"); - return DYAD_RC_OK; + rc = DYAD_RC_OK; + goto dtl_ucx_establish_connection_region_finish; } else { FLUX_LOG_ERR (dtl_handle->h, "Invalid communication mode: %d\n", comm_mode); // TODO create new RC for this - return DYAD_RC_BAD_COMM_MODE; + rc = DYAD_RC_BAD_COMM_MODE; + goto dtl_ucx_establish_connection_region_finish; } + rc = DYAD_RC_OK; +dtl_ucx_establish_connection_region_finish: + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_establish_connection"); + return rc; } dyad_rc_t dyad_dtl_ucx_send (dyad_dtl_t* self, void* buf, size_t buflen) { + dyad_rc_t rc = DYAD_RC_OK; ucs_status_ptr_t stat_ptr; ucs_status_t status = UCS_OK; dyad_ucx_request_t* req = NULL; dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_send"); if (dtl_handle->ep == NULL) { FLUX_LOG_INFO (dtl_handle->h, "UCP endpoint was not created prior to invoking " "send!\n"); - return DYAD_RC_UCXCOMM_FAIL; + rc = DYAD_RC_UCXCOMM_FAIL; + goto dtl_ucx_send_region_finish; } // ucp_tag_send_sync_nbx is the prefered version of this send since UCX 1.9 // However, some systems (e.g., Lassen) may have an older verison // This conditional compilation will use ucp_tag_send_sync_nbx if using // UCX 1.9+, and it will use the deprecated ucp_tag_send_sync_nb if using // UCX < 1.9. + DYAD_PERF_REGION_BEGIN (self->perf_handle, "ucp_tag_send"); #if UCP_API_VERSION >= UCP_VERSION(1, 10) ucp_request_param_t params; params.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK; @@ -437,14 +477,19 @@ dyad_rc_t dyad_dtl_ucx_send (dyad_dtl_t* self, void* buf, size_t buflen) dtl_handle->comm_tag, dyad_send_callback); #endif + DYAD_PERF_REGION_END (self->perf_handle, "ucp_tag_send"); FLUX_LOG_INFO (dtl_handle->h, "Processing UCP send request\n"); status = dyad_ucx_request_wait (dtl_handle, stat_ptr); if (status != UCS_OK) { FLUX_LOG_ERR (dtl_handle->h, "UCP Tag Send failed (status = %d)!\n", (int)status); - return DYAD_RC_UCXCOMM_FAIL; + rc = DYAD_RC_UCXCOMM_FAIL; + goto dtl_ucx_send_region_finish; } FLUX_LOG_INFO (dtl_handle->h, "Data send with UCP succeeded\n"); - return DYAD_RC_OK; + rc = DYAD_RC_OK; +dtl_ucx_send_region_finish: + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_send"); + return rc; } dyad_rc_t dyad_dtl_ucx_recv (dyad_dtl_t* self, void** buf, size_t* buflen) @@ -452,10 +497,12 @@ dyad_rc_t dyad_dtl_ucx_recv (dyad_dtl_t* self, void** buf, size_t* buflen) ucs_status_t status = UCS_OK; ucp_tag_message_h msg = NULL; ucp_tag_recv_info_t msg_info; - dyad_ucx_request_t* req = NULL; + ucp_status_ptr_t* stat_ptr = NULL; dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_recv"); FLUX_LOG_INFO (dtl_handle->h, "Poll UCP for incoming data\n"); // TODO replace this loop with a resiliency response over RPC + DYAD_PERF_REGION_BEGIN (self->perf_handle, "ucp_tag_probe"); do { ucp_worker_progress (dtl_handle->ucx_worker); msg = ucp_tag_probe_nb (dtl_handle->ucx_worker, @@ -466,6 +513,7 @@ dyad_rc_t dyad_dtl_ucx_recv (dyad_dtl_t* self, void** buf, size_t* buflen) // with the ucp_tag_message_h to retrieve message &msg_info); } while (msg == NULL); + DYAD_PERF_REGION_END (self->perf_handle, "ucp_tag_probe"); // TODO: This version of the polling code is not supposed to spin-lock, // unlike the code above. Currently, it does not work. Once it starts // working, we can replace the code above with a version of this code. @@ -524,9 +572,11 @@ dyad_rc_t dyad_dtl_ucx_recv (dyad_dtl_t* self, void** buf, size_t* buflen) // If allocation fails, log an error if (*buf == NULL) { FLUX_LOG_ERR (dtl_handle->h, "Could not allocate memory for file\n"); - return DYAD_RC_SYSFAIL; + rc = DYAD_RC_SYSFAIL; + goto dtl_ucx_recv_region_finish; } FLUX_LOG_INFO (dtl_handle->h, "Receive data using async UCX operation\n"); + DYAD_PERF_REGION_BEGIN (self->perf_handle, "ucp_tag_msg_recv"); #if UCP_API_VERSION >= UCP_VERSION(1, 10) // Define the settings for the recv operation // @@ -543,36 +593,43 @@ dyad_rc_t dyad_dtl_ucx_recv (dyad_dtl_t* self, void** buf, size_t* buflen) // allows UCX to potentially perform some optimizations recv_params.memory_type = UCS_MEMORY_TYPE_HOST; // Perform the async recv operation using the probed tag recv event - req = ucp_tag_msg_recv_nbx (dtl_handle->ucx_worker, *buf, *buflen, msg, &recv_params); + stat_ptr = ucp_tag_msg_recv_nbx (dtl_handle->ucx_worker, *buf, *buflen, msg, &recv_params); #else - req = ucp_tag_msg_recv_nb (dtl_handle->ucx_worker, + stat_ptr = ucp_tag_msg_recv_nb (dtl_handle->ucx_worker, *buf, *buflen, UCP_DATATYPE_CONTIG, msg, dyad_recv_callback); #endif + DYAD_PERF_REGION_END (self->perf_handle, "ucp_tag_msg_recv"); // Wait on the recv operation to complete FLUX_LOG_INFO (dtl_handle->h, "Wait for UCP recv operation to complete\n"); - status = dyad_ucx_request_wait (dtl_handle, req); + status = dyad_ucx_request_wait (self->private.ucx_dtl_handle, stat_ptr, self->perf_handle); // If the recv operation failed, log an error, free the data buffer, // and set the buffer pointer to NULL if (UCX_STATUS_FAIL (status)) { FLUX_LOG_ERR (dtl_handle->h, "UCX recv failed!\n"); free (*buf); *buf = NULL; - return DYAD_RC_UCXCOMM_FAIL; + rc = DYAD_RC_UCXCOMM_FAIL; + goto dtl_ucx_recv_region_finish; } FLUX_LOG_INFO (dtl_handle->h, "Data receive using UCX is successful\n"); FLUX_LOG_INFO (dtl_handle->h, "Received %lu bytes from producer\n", *buflen); - return DYAD_RC_OK; + rc = DYAD_RC_OK; +dtl_ucx_recv_region_finish: + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_recv"); + return rc; } dyad_rc_t dyad_dtl_ucx_close_connection (dyad_dtl_t* self) { + dyad_rc_t rc = DYAD_RC_OK; ucs_status_t status = UCS_OK; ucs_status_ptr_t stat_ptr; dyad_dtl_ucx_t* dtl_handle = self->private.ucx_dtl_handle; + DYAD_PERF_REGION_BEGIN (self->perf_handle, "dyad_dtl_ucx_close_connection"); if (dtl_handle->curr_comm_mode == DYAD_COMM_SEND) { if (dtl_handle != NULL) { if (dtl_handle->ep != NULL) { @@ -631,7 +688,7 @@ dyad_rc_t dyad_dtl_ucx_close_connection (dyad_dtl_t* self) dtl_handle->comm_tag = 0; } FLUX_LOG_INFO (dtl_handle->h, "UCP endpoint close successful\n"); - return DYAD_RC_OK; + rc = DYAD_RC_OK; } else if (dtl_handle->curr_comm_mode == DYAD_COMM_RECV) { // Since we're using tag send/recv, there's no need // to explicitly close the connection. So, all we're @@ -639,23 +696,28 @@ dyad_rc_t dyad_dtl_ucx_close_connection (dyad_dtl_t* self) // be valid for DYAD because DYAD won't send a file from // one node to the same node). dtl_handle->comm_tag = 0; - return DYAD_RC_OK; + rc = DYAD_RC_OK; } else { FLUX_LOG_ERR (dtl_handle->h, "Somehow, an invalid comm mode reached " "'close_connection'\n"); // TODO create new RC for this case - return DYAD_RC_BAD_COMM_MODE; + rc = DYAD_RC_BAD_COMM_MODE; } + DYAD_PERF_REGION_END (self->perf_handle, "dyad_dtl_ucx_close_connection"); + return rc; } dyad_rc_t dyad_dtl_ucx_finalize (dyad_dtl_t** self) { dyad_dtl_ucx_t* dtl_handle = NULL; dyad_rc_t rc = DYAD_RC_OK; + DYAD_PERF_REGION_BEGIN (perf_handle, "dyad_dtl_ucx_finalize"); if (self == NULL || *self == NULL || (*self)->private.ucx_dtl_handle == NULL) { - return DYAD_RC_OK; + rc = DYAD_RC_OK; + goto dtl_ucx_finalize_region_finish; } + dyad_perf_t* perf_handle = (*self)->perf_handle; dtl_handle = (*self)->private.ucx_dtl_handle; FLUX_LOG_INFO (dtl_handle->h, "Finalizing UCX DTL\n"); if (dtl_handle->ep != NULL) { @@ -683,5 +745,8 @@ dyad_rc_t dyad_dtl_ucx_finalize (dyad_dtl_t** self) // Free the handle and set to NULL to prevent double free free (dtl_handle); (*self)->private.ucx_dtl_handle = NULL; - return DYAD_RC_OK; + rc = DYAD_RC_OK; +dtl_ucx_finalize_region_finish: + DYAD_PERF_REGION_END (perf_handle, "dyad_dtl_ucx_finalize"); + return rc; } diff --git a/src/dyad/modules/Makefile.am b/src/dyad/modules/Makefile.am index cda72bd9..7dd1e171 100644 --- a/src/dyad/modules/Makefile.am +++ b/src/dyad/modules/Makefile.am @@ -18,7 +18,9 @@ dyad_la_CFLAGS = \ $(AM_CFLAGS) \ -I$(top_srcdir)/src/utils \ -I$(top_srcdir)/src/utils/base64 \ + -I$(top_srcdir)/src/common \ -I$(top_srcdir)/src/dtl \ + -I$(top_srcdir)/src/perf \ $(JANSSON_CFLAGS) \ $(FLUX_CORE_CFLAGS) \ -DBUILDING_DYAD \ @@ -33,6 +35,12 @@ dyad_la_LIBADD += $(PERFFLOW_LIBS) dyad_la_CFLAGS += $(PERFFLOW_CFLAGS) -DDYAD_PERFFLOW=1 dyad_la_CPPFLAGS += $(PERFFLOW_PLUGIN_CPPFLAGS) endif +if WITH_CALIPER +dyad_la_CFLAGS += \ + -DWITH_CALIPER=1 \ + $(CALIPER_CFLAGS) +dyad_la_LIBADD += $(CALIPER_LIBS) +endif if URPC lib_LTLIBRARIES += urpc.la diff --git a/src/dyad/modules/dyad.c b/src/dyad/modules/dyad.c index 953bc34c..b37e1bc9 100644 --- a/src/dyad/modules/dyad.c +++ b/src/dyad/modules/dyad.c @@ -25,6 +25,11 @@ #include #endif // defined(__cplusplus) +#include +#include +#include +#include +#include #include #include #include @@ -46,6 +51,7 @@ struct dyad_mod_ctx { flux_msg_handler_t **handlers; const char *dyad_path; dyad_dtl_t *dtl_handle; + dyad_perf_t *perf_handle; }; const struct dyad_mod_ctx dyad_mod_ctx_default = {NULL, false, NULL, NULL, NULL}; @@ -70,6 +76,11 @@ static void freectx (void *arg) dyad_dtl_finalize (&(ctx->dtl_handle)); ctx->dtl_handle = NULL; } + if (ctx->perf_handle != NULL) { + ctx->perf_handle->flush (ctx->perf_handle); + dyad_perf_finalize (&(ctx->perf_handle)); + ctx->perf_handle = NULL; + } free (ctx); } @@ -121,6 +132,8 @@ dyad_fetch_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, int saved_errno = errno; dyad_rc_t rc = 0; + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_server_cb"); + if (!flux_msg_is_streaming (msg)) { errno = EPROTO; goto fetch_error; @@ -158,17 +171,22 @@ dyad_fetch_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, } #endif // DYAD_SPIN_WAIT + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_server_read_data"); FLUX_LOG_INFO (h, "Reading file %s for transfer", fullpath); fd = open (fullpath, O_RDONLY); if (fd < 0) { FLUX_LOG_ERR (h, "DYAD_MOD: Failed to open file \"%s\".\n", fullpath); + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_server_read_data"); goto fetch_error; } if ((inlen = read_all (fd, &inbuf)) < 0) { FLUX_LOG_ERR (h, "DYAD_MOD: Failed to load file \"%s\".\n", fullpath); + close (fd); + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_server_read_data"); goto fetch_error; } close (fd); + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_server_read_data"); FLUX_LOG_INFO (h, "Is inbuf NULL? -> %i\n", (int)(inbuf == NULL)); FLUX_LOG_INFO (h, "Establish DTL connection with consumer"); @@ -190,33 +208,45 @@ dyad_fetch_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, } FLUX_LOG_INFO (h, "Close RPC message stream with an ENODATA (%d) message", ENODATA); + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_server_send_response"); if (flux_respond_error (h, msg, ENODATA, NULL) < 0) { FLUX_LOG_ERR (h, "DYAD_MOD: %s: flux_respond_error with ENODATA failed\n", __FUNCTION__); } + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_server_send_response"); errno = saved_errno; FLUX_LOG_INFO (h, "Finished dyad.fetch module invocation\n"); - return; + goto end_fetch_cb; fetch_error: FLUX_LOG_ERR (h, "Close RPC message stream with an error (errno = %d)\n", errno); + DYAD_PERF_REGION_BEGIN (ctx->perf_handle, "dyad_server_send_response"); if (flux_respond_error (h, msg, errno, NULL) < 0) { FLUX_LOG_ERR (h, "DYAD_MOD: %s: flux_respond_error", __FUNCTION__); } + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_server_send_response"); errno = saved_errno; return; + +end_fetch_cb: + DYAD_PERF_REGION_END (ctx->perf_handle, "dyad_server_cb"); + return; } -static dyad_rc_t dyad_open (flux_t *h, dyad_dtl_mode_t dtl_mode, bool debug) +static dyad_rc_t dyad_open (flux_t *h, dyad_dtl_mode_t dtl_mode, bool debug, optparse_t *opts) { dyad_mod_ctx_t *ctx = getctx (h); dyad_rc_t rc = 0; char *e = NULL; ctx->debug = debug; - rc = dyad_dtl_init (&(ctx->dtl_handle), dtl_mode, h, ctx->debug); + rc = dyad_perf_init (&(ctx->perf_handle), true, opts); + if (DYAD_IS_ERROR (rc)) + goto open_done; + rc = dyad_dtl_init (&(ctx->dtl_handle), dtl_mode, h, ctx->debug, ctx->perf_handle); +open_done: return rc; } @@ -224,83 +254,102 @@ static const struct flux_msg_handler_spec htab[] = {{FLUX_MSGTYPE_REQUEST, DYAD_DTL_RPC_NAME, dyad_fetch_request_cb, 0}, FLUX_MSGHANDLER_TABLE_END}; -void usage () -{ - fprintf (stderr, - "Usage: flux exec -r all flux module load dyad.so " - "[DTL_MODE] [--debug | -d]\n\n"); - fprintf (stderr, "Required Arguments:\n"); - fprintf (stderr, "===================\n"); - fprintf (stderr, - " * PRODUCER_PATH: the producer-managed path that the module should " - "track\n\n"); - fprintf (stderr, "Optional Arguments:\n"); - fprintf (stderr, "===================\n"); - fprintf (stderr, - " * DTL_MODE: a valid data transport layer (DTL) mode. Can be one of the " - "following values\n"); - fprintf (stderr, " * UCX (default): use UCX to send data to consumer\n"); - fprintf (stderr, - " * FLUX_RPC: use Flux's RPC response mechanism to send data to " - "consumer\n"); - fprintf (stderr, " * --debug | -d: if provided, add debugging log messages\n"); -} +static struct optparse_option cmdline_opts[] = {{.name = "dtl_mode", + .key = 'm', + .has_arg = 1, + .arginfo = "DTL_MODE", + .usage = "Specify the data transport " + "layer mode. Can be one of " + "'FLUX_RPC' (default) or " + "'UCX'"}, + {.name = "debug", + .key = 'd', + .has_arg = 0, + .usage = "If provided, add debugging " + "log messages"}, + OPTPARSE_TABLE_END}; DYAD_DLL_EXPORTED int mod_main (flux_t *h, int argc, char **argv) { const mode_t m = (S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH | S_ISGID); dyad_mod_ctx_t *ctx = NULL; - size_t flag_len = 0; - dyad_dtl_mode_t dtl_mode = DYAD_DTL_DEFAULT; + size_t arglen = 0; bool debug = false; + dyad_dtl_mode_t dtl_mode = DYAD_DTL_DEFAULT; + const char *optargp = NULL; + int optindex = 0; + optparse_t *opts = NULL; + int i = 0; if (!h) { fprintf (stderr, "Failed to get flux handle\n"); goto mod_done; } + for (i = 0; i < argc; i++) { + FLUX_LOG_INFO (h, "argv[%d] = %s", i, argv[i]); + } + + FLUX_LOG_INFO (h, "Getting context from AUX"); ctx = getctx (h); - if (argc < 1) { - FLUX_LOG_ERR (ctx->h, - "DYAD_MOD: Missing argument(s). " - "Requires a local dyad path.\n"); - usage (); + FLUX_LOG_INFO (h, "Creating optparser"); + opts = optparse_create ("dyad.so"); + FLUX_LOG_INFO (h, "Adding option table to parser"); + if (optparse_add_option_table (opts, cmdline_opts) < 0) { + FLUX_LOG_ERR (h, "Cannot add option table for DYAD module"); + goto mod_error; + } + FLUX_LOG_INFO (h, "Adding perf options to parser"); + if (DYAD_IS_ERROR (dyad_perf_setopts (opts))) { + FLUX_LOG_ERR (h, "Cannot set command-line options for the performance plugin"); + goto mod_error; + } + FLUX_LOG_INFO (h, "Parsing command line options"); + if ((optindex = optparse_parse_args (opts, argc, argv)) < 0) { + FLUX_LOG_ERR (h, "Cannot parse command line arguments to dyad.so"); + goto mod_error; + } + if (optindex >= argc) { + FLUX_LOG_ERR (h, "Positional arguments not provided to dyad.so"); + optparse_print_usage (opts); goto mod_error; } - (ctx->dyad_path) = argv[0]; - mkdir_as_needed (ctx->dyad_path, m); - if (argc >= 2) { - FLUX_LOG_INFO (h, "DTL Mode (from cmd line) is %s\n", argv[1]); - flag_len = strlen (argv[1]); - if (strncmp (argv[1], "FLUX_RPC", flag_len) == 0) { + if (optparse_getopt (opts, "dtl_mode", &optargp) > 0) { + FLUX_LOG_INFO (h, "Found 'dtl_mode': %s", optargp); + arglen = strlen (optargp); + if (strncmp (optargp, "FLUX_RPC", arglen) == 0) { dtl_mode = DYAD_DTL_FLUX_RPC; - } else if (strncmp (argv[1], "UCX", flag_len) == 0) { + } else if (strncmp (optargp, "UCX", arglen) == 0) { dtl_mode = DYAD_DTL_UCX; } else { - FLUX_LOG_ERR (ctx->h, "Invalid DTL mode provided\n"); - usage (); + FLUX_LOG_ERR (h, "Invalid DTL mode provided\n"); + optparse_print_usage (opts); goto mod_error; } + } else { + FLUX_LOG_INFO (h, "Did not find 'dtl_mode'"); } - if (argc >= 3) { - flag_len = strlen (argv[2]); - if (strncmp (argv[2], "--debug", flag_len) == 0 - || strncmp (argv[2], "-d", flag_len) == 0) { - debug = true; - } else { - debug = false; - } + if (optparse_getopt (opts, "debug", &optargp) > 0) { + FLUX_LOG_INFO (h, "Found 'debug'"); + debug = true; + } else { + FLUX_LOG_INFO (h, "Did not find 'debug'"); } - if (DYAD_IS_ERROR (dyad_open (h, dtl_mode, debug))) { + (ctx->dyad_path) = argv[optindex]; + mkdir_as_needed (ctx->dyad_path, m); + + if (DYAD_IS_ERROR (dyad_open (h, dtl_mode, debug, opts))) { FLUX_LOG_ERR (ctx->h, "dyad_open failed"); goto mod_error; } - FLUX_LOG_INFO (ctx->h, "dyad module begins using \"%s\"\n", argv[0]); + optparse_destroy (opts); + + FLUX_LOG_INFO (ctx->h, "dyad module begins using \"%s\"\n", argv[optindex]); if (flux_msg_handler_addvec (ctx->h, htab, (void *)h, &ctx->handlers) < 0) { FLUX_LOG_ERR (ctx->h, "flux_msg_handler_addvec: %s\n", strerror (errno)); diff --git a/src/dyad/perf/CMakeLists.txt b/src/dyad/perf/CMakeLists.txt new file mode 100644 index 00000000..6617a447 --- /dev/null +++ b/src/dyad/perf/CMakeLists.txt @@ -0,0 +1,41 @@ +set(DYAD_PERF_SRC ) +set(DYAD_PERF_PRIVATE_HEADERS ${CMAKE_CURRENT_SOURCE_DIR}/dyad_perf.h) +set(DYAD_PERF_PUBLIC_HEADERS ) + +set(CALIPER_PERF_SRC ${CMAKE_CURRENT_SOURCE_DIR}/caliper_perf.cpp) +set(CALIPER_PERF_PRIVATE_HEADERS ) +set(CALIPER_PERF_PUBLIC_HEADERS ) + +set(DEFAULT_PERF_SRC ${CMAKE_CURRENT_SOURCE_DIR}/dyad_perf.c) +set(DEFAULT_PERF_PRIVATE_HEADERS ) +set(DEFAULT_PERF_PUBLIC_HEADERS ) + +if (DYAD_PROFILER STREQUAL "CALIPER") + set(DYAD_PERF_SRC ${DYAD_PERF_SRC} ${CALIPER_PERF_SRC}) + set(DYAD_PERF_PRIVATE_HEADERS ${DYAD_PERF_PRIVATE_HEADERS} ${CALIPER_PERF_PRIVATE_HEADERS}) + set(DYAD_PERF_PUBLIC_HEADERS ${DYAD_PERF_PUBLIC_HEADERS} ${CALIPER_PERF_PUBLIC_HEADERS}) +else () + set(DYAD_PERF_SRC ${DYAD_PERF_SRC} ${DEFAULT_PERF_SRC}) + set(DYAD_PERF_PRIVATE_HEADERS ${DYAD_PERF_PRIVATE_HEADERS} ${DEFAULT_PERF_PRIVATE_HEADERS}) + set(DYAD_PERF_PUBLIC_HEADERS ${DYAD_PERF_PUBLIC_HEADERS} ${DEFAULT_PERF_PUBLIC_HEADERS}) +endif () + +add_library(${PROJECT_NAME}_perf SHARED ${DYAD_PERF_SRC}) +target_link_libraries(${PROJECT_NAME}_perf PUBLIC flux::core flux::optparse) + +if (DYAD_PROFILER STREQUAL "CALIPER") + target_link_libraries(${PROJECT_NAME}_perf PUBLIC caliper) + target_compile_definitions(${PROJECT_NAME}_perf PUBLIC WITH_CALIPER=1) +endif () + +install( + TARGETS ${PROJECT_NAME}_perf + EXPORT ${DYAD_EXPORTED_TARGETS} + LIBRARY DESTINATION ${DYAD_INSTALL_LIB_DIR} + ARCHIVE DESTINATION ${DYAD_INSTALL_LIB_DIR} + RUNTIME DESTINATION ${DYAD_INSTALL_BIN_DIR} +) + +if(NOT ${DYAD_PERF_PUBLIC_HEADERS} STREQUAL "") + dyad_install_headers("${DYAD_PERF_PUBLIC_HEADERS}" ${CMAKE_CURRENT_SOURCE_DIR}) +endif() \ No newline at end of file diff --git a/src/dyad/perf/Makefile.am b/src/dyad/perf/Makefile.am new file mode 100644 index 00000000..d35a7d2c --- /dev/null +++ b/src/dyad/perf/Makefile.am @@ -0,0 +1,32 @@ +noinst_LTLIBRARIES = libdyad_perf.la + +libdyad_perf_la_SOURCES = dyad_perf.h +libdyad_perf_la_LIBADD = \ + $(FLUX_CORE_LIBS) \ + $(FLUX_OPTPARSE_LIBS) +libdyad_perf_la_CFLAGS = \ + $(AM_CFLAGS) \ + -I$(top_srcdir)/src/common \ + $(FLUX_CORE_CFLAGS) \ + $(FLUX_OPTPARSE_CFLAGS) \ + -fvisibility=hidden +libdyad_perf_la_CXXFLAGS = \ + $(AM_CFLAGS) \ + -I$(top_srcdir)/src/common \ + $(FLUX_CORE_CFLAGS) \ + $(FLUX_OPTPARSE_CFLAGS) \ + -fvisibility=hidden + +if WITH_CALIPER +libdyad_perf_la_SOURCES += caliper_perf.cpp +libdyad_perf_la_LIBADD += \ + $(CALIPER_LIBS) +libdyad_perf_la_CFLAGS += \ + -DWITH_CALIPER=1 \ + $(CALIPER_CFLAGS) +libdyad_perf_la_CXXFLAGS += \ + -DWITH_CALIPER=1 \ + $(CALIPER_CFLAGS) +else +libdyad_perf_la_SOURCES += dyad_perf.c +endif \ No newline at end of file diff --git a/src/dyad/perf/caliper_perf.cpp b/src/dyad/perf/caliper_perf.cpp new file mode 100644 index 00000000..1550051f --- /dev/null +++ b/src/dyad/perf/caliper_perf.cpp @@ -0,0 +1,121 @@ +#include +#include +#include + +#include +#include + +extern "C" { +struct dyad_perf_impl { + void* config_manager; +}; +} + +void dyad_perf_cali_region_begin (dyad_perf_t* self, const char* region_name) +{ + if (self->enabled) { + cali_begin_region (region_name); + } +} + +void dyad_perf_cali_region_end (dyad_perf_t* self, const char* region_name) +{ + if (self->enabled) { + cali_end_region (region_name); + } +} + +void dyad_perf_cali_flush (dyad_perf_t* self) +{ + if (!self->is_module) + return; + cali::ConfigManager* mgr = (cali::ConfigManager*)self->priv->config_manager; + printf ("Calling ConfigManager->flush\n"); + mgr->flush (); + return; +} + +dyad_rc_t dyad_perf_setopts (optparse_t* opts) +{ + const struct optparse_option cali_config_opt = {.name = "cali_config", + .key = 'c', + .has_arg = 1, + .arginfo = "CALI_CONFIG", + .usage = + "If provided, enable Caliper " + "annotations with the provided " + "configuration"}; + if (optparse_add_option (opts, &cali_config_opt) < 0) + return DYAD_RC_BAD_CLI_ARG_DEF; + return DYAD_RC_OK; +} + +dyad_rc_t dyad_perf_init (dyad_perf_t** perf_handle, bool is_module, optparse_t* opts) +{ + dyad_rc_t rc = DYAD_RC_OK; + const char* optargp = NULL; + if (perf_handle == NULL && *perf_handle == NULL) { + rc = DYAD_RC_PERF_INIT_FAIL; + goto perf_init_done; + } + *perf_handle = (dyad_perf_t*)malloc (sizeof (struct dyad_perf)); + if (*perf_handle == NULL) { + rc = DYAD_RC_SYSFAIL; + // Since we couldn't allocate the handle, + // we don't need to cleanup anything. + // So, we go to perf_init_done. + goto perf_init_done; + } + (*perf_handle)->priv = NULL; + (*perf_handle)->is_module = is_module; + (*perf_handle)->enabled = true; + (*perf_handle)->region_begin = dyad_perf_cali_region_begin; + (*perf_handle)->region_end = dyad_perf_cali_region_end; + (*perf_handle)->flush = dyad_perf_cali_flush; + if (is_module) { + if (optparse_getopt (opts, "cali_config", &optargp) <= 0) { + (*perf_handle)->enabled = false; + } else { + printf ("Found 'cali_config': %s\n", optargp); + cali::ConfigManager* mgr = new cali::ConfigManager (); + mgr->add (optargp); + (*perf_handle)->priv = (dyad_perf_impl_t*)malloc (sizeof (struct dyad_perf_impl)); + if ((*perf_handle)->priv == NULL) { + delete mgr; + rc = DYAD_RC_SYSFAIL; + goto perf_init_error; + } + (*perf_handle)->priv->config_manager = (void*)mgr; + mgr->start (); + } + } + rc = DYAD_RC_OK; + goto perf_init_done; + +perf_init_error: + dyad_perf_finalize (perf_handle); + +perf_init_done: + return rc; +} + +dyad_rc_t dyad_perf_finalize (dyad_perf_t** perf_handle) +{ + dyad_rc_t rc = DYAD_RC_OK; + if (perf_handle == NULL || *perf_handle == NULL) { + goto perf_finalize_done; + } + if ((*perf_handle)->is_module && (*perf_handle)->priv != NULL) { + cali::ConfigManager* mgr = (cali::ConfigManager*)(*perf_handle)->priv->config_manager; + (*perf_handle)->priv->config_manager = NULL; + if (mgr != NULL) + delete mgr; + free ((*perf_handle)->priv); + (*perf_handle)->priv = NULL; + } + free (*perf_handle); + rc = DYAD_RC_OK; + +perf_finalize_done: + return rc; +} \ No newline at end of file diff --git a/src/dyad/perf/dyad_perf.c b/src/dyad/perf/dyad_perf.c new file mode 100644 index 00000000..6c2d3c5c --- /dev/null +++ b/src/dyad/perf/dyad_perf.c @@ -0,0 +1,57 @@ +#if !defined(WITH_CALIPER) +#include + +#include + +void dyad_perf_default_region_begin (dyad_perf_t* restrict self, const char* restrict region_name) +{ + return; +} + +void dyad_perf_default_region_end (dyad_perf_t* restrict self, const char* restrict region_name) +{ + return; +} + +void dyad_perf_default_flush (dyad_perf_t* restrict self) +{ + return; +} + +dyad_rc_t dyad_perf_setopts (optparse_t* opts) +{ + return DYAD_RC_OK; +} + +dyad_rc_t dyad_perf_init (dyad_perf_t** perf_handle, bool is_module, optparse_t* opts) +{ + dyad_rc_t rc = DYAD_RC_OK; + if (perf_handle == NULL && *perf_handle == NULL) { + rc = DYAD_RC_PERF_INIT_FAIL; + goto perf_init_done; + } + *perf_handle = (dyad_perf_t*)malloc (sizeof (struct dyad_perf)); + if (*perf_handle == NULL) { + rc = DYAD_RC_SYSFAIL; + goto perf_init_done; + } + (*perf_handle)->priv = NULL; + (*perf_handle)->is_module = is_module; + (*perf_handle)->enabled = false; + (*perf_handle)->region_begin = dyad_perf_default_region_begin; + (*perf_handle)->region_end = dyad_perf_default_region_end; + (*perf_handle)->flush = dyad_perf_default_flush; + +perf_init_done: + return rc; +} + +dyad_rc_t dyad_perf_finalize (dyad_perf_t** perf_handle) +{ + if (perf_handle == NULL || *perf_handle == NULL) { + free (*perf_handle); + } + return DYAD_RC_OK; +} + +#endif /* defined(WITH_CALIPER) */ \ No newline at end of file diff --git a/src/dyad/perf/dyad_perf.h b/src/dyad/perf/dyad_perf.h new file mode 100644 index 00000000..8b6abb65 --- /dev/null +++ b/src/dyad/perf/dyad_perf.h @@ -0,0 +1,50 @@ +#ifndef DYAD_PERF_DYAD_PERF_H +#define DYAD_PERF_DYAD_PERF_H + +#include + +#include "flux/optparse.h" + +#ifdef __cplusplus +extern "C" { +#else +#include +#endif + +struct dyad_perf_impl; + +typedef struct dyad_perf_impl dyad_perf_impl_t; + +struct dyad_perf { + bool is_module; + bool enabled; + dyad_perf_impl_t* priv; + void (*region_begin) (struct dyad_perf* self, const char* region_name); + void (*region_end) (struct dyad_perf* self, const char* region_name); + void (*flush) (struct dyad_perf* self); +}; + +typedef struct dyad_perf dyad_perf_t; + +DYAD_DLL_HIDDEN dyad_rc_t dyad_perf_setopts (optparse_t* opts); + +DYAD_DLL_HIDDEN dyad_rc_t dyad_perf_init (dyad_perf_t** perf_handle, + bool is_module, + optparse_t* opts); + +DYAD_DLL_HIDDEN dyad_rc_t dyad_perf_finalize (dyad_perf_t** perf_handle); + +#define DYAD_PERF_REGION_BEGIN(handle, name) \ + if (handle != NULL) { \ + handle->region_begin (handle, name); \ + } +#define DYAD_PERF_REGION_END(handle, name) \ + if (handle != NULL) { \ + handle->region_end (handle, name); \ + } + +#ifdef __cplusplus +} +#endif + +#endif /* DYAD_PERF_DYAD_PERF_H */ \ No newline at end of file diff --git a/src/dyad/stream/Makefile.am b/src/dyad/stream/Makefile.am index 66851c31..ca3b4868 100644 --- a/src/dyad/stream/Makefile.am +++ b/src/dyad/stream/Makefile.am @@ -9,6 +9,7 @@ libdyad_fstream_la_LIBADD = $(top_builddir)/src/core/libdyad_core.la libdyad_fstream_la_CXXFLAGS = \ $(AM_CFLAGS) \ -I$(top_srcdir)/src/utils \ + -I$(top_srcdir)/src/common \ -I$(top_srcdir)/src/core \ -I$(top_srcdir)/src/dtl \ $(FLUX_CORE_CFLAGS) diff --git a/src/dyad/wrapper/Makefile.am b/src/dyad/wrapper/Makefile.am index a4875231..618f6232 100644 --- a/src/dyad/wrapper/Makefile.am +++ b/src/dyad/wrapper/Makefile.am @@ -11,6 +11,7 @@ dyad_wrapper_la_LIBADD = \ $(top_builddir)/src/core/libdyad_core.la dyad_wrapper_la_CPPFLAGS = \ -I$(top_srcdir)/src/utils \ + -I$(top_srcdir)/src/common \ -I$(top_srcdir)/src/core \ -I$(top_srcdir)/src/dtl \ $(FLUX_CORE_CFLAGS) \