From 6de734e3dcaf0614aa5f7f689de25840492fe366 Mon Sep 17 00:00:00 2001 From: Maosong Fu Date: Wed, 28 Jun 2017 17:37:50 -0700 Subject: [PATCH] Currently heron-instances exports most of metrics scoped by different streams. But a lot of customers expressed their interest on the value of all streams aggregated and for now they have to sum them up by themselves. This pull request exports metrics aggregated on differetn streams too. Tested with LocalScheduler. --- .../heron/common/utils/metrics/BoltMetrics.java | 2 +- .../common/utils/metrics/ComponentMetrics.java | 16 +++++++--------- .../common/utils/metrics/FullBoltMetrics.java | 10 +++++++++- .../common/utils/metrics/FullSpoutMetrics.java | 7 +++++++ .../heron/common/utils/metrics/SpoutMetrics.java | 2 +- 5 files changed, 25 insertions(+), 12 deletions(-) diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java index c64b29fd88f..9612ce10dc6 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/BoltMetrics.java @@ -32,7 +32,7 @@ * 4. Expose methods which could be called externally to change the value of metrics */ -public class BoltMetrics implements ComponentMetrics { +public class BoltMetrics extends ComponentMetrics { private final CountMetric ackCount; private final ReducedMetric processLatency; private final ReducedMetric failLatency; diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java index 5503d144def..192924b5d81 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/ComponentMetrics.java @@ -13,16 +13,14 @@ // limitations under the License. package com.twitter.heron.common.utils.metrics; -import com.twitter.heron.classification.InterfaceAudience; -import com.twitter.heron.classification.InterfaceStability; - /** - * Interface for common metric actions that both spouts and bolts support + * Abstract Class for common metric actions that both spouts and bolts support */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface ComponentMetrics { +public abstract class ComponentMetrics { + // Metric-name suffix reserved for value aggregating on all different streams + public static final String ALL_STREAMS_AGGREGATED = "__all-streams-aggregated"; + + public abstract void serializeDataTuple(String streamId, long latency); - void serializeDataTuple(String streamId, long latency); - void emittedTuple(String streamId); + public abstract void emittedTuple(String streamId); } diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullBoltMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullBoltMetrics.java index ee75a8529d8..3c207d72005 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullBoltMetrics.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullBoltMetrics.java @@ -54,7 +54,6 @@ public class FullBoltMetrics extends BoltMetrics { // so instance could not produce more tuples private final CountMetric outQueueFullCount; - public FullBoltMetrics() { ackCount = new MultiCountMetric(); processLatency = new MultiReducedMetric<>(new MeanReducer()); @@ -126,6 +125,8 @@ public void ackedTuple(String streamId, String sourceComponent, long latency) { ackCount.scope(streamId).incr(); processLatency.scope(streamId).update(latency); + ackCount.scope(ALL_STREAMS_AGGREGATED).incr(); + // Consider there are cases that different streams with the same streamId, // but with different source component. We need to distinguish them too. String globalStreamId = @@ -138,6 +139,8 @@ public void failedTuple(String streamId, String sourceComponent, long latency) { failCount.scope(streamId).incr(); failLatency.scope(streamId).update(latency); + failCount.scope(ALL_STREAMS_AGGREGATED).incr(); + // Consider there are cases that different streams with the same streamId, // but with different source component. We need to distinguish them too. String globalStreamId = @@ -151,6 +154,9 @@ public void executeTuple(String streamId, String sourceComponent, long latency) executeLatency.scope(streamId).update(latency); executeTimeNs.scope(streamId).incrBy(latency); + executeCount.scope(ALL_STREAMS_AGGREGATED).incr(); + executeTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency); + // Consider there are cases that different streams with the same streamId, // but with different source component. We need to distinguish them too. String globalStreamId = @@ -170,6 +176,7 @@ public void updateOutQueueFullCount() { public void deserializeDataTuple(String streamId, String sourceComponent, long latency) { deserializationTimeNs.scope(streamId).incrBy(latency); + deserializationTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency); // Consider there are cases that different streams with the same streamId, // but with different source component. We need to distinguish them too. @@ -180,6 +187,7 @@ public void deserializeDataTuple(String streamId, String sourceComponent, long l public void serializeDataTuple(String streamId, long latency) { serializationTimeNs.scope(streamId).incrBy(latency); + serializationTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency); } } diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullSpoutMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullSpoutMetrics.java index a8e176a8433..ea8346e68db 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullSpoutMetrics.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/FullSpoutMetrics.java @@ -110,19 +110,25 @@ public void initMultiCountMetrics(PhysicalPlanHelper helper) { public void ackedTuple(String streamId, long latency) { ackCount.scope(streamId).incr(); completeLatency.scope(streamId).update(latency); + + ackCount.scope(ALL_STREAMS_AGGREGATED).incr(); } public void failedTuple(String streamId, long latency) { failCount.scope(streamId).incr(); failLatency.scope(streamId).update(latency); + + failCount.scope(ALL_STREAMS_AGGREGATED).incr(); } public void timeoutTuple(String streamId) { timeoutCount.scope(streamId).incr(); + timeoutCount.scope(ALL_STREAMS_AGGREGATED).incr(); } public void emittedTuple(String streamId) { emitCount.scope(streamId).incr(); + emitCount.scope(ALL_STREAMS_AGGREGATED).incr(); } public void nextTuple(long latency) { @@ -140,6 +146,7 @@ public void updatePendingTuplesCount(long count) { public void serializeDataTuple(String streamId, long latency) { serializationTimeNs.scope(streamId).incrBy(latency); + serializationTimeNs.scope(ALL_STREAMS_AGGREGATED).incrBy(latency); } } diff --git a/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java b/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java index 0cc3061e18d..7659be5b6a1 100644 --- a/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java +++ b/heron/common/src/java/com/twitter/heron/common/utils/metrics/SpoutMetrics.java @@ -33,7 +33,7 @@ * 4. Expose methods which could be called externally to change the value of metrics */ -public class SpoutMetrics implements ComponentMetrics { +public class SpoutMetrics extends ComponentMetrics { private final CountMetric ackCount; private final ReducedMetric completeLatency; private final ReducedMetric failLatency;