diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java index 68745efd232..62a2f551cae 100644 --- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java @@ -41,6 +41,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; public class ConfigManager { @@ -94,12 +95,28 @@ public Map getProperties(String fileName) { return null; } - public String getValue(String key) { + public T getValue(String key, T defaultValue, Function parser) { ConfigHolder holder = holderMap.get(DEFAULT_CONFIG_PROPERTIES); - if (holder != null) { - return holder.getHolder().get(key); + if (holder == null) { + return defaultValue; } - return null; + Object value = holder.getHolder().get(key); + if (value == null) { + return defaultValue; + } + try { + return parser.apply((String) value); + } catch (Exception e) { + return defaultValue; + } + } + + public String getValue(String key, String defaultValue) { + return getValue(key, defaultValue, Function.identity()); + } + + public int getValue(String key, int defaultValue) { + return getValue(key, defaultValue, Integer::parseInt); } private boolean updatePropertiesHolder(Map result, diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java new file mode 100644 index 00000000000..4c2f627916e --- /dev/null +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.metric; + +public interface AbstractMetric { + + public void report(); +} diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java new file mode 100644 index 00000000000..67d5183ce58 --- /dev/null +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.config; + +/** + * Config constants + */ +public class ConfigConstants { + + public static final String AUDIT_PROXY_SERVER_NAME = "audit-proxy"; + public static final String KEY_PROMETHEUS_PORT = "audit.proxy.prometheus.port"; + public static final int DEFAULT_PROMETHEUS_PORT = 10082; + public static final String KEY_PROXY_METRIC_CLASSNAME = "audit.proxy.metric.classname"; + public static final String DEFAULT_PROXY_METRIC_CLASSNAME = + "org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric"; +} diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricDimension.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricDimension.java new file mode 100644 index 00000000000..3f410330c02 --- /dev/null +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricDimension.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.metric; + +public enum MetricDimension { + + RECEIVE_COUNT_SUCCESS("receiveCountSuccess"), + RECEIVE_PACK_SUCCESS("receivePackSuccess"), + RECEIVE_SIZE_SUCCESS("receiveSizeSuccess"), + RECEIVE_COUNT_INVALID("receiveCountInvalid"), + RECEIVE_COUNT_EXPIRED("receiveCountExpired"), + SEND_COUNT_SUCCESS("sendCountSuccess"), + SEND_COUNT_FAILED("sendCountFailed"), + SEND_PACK_SUCCESS("sendPackSuccess"), + SEND_DURATION("sendDuration"); + + private final String key; + + MetricDimension(String key) { + this.key = key; + } + + public String getKey() { + return key; + } +} diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricItem.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricItem.java new file mode 100644 index 00000000000..a95f4e3484f --- /dev/null +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricItem.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.metric; + +import lombok.Data; + +import java.util.concurrent.atomic.AtomicLong; + +@Data +public class MetricItem { + + public static final String K_DIMENSION_KEY = "dimensionName"; + private AtomicLong receiveCountSuccess = new AtomicLong(0); + private AtomicLong receivePackSuccess = new AtomicLong(0); + private AtomicLong receiveSizeSuccess = new AtomicLong(0); + private AtomicLong receiveCountInvalid = new AtomicLong(0); + private AtomicLong receiveCountExpired = new AtomicLong(0); + private AtomicLong sendCountSuccess = new AtomicLong(0); + private AtomicLong sendCountFailed = new AtomicLong(0); + public void resetAllMetrics() { + receiveCountSuccess.set(0); + receivePackSuccess.set(0); + receiveSizeSuccess.set(0); + receiveCountInvalid.set(0); + receiveCountExpired.set(0); + sendCountSuccess.set(0); + sendCountFailed.set(0); + } +} diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java new file mode 100644 index 00000000000..433fc71848a --- /dev/null +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.metric; + +import org.apache.inlong.audit.file.ConfigManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT; +import static org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROXY_METRIC_CLASSNAME; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_PROMETHEUS_PORT; +import static org.apache.inlong.audit.config.ConfigConstants.KEY_PROXY_METRIC_CLASSNAME; + +public class MetricsManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsManager.class); + + private static class Holder { + + private static final MetricsManager INSTANCE = new MetricsManager(); + } + + private AbstractMetric metric; + + public void init(String metricName) { + try { + ConfigManager configManager = ConfigManager.getInstance(); + String metricClassName = configManager.getValue(KEY_PROXY_METRIC_CLASSNAME, DEFAULT_PROXY_METRIC_CLASSNAME); + LOGGER.info("Metric class name: {}", metricClassName); + Constructor constructor = Class.forName(metricClassName) + .getDeclaredConstructor(String.class, MetricItem.class, int.class); + constructor.setAccessible(true); + metric = (AbstractMetric) constructor.newInstance(metricName, metricItem, + configManager.getValue(KEY_PROMETHEUS_PORT, DEFAULT_PROMETHEUS_PORT)); + + timer.scheduleWithFixedDelay(() -> { + metric.report(); + metricItem.resetAllMetrics(); + }, 0, 1, TimeUnit.MINUTES); + } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException + | InvocationTargetException exception) { + LOGGER.error("Init metrics manager has exception: ", exception); + } + } + + public static MetricsManager getInstance() { + return Holder.INSTANCE; + } + + private final MetricItem metricItem = new MetricItem(); + protected final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); + + public void addReceiveCountInvalid(long count) { + metricItem.getReceiveCountInvalid().addAndGet(count); + } + + public void addReceiveCountExpired(long count) { + metricItem.getReceiveCountExpired().addAndGet(count); + } + + public void addReceiveSuccess(long count, long pack, long size) { + metricItem.getReceiveCountSuccess().addAndGet(count); + metricItem.getReceivePackSuccess().addAndGet(pack); + metricItem.getReceiveSizeSuccess().addAndGet(size); + } + + public void addSendSuccess(long count) { + metricItem.getSendCountSuccess().addAndGet(count); + } + public void addSendFailed(long count) { + metricItem.getSendCountFailed().addAndGet(count); + } + public void shutdown() { + timer.shutdown(); + } +} diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java new file mode 100644 index 00000000000..07c2397743e --- /dev/null +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.metric.prometheus; + +import org.apache.inlong.audit.metric.AbstractMetric; +import org.apache.inlong.audit.metric.MetricDimension; +import org.apache.inlong.audit.metric.MetricItem; + +import io.prometheus.client.Collector; +import io.prometheus.client.exporter.HTTPServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * PrometheusMetric + */ +public class ProxyPrometheusMetric extends Collector implements AbstractMetric { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProxyPrometheusMetric.class); + private static final String HELP_DESCRIPTION = "help"; + + private final MetricItem metricItem; + private final String metricName; + private HTTPServer server; + + public ProxyPrometheusMetric(String metricName, MetricItem metricItem, int prometheusPort) { + this.metricName = metricName; + this.metricItem = metricItem; + try { + server = new HTTPServer(prometheusPort); + this.register(); + } catch (IOException e) { + LOGGER.error("Construct proxy prometheus metric has IOException", e); + } + } + + @Override + public List collect() { + List samples = Arrays.asList( + createSample(MetricDimension.RECEIVE_COUNT_SUCCESS, metricItem.getReceiveCountSuccess().doubleValue()), + createSample(MetricDimension.RECEIVE_PACK_SUCCESS, metricItem.getReceivePackSuccess().doubleValue()), + createSample(MetricDimension.RECEIVE_SIZE_SUCCESS, metricItem.getReceiveSizeSuccess().doubleValue()), + createSample(MetricDimension.RECEIVE_COUNT_INVALID, metricItem.getReceiveCountInvalid().doubleValue()), + createSample(MetricDimension.RECEIVE_COUNT_EXPIRED, metricItem.getReceiveCountExpired().doubleValue()), + createSample(MetricDimension.SEND_COUNT_SUCCESS, metricItem.getSendCountSuccess().doubleValue()), + createSample(MetricDimension.SEND_COUNT_FAILED, metricItem.getSendCountFailed().doubleValue())); + + MetricFamilySamples metricFamilySamples = + new MetricFamilySamples(metricName, Type.GAUGE, HELP_DESCRIPTION, samples); + + return Collections.singletonList(metricFamilySamples); + } + + private MetricFamilySamples.Sample createSample(MetricDimension key, double value) { + return new MetricFamilySamples.Sample(metricName, Collections.singletonList(MetricItem.K_DIMENSION_KEY), + Collections.singletonList(key.getKey()), value); + } + + @Override + public void report() { + LOGGER.info("Report proxy prometheus metric: {} ", metricItem.toString()); + } +} \ No newline at end of file diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java index 4ab7f29e515..1aa7f6b7d06 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java @@ -18,6 +18,7 @@ package org.apache.inlong.audit.node; import org.apache.inlong.audit.file.ConfigManager; +import org.apache.inlong.audit.metric.MetricsManager; import com.google.common.base.Throwables; import com.google.common.collect.Lists; @@ -58,8 +59,9 @@ import java.util.Set; import java.util.concurrent.locks.ReentrantLock; +import static org.apache.inlong.audit.config.ConfigConstants.AUDIT_PROXY_SERVER_NAME; + /** - * * Application */ public class Application { @@ -259,6 +261,7 @@ private void loadMonitoring() { /** * main + * * @param args */ public static void main(String[] args) { @@ -344,9 +347,12 @@ public static void main(String[] args) { @Override public void run() { appReference.stop(); + MetricsManager.getInstance().shutdown(); } }); + MetricsManager.getInstance().init(AUDIT_PROXY_SERVER_NAME); + } catch (Exception e) { logger.error("A fatal error occurred while running. Exception follows.", e); } diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java index dc2c7c154d7..db2a63c46d8 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java @@ -19,6 +19,7 @@ import org.apache.inlong.audit.base.HighPriorityThreadFactory; import org.apache.inlong.audit.file.ConfigManager; +import org.apache.inlong.audit.metric.MetricsManager; import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder; import org.apache.inlong.common.constant.MQType; import org.apache.inlong.common.pojo.audit.MQInfo; @@ -385,6 +386,8 @@ public void run() { } public void handleMessageSendSuccess(EventStat es) { + MetricsManager.getInstance().addSendSuccess(1); + // Statistics tube performance totalKafkaSuccSendCnt.incrementAndGet(); totalKafkaSuccSendSize.addAndGet(es.getEvent().getBody().length); @@ -494,6 +497,8 @@ private boolean sendMessage(Event event, String topic, EventStat es) { } else { logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", e.getMessage(), resendQueue.size(), es.getEvent().hashCode()); + + MetricsManager.getInstance().addSendFailed(1); } es.incRetryCnt(); diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java index a081c32cb88..c99cb9ce168 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java @@ -18,6 +18,7 @@ package org.apache.inlong.audit.sink; import org.apache.inlong.audit.base.HighPriorityThreadFactory; +import org.apache.inlong.audit.metric.MetricsManager; import org.apache.inlong.audit.sink.pulsar.CreatePulsarClientCallBack; import org.apache.inlong.audit.sink.pulsar.PulsarClientService; import org.apache.inlong.audit.sink.pulsar.SendMessageCallBack; @@ -319,6 +320,7 @@ public void handleCreateClientException(String url) { @Override public void handleMessageSendSuccess(Object result, EventStat eventStat) { + MetricsManager.getInstance().addSendSuccess(1); /* * Statistics pulsar performance */ @@ -346,6 +348,7 @@ public void handleMessageSendSuccess(Object result, EventStat eventStat) { @Override public void handleMessageSendException(EventStat eventStat, Object e) { + MetricsManager.getInstance().addSendFailed(1); if (e instanceof TooLongFrameException) { PulsarSink.this.overflow = true; } else if (e instanceof ProducerQueueIsFullError) { diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java index c6cefcb0887..f2eab52615a 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java @@ -20,6 +20,7 @@ import org.apache.inlong.audit.base.HighPriorityThreadFactory; import org.apache.inlong.audit.consts.ConfigConstants; import org.apache.inlong.audit.file.ConfigManager; +import org.apache.inlong.audit.metric.MetricsManager; import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder; import org.apache.inlong.common.constant.MQType; import org.apache.inlong.common.pojo.audit.MQInfo; @@ -313,6 +314,8 @@ public void configure(Context context) { * Send message of success. */ public void handleMessageSendSuccess(EventStat es) { + MetricsManager.getInstance().addSendSuccess(1); + // Statistics tube performance totalTubeSuccSendCnt.incrementAndGet(); totalTubeSuccSendSize.addAndGet(es.getEvent().getBody().length); @@ -630,6 +633,8 @@ public void onMessageSent(final MessageSentResult result) { return; } + MetricsManager.getInstance().addSendFailed(1); + // handle sent error if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) { logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java index 7768ef5e8cd..7595586dd4c 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java @@ -17,6 +17,7 @@ package org.apache.inlong.audit.source; +import org.apache.inlong.audit.metric.MetricsManager; import org.apache.inlong.audit.protocol.AuditApi.AuditMessageBody; import org.apache.inlong.audit.protocol.AuditApi.AuditReply; import org.apache.inlong.audit.protocol.AuditApi.AuditReply.RSP_CODE; @@ -142,13 +143,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception private AuditReply handleRequest(AuditRequest auditRequest) throws Exception { if (auditRequest == null) { + + MetricsManager.getInstance().addReceiveCountInvalid(1); + throw new Exception("audit request cannot be null"); } + AuditReply reply = AuditReply.newBuilder() .setRequestId(auditRequest.getRequestId()) .setRspCode(RSP_CODE.SUCCESS) .build(); List bodyList = auditRequest.getMsgBodyList(); + + MetricsManager.getInstance().addReceiveSuccess(bodyList.size(), 1, auditRequest.getSerializedSize()); + int errorMsgBody = 0; LOGGER.debug("Receive message count: {}", auditRequest.getMsgBodyCount()); for (AuditMessageBody auditMessageBody : bodyList) { @@ -156,6 +164,9 @@ private AuditReply handleRequest(AuditRequest auditRequest) throws Exception { if (msgDays >= this.msgValidThresholdDays) { LOGGER.debug("Discard the data as it is from {} days ago, only the data with a log timestamp" + " less than {} days is valid", msgDays, this.msgValidThresholdDays); + + MetricsManager.getInstance().addReceiveCountExpired(1); + continue; } AuditData auditData = new AuditData(); @@ -194,6 +205,9 @@ private AuditReply handleRequest(AuditRequest auditRequest) throws Exception { } if (errorMsgBody != 0) { + + MetricsManager.getInstance().addReceiveCountInvalid(errorMsgBody); + reply = reply.toBuilder() .setMessage("writing data error, discard it, error body count=" + errorMsgBody) .setRspCode(RSP_CODE.FAILED) diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties index 17fd3461f62..063b01aa8ff 100644 --- a/inlong-audit/conf/application.properties +++ b/inlong-audit/conf/application.properties @@ -50,4 +50,10 @@ audit.kafka.group.id=audit-consumer-group audit.store.jdbc.driver=com.mysql.cj.jdbc.Driver audit.store.jdbc.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL audit.store.jdbc.username=root -audit.store.jdbc.password=inlong \ No newline at end of file +audit.store.jdbc.password=inlong + +############################ +# metric config +# org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric is the default monitoring +########################### +audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric \ No newline at end of file