
Commit

Current state of DataSpaces test
ilumsden committed Mar 25, 2024
1 parent 9a485ff commit 77bc3eb
Showing 2 changed files with 179 additions and 83 deletions.
tests/dspaces_perf/data_plane/data_plane.cpp (200 changes: 124 additions & 76 deletions)
@@ -5,6 +5,9 @@

 #include <array>
 
+#define NS_TO_SECS(ns_var) ((double) ns_var / 1000000000.0)
+#define AGG_TIME(time_var, agg_time_var, dtype) MPI_Reduce(&time_var, &agg_time_var, 1, dtype, MPI_SUM, 0, MPI_COMM_WORLD)
+
 FILE* redirect_stdout(const char* filename)
 {
   size_t dir_len = strlen(args.dspaces_timing_dir.c_str());
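Illustrative aside, not part of the committed file: a minimal, self-contained sketch of the pattern the two new macros are built for, convert a per-rank nanosecond measurement to seconds, MPI_SUM it onto rank 0 with MPI_Reduce, and report the mean over the communicator. The names used here (report-style main, elapsed_ns, local_secs, summed_secs) are hypothetical stand-ins, not identifiers from the test.

// Sketch only: shows how NS_TO_SECS and AGG_TIME are meant to be combined.
#include <mpi.h>
#include <cstdio>

#define NS_TO_SECS(ns_var) ((double) ns_var / 1000000000.0)
#define AGG_TIME(time_var, agg_time_var, dtype) MPI_Reduce(&time_var, &agg_time_var, 1, dtype, MPI_SUM, 0, MPI_COMM_WORLD)

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank = 0, comm_size = 1;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &comm_size);

    long long elapsed_ns = 1500000000LL;            // stand-in for a measured duration (1.5 s)
    double local_secs  = NS_TO_SECS(elapsed_ns);    // nanoseconds -> seconds on this rank
    double summed_secs = 0.0;
    AGG_TIME(local_secs, summed_secs, MPI_DOUBLE);  // MPI_SUM of per-rank seconds onto rank 0
    if (rank == 0)
        printf("mean per-rank time: %.6f s\n", summed_secs / comm_size);

    MPI_Finalize();
    return 0;
}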
@@ -87,179 +90,224 @@ int create_files_per_server_process(dspaces_client_t* client, bool is_local, boo
   return rc;
 }
 
+void gen_perf_print(size_t data_len, double total_mdata_time, double total_data_time)
+{
+  double agg_mdata_time = 0;
+  double agg_data_time = 0;
+  AGG_TIME(total_mdata_time, agg_mdata_time, MPI_DOUBLE);
+  AGG_TIME(total_data_time, agg_data_time, MPI_DOUBLE);
+  if (info.rank == 0) {
+    double final_mdata_time = agg_mdata_time / info.comm_size;
+    double final_data_time = agg_data_time / info.comm_size;
+    printf("[DSPACES_TEST],%10d,%10lu,%10lu,%10.6f,%10.6f,%10.6f,%10.6f\n",
+           info.comm_size,                // Comm Size
+           data_len*args.number_of_files, // Total I/O per process
+           args.number_of_files,          // Number of mdata ops per process
+           final_mdata_time,              // Metadata Time
+           final_data_time,               // Data Time
+           data_len*args.number_of_files*info.comm_size/final_mdata_time/1024/1024.0, // Metadata Bandwidth
+           args.number_of_files*info.comm_size/final_data_time/1024/1024.0            // Data Bandwidth
+    );
+  }
+}
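Illustrative aside, not part of the committed file: the averaging step gen_perf_print relies on, worked with made-up numbers. With 16 ranks each reading 64 files of 1 MiB, and per-rank data times summing to 32 s under MPI_SUM, the mean time is 32 / 16 = 2 s and the aggregate throughput is 16 x 64 MiB / 2 s = 512 MiB/s. This mirrors the divide-by-comm_size averaging above; it is not a line-for-line restatement of the committed printf.

// Worked example only; all constants below are hypothetical, not taken from the test.
#include <cstdio>

int main()
{
    const int    comm_size        = 16;                  // ranks
    const double bytes_per_rank   = 64.0 * 1024 * 1024;  // 64 files x 1 MiB each
    const double summed_data_time = 32.0;                // MPI_SUM of per-rank seconds
    const double mean_data_time   = summed_data_time / comm_size;                      // 2.0 s
    const double agg_mib_per_sec  = (bytes_per_rank * comm_size) / mean_data_time / 1024.0 / 1024.0;
    printf("aggregate data throughput: %.1f MiB/s\n", agg_mib_per_sec);                 // 512.0
    return 0;
}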

TEST_CASE("RemoteDataBandwidth", "[files= " + std::to_string(args.number_of_files) +"]"
"[file_size= " + std::to_string(args.request_size*args.iteration) +"]"
"[parallel_req= " + std::to_string(info.comm_size) +"]"
"[num_nodes= " + std::to_string(info.comm_size / args.process_per_node) +"]") {
REQUIRE (pretest() == 0);
dspaces_client_t client = dspaces_CLIENT_NULL;
int rc = dspaces_init_mpi(MPI_COMM_WORLD, &client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (create_files_per_server_process(&client, false, true) == 0);
SECTION("Test Max Bandwidth") {
Timer data_time;
REQUIRE (pretest() == 0);
dspaces_client_t client = dspaces_CLIENT_NULL;
int rc = dspaces_init_mpi(MPI_COMM_WORLD, &client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (create_files_per_server_process(&client, false, true) == 0);
char filename[4096];
gen_var_name(filename, false, true, false, true);
size_t data_len = args.request_size*args.iteration;
char* file_data = NULL;
int ndim = 1;
uint64_t lb = 0;
uint64_t ub = data_len - 1;
FILE* fp = redirect_stdout("remote_data_bandwidth.csv");
REQUIRE(fp != NULL);
printf("rank,var_name,version,mdata_time_ns,data_time_ns\n");
char csv_filename[4096];
// sprintf(csv_filename, "remote_data_bandwidth_%d.csv", info.rank);
// FILE* fp = fopen(csv_filename, "w+");
// FILE* fp = redirect_stdout(csv_filename);
// REQUIRE(fp != NULL);
// printf("rank,var_name,version,data_size,mdata_time_ns,data_time_ns\n");
double total_mdata_time = 0;
double total_data_time = 0;
for (size_t file_idx=0; file_idx < args.number_of_files; ++file_idx) {
long long int mdata_time_ns = 0;
long long int data_time_ns = 0;
data_time.resumeTime();
// Using aget instead of get because dyad_get_data also allocates the buffer
// Also, setting timeout to 0 to prevent blocking for data availability since the data should always be available.
rc = dspaces_aget(client, filename, file_idx, ndim, &lb, &ub, (void**) &file_data, -1);
rc = dspaces_aget(client, filename, file_idx, ndim, &lb, &ub, (void**) &file_data, -1, &mdata_time_ns, &data_time_ns);
data_time.pauseTime();
REQUIRE (rc == dspaces_SUCCESS);
free(file_data);
total_mdata_time += NS_TO_SECS(mdata_time_ns);
total_data_time += NS_TO_SECS(data_time_ns);
}
restore_stdout(fp);
AGGREGATE_TIME(data);
if (info.rank == 0) {
printf("[DSPACES_TEST],%10d,%10lu,%10.6f,%10.6f\n",
info.comm_size, data_len*args.number_of_files,
total_data/info.comm_size, data_len*args.number_of_files*info.comm_size*info.comm_size/total_data/1024/1024.0);
}
// restore_stdout(fp);
// AGGREGATE_TIME(data);
gen_perf_print(data_len, total_mdata_time, total_data_time);
rc = dspaces_fini(client);
REQUIRE (rc == dspaces_SUCCESS);
// fclose(fp);
REQUIRE (posttest() == 0);
}
rc = dspaces_fini(client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (posttest() == 0);
}

TEST_CASE("RemoteDataAggBandwidth", "[files= " + std::to_string(args.number_of_files) +"]"
"[file_size= " + std::to_string(args.request_size*args.iteration) +"]"
"[parallel_req= " + std::to_string(info.comm_size) +"]"
"[num_nodes= " + std::to_string(info.comm_size / args.process_per_node) +"]") {
REQUIRE (pretest() == 0);
dspaces_client_t client = dspaces_CLIENT_NULL;
int rc = dspaces_init_mpi(MPI_COMM_WORLD, &client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (create_files_per_server_process(&client, false, false) == 0);
SECTION("Test Max Bandwidth") {
Timer data_time;
REQUIRE (pretest() == 0);
dspaces_client_t client = dspaces_CLIENT_NULL;
int rc = dspaces_init_mpi(MPI_COMM_WORLD, &client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (create_files_per_server_process(&client, false, false) == 0);
char filename[4096];
gen_var_name(filename, false, false, false, true);
char* file_data = NULL;
size_t data_len = args.request_size*args.iteration;
int ndim = 1;
uint64_t lb = 0;
uint64_t ub = data_len - 1;
FILE* fp = redirect_stdout("remote_data_agg_bandwidth.csv");
REQUIRE(fp != NULL);
printf("rank,var_name,version,mdata_time_ns,data_time_ns\n");
char csv_filename[4096];
// sprintf(csv_filename, "remote_data_agg_bandwidth_%d.csv", info.rank);
// FILE* fp = redirect_stdout(csv_filename);
// FILE* fp = fopen(csv_filename, "w+");
// REQUIRE(fp != NULL);
// printf("rank,var_name,version,data_size,mdata_time_ns,data_time_ns\n");
double total_mdata_time = 0;
double total_data_time = 0;
if (info.rank % args.process_per_node != 0)
usleep (10000);
for (size_t file_idx=0; file_idx < args.number_of_files; ++file_idx) {
long long int mdata_time_ns = 0;
long long int data_time_ns = 0;
data_time.resumeTime();
// Using aget instead of get because dyad_get_data also allocates the buffer
// Unlike the previous test, we set the timeout to -1 so it will do any blocking that it might want to do
// TODO: confirm that the timeout is actually needed to guarantee this type of behavior
rc = dspaces_aget(client, filename, file_idx, ndim, &lb, &ub, (void**) &file_data, -1);
rc = dspaces_aget(client, filename, file_idx, ndim, &lb, &ub, (void**) &file_data, -1, &mdata_time_ns, &data_time_ns);
data_time.pauseTime();
REQUIRE (rc == dspaces_SUCCESS);
free(file_data);
total_mdata_time += NS_TO_SECS(mdata_time_ns);
total_data_time += NS_TO_SECS(data_time_ns);
}
restore_stdout(fp);
AGGREGATE_TIME(data);
if (info.rank == 0) {
printf("[DSPACES_TEST],%10d,%10lu,%10.6f,%10.6f\n",
info.comm_size, data_len*args.number_of_files,
total_data/info.comm_size, data_len*args.number_of_files*info.comm_size*info.comm_size/total_data/1024/1024.0);
}
// restore_stdout(fp);
// AGGREGATE_TIME(data);
gen_perf_print(data_len, total_mdata_time, total_data_time);
rc = dspaces_fini(client);
REQUIRE (rc == dspaces_SUCCESS);
// fclose(fp);
REQUIRE (posttest() == 0);
}
rc = dspaces_fini(client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (posttest() == 0);
}


TEST_CASE("LocalProcessDataBandwidth", "[files= " + std::to_string(args.number_of_files) +"]"
"[file_size= " + std::to_string(args.request_size*args.iteration) +"]"
"[parallel_req= " + std::to_string(info.comm_size) +"]"
"[num_nodes= " + std::to_string(info.comm_size / args.process_per_node) +"]") {
REQUIRE (pretest() == 0);
dspaces_client_t client = dspaces_CLIENT_NULL;
int rc = dspaces_init_mpi(MPI_COMM_WORLD, &client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (create_files_per_server_process(&client, true, true) == 0);
SECTION("Test Max Bandwidth") {
Timer data_time;
REQUIRE (pretest() == 0);
dspaces_client_t client = dspaces_CLIENT_NULL;
int rc = dspaces_init_mpi(MPI_COMM_WORLD, &client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (create_files_per_server_process(&client, true, true) == 0);
char filename[4096];
gen_var_name(filename, true, false, false, false);
size_t data_len = args.request_size*args.iteration;
int ndim = 1;
uint64_t lb = 0;
uint64_t ub = data_len - 1;
char* file_data = NULL;
FILE* fp = redirect_stdout("local_process_data_bandwidth.csv");
REQUIRE(fp != NULL);
printf("rank,var_name,version,mdata_time_ns,data_time_ns\n");
char csv_filename[4096];
// sprintf(csv_filename, "local_process_data_bandwidth_%d.csv", info.rank);
// FILE* fp = redirect_stdout(csv_filename);
// FILE* fp = fopen(csv_filename, "w+");
// REQUIRE(fp != NULL);
// printf("rank,var_name,version,data_size,mdata_time_ns,data_time_ns\n");
double total_mdata_time = 0;
double total_data_time = 0;
for (size_t file_idx=0; file_idx < args.number_of_files; ++file_idx) {
long long int mdata_time_ns = 0;
long long int data_time_ns = 0;
data_time.resumeTime();
// Using aget instead of get because dyad_get_data also allocates the buffer
// Also, setting timeout to 0 to prevent blocking for data availability since the data should always be available.
rc = dspaces_aget(client, filename, file_idx, ndim, &lb, &ub, (void**) &file_data, -1);
rc = dspaces_aget(client, filename, file_idx, ndim, &lb, &ub, (void**) &file_data, -1, &mdata_time_ns, &data_time_ns);
data_time.pauseTime();
REQUIRE (rc == dspaces_SUCCESS);
free(file_data);
total_mdata_time += NS_TO_SECS(mdata_time_ns);
total_data_time += NS_TO_SECS(data_time_ns);
}
restore_stdout(fp);
AGGREGATE_TIME(data);
if (info.rank == 0) {
printf("[DSPACES_TEST],%10d,%10lu,%10.6f,%10.6f\n",
info.comm_size, data_len*args.number_of_files,
total_data/info.comm_size, data_len*args.number_of_files*info.comm_size*info.comm_size/total_data/1024/1024.0);
}
// restore_stdout(fp);
// AGGREGATE_TIME(data);
gen_perf_print(data_len, total_mdata_time, total_data_time);
rc = dspaces_fini(client);
REQUIRE (rc == dspaces_SUCCESS);
// fclose(fp);
REQUIRE (posttest() == 0);
}
rc = dspaces_fini(client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (posttest() == 0);
}


TEST_CASE("LocalNodeDataBandwidth", "[files= " + std::to_string(args.number_of_files) +"]"
"[file_size= " + std::to_string(args.request_size*args.iteration) +"]"
"[parallel_req= " + std::to_string(info.comm_size) +"]"
"[num_nodes= " + std::to_string(info.comm_size / args.process_per_node) +"]") {
REQUIRE (pretest() == 0);
dspaces_client_t client = dspaces_CLIENT_NULL;
int rc = dspaces_init_mpi(MPI_COMM_WORLD, &client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (create_files_per_server_process(&client, true, true) == 0);
SECTION("Test Max Bandwidth") {
Timer data_time;
REQUIRE (pretest() == 0);
dspaces_client_t client = dspaces_CLIENT_NULL;
int rc = dspaces_init_mpi(MPI_COMM_WORLD, &client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (create_files_per_server_process(&client, true, true) == 0);
char filename[4096];
gen_var_name(filename, true, false, true, false);
size_t data_len = args.request_size*args.iteration;
int ndim = 1;
uint64_t lb = 0;
uint64_t ub = data_len - 1;
char* file_data = NULL;
FILE* fp = redirect_stdout("local_node_data_bandwidth.csv");
REQUIRE(fp != NULL);
printf("rank,var_name,version,mdata_time_ns,data_time_ns\n");
char csv_filename[4096];
// sprintf(csv_filename, "local_node_data_bandwidth_%d.csv", info.rank);
// FILE* fp = redirect_stdout(csv_filename);
// FILE* fp = fopen(csv_filename, "w+");
// REQUIRE(fp != NULL);
// printf("rank,var_name,version,data_size,mdata_time_ns,data_time_ns\n");
double total_mdata_time = 0;
double total_data_time = 0;
for (size_t file_idx=0; file_idx < args.number_of_files; ++file_idx) {
long long int mdata_time_ns = 0;
long long int data_time_ns = 0;
data_time.resumeTime();
// Using aget instead of get because dyad_get_data also allocates the buffer
// Also, setting timeout to 0 to prevent blocking for data availability since the data should always be available.
rc = dspaces_aget(client, filename, file_idx, ndim, &lb, &ub, (void**) &file_data, -1);
rc = dspaces_aget(client, filename, file_idx, ndim, &lb, &ub, (void**) &file_data, -1, &mdata_time_ns, &data_time_ns);
data_time.pauseTime();
REQUIRE (rc == dspaces_SUCCESS);
free(file_data);
total_mdata_time += NS_TO_SECS(mdata_time_ns);
total_data_time += NS_TO_SECS(data_time_ns);
}
restore_stdout(fp);
AGGREGATE_TIME(data);
if (info.rank == 0) {
printf("[DSPACES_TEST],%10d,%10lu,%10.6f,%10.6f\n",
info.comm_size, data_len*args.number_of_files,
total_data/info.comm_size, data_len*args.number_of_files*info.comm_size*info.comm_size/total_data/1024/1024.0);
}
// restore_stdout(fp);
// AGGREGATE_TIME(data);
gen_perf_print(data_len, total_mdata_time, total_data_time);
rc = dspaces_fini(client);
REQUIRE (rc == dspaces_SUCCESS);
// fclose(fp);
REQUIRE (posttest() == 0);
}
rc = dspaces_fini(client);
REQUIRE (rc == dspaces_SUCCESS);
REQUIRE (posttest() == 0);
}
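Illustrative aside, not part of the committed file: the per-test measurement loop above, condensed into a plain helper outside Catch2. The dspaces_aget call with the two trailing long long* out-parameters mirrors the instrumented client these tests link against; it is not a claim about the upstream DataSpaces API. The helper name measure_reads and its parameters are hypothetical, while NS_TO_SECS, gen_perf_print, and dspaces_SUCCESS are the identifiers used in this commit.

// Sketch only: assumes the instrumented DataSpaces header used by this test suite,
// plus the NS_TO_SECS macro and gen_perf_print helper added in this commit.
#include <cstdint>
#include <cstdlib>

static void measure_reads(dspaces_client_t client, const char* var_name,
                          size_t data_len, size_t n_files)
{
    int      ndim = 1;
    uint64_t lb   = 0;
    uint64_t ub   = data_len - 1;
    double   total_mdata_time = 0.0;
    double   total_data_time  = 0.0;

    for (size_t version = 0; version < n_files; ++version) {
        char*         buf            = NULL;
        long long int mdata_time_ns  = 0;
        long long int data_time_ns   = 0;
        // aget allocates the buffer; the timeout is forwarded as -1, as in the tests above
        int rc = dspaces_aget(client, var_name, version, ndim, &lb, &ub,
                              (void**) &buf, -1, &mdata_time_ns, &data_time_ns);
        if (rc != dspaces_SUCCESS)
            break;
        free(buf);
        total_mdata_time += NS_TO_SECS(mdata_time_ns);   // metadata lookup portion
        total_data_time  += NS_TO_SECS(data_time_ns);    // data transfer portion
    }
    gen_perf_print(data_len, total_mdata_time, total_data_time);
}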