From d244760f5039a15450f5d4566ffe52d19d427771 Mon Sep 17 00:00:00 2001 From: Lloyd Date: Thu, 31 Oct 2024 20:42:46 +0900 Subject: [PATCH] testcontainer refactoring (#4) Based on observation, localstack containers weren't being shutdown properly. It turns out static bindings don't get shut down, so we need https://github.com/testcontainers/testcontainers-rs/issues/707#issuecomment-2309718920 unti https://github.com/testcontainers/testcontainers-rs/issues/577. Signed-off-by: lloydmeta --- server/Cargo.toml | 3 +- server/src/infra/image_caching.rs | 30 +-- server/src/lib.rs | 417 +++++++++++++++++++++++++++++ server/src/main.rs | 429 ------------------------------ server/src/test_utils/mod.rs | 113 ++++++++ 5 files changed, 540 insertions(+), 452 deletions(-) create mode 100644 server/src/test_utils/mod.rs diff --git a/server/Cargo.toml b/server/Cargo.toml index 929bc47..73b2b98 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -24,7 +24,7 @@ reqwest = { version = "0.12", default-features = false, features = [ axum = "0.7" serde = "1.0" serde_json = "1.0" -tokio = { version = "1", features = ["macros"] } +tokio = { version = "1", features = ["macros", "sync"] } image = { version = "0.25", features = ["rayon"] } miniaturs_shared = { path = "../shared" } @@ -36,5 +36,6 @@ bytes = "1.7" tower-http = { version = "0.6.1", features = ["catch-panic"] } [dev-dependencies] +ctor = "0.2.8" testcontainers = { version = "0.23" } testcontainers-modules = { version = "0.11", features = ["localstack"] } diff --git a/server/src/infra/image_caching.rs b/server/src/infra/image_caching.rs index fff1bb5..d8caaba 100644 --- a/server/src/infra/image_caching.rs +++ b/server/src/infra/image_caching.rs @@ -228,24 +228,18 @@ mod tests { use aws_config::{meta::region::RegionProviderChain, BehaviorVersion}; use aws_sdk_s3::{self as s3, primitives::ByteStream}; - use testcontainers::{runners::AsyncRunner, ContainerAsync, ImageExt}; - use testcontainers_modules::localstack::LocalStack; use tokio::sync::OnceCell; use super::*; + use crate::test_utils::{localstack_node, TestResult}; - // Holds the shared Localstack container; if it's not held here, the container ref gets dropped and the _actual_ container - // gets stopped - static LOCALSTACK_NODE: OnceCell> = OnceCell::const_new(); // Holds the shared S3 client that refers to the above; use `s3_client().await` to get it static S3_CLIENT: OnceCell = OnceCell::const_new(); // Bucket, static because we assume the app is passed a created one. static S3_BUCKET: OnceCell = OnceCell::const_new(); - type TestResult = Result<(), Box>; - #[tokio::test] - async fn test_s3() -> TestResult { + async fn test_s3() -> TestResult<()> { let client = s3_client().await; let bucket = "mybucket"; @@ -282,7 +276,7 @@ mod tests { } #[test] - fn test_cache_key() -> TestResult { + fn test_cache_key() -> TestResult<()> { let req = ImageResizeRequest { requested_image_url: "https://beachape.com/images/something.png".to_string(), operations: Operations::build(&Some(ImageResize { @@ -296,7 +290,7 @@ mod tests { } #[test] - fn test_metadata() -> TestResult { + fn test_metadata() -> TestResult<()> { let req = ImageResizedCacheRequest { request: ImageResizeRequest { requested_image_url: "https://beachape.com/images/something.png".to_string(), @@ -318,7 +312,7 @@ mod tests { } #[tokio::test] - async fn test_s3_image_cacher_get_does_not_exist() -> TestResult { + async fn test_s3_image_cacher_get_does_not_exist() -> TestResult<()> { let client = s3_client().await.clone(); let bucket_name = s3_bucket().await.clone(); let s3_image_cacher = S3ImageCacher { @@ -339,7 +333,7 @@ mod tests { } #[tokio::test] - async fn test_s3_image_cacher_set_get() -> TestResult { + async fn test_s3_image_cacher_set_get() -> TestResult<()> { let client = s3_client().await.clone(); let bucket_name = s3_bucket().await.clone(); let s3_image_cacher = S3ImageCacher { @@ -370,7 +364,7 @@ mod tests { } #[tokio::test] - async fn test_s3_image_cacher_setting_metadata_in_s3() -> TestResult { + async fn test_s3_image_cacher_setting_metadata_in_s3() -> TestResult<()> { let client = s3_client().await.clone(); let bucket_name = s3_bucket().await.clone(); let s3_image_cacher = S3ImageCacher { @@ -430,15 +424,7 @@ mod tests { async fn s3_client() -> &'static s3::Client { S3_CLIENT .get_or_init(|| async { - let node = LOCALSTACK_NODE - .get_or_init(|| async { - LocalStack::default() - .with_env_var("SERVICES", "s3") - .start() - .await - .expect("Localstack to start properly") - }) - .await; + let node = localstack_node().await; let host_port = node .get_host_port_ipv4(4566) .await diff --git a/server/src/lib.rs b/server/src/lib.rs index c5f6618..517895b 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,2 +1,419 @@ pub mod api; pub mod infra; + +#[cfg(test)] +pub mod test_utils; + +#[cfg(test)] +mod integration_tests { + use std::io::Cursor; + + use crate::api::routing::handlers::create_router; + use crate::infra::components::AppComponents; + use crate::infra::config::Config; + use crate::test_utils::{localstack_node, TestResult}; + + use super::api::responses::MetadataResponse; + use super::infra::config::{AuthenticationSettings, AwsSettings, ImageCacheSettings}; + use super::infra::image_caching::*; + use super::infra::image_manipulation::Operations; + use aws_config::{meta::region::RegionProviderChain, BehaviorVersion}; + use axum::{body::Body, http::Request, Router}; + use http_body_util::BodyExt; + use image::{ImageFormat, ImageReader}; + use lambda_http::tower::ServiceExt; + use miniaturs_shared::signature::make_url_safe_base64_hash; + use reqwest::{header::CONTENT_TYPE, StatusCode}; + use tokio::sync::OnceCell; + + static UNPROCESSED_BUCKET: OnceCell = OnceCell::const_new(); + static PROCESSED_BUCKET: OnceCell = OnceCell::const_new(); + static CONFIG: OnceCell = OnceCell::const_new(); + // Used for setting up buckets only... + static BOOTSTRAP_S3_CLIENT: OnceCell = OnceCell::const_new(); + static AWS_CONFIG: OnceCell = OnceCell::const_new(); + + #[tokio::test] + async fn test_root_response() -> TestResult<()> { + let app = app().await?; + let response = app + .oneshot(Request::builder().uri("/").body(Body::empty())?) + .await?; + assert_eq!(StatusCode::OK, response.status()); + Ok(()) + } + + const PNG_URL_1: &'static str = "https://beachape.com/images/octopress_with_container.png"; + const JPG_URL_1: &'static str = "https://beachape.com/images/super-high-performance.jpg"; + + #[tokio::test] + async fn test_resize_png() -> TestResult<()> { + test_resize( + PNG_URL_1, + ImageFormat::Png, + ImageResize { + target_width: 100, + target_height: 80, + }, + ImageResize { + target_width: 300, + target_height: 100, + }, + ) + .await + } + + #[tokio::test] + async fn test_resize_jpg() -> TestResult<()> { + test_resize( + JPG_URL_1, + ImageFormat::Jpeg, + ImageResize { + target_width: 50, + target_height: 70, + }, + ImageResize { + target_width: 500, + target_height: 600, + }, + ) + .await + } + + #[tokio::test] + async fn test_metadata_response() -> TestResult<()> { + test_metadata( + &PNG_URL_1, + ImageResize { + target_width: 100, + target_height: 80, + }, + ) + .await?; + test_metadata( + &PNG_URL_1, + ImageResize { + target_width: -100, + target_height: 80, + }, + ) + .await?; + test_metadata( + &JPG_URL_1, + ImageResize { + target_width: 100, + target_height: -80, + }, + ) + .await?; + test_metadata( + &PNG_URL_1, + ImageResize { + target_width: -100, + target_height: -80, + }, + ) + .await + } + + async fn test_resize( + image_url: &str, + expected_image_format: ImageFormat, + resize_target_1: ImageResize, + resize_target_2: ImageResize, + ) -> TestResult<()> { + let target_width_1 = resize_target_1.target_width; + + let signed_path_1 = signed_resize_path( + &config().await.authentication_settings, + resize_target_1, + image_url, + )?; + + // ensure nothing cached right now + assert!(retrieve_unprocessed_cached(image_url).await.is_none()); + assert!(retrieve_processed_cached(image_url, resize_target_1) + .await + .is_none()); + assert!(retrieve_processed_cached(image_url, resize_target_2) + .await + .is_none()); + + let response_1 = app() + .await? + .oneshot(Request::builder().uri(signed_path_1).body(Body::empty())?) + .await?; + assert_eq!(StatusCode::OK, response_1.status()); + + // ensure what should be cached is cached + assert!(retrieve_unprocessed_cached(image_url).await.is_some()); + assert!(retrieve_processed_cached(image_url, resize_target_1) + .await + .is_some()); + assert!(retrieve_processed_cached(image_url, resize_target_2) + .await + .is_none()); + + let response_content_type_1 = response_1 + .headers() + .get(CONTENT_TYPE) + .ok_or("No content type in response from miniaturs")? + .to_str()?; + assert_eq!( + expected_image_format.to_mime_type(), + response_content_type_1 + ); + + let response_bytes_1 = response_1.into_body().collect().await?.to_bytes(); + + let image_reader_1 = + ImageReader::new(Cursor::new(response_bytes_1)).with_guessed_format()?; + + assert_eq!( + expected_image_format, + image_reader_1.format().ok_or("Could not guess format")? + ); + let dynamic_image_1 = image_reader_1.decode()?; + + assert_eq!(target_width_1 as u32, dynamic_image_1.width()); + + // resize again + let target_width_2 = resize_target_2.target_width; + let signed_path_2 = signed_resize_path( + &config().await.authentication_settings, + resize_target_2, + image_url, + )?; + + let response_2 = app() + .await? + .oneshot(Request::builder().uri(signed_path_2).body(Body::empty())?) + .await?; + assert_eq!(StatusCode::OK, response_2.status()); + // ensure what should be cached is cached + assert!(retrieve_unprocessed_cached(image_url).await.is_some()); + assert!(retrieve_processed_cached(image_url, resize_target_1) + .await + .is_some()); + assert!(retrieve_processed_cached(image_url, resize_target_2) + .await + .is_some()); + + let response_content_type_2 = response_2 + .headers() + .get(CONTENT_TYPE) + .ok_or("No content type in response from miniaturs")? + .to_str()?; + assert_eq!( + expected_image_format.to_mime_type(), + response_content_type_2 + ); + + let response_bytes_2 = response_2.into_body().collect().await?.to_bytes(); + + let image_reader_2 = + ImageReader::new(Cursor::new(response_bytes_2)).with_guessed_format()?; + + assert_eq!( + expected_image_format, + image_reader_2.format().ok_or("Could not guess format")? + ); + let dynamic_image_2 = image_reader_2.decode()?; + + assert_eq!(target_width_2 as u32, dynamic_image_2.width()); + + Ok(()) + } + + async fn test_metadata(image_url: &str, resize: ImageResize) -> TestResult<()> { + let signed_path = + signed_metadata_path(&config().await.authentication_settings, resize, image_url)?; + let response = app() + .await? + .oneshot(Request::builder().uri(signed_path).body(Body::empty())?) + .await?; + assert_eq!(StatusCode::OK, response.status()); + + let mut body_as_metadata: MetadataResponse = + serde_json::from_slice(response.into_body().collect().await?.to_bytes().as_ref())?; + + assert_eq!(image_url, body_as_metadata.source.url); + + // So we can pop easily + body_as_metadata.operations.reverse(); + + let mut op = body_as_metadata.operations.pop().unwrap(); + assert_eq!("resize", op.r#type); + assert_eq!(resize.target_width.unsigned_abs(), op.width.unwrap()); + assert_eq!(resize.target_height.unsigned_abs(), op.height.unwrap()); + + if resize.target_width.is_negative() { + op = body_as_metadata.operations.pop().unwrap(); + assert_eq!("flip_horizontally", op.r#type); + assert_eq!(None, op.width); + assert_eq!(None, op.height); + } + + if resize.target_height.is_negative() { + op = body_as_metadata.operations.pop().unwrap(); + assert_eq!("flip_vertically", op.r#type); + assert_eq!(None, op.width); + assert_eq!(None, op.height); + } + + Ok(()) + } + + async fn retrieve_unprocessed_cached( + image_url: &str, + ) -> Option> { + let config = config().await; + let app_components = AppComponents::create(config.clone()).ok()?; + let unprocessed_cache_retrieve_req = ImageFetchRequest { + requested_image_url: image_url.to_string(), + }; + app_components + .unprocessed_images_cacher + .get(&unprocessed_cache_retrieve_req) + .await + .unwrap() + } + + async fn retrieve_processed_cached( + image_url: &str, + resize_target: ImageResize, + ) -> Option> { + let config = config().await; + let app_components = AppComponents::create(config.clone()).ok()?; + let processed_cache_retrieve_req = ImageResizeRequest { + requested_image_url: image_url.to_string(), + operations: Operations::build(&Some(resize_target)), + }; + app_components + .processed_images_cacher + .get(&processed_cache_retrieve_req) + .await + .unwrap() + } + + fn signed_resize_path( + auth_settings: &AuthenticationSettings, + resize_target: ImageResize, + + url: &str, + ) -> TestResult { + let target_width = resize_target.target_width; + let target_height = resize_target.target_height; + let path = format!("{target_width}x{target_height}/{url}"); + let hash = make_url_safe_base64_hash(&auth_settings.shared_secret, &path)?; + Ok(format!("/{hash}/{path}")) + } + + fn signed_metadata_path( + auth_settings: &AuthenticationSettings, + resize_target: ImageResize, + + url: &str, + ) -> TestResult { + let target_width = resize_target.target_width; + let target_height = resize_target.target_height; + let path = format!("meta/{target_width}x{target_height}/{url}"); + let hash = make_url_safe_base64_hash(&auth_settings.shared_secret, &path)?; + Ok(format!("/{hash}/{path}")) + } + + async fn app() -> Result> { + let config = config().await; + let app_components = AppComponents::create(config.clone())?; + Ok(create_router(app_components)) + } + + async fn config() -> &'static Config { + CONFIG + .get_or_init(|| async { + let authentication_settings = AuthenticationSettings { + shared_secret: "omgwtfbbq".to_string(), + }; + + let image_cache_settings = ImageCacheSettings { + processed_images_bucket_name: processed_bucket().await.to_string(), + unprocessed_images_bucket_name: unprocessed_bucket().await.to_string(), + }; + + let aws_settings = AwsSettings { + aws_config: aws_config().await.clone(), + path_style_s3: true, + }; + + Config { + authentication_settings, + image_cache_settings, + aws_settings, + } + }) + .await + } + + static UNPROCCESSED_BUCKET_NAME: &'static str = "unprocessed-bucket"; + async fn unprocessed_bucket() -> &'static String { + UNPROCESSED_BUCKET + .get_or_init(|| async { + bootstrap_s3_client() + .await + .create_bucket() + .bucket(UNPROCCESSED_BUCKET_NAME.to_string()) + .send() + .await + .expect("Bucket creation should work"); + UNPROCCESSED_BUCKET_NAME.to_string() + }) + .await + } + static PROCCESSED_BUCKET_NAME: &'static str = "processed-bucket"; + async fn processed_bucket() -> &'static String { + PROCESSED_BUCKET + .get_or_init(|| async { + bootstrap_s3_client() + .await + .create_bucket() + .bucket(PROCCESSED_BUCKET_NAME.to_string()) + .send() + .await + .expect("Bucket creation should work"); + PROCCESSED_BUCKET_NAME.to_string() + }) + .await + } + + async fn bootstrap_s3_client() -> &'static aws_sdk_s3::Client { + BOOTSTRAP_S3_CLIENT + .get_or_init(|| async { + let config = aws_config().await; + aws_sdk_s3::Client::new(&config) + }) + .await + } + + async fn aws_config() -> &'static aws_config::SdkConfig { + AWS_CONFIG + .get_or_init(|| async { + let node = localstack_node().await; + let host_port = node + .get_host_port_ipv4(4566) + .await + .expect("Port from Localstack to be retrievable"); + + let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); + let region = region_provider.region().await.unwrap(); + let creds = + aws_sdk_s3::config::Credentials::new("fake", "fake", None, None, "test"); + aws_config::defaults(BehaviorVersion::v2024_03_28()) + .region(region.clone()) + .credentials_provider(creds) + .endpoint_url(format!("http://127.0.0.1:{host_port}")) + .load() + .await + }) + .await + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 9576881..f0811f4 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -35,432 +35,3 @@ async fn main() -> Result<(), Error> { run(router).await } - -#[cfg(test)] -mod tests { - use std::io::Cursor; - - use super::*; - - use aws_config::{meta::region::RegionProviderChain, BehaviorVersion}; - use axum::{body::Body, http::Request, Router}; - use http_body_util::BodyExt; - use image::{ImageFormat, ImageReader}; - use lambda_http::tower::ServiceExt; - use miniaturs_server::api::responses::MetadataResponse; - use miniaturs_server::infra::config::{ - AuthenticationSettings, AwsSettings, ImageCacheSettings, - }; - use miniaturs_server::infra::image_caching::*; - use miniaturs_server::infra::image_manipulation::Operations; - use miniaturs_shared::signature::make_url_safe_base64_hash; - use reqwest::{header::CONTENT_TYPE, StatusCode}; - use testcontainers::{runners::AsyncRunner, ContainerAsync, ImageExt}; - use testcontainers_modules::localstack::LocalStack; - use tokio::sync::OnceCell; - - // Holds the shared Localstack container; if it's not held here, the container ref gets dropped and the _actual_ container - // gets stopped - static LOCALSTACK_NODE: OnceCell> = OnceCell::const_new(); - - static UNPROCESSED_BUCKET: OnceCell = OnceCell::const_new(); - static PROCESSED_BUCKET: OnceCell = OnceCell::const_new(); - static CONFIG: OnceCell = OnceCell::const_new(); - // Used for setting up buckets only... - static BOOTSTRAP_S3_CLIENT: OnceCell = OnceCell::const_new(); - static AWS_CONFIG: OnceCell = OnceCell::const_new(); - - type TestResult = Result>; - - #[tokio::test] - async fn test_root_response() -> TestResult<()> { - let app = app().await?; - let response = app - .oneshot(Request::builder().uri("/").body(Body::empty())?) - .await?; - assert_eq!(StatusCode::OK, response.status()); - Ok(()) - } - - const PNG_URL_1: &'static str = "https://beachape.com/images/octopress_with_container.png"; - const JPG_URL_1: &'static str = "https://beachape.com/images/super-high-performance.jpg"; - - #[tokio::test] - async fn test_resize_png() -> TestResult<()> { - test_resize( - PNG_URL_1, - ImageFormat::Png, - ImageResize { - target_width: 100, - target_height: 80, - }, - ImageResize { - target_width: 300, - target_height: 100, - }, - ) - .await - } - - #[tokio::test] - async fn test_resize_jpg() -> TestResult<()> { - test_resize( - JPG_URL_1, - ImageFormat::Jpeg, - ImageResize { - target_width: 50, - target_height: 70, - }, - ImageResize { - target_width: 500, - target_height: 600, - }, - ) - .await - } - - #[tokio::test] - async fn test_metadata_response() -> TestResult<()> { - test_metadata( - &PNG_URL_1, - ImageResize { - target_width: 100, - target_height: 80, - }, - ) - .await?; - test_metadata( - &PNG_URL_1, - ImageResize { - target_width: -100, - target_height: 80, - }, - ) - .await?; - test_metadata( - &JPG_URL_1, - ImageResize { - target_width: 100, - target_height: -80, - }, - ) - .await?; - test_metadata( - &PNG_URL_1, - ImageResize { - target_width: -100, - target_height: -80, - }, - ) - .await - } - - async fn test_resize( - image_url: &str, - expected_image_format: ImageFormat, - resize_target_1: ImageResize, - resize_target_2: ImageResize, - ) -> TestResult<()> { - let target_width_1 = resize_target_1.target_width; - - let signed_path_1 = signed_resize_path( - &config().await.authentication_settings, - resize_target_1, - image_url, - )?; - - // ensure nothing cached right now - assert!(retrieve_unprocessed_cached(image_url).await.is_none()); - assert!(retrieve_processed_cached(image_url, resize_target_1) - .await - .is_none()); - assert!(retrieve_processed_cached(image_url, resize_target_2) - .await - .is_none()); - - let response_1 = app() - .await? - .oneshot(Request::builder().uri(signed_path_1).body(Body::empty())?) - .await?; - assert_eq!(StatusCode::OK, response_1.status()); - - // ensure what should be cached is cached - assert!(retrieve_unprocessed_cached(image_url).await.is_some()); - assert!(retrieve_processed_cached(image_url, resize_target_1) - .await - .is_some()); - assert!(retrieve_processed_cached(image_url, resize_target_2) - .await - .is_none()); - - let response_content_type_1 = response_1 - .headers() - .get(CONTENT_TYPE) - .ok_or("No content type in response from miniaturs")? - .to_str()?; - assert_eq!( - expected_image_format.to_mime_type(), - response_content_type_1 - ); - - let response_bytes_1 = response_1.into_body().collect().await?.to_bytes(); - - let image_reader_1 = - ImageReader::new(Cursor::new(response_bytes_1)).with_guessed_format()?; - - assert_eq!( - expected_image_format, - image_reader_1.format().ok_or("Could not guess format")? - ); - let dynamic_image_1 = image_reader_1.decode()?; - - assert_eq!(target_width_1 as u32, dynamic_image_1.width()); - - // resize again - let target_width_2 = resize_target_2.target_width; - let signed_path_2 = signed_resize_path( - &config().await.authentication_settings, - resize_target_2, - image_url, - )?; - - let response_2 = app() - .await? - .oneshot(Request::builder().uri(signed_path_2).body(Body::empty())?) - .await?; - assert_eq!(StatusCode::OK, response_2.status()); - // ensure what should be cached is cached - assert!(retrieve_unprocessed_cached(image_url).await.is_some()); - assert!(retrieve_processed_cached(image_url, resize_target_1) - .await - .is_some()); - assert!(retrieve_processed_cached(image_url, resize_target_2) - .await - .is_some()); - - let response_content_type_2 = response_2 - .headers() - .get(CONTENT_TYPE) - .ok_or("No content type in response from miniaturs")? - .to_str()?; - assert_eq!( - expected_image_format.to_mime_type(), - response_content_type_2 - ); - - let response_bytes_2 = response_2.into_body().collect().await?.to_bytes(); - - let image_reader_2 = - ImageReader::new(Cursor::new(response_bytes_2)).with_guessed_format()?; - - assert_eq!( - expected_image_format, - image_reader_2.format().ok_or("Could not guess format")? - ); - let dynamic_image_2 = image_reader_2.decode()?; - - assert_eq!(target_width_2 as u32, dynamic_image_2.width()); - - Ok(()) - } - - async fn test_metadata(image_url: &str, resize: ImageResize) -> TestResult<()> { - let signed_path = - signed_metadata_path(&config().await.authentication_settings, resize, image_url)?; - let response = app() - .await? - .oneshot(Request::builder().uri(signed_path).body(Body::empty())?) - .await?; - assert_eq!(StatusCode::OK, response.status()); - - let mut body_as_metadata: MetadataResponse = - serde_json::from_slice(response.into_body().collect().await?.to_bytes().as_ref())?; - - assert_eq!(image_url, body_as_metadata.source.url); - - // So we can pop easily - body_as_metadata.operations.reverse(); - - let mut op = body_as_metadata.operations.pop().unwrap(); - assert_eq!("resize", op.r#type); - assert_eq!(resize.target_width.unsigned_abs(), op.width.unwrap()); - assert_eq!(resize.target_height.unsigned_abs(), op.height.unwrap()); - - if resize.target_width.is_negative() { - op = body_as_metadata.operations.pop().unwrap(); - assert_eq!("flip_horizontally", op.r#type); - assert_eq!(None, op.width); - assert_eq!(None, op.height); - } - - if resize.target_height.is_negative() { - op = body_as_metadata.operations.pop().unwrap(); - assert_eq!("flip_vertically", op.r#type); - assert_eq!(None, op.width); - assert_eq!(None, op.height); - } - - Ok(()) - } - - async fn retrieve_unprocessed_cached( - image_url: &str, - ) -> Option> { - let config = config().await; - let app_components = AppComponents::create(config.clone()).ok()?; - let unprocessed_cache_retrieve_req = ImageFetchRequest { - requested_image_url: image_url.to_string(), - }; - app_components - .unprocessed_images_cacher - .get(&unprocessed_cache_retrieve_req) - .await - .unwrap() - } - - async fn retrieve_processed_cached( - image_url: &str, - resize_target: ImageResize, - ) -> Option> { - let config = config().await; - let app_components = AppComponents::create(config.clone()).ok()?; - let processed_cache_retrieve_req = ImageResizeRequest { - requested_image_url: image_url.to_string(), - operations: Operations::build(&Some(resize_target)), - }; - app_components - .processed_images_cacher - .get(&processed_cache_retrieve_req) - .await - .unwrap() - } - - fn signed_resize_path( - auth_settings: &AuthenticationSettings, - resize_target: ImageResize, - - url: &str, - ) -> TestResult { - let target_width = resize_target.target_width; - let target_height = resize_target.target_height; - let path = format!("{target_width}x{target_height}/{url}"); - let hash = make_url_safe_base64_hash(&auth_settings.shared_secret, &path)?; - Ok(format!("/{hash}/{path}")) - } - - fn signed_metadata_path( - auth_settings: &AuthenticationSettings, - resize_target: ImageResize, - - url: &str, - ) -> TestResult { - let target_width = resize_target.target_width; - let target_height = resize_target.target_height; - let path = format!("meta/{target_width}x{target_height}/{url}"); - let hash = make_url_safe_base64_hash(&auth_settings.shared_secret, &path)?; - Ok(format!("/{hash}/{path}")) - } - - async fn app() -> Result> { - let config = config().await; - let app_components = AppComponents::create(config.clone())?; - Ok(create_router(app_components)) - } - - async fn config() -> &'static Config { - CONFIG - .get_or_init(|| async { - let authentication_settings = AuthenticationSettings { - shared_secret: "omgwtfbbq".to_string(), - }; - - let image_cache_settings = ImageCacheSettings { - processed_images_bucket_name: processed_bucket().await.to_string(), - unprocessed_images_bucket_name: unprocessed_bucket().await.to_string(), - }; - - let aws_settings = AwsSettings { - aws_config: aws_config().await.clone(), - path_style_s3: true, - }; - - Config { - authentication_settings, - image_cache_settings, - aws_settings, - } - }) - .await - } - - static UNPROCCESSED_BUCKET_NAME: &'static str = "unprocessed-bucket"; - async fn unprocessed_bucket() -> &'static String { - UNPROCESSED_BUCKET - .get_or_init(|| async { - bootstrap_s3_client() - .await - .create_bucket() - .bucket(UNPROCCESSED_BUCKET_NAME.to_string()) - .send() - .await - .expect("Bucket creation should work"); - UNPROCCESSED_BUCKET_NAME.to_string() - }) - .await - } - static PROCCESSED_BUCKET_NAME: &'static str = "processed-bucket"; - async fn processed_bucket() -> &'static String { - PROCESSED_BUCKET - .get_or_init(|| async { - bootstrap_s3_client() - .await - .create_bucket() - .bucket(PROCCESSED_BUCKET_NAME.to_string()) - .send() - .await - .expect("Bucket creation should work"); - PROCCESSED_BUCKET_NAME.to_string() - }) - .await - } - - async fn bootstrap_s3_client() -> &'static aws_sdk_s3::Client { - BOOTSTRAP_S3_CLIENT - .get_or_init(|| async { - let config = aws_config().await; - aws_sdk_s3::Client::new(&config) - }) - .await - } - - async fn aws_config() -> &'static aws_config::SdkConfig { - AWS_CONFIG - .get_or_init(|| async { - let node = LOCALSTACK_NODE - .get_or_init(|| async { - LocalStack::default() - .with_env_var("SERVICES", "s3") - .start() - .await - .expect("Localstack to start properly") - }) - .await; - let host_port = node - .get_host_port_ipv4(4566) - .await - .expect("Port from Localstack to be retrievable"); - - let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); - let region = region_provider.region().await.unwrap(); - let creds = - aws_sdk_s3::config::Credentials::new("fake", "fake", None, None, "test"); - aws_config::defaults(BehaviorVersion::v2024_03_28()) - .region(region.clone()) - .credentials_provider(creds) - .endpoint_url(format!("http://127.0.0.1:{host_port}")) - .load() - .await - }) - .await - } -} diff --git a/server/src/test_utils/mod.rs b/server/src/test_utils/mod.rs new file mode 100644 index 0000000..518316e --- /dev/null +++ b/server/src/test_utils/mod.rs @@ -0,0 +1,113 @@ +use std::thread; + +use testcontainers::{runners::AsyncRunner, ContainerAsync, ImageExt}; +use testcontainers_modules::localstack::LocalStack; +use tokio::sync::{ + mpsc::{self, Receiver, Sender}, + Mutex, OnceCell, +}; + +pub type TestResult = Result>; + +enum ContainerCommands { + Stop, +} + +struct Channel { + tx: Sender, + rx: Mutex>, +} + +fn channel() -> Channel { + let (tx, rx) = mpsc::channel(32); + Channel { + tx, + rx: Mutex::new(rx), + } +} + +// Holds the shared Localstack container; if it's not held here, the container ref gets dropped and the _actual_ container +// gets stopped +static LOCALSTACK_NODE: OnceCell> = OnceCell::const_new(); +pub async fn localstack_node() -> &'static ContainerAsync { + LOCALSTACK_NODE + .get_or_init(|| async { + LocalStack::default() + .with_env_var("SERVICES", "s3") + .start() + .await + .expect("Localstack to start properly") + }) + .await +} + +// Holds a channel that we use to listen to requests to shut down the localstack container +static LOCALSTACK_CHANNEL: std::sync::OnceLock> = + std::sync::OnceLock::new(); +fn localstack_channel() -> &'static Channel { + LOCALSTACK_CHANNEL.get_or_init(|| channel()) +} + +// Holds a channel that we use to block on to messages indicating that the localstack container has been shut down +static LOCALSTACK_SHUT_DOWN_NOTIFIER_CHANNEL: std::sync::OnceLock> = + std::sync::OnceLock::new(); +fn localstack_shut_down_notifier_channel() -> &'static Channel<()> { + LOCALSTACK_SHUT_DOWN_NOTIFIER_CHANNEL.get_or_init(|| channel()) +} + +// Holds a static Tokio runtime for blocking ops +static TOKIO_RUNTIME: std::sync::OnceLock = std::sync::OnceLock::new(); +fn tokio_runtume() -> &'static tokio::runtime::Runtime { + TOKIO_RUNTIME.get_or_init(|| tokio::runtime::Runtime::new().unwrap()) +} + +// Setup hooks registration +#[ctor::ctor] +fn on_startup() { + setup_localstack(); +} + +// Shutdown hook registration +#[ctor::dtor] +fn on_shutdown() { + shutdown_localstack(); +} + +// Function to set up localstack and a thread to listen on the shutdown command channel +fn setup_localstack() { + thread::spawn(|| { + tokio_runtume().block_on(start_localstack()); + // This needs to be here otherwise the container did not call the drop function before the application stops + localstack_shut_down_notifier_channel() + .tx + .blocking_send(()) + .unwrap(); + }); +} + +// Function to send a shutdown command and block on receiving a message that it has occured +fn shutdown_localstack() { + localstack_channel() + .tx + .blocking_send(ContainerCommands::Stop) + .unwrap(); + localstack_shut_down_notifier_channel() + .rx + .blocking_lock() + .blocking_recv() + .unwrap(); +} + +// Start localstack +async fn start_localstack() { + let localstack_node_container = localstack_node().await; + let mut rx = localstack_channel().rx.lock().await; + while let Some(command) = rx.recv().await { + match command { + ContainerCommands::Stop => { + localstack_node_container.stop().await.unwrap(); + rx.close(); + } + } + } +}