Skip to content

Commit

Permalink
feat(hydroflow)!: remove import!, fix #1110 (#1600)
Browse files Browse the repository at this point in the history
In preparation for supporting stable Rust (#1587).

There is no good way to resolve source file paths on stable Rust.

There is also no way to get good diagnostics for external files in general.

rust-lang/rfcs#3200
  • Loading branch information
MingweiSamuel authored Dec 10, 2024
1 parent 408b904 commit f2a4bee
Show file tree
Hide file tree
Showing 14 changed files with 23 additions and 379 deletions.
4 changes: 1 addition & 3 deletions benches/benches/fork_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
fn benchmark_hydroflow_surface(c: &mut Criterion) {
c.bench_function("fork_join/hydroflow/surface", |b| {
b.iter(|| {
let mut hf = hydroflow_syntax! {
source_iter(0..NUM_INTS) -> import!("fork_join_20.hf") -> for_each(|x| { black_box(x); });
};
let mut hf = include!("fork_join_20.hf");
hf.run_available();
})
});
Expand Down
12 changes: 8 additions & 4 deletions benches/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ pub fn fork_join() -> std::io::Result<()> {
let file = File::create(path)?;
let mut write = BufWriter::new(file);

writeln!(write, "a0 = mod -> tee();")?;

writeln!(write, "hydroflow_syntax! {{")?;
writeln!(write, "a0 = source_iter(0..NUM_INTS) -> tee();")?;
for i in 0..NUM_OPS {
if i > 0 {
writeln!(write, "a{} = union() -> tee();", i)?;
}
writeln!(write, "a{} -> filter(|x| x % 2 == 0) -> a{};", i, i + 1)?;
writeln!(write, "a{} -> filter(|x| x % 2 == 1) -> a{};", i, i + 1)?;
}

writeln!(write, "a{} = union() -> mod;", NUM_OPS)?;
writeln!(
write,
"a{} = union() -> for_each(|x| {{ black_box(x); }});",
NUM_OPS
)?;
writeln!(write, "}}")?;

write.flush()?;

Expand Down
8 changes: 0 additions & 8 deletions hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ required-features = [ "nightly" ]
name = "python_udf"
required-features = [ "python" ]

[[example]]
name = "modules_outer_join"
required-features = [ "debugging" ]

[[example]]
name = "modules_triple_cross_join"
required-features = [ "debugging" ]

[dependencies]
bincode = "1.3.1"
byteorder = "1.3.2"
Expand Down
23 changes: 0 additions & 23 deletions hydroflow/examples/modules_outer_join/full_outer_join.hf

This file was deleted.

16 changes: 0 additions & 16 deletions hydroflow/examples/modules_outer_join/left_outer_join.hf

This file was deleted.

30 changes: 0 additions & 30 deletions hydroflow/examples/modules_outer_join/main.rs

This file was deleted.

6 changes: 0 additions & 6 deletions hydroflow/examples/modules_outer_join/right_outer_join.hf

This file was deleted.

51 changes: 0 additions & 51 deletions hydroflow/examples/modules_triple_cross_join/main.rs

This file was deleted.

15 changes: 0 additions & 15 deletions hydroflow/examples/modules_triple_cross_join/triple_cross_join.hf

This file was deleted.

146 changes: 1 addition & 145 deletions hydroflow_lang/src/graph/flat_graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::borrow::Cow;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;

use itertools::Itertools;
use proc_macro2::Span;
Expand Down Expand Up @@ -71,9 +70,6 @@ pub struct FlatGraphBuilder {
/// Use statements.
uses: Vec<ItemUse>,

/// In order to make import!() statements relative to the current file, we need to know where the file is that is building the flat graph.
invocating_file_path: PathBuf,

/// If the flat graph is being loaded as a module, then two initial ModuleBoundary nodes are inserted into the graph. One
/// for the input into the module and one for the output out of the module.
module_boundary_nodes: Option<(GraphNodeId, GraphNodeId)>,
Expand All @@ -86,37 +82,8 @@ impl FlatGraphBuilder {
}

/// Convert the Hydroflow code AST into a graph builder.
pub fn from_hfcode(input: HfCode, macro_invocation_path: PathBuf) -> Self {
let mut builder = Self {
invocating_file_path: macro_invocation_path,
..Default::default()
};
builder.process_statements(input.statements);
builder
}

/// Convert the Hydroflow code AST into a graph builder.
pub fn from_hfmodule(input: HfCode, root_path: PathBuf) -> Self {
pub fn from_hfcode(input: HfCode) -> Self {
let mut builder = Self::default();
builder.invocating_file_path = root_path; // imports inside of modules should be relative to the importing file.
builder.module_boundary_nodes = Some((
builder.flat_graph.insert_node(
GraphNode::ModuleBoundary {
input: true,
import_expr: Span::call_site(),
},
Some(Ident::new("input", Span::call_site())),
None,
),
builder.flat_graph.insert_node(
GraphNode::ModuleBoundary {
input: false,
import_expr: Span::call_site(),
},
Some(Ident::new("output", Span::call_site())),
None,
),
));
builder.process_statements(input.statements);
builder
}
Expand Down Expand Up @@ -276,118 +243,7 @@ impl FlatGraphBuilder {
out: Some((PortIndexValue::Elided(op_span), GraphDet::Determined(nid))),
}
}
Pipeline::Import(import) => {
// TODO: https://github.com/rust-lang/rfcs/pull/3200
// this would be way better...
let file_path = {
let mut dir = self.invocating_file_path.clone();
dir.pop();
dir.join(import.filename.value())
};

let file_contents = match std::fs::read_to_string(&file_path) {
Ok(contents) => contents,
Err(err) => {
self.diagnostics.push(Diagnostic::spanned(
import.filename.span(),
Level::Error,
format!("filename: {}, err: {err}", import.filename.value()),
));

return Ends {
inn: None,
out: None,
};
}
};

let statements = match syn::parse_str::<HfCode>(&file_contents) {
Ok(code) => code,
Err(err) => {
self.diagnostics.push(Diagnostic::spanned(
import.span(),
Level::Error,
format!("Error in module: {}", err),
));

return Ends {
inn: None,
out: None,
};
}
};

let flat_graph_builder = FlatGraphBuilder::from_hfmodule(statements, file_path);
let (flat_graph, _uses, diagnostics) = flat_graph_builder.build();
diagnostics.iter().for_each(Diagnostic::emit);

self.merge_in(flat_graph, import.span())
}
}
}

/// Merge one flatgraph into the current flatgraph
/// other must be a flatgraph and not be partitioned yet.
fn merge_in(&mut self, other: HydroflowGraph, parent_span: Span) -> Ends {
assert_eq!(other.subgraphs().count(), 0);

let mut ends = Ends {
inn: None,
out: None,
};

let mut node_mapping = BTreeMap::new();

for (other_node_id, node) in other.nodes() {
match node {
GraphNode::Operator(_) => {
let varname = other.node_varname(other_node_id);
let new_id = self.flat_graph.insert_node(node.clone(), varname, None);
node_mapping.insert(other_node_id, new_id);
}
GraphNode::ModuleBoundary { input, .. } => {
let new_id = self.flat_graph.insert_node(
GraphNode::ModuleBoundary {
input: *input,
import_expr: parent_span,
},
Some(Ident::new(&format!("module_{}", input), parent_span)),
None,
);
node_mapping.insert(other_node_id, new_id);

// in the case of nested imports, this module boundary might not be the module boundary into or out of the top-most module
// So we have to be careful to only target those two boundaries.
// There should be no inputs to it, if it is an input boundary, if it is the top-most one.
// and there should be no outputs from it, if it is an output boundary, if it is the top-most one.
if *input && other.node_predecessor_nodes(other_node_id).count() == 0 {
if other.node_predecessor_nodes(other_node_id).count() == 0 {
ends.inn =
Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id)));
}
} else if !(*input) && other.node_successor_nodes(other_node_id).count() == 0 {
ends.out =
Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id)));
}
}
GraphNode::Handoff { .. } => {
panic!("Handoff in graph that is being merged into self")
}
}
}

for (other_edge_id, (other_src, other_dst)) in other.edges() {
let (src_port, dst_port) = other.edge_ports(other_edge_id);

let _new_edge_id = self.flat_graph.insert_edge(
*node_mapping.get(&other_src).unwrap(),
src_port.clone(),
*node_mapping.get(&other_dst).unwrap(),
dst_port.clone(),
);
}

ends
}

/// Connects operator links as a final building step. Processes all the links stored in
Expand Down
4 changes: 1 addition & 3 deletions hydroflow_lang/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ mod hydroflow_graph;
mod hydroflow_graph_debugging;

use std::fmt::Display;
use std::path::PathBuf;

pub use di_mul_graph::DiMulGraph;
pub use eliminate_extra_unions_tees::eliminate_extra_unions_tees;
Expand Down Expand Up @@ -376,9 +375,8 @@ impl Display for PortIndexValue {
pub fn build_hfcode(
hf_code: HfCode,
root: &TokenStream,
macro_invocation_path: PathBuf,
) -> (Option<(HydroflowGraph, TokenStream)>, Vec<Diagnostic>) {
let flat_graph_builder = FlatGraphBuilder::from_hfcode(hf_code, macro_invocation_path);
let flat_graph_builder = FlatGraphBuilder::from_hfcode(hf_code);
let (mut flat_graph, uses, mut diagnostics) = flat_graph_builder.build();
if !diagnostics.iter().any(Diagnostic::is_error) {
if let Err(diagnostic) = flat_graph.merge_modules() {
Expand Down
Loading

0 comments on commit f2a4bee

Please sign in to comment.