add timing around get_object and parse csv
eaypek-tfh committed Dec 9, 2024
1 parent 8932e93 commit 301b904
Showing 2 changed files with 70 additions and 5 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/temp-branch-build-and-push.yaml
@@ -0,0 +1,45 @@
+name: Branch - Build and push docker image
+
+on:
+  push:
+
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
+  cancel-in-progress: true
+
+env:
+  REGISTRY: ghcr.io
+  IMAGE_NAME: ${{ github.repository }}
+
+jobs:
+  docker:
+    runs-on:
+      labels: ubuntu-22.04-64core
+    permissions:
+      packages: write
+      contents: read
+      attestations: write
+      id-token: write
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v3
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v3
+      - name: Log in to the Container registry
+        uses: docker/login-action@v3
+        with:
+          registry: ${{ env.REGISTRY }}
+          username: ${{ github.repository_owner }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+      - name: Build and Push
+        uses: docker/build-push-action@v6
+        with:
+          context: .
+          push: true
+          tags: |
+            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
+          platforms: linux/amd64
+          cache-from: type=gha
+          cache-to: type=gha,mode=max
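In effect: every push builds a linux/amd64 image on a 64-core runner (cancelling any in-flight run for the same ref), reuses layers through the GitHub Actions cache backend (type=gha), and pushes the result to ghcr.io under the repository name, tagged with the full commit SHA.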
30 changes: 25 additions & 5 deletions iris-mpc-store/src/s3_importer.rs
@@ -6,7 +6,7 @@ use futures::{stream, Stream, StreamExt};
 use iris_mpc_common::{IRIS_CODE_LENGTH, MASK_CODE_LENGTH};
 use rayon::{iter::ParallelIterator, prelude::ParallelBridge};
 use serde::Deserialize;
-use std::{io::Cursor, mem, pin::Pin, sync::Arc};
+use std::{io::Cursor, mem, pin::Pin, sync::Arc, time, time::Instant};
 use tokio::task;
 
 const SINGLE_ELEMENT_SIZE: usize = IRIS_CODE_LENGTH * mem::size_of::<u16>() * 2
@@ -115,7 +115,10 @@ pub async fn fetch_and_parse_chunks(
     concurrency: usize,
 ) -> Pin<Box<dyn Stream<Item = eyre::Result<StoredIris>> + Send + '_>> {
     let chunks = store.list_objects().await.unwrap();
-    stream::iter(chunks)
+    let mut total_get_object_time = time::Duration::from_secs(0);
+    let mut total_csv_parse_time = time::Duration::from_secs(0);
+
+    let result_stream = stream::iter(chunks)
         .filter_map(|chunk| async move {
             if chunk.ends_with(".csv") {
                 tracing::info!("Processing chunk: {}", chunk);
@@ -125,8 +128,14 @@
             }
         })
         .map(move |chunk| async move {
+            let mut now = Instant::now();
             let result = store.get_object(&chunk).await?;
-            task::spawn_blocking(move || {
+            let get_object_time = now.elapsed();
+            tracing::info!("Got chunk object: {} in {:?}", chunk, get_object_time,);
+            total_get_object_time += get_object_time;
+
+            now = Instant::now();
+            let task = task::spawn_blocking(move || {
                 let cursor = Cursor::new(result);
                 let reader = csv::ReaderBuilder::new()
                     .has_headers(true)
@@ -163,14 +172,25 @@
 
                 Ok::<_, eyre::Error>(stream::iter(records))
             })
-            .await?
+            .await?;
+            let csv_parse_time = now.elapsed();
+            tracing::info!("Parsed csv chunk: {} in {:?}", chunk, csv_parse_time,);
+            total_csv_parse_time += csv_parse_time;
+            task
         })
         .buffer_unordered(concurrency)
         .flat_map(|result| match result {
             Ok(stream) => stream.boxed(),
             Err(e) => stream::once(async move { Err(e) }).boxed(),
         })
-        .boxed()
+        .boxed();
+    tracing::info!(
+        "fetch_and_parse_chunks summary => Total get_object time: {:?}, Total csv parse time: {:?}",
+        total_get_object_time,
+        total_csv_parse_time,
+    );
+
+    result_stream
 }
 
 #[cfg(test)]
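A caveat on the new timing totals, for anyone reading the numbers: Duration is Copy, so the outer move closure and each per-chunk async move block capture their own copies of total_get_object_time and total_csv_parse_time; the function-level variables are never updated, and the summary line is emitted when the stream is constructed, before a single chunk has been fetched (streams are lazy). The per-chunk log lines are accurate; only the totals are not. If cross-chunk totals are wanted, one option is a shared atomic counter read after the stream is drained. The following is a minimal self-contained sketch of that pattern, not the importer's code: fetch_chunk, the sleep, and the chunk count are illustrative stand-ins.

// Sketch: summing per-chunk timings across buffer_unordered tasks.
// Assumed Cargo deps: tokio = { version = "1", features = ["full"] }, futures = "0.3".
use futures::{stream, StreamExt};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::{Duration, Instant};

// Illustrative stand-in for store.get_object(&chunk).
async fn fetch_chunk(chunk: u64) -> u64 {
    tokio::time::sleep(Duration::from_millis(10)).await;
    chunk
}

#[tokio::main]
async fn main() {
    let total_get_ns = Arc::new(AtomicU64::new(0));

    let results: Vec<u64> = stream::iter(0..8u64)
        .map(|chunk| {
            // Clone the shared counter into each task instead of letting
            // `move` hand every task a private copy of a Duration.
            let total_get_ns = Arc::clone(&total_get_ns);
            async move {
                let started = Instant::now();
                let result = fetch_chunk(chunk).await;
                total_get_ns.fetch_add(started.elapsed().as_nanos() as u64, Ordering::Relaxed);
                result
            }
        })
        .buffer_unordered(4)
        .collect()
        .await;

    // Read the total only after the stream has been drained; logging at
    // construction time, as in the diff above, reports an unchanged zero.
    println!(
        "fetched {} chunks, total get_object time: {:?}",
        results.len(),
        Duration::from_nanos(total_get_ns.load(Ordering::Relaxed)),
    );
}

Relaxed ordering is enough here because the counter is only read once collect() has completed, when no task is still writing.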
