From f5776ebb3ea187febef41d5424a7e4eeb5783c1c Mon Sep 17 00:00:00 2001 From: Ian Lumsden Date: Fri, 27 Oct 2023 15:54:52 -0400 Subject: [PATCH] Implements 'dyad_get_metadata' in core and adds Python bindings for it --- pydyad/pydyad/bindings.py | 72 ++++++++++++- src/core/dyad_core.c | 214 +++++++++++++++++++++++++++----------- src/core/dyad_core.h | 33 +++++- src/dtl/dyad_rc.h | 5 +- 4 files changed, 255 insertions(+), 69 deletions(-) diff --git a/pydyad/pydyad/bindings.py b/pydyad/pydyad/bindings.py index 15069678..f10aa3db 100644 --- a/pydyad/pydyad/bindings.py +++ b/pydyad/pydyad/bindings.py @@ -1,8 +1,8 @@ import ctypes from ctypes.util import find_library -import os from pathlib import Path import warnings +import weakref DYAD_LIB_DIR = None @@ -35,6 +35,33 @@ class DyadCtxWrapper(ctypes.Structure): ] +class DyadMetadataWrapper(ctypes.Structure): + _fields_ = [ + ("fpath", ctypes.c_char_p), + ("owner_rank", ctypes.c_uint32), + ] + + +class DyadMetadata: + + def __init__(self, metadata_wrapper, dyad_obj): + self.mdata = metadata_wrapper + self.dyad_free_metadata = dyad_obj.dyad_free_metadata + self.mdata_attrs = [tup[0] for tup in DyadMetadataWrapper._fields_] + + def __getattr__(self, attr_name): + if self.mdata is not None: + if attr_name not in self.mdata_attrs: + raise AttributeError("{} is not an attribute of DYAD's metadata".format(attr_name)) + return getattr(self.mdata.contents, attr_name) + raise AttributeError("Underlying metadata object has already been freed") + + def __del__(self): + if self.mdata is not None: + self.dyad_free_metadata(self.mdata) + self.mdata = None + + class Dyad: def __init__(self): @@ -80,6 +107,19 @@ def __init__(self): ctypes.c_char_p, ] self.dyad_produce.restype = ctypes.c_int + self.dyad_get_metadata = self.dyad_core_lib.dyad_get_metadata + self.dyad_get_metadata.argtypes = [ + ctypes.POINTER(DyadCtxWrapper), + ctypes.c_char_p, + ctypes.c_bool, + ctypes.POINTER(ctypes.POINTER(DyadMetadataWrapper)), + ] + 
self.dyad_get_metadata.restype = ctypes.c_int + self.dyad_free_metadata = self.dyad_core_lib.dyad_free_metadata + self.dyad_free_metadata.argtypes = [ + ctypes.POINTER(DyadMetadataWrapper) + ] + self.dyad_free_metadata.restype = ctypes.c_int self.dyad_consume = self.dyad_core_lib.dyad_consume self.dyad_consume.argtypes = [ ctypes.POINTER(DyadCtxWrapper), @@ -163,6 +203,36 @@ def produce(self, fname): ) if int(res) != 0: raise RuntimeError("Cannot produce data with DYAD!") + + def get_metadata(self, fname, should_wait=False, raw=False): + if self.dyad_get_metadata is None: + warnings.warn( + "Trying to get metadata for file with DYAD when libdyad_core.so was not found", + RuntimeWarning + ) + return None + mdata = ctypes.POINTER(DyadMetadataWrapper)() + res = self.dyad_get_metadata( + self.ctx, + fname.encode(), + should_wait, + ctypes.byref(mdata) + ) + if int(res) != 0: + return None + if not raw: + return DyadMetadata(mdata, self) + return mdata + + def free_metadata(self, metadata_wrapper): + if self.dyad_free_metadata is None: + warnings.warn("Trying to free DYAD metadata when libdyad_core.so was not found", RuntimeWarning) + return + res = self.dyad_free_metadata( + metadata_wrapper + ) + if int(res) != 0: + raise RuntimeError("Could not free DYAD metadata") def consume(self, fname): if self.dyad_consume is None: diff --git a/src/core/dyad_core.c b/src/core/dyad_core.c index 018fc995..14cbf99d 100644 --- a/src/core/dyad_core.c +++ b/src/core/dyad_core.c @@ -183,43 +183,87 @@ commit_done:; return rc; } -DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_lookup (const dyad_ctx_t* ctx, - const char* restrict kvs_topic, - uint32_t* owner_rank, - flux_future_t** f) +DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_read (const dyad_ctx_t* restrict ctx, + const char* restrict topic, + bool should_wait, + dyad_metadata_t** mdata) { dyad_rc_t rc = DYAD_RC_OK; + int kvs_lookup_flags = 0; + flux_future_t* f = NULL; + if (mdata == NULL) { + DYAD_LOG_ERR (ctx, + "Metadata double pointer is 
NULL. Cannot correctly create metadata " + "object"); + rc = DYAD_RC_NOTFOUND; + goto kvs_read_end; + } // Lookup information about the desired file (represented by kvs_topic) // from the Flux KVS. If there is no information, wait for it to be // made available - DYAD_LOG_INFO (ctx, "Retrieving information from KVS under the key %s\n", kvs_topic); - *f = flux_kvs_lookup (ctx->h, ctx->kvs_namespace, FLUX_KVS_WAITCREATE, kvs_topic); + if (should_wait) + kvs_lookup_flags = FLUX_KVS_WAITCREATE; + DYAD_LOG_INFO (ctx, "Retrieving information from KVS under the key %s\n", topic); + f = flux_kvs_lookup (ctx->h, ctx->kvs_namespace, kvs_lookup_flags, topic); // If the KVS lookup failed, log an error and return DYAD_BADLOOKUP - if (*f == NULL) { + if (f == NULL) { DYAD_LOG_ERR (ctx, "KVS lookup failed!\n"); - return DYAD_RC_BADLOOKUP; + rc = DYAD_RC_NOTFOUND; + goto kvs_read_end; } // Extract the rank of the producer from the KVS response - DYAD_LOG_INFO (ctx, "Retrieving owner rank from KVS entry\n"); - rc = flux_kvs_lookup_get_unpack (*f, "i", owner_rank); + DYAD_LOG_INFO (ctx, "Building metadata object from KVS entry\n"); + if (*mdata != NULL) { + DYAD_LOG_INFO (ctx, "Metadata object is already allocated. 
Skipping allocation"); + } else { + *mdata = (dyad_metadata_t*)malloc (sizeof (struct dyad_metadata)); + if (*mdata == NULL) { + DYAD_LOG_ERR (ctx, "Cannot allocate memory for metadata object"); + rc = DYAD_RC_SYSFAIL; + goto kvs_read_end; + } + } + size_t topic_len = strlen (topic); + (*mdata)->fpath = (char*)malloc (topic_len + 1); + if ((*mdata)->fpath == NULL) { + DYAD_LOG_ERR (ctx, "Cannot allocate memory for fpath in metadata object"); + rc = DYAD_RC_SYSFAIL; + goto kvs_read_end; + } + memset ((*mdata)->fpath, '\0', topic_len + 1); + strncpy ((*mdata)->fpath, topic, topic_len); + rc = flux_kvs_lookup_get_unpack (f, "i", &((*mdata)->owner_rank)); // If the extraction did not work, log an error and return DYAD_BADFETCH if (rc < 0) { DYAD_LOG_ERR (ctx, "Could not unpack owner's rank from KVS response\n"); - return DYAD_RC_BADFETCH; + rc = DYAD_RC_BADMETADATA; + goto kvs_read_end; } - return DYAD_RC_OK; + rc = DYAD_RC_OK; + +kvs_read_end: + if (DYAD_IS_ERROR (rc) && mdata != NULL && *mdata != NULL) { + if ((*mdata)->fpath != NULL) + free ((*mdata)->fpath); + free (*mdata); + *mdata = NULL; + } + if (f != NULL) { + flux_future_destroy (f); + f = NULL; + } + return rc; } DYAD_CORE_FUNC_MODS dyad_rc_t dyad_fetch (const dyad_ctx_t* restrict ctx, const char* restrict fname, - dyad_kvs_response_t** restrict resp) + dyad_metadata_t** restrict mdata) { dyad_rc_t rc = DYAD_RC_OK; char upath[PATH_MAX]; uint32_t owner_rank = 0; const size_t topic_len = PATH_MAX; char topic[PATH_MAX + 1]; - flux_future_t* f = NULL; memset (upath, 0, PATH_MAX); memset (topic, 0, topic_len + 1); // Extract the path to the file specified by fname relative to the @@ -227,20 +271,21 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_fetch (const dyad_ctx_t* restrict ctx, // This relative path will be stored in upath if (!cmp_canonical_path_prefix (ctx->cons_managed_path, fname, upath, PATH_MAX)) { DYAD_LOG_INFO (ctx, "%s is not in the Consumer's managed path\n", fname); - return DYAD_RC_OK; + rc = DYAD_RC_OK; + goto 
fetch_done; } DYAD_LOG_INFO (ctx, "Obtained file path relative to consumer directory: %s\n", upath); // Generate the KVS key from the file path relative to // the consumer-managed directory gen_path_key (upath, topic, topic_len, ctx->key_depth, ctx->key_bins); DYAD_LOG_INFO (ctx, "Generated KVS key for consumer: %s\n", topic); - // Call dyad_kvs_lookup to retrieve infromation about the file + // Call dyad_kvs_read to retrieve information about the file // from the Flux KVS - rc = dyad_kvs_lookup (ctx, topic, &owner_rank, &f); - // If an error occured in dyad_kvs_lookup, log it and propagate the return + rc = dyad_kvs_read (ctx, topic, true, mdata); + // If an error occurred in dyad_kvs_read, log it and propagate the return // code if (DYAD_IS_ERROR (rc)) { - DYAD_LOG_ERR (ctx, "dyad_kvs_lookup failed!\n"); + DYAD_LOG_ERR (ctx, "dyad_kvs_read failed!\n"); goto fetch_done; } // There are two cases where we do not want to perform file transfer: @@ -255,42 +300,23 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_fetch (const dyad_ctx_t* restrict ctx, "is the " "same as the consumer rank\n"); *resp = NULL; + if (mdata != NULL && *mdata != NULL) { + if ((*mdata)->fpath != NULL) + free ((*mdata)->fpath); + free (*mdata); + *mdata = NULL; + } rc = DYAD_RC_OK; goto fetch_done; } - // Allocate and populate the dyad_kvs_response_t object. 
- // If an error occurs, log it and return the appropriate - // return code - DYAD_LOG_INFO (ctx, "Creating KVS response object to store retrieved data\n"); - *resp = malloc (sizeof (struct dyad_kvs_response)); - if (*resp == NULL) { - DYAD_LOG_ERR (ctx, "Cannot allocate a dyad_kvs_response_t object!\n"); - rc = DYAD_RC_BADRESPONSE; - goto fetch_done; - } - (*resp)->fpath = malloc (strlen (upath) + 1); - if ((*resp)->fpath == NULL) { - DYAD_LOG_ERR (ctx, - "Cannot allocate a buffer for the file path in the " - "dyad_kvs_response_t object\n"); - free (*resp); - rc = DYAD_RC_BADRESPONSE; - goto fetch_done; - } - strncpy ((*resp)->fpath, upath, strlen (upath) + 1); - (*resp)->owner_rank = owner_rank; rc = DYAD_RC_OK; + fetch_done:; - // Destroy the Flux future if needed - if (f != NULL) { - flux_future_destroy (f); - f = NULL; - } return rc; } DYAD_CORE_FUNC_MODS dyad_rc_t dyad_get_data (const dyad_ctx_t* ctx, - const dyad_kvs_response_t* restrict kvs_data, + const dyad_metadata_t* restrict mdata, const char** file_data, size_t* file_len) { @@ -300,8 +326,8 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_get_data (const dyad_ctx_t* ctx, json_t* rpc_payload; DYAD_LOG_INFO (ctx, "Packing payload for RPC to DYAD module"); rc = ctx->dtl_handle->rpc_pack (ctx->dtl_handle, - kvs_data->fpath, - kvs_data->owner_rank, + mdata->fpath, + mdata->owner_rank, &rpc_payload); if (DYAD_IS_ERROR (rc)) { DYAD_LOG_ERR (ctx, @@ -312,7 +338,7 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_get_data (const dyad_ctx_t* ctx, DYAD_LOG_INFO (ctx, "Sending payload for RPC to DYAD module"); f = flux_rpc_pack (ctx->h, DYAD_DTL_RPC_NAME, - kvs_data->owner_rank, + mdata->owner_rank, FLUX_RPC_STREAMING, "o", rpc_payload); @@ -333,7 +359,7 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_get_data (const dyad_ctx_t* ctx, DYAD_LOG_ERR (ctx, "Cannot establish connection with DYAD module on broker " "%u\n", - kvs_data->owner_rank); + mdata->owner_rank); goto get_done; } DYAD_LOG_INFO (ctx, "Receive file data via DTL"); @@ -377,7 +403,7 
@@ get_done:; } DYAD_CORE_FUNC_MODS dyad_rc_t dyad_pull (const dyad_ctx_t* restrict ctx, - const dyad_kvs_response_t* restrict kvs_data) + const dyad_metadata_t* restrict mdata) { dyad_rc_t rc = DYAD_RC_OK; const char* file_data = NULL; @@ -393,14 +419,14 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_pull (const dyad_ctx_t* restrict ctx, // Call dyad_get_data to dispatch a RPC to the producer's Flux broker // and retrieve the data associated with the file - rc = dyad_get_data (ctx, kvs_data, &file_data, &file_len); + rc = dyad_get_data (ctx, mdata, &file_data, &file_len); if (DYAD_IS_ERROR (rc)) { goto pull_done; } // Build the full path to the file being consumed strncpy (file_path, ctx->cons_managed_path, PATH_MAX - 1); - concat_str (file_path, kvs_data->fpath, "/", PATH_MAX); + concat_str (file_path, mdata->fpath, "/", PATH_MAX); strncpy (file_path_copy, file_path, PATH_MAX); // dirname modifies the arg DYAD_LOG_INFO (ctx, "Saving retrieved data to %s\n", file_path); @@ -711,9 +737,75 @@ dyad_rc_t dyad_produce (dyad_ctx_t* ctx, const char* fname) return dyad_commit (ctx, fname); } +// DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_lookup (const dyad_ctx_t* ctx, +// const char* restrict kvs_topic, +// uint32_t* owner_rank, +// flux_future_t** f) +dyad_rc_t dyad_get_metadata (dyad_ctx_t* ctx, + const char* fname, + bool should_wait, + dyad_metadata_t** mdata) +{ + dyad_rc_t rc = DYAD_RC_OK; + const size_t topic_len = PATH_MAX; + char topic[PATH_MAX + 1]; + char upath[PATH_MAX]; + memset (topic, 0, topic_len + 1); + memset (topic, '\0', topic_len + 1); + memset (upath, 0, PATH_MAX); + // Extract the path to the file specified by fname relative to the + // producer-managed path + // This relative path will be stored in upath + DYAD_LOG_INFO (ctx, "Obtaining file path relative to consumer directory: %s", upath); + if (!cmp_canonical_path_prefix (ctx->cons_managed_path, fname, upath, PATH_MAX)) { + DYAD_LOG_INFO (ctx, "%s is not in the Consumer's managed path\n", fname); + rc = 
DYAD_RC_UNTRACKED; + goto get_metadata_done; + } + DYAD_LOG_INFO (ctx, "Generating KVS key: %s", topic); + gen_path_key (upath, topic, topic_len, ctx->key_depth, ctx->key_bins); + rc = dyad_kvs_read (ctx, topic, should_wait, mdata); + if (DYAD_IS_ERROR (rc)) { + DYAD_LOG_ERR (ctx, "Could not read data from the KVS"); + goto get_metadata_done; + } + free ((*mdata)->fpath); + size_t fname_len = strlen (fname); + (*mdata)->fpath = (char*)malloc (fname_len + 1); + if ((*mdata)->fpath == NULL) { + DYAD_LOG_ERR (ctx, "Could not allocate memory for fpath"); + rc = DYAD_RC_SYSFAIL; + goto get_metadata_done; + } + memset ((*mdata)->fpath, '\0', fname_len + 1); + strncpy ((*mdata)->fpath, fname, fname_len); + rc = DYAD_RC_OK; + +get_metadata_done: + if (DYAD_IS_ERROR (rc) && mdata != NULL && *mdata != NULL) { + if ((*mdata)->fpath != NULL) + free ((*mdata)->fpath); + free (*mdata); + *mdata = NULL; + } + return rc; +} + +dyad_rc_t dyad_free_metadata (dyad_metadata_t* mdata) +{ + if (mdata != NULL) { + if (mdata->fpath != NULL) + free (mdata->fpath); + free (mdata); + mdata = NULL; + } + return DYAD_RC_OK; +} + dyad_rc_t dyad_consume (dyad_ctx_t* ctx, const char* fname) { int rc = 0; + dyad_metadata_t* mdata = NULL; // If the context is not defined, then it is not valid. // So, return DYAD_NOCTX if (!ctx || !ctx->h) { @@ -729,8 +821,7 @@ dyad_rc_t dyad_consume (dyad_ctx_t* ctx, const char* fname) ctx->reenter = false; // Call dyad_fetch to get (and possibly wait on) // data from the Flux KVS - dyad_kvs_response_t* resp = NULL; - rc = dyad_fetch (ctx, fname, &resp); + rc = dyad_fetch (ctx, fname, &mdata); // If an error occured in dyad_fetch, log an error // and return the corresponding DYAD return code if (DYAD_IS_ERROR (rc)) { @@ -741,20 +832,21 @@ dyad_rc_t dyad_consume (dyad_ctx_t* ctx, const char* fname) // then we need to skip data transfer. 
// This will most likely happend because shared_storage // is enabled - if (resp == NULL) { + if (mdata == NULL) { DYAD_LOG_INFO (ctx, "The KVS response is NULL! Skipping dyad_pull!\n"); rc = DYAD_RC_OK; goto consume_done; } // Call dyad_pull to fetch the data from the producer's // Flux broker - rc = dyad_pull (ctx, resp); + rc = dyad_pull (ctx, mdata); // Regardless if there was an error in dyad_pull, // free the KVS response object - if (resp != NULL) { - free (resp->fpath); - free (resp); - resp = NULL; + if (mdata != NULL) { + if (mdata->fpath != NULL) + free (mdata->fpath); + free (mdata); + mdata = NULL; } // If an error occured in dyad_pull, log it // and return the corresponding DYAD return code diff --git a/src/core/dyad_core.h b/src/core/dyad_core.h index a84a6033..f8c33ee8 100644 --- a/src/core/dyad_core.h +++ b/src/core/dyad_core.h @@ -50,6 +50,12 @@ struct dyad_ctx { DYAD_DLL_EXPORTED extern const struct dyad_ctx dyad_ctx_default; typedef struct dyad_ctx dyad_ctx_t; +struct dyad_metadata { + char* fpath; + uint32_t owner_rank; +}; +typedef struct dyad_metadata dyad_metadata_t; + // Debug message #ifndef DPRINTF #define DPRINTF(curr_dyad_ctx, fmt, ...) 
\ @@ -84,7 +90,7 @@ typedef struct dyad_ctx dyad_ctx_t; * instance of DYAD * @param[out] ctx the newly initialized context * - * @return An integer error code (values TBD) + * @return An error code from dyad_rc.h */ DYAD_DLL_EXPORTED dyad_rc_t dyad_init (bool debug, bool check, @@ -100,7 +106,8 @@ DYAD_DLL_EXPORTED dyad_rc_t dyad_init (bool debug, /** * @brief Intialize the DYAD context using environment variables * @param[out] ctx the newly initialized context - * @return An error code + * + * @return An error code from dyad_rc.h */ DYAD_DLL_EXPORTED dyad_rc_t dyad_init_env (dyad_ctx_t** ctx); @@ -110,18 +117,34 @@ DYAD_DLL_EXPORTED dyad_rc_t dyad_init_env (dyad_ctx_t** ctx); * @param[in] ctx the DYAD context for the operation * @param[in] fname the name of the file being "produced" * - * @return An integer error code (values TBD) + * @return An error code from dyad_rc.h */ DYAD_PFA_ANNOTATE DYAD_DLL_EXPORTED dyad_rc_t dyad_produce (dyad_ctx_t* ctx, const char* fname); +/** + * @brief Obtain DYAD metadata for a file in the consumer-managed directory + * @param[in] ctx the DYAD context for the operation + * @param[in] fname the name of the file for which metadata is obtained + * @param[in] should_wait if true, wait for the file to be produced before returning + * @param[out] mdata a dyad_metadata_t object containing the metadata for the file + * + * @return An error code from dyad_rc.h + */ +DYAD_PFA_ANNOTATE DYAD_DLL_EXPORTED dyad_rc_t dyad_get_metadata (dyad_ctx_t* ctx, + const char* fname, + bool should_wait, + dyad_metadata_t** mdata); + +DYAD_PFA_ANNOTATE DYAD_DLL_EXPORTED dyad_rc_t dyad_free_metadata (dyad_metadata_t* mdata); + /** * @brief Wrapper function that performs all the common tasks needed * of a consumer * @param[in] ctx the DYAD context for the operation * @param[in] fname the name of the file being "consumed" * - * @return An integer error code (values TBD) + * @return An error code from dyad_rc.h */ DYAD_PFA_ANNOTATE DYAD_DLL_EXPORTED dyad_rc_t 
dyad_consume (dyad_ctx_t* ctx, const char* fname); @@ -130,7 +153,7 @@ DYAD_PFA_ANNOTATE DYAD_DLL_EXPORTED dyad_rc_t dyad_consume (dyad_ctx_t* ctx, * @brief Finalizes the DYAD instance and deallocates the context * @param[in] ctx the DYAD context being finalized * - * @return An integer error code (values TBD) + * @return An error code from dyad_rc.h */ DYAD_DLL_EXPORTED dyad_rc_t dyad_finalize (dyad_ctx_t** ctx); diff --git a/src/dtl/dyad_rc.h b/src/dtl/dyad_rc.h index 5ffccffc..eb2ec30f 100644 --- a/src/dtl/dyad_rc.h +++ b/src/dtl/dyad_rc.h @@ -24,9 +24,9 @@ enum dyad_core_return_codes { DYAD_RC_NOCTX = -2, // No DYAD Context found DYAD_RC_FLUXFAIL = -3, // Some Flux function failed DYAD_RC_BADCOMMIT = -4, // Flux KVS commit didn't work - DYAD_RC_BADLOOKUP = -5, // Flux KVS lookup didn't work + DYAD_RC_NOTFOUND = -5, // Flux KVS lookup didn't work DYAD_RC_BADFETCH = -6, // Flux KVS commit didn't work - DYAD_RC_BADRESPONSE = -7, // Cannot create/populate a DYAD response + DYAD_RC_BADMETADATA = -7, // Cannot create/populate a DYAD response DYAD_RC_BADRPC = -8, // Flux RPC pack or get didn't work DYAD_RC_BADFIO = -9, // File I/O failed DYAD_RC_BADMANAGEDPATH = -10, // Cons or Prod Manged Path is bad @@ -41,6 +41,7 @@ enum dyad_core_return_codes { // end of stream) sooner than expected DYAD_RC_BAD_B64DECODE = -18, // Decoding of data w/ base64 failed DYAD_RC_BAD_COMM_MODE = -19, // Invalid comm mode provided to DTL + DYAD_RC_UNTRACKED = -20, // Provided path is not tracked by DYAD }; typedef enum dyad_core_return_codes dyad_rc_t;