Skip to content

Commit

Permalink
Add query execution metrics (#799)
Browse files Browse the repository at this point in the history
* Add query execution metrics

Signed-off-by: Tomoyuki Morita <[email protected]>

* Remove batch.processingTime metrics

Signed-off-by: Tomoyuki Morita <[email protected]>

---------

Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 authored Oct 23, 2024
1 parent 0b6da30 commit 7546f58
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public final class MetricConstants {
*/
public static final String STREAMING_HEARTBEAT_FAILED_METRIC = "streaming.heartbeat.failed.count";

/**
* Metric for tracking the latency of query execution (start to complete query execution) excluding result write.
*/
public static final String QUERY_EXECUTION_TIME_METRIC = "query.execution.processingTime";

private MetricConstants() {
// Private constructor to prevent instantiation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public static Long stopTimer(Timer.Context context) {
return context != null ? context.stop() : null;
}

public static Timer getTimer(String metricName, boolean isIndexMetric) {
return getOrCreateTimer(metricName, isIndexMetric);
}

/**
* Registers a gauge metric with the provided name and value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.FlintIndexMetricSource;
Expand Down Expand Up @@ -101,6 +103,34 @@ private void testStartStopTimerHelper(boolean isIndexMetric) {
}
}

@Test
public void testGetTimer() {
try (MockedStatic<SparkEnv> sparkEnvMock = mockStatic(SparkEnv.class)) {
// Mock SparkEnv
SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS);
sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv);

// Mock appropriate MetricSource
String sourceName = FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME();
Source metricSource = Mockito.spy(new FlintIndexMetricSource());
when(sparkEnv.metricsSystem().getSourcesByName(sourceName).head()).thenReturn(
metricSource);

// Test the methods
String testMetric = "testPrefix.processingTime";
long duration = 500;
MetricsUtil.getTimer(testMetric, true).update(duration, TimeUnit.MILLISECONDS);

// Verify interactions
verify(sparkEnv.metricsSystem(), times(0)).registerSource(any());
verify(metricSource, times(1)).metricRegistry();
Timer timer = metricSource.metricRegistry().getTimers().get(testMetric);
Assertions.assertNotNull(timer);
Assertions.assertEquals(1L, timer.getCount());
assertEquals(Duration.ofMillis(duration).getNano(), timer.getSnapshot().getMean(), 0.1);
}
}

@Test
public void testRegisterGauge() {
testRegisterGaugeHelper(false);
Expand Down Expand Up @@ -169,4 +199,4 @@ public void testDefaultBehavior() {
Assertions.assertNotNull(flintMetricSource.metricRegistry().getGauges().get(testGaugeMetric));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.apache.spark.sql

import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{ExecutionContext, Future, TimeoutException}
Expand All @@ -14,7 +14,7 @@ import scala.util.{Failure, Success, Try}

import org.opensearch.flint.common.model.FlintStatement
import org.opensearch.flint.common.scheduler.model.LangType
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
import org.opensearch.flint.spark.FlintSpark

Expand Down Expand Up @@ -136,6 +136,8 @@ case class JobOperator(
"",
startTime))
} finally {
emitQueryExecutionTimeMetric(startTime)

try {
dataToWrite.foreach(df => writeDataFrameToOpensearch(df, resultIndex, osClient))
} catch {
Expand All @@ -148,11 +150,14 @@ case class JobOperator(
statement.error = Some(error)
statementExecutionManager.updateStatement(statement)

cleanUpResources(exceptionThrown, threadPool)
cleanUpResources(exceptionThrown, threadPool, startTime)
}
}

def cleanUpResources(exceptionThrown: Boolean, threadPool: ThreadPoolExecutor): Unit = {
def cleanUpResources(
exceptionThrown: Boolean,
threadPool: ThreadPoolExecutor,
startTime: Long): Unit = {
val isStreaming = jobType.equalsIgnoreCase(FlintJobType.STREAMING)
try {
// Wait for streaming job complete if no error
Expand Down Expand Up @@ -195,6 +200,12 @@ case class JobOperator(
}
}

private def emitQueryExecutionTimeMetric(startTime: Long): Unit = {
MetricsUtil
.getTimer(MetricConstants.QUERY_EXECUTION_TIME_METRIC, false)
.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
}

def stop(): Unit = {
Try {
logInfo("Stopping Spark session")
Expand Down

0 comments on commit 7546f58

Please sign in to comment.