
API: Implement sorting #13

Merged
merged 9 commits into from
Jan 30, 2025
1 change: 1 addition & 0 deletions cpp/include/legate_dataframe/core/library.hpp
@@ -37,6 +37,7 @@ enum : int {
ToTimestamps,
ExtractTimestampComponent,
Sequence,
Sort,
GroupByAggregation
};
}
8 changes: 7 additions & 1 deletion cpp/include/legate_dataframe/core/repartition_by_hash.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,12 @@

namespace legate::dataframe::task {

std::pair<std::vector<cudf::table_view>,
std::unique_ptr<std::pair<std::map<int, rmm::device_buffer>, cudf::table>>>
shuffle(GPUTaskContext& ctx,
std::vector<cudf::table_view>& tbl_partitioned,
std::unique_ptr<cudf::table> owning_table);

/**
* @brief Repartition the table into hash table buckets for each rank/node.
*
46 changes: 46 additions & 0 deletions cpp/include/legate_dataframe/sort.hpp
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <string>
#include <vector>

#include <legate_dataframe/core/table.hpp>

namespace legate::dataframe {

/**
* @brief Sort a logical table.
*
 * Reorder the logical table so that the key columns are sorted
 * lexicographically based on their column_order and null_precedence.
*
* @param tbl The table to sort
* @param keys The column names to sort by.
* @param column_order Either ASCENDING or DESCENDING for each sort key/column.
 * @param null_precedence Either BEFORE or AFTER for each sort key/column.
 * AFTER means that nulls are considered larger: they come last in an
 * ascending sort and first in a descending sort.
* @return The sorted LogicalTable
*/
LogicalTable sort(const LogicalTable& tbl,
const std::vector<std::string>& keys,
const std::vector<cudf::order>& column_order,
const std::vector<cudf::null_order>& null_precedence,
bool stable = false);
Review comment on lines +40 to +44 (Member): It would be good to have a docstring for shuffle.
} // namespace legate::dataframe
5 changes: 5 additions & 0 deletions cpp/src/core/library.cpp
@@ -80,6 +80,10 @@ class Mapper : public legate::mapping::Mapper {
// TODO: Join is identical to GroupBy, but we would have to look at
// both input tables to get the maximum column number.
return std::nullopt;
case legate::dataframe::task::OpCode::Sort:
// Also similar to GroupBy, but does multiple shuffles and uses two
// additional helper columns
return std::nullopt;
case legate::dataframe::task::OpCode::GroupByAggregation: {
// Aggregation use repartitioning which uses ZCMEM for NCCL.
// This depends on the number of columns (first scalar when storing
@@ -88,6 +92,7 @@
// No need for repartitioning, so no need for ZCMEM
return 0;
}
// Note: + 2 is for sorting! TODO: refactor into helper...
auto num_cols = task.scalars().at(0).value<int32_t>();
auto nrank = task.get_launch_domain().get_volume();

88 changes: 53 additions & 35 deletions cpp/src/core/repartition_by_hash.cu
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -118,19 +118,49 @@ class ExchangedSizes {
}
};

} // namespace

/**
* @brief Shuffle (all-to-all exchange) packed cudf columns.
* @brief Shuffle (all-to-all exchange) packed cudf partitioned table.
*
*
* @param ctx The context of the calling task
* @param columns A mapping of tasks to their packed columns. E.g. `columns.at(i)`
 * will be sent to the i'th task. NB: all tasks besides itself must have a map, thus:
 * `columns.size() == ctx.nranks - 1`.
* @return A new table containing "this nodes" unpacked columns.
* @param tbl_partitioned The local table partitioned into multiple tables such
* that `tbl_partitioned.at(i)` should end up at rank i.
* @param owning_table Optional table owning the data in `tbl_partitioned`.
* This table is cleaned up early to reduce the peak memory usage.
* If passed, `tbl_partitioned` is also cleared (as the content is invalid).
* @return An std::pair where the first entry contains a vector of table_view
* with all the chunks (including the local copy). The second entry contains
* a unique_ptr whose contents owns all parts.
*/
std::pair<std::vector<cudf::table_view>, std::map<int, rmm::device_buffer>> shuffle(
GPUTaskContext& ctx, const std::map<int, cudf::packed_columns>& columns)
std::pair<std::vector<cudf::table_view>,
std::unique_ptr<std::pair<std::map<int, rmm::device_buffer>, cudf::table>>>
shuffle(GPUTaskContext& ctx,
std::vector<cudf::table_view>& tbl_partitioned,
std::unique_ptr<cudf::table> owning_table)
{
if (tbl_partitioned.size() != ctx.nranks) {
throw std::runtime_error("internal error: partition split has wrong size.");
}

// First we pack the columns into contiguous chunks for transfer/shuffling
// `columns.at(i)` will be sent to the i'th task.
// N.B. all tasks besides itself have a map, so `columns.size() == ctx.nranks - 1`.
std::map<int, cudf::packed_columns> columns;
for (int i = 0; static_cast<size_t>(i) < tbl_partitioned.size(); ++i) {
if (i != ctx.rank) {
columns[i] = cudf::detail::pack(tbl_partitioned[i], ctx.stream(), ctx.mr());
}
}
// Also copy tbl_partitioned.at(ctx.rank). This copy is unnecessary but allows
// clearing the (possibly) much larger owning_table (if passed).
cudf::table local_table(tbl_partitioned.at(ctx.rank), ctx.stream(), ctx.mr());
if (owning_table) {
tbl_partitioned.clear();
owning_table.reset();
}

assert(columns.size() == ctx.nranks - 1);
ExchangedSizes sizes(ctx, columns);

@@ -200,18 +230,25 @@ std::pair<std::vector<cudf::table_view>, std::map<int, rmm::device_buffer>> shuf
LEGATE_CHECK_CUDA(cudaStreamSynchronize(sizes.stream));

// Let's unpack and return the packed_columns received from our peers
// (and our own chunk so that `ret` is ordered for stable sorts)
std::vector<cudf::table_view> ret;
for (auto& [peer, buf] : recv_metadata) {
for (int peer = 0; peer < ctx.nranks; ++peer) {
if (peer == ctx.rank) {
ret.push_back(local_table.view());
continue;
}
uint8_t* gpu_data = nullptr;
if (recv_gpu_data.count(peer)) {
gpu_data = static_cast<uint8_t*>(recv_gpu_data.at(peer).data());
}
ret.push_back(cudf::unpack(buf.ptr(0), gpu_data));
ret.push_back(cudf::unpack(recv_metadata.at(peer).ptr(0), gpu_data));
}
return std::make_pair(ret, std::move(recv_gpu_data));
}

} // namespace
using owner_t = std::pair<std::map<int, rmm::device_buffer>, cudf::table>;
return std::make_pair(
ret,
std::make_unique<owner_t>(std::make_pair(std::move(recv_gpu_data), std::move(local_table))));
}

std::unique_ptr<cudf::table> repartition_by_hash(
GPUTaskContext& ctx,
@@ -223,8 +260,8 @@ std::unique_ptr<cudf::table> repartition_by_hash(
 * 1) Each task splits its local cudf table into `ctx.nranks` partitions based on the
 * hashing of `columns_to_hash` and assigns each partition to a task.
 * 2) Each task packs (serializes) the partitions not assigned to itself.
* 3) All tasks exchange the sizes of their packed partitions and associated metadata.
* 4) All tasks shuffle (all-to-all exchange) the packed partitions.
* 4) All tasks shuffle (all-to-all exchange) the partitions. `shuffle` does this by first
* packing each partition into a contiguous memory block for the transfer.
 * 5) Each task unpacks (deserializes) and concatenates the received columns with the
 * self-assigned partition.
 * 6) Finally, each task returns a new local cudf table that contains the concatenated partitions.
@@ -261,28 +298,9 @@ std::unique_ptr<cudf::table> repartition_by_hash(

tbl_partitioned = cudf::split(*partition_table, partition_offsets, ctx.stream());
}
if (tbl_partitioned.size() != ctx.nranks) {
throw std::runtime_error("internal error: partition split has wrong size.");
}

// Pack and shuffle the columns
std::map<int, cudf::packed_columns> packed_columns;
for (int i = 0; static_cast<size_t>(i) < tbl_partitioned.size(); ++i) {
if (i != ctx.rank) {
packed_columns[i] = cudf::detail::pack(tbl_partitioned[i], ctx.stream(), ctx.mr());
}
}
// Also copy tbl_partitioned.at(ctx.rank). This copy is unnecessary but allows
// clearing the (presumably) much larger partition_table.
cudf::table local_table(tbl_partitioned.at(ctx.rank), ctx.stream(), ctx.mr());
tbl_partitioned.clear();
partition_table.reset();

auto [tables, buffers] = shuffle(ctx, packed_columns);
packed_columns.clear(); // Clear packed columns to preserve memory
auto [tables, owners] = shuffle(ctx, tbl_partitioned, std::move(partition_table));

// Let's concatenate our own partition and all the partitions received from the shuffle.
tables.push_back(local_table);
return cudf::concatenate(tables, ctx.stream(), ctx.mr());
}
