From f1e5517a974267438b4bb14bd6d1e3bc47ec0a7c Mon Sep 17 00:00:00 2001 From: Valerio Besozzi Date: Mon, 20 Nov 2023 16:02:15 +0100 Subject: [PATCH] Refactoring MapReduce --- examples/wordcount/ppl.rs | 16 +++-------- src/templates/map.rs | 56 +++++++++++++++++---------------------- src/thread_pool/mod.rs | 39 ++++++++++++++++----------- src/thread_pool/test.rs | 6 +---- tests/test_producer.rs | 6 +---- 5 files changed, 53 insertions(+), 70 deletions(-) diff --git a/examples/wordcount/ppl.rs b/examples/wordcount/ppl.rs index 96717523..9e045067 100644 --- a/examples/wordcount/ppl.rs +++ b/examples/wordcount/ppl.rs @@ -100,13 +100,9 @@ pub fn ppl_combined_map_reduce(dataset: &str, threads: usize) { MapReduce::build_with_replicas( threads / 2, |str| -> (String, usize) { (str, 1) }, // Map function - |str, count| { + |a, b| { // Reduce - let mut sum = 0; - for c in count { - sum += c; - } - (str, sum) + a + b }, 2 ), @@ -153,13 +149,7 @@ pub fn ppl_map(dataset: &str, threads: usize) { }) .collect::>(), |str| -> (String, usize) { (str, 1) }, - |str, count| { - let mut sum = 0; - for c in count { - sum += c; - } - (str, sum) - }, + |a, b| a + b, ); let mut total_words = 0; diff --git a/src/templates/map.rs b/src/templates/map.rs index 57af6b9e..eaf4a5b4 100644 --- a/src/templates/map.rs +++ b/src/templates/map.rs @@ -505,30 +505,27 @@ where /// Nodes of this type are composed of a Map and a Reduce. /// The Map is applied to each element of the input, and the Reduce is applied to the output of the Map. #[derive(Clone)] -pub struct MapReduce +pub struct MapReduce where TIn: Send, TMapOut: Send, TKey: Send, - TReduce: Send, FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy, - FReduce: FnOnce(TKey, Vec) -> (TKey, TReduce) + Send + Copy, + FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut, { threadpool: ThreadPool, replicas: usize, f_map: FMap, f_reduce: FReduce, - phantom: PhantomData<(TIn, TMapOut, TKey, TReduce)>, + phantom: PhantomData<(TIn, TMapOut, TKey)>, } -impl - MapReduce +impl MapReduce where TIn: Send + Clone + 'static, TMapOut: Send + Clone + 'static, TKey: Send + Clone + 'static + Ord, - TReduce: Send + Clone + 'static, FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy, - FReduce: FnOnce(TKey, Vec) -> (TKey, TReduce) + Send + Copy, + FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync, { /// Create a new MapReduce node. /// # Arguments @@ -579,7 +576,7 @@ where ) -> impl InOut where TInIter: IntoIterator, - TOutIter: FromIterator<(TKey, TReduce)>, + TOutIter: FromIterator<(TKey, TMapOut)>, { Self { threadpool: ThreadPool::with_capacity(n_worker), @@ -610,7 +607,7 @@ where ) -> impl InOut where TInIter: IntoIterator, - TOutIter: FromIterator<(TKey, TReduce)>, + TOutIter: FromIterator<(TKey, TMapOut)>, { assert!(n_replicas > 0); Self { @@ -622,17 +619,16 @@ where } } } -impl InOut - for MapReduce +impl InOut + for MapReduce where TIn: Send + Clone + 'static, TMapOut: Send + Clone + 'static, TInIter: IntoIterator, TKey: Send + Clone + 'static + Ord, - TReduce: Send + Clone + 'static, - TOutIter: FromIterator<(TKey, TReduce)>, + TOutIter: FromIterator<(TKey, TMapOut)>, FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy, - FReduce: FnOnce(TKey, Vec) -> (TKey, TReduce) + Send + Copy, + FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync, { fn run(&mut self, input: TInIter) -> Option { let res: TOutIter = self @@ -654,30 +650,27 @@ where /// This node is slower than MapReduce but preserves the order of the input. /// This node is useful when the order of the input is important. #[derive(Clone)] -pub struct OrderedMapReduce +pub struct OrderedMapReduce where TIn: Send, TMapOut: Send, TKey: Send, - TReduce: Send, FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy, - FReduce: FnOnce(TKey, Vec) -> (TKey, TReduce) + Send + Copy, + FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy, { threadpool: ThreadPool, replicas: usize, f_map: FMap, f_reduce: FReduce, - phantom: PhantomData<(TIn, TMapOut, TKey, TReduce)>, + phantom: PhantomData<(TIn, TMapOut, TKey)>, } -impl - OrderedMapReduce +impl OrderedMapReduce where TIn: Send + Clone + 'static, TMapOut: Send + Clone + 'static, TKey: Send + Clone + 'static + Ord, - TReduce: Send + Clone + 'static, FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy, - FReduce: FnOnce(TKey, Vec) -> (TKey, TReduce) + Send + Copy, + FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync, { /// Create a new OrderedMapReduce node. /// # Arguments @@ -691,7 +684,7 @@ where ) -> impl InOut where TInIter: IntoIterator, - TOutIter: FromIterator<(TKey, TReduce)>, + TOutIter: FromIterator<(TKey, TMapOut)>, { Self { threadpool: ThreadPool::with_capacity(n_worker), @@ -768,7 +761,7 @@ where ) -> impl InOut where TInIter: IntoIterator, - TOutIter: FromIterator<(TKey, TReduce)>, + TOutIter: FromIterator<(TKey, TMapOut)>, { assert!(n_replicas > 0); Self { @@ -780,17 +773,16 @@ where } } } -impl InOut - for OrderedMapReduce +impl InOut + for OrderedMapReduce where TIn: Send + Clone + 'static, TMapOut: Send + Clone + 'static, TInIter: IntoIterator, TKey: Send + Clone + 'static + Ord, - TReduce: Send + Clone + 'static, - TOutIter: FromIterator<(TKey, TReduce)>, + TOutIter: FromIterator<(TKey, TMapOut)>, FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy, - FReduce: FnOnce(TKey, Vec) -> (TKey, TReduce) + Send + Copy, + FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync, { fn run(&mut self, input: TInIter) -> Option { let res: TOutIter = self @@ -1060,7 +1052,7 @@ mod test { MapReduce::build( 8, |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) }, - |i, vec| { (i, vec.iter().sum()) } + |a, b| { a + b } ), SinkVec::build() ]; @@ -1103,7 +1095,7 @@ mod test { 2, 4, |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) }, - |i, vec| { (i, vec.iter().sum()) } + |a, b| { a + b } ), OrderedSinkVec::build() ]; diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs index 0ba4b22c..da0435b0 100644 --- a/src/thread_pool/mod.rs +++ b/src/thread_pool/mod.rs @@ -426,23 +426,33 @@ impl ThreadPool { /// }).collect(); /// assert_eq!(res.len(), 10); /// ``` - pub fn par_map_reduce( + pub fn par_map_reduce( &mut self, iter: Iter, f: F, reduce: Reduce, - ) -> impl Iterator + ) -> impl Iterator where F: FnOnce(Iter::Item) -> (K, V) + Send + Copy, ::Item: Send, K: Send + Ord + 'static, V: Send + 'static, - R: Send + 'static, - Reduce: FnOnce(K, Vec) -> (K, R) + Send + Copy, + Reduce: FnOnce(V, V) -> V + Send + Copy + Sync, Iter: IntoIterator, { let map = self.par_map(iter, f); - self.par_reduce_by_key(map, reduce) + + // Shuffle by grouping the elements by key. + let mut ordered_map = BTreeMap::new(); + for (k, v) in map { + ordered_map.entry(k).or_insert_with(Vec::new).push(v); + } + let mut res = Vec::new(); + for el in ordered_map { + res.push((el.0, self.par_reduce(el.1, reduce))); + } + + res.into_iter() } /// Reduces in parallel the elements of an iterator `iter` by the function `f`. @@ -509,12 +519,12 @@ impl ThreadPool { while data.len() != 1 { let mut tmp = Vec::new(); let mut num_proc = self.num_workers; - + while data.len() < 2 * num_proc { num_proc -= 1; } let mut counter = 0; - + while !data.is_empty() { counter %= num_proc; tmp.push((counter, data.pop().unwrap())); @@ -522,16 +532,15 @@ impl ThreadPool { } data = self - .par_reduce_by_key(tmp, |k, v| { - (k, v.into_iter().reduce(|a, b| f(a, b)).unwrap()) - }) - .collect::>() - .into_iter() - .map(|(_a, b)| b) - .collect(); + .par_reduce_by_key(tmp, |k, v| { + (k, v.into_iter().reduce(|a, b| f(a, b)).unwrap()) + }) + .collect::>() + .into_iter() + .map(|(_a, b)| b) + .collect(); } data.pop().unwrap() - } /// Create a new scope to execute jobs on other threads. diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs index e2ea95f5..96f8fdaa 100644 --- a/src/thread_pool/test.rs +++ b/src/thread_pool/test.rs @@ -128,11 +128,7 @@ fn test_par_map_reduce() { } } - let res = tp.par_map_reduce( - vec, - |el| -> (i32, i32) { (el, 1) }, - |k, v| (k, v.iter().sum::()), - ); + let res = tp.par_map_reduce(vec, |el| -> (i32, i32) { (el, 1) }, |a, b| a + b); let mut check = true; for (k, v) in res { diff --git a/tests/test_producer.rs b/tests/test_producer.rs index 348bc3eb..d43dd947 100644 --- a/tests/test_producer.rs +++ b/tests/test_producer.rs @@ -73,11 +73,7 @@ fn test_producer() { assert_eq!(res.len(), 5000); // Count the occurrences of each number. - let check = tp.par_map_reduce( - res, - |el| -> (usize, usize) { (el, 1) }, - |k, v| -> (usize, usize) { (k, v.iter().sum()) }, - ); + let check = tp.par_map_reduce(res, |el| -> (usize, usize) { (el, 1) }, |a, b| a + b); // Check that the number of occurrences is correct. for (_, v) in check {