Introduce LCI_MAX_SINGLE_MESSAGE_SIZE: the largest payload posted as one
low-level RDMA write. Long/iovec rendezvous buffers above this size are split
into several writes. The value is read from the environment (build-time
default LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT) and lowered by the ibv/ofi
backends. add_lci_test(s) gain an ENVIRONMENT keyword; lcit.bigmsg tests added.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6da49c4d..09b5dbd6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -206,6 +206,10 @@ if(NOT LCI_WITH_LCT_ONLY)
       CACHE STRING "The default rendezvous protocol to use (write, writeimm).")
   set_property(CACHE LCI_RDV_PROTOCOL_DEFAULT PROPERTY STRINGS write writeimm)
 
+  set(LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT
+      2000000000
+      CACHE STRING "Default single low-level message max size")
+
   mark_as_advanced(
     LCI_CONFIG_USE_ALIGNED_ALLOC
     LCI_PACKET_SIZE_DEFAULT
@@ -214,7 +218,8 @@ if(NOT LCI_WITH_LCT_ONLY)
     LCI_SERVER_MAX_CQES_DEFAULT
     LCI_SERVER_NUM_PKTS_DEFAULT
     LCI_CACHE_LINE
-    LCI_RDV_PROTOCOL_DEFAULT)
+    LCI_RDV_PROTOCOL_DEFAULT
+    LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT)
 
   # ############################################################################
   # LCI Testing related options
diff --git a/cmake_modules/AddLCI.cmake b/cmake_modules/AddLCI.cmake
index e45e1293..f933c0d9 100644
--- a/cmake_modules/AddLCI.cmake
+++ b/cmake_modules/AddLCI.cmake
@@ -20,8 +20,8 @@ function(add_lci_executable name)
 endfunction()
 
 function(add_lci_test name)
-  cmake_parse_arguments(ARG "" "" "COMMANDS;LABELS;SOURCES;DEPENDENCIES"
-                        ${ARGN})
+  cmake_parse_arguments(
+    ARG "" "" "COMMANDS;LABELS;SOURCES;DEPENDENCIES;ENVIRONMENT" ${ARGN})
 
   add_lci_executable(${name} ${ARG_SOURCES})
   target_include_directories(${name} PRIVATE ${CMAKE_SOURCE_DIR}/src/include)
@@ -47,11 +47,15 @@ function(add_lci_test name)
     string(REPLACE " " ";" TEST_COMMAND ${TEST_COMMAND})
     add_test(NAME ${test_name} COMMAND ${TEST_COMMAND})
     set_property(TEST ${test_name} PROPERTY LABELS ${ARG_LABELS})
+    if(ARG_ENVIRONMENT)
+      set_property(TEST ${test_name} PROPERTY ENVIRONMENT ${ARG_ENVIRONMENT})
+    endif()
   endwhile()
 endfunction()
 
 function(add_lci_tests)
-  cmake_parse_arguments(ARG "" "" "COMMANDS;LABELS;TESTS;DEPENDENCIES" ${ARGN})
+  cmake_parse_arguments(
+    ARG "" "" "COMMANDS;LABELS;TESTS;DEPENDENCIES;ENVIRONMENT" ${ARGN})
   foreach(name ${ARG_TESTS})
     string(REGEX REPLACE "\\.[^.]*$" "" name_without_ext ${name})
     add_lci_test(
@@ -63,6 +67,8 @@ function(add_lci_tests)
       COMMANDS
       ${ARG_COMMANDS}
       DEPENDENCIES
-      ${ARG_DEPENDENCIES})
+      ${ARG_DEPENDENCIES}
+      ENVIRONMENT
+      ${ARG_ENVIRONMENT})
   endforeach()
 endfunction()
diff --git a/lci/api/lci.h b/lci/api/lci.h
index b927fbf6..e0584130 100644
--- a/lci/api/lci.h
+++ b/lci/api/lci.h
@@ -438,6 +438,13 @@ extern int LCI_MEDIUM_SIZE;
  */
 extern int LCI_IOVEC_SIZE;
 
+/**
+ * @ingroup LCI_COMM
+ * @brief The maximum size (in bytes) of a buffer that can be used in long
+ * messages without being broken.
+ */
+extern size_t LCI_MAX_SINGLE_MESSAGE_SIZE;
+
 /**
  * @ingroup LCI_SETUP
  * @brief Initial number of entries in a default matching table.
diff --git a/lci/api/lci_config.h.in b/lci/api/lci_config.h.in
index a1884c23..ad3c6c23 100644
--- a/lci/api/lci_config.h.in
+++ b/lci/api/lci_config.h.in
@@ -40,6 +40,7 @@
 #cmakedefine01 LCI_IBV_ENABLE_TD_DEFAULT
 #cmakedefine01 LCI_ENABLE_PRG_NET_ENDPOINT_DEFAULT
 #define LCI_RDV_PROTOCOL_DEFAULT "@LCI_RDV_PROTOCOL_DEFAULT@"
+#define LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT @LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT@
 
 #define LCI_CQ_MAX_POLL 16
 #define LCI_SERVER_MAX_ENDPOINTS 8
diff --git a/lci/backend/ibv/server_ibv.c b/lci/backend/ibv/server_ibv.c
index bbac265f..edf33680 100644
--- a/lci/backend/ibv/server_ibv.c
+++ b/lci/backend/ibv/server_ibv.c
@@ -76,6 +76,16 @@ void LCISI_event_polling_thread_fina(LCISI_server_t* server)
 
 void LCISD_server_init(LCIS_server_t* s)
 {
+  // Check configurations
+  if (LCI_MAX_SINGLE_MESSAGE_SIZE >= ((size_t)1 << 31)) {
+    // ibverbs' max message size is assumed to be 2 GiB (2^31 bytes).
+    LCI_MAX_SINGLE_MESSAGE_SIZE = ((size_t)1 << 31) - 1;
+    LCI_Warn(
+        "Reduce LCI_MAX_SINGLE_MESSAGE_SIZE to %lu "
+        "as required by libibverbs max message size\n",
+        LCI_MAX_SINGLE_MESSAGE_SIZE);
+  }
+
   LCISI_server_t* server = LCIU_malloc(sizeof(LCISI_server_t));
   *s = (LCIS_server_t)server;
 
diff --git a/lci/backend/ofi/server_ofi.c b/lci/backend/ofi/server_ofi.c
index ff89b093..eb7dfa89 100644
--- a/lci/backend/ofi/server_ofi.c
+++ b/lci/backend/ofi/server_ofi.c
@@ -99,6 +99,13 @@ void LCISD_server_init(LCIS_server_t* s)
              "inject_size (%lu) < sizeof(LCI_short_t) (%lu)!\n",
              server->info->tx_attr->inject_size, sizeof(LCI_short_t));
   fi_freeinfo(hints);
+  if (server->info->ep_attr->max_msg_size < LCI_MAX_SINGLE_MESSAGE_SIZE) {
+    LCI_MAX_SINGLE_MESSAGE_SIZE = server->info->ep_attr->max_msg_size;
+    LCI_Warn(
+        "Reduce LCI_MAX_SINGLE_MESSAGE_SIZE to %lu "
+        "as required by the libfabric max_msg_size attribute\n",
+        LCI_MAX_SINGLE_MESSAGE_SIZE);
+  }
   if (strcmp(server->info->fabric_attr->prov_name, "cxi") == 0) {
     LCI_Assert(LCI_USE_DREG == 0,
                "The registration cache should be turned off "
diff --git a/lci/runtime/env.c b/lci/runtime/env.c
index 4de6506a..992fa037 100644
--- a/lci/runtime/env.c
+++ b/lci/runtime/env.c
@@ -7,6 +7,7 @@ LCI_API int LCI_MAX_ENDPOINTS;
 LCI_API int LCI_MAX_TAG = (1u << 16) - 1;
 LCI_API int LCI_MEDIUM_SIZE = -1;
 LCI_API int LCI_IOVEC_SIZE = -1;
+LCI_API size_t LCI_MAX_SINGLE_MESSAGE_SIZE;
 LCI_API int LCI_DEFAULT_QUEUE_LENGTH;
 LCI_API int LCI_MAX_QUEUE_LENGTH;
 LCI_API int LCI_MAX_SYNC_LENGTH = INT_MAX;
@@ -100,6 +101,8 @@ void LCII_env_init(int num_proc, int rank)
                (LCI_PACKET_SIZE - sizeof(struct LCII_packet_context) -
                 sizeof(struct LCII_packet_rtr_t)) /
                    sizeof(struct LCII_packet_rtr_rbuffer_info_t));
+  LCI_MAX_SINGLE_MESSAGE_SIZE = LCIU_getenv_or(
+      "LCI_MAX_SINGLE_MESSAGE_SIZE", LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT);
   LCI_OFI_CXI_TRY_NO_HACK = LCIU_getenv_or("LCI_OFI_CXI_TRY_NO_HACK", false);
   {
     // default value
diff --git a/lci/runtime/rendezvous.h b/lci/runtime/rendezvous.h
index aa9149be..ecbceb9d 100644
--- a/lci/runtime/rendezvous.h
+++ b/lci/runtime/rendezvous.h
@@ -158,6 +158,30 @@ static void LCII_env_init_rdv_protocol()
               LCI_RDV_PROTOCOL);
 }
 
+// Number of RDMA writes needed to transfer every buffer of ctx when each
+// write carries at most LCI_MAX_SINGLE_MESSAGE_SIZE bytes.
+static inline int LCII_calculate_rdma_num(LCII_context_t* ctx)
+{
+  int nrdmas = 0;
+  int nbuffers = (ctx->data_type == LCI_IOVEC) ? ctx->data.iovec.count : 1;
+  for (int i = 0; i < nbuffers; ++i) {
+    LCI_lbuffer_t* lbuffer;
+    if (ctx->data_type == LCI_LONG) {
+      lbuffer = &ctx->data.lbuffer;
+    } else {
+      LCI_DBG_Assert(ctx->data_type == LCI_IOVEC, "");
+      lbuffer = &ctx->data.iovec.lbuffers[i];
+    }
+    // A zero-length buffer still takes one (empty) write so that the
+    // completion count matches the number of posted operations.
+    nrdmas += lbuffer->length == 0
+                  ? 1
+                  : (lbuffer->length + LCI_MAX_SINGLE_MESSAGE_SIZE - 1) /
+                        LCI_MAX_SINGLE_MESSAGE_SIZE;
+  }
+  return nrdmas;
+}
+
 static inline void LCII_rts_fill_rbuffer_info(
     struct LCII_packet_rtr_rbuffer_info_t* p, LCI_lbuffer_t lbuffer)
 {
@@ -219,6 +243,7 @@ static inline void LCII_handle_rts(LCI_endpoint_t ep, LCII_packet_t* packet,
                                &rdv_ctx->data.lbuffer);
     rdv_ctx->data_type = LCI_LONG;
   } else {
+    // rdv_type == LCII_RDV_IOVEC
     rdv_ctx->data.iovec.count = packet->data.rts.count;
     rdv_ctx->data.iovec.piggy_back.length = packet->data.rts.piggy_back_size;
     rdv_ctx->data.iovec.piggy_back.address =
@@ -245,7 +270,10 @@ static inline void LCII_handle_rts(LCI_endpoint_t ep, LCII_packet_t* packet,
   // Prepare the RTR packet
   // reuse the rts packet as rtr packet
   packet->context.poolid = LCII_POOLID_LOCAL;
-  if (LCI_RDV_PROTOCOL == LCI_RDV_WRITEIMM && rdv_type != LCII_RDV_IOVEC) {
+  int nrdmas = LCII_calculate_rdma_num(rdv_ctx);
+  if (nrdmas == 1 && LCI_RDV_PROTOCOL == LCI_RDV_WRITEIMM &&
+      rdv_type != LCII_RDV_IOVEC) {
+    // We cannot use writeimm for more than 1 rdma messages.
     // IOVEC does not support writeimm for now
     uint64_t ctx_key;
     int result =
@@ -298,16 +326,14 @@ static inline void LCII_handle_rtr(LCI_endpoint_t ep, LCII_packet_t* packet)
   LCII_context_t* ctx = (LCII_context_t*)packet->data.rtr.send_ctx;
   // Set up the "extended context" for write protocol
   void* ctx_to_pass = ctx;
-  if (LCI_RDV_PROTOCOL == LCI_RDV_WRITE || rdv_type == LCII_RDV_IOVEC) {
+  int nrdmas = LCII_calculate_rdma_num(ctx);
+  if (nrdmas > 1 || LCI_RDV_PROTOCOL == LCI_RDV_WRITE ||
+      rdv_type == LCII_RDV_IOVEC) {
     LCII_extended_context_t* ectx =
         LCIU_malloc(sizeof(LCII_extended_context_t));
     LCII_initilize_comp_attr(ectx->comp_attr);
     LCII_comp_attr_set_extended(ectx->comp_attr, 1);
-    if (ctx->data_type == LCI_LONG) {
-      atomic_init(&ectx->signal_count, 1);
-    } else {
-      atomic_init(&ectx->signal_count, ctx->data.iovec.count);
-    }
+    atomic_init(&ectx->signal_count, nrdmas);
     ectx->context = ctx;
     ectx->ep = ep;
     ectx->recv_ctx = packet->data.rtr.recv_ctx;
@@ -334,17 +360,34 @@ static inline void LCII_handle_rtr(LCI_endpoint_t ep, LCII_packet_t* packet)
     LCII_PCOUNTER_END(rtr_mem_reg_timer);
     // issue the put/putimm
     LCII_PCOUNTER_START(rtr_put_timer);
-    if (LCI_RDV_PROTOCOL == LCI_RDV_WRITE || rdv_type == LCII_RDV_IOVEC) {
-      LCIS_post_put_bq(ep->bq_p, ep->bq_spinlock_p,
-                       ep->device->endpoint_progress->endpoint, (int)ctx->rank,
-                       lbuffer->address, lbuffer->length, lbuffer->segment->mr,
-                       packet->data.rtr.rbuffer_info_p[i].remote_addr_base,
-                       packet->data.rtr.rbuffer_info_p[i].remote_addr_offset,
-                       packet->data.rtr.rbuffer_info_p[i].rkey, ctx_to_pass);
+    if (nrdmas > 1 || LCI_RDV_PROTOCOL == LCI_RDV_WRITE ||
+        rdv_type == LCII_RDV_IOVEC) {
+      if (lbuffer->length > LCI_MAX_SINGLE_MESSAGE_SIZE) {
+        LCI_DBG_Log(LCI_LOG_TRACE, "rdv",
+                    "Splitting a large message of %lu bytes\n",
+                    lbuffer->length);
+      }
+      // Post at least one write, even for an empty buffer, to match the
+      // count returned by LCII_calculate_rdma_num.
+      size_t offset = 0;
+      do {
+        char* address = (char*)lbuffer->address + offset;
+        size_t length =
+            LCIU_MIN(lbuffer->length - offset, LCI_MAX_SINGLE_MESSAGE_SIZE);
+        LCIS_post_put_bq(
+            ep->bq_p, ep->bq_spinlock_p,
+            ep->device->endpoint_progress->endpoint, (int)ctx->rank, address,
+            length, lbuffer->segment->mr,
+            packet->data.rtr.rbuffer_info_p[i].remote_addr_base,
+            packet->data.rtr.rbuffer_info_p[i].remote_addr_offset + offset,
+            packet->data.rtr.rbuffer_info_p[i].rkey, ctx_to_pass);
+        offset += LCI_MAX_SINGLE_MESSAGE_SIZE;
+      } while (offset < lbuffer->length);
     } else {
-      LCI_DBG_Assert(
-          LCI_RDV_PROTOCOL == LCI_RDV_WRITEIMM && rdv_type != LCII_RDV_IOVEC,
-          "Unexpected rdv protocol!\n");
+      LCI_DBG_Assert(lbuffer->length <= LCI_MAX_SINGLE_MESSAGE_SIZE &&
+                         LCI_RDV_PROTOCOL == LCI_RDV_WRITEIMM &&
+                         rdv_type != LCII_RDV_IOVEC,
+                     "Unexpected rdv protocol!\n");
       LCIS_post_putImm_bq(ep->bq_p, ep->bq_spinlock_p,
                           ep->device->endpoint_progress->endpoint,
                           (int)ctx->rank, lbuffer->address, lbuffer->length,
diff --git a/tests/lcit/CMakeLists.txt b/tests/lcit/CMakeLists.txt
index 52899adf..369498a3 100644
--- a/tests/lcit/CMakeLists.txt
+++ b/tests/lcit/CMakeLists.txt
@@ -43,3 +43,13 @@ add_lci_tests(
   "${SRUN_EXE} -n 3 ${SRUN_EXTRA_ARG} [TARGET] --op 1m --nthreads 3 --send-window 1 --recv-window 100 --nsteps 10"
   "${SRUN_EXE} -n 3 ${SRUN_EXTRA_ARG} [TARGET] --op 1l --nthreads 3 --send-window 1 --recv-window 100 --nsteps 10"
 )
+
+add_lci_tests(
+  TESTS
+  lcit_pt2pt.cpp
+  LABELS
+  lcit.bigmsg
+  COMMANDS
+  "${SRUN_EXE} -n 2 ${SRUN_EXTRA_ARG} env LCI_MAX_SINGLE_MESSAGE_SIZE=1002 [TARGET] --op 2l --min-msg-size=1001 --max-msg-size=4004 --nsteps 1"
+  "${SRUN_EXE} -n 2 ${SRUN_EXTRA_ARG} env LCI_MAX_SINGLE_MESSAGE_SIZE=1002 [TARGET] --op 1l --min-msg-size=1001 --max-msg-size=4004 --nsteps 1"
+)