diff --git a/CHANGELOG.md b/CHANGELOG.md index 00e37aa88f07..1962639578b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Current develop ### Added (new features/APIs/variables/...) +- [[PR 1192]](https://github.com/parthenon-hpc-lab/parthenon/pull/1192) Coalesced buffer communication - [[PR 1210]](https://github.com/parthenon-hpc-lab/parthenon/pull/1210) Add cycle based output - [[PR 1103]](https://github.com/parthenon-hpc-lab/parthenon/pull/1103) Add sparsity to vector wave equation test - [[PR 1185]](https://github.com/parthenon-hpc-lab/parthenon/pull/1185) Bugfix to particle defragmentation diff --git a/doc/sphinx/src/boundary_communication.rst b/doc/sphinx/src/boundary_communication.rst index 39e38e81749c..4d99c63387db 100644 --- a/doc/sphinx/src/boundary_communication.rst +++ b/doc/sphinx/src/boundary_communication.rst @@ -476,3 +476,99 @@ For backwards compatibility, we keep the aliases - ``ReceiveFluxCorrections`` = ``ReceiveBoundBufs`` - ``SetFluxCorrections`` = ``SetBoundBufs`` +Coalesced MPI Communication +--------------------------- + +As described above, a one-dimensional buffer is packed and unpacked for each communicated +field on each pair of blocks that share a unique topological element (below we refer to this +as a variable-boundary buffer). For codes with large numbers of variables and/or +simulations run with small block sizes, this can result in a large total number of buffers +and, importantly, a large number of buffers that need to be communicated across MPI ranks. The +latter fact can have significant performance implications, as each ``CommBuffer::Send()`` +call for these non-local buffers corresponds to an ``MPI_Isend``. Generally, these messages +contain a small amount of data, which results in a low effective MPI bandwidth. Additionally, +MPI implementations can struggle to deal with the large number of messages +required. In some cases, this can result in poor scaling behavior for Parthenon. + +To get around this, we introduce a second level of buffers for communicating across ranks. +For each ``MeshData`` object on a given MPI rank, coalesced buffers equal in size to all +MPI non-local variable-boundary buffers are created for each other MPI rank that ``MeshData`` +communicates with. These coalesced buffers are then filled from the individual variable-boundary +buffers, a *single* MPI send is called per MPI rank pair, and the receiving ranks unpack the +coalesced buffer into the individual variable-boundary buffers. This can drastically reduce the +number of MPI sends and increase the total amount of data sent per message, thereby +increasing the effective bandwidth. Further, in cases where Parthenon is running on GPUs but +GPUDirect MPI is not available, this can also reduce the number of DtoH and HtoD copies +during communication. + +Coalesced communication is controlled by the input parameter: + +.. code:: + + parthenon/mesh/do_coalesced_comms = true + +Currently, this is set to ``true`` by default. + +Implementation Details +~~~~~~~~~~~~~~~~~~~~~~ + +The coalesced send and receive buffers for each rank are stored in ``Mesh::pcoalesced_comms``, +which is a ``std::shared_ptr`` to a ``CoalescedComms`` object. To do coalesced communication, +two pieces are required: 1) an initialization step telling all ranks what coalesced buffer +messages they can expect and 2) a mechanism for packing, sending, and unpacking the coalesced +buffers during each boundary communication step. 
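+
+Schematically, these two pieces map onto the following calls (a simplified sketch rather than
+a complete code path; the routines named here are the ones added by this PR, while the
+surrounding variables such as ``pmb``, ``nb``, ``var``, ``md``, and ``b_type`` are illustrative
+stand-ins for what the actual call sites in ``Mesh::BuildTagMapAndBoundaryBuffers`` and the
+``SendBoundBufs``/``ReceiveBoundBufs`` tasks provide):
+
+.. code:: cpp
+
+   // (1) Post-remesh initialization, once per rank (during boundary buffer construction):
+   //     register every non-local variable-boundary buffer, then exchange layout info.
+   pmesh->pcoalesced_comms->AddSendBuffer(md->partition, pmb, nb, var, b_type); // sending side
+   pmesh->pcoalesced_comms->AddRecvBuffer(pmb, nb, var, b_type);                // receiving side
+   pmesh->pcoalesced_comms->ResolveAndSendSendBuffers(); // tell other ranks what to expect
+   pmesh->pcoalesced_comms->ReceiveBufferInfo();         // blocking receive of that info
+
+   // (2) Every boundary communication step:
+   pmesh->pcoalesced_comms->PackAndSend(md.get(), bound_type);   // one MPI send per rank pair
+   pmesh->pcoalesced_comms->TryReceiveAny(md.get(), bound_type); // unpack into v-b buffers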
+ +For the first piece, after every remesh during ``BuildBoundaryBuffers``, each non-local +variable-boundary buffer is registered with ``pcoalesced_comms``. Once all these buffers are +registered, ``CoalescedComms::ResolveAndSendSendBuffers()`` is called, which determines all +the coalesced buffers that are going to be sent from a given rank to every other rank, packs +information about each of the coalesced buffers into MPI messages, and sends them to the other +ranks so that the receiving ranks know how to interpret the messages they receive from a given +rank. ``CoalescedComms::ReceiveBufferInfo()`` is then called to receive this information from +other ranks. This process packs ``BndId`` objects, which contain the information +necessary to identify a variable-boundary communication channel and the amount of data that +is communicated across that channel, and then unpacks them on the receiving end and finds the +correct variable-boundary buffers. These routines are called once per rank (rather than per +``MeshData``). + +For the second piece, variable-boundary buffers are first filled as normal in ``SendBoundBufs`` +but the states of the ``CommBuffer``s are updated without actually calling the associated +``MPI_Isend``s. Then ``CoalescedComms::PackAndSend(MeshData *pmd, BoundaryType b_type)`` +is called, which for each rank pair associated with ``pmd`` packs the variable-boundary buffers +into the coalesced buffer, packs a second message containing the sparse allocation status of +each variable-boundary buffer, sends these two messages, and then stales the associated +variable-boundary buffers since their data is no longer required. On the receiving side, +``ReceiveBoundBufs`` receives these messages, sets the corresponding variable-boundary +buffers to the correct ``received`` or ``received_null`` state, and then unpacks the data +into the buffers. Note that the messages received here do not necessarily correspond to the +``MeshData`` that is passed to the associated ``ReceiveBoundBufs`` call, so all +variable-boundary buffers associated with a given receiving ``MeshData`` must still be checked for +being in a received state. Once they are all in a received state, setting of boundaries, +prolongation, etc. can proceed normally. + +Some notes: +- Internally ``CoalescedComms`` contains maps from MPI rank and ``BoundaryType`` (e.g. regular + communication, flux correction) to ``CoalescedBuffersRank`` objects for sending and receiving + rank pairs. These ``CoalescedBuffersRank`` objects in turn contain maps from the + partition id of the sending ``MeshData`` (which also doubles as the MPI tag for the messages) + to ``CoalescedBuffer`` objects. +- ``CoalescedBuffersRank`` is where the post-remesh initialization routines are actually + implemented. A given instance can correspond to either the send or the receive side. +- ``CoalescedBuffer`` corresponds to a single coalesced buffer and is where + the packing, sending, receiving, and unpacking details for coalesced boundary communication + are implemented. This object internally owns the ``CommBuffer`` + that is used for sending and receiving the coalesced data (as well as the communication buffer + used for communicating allocation status). +- Because Parthenon allows communication on ``MeshData`` objects that contain a subset of the + ``Metadata::FillGhost`` fields in a simulation, we need to be able to interpret coalesced + messages that contain a subset of fields. 
Most of what is needed for this is implemented + in ``GetBndIdsOnDevice``. +- The coalesced buffers are sparse-aware and approximately allocate the amount of space required + to store the *allocated* fields. This means the size of the buffers can change dynamically + between steps. Currently, we allocate twice as much memory as is required to store the allocated + variable-boundary buffers whenever their total size becomes larger than the current size of the + coalesced buffer in an attempt to balance the number of allocations against memory consumption. Since + the receiving end does not *a priori* know the size of the coalesced messages it is going to + receive, we first check the size of the incoming MPI message, reallocate the coalesced receive + buffer if necessary, and then actually post the ``Irecv``. Note that this prevents pre-posting + the ``Irecv``. \ No newline at end of file diff --git a/example/fine_advection/advection_driver.cpp b/example/fine_advection/advection_driver.cpp index ca55c04f8278..1c1608447fc6 100644 --- a/example/fine_advection/advection_driver.cpp +++ b/example/fine_advection/advection_driver.cpp @@ -95,9 +95,6 @@ TaskCollection AdvectionDriver::MakeTaskCollection(BlockList_t &blocks, const in auto &mc1 = pmesh->mesh_data.Add(stage_name[stage], mbase); auto &mdudt = pmesh->mesh_data.Add("dUdt", mbase); - auto start_send = tl.AddTask(none, parthenon::StartReceiveBoundaryBuffers, mc1); - auto start_flxcor = tl.AddTask(none, parthenon::StartReceiveFluxCorrections, mc0); - // Make a sparse variable pack descriptors that can be used to build packs // including some subset of the fields in this example. This will be passed // to the Stokes update routines, so that they can internally create variable @@ -146,9 +143,8 @@ TaskCollection AdvectionDriver::MakeTaskCollection(BlockList_t &blocks, const in } } - auto set_flx = parthenon::AddFluxCorrectionTasks( - start_flxcor | flx | flx_fine | vf_dep, tl, mc0, pmesh->multilevel); - + auto set_flx = parthenon::AddFluxCorrectionTasks(flx | flx_fine | vf_dep, tl, mc0, + pmesh->multilevel); auto update = set_flx; if (do_regular_advection) { update = AddUpdateTasks(set_flx, tl, parthenon::CellLevel::same, TT::Cell, beta, dt, @@ -170,7 +166,7 @@ } auto boundaries = parthenon::AddBoundaryExchangeTasks( - update | update_vec | update_fine | start_send, tl, mc1, pmesh->multilevel); + update | update_vec | update_fine, tl, mc1, pmesh->multilevel); auto fill_derived = tl.AddTask(boundaries, parthenon::Update::FillDerived>, mc1.get()); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 825014341aef..5d39535839c9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -101,9 +101,13 @@ add_library(parthenon bvals/comms/bvals_in_one.hpp bvals/comms/bvals_utils.hpp bvals/comms/build_boundary_buffers.cpp + bvals/comms/bnd_id.cpp + bvals/comms/bnd_id.hpp bvals/comms/bnd_info.cpp bvals/comms/bnd_info.hpp bvals/comms/boundary_communication.cpp + bvals/comms/coalesced_buffers.cpp + bvals/comms/coalesced_buffers.hpp bvals/comms/tag_map.cpp bvals/comms/tag_map.hpp diff --git a/src/basic_types.hpp b/src/basic_types.hpp index f1f07878a533..cdd22d5137f5 100644 --- a/src/basic_types.hpp +++ b/src/basic_types.hpp @@ -77,6 +77,36 @@ enum class BoundaryType : int { gmg_prolongate_recv }; +inline constexpr bool IsSender(BoundaryType btype) { + if (btype == BoundaryType::flxcor_recv) return false; + if (btype == BoundaryType::gmg_restrict_recv) return false; + if (btype == 
BoundaryType::gmg_prolongate_recv) return false; + return true; +} + +inline constexpr bool IsReceiver(BoundaryType btype) { + if (btype == BoundaryType::flxcor_send) return false; + if (btype == BoundaryType::gmg_restrict_send) return false; + if (btype == BoundaryType::gmg_prolongate_send) return false; + return true; +} + +inline constexpr BoundaryType GetAssociatedReceiver(BoundaryType btype) { + if (btype == BoundaryType::flxcor_send) return BoundaryType::flxcor_recv; + if (btype == BoundaryType::gmg_restrict_send) return BoundaryType::gmg_restrict_recv; + if (btype == BoundaryType::gmg_prolongate_send) + return BoundaryType::gmg_prolongate_recv; + return btype; +} + +inline constexpr BoundaryType GetAssociatedSender(BoundaryType btype) { + if (btype == BoundaryType::flxcor_recv) return BoundaryType::flxcor_send; + if (btype == BoundaryType::gmg_restrict_recv) return BoundaryType::gmg_restrict_send; + if (btype == BoundaryType::gmg_prolongate_recv) + return BoundaryType::gmg_prolongate_send; + return btype; +} + enum class GridType : int { none, leaf, two_level_composite, single_level_with_internal }; struct GridIdentifier { GridType type = GridType::none; @@ -102,20 +132,6 @@ inline bool operator<(const GridIdentifier &lhs, const GridIdentifier &rhs) { return lhs.logical_level < rhs.logical_level; } -constexpr bool IsSender(BoundaryType btype) { - if (btype == BoundaryType::flxcor_recv) return false; - if (btype == BoundaryType::gmg_restrict_recv) return false; - if (btype == BoundaryType::gmg_prolongate_recv) return false; - return true; -} - -constexpr bool IsReceiver(BoundaryType btype) { - if (btype == BoundaryType::flxcor_send) return false; - if (btype == BoundaryType::gmg_restrict_send) return false; - if (btype == BoundaryType::gmg_prolongate_send) return false; - return true; -} - // Enumeration for accessing a field on different locations of the grid: // CC = cell center of (i, j, k) // F1 = x-normal face at (i - 1/2, j, k) diff --git a/src/bvals/comms/bnd_id.cpp b/src/bvals/comms/bnd_id.cpp new file mode 100644 index 000000000000..66a41eeaa463 --- /dev/null +++ b/src/bvals/comms/bnd_id.cpp @@ -0,0 +1,69 @@ +//======================================================================================== +// Parthenon performance portable AMR framework +// Copyright(C) 2024 The Parthenon collaboration +// Licensed under the 3-clause BSD License, see LICENSE file for details +//======================================================================================== +// (C) (or copyright) 2020-2024. Triad National Security, LLC. All rights reserved. +// +// This program was produced under U.S. Government contract 89233218CNA000001 for Los +// Alamos National Laboratory (LANL), which is operated by Triad National Security, LLC +// for the U.S. Department of Energy/National Nuclear Security Administration. All rights +// in the program are reserved by Triad National Security, LLC, and the U.S. Department +// of Energy/National Nuclear Security Administration. The Government is granted for +// itself and others acting on its behalf a nonexclusive, paid-up, irrevocable worldwide +// license in this material to reproduce, prepare derivative works, distribute copies to +// the public, perform publicly and display publicly, and to permit others to do so. 
+//======================================================================================== + +#include +#include +#include // debug +#include +#include +#include + +#include "basic_types.hpp" +#include "bvals/comms/bnd_id.hpp" +#include "bvals/comms/bvals_utils.hpp" +#include "bvals/neighbor_block.hpp" +#include "config.hpp" +#include "globals.hpp" +#include "interface/state_descriptor.hpp" +#include "interface/variable.hpp" +#include "kokkos_abstraction.hpp" +#include "mesh/domain.hpp" +#include "mesh/mesh.hpp" +#include "mesh/mesh_refinement.hpp" +#include "mesh/meshblock.hpp" +#include "prolong_restrict/prolong_restrict.hpp" +#include "utils/error_checking.hpp" + +namespace parthenon { + +BndId BndId::GetSend(MeshBlock *pmb, const NeighborBlock &nb, + std::shared_ptr> v, BoundaryType b_type, + int partition, int start_idx) { + auto [send_gid, recv_gid, vlabel, loc, extra_id] = SendKey(pmb, nb, v, b_type); + BndId out; + out.send_gid() = send_gid; + out.recv_gid() = recv_gid; + out.loc_idx() = loc; + out.var_id() = v->GetUniqueID(); + out.extra_id() = extra_id; + out.rank_send() = Globals::my_rank; + out.rank_recv() = nb.rank; + out.partition() = partition; + out.size() = BndInfo::GetSendBndInfo(pmb, nb, v, nullptr).size(); + out.start_idx() = start_idx; + return out; +} + +void BndId::PrintInfo(const std::string &start) { + printf("%s var %s (%i -> %i) starting at %i with size %i (Total combined buffer size = " + "%li, buffer size = %li, buf_allocated = %i) [rank = %i]\n", + start.c_str(), Variable::GetLabel(var_id()).c_str(), send_gid(), + recv_gid(), start_idx(), size(), coalesced_buf.size(), buf.size(), buf_allocated, + Globals::my_rank); +} + +} // namespace parthenon diff --git a/src/bvals/comms/bnd_id.hpp b/src/bvals/comms/bnd_id.hpp new file mode 100644 index 000000000000..9d250af31bb1 --- /dev/null +++ b/src/bvals/comms/bnd_id.hpp @@ -0,0 +1,111 @@ +//======================================================================================== +// Parthenon performance portable AMR framework +// Copyright(C) 2024 The Parthenon collaboration +// Licensed under the 3-clause BSD License, see LICENSE file for details +//======================================================================================== +// (C) (or copyright) 2020-2024. Triad National Security, LLC. All rights reserved. +// +// This program was produced under U.S. Government contract 89233218CNA000001 for Los +// Alamos National Laboratory (LANL), which is operated by Triad National Security, LLC +// for the U.S. Department of Energy/National Nuclear Security Administration. All rights +// in the program are reserved by Triad National Security, LLC, and the U.S. Department +// of Energy/National Nuclear Security Administration. The Government is granted for +// itself and others acting on its behalf a nonexclusive, paid-up, irrevocable worldwide +// license in this material to reproduce, prepare derivative works, distribute copies to +// the public, perform publicly and display publicly, and to permit others to do so. 
+//======================================================================================== + +#ifndef BVALS_COMMS_BND_ID_HPP_ +#define BVALS_COMMS_BND_ID_HPP_ + +#include +#include +#include + +#include "basic_types.hpp" +#include "bvals/neighbor_block.hpp" +#include "coordinates/coordinates.hpp" +#include "interface/variable_state.hpp" +#include "mesh/domain.hpp" +#include "mesh/forest/logical_coordinate_transformation.hpp" +#include "utils/communication_buffer.hpp" +#include "utils/indexer.hpp" +#include "utils/object_pool.hpp" + +namespace parthenon { + +template +class Variable; + +// Provides the information necessary for identifying a unique variable-boundary +// buffer, identifying the coalesced buffer it is associated with, and its +// position within the coalesced buffer. +struct BndId { + constexpr static std::size_t NDAT = 10; + int data[NDAT]; + + // Information for identifying the buffer with a communication + // channel, variable, and the ranks it is communicated across + KOKKOS_FORCEINLINE_FUNCTION + int &send_gid() { return data[0]; } + KOKKOS_FORCEINLINE_FUNCTION + int &recv_gid() { return data[1]; } + KOKKOS_FORCEINLINE_FUNCTION + int &loc_idx() { return data[2]; } + KOKKOS_FORCEINLINE_FUNCTION + int &var_id() { return data[3]; } + KOKKOS_FORCEINLINE_FUNCTION + int &extra_id() { return data[4]; } + KOKKOS_FORCEINLINE_FUNCTION + int &rank_send() { return data[5]; } + KOKKOS_FORCEINLINE_FUNCTION + int &rank_recv() { return data[6]; } + BoundaryType bound_type; + + // MeshData partition id of the *sender* + // not set by constructors and only necessary for coalesced comms + KOKKOS_FORCEINLINE_FUNCTION + int &partition() { return data[7]; } + KOKKOS_FORCEINLINE_FUNCTION + int &size() { return data[8]; } + KOKKOS_FORCEINLINE_FUNCTION + int &start_idx() { return data[9]; } + + bool buf_allocated; + buf_pool_t::weak_t buf; // comm buffer from pool + BufArray1D coalesced_buf; // Combined buffer + + void PrintInfo(const std::string &start); + + KOKKOS_DEFAULTED_FUNCTION + BndId() = default; + KOKKOS_DEFAULTED_FUNCTION + BndId(const BndId &) = default; + + explicit BndId(const int *const data_in) { + for (int i = 0; i < NDAT; ++i) { + data[i] = data_in[i]; + } + } + + void Serialize(int *data_out) { + for (int i = 0; i < NDAT; ++i) { + data_out[i] = data[i]; + } + } + + bool SameBVChannel(const BndId &other) { + // Don't want to compare start_idx, so -1 + for (int i = 0; i < NDAT - 1; ++i) { + if (data[i] != other.data[i]) return false; + } + return true; + } + + static BndId GetSend(MeshBlock *pmb, const NeighborBlock &nb, + std::shared_ptr> v, BoundaryType b_type, + int partition, int start_idx); +}; +} // namespace parthenon + +#endif // BVALS_COMMS_BND_ID_HPP_ diff --git a/src/bvals/comms/bnd_info.cpp b/src/bvals/comms/bnd_info.cpp index 2b4948278c38..78ee15f229db 100644 --- a/src/bvals/comms/bnd_info.cpp +++ b/src/bvals/comms/bnd_info.cpp @@ -24,6 +24,7 @@ #include "basic_types.hpp" #include "bvals/comms/bnd_info.hpp" +#include "bvals/comms/bvals_utils.hpp" #include "bvals/neighbor_block.hpp" #include "config.hpp" #include "globals.hpp" @@ -251,7 +252,7 @@ CalcIndices(const NeighborBlock &nb, MeshBlock *pmb, {s[2], e[2]}, {s[1], e[1]}, {s[0], e[0]}); } -int GetBufferSize(MeshBlock *pmb, const NeighborBlock &nb, +int GetBufferSize(const MeshBlock *const pmb, const NeighborBlock &nb, std::shared_ptr> v) { // This does not do a careful job of calculating the buffer size, in many // cases there will be some extra storage that is not required, but there @@ -277,10 +278,13 @@ 
BndInfo::BndInfo(MeshBlock *pmb, const NeighborBlock &nb, allocated = v->IsAllocated(); alloc_status = v->GetAllocationStatus(); - buf = combuf->buffer(); + // Sometimes we may build a BndInfo object just to get the + // size of the index space associated with the boundary. In + // that case an associated communication buffer may not exist + // and a nullptr will be passed instead. + if (combuf != nullptr) buf = combuf->buffer(); same_to_same = pmb->gid == nb.gid && nb.offsets.IsCell(); lcoord_trans = nb.lcoord_trans; - if (!allocated) return; if (nb.origin_loc.level() < pmb->loc.level()) { var = v->coarse_s.Get(); @@ -326,7 +330,7 @@ BndInfo BndInfo::GetSetBndInfo(MeshBlock *pmb, const NeighborBlock &nb, if (nb.offsets.IsCell()) idx_range_type = IndexRangeType::InteriorRecv; BndInfo out(pmb, nb, v, buf, idx_range_type); - auto buf_state = buf->GetState(); + auto buf_state = buf != nullptr ? buf->GetState() : BufferState::received; if (buf_state == BufferState::received) { out.buf_allocated = true; } else if (buf_state == BufferState::received_null) { diff --git a/src/bvals/comms/bnd_info.hpp b/src/bvals/comms/bnd_info.hpp index 43dfb06ef478..b4bbe391af89 100644 --- a/src/bvals/comms/bnd_info.hpp +++ b/src/bvals/comms/bnd_info.hpp @@ -56,6 +56,16 @@ struct BndInfo { SpatiallyMaskedIndexer6D idxer[3]; forest::LogicalCoordinateTransformation lcoord_trans; + // Number of points contained in this boundary region + KOKKOS_FORCEINLINE_FUNCTION + std::size_t size() const { + std::size_t s = 0; + for (int n = 0; n < ntopological_elements; ++n) { + s += idxer[n].size(); + } + return s; + } + CoordinateDirection dir{CoordinateDirection::X0DIR}; bool allocated = true; bool buf_allocated = true; @@ -125,7 +135,7 @@ struct ProResInfo { std::shared_ptr> v); }; -int GetBufferSize(MeshBlock *pmb, const NeighborBlock &nb, +int GetBufferSize(const MeshBlock *const pmb, const NeighborBlock &nb, std::shared_ptr> v); using BndInfoArr_t = ParArray1DRaw; diff --git a/src/bvals/comms/boundary_communication.cpp b/src/bvals/comms/boundary_communication.cpp index 78121cd3fac9..64154ce3b3e9 100644 --- a/src/bvals/comms/boundary_communication.cpp +++ b/src/bvals/comms/boundary_communication.cpp @@ -16,6 +16,7 @@ //======================================================================================== #include +#include #include // debug #include #include @@ -26,6 +27,7 @@ #include "bvals/boundary_conditions.hpp" #include "bvals_in_one.hpp" #include "bvals_utils.hpp" +#include "coalesced_buffers.hpp" #include "config.hpp" #include "globals.hpp" #include "interface/variable.hpp" @@ -62,7 +64,13 @@ TaskStatus SendBoundBufs(std::shared_ptr> &md) { if (nbound == 0) { return TaskStatus::complete; } - if (other_communication_unfinished) { + + bool can_write_combined{true}; + if (pmesh->do_coalesced_comms) + can_write_combined = + pmesh->pcoalesced_comms->IsAvailableForWrite(md.get(), bound_type); + + if (other_communication_unfinished || !can_write_combined) { return TaskStatus::incomplete; } @@ -146,14 +154,16 @@ TaskStatus SendBoundBufs(std::shared_ptr> &md) { if (bound_type == BoundaryType::any || bound_type == BoundaryType::nonlocal) Kokkos::fence(); #endif - + const bool coal_comm = pmesh->do_coalesced_comms; for (int ibuf = 0; ibuf < cache.buf_vec.size(); ++ibuf) { auto &buf = *cache.buf_vec[ibuf]; if (sending_nonzero_flags_h(ibuf) || !Globals::sparse_config.enabled) - buf.Send(); + buf.Send(coal_comm); else - buf.SendNull(); + buf.SendNull(coal_comm); } + if (pmesh->do_coalesced_comms) + 
pmesh->pcoalesced_comms->PackAndSend(md.get(), bound_type); return TaskStatus::complete; } @@ -177,9 +187,10 @@ TaskStatus StartReceiveBoundBufs(std::shared_ptr> &md) { if (cache.buf_vec.size() == 0) InitializeBufferCache(md, &(pmesh->boundary_comm_map), &cache, ReceiveKey, false); - - std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec), - [](auto pbuf) { pbuf->TryStartReceive(); }); + if (!pmesh->do_coalesced_comms) { + std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec), + [](auto pbuf) { pbuf->TryStartReceive(); }); + } return TaskStatus::complete; } @@ -208,9 +219,14 @@ TaskStatus ReceiveBoundBufs(std::shared_ptr> &md) { false); bool all_received = true; - std::for_each( - std::begin(cache.buf_vec), std::end(cache.buf_vec), - [&all_received](auto pbuf) { all_received = pbuf->TryReceive() && all_received; }); + if (pmesh->do_coalesced_comms) { + pmesh->pcoalesced_comms->TryReceiveAny(md.get(), bound_type); + } + const bool coal_comm = pmesh->do_coalesced_comms; + std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec), + [&all_received, coal_comm](auto pbuf) { + all_received = pbuf->TryReceive(coal_comm) && all_received; + }); int ibound = 0; if (Globals::sparse_config.enabled && all_received) { diff --git a/src/bvals/comms/build_boundary_buffers.cpp b/src/bvals/comms/build_boundary_buffers.cpp index 9a4e3b5c4048..f8478c77bb52 100644 --- a/src/bvals/comms/build_boundary_buffers.cpp +++ b/src/bvals/comms/build_boundary_buffers.cpp @@ -27,6 +27,7 @@ #include "bvals_in_one.hpp" #include "bvals_utils.hpp" +#include "coalesced_buffers.hpp" #include "config.hpp" #include "globals.hpp" #include "interface/variable.hpp" @@ -110,6 +111,7 @@ void BuildBoundaryBufferSubset(std::shared_ptr> &md, tag = pmesh->tag_map.GetTag(pmb, nb); auto comm_label = v->label(); mpi_comm_t comm = pmesh->GetMPIComm(comm_label); + #else // Setting to zero is fine here since this doesn't actually get used when everything // is on the same rank @@ -117,7 +119,9 @@ void BuildBoundaryBufferSubset(std::shared_ptr> &md, #endif bool use_sparse_buffers = v->IsSet(Metadata::Sparse); - auto get_resource_method = [pmesh, buf_size]() { + auto get_resource_method = [pmesh, buf_size](int size) { + PARTHENON_REQUIRE(size <= buf_size, + "Asking for a buffer that is larger than size of pool."); return buf_pool_t::owner_t(pmesh->pool_map.at(buf_size).Get()); }; @@ -128,6 +132,11 @@ void BuildBoundaryBufferSubset(std::shared_ptr> &md, buf_map[s_key] = CommBuffer::owner_t>( tag, sender_rank, receiver_rank, comm, get_resource_method, use_sparse_buffers); + + // Register this buffer with the combined buffers (must happen after CommBuffer is + // created) + if (receiver_rank != sender_rank) + pmesh->pcoalesced_comms->AddSendBuffer(md->partition, pmb, nb, v, BTYPE); } // Also build the non-local receive buffers here @@ -138,6 +147,9 @@ void BuildBoundaryBufferSubset(std::shared_ptr> &md, buf_map[r_key] = CommBuffer::owner_t>( tag, receiver_rank, sender_rank, comm, get_resource_method, use_sparse_buffers); + // Register this buffer with the combined buffers (must happen after CommBuffer is + // created) + pmesh->pcoalesced_comms->AddRecvBuffer(pmb, nb, v, BTYPE); } } }); diff --git a/src/bvals/comms/bvals_utils.hpp b/src/bvals/comms/bvals_utils.hpp index 0c406b33c8c2..4bee2e14ec1f 100644 --- a/src/bvals/comms/bvals_utils.hpp +++ b/src/bvals/comms/bvals_utils.hpp @@ -25,6 +25,7 @@ #include #include +#include "bvals/comms/bnd_id.hpp" #include "bvals/comms/bnd_info.hpp" #include "bvals/comms/bvals_in_one.hpp" 
#include "interface/variable.hpp" @@ -67,6 +68,11 @@ inline Mesh::channel_key_t ReceiveKey(const MeshBlock *pmb, const NeighborBlock return {sender_id, receiver_id, pcv->label(), location_idx, other}; } +inline Mesh::channel_key_t GetChannelKey(BndId &in) { + return {in.send_gid(), in.recv_gid(), Variable::GetLabel(in.var_id()), + in.loc_idx(), in.extra_id()}; +} + // Build a vector of pointers to all of the sending or receiving communication buffers on // MeshData md. This cache is important for performance, since this elides a map look up // for the buffer every time the bvals code iterates over boundaries. diff --git a/src/bvals/comms/coalesced_buffers.cpp b/src/bvals/comms/coalesced_buffers.cpp new file mode 100644 index 000000000000..f900e872e5c9 --- /dev/null +++ b/src/bvals/comms/coalesced_buffers.cpp @@ -0,0 +1,522 @@ +//======================================================================================== +// (C) (or copyright) 2024. Triad National Security, LLC. All rights reserved. +// +// This program was produced under U.S. Government contract 89233218CNA000001 for Los +// Alamos National Laboratory (LANL), which is operated by Triad National Security, LLC +// for the U.S. Department of Energy/National Nuclear Security Administration. All rights +// in the program are reserved by Triad National Security, LLC, and the U.S. Department +// of Energy/National Nuclear Security Administration. The Government is granted for +// itself and others acting on its behalf a nonexclusive, paid-up, irrevocable worldwide +// license in this material to reproduce, prepare derivative works, distribute copies to +// the public, perform publicly and display publicly, and to permit others to do so. +//======================================================================================== +#include +#include +#include +#include +#include +#include +#include + +#include "basic_types.hpp" +#include "bvals/comms/bnd_id.hpp" +#include "bvals/comms/bvals_utils.hpp" +#include "bvals/comms/coalesced_buffers.hpp" +#include "bvals/neighbor_block.hpp" +#include "coordinates/coordinates.hpp" +#include "interface/mesh_data.hpp" +#include "interface/variable.hpp" +#include "mesh/mesh.hpp" +#include "mesh/meshblock.hpp" +#include "utils/communication_buffer.hpp" + +namespace parthenon { + +//---------------------------------------------------------------------------------------- +CoalescedBuffer::CoalescedBuffer(bool sender, int partition, int other_rank, + BoundaryType b_type, mpi_comm_t comm, Mesh *pmesh) + : sender(sender), partition(partition), other_rank(other_rank), b_type(b_type), + comm(comm), pmesh(pmesh), current_size(0), + coalesced_comm_buffer( + 2 * partition, sender ? Globals::my_rank : other_rank, + sender ? other_rank : Globals::my_rank, comm, + [](int size) { return buf_t("Combined Buffer", 2 * size); }, true), + sparse_status_buffer( + 2 * partition + 1, sender ? Globals::my_rank : other_rank, + sender ? other_rank : Globals::my_rank, comm, + [](int size) { return std::vector(size); }, true) {} + +//---------------------------------------------------------------------------------------- +ParArray1DRaw &CoalescedBuffer::GetBndIdsOnDevice(const std::set &vars, + int *pcomb_size) { + const auto &var_set = vars.size() == 0 ? 
all_vars : vars; + int nbnd_id{0}; + int comb_size{0}; + for (auto uid : var_set) { + // Skip this variable if it is not communicated in this BoundaryType + if (coalesced_info_buf.count(uid) == 0) continue; + nbnd_id += coalesced_info_buf.at(uid).size(); + for (auto &[bnd_id, pvbbuf] : coalesced_info_buf.at(uid)) { + auto buf_state = pvbbuf->GetState(); + if ((buf_state == BufferState::sending) || (buf_state == BufferState::received)) + comb_size += bnd_id.size(); + } + } + if (pcomb_size != nullptr) *pcomb_size = comb_size; + + bool updated = false; + if (comb_size > coalesced_comm_buffer.buffer().size()) { + PARTHENON_REQUIRE( + sender, "Something bad is going on if we are doing this on a receiving buffer."); + coalesced_comm_buffer.Allocate(comb_size); + updated = true; + } + + if (bnd_ids_device_map.count(var_set) == 0) + bnd_ids_device_map.emplace(std::make_pair( + var_set, ParArray1DRaw(parthenon::ViewOfViewAlloc("bnd_id"), nbnd_id))); + auto &bnd_ids_device = bnd_ids_device_map.at(var_set); + if (bnd_ids_host_map.count(var_set) == 0) + bnd_ids_host_map.emplace( + std::make_pair(var_set, create_view_of_view_mirror(bnd_ids_device))); + auto &bnd_ids_host = bnd_ids_host_map.at(var_set); + + if (nbnd_id != bnd_ids_device.size()) { + bnd_ids_device = ParArray1DRaw(parthenon::ViewOfViewAlloc("bnd_id"), nbnd_id); + bnd_ids_host = create_view_of_view_mirror(bnd_ids_device); + updated = true; + } + + int idx{0}; + int c_buf_idx{0}; // Index at which v-b buffer starts in combined buffer + for (auto uid : var_set) { + // Skip this variable if it is not communicated in this BoundaryType + if (coalesced_info_buf.count(uid) == 0) continue; + for (auto &[bnd_id, pvbbuf] : coalesced_info_buf.at(uid)) { + auto &bid_h = bnd_ids_host[idx]; + auto buf_state = pvbbuf->GetState(); + PARTHENON_REQUIRE(buf_state != BufferState::stale, + "Trying to work with a stale buffer."); + + const bool alloc = + (buf_state == BufferState::sending) || (buf_state == BufferState::received); + + // Test if this boundary has changed + if (!bid_h.SameBVChannel(bnd_id) || (bid_h.buf_allocated != alloc) || + (bid_h.start_idx() != c_buf_idx) || + !UsingSameResource(bid_h.buf, pvbbuf->buffer()) || + bid_h.coalesced_buf.data() != coalesced_comm_buffer.buffer().data()) { + updated = true; + bid_h = bnd_id; + bid_h.buf_allocated = alloc; + bid_h.start_idx() = c_buf_idx; + bid_h.coalesced_buf = coalesced_comm_buffer.buffer(); + if (bid_h.buf_allocated) bid_h.buf = pvbbuf->buffer(); + } + if (bid_h.buf_allocated) c_buf_idx += bid_h.size(); + idx++; + } + } + if (updated) Kokkos::deep_copy(bnd_ids_device, bnd_ids_host); + return bnd_ids_device; +} + +//---------------------------------------------------------------------------------------- +void CoalescedBuffer::PackAndSend(const std::set &vars) { + PARTHENON_REQUIRE(coalesced_comm_buffer.IsAvailableForWrite(), + "Trying to write to a buffer that is in use."); + int comb_size; + auto &bids = GetBndIdsOnDevice(vars, &comb_size); + Kokkos::parallel_for( + PARTHENON_AUTO_LABEL, + Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), bids.size(), Kokkos::AUTO), + KOKKOS_LAMBDA(parthenon::team_mbr_t team_member) { + const int b = team_member.league_rank(); + if (bids[b].buf_allocated) { + const int buf_size = bids[b].size(); + Real *com_buf = &(bids[b].coalesced_buf(bids[b].start_idx())); + Real *buf = &(bids[b].buf(0)); + Kokkos::parallel_for(Kokkos::TeamThreadRange<>(team_member, buf_size), + [&](const int idx) { com_buf[idx] = buf[idx]; }); + } + }); +#ifdef MPI_PARALLEL + Kokkos::fence(); 
+#endif + coalesced_comm_buffer.Send(false, comb_size); + + // Send the sparse null info as well + if (bids.size() != sparse_status_buffer.buffer().size()) { + sparse_status_buffer.Allocate(bids.size()); + } + + const auto &var_set = vars.size() == 0 ? all_vars : vars; + auto &stat = sparse_status_buffer.buffer(); + int idx{0}; + for (auto uid : var_set) { + // Skip this variable if it is not communicated in this BoundaryType + if (coalesced_info_buf.count(uid) == 0) continue; + for (auto &[bnd_id, pvbbuf] : coalesced_info_buf.at(uid)) { + const auto state = pvbbuf->GetState(); + PARTHENON_REQUIRE(state == BufferState::sending || + state == BufferState::sending_null, + "Bad state."); + int send_type = (state == BufferState::sending); + stat[idx] = send_type; + ++idx; + } + } + sparse_status_buffer.Send(); + + // Information in these send buffers is no longer required + for (auto uid : var_set) { + // Skip this variable if it is not communicated in this BoundaryType + if (coalesced_info_buf.count(uid) == 0) continue; + for (auto &[bnd_id, pvbbuf] : coalesced_info_buf.at(uid)) { + pvbbuf->Stale(); + } + } +} + +//---------------------------------------------------------------------------------------- +bool CoalescedBuffer::TryReceiveAndUnpack(const std::set &vars) { + if ((sparse_status_buffer.GetState() == BufferState::received) && + (coalesced_comm_buffer.GetState() == BufferState::received)) + return true; + + const auto &var_set = vars.size() == 0 ? all_vars : vars; + // Make sure the var-boundary buffers are available to write to + int nbuf{0}; + for (auto uid : var_set) { + // Skip this variable if it is not communicated in this BoundaryType + if (coalesced_info_buf.count(uid) == 0) continue; + for (auto &[bnd_id, pvbbuf] : coalesced_info_buf.at(uid)) { + if (pvbbuf->GetState() != BufferState::stale) return false; + nbuf++; + } + } + + auto received_sparse = sparse_status_buffer.TryReceive(); + auto received = coalesced_comm_buffer.TryReceive(); + if (!received || !received_sparse) return false; + + // Allocate and free buffers as required + int idx{0}; + auto &stat = sparse_status_buffer.buffer(); + for (auto uid : var_set) { + // Skip this variable if it is not communicated in this BoundaryType + if (coalesced_info_buf.count(uid) == 0) continue; + for (auto &[bnd_id, pvbbuf] : coalesced_info_buf.at(uid)) { + if (stat[idx] == 1) { + pvbbuf->SetReceived(); + if (!pvbbuf->IsActive()) pvbbuf->Allocate(); + } else { + pvbbuf->SetReceivedNull(); + if (pvbbuf->IsActive()) pvbbuf->Free(); + } + idx++; + } + } + + auto &bids = GetBndIdsOnDevice(vars); + Kokkos::parallel_for( + PARTHENON_AUTO_LABEL, + Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), bids.size(), Kokkos::AUTO), + KOKKOS_LAMBDA(parthenon::team_mbr_t team_member) { + const int b = team_member.league_rank(); + if (bids[b].buf_allocated) { + const int buf_size = bids[b].size(); + Real *com_buf = &(bids[b].coalesced_buf(bids[b].start_idx())); + Real *buf = &(bids[b].buf(0)); + Kokkos::parallel_for(Kokkos::TeamThreadRange<>(team_member, buf_size), + [&](const int idx) { buf[idx] = com_buf[idx]; }); + } + }); + coalesced_comm_buffer.Stale(); + sparse_status_buffer.Stale(); + + return true; +} + +//---------------------------------------------------------------------------------------- +void CoalescedBuffer::AddVarBoundary(BndId &bnd_id) { + auto key = GetChannelKey(bnd_id); + PARTHENON_REQUIRE(pmesh->boundary_comm_map.count(key), "Buffer doesn't exist."); + var_buf_t *pbuf = &(pmesh->boundary_comm_map.at(key)); + 
coalesced_info_buf[bnd_id.var_id()].push_back(std::make_pair(bnd_id, pbuf)); + current_size += bnd_id.size(); // This will be the maximum size of communication since + // it includes all variables + all_vars.insert(bnd_id.var_id()); +} + +void CoalescedBuffer::AddVarBoundary(MeshBlock *pmb, const NeighborBlock &nb, + const std::shared_ptr> &var) { + // Store both the variable-boundary buffer information and a pointer to the v-b buffer + // itself associated with var ids + BndId bnd_id = BndId::GetSend(pmb, nb, var, b_type, partition, -1); + AddVarBoundary(bnd_id); +} + +//---------------------------------------------------------------------------------------- +//---------------------------------------------------------------------------------------- +//---------------------------------------------------------------------------------------- +CoalescedBuffersRank::CoalescedBuffersRank(int o_rank, BoundaryType b_type, bool send, + mpi_comm_t comm, Mesh *pmesh) + : other_rank(o_rank), b_type(b_type), sender(send), buffers_built(false), comm(comm), + pmesh(pmesh) { + + int tag = static_cast(GetAssociatedSender(b_type)); + if (sender) { + message = com_buf_t(tag, Globals::my_rank, other_rank, comm, + [](int size) { return std::vector(size); }); + } else { + message = com_buf_t( + tag, other_rank, Globals::my_rank, comm, + [](int size) { return std::vector(size); }, true); + } + PARTHENON_REQUIRE(other_rank != Globals::my_rank, "Should only build for other ranks."); +} + +//---------------------------------------------------------------------------------------- +void CoalescedBuffersRank::AddSendBuffer(int partition, MeshBlock *pmb, + const NeighborBlock &nb, + const std::shared_ptr> &var) { + if (coalesced_bufs.count(partition) == 0) + coalesced_bufs.emplace( + std::make_pair(partition, CoalescedBuffer(true, partition, other_rank, b_type, + comm, pmb->pmy_mesh))); + + auto &coal_buf = coalesced_bufs.at(partition); + coal_buf.AddVarBoundary(pmb, nb, var); +} + +//---------------------------------------------------------------------------------------- +bool CoalescedBuffersRank::TryReceiveBufInfo() { + PARTHENON_REQUIRE(!sender, "Trying to receive on a combined sender."); + if (buffers_built) return buffers_built; + + bool received = message.TryReceive(); + if (received) { + auto &mess_buf = message.buffer(); + int npartitions = mess_buf[0]; + // Unpack into per combined buffer information + int idx{nglobal}; + + for (int p = 0; p < npartitions; ++p) { + const int partition = mess_buf[idx++]; + const int nbuf = mess_buf[idx++]; + const int total_size = mess_buf[idx++]; + + // Create the new partition + coalesced_bufs.emplace(std::make_pair( + partition, CoalescedBuffer(false, partition, other_rank, b_type, comm, pmesh))); + auto &coal_buf = coalesced_bufs.at(partition); + + for (int b = 0; b < nbuf; ++b) { + BndId bnd_id(&(mess_buf[idx])); + coal_buf.AddVarBoundary(bnd_id); + idx += BndId::NDAT; + } + } + message.Stale(); + + buffers_built = true; + return true; + } + return false; +} + +//---------------------------------------------------------------------------------------- +void CoalescedBuffersRank::ResolveAndSendBufInfo() { + // First calculate the total size of the message + int total_buffers{0}; + for (auto &[partition, coalesced_buf] : coalesced_bufs) + total_buffers += coalesced_buf.TotalBuffers(); + int total_partitions = coalesced_bufs.size(); + + int mesg_size = nglobal + nper_part * total_partitions + BndId::NDAT * total_buffers; + message.Allocate(mesg_size); + + auto &mess_buf = 
message.buffer(); + mess_buf[0] = total_partitions; + + // Pack the data + int idx{nglobal}; + for (auto &[partition, coalesced_buf] : coalesced_bufs) { + mess_buf[idx++] = partition; // Used as the comm tag + mess_buf[idx++] = coalesced_buf.TotalBuffers(); // Number of buffers + mess_buf[idx++] = + coalesced_buf.current_size; // combined size of buffers (now probably unused) + for (auto &[uid, v] : coalesced_buf.coalesced_info_buf) { + for (auto &[bnd_id, pbvbuf] : v) { + bnd_id.Serialize(&(mess_buf[idx])); + idx += BndId::NDAT; + } + } + } + + message.Send(); + + buffers_built = true; +} + +//---------------------------------------------------------------------------------------- +void CoalescedBuffersRank::PackAndSend(MeshData *pmd) { + PARTHENON_REQUIRE(buffers_built, + "Trying to send combined buffers before they have been built"); + if (coalesced_bufs.count(pmd->partition)) { + coalesced_bufs.at(pmd->partition).PackAndSend(pmd->GetUids()); + } + + return; +} + +//---------------------------------------------------------------------------------------- +bool CoalescedBuffersRank::IsAvailableForWrite(MeshData *pmd) { + PARTHENON_REQUIRE(sender, "Shouldn't be checking this on non-sender."); + if (coalesced_bufs.count(pmd->partition) == 0) return true; + return coalesced_bufs.at(pmd->partition).IsAvailableForWrite(); +} + +//---------------------------------------------------------------------------------------- +bool CoalescedBuffersRank::TryReceiveAndUnpack(MeshData *pmd, int partition) { + PARTHENON_REQUIRE(buffers_built, + "Trying to recv combined buffers before they have been built"); + PARTHENON_REQUIRE(coalesced_bufs.count(partition) > 0, + "Trying to receive on a non-existent combined receive buffer."); + return coalesced_bufs.at(partition).TryReceiveAndUnpack(pmd->GetUids()); +} + +//---------------------------------------------------------------------------------------- +//---------------------------------------------------------------------------------------- +//---------------------------------------------------------------------------------------- +CoalescedComms::CoalescedComms(Mesh *pmesh) : pmesh(pmesh) { + // TODO(LFR): Switch to a different communicator for each BoundaryType pair + for (auto b_type : + {BoundaryType::any, BoundaryType::flxcor_send, BoundaryType::gmg_same, + BoundaryType::gmg_restrict_send, BoundaryType::gmg_prolongate_send}) { + auto &comm = comms[b_type]; +#ifdef MPI_PARALLEL + PARTHENON_MPI_CHECK(MPI_Comm_dup(MPI_COMM_WORLD, &comm)); +#else + comm = 0; +#endif + } +} + +//---------------------------------------------------------------------------------------- +CoalescedComms::~CoalescedComms() { +#ifdef MPI_PARALLEL + for (auto &[b_type, comm] : comms) + PARTHENON_MPI_CHECK(MPI_Comm_free(&comm)); +#endif +} + +//---------------------------------------------------------------------------------------- +void CoalescedComms::clear() { + bool can_delete; + int iter{0}; + const int max_iters = 1e8; + do { + can_delete = true; + for (auto &[p, cbr] : coalesced_send_buffers) { + can_delete = cbr.message.IsAvailableForWrite() && can_delete; + for (auto &[r, cbrp] : cbr.coalesced_bufs) { + can_delete = cbrp.IsAvailableForWrite() && can_delete; + } + } + iter++; + } while (!can_delete && iter < max_iters); + if (iter >= max_iters) PARTHENON_FAIL("Waited too long to clear CoalescedComms."); + + coalesced_send_buffers.clear(); + coalesced_recv_buffers.clear(); +} + +//---------------------------------------------------------------------------------------- +void 
CoalescedComms::AddSendBuffer(int partition, MeshBlock *pmb, const NeighborBlock &nb, + const std::shared_ptr> &var, + BoundaryType b_type) { + if (coalesced_send_buffers.count({nb.rank, b_type}) == 0) + coalesced_send_buffers.emplace( + std::make_pair(std::make_pair(nb.rank, b_type), + CoalescedBuffersRank(nb.rank, b_type, true, + comms[GetAssociatedSender(b_type)], pmesh))); + coalesced_send_buffers.at({nb.rank, b_type}).AddSendBuffer(partition, pmb, nb, var); +} + +//---------------------------------------------------------------------------------------- +void CoalescedComms::AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb, + const std::shared_ptr>, + BoundaryType b_type) { + // We don't actually know enough here to register this particular buffer, but we do + // know that its existence implies that we need to receive a message from the + // neighbor block rank eventually telling us the details + if (coalesced_recv_buffers.count({nb.rank, b_type}) == 0) + coalesced_recv_buffers.emplace( + std::make_pair(std::make_pair(nb.rank, b_type), + CoalescedBuffersRank(nb.rank, b_type, false, + comms[GetAssociatedSender(b_type)], pmesh))); +} + +//---------------------------------------------------------------------------------------- +void CoalescedComms::ResolveAndSendSendBuffers() { + for (auto &[id, buf] : coalesced_send_buffers) + buf.ResolveAndSendBufInfo(); +} + +//---------------------------------------------------------------------------------------- +void CoalescedComms::ReceiveBufferInfo() { + constexpr std::int64_t max_it = 1e10; + std::vector received(coalesced_recv_buffers.size(), false); + bool all_received; + std::int64_t receive_iters = 0; + do { + all_received = true; + for (auto &[id, buf] : coalesced_recv_buffers) + all_received = buf.TryReceiveBufInfo() && all_received; + receive_iters++; + } while (!all_received && receive_iters < max_it); + PARTHENON_REQUIRE( + receive_iters < max_it, + "Too many iterations waiting to receive boundary communication buffers."); +} + +//---------------------------------------------------------------------------------------- +bool CoalescedComms::IsAvailableForWrite(MeshData *pmd, BoundaryType b_type) { + bool available{true}; + for (int rank = 0; rank < Globals::nranks; ++rank) { + if (coalesced_send_buffers.count({rank, b_type})) { + available = + available && coalesced_send_buffers.at({rank, b_type}).IsAvailableForWrite(pmd); + } + } + return available; +} + +//---------------------------------------------------------------------------------------- +void CoalescedComms::PackAndSend(MeshData *pmd, BoundaryType b_type) { + for (int rank = 0; rank < Globals::nranks; ++rank) { + if (coalesced_send_buffers.count({rank, b_type})) { + coalesced_send_buffers.at({rank, b_type}).PackAndSend(pmd); + } + } +} + +//---------------------------------------------------------------------------------------- +bool CoalescedComms::TryReceiveAny(MeshData *pmd, BoundaryType b_type) { +#ifdef MPI_PARALLEL + bool all_received = true; + for (int rank = 0; rank < Globals::nranks; ++rank) { + if (coalesced_recv_buffers.count({rank, b_type})) { + auto &coal_bufs = coalesced_recv_buffers.at({rank, b_type}); + for (auto &[partition, coal_buf] : coal_bufs.coalesced_bufs) { + bool received = coal_buf.TryReceiveAndUnpack(pmd->GetUids()); + all_received = all_received && received; + } + } + } + return all_received; +#endif +} +} // namespace parthenon diff --git a/src/bvals/comms/coalesced_buffers.hpp b/src/bvals/comms/coalesced_buffers.hpp new file mode 100644 index 
000000000000..b9200713cacd --- /dev/null +++ b/src/bvals/comms/coalesced_buffers.hpp @@ -0,0 +1,177 @@ +//======================================================================================== +// (C) (or copyright) 2024. Triad National Security, LLC. All rights reserved. +// +// This program was produced under U.S. Government contract 89233218CNA000001 for Los +// Alamos National Laboratory (LANL), which is operated by Triad National Security, LLC +// for the U.S. Department of Energy/National Nuclear Security Administration. All rights +// in the program are reserved by Triad National Security, LLC, and the U.S. Department +// of Energy/National Nuclear Security Administration. The Government is granted for +// itself and others acting on its behalf a nonexclusive, paid-up, irrevocable worldwide +// license in this material to reproduce, prepare derivative works, distribute copies to +// the public, perform publicly and display publicly, and to permit others to do so. +//======================================================================================== + +#ifndef BVALS_COMMS_COALESCED_BUFFERS_HPP_ +#define BVALS_COMMS_COALESCED_BUFFERS_HPP_ + +#include +#include +#include +#include +#include +#include +#include + +#include "basic_types.hpp" +#include "bvals/comms/bnd_id.hpp" +#include "bvals/comms/bvals_utils.hpp" +#include "bvals/neighbor_block.hpp" +#include "coordinates/coordinates.hpp" +#include "interface/variable.hpp" +#include "mesh/mesh.hpp" +#include "mesh/meshblock.hpp" +#include "utils/communication_buffer.hpp" + +namespace parthenon { +// Structure containing the information required for sending coalesced +// messages between ranks + +struct uid_set_hash { + std::size_t operator()(const std::set &in) const { + std::size_t lhs{0}; + for (const auto &uid : in) { + std::size_t rhs = std::hash()(uid); + // Use the Boost hash function for lack of a better idea + lhs ^= rhs + 0x9e3779b9 + (lhs << 6) + (lhs >> 2); + } + return lhs; + } +}; + +struct CoalescedBuffer { + using buf_t = BufArray1D; + + // Rank that these buffers communicate with + BoundaryType b_type; + int other_rank; + int partition; + mpi_comm_t comm; + Mesh *pmesh; + bool sender; + + using var_buf_t = CommBuffer::owner_t>; + std::map>> coalesced_info_buf; + std::set all_vars; + std::unordered_map, ParArray1DRaw, uid_set_hash> + bnd_ids_device_map; + std::unordered_map, ParArray1DRaw::host_mirror_type, + uid_set_hash> + bnd_ids_host_map; + CommBuffer coalesced_comm_buffer; + CommBuffer> sparse_status_buffer; + int current_size; + + CoalescedBuffer(bool sender, int partition, int other_rank, BoundaryType b_type, + mpi_comm_t comm, Mesh *pmesh); + + int TotalBuffers() const { + int total_buffers{0}; + for (const auto &[uid, v] : coalesced_info_buf) + total_buffers += v.size(); + return total_buffers; + } + + void AddVarBoundary(BndId &bnd_id); + + void AddVarBoundary(MeshBlock *pmb, const NeighborBlock &nb, + const std::shared_ptr> &var); + + bool IsAvailableForWrite() { + return sparse_status_buffer.IsAvailableForWrite() && + coalesced_comm_buffer.IsAvailableForWrite(); + } + + ParArray1DRaw &GetBndIdsOnDevice(const std::set &vars, + int *pcomb_size = nullptr); + + void PackAndSend(const std::set &vars); + + bool TryReceiveAndUnpack(const std::set &vars); +}; + +struct CoalescedBuffersRank { + using coalesced_message_structure_t = std::vector; + using buf_t = BufArray1D; + + // Rank that these buffers communicate with + BoundaryType b_type; + int other_rank; + + // map from partition id to coalesced message 
structure for communication
+  // partition id of the sender will be the mpi tag we use
+  bool buffers_built{false};
+
+  std::map coalesced_bufs;
+
+  static constexpr int nglobal{1};
+  static constexpr int nper_part{3};
+
+  using com_buf_t = CommBuffer>;
+  com_buf_t message;
+
+  mpi_comm_t comm;
+  Mesh *pmesh;
+  bool sender{true};
+
+  explicit CoalescedBuffersRank(int o_rank, BoundaryType b_type, bool send,
+                                mpi_comm_t comm, Mesh *pmesh);
+
+  void AddSendBuffer(int partition, MeshBlock *pmb, const NeighborBlock &nb,
+                     const std::shared_ptr> &var);
+
+  bool TryReceiveBufInfo();
+
+  void ResolveAndSendBufInfo();
+
+  void PackAndSend(MeshData *pmd);
+
+  bool TryReceiveAndUnpack(MeshData *pmd, int partition);
+
+  bool IsAvailableForWrite(MeshData *pmd);
+};
+
+struct CoalescedComms {
+  // Combined buffers for each rank
+  std::map, CoalescedBuffersRank> coalesced_send_buffers;
+  std::map, CoalescedBuffersRank> coalesced_recv_buffers;
+
+  std::map comms;
+
+  Mesh *pmesh;
+
+  explicit CoalescedComms(Mesh *pmesh);
+
+  ~CoalescedComms();
+
+  void clear();
+
+  void AddSendBuffer(int partition, MeshBlock *pmb, const NeighborBlock &nb,
+                     const std::shared_ptr> &var, BoundaryType b_type);
+
+  void AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb,
+                     const std::shared_ptr>, BoundaryType b_type);
+
+  void ResolveAndSendSendBuffers();
+
+  void ReceiveBufferInfo();
+
+  void PackAndSend(MeshData *pmd, BoundaryType b_type);
+
+  bool TryReceiveAny(MeshData *pmd, BoundaryType b_type);
+
+  bool IsAvailableForWrite(MeshData *pmd, BoundaryType b_type);
+};
+
+} // namespace parthenon
+
+#endif // BVALS_COMMS_COALESCED_BUFFERS_HPP_
diff --git a/src/interface/mesh_data.hpp b/src/interface/mesh_data.hpp
index 6b6518ee83b9..687629377bf1 100644
--- a/src/interface/mesh_data.hpp
+++ b/src/interface/mesh_data.hpp
@@ -237,6 +237,8 @@ class MeshData {
     return IndexRange{-1, -2};
   }
 
+  const auto &GetUids() const { return block_data_[0]->GetUids(); }
+
   template
   void Add(Args &&...args) {
     for (const auto &pbd : block_data_) {
diff --git a/src/interface/meshblock_data.hpp b/src/interface/meshblock_data.hpp
index a6b681baf142..9c72289041c0 100644
--- a/src/interface/meshblock_data.hpp
+++ b/src/interface/meshblock_data.hpp
@@ -563,6 +563,8 @@ class MeshBlockData {
     });
   }
 
+  const auto &GetUids() const { return varUidIn_; }
+
   void SetAllVariablesToInitialized() {
     std::for_each(varVector_.begin(), varVector_.end(),
                   [](auto &sp_var) { sp_var->data.initialized = true; });
diff --git a/src/interface/variable.hpp b/src/interface/variable.hpp
index 6cd92d787ea0..3d262496e3c7 100644
--- a/src/interface/variable.hpp
+++ b/src/interface/variable.hpp
@@ -123,6 +123,7 @@ class Variable {
   Uid_t GetUniqueID() const { return uid_; }
   static Uid_t GetUniqueID(const std::string &var_label) { return get_uid_(var_label); }
+  static const std::string &GetLabel(Uid_t uid) { return get_uid_(uid); }
 
   /// return information string
   std::string info();
diff --git a/src/mesh/mesh.cpp b/src/mesh/mesh.cpp
index 86d727bae46b..3acc13b26993 100644
--- a/src/mesh/mesh.cpp
+++ b/src/mesh/mesh.cpp
@@ -42,6 +42,7 @@
 #include "application_input.hpp"
 #include "bvals/boundary_conditions.hpp"
 #include "bvals/bvals.hpp"
+#include "bvals/comms/coalesced_buffers.hpp"
 #include "defs.hpp"
 #include "globals.hpp"
 #include "interface/packages.hpp"
@@ -85,7 +86,10 @@ Mesh::Mesh(ParameterInput *pin, ApplicationInput *app_in, Packages_t &packages,
       lb_manual_(), nslist(Globals::nranks), nblist(Globals::nranks),
       nref(Globals::nranks), nderef(Globals::nranks), rdisp(Globals::nranks),
       ddisp(Globals::nranks), bnref(Globals::nranks), bnderef(Globals::nranks),
-      brdisp(Globals::nranks), bddisp(Globals::nranks) {
+      brdisp(Globals::nranks), bddisp(Globals::nranks),
+      pcoalesced_comms(std::make_shared(this)),
+      do_coalesced_comms{
+          pin->GetOrAddBoolean("parthenon/mesh", "do_coalesced_comms", true)} {
   // Allow for user overrides to default Parthenon functions
   if (app_in->InitUserMeshData != nullptr) {
     InitUserMeshData = app_in->InitUserMeshData;
@@ -619,6 +623,7 @@ void Mesh::BuildTagMapAndBoundaryBuffers() {
   // Clear boundary communication buffers
   boundary_comm_map.clear();
+  pcoalesced_comms->clear();
 
   // Build the boundary buffers for the current mesh
   for (auto &partition : GetDefaultBlockPartitions()) {
@@ -635,6 +640,10 @@ void Mesh::BuildTagMapAndBoundaryBuffers() {
       }
     }
   }
+
+  pcoalesced_comms->ResolveAndSendSendBuffers();
+  // This operation is blocking
+  pcoalesced_comms->ReceiveBufferInfo();
 }
 
 void Mesh::CommunicateBoundaries(std::string md_name,
diff --git a/src/mesh/mesh.hpp b/src/mesh/mesh.hpp
index ed26be1c6a1b..91431dcb8b48 100644
--- a/src/mesh/mesh.hpp
+++ b/src/mesh/mesh.hpp
@@ -59,6 +59,7 @@ namespace parthenon {
 // Forward declarations
 class ApplicationInput;
+class CoalescedComms;
 class MeshBlock;
 class MeshRefinement;
 class Packages_t;
@@ -130,6 +131,8 @@ class Mesh {
   int step_since_lb;
   int gflag;
 
+  const bool do_coalesced_comms;
+
   BlockList_t block_list;
   Packages_t packages;
   std::shared_ptr resolved_packages;
@@ -223,6 +226,9 @@ class Mesh {
   // Ordering here is important to prevent deallocation of pools before boundary
   // communication buffers
+  // channel_key_t is tuple of (gid_sender, gid_receiver, variable_name,
+  // block_location_idx, extra_delineater) which uniquely define a communication channel
+  // between two blocks for a given variable
   using channel_key_t = std::tuple;
   using comm_buf_t = CommBuffer::owner_t>;
   std::unordered_map> pool_map;
@@ -231,6 +237,8 @@ class Mesh {
   comm_buf_map_t boundary_comm_map;
   TagMap tag_map;
 
+  std::shared_ptr pcoalesced_comms;
+
 #ifdef MPI_PARALLEL
   MPI_Comm GetMPIComm(const std::string &label) const { return mpi_comm_map_.at(label); }
 #endif
diff --git a/src/utils/communication_buffer.hpp b/src/utils/communication_buffer.hpp
index edfc04f11c77..9f73d88cbc92 100644
--- a/src/utils/communication_buffer.hpp
+++ b/src/utils/communication_buffer.hpp
@@ -52,6 +52,8 @@ class CommBuffer {
   std::shared_ptr nrecv_tries_;
   std::shared_ptr my_request_;
 
+  using get_resource_func_t = std::function;
+
   int my_rank;
   int tag_;
   int send_rank_;
@@ -62,7 +64,7 @@ class CommBuffer {
   buf_base_t null_buf_ = std::numeric_limits::signaling_NaN();
   bool active_ = false;
 
-  std::function get_resource_;
+  get_resource_func_t get_resource_;
 
   T buf_;
 
@@ -77,7 +79,7 @@ class CommBuffer {
   }
 
   CommBuffer(int tag, int send_rank, int recv_rank, mpi_comm_t comm_,
-             std::function get_resource, bool do_sparse_allocation = false);
+             get_resource_func_t get_resource, bool do_sparse_allocation = false);
 
   ~CommBuffer();
 
@@ -93,9 +95,9 @@ class CommBuffer {
   T &buffer() { return buf_; }
   const T &buffer() const { return buf_; }
 
-  void Allocate() {
-    if (!active_) {
-      buf_ = get_resource_();
+  void Allocate(int size = -1) {
+    if (!active_ || (size > 0 && buf_.size() != size)) {
+      buf_ = get_resource_(size);
       active_ = true;
     }
   }
@@ -109,13 +111,26 @@ class CommBuffer {
 
   BufferState GetState() { return *state_; }
 
-  void Send() noexcept;
-  void SendNull() noexcept;
+  void Send(bool local = false, int size = -1) noexcept;
+  void SendNull(bool local = false) noexcept;
 
   bool IsAvailableForWrite();
 
   void TryStartReceive() noexcept;
-  bool TryReceive() noexcept;
+  bool TryReceive(bool local = false) noexcept;
+  void SetReceived() noexcept {
+    PARTHENON_REQUIRE(*comm_type_ == BuffCommType::receiver ||
+                          *comm_type_ == BuffCommType::sparse_receiver,
+                      "This doesn't make sense for a non-receiver.");
+    *state_ = BufferState::received;
+  }
+
+  void SetReceivedNull() noexcept {
+    PARTHENON_REQUIRE(*comm_type_ == BuffCommType::sparse_receiver,
+                      "This doesn't make sense for a non-receiver.");
+    *state_ = BufferState::received_null;
+  }
+
   bool IsSafeToDelete() {
     if (*comm_type_ == BuffCommType::sparse_receiver ||
         *comm_type_ == BuffCommType::receiver) {
@@ -131,7 +146,7 @@ class CommBuffer {
 
 template
 CommBuffer::CommBuffer(int tag, int send_rank, int recv_rank, mpi_comm_t comm,
-                       std::function get_resource, bool do_sparse_allocation)
+                       get_resource_func_t get_resource, bool do_sparse_allocation)
     : state_(std::make_shared(BufferState::stale)),
       comm_type_(std::make_shared(BuffCommType::both)),
       started_irecv_(std::make_shared(false)),
@@ -162,7 +177,8 @@ CommBuffer::CommBuffer(const CommBuffer &in)
     : buf_(in.buf_), state_(in.state_), comm_type_(in.comm_type_),
       started_irecv_(in.started_irecv_), nrecv_tries_(in.nrecv_tries_),
       my_request_(in.my_request_), tag_(in.tag_), send_rank_(in.send_rank_),
-      recv_rank_(in.recv_rank_), comm_(in.comm_), active_(in.active_) {
+      recv_rank_(in.recv_rank_), comm_(in.comm_), active_(in.active_),
+      get_resource_(in.get_resource_) {
   my_rank = Globals::my_rank;
 }
 
@@ -202,30 +218,32 @@ CommBuffer &CommBuffer::operator=(const CommBuffer &in) {
   comm_ = in.comm_;
   active_ = in.active_;
   my_rank = Globals::my_rank;
+  get_resource_ = in.get_resource_;
   return *this;
 }
 
 template
-void CommBuffer::Send() noexcept {
+void CommBuffer::Send(bool local, int size) noexcept {
   if (!active_) {
-    SendNull();
+    SendNull(local);
     return;
   }
   PARTHENON_DEBUG_REQUIRE(*state_ == BufferState::stale,
                           "Trying to send from buffer that hasn't been staled.");
   *state_ = BufferState::sending;
-  if (*comm_type_ == BuffCommType::sender) {
+  if (*comm_type_ == BuffCommType::sender && !local) {
     // Make sure that this request isn't still out,
     // this could be blocking
 #ifdef MPI_PARALLEL
     PARTHENON_REQUIRE(
         buf_.size() > 0,
         "Trying to send zero size buffer, which will be interpreted as sending_null.");
+    if (size < 0) size = buf_.size();
+    PARTHENON_REQUIRE(size <= buf_.size(), "Asking to send too much");
     PARTHENON_MPI_CHECK(MPI_Wait(my_request_.get(), MPI_STATUS_IGNORE));
-    PARTHENON_MPI_CHECK(MPI_Isend(buf_.data(), buf_.size(),
-                                  MPITypeMap::type(), recv_rank_, tag_, comm_,
-                                  my_request_.get()));
+    PARTHENON_MPI_CHECK(MPI_Isend(buf_.data(), size, MPITypeMap::type(),
+                                  recv_rank_, tag_, comm_, my_request_.get()));
 #endif
   }
   if (*comm_type_ == BuffCommType::receiver) {
@@ -235,11 +253,11 @@ void CommBuffer::Send() noexcept {
 }
 
 template
-void CommBuffer::SendNull() noexcept {
+void CommBuffer::SendNull(bool local) noexcept {
   PARTHENON_DEBUG_REQUIRE(*state_ == BufferState::stale,
                           "Trying to send_null from buffer that hasn't been staled.");
   *state_ = BufferState::sending_null;
-  if (*comm_type_ == BuffCommType::sender) {
+  if (*comm_type_ == BuffCommType::sender && !local) {
     // Make sure that this request isn't still out,
     // this could be blocking
 #ifdef MPI_PARALLEL
@@ -286,8 +304,8 @@ void CommBuffer::TryStartReceive() noexcept {
     PARTHENON_REQUIRE(
         *my_request_ == MPI_REQUEST_NULL,
         "Cannot have another pending request in a buffer that is starting to receive.");
-    if (!IsActive())
-      Allocate(); // For early start of Irecv, always need storage space even if not used
+    // For early start of Irecv, always need storage space even if not used
+    if (!IsActive()) Allocate(-1);
     PARTHENON_MPI_CHECK(MPI_Irecv(buf_.data(), buf_.size(),
                                   MPITypeMap::type(), send_rank_, tag_,
                                   comm_, my_request_.get()));
@@ -303,7 +321,7 @@ void CommBuffer::TryStartReceive() noexcept {
       int size;
       PARTHENON_MPI_CHECK(MPI_Get_count(&status, MPITypeMap::type(), &size));
       if (size > 0) {
-        if (!active_) Allocate();
+        if (!active_ || buf_.size() < size) Allocate(size);
         PARTHENON_MPI_CHECK(MPI_Irecv(buf_.data(), buf_.size(),
                                       MPITypeMap::type(), send_rank_, tag_,
                                       comm_, my_request_.get()));
@@ -319,12 +337,13 @@
 }
 
 template
-bool CommBuffer::TryReceive() noexcept {
+bool CommBuffer::TryReceive(bool local) noexcept {
   if (*state_ == BufferState::received || *state_ == BufferState::received_null)
     return true;
 
-  if (*comm_type_ == BuffCommType::receiver ||
-      *comm_type_ == BuffCommType::sparse_receiver) {
+  if ((*comm_type_ == BuffCommType::receiver ||
+       *comm_type_ == BuffCommType::sparse_receiver) &&
+      !local) {
 #ifdef MPI_PARALLEL
     (*nrecv_tries_)++;
     PARTHENON_REQUIRE(*nrecv_tries_ < 1e8,
@@ -398,7 +417,7 @@ bool CommBuffer::TryReceive() noexcept {
       return true;
     }
     return false;
-  } else {
+  } else if (*comm_type_ == BuffCommType::sender) {
     // This is an error since this is a purely send buffer
     PARTHENON_FAIL("Trying to receive on a sender");
   }
@@ -407,10 +426,6 @@
 
 template
 void CommBuffer::Stale() {
-  PARTHENON_REQUIRE(*comm_type_ != BuffCommType::sender, "Should never get here.");
-
-  if (!(*state_ == BufferState::received || *state_ == BufferState::received_null))
-    PARTHENON_DEBUG_WARN("Staling buffer not in the received state.");
 #ifdef MPI_PARALLEL
   if (MPI_REQUEST_NULL != *my_request_)
     PARTHENON_WARN("Staling buffer with pending request.");
diff --git a/src/utils/mpi_types.hpp b/src/utils/mpi_types.hpp
index a990cecc7e3c..4685adef3158 100644
--- a/src/utils/mpi_types.hpp
+++ b/src/utils/mpi_types.hpp
@@ -52,9 +52,11 @@ namespace parthenon {
 #ifdef MPI_PARALLEL
 using mpi_request_t = MPI_Request;
 using mpi_comm_t = MPI_Comm;
+using mpi_message_t = MPI_Message;
 #else
 using mpi_request_t = int;
 using mpi_comm_t = int;
+using mpi_message_t = int;
 #endif
 
 } // namespace parthenon
diff --git a/src/utils/unique_id.hpp b/src/utils/unique_id.hpp
index eca507f886a1..e4bd9099893f 100644
--- a/src/utils/unique_id.hpp
+++ b/src/utils/unique_id.hpp
@@ -33,11 +33,15 @@ class UniqueIDGenerator {
     // as an invalid id
     Uid_t uid = uids_.size() + 1;
     uids_.emplace(key, uid);
+    uids_inverse_.emplace(uid, key);
     return uid;
   }
 
+  const T &operator()(const Uid_t uid) { return uids_inverse_.at(uid); }
+
  private:
   std::unordered_map uids_;
+  std::unordered_map uids_inverse_;
 };
 
 std::vector UidIntersection(std::vector v1, std::vector v2);