From 4b859e11b0a289e945ba182dfc207cc3af111ac0 Mon Sep 17 00:00:00 2001 From: Ian Lumsden Date: Fri, 27 Oct 2023 15:54:52 -0400 Subject: [PATCH] Implements 'dyad_get_metadata' in core and adds Python bindings for it --- pydyad/pydyad/bindings.py | 264 ++++++++++++++++++++++++++++++++++++++ src/core/dyad_core.c | 210 +++++++++++++++++++++--------- src/core/dyad_core.h | 33 ++++- src/dtl/dyad_rc.h | 3 +- 4 files changed, 443 insertions(+), 67 deletions(-) create mode 100644 pydyad/pydyad/bindings.py diff --git a/pydyad/pydyad/bindings.py b/pydyad/pydyad/bindings.py new file mode 100644 index 00000000..f10aa3db --- /dev/null +++ b/pydyad/pydyad/bindings.py @@ -0,0 +1,264 @@ +import ctypes +from ctypes.util import find_library +from pathlib import Path +import warnings +import weakref + + +DYAD_LIB_DIR = None + + +class FluxHandle(ctypes.Structure): + pass + + +class DyadDTLHandle(ctypes.Structure): + pass + + +class DyadCtxWrapper(ctypes.Structure): + _fields_ = [ + ("h", ctypes.POINTER(FluxHandle)), + ("dtl_handle", ctypes.POINTER(DyadDTLHandle)), + ("debug", ctypes.c_bool), + ("check", ctypes.c_bool), + ("reenter", ctypes.c_bool), + ("initialized", ctypes.c_bool), + ("shared_storage", ctypes.c_bool), + ("sync_started", ctypes.c_bool), + ("key_depth", ctypes.c_uint), + ("key_bins", ctypes.c_uint), + ("rank", ctypes.c_uint32), + ("kvs_namespace", ctypes.c_char_p), + ("prod_managed_path", ctypes.c_char_p), + ("cons_managed_path", ctypes.c_char_p), + ] + + +class DyadMetadataWrapper(ctypes.Structure): + _fields_ = [ + ("fpath", ctypes.c_char_p), + ("owner_rank", ctypes.c_uint32), + ] + + +class DyadMetadata: + + def __init__(self, metadata_wrapper, dyad_obj): + self.mdata = metadata_wrapper + self.dyad_free_metadata = weakref.ref(dyad_obj.dyad_free_metadata) + self.mdata_attrs = [tup[0] for tup in metadata_wrapper._fields_] + + def __getattr__(self, attr_name): + if self.mdata is not None: + if attr_name not in self.mdata_attrs: + raise AttributeError("{} is not an attribute of DYAD's metadata".format(attr_name)) + return getattr(self.mdata.contents, attr_name) + raise AttributeError("Underlying metadata object has already been freed") + + def __del__(self): + if self.mdata is not None: + self.dyad_free_metadata(self.mdata) + self.mdata = None + + +class Dyad: + + def __init__(self): + self.initialized = False + self.dyad_core_lib = None + self.ctx = None + self.dyad_init = None + self.dyad_init_env = None + self.dyad_produce = None + self.dyad_consume = None + self.dyad_finalize = None + dyad_core_lib_file = None + self.cons_path = None + self.prod_path = None + dyad_core_lib_file = find_library("dyad_core") + if dyad_core_lib_file is None: + raise FileNotFoundError("Cannot find libdyad_core.so using 'ctypes'") + self.dyad_core_lib = ctypes.CDLL(dyad_core_lib_file) + if self.dyad_core_lib is None: + raise FileNotFoundError("Cannot find libdyad_core") + self.ctx = ctypes.POINTER(DyadCtxWrapper)() + self.dyad_init = self.dyad_core_lib.dyad_init + self.dyad_init.argtypes = [ + ctypes.c_bool, # debug + ctypes.c_bool, # check + ctypes.c_bool, # shared_storage + ctypes.c_uint, # key_depth + ctypes.c_uint, # key_bins + ctypes.c_char_p, # kvs_namespace + ctypes.c_char_p, # prod_managed_path + ctypes.c_char_p, # cons_managed_path + ctypes.POINTER(ctypes.POINTER(DyadCtxWrapper)), # ctx + ] + self.dyad_init.restype = ctypes.c_int + self.dyad_init_env = self.dyad_core_lib.dyad_init_env + self.dyad_init_env.argtypes = [ + ctypes.POINTER(ctypes.POINTER(DyadCtxWrapper)) + ] + self.dyad_init_env.restype = ctypes.c_int + self.dyad_produce = self.dyad_core_lib.dyad_produce + self.dyad_produce.argtypes = [ + ctypes.POINTER(DyadCtxWrapper), + ctypes.c_char_p, + ] + self.dyad_produce.restype = ctypes.c_int + self.dyad_get_metadata = self.dyad_core_lib.dyad_get_metadata + self.dyad_get_metadata.argtypes = [ + ctypes.POINTER(DyadCtxWrapper), + ctypes.c_char_p, + ctypes.c_bool, + ctypes.POINTER(ctypes.POINTER(DyadMetadataWrapper)), + ] + self.dyad_get_metadata.restype = ctypes.c_int + self.dyad_free_metadata = self.dyad_core_lib.dyad_free_metadata + self.dyad_free_metadata.argtypes = [ + ctypes.POINTER(DyadMetadataWrapper) + ] + self.dyad_free_metadata.restype = ctypes.c_int + self.dyad_consume = self.dyad_core_lib.dyad_consume + self.dyad_consume.argtypes = [ + ctypes.POINTER(DyadCtxWrapper), + ctypes.c_char_p, + ] + self.dyad_consume.restype = ctypes.c_int + self.dyad_finalize = self.dyad_core_lib.dyad_finalize + self.dyad_finalize.argtypes = [ + ctypes.POINTER(ctypes.POINTER(DyadCtxWrapper)), + ] + self.dyad_finalize.restype = ctypes.c_int + self.cons_path = None + self.prod_path = None + + def init(self, debug, check, shared_storage, key_depth, + key_bins, kvs_namespace, prod_managed_path, + cons_managed_path): + if self.dyad_init is None: + warnings.warn( + "Trying to initialize DYAD when libdyad_core.so was not found", + RuntimeWarning + ) + return + res = self.dyad_init( + ctypes.c_bool(debug), + ctypes.c_bool(check), + ctypes.c_bool(shared_storage), + ctypes.c_uint(key_depth), + ctypes.c_uint(key_bins), + kvs_namespace.encode() if kvs_namespace is not None else None, + prod_managed_path.encode() if prod_managed_path is not None else None, + cons_managed_path.encode() if cons_managed_path is not None else None, + ctypes.byref(self.ctx), + ) + if int(res) != 0: + raise RuntimeError("Could not initialize DYAD!") + if self.ctx.contents.prod_managed_path is None: + self.prod_path = None + else: + self.prod_path = Path(self.ctx.contents.prod_managed_path.decode("utf-8")).expanduser().resolve() + if self.ctx.contents.cons_managed_path is None: + self.cons_path = None + else: + self.cons_path = Path(self.ctx.contents.cons_managed_path.decode("utf-8")).expanduser().resolve() + self.initialized = True + + def init_env(self): + if self.dyad_init_env is None: + warnings.warn( + "Trying to initialize DYAD when libdyad_core.so was not found", + RuntimeWarning + ) + return + res = self.dyad_init_env( + ctypes.byref(self.ctx) + ) + if int(res) != 0: + raise RuntimeError("Could not initialize DYAD with environment variables") + if self.ctx.contents.prod_managed_path is None: + self.prod_path = None + else: + self.prod_path = Path(self.ctx.contents.prod_managed_path.decode("utf-8")).expanduser().resolve() + if self.ctx.contents.cons_managed_path is None: + self.cons_path = None + else: + self.cons_path = Path(self.ctx.contents.cons_managed_path.decode("utf-8")).expanduser().resolve() + + def __del__(self): + self.finalize() + + def produce(self, fname): + if self.dyad_produce is None: + warnings.warn( + "Trying to produce with DYAD when libdyad_core.so was not found", + RuntimeWarning + ) + return + res = self.dyad_produce( + self.ctx, + fname.encode(), + ) + if int(res) != 0: + raise RuntimeError("Cannot produce data with DYAD!") + + def get_metadata(self, fname, should_wait=False, raw=False): + if self.dyad_get_metadata is None: + warnings.warn( + "Trying to get metadata for file with DYAD when libdyad_core.so was not found", + RuntimeWarning + ) + return None + mdata = ctypes.POINTER(DyadMetadataWrapper)() + res = self.dyad_get_metadata( + self.ctx, + fname.encode(), + should_wait, + ctypes.byref(mdata) + ) + if int(res) != 0: + return None + if not raw: + return DyadMetadata(mdata, self) + return mdata + + def free_metadata(self, metadata_wrapper): + if self.dyad_free_metadata is None: + warnings.warn("Trying to free DYAD metadata when libdyad_core.so was not found", RuntimeWarning) + return + res = self.dyad_free_metadata( + metadata_wrapper + ) + if int(res) != 0: + raise RuntimeError("Could not free DYAD metadata") + + def consume(self, fname): + if self.dyad_consume is None: + warnings.warn( + "Trying to consunme with DYAD when libdyad_core.so was not found", + RuntimeWarning + ) + return + res = self.dyad_consume( + self.ctx, + fname.encode(), + ) + if int(res) != 0: + raise RuntimeError("Cannot consume data with DYAD!") + + def finalize(self): + if not self.initialized: + return + if self.dyad_finalize is None: + warnings.warn( + "Trying to finalize DYAD when libdyad_core.so was not found", + RuntimeWarning + ) + return + res = self.dyad_finalize( + ctypes.byref(self.ctx) + ) + if int(res) != 0: + raise RuntimeError("Cannot finalize DYAD!") diff --git a/src/core/dyad_core.c b/src/core/dyad_core.c index 018fc995..5a119359 100644 --- a/src/core/dyad_core.c +++ b/src/core/dyad_core.c @@ -183,43 +183,83 @@ commit_done:; return rc; } -DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_lookup (const dyad_ctx_t* ctx, - const char* restrict kvs_topic, - uint32_t* owner_rank, - flux_future_t** f) +DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_read (const dyad_ctx_t* ctx, + const char* topic, + bool should_wait, + dyad_metadata_t** mdata) { dyad_rc_t rc = DYAD_RC_OK; + int kvs_lookup_flags = 0; + flux_future_t* f = NULL; + if (mdata == NULL) { + DYAD_LOG_ERR (ctx, + "Metadata double pointer is NULL. Cannot correctly create metadata " + "object"); + rc = DYAD_RC_BADLOOKUP; + goto kvs_read_end; + } // Lookup information about the desired file (represented by kvs_topic) // from the Flux KVS. If there is no information, wait for it to be // made available - DYAD_LOG_INFO (ctx, "Retrieving information from KVS under the key %s\n", kvs_topic); - *f = flux_kvs_lookup (ctx->h, ctx->kvs_namespace, FLUX_KVS_WAITCREATE, kvs_topic); + if (should_wait) + kvs_lookup_flags = FLUX_KVS_WAITCREATE; + DYAD_LOG_INFO (ctx, "Retrieving information from KVS under the key %s\n", topic); + f = flux_kvs_lookup (ctx->h, ctx->kvs_namespace, kvs_lookup_flags, topic); // If the KVS lookup failed, log an error and return DYAD_BADLOOKUP - if (*f == NULL) { + if (f == NULL) { DYAD_LOG_ERR (ctx, "KVS lookup failed!\n"); - return DYAD_RC_BADLOOKUP; + rc = DYAD_RC_BADLOOKUP; + goto kvs_read_end; } // Extract the rank of the producer from the KVS response - DYAD_LOG_INFO (ctx, "Retrieving owner rank from KVS entry\n"); - rc = flux_kvs_lookup_get_unpack (*f, "i", owner_rank); + DYAD_LOG_INFO (ctx, "Building metadata object from KVS entry\n"); + if (*mdata != NULL) { + DYAD_LOG_INFO (ctx, "Metadata object is already allocated. Skipping allocation"); + } else { + *mdata = (dyad_metadata_t*)malloc (sizeof (struct dyad_metadata)); + if (*mdata == NULL) { + DYAD_LOG_ERR (ctx, "Cannot allocate memory for metadata object"); + rc = DYAD_RC_SYSFAIL; + goto kvs_read_end; + } + } + size_t topic_len = strlen (topic); + *mdata->fpath = (char*)malloc (topic_len + 1); + if (*mdata->fpath == NULL) { + DYAD_LOG_ERR (ctx, "Cannot allocate memory for fpath in metadata object"); + rc = DYAD_RC_SYSFAIL; + goto kvs_read_end; + } + memset (*mdata->fpath, '\0', topic_len + 1); + strncpy (*mdata->fpath, topic, topic_len); + rc = flux_kvs_lookup_get_unpack (f, "i", &((*mdata)->owner_rank)); // If the extraction did not work, log an error and return DYAD_BADFETCH if (rc < 0) { DYAD_LOG_ERR (ctx, "Could not unpack owner's rank from KVS response\n"); - return DYAD_RC_BADFETCH; + rc = DYAD_RC_BADMETADATA; + goto kvs_read_end; } - return DYAD_RC_OK; + rc = DYAD_RC_OK; + +kvs_read_end: + if (DYAD_IS_ERROR (rc) && mdata != NULL && *mdata != NULL) { + if (*mdata->fpath != NULL) + free (*mdata->fpath); + free (*mdata); + *mdata = NULL; + } + return rc; } DYAD_CORE_FUNC_MODS dyad_rc_t dyad_fetch (const dyad_ctx_t* restrict ctx, const char* restrict fname, - dyad_kvs_response_t** restrict resp) + dyad_mdata_t** restrict mdata) { dyad_rc_t rc = DYAD_RC_OK; char upath[PATH_MAX]; uint32_t owner_rank = 0; const size_t topic_len = PATH_MAX; char topic[PATH_MAX + 1]; - flux_future_t* f = NULL; memset (upath, 0, PATH_MAX); memset (topic, 0, topic_len + 1); // Extract the path to the file specified by fname relative to the @@ -227,20 +267,21 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_fetch (const dyad_ctx_t* restrict ctx, // This relative path will be stored in upath if (!cmp_canonical_path_prefix (ctx->cons_managed_path, fname, upath, PATH_MAX)) { DYAD_LOG_INFO (ctx, "%s is not in the Consumer's managed path\n", fname); - return DYAD_RC_OK; + rc = DYAD_RC_OK; + goto fetch_done; } DYAD_LOG_INFO (ctx, "Obtained file path relative to consumer directory: %s\n", upath); // Generate the KVS key from the file path relative to // the consumer-managed directory gen_path_key (upath, topic, topic_len, ctx->key_depth, ctx->key_bins); DYAD_LOG_INFO (ctx, "Generated KVS key for consumer: %s\n", topic); - // Call dyad_kvs_lookup to retrieve infromation about the file + // Call dyad_kvs_read to retrieve infromation about the file // from the Flux KVS - rc = dyad_kvs_lookup (ctx, topic, &owner_rank, &f); - // If an error occured in dyad_kvs_lookup, log it and propagate the return + rc = dyad_kvs_read (ctx, topic, true, mdata); + // If an error occured in dyad_kvs_read, log it and propagate the return // code if (DYAD_IS_ERROR (rc)) { - DYAD_LOG_ERR (ctx, "dyad_kvs_lookup failed!\n"); + DYAD_LOG_ERR (ctx, "dyad_kvs_read failed!\n"); goto fetch_done; } // There are two cases where we do not want to perform file transfer: @@ -255,42 +296,23 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_fetch (const dyad_ctx_t* restrict ctx, "is the " "same as the consumer rank\n"); *resp = NULL; + if (mdata != NULL && *mdata != NULL) { + if (*mdata->fpath != NULL) + free (*mdata->fpath); + free (*mdata); + *mdata = NULL; + } rc = DYAD_RC_OK; goto fetch_done; } - // Allocate and populate the dyad_kvs_response_t object. - // If an error occurs, log it and return the appropriate - // return code - DYAD_LOG_INFO (ctx, "Creating KVS response object to store retrieved data\n"); - *resp = malloc (sizeof (struct dyad_kvs_response)); - if (*resp == NULL) { - DYAD_LOG_ERR (ctx, "Cannot allocate a dyad_kvs_response_t object!\n"); - rc = DYAD_RC_BADRESPONSE; - goto fetch_done; - } - (*resp)->fpath = malloc (strlen (upath) + 1); - if ((*resp)->fpath == NULL) { - DYAD_LOG_ERR (ctx, - "Cannot allocate a buffer for the file path in the " - "dyad_kvs_response_t object\n"); - free (*resp); - rc = DYAD_RC_BADRESPONSE; - goto fetch_done; - } - strncpy ((*resp)->fpath, upath, strlen (upath) + 1); - (*resp)->owner_rank = owner_rank; rc = DYAD_RC_OK; + fetch_done:; - // Destroy the Flux future if needed - if (f != NULL) { - flux_future_destroy (f); - f = NULL; - } return rc; } DYAD_CORE_FUNC_MODS dyad_rc_t dyad_get_data (const dyad_ctx_t* ctx, - const dyad_kvs_response_t* restrict kvs_data, + const dyad_metadata_t* restrict mdata, const char** file_data, size_t* file_len) { @@ -300,8 +322,8 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_get_data (const dyad_ctx_t* ctx, json_t* rpc_payload; DYAD_LOG_INFO (ctx, "Packing payload for RPC to DYAD module"); rc = ctx->dtl_handle->rpc_pack (ctx->dtl_handle, - kvs_data->fpath, - kvs_data->owner_rank, + mdata->fpath, + mdata->owner_rank, &rpc_payload); if (DYAD_IS_ERROR (rc)) { DYAD_LOG_ERR (ctx, @@ -312,7 +334,7 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_get_data (const dyad_ctx_t* ctx, DYAD_LOG_INFO (ctx, "Sending payload for RPC to DYAD module"); f = flux_rpc_pack (ctx->h, DYAD_DTL_RPC_NAME, - kvs_data->owner_rank, + mdata->owner_rank, FLUX_RPC_STREAMING, "o", rpc_payload); @@ -333,7 +355,7 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_get_data (const dyad_ctx_t* ctx, DYAD_LOG_ERR (ctx, "Cannot establish connection with DYAD module on broker " "%u\n", - kvs_data->owner_rank); + mdata->owner_rank); goto get_done; } DYAD_LOG_INFO (ctx, "Receive file data via DTL"); @@ -377,7 +399,7 @@ get_done:; } DYAD_CORE_FUNC_MODS dyad_rc_t dyad_pull (const dyad_ctx_t* restrict ctx, - const dyad_kvs_response_t* restrict kvs_data) + const dyad_metadata_t* restrict mdata) { dyad_rc_t rc = DYAD_RC_OK; const char* file_data = NULL; @@ -393,14 +415,14 @@ DYAD_CORE_FUNC_MODS dyad_rc_t dyad_pull (const dyad_ctx_t* restrict ctx, // Call dyad_get_data to dispatch a RPC to the producer's Flux broker // and retrieve the data associated with the file - rc = dyad_get_data (ctx, kvs_data, &file_data, &file_len); + rc = dyad_get_data (ctx, mdata, &file_data, &file_len); if (DYAD_IS_ERROR (rc)) { goto pull_done; } // Build the full path to the file being consumed strncpy (file_path, ctx->cons_managed_path, PATH_MAX - 1); - concat_str (file_path, kvs_data->fpath, "/", PATH_MAX); + concat_str (file_path, mdata->fpath, "/", PATH_MAX); strncpy (file_path_copy, file_path, PATH_MAX); // dirname modifies the arg DYAD_LOG_INFO (ctx, "Saving retrieved data to %s\n", file_path); @@ -711,9 +733,75 @@ dyad_rc_t dyad_produce (dyad_ctx_t* ctx, const char* fname) return dyad_commit (ctx, fname); } +// DYAD_CORE_FUNC_MODS dyad_rc_t dyad_kvs_lookup (const dyad_ctx_t* ctx, +// const char* restrict kvs_topic, +// uint32_t* owner_rank, +// flux_future_t** f) +dyad_rc_t dyad_get_metadata (dyad_ctx_t* ctx, + const char* fname, + bool should_wait, + dyad_metadata_t** mdata) +{ + dyad_rc_t rc = DYAD_RC_OK; + const size_t topic_len = PATH_MAX; + char topic[PATH_MAX + 1]; + char upath[PATH_MAX]; + memset (topic, 0, topic_len + 1); + memset (topic, '\0', topic_len + 1); + memset (upath, 0, PATH_MAX); + // Extract the path to the file specified by fname relative to the + // producer-managed path + // This relative path will be stored in upath + DYAD_LOG_INFO (ctx, "Obtaining file path relative to consumer directory: %s", upath); + if (!cmp_canonical_path_prefix (ctx->cons_managed_path, fname, upath, PATH_MAX)) { + DYAD_LOG_INFO (ctx, "%s is not in the Consumer's managed path\n", fname); + rc = DYAD_RC_UNTRACKED; + goto get_metadata_done; + } + DYAD_LOG_INFO (ctx, "Generating KVS key: %s", topic); + gen_path_key (upath, topic, topic_len, ctx->key_depth, ctx->key_bins); + rc = dyad_kvs_read (ctx, topic, should_wait, mdata); + if (DYAD_IS_ERROR (rc)) { + DYAD_LOG_ERR (ctx, "Could not read data from the KVS"); + goto get_metadata_done; + } + free (*mdata->fpath); + size_t fname_len = strlen (fname); + *mdata->fpath = (char*)malloc (fname_len + 1); + if (*mdata->fpath == NULL) { + DYAD_LOG_ERR (ctx, "Could not allocate memory for fpath"); + rc = DYAD_RC_SYSFAIL; + goto get_metadata_done; + } + memset (*mdata->fpath, '\0', fname_len + 1); + strncpy (*mdata->fpath, fname, fname_len); + rc = DYAD_RC_OK; + +get_metadata_done: + if (DYAD_IS_ERROR (rc) && mdata != NULL && *mdata != NULL) { + if (*mdata->fpath != NULL) + free (*mdata->fpath); + free (*mdata); + *mdata = NULL; + } + return rc; +} + +dyad_rc_t dyad_free_metadata (dyad_metadata_t* mdata) +{ + if (mdata != NULL) { + if (mdata->fpath != NULL) + free (mdata->fpath); + free (mdata); + mdata = NULL; + } + return DYAD_RC_OK; +} + dyad_rc_t dyad_consume (dyad_ctx_t* ctx, const char* fname) { int rc = 0; + dyad_metadata_t* mdata = NULL; // If the context is not defined, then it is not valid. // So, return DYAD_NOCTX if (!ctx || !ctx->h) { @@ -729,8 +817,7 @@ dyad_rc_t dyad_consume (dyad_ctx_t* ctx, const char* fname) ctx->reenter = false; // Call dyad_fetch to get (and possibly wait on) // data from the Flux KVS - dyad_kvs_response_t* resp = NULL; - rc = dyad_fetch (ctx, fname, &resp); + rc = dyad_fetch (ctx, fname, &mdata); // If an error occured in dyad_fetch, log an error // and return the corresponding DYAD return code if (DYAD_IS_ERROR (rc)) { @@ -741,20 +828,21 @@ dyad_rc_t dyad_consume (dyad_ctx_t* ctx, const char* fname) // then we need to skip data transfer. // This will most likely happend because shared_storage // is enabled - if (resp == NULL) { + if (mdata == NULL) { DYAD_LOG_INFO (ctx, "The KVS response is NULL! Skipping dyad_pull!\n"); rc = DYAD_RC_OK; goto consume_done; } // Call dyad_pull to fetch the data from the producer's // Flux broker - rc = dyad_pull (ctx, resp); + rc = dyad_pull (ctx, mdata); // Regardless if there was an error in dyad_pull, // free the KVS response object - if (resp != NULL) { - free (resp->fpath); - free (resp); - resp = NULL; + if (mdata != NULL) { + if (mdata->fpath != NULL) + free (mdata->fpath); + free (mdata); + mdata = NULL; } // If an error occured in dyad_pull, log it // and return the corresponding DYAD return code diff --git a/src/core/dyad_core.h b/src/core/dyad_core.h index a84a6033..f8c33ee8 100644 --- a/src/core/dyad_core.h +++ b/src/core/dyad_core.h @@ -50,6 +50,12 @@ struct dyad_ctx { DYAD_DLL_EXPORTED extern const struct dyad_ctx dyad_ctx_default; typedef struct dyad_ctx dyad_ctx_t; +struct dyad_metadata { + char* fpath; + uint32_t owner_rank; +}; +typedef struct dyad_metadata dyad_metadata_t; + // Debug message #ifndef DPRINTF #define DPRINTF(curr_dyad_ctx, fmt, ...) \ @@ -84,7 +90,7 @@ typedef struct dyad_ctx dyad_ctx_t; * instance of DYAD * @param[out] ctx the newly initialized context * - * @return An integer error code (values TBD) + * @return An error code from dyad_rc.h */ DYAD_DLL_EXPORTED dyad_rc_t dyad_init (bool debug, bool check, @@ -100,7 +106,8 @@ DYAD_DLL_EXPORTED dyad_rc_t dyad_init (bool debug, /** * @brief Intialize the DYAD context using environment variables * @param[out] ctx the newly initialized context - * @return An error code + * + * @return An error code from dyad_rc.h */ DYAD_DLL_EXPORTED dyad_rc_t dyad_init_env (dyad_ctx_t** ctx); @@ -110,18 +117,34 @@ DYAD_DLL_EXPORTED dyad_rc_t dyad_init_env (dyad_ctx_t** ctx); * @param[in] ctx the DYAD context for the operation * @param[in] fname the name of the file being "produced" * - * @return An integer error code (values TBD) + * @return An error code from dyad_rc.h */ DYAD_PFA_ANNOTATE DYAD_DLL_EXPORTED dyad_rc_t dyad_produce (dyad_ctx_t* ctx, const char* fname); +/** + * @brief Obtain DYAD metadata for a file in the consumer-managed directory + * @param[in] ctx the DYAD context for the operation + * @param[in] fname the name of the file for which metadata is obtained + * @param[in] should_wait if true, wait for the file to be produced before returning + * @param[out] mdata a dyad_metadata_t object containing the metadata for the file + * + * @return An error code from dyad_rc.h + */ +DYAD_PFA_ANNOTATE DYAD_DLL_EXPORTED dyad_rc_t dyad_get_metadata (dyad_ctx_t* ctx, + const char* fname, + bool should_wait, + dyad_metadata_t** mdata); + +DYAD_PFA_ANNOTATE DYAD_DLL_EXPORTED dyad_rc_t dyad_free_metadata (dyad_metadata_t* mdata); + /** * @brief Wrapper function that performs all the common tasks needed * of a consumer * @param[in] ctx the DYAD context for the operation * @param[in] fname the name of the file being "consumed" * - * @return An integer error code (values TBD) + * @return An error code from dyad_rc.h */ DYAD_PFA_ANNOTATE DYAD_DLL_EXPORTED dyad_rc_t dyad_consume (dyad_ctx_t* ctx, const char* fname); @@ -130,7 +153,7 @@ DYAD_PFA_ANNOTATE DYAD_DLL_EXPORTED dyad_rc_t dyad_consume (dyad_ctx_t* ctx, * @brief Finalizes the DYAD instance and deallocates the context * @param[in] ctx the DYAD context being finalized * - * @return An integer error code (values TBD) + * @return An error code from dyad_rc.h */ DYAD_DLL_EXPORTED dyad_rc_t dyad_finalize (dyad_ctx_t** ctx); diff --git a/src/dtl/dyad_rc.h b/src/dtl/dyad_rc.h index 5ffccffc..83362068 100644 --- a/src/dtl/dyad_rc.h +++ b/src/dtl/dyad_rc.h @@ -26,7 +26,7 @@ enum dyad_core_return_codes { DYAD_RC_BADCOMMIT = -4, // Flux KVS commit didn't work DYAD_RC_BADLOOKUP = -5, // Flux KVS lookup didn't work DYAD_RC_BADFETCH = -6, // Flux KVS commit didn't work - DYAD_RC_BADRESPONSE = -7, // Cannot create/populate a DYAD response + DYAD_RC_BADMETADATA = -7, // Cannot create/populate a DYAD response DYAD_RC_BADRPC = -8, // Flux RPC pack or get didn't work DYAD_RC_BADFIO = -9, // File I/O failed DYAD_RC_BADMANAGEDPATH = -10, // Cons or Prod Manged Path is bad @@ -41,6 +41,7 @@ enum dyad_core_return_codes { // end of stream) sooner than expected DYAD_RC_BAD_B64DECODE = -18, // Decoding of data w/ base64 failed DYAD_RC_BAD_COMM_MODE = -19, // Invalid comm mode provided to DTL + DYAD_RC_UNTRACKED = -20, // Provided path is not tracked by DYAD }; typedef enum dyad_core_return_codes dyad_rc_t;