
Commit

automatic formatting with scalafmt
barend-xebia committed Dec 3, 2024
1 parent 9f23a1a commit c8432cd
Showing 14 changed files with 146 additions and 113 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
@@ -32,6 +32,10 @@ jobs:
shell: bash
run: |
sbt +test
- name: Verify formatting
shell: bash
run: |
sbt +scalafmtCheckAll
- name: Verify assembly
shell: bash
run: |
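For local use, the same task family applies: `sbt scalafmtAll` rewrites the sources in place, while `scalafmtCheckAll` only verifies them and fails the build on unformatted files; the `+` prefix runs the task across all cross-built Scala versions, matching the `sbt +test` step above.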
12 changes: 12 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,12 @@
# https://central.sonatype.com/artifact/org.scalameta/scalafmt-core_2.13/versions
version = 3.8.4-RC3
runner.dialect = scala213

preset = IntelliJ
maxColumn = 120
lineEndings = unix

rewrite {
rules = ["imports"]
imports.sort = scalastyle
}
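
The `rewrite.rules = ["imports"]` block is what produces the import reordering visible later in this diff. A minimal before/after illustration, taken from the TelemetrySparkListener.scala hunk below:

// Before the rewrite: selectors in arbitrary order.
import io.opentelemetry.api.common.{Attributes, AttributeKey}

// After scalafmt applies imports.sort = scalastyle: selectors sorted.
import io.opentelemetry.api.common.{AttributeKey, Attributes}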
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -1,3 +1,4 @@
addSbtPlugin("com.github.sbt" % "sbt-dynver" % "5.0.1")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.2.0")
addSbtPlugin("com.eed3si9n" % "sbt-projectmatrix" % "0.10.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
SdkProvider.scala
@@ -7,24 +7,22 @@ import org.slf4j.LoggerFactory

import java.util.concurrent.TimeUnit


/**
* Uses OpenTelemetry Autoconfigure to build an OpenTelemetry SDK.
*
* Any SparkConf properties that start with `spark.otel` (such as `spark.otel.service.name`) are exposed as JVM system
* properties (sans `spark.` prefix). This allows otel configuration (see link below) to be included as `--conf` args
* to spark-submit.
*
* To configure the autoconf SDK, see [[https://opentelemetry.io/docs/languages/java/configuration/]]. If you're on
* Kubernetes, have a look at the OpenTelemetry Operator.
*/
/** Uses OpenTelemetry Autoconfigure to build an OpenTelemetry SDK.
*
* Any SparkConf properties that start with `spark.otel` (such as `spark.otel.service.name`) are exposed as JVM system
* properties (sans `spark.` prefix). This allows otel configuration (see link below) to be included as `--conf` args
* to spark-submit.
*
* To configure the autoconf SDK, see [[https://opentelemetry.io/docs/languages/java/configuration/]]. If you're on
* Kubernetes, have a look at the OpenTelemetry Operator.
*/
class SdkProvider extends OpenTelemetrySdkProvider {
private val logger = LoggerFactory.getLogger(classOf[SdkProvider])

override def get(config: Map[String, String]): OpenTelemetrySdk = {
logger.info("Using AutoConfigured OpenTelemetry SDK.")
config.foreach {
case (k,v) if k.startsWith("spark.otel") =>
case (k, v) if k.startsWith("spark.otel") =>
val otelProperty = k.substring(6)
sys.props.get(otelProperty) match {
case Some(old) =>
@@ -43,7 +41,7 @@ class SdkProvider extends OpenTelemetrySdkProvider {
completion.whenComplete(() => {
completion.getFailureThrowable match {
case e: Throwable => logger.warn(s"OpenTelemetry SDK shut down with Exception: ${e.toString}", e)
case _ => logger.info("OpenTelemetry SDK shut down successfully.")
case _ => logger.info("OpenTelemetry SDK shut down successfully.")
}
})
completion.join(1, TimeUnit.MINUTES)
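
The `k.substring(6)` call above strips the six-character `spark.` prefix, so `spark.otel.service.name` becomes the system property `otel.service.name`. A self-contained sketch of that relay (hypothetical values; not the actual SdkProvider code):

object PropertyRelaySketch extends App {
  // Hypothetical SparkConf contents passed to the provider.
  val conf = Map(
    "spark.otel.service.name" -> "my-spark-job",
    "spark.master"            -> "local[*]" // ignored: lacks the spark.otel prefix
  )
  conf.foreach {
    case (k, v) if k.startsWith("spark.otel") =>
      sys.props.update(k.substring(6), v) // drop "spark.", keep "otel.service.name"
    case _ => // non-otel settings are left untouched
  }
  println(sys.props("otel.service.name")) // prints: my-spark-job
}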
AutoconfiguredOpenTelemetrySdkProviderTest.scala
@@ -18,7 +18,5 @@ class AutoconfiguredOpenTelemetrySdkProviderTest extends AnyFlatSpec with should
}

private[this] class TestOpenTelemetrySupport extends OpenTelemetrySupport {
override def spotConfig: Map[String, String] = Map(
"spark.otel.service.name" -> "this is a test"
)
override def spotConfig: Map[String, String] = Map("spark.otel.service.name" -> "this is a test")
}
GetContextFromConfig.scala
@@ -5,9 +5,8 @@ import io.opentelemetry.context.propagation.TextMapGetter
import java.lang
import scala.collection.JavaConverters._

/**
* Bridges between Spark config and OpenTelemetry's context propagator system.
*/
/** Bridges between Spark config and OpenTelemetry's context propagator system.
*/
class GetContextFromConfig extends TextMapGetter[Map[String, String]] {
override def keys(carrier: Map[String, String]): lang.Iterable[String] = carrier.keys
.filter(_.startsWith(SPOT_CONFIG_PREFIX))
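
The getter shown here is what lets an OpenTelemetry propagator read trace headers, such as a W3C `traceparent`, out of the SparkConf map. A self-contained sketch of that interaction, assuming the `spark.com.xebia.data.spot.` key prefix exercised by the tests below (a simplified stand-in, not the actual class):

import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapGetter

import java.lang
import scala.collection.JavaConverters._

object ContextExtractionSketch extends App {
  private val Prefix = "spark.com.xebia.data.spot."

  val getter = new TextMapGetter[Map[String, String]] {
    override def keys(carrier: Map[String, String]): lang.Iterable[String] =
      carrier.keys.collect { case k if k.startsWith(Prefix) => k.drop(Prefix.length) }.asJava

    override def get(carrier: Map[String, String], key: String): String =
      carrier.getOrElse(Prefix + key, null)
  }

  // A SparkConf carrying a traceparent set by some upstream scheduler.
  val conf = Map(Prefix + "traceparent" -> "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
  val extracted = W3CTraceContextPropagator.getInstance().extract(Context.root(), conf, getter)
  println(extracted) // now carries the remote span context
}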
OpenTelemetrySdkProvider.scala
@@ -2,15 +2,16 @@ package com.xebia.data.spot

import io.opentelemetry.api.OpenTelemetry

/**
* Enables spot to obtain an OpenTelemetry SDK instance.
*/
/** Enables spot to obtain an OpenTelemetry SDK instance.
*/
trait OpenTelemetrySdkProvider {
/**
* Returns an instance of [[OpenTelemetry]].
*
* @param config all SparkConf values.
* @return an instance of [[OpenTelemetry]].
*/

/** Returns an instance of [[OpenTelemetry]].
*
* @param config
* all SparkConf values.
* @return
* an instance of [[OpenTelemetry]].
*/
def get(config: Map[String, String]): OpenTelemetry
}
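
A hypothetical implementation of this trait, similar to the NoopSdkProvider in the test file at the bottom of this diff: it ignores the configuration and disables telemetry by returning the no-op instance.

import io.opentelemetry.api.OpenTelemetry

// Illustrative only: a provider that returns the no-op OpenTelemetry instance.
class DisabledSdkProvider extends OpenTelemetrySdkProvider {
  override def get(config: Map[String, String]): OpenTelemetry = OpenTelemetry.noop()
}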
OpenTelemetrySupport.scala
@@ -4,13 +4,12 @@ import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.Tracer
import org.slf4j.{Logger, LoggerFactory}

/**
* Grants access to an OpenTelemetry instance.
*
* If no configuration is provided, this attempts to load the spot.autoconf.SdkProvider, which is defined in the "spot-
* complete" subproject. If the configuration contains a value for the key 'spark.com.xebia.data.spot.sdkProvider', it
* attempts to load the class indicated by that value.
*/
/** Grants access to an OpenTelemetry instance.
*
* If no configuration is provided, this attempts to load the spot.autoconf.SdkProvider, which is defined in the "spot-
* complete" subproject. If the configuration contains a value for the key 'spark.com.xebia.data.spot.sdkProvider', it
* attempts to load the class indicated by that value.
*/
trait OpenTelemetrySupport {
import OpenTelemetrySupport.logger

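
The trait body that performs this lookup is collapsed in the diff. Based on the scaladoc alone, a plausible sketch of the provider resolution; the actual code may differ, and the no-arg constructor is an assumption:

// Sketch only: resolve the provider class named in the config, falling back
// to DEFAULT_PROVIDER_FQCN from package.scala. Assumes a public no-arg constructor.
def resolveProvider(config: Map[String, String]): OpenTelemetrySdkProvider = {
  val fqcn = config.getOrElse("spark.com.xebia.data.spot.sdkProvider", DEFAULT_PROVIDER_FQCN)
  Class.forName(fqcn).getDeclaredConstructor().newInstance().asInstanceOf[OpenTelemetrySdkProvider]
}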
TelemetrySpanAttributes.scala
@@ -4,10 +4,9 @@ import io.opentelemetry.api.common.AttributeKey

import java.lang

/**
* Precomposed attribute keys. These aren't necessary in most cases (the `setAttribute` function has an overload that
* takes a string), but I think it's useful to have them all in one overview.
*/
/** Precomposed attribute keys. These aren't necessary in most cases (the `setAttribute` function has an overload that
* takes a string), but I think it's useful to have them all in one overview.
*/
object TelemetrySpanAttributes {
import AttributeKey._
val appAttemptId: AttributeKey[String] = stringKey("spark.appAttemptId")
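
As the scaladoc notes, `setAttribute` also has a string overload; the two calls below record the same attribute (hypothetical span and value):

import io.opentelemetry.api.trace.Span

def tagSpan(span: Span): Unit = {
  span.setAttribute(TelemetrySpanAttributes.appAttemptId, "attempt-1") // precomposed, typed key
  span.setAttribute("spark.appAttemptId", "attempt-1")                 // string overload, same result
}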
113 changes: 62 additions & 51 deletions spot/src/main/scala/com/xebia/data/spot/TelemetrySparkListener.scala
@@ -1,26 +1,35 @@
package com.xebia.data.spot

import io.opentelemetry.api.common.{Attributes, AttributeKey}
import io.opentelemetry.api.common.{AttributeKey, Attributes}
import io.opentelemetry.api.trace.{Span, StatusCode}
import io.opentelemetry.context.{Context, Scope}
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted}
import org.apache.spark.scheduler.{
JobFailed,
JobSucceeded,
SparkListener,
SparkListenerApplicationEnd,
SparkListenerApplicationStart,
SparkListenerJobEnd,
SparkListenerJobStart,
SparkListenerStageCompleted
}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.mutable

/**
* A SparkListener that publishes job telemetry to OpenTelemetry.
*
* Usage:
* {{{
* spark-submit \
* --conf=spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
* com.example.MySparkJob
* }}}
*
* @param sparkConf the `SparkConf`. This is provided automatically by the Spark application as it bootstraps.
*/
/** A SparkListener that publishes job telemetry to OpenTelemetry.
*
* Usage:
* {{{
* spark-submit \
* --conf=spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
* com.example.MySparkJob
* }}}
*
* @param sparkConf
* the `SparkConf`. This is provided automatically by the Spark application as it bootstraps.
*/
class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener with OpenTelemetrySupport {
import com.xebia.data.spot.TelemetrySparkListener.{PendingContext, PendingSpan}
import com.xebia.data.spot.{TelemetrySpanAttributes => atts}
@@ -37,13 +46,15 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener wit

lazy val rootContext: PendingContext = {
logger.info(s"Find rootcontext; config is ${spotConfig}")
val rc = openTelemetry.getPropagators.getTextMapPropagator.extract(Context.root(), spotConfig, new GetContextFromConfig())
val rc =
openTelemetry.getPropagators.getTextMapPropagator.extract(Context.root(), spotConfig, new GetContextFromConfig())
val scope = rc.makeCurrent()
(rc, scope)
}

override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
val sb = tracer.spanBuilder(s"application-${event.appName}")
val sb = tracer
.spanBuilder(s"application-${event.appName}")
.setParent(rootContext._1)
.setAttribute(atts.appName, event.appName)
.setAttribute(atts.sparkUser, event.sparkUser)
@@ -68,48 +79,48 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener wit
rootContext._2.close()
}

override def onJobStart(event: SparkListenerJobStart): Unit = {
applicationSpan.foreach { case (_, parentContext, _) =>
val span = tracer.spanBuilder("job-%05d".format(event.jobId))
.setParent(parentContext)
.startSpan()
stageIdToJobId ++= event.stageIds.map(stageId => stageId -> event.jobId)
span.setAttribute("stageIds", event.stageIds.mkString(","))
val scope = span.makeCurrent()
val context = span.storeInContext(parentContext)
jobSpans += event.jobId -> (span, context, scope)
}
override def onJobStart(event: SparkListenerJobStart): Unit = {
applicationSpan.foreach { case (_, parentContext, _) =>
val span = tracer
.spanBuilder("job-%05d".format(event.jobId))
.setParent(parentContext)
.startSpan()
stageIdToJobId ++= event.stageIds.map(stageId => stageId -> event.jobId)
span.setAttribute("stageIds", event.stageIds.mkString(","))
val scope = span.makeCurrent()
val context = span.storeInContext(parentContext)
jobSpans += event.jobId -> (span, context, scope)
}
}

override def onJobEnd(event: SparkListenerJobEnd): Unit = {
jobSpans.get(event.jobId).foreach { case (span, _, scope) =>
event.jobResult match {
case JobSucceeded => span.setStatus(StatusCode.OK)
case _ =>
// For some reason, the JobFailed case class is private[spark], and we can't record the exception.
span.setStatus(StatusCode.ERROR)
}
span.setAttribute(atts.jobTime, Long.box(event.time))
span.end()
scope.close()
override def onJobEnd(event: SparkListenerJobEnd): Unit = {
jobSpans.get(event.jobId).foreach { case (span, _, scope) =>
event.jobResult match {
case JobSucceeded => span.setStatus(StatusCode.OK)
case _ =>
// For some reason, the JobFailed case class is private[spark], and we can't record the exception.
span.setStatus(StatusCode.ERROR)
}
span.setAttribute(atts.jobTime, Long.box(event.time))
span.end()
scope.close()
}
}

override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
logger.info(s"onStageCompleted: $event, parentIds=${event.stageInfo.parentIds}")
stageIdToJobId.get(event.stageInfo.stageId).map(jobSpans).foreach {
case (span, _, _) =>
val atb = Attributes.builder()
atb.put(atts.jobStageId, event.stageInfo.stageId)
atb.put(atts.jobStageName, event.stageInfo.name)
atb.put(atts.jobStageAttempt, event.stageInfo.attemptNumber())
atb.put(atts.jobStageDiskSpill, Long.box(event.stageInfo.taskMetrics.diskBytesSpilled))
atb.put(atts.jobStageMemSpill, Long.box(event.stageInfo.taskMetrics.memoryBytesSpilled))
atb.put(atts.jobStagePeakExMem, Long.box(event.stageInfo.taskMetrics.peakExecutionMemory))
event.stageInfo.failureReason.foreach { reason =>
atb.put(atts.jobStageFailureReason, reason)
}
span.addEvent("stageCompleted", atb.build())
stageIdToJobId.get(event.stageInfo.stageId).map(jobSpans).foreach { case (span, _, _) =>
val atb = Attributes.builder()
atb.put(atts.jobStageId, event.stageInfo.stageId)
atb.put(atts.jobStageName, event.stageInfo.name)
atb.put(atts.jobStageAttempt, event.stageInfo.attemptNumber())
atb.put(atts.jobStageDiskSpill, Long.box(event.stageInfo.taskMetrics.diskBytesSpilled))
atb.put(atts.jobStageMemSpill, Long.box(event.stageInfo.taskMetrics.memoryBytesSpilled))
atb.put(atts.jobStagePeakExMem, Long.box(event.stageInfo.taskMetrics.peakExecutionMemory))
event.stageInfo.failureReason.foreach { reason =>
atb.put(atts.jobStageFailureReason, reason)
}
span.addEvent("stageCompleted", atb.build())
}
stageIdToJobId -= event.stageInfo.stageId
}
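
Throughout this file the listener threads `(Span, Context, Scope)` triples through the application and job lifecycle: start a span, make it current, then on completion set the status, end the span, and close the scope. A condensed, Spark-free sketch of that OpenTelemetry pattern (names and the error-handling shape are illustrative):

import io.opentelemetry.api.trace.{StatusCode, Tracer}
import io.opentelemetry.context.Context

// Condensed lifecycle mirroring onJobStart/onJobEnd above, without the Spark types.
def traceJob(tracer: Tracer, parent: Context)(job: => Unit): Unit = {
  val span = tracer
    .spanBuilder("job-00001")
    .setParent(parent)
    .startSpan()
  val scope = span.makeCurrent() // spans started while this is open become children
  try {
    job
    span.setStatus(StatusCode.OK)
  } catch {
    case e: Throwable =>
      span.setStatus(StatusCode.ERROR)
      throw e
  } finally {
    span.end()    // records the end timestamp
    scope.close() // restores the previously-current context
  }
}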
9 changes: 5 additions & 4 deletions spot/src/main/scala/com/xebia/data/spot/package.scala
@@ -1,11 +1,12 @@
package com.xebia.data

/**
* Spot: Spark-OpenTelemetry integration.
*/
/** Spot: Spark-OpenTelemetry integration.
*/
package object spot {

/** Fully-qualified classname of the default SdkProvider implementation. This class is defined in the sibling project
* and therefore unavailable on our compile-time classpath. */
* and therefore unavailable on our compile-time classpath.
*/
val DEFAULT_PROVIDER_FQCN = "com.xebia.data.spot.autoconf.SdkProvider"

/** Common prefix for all our keys in the SparkConf. */
GetContextFromConfigTest.scala
@@ -3,20 +3,19 @@ package com.xebia.data.spot
import org.scalatest.flatspec.AnyFlatSpecLike
import scala.collection.JavaConverters._


class GetContextFromConfigTest extends AnyFlatSpecLike {
import org.scalatest.matchers.should.Matchers._

behavior of "GetContextFromConfigTest"

it should "only return keys in the spark.com.xebia.data.spot namespace, with prefix removed" in new ContextFromConfigTest {
val keys = getContextFromConfig.keys(spotConfig).asScala
keys should contain only("abc", "xyz")
keys should contain only ("abc", "xyz")
}

it should "get values by applying the spark.com.xebia.data.spot prefix" in new ContextFromConfigTest {
getContextFromConfig.get(spotConfig, "abc") should equal ("abc")
getContextFromConfig.get(spotConfig, "xyz") should equal ("xyz")
getContextFromConfig.get(spotConfig, "abc") should equal("abc")
getContextFromConfig.get(spotConfig, "xyz") should equal("xyz")
}
}

OpenTelemetrySupportTest.scala
@@ -15,7 +15,7 @@ class OpenTelemetrySupportTest extends AnyFlatSpec with should.Matchers {
}

class NoopOpenTelemetrySupport(config: (String, String)*) extends OpenTelemetrySupport {
override def spotConfig: Map[String, String] = Map(config:_*)
override def spotConfig: Map[String, String] = Map(config: _*)
}

class NoopSdkProvider extends OpenTelemetrySdkProvider {