diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 0b7c7640..db27aafc 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -110,7 +110,7 @@ where macro_rules! propagate { ($s1:expr) => { { - let mut block = InNode::new(Box::new($s1), get_global_orchestrator()); + let block = InNode::new(Box::new($s1), get_global_orchestrator()); block } }; @@ -118,7 +118,7 @@ macro_rules! propagate { ($s1:expr $(, $tail:expr)*) => { { let node = ($s1); - let mut block = InOutNode::new(Box::new(node), + let block = InOutNode::new(Box::new(node), propagate!($($tail),*), get_global_orchestrator()); block @@ -168,10 +168,10 @@ macro_rules! pipeline { ($s1:expr $(, $tail:expr)*) => { { let orchestrator = get_global_orchestrator(); - let mut block = OutNode::new(Box::new($s1), + let block = OutNode::new(Box::new($s1), propagate!($($tail),*), orchestrator); - let mut pipeline = Pipeline::new(block); + let pipeline = Pipeline::new(block); pipeline } }; diff --git a/src/pipeline/node/inout_node.rs b/src/pipeline/node/inout_node.rs index 65cccd84..bcb6a8ad 100644 --- a/src/pipeline/node/inout_node.rs +++ b/src/pipeline/node/inout_node.rs @@ -442,6 +442,9 @@ where // If the node is not ordered, then i can send the messages to the next node // without any order. loop { + if tmp.is_empty() { + break; + } let msg = Message { op: Task::New(tmp.pop_front().unwrap()), order: 0, @@ -450,9 +453,6 @@ where if err.is_err() { panic!("Error: {}", err.unwrap_err()); } - if tmp.is_empty() { - break; - } } } } diff --git a/src/templates/map.rs b/src/templates/map.rs index dffc8f17..92d09164 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -26,6 +26,32 @@ where /// # Arguments /// * `n_worker` - Number of worker threads. /// * `f` - Function to apply to each element of the input. + /// + /// # Examples + /// + /// Given a vector of vectors, each one containing a set of numbers, + /// compute the square value of each number contained in each + /// vector. 
+ /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::Map}; + /// + /// let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + /// let mut vector = Vec::new(); + /// + /// // Create the vector of vectors. + /// for _i in 0..1000 { + /// vector.push(numbers.clone()); + /// } + /// // Instantiate a new Pipeline with a Map operator. + /// let pipe = pipeline![ + /// SourceIter::build(vector.into_iter()), + /// Map::build(4, |el: f64| el * el), + /// SinkVec::build() + /// ]; + /// // Start the pipeline and collect the results. + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// ``` pub fn build(n_worker: usize, f: F) -> impl InOut where TInIter: IntoIterator, @@ -48,6 +74,8 @@ where /// Panics if n_replicas is 0. /// # Remarks /// The replicas are created by cloning the Map node. + /// This mean that 4 replicas of a Map node with 2 workers each + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -128,6 +156,47 @@ where /// * `f` - Function to apply to each element of the input. /// # Panics /// Panics if n_replicas is 0. + /// # Remarks + /// The replicas are created by cloning the OrderedMap node. + /// This mean that 4 replicas of an Ordered Map node with 2 workers each + /// will result in the usage of 8 threads. + /// + /// # Examples + /// + /// Given a vector of vectors, each one containing a set of numbers, + /// compute the square value of each number contained in each + /// vector. + /// In this case, using the OrderedMap template, it is possible + /// to mantain the order of the input in the output. + /// Moreover, using the `build_with_replicas` method, + /// we can create a stage consisting of four map operator, + /// each one composed by 2 worker. 
+ /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedMap}; + /// let mut counter = 1.0; + /// let mut vector = Vec::new(); + /// + /// // Create a vector of vectors, each one containing a set of numbers. + /// for _i in 0..1000{ + /// let mut numbers = Vec::new(); + /// for _i in 0..10 { + /// numbers.push(counter); + /// counter += 1.0; + /// } + /// vector.push(numbers); + /// } + /// + /// // Instantiate the pipeline. + /// let pipe = pipeline![ + /// SourceIter::build(vector.into_iter()), + /// OrderedMap::build_with_replicas(4, 2, |el: f64| el * el), + /// OrderedSinkVec::build() + /// ]; + /// + /// // Start the pipeline and collect the results. + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// ``` pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -193,6 +262,48 @@ where /// # Arguments /// * `n_worker` - Number of worker threads. /// * `f` - Function to apply to each element of the input. + /// + /// # Examples + /// + /// Given a collection of vectors of integers, for each vector + /// compute the summation of its elements. + /// As this reduce function works by grouping by key, + /// the input of the reduce function must be a vector of tuple (key, value). + /// + /// ``` + /// + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::Reduce}; + /// + /// // Create the vector of the elements that will be emitted by the Source node. + /// // vec![(key,value)] + /// let vector = vec![ + /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], + /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], + /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], + /// ]; + /// + /// // Instantiate a new pipeline. 
+ /// let pipe = pipeline![ + /// SourceIter::build(vector.into_iter()), + /// Reduce::build(4, |i, vec| -> (i32, i32) { + /// (i, vec.iter().sum()) + /// }), + /// SinkVec::build() + /// ]; + /// + /// // Start the pipeline and wait for the results. + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// + /// // Collect a results for each vector emitted by the Source. In our case we had 3 vectors. + /// assert_eq!(res.len(), 3); + /// + /// // As for each vector emitted we had only one key, we obtain only one result tuple + /// // for vector. + /// for vec in res { + /// for el in vec { + /// assert_eq!(el.1, 55); + /// } + /// } pub fn build(n_worker: usize, f: F) -> impl InOut where TInIter: IntoIterator, @@ -213,6 +324,10 @@ where /// * `f` - Function to apply to each element of the input. /// # Panics /// Panics if n_replicas is 0. + /// # Remarks + /// The replicas are created by cloning the Reduce node. + /// This mean that 4 replicas of a Reduce node with 2 workers each + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -298,6 +413,53 @@ where /// * `f` - Function to apply to each element of the input. /// # Panics /// Panics if n_replicas is 0. + /// # Remarks + /// The replicas are created by cloning the OrderedReduce node. + /// This mean that 4 replicas of an OrderedReduce node with 2 workers each + /// will result in the usage of 8 threads. + /// + /// # Examples + /// + /// Given a collection of vectors of integers, for each vector + /// compute the summation of its elements. + /// As this reduce function works by grouping by key, + /// the input of the reduce function must be a vector of tuple (key, value). + /// In this example we want mantain the order of the input in the output. 
+ /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedReduce}; + /// + /// // Create the vector of the elements that will be emitted by the Source node. + /// // vec![(key,value)] + /// let vector = vec![ + /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], + /// vec![(1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (1, 9), (1 ,10)], + /// vec![(2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8), (2, 9), (2 ,10)], + /// ]; + /// + /// // Instantiate a new pipeline. + /// let pipe = pipeline![ + /// SourceIter::build(vector.into_iter()), + /// OrderedReduce::build_with_replicas(2, 4, |i, vec| -> (usize, i32) { + /// (i, vec.iter().sum()) + /// }), + /// OrderedSinkVec::build() + /// ]; + /// + /// // Start the pipeline and wait for the results. + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// + /// // Collect a results for each vector emitted by the Source. In our case we had 3 vectors. + /// assert_eq!(res.len(), 3); + /// + /// // As for each vector emitted we had only one key, we obtain only one result tuple + /// // for vector. Moreover, we check here also if the order of the input was preserved + /// // in the output. + /// for (check, vec) in res.into_iter().enumerate() { + /// for el in vec { + /// assert_eq!(el, (check, 55)); + /// } + /// } pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -373,6 +535,43 @@ where /// * `n_worker` - Number of worker threads. /// * `f_map` - Function to apply to each element of the input. /// * `f_reduce` - Function to apply to the output of the Map. + /// + /// # Examples + /// + /// Given a vector of vectors, each one containing a set of numbers, + /// compute for each vector the square value of each of its elements. + /// Furthermore, compute for each vector the summation of all its elements. 
+ /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::MapReduce}; + /// + /// let mut counter = 1.0; + /// let mut set = Vec::new(); + /// + /// for i in 0..100000 { + /// let mut vector = Vec::new(); + /// for _i in 0..10 { + /// vector.push((i, counter)); + /// counter += 1.0; + /// } + /// counter = 1.0; + /// set.push(vector); + /// } + /// // Instantiate a new pipeline. + /// let pipe = pipeline![ + /// SourceIter::build(set.into_iter()), + /// MapReduce::build(8, + /// |el: (usize, f64)| -> (usize, f64) { + /// (el.0, el.1 * el.1) + /// }, + /// |i, vec| { + /// (i, vec.iter().sum()) + /// }), + /// SinkVec::build() + /// ]; + /// + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// ``` pub fn build( n_worker: usize, f_map: FMap, @@ -399,6 +598,10 @@ where /// * `f_reduce` - Function to apply to the output of the Map. /// # Panics /// Panics if n_replicas is 0. + /// # Remarks + /// The replicas are created by cloning the MapReduce node. + /// This mean that 4 replicas of a MapReduce node with 2 workers each + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, f_map: FMap, @@ -443,7 +646,7 @@ where } } -/// Ordered Map Reduce +/// Ordered MapReduce /// /// Nodes of this type are composed of a Map and a Reduce. /// The Map is applied to each element of the input, and the Reduce is applied to the output of the Map. @@ -507,6 +710,56 @@ where /// * `f_reduce` - Function to apply to the output of the Map. /// # Panics /// Panics if n_replicas is 0. + /// # Remarks + /// The replicas are created by cloning the OrderedMapReduce node. + /// This mean that 4 replicas of an OrderedMapReduce node with 2 workers each + /// will result in the usage of 8 threads. + /// + /// # Examples + /// + /// Given a vector of vectors, each one containing a set of numbers, + /// compute for each vector the square value of each of its elements. 
+ /// Furthermore, compute for each vector the summation of all its elements. + /// In this example we want mantain the order of the input in the output. + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedMapReduce}; + /// + /// let mut counter = 1.0; + /// let mut set = Vec::new(); + /// + /// for i in 0..100000 { + /// let mut vector = Vec::new(); + /// for _i in 0..10 { + /// vector.push((i, counter)); + /// counter += 1.0; + /// } + /// counter = 1.0; + /// set.push(vector); + /// } + /// // Instantiate a new pipeline. + /// let pipe = pipeline![ + /// SourceIter::build(set.into_iter()), + /// OrderedMapReduce::build_with_replicas(2, 8, + /// |el: (usize, f64)| -> (usize, f64) { + /// (el.0, el.1 * el.1) + /// }, + /// |i, vec| { + /// (i, vec.iter().sum()) + /// }), + /// OrderedSinkVec::build() + /// ]; + /// + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// + /// // We check here also if the order of the input was preserved + /// // in the output. 
+ /// for (check, vec) in res.into_iter().enumerate() { + /// assert_eq!(vec.len(), 1); + /// for el in vec { + /// assert_eq!(el, (check, 385.00)); + /// } + /// } + /// ``` pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -549,4 +802,325 @@ where fn number_of_replicas(&self) -> usize { self.replicas } + fn is_ordered(&self) -> bool { + true + } +} + +#[cfg(test)] +mod test { + use serial_test::serial; + + use super::{Map, OrderedMap, Reduce}; + use crate::{ + prelude::*, + templates::{ + map::MapReduce, + map::{OrderedMapReduce, OrderedReduce}, + misc::{OrderedSinkVec, SinkVec, SourceIter}, + }, + }; + + fn square(x: f64) -> f64 { + x * x + } + + #[test] + #[serial] + fn simple_map() { + let mut counter = 1.0; + let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let mut vector = Vec::new(); + + for _i in 0..1000 { + vector.push(numbers.clone()); + } + + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + Map::build(4, |el: f64| square(el)), + SinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + counter = 1.0; + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } + + #[test] + #[serial] + fn simple_map_replicated() { + let mut counter = 1.0; + let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let mut vector = Vec::new(); + + for _i in 0..1000 { + vector.push(numbers.clone()); + } + + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + Map::build_with_replicas(4, 2, |el: f64| square(el)), + SinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + counter = 1.0; + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } + + #[test] + #[serial] + fn simple_ordered_map() { + let mut counter = 1.0; + let mut 
vector = Vec::new(); + + for _i in 0..1000 { + let mut numbers = Vec::new(); + for _i in 0..10 { + numbers.push(counter); + counter += 1.0; + } + vector.push(numbers); + } + + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + OrderedMap::build(4, |el: f64| square(el)), + OrderedSinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + counter = 1.0; + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } + + #[test] + #[serial] + fn simple_ordered_map_replicated() { + let mut counter = 1.0; + let mut vector = Vec::new(); + + for _i in 0..1000 { + let mut numbers = Vec::new(); + for _i in 0..10 { + numbers.push(counter); + counter += 1.0; + } + vector.push(numbers); + } + + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + OrderedMap::build_with_replicas(4, 2, |el: f64| square(el)), + OrderedSinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + counter = 1.0; + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } + + #[test] + #[serial] + fn summation() { + let mut counter = 1; + let mut set = Vec::new(); + + for i in 0..1000 { + let mut vector = Vec::new(); + for _i in 0..10 { + vector.push((i, counter)); + counter += 1; + } + counter = 1; + set.push(vector); + } + + let pipe = pipeline![ + SourceIter::build(set.into_iter()), + Reduce::build(4, |i, vec| -> (i32, i32) { (i, vec.iter().sum()) }), + SinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 1000); + + for vec in res { + assert_eq!(vec.len(), 1); + for el in vec { + assert_eq!(el.1, 55); + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } + + #[test] + #[serial] + fn summation_ordered() { + let mut counter = 1; + let mut set = Vec::new(); + + 
for i in 0..1000 { + let mut vector = Vec::new(); + for _i in 0..10 { + vector.push((i, counter)); + counter += 1; + } + counter = 1; + set.push(vector); + } + + let pipe = pipeline![ + SourceIter::build(set.into_iter()), + OrderedReduce::build_with_replicas(2, 4, |i, vec| -> (usize, i32) { + (i, vec.iter().sum()) + }), + OrderedSinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 1000); + + for (check, vec) in res.into_iter().enumerate() { + assert_eq!(vec.len(), 1); + for el in vec { + assert_eq!(el, (check, 55)); + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } + + #[test] + #[serial] + fn summation_of_squares() { + let mut counter = 1.0; + let mut set = Vec::new(); + + for i in 0..100000 { + let mut vector = Vec::new(); + for _i in 0..10 { + vector.push((i, counter)); + counter += 1.0; + } + counter = 1.0; + set.push(vector); + } + + let pipe = pipeline![ + SourceIter::build(set.into_iter()), + MapReduce::build( + 8, + |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) }, + |i, vec| { (i, vec.iter().sum()) } + ), + SinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 100000); + + for vec in res { + assert_eq!(vec.len(), 1); + for el in vec { + assert_eq!(el.1, 385.00); + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } + + #[test] + #[serial] + fn summation_of_squares_ordered() { + let mut counter = 1.0; + let mut set = Vec::new(); + + for i in 0..100000 { + let mut vector = Vec::new(); + for _i in 0..10 { + vector.push((i, counter)); + counter += 1.0; + } + counter = 1.0; + set.push(vector); + } + + let pipe = pipeline![ + SourceIter::build(set.into_iter()), + OrderedMapReduce::build_with_replicas( + 2, + 4, + |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) }, + |i, vec| { (i, vec.iter().sum()) } + ), + OrderedSinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + 
assert_eq!(res.len(), 100000); + + for (check, vec) in res.into_iter().enumerate() { + assert_eq!(vec.len(), 1); + for el in vec { + assert_eq!(el, (check, 385.00)); + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } } diff --git a/src/templates/misc.rs b/src/templates/misc.rs index 4ab4fa97..8d4baa0c 100644 --- a/src/templates/misc.rs +++ b/src/templates/misc.rs @@ -1,4 +1,4 @@ -use std::marker::PhantomData; +use std::{collections::VecDeque, marker::PhantomData}; use crate::pipeline::node::{In, InOut, Out}; @@ -17,8 +17,28 @@ where I: Iterator, T: Send + 'static, { - /// Creates a new source from a iterator. + /// Creates a new source from any type that implements the `Iterator` trait. /// The source will terminate when the iterator is exhausted. + /// + /// # Arguments + /// * `iterator` - Type that implements the [`Iterator`] trait + /// and represents the stream of data we want emit. + /// + /// # Examples + /// + /// In this example we create a source node using a [`SourceIter`] + /// template that emits numbers from 1 to 21. + /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, Sequential, SinkVec}}; + /// + /// let p = pipeline![ + /// SourceIter::build(1..21), + /// Sequential::build(|el| { el }), + /// SinkVec::build() + /// ]; + /// let res = p.start_and_wait_end().unwrap(); + /// ``` pub fn build(iterator: I) -> impl Out { Self { iterator, @@ -39,9 +59,6 @@ where /// SinkVec. /// /// Sink node that accumulates data into a vector. -/// The sink will terminate when the upstream terminates. -/// The sink will produce a vector containing all the data received -/// from the upstream. pub struct SinkVec { data: Vec, } @@ -50,8 +67,28 @@ where T: Send + 'static, { /// Creates a new sink that accumulates data into a vector. - /// The sink will terminate when the upstream terminates. - /// The sink will produce a vector containing all the data received. 
+ /// The sink will terminate when the stream terminates, producing + /// a vector containing all the data received. + /// + /// # Examples + /// + /// In this example we send element by element the content of a vector + /// through a pipeline. + /// Using the [`SinkVec`] template, we create a sink node that collects + /// the data received. + /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, Sequential, SinkVec}}; + /// + /// let data = vec![1, 2, 3, 4, 5]; + /// let p = pipeline![ + /// SourceIter::build(data.into_iter()), + /// Sequential::build(|el| { el }), + /// SinkVec::build() + /// ]; + /// let res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res, vec![1, 2, 3, 4, 5]) + /// ``` pub fn build() -> impl In> { Self { data: Vec::new() } } @@ -72,22 +109,56 @@ where /// /// This node receives a vector, split it into chunks of size `chunk_size` /// and send each chunk to the next node. -/// The node will terminate when the upstream terminates. #[derive(Clone)] pub struct Splitter { chunk_size: usize, - data: Vec, + n_replicas: usize, + data: VecDeque, } impl Splitter where T: Send + 'static + Clone, { /// Creates a new splitter node. - /// The node will terminate when the upstream terminates. + /// + /// # Arguments + /// * `chunk_size` - Number of elements for each chunk. + /// + /// # Examples + /// Given a stream of numbers, we create a pipeline with a splitter that + /// create vectors of two elements each. + /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, Splitter, SinkVec, Aggregator}}; + /// + /// let vec = vec![1, 2, 3, 4, 5, 6, 7, 8]; + /// let p = pipeline![ + /// SourceIter::build(vec.into_iter()), + /// Aggregator::build(8), // We aggregate each element in a vector. + /// Splitter::build(2), // We split the received vector in 4 sub-vector of size 2. 
+ /// SinkVec::build() + /// ]; + /// let mut res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res.len(), 4) + /// ``` pub fn build(chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, - data: Vec::new(), + n_replicas: 1, + data: VecDeque::new(), + } + } + + /// Creates a new splitter node with 'n_replicas' replicas of the same node. + /// + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `chunk_size` - Number of elements for each chunk. + pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut, Vec> { + Self { + chunk_size, + n_replicas, + data: VecDeque::new(), } } } @@ -99,13 +170,18 @@ where self.data.extend(input); None } + fn number_of_replicas(&self) -> usize { + self.n_replicas + } fn is_producer(&self) -> bool { true } fn produce(&mut self) -> Option> { if self.data.len() >= self.chunk_size { let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); + for _i in 0..self.chunk_size { + chunk.push(self.data.pop_front().unwrap()) + } Some(chunk) } else { None @@ -117,22 +193,54 @@ where /// /// This node receives elements and accumulates them into a vector. /// When the vector reaches the size `chunk_size` it send the vector with the elements accumulated to the next node. -/// The node will terminate when the upstream terminates. #[derive(Clone)] pub struct Aggregator { chunk_size: usize, - data: Vec, + n_replicas: usize, + data: VecDeque, } impl Aggregator where T: Send + 'static + Clone, { - /// Creates a new aggregator node - /// The node will terminate when the upstream terminates. + /// Creates a new aggregator node. + /// + /// # Arguments + /// * `chunk_size` - Number of elements for each chunk. + /// + /// # Examples + /// Given a stream of numbers, we use an [`Aggregator`] template to + /// group the elements of this stream in vectors of size 100. 
+ /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec, Aggregator}}; + /// + /// let p = pipeline![ + /// SourceIter::build(0..2000), + /// Aggregator::build(100), + /// SinkVec::build() + /// ]; + /// let res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res.len(), 20); + /// ``` pub fn build(chunk_size: usize) -> impl InOut> { Self { chunk_size, - data: Vec::new(), + n_replicas: 1, + data: VecDeque::new(), + } + } + + /// Creates a new aggregator node with 'n_replicas' replicas of the same node. + /// + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `chunk_size` - Number of elements for each chunk. + pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut> { + Self { + chunk_size, + n_replicas, + data: VecDeque::new(), } } } @@ -141,22 +249,21 @@ where T: Send + 'static + Clone, { fn run(&mut self, input: T) -> Option> { - self.data.push(input); - if self.data.len() >= self.chunk_size { - let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); - Some(chunk) - } else { - None - } + self.data.push_back(input); + None + } + fn number_of_replicas(&self) -> usize { + self.n_replicas } fn is_producer(&self) -> bool { true } fn produce(&mut self) -> Option> { - if !self.data.is_empty() { + if self.data.len() >= self.chunk_size { let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); + for _i in 0..self.chunk_size { + chunk.push(self.data.pop_front().unwrap()) + } Some(chunk) } else { None @@ -166,7 +273,7 @@ where /// Sequential node. /// -/// Given a function that defines the logic of the stage, this method will create a stage with one replica. +/// Given a function that defines the logic of the node, this method will create a node with one replica. #[derive(Clone)] pub struct Sequential where @@ -184,7 +291,10 @@ where F: FnMut(T) -> U + Send + 'static + Clone, { /// Creates a new sequential node. - /// The node will terminate when the upstream terminates. 
+ /// + /// # Arguments + /// * `f` - Function name or lambda function that specify the logic + /// of this node. pub fn build(f: F) -> impl InOut { Self { f, @@ -205,7 +315,7 @@ where /// Parallel node. /// -/// Given a function that defines the logic of the stage, this method will create 'n_replicas' replicas of that stage. +/// Given a function that defines the logic of the node, this method will create 'n_replicas' replicas of that node. #[derive(Clone)] pub struct Parallel where @@ -223,8 +333,12 @@ where U: Send + 'static + Clone, F: FnMut(T) -> U + Send + 'static + Clone, { - /// Creates a new parallel node - /// The node will terminate when the upstream terminates. + /// Creates a new parallel node. + /// + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `f` - Function name or lambda function that specify the logic + /// of this node. pub fn build(n_replicas: usize, f: F) -> impl InOut { Self { n_replicas, @@ -250,7 +364,6 @@ where /// Filter. /// /// This node receives elements and filters them according to the given predicate. -/// The node will terminate when the upstream terminates. #[derive(Clone)] pub struct Filter where @@ -267,7 +380,26 @@ where F: FnMut(&T) -> bool + Send + 'static + Clone, { /// Creates a new filter node. - /// The node will terminate when the upstream terminates. + /// + /// # Arguments + /// * `f` - Function name or lambda function that represent the predicate + /// function we want to apply. + /// + /// # Examples + /// Given a set of numbers from 0 to 199, we use a [`Filter`] + /// template to filter the even numbers. 
+ /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec, Filter}}; + /// + /// let p = pipeline![ + /// SourceIter::build(0..200), + /// Filter::build(|el| { el % 2 == 0 }), + /// SinkVec::build() + /// ]; + /// + /// let res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res.len(), 100) + /// ``` pub fn build(f: F) -> impl InOut { Self { f, @@ -276,7 +408,11 @@ where } } /// Creates a new filter node with 'n_replicas' replicas of the same node. - /// The node will terminate when the upstream terminates. + /// + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `f` - Function name or lambda function that represent the predicate + /// function we want to apply. pub fn build_with_replicas(n_replicas: usize, f: F) -> impl InOut { Self { f, @@ -307,10 +443,9 @@ where /// OrderedSinkVec. /// /// Sink node that accumulates data into a vector. -/// This is a ordered version of SinkVec. -/// The sink will terminate when the upstream terminates. +/// This is an ordered version of [`SinkVec`]. /// The sink will produce a vector containing all the data received in the same order -/// as it was received from the upstream. +/// as it was received. pub struct OrderedSinkVec { data: Vec, } @@ -319,10 +454,8 @@ where T: Send + 'static, { /// Creates a new sink that accumulates data into a vector. - /// This is a ordered version of SinkVec. - /// The sink will terminate when the upstream terminates. - /// The sink will produce a vector containing all the data received in the same order - /// as it was received from the upstream. + /// The sink will terminate when the stream terminates, producing + /// a vector containing all the data received. pub fn build() -> impl In> { Self { data: Vec::new() } } @@ -346,37 +479,38 @@ where /// /// This node receives a vector, split it into chunks of size `chunk_size` /// and send each chunk to the next node. -/// This is a ordered version of Splitter. 
-/// The node will terminate when the upstream terminates. -/// The node will produce data in the same order as it is received from the upstream. +/// This is an ordered versione of [`Splitter`]. +/// This node mantains the order of the input in the output. #[derive(Clone)] pub struct OrderedSplitter { chunk_size: usize, n_replicas: usize, - data: Vec, + data: VecDeque, } impl OrderedSplitter where T: Send + 'static + Clone, { - /// Creates a new splitter node. - /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. + /// Creates a new ordered splitter node. + /// # Arguments + /// * `chunk_size` - Number of elements for each chunk. pub fn build(chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, n_replicas: 1, - data: Vec::new(), + data: VecDeque::new(), } } - /// Creates a new splitter node with 'n_replicas' replicas of the same node. - /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. - pub fn build_with_replicas(chunk_size: usize, n_replicas: usize) -> impl InOut, Vec> { + + /// Creates a new ordered splitter node with 'n_replicas' replicas of the same node. + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `chunk_size` - Number of elements for each chunk. 
+ pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, n_replicas, - data: Vec::new(), + data: VecDeque::new(), } } } @@ -388,16 +522,18 @@ where self.data.extend(input); None } - fn is_producer(&self) -> bool { - true - } fn number_of_replicas(&self) -> usize { self.n_replicas } + fn is_producer(&self) -> bool { + true + } fn produce(&mut self) -> Option> { if self.data.len() >= self.chunk_size { let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); + for _i in 0..self.chunk_size { + chunk.push(self.data.pop_front().unwrap()) + } Some(chunk) } else { None @@ -411,39 +547,40 @@ where /// OrderedAggregator. /// /// This node receives elements and accumulates them into a vector. -/// When the vector reaches the size `chunk_size` it send the vector -/// with the elements accumulated to the next node. -/// This is a ordered version of Aggregator. -/// The node will terminate when the upstream terminates. -/// The node will produce data in the same order as it is received from the upstream. +/// When the vector reaches the size `chunk_size` it send the vector with the elements accumulated to the next node. +/// This is an ordered version of [`Aggregator`]. +/// This node mantains the order of the input in the output. #[derive(Clone)] pub struct OrderedAggregator { chunk_size: usize, n_replicas: usize, - data: Vec, + data: VecDeque, } impl OrderedAggregator where T: Send + 'static + Clone, { - /// Creates a new aggregator node. - /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. + /// Creates a new ordered aggregator node + /// + /// # Arguments + /// * `chunk_size` - Number of elements for each chunk. 
pub fn build(chunk_size: usize) -> impl InOut> { Self { chunk_size, n_replicas: 1, - data: Vec::new(), + data: VecDeque::new(), } } - /// Creates a new aggregator nod with 'n_replicas' replicas of the same node. - /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. - pub fn build_with_replicas(chunk_size: usize, n_replicas: usize) -> impl InOut> { + + /// Creates a new ordered aggregator node with 'n_replicas' replicas of the same node. + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `chunk_size` - Number of elements for each chunk. + pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut> { Self { chunk_size, n_replicas, - data: Vec::new(), + data: VecDeque::new(), } } } @@ -452,30 +589,26 @@ where T: Send + 'static + Clone, { fn run(&mut self, input: T) -> Option> { - self.data.push(input); - if self.data.len() >= self.chunk_size { - let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); - Some(chunk) - } else { - None - } + self.data.push_back(input); + None + } + fn number_of_replicas(&self) -> usize { + self.n_replicas } fn is_producer(&self) -> bool { true } fn produce(&mut self) -> Option> { - if !self.data.is_empty() { + if self.data.len() >= self.chunk_size { let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); + for _i in 0..self.chunk_size { + chunk.push(self.data.pop_front().unwrap()) + } Some(chunk) } else { None } } - fn number_of_replicas(&self) -> usize { - self.n_replicas - } fn is_ordered(&self) -> bool { true } @@ -484,8 +617,7 @@ where /// OrderedSequential. /// /// This node receives elements and applies a function to each element. -/// This is a ordered version of Sequential. -/// The node will terminate when the upstream terminates. +/// This is an ordered version of [`Sequential`]. /// The node will produce data in the same order as it is received from the upstream. 
#[derive(Clone)] pub struct OrderedSequential { @@ -499,8 +631,9 @@ where F: FnMut(T) -> U + Send + 'static + Clone, { /// Creates a new sequential node. - /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. + /// # Arguments + /// * `f` - Function name or lambda function that specifies the logic + /// of this node. pub fn build(f: F) -> impl InOut { Self { f, @@ -525,8 +658,7 @@ where /// OrderedParallel. /// /// This node receives elements and applies a function to each element. -/// This is a ordered version of Parallel. -/// The node will terminate when the upstream terminates. +/// This is an ordered version of [`Parallel`]. /// The node will produce data in the same order as it is received from the upstream. #[derive(Clone)] pub struct OrderedParallel { @@ -541,8 +673,10 @@ where F: FnMut(T) -> U + Send + 'static + Clone, { /// Creates a new parallel node. - /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `f` - Function name or lambda function that specifies the logic + /// of this node. pub fn build(n_replicas: usize, f: F) -> impl InOut { Self { f, @@ -571,8 +705,7 @@ where /// OrderedFilter. /// /// This node receives elements and filters them according to a predicate. -/// This is a ordered version of Filter. -/// The node will terminate when the upstream terminates. +/// This is an ordered version of [`Filter`]. #[derive(Clone)] pub struct OrderedFilter { f: F, @@ -585,7 +718,9 @@ where F: FnMut(&T) -> bool + Send + 'static + Clone, { /// Creates a new filter node. - /// The node will terminate when the upstream terminates. + /// # Arguments + /// * `f` - Function name or lambda function that represents the predicate + /// function we want to apply. 
pub fn build(f: F) -> impl InOut { Self { f, @@ -594,8 +729,11 @@ where } } /// Creates a new filter node. - /// The node will terminate when the upstream terminates. - pub fn build_with_replicas(f: F, n_replicas: usize) -> impl InOut { + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `f` - Function name or lambda function that represents the predicate + /// function we want to apply. + pub fn build_with_replicas(n_replicas: usize, f: F) -> impl InOut { Self { f, n_replicas, @@ -622,3 +760,234 @@ where self.n_replicas } } + +#[cfg(test)] +mod test { + use crate::{ + prelude::*, + templates::misc::{ + Filter, OrderedAggregator, OrderedFilter, OrderedParallel, OrderedSequential, + OrderedSinkVec, OrderedSplitter, Parallel, Sequential, SinkVec, SourceIter, + }, + }; + use serial_test::serial; + + use super::{Aggregator, Splitter}; + + #[test] + #[serial] + fn simple_pipeline() { + let p = pipeline![ + SourceIter::build(1..21), + Sequential::build(|el| { + el /*println!("Hello, received: {}", el); */ + }), + SinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 20) + } + + #[test] + #[serial] + fn simple_pipeline_ordered() { + let p = pipeline![ + SourceIter::build(1..21), + OrderedSequential::build(|el| { + el /*println!("Hello, received: {}", el); */ + }), + OrderedSinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 20); + + let mut counter = 1; + for el in res { + assert_eq!(el, counter); + counter += 1; + } + } + + #[test] + #[serial] + fn simple_farm() { + let p = pipeline![ + SourceIter::build(1..21), + Parallel::build(8, |el| { + el /*println!("Hello, received: {}", el); */ + }), + SinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 20) + } + + #[test] + #[serial] + fn simple_farm_ordered() { + let p = pipeline![ + SourceIter::build(1..21), + OrderedParallel::build(8, |el| { + el /*println!("Hello, received: {}", el); 
*/ + }), + OrderedSinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 20); + + let mut counter = 1; + for el in res { + assert_eq!(el, counter); + counter += 1; + } + } + + #[test] + #[serial] + fn splitter() { + let mut counter = 1; + let mut set = Vec::new(); + + for i in 0..1000 { + let mut vector = Vec::new(); + for _i in 0..20 { + vector.push((i, counter)); + counter += 1; + } + counter = 1; + set.push(vector); + } + + let p = pipeline![ + SourceIter::build(set.into_iter()), + Splitter::build_with_replicas(2, 2), + Splitter::build(20000), + SinkVec::build() + ]; + + let mut res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 1); + + let vec = res.pop().unwrap(); + + assert_eq!(vec.len(), 20000) + } + + #[test] + #[serial] + fn splitter_ordered() { + let mut counter = 1; + let mut set = Vec::new(); + + for _i in 0..1000 { + let mut vector = Vec::new(); + for _i in 0..20 { + vector.push(counter); + counter += 1; + } + set.push(vector); + } + + let p = pipeline![ + SourceIter::build(set.into_iter()), + OrderedSplitter::build_with_replicas(2, 10), + OrderedSplitter::build_with_replicas(4, 1), + OrderedSplitter::build(20000), + OrderedSinkVec::build() + ]; + + let mut res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 1); + + let vec = res.pop().unwrap(); + + assert_eq!(vec.len(), 20000); + + counter = 1; + for el in vec { + assert_eq!(el, counter); + counter += 1; + } + } + + #[test] + #[serial] + fn aggregator() { + let p = pipeline![ + SourceIter::build(0..2000), + Aggregator::build(100), + SinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 20); + + for vec in res { + assert_eq!(vec.len(), 100); + } + } + + #[test] + #[serial] + fn aggregator_ordered() { + let p = pipeline![ + SourceIter::build(0..2000), + OrderedAggregator::build_with_replicas(4, 100), + OrderedSplitter::build(1), + OrderedSinkVec::build() + ]; + + let res = 
p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 2000); + + for vec in res { + assert_eq!(vec.len(), 1); + } + } + + #[test] + #[serial] + fn filter() { + let p = pipeline![ + SourceIter::build(0..200), + Filter::build(|el| { el % 2 == 0 }), + SinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 100) + } + + #[test] + #[serial] + fn filter_ordered() { + let p = pipeline![ + SourceIter::build(0..200), + OrderedFilter::build_with_replicas(4, |el| { el % 2 == 0 }), + OrderedSinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 100); + + let mut counter = 0; + for el in res { + assert_eq!(el, counter); + counter += 2; + } + } +} diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs index 827f9fd4..ac7f893c 100644 --- a/src/thread_pool/mod.rs +++ b/src/thread_pool/mod.rs @@ -222,9 +222,9 @@ impl ThreadPool { /// Create a new thread pool. /// If the environment variable `PPL_MAX_CORES` is set, the capacity of the thread - /// pool is set to that value. Otherwise, the number of logical threads available + /// pool is set to that value. Otherwise, the number of logical threads available /// on the host machine is used instead. 
- /// + /// /// /// # Examples /// @@ -554,4 +554,3 @@ impl<'pool, 'scope> Scope<'pool, 'scope> { self.pool.execute(task); } } - diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs index d9ad5fbb..742f8de9 100644 --- a/src/thread_pool/test.rs +++ b/src/thread_pool/test.rs @@ -148,6 +148,23 @@ fn test_par_map_reduce() { assert!(check) } +#[test] +#[serial] +fn test_par_reduce() { + let mut pool = ThreadPool::new(); + + let mut vec = Vec::new(); + for i in 0..100 { + vec.push((i % 10, i)); + } + + let res: Vec<(i32, i32)> = pool + .par_reduce(vec, |k, v| -> (i32, i32) { (k, v.iter().sum()) }) + .collect(); + + assert_eq!(res.len(), 10); +} + #[test] #[serial] fn test_par_map_reduce_seq() { @@ -203,7 +220,6 @@ fn test_simple_map() { let mut counter = 1.0; let mut numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; - // Transform the vec of integers into a vec of strings let res: Vec = pool.par_map(&mut numbers, |el| square(*el)).collect(); for el in res {