Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1743] [FOLLOWUP] Introduces a configuration option to determine whether application metrics should be included #2964

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)
// Whether application-level metrics are enabled, i.e. the value of METRICS_APP_ENABLED.
def metricsAppEnabled: Boolean = get(METRICS_APP_ENABLED)

// //////////////////////////////////////////////////////
// Quota //
Expand Down Expand Up @@ -5340,6 +5341,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)

// Boolean switch for application-level metrics, enabled by default. It is exposed through
// `metricsAppEnabled`; when it is false, AbstractSource skips registering gauges and counters
// that are flagged as application metrics.
val METRICS_APP_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.metrics.application.enabled")
.categories("metrics")
.doc("When false, the metrics of application won't return to reduce the num of metrics.")
.version("0.6.0")
.booleanConf
.createWithDefault(true)

val IDENTITY_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.identity.provider")
.withAlternative("celeborn.quota.identity.provider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,18 @@ import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils}
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._

case class NamedCounter(name: String, counter: Counter, labels: Map[String, String])
/**
 * A counter together with its metric name and labels.
 *
 * @param name metric name of the counter
 * @param counter the underlying counter
 * @param labels labels attached to the counter
 * @param isApp whether the counter was registered as an application-level metric;
 *              defaults to false
 */
case class NamedCounter(
name: String,
counter: Counter,
labels: Map[String, String],
isApp: Boolean = false)
extends MetricLabels

case class NamedGauge[T](name: String, gauge: Gauge[T], labels: Map[String, String])
/**
 * A gauge together with its metric name and labels.
 *
 * @param name metric name of the gauge
 * @param gauge the underlying gauge
 * @param labels labels attached to the gauge
 * @param isApp whether the gauge was registered as an application-level metric;
 *              defaults to false
 * @tparam T type of the value reported by the gauge
 */
case class NamedGauge[T](
name: String,
gauge: Gauge[T],
labels: Map[String, String],
isApp: Boolean = false)
extends MetricLabels

case class NamedMeter(name: String, meter: Meter, labels: Map[String, String])
Expand Down Expand Up @@ -76,6 +84,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
val staticLabels: Map[String, String] = conf.metricsExtraLabels + roleLabel ++ instanceLabel
val staticLabelsString: String = MetricLabels.labelString(staticLabels)

val metricsAppEnabled: Boolean = conf.metricsAppEnabled

val applicationLabel = "applicationId"

val timerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]()
Expand All @@ -101,30 +111,44 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
def addGauge[T](
name: String,
labels: Map[String, String],
gauge: Gauge[T]): Unit = {
// filter out non-number type gauges
if (gauge.getValue.isInstanceOf[Number]) {
namedGauges.putIfAbsent(
metricNameWithCustomizedLabels(name, labels),
NamedGauge(name, gauge, labels ++ staticLabels))
} else {
logWarning(
s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number")
gauge: Gauge[T],
isAppMetrics: Boolean): Unit = {
if (metricsAppEnabled || (!metricsAppEnabled && !isAppMetrics)) {
// filter out non-number type gauges
if (gauge.getValue.isInstanceOf[Number]) {
namedGauges.putIfAbsent(
metricNameWithCustomizedLabels(name, labels),
NamedGauge(name, gauge, labels ++ staticLabels, isAppMetrics))
} else {
logWarning(
s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number")
}
}
}

/**
 * Registers a gauge that is not flagged as an application metric.
 *
 * Delegates to the four-argument overload with `isAppMetrics` set to false.
 *
 * @param name metric name of the gauge
 * @param labels custom labels of the gauge
 * @param gauge the gauge to register
 */
def addGauge[T](name: String, labels: Map[String, String], gauge: Gauge[T]): Unit =
  addGauge(name, labels, gauge, isAppMetrics = false)

// Java-friendly overload: converts the Java map of labels into an immutable Scala Map and
// delegates to the overload that takes Scala labels.
def addGauge[T](
name: String,
labels: JMap[String, String],
gauge: Gauge[T]): Unit = {
addGauge(name, labels.asScala.toMap, gauge)
}

def addGauge[T](name: String, labels: Map[String, String] = Map.empty)(f: () => T): Unit = {
def addGauge[T](
name: String,
labels: Map[String, String] = Map.empty,
isAppMetrics: Boolean = false)(f: () => T): Unit = {
addGauge(
name,
labels,
metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new GaugeSupplier[T](f)))
metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new GaugeSupplier[T](f)),
isAppMetrics)
}

def addGauge[T](name: String, gauge: Gauge[T]): Unit = {
Expand Down Expand Up @@ -176,11 +200,17 @@ abstract class AbstractSource(conf: CelebornConf, role: String)

// Registers a counter without custom labels by delegating with an empty label map.
def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String])

def addCounter(name: String, labels: Map[String, String]): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
namedCounters.putIfAbsent(
metricNameWithLabel,
NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels))
/**
 * Registers a counter for the given name and custom labels via `putIfAbsent`.
 *
 * Nothing is registered when the counter is flagged as an application metric while
 * application metrics are disabled.
 *
 * @param name metric name of the counter
 * @param labels custom labels; the source's static labels are appended to them
 * @param isAppMetrics whether the counter is an application-level metric
 */
def addCounter(name: String, labels: Map[String, String], isAppMetrics: Boolean = false): Unit = {
  // Same truth table as `enabled || (!enabled && !isAppMetrics)`.
  val shouldRegister = metricsAppEnabled || !isAppMetrics
  if (shouldRegister) {
    val registryKey = metricNameWithCustomizedLabels(name, labels)
    val namedCounter = NamedCounter(
      name,
      metricRegistry.counter(registryKey),
      labels ++ staticLabels,
      isAppMetrics)
    namedCounters.putIfAbsent(registryKey, namedCounter)
  }
}

def counters(): List[NamedCounter] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,24 @@ import org.apache.celeborn.common.CelebornConf

class CelebornSourceSuite extends CelebornFunSuite {

test("test getMetrics with customized label") {
val conf = new CelebornConf()
createAbstractSourceAndCheck(conf, "", Role.MASTER)
createAbstractSourceAndCheck(conf, "", Role.WORKER)
}

def createAbstractSourceAndCheck(
def createAbstractSource(
conf: CelebornConf,
extraLabels: String,
role: String = "mock"): Unit = {
role: String = "mock"): (String, List[String]) = {
val mockSource = new AbstractSource(conf, role) {
override def sourceName: String = "mockSource"
}
val user1 = Map("user" -> "user1")
val user2 = Map("user" -> "user2")
val user3 = Map("user" -> "user3")
mockSource.addGauge("Gauge1") { () => 1000 }
mockSource.addGauge("Gauge2", user1) { () => 2000 }
mockSource.addCounter("Counter1")
mockSource.addCounter("Counter2", user2)
mockSource.addGauge("Gauge2", user1, true) { () => 2000 }
mockSource.addCounter("Counter1", Map.empty[String, String], true)
mockSource.addCounter("Counter2", user2, true)
// test operation with and without label
mockSource.incCounter("Counter1", 3000)
mockSource.incCounter("Counter2", 4000, user2)
mockSource.incCounter("Counter2", -4000, user2)
mockSource.addTimer("Timer1")
mockSource.addTimer("Timer2", user3)
// ditto
Expand All @@ -54,6 +49,8 @@ class CelebornSourceSuite extends CelebornFunSuite {
mockSource.stopTimer("Timer1", "key1")
mockSource.stopTimer("Timer2", "key2", user3)

mockSource.timerMetrics.add("testTimerMetricsMap")

val res = mockSource.getMetrics()
var extraLabelsStr = extraLabels
if (extraLabels.nonEmpty) {
Expand All @@ -66,37 +63,86 @@ class CelebornSourceSuite extends CelebornFunSuite {
s"""metrics_Gauge2_Value{${extraLabelsStr}${instanceLabelStr}role="$role",user="user1"} 2000"""
val exp3 = s"""metrics_Counter1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 3000"""
val exp4 =
s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 4000"""
s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 0"""
val exp5 = s"""metrics_Timer1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 1"""
val exp6 =
s"""metrics_Timer2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user3"} 1"""
val exp7 = "testTimerMetricsMap"

assert(res.contains(exp1))
assert(res.contains(exp2))
assert(res.contains(exp3))
assert(res.contains(exp4))
assert(res.contains(exp5))
assert(res.contains(exp6))
val expList = List[String](exp1, exp2, exp3, exp4, exp5, exp6, exp7)
(res, expList)
}

/**
 * Asserts that the rendered metrics output contains every expected fragment.
 *
 * @param res metrics output to inspect
 * @param labelList fragments that must each occur in `res`
 */
def checkMetricsRes(res: String, labelList: List[String]): Unit = {
  for (expected <- labelList) {
    assert(res.contains(expected))
  }
}

test("test getMetrics with customized label by conf") {
val conf = new CelebornConf()
val (resM, expsM) = createAbstractSource(conf, "", Role.MASTER)
checkMetricsRes(resM, expsM)
val (resW, expsW) = createAbstractSource(conf, "", Role.WORKER)
checkMetricsRes(resW, expsW)

// labels are well-formed
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=v2,l3=v3")
val extraLabels = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels)
val (res, exps) = createAbstractSource(conf, extraLabels)
checkMetricsRes(res, exps)

// a label's key-value pair is malformed
assertThrows[IllegalArgumentException] {
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=")
val extraLabels2 = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels2)
val (res2, exps2) = createAbstractSource(conf, extraLabels2)
checkMetricsRes(res2, exps2)
}

// there are spaces in labels
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, " l1 = v1, l2 =v2 ,l3 =v3 ")
val extraLabels3 = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels3)
val (res3, exps3) = createAbstractSource(conf, extraLabels3)
checkMetricsRes(res3, exps3)
}

// Checks which expected metric lines show up in the output when application metrics are
// switched off, and that all of them show up once the switch is turned back on.
test("test getMetrics with full capacity and isAppEnable false") {
  val conf = new CelebornConf()

  // Application metrics disabled: the entries registered with the app flag must be missing.
  conf.set(CelebornConf.METRICS_APP_ENABLED.key, "false")
  conf.set(CelebornConf.METRICS_CAPACITY.key, "7")
  val (disabledRes, disabledExps) = createAbstractSource(conf, "")
  val expectedPresent = List(0, 4, 5, 6).map(disabledExps)
  val expectedAbsent = List(1, 2, 3).map(disabledExps)
  expectedPresent.foreach { exp =>
    assert(disabledRes.contains(exp))
  }
  expectedAbsent.foreach { exp =>
    assert(!disabledRes.contains(exp))
  }

  // Application metrics enabled: every expected entry must be present.
  conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true")
  conf.set(CelebornConf.METRICS_CAPACITY.key, "7")
  val (enabledRes, enabledExps) = createAbstractSource(conf, "")
  checkMetricsRes(enabledRes, enabledExps)
}

// Checks that a timer metric entry appears only in the getMetrics() output produced right
// after it was added, and not in the following one.
test("test getAndClearTimerMetrics in timerMetrics") {
  val conf = new CelebornConf()
  conf.set(CelebornConf.METRICS_CAPACITY.key, "6")
  val source = new AbstractSource(conf, "mock") {
    override def sourceName: String = "mockSource"
  }
  val firstEntry = "testTimerMetrics1"
  val secondEntry = "testTimerMetrics2"

  source.timerMetrics.add(firstEntry)
  val firstOutput = source.getMetrics()
  source.timerMetrics.add(secondEntry)
  val secondOutput = source.getMetrics()

  // The first output carries only the first entry.
  assert(firstOutput.contains(firstEntry))
  assert(!firstOutput.contains(secondEntry))
  // The second output carries only the second entry.
  assert(secondOutput.contains(secondEntry))
  assert(!secondOutput.contains(firstEntry))
}
}
1 change: 1 addition & 0 deletions docs/configuration/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ license: |
<!--begin-include-->
| Key | Default | isDynamic | Description | Since | Deprecated |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.metrics.application.enabled | true | false | When false, the metrics of application won't return to reduce the num of metrics. | 0.6.0 | |
| celeborn.metrics.capacity | 4096 | false | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 | |
| celeborn.metrics.collectPerfCritical.enabled | false | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 | |
| celeborn.metrics.conf | &lt;undefined&gt; | false | Custom metrics configuration file path. Default use `metrics.properties` in classpath. | 0.3.0 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,23 +703,27 @@ private[celeborn] class Worker(
resourceConsumptionLabel += (resourceConsumptionSource.applicationLabel -> applicationId)
resourceConsumptionSource.addGauge(
ResourceConsumptionSource.DISK_FILE_COUNT,
resourceConsumptionLabel) { () =>
resourceConsumptionLabel,
true) { () =>
computeResourceConsumption(userIdentifier, resourceConsumption).diskFileCount
}
resourceConsumptionSource.addGauge(
ResourceConsumptionSource.DISK_BYTES_WRITTEN,
resourceConsumptionLabel) { () =>
resourceConsumptionLabel,
true) { () =>
computeResourceConsumption(userIdentifier, resourceConsumption).diskBytesWritten
}
if (hasHDFSStorage) {
resourceConsumptionSource.addGauge(
ResourceConsumptionSource.HDFS_FILE_COUNT,
resourceConsumptionLabel) { () =>
resourceConsumptionLabel,
true) { () =>
computeResourceConsumption(userIdentifier, resourceConsumption).hdfsFileCount
}
resourceConsumptionSource.addGauge(
ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
resourceConsumptionLabel) { () =>
resourceConsumptionLabel,
true) { () =>
computeResourceConsumption(userIdentifier, resourceConsumption).hdfsBytesWritten
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER)
val applicationIds = appActiveConnections.get(client.getChannel.id().asLongText())
val applicationId = Utils.splitShuffleKey(shuffleKey)._1
if (applicationIds != null && !applicationIds.contains(applicationId)) {
addCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> applicationId))
addCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> applicationId), true)
incCounter(ACTIVE_CONNECTION_COUNT, 1, Map(applicationLabel -> applicationId))
applicationIds.add(applicationId)
}
Expand Down
Loading