Skip to content

Commit

Permalink
remove test function
Browse files Browse the repository at this point in the history
  • Loading branch information
eaypek-tfh committed Jan 27, 2025
1 parent 9ac64a3 commit 0992d33
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 67 deletions.
3 changes: 1 addition & 2 deletions iris-mpc-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use iris_mpc_common::{
};
use rand::{rngs::StdRng, Rng, SeedableRng};
pub use s3_importer::{
fetch_and_parse_chunks, fetch_to_memory, last_snapshot_timestamp, ObjectStore, S3Store,
S3StoredIris,
fetch_and_parse_chunks, last_snapshot_timestamp, ObjectStore, S3Store, S3StoredIris,
};
use sqlx::{
migrate::Migrator, postgres::PgPoolOptions, Executor, PgPool, Postgres, Row, Transaction,
Expand Down
65 changes: 0 additions & 65 deletions iris-mpc-store/src/s3_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,71 +291,6 @@ pub async fn fetch_and_parse_chunks(
Ok(())
}

pub async fn fetch_to_memory(
store: Arc<impl ObjectStore>,
concurrency: usize,
prefix_name: String,
last_snapshot_details: LastSnapshotDetails,
) -> eyre::Result<()> {
let mut handles: Vec<task::JoinHandle<Result<(), eyre::Error>>> = Vec::new();
let mut active_handles = 0;
let range_size = MAX_RANGE_SIZE;
for chunk in (1..=last_snapshot_details.last_serial_id).step_by(range_size) {
let chunk_id =
(chunk / last_snapshot_details.chunk_size) * last_snapshot_details.chunk_size + 1;
let prefix_name = prefix_name.clone();
let offset_within_chunk = (chunk - chunk_id) as usize;

// Wait if we've hit the concurrency limit
if active_handles >= concurrency {
let handle = handles.remove(0);
handle.await??;
active_handles -= 1;
}

handles.push(task::spawn({
let store = Arc::clone(&store); // Clone the Arc to share ownership
let mut slice = vec![0u8; SINGLE_ELEMENT_SIZE];
async move {
let mut result = store
.get_object(
&format!("{}/{}.bin", prefix_name, chunk_id),
(
offset_within_chunk * SINGLE_ELEMENT_SIZE,
(offset_within_chunk + range_size) * SINGLE_ELEMENT_SIZE,
),
)
.await?
.into_async_read();

loop {
match result.read_exact(&mut slice).await {
Ok(_) => {
let iris = S3StoredIris::from_bytes(&slice)?;
if iris.id % 100000 == 0 {
tracing::info!("Fetched iris with id: {}", iris.id);
}
}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()),
}
}

Ok(())
}
}));

active_handles += 1;
}

// Wait for remaining handles
for handle in handles {
handle.await??;
}

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 0992d33

Please sign in to comment.