Skip to content

Commit

Permalink
Add test for misc templates
Browse files Browse the repository at this point in the history
  • Loading branch information
valebes committed Nov 3, 2023
1 parent fd3a4d1 commit 60a1659
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 42 deletions.
48 changes: 22 additions & 26 deletions src/templates/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,16 +535,16 @@ where
/// * `n_worker` - Number of worker threads.
/// * `f_map` - Function to apply to each element of the input.
/// * `f_reduce` - Function to apply to the output of the Map.
///
///
/// # Examples
///
///
/// Given a vector of vectors, each one containing a set of numbers,
/// compute for each vector the square value of each of its elements.
/// Furthermore, compute for each vector the summation of all its elements.
///
///
/// ```
/// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec}, templates::map::MapReduce};
///
///
/// let mut counter = 1.0;
/// let mut set = Vec::new();
///
Expand All @@ -560,7 +560,7 @@ where
/// // Instantiate a new pipeline.
/// let pipe = pipeline![
/// SourceIter::build(set.into_iter()),
/// MapReduce::build(8,
/// MapReduce::build(8,
/// |el: (usize, f64)| -> (usize, f64) {
/// (el.0, el.1 * el.1)
/// },
Expand Down Expand Up @@ -714,16 +714,16 @@ where
/// The replicas are created by cloning the OrderedMapReduce node.
/// This mean that 4 replicas of an OrderedMapReduce node with 2 workers each
/// will result in the usage of 8 threads.
///
///
/// # Examples
///
///
/// Given a vector of vectors, each one containing a set of numbers,
/// compute for each vector the square value of each of its elements.
/// Furthermore, compute for each vector the summation of all its elements.
/// In this example we want mantain the order of the input in the output.
/// ```
/// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedMapReduce};
///
///
/// let mut counter = 1.0;
/// let mut set = Vec::new();
///
Expand All @@ -739,7 +739,7 @@ where
/// // Instantiate a new pipeline.
/// let pipe = pipeline![
/// SourceIter::build(set.into_iter()),
/// OrderedMapReduce::build_with_replicas(2, 8,
/// OrderedMapReduce::build_with_replicas(2, 8,
/// |el: (usize, f64)| -> (usize, f64) {
/// (el.0, el.1 * el.1)
/// },
Expand All @@ -750,7 +750,7 @@ where
/// ];
///
/// let res: Vec<Vec<(usize, f64)>> = pipe.start_and_wait_end().unwrap();
///
///
/// // We check here also if the order of the input was preserved
/// // in the output.
/// for (check, vec) in res.into_iter().enumerate() {
Expand Down Expand Up @@ -815,8 +815,8 @@ mod test {
use crate::{
prelude::*,
templates::{
map::{OrderedReduce, OrderedMapReduce},
map::MapReduce,
map::{OrderedMapReduce, OrderedReduce},
misc::{OrderedSinkVec, SinkVec, SourceIter},
},
};
Expand Down Expand Up @@ -1039,7 +1039,6 @@ mod test {
}
}


#[test]
#[serial]
fn summation_of_squares() {
Expand All @@ -1058,13 +1057,11 @@ mod test {

let pipe = pipeline![
SourceIter::build(set.into_iter()),
MapReduce::build(8,
|el: (usize, f64)| -> (usize, f64) {
(el.0, el.1 * el.1)
},
|i, vec| {
(i, vec.iter().sum())
}),
MapReduce::build(
8,
|el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) },
|i, vec| { (i, vec.iter().sum()) }
),
SinkVec::build()
];

Expand Down Expand Up @@ -1102,13 +1099,12 @@ mod test {

let pipe = pipeline![
SourceIter::build(set.into_iter()),
OrderedMapReduce::build_with_replicas(2, 4,
|el: (usize, f64)| -> (usize, f64) {
(el.0, el.1 * el.1)
},
|i, vec| {
(i, vec.iter().sum())
}),
OrderedMapReduce::build_with_replicas(
2,
4,
|el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) },
|i, vec| { (i, vec.iter().sum()) }
),
OrderedSinkVec::build()
];

Expand Down
130 changes: 114 additions & 16 deletions src/templates/misc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{marker::PhantomData, collections::VecDeque};
use std::{collections::VecDeque, marker::PhantomData};

use crate::pipeline::node::{In, InOut, Out};

Expand All @@ -17,8 +17,24 @@ where
I: Iterator<Item = T>,
T: Send + 'static,
{
/// Creates a new source from a iterator.
/// Creates a new source from any type that implements the `Iterator` trait.
/// The source will terminate when the iterator is exhausted.
///
/// # Examples
///
/// In this example we create a source node using a [`SourceIter`]
/// template that emits numbers from 1 to 21.
///
/// ```
/// use ppl::{prelude::*, templates::misc::{SourceIter, Sequential, SinkVec}};
///
/// let p = pipeline![
/// SourceIter::build(1..21),
/// Sequential::build(|el| { el }),
/// SinkVec::build()
/// ];
/// let res = p.start_and_wait_end().unwrap();
/// ```
pub fn build(iterator: I) -> impl Out<T> {
Self {
iterator,
Expand All @@ -40,7 +56,7 @@ where
///
/// Sink node that accumulates data into a vector.
/// The sink will terminate when the upstream terminates.
/// The sink will produce a vector containing all the data received
/// The sink will produce as result a vector containing all the data received
/// from the upstream.
pub struct SinkVec<T> {
data: Vec<T>,
Expand All @@ -52,6 +68,26 @@ where
/// Creates a new sink that accumulates data into a vector.
/// The sink will terminate when the upstream terminates.
/// The sink will produce a vector containing all the data received.
///
/// # Examples
///
/// In this example we send element by element the content of a vector
/// through a pipeline.
/// Using the [`SinkVec`] template, we create a sink node that collects
/// the data received.
///
/// ```
/// use ppl::{prelude::*, templates::misc::{SourceIter, Sequential, SinkVec}};
///
/// let data = vec![1, 2, 3, 4, 5];
/// let p = pipeline![
/// SourceIter::build(data.into_iter()),
/// Sequential::build(|el| { el }),
/// SinkVec::build()
/// ];
/// let res = p.start_and_wait_end().unwrap();
/// assert_eq!(res, vec![1, 2, 3, 4, 5])
/// ```
pub fn build() -> impl In<T, Vec<T>> {
Self { data: Vec::new() }
}
Expand Down Expand Up @@ -85,6 +121,24 @@ where
{
/// Creates a new splitter node.
/// The node will terminate when the upstream terminates.
///
/// # Examples
/// Given a stream of numbers, we create a pipeline with a splitter that
/// create vectors of two elements each.
///
/// ```
/// use ppl::{prelude::*, templates::misc::{SourceIter, Splitter, SinkVec, Aggregator}};
///
/// let vec = vec![1, 2, 3, 4, 5, 6, 7, 8];
/// let p = pipeline![
/// SourceIter::build(vec.into_iter()),
/// Aggregator::build(8), // We aggregate each element in a vector.
/// Splitter::build(2), // We split the received vector in 4 sub-vector of size 2.
/// SinkVec::build()
/// ];
/// let mut res = p.start_and_wait_end().unwrap();
/// assert_eq!(res.len(), 4)
/// ```
pub fn build(chunk_size: usize) -> impl InOut<Vec<T>, Vec<T>> {
Self {
chunk_size,
Expand Down Expand Up @@ -147,6 +201,22 @@ where
{
/// Creates a new aggregator node
/// The node will terminate when the upstream terminates.
///
/// # Examples
/// Given a stream of numbers, we use an [`Aggregator`] template to
/// group the elements of this stream in vectors of size 100.
///
/// ```
/// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec, Aggregator}};
///
/// let p = pipeline![
/// SourceIter::build(0..2000),
/// Aggregator::build(100),
/// SinkVec::build()
/// ];
/// let res = p.start_and_wait_end().unwrap();
/// assert_eq!(res.len(), 20);
/// ```
pub fn build(chunk_size: usize) -> impl InOut<T, Vec<T>> {
Self {
chunk_size,
Expand Down Expand Up @@ -296,6 +366,22 @@ where
{
/// Creates a new filter node.
/// The node will terminate when the upstream terminates.
///
/// # Examples
/// Given a set of numbers from 0 to 199, we use a [`Filter`]
/// template to filter the even numbers.
/// ```
/// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec, Filter}};
///
/// let p = pipeline![
/// SourceIter::build(0..200),
/// Filter::build(|el| { el % 2 == 0 }),
/// SinkVec::build()
/// ];
///
/// let res = p.start_and_wait_end().unwrap();
/// assert_eq!(res.len(), 100)
/// ```
pub fn build(f: F) -> impl InOut<T, T> {
Self {
f,
Expand Down Expand Up @@ -370,7 +456,6 @@ where
}
}


/// OrderedSplitter.
///
/// This node receives a vector, split it into chunks of size `chunk_size`
Expand Down Expand Up @@ -647,20 +732,25 @@ where

#[cfg(test)]
mod test {
use serial_test::serial;
use crate::{
prelude::*,
templates::misc::{SinkVec, SourceIter, Sequential, OrderedSinkVec, OrderedSequential, Parallel, OrderedParallel, OrderedSplitter, OrderedAggregator, Filter, OrderedFilter},
templates::misc::{
Filter, OrderedAggregator, OrderedFilter, OrderedParallel, OrderedSequential,
OrderedSinkVec, OrderedSplitter, Parallel, Sequential, SinkVec, SourceIter,
},
};
use serial_test::serial;

use super::{Splitter, Aggregator};
use super::{Aggregator, Splitter};

#[test]
#[serial]
fn simple_pipeline() {
let p = pipeline![
SourceIter::build(1..21),
Sequential::build(|el| { el /*println!("Hello, received: {}", el); */ }),
Sequential::build(|el| {
el /*println!("Hello, received: {}", el); */
}),
SinkVec::build()
];

Expand All @@ -674,7 +764,9 @@ mod test {
fn simple_pipeline_ordered() {
let p = pipeline![
SourceIter::build(1..21),
OrderedSequential::build(|el| { el /*println!("Hello, received: {}", el); */ }),
OrderedSequential::build(|el| {
el /*println!("Hello, received: {}", el); */
}),
OrderedSinkVec::build()
];

Expand All @@ -685,15 +777,18 @@ mod test {
let mut counter = 1;
for el in res {
assert_eq!(el, counter);
counter += 1; }
counter += 1;
}
}

#[test]
#[serial]
fn simple_farm() {
let p = pipeline![
SourceIter::build(1..21),
Parallel::build(8, |el| { el /*println!("Hello, received: {}", el); */ }),
Parallel::build(8, |el| {
el /*println!("Hello, received: {}", el); */
}),
SinkVec::build()
];

Expand All @@ -707,7 +802,9 @@ mod test {
fn simple_farm_ordered() {
let p = pipeline![
SourceIter::build(1..21),
OrderedParallel::build(8, |el| { el /*println!("Hello, received: {}", el); */ }),
OrderedParallel::build(8, |el| {
el /*println!("Hello, received: {}", el); */
}),
OrderedSinkVec::build()
];

Expand All @@ -718,7 +815,8 @@ mod test {
let mut counter = 1;
for el in res {
assert_eq!(el, counter);
counter += 1; }
counter += 1;
}
}

#[test]
Expand Down Expand Up @@ -749,7 +847,7 @@ mod test {
assert_eq!(res.len(), 1);

let vec = res.pop().unwrap();

assert_eq!(vec.len(), 20000)
}

Expand Down Expand Up @@ -781,7 +879,7 @@ mod test {
assert_eq!(res.len(), 1);

let vec = res.pop().unwrap();

assert_eq!(vec.len(), 20000);

counter = 1;
Expand Down Expand Up @@ -861,4 +959,4 @@ mod test {
counter += 2;
}
}
}
}

0 comments on commit 60a1659

Please sign in to comment.