diff --git a/README.md b/README.md
index 3e897133..b8c8b99b 100644
--- a/README.md
+++ b/README.md
@@ -275,8 +275,8 @@ The pipeline approach in PPL provides an intuitive and flexible way to express c
 In the word counter example, the pipeline involves the following stages:
 
 - **Source**: Reads the dataset and emits lines of text.
-- **Map**: Converts each line of text into a list of words, where each word is paired with a count of 1.
-- **Reduce**: Aggregates the counts of words by summing them for each unique word.
+- **MapReduce**: The map function converts each line of text into a list of words, where each word is paired with a count of 1.
+The reduce function then aggregates the counts by summing them for each unique word.
 - **Sink**: Stores the final word counts in a hashmap.
 
 By breaking down the computation into stages and leveraging parallelism, PPL's pipeline approach allows for efficient distribution of work across multiple threads or cores, leading to faster execution.
@@ -285,7 +285,7 @@ Here's the Rust code for the word counter using the pipeline approach:
 
 ```rust
 use ppl::{
-    templates::map::{Map, Reduce},
+    templates::map::MapReduce,
     prelude::*,
 };
@@ -317,21 +317,17 @@ impl In<Vec<(String, usize)>, Vec<(String, usize)>> for Sink {
 pub fn ppl(dataset: &str, threads: usize) {
     // Initialization and configuration...
 
-    let mut p = pipeline![
+    let mut p = pipeline![
         Source { reader },
-        Map::build::<Vec<String>, Vec<(String, usize)>>(threads / 2, |str| -> (String, usize) {
-            (str, 1)
-        }),
-        Reduce::build(threads / 2, |str, count| {
-            let mut sum = 0;
-            for c in count {
-                sum += c;
-            }
-            (str, sum)
-        }),
+        MapReduce::build_with_replicas(
+            threads / 2,
+            |str| -> (String, usize) { (str, 1) }, // Map function
+            |a, b| a + b,
+            2
+        ),
         Sink {
-            counter: HashMap::new(),
-        },
+            counter: HashMap::new()
+        }
     ];
 
     p.start();
@@ -379,13 +375,7 @@ pub fn ppl_map(dataset: &str, threads: usize) {
             })
             .collect::<Vec<String>>(),
         |str| -> (String, usize) { (str, 1) },
-        |str, count| {
-            let mut sum = 0;
-            for c in count {
-                sum += c;
-            }
-            (str, sum)
-        },
+        |a, b| a + b,
     );
 }
 ```
diff --git a/examples/wordcount/main.rs b/examples/wordcount/main.rs
index fd14f5fa..23f5d650 100644
--- a/examples/wordcount/main.rs
+++ b/examples/wordcount/main.rs
@@ -35,7 +35,6 @@ fn main() {
         //"rayon" => rayon::rayon(dir_name, threads),
         //"std-threads" => std_threads::std_threads(dir_name, threads),
         "ppl" => {
-            timeit(|| ppl::ppl(dataset, threads));
             timeit(|| ppl::ppl_combined_map_reduce(dataset, threads));
             timeit(|| ppl::ppl_map(dataset, threads));
         }
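Functionally, the combined **MapReduce** stage produces the same counts as the old Map plus Reduce pair. A std-only sketch of that equivalence, independent of the `ppl` API (the input lines here are invented for illustration):

```rust
use std::collections::HashMap;

fn main() {
    // Stand-in for the lines emitted by the Source stage.
    let lines = vec!["a b a".to_string(), "b a".to_string()];

    // Map: pair every word with a count of 1, as in the MapReduce stage above.
    let pairs = lines
        .iter()
        .flat_map(|line| line.split_whitespace())
        .map(|word| (word.to_string(), 1usize));

    // Reduce: sum the counts per unique word with the same binary
    // combiner used in the diff, |a, b| a + b.
    let mut counts: HashMap<String, usize> = HashMap::new();
    for (word, n) in pairs {
        *counts.entry(word).or_insert(0) += n;
    }

    assert_eq!(counts["a"], 3);
    assert_eq!(counts["b"], 2);
}
```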
println!("[PIPELINE] Total words: {}", total_words); -} - // Version that use a node that combine map and reduce pub fn ppl_combined_map_reduce(dataset: &str, threads: usize) { let file = File::open(dataset).expect("no such file"); @@ -100,10 +66,7 @@ pub fn ppl_combined_map_reduce(dataset: &str, threads: usize) { MapReduce::build_with_replicas( threads / 2, |str| -> (String, usize) { (str, 1) }, // Map function - |a, b| { - // Reduce - a + b - }, + |a, b| a + b, 2 ), Sink { diff --git a/src/templates/map.rs b/src/templates/map.rs index 8fc04675..100fc916 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -237,26 +237,23 @@ where /// Reduce /// -/// In this Reduce, the elements are grouped by key and then reduced. +/// Takes in input a type that implements the [`Iterator`] trait +/// and applies a reduce function. #[derive(Clone)] -pub struct Reduce +pub struct Reduce where TIn: Send, - TKey: Send, - TReduce: Send, - F: FnOnce(TKey, Vec) -> (TKey, TReduce) + Send + Copy, + F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync, { threadpool: ThreadPool, replicas: usize, f: F, - phantom: PhantomData<(TIn, TKey, TReduce)>, + phantom: PhantomData, } -impl Reduce +impl Reduce where TIn: Send + Clone + 'static, - TKey: Send + Clone + 'static + Ord, - TReduce: Send + Clone + 'static, - F: FnOnce(TKey, Vec) -> (TKey, TReduce) + Send + Copy, + F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync, { /// Create a new Reduce node. /// # Arguments @@ -267,8 +264,6 @@ where /// /// Given a collection of vectors of integers, for each vector /// compute the summation of its elements. - /// As this reduce function works by grouping by key, - /// the input of the reduce function must be a vector of tuple (key, value). /// /// ``` /// @@ -277,37 +272,34 @@ where /// // Create the vector of the elements that will be emitted by the Source node. /// // vec![(key,value)] /// let vector = vec![ - /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], - /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], - /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], + /// vec![1; 10], + /// vec![1; 10], + /// vec![1; 10], /// ]; /// /// // Instantiate a new pipeline. /// let pipe = pipeline![ /// SourceIter::build(vector.into_iter()), - /// Reduce::build(4, |i, vec| -> (i32, i32) { - /// (i, vec.iter().sum()) + /// Reduce::build(4, |a, b| -> i32 { + /// a + b /// }), /// SinkVec::build() /// ]; /// /// // Start the pipeline and wait for the results. - /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// let res: Vec = pipe.start_and_wait_end().unwrap(); /// /// // Collect a results for each vector emitted by the Source. In our case we had 3 vectors. /// assert_eq!(res.len(), 3); /// /// // As for each vector emitted we had only one key, we obtain only one result tuple /// // for vector. - /// for vec in res { - /// for el in vec { - /// assert_eq!(el.1, 55); - /// } + /// for el in res { + /// assert_eq!(el, 10); /// } - pub fn build(n_worker: usize, f: F) -> impl InOut + pub fn build(n_worker: usize, f: F) -> impl InOut where - TInIter: IntoIterator, - TOutIter: FromIterator<(TKey, TReduce)>, + TInIter: IntoIterator, { Self { threadpool: ThreadPool::with_capacity(n_worker), @@ -328,14 +320,13 @@ where /// The replicas are created by cloning the Reduce node. /// This mean that 4 replicas of a Reduce node with 2 workers each /// will result in the usage of 8 threads. 
diff --git a/src/templates/map.rs b/src/templates/map.rs
index 8fc04675..100fc916 100644
--- a/src/templates/map.rs
+++ b/src/templates/map.rs
@@ -237,26 +237,23 @@ where
 
 /// Reduce
 ///
-/// In this Reduce, the elements are grouped by key and then reduced.
+/// Takes as input a type that implements the [`Iterator`] trait
+/// and applies a reduce function.
 #[derive(Clone)]
-pub struct Reduce<TIn, TKey, TReduce, F>
+pub struct Reduce<TIn, F>
 where
     TIn: Send,
-    TKey: Send,
-    TReduce: Send,
-    F: FnOnce(TKey, Vec<TIn>) -> (TKey, TReduce) + Send + Copy,
+    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
 {
     threadpool: ThreadPool,
     replicas: usize,
     f: F,
-    phantom: PhantomData<(TIn, TKey, TReduce)>,
+    phantom: PhantomData<TIn>,
 }
 
-impl<TIn, TKey, TReduce, F> Reduce<TIn, TKey, TReduce, F>
+impl<TIn, F> Reduce<TIn, F>
 where
     TIn: Send + Clone + 'static,
-    TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
-    F: FnOnce(TKey, Vec<TIn>) -> (TKey, TReduce) + Send + Copy,
+    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
 {
     /// Create a new Reduce node.
     /// # Arguments
     /// * `n_worker` - Number of worker threads.
     /// * `f` - Function to apply to each element of the input.
@@ -267,8 +264,6 @@ where
     ///
     /// Given a collection of vectors of integers, for each vector
     /// compute the summation of its elements.
-    /// As this reduce function works by grouping by key,
-    /// the input of the reduce function must be a vector of tuple (key, value).
     ///
     /// ```
     ///
@@ -277,37 +272,34 @@ where
     /// // Create the vector of the elements that will be emitted by the Source node.
-    /// // vec![(key,value)]
     /// let vector = vec![
-    ///     vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)],
-    ///     vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)],
-    ///     vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)],
+    ///     vec![1; 10],
+    ///     vec![1; 10],
+    ///     vec![1; 10],
     /// ];
     ///
     /// // Instantiate a new pipeline.
     /// let pipe = pipeline![
     ///     SourceIter::build(vector.into_iter()),
-    ///     Reduce::build(4, |i, vec| -> (i32, i32) {
-    ///         (i, vec.iter().sum())
+    ///     Reduce::build(4, |a, b| -> i32 {
+    ///         a + b
     ///     }),
     ///     SinkVec::build()
     /// ];
     ///
     /// // Start the pipeline and wait for the results.
-    /// let res: Vec<Vec<(i32, i32)>> = pipe.start_and_wait_end().unwrap();
+    /// let res: Vec<i32> = pipe.start_and_wait_end().unwrap();
     ///
     /// // Collect a result for each vector emitted by the Source. In our case we had 3 vectors.
     /// assert_eq!(res.len(), 3);
     ///
-    /// // As for each vector emitted we had only one key, we obtain only one result tuple
-    /// // for vector.
-    /// for vec in res {
-    ///     for el in vec {
-    ///         assert_eq!(el.1, 55);
-    ///     }
+    /// // Each emitted vector is reduced to a single value.
+    /// for el in res {
+    ///     assert_eq!(el, 10);
     /// }
-    pub fn build<TInIter, TOutIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TOutIter>
+    pub fn build<TInIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TIn>
     where
-        TInIter: IntoIterator<Item = (TKey, TIn)>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TInIter: IntoIterator<Item = TIn>,
     {
         Self {
             threadpool: ThreadPool::with_capacity(n_worker),
@@ -328,14 +320,13 @@ where
     /// The replicas are created by cloning the Reduce node.
     /// This means that 4 replicas of a Reduce node with 2 workers each
     /// will result in the usage of 8 threads.
-    pub fn build_with_replicas<TInIter, TOutIter>(
+    pub fn build_with_replicas<TInIter>(
         n_worker: usize,
         n_replicas: usize,
         f: F,
-    ) -> impl InOut<TInIter, TOutIter>
+    ) -> impl InOut<TInIter, TIn>
     where
-        TInIter: IntoIterator<Item = (TKey, TIn)>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TInIter: IntoIterator<Item = TIn>,
     {
         assert!(n_replicas > 0);
         Self {
@@ -346,18 +337,14 @@ where
         }
     }
 }
-impl<TIn, TInIter, TKey, TReduce, TOutIter, F> InOut<TInIter, TOutIter>
-    for Reduce<TIn, TKey, TReduce, F>
+impl<TIn, TInIter, F> InOut<TInIter, TIn> for Reduce<TIn, F>
 where
     TIn: Send + Clone + 'static,
-    TInIter: IntoIterator<Item = (TKey, TIn)>,
-    TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
-    TOutIter: FromIterator<(TKey, TReduce)>,
-    F: FnOnce(TKey, Vec<TIn>) -> (TKey, TReduce) + Send + Copy,
+    TInIter: IntoIterator<Item = TIn>,
+    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
 {
-    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
-        let res: TOutIter = self.threadpool.par_reduce_by_key(input, self.f).collect();
+    fn run(&mut self, input: TInIter) -> Option<TIn> {
+        let res: TIn = self.threadpool.par_reduce(input, self.f);
         Some(res)
     }
     fn number_of_replicas(&self) -> usize {
@@ -370,33 +357,28 @@ where
 /// In this Reduce, the elements are processed in the same order as they are received.
 /// The order of the output is the same as the order of the input.
 #[derive(Clone)]
-pub struct OrderedReduce<TIn, TKey, TReduce, F>
+pub struct OrderedReduce<TIn, F>
 where
     TIn: Send,
-    TKey: Send,
-    TReduce: Send,
-    F: FnOnce(TKey, Vec<TIn>) -> (TKey, TReduce) + Send + Copy,
+    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
 {
     threadpool: ThreadPool,
     replicas: usize,
     f: F,
-    phantom: PhantomData<(TIn, TKey, TReduce)>,
+    phantom: PhantomData<TIn>,
 }
 
-impl<TIn, TKey, TReduce, F> OrderedReduce<TIn, TKey, TReduce, F>
+impl<TIn, F> OrderedReduce<TIn, F>
 where
     TIn: Send + Clone + 'static,
-    TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
-    F: FnOnce(TKey, Vec<TIn>) -> (TKey, TReduce) + Send + Copy,
+    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
 {
     /// Create a new OrderedReduce node.
     /// # Arguments
     /// * `n_worker` - Number of worker threads.
     /// * `f` - Function to apply to each element of the input.
-    pub fn build<TInIter, TOutIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TOutIter>
+    pub fn build<TInIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TIn>
     where
-        TInIter: IntoIterator<Item = (TKey, TIn)>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TInIter: IntoIterator<Item = TIn>,
     {
         Self {
             threadpool: ThreadPool::with_capacity(n_worker),
@@ -422,9 +404,7 @@ where
     ///
     /// Given a collection of vectors of integers, for each vector
    /// compute the summation of its elements.
-    /// As this reduce function works by grouping by key,
-    /// the input of the reduce function must be a vector of tuple (key, value).
-    /// In this example we want mantain the order of the input in the output.
+    /// In this example we maintain the order of the input in the output.
     ///
     /// ```
     /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedReduce};
     ///
     /// // Create the vector of the elements that will be emitted by the Source node.
-    /// // vec![(key,value)]
     /// let vector = vec![
-    ///     vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)],
-    ///     vec![(1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (1, 9), (1 ,10)],
-    ///     vec![(2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8), (2, 9), (2 ,10)],
+    ///     vec![1; 10],
+    ///     vec![1; 100],
+    ///     vec![1; 1000],
     /// ];
     ///
     /// // Instantiate a new pipeline.
     /// let pipe = pipeline![
     ///     SourceIter::build(vector.into_iter()),
-    ///     OrderedReduce::build_with_replicas(2, 4, |i, vec| -> (usize, i32) {
-    ///         (i, vec.iter().sum())
+    ///     OrderedReduce::build_with_replicas(2, 4, |a, b| -> i32 {
+    ///         a + b
     ///     }),
     ///     OrderedSinkVec::build()
     /// ];
     ///
     /// // Start the pipeline and wait for the results.
-    /// let res: Vec<Vec<(usize, i32)>> = pipe.start_and_wait_end().unwrap();
+    /// let res: Vec<i32> = pipe.start_and_wait_end().unwrap();
     ///
     /// // Collect a result for each vector emitted by the Source. In our case we had 3 vectors.
     /// assert_eq!(res.len(), 3);
-    ///
-    /// // As for each vector emitted we had only one key, we obtain only one result tuple
-    /// // for vector. Moreover, we check here also if the order of the input was preserved
-    /// // in the output.
-    /// for (check, vec) in res.into_iter().enumerate() {
-    ///     for el in vec {
-    ///         assert_eq!(el, (check, 55));
-    ///     }
-    /// }
+    /// assert_eq!(res[0], 10);
+    /// assert_eq!(res[1], 100);
+    /// assert_eq!(res[2], 1000);
-    pub fn build_with_replicas<TInIter, TOutIter>(
+    pub fn build_with_replicas<TInIter>(
         n_worker: usize,
         n_replicas: usize,
         f: F,
-    ) -> impl InOut<TInIter, TOutIter>
+    ) -> impl InOut<TInIter, TIn>
     where
-        TInIter: IntoIterator<Item = (TKey, TIn)>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TInIter: IntoIterator<Item = TIn>,
     {
         assert!(n_replicas > 0);
         Self {
@@ -478,18 +451,14 @@ where
         }
     }
 }
-impl<TIn, TInIter, TKey, TReduce, TOutIter, F> InOut<TInIter, TOutIter>
-    for OrderedReduce<TIn, TKey, TReduce, F>
+impl<TIn, TInIter, F> InOut<TInIter, TIn> for OrderedReduce<TIn, F>
 where
     TIn: Send + Clone + 'static,
-    TInIter: IntoIterator<Item = (TKey, TIn)>,
-    TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
-    TOutIter: FromIterator<(TKey, TReduce)>,
-    F: FnOnce(TKey, Vec<TIn>) -> (TKey, TReduce) + Send + Copy,
+    TInIter: IntoIterator<Item = TIn>,
+    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
 {
-    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
-        let res: TOutIter = self.threadpool.par_reduce_by_key(input, self.f).collect();
+    fn run(&mut self, input: TInIter) -> Option<TIn> {
+        let res: TIn = self.threadpool.par_reduce(input, self.f);
         Some(res)
     }
     fn number_of_replicas(&self) -> usize {
@@ -956,34 +925,28 @@ mod test {
     #[test]
     #[serial]
     fn summation() {
-        let mut counter = 1;
         let mut set = Vec::new();
 
-        for i in 0..1000 {
+        for _i in 0..1000 {
             let mut vector = Vec::new();
             for _i in 0..10 {
-                vector.push((i, counter));
-                counter += 1;
+                vector.push(1);
             }
-            counter = 1;
             set.push(vector);
         }
 
         let pipe = pipeline![
             SourceIter::build(set.into_iter()),
-            Reduce::build(4, |i, vec| -> (i32, i32) { (i, vec.iter().sum()) }),
+            Reduce::build(4, |a, b| -> i32 { a + b }),
             SinkVec::build()
         ];
 
-        let res: Vec<Vec<(i32, i32)>> = pipe.start_and_wait_end().unwrap();
+        let res: Vec<i32> = pipe.start_and_wait_end().unwrap();
 
         assert_eq!(res.len(), 1000);
 
-        for vec in res {
-            assert_eq!(vec.len(), 1);
-            for el in vec {
-                assert_eq!(el.1, 55);
-            }
+        for el in res {
+            assert_eq!(el, 10)
         }
 
         unsafe {
@@ -994,37 +957,19 @@ mod test {
     #[test]
     #[serial]
     fn summation_ordered() {
-        let mut counter = 1;
-        let mut set = Vec::new();
-
-        for i in 0..1000 {
-            let mut vector = Vec::new();
-            for _i in 0..10 {
-                vector.push((i, counter));
-                counter += 1;
-            }
-            counter = 1;
-            set.push(vector);
-        }
-
+        let set = vec![vec![1; 10], vec![1; 100], vec![1; 1000]];
         let pipe = pipeline![
             SourceIter::build(set.into_iter()),
-            OrderedReduce::build_with_replicas(2, 4, |i, vec| -> (usize, i32) {
-                (i, vec.iter().sum())
-            }),
+            OrderedReduce::build_with_replicas(2, 4, |a, b| -> i32 { a + b }),
             OrderedSinkVec::build()
         ];
 
-        let res: Vec<Vec<(usize, i32)>> = pipe.start_and_wait_end().unwrap();
+        let res: Vec<i32> = pipe.start_and_wait_end().unwrap();
 
-        assert_eq!(res.len(), 1000);
-
-        for (check, vec) in res.into_iter().enumerate() {
-            assert_eq!(vec.len(), 1);
-            for el in vec {
-                assert_eq!(el, (check, 55));
-            }
-        }
+        assert_eq!(res.len(), 3);
+        assert_eq!(res[0], 10);
+        assert_eq!(res[1], 100);
+        assert_eq!(res[2], 1000);
 
         unsafe {
             Orchestrator::delete_global_orchestrator();
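Read together, the updated doc example and `summation_ordered` test describe the following end-to-end use of the reworked ordered variant. This is a sketch assembled from the snippets above; module paths are taken from the diff's own `use` lines and are not verified beyond what the diff shows:

```rust
use ppl::{
    prelude::*,
    templates::map::OrderedReduce,
    templates::misc::{OrderedSinkVec, SourceIter},
};

fn main() {
    // Three inputs of different sizes, as in the updated summation_ordered test.
    let set = vec![vec![1; 10], vec![1; 100], vec![1; 1000]];

    let pipe = pipeline![
        SourceIter::build(set.into_iter()),
        // 2 workers per replica, 4 replicas, binary combiner.
        OrderedReduce::build_with_replicas(2, 4, |a, b| -> i32 { a + b }),
        OrderedSinkVec::build()
    ];

    // The ordered variant returns the sums in emission order.
    let res: Vec<i32> = pipe.start_and_wait_end().unwrap();
    assert_eq!(res, vec![10, 100, 1000]);
}
```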