Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-44629: [C++][Acero] Use implicit_ordering for asof_join rather than require_sequenced_output #44616

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/src/arrow/acero/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
std::shared_ptr<Schema> output_schema;
/// \brief an asynchronous stream of batches ending with std::nullopt
std::function<Future<std::optional<ExecBatch>>()> generator;

Ordering ordering = Ordering::Unordered();
/// \brief the order of the data, defaults to Ordering::Unordered
Ordering ordering;
};

/// \brief a node that generates data from a table already loaded in memory
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ struct SchemaSourceNode : public SourceNode {
struct RecordBatchReaderSourceNode : public SourceNode {
RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
: SourceNode(plan, schema, generator, Ordering::Implicit()) {}
: SourceNode(plan, schema, generator) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
auto scan_options = scan_node_options.scan_options;
auto dataset = scan_node_options.dataset;
bool require_sequenced_output = scan_node_options.require_sequenced_output;
bool implicit_ordering = scan_node_options.implicit_ordering;

RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset->schema()));

Expand Down Expand Up @@ -1032,11 +1033,11 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
} else {
batch_gen = std::move(merged_batch_gen);
}
int64_t index = require_sequenced_output ? 0 : compute::kUnsequencedIndex;

auto gen = MakeMappedGenerator(
std::move(batch_gen),
[scan_options, index](const EnumeratedRecordBatch& partial) mutable
-> Result<std::optional<compute::ExecBatch>> {
[scan_options](const EnumeratedRecordBatch& partial)
-> Result<std::optional<compute::ExecBatch>> {
// TODO(ARROW-13263) fragments may be able to attach more guarantees to batches
// than this, for example parquet's row group stats. Failing to do this leaves
// perf on the table because row group stats could be used to skip kernel execs in
Expand All @@ -1057,11 +1058,10 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
batch->values.emplace_back(partial.record_batch.index);
batch->values.emplace_back(partial.record_batch.last);
batch->values.emplace_back(partial.fragment.value->ToString());
if (index != compute::kUnsequencedIndex) batch->index = index++;
return batch;
});

auto ordering = require_sequenced_output ? Ordering::Implicit() : Ordering::Unordered();
auto ordering = implicit_ordering ? Ordering::Implicit() : Ordering::Unordered();

auto fields = scan_options->dataset_schema->fields();
if (scan_options->add_augmented_fields) {
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,14 +563,17 @@ class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
public:
explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
bool require_sequenced_output = false)
bool require_sequenced_output = false,
bool implicit_ordering = false)
: dataset(std::move(dataset)),
scan_options(std::move(scan_options)),
require_sequenced_output(require_sequenced_output) {}
require_sequenced_output(require_sequenced_output),
implicit_ordering(implicit_ordering) {}

std::shared_ptr<Dataset> dataset;
std::shared_ptr<ScanOptions> scan_options;
bool require_sequenced_output;
bool implicit_ordering;
Comment on lines 575 to +576
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see there are documents of these two fields in the python counterpart. Could you add them in C++ too so this can be self-explaining?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, require_sequenced_output is handled by the scanner by collapsing the underlying generator to single-threaded, whereas implicit_ordering is delegated to the generated source node?

};

/// @}
Expand Down
12 changes: 8 additions & 4 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4077,13 +4077,15 @@ cdef class _ScanNodeOptions(ExecNodeOptions):
cdef:
shared_ptr[CScanOptions] c_scan_options
bint require_sequenced_output=False
bint implicit_ordering=False

c_scan_options = Scanner._make_scan_options(dataset, scan_options)

require_sequenced_output=scan_options.get("require_sequenced_output", False)
implicit_ordering=scan_options.get("implicit_ordering", False)

self.wrapped.reset(
new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output)
new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output, implicit_ordering)
)


Expand All @@ -4101,8 +4103,8 @@ class ScanNodeOptions(_ScanNodeOptions):
expression or projection to the scan node that you also supply
to the filter or project node.

Yielded batches will be augmented with fragment/batch indices to
enable stable ordering for simple ExecPlans.
Yielded batches will be augmented with fragment/batch indices when
implicit_ordering=True to enable stable ordering for simple ExecPlans.

Parameters
----------
Expand All @@ -4111,7 +4113,9 @@ class ScanNodeOptions(_ScanNodeOptions):
**kwargs : dict, optional
Scan options. See `Scanner.from_dataset` for possible arguments.
require_sequenced_output : bool, default False
Assert implicit ordering on data.
Batches are yielded sequentially, like single-threaded
Comment on lines 4115 to +4116
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed?

implicit_ordering : bool, default False
Preserve implicit ordering of data.
"""

def __init__(self, Dataset dataset, **kwargs):
Expand Down
8 changes: 4 additions & 4 deletions python/pyarrow/acero.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ class InMemoryDataset:
ds = DatasetModuleStub


def _dataset_to_decl(dataset, use_threads=True, require_sequenced_output=False):
def _dataset_to_decl(dataset, use_threads=True, implicit_ordering=False):
decl = Declaration("scan", ScanNodeOptions(
dataset, use_threads=use_threads,
require_sequenced_output=require_sequenced_output))
implicit_ordering=implicit_ordering))

# Get rid of special dataset columns
# "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
Expand Down Expand Up @@ -316,15 +316,15 @@ def _perform_join_asof(left_operand, left_on, left_by,
left_source = _dataset_to_decl(
left_operand,
use_threads=use_threads,
require_sequenced_output=True)
implicit_ordering=True)
else:
left_source = Declaration(
"table_source", TableSourceNodeOptions(left_operand),
)
if isinstance(right_operand, ds.Dataset):
right_source = _dataset_to_decl(
right_operand, use_threads=use_threads,
require_sequenced_output=True)
implicit_ordering=True)
else:
right_source = Declaration(
"table_source", TableSourceNodeOptions(right_operand)
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CExpression filter

cdef cppclass CScanNodeOptions "arrow::dataset::ScanNodeOptions"(CExecNodeOptions):
CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options, bint require_sequenced_output)
CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options, bint require_sequenced_output, bint implicit_ordering)

shared_ptr[CScanOptions] scan_options

Expand Down
Loading