diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e328a043..06b7fa0b8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -170,6 +170,9 @@ option(PARSEC_DIST_WITH_MPI if(PARSEC_DIST_WITH_MPI AND 0) message(FATAL_ERROR "PARSEC_DIST_WITH_MPI and PARSEC_DIST_WITH_OTHER are mutually exclusive, please select only one") endif() +option(PARSEC_MPI_IS_GPU_AWARE + "Build PaRSEC assuming the MPI library is GPU-aware, aka. can move data directly to and from GPU memory.\ + As of today (mid 2024) while most MPI support such an option, they require a single process per GPU" ON) option(PARSEC_DIST_THREAD "Use an extra thread to progress the data movements" ON) option(PARSEC_DIST_PRIORITIES diff --git a/configure b/configure index 3bbe6df04..63387743c 100755 --- a/configure +++ b/configure @@ -103,7 +103,8 @@ cat <:OTF2::OTF2> $<$:MPI::MPI_C> $<$:CUDA::cudart> + $<$:cuda> $<$:hip::host> ${EXTRA_LIBS} INTERFACE diff --git a/parsec/arena.c b/parsec/arena.c index ca7def080..3fc19f561 100644 --- a/parsec/arena.c +++ b/parsec/arena.c @@ -235,43 +235,118 @@ int parsec_arena_allocate_device_private(parsec_data_copy_t *copy, return PARSEC_SUCCESS; } -parsec_data_copy_t *parsec_arena_get_copy(parsec_arena_t *arena, - size_t count, int device, - parsec_datatype_t dtt) +#include "parsec/utils/zone_malloc.h" +#include "mca/device/device_gpu.h" + +#if defined(PARSEC_DEBUG) +static int64_t parsec_countable_incoming_message = 0xF000000000000000; +#endif /* defined(PARSEC_DEBUG) */ + +static inline parsec_data_copy_t * +parsec_arena_internal_copy_new(parsec_arena_t *arena, + parsec_data_t *data, + size_t count, int device, + parsec_datatype_t dtt) { - parsec_data_t *data; - parsec_data_copy_t *copy; - int rc; - - - data = parsec_data_new(); + parsec_data_copy_t *copy = NULL; + parsec_data_t* ldata = data; if( NULL == data ) { + ldata = parsec_data_new(); + if( NULL == ldata ) { + return NULL; + } +#if defined(PARSEC_DEBUG) + /* Name the data with a default key to facilitate debuging */ + 
ldata->key = (uint64_t)parsec_atomic_fetch_inc_int64(&parsec_countable_incoming_message); + ldata->key |= ((uint64_t)device) << 56; +#endif /* defined(PARSEC_DEBUG) */ + } + if( 0 == device ) { + copy = parsec_data_copy_new(ldata, device, dtt, + PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED | PARSEC_DATA_FLAG_ARENA); + if (NULL == copy) { + goto free_and_return; + } + int rc = parsec_arena_allocate_device_private(copy, arena, count, device, dtt); + if (PARSEC_SUCCESS != rc) { + goto free_and_return; + } + return copy; + } + /** + * This part is not really nice, it breaks the separation between devices, and how their memory is + * managed. But, it should give nice performance improvements if the communication layer is + * capable of sending or receiving data directly to and from the accelerator memory. The only drawback + * is that once the GPU memory is full, this will fail, so the software will fall back to the + * prior behavior, going through the CPU memory. + * + * The zone deallocation is not symmetric, it will happen in the GPU management, when the data copies + * are released from the different LRU lists. 
+ */ + parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t *)parsec_mca_device_get(device); + if (NULL == gpu_device) { return NULL; } + size_t size = count * arena->elem_size; + void* device_private = zone_malloc(gpu_device->memory, size); + if( NULL == device_private ) { + PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Arena:\tallocate data copy on device %d of size %zu from zone %p failed (out of memory)\n", + device, size, (void *)copy->arena_chunk); + goto free_and_return; + } + copy = parsec_data_copy_new(ldata, device, dtt, + PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED); + if (NULL == copy) { + PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Arena:\tallocate data copy on device %d of size %zu from zone %p failed to allocate copy (out of memory)\n", + device, size, (void *)copy->arena_chunk); + zone_free(gpu_device->memory, device_private); + goto free_and_return; + } + copy->dtt = dtt; + copy->device_private = device_private; + copy->arena_chunk = (parsec_arena_chunk_t*)gpu_device->memory; + PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Arena:\tallocate data copy on device %d of size %zu from zone %p, " + "data ptr %p", + device, size, (void*)copy->arena_chunk, (void*)copy->device_private); + copy->version = 0; + copy->coherency_state = PARSEC_DATA_COHERENCY_INVALID; + copy->original->owner_device = device; + copy->original->preferred_device = device; + return copy; + free_and_return: + if( NULL != copy ) + PARSEC_OBJ_RELEASE(copy); + if( NULL == data) + PARSEC_OBJ_RELEASE(ldata); /* release the locally allocated data */ + return NULL; +} - copy = parsec_data_copy_new( data, device, dtt, - PARSEC_DATA_FLAG_ARENA | - PARSEC_DATA_FLAG_PARSEC_OWNED | - PARSEC_DATA_FLAG_PARSEC_MANAGED); +parsec_data_copy_t * +parsec_arena_get_new_copy(parsec_arena_t *arena, + size_t count, int device, + parsec_datatype_t dtt) +{ + parsec_data_copy_t *dev0_copy, *copy; - if(NULL == copy) { - PARSEC_OBJ_RELEASE(data); + dev0_copy = 
parsec_arena_internal_copy_new(arena, NULL, count, 0 /* first allocate the copy on the device 0 */, dtt); + if( NULL == dev0_copy ) { return NULL; } + dev0_copy->coherency_state = PARSEC_DATA_COHERENCY_INVALID; + dev0_copy->version = 0; /* start from somewhere */ + if( 0 == device ) { + return dev0_copy; + } - rc = parsec_arena_allocate_device_private(copy, arena, count, device, dtt); - + copy = parsec_arena_internal_copy_new(arena, dev0_copy->original, count, device, dtt); + if( NULL == copy ) { + copy = dev0_copy; /* return the main memory data copy */ + } /* This data is going to be released once all copies are released * It does not exist without at least a copy, and we don't give the * pointer to the user, so we must remove our retain from it */ - PARSEC_OBJ_RELEASE(data); - - if( PARSEC_SUCCESS != rc ) { - PARSEC_OBJ_RELEASE(copy); - return NULL; - } - + PARSEC_OBJ_RELEASE(dev0_copy->original); return copy; } diff --git a/parsec/arena.h b/parsec/arena.h index acf8697a0..17783b307 100644 --- a/parsec/arena.h +++ b/parsec/arena.h @@ -133,15 +133,15 @@ int parsec_arena_construct_ex(parsec_arena_t* arena, * enough resource to allocate a new data copy of this type. */ -parsec_data_copy_t *parsec_arena_get_copy(parsec_arena_t *arena, - size_t count, int device, - parsec_datatype_t dtt); +parsec_data_copy_t *parsec_arena_get_new_copy(parsec_arena_t *arena, + size_t count, int device, + parsec_datatype_t dtt); /** * @brief Allocates memory for a given data copy. This is a function used by * DSLs to set the memory associated with a data copy they have created. - * It is also used by parsec_arena_get_copy. - * + * It is also used by parsec_arena_get_new_copy. + * * @param copy the (empty) data copy to allocate memory for. NB: the @p original * field of this data copy must be set. The operation overwrites the device * dtt and count of this data copy, as well as the device_private pointer. 
diff --git a/parsec/class/info.c b/parsec/class/info.c index 789f77f0d..4afbd6d38 100644 --- a/parsec/class/info.c +++ b/parsec/class/info.c @@ -313,6 +313,8 @@ void *parsec_info_get(parsec_info_object_array_t *oa, parsec_info_id_t iid) if(NULL == ie->constructor) return ret; nio = ie->constructor(oa->cons_obj, ie->cons_data); + if( NULL == nio ) + return ret; ret = parsec_info_test_and_set(oa, iid, nio, NULL); if(ret != nio && NULL != ie->destructor) { ie->destructor(nio, ie->des_data); diff --git a/parsec/data.c b/parsec/data.c index 8dffaa027..2f93d3820 100644 --- a/parsec/data.c +++ b/parsec/data.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012-2024 The University of Tennessee and The University + * Copyright (c) 2012-2025 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. */ @@ -15,6 +15,7 @@ #include "parsec/sys/atomic.h" #include "parsec/remote_dep.h" #include "parsec/parsec_internal.h" +#include "parsec/utils/zone_malloc.h" static parsec_lifo_t parsec_data_lifo; static parsec_lifo_t parsec_data_copies_lifo; @@ -64,6 +65,7 @@ static void parsec_data_construct(parsec_data_t* obj ) obj->preferred_device = -1; obj->key = 0; obj->nb_elts = 0; + obj->nb_copies = 0; for( uint32_t i = 0; i < parsec_nb_devices; obj->device_copies[i] = NULL, i++ ); obj->dc = NULL; @@ -98,11 +100,12 @@ static void parsec_data_destruct(parsec_data_t* obj ) * GPU copies are normally stored in LRU lists, and must be * destroyed by the release list to free the memory on the device */ - PARSEC_OBJ_RELEASE( copy ); + PARSEC_DATA_COPY_RELEASE(copy); } } assert(NULL == obj->device_copies[i]); } + assert(0 == obj->nb_copies); } PARSEC_OBJ_CLASS_INSTANCE(parsec_data_t, parsec_object_t, @@ -160,8 +163,8 @@ void parsec_data_delete(parsec_data_t* data) inline int parsec_data_copy_attach(parsec_data_t* data, - parsec_data_copy_t* copy, - uint8_t device) + parsec_data_copy_t* copy, + uint8_t device) { assert(NULL == copy->original); assert(NULL == 
copy->older); @@ -174,6 +177,7 @@ parsec_data_copy_attach(parsec_data_t* data, copy->older = NULL; return PARSEC_ERROR; } + parsec_atomic_fetch_add_int32(&data->nb_copies, 1); PARSEC_OBJ_RETAIN(data); return PARSEC_SUCCESS; } @@ -191,6 +195,7 @@ int parsec_data_copy_detach(parsec_data_t* data, return PARSEC_ERR_NOT_FOUND; } data->device_copies[device] = copy->older; + parsec_atomic_fetch_add_int32(&data->nb_copies, -1); copy->original = NULL; copy->older = NULL; @@ -220,7 +225,7 @@ parsec_data_copy_t* parsec_data_copy_new(parsec_data_t* data, uint8_t device, } copy->flags = flags; if( PARSEC_SUCCESS != parsec_data_copy_attach(data, copy, device) ) { - PARSEC_OBJ_RELEASE(copy); + PARSEC_DATA_COPY_RELEASE(copy); return NULL; } copy->dtt = dtt; @@ -329,6 +334,12 @@ int parsec_data_start_transfer_ownership_to_copy(parsec_data_t* data, copy = data->device_copies[device]; assert( NULL != copy ); + if( valid_copy == device ) { + PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, + "DEV[%d]: already has ownership of data %p to copy %p in mode %d", + device, data, copy, access_mode); + goto bookkeeping; + } PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "DEV[%d]: start transfer ownership of data %p to copy %p in mode %d", device, data, copy, access_mode); @@ -416,6 +427,7 @@ int parsec_data_start_transfer_ownership_to_copy(parsec_data_t* data, } } + bookkeeping: if( PARSEC_FLOW_ACCESS_READ & access_mode ) { copy->readers++; } @@ -434,40 +446,53 @@ int parsec_data_start_transfer_ownership_to_copy(parsec_data_t* data, return valid_copy; } -static char dump_coherency_codex(parsec_data_coherency_t state) -{ - if( PARSEC_DATA_COHERENCY_INVALID == state ) return 'I'; - if( PARSEC_DATA_COHERENCY_OWNED == state ) return 'O'; - if( PARSEC_DATA_COHERENCY_EXCLUSIVE == state ) return 'E'; - if( PARSEC_DATA_COHERENCY_SHARED == state ) return 'S'; - return 'X'; -} - -void parsec_dump_data_copy(parsec_data_copy_t* copy) +void parsec_data_copy_dump(parsec_data_copy_t* copy) { - 
parsec_debug_verbose(0, 0, "- [%d]: copy %p state %c readers %d version %u\n", - (int)copy->device_index, copy, dump_coherency_codex(copy->coherency_state), copy->readers, copy->version); + char *tranfer = "---", flags[] = "----", *coherency = "undef"; + switch(copy->data_transfer_status) { + case PARSEC_DATA_STATUS_NOT_TRANSFER: tranfer = "no"; break; + case PARSEC_DATA_STATUS_UNDER_TRANSFER: tranfer = "yes"; break; + case PARSEC_DATA_STATUS_COMPLETE_TRANSFER: tranfer = "no"; break; + } + if (copy->flags & PARSEC_DATA_FLAG_ARENA) flags[0] = 'A'; + if (copy->flags & PARSEC_DATA_FLAG_TRANSIT) flags[1] = 'T'; + if (copy->flags & PARSEC_DATA_FLAG_PARSEC_MANAGED) flags[2] = 'M'; + if (copy->flags & PARSEC_DATA_FLAG_PARSEC_OWNED) flags[3] = 'O'; + + if( PARSEC_DATA_COHERENCY_INVALID == copy->coherency_state ) coherency = "invalid"; + if( PARSEC_DATA_COHERENCY_OWNED == copy->coherency_state ) coherency = "owned"; + if( PARSEC_DATA_COHERENCY_EXCLUSIVE == copy->coherency_state ) coherency = "exclusive"; + if( PARSEC_DATA_COHERENCY_SHARED == copy->coherency_state ) coherency = "shared"; + + parsec_debug_verbose(0, 0, "%s [%d]: copy %p [ref %d] coherency %s readers %d version %u transit %s flags %s\n" + " older %p orig %p [%llx] arena %p dev_priv %p\n", + ((NULL != copy->original) && (copy->original->owner_device == copy->device_index)) ? "*" : " ", + (int)copy->device_index, copy, copy->super.super.obj_reference_count, coherency, copy->readers, copy->version, tranfer, flags, + (void *)copy->older, (void *)copy->original, + (NULL != copy->original) ? 
(unsigned long)copy->original->key : (unsigned long)-1, (void *)copy->arena_chunk, copy->device_private); } -void parsec_dump_data(parsec_data_t* data) +void parsec_data_dump(parsec_data_t* data) { - parsec_debug_verbose(0, 0, "data %p key %lu owner %d\n", data, data->key, data->owner_device); + parsec_debug_verbose(0, 0, "data %p [ref %d] key %lu owner dev %d pref dev %d copies %d dc %p [# elems %zu]\n", + data, data->super.obj_reference_count, data->key, data->owner_device, data->preferred_device, data->nb_copies, + (void*)data->dc, data->nb_elts); for( uint32_t i = 0; i < parsec_nb_devices; i++ ) { if( NULL != data->device_copies[i]) - parsec_dump_data_copy(data->device_copies[i]); + parsec_data_copy_dump(data->device_copies[i]); } } parsec_data_copy_t* parsec_data_get_copy(parsec_data_t* data, uint32_t device) { - return PARSEC_DATA_GET_COPY(data, device); + return PARSEC_DATA_GET_COPY(data, device); } void parsec_data_copy_release(parsec_data_copy_t* copy) { - /* TODO: Move the copy back to the CPU before destroying it */ + /* TODO: Move the copy back to the CPU before destroying it */ PARSEC_DATA_COPY_RELEASE(copy); } @@ -508,7 +533,7 @@ parsec_data_create( parsec_data_t **holder, if( !parsec_atomic_cas_ptr(holder, NULL, data) ) { parsec_data_copy_detach(data, data_copy, 0); - PARSEC_OBJ_RELEASE(data_copy); + PARSEC_DATA_COPY_RELEASE(data_copy); data = *holder; } } else { @@ -559,3 +584,42 @@ parsec_data_destroy( parsec_data_t *data ) #endif PARSEC_OBJ_RELEASE(data); } + +#include "parsec/utils/debug.h" + +int parsec_data_release_self_contained_data(parsec_data_t *data) +{ + int32_t nb_copies = data->nb_copies; + if (data->super.obj_reference_count != nb_copies) return 0; + parsec_data_copy_t *copy = NULL; + /* this data is only referenced by it's own copies. If these copies are also only referenced by + * data, then we can release them all. 
+ */ + for( uint32_t i = 0; i < parsec_nb_devices; i++) { + if (NULL == (copy = data->device_copies[i])) continue; + if( copy->super.super.obj_reference_count > 1 || copy->readers > 0 ) + return 0; + } + PARSEC_DEBUG_VERBOSE(90, parsec_debug_output, "Release copy %p from self-contained data %p", copy, data); + for( uint32_t i = 0; i < parsec_nb_devices; i++) { + if (NULL == (copy = data->device_copies[i])) continue; + assert(1 == copy->super.super.obj_reference_count && 0 == copy->readers); + if( 0 == copy->device_index ) { + PARSEC_OBJ_RELEASE(copy); + assert(NULL == copy); + } else { + /* Do not release data copies that do not belong to the CPU or really bad things will happen. + * Only the device manager can release these copies, the best we can do here is to detach them + * from the data and eventually release their memory. + */ + parsec_data_copy_detach(data, copy, copy->device_index); + zone_free((zone_malloc_t *)copy->arena_chunk, copy->device_private); + copy->device_private = NULL; + copy->arena_chunk = NULL; + } + if (0 == --nb_copies) return 1; /* we deallocate the data_t during the copy_release/detach of the last copy, so we need to stop now */ + } + parsec_warning("Release copy %p from self-contained data %p had %d more copies than present in device_copies", copy, data, nb_copies); + return 0; +} + diff --git a/parsec/data.h b/parsec/data.h index e94d56df5..d8fca1a52 100644 --- a/parsec/data.h +++ b/parsec/data.h @@ -31,9 +31,9 @@ typedef uint8_t parsec_data_coherency_t; #define PARSEC_DATA_COHERENCY_SHARED ((parsec_data_coherency_t)0x4) typedef uint8_t parsec_data_status_t; -#define PARSEC_DATA_STATUS_NOT_TRANSFER ((parsec_data_coherency_t)0x0) -#define PARSEC_DATA_STATUS_UNDER_TRANSFER ((parsec_data_coherency_t)0x1) -#define PARSEC_DATA_STATUS_COMPLETE_TRANSFER ((parsec_data_coherency_t)0x2) +#define PARSEC_DATA_STATUS_NOT_TRANSFER ((parsec_data_status_t)0x0) +#define PARSEC_DATA_STATUS_UNDER_TRANSFER ((parsec_data_status_t)0x1) +#define 
PARSEC_DATA_STATUS_COMPLETE_TRANSFER ((parsec_data_status_t)0x2) /** * Data copies have three levels of 'ownership': * - a data copy can be owned and managed by PaRSEC. @@ -124,8 +124,8 @@ PARSEC_DECLSPEC void parsec_data_end_transfer_ownership_to_copy(parsec_data_t* data, uint8_t device, uint8_t access_mode); -PARSEC_DECLSPEC void parsec_dump_data_copy(parsec_data_copy_t* copy); -PARSEC_DECLSPEC void parsec_dump_data(parsec_data_t* copy); +PARSEC_DECLSPEC void parsec_data_copy_dump(parsec_data_copy_t *copy); +PARSEC_DECLSPEC void parsec_data_dump(parsec_data_t* copy); PARSEC_DECLSPEC parsec_data_t * parsec_data_create( parsec_data_t **holder, diff --git a/parsec/data_dist/matrix/map_operator.c b/parsec/data_dist/matrix/map_operator.c index fe7846f8f..4bbafcab4 100644 --- a/parsec/data_dist/matrix/map_operator.c +++ b/parsec/data_dist/matrix/map_operator.c @@ -297,13 +297,13 @@ static int data_lookup(parsec_execution_stream_t *es, this_task->data[0].data_in = parsec_data_get_copy(src(m,n), 0); this_task->data[0].source_repo_entry = NULL; this_task->data[0].data_out = NULL; - PARSEC_OBJ_RETAIN(this_task->data[0].data_in); + PARSEC_DATA_COPY_RETAIN(this_task->data[0].data_in); } if( NULL != __tp->dest ) { this_task->data[1].data_in = parsec_data_get_copy(dest(m,n), 0); this_task->data[1].source_repo_entry = NULL; this_task->data[1].data_out = this_task->data[1].data_in; - PARSEC_OBJ_RETAIN(this_task->data[1].data_in); + PARSEC_DATA_COPY_RETAIN(this_task->data[1].data_in); } return PARSEC_HOOK_RETURN_DONE; } diff --git a/parsec/data_dist/matrix/two_dim_rectangle_cyclic.h b/parsec/data_dist/matrix/two_dim_rectangle_cyclic.h index 6dddc3ebd..a0c718b1f 100644 --- a/parsec/data_dist/matrix/two_dim_rectangle_cyclic.h +++ b/parsec/data_dist/matrix/two_dim_rectangle_cyclic.h @@ -45,7 +45,6 @@ typedef struct parsec_matrix_block_cyclic { * @param dc matrix description structure, already allocated, that will be initialize * @param mtype type of data used for this matrix * 
@param storage type of storage of data - * @param nodes number of nodes * @param myrank rank of the local node (as of mpi rank) * @param mb number of row in a tile * @param nb number of column in a tile diff --git a/parsec/data_internal.h b/parsec/data_internal.h index 4b4a396d2..5f89139f1 100644 --- a/parsec/data_internal.h +++ b/parsec/data_internal.h @@ -19,6 +19,7 @@ #include "parsec/arena.h" #include "parsec/data.h" #include "parsec/class/parsec_future.h" +#include "parsec/utils/debug.h" /** * This structure is the keeper of all the information regarding @@ -30,11 +31,12 @@ struct parsec_data_s { parsec_atomic_lock_t lock; - parsec_data_key_t key; int8_t owner_device; int8_t preferred_device; /* Hint set from the MEMADVICE device API to define on * which device this data should be modified RW when there * are multiple choices. -1 means no preference. */ + int32_t nb_copies; /* How many valid copies are attached to this data */ + parsec_data_key_t key; struct parsec_data_collection_s* dc; size_t nb_elts; /* size in bytes of the memory layout */ struct parsec_data_copy_s *device_copies[]; /* this array allocated according to the number of devices @@ -85,22 +87,50 @@ PARSEC_DECLSPEC PARSEC_OBJ_CLASS_DECLARATION(parsec_data_copy_t); #define PARSEC_DATA_GET_COPY(DATA, DEVID) \ ((DATA)->device_copies[(DEVID)]) + +int parsec_data_release_self_contained_data(parsec_data_t* data); /** * Decrease the refcount of this copy of the data. If the refcount reach * 0 the upper level is in charge of cleaning up and releasing all content * of the copy. 
*/ -#define PARSEC_DATA_COPY_RELEASE(DATA) \ +#if 0 +#define PARSEC_DATA_COPY_RELEASE(COPY) \ do { \ - PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "Release data copy %p at %s:%d", (DATA), __FILE__, __LINE__); \ - PARSEC_OBJ_RELEASE((DATA)); \ + PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "Release data copy %p at %s:%d", (COPY), __FILE__, __LINE__); \ + PARSEC_OBJ_RELEASE((COPY)); \ + if( (NULL != (COPY)) && (NULL != ((COPY)->original)) ) parsec_data_release_self_contained_data((COPY)->original); \ } while(0) +#define PARSEC_DATA_COPY_RETAIN(COPY) \ + do { \ + PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "Retain data copy %p at %s:%d", (COPY), __FILE__, __LINE__); \ + PARSEC_OBJ_RETAIN((COPY)); \ + } while(0) +#else +static inline void __parsec_data_copy_release(parsec_data_copy_t** copy) +{ + PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "Release data copy %p at %s:%d", *copy, __FILE__, __LINE__); + PARSEC_OBJ_RELEASE(*copy); + if ((NULL != *copy) && (NULL != (*copy)->original) && (1 == (*copy)->super.super.obj_reference_count)) + parsec_data_release_self_contained_data((*copy)->original); +} +#define PARSEC_DATA_COPY_RELEASE(COPY) \ + __parsec_data_copy_release(&(COPY)) + +static inline void __parsec_data_copy_retain(parsec_data_copy_t* copy) +{ + PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "Retain data copy %p at %s:%d", copy, __FILE__, __LINE__); + PARSEC_OBJ_RETAIN(copy); +} +#define PARSEC_DATA_COPY_RETAIN(COPY) \ + __parsec_data_copy_retain((COPY)) +#endif /* 0 */ /** * Return the device private pointer for a datacopy. */ -#define PARSEC_DATA_COPY_GET_PTR(DATA) \ - ((DATA) ? (DATA)->device_private : NULL) +#define PARSEC_DATA_COPY_GET_PTR(COPY) \ + ((COPY) ? 
(COPY)->device_private : NULL) /** @} */ diff --git a/parsec/include/parsec/parsec_options.h.in b/parsec/include/parsec/parsec_options.h.in index 85af2db8e..9c397c5fb 100644 --- a/parsec/include/parsec/parsec_options.h.in +++ b/parsec/include/parsec/parsec_options.h.in @@ -71,6 +71,7 @@ /* Communication engine */ #cmakedefine PARSEC_DIST_WITH_MPI +#cmakedefine PARSEC_MPI_IS_GPU_AWARE #cmakedefine PARSEC_DIST_THREAD #cmakedefine PARSEC_DIST_PRIORITIES #cmakedefine PARSEC_DIST_COLLECTIVES diff --git a/parsec/include/parsec/sys/atomic-c11.h b/parsec/include/parsec/sys/atomic-c11.h index 000825c89..c507d4c00 100644 --- a/parsec/include/parsec/sys/atomic-c11.h +++ b/parsec/include/parsec/sys/atomic-c11.h @@ -191,7 +191,7 @@ static_assert(sizeof(volatile int) >= sizeof(volatile atomic_flag), "The type size for atomic_flag is larger than expected. Please report this error to PaRSEC developpers." "You may compile without C11 atomic support (-DSUPPORT_C11=OFF in cmake) to fallback on other atomic types."); -# define PARSEC_ATOMIC_HAS_ATOMIC_INIT +#define PARSEC_ATOMIC_HAS_ATOMIC_INIT ATOMIC_STATIC_INLINE void parsec_atomic_lock_init( parsec_atomic_lock_t* atomic_lock ) { diff --git a/parsec/interfaces/dtd/insert_function_internal.h b/parsec/interfaces/dtd/insert_function_internal.h index 38da01ad0..1b38a6c56 100644 --- a/parsec/interfaces/dtd/insert_function_internal.h +++ b/parsec/interfaces/dtd/insert_function_internal.h @@ -417,16 +417,16 @@ void parsec_dtd_fini(); static inline void -parsec_dtd_retain_data_copy( parsec_data_copy_t *data ) +parsec_dtd_retain_data_copy( parsec_data_copy_t *copy ) { - assert( data->super.super.obj_reference_count >= 1 ); - PARSEC_OBJ_RETAIN(data); + assert( copy->super.super.obj_reference_count >= 1 ); + PARSEC_DATA_COPY_RETAIN(copy); } static inline void -parsec_dtd_release_data_copy( parsec_data_copy_t *data ) +parsec_dtd_release_data_copy(parsec_data_copy_t *copy) { - PARSEC_OBJ_RELEASE(data); + PARSEC_DATA_COPY_RELEASE(copy); } diff 
--git a/parsec/interfaces/ptg/ptg-compiler/jdf2c.c b/parsec/interfaces/ptg/ptg-compiler/jdf2c.c index 3c81a0158..b78857b22 100644 --- a/parsec/interfaces/ptg/ptg-compiler/jdf2c.c +++ b/parsec/interfaces/ptg/ptg-compiler/jdf2c.c @@ -673,6 +673,17 @@ static char* dump_local_assignments( void** elem, void* arg ) if( dos > 0 ) { string_arena_init(info->sa); string_arena_add_string(info->sa, "const int %s = %s%s.value;", def->name, info->holder, def->name); +#if 0 + jdf_expr_t* type_str = jdf_find_property( def->properties, "type", NULL ); + if( NULL == type_str ) { + string_arena_add_string(info->sa, "const int %s = %s%s.value;", def->name, info->holder, def->name); + } else { + expr_info_t expr_info = {.sa = info->sa, .prefix = "", .suffix = "", .assignments = "locals"}; + string_arena_add_string(info->sa, "const %s %s = %s%s.value;", + dump_expr((void**)type_str, &expr_info), + def->name, info->holder, def->name); + } +#endif if( dos > 1 ) string_arena_add_string(info->sa, " (void)%s;", def->name); return string_arena_get_string(info->sa); @@ -4460,7 +4471,7 @@ static void jdf_generate_startup_hook( const jdf_t *jdf ) " if(NULL == device) continue;\n" " if(NULL != device->taskpool_register)\n" " if( PARSEC_SUCCESS != device->taskpool_register(device, (parsec_taskpool_t*)__parsec_tp) ) {\n" - " parsec_debug_verbose(5, parsec_debug_output, \"Device %%s refused to register taskpool %%p\", device->name, __parsec_tp);\n" + " parsec_debug_verbose(10, parsec_debug_output, \"Device %%s refused to register taskpool %%p\", device->name, __parsec_tp);\n" " __parsec_tp->super.super.devices_index_mask &= ~(1 << device->device_index);\n" " continue;\n" " }\n" @@ -4552,11 +4563,14 @@ static void jdf_generate_destructor( const jdf_t *jdf ) " free(__parsec_tp->super.super.task_classes_array); __parsec_tp->super.super.task_classes_array = NULL;\n" " __parsec_tp->super.super.nb_task_classes = 0;\n" "\n" + "#if PARSEC_%s_ADT_IDX_MAX > 0\n" " for(i = 0; i < 
(uint32_t)__parsec_tp->super.arenas_datatypes_size; i++) {\n" " if( NULL != __parsec_tp->super.arenas_datatypes[i].arena ) {\n" " PARSEC_OBJ_RELEASE(__parsec_tp->super.arenas_datatypes[i].arena);\n" " }\n" - " }\n"); + " }\n" + "#endif /* PARSEC_%s_ADT_IDX_MAX > 0 */", + jdf_basename, jdf_basename); coutput(" /* Destroy the data repositories for this object */\n"); for( f = jdf->functions; NULL != f; f = f->next ) { @@ -5578,7 +5592,7 @@ jdf_generate_code_call_initialization(const jdf_t *jdf, const jdf_call_t *call, jdf_generate_code_reshape_input_from_desc(jdf, f, flow, dl, spaces); coutput("%s this_task->data._f_%s.data_out = chunk;\n" - "%s PARSEC_OBJ_RETAIN(chunk);\n", + "%s PARSEC_DATA_COPY_RETAIN(chunk);\n", spaces, flow->varname, spaces); @@ -5604,7 +5618,7 @@ jdf_generate_code_call_initialization(const jdf_t *jdf, const jdf_call_t *call, assert( dl->datatype_local.count != NULL ); string_arena_add_string(sa2, "%s", dump_expr((void**)dl->datatype_local.count, &info)); - coutput("%s chunk = parsec_arena_get_copy(%s->arena, %s, target_device, %s->opaque_dtt);\n" + coutput("%s chunk = parsec_arena_get_new_copy(%s->arena, %s, target_device, %s->opaque_dtt);\n" "%s chunk->original->owner_device = target_device;\n" "%s this_task->data._f_%s.data_out = chunk;\n", spaces, string_arena_get_string(sa), string_arena_get_string(sa2), string_arena_get_string(sa), @@ -5635,12 +5649,19 @@ jdf_generate_code_call_initialization(const jdf_t *jdf, const jdf_call_t *call, /* Code to create & fulfill a reshape promise locally in case this input dependency is typed */ jdf_generate_code_reshape_input_from_dep(jdf, f, flow, dl, spaces); - coutput("%s this_task->data._f_%s.data_out = parsec_data_get_copy(chunk->original, target_device);\n" - "#if defined(PARSEC_PROF_GRAPHER) && defined(PARSEC_PROF_TRACE)\n" + /* TODO: Setting the data_out here is kind of random, especially as some copy of the input flow. 
The only thing + * that would make sense here is to set the data_out to the dep outputs back into the user memory (output + * dep with a target into a data collection), to give the opportunity to the accelerator components to + * do a pushout to the desired location (instead of the current approach that will do a pushout to the + * data_copy on device 0 followed by a memcpy into the desired location). + */ + //coutput("%s this_task->data._f_%s.data_out = parsec_data_get_copy(chunk->original, target_device);\n", + // spaces, flow->varname); + + coutput("#if defined(PARSEC_PROF_GRAPHER) && defined(PARSEC_PROF_TRACE)\n" "%s parsec_prof_grapher_data_input(chunk->original, (parsec_task_t*)this_task, &%s, 0);\n" "#endif\n" "%s }\n", - spaces, flow->varname, spaces, JDF_OBJECT_ONAME( flow ), spaces); } @@ -5714,7 +5735,7 @@ static void jdf_generate_code_call_init_output(const jdf_t *jdf, const jdf_call_ spaces, flow->varname, spaces); - coutput("%s chunk = parsec_arena_get_copy(%s->arena, %s, target_device, %s);\n" + coutput("%s chunk = parsec_arena_get_new_copy(%s->arena, %s, target_device, %s);\n" "%s chunk->original->owner_device = target_device;\n", spaces, string_arena_get_string(sa_arena), string_arena_get_string(sa_count), string_arena_get_string(sa_datatype), spaces); @@ -6498,10 +6519,10 @@ jdf_generate_code_data_lookup(const jdf_t *jdf, * This way, it's only retained once during release_deps. 
*/ coutput(" if( NULL == this_task->repo_entry ){\n" - " this_task->repo_entry = data_repo_lookup_entry_and_create(es, %s_repo, " + " this_task->repo_entry = data_repo_lookup_entry_and_create(es, %s_repo, \n" " %s((const parsec_taskpool_t*)__parsec_tp, (const parsec_assignment_t*)&this_task->locals));\n" - " data_repo_entry_addto_usage_limit(%s_repo, this_task->repo_entry->ht_item.key, 1);" - " this_task->repo_entry ->generator = (void*)this_task; /* for AYU */\n" + " data_repo_entry_addto_usage_limit(%s_repo, this_task->repo_entry->ht_item.key, 1);\n" + " this_task->repo_entry->generator = (void*)this_task; /* for AYU */\n" "#if defined(PARSEC_SIM)\n" " assert(this_task->repo_entry ->sim_exec_date == 0);\n" " this_task->repo_entry ->sim_exec_date = this_task->sim_exec_date;\n" @@ -6511,7 +6532,7 @@ jdf_generate_code_data_lookup(const jdf_t *jdf, jdf_property_get_string(f->properties, JDF_PROP_UD_MAKE_KEY_FN_NAME, NULL), f->fname); - coutput(" /* The reshape repo is the current task repo. */" + coutput(" /* The reshape repo is the current task repo. 
*/\n" " reshape_repo = %s_repo;\n" " reshape_entry_key = %s((const parsec_taskpool_t*)__parsec_tp, (const parsec_assignment_t*)&this_task->locals) ;\n" " reshape_entry = this_task->repo_entry;\n", @@ -7020,6 +7041,12 @@ static void jdf_generate_code_hook(const jdf_t *jdf, output = UTIL_DUMP_LIST(sa, f->dataflow, next, dump_data_initialization_from_data_array, &ai2, "", "", "", ""); if( 0 != strlen(output) ) { + coutput("/* Make sure we have the data_out set to the data_in */\n"); + for( fl = f->dataflow; fl != NULL; fl = fl->next) { + if( fl->flow_flags & JDF_FLOW_TYPE_CTL ) continue; + coutput(" this_task->data._f_%s.data_out = this_task->data._f_%s.data_in;\n", + fl->varname, fl->varname); + } coutput(" /** Declare the variables that will hold the data, and all the accounting for each */\n" "%s\n", output); diff --git a/parsec/mca/device/device_gpu.c b/parsec/mca/device/device_gpu.c index d932e975e..0b72dadf8 100644 --- a/parsec/mca/device/device_gpu.c +++ b/parsec/mca/device/device_gpu.c @@ -301,7 +301,7 @@ void parsec_device_dump_exec_stream(parsec_gpu_exec_stream_t* exec_stream) int i; parsec_debug_verbose(0, parsec_gpu_output_stream, - "Dev: GPU stream %d{%p} [events = %d, start = %d, end = %d, executed = %d]", + "Dev: GPU stream %s{%p} [events = %d, start = %d, end = %d, executed = %d]", exec_stream->name, exec_stream, exec_stream->max_events, exec_stream->start, exec_stream->end, exec_stream->executed); for( i = 0; i < exec_stream->max_events; i++ ) { @@ -322,12 +322,12 @@ void parsec_device_dump_gpu_state(parsec_device_gpu_module_t* gpu_device) data_in_dev += gpu_device->super.data_in_from_device[i]; } - parsec_output(parsec_gpu_output_stream, "\n\n"); - parsec_output(parsec_gpu_output_stream, "Device %d:%d (%p) epoch\n", gpu_device->super.device_index, - gpu_device->super.device_index, gpu_device, gpu_device->data_avail_epoch); - parsec_output(parsec_gpu_output_stream, "\tpeer mask %x executed tasks with %llu streams %d\n", - gpu_device->peer_access_mask, 
(unsigned long long)gpu_device->super.executed_tasks, gpu_device->num_exec_streams); - parsec_output(parsec_gpu_output_stream, "\tstats transferred [in: %llu from host %llu from other device out: %llu] required [in: %llu out: %llu]\n", + parsec_output(parsec_gpu_output_stream, + "\n\nDevice %s:%d (%p) epoch %zu\n" + "\tpeer mask %x executed tasks %llu streams %d\n" + "\tstats transferred [in: %llu from host %llu from other device out: %llu] required [in: %llu out: %llu]\n", + gpu_device->super.name, gpu_device->super.device_index, gpu_device, gpu_device->data_avail_epoch, + gpu_device->peer_access_mask, (unsigned long long)gpu_device->super.executed_tasks, gpu_device->num_exec_streams, (unsigned long long)data_in_host, (unsigned long long)data_in_dev, (unsigned long long)gpu_device->super.data_out_to_host, (unsigned long long)gpu_device->super.required_data_in, (unsigned long long)gpu_device->super.required_data_out); @@ -342,7 +342,7 @@ void parsec_device_dump_gpu_state(parsec_device_gpu_module_t* gpu_device) parsec_gpu_data_copy_t* gpu_copy = (parsec_gpu_data_copy_t*)item; parsec_output(parsec_gpu_output_stream, " %d. elem %p flags 0x%x GPU mem %p\n", i, gpu_copy, gpu_copy->flags, gpu_copy->device_private); - parsec_dump_data_copy(gpu_copy); + parsec_data_copy_dump(gpu_copy); i++; }); } @@ -354,7 +354,7 @@ void parsec_device_dump_gpu_state(parsec_device_gpu_module_t* gpu_device) parsec_gpu_data_copy_t* gpu_copy = (parsec_gpu_data_copy_t*)item; parsec_output(parsec_gpu_output_stream, " %d. 
elem %p flags 0x%x GPU mem %p\n", i, gpu_copy, gpu_copy->flags, gpu_copy->device_private); - parsec_dump_data_copy(gpu_copy); + parsec_data_copy_dump(gpu_copy); i++; }); } @@ -480,7 +480,7 @@ parsec_device_data_advise(parsec_device_module_t *dev, parsec_data_t *data, int PARSEC_DEBUG_VERBOSE(20, parsec_debug_output, "Retain data copy %p [ref_count %d]", data->device_copies[ data->owner_device ], data->device_copies[ data->owner_device ]->super.super.obj_reference_count); - PARSEC_OBJ_RETAIN(data->device_copies[ data->owner_device ]); + PARSEC_DATA_COPY_RETAIN(data->device_copies[ data->owner_device ]); gpu_task->ec->data[0].data_in = data->device_copies[ data->owner_device ]; gpu_task->ec->data[0].data_out = NULL; gpu_task->ec->data[0].source_repo_entry = NULL; @@ -616,9 +616,8 @@ parsec_device_memory_reserve( parsec_device_gpu_module_t* gpu_device, parsec_warning("GPU[%d:%s] Invalid argument: requesting 0 bytes of memory", gpu_device->super.device_index, gpu_device->super.name); return PARSEC_ERROR; - } else { - alloc_size = number_blocks * eltsize; } + alloc_size = number_blocks * eltsize; } else { /* number_blocks == -1 means memory_percentage is used */ alloc_size = (memory_percentage * initial_free_mem) / 100; @@ -670,7 +669,7 @@ parsec_device_memory_reserve( parsec_device_gpu_module_t* gpu_device, gpu_elem->flags |= PARSEC_DATA_FLAG_PARSEC_OWNED; gpu_elem->device_index = gpu_device->super.device_index; mem_elem_per_gpu++; - PARSEC_OBJ_RETAIN(gpu_elem); + PARSEC_DATA_COPY_RETAIN(gpu_elem); PARSEC_DEBUG_VERBOSE(20, parsec_gpu_output_stream, "GPU[%d:%s] Retain and insert GPU copy %p [ref_count %d] in LRU", gpu_device->super.device_index, gpu_device->super.name, gpu_elem, gpu_elem->super.obj_reference_count); @@ -733,12 +732,22 @@ static void parsec_device_memory_release_list(parsec_device_gpu_module_t* gpu_de gpu_device->super.device_index, gpu_device->super.name, gpu_copy, gpu_copy->device_private, gpu_copy->super.super.obj_reference_count, original, (NULL != 
original ? original->dc : NULL)); assert( gpu_copy->device_index == gpu_device->super.device_index ); - - if( PARSEC_DATA_COHERENCY_OWNED == gpu_copy->coherency_state ) { - parsec_warning("GPU[%d:%s] still OWNS the master memory copy for data %d and it is discarding it!", - gpu_device->super.device_index, gpu_device->super.name, original->key); + if( NULL == gpu_copy->device_private ) { + PARSEC_DEBUG_VERBOSE(35, parsec_gpu_output_stream, + "GPU[%d:%s] copy %p is dangling without private data. This is OK.", + gpu_device->super.device_index, gpu_device->super.name, (void*)gpu_copy); + goto release_and_continue; + } + if( NULL == gpu_copy->original ) { + PARSEC_DEBUG_VERBOSE(35, parsec_gpu_output_stream, + "GPU[%d:%s] copy %p detached from a data but not yet reclaimed!", + gpu_device->super.device_index, gpu_device->super.name, (void*)gpu_copy); + } + if (PARSEC_DATA_COHERENCY_OWNED == gpu_copy->coherency_state) { + parsec_warning("GPU[%d:%s] still OWNS the master memory copy for data %d (%p) and it is discarding it!", + gpu_device->super.device_index, gpu_device->super.name, original->key, (void*)gpu_copy->device_private); } - assert(0 != (gpu_copy->flags & PARSEC_DATA_FLAG_PARSEC_OWNED) ); + assert(0 != (gpu_copy->flags & PARSEC_DATA_FLAG_PARSEC_OWNED)); #if defined(PARSEC_GPU_ALLOC_PER_TILE) gpu_device->memory_free( gpu_copy->device_private ); @@ -762,11 +771,12 @@ static void parsec_device_memory_release_list(parsec_device_gpu_module_t* gpu_de #endif gpu_copy->device_private = NULL; + release_and_continue: /* At this point the data copies should have no attachment to a data_t. Thus, * before we get here (aka below parsec_fini), the destructor of the data * collection must have been called, releasing all the copies. 
*/ - PARSEC_OBJ_RELEASE(gpu_copy); assert(NULL == gpu_copy); + PARSEC_DATA_COPY_RELEASE(gpu_copy); assert(NULL == gpu_copy); } } @@ -826,6 +836,87 @@ parsec_device_memory_release( parsec_device_gpu_module_t* gpu_device ) return PARSEC_SUCCESS; } +int +parsec_device_get_copy( parsec_device_gpu_module_t* gpu_device, parsec_data_copy_t** dc ) +{ + char task_name[] = "unknown"; + parsec_gpu_data_copy_t *gpu_mem_lru_cycling = NULL, *lru_gpu_elem; + /* Get the head of the LRU, assuming it has no readers and mark it as used, using the same mechanism as + * the GPU to GPU tranfers. Once the communication into this copy completes, the task will get into + * the GPU queues, and the data will be reattributed accordingly to this GPU. + */ + find_another_data: + lru_gpu_elem = (parsec_gpu_data_copy_t*)parsec_list_pop_front(&gpu_device->gpu_mem_lru); + if( NULL == lru_gpu_elem ) { + /* nothing available on the GPU. Let the upper level know about this */ + *dc = NULL; + return PARSEC_ERR_OUT_OF_RESOURCE; + } + if( 0 != lru_gpu_elem->readers ) { + PARSEC_DEBUG_VERBOSE(20, parsec_gpu_output_stream, + "GPU[%d:%s]:%s: Drop LRU-retrieved GPU copy %p [readers %d, ref_count %d] original %p", + gpu_device->super.device_index, gpu_device->super.name, task_name, + lru_gpu_elem, lru_gpu_elem->readers, lru_gpu_elem->super.super.obj_reference_count, lru_gpu_elem->original); + /* We do not add the copy back into the LRU. This means that for now this copy is not + * tracked via the LRU (despite being only used in read mode) and instead is dangling + * on other tasks. Thus, it will eventually need to be added back into the LRU when + * current task using it completes. + */ + goto find_another_data; + } + /* It's also possible that the ref_count of that element is bigger than 1 + * In that case, it's because some task completion did not execute yet, and + * we need to keep it in the list until it reaches 1. 
+ */ + if( lru_gpu_elem->super.super.obj_reference_count > 1 ) { + /* It's also possible (although unlikely) that we livelock here: + * if gpu_mem_lru has *only* elements with readers == 0 but + * ref_count > 1, then we might pop/push forever. We save the + * earliest element found and if we see it again it means we + * run over the entire list without finding a suitable replacement. + * We need to make progress on something else. This remains safe for as long as the + * LRU is only modified by a single thread (in this case the current thread). + */ + PARSEC_DEBUG_VERBOSE(20, parsec_gpu_output_stream, + "GPU[%d:%s]:%s: Push back LRU-retrieved GPU copy %p [readers %d, ref_count %d] original %p", + gpu_device->super.device_index, gpu_device->super.name, task_name, + lru_gpu_elem, lru_gpu_elem->readers, lru_gpu_elem->super.super.obj_reference_count, lru_gpu_elem->original); + assert(0 != (lru_gpu_elem->flags & PARSEC_DATA_FLAG_PARSEC_OWNED) ); + parsec_list_push_back(&gpu_device->gpu_mem_lru, &lru_gpu_elem->super); + goto find_another_data; + } + if( gpu_mem_lru_cycling == lru_gpu_elem ) { + PARSEC_DEBUG_VERBOSE(2, parsec_gpu_output_stream, + "GPU[%d:%s]: Cycle detected on allocating memory for %s", + gpu_device->super.device_index, gpu_device->super.name, task_name); + *dc = NULL; /* did our best but failed to find a data. Return and allocate it onto another device. */ + return PARSEC_ERR_OUT_OF_RESOURCE; + } + /* detect cycles to have an opportunity to stop */ + gpu_mem_lru_cycling = (NULL == gpu_mem_lru_cycling) ? lru_gpu_elem : gpu_mem_lru_cycling; /* update the cycle detector */ + + parsec_data_t* master = lru_gpu_elem->original; + if (NULL == master ) { + /* This copy has been detached by the CPU once it has been consumed (by the communication engine), + * there is no device memory associated with it, we can safely release the CPU copy. 
+ */ + assert(1 == lru_gpu_elem->super.super.obj_reference_count); + PARSEC_OBJ_RELEASE(lru_gpu_elem); + goto find_another_data; + } + parsec_atomic_lock(&master->lock); + if ( lru_gpu_elem->data_transfer_status == PARSEC_DATA_STATUS_UNDER_TRANSFER ) { + /* can't reuse, it is drained right now by another device */ + parsec_atomic_unlock(&master->lock); + goto find_another_data; + } + parsec_data_copy_detach(master, lru_gpu_elem, gpu_device->super.device_index); + parsec_atomic_wmb(); + *dc = lru_gpu_elem; + parsec_atomic_unlock(&master->lock); + return PARSEC_SUCCESS; +} + /** * Try to find memory space to move all data on the GPU. We attach a device_elem to * a memory_elem as soon as a device_elem is available. If we fail to find enough @@ -863,7 +954,10 @@ parsec_device_data_reserve_space( parsec_device_gpu_module_t* gpu_device, assert( flow && (flow->flow_index == i) ); /* Skip CTL flows only */ - if(PARSEC_FLOW_ACCESS_NONE == (PARSEC_FLOW_ACCESS_MASK & flow->flow_flags)) continue; + if(PARSEC_FLOW_ACCESS_NONE == (PARSEC_FLOW_ACCESS_MASK & flow->flow_flags)) { + gpu_task->flow_nb_elts[i] = 0; /* assume there is nothing to transfer to the GPU */ + continue; + } PARSEC_DEBUG_VERBOSE(20, parsec_gpu_output_stream, "GPU[%d:%s]:%s: Investigating flow %s:%d", @@ -871,7 +965,16 @@ parsec_device_data_reserve_space( parsec_device_gpu_module_t* gpu_device, temp_loc[i] = NULL; if (this_task->data[i].data_in == NULL) continue; - + /* if the input data is already on this device there is nothing else to do */ + if( gpu_device->super.device_index == this_task->data[i].data_in->device_index ) { + PARSEC_DEBUG_VERBOSE(20, parsec_gpu_output_stream, + "GPU[%d:%s]:%s: Flow %s:%i was already on the device %p%s", + gpu_device->super.device_index, gpu_device->super.name, task_name, + flow->name, i, gpu_elem, + this_task->data[i].data_in->data_transfer_status == PARSEC_DATA_STATUS_UNDER_TRANSFER ? 
" [in transfer]" : ""); + this_task->data[i].data_out = this_task->data[i].data_in; + continue; + } master = this_task->data[i].data_in->original; parsec_atomic_lock(&master->lock); gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index); @@ -906,6 +1009,7 @@ parsec_device_data_reserve_space( parsec_device_gpu_module_t* gpu_device, copy_readers_update = 0; assert(0 != (gpu_elem->flags & PARSEC_DATA_FLAG_PARSEC_OWNED) ); gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]); + gpu_elem->arena_chunk = (parsec_arena_chunk_t *)gpu_device->memory; if( NULL == gpu_elem->device_private ) { #endif @@ -927,6 +1031,7 @@ parsec_device_data_reserve_space( parsec_device_gpu_module_t* gpu_device, for( j = 0; j <= i; j++ ) { /* This flow could be a control flow */ if( NULL == temp_loc[j] ) continue; + this_task->data[j].data_out = NULL; /* reset the data out */ /* This flow could be non-parsec-owned, in which case we can't reclaim it */ if( 0 == (temp_loc[j]->flags & PARSEC_DATA_FLAG_PARSEC_OWNED) ) continue; PARSEC_DEBUG_VERBOSE(20, parsec_gpu_output_stream, @@ -937,9 +1042,12 @@ parsec_device_data_reserve_space( parsec_device_gpu_module_t* gpu_device, parsec_list_push_front(&gpu_device->gpu_mem_lru, (parsec_list_item_t*)temp_loc[j]); } #if !defined(PARSEC_GPU_ALLOC_PER_TILE) - PARSEC_OBJ_RELEASE(gpu_elem); + PARSEC_DATA_COPY_RELEASE(gpu_elem); #endif parsec_atomic_unlock(&master->lock); + if( data_avail_epoch ) { /* update the memory epoch */ + gpu_device->data_avail_epoch++; + } return PARSEC_HOOK_RETURN_NEXT; } @@ -1095,7 +1203,7 @@ parsec_device_data_reserve_space( parsec_device_gpu_module_t* gpu_device, "GPU[%d:%s]:%s: Release LRU-retrieved GPU copy %p [ref_count %d: must be 1]", gpu_device->super.device_index, gpu_device->super.name, task_name, lru_gpu_elem, lru_gpu_elem->super.super.obj_reference_count); - PARSEC_OBJ_RELEASE(lru_gpu_elem); + PARSEC_DATA_COPY_RELEASE(lru_gpu_elem); assert( NULL == lru_gpu_elem ); goto 
malloc_data; } @@ -1167,8 +1275,8 @@ parsec_default_gpu_stage_in(parsec_gpu_task_t *gtask, parsec_gpu_exec_stream_t *gpu_stream) { int ret; - parsec_data_copy_t * source; - parsec_data_copy_t * dest; + parsec_data_copy_t * src_copy; + parsec_data_copy_t * dst_copy; parsec_device_gpu_module_t *src_dev; parsec_device_gpu_module_t *dst_dev; parsec_task_t *task = gtask->ec; @@ -1177,10 +1285,10 @@ parsec_default_gpu_stage_in(parsec_gpu_task_t *gtask, for(int i = 0; i < task->task_class->nb_flows; i++) { if( !(flow_mask & (1U << i)) ) continue; - source = gtask->sources[i]; - dest = task->data[i].data_out; - src_dev = (parsec_device_gpu_module_t*)parsec_mca_device_get(source->device_index); - dst_dev = (parsec_device_gpu_module_t*)parsec_mca_device_get(dest->device_index); + src_copy = gtask->sources[i]; + dst_copy = task->data[i].data_out; + src_dev = (parsec_device_gpu_module_t *)parsec_mca_device_get(src_copy->device_index); + dst_dev = (parsec_device_gpu_module_t *)parsec_mca_device_get(dst_copy->device_index); if(src_dev->super.type == dst_dev->super.type) { assert( src_dev->peer_access_mask & (1 << dst_dev->super.device_index) ); @@ -1189,13 +1297,12 @@ parsec_default_gpu_stage_in(parsec_gpu_task_t *gtask, dir = parsec_device_gpu_transfer_direction_h2d; } - count = (source->original->nb_elts <= dest->original->nb_elts) ? - source->original->nb_elts : dest->original->nb_elts; - ret = dst_dev->memcpy_async( dst_dev, gpu_stream, - dest->device_private, - source->device_private, - count, - dir ); + count = (src_copy->original->nb_elts <= dst_copy->original->nb_elts) ? 
src_copy->original->nb_elts : dst_copy->original->nb_elts; + ret = dst_dev->memcpy_async(dst_dev, gpu_stream, + dst_copy->device_private, + src_copy->device_private, + count, + dir); if(PARSEC_SUCCESS != ret) return PARSEC_HOOK_RETURN_ERROR; } @@ -1217,8 +1324,8 @@ parsec_default_gpu_stage_out(parsec_gpu_task_t *gtask, parsec_gpu_exec_stream_t *gpu_stream) { int ret; - parsec_data_copy_t * source; - parsec_data_copy_t * dest; + parsec_data_copy_t * src_copy; + parsec_data_copy_t * dst_copy; parsec_device_gpu_module_t *dst_dev, *src_dev; parsec_task_t *task = gtask->ec; size_t count; @@ -1226,22 +1333,22 @@ parsec_default_gpu_stage_out(parsec_gpu_task_t *gtask, int i; for(i = 0; i < task->task_class->nb_flows; i++){ if(flow_mask & (1U << i)){ - source = task->data[i].data_out; - dest = source->original->device_copies[0]; - dst_dev = (parsec_device_gpu_module_t*)parsec_mca_device_get(dest->device_index); - src_dev = (parsec_device_gpu_module_t*)parsec_mca_device_get(source->device_index); - - count = (source->original->nb_elts <= dest->original->nb_elts) ? source->original->nb_elts : - dest->original->nb_elts; - if( src_dev->super.type == dst_dev->super.type ) { + src_copy = task->data[i].data_out; + dst_copy = src_copy->original->device_copies[0]; + dst_dev = (parsec_device_gpu_module_t*)parsec_mca_device_get(dst_copy->device_index); + src_dev = (parsec_device_gpu_module_t*)parsec_mca_device_get(src_copy->device_index); + assert(dst_copy->data_transfer_status != PARSEC_DATA_STATUS_UNDER_TRANSFER); + dst_copy->data_transfer_status = PARSEC_DATA_STATUS_UNDER_TRANSFER; + count = (src_copy->original->nb_elts <= dst_copy->original->nb_elts) ? 
src_copy->original->nb_elts : dst_copy->original->nb_elts; + if( (src_dev->super.type & PARSEC_DEV_ANY_TYPE) == (dst_dev->super.type & PARSEC_DEV_ANY_TYPE) ) { assert( src_dev->peer_access_mask & (1 << dst_dev->super.device_index) ); dir = parsec_device_gpu_transfer_direction_d2d; } else { dir = parsec_device_gpu_transfer_direction_d2h; } ret = src_dev->memcpy_async( src_dev, gpu_stream, - dest->device_private, - source->device_private, + dst_copy->device_private, + src_copy->device_private, count, dir ); if(PARSEC_SUCCESS != ret) { @@ -1279,7 +1386,24 @@ parsec_device_data_stage_in( parsec_device_gpu_module_t* gpu_device, "GPU[%d:%s]: Prefetch task %p is staging in", gpu_device->super.device_index, gpu_device->super.name, gpu_task); } - + if( gpu_elem == candidate ) { /* data already located in the right place */ + if( candidate->device_index == gpu_device->super.device_index ) { + /* the candidate is already located on the GPU, no transfer should be necessary but let's do the bookkeeping */ + if( (PARSEC_FLOW_ACCESS_WRITE & type) && (gpu_task->task_type != PARSEC_GPU_TASK_TYPE_PREFETCH) ) { + candidate->version++; + parsec_list_item_ring_chop((parsec_list_item_t *)candidate); + PARSEC_LIST_ITEM_SINGLETON(candidate); + } + if( PARSEC_FLOW_ACCESS_READ & type ) { + parsec_atomic_fetch_add_int32(&candidate->readers, 1); + } + return PARSEC_HOOK_RETURN_DONE; + } + parsec_warning("GPU[%d:%s]:\t device_data_stage_in without a proper data_out on the device " + "and with a data_in (%p) located on another device %d", + gpu_device->super.device_index, gpu_device->super.name, + candidate, candidate->device_index); + } parsec_atomic_lock( &original->lock ); gpu_task->sources[flow->flow_index] = candidate; /* default source for the transfer */ @@ -1356,10 +1480,13 @@ parsec_device_data_stage_in( parsec_device_gpu_module_t* gpu_device, gpu_device->super.device_index, gpu_device->super.name, task_data->data_in, task_data->data_in->super.super.obj_reference_count, original); if( 
gpu_device->super.type == candidate_dev->super.type ) { if( gpu_device->peer_access_mask & (1 << candidate_dev->super.device_index) ) { - /* We can directly do D2D, so let's skip the selection */ - PARSEC_DEBUG_VERBOSE(30, parsec_gpu_output_stream, - "GPU[%d:%s]:\tskipping candidate lookup: data_in copy %p on %s has PEER ACCESS", - gpu_device->super.device_index, gpu_device->super.name, task_data->data_in, candidate_dev->super.name); + /* We have a candidate for the d2d transfer. */ + int readers = parsec_atomic_fetch_inc_int32(&candidate->readers); + /* the data-in should not be in the lru so we can skip the coordination protocol with the owner of the candidate being repurposed. */ + assert( readers >= 0 /* repurposed! */); (void)readers; + PARSEC_DEBUG_VERBOSE(10, parsec_gpu_output_stream, + "GPU[%d:%s]:\tData copy %p [ref_count %d] on PaRSEC device %s is the data_in candidate and has PEER ACCESS; increasing its readers to %d", + gpu_device->super.device_index, gpu_device->super.name, candidate, candidate->super.super.obj_reference_count, candidate_dev->super.name, candidate->readers+1); goto src_selected; } } @@ -1650,6 +1777,7 @@ parsec_device_callback_complete_push(parsec_device_gpu_module_t *gpu_device, /* We also don't push back non-parsec-owned copies */ if(NULL != task->data[i].data_out && 0 == (task->data[i].data_out->flags & PARSEC_DATA_FLAG_PARSEC_OWNED)) continue; + if( gpu_device->super.device_index == task->data[i].data_in->device_index ) continue; flow = gtask->flow[i]; assert( flow ); @@ -1996,7 +2124,7 @@ parsec_device_kernel_push( parsec_device_gpu_module_t *gpu_device, gpu_task->last_data_check_epoch = gpu_device->data_avail_epoch; return ret; } - + gpu_task->last_status = 0; /* mark the task as clean */ for( i = 0; i < this_task->task_class->nb_flows; i++ ) { flow = gpu_task->flow[i]; @@ -2024,11 +2152,10 @@ parsec_device_kernel_push( parsec_device_gpu_module_t *gpu_device, return ret; } } - PARSEC_DEBUG_VERBOSE(10, parsec_gpu_output_stream, 
"GPU[%d:%s]: Push task %s DONE", gpu_device->super.device_index, gpu_device->super.name, - parsec_task_snprintf(tmp, MAX_TASK_STRLEN, this_task) ); + parsec_task_snprintf(tmp, MAX_TASK_STRLEN, this_task)); gpu_task->complete_stage = parsec_device_callback_complete_push; #if defined(PARSEC_PROF_TRACE) gpu_task->prof_key_end = -1; /* We do not log that event as the completion of this task */ @@ -2117,7 +2244,7 @@ parsec_device_kernel_pop( parsec_device_gpu_module_t *gpu_device, /* If the gpu copy is not owned by parsec, we don't manage it at all */ if( 0 == (gpu_copy->flags & PARSEC_DATA_FLAG_PARSEC_OWNED) ) continue; original = gpu_copy->original; - rc = gpu_task->stage_out? gpu_task->stage_out(gpu_task, (1U << i), gpu_stream): PARSEC_SUCCESS; + rc = gpu_task->stage_out ? gpu_task->stage_out(gpu_task, (1U << i), gpu_stream): PARSEC_SUCCESS; if(PARSEC_SUCCESS != rc) { parsec_warning( "GPU[%d:%s]: gpu_task->stage_out from device rc=%d @%s:%d\n" "\tdata %s <<%p>> -> <<%p>>\n", @@ -2155,6 +2282,7 @@ parsec_device_kernel_pop( parsec_device_gpu_module_t *gpu_device, assert( this_task->data[i].data_in == NULL || original == this_task->data[i].data_in->original ); +#if 0 if( (gpu_task->task_type != PARSEC_GPU_TASK_TYPE_D2D_COMPLETE) && !(flow->flow_flags & PARSEC_FLOW_ACCESS_WRITE) ) { /* Do not propagate GPU copies to successors (temporary solution) */ this_task->data[i].data_out = original->device_copies[0]; @@ -2166,6 +2294,7 @@ parsec_device_kernel_pop( parsec_device_gpu_module_t *gpu_device, this_task->data[i].data_out, this_task->data[i].data_out->super.super.obj_reference_count, original); } +#endif parsec_atomic_lock(&original->lock); if( flow->flow_flags & PARSEC_FLOW_ACCESS_READ ) { int current_readers = parsec_atomic_fetch_sub_int32(&gpu_copy->readers, 1) - 1; @@ -2239,7 +2368,7 @@ parsec_device_kernel_pop( parsec_device_gpu_module_t *gpu_device, #endif /* Move the data back into main memory */ rc = gpu_task->stage_out? 
gpu_task->stage_out(gpu_task, (1U << flow->flow_index), gpu_stream): PARSEC_SUCCESS; - if(PARSEC_SUCCESS != rc) { + if( PARSEC_SUCCESS != rc ) { parsec_warning( "GPU[%d:%s]: gpu_task->stage_out from device rc=%d @%s:%d\n" "\tdata %s <<%p>> -> <<%p>>\n", gpu_device->super.device_index, gpu_device->super.name, rc, __func__, __LINE__, @@ -2265,7 +2394,7 @@ parsec_device_kernel_pop( parsec_device_gpu_module_t *gpu_device, PARSEC_DEBUG_VERBOSE(10, parsec_gpu_output_stream, "GPU[%d:%s]: Pop %s DONE (return %d data epoch %"PRIu64")", gpu_device->super.device_index, gpu_device->super.name, - parsec_task_snprintf(tmp, MAX_TASK_STRLEN, this_task), return_code, gpu_device->data_avail_epoch ); + parsec_task_snprintf(tmp, MAX_TASK_STRLEN, this_task), return_code < 0 ? return_code: how_many, gpu_device->data_avail_epoch ); return (return_code < 0 ? return_code : how_many); } @@ -2278,8 +2407,7 @@ parsec_device_kernel_epilog( parsec_device_gpu_module_t *gpu_device, parsec_gpu_task_t *gpu_task ) { parsec_task_t *this_task = gpu_task->ec; - parsec_gpu_data_copy_t *gpu_copy, *cpu_copy; - parsec_data_t *original; + parsec_gpu_data_copy_t *gpu_copy; int i; #if defined(PARSEC_DEBUG_NOISIER) @@ -2304,45 +2432,60 @@ parsec_device_kernel_epilog( parsec_device_gpu_module_t *gpu_device, } gpu_copy = this_task->data[i].data_out; - original = gpu_copy->original; - cpu_copy = original->device_copies[0]; /* If it is a copy managed by the user, don't bother either */ if( 0 == (gpu_copy->flags & PARSEC_DATA_FLAG_PARSEC_OWNED) ) continue; +#if 0 + parsec_data_t *original = gpu_copy->original; + parsec_gpu_data_copy_t *cpu_copy = original->device_copies[0]; + if( this_task->data[i].data_in == this_task->data[i].data_out ) { + /** + * There might be a race condition here. We can't assume the first CPU + * version is the corresponding CPU copy, as a new CPU-bound data + * might have been created meanwhile. 
+ * + * WARNING: For now we always forward the cpu_copy to the next task, to + * do that, we lie to the engine by updating the CPU copy to the same + * status than the GPU copy without updating the data itself. Thus, the + * cpu copy is really invalid. this is related to Issue #88, and the + * fact that: + * - we don't forward the gpu copy as output + * - we always take a cpu copy as input, so it has to be in the + * same state as the GPU to prevent an extra data movement. + */ + assert( PARSEC_DATA_COHERENCY_OWNED == gpu_copy->coherency_state ); + gpu_copy->coherency_state = PARSEC_DATA_COHERENCY_SHARED; + cpu_copy->coherency_state = PARSEC_DATA_COHERENCY_SHARED; - /** - * There might be a race condition here. We can't assume the first CPU - * version is the corresponding CPU copy, as a new CPU-bound data - * might have been created meanwhile. - * - * WARNING: For now we always forward the cpu_copy to the next task, to - * do that, we lie to the engine by updating the CPU copy to the same - * status than the GPU copy without updating the data itself. Thus, the - * cpu copy is really invalid. this is related to Issue #88, and the - * fact that: - * - we don't forward the gpu copy as output - * - we always take a cpu copy as input, so it has to be in the - * same state as the GPU to prevent an extra data movement. 
- */ - assert( PARSEC_DATA_COHERENCY_OWNED == gpu_copy->coherency_state ); - gpu_copy->coherency_state = PARSEC_DATA_COHERENCY_SHARED; - cpu_copy->coherency_state = PARSEC_DATA_COHERENCY_SHARED; - - cpu_copy->version = gpu_copy->version; - PARSEC_DEBUG_VERBOSE(10, parsec_gpu_output_stream, - "GPU[%d:%s]: CPU copy %p [ref_count %d] gets the same version %d as GPU copy %p [ref_count %d]", - gpu_device->super.device_index, gpu_device->super.name, - cpu_copy, cpu_copy->super.super.obj_reference_count, cpu_copy->version, gpu_copy, gpu_copy->super.super.obj_reference_count); - - /** - * Let's lie to the engine by reporting that working version of this - * data is now on the CPU. - */ - this_task->data[i].data_out = cpu_copy; + cpu_copy->version = gpu_copy->version; + PARSEC_DEBUG_VERBOSE(10, parsec_gpu_output_stream, + "GPU[%d:%s]: CPU copy %p [ref_count %d] gets the same version %d as GPU copy %p [ref_count %d]", + gpu_device->super.device_index, gpu_device->super.name, + cpu_copy, cpu_copy->super.super.obj_reference_count, cpu_copy->version, gpu_copy, gpu_copy->super.super.obj_reference_count); - assert( 0 <= gpu_copy->readers ); + /** + * Let's lie to the engine by reporting that working version of this + * data is now on the CPU. 
+ */ + this_task->data[i].data_out = cpu_copy; + } +#endif + assert(0 <= gpu_copy->readers); if( gpu_task->pushout & (1 << i) ) { + parsec_data_t *original = gpu_copy->original; + parsec_gpu_data_copy_t *cpu_copy = original->device_copies[0]; + /* Update the CPU copy to reflect the current status */ + assert(cpu_copy->version < gpu_copy->version); + cpu_copy->version = gpu_copy->version; + cpu_copy->coherency_state = PARSEC_DATA_COHERENCY_SHARED; + gpu_copy->coherency_state = PARSEC_DATA_COHERENCY_SHARED; + assert(PARSEC_DATA_STATUS_UNDER_TRANSFER == cpu_copy->data_transfer_status); + cpu_copy->data_transfer_status = PARSEC_DATA_STATUS_COMPLETE_TRANSFER; + if( 0 == (parsec_mpi_allow_gpu_memory_communications & PARSEC_RUNTIME_SEND_GPU_MEMORY) ) { + /* Report the CPU copy as the output of the task. */ + this_task->data[i].data_out = cpu_copy; + } PARSEC_DEBUG_VERBOSE(20, parsec_gpu_output_stream, "GPU copy %p [ref_count %d] moved to the read LRU in %s", gpu_copy, gpu_copy->super.super.obj_reference_count, __func__); diff --git a/parsec/mca/device/device_gpu.h b/parsec/mca/device/device_gpu.h index fa25b87a3..bb9ff93ea 100644 --- a/parsec/mca/device/device_gpu.h +++ b/parsec/mca/device/device_gpu.h @@ -66,7 +66,7 @@ typedef int (parsec_stage_in_function_t)(parsec_gpu_task_t *gtask, /* Function type to transfer data from the GPU device. * Transfer transfer the contiguous bytes from - * task->data[i].data_in to task->data[i].data_out. + * task->data[i].data_out to the copy on device 0. * * @param[in] task parsec_task_t containing task->data[i].data_in, task->data[i].data_out. * @param[in] flow_mask indicating task flows for which to transfer. 
diff --git a/parsec/parsec.c b/parsec/parsec.c index 3557b15d5..0bf46e51c 100644 --- a/parsec/parsec.c +++ b/parsec/parsec.c @@ -215,6 +215,10 @@ static void __parsec_task_constructor(parsec_task_t* task) { task->selected_chore = -1; task->load = 0; task->status = PARSEC_TASK_STATUS_NONE; +#if defined(PARSEC_DEBUG_NOISIER) + /* used during task_snprintf for non-fully initialized task_t */ + memset(&task->data, 0, MAX_PARAM_COUNT * sizeof(parsec_data_pair_t)); +#endif } /* @@ -1225,7 +1229,7 @@ int parsec_fini( parsec_context_t** pcontext ) #endif /* PARSEC_PROF_TRACE */ /* PAPI SDE needs to process the shutdown before resources exposed to it are freed. - * This includes scheduling resources, so SDE needs to be finalized before the + * This includes scheduling resources, so SDE needs to be finalized before the * computation threads leave */ PARSEC_PAPI_SDE_FINI(); @@ -1552,7 +1556,7 @@ parsec_update_deps_with_counter(parsec_taskpool_t *tp, (void)origin; (void)origin_flow; (void)dest_flow; - + if( 0 == *deps ) { dep_new_value = parsec_check_IN_dependencies_with_counter(tp, task) - 1; if( parsec_atomic_cas_int32( deps, 0, dep_new_value ) == 1 ) @@ -1715,7 +1719,7 @@ parsec_release_local_OUT_dependencies(parsec_execution_stream_t* es, PARSEC_COPY_EXECUTION_CONTEXT(new_context, task); PARSEC_AYU_ADD_TASK(new_context); - PARSEC_DEBUG_VERBOSE(6, parsec_debug_output, + PARSEC_DEBUG_VERBOSE(7, parsec_debug_output, "%s becomes ready from %s on thread %d:%d, with mask 0x%04x", tmp1, parsec_task_snprintf(tmp2, MAX_TASK_STRLEN, origin), @@ -1798,7 +1802,7 @@ parsec_release_dep_fct(parsec_execution_stream_t *es, * Check that we don't forward a NULL data to someone else. This * can be done only on the src node, since the dst node can * check for datatypes without knowing the data yet. - * By checking now, we allow for the data to be created any time bfore we + * By checking now, we allow for the data to be created any time before we * actually try to transfer it. 
*/ if( PARSEC_UNLIKELY((data->data == NULL) && @@ -1839,20 +1843,20 @@ parsec_release_dep_fct(parsec_execution_stream_t *es, #ifdef PARSEC_RESHAPE_BEFORE_SEND_TO_REMOTE /* Now everything is a reshaping entry */ /* Check if we need to reshape before sending */ - if(parsec_is_CTL_dep(output->data)){ /* CTL DEP */ + if(parsec_is_CTL_dep(output->data)) { /* CTL DEP */ output->data.data_future = NULL; output->data.repo = NULL; output->data.repo_key = -1; - }else{ + } else { /* Get reshape from whatever repo it has been set up into */ output->data.data_future = (parsec_datacopy_future_t*)target_dc; output->data.repo = target_repo; output->data.repo_key = target_repo_entry->ht_item.key; PARSEC_DEBUG_VERBOSE(4, parsec_debug_output, - "th%d RESHAPE_PROMISE SETUP FOR REMOTE DEPS [%p:%p] for INLINE REMOTE %s fut %p", - es->th_id, output->data.data, (output->data.data)->dtt, - (target_repo == successor_repo? "UNFULFILLED" : "FULFILLED"), - output->data.data_future); + "th%d RESHAPE_PROMISE SETUP FOR REMOTE DEPS [%p:%p] for INLINE REMOTE %s fut %p", + es->th_id, output->data.data, (output->data.data)->dtt, + (target_repo == successor_repo? "UNFULFILLED" : "FULFILLED"), + output->data.data_future); } #endif } else { @@ -1942,10 +1946,20 @@ parsec_task_snprintf( char* str, size_t size, task->locals[i].value ); if( index >= size ) return str; } - index += snprintf(str + index, size - index, "]<%d>", task->priority ); + index += snprintf(str + index, size - index, "]<%d> keys = {", task->priority ); + if( index >= size ) return str; + for( i = 0; i < tc->nb_flows; i++ ) { + char *prefix = (i == 0) ? 
"" : ", "; + if ((NULL == task->data[i].data_in) || (NULL == task->data[i].data_in->original)) + index += snprintf(str + index, size - index, "%s*", prefix); + else + index += snprintf(str + index, size - index, "%s%lx", prefix, task->data[i].data_in->original->key); + if( index >= size ) return str; + } + index += snprintf(str + index, size - index, "}" ); if( index >= size ) return str; if( NULL != task->taskpool ) { - index += snprintf(str + index, size - index, "{%u}", task->taskpool->taskpool_id ); + index += snprintf(str + index, size - index, " {tp: %u}", task->taskpool->taskpool_id ); if( index >= size ) return str; } return str; @@ -2084,6 +2098,7 @@ int parsec_taskpool_reserve_id( parsec_taskpool_t* tp ) tp->taskpool_id = idx; assert( NOTASKPOOL == taskpool_array[idx] ); parsec_atomic_unlock( &taskpool_array_lock ); + PARSEC_DEBUG_VERBOSE(5, parsec_debug_output, "Taskpool %s received id %d", tp->taskpool_name, tp->taskpool_id); return idx; } @@ -2206,7 +2221,7 @@ int parsec_taskpool_enable(parsec_taskpool_t* tp, } if( 0 != distributed ) { - PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Register a new taskpool %p: %d with the comm engine", tp, tp->taskpool_id); + PARSEC_DEBUG_VERBOSE(10, parsec_debug_output, "Register a new taskpool %s: %d with the comm engine", tp->taskpool_name, tp->taskpool_id); (void)parsec_remote_dep_new_taskpool(tp); } return PARSEC_HOOK_RETURN_DONE; diff --git a/parsec/parsec_internal.h b/parsec/parsec_internal.h index 214d18c75..b836b67f4 100644 --- a/parsec/parsec_internal.h +++ b/parsec/parsec_internal.h @@ -201,6 +201,12 @@ PARSEC_DECLSPEC PARSEC_OBJ_CLASS_DECLARATION(parsec_taskpool_t); #define PARSEC_DEPENDENCIES_STARTUP_TASK ((parsec_dependency_t)(1<<29)) #define PARSEC_DEPENDENCIES_BITMASK (~(PARSEC_DEPENDENCIES_TASK_DONE|PARSEC_DEPENDENCIES_IN_DONE|PARSEC_DEPENDENCIES_STARTUP_TASK)) +/* Mask denoting if we can send and receive communication (e.g., MPI) directly into + * GPU buffers. 
+ */ +#define PARSEC_RUNTIME_SEND_GPU_MEMORY 0x00000002 +#define PARSEC_RUNTIME_RECV_GPU_MEMORY 0x00000001 + /** * This structure is used internally by the parsec_dependencies_t structures */ @@ -492,6 +498,12 @@ PARSEC_DECLSPEC extern int parsec_slow_bind_warning; * the scheduler, but can provide a better cache reuse. */ PARSEC_DECLSPEC extern int parsec_runtime_keep_highest_priority_task; +/** + * Global configuration mask allowing or not for the data to be sent or received, + * from or to, GPU memory. It can be an OR between PARSEC_RUNTIME_SEND_GPU_MEMORY + * and PARSEC_RUNTIME_RECV_GPU_MEMORY. + */ +PARSEC_DECLSPEC extern int parsec_mpi_allow_gpu_memory_communications; /** * Description of the state of the task. It indicates what will be the next diff --git a/parsec/parsec_mpi_funnelled.c b/parsec/parsec_mpi_funnelled.c index c2b84998b..324ef6b3b 100644 --- a/parsec/parsec_mpi_funnelled.c +++ b/parsec/parsec_mpi_funnelled.c @@ -203,6 +203,8 @@ parsec_list_t mpi_funnelled_dynamic_sendreq_fifo; /* ordered non threaded fifo */ parsec_list_t mpi_funnelled_dynamic_recvreq_fifo; /* ordered non threaded fifo */ parsec_mempool_t *mpi_funnelled_dynamic_req_mempool = NULL; +int parsec_mpi_allow_gpu_memory_communications = 3; + /* This structure is used to save all the information necessary to * invoke a callback after a MPI_Request is satisfied */ @@ -508,6 +510,15 @@ static int mpi_funneled_init_once(parsec_context_t* context) MAX_MPI_TAG, (unsigned int)MAX_MPI_TAG, MAX_MPI_TAG / MAX_DEP_OUT_COUNT); } +#if !defined(PARSEC_MPI_IS_GPU_AWARE) + parsec_mpi_allow_gpu_memory_communications = 0; +#endif + parsec_mca_param_reg_int_name("mpi", "gpu_aware", + "Enabled if PaRSEC should allow MPI to move data directly from or to GPU memory. Otherwise, all data" + " movements will transit through CPU memory, and will always have a backup copy there. 
Accepted values " + "are ORed between 1 for receiving into GPU memory and 2 for sending from GPU memory", + false, false, parsec_mpi_allow_gpu_memory_communications, &parsec_mpi_allow_gpu_memory_communications); + parsec_mca_param_reg_int_name("runtime", "comm_mpi_overtake", #if defined(PARSEC_HAVE_MPI_OVERTAKE) "Enable MPI allow overtaking of messages (if applicable). (0: no, 1: yes)", @@ -1332,6 +1343,10 @@ parsec_check_overlapping_binding(parsec_context_t *context) #endif } +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) +#include +#endif /* defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) */ + int mpi_no_thread_enable(parsec_comm_engine_t *ce) { @@ -1343,6 +1358,21 @@ mpi_no_thread_enable(parsec_comm_engine_t *ce) if(parsec_ce_mpi_comm == (MPI_Comm)context->comm_ctx) { return PARSEC_SUCCESS; } +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) + /* The communication thread need to have a CUDA context in order to be able to use + * CUDA managed memory. It should be enough to just create a context onto the first + * active device. 
+ */ + for (int dev = 0; dev < (int)parsec_nb_devices; dev++) { + parsec_device_module_t *device = parsec_mca_device_get(dev); + if (PARSEC_DEV_CUDA & device->type) { + parsec_device_gpu_module_t *gpu_dev = (parsec_device_gpu_module_t*)device; + gpu_dev->set_device(gpu_dev); + } + } +#endif /* defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) */ + + /* Finish the initialization of the communication engine */ parsec_ce.mem_register = mpi_no_thread_mem_register; parsec_ce.mem_unregister = mpi_no_thread_mem_unregister; diff --git a/parsec/parsec_reshape.c b/parsec/parsec_reshape.c index eb6045647..83aa970bf 100644 --- a/parsec/parsec_reshape.c +++ b/parsec/parsec_reshape.c @@ -50,8 +50,8 @@ void parsec_cleanup_reshape_promise(parsec_base_future_t *future) free(match_data); } if(d_fut->super.tracked_data != NULL){ - parsec_data_copy_t * data = (parsec_data_copy_t*) d_fut->super.tracked_data; - PARSEC_DATA_COPY_RELEASE(data); + parsec_data_copy_t * copy = (parsec_data_copy_t*) d_fut->super.tracked_data; + PARSEC_DATA_COPY_RELEASE(copy); } } @@ -141,7 +141,7 @@ parsec_new_reshape_promise(parsec_dep_data_description_t* data, * going to consume the original data->data in order to reshape it, and * all other successors will use directly the reshaped data instead. */ - PARSEC_OBJ_RETAIN( future_in_data->data ); + PARSEC_DATA_COPY_RETAIN( future_in_data->data ); return data_future; } @@ -331,7 +331,7 @@ parsec_create_reshape_promise(parsec_execution_stream_t *es, } #endif - /* retain the future if it's being reuse. */ + /* retain the future if it's being reused. */ if ( !new_future ) PARSEC_OBJ_RETAIN(data->data_future); /* Set up the reshape promise. */ @@ -439,7 +439,7 @@ parsec_set_up_reshape_promise(parsec_execution_stream_t *es, /* Data has been received with the expected remote type of the * successor contained on data->data->dtt. 
*/ data->local.dst_datatype = data->local.src_datatype = data->data->dtt; - }else{ + } else { /* Packed data because multiple unpacking alternatives at reception. */ const parsec_task_class_t* fct = newcontext->task_class; uint32_t flow_mask = (1U << dep->flow->flow_index) | 0x80000000; /* in flow */ @@ -579,7 +579,7 @@ parsec_get_copy_reshape_inline(parsec_execution_stream_t *es, data->data_future = (parsec_datacopy_future_t*)reshape_repo_entry->data[dep_flow_index]; } - if(data->data_future == NULL){ + if(data->data_future == NULL) { parsec_create_reshape_promise(es, data, 0, -1, -1, /* no src dst rank */ @@ -595,9 +595,6 @@ parsec_get_copy_reshape_inline(parsec_execution_stream_t *es, char task_name[MAX_TASK_STRLEN]; parsec_task_snprintf(task_name, MAX_TASK_STRLEN, task); - char type_string[MAX_TASK_STRLEN]="UNFULFILLED"; - char orig_string[MAX_TASK_STRLEN]="LOCAL INLINE"; - char type_name_src[MAX_TASK_STRLEN] = "NULL"; char type_name_dst[MAX_TASK_STRLEN] = "NULL"; char type_name_data[MAX_TASK_STRLEN] = "NULL"; @@ -608,12 +605,11 @@ parsec_get_copy_reshape_inline(parsec_execution_stream_t *es, PARSEC_DEBUG_VERBOSE(12, parsec_debug_output, "th%d RESHAPE_PROMISE CREATE %s %s [%s:%p:%p -> %p] flow_idx %u fut %p on %s(%p) k%d dtt %s -> %s [data %s]", - es->th_id, type_string, orig_string, task_name, data->data, data->data->dtt, + es->th_id, "UNFULFILLED", "LOCAL INLINE", task_name, data->data, data->data->dtt, data->local.dst_datatype, dep_flow_index, data->data_future, "CURR_REPO", setup_repo, setup_repo_key, type_name_src, type_name_dst, type_name_data); - #endif } @@ -629,7 +625,7 @@ parsec_get_copy_reshape_inline(parsec_execution_stream_t *es, es->th_id, *reshape, (*reshape)->dtt, data->data_future); /* reshape completed */ - PARSEC_OBJ_RETAIN(*reshape); + PARSEC_DATA_COPY_RETAIN(*reshape); PARSEC_OBJ_RELEASE(data->data_future); /* Clean up the old stuff on the repo used temporarily to hold * the inline reshape promise. 
@@ -642,7 +638,7 @@ parsec_get_copy_reshape_inline(parsec_execution_stream_t *es, /** * * Routine to obtain a reshaped copy matching the specifications when reading - * a tile from the datacollection. + * a tile from the data collection. * If a reshape needs to be performed, it is done using an inline reshape * promise, i.e., creating and fulfilling a local future promise (only * the current task instance is involved). Each thread accessing the same @@ -769,7 +765,7 @@ parsec_get_copy_reshape_from_dep(parsec_execution_stream_t *es, "th%d RESHAPE_PROMISE OBTAINED [%p:%p] for %s fut %p", es->th_id, *reshape, (*reshape)->dtt, task_string, data->data_future); - PARSEC_OBJ_RETAIN(*reshape); + PARSEC_DATA_COPY_RETAIN(*reshape); PARSEC_OBJ_RELEASE(data->data_future); return PARSEC_HOOK_RETURN_RESHAPE_DONE; diff --git a/parsec/remote_dep.c b/parsec/remote_dep.c index 1f6920cac..fe4b6c5ec 100644 --- a/parsec/remote_dep.c +++ b/parsec/remote_dep.c @@ -492,7 +492,7 @@ int parsec_remote_dep_activate(parsec_execution_stream_t* es, /* This assert is not correct anymore, we don't need an arena to send to a remote * assert(NULL != output->data.remote.arena);*/ assert( !parsec_is_CTL_dep(&output->data) ); - PARSEC_OBJ_RETAIN(output->data.data); + PARSEC_DATA_COPY_RETAIN(output->data.data); } for( array_index = count = 0; count < remote_deps->output[i].count_bits; array_index++ ) { diff --git a/parsec/remote_dep.h b/parsec/remote_dep.h index 931053b7d..4e5c15677 100644 --- a/parsec/remote_dep.h +++ b/parsec/remote_dep.h @@ -87,6 +87,11 @@ struct parsec_dep_data_description_s { */ parsec_datacopy_future_t *data_future; + /* If we can extract a preferred location for the incoming data set it + * here, otherwise the memory for the incoming data will be allocated + * on the main memory (device 0). 
+ */ + int32_t preferred_device; #ifdef PARSEC_RESHAPE_BEFORE_SEND_TO_REMOTE /* Keeping current repo & key to be able to consume when * the "remote" successors (aka the communication engine) diff --git a/parsec/remote_dep_mpi.c b/parsec/remote_dep_mpi.c index a06a2088c..a776fa410 100644 --- a/parsec/remote_dep_mpi.c +++ b/parsec/remote_dep_mpi.c @@ -80,6 +80,7 @@ remote_dep_cmd_to_string(remote_dep_wire_activate_t* origin, if( NULL == task.task_class ) return snprintf(str, len, "UNKNOWN_of_TASKCLASS_%d", origin->task_class_id), str; memcpy(&task.locals, origin->locals, sizeof(parsec_assignment_t) * task.task_class->nb_locals); task.priority = 0xFFFFFFFF; + for(int i = 0; i < task.task_class->nb_flows; task.data[i++].data_in = NULL); return parsec_task_snprintf(str, len, &task); } @@ -142,10 +143,6 @@ parsec_execution_stream_t parsec_comm_es = { static void remote_dep_mpi_put_start(parsec_execution_stream_t* es, dep_cmd_item_t* item); static void remote_dep_mpi_get_start(parsec_execution_stream_t* es, parsec_remote_deps_t* deps); -static void remote_dep_mpi_get_end(parsec_execution_stream_t* es, - int idx, - parsec_remote_deps_t* deps); - static int remote_dep_mpi_get_end_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, @@ -565,25 +562,34 @@ void parsec_remote_dep_memcpy(parsec_execution_stream_t* es, item->cmd.memcpy.destination = dst; item->cmd.memcpy.layout = data->local; - PARSEC_OBJ_RETAIN(src); + PARSEC_DATA_COPY_RETAIN(src); remote_dep_inc_flying_messages(tp); parsec_dequeue_push_back(&dep_cmd_queue, (parsec_list_item_t*) item); } static inline parsec_data_copy_t* -remote_dep_copy_allocate(parsec_dep_type_description_t* data) +remote_dep_copy_allocate(parsec_dep_type_description_t* data, int preferred_device) { parsec_data_copy_t* dc; if( NULL == data->arena ) { assert(0 == data->dst_count); return NULL; } - dc = parsec_arena_get_copy(data->arena, data->dst_count, 0, data->dst_datatype); - - dc->coherency_state = PARSEC_DATA_COHERENCY_EXCLUSIVE; - 
PARSEC_DEBUG_VERBOSE(20, parsec_comm_output_stream, "MPI:\tMalloc new remote tile %p size %" PRIu64 " count = %" PRIu64 " displ = %" PRIi64 " %p", - dc, data->arena->elem_size, data->dst_count, data->dst_displ, data->arena); + /* Go and allocate on the preferred device. If that fails, fall back and allocate a + * copy on the main memory (device 0), and parsec will transfer the data as needed + * for all tasks executing on accelerators. + */ + dc = parsec_arena_get_new_copy(data->arena, data->dst_count, preferred_device, data->dst_datatype); + PARSEC_DATA_COPY_RETAIN(dc); + /* don't use preferred_device, it might not be the location where the data copy resides */ + parsec_data_start_transfer_ownership_to_copy(dc->original, dc->device_index, PARSEC_FLOW_ACCESS_WRITE); + if (dc->device_index != preferred_device) { + PARSEC_DEBUG_VERBOSE(5, parsec_comm_output_stream, "MPI:\tFail to allocate tile on device %d and instead allocate on device %d\n", + preferred_device, dc->device_index); + } + PARSEC_DEBUG_VERBOSE(5, parsec_comm_output_stream, "MPI:\tMalloc new temporary tile [dev %d] copy %p size %" PRIu64 " count = %" PRIu64 " displ = %" PRIi64 " %p", + dc->device_index, dc, data->arena->elem_size, data->dst_count, data->dst_displ, data->arena); return dc; } @@ -598,7 +604,7 @@ static inline parsec_data_copy_t* reshape_copy_allocate(parsec_dep_type_description_t* data) { parsec_data_copy_t* dc; - dc = remote_dep_copy_allocate(data); + dc = remote_dep_copy_allocate(data, 0 /* default device */); parsec_data_start_transfer_ownership_to_copy(dc->original, 0, @@ -608,7 +614,7 @@ reshape_copy_allocate(parsec_dep_type_description_t* data) /** * - * Routine to fulfilled a reshape promise by the current thread + * Fulfill a reshape promise by the current thread * (when MPI_THREAD_MULTIPLE) or delegate the reshaping to the communication * thread. * Routine set as callback when initializing a future. @@ -657,9 +663,9 @@ void parsec_local_reshape_cb(parsec_base_future_t *future, ... 
) } if(src_pack_size != dst_pack_size){ parsec_warning("parsec_local_reshape: reshape requested between dtt with different packed size fut %p dtt [%p:%s = sz(%d) -> %p:%s= sz(%d)]", - future, - dt->local->src_datatype, type_name_src, src_pack_size, - dt->local->dst_datatype, type_name_dst, dst_pack_size); + future, + dt->local->src_datatype, type_name_src, src_pack_size, + dt->local->dst_datatype, type_name_dst, dst_pack_size); } #endif @@ -779,13 +785,19 @@ remote_dep_mpi_retrieve_datatype(parsec_execution_stream_t *eu, return PARSEC_ITERATE_STOP; } if(old_dtt != PARSEC_DATATYPE_NULL) { + /* multiple input deps from the same predecessor exist. Be careful on what format + * to receive the data. It would not make sense to receive different amounts, but + * it is legal to receive them with different type signatures. In this case, ignore + * the datatype, and instead fall back into a packed format (aka bytes) and the + * entire length of the incoming data. + */ if(old_dtt != output->data.remote.dst_datatype) { #if defined(PARSEC_DEBUG_NOISIER) char type_name_src[MAX_TASK_STRLEN] = "NULL"; char type_name_dst[MAX_TASK_STRLEN] = "NULL"; int len; - if(old_dtt!=PARSEC_DATATYPE_NULL) MPI_Type_get_name(old_dtt, type_name_src, &len); - if(output->data.remote.dst_datatype!=PARSEC_DATATYPE_NULL) MPI_Type_get_name(output->data.remote.dst_datatype, type_name_dst, &len); + if(old_dtt != PARSEC_DATATYPE_NULL) MPI_Type_get_name(old_dtt, type_name_src, &len); + if(output->data.remote.dst_datatype != PARSEC_DATATYPE_NULL) MPI_Type_get_name(output->data.remote.dst_datatype, type_name_dst, &len); PARSEC_DEBUG_VERBOSE(30, parsec_comm_output_stream, "MPI: retrieve dtt for %s [dep_datatype_index %x] DTT: old %s new %s (%p) --> PACKED", newcontext->task_class->name, dep->dep_datatype_index, type_name_src, type_name_dst, output->data.remote.dst_datatype); #endif @@ -793,7 +805,6 @@ remote_dep_mpi_retrieve_datatype(parsec_execution_stream_t *eu, parsec_ce.pack_size(&parsec_ce, 
output->data.remote.dst_count, output->data.remote.dst_datatype, &dsize); output->data.remote.src_count = output->data.remote.dst_count = dsize; output->data.remote.src_datatype = output->data.remote.dst_datatype = PARSEC_DATATYPE_PACKED; - return PARSEC_ITERATE_STOP; } } @@ -808,6 +819,20 @@ remote_dep_mpi_retrieve_datatype(parsec_execution_stream_t *eu, newcontext->task_class->name, dep->dep_datatype_index, type_name_src, type_name_dst, output->data.remote.dst_datatype); } #endif + /* Predict where the incoming temporary should be located, by using the data_affinity. + * This only works if the task affinity is linked to the output location of the task, which + * is mostly true for owner-compute type of algorithms. + */ + output->data.preferred_device = 0; /* the default is CPU memory (aka. device 0) */ + if (NULL != fct->data_affinity ) { + parsec_data_ref_t dref; + fct->data_affinity(newcontext, &dref); + if(NULL != dref.dc->data_of_key) { + parsec_data_t* data = dref.dc->data_of_key(dref.dc, dref.key); + output->data.preferred_device = (-1 != data->preferred_device) ? 
+ data->preferred_device : data->owner_device; + } + } return PARSEC_ITERATE_CONTINUE; } @@ -989,6 +1014,12 @@ remote_dep_release_incoming(parsec_execution_stream_t* es, task.data[target->flow_index].source_repo_entry = NULL; task.data[target->flow_index].data_in = origin->output[i].data.data; task.data[target->flow_index].data_out = origin->output[i].data.data; + if( NULL != origin->output[i].data.data ) { /* nothing for control flows */ + /* The data has been fully received, mark the copy accordingly */ + task.data[target->flow_index].data_in->coherency_state = PARSEC_DATA_COHERENCY_OWNED; + task.data[target->flow_index].data_in->flags &= ~PARSEC_DATA_FLAG_TRANSIT; /* not in transit anymore */ + task.data[target->flow_index].data_in->data_transfer_status = PARSEC_DATA_STATUS_COMPLETE_TRANSFER; + } } #ifdef PARSEC_DIST_COLLECTIVES @@ -1436,7 +1467,7 @@ static int local_dep_nothread_reshape(parsec_execution_stream_t* es, * once all successors have consumed the future, in case it is needed * as an input for nested futures. 
*/ - PARSEC_OBJ_RETAIN(cmd->memcpy.source); + PARSEC_DATA_COPY_RETAIN(cmd->memcpy.source); int rc = remote_dep_nothread_memcpy(es, item); assert(MPI_SUCCESS == rc); @@ -1649,7 +1680,7 @@ remote_dep_mpi_put_start(parsec_execution_stream_t* es, deps->output[k].data.data = reshape_data; - PARSEC_OBJ_RETAIN(reshape_data); + PARSEC_DATA_COPY_RETAIN(reshape_data); PARSEC_DATA_COPY_RELEASE(old_data);/*old data has been retained for remote communication*/ PARSEC_OBJ_RELEASE(deps->output[k].data.data_future); @@ -1854,7 +1885,8 @@ static void remote_dep_mpi_recv_activate(parsec_execution_stream_t* es, if((length - (*position)) >= (int)data_sizes[ds_idx]) { assert(NULL == data_desc->data); /* we do not support in-place tiles now, make sure it doesn't happen yet */ if(NULL == data_desc->data) { - data_desc->data = remote_dep_copy_allocate(type_desc); + /* if we have to unpack the data onto this new copy we might want to allocated it on the CPU */ + data_desc->data = remote_dep_copy_allocate(type_desc, 0); } #ifndef PARSEC_PROF_DRY_DEP PARSEC_DEBUG_VERBOSE(10, parsec_comm_output_stream, @@ -2080,9 +2112,13 @@ static void remote_dep_mpi_get_start(parsec_execution_stream_t* es, /* prepare the local receiving data */ assert(NULL == deps->output[k].data.data); /* we do not support in-place tiles now, make sure it doesn't happen yet */ if(NULL == deps->output[k].data.data) { - deps->output[k].data.data = remote_dep_copy_allocate(&deps->output[k].data.remote); + int best_device = (parsec_mpi_allow_gpu_memory_communications & PARSEC_RUNTIME_RECV_GPU_MEMORY) ? 
deps->output[k].data.preferred_device : 0; + deps->output[k].data.data = remote_dep_copy_allocate(&deps->output[k].data.remote, best_device); } - dtt = deps->output[k].data.remote.dst_datatype; + /* Mark the data under transfer */ + deps->output[k].data.data->data_transfer_status = PARSEC_DATA_STATUS_UNDER_TRANSFER; + deps->output[k].data.data->flags |= PARSEC_DATA_FLAG_TRANSIT; + dtt = deps->output[k].data.remote.dst_datatype; nbdtt = deps->output[k].data.remote.dst_count; /* We have the remote mem_handle. @@ -2159,14 +2195,6 @@ static void remote_dep_mpi_get_start(parsec_execution_stream_t* es, } } -static void remote_dep_mpi_get_end(parsec_execution_stream_t* es, - int idx, - parsec_remote_deps_t* deps) -{ - /* The ref on the data will be released below */ - remote_dep_release_incoming(es, deps, (1U<es_profile, MPI_Data_pldr_ek, callback_data->event_id); #endif /* PARSEC_PROF_TRACE */ - remote_dep_mpi_get_end(es, callback_data->k, deps); + remote_dep_release_incoming(es, deps, (1U << callback_data->k)); parsec_ce.mem_unregister(&callback_data->memory_handle); parsec_thread_mempool_free(parsec_remote_dep_cb_data_mempool->thread_mempools, callback_data); @@ -2239,6 +2267,7 @@ int remote_dep_ce_reconfigure(parsec_context_t* context) * execution stream to parsec_comm_es. */ parsec_set_my_execution_stream(&parsec_comm_es); } + return PARSEC_SUCCESS; } diff --git a/parsec/scheduling.c b/parsec/scheduling.c index 55657cc36..24fc0126f 100644 --- a/parsec/scheduling.c +++ b/parsec/scheduling.c @@ -140,11 +140,43 @@ int __parsec_execute( parsec_execution_stream_t* es, * If we run get_best_device, the caller core is available to run a task, so directly using time_estimate with a 0 base is accurate. */ parsec_atomic_fetch_add_int64(&task->selected_device->device_load, task->load); } + else { + /* For every input flow, check if we received a GPU copy, and revert to the + * CPU copy since this is a CPU hook. 
Note: at this point copy versions + * should be synchronized already from a pushout. */ + for( int i = 0; i < task->task_class->nb_flows; i++ ) { + /* Ignore output-only flows */ + if( NULL == task->task_class->in[i] ) continue; + /* Make sure data_in is not NULL */ + if( NULL == task->data[i].data_in ) continue; + if( PARSEC_FLOW_ACCESS_NONE == (PARSEC_FLOW_ACCESS_MASK & task->task_class->in[i]->flow_flags) ) continue; /* control flow */ + + parsec_data_copy_t* copy = task->data[i].data_in; + if(parsec_mca_device_is_gpu(copy->device_index)) { + assert(copy->coherency_state == PARSEC_DATA_COHERENCY_SHARED); + assert(NULL != copy->original && NULL != copy->original->device_copies[0]); + assert(copy->version == copy->original->device_copies[0]->version); + task->data[i].data_in = copy->original->device_copies[0]; + PARSEC_DATA_COPY_RETAIN(task->data[i].data_in); + PARSEC_DATA_COPY_RELEASE(copy); + } + } + } - PARSEC_DEBUG_VERBOSE(5, parsec_debug_output, "Thread %d of VP %d Execute %s chore %d device %d:%s", - es->th_id, es->virtual_process->vp_id, - tmp, task->selected_chore, - task->selected_device->device_index, task->selected_device->name); +#if defined(PARSEC_DEBUG_NOISIER) + char tmp2[4096]; int len = 0; + len += snprintf(&tmp2[len], 4096-len, "Thread %2d:%d\tExecute %s\tchore[%d] dev %d:%s", + es->th_id, es->virtual_process->vp_id, + tmp, task->selected_chore, + task->selected_device->device_index, task->selected_device->name); + for( int i = 0; parsec_debug_verbose > 5 && i < task->task_class->nb_flows; i++ ) { + parsec_data_copy_t* copy = task->data[i].data_in; + if( NULL == copy ) continue; /* Make sure data_in is not NULL */ + if( PARSEC_FLOW_ACCESS_NONE == (PARSEC_FLOW_ACCESS_MASK & task->task_class->in[i]->flow_flags) ) continue; /* control flow */ + len += snprintf(&tmp2[len], 4096-len, "\n\t Data[%d] key %lx\ton dev:%d version:%d readers:%d ptr:%p", i, copy->original->key, copy->device_index, copy->version, copy->readers, copy->device_private); + } + 
PARSEC_DEBUG_VERBOSE(5, parsec_debug_output, "%s", tmp2); +#endif parsec_hook_t *hook = tc->incarnations[task->selected_chore].hook; assert( NULL != hook ); diff --git a/parsec/utils/debug.h b/parsec/utils/debug.h index 6f3e53231..80290ae8b 100644 --- a/parsec/utils/debug.h +++ b/parsec/utils/debug.h @@ -160,7 +160,7 @@ extern void (*parsec_weaksym_exit)(int status); #else /* defined(PARSEC_DEBUG_NOISIER) */ #define PARSEC_DEBUG_VERBOSE(...) do{} while(0) -#endif /* defined(PARSEC_DEBUG_VERBOSE) */ +#endif /* defined(PARSEC_DEBUG_NOISIER) */ /** $brief To check if any parsec function returned error. */ diff --git a/tests/apps/haar_tree/project_dyn.jdf b/tests/apps/haar_tree/project_dyn.jdf index a34a9cf80..878b74489 100644 --- a/tests/apps/haar_tree/project_dyn.jdf +++ b/tests/apps/haar_tree/project_dyn.jdf @@ -135,7 +135,7 @@ static int my_project_dyn_startup(parsec_execution_stream_t * es, __parsec_proje new_task->data._f_RL.data_out = NULL; new_task->data._f_NODE.source_repo = NULL; new_task->data._f_NODE.source_repo_entry = NULL; - chunk = parsec_arena_get_copy(__tp->super.arenas_datatypes[PARSEC_project_dyn_DEFAULT_ADT_IDX].arena, 1, 0, __tp->super.arenas_datatypes[PARSEC_project_dyn_DEFAULT_ADT_IDX].opaque_dtt); + chunk = parsec_arena_get_new_copy(__tp->super.arenas_datatypes[PARSEC_project_dyn_DEFAULT_ADT_IDX].arena, 1, 0, __tp->super.arenas_datatypes[PARSEC_project_dyn_DEFAULT_ADT_IDX].opaque_dtt); chunk->original->owner_device = 0; new_task->data._f_NODE.data_out = chunk; new_task->data._f_NODE.data_in = chunk; diff --git a/tests/runtime/cuda/CMakeLists.txt b/tests/runtime/cuda/CMakeLists.txt index fbb5a5022..fdfbf9f89 100644 --- a/tests/runtime/cuda/CMakeLists.txt +++ b/tests/runtime/cuda/CMakeLists.txt @@ -22,7 +22,12 @@ if(PARSEC_HAVE_CUDA) endif( NOT TARGET CUDA::cublas ) # Testing for geting best device - parsec_addtest_executable(C testing_get_best_device SOURCES "testing_get_best_device.c") + parsec_addtest_executable(C testing_get_best_device 
SOURCES testing_get_best_device.c) target_include_directories(testing_get_best_device PRIVATE $<$:${CMAKE_CURRENT_SOURCE_DIR}>) target_ptg_sources(testing_get_best_device PRIVATE "get_best_device_check.jdf") + + # Testing for communications to and from GPU + parsec_addtest_executable(C cuda_rtt SOURCES rtt.c rtt_main.c) + target_include_directories(cuda_rtt PRIVATE $<$:${CMAKE_CURRENT_SOURCE_DIR}>) + target_ptg_sources(cuda_rtt PRIVATE "rtt.jdf") endif(PARSEC_HAVE_CUDA) diff --git a/tests/runtime/cuda/rtt.jdf b/tests/runtime/cuda/rtt.jdf new file mode 100644 index 000000000..a7b3a6cdd --- /dev/null +++ b/tests/runtime/cuda/rtt.jdf @@ -0,0 +1,26 @@ +extern "C" %{ + /* This simple example does not need to include anything */ +%} + +%option no_taskpool_instance = true /* can be anything */ + +NT +WS + +PING(k) + +k = 0 .. NT-1 +: A(0, k % WS) + +RW T <- (k == 0) ? A(0, k % WS) : T PING(k-1) + -> (k < NT) ? T PING(k+1) + +; 0 + +BODY [type = CUDA] + printf("ping(%d)\n", k); + /* + int r; MPI_Comm_rank(MPI_COMM_WORLD, &r); + printf("%d: PING(%d)\n", r, k); + */ +END diff --git a/tests/runtime/cuda/rtt_main.c b/tests/runtime/cuda/rtt_main.c new file mode 100644 index 000000000..0d2f5ae41 --- /dev/null +++ b/tests/runtime/cuda/rtt_main.c @@ -0,0 +1,236 @@ +/** + * Copyright (c) 2019-2024 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. 
+ */ + +#include "parsec.h" +#include "parsec/data_distribution.h" +#include "parsec/data_dist/matrix/matrix.h" +#include "parsec/data_dist/matrix/two_dim_rectangle_cyclic.h" +#include "parsec/include/parsec/execution_stream.h" +#include "parsec/utils/mca_param.h" + +#include "rtt.h" + +#if defined(DISTRIBUTED) +#include +#endif + #include + +static int nb_gpus = 1, gpu_mask = 0xff; +static int cuda_device_index_len = 0, *cuda_device_index = NULL; + +/** + * @brief init operator + * + * @param [in] es: execution stream + * @param [in] descA: tiled matrix date descriptor + * @param [inout] A: inout data + * @param [in] uplo: matrix shape + * @param [in] m: tile row index + * @param [in] n: tile column index + * @param [in] args: NULL + */ +static int matrix_init_ops(parsec_execution_stream_t *es, + const parsec_tiled_matrix_t *descA, + void *_A, parsec_matrix_uplo_t uplo, + int m, int n, void *args) +{ + memset(_A, 1, m*n); + + /* Address warning when compile */ +#if 1 + parsec_data_key_t key = descA->super.data_key((parsec_data_collection_t*)descA, m, n); + parsec_data_t* data = descA->super.data_of_key((parsec_data_collection_t*)descA, key); + parsec_advise_data_on_device(data, + cuda_device_index[m % cuda_device_index_len], + PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE); +#endif + (void)es; (void)uplo;(void)n;(void)m;(void)args; + return 0; +} + +static void +__parsec_rtt_destructor(parsec_rtt_taskpool_t *rtt_tp) +{ + parsec_type_free(&(rtt_tp->arenas_datatypes[PARSEC_rtt_DEFAULT_ADT_IDX].opaque_dtt)); +} + +PARSEC_OBJ_CLASS_INSTANCE(parsec_rtt_taskpool_t, parsec_taskpool_t, + NULL, __parsec_rtt_destructor); + +parsec_taskpool_t *rtt_New(parsec_context_t *ctx, parsec_matrix_block_cyclic_t *dcA, + parsec_datatype_t block, int roundtrips) +{ + parsec_rtt_taskpool_t *tp = NULL; + + tp = parsec_rtt_new((parsec_data_collection_t*)dcA, roundtrips, ctx->nb_nodes); + tp->arenas_datatypes[PARSEC_rtt_DEFAULT_ADT_IDX].opaque_dtt = block; + + ptrdiff_t lb, extent; + 
parsec_type_extent(block, &lb, &extent); + + parsec_arena_datatype_construct(&tp->arenas_datatypes[PARSEC_rtt_DEFAULT_ADT_IDX], + extent, PARSEC_ARENA_ALIGNMENT_SSE, + block); + return (parsec_taskpool_t *)tp; +} + +int main(int argc, char *argv[]) +{ + parsec_context_t *parsec = NULL; + parsec_taskpool_t *tp; + int size = 1, rank = 0, loops = 100, frags = 1, nb_runs = 1, cores = 2, do_sleep = 0, ch, use_opt = 1; + struct timeval tstart, tend; + size_t msg_size = 8*1024; + double t, bw; + + while ((ch = getopt(argc, argv, "c:g:G:l:f:m:n:s:")) != -1) { + switch (ch) { + case 'c': cores = atoi(optarg); use_opt += 2; break; + case 'g': nb_gpus = atoi(optarg); use_opt += 2; break; + case 'G': gpu_mask = atoi(optarg); use_opt += 2; break; + case 'l': loops = atoi(optarg); use_opt += 2; break; + case 'f': frags = atoi(optarg); use_opt += 2; break; + case 'm': msg_size = (size_t)atoi(optarg); use_opt += 2; break; + case 'n': nb_runs = atoi(optarg); use_opt += 2; break; + case 's': do_sleep = atoi(optarg); use_opt += 2; break; + default: + fprintf(stderr, + "-c : number of cores to use (default 2)\n" + "-g : number of GPU to use (default 1)\n" + "-G : GPU mask to use (-1 to modulo rank per node)\n" + "-l : loops of bandwidth(default: 100)\n" + "-f : frags, number of fragments (default: 1)\n" + "-m : size, size of message (default: 1024 * 8)\n" + "-n : number of runs (default: 1)\n" + "-s : number of seconds to sleep before running the tests\n" + "\n"); + exit(1); + } + } + /* Remove all options already acknowledged */ + if( NULL == argv[optind] ) { + argc = 1; + } else { + memcpy(&argv[1], &argv[use_opt+1], (argc - use_opt) * sizeof(char*)); + argc -= use_opt; + } + argv[argc] = NULL; +#if defined(DISTRIBUTED) + { + int provided; + MPI_Init_thread(NULL, NULL, MPI_THREAD_SERIALIZED, &provided); + } + MPI_Comm_size(MPI_COMM_WORLD, &size); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); +#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) + if (0xFF == gpu_mask) { + extern char **environ; + 
MPI_Comm local_comm; + int local_rank, local_size; + MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, + MPI_INFO_NULL, &local_comm); + MPI_Comm_rank(local_comm, &local_rank); + MPI_Comm_size(local_comm, &local_size); + MPI_Comm_free(&local_comm); + int gpu_mask = 0; + for (int i = 0; i < nb_gpus; i++) { + gpu_mask |= ((1 << local_rank) << i); + } + char *value; + asprintf(&value, "%d", gpu_mask); + parsec_setenv_mca_param("device_cuda_mask", value, &environ); + free(value); + value = NULL; + } +#endif +#endif /* DISTRIBUTED */ + if( 0 == rank ) { + printf("Running %d tests of %d steps RTT with a data of size %zu\n", + nb_runs, loops, msg_size); + } + parsec = parsec_init(cores, &argc, &argv); + + /* can the test run? */ + nb_gpus = parsec_context_query(parsec, PARSEC_CONTEXT_QUERY_DEVICES, PARSEC_DEV_CUDA); + assert(nb_gpus >= 0); + if(nb_gpus == 0) { + parsec_warning("This test can only run if at least one GPU device is present"); + exit(-PARSEC_ERR_DEVICE); + } + cuda_device_index = (int *)malloc(parsec_nb_devices * sizeof(int)); + cuda_device_index_len = 0; + for (int dev = 0; dev < (int)parsec_nb_devices; dev++) { + parsec_device_module_t *device = parsec_mca_device_get(dev); + if (PARSEC_DEV_CUDA & device->type) { + cuda_device_index[cuda_device_index_len++] = device->device_index; + } + } + + parsec_datatype_t block; + size_t mb = sqrt(msg_size), nb = msg_size / mb; + if (mb <= 0) { + fprintf(stderr, "To work, RTT must do at least one round time trip of at least one byte\n"); + exit(-1); + } + + parsec_matrix_block_cyclic_t* dcA = (parsec_matrix_block_cyclic_t *)calloc(1, sizeof(parsec_matrix_block_cyclic_t)); + parsec_matrix_block_cyclic_init(dcA, PARSEC_MATRIX_BYTE, PARSEC_MATRIX_TILE, + parsec->my_rank, + mb, nb, + mb, parsec->nb_nodes * nb, + 0, 0, + mb, parsec->nb_nodes * nb, + 1, parsec->nb_nodes, 1, 1, + 0, 0); + dcA->mat = parsec_data_allocate((size_t)dcA->super.nb_local_tiles * + (size_t)dcA->super.bsiz * + 
(size_t)parsec_datadist_getsizeoftype(dcA->super.mtype)); + parsec_data_collection_set_key((parsec_data_collection_t *)dcA, "A"); + + parsec_type_create_contiguous(mb * nb, parsec_datatype_uint8_t, &block); + + /* Initialize and place the dcA */ + parsec_apply(parsec, PARSEC_MATRIX_FULL, + (parsec_tiled_matrix_t *)dcA, + (parsec_tiled_matrix_unary_op_t)matrix_init_ops, NULL); + + + if( do_sleep ) { + sleep(do_sleep); + } +#if defined(PARSEC_HAVE_MPI) + MPI_Barrier(MPI_COMM_WORLD); +#endif /* defined(PARSEC_HAVE_MPI) */ + gettimeofday(&tstart, NULL); + for( int test_id = 0; test_id < nb_runs; test_id++ ) { + tp = rtt_New(parsec, dcA, block, loops); + if( NULL != tp ) { + parsec_context_add_taskpool(parsec, tp); + parsec_context_start(parsec); + parsec_context_wait(parsec); + parsec_taskpool_free(tp); + } + } +#if defined(PARSEC_HAVE_MPI) + MPI_Barrier(MPI_COMM_WORLD); +#endif /* defined(PARSEC_HAVE_MPI) */ + gettimeofday(&tend, NULL); + + if( 0 == rank ) { + t = ((tend.tv_sec - tstart.tv_sec) * 1000000.0 + (tend.tv_usec - tstart.tv_usec)) / 1000000.0; /* in seconds */ + double total_payload = (double)nb_runs * (double)loops * (double)msg_size / 1024.0 / 1024.0 / 1024.0; + bw = total_payload / t; + printf("%d\t%d\t%d\t%zu\t%08.4g s\t%4.8g GB/s\n", nb_runs, frags, loops, msg_size*sizeof(uint8_t), t, bw); + } + + free(cuda_device_index); cuda_device_index = NULL; + cuda_device_index_len = 0; + parsec_fini(&parsec); +#if defined(DISTRIBUTED) + MPI_Finalize(); +#endif /* DISTRIBUTED */ + return 0; +}