diff --git a/README.md b/README.md
index 3e897133..b8c8b99b 100644
--- a/README.md
+++ b/README.md
@@ -275,8 +275,8 @@ The pipeline approach in PPL provides an intuitive and flexible way to express c
 In the word counter example, the pipeline involves the following stages:
 
 - **Source**: Reads the dataset and emits lines of text.
-- **Map**: Converts each line of text into a list of words, where each word is paired with a count of 1.
-- **Reduce**: Aggregates the counts of words by summing them for each unique word.
+- **MapReduce**: The map function converts each line of text into a list of words, where each word is paired with a count of 1.
+The reduce function then aggregates the counts by summing them for each unique word.
 - **Sink**: Stores the final word counts in a hashmap.
 
 By breaking down the computation into stages and leveraging parallelism, PPL's pipeline approach allows for efficient distribution of work across multiple threads or cores, leading to faster execution.
@@ -285,7 +285,7 @@ Here's the Rust code for the word counter using the pipeline approach:
 
 ```rust
 use ppl::{
-    templates::map::{Map, Reduce},
+    templates::map::MapReduce,
     prelude::*,
 };
 
@@ -317,21 +317,17 @@ impl In<Vec<(String, usize)>, Vec<(String, usize)>> for Sink {
 pub fn ppl(dataset: &str, threads: usize) {
     // Initialization and configuration...
 
-    let mut p = pipeline![
+    let mut p = pipeline![
         Source { reader },
-        Map::build::<Vec<String>, Vec<(String, usize)>>(threads / 2, |str| -> (String, usize) {
-            (str, 1)
-        }),
-        Reduce::build(threads / 2, |str, count| {
-            let mut sum = 0;
-            for c in count {
-                sum += c;
-            }
-            (str, sum)
-        }),
+        MapReduce::build_with_replicas(
+            threads / 2,
+            |str| -> (String, usize) { (str, 1) }, // Map function
+            |a, b| a + b,                          // Reduce function
+            2
+        ),
         Sink {
-            counter: HashMap::new(),
-        },
+            counter: HashMap::new()
+        }
     ];
 
     p.start();
@@ -379,13 +375,7 @@ pub fn ppl_map(dataset: &str, threads: usize) {
         })
         .collect::<Vec<String>>(),
         |str| -> (String, usize) { (str, 1) },
-        |str, count| {
-            let mut sum = 0;
-            for c in count {
-                sum += c;
-            }
-            (str, sum)
-        },
+        |a, b| a + b,
     );
 }
 ```
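The API change above swaps the key-grouped reduce closure (`|key, values: Vec<_>|`) for a plain binary combiner (`|a, b| a + b`). A minimal standalone sketch of the two shapes, using only the standard library (the names here are illustrative and not part of the patch):

```rust
use std::collections::BTreeMap;

fn main() {
    let pairs = vec![("a", 1), ("b", 1), ("a", 1)];

    // Old shape: the library grouped values by key and handed the
    // closure the whole group at once: |key, values: Vec<_>|.
    let mut groups: BTreeMap<&str, Vec<usize>> = BTreeMap::new();
    for &(k, v) in &pairs {
        groups.entry(k).or_default().push(v);
    }
    let old: Vec<(&str, usize)> = groups
        .into_iter()
        .map(|(k, vs)| (k, vs.iter().sum()))
        .collect();

    // New shape: the library folds each group pairwise, so user code
    // supplies only an associative binary combiner: |a, b| a + b.
    let mut new: BTreeMap<&str, usize> = BTreeMap::new();
    for &(k, v) in &pairs {
        *new.entry(k).or_insert(0) += v;
    }

    assert_eq!(old, new.into_iter().collect::<Vec<_>>());
}
```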
println!("[PIPELINE] Total words: {}", total_words); -} - // Version that use a node that combine map and reduce pub fn ppl_combined_map_reduce(dataset: &str, threads: usize) { let file = File::open(dataset).expect("no such file"); @@ -100,14 +66,7 @@ pub fn ppl_combined_map_reduce(dataset: &str, threads: usize) { MapReduce::build_with_replicas( threads / 2, |str| -> (String, usize) { (str, 1) }, // Map function - |str, count| { - // Reduce - let mut sum = 0; - for c in count { - sum += c; - } - (str, sum) - }, + |a, b| a + b, 2 ), Sink { @@ -153,13 +112,7 @@ pub fn ppl_map(dataset: &str, threads: usize) { }) .collect::>(), |str| -> (String, usize) { (str, 1) }, - |str, count| { - let mut sum = 0; - for c in count { - sum += c; - } - (str, sum) - }, + |a, b| a + b, ); let mut total_words = 0; diff --git a/src/templates/map.rs b/src/templates/map.rs index 92d09164..100fc916 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -237,26 +237,23 @@ where /// Reduce /// -/// In this Reduce, the elements are grouped by key and then reduced. +/// Takes in input a type that implements the [`Iterator`] trait +/// and applies a reduce function. #[derive(Clone)] -pub struct Reduce +pub struct Reduce where TIn: Send, - TKey: Send, - TReduce: Send, - F: FnOnce(TKey, Vec) -> (TKey, TReduce) + Send + Copy, + F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync, { threadpool: ThreadPool, replicas: usize, f: F, - phantom: PhantomData<(TIn, TKey, TReduce)>, + phantom: PhantomData, } -impl Reduce +impl Reduce where TIn: Send + Clone + 'static, - TKey: Send + Clone + 'static + Ord, - TReduce: Send + Clone + 'static, - F: FnOnce(TKey, Vec) -> (TKey, TReduce) + Send + Copy, + F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync, { /// Create a new Reduce node. /// # Arguments @@ -267,8 +264,6 @@ where /// /// Given a collection of vectors of integers, for each vector /// compute the summation of its elements. - /// As this reduce function works by grouping by key, - /// the input of the reduce function must be a vector of tuple (key, value). /// /// ``` /// @@ -277,37 +272,34 @@ where /// // Create the vector of the elements that will be emitted by the Source node. /// // vec![(key,value)] /// let vector = vec![ - /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], - /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], - /// vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)], + /// vec![1; 10], + /// vec![1; 10], + /// vec![1; 10], /// ]; /// /// // Instantiate a new pipeline. /// let pipe = pipeline![ /// SourceIter::build(vector.into_iter()), - /// Reduce::build(4, |i, vec| -> (i32, i32) { - /// (i, vec.iter().sum()) + /// Reduce::build(4, |a, b| -> i32 { + /// a + b /// }), /// SinkVec::build() /// ]; /// /// // Start the pipeline and wait for the results. - /// let res: Vec> = pipe.start_and_wait_end().unwrap(); + /// let res: Vec = pipe.start_and_wait_end().unwrap(); /// /// // Collect a results for each vector emitted by the Source. In our case we had 3 vectors. /// assert_eq!(res.len(), 3); /// /// // As for each vector emitted we had only one key, we obtain only one result tuple /// // for vector. 
-    /// // As for each vector emitted we had only one key, we obtain only one result tuple
-    /// // for vector.
-    /// for vec in res {
-    ///     for el in vec {
-    ///         assert_eq!(el.1, 55);
-    ///     }
+    /// // Each emitted vector collapses to a single value.
+    /// for el in res {
+    ///     assert_eq!(el, 10);
     /// }
+    /// ```
-    pub fn build<TInIter, TOutIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TOutIter>
+    pub fn build<TInIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TIn>
     where
-        TInIter: IntoIterator<Item = (TKey, TIn)>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TInIter: IntoIterator<Item = TIn>,
     {
         Self {
             threadpool: ThreadPool::with_capacity(n_worker),
@@ -328,14 +320,13 @@ where
     /// The replicas are created by cloning the Reduce node.
     /// This means that 4 replicas of a Reduce node with 2 workers each
     /// will result in the usage of 8 threads.
-    pub fn build_with_replicas<TInIter, TOutIter>(
+    pub fn build_with_replicas<TInIter>(
         n_worker: usize,
         n_replicas: usize,
         f: F,
-    ) -> impl InOut<TInIter, TOutIter>
+    ) -> impl InOut<TInIter, TIn>
     where
-        TInIter: IntoIterator<Item = (TKey, TIn)>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TInIter: IntoIterator<Item = TIn>,
     {
         assert!(n_replicas > 0);
         Self {
@@ -346,18 +337,14 @@ where
         }
     }
 }
-impl<TIn, TInIter, TKey, TReduce, TOutIter, F> InOut<TInIter, TOutIter>
-    for Reduce<TIn, TKey, TReduce, F>
+impl<TIn, TInIter, F> InOut<TInIter, TIn> for Reduce<TIn, F>
 where
     TIn: Send + Clone + 'static,
-    TInIter: IntoIterator<Item = (TKey, TIn)>,
-    TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
-    TOutIter: FromIterator<(TKey, TReduce)>,
-    F: FnOnce(TKey, Vec<TIn>) -> (TKey, TReduce) + Send + Copy,
+    TInIter: IntoIterator<Item = TIn>,
+    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
 {
-    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
-        let res: TOutIter = self.threadpool.par_reduce(input, self.f).collect();
+    fn run(&mut self, input: TInIter) -> Option<TIn> {
+        let res: TIn = self.threadpool.par_reduce(input, self.f);
         Some(res)
     }
     fn number_of_replicas(&self) -> usize {
         self.replicas
     }
 }
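With the key machinery gone, the reworked `Reduce` node above is easiest to read as a parallel version of `Iterator::reduce`: each collection arriving on the stream collapses to a single value. A sequential reference model (a sketch only; the real node spreads the fold across the thread pool via `par_reduce`):

```rust
// Sequential model of the reworked Reduce node: every Vec<i32>
// arriving on the stream folds down to one i32.
fn reduce_model<T>(input: Vec<T>, f: impl FnMut(T, T) -> T) -> Option<T> {
    input.into_iter().reduce(f)
}

fn main() {
    let stream = vec![vec![1; 10], vec![1; 10], vec![1; 10]];
    let out: Vec<i32> = stream
        .into_iter()
        .filter_map(|v| reduce_model(v, |a, b| a + b))
        .collect();
    // Same values the doctest above expects from the pipeline.
    assert_eq!(out, vec![10, 10, 10]);
}
```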
@@ -370,33 +357,28 @@ where
 /// OrderedReduce
 ///
 /// In this Reduce, the elements are processed in the same order as they are received.
 /// The order of the output is the same as the order of the input.
 #[derive(Clone)]
-pub struct OrderedReduce<TIn, TKey, TReduce, F>
+pub struct OrderedReduce<TIn, F>
 where
     TIn: Send,
-    TKey: Send,
-    TReduce: Send,
-    F: FnOnce(TKey, Vec<TIn>) -> (TKey, TReduce) + Send + Copy,
+    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
 {
     threadpool: ThreadPool,
     replicas: usize,
     f: F,
-    phantom: PhantomData<(TIn, TKey, TReduce)>,
+    phantom: PhantomData<TIn>,
 }
-impl<TIn, TKey, TReduce, F> OrderedReduce<TIn, TKey, TReduce, F>
+impl<TIn, F> OrderedReduce<TIn, F>
 where
     TIn: Send + Clone + 'static,
-    TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
-    F: FnOnce(TKey, Vec<TIn>) -> (TKey, TReduce) + Send + Copy,
+    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
 {
     /// Create a new OrderedReduce node.
     /// # Arguments
     /// * `n_worker` - Number of worker threads.
     /// * `f` - Function to apply to each element of the input.
-    pub fn build<TInIter, TOutIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TOutIter>
+    pub fn build<TInIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TIn>
     where
-        TInIter: IntoIterator<Item = (TKey, TIn)>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TInIter: IntoIterator<Item = TIn>,
     {
         Self {
             threadpool: ThreadPool::with_capacity(n_worker),
@@ -422,9 +404,7 @@ where
     ///
     /// Given a collection of vectors of integers, for each vector
     /// compute the summation of its elements.
-    /// As this reduce function works by grouping by key,
-    /// the input of the reduce function must be a vector of tuple (key, value).
-    /// In this example we want mantain the order of the input in the output.
+    /// In this example we maintain the order of the input in the output.
     ///
     /// ```
     /// use ppl::{prelude::*, templates::misc::{SourceIter, OrderedSinkVec}, templates::map::OrderedReduce};
@@ -432,42 +412,35 @@ where
     /// // Create the vector of the elements that will be emitted by the Source node.
-    /// // vec![(key,value)]
+    /// // vec![value; n]
     /// let vector = vec![
-    ///     vec![(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0 ,10)],
-    ///     vec![(1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (1, 9), (1 ,10)],
-    ///     vec![(2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8), (2, 9), (2 ,10)],
+    ///     vec![1; 10],
+    ///     vec![1; 100],
+    ///     vec![1; 1000],
     /// ];
     ///
     /// // Instantiate a new pipeline.
     /// let pipe = pipeline![
     ///     SourceIter::build(vector.into_iter()),
-    ///     OrderedReduce::build_with_replicas(2, 4, |i, vec| -> (usize, i32) {
-    ///         (i, vec.iter().sum())
+    ///     OrderedReduce::build_with_replicas(2, 4, |a, b| -> i32 {
+    ///         a + b
     ///     }),
     ///     OrderedSinkVec::build()
     /// ];
     ///
     /// // Start the pipeline and wait for the results.
-    /// let res: Vec<Vec<(usize, i32)>> = pipe.start_and_wait_end().unwrap();
+    /// let res: Vec<i32> = pipe.start_and_wait_end().unwrap();
     ///
     /// // Collect a result for each vector emitted by the Source. In our case we had 3 vectors.
     /// assert_eq!(res.len(), 3);
-    ///
-    /// // As for each vector emitted we had only one key, we obtain only one result tuple
-    /// // for vector. Moreover, we check here also if the order of the input was preserved
-    /// // in the output.
-    /// for (check, vec) in res.into_iter().enumerate() {
-    ///     for el in vec {
-    ///         assert_eq!(el, (check, 55));
-    ///     }
-    /// }
+    ///
+    /// // Check that the order of the input was preserved in the output.
+    /// assert_eq!(res[0], 10);
+    /// assert_eq!(res[1], 100);
+    /// assert_eq!(res[2], 1000);
+    /// ```
-    pub fn build_with_replicas<TInIter, TOutIter>(
+    pub fn build_with_replicas<TInIter>(
         n_worker: usize,
         n_replicas: usize,
         f: F,
-    ) -> impl InOut<TInIter, TOutIter>
+    ) -> impl InOut<TInIter, TIn>
     where
-        TInIter: IntoIterator<Item = (TKey, TIn)>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TInIter: IntoIterator<Item = TIn>,
     {
         assert!(n_replicas > 0);
         Self {
@@ -478,18 +451,14 @@ where
         }
     }
 }
-impl<TIn, TInIter, TKey, TReduce, TOutIter, F> InOut<TInIter, TOutIter>
-    for OrderedReduce<TIn, TKey, TReduce, F>
+impl<TIn, TInIter, F> InOut<TInIter, TIn> for OrderedReduce<TIn, F>
 where
     TIn: Send + Clone + 'static,
-    TInIter: IntoIterator<Item = (TKey, TIn)>,
-    TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
-    TOutIter: FromIterator<(TKey, TReduce)>,
-    F: FnOnce(TKey, Vec<TIn>) -> (TKey, TReduce) + Send + Copy,
+    TInIter: IntoIterator<Item = TIn>,
+    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
 {
-    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
-        let res: TOutIter = self.threadpool.par_reduce(input, self.f).collect();
+    fn run(&mut self, input: TInIter) -> Option<TIn> {
+        let res: TIn = self.threadpool.par_reduce(input, self.f);
         Some(res)
     }
     fn number_of_replicas(&self) -> usize {
         self.replicas
     }
 }
@@ -505,30 +474,27 @@ where
 /// MapReduce
 ///
 /// Nodes of this type are composed of a Map and a Reduce.
 /// The Map is applied to each element of the input, and the Reduce is applied to the output of the Map.
 #[derive(Clone)]
-pub struct MapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+pub struct MapReduce<TIn, TMapOut, TKey, FMap, FReduce>
 where
     TIn: Send,
     TMapOut: Send,
     TKey: Send,
-    TReduce: Send,
     FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
-    FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+    FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut,
 {
     threadpool: ThreadPool,
     replicas: usize,
     f_map: FMap,
     f_reduce: FReduce,
-    phantom: PhantomData<(TIn, TMapOut, TKey, TReduce)>,
+    phantom: PhantomData<(TIn, TMapOut, TKey)>,
 }
-impl<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
-    MapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+impl<TIn, TMapOut, TKey, FMap, FReduce> MapReduce<TIn, TMapOut, TKey, FMap, FReduce>
 where
     TIn: Send + Clone + 'static,
     TMapOut: Send + Clone + 'static,
     TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
     FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
-    FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+    FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
 {
     /// Create a new MapReduce node.
     /// # Arguments
@@ -564,8 +530,8 @@ where
     ///         |el: (usize, f64)| -> (usize, f64) {
     ///             (el.0, el.1 * el.1)
     ///         },
-    ///         |i, vec| {
-    ///             (i, vec.iter().sum())
+    ///         |a, b| {
+    ///             a + b
     ///         }),
     ///     SinkVec::build()
     /// ];
@@ -579,7 +545,7 @@ where
     ) -> impl InOut<TInIter, TOutIter>
     where
         TInIter: IntoIterator<Item = TIn>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TOutIter: FromIterator<(TKey, TMapOut)>,
     {
         Self {
             threadpool: ThreadPool::with_capacity(n_worker),
@@ -610,7 +576,7 @@ where
     ) -> impl InOut<TInIter, TOutIter>
     where
         TInIter: IntoIterator<Item = TIn>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TOutIter: FromIterator<(TKey, TMapOut)>,
     {
         assert!(n_replicas > 0);
         Self {
@@ -622,17 +588,16 @@ where
         }
     }
 }
-impl<TIn, TMapOut, TInIter, TKey, TReduce, TOutIter, FMap, FReduce> InOut<TInIter, TOutIter>
-    for MapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+impl<TIn, TMapOut, TInIter, TKey, TOutIter, FMap, FReduce> InOut<TInIter, TOutIter>
+    for MapReduce<TIn, TMapOut, TKey, FMap, FReduce>
 where
     TIn: Send + Clone + 'static,
     TMapOut: Send + Clone + 'static,
     TInIter: IntoIterator<Item = TIn>,
     TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
-    TOutIter: FromIterator<(TKey, TReduce)>,
+    TOutIter: FromIterator<(TKey, TMapOut)>,
     FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
-    FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+    FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
 {
     fn run(&mut self, input: TInIter) -> Option<TOutIter> {
         let res: TOutIter = self
@@ -654,30 +619,27 @@ where
 /// OrderedMapReduce
 ///
 /// This node is slower than MapReduce but preserves the order of the input.
 /// This node is useful when the order of the input is important.
 #[derive(Clone)]
-pub struct OrderedMapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+pub struct OrderedMapReduce<TIn, TMapOut, TKey, FMap, FReduce>
 where
     TIn: Send,
     TMapOut: Send,
     TKey: Send,
-    TReduce: Send,
     FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
-    FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+    FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy,
 {
     threadpool: ThreadPool,
     replicas: usize,
     f_map: FMap,
     f_reduce: FReduce,
-    phantom: PhantomData<(TIn, TMapOut, TKey, TReduce)>,
+    phantom: PhantomData<(TIn, TMapOut, TKey)>,
 }
-impl<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
-    OrderedMapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+impl<TIn, TMapOut, TKey, FMap, FReduce> OrderedMapReduce<TIn, TMapOut, TKey, FMap, FReduce>
 where
     TIn: Send + Clone + 'static,
     TMapOut: Send + Clone + 'static,
     TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
     FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
-    FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+    FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
 {
     /// Create a new OrderedMapReduce node.
     /// # Arguments
@@ -691,7 +653,7 @@ where
     ) -> impl InOut<TInIter, TOutIter>
     where
         TInIter: IntoIterator<Item = TIn>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TOutIter: FromIterator<(TKey, TMapOut)>,
     {
         Self {
             threadpool: ThreadPool::with_capacity(n_worker),
@@ -743,8 +705,8 @@ where
     ///         |el: (usize, f64)| -> (usize, f64) {
     ///             (el.0, el.1 * el.1)
     ///         },
-    ///         |i, vec| {
-    ///             (i, vec.iter().sum())
+    ///         |a, b| {
+    ///             a + b
     ///         }),
     ///     OrderedSinkVec::build()
     /// ];
@@ -768,7 +730,7 @@ where
     ) -> impl InOut<TInIter, TOutIter>
     where
         TInIter: IntoIterator<Item = TIn>,
-        TOutIter: FromIterator<(TKey, TReduce)>,
+        TOutIter: FromIterator<(TKey, TMapOut)>,
     {
         assert!(n_replicas > 0);
         Self {
@@ -780,17 +742,16 @@ where
         }
     }
 }
-impl<TIn, TMapOut, TInIter, TKey, TReduce, TOutIter, FMap, FReduce> InOut<TInIter, TOutIter>
-    for OrderedMapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+impl<TIn, TMapOut, TInIter, TKey, TOutIter, FMap, FReduce> InOut<TInIter, TOutIter>
+    for OrderedMapReduce<TIn, TMapOut, TKey, FMap, FReduce>
 where
     TIn: Send + Clone + 'static,
     TMapOut: Send + Clone + 'static,
     TInIter: IntoIterator<Item = TIn>,
     TKey: Send + Clone + 'static + Ord,
-    TReduce: Send + Clone + 'static,
-    TOutIter: FromIterator<(TKey, TReduce)>,
+    TOutIter: FromIterator<(TKey, TMapOut)>,
     FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
-    FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+    FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
 {
     fn run(&mut self, input: TInIter) -> Option<TOutIter> {
         let res: TOutIter = self
@@ -964,34 +925,28 @@ mod test {
     #[test]
     #[serial]
     fn summation() {
-        let mut counter = 1;
         let mut set = Vec::new();
 
-        for i in 0..1000 {
+        for _i in 0..1000 {
             let mut vector = Vec::new();
             for _i in 0..10 {
-                vector.push((i, counter));
-                counter += 1;
+                vector.push(1);
             }
-            counter = 1;
             set.push(vector);
         }
 
         let pipe = pipeline![
             SourceIter::build(set.into_iter()),
-            Reduce::build(4, |i, vec| -> (i32, i32) { (i, vec.iter().sum()) }),
+            Reduce::build(4, |a, b| -> i32 { a + b }),
             SinkVec::build()
         ];
 
-        let res: Vec<Vec<(i32, i32)>> = pipe.start_and_wait_end().unwrap();
+        let res: Vec<i32> = pipe.start_and_wait_end().unwrap();
 
         assert_eq!(res.len(), 1000);
 
-        for vec in res {
-            assert_eq!(vec.len(), 1);
-            for el in vec {
-                assert_eq!(el.1, 55);
-            }
+        for el in res {
+            assert_eq!(el, 10);
         }
 
         unsafe {
@@ -1002,37 +957,19 @@ mod test {
     #[test]
     #[serial]
     fn summation_ordered() {
-        let mut counter = 1;
-        let mut set = Vec::new();
-
-        for i in 0..1000 {
-            let mut vector = Vec::new();
-            for _i in 0..10 {
-                vector.push((i, counter));
-                counter += 1;
-            }
-            counter = 1;
-            set.push(vector);
-        }
-
+        let set = vec![vec![1; 10], vec![1; 100], vec![1; 1000]];
         let pipe = pipeline![
             SourceIter::build(set.into_iter()),
-            OrderedReduce::build_with_replicas(2, 4, |i, vec| -> (usize, i32) {
-                (i, vec.iter().sum())
-            }),
+            OrderedReduce::build_with_replicas(2, 4, |a, b| -> i32 { a + b }),
             OrderedSinkVec::build()
         ];
 
-        let res: Vec<Vec<(usize, i32)>> = pipe.start_and_wait_end().unwrap();
-
-        assert_eq!(res.len(), 1000);
+        let res: Vec<i32> = pipe.start_and_wait_end().unwrap();
 
-        for (check, vec) in res.into_iter().enumerate() {
-            assert_eq!(vec.len(), 1);
-            for el in vec {
-                assert_eq!(el, (check, 55));
-            }
-        }
+        assert_eq!(res.len(), 3);
+        assert_eq!(res[0], 10);
+        assert_eq!(res[1], 100);
+        assert_eq!(res[2], 1000);
 
         unsafe {
             Orchestrator::delete_global_orchestrator();
@@ -1060,7 +997,7 @@ mod test {
             MapReduce::build(
                 8,
                 |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) },
-                |i, vec| { (i, vec.iter().sum()) }
+                |a, b| { a + b }
             ),
             SinkVec::build()
         ];
@@ -1103,7 +1040,7 @@ mod test {
                 2,
                 4,
                 |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) },
-                |i, vec| { (i, vec.iter().sum()) }
+                |a, b| { a + b }
             ),
             OrderedSinkVec::build()
         ];
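The `MapReduce`/`OrderedMapReduce` contract changes the same way: `FReduce` now combines two map outputs that share a key, instead of receiving `(key, Vec<values>)`. A standalone sketch of that per-key pairwise combining (illustrative only; as the `par_map_reduce` rework below shows, the node performs this grouping internally, in parallel):

```rust
use std::collections::BTreeMap;

// Per-key combine with a binary closure, mirroring the new bound
// FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut.
fn combine_by_key<K: Ord, V>(
    pairs: Vec<(K, V)>,
    f: impl Fn(V, V) -> V,
) -> BTreeMap<K, V> {
    let mut acc = BTreeMap::new();
    for (k, v) in pairs {
        match acc.remove(&k) {
            // A value for this key was already seen: fold the new one in.
            Some(prev) => {
                acc.insert(k, f(prev, v));
            }
            None => {
                acc.insert(k, v);
            }
        }
    }
    acc
}

fn main() {
    let mapped = vec![("word", 1), ("count", 1), ("word", 1)];
    let reduced = combine_by_key(mapped, |a, b| a + b);
    assert_eq!(reduced.get("word"), Some(&2));
    assert_eq!(reduced.get("count"), Some(&1));
}
```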
diff --git a/src/templates/misc.rs b/src/templates/misc.rs
index 8d4baa0c..345d87fd 100644
--- a/src/templates/misc.rs
+++ b/src/templates/misc.rs
@@ -19,7 +19,7 @@ where
 {
     /// Creates a new source from any type that implements the `Iterator` trait.
     /// The source will terminate when the iterator is exhausted.
-    /// 
+    ///
     /// # Arguments
     /// * `iterator` - Type that implements the [`Iterator`] trait
     ///   and represents the stream of data we want to emit.
@@ -123,7 +123,7 @@ where
     ///
     /// # Arguments
     /// * `chunk_size` - Number of elements for each chunk.
-    /// 
+    ///
     /// # Examples
     /// Given a stream of numbers, we create a pipeline with a splitter that
     /// creates vectors of two elements each.
@@ -204,7 +204,7 @@ where
     T: Send + 'static + Clone,
 {
     /// Creates a new aggregator node.
-    /// 
+    ///
     /// # Arguments
     /// * `chunk_size` - Number of elements for each chunk.
     ///
@@ -232,7 +232,7 @@ where
     }
 
     /// Creates a new aggregator node with 'n_replicas' replicas of the same node.
-    /// 
+    ///
     /// # Arguments
     /// * `n_replicas` - Number of replicas.
     /// * `chunk_size` - Number of elements for each chunk.
@@ -291,7 +291,7 @@ where
     F: FnMut(T) -> U + Send + 'static + Clone,
 {
     /// Creates a new sequential node.
-    /// 
+    ///
     /// # Arguments
     /// * `f` - Function name or lambda function that specifies the logic
     ///   of this node.
@@ -334,7 +334,7 @@ where
     F: FnMut(T) -> U + Send + 'static + Clone,
 {
     /// Creates a new parallel node.
-    /// 
+    ///
     /// # Arguments
     /// * `n_replicas` - Number of replicas.
     /// * `f` - Function name or lambda function that specifies the logic
     ///   of this node.
@@ -380,7 +380,7 @@ where
     F: FnMut(&T) -> bool + Send + 'static + Clone,
 {
     /// Creates a new filter node.
-    /// 
+    ///
     /// # Arguments
     /// * `f` - Function name or lambda function that represents the predicate
     ///   function we want to apply.
@@ -561,7 +561,7 @@ where
     T: Send + 'static + Clone,
 {
     /// Creates a new ordered aggregator node.
-    /// 
+    ///
     /// # Arguments
     /// * `chunk_size` - Number of elements for each chunk.
     pub fn build(chunk_size: usize) -> impl InOut<T, Vec<T>> {
diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs
index ac7f893c..0b19b221 100644
--- a/src/thread_pool/mod.rs
+++ b/src/thread_pool/mod.rs
@@ -241,6 +241,8 @@ impl ThreadPool {
     /// Create a new thread pool with `num_threads` threads.
     ///
+    /// # Arguments
+    /// * `num_threads` - Number of threads in the threadpool.
     /// # Examples
     ///
     /// ```
@@ -255,6 +257,9 @@ impl ThreadPool {
 
     /// Execute a function `task` on a thread in the thread pool.
     /// This method is non-blocking, so the developer must call `wait` to wait for the task to finish.
+    ///
+    /// # Arguments
+    /// * `task` - Function name or lambda function to execute in the threadpool.
     pub fn execute<F>(&self, task: F)
     where
         F: FnOnce() + Send + 'static,
@@ -279,6 +284,11 @@ impl ThreadPool {
     /// it distributes chunks of work of size `chunk_size` to the threads in the pool.
     /// The function `f` is applied to each element in the range.
     /// The range is split in chunks of size `chunk_size` and each chunk is assigned to a thread.
+    ///
+    /// # Arguments
+    /// * `range` - Range of indices.
+    /// * `chunk_size` - Size of each chunk.
+    /// * `f` - Function name or lambda function.
     pub fn par_for<F>(&mut self, range: Range<usize>, chunk_size: usize, mut f: F)
     where
         F: FnMut(usize) + Send + Copy,
@@ -307,6 +317,11 @@ impl ThreadPool {
 
     /// Applies in parallel the function `f` on an iterable object `iter`.
     ///
+    /// # Arguments
+    /// * `iter` - Type that implements the [`Iterator`] trait.
+    /// * `f` - Function name or lambda function that specifies the
+    ///   operation we want to apply to `iter` in parallel.
+    ///
     /// # Examples
     ///
     /// Increment by 1 all the elements in a vector concurrently:
@@ -332,7 +347,10 @@ impl ThreadPool {
 
     /// Applies in parallel the function `f` on an iterable object `iter`,
     /// producing a new iterator with the results.
-    ///
+    /// # Arguments
+    /// * `iter` - Type that implements the [`Iterator`] trait.
+    /// * `f` - Function name or lambda function that specifies the
+    ///   operation we want to apply to `iter` in parallel.
     /// # Examples
     ///
     /// Produce a vec of `String` from the elements of a vector `vec` concurrently:
@@ -371,7 +389,7 @@ impl ThreadPool {
             });
         });
 
-        drop(arc_tx);
+        drop(arc_tx); // TODO: candidate for refactoring
 
         let mut disconnected = false;
 
@@ -400,13 +418,18 @@ impl ThreadPool {
     /// The function `f` must return a tuple of two elements, the first one
     /// is the key and the second one is the value.
     /// The results are grouped by key and reduced by the function `reduce`.
-    /// The function `reduce` must take two arguments, the first one is the
-    /// key and the second one is a vector of values.
-    /// The function `reduce` must return a tuple of two elements, the first one
-    /// is the key and the second one is the value.
+    /// The function `reduce` must take two arguments and return
+    /// a value of the same type as its inputs.
     /// This method returns an iterator of tuples of two elements, the first one
     /// is the key and the second one is the value.
     ///
+    /// # Arguments
+    /// * `iter` - Type that implements the [`Iterator`] trait.
+    /// * `f` - Function name or lambda function that specifies the
+    ///   operation we want to apply to `iter` in parallel.
+    /// * `reduce` - Function name or lambda function that specifies the
+    ///   reduction function we want to apply.
+    ///
    /// # Examples
     ///
     /// ```
@@ -421,30 +444,41 @@ impl ThreadPool {
     ///
     /// let res: Vec<(i32, i32)> = pool.par_map_reduce(&mut vec, |el| -> (i32, i32) {
     ///     (*el % 10, *el)
-    /// }, |k, v| -> (i32, i32) {
-    ///     (k, v.iter().sum())
+    /// }, |a, b| -> i32 {
+    ///     a + b
     /// }).collect();
     /// assert_eq!(res.len(), 10);
     /// ```
-    pub fn par_map_reduce<F, K, V, R, Reduce, Iter>(
+    pub fn par_map_reduce<F, K, V, Reduce, Iter>(
         &mut self,
         iter: Iter,
         f: F,
         reduce: Reduce,
-    ) -> impl Iterator<Item = (K, R)>
+    ) -> impl Iterator<Item = (K, V)>
     where
         F: FnOnce(Iter::Item) -> (K, V) + Send + Copy,
         <Iter as IntoIterator>::Item: Send,
         K: Send + Ord + 'static,
         V: Send + 'static,
-        R: Send + 'static,
-        Reduce: FnOnce(K, Vec<V>) -> (K, R) + Send + Copy,
+        Reduce: FnOnce(V, V) -> V + Send + Copy + Sync,
         Iter: IntoIterator,
     {
         let map = self.par_map(iter, f);
-        self.par_reduce(map, reduce)
+
+        // Shuffle by grouping the elements by key.
+        let mut ordered_map = BTreeMap::new();
+        for (k, v) in map {
+            ordered_map.entry(k).or_insert_with(Vec::new).push(v);
+        }
+
+        // Reduce the values of each key in parallel.
+        let mut res = Vec::new();
+        for el in ordered_map {
+            res.push((el.0, self.par_reduce(el.1, reduce)));
+        }
+
+        res.into_iter()
     }
 
+    /// Parallel Reduce by Key
     /// Reduces in parallel the elements of an iterator `iter` by the function `f`.
     /// The function `f` must take two arguments, the first one is the
     /// key and the second one is a vector of values.
     /// The function `f` must return a tuple of two elements, the first one
     /// is the key and the second one is the value.
     /// This method returns an iterator of tuples of two elements, the first one
     /// is the key and the second one is the value obtained by the function `f`.
     ///
+    /// # Arguments
+    /// * `iter` - Type that implements the [`Iterator`] trait.
+    /// * `f` - Function name or lambda function that specifies the
+    ///   operation we want to apply to `iter` in parallel.
+    ///
     /// # Examples
     ///
     /// ```
     /// use ppl::thread_pool::ThreadPool;
     ///
     /// let mut pool = ThreadPool::new();
     ///
     /// let mut vec = Vec::new();
     ///
     /// for i in 0..100 {
     ///     vec.push((i % 10, i));
     /// }
     ///
-    /// let res: Vec<(i32, i32)> = pool.par_reduce(vec, |k, v| -> (i32, i32) {
+    /// let res: Vec<(i32, i32)> = pool.par_reduce_by_key(vec, |k, v| -> (i32, i32) {
     ///     (k, v.iter().sum())
     /// }).collect();
     /// assert_eq!(res.len(), 10);
     /// ```
-    pub fn par_reduce<F, K, V, Iter>(&mut self, iter: Iter, f: F) -> impl Iterator<Item = (K, V)>
+    pub fn par_reduce_by_key<F, K, V, Iter>(
+        &mut self,
+        iter: Iter,
+        f: F,
+    ) -> impl Iterator<Item = (K, V)>
     where
         <Iter as IntoIterator>::Item: Send,
         K: Send + Ord + 'static,
         V: Send + 'static,
@@ -491,6 +534,67 @@ impl ThreadPool {
         self.par_map(ordered_map, move |(k, v)| f(k, v))
     }
 
+    /// Parallel Reduce
+    /// Reduces in parallel the elements of the iterator `iter`.
+    /// The function `f` must take two arguments and
+    /// must return a value of the same type as its inputs.
+    ///
+    /// # Arguments
+    /// * `iter` - Type that implements the [`Iterator`] trait.
+    /// * `f` - Function name or lambda function that specifies the
+    ///   reduction function we want to apply.
+    ///
+    /// # Examples
+    /// ```
+    /// use ppl::thread_pool::ThreadPool;
+    ///
+    /// let mut pool = ThreadPool::new();
+    ///
+    /// let mut vec = Vec::new();
+    ///
+    /// for _i in 0..10 {
+    ///     vec.push(1);
+    /// }
+    ///
+    /// let res = pool.par_reduce(vec, |a, b| -> i32 {
+    ///     a + b
+    /// });
+    /// assert_eq!(res, 10);
+    /// ```
+    pub fn par_reduce<V, F, Iter>(&mut self, iter: Iter, f: F) -> V
+    where
+        <Iter as IntoIterator>::Item: Send,
+        V: Send + 'static,
+        F: FnOnce(V, V) -> V + Send + Copy + Sync,
+        Iter: IntoIterator<Item = V>,
+    {
+        let mut data: Vec<V> = iter.into_iter().collect();
+
+        while data.len() != 1 {
+            let mut tmp = Vec::new();
+            let mut num_proc = self.num_workers;
+
+            // Shrink the bucket count so that every bucket
+            // receives at least two elements.
+            while data.len() < 2 * num_proc {
+                num_proc -= 1;
+            }
+
+            let mut counter = 0;
+
+            // Deal the elements round-robin to `num_proc` buckets.
+            while !data.is_empty() {
+                counter %= num_proc;
+                tmp.push((counter, data.pop().unwrap()));
+                counter += 1;
+            }
+
+            // Reduce each bucket in parallel, then drop the bucket keys.
+            data = self
+                .par_reduce_by_key(tmp, |k, v| {
+                    (k, v.into_iter().reduce(|a, b| f(a, b)).unwrap())
+                })
+                .collect::<Vec<(usize, V)>>()
+                .into_iter()
+                .map(|(_a, b)| b)
+                .collect();
+        }
+        data.pop().unwrap()
+    }
+
     /// Create a new scope to execute jobs on other threads.
     /// The function passed to this method will be provided with a [`Scope`] object,
     /// which can be used to spawn new jobs through the [`Scope::execute`] method.
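The new `par_reduce` converges by repeated rounds: deal the remaining elements round-robin into at most `num_workers` buckets (shrinking the bucket count so each bucket gets at least two elements), reduce every bucket, and repeat until one value is left. A sequential model of that loop (a sketch assuming 4 workers; the real method runs each bucket's fold through `par_reduce_by_key` on the pool):

```rust
// One round: deal elements round-robin into `num_proc` buckets,
// then fold each bucket with the binary combiner.
fn round(data: Vec<i32>, num_proc: usize, f: impl Fn(i32, i32) -> i32 + Copy) -> Vec<i32> {
    let mut buckets = vec![Vec::new(); num_proc];
    for (i, el) in data.into_iter().enumerate() {
        buckets[i % num_proc].push(el);
    }
    buckets
        .into_iter()
        .filter(|b| !b.is_empty())
        .map(|b| b.into_iter().reduce(f).unwrap())
        .collect()
}

fn main() {
    let workers = 4;
    let mut data = vec![1; 130];
    while data.len() > 1 {
        // Mirror the shrinking step: never use more buckets than
        // half the remaining elements, so every round makes progress.
        let mut num_proc = workers;
        while data.len() < 2 * num_proc {
            num_proc -= 1;
        }
        data = round(data, num_proc, |a, b| a + b);
    }
    assert_eq!(data[0], 130); // the value the new unit test checks
}
```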
diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs
index 742f8de9..ee4538d7 100644
--- a/src/thread_pool/test.rs
+++ b/src/thread_pool/test.rs
@@ -128,11 +128,7 @@ fn test_par_map_reduce() {
         }
     }
 
-    let res = tp.par_map_reduce(
-        vec,
-        |el| -> (i32, i32) { (el, 1) },
-        |k, v| (k, v.iter().sum::<i32>()),
-    );
+    let res = tp.par_map_reduce(vec, |el| -> (i32, i32) { (el, 1) }, |a, b| a + b);
 
     let mut check = true;
     for (k, v) in res {
@@ -150,7 +146,7 @@ fn test_par_map_reduce() {
 
 #[test]
 #[serial]
-fn test_par_reduce() {
+fn test_par_reduce_by_key() {
     let mut pool = ThreadPool::new();
 
     let mut vec = Vec::new();
@@ -159,12 +155,27 @@
     }
 
     let res: Vec<(i32, i32)> = pool
-        .par_reduce(vec, |k, v| -> (i32, i32) { (k, v.iter().sum()) })
+        .par_reduce_by_key(vec, |k, v| -> (i32, i32) { (k, v.iter().sum()) })
         .collect();
 
     assert_eq!(res.len(), 10);
 }
 
+#[test]
+#[serial]
+fn test_par_reduce() {
+    let mut pool = ThreadPool::new();
+
+    let mut vec = Vec::new();
+    for _i in 0..130 {
+        vec.push(1);
+    }
+
+    let res = pool.par_reduce(vec, |a, b| a + b);
+
+    assert_eq!(res, 130);
+}
+
 #[test]
 #[serial]
 fn test_par_map_reduce_seq() {
@@ -178,7 +189,7 @@ fn test_par_map_reduce_seq() {
     }
 
     let res = tp.par_map(vec, |el| -> (i32, i32) { (el, 1) });
-    let res = tp.par_reduce(res, |k, v| (k, v.iter().sum::<i32>()));
+    let res = tp.par_reduce_by_key(res, |k, v| (k, v.iter().sum::<i32>()));
 
     let mut check = true;
     for (k, v) in res {
diff --git a/tests/test_producer.rs b/tests/test_producer.rs
index 348bc3eb..d43dd947 100644
--- a/tests/test_producer.rs
+++ b/tests/test_producer.rs
@@ -73,11 +73,7 @@ fn test_producer() {
     assert_eq!(res.len(), 5000);
 
     // Count the occurrences of each number.
-    let check = tp.par_map_reduce(
-        res,
-        |el| -> (usize, usize) { (el, 1) },
-        |k, v| -> (usize, usize) { (k, v.iter().sum()) },
-    );
+    let check = tp.par_map_reduce(res, |el| -> (usize, usize) { (el, 1) }, |a, b| a + b);
 
     // Check that the number of occurrences is correct.
     for (_, v) in check {