Skip to content

Commit

Permalink
Merge pull request #40 from NeuwirthLab/latest
Browse files (browse the repository at this point in the history)
Add a recorder-filter tool
  • Loading branch information
wangvsa authored Jan 15, 2025
2 parents cf455e5 + e8f7875 commit ed49c54
Show file tree
Hide file tree
Showing 4 changed files with 669 additions and 80 deletions.
9 changes: 7 additions & 2 deletions tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ include_directories(${CMAKE_SOURCE_DIR}/include)
#------------------------------------------------------------------------------
# FLAGS for building
#------------------------------------------------------------------------------
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result -std=c++11")

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result -std=c++17")

find_package(MPI REQUIRED)
if(MPI_FOUND)
Expand Down Expand Up @@ -59,6 +60,10 @@ add_executable(recorder-summary recorder-summary.c)
target_link_libraries(recorder-summary reader)
add_dependencies(recorder-summary reader)

add_executable(recorder-filter recorder-filter.cpp)
target_link_libraries(recorder-filter reader recorder)
add_dependencies(recorder-filter reader recorder)


if(RECORDER_ENABLE_PARQUET)
message("-- " "Configuring Parquet tool: TRUE")
Expand Down Expand Up @@ -102,7 +107,7 @@ endif()
# Add Target(s) to CMake Install
#-----------------------------------------------------------------------------
#set(targets reader recorder2text metaops_checker conflict_detector)
set(targets reader recorder2text recorder2timeline conflict-detector recorder-summary)
set(targets reader recorder2text recorder2timeline conflict-detector recorder-summary recorder-filter)
foreach(target ${targets})
install(
TARGETS
Expand Down
32 changes: 16 additions & 16 deletions tools/reader-cst-cfg.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ void reader_decode_cst_2_3(RecorderReader *reader, int rank, CST *cst) {
cst->cs_list = malloc(cst->entries * sizeof(CallSignature));

for(int i = 0; i < cst->entries; i++) {
fread(&cst->cs_list[i].terminal_id, sizeof(int), 1, f);
fread(&cst->cs_list[i].key_len, sizeof(int), 1, f);
cst->cs_list[i].key = malloc(cst->cs_list[i].key_len);
fread(cst->cs_list[i].key, 1, cst->cs_list[i].key_len, f);
assert(cst->cs_list[i].terminal_id < cst->entries);
}
fread(&cst->cs_list[i].terminal_id, sizeof(int), 1, f);
fread(&cst->cs_list[i].key_len, sizeof(int), 1, f);

cst->cs_list[i].key = malloc(cst->cs_list[i].key_len);
fread(cst->cs_list[i].key, 1, cst->cs_list[i].key_len, f);

assert(cst->cs_list[i].terminal_id < cst->entries);
}
fclose(f);
}

Expand All @@ -58,7 +58,7 @@ void reader_decode_cfg_2_3(RecorderReader *reader, int rank, CFG* cfg) {
cfg->cfg_head = NULL;
for(int i = 0; i < cfg->rules; i++) {
RuleHash *rule = malloc(sizeof(RuleHash));

fread(&(rule->rule_id), sizeof(int), 1, f);
fread(&(rule->symbols), sizeof(int), 1, f);

Expand All @@ -75,18 +75,18 @@ void reader_decode_cst(int rank, void* buf, CST* cst) {
memcpy(&cst->entries, buf, sizeof(int));
buf += sizeof(int);

// cst->cs_list will be stored in the terminal_id order.
// cst->cs_list will be stored in the terminal_id order.
cst->cs_list = malloc(cst->entries * sizeof(CallSignature));

for(int i = 0; i < cst->entries; i++) {

int terminal_id;
int terminal_id;
memcpy(&terminal_id, buf, sizeof(int));
buf += sizeof(int);
assert(terminal_id < cst->entries);
assert(terminal_id < cst->entries);

CallSignature* cs = &(cst->cs_list[terminal_id]);
cs->terminal_id = terminal_id;
CallSignature* cs = &(cst->cs_list[terminal_id]);
cs->terminal_id = terminal_id;

memcpy(&cs->rank, buf, sizeof(int));
buf += sizeof(int);
Expand Down Expand Up @@ -125,14 +125,14 @@ void reader_decode_cfg(int rank, void* buf, CFG* cfg) {
}

CST* reader_get_cst(RecorderReader* reader, int rank) {
CST* cst = reader->csts[rank];
CST* cst = reader->csts[rank];
return cst;
}

CFG* reader_get_cfg(RecorderReader* reader, int rank) {
CFG* cfg;
if (reader->metadata.interprocess_compression)
cfg = reader->cfgs[reader->ug_ids[rank]];
cfg = reader->cfgs[reader->ug_ids[rank]];
else
cfg = reader->cfgs[rank];
return cfg;
Expand Down
121 changes: 59 additions & 62 deletions tools/reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ void* read_zlib(FILE* source) {
size_t compressed_size, decompressed_size;
fread(&compressed_size, sizeof(size_t), 1, source);
fread(&decompressed_size, sizeof(size_t), 1, source);
//printf("read zlib compressed size: %ld, decompressed size: %ld\n",
// compressed_size, decompressed_size);
//fflush(stdout);
void* compressed = malloc(compressed_size);
void* decompressed = malloc(decompressed_size);
void* p_decompressed = decompressed;
//printf("compressed size: %ld, decompressed size: %ld\n",
// compressed_size, decompressed_size);

strm.avail_in = fread(compressed, 1, compressed_size, source);
strm.next_in = compressed;
Expand Down Expand Up @@ -127,7 +128,7 @@ void read_metadata(RecorderReader* reader) {
// first 1024 bytes are reserved for metadata block
// the rest of the file stores all supported functions
fseek(fp, 0, SEEK_END);
long fsize = ftell(fp) - 1024;
long fsize = ftell(fp) - 1024;
char buf[fsize];

fseek(fp, 1024, SEEK_SET);
Expand Down Expand Up @@ -155,19 +156,19 @@ void read_metadata(RecorderReader* reader) {
memcpy(reader->func_list[func_id], buf+start_pos, end_pos-start_pos);
start_pos = end_pos+1;
if((reader->mpi_start_idx==-1) &&
(NULL!=strstr(reader->func_list[func_id], "MPI")))
(NULL!=strstr(reader->func_list[func_id], "MPI")))
reader->mpi_start_idx = func_id;

if((reader->hdf5_start_idx==-1) &&
(NULL!=strstr(reader->func_list[func_id], "H5")))
(NULL!=strstr(reader->func_list[func_id], "H5")))
reader->hdf5_start_idx = func_id;

if((reader->pnetcdf_start_idx==-1) &&
(NULL!=strstr(reader->func_list[func_id], "ncmpi")))
(NULL!=strstr(reader->func_list[func_id], "ncmpi")))
reader->pnetcdf_start_idx = func_id;

if((reader->netcdf_start_idx==-1) &&
(NULL!=strstr(reader->func_list[func_id], "nc_")))
(NULL!=strstr(reader->func_list[func_id], "nc_")))
reader->netcdf_start_idx = func_id;

func_id++;
Expand All @@ -193,39 +194,39 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) {

read_metadata(reader);

int nprocs= reader->metadata.total_ranks;
int nprocs= reader->metadata.total_ranks;

reader->ug_ids = malloc(sizeof(int) * nprocs);
reader->ug_ids = malloc(sizeof(int) * nprocs);
reader->ugs = malloc(sizeof(CFG*) * nprocs);
reader->csts = malloc(sizeof(CST*) * nprocs);
reader->cfgs = malloc(sizeof(CFG*) * nprocs);
reader->csts = malloc(sizeof(CST*) * nprocs);
reader->cfgs = malloc(sizeof(CFG*) * nprocs);

if(reader->metadata.interprocess_compression) {
if(reader->metadata.interprocess_compression) {
// a single file for merged csts
// and a single file for unique cfgs
void* buf_cst;
void* buf_cfg;

// Read and parse the cst file
char cst_fname[1096] = {0};
sprintf(cst_fname, "%s/recorder.cst", reader->logs_dir);
FILE* cst_file = fopen(cst_fname, "rb");
char cst_fname[1096] = {0};
sprintf(cst_fname, "%s/recorder.cst", reader->logs_dir);
FILE* cst_file = fopen(cst_fname, "rb");
buf_cst = read_zlib(cst_file);
reader->csts[0] = (CST*) malloc(sizeof(CST));
reader_decode_cst(0, buf_cst, reader->csts[0]);
reader_decode_cst(0, buf_cst, reader->csts[0]);
fclose(cst_file);
free(buf_cst);

char ug_metadata_fname[1096] = {0};
sprintf(ug_metadata_fname, "%s/ug.mt", reader->logs_dir);
FILE* f = fopen(ug_metadata_fname, "rb");
fread(reader->ug_ids, sizeof(int), nprocs, f);
fread(&reader->num_ugs, sizeof(int), 1, f);
fclose(f);
char ug_metadata_fname[1096] = {0};
sprintf(ug_metadata_fname, "%s/ug.mt", reader->logs_dir);
FILE* f = fopen(ug_metadata_fname, "rb");
fread(reader->ug_ids, sizeof(int), nprocs, f);
fread(&reader->num_ugs, sizeof(int), 1, f);
fclose(f);

char cfg_fname[1096] = {0};
sprintf(cfg_fname, "%s/ug.cfg", reader->logs_dir);
FILE* cfg_file = fopen(cfg_fname, "rb");
char cfg_fname[1096] = {0};
sprintf(cfg_fname, "%s/ug.cfg", reader->logs_dir);
FILE* cfg_file = fopen(cfg_fname, "rb");
for(int i = 0; i < reader->num_ugs; i++) {
buf_cfg = read_zlib(cfg_file);
reader->ugs[i] = (CFG*) malloc(sizeof(CFG));
Expand All @@ -238,8 +239,8 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) {
reader->csts[rank] = reader->csts[0];
reader->cfgs[rank] = reader->ugs[reader->ug_ids[rank]];
}
} else { // interprocess_compression == false

} else {
for(int rank = 0; rank < nprocs; rank++) {
if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) {
reader->csts[rank] = (CST*) malloc(sizeof(CST));
Expand All @@ -254,7 +255,7 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) {
free(buf_cst);
fclose(cst_file);
}

if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) {
reader->cfgs[rank] = (CFG*) malloc(sizeof(CFG));
reader_decode_cfg_2_3(reader, rank, reader->cfgs[rank]);
Expand All @@ -275,24 +276,24 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) {
void recorder_free_reader(RecorderReader *reader) {
assert(reader);

if(reader->metadata.interprocess_compression) {
reader_free_cst(reader->csts[0]);
free(reader->csts[0]);
for(int i = 0; i < reader->num_ugs; i++) {
reader_free_cfg(reader->ugs[i]);
free(reader->ugs[i]);
}
} else {
for(int rank = 0; rank < reader->metadata.total_ranks; rank++) {
if(reader->metadata.interprocess_compression) {
reader_free_cst(reader->csts[0]);
free(reader->csts[0]);
for(int i = 0; i < reader->num_ugs; i++) {
reader_free_cfg(reader->ugs[i]);
free(reader->ugs[i]);
}
} else {
for(int rank = 0; rank < reader->metadata.total_ranks; rank++) {
reader_free_cst(reader->csts[rank]);
reader_free_cfg(reader->cfgs[rank]);
}
}

free(reader->csts);
free(reader->cfgs);
free(reader->ugs);
free(reader->ug_ids);
free(reader->csts);
free(reader->cfgs);
free(reader->ugs);
free(reader->ug_ids);
for(int i = 0; i < reader->supported_funcs; i++)
free(reader->func_list[i]);
free(reader->func_list);
Expand Down Expand Up @@ -338,8 +339,7 @@ void recorder_free_record(Record* r) {
#define TERMINAL_START_ID 0

void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, uint32_t* ts_buf,
void (*user_op)(Record*, void*), void* user_arg, int free_record) {

void (*user_op)(Record*, void*), void* user_arg, int free_record) {
RuleHash *rule = NULL;
HASH_FIND_INT(cfg->cfg_head, &rule_id, rule);
assert(rule != NULL);
Expand All @@ -350,18 +350,15 @@ void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, u

if (sym_val >= TERMINAL_START_ID) { // terminal
for(int j = 0; j < sym_exp; j++) {

Record* record = reader_cs_to_record(&(cst->cs_list[sym_val]));

// Fill in timestamps
// update timestamps
uint32_t ts[2] = {ts_buf[0], ts_buf[1]};
ts_buf += 2;
record->tstart = ts[0] * reader->metadata.time_resolution + reader->prev_tstart;
record->tend = ts[1] * reader->metadata.time_resolution + reader->prev_tstart;
reader->prev_tstart = record->tstart;

user_op(record, user_arg);

if(free_record)
recorder_free_record(record);
}
Expand Down Expand Up @@ -425,10 +422,10 @@ uint32_t* read_timestamp_file(RecorderReader* reader, int rank) {


void decode_records_core(RecorderReader *reader, int rank,
void (*user_op)(Record*, void*), void* user_arg, bool free_record) {
void (*user_op)(Record*, void*), void* user_arg, bool free_record) {

CST* cst = reader_get_cst(reader, rank);
CFG* cfg = reader_get_cfg(reader, rank);
CST* cst = reader_get_cst(reader, rank);
CFG* cfg = reader_get_cfg(reader, rank);

reader->prev_tstart = 0.0;

Expand All @@ -442,12 +439,12 @@ void decode_records_core(RecorderReader *reader, int rank,
// Decode all records for one rank
// one record at a time
void recorder_decode_records(RecorderReader *reader, int rank,
void (*user_op)(Record*, void*), void* user_arg) {
void (*user_op)(Record*, void*), void* user_arg) {
decode_records_core(reader, rank, user_op, user_arg, true);
}

void recorder_decode_records2(RecorderReader *reader, int rank,
void (*user_op)(Record*, void*), void* user_arg) {
void (*user_op)(Record*, void*), void* user_arg) {
decode_records_core(reader, rank, user_op, user_arg, false);
}

Expand Down Expand Up @@ -569,7 +566,7 @@ int update_mpi_src_tag(VerifyIORecord* vir, Record* r, int src_idx, int tag_idx,
int src = atoi(r->args[src_idx]);
int tag = atoi(r->args[tag_idx]);
char* status = r->args[status_idx];

if(src == RECORDER_MPI_ANY_SOURCE) {
char* p = strstr(status, "_");
if (p == NULL) {
Expand Down Expand Up @@ -633,11 +630,11 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi
}
} else if (func_type == RECORDER_MPI) {
if (strcmp(func_name, "MPI_Send") == 0 ||
strcmp(func_name, "MPI_Ssend") == 0) {
strcmp(func_name, "MPI_Ssend") == 0) {
// dst, tag, comm
verifyio_record_copy_args(vir, r, 3, 3, 4, 5);
} else if (strcmp(func_name, "MPI_Issend") == 0 ||
strcmp(func_name, "MPI_Isend") == 0) {
strcmp(func_name, "MPI_Isend") == 0) {
// dst, tag, comm, req
verifyio_record_copy_args(vir, r, 4, 3, 4, 5, 6);
} else if (strcmp(func_name, "MPI_Recv") == 0) {
Expand Down Expand Up @@ -681,7 +678,7 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi
// comm
verifyio_record_copy_args(vir, r, 1, 6);
} else if (strcmp(func_name, "MPI_Allreduce") == 0 ||
strcmp(func_name, "MPI_Reduce_scatter") == 0) {
strcmp(func_name, "MPI_Reduce_scatter") == 0) {
// comm
verifyio_record_copy_args(vir, r, 1, 5);
} else if (strcmp(func_name, "MPI_Allgatherv") == 0) {
Expand All @@ -700,7 +697,7 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi
// comm, local_rank
verifyio_record_copy_args(vir, r, 2, 5, 6);
} else if ((strcmp(func_name, "MPI_Cart_sub") == 0) ||
(strcmp(func_name, "MPI_Comm_create") == 0)) {
(strcmp(func_name, "MPI_Comm_create") == 0)) {
// comm, local_rank
verifyio_record_copy_args(vir, r, 2, 2, 3);
} else if ((strcmp(func_name, "MPI_Waitall")) == 0) {
Expand All @@ -720,14 +717,14 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi
// only keep needed *write* *read* POSIX calls
// additionally, MPI-IO often uses fcntl to lock files.
if (strstr(func_name, "write") ||
strstr(func_name, "read") ||
strstr(func_name, "fcntl")) {
strstr(func_name, "read") ||
strstr(func_name, "fcntl")) {
verifyio_record_copy_args(vir, r, 1, 0);
} else if (strstr(func_name, "fsync") ||
strstr(func_name, "open") ||
strstr(func_name, "close") ||
strstr(func_name, "fopen") ||
strstr(func_name, "fclose")) {
strstr(func_name, "open") ||
strstr(func_name, "close") ||
strstr(func_name, "fopen") ||
strstr(func_name, "fclose")) {
verifyio_record_copy_args(vir, r, 1, 0);
//included = 0;
}
Expand Down
Loading

0 comments on commit ed49c54

Please sign in to comment.