Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Scalaapi #2673

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
52 changes: 52 additions & 0 deletions heron/api/src/scala/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
licenses(["notice"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General BUILD Comment:
The *.scala files are not referenced here. Bazel supports Scala via
https://github.com/bazelbuild/rules_scala,
so I think we need to use scala_library to compile the Scala files, referencing
com/twitter/heron/streamlet/scala/**/*.scala as srcs.


package(default_visibility = ["//visibility:public"])

load("//tools/rules:build_defs.bzl", "DOCLINT_HTML_AND_SYNTAX")
load("//tools/rules:javadoc.bzl", "java_doc")

# Documentation target for the Scala Streamlet API, generated with the java_doc
# rule loaded from //tools/rules:javadoc.bzl.
# NOTE(review): ":api-scala" is not declared anywhere in this BUILD file --
# confirm which library target this is meant to document.
java_doc(
    name = "heron-api-scaladoc",
    title = "Heron Scala Streamlet Documentation",
    libs = [":api-scala"],
    pkgs = ["com/twitter/heron"],
)


# Low-level API: compiles every Java source found under com/twitter/heron/api.
# NOTE(review): `api_deps_files` is neither defined nor loaded in this file --
# verify where it is expected to come from.
java_library(
    name = "api-java-low-level",
    srcs = glob(["com/twitter/heron/api/**/*.java"]),
    deps = api_deps_files,
    javacopts = DOCLINT_HTML_AND_SYNTAX,
)

# Functional (Streamlet) API: compiles the Java sources directly under
# com/twitter/heron/streamlet/impl on top of the low-level API library.
# NOTE(review): `api_deps_files` is neither defined nor loaded in this file --
# verify where it is expected to come from.
java_library(
    name = "api-java",
    srcs = glob(["com/twitter/heron/streamlet/impl/*.java"]),
    deps = api_deps_files + [
        ":api-java-low-level",
        "//third_party/java:kryo-neverlink",
    ],
    javacopts = DOCLINT_HTML_AND_SYNTAX,
)

# Unshaded binary; its "_deploy.jar" output is the input of the jarjar shading
# step declared further down in this file.
# NOTE(review): the first glob entry names a directory rather than a file
# pattern, and java_binary is a Java rule -- confirm that the *.scala sources
# are actually picked up and compiled.
java_binary(
    name = "api-unshaded",
    srcs = glob([
        "com/twitter/heron/streamlet/scala",
        "com/twitter/heron/streamlet/impl/*.java",
    ]),
)

# Shades the unshaded deploy jar according to the rules in shade.conf.
# NOTE(review): no load() statement for `jarjar_binary` is visible in this
# file -- confirm the rule is loaded.
jarjar_binary(
    name = "streamlet-scala-shaded",
    src = ":api-unshaded_deploy.jar",
    shade = "shade.conf",
    deps = [
        "@org_sonatype_plugins_jarjar_maven_plugin//jar",
    ],
)

# Copies the shaded jar to its published file name, heron-scala-streamlet.jar.
genrule(
    name = "heron-scala-streamlet",
    srcs = [
        ":streamlet-scala-shaded",
    ],
    outs = [
        "heron-scala-streamlet.jar",
    ],
    cmd = "cp $< $@",
)
46 changes: 46 additions & 0 deletions heron/api/src/scala/Builder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General Comment:
Currently, all files appear to live directly under heron/api/src/scala.

  • The related *.scala files should live under the package directory heron/api/src/scala/com/twitter/heron/streamlet/scala (everything except the BUILD file),
    e.g. the Java Streamlet API currently lives under heron/api/src/java/com/twitter/heron/streamlet.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright 2017 Twitter. All rights reserved.
package com.twitter.heron.streamlet.scala

import com.twitter.heron.streamlet.impl.BuilderImpl
import Builder._


/** Factory for [[Builder]] instances. */
object Builder {

  /**
   * Creates a new, empty builder backed by `BuilderImpl`.
   *
   * NOTE(review): `BuilderImpl` is imported from the `streamlet.impl` package; confirm that it
   * actually implements this Scala `Builder` trait.
   */
  def newBuilder(): Builder = {
    val builder: Builder = new BuilderImpl()
    builder
  }

}

/**
* Builder is used to register all sources. Builder thus keeps track
* of all the starting points of the computation dag and uses this
* information to build the topology
*/
trait Builder {

/**
 * Creates a new Streamlet from a supplier function. All sources of the computation
 * should be registered with the builder through `newSource`.
 * @tparam R The element type of the resulting streamlet
 * @param supplier The supplier function that is used to create the streamlet
 * @return The streamlet created from the supplier
 */
def newSource[R](supplier: SerializableSupplier[R]): Streamlet[R]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a Scala user's perspective, and in my opinion, all exposed Scala traits should be pure Scala. This means the Scala traits should accept Scala functions instead of Java functional interfaces here:

def newSource[R](supplier: () => R): Streamlet[R]

instead of

def newSource[R](supplier: SerializableSupplier[R]): Streamlet[R]


/**
 * Creates a new Streamlet using the underlying generator
 * @tparam R The element type of the resulting streamlet
 * @param generator The generator that generates the tuples of the streamlet
 * @return The streamlet created from the generator
 */
def newSource[R](generator: Source[R]): Streamlet[R]

}

260 changes: 260 additions & 0 deletions heron/api/src/scala/Config.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright 2017 Twitter. All rights reserved.
package com.twitter.heron.streamlet.scala
import java.io.Serializable
import com.twitter.heron.streamlet.impl.KryoSerializer
import Config._
import scala.beans.{BeanProperty, BooleanBeanProperty}


/**
 * Companion of [[Config]]: holds the enumerations, the default values and the [[Config.Builder]]
 * used to assemble the configuration of a Streamlet API topology.
 */
object Config {

  // The enumerations below expose an implicit Value => subclass conversion.
  import scala.language.implicitConversions

  private val MB: Long = 1024 * 1024

  private val GB: Long = 1024 * MB

  /**
   * Type aliases so that `DeliverySemantics` and `Serializer` can be used as types (not only as
   * enumeration objects) inside this object and wherever `Config._` is imported.
   */
  type DeliverySemantics = DeliverySemantics.DeliverySemantics

  type Serializer = Serializer.Serializer

  /** Delivery semantics that can be requested for a topology. */
  object DeliverySemantics extends Enumeration {

    val ATMOST_ONCE: DeliverySemantics = new DeliverySemantics()

    val ATLEAST_ONCE: DeliverySemantics = new DeliverySemantics()

    val EFFECTIVELY_ONCE: DeliverySemantics = new DeliverySemantics()

    class DeliverySemantics extends Val

    implicit def convertValue(v: Value): DeliverySemantics =
      v.asInstanceOf[DeliverySemantics]

  }

  /** Serializers that can be selected for streamlet elements. */
  object Serializer extends Enumeration {

    val JAVA: Serializer = new Serializer()

    val KRYO: Serializer = new Serializer()

    class Serializer extends Val

    implicit def convertValue(v: Value): Serializer =
      v.asInstanceOf[Serializer]

  }

  /** Values a [[Builder]] starts from before any setter is called. */
  object Defaults {

    val USE_KRYO: Boolean = true

    val CONFIG: com.twitter.heron.api.Config =
      new com.twitter.heron.api.Config()

    val CPU: Float = 1.0f

    val RAM: Long = 100 * MB

    val SEMANTICS: DeliverySemantics = DeliverySemantics.ATMOST_ONCE

    val SERIALIZER: Serializer = Serializer.KRYO

  }

  /**
   * Returns a configuration built from [[Defaults]]: 100 megabytes of RAM per container, 1.0
   * CPUs per container, at-most-once delivery semantics, and the Kryo serializer.
   */
  def defaultConfig(): Config = new Builder().build()

  /**
   * Returns a new [[Builder]] that can be used to create a configuration object for Streamlet
   * API topologies
   */
  def newBuilder(): Builder = new Builder()

  /**
   * Maps a streamlet-level delivery semantic onto the reliability mode of the underlying
   * topology configuration. Anything that is not one of the three known constants falls back
   * to at-most-once.
   */
  private def translateSemantics(semantics: DeliverySemantics)
    : com.twitter.heron.api.Config.TopologyReliabilityMode = {
    import com.twitter.heron.api.Config.{TopologyReliabilityMode => Mode}
    // The constants live inside the DeliverySemantics object, so they must be qualified to be
    // usable as stable-identifier patterns here.
    semantics match {
      case DeliverySemantics.ATMOST_ONCE      => Mode.ATMOST_ONCE
      case DeliverySemantics.ATLEAST_ONCE     => Mode.ATLEAST_ONCE
      case DeliverySemantics.EFFECTIVELY_ONCE => Mode.EFFECTIVELY_ONCE
      case _                                  => Mode.ATMOST_ONCE
    }
  }

  /**
   * Fluent builder for [[Config]]. Instances are obtained through [[Config.newBuilder]]; the
   * constructor and the collected state are visible to `Config` (object and class) only.
   */
  class Builder private[Config] () {

    // Every builder owns its own underlying config, seeded from the defaults. Sharing
    // Defaults.CONFIG directly would let one builder's settings leak into all others.
    private[Config] val config: com.twitter.heron.api.Config = {
      val fresh = new com.twitter.heron.api.Config()
      fresh.putAll(Defaults.CONFIG)
      fresh
    }

    private[Config] var cpu: Float = Defaults.CPU

    private[Config] var ram: Long = Defaults.RAM

    private[Config] var deliverySemantics: DeliverySemantics = Defaults.SEMANTICS

    private[Config] var serializer: Serializer = Defaults.SERIALIZER

    /**
     * Sets the per-container (per-instance) CPU to be used by this topology
     * @param perContainerCpu Per-container (per-instance) CPU as a float
     * @return this builder, for chaining
     */
    def setPerContainerCpu(perContainerCpu: Float): Builder = {
      this.cpu = perContainerCpu
      this
    }

    /**
     * Sets the per-container (per-instance) RAM to be used by this topology as a number of bytes
     * @param perContainerRam Per-container (per-instance) RAM in bytes, expressed as a Long.
     * @return this builder, for chaining
     */
    def setPerContainerRam(perContainerRam: Long): Builder = {
      this.ram = perContainerRam
      this
    }

    /**
     * Sets the per-container (per-instance) RAM to be used by this topology as a number of bytes
     * @param perContainerRam Per-container (per-instance) RAM in bytes, expressed as a Long.
     * @return this builder, for chaining
     */
    def setPerContainerRamInBytes(perContainerRam: Long): Builder = {
      this.ram = perContainerRam
      this
    }

    /**
     * Sets the per-container (per-instance) RAM to be used by this topology in megabytes
     * @param perContainerRamMB Per-container (per-instance) RAM in megabytes, expressed as a Long.
     * @return this builder, for chaining
     */
    def setPerContainerRamInMegabytes(perContainerRamMB: Long): Builder = {
      this.ram = perContainerRamMB * MB
      this
    }

    /**
     * Sets the per-container (per-instance) RAM to be used by this topology in gigabytes
     * @param perContainerRamGB Per-container (per-instance) RAM in gigabytes, expressed as a Long.
     * @return this builder, for chaining
     */
    def setPerContainerRamInGigabytes(perContainerRamGB: Long): Builder = {
      this.ram = perContainerRamGB * GB
      this
    }

    /**
     * Sets the number of containers to run this topology
     * @param numContainers The number of containers across which to distribute this topology
     * @return this builder, for chaining
     */
    def setNumContainers(numContainers: Int): Builder = {
      config.setNumStmgrs(numContainers)
      this
    }

    /**
     * Sets the delivery semantics of the topology
     * @param semantics The delivery semantic to be enforced
     * @return this builder, for chaining
     */
    def setDeliverySemantics(semantics: DeliverySemantics): Builder = {
      this.deliverySemantics = semantics
      config.setTopologyReliabilityMode(Config.translateSemantics(semantics))
      this
    }

    /**
     * Sets some user-defined key/value mapping
     * @param key The user-defined key
     * @param value The user-defined value
     * @return this builder, for chaining
     */
    def setUserConfig(key: String, value: AnyRef): Builder = {
      config.put(key, value)
      this
    }

    /** Registers the Kryo serializer class name on the underlying config. */
    private def useKryo(): Unit = {
      config.setSerializationClassName(classOf[KryoSerializer].getName)
    }

    /**
     * Sets the [[Serializer]] to be used by the topology (current options are Kryo and the
     * native Java serializer).
     * @param topologySerializer The data serializer to use for streamlet elements in the topology.
     * @return this builder, for chaining
     */
    def setSerializer(topologySerializer: Serializer): Builder = {
      this.serializer = topologySerializer
      this
    }

    /**
     * Builds the [[Config]]. The Kryo serializer class is only registered on the underlying
     * config when Kryo is the selected serializer at this point.
     */
    def build(): Config = {
      if (serializer == Serializer.KRYO) {
        useKryo()
      }
      new Config(this)
    }

  }

}

/**
 * Holds the settings a user selected for running a topology: delivery semantics, the
 * per-container resources, the serializer, and the underlying topology configuration that
 * carries user-defined key/value pairs. Instances are created from a `Builder`.
 */
@SerialVersionUID(6204498077403076352L)
class Config private (builder: Builder) extends Serializable {

  private val cpu: Float = builder.cpu

  private val ram: Long = builder.ram

  @BeanProperty
  val deliverySemantics: DeliverySemantics = builder.deliverySemantics

  @BeanProperty
  val serializer: Serializer = builder.serializer

  @BeanProperty
  var heronConfig: com.twitter.heron.api.Config = builder.config

  /**
   * CPU assigned to each topology container.
   * @return the per-container CPU as a float
   */
  def getPerContainerCpu(): Float = cpu

  /**
   * RAM assigned to each topology container, in bytes.
   * @return the per-container RAM in bytes
   */
  def getPerContainerRam(): Long = ram

  /**
   * RAM assigned to each topology container, rounded to the nearest whole gigabyte.
   * @return the per-container RAM in gigabytes
   */
  def getPerContainerRamAsGigabytes(): Long = roundedRamIn(GB)

  /**
   * RAM assigned to each topology container, rounded to the nearest whole megabyte.
   * @return the per-container RAM in megabytes
   */
  def getPerContainerRamAsMegabytes(): Long = roundedRamIn(MB)

  /**
   * RAM assigned to each topology container, in bytes (same value as `getPerContainerRam`).
   * @return the per-container RAM in bytes
   */
  def getPerContainerRamAsBytes(): Long = getPerContainerRam()

  /** Expresses the stored byte count in multiples of `unitSize`, rounded to the nearest Long. */
  private def roundedRamIn(unitSize: Long): Long = Math.round(ram.toDouble / unitSize)

}

Loading