Skip to content

Commit

Permalink
add db import too
Browse files Browse the repository at this point in the history
  • Loading branch information
eaypek-tfh committed Jan 23, 2025
1 parent 6b5f506 commit afc96f2
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 139 deletions.
2 changes: 1 addition & 1 deletion deploy/stage/common-values-iris-mpc.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
image: "ghcr.io/worldcoin/iris-mpc:59eb7e90ffaa5c63d395c080ec09a6909efdc398"
image: "ghcr.io/worldcoin/iris-mpc:dc77f753b0b19d9c672ebe70440158f58e213467"

environment: stage
replicaCount: 1
Expand Down
35 changes: 9 additions & 26 deletions iris-mpc-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Store {
&self,
min_last_modified_at: Option<i64>,
partitions: usize,
) -> impl Stream<Item = eyre::Result<StoredIris>> + '_ {
) -> impl Stream<Item = eyre::Result<DbStoredIris>> + '_ {
let count = self.count_irises().await.expect("Failed count_irises");
let partition_size = count.div_ceil(partitions).max(1);

Expand All @@ -199,33 +199,28 @@ impl Store {
let end_id = start_id + partition_size - 1;

// This base query yields `DbStoredIris`
let base_stream = match min_last_modified_at {
let stream = match min_last_modified_at {
Some(min_last_modified_at) => sqlx::query_as::<_, DbStoredIris>(
"SELECT id, left_code, left_mask, right_code, right_mask FROM irises WHERE id \
BETWEEN $1 AND $2 AND last_modified_at >= $3",
)
.bind(start_id as i64)
.bind(end_id as i64)
.bind(min_last_modified_at)
.fetch(&self.pool),
.fetch(&self.pool)
.map_err(Into::into),
None => sqlx::query_as::<_, DbStoredIris>(
"SELECT id, left_code, left_mask, right_code, right_mask FROM irises WHERE id \
BETWEEN $1 AND $2",
)
.bind(start_id as i64)
.bind(end_id as i64)
.fetch(&self.pool),
};

// Convert `Stream<Item = Result<DbStoredIris, sqlx::Error>>`
// -> `Stream<Item = eyre::Result<DbStoredIris>>` using map_err,
// -> then map_ok(StoredIris::Db) to unify the output type:
let partition_stream = base_stream
.map_err(Into::into) // `sqlx::Error` -> `eyre::Error`
.map_ok(StoredIris::DB) // `DbStoredIris` -> `StoredIris::Db(...)`
.boxed();
.fetch(&self.pool)
.map_err(Into::into),
}
.boxed();

partition_streams.push(partition_stream);
partition_streams.push(stream);
}

// `select_all` requires that all streams have the same Item type:
Expand Down Expand Up @@ -550,10 +545,6 @@ mod tests {
let got: Vec<DbStoredIris> = store
.stream_irises_par(Some(0), 2)
.await
.map_ok(|stored_iris| match stored_iris {
StoredIris::DB(db_iris) => db_iris,
StoredIris::S3(_) => panic!("Unexpected S3 variant in this test!"),
})
.try_collect()
.await?;
assert_eq!(got.len(), 0);
Expand Down Expand Up @@ -593,10 +584,6 @@ mod tests {
let mut got_par: Vec<DbStoredIris> = store
.stream_irises_par(Some(0), 2)
.await
.map_ok(|stored_iris| match stored_iris {
StoredIris::DB(db_iris) => db_iris,
StoredIris::S3(_) => panic!("Unexpected S3 variant in this test!"),
})
.try_collect()
.await?;
got_par.sort_by_key(|iris| iris.id);
Expand Down Expand Up @@ -678,10 +665,6 @@ mod tests {
let mut got_par: Vec<DbStoredIris> = store
.stream_irises_par(Some(0), parallelism)
.await
.map_ok(|stored_iris| match stored_iris {
StoredIris::DB(db_iris) => db_iris,
StoredIris::S3(_) => panic!("Unexpected S3 variant in this test!"),
})
.try_collect()
.await?;
got_par.sort_by_key(|iris| iris.id);
Expand Down
Loading

0 comments on commit afc96f2

Please sign in to comment.