Skip to content

Commit

Permalink
feat: add hoodie.read.listing.parallelism config (#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan authored Jan 5, 2025
1 parent 11c2430 commit 64b1dc1
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 31 deletions.
27 changes: 17 additions & 10 deletions crates/core/src/config/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,24 @@ use crate::config::{ConfigParser, HudiConfigValue};
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiReadConfig {
/// Define input splits
/// - Hoodie Key : hoodie.read.input.partitions
/// The query instant for time travel. Without specified this option, we query the latest snapshot.
AsOfTimestamp,

/// Number of input partitions to read the data in parallel.
///
/// If has 100 files, [InputPartitions] is 5, will product 5 chunk,
/// every iter or task process 20 files
/// For processing 100 files, [InputPartitions] being 5 will produce 5 partitions, with each partition having 20 files.
InputPartitions,

/// The query instant for time travel. Without specified this option, we query the latest snapshot.
/// - Hoodie Key : hoodie.read.as.of.timestamp
AsOfTimestamp,
/// Parallelism for listing files on storage.
ListingParallelism,
}

impl AsRef<str> for HudiReadConfig {
fn as_ref(&self) -> &str {
match self {
Self::InputPartitions => "hoodie.read.input.partitions",
Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
Self::InputPartitions => "hoodie.read.input.partitions",
Self::ListingParallelism => "hoodie.read.listing.parallelism",
}
}
}
Expand All @@ -69,6 +70,7 @@ impl ConfigParser for HudiReadConfig {
fn default_value(&self) -> Option<HudiConfigValue> {
match self {
HudiReadConfig::InputPartitions => Some(HudiConfigValue::UInteger(0usize)),
HudiReadConfig::ListingParallelism => Some(HudiConfigValue::UInteger(10usize)),
_ => None,
}
}
Expand All @@ -80,12 +82,17 @@ impl ConfigParser for HudiReadConfig {
.ok_or(NotFound(self.key()));

match self {
Self::AsOfTimestamp => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::InputPartitions => get_result
.and_then(|v| {
usize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::UInteger),
Self::AsOfTimestamp => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::ListingParallelism => get_result
.and_then(|v| {
usize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::UInteger),
}
}
}
Expand All @@ -99,7 +106,7 @@ mod tests {
fn parse_valid_config_value() {
let options = HashMap::from([(InputPartitions.as_ref().to_string(), "100".to_string())]);
let value = InputPartitions.parse_value(&options).unwrap().to::<usize>();
assert_eq!(value, 100usize);
assert_eq!(value, 100);
}

#[test]
Expand Down
49 changes: 28 additions & 21 deletions crates/core/src/table/fs_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};

use crate::config::read::HudiReadConfig::ListingParallelism;
use crate::error::CoreError;
use crate::table::partition::PartitionPruner;
use crate::Result;
Expand Down Expand Up @@ -55,11 +56,11 @@ impl FileSystemView {
})
}

async fn load_all_partition_paths(storage: &Storage) -> Result<Vec<String>> {
Self::load_partition_paths(storage, &PartitionPruner::empty()).await
async fn list_all_partition_paths(storage: &Storage) -> Result<Vec<String>> {
Self::list_partition_paths(storage, &PartitionPruner::empty()).await
}

async fn load_partition_paths(
async fn list_partition_paths(
storage: &Storage,
partition_pruner: &PartitionPruner,
) -> Result<Vec<String>> {
Expand All @@ -86,7 +87,7 @@ impl FileSystemView {
.collect())
}

async fn load_file_groups_for_partition(
async fn list_file_groups_for_partition(
storage: &Storage,
partition_path: &str,
) -> Result<Vec<FileGroup>> {
Expand Down Expand Up @@ -118,35 +119,30 @@ impl FileSystemView {
Ok(file_groups)
}

pub async fn get_file_slices_as_of(
&self,
timestamp: &str,
partition_pruner: &PartitionPruner,
excluding_file_groups: &HashSet<FileGroup>,
) -> Result<Vec<FileSlice>> {
let all_partition_paths = Self::load_all_partition_paths(&self.storage).await?;
async fn load_file_groups(&self, partition_pruner: &PartitionPruner) -> Result<()> {
let all_partition_paths = Self::list_all_partition_paths(&self.storage).await?;

let partition_paths_to_load = all_partition_paths
let partition_paths_to_list = all_partition_paths
.into_iter()
.filter(|p| !self.partition_to_file_groups.contains_key(p))
.filter(|p| partition_pruner.should_include(p))
.collect::<HashSet<_>>();

stream::iter(partition_paths_to_load)
let parallelism = self
.hudi_configs
.get_or_default(ListingParallelism)
.to::<usize>();
stream::iter(partition_paths_to_list)
.map(|path| async move {
let file_groups =
Self::load_file_groups_for_partition(&self.storage, &path).await?;
Self::list_file_groups_for_partition(&self.storage, &path).await?;
Ok::<_, CoreError>((path, file_groups))
})
// TODO parameterize the parallelism for partition loading
.buffer_unordered(10)
.buffer_unordered(parallelism)
.try_for_each(|(path, file_groups)| async move {
self.partition_to_file_groups.insert(path, file_groups);
Ok(())
})
.await?;

self.collect_file_slices_as_of(timestamp, partition_pruner, excluding_file_groups)
.await
}

Expand Down Expand Up @@ -174,6 +170,17 @@ impl FileSystemView {
}
Ok(file_slices)
}

pub async fn get_file_slices_as_of(
&self,
timestamp: &str,
partition_pruner: &PartitionPruner,
excluding_file_groups: &HashSet<FileGroup>,
) -> Result<Vec<FileSlice>> {
self.load_file_groups(partition_pruner).await?;
self.collect_file_slices_as_of(timestamp, partition_pruner, excluding_file_groups)
.await
}
}

#[cfg(test)]
Expand Down Expand Up @@ -205,7 +212,7 @@ mod tests {
let base_url = TestTable::V6Nonpartitioned.url();
let storage = Storage::new_with_base_url(base_url).unwrap();
let partition_pruner = PartitionPruner::empty();
let partition_paths = FileSystemView::load_partition_paths(&storage, &partition_pruner)
let partition_paths = FileSystemView::list_partition_paths(&storage, &partition_pruner)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
Expand All @@ -218,7 +225,7 @@ mod tests {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let storage = Storage::new_with_base_url(base_url).unwrap();
let partition_pruner = PartitionPruner::empty();
let partition_paths = FileSystemView::load_partition_paths(&storage, &partition_pruner)
let partition_paths = FileSystemView::list_partition_paths(&storage, &partition_pruner)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
Expand Down

0 comments on commit 64b1dc1

Please sign in to comment.