Skip to content

Commit

Permalink
Merge pull request #127 from wangvsa/main
Browse files Browse the repository at this point in the history
Clean the FLUX module code and fix getopt() bug
  • Loading branch information
wangvsa authored Dec 28, 2024
2 parents ceb0f6f + 71ed298 commit 06c3d85
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 87 deletions.
16 changes: 9 additions & 7 deletions docs/demos/ecp_feb_2023/cons.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ int consume_file (char* full_path, int32_t seed, int32_t* val_buf)
printf ("File %ld: Cannot open\n", (long)seed);
return -1;
}
// Read the file or abort if not possible
// Read then close the file
size_t items_read = fread (val_buf, sizeof (int32_t), NUM_VALS, fp);
fclose (fp);

// Print an error if the number of items read is not what was expected
if (items_read != NUM_VALS) {
fprintf (stderr, "Could not read the full file (%s)\n", full_path);
Expand All @@ -45,15 +46,14 @@ int main (int argc, char** argv)
int32_t num_transfers;
char* fpath;
int rc = parse_cmd_line (argc, argv, &num_transfers, &fpath);
// If an error occured during command-line parsing,
// abort the consumer
// abort if an error occured during command-line parsing,
if (rc != 0) {
return rc;
}

// Largest number of digits for a int32_t when converted to string
const size_t max_digits = 10;
// First 4 is for "data"
// Second 4 is for ".txt"
// First 4 is for "data" and second 4 is for ".txt"
size_t path_len = strlen (fpath) + 4 + max_digits + 4;
// Allocate a buffer for the file path
char* full_path = malloc (path_len + 1);
Expand Down Expand Up @@ -85,11 +85,13 @@ int main (int argc, char** argv)
}
// Validate the content of the file
if (vals_are_valid (seed, val_buf)) {
printf ("File %ld: OK\n", (long)seed);
printf ("File %ld validation: OK\n", (long)seed);
} else {
printf ("File %ld: BAD\n", (long)seed);
printf ("File %ld validation: BAD\n", (long)seed);
}
}
free (val_buf);
free (full_path);

return 0;
}
19 changes: 10 additions & 9 deletions docs/demos/ecp_feb_2023/prod.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ int produce_file (char* full_path, int32_t seed, int32_t* val_buf)
printf ("File %ld: Cannot open\n", (long)seed);
return -1;
}
// Write the file or abort if not possible
size_t items_read = fwrite (val_buf, sizeof (int32_t), NUM_VALS, fp);
// Close the file
fclose (fp);

// Write then close the file
size_t items_write = fwrite (val_buf, sizeof(int32_t), NUM_VALS, fp);
fclose(fp);

// If an error occured during writing, abort
if (items_read != NUM_VALS) {
if (items_write != NUM_VALS) {
fprintf (stderr, "Could not write the full file (%s)\n", full_path);
printf ("File %ld: Cannot write\n", (long)seed);
return -1;
Expand All @@ -43,15 +44,13 @@ int main (int argc, char** argv)
int32_t num_transfers;
char* fpath;
int rc = parse_cmd_line (argc, argv, &num_transfers, &fpath);
// If an error occured during command-line parsing,
// abort the consumer
// abort if an error occured during command-line parsing
if (rc != 0) {
return rc;
}
// Largest number of digits for a int32_t when converted to string
const size_t max_digits = 10;
// First 4 is for "data"
// Second 4 is for ".txt"
// First 4 is for "data", second 4 is for ".txt"
size_t path_len = strlen (fpath) + 4 + max_digits + 4;
// Allocate a buffer for the file path
char* full_path = malloc (path_len + 1);
Expand Down Expand Up @@ -86,4 +85,6 @@ int main (int argc, char** argv)
}
}
free (full_path);

return 0;
}
171 changes: 100 additions & 71 deletions src/dyad/modules/dyad.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,26 @@
((double)(1000000000L * ((Tend).tv_sec - (Tstart).tv_sec) + (Tend).tv_nsec - (Tstart).tv_nsec) \
/ 1000000000L)

struct dyad_mod_ctx {
/**
* Flux services are implemented as dynamically loaded broker
* plugins called “broker modules”.
* The broker module implementing a new service is expected to
* register message handlers for its methods, then run the
* flux reactor. It should use event driven (reactive) programming
* techniques to remain responsive while juggling work from multiple
* clients.
*
* This code implements such a flux module, which can be loaded
* using "flux module load".
*/

typedef struct dyad_mod_ctx {
flux_msg_handler_t **handlers;
dyad_ctx_t *ctx;
};
} dyad_mod_ctx_t;

const struct dyad_mod_ctx dyad_mod_ctx_default = {NULL, NULL};

typedef struct dyad_mod_ctx dyad_mod_ctx_t;

static void dyad_mod_fini (void) __attribute__ ((destructor));

void dyad_mod_fini (void)
Expand Down Expand Up @@ -291,15 +302,16 @@ struct opt_parse_out {
const char *prod_managed_path;
const char *dtl_mode;
bool debug;
bool showed_help;
};

typedef struct opt_parse_out opt_parse_out_t;

static int opt_parse (opt_parse_out_t *restrict opt,
const unsigned broker_rank,
dyad_dtl_mode_t *restrict dtl_mode,
int _argc,
char **restrict _argv)
int opt_parse (opt_parse_out_t *restrict opt,
const unsigned broker_rank,
dyad_dtl_mode_t *restrict dtl_mode,
int argc,
char **restrict argv)
{
#ifndef DYAD_LOGGER_NO_LOG
char log_file_name[PATH_MAX + 1] = {'\0'};
Expand All @@ -310,49 +322,49 @@ static int opt_parse (opt_parse_out_t *restrict opt,
*dtl_mode = DYAD_DTL_END;

int rc = DYAD_RC_OK;
int argc = 0;
char *argv[PATH_MAX] = {NULL};
char *prod_managed_path = NULL;

if (opt == NULL)
return rc;

if ((argc = _argc + 1) > PATH_MAX) {
DYAD_LOG_STDERR ("DYAD_MOD: too many options.\n");
return DYAD_RC_SYSFAIL;
// In case getopt() is called multiple times, e.g.,
// when doing "flux module load dyad.so -h"
// optind must be reset to 1.
// Otherwise, getopt() may cause crash.
// Note, getopt() assumes the first argument, i.e.,
// argv[0] to be the executable name, so it starts
// checking from optind = 1.
// since Flux module argv doesn't contain the executable
// name as its first argument, we need to create a dummy
// _argc and _argv here for getopt() to work properly.
extern int optind;
optind = 1;
int _argc = argc + 1;
char **_argv = malloc (sizeof (char *) * _argc);
_argv[0] = NULL;
for (int i = 1; i < _argc; i++) {
// we will reuse the same same string in argv[].
_argv[i] = argv[i - 1];
}

for (int i = 1; i < argc; ++i) {
argv[i] = _argv[i - 1];
// DYAD_LOG_STDERR ("DYAD_MOD: argv[%d] = '%s'\n", i, argv[i]);
}

while (1) {
static struct option long_options[] = {{"help", no_argument, 0, 'h'},
{"debug", no_argument, 0, 'd'},
{"mode", required_argument, 0, 'm'},
{"info_log", required_argument, 0, 'i'},
{"error_log", required_argument, 0, 'e'},
{0, 0, 0, 0}};
/* getopt_long stores the option index here. */
int option_index = 0;
int c = -1;

c = getopt_long (argc, argv, "hdm:i:e:", long_options, &option_index);

/* Detect the end of the options. */
if (c == -1) {
// DYAD_LOG_STDERR ("DYAD_MOD: no more option.\n");
break;
}
DYAD_LOG_STDERR ("DYAD_MOD: opt %c, index %d\n", (char)c, optind);
static struct option long_options[] = {{"help", no_argument, 0, 'h'},
{"debug", no_argument, 0, 'd'},
{"mode", required_argument, 0, 'm'},
{"info_log", required_argument, 0, 'i'},
{"error_log", required_argument, 0, 'e'},
{0, 0, 0, 0}};

int c;
while ((c = getopt_long (_argc, _argv, "hdm:i:e:", long_options, NULL)) != -1) {
switch (c) {
case 'h':
show_help ();
// set this to true, so we later we will directly
// return without loading the Flux module.
opt->showed_help = true;
break;
case 'd':
DYAD_LOG_STDERR ("DYAD_MOD: 'debug' option -d \n");
DYAD_LOG_STDERR ("DYAD_MOD: 'debug' option -d\n");
opt->debug = true;
break;
case 'm':
Expand All @@ -368,7 +380,7 @@ static int opt_parse (opt_parse_out_t *restrict opt,
case 'i':
#ifndef DYAD_LOGGER_NO_LOG
DYAD_LOG_STDERR ("DYAD_MOD: 'info_log' option -i with value `%s'\n", optarg);
sprintf (log_file_name, "%s_%u.out", optarg, broker_rank);
sprintf (log_file_name, "%s_%d.out", optarg, broker_rank);
#endif // DYAD_LOGGER_NO_LOG
break;
case 'e':
Expand All @@ -382,6 +394,7 @@ static int opt_parse (opt_parse_out_t *restrict opt,
break;
default:
DYAD_LOG_STDERR ("DYAD_MOD: option parsing failed %d\n", c);
free (_argv);
return DYAD_RC_SYSFAIL;
}
}
Expand All @@ -395,18 +408,21 @@ static int opt_parse (opt_parse_out_t *restrict opt,
opt->dtl_mode = NULL;
}

/* Print any remaining command line arguments (not options). */
while (optind < argc) {
DYAD_LOG_STDERR ("DYAD_MOD: positional arguments %s\n", argv[optind]);
prod_managed_path = argv[optind++];
// Retrive the remaining command line argument (not options).
// it is expected to be the producer managed directory
while (optind < _argc) {
DYAD_LOG_STDERR ("DYAD_MOD: positional arguments %s\n", _argv[optind]);
prod_managed_path = _argv[optind++];
}
opt->prod_managed_path = prod_managed_path;

free (_argv);
return DYAD_RC_OK;
}

dyad_rc_t dyad_module_ctx_init (const opt_parse_out_t *opt, flux_t *h)
{ // Initialize DYAD context
{
// get DYAD Flux module
dyad_mod_ctx_t *mod_ctx = get_mod_ctx (h);

if (mod_ctx == NULL || opt == NULL || h == NULL) {
Expand All @@ -417,60 +433,67 @@ dyad_rc_t dyad_module_ctx_init (const opt_parse_out_t *opt, flux_t *h)
setenv (DYAD_PATH_PRODUCER_ENV, opt->prod_managed_path, 1);
const mode_t m = (S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH | S_ISGID);
mkdir_as_needed (opt->prod_managed_path, m);
DYAD_LOG_STDERR ("DYAD_MOD: Loading DYAD Module with Path %s", opt->prod_managed_path);
DYAD_LOG_STDERR ("DYAD_MOD: Loading DYAD Module with Path %s\n", opt->prod_managed_path);
}

if (opt->dtl_mode) {
setenv (DYAD_DTL_MODE_ENV, opt->dtl_mode, 1);
DYAD_LOG_STDERR (
"DYAD_MOD: DTL 'mode' option set. "
"Setting env %s=%s",
DYAD_DTL_MODE_ENV,
opt->dtl_mode);
DYAD_LOG_STDERR ("DYAD_MOD: DTL 'mode' option set. Setting env %s=%s\n",
DYAD_DTL_MODE_ENV,
opt->dtl_mode);
} else {
DYAD_LOG_STDERR (
"DYAD_MOD: Did not find DTL 'mode' option. "
"Using env %s=%s",
DYAD_DTL_MODE_ENV,
getenv (DYAD_DTL_MODE_ENV));
DYAD_LOG_STDERR ("DYAD_MOD: Did not find DTL 'mode' option. Using env %s=%s\n",
DYAD_DTL_MODE_ENV,
getenv (DYAD_DTL_MODE_ENV));
}
char *kvs = getenv (DYAD_KVS_NAMESPACE_ENV);
if (kvs != NULL) {
DYAD_LOG_STDERR ("DYAD_MOD: %s is set to `%s'\n", DYAD_KVS_NAMESPACE_ENV, kvs);

char *kvs_namespace = getenv ("DYAD_KVS_NAMESPACE");
if (kvs_namespace != NULL) {
DYAD_LOG_STDERR ("DYAD_MOD: DYAD_KVS_NAMESPACE is set to `%s'\n", kvs_namespace);
} else {
DYAD_LOG_STDERR ("DYAD_MOD: %s is not set\n", DYAD_KVS_NAMESPACE_ENV);
DYAD_LOG_STDERR ("DYAD_MOD: DYAD_KVS_NAMESPACE is not set\n");
// Required so that dyad_ctx_init can pass
// TODO: figure out a better for this.
setenv (DYAD_KVS_NAMESPACE_ENV, "dyad_module_dummy_env", 1);
}

// Initialize DYAD context
dyad_ctx_init (DYAD_COMM_SEND, h);
mod_ctx->ctx = dyad_ctx_get ();
dyad_ctx_t *ctx = mod_ctx->ctx;
dyad_ctx_t *ctx = dyad_ctx_get ();
mod_ctx->ctx = ctx;

if (ctx == NULL) {
DYAD_LOG_STDERR ("DYAD_MOD: dyad_ctx_init() failed!");
DYAD_LOG_STDERR ("DYAD_MOD: dyad_ctx_init() failed!\n");
return DYAD_RC_NOCTX;
}
ctx->h = h;
ctx->debug = opt->debug;

if (ctx->dtl_handle == NULL) {
DYAD_LOG_STDERR ("DYAD_MOD: dyad_ctx_init() failed to initialize DTL!");
DYAD_LOG_STDERR ("DYAD_MOD: dyad_ctx_init() failed to initialize DTL!\n");
return DYAD_RC_NOCTX;
}

return DYAD_RC_OK;
}

/**
* @brief This is the starting point for a new FLUX broker module thread
* The flux_t handle provides direct communication with the
* broker over shared memory. The argument list is derived from
* the free arguments on the flux module load command line.
* When mod_main() returns, the thread is terminated and the
* module is unloaded.
*/
DYAD_DLL_EXPORTED int mod_main (flux_t *h, int argc, char **argv)
{
DYAD_LOGGER_INIT ();
DYAD_LOG_STDOUT ("Loading mod_main\n");
DYAD_LOG_STDOUT ("DYAD_MOD: Loading mod_main\n");
dyad_mod_ctx_t *mod_ctx = NULL;
dyad_dtl_mode_t dtl_mode = DYAD_DTL_DEFAULT;

if (!h) {
DYAD_LOG_STDERR ("Failed to get flux handle\n");
DYAD_LOG_STDERR ("DYAD_MOD: Failed to get flux handle\n");
goto mod_done;
}

Expand All @@ -484,14 +507,20 @@ DYAD_DLL_EXPORTED int mod_main (flux_t *h, int argc, char **argv)
#endif
DYAD_C_FUNCTION_START ();

opt_parse_out_t opt = {NULL, NULL, false};
DYAD_LOG_STDERR ("DYAD_MOD: Parsing command line options");
opt_parse_out_t opt = {NULL, NULL, false, false};
DYAD_LOG_STDOUT ("DYAD_MOD: Parsing command line options\n");

if (DYAD_IS_ERROR (opt_parse (&opt, broker_rank, &dtl_mode, argc, argv))) {
DYAD_LOG_STDERR ("DYAD_MOD: Cannot parse command line arguments");
DYAD_LOG_STDERR ("DYAD_MOD: Cannot parse command line arguments\n");
goto mod_error;
}
// the service was invoked with "-h"
// then we return directly after printing out help message
if (opt.showed_help) {
goto mod_done;
}

// initialize mod_ctx->ctx, which is the dyad context
if (DYAD_IS_ERROR (dyad_module_ctx_init (&opt, h))) {
goto mod_error;
}
Expand All @@ -502,10 +531,10 @@ DYAD_DLL_EXPORTED int mod_main (flux_t *h, int argc, char **argv)
}

if (flux_reactor_run (flux_get_reactor (mod_ctx->ctx->h), 0) < 0) {
DYAD_LOG_DEBUG (mod_ctx->ctx, "DYAD_MOD: flux_reactor_run: %s", strerror (errno));
DYAD_LOG_ERROR (mod_ctx->ctx, "DYAD_MOD: flux_reactor_run: %s\n", strerror (errno));
goto mod_error;
}
DYAD_LOG_DEBUG (mod_ctx->ctx, "DYAD_MOD: Finished");
DYAD_LOG_STDOUT ("DYAD_MOD: Finished\n");
goto mod_done;

mod_error:;
Expand Down

0 comments on commit 06c3d85

Please sign in to comment.