Skip to content

Commit

Permalink
aurel/hawk-actor: search and insert, pseudo-parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Aurélien Nicolas committed Jan 21, 2025
1 parent 164310f commit 26745a3
Showing 1 changed file with 57 additions and 6 deletions.
63 changes: 57 additions & 6 deletions iris-mpc-cpu/src/execution/hawk_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ pub struct HawkRequest {
pub my_iris_shares: Vec<GaloisRingSharedIris>,
}

pub type InsertPlan = InsertPlanX<Aby3Store>;
pub type InsertPlan = InsertPlanV<Aby3Store>;

#[derive(Debug)]
pub struct InsertPlanX<V: VectorStore> {
pub struct InsertPlanV<V: VectorStore> {
query: V::QueryRef,
links: Vec<FurthestQueue<V::VectorRef, V::DistanceRef>>,
set_ep: bool,
Expand Down Expand Up @@ -185,21 +185,21 @@ impl HawkActor {
}

// TODO: Implement actual parallelism.
pub async fn search_to_insert_par(
pub async fn search_to_insert(
&mut self,
sessions: &mut [HawkSession],
req: HawkRequest,
) -> Result<Vec<InsertPlan>> {
let mut plans = vec![];
for (i, iris) in req.my_iris_shares.into_iter().enumerate() {
let session = &mut sessions[i % sessions.len()];
plans.push(self.search_to_insert(session, iris).await?);
plans.push(self.search_to_insert_one(session, iris).await?);
}
Ok(plans)
}

// TODO: Remove `&mut self` requirement to support parallel sessions.
pub async fn search_to_insert(
async fn search_to_insert_one(
&mut self,
session: &mut HawkSession,
iris: GaloisRingSharedIris,
Expand All @@ -223,6 +223,36 @@ impl HawkActor {
set_ep,
})
}

// TODO: Implement actual parallelism.
pub async fn insert(
&mut self,
sessions: &mut [HawkSession],
plans: Vec<InsertPlan>,
) -> Result<()> {
let plans = join_plans(plans);
for (i, plan) in izip!(0.., plans) {
let session = &mut sessions[i % sessions.len()];
self.insert_one(session, plan).await?;
}
Ok(())
}

// TODO: Remove `&mut self` requirement to support parallel sessions.
async fn insert_one(&mut self, session: &mut HawkSession, plan: InsertPlan) -> Result<()> {
let inserted = session.aby3_store.insert(&plan.query).await;

self.search_params
.insert_from_search_results(
&mut session.aby3_store,
&mut self.graph_store,
inserted,
plan.links,
plan.set_ep,
)
.await;
Ok(())
}
}

#[derive(Default)]
Expand All @@ -238,6 +268,26 @@ impl Consensus {
}
}

/// Combine insert plans from parallel searches, repairing any conflict.
fn join_plans(mut plans: Vec<InsertPlan>) -> Vec<InsertPlan> {
let set_ep = plans.iter().any(|plan| plan.set_ep);
if set_ep {
// There can be at most one new entry point.
let highest = plans
.iter()
.map(|plan| plan.links.len())
.position_max()
.unwrap();

for plan in &mut plans {
plan.set_ep = false;
}
plans[highest].set_ep = true;
plans.swap(0, highest);
}
plans
}

pub async fn hawk_main(args: HawkArgs) -> Result<()> {
println!("🦅 Starting Hawk node {}", args.party_index);
let mut hawk_actor = HawkActor::from_cli(&args).await?;
Expand All @@ -261,7 +311,8 @@ pub async fn hawk_main(args: HawkArgs) -> Result<()> {
.collect_vec();
let req = HawkRequest { my_iris_shares };

hawk_actor.search_to_insert_par(&mut sessions, req).await?;
let plans = hawk_actor.search_to_insert(&mut sessions, req).await?;
hawk_actor.insert(&mut sessions, plans).await?;

println!("🎉 Inserted {batch_size} items into the database");
Ok(())
Expand Down

0 comments on commit 26745a3

Please sign in to comment.