Skip to content

Commit

Permalink
Fix messages of size larger than int_max
Browse files Browse the repository at this point in the history
LCI will break the message into multiple chunks and send them with RDMA write.
The size of each chunk is decided by LCI_MAX_SINGLE_MESSAGE_SIZE (2GB by default).
  • Loading branch information
JiakunYan committed Nov 8, 2024
1 parent a2cf58f commit 32165e2
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 22 deletions.
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ if(NOT LCI_WITH_LCT_ONLY)
CACHE STRING "The default rendezvous protocol to use (write, writeimm).")
set_property(CACHE LCI_RDV_PROTOCOL_DEFAULT PROPERTY STRINGS write writeimm)

# Default upper bound (in bytes) for a single low-level message. Kept as a
# cache entry so it can be overridden at configure time.
set(LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT 2000000000
    CACHE STRING "Default single low-level message max size")

mark_as_advanced(
LCI_CONFIG_USE_ALIGNED_ALLOC
LCI_PACKET_SIZE_DEFAULT
Expand All @@ -214,7 +218,8 @@ if(NOT LCI_WITH_LCT_ONLY)
LCI_SERVER_MAX_CQES_DEFAULT
LCI_SERVER_NUM_PKTS_DEFAULT
LCI_CACHE_LINE
LCI_RDV_PROTOCOL_DEFAULT)
LCI_RDV_PROTOCOL_DEFAULT
LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT)

# ############################################################################
# LCI Testing related options
Expand Down
14 changes: 10 additions & 4 deletions cmake_modules/AddLCI.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ function(add_lci_executable name)
endfunction()

function(add_lci_test name)
cmake_parse_arguments(ARG "" "" "COMMANDS;LABELS;SOURCES;DEPENDENCIES"
${ARGN})
cmake_parse_arguments(
ARG "" "" "COMMANDS;LABELS;SOURCES;DEPENDENCIES;ENVIRONMENT" ${ARGN})

add_lci_executable(${name} ${ARG_SOURCES})
target_include_directories(${name} PRIVATE ${CMAKE_SOURCE_DIR}/src/include)
Expand All @@ -47,11 +47,15 @@ function(add_lci_test name)
string(REPLACE " " ";" TEST_COMMAND ${TEST_COMMAND})
add_test(NAME ${test_name} COMMAND ${TEST_COMMAND})
set_property(TEST ${test_name} PROPERTY LABELS ${ARG_LABELS})
# cmake_parse_arguments() is called with the prefix "ARG", so the values given
# after the ENVIRONMENT keyword are stored in ARG_ENVIRONMENT; a plain
# ENVIRONMENT variable is never set by this function. set_property() is used
# (as for LABELS) because it accepts a multi-entry list as one property value,
# whereas set_tests_properties() would read extra entries as property names.
if(ARG_ENVIRONMENT)
  set_property(TEST ${test_name} PROPERTY ENVIRONMENT ${ARG_ENVIRONMENT})
endif()
endwhile()
endfunction()

function(add_lci_tests)
cmake_parse_arguments(ARG "" "" "COMMANDS;LABELS;TESTS;DEPENDENCIES" ${ARGN})
cmake_parse_arguments(
ARG "" "" "COMMANDS;LABELS;TESTS;DEPENDENCIES;ENVIRONMENT" ${ARGN})
foreach(name ${ARG_TESTS})
string(REGEX REPLACE "\\.[^.]*$" "" name_without_ext ${name})
add_lci_test(
Expand All @@ -63,6 +67,8 @@ function(add_lci_tests)
COMMANDS
${ARG_COMMANDS}
DEPENDENCIES
${ARG_DEPENDENCIES})
${ARG_DEPENDENCIES}
# Forward the ENVIRONMENT values parsed by cmake_parse_arguments(ARG ...);
# they are stored in ARG_ENVIRONMENT, not in a variable named ENVIRONMENT.
ENVIRONMENT
${ARG_ENVIRONMENT})
endforeach()
endfunction()
7 changes: 7 additions & 0 deletions lci/api/lci.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,13 @@ extern int LCI_MEDIUM_SIZE;
*/
extern int LCI_IOVEC_SIZE;

/**
 * @ingroup LCI_COMM
 * @brief The maximum size (in bytes) of a buffer that can be used in long
 * messages without being broken. Larger buffers are split into multiple
 * chunks of at most this size.
 */
extern size_t LCI_MAX_SINGLE_MESSAGE_SIZE;

/**
* @ingroup LCI_SETUP
* @brief Initial number of entries in a default matching table.
Expand Down
1 change: 1 addition & 0 deletions lci/api/lci_config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#cmakedefine01 LCI_IBV_ENABLE_TD_DEFAULT
#cmakedefine01 LCI_ENABLE_PRG_NET_ENDPOINT_DEFAULT
#define LCI_RDV_PROTOCOL_DEFAULT "@LCI_RDV_PROTOCOL_DEFAULT@"
#define LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT @LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT@

#define LCI_CQ_MAX_POLL 16
#define LCI_SERVER_MAX_ENDPOINTS 8
Expand Down
10 changes: 10 additions & 0 deletions lci/backend/ibv/server_ibv.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ void LCISI_event_polling_thread_fina(LCISI_server_t* server)

void LCISD_server_init(LCIS_server_t* s)
{
// Check configurations.
// ibverbs' max message is assumed to be 2GiB (TODO confirm against the
// device's reported max_msg_sz), so clamp the chunk size to 2^31 - 1 bytes.
// The bound is built from a size_t operand on purpose: an int shift such as
// `2 << 31` overflows, and `2 << 31 - 1` parses as `2 << (31 - 1)`.
const size_t ibv_max_msg_size = ((size_t)1 << 31) - 1;
if (LCI_MAX_SINGLE_MESSAGE_SIZE > ibv_max_msg_size) {
  LCI_MAX_SINGLE_MESSAGE_SIZE = ibv_max_msg_size;
  LCI_Warn(
      "Reduce LCI_MAX_SINGLE_MESSAGE_SIZE to %lu "
      "as required by libibverbs max message size\n",
      LCI_MAX_SINGLE_MESSAGE_SIZE);
}

LCISI_server_t* server = LCIU_malloc(sizeof(LCISI_server_t));
*s = (LCIS_server_t)server;

Expand Down
7 changes: 7 additions & 0 deletions lci/backend/ofi/server_ofi.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ void LCISD_server_init(LCIS_server_t* s)
"inject_size (%lu) < sizeof(LCI_short_t) (%lu)!\n",
server->info->tx_attr->inject_size, sizeof(LCI_short_t));
fi_freeinfo(hints);
// Clamp the chunk size to the max_msg_size reported in the endpoint
// attributes of server->info, and tell the user when the value was lowered.
if (server->info->ep_attr->max_msg_size < LCI_MAX_SINGLE_MESSAGE_SIZE) {
  LCI_MAX_SINGLE_MESSAGE_SIZE = server->info->ep_attr->max_msg_size;
  // Note the trailing space after %lu: the two literals are concatenated.
  LCI_Warn(
      "Reduce LCI_MAX_SINGLE_MESSAGE_SIZE to %lu "
      "as required by the libfabric max_msg_size attribute\n",
      LCI_MAX_SINGLE_MESSAGE_SIZE);
}
if (strcmp(server->info->fabric_attr->prov_name, "cxi") == 0) {
LCI_Assert(LCI_USE_DREG == 0,
"The registration cache should be turned off "
Expand Down
3 changes: 3 additions & 0 deletions lci/runtime/env.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ LCI_API int LCI_MAX_ENDPOINTS;
LCI_API int LCI_MAX_TAG = (1u << 16) - 1;
LCI_API int LCI_MEDIUM_SIZE = -1;
LCI_API int LCI_IOVEC_SIZE = -1;
LCI_API size_t LCI_MAX_SINGLE_MESSAGE_SIZE;
LCI_API int LCI_DEFAULT_QUEUE_LENGTH;
LCI_API int LCI_MAX_QUEUE_LENGTH;
LCI_API int LCI_MAX_SYNC_LENGTH = INT_MAX;
Expand Down Expand Up @@ -100,6 +101,8 @@ void LCII_env_init(int num_proc, int rank)
(LCI_PACKET_SIZE - sizeof(struct LCII_packet_context) -
sizeof(struct LCII_packet_rtr_t)) /
sizeof(struct LCII_packet_rtr_rbuffer_info_t));
LCI_MAX_SINGLE_MESSAGE_SIZE = LCIU_getenv_or(
"LCI_MAX_SINGLE_MESSAGE_SIZE", LCI_MAX_SINGLE_MESSAGE_SIZE_DEFAULT);
LCI_OFI_CXI_TRY_NO_HACK = LCIU_getenv_or("LCI_OFI_CXI_TRY_NO_HACK", false);
{
// default value
Expand Down
68 changes: 51 additions & 17 deletions lci/runtime/rendezvous.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,24 @@ static void LCII_env_init_rdv_protocol()
LCI_RDV_PROTOCOL);
}

/**
 * Count the RDMA operations needed to transfer the payload described by ctx
 * when each operation carries at most LCI_MAX_SINGLE_MESSAGE_SIZE bytes.
 *
 * For LCI_LONG the single lbuffer is considered; for LCI_IOVEC every entry of
 * iovec.lbuffers is considered. Each buffer contributes
 * ceil(length / LCI_MAX_SINGLE_MESSAGE_SIZE) operations, so a zero-length
 * buffer contributes 0.
 *
 * @param ctx context whose data_type is LCI_LONG or LCI_IOVEC.
 * @return the total number of RDMA operations over all buffers.
 */
static inline int LCII_calculate_rdma_num(LCII_context_t* ctx)
{
  // The chunk size is used as a divisor below; it comes from configuration,
  // so catch a zero value in debug builds instead of dividing by zero.
  LCI_DBG_Assert(LCI_MAX_SINGLE_MESSAGE_SIZE > 0,
                 "LCI_MAX_SINGLE_MESSAGE_SIZE must be positive!\n");
  int nrdmas = 0;
  int nbuffers = (ctx->data_type == LCI_IOVEC) ? ctx->data.iovec.count : 1;
  for (int i = 0; i < nbuffers; ++i) {
    LCI_lbuffer_t* lbuffer;
    if (ctx->data_type == LCI_LONG) {
      lbuffer = &ctx->data.lbuffer;
    } else {
      LCI_DBG_Assert(ctx->data_type == LCI_IOVEC, "");
      lbuffer = &ctx->data.iovec.lbuffers[i];
    }
    // Round up: a partial trailing chunk still needs its own operation.
    nrdmas += (lbuffer->length + LCI_MAX_SINGLE_MESSAGE_SIZE - 1) /
              LCI_MAX_SINGLE_MESSAGE_SIZE;
  }
  return nrdmas;
}

static inline void LCII_rts_fill_rbuffer_info(
struct LCII_packet_rtr_rbuffer_info_t* p, LCI_lbuffer_t lbuffer)
{
Expand Down Expand Up @@ -219,6 +237,7 @@ static inline void LCII_handle_rts(LCI_endpoint_t ep, LCII_packet_t* packet,
&rdv_ctx->data.lbuffer);
rdv_ctx->data_type = LCI_LONG;
} else {
// rdv_type == LCII_RDV_IOVEC
rdv_ctx->data.iovec.count = packet->data.rts.count;
rdv_ctx->data.iovec.piggy_back.length = packet->data.rts.piggy_back_size;
rdv_ctx->data.iovec.piggy_back.address =
Expand All @@ -245,7 +264,10 @@ static inline void LCII_handle_rts(LCI_endpoint_t ep, LCII_packet_t* packet,
// Prepare the RTR packet
// reuse the rts packet as rtr packet
packet->context.poolid = LCII_POOLID_LOCAL;
if (LCI_RDV_PROTOCOL == LCI_RDV_WRITEIMM && rdv_type != LCII_RDV_IOVEC) {
int nrdmas = LCII_calculate_rdma_num(rdv_ctx);
if (nrdmas == 1 && LCI_RDV_PROTOCOL == LCI_RDV_WRITEIMM &&
rdv_type != LCII_RDV_IOVEC) {
// We cannot use writeimm when the message needs more than one RDMA write.
// IOVEC does not support writeimm for now
uint64_t ctx_key;
int result =
Expand Down Expand Up @@ -298,16 +320,14 @@ static inline void LCII_handle_rtr(LCI_endpoint_t ep, LCII_packet_t* packet)
LCII_context_t* ctx = (LCII_context_t*)packet->data.rtr.send_ctx;
// Set up the "extended context" for write protocol
void* ctx_to_pass = ctx;
if (LCI_RDV_PROTOCOL == LCI_RDV_WRITE || rdv_type == LCII_RDV_IOVEC) {
int nrdmas = LCII_calculate_rdma_num(ctx);
if (nrdmas > 1 || LCI_RDV_PROTOCOL == LCI_RDV_WRITE ||
rdv_type == LCII_RDV_IOVEC) {
LCII_extended_context_t* ectx =
LCIU_malloc(sizeof(LCII_extended_context_t));
LCII_initilize_comp_attr(ectx->comp_attr);
LCII_comp_attr_set_extended(ectx->comp_attr, 1);
if (ctx->data_type == LCI_LONG) {
atomic_init(&ectx->signal_count, 1);
} else {
atomic_init(&ectx->signal_count, ctx->data.iovec.count);
}
atomic_init(&ectx->signal_count, nrdmas);
ectx->context = ctx;
ectx->ep = ep;
ectx->recv_ctx = packet->data.rtr.recv_ctx;
Expand All @@ -334,17 +354,31 @@ static inline void LCII_handle_rtr(LCI_endpoint_t ep, LCII_packet_t* packet)
LCII_PCOUNTER_END(rtr_mem_reg_timer);
// issue the put/putimm
LCII_PCOUNTER_START(rtr_put_timer);
if (LCI_RDV_PROTOCOL == LCI_RDV_WRITE || rdv_type == LCII_RDV_IOVEC) {
LCIS_post_put_bq(ep->bq_p, ep->bq_spinlock_p,
ep->device->endpoint_progress->endpoint, (int)ctx->rank,
lbuffer->address, lbuffer->length, lbuffer->segment->mr,
packet->data.rtr.rbuffer_info_p[i].remote_addr_base,
packet->data.rtr.rbuffer_info_p[i].remote_addr_offset,
packet->data.rtr.rbuffer_info_p[i].rkey, ctx_to_pass);
if (nrdmas > 1 || LCI_RDV_PROTOCOL == LCI_RDV_WRITE ||
rdv_type == LCII_RDV_IOVEC) {
if (lbuffer->length > LCI_MAX_SINGLE_MESSAGE_SIZE) {
LCI_DBG_Log(LCI_LOG_TRACE, "rdv",
"Splitting a large message of %lu bytes\n",
lbuffer->length);
}
for (size_t offset = 0; offset < lbuffer->length;
offset += LCI_MAX_SINGLE_MESSAGE_SIZE) {
char* address = (char*)lbuffer->address + offset;
size_t length =
LCIU_MIN(lbuffer->length - offset, LCI_MAX_SINGLE_MESSAGE_SIZE);
LCIS_post_put_bq(
ep->bq_p, ep->bq_spinlock_p,
ep->device->endpoint_progress->endpoint, (int)ctx->rank, address,
length, lbuffer->segment->mr,
packet->data.rtr.rbuffer_info_p[i].remote_addr_base,
packet->data.rtr.rbuffer_info_p[i].remote_addr_offset + offset,
packet->data.rtr.rbuffer_info_p[i].rkey, ctx_to_pass);
}
} else {
LCI_DBG_Assert(
LCI_RDV_PROTOCOL == LCI_RDV_WRITEIMM && rdv_type != LCII_RDV_IOVEC,
"Unexpected rdv protocol!\n");
LCI_DBG_Assert(lbuffer->length <= LCI_MAX_SINGLE_MESSAGE_SIZE &&
LCI_RDV_PROTOCOL == LCI_RDV_WRITEIMM &&
rdv_type != LCII_RDV_IOVEC,
"Unexpected rdv protocol!\n");
LCIS_post_putImm_bq(ep->bq_p, ep->bq_spinlock_p,
ep->device->endpoint_progress->endpoint,
(int)ctx->rank, lbuffer->address, lbuffer->length,
Expand Down
10 changes: 10 additions & 0 deletions tests/lcit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ add_lci_tests(
"${SRUN_EXE} -n 3 ${SRUN_EXTRA_ARG} [TARGET] --op 1m --nthreads 3 --send-window 1 --recv-window 100 --nsteps 10"
"${SRUN_EXE} -n 3 ${SRUN_EXTRA_ARG} [TARGET] --op 1l --nthreads 3 --send-window 1 --recv-window 100 --nsteps 10"
)

# Big-message tests: run lcit_pt2pt with LCI_MAX_SINGLE_MESSAGE_SIZE lowered to
# 1002 bytes and message sizes ranging from 1001 to 4004 bytes, so that the
# range includes messages larger than the single-message limit.
add_lci_tests(
  LABELS
  lcit.bigmsg
  TESTS
  lcit_pt2pt.cpp
  COMMANDS
  "${SRUN_EXE} -n 2 ${SRUN_EXTRA_ARG} env LCI_MAX_SINGLE_MESSAGE_SIZE=1002 [TARGET] --op 2l --min-msg-size=1001 --max-msg-size=4004 --nsteps 1"
  "${SRUN_EXE} -n 2 ${SRUN_EXTRA_ARG} env LCI_MAX_SINGLE_MESSAGE_SIZE=1002 [TARGET] --op 1l --min-msg-size=1001 --max-msg-size=4004 --nsteps 1"
)

0 comments on commit 32165e2

Please sign in to comment.