From 72fb5ea2c6aa4ce081a64751db4a9cfd2c31b0d6 Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Fri, 9 Jul 2021 18:39:15 +0200 Subject: [PATCH] fix: adds `reduceK` function to avoid resolution ambiguity for `reduce` --- README.md | 169 ++++++++---------- .../jetbrains/kotlinx/spark/examples/Main.kt | 4 +- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 7 + .../jetbrains/kotlinx/spark/api/ApiTest.kt | 25 ++- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 7 + .../jetbrains/kotlinx/spark/api/ApiTest.kt | 19 +- 6 files changed, 123 insertions(+), 108 deletions(-) diff --git a/README.md b/README.md index a6b3eea2..5978de70 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,12 @@ # Kotlin for Apache® Spark™ [![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:org.jetbrains.kotlinx.spark%20AND%20v:1.0.1) [![official JetBrains project](http://jb.gg/badges/incubator.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) -Your next API to work with [Apache Spark](https://spark.apache.org/). -This project adds a missing layer of compatibility between [Kotlin](https://kotlinlang.org/) -and [Apache Spark](https://spark.apache.org/). It allows Kotlin developers to use familiar language features such as -data classes, and lambda expressions as simple expressions in curly braces or method references. +Your next API to work with [Apache Spark](https://spark.apache.org/). -We have opened a Spark Project Improvement -Proposal: [Kotlin support for Apache Spark](http://issues.apache.org/jira/browse/SPARK-32530#) to work with the -community towards getting Kotlin support as a first-class citizen in Apache Spark. We encourage you to voice your -opinions and participate in the discussion. +This project adds a missing layer of compatibility between [Kotlin](https://kotlinlang.org/) and [Apache Spark](https://spark.apache.org/). +It allows Kotlin developers to use familiar language features such as data classes, and lambda expressions as simple expressions in curly braces or method references. + +We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache Spark](http://issues.apache.org/jira/browse/SPARK-32530#) to work with the community towards getting Kotlin support as a first-class citizen in Apache Spark. We encourage you to voice your opinions and participate in the discussion. ## Table of Contents @@ -24,7 +21,7 @@ opinions and participate in the discussion. - [withCached function](#withcached-function) - [toList and toArray](#tolist-and-toarray-methods) - [Column infix/operator functions](#column-infixoperator-functions) - - [`reduceGroups`](#reducegroups) + - [Overload Resolution Ambiguity](#overload-resolution-ambiguity) - [Examples](#examples) - [Reporting issues/Support](#reporting-issuessupport) - [Code of Conduct](#code-of-conduct) @@ -40,40 +37,35 @@ opinions and participate in the discussion. ## Releases -The list of Kotlin for Apache Spark releases is -available [here](https://github.com/JetBrains/kotlin-spark-api/releases/). The Kotlin for Spark artifacts adhere to the -following convention: -`[Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]` +The list of Kotlin for Apache Spark releases is available [here](https://github.com/JetBrains/kotlin-spark-api/releases/). 
+The Kotlin for Spark artifacts adhere to the following convention:
+`[Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]`

[![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22org.jetbrains.kotlinx.spark%22%20AND%20a:%22kotlin-spark-api-3.0.0_2.12%22)

## How to configure Kotlin for Apache Spark in your project

-You can add Kotlin for Apache Spark as a dependency to your project: `Maven`, `Gradle`, `SBT`, and `leinengen` are
-supported.
-
+You can add Kotlin for Apache Spark as a dependency to your project: `Maven`, `Gradle`, `SBT`, and `Leiningen` are supported.
+
Here's an example `pom.xml`:

```xml
 <dependency>
-  <groupId>org.jetbrains.kotlinx.spark</groupId>
-  <artifactId>kotlin-spark-api-3.0.0</artifactId>
-  <version>${kotlin-spark-api.version}</version>
+    <groupId>org.jetbrains.kotlinx.spark</groupId>
+    <artifactId>kotlin-spark-api-3.0.0</artifactId>
+    <version>${kotlin-spark-api.version}</version>
 </dependency>
 <dependency>
-<groupId>org.apache.spark</groupId>
-<artifactId>spark-sql_2.12</artifactId>
-<version>${spark.version}</version>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-sql_2.12</artifactId>
+    <version>${spark.version}</version>
 </dependency>
```

Note that `core` is being compiled against Scala version `2.12`.

-You can find a complete example with `pom.xml` and `build.gradle` in
-the [Quick Start Guide](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
-
-Once you have configured the dependency, you only need to add the following import to your Kotlin file:
+You can find a complete example with `pom.xml` and `build.gradle` in the [Quick Start Guide](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
+Once you have configured the dependency, you only need to add the following import to your Kotlin file:
```kotlin
import org.jetbrains.kotlinx.spark.api.*
```
@@ -81,104 +73,97 @@ import org.jetbrains.kotlinx.spark.api.*

## Kotlin for Apache Spark features

### Creating a SparkSession in Kotlin
-
```kotlin
val spark = SparkSession
-    .builder()
-    .master("local[2]")
-    .appName("Simple Application").orCreate
+        .builder()
+        .master("local[2]")
+        .appName("Simple Application").orCreate
```

### Creating a Dataset in Kotlin
-
```kotlin
spark.toDS("a" to 1, "b" to 2)
```
-
The example above produces `Dataset<Pair<String, Int>>`.
-
+
### Null safety
-
-There are several aliases in API, like `leftJoin`, `rightJoin` etc. These are null-safe by design. For
-example, `leftJoin` is aware of nullability and returns `Dataset<Pair<LEFT, RIGHT?>>`. Note that we are forcing `RIGHT`
-to be nullable for you as a developer to be able to handle this situation.
+There are several aliases in the API, like `leftJoin`, `rightJoin` etc. These are null-safe by design.
+For example, `leftJoin` is aware of nullability and returns `Dataset<Pair<LEFT, RIGHT?>>` (a short sketch after the `withSpark` example below illustrates this).
+Note that we are forcing `RIGHT` to be nullable for you as a developer to be able to handle this situation.
`NullPointerException`s are hard to debug in Spark, and we're doing our best to make them as rare as possible.

### withSpark function

-We provide you with useful function `withSpark`, which accepts everything that may be needed to run Spark — properties,
-name, master location and so on. It also accepts a block of code to execute inside Spark context.
+We provide you with a useful function, `withSpark`, which accepts everything that may be needed to run Spark — properties, name, master location and so on. It also accepts a block of code to execute inside the Spark context.

After the work block ends, `spark.stop()` is called automatically.

```kotlin
withSpark {
    dsOf(1, 2)
-        .map { it to it }
-        .show()
+            .map { it to it }
+            .show()
}
```

`dsOf` is just one more way to create a `Dataset` (`Dataset<Int>`) from varargs.
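+
+To illustrate the null-safe joins mentioned above, here is a minimal sketch. The exact `leftJoin`
+overload (taking the right-hand `Dataset` and a join `Column`) is assumed here, so treat it as
+illustrative rather than authoritative:
+
+```kotlin
+data class Customer(val id: Int, val name: String)
+data class Order(val customerId: Int, val total: Int)
+
+withSpark {
+    val customers = dsOf(Customer(1, "a"), Customer(2, "b"))
+    val orders = dsOf(Order(1, 10))
+
+    // RIGHT is nullable: customers without a matching order yield a null second element
+    customers.leftJoin(orders, customers.col("id") eq orders.col("customerId"))
+        .map { (customer, order) -> customer.name to (order?.total ?: 0) }
+        .show()
+}
+```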
-### `withCached` function
-
-It can easily happen that we need to fork our computation to several paths. To compute things only once we should
-call `cache`
-method. However, it becomes difficult to control when we're using cached `Dataset` and when not. It is also easy to
-forget to unpersist cached data, which can break things unexpectedly or take up more memory than intended.
+### withCached function
+It can easily happen that we need to fork our computation into several paths. To compute things only once we should call the `cache`
+method. However, it becomes difficult to control when we're using the cached `Dataset` and when not.
+It is also easy to forget to unpersist cached data, which can break things unexpectedly or take up more memory
+than intended.

To solve these problems we've added the `withCached` function:

```kotlin
withSpark {
    dsOf(1, 2, 3, 4, 5)
-        .map { it to (it + 2) }
-        .withCached {
-            showDS()
-
-            filter { it.first % 2 == 0 }.showDS()
-        }
-        .map { c(it.first, it.second, (it.first + it.second) * 2) }
-        .show()
+            .map { it to (it + 2) }
+            .withCached {
+                showDS()
+
+                filter { it.first % 2 == 0 }.showDS()
+            }
+            .map { c(it.first, it.second, (it.first + it.second) * 2) }
+            .show()
}
```

-Here we're showing cached `Dataset` for debugging purposes then filtering it. The `filter` method returns
-filtered `Dataset` and then the cached `Dataset` is being unpersisted, so we have more memory t o call the `map` method
-and collect the resulting `Dataset`.
+Here we're showing the cached `Dataset` for debugging purposes and then filtering it.
+The `filter` method returns a filtered `Dataset`, after which the cached `Dataset` is unpersisted, so we have more memory
+to call the `map` method and collect the resulting `Dataset`.

-### `toList` and `toArray` methods
+### toList and toArray methods

-For more idiomatic Kotlin code we've added `toList` and `toArray` methods in this API. You can still use the `collect`
-method as in Scala API, however the result should be casted to `Array`. This is because `collect` returns a Scala array,
-which is not the same as Java/Kotlin one.
+For more idiomatic Kotlin code we've added `toList` and `toArray` methods to this API. You can still use the `collect` method as in the Scala API, however the result should be cast to an `Array`.
+This is because `collect` returns a Scala array, which is not the same as the Java/Kotlin one.

### Column infix/operator functions

-Similar to the Scala API for `Columns`, many of the operator functions could be ported over. For example:
-
+Similar to the Scala API for `Columns`, many of the operator functions could be ported over.
+For example:
```kotlin
-dataset.select(col("colA") + 5)
-dataset.select(col("colA") / col("colB"))
+dataset.select( col("colA") + 5 )
+dataset.select( col("colA") / col("colB") )

-dataset.where(col("colA") `===` 6)
+dataset.where( col("colA") `===` 6 )
// or alternatively
-dataset.where(col("colA") eq 6)
+dataset.where( col("colA") eq 6 )
```

In short, all supported operators are:

- `==`,
- `!=`,
- `eq` / `` `===` ``,
- `neq` / `` `=!=` ``,
- `-col(...)`,
- `!col(...)`,
- `gt`,
- `lt`,
- `geq`,
- `leq`,
- `or`,
- `and` / `` `&&` ``,
@@ -190,53 +175,43 @@ In short, all supported operators are:

Secondly, there are some quality of life additions as well:

-In Kotlin, Ranges are often used to solve inclusive/exclusive situations for a range. So, you can now do:
-
+In Kotlin, Ranges are often used to solve inclusive/exclusive situations for a range.
+So, you can now do:
```kotlin
-dataset.where(col("colA") inRangeOf 0..2)
+dataset.where( col("colA") inRangeOf 0..2 )
```

Also, for columns containing map- or array-like types:

```kotlin
-dataset.where(col("colB")[0] geq 5)
+dataset.where( col("colB")[0] geq 5 )
```

-Finally, thanks to Kotlin reflection, we can provide a type- and refactor safe way to create `TypedColumn`s and with
-those a new Dataset from pieces of another using the `selectTyped()` function, added to the API:
-
+Finally, thanks to Kotlin reflection, we can provide a type- and refactor-safe way
+to create `TypedColumn`s and, with those, a new Dataset from pieces of another using the `selectTyped()` function, added to the API:
```kotlin
val dataset: Dataset<YourClass> = ...
val newDataset: Dataset<Pair<TypeOfA, TypeOfB>> = dataset.selectTyped(col(YourClass::colA), col(YourClass::colB))
```

-### `reduceGroups`
+### Overload Resolution Ambiguity
+
+We had to implement the functions `reduceGroups` and `reduce` for Kotlin separately as `reduceGroupsK` and `reduceK` respectively, because otherwise they caused resolution ambiguity between the Kotlin, Scala and Java APIs, which was quite hard to solve.

-We had to implemet `reduceGroups` operator for Kotlin separately as `reduceGroupsK` function, because otherwise it
-caused resolution ambiguity between Kotlin, Scala and Java APIs, which was quite hard to solve.
+We have a special example of working with this function in the [Groups example](https://github.com/JetBrains/kotlin-spark-api/edit/main/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Group.kt).

-We have a special example of work with this function in
-the [Groups example](https://github.com/JetBrains/kotlin-spark-api/edit/main/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Group.kt)
-.

## Examples

-For more, check
-out [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples)
-module. To get up and running quickly, check out
-this [tutorial](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
+For more, check out the [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples) module.
+To get up and running quickly, check out this [tutorial](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).

## Reporting issues/Support
-
-Please use [GitHub issues](https://github.com/JetBrains/kotlin-spark-api/issues) for filing feature requests and bug
-reports. You are also welcome to join [kotlin-spark channel](https://kotlinlang.slack.com/archives/C015B9ZRGJF) in the
-Kotlin Slack.
+Please use [GitHub issues](https://github.com/JetBrains/kotlin-spark-api/issues) for filing feature requests and bug reports.
+You are also welcome to join the [kotlin-spark channel](https://kotlinlang.slack.com/archives/C015B9ZRGJF) in the Kotlin Slack.

## Code of Conduct
-
-This project and the corresponding community is governed by
-the [JetBrains Open Source and Community Code of Conduct](https://confluence.jetbrains.com/display/ALL/JetBrains+Open+Source+and+Community+Code+of+Conduct)
-. Please make sure you read it.
+This project and the corresponding community are governed by the [JetBrains Open Source and Community Code of Conduct](https://confluence.jetbrains.com/display/ALL/JetBrains+Open+Source+and+Community+Code+of+Conduct). Please make sure you read it.
## License diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt index deada9e9..a253c12d 100644 --- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt +++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt @@ -48,8 +48,8 @@ object Main { // .also { it.printSchema() } .map { (triple, pair) -> Five(triple.first, triple.second, triple.third, pair?.first, pair?.second) } .groupByKey { it.a } - .reduceGroups(ReduceFunction { v1, v2 -> v1.copy(a = v1.a + v2.a, b = v1.a + v2.a) }) - .map { it._2 } + .reduceGroupsK { v1, v2 -> v1.copy(a = v1.a + v2.a, b = v1.a + v2.a) } + .map { it.second } .repartition(1) .withCached { write() diff --git a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 4bc4811b..34152494 100644 --- a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -217,6 +217,13 @@ inline fun KeyValueGroupedDataset.reduc reduceGroups(ReduceFunction(func)) .map { t -> t._1 to t._2 } +/** + * (Kotlin-specific) + * Reduces the elements of this Dataset using the specified binary function. The given `func` + * must be commutative and associative or the result may be non-deterministic. + */ +inline fun Dataset.reduceK(noinline func: (T, T) -> T): T = + reduce(ReduceFunction(func)) @JvmName("takeKeysTuple2") inline fun Dataset>.takeKeys(): Dataset = map { it._1() } diff --git a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index 8da89fb9..bae27b2e 100644 --- a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -21,20 +21,21 @@ import ch.tutteli.atrium.api.fluent.en_GB.* import ch.tutteli.atrium.api.verbs.expect import io.kotest.core.spec.style.ShouldSpec import io.kotest.matchers.shouldBe -import org.apache.spark.sql.streaming.GroupState -import org.apache.spark.sql.streaming.GroupStateTimeout -import scala.collection.Seq import org.apache.spark.sql.Dataset import org.apache.spark.sql.TypedColumn import org.apache.spark.sql.functions.* +import org.apache.spark.sql.streaming.GroupState +import org.apache.spark.sql.streaming.GroupStateTimeout import scala.Product import scala.Tuple1 import scala.Tuple2 import scala.Tuple3 +import scala.collection.Seq import java.io.Serializable import java.sql.Date import java.sql.Timestamp import java.time.LocalDate +import kotlin.collections.Iterator import scala.collection.Iterator as ScalaIterator import scala.collection.Map as ScalaMap import scala.collection.mutable.Map as ScalaMutableMap @@ -457,7 +458,7 @@ class ApiTest : ShouldSpec({ SomeClass(intArrayOf(4, 3, 2), 1), ) .groupByKey { it.b } - .reduceGroupsK(func = { a, b -> SomeClass(a.a + b.a, a.b) }) + .reduceGroupsK { a, b -> SomeClass(a.a + b.a, a.b) } .takeValues() dataset.count() shouldBe 1 @@ -473,6 +474,18 @@ class ApiTest : ShouldSpec({ dataset.sort(SomeClass::a, SomeClass::b) dataset.takeAsList(1).first().b shouldBe 2 } + should("Have Kotlin ready functions in place of overload ambiguity") { + val dataset: Pair = dsOf( + SomeClass(intArrayOf(1, 2, 3), 1), + SomeClass(intArrayOf(4, 3, 2), 1), + ) + 
.groupByKey { it: SomeClass -> it.b } + .reduceGroupsK { v1: SomeClass, v2: SomeClass -> v1 } + .filter { it: Pair -> true } // not sure why this does work, but reduce doesn't + .reduceK { v1: Pair, v2: Pair -> v1 } + + dataset.second.a shouldBe intArrayOf(1, 2, 3) + } should("Generate encoder correctly with complex enum data class") { val dataset: Dataset = dsOf( @@ -495,7 +508,7 @@ class ApiTest : ShouldSpec({ first.int shouldBe 1 first.string shouldBe "string" - first.strings shouldBe listOf("1","2") + first.strings shouldBe listOf("1", "2") first.someEnum shouldBe SomeEnum.A first.someOtherEnum shouldBe SomeOtherEnum.C first.someEnums shouldBe listOf(SomeEnum.A, SomeEnum.B) @@ -551,5 +564,5 @@ data class ComplexEnumDataClass( val someOtherEnums: List, val someEnumArray: Array, val someOtherArray: Array, - val enumMap: Map + val enumMap: Map, ) diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 70abb7ed..2dde48cb 100644 --- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -212,6 +212,13 @@ inline fun KeyValueGroupedDataset.reduc reduceGroups(ReduceFunction(func)) .map { t -> t._1 to t._2 } +/** + * (Kotlin-specific) + * Reduces the elements of this Dataset using the specified binary function. The given `func` + * must be commutative and associative or the result may be non-deterministic. + */ +inline fun Dataset.reduceK(noinline func: (T, T) -> T): T = + reduce(ReduceFunction(func)) @JvmName("takeKeysTuple2") inline fun Dataset>.takeKeys(): Dataset = map { it._1() } diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index 36cb35fc..bff38ac1 100644 --- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -499,7 +499,7 @@ class ApiTest : ShouldSpec({ SomeClass(intArrayOf(4, 3, 2), 1), ) .groupByKey { it.b } - .reduceGroupsK(func = { a, b -> SomeClass(a.a + b.a, a.b) }) + .reduceGroupsK { a, b -> SomeClass(a.a + b.a, a.b) } .takeValues() dataset.count() shouldBe 1 @@ -515,6 +515,18 @@ class ApiTest : ShouldSpec({ dataset.sort(SomeClass::a, SomeClass::b) dataset.takeAsList(1).first().b shouldBe 2 } + should("Have Kotlin ready functions in place of overload ambiguity") { + val dataset: Pair = dsOf( + SomeClass(intArrayOf(1, 2, 3), 1), + SomeClass(intArrayOf(4, 3, 2), 1), + ) + .groupByKey { it: SomeClass -> it.b } + .reduceGroupsK { v1: SomeClass, v2: SomeClass -> v1 } + .filter { it: Pair -> true } // not sure why this does work, but reduce doesn't + .reduceK { v1: Pair, v2: Pair -> v1 } + + dataset.second.a shouldBe intArrayOf(1, 2, 3) + } should("Generate encoder correctly with complex enum data class") { val dataset: Dataset = dsOf( @@ -537,7 +549,7 @@ class ApiTest : ShouldSpec({ first.int shouldBe 1 first.string shouldBe "string" - first.strings shouldBe listOf("1","2") + first.strings shouldBe listOf("1", "2") first.someEnum shouldBe SomeEnum.A first.someOtherEnum shouldBe SomeOtherEnum.C first.someEnums shouldBe listOf(SomeEnum.A, SomeEnum.B) @@ -559,6 +571,7 @@ data class SomeClass(val a: IntArray, val b: Int) : Serializable data class SomeOtherClass(val a: IntArray, val b: Int, val c: Boolean) : 
Serializable + enum class SomeEnum { A, B } enum class SomeOtherEnum(val value: Int) { C(1), D(2) } @@ -573,5 +586,5 @@ data class ComplexEnumDataClass( val someOtherEnums: List, val someEnumArray: Array, val someOtherArray: Array, - val enumMap: Map + val enumMap: Map, ) \ No newline at end of file
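
For reference, here is a short usage sketch of the `reduceK` helper this patch introduces. The generic
parameters in the hunks above were lost to formatting, so the signature quoted in the comment is a
reconstruction (assuming a `reified` type parameter, in line with the rest of the API) rather than a
verbatim copy of the patch:

```kotlin
import org.jetbrains.kotlinx.spark.api.*

// Assumed shape of the new helper in ApiV1.kt (type parameters reconstructed):
// inline fun <reified T> Dataset<T>.reduceK(noinline func: (T, T) -> T): T =
//     reduce(ReduceFunction(func))

fun main() = withSpark {
    // The Kotlin lambda is wrapped in a ReduceFunction, so it no longer clashes
    // with the Scala/Java `reduce` overloads on Dataset.
    val total = dsOf(1, 2, 3, 4, 5).reduceK { a, b -> a + b }
    println(total) // 15
}
```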