Skip to content

Commit

Permalink
Update examples and README with the new Reduce function.
Browse files Browse the repository at this point in the history
  • Loading branch information
valebes committed Nov 24, 2023
1 parent ca36df2 commit 807cc3c
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 181 deletions.
36 changes: 13 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ The pipeline approach in PPL provides an intuitive and flexible way to express c

In the word counter example, the pipeline involves the following stages:
- **Source**: Reads the dataset and emits lines of text.
- **Map**: Converts each line of text into a list of words, where each word is paired with a count of 1.
- **Reduce**: Aggregates the counts of words by summing them for each unique word.
- **MapReduce**: The map function converts each line of text into a list of words, where each word is paired with a count of 1.
Moreover, the reduce function aggregates the counts of words by summing them for each unique word.
- **Sink**: Stores the final word counts in a hashmap.

By breaking down the computation into stages and leveraging parallelism, PPL's pipeline approach allows for efficient distribution of work across multiple threads or cores, leading to faster execution.
Expand All @@ -285,7 +285,7 @@ Here's the Rust code for the word counter using the pipeline approach:

```rust
use ppl::{
templates::map::{Map, Reduce},
templates::map::MapReduce,
prelude::*,
};

Expand Down Expand Up @@ -317,21 +317,17 @@ impl In<Vec<(String, usize)>, Vec<(String, usize)>> for Sink {
pub fn ppl(dataset: &str, threads: usize) {
// Initialization and configuration...

let mut p = pipeline![
let mut p = pipeline![
Source { reader },
Map::build::<Vec<String>, Vec<(String, usize)>>(threads / 2, |str| -> (String, usize) {
(str, 1)
}),
Reduce::build(threads / 2, |str, count| {
let mut sum = 0;
for c in count {
sum += c;
}
(str, sum)
}),
MapReduce::build_with_replicas(
threads / 2,
|str| -> (String, usize) { (str, 1) }, // Map function
|a, b| a + b,
2
),
Sink {
counter: HashMap::new(),
},
counter: HashMap::new()
}
];

p.start();
Expand Down Expand Up @@ -379,13 +375,7 @@ pub fn ppl_map(dataset: &str, threads: usize) {
})
.collect::<Vec<String>>(),
|str| -> (String, usize) { (str, 1) },
|str, count| {
let mut sum = 0;
for c in count {
sum += c;
}
(str, sum)
},
|a, b| a + b,
);
}
```
Expand Down
1 change: 0 additions & 1 deletion examples/wordcount/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ fn main() {
//"rayon" => rayon::rayon(dir_name, threads),
//"std-threads" => std_threads::std_threads(dir_name, threads),
"ppl" => {
timeit(|| ppl::ppl(dataset, threads));
timeit(|| ppl::ppl_combined_map_reduce(dataset, threads));
timeit(|| ppl::ppl_map(dataset, threads));
}
Expand Down
41 changes: 2 additions & 39 deletions examples/wordcount/ppl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use std::{
usize,
};

use ppl::{
prelude::*,
templates::map::{Map, MapReduce, Reduce},
};
use ppl::{prelude::*, templates::map::MapReduce};

struct Source {
reader: BufReader<File>,
Expand Down Expand Up @@ -59,37 +56,6 @@ impl In<Vec<(String, usize)>, Vec<(String, usize)>> for Sink {
}
}

pub fn ppl(dataset: &str, threads: usize) {
let file = File::open(dataset).expect("no such file");
let reader = BufReader::new(file);

let mut p = pipeline![
Source { reader },
Map::build::<Vec<String>, Vec<(String, usize)>>(threads / 2, |str| -> (String, usize) {
(str, 1)
}),
Reduce::build(threads / 2, |str, count| {
let mut sum = 0;
for c in count {
sum += c;
}
(str, sum)
}),
Sink {
counter: HashMap::new()
}
];

p.start();
let res = p.wait_end();

let mut total_words = 0;
for (_key, value) in res.unwrap() {
total_words += value;
}
println!("[PIPELINE] Total words: {}", total_words);
}

// Version that use a node that combine map and reduce
pub fn ppl_combined_map_reduce(dataset: &str, threads: usize) {
let file = File::open(dataset).expect("no such file");
Expand All @@ -100,10 +66,7 @@ pub fn ppl_combined_map_reduce(dataset: &str, threads: usize) {
MapReduce::build_with_replicas(
threads / 2,
|str| -> (String, usize) { (str, 1) }, // Map function
|a, b| {
// Reduce
a + b
},
|a, b| a + b,
2
),
Sink {
Expand Down
Loading

0 comments on commit 807cc3c

Please sign in to comment.