Commit

make spans current in context
barend-xebia committed Nov 22, 2024
1 parent 72633ac commit e3f2934
Showing 2 changed files with 35 additions and 16 deletions.
First changed file: TelemetrySparkListener.scala
@@ -1,7 +1,7 @@
 package com.xebia.data.spot
 
 import io.opentelemetry.api.trace.{Span, StatusCode}
-import io.opentelemetry.context.Context
+import io.opentelemetry.context.{Context, Scope}
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted}
 import org.slf4j.{Logger, LoggerFactory}
@@ -21,58 +21,69 @@ import scala.collection.mutable
  * @param sparkConf the `SparkConf`. This is provided automatically by the Spark application as it bootstraps.
  */
 class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener with OpenTelemetrySupport {
+  import com.xebia.data.spot.TelemetrySparkListener.{PendingContext, PendingSpan}
+
   @transient
-  protected lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName)
+  protected val logger: Logger = LoggerFactory.getLogger(getClass.getName)
+  logger.info(s"TelemetrySparkListener starting up: ${System.identityHashCode(this)}")
 
   override def spotConfig: Map[String, String] = sparkConf.getAll.toMap
 
-  private var applicationSpan: Option[(Span, Context)] = None
-  private val jobSpans = mutable.Map[Int, (Span, Context)]()
+  private var applicationSpan: Option[PendingSpan] = None
+  private val jobSpans = mutable.Map[Int, PendingSpan]()
 
-  lazy val rootContext: Context = {
-    openTelemetry.getPropagators.getTextMapPropagator.extract(Context.root(), spotConfig, new GetContextFromConfig())
+  lazy val rootContext: PendingContext = {
+    logger.info(s"Find rootcontext; config is ${spotConfig}")
+    val rc = openTelemetry.getPropagators.getTextMapPropagator.extract(Context.root(), spotConfig, new GetContextFromConfig())
+    val scope = rc.makeCurrent()
+    (rc, scope)
   }
 
   override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
     val sb = tracer.spanBuilder(s"application-${event.appName}")
-      .setParent(rootContext)
+      .setParent(rootContext._1)
       .setAttribute(TelemetrySpanAttributes.appName, event.appName)
       .setAttribute(TelemetrySpanAttributes.sparkUser, event.sparkUser)
     event.appId.foreach(sb.setAttribute(TelemetrySpanAttributes.appId, _))
     event.appAttemptId.foreach(sb.setAttribute(TelemetrySpanAttributes.appAttemptId, _))
     val span = sb.startSpan()
-    val context = span.storeInContext(rootContext)
-    applicationSpan = Some((span, context))
+    val scope = span.makeCurrent()
+    val context = span.storeInContext(rootContext._1)
+    applicationSpan = Some((span, context, scope))
   }
 
   override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
     applicationSpan
-      .map { case (span, _) =>
+      .map { case (span, _, scope) =>
         span.end()
+        scope.close()
       }
      .orElse {
         logger.warn("Received onApplicationEnd, but found no tracing Span.")
         None
       }
+    rootContext._2.close()
   }
 
   override def onJobStart(event: SparkListenerJobStart): Unit = {
-    applicationSpan.foreach { case (_, parentContext) =>
+    applicationSpan.foreach { case (_, parentContext, _) =>
       val span = tracer.spanBuilder("job-%05d".format(event.jobId))
         .setParent(parentContext)
         .startSpan()
+      val scope = span.makeCurrent()
       val context = span.storeInContext(parentContext)
-      jobSpans += event.jobId -> (span, context)
+      jobSpans += event.jobId -> (span, context, scope)
     }
   }
 
   override def onJobEnd(event: SparkListenerJobEnd): Unit = {
-    jobSpans.get(event.jobId).foreach { case (span, _) =>
+    jobSpans.get(event.jobId).foreach { case (span, _, scope) =>
       event.jobResult match {
         case JobSucceeded => span.setStatus(StatusCode.OK)
         case _ => span.setStatus(StatusCode.ERROR)
       }
       span.end()
+      scope.close()
     }
   }
 
@@ -82,3 +93,8 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener with OpenTelemetrySupport {
     // event.stageInfo.taskMetrics.peakExecutionMemory
   }
 }
+
+object TelemetrySparkListener {
+  type PendingContext = (Context, Scope)
+  type PendingSpan = (Span, Context, Scope)
+}
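
The listener changes above follow the standard OpenTelemetry pattern: Span.makeCurrent() installs the span in the thread-local Context and returns a Scope, and every Scope must be closed again (on the same thread, in reverse order of creation) once the span should no longer be current. A minimal, self-contained sketch of that pattern, not taken from this repository (object name, tracer name and span name are invented):

    // Sketch only: the Span.makeCurrent()/Scope pattern adopted by the listener.
    import io.opentelemetry.api.GlobalOpenTelemetry
    import io.opentelemetry.api.trace.Span
    import io.opentelemetry.context.Scope

    object MakeCurrentSketch extends App {
      private val tracer = GlobalOpenTelemetry.getTracer("sketch")

      val span: Span = tracer.spanBuilder("example-work").startSpan()
      val scope: Scope = span.makeCurrent() // span becomes Span.current() on this thread
      try {
        // Work running here sees the span as current; spans started without an
        // explicit parent become its children.
      } finally {
        scope.close() // restores the previous Context
        span.end()
      }
    }

Because Spark delivers application and job start/end as separate listener events, the listener cannot wrap the work in try/finally; storing the Scope in the PendingSpan and PendingContext tuples is what lets onJobEnd, onApplicationEnd and the rootContext cleanup close each scope later.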
Second changed file (test sources):
@@ -89,9 +89,12 @@ class TestTelemetrySparkListener(extraConf: (String, String)*) {
 object TestingSdkProvider {
   private[spot] val clock: TestClock = TestClock.create()
   private[spot] val spanExporter: InMemorySpanExporter = InMemorySpanExporter.create()
-  private[spot] val testingSdk: OpenTelemetrySdk = OpenTelemetrySdk.builder().setTracerProvider(
-    SdkTracerProvider.builder().setClock(clock).setSampler(Sampler.alwaysOn()).addSpanProcessor(SimpleSpanProcessor.builder(spanExporter).build()).build()
-  ).setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())).build()
+  private[spot] val testingSdk: OpenTelemetrySdk = {
+    sys.props.put("io.opentelemetry.context.enableStrictContext", "true")
+    OpenTelemetrySdk.builder().setTracerProvider(
+      SdkTracerProvider.builder().setClock(clock).setSampler(Sampler.alwaysOn()).addSpanProcessor(SimpleSpanProcessor.builder(spanExporter).build()).build()
+    ).setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())).build()
+  }
 
   def getFinishedSpanItems: util.List[SpanData] = {
     spanExporter.flush()
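
The testing SDK now turns on io.opentelemetry.context.enableStrictContext before it builds the SDK. With that system property set, opentelemetry-java installs a strict context storage that reports Scopes that are never closed, closed on a different thread, or closed out of order — the failure modes the listener changes above are meant to rule out. A rough, stand-alone sketch of the flag in isolation (object and span names are invented; the property has to be set before the first Context or Scope is used, because the storage implementation is selected once, lazily):

    // Sketch only: strict verification of Scope handling in tests.
    import io.opentelemetry.api.GlobalOpenTelemetry
    import io.opentelemetry.context.Scope

    object StrictContextSketch extends App {
      // Must happen before any Context/Scope is touched anywhere in the JVM.
      sys.props.put("io.opentelemetry.context.enableStrictContext", "true")

      val tracer = GlobalOpenTelemetry.getTracer("sketch")
      val span = tracer.spanBuilder("strict-demo").startSpan()
      val scope: Scope = span.makeCurrent()
      // In strict mode, forgetting scope.close(), closing it on another thread, or
      // closing scopes out of order surfaces as an error instead of silently
      // corrupting the current Context.
      scope.close()
      span.end()
    }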
