diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h index 2beacfe26baa1..2eb7df0085155 100644 --- a/cpp/src/arrow/acero/options.h +++ b/cpp/src/arrow/acero/options.h @@ -103,8 +103,8 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions { std::shared_ptr<Schema> output_schema; /// \brief an asynchronous stream of batches ending with std::nullopt std::function<Future<std::optional<ExecBatch>>()> generator; - - Ordering ordering = Ordering::Unordered(); + /// \brief the order of the data, defaults to Ordering::Unordered + Ordering ordering; }; /// \brief a node that generates data from a table already loaded in memory diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index ac34e4b6a09fc..2d3e2a1da1735 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -407,7 +407,7 @@ struct SchemaSourceNode : public SourceNode { struct RecordBatchReaderSourceNode : public SourceNode { RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema, arrow::AsyncGenerator<std::optional<ExecBatch>> generator) - : SourceNode(plan, schema, generator, Ordering::Implicit()) {} + : SourceNode(plan, schema, generator) {} static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs, const ExecNodeOptions& options) { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 0df8fd802656c..e66fed3f06c9f 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -1000,6 +1000,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan, auto scan_options = scan_node_options.scan_options; auto dataset = scan_node_options.dataset; bool require_sequenced_output = scan_node_options.require_sequenced_output; + bool implicit_ordering = scan_node_options.implicit_ordering; RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset->schema())); @@ -1032,11 +1033,11 @@ } else { batch_gen = std::move(merged_batch_gen); } - int64_t index = require_sequenced_output ? 
0 : compute::kUnsequencedIndex; + auto gen = MakeMappedGenerator( std::move(batch_gen), - [scan_options, index](const EnumeratedRecordBatch& partial) mutable - -> Result<std::optional<compute::ExecBatch>> { + [scan_options](const EnumeratedRecordBatch& partial) + -> Result<std::optional<compute::ExecBatch>> { // TODO(ARROW-13263) fragments may be able to attach more guarantees to batches // than this, for example parquet's row group stats. Failing to do this leaves // perf on the table because row group stats could be used to skip kernel execs in @@ -1057,11 +1058,10 @@ batch->values.emplace_back(partial.record_batch.index); batch->values.emplace_back(partial.record_batch.last); batch->values.emplace_back(partial.fragment.value->ToString()); - if (index != compute::kUnsequencedIndex) batch->index = index++; return batch; }); - auto ordering = require_sequenced_output ? Ordering::Implicit() : Ordering::Unordered(); + auto ordering = implicit_ordering ? Ordering::Implicit() : Ordering::Unordered(); auto fields = scan_options->dataset_schema->fields(); if (scan_options->add_augmented_fields) { diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index d2de267897180..af8888ad873f1 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -557,20 +557,27 @@ class ARROW_DS_EXPORT ScannerBuilder { /// \brief Construct a source ExecNode which yields batches from a dataset scan. /// /// Does not construct associated filter or project nodes. -/// Yielded batches will be augmented with fragment/batch indices to enable stable -/// ordering for simple ExecPlans. +/// +/// Batches are yielded sequentially, as in single-threaded execution, +/// when require_sequenced_output=true. +/// +/// Yielded batches will be augmented with fragment/batch indices when +/// implicit_ordering=true to enable stable ordering for simple ExecPlans. 
class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions { public: explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset, std::shared_ptr<ScanOptions> scan_options, - bool require_sequenced_output = false) + bool require_sequenced_output = false, + bool implicit_ordering = false) : dataset(std::move(dataset)), scan_options(std::move(scan_options)), - require_sequenced_output(require_sequenced_output) {} + require_sequenced_output(require_sequenced_output), + implicit_ordering(implicit_ordering) {} std::shared_ptr<Dataset> dataset; std::shared_ptr<ScanOptions> scan_options; bool require_sequenced_output; + bool implicit_ordering; }; /// @} diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index fd50215cee9ae..f8c8b9bc11f6e 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -4077,13 +4077,15 @@ cdef class _ScanNodeOptions(ExecNodeOptions): cdef: shared_ptr[CScanOptions] c_scan_options bint require_sequenced_output=False + bint implicit_ordering=False c_scan_options = Scanner._make_scan_options(dataset, scan_options) require_sequenced_output=scan_options.get("require_sequenced_output", False) + implicit_ordering=scan_options.get("implicit_ordering", False) self.wrapped.reset( - new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output) + new CScanNodeOptions(dataset.unwrap(), c_scan_options, require_sequenced_output, implicit_ordering) ) @@ -4101,8 +4103,8 @@ class ScanNodeOptions(_ScanNodeOptions): expression or projection to the scan node that you also supply to the filter or project node. - Yielded batches will be augmented with fragment/batch indices to - enable stable ordering for simple ExecPlans. + Yielded batches will be augmented with fragment/batch indices when + implicit_ordering=True to enable stable ordering for simple ExecPlans. Parameters ---------- @@ -4111,7 +4113,9 @@ **kwargs : dict, optional Scan options. See `Scanner.from_dataset` for possible arguments. 
require_sequenced_output : bool, default False - Assert implicit ordering on data. + Batches are yielded sequentially, as in single-threaded execution. + implicit_ordering : bool, default False + Preserve implicit ordering of data. """ def __init__(self, Dataset dataset, **kwargs): diff --git a/python/pyarrow/acero.py b/python/pyarrow/acero.py index 706338bd8cdb8..86bf7cbf4d29d 100644 --- a/python/pyarrow/acero.py +++ b/python/pyarrow/acero.py @@ -56,10 +56,10 @@ class InMemoryDataset: ds = DatasetModuleStub -def _dataset_to_decl(dataset, use_threads=True, require_sequenced_output=False): +def _dataset_to_decl(dataset, use_threads=True, implicit_ordering=False): decl = Declaration("scan", ScanNodeOptions( dataset, use_threads=use_threads, - require_sequenced_output=require_sequenced_output)) + implicit_ordering=implicit_ordering)) # Get rid of special dataset columns # "__fragment_index", "__batch_index", "__last_in_fragment", "__filename" @@ -316,7 +316,7 @@ def _perform_join_asof(left_operand, left_on, left_by, left_source = _dataset_to_decl( left_operand, use_threads=use_threads, - require_sequenced_output=True) + implicit_ordering=True) else: left_source = Declaration( "table_source", TableSourceNodeOptions(left_operand), @@ -324,7 +324,7 @@ if isinstance(right_operand, ds.Dataset): right_source = _dataset_to_decl( right_operand, use_threads=use_threads, - require_sequenced_output=True) + implicit_ordering=True) else: right_source = Declaration( "table_source", TableSourceNodeOptions(right_operand) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index d2fbcd0ee4d3b..403e306e13762 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -51,7 +51,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CExpression filter cdef cppclass CScanNodeOptions 
"arrow::dataset::ScanNodeOptions"(CExecNodeOptions): - CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options, bint require_sequenced_output) + CScanNodeOptions(shared_ptr[CDataset] dataset, shared_ptr[CScanOptions] scan_options, bint require_sequenced_output, bint implicit_ordering) shared_ptr[CScanOptions] scan_options