From 1b2bf3a77554191b3974f84b8130490dac943d5f Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Sun, 19 Dec 2021 02:14:43 -0800 Subject: [PATCH] Add tagged metric support in MetricsCollector --- .../metric/ConcurrentCountMetricWithTag.java | 50 ++++++++++++ .../heron/api/metric/CountMetricWithTag.java | 81 +++++++++++++++++++ .../org/apache/heron/api/metric/IMetric.java | 11 +++ .../utils/metrics/MetricsCollector.java | 52 +++++++++--- .../metricsmgr/MetricsManagerServer.java | 3 +- heron/proto/metrics.proto | 1 + .../spi/metricsmgr/metrics/MetricsInfo.java | 23 +++++- 7 files changed, 207 insertions(+), 14 deletions(-) create mode 100644 heron/api/src/java/org/apache/heron/api/metric/ConcurrentCountMetricWithTag.java create mode 100644 heron/api/src/java/org/apache/heron/api/metric/CountMetricWithTag.java diff --git a/heron/api/src/java/org/apache/heron/api/metric/ConcurrentCountMetricWithTag.java b/heron/api/src/java/org/apache/heron/api/metric/ConcurrentCountMetricWithTag.java new file mode 100644 index 00000000000..eeb7a6e6e51 --- /dev/null +++ b/heron/api/src/java/org/apache/heron/api/metric/ConcurrentCountMetricWithTag.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.api.metric; + +import java.util.List; +import java.util.Map; + +// A thread safe count metric +public class ConcurrentCountMetricWithTag implements IMetric { + private CountMetricWithTag counter; + + public ConcurrentCountMetricWithTag() { + counter = new CountMetricWithTag(); + } + + public synchronized void incr(String... tags) { + counter.incr(tags); + } + + public synchronized void incrBy(long incrementBy, String... tags) { + counter.incrBy(incrementBy, tags); + } + + @Override + public CountMetric getValueAndReset() { + return null; // Not needed. `getTaggedMetricsAndReset` should be used instead. + } + + @Override + public Map, CountMetric> getTaggedMetricsAndReset() { + return counter.getTaggedMetricsAndReset(); + } +} diff --git a/heron/api/src/java/org/apache/heron/api/metric/CountMetricWithTag.java b/heron/api/src/java/org/apache/heron/api/metric/CountMetricWithTag.java new file mode 100644 index 00000000000..01898f8d02b --- /dev/null +++ b/heron/api/src/java/org/apache/heron/api/metric/CountMetricWithTag.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.api.metric; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CountMetricWithTag implements IMetric { + /** + * List is the list of tags. The counters are aggregated by tag lists. + * For example, ["device:ios", "endpoint:v2"] is a entirely different entry from ["device:ios"] + */ + private Map, CountMetric> taggedCounts = new HashMap<>(); + + public CountMetricWithTag() { + } + + public void incr(String... tags) { + incrBy(1, tags); + } + + /** + * Increment the metrics with optional tags. + * Tags are comma separated strings: "tagName:tagValue". + * For example: "device:ios", and "endpoint:v2". + * Normally tag names and values should have limited values, + * otherwise, like "id:192.168.0.123", the memory utililization + * and the cost of tracking metrics could increase dramatically. + * @param tags optional comma separated tags, like "device:ios", "endpoint:v2". + * Normally tag names and values should have limited values, + */ + public void incrBy(long incrementBy, String... tags) { + if (tags.length > 0) { + Arrays.sort(tags); // Sort the tags to make the key deterministic. + List tagList = Arrays.asList(tags); + incrTaggedCountBy(tagList, incrementBy); + } else { + incrTaggedCountBy(Collections.emptyList(), incrementBy); + } + } + + private void incrTaggedCountBy(List tagList, long incrementBy) { + if (!taggedCounts.containsKey(tagList)) { + taggedCounts.put(tagList, new CountMetric()); + } + taggedCounts.get(tagList).incrBy(incrementBy); + } + + @Override + public CountMetric getValueAndReset() { + return null; // Not needed. `getTaggedMetricsAndReset` should be used instead. + } + + @Override + public Map, CountMetric> getTaggedMetricsAndReset() { + Map, CountMetric> ret = taggedCounts; + taggedCounts = new HashMap<>(); + return ret; + } +} diff --git a/heron/api/src/java/org/apache/heron/api/metric/IMetric.java b/heron/api/src/java/org/apache/heron/api/metric/IMetric.java index ae26a38f49d..5d04ed503ac 100644 --- a/heron/api/src/java/org/apache/heron/api/metric/IMetric.java +++ b/heron/api/src/java/org/apache/heron/api/metric/IMetric.java @@ -19,10 +19,21 @@ package org.apache.heron.api.metric; +import java.util.List; +import java.util.Map; + /** * Interface for a metric that can be tracked * @param the type of the metric value being tracked */ public interface IMetric { T getValueAndReset(); + + /** + * Get the pairs of the metric and reset it to the identity value. + * @return a map of pairs. Return null if this function is not supported. + */ + default Map, T> getTaggedMetricsAndReset() { + return null; + } } diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java index f0b1166348e..69dee8ecb0d 100644 --- a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java +++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java @@ -110,12 +110,12 @@ public void forceGatherAllMetrics() { for (List metricNames : timeBucketToMetricNames.values()) { for (String metricName : metricNames) { - gatherOneMetric(metricName, builder); + gatherOneMetric(builder, metricName); } } metricCollectionCount.incr(); - addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, metricCollectionCount); + addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, metricCollectionCount, null); Metrics.MetricPublisherPublishMessage msg = builder.build(); @@ -124,7 +124,8 @@ public void forceGatherAllMetrics() { private void addDataToMetricPublisher(Metrics.MetricPublisherPublishMessage.Builder builder, String metricName, - Object metricValue) { + Object metricValue, + List tagList) { // Metric name is discarded if value is of type MetricsDatum or ExceptionData. if (metricValue instanceof Metrics.MetricDatum.Builder) { builder.addMetrics((Metrics.MetricDatum.Builder) metricValue); @@ -134,6 +135,11 @@ private void addDataToMetricPublisher(Metrics.MetricPublisherPublishMessage.Buil assert metricName != null; Metrics.MetricDatum.Builder d = Metrics.MetricDatum.newBuilder(); d.setName(metricName).setValue(metricValue.toString()); + if (tagList != null) { + for (String tag : tagList) { + d.addTag(tag); + } + } builder.addMetrics(d); } } @@ -149,12 +155,11 @@ private void gatherMetrics(final int timeBucketSizeInSecs) { Metrics.MetricPublisherPublishMessage.Builder builder = Metrics.MetricPublisherPublishMessage.newBuilder(); for (String metricName : timeBucketToMetricNames.get(timeBucketSizeInSecs)) { - gatherOneMetric(metricName, builder); + gatherOneMetric(builder, metricName); } metricCollectionCount.incr(); - addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, - metricCollectionCount.getValueAndReset()); + addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, metricCollectionCount.getValueAndReset(), null); Metrics.MetricPublisherPublishMessage msg = builder.build(); @@ -171,13 +176,36 @@ public void run() { } } - // Gather the value of given metricName, convert it into protobuf, + // Gather the value of given metricName, convert it into protobuf, // and add it to MetricPublisherPublishMessage builder given. @SuppressWarnings("unchecked") private void gatherOneMetric( + Metrics.MetricPublisherPublishMessage.Builder builder, + String metricName) { + IMetric metric = metrics.get(metricName); + + Map, IMetric> taggedMetrics = metric.getTaggedMetricsAndReset(); + if (taggedMetrics != null) { + // If taggedMetrics is not null, it means the metric is tagged, and + // the tags should be reported to MetricPublisher. No need to report + // the non-tagged value of the metric in this case. + for (Map.Entry, IMetric> entry : taggedMetrics.entrySet()) { + gatherOneMetricValue(builder, metricName, entry.getValue().getValueAndReset(), entry.getKey()); + } + } else { + // Regular metric without tag support. + Object metricValue = metric.getValueAndReset(); + gatherOneMetricValue(builder, metricName, metricValue, null); + } + } + + @SuppressWarnings("unchecked") + private void gatherOneMetricValue( + Metrics.MetricPublisherPublishMessage.Builder builder, String metricName, - Metrics.MetricPublisherPublishMessage.Builder builder) { - Object metricValue = metrics.get(metricName).getValueAndReset(); + Object metricValue, + List tagList) { + // Decide how to handle the metric based on type if (metricValue == null) { return; @@ -186,16 +214,16 @@ private void gatherOneMetric( for (Map.Entry entry : ((Map) metricValue).entrySet()) { if (entry.getKey() != null && entry.getValue() != null) { addDataToMetricPublisher( - builder, metricName + "/" + entry.getKey().toString(), entry.getValue()); + builder, metricName + "/" + entry.getKey().toString(), entry.getValue(), tagList); } } } else if (metricValue instanceof Collection) { int index = 0; for (Object value : (Collection) metricValue) { - addDataToMetricPublisher(builder, metricName + "/" + (index++), value); + addDataToMetricPublisher(builder, metricName + "/" + (index++), value, tagList); } } else { - addDataToMetricPublisher(builder, metricName, metricValue); + addDataToMetricPublisher(builder, metricName, metricValue, tagList); } } } diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java index c940c7fcf20..02560b9eb3a 100644 --- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java +++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/MetricsManagerServer.java @@ -267,7 +267,8 @@ private void handlePublisherPublishMessage(Metrics.MetricPublisher request, List metricsInfos = new ArrayList(message.getMetricsCount()); for (Metrics.MetricDatum metricDatum : message.getMetricsList()) { - MetricsInfo info = new MetricsInfo(metricDatum.getName(), metricDatum.getValue()); + // TODO: support tags. metricDatum.getTags() + MetricsInfo info = new MetricsInfo(metricDatum.getName(), metricDatum.getValue(), metricDatum.getTagList()); metricsInfos.add(info); } diff --git a/heron/proto/metrics.proto b/heron/proto/metrics.proto index 51dd35e9920..14723e2bcee 100644 --- a/heron/proto/metrics.proto +++ b/heron/proto/metrics.proto @@ -34,6 +34,7 @@ import "tmanager.proto"; message MetricDatum { required string name = 1; required string value = 2; + repeated string tag = 3; } message ExceptionData { diff --git a/heron/spi/src/java/org/apache/heron/spi/metricsmgr/metrics/MetricsInfo.java b/heron/spi/src/java/org/apache/heron/spi/metricsmgr/metrics/MetricsInfo.java index 5076c17ffbf..fd0958267d6 100644 --- a/heron/spi/src/java/org/apache/heron/spi/metricsmgr/metrics/MetricsInfo.java +++ b/heron/spi/src/java/org/apache/heron/spi/metricsmgr/metrics/MetricsInfo.java @@ -19,6 +19,8 @@ package org.apache.heron.spi.metricsmgr.metrics; +import java.util.List; + /** * An immutable class providing a view of MetricsInfo * The value is in type String, and IMetricsSink would determine how to parse it. @@ -26,10 +28,16 @@ public class MetricsInfo { private final String name; private final String value; + private final List tags; public MetricsInfo(String name, String value) { + this(name, value, null); + } + + public MetricsInfo(String name, String value, List tags) { this.name = name; this.value = value; + this.tags = tags; } /** @@ -50,8 +58,21 @@ public String getValue() { return value; } + /** + * Get the tags of the metric + * + * @return the tags of the metric + */ + public List getTags() { + return tags; + } + @Override public String toString() { - return String.format("%s = %s", getName(), getValue()); + if (tags == null) { + return name + "=" + value; + } else { + return name + "=" + value + " tags=" + tags; + } } }