Skip to content

Commit

Permalink
feat(hydro_lang): provide APIs for blanket-deploying locations (#1676)
Browse files Browse the repository at this point in the history
This makes it easy to implement patterns like deploying everything to
localhost.
  • Loading branch information
shadaj authored Jan 29, 2025
1 parent 90b3d8a commit 316d700
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 26 deletions.
55 changes: 45 additions & 10 deletions hydro_lang/src/builder/built.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use crate::staging_util::Invariant;

pub struct BuiltFlow<'a> {
pub(super) ir: Vec<HydroLeaf>,
pub(super) processes: Vec<usize>,
pub(super) clusters: Vec<usize>,
pub(super) process_id_name: Vec<(usize, String)>,
pub(super) cluster_id_name: Vec<(usize, String)>,
pub(super) external_id_name: Vec<(usize, String)>,
pub(super) used: bool,

pub(super) _phantom: Invariant<'a>,
Expand Down Expand Up @@ -54,8 +55,9 @@ impl<'a> BuiltFlow<'a> {
self.used = true;
BuiltFlow {
ir: f(std::mem::take(&mut self.ir)),
processes: std::mem::take(&mut self.processes),
clusters: std::mem::take(&mut self.clusters),
process_id_name: std::mem::take(&mut self.process_id_name),
cluster_id_name: std::mem::take(&mut self.cluster_id_name),
external_id_name: std::mem::take(&mut self.external_id_name),
used: false,
_phantom: PhantomData,
}
Expand All @@ -69,28 +71,40 @@ impl<'a> BuiltFlow<'a> {
fn into_deploy<D: LocalDeploy<'a>>(mut self) -> DeployFlow<'a, D> {
self.used = true;
let processes = if D::has_trivial_node() {
self.processes
self.process_id_name
.iter()
.map(|id| (*id, D::trivial_process(*id)))
.map(|id| (id.0, D::trivial_process(id.0)))
.collect()
} else {
HashMap::new()
};

let clusters = if D::has_trivial_node() {
self.clusters
self.cluster_id_name
.iter()
.map(|id| (*id, D::trivial_cluster(*id)))
.map(|id| (id.0, D::trivial_cluster(id.0)))
.collect()
} else {
HashMap::new()
};

let externals = if D::has_trivial_node() {
self.external_id_name
.iter()
.map(|id| (id.0, D::trivial_external(id.0)))
.collect()
} else {
HashMap::new()
};

DeployFlow {
ir: std::mem::take(&mut self.ir),
nodes: processes,
processes,
process_id_name: std::mem::take(&mut self.process_id_name),
clusters,
externals: HashMap::new(),
cluster_id_name: std::mem::take(&mut self.cluster_id_name),
externals,
external_id_name: std::mem::take(&mut self.external_id_name),
used: false,
_phantom: PhantomData,
}
Expand All @@ -104,6 +118,13 @@ impl<'a> BuiltFlow<'a> {
self.into_deploy().with_process(process, spec)
}

pub fn with_remaining_processes<D: LocalDeploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
self,
spec: impl Fn() -> S,
) -> DeployFlow<'a, D> {
self.into_deploy().with_remaining_processes(spec)
}

pub fn with_external<P, D: LocalDeploy<'a>>(
self,
process: &ExternalProcess<P>,
Expand All @@ -112,6 +133,13 @@ impl<'a> BuiltFlow<'a> {
self.into_deploy().with_external(process, spec)
}

pub fn with_remaining_externals<D: LocalDeploy<'a>, S: ExternalSpec<'a, D> + 'a>(
self,
spec: impl Fn() -> S,
) -> DeployFlow<'a, D> {
self.into_deploy().with_remaining_externals(spec)
}

pub fn with_cluster<C, D: LocalDeploy<'a>>(
self,
cluster: &Cluster<C>,
Expand All @@ -120,6 +148,13 @@ impl<'a> BuiltFlow<'a> {
self.into_deploy().with_cluster(cluster, spec)
}

pub fn with_remaining_clusters<D: LocalDeploy<'a>, S: ClusterSpec<'a, D> + 'a>(
self,
spec: impl Fn() -> S,
) -> DeployFlow<'a, D> {
self.into_deploy().with_remaining_clusters(spec)
}

pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
self.into_deploy::<D>().compile(env)
}
Expand Down
56 changes: 50 additions & 6 deletions hydro_lang/src/builder/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,19 @@ use crate::staging_util::Invariant;

pub struct DeployFlow<'a, D: LocalDeploy<'a>> {
pub(super) ir: Vec<HydroLeaf>,
pub(super) nodes: HashMap<usize, D::Process>,

/// Deployed instances of each process in the flow
pub(super) processes: HashMap<usize, D::Process>,

/// Lists all the processes that were created in the flow, same ID as `processes`
/// but with the type name of the tag.
pub(super) process_id_name: Vec<(usize, String)>,

pub(super) externals: HashMap<usize, D::ExternalProcess>,
pub(super) external_id_name: Vec<(usize, String)>,

pub(super) clusters: HashMap<usize, D::Cluster>,
pub(super) cluster_id_name: Vec<(usize, String)>,
pub(super) used: bool,

pub(super) _phantom: Invariant<'a, D>,
Expand All @@ -52,13 +62,25 @@ impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> {
spec: impl IntoProcessSpec<'a, D>,
) -> Self {
let tag_name = std::any::type_name::<P>().to_string();
self.nodes.insert(
self.processes.insert(
process.id,
spec.into_process_spec().build(process.id, &tag_name),
);
self
}

pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
mut self,
spec: impl Fn() -> S,
) -> Self {
for (id, name) in &self.process_id_name {
self.processes
.insert(*id, spec().into_process_spec().build(*id, name));
}

self
}

pub fn with_external<P>(
mut self,
process: &ExternalProcess<P>,
Expand All @@ -70,13 +92,35 @@ impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> {
self
}

pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
mut self,
spec: impl Fn() -> S,
) -> Self {
for (id, name) in &self.external_id_name {
self.externals.insert(*id, spec().build(*id, name));
}

self
}

pub fn with_cluster<C>(mut self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
let tag_name = std::any::type_name::<C>().to_string();
self.clusters
.insert(cluster.id, spec.build(cluster.id, &tag_name));
self
}

pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
mut self,
spec: impl Fn() -> S,
) -> Self {
for (id, name) in &self.cluster_id_name {
self.clusters.insert(*id, spec().build(*id, name));
}

self
}

pub fn compile_no_network(mut self) -> CompiledFlow<'a, D::GraphId> {
self.used = true;

Expand All @@ -99,7 +143,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
leaf.compile_network::<D>(
env,
&mut seen_tees,
&self.nodes,
&self.processes,
&self.clusters,
&self.externals,
)
Expand Down Expand Up @@ -130,7 +174,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
let #self_id_ident = #self_id_expr;
});

for other_location in self.nodes.keys().chain(self.clusters.keys()) {
for other_location in self.processes.keys().chain(self.clusters.keys()) {
let other_id_ident = syn::Ident::new(
&format!("__hydro_lang_cluster_ids_{}", c_id),
Span::call_site(),
Expand Down Expand Up @@ -160,7 +204,7 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> {
leaf.compile_network::<D>(
&(),
&mut seen_tees_instantiate,
&self.nodes,
&self.processes,
&self.clusters,
&self.externals,
)
Expand All @@ -172,7 +216,7 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> {
let mut meta = D::Meta::default();

let (mut processes, mut clusters, mut externals) = (
std::mem::take(&mut self.nodes)
std::mem::take(&mut self.processes)
.into_iter()
.filter_map(|(node_id, node)| {
if let Some(ir) = compiled.remove(&node_id) {
Expand Down
50 changes: 42 additions & 8 deletions hydro_lang/src/builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::any::type_name;
use std::cell::RefCell;
use std::collections::HashMap;
use std::marker::PhantomData;
Expand Down Expand Up @@ -44,8 +45,9 @@ pub const FLOW_USED_MESSAGE: &str = "Attempted to add a leaf to a flow that has

pub struct FlowBuilder<'a> {
flow_state: FlowState,
nodes: RefCell<Vec<usize>>,
clusters: RefCell<Vec<usize>>,
processes: RefCell<Vec<(usize, String)>>,
clusters: RefCell<Vec<(usize, String)>>,
externals: RefCell<Vec<(usize, String)>>,

next_node_id: RefCell<usize>,

Expand Down Expand Up @@ -87,8 +89,9 @@ impl<'a> FlowBuilder<'a> {
cycle_counts: HashMap::new(),
next_clock_id: 0,
})),
nodes: RefCell::new(vec![]),
processes: RefCell::new(vec![]),
clusters: RefCell::new(vec![]),
externals: RefCell::new(vec![]),
next_node_id: RefCell::new(0),
finalized: false,
_phantom: PhantomData,
Expand All @@ -101,8 +104,9 @@ impl<'a> FlowBuilder<'a> {

built::BuiltFlow {
ir: self.flow_state.borrow_mut().leaves.take().unwrap(),
processes: self.nodes.replace(vec![]),
clusters: self.clusters.replace(vec![]),
process_id_name: self.processes.replace(vec![]),
cluster_id_name: self.clusters.replace(vec![]),
external_id_name: self.externals.replace(vec![]),
used: false,
_phantom: PhantomData,
}
Expand Down Expand Up @@ -130,7 +134,9 @@ impl<'a> FlowBuilder<'a> {
let id = *next_node_id;
*next_node_id += 1;

self.nodes.borrow_mut().push(id);
self.processes
.borrow_mut()
.push((id, type_name::<P>().to_string()));

Process {
id,
Expand All @@ -144,7 +150,9 @@ impl<'a> FlowBuilder<'a> {
let id = *next_node_id;
*next_node_id += 1;

self.nodes.borrow_mut().push(id);
self.externals
.borrow_mut()
.push((id, type_name::<P>().to_string()));

ExternalProcess {
id,
Expand All @@ -158,7 +166,9 @@ impl<'a> FlowBuilder<'a> {
let id = *next_node_id;
*next_node_id += 1;

self.clusters.borrow_mut().push(id);
self.clusters
.borrow_mut()
.push((id, type_name::<C>().to_string()));

Cluster {
id,
Expand All @@ -176,6 +186,14 @@ impl<'a> FlowBuilder<'a> {
self.with_default_optimize().with_process(process, spec)
}

#[cfg(feature = "build")]
pub fn with_remaining_processes<D: LocalDeploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
self,
spec: impl Fn() -> S,
) -> DeployFlow<'a, D> {
self.with_default_optimize().with_remaining_processes(spec)
}

#[cfg(feature = "build")]
pub fn with_external<P, D: LocalDeploy<'a>>(
self,
Expand All @@ -185,6 +203,14 @@ impl<'a> FlowBuilder<'a> {
self.with_default_optimize().with_external(process, spec)
}

#[cfg(feature = "build")]
pub fn with_remaining_externals<D: LocalDeploy<'a>, S: ExternalSpec<'a, D> + 'a>(
self,
spec: impl Fn() -> S,
) -> DeployFlow<'a, D> {
self.with_default_optimize().with_remaining_externals(spec)
}

#[cfg(feature = "build")]
pub fn with_cluster<C, D: LocalDeploy<'a>>(
self,
Expand All @@ -194,6 +220,14 @@ impl<'a> FlowBuilder<'a> {
self.with_default_optimize().with_cluster(cluster, spec)
}

#[cfg(feature = "build")]
pub fn with_remaining_clusters<D: LocalDeploy<'a>, S: ClusterSpec<'a, D> + 'a>(
self,
spec: impl Fn() -> S,
) -> DeployFlow<'a, D> {
self.with_default_optimize().with_remaining_clusters(spec)
}

#[cfg(feature = "build")]
pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
self.with_default_optimize::<D>().compile(env)
Expand Down
8 changes: 8 additions & 0 deletions hydro_lang/src/deploy/in_memory_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl LocalDeploy<'_> for SingleProcessGraph {
fn trivial_cluster(_id: usize) -> Self::Cluster {
SingleNode {}
}

fn trivial_external(_id: usize) -> Self::ExternalProcess {
SingleNode {}
}
}

impl ProcessSpec<'_, SingleProcessGraph> for () {
Expand Down Expand Up @@ -74,6 +78,10 @@ impl LocalDeploy<'_> for MultiGraph {
fn trivial_cluster(_id: usize) -> Self::Cluster {
MultiNode {}
}

fn trivial_external(_id: usize) -> Self::ExternalProcess {
MultiNode {}
}
}

impl ProcessSpec<'_, MultiGraph> for () {
Expand Down
4 changes: 4 additions & 0 deletions hydro_lang/src/deploy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ pub trait LocalDeploy<'a> {
fn trivial_cluster(_id: usize) -> Self::Cluster {
panic!("No trivial cluster")
}

fn trivial_external(_id: usize) -> Self::ExternalProcess {
panic!("No trivial external")
}
}

pub trait Deploy<'a> {
Expand Down
4 changes: 2 additions & 2 deletions hydro_lang/src/ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ impl HydroLeaf {
self,
compile_env: &D::CompileEnv,
seen_tees: &mut SeenTees,
nodes: &HashMap<usize, D::Process>,
processes: &HashMap<usize, D::Process>,
clusters: &HashMap<usize, D::Cluster>,
externals: &HashMap<usize, D::ExternalProcess>,
) -> HydroLeaf {
self.transform_children(
|n, s| {
n.compile_network::<D>(compile_env, s, nodes, clusters, externals);
n.compile_network::<D>(compile_env, s, processes, clusters, externals);
},
seen_tees,
)
Expand Down
Loading

0 comments on commit 316d700

Please sign in to comment.