diff --git a/deploy/stage/common-values-iris-mpc.yaml b/deploy/stage/common-values-iris-mpc.yaml index cd5c1a2e8..4a6910627 100644 --- a/deploy/stage/common-values-iris-mpc.yaml +++ b/deploy/stage/common-values-iris-mpc.yaml @@ -1,4 +1,4 @@ -image: "ghcr.io/worldcoin/iris-mpc:c4e143f106b49e99b566af806a7d5fcc0d8a143b" +image: "ghcr.io/worldcoin/iris-mpc:5981970079056315f7bb09bfb4478a54d2764fcd" environment: stage replicaCount: 1 diff --git a/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml b/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml index e21cbb16a..bdd56b915 100644 --- a/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml +++ b/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml @@ -96,7 +96,7 @@ env: value: "true" - name: SMPC__PAGE_LOCK_CHUNK_PERCENTAGE - value: "25" + value: "10" - name: SMPC__CLEAR_DB_BEFORE_INIT value: "true" diff --git a/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml b/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml index 1729f08ab..3d98651fb 100644 --- a/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml +++ b/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml @@ -96,7 +96,7 @@ env: value: "true" - name: SMPC__PAGE_LOCK_CHUNK_PERCENTAGE - value: "25" + value: "10" - name: SMPC__CLEAR_DB_BEFORE_INIT value: "true" diff --git a/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml b/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml index d11468f52..76fe01db0 100644 --- a/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml +++ b/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml @@ -96,7 +96,7 @@ env: value: "true" - name: SMPC__PAGE_LOCK_CHUNK_PERCENTAGE - value: "25" + value: "10" - name: SMPC__CLEAR_DB_BEFORE_INIT value: "true" diff --git a/iris-mpc-gpu/src/helpers/mod.rs b/iris-mpc-gpu/src/helpers/mod.rs index 5274c2912..e757e924b 100644 --- a/iris-mpc-gpu/src/helpers/mod.rs +++ b/iris-mpc-gpu/src/helpers/mod.rs @@ -177,6 +177,11 @@ pub fn register_host_memory( chunk_offset: usize, code_length: usize, ) { + tracing::info!( + "Page-locking chunk: [{}-{}]", + chunk_offset, + chunk_offset + chunk_length + ); let size = chunk_length / device_manager.device_count(); for (device_index, device) in device_manager.devices().iter().enumerate() { device.bind_to_thread().unwrap(); @@ -194,4 +199,9 @@ pub fn register_host_memory( ); } } + tracing::info!( + "Page-lock completed for chunk: [{}-{}]", + chunk_offset, + chunk_offset + chunk_length + ); } diff --git a/iris-mpc/src/bin/server.rs b/iris-mpc/src/bin/server.rs index 6f72bb972..3035f0471 100644 --- a/iris-mpc/src/bin/server.rs +++ b/iris-mpc/src/bin/server.rs @@ -51,8 +51,8 @@ use iris_mpc_gpu::{ }, }; use iris_mpc_store::{ - fetch_and_parse_chunks, fetch_to_memory, last_snapshot_timestamp, DbStoredIris, S3Store, - S3StoredIris, Store, StoredIrisRef, + fetch_and_parse_chunks, last_snapshot_timestamp, DbStoredIris, S3Store, S3StoredIris, Store, + StoredIrisRef, }; use metrics_exporter_statsd::StatsdBuilder; use reqwest::StatusCode; @@ -1226,7 +1226,6 @@ async fn server_main(config: Config) -> eyre::Result<()> { // prepare the handle for the rest of the page locks let page_lock_handle = spawn_blocking(move || { for i in 1..n_page_lock_iters { - tracing::info!("Page-locking chunk {}", i); let dbs_clone = dbs.clone(); let device_manager_clone = device_manager_clone.clone(); for (db, code_length) in dbs_clone { @@ -1239,14 +1238,10 @@ async fn server_main(config: Config) -> eyre::Result<()> { code_length, ); } - tracing::info!("Page-locking chunk {} completed", i); } }); now = Instant::now(); - let mut load_summary_ts = Instant::now(); - let mut time_waiting_for_stream = Duration::from_secs(0); - let mut time_loading_into_memory = Duration::from_secs(0); let mut record_counter = 0; let mut all_serial_ids: HashSet = HashSet::from_iter(1..=(store_len as i64)); @@ -1261,24 +1256,6 @@ async fn server_main(config: Config) -> eyre::Result<()> { ) .await?; - if config.test_load_into_memory { - let fetch_test_ts = Instant::now(); - fetch_to_memory( - Arc::new(db_chunks_s3_store), - load_chunks_parallelism, - db_chunks_folder_name.clone(), - last_snapshot_details.clone(), - ) - .await?; - let elapsed = fetch_test_ts.elapsed(); - tracing::info!( - "Fetch to memory took {:?} with {} par", - elapsed, - load_chunks_parallelism - ); - load_summary_ts = Instant::now(); - } - let min_last_modified_at = last_snapshot_details.timestamp - config.db_load_safety_overlap_seconds; tracing::info!( @@ -1287,23 +1264,6 @@ async fn server_main(config: Config) -> eyre::Result<()> { min_last_modified_at ); - let stream_db = store - .stream_irises_par(Some(min_last_modified_at), parallelism) - .await - .boxed(); - load_db_records( - &mut actor, - load_summary_ts, - time_waiting_for_stream, - time_loading_into_memory, - record_counter, - &mut all_serial_ids, - stream_db, - ) - .await; - - let n_loaded_from_db = record_counter; - let mut n_skipped_from_s3 = 0; let s3_store = S3Store::new(db_chunks_s3_client, db_chunks_bucket_name); let s3_arc = Arc::new(s3_store); @@ -1319,26 +1279,29 @@ async fn server_main(config: Config) -> eyre::Result<()> { tx.clone(), ) .await - .expect("Couldn't fetch and parse chunks"); + .expect("Couldn't fetch and parse chunks from s3"); }); - time_waiting_for_stream = Duration::from_secs(0); - time_loading_into_memory = Duration::from_secs(0); - load_summary_ts = Instant::now(); + let mut time_waiting_for_stream = Duration::from_secs(0); + let mut time_loading_into_memory = Duration::from_secs(0); + let mut load_summary_ts = Instant::now(); while let Some(iris) = rx.recv().await { time_waiting_for_stream += load_summary_ts.elapsed(); load_summary_ts = Instant::now(); let index = iris.index(); - // if record already loaded via DB, do not override it from S3 - if !all_serial_ids.contains(&(index as i64)) { - n_skipped_from_s3 += 1; - continue; - } - if index == 0 || index > store_len { + if index == 0 { tracing::error!("Invalid iris index {}", index); return Err(eyre!("Invalid iris index {}", index)); + } else if index > store_len { + tracing::warn!( + "Skipping loading from S3: index {} > store_len {}", + index, + store_len + ); + continue; } + actor.load_single_record_from_s3( iris.index() - 1, iris.left_code_odd(), @@ -1374,21 +1337,29 @@ async fn server_main(config: Config) -> eyre::Result<()> { } tracing::info!( "S3 Loading summary => Loaded {:?} items. Waited for stream: \ - {:?}, Loaded into memory: {:?}. Skipped loading from S3: {:?}", - record_counter - n_loaded_from_db, + {:?}, Loaded into memory: {:?}.", + record_counter, time_waiting_for_stream, time_loading_into_memory, - n_skipped_from_s3, ); + + let stream_db = store + .stream_irises_par(Some(min_last_modified_at), parallelism) + .await + .boxed(); + load_db_records( + &mut actor, + record_counter, + &mut all_serial_ids, + stream_db, + ) + .await; } else { tracing::info!("S3 importer disabled. Fetching only from db"); let stream_db = store.stream_irises_par(None, parallelism).await.boxed(); load_db_records( &mut actor, - load_summary_ts, - time_waiting_for_stream, - time_loading_into_memory, record_counter, &mut all_serial_ids, stream_db, @@ -1781,21 +1752,21 @@ async fn server_main(config: Config) -> eyre::Result<()> { async fn load_db_records<'a>( actor: &mut ServerActor, - mut load_summary_ts: Instant, - mut time_waiting_for_stream: Duration, - mut time_loading_into_memory: Duration, mut record_counter: i32, all_serial_ids: &mut HashSet, mut stream_db: BoxStream<'a, eyre::Result>, ) { + let mut load_summary_ts = Instant::now(); + let mut time_waiting_for_stream = Duration::from_secs(0); + let mut time_loading_into_memory = Duration::from_secs(0); + let n_loaded_via_s3 = record_counter; while let Some(iris) = stream_db.next().await { // Update time waiting for the stream time_waiting_for_stream += load_summary_ts.elapsed(); load_summary_ts = Instant::now(); - let iris = iris.unwrap(); // Handle the Result from the stream + let iris = iris.unwrap(); - // Process the iris record actor.load_single_record_from_db( iris.index() - 1, iris.left_code(), @@ -1803,21 +1774,23 @@ async fn load_db_records<'a>( iris.right_code(), iris.right_mask(), ); - actor.increment_db_size(iris.index() - 1); + + // Only increment db size if record has not been loaded via s3 before + if all_serial_ids.contains(&(iris.index() as i64)) { + actor.increment_db_size(iris.index() - 1); + all_serial_ids.remove(&(iris.index() as i64)); + record_counter += 1; + } // Update time spent loading into memory time_loading_into_memory += load_summary_ts.elapsed(); load_summary_ts = Instant::now(); - - // Remove serial ID and increment record counter - all_serial_ids.remove(&(iris.index() as i64)); - record_counter += 1; } tracing::info!( "Aurora Loading summary => Loaded {:?} items. Waited for stream: {:?}, Loaded into \ memory: {:?}", - record_counter, + record_counter - n_loaded_via_s3, time_waiting_for_stream, time_loading_into_memory, );