Commit

test benchmark
eaypek-tfh committed Jan 19, 2025
1 parent 84d1854 commit 4c9dd50
Showing 4 changed files with 92 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/temp-branch-build-and-push.yaml
@@ -3,7 +3,7 @@ name: Branch - Build and push docker image
on:
  push:
    branches:
      - "chore/use-custom-http-client-in-s3"
      - "test-benchmark"

concurrency:
  group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
4 changes: 3 additions & 1 deletion iris-mpc-store/src/lib.rs
@@ -15,7 +15,9 @@ use iris_mpc_common::{
    iris_db::iris::IrisCode,
};
use rand::{rngs::StdRng, Rng, SeedableRng};
pub use s3_importer::{fetch_and_parse_chunks, last_snapshot_timestamp, ObjectStore, S3Store};
pub use s3_importer::{
    fetch_and_parse_chunks, fetch_to_memory, last_snapshot_timestamp, ObjectStore, S3Store,
};
use sqlx::{
    migrate::Migrator, postgres::PgPoolOptions, Executor, PgPool, Postgres, Row, Transaction,
};
66 changes: 64 additions & 2 deletions iris-mpc-store/src/s3_importer.rs
@@ -13,7 +13,7 @@ use std::{
    },
    time::Instant,
};
use tokio::io::AsyncReadExt;
use tokio::{io::AsyncReadExt, task};

const SINGLE_ELEMENT_SIZE: usize = IRIS_CODE_LENGTH * mem::size_of::<u16>() * 2
    + MASK_CODE_LENGTH * mem::size_of::<u16>() * 2
@@ -187,7 +187,7 @@ impl ObjectStore for S3Store {
    }
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct LastSnapshotDetails {
    pub timestamp: i64,
    pub last_serial_id: i64,
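
(The new Clone derive supports the call site added in iris-mpc/src/bin/server.rs below, where the snapshot details are cloned by value into the fetch_to_memory benchmark. A minimal illustration, with hypothetical variable names:

    let details = last_snapshot_timestamp(&store, folder.clone()).await?;
    fetch_to_memory(Arc::new(store), parallelism, folder, details.clone()).await?; // .clone() needs #[derive(Clone)]
)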
@@ -313,6 +313,68 @@ pub async fn fetch_and_parse_chunks(
    result_stream
}

pub async fn fetch_to_memory(
    store: Arc<impl ObjectStore>,
    concurrency: usize,
    prefix_name: String,
    last_snapshot_details: LastSnapshotDetails,
) -> eyre::Result<()> {
    let mut handles: Vec<task::JoinHandle<Result<(), eyre::Error>>> = Vec::new();
    let mut active_handles = 0;
    let range_size = MAX_RANGE_SIZE;
    let single_range_bytes = range_size * SINGLE_ELEMENT_SIZE;
    for chunk in (1..=last_snapshot_details.last_serial_id).step_by(range_size) {
        // Map the serial id onto its chunk file and the element offset inside it.
        let chunk_id =
            (chunk / last_snapshot_details.chunk_size) * last_snapshot_details.chunk_size + 1;
        let prefix_name = prefix_name.clone();
        let offset_within_chunk = (chunk - chunk_id) as usize;

        // Wait if we've hit the concurrency limit
        if active_handles >= concurrency {
            let handle = handles.remove(0);
            handle.await??;
            active_handles -= 1;
        }

        handles.push(task::spawn({
            let store = Arc::clone(&store); // Clone the Arc to share ownership
            let mut slice = vec![0u8; single_range_bytes];
            async move {
                // Ranged GET covering [offset, offset + range_size) elements, in bytes.
                let mut result = store
                    .get_object(
                        &format!("{}/{}.bin", prefix_name, chunk_id),
                        (
                            offset_within_chunk * SINGLE_ELEMENT_SIZE,
                            (offset_within_chunk + range_size) * SINGLE_ELEMENT_SIZE,
                        ),
                    )
                    .await?
                    .into_async_read();

                // Drain the response into the preallocated buffer; read() may
                // return short counts, and the final range may end early.
                let mut bytes_read = 0;
                while bytes_read < single_range_bytes {
                    let n = result.read(&mut slice[bytes_read..]).await?;
                    if n == 0 {
                        break;
                    }
                    bytes_read += n;
                }

                Ok(())
            }
        }));

        active_handles += 1;
    }

    // Wait for remaining handles
    for handle in handles {
        handle.await??;
    }

    Ok(())
}
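
(Note that the loop above enforces its concurrency cap by always awaiting the oldest spawned task, so one slow range request can delay admission of new ones. A minimal sketch, not part of this commit, of the same bounded-concurrency fetch using futures::stream::buffer_unordered, where whichever request finishes first frees a slot; it assumes the futures crate and uses a hypothetical fetch_range helper standing in for the ranged get_object + drain logic above:

    use futures::stream::{self, StreamExt, TryStreamExt};

    // Hypothetical stand-in for the ranged `get_object` + drain logic above.
    async fn fetch_range(chunk_id: i64, offset_within_chunk: usize, range_size: usize) -> eyre::Result<()> {
        let _ = (chunk_id, offset_within_chunk, range_size);
        Ok(())
    }

    pub async fn fetch_to_memory_unordered(
        concurrency: usize,
        last_serial_id: i64,
        chunk_size: i64,
        range_size: usize,
    ) -> eyre::Result<()> {
        stream::iter((1..=last_serial_id).step_by(range_size))
            .map(|chunk| {
                // Same chunk/offset arithmetic as fetch_to_memory above.
                let chunk_id = (chunk / chunk_size) * chunk_size + 1;
                let offset_within_chunk = (chunk - chunk_id) as usize;
                tokio::spawn(fetch_range(chunk_id, offset_within_chunk, range_size))
            })
            // At most `concurrency` spawned fetches in flight; completion order
            // does not matter, so a slow range never blocks admission.
            .buffer_unordered(concurrency)
            .map(|joined| joined.map_err(eyre::Error::from).and_then(|res| res))
            .try_collect::<Vec<_>>()
            .await?;
        Ok(())
    }
)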

#[cfg(test)]
mod tests {
    use super::*;
32 changes: 24 additions & 8 deletions iris-mpc/src/bin/server.rs
@@ -50,7 +50,8 @@ use iris_mpc_gpu::{
    },
};
use iris_mpc_store::{
    fetch_and_parse_chunks, last_snapshot_timestamp, S3Store, Store, StoredIris, StoredIrisRef,
    fetch_and_parse_chunks, fetch_to_memory, last_snapshot_timestamp, S3Store, Store, StoredIris,
    StoredIrisRef,
};
use metrics_exporter_statsd::StatsdBuilder;
use reqwest::StatusCode;
@@ -1160,17 +1161,32 @@ async fn server_main(config: Config) -> eyre::Result<()> {
        "Initialize iris db: Loading from DB (parallelism: {})",
        parallelism
    );
    let s3_store = S3Store::new(db_chunks_s3_client, db_chunks_bucket_name);
    let db_chunks_s3_store =
        S3Store::new(db_chunks_s3_client.clone(), db_chunks_bucket_name.clone());

    tokio::runtime::Handle::current().block_on(async {
        // First fetch last snapshot from S3
        let last_snapshot_details = last_snapshot_timestamp(
            &db_chunks_s3_store,
            db_chunks_folder_name.clone(),
        )
        .await?;
        let fetch_test_ts = Instant::now();
        fetch_to_memory(
            Arc::new(db_chunks_s3_store),
            load_chunks_parallelism,
            db_chunks_folder_name.clone(),
            last_snapshot_details.clone(),
        )
        .await?;
        let elapsed = fetch_test_ts.elapsed();
        tracing::info!("Fetch to memory took {:?}", elapsed);

        let s3_store = S3Store::new(db_chunks_s3_client, db_chunks_bucket_name);

        let mut stream = match config.enable_s3_importer {
            true => {
                tracing::info!("S3 importer enabled. Fetching from s3 + db");
                // First fetch last snapshot from S3
                let last_snapshot_details = last_snapshot_timestamp(
                    &s3_store,
                    db_chunks_folder_name.clone(),
                )
                .await?;
                let min_last_modified_at = last_snapshot_details.timestamp
                    - config.db_load_safety_overlap_seconds;
                tracing::info!(
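
(The benchmark above logs only wall-clock time. A hedged sketch of turning that measurement into throughput; log_fetch_throughput and bytes_per_record are hypothetical names, since SINGLE_ELEMENT_SIZE is private to iris-mpc-store and not exported by this commit:

    use std::time::Instant;

    // Hypothetical helper: derive throughput from the benchmark's timing.
    // `bytes_per_record` stands in for iris-mpc-store's SINGLE_ELEMENT_SIZE.
    fn log_fetch_throughput(started: Instant, last_serial_id: i64, bytes_per_record: usize) {
        let elapsed = started.elapsed();
        let total_bytes = last_serial_id as f64 * bytes_per_record as f64;
        let gib = total_bytes / (1024.0 * 1024.0 * 1024.0);
        tracing::info!(
            "Fetch to memory took {:?} ({:.2} GiB, {:.2} GiB/s)",
            elapsed,
            gib,
            gib / elapsed.as_secs_f64()
        );
    }
)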