From 2a5c7a20b9f7cffccab773b3db007d9a9d85c39a Mon Sep 17 00:00:00 2001
From: Ertugrul Aypek
Date: Mon, 9 Dec 2024 19:09:53 +0100
Subject: [PATCH] convert to binary

---
 iris-mpc-store/src/lib.rs         | 39 +++++++++++++
 iris-mpc-store/src/s3_importer.rs | 93 +++++++------------------------
 2 files changed, 58 insertions(+), 74 deletions(-)

diff --git a/iris-mpc-store/src/lib.rs b/iris-mpc-store/src/lib.rs
index c0d504674..66db5ec06 100644
--- a/iris-mpc-store/src/lib.rs
+++ b/iris-mpc-store/src/lib.rs
@@ -1,3 +1,5 @@
+#![feature(int_roundings)]
+
 mod s3_importer;
 
 use bytemuck::cast_slice;
@@ -74,6 +76,43 @@ impl StoredIris {
     pub fn id(&self) -> i64 {
         self.id
     }
+
+    pub fn from_bytes(bytes: &[u8]) -> Result<StoredIris> {
+        let mut cursor = 0;
+
+        // Helper closure to extract a slice of a given size
+        let extract_slice =
+            |bytes: &[u8], cursor: &mut usize, size: usize| -> Result<Vec<u8>, eyre::Error> {
+                if *cursor + size > bytes.len() {
+                    return Err(eyre!("Exceeded total bytes while extracting slice"));
+                }
+                let slice = &bytes[*cursor..*cursor + size];
+                *cursor += size;
+                Ok(slice.to_vec())
+            };
+
+        // Parse `id`: stored as a big-endian u32 (4 bytes), widened to i64
+        let id_bytes = extract_slice(bytes, &mut cursor, 4)?;
+        let id = u32::from_be_bytes(
+            id_bytes
+                .try_into()
+                .map_err(|_| eyre!("Failed to convert id bytes to u32"))?,
+        ) as i64;
+
+        // parse codes and masks
+        let left_code = extract_slice(bytes, &mut cursor, 25_600)?;
+        let left_mask = extract_slice(bytes, &mut cursor, 12_800)?;
+        let right_code = extract_slice(bytes, &mut cursor, 25_600)?;
+        let right_mask = extract_slice(bytes, &mut cursor, 12_800)?;
+
+        Ok(StoredIris {
+            id,
+            left_code,
+            left_mask,
+            right_code,
+            right_mask,
+        })
+    }
 }
 
 #[derive(Clone)]
diff --git a/iris-mpc-store/src/s3_importer.rs b/iris-mpc-store/src/s3_importer.rs
index d1981e81b..defabdd42 100644
--- a/iris-mpc-store/src/s3_importer.rs
+++ b/iris-mpc-store/src/s3_importer.rs
@@ -1,22 +1,18 @@
 use crate::StoredIris;
 use async_trait::async_trait;
 use aws_sdk_s3::Client;
-use bytes::Bytes;
 use futures::{stream, Stream, StreamExt};
 use iris_mpc_common::{IRIS_CODE_LENGTH, MASK_CODE_LENGTH};
-use rayon::{iter::ParallelIterator, prelude::ParallelBridge};
-use serde::Deserialize;
-use std::{io::Cursor, mem, pin::Pin, sync::Arc, time, time::Instant};
+use std::{mem, pin::Pin, sync::Arc, time::Instant};
 use tokio::task;
 
 const SINGLE_ELEMENT_SIZE: usize = IRIS_CODE_LENGTH * mem::size_of::<u16>() * 2
     + MASK_CODE_LENGTH * mem::size_of::<u16>() * 2
     + mem::size_of::<u32>(); // 75 KB
-const CSV_BUFFER_CAPACITY: usize = SINGLE_ELEMENT_SIZE * 10;
 
 #[async_trait]
 pub trait ObjectStore: Send + Sync + 'static {
-    async fn get_object(&self, key: &str) -> eyre::Result<Bytes>;
+    async fn get_object(&self, key: &str) -> eyre::Result<Vec<u8>>;
     async fn list_objects(&self) -> eyre::Result<Vec<String>>;
 }
 
@@ -33,7 +29,7 @@ impl S3Store {
 
 #[async_trait]
 impl ObjectStore for S3Store {
-    async fn get_object(&self, key: &str) -> eyre::Result<Bytes> {
+    async fn get_object(&self, key: &str) -> eyre::Result<Vec<u8>> {
         let result = self
             .client
             .get_object()
             .bucket(&self.bucket)
             .key(key)
             .send()
             .await?;
 
         let data = result.body.collect().await?;
-        Ok(data.into_bytes())
+        Ok(data.to_vec())
     }
 
     async fn list_objects(&self) -> eyre::Result<Vec<String>> {
@@ -76,24 +72,6 @@ impl ObjectStore for S3Store {
     }
 }
 
-#[derive(Debug, Deserialize)]
-struct CsvIrisRecord {
-    id: String,
-    left_code: String,
-    left_mask: String,
-    right_code: String,
-    right_mask: String,
-}
-
-fn hex_to_bytes(hex: &str, byte_len: usize) -> eyre::Result<Vec<u8>> {
-    if hex.is_empty() {
-        return Ok(vec![]);
-    }
-    let mut bytes = vec![0; byte_len];
-    hex::decode_to_slice(hex, &mut bytes)?;
-    Ok(bytes)
-}
-
 pub async fn last_snapshot_timestamp(store: &impl ObjectStore) -> eyre::Result<i64> {
     store
         .list_objects()
@@ -115,12 +93,10 @@ pub async fn fetch_and_parse_chunks(
     concurrency: usize,
 ) -> Pin<Box<dyn Stream<Item = eyre::Result<StoredIris>> + Send + '_>> {
     let chunks = store.list_objects().await.unwrap();
-    let mut total_get_object_time = time::Duration::from_secs(0);
-    let mut total_csv_parse_time = time::Duration::from_secs(0);
 
     let result_stream = stream::iter(chunks)
         .filter_map(|chunk| async move {
-            if chunk.ends_with(".csv") {
+            if chunk.ends_with(".bin") {
                 tracing::info!("Processing chunk: {}", chunk);
                 Some(chunk)
             } else {
                 None
             }
         })
@@ -132,50 +108,25 @@ pub async fn fetch_and_parse_chunks(
         .map(|chunk| async move {
             let mut now = Instant::now();
             let result = store.get_object(&chunk).await?;
             let get_object_time = now.elapsed();
             tracing::info!("Got chunk object: {} in {:?}", chunk, get_object_time,);
-            total_get_object_time += get_object_time;
 
             now = Instant::now();
             let task = task::spawn_blocking(move || {
-                let cursor = Cursor::new(result);
-                let reader = csv::ReaderBuilder::new()
-                    .has_headers(true)
-                    .buffer_capacity(CSV_BUFFER_CAPACITY)
-                    .from_reader(cursor);
-
-                let records: Vec<eyre::Result<StoredIris>> = reader
-                    .into_deserialize()
-                    .par_bridge()
-                    .map(|r: Result<CsvIrisRecord, csv::Error>| {
-                        let raw = r.map_err(|e| eyre::eyre!("CSV parse error: {}", e))?;
-
-                        Ok(StoredIris {
-                            id: raw.id.parse()?,
-                            left_code: hex_to_bytes(
-                                &raw.left_code,
-                                IRIS_CODE_LENGTH * mem::size_of::<u16>(),
-                            )?,
-                            left_mask: hex_to_bytes(
-                                &raw.left_mask,
-                                MASK_CODE_LENGTH * mem::size_of::<u16>(),
-                            )?,
-                            right_code: hex_to_bytes(
-                                &raw.right_code,
-                                IRIS_CODE_LENGTH * mem::size_of::<u16>(),
-                            )?,
-                            right_mask: hex_to_bytes(
-                                &raw.right_mask,
-                                MASK_CODE_LENGTH * mem::size_of::<u16>(),
-                            )?,
-                        })
-                    })
-                    .collect();
+                let n_records = result.len().div_floor(SINGLE_ELEMENT_SIZE);
+
+                let mut records = Vec::with_capacity(n_records);
+                for i in 0..n_records {
+                    let start = i * SINGLE_ELEMENT_SIZE;
+                    let end = (i + 1) * SINGLE_ELEMENT_SIZE;
+                    let chunk = &result[start..end];
+                    let iris = StoredIris::from_bytes(chunk);
+                    records.push(iris);
+                }
 
                 Ok::<_, eyre::Error>(stream::iter(records))
             })
             .await?;
-            let csv_parse_time = now.elapsed();
-            tracing::info!("Parsed csv chunk: {} in {:?}", chunk, csv_parse_time,);
-            total_csv_parse_time += csv_parse_time;
+            let parse_time = now.elapsed();
+            tracing::info!("Parsed chunk: {} in {:?}", chunk, parse_time,);
 
             task
         })
         .buffer_unordered(concurrency)
@@ -184,11 +135,6 @@ pub async fn fetch_and_parse_chunks(
             Err(e) => stream::once(async move { Err(e) }).boxed(),
         })
         .boxed();
-    tracing::info!(
-        "fetch_and_parse_chunks summary => Total get_object time: {:?}, Total csv parse time: {:?}",
-        total_get_object_time,
-        total_csv_parse_time,
-    );
 
     result_stream
 }
@@ -235,12 +181,11 @@ mod tests {
 
     #[async_trait]
     impl ObjectStore for MockStore {
-        async fn get_object(&self, key: &str) -> eyre::Result<Bytes> {
+        async fn get_object(&self, key: &str) -> eyre::Result<Vec<u8>> {
             self.objects
                 .get(key)
-                .cloned()
-                .map(Bytes::from)
                 .ok_or_else(|| eyre::eyre!("Object not found: {}", key))
+                .cloned()
         }
 
         async fn list_objects(&self) -> eyre::Result<Vec<String>> {
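
Note (reviewer sketch, not part of the patch): the importer above assumes each .bin object is a dense concatenation of fixed-size records, laid out exactly as StoredIris::from_bytes reads them: a 4-byte big-endian id, then left_code (25,600 bytes), left_mask (12,800), right_code (25,600) and right_mask (12,800), for SINGLE_ELEMENT_SIZE = 76,804 bytes per record (the "75 KB" in the constant's comment). A minimal sketch of the matching encoder; to_bytes is a hypothetical helper placed inside iris-mpc-store (so the private fields are visible), not something this patch adds:

    // Hypothetical encoder mirroring StoredIris::from_bytes: same field order,
    // same fixed sizes, id truncated to its low 32 bits and stored big-endian.
    pub fn to_bytes(iris: &StoredIris) -> Vec<u8> {
        let mut buf = Vec::with_capacity(4 + 2 * (25_600 + 12_800));
        buf.extend_from_slice(&(iris.id as u32).to_be_bytes()); // 4-byte id
        buf.extend_from_slice(&iris.left_code); // 25,600 bytes
        buf.extend_from_slice(&iris.left_mask); // 12,800 bytes
        buf.extend_from_slice(&iris.right_code); // 25,600 bytes
        buf.extend_from_slice(&iris.right_mask); // 12,800 bytes
        buf
    }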
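Under the same assumptions, a round-trip test next to the existing MockStore tests would pin the layout down; everything here except StoredIris, from_bytes and SINGLE_ELEMENT_SIZE is illustrative:

    #[test]
    fn test_binary_round_trip() {
        // One synthetic record with a recognizable fill byte per field.
        let original = StoredIris {
            id: 42,
            left_code: vec![1u8; 25_600],
            left_mask: vec![2u8; 12_800],
            right_code: vec![3u8; 25_600],
            right_mask: vec![4u8; 12_800],
        };
        let bytes = to_bytes(&original);
        assert_eq!(bytes.len(), SINGLE_ELEMENT_SIZE); // 76,804 bytes
        // from_bytes should recover the id and slice the buffers back out.
        let parsed = StoredIris::from_bytes(&bytes).unwrap();
        assert_eq!(parsed.id(), 42);
        assert_eq!(parsed.left_code, original.left_code);
        assert_eq!(parsed.right_mask, original.right_mask);
    }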