refactor: improve BaseFile APIs (#239)
- Consolidate `FileInfo` and `FileStats` into `FileMetadata`
- Improve variable and struct property naming
- Make the APIs more ergonomic (see the usage sketch below)
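
For illustration only, a minimal sketch of the reworked construction paths, based on the types in this diff (the `hudi_core` import paths are an assumption; within the crate the items live at `crate::file_group::base_file::BaseFile` and `crate::storage::file_metadata::FileMetadata`):

use hudi_core::file_group::base_file::BaseFile; // assumed external path
use hudi_core::storage::file_metadata::FileMetadata; // assumed external path

fn demo() {
    // From a file name alone: no metadata is attached yet.
    let name = "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
    let base_file = BaseFile::try_from(name).unwrap();
    assert_eq!(base_file.instant_time, "20240402144910683");
    assert!(base_file.file_metadata.is_none());

    // From FileMetadata: the metadata (e.g. size) is carried along on the BaseFile.
    let metadata = FileMetadata::new(name, 1024);
    let base_file = BaseFile::try_from(metadata).unwrap();
    assert_eq!(base_file.file_metadata.unwrap().size, 1024);
}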
xushiyan authored Jan 8, 2025
1 parent a396f6c commit 1779e77
Showing 12 changed files with 325 additions and 285 deletions.
134 changes: 134 additions & 0 deletions crates/core/src/file_group/base_file.rs
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::error::CoreError;
use crate::storage::file_metadata::FileMetadata;
use crate::Result;

/// Hudi Base file, part of a [FileSlice].
#[derive(Clone, Debug)]
pub struct BaseFile {
/// The file name of the base file.
pub file_name: String,

/// The id of the enclosing file group.
pub file_group_id: String,

/// The associated instant time of the base file.
pub instant_time: String,

/// The metadata about the file.
pub file_metadata: Option<FileMetadata>,
}

impl BaseFile {
/// Parse file name and extract `file_group_id` and `instant_time`.
fn parse_file_name(file_name: &str) -> Result<(String, String)> {
let err_msg = format!("Failed to parse file name '{file_name}' for base file.");
let (name, _) = file_name
.rsplit_once('.')
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
let parts: Vec<&str> = name.split('_').collect();
let file_group_id = parts
.first()
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
.to_string();
let instant_time = parts
.get(2)
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
.to_string();
Ok((file_group_id, instant_time))
}
}

impl TryFrom<&str> for BaseFile {
type Error = CoreError;

fn try_from(file_name: &str) -> Result<Self> {
let (file_group_id, instant_time) = Self::parse_file_name(file_name)?;
Ok(Self {
file_name: file_name.to_string(),
file_group_id,
instant_time,
file_metadata: None,
})
}
}

impl TryFrom<FileMetadata> for BaseFile {
type Error = CoreError;

fn try_from(metadata: FileMetadata) -> Result<Self> {
let file_name = metadata.name.clone();
let (file_group_id, instant_time) = Self::parse_file_name(&file_name)?;
Ok(Self {
file_name,
file_group_id,
instant_time,
file_metadata: Some(metadata),
})
}
}

#[cfg(test)]
mod tests {
use super::*;
use hudi_tests::assert_not;

#[test]
fn test_create_base_file_from_file_name() {
let file_name = "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
let base_file = BaseFile::try_from(file_name).unwrap();
assert_eq!(
base_file.file_group_id,
"5a226868-2934-4f84-a16f-55124630c68d-0"
);
assert_eq!(base_file.instant_time, "20240402144910683");
assert!(base_file.file_metadata.is_none());
}

#[test]
fn test_create_base_file_from_metadata() {
let metadata = FileMetadata::new(
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
1024,
);
let base_file = BaseFile::try_from(metadata).unwrap();
assert_eq!(
base_file.file_group_id,
"5a226868-2934-4f84-a16f-55124630c68d-0"
);
assert_eq!(base_file.instant_time, "20240402144910683");
let file_metadata = base_file.file_metadata.unwrap();
assert_eq!(file_metadata.size, 1024);
assert_not!(file_metadata.fully_populated);
}

#[test]
fn create_a_base_file_returns_error() {
let result = BaseFile::try_from("no_file_extension");
assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));

let result = BaseFile::try_from(".parquet");
assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));

let metadata = FileMetadata::new("no-valid-delimiter.parquet", 1024);
let result = BaseFile::try_from(metadata);
assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
}
}
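
The consolidated `FileMetadata` type lives under `crate::storage::file_metadata`, which is not expanded on this page. A rough sketch of its surface, inferred purely from the usages above (`FileMetadata::new(name, size)`, `.name`, `.size`, `.fully_populated`); the field types and exact definition are assumptions, not the actual code:

// Hypothetical shape, inferred only from the usages in this commit.
pub struct FileMetadata {
    /// File name, parsed by BaseFile to derive file_group_id and instant_time.
    pub name: String,
    /// File size in bytes (the concrete integer type here is an assumption).
    pub size: u64,
    /// Whether all metadata fields have been populated from storage.
    pub fully_populated: bool,
}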
185 changes: 51 additions & 134 deletions crates/core/src/file_group/mod.rs
@@ -20,12 +20,12 @@
 //!
 //! A set of data/base files + set of log files, that make up a unit for all operations.
+pub mod base_file;
 pub mod builder;
 pub mod reader;
 
 use crate::error::CoreError;
-use crate::storage::file_info::FileInfo;
-use crate::storage::file_stats::FileStats;
+use crate::file_group::base_file::BaseFile;
 use crate::storage::Storage;
 use crate::Result;
 use std::collections::BTreeMap;
@@ -34,70 +34,7 @@ use std::fmt::Formatter;
 use std::hash::{Hash, Hasher};
 use std::path::PathBuf;
 
-/// Represents common metadata about a Hudi Base File.
-#[derive(Clone, Debug)]
-pub struct BaseFile {
-    /// The file group id that is unique across the table.
-    pub file_group_id: String,
-
-    pub commit_time: String,
-
-    pub info: FileInfo,
-
-    pub stats: Option<FileStats>,
-}
-
-impl BaseFile {
-    /// Parse file name and extract file_group_id and commit_time.
-    fn parse_file_name(file_name: &str) -> Result<(String, String)> {
-        let err_msg = format!("Failed to parse file name '{}' for base file.", file_name);
-        let (name, _) = file_name
-            .rsplit_once('.')
-            .ok_or(CoreError::FileGroup(err_msg.clone()))?;
-        let parts: Vec<&str> = name.split('_').collect();
-        let file_group_id = parts
-            .first()
-            .ok_or(CoreError::FileGroup(err_msg.clone()))?
-            .to_string();
-        let commit_time = parts
-            .get(2)
-            .ok_or(CoreError::FileGroup(err_msg.clone()))?
-            .to_string();
-        Ok((file_group_id, commit_time))
-    }
-
-    /// Construct [BaseFile] with the base file name.
-    ///
-    /// TODO: refactor such that file info size is optional and no one expects it.
-    pub fn from_file_name(file_name: &str) -> Result<Self> {
-        let (file_group_id, commit_time) = Self::parse_file_name(file_name)?;
-        let info = FileInfo {
-            name: file_name.to_string(),
-            size: 0,
-            uri: "".to_string(),
-        };
-        Ok(Self {
-            file_group_id,
-            commit_time,
-            info,
-            stats: None,
-        })
-    }
-
-    /// Construct [BaseFile] with the [FileInfo].
-    pub fn from_file_info(info: FileInfo) -> Result<Self> {
-        let (file_group_id, commit_time) = Self::parse_file_name(&info.name)?;
-        Ok(Self {
-            file_group_id,
-            commit_time,
-            info,
-            stats: None,
-        })
-    }
-}
-
-/// Within a file group, a slice is a combination of data file written at a commit time and list of log files,
-/// containing changes to the data file from that commit time.
+/// Within a [FileGroup], a [FileSlice] is a logical group of [BaseFile] and log files.
 ///
 /// [note] The log files are not yet supported.
 #[derive(Clone, Debug)]
@@ -107,47 +44,56 @@ pub struct FileSlice {
 }
 
 impl FileSlice {
-    #[cfg(test)]
-    pub fn base_file_path(&self) -> &str {
-        self.base_file.info.uri.as_str()
+    pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
+        Self {
+            base_file,
+            partition_path,
+        }
     }
 
-    pub fn base_file_relative_path(&self) -> String {
-        let ptn = self.partition_path.as_deref().unwrap_or_default();
-        let file_name = &self.base_file.info.name;
-        PathBuf::from(ptn)
-            .join(file_name)
-            .to_str()
-            .unwrap()
-            .to_string()
+    /// Returns the relative path of the base file.
+    pub fn base_file_relative_path(&self) -> Result<String> {
+        let file_name = &self.base_file.file_name;
+        let path = PathBuf::from(self.partition_path()).join(file_name);
+        path.to_str().map(|s| s.to_string()).ok_or_else(|| {
+            CoreError::FileGroup(format!(
+                "Failed to get base file relative path for file slice: {:?}",
+                self
+            ))
+        })
     }
 
+    /// Returns the enclosing [FileGroup]'s id.
+    #[inline]
     pub fn file_group_id(&self) -> &str {
        &self.base_file.file_group_id
     }
 
-    pub fn set_base_file(&mut self, base_file: BaseFile) {
-        self.base_file = base_file
+    /// Returns the partition path of the [FileSlice].
+    #[inline]
+    pub fn partition_path(&self) -> &str {
+        self.partition_path.as_deref().unwrap_or_default()
     }
 
-    /// Load stats from storage layer for the base file if not already loaded.
-    pub async fn load_stats(&mut self, storage: &Storage) -> Result<()> {
-        if self.base_file.stats.is_none() {
-            let parquet_meta = storage
-                .get_parquet_file_metadata(&self.base_file_relative_path())
-                .await?;
-            let num_records = parquet_meta.file_metadata().num_rows();
-            let size_bytes = parquet_meta
-                .row_groups()
-                .iter()
-                .map(|rg| rg.total_byte_size())
-                .sum::<i64>();
-            let stats = FileStats {
-                num_records,
-                size_bytes,
-            };
-            self.base_file.stats = Some(stats);
+    /// Returns the instant time that marks the [FileSlice] creation.
+    ///
+    /// This is also an instant time stored in the [Timeline].
+    #[inline]
+    pub fn creation_instant_time(&self) -> &str {
+        &self.base_file.instant_time
+    }
+
+    /// Load [FileMetadata] from storage layer for the [BaseFile] if `file_metadata` is [None]
+    /// or if `file_metadata` is not fully populated.
+    pub async fn load_metadata_if_needed(&mut self, storage: &Storage) -> Result<()> {
+        if let Some(metadata) = &self.base_file.file_metadata {
+            if metadata.fully_populated {
+                return Ok(());
+            }
         }
+        let relative_path = self.base_file_relative_path()?;
+        let fetched_metadata = storage.get_file_metadata(&relative_path).await?;
+        self.base_file.file_metadata = Some(fetched_metadata);
         Ok(())
     }
 }
@@ -207,25 +153,21 @@ impl FileGroup {
     }
 
     pub fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> {
-        let base_file = BaseFile::from_file_name(file_name)?;
+        let base_file = BaseFile::try_from(file_name)?;
         self.add_base_file(base_file)
     }
 
     pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
-        let commit_time = base_file.commit_time.as_str();
-        if self.file_slices.contains_key(commit_time) {
+        let instant_time = base_file.instant_time.as_str();
+        if self.file_slices.contains_key(instant_time) {
             Err(CoreError::FileGroup(format!(
-                "Commit time {0} is already present in File Group {1}",
-                commit_time.to_owned(),
-                self.id,
+                "Instant time {instant_time} is already present in File Group {}",
+                self.id
             )))
         } else {
             self.file_slices.insert(
-                commit_time.to_owned(),
-                FileSlice {
-                    partition_path: self.partition_path.clone(),
-                    base_file,
-                },
+                instant_time.to_owned(),
+                FileSlice::new(base_file, self.partition_path.clone()),
             );
             Ok(self)
         }
@@ -254,31 +196,6 @@ impl FileGroup {
 mod tests {
     use super::*;
 
-    #[test]
-    fn create_a_base_file_successfully() {
-        let base_file = BaseFile::from_file_name(
-            "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
-        )
-        .unwrap();
-        assert_eq!(
-            base_file.file_group_id,
-            "5a226868-2934-4f84-a16f-55124630c68d-0"
-        );
-        assert_eq!(base_file.commit_time, "20240402144910683");
-    }
-
-    #[test]
-    fn create_a_base_file_returns_error() {
-        let result = BaseFile::from_file_name("no_file_extension");
-        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-
-        let result = BaseFile::from_file_name(".parquet");
-        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-
-        let result = BaseFile::from_file_name("no-valid-delimiter.parquet");
-        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-    }
-
     #[test]
     fn load_a_valid_file_group() {
         let mut fg = FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
@@ -296,7 +213,7 @@ mod tests {
             fg.get_file_slice_as_of("20240402123035233")
                 .unwrap()
                 .base_file
-                .commit_time,
+                .instant_time,
             "20240402123035233"
         );
         assert!(fg.get_file_slice_as_of("-1").is_none());
Expand All @@ -313,7 +230,7 @@ mod tests {
"5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402144910683.parquet",
);
assert!(res2.is_err());
assert_eq!(res2.unwrap_err().to_string(), "File group error: Commit time 20240402144910683 is already present in File Group 5a226868-2934-4f84-a16f-55124630c68d-0");
assert_eq!(res2.unwrap_err().to_string(), "File group error: Instant time 20240402144910683 is already present in File Group 5a226868-2934-4f84-a16f-55124630c68d-0");
}

#[test]
(The remaining hunks of mod.rs and the other 10 changed files in this commit are not shown here.)
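
A sketch of typical call sites for the reworked `FileSlice` surface shown above. The crate name `hudi_core`, the external `hudi_core::Result` alias, and how a `Storage` handle is obtained are assumptions outside this diff; the partition value is made up:

use hudi_core::file_group::base_file::BaseFile;
use hudi_core::file_group::FileSlice;
use hudi_core::storage::Storage;

async fn inspect_slice(storage: &Storage) -> hudi_core::Result<()> {
    let base_file = BaseFile::try_from(
        "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
    )?;
    // Partition path is optional; None is treated as an empty partition path.
    let mut slice = FileSlice::new(base_file, Some("city=chennai".to_string()));

    // The relative path now returns a Result instead of unwrapping internally.
    let path = slice.base_file_relative_path()?;
    println!("{path} created at {}", slice.creation_instant_time());

    // Fetches FileMetadata only when it is missing or not fully populated.
    slice.load_metadata_if_needed(storage).await?;
    Ok(())
}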
