Refactoring MapReduce
valebes committed Nov 20, 2023
1 parent 75c24ae commit f1e5517
Showing 5 changed files with 53 additions and 70 deletions.
16 changes: 3 additions & 13 deletions examples/wordcount/ppl.rs
@@ -100,13 +100,9 @@ pub fn ppl_combined_map_reduce(dataset: &str, threads: usize) {
MapReduce::build_with_replicas(
threads / 2,
|str| -> (String, usize) { (str, 1) }, // Map function
- |str, count| {
+ |a, b| {
// Reduce
- let mut sum = 0;
- for c in count {
- sum += c;
- }
- (str, sum)
+ a + b
},
2
),
@@ -153,13 +149,7 @@ pub fn ppl_map(dataset: &str, threads: usize) {
})
.collect::<Vec<String>>(),
|str| -> (String, usize) { (str, 1) },
- |str, count| {
- let mut sum = 0;
- for c in count {
- sum += c;
- }
- (str, sum)
- },
+ |a, b| a + b,
);

let mut total_words = 0;
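For reference, a minimal plain-Rust sketch of what the refactored word-count closures compute: each word is mapped to (word, 1), the pairs are grouped by key, and each group is folded with the new binary reducer |a, b| a + b. This is a single-threaded model of the semantics only; word_count is a hypothetical helper and does not go through the crate's MapReduce node.

use std::collections::BTreeMap;

// Hypothetical helper mirroring the map/reduce pair from the example above:
// map each word to (word, 1), group by key, fold each group with the
// binary reducer. The library node performs the same steps via its thread pool.
fn word_count(words: Vec<String>) -> Vec<(String, usize)> {
    let map = |w: String| -> (String, usize) { (w, 1) };
    let reduce = |a: usize, b: usize| a + b;

    let mut groups: BTreeMap<String, Vec<usize>> = BTreeMap::new();
    for (k, v) in words.into_iter().map(map) {
        groups.entry(k).or_insert_with(Vec::new).push(v);
    }
    groups
        .into_iter()
        .map(|(k, vs)| (k, vs.into_iter().reduce(reduce).unwrap()))
        .collect()
}

fn main() {
    let words = ["a", "b", "a"].iter().map(|s| s.to_string()).collect();
    assert_eq!(
        word_count(words),
        vec![("a".to_string(), 2), ("b".to_string(), 1)]
    );
}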
56 changes: 24 additions & 32 deletions src/templates/map.rs
@@ -505,30 +505,27 @@ where
/// Nodes of this type are composed of a Map and a Reduce.
/// The Map is applied to each element of the input, and the Reduce is applied to the output of the Map.
#[derive(Clone)]
- pub struct MapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+ pub struct MapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
TIn: Send,
TMapOut: Send,
TKey: Send,
- TReduce: Send,
FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
- FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+ FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut,
{
threadpool: ThreadPool,
replicas: usize,
f_map: FMap,
f_reduce: FReduce,
- phantom: PhantomData<(TIn, TMapOut, TKey, TReduce)>,
+ phantom: PhantomData<(TIn, TMapOut, TKey)>,
}
- impl<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
- MapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+ impl<TIn, TMapOut, TKey, FMap, FReduce> MapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
TIn: Send + Clone + 'static,
TMapOut: Send + Clone + 'static,
TKey: Send + Clone + 'static + Ord,
- TReduce: Send + Clone + 'static,
FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
- FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+ FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
{
/// Create a new MapReduce node.
/// # Arguments
@@ -579,7 +576,7 @@ where
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
- TOutIter: FromIterator<(TKey, TReduce)>,
+ TOutIter: FromIterator<(TKey, TMapOut)>,
{
Self {
threadpool: ThreadPool::with_capacity(n_worker),
@@ -610,7 +607,7 @@ where
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
- TOutIter: FromIterator<(TKey, TReduce)>,
+ TOutIter: FromIterator<(TKey, TMapOut)>,
{
assert!(n_replicas > 0);
Self {
@@ -622,17 +619,16 @@
}
}
}
- impl<TIn, TMapOut, TInIter, TKey, TReduce, TOutIter, FMap, FReduce> InOut<TInIter, TOutIter>
- for MapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+ impl<TIn, TMapOut, TInIter, TKey, TOutIter, FMap, FReduce> InOut<TInIter, TOutIter>
+ for MapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
TIn: Send + Clone + 'static,
TMapOut: Send + Clone + 'static,
TInIter: IntoIterator<Item = TIn>,
TKey: Send + Clone + 'static + Ord,
- TReduce: Send + Clone + 'static,
- TOutIter: FromIterator<(TKey, TReduce)>,
+ TOutIter: FromIterator<(TKey, TMapOut)>,
FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
- FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+ FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
{
fn run(&mut self, input: TInIter) -> Option<TOutIter> {
let res: TOutIter = self
@@ -654,30 +650,27 @@ where
/// This node is slower than MapReduce but preserves the order of the input.
/// This node is useful when the order of the input is important.
#[derive(Clone)]
- pub struct OrderedMapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+ pub struct OrderedMapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
TIn: Send,
TMapOut: Send,
TKey: Send,
- TReduce: Send,
FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
- FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+ FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy,
{
threadpool: ThreadPool,
replicas: usize,
f_map: FMap,
f_reduce: FReduce,
- phantom: PhantomData<(TIn, TMapOut, TKey, TReduce)>,
+ phantom: PhantomData<(TIn, TMapOut, TKey)>,
}
- impl<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
- OrderedMapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+ impl<TIn, TMapOut, TKey, FMap, FReduce> OrderedMapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
TIn: Send + Clone + 'static,
TMapOut: Send + Clone + 'static,
TKey: Send + Clone + 'static + Ord,
- TReduce: Send + Clone + 'static,
FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
- FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+ FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
{
/// Create a new OrderedMapReduce node.
/// # Arguments
@@ -691,7 +684,7 @@ where
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
- TOutIter: FromIterator<(TKey, TReduce)>,
+ TOutIter: FromIterator<(TKey, TMapOut)>,
{
Self {
threadpool: ThreadPool::with_capacity(n_worker),
@@ -768,7 +761,7 @@
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
- TOutIter: FromIterator<(TKey, TReduce)>,
+ TOutIter: FromIterator<(TKey, TMapOut)>,
{
assert!(n_replicas > 0);
Self {
@@ -780,17 +773,16 @@
}
}
}
- impl<TIn, TMapOut, TInIter, TKey, TReduce, TOutIter, FMap, FReduce> InOut<TInIter, TOutIter>
- for OrderedMapReduce<TIn, TMapOut, TKey, TReduce, FMap, FReduce>
+ impl<TIn, TMapOut, TInIter, TKey, TOutIter, FMap, FReduce> InOut<TInIter, TOutIter>
+ for OrderedMapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
TIn: Send + Clone + 'static,
TMapOut: Send + Clone + 'static,
TInIter: IntoIterator<Item = TIn>,
TKey: Send + Clone + 'static + Ord,
- TReduce: Send + Clone + 'static,
- TOutIter: FromIterator<(TKey, TReduce)>,
+ TOutIter: FromIterator<(TKey, TMapOut)>,
FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
- FReduce: FnOnce(TKey, Vec<TMapOut>) -> (TKey, TReduce) + Send + Copy,
+ FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
{
fn run(&mut self, input: TInIter) -> Option<TOutIter> {
let res: TOutIter = self
@@ -1060,7 +1052,7 @@ mod test {
MapReduce::build(
8,
|el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) },
- |i, vec| { (i, vec.iter().sum()) }
+ |a, b| { a + b }
),
SinkVec::build()
];
@@ -1103,7 +1095,7 @@
2,
4,
|el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) },
- |i, vec| { (i, vec.iter().sum()) }
+ |a, b| { a + b }
),
OrderedSinkVec::build()
];
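In short, the reduce closure no longer receives a key plus the whole Vec of values for that key and returns a (TKey, TReduce) pair; it is now a binary combiner over TMapOut, applied pairwise to each key's values, and the node's output items become (TKey, TMapOut). Below is a small illustrative sketch of how an old-style reducer translates (these closures are examples, not library items); because the pairwise fold can be executed in parallel, the combiner should be associative.

fn main() {
    // Old shape: the whole group of values for a key arrived as a Vec.
    let old_reduce = |key: String, counts: Vec<usize>| -> (String, usize) {
        (key, counts.into_iter().sum())
    };

    // New shape: a binary combiner folded pairwise over the group.
    let new_reduce = |a: usize, b: usize| a + b;

    // Both express the same per-key aggregation.
    assert_eq!(old_reduce("word".to_string(), vec![1, 1, 1]).1, 3);
    assert_eq!(vec![1, 1, 1].into_iter().reduce(new_reduce), Some(3));
}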
39 changes: 24 additions & 15 deletions src/thread_pool/mod.rs
@@ -426,23 +426,33 @@ impl ThreadPool {
/// }).collect();
/// assert_eq!(res.len(), 10);
/// ```
- pub fn par_map_reduce<Iter, F, K, V, R, Reduce>(
+ pub fn par_map_reduce<Iter, F, K, V, Reduce>(
&mut self,
iter: Iter,
f: F,
reduce: Reduce,
- ) -> impl Iterator<Item = (K, R)>
+ ) -> impl Iterator<Item = (K, V)>
where
F: FnOnce(Iter::Item) -> (K, V) + Send + Copy,
<Iter as IntoIterator>::Item: Send,
K: Send + Ord + 'static,
V: Send + 'static,
- R: Send + 'static,
- Reduce: FnOnce(K, Vec<V>) -> (K, R) + Send + Copy,
+ Reduce: FnOnce(V, V) -> V + Send + Copy + Sync,
Iter: IntoIterator,
{
let map = self.par_map(iter, f);
- self.par_reduce_by_key(map, reduce)
+
+ // Shuffle by grouping the elements by key.
+ let mut ordered_map = BTreeMap::new();
+ for (k, v) in map {
+ ordered_map.entry(k).or_insert_with(Vec::new).push(v);
+ }
+ let mut res = Vec::new();
+ for el in ordered_map {
+ res.push((el.0, self.par_reduce(el.1, reduce)));
+ }
+
+ res.into_iter()
}

/// Reduces in parallel the elements of an iterator `iter` by the function `f`.
@@ -509,29 +519,28 @@ impl ThreadPool {
while data.len() != 1 {
let mut tmp = Vec::new();
let mut num_proc = self.num_workers;

while data.len() < 2 * num_proc {
num_proc -= 1;
}
let mut counter = 0;

while !data.is_empty() {
counter %= num_proc;
tmp.push((counter, data.pop().unwrap()));
counter += 1;
}

data = self
- .par_reduce_by_key(tmp, |k, v| {
- (k, v.into_iter().reduce(|a, b| f(a, b)).unwrap())
- })
- .collect::<Vec<(usize, V)>>()
- .into_iter()
- .map(|(_a, b)| b)
- .collect();
+ .par_reduce_by_key(tmp, |k, v| {
+ (k, v.into_iter().reduce(|a, b| f(a, b)).unwrap())
+ })
+ .collect::<Vec<(usize, V)>>()
+ .into_iter()

+ .map(|(_a, b)| b)
+ .collect();

}
data.pop().unwrap()

}

/// Create a new scope to execute jobs on other threads.
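The refactored par_map_reduce above now performs the shuffle itself: it groups the mapped pairs by key in a BTreeMap and then runs par_reduce on each group. par_reduce, in turn, bins the remaining values round-robin into at most num_workers buckets, reduces each bucket, and repeats until a single value is left. Below is a single-threaded sketch of that control flow; reduce_in_rounds is a hypothetical stand-in, and the per-round dispatch to the pool (par_reduce_by_key in the real code) is replaced by a plain fold per bucket.

// Sequential model of the round-based reduction shown above. Each round
// distributes the values round-robin over at most `num_workers` buckets,
// folds every bucket with `f`, and keeps only the folded results.
fn reduce_in_rounds<V>(mut data: Vec<V>, num_workers: usize, f: impl Fn(V, V) -> V + Copy) -> V {
    assert!(!data.is_empty());
    while data.len() != 1 {
        // Shrink the bucket count so each bucket receives at least two values.
        let mut num_proc = num_workers.max(1);
        while num_proc > 1 && data.len() < 2 * num_proc {
            num_proc -= 1;
        }
        // Round-robin the values into buckets (the `counter % num_proc` step).
        let mut buckets: Vec<Vec<V>> = (0..num_proc).map(|_| Vec::new()).collect();
        for (i, v) in data.drain(..).enumerate() {
            buckets[i % num_proc].push(v);
        }
        // Fold each bucket; the real implementation does this on the pool.
        data = buckets
            .into_iter()
            .filter(|b| !b.is_empty())
            .map(|b| b.into_iter().reduce(f).unwrap())
            .collect();
    }
    data.pop().unwrap()
}

fn main() {
    let sum = reduce_in_rounds((1..=10).collect(), 4, |a, b| a + b);
    assert_eq!(sum, 55);
}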
6 changes: 1 addition & 5 deletions src/thread_pool/test.rs
@@ -128,11 +128,7 @@ fn test_par_map_reduce() {
}
}

- let res = tp.par_map_reduce(
- vec,
- |el| -> (i32, i32) { (el, 1) },
- |k, v| (k, v.iter().sum::<i32>()),
- );
+ let res = tp.par_map_reduce(vec, |el| -> (i32, i32) { (el, 1) }, |a, b| a + b);

let mut check = true;
for (k, v) in res {
6 changes: 1 addition & 5 deletions tests/test_producer.rs
@@ -73,11 +73,7 @@ fn test_producer() {
assert_eq!(res.len(), 5000);

// Count the occurrences of each number.
- let check = tp.par_map_reduce(
- res,
- |el| -> (usize, usize) { (el, 1) },
- |k, v| -> (usize, usize) { (k, v.iter().sum()) },
- );
+ let check = tp.par_map_reduce(res, |el| -> (usize, usize) { (el, 1) }, |a, b| a + b);

// Check that the number of occurrences is correct.
for (_, v) in check {
