Add KUBERNETES_NAMESPACE, rename pod name to KUBERNETES_POD_NAME
EnricoMi committed Jul 30, 2024
1 parent e5d73ca commit 45c3837
Showing 3 changed files with 16 additions and 4 deletions.
docs/running-on-kubernetes.md (3 changes: 2 additions & 1 deletion)
@@ -1673,7 +1673,8 @@ See the [configuration page](configuration.html) for information on Spark config
   <li><code>APP_ID</code>: The unique application id</li>
   <li><code>EXECUTOR_ID</code>: The executor id (a positive integer larger than zero)</li>
   <li><code>HOSTNAME</code>: The name of the host where the executor runs</li>
-  <li><code>POD_NAME</code>: The name of the pod that contains the executor</li>
+  <li><code>KUBERNETES_NAMESPACE</code>: The namespace where the executor pods run</li>
+  <li><code>KUBERNETES_POD_NAME</code>: The name of the pod that contains the executor</li>
   <li><code>FILE_NAME</code>: The name of the log, which is always <code>"log"</code></li>
   </ul>
   Please note that this configuration also replaces original log urls in event log,
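The attribute names above are placeholders for the executor log URL pattern. As a minimal sketch, assuming this docs entry belongs to the spark.ui.custom.executor.log.url option and using a made-up log-aggregation endpoint, a pattern using the new attributes could be set like this:

  import org.apache.spark.SparkConf

  // Sketch only: the endpoint and namespace value are hypothetical examples.
  val conf = new SparkConf()
    .set("spark.kubernetes.namespace", "my-namespace")
    .set("spark.ui.custom.executor.log.url",
      "https://logs.example.com/{{KUBERNETES_NAMESPACE}}/{{KUBERNETES_POD_NAME}}/{{FILE_NAME}}")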
@@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_NAMESPACE
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.executor.CoarseGrainedExecutorBackend
 import org.apache.spark.internal.Logging
@@ -62,7 +63,8 @@ private[spark] class KubernetesExecutorBackend(
       "APP_ID" -> appId,
       "EXECUTOR_ID" -> executorId,
       "HOSTNAME" -> hostname,
-      "POD_NAME" -> podName
+      "KUBERNETES_NAMESPACE" -> env.conf.get(KUBERNETES_NAMESPACE),
+      "KUBERNETES_POD_NAME" -> podName
     )
   }
 
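The map built above is what the executor reports as its log-URL attributes. The helper below is a hypothetical illustration, not Spark's actual implementation, of how such {{NAME}} tokens could be substituted into a URL pattern; the endpoint and values are made up:

  object LogUrlSketch {
    // Replace each {{KEY}} token in the pattern with the matching attribute value.
    def substituteLogUrl(pattern: String, attributes: Map[String, String]): String =
      attributes.foldLeft(pattern) { case (url, (key, value)) =>
        url.replace(s"{{$key}}", value)
      }

    def main(args: Array[String]): Unit = {
      val url = substituteLogUrl(
        "https://logs.example.com/{{KUBERNETES_NAMESPACE}}/{{KUBERNETES_POD_NAME}}/{{FILE_NAME}}",
        Map("KUBERNETES_NAMESPACE" -> "default", "KUBERNETES_POD_NAME" -> "pod-1", "FILE_NAME" -> "log"))
      println(url) // https://logs.example.com/default/pod-1/log
    }
  }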
@@ -17,15 +17,18 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import org.mockito.Mockito.mock
+import org.mockito.Mockito.when
 
-import org.apache.spark.{SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
 
 class KubernetesExecutorBackendSuite extends SparkFunSuite {
   test("extract log urls and attributes") {
     val mockRpcEnv = mock(classOf[RpcEnv])
     val mockSparkEnv = mock(classOf[SparkEnv])
+    val conf = new SparkConf()
+    when(mockSparkEnv.conf).thenReturn(conf)
     val mockResourceProfile = mock(classOf[ResourceProfile])
 
     val backend = new KubernetesExecutorBackend(
@@ -37,7 +40,8 @@ class KubernetesExecutorBackendSuite extends SparkFunSuite {
       "APP_ID" -> "app-id",
       "EXECUTOR_ID" -> "executor-id",
       "HOSTNAME" -> "hostname",
-      "POD_NAME" -> "pod-name"
+      "KUBERNETES_NAMESPACE" -> "default",
+      "KUBERNETES_POD_NAME" -> "pod-name"
     )
 
     assert(backend.extractLogUrls === Map.empty)
@@ -61,6 +65,11 @@ class KubernetesExecutorBackendSuite extends SparkFunSuite {
         "ENV_1" -> "val1", "ENV_2" -> "val2"
       ) ++ expectedKubernetesAttributes)
     }
+
+    conf.set("spark.kubernetes.namespace", "my-namespace")
+    assert(backend.extractLogUrls === Map.empty)
+    assert(backend.extractAttributes === expectedKubernetesAttributes ++
+      Map("KUBERNETES_NAMESPACE" -> "my-namespace"))
   }
 
   private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {