Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Prepare for DataFusion 45 (bump to DataFusion rev 5592834 + Arrow 54.0.0) #1332

Open
wants to merge 21 commits into
base: main
Choose a base branch
from

Conversation

andygrove
Copy link
Member

@andygrove andygrove commented Jan 23, 2025

Which issue does this PR close?

Part of #1304

Rationale for this change

Use latest DF in preparation for upgrading to DF 45.

What changes are included in this PR?

  • Bump DF version
  • Copy over latest FilterExec and re-apply Comet-specific changes (we could stop doing this if we just unpack all dictionaries in the scan)
  • Remove uses of Field::new_dict

How are these changes tested?

@andygrove andygrove marked this pull request as draft January 23, 2025 17:17
@@ -304,11 +304,7 @@ fn scan_schema(input_batch: &InputBatch, data_types: &[DataType]) -> SchemaRef {
.map(|(idx, c)| {
let datatype = ScanExec::unpack_dictionary_type(c.data_type());
// We don't use the field name. Put a placeholder.
if matches!(datatype, DataType::Dictionary(_, _)) {
Field::new_dict(format!("col_{}", idx), datatype, true, idx as i64, false)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is no longer possible to re-use dictionary id across fields. I am unsure of the impact here. Perhaps @viirya will know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you mean to re-use dictionary id across fields? Dictionary id should be unique per field.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, corrected it. If two fields have same dictionary, they may use same dictionary id.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw you resolved this. Is it not an issue now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to do some micro benchmarks on this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no performance impact from this change. The original code stored a dictionary id in the metadata and the new code does not. This dictionary id is actually not used at all in FFI. It was used in Arrow IPC but is no longer used as of Arrow 54.0.0 because that feature is now removed and Arrow IPC manages its own dictionary ids. We do not use Arrow IPC now because we are using our own proprietary encoding.

I will go ahead and run another TPC-H benchmark and post results here today, just to confirm there are no regressions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just making sure, will this work even the enableFastEncoding option is disabled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fresh benchmark results:

Using fast encoding = 332 seconds (our published time is 331, and I do see small variance on each run)
Using Arrow IPC = 334 seconds

@andygrove
Copy link
Member Author

Tests are failing:

 Cause: org.apache.comet.CometNativeException: slice index starts at 18446744072774451440 but ends at 32720
[info]         at comet::errors::init::{{closure}}(__internal__:0)
[info]         at std::panicking::rust_panic_with_hook(__internal__:0)
[info]         at std::panicking::begin_panic_handler::{{closure}}(__internal__:0)
[info]         at std::sys::backtrace::__rust_end_short_backtrace(__internal__:0)
[info]         at rust_begin_unwind(__internal__:0)
[info]         at core::panicking::panic_fmt(__internal__:0)
[info]         at core::slice::index::slice_index_order_fail(__internal__:0)
[info]         at arrow_data::transform::variable_size::build_extend::{{closure}}(__internal__:0)
[info]         at arrow_data::transform::MutableArrayData::extend(__internal__:0)
[info]         at arrow_select::concat::concat_fallback(__internal__:0)
[info]         at arrow_select::concat::concat(__internal__:0)
[info]         at arrow_select::concat::concat_batches(__internal__:0)
[info]         at datafusion_physical_plan::sorts::sort::ExternalSorter::in_mem_sort_stream(__internal__:0)
[info]         at <datafusion_physical_plan::stream::RecordBatchStreamAdapter<S> as futures_core::stream::Stream>::poll_next(__internal__:0)
[info]         at <datafusion_physical_plan::joins::sort_merge_join::SortMergeJoinStream as futures_core::stream::Stream>::poll_next(__internal__:0)

@andygrove
Copy link
Member Author

Another failure:

org.apache.comet.CometNativeException: Cast error: Failed to convert 1140852704 to temporal for Date32

@andygrove andygrove changed the title chore: Bump DataFusion to rev 5592834 chore: Prepare for DataFusion 45 Jan 24, 2025
@codecov-commenter
Copy link

codecov-commenter commented Jan 24, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 39.09%. Comparing base (f09f8af) to head (6f8d2fa).
Report is 10 commits behind head on main.

Additional details and impacted files
@@              Coverage Diff              @@
##               main    #1332       +/-   ##
=============================================
- Coverage     56.12%   39.09%   -17.04%     
- Complexity      976     2065     +1089     
=============================================
  Files           119      260      +141     
  Lines         11743    60237    +48494     
  Branches       2251    12817    +10566     
=============================================
+ Hits           6591    23548    +16957     
- Misses         4012    32205    +28193     
- Partials       1140     4484     +3344     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Comment on lines +991 to +993
DataType::Null => {
matches!(to_type, DataType::List(_))
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was needed to fix test failures in CometArrayExpressionSuite.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be that rolled back later?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we will likely need to add support for more casts from Null to other types in the future, especially once we add support for more complex types.

@andygrove andygrove changed the title chore: Prepare for DataFusion 45 chore: Prepare for DataFusion 45 (bump to DataFusion rev 5592834 + Arrow 54.0.0) Jan 28, 2025
@andygrove andygrove marked this pull request as ready for review January 28, 2025 17:26
@@ -33,21 +33,21 @@ edition = "2021"
rust-version = "1.79"
Copy link
Contributor

@comphead comphead Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this probably can be updated to 1.80 like in datafusion or 1.81, the PR is already created

@@ -62,6 +65,8 @@ pub struct FilterExec {
default_selectivity: u8,
/// Properties equivalence properties, partitioning, etc.
cache: PlanProperties,
/// The projection indices of the columns in the output schema of join
projection: Option<Vec<usize>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this projection a part of migration? if so the migration is quite complicated....

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently maintain a copy of DataFusion's FilterExec with one small change, so I copied over that latest to keep in sync and then re-applied the change that we need (for memory safety because of the way we re-use buffers).

@comphead
Copy link
Contributor

I think the PR is good in general but what concerns me is really lots of code added just to do the migration. I'm wondering was there breaking changes in DF or Arrow, as looks like we agreed to avoid breaking public API changes in DF

@andygrove
Copy link
Member Author

I think the PR is good in general but what concerns me is really lots of code added just to do the migration. I'm wondering was there breaking changes in DF or Arrow, as looks like we agreed to avoid breaking public API changes in DF

The biggest issue was apache/datafusion#14277

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm thanks @andygrove there is still some download issue for one of the test suites

Comment on lines +456 to +461
let projected_columns = projection
.iter()
.map(|i| Arc::clone(batch.column(*i)))
.collect();
let projected_batch =
RecordBatch::try_new(Arc::clone(output_schema), projected_columns)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally projection should come after predicate, no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, you already got predicate filter result filter_array. Then it doesn't matter.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. But I have a question for reusing dictionary id.

@andygrove
Copy link
Member Author

Looks good to me. But I have a question for reusing dictionary id.

For more context on this, see the discussion in apache/arrow-rs#5981

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants