diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java index 4c2f627916e..a54995b8d66 100644 --- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java @@ -20,4 +20,5 @@ public interface AbstractMetric { public void report(); + public void stop(); } diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java index 433fc71848a..f27920159b6 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java @@ -94,5 +94,6 @@ public void addSendFailed(long count) { } public void shutdown() { timer.shutdown(); + metric.stop(); } } diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java index 07c2397743e..0871a613b35 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java @@ -80,4 +80,9 @@ private MetricFamilySamples.Sample createSample(MetricDimension key, double valu public void report() { LOGGER.info("Report proxy prometheus metric: {} ", metricItem.toString()); } + + @Override + public void stop() { + server.close(); + } } \ No newline at end of file diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java similarity index 97% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java index 0bcb3aa973f..46cc7212eb0 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit; +package org.apache.inlong.audit.store; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java new file mode 100644 index 00000000000..a0585efd376 --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.config; + +/** + * Config constants + */ +public class ConfigConstants { + + public static final String AUDIT_STORE_SERVER_NAME = "audit-store"; + public static final String KEY_PROMETHEUS_PORT = "audit.store.prometheus.port"; + public static final int DEFAULT_PROMETHEUS_PORT = 10083; + public static final String KEY_STORE_METRIC_CLASSNAME = "audit.store.metric.classname"; + public static final String DEFAULT_STORE_METRIC_CLASSNAME = + "org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric"; +} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java similarity index 97% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java index 42249200f09..0b6c7c36bb7 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.config; +package org.apache.inlong.audit.store.config; import lombok.Data; import org.springframework.beans.factory.annotation.Value; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java similarity index 98% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java index 2cfa4b1d34e..3a4e6c6cae2 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.config; +package org.apache.inlong.audit.store.config; import lombok.Getter; import lombok.Setter; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java similarity index 96% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java index ca3358701ed..abee9852fec 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.config; +package org.apache.inlong.audit.store.config; import lombok.Getter; import lombok.Setter; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java similarity index 96% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java index ebc42f4a537..0c568225b3f 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.db.entities; +package org.apache.inlong.audit.store.entities; import lombok.Data; import org.apache.pulsar.client.api.Consumer; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java new file mode 100644 index 00000000000..02c2258dbc4 --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.metric; + +public enum MetricDimension { + + RECEIVE_COUNT_SUCCESS("receiveCountSuccess"), + RECEIVE_FAILED("receiveFailed"), + SEND_COUNT_SUCCESS("sendCountSuccess"), + SEND_COUNT_FAILED("sendCountFailed"), + SEND_DURATION("sendDuration"); + + private final String key; + + MetricDimension(String key) { + this.key = key; + } + + public String getKey() { + return key; + } +} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java new file mode 100644 index 00000000000..0e5dd9ad18b --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.metric; + +import lombok.Data; + +import java.util.concurrent.atomic.AtomicLong; + +@Data +public class MetricItem { + + public static final String K_DIMENSION_KEY = "dimensionName"; + private AtomicLong receiveCountSuccess = new AtomicLong(0); + private AtomicLong receiveFailed = new AtomicLong(0); + private AtomicLong sendCountSuccess = new AtomicLong(0); + private AtomicLong sendCountFailed = new AtomicLong(0); + private AtomicLong sendDuration = new AtomicLong(0); + public void resetAllMetrics() { + receiveCountSuccess.set(0); + receiveFailed.set(0); + sendCountSuccess.set(0); + sendCountFailed.set(0); + sendDuration.set(0); + } +} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java new file mode 100644 index 00000000000..68b69609cfa --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.metric; + +import org.apache.inlong.audit.file.ConfigManager; +import org.apache.inlong.audit.metric.AbstractMetric; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_STORE_METRIC_CLASSNAME; +import static org.apache.inlong.audit.store.config.ConfigConstants.KEY_STORE_METRIC_CLASSNAME; + +public class MetricsManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsManager.class); + + private static class Holder { + + private static final MetricsManager INSTANCE = new MetricsManager(); + } + + private AbstractMetric metric; + + public void init() { + try { + String metricClassName = + ConfigManager.getInstance().getValue(KEY_STORE_METRIC_CLASSNAME, DEFAULT_STORE_METRIC_CLASSNAME); + LOGGER.info("Metric class name: {}", metricClassName); + Constructor constructor = Class.forName(metricClassName) + .getDeclaredConstructor(MetricItem.class); + constructor.setAccessible(true); + metric = (AbstractMetric) constructor.newInstance(metricItem); + + timer.scheduleWithFixedDelay(() -> { + metric.report(); + metricItem.resetAllMetrics(); + }, 0, 1, TimeUnit.MINUTES); + } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException + | InvocationTargetException exception) { + LOGGER.error("Init metrics manager has exception: ", exception); + } + } + + public static MetricsManager getInstance() { + return Holder.INSTANCE; + } + + private final MetricItem metricItem = new MetricItem(); + protected final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); + + public void addReceiveSuccess(long count) { + metricItem.getReceiveCountSuccess().addAndGet(count); + } + + public void addReceiveFailed(long pack) { + metricItem.getReceiveFailed().addAndGet(pack); + } + + public void addSendSuccess(long count, long duration) { + metricItem.getSendCountSuccess().addAndGet(count); + metricItem.getSendDuration().addAndGet(duration); + } + + public void addSendFailed(long count, long duration) { + metricItem.getSendCountFailed().addAndGet(count); + metricItem.getSendDuration().addAndGet(duration); + } + + public void shutdown() { + timer.shutdown(); + metric.stop(); + } +} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java new file mode 100644 index 00000000000..6aa60feb6c7 --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.metric.prometheus; + +import org.apache.inlong.audit.file.ConfigManager; +import org.apache.inlong.audit.metric.AbstractMetric; +import org.apache.inlong.audit.store.metric.MetricDimension; +import org.apache.inlong.audit.store.metric.MetricItem; + +import io.prometheus.client.Collector; +import io.prometheus.client.exporter.HTTPServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.inlong.audit.store.config.ConfigConstants.AUDIT_STORE_SERVER_NAME; +import static org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT; +import static org.apache.inlong.audit.store.config.ConfigConstants.KEY_PROMETHEUS_PORT; + +/** + * PrometheusMetric + */ +public class StorePrometheusMetric extends Collector implements AbstractMetric { + + private static final Logger LOGGER = LoggerFactory.getLogger(StorePrometheusMetric.class); + private static final String HELP_DESCRIPTION = "help"; + + private final MetricItem metricItem; + private HTTPServer server; + + public StorePrometheusMetric(MetricItem metricItem) { + this.metricItem = metricItem; + try { + server = new HTTPServer(ConfigManager.getInstance().getValue(KEY_PROMETHEUS_PORT, DEFAULT_PROMETHEUS_PORT)); + this.register(); + } catch (IOException e) { + LOGGER.error("Construct store prometheus metric has IOException", e); + } + } + + @Override + public List collect() { + List samples = Arrays.asList( + createSample(MetricDimension.RECEIVE_COUNT_SUCCESS, metricItem.getReceiveCountSuccess().doubleValue()), + createSample(MetricDimension.RECEIVE_FAILED, metricItem.getReceiveFailed().doubleValue()), + createSample(MetricDimension.SEND_COUNT_SUCCESS, metricItem.getSendCountSuccess().doubleValue()), + createSample(MetricDimension.SEND_COUNT_FAILED, metricItem.getSendCountFailed().doubleValue()), + createSample(MetricDimension.SEND_DURATION, metricItem.getSendDuration().doubleValue())); + + MetricFamilySamples metricFamilySamples = + new MetricFamilySamples(AUDIT_STORE_SERVER_NAME, Type.GAUGE, HELP_DESCRIPTION, samples); + + return Collections.singletonList(metricFamilySamples); + } + + private MetricFamilySamples.Sample createSample(MetricDimension key, double value) { + return new MetricFamilySamples.Sample(AUDIT_STORE_SERVER_NAME, + Collections.singletonList(MetricItem.K_DIMENSION_KEY), + Collections.singletonList(key.getKey()), value); + } + + @Override + public void report() { + LOGGER.info("Report store prometheus metric: {} ", metricItem.toString()); + } + + @Override + public void stop() { + server.close(); + } + +} \ No newline at end of file diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java similarity index 90% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java index 51ed0caf609..16251380684 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java @@ -15,17 +15,18 @@ * limitations under the License. */ -package org.apache.inlong.audit.service; +package org.apache.inlong.audit.store.service; -import org.apache.inlong.audit.config.JdbcConfig; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; import org.apache.inlong.audit.consts.ConfigConstants; import org.apache.inlong.audit.file.RemoteConfigJson; -import org.apache.inlong.audit.service.consume.BaseConsume; -import org.apache.inlong.audit.service.consume.KafkaConsume; -import org.apache.inlong.audit.service.consume.PulsarConsume; -import org.apache.inlong.audit.service.consume.TubeConsume; +import org.apache.inlong.audit.store.config.JdbcConfig; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.service.consume.BaseConsume; +import org.apache.inlong.audit.store.service.consume.KafkaConsume; +import org.apache.inlong.audit.store.service.consume.PulsarConsume; +import org.apache.inlong.audit.store.service.consume.TubeConsume; import org.apache.inlong.common.constant.MQType; import org.apache.inlong.common.pojo.audit.AuditConfigRequest; import org.apache.inlong.common.pojo.audit.MQInfo; @@ -46,6 +47,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.PreDestroy; + import java.io.InputStream; import java.util.ArrayList; import java.util.List; @@ -100,6 +103,8 @@ public void afterPropertiesSet() { if (mqConsume != null) { mqConsume.start(); } + + MetricsManager.getInstance().init(); } /** @@ -177,4 +182,9 @@ private List getMQConfig(String host, String clusterTag) { } return null; } + + @PreDestroy + public void shutdown() { + MetricsManager.getInstance().shutdown(); + } } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java similarity index 96% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java index 7f9bcf82074..ef038177ba1 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.service; +package org.apache.inlong.audit.store.service; import org.apache.inlong.audit.protocol.AuditData; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java similarity index 94% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java index 7aaac09e643..68ba584d739 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.audit.service; +package org.apache.inlong.audit.store.service; -import org.apache.inlong.audit.config.JdbcConfig; -import org.apache.inlong.audit.db.entities.JdbcDataPo; import org.apache.inlong.audit.protocol.AuditData; +import org.apache.inlong.audit.store.config.JdbcConfig; +import org.apache.inlong.audit.store.entities.JdbcDataPo; +import org.apache.inlong.audit.store.metric.MetricsManager; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; @@ -113,6 +114,9 @@ private void process() { private boolean executeBatch(List dataList) { boolean result = false; + + long currentTimestamp = System.currentTimeMillis(); + try (PreparedStatement statement = connection.prepareStatement(INSERT_SQL)) { for (JdbcDataPo data : dataList) { statement.setString(1, data.getIp()); @@ -135,7 +139,12 @@ private boolean executeBatch(List dataList) { statement.executeBatch(); connection.commit(); result = true; + + MetricsManager.getInstance().addSendSuccess(dataList.size(), System.currentTimeMillis() - currentTimestamp); + } catch (Exception exception) { + + MetricsManager.getInstance().addSendFailed(dataList.size(), System.currentTimeMillis() - currentTimestamp); LOG.error("Execute batch has failure!", exception); try { reconnect(); diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java similarity index 85% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java index e0fbb7180ad..c92dcb29f64 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; import org.apache.inlong.audit.protocol.AuditData; -import org.apache.inlong.audit.service.InsertData; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.service.InsertData; import com.google.gson.Gson; import org.apache.pulsar.client.api.Consumer; @@ -56,6 +57,9 @@ public BaseConsume(List insertServiceList, StoreConfig storeConfig, */ protected void handleMessage(String body) throws Exception { AuditData msgBody = gson.fromJson(body, AuditData.class); + + MetricsManager.getInstance().addReceiveSuccess(1); + this.insertServiceList.forEach((service) -> { try { service.insert(msgBody); @@ -66,6 +70,9 @@ protected void handleMessage(String body) throws Exception { } protected void handleMessage(String body, Consumer consumer, MessageId messageId) { AuditData msgBody = gson.fromJson(body, AuditData.class); + + MetricsManager.getInstance().addReceiveSuccess(1); + this.insertServiceList.forEach((service) -> { try { service.insert(msgBody, consumer, messageId); diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java similarity index 95% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java index cc0d8cac21c..34daef1c41a 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.service.InsertData; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.service.InsertData; import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -188,6 +189,7 @@ public void run() { } } } catch (Exception e) { + MetricsManager.getInstance().addReceiveFailed(1); LOG.error("kafka consumer get message error {}", e.getMessage()); } } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java similarity index 95% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java index 7d8efc79ce2..c1a5fe92f25 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.service.InsertData; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.service.InsertData; import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -131,6 +132,8 @@ public void received(Consumer consumer, Message msg) { String body = new String(msg.getData(), StandardCharsets.UTF_8); handleMessage(body, consumer, msg.getMessageId()); } catch (Exception e) { + MetricsManager.getInstance().addReceiveFailed(1); + LOG.error("Consumer has exception topic {}, subName {}, ex {}", topic, mqConfig.getPulsarConsumerSubName(), diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java similarity index 92% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java index 06d1b5b1897..20cbd60e908 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.service.InsertData; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.service.InsertData; import org.apache.inlong.tubemq.client.config.ConsumerConfig; import org.apache.inlong.tubemq.client.consumer.ConsumePosition; import org.apache.inlong.tubemq.client.consumer.ConsumerResult; @@ -133,12 +134,15 @@ public void run() { } pullMessageConsumer.confirmConsume(csmResult.getConfirmContext(), true); } else { + MetricsManager.getInstance().addReceiveFailed(1); LOG.error("receive messages errorCode is {}, error meddage is {}", csmResult.getErrCode(), csmResult.getErrMsg()); } } catch (TubeClientException e) { + MetricsManager.getInstance().addReceiveFailed(1); LOG.error("tube consumer getMessage error {}", e.getMessage()); } catch (Exception e) { + MetricsManager.getInstance().addReceiveFailed(1); LOG.error("handle audit message error {}", e.getMessage()); } diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java similarity index 87% rename from inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java rename to inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java index 5ddee887bd6..4568078f8f6 100644 --- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java +++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.JdbcConfig; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.service.InsertData; -import org.apache.inlong.audit.service.JdbcService; +import org.apache.inlong.audit.store.config.JdbcConfig; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.service.InsertData; +import org.apache.inlong.audit.store.service.JdbcService; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java similarity index 97% rename from inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java rename to inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java index acc7ccd68ff..9cd9b241ef2 100644 --- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java +++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.impl.PulsarClientImpl; diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java similarity index 88% rename from inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java rename to inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java index 4085f295149..d1bb85c82bf 100644 --- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java +++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.JdbcConfig; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.service.InsertData; -import org.apache.inlong.audit.service.JdbcService; +import org.apache.inlong.audit.store.config.JdbcConfig; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.service.InsertData; +import org.apache.inlong.audit.store.service.JdbcService; import org.apache.inlong.tubemq.client.consumer.ConsumerResult; import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer; import org.apache.inlong.tubemq.client.exception.TubeClientException; diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties index 063b01aa8ff..ad1b4e487f6 100644 --- a/inlong-audit/conf/application.properties +++ b/inlong-audit/conf/application.properties @@ -53,7 +53,13 @@ audit.store.jdbc.username=root audit.store.jdbc.password=inlong ############################ -# metric config +# Audit Proxy metric config # org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric is the default monitoring ########################### -audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric \ No newline at end of file +audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric + +############################ +# Audit Store metric config +# org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric is the default monitoring +########################### +audit.store.metric.classname=org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric \ No newline at end of file