diff --git a/README.md b/README.md
index a6b3eea2..5978de70 100644
--- a/README.md
+++ b/README.md
@@ -1,15 +1,12 @@
# Kotlin for Apache® Spark™ [![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:org.jetbrains.kotlinx.spark%20AND%20v:1.0.1) [![official JetBrains project](http://jb.gg/badges/incubator.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
-Your next API to work with [Apache Spark](https://spark.apache.org/).
-This project adds a missing layer of compatibility between [Kotlin](https://kotlinlang.org/)
-and [Apache Spark](https://spark.apache.org/). It allows Kotlin developers to use familiar language features such as
-data classes, and lambda expressions as simple expressions in curly braces or method references.
+Your next API to work with [Apache Spark](https://spark.apache.org/).
-We have opened a Spark Project Improvement
-Proposal: [Kotlin support for Apache Spark](http://issues.apache.org/jira/browse/SPARK-32530#) to work with the
-community towards getting Kotlin support as a first-class citizen in Apache Spark. We encourage you to voice your
-opinions and participate in the discussion.
+This project adds a missing layer of compatibility between [Kotlin](https://kotlinlang.org/) and [Apache Spark](https://spark.apache.org/).
+It allows Kotlin developers to use familiar language features such as data classes and lambda expressions as simple expressions in curly braces or method references.
+
+We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache Spark](http://issues.apache.org/jira/browse/SPARK-32530#) to work with the community towards getting Kotlin support as a first-class citizen in Apache Spark. We encourage you to voice your opinions and participate in the discussion.
## Table of Contents
@@ -24,7 +21,7 @@ opinions and participate in the discussion.
- [withCached function](#withcached-function)
- [toList and toArray](#tolist-and-toarray-methods)
- [Column infix/operator functions](#column-infixoperator-functions)
- - [`reduceGroups`](#reducegroups)
+  - [Overload resolution ambiguity](#overload-resolution-ambiguity)
- [Examples](#examples)
- [Reporting issues/Support](#reporting-issuessupport)
- [Code of Conduct](#code-of-conduct)
@@ -40,40 +37,35 @@ opinions and participate in the discussion.
## Releases
-The list of Kotlin for Apache Spark releases is
-available [here](https://github.com/JetBrains/kotlin-spark-api/releases/). The Kotlin for Spark artifacts adhere to the
-following convention:
-`[Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]`
+The list of Kotlin for Apache Spark releases is available [here](https://github.com/JetBrains/kotlin-spark-api/releases/).
+The Kotlin for Spark artifacts adhere to the following convention:
+`[Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]`
[![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22org.jetbrains.kotlinx.spark%22%20AND%20a:%22kotlin-spark-api-3.0.0_2.12%22)
## How to configure Kotlin for Apache Spark in your project
-You can add Kotlin for Apache Spark as a dependency to your project: `Maven`, `Gradle`, `SBT`, and `leinengen` are
-supported.
-
+You can add Kotlin for Apache Spark as a dependency to your project: `Maven`, `Gradle`, `SBT`, and `Leiningen` are supported.
+
Here's an example `pom.xml`:
```xml
-<dependency>
- <groupId>org.jetbrains.kotlinx.spark</groupId>
- <artifactId>kotlin-spark-api-3.0.0</artifactId>
- <version>${kotlin-spark-api.version}</version>
-</dependency>
-<dependency>
-<groupId>org.apache.spark</groupId>
-<artifactId>spark-sql_2.12</artifactId>
-<version>${spark.version}</version>
-</dependency>
+<dependency>
+  <groupId>org.jetbrains.kotlinx.spark</groupId>
+  <artifactId>kotlin-spark-api-3.0.0</artifactId>
+  <version>${kotlin-spark-api.version}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-sql_2.12</artifactId>
+  <version>${spark.version}</version>
+</dependency>
```
Note that `core` is being compiled against Scala version `2.12`.
-You can find a complete example with `pom.xml` and `build.gradle` in
-the [Quick Start Guide](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
-
-Once you have configured the dependency, you only need to add the following import to your Kotlin file:
+You can find a complete example with `pom.xml` and `build.gradle` in the [Quick Start Guide](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
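+
+If you use Gradle, a minimal sketch of the equivalent dependency block in the Gradle Kotlin DSL might look like this (the versions shown are illustrative — pick the ones matching your Spark/Scala setup from the releases page above):
+
+```kotlin
+// build.gradle.kts — coordinates follow the pom.xml above; versions are illustrative
+dependencies {
+    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api-3.0.0:1.0.1")
+    implementation("org.apache.spark:spark-sql_2.12:3.0.0")
+}
+```
+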
+Once you have configured the dependency, you only need to add the following import to your Kotlin file:
```kotlin
import org.jetbrains.kotlinx.spark.api.*
```
@@ -81,104 +73,97 @@ import org.jetbrains.kotlinx.spark.api.*
## Kotlin for Apache Spark features
### Creating a SparkSession in Kotlin
-
```kotlin
val spark = SparkSession
- .builder()
- .master("local[2]")
- .appName("Simple Application").orCreate
+ .builder()
+ .master("local[2]")
+ .appName("Simple Application").orCreate
```
### Creating a Dataset in Kotlin
-
```kotlin
spark.toDS("a" to 1, "b" to 2)
```
-
The example above produces `Dataset<Pair<String, Int>>`.
-
+
### Null safety
-
-There are several aliases in API, like `leftJoin`, `rightJoin` etc. These are null-safe by design. For
-example, `leftJoin` is aware of nullability and returns `Dataset>`. Note that we are forcing `RIGHT`
-to be nullable for you as a developer to be able to handle this situation.
+There are several aliases in the API, like `leftJoin`, `rightJoin`, etc. These are null-safe by design.
+For example, `leftJoin` is aware of nullability and returns `Dataset<Pair<LEFT, RIGHT?>>`.
+Note that we are forcing `RIGHT` to be nullable so that you, as a developer, can handle this situation.
`NullPointerException`s are hard to debug in Spark, and we are doing our best to make them as rare as possible.
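+
+As a minimal sketch (the `Citizen` and `City` data classes here are made up purely for illustration):
+
+```kotlin
+data class Citizen(val name: String, val cityId: Int)
+data class City(val id: Int, val name: String)
+
+withSpark {
+    val citizens = dsOf(Citizen("Alice", 1), Citizen("Bob", 2))
+    val cities = dsOf(City(1, "Amsterdam"))
+
+    citizens.leftJoin(cities, citizens.col("cityId") eq cities.col("id"))
+        // the right side is nullable: Bob has no matching city
+        .map { (citizen, city) -> citizen.name to (city?.name ?: "unknown") }
+        .show()
+}
+```
+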
### withSpark function
-We provide you with useful function `withSpark`, which accepts everything that may be needed to run Spark — properties,
-name, master location and so on. It also accepts a block of code to execute inside Spark context.
+We provide you with the useful function `withSpark`, which accepts everything that may be needed to run Spark — properties, name, master location and so on. It also accepts a block of code to execute inside the Spark context.
After the work block ends, `spark.stop()` is called automatically.
```kotlin
withSpark {
dsOf(1, 2)
- .map { it to it }
- .show()
+ .map { it to it }
+ .show()
}
```
`dsOf` is just one more way to create `Dataset` (`Dataset<T>`) from varargs.
-### `withCached` function
-
-It can easily happen that we need to fork our computation to several paths. To compute things only once we should
-call `cache`
-method. However, it becomes difficult to control when we're using cached `Dataset` and when not. It is also easy to
-forget to unpersist cached data, which can break things unexpectedly or take up more memory than intended.
+### withCached function
+It can easily happen that we need to fork our computation into several paths. To compute things only once, we should call the `cache` method.
+However, it becomes difficult to control when we're using the cached `Dataset` and when not.
+It is also easy to forget to unpersist cached data, which can break things unexpectedly or take up more memory than intended.
+To solve these problems, we've added the `withCached` function:
```kotlin
withSpark {
dsOf(1, 2, 3, 4, 5)
- .map { it to (it + 2) }
- .withCached {
- showDS()
-
- filter { it.first % 2 == 0 }.showDS()
- }
- .map { c(it.first, it.second, (it.first + it.second) * 2) }
- .show()
+ .map { it to (it + 2) }
+ .withCached {
+ showDS()
+
+ filter { it.first % 2 == 0 }.showDS()
+ }
+ .map { c(it.first, it.second, (it.first + it.second) * 2) }
+ .show()
}
```
-Here we're showing cached `Dataset` for debugging purposes then filtering it. The `filter` method returns
-filtered `Dataset` and then the cached `Dataset` is being unpersisted, so we have more memory t o call the `map` method
-and collect the resulting `Dataset`.
+Here we're showing the cached `Dataset` for debugging purposes, then filtering it.
+The `filter` method returns the filtered `Dataset`, and then the cached `Dataset` is unpersisted, so we have more memory to call the `map` method and collect the resulting `Dataset`.
-### `toList` and `toArray` methods
+### toList and toArray methods
-For more idiomatic Kotlin code we've added `toList` and `toArray` methods in this API. You can still use the `collect`
-method as in Scala API, however the result should be casted to `Array`. This is because `collect` returns a Scala array,
-which is not the same as Java/Kotlin one.
+For more idiomatic Kotlin code, we've added `toList` and `toArray` methods to this API. You can still use the `collect` method as in the Scala API; however, the result should be cast to `Array<T>`.
+This is because `collect` returns a Scala array, which is not the same as a Java/Kotlin one.
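+
+A minimal sketch of both methods (types in the comments are what we expect to be inferred):
+
+```kotlin
+withSpark {
+    val list = dsOf(1, 2, 3).toList()    // List<Int>
+    val array = dsOf(1, 2, 3).toArray()  // Array<Int>
+    println(list.size + array.size)      // 6
+}
+```
+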
### Column infix/operator functions
-Similar to the Scala API for `Columns`, many of the operator functions could be ported over. For example:
-
+Similar to the Scala API for `Columns`, many of the operator functions could be ported over.
+For example:
```kotlin
-dataset.select(col("colA") + 5)
-dataset.select(col("colA") / col("colB"))
+dataset.select( col("colA") + 5 )
+dataset.select( col("colA") / col("colB") )
-dataset.where(col("colA") `===` 6)
+dataset.where( col("colA") `===` 6 )
// or alternatively
-dataset.where(col("colA") eq 6)
+dataset.where( col("colA") eq 6 )
```
In short, all supported operators are:
- `==`,
-- `!=`,
+- `!=`,
- `eq` / `` `===` ``,
- `neq` / `` `=!=` ``,
- `-col(...)`,
-- `!col(...)`,
+- `!col(...)`,
- `gt`,
- `lt`,
-- `geq`,
+- `geq`,
- `leq`,
- `or`,
- `and` / `` `&&` ``,
@@ -190,53 +175,43 @@ In short, all supported operators are:
Secondly, there are some quality of life additions as well:
-In Kotlin, Ranges are often used to solve inclusive/exclusive situations for a range. So, you can now do:
-
+In Kotlin, ranges are often used to express inclusive/exclusive bounds. So, you can now do:
```kotlin
-dataset.where(col("colA") inRangeOf 0..2)
+dataset.where( col("colA") inRangeOf 0..2 )
```
Also, for columns containing map- or array-like types:
```kotlin
-dataset.where(col("colB")[0] geq 5)
+dataset.where( col("colB")[0] geq 5 )
```
-Finally, thanks to Kotlin reflection, we can provide a type- and refactor safe way to create `TypedColumn`s and with
-those a new Dataset from pieces of another using the `selectTyped()` function, added to the API:
-
+Finally, thanks to Kotlin reflection, we can provide a type- and refactor-safe way to create `TypedColumn`s and, with those, a new Dataset from pieces of another using the `selectTyped()` function, added to the API:
```kotlin
val dataset: Dataset<YourClass> = ...
val newDataset: Dataset<Pair<TypeA, TypeB>> = dataset.selectTyped(col(YourClass::colA), col(YourClass::colB))
```
-### `reduceGroups`
+### Overload resolution ambiguity
+
+We had to implement the functions `reduceGroups` and `reduce` for Kotlin separately as `reduceGroupsK` and `reduceK`, respectively, because otherwise they caused overload resolution ambiguity between the Kotlin, Scala, and Java APIs, which was quite hard to solve.
-We had to implemet `reduceGroups` operator for Kotlin separately as `reduceGroupsK` function, because otherwise it
-caused resolution ambiguity between Kotlin, Scala and Java APIs, which was quite hard to solve.
+We have a dedicated example of working with `reduceGroupsK` in the [Groups example](https://github.com/JetBrains/kotlin-spark-api/edit/main/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Group.kt).
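+
+As a minimal sketch of both functions (the grouping key and values are arbitrary illustration data):
+
+```kotlin
+withSpark {
+    val total = dsOf(1, 2, 3, 4)
+        .groupByKey { it % 2 }              // group into odd/even buckets
+        .reduceGroupsK { a, b -> a + b }    // Dataset<Pair<Int, Int>> of (key, sum)
+        .map { it.second }
+        .reduceK { a, b -> a + b }          // 10
+
+    println(total)
+}
+```
+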
-We have a special example of work with this function in
-the [Groups example](https://github.com/JetBrains/kotlin-spark-api/edit/main/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Group.kt)
-.
## Examples
-For more, check
-out [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples)
-module. To get up and running quickly, check out
-this [tutorial](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
+For more, check out the [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples) module.
+To get up and running quickly, check out this [tutorial](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
## Reporting issues/Support
-
-Please use [GitHub issues](https://github.com/JetBrains/kotlin-spark-api/issues) for filing feature requests and bug
-reports. You are also welcome to join [kotlin-spark channel](https://kotlinlang.slack.com/archives/C015B9ZRGJF) in the
-Kotlin Slack.
+Please use [GitHub issues](https://github.com/JetBrains/kotlin-spark-api/issues) for filing feature requests and bug reports.
+You are also welcome to join the [kotlin-spark channel](https://kotlinlang.slack.com/archives/C015B9ZRGJF) in the Kotlin Slack.
## Code of Conduct
-
-This project and the corresponding community is governed by
-the [JetBrains Open Source and Community Code of Conduct](https://confluence.jetbrains.com/display/ALL/JetBrains+Open+Source+and+Community+Code+of+Conduct)
-. Please make sure you read it.
+This project and the corresponding community are governed by the [JetBrains Open Source and Community Code of Conduct](https://confluence.jetbrains.com/display/ALL/JetBrains+Open+Source+and+Community+Code+of+Conduct). Please make sure you read it.
## License
diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt
index deada9e9..a253c12d 100644
--- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt
+++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt
@@ -48,8 +48,8 @@ object Main {
// .also { it.printSchema() }
.map { (triple, pair) -> Five(triple.first, triple.second, triple.third, pair?.first, pair?.second) }
.groupByKey { it.a }
- .reduceGroups(ReduceFunction { v1, v2 -> v1.copy(a = v1.a + v2.a, b = v1.a + v2.a) })
- .map { it._2 }
+ .reduceGroupsK { v1, v2 -> v1.copy(a = v1.a + v2.a, b = v1.a + v2.a) }
+ .map { it.second }
.repartition(1)
.withCached {
write()
diff --git a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 4bc4811b..34152494 100644
--- a/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -217,6 +217,13 @@ inline fun KeyValueGroupedDataset.reduc
reduceGroups(ReduceFunction(func))
.map { t -> t._1 to t._2 }
+/**
+ * (Kotlin-specific)
+ * Reduces the elements of this Dataset using the specified binary function. The given `func`
+ * must be commutative and associative or the result may be non-deterministic.
+ */
+inline fun <reified T> Dataset<T>.reduceK(noinline func: (T, T) -> T): T =
+ reduce(ReduceFunction(func))
@JvmName("takeKeysTuple2")
inline fun <reified T1, T2> Dataset<Tuple2<T1, T2>>.takeKeys(): Dataset<T1> = map { it._1() }
diff --git a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index 8da89fb9..bae27b2e 100644
--- a/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -21,20 +21,21 @@ import ch.tutteli.atrium.api.fluent.en_GB.*
import ch.tutteli.atrium.api.verbs.expect
import io.kotest.core.spec.style.ShouldSpec
import io.kotest.matchers.shouldBe
-import org.apache.spark.sql.streaming.GroupState
-import org.apache.spark.sql.streaming.GroupStateTimeout
-import scala.collection.Seq
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.functions.*
+import org.apache.spark.sql.streaming.GroupState
+import org.apache.spark.sql.streaming.GroupStateTimeout
import scala.Product
import scala.Tuple1
import scala.Tuple2
import scala.Tuple3
+import scala.collection.Seq
import java.io.Serializable
import java.sql.Date
import java.sql.Timestamp
import java.time.LocalDate
+import kotlin.collections.Iterator
import scala.collection.Iterator as ScalaIterator
import scala.collection.Map as ScalaMap
import scala.collection.mutable.Map as ScalaMutableMap
@@ -457,7 +458,7 @@ class ApiTest : ShouldSpec({
SomeClass(intArrayOf(4, 3, 2), 1),
)
.groupByKey { it.b }
- .reduceGroupsK(func = { a, b -> SomeClass(a.a + b.a, a.b) })
+ .reduceGroupsK { a, b -> SomeClass(a.a + b.a, a.b) }
.takeValues()
dataset.count() shouldBe 1
@@ -473,6 +474,18 @@ class ApiTest : ShouldSpec({
dataset.sort(SomeClass::a, SomeClass::b)
dataset.takeAsList(1).first().b shouldBe 2
}
+ should("Have Kotlin ready functions in place of overload ambiguity") {
+            val dataset: Pair<Int, SomeClass> = dsOf(
+ SomeClass(intArrayOf(1, 2, 3), 1),
+ SomeClass(intArrayOf(4, 3, 2), 1),
+ )
+ .groupByKey { it: SomeClass -> it.b }
+ .reduceGroupsK { v1: SomeClass, v2: SomeClass -> v1 }
+                .filter { it: Pair<Int, SomeClass> -> true } // not sure why this works, but reduce doesn't
+                .reduceK { v1: Pair<Int, SomeClass>, v2: Pair<Int, SomeClass> -> v1 }
+
+ dataset.second.a shouldBe intArrayOf(1, 2, 3)
+ }
should("Generate encoder correctly with complex enum data class") {
            val dataset: Dataset<ComplexEnumDataClass> =
dsOf(
@@ -495,7 +508,7 @@ class ApiTest : ShouldSpec({
first.int shouldBe 1
first.string shouldBe "string"
- first.strings shouldBe listOf("1","2")
+ first.strings shouldBe listOf("1", "2")
first.someEnum shouldBe SomeEnum.A
first.someOtherEnum shouldBe SomeOtherEnum.C
first.someEnums shouldBe listOf(SomeEnum.A, SomeEnum.B)
@@ -551,5 +564,5 @@ data class ComplexEnumDataClass(
    val someOtherEnums: List<SomeOtherEnum>,
    val someEnumArray: Array<SomeEnum>,
    val someOtherArray: Array<SomeOtherEnum>,
-    val enumMap: Map<SomeEnum, SomeOtherEnum>
+    val enumMap: Map<SomeEnum, SomeOtherEnum>,
)
diff --git a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 70abb7ed..2dde48cb 100644
--- a/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -212,6 +212,13 @@ inline fun KeyValueGroupedDataset.reduc
reduceGroups(ReduceFunction(func))
.map { t -> t._1 to t._2 }
+/**
+ * (Kotlin-specific)
+ * Reduces the elements of this Dataset using the specified binary function. The given `func`
+ * must be commutative and associative or the result may be non-deterministic.
+ */
+inline fun <reified T> Dataset<T>.reduceK(noinline func: (T, T) -> T): T =
+ reduce(ReduceFunction(func))
@JvmName("takeKeysTuple2")
inline fun <reified T1, T2> Dataset<Tuple2<T1, T2>>.takeKeys(): Dataset<T1> = map { it._1() }
diff --git a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index 36cb35fc..bff38ac1 100644
--- a/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -499,7 +499,7 @@ class ApiTest : ShouldSpec({
SomeClass(intArrayOf(4, 3, 2), 1),
)
.groupByKey { it.b }
- .reduceGroupsK(func = { a, b -> SomeClass(a.a + b.a, a.b) })
+ .reduceGroupsK { a, b -> SomeClass(a.a + b.a, a.b) }
.takeValues()
dataset.count() shouldBe 1
@@ -515,6 +515,18 @@ class ApiTest : ShouldSpec({
dataset.sort(SomeClass::a, SomeClass::b)
dataset.takeAsList(1).first().b shouldBe 2
}
+ should("Have Kotlin ready functions in place of overload ambiguity") {
+            val dataset: Pair<Int, SomeClass> = dsOf(
+ SomeClass(intArrayOf(1, 2, 3), 1),
+ SomeClass(intArrayOf(4, 3, 2), 1),
+ )
+ .groupByKey { it: SomeClass -> it.b }
+ .reduceGroupsK { v1: SomeClass, v2: SomeClass -> v1 }
+                .filter { it: Pair<Int, SomeClass> -> true } // not sure why this works, but reduce doesn't
+                .reduceK { v1: Pair<Int, SomeClass>, v2: Pair<Int, SomeClass> -> v1 }
+
+ dataset.second.a shouldBe intArrayOf(1, 2, 3)
+ }
should("Generate encoder correctly with complex enum data class") {
            val dataset: Dataset<ComplexEnumDataClass> =
dsOf(
@@ -537,7 +549,7 @@ class ApiTest : ShouldSpec({
first.int shouldBe 1
first.string shouldBe "string"
- first.strings shouldBe listOf("1","2")
+ first.strings shouldBe listOf("1", "2")
first.someEnum shouldBe SomeEnum.A
first.someOtherEnum shouldBe SomeOtherEnum.C
first.someEnums shouldBe listOf(SomeEnum.A, SomeEnum.B)
@@ -559,6 +571,7 @@ data class SomeClass(val a: IntArray, val b: Int) : Serializable
data class SomeOtherClass(val a: IntArray, val b: Int, val c: Boolean) : Serializable
+
enum class SomeEnum { A, B }
enum class SomeOtherEnum(val value: Int) { C(1), D(2) }
@@ -573,5 +586,5 @@ data class ComplexEnumDataClass(
    val someOtherEnums: List<SomeOtherEnum>,
    val someEnumArray: Array<SomeEnum>,
    val someOtherArray: Array<SomeOtherEnum>,
-    val enumMap: Map<SomeEnum, SomeOtherEnum>
+    val enumMap: Map<SomeEnum, SomeOtherEnum>,
)
\ No newline at end of file