From 1779e772b748a4c86bd6af13c0899c5168958f2f Mon Sep 17 00:00:00 2001
From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com>
Date: Wed, 8 Jan 2025 15:33:54 -0600
Subject: [PATCH] refactor: improve `BaseFile` APIs (#239)

- Consolidate `FileInfo` and `FileStats` into `FileMetadata`
- Improve variable and struct property naming
- Make APIs more ergonomic
---
 crates/core/src/file_group/base_file.rs | 134 +++++++++++++
 crates/core/src/file_group/mod.rs       | 185 +++++-------------
 crates/core/src/file_group/reader.rs    |   4 +-
 .../{file_info.rs => file_metadata.rs}  |  28 ++-
 crates/core/src/storage/file_stats.rs   |  25 ---
 crates/core/src/storage/mod.rs          | 127 +++++-------
 crates/core/src/table/fs_view.rs        |  19 +-
 crates/core/src/table/mod.rs            |  18 +-
 crates/datafusion/src/lib.rs            |  28 ++-
 python/hudi/_internal.pyi               |  14 +-
 python/src/internal.rs                  |  24 +--
 python/tests/test_table_read.py         |   4 +-
 12 files changed, 325 insertions(+), 285 deletions(-)
 create mode 100644 crates/core/src/file_group/base_file.rs
 rename crates/core/src/storage/{file_info.rs => file_metadata.rs} (62%)
 delete mode 100644 crates/core/src/storage/file_stats.rs

diff --git a/crates/core/src/file_group/base_file.rs b/crates/core/src/file_group/base_file.rs
new file mode 100644
index 00000000..6f26c85d
--- /dev/null
+++ b/crates/core/src/file_group/base_file.rs
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::storage::file_metadata::FileMetadata;
+use crate::Result;
+
+/// Hudi Base file, part of a [FileSlice].
+#[derive(Clone, Debug)]
+pub struct BaseFile {
+    /// The file name of the base file.
+    pub file_name: String,
+
+    /// The id of the enclosing file group.
+    pub file_group_id: String,
+
+    /// The associated instant time of the base file.
+    pub instant_time: String,
+
+    /// The metadata about the file.
+    pub file_metadata: Option<FileMetadata>,
+}
+
+impl BaseFile {
+    /// Parse file name and extract `file_group_id` and `instant_time`.
+    fn parse_file_name(file_name: &str) -> Result<(String, String)> {
+        let err_msg = format!("Failed to parse file name '{file_name}' for base file.");
+        let (name, _) = file_name
+            .rsplit_once('.')
+            .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
+        let parts: Vec<&str> = name.split('_').collect();
+        let file_group_id = parts
+            .first()
+            .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
+            .to_string();
+        let instant_time = parts
+            .get(2)
+            .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
+            .to_string();
+        Ok((file_group_id, instant_time))
+    }
+}
+
+impl TryFrom<&str> for BaseFile {
+    type Error = CoreError;
+
+    fn try_from(file_name: &str) -> Result<Self> {
+        let (file_group_id, instant_time) = Self::parse_file_name(file_name)?;
+        Ok(Self {
+            file_name: file_name.to_string(),
+            file_group_id,
+            instant_time,
+            file_metadata: None,
+        })
+    }
+}
+
+impl TryFrom<FileMetadata> for BaseFile {
+    type Error = CoreError;
+
+    fn try_from(metadata: FileMetadata) -> Result<Self> {
+        let file_name = metadata.name.clone();
+        let (file_group_id, instant_time) = Self::parse_file_name(&file_name)?;
+        Ok(Self {
+            file_name,
+            file_group_id,
+            instant_time,
+            file_metadata: Some(metadata),
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use hudi_tests::assert_not;
+
+    #[test]
+    fn test_create_base_file_from_file_name() {
+        let file_name = "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
+        let base_file = BaseFile::try_from(file_name).unwrap();
+        assert_eq!(
+            base_file.file_group_id,
+            "5a226868-2934-4f84-a16f-55124630c68d-0"
+        );
+        assert_eq!(base_file.instant_time, "20240402144910683");
+        assert!(base_file.file_metadata.is_none());
+    }
+
+    #[test]
+    fn test_create_base_file_from_metadata() {
+        let metadata = FileMetadata::new(
+            "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
+            1024,
+        );
+        let base_file = BaseFile::try_from(metadata).unwrap();
+        assert_eq!(
+            base_file.file_group_id,
+            "5a226868-2934-4f84-a16f-55124630c68d-0"
+        );
+        assert_eq!(base_file.instant_time, "20240402144910683");
+        let file_metadata = base_file.file_metadata.unwrap();
+        assert_eq!(file_metadata.size, 1024);
+        assert_not!(file_metadata.fully_populated);
+    }
+
+    #[test]
+    fn create_a_base_file_returns_error() {
+        let result = BaseFile::try_from("no_file_extension");
+        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+
+        let result = BaseFile::try_from(".parquet");
+        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+
+        let metadata = FileMetadata::new("no-valid-delimiter.parquet", 1024);
+        let result = BaseFile::try_from(metadata);
+        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+    }
+}
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index f93caefa..8c92f4ea 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -20,12 +20,12 @@
 //!
 //! A set of data/base files + set of log files, that make up a unit for all operations.
 
+pub mod base_file;
 pub mod builder;
 pub mod reader;
 
 use crate::error::CoreError;
-use crate::storage::file_info::FileInfo;
-use crate::storage::file_stats::FileStats;
+use crate::file_group::base_file::BaseFile;
 use crate::storage::Storage;
 use crate::Result;
 use std::collections::BTreeMap;
@@ -34,70 +34,7 @@ use std::fmt::Formatter;
 use std::hash::{Hash, Hasher};
 use std::path::PathBuf;
 
-/// Represents common metadata about a Hudi Base File.
-#[derive(Clone, Debug)]
-pub struct BaseFile {
-    /// The file group id that is unique across the table.
-    pub file_group_id: String,
-
-    pub commit_time: String,
-
-    pub info: FileInfo,
-
-    pub stats: Option<FileStats>,
-}
-
-impl BaseFile {
-    /// Parse file name and extract file_group_id and commit_time.
-    fn parse_file_name(file_name: &str) -> Result<(String, String)> {
-        let err_msg = format!("Failed to parse file name '{}' for base file.", file_name);
-        let (name, _) = file_name
-            .rsplit_once('.')
-            .ok_or(CoreError::FileGroup(err_msg.clone()))?;
-        let parts: Vec<&str> = name.split('_').collect();
-        let file_group_id = parts
-            .first()
-            .ok_or(CoreError::FileGroup(err_msg.clone()))?
-            .to_string();
-        let commit_time = parts
-            .get(2)
-            .ok_or(CoreError::FileGroup(err_msg.clone()))?
-            .to_string();
-        Ok((file_group_id, commit_time))
-    }
-
-    /// Construct [BaseFile] with the base file name.
-    ///
-    /// TODO: refactor such that file info size is optional and no one expects it.
-    pub fn from_file_name(file_name: &str) -> Result<Self> {
-        let (file_group_id, commit_time) = Self::parse_file_name(file_name)?;
-        let info = FileInfo {
-            name: file_name.to_string(),
-            size: 0,
-            uri: "".to_string(),
-        };
-        Ok(Self {
-            file_group_id,
-            commit_time,
-            info,
-            stats: None,
-        })
-    }
-
-    /// Construct [BaseFile] with the [FileInfo].
-    pub fn from_file_info(info: FileInfo) -> Result<Self> {
-        let (file_group_id, commit_time) = Self::parse_file_name(&info.name)?;
-        Ok(Self {
-            file_group_id,
-            commit_time,
-            info,
-            stats: None,
-        })
-    }
-}
-
-/// Within a file group, a slice is a combination of data file written at a commit time and list of log files,
-/// containing changes to the data file from that commit time.
+/// Within a [FileGroup], a [FileSlice] is a logical group of [BaseFile] and log files.
 ///
 /// [note] The log files are not yet supported.
 #[derive(Clone, Debug)]
@@ -107,47 +44,56 @@ pub struct FileSlice {
 }
 
 impl FileSlice {
-    #[cfg(test)]
-    pub fn base_file_path(&self) -> &str {
-        self.base_file.info.uri.as_str()
+    pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
+        Self {
+            base_file,
+            partition_path,
+        }
     }
 
-    pub fn base_file_relative_path(&self) -> String {
-        let ptn = self.partition_path.as_deref().unwrap_or_default();
-        let file_name = &self.base_file.info.name;
-        PathBuf::from(ptn)
-            .join(file_name)
-            .to_str()
-            .unwrap()
-            .to_string()
+    /// Returns the relative path of the base file.
+    pub fn base_file_relative_path(&self) -> Result<String> {
+        let file_name = &self.base_file.file_name;
+        let path = PathBuf::from(self.partition_path()).join(file_name);
+        path.to_str().map(|s| s.to_string()).ok_or_else(|| {
+            CoreError::FileGroup(format!(
+                "Failed to get base file relative path for file slice: {:?}",
+                self
+            ))
+        })
     }
 
+    /// Returns the enclosing [FileGroup]'s id.
+    #[inline]
     pub fn file_group_id(&self) -> &str {
         &self.base_file.file_group_id
     }
 
-    pub fn set_base_file(&mut self, base_file: BaseFile) {
-        self.base_file = base_file
+    /// Returns the partition path of the [FileSlice].
+    #[inline]
+    pub fn partition_path(&self) -> &str {
+        self.partition_path.as_deref().unwrap_or_default()
     }
 
-    /// Load stats from storage layer for the base file if not already loaded.
-    pub async fn load_stats(&mut self, storage: &Storage) -> Result<()> {
-        if self.base_file.stats.is_none() {
-            let parquet_meta = storage
-                .get_parquet_file_metadata(&self.base_file_relative_path())
-                .await?;
-            let num_records = parquet_meta.file_metadata().num_rows();
-            let size_bytes = parquet_meta
-                .row_groups()
-                .iter()
-                .map(|rg| rg.total_byte_size())
-                .sum::<i64>();
-            let stats = FileStats {
-                num_records,
-                size_bytes,
-            };
-            self.base_file.stats = Some(stats);
+    /// Returns the instant time that marks the [FileSlice] creation.
+    ///
+    /// This is also an instant time stored in the [Timeline].
+    #[inline]
+    pub fn creation_instant_time(&self) -> &str {
+        &self.base_file.instant_time
+    }
+
+    /// Load [FileMetadata] from storage layer for the [BaseFile] if `file_metadata` is [None]
+    /// or if `file_metadata` is not fully populated.
+    pub async fn load_metadata_if_needed(&mut self, storage: &Storage) -> Result<()> {
+        if let Some(metadata) = &self.base_file.file_metadata {
+            if metadata.fully_populated {
+                return Ok(());
+            }
         }
+        let relative_path = self.base_file_relative_path()?;
+        let fetched_metadata = storage.get_file_metadata(&relative_path).await?;
+        self.base_file.file_metadata = Some(fetched_metadata);
         Ok(())
     }
 }
@@ -207,25 +153,21 @@ impl FileGroup {
     }
 
     pub fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> {
-        let base_file = BaseFile::from_file_name(file_name)?;
+        let base_file = BaseFile::try_from(file_name)?;
         self.add_base_file(base_file)
     }
 
     pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
-        let commit_time = base_file.commit_time.as_str();
-        if self.file_slices.contains_key(commit_time) {
+        let instant_time = base_file.instant_time.as_str();
+        if self.file_slices.contains_key(instant_time) {
             Err(CoreError::FileGroup(format!(
-                "Commit time {0} is already present in File Group {1}",
-                commit_time.to_owned(),
-                self.id,
+                "Instant time {instant_time} is already present in File Group {}",
+                self.id
             )))
         } else {
            self.file_slices.insert(
-                commit_time.to_owned(),
-                FileSlice {
-                    partition_path: self.partition_path.clone(),
-                    base_file,
-                },
+                instant_time.to_owned(),
+                FileSlice::new(base_file, self.partition_path.clone()),
             );
             Ok(self)
         }
@@ -254,31 +196,6 @@ impl FileGroup {
 mod tests {
     use super::*;
 
-    #[test]
-    fn create_a_base_file_successfully() {
-        let base_file = BaseFile::from_file_name(
-            "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
-        )
-        .unwrap();
-        assert_eq!(
-            base_file.file_group_id,
-            "5a226868-2934-4f84-a16f-55124630c68d-0"
-        );
-        assert_eq!(base_file.commit_time, "20240402144910683");
-    }
-
-    #[test]
-    fn create_a_base_file_returns_error() {
-        let result = BaseFile::from_file_name("no_file_extension");
-        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-
-        let result = BaseFile::from_file_name(".parquet");
-        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-
-        let result = BaseFile::from_file_name("no-valid-delimiter.parquet");
-        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-    }
-
     #[test]
     fn load_a_valid_file_group() {
         let mut fg = FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
@@ -296,7 +213,7 @@ mod tests {
             fg.get_file_slice_as_of("20240402123035233")
                 .unwrap()
                 .base_file
-                .commit_time,
+                .instant_time,
             "20240402123035233"
         );
         assert!(fg.get_file_slice_as_of("-1").is_none());
@@ -313,7 +230,7 @@ mod tests {
             "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402144910683.parquet",
         );
         assert!(res2.is_err());
-        assert_eq!(res2.unwrap_err().to_string(), "File group error: Commit time 20240402144910683 is already present in File Group 5a226868-2934-4f84-a16f-55124630c68d-0");
+        assert_eq!(res2.unwrap_err().to_string(), "File group error: Instant time 20240402144910683 is already present in File Group 5a226868-2934-4f84-a16f-55124630c68d-0");
     }
 
     #[test]
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index a7884d06..49200f7f 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -118,8 +118,8 @@ impl FileGroupReader {
     }
 
     pub async fn read_file_slice(&self, file_slice: &FileSlice) -> Result<RecordBatch> {
-        self.read_file_slice_by_base_file_path(&file_slice.base_file_relative_path())
-            .await
+        let relative_path = file_slice.base_file_relative_path()?;
+        self.read_file_slice_by_base_file_path(&relative_path).await
     }
 }
 
diff --git a/crates/core/src/storage/file_info.rs b/crates/core/src/storage/file_metadata.rs
similarity index 62%
rename from crates/core/src/storage/file_info.rs
rename to crates/core/src/storage/file_metadata.rs
index a6f1e059..649a94a7 100644
--- a/crates/core/src/storage/file_info.rs
+++ b/crates/core/src/storage/file_metadata.rs
@@ -17,10 +17,32 @@
  * under the License.
  */
 
-/// File info that can be retrieved by listing operations without reading the file.
 #[derive(Clone, Debug, Default, Eq, PartialEq)]
-pub struct FileInfo {
-    pub uri: String,
+pub struct FileMetadata {
+    /// File name
     pub name: String,
+
+    /// Size in bytes on storage
     pub size: usize,
+
+    /// Size in bytes in memory
+    pub byte_size: i64,
+
+    /// Number of records in the file
+    pub num_records: i64,
+
+    /// Whether all the properties are populated or not
+    pub fully_populated: bool,
+}
+
+impl FileMetadata {
+    pub fn new(name: impl Into<String>, size: usize) -> Self {
+        Self {
+            name: name.into(),
+            size,
+            byte_size: 0,
+            num_records: 0,
+            fully_populated: false,
+        }
+    }
 }
diff --git a/crates/core/src/storage/file_stats.rs b/crates/core/src/storage/file_stats.rs
deleted file mode 100644
index 65fe1c5e..00000000
--- a/crates/core/src/storage/file_stats.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/// File stats that can be retrieved by reading the file's metadata.
-#[derive(Clone, Debug, Default)]
-pub struct FileStats {
-    pub num_records: i64,
-    pub size_bytes: i64,
-}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 48ab293a..4f5aec37 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -38,12 +38,11 @@ use crate::config::table::HudiTableConfig;
 use crate::config::HudiConfigs;
 use crate::storage::error::Result;
 use crate::storage::error::StorageError::{Creation, InvalidPath};
-use crate::storage::file_info::FileInfo;
+use crate::storage::file_metadata::FileMetadata;
 use crate::storage::util::join_url_segments;
 
 pub mod error;
-pub mod file_info;
-pub mod file_stats;
+pub mod file_metadata;
 pub mod util;
 
 #[allow(dead_code)]
@@ -105,22 +104,41 @@ impl Storage {
     }
 
     #[cfg(test)]
-    async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
+    async fn get_file_metadata_not_populated(&self, relative_path: &str) -> Result<FileMetadata> {
         let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
         let meta = self.object_store.head(&obj_path).await?;
-        let uri = obj_url.to_string();
-        let name = obj_path
+        let name = meta.location.filename().ok_or_else(|| {
+            InvalidPath(format!("Failed to get file name from: {:?}", meta.location))
+        })?;
+        Ok(FileMetadata::new(name.to_string(), meta.size))
+    }
+
+    pub async fn get_file_metadata(&self, relative_path: &str) -> Result<FileMetadata> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let obj_store = self.object_store.clone();
+        let obj_meta = obj_store.head(&obj_path).await?;
+        let location = obj_meta.location.clone();
+        let file_name = location
             .filename()
-            .ok_or(InvalidPath(format!(
-                "Failed to get file name from {:?}",
-                obj_path
-            )))?
-            .to_string();
-        Ok(FileInfo {
-            uri,
-            name,
-            size: meta.size,
+            .ok_or_else(|| InvalidPath(format!("Failed to get file name from: {:?}", &obj_meta)))?;
+        let size = obj_meta.size;
+        let reader = ParquetObjectReader::new(obj_store, obj_meta);
+        let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+        let parquet_meta = builder.metadata().clone();
+        let num_records = parquet_meta.file_metadata().num_rows();
+        let size_bytes = parquet_meta
+            .row_groups()
+            .iter()
+            .map(|rg| rg.total_byte_size())
+            .sum::<i64>();
+        Ok(FileMetadata {
+            name: file_name.to_string(),
+            size,
+            byte_size: size_bytes,
+            num_records,
+            fully_populated: true,
         })
     }
 
@@ -196,33 +214,22 @@ impl Storage {
         Ok(list_res.common_prefixes)
     }
 
-    pub async fn list_files(&self, subdir: Option<&str>) -> Result<Vec<FileInfo>> {
+    pub async fn list_files(&self, subdir: Option<&str>) -> Result<Vec<FileMetadata>> {
         let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()])?;
         let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
         let list_res = self
             .object_store
             .list_with_delimiter(Some(&prefix_path))
             .await?;
-        let mut file_info = Vec::new();
+        let mut file_metadata = Vec::new();
         for obj_meta in list_res.objects {
-            let name = obj_meta
-                .location
+            let location = obj_meta.location;
+            let name = location
                 .filename()
-                .ok_or_else(|| {
-                    InvalidPath(format!(
-                        "Failed to get file name from {:?}",
-                        obj_meta.location
-                    ))
-                })?
-                .to_string();
-            let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
-            file_info.push(FileInfo {
-                uri,
-                name,
-                size: obj_meta.size,
-            });
+                .ok_or_else(|| InvalidPath(format!("Failed to get file name from {location:?}")))?;
+            file_metadata.push(FileMetadata::new(name.to_string(), obj_meta.size));
         }
-        Ok(file_info)
+        Ok(file_metadata)
     }
 }
 
@@ -349,54 +356,27 @@ mod tests {
         )
         .unwrap();
         let storage = Storage::new_with_base_url(base_url).unwrap();
-        let file_info_1: Vec<FileInfo> = storage
+        let file_info_1: Vec<FileMetadata> = storage
             .list_files(None)
             .await
             .unwrap()
             .into_iter()
             .collect();
-        assert_eq!(
-            file_info_1,
-            vec![FileInfo {
-                uri: join_url_segments(&storage.base_url, &["a.parquet"])
-                    .unwrap()
-                    .to_string(),
-                name: "a.parquet".to_string(),
-                size: 0,
-            }]
-        );
-        let file_info_2: Vec<FileInfo> = storage
+        assert_eq!(file_info_1, vec![FileMetadata::new("a.parquet", 0)]);
+        let file_info_2: Vec<FileMetadata> = storage
             .list_files(Some("part1"))
             .await
             .unwrap()
             .into_iter()
             .collect();
-        assert_eq!(
-            file_info_2,
-            vec![FileInfo {
-                uri: join_url_segments(&storage.base_url, &["part1/b.parquet"])
-                    .unwrap()
-                    .to_string(),
-                name: "b.parquet".to_string(),
-                size: 0,
-            }]
-        );
-        let file_info_3: Vec<FileInfo> = storage
+        assert_eq!(file_info_2, vec![FileMetadata::new("b.parquet", 0)],);
+        let file_info_3: Vec<FileMetadata> = storage
             .list_files(Some("part2/part22"))
             .await
             .unwrap()
             .into_iter()
             .collect();
-        assert_eq!(
-            file_info_3,
-            vec![FileInfo {
-                uri: join_url_segments(&storage.base_url, &["part2/part22/c.parquet"])
-                    .unwrap()
-                    .to_string(),
-                name: "c.parquet".to_string(),
-                size: 0,
-            }]
-        );
+        assert_eq!(file_info_3, vec![FileMetadata::new("c.parquet", 0)],);
     }
 
     #[tokio::test]
@@ -432,15 +412,12 @@ mod tests {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
         let storage = Storage::new_with_base_url(base_url).unwrap();
-        let file_info = storage.get_file_info("a.parquet").await.unwrap();
-        assert_eq!(file_info.name, "a.parquet");
-        assert_eq!(
-            file_info.uri,
-            join_url_segments(&storage.base_url, &["a.parquet"])
-                .unwrap()
-                .to_string()
-        );
-        assert_eq!(file_info.size, 866);
+        let file_metadata = storage
+            .get_file_metadata_not_populated("a.parquet")
+            .await
+            .unwrap();
+        assert_eq!(file_metadata.name, "a.parquet");
+        assert_eq!(file_metadata.size, 866);
     }
 
     #[tokio::test]
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index eef3150f..4e6aa1d2 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -21,12 +21,13 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use crate::config::HudiConfigs;
-use crate::file_group::{BaseFile, FileGroup, FileSlice};
-use crate::storage::file_info::FileInfo;
+use crate::file_group::base_file::BaseFile;
+use crate::file_group::{FileGroup, FileSlice};
 use crate::storage::{get_leaf_dirs, Storage};
 
 use crate::config::read::HudiReadConfig::ListingParallelism;
 use crate::error::CoreError;
+use crate::storage::file_metadata::FileMetadata;
 use crate::table::partition::PartitionPruner;
 use crate::Result;
 use dashmap::DashMap;
@@ -91,7 +92,7 @@ impl FileSystemView {
         storage: &Storage,
         partition_path: &str,
     ) -> Result<Vec<FileGroup>> {
-        let file_info: Vec<FileInfo> = storage
+        let file_metadata: Vec<FileMetadata> = storage
            .list_files(Some(partition_path))
             .await?
             .into_iter()
             .collect();
 
         let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new();
-        for f in file_info {
-            let base_file = BaseFile::from_file_info(f)?;
+        for metadata in file_metadata {
+            let base_file = BaseFile::try_from(metadata)?;
             let fg_id = &base_file.file_group_id;
             fg_id_to_base_files
                 .entry(fg_id.to_owned())
@@ -163,7 +164,7 @@ impl FileSystemView {
                 continue;
             }
             if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
-                fsl.load_stats(&self.storage).await?;
+                fsl.load_metadata_if_needed(&self.storage).await?;
                 file_slices.push(fsl.clone());
             }
         }
@@ -260,7 +261,7 @@ mod tests {
             .collect::<Vec<_>>();
         assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
         for fsl in file_slices.iter() {
-            assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 4);
+            assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 4);
         }
     }
 
@@ -289,7 +290,7 @@ mod tests {
             .collect::<Vec<_>>();
         assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
         for fsl in file_slices.iter() {
-            assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 1);
+            assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 1);
         }
     }
 
@@ -330,7 +331,7 @@ mod tests {
             .collect::<Vec<_>>();
         assert_eq!(fg_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
         for fsl in file_slices.iter() {
-            assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 2);
+            assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 2);
         }
     }
 }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 6d05c717..2c50fc74 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -61,6 +61,7 @@
 //! use url::Url;
 //! use hudi_core::table::Table;
 //! use hudi_core::storage::util::parse_uri;
+//! use hudi_core::storage::util::join_url_segments;
 //!
 //! pub async fn test() {
 //!     let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
@@ -74,8 +75,8 @@
 //!     let file_group_vec = file_slice_vec
 //!         .iter()
 //!         .map(|f| {
-//!             let url = parse_uri(&f.base_file.info.uri).unwrap();
-//!             let size = f.base_file.info.size as u64;
+//!             let relative_path = f.base_file_relative_path().unwrap();
+//!             let url = join_url_segments(&base_uri, &[relative_path.as_str()]).unwrap();
 //!             url.path().to_string()
 //!         })
 //!         .collect();
@@ -372,8 +373,11 @@ mod tests {
     /// Test helper to get relative file paths from the table with filters.
     async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) -> Result<Vec<String>> {
         let mut file_paths = Vec::new();
+        let base_url = table.base_url()?;
         for f in table.get_file_slices(filters).await? {
-            file_paths.push(f.base_file_path().to_string());
+            let relative_path = f.base_file_relative_path()?;
+            let file_url = join_url_segments(&base_url, &[relative_path.as_str()])?;
+            file_paths.push(file_url.to_string());
         }
         Ok(file_paths)
     }
@@ -710,7 +714,7 @@ mod tests {
         assert_eq!(
             file_slices
                 .iter()
-                .map(|f| f.base_file_relative_path())
+                .map(|f| f.base_file_relative_path().unwrap())
                 .collect::<Vec<_>>(),
             vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
         );
@@ -724,7 +728,7 @@ mod tests {
         assert_eq!(
             file_slices
                 .iter()
-                .map(|f| f.base_file_relative_path())
+                .map(|f| f.base_file_relative_path().unwrap())
                .collect::<Vec<_>>(),
             vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
         );
@@ -738,7 +742,7 @@ mod tests {
         assert_eq!(
             file_slices
                 .iter()
-                .map(|f| f.base_file_relative_path())
+                .map(|f| f.base_file_relative_path().unwrap())
                 .collect::<Vec<_>>(),
             vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
         );
@@ -752,7 +756,7 @@ mod tests {
         assert_eq!(
             file_slices
                 .iter()
-                .map(|f| f.base_file_relative_path())
+                .map(|f| f.base_file_relative_path().unwrap())
                 .collect::<Vec<_>>(),
             Vec::<String>::new()
         );
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 718dc61b..0c27e749 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -45,7 +45,7 @@ use datafusion_physical_expr::create_physical_expr;
 use crate::util::expr::exprs_to_filters;
 use hudi_core::config::read::HudiReadConfig::InputPartitions;
 use hudi_core::config::util::empty_options;
-use hudi_core::storage::util::{get_scheme_authority, parse_uri};
+use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
 use hudi_core::table::Table as HudiTable;
 
 /// Create a `HudiDataSource`.
@@ -181,16 +181,26 @@ impl TableProvider for HudiDataSource {
             .get_file_slices_splits(self.get_input_partitions(), pushdown_filters.as_slice())
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
+        let base_url = self.table.base_url().map_err(|e| {
+            Execution(format!(
+                "Failed to get base path config from Hudi table: {e:?}"
+            ))
+        })?;
         let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
         for file_slice_vec in file_slices {
-            let parquet_file_group_vec = file_slice_vec
-                .iter()
-                .map(|f| {
-                    let url = parse_uri(&f.base_file.info.uri).unwrap();
-                    let size = f.base_file.info.size as u64;
-                    PartitionedFile::new(url.path(), size)
-                })
-                .collect();
+            let mut parquet_file_group_vec = Vec::new();
+            for f in file_slice_vec {
+                let relative_path = f.base_file_relative_path().map_err(|e| {
+                    Execution(format!(
+                        "Failed to get base file relative path for {f:?} due to {e:?}"
+                    ))
+                })?;
+                let url = join_url_segments(&base_url, &[relative_path.as_str()])
+                    .map_err(|e| Execution(format!("Failed to join URL segments: {e:?}")))?;
+                let size = f.base_file.file_metadata.as_ref().map_or(0, |m| m.size);
+                let partitioned_file = PartitionedFile::new(url.path(), size as u64);
+                parquet_file_group_vec.push(partitioned_file);
+            }
             parquet_file_groups.push(parquet_file_group_vec)
         }
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 4235d171..d5e046f3 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -60,22 +60,22 @@ class HudiFileSlice:
     the partition it belongs to, and associated metadata.
 
     Attributes:
-        file_group_id (str): The ID of the file group this slice belongs to.
+        file_group_id (str): The id of the file group this file slice belongs to.
         partition_path (str): The path of the partition containing this file slice.
-        commit_time (str): The commit time of this file slice.
+        creation_instant_time (str): The creation instant time of this file slice.
         base_file_name (str): The name of the base file.
-        base_file_size (int): The size of the base file.
-        num_records (int): The number of records in the base file.
-        size_bytes (int): The size of the file slice in bytes.
+        base_file_size (int): The on-disk size of the base file in bytes.
+        base_file_byte_size (int): The in-memory size of the base file in bytes.
+        num_records (int): The number of records in the file slice.
     """
 
     file_group_id: str
     partition_path: str
-    commit_time: str
+    creation_instant_time: str
     base_file_name: str
     base_file_size: int
+    base_file_byte_size: int
     num_records: int
-    size_bytes: int
 
     def base_file_relative_path(self) -> str:
         """
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 1aef081a..4c9b7fd6 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -94,15 +94,15 @@ pub struct HudiFileSlice {
     #[pyo3(get)]
     partition_path: String,
     #[pyo3(get)]
-    commit_time: String,
+    creation_instant_time: String,
     #[pyo3(get)]
     base_file_name: String,
     #[pyo3(get)]
     base_file_size: usize,
     #[pyo3(get)]
-    num_records: i64,
+    base_file_byte_size: i64,
     #[pyo3(get)]
-    size_bytes: i64,
+    num_records: i64,
 }
 
 #[cfg(not(tarpaulin))]
@@ -126,21 +126,21 @@ impl HudiFileSlice {
 
 #[cfg(not(tarpaulin))]
 fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
     let file_group_id = f.file_group_id().to_string();
-    let partition_path = f.partition_path.as_deref().unwrap_or_default().to_string();
-    let commit_time = f.base_file.commit_time.to_string();
-    let base_file_name = f.base_file.info.name.clone();
-    let base_file_size = f.base_file.info.size;
-    let stats = f.base_file.stats.clone().unwrap_or_default();
-    let num_records = stats.num_records;
-    let size_bytes = stats.size_bytes;
+    let partition_path = f.partition_path().to_string();
+    let creation_instant_time = f.creation_instant_time().to_string();
+    let base_file_name = f.base_file.file_name.clone();
+    let file_metadata = f.base_file.file_metadata.clone().unwrap_or_default();
+    let base_file_size = file_metadata.size;
+    let base_file_byte_size = file_metadata.byte_size;
+    let num_records = file_metadata.num_records;
     HudiFileSlice {
         file_group_id,
         partition_path,
-        commit_time,
+        creation_instant_time,
         base_file_name,
         base_file_size,
+        base_file_byte_size,
         num_records,
-        size_bytes,
     }
 }
 
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index f8986a8e..c35be59c 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -51,12 +51,12 @@ def test_read_table_returns_correct_file_slices(get_sample_table):
     file_slices = table.get_file_slices()
 
     assert len(file_slices) == 5
-    assert set(f.commit_time for f in file_slices) == {
+    assert set(f.creation_instant_time for f in file_slices) == {
         "20240402123035233",
         "20240402144910683",
     }
     assert all(f.num_records == 1 for f in file_slices)
-    assert all(f.size_bytes > 0 for f in file_slices)
+    assert all(f.base_file_byte_size > 0 for f in file_slices)
 
     file_slice_paths = [f.base_file_relative_path() for f in file_slices]
     assert set(file_slice_paths) == {
         "chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet",