From 167bbf9deab74ebf453675606f43c9e62894bf90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 19 Dec 2024 15:56:30 +0100 Subject: [PATCH] Update DataFusion to 43 (#1125) * Update DataFusions to 43 * Fmt * Debug * Remove comment * Update proto * Update common proto as well --- Cargo.toml | 9 +- ballista/client/tests/context_setup.rs | 2 +- ballista/core/proto/datafusion.proto | 174 ++++++++---------- ballista/core/proto/datafusion_common.proto | 9 +- ballista/core/src/extension.rs | 2 +- ballista/core/src/utils.rs | 15 +- ballista/executor/src/executor_process.rs | 2 +- ballista/executor/src/standalone.rs | 2 +- ballista/scheduler/src/test_utils.rs | 5 +- benchmarks/src/bin/tpch.rs | 2 +- .../source/user-guide/extending-components.md | 2 +- examples/src/object_store.rs | 2 +- examples/tests/object_store.rs | 10 +- 13 files changed, 119 insertions(+), 117 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f92064585..a9e9556fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,11 +26,10 @@ arrow-flight = { version = "53", features = ["flight-sql-experimental"] } clap = { version = "4.5", features = ["derive", "cargo"] } configure_me = { version = "0.4.0" } configure_me_codegen = { version = "0.4.4" } -# bump directly to datafusion v43 to avoid the serde bug on v42 (https://github.com/apache/datafusion/pull/12626) -datafusion = "42.0.0" -datafusion-cli = "42.0.0" -datafusion-proto = "42.0.0" -datafusion-proto-common = "42.0.0" +datafusion = "43.0.0" +datafusion-cli = "43.0.0" +datafusion-proto = "43.0.0" +datafusion-proto-common = "43.0.0" object_store = "0.11" prost = "0.13" prost-types = "0.13" diff --git a/ballista/client/tests/context_setup.rs b/ballista/client/tests/context_setup.rs index 4dc6050df..7806d8476 100644 --- a/ballista/client/tests/context_setup.rs +++ b/ballista/client/tests/context_setup.rs @@ -380,7 +380,7 @@ mod standalone { } } - #[derive(Default)] + #[derive(Debug, Default)] struct BadPlanner {} #[async_trait::async_trait] diff --git a/ballista/core/proto/datafusion.proto b/ballista/core/proto/datafusion.proto index cf166ba93..5a044cb43 100644 --- a/ballista/core/proto/datafusion.proto +++ b/ballista/core/proto/datafusion.proto @@ -75,10 +75,6 @@ message LogicalExprNodeCollection { repeated LogicalExprNode logical_expr_nodes = 1; } -message SortExprNodeCollection { - repeated SortExprNode sort_expr_nodes = 1; -} - message ListingTableScanNode { reserved 1; // was string table_name TableReference table_name = 14; @@ -94,9 +90,8 @@ message ListingTableScanNode { datafusion_common.CsvFormat csv = 10; datafusion_common.ParquetFormat parquet = 11; datafusion_common.AvroFormat avro = 12; - datafusion_common.NdJsonFormat json = 15; } - repeated SortExprNodeCollection file_sort_order = 13; + repeated LogicalExprNodeCollection file_sort_order = 13; } message ViewTableScanNode { @@ -133,7 +128,7 @@ message SelectionNode { message SortNode { LogicalPlanNode input = 1; - repeated SortExprNode expr = 2; + repeated LogicalExprNode expr = 2; // Maximum number of highest/lowest rows to fetch; negative means no limit int64 fetch = 3; } @@ -164,12 +159,12 @@ message CreateExternalTableNode { repeated string table_partition_cols = 5; bool if_not_exists = 6; string definition = 7; - repeated SortExprNodeCollection order_exprs = 10; + repeated LogicalExprNodeCollection order_exprs = 10; bool unbounded = 11; map options = 8; datafusion_common.Constraints constraints = 12; map column_defaults = 13; -} + } message PrepareNode { string name = 1; @@ -249,51 +244,35 @@ message DistinctNode { message DistinctOnNode { repeated LogicalExprNode on_expr = 1; repeated LogicalExprNode select_expr = 2; - repeated SortExprNode sort_expr = 3; + repeated LogicalExprNode sort_expr = 3; LogicalPlanNode input = 4; } message CopyToNode { - LogicalPlanNode input = 1; - string output_url = 2; - bytes file_type = 3; - repeated string partition_by = 7; + LogicalPlanNode input = 1; + string output_url = 2; + oneof format_options { + datafusion_common.CsvOptions csv = 8; + datafusion_common.JsonOptions json = 9; + datafusion_common.TableParquetOptions parquet = 10; + datafusion_common.AvroOptions avro = 11; + datafusion_common.ArrowOptions arrow = 12; + } + repeated string partition_by = 7; } message UnnestNode { - LogicalPlanNode input = 1; - repeated ColumnUnnestExec exec_columns = 2; - repeated ColumnUnnestListItem list_type_columns = 3; - repeated uint64 struct_type_columns = 4; - repeated uint64 dependency_indices = 5; - datafusion_common.DfSchema schema = 6; - UnnestOptions options = 7; -} -message ColumnUnnestListItem { - uint32 input_index = 1; - ColumnUnnestListRecursion recursion = 2; -} - -message ColumnUnnestListRecursions { - repeated ColumnUnnestListRecursion recursions = 2; -} - -message ColumnUnnestListRecursion { - datafusion_common.Column output_column = 1; - uint32 depth = 2; -} - -message ColumnUnnestExec { - datafusion_common.Column column = 1; - oneof UnnestType { - ColumnUnnestListRecursions list = 2; - datafusion_common.EmptyMessage struct = 3; - datafusion_common.EmptyMessage inferred = 4; - } + LogicalPlanNode input = 1; + repeated datafusion_common.Column exec_columns = 2; + repeated uint64 list_type_columns = 3; + repeated uint64 struct_type_columns = 4; + repeated uint64 dependency_indices = 5; + datafusion_common.DfSchema schema = 6; + UnnestOptions options = 7; } message UnnestOptions { - bool preserve_nulls = 1; + bool preserve_nulls = 1; } message UnionNode { @@ -337,6 +316,8 @@ message LogicalExprNode { // binary expressions BinaryExprNode binary_expr = 4; + // aggregate expressions + AggregateExprNode aggregate_expr = 5; // null checks IsNull is_null_expr = 6; @@ -346,6 +327,7 @@ message LogicalExprNode { BetweenNode between = 9; CaseNode case_ = 10; CastNode cast = 11; + SortExprNode sort = 12; NegativeNode negative = 13; InListNode in_list = 14; Wildcard wildcard = 15; @@ -387,7 +369,7 @@ message LogicalExprNode { } message Wildcard { - TableReference qualifier = 1; + string qualifier = 1; } message PlaceholderNode { @@ -489,14 +471,57 @@ message InListNode { bool negated = 3; } +enum AggregateFunction { + MIN = 0; + MAX = 1; + SUM = 2; + AVG = 3; + COUNT = 4; + APPROX_DISTINCT = 5; + ARRAY_AGG = 6; + // VARIANCE = 7; + VARIANCE_POP = 8; + // COVARIANCE = 9; + // COVARIANCE_POP = 10; + STDDEV = 11; + STDDEV_POP = 12; + CORRELATION = 13; + APPROX_PERCENTILE_CONT = 14; + APPROX_MEDIAN = 15; + APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16; + GROUPING = 17; + // MEDIAN = 18; + BIT_AND = 19; + BIT_OR = 20; + BIT_XOR = 21; + BOOL_AND = 22; + BOOL_OR = 23; + REGR_SLOPE = 26; + REGR_INTERCEPT = 27; + REGR_COUNT = 28; + REGR_R2 = 29; + REGR_AVGX = 30; + REGR_AVGY = 31; + REGR_SXX = 32; + REGR_SYY = 33; + REGR_SXY = 34; + STRING_AGG = 35; + NTH_VALUE_AGG = 36; +} + +message AggregateExprNode { + AggregateFunction aggr_function = 1; + repeated LogicalExprNode expr = 2; + bool distinct = 3; + LogicalExprNode filter = 4; + repeated LogicalExprNode order_by = 5; +} message AggregateUDFExprNode { string fun_name = 1; repeated LogicalExprNode args = 2; - bool distinct = 5; LogicalExprNode filter = 3; - repeated SortExprNode order_by = 4; - optional bytes fun_definition = 6; + repeated LogicalExprNode order_by = 4; } message ScalarUDFExprNode { @@ -506,8 +531,7 @@ message ScalarUDFExprNode { } enum BuiltInWindowFunction { - UNSPECIFIED = 0; // https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum - // ROW_NUMBER = 0; + ROW_NUMBER = 0; RANK = 1; DENSE_RANK = 2; PERCENT_RANK = 3; @@ -522,16 +546,16 @@ enum BuiltInWindowFunction { message WindowExprNode { oneof window_function { + AggregateFunction aggr_function = 1; BuiltInWindowFunction built_in_function = 2; string udaf = 3; string udwf = 9; } LogicalExprNode expr = 4; repeated LogicalExprNode partition_by = 5; - repeated SortExprNode order_by = 6; + repeated LogicalExprNode order_by = 6; // repeated LogicalExprNode filter = 7; WindowFrame window_frame = 8; - optional bytes fun_definition = 10; } message BetweenNode { @@ -650,11 +674,9 @@ message PlanType { datafusion_common.EmptyMessage FinalLogicalPlan = 3; datafusion_common.EmptyMessage InitialPhysicalPlan = 4; datafusion_common.EmptyMessage InitialPhysicalPlanWithStats = 9; - datafusion_common.EmptyMessage InitialPhysicalPlanWithSchema = 11; OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5; datafusion_common.EmptyMessage FinalPhysicalPlan = 6; datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10; - datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12; } } @@ -715,11 +737,10 @@ message PhysicalPlanNode { AnalyzeExecNode analyze = 23; JsonSinkExecNode json_sink = 24; SymmetricHashJoinExecNode symmetric_hash_join = 25; - InterleaveExecNode interleave = 26; + InterleaveExecNode interleave = 26; PlaceholderRowExecNode placeholder_row = 27; CsvSinkExecNode csv_sink = 28; ParquetSinkExecNode parquet_sink = 29; - UnnestExecNode unnest = 30; } } @@ -731,21 +752,13 @@ message PartitionColumn { message FileSinkConfig { reserved 6; // writer_mode - reserved 8; // was `overwrite` which has been superseded by `insert_op` string object_store_url = 1; repeated PartitionedFile file_groups = 2; repeated string table_paths = 3; datafusion_common.Schema output_schema = 4; repeated PartitionColumn table_partition_cols = 5; - bool keep_partition_by_columns = 9; - InsertOp insert_op = 10; -} - -enum InsertOp { - Append = 0; - Overwrite = 1; - Replace = 2; + bool overwrite = 8; } message JsonSink { @@ -784,19 +797,6 @@ message ParquetSinkExecNode { PhysicalSortExprNodeCollection sort_order = 4; } -message UnnestExecNode { - PhysicalPlanNode input = 1; - datafusion_common.Schema schema = 2; - repeated ListUnnest list_type_columns = 3; - repeated uint64 struct_type_columns = 4; - UnnestOptions options = 5; -} - -message ListUnnest { - uint32 index_in_input_schema = 1; - uint32 depth = 2; -} - message PhysicalExtensionNode { bytes node = 1; repeated PhysicalPlanNode inputs = 2; @@ -838,8 +838,6 @@ message PhysicalExprNode { // was PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17; PhysicalLikeExprNode like_expr = 18; - - PhysicalExtensionExprNode extension = 19; } } @@ -852,17 +850,17 @@ message PhysicalScalarUdfNode { message PhysicalAggregateExprNode { oneof AggregateFunction { + AggregateFunction aggr_function = 1; string user_defined_aggr_function = 4; } repeated PhysicalExprNode expr = 2; repeated PhysicalSortExprNode ordering_req = 5; bool distinct = 3; - bool ignore_nulls = 6; - optional bytes fun_definition = 7; } message PhysicalWindowExprNode { oneof window_function { + AggregateFunction aggr_function = 1; BuiltInWindowFunction built_in_function = 2; string user_defined_aggr_function = 3; } @@ -871,7 +869,6 @@ message PhysicalWindowExprNode { repeated PhysicalSortExprNode order_by = 6; WindowFrame window_frame = 7; string name = 8; - optional bytes fun_definition = 9; } message PhysicalIsNull { @@ -947,16 +944,10 @@ message PhysicalNegativeNode { PhysicalExprNode expr = 1; } -message PhysicalExtensionExprNode { - bytes expr = 1; - repeated PhysicalExprNode inputs = 2; -} - message FilterExecNode { PhysicalPlanNode input = 1; PhysicalExprNode expr = 2; uint32 default_filter_selectivity = 3; - repeated uint32 projection = 9; } message FileGroup { @@ -1003,10 +994,6 @@ message CsvScanExecNode { oneof optional_escape { string escape = 5; } - oneof optional_comment { - string comment = 6; - } - bool newlines_in_values = 7; } message AvroScanExecNode { @@ -1187,7 +1174,6 @@ message NestedLoopJoinExecNode { message CoalesceBatchesExecNode { PhysicalPlanNode input = 1; uint32 target_batch_size = 2; - optional uint32 fetch = 3; } message CoalescePartitionsExecNode { diff --git a/ballista/core/proto/datafusion_common.proto b/ballista/core/proto/datafusion_common.proto index c3906abf7..94490ec24 100644 --- a/ballista/core/proto/datafusion_common.proto +++ b/ballista/core/proto/datafusion_common.proto @@ -84,6 +84,7 @@ enum JoinType { LEFTANTI = 5; RIGHTSEMI = 6; RIGHTANTI = 7; + LEFTMARK = 8; } enum JoinConstraint { @@ -413,7 +414,7 @@ message CsvOptions { bytes quote = 3; // Quote character as a byte bytes escape = 4; // Optional escape character as a byte CompressionTypeVariant compression = 5; // Compression type - uint64 schema_infer_max_rec = 6; // Max records for schema inference + optional uint64 schema_infer_max_rec = 6; // Optional max records for schema inference string date_format = 7; // Optional date format string datetime_format = 8; // Optional datetime format string timestamp_format = 9; // Optional timestamp format @@ -429,7 +430,7 @@ message CsvOptions { // Options controlling CSV format message JsonOptions { CompressionTypeVariant compression = 1; // Compression type - uint64 schema_infer_max_rec = 2; // Max records for schema inference + optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference } message TableParquetOptions { @@ -494,6 +495,7 @@ message ParquetOptions { bool bloom_filter_on_read = 26; // default = true bool bloom_filter_on_write = 27; // default = false bool schema_force_view_types = 28; // default = false + bool binary_as_string = 29; // default = false oneof metadata_size_hint_opt { uint64 metadata_size_hint = 4; @@ -540,9 +542,10 @@ message ParquetOptions { string created_by = 16; } -enum JoinSide{ +enum JoinSide { LEFT_SIDE = 0; RIGHT_SIDE = 1; + NONE = 2; } message Precision{ diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs index bb43e93bd..25bdbad92 100644 --- a/ballista/core/src/extension.rs +++ b/ballista/core/src/extension.rs @@ -133,7 +133,7 @@ impl SessionStateExt for SessionState { .with_round_robin_repartition(false); let runtime_config = RuntimeConfig::default(); - let runtime_env = RuntimeEnv::new(runtime_config)?; + let runtime_env = RuntimeEnv::try_new(runtime_config)?; let session_state = SessionStateBuilder::new() .with_default_features() .with_config(session_config) diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index 14eeb9a21..913e955d3 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -56,7 +56,7 @@ pub fn default_session_builder( Ok(SessionStateBuilder::new() .with_default_features() .with_config(config) - .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default())?)) + .with_runtime_env(Arc::new(RuntimeEnv::try_new(RuntimeConfig::default())?)) .build()) } @@ -125,6 +125,17 @@ pub struct BallistaQueryPlanner { plan_repr: PhantomData, } +impl std::fmt::Debug for BallistaQueryPlanner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BallistaQueryPlanner") + .field("scheduler_url", &self.scheduler_url) + .field("config", &self.config) + .field("extension_codec", &self.extension_codec) + .field("plan_repr", &self.plan_repr) + .finish() + } +} + impl BallistaQueryPlanner { pub fn new(scheduler_url: String, config: BallistaConfig) -> Self { Self { @@ -316,7 +327,7 @@ mod test { use crate::utils::LocalRun; fn context() -> SessionContext { - let runtime_environment = RuntimeEnv::new(RuntimeConfig::new()).unwrap(); + let runtime_environment = RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(); let session_config = SessionConfig::new().with_information_schema(true); diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index e350f391e..fac02b48d 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -204,7 +204,7 @@ pub async fn start_executor_process( opt.override_runtime_producer.clone().unwrap_or_else(|| { Arc::new(move |_| { let config = RuntimeConfig::new().with_temp_file_path(wd.clone()); - Ok(Arc::new(RuntimeEnv::new(config)?)) + Ok(Arc::new(RuntimeEnv::try_new(config)?)) }) }); diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs index 2c2906f07..57082fc2c 100644 --- a/ballista/executor/src/standalone.rs +++ b/ballista/executor/src/standalone.rs @@ -185,7 +185,7 @@ pub async fn new_standalone_executor< let wd = work_dir.clone(); let runtime_producer: RuntimeProducer = Arc::new(move |_: &SessionConfig| { let config = RuntimeConfig::new().with_temp_file_path(wd.clone()); - Ok(Arc::new(RuntimeEnv::new(config)?)) + Ok(Arc::new(RuntimeEnv::try_new(config)?)) }); let executor = Arc::new(Executor::new_basic( diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs index 629cc285b..8e4565a45 100644 --- a/ballista/scheduler/src/test_utils.rs +++ b/ballista/scheduler/src/test_utils.rs @@ -71,6 +71,7 @@ const TEST_SCHEDULER_NAME: &str = "localhost:50050"; /// Sometimes we need to construct logical plans that will produce errors /// when we try and create physical plan. A scan using `ExplodingTableProvider` /// will do the trick +#[derive(Debug)] pub struct ExplodingTableProvider; #[async_trait] @@ -135,7 +136,7 @@ pub async fn datafusion_test_context(path: &str) -> Result { let default_shuffle_partitions = 2; let config = SessionConfig::new().with_target_partitions(default_shuffle_partitions); let ctx = SessionContext::new_with_config(config); - for table in TPCH_TABLES { + for &table in TPCH_TABLES { let schema = get_tpch_schema(table); let options = CsvReadOptions::new() .schema(&schema) @@ -143,7 +144,7 @@ pub async fn datafusion_test_context(path: &str) -> Result { .has_header(false) .file_extension(".tbl"); let dir = format!("{path}/{table}"); - ctx.register_csv(table, &dir, options).await?; + ctx.register_csv(table, dir, options).await?; } Ok(ctx) } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 77c48bb1f..72cc848df 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -562,7 +562,7 @@ async fn register_tables( ctx: &SessionContext, debug: bool, ) -> Result<()> { - for table in TABLES { + for &table in TABLES { match file_format { // dbgen creates .tbl ('|' delimited) files without header "tbl" => { diff --git a/docs/source/user-guide/extending-components.md b/docs/source/user-guide/extending-components.md index 556c0a36b..60de1b7b1 100644 --- a/docs/source/user-guide/extending-components.md +++ b/docs/source/user-guide/extending-components.md @@ -74,7 +74,7 @@ pub fn custom_runtime_env_with_s3_support( CustomObjectStoreRegistry::new(s3options.clone()), )); - Ok(Arc::new(RuntimeEnv::new(config)?)) + Ok(Arc::new(RuntimeEnv::try_new(config)?)) } ``` diff --git a/examples/src/object_store.rs b/examples/src/object_store.rs index 3cd22fa6a..5b5e38a6a 100644 --- a/examples/src/object_store.rs +++ b/examples/src/object_store.rs @@ -79,7 +79,7 @@ pub fn custom_runtime_env_with_s3_support( CustomObjectStoreRegistry::new(s3options.clone()), )); - Ok(Arc::new(RuntimeEnv::new(config)?)) + Ok(Arc::new(RuntimeEnv::try_new(config)?)) } /// Custom [SessionState] constructor method diff --git a/examples/tests/object_store.rs b/examples/tests/object_store.rs index ca47c5cb9..3c3443e82 100644 --- a/examples/tests/object_store.rs +++ b/examples/tests/object_store.rs @@ -60,7 +60,7 @@ mod standalone { let test_data = examples_test_data(); let config = RuntimeConfig::new(); - let runtime_env = RuntimeEnv::new(config)?; + let runtime_env = RuntimeEnv::try_new(config)?; runtime_env.register_object_store( &format!("s3://{}", crate::common::BUCKET) @@ -147,7 +147,7 @@ mod remote { .map_err(|e| DataFusionError::External(e.into()))?; let config = RuntimeConfig::new(); - let runtime_env = RuntimeEnv::new(config)?; + let runtime_env = RuntimeEnv::try_new(config)?; runtime_env.register_object_store( &format!("s3://{}", crate::common::BUCKET) @@ -219,6 +219,7 @@ mod custom_s3_config { use ballista_core::RuntimeProducer; use ballista_examples::object_store::{CustomObjectStoreRegistry, S3Options}; use ballista_examples::test_util::examples_test_data; + use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::SessionState; use datafusion::prelude::SessionConfig; use datafusion::{assert_batches_eq, prelude::SessionContext}; @@ -288,7 +289,7 @@ mod custom_s3_config { CustomObjectStoreRegistry::new(s3options.clone()), )); - Ok(Arc::new(RuntimeEnv::new(config)?)) + Ok(Arc::new(RuntimeEnv::try_new(config)?)) }); // Session builder creates SessionState @@ -494,7 +495,8 @@ mod custom_s3_config { let config = RuntimeConfig::new().with_object_store_registry(Arc::new( CustomObjectStoreRegistry::new(s3options.clone()), )); - let runtime_env = RuntimeEnv::new(config)?; + + let runtime_env = RuntimeEnv::try_new(config)?; Ok(SessionStateBuilder::new() .with_runtime_env(runtime_env.into())