diff --git a/.github/workflows/temp-branch-build-and-push.yaml b/.github/workflows/temp-branch-build-and-push.yaml
new file mode 100644
index 000000000..f2dd284f9
--- /dev/null
+++ b/.github/workflows/temp-branch-build-and-push.yaml
@@ -0,0 +1,45 @@
+name: Branch - Build and push docker image
+
+on:
+  push:
+
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
+  cancel-in-progress: true
+
+env:
+  REGISTRY: ghcr.io
+  IMAGE_NAME: ${{ github.repository }}
+
+jobs:
+  docker:
+    runs-on:
+      labels: ubuntu-22.04-64core
+    permissions:
+      packages: write
+      contents: read
+      attestations: write
+      id-token: write
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v3
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v3
+      - name: Log in to the Container registry
+        uses: docker/login-action@v3
+        with:
+          registry: ${{ env.REGISTRY }}
+          username: ${{ github.repository_owner }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+      - name: Build and Push
+        uses: docker/build-push-action@v6
+        with:
+          context: .
+          push: true
+          tags: |
+            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
+          platforms: linux/amd64
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
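
Note on the workflow above: every push publishes exactly one tag, the commit SHA, so the image is addressable only as ghcr.io/<owner>/<repo>:<sha>. A minimal sketch of a downstream job consuming that image; the smoke-test job and its single step are illustrative assumptions, not part of this change:

      smoke-test:
        needs: docker            # assumes it lives in the same workflow file
        runs-on: ubuntu-22.04
        # Run the job inside the image the build job just pushed for this commit.
        container:
          image: ghcr.io/${{ github.repository }}:${{ github.sha }}
          credentials:
            username: ${{ github.repository_owner }}
            password: ${{ secrets.GITHUB_TOKEN }}
        steps:
          - run: echo "image boots"   # placeholder sanity check
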
diff --git a/iris-mpc-store/src/s3_importer.rs b/iris-mpc-store/src/s3_importer.rs
index 0ec5f4ab8..d1981e81b 100644
--- a/iris-mpc-store/src/s3_importer.rs
+++ b/iris-mpc-store/src/s3_importer.rs
@@ -6,7 +6,7 @@ use futures::{stream, Stream, StreamExt};
 use iris_mpc_common::{IRIS_CODE_LENGTH, MASK_CODE_LENGTH};
 use rayon::{iter::ParallelIterator, prelude::ParallelBridge};
 use serde::Deserialize;
-use std::{io::Cursor, mem, pin::Pin, sync::Arc};
+use std::{io::Cursor, mem, pin::Pin, sync::Arc, time, time::Instant};
 use tokio::task;
 
 const SINGLE_ELEMENT_SIZE: usize = IRIS_CODE_LENGTH * mem::size_of::<u16>() * 2
@@ -115,7 +115,10 @@ pub async fn fetch_and_parse_chunks(
     concurrency: usize,
 ) -> Pin<Box<dyn Stream<Item = eyre::Result<StoredIris>> + Send + '_>> {
     let chunks = store.list_objects().await.unwrap();
-    stream::iter(chunks)
+    let mut total_get_object_time = time::Duration::from_secs(0);
+    let mut total_csv_parse_time = time::Duration::from_secs(0);
+
+    let result_stream = stream::iter(chunks)
         .filter_map(|chunk| async move {
             if chunk.ends_with(".csv") {
                 tracing::info!("Processing chunk: {}", chunk);
@@ -125,8 +128,14 @@
             }
         })
         .map(move |chunk| async move {
+            let mut now = Instant::now();
             let result = store.get_object(&chunk).await?;
-            task::spawn_blocking(move || {
+            let get_object_time = now.elapsed();
+            tracing::info!("Got chunk object: {} in {:?}", chunk, get_object_time);
+            total_get_object_time += get_object_time;
+
+            now = Instant::now();
+            let task = task::spawn_blocking(move || {
                 let cursor = Cursor::new(result);
                 let reader = csv::ReaderBuilder::new()
                     .has_headers(true)
@@ -163,14 +172,25 @@
 
                 Ok::<_, eyre::Error>(stream::iter(records))
             })
-            .await?
+            .await?;
+            let csv_parse_time = now.elapsed();
+            tracing::info!("Parsed CSV chunk: {} in {:?}", chunk, csv_parse_time);
+            total_csv_parse_time += csv_parse_time;
+            task
         })
         .buffer_unordered(concurrency)
         .flat_map(|result| match result {
             Ok(stream) => stream.boxed(),
             Err(e) => stream::once(async move { Err(e) }).boxed(),
         })
-        .boxed()
+        .boxed();
+    tracing::info!(
+        "fetch_and_parse_chunks summary => Total get_object time: {:?}, Total CSV parse time: {:?}",
+        total_get_object_time,
+        total_csv_parse_time,
+    );
+
+    result_stream
 }
 
 #[cfg(test)]
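
Note on the timing instrumentation above: std::time::Duration is Copy, so each `async move` block captures and mutates its own copy of total_get_object_time / total_csv_parse_time, and the summary line runs before the returned stream has been polled at all; as written it will always report 0ns for both totals. A minimal, self-contained sketch of one way to accumulate timings across a buffered stream correctly, assuming the tokio and futures crates already used here (the item type, the sleep stand-in, and the counter name are illustrative, not this crate's API):

    // Cargo deps assumed: tokio = { version = "1", features = ["full"] }, futures = "0.3"
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{Duration, Instant};

    use futures::{stream, StreamExt};

    #[tokio::main]
    async fn main() {
        // Shared accumulator in nanoseconds. A captured `mut Duration` would be
        // copied into each `move` closure/async block; an Arc is truly shared.
        let total_ns = Arc::new(AtomicU64::new(0));

        let work = stream::iter(0..4u64)
            .map(|i| {
                let total_ns = Arc::clone(&total_ns);
                async move {
                    let start = Instant::now();
                    // Stand-in for store.get_object + CSV parsing.
                    tokio::time::sleep(Duration::from_millis(10 * (i + 1))).await;
                    total_ns.fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
                    i
                }
            })
            .buffer_unordered(2);

        // Streams are lazy: the total is only meaningful once the stream has
        // been drained, so the summary is logged after collect(), not before.
        let results: Vec<u64> = work.collect().await;
        println!(
            "processed {} items, total time inside tasks: {:?}",
            results.len(),
            Duration::from_nanos(total_ns.load(Ordering::Relaxed))
        );
    }

In fetch_and_parse_chunks itself the same pattern would mean cloning two Arc<AtomicU64> counters into the map closure, then either returning them alongside the stream or logging the summary from the consumer once the stream is exhausted; the atomics keep the hot path lock-free.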