From 60a1659b5d915c74eaa47a7546833618e0e0b2ac Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Fri, 3 Nov 2023 12:45:53 +0100 Subject: [PATCH] Add test for misc templates --- src/templates/map.rs | 48 +++++++--------- src/templates/misc.rs | 130 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 136 insertions(+), 42 deletions(-) diff --git a/src/templates/map.rs b/src/templates/map.rs index 61a017f2..e261aa50 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -535,16 +535,16 @@ where /// * `n_worker` - Number of worker threads. /// * `f_map` - Function to apply to each element of the input. /// * `f_reduce` - Function to apply to the output of the Map. - /// + /// /// # Examples - /// + /// /// Given a vector of vectors, each one containing a set of numbers, /// compute for each vector the square value of each of its elements. /// Furthermore, compute for each vector the summation of all its elements. - /// + /// /// ``` /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::MapReduce}; - /// + /// /// let mut counter = 1.0; /// let mut set = Vec::new(); /// @@ -560,7 +560,7 @@ where /// // Instantiate a new pipeline. /// let pipe = pipeline![ /// SourceIter::build(set.into_iter()), - /// MapReduce::build(8, + /// MapReduce::build(8, /// |el: (usize, f64)| -> (usize, f64) { /// (el.0, el.1 * el.1) /// }, @@ -714,16 +714,16 @@ where /// The replicas are created by cloning the OrderedMapReduce node. /// This mean that 4 replicas of an OrderedMapReduce node with 2 workers each /// will result in the usage of 8 threads. - /// + /// /// # Examples - /// + /// /// Given a vector of vectors, each one containing a set of numbers, /// compute for each vector the square value of each of its elements. /// Furthermore, compute for each vector the summation of all its elements. /// In this example we want mantain the order of the input in the output. /// ``` /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedMapReduce}; - /// + /// /// let mut counter = 1.0; /// let mut set = Vec::new(); /// @@ -739,7 +739,7 @@ where /// // Instantiate a new pipeline. /// let pipe = pipeline![ /// SourceIter::build(set.into_iter()), - /// OrderedMapReduce::build_with_replicas(2, 8, + /// OrderedMapReduce::build_with_replicas(2, 8, /// |el: (usize, f64)| -> (usize, f64) { /// (el.0, el.1 * el.1) /// }, @@ -750,7 +750,7 @@ where /// ]; /// /// let res: Vec> = pipe.start_and_wait_end().unwrap(); - /// + /// /// // We check here also if the order of the input was preserved /// // in the output. /// for (check, vec) in res.into_iter().enumerate() { @@ -815,8 +815,8 @@ mod test { use crate::{ prelude::*, templates::{ - map::{OrderedReduce, OrderedMapReduce}, map::MapReduce, + map::{OrderedMapReduce, OrderedReduce}, misc::{OrderedSinkVec, SinkVec, SourceIter}, }, }; @@ -1039,7 +1039,6 @@ mod test { } } - #[test] #[serial] fn summation_of_squares() { @@ -1058,13 +1057,11 @@ mod test { let pipe = pipeline![ SourceIter::build(set.into_iter()), - MapReduce::build(8, - |el: (usize, f64)| -> (usize, f64) { - (el.0, el.1 * el.1) - }, - |i, vec| { - (i, vec.iter().sum()) - }), + MapReduce::build( + 8, + |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) }, + |i, vec| { (i, vec.iter().sum()) } + ), SinkVec::build() ]; @@ -1102,13 +1099,12 @@ mod test { let pipe = pipeline![ SourceIter::build(set.into_iter()), - OrderedMapReduce::build_with_replicas(2, 4, - |el: (usize, f64)| -> (usize, f64) { - (el.0, el.1 * el.1) - }, - |i, vec| { - (i, vec.iter().sum()) - }), + OrderedMapReduce::build_with_replicas( + 2, + 4, + |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) }, + |i, vec| { (i, vec.iter().sum()) } + ), OrderedSinkVec::build() ]; diff --git a/src/templates/misc.rs b/src/templates/misc.rs index 366f4d84..f4127cec 100644 --- a/src/templates/misc.rs +++ b/src/templates/misc.rs @@ -1,4 +1,4 @@ -use std::{marker::PhantomData, collections::VecDeque}; +use std::{collections::VecDeque, marker::PhantomData}; use crate::pipeline::node::{In, InOut, Out}; @@ -17,8 +17,24 @@ where I: Iterator, T: Send + 'static, { - /// Creates a new source from a iterator. + /// Creates a new source from any type that implements the `Iterator` trait. /// The source will terminate when the iterator is exhausted. + /// + /// # Examples + /// + /// In this example we create a source node using a [`SourceIter`] + /// template that emits numbers from 1 to 21. + /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, Sequential, SinkVec}}; + /// + /// let p = pipeline![ + /// SourceIter::build(1..21), + /// Sequential::build(|el| { el }), + /// SinkVec::build() + /// ]; + /// let res = p.start_and_wait_end().unwrap(); + /// ``` pub fn build(iterator: I) -> impl Out { Self { iterator, @@ -40,7 +56,7 @@ where /// /// Sink node that accumulates data into a vector. /// The sink will terminate when the upstream terminates. -/// The sink will produce a vector containing all the data received +/// The sink will produce as result a vector containing all the data received /// from the upstream. pub struct SinkVec { data: Vec, @@ -52,6 +68,26 @@ where /// Creates a new sink that accumulates data into a vector. /// The sink will terminate when the upstream terminates. /// The sink will produce a vector containing all the data received. + /// + /// # Examples + /// + /// In this example we send element by element the content of a vector + /// through a pipeline. + /// Using the [`SinkVec`] template, we create a sink node that collects + /// the data received. + /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, Sequential, SinkVec}}; + /// + /// let data = vec![1, 2, 3, 4, 5]; + /// let p = pipeline![ + /// SourceIter::build(data.into_iter()), + /// Sequential::build(|el| { el }), + /// SinkVec::build() + /// ]; + /// let res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res, vec![1, 2, 3, 4, 5]) + /// ``` pub fn build() -> impl In> { Self { data: Vec::new() } } @@ -85,6 +121,24 @@ where { /// Creates a new splitter node. /// The node will terminate when the upstream terminates. + /// + /// # Examples + /// Given a stream of numbers, we create a pipeline with a splitter that + /// create vectors of two elements each. + /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, Splitter, SinkVec, Aggregator}}; + /// + /// let vec = vec![1, 2, 3, 4, 5, 6, 7, 8]; + /// let p = pipeline![ + /// SourceIter::build(vec.into_iter()), + /// Aggregator::build(8), // We aggregate each element in a vector. + /// Splitter::build(2), // We split the received vector in 4 sub-vector of size 2. + /// SinkVec::build() + /// ]; + /// let mut res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res.len(), 4) + /// ``` pub fn build(chunk_size: usize) -> impl InOut, Vec> { Self { chunk_size, @@ -147,6 +201,22 @@ where { /// Creates a new aggregator node /// The node will terminate when the upstream terminates. + /// + /// # Examples + /// Given a stream of numbers, we use an [`Aggregator`] template to + /// group the elements of this stream in vectors of size 100. + /// + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec, Aggregator}}; + /// + /// let p = pipeline![ + /// SourceIter::build(0..2000), + /// Aggregator::build(100), + /// SinkVec::build() + /// ]; + /// let res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res.len(), 20); + /// ``` pub fn build(chunk_size: usize) -> impl InOut> { Self { chunk_size, @@ -296,6 +366,22 @@ where { /// Creates a new filter node. /// The node will terminate when the upstream terminates. + /// + /// # Examples + /// Given a set of numbers from 0 to 199, we use a [`Filter`] + /// template to filter the even numbers. + /// ``` + /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec, Filter}}; + /// + /// let p = pipeline![ + /// SourceIter::build(0..200), + /// Filter::build(|el| { el % 2 == 0 }), + /// SinkVec::build() + /// ]; + /// + /// let res = p.start_and_wait_end().unwrap(); + /// assert_eq!(res.len(), 100) + /// ``` pub fn build(f: F) -> impl InOut { Self { f, @@ -370,7 +456,6 @@ where } } - /// OrderedSplitter. /// /// This node receives a vector, split it into chunks of size `chunk_size` @@ -647,20 +732,25 @@ where #[cfg(test)] mod test { - use serial_test::serial; use crate::{ prelude::*, - templates::misc::{SinkVec, SourceIter, Sequential, OrderedSinkVec, OrderedSequential, Parallel, OrderedParallel, OrderedSplitter, OrderedAggregator, Filter, OrderedFilter}, + templates::misc::{ + Filter, OrderedAggregator, OrderedFilter, OrderedParallel, OrderedSequential, + OrderedSinkVec, OrderedSplitter, Parallel, Sequential, SinkVec, SourceIter, + }, }; + use serial_test::serial; - use super::{Splitter, Aggregator}; + use super::{Aggregator, Splitter}; #[test] #[serial] fn simple_pipeline() { let p = pipeline![ SourceIter::build(1..21), - Sequential::build(|el| { el /*println!("Hello, received: {}", el); */ }), + Sequential::build(|el| { + el /*println!("Hello, received: {}", el); */ + }), SinkVec::build() ]; @@ -674,7 +764,9 @@ mod test { fn simple_pipeline_ordered() { let p = pipeline![ SourceIter::build(1..21), - OrderedSequential::build(|el| { el /*println!("Hello, received: {}", el); */ }), + OrderedSequential::build(|el| { + el /*println!("Hello, received: {}", el); */ + }), OrderedSinkVec::build() ]; @@ -685,7 +777,8 @@ mod test { let mut counter = 1; for el in res { assert_eq!(el, counter); - counter += 1; } + counter += 1; + } } #[test] @@ -693,7 +786,9 @@ mod test { fn simple_farm() { let p = pipeline![ SourceIter::build(1..21), - Parallel::build(8, |el| { el /*println!("Hello, received: {}", el); */ }), + Parallel::build(8, |el| { + el /*println!("Hello, received: {}", el); */ + }), SinkVec::build() ]; @@ -707,7 +802,9 @@ mod test { fn simple_farm_ordered() { let p = pipeline![ SourceIter::build(1..21), - OrderedParallel::build(8, |el| { el /*println!("Hello, received: {}", el); */ }), + OrderedParallel::build(8, |el| { + el /*println!("Hello, received: {}", el); */ + }), OrderedSinkVec::build() ]; @@ -718,7 +815,8 @@ mod test { let mut counter = 1; for el in res { assert_eq!(el, counter); - counter += 1; } + counter += 1; + } } #[test] @@ -749,7 +847,7 @@ mod test { assert_eq!(res.len(), 1); let vec = res.pop().unwrap(); - + assert_eq!(vec.len(), 20000) } @@ -781,7 +879,7 @@ mod test { assert_eq!(res.len(), 1); let vec = res.pop().unwrap(); - + assert_eq!(vec.len(), 20000); counter = 1; @@ -861,4 +959,4 @@ mod test { counter += 2; } } -} \ No newline at end of file +}