first load from s3, then db
eaypek-tfh committed Jan 26, 2025
1 parent 3efa275 commit b6d4ea7
Showing 6 changed files with 56 additions and 73 deletions.
2 changes: 1 addition & 1 deletion deploy/stage/common-values-iris-mpc.yaml
@@ -1,4 +1,4 @@
image: "ghcr.io/worldcoin/iris-mpc:c4e143f106b49e99b566af806a7d5fcc0d8a143b"
image: "ghcr.io/worldcoin/iris-mpc:5981970079056315f7bb09bfb4478a54d2764fcd"

environment: stage
replicaCount: 1
2 changes: 1 addition & 1 deletion deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml
@@ -96,7 +96,7 @@ env:
value: "true"

- name: SMPC__PAGE_LOCK_CHUNK_PERCENTAGE
value: "25"
value: "10"

- name: SMPC__CLEAR_DB_BEFORE_INIT
value: "true"
2 changes: 1 addition & 1 deletion deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml
@@ -96,7 +96,7 @@ env:
value: "true"

- name: SMPC__PAGE_LOCK_CHUNK_PERCENTAGE
value: "25"
value: "10"

- name: SMPC__CLEAR_DB_BEFORE_INIT
value: "true"
2 changes: 1 addition & 1 deletion deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml
@@ -96,7 +96,7 @@ env:
value: "true"

- name: SMPC__PAGE_LOCK_CHUNK_PERCENTAGE
value: "25"
value: "10"

- name: SMPC__CLEAR_DB_BEFORE_INIT
value: "true"
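Across all three stage parties, SMPC__PAGE_LOCK_CHUNK_PERCENTAGE drops from 25 to 10, so host memory is page-locked in ten smaller chunks instead of four larger ones and the first chunk is pinned sooner. As a rough sketch of how a percentage knob like this typically maps to a chunk length (the function name and rounding policy are assumptions, not code from this repository):

    // Hypothetical sketch: deriving a page-lock chunk length from a percentage.
    fn page_lock_chunk_length(total_records: usize, chunk_percentage: usize) -> usize {
        // chunk_percentage = 10 pins the database in ~10 chunks; smaller chunks
        // make the first one available earlier, at the cost of more iterations.
        (total_records * chunk_percentage).div_ceil(100)
    }

    fn main() {
        assert_eq!(page_lock_chunk_length(1_000_000, 25), 250_000); // old setting: 4 chunks
        assert_eq!(page_lock_chunk_length(1_000_000, 10), 100_000); // new setting: 10 chunks
    }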
10 changes: 10 additions & 0 deletions iris-mpc-gpu/src/helpers/mod.rs
@@ -177,6 +177,11 @@ pub fn register_host_memory(
chunk_offset: usize,
code_length: usize,
) {
+tracing::info!(
+    "Page-locking chunk: [{}-{}]",
+    chunk_offset,
+    chunk_offset + chunk_length
+);
let size = chunk_length / device_manager.device_count();
for (device_index, device) in device_manager.devices().iter().enumerate() {
device.bind_to_thread().unwrap();
@@ -194,4 +199,9 @@
);
}
}
+tracing::info!(
+    "Page-lock completed for chunk: [{}-{}]",
+    chunk_offset,
+    chunk_offset + chunk_length
+);
}
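The two tracing blocks added above bracket each chunk registration, so slow pinning shows up in the logs with the chunk's index range. A minimal sketch of the surrounding pattern, assuming (as the visible lines suggest) that each chunk is split evenly across devices; the pinning call itself is elided and every name outside the log lines is illustrative:

    // Illustrative sketch of the per-device split around the new log lines.
    fn register_chunk(device_count: usize, chunk_offset: usize, chunk_length: usize) {
        tracing::info!(
            "Page-locking chunk: [{}-{}]",
            chunk_offset,
            chunk_offset + chunk_length
        );
        let size = chunk_length / device_count;
        for device_index in 0..device_count {
            // Each device would bind to the thread and page-lock its slice here.
            let start = chunk_offset + device_index * size;
            let _slice = start..start + size;
        }
        tracing::info!(
            "Page-lock completed for chunk: [{}-{}]",
            chunk_offset,
            chunk_offset + chunk_length
        );
    }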
111 changes: 42 additions & 69 deletions iris-mpc/src/bin/server.rs
Expand Up @@ -51,8 +51,8 @@ use iris_mpc_gpu::{
},
};
use iris_mpc_store::{
-    fetch_and_parse_chunks, fetch_to_memory, last_snapshot_timestamp, DbStoredIris, S3Store,
-    S3StoredIris, Store, StoredIrisRef,
+    fetch_and_parse_chunks, last_snapshot_timestamp, DbStoredIris, S3Store, S3StoredIris, Store,
+    StoredIrisRef,
};
use metrics_exporter_statsd::StatsdBuilder;
use reqwest::StatusCode;
@@ -1226,7 +1226,6 @@ async fn server_main(config: Config) -> eyre::Result<()> {
// prepare the handle for the rest of the page locks
let page_lock_handle = spawn_blocking(move || {
for i in 1..n_page_lock_iters {
tracing::info!("Page-locking chunk {}", i);
let dbs_clone = dbs.clone();
let device_manager_clone = device_manager_clone.clone();
for (db, code_length) in dbs_clone {
@@ -1239,14 +1238,10 @@ async fn server_main(config: Config) -> eyre::Result<()> {
code_length,
);
}
tracing::info!("Page-locking chunk {} completed", i);
}
});

now = Instant::now();
-let mut load_summary_ts = Instant::now();
-let mut time_waiting_for_stream = Duration::from_secs(0);
-let mut time_loading_into_memory = Duration::from_secs(0);
let mut record_counter = 0;
let mut all_serial_ids: HashSet<i64> =
HashSet::from_iter(1..=(store_len as i64));
@@ -1261,24 +1256,6 @@ async fn server_main(config: Config) -> eyre::Result<()> {
)
.await?;

-if config.test_load_into_memory {
-    let fetch_test_ts = Instant::now();
-    fetch_to_memory(
-        Arc::new(db_chunks_s3_store),
-        load_chunks_parallelism,
-        db_chunks_folder_name.clone(),
-        last_snapshot_details.clone(),
-    )
-    .await?;
-    let elapsed = fetch_test_ts.elapsed();
-    tracing::info!(
-        "Fetch to memory took {:?} with {} par",
-        elapsed,
-        load_chunks_parallelism
-    );
-    load_summary_ts = Instant::now();
-}

let min_last_modified_at = last_snapshot_details.timestamp
- config.db_load_safety_overlap_seconds;
tracing::info!(
@@ -1287,23 +1264,6 @@ async fn server_main(config: Config) -> eyre::Result<()> {
min_last_modified_at
);

-let stream_db = store
-    .stream_irises_par(Some(min_last_modified_at), parallelism)
-    .await
-    .boxed();
-load_db_records(
-    &mut actor,
-    load_summary_ts,
-    time_waiting_for_stream,
-    time_loading_into_memory,
-    record_counter,
-    &mut all_serial_ids,
-    stream_db,
-)
-.await;
-
-let n_loaded_from_db = record_counter;
-let mut n_skipped_from_s3 = 0;
let s3_store = S3Store::new(db_chunks_s3_client, db_chunks_bucket_name);
let s3_arc = Arc::new(s3_store);

@@ -1319,26 +1279,29 @@ async fn server_main(config: Config) -> eyre::Result<()> {
tx.clone(),
)
.await
.expect("Couldn't fetch and parse chunks");
.expect("Couldn't fetch and parse chunks from s3");
});

-time_waiting_for_stream = Duration::from_secs(0);
-time_loading_into_memory = Duration::from_secs(0);
-load_summary_ts = Instant::now();
+let mut time_waiting_for_stream = Duration::from_secs(0);
+let mut time_loading_into_memory = Duration::from_secs(0);
+let mut load_summary_ts = Instant::now();
while let Some(iris) = rx.recv().await {
time_waiting_for_stream += load_summary_ts.elapsed();
load_summary_ts = Instant::now();
let index = iris.index();

-// if record already loaded via DB, do not override it from S3
-if !all_serial_ids.contains(&(index as i64)) {
-    n_skipped_from_s3 += 1;
-    continue;
-}
-if index == 0 || index > store_len {
+if index == 0 {
tracing::error!("Invalid iris index {}", index);
return Err(eyre!("Invalid iris index {}", index));
+} else if index > store_len {
+    tracing::warn!(
+        "Skipping loading from S3: index {} > store_len {}",
+        index,
+        store_len
+    );
+    continue;
}

actor.load_single_record_from_s3(
iris.index() - 1,
iris.left_code_odd(),
@@ -1374,21 +1337,29 @@ async fn server_main(config: Config) -> eyre::Result<()> {
}
tracing::info!(
"S3 Loading summary => Loaded {:?} items. Waited for stream: \
-{:?}, Loaded into memory: {:?}. Skipped loading from S3: {:?}",
-record_counter - n_loaded_from_db,
+{:?}, Loaded into memory: {:?}.",
+record_counter,
time_waiting_for_stream,
time_loading_into_memory,
-n_skipped_from_s3,
);

+let stream_db = store
+    .stream_irises_par(Some(min_last_modified_at), parallelism)
+    .await
+    .boxed();
+load_db_records(
+    &mut actor,
+    record_counter,
+    &mut all_serial_ids,
+    stream_db,
+)
+.await;
} else {
tracing::info!("S3 importer disabled. Fetching only from db");
let stream_db =
store.stream_irises_par(None, parallelism).await.boxed();
load_db_records(
&mut actor,
-load_summary_ts,
-time_waiting_for_stream,
-time_loading_into_memory,
record_counter,
&mut all_serial_ids,
stream_db,
@@ -1781,43 +1752,45 @@ async fn server_main(config: Config) -> eyre::Result<()> {

async fn load_db_records<'a>(
actor: &mut ServerActor,
-mut load_summary_ts: Instant,
-mut time_waiting_for_stream: Duration,
-mut time_loading_into_memory: Duration,
mut record_counter: i32,
all_serial_ids: &mut HashSet<i64>,
mut stream_db: BoxStream<'a, eyre::Result<DbStoredIris>>,
) {
+let mut load_summary_ts = Instant::now();
+let mut time_waiting_for_stream = Duration::from_secs(0);
+let mut time_loading_into_memory = Duration::from_secs(0);
+let n_loaded_via_s3 = record_counter;
while let Some(iris) = stream_db.next().await {
// Update time waiting for the stream
time_waiting_for_stream += load_summary_ts.elapsed();
load_summary_ts = Instant::now();

-let iris = iris.unwrap(); // Handle the Result from the stream
+let iris = iris.unwrap();

-// Process the iris record
actor.load_single_record_from_db(
iris.index() - 1,
iris.left_code(),
iris.left_mask(),
iris.right_code(),
iris.right_mask(),
);
-actor.increment_db_size(iris.index() - 1);

+// Only increment db size if record has not been loaded via s3 before
+if all_serial_ids.contains(&(iris.index() as i64)) {
+    actor.increment_db_size(iris.index() - 1);
+    all_serial_ids.remove(&(iris.index() as i64));
+    record_counter += 1;
+}

// Update time spent loading into memory
time_loading_into_memory += load_summary_ts.elapsed();
load_summary_ts = Instant::now();

-// Remove serial ID and increment record counter
-all_serial_ids.remove(&(iris.index() as i64));
-record_counter += 1;
}

tracing::info!(
"Aurora Loading summary => Loaded {:?} items. Waited for stream: {:?}, Loaded into \
memory: {:?}",
-record_counter,
+record_counter - n_loaded_via_s3,
time_waiting_for_stream,
time_loading_into_memory,
);
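Two patterns in the server.rs changes deserve a note. First, everything after the first page-lock chunk is pinned on a blocking thread so it overlaps with record streaming; a minimal sketch of that shape, with the loop body elided and all names assumptions:

    use tokio::task::spawn_blocking;

    // Sketch: overlap page-locking of the remaining chunks with record loading.
    async fn load_with_overlapped_page_lock(n_page_lock_iters: usize) -> eyre::Result<()> {
        let page_lock_handle = spawn_blocking(move || {
            for _i in 1..n_page_lock_iters {
                // register_host_memory(...) for each remaining chunk; chunk 0 is
                // assumed to be pinned up front, before this task starts.
            }
        });

        // ... stream irises from S3 and then from the DB here, concurrently ...

        page_lock_handle.await?; // join (and surface panics) before serving
        Ok(())
    }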
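Second, the load order is inverted to match the commit title: S3 chunks stream in first and claim serial IDs, then the Aurora stream overwrites any rows newer than the snapshot, counting only IDs that S3 never claimed. The S3 side of the bookkeeping is collapsed out of the diff above, so treat its exact behavior as an assumption; a self-contained sketch of the intended accounting:

    use std::collections::HashSet;

    fn main() {
        let store_len = 5_i64;
        let mut all_serial_ids: HashSet<i64> = (1..=store_len).collect();

        // S3 phase: indices 1..=3 are loaded and claim their serial IDs.
        for s3_index in [1_i64, 2, 3] {
            all_serial_ids.remove(&s3_index);
        }

        // DB phase: indices 3..=5 arrive via the safety-overlap window. Index 3
        // overwrites the S3 copy in memory but is not double-counted.
        let mut db_record_counter = 0;
        for db_index in [3_i64, 4, 5] {
            if all_serial_ids.remove(&db_index) {
                db_record_counter += 1;
            }
        }

        assert_eq!(db_record_counter, 2);
        assert!(all_serial_ids.is_empty()); // every expected record accounted for
    }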
