API: Implement basic sorting
This adds basic sorting.  The only missing piece right now is support for
the column order and null order arguments.

Balancing was checked manually and seems fine.  The tests seem pretty good
about checking the splitting, but some random datasets should definitely be
thrown into the mix.

Signed-off-by: Sebastian Berg <[email protected]>
seberg committed Jan 16, 2025
1 parent becf55f commit eb79f23
Showing 10 changed files with 624 additions and 19 deletions.
1 change: 1 addition & 0 deletions cpp/include/legate_dataframe/core/library.hpp
@@ -37,6 +37,7 @@ enum : int {
ToTimestamps,
ExtractTimestampComponent,
Sequence,
Sort,
GroupByAggregation
};
}
8 changes: 7 additions & 1 deletion cpp/include/legate_dataframe/core/repartition_by_hash.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,12 @@

namespace legate::dataframe::task {

std::pair<std::vector<cudf::table_view>,
std::unique_ptr<std::pair<std::map<int, rmm::device_buffer>, cudf::table>>>
shuffle(GPUTaskContext& ctx,
std::vector<cudf::table_view>& tbl_partitioned,
std::unique_ptr<cudf::table> owning_table);

/**
* @brief Repartition the table into hash table buckets for each rank/node.
*
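The newly exported shuffle() returns the table views together with a single object that owns their memory. A minimal consumption sketch, mirroring the updated call site in repartition_by_hash.cu further down (assuming a populated tbl_partitioned and its owning table):

// Sketch: consuming the new shuffle() result. The views in `tables`
// alias memory held by `owners`, so `owners` must stay alive until the
// data has been copied out (here by cudf::concatenate).
auto [tables, owners] = shuffle(ctx, tbl_partitioned, std::move(owning_table));
auto combined = cudf::concatenate(tables, ctx.stream(), ctx.mr());
// `owners` (and with it the views in `tables`) may be released now.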
30 changes: 30 additions & 0 deletions cpp/include/legate_dataframe/sort.hpp
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <string>
#include <vector>

#include <legate_dataframe/core/table.hpp>

namespace legate::dataframe {

LogicalTable sort(const LogicalTable& tbl,
const std::vector<std::string>& keys,
bool stable = false);

} // namespace legate::dataframe
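A usage sketch for the new API (the table and column name are hypothetical; per the commit message, column order and null order cannot be configured yet):

#include <legate_dataframe/sort.hpp>

// Sort `tbl` by the "key" column; stable = true preserves the relative
// order of rows with equal keys. (Hypothetical table/column names.)
legate::dataframe::LogicalTable sorted =
    legate::dataframe::sort(tbl, {"key"}, /* stable = */ true);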
5 changes: 5 additions & 0 deletions cpp/src/core/library.cpp
@@ -80,6 +80,10 @@ class Mapper : public legate::mapping::Mapper {
// TODO: Join is identical to GroupBy, but we would have to look at
// both input tables to get the maximum column number.
return std::nullopt;
case legate::dataframe::task::OpCode::Sort:
// Also similar to GroupBy, but does multiple shuffles and uses two
// additional helper columns.
return std::nullopt;
case legate::dataframe::task::OpCode::GroupByAggregation: {
// Aggregation uses repartitioning, which uses ZCMEM for NCCL.
// This depends on the number of columns (first scalar when storing
@@ -88,6 +92,7 @@
// No need for repartitioning, so no need for ZCMEM
return 0;
}
// Note: + 2 is for sorting! TODO: refactor into helper...
auto num_cols = task.scalars().at(0).value<int32_t>();
auto nrank = task.get_launch_domain().get_volume();

45 changes: 28 additions & 17 deletions cpp/src/core/repartition_by_hash.cu
@@ -118,6 +118,8 @@ class ExchangedSizes {
}
};

} // namespace

/**
* @brief Shuffle (all-to-all exchange) packed cudf partitioned table.
*
@@ -127,14 +129,16 @@
* that `tbl_partitioned.at(i)` should end up at rank i.
* @param owning_table Optional table owning the data in `tbl_partitioned`.
* This table is cleaned up early to reduce the peak memory usage.
* @return A vector of tables (each ranks data including itself) and a mapping
* of buffers owning the table data (for cleanup). The caller must ensure the
* tables are copied before the buffers are cleaned up.
* If passed, `tbl_partitioned` is also cleared (as the content is invalid).
* @return An std::pair where the first entry contains a vector of table_view
* with all the chunks (including the local copy). The second entry contains
* a unique_ptr whose contents own all parts.
*/
std::pair<std::vector<cudf::table_view>, std::map<int, rmm::device_buffer>> shuffle(
GPUTaskContext& ctx,
const std::vector<cudf::table_view>& tbl_partitioned,
std::unique_ptr<cudf::table> owning_table)
std::pair<std::vector<cudf::table_view>,
std::unique_ptr<std::pair<std::map<int, rmm::device_buffer>, cudf::table>>>
shuffle(GPUTaskContext& ctx,
std::vector<cudf::table_view>& tbl_partitioned,
std::unique_ptr<cudf::table> owning_table)
{
if (tbl_partitioned.size() != ctx.nranks) {
throw std::runtime_error("internal error: partition split has wrong size.");
@@ -152,8 +156,10 @@ std::pair<std::vector<cudf::table_view>, std::map<int, rmm::device_buffer>> shuffle(
// Also copy tbl_partitioned.at(ctx.rank). This copy is unnecessary but allows
// clearing the (possibly) much larger owning_table (if passed).
cudf::table local_table(tbl_partitioned.at(ctx.rank), ctx.stream(), ctx.mr());
tbl_partitioned.clear();
owning_table.reset();
if (owning_table) {
tbl_partitioned.clear();
owning_table.reset();
}

assert(columns.size() == ctx.nranks - 1);
ExchangedSizes sizes(ctx, columns);
@@ -224,18 +230,25 @@ std::pair<std::vector<cudf::table_view>, std::map<int, rmm::device_buffer>> shuffle(
LEGATE_CHECK_CUDA(cudaStreamSynchronize(sizes.stream));

// Let's unpack and return the packed_columns received from our peers
// (and our own chunk so that `ret` is ordered for stable sorts)
std::vector<cudf::table_view> ret;
for (auto& [peer, buf] : recv_metadata) {
for (int peer = 0; peer < ctx.nranks; ++peer) {
if (peer == ctx.rank) {
ret.push_back(local_table.view());
continue;
}
uint8_t* gpu_data = nullptr;
if (recv_gpu_data.count(peer)) {
gpu_data = static_cast<uint8_t*>(recv_gpu_data.at(peer).data());
}
ret.push_back(cudf::unpack(buf.ptr(0), gpu_data));
ret.push_back(cudf::unpack(recv_metadata.at(peer).ptr(0), gpu_data));
}
return std::make_pair(ret, std::move(recv_gpu_data));
}

} // namespace
using owner_t = std::pair<std::map<int, rmm::device_buffer>, cudf::table>;
return std::make_pair(
ret,
std::make_unique<owner_t>(std::make_pair(std::move(recv_gpu_data), std::move(local_table))));
}
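Iterating peers in rank order, and splicing the local chunk in at position ctx.rank instead of appending it last (as the old code did), is what keeps a stable sort stable: rows with equal keys must be reassembled in the same rank order in which they were partitioned. A toy model of the reassembly loop (ranks and payloads hypothetical):

#include <cassert>
#include <map>
#include <string>
#include <vector>

int main()
{
  int const nranks = 3, rank = 1;
  // Chunks received from peers, keyed by rank; the local chunk is held
  // separately (mirroring `recv_metadata` and `local_table` above).
  std::map<int, std::string> received{{0, "a0 a1"}, {2, "a3"}};
  std::string local = "a2";
  std::vector<std::string> ret;
  for (int peer = 0; peer < nranks; ++peer) {
    if (peer == rank) {
      ret.push_back(local);  // splice local chunk in at its rank position
      continue;
    }
    ret.push_back(received.at(peer));
  }
  // Rank order preserved: rows with equal keys keep their pre-shuffle order.
  assert((ret == std::vector<std::string>{"a0 a1", "a2", "a3"}));
}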

std::unique_ptr<cudf::table> repartition_by_hash(
GPUTaskContext& ctx,
@@ -286,10 +299,8 @@ std::unique_ptr<cudf::table> repartition_by_hash(
tbl_partitioned = cudf::split(*partition_table, partition_offsets, ctx.stream());
}

auto [tables, buffers] = shuffle(ctx, tbl_partitioned, std::move(partition_table));
auto [tables, owners] = shuffle(ctx, tbl_partitioned, std::move(partition_table));

// Let's concatenate our own partition and all the partitioned received from the shuffle.
tables.push_back(local_table);
return cudf::concatenate(tables, ctx.stream(), ctx.mr());
}
