From 7546f58f3e7482644ba84c482929d4bd8aac0e05 Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Wed, 23 Oct 2024 15:03:37 -0700 Subject: [PATCH] Add query execution metrics (#799) * Add query execution metrics Signed-off-by: Tomoyuki Morita * Remove batch.processingTime metrics Signed-off-by: Tomoyuki Morita --------- Signed-off-by: Tomoyuki Morita --- .../flint/core/metrics/MetricConstants.java | 5 +++ .../flint/core/metrics/MetricsUtil.java | 4 +++ .../flint/core/metrics/MetricsUtilTest.java | 32 ++++++++++++++++++- .../org/apache/spark/sql/JobOperator.scala | 19 ++++++++--- 4 files changed, 55 insertions(+), 5 deletions(-) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 4cdfcee01..3a72c1d5a 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -112,6 +112,11 @@ public final class MetricConstants { */ public static final String STREAMING_HEARTBEAT_FAILED_METRIC = "streaming.heartbeat.failed.count"; + /** + * Metric for tracking the latency of query execution (start to complete query execution) excluding result write. + */ + public static final String QUERY_EXECUTION_TIME_METRIC = "query.execution.processingTime"; + private MetricConstants() { // Private constructor to prevent instantiation } diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java index 81a482d5e..ab1207ccc 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsUtil.java @@ -107,6 +107,10 @@ public static Long stopTimer(Timer.Context context) { return context != null ? context.stop() : null; } + public static Timer getTimer(String metricName, boolean isIndexMetric) { + return getOrCreateTimer(metricName, isIndexMetric); + } + /** * Registers a gauge metric with the provided name and value. * diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java index b54269ce0..b5470b6be 100644 --- a/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java +++ b/flint-core/src/test/java/org/opensearch/flint/core/metrics/MetricsUtilTest.java @@ -3,6 +3,8 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.Timer; +import java.time.Duration; +import java.time.temporal.TemporalUnit; import org.apache.spark.SparkEnv; import org.apache.spark.metrics.source.FlintMetricSource; import org.apache.spark.metrics.source.FlintIndexMetricSource; @@ -101,6 +103,34 @@ private void testStartStopTimerHelper(boolean isIndexMetric) { } } + @Test + public void testGetTimer() { + try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { + // Mock SparkEnv + SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); + sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); + + // Mock appropriate MetricSource + String sourceName = FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME(); + Source metricSource = Mockito.spy(new FlintIndexMetricSource()); + when(sparkEnv.metricsSystem().getSourcesByName(sourceName).head()).thenReturn( + metricSource); + + // Test the methods + String testMetric = "testPrefix.processingTime"; + long duration = 500; + MetricsUtil.getTimer(testMetric, true).update(duration, TimeUnit.MILLISECONDS); + + // Verify interactions + verify(sparkEnv.metricsSystem(), times(0)).registerSource(any()); + verify(metricSource, times(1)).metricRegistry(); + Timer timer = metricSource.metricRegistry().getTimers().get(testMetric); + Assertions.assertNotNull(timer); + Assertions.assertEquals(1L, timer.getCount()); + assertEquals(Duration.ofMillis(duration).getNano(), timer.getSnapshot().getMean(), 0.1); + } + } + @Test public void testRegisterGauge() { testRegisterGaugeHelper(false); @@ -169,4 +199,4 @@ public void testDefaultBehavior() { Assertions.assertNotNull(flintMetricSource.metricRegistry().getGauges().get(testGaugeMetric)); } } -} \ No newline at end of file +} diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala index 58d868a2e..01d8cb05c 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala @@ -5,7 +5,7 @@ package org.apache.spark.sql -import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.{ThreadPoolExecutor, TimeUnit} import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.{ExecutionContext, Future, TimeoutException} @@ -14,7 +14,7 @@ import scala.util.{Failure, Success, Try} import org.opensearch.flint.common.model.FlintStatement import org.opensearch.flint.common.scheduler.model.LangType -import org.opensearch.flint.core.metrics.MetricConstants +import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter import org.opensearch.flint.spark.FlintSpark @@ -136,6 +136,8 @@ case class JobOperator( "", startTime)) } finally { + emitQueryExecutionTimeMetric(startTime) + try { dataToWrite.foreach(df => writeDataFrameToOpensearch(df, resultIndex, osClient)) } catch { @@ -148,11 +150,14 @@ case class JobOperator( statement.error = Some(error) statementExecutionManager.updateStatement(statement) - cleanUpResources(exceptionThrown, threadPool) + cleanUpResources(exceptionThrown, threadPool, startTime) } } - def cleanUpResources(exceptionThrown: Boolean, threadPool: ThreadPoolExecutor): Unit = { + def cleanUpResources( + exceptionThrown: Boolean, + threadPool: ThreadPoolExecutor, + startTime: Long): Unit = { val isStreaming = jobType.equalsIgnoreCase(FlintJobType.STREAMING) try { // Wait for streaming job complete if no error @@ -195,6 +200,12 @@ case class JobOperator( } } + private def emitQueryExecutionTimeMetric(startTime: Long): Unit = { + MetricsUtil + .getTimer(MetricConstants.QUERY_EXECUTION_TIME_METRIC, false) + .update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + def stop(): Unit = { Try { logInfo("Stopping Spark session")