Skip to content

Commit

Permalink
Deal only with the specified folder (restrict S3 object listing to the given folder prefix)
Browse files Browse the repository at this point in the history
  • Loading branch information
eaypek-tfh committed Dec 9, 2024
1 parent 24f69a4 commit e4380d0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
21 changes: 13 additions & 8 deletions iris-mpc-store/src/s3_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const SINGLE_ELEMENT_SIZE: usize = IRIS_CODE_LENGTH * mem::size_of::<u16>() * 2
/// Abstraction over an object store that can fetch one object by key and
/// list object keys.
///
/// NOTE(review): this text is a rendered commit diff without +/- markers, so
/// both the old and the new `list_objects` signatures appear below.
#[async_trait]
pub trait ObjectStore: Send + Sync + 'static {
/// Fetches the object stored under `key` and returns its contents as bytes.
async fn get_object(&self, key: &str) -> eyre::Result<Vec<u8>>;
// Presumably the pre-commit signature (removed line of the diff): no prefix
// argument — confirm against the repository.
async fn list_objects(&self) -> eyre::Result<Vec<String>>;
/// Lists object keys, taking a `prefix` argument. The S3 implementation in
/// this diff forwards `prefix` to `list_objects_v2().prefix(..)`; the test
/// mock ignores it.
async fn list_objects(&self, prefix: &str) -> eyre::Result<Vec<String>>;
}

pub struct S3Store {
Expand Down Expand Up @@ -42,12 +42,16 @@ impl ObjectStore for S3Store {
Ok(data.to_vec())
}

async fn list_objects(&self) -> eyre::Result<Vec<String>> {
async fn list_objects(&self, prefix: &str) -> eyre::Result<Vec<String>> {
let mut objects = Vec::new();
let mut continuation_token = None;

loop {
let mut request = self.client.list_objects_v2().bucket(&self.bucket);
let mut request = self
.client
.list_objects_v2()
.bucket(&self.bucket)
.prefix(prefix);

if let Some(token) = continuation_token {
request = request.continuation_token(token);
Expand Down Expand Up @@ -79,7 +83,7 @@ pub async fn last_snapshot_timestamp(
tracing::info!("Looking for last snapshot time in folder: {}", folder_name);
let start_path = format!("{}/", folder_name);
store
.list_objects()
.list_objects(folder_name.as_str())
.await?
.into_iter()
.filter(|f| f.starts_with(start_path.as_str()) && f.ends_with(".timestamp"))
Expand All @@ -96,8 +100,9 @@ pub async fn last_snapshot_timestamp(
pub async fn fetch_and_parse_chunks(
store: &impl ObjectStore,
concurrency: usize,
folder_name: String,
) -> Pin<Box<dyn Stream<Item = eyre::Result<StoredIris>> + Send + '_>> {
let chunks = store.list_objects().await.unwrap();
let chunks = store.list_objects(folder_name.as_str()).await.unwrap();

let result_stream = stream::iter(chunks)
.filter_map(|chunk| async move {
Expand Down Expand Up @@ -193,7 +198,7 @@ mod tests {
.cloned()
}

// NOTE(review): rendered diff — the next line appears to be the removed
// (pre-commit) signature; it shares the body below with the new signature.
async fn list_objects(&self) -> eyre::Result<Vec<String>> {
/// Test-double listing: the prefix argument is bound to `_` and ignored, so
/// every key in `self.objects` is returned regardless of the prefix passed.
async fn list_objects(&self, _: &str) -> eyre::Result<Vec<String>> {
// Clone each key out of `self.objects` into an owned `Vec<String>`.
Ok(self.objects.keys().cloned().collect())
}
}
Expand Down Expand Up @@ -231,11 +236,11 @@ mod tests {
}

assert_eq!(
store.list_objects().await.unwrap().len(),
store.list_objects("").await.unwrap().len(),
MOCK_ENTRIES.div_ceil(MOCK_CHUNK_SIZE)
);

let mut chunks = fetch_and_parse_chunks(&store, 1).await;
let mut chunks = fetch_and_parse_chunks(&store, 1, String::new()).await;
let mut count = 0;
let mut ids: HashSet<usize> = HashSet::from_iter(1..MOCK_ENTRIES);
while let Some(chunk) = chunks.next().await {
Expand Down
15 changes: 10 additions & 5 deletions iris-mpc/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,18 +986,23 @@ async fn server_main(config: Config) -> eyre::Result<()> {
tokio::runtime::Handle::current().block_on(async {
// First fetch last snapshot from S3
let last_snapshot_timestamp =
last_snapshot_timestamp(&s3_store, db_chunks_folder_name).await?;
last_snapshot_timestamp(&s3_store, db_chunks_folder_name.clone())
.await?;
let min_last_modified_at =
last_snapshot_timestamp - config.db_load_safety_overlap_seconds;
tracing::info!(
"Last snapshot timestamp: {}, min_last_modified_at: {}",
last_snapshot_timestamp,
min_last_modified_at
);
let stream_s3 = fetch_and_parse_chunks(&s3_store, load_chunks_parallelism)
.await
.map(|result| result.map(IrisSource::S3))
.boxed();
let stream_s3 = fetch_and_parse_chunks(
&s3_store,
load_chunks_parallelism,
db_chunks_folder_name,
)
.await
.map(|result| result.map(IrisSource::S3))
.boxed();

let stream_db = store
.stream_irises_par(min_last_modified_at, parallelism)
Expand Down

0 comments on commit e4380d0

Please sign in to comment.