diff --git a/heron/api/src/scala/BUILD b/heron/api/src/scala/BUILD new file mode 100644 index 00000000000..0a4dbc4c407 --- /dev/null +++ b/heron/api/src/scala/BUILD @@ -0,0 +1,54 @@ +licenses(["notice"]) + +package(default_visibility = ["//visibility:public"]) + +load("//tools/rules:build_defs.bzl", "DOCLINT_HTML_AND_SYNTAX") +load("//tools/rules:javadoc.bzl", "java_doc") + +scala_doc( + name = "heron-api-scaladoc", + libs = [":api-scala"], + pkgs = ["com/twitter/heron"], + title = "Heron Scala Streamlet Documentation", +) + +# Low Level Api +scala_library( + name = "api-scala-low-level", + srcs = glob(["com/twitter/heron/streamlet/*.scala"]), + javacopts = DOCLINT_HTML_AND_SYNTAX, + deps = api_deps_files, +) + +# Functional Api +java_library( + name = "api-java", + srcs = glob(["com/twitter/heron/streamlet/impl/*.java"]), + javacopts = DOCLINT_HTML_AND_SYNTAX, + deps = api_deps_files + [ + ":api-java-low-level", + "//third_party/java:kryo-neverlink", + ], +) + +scala_binary( + name = "api-unshaded", + srcs = glob([ + "com/twitter/heron/streamlet", + "com/twitter/heron/streamlet/impl/*.java", + ]), +) + +jarjar_binary( + name = "streamlet-scala-shaded", + src = ":api-unshaded_deploy.jar", + shade = "shade.conf", + deps = ["@org_sonatype_plugins_jarjar_maven_plugin//jar"], +) + +genrule( + name = "heron-scala-streamlet", + srcs = [":streamlet-scala-shaded"], + outs = ["heron-scala-streamlet.jar"], + cmd = "cp $< $@", +) diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/Builder.scala b/heron/api/src/scala/com/twitter/heron/streamlet/Builder.scala new file mode 100644 index 00000000000..d2935a4e298 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/Builder.scala @@ -0,0 +1,46 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala + +import com.twitter.heron.streamlet.impl.BuilderImpl +import Builder._ + + +object Builder { + def newBuilder(): Builder = new BuilderImpl() +} + +/** + * Builder is used to register all sources. Builder thus keeps track + * of all the starting points of the computation dag and uses this + * information to build the topology + */ +trait Builder { + + /** + * All sources of the computation should register using addSource. + * @param supplier The supplier function that is used to create the streamlet + */ + def newSource[R](supplier: SerializableSupplier[R]): Streamlet[R] + + /** + * Creates a new Streamlet using the underlying generator + * @param generator The generator that generates the tuples of the streamlet + * @param + * @return + */ + def newSource[R](generator: Source[R]): Streamlet[R] + +} + diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/Config.scala b/heron/api/src/scala/com/twitter/heron/streamlet/Config.scala new file mode 100644 index 00000000000..c0a88a9ef27 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/Config.scala @@ -0,0 +1,260 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.io.Serializable +import com.twitter.heron.streamlet.impl.KryoSerializer +import Config._ +import scala.beans.{BeanProperty, BooleanBeanProperty} + + +object Config { + + private val MB: Long = 1024 * 1024 + + private val GB: Long = 1024 * MB + + object DeliverySemantics extends Enumeration { + + val ATMOST_ONCE: DeliverySemantics = new DeliverySemantics() + + val ATLEAST_ONCE: DeliverySemantics = new DeliverySemantics() + + val EFFECTIVELY_ONCE: DeliverySemantics = new DeliverySemantics() + + class DeliverySemantics extends Val + + implicit def convertValue(v: Value): DeliverySemantics = + v.asInstanceOf[DeliverySemantics] + + } + + object Serializer extends Enumeration { + + val JAVA: Serializer = new Serializer() + + val KRYO: Serializer = new Serializer() + + class Serializer extends Val + + implicit def convertValue(v: Value): Serializer = + v.asInstanceOf[Serializer] + + } + + object Defaults { + + val USE_KRYO: Boolean = true + + val CONFIG: com.twitter.heron.api.Config = + new com.twitter.heron.api.Config() + + val CPU: Float = 1.0f + + val RAM: Long = 100 * MB + + val SEMANTICS: DeliverySemantics = DeliverySemantics.ATMOST_ONCE + + val SERIALIZER: Serializer = Serializer.KRYO + + } + + /** + * Sets the topology to use the default configuration: 100 megabytes of RAM per container, 1.0 + * CPUs per container, at-most-once delivery semantics, and the Kryo serializer. + */ + def defaultConfig(): Config = new Builder().build() + + /** + * Returns a new {@link Builder} that can be used to create a configuration object for Streamlet + * API topologies + */ + def newBuilder(): Builder = new Builder() + + private def translateSemantics(semantics: DeliverySemantics) + : com.twitter.heron.api.Config.TopologyReliabilityMode = semantics match { + case ATMOST_ONCE => + com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE + case ATLEAST_ONCE => + com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE + case EFFECTIVELY_ONCE => + com.twitter.heron.api.Config.TopologyReliabilityMode.EFFECTIVELY_ONCE + case _ => com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE + + } + + class Builder private () { + + private var config: com.twitter.heron.api.Config = Defaults.CONFIG + + private var cpu: Float = Defaults.CPU + + private var ram: Long = Defaults.RAM + + private var deliverySemantics: DeliverySemantics = Defaults.SEMANTICS + + private var serializer: Serializer = Serializer.KRYO + + /** + * Sets the per-container (per-instance) CPU to be used by this topology + * @param perContainerCpu Per-container (per-instance) CPU as a float + */ + def setPerContainerCpu(perContainerCpu: Float): Builder = { + this.cpu = perContainerCpu + this + } + + /** + * Sets the per-container (per-instance) RAM to be used by this topology + * @param perContainerRam Per-container (per-instance) RAM expressed as a Long. + */ + def setPerContainerRam(perContainerRam: Long): Builder = { + this.ram = perContainerRam + this + } + + /** + * Sets the per-container (per-instance) RAM to be used by this topology as a number of bytes + * @param perContainerRam Per-container (per-instance) RAM expressed as a Long. + */ + def setPerContainerRamInBytes(perContainerRam: Long): Builder = { + this.ram = perContainerRam + this + } + + /** + * Sets the per-container (per-instance) RAM to be used by this topology in megabytes + * @param perContainerRamMB Per-container (per-instance) RAM expressed as a Long. + */ + def setPerContainerRamInMegabytes(perContainerRamMB: Long): Builder = { + this.ram = perContainerRamMB * MB + this + } + + /** + * Sets the per-container (per-instance) RAM to be used by this topology in gigabytes + * @param perContainerRamGB Per-container (per-instance) RAM expressed as a Long. + */ + def setPerContainerRamInGigabytes(perContainerRamGB: Long): Builder = { + this.ram = perContainerRamGB * GB + this + } + + /** + * Sets the number of containers to run this topology + * @param numContainers The number of containers across which to distribute this topology + */ + def setNumContainers(numContainers: Int): Builder = { + config.setNumStmgrs(numContainers) + this + } + + /** + * Sets the delivery semantics of the topology + * @param semantics The delivery semantic to be enforced + */ + def setDeliverySemantics(semantics: DeliverySemantics): Builder = { + this.deliverySemantics = semantics + config.setTopologyReliabilityMode(Config.translateSemantics(semantics)) + this + } + + /** + * Sets some user-defined key/value mapping + * @param key The user-defined key + * @param value The user-defined value + */ + def setUserConfig(key: String, value: AnyRef): Builder = { + config.put(key, value) + this + } + + private def useKryo(): Unit = { + config.setSerializationClassName(classOf[KryoSerializer].getName) + } + + /** + * Sets the {@link Serializer} to be used by the topology (current options are {@link + * KryoSerializer} and the native Java serializer. + * @param topologySerializer The data serializer to use for streamlet elements in the topology. + */ + def setSerializer(topologySerializer: Serializer): Builder = { + this.serializer = topologySerializer + this + } + + def build(): Config = { + if (serializer == Serializer.KRYO) { + useKryo() + } + new Config(this) + } + + } + +} + +/** + * Config is the way users configure the execution of the topology. + * Things like streamlet delivery semantics, resources used, as well as + * user-defined key/value pairs are passed on to the topology runner via + * this class. + */ +@SerialVersionUID(6204498077403076352L) +class Config private (builder: Builder) extends Serializable { + + private val cpu: Float = builder.cpu + + private val ram: Long = builder.ram + + @BeanProperty + val deliverySemantics: DeliverySemantics = builder.deliverySemantics + + @BeanProperty + val serializer: Serializer = builder.serializer + + @BeanProperty + var heronConfig: com.twitter.heron.api.Config = builder.config + + /** + * Gets the CPU used per topology container + * @return the per-container CPU as a float + */ + def getPerContainerCpu(): Float = cpu + + /** + * Gets the RAM used per topology container as a number of bytes + * @return the per-container RAM in bytes + */ + def getPerContainerRam(): Long = ram + + /** + * Gets the RAM used per topology container as a number of gigabytes + * @return the per-container RAM in gigabytes + */ + def getPerContainerRamAsGigabytes(): Long = Math.round(ram.toDouble / GB) + + /** + * Gets the RAM used per topology container as a number of megabytes + * @return the per-container RAM in megabytes + */ + def getPerContainerRamAsMegabytes(): Long = Math.round(ram.toDouble / MB) + + /** + * Gets the RAM used per topology container as a number of bytes + * @return the per-container RAM in bytes + */ + def getPerContainerRamAsBytes(): Long = getPerContainerRam + +} + diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/Context.scala b/heron/api/src/scala/com/twitter/heron/streamlet/Context.scala new file mode 100644 index 00000000000..195d9303cf2 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/Context.scala @@ -0,0 +1,68 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala + +import java.io.Serializable +import java.util.Map +import java.util.function.Supplier +import com.twitter.heron.api.state.State + +/** + * Context is the information available at runtime for operators like transform. + * It contains basic things like config, runtime information like task, + * the stream that it is operating on, ProcessState, etc. + */ +trait Context { + + /** + * Fetches the task id of the current instance of the operator + * @return the task id. + */ + def getTaskId(): Int + + /** + * Fetches the config of the computation + * @return config + */ + def getConfig(): Map[String, Any] + + /** + * The stream name that we are operating on + * @return the stream name that we are operating on + */ + def getStreamName(): String + + /** + * The partition number that we are operating on + * @return the partition number + */ + def getStreamPartition(): Int + + /** + * Register a metric function. This function will be called + * by the system every collectionInterval seconds and the resulting value + * will be collected + */ + def registerMetric[T](metricName: String, + collectionInterval: Int, + metricFn: Supplier[T]): Unit + + /** + * The state where components can store any of their local state + * @return The state interface where users can store their local state + */ + def getState(): State[Serializable, Serializable] + +} + diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/JoinType.scala b/heron/api/src/scala/com/twitter/heron/streamlet/JoinType.scala new file mode 100644 index 00000000000..e2e37e98fd5 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/JoinType.scala @@ -0,0 +1,30 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala + +object JoinType extends Enumeration { + + val INNER: JoinType = new JoinType() + + val OUTER_LEFT: JoinType = new JoinType() + + val OUTER_RIGHT: JoinType = new JoinType() + + val OUTER: JoinType = new JoinType() + + class JoinType extends Val + + implicit def convertValue(v: Value): JoinType = v.asInstanceOf[JoinType] + +} + diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/KeyValue.scala b/heron/api/src/scala/com/twitter/heron/streamlet/KeyValue.scala new file mode 100644 index 00000000000..660c03da64a --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/KeyValue.scala @@ -0,0 +1,53 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2016 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala + +import java.io.Serializable + +import KeyValue._ + +import scala.beans.{BeanProperty, BooleanBeanProperty} + +object KeyValue { + + def create[R, T](k: R, v: T): KeyValue[R, T] = new KeyValue[R, T](k, v) + +} + +/** + * Certain operations in the Streamlet API, like join/reduce, necessitate + * the concept of key value pairs. This file defines a generic KeyValue + * class. We make the KeyValue class serializable to allow it to be + * serialized between components. + */ +@SerialVersionUID(-7120757965590727554L) +class KeyValue[K, V]() extends Serializable { + + @BeanProperty + var key: K = _ + + @BeanProperty + var value: V = _ + + def this(k: K, v: V) = { + this() + this.key = k + this.value = v + } + + override def toString(): String = + "{ " + String.valueOf(key) + " : " + String.valueOf(value) + + " }" + +} diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/KeyedWindow.scala b/heron/api/src/scala/com/twitter/heron/streamlet/KeyedWindow.scala new file mode 100644 index 00000000000..af009af53ca --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/KeyedWindow.scala @@ -0,0 +1,47 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala + +import java.io.Serializable + +import scala.beans.{BeanProperty, BooleanBeanProperty} + +//remove if not needed +import scala.collection.JavaConversions._ + +/** + * Transformation depending on Windowing pass on the window/key information + * using this class + */ +@SerialVersionUID(4193319775040181971L) +class KeyedWindow[T]() extends Serializable { + + @BeanProperty + var key: T = _ + + @BeanProperty + var window: Window = _ + + def this(key: T, window: Window) = { + this() + this.key = key + this.window = window + } + + override def toString(): String = + "{ Key: " + String.valueOf(key) + " Window: " + window.toString + + " }" + +} + diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/Runner.scala b/heron/api/src/scala/com/twitter/heron/streamlet/Runner.scala new file mode 100644 index 00000000000..8a4d744e50a --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/Runner.scala @@ -0,0 +1,48 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import com.twitter.heron.api.HeronSubmitter +import com.twitter.heron.api.exception.AlreadyAliveException +import com.twitter.heron.api.exception.InvalidTopologyException +import com.twitter.heron.api.topology.TopologyBuilder +import com.twitter.heron.streamlet.impl.BuilderImpl + + +/** + * Runner is used to run a topology that is built by the builder. + * It exports a sole function called run that takes care of constructing the topology + */ +class Runner { + + /** + * Runs the computation + * @param name The name of the topology + * @param config Any config that is passed to the topology + * @param builder The builder used to keep track of the sources. + */ + def run(name: String, config: Config, builder: Builder): Unit = { + val bldr: BuilderImpl = builder.asInstanceOf[BuilderImpl] + val topologyBuilder: TopologyBuilder = bldr.build() + try HeronSubmitter.submitTopology(name, + config.getHeronConfig, + topologyBuilder.createTopology()) + catch { + case e @ (_: AlreadyAliveException | _: InvalidTopologyException) => + e.printStackTrace() + + } + } + +} + diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/SerializableBiFunction.scala b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableBiFunction.scala new file mode 100644 index 00000000000..ac3d710b562 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableBiFunction.scala @@ -0,0 +1,28 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.io.Serializable +import java.util.function.BiFunction + + +/** + * All user supplied transformation functions have to be serializable. + * Thus all Strealmet transformation definitions take Serializable + * Functions as their input. We simply decorate java.util. function + * definitions with a Serializable tag to ensure that any supplied + * lambda functions automatically become serializable. + */ +trait SerializableBiFunction[A, B, C] + extends BiFunction[A, B, C] + with Serializable diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/SerializableBinaryOperator.scala b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableBinaryOperator.scala new file mode 100644 index 00000000000..a3ca38d1952 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableBinaryOperator.scala @@ -0,0 +1,26 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala + +//remove if not needed +import scala.collection.JavaConversions._ + +/** + * All user supplied transformation functions have to be serializable. + * Thus all Strealmet transformation definitions take Serializable + * Functions as their input. We simply decorate java.util. function + * definitions with a Serializable tag to ensure that any supplied + * lambda functions automatically become serializable. + */ +trait SerializableBinaryOperator[T] extends SerializableBiFunction[T, T, T] diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/SerializableConsumer.scala b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableConsumer.scala new file mode 100644 index 00000000000..8ca3013d8de --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableConsumer.scala @@ -0,0 +1,26 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.io.Serializable +import java.util.function.Consumer + + +/** + * All user supplied transformation functions have to be serializable. + * Thus all Streamlet transformation definitions take Serializable + * Functions as their input. We simply decorate java.util. function + * definitions with a Serializable tag to ensure that any supplied + * lambda functions automatically become serializable. + */ +trait SerializableConsumer[T] extends Consumer[T] with Serializable diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/SerializableFunction.scala b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableFunction.scala new file mode 100644 index 00000000000..b835449bc04 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableFunction.scala @@ -0,0 +1,26 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.io.Serializable +import java.util.function.Function + +/** + * All user supplied transformation functions have to be serializable. + * Thus all Streamlet transformation definitions take Serializable + * Functions as their input. We simply decorate java.util. function + * definitions with a Serializable tag to ensure that any supplied + * lambda functions automatically become serializable. + */ +trait SerializableFunction[A, B] extends Function[A, B] with Serializable + diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/SerializablePredicate.scala b/heron/api/src/scala/com/twitter/heron/streamlet/SerializablePredicate.scala new file mode 100644 index 00000000000..f6ec396d761 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/SerializablePredicate.scala @@ -0,0 +1,27 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.io.Serializable +import java.util.function.Predicate + + +/** + * All user supplied transformation functions have to be serializable. + * Thus all Streamlet transformation definitions take Serializable + * Functions as their input. We simply decorate java.util. function + * definitions with a Serializable tag to ensure that any supplied + * lambda functions automatically become serializable. + */ +trait SerializablePredicate[T] extends Predicate[T] with Serializable diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/SerializableSupplier.scala b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableSupplier.scala new file mode 100644 index 00000000000..f89081b1770 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableSupplier.scala @@ -0,0 +1,27 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.io.Serializable +import java.util.function.Supplier + + +/** + * All user supplied transformation functions have to be serializable. + * Thus all Streamlet transformation definitions take Serializable + * Functions as their input. We simply decorate java.util. function + * definitions with a Serializable tag to ensure that any supplied + * lambda functions automatically become serializable. + */ +trait SerializableSupplier[T] extends Supplier[T] with Serializable + diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/SerializableTransformer.scala b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableTransformer.scala new file mode 100644 index 00000000000..fd8c2db10e9 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/SerializableTransformer.scala @@ -0,0 +1,34 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.io.Serializable +import java.util.function.Consumer + + +/** + * All user supplied transformation functions have to be serializable. + * Thus all Strealmet transformation definitions take Serializable + * Functions as their input. We simply decorate java.util. function + * definitions with a Serializable tag to ensure that any supplied + * lambda functions automatically become serializable. + */ +trait SerializableTransformer[I, O] extends Serializable { + + def setup(context: Context): Unit + + def transform(i: I, consumer: Consumer[O]): Unit + + def cleanup(): Unit + +} diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/Sink.scala b/heron/api/src/scala/com/twitter/heron/streamlet/Sink.scala new file mode 100644 index 00000000000..cfcea890b43 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/Sink.scala @@ -0,0 +1,32 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.io.Serializable + + +/** + * Sink is how Streamlet's end. The put method + * invocation consumes the tuple into say external database/cache, etc. + * setup/cleanup is where the sink can do any one time setup work, like + * establishing/closing connection to sources, etc. + */ +trait Sink[T] extends Serializable { + + def setup(context: Context): Unit + + def put(tuple: T): Unit + + def cleanup(): Unit + +} diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/Source.scala b/heron/api/src/scala/com/twitter/heron/streamlet/Source.scala new file mode 100644 index 00000000000..285641c4123 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/Source.scala @@ -0,0 +1,37 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala + +import java.io.Serializable + +import java.util.Collection + +//remove if not needed +import scala.collection.JavaConversions._ + +/** + * Source is how Streamlet's originate. The get method + * invocation returns new element that form the tuples of the streamlet. + * setup/cleanup is where the generator can do any one time setup work, like + * establishing/closing connection to sources, etc. + */ +trait Source[T] extends Serializable { + + def setup(context: Context): Unit + + def get(): Collection[T] + + def cleanup(): Unit + +} diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/Streamlet.scala b/heron/api/src/scala/com/twitter/heron/streamlet/Streamlet.scala new file mode 100644 index 00000000000..fba971c1f48 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/Streamlet.scala @@ -0,0 +1,231 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2016 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.util.List +import com.twitter.heron.classification.InterfaceStability + + +/** + * A Streamlet is a (potentially unbounded) ordered collection of tuples. + * Streamlets originate from pub/sub systems(such Pulsar/Kafka), or from + * static data(such as csv files, HDFS files), or for that matter any other + * source. They are also created by transforming existing Streamlets using + * operations such as map/flatMap, etc. + * Besides the tuples, a Streamlet has the following properties associated with it + * a) name. User assigned or system generated name to refer the streamlet + * b) nPartitions. Number of partitions that the streamlet is composed of. Thus the + * ordering of the tuples in a Streamlet is wrt the tuples within a partition. + * This allows the system to distribute each partition to different nodes across the cluster. + * A bunch of transformations can be done on Streamlets(like map/flatMap, etc.). Each + * of these transformations operate on every tuple of the Streamlet and produce a new + * Streamlet. One can think of a transformation attaching itself to the stream and processing + * each tuple as they go by. Thus the parallelism of any operator is implicitly determined + * by the number of partitions of the stream that it is operating on. If a particular + * transformation wants to operate at a different parallelism, one can repartition the + * Streamlet before doing the transformation. + */ +@InterfaceStability.Evolving +trait Streamlet[R] { + + /** + * Sets the name of the BaseStreamlet. + * @param sName The name given by the user for this BaseStreamlet + * @return Returns back the Streamlet with changed name + */ + def setName(sName: String): Streamlet[R] + + /** + * Gets the name of the Streamlet. + * @return Returns the name of the Streamlet + */ + def getName(): String + + /** + * Sets the number of partitions of the streamlet + * @param numPartitions The user assigned number of partitions + * @return Returns back the Streamlet with changed number of partitions + */ + def setNumPartitions(numPartitions: Int): Streamlet[R] + + /** + * Gets the number of partitions of this Streamlet. + * @return the number of partitions of this Streamlet + */ + def getNumPartitions(): Int + + /** + * Return a new Streamlet by applying mapFn to each element of this Streamlet + * @param mapFn The Map Function that should be applied to each element + */ + def map[T](mapFn: SerializableFunction[R, _ <: T]): Streamlet[T] + + /** + * Return a new Streamlet by applying flatMapFn to each element of this Streamlet and + * flattening the result + * @param flatMapFn The FlatMap Function that should be applied to each element + */ + def flatMap[T]( + flatMapFn: SerializableFunction[R, _ <: java.lang.Iterable[_ <: T]]) + : Streamlet[T] + + /** + * Return a new Streamlet by applying the filterFn on each element of this streamlet + * and including only those elements that satisfy the filterFn + * @param filterFn The filter Function that should be applied to each element + */ + def filter(filterFn: SerializablePredicate[R]): Streamlet[R] + + /** + * Same as filter(filterFn).setNumPartitions(nPartitions) where filterFn is identity + */ + def repartition(numPartitions: Int): Streamlet[R] + + /** + * A more generalized version of repartition where a user can determine which partitions + * any particular tuple should go to. For each element of the current streamlet, the user + * supplied partitionFn is invoked passing in the element as the first argument. The second + * argument is the number of partitions of the downstream streamlet. The partitionFn should + * return 0 or more unique numbers between 0 and npartitions to indicate which partitions + * this element should be routed to. + */ + def repartition( + numPartitions: Int, + partitionFn: SerializableBiFunction[R, Integer, List[Integer]]) + : Streamlet[R] + + /** + * Clones the current Streamlet. It returns an array of numClones Streamlets where each + * Streamlet contains all the tuples of the current Streamlet + * @param numClones The number of clones to clone + */ + def clone(numClones: Int): List[Streamlet[R]] + + /** + * Return a new Streamlet by inner joining 'this streamlet with ‘other’ streamlet. + * The join is done over elements accumulated over a time window defined by windowCfg. + * The elements are compared using the thisKeyExtractor for this streamlet with the + * otherKeyExtractor for the other streamlet. On each matching pair, the joinFunction is applied. + * @param other The Streamlet that we are joining with. + * @param thisKeyExtractor The function applied to a tuple of this streamlet to get the key + * @param otherKeyExtractor The function applied to a tuple of the other streamlet to get the key + * @param windowCfg This is a specification of what kind of windowing strategy you like to + * have. Typical windowing strategies are sliding windows and tumbling windows + * @param joinFunction The join function that needs to be applied + */ + def join[K, S, T](other: Streamlet[S], + thisKeyExtractor: SerializableFunction[R, K], + otherKeyExtractor: SerializableFunction[S, K], + windowCfg: WindowConfig, + joinFunction: SerializableBiFunction[R, S, _ <: T]) + : Streamlet[KeyValue[KeyedWindow[K], T]] + + /** + * Return a new KVStreamlet by joining 'this streamlet with ‘other’ streamlet. The type of joining + * is declared by the joinType parameter. + * The join is done over elements accumulated over a time window defined by windowCfg. + * The elements are compared using the thisKeyExtractor for this streamlet with the + * otherKeyExtractor for the other streamlet. On each matching pair, the joinFunction is applied. + * Types of joins {@link JoinType} + * @param other The Streamlet that we are joining with. + * @param thisKeyExtractor The function applied to a tuple of this streamlet to get the key + * @param otherKeyExtractor The function applied to a tuple of the other streamlet to get the key + * @param windowCfg This is a specification of what kind of windowing strategy you like to + * have. Typical windowing strategies are sliding windows and tumbling windows + * @param joinType Type of Join. Options {@link JoinType} + * @param joinFunction The join function that needs to be applied + */ + def join[K, S, T](other: Streamlet[S], + thisKeyExtractor: SerializableFunction[R, K], + otherKeyExtractor: SerializableFunction[S, K], + windowCfg: WindowConfig, + joinType: JoinType, + joinFunction: SerializableBiFunction[R, S, _ <: T]) + : Streamlet[KeyValue[KeyedWindow[K], T]] + + /** + * Return a new Streamlet accumulating tuples of this streamlet over a Window defined by + * windowCfg and applying reduceFn on those tuples. + * @param keyExtractor The function applied to a tuple of this streamlet to get the key + * @param valueExtractor The function applied to a tuple of this streamlet to extract the value + * to be reduced on + * @param windowCfg This is a specification of what kind of windowing strategy you like to have. + * Typical windowing strategies are sliding windows and tumbling windows + * @param reduceFn The reduce function that you want to apply to all the values of a key. + */ + def reduceByKeyAndWindow[K, V](keyExtractor: SerializableFunction[R, K], + valueExtractor: SerializableFunction[R, V], + windowCfg: WindowConfig, + reduceFn: SerializableBinaryOperator[V]) + : Streamlet[KeyValue[KeyedWindow[K], V]] + + /** + * Return a new Streamlet accumulating tuples of this streamlet over a Window defined by + * windowCfg and applying reduceFn on those tuples. For each window, the value identity is used + * as a initial value. All the matching tuples are reduced using reduceFn startin from this + * initial value. + * @param keyExtractor The function applied to a tuple of this streamlet to get the key + * @param windowCfg This is a specification of what kind of windowing strategy you like to have. + * Typical windowing strategies are sliding windows and tumbling windows + * @param identity The identity element is both the initial value inside the reduction window + * and the default result if there are no elements in the window + * @param reduceFn The reduce function takes two parameters: a partial result of the reduction + * and the next element of the stream. It returns a new partial result. + */ + def reduceByKeyAndWindow[K, T]( + keyExtractor: SerializableFunction[R, K], + windowCfg: WindowConfig, + identity: T, + reduceFn: SerializableBiFunction[T, R, _ <: T]) + : Streamlet[KeyValue[KeyedWindow[K], T]] + + /** + * Returns a new Streamlet that is the union of this and the ‘other’ streamlet. Essentially + * the new streamlet will contain tuples belonging to both Streamlets + */ + def union(other: Streamlet[_ <: R]): Streamlet[R] + + /** + * Returns a new Streamlet by applying the transformFunction on each element of this streamlet. + * Before starting to cycle the transformFunction over the Streamlet, the open function is called. + * This allows the transform Function to do any kind of initialization/loading, etc. + * @param serializableTransformer The transformation function to be applied + * @param The return type of the transform + * @return Streamlet containing the output of the transformFunction + */ + def transform[T](serializableTransformer: SerializableTransformer[R, _ <: T]) + : Streamlet[T] + + /** + * Logs every element of the streamlet using String.valueOf function + * This is one of the sink functions in the sense that this operation returns void + */ + def log(): Unit + + /** + * Applies the consumer function to every element of the stream + * This function does not return anything. + * @param consumer The user supplied consumer function that is invoked for each element + * of this streamlet. + */ + def consume(consumer: SerializableConsumer[R]): Unit + + /** + * Applies the sink's put function to every element of the stream + * This function does not return anything. + * @param sink The Sink whose put method consumes each element + * of this streamlet. + */ + def toSink(sink: Sink[R]): Unit + +} diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/Window.scala b/heron/api/src/scala/com/twitter/heron/streamlet/Window.scala new file mode 100644 index 00000000000..dcdb5f13aab --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/Window.scala @@ -0,0 +1,53 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.io.Serializable +import scala.beans.{BeanProperty, BooleanBeanProperty} + + +/** + * Window is a container containing information about a particular window. + * Transformations that depend on Windowing, pass the window information + * inside their streamlets using this container. + */ +@SerialVersionUID(5103471810104775854L) +class Window() extends Serializable { + + private var startTimeMs: Long = _ + + private var endTimeMs: Long = _ + + @BeanProperty + var count: Long = _ + + def this(startTimeMs: Long, endTimeMs: Long, count: Long) = { + this() + this.startTimeMs = startTimeMs + this.endTimeMs = endTimeMs + this.count = count + } + + def getStartTime(): Long = startTimeMs + + def getEndTime(): Long = endTimeMs + + override def toString(): String = + "{WindowStart: " + String.valueOf(startTimeMs) + " WindowEnd: " + + String.valueOf(endTimeMs) + + " Count: " + + String.valueOf(count) + + " }" + +} + diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/WindowConfig.scala b/heron/api/src/scala/com/twitter/heron/streamlet/WindowConfig.scala new file mode 100644 index 00000000000..f9fdf877b26 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/WindowConfig.scala @@ -0,0 +1,71 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Copyright 2017 Twitter. All rights reserved. +package com.twitter.heron.streamlet.scala +import java.time.Duration +import com.twitter.heron.api.tuple.Tuple +import com.twitter.heron.api.windowing.EvictionPolicy +import com.twitter.heron.api.windowing.TriggerPolicy +import com.twitter.heron.streamlet.impl.WindowConfigImpl + +object WindowConfig { + + /** + * Creates a time based tumbling window of windowDuration + * @param windowDuration the duration of the tumbling window + * @return WindowConfig that can be passed to the transformation + */ + def TumblingTimeWindow(windowDuration: Duration): WindowConfig = + new WindowConfigImpl(windowDuration, windowDuration) + + /** + * Creates a time based sliding window with windowDuration as the window duration + * and slideInterval as slideInterval + * @param windowDuration The Sliding Window duration + * @param slideInterval The sliding duration + * @return WindowConfig that can be passed to the transformation + */ + def SlidingTimeWindow(windowDuration: Duration, + slideInterval: Duration): WindowConfig = + new WindowConfigImpl(windowDuration, slideInterval) + + /** + * Creates a count based tumbling window of size windowSize + * @param windowSize the size of the tumbling window + * @return WindowConfig that can be passed to the transformation + */ + def TumblingCountWindow(windowSize: Int): WindowConfig = + new WindowConfigImpl(windowSize, windowSize) + + /** + * Creates a count based sliding window with windowSize as the window countsize + * and slideSize as slide size + * @param windowSize The Window Count Size + * @param slideSize The slide size + * @return WindowConfig that can be passed to the transformation + */ + def SlidingCountWindow(windowSize: Int, slideSize: Int): WindowConfig = + new WindowConfigImpl(windowSize, slideSize) + + /** + * Creates a window based on the provided custom trigger and eviction policies + * @param triggerPolicy The trigger policy to use + * @param evictionPolicy The eviction policy to use + * @return WindowConfig that can be passed to the transformation + */ + def CustomWindow(triggerPolicy: TriggerPolicy[Tuple, _], + evictionPolicy: EvictionPolicy[Tuple, _]): WindowConfig = + new WindowConfigImpl(triggerPolicy, evictionPolicy) + +} +