Skip to content

Commit

Permalink
[INLONG-11320][Audit] Add a metric monitoring system for the Audit Pr…
Browse files Browse the repository at this point in the history
…oxy itself (#11359)
  • Loading branch information
doleyzi authored Oct 15, 2024
1 parent 1c21152 commit c807df3
Show file tree
Hide file tree
Showing 13 changed files with 382 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ConfigManager {

Expand Down Expand Up @@ -94,12 +95,28 @@ public Map<String, String> getProperties(String fileName) {
return null;
}

public String getValue(String key) {
public <T> T getValue(String key, T defaultValue, Function<String, T> parser) {
ConfigHolder holder = holderMap.get(DEFAULT_CONFIG_PROPERTIES);
if (holder != null) {
return holder.getHolder().get(key);
if (holder == null) {
return defaultValue;
}
return null;
Object value = holder.getHolder().get(key);
if (value == null) {
return defaultValue;
}
try {
return parser.apply((String) value);
} catch (Exception e) {
return defaultValue;
}
}

public String getValue(String key, String defaultValue) {
return getValue(key, defaultValue, Function.identity());
}

public int getValue(String key, int defaultValue) {
return getValue(key, defaultValue, Integer::parseInt);
}

private boolean updatePropertiesHolder(Map<String, String> result,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.metric;

public interface AbstractMetric {

public void report();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.config;

/**
* Config constants
*/
public class ConfigConstants {

public static final String AUDIT_PROXY_SERVER_NAME = "audit-proxy";
public static final String KEY_PROMETHEUS_PORT = "audit.proxy.prometheus.port";
public static final int DEFAULT_PROMETHEUS_PORT = 10082;
public static final String KEY_PROXY_METRIC_CLASSNAME = "audit.proxy.metric.classname";
public static final String DEFAULT_PROXY_METRIC_CLASSNAME =
"org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.metric;

public enum MetricDimension {

RECEIVE_COUNT_SUCCESS("receiveCountSuccess"),
RECEIVE_PACK_SUCCESS("receivePackSuccess"),
RECEIVE_SIZE_SUCCESS("receiveSizeSuccess"),
RECEIVE_COUNT_INVALID("receiveCountInvalid"),
RECEIVE_COUNT_EXPIRED("receiveCountExpired"),
SEND_COUNT_SUCCESS("sendCountSuccess"),
SEND_COUNT_FAILED("sendCountFailed"),
SEND_PACK_SUCCESS("sendPackSuccess"),
SEND_DURATION("sendDuration");

private final String key;

MetricDimension(String key) {
this.key = key;
}

public String getKey() {
return key;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.metric;

import lombok.Data;

import java.util.concurrent.atomic.AtomicLong;

@Data
public class MetricItem {

public static final String K_DIMENSION_KEY = "dimensionName";
private AtomicLong receiveCountSuccess = new AtomicLong(0);
private AtomicLong receivePackSuccess = new AtomicLong(0);
private AtomicLong receiveSizeSuccess = new AtomicLong(0);
private AtomicLong receiveCountInvalid = new AtomicLong(0);
private AtomicLong receiveCountExpired = new AtomicLong(0);
private AtomicLong sendCountSuccess = new AtomicLong(0);
private AtomicLong sendCountFailed = new AtomicLong(0);
public void resetAllMetrics() {
receiveCountSuccess.set(0);
receivePackSuccess.set(0);
receiveSizeSuccess.set(0);
receiveCountInvalid.set(0);
receiveCountExpired.set(0);
sendCountSuccess.set(0);
sendCountFailed.set(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.metric;

import org.apache.inlong.audit.file.ConfigManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT;
import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROXY_METRIC_CLASSNAME;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PROMETHEUS_PORT;
import static org.apache.inlong.audit.config.ConfigConstants.KEY_PROXY_METRIC_CLASSNAME;

public class MetricsManager {

private static final Logger LOGGER = LoggerFactory.getLogger(MetricsManager.class);

private static class Holder {

private static final MetricsManager INSTANCE = new MetricsManager();
}

private AbstractMetric metric;

public void init(String metricName) {
try {
ConfigManager configManager = ConfigManager.getInstance();
String metricClassName = configManager.getValue(KEY_PROXY_METRIC_CLASSNAME, DEFAULT_PROXY_METRIC_CLASSNAME);
LOGGER.info("Metric class name: {}", metricClassName);
Constructor<?> constructor = Class.forName(metricClassName)
.getDeclaredConstructor(String.class, MetricItem.class, int.class);
constructor.setAccessible(true);
metric = (AbstractMetric) constructor.newInstance(metricName, metricItem,
configManager.getValue(KEY_PROMETHEUS_PORT, DEFAULT_PROMETHEUS_PORT));

timer.scheduleWithFixedDelay(() -> {
metric.report();
metricItem.resetAllMetrics();
}, 0, 1, TimeUnit.MINUTES);
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException
| InvocationTargetException exception) {
LOGGER.error("Init metrics manager has exception: ", exception);
}
}

public static MetricsManager getInstance() {
return Holder.INSTANCE;
}

private final MetricItem metricItem = new MetricItem();
protected final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

public void addReceiveCountInvalid(long count) {
metricItem.getReceiveCountInvalid().addAndGet(count);
}

public void addReceiveCountExpired(long count) {
metricItem.getReceiveCountExpired().addAndGet(count);
}

public void addReceiveSuccess(long count, long pack, long size) {
metricItem.getReceiveCountSuccess().addAndGet(count);
metricItem.getReceivePackSuccess().addAndGet(pack);
metricItem.getReceiveSizeSuccess().addAndGet(size);
}

public void addSendSuccess(long count) {
metricItem.getSendCountSuccess().addAndGet(count);
}
public void addSendFailed(long count) {
metricItem.getSendCountFailed().addAndGet(count);
}
public void shutdown() {
timer.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.metric.prometheus;

import org.apache.inlong.audit.metric.AbstractMetric;
import org.apache.inlong.audit.metric.MetricDimension;
import org.apache.inlong.audit.metric.MetricItem;

import io.prometheus.client.Collector;
import io.prometheus.client.exporter.HTTPServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
* PrometheusMetric
*/
public class ProxyPrometheusMetric extends Collector implements AbstractMetric {

private static final Logger LOGGER = LoggerFactory.getLogger(ProxyPrometheusMetric.class);
private static final String HELP_DESCRIPTION = "help";

private final MetricItem metricItem;
private final String metricName;
private HTTPServer server;

public ProxyPrometheusMetric(String metricName, MetricItem metricItem, int prometheusPort) {
this.metricName = metricName;
this.metricItem = metricItem;
try {
server = new HTTPServer(prometheusPort);
this.register();
} catch (IOException e) {
LOGGER.error("Construct proxy prometheus metric has IOException", e);
}
}

@Override
public List<MetricFamilySamples> collect() {
List<MetricFamilySamples.Sample> samples = Arrays.asList(
createSample(MetricDimension.RECEIVE_COUNT_SUCCESS, metricItem.getReceiveCountSuccess().doubleValue()),
createSample(MetricDimension.RECEIVE_PACK_SUCCESS, metricItem.getReceivePackSuccess().doubleValue()),
createSample(MetricDimension.RECEIVE_SIZE_SUCCESS, metricItem.getReceiveSizeSuccess().doubleValue()),
createSample(MetricDimension.RECEIVE_COUNT_INVALID, metricItem.getReceiveCountInvalid().doubleValue()),
createSample(MetricDimension.RECEIVE_COUNT_EXPIRED, metricItem.getReceiveCountExpired().doubleValue()),
createSample(MetricDimension.SEND_COUNT_SUCCESS, metricItem.getSendCountSuccess().doubleValue()),
createSample(MetricDimension.SEND_COUNT_FAILED, metricItem.getSendCountFailed().doubleValue()));

MetricFamilySamples metricFamilySamples =
new MetricFamilySamples(metricName, Type.GAUGE, HELP_DESCRIPTION, samples);

return Collections.singletonList(metricFamilySamples);
}

private MetricFamilySamples.Sample createSample(MetricDimension key, double value) {
return new MetricFamilySamples.Sample(metricName, Collections.singletonList(MetricItem.K_DIMENSION_KEY),
Collections.singletonList(key.getKey()), value);
}

@Override
public void report() {
LOGGER.info("Report proxy prometheus metric: {} ", metricItem.toString());
}
}
Loading

0 comments on commit c807df3

Please sign in to comment.