Skip to content

Commit

Permalink
Improve load time (mpsc channel, post page-lock, directory buckets) (#…
Browse files Browse the repository at this point in the history
…968)

Improve load time with mpsc channel, post page-lock, and directory buckets
  • Loading branch information
eaypek-tfh authored Jan 28, 2025
1 parent 4d0d543 commit 021fc02
Show file tree
Hide file tree
Showing 10 changed files with 316 additions and 288 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/temp-branch-build-and-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Branch - Build and push docker image
on:
push:
branches:
- "ps/buckets"
- "test-benchmark"

concurrency:
group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
Expand Down
30 changes: 15 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion deploy/stage/common-values-iris-mpc.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
image: "ghcr.io/worldcoin/iris-mpc:v0.13.23"
image: "ghcr.io/worldcoin/iris-mpc:v0.13.24"

environment: stage
replicaCount: 1
Expand Down
7 changes: 5 additions & 2 deletions deploy/stage/smpcv2-0-stage/values-iris-mpc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,16 @@ env:
value: "true"

- name: SMPC__DB_CHUNKS_BUCKET_NAME
value: "iris-mpc-db-exporter-store-node-0-stage-eu-north-1"
value: "iris-mpc-db-exporter-store-node-0-stage--eun1-az3--x-s3"

- name: SMPC__DB_CHUNKS_FOLDER_NAME
value: "even_odd_binary_output_16k"

- name: SMPC__LOAD_CHUNKS_PARALLELISM
value: "32"
value: "64"

- name: SMPC__LOAD_CHUNKS_BUFFER_SIZE
value: "1024"

- name: SMPC__CLEAR_DB_BEFORE_INIT
value: "true"
Expand Down
7 changes: 5 additions & 2 deletions deploy/stage/smpcv2-1-stage/values-iris-mpc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,16 @@ env:
value: "true"

- name: SMPC__DB_CHUNKS_BUCKET_NAME
value: "iris-mpc-db-exporter-store-node-1-stage-eu-north-1"
value: "iris-mpc-db-exporter-store-node-1-stage--eun1-az3--x-s3"

- name: SMPC__DB_CHUNKS_FOLDER_NAME
value: "even_odd_binary_output_16k"

- name: SMPC__LOAD_CHUNKS_PARALLELISM
value: "32"
value: "64"

- name: SMPC__LOAD_CHUNKS_BUFFER_SIZE
value: "1024"

- name: SMPC__CLEAR_DB_BEFORE_INIT
value: "true"
Expand Down
7 changes: 5 additions & 2 deletions deploy/stage/smpcv2-2-stage/values-iris-mpc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,17 @@ env:
value: "true"

- name: SMPC__DB_CHUNKS_BUCKET_NAME
value: "iris-mpc-db-exporter-store-node-2-stage-eu-north-1"
value: "iris-mpc-db-exporter-store-node-2-stage--eun1-az3--x-s3"

- name: SMPC__DB_CHUNKS_FOLDER_NAME
value: "even_odd_binary_output_16k"

- name: SMPC__LOAD_CHUNKS_PARALLELISM
value: "32"
value: "64"

- name: SMPC__LOAD_CHUNKS_BUFFER_SIZE
value: "1024"

- name: SMPC__CLEAR_DB_BEFORE_INIT
value: "true"

Expand Down
3 changes: 3 additions & 0 deletions iris-mpc-common/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ pub struct Config {

#[serde(default = "default_n_buckets")]
pub n_buckets: usize,

#[serde(default)]
pub load_chunks_buffer_size: usize,
}

fn default_load_chunks_parallelism() -> usize {
Expand Down
42 changes: 12 additions & 30 deletions iris-mpc-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

mod s3_importer;

use crate::s3_importer::S3StoredIris;
use bytemuck::cast_slice;
use eyre::{eyre, Result};
use futures::{
Expand All @@ -15,7 +14,9 @@ use iris_mpc_common::{
iris_db::iris::IrisCode,
};
use rand::{rngs::StdRng, Rng, SeedableRng};
pub use s3_importer::{fetch_and_parse_chunks, last_snapshot_timestamp, ObjectStore, S3Store};
pub use s3_importer::{
fetch_and_parse_chunks, last_snapshot_timestamp, ObjectStore, S3Store, S3StoredIris,
};
use sqlx::{
migrate::Migrator, postgres::PgPoolOptions, Executor, PgPool, Postgres, Row, Transaction,
};
Expand Down Expand Up @@ -186,7 +187,7 @@ impl Store {
&self,
min_last_modified_at: Option<i64>,
partitions: usize,
) -> impl Stream<Item = eyre::Result<StoredIris>> + '_ {
) -> impl Stream<Item = eyre::Result<DbStoredIris>> + '_ {
let count = self.count_irises().await.expect("Failed count_irises");
let partition_size = count.div_ceil(partitions).max(1);

Expand All @@ -197,37 +198,30 @@ impl Store {
let end_id = start_id + partition_size - 1;

// This base query yields `DbStoredIris`
let base_stream = match min_last_modified_at {
let stream = match min_last_modified_at {
Some(min_last_modified_at) => sqlx::query_as::<_, DbStoredIris>(
"SELECT id, left_code, left_mask, right_code, right_mask FROM irises WHERE id \
BETWEEN $1 AND $2 AND last_modified_at >= $3",
)
.bind(start_id as i64)
.bind(end_id as i64)
.bind(min_last_modified_at)
.fetch(&self.pool),
.fetch(&self.pool)
.map_err(Into::into),
None => sqlx::query_as::<_, DbStoredIris>(
"SELECT id, left_code, left_mask, right_code, right_mask FROM irises WHERE id \
BETWEEN $1 AND $2",
)
.bind(start_id as i64)
.bind(end_id as i64)
.fetch(&self.pool),
};

// Convert `Stream<Item = Result<DbStoredIris, sqlx::Error>>`
// -> `Stream<Item = eyre::Result<DbStoredIris>>` using map_err,
// -> then map_ok(StoredIris::Db) to unify the output type:
let partition_stream = base_stream
.map_err(Into::into) // `sqlx::Error` -> `eyre::Error`
.map_ok(StoredIris::DB) // `DbStoredIris` -> `StoredIris::Db(...)`
.boxed();
.fetch(&self.pool)
.map_err(Into::into),
}
.boxed();

partition_streams.push(partition_stream);
partition_streams.push(stream);
}

// `select_all` requires that all streams have the same Item type:
// which is `Result<StoredIris, eyre::Error>` now.
stream::select_all(partition_streams)
}

Expand Down Expand Up @@ -548,10 +542,6 @@ mod tests {
let got: Vec<DbStoredIris> = store
.stream_irises_par(Some(0), 2)
.await
.map_ok(|stored_iris| match stored_iris {
StoredIris::DB(db_iris) => db_iris,
StoredIris::S3(_) => panic!("Unexpected S3 variant in this test!"),
})
.try_collect()
.await?;
assert_eq!(got.len(), 0);
Expand Down Expand Up @@ -591,10 +581,6 @@ mod tests {
let mut got_par: Vec<DbStoredIris> = store
.stream_irises_par(Some(0), 2)
.await
.map_ok(|stored_iris| match stored_iris {
StoredIris::DB(db_iris) => db_iris,
StoredIris::S3(_) => panic!("Unexpected S3 variant in this test!"),
})
.try_collect()
.await?;
got_par.sort_by_key(|iris| iris.id);
Expand Down Expand Up @@ -676,10 +662,6 @@ mod tests {
let mut got_par: Vec<DbStoredIris> = store
.stream_irises_par(Some(0), parallelism)
.await
.map_ok(|stored_iris| match stored_iris {
StoredIris::DB(db_iris) => db_iris,
StoredIris::S3(_) => panic!("Unexpected S3 variant in this test!"),
})
.try_collect()
.await?;
got_par.sort_by_key(|iris| iris.id);
Expand Down
Loading

0 comments on commit 021fc02

Please sign in to comment.