Introduce unified DataSourceExec for provided datasources, remove `ParquetExec`, `CsvExec`, etc (#14224)

* unify ParquetExec, AvroExec, ArrowExec, NDJsonExec, MemoryExec into one DataSourceExec plan

* fix license headers

* fix compile errors in docs

* separate non-parquet code

* format code

* fix typo

* fix imports

* fix clippy
fix csv_json example

* add comment to the example

* fix cargo docs

* replace MemoryExec with MemorySourceConfig

* merge fixes

* change MemoryExec to DataSourceExec

* fix merge conflicts

* apply some syntactic sugar

* fix imports and comment line

* simplify some lines

* rename source_config to file_source

* format code

* format code

* make memory metrics default behavior

* remove unnecessary cfg check

* format code

* remove ParquetExec strings

* fix documents and imports

* fix imports

* add constraints and fix tests

* delete redundant file

* make metrics and statistics part of the file-type-specific configurations
make cache part of DataSourceExec

* format code

* fix tests

* format code

* split repartitioning into DataSourceExec and FileSourceConfig parts

* move properties into DataSourceExec and split eq_properties and output_partitioning in DataSource trait

* clone source with Arc

* return the file type as an enum and avoid downcasting when unnecessary
create fmt_extra method

* format code

* re-add deprecated plans in order to support backward compatibility

* reduce diff

* fix doc

* merge fixes

* remove unnecessary files

* rename config structs to source

* remove empty files
fix tests

* remove FileSourceConfig
projected_statistics must be solved!

* fix base_config formatting

* format code

* fix repartition logic

* fix merge conflicts

* fix csv projection error

* clippy fix

* use new() on initialization

* use DataSourceExec on deprecated file operators as well

* move ParquetSource into source.rs
fix doc errors

* use ParquetSource only if parquet feature is enabled

* fix slt tests

* add with_fetch API to MemorySourceConfig and re-add deprecated MemoryExec

* fix merge conflicts
fix memory source fetch error

* format code

* change the FileType enum into a dyn trait so that it is extensible

* remove metadata_size_hint from required ParquetSource parameters

* remove FileType trait and split with_predicate logic for ParquetSource

* remove predicate from initialization of ParquetSource

* remove unnecessary imports

* deprecate ParquetExecBuilder and add doc hints

* fix slt

* fix clippy

* fix fmt

* return reference of the Arc in source()

* re-add deprecated exec files

* fix doc error
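
Migration sketch (editor's illustration, not part of the commit): based on the example diffs below, the format-specific operators are replaced by a single `DataSourceExec` built from a `FileScanConfig` plus a format-specific `FileSource` (e.g. `ParquetSource`, `CsvSource`), while in-memory scans go through `MemorySourceConfig`. The helper names used here (`FileScanConfig::new_exec`, `MemorySourceConfig::try_new_exec`) are taken from the changed examples and may evolve in later releases.

```rust
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::error::Result;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_plan::memory::MemorySourceConfig;
use datafusion::physical_plan::ExecutionPlan;

/// Was: ParquetExecBuilder::new(file_scan_config).build_arc()
fn parquet_scan(
    url: ObjectStoreUrl,
    schema: SchemaRef,
    file: PartitionedFile,
) -> Arc<dyn ExecutionPlan> {
    // The FileSource decides the format; DataSourceExec is format-agnostic.
    let source = Arc::new(ParquetSource::default());
    FileScanConfig::new(url, schema, source)
        .with_file(file)
        .new_exec() // returns an Arc<DataSourceExec>
}

/// Was: Arc::new(MemoryExec::try_new(partitions, schema, projection)?)
fn memory_scan(
    partitions: &[Vec<RecordBatch>],
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    Ok(MemorySourceConfig::try_new_exec(partitions, schema, projection)?)
}
```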
mertak-synnada authored Feb 6, 2025
1 parent 8ebed67 commit 5e1e693
Showing 170 changed files with 4,868 additions and 3,460 deletions.
20 changes: 11 additions & 9 deletions datafusion-cli/src/functions.rs
@@ -16,29 +16,31 @@
// under the License.

//! Functions that are query-able and searchable via the `\h` command
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;

use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::common::{plan_err, Column};
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::memory::MemorySourceConfig;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;

use async_trait::async_trait;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

#[derive(Debug)]
pub enum Function {
@@ -240,11 +242,11 @@ impl TableProvider for ParquetMetadataTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(MemoryExec::try_new(
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
TableProvider::schema(self),
projection.cloned(),
)?))
)?)
}
}

92 changes: 47 additions & 45 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -25,18 +25,14 @@ use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::catalog::Session;
use datafusion::common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
ParquetAccessPlan, ParquetExecBuilder,
};
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig, ParquetSource,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
@@ -56,6 +52,9 @@ use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
@@ -83,8 +82,8 @@ use url::Url;
/// Specifically, this example illustrates how to:
/// 1. Use [`ParquetFileReaderFactory`] to avoid re-reading parquet metadata on each query
/// 2. Use [`PruningPredicate`] for predicate analysis
/// 3. Pass a row group selection to [`ParquetExec`]
/// 4. Pass a row selection (within a row group) to [`ParquetExec`]
/// 3. Pass a row group selection to [`ParquetSource`]
/// 4. Pass a row selection (within a row group) to [`ParquetSource`]
///
/// Note this is a *VERY* low level example for people who want to build their
/// own custom indexes (e.g. for low latency queries). Most users should use
@@ -94,38 +93,38 @@ use url::Url;
///
/// # Diagram
///
/// This diagram shows how the `ParquetExec` is configured to do only a single
/// This diagram shows how the `DataSourceExec` with `ParquetSource` is configured to do only a single
/// (range) read from a parquet file, for the data that is needed. It does
/// not read the file footer or any of the row groups that are not needed.
///
/// ```text
/// ┌───────────────────────┐ The TableProvider configures the
/// │ ┌───────────────────┐ │ ParquetExec:
/// │ ┌───────────────────┐ │ DataSourceExec:
/// │ │ │ │
/// │ └───────────────────┘ │
/// │ ┌───────────────────┐ │
/// Row │ │ │ │ 1. To read only specific Row
/// Groups │ └───────────────────┘ │ Groups (the ParquetExec tries
/// Groups │ └───────────────────┘ │ Groups (the DataSourceExec tries
/// │ ┌───────────────────┐ │ to reduce this further based
/// │ │ │ │ on metadata)
/// │ └───────────────────┘ │ ┌────────────────────┐
/// │ ┌───────────────────┐ │ │ │
/// │ │ │◀┼ ─ ─ ┐ │ ParquetExec
/// │ └───────────────────┘ │ │ (Parquet Reader) │
/// │ ... │ └ ─ ─ ─ ─│ │
/// │ ┌───────────────────┐ │ │ ╔═══════════════╗ │
/// │ │ │ │ │ ║ParquetMetadata║ │
/// │ └───────────────────┘ │ │ ╚═══════════════╝ │
/// │ ╔═══════════════════╗ │ └────────────────────┘
/// │ └───────────────────┘ │ ┌──────────────────────
/// │ ┌───────────────────┐ │ │
/// │ │ │◀┼ ─ ─ ┐ │ DataSourceExec
/// │ └───────────────────┘ │ │ (Parquet Reader)
/// │ ... │ └ ─ ─ ─ ─│
/// │ ┌───────────────────┐ │ │ ╔═══════════════╗
/// │ │ │ │ │ ║ParquetMetadata║
/// │ └───────────────────┘ │ │ ╚═══════════════╝
/// │ ╔═══════════════════╗ │ └──────────────────────
/// │ ║ Thrift metadata ║ │
/// │ ╚═══════════════════╝ │ 1. With cached ParquetMetadata, so
/// └───────────────────────┘ the ParquetExec does not re-read /
/// └───────────────────────┘ the ParquetSource does not re-read /
/// Parquet File decode the thrift footer
///
/// ```
///
/// Within a Row Group, Column Chunks store data in DataPages. This example also
/// shows how to configure the ParquetExec to read a `RowSelection` (row ranges)
/// shows how to configure the ParquetSource to read a `RowSelection` (row ranges)
/// which will skip unneeded data pages. This requires that the Parquet file has
/// a [Page Index].
///
@@ -135,15 +134,15 @@ use url::Url;
/// │ │ Data Page is not fetched or decoded.
/// │ ┌───────────────────┐ │ Note this requires a PageIndex
/// │ │ ┌──────────┐ │ │
/// Row │ │ │DataPage 0│ │ │ ┌────────────────────┐
/// Groups │ │ └──────────┘ │ │ │ │
/// │ │ ┌──────────┐ │ │ │ ParquetExec
/// │ │ ... │DataPage 1│ ◀┼ ┼ ─ ─ ─ │ (Parquet Reader) │
/// │ │ └──────────┘ │ │ └ ─ ─ ─ ─ ─│ │
/// │ │ ┌──────────┐ │ │ │ ╔═══════════════╗ │
/// │ │ │DataPage 2│ │ │ If only rows │ ║ParquetMetadata║ │
/// │ │ └──────────┘ │ │ from DataPage 1 │ ╚═══════════════╝ │
/// │ └───────────────────┘ │ are selected, └────────────────────┘
/// Row │ │ │DataPage 0│ │ │ ┌──────────────────────
/// Groups │ │ └──────────┘ │ │ │
/// │ │ ┌──────────┐ │ │ │ DataSourceExec
/// │ │ ... │DataPage 1│ ◀┼ ┼ ─ ─ ─ │ (Parquet Reader)
/// │ │ └──────────┘ │ │ └ ─ ─ ─ ─ ─│
/// │ │ ┌──────────┐ │ │ │ ╔═══════════════╗
/// │ │ │DataPage 2│ │ │ If only rows │ ║ParquetMetadata║
/// │ │ └──────────┘ │ │ from DataPage 1 │ ╚═══════════════╝
/// │ └───────────────────┘ │ are selected, └──────────────────────
/// │ │ only DataPage 1
/// │ ... │ is fetched and
/// │ │ decoded
@@ -211,7 +210,7 @@ async fn main() -> Result<()> {
// pages that must be decoded
//
// Note: in order to prune pages, the Page Index must be loaded and the
// ParquetExec will load it on demand if not present. To avoid a second IO
// DataSourceExec will load it on demand if not present. To avoid a second IO
// during query, this example loaded the Page Index preemptively by setting
// `ArrowReader::with_page_index` in `IndexedFile::try_new`
provider.set_use_row_selection(true);
@@ -478,31 +477,34 @@ impl TableProvider for IndexTableProvider {

let partitioned_file = indexed_file
.partitioned_file()
// provide the starting access plan to the ParquetExec by
// provide the starting access plan to the DataSourceExec by
// storing it as "extensions" on PartitionedFile
.with_extensions(Arc::new(access_plan) as _);

// Prepare for scanning
let schema = self.schema();
let object_store_url = ObjectStoreUrl::parse("file://")?;
let file_scan_config = FileScanConfig::new(object_store_url, schema)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file);

// Configure a factory interface to avoid re-reading the metadata for each file
let reader_factory =
CachedParquetFileReaderFactory::new(Arc::clone(&self.object_store))
.with_file(indexed_file);

// Finally, put it all together into a ParquetExec
Ok(ParquetExecBuilder::new(file_scan_config)
// provide the predicate so the ParquetExec can try and prune
// row groups internally
.with_predicate(predicate)
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory))
.build_arc())
let file_source = Arc::new(
ParquetSource::default()
// provide the predicate so the DataSourceExec can try and prune
// row groups internally
.with_predicate(Arc::clone(&schema), predicate)
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);
let file_scan_config = FileScanConfig::new(object_store_url, schema, file_source)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file);

// Finally, put it all together into a DataSourceExec
Ok(file_scan_config.new_exec())
}

/// Tell DataFusion to push filters down to the scan method
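
Editor's note: a minimal sketch of the row-group selection hook described in the doc comment above; it assumes the `ParquetAccessPlan::new_all`/`skip` API used elsewhere in this example, and the plan still travels to the scan as a `PartitionedFile` extension exactly as in the `scan` method shown in the diff.

```rust
use std::sync::Arc;

use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;

/// Sketch: pre-select row groups for a file assumed to have four row groups,
/// skipping row groups 1 and 3. ParquetSource reads the plan back out of the
/// PartitionedFile extensions when the DataSourceExec runs.
fn file_with_access_plan(path: &str, file_size: u64) -> PartitionedFile {
    let mut access_plan = ParquetAccessPlan::new_all(4); // assumption: 4 row groups
    access_plan.skip(1);
    access_plan.skip(3);
    PartitionedFile::new(path.to_string(), file_size)
        .with_extensions(Arc::new(access_plan) as _)
}
```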
61 changes: 35 additions & 26 deletions datafusion-examples/examples/csv_json_opener.rs
@@ -18,18 +18,21 @@
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::JsonSource;
use datafusion::{
assert_batches_eq,
datasource::{
data_source::FileSource,
file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream, JsonOpener},
physical_plan::{CsvSource, FileScanConfig, FileStream, JsonOpener},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};

use futures::StreamExt;
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

@@ -48,29 +51,27 @@ async fn csv_opener() -> Result<()> {
let object_store = Arc::new(LocalFileSystem::new());
let schema = aggr_test_schema();

let config = CsvConfig::new(
8192,
schema.clone(),
Some(vec![12, 0]),
true,
b',',
b'"',
None,
object_store,
Some(b'#'),
);

let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED);

let testdata = datafusion::test_util::arrow_test_data();
let path = format!("{testdata}/csv/aggregate_test_100.csv");

let path = std::path::Path::new(&path).canonicalize()?;

let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));
let scan_config = FileScanConfig::new(
ObjectStoreUrl::local_filesystem(),
Arc::clone(&schema),
Arc::new(CsvSource::default()),
)
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));

let config = CsvSource::new(true, b',', b'"')
.with_comment(Some(b'#'))
.with_schema(schema)
.with_batch_size(8192)
.with_projection(&scan_config);

let opener = config.create_file_opener(Ok(object_store), &scan_config, 0)?;

let mut result = vec![];
let mut stream =
@@ -120,13 +121,21 @@ async fn json_opener() -> Result<()> {
Arc::new(object_store),
);

let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
.with_projection(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10));

let mut stream =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
let scan_config = FileScanConfig::new(
ObjectStoreUrl::local_filesystem(),
schema,
Arc::new(JsonSource::default()),
)
.with_projection(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10));

let mut stream = FileStream::new(
&scan_config,
0,
Arc::new(opener),
&ExecutionPlanMetricsSet::new(),
)?;
let mut result = vec![];
while let Some(batch) = stream.next().await.transpose()? {
result.push(batch);
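
Editor's note: the example above wires the opener and `FileStream` together by hand to show the low-level pieces. A sketch of the higher-level path, assuming the `FileScanConfig::new_exec` helper shown in the parquet example applies to any `FileSource`:

```rust
use std::sync::Arc;

use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::test_util::aggr_test_schema;

/// Sketch: wrap a CsvSource into a DataSourceExec instead of driving
/// CsvOpener/FileStream manually (same options as the csv_opener example).
fn csv_exec(path: String, file_size: u64) -> Arc<dyn ExecutionPlan> {
    let schema = aggr_test_schema();
    let source = Arc::new(
        CsvSource::new(true, b',', b'"') // header, delimiter, quote (as above)
            .with_comment(Some(b'#')),
    );
    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
        .with_projection(Some(vec![12, 0]))
        .with_limit(Some(5))
        .with_file(PartitionedFile::new(path, file_size))
        .new_exec()
}
```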
5 changes: 5 additions & 0 deletions datafusion-examples/examples/custom_file_format.rs
@@ -22,6 +22,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type},
};
use datafusion::common::{GetExt, Statistics};
use datafusion::datasource::data_source::FileSource;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_expr::LexRequirement;
use datafusion::physical_expr::PhysicalExpr;
@@ -126,6 +127,10 @@ impl FileFormat for TSVFileFormat {
.create_writer_physical_plan(input, state, conf, order_requirements)
.await
}

fn file_source(&self) -> Arc<dyn FileSource> {
self.csv_file_format.file_source()
}
}

#[derive(Default, Debug)]
28 changes: 19 additions & 9 deletions datafusion-examples/examples/parquet_exec_visitor.rs
@@ -19,10 +19,11 @@ use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::source::DataSourceExec;
use datafusion::physical_plan::{
execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
};
@@ -96,15 +97,24 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
/// or `post_visit` (visit each node after its children/inputs)
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
// If needed match on a specific `ExecutionPlan` node type
let maybe_parquet_exec = plan.as_any().downcast_ref::<ParquetExec>();
if let Some(parquet_exec) = maybe_parquet_exec {
self.file_groups = Some(parquet_exec.base_config().file_groups.clone());
if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
let source = data_source.source();
if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
if file_config
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.is_some()
{
self.file_groups = Some(file_config.file_groups.clone());

let metrics = match parquet_exec.metrics() {
None => return Ok(true),
Some(metrics) => metrics,
};
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
let metrics = match data_source.metrics() {
None => return Ok(true),
Some(metrics) => metrics,
};
self.bytes_scanned = metrics.sum_by_name("bytes_scanned");
}
}
}
Ok(true)
}
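
Editor's note: a hedged usage sketch for the visitor above, mirroring what the example's `main` does; it assumes `ParquetExecVisitor` has exactly the two fields referenced in `pre_visit`, and that metrics are read after the plan has executed.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::{collect, visit_execution_plan};
use datafusion::prelude::SessionContext;

/// Sketch: run a query, execute the plan, then walk it with the visitor
/// to recover the scanned file groups and the "bytes_scanned" metric.
async fn inspect_scan(ctx: &SessionContext, sql: &str) -> Result<()> {
    let plan = ctx.sql(sql).await?.create_physical_plan().await?;
    // Metrics are only populated once the plan has actually run.
    let _batches = collect(Arc::clone(&plan), ctx.task_ctx()).await?;

    let mut visitor = ParquetExecVisitor {
        file_groups: None,
        bytes_scanned: None,
    };
    visit_execution_plan(plan.as_ref(), &mut visitor)?;

    if let Some(groups) = &visitor.file_groups {
        println!("scanned {} file group(s)", groups.len());
    }
    if let Some(bytes) = &visitor.bytes_scanned {
        println!("bytes_scanned: {}", bytes.as_usize());
    }
    Ok(())
}
```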