Join, CoGroup, GroupByKey
Join() returns a dataset of [key, leftValue, rightValue], where [key, leftValue] comes from one dataset and [key, rightValue] comes from the other dataset.
CoGroup() returns a dataset of [key, leftValues, rightValues]: the [key, leftValue] entries from one dataset are grouped together into [key, leftValues], the [key, rightValue] entries from the other dataset are grouped into [key, rightValues], and the two grouped entries are combined into [key, leftValues, rightValues].
GroupByKey() returns a dataset of [key, values], where the [key, value] entries from one dataset that share a key are grouped together.
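The three result shapes can be illustrated with plain in-memory Go, independent of the library. This is only a sketch of the semantics, not the library's implementation: the types `KV`, `CoGrouped`, and `Joined` and the lowercase functions are hypothetical names, and it assumes inner-join behavior for the join and that the co-group keeps keys that appear on only one side.

```go
package main

import (
	"fmt"
	"sort"
)

// KV is a hypothetical [key, value] pair used only for this illustration.
type KV struct {
	Key   string
	Value int
}

// CoGrouped holds [key, leftValues, rightValues] for one key.
type CoGrouped struct {
	Key   string
	Left  []int
	Right []int
}

// Joined holds [key, leftValue, rightValue] for one matching pair.
type Joined struct {
	Key         string
	Left, Right int
}

// groupByKey collects all values that share a key: [key, values].
func groupByKey(pairs []KV) map[string][]int {
	grouped := make(map[string][]int)
	for _, p := range pairs {
		grouped[p.Key] = append(grouped[p.Key], p.Value)
	}
	return grouped
}

// coGroup groups both inputs by key and combines the two groups per key.
// Assumption: a key present on only one side gets a nil slice for the other.
func coGroup(left, right []KV) []CoGrouped {
	l, r := groupByKey(left), groupByKey(right)
	keys := make([]string, 0, len(l)+len(r))
	for k := range l {
		keys = append(keys, k)
	}
	for k := range r {
		if _, seen := l[k]; !seen {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys) // deterministic output order
	out := make([]CoGrouped, 0, len(keys))
	for _, k := range keys {
		out = append(out, CoGrouped{Key: k, Left: l[k], Right: r[k]})
	}
	return out
}

// join emits one row per (leftValue, rightValue) combination for keys
// that appear on both sides (inner join, an assumption of this sketch).
func join(left, right []KV) []Joined {
	var out []Joined
	for _, g := range coGroup(left, right) {
		for _, lv := range g.Left {
			for _, rv := range g.Right {
				out = append(out, Joined{g.Key, lv, rv})
			}
		}
	}
	return out
}

func main() {
	left := []KV{{"a", 1}, {"a", 2}, {"b", 3}}
	right := []KV{{"a", 10}, {"c", 30}}
	fmt.Println(groupByKey(left))
	fmt.Println(coGroup(left, right))
	fmt.Println(join(left, right))
}
```

Here the join is built on top of the co-group only to make the relationship between the two shapes visible; a real implementation need not work that way.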
Join(), GroupByKey(), and CoGroup() all depend on Partition(). Both input datasets should already be partitioned by the same key and into the same number of shards. Otherwise, a relatively costly repartitioning is performed first.
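To see why both datasets need the same key and the same shard count, consider a minimal hash-partitioning sketch. The helpers `shardOf` and `partition` are hypothetical, and the FNV-1a hash is an assumption chosen for illustration; the library's actual partitioning function may differ. The point is only that the same key with the same shard count always lands at the same shard index, so matching keys from both datasets can be joined shard by shard.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardOf maps a key to a shard index.
// Hypothetical helper: FNV-1a is an assumption, not the library's hash.
func shardOf(key string, shardCount int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(shardCount))
}

// partition distributes keys into shardCount buckets by hash.
func partition(keys []string, shardCount int) [][]string {
	shards := make([][]string, shardCount)
	for _, k := range keys {
		i := shardOf(k, shardCount)
		shards[i] = append(shards[i], k)
	}
	return shards
}

func main() {
	// Both sides use the same hash and the same shard count (3),
	// so a key such as "root" ends up at the same index on both sides.
	left := partition([]string{"root", "daemon", "bin"}, 3)
	right := partition([]string{"bin", "root", "func"}, 3)
	for i := range left {
		fmt.Println("shard", i, "left:", left[i], "right:", right[i])
	}
}
```

If the two sides used different shard counts, the same key could land at different indexes, and shard i of one dataset could no longer be matched only against shard i of the other.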
Here is the source code for Join():
func (d *Dataset) Join(other *Dataset) *Dataset {
	sorted_d := d.Partition(len(d.Shards)).LocalSort(nil)
	var sorted_other *Dataset
	if d == other {
		sorted_other = sorted_d
	} else {
		sorted_other = other.Partition(len(d.Shards)).LocalSort(nil)
	}
	return sorted_d.JoinPartitionedSorted(sorted_other, nil, false, false)
}
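The source above sorts each shard locally before calling JoinPartitionedSorted(). The body of that method is not shown here, but joining two inputs that are already sorted by key is commonly done with a merge join, which walks both inputs once. The following is a generic sketch of that technique under that assumption, not the library's code; `Pair`, `Row`, and `mergeJoin` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// Pair is a hypothetical [key, value] entry.
type Pair struct {
	Key   string
	Value int
}

// Row is a hypothetical [key, leftValue, rightValue] result.
type Row struct {
	Key         string
	Left, Right int
}

// mergeJoin performs an inner join of two slices already sorted by Key.
func mergeJoin(left, right []Pair) []Row {
	var out []Row
	i, j := 0, 0
	for i < len(left) && j < len(right) {
		switch {
		case left[i].Key < right[j].Key:
			i++ // left key has no match yet, advance left
		case left[i].Key > right[j].Key:
			j++ // right key has no match yet, advance right
		default:
			key := left[i].Key
			// Find the end of the run of equal keys on each side.
			iEnd, jEnd := i, j
			for iEnd < len(left) && left[iEnd].Key == key {
				iEnd++
			}
			for jEnd < len(right) && right[jEnd].Key == key {
				jEnd++
			}
			// Emit every left/right combination for this key.
			for _, l := range left[i:iEnd] {
				for _, r := range right[j:jEnd] {
					out = append(out, Row{key, l.Value, r.Value})
				}
			}
			i, j = iEnd, jEnd
		}
	}
	return out
}

func main() {
	left := []Pair{{"b", 2}, {"a", 1}, {"c", 3}}
	right := []Pair{{"c", 30}, {"b", 20}, {"d", 40}}
	// Sort each side by key first, playing the role of a local sort.
	byKey := func(p []Pair) {
		sort.Slice(p, func(x, y int) bool { return p[x].Key < p[y].Key })
	}
	byKey(left)
	byKey(right)
	fmt.Println(mergeJoin(left, right))
}
```

Because both inputs are sorted, each side is scanned once, and only the runs of equal keys are combined pairwise.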
Here is an example that joins the word counts of two files:
reg, err := regexp.Compile("[^A-Za-z0-9]+")
if err != nil {
	panic(err)
}
// tokenizer replaces every run of non-alphanumeric characters with "-",
// then emits each lowercased token on the channel.
tokenizer := func(line string, ch chan string) {
	line = reg.ReplaceAllString(line, "-")
	for _, token := range strings.Split(line, "-") {
		ch <- strings.ToLower(token)
	}
}
f1 := flow.New()
// Count the words in the first file.
leftWords := f1.TextFile(
	"/etc/passwd", 3,
).Map(tokenizer).Map(func(t string) (string, int) {
	return t, 1
}).Sort(nil).LocalReduceByKey(func(x, y int) int {
	return x + y
})
// Count the words in the second file.
rightWords := f1.TextFile(
	"word_count.go", 3,
).Map(tokenizer).Map(func(t string) (string, int) {
	return t, 1
}).Sort(nil).LocalReduceByKey(func(x, y int) int {
	return x + y
})
// Join the two counts by word and print the words that occur in both files.
leftWords.Join(rightWords).Map(func(key string, left, right int) {
	println(key, ":", left, ":", right)
}).Run()
The current implementation uses the default sorting, which can compare keys of type int, string, float, and time.Time. Let me know if you need a special comparator.