Add initial support for parallel I/O #798

Merged · 5 commits · Jan 31, 2025

Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/conda.yml
@@ -23,7 +23,7 @@ jobs:
target-platform: arm-64
conda-bld-dir: "/Users/runner/miniconda3/envs/test/conda-bld/arm-64/"
conda-bld-arg: "--no-test" # TODO
- os: "ubuntu-22.04"
- os: "ubuntu-24.04"
target-platform: linux-64
conda-bld-dir: "/home/runner/miniconda3/envs/test/conda-bld/linux-64/"
conda-bld-arg: ""
18 changes: 9 additions & 9 deletions .pre-commit-config.yaml
@@ -16,11 +16,11 @@ repos:
# - id: check-yaml
- id: check-added-large-files
- repo: https://github.com/psf/black
rev: 24.10.0
rev: 25.1.0
hooks:
- id: black
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.2
rev: v1.14.1
hooks:
- id: mypy
# args: [--verbose]
@@ -33,26 +33,26 @@ repos:
source/framework/python/test/.*|
)$
- repo: https://github.com/pycqa/isort
rev: 5.13.2
rev: 6.0.0
hooks:
- id: isort
files: |
(?x)^(
.*\.py|
.*\.py.in|
)$
- repo: https://github.com/markdownlint/markdownlint
rev: v0.12.0
hooks:
- id: markdownlint
# TODO REQUIRES RUBY - repo: https://github.com/markdownlint/markdownlint
# TODO REQUIRES RUBY rev: v0.12.0
# TODO REQUIRES RUBY hooks:
# TODO REQUIRES RUBY - id: markdownlint
- repo: https://github.com/codespell-project/codespell
rev: v2.3.0
rev: v2.4.1
hooks:
- id: codespell
additional_dependencies:
- tomli
- repo: https://github.com/crate-ci/typos
rev: v1.26.0
rev: dictgen-v0.3.1
hooks:
- id: typos
args: [] # Relevant: don't update files, just report
27 changes: 18 additions & 9 deletions environment/cmake/LueConfiguration.cmake
@@ -79,6 +79,14 @@ option(LUE_VALIDATE_IDXS
"Validate array indices are within array bounds (expensive!)"
FALSE)

# Only allow the user to configure the use of parallel I/O if the platform supports it. If so, the
# default is to support parallel I/O.
cmake_dependent_option(LUE_FRAMEWORK_WITH_PARALLEL_IO
"Use parallel I/O for formats that support it"
TRUE
"LUE_BUILD_DATA_MODEL;LUE_HDF5_REQUIRED;HDF5_IS_PARALLEL;LUE_BUILD_FRAMEWORK;LUE_HPX_REQUIRED;HPX_WITH_NETWORKING;HPX_WITH_PARCELPORT_MPI"
FALSE
)

# Options related to the availability of external packages.
if(WIN32)
@@ -403,6 +411,10 @@ if(LUE_BUILD_FRAMEWORK)
if(LUE_FRAMEWORK_WITH_PYTHON_API)
set(LUE_PYBIND11_REQUIRED TRUE)
endif()

if(LUE_FRAMEWORK_WITH_PARALLEL_IO)
set(LUE_MPI_REQUIRED TRUE)
endif()
endif()


@@ -828,6 +840,12 @@ if(LUE_JUPYTER_BOOK_REQUIRED)
find_package(JupyterBook REQUIRED)
endif()


if(LUE_MPI_REQUIRED)
find_package(MPI REQUIRED)
endif()


if(LUE_NLOHMANN_JSON_REQUIRED)
FetchContent_Declare(nlohmann_json
GIT_REPOSITORY https://github.com/nlohmann/json.git
@@ -847,12 +865,3 @@ if(LUE_SPHINX_REQUIRED)
message(FATAL_ERROR "sphinx not found")
endif()
endif()

# Only allow the user to configure the use of parallel I/O if this is something that is supported by the
# platform. If so, the default is to support parallel I/O.
cmake_dependent_option(LUE_FRAMEWORK_WITH_PARALLEL_IO
"Use parallel I/O for formats that support it"
TRUE
"LUE_BUILD_DATA_MODEL;LUE_HDF5_REQUIRED;HDF5_IS_PARALLEL;LUE_BUILD_FRAMEWORK;LUE_HPX_REQUIRED;HPX_WITH_NETWORKING;HPX_WITH_PARCELPORT_MPI"
FALSE
)
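For context on the option moved above: cmake_dependent_option comes from CMake's bundled CMakeDependentOption module. It exposes the option (with the given default) only while every condition in the fourth argument evaluates to true; otherwise it forces the option to the fifth argument and hides it from the cache. A reduced sketch with a hypothetical option name, not the PR's code:

    include(CMakeDependentOption)

    # WITH_PARALLEL_IO is a visible cache option defaulting to ON, but only while
    # both conditions hold; otherwise it is forced to OFF and hidden from the user.
    cmake_dependent_option(WITH_PARALLEL_IO
        "Use parallel I/O for formats that support it"
        ON
        "HDF5_IS_PARALLEL;HPX_WITH_PARCELPORT_MPI"
        OFF
    )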
1 change: 1 addition & 0 deletions source/framework/io/CMakeLists.txt
@@ -33,4 +33,5 @@ target_link_libraries(lue_framework_io
lue::framework_algorithm
lue::data_model_hl
lue::gdal
$<$<BOOL:${LUE_FRAMEWORK_WITH_PARALLEL_IO}>:MPI::MPI_CXX>
)
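The generator expression added above makes the MPI dependency conditional at build-system generation time: it expands to MPI::MPI_CXX when LUE_FRAMEWORK_WITH_PARALLEL_IO is true and to nothing otherwise. A rough configure-time equivalent (a sketch; the visibility keyword is an assumption, not taken from the PR):

    if(LUE_FRAMEWORK_WITH_PARALLEL_IO)
        target_link_libraries(lue_framework_io PRIVATE MPI::MPI_CXX)
    endif()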
1 change: 1 addition & 0 deletions source/framework/io/include/lue/framework/io/dataset.hpp
@@ -1,5 +1,6 @@
#pragma once
#include "lue/data_model.hpp"
// #include <hpx/future.hpp>


namespace lue {
121 changes: 81 additions & 40 deletions source/framework/io/include/lue/framework/io/from_lue.hpp
@@ -3,15 +3,10 @@
#include "lue/framework/algorithm/policy.hpp"
#include "lue/framework/core/component.hpp"
#include "lue/framework/io/dataset.hpp"
// #include "lue/framework/io/io_strategy.hpp"
#include "lue/framework/io/util.hpp"
#include "lue/data_model/hl/raster_view.hpp"
#include "lue/data_model/hl/util.hpp"
#include "lue/configure.hpp"
#if LUE_FRAMEWORK_WITH_PARALLEL_IO
// Only available in case MPI is used in HPX
// #include <hpx/mpi_base/mpi_environment.hpp>
#endif


/*!
@@ -25,6 +20,17 @@
*/


// All tasks can perform independent, asynchronous I/O → independent mode is OK when blocks are
// non-overlapping / non-interleaved

// NOTE:
// Depending on the expected data size and the number of I/O requests, INDEPENDENT or COLLECTIVE
// mode can help or hinder. Independent I/O is good if each request is sufficiently large
// (ideally at least a few file system blocks); otherwise, collective I/O is better

// - Is there some heuristic we can use to choose between INDEPENDENT / COLLECTIVE?


namespace lue {
namespace detail {

@@ -34,21 +40,13 @@
Property const& property,
CreateHyperslab create_hyperslab,
data_model::ID const object_id,
Partitions&& partitions) -> Partitions
Partitions&& partitions) -> std::tuple<Partitions, hpx::future<void>>
{
// Open value. Configure for use of parallel I/O if necessary.
hdf5::Dataset::TransferPropertyList transfer_property_list{};

#if LUE_FRAMEWORK_WITH_PARALLEL_IO
// All tasks can perform independent I/O, asynchronous → independent is OK when blocks are
// non-overlapping / non-interleaved

// KDJ: Old stuff, never worked, replace by something that does
// if(hpx::util::mpi_environment::enabled())
// {
// // Use collective I/O
// transfer_property_list.set_transfer_mode(::H5FD_MPIO_COLLECTIVE);
// }
transfer_property_list.set_transfer_mode(H5FD_MPIO_INDEPENDENT);
#endif

// Iterate over partitions and read each partition's piece from the dataset
@@ -65,6 +63,8 @@
Partitions new_partitions(std::size(partitions));
hdf5::Datatype const memory_datatype{hdf5::native_datatype<Element>()};

std::vector<hpx::future<void>> partitions_read_f(std::size(partitions));

for (std::size_t partition_idx = 0; partition_idx < std::size(partitions); ++partition_idx)
{
Partition const& partition{partitions[partition_idx]};
@@ -87,22 +87,53 @@
return partition;
};

// TODO In parallel I/O context: spawn as fast as possible

if (partition_idx == 0)
{
new_partitions[partition_idx] = hpx::async(read_partition);
std::tie(new_partitions[partition_idx], partitions_read_f[partition_idx]) =
hpx::split_future(hpx::async(
[read_partition]() -> std::tuple<Partition, bool>
{ return {read_partition(), true}; }));
}
else
{
lue_hpx_assert(new_partitions[partition_idx - 1].valid());
new_partitions[partition_idx] = new_partitions[partition_idx - 1].then(
[read_partition]([[maybe_unused]] Partition const& previous_new_partition)
{ return read_partition(); });
std::tie(new_partitions[partition_idx], partitions_read_f[partition_idx]) =
hpx::split_future(
new_partitions[partition_idx - 1]
.then(/* TODO sync? */
[read_partition](
[[maybe_unused]] Partition const& previous_new_partition)
-> std::tuple<Partition, bool>
{ return {read_partition(), true}; }));
}

lue_hpx_assert(new_partitions[partition_idx].valid());
lue_hpx_assert(partitions_read_f[partition_idx].valid());
}

return new_partitions;
return {std::move(new_partitions), hpx::when_all(std::move(partitions_read_f))};
}


auto keep_dataset_open_until_all_partitions_read(
hpx::future<void>&& all_partitions_read_f, hpx::future<data_model::Dataset>&& dataset_f)
{
// Once the partitions are ready, the dataset is not needed anymore. Until then, we need to keep
// the dataset instance alive.

all_partitions_read_f.then(
[dataset_f = std::move(dataset_f)]([[maybe_unused]] auto&& all_partitions_read_f) mutable
{
HPX_UNUSED(dataset_f);

// Just let the dataset_f go out of scope. We are the only one still holding on
// to it.
// We only get here once the partitions are ready, so by definition the dataset_f
// is ready.
lue_hpx_assert(dataset_f.is_ready());
});
}
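hpx::split_future, used throughout this diff, turns a future of a tuple into a tuple of futures, so read_partitions can hand back both a partition and a per-partition "done reading" signal as separate futures. A minimal sketch of the mechanism (hypothetical compute()/example() names; assumes an initialized HPX runtime):

    #include <hpx/future.hpp>
    #include <tuple>

    auto compute() -> hpx::future<std::tuple<int, bool>>
    {
        // Some asynchronous task that produces two results at once
        return hpx::async([]() -> std::tuple<int, bool> { return {42, true}; });
    }

    void example()
    {
        // split_future: future<tuple<int, bool>> becomes tuple<future<int>, future<bool>>.
        // Each element can now be consumed or passed along independently.
        auto [value_f, done_f] = hpx::split_future(compute());
        int const value = value_f.get();  // 42
        (void)value;   // silence unused-variable warnings in this sketch
        done_f.get();  // ready at the same moment; usable as a completion signal
    }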


@@ -112,20 +143,20 @@
std::string const& array_pathname,
hdf5::Offset const& array_hyperslab_start, // Only needed to offset block read from array
data_model::ID const object_id,
Partitions&& partitions) -> hpx::future<Partitions>
Partitions const& partitions) -> hpx::future<Partitions>
{
using Partition = typename Partitions::value_type;
using PartitionServer = Partition::Server;

return hpx::dataflow(
auto [dataset_f, partitions_f, all_partitions_read_f] = hpx::split_future(hpx::dataflow(
hpx::launch::async,
hpx::unwrapping(
[policies, array_pathname, array_hyperslab_start, object_id](
std::vector<Partition>&& partitions)
{
auto const [dataset_pathname, phenomenon_name, property_set_name, property_name] =
parse_array_pathname(array_pathname);
auto const dataset = open_dataset(dataset_pathname, H5F_ACC_RDONLY);
auto dataset = open_dataset(dataset_pathname, H5F_ACC_RDONLY);

// Open phenomenon
auto const& phenomenon{dataset.phenomena()[phenomenon_name]};
@@ -149,14 +180,19 @@
[array_hyperslab_start](PartitionServer const& partition_server)
{ return hyperslab(array_hyperslab_start, partition_server); };

return read_partitions(
policies,
property,
create_hyperslab,
object_id,
std::forward<Partitions>(partitions));
auto [partitions_f, all_partitions_read_f] = read_partitions(
policies, property, create_hyperslab, object_id, std::move(partitions));

return std::make_tuple(
std::move(dataset), std::move(partitions_f), std::move(all_partitions_read_f));
}),
hpx::when_all(partitions.begin(), partitions.end()));
hpx::when_all(partitions.begin(), partitions.end())));

keep_dataset_open_until_all_partitions_read(
std::move(all_partitions_read_f), std::move(dataset_f));

// For some reason, RVO doesn't kick in on some compilers (Clang, MSVS)
return std::move(partitions_f);
}
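The read functions above combine hpx::dataflow with hpx::unwrapping: dataflow defers the callable until all argument futures are ready, and unwrapping passes it their values rather than the futures themselves. A minimal sketch (hypothetical names; assumes an initialized HPX runtime):

    #include <hpx/future.hpp>
    #include <hpx/unwrap.hpp>

    void dataflow_example()
    {
        hpx::future<int> a_f = hpx::async([] { return 1; });
        hpx::future<int> b_f = hpx::async([] { return 2; });

        // The lambda runs only once both inputs are ready; unwrapping hands it
        // the plain ints instead of futures
        hpx::future<int> sum_f = hpx::dataflow(
            hpx::launch::async,
            hpx::unwrapping([](int a, int b) { return a + b; }),
            std::move(a_f),
            std::move(b_f));

        // sum_f.get() == 3
    }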


@@ -167,20 +203,20 @@
hdf5::Offset const& array_hyperslab_start,
data_model::ID const object_id,
Index const time_step_idx,
Partitions&& partitions) -> hpx::future<Partitions>
Partitions const& partitions) -> hpx::future<Partitions>
{
using Partition = typename Partitions::value_type;
using PartitionServer = Partition::Server;

return hpx::dataflow(
auto [dataset_f, partitions_f, all_partitions_read_f] = hpx::split_future(hpx::dataflow(
hpx::launch::async,
hpx::unwrapping(
[policies, array_pathname, array_hyperslab_start, object_id, time_step_idx](
std::vector<Partition>&& partitions)
{
auto const [dataset_pathname, phenomenon_name, property_set_name, property_name] =
parse_array_pathname(array_pathname);
auto const dataset = open_dataset(dataset_pathname, H5F_ACC_RDONLY);
auto dataset = open_dataset(dataset_pathname, H5F_ACC_RDONLY);

// Open phenomenon
auto const& phenomenon{dataset.phenomena()[phenomenon_name]};
@@ -207,14 +243,19 @@
[array_hyperslab_start, time_step_idx](PartitionServer const& partition_server)
{ return hyperslab(array_hyperslab_start, partition_server, 0, time_step_idx); };

return read_partitions(
policies,
property,
create_hyperslab,
object_id,
std::forward<Partitions>(partitions));
auto [partitions_f, all_partitions_read_f] = read_partitions(
policies, property, create_hyperslab, object_id, std::move(partitions));

return std::make_tuple(
std::move(dataset), std::move(partitions_f), std::move(all_partitions_read_f));
}),
hpx::when_all(partitions.begin(), partitions.end()));
hpx::when_all(partitions.begin(), partitions.end())));

keep_dataset_open_until_all_partitions_read(
std::move(all_partitions_read_f), std::move(dataset_f));

// For some reason, RVO doesn't kick in on some compilers (Clang, MSVS)
return std::move(partitions_f);
}


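On the INDEPENDENT versus COLLECTIVE question raised at the top of from_lue.hpp: both modes are set on HDF5's dataset transfer property list, which is what the wrapper call set_transfer_mode(H5FD_MPIO_INDEPENDENT) above configures. A minimal sketch against the raw HDF5 C API (read_block and its handle parameters are hypothetical; assumes an MPI-enabled HDF5 build):

    #include <hdf5.h>

    // Read one block with an explicit MPI-IO transfer mode.
    // - H5FD_MPIO_INDEPENDENT: each rank issues its own request; good when each
    //   read is large (at least a few file system blocks) and blocks don't interleave.
    // - H5FD_MPIO_COLLECTIVE: all ranks coordinate a single request; often better
    //   for many small, interleaved reads.
    void read_block(hid_t dataset, hid_t memory_space, hid_t file_space, double* buffer)
    {
        hid_t const transfer_plist = H5Pcreate(H5P_DATASET_XFER);
        H5Pset_dxpl_mpio(transfer_plist, H5FD_MPIO_INDEPENDENT);  // or H5FD_MPIO_COLLECTIVE
        H5Dread(dataset, H5T_NATIVE_DOUBLE, memory_space, file_space, transfer_plist, buffer);
        H5Pclose(transfer_plist);
    }

Any heuristic for choosing between the two modes (the open TODO above) would likely hinge on the size of each request relative to the file system block size and on whether the ranks' blocks interleave.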