diff --git a/.github/workflows/temp-branch-build-and-push.yaml b/.github/workflows/temp-branch-build-and-push.yaml
new file mode 100644
index 000000000..461118538
--- /dev/null
+++ b/.github/workflows/temp-branch-build-and-push.yaml
@@ -0,0 +1,47 @@
+name: Branch - Build and push docker image
+
+on:
+  push:
+    branches:
+      - "ertugrul/change-to-binary"
+
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
+  cancel-in-progress: true
+
+env:
+  REGISTRY: ghcr.io
+  IMAGE_NAME: ${{ github.repository }}
+
+jobs:
+  docker:
+    runs-on:
+      labels: ubuntu-22.04-64core
+    permissions:
+      packages: write
+      contents: read
+      attestations: write
+      id-token: write
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v3
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v3
+      - name: Log in to the Container registry
+        uses: docker/login-action@v3
+        with:
+          registry: ${{ env.REGISTRY }}
+          username: ${{ github.repository_owner }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+      - name: Build and Push
+        uses: docker/build-push-action@v6
+        with:
+          context: .
+          push: true
+          tags: |
+            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
+          platforms: linux/amd64
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
diff --git a/deploy/stage/common-values-iris-mpc.yaml b/deploy/stage/common-values-iris-mpc.yaml
index 58c5a22b2..cdaa36dab 100644
--- a/deploy/stage/common-values-iris-mpc.yaml
+++ b/deploy/stage/common-values-iris-mpc.yaml
@@ -1,4 +1,4 @@
-image: "ghcr.io/worldcoin/iris-mpc:v0.12.3"
+image: "ghcr.io/worldcoin/iris-mpc:v0.12.4"

 environment: stage
 replicaCount: 1
@@ -82,4 +82,4 @@ preStop:
   sleepPeriod: 10

 # terminationGracePeriodSeconds specifies the grace time between SIGTERM and SIGKILL
-terminationGracePeriodSeconds: 180 # 3x SMPC__PROCESSING_TIMEOUT_SECS
+terminationGracePeriodSeconds: 20
diff --git a/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml b/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml
index 58dd3cf9d..79f2885a9 100644
--- a/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml
+++ b/deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml
@@ -74,6 +74,12 @@ env:
   - name: SMPC__DB_CHUNKS_BUCKET_NAME
     value: "iris-mpc-db-exporter-store-node-0-stage-eu-north-1"

+  - name: SMPC__DB_CHUNKS_FOLDER_NAME
+    value: "binary_output_2k"
+
+  - name: SMPC__LOAD_CHUNKS_PARALLELISM
+    value: "32"
+
   - name: SMPC__CLEAR_DB_BEFORE_INIT
     value: "true"

diff --git a/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml b/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml
index d5c66effd..902c69a54 100644
--- a/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml
+++ b/deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml
@@ -74,6 +74,12 @@ env:
   - name: SMPC__DB_CHUNKS_BUCKET_NAME
     value: "iris-mpc-db-exporter-store-node-1-stage-eu-north-1"

+  - name: SMPC__DB_CHUNKS_FOLDER_NAME
+    value: "binary_output_2k"
+
+  - name: SMPC__LOAD_CHUNKS_PARALLELISM
+    value: "32"
+
   - name: SMPC__CLEAR_DB_BEFORE_INIT
     value: "true"

diff --git a/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml b/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml
index 9d57157b6..2f497bb7c 100644
--- a/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml
+++ b/deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml
@@ -74,6 +74,12 @@ env:
   - name: SMPC__DB_CHUNKS_BUCKET_NAME
     value: "iris-mpc-db-exporter-store-node-2-stage-eu-north-1"

+  - name: SMPC__DB_CHUNKS_FOLDER_NAME
+    value: "binary_output_2k"
+
+  - name: SMPC__LOAD_CHUNKS_PARALLELISM
+    value: "32"
+
   - name: SMPC__CLEAR_DB_BEFORE_INIT
     value: "true"

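Note on the two new stage settings: they feed the `db_chunks_folder_name` and `load_chunks_parallelism` fields of the Rust `Config` below. The service's real config loader is not part of this diff, so purely as an illustration of the contract (helper name and fallback values are made up, not the service defaults), reading them by hand would look like:

```rust
use std::env;

// Illustrative only: mirrors the SMPC__* variables set in the values files above.
fn read_chunk_import_settings() -> (String, usize) {
    let folder = env::var("SMPC__DB_CHUNKS_FOLDER_NAME")
        .unwrap_or_default(); // e.g. "binary_output_2k"
    let parallelism = env::var("SMPC__LOAD_CHUNKS_PARALLELISM")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(1); // placeholder fallback, not the real default
    (folder, parallelism)
}
```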
diff --git a/iris-mpc-common/src/config/mod.rs b/iris-mpc-common/src/config/mod.rs
index f1bf0e2ae..666884068 100644
--- a/iris-mpc-common/src/config/mod.rs
+++ b/iris-mpc-common/src/config/mod.rs
@@ -102,6 +102,9 @@ pub struct Config {
     /// updated during the DB export to S3
     #[serde(default = "default_db_load_safety_overlap_seconds")]
     pub db_load_safety_overlap_seconds: i64,
+
+    #[serde(default)]
+    pub db_chunks_folder_name: String,
 }

 fn default_load_chunks_parallelism() -> usize {
diff --git a/iris-mpc-store/src/lib.rs b/iris-mpc-store/src/lib.rs
index 32e62705e..ee3792448 100644
--- a/iris-mpc-store/src/lib.rs
+++ b/iris-mpc-store/src/lib.rs
@@ -1,3 +1,5 @@
+#![feature(int_roundings)]
+
 mod s3_importer;

 use bytemuck::cast_slice;
@@ -10,6 +12,7 @@ use iris_mpc_common::{
     config::Config,
     galois_engine::degree4::{GaloisRingIrisCodeShare, GaloisRingTrimmedMaskCodeShare},
     iris_db::iris::IrisCode,
+    IRIS_CODE_LENGTH, MASK_CODE_LENGTH,
 };
 use rand::{rngs::StdRng, Rng, SeedableRng};
 pub use s3_importer::{fetch_and_parse_chunks, last_snapshot_timestamp, ObjectStore, S3Store};
@@ -74,6 +77,43 @@ impl StoredIris {
     pub fn id(&self) -> i64 {
         self.id
     }
+
+    pub fn from_bytes(bytes: &[u8]) -> Result<StoredIris> {
+        let mut cursor = 0;
+
+        // Helper closure to extract a slice of a given size
+        let extract_slice =
+            |bytes: &[u8], cursor: &mut usize, size: usize| -> Result<Vec<u8>, eyre::Error> {
+                if *cursor + size > bytes.len() {
+                    return Err(eyre!("Exceeded total bytes while extracting slice"));
+                }
+                let slice = &bytes[*cursor..*cursor + size];
+                *cursor += size;
+                Ok(slice.to_vec())
+            };
+
+        // Parse `id`: stored as a big-endian u32, widened to i64 for the DB representation
+        let id_bytes = extract_slice(bytes, &mut cursor, 4)?;
+        let id = u32::from_be_bytes(
+            id_bytes
+                .try_into()
+                .map_err(|_| eyre!("Failed to convert id bytes to u32"))?,
+        ) as i64;
+
+        // Parse codes and masks
+        let left_code = extract_slice(bytes, &mut cursor, IRIS_CODE_LENGTH * size_of::<u16>())?;
+        let left_mask = extract_slice(bytes, &mut cursor, MASK_CODE_LENGTH * size_of::<u16>())?;
+        let right_code = extract_slice(bytes, &mut cursor, IRIS_CODE_LENGTH * size_of::<u16>())?;
+        let right_mask = extract_slice(bytes, &mut cursor, MASK_CODE_LENGTH * size_of::<u16>())?;
+
+        Ok(StoredIris {
+            id,
+            left_code,
+            left_mask,
+            right_code,
+            right_mask,
+        })
+    }
 }

 #[derive(Clone)]
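For reference, `StoredIris::from_bytes` assumes a fixed-layout record: a big-endian `u32` serial id followed by the raw `left_code`, `left_mask`, `right_code` and `right_mask` byte buffers. The exporter that writes the `.bin` chunks is not part of this diff, so this is only a minimal sketch of the matching writer side (it mirrors the `add_test_data` helper in the tests further down; the function name is hypothetical):

```rust
/// Sketch of the writer side of the layout `StoredIris::from_bytes` parses.
/// The four slices are expected to already be IRIS_CODE_LENGTH * 2 and
/// MASK_CODE_LENGTH * 2 bytes long, respectively.
fn encode_stored_iris(
    id: u32,
    left_code: &[u8],
    left_mask: &[u8],
    right_code: &[u8],
    right_mask: &[u8],
) -> Vec<u8> {
    let mut out = Vec::with_capacity(
        4 + left_code.len() + left_mask.len() + right_code.len() + right_mask.len(),
    );
    out.extend_from_slice(&id.to_be_bytes()); // big-endian, matches u32::from_be_bytes
    out.extend_from_slice(left_code);
    out.extend_from_slice(left_mask);
    out.extend_from_slice(right_code);
    out.extend_from_slice(right_mask);
    out
}
```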
diff --git a/iris-mpc-store/src/s3_importer.rs b/iris-mpc-store/src/s3_importer.rs
index 0ec5f4ab8..896b4af64 100644
--- a/iris-mpc-store/src/s3_importer.rs
+++ b/iris-mpc-store/src/s3_importer.rs
@@ -4,20 +4,17 @@ use aws_sdk_s3::Client;
 use bytes::Bytes;
 use futures::{stream, Stream, StreamExt};
 use iris_mpc_common::{IRIS_CODE_LENGTH, MASK_CODE_LENGTH};
-use rayon::{iter::ParallelIterator, prelude::ParallelBridge};
-use serde::Deserialize;
-use std::{io::Cursor, mem, pin::Pin, sync::Arc};
+use std::{mem, pin::Pin, sync::Arc, time::Instant};
 use tokio::task;

 const SINGLE_ELEMENT_SIZE: usize = IRIS_CODE_LENGTH * mem::size_of::<u16>() * 2
     + MASK_CODE_LENGTH * mem::size_of::<u16>() * 2
     + mem::size_of::<u32>(); // 75 KB
-const CSV_BUFFER_CAPACITY: usize = SINGLE_ELEMENT_SIZE * 10;

 #[async_trait]
 pub trait ObjectStore: Send + Sync + 'static {
     async fn get_object(&self, key: &str) -> eyre::Result<Bytes>;
-    async fn list_objects(&self) -> eyre::Result<Vec<String>>;
+    async fn list_objects(&self, prefix: &str) -> eyre::Result<Vec<String>>;
 }

 pub struct S3Store {
@@ -46,12 +43,16 @@ impl ObjectStore for S3Store {
         Ok(data.into_bytes())
     }

-    async fn list_objects(&self) -> eyre::Result<Vec<String>> {
+    async fn list_objects(&self, prefix: &str) -> eyre::Result<Vec<String>> {
         let mut objects = Vec::new();
         let mut continuation_token = None;

         loop {
-            let mut request = self.client.list_objects_v2().bucket(&self.bucket);
+            let mut request = self
+                .client
+                .list_objects_v2()
+                .bucket(&self.bucket)
+                .prefix(prefix);

             if let Some(token) = continuation_token {
                 request = request.continuation_token(token);
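The fixed-offset parsing below only works because `SINGLE_ELEMENT_SIZE` above gives every record in a `.bin` chunk the same width. Assuming `IRIS_CODE_LENGTH = 12_800` and `MASK_CODE_LENGTH = 6_400` (assumed values; the real constants live in `iris-mpc-common` and are not shown in this diff), the arithmetic behind the "75 KB" comment works out as:

```rust
// Assumed values; IRIS_CODE_LENGTH and MASK_CODE_LENGTH are defined in iris-mpc-common.
const IRIS_CODE_LENGTH: usize = 12_800;
const MASK_CODE_LENGTH: usize = 6_400;

const SINGLE_ELEMENT_SIZE: usize = IRIS_CODE_LENGTH * std::mem::size_of::<u16>() * 2 // both eyes' codes
    + MASK_CODE_LENGTH * std::mem::size_of::<u16>() * 2 // both eyes' masks
    + std::mem::size_of::<u32>(); // serial id

fn main() {
    // 51_200 + 25_600 + 4 = 76_804 bytes per record, i.e. roughly 75 KiB.
    assert_eq!(SINGLE_ELEMENT_SIZE, 76_804);
    println!("{SINGLE_ELEMENT_SIZE}");
}
```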
@@ -76,101 +77,98 @@ impl ObjectStore for S3Store {
     }
 }

-#[derive(Debug, Deserialize)]
-struct CsvIrisRecord {
-    id: String,
-    left_code: String,
-    left_mask: String,
-    right_code: String,
-    right_mask: String,
+#[derive(Debug)]
+pub struct LastSnapshotDetails {
+    pub timestamp: i64,
+    pub last_serial_id: i64,
+    pub chunk_size: i64,
 }

-fn hex_to_bytes(hex: &str, byte_len: usize) -> eyre::Result<Vec<u8>> {
-    if hex.is_empty() {
-        return Ok(vec![]);
+impl LastSnapshotDetails {
+    // Parse the last snapshot details from an S3 file name.
+    // It is in {unixTime}_{chunkSize}_{lastSerialId} format.
+    pub fn new_from_str(last_snapshot_str: &str) -> Option<Self> {
+        let parts: Vec<&str> = last_snapshot_str.split('_').collect();
+        match parts.len() {
+            3 => Some(Self {
+                timestamp: parts[0].parse().unwrap(),
+                chunk_size: parts[1].parse().unwrap(),
+                last_serial_id: parts[2].parse().unwrap(),
+            }),
+            _ => {
+                tracing::warn!("Invalid export timestamp file name: {}", last_snapshot_str);
+                None
+            }
+        }
     }
-    let mut bytes = vec![0; byte_len];
-    hex::decode_to_slice(hex, &mut bytes)?;
-    Ok(bytes)
 }

-pub async fn last_snapshot_timestamp(store: &impl ObjectStore) -> eyre::Result<i64> {
+pub async fn last_snapshot_timestamp(
+    store: &impl ObjectStore,
+    prefix_name: String,
+) -> eyre::Result<LastSnapshotDetails> {
+    tracing::info!("Looking for last snapshot time in prefix: {}", prefix_name);
+    let timestamps_path = format!("{}/timestamps/", prefix_name);
     store
-        .list_objects()
+        .list_objects(timestamps_path.as_str())
         .await?
         .into_iter()
-        .filter(|f| f.starts_with("output/") && f.ends_with(".timestamp"))
-        .filter_map(|f| {
-            f.replace(".timestamp", "")
-                .replace("output/", "")
-                .parse::<i64>()
-                .ok()
+        .filter_map(|f| match f.split('/').last() {
+            Some(file_name) => LastSnapshotDetails::new_from_str(file_name),
+            _ => None,
         })
-        .max()
+        .max_by_key(|s| s.timestamp)
        .ok_or_else(|| eyre::eyre!("No snapshot found"))
 }

 pub async fn fetch_and_parse_chunks(
     store: &impl ObjectStore,
     concurrency: usize,
+    prefix_name: String,
+    last_snapshot_details: LastSnapshotDetails,
 ) -> Pin<Box<dyn Stream<Item = eyre::Result<StoredIris>> + Send + '_>> {
-    let chunks = store.list_objects().await.unwrap();
-    stream::iter(chunks)
-        .filter_map(|chunk| async move {
-            if chunk.ends_with(".csv") {
-                tracing::info!("Processing chunk: {}", chunk);
-                Some(chunk)
-            } else {
-                None
-            }
-        })
+    tracing::info!("Generating chunk files using: {:?}", last_snapshot_details);
+    let chunks: Vec<String> = (1..=last_snapshot_details.last_serial_id)
+        .step_by(last_snapshot_details.chunk_size as usize)
+        .map(|num| format!("{}/{}.bin", prefix_name, num))
+        .collect();
+    tracing::info!("Generated {} chunk names", chunks.len());
+
+    let result_stream = stream::iter(chunks)
         .map(move |chunk| async move {
+            let mut now = Instant::now();
             let result = store.get_object(&chunk).await?;
-            task::spawn_blocking(move || {
-                let cursor = Cursor::new(result);
-                let reader = csv::ReaderBuilder::new()
-                    .has_headers(true)
-                    .buffer_capacity(CSV_BUFFER_CAPACITY)
-                    .from_reader(cursor);
-
-                let records: Vec<eyre::Result<StoredIris>> = reader
-                    .into_deserialize()
-                    .par_bridge()
-                    .map(|r: Result<CsvIrisRecord, csv::Error>| {
-                        let raw = r.map_err(|e| eyre::eyre!("CSV parse error: {}", e))?;
-
-                        Ok(StoredIris {
-                            id: raw.id.parse()?,
-                            left_code: hex_to_bytes(
-                                &raw.left_code,
-                                IRIS_CODE_LENGTH * mem::size_of::<u16>(),
-                            )?,
-                            left_mask: hex_to_bytes(
-                                &raw.left_mask,
-                                MASK_CODE_LENGTH * mem::size_of::<u16>(),
-                            )?,
-                            right_code: hex_to_bytes(
-                                &raw.right_code,
-                                IRIS_CODE_LENGTH * mem::size_of::<u16>(),
-                            )?,
-                            right_mask: hex_to_bytes(
-                                &raw.right_mask,
-                                MASK_CODE_LENGTH * mem::size_of::<u16>(),
-                            )?,
-                        })
-                    })
-                    .collect();
+            let get_object_time = now.elapsed();
+            tracing::info!("Got chunk object: {} in {:?}", chunk, get_object_time);
+
+            now = Instant::now();
+            let task = task::spawn_blocking(move || {
+                let n_records = result.len().div_floor(SINGLE_ELEMENT_SIZE);
+
+                let mut records = Vec::with_capacity(n_records);
+                for i in 0..n_records {
+                    let start = i * SINGLE_ELEMENT_SIZE;
+                    let end = (i + 1) * SINGLE_ELEMENT_SIZE;
+                    let chunk = &result[start..end];
+                    let iris = StoredIris::from_bytes(chunk);
+                    records.push(iris);
+                }
                 Ok::<_, eyre::Error>(stream::iter(records))
             })
-            .await?
+            .await?;
+            let parse_time = now.elapsed();
+            tracing::info!("Parsed chunk: {} in {:?}", chunk, parse_time);
+            task
        })
         .buffer_unordered(concurrency)
         .flat_map(|result| match result {
             Ok(stream) => stream.boxed(),
             Err(e) => stream::once(async move { Err(e) }).boxed(),
         })
-        .boxed()
+        .boxed();
+
+    result_stream
 }

 #[cfg(test)]
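With the binary layout, the data chunk keys are derived purely from `last_serial_id` and `chunk_size`, so no `list_objects` call is needed for the data files themselves. A small illustration with made-up numbers (assumes `LastSnapshotDetails` is in scope; the prefix matches the stage values above):

```rust
// Made-up snapshot: 5_000 records exported in chunks of 2_000 under "binary_output_2k".
fn example_chunk_keys() -> Vec<String> {
    let details = LastSnapshotDetails {
        timestamp: 1_700_000_000,
        last_serial_id: 5_000,
        chunk_size: 2_000,
    };
    // Same key-generation rule as fetch_and_parse_chunks:
    // yields "binary_output_2k/1.bin", ".../2001.bin", ".../4001.bin".
    (1..=details.last_serial_id)
        .step_by(details.chunk_size as usize)
        .map(|num| format!("binary_output_2k/{}.bin", num))
        .collect()
}
```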
@@ -189,27 +187,20 @@ mod tests {
             Self::default()
         }

+        pub fn add_timestamp_file(&mut self, key: &str) {
+            self.objects.insert(key.to_string(), Vec::new());
+        }
+
         pub fn add_test_data(&mut self, key: &str, records: Vec<StoredIris>) {
-            let mut csv = Vec::new();
-            {
-                let mut writer = csv::Writer::from_writer(&mut csv);
-                writer
-                    .write_record(["id", "left_code", "left_mask", "right_code", "right_mask"])
-                    .unwrap();
-
-                for record in records {
-                    writer
-                        .write_record(&[
-                            record.id.to_string(),
-                            hex::encode(record.left_code),
-                            hex::encode(record.left_mask),
-                            hex::encode(record.right_code),
-                            hex::encode(record.right_mask),
-                        ])
-                        .unwrap();
-                }
+            let mut result = Vec::new();
+            for record in records {
+                result.extend_from_slice(&(record.id as u32).to_be_bytes());
+                result.extend_from_slice(&record.left_code);
+                result.extend_from_slice(&record.left_mask);
+                result.extend_from_slice(&record.right_code);
+                result.extend_from_slice(&record.right_mask);
             }
-            self.objects.insert(key.to_string(), csv);
+            self.objects.insert(key.to_string(), result);
         }
     }

@@ -223,7 +214,7 @@ mod tests {
                 .ok_or_else(|| eyre::eyre!("Object not found: {}", key))
         }

-        async fn list_objects(&self) -> eyre::Result<Vec<String>> {
+        async fn list_objects(&self, _: &str) -> eyre::Result<Vec<String>> {
             Ok(self.objects.keys().cloned().collect())
         }
     }
@@ -245,6 +236,21 @@
         }
     }

+    #[tokio::test]
+    async fn test_last_snapshot_timestamp() {
+        let mut store = MockStore::new();
+        store.add_timestamp_file("out/timestamps/123_100_954");
+        store.add_timestamp_file("out/timestamps/124_100_958");
+        store.add_timestamp_file("out/timestamps/125_100_958");
+
+        let last_snapshot = last_snapshot_timestamp(&store, "out".to_string())
+            .await
+            .unwrap();
+        assert_eq!(last_snapshot.timestamp, 125);
+        assert_eq!(last_snapshot.last_serial_id, 958);
+        assert_eq!(last_snapshot.chunk_size, 100);
+    }
+
     #[tokio::test]
     async fn test_fetch_and_parse_chunks() {
         const MOCK_ENTRIES: usize = 107;
@@ -255,17 +261,19 @@ mod tests {
             let start_serial_id = i * MOCK_CHUNK_SIZE + 1;
             let end_serial_id = min((i + 1) * MOCK_CHUNK_SIZE, MOCK_ENTRIES);
             store.add_test_data(
-                &format!("{start_serial_id}.csv"),
+                &format!("out/{start_serial_id}.bin"),
                 (start_serial_id..=end_serial_id).map(dummy_entry).collect(),
             );
         }

-        assert_eq!(
-            store.list_objects().await.unwrap().len(),
-            MOCK_ENTRIES.div_ceil(MOCK_CHUNK_SIZE)
-        );
-
-        let mut chunks = fetch_and_parse_chunks(&store, 1).await;
+        assert_eq!(store.list_objects("").await.unwrap().len(), n_chunks);
+        let last_snapshot_details = LastSnapshotDetails {
+            timestamp: 0,
+            last_serial_id: MOCK_ENTRIES as i64,
+            chunk_size: MOCK_CHUNK_SIZE as i64,
+        };
+        let mut chunks =
+            fetch_and_parse_chunks(&store, 1, "out".to_string(), last_snapshot_details).await;
         let mut count = 0;
         let mut ids: HashSet<usize> = HashSet::from_iter(1..MOCK_ENTRIES);
         while let Some(chunk) = chunks.next().await {
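The `test_last_snapshot_timestamp` test above encodes the naming convention directly: `out/timestamps/125_100_958` means timestamp 125, chunk size 100, last serial id 958. For completeness, a small sketch of exercising the parser on its own, using the same made-up names as the test (note that `new_from_str` `unwrap`s the numeric fields, so a three-part name is expected to be numeric):

```rust
#[test]
fn parses_snapshot_file_name() {
    // Same {unixTime}_{chunkSize}_{lastSerialId} convention as the timestamp files above.
    let details = LastSnapshotDetails::new_from_str("125_100_958").expect("valid name");
    assert_eq!(details.timestamp, 125);
    assert_eq!(details.chunk_size, 100);
    assert_eq!(details.last_serial_id, 958);

    // Anything that does not split into exactly three '_' parts is logged and skipped.
    assert!(LastSnapshotDetails::new_from_str("latest.timestamp").is_none());
}
```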
diff --git a/iris-mpc/src/bin/server.rs b/iris-mpc/src/bin/server.rs
index cf60621eb..6b7208ccf 100644
--- a/iris-mpc/src/bin/server.rs
+++ b/iris-mpc/src/bin/server.rs
@@ -905,6 +905,7 @@ async fn server_main(config: Config) -> eyre::Result<()> {
     let load_chunks_parallelism = config.load_chunks_parallelism;
     let db_chunks_bucket_name = config.db_chunks_bucket_name.clone();
+    let db_chunks_folder_name = config.db_chunks_folder_name.clone();

     let (tx, rx) = oneshot::channel();
     background_tasks.spawn_blocking(move || {
@@ -989,15 +990,27 @@ async fn server_main(config: Config) -> eyre::Result<()> {
                     true => {
                         tracing::info!("S3 importer enabled. Fetching from s3 + db");
                         // First fetch last snapshot from S3
-                        let last_snapshot_timestamp =
-                            last_snapshot_timestamp(&s3_store).await?;
-                        let min_last_modified_at =
-                            last_snapshot_timestamp - config.db_load_safety_overlap_seconds;
-                        let stream_s3 =
-                            fetch_and_parse_chunks(&s3_store, load_chunks_parallelism)
-                                .await
-                                .map(|result| result.map(IrisSource::S3))
-                                .boxed();
+                        let last_snapshot_details = last_snapshot_timestamp(
+                            &s3_store,
+                            db_chunks_folder_name.clone(),
+                        )
+                        .await?;
+                        let min_last_modified_at = last_snapshot_details.timestamp
+                            - config.db_load_safety_overlap_seconds;
+                        tracing::info!(
+                            "Last snapshot timestamp: {}, min_last_modified_at: {}",
+                            last_snapshot_details.timestamp,
+                            min_last_modified_at
+                        );
+                        let stream_s3 = fetch_and_parse_chunks(
+                            &s3_store,
+                            load_chunks_parallelism,
+                            db_chunks_folder_name,
+                            last_snapshot_details,
+                        )
+                        .await
+                        .map(|result| result.map(IrisSource::S3))
+                        .boxed();

                         let stream_db = store
                             .stream_irises_par(Some(min_last_modified_at), parallelism)
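For completeness, a minimal sketch of draining the stream returned by `fetch_and_parse_chunks` on its own (the real server interleaves it with the Postgres stream via `IrisSource`, which is outside this excerpt); the function name is hypothetical and `store`, `prefix`, and `details` are assumed to be provided by the caller:

```rust
use futures::StreamExt;

// Minimal consumption sketch, assuming an ObjectStore, a prefix string and a
// LastSnapshotDetails value are already available.
async fn count_imported_irises(
    store: &impl ObjectStore,
    prefix: String,
    details: LastSnapshotDetails,
) -> eyre::Result<usize> {
    let mut stream = fetch_and_parse_chunks(store, 32, prefix, details).await;
    let mut count = 0usize;
    while let Some(iris) = stream.next().await {
        let iris = iris?; // each item is an eyre::Result<StoredIris>
        tracing::debug!("loaded iris with serial id {}", iris.id());
        count += 1;
    }
    Ok(count)
}
```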