From 949f4d6d976b837ae7473cbb53e6e3e23f8a0c19 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 5 Dec 2024 12:17:53 +0530 Subject: [PATCH 01/24] feat: split methods and use unordered futures --- src/hottier.rs | 83 +++++++++++++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 34 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index cc4e4bbb1..cbe690e83 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -408,9 +408,8 @@ impl HotTierManager { ); self.put_hot_tier(stream, &mut stream_hot_tier).await?; file_processed = true; - let mut hot_tier_manifest = self - .get_stream_hot_tier_manifest_for_date(stream, &date) - .await?; + let path = self.get_stream_path_for_date(stream, &date); + let mut hot_tier_manifest = HotTierManager::get_hot_tier_manifest_from_path(path).await?; hot_tier_manifest.files.push(parquet_file.clone()); hot_tier_manifest .files @@ -465,36 +464,40 @@ impl HotTierManager { Ok(date_list) } - ///get hot tier manifest for the stream and date - pub async fn get_stream_hot_tier_manifest_for_date( - &self, - stream: &str, - date: &NaiveDate, - ) -> Result { + ///get hot tier manifest on path + pub async fn get_hot_tier_manifest_from_path(path: PathBuf) -> Result { + if !path.exists() { + return Ok(Manifest::default()); + } + + // List the directories and prepare the hot tier manifest + let mut date_dirs = fs::read_dir(&path).await?; let mut hot_tier_manifest = Manifest::default(); - let path = self - .hot_tier_path - .join(stream) - .join(format!("date={}", date)); - if path.exists() { - let date_dirs = ReadDirStream::new(fs::read_dir(&path).await?); - let manifest_files: Vec = date_dirs.try_collect().await?; - for manifest in manifest_files { - if !manifest - .file_name() - .to_string_lossy() - .ends_with(".manifest.json") - { - continue; - } - let file = fs::read(manifest.path()).await?; - let manifest: Manifest = serde_json::from_slice(&file)?; - hot_tier_manifest.files.extend(manifest.files); + + // Avoid 
unnecessary checks and keep only valid manifest files + while let Some(manifest) = date_dirs.next_entry().await? { + if !manifest + .file_name() + .to_string_lossy() + .ends_with(".manifest.json") + { + continue; } + // Deserialize each manifest file and extend the hot tier manifest with its files + let file = fs::read(manifest.path()).await?; + let manifest: Manifest = serde_json::from_slice(&file)?; + hot_tier_manifest.files.extend(manifest.files); } + Ok(hot_tier_manifest) } + pub fn get_stream_path_for_date(&self, stream: &str, date: &NaiveDate) -> PathBuf { + self.hot_tier_path + .join(stream) + .join(format!("date={}", date)) + } + ///get the list of files from all the manifests present in hot tier directory for the stream pub async fn get_hot_tier_manifest_files( &self, @@ -527,17 +530,29 @@ impl HotTierManager { &self, stream: &str, ) -> Result, HotTierError> { - let mut hot_tier_parquet_files: Vec = Vec::new(); + // Fetch list of dates for the given stream let date_list = self.fetch_hot_tier_dates(stream).await?; + + // Create an unordered iter of futures to async collect files + let mut tasks = FuturesUnordered::new(); + + // For each date, fetch the manifest and extract parquet files for date in date_list { - let manifest = self - .get_stream_hot_tier_manifest_for_date(stream, &date) - .await?; + let path = self.get_stream_path_for_date(stream, &date); + tasks.push(async move { + HotTierManager::get_hot_tier_manifest_from_path(path) + .await + .map(|manifest| manifest.files.clone()) + .unwrap_or_default() // If fetching manifest fails, return an empty vector + }); + } - for parquet_file in manifest.files { - hot_tier_parquet_files.push(parquet_file.clone()); - } + // Collect parquet files for all dates + let mut hot_tier_parquet_files: Vec = vec![]; + while let Some(files) = tasks.next().await { + hot_tier_parquet_files.extend(files); } + Ok(hot_tier_parquet_files) } From 3c388c54ddc8eaf38b1676e6d8dca02cdfff4c16 Mon Sep 17 00:00:00 2001 From: Devdutt 
Shenoi Date: Tue, 14 Jan 2025 14:33:33 +0530 Subject: [PATCH 02/24] refactor: map --- src/hottier.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index d175c52e1..d4dbf8bcc 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -72,18 +72,19 @@ impl HotTierManager { pub fn global() -> Option<&'static HotTierManager> { static INSTANCE: OnceCell = OnceCell::new(); - let hot_tier_path = &CONFIG.options.hot_tier_storage_path; - if hot_tier_path.is_none() { - return None; - } - Some(INSTANCE.get_or_init(|| { - let hot_tier_path = hot_tier_path.as_ref().unwrap().clone(); + CONFIG + .options + .hot_tier_storage_path + .clone() + .map(|hot_tier_path| { + INSTANCE.get_or_init(|| { std::fs::create_dir_all(&hot_tier_path).unwrap(); HotTierManager { filesystem: LocalFileSystem::new(), hot_tier_path, } - })) + }) + }) } ///get the total hot tier size for all streams From 26f818d227eed7fcf936b339638e5529bafd0e90 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 14 Jan 2025 14:34:20 +0530 Subject: [PATCH 03/24] refactor: get disk usage as method --- src/hottier.rs | 63 +++++++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index d4dbf8bcc..db0d5f268 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -133,7 +133,9 @@ impl HotTierManager { total_space, used_space, .. 
- } = get_disk_usage().expect("Codepath should only be hit if hottier is enabled"); + } = self + .get_disk_usage() + .expect("Codepath should only be hit if hottier is enabled"); let (total_hot_tier_size, total_hot_tier_used_size) = self.get_hot_tiers_size(stream).await?; @@ -642,7 +644,7 @@ impl HotTierManager { total_space, available_space, used_space, - }) = get_disk_usage() + }) = self.get_disk_usage() { if available_space < size_to_download { return Ok(false); @@ -726,6 +728,34 @@ impl HotTierManager { } Ok(()) } + + /// Get the disk usage for the hot tier storage path. If we have a three disk paritions + /// mounted as follows: + /// 1. / + /// 2. /home/parseable + /// 3. /home/example/ignore + /// + /// And parseable is running with `P_HOT_TIER_DIR` pointing to a directory in + /// `/home/parseable`, we should return the usage stats of the disk mounted there. + fn get_disk_usage(&self) -> Option { + let mut disks = Disks::new_with_refreshed_list(); + // Order the disk partitions by decreasing length of mount path + disks.sort_by_key(|disk| disk.mount_point().to_str().unwrap().len()); + disks.reverse(); + + for disk in disks.iter() { + // Returns disk utilisation of first matching mount point + if self.hot_tier_path.starts_with(disk.mount_point()) { + return Some(DiskUtil { + total_space: disk.total_space(), + available_space: disk.available_space(), + used_space: disk.total_space() - disk.available_space(), + }); + } + } + + None + } } /// get the hot tier file path for the stream @@ -743,35 +773,6 @@ struct DiskUtil { used_space: u64, } -/// Get the disk usage for the hot tier storage path. If we have a three disk paritions -/// mounted as follows: -/// 1. / -/// 2. /home/parseable -/// 3. /home/example/ignore -/// -/// And parseable is running with `P_HOT_TIER_DIR` pointing to a directory in -/// `/home/parseable`, we should return the usage stats of the disk mounted there. 
-fn get_disk_usage() -> Option { - let path = CONFIG.options.hot_tier_storage_path.as_ref()?; - let mut disks = Disks::new_with_refreshed_list(); - // Order the disk partitions by decreasing length of mount path - disks.sort_by_key(|disk| disk.mount_point().to_str().unwrap().len()); - disks.reverse(); - - for disk in disks.iter() { - // Returns disk utilisation of first matching mount point - if path.starts_with(disk.mount_point()) { - return Some(DiskUtil { - total_space: disk.total_space(), - available_space: disk.available_space(), - used_space: disk.total_space() - disk.available_space(), - }); - } - } - - None -} - async fn delete_empty_directory_hot_tier(path: &Path) -> io::Result<()> { async fn delete_helper(path: &Path) -> io::Result<()> { if path.is_dir() { From 592da070e1ab9e1818f337ef2c1791f0caa792a3 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 14 Jan 2025 14:37:14 +0530 Subject: [PATCH 04/24] method won't be called if hottier doesn't exist --- src/hottier.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index db0d5f268..04bdacadf 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -713,9 +713,7 @@ impl HotTierManager { } pub async fn put_internal_stream_hot_tier(&self) -> Result<(), HotTierError> { - if CONFIG.options.hot_tier_storage_path.is_some() - && !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME) - { + if !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME) { let mut stream_hot_tier = StreamHotTier { version: Some(CURRENT_HOT_TIER_VERSION.to_string()), size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(), From 4af43a5ab4196fc5a88a73b99d0edf26327fec88 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 14 Jan 2025 14:51:42 +0530 Subject: [PATCH 05/24] DRY: `get_stream_path_for_date` --- src/hottier.rs | 36 +++++++++++------------------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index 04bdacadf..58ce2a953 
100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -78,11 +78,11 @@ impl HotTierManager { .clone() .map(|hot_tier_path| { INSTANCE.get_or_init(|| { - std::fs::create_dir_all(&hot_tier_path).unwrap(); - HotTierManager { - filesystem: LocalFileSystem::new(), - hot_tier_path, - } + std::fs::create_dir_all(&hot_tier_path).unwrap(); + HotTierManager { + filesystem: LocalFileSystem::new(), + hot_tier_path, + } }) }) } @@ -391,9 +391,8 @@ impl HotTierManager { .sort_by_key(|file| file.file_path.clone()); // write the manifest file to the hot tier directory let manifest_path = self - .hot_tier_path - .join(stream) - .join(format!("date={}/hottier.manifest.json", date)); + .get_stream_path_for_date(stream, &date) + .join("hottier.manifest.json"); fs::create_dir_all(manifest_path.parent().unwrap()).await?; fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; Ok(file_processed) @@ -408,7 +407,7 @@ impl HotTierManager { dates: &[NaiveDate], ) -> Result<(), HotTierError> { for date in dates.iter() { - let path = self.hot_tier_path.join(format!("{}/date={}", stream, date)); + let path = self.get_stream_path_for_date(stream, &date); if path.exists() { fs::remove_dir_all(path.clone()).await?; } @@ -562,11 +561,7 @@ impl HotTierManager { let mut delete_successful = false; let dates = self.fetch_hot_tier_dates(stream).await?; 'loop_dates: for date in dates { - let date_str = date.to_string(); - let path = &self - .hot_tier_path - .join(stream) - .join(format!("date={}", date_str)); + let path = self.get_stream_path_for_date(stream, &date); if !path.exists() { continue; } @@ -588,12 +583,7 @@ impl HotTierManager { 'loop_files: while let Some(file_to_delete) = manifest.files.pop() { let file_size = file_to_delete.file_size; - let path_to_delete = CONFIG - .options - .hot_tier_storage_path - .as_ref() - .unwrap() - .join(&file_to_delete.file_path); + let path_to_delete = self.hot_tier_path.join(&file_to_delete.file_path); if path_to_delete.exists() { if let 
(Some(download_date_time), Some(delete_date_time)) = ( @@ -670,11 +660,7 @@ impl HotTierManager { } for date in date_list { - let path = self - .hot_tier_path - .join(stream) - .join(format!("date={}", date)); - + let path = self.get_stream_path_for_date(stream, &date); let hours_dir = ReadDirStream::new(fs::read_dir(&path).await?); let mut hours: Vec = hours_dir.try_collect().await?; hours.retain(|entry| { From 94c21899aa195d65da43928d8d26a540aa7b5b5f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 Jan 2025 01:37:50 +0530 Subject: [PATCH 06/24] stream through results --- src/hottier.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index 58ce2a953..309677f2f 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -236,17 +236,14 @@ impl HotTierManager { ///sync the hot tier files from S3 to the hot tier directory for all streams async fn sync_hot_tier(&self) -> Result<(), HotTierError> { - let streams = STREAM_INFO.list_streams(); - let sync_hot_tier_tasks = FuturesUnordered::new(); - for stream in streams { + let mut sync_hot_tier_tasks = FuturesUnordered::new(); + for stream in STREAM_INFO.list_streams() { if self.check_stream_hot_tier_exists(&stream) { - sync_hot_tier_tasks.push(async move { self.process_stream(stream).await }); - //self.process_stream(stream).await?; + sync_hot_tier_tasks.push(self.process_stream(stream)); } } - let res: Vec<_> = sync_hot_tier_tasks.collect().await; - for res in res { + while let Some(res) = sync_hot_tier_tasks.next().await { if let Err(err) = res { error!("Failed to run hot tier sync task {err:?}"); return Err(err); From 7305873f34654bc2d43484c5b35325053d41e838 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 Jan 2025 02:13:47 +0530 Subject: [PATCH 07/24] refactor: `human_size` parsing --- src/handlers/http/logstream.rs | 30 +++++++++----- src/hottier.rs | 72 +++++++++++++++------------------- src/migration/mod.rs | 36 ++++------------- 
src/option.rs | 42 -------------------- src/utils/human_size.rs | 59 ++++++++++++++++++++++++++++ src/utils/mod.rs | 2 + src/validator.rs | 2 +- 7 files changed, 121 insertions(+), 122 deletions(-) create mode 100644 src/utils/human_size.rs diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 58a6ba65c..de239bab1 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -50,7 +50,7 @@ use bytes::Bytes; use chrono::Utc; use http::{HeaderName, HeaderValue}; use itertools::Itertools; -use serde_json::Value; +use serde_json::{json, Value}; use std::collections::HashMap; use std::fs; use std::num::NonZeroU32; @@ -630,10 +630,10 @@ pub async fn put_stream_hot_tier( STREAM_INFO.set_hot_tier(&stream_name, true)?; if let Some(hot_tier_manager) = HotTierManager::global() { let existing_hot_tier_used_size = hot_tier_manager - .validate_hot_tier_size(&stream_name, &hottier.size) + .validate_hot_tier_size(&stream_name, hottier.size) .await?; - hottier.used_size = existing_hot_tier_used_size.to_string(); - hottier.available_size = hottier.size.to_string(); + hottier.used_size = existing_hot_tier_used_size; + hottier.available_size = hottier.size; hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string()); hot_tier_manager .put_hot_tier(&stream_name, &mut hottier) @@ -674,11 +674,23 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result, - pub size: String, - pub used_size: String, - pub available_size: String, - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "crate::utils::human_size")] + pub size: u64, + #[serde(with = "crate::utils::human_size")] + pub used_size: u64, + #[serde(with = "crate::utils::human_size")] + pub available_size: u64, pub oldest_date_time_entry: Option, } @@ -97,8 +99,8 @@ impl HotTierManager { for stream in STREAM_INFO.list_streams() { if self.check_stream_hot_tier_exists(&stream) && stream != current_stream { let stream_hot_tier = self.get_hot_tier(&stream).await?; - 
total_hot_tier_size += &stream_hot_tier.size.parse::().unwrap(); - total_hot_tier_used_size += stream_hot_tier.used_size.parse::().unwrap(); + total_hot_tier_size += &stream_hot_tier.size; + total_hot_tier_used_size += stream_hot_tier.used_size; } } Ok((total_hot_tier_size, total_hot_tier_used_size)) @@ -111,14 +113,13 @@ impl HotTierManager { pub async fn validate_hot_tier_size( &self, stream: &str, - stream_hot_tier_size: &str, + stream_hot_tier_size: u64, ) -> Result { - let stream_hot_tier_size = stream_hot_tier_size.parse::().unwrap(); let mut existing_hot_tier_used_size = 0; if self.check_stream_hot_tier_exists(stream) { //delete existing hot tier if its size is less than the updated hot tier size else return error let existing_hot_tier = self.get_hot_tier(stream).await?; - existing_hot_tier_used_size = existing_hot_tier.used_size.parse::().unwrap(); + existing_hot_tier_used_size = existing_hot_tier.used_size; if stream_hot_tier_size < existing_hot_tier_used_size { return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!( @@ -169,20 +170,17 @@ impl HotTierManager { )); } let path = hot_tier_file_path(&self.hot_tier_path, stream)?; - let res = self + let bytes = self .filesystem .get(&path) .and_then(|resp| resp.bytes()) - .await; - match res { - Ok(bytes) => { - let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?; - let oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?; - stream_hot_tier.oldest_date_time_entry = oldest_date_time_entry; - Ok(stream_hot_tier) - } - Err(err) => Err(err.into()), - } + .await?; + + let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?; + let oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?; + stream_hot_tier.oldest_date_time_entry = oldest_date_time_entry; + + Ok(stream_hot_tier) } pub async fn delete_hot_tier(&self, stream: &str) -> Result<(), HotTierError> { @@ -256,7 +254,7 @@ impl HotTierManager { /// delete the files 
from the hot tier directory if the available date range is outside the hot tier range async fn process_stream(&self, stream: String) -> Result<(), HotTierError> { let stream_hot_tier = self.get_hot_tier(&stream).await?; - let mut parquet_file_size = stream_hot_tier.used_size.parse::().unwrap(); + let mut parquet_file_size = stream_hot_tier.used_size; let object_store = CONFIG.storage().get_object_store(); let mut s3_manifest_file_list = object_store.list_manifest_files(&stream).await?; @@ -348,7 +346,7 @@ impl HotTierManager { let mut file_processed = false; let mut stream_hot_tier = self.get_hot_tier(stream).await?; if !self.is_disk_available(parquet_file.file_size).await? - || stream_hot_tier.available_size.parse::().unwrap() <= parquet_file.file_size + || stream_hot_tier.available_size <= parquet_file.file_size { if !self .cleanup_hot_tier_old_data( @@ -361,7 +359,7 @@ impl HotTierManager { { return Ok(file_processed); } - *parquet_file_size = stream_hot_tier.used_size.parse::().unwrap(); + *parquet_file_size = stream_hot_tier.used_size; } let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone()); fs::create_dir_all(parquet_path.parent().unwrap()).await?; @@ -373,11 +371,9 @@ impl HotTierManager { .await?; file.write_all(&parquet_data).await?; *parquet_file_size += parquet_file.file_size; - stream_hot_tier.used_size = parquet_file_size.to_string(); + stream_hot_tier.used_size = *parquet_file_size; - stream_hot_tier.available_size = (stream_hot_tier.available_size.parse::().unwrap() - - parquet_file.file_size) - .to_string(); + stream_hot_tier.available_size = stream_hot_tier.available_size - parquet_file.file_size; self.put_hot_tier(stream, &mut stream_hot_tier).await?; file_processed = true; let path = self.get_stream_path_for_date(stream, &date); @@ -598,18 +594,12 @@ impl HotTierManager { fs::remove_dir_all(path_to_delete.parent().unwrap()).await?; delete_empty_directory_hot_tier(path_to_delete.parent().unwrap()).await?; - 
stream_hot_tier.used_size = - (stream_hot_tier.used_size.parse::().unwrap() - file_size) - .to_string(); - stream_hot_tier.available_size = - (stream_hot_tier.available_size.parse::().unwrap() + file_size) - .to_string(); + stream_hot_tier.used_size = stream_hot_tier.used_size - file_size; + stream_hot_tier.available_size = stream_hot_tier.available_size + file_size; self.put_hot_tier(stream, stream_hot_tier).await?; delete_successful = true; - if stream_hot_tier.available_size.parse::().unwrap() - <= parquet_file_size - { + if stream_hot_tier.available_size <= parquet_file_size { continue 'loop_files; } else { break 'loop_dates; @@ -699,9 +689,9 @@ impl HotTierManager { if !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME) { let mut stream_hot_tier = StreamHotTier { version: Some(CURRENT_HOT_TIER_VERSION.to_string()), - size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(), - used_size: "0".to_string(), - available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(), + size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES, + used_size: 0, + available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES, oldest_date_time_entry: None, }; self.put_hot_tier(INTERNAL_STREAM_NAME, &mut stream_hot_tier) diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 046a9dfc0..07713b39a 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -24,9 +24,8 @@ mod stream_metadata_migration; use std::{fs::OpenOptions, sync::Arc}; use crate::{ - hottier::{HotTierManager, CURRENT_HOT_TIER_VERSION}, metadata::load_stream_metadata_on_server_start, - option::{validation::human_size_to_bytes, Config, Mode, CONFIG}, + option::{Config, Mode, CONFIG}, storage::{ object_storage::{parseable_json_path, schema_path, stream_json_path}, ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, @@ -137,38 +136,17 @@ pub async fn run_migration(config: &Config) -> anyhow::Result<()> { let streams = storage.list_streams().await?; for stream in streams { 
migration_stream(&stream.name, &*storage).await?; - if CONFIG.options.hot_tier_storage_path.is_some() { - migration_hot_tier(&stream.name).await?; - } } Ok(()) } -/// run the migration for hot tier -async fn migration_hot_tier(stream: &str) -> anyhow::Result<()> { - if let Some(hot_tier_manager) = HotTierManager::global() { - if hot_tier_manager.check_stream_hot_tier_exists(stream) { - let mut stream_hot_tier = hot_tier_manager.get_hot_tier(stream).await?; - if stream_hot_tier.version.is_none() { - stream_hot_tier.version = Some(CURRENT_HOT_TIER_VERSION.to_string()); - stream_hot_tier.size = human_size_to_bytes(&stream_hot_tier.size) - .unwrap() - .to_string(); - stream_hot_tier.available_size = - human_size_to_bytes(&stream_hot_tier.available_size) - .unwrap() - .to_string(); - stream_hot_tier.used_size = human_size_to_bytes(&stream_hot_tier.used_size) - .unwrap() - .to_string(); - hot_tier_manager - .put_hot_tier(stream, &mut stream_hot_tier) - .await?; - } - } - } - Ok(()) +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct StreamHotTier { + pub version: Option, + pub size: String, + pub used_size: String, + pub available_size: String, } async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> { diff --git a/src/option.rs b/src/option.rs index e6c2d9200..86e198713 100644 --- a/src/option.rs +++ b/src/option.rs @@ -192,13 +192,10 @@ pub mod validation { env, io, net::ToSocketAddrs, path::{Path, PathBuf}, - str::FromStr, }; use path_clean::PathClean; - use human_size::{multiples, SpecificSize}; - #[cfg(any( all(target_os = "linux", target_arch = "x86_64"), all(target_os = "macos", target_arch = "aarch64") @@ -285,45 +282,6 @@ pub mod validation { } } - pub fn human_size_to_bytes(s: &str) -> Result { - fn parse_and_map( - s: &str, - ) -> Result { - SpecificSize::::from_str(s).map(|x| x.to_bytes()) - } - - let size = parse_and_map::(s) - .or(parse_and_map::(s)) - .or(parse_and_map::(s)) - .or(parse_and_map::(s)) - 
.or(parse_and_map::(s)) - .or(parse_and_map::(s)) - .map_err(|_| "Could not parse given size".to_string())?; - Ok(size) - } - - pub fn bytes_to_human_size(bytes: u64) -> String { - const KIB: u64 = 1024; - const MIB: u64 = KIB * 1024; - const GIB: u64 = MIB * 1024; - const TIB: u64 = GIB * 1024; - const PIB: u64 = TIB * 1024; - - if bytes < KIB { - format!("{} B", bytes) - } else if bytes < MIB { - format!("{:.2} KB", bytes as f64 / KIB as f64) - } else if bytes < GIB { - format!("{:.2} MiB", bytes as f64 / MIB as f64) - } else if bytes < TIB { - format!("{:.2} GiB", bytes as f64 / GIB as f64) - } else if bytes < PIB { - format!("{:.2} TiB", bytes as f64 / TIB as f64) - } else { - format!("{:.2} PiB", bytes as f64 / PIB as f64) - } - } - pub fn validate_disk_usage(max_disk_usage: &str) -> Result { if let Ok(max_disk_usage) = max_disk_usage.parse::() { if (0.0..=100.0).contains(&max_disk_usage) { diff --git a/src/utils/human_size.rs b/src/utils/human_size.rs new file mode 100644 index 000000000..80862b3c1 --- /dev/null +++ b/src/utils/human_size.rs @@ -0,0 +1,59 @@ +use std::str::FromStr; + +use human_size::{multiples, SpecificSize}; +use serde::{de, Deserialize, Deserializer, Serializer}; + +// Function to convert human-readable size to bytes (already provided) +pub fn human_size_to_bytes(s: &str) -> Result { + fn parse_and_map(s: &str) -> Result { + SpecificSize::::from_str(s).map(|x| x.to_bytes()) + } + + let size = parse_and_map::(s) + .or(parse_and_map::(s)) + .or(parse_and_map::(s)) + .or(parse_and_map::(s)) + .or(parse_and_map::(s)) + .or(parse_and_map::(s)) + .map_err(|_| "Could not parse given size".to_string())?; + Ok(size) +} + +// Function to convert bytes to human-readable size (already provided) +pub fn bytes_to_human_size(bytes: u64) -> String { + const KIB: u64 = 1024; + const MIB: u64 = KIB * 1024; + const GIB: u64 = MIB * 1024; + const TIB: u64 = GIB * 1024; + const PIB: u64 = TIB * 1024; + + if bytes < KIB { + format!("{} B", bytes) + } else if 
bytes < MIB { + format!("{:.2} KB", bytes as f64 / KIB as f64) + } else if bytes < GIB { + format!("{:.2} MiB", bytes as f64 / MIB as f64) + } else if bytes < TIB { + format!("{:.2} GiB", bytes as f64 / GIB as f64) + } else if bytes < PIB { + format!("{:.2} TiB", bytes as f64 / TIB as f64) + } else { + format!("{:.2} PiB", bytes as f64 / PIB as f64) + } +} + +pub fn serialize(bytes: &u64, serializer: S) -> Result +where + S: Serializer, +{ + let human_readable = bytes_to_human_size(*bytes); + serializer.serialize_str(&human_readable) +} + +pub fn deserialize<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let s = String::deserialize(deserializer)?; + human_size_to_bytes(&s).map_err(|e| de::Error::custom(e)) +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index e539b9e9f..7baf659ac 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -19,10 +19,12 @@ pub mod actix; pub mod arrow; pub mod header_parsing; +pub mod human_size; pub mod json; pub mod time; pub mod uid; pub mod update; + use crate::handlers::http::rbac::RBACError; use crate::option::CONFIG; use crate::rbac::role::{Action, Permission}; diff --git a/src/validator.rs b/src/validator.rs index bfa1dae02..7f6a8fcf4 100644 --- a/src/validator.rs +++ b/src/validator.rs @@ -23,8 +23,8 @@ use crate::alerts::rule::base::{NumericRule, StringRule}; use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule}; use crate::alerts::{Alerts, Rule}; use crate::hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES; -use crate::option::validation::bytes_to_human_size; use crate::storage::StreamType; +use crate::utils::human_size::bytes_to_human_size; // Add more sql keywords here in lower case const DENIED_NAMES: &[&str] = &[ From 14535f15f7489dd82c698be1482f4cc59938f612 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 Jan 2025 02:37:05 +0530 Subject: [PATCH 08/24] ci: clippy suggestions --- src/hottier.rs | 8 ++++---- src/utils/human_size.rs | 2 +- 2 files changed, 5 
insertions(+), 5 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index 861541d8e..4b3fcd9cf 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -373,7 +373,7 @@ impl HotTierManager { *parquet_file_size += parquet_file.file_size; stream_hot_tier.used_size = *parquet_file_size; - stream_hot_tier.available_size = stream_hot_tier.available_size - parquet_file.file_size; + stream_hot_tier.available_size -= parquet_file.file_size; self.put_hot_tier(stream, &mut stream_hot_tier).await?; file_processed = true; let path = self.get_stream_path_for_date(stream, &date); @@ -400,7 +400,7 @@ impl HotTierManager { dates: &[NaiveDate], ) -> Result<(), HotTierError> { for date in dates.iter() { - let path = self.get_stream_path_for_date(stream, &date); + let path = self.get_stream_path_for_date(stream, date); if path.exists() { fs::remove_dir_all(path.clone()).await?; } @@ -594,8 +594,8 @@ impl HotTierManager { fs::remove_dir_all(path_to_delete.parent().unwrap()).await?; delete_empty_directory_hot_tier(path_to_delete.parent().unwrap()).await?; - stream_hot_tier.used_size = stream_hot_tier.used_size - file_size; - stream_hot_tier.available_size = stream_hot_tier.available_size + file_size; + stream_hot_tier.used_size -= file_size; + stream_hot_tier.available_size += file_size; self.put_hot_tier(stream, stream_hot_tier).await?; delete_successful = true; diff --git a/src/utils/human_size.rs b/src/utils/human_size.rs index 80862b3c1..c584ff116 100644 --- a/src/utils/human_size.rs +++ b/src/utils/human_size.rs @@ -55,5 +55,5 @@ where D: Deserializer<'de>, { let s = String::deserialize(deserializer)?; - human_size_to_bytes(&s).map_err(|e| de::Error::custom(e)) + human_size_to_bytes(&s).map_err(de::Error::custom) } From e62139c1ffe22071111e5223fc01d019cc403333 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 Jan 2025 11:23:20 +0530 Subject: [PATCH 09/24] rm unused code --- src/migration/mod.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git 
a/src/migration/mod.rs b/src/migration/mod.rs index 07713b39a..cfd83483d 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -141,14 +141,6 @@ pub async fn run_migration(config: &Config) -> anyhow::Result<()> { Ok(()) } -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub struct StreamHotTier { - pub version: Option, - pub size: String, - pub used_size: String, - pub available_size: String, -} - async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> { let mut arrow_schema: Schema = Schema::empty(); From e8bf543c850f51371fc7c0e7c6495213b2c6388b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 Jan 2025 23:15:03 +0530 Subject: [PATCH 10/24] refactor: `fetch_hot_tier_dates` --- src/hottier.rs | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index 4b3fcd9cf..fad80ff12 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -388,6 +388,7 @@ impl HotTierManager { .join("hottier.manifest.json"); fs::create_dir_all(manifest_path.parent().unwrap()).await?; fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; + Ok(file_processed) } @@ -413,21 +414,28 @@ impl HotTierManager { pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result, HotTierError> { let mut date_list = Vec::new(); let path = self.hot_tier_path.join(stream); - if path.exists() { - let directories = ReadDirStream::new(fs::read_dir(&path).await?); - let dates: Vec = directories.try_collect().await?; - for date in dates { - if !date.path().is_dir() { - continue; - } - let date = date.file_name().into_string().unwrap(); - date_list.push( - NaiveDate::parse_from_str(date.trim_start_matches("date="), "%Y-%m-%d") - .unwrap(), - ); + if !path.exists() { + return Ok(date_list); + } + + let directories = fs::read_dir(&path).await?; + let mut dates = ReadDirStream::new(directories); + while let Some(date) = dates.next().await { + let date = date?; + 
if !date.path().is_dir() { + continue; } + let date = NaiveDate::parse_from_str( + date.file_name() + .to_string_lossy() + .trim_start_matches("date="), + "%Y-%m-%d", + ) + .unwrap(); + date_list.push(date); } date_list.sort(); + Ok(date_list) } From 15c2f81a2317e352f30ec7d1562869d16d12937c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 Jan 2025 23:24:36 +0530 Subject: [PATCH 11/24] refactor: `delete_empty_directory_hot_tier` --- src/hottier.rs | 52 +++++++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index fad80ff12..fedaae5cd 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -753,35 +753,35 @@ struct DiskUtil { } async fn delete_empty_directory_hot_tier(path: &Path) -> io::Result<()> { - async fn delete_helper(path: &Path) -> io::Result<()> { - if path.is_dir() { - let mut read_dir = fs::read_dir(path).await?; - let mut subdirs = vec![]; - - while let Some(entry) = read_dir.next_entry().await? { - let entry_path = entry.path(); - if entry_path.is_dir() { - subdirs.push(entry_path); - } - } - let mut tasks = vec![]; - for subdir in &subdirs { - tasks.push(delete_empty_directory_hot_tier(subdir)); - } - futures::stream::iter(tasks) - .buffer_unordered(10) - .try_collect::>() - .await?; + if !path.is_dir() { + return Ok(()); + } + let mut read_dir = fs::read_dir(path).await?; + let mut subdirs = vec![]; - // Re-check the directory after deleting its subdirectories - let mut read_dir = fs::read_dir(path).await?; - if read_dir.next_entry().await?.is_none() { - fs::remove_dir(path).await?; - } + while let Some(entry) = read_dir.next_entry().await? 
{ + let entry_path = entry.path(); + if entry_path.is_dir() { + subdirs.push(entry_path); } - Ok(()) } - delete_helper(path).await + + let mut tasks = vec![]; + for subdir in &subdirs { + tasks.push(delete_empty_directory_hot_tier(subdir)); + } + futures::stream::iter(tasks) + .buffer_unordered(10) + .try_collect::>() + .await?; + + // Re-check the directory after deleting its subdirectories + let mut read_dir = fs::read_dir(path).await?; + if read_dir.next_entry().await?.is_none() { + fs::remove_dir(path).await?; + } + + Ok(()) } #[derive(Debug, thiserror::Error)] From 20cd15f44c3b59f307fa4cbbeab5778b766e4d85 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 16 Jan 2025 18:14:57 +0530 Subject: [PATCH 12/24] fix: frontend expects "x Bytes" --- src/utils/human_size.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/utils/human_size.rs b/src/utils/human_size.rs index c584ff116..697c58a83 100644 --- a/src/utils/human_size.rs +++ b/src/utils/human_size.rs @@ -46,7 +46,9 @@ pub fn serialize(bytes: &u64, serializer: S) -> Result where S: Serializer, { - let human_readable = bytes_to_human_size(*bytes); + // let human_readable = bytes_to_human_size(*bytes); + // NOTE: frontend expects the size in bytes + let human_readable = format!("{bytes} Bytes"); serializer.serialize_str(&human_readable) } From 9e1d3c238bf7839b9278b2ac01a06953feb69444 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 13:10:34 +0530 Subject: [PATCH 13/24] refactor: early fail on wrong json --- src/handlers/http/logstream.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index ec799e3ab..68e74a4af 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -589,7 +589,7 @@ pub async fn get_stream_info(stream_name: Path) -> Result, - Json(json): Json, + Json(mut hottier): Json, ) -> Result { let stream_name = stream_name.into_inner(); if 
!STREAM_INFO.stream_exists(&stream_name) { @@ -616,11 +616,6 @@ pub async fn put_stream_hot_tier( return Err(StreamError::HotTierNotEnabled(stream_name)); } - let mut hottier: StreamHotTier = match serde_json::from_value(json) { - Ok(hottier) => hottier, - Err(err) => return Err(StreamError::InvalidHotTierConfig(err)), - }; - validator::hot_tier(&hottier.size.to_string())?; STREAM_INFO.set_hot_tier(&stream_name, true)?; @@ -836,8 +831,6 @@ pub mod error { "Hot tier is not enabled at the server config, cannot enable hot tier for stream {0}" )] HotTierNotEnabled(String), - #[error("failed to enable hottier due to err: {0}")] - InvalidHotTierConfig(serde_json::Error), #[error("Hot tier validation failed: {0}")] HotTierValidation(#[from] HotTierValidationError), #[error("{0}")] @@ -875,7 +868,6 @@ pub mod error { err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) } StreamError::HotTierNotEnabled(_) => StatusCode::BAD_REQUEST, - StreamError::InvalidHotTierConfig(_) => StatusCode::BAD_REQUEST, StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST, StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR, } From 33822e7ee118dd17a2ca3f13c64ee0a29ee3f7bc Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 13:11:08 +0530 Subject: [PATCH 14/24] fix: allow just byte count --- src/utils/human_size.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/utils/human_size.rs b/src/utils/human_size.rs index 697c58a83..7857ad171 100644 --- a/src/utils/human_size.rs +++ b/src/utils/human_size.rs @@ -5,6 +5,10 @@ use serde::{de, Deserialize, Deserializer, Serializer}; // Function to convert human-readable size to bytes (already provided) pub fn human_size_to_bytes(s: &str) -> Result { + if let Ok(size) = s.parse() { + return Ok(size); + } + fn parse_and_map(s: &str) -> Result { SpecificSize::::from_str(s).map(|x| x.to_bytes()) } From b9f0076886c3c095a41f45b8aaec659f6072c6ab Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 
2025 13:41:33 +0530 Subject: [PATCH 15/24] doc: note on behavior --- src/utils/human_size.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/utils/human_size.rs b/src/utils/human_size.rs index 7857ad171..ff7ab18f9 100644 --- a/src/utils/human_size.rs +++ b/src/utils/human_size.rs @@ -4,6 +4,7 @@ use human_size::{multiples, SpecificSize}; use serde::{de, Deserialize, Deserializer, Serializer}; // Function to convert human-readable size to bytes (already provided) +// NOTE: consider number values as byte count, e.g. "1234" is 1234 bytes. pub fn human_size_to_bytes(s: &str) -> Result { if let Ok(size) = s.parse() { return Ok(size); From afaa64a2cb52c6df8e1b42e97ac5a3a2b61d36b4 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 20:39:41 +0530 Subject: [PATCH 16/24] refactor: `hot_tier_enabled: bool` --- src/handlers/http/logstream.rs | 2 +- src/metadata.rs | 4 ++-- src/storage/mod.rs | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 68e74a4af..717ca8fff 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -631,7 +631,7 @@ pub async fn put_stream_hot_tier( .await?; let storage = CONFIG.storage().get_object_store(); let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; - stream_metadata.hot_tier_enabled = Some(true); + stream_metadata.hot_tier_enabled = true; storage .put_stream_manifest(&stream_name, &stream_metadata) .await?; diff --git a/src/metadata.rs b/src/metadata.rs index 5c18aa329..6f99eaeca 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -74,7 +74,7 @@ pub struct LogStreamMetadata { pub time_partition_limit: Option, pub custom_partition: Option, pub static_schema_flag: bool, - pub hot_tier_enabled: Option, + pub hot_tier_enabled: bool, pub stream_type: Option, pub log_source: LogSource, } @@ -257,7 +257,7 @@ impl StreamInfo { let stream = map .get_mut(stream_name) 
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))?; - stream.hot_tier_enabled = Some(enable); + stream.hot_tier_enabled = enable; Ok(()) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index f86b55757..7098b85d3 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -113,8 +113,8 @@ pub struct ObjectStoreFormat { skip_serializing_if = "std::ops::Not::not" )] pub static_schema_flag: bool, - #[serde(skip_serializing_if = "Option::is_none")] - pub hot_tier_enabled: Option, + #[serde(default)] + pub hot_tier_enabled: bool, pub stream_type: Option, #[serde(default)] pub log_source: LogSource, @@ -217,7 +217,7 @@ impl Default for ObjectStoreFormat { time_partition_limit: None, custom_partition: None, static_schema_flag: false, - hot_tier_enabled: None, + hot_tier_enabled: false, log_source: LogSource::default(), } } From 46d78df0a6c9969c8fdf213b5aea848477d5a11d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 20:55:12 +0530 Subject: [PATCH 17/24] refactor: `let else` pattern --- src/handlers/http/logstream.rs | 94 +++++++++++++++------------------- src/validator.rs | 18 +++---- 2 files changed, 51 insertions(+), 61 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 717ca8fff..c8c1f4ad0 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -612,30 +612,28 @@ pub async fn put_stream_hot_tier( status: StatusCode::BAD_REQUEST, }); } - if CONFIG.options.hot_tier_storage_path.is_none() { - return Err(StreamError::HotTierNotEnabled(stream_name)); - } validator::hot_tier(&hottier.size.to_string())?; STREAM_INFO.set_hot_tier(&stream_name, true)?; - if let Some(hot_tier_manager) = HotTierManager::global() { - let existing_hot_tier_used_size = hot_tier_manager - .validate_hot_tier_size(&stream_name, hottier.size) - .await?; - hottier.used_size = existing_hot_tier_used_size; - hottier.available_size = hottier.size; - hottier.version = 
Some(CURRENT_HOT_TIER_VERSION.to_string()); - hot_tier_manager - .put_hot_tier(&stream_name, &mut hottier) - .await?; - let storage = CONFIG.storage().get_object_store(); - let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; - stream_metadata.hot_tier_enabled = true; - storage - .put_stream_manifest(&stream_name, &stream_metadata) - .await?; - } + let Some(hot_tier_manager) = HotTierManager::global() else { + return Err(StreamError::HotTierNotEnabled(stream_name)); + }; + let existing_hot_tier_used_size = hot_tier_manager + .validate_hot_tier_size(&stream_name, hottier.size) + .await?; + hottier.used_size = existing_hot_tier_used_size; + hottier.available_size = hottier.size; + hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string()); + hot_tier_manager + .put_hot_tier(&stream_name, &mut hottier) + .await?; + let storage = CONFIG.storage().get_object_store(); + let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; + stream_metadata.hot_tier_enabled = true; + storage + .put_stream_manifest(&stream_name, &stream_metadata) + .await?; Ok(( format!("hot tier set for stream {stream_name}"), @@ -660,34 +658,27 @@ pub async fn get_stream_hot_tier(stream_name: Path) -> Result Result<(), UsernameValidationError> { } pub fn hot_tier(size: &str) -> Result<(), HotTierValidationError> { - if let Ok(size) = size.parse::() { - if size < MIN_STREAM_HOT_TIER_SIZE_BYTES { - return Err(HotTierValidationError::Size(bytes_to_human_size( - MIN_STREAM_HOT_TIER_SIZE_BYTES, - ))); - } - Ok(()) - } else { - Err(HotTierValidationError::InvalidFormat) + let Ok(size) = size.parse::() else { + return Err(HotTierValidationError::InvalidFormat); + }; + if size < MIN_STREAM_HOT_TIER_SIZE_BYTES { + return Err(HotTierValidationError::Size(bytes_to_human_size( + MIN_STREAM_HOT_TIER_SIZE_BYTES, + ))); } + + Ok(()) } pub mod error { From cb4d437debb1a0f90518eff540dc600a21251b79 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 
20:55:41 +0530 Subject: [PATCH 18/24] fix: request ain't bad, just not allowed --- src/handlers/http/logstream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index c8c1f4ad0..5fb0898f0 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -857,7 +857,7 @@ pub mod error { StreamError::Network(err) => { err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) } - StreamError::HotTierNotEnabled(_) => StatusCode::BAD_REQUEST, + StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN, StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST, StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR, } From 90b020dcb4f5d0c13949415029e1c9879aa3e7a5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 21:07:31 +0530 Subject: [PATCH 19/24] save on cost of path --- src/hottier.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index 1cb55223e..ff8e3fcc1 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -68,26 +68,27 @@ pub struct StreamHotTier { pub struct HotTierManager { filesystem: LocalFileSystem, - hot_tier_path: PathBuf, + hot_tier_path: &'static Path, } impl HotTierManager { + pub fn new(hot_tier_path: &'static Path) -> Self { + std::fs::create_dir_all(hot_tier_path).unwrap(); + HotTierManager { + filesystem: LocalFileSystem::new(), + hot_tier_path, + } + } + + /// Get the global `HotTierManager`, if a hot tier storage path is configured pub fn global() -> Option<&'static HotTierManager> { static INSTANCE: OnceCell = OnceCell::new(); CONFIG .options .hot_tier_storage_path - .clone() - .map(|hot_tier_path| { - INSTANCE.get_or_init(|| { - std::fs::create_dir_all(&hot_tier_path).unwrap(); - HotTierManager { - filesystem: LocalFileSystem::new(), - hot_tier_path, - } - }) - }) + .as_ref() + .map(|hot_tier_path| INSTANCE.get_or_init(|| HotTierManager::new(hot_tier_path))) } ///get the total hot tier size for all
streams From 0c1d992695de41fa9c1dc713104a6f6eb110aced Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 22:01:58 +0530 Subject: [PATCH 20/24] fix: read as "X Bytes" from hottier manifest file --- src/utils/human_size.rs | 50 +++++++++++++++++++++++++++++++++-------- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/src/utils/human_size.rs b/src/utils/human_size.rs index ff7ab18f9..92103f0d0 100644 --- a/src/utils/human_size.rs +++ b/src/utils/human_size.rs @@ -1,12 +1,16 @@ use std::str::FromStr; -use human_size::{multiples, SpecificSize}; +use human_size::{Any, SpecificSize}; use serde::{de, Deserialize, Deserializer, Serializer}; // Function to convert human-readable size to bytes (already provided) // NOTE: consider number values as byte count, e.g. "1234" is 1234 bytes. pub fn human_size_to_bytes(s: &str) -> Result { - if let Ok(size) = s.parse() { + let s = s.trim(); + if let Some(s) = s.strip_suffix("Bytes") { + let size = s.trim().parse().expect("Suffix bytes implies byte count"); + return Ok(size); + } else if let Ok(size) = s.parse() { return Ok(size); } @@ -14,13 +18,8 @@ pub fn human_size_to_bytes(s: &str) -> Result { SpecificSize::::from_str(s).map(|x| x.to_bytes()) } - let size = parse_and_map::(s) - .or(parse_and_map::(s)) - .or(parse_and_map::(s)) - .or(parse_and_map::(s)) - .or(parse_and_map::(s)) - .or(parse_and_map::(s)) - .map_err(|_| "Could not parse given size".to_string())?; + let size = parse_and_map::(s).map_err(|_| "Could not parse given size".to_string())?; + Ok(size) } @@ -64,3 +63,36 @@ where let s = String::deserialize(deserializer)?; human_size_to_bytes(&s).map_err(de::Error::custom) } + +#[cfg(test)] +mod tests { + use crate::utils::human_size::human_size_to_bytes; + + #[test] + fn parse_numeric_input_without_unit() { + assert_eq!(human_size_to_bytes("1234"), Ok(1234)); + } + + #[test] + fn parse_bytes_string_to_bytes() { + assert_eq!(human_size_to_bytes("1234 Bytes"), Ok(1234)); + } + + #[test] + fn 
handle_empty_string_input() { + assert_eq!( + human_size_to_bytes(""), + Err("Could not parse given size".to_string()) + ); + } + + #[test] + fn convert_mebibyte_string_to_bytes() { + assert_eq!(human_size_to_bytes("1 MiB"), Ok(1048576)); + } + + #[test] + fn parse_gigabyte_string_input() { + assert_eq!(human_size_to_bytes("1 GB"), Ok(1_000_000_000)); + } +} From 4c353fd403d34f0447fd2170bb7794951b1ebd22 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 22:21:20 +0530 Subject: [PATCH 21/24] refactor: custom error type --- src/utils/human_size.rs | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/src/utils/human_size.rs b/src/utils/human_size.rs index 92103f0d0..3cf5f40c8 100644 --- a/src/utils/human_size.rs +++ b/src/utils/human_size.rs @@ -3,12 +3,20 @@ use std::str::FromStr; use human_size::{Any, SpecificSize}; use serde::{de, Deserialize, Deserializer, Serializer}; +#[derive(Debug, thiserror::Error)] +enum ParsingError { + #[error("Expected 'X' | 'X Bytes', but error: {0}")] + Int(#[from] std::num::ParseIntError), + #[error("Could not parse given string as human size, erro: {0}")] + HumanSize(#[from] human_size::ParsingError), +} + // Function to convert human-readable size to bytes (already provided) // NOTE: consider number values as byte count, e.g. "1234" is 1234 bytes. 
-pub fn human_size_to_bytes(s: &str) -> Result { +fn human_size_to_bytes(s: &str) -> Result { let s = s.trim(); if let Some(s) = s.strip_suffix("Bytes") { - let size = s.trim().parse().expect("Suffix bytes implies byte count"); + let size: u64 = s.trim().parse()?; return Ok(size); } else if let Ok(size) = s.parse() { return Ok(size); @@ -17,8 +25,7 @@ pub fn human_size_to_bytes(s: &str) -> Result { fn parse_and_map(s: &str) -> Result { SpecificSize::::from_str(s).map(|x| x.to_bytes()) } - - let size = parse_and_map::(s).map_err(|_| "Could not parse given size".to_string())?; + let size = parse_and_map::(s)?; Ok(size) } @@ -66,33 +73,41 @@ where #[cfg(test)] mod tests { - use crate::utils::human_size::human_size_to_bytes; + use super::*; #[test] fn parse_numeric_input_without_unit() { - assert_eq!(human_size_to_bytes("1234"), Ok(1234)); + assert_eq!(human_size_to_bytes("1234").unwrap(), 1234); } #[test] fn parse_bytes_string_to_bytes() { - assert_eq!(human_size_to_bytes("1234 Bytes"), Ok(1234)); + assert_eq!(human_size_to_bytes("1234 Bytes").unwrap(), 1234); } #[test] fn handle_empty_string_input() { - assert_eq!( + assert!(matches!( human_size_to_bytes(""), - Err("Could not parse given size".to_string()) - ); + Err(ParsingError::HumanSize(_)) + )); + } + + #[test] + fn handle_byte_string_input_without_value() { + assert!(matches!( + human_size_to_bytes("Bytes"), + Err(ParsingError::Int(_)) + )); } #[test] fn convert_mebibyte_string_to_bytes() { - assert_eq!(human_size_to_bytes("1 MiB"), Ok(1048576)); + assert_eq!(human_size_to_bytes("1 MiB").unwrap(), 1048576); } #[test] fn parse_gigabyte_string_input() { - assert_eq!(human_size_to_bytes("1 GB"), Ok(1_000_000_000)); + assert_eq!(human_size_to_bytes("1 GB").unwrap(), 1_000_000_000); } } From a6891a708442552b2768d300bc503f6fd600cfff Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 22:24:42 +0530 Subject: [PATCH 22/24] ci: clippy suggestion --- src/hottier.rs | 4 ++-- 1 file changed, 2 
insertions(+), 2 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index ff8e3fcc1..7eb4bfb5d 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -171,7 +171,7 @@ impl HotTierManager { HotTierValidationError::NotFound(stream.to_owned()), )); } - let path = hot_tier_file_path(&self.hot_tier_path, stream)?; + let path = hot_tier_file_path(self.hot_tier_path, stream)?; let bytes = self .filesystem .get(&path) @@ -204,7 +204,7 @@ impl HotTierManager { stream: &str, hot_tier: &mut StreamHotTier, ) -> Result<(), HotTierError> { - let path = hot_tier_file_path(&self.hot_tier_path, stream)?; + let path = hot_tier_file_path(self.hot_tier_path, stream)?; let bytes = serde_json::to_vec(&hot_tier)?.into(); self.filesystem.put(&path, bytes).await?; Ok(()) From 96969824ac96fe18da2846ecc1073550885c1dc5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 Jan 2025 22:44:51 +0530 Subject: [PATCH 23/24] refactor: rm loc --- src/hottier.rs | 69 +++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index 7eb4bfb5d..329048749 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -167,11 +167,9 @@ impl HotTierManager { ///get the hot tier metadata file for the stream pub async fn get_hot_tier(&self, stream: &str) -> Result { if !self.check_stream_hot_tier_exists(stream) { - return Err(HotTierError::HotTierValidationError( - HotTierValidationError::NotFound(stream.to_owned()), - )); + return Err(HotTierValidationError::NotFound(stream.to_owned()).into()); } - let path = hot_tier_file_path(self.hot_tier_path, stream)?; + let path = self.hot_tier_file_path(stream)?; let bytes = self .filesystem .get(&path) @@ -179,22 +177,19 @@ impl HotTierManager { .await?; let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?; - let oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?; - stream_hot_tier.oldest_date_time_entry = oldest_date_time_entry; + 
stream_hot_tier.oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?; Ok(stream_hot_tier) } pub async fn delete_hot_tier(&self, stream: &str) -> Result<(), HotTierError> { - if self.check_stream_hot_tier_exists(stream) { - let path = self.hot_tier_path.join(stream); - fs::remove_dir_all(path).await?; - Ok(()) - } else { - Err(HotTierError::HotTierValidationError( - HotTierValidationError::NotFound(stream.to_owned()), - )) + if !self.check_stream_hot_tier_exists(stream) { + return Err(HotTierValidationError::NotFound(stream.to_owned()).into()); } + let path = self.hot_tier_path.join(stream); + fs::remove_dir_all(path).await?; + + Ok(()) } ///put the hot tier metadata file for the stream @@ -204,12 +199,26 @@ impl HotTierManager { stream: &str, hot_tier: &mut StreamHotTier, ) -> Result<(), HotTierError> { - let path = hot_tier_file_path(self.hot_tier_path, stream)?; + let path = self.hot_tier_file_path(stream)?; let bytes = serde_json::to_vec(&hot_tier)?.into(); self.filesystem.put(&path, bytes).await?; Ok(()) } + /// get the hot tier file path for the stream + pub fn hot_tier_file_path( + &self, + stream: &str, + ) -> Result { + let path = self + .hot_tier_path + .join(stream) + .join(STREAM_HOT_TIER_FILENAME); + let path = object_store::path::Path::from_absolute_path(path)?; + + Ok(path) + } + ///schedule the download of the hot tier files from S3 every minute pub fn download_from_s3<'a>(&'a self) -> Result<(), HotTierError> where @@ -584,7 +593,10 @@ impl HotTierManager { fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; fs::remove_dir_all(path_to_delete.parent().unwrap()).await?; - delete_empty_directory_hot_tier(path_to_delete.parent().unwrap()).await?; + delete_empty_directory_hot_tier( + path_to_delete.parent().unwrap().to_path_buf(), + ) + .await?; stream_hot_tier.used_size -= file_size; stream_hot_tier.available_size += file_size; @@ -721,48 +733,35 @@ impl HotTierManager { } } -/// get the hot tier file path for the 
stream -pub fn hot_tier_file_path( - root: impl AsRef, - stream: &str, -) -> Result { - let path = root.as_ref().join(stream).join(STREAM_HOT_TIER_FILENAME); - object_store::path::Path::from_absolute_path(path) -} - struct DiskUtil { total_space: u64, available_space: u64, used_space: u64, } -async fn delete_empty_directory_hot_tier(path: &Path) -> io::Result<()> { +async fn delete_empty_directory_hot_tier(path: PathBuf) -> io::Result<()> { if !path.is_dir() { return Ok(()); } - let mut read_dir = fs::read_dir(path).await?; - let mut subdirs = vec![]; + let mut read_dir = fs::read_dir(&path).await?; + let mut tasks = vec![]; while let Some(entry) = read_dir.next_entry().await? { let entry_path = entry.path(); if entry_path.is_dir() { - subdirs.push(entry_path); + tasks.push(delete_empty_directory_hot_tier(entry_path)); } } - let mut tasks = vec![]; - for subdir in &subdirs { - tasks.push(delete_empty_directory_hot_tier(subdir)); - } futures::stream::iter(tasks) .buffer_unordered(10) .try_collect::>() .await?; // Re-check the directory after deleting its subdirectories - let mut read_dir = fs::read_dir(path).await?; + let mut read_dir = fs::read_dir(&path).await?; if read_dir.next_entry().await?.is_none() { - fs::remove_dir(path).await?; + fs::remove_dir(&path).await?; } Ok(()) From eaf33795e71f0a79b6a6053efa9e5869bfe10bd5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 28 Jan 2025 11:15:23 +0530 Subject: [PATCH 24/24] refactor: let it be done --- src/handlers/http/logstream.rs | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index b06223c36..e1b1b987a 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -50,7 +50,7 @@ use bytes::Bytes; use chrono::Utc; use http::{HeaderName, HeaderValue}; use itertools::Itertools; -use serde_json::{json, Value}; +use serde_json::Value; use std::collections::HashMap; use std::fs; use 
std::num::NonZeroU32; @@ -658,24 +658,9 @@ pub async fn get_stream_hot_tier(stream_name: Path) -> Result