Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: parts of hottier #1022

Merged
merged 38 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
949f4d6
feat: split methods and use unordered futures
de-sh Dec 5, 2024
deaa667
Merge branch 'main' into hottier-perf
de-sh Dec 18, 2024
77875b8
Merge remote-tracking branch 'origin/main' into hottier-perf
de-sh Jan 14, 2025
3c388c5
refactor: map
de-sh Jan 14, 2025
26f818d
refactor: get disk usage as method
de-sh Jan 14, 2025
592da07
method won't be called if hottier doesn't exist
de-sh Jan 14, 2025
4af43a5
DRY: `get_stream_path_for_date`
de-sh Jan 14, 2025
94c2189
stream through results
de-sh Jan 14, 2025
7305873
refactor: `human_size` parsing
de-sh Jan 14, 2025
14535f1
ci: clippy suggestions
de-sh Jan 14, 2025
e62139c
rm unused code
de-sh Jan 15, 2025
13e0f71
Merge branch 'main' into hottier-perf
de-sh Jan 15, 2025
e8bf543
refactor: `fetch_hot_tier_dates`
de-sh Jan 15, 2025
15c2f81
refactor: `delete_empty_directory_hot_tier`
de-sh Jan 15, 2025
c38e996
Merge branch 'main' into hottier-perf
de-sh Jan 16, 2025
6bd5c0a
Merge remote-tracking branch 'dev/hottier-perf' into hottier-perf
de-sh Jan 16, 2025
20cd15f
fix: frontend expects "x Bytes"
de-sh Jan 16, 2025
a7dfdce
Merge branch 'main' into hottier-perf
de-sh Jan 16, 2025
6c7356e
Merge branch 'main' into hottier-perf
de-sh Jan 17, 2025
a7de8c4
Merge branch 'main' into hottier-perf
de-sh Jan 23, 2025
2cdf172
Merge remote-tracking branch 'origin/main' into hottier-perf
de-sh Jan 24, 2025
eeee3ac
Merge remote-tracking branch 'origin/main' into hottier-perf
de-sh Jan 24, 2025
13e8e7d
Merge branch 'main' into hottier-perf
nikhilsinhaparseable Jan 25, 2025
e7a37b7
Merge branch 'main' into hottier-perf
nikhilsinhaparseable Jan 26, 2025
9e1d3c2
refactor: early fail on wrong json
de-sh Jan 27, 2025
33822e7
fix: allow just byte count
de-sh Jan 27, 2025
b9f0076
doc: note on behavior
de-sh Jan 27, 2025
afaa64a
refactor: `hot_tier_enabled: bool`
de-sh Jan 27, 2025
46d78df
refactor: `let else` pattern
de-sh Jan 27, 2025
cb4d437
fix: request ain't bad, just not allowed
de-sh Jan 27, 2025
90b020d
save on cost of path
de-sh Jan 27, 2025
0c1d992
fix: read as "X Bytes" from hottier manifest file
de-sh Jan 27, 2025
4c353fd
refactor: custom error type
de-sh Jan 27, 2025
a6891a7
ci: clippy suggestion
de-sh Jan 27, 2025
9696982
refactor: rm loc
de-sh Jan 27, 2025
cf26f43
Merge branch 'main' into hottier-perf
nikhilsinhaparseable Jan 27, 2025
eaf3379
refactor: let it be done
de-sh Jan 28, 2025
1bc67d5
Merge branch 'main' into hottier-perf
de-sh Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 45 additions & 51 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use bytes::Bytes;
use chrono::Utc;
use http::{HeaderName, HeaderValue};
use itertools::Itertools;
use serde_json::Value;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::fs;
use std::num::NonZeroU32;
Expand Down Expand Up @@ -586,7 +586,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder

pub async fn put_stream_hot_tier(
stream_name: Path<String>,
Json(json): Json<Value>,
Json(mut hottier): Json<StreamHotTier>,
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
if !STREAM_INFO.stream_exists(&stream_name) {
Expand All @@ -609,35 +609,28 @@ pub async fn put_stream_hot_tier(
status: StatusCode::BAD_REQUEST,
});
}
if CONFIG.options.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

let mut hottier: StreamHotTier = match serde_json::from_value(json) {
Ok(hottier) => hottier,
Err(err) => return Err(StreamError::InvalidHotTierConfig(err)),
};

validator::hot_tier(&hottier.size.to_string())?;

STREAM_INFO.set_hot_tier(&stream_name, true)?;
if let Some(hot_tier_manager) = HotTierManager::global() {
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, &hottier.size)
.await?;
hottier.used_size = existing_hot_tier_used_size.to_string();
hottier.available_size = hottier.size.to_string();
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
.await?;
let storage = CONFIG.storage().get_object_store();
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
stream_metadata.hot_tier_enabled = Some(true);
storage
.put_stream_manifest(&stream_name, &stream_metadata)
.await?;
}
let Some(hot_tier_manager) = HotTierManager::global() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
};
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, hottier.size)
.await?;
hottier.used_size = existing_hot_tier_used_size;
hottier.available_size = hottier.size;
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
.await?;
let storage = CONFIG.storage().get_object_store();
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
stream_metadata.hot_tier_enabled = true;
storage
.put_stream_manifest(&stream_name, &stream_metadata)
.await?;

Ok((
format!("hot tier set for stream {stream_name}"),
Expand All @@ -662,22 +655,27 @@ pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Respo
}
}

if CONFIG.options.hot_tier_storage_path.is_none() {
let Some(hot_tier_manager) = HotTierManager::global() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

if let Some(hot_tier_manager) = HotTierManager::global() {
let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
hot_tier.size = format!("{} {}", hot_tier.size, "Bytes");
hot_tier.used_size = format!("{} Bytes", hot_tier.used_size);
hot_tier.available_size = format!("{} Bytes", hot_tier.available_size);
Ok((web::Json(hot_tier), StatusCode::OK))
} else {
Err(StreamError::Custom {
msg: format!("hot tier not initialised for stream {}", stream_name),
status: (StatusCode::BAD_REQUEST),
})
}
};
let StreamHotTier {
version,
size,
used_size,
available_size,
oldest_date_time_entry,
} = hot_tier_manager.get_hot_tier(&stream_name).await?;
let mut json = json!({
"version": version,
"size": format!("{size} Bytes"),
"used_size": format!("{used_size} Bytes"),
"available_size": format!("{available_size} Bytes"),
});
if let Some(entry) = oldest_date_time_entry {
json["oldest_date_time_entry"] = serde_json::Value::String(entry);
}

Ok((web::Json(json), StatusCode::OK))
}

pub async fn delete_stream_hot_tier(
Expand All @@ -699,9 +697,9 @@ pub async fn delete_stream_hot_tier(
}
}

if CONFIG.options.hot_tier_storage_path.is_none() {
let Some(hot_tier_manager) = HotTierManager::global() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
}
};

if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
return Err(StreamError::Custom {
Expand All @@ -710,9 +708,8 @@ pub async fn delete_stream_hot_tier(
});
}

if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.delete_hot_tier(&stream_name).await?;
}
hot_tier_manager.delete_hot_tier(&stream_name).await?;

Ok((
format!("hot tier deleted for stream {stream_name}"),
StatusCode::OK,
Expand Down Expand Up @@ -821,8 +818,6 @@ pub mod error {
"Hot tier is not enabled at the server config, cannot enable hot tier for stream {0}"
)]
HotTierNotEnabled(String),
#[error("failed to enable hottier due to err: {0}")]
InvalidHotTierConfig(serde_json::Error),
#[error("Hot tier validation failed: {0}")]
HotTierValidation(#[from] HotTierValidationError),
#[error("{0}")]
Expand Down Expand Up @@ -859,8 +854,7 @@ pub mod error {
StreamError::Network(err) => {
err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
}
StreamError::HotTierNotEnabled(_) => StatusCode::BAD_REQUEST,
StreamError::InvalidHotTierConfig(_) => StatusCode::BAD_REQUEST,
StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN,
StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
Expand Down
Loading
Loading