From 9348626926f1354b5b7e994926a9254d32136f77 Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Wed, 16 Oct 2024 11:26:11 +0800 Subject: [PATCH 1/4] [INLONG-11357][Sort] Add new source metrics for sort-connector-sqlserver-cdc-v1.15 (#11358) --- .../sqlserver/DebeziumSourceFunction.java | 70 ++++++++++++++---- .../RowDataDebeziumDeserializeSchema.java | 71 ++++++++++++------- .../sort/sqlserver/SqlServerSource.java | 11 ++- .../sort/sqlserver/SqlServerTableSource.java | 1 + 4 files changed, 113 insertions(+), 40 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java index 01118d65137..c480ad1d454 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java @@ -17,6 +17,9 @@ package org.apache.inlong.sort.sqlserver; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceExactlyMetric; + import com.ververica.cdc.debezium.Validator; import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer; import com.ververica.cdc.debezium.internal.DebeziumOffset; @@ -61,6 +64,8 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -199,17 +204,24 @@ public class DebeziumSourceFunction extends RichSourceFunction /** Buffer the events from the source and record the errors from the debezium. */ private transient Handover handover; + private transient SourceExactlyMetric sourceExactlyMetric; + + private final MetricOption metricOption; + + private transient Map checkpointStartTimeMap; + // --------------------------------------------------------------------------------------- public DebeziumSourceFunction( DebeziumDeserializationSchema deserializer, Properties properties, @Nullable DebeziumOffset specificOffset, - Validator validator) { + Validator validator, MetricOption metricOption) { this.deserializer = deserializer; this.properties = properties; this.specificOffset = specificOffset; this.validator = validator; + this.metricOption = metricOption; } @Override @@ -222,6 +234,14 @@ public void open(Configuration parameters) throws Exception { this.executor = Executors.newSingleThreadExecutor(threadFactory); this.handover = new Handover(); this.changeConsumer = new DebeziumChangeConsumer(handover); + if (metricOption != null) { + sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup()); + } + if (deserializer instanceof RowDataDebeziumDeserializeSchema) { + ((RowDataDebeziumDeserializeSchema) deserializer) + .setSourceExactlyMetric(sourceExactlyMetric); + } + this.checkpointStartTimeMap = new HashMap<>(); } // ------------------------------------------------------------------------ @@ -306,17 +326,33 @@ private void restoreHistoryRecordsState() throws Exception { @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { - if (handover.hasError()) { - LOG.debug("snapshotState() called on closed source"); - throw new FlinkRuntimeException( - "Call snapshotState() on closed source, checkpoint failed."); - } else { - snapshotOffsetState(functionSnapshotContext.getCheckpointId()); - snapshotHistoryRecordsState(); - } - if (deserializer instanceof RowDataDebeziumDeserializeSchema) { - ((RowDataDebeziumDeserializeSchema) deserializer) - .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); + try { + if (handover.hasError()) { + LOG.debug("snapshotState() called on closed source"); + throw new FlinkRuntimeException( + "Call snapshotState() on closed source, checkpoint failed."); + } else { + snapshotOffsetState(functionSnapshotContext.getCheckpointId()); + snapshotHistoryRecordsState(); + } + if (deserializer instanceof RowDataDebeziumDeserializeSchema) { + ((RowDataDebeziumDeserializeSchema) deserializer) + .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); + } + if (checkpointStartTimeMap != null) { + checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis()); + } else { + LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint"); + } + + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotCreate(); + } + } catch (Exception e) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotCreate(); + } + throw e; } } @@ -498,6 +534,16 @@ public void notifyCheckpointComplete(long checkpointId) { schema.flushAudit(); schema.updateLastCheckpointId(checkpointId); } + if (checkpointStartTimeMap != null) { + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + if (snapShotStartTimeById != null && sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotComplete(); + sourceExactlyMetric.recordSnapshotToCheckpointDelay( + System.currentTimeMillis() - snapShotStartTimeById); + } + } else { + LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint"); + } } catch (Exception e) { // ignore exception if we are no longer running LOG.warn("Ignore error when committing offset to database.", e); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java index d90f4705131..394ee0297be 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java @@ -139,37 +139,49 @@ public void open() { @Override public void deserialize(SourceRecord record, Collector out) throws Exception { - Envelope.Operation op = Envelope.operationFor(record); - Struct value = (Struct) record.value(); - Schema valueSchema = record.valueSchema(); - if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { - GenericRowData insert = extractAfterRow(value, valueSchema); - validator.validate(insert, RowKind.INSERT); - insert.setRowKind(RowKind.INSERT); - if (sourceExactlyMetric != null) { - out = new MetricsCollector<>(out, sourceExactlyMetric); + long deserializeStartTime = System.currentTimeMillis(); + try { + Envelope.Operation op = Envelope.operationFor(record); + Struct value = (Struct) record.value(); + Schema valueSchema = record.valueSchema(); + if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { + GenericRowData insert = extractAfterRow(value, valueSchema); + validator.validate(insert, RowKind.INSERT); + insert.setRowKind(RowKind.INSERT); + if (sourceExactlyMetric != null) { + out = new MetricsCollector<>(out, sourceExactlyMetric); + } + emit(record, insert, out); + } else if (op == Envelope.Operation.DELETE) { + GenericRowData delete = extractBeforeRow(value, valueSchema); + validator.validate(delete, RowKind.DELETE); + delete.setRowKind(RowKind.DELETE); + emit(record, delete, out); + } else { + if (changelogMode == DebeziumChangelogMode.ALL) { + GenericRowData before = extractBeforeRow(value, valueSchema); + validator.validate(before, RowKind.UPDATE_BEFORE); + before.setRowKind(RowKind.UPDATE_BEFORE); + emit(record, before, out); + } + + GenericRowData after = extractAfterRow(value, valueSchema); + validator.validate(after, RowKind.UPDATE_AFTER); + after.setRowKind(RowKind.UPDATE_AFTER); + if (sourceExactlyMetric != null) { + out = new MetricsCollector<>(out, sourceExactlyMetric); + } + emit(record, after, out); } - emit(record, insert, out); - } else if (op == Envelope.Operation.DELETE) { - GenericRowData delete = extractBeforeRow(value, valueSchema); - validator.validate(delete, RowKind.DELETE); - delete.setRowKind(RowKind.DELETE); - emit(record, delete, out); - } else { - if (changelogMode == DebeziumChangelogMode.ALL) { - GenericRowData before = extractBeforeRow(value, valueSchema); - validator.validate(before, RowKind.UPDATE_BEFORE); - before.setRowKind(RowKind.UPDATE_BEFORE); - emit(record, before, out); + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeSuccess(); + sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime); } - - GenericRowData after = extractAfterRow(value, valueSchema); - validator.validate(after, RowKind.UPDATE_AFTER); - after.setRowKind(RowKind.UPDATE_AFTER); + } catch (Exception e) { if (sourceExactlyMetric != null) { - out = new MetricsCollector<>(out, sourceExactlyMetric); + sourceExactlyMetric.incNumDeserializeError(); } - emit(record, after, out); + throw e; } } @@ -697,4 +709,9 @@ public void updateLastCheckpointId(long checkpointId) { sourceExactlyMetric.updateLastCheckpointId(checkpointId); } } + + /** allow DebeziumSourceFunction to set the SourceExactlyMetric */ + public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) { + this.sourceExactlyMetric = sourceExactlyMetric; + } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java index 6a094521a5d..92353bf0cf6 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.sqlserver; +import org.apache.inlong.sort.base.metric.MetricOption; + import com.ververica.cdc.connectors.sqlserver.SqlServerValidator; import com.ververica.cdc.connectors.sqlserver.table.StartupOptions; import io.debezium.connector.sqlserver.SqlServerConnector; @@ -51,6 +53,7 @@ public static class Builder { private Properties dbzProperties; private StartupOptions startupOptions = StartupOptions.initial(); private DebeziumDeserializationSchema deserializer; + private MetricOption metricOption; public Builder hostname(String hostname) { this.hostname = hostname; @@ -114,6 +117,12 @@ public Builder startupOptions(StartupOptions startupOptions) { return this; } + /** metricOption used to instantiate SourceExactlyMetric when inlong.metric.labels is present in flink sql */ + public Builder metricOption(MetricOption metricOption) { + this.metricOption = metricOption; + return this; + } + public DebeziumSourceFunction build() { Properties props = new Properties(); props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName()); @@ -154,7 +163,7 @@ public DebeziumSourceFunction build() { } return new DebeziumSourceFunction<>( - deserializer, props, null, new SqlServerValidator(props)); + deserializer, props, null, new SqlServerValidator(props), metricOption); } } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java index c49dd9747af..87defcedca4 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java @@ -144,6 +144,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .debeziumProperties(dbzProperties) .startupOptions(startupOptions) .deserializer(deserializer) + .metricOption(metricOption) .build(); return SourceFunctionProvider.of(sourceFunction, false); } From 2c37cea1e90b2ff2191f4cb78cd046d8ce87ebc1 Mon Sep 17 00:00:00 2001 From: fuweng11 <76141879+fuweng11@users.noreply.github.com> Date: Wed, 16 Oct 2024 17:20:44 +0800 Subject: [PATCH 2/4] [INLONG-11361][Manager] Support querying heartbeat information based on IP address (#11362) --- .../resources/mappers/ComponentHeartbeatEntityMapper.xml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml index c5f0ebf08c7..133947a4e43 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml @@ -66,7 +66,12 @@ select from component_heartbeat - where component = #{request.component, jdbcType=VARCHAR} + + component = #{request.component, jdbcType=VARCHAR} + + and instance = #{request.instance,jdbcType=VARCHAR} + + order by modify_time desc From 67676d863162206c0e52912ec64cc0d18a81e3f3 Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Wed, 16 Oct 2024 17:21:09 +0800 Subject: [PATCH 3/4] [INLONG-11355][Sort] Add new source metrics for sort-connector-mongodb-cdc-v1.15 (#11356) --- .../sort/mongodb/DebeziumSourceFunction.java | 70 +++++++++-- ...MongoDBConnectorDeserializationSchema.java | 110 ++++++++++-------- .../inlong/sort/mongodb/MongoDBSource.java | 16 ++- .../sort/mongodb/MongoDBTableSource.java | 4 +- .../mongodb/source/MongoDBSourceBuilder.java | 8 ++ 5 files changed, 147 insertions(+), 61 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java index 2d7191525be..46541295e74 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java @@ -17,6 +17,9 @@ package org.apache.inlong.sort.mongodb; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SourceExactlyMetric; + import com.ververica.cdc.debezium.Validator; import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer; import com.ververica.cdc.debezium.internal.DebeziumOffset; @@ -61,6 +64,8 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -197,17 +202,25 @@ public class DebeziumSourceFunction extends RichSourceFunction /** Buffer the events from the source and record the errors from the debezium. */ private transient Handover handover; + private transient SourceExactlyMetric sourceExactlyMetric; + + private final MetricOption metricOption; + + private transient Map checkpointStartTimeMap; + // --------------------------------------------------------------------------------------- public DebeziumSourceFunction( DebeziumDeserializationSchema deserializer, Properties properties, @Nullable DebeziumOffset specificOffset, - Validator validator) { + Validator validator, + MetricOption metricOption) { this.deserializer = deserializer; this.properties = properties; this.specificOffset = specificOffset; this.validator = validator; + this.metricOption = metricOption; } @Override @@ -220,6 +233,14 @@ public void open(Configuration parameters) throws Exception { this.executor = Executors.newSingleThreadExecutor(threadFactory); this.handover = new Handover(); this.changeConsumer = new DebeziumChangeConsumer(handover); + if (metricOption != null) { + sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup()); + } + this.checkpointStartTimeMap = new HashMap<>(); + // set sourceExactlyMetric for deserializer + if (deserializer instanceof MongoDBConnectorDeserializationSchema) { + ((MongoDBConnectorDeserializationSchema) deserializer).setSourceExactlyMetric(sourceExactlyMetric); + } } // ------------------------------------------------------------------------ @@ -304,17 +325,32 @@ private void restoreHistoryRecordsState() throws Exception { @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { - if (handover.hasError()) { - LOG.debug("snapshotState() called on closed source"); - throw new FlinkRuntimeException( - "Call snapshotState() on closed source, checkpoint failed."); - } else { - snapshotOffsetState(functionSnapshotContext.getCheckpointId()); - snapshotHistoryRecordsState(); - } - if (deserializer instanceof MongoDBConnectorDeserializationSchema) { - ((MongoDBConnectorDeserializationSchema) deserializer) - .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); + try { + if (handover.hasError()) { + LOG.debug("snapshotState() called on closed source"); + throw new FlinkRuntimeException( + "Call snapshotState() on closed source, checkpoint failed."); + } else { + snapshotOffsetState(functionSnapshotContext.getCheckpointId()); + snapshotHistoryRecordsState(); + } + if (deserializer instanceof MongoDBConnectorDeserializationSchema) { + ((MongoDBConnectorDeserializationSchema) deserializer) + .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); + } + if (checkpointStartTimeMap != null) { + checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis()); + } else { + LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint"); + } + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotCreate();; + } + } catch (Exception e) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeError(); + } + throw e; } } @@ -496,6 +532,16 @@ public void notifyCheckpointComplete(long checkpointId) { schema.flushAudit(); schema.updateLastCheckpointId(checkpointId); } + if (checkpointStartTimeMap != null) { + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + if (snapShotStartTimeById != null && sourceExactlyMetric != null) { + sourceExactlyMetric.incNumSnapshotComplete(); + sourceExactlyMetric + .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + } + } else { + LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint"); + } } catch (Exception e) { // ignore exception if we are no longer running LOG.warn("Ignore error when committing offset to database.", e); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java index daa8dccb791..668f6de4cb8 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java @@ -137,53 +137,66 @@ public void open() { @Override public void deserialize(SourceRecord record, Collector out) throws Exception { - Struct value = (Struct) record.value(); - Schema valueSchema = record.valueSchema(); - - OperationType op = operationTypeFor(record); - BsonDocument documentKey = - checkNotNull( - extractBsonDocument( - value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD)); - BsonDocument fullDocument = - extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD); - switch (op) { - case INSERT: - GenericRowData insert = extractRowData(fullDocument); - insert.setRowKind(RowKind.INSERT); - emit(record, insert, - sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric)); - break; - case DELETE: - GenericRowData delete = extractRowData(documentKey); - delete.setRowKind(RowKind.DELETE); - emit(record, delete, - sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric)); - break; - case UPDATE: - // It’s null if another operation deletes the document - // before the lookup operation happens. Ignored it. - if (fullDocument == null) { + long deserializeStartTime = System.currentTimeMillis(); + try { + Struct value = (Struct) record.value(); + Schema valueSchema = record.valueSchema(); + + OperationType op = operationTypeFor(record); + BsonDocument documentKey = + checkNotNull( + extractBsonDocument( + value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD)); + BsonDocument fullDocument = + extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD); + switch (op) { + case INSERT: + GenericRowData insert = extractRowData(fullDocument); + insert.setRowKind(RowKind.INSERT); + emit(record, insert, + sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric)); break; - } - GenericRowData updateAfter = extractRowData(fullDocument); - updateAfter.setRowKind(RowKind.UPDATE_AFTER); - emit(record, updateAfter, - sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric)); - break; - case REPLACE: - GenericRowData replaceAfter = extractRowData(fullDocument); - replaceAfter.setRowKind(RowKind.UPDATE_AFTER); - emit(record, replaceAfter, - sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric)); - break; - case INVALIDATE: - case DROP: - case DROP_DATABASE: - case RENAME: - case OTHER: - default: - break; + case DELETE: + GenericRowData delete = extractRowData(documentKey); + delete.setRowKind(RowKind.DELETE); + emit(record, delete, + sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric)); + break; + case UPDATE: + // It’s null if another operation deletes the document + // before the lookup operation happens. Ignored it. + if (fullDocument == null) { + break; + } + GenericRowData updateAfter = extractRowData(fullDocument); + updateAfter.setRowKind(RowKind.UPDATE_AFTER); + emit(record, updateAfter, + sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric)); + break; + case REPLACE: + GenericRowData replaceAfter = extractRowData(fullDocument); + replaceAfter.setRowKind(RowKind.UPDATE_AFTER); + emit(record, replaceAfter, + sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric)); + break; + case INVALIDATE: + case DROP: + case DROP_DATABASE: + case RENAME: + case OTHER: + default: + break; + } + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeSuccess(); + sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime); + } + + } catch (Exception e) { + if (sourceExactlyMetric != null) { + sourceExactlyMetric.incNumDeserializeError(); + } + throw e; } } @@ -827,4 +840,9 @@ public void updateLastCheckpointId(long checkpointId) { sourceExactlyMetric.updateLastCheckpointId(checkpointId); } } + + /** setter for DebeziumSourceFunction to set SourceExactlyMetric*/ + public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) { + this.sourceExactlyMetric = sourceExactlyMetric; + } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java index f9aab2d54f3..67bb51bf69f 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.mongodb; +import org.apache.inlong.sort.base.metric.MetricOption; + import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.kafka.connect.source.MongoSourceConfig; import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance; @@ -35,7 +37,11 @@ import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA; -import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.*; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString; import static org.apache.flink.util.Preconditions.checkArgument; @@ -76,6 +82,7 @@ public static class Builder { private String copyExistingPipeline; private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue(); private DebeziumDeserializationSchema deserializer; + private MetricOption metricOption; /** The comma-separated list of hostname and port pairs of mongodb servers. */ public Builder hosts(String hosts) { @@ -243,6 +250,11 @@ public Builder deserializer(DebeziumDeserializationSchema deserializer) { return this; } + public Builder metricOption(MetricOption metricOption) { + this.metricOption = metricOption; + return this; + } + /** * The properties of mongodb kafka connector. * https://docs.mongodb.com/kafka-connector/current/kafka-source @@ -338,7 +350,7 @@ public DebeziumSourceFunction build() { MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.NONE.value()); return new DebeziumSourceFunction<>( - deserializer, props, null, Validator.getDefaultValidator()); + deserializer, props, null, Validator.getDefaultValidator(), metricOption); } } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java index 9c417b4edfa..a161077b0a3 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java @@ -191,13 +191,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .ifPresent(builder::heartbeatIntervalMillis); Optional.ofNullable(splitMetaGroupSize).ifPresent(builder::splitMetaGroupSize); Optional.ofNullable(splitSizeMB).ifPresent(builder::splitSizeMB); + Optional.ofNullable(metricOption).ifPresent(builder::metricOption); return SourceProvider.of(builder.build()); } else { org.apache.inlong.sort.mongodb.MongoDBSource.Builder builder = org.apache.inlong.sort.mongodb.MongoDBSource.builder() .hosts(hosts) - .deserializer(deserializer); + .deserializer(deserializer) + .metricOption(metricOption); Optional.ofNullable(databaseList).ifPresent(builder::databaseList); Optional.ofNullable(collectionList).ifPresent(builder::collectionList); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java index a95f238a0be..07b0d9f1cf5 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.mongodb.source; +import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema; import com.ververica.cdc.connectors.base.options.StartupOptions; @@ -54,6 +55,7 @@ public class MongoDBSourceBuilder { private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory(); private DebeziumDeserializationSchema deserializer; + private MetricOption metricOption; /** The comma-separated list of hostname and port pairs of mongodb servers. */ public MongoDBSourceBuilder hosts(String hosts) { @@ -189,6 +191,12 @@ public MongoDBSourceBuilder deserializer(DebeziumDeserializationSchema des return this; } + /** The metric option used to collect metrics when inlong.metric.labels is present in flink sql. */ + public MongoDBSourceBuilder metricOption(MetricOption metricOption) { + this.metricOption = metricOption; + return this; + } + /** * Build the {@link MongoDBSource}. * From 75f2e984fb95d0fdedf2f80121df352e95cbc4d7 Mon Sep 17 00:00:00 2001 From: doleyzi <43397300+doleyzi@users.noreply.github.com> Date: Wed, 16 Oct 2024 18:35:54 +0800 Subject: [PATCH 4/4] [INLONG-11360][Audit] Add a metric monitoring system for the Audit Store itself (#11363) --- .../inlong/audit/metric/AbstractMetric.java | 1 + .../inlong/audit/metric/MetricsManager.java | 1 + .../prometheus/ProxyPrometheusMetric.java | 5 + .../inlong/audit/{ => store}/Application.java | 2 +- .../audit/store/config/ConfigConstants.java | 31 ++++++ .../audit/{ => store}/config/JdbcConfig.java | 2 +- .../config/MessageQueueConfig.java | 2 +- .../audit/{ => store}/config/StoreConfig.java | 2 +- .../{db => store}/entities/JdbcDataPo.java | 2 +- .../audit/store/metric/MetricDimension.java | 37 ++++++++ .../inlong/audit/store/metric/MetricItem.java | 40 ++++++++ .../audit/store/metric/MetricsManager.java | 95 +++++++++++++++++++ .../prometheus/StorePrometheusMetric.java | 91 ++++++++++++++++++ .../service/AuditMsgConsumerServer.java | 26 +++-- .../audit/{ => store}/service/InsertData.java | 2 +- .../{ => store}/service/JdbcService.java | 15 ++- .../service/consume/BaseConsume.java | 15 ++- .../service/consume/KafkaConsume.java | 10 +- .../service/consume/PulsarConsume.java | 11 ++- .../service/consume/TubeConsume.java | 12 ++- .../service/consume/KafkaConsumeTest.java | 12 +-- .../service/consume/PulsarConsumeTest.java | 2 +- .../service/consume/TubeConsumeTest.java | 12 +-- inlong-audit/conf/application.properties | 10 +- 24 files changed, 390 insertions(+), 48 deletions(-) rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/Application.java (97%) create mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/config/JdbcConfig.java (97%) rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/config/MessageQueueConfig.java (98%) rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/config/StoreConfig.java (96%) rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{db => store}/entities/JdbcDataPo.java (96%) create mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java create mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java create mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java create mode 100644 inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/service/AuditMsgConsumerServer.java (90%) rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/service/InsertData.java (96%) rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/service/JdbcService.java (94%) rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/service/consume/BaseConsume.java (85%) rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/service/consume/KafkaConsume.java (95%) rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/service/consume/PulsarConsume.java (95%) rename inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/{ => store}/service/consume/TubeConsume.java (92%) rename inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/{ => store}/service/consume/KafkaConsumeTest.java (87%) rename inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/{ => store}/service/consume/PulsarConsumeTest.java (97%) rename inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/{ => store}/service/consume/TubeConsumeTest.java (88%) diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java index 4c2f627916e..a54995b8d66 100644 --- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java @@ -20,4 +20,5 @@ public interface AbstractMetric { public void report(); + public void stop(); } diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java index 433fc71848a..f27920159b6 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java @@ -94,5 +94,6 @@ public void addSendFailed(long count) { } public void shutdown() { timer.shutdown(); + metric.stop(); } } diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java index 07c2397743e..0871a613b35 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java @@ -80,4 +80,9 @@ private MetricFamilySamples.Sample createSample(MetricDimension key, double valu public void report() { LOGGER.info("Report proxy prometheus metric: {} ", metricItem.toString()); } + + @Override + public void stop() { + server.close(); + } } \ No newline at end of file diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java similarity index 97% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java index 0bcb3aa973f..46cc7212eb0 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/Application.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/Application.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit; +package org.apache.inlong.audit.store; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java new file mode 100644 index 00000000000..a0585efd376 --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/ConfigConstants.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.config; + +/** + * Config constants + */ +public class ConfigConstants { + + public static final String AUDIT_STORE_SERVER_NAME = "audit-store"; + public static final String KEY_PROMETHEUS_PORT = "audit.store.prometheus.port"; + public static final int DEFAULT_PROMETHEUS_PORT = 10083; + public static final String KEY_STORE_METRIC_CLASSNAME = "audit.store.metric.classname"; + public static final String DEFAULT_STORE_METRIC_CLASSNAME = + "org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric"; +} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java similarity index 97% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java index 42249200f09..0b6c7c36bb7 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/JdbcConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.config; +package org.apache.inlong.audit.store.config; import lombok.Data; import org.springframework.beans.factory.annotation.Value; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java similarity index 98% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java index 2cfa4b1d34e..3a4e6c6cae2 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/MessageQueueConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/MessageQueueConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.config; +package org.apache.inlong.audit.store.config; import lombok.Getter; import lombok.Setter; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java similarity index 96% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java index ca3358701ed..abee9852fec 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/config/StoreConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/StoreConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.config; +package org.apache.inlong.audit.store.config; import lombok.Getter; import lombok.Setter; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java similarity index 96% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java index ebc42f4a537..0c568225b3f 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/JdbcDataPo.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/entities/JdbcDataPo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.db.entities; +package org.apache.inlong.audit.store.entities; import lombok.Data; import org.apache.pulsar.client.api.Consumer; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java new file mode 100644 index 00000000000..02c2258dbc4 --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.metric; + +public enum MetricDimension { + + RECEIVE_COUNT_SUCCESS("receiveCountSuccess"), + RECEIVE_FAILED("receiveFailed"), + SEND_COUNT_SUCCESS("sendCountSuccess"), + SEND_COUNT_FAILED("sendCountFailed"), + SEND_DURATION("sendDuration"); + + private final String key; + + MetricDimension(String key) { + this.key = key; + } + + public String getKey() { + return key; + } +} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java new file mode 100644 index 00000000000..0e5dd9ad18b --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.metric; + +import lombok.Data; + +import java.util.concurrent.atomic.AtomicLong; + +@Data +public class MetricItem { + + public static final String K_DIMENSION_KEY = "dimensionName"; + private AtomicLong receiveCountSuccess = new AtomicLong(0); + private AtomicLong receiveFailed = new AtomicLong(0); + private AtomicLong sendCountSuccess = new AtomicLong(0); + private AtomicLong sendCountFailed = new AtomicLong(0); + private AtomicLong sendDuration = new AtomicLong(0); + public void resetAllMetrics() { + receiveCountSuccess.set(0); + receiveFailed.set(0); + sendCountSuccess.set(0); + sendCountFailed.set(0); + sendDuration.set(0); + } +} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java new file mode 100644 index 00000000000..68b69609cfa --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.metric; + +import org.apache.inlong.audit.file.ConfigManager; +import org.apache.inlong.audit.metric.AbstractMetric; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_STORE_METRIC_CLASSNAME; +import static org.apache.inlong.audit.store.config.ConfigConstants.KEY_STORE_METRIC_CLASSNAME; + +public class MetricsManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsManager.class); + + private static class Holder { + + private static final MetricsManager INSTANCE = new MetricsManager(); + } + + private AbstractMetric metric; + + public void init() { + try { + String metricClassName = + ConfigManager.getInstance().getValue(KEY_STORE_METRIC_CLASSNAME, DEFAULT_STORE_METRIC_CLASSNAME); + LOGGER.info("Metric class name: {}", metricClassName); + Constructor constructor = Class.forName(metricClassName) + .getDeclaredConstructor(MetricItem.class); + constructor.setAccessible(true); + metric = (AbstractMetric) constructor.newInstance(metricItem); + + timer.scheduleWithFixedDelay(() -> { + metric.report(); + metricItem.resetAllMetrics(); + }, 0, 1, TimeUnit.MINUTES); + } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException + | InvocationTargetException exception) { + LOGGER.error("Init metrics manager has exception: ", exception); + } + } + + public static MetricsManager getInstance() { + return Holder.INSTANCE; + } + + private final MetricItem metricItem = new MetricItem(); + protected final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); + + public void addReceiveSuccess(long count) { + metricItem.getReceiveCountSuccess().addAndGet(count); + } + + public void addReceiveFailed(long pack) { + metricItem.getReceiveFailed().addAndGet(pack); + } + + public void addSendSuccess(long count, long duration) { + metricItem.getSendCountSuccess().addAndGet(count); + metricItem.getSendDuration().addAndGet(duration); + } + + public void addSendFailed(long count, long duration) { + metricItem.getSendCountFailed().addAndGet(count); + metricItem.getSendDuration().addAndGet(duration); + } + + public void shutdown() { + timer.shutdown(); + metric.stop(); + } +} diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java new file mode 100644 index 00000000000..6aa60feb6c7 --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.metric.prometheus; + +import org.apache.inlong.audit.file.ConfigManager; +import org.apache.inlong.audit.metric.AbstractMetric; +import org.apache.inlong.audit.store.metric.MetricDimension; +import org.apache.inlong.audit.store.metric.MetricItem; + +import io.prometheus.client.Collector; +import io.prometheus.client.exporter.HTTPServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.inlong.audit.store.config.ConfigConstants.AUDIT_STORE_SERVER_NAME; +import static org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT; +import static org.apache.inlong.audit.store.config.ConfigConstants.KEY_PROMETHEUS_PORT; + +/** + * PrometheusMetric + */ +public class StorePrometheusMetric extends Collector implements AbstractMetric { + + private static final Logger LOGGER = LoggerFactory.getLogger(StorePrometheusMetric.class); + private static final String HELP_DESCRIPTION = "help"; + + private final MetricItem metricItem; + private HTTPServer server; + + public StorePrometheusMetric(MetricItem metricItem) { + this.metricItem = metricItem; + try { + server = new HTTPServer(ConfigManager.getInstance().getValue(KEY_PROMETHEUS_PORT, DEFAULT_PROMETHEUS_PORT)); + this.register(); + } catch (IOException e) { + LOGGER.error("Construct store prometheus metric has IOException", e); + } + } + + @Override + public List collect() { + List samples = Arrays.asList( + createSample(MetricDimension.RECEIVE_COUNT_SUCCESS, metricItem.getReceiveCountSuccess().doubleValue()), + createSample(MetricDimension.RECEIVE_FAILED, metricItem.getReceiveFailed().doubleValue()), + createSample(MetricDimension.SEND_COUNT_SUCCESS, metricItem.getSendCountSuccess().doubleValue()), + createSample(MetricDimension.SEND_COUNT_FAILED, metricItem.getSendCountFailed().doubleValue()), + createSample(MetricDimension.SEND_DURATION, metricItem.getSendDuration().doubleValue())); + + MetricFamilySamples metricFamilySamples = + new MetricFamilySamples(AUDIT_STORE_SERVER_NAME, Type.GAUGE, HELP_DESCRIPTION, samples); + + return Collections.singletonList(metricFamilySamples); + } + + private MetricFamilySamples.Sample createSample(MetricDimension key, double value) { + return new MetricFamilySamples.Sample(AUDIT_STORE_SERVER_NAME, + Collections.singletonList(MetricItem.K_DIMENSION_KEY), + Collections.singletonList(key.getKey()), value); + } + + @Override + public void report() { + LOGGER.info("Report store prometheus metric: {} ", metricItem.toString()); + } + + @Override + public void stop() { + server.close(); + } + +} \ No newline at end of file diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java similarity index 90% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java index 51ed0caf609..16251380684 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/AuditMsgConsumerServer.java @@ -15,17 +15,18 @@ * limitations under the License. */ -package org.apache.inlong.audit.service; +package org.apache.inlong.audit.store.service; -import org.apache.inlong.audit.config.JdbcConfig; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; import org.apache.inlong.audit.consts.ConfigConstants; import org.apache.inlong.audit.file.RemoteConfigJson; -import org.apache.inlong.audit.service.consume.BaseConsume; -import org.apache.inlong.audit.service.consume.KafkaConsume; -import org.apache.inlong.audit.service.consume.PulsarConsume; -import org.apache.inlong.audit.service.consume.TubeConsume; +import org.apache.inlong.audit.store.config.JdbcConfig; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.service.consume.BaseConsume; +import org.apache.inlong.audit.store.service.consume.KafkaConsume; +import org.apache.inlong.audit.store.service.consume.PulsarConsume; +import org.apache.inlong.audit.store.service.consume.TubeConsume; import org.apache.inlong.common.constant.MQType; import org.apache.inlong.common.pojo.audit.AuditConfigRequest; import org.apache.inlong.common.pojo.audit.MQInfo; @@ -46,6 +47,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.PreDestroy; + import java.io.InputStream; import java.util.ArrayList; import java.util.List; @@ -100,6 +103,8 @@ public void afterPropertiesSet() { if (mqConsume != null) { mqConsume.start(); } + + MetricsManager.getInstance().init(); } /** @@ -177,4 +182,9 @@ private List getMQConfig(String host, String clusterTag) { } return null; } + + @PreDestroy + public void shutdown() { + MetricsManager.getInstance().shutdown(); + } } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java similarity index 96% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java index 7f9bcf82074..ef038177ba1 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/InsertData.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/InsertData.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.service; +package org.apache.inlong.audit.store.service; import org.apache.inlong.audit.protocol.AuditData; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java similarity index 94% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java index 7aaac09e643..68ba584d739 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/JdbcService.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.audit.service; +package org.apache.inlong.audit.store.service; -import org.apache.inlong.audit.config.JdbcConfig; -import org.apache.inlong.audit.db.entities.JdbcDataPo; import org.apache.inlong.audit.protocol.AuditData; +import org.apache.inlong.audit.store.config.JdbcConfig; +import org.apache.inlong.audit.store.entities.JdbcDataPo; +import org.apache.inlong.audit.store.metric.MetricsManager; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; @@ -113,6 +114,9 @@ private void process() { private boolean executeBatch(List dataList) { boolean result = false; + + long currentTimestamp = System.currentTimeMillis(); + try (PreparedStatement statement = connection.prepareStatement(INSERT_SQL)) { for (JdbcDataPo data : dataList) { statement.setString(1, data.getIp()); @@ -135,7 +139,12 @@ private boolean executeBatch(List dataList) { statement.executeBatch(); connection.commit(); result = true; + + MetricsManager.getInstance().addSendSuccess(dataList.size(), System.currentTimeMillis() - currentTimestamp); + } catch (Exception exception) { + + MetricsManager.getInstance().addSendFailed(dataList.size(), System.currentTimeMillis() - currentTimestamp); LOG.error("Execute batch has failure!", exception); try { reconnect(); diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java similarity index 85% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java index e0fbb7180ad..c92dcb29f64 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/BaseConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/BaseConsume.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; import org.apache.inlong.audit.protocol.AuditData; -import org.apache.inlong.audit.service.InsertData; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.service.InsertData; import com.google.gson.Gson; import org.apache.pulsar.client.api.Consumer; @@ -56,6 +57,9 @@ public BaseConsume(List insertServiceList, StoreConfig storeConfig, */ protected void handleMessage(String body) throws Exception { AuditData msgBody = gson.fromJson(body, AuditData.class); + + MetricsManager.getInstance().addReceiveSuccess(1); + this.insertServiceList.forEach((service) -> { try { service.insert(msgBody); @@ -66,6 +70,9 @@ protected void handleMessage(String body) throws Exception { } protected void handleMessage(String body, Consumer consumer, MessageId messageId) { AuditData msgBody = gson.fromJson(body, AuditData.class); + + MetricsManager.getInstance().addReceiveSuccess(1); + this.insertServiceList.forEach((service) -> { try { service.insert(msgBody, consumer, messageId); diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java similarity index 95% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java index cc0d8cac21c..34daef1c41a 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/KafkaConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/KafkaConsume.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.service.InsertData; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.service.InsertData; import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -188,6 +189,7 @@ public void run() { } } } catch (Exception e) { + MetricsManager.getInstance().addReceiveFailed(1); LOG.error("kafka consumer get message error {}", e.getMessage()); } } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java similarity index 95% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java index 7d8efc79ce2..c1a5fe92f25 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/PulsarConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.service.InsertData; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.service.InsertData; import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -131,6 +132,8 @@ public void received(Consumer consumer, Message msg) { String body = new String(msg.getData(), StandardCharsets.UTF_8); handleMessage(body, consumer, msg.getMessageId()); } catch (Exception e) { + MetricsManager.getInstance().addReceiveFailed(1); + LOG.error("Consumer has exception topic {}, subName {}, ex {}", topic, mqConfig.getPulsarConsumerSubName(), diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java similarity index 92% rename from inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java rename to inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java index 06d1b5b1897..20cbd60e908 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/consume/TubeConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/TubeConsume.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.service.InsertData; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.service.InsertData; import org.apache.inlong.tubemq.client.config.ConsumerConfig; import org.apache.inlong.tubemq.client.consumer.ConsumePosition; import org.apache.inlong.tubemq.client.consumer.ConsumerResult; @@ -133,12 +134,15 @@ public void run() { } pullMessageConsumer.confirmConsume(csmResult.getConfirmContext(), true); } else { + MetricsManager.getInstance().addReceiveFailed(1); LOG.error("receive messages errorCode is {}, error meddage is {}", csmResult.getErrCode(), csmResult.getErrMsg()); } } catch (TubeClientException e) { + MetricsManager.getInstance().addReceiveFailed(1); LOG.error("tube consumer getMessage error {}", e.getMessage()); } catch (Exception e) { + MetricsManager.getInstance().addReceiveFailed(1); LOG.error("handle audit message error {}", e.getMessage()); } diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java similarity index 87% rename from inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java rename to inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java index 5ddee887bd6..4568078f8f6 100644 --- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/KafkaConsumeTest.java +++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/KafkaConsumeTest.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.JdbcConfig; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.service.InsertData; -import org.apache.inlong.audit.service.JdbcService; +import org.apache.inlong.audit.store.config.JdbcConfig; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.service.InsertData; +import org.apache.inlong.audit.store.service.JdbcService; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java similarity index 97% rename from inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java rename to inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java index acc7ccd68ff..9cd9b241ef2 100644 --- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/PulsarConsumeTest.java +++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/PulsarConsumeTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.impl.PulsarClientImpl; diff --git a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java similarity index 88% rename from inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java rename to inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java index 4085f295149..d1bb85c82bf 100644 --- a/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/service/consume/TubeConsumeTest.java +++ b/inlong-audit/audit-store/src/test/java/org/apache/inlong/audit/store/service/consume/TubeConsumeTest.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.audit.service.consume; +package org.apache.inlong.audit.store.service.consume; -import org.apache.inlong.audit.config.JdbcConfig; -import org.apache.inlong.audit.config.MessageQueueConfig; -import org.apache.inlong.audit.config.StoreConfig; -import org.apache.inlong.audit.service.InsertData; -import org.apache.inlong.audit.service.JdbcService; +import org.apache.inlong.audit.store.config.JdbcConfig; +import org.apache.inlong.audit.store.config.MessageQueueConfig; +import org.apache.inlong.audit.store.config.StoreConfig; +import org.apache.inlong.audit.store.service.InsertData; +import org.apache.inlong.audit.store.service.JdbcService; import org.apache.inlong.tubemq.client.consumer.ConsumerResult; import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer; import org.apache.inlong.tubemq.client.exception.TubeClientException; diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties index 063b01aa8ff..ad1b4e487f6 100644 --- a/inlong-audit/conf/application.properties +++ b/inlong-audit/conf/application.properties @@ -53,7 +53,13 @@ audit.store.jdbc.username=root audit.store.jdbc.password=inlong ############################ -# metric config +# Audit Proxy metric config # org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric is the default monitoring ########################### -audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric \ No newline at end of file +audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric + +############################ +# Audit Store metric config +# org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric is the default monitoring +########################### +audit.store.metric.classname=org.apache.inlong.audit.store.metric.prometheus.StorePrometheusMetric \ No newline at end of file