From ef9297bba33bb1a60968e35709ea425f615cdf67 Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Tue, 24 Oct 2023 11:25:29 +0200 Subject: [PATCH 01/13] Add example and test for the Map template. Minor cleaning in pipeline! macro. --- src/pipeline/mod.rs | 8 ++--- src/templates/map.rs | 75 +++++++++++++++++++++++++++++++++++++++++ src/thread_pool/test.rs | 1 - 3 files changed, 79 insertions(+), 5 deletions(-) diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 0b7c7640..db27aafc 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -110,7 +110,7 @@ where macro_rules! propagate { ($s1:expr) => { { - let mut block = InNode::new(Box::new($s1), get_global_orchestrator()); + let block = InNode::new(Box::new($s1), get_global_orchestrator()); block } }; @@ -118,7 +118,7 @@ macro_rules! propagate { ($s1:expr $(, $tail:expr)*) => { { let node = ($s1); - let mut block = InOutNode::new(Box::new(node), + let block = InOutNode::new(Box::new(node), propagate!($($tail),*), get_global_orchestrator()); block @@ -168,10 +168,10 @@ macro_rules! pipeline { ($s1:expr $(, $tail:expr)*) => { { let orchestrator = get_global_orchestrator(); - let mut block = OutNode::new(Box::new($s1), + let block = OutNode::new(Box::new($s1), propagate!($($tail),*), orchestrator); - let mut pipeline = Pipeline::new(block); + let pipeline = Pipeline::new(block); pipeline } }; diff --git a/src/templates/map.rs b/src/templates/map.rs index dffc8f17..b69f9e5f 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -26,6 +26,33 @@ where /// # Arguments /// * `n_worker` - Number of worker threads. /// * `f` - Function to apply to each element of the input. + /// + /// # Examples + /// + /// Given a vector of vectors, each one containing a set of numbers, + /// compute the square value of each number contained in each + /// vector. 
+ /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::Map}; + /// + /// let mut counter = 1.0; + /// let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + /// let mut vector = Vec::new(); + /// + /// // Create the vector of vectors. + /// for _i in 0..1000 { + /// vector.push(numbers.clone()); + /// } + /// // Instantiate a new Pipeline with a Map operator. + /// let pipe = pipeline![ + /// SourceIter::build(vector.into_iter()), + /// Map::build(4, |el: f64| el * el), + /// SinkVec::build() + /// ]; + /// // Start the pipeline and collect the results. + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// ``` pub fn build(n_worker: usize, f: F) -> impl InOut where TInIter: IntoIterator, @@ -550,3 +577,51 @@ where self.replicas } } + +#[cfg(test)] +mod test { +use serial_test::serial; + +use crate::{prelude::*, templates::misc::{SourceIter, SinkVec}}; +use super::Map; + + + + +fn square(x: f64) -> f64 { + x * x +} + +#[test] +#[serial] +fn simple_map() { + let mut counter = 1.0; + let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let mut vector = Vec::new(); + + for _i in 0..1000 { + vector.push(numbers.clone()); + } + + + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + Map::build(4, |el: f64| square(el)), + SinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + counter = 1.0; + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } +} +} \ No newline at end of file diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs index d9ad5fbb..b98680e1 100644 --- a/src/thread_pool/test.rs +++ b/src/thread_pool/test.rs @@ -203,7 +203,6 @@ fn test_simple_map() { let mut counter = 1.0; let mut numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; - // Transform the vec of integers into a vec of strings 
let res: Vec = pool.par_map(&mut numbers, |el| square(*el)).collect(); for el in res { From 12bfabc2ae559e545a0e390d9717f56bfdd62188 Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Tue, 24 Oct 2023 11:31:18 +0200 Subject: [PATCH 02/13] Add docs --- src/templates/map.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/templates/map.rs b/src/templates/map.rs index b69f9e5f..e7b59b4a 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -75,6 +75,8 @@ where /// Panics if n_replicas is 0. /// # Remarks /// The replicas are created by cloning the Map node. + /// This mean that 4 replicas of a Map node with 2 workers each + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -155,6 +157,10 @@ where /// * `f` - Function to apply to each element of the input. /// # Panics /// Panics if n_replicas is 0. + /// # Remarks + /// The replicas are created by cloning the OrderedMap node. + /// This mean that 4 replicas of an Ordered Map node with 2 workers each + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -240,6 +246,10 @@ where /// * `f` - Function to apply to each element of the input. /// # Panics /// Panics if n_replicas is 0. + /// # Remarks + /// The replicas are created by cloning the Reduce node. + /// This mean that 4 replicas of a Reduce node with 2 workers each + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -325,6 +335,10 @@ where /// * `f` - Function to apply to each element of the input. /// # Panics /// Panics if n_replicas is 0. + /// # Remarks + /// The replicas are created by cloning the OrderedReduce node. + /// This mean that 4 replicas of an OrderedReduce node with 2 workers each + /// will result in the usage of 8 threads. 
pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -426,6 +440,10 @@ where /// * `f_reduce` - Function to apply to the output of the Map. /// # Panics /// Panics if n_replicas is 0. + /// # Remarks + /// The replicas are created by cloning the MapReduce node. + /// This mean that 4 replicas of a MapReduce node with 2 workers each + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, f_map: FMap, @@ -534,6 +552,10 @@ where /// * `f_reduce` - Function to apply to the output of the Map. /// # Panics /// Panics if n_replicas is 0. + /// # Remarks + /// The replicas are created by cloning the OrderedMapReduce node. + /// This mean that 4 replicas of an OrderedMapReduce node with 2 workers each + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, n_replicas: usize, From 4c0e52785f8e9c4c6f255ad27058abc3e658c51d Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Tue, 24 Oct 2023 11:53:15 +0200 Subject: [PATCH 03/13] Add Example and Test for OrderedMap --- src/templates/map.rs | 76 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/src/templates/map.rs b/src/templates/map.rs index e7b59b4a..34f89b5d 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -161,6 +161,41 @@ where /// The replicas are created by cloning the OrderedMap node. /// This mean that 4 replicas of an Ordered Map node with 2 workers each /// will result in the usage of 8 threads. + /// + /// # Examples + /// + /// Given a vector of vectors, each one containing a set of numbers, + /// compute the square value of each number contained in each + /// vector. + /// In this case, using the OrderedMap template, it is possible + /// to mantain the order of the input in the output. 
+ /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedMap}; + /// let mut counter = 1.0; + /// let mut vector = Vec::new(); + /// + /// // Create a vector of vectors, each one containing a set of numbers. + /// for _i in 0..1000{ + /// let mut numbers = Vec::new(); + /// for _i in 0..10 { + /// numbers.push(counter); + /// counter += 1.0; + /// } + /// vector.push(numbers); + /// } + /// + /// // Instantiate the pipeline. + /// let pipe = pipeline![ + /// SourceIter::build(vector.into_iter()), + /// OrderedMap::build_with_replicas(4, 2, |el: f64| el * el), + /// OrderedSinkVec::build() + /// ]; + /// + /// // Start the pipeline and collect the results. + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// ``` + pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -604,8 +639,8 @@ where mod test { use serial_test::serial; -use crate::{prelude::*, templates::misc::{SourceIter, SinkVec}}; -use super::Map; +use crate::{prelude::*, templates::misc::{SourceIter, SinkVec, OrderedSinkVec}}; +use super::{Map, OrderedMap}; @@ -646,4 +681,41 @@ fn simple_map() { Orchestrator::delete_global_orchestrator(); } } + +#[test] +#[serial] +fn simple_ordered_map() { + let mut counter = 1.0; + let mut vector = Vec::new(); + + for _i in 0..1000{ + let mut numbers = Vec::new(); + for _i in 0..10 { + numbers.push(counter); + counter += 1.0; + } + vector.push(numbers); + } + + + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + OrderedMap::build_with_replicas(4, 2, |el: f64| square(el)), + OrderedSinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + counter = 1.0; + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } +} } \ No newline at end of file From cf0a522154ccdfbdc797d5a2e0fb7884ce73d2c6 Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Tue, 24 Oct 
2023 11:59:40 +0200 Subject: [PATCH 04/13] Add Docs --- src/templates/map.rs | 73 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/src/templates/map.rs b/src/templates/map.rs index 34f89b5d..309ceb5c 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -169,6 +169,9 @@ where /// vector. /// In this case, using the OrderedMap template, it is possible /// to mantain the order of the input in the output. + /// Moreover, using the `build_with_replicas` method, + /// we can create a stage consisting of four map operator, + /// each one composed by 2 worker. /// /// ``` /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedMap}; @@ -682,6 +685,39 @@ fn simple_map() { } } +#[test] +#[serial] +fn simple_map_replicated() { + let mut counter = 1.0; + let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let mut vector = Vec::new(); + + for _i in 0..1000 { + vector.push(numbers.clone()); + } + + + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + Map::build_with_replicas(4, 2, |el: f64| square(el)), + SinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + counter = 1.0; + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } +} + #[test] #[serial] fn simple_ordered_map() { @@ -698,6 +734,43 @@ fn simple_ordered_map() { } + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + OrderedMap::build(4, |el: f64| square(el)), + OrderedSinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + counter = 1.0; + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } +} + +#[test] +#[serial] +fn simple_ordered_map_replicated() { + let mut counter = 1.0; + let mut vector = Vec::new(); + + for 
_i in 0..1000{ + let mut numbers = Vec::new(); + for _i in 0..10 { + numbers.push(counter); + counter += 1.0; + } + vector.push(numbers); + } + + let pipe = pipeline![ SourceIter::build(vector.into_iter()), OrderedMap::build_with_replicas(4, 2, |el: f64| square(el)), From 38e8ec25bf7f5272a3370a1c5aae7f4f540c4351 Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Wed, 25 Oct 2023 12:17:29 +0200 Subject: [PATCH 05/13] Add Example and unit test for reduce, both for templates and threadpool --- src/templates/map.rs | 339 ++++++++++++++++++++++++---------------- src/thread_pool/mod.rs | 5 +- src/thread_pool/test.rs | 17 ++ 3 files changed, 226 insertions(+), 135 deletions(-) diff --git a/src/templates/map.rs b/src/templates/map.rs index 309ceb5c..1028a1d8 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -26,16 +26,16 @@ where /// # Arguments /// * `n_worker` - Number of worker threads. /// * `f` - Function to apply to each element of the input. - /// + /// /// # Examples - /// - /// Given a vector of vectors, each one containing a set of numbers, + /// + /// Given a vector of vectors, each one containing a set of numbers, /// compute the square value of each number contained in each /// vector. - /// + /// /// ``` /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::Map}; - /// + /// /// let mut counter = 1.0; /// let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; /// let mut vector = Vec::new(); @@ -76,7 +76,7 @@ where /// # Remarks /// The replicas are created by cloning the Map node. /// This mean that 4 replicas of a Map node with 2 workers each - /// will result in the usage of 8 threads. + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -160,11 +160,11 @@ where /// # Remarks /// The replicas are created by cloning the OrderedMap node. 
/// This mean that 4 replicas of an Ordered Map node with 2 workers each - /// will result in the usage of 8 threads. - /// + /// will result in the usage of 8 threads. + /// /// # Examples - /// - /// Given a vector of vectors, each one containing a set of numbers, + /// + /// Given a vector of vectors, each one containing a set of numbers, /// compute the square value of each number contained in each /// vector. /// In this case, using the OrderedMap template, it is possible @@ -172,12 +172,12 @@ where /// Moreover, using the `build_with_replicas` method, /// we can create a stage consisting of four map operator, /// each one composed by 2 worker. - /// + /// /// ``` /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedMap}; /// let mut counter = 1.0; /// let mut vector = Vec::new(); - /// + /// /// // Create a vector of vectors, each one containing a set of numbers. /// for _i in 0..1000{ /// let mut numbers = Vec::new(); @@ -187,18 +187,17 @@ where /// } /// vector.push(numbers); /// } - /// + /// /// // Instantiate the pipeline. /// let pipe = pipeline![ /// SourceIter::build(vector.into_iter()), /// OrderedMap::build_with_replicas(4, 2, |el: f64| el * el), /// OrderedSinkVec::build() /// ]; - /// + /// /// // Start the pipeline and collect the results. /// let res: Vec> = pipe.start_and_wait_end().unwrap(); /// ``` - pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -264,6 +263,48 @@ where /// # Arguments /// * `n_worker` - Number of worker threads. /// * `f` - Function to apply to each element of the input. + /// + /// # Examples + /// + /// Given a collection of vectors of integers, for each vector + /// compute the summation of its elements. + /// As this reduce function works by grouping by key, + /// the input of the reduce function must be a vector of tuple (key, value). 
+ /// + /// ``` + /// + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::Reduce}; + /// + /// // Create the vector of the elements that will be emitted by the Source node. + /// // vec![(key,value)] + /// let vector = vec![ + /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], + /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], + /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], + /// ]; + /// + /// // Instantiate a new pipeline. + /// let pipe = pipeline![ + /// SourceIter::build(vector.into_iter()), + /// Reduce::build(4, |i, vec| -> (i32, i32) { + /// (i, vec.iter().sum()) + /// }), + /// SinkVec::build() + /// ]; + /// + /// // Start the pipeline and wait for the results. + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// + /// // Collect a results for each vector emitted by the Source. In our case we had 3 vectors. + /// assert_eq!(res.len(), 3); + /// + /// // As for each vector emitted we had only one key, we obtain only one result tuple + /// // for vector. + /// for vec in res { + /// for el in vec { + /// assert_eq!(el.1, 55); + /// } + /// } pub fn build(n_worker: usize, f: F) -> impl InOut where TInIter: IntoIterator, @@ -287,7 +328,7 @@ where /// # Remarks /// The replicas are created by cloning the Reduce node. /// This mean that 4 replicas of a Reduce node with 2 workers each - /// will result in the usage of 8 threads. + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -376,7 +417,7 @@ where /// # Remarks /// The replicas are created by cloning the OrderedReduce node. /// This mean that 4 replicas of an OrderedReduce node with 2 workers each - /// will result in the usage of 8 threads. + /// will result in the usage of 8 threads. 
pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -481,7 +522,7 @@ where /// # Remarks /// The replicas are created by cloning the MapReduce node. /// This mean that 4 replicas of a MapReduce node with 2 workers each - /// will result in the usage of 8 threads. + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, f_map: FMap, @@ -593,7 +634,7 @@ where /// # Remarks /// The replicas are created by cloning the OrderedMapReduce node. /// This mean that 4 replicas of an OrderedMapReduce node with 2 workers each - /// will result in the usage of 8 threads. + /// will result in the usage of 8 threads. pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -640,155 +681,189 @@ where #[cfg(test)] mod test { -use serial_test::serial; + use serial_test::serial; -use crate::{prelude::*, templates::misc::{SourceIter, SinkVec, OrderedSinkVec}}; -use super::{Map, OrderedMap}; + use super::{Map, OrderedMap, Reduce}; + use crate::{ + prelude::*, + templates::misc::{OrderedSinkVec, SinkVec, SourceIter}, + }; + fn square(x: f64) -> f64 { + x * x + } + #[test] + #[serial] + fn simple_map() { + let mut counter = 1.0; + let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let mut vector = Vec::new(); + for _i in 0..1000 { + vector.push(numbers.clone()); + } -fn square(x: f64) -> f64 { - x * x -} + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + Map::build(4, |el: f64| square(el)), + SinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); -#[test] -#[serial] -fn simple_map() { - let mut counter = 1.0; - let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; - let mut vector = Vec::new(); + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + counter = 1.0; + } - for _i in 0..1000 { - vector.push(numbers.clone()); + unsafe { + Orchestrator::delete_global_orchestrator(); + } } - - let pipe = 
pipeline![ - SourceIter::build(vector.into_iter()), - Map::build(4, |el: f64| square(el)), - SinkVec::build() - ]; + #[test] + #[serial] + fn simple_map_replicated() { + let mut counter = 1.0; + let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let mut vector = Vec::new(); - let res: Vec> = pipe.start_and_wait_end().unwrap(); + for _i in 0..1000 { + vector.push(numbers.clone()); + } + + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + Map::build_with_replicas(4, 2, |el: f64| square(el)), + SinkVec::build() + ]; - for vec in res { - for el in vec { - assert_eq!(el.sqrt(), counter); - counter += 1.0; + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + counter = 1.0; } - counter = 1.0; - } - unsafe { - Orchestrator::delete_global_orchestrator(); + unsafe { + Orchestrator::delete_global_orchestrator(); + } } -} -#[test] -#[serial] -fn simple_map_replicated() { - let mut counter = 1.0; - let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; - let mut vector = Vec::new(); + #[test] + #[serial] + fn simple_ordered_map() { + let mut counter = 1.0; + let mut vector = Vec::new(); - for _i in 0..1000 { - vector.push(numbers.clone()); - } + for _i in 0..1000 { + let mut numbers = Vec::new(); + for _i in 0..10 { + numbers.push(counter); + counter += 1.0; + } + vector.push(numbers); + } - - let pipe = pipeline![ - SourceIter::build(vector.into_iter()), - Map::build_with_replicas(4, 2, |el: f64| square(el)), - SinkVec::build() - ]; + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + OrderedMap::build(4, |el: f64| square(el)), + OrderedSinkVec::build() + ]; - let res: Vec> = pipe.start_and_wait_end().unwrap(); + let res: Vec> = pipe.start_and_wait_end().unwrap(); - for vec in res { - for el in vec { - assert_eq!(el.sqrt(), counter); - counter += 1.0; - } counter = 1.0; - } + for vec in res { + for el in 
vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } + } - unsafe { - Orchestrator::delete_global_orchestrator(); + unsafe { + Orchestrator::delete_global_orchestrator(); + } } -} -#[test] -#[serial] -fn simple_ordered_map() { - let mut counter = 1.0; - let mut vector = Vec::new(); - - for _i in 0..1000{ - let mut numbers = Vec::new(); - for _i in 0..10 { - numbers.push(counter); - counter += 1.0; + #[test] + #[serial] + fn simple_ordered_map_replicated() { + let mut counter = 1.0; + let mut vector = Vec::new(); + + for _i in 0..1000 { + let mut numbers = Vec::new(); + for _i in 0..10 { + numbers.push(counter); + counter += 1.0; + } + vector.push(numbers); } - vector.push(numbers); - } - - let pipe = pipeline![ - SourceIter::build(vector.into_iter()), - OrderedMap::build(4, |el: f64| square(el)), - OrderedSinkVec::build() - ]; + let pipe = pipeline![ + SourceIter::build(vector.into_iter()), + OrderedMap::build_with_replicas(4, 2, |el: f64| square(el)), + OrderedSinkVec::build() + ]; - let res: Vec> = pipe.start_and_wait_end().unwrap(); + let res: Vec> = pipe.start_and_wait_end().unwrap(); - counter = 1.0; - for vec in res { - for el in vec { - assert_eq!(el.sqrt(), counter); - counter += 1.0; + counter = 1.0; + for vec in res { + for el in vec { + assert_eq!(el.sqrt(), counter); + counter += 1.0; + } } - } - unsafe { - Orchestrator::delete_global_orchestrator(); + unsafe { + Orchestrator::delete_global_orchestrator(); + } } -} -#[test] -#[serial] -fn simple_ordered_map_replicated() { - let mut counter = 1.0; - let mut vector = Vec::new(); - - for _i in 0..1000{ - let mut numbers = Vec::new(); - for _i in 0..10 { - numbers.push(counter); - counter += 1.0; + #[test] + #[serial] + fn summation() { + let mut counter = 1; + let mut set = Vec::new(); + + for i in 0..1000 { + let mut vector = Vec::new(); + for _i in 0..10 { + vector.push((i, counter)); + counter += 1; + } + counter = 1; + set.push(vector); } - vector.push(numbers); - } - - let pipe = pipeline![ - 
SourceIter::build(vector.into_iter()), - OrderedMap::build_with_replicas(4, 2, |el: f64| square(el)), - OrderedSinkVec::build() - ]; + let pipe = pipeline![ + SourceIter::build(set.into_iter()), + Reduce::build(4, |i, vec| -> (i32, i32) { (i, vec.iter().sum()) }), + SinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); - let res: Vec> = pipe.start_and_wait_end().unwrap(); + assert_eq!(res.len(), 1000); - counter = 1.0; - for vec in res { - for el in vec { - assert_eq!(el.sqrt(), counter); - counter += 1.0; + for vec in res { + assert_eq!(vec.len(), 1); + for el in vec { + assert_eq!(el.1, 55); + } } - } - unsafe { - Orchestrator::delete_global_orchestrator(); + unsafe { + Orchestrator::delete_global_orchestrator(); + } } } -} \ No newline at end of file diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs index 827f9fd4..ac7f893c 100644 --- a/src/thread_pool/mod.rs +++ b/src/thread_pool/mod.rs @@ -222,9 +222,9 @@ impl ThreadPool { /// Create a new thread pool. /// If the environment variable `PPL_MAX_CORES` is set, the capacity of the thread - /// pool is set to that value. Otherwise, the number of logical threads available + /// pool is set to that value. Otherwise, the number of logical threads available /// on the host machine is used instead. 
- /// + /// /// /// # Examples /// @@ -554,4 +554,3 @@ impl<'pool, 'scope> Scope<'pool, 'scope> { self.pool.execute(task); } } - diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs index b98680e1..742f8de9 100644 --- a/src/thread_pool/test.rs +++ b/src/thread_pool/test.rs @@ -148,6 +148,23 @@ fn test_par_map_reduce() { assert!(check) } +#[test] +#[serial] +fn test_par_reduce() { + let mut pool = ThreadPool::new(); + + let mut vec = Vec::new(); + for i in 0..100 { + vec.push((i % 10, i)); + } + + let res: Vec<(i32, i32)> = pool + .par_reduce(vec, |k, v| -> (i32, i32) { (k, v.iter().sum()) }) + .collect(); + + assert_eq!(res.len(), 10); +} + #[test] #[serial] fn test_par_map_reduce_seq() { From 65cda94b35e02e65aa45b088e855a1e64462bc61 Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Wed, 25 Oct 2023 12:42:21 +0200 Subject: [PATCH 06/13] Add Example and unit test for OrderedReduce --- src/templates/map.rs | 97 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 92 insertions(+), 5 deletions(-) diff --git a/src/templates/map.rs b/src/templates/map.rs index 1028a1d8..cb7907b3 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -272,9 +272,9 @@ where /// the input of the reduce function must be a vector of tuple (key, value). /// /// ``` - /// + /// /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::Reduce}; - /// + /// /// // Create the vector of the elements that will be emitted by the Source node. /// // vec![(key,value)] /// let vector = vec![ @@ -285,8 +285,8 @@ where /// /// // Instantiate a new pipeline. /// let pipe = pipeline![ - /// SourceIter::build(vector.into_iter()), - /// Reduce::build(4, |i, vec| -> (i32, i32) { + /// SourceIter::build(vector.into_iter()), + /// Reduce::build(4, |i, vec| -> (i32, i32) { /// (i, vec.iter().sum()) /// }), /// SinkVec::build() @@ -418,6 +418,50 @@ where /// The replicas are created by cloning the OrderedReduce node. 
/// This mean that 4 replicas of an OrderedReduce node with 2 workers each /// will result in the usage of 8 threads. + /// + /// # Examples + /// + /// Given a collection of vectors of integers, for each vector + /// compute the summation of its elements. + /// As this reduce function works by grouping by key, + /// the input of the reduce function must be a vector of tuple (key, value). + /// In this example we want mantain the order of the input in the output. + /// + /// ``` + /// + /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedReduce}; + /// + /// // Create the vector of the elements that will be emitted by the Source node. + /// // vec![(key,value)] + /// let vector = vec![ + /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], + /// vec![(1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (1, 9), (1 ,10)], + /// vec![(2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8), (2, 9), (2 ,10)], + /// ]; + /// + /// // Instantiate a new pipeline. + /// let pipe = pipeline![ + /// SourceIter::build(vector.into_iter()), + /// OrderedReduce::build_with_replicas(2, 4, |i, vec| -> (usize, i32) { + /// (i, vec.iter().sum()) + /// }), + /// OrderedSinkVec::build() + /// ]; + /// + /// // Start the pipeline and wait for the results. + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// + /// // Collect a results for each vector emitted by the Source. In our case we had 3 vectors. + /// assert_eq!(res.len(), 3); + /// + /// // As for each vector emitted we had only one key, we obtain only one result tuple + /// // for vector. Moreover, we check here also if the order of the input was preserved + /// // in the output. 
+ /// for (check, vec) in res.into_iter().enumerate() { + /// for el in vec { + /// assert_eq!(el, (check, 55)); + /// } + /// } pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -686,7 +730,10 @@ mod test { use super::{Map, OrderedMap, Reduce}; use crate::{ prelude::*, - templates::misc::{OrderedSinkVec, SinkVec, SourceIter}, + templates::{ + map::OrderedReduce, + misc::{OrderedSinkVec, SinkVec, SourceIter}, + }, }; fn square(x: f64) -> f64 { @@ -866,4 +913,44 @@ mod test { Orchestrator::delete_global_orchestrator(); } } + + #[test] + #[serial] + fn summation_ordered() { + let mut counter = 1; + let mut set = Vec::new(); + + for i in 0..1000 { + let mut vector = Vec::new(); + for _i in 0..10 { + vector.push((i, counter)); + counter += 1; + } + counter = 1; + set.push(vector); + } + + let pipe = pipeline![ + SourceIter::build(set.into_iter()), + OrderedReduce::build_with_replicas(2, 4, |i, vec| -> (usize, i32) { + (i, vec.iter().sum()) + }), + OrderedSinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 1000); + + for (check, vec) in res.into_iter().enumerate() { + assert_eq!(vec.len(), 1); + for el in vec { + assert_eq!(el, (check, 55)); + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } } From 864120a2dde8f555edf7a744d65246542d7900cd Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Thu, 26 Oct 2023 11:56:23 +0200 Subject: [PATCH 07/13] Add test for MapReduce and OrderedMapReduce. Fix bug in OrderedMapReduce. 
--- src/templates/map.rs | 96 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 2 deletions(-) diff --git a/src/templates/map.rs b/src/templates/map.rs index cb7907b3..2c665723 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -36,7 +36,6 @@ where /// ``` /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::Map}; /// - /// let mut counter = 1.0; /// let numbers: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; /// let mut vector = Vec::new(); /// @@ -721,6 +720,9 @@ where fn number_of_replicas(&self) -> usize { self.replicas } + fn is_ordered(&self) -> bool { + true + } } #[cfg(test)] @@ -731,7 +733,8 @@ mod test { use crate::{ prelude::*, templates::{ - map::OrderedReduce, + map::{OrderedReduce, OrderedMapReduce}, + map::MapReduce, misc::{OrderedSinkVec, SinkVec, SourceIter}, }, }; @@ -953,4 +956,93 @@ mod test { Orchestrator::delete_global_orchestrator(); } } + + + #[test] + #[serial] + fn summation_of_squares() { + let mut counter = 1.0; + let mut set = Vec::new(); + + for i in 0..100000 { + let mut vector = Vec::new(); + for _i in 0..10 { + vector.push((i, counter)); + counter += 1.0; + } + counter = 1.0; + set.push(vector); + } + + let pipe = pipeline![ + SourceIter::build(set.into_iter()), + MapReduce::build(8, + |el: (usize, f64)| -> (usize, f64) { + (el.0, el.1 * el.1) + }, + |i, vec| { + (i, vec.iter().sum()) + }), + SinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 100000); + + for vec in res { + assert_eq!(vec.len(), 1); + for el in vec { + assert_eq!(el.1, 385.00); + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } + + #[test] + #[serial] + fn summation_of_squares_ordered() { + let mut counter = 1.0; + let mut set = Vec::new(); + + for i in 0..100 { + let mut vector = Vec::new(); + for _i in 0..10 { + vector.push((i, counter)); + counter += 1.0; + } + counter = 1.0; + set.push(vector); + } + 
+ let pipe = pipeline![ + SourceIter::build(set.into_iter()), + OrderedMapReduce::build(8, + |el: (usize, f64)| -> (usize, f64) { + (el.0, el.1 * el.1) + }, + |i, vec| { + (i, vec.iter().sum()) + }), + OrderedSinkVec::build() + ]; + + let res: Vec> = pipe.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 100); + + for (check, vec) in res.into_iter().enumerate() { + assert_eq!(vec.len(), 1); + for el in vec { + assert_eq!(el, (check, 385.00)); + } + } + + unsafe { + Orchestrator::delete_global_orchestrator(); + } + } } From b5c020ea4be9ea7b7de303d096dd3150f95d9cd0 Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Fri, 27 Oct 2023 10:54:51 +0200 Subject: [PATCH 08/13] Add examples for MapReduce, OrderedMapReduce --- src/templates/map.rs | 90 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 86 insertions(+), 4 deletions(-) diff --git a/src/templates/map.rs b/src/templates/map.rs index 2c665723..61a017f2 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -427,7 +427,6 @@ where /// In this example we want mantain the order of the input in the output. /// /// ``` - /// /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedReduce}; /// /// // Create the vector of the elements that will be emitted by the Source node. @@ -536,6 +535,43 @@ where /// * `n_worker` - Number of worker threads. /// * `f_map` - Function to apply to each element of the input. /// * `f_reduce` - Function to apply to the output of the Map. + /// + /// # Examples + /// + /// Given a vector of vectors, each one containing a set of numbers, + /// compute for each vector the square value of each of its elements. + /// Furthermore, compute for each vector the summation of all its elements. 
+ /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::MapReduce}; + /// + /// let mut counter = 1.0; + /// let mut set = Vec::new(); + /// + /// for i in 0..100000 { + /// let mut vector = Vec::new(); + /// for _i in 0..10 { + /// vector.push((i, counter)); + /// counter += 1.0; + /// } + /// counter = 1.0; + /// set.push(vector); + /// } + /// // Instantiate a new pipeline. + /// let pipe = pipeline![ + /// SourceIter::build(set.into_iter()), + /// MapReduce::build(8, + /// |el: (usize, f64)| -> (usize, f64) { + /// (el.0, el.1 * el.1) + /// }, + /// |i, vec| { + /// (i, vec.iter().sum()) + /// }), + /// SinkVec::build() + /// ]; + /// + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// ``` pub fn build( n_worker: usize, f_map: FMap, @@ -678,6 +714,52 @@ where /// The replicas are created by cloning the OrderedMapReduce node. /// This mean that 4 replicas of an OrderedMapReduce node with 2 workers each /// will result in the usage of 8 threads. + /// + /// # Examples + /// + /// Given a vector of vectors, each one containing a set of numbers, + /// compute for each vector the square value of each of its elements. + /// Furthermore, compute for each vector the summation of all its elements. + /// In this example we want mantain the order of the input in the output. + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedMapReduce}; + /// + /// let mut counter = 1.0; + /// let mut set = Vec::new(); + /// + /// for i in 0..100000 { + /// let mut vector = Vec::new(); + /// for _i in 0..10 { + /// vector.push((i, counter)); + /// counter += 1.0; + /// } + /// counter = 1.0; + /// set.push(vector); + /// } + /// // Instantiate a new pipeline. 
+ /// let pipe = pipeline![ + /// SourceIter::build(set.into_iter()), + /// OrderedMapReduce::build_with_replicas(2, 8, + /// |el: (usize, f64)| -> (usize, f64) { + /// (el.0, el.1 * el.1) + /// }, + /// |i, vec| { + /// (i, vec.iter().sum()) + /// }), + /// OrderedSinkVec::build() + /// ]; + /// + /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// + /// // We check here also if the order of the input was preserved + /// // in the output. + /// for (check, vec) in res.into_iter().enumerate() { + /// assert_eq!(vec.len(), 1); + /// for el in vec { + /// assert_eq!(el, (check, 385.00)); + /// } + /// } + /// ``` pub fn build_with_replicas( n_worker: usize, n_replicas: usize, @@ -1008,7 +1090,7 @@ mod test { let mut counter = 1.0; let mut set = Vec::new(); - for i in 0..100 { + for i in 0..100000 { let mut vector = Vec::new(); for _i in 0..10 { vector.push((i, counter)); @@ -1020,7 +1102,7 @@ mod test { let pipe = pipeline![ SourceIter::build(set.into_iter()), - OrderedMapReduce::build(8, + OrderedMapReduce::build_with_replicas(2, 4, |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) }, @@ -1032,7 +1114,7 @@ mod test { let res: Vec> = pipe.start_and_wait_end().unwrap(); - assert_eq!(res.len(), 100); + assert_eq!(res.len(), 100000); for (check, vec) in res.into_iter().enumerate() { assert_eq!(vec.len(), 1); From 798f8da697164d1f8f91ca85dff1060654777048 Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Fri, 27 Oct 2023 12:04:11 +0200 Subject: [PATCH 09/13] Fix bug in InOut node producer rts. Add unit tests in templates/misc. 
--- src/pipeline/node/inout_node.rs | 6 +- src/templates/misc.rs | 300 +++++++++++++++++++++----------- 2 files changed, 205 insertions(+), 101 deletions(-) diff --git a/src/pipeline/node/inout_node.rs b/src/pipeline/node/inout_node.rs index 65cccd84..bcb6a8ad 100644 --- a/src/pipeline/node/inout_node.rs +++ b/src/pipeline/node/inout_node.rs @@ -442,6 +442,9 @@ where // If the node is not ordered, then i can send the messages to the next node // without any order. loop { + if tmp.is_empty() { + break; + } let msg = Message { op: Task::New(tmp.pop_front().unwrap()), order: 0, @@ -450,9 +453,6 @@ where if err.is_err() { panic!("Error: {}", err.unwrap_err()); } - if tmp.is_empty() { - break; - } } } } diff --git a/src/templates/misc.rs b/src/templates/misc.rs index 4ab4fa97..584bb4ba 100644 --- a/src/templates/misc.rs +++ b/src/templates/misc.rs @@ -1,4 +1,4 @@ -use std::marker::PhantomData; +use std::{marker::PhantomData, collections::VecDeque}; use crate::pipeline::node::{In, InOut, Out}; @@ -76,7 +76,8 @@ where #[derive(Clone)] pub struct Splitter { chunk_size: usize, - data: Vec, + n_replicas: usize, + data: VecDeque, } impl Splitter where @@ -87,7 +88,18 @@ where pub fn build(chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, - data: Vec::new(), + n_replicas: 1, + data: VecDeque::new(), + } + } + + /// Creates a new splitter node with 'n_replicas' replicas of the same node. + /// The node will terminate when the upstream terminates. 
+ pub fn build_with_replicas(chunk_size: usize, n_replicas: usize) -> impl InOut, Vec> { + Self { + chunk_size, + n_replicas, + data: VecDeque::new(), } } } @@ -99,13 +111,18 @@ where self.data.extend(input); None } + fn number_of_replicas(&self) -> usize { + self.n_replicas + } fn is_producer(&self) -> bool { true } fn produce(&mut self) -> Option> { if self.data.len() >= self.chunk_size { let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); + for _i in 0..self.chunk_size { + chunk.push(self.data.pop_front().unwrap()) + } Some(chunk) } else { None @@ -121,7 +138,8 @@ where #[derive(Clone)] pub struct Aggregator { chunk_size: usize, - data: Vec, + n_replicas: usize, + data: VecDeque, } impl Aggregator where @@ -132,7 +150,18 @@ where pub fn build(chunk_size: usize) -> impl InOut> { Self { chunk_size, - data: Vec::new(), + n_replicas: 1, + data: VecDeque::new(), + } + } + + /// Creates a new aggregator nod with 'n_replicas' replicas of the same node. + /// The node will terminate when the upstream terminates. + pub fn build_with_replicas(chunk_size: usize, n_replicas: usize) -> impl InOut> { + Self { + chunk_size, + n_replicas, + data: VecDeque::new(), } } } @@ -141,22 +170,21 @@ where T: Send + 'static + Clone, { fn run(&mut self, input: T) -> Option> { - self.data.push(input); - if self.data.len() >= self.chunk_size { - let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); - Some(chunk) - } else { - None - } + self.data.push_back(input); + None + } + fn number_of_replicas(&self) -> usize { + self.n_replicas } fn is_producer(&self) -> bool { true } fn produce(&mut self) -> Option> { - if !self.data.is_empty() { + if self.data.len() >= self.chunk_size { let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); + for _i in 0..self.chunk_size { + chunk.push(self.data.pop_front().unwrap()) + } Some(chunk) } else { None @@ -342,18 +370,18 @@ where } } + /// OrderedSplitter. 
/// /// This node receives a vector, split it into chunks of size `chunk_size` /// and send each chunk to the next node. -/// This is a ordered version of Splitter. /// The node will terminate when the upstream terminates. -/// The node will produce data in the same order as it is received from the upstream. +/// This node mantains the order of the input in the output. #[derive(Clone)] pub struct OrderedSplitter { chunk_size: usize, n_replicas: usize, - data: Vec, + data: VecDeque, } impl OrderedSplitter where @@ -361,22 +389,21 @@ where { /// Creates a new splitter node. /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. pub fn build(chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, n_replicas: 1, - data: Vec::new(), + data: VecDeque::new(), } } + /// Creates a new splitter node with 'n_replicas' replicas of the same node. /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. pub fn build_with_replicas(chunk_size: usize, n_replicas: usize) -> impl InOut, Vec> { Self { chunk_size, n_replicas, - data: Vec::new(), + data: VecDeque::new(), } } } @@ -388,99 +415,29 @@ where self.data.extend(input); None } - fn is_producer(&self) -> bool { - true - } fn number_of_replicas(&self) -> usize { self.n_replicas } - fn produce(&mut self) -> Option> { - if self.data.len() >= self.chunk_size { - let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); - Some(chunk) - } else { - None - } - } - fn is_ordered(&self) -> bool { - true - } -} - -/// OrderedAggregator. -/// -/// This node receives elements and accumulates them into a vector. -/// When the vector reaches the size `chunk_size` it send the vector -/// with the elements accumulated to the next node. -/// This is a ordered version of Aggregator. -/// The node will terminate when the upstream terminates. 
-/// The node will produce data in the same order as it is received from the upstream. -#[derive(Clone)] -pub struct OrderedAggregator { - chunk_size: usize, - n_replicas: usize, - data: Vec, -} -impl OrderedAggregator -where - T: Send + 'static + Clone, -{ - /// Creates a new aggregator node. - /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. - pub fn build(chunk_size: usize) -> impl InOut> { - Self { - chunk_size, - n_replicas: 1, - data: Vec::new(), - } - } - /// Creates a new aggregator nod with 'n_replicas' replicas of the same node. - /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. - pub fn build_with_replicas(chunk_size: usize, n_replicas: usize) -> impl InOut> { - Self { - chunk_size, - n_replicas, - data: Vec::new(), - } - } -} -impl InOut> for OrderedAggregator -where - T: Send + 'static + Clone, -{ - fn run(&mut self, input: T) -> Option> { - self.data.push(input); - if self.data.len() >= self.chunk_size { - let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); - Some(chunk) - } else { - None - } - } fn is_producer(&self) -> bool { true } fn produce(&mut self) -> Option> { - if !self.data.is_empty() { + if self.data.len() >= self.chunk_size { let mut chunk = Vec::new(); - std::mem::swap(&mut chunk, &mut self.data); + for _i in 0..self.chunk_size { + chunk.push(self.data.pop_front().unwrap()) + } Some(chunk) } else { None } } - fn number_of_replicas(&self) -> usize { - self.n_replicas - } fn is_ordered(&self) -> bool { true } } + /// OrderedSequential. /// /// This node receives elements and applies a function to each element. 
@@ -622,3 +579,150 @@ where self.n_replicas } } + +#[cfg(test)] +mod test { + use serial_test::serial; + use crate::{ + prelude::*, + templates::misc::{SinkVec, SourceIter, Sequential, OrderedSinkVec, OrderedSequential, Parallel, OrderedParallel, OrderedSplitter}, + }; + + use super::{Splitter, Aggregator}; + + #[test] + #[serial] + fn simple_pipeline() { + let p = pipeline![ + SourceIter::build(1..21), + Sequential::build(|el| { el /*println!("Hello, received: {}", el); */ }), + SinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 20) + } + + #[test] + #[serial] + fn simple_pipeline_ordered() { + let p = pipeline![ + SourceIter::build(1..21), + OrderedSequential::build(|el| { el /*println!("Hello, received: {}", el); */ }), + OrderedSinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 20); + + let mut counter = 1; + for el in res { + assert_eq!(el, counter); + counter += 1; } + } + + #[test] + #[serial] + fn simple_farm() { + let p = pipeline![ + SourceIter::build(1..21), + Parallel::build(8, |el| { el /*println!("Hello, received: {}", el); */ }), + SinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 20) + } + + #[test] + #[serial] + fn simple_farm_ordered() { + let p = pipeline![ + SourceIter::build(1..21), + OrderedParallel::build(8, |el| { el /*println!("Hello, received: {}", el); */ }), + OrderedSinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 20); + + let mut counter = 1; + for el in res { + assert_eq!(el, counter); + counter += 1; } + } + + #[test] + #[serial] + fn splitter() { + let mut counter = 1; + let mut set = Vec::new(); + + for i in 0..1000 { + let mut vector = Vec::new(); + for _i in 0..20 { + vector.push((i, counter)); + counter += 1; + } + counter = 1; + set.push(vector); + } + + let p = pipeline![ + SourceIter::build(set.into_iter()), + Splitter::build(2), + 
Splitter::build(20000), + SinkVec::build() + ]; + + let mut res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 1); + + let vec = res.pop().unwrap(); + + assert_eq!(vec.len(), 20000) + } + + #[test] + #[serial] + fn splitter_ordered() { + let mut counter = 1; + let mut set = Vec::new(); + + for _i in 0..1000 { + let mut vector = Vec::new(); + for _i in 0..20 { + vector.push(counter); + counter += 1; + } + set.push(vector); + } + + let p = pipeline![ + SourceIter::build(set.into_iter()), + OrderedSplitter::build_with_replicas(2, 2), + OrderedSplitter::build_with_replicas(4, 1), + OrderedSplitter::build(20000), + OrderedSinkVec::build() + ]; + + let mut res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 1); + + let vec = res.pop().unwrap(); + + assert_eq!(vec.len(), 20000); + + counter = 1; + for el in vec { + assert_eq!(el, counter); + counter += 1; + } + } +} \ No newline at end of file From fd3a4d1f6c456e7c9ed991cb52266e7e39a86fc2 Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Fri, 27 Oct 2023 12:53:11 +0200 Subject: [PATCH 10/13] Add unit tests for Filter and OrderedFilter --- src/templates/misc.rs | 154 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 145 insertions(+), 9 deletions(-) diff --git a/src/templates/misc.rs b/src/templates/misc.rs index 584bb4ba..366f4d84 100644 --- a/src/templates/misc.rs +++ b/src/templates/misc.rs @@ -95,7 +95,7 @@ where /// Creates a new splitter node with 'n_replicas' replicas of the same node. /// The node will terminate when the upstream terminates. - pub fn build_with_replicas(chunk_size: usize, n_replicas: usize) -> impl InOut, Vec> { + pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, n_replicas, @@ -157,7 +157,7 @@ where /// Creates a new aggregator nod with 'n_replicas' replicas of the same node. /// The node will terminate when the upstream terminates. 
- pub fn build_with_replicas(chunk_size: usize, n_replicas: usize) -> impl InOut> { + pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut> { Self { chunk_size, n_replicas, @@ -387,7 +387,7 @@ impl OrderedSplitter where T: Send + 'static + Clone, { - /// Creates a new splitter node. + /// Creates a new ordered splitter node. /// The node will terminate when the upstream terminates. pub fn build(chunk_size: usize) -> impl InOut, Vec> { Self { @@ -397,9 +397,9 @@ where } } - /// Creates a new splitter node with 'n_replicas' replicas of the same node. + /// Creates a new ordered splitter node with 'n_replicas' replicas of the same node. /// The node will terminate when the upstream terminates. - pub fn build_with_replicas(chunk_size: usize, n_replicas: usize) -> impl InOut, Vec> { + pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, n_replicas, @@ -437,6 +437,71 @@ where } } +/// OrderedAggregator. +/// +/// This node receives elements and accumulates them into a vector. +/// When the vector reaches the size `chunk_size` it send the vector with the elements accumulated to the next node. +/// The node will terminate when the upstream terminates. +/// This node mantains the order of the input in the output. +#[derive(Clone)] +pub struct OrderedAggregator { + chunk_size: usize, + n_replicas: usize, + data: VecDeque, +} +impl OrderedAggregator +where + T: Send + 'static + Clone, +{ + /// Creates a new ordered aggregator node + /// The node will terminate when the upstream terminates. + pub fn build(chunk_size: usize) -> impl InOut> { + Self { + chunk_size, + n_replicas: 1, + data: VecDeque::new(), + } + } + + /// Creates a new ordered aggregator nod with 'n_replicas' replicas of the same node. + /// The node will terminate when the upstream terminates. 
+ pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut> { + Self { + chunk_size, + n_replicas, + data: VecDeque::new(), + } + } +} +impl InOut> for OrderedAggregator +where + T: Send + 'static + Clone, +{ + fn run(&mut self, input: T) -> Option> { + self.data.push_back(input); + None + } + fn number_of_replicas(&self) -> usize { + self.n_replicas + } + fn is_producer(&self) -> bool { + true + } + fn produce(&mut self) -> Option> { + if self.data.len() >= self.chunk_size { + let mut chunk = Vec::new(); + for _i in 0..self.chunk_size { + chunk.push(self.data.pop_front().unwrap()) + } + Some(chunk) + } else { + None + } + } + fn is_ordered(&self) -> bool { + true + } +} /// OrderedSequential. /// @@ -552,7 +617,7 @@ where } /// Creates a new filter node. /// The node will terminate when the upstream terminates. - pub fn build_with_replicas(f: F, n_replicas: usize) -> impl InOut { + pub fn build_with_replicas(n_replicas: usize, f: F) -> impl InOut { Self { f, n_replicas, @@ -585,7 +650,7 @@ mod test { use serial_test::serial; use crate::{ prelude::*, - templates::misc::{SinkVec, SourceIter, Sequential, OrderedSinkVec, OrderedSequential, Parallel, OrderedParallel, OrderedSplitter}, + templates::misc::{SinkVec, SourceIter, Sequential, OrderedSinkVec, OrderedSequential, Parallel, OrderedParallel, OrderedSplitter, OrderedAggregator, Filter, OrderedFilter}, }; use super::{Splitter, Aggregator}; @@ -674,7 +739,7 @@ mod test { let p = pipeline![ SourceIter::build(set.into_iter()), - Splitter::build(2), + Splitter::build_with_replicas(2, 2), Splitter::build(20000), SinkVec::build() ]; @@ -705,7 +770,7 @@ mod test { let p = pipeline![ SourceIter::build(set.into_iter()), - OrderedSplitter::build_with_replicas(2, 2), + OrderedSplitter::build_with_replicas(2, 10), OrderedSplitter::build_with_replicas(4, 1), OrderedSplitter::build(20000), OrderedSinkVec::build() @@ -725,4 +790,75 @@ mod test { counter += 1; } } + + #[test] + #[serial] + fn aggregator() { + 
let p = pipeline![ + SourceIter::build(0..2000), + Aggregator::build(100), + SinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 20); + + for vec in res { + assert_eq!(vec.len(), 100); + } + } + + #[test] + #[serial] + fn aggregator_ordered() { + let p = pipeline![ + SourceIter::build(0..2000), + OrderedAggregator::build_with_replicas(4, 100), + OrderedSplitter::build(1), + OrderedSinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 2000); + + for vec in res { + assert_eq!(vec.len(), 1); + } + } + + #[test] + #[serial] + fn filter() { + let p = pipeline![ + SourceIter::build(0..200), + Filter::build(|el| { el % 2 == 0 }), + SinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 100) + } + + #[test] + #[serial] + fn filter_ordered() { + let p = pipeline![ + SourceIter::build(0..200), + OrderedFilter::build_with_replicas(4, |el| { el % 2 == 0 }), + OrderedSinkVec::build() + ]; + + let res = p.start_and_wait_end().unwrap(); + + assert_eq!(res.len(), 100); + + let mut counter = 0; + for el in res { + assert_eq!(el, counter); + counter += 2; + } + } } \ No newline at end of file From 60a1659b5d915c74eaa47a7546833618e0e0b2ac Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Fri, 3 Nov 2023 12:45:53 +0100 Subject: [PATCH 11/13] Add test for misc templates --- src/templates/map.rs | 48 +++++++--------- src/templates/misc.rs | 130 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 136 insertions(+), 42 deletions(-) diff --git a/src/templates/map.rs b/src/templates/map.rs index 61a017f2..e261aa50 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -535,16 +535,16 @@ where /// * `n_worker` - Number of worker threads. /// * `f_map` - Function to apply to each element of the input. /// * `f_reduce` - Function to apply to the output of the Map. 
- /// + /// /// # Examples - /// + /// /// Given a vector of vectors, each one containing a set of numbers, /// compute for each vector the square value of each of its elements. /// Furthermore, compute for each vector the summation of all its elements. - /// + /// /// ``` /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::MapReduce}; - /// + /// /// let mut counter = 1.0; /// let mut set = Vec::new(); /// @@ -560,7 +560,7 @@ where /// // Instantiate a new pipeline. /// let pipe = pipeline![ /// SourceIter::build(set.into_iter()), - /// MapReduce::build(8, + /// MapReduce::build(8, /// |el: (usize, f64)| -> (usize, f64) { /// (el.0, el.1 * el.1) /// }, @@ -714,16 +714,16 @@ where /// The replicas are created by cloning the OrderedMapReduce node. /// This mean that 4 replicas of an OrderedMapReduce node with 2 workers each /// will result in the usage of 8 threads. - /// + /// /// # Examples - /// + /// /// Given a vector of vectors, each one containing a set of numbers, /// compute for each vector the square value of each of its elements. /// Furthermore, compute for each vector the summation of all its elements. /// In this example we want mantain the order of the input in the output. /// ``` /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedMapReduce}; - /// + /// /// let mut counter = 1.0; /// let mut set = Vec::new(); /// @@ -739,7 +739,7 @@ where /// // Instantiate a new pipeline. /// let pipe = pipeline![ /// SourceIter::build(set.into_iter()), - /// OrderedMapReduce::build_with_replicas(2, 8, + /// OrderedMapReduce::build_with_replicas(2, 8, /// |el: (usize, f64)| -> (usize, f64) { /// (el.0, el.1 * el.1) /// }, @@ -750,7 +750,7 @@ where /// ]; /// /// let res: Vec> = pipe.start_and_wait_end().unwrap(); - /// + /// /// // We check here also if the order of the input was preserved /// // in the output. 
/// for (check, vec) in res.into_iter().enumerate() { @@ -815,8 +815,8 @@ mod test { use crate::{ prelude::*, templates::{ - map::{OrderedReduce, OrderedMapReduce}, map::MapReduce, + map::{OrderedMapReduce, OrderedReduce}, misc::{OrderedSinkVec, SinkVec, SourceIter}, }, }; @@ -1039,7 +1039,6 @@ mod test { } } - #[test] #[serial] fn summation_of_squares() { @@ -1058,13 +1057,11 @@ mod test { let pipe = pipeline![ SourceIter::build(set.into_iter()), - MapReduce::build(8, - |el: (usize, f64)| -> (usize, f64) { - (el.0, el.1 * el.1) - }, - |i, vec| { - (i, vec.iter().sum()) - }), + MapReduce::build( + 8, + |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) }, + |i, vec| { (i, vec.iter().sum()) } + ), SinkVec::build() ]; @@ -1102,13 +1099,12 @@ mod test { let pipe = pipeline![ SourceIter::build(set.into_iter()), - OrderedMapReduce::build_with_replicas(2, 4, - |el: (usize, f64)| -> (usize, f64) { - (el.0, el.1 * el.1) - }, - |i, vec| { - (i, vec.iter().sum()) - }), + OrderedMapReduce::build_with_replicas( + 2, + 4, + |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) }, + |i, vec| { (i, vec.iter().sum()) } + ), OrderedSinkVec::build() ]; diff --git a/src/templates/misc.rs b/src/templates/misc.rs index 366f4d84..f4127cec 100644 --- a/src/templates/misc.rs +++ b/src/templates/misc.rs @@ -1,4 +1,4 @@ -use std::{marker::PhantomData, collections::VecDeque}; +use std::{collections::VecDeque, marker::PhantomData}; use crate::pipeline::node::{In, InOut, Out}; @@ -17,8 +17,24 @@ where I: Iterator, T: Send + 'static, { - /// Creates a new source from a iterator. + /// Creates a new source from any type that implements the `Iterator` trait. /// The source will terminate when the iterator is exhausted. + /// + /// # Examples + /// + /// In this example we create a source node using a [`SourceIter`] + /// template that emits numbers from 1 to 21. 
+ /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, Sequential, SinkVec}}; + /// + /// let p = pipeline![ + /// SourceIter::build(1..21), + /// Sequential::build(|el| { el }), + /// SinkVec::build() + /// ]; + /// let res = p.start_and_wait_end().unwrap(); + /// ``` pub fn build(iterator: I) -> impl Out { Self { iterator, @@ -40,7 +56,7 @@ where /// /// Sink node that accumulates data into a vector. /// The sink will terminate when the upstream terminates. -/// The sink will produce a vector containing all the data received +/// The sink will produce as result a vector containing all the data received /// from the upstream. pub struct SinkVec { data: Vec, @@ -52,6 +68,26 @@ where /// Creates a new sink that accumulates data into a vector. /// The sink will terminate when the upstream terminates. /// The sink will produce a vector containing all the data received. + /// + /// # Examples + /// + /// In this example we send element by element the content of a vector + /// through a pipeline. + /// Using the [`SinkVec`] template, we create a sink node that collects + /// the data received. + /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, Sequential, SinkVec}}; + /// + /// let data = vec![1, 2, 3, 4, 5]; + /// let p = pipeline![ + /// SourceIter::build(data.into_iter()), + /// Sequential::build(|el| { el }), + /// SinkVec::build() + /// ]; + /// let res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res, vec![1, 2, 3, 4, 5]) + /// ``` pub fn build() -> impl In> { Self { data: Vec::new() } } @@ -85,6 +121,24 @@ where { /// Creates a new splitter node. /// The node will terminate when the upstream terminates. + /// + /// # Examples + /// Given a stream of numbers, we create a pipeline with a splitter that + /// create vectors of two elements each. 
+ /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, Splitter, SinkVec, Aggregator}}; + /// + /// let vec = vec![1, 2, 3, 4, 5, 6, 7, 8]; + /// let p = pipeline![ + /// SourceIter::build(vec.into_iter()), + /// Aggregator::build(8), // We aggregate each element in a vector. + /// Splitter::build(2), // We split the received vector in 4 sub-vector of size 2. + /// SinkVec::build() + /// ]; + /// let mut res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res.len(), 4) + /// ``` pub fn build(chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, @@ -147,6 +201,22 @@ where { /// Creates a new aggregator node /// The node will terminate when the upstream terminates. + /// + /// # Examples + /// Given a stream of numbers, we use an [`Aggregator`] template to + /// group the elements of this stream in vectors of size 100. + /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec, Aggregator}}; + /// + /// let p = pipeline![ + /// SourceIter::build(0..2000), + /// Aggregator::build(100), + /// SinkVec::build() + /// ]; + /// let res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res.len(), 20); + /// ``` pub fn build(chunk_size: usize) -> impl InOut> { Self { chunk_size, @@ -296,6 +366,22 @@ where { /// Creates a new filter node. /// The node will terminate when the upstream terminates. + /// + /// # Examples + /// Given a set of numbers from 0 to 199, we use a [`Filter`] + /// template to filter the even numbers. + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec, Filter}}; + /// + /// let p = pipeline![ + /// SourceIter::build(0..200), + /// Filter::build(|el| { el % 2 == 0 }), + /// SinkVec::build() + /// ]; + /// + /// let res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res.len(), 100) + /// ``` pub fn build(f: F) -> impl InOut { Self { f, @@ -370,7 +456,6 @@ where } } - /// OrderedSplitter. 
/// /// This node receives a vector, split it into chunks of size `chunk_size` @@ -647,20 +732,25 @@ where #[cfg(test)] mod test { - use serial_test::serial; use crate::{ prelude::*, - templates::misc::{SinkVec, SourceIter, Sequential, OrderedSinkVec, OrderedSequential, Parallel, OrderedParallel, OrderedSplitter, OrderedAggregator, Filter, OrderedFilter}, + templates::misc::{ + Filter, OrderedAggregator, OrderedFilter, OrderedParallel, OrderedSequential, + OrderedSinkVec, OrderedSplitter, Parallel, Sequential, SinkVec, SourceIter, + }, }; + use serial_test::serial; - use super::{Splitter, Aggregator}; + use super::{Aggregator, Splitter}; #[test] #[serial] fn simple_pipeline() { let p = pipeline![ SourceIter::build(1..21), - Sequential::build(|el| { el /*println!("Hello, received: {}", el); */ }), + Sequential::build(|el| { + el /*println!("Hello, received: {}", el); */ + }), SinkVec::build() ]; @@ -674,7 +764,9 @@ mod test { fn simple_pipeline_ordered() { let p = pipeline![ SourceIter::build(1..21), - OrderedSequential::build(|el| { el /*println!("Hello, received: {}", el); */ }), + OrderedSequential::build(|el| { + el /*println!("Hello, received: {}", el); */ + }), OrderedSinkVec::build() ]; @@ -685,7 +777,8 @@ mod test { let mut counter = 1; for el in res { assert_eq!(el, counter); - counter += 1; } + counter += 1; + } } #[test] @@ -693,7 +786,9 @@ mod test { fn simple_farm() { let p = pipeline![ SourceIter::build(1..21), - Parallel::build(8, |el| { el /*println!("Hello, received: {}", el); */ }), + Parallel::build(8, |el| { + el /*println!("Hello, received: {}", el); */ + }), SinkVec::build() ]; @@ -707,7 +802,9 @@ mod test { fn simple_farm_ordered() { let p = pipeline![ SourceIter::build(1..21), - OrderedParallel::build(8, |el| { el /*println!("Hello, received: {}", el); */ }), + OrderedParallel::build(8, |el| { + el /*println!("Hello, received: {}", el); */ + }), OrderedSinkVec::build() ]; @@ -718,7 +815,8 @@ mod test { let mut counter = 1; for el in res { 
assert_eq!(el, counter); - counter += 1; } + counter += 1; + } } #[test] @@ -749,7 +847,7 @@ mod test { assert_eq!(res.len(), 1); let vec = res.pop().unwrap(); - + assert_eq!(vec.len(), 20000) } @@ -781,7 +879,7 @@ mod test { assert_eq!(res.len(), 1); let vec = res.pop().unwrap(); - + assert_eq!(vec.len(), 20000); counter = 1; @@ -861,4 +959,4 @@ mod test { counter += 2; } } -} \ No newline at end of file +} From ccf4516d478ebb4ebfa229f7b47ca348445f82af Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Sun, 5 Nov 2023 14:28:49 +0100 Subject: [PATCH 12/13] Improve docs for Templates --- src/templates/map.rs | 2 +- src/templates/misc.rs | 33 +++++++++------------------------ 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/src/templates/map.rs b/src/templates/map.rs index e261aa50..92d09164 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -646,7 +646,7 @@ where } } -/// Ordered Map Reduce +/// Ordered MapReduce /// /// Nodes of this type are composed of a Map and a Reduce. /// The Map is applied to each element of the input, and the Reduce is applied to the output of the Map. diff --git a/src/templates/misc.rs b/src/templates/misc.rs index f4127cec..477ef5e9 100644 --- a/src/templates/misc.rs +++ b/src/templates/misc.rs @@ -55,9 +55,6 @@ where /// SinkVec. /// /// Sink node that accumulates data into a vector. -/// The sink will terminate when the upstream terminates. -/// The sink will produce as result a vector containing all the data received -/// from the upstream. pub struct SinkVec { data: Vec, } @@ -108,7 +105,6 @@ where /// /// This node receives a vector, split it into chunks of size `chunk_size` /// and send each chunk to the next node. -/// The node will terminate when the upstream terminates. #[derive(Clone)] pub struct Splitter { chunk_size: usize, @@ -188,7 +184,6 @@ where /// /// This node receives elements and accumulates them into a vector. 
/// When the vector reaches the size `chunk_size` it send the vector with the elements accumulated to the next node. -/// The node will terminate when the upstream terminates. #[derive(Clone)] pub struct Aggregator { chunk_size: usize, @@ -199,7 +194,7 @@ impl Aggregator where T: Send + 'static + Clone, { - /// Creates a new aggregator node + /// Creates a new aggregator node. /// The node will terminate when the upstream terminates. /// /// # Examples @@ -225,7 +220,7 @@ where } } - /// Creates a new aggregator nod with 'n_replicas' replicas of the same node. + /// Creates a new aggregator node with 'n_replicas' replicas of the same node. /// The node will terminate when the upstream terminates. pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut> { Self { @@ -348,7 +343,6 @@ where /// Filter. /// /// This node receives elements and filters them according to the given predicate. -/// The node will terminate when the upstream terminates. #[derive(Clone)] pub struct Filter where @@ -421,8 +415,7 @@ where /// OrderedSinkVec. /// /// Sink node that accumulates data into a vector. -/// This is a ordered version of SinkVec. -/// The sink will terminate when the upstream terminates. +/// This is an ordered version of [`SinkVec`]. /// The sink will produce a vector containing all the data received in the same order /// as it was received from the upstream. pub struct OrderedSinkVec { @@ -432,11 +425,8 @@ impl OrderedSinkVec where T: Send + 'static, { - /// Creates a new sink that accumulates data into a vector. - /// This is a ordered version of SinkVec. + /// Creates a new ordered sink that accumulates data into a vector. /// The sink will terminate when the upstream terminates. - /// The sink will produce a vector containing all the data received in the same order - /// as it was received from the upstream. 
 pub fn build() -> impl In> { Self { data: Vec::new() } } @@ -460,7 +450,7 @@ where /// /// This node receives a vector, split it into chunks of size `chunk_size` /// and send each chunk to the next node. -/// The node will terminate when the upstream terminates. +/// This is an ordered version of [`Splitter`]. /// This node mantains the order of the input in the output. #[derive(Clone)] pub struct OrderedSplitter { @@ -526,7 +516,7 @@ where /// /// This node receives elements and accumulates them into a vector. /// When the vector reaches the size `chunk_size` it send the vector with the elements accumulated to the next node. -/// The node will terminate when the upstream terminates. +/// This is an ordered version of [`Aggregator`]. /// This node mantains the order of the input in the output. #[derive(Clone)] pub struct OrderedAggregator { @@ -591,8 +581,7 @@ where /// OrderedSequential. /// /// This node receives elements and applies a function to each element. -/// This is a ordered version of Sequential. -/// The node will terminate when the upstream terminates. +/// This is an ordered version of [`Sequential`]. /// The node will produce data in the same order as it is received from the upstream. #[derive(Clone)] pub struct OrderedSequential { @@ -607,7 +596,6 @@ where { /// Creates a new sequential node. /// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. pub fn build(f: F) -> impl InOut { Self { f, @@ -632,8 +620,7 @@ where /// OrderedParallel. /// /// This node receives elements and applies a function to each element. -/// This is a ordered version of Parallel. -/// The node will terminate when the upstream terminates. +/// This is an ordered version of [`Parallel`]. /// The node will produce data in the same order as it is received from the upstream. #[derive(Clone)] pub struct OrderedParallel { @@ -649,7 +636,6 @@ where { /// Creates a new parallel node. 
/// The node will terminate when the upstream terminates. - /// The node will produce data in the same order as it is received from the upstream. pub fn build(n_replicas: usize, f: F) -> impl InOut { Self { f, @@ -678,8 +664,7 @@ where /// OrderedFilter. /// /// This node receives elements and filters them according to a predicate. -/// This is a ordered version of Filter. -/// The node will terminate when the upstream terminates. +/// This is an ordered version of [`Filter`]. #[derive(Clone)] pub struct OrderedFilter { f: F, From 91930abe73a4b3b7659a19b5cf10744c7d98060d Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Wed, 8 Nov 2023 13:21:31 +0100 Subject: [PATCH 13/13] Improve docs for Templates --- src/templates/misc.rs | 94 ++++++++++++++++++++++++++++++++----------- 1 file changed, 70 insertions(+), 24 deletions(-) diff --git a/src/templates/misc.rs b/src/templates/misc.rs index 477ef5e9..8d4baa0c 100644 --- a/src/templates/misc.rs +++ b/src/templates/misc.rs @@ -19,6 +19,10 @@ where { /// Creates a new source from any type that implements the `Iterator` trait. /// The source will terminate when the iterator is exhausted. + /// + /// # Arguments + /// * `iterator` - Type that implements the [`Iterator`] trait + /// and represents the stream of data we want emit. /// /// # Examples /// @@ -63,8 +67,8 @@ where T: Send + 'static, { /// Creates a new sink that accumulates data into a vector. - /// The sink will terminate when the upstream terminates. - /// The sink will produce a vector containing all the data received. + /// The sink will terminate when the stream terminates, producing + /// a vector containing all the data received. /// /// # Examples /// @@ -116,8 +120,10 @@ where T: Send + 'static + Clone, { /// Creates a new splitter node. - /// The node will terminate when the upstream terminates. /// + /// # Arguments + /// * `chunk_size` - Number of elements for each chunk. 
+ /// /// # Examples /// Given a stream of numbers, we create a pipeline with a splitter that /// create vectors of two elements each. @@ -144,7 +150,10 @@ where } /// Creates a new splitter node with 'n_replicas' replicas of the same node. - /// The node will terminate when the upstream terminates. + /// + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `chunk_size` - Number of elements for each chunk. pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, @@ -195,7 +204,9 @@ where T: Send + 'static + Clone, { /// Creates a new aggregator node. - /// The node will terminate when the upstream terminates. + /// + /// # Arguments + /// * `chunk_size` - Number of elements for each chunk. /// /// # Examples /// Given a stream of numbers, we use an [`Aggregator`] template to @@ -221,7 +232,10 @@ where } /// Creates a new aggregator node with 'n_replicas' replicas of the same node. - /// The node will terminate when the upstream terminates. + /// + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `chunk_size` - Number of elements for each chunk. pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut> { Self { chunk_size, @@ -259,7 +273,7 @@ where /// Sequential node. /// -/// Given a function that defines the logic of the stage, this method will create a stage with one replica. +/// Given a function that defines the logic of the node, this method will create a node with one replica. #[derive(Clone)] pub struct Sequential where @@ -277,7 +291,10 @@ where F: FnMut(T) -> U + Send + 'static + Clone, { /// Creates a new sequential node. - /// The node will terminate when the upstream terminates. + /// + /// # Arguments + /// * `f` - Function name or lambda function that specify the logic + /// of this node. pub fn build(f: F) -> impl InOut { Self { f, @@ -298,7 +315,7 @@ where /// Parallel node. 
/// -/// Given a function that defines the logic of the stage, this method will create 'n_replicas' replicas of that stage. +/// Given a function that defines the logic of the node, this method will create 'n_replicas' replicas of that node. #[derive(Clone)] pub struct Parallel where @@ -316,8 +333,12 @@ where U: Send + 'static + Clone, F: FnMut(T) -> U + Send + 'static + Clone, { - /// Creates a new parallel node - /// The node will terminate when the upstream terminates. + /// Creates a new parallel node. + /// + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `f` - Function name or lambda function that specify the logic + /// of this node. pub fn build(n_replicas: usize, f: F) -> impl InOut { Self { n_replicas, @@ -359,7 +380,10 @@ where F: FnMut(&T) -> bool + Send + 'static + Clone, { /// Creates a new filter node. - /// The node will terminate when the upstream terminates. + /// + /// # Arguments + /// * `f` - Function name or lambda function that represent the predicate + /// function we want to apply. /// /// # Examples /// Given a set of numbers from 0 to 199, we use a [`Filter`] @@ -384,7 +408,11 @@ where } } /// Creates a new filter node with 'n_replicas' replicas of the same node. - /// The node will terminate when the upstream terminates. + /// + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `f` - Function name or lambda function that represent the predicate + /// function we want to apply. pub fn build_with_replicas(n_replicas: usize, f: F) -> impl InOut { Self { f, @@ -417,7 +445,7 @@ where /// Sink node that accumulates data into a vector. /// This is an ordered version of [`SinkVec`]. /// The sink will produce a vector containing all the data received in the same order -/// as it was received from the upstream. +/// as it was received. pub struct OrderedSinkVec { data: Vec, } @@ -425,8 +453,9 @@ impl OrderedSinkVec where T: Send + 'static, { - /// Creates a new ordered sink that accumulates data into a vector. 
- /// The sink will terminate when the upstream terminates. + /// Creates a new sink that accumulates data into a vector. + /// The sink will terminate when the stream terminates, producing + /// a vector containing all the data received. pub fn build() -> impl In> { Self { data: Vec::new() } } @@ -463,7 +492,8 @@ where T: Send + 'static + Clone, { /// Creates a new ordered splitter node. - /// The node will terminate when the upstream terminates. + /// # Arguments + /// * `chunk_size` - Number of elements for each chunk. pub fn build(chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, @@ -473,7 +503,9 @@ where } /// Creates a new ordered splitter node with 'n_replicas' replicas of the same node. - /// The node will terminate when the upstream terminates. + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `chunk_size` - Number of elements for each chunk. pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, @@ -529,7 +561,9 @@ where T: Send + 'static + Clone, { /// Creates a new ordered aggregator node - /// The node will terminate when the upstream terminates. + /// + /// # Arguments + /// * `chunk_size` - Number of elements for each chunk. pub fn build(chunk_size: usize) -> impl InOut> { Self { chunk_size, @@ -539,7 +573,9 @@ where } /// Creates a new ordered aggregator nod with 'n_replicas' replicas of the same node. - /// The node will terminate when the upstream terminates. + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `chunk_size` - Number of elements for each chunk. pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut> { Self { chunk_size, @@ -595,7 +631,9 @@ where F: FnMut(T) -> U + Send + 'static + Clone, { /// Creates a new sequential node. - /// The node will terminate when the upstream terminates. + /// # Arguments + /// * `f` - Function name or lambda function that specify the logic + /// of this node. 
pub fn build(f: F) -> impl InOut { Self { f, @@ -635,7 +673,10 @@ where F: FnMut(T) -> U + Send + 'static + Clone, { /// Creates a new parallel node. - /// The node will terminate when the upstream terminates. + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `f` - Function name or lambda function that specify the logic + /// of this node. pub fn build(n_replicas: usize, f: F) -> impl InOut { Self { f, @@ -677,7 +718,9 @@ where F: FnMut(&T) -> bool + Send + 'static + Clone, { /// Creates a new filter node. - /// The node will terminate when the upstream terminates. + /// # Arguments + /// * `f` - Function name or lambda function that represent the predicate + /// function we want to apply. pub fn build(f: F) -> impl InOut { Self { f, @@ -686,7 +729,10 @@ where } } /// Creates a new filter node. - /// The node will terminate when the upstream terminates. + /// # Arguments + /// * `n_replicas` - Number of replicas. + /// * `f` - Function name or lambda function that represent the predicate + /// function we want to apply. pub fn build_with_replicas(n_replicas: usize, f: F) -> impl InOut { Self { f,