Skip to content

Commit

Permalink
supports falling back hash join to sort merge join when hash table is…
Browse files Browse the repository at this point in the history
… too big.
  • Loading branch information
zhangli20 committed Jan 9, 2025
1 parent 60af35e commit ca69b3b
Show file tree
Hide file tree
Showing 27 changed files with 894 additions and 447 deletions.
1 change: 0 additions & 1 deletion .github/workflows/tpcds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: TPC-DS

on:
workflow_dispatch:
push:
pull_request_target:
types:
- opened
Expand Down
32 changes: 30 additions & 2 deletions native-engine/blaze-jni-bridge/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use datafusion::common::Result;
use datafusion::common::{DataFusionError, Result};

use crate::{jni_call_static, jni_get_string, jni_new_string};
use crate::{is_jni_bridge_inited, jni_call_static, jni_get_string, jni_new_string};

macro_rules! define_conf {
($conftype:ty, $name:ident) => {
Expand Down Expand Up @@ -42,10 +42,18 @@ define_conf!(BooleanConf, PARQUET_ENABLE_BLOOM_FILTER);
define_conf!(StringConf, SPARK_IO_COMPRESSION_CODEC);
define_conf!(IntConf, SPARK_TASK_CPUS);
define_conf!(StringConf, SPILL_COMPRESSION_CODEC);
define_conf!(BooleanConf, SMJ_FALLBACK_ENABLE);
define_conf!(IntConf, SMJ_FALLBACK_ROWS_THRESHOLD);
define_conf!(IntConf, SMJ_FALLBACK_MEM_SIZE_THRESHOLD);

pub trait BooleanConf {
fn key(&self) -> &'static str;
fn value(&self) -> Result<bool> {
if !is_jni_bridge_inited() {
return Err(DataFusionError::Execution(format!(
"JNIEnv not initialized"
)));
}
let key = jni_new_string!(self.key())?;
jni_call_static!(BlazeConf.booleanConf(key.as_obj()) -> bool)
}
Expand All @@ -54,6 +62,11 @@ pub trait BooleanConf {
pub trait IntConf {
fn key(&self) -> &'static str;
fn value(&self) -> Result<i32> {
if !is_jni_bridge_inited() {
return Err(DataFusionError::Execution(format!(
"JNIEnv not initialized"
)));
}
let key = jni_new_string!(self.key())?;
jni_call_static!(BlazeConf.intConf(key.as_obj()) -> i32)
}
Expand All @@ -62,6 +75,11 @@ pub trait IntConf {
pub trait LongConf {
fn key(&self) -> &'static str;
fn value(&self) -> Result<i64> {
if !is_jni_bridge_inited() {
return Err(DataFusionError::Execution(format!(
"JNIEnv not initialized"
)));
}
let key = jni_new_string!(self.key())?;
jni_call_static!(BlazeConf.longConf(key.as_obj()) -> i64)
}
Expand All @@ -70,6 +88,11 @@ pub trait LongConf {
pub trait DoubleConf {
fn key(&self) -> &'static str;
fn value(&self) -> Result<f64> {
if !is_jni_bridge_inited() {
return Err(DataFusionError::Execution(format!(
"JNIEnv not initialized"
)));
}
let key = jni_new_string!(self.key())?;
jni_call_static!(BlazeConf.doubleConf(key.as_obj()) -> f64)
}
Expand All @@ -78,6 +101,11 @@ pub trait DoubleConf {
pub trait StringConf {
fn key(&self) -> &'static str;
fn value(&self) -> Result<String> {
if !is_jni_bridge_inited() {
return Err(DataFusionError::Execution(format!(
"JNIEnv not initialized"
)));
}
let key = jni_new_string!(self.key())?;
let value = jni_get_string!(
jni_call_static!(BlazeConf.stringConf(key.as_obj()) -> JObject)?
Expand Down
4 changes: 2 additions & 2 deletions native-engine/datafusion-ext-commons/src/algorithm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
// limitations under the License.

pub mod loser_tree;
pub mod rdx_tournament_tree;
pub mod rdxsort;
pub mod rdx_queue;
pub mod rdx_sort;
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ use std::ops::{Deref, DerefMut};

use unchecked_index::UncheckedIndex;

pub trait KeyForRadixTournamentTree {
pub trait KeyForRadixQueue {
fn rdx(&self) -> usize;
}

/// An implementation of the radix tournament tree
/// with time complexity of sorting all values: O(n + K)
pub struct RadixTournamentTree<T> {
pub struct RadixQueue<T> {
num_keys: usize,
cur_rdx: usize,
values: UncheckedIndex<Vec<T>>,
Expand All @@ -31,7 +31,7 @@ pub struct RadixTournamentTree<T> {
}

#[allow(clippy::len_without_is_empty)]
impl<T: KeyForRadixTournamentTree> RadixTournamentTree<T> {
impl<T: KeyForRadixQueue> RadixQueue<T> {
pub fn new(values: Vec<T>, num_keys: usize) -> Self {
let num_keys = num_keys + 1; // avoid overflow
let num_values = values.len();
Expand Down Expand Up @@ -117,20 +117,20 @@ impl<T: KeyForRadixTournamentTree> RadixTournamentTree<T> {

/// A PeekMut structure to the loser tree, used to get smallest value and auto
/// adjusting after dropped.
pub struct RadixTournamentTreePeekMut<'a, T: KeyForRadixTournamentTree> {
tree: &'a mut RadixTournamentTree<T>,
pub struct RadixTournamentTreePeekMut<'a, T: KeyForRadixQueue> {
tree: &'a mut RadixQueue<T>,
dirty: bool,
}

impl<T: KeyForRadixTournamentTree> Deref for RadixTournamentTreePeekMut<'_, T> {
impl<T: KeyForRadixQueue> Deref for RadixTournamentTreePeekMut<'_, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
self.tree.peek()
}
}

impl<T: KeyForRadixTournamentTree> DerefMut for RadixTournamentTreePeekMut<'_, T> {
impl<T: KeyForRadixQueue> DerefMut for RadixTournamentTreePeekMut<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.dirty = true;
&mut self.tree.values[self
Expand All @@ -142,7 +142,7 @@ impl<T: KeyForRadixTournamentTree> DerefMut for RadixTournamentTreePeekMut<'_, T
}
}

impl<T: KeyForRadixTournamentTree> Drop for RadixTournamentTreePeekMut<'_, T> {
impl<T: KeyForRadixQueue> Drop for RadixTournamentTreePeekMut<'_, T> {
fn drop(&mut self) {
if self.dirty {
self.tree.adjust_tree();
Expand All @@ -155,7 +155,7 @@ mod test {
use itertools::Itertools;
use rand::Rng;

use crate::algorithm::rdx_tournament_tree::{KeyForRadixTournamentTree, RadixTournamentTree};
use crate::algorithm::rdx_queue::{KeyForRadixQueue, RadixQueue};

#[test]
fn fuzztest() {
Expand Down Expand Up @@ -184,12 +184,12 @@ mod test {
row_idx: usize,
values: Vec<u64>,
}
impl KeyForRadixTournamentTree for Cursor {
impl KeyForRadixQueue for Cursor {
fn rdx(&self) -> usize {
self.values.get(self.row_idx).cloned().unwrap_or(u64::MAX) as usize
}
}
let mut loser_tree = RadixTournamentTree::new(
let mut loser_tree = RadixQueue::new(
nodes
.into_iter()
.map(|node| Cursor {
Expand Down
18 changes: 3 additions & 15 deletions native-engine/datafusion-ext-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
#![feature(slice_swap_unchecked)]
#![feature(vec_into_raw_parts)]

use blaze_jni_bridge::{
conf::{IntConf, BATCH_SIZE},
is_jni_bridge_inited,
};
use blaze_jni_bridge::conf::{IntConf, BATCH_SIZE};
use once_cell::sync::OnceCell;
use unchecked_index::UncheckedIndex;

Expand Down Expand Up @@ -71,17 +68,8 @@ macro_rules! downcast_any {
}

pub fn batch_size() -> usize {
const CACHED_BATCH_SIZE: OnceCell<i32> = OnceCell::new();
let batch_size = *CACHED_BATCH_SIZE
.get_or_try_init(|| {
if is_jni_bridge_inited() {
BATCH_SIZE.value()
} else {
Ok(10000) // for testing
}
})
.expect("error getting configured batch size") as usize;
batch_size
const CACHED_BATCH_SIZE: OnceCell<usize> = OnceCell::new();
*CACHED_BATCH_SIZE.get_or_init(|| BATCH_SIZE.value().unwrap_or(10000) as usize)
}

// bigger for better radix sort performance
Expand Down
13 changes: 4 additions & 9 deletions native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use arrow::{
use blaze_jni_bridge::{
conf,
conf::{DoubleConf, IntConf},
is_jni_bridge_inited,
};
use datafusion::{
common::{cast::as_binary_array, Result},
Expand Down Expand Up @@ -164,14 +163,10 @@ impl AggContext {
)?;

let (partial_skipping_ratio, partial_skipping_min_rows) = if supports_partial_skipping {
if is_jni_bridge_inited() {
(
conf::PARTIAL_AGG_SKIPPING_RATIO.value()?,
conf::PARTIAL_AGG_SKIPPING_MIN_ROWS.value()? as usize,
)
} else {
(0.999, 20000) // only for testing
}
(
conf::PARTIAL_AGG_SKIPPING_RATIO.value().unwrap_or(0.999),
conf::PARTIAL_AGG_SKIPPING_MIN_ROWS.value().unwrap_or(20000) as usize,
)
} else {
Default::default()
};
Expand Down
10 changes: 5 additions & 5 deletions native-engine/datafusion-ext-plans/src/agg/agg_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use datafusion::{
};
use datafusion_ext_commons::{
algorithm::{
rdx_tournament_tree::{KeyForRadixTournamentTree, RadixTournamentTree},
rdxsort::radix_sort_by_key,
rdx_queue::{KeyForRadixQueue, RadixQueue},
rdx_sort::radix_sort_by_key,
},
batch_size, df_execution_err, downcast_any,
io::{read_bytes_slice, read_len, write_len},
Expand Down Expand Up @@ -215,8 +215,8 @@ impl AggTable {

// create a radix tournament tree to do the merging
// the mem-table and at least one spill should be in the tree
let mut cursors: RadixTournamentTree<RecordsSpillCursor> =
RadixTournamentTree::new(cursors, NUM_SPILL_BUCKETS);
let mut cursors: RadixQueue<RecordsSpillCursor> =
RadixQueue::new(cursors, NUM_SPILL_BUCKETS);
assert!(cursors.len() > 0);

let mut map = AggHashMap::default();
Expand Down Expand Up @@ -698,7 +698,7 @@ impl<'a> RecordsSpillCursor<'a> {
}
}

impl<'a> KeyForRadixTournamentTree for RecordsSpillCursor<'a> {
impl<'a> KeyForRadixQueue for RecordsSpillCursor<'a> {
fn rdx(&self) -> usize {
self.cur_bucket_idx
}
Expand Down
Loading

0 comments on commit ca69b3b

Please sign in to comment.