diff --git a/iris-mpc-store/src/s3_importer.rs b/iris-mpc-store/src/s3_importer.rs
index 66ba7203f..951080bb5 100644
--- a/iris-mpc-store/src/s3_importer.rs
+++ b/iris-mpc-store/src/s3_importer.rs
@@ -13,7 +13,7 @@ const SINGLE_ELEMENT_SIZE: usize = IRIS_CODE_LENGTH * mem::size_of::<u16>() * 2
 #[async_trait]
 pub trait ObjectStore: Send + Sync + 'static {
     async fn get_object(&self, key: &str) -> eyre::Result<Vec<u8>>;
-    async fn list_objects(&self) -> eyre::Result<Vec<String>>;
+    async fn list_objects(&self, prefix: &str) -> eyre::Result<Vec<String>>;
 }
 
 pub struct S3Store {
@@ -42,12 +42,16 @@ impl ObjectStore for S3Store {
         Ok(data.to_vec())
     }
 
-    async fn list_objects(&self) -> eyre::Result<Vec<String>> {
+    async fn list_objects(&self, prefix: &str) -> eyre::Result<Vec<String>> {
         let mut objects = Vec::new();
         let mut continuation_token = None;
 
         loop {
-            let mut request = self.client.list_objects_v2().bucket(&self.bucket);
+            let mut request = self
+                .client
+                .list_objects_v2()
+                .bucket(&self.bucket)
+                .prefix(prefix);
 
             if let Some(token) = continuation_token {
                 request = request.continuation_token(token);
@@ -79,7 +83,7 @@ pub async fn last_snapshot_timestamp(
     tracing::info!("Looking for last snapshot time in folder: {}", folder_name);
     let start_path = format!("{}/", folder_name);
     store
-        .list_objects()
+        .list_objects(folder_name.as_str())
        .await?
        .into_iter()
        .filter(|f| f.starts_with(start_path.as_str()) && f.ends_with(".timestamp"))
@@ -96,8 +100,9 @@
 pub async fn fetch_and_parse_chunks(
     store: &impl ObjectStore,
     concurrency: usize,
+    folder_name: String,
 ) -> Pin<Box<dyn Stream<Item = eyre::Result<StoredIris>> + Send + '_>> {
-    let chunks = store.list_objects().await.unwrap();
+    let chunks = store.list_objects(folder_name.as_str()).await.unwrap();
 
     let result_stream = stream::iter(chunks)
         .filter_map(|chunk| async move {
@@ -193,7 +198,7 @@ mod tests {
                 .cloned()
         }
 
-        async fn list_objects(&self) -> eyre::Result<Vec<String>> {
+        async fn list_objects(&self, _: &str) -> eyre::Result<Vec<String>> {
             Ok(self.objects.keys().cloned().collect())
         }
     }
@@ -231,11 +236,11 @@ mod tests {
         }
 
         assert_eq!(
-            store.list_objects().await.unwrap().len(),
+            store.list_objects("").await.unwrap().len(),
             MOCK_ENTRIES.div_ceil(MOCK_CHUNK_SIZE)
         );
 
-        let mut chunks = fetch_and_parse_chunks(&store, 1).await;
+        let mut chunks = fetch_and_parse_chunks(&store, 1, String::new()).await;
         let mut count = 0;
         let mut ids: HashSet<usize> = HashSet::from_iter(1..MOCK_ENTRIES);
         while let Some(chunk) = chunks.next().await {
diff --git a/iris-mpc/src/bin/server.rs b/iris-mpc/src/bin/server.rs
index 7e4bab23b..ed5a05647 100644
--- a/iris-mpc/src/bin/server.rs
+++ b/iris-mpc/src/bin/server.rs
@@ -986,7 +986,8 @@ async fn server_main(config: Config) -> eyre::Result<()> {
             tokio::runtime::Handle::current().block_on(async {
                 // First fetch last snapshot from S3
                 let last_snapshot_timestamp =
-                    last_snapshot_timestamp(&s3_store, db_chunks_folder_name).await?;
+                    last_snapshot_timestamp(&s3_store, db_chunks_folder_name.clone())
+                        .await?;
                 let min_last_modified_at =
                     last_snapshot_timestamp - config.db_load_safety_overlap_seconds;
                 tracing::info!(
@@ -994,10 +995,14 @@ async fn server_main(config: Config) -> eyre::Result<()> {
                     last_snapshot_timestamp,
                     min_last_modified_at
                 );
-                let stream_s3 = fetch_and_parse_chunks(&s3_store, load_chunks_parallelism)
-                    .await
-                    .map(|result| result.map(IrisSource::S3))
-                    .boxed();
+                let stream_s3 = fetch_and_parse_chunks(
+                    &s3_store,
+                    load_chunks_parallelism,
+                    db_chunks_folder_name,
+                )
+                .await
+                .map(|result| result.map(IrisSource::S3))
+                .boxed();
 
                 let stream_db = store
                     .stream_irises_par(min_last_modified_at, parallelism)