Skip to content

Commit

Permalink
Merge pull request #148 from mochi-hpc/carns/dev-abt-init-handling
Browse files Browse the repository at this point in the history
improve handling of external Argobots initialization
  • Loading branch information
carns authored Jul 1, 2021
2 parents 685bad6 + 66aab25 commit ff094a5
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 15 deletions.
16 changes: 16 additions & 0 deletions include/margo.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,22 @@ margo_instance_id margo_init_ext(const char* address,
int mode,
const struct margo_init_info* args);

/**
* Configures the runtime environment dependencies for use by Margo without
* initializing Margo. The primary purpose of this function is to set
* preferred environment variables for Argobots (e.g., ULT stack size) if
* Argobots will be initialized before calling margo_init() or
* margo_init_ext().
*
* @param [in] optional_json_config The json-formatted configuration
* parameters to be used by Margo when it
* is initialized later (if known). If not
* specified, then margo_set_environment()
* will use default values.
* @returns returns 0 on success, negative value on failure
*/
int margo_set_environment(const char* optional_json_config);

/**
* Initializes margo library with custom Mercury options.
* @param [in] addr_str Mercury host address with port number
Expand Down
112 changes: 99 additions & 13 deletions src/margo-init.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#include "margo-prio-pool.h"
#include "abtx_prof.h"

/* default values for key ABT parameters if not specified */
#define MARGO_DEFAULT_ABT_MEM_MAX_NUM_STACKS 8
#define MARGO_DEFAULT_ABT_THREAD_STACKSIZE 2097152

// Validates the format of the configuration and
// fill default values if they are note provided
static int
Expand Down Expand Up @@ -50,11 +54,37 @@ static int create_xstream_from_config(struct json_object* es_config,

// Sets environment variables for Argobots
static void set_argobots_environment_variables(struct json_object* config);
/* confirm if Argobots is running with desired configuration or not */
static void confirm_argobots_configuration(struct json_object* config);

// Shutdown logic for a margo instance
static void remote_shutdown_ult(hg_handle_t handle);
static DECLARE_MARGO_RPC_HANDLER(remote_shutdown_ult)

int margo_set_environment(const char* optional_json_config)
{
struct json_object* config = NULL;
struct json_tokener* tokener = json_tokener_new();
enum json_tokener_error jerr;

if (optional_json_config && strlen(optional_json_config) > 0) {
config = json_tokener_parse_ex(tokener, optional_json_config,
strlen(optional_json_config));
if (!config) {
jerr = json_tokener_get_error(tokener);
MARGO_ERROR(0, "JSON parse error: %s",
json_tokener_error_desc(jerr));
json_tokener_free(tokener);
return -1;
}
}
json_tokener_free(tokener);

set_argobots_environment_variables(config);

return (0);
}

margo_instance_id margo_init_ext(const char* address,
int mode,
const struct margo_init_info* uargs)
Expand Down Expand Up @@ -180,12 +210,13 @@ margo_instance_id margo_init_ext(const char* address,
g_margo_abt_init = 1;
ret = ABT_mutex_create(&g_margo_num_instances_mtx);
if (ret != 0) goto error;
} else {
MARGO_WARNING(0,
"Argobots was initialized externally, so margo_init_ext "
"could not set Argobots environment variables");
}

/* Check if Argobots is now initialized with the desired parameters
* (regardless of whether Margo initialized it or not)
*/
confirm_argobots_configuration(config);

/* Turn on profiling capability if a) it has not been done already (this
* is global to Argobots) and b) the argobots tool interface is enabled.
*/
Expand Down Expand Up @@ -673,8 +704,10 @@ validate_and_complete_config(struct json_object* _margo,

/* ------- Argobots configuration ------ */
/* Fields:
- abt_mem_max_num_stacks: integer >= 0 (default 8)
- abt_thread_stacksize: integer >= 0 (default 2097152)
- abt_mem_max_num_stacks: integer >= 0 (default
MARGO_DEFAULT_ABT_MEM_MAX_NUM_STACKS)
- abt_thread_stacksize: integer >= 0 (default
MARGO_DEFAULT_ABT_THREAD_STACKSIZE)
- pools: array
- schedulers: array
- xstreams: array
Expand All @@ -685,8 +718,9 @@ validate_and_complete_config(struct json_object* _margo,
{ // handle abt_mem_max_num_stacks
const char* abt_mem_max_num_stacks_str
= getenv("ABT_MEM_MAX_NUM_STACKS");
int abt_mem_max_num_stacks
= abt_mem_max_num_stacks_str ? atoi(abt_mem_max_num_stacks_str) : 8;
int abt_mem_max_num_stacks = abt_mem_max_num_stacks_str
? atoi(abt_mem_max_num_stacks_str)
: MARGO_DEFAULT_ABT_MEM_MAX_NUM_STACKS;
if (abt_mem_max_num_stacks_str) {
CONFIG_OVERRIDE_INTEGER(_argobots, "abt_mem_max_num_stacks",
abt_mem_max_num_stacks,
Expand All @@ -706,7 +740,7 @@ validate_and_complete_config(struct json_object* _margo,
const char* abt_thread_stacksize_str = getenv("ABT_THREAD_STACKSIZE");
int abt_thread_stacksize = abt_thread_stacksize_str
? atoi(abt_thread_stacksize_str)
: 2097152;
: MARGO_DEFAULT_ABT_THREAD_STACKSIZE;
if (abt_thread_stacksize_str) {
CONFIG_OVERRIDE_INTEGER(_argobots, "abt_thread_stacksize",
abt_thread_stacksize,
Expand Down Expand Up @@ -1438,16 +1472,68 @@ static int create_xstream_from_config(struct json_object* es_config,
return ABT_SUCCESS;
}

static void set_argobots_environment_variables(struct json_object* config)
static void confirm_argobots_configuration(struct json_object* config)
{
/* this function assumes that the json is already fully populated */
size_t runtime_abt_thread_stacksize = 0;

/* retrieve expected values according to Margo configuration */
struct json_object* argobots = json_object_object_get(config, "argobots");
int abt_mem_max_num_stacks = json_object_get_int64(
json_object_object_get(argobots, "abt_mem_max_num_stacks"));
int abt_thread_stacksize = json_object_get_int64(
int abt_thread_stacksize = json_object_get_int64(
json_object_object_get(argobots, "abt_thread_stacksize"));

/* NOTE: we skip checking num_stacks; this cannot be retrieved with
* ABT_info_query_config(). Fortunately it also is not as crucial as the
* stack size. Recent ABT releases have conservative caps on stack
* cache sizes by default.
*/

/* query Argobots to see if it is in agreement */
ABT_info_query_config(ABT_INFO_QUERY_KIND_DEFAULT_THREAD_STACKSIZE,
&runtime_abt_thread_stacksize);
if (runtime_abt_thread_stacksize != abt_thread_stacksize) {
MARGO_WARNING(
0,
"Margo requested an Argobots ULT stack size of %d, but "
"Argobots is using a ULT stack size of %zd. "
"If you initialized Argobots externally before calling "
"margo_init(), please consider calling the margo_set_environment() "
"function before ABT_init() in order to set preferred Argobots "
"parameters for Margo usage. "
"Margo is likely to encounter stack overflows and memory "
"corruption if the Argobots stack size is not large "
"enough to accomodate typical userspace network "
"transport libraries.",
abt_thread_stacksize, runtime_abt_thread_stacksize);
}
return;
}

static void set_argobots_environment_variables(struct json_object* config)
{
int abt_mem_max_num_stacks = MARGO_DEFAULT_ABT_MEM_MAX_NUM_STACKS;
int abt_thread_stacksize = MARGO_DEFAULT_ABT_THREAD_STACKSIZE;

/* handle cases in which config is not yet fully resolved */
if (config) {
struct json_object* argobots
= json_object_object_get(config, "argobots");
struct json_object* param;

if (argobots) {
if ((param
= json_object_object_get(argobots, "abt_mem_max_num_stacks")))
abt_mem_max_num_stacks = json_object_get_int64(param);
if ((param
= json_object_object_get(argobots, "abt_thread_stacksize")))
abt_thread_stacksize = json_object_get_int64(param);
}
}

margo_set_abt_mem_max_num_stacks(abt_mem_max_num_stacks);
margo_set_abt_thread_stacksize(abt_thread_stacksize);

return;
}

static void remote_shutdown_ult(hg_handle_t handle)
Expand Down
11 changes: 9 additions & 2 deletions tests/unit-tests/Makefile.subdir
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ check_PROGRAMS += \
tests/unit-tests/margo-init \
tests/unit-tests/margo-pool \
tests/unit-tests/margo-abt-pool \
tests/unit-tests/margo-logging
tests/unit-tests/margo-logging \
tests/unit-tests/margo-after-abt-init

TESTS += \
tests/unit-tests/margo-addr \
tests/unit-tests/margo-diag \
tests/unit-tests/margo-init \
tests/unit-tests/margo-pool \
tests/unit-tests/margo-abt-pool \
tests/unit-tests/margo-logging
tests/unit-tests/margo-logging \
tests/unit-tests/margo-after-abt-init

tests_unit_tests_margo_addr_SOURCES = \
tests/unit-tests/munit/munit.c \
Expand Down Expand Up @@ -45,4 +47,9 @@ tests_unit_tests_margo_logging_SOURCES = \
tests/unit-tests/margo-logging.c \
tests/unit-tests/helper-server.c

tests_unit_tests_margo_after_abt_init_SOURCES = \
tests/unit-tests/munit/munit.c \
tests/unit-tests/margo-after-abt-init.c \
tests/unit-tests/helper-server.c

endif
126 changes: 126 additions & 0 deletions tests/unit-tests/margo-after-abt-init.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@

#include <margo.h>
#include "helper-server.h"
#include "munit/munit.h"

struct test_context {
margo_instance_id mid;
char* log_buffer;
int log_buffer_pos;
int log_buffer_size;
};

static void test_log_fn(void* uargs, const char* str)
{
struct test_context* ctx = uargs;

/* check for overflow */
munit_assert_int(strlen(str) + ctx->log_buffer_pos, <,
ctx->log_buffer_size);

/* directly copy msg to buffer */
strcpy(&ctx->log_buffer[ctx->log_buffer_pos], str);
ctx->log_buffer_pos += strlen(str);

return;
}

struct margo_logger test_logger;

static void* test_context_setup(const MunitParameter params[], void* user_data)
{
(void)params;
(void)user_data;
int ret;

struct test_context* ctx = calloc(1, sizeof(*ctx));
ctx->log_buffer = calloc(102400, 1);
ctx->log_buffer_size = 102400;

/* set up custom logger to make it easier to validate output */
test_logger.uargs = ctx;
test_logger.trace = test_log_fn;
test_logger.debug = test_log_fn;
test_logger.info = test_log_fn;
test_logger.warning = test_log_fn;
test_logger.error = test_log_fn;
test_logger.critical = test_log_fn;

ret = margo_set_global_logger(&test_logger);
munit_assert_int(ret, ==, 0);

return ctx;
}

static void test_context_tear_down(void* data)
{
struct test_context* ctx = (struct test_context*)data;

free(ctx->log_buffer);
free(ctx);
}

static MunitResult margo_after_abt(const MunitParameter params[], void* data)
{
struct test_context* ctx = (struct test_context*)data;
char* protocol = "na+sm";
int ret;

ret = ABT_init(0, NULL);
munit_assert_int(ret, ==, 0);

ctx->mid = margo_init(protocol, MARGO_CLIENT_MODE, 0, 0);
munit_assert_not_null(ctx->mid);

/* The above should have produced a warning, because margo was unable to
* set the desired abt stack settings
*/
munit_assert_int(ctx->log_buffer_pos, >, 0);
printf("global log contents: %s\n", ctx->log_buffer);

margo_finalize(ctx->mid);

return MUNIT_OK;
}

static MunitResult margo_after_abt_set_env(const MunitParameter params[], void* data)
{
struct test_context* ctx = (struct test_context*)data;
char* protocol = "na+sm";
int ret;

/* In this version, we use a margo utility function to set desired
* parameters before calling ABT_init(). This should silence the
* warning.
*/
margo_set_environment(NULL);

ret = ABT_init(0, NULL);
munit_assert_int(ret, ==, 0);

ctx->mid = margo_init(protocol, MARGO_CLIENT_MODE, 0, 0);
munit_assert_not_null(ctx->mid);

/* check if log is silent */
munit_assert_int(ctx->log_buffer_pos, ==, 0);

margo_finalize(ctx->mid);

return MUNIT_OK;
}


static MunitTest tests[]
= {{"/margo-after-abt", margo_after_abt, test_context_setup,
test_context_tear_down, MUNIT_TEST_OPTION_NONE, NULL},
{"/margo-after-abt-set-env", margo_after_abt_set_env, test_context_setup,
test_context_tear_down, MUNIT_TEST_OPTION_NONE, NULL},
{NULL, NULL, NULL, NULL, MUNIT_TEST_OPTION_NONE, NULL}};

static const MunitSuite test_suite
= {"/margo", tests, NULL, 1, MUNIT_SUITE_OPTION_NONE};

int main(int argc, char** argv)
{
return munit_suite_main(&test_suite, NULL, argc, argv);
}

0 comments on commit ff094a5

Please sign in to comment.