GH-41706: [C++][Acero] Enhance asof_join to work in multi-threaded execution by sequencing input #44083

Merged 5 commits on Oct 29, 2024
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/accumulation_queue.h
@@ -128,6 +128,7 @@ class SerialSequencingQueue {
/// Strategy that describes how to handle items
class Processor {
public:
virtual ~Processor() = default;
/// Process the batch
///
/// This method will be called on each batch in order. Calls to this method
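A minimal sketch (not part of this diff) of what a SerialSequencingQueue::Processor implementation looks like; the class name is hypothetical, and the defaulted virtual destructor added above is what makes destroying an implementation through a Processor pointer safe:

#include <iostream>
#include "arrow/acero/accumulation_queue.h"  // SerialSequencingQueue, Processor
#include "arrow/compute/exec.h"              // ExecBatch

class PrintingProcessor : public arrow::acero::util::SerialSequencingQueue::Processor {
 public:
  arrow::Status Process(arrow::compute::ExecBatch batch) override {
    // Invoked strictly in ascending batch.index order, one batch at a time.
    std::cout << "processing batch " << batch.index << " (" << batch.length << " rows)\n";
    return arrow::Status::OK();
  }
};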
27 changes: 20 additions & 7 deletions cpp/src/arrow/acero/asof_join_node.cc
@@ -16,6 +16,7 @@
// under the License.

#include "arrow/acero/asof_join_node.h"
#include "arrow/acero/accumulation_queue.h"
#include "arrow/acero/backpressure_handler.h"
#include "arrow/acero/concurrent_queue_internal.h"

@@ -471,7 +472,7 @@ class BackpressureController : public BackpressureControl {
std::atomic<int32_t>& backpressure_counter_;
};

class InputState {
class InputState : public util::SerialSequencingQueue::Processor {
// InputState corresponds to an input
// Input record batches are queued up in InputState until processed and
// turned into output record batches.
@@ -482,7 +483,8 @@ class InputState {
const std::shared_ptr<arrow::Schema>& schema,
const col_index_t time_col_index,
const std::vector<col_index_t>& key_col_index)
: queue_(std::move(handler)),
: sequencer_(util::SerialSequencingQueue::Make(this)),
queue_(std::move(handler)),
schema_(schema),
time_col_index_(time_col_index),
key_col_index_(key_col_index),
@@ -699,7 +701,16 @@ class InputState {
DEBUG_MANIP(std::endl));
return updated;
}
Status InsertBatch(ExecBatch batch) {
return sequencer_->InsertBatch(std::move(batch));
}

Status Process(ExecBatch batch) override {
auto rb = *batch.ToRecordBatch(schema_);
DEBUG_SYNC(node_, "received batch from input ", index_, ":", DEBUG_MANIP(std::endl),
rb->ToString(), DEBUG_MANIP(std::endl));
return Push(rb);
}
void Rehash() {
DEBUG_SYNC(node_, "rehashing for input ", index_, ":", DEBUG_MANIP(std::endl));
MemoStore new_memo(DEBUG_ADD(memo_.no_future_, node_, index_));
@@ -760,6 +771,8 @@ class InputState {
}

private:
// ExecBatch Sequencer
std::unique_ptr<util::SerialSequencingQueue> sequencer_;
// Pending record batches. The latest is the front. Batches cannot be empty.
BackpressureConcurrentQueue<std::shared_ptr<RecordBatch>> queue_;
// Schema associated with the input
@@ -1399,6 +1412,9 @@ class AsofJoinNode : public ExecNode {
// InputReceived may be called after execution was finished. Pushing it to the
// InputState is unnecessary since we're done (and anyway may cause the
// BackPressureController to pause the input, causing a deadlock), so drop it.
if (::arrow::compute::kUnsequencedIndex == batch.index)
return Status::Invalid("AsofJoin requires sequenced input");

if (process_task_.is_finished()) {
DEBUG_SYNC(this, "Input received while done. Short circuiting.",
DEBUG_MANIP(std::endl));
@@ -1409,12 +1425,9 @@
ARROW_DCHECK(std_has(inputs_, input));
size_t k = std_find(inputs_, input) - inputs_.begin();

// Put into the queue
auto rb = *batch.ToRecordBatch(input->output_schema());
DEBUG_SYNC(this, "received batch from input ", k, ":", DEBUG_MANIP(std::endl),
rb->ToString(), DEBUG_MANIP(std::endl));
// Put into the sequencing queue
ARROW_RETURN_NOT_OK(state_.at(k)->InsertBatch(std::move(batch)));

ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb));
PushProcess(true);

return Status::OK();
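For context, a hedged usage sketch of the sequencer that InputState now owns: batches may arrive from worker threads out of order, but Process() is still invoked in batch-index order. DriveOutOfOrder and the empty batches are illustrative only; SerialSequencingQueue::Make and InsertBatch are the calls used in the diff above.

arrow::Status DriveOutOfOrder(
    arrow::acero::util::SerialSequencingQueue::Processor* processor) {
  auto sequencer = arrow::acero::util::SerialSequencingQueue::Make(processor);
  for (int64_t index : {2, 0, 1}) {  // simulated out-of-order arrival
    arrow::compute::ExecBatch batch({}, /*length=*/0);
    batch.index = index;
    // Process() runs for index 0 only once it arrives; 1 and 2 then follow in order.
    ARROW_RETURN_NOT_OK(sequencer->InsertBatch(std::move(batch)));
  }
  return arrow::Status::OK();
}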
8 changes: 6 additions & 2 deletions cpp/src/arrow/acero/asof_join_node_test.cc
@@ -101,6 +101,7 @@ Result<BatchesWithSchema> MakeBatchesFromNumString(
BatchesWithSchema batches;
batches.schema = schema;
int n_fields = schema->num_fields();
size_t batch_index = 0;
for (auto num_batch : num_batches.batches) {
Datum two(Int32Scalar(2));
std::vector<Datum> values;
@@ -128,6 +129,7 @@
}
}
ExecBatch batch(values, num_batch.length);
batch.index = batch_index++;
batches.batches.push_back(batch);
}
return batches;
@@ -185,6 +187,7 @@ Result<BatchesWithSchema> MutateByKey(BatchesWithSchema& batches, std::string fr
replace_key ? batches.schema->SetField(from_index, new_field)
: batches.schema->AddField(from_index, new_field));
}
size_t batch_index = 0;
for (const ExecBatch& batch : batches.batches) {
std::vector<Datum> new_values;
for (int i = 0; i < n_fields; i++) {
@@ -233,6 +236,7 @@ Result<BatchesWithSchema> MutateByKey(BatchesWithSchema& batches, std::string fr
new_values.push_back(value);
}
new_batches.batches.emplace_back(new_values, batch.length);
new_batches.batches.back().index = batch_index++;
}
return new_batches;
}
@@ -1571,15 +1575,15 @@ void TestSequencing(BatchesMaker maker, int num_batches, int batch_size) {
"asofjoin", {l_src, r_src}, GetRepeatedOptions(2, "time", {"key"}, 1000)};

QueryOptions query_options;
query_options.use_threads = false;
query_options.use_threads = true;
ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema batches,
DeclarationToExecBatches(asofjoin, query_options));

AssertExecBatchesSequenced(batches.batches);
}

TEST(AsofJoinTest, BatchSequencing) {
return TestSequencing(MakeIntegerBatches, /*num_batches=*/32, /*batch_size=*/1);
return TestSequencing(MakeIntegerBatches, /*num_batches=*/1000, /*batch_size=*/1);
}

template <typename BatchesMaker>
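As a rough illustration of what this test asserts (a sketch, not the actual AssertExecBatchesSequenced helper): every output batch should carry the index of its position in the output stream, even with threading enabled.

#include <gtest/gtest.h>
#include <vector>
#include "arrow/compute/exec.h"  // ExecBatch

void CheckSequenced(const std::vector<arrow::compute::ExecBatch>& batches) {
  // Each batch's index should match its position in the stream.
  for (size_t i = 0; i < batches.size(); ++i) {
    ASSERT_EQ(batches[i].index, static_cast<int64_t>(i));
  }
}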
9 changes: 7 additions & 2 deletions cpp/src/arrow/acero/options.h
@@ -93,13 +93,18 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
public:
/// Create an instance from values
SourceNodeOptions(std::shared_ptr<Schema> output_schema,
std::function<Future<std::optional<ExecBatch>>()> generator)
: output_schema(std::move(output_schema)), generator(std::move(generator)) {}
std::function<Future<std::optional<ExecBatch>>()> generator,
Ordering ordering = Ordering::Unordered())
: output_schema(std::move(output_schema)),
generator(std::move(generator)),
ordering(std::move(ordering)) {}

/// \brief the schema for batches that will be generated by this source
std::shared_ptr<Schema> output_schema;
/// \brief an asynchronous stream of batches ending with std::nullopt
std::function<Future<std::optional<ExecBatch>>()> generator;

Ordering ordering = Ordering::Unordered();
};

/// \brief a node that generates data from a table already loaded in memory
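A hedged sketch of using the extended constructor: a source declared with implicit ordering so that downstream nodes (e.g. asofjoin) can rely on batch indices. The one-field schema and the immediately-ending generator are illustrative only.

#include <optional>
#include "arrow/acero/options.h"     // SourceNodeOptions
#include "arrow/api.h"               // schema, field, int64
#include "arrow/compute/ordering.h"  // Ordering

arrow::acero::SourceNodeOptions MakeOrderedSource() {
  auto schema = arrow::schema({arrow::field("time", arrow::int64())});
  // Generator that ends immediately; a real source would yield batches here.
  auto empty_gen = []() -> arrow::Future<std::optional<arrow::compute::ExecBatch>> {
    return arrow::Future<std::optional<arrow::compute::ExecBatch>>::MakeFinished(std::nullopt);
  };
  return arrow::acero::SourceNodeOptions(schema, empty_gen,
                                         arrow::compute::Ordering::Implicit());
}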
5 changes: 3 additions & 2 deletions cpp/src/arrow/acero/source_node.cc
@@ -106,7 +106,8 @@ struct SourceNode : ExecNode, public TracedNode {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "SourceNode"));
const auto& source_options = checked_cast<const SourceNodeOptions&>(options);
return plan->EmplaceNode<SourceNode>(plan, source_options.output_schema,
source_options.generator);
source_options.generator,
source_options.ordering);
}

const char* kind_name() const override { return "SourceNode"; }
@@ -406,7 +407,7 @@ struct SchemaSourceNode : public SourceNode {
struct RecordBatchReaderSourceNode : public SourceNode {
RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
: SourceNode(plan, schema, generator) {}
: SourceNode(plan, schema, generator, Ordering::Implicit()) {}
Contributor: Isn't this a bit too restrictive? It means every RecordBatch reader source always preserves the implicit order, even if users do not care. Shouldn't this be configurable? What about other sources? Shouldn't the behaviour be the same for any source?

Reply: Probably an option to specify the ordering would be best. Or maybe automatically assign implicit ordering based on batch.index != compute::kUnsequencedIndex?

Contributor: Can you elaborate on why your asof_join enhancement specifically requires the RecordBatchReaderSourceNode to have implicit ordering?

Reply: It does not. I assumed that when a user provides a generator, it has some kind of implicit order. Now I see that is not necessarily the case.


static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
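A sketch of the alternative floated in the thread above (not what was merged): infer the ordering from whether incoming batches already carry indices, rather than hard-coding Ordering::Implicit() for this node. InferOrdering is a hypothetical helper.

#include "arrow/compute/exec.h"      // ExecBatch, kUnsequencedIndex
#include "arrow/compute/ordering.h"  // Ordering

arrow::compute::Ordering InferOrdering(const arrow::compute::ExecBatch& first_batch) {
  // Batches without an assigned index carry the kUnsequencedIndex sentinel.
  return first_batch.index == arrow::compute::kUnsequencedIndex
             ? arrow::compute::Ordering::Unordered()
             : arrow::compute::Ordering::Implicit();
}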
4 changes: 4 additions & 0 deletions cpp/src/arrow/acero/test_util_internal.cc
@@ -384,6 +384,7 @@ Result<BatchesWithSchema> MakeIntegerBatches(
int row = 0;
for (int i = 0; i < num_batches; i++) {
ARROW_ASSIGN_OR_RAISE(auto batch, MakeIntegerBatch(gens, schema, row, batch_size));
batch.index = i;
out.batches.push_back(std::move(batch));
row += batch_size;
}
@@ -410,6 +411,9 @@ BatchesWithSchema MakeBatchesFromString(const std::shared_ptr<Schema>& schema,
out_batches.batches.push_back(out_batches.batches[i]);
}
}
for (size_t batch_index = 0; batch_index < out_batches.batches.size(); ++batch_index) {
out_batches.batches[batch_index].index = batch_index;
}

return out_batches;
}
11 changes: 7 additions & 4 deletions cpp/src/arrow/dataset/scanner.cc
@@ -1032,11 +1032,11 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
} else {
batch_gen = std::move(merged_batch_gen);
}

int64_t index = require_sequenced_output ? 0 : compute::kUnsequencedIndex;
auto gen = MakeMappedGenerator(
std::move(batch_gen),
[scan_options](const EnumeratedRecordBatch& partial)
-> Result<std::optional<compute::ExecBatch>> {
[scan_options, index](const EnumeratedRecordBatch& partial) mutable
-> Result<std::optional<compute::ExecBatch>> {
// TODO(ARROW-13263) fragments may be able to attach more guarantees to batches
// than this, for example parquet's row group stats. Failing to do this leaves
// perf on the table because row group stats could be used to skip kernel execs in
@@ -1057,9 +1057,12 @@
batch->values.emplace_back(partial.record_batch.index);
batch->values.emplace_back(partial.record_batch.last);
batch->values.emplace_back(partial.fragment.value->ToString());
if (index != compute::kUnsequencedIndex) batch->index = index++;
return batch;
});
Contributor: Looks like you are adding the batch index here, which is already done by the SourceNode when ordering = Ordering::Implicit() (see lines 153-155):

plan_->query_context()->ScheduleTask(
    [this, morsel_length, use_legacy_batching, initial_batch_index, morsel,
     has_ordering = !ordering_.is_unordered()]() {
      int64_t offset = 0;
      int batch_index = initial_batch_index;
      do {
        int64_t batch_size =
            std::min<int64_t>(morsel_length - offset, ExecPlan::kMaxBatchSize);
        // In order for the legacy batching model to work we must
        // not slice batches from the source
        if (use_legacy_batching) {
          batch_size = morsel_length;
        }
        ExecBatch batch = morsel.Slice(offset, batch_size);
        UnalignedBufferHandling unaligned_buffer_handling =
            plan_->query_context()->options().unaligned_buffer_handling.value_or(
                GetDefaultUnalignedBufferHandling());
        ARROW_RETURN_NOT_OK(
            HandleUnalignedBuffers(&batch, unaligned_buffer_handling));
        if (has_ordering) {
          batch.index = batch_index;
        }
        offset += batch_size;
        batch_index++;
        ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
      } while (offset < morsel.length);
      return Status::OK();
    },
    "SourceNode::ProcessMorsel");

This seems redundant.

Reply: It seems so. I first added the indexing and then fixed the ordering-information propagation to the source node. I'm not sure it is possible to update an already-merged PR.

Contributor: It's not possible; I'll draft a follow-up PR, if you don't mind.


auto ordering = require_sequenced_output ? Ordering::Implicit() : Ordering::Unordered();

auto fields = scan_options->dataset_schema->fields();
if (scan_options->add_augmented_fields) {
for (const auto& aug_field : kAugmentedFields) {
@@ -1069,7 +1072,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,

return acero::MakeExecNode(
"source", plan, {},
acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen)});
acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen), ordering});
}

Result<acero::ExecNode*> MakeAugmentedProjectNode(acero::ExecPlan* plan,
9 changes: 7 additions & 2 deletions python/pyarrow/_dataset.pyx
@@ -4015,11 +4015,14 @@ cdef class _ScanNodeOptions(ExecNodeOptions):
def _set_options(self, Dataset dataset, dict scan_options):
cdef:
shared_ptr[CScanOptions] c_scan_options
bint require_sequenced_output=False

c_scan_options = Scanner._make_scan_options(dataset, scan_options)

require_sequenced_output=scan_options.get("require_sequenced_output", False)

self.wrapped.reset(
new CScanNodeOptions(dataset.unwrap(), c_scan_options)
new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output)
)


@@ -4045,7 +4048,9 @@ class ScanNodeOptions(_ScanNodeOptions):
dataset : pyarrow.dataset.Dataset
The table which acts as the data source.
**kwargs : dict, optional
Scan options. See `Scanner.from_dataset` for possible arguments.
Scan options. See `Scanner.from_dataset` for possible arguments.
require_sequenced_output : bool, default False
Assert implicit ordering on data.
"""

def __init__(self, Dataset dataset, **kwargs):
15 changes: 11 additions & 4 deletions python/pyarrow/acero.py
@@ -56,8 +56,10 @@ class InMemoryDataset:
ds = DatasetModuleStub


def _dataset_to_decl(dataset, use_threads=True):
decl = Declaration("scan", ScanNodeOptions(dataset, use_threads=use_threads))
def _dataset_to_decl(dataset, use_threads=True, require_sequenced_output=False):
decl = Declaration("scan", ScanNodeOptions(
dataset, use_threads=use_threads,
require_sequenced_output=require_sequenced_output))

# Get rid of special dataset columns
# "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
@@ -311,13 +313,18 @@ def _perform_join_asof(left_operand, left_on, left_by,

# Add the join node to the execplan
if isinstance(left_operand, ds.Dataset):
left_source = _dataset_to_decl(left_operand, use_threads=use_threads)
left_source = _dataset_to_decl(
left_operand,
use_threads=use_threads,
require_sequenced_output=True)
else:
left_source = Declaration(
"table_source", TableSourceNodeOptions(left_operand),
)
if isinstance(right_operand, ds.Dataset):
right_source = _dataset_to_decl(right_operand, use_threads=use_threads)
right_source = _dataset_to_decl(
right_operand, use_threads=use_threads,
require_sequenced_output=True)
Contributor: Do you really need the dataset to be sequenced? I think all you need is implicit ordering (batch indices), so you can sequence the batches at any point down the pipeline (by your asof join node).

Reply: True. But I don't think require_sequenced_output actually means that batches should be produced by this node in order; it means that batches need to be sequenced (i.e. have batch.index assigned). The current implementation does produce batches in order as a side effect of the indexing.

In the context of your GH-26818: maybe the option require_sequenced_output should just be renamed to implicit_ordering? What would be the purpose of require_sequenced_output != implicit_ordering?

P.S. I think the idea of a "Dataset with implicit ordering" is not handled in Arrow. There is no way to store ordering information in a dataset, and the order is subject to alphabetical sorting of fragment filenames (which, with the default filename template, is incorrect: part_1 -> part_10 -> part_2 -> part_3, etc.).

else:
right_source = Declaration(
"table_source", TableSourceNodeOptions(right_operand)
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
@@ -51,7 +51,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CExpression filter

cdef cppclass CScanNodeOptions "arrow::dataset::ScanNodeOptions"(CExecNodeOptions):
CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options)
CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options, bint require_sequenced_output)

shared_ptr[CScanOptions] scan_options

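For reference, a hedged sketch of the C++ side this Cython declaration binds to: constructing scan-node options that ask the scan to emit sequenced batches. The helper name is hypothetical, and the dataset and scan options are assumed to be built elsewhere.

#include <memory>
#include "arrow/dataset/dataset.h"  // Dataset
#include "arrow/dataset/scanner.h"  // ScanNodeOptions, ScanOptions

arrow::dataset::ScanNodeOptions MakeSequencedScanOptions(
    std::shared_ptr<arrow::dataset::Dataset> dataset,
    std::shared_ptr<arrow::dataset::ScanOptions> scan_options) {
  // The third argument maps to require_sequenced_output in the Cython wrapper above.
  return arrow::dataset::ScanNodeOptions(std::move(dataset), std::move(scan_options),
                                         /*require_sequenced_output=*/true);
}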