Skip to content

Commit

Permalink
Deduplicate function get_final_indices_from_shared_bitmap (#14145)
Browse files Browse the repository at this point in the history
* Deduplicate function get_final_indices_from_shared_bitmap

* update
  • Loading branch information
lewiszlw authored Jan 16, 2025
1 parent 7f70b6c commit dc22b3f
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 26 deletions.
21 changes: 8 additions & 13 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use std::sync::Arc;
use std::task::Poll;
use std::{any::Any, vec};

use super::utils::{asymmetric_join_output_partitioning, reorder_output_after_swap};
use super::utils::{
asymmetric_join_output_partitioning, get_final_indices_from_shared_bitmap,
reorder_output_after_swap,
};
use super::{
utils::{OnceAsync, OnceFut},
PartitionMode, SharedBitmapBuilder,
Expand All @@ -39,10 +42,10 @@ use crate::{
joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices,
build_batch_from_indices, build_join_schema, check_join_is_valid,
estimate_join_statistics, get_final_indices_from_bit_map,
need_produce_result_in_final, symmetric_join_output_partitioning,
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMap, JoinHashMapOffset,
JoinHashMapType, JoinOn, JoinOnRef, StatefulStreamResult,
estimate_join_statistics, need_produce_result_in_final,
symmetric_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
JoinFilter, JoinHashMap, JoinHashMapOffset, JoinHashMapType, JoinOn, JoinOnRef,
StatefulStreamResult,
},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
Expand Down Expand Up @@ -1349,14 +1352,6 @@ pub fn equal_rows_arr(
))
}

fn get_final_indices_from_shared_bitmap(
shared_bitmap: &SharedBitmapBuilder,
join_type: JoinType,
) -> (UInt64Array, UInt32Array) {
let bitmap = shared_bitmap.lock();
get_final_indices_from_bit_map(&bitmap, join_type)
}

impl HashJoinStream {
/// Separate implementation function that unpins the [`HashJoinStream`] so
/// that partial borrows work correctly
Expand Down
17 changes: 4 additions & 13 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@ use std::sync::Arc;
use std::task::Poll;

use super::utils::{
asymmetric_join_output_partitioning, need_produce_result_in_final,
reorder_output_after_swap, BatchSplitter, BatchTransformer, NoopBatchTransformer,
StatefulStreamResult,
asymmetric_join_output_partitioning, get_final_indices_from_shared_bitmap,
need_produce_result_in_final, reorder_output_after_swap, BatchSplitter,
BatchTransformer, NoopBatchTransformer, StatefulStreamResult,
};
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::execution_plan::{boundedness_from_children, EmissionType};
use crate::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
build_join_schema, check_join_is_valid, estimate_join_statistics,
get_final_indices_from_bit_map, BuildProbeJoinMetrics, ColumnIndex, JoinFilter,
OnceAsync, OnceFut,
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut,
};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::{
Expand Down Expand Up @@ -878,14 +877,6 @@ fn join_left_and_right_batch(
)
}

fn get_final_indices_from_shared_bitmap(
shared_bitmap: &SharedBitmapBuilder,
join_type: JoinType,
) -> (UInt64Array, UInt32Array) {
let bitmap = shared_bitmap.lock();
get_final_indices_from_bit_map(&bitmap, join_type)
}

impl<T: BatchTransformer + Unpin + Send> Stream for NestedLoopJoinStream<T> {
type Item = Result<RecordBatch>;

Expand Down
9 changes: 9 additions & 0 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use datafusion_physical_expr::{
LexOrdering, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
};

use crate::joins::SharedBitmapBuilder;
use crate::projection::ProjectionExec;
use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
Expand Down Expand Up @@ -1112,6 +1113,14 @@ pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool {
)
}

pub(crate) fn get_final_indices_from_shared_bitmap(
shared_bitmap: &SharedBitmapBuilder,
join_type: JoinType,
) -> (UInt64Array, UInt32Array) {
let bitmap = shared_bitmap.lock();
get_final_indices_from_bit_map(&bitmap, join_type)
}

/// In the end of join execution, need to use bit map of the matched
/// indices to generate the final left and right indices.
///
Expand Down

0 comments on commit dc22b3f

Please sign in to comment.