From 87220ee89720e40a8a15399685dfa5cfd1aa96c9 Mon Sep 17 00:00:00 2001
From: PeterZh6
Date: Mon, 14 Oct 2024 15:41:03 +0800
Subject: [PATCH 1/2] [INLONG-11355][Sort] Add new source metrics for
 sort-connector-mongodb-cdc-v1.15

Co-authored-by: yangyang-12-wq
---
 .../sort/mongodb/DebeziumSourceFunction.java  |  70 +++++++++--
 ...MongoDBConnectorDeserializationSchema.java | 110 ++++++++++--------
 .../inlong/sort/mongodb/MongoDBSource.java    |  16 ++-
 .../sort/mongodb/MongoDBTableSource.java      |   4 +-
 .../mongodb/source/MongoDBSourceBuilder.java  |   7 ++
 5 files changed, 146 insertions(+), 61 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
index 2d7191525be..46541295e74 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.mongodb;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
+
 import com.ververica.cdc.debezium.Validator;
 import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
 import com.ververica.cdc.debezium.internal.DebeziumOffset;
@@ -61,6 +64,8 @@
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -197,17 +202,25 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     /** Buffer the events from the source and record the errors from the debezium. */
     private transient Handover handover;
 
+    private transient SourceExactlyMetric sourceExactlyMetric;
+
+    private final MetricOption metricOption;
+
+    private transient Map<Long, Long> checkpointStartTimeMap;
+
     // ---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
             DebeziumDeserializationSchema<T> deserializer,
             Properties properties,
             @Nullable DebeziumOffset specificOffset,
-            Validator validator) {
+            Validator validator,
+            MetricOption metricOption) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
+        this.metricOption = metricOption;
     }
 
     @Override
@@ -220,6 +233,14 @@ public void open(Configuration parameters) throws Exception {
         this.executor = Executors.newSingleThreadExecutor(threadFactory);
         this.handover = new Handover();
         this.changeConsumer = new DebeziumChangeConsumer(handover);
+        if (metricOption != null) {
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
+        }
+        this.checkpointStartTimeMap = new HashMap<>();
+        // set sourceExactlyMetric for deserializer
+        if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
+            ((MongoDBConnectorDeserializationSchema) deserializer).setSourceExactlyMetric(sourceExactlyMetric);
+        }
     }
 
     // ------------------------------------------------------------------------
@@ -304,17 +325,32 @@ private void restoreHistoryRecordsState() throws Exception {
 
     @Override
     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
-        if (handover.hasError()) {
-            LOG.debug("snapshotState() called on closed source");
-            throw new FlinkRuntimeException(
-                    "Call snapshotState() on closed source, checkpoint failed.");
-        } else {
-            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
-            snapshotHistoryRecordsState();
-        }
-        if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
-            ((MongoDBConnectorDeserializationSchema) deserializer)
-                    .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+        try {
+            if (handover.hasError()) {
+                LOG.debug("snapshotState() called on closed source");
+                throw new FlinkRuntimeException(
+                        "Call snapshotState() on closed source, checkpoint failed.");
+            } else {
+                snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+                snapshotHistoryRecordsState();
+            }
+            if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
+                ((MongoDBConnectorDeserializationSchema) deserializer)
+                        .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+            }
+            if (checkpointStartTimeMap != null) {
+                checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
+            }
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotCreate();
+            }
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotError();
+            }
+            throw e;
         }
     }
 
@@ -496,6 +532,16 @@ public void notifyCheckpointComplete(long checkpointId) {
                 schema.flushAudit();
                 schema.updateLastCheckpointId(checkpointId);
             }
+            if (checkpointStartTimeMap != null) {
+                Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
+                if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
+                    sourceExactlyMetric.incNumSnapshotComplete();
+                    sourceExactlyMetric
+                            .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
+                }
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
+            }
         } catch (Exception e) {
             // ignore exception if we are no longer running
             LOG.warn("Ignore error when committing offset to database.", e);

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
index daa8dccb791..668f6de4cb8 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
@@ -137,53 +137,66 @@ public void open() {
 
     @Override
     public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
-        Struct value = (Struct) record.value();
-        Schema valueSchema = record.valueSchema();
-
-        OperationType op = operationTypeFor(record);
-        BsonDocument documentKey =
-                checkNotNull(
-                        extractBsonDocument(
-                                value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD));
-        BsonDocument fullDocument =
-                extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD);
-        switch (op) {
-            case INSERT:
-                GenericRowData insert = extractRowData(fullDocument);
-                insert.setRowKind(RowKind.INSERT);
-                emit(record, insert,
-                        sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
-                break;
-            case DELETE:
-                GenericRowData delete = extractRowData(documentKey);
-                delete.setRowKind(RowKind.DELETE);
-                emit(record, delete,
-                        sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
-                break;
-            case UPDATE:
-                // It's null if another operation deletes the document
-                // before the lookup operation happens. Ignored it.
-                if (fullDocument == null) {
+        long deserializeStartTime = System.currentTimeMillis();
+        try {
+            Struct value = (Struct) record.value();
+            Schema valueSchema = record.valueSchema();
+
+            OperationType op = operationTypeFor(record);
+            BsonDocument documentKey =
+                    checkNotNull(
+                            extractBsonDocument(
+                                    value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD));
+            BsonDocument fullDocument =
+                    extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD);
+            switch (op) {
+                case INSERT:
+                    GenericRowData insert = extractRowData(fullDocument);
+                    insert.setRowKind(RowKind.INSERT);
+                    emit(record, insert,
+                            sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
                     break;
-                }
-                GenericRowData updateAfter = extractRowData(fullDocument);
-                updateAfter.setRowKind(RowKind.UPDATE_AFTER);
-                emit(record, updateAfter,
-                        sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
-                break;
-            case REPLACE:
-                GenericRowData replaceAfter = extractRowData(fullDocument);
-                replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
-                emit(record, replaceAfter,
-                        sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
-                break;
-            case INVALIDATE:
-            case DROP:
-            case DROP_DATABASE:
-            case RENAME:
-            case OTHER:
-            default:
-                break;
+                case DELETE:
+                    GenericRowData delete = extractRowData(documentKey);
+                    delete.setRowKind(RowKind.DELETE);
+                    emit(record, delete,
+                            sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
+                    break;
+                case UPDATE:
+                    // It's null if another operation deletes the document
+                    // before the lookup operation happens. Ignore it.
+                    if (fullDocument == null) {
+                        break;
+                    }
+                    GenericRowData updateAfter = extractRowData(fullDocument);
+                    updateAfter.setRowKind(RowKind.UPDATE_AFTER);
+                    emit(record, updateAfter,
+                            sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
+                    break;
+                case REPLACE:
+                    GenericRowData replaceAfter = extractRowData(fullDocument);
+                    replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
+                    emit(record, replaceAfter,
+                            sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
+                    break;
+                case INVALIDATE:
+                case DROP:
+                case DROP_DATABASE:
+                case RENAME:
+                case OTHER:
+                default:
+                    break;
+            }
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeSuccess();
+                sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
+            }
+
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeError();
+            }
+            throw e;
         }
     }
 
@@ -827,4 +840,9 @@ public void updateLastCheckpointId(long checkpointId) {
             sourceExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
+
+    /** Setter for DebeziumSourceFunction to set the SourceExactlyMetric. */
+    public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) {
+        this.sourceExactlyMetric = sourceExactlyMetric;
+    }
 }

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
index f9aab2d54f3..67bb51bf69f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.mongodb;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+
 import com.mongodb.client.model.changestream.FullDocument;
 import com.mongodb.kafka.connect.source.MongoSourceConfig;
 import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
@@ -35,7 +37,11 @@
 import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
 import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
 import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
-import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.*;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
 import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -76,6 +82,7 @@ public static class Builder<T> {
         private String copyExistingPipeline;
         private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
         private DebeziumDeserializationSchema<T> deserializer;
+        private MetricOption metricOption;
 
         /** The comma-separated list of hostname and port pairs of mongodb servers. */
         public Builder<T> hosts(String hosts) {
@@ -243,6 +250,11 @@ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
             return this;
         }
 
+        public Builder<T> metricOption(MetricOption metricOption) {
+            this.metricOption = metricOption;
+            return this;
+        }
+
         /**
          * The properties of mongodb kafka connector.
         * https://docs.mongodb.com/kafka-connector/current/kafka-source
@@ -338,7 +350,7 @@ public DebeziumSourceFunction<T> build() {
                     MongoSourceConfig.ERRORS_TOLERANCE_CONFIG,
                     ErrorTolerance.NONE.value());
             return new DebeziumSourceFunction<>(
-                    deserializer, props, null, Validator.getDefaultValidator());
+                    deserializer, props, null, Validator.getDefaultValidator(), metricOption);
         }
     }
 }

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
index 9c417b4edfa..a161077b0a3 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
@@ -191,13 +191,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                     .ifPresent(builder::heartbeatIntervalMillis);
             Optional.ofNullable(splitMetaGroupSize).ifPresent(builder::splitMetaGroupSize);
             Optional.ofNullable(splitSizeMB).ifPresent(builder::splitSizeMB);
+            Optional.ofNullable(metricOption).ifPresent(builder::metricOption);
 
             return SourceProvider.of(builder.build());
         } else {
             org.apache.inlong.sort.mongodb.MongoDBSource.Builder<RowData> builder =
                     org.apache.inlong.sort.mongodb.MongoDBSource.<RowData>builder()
                             .hosts(hosts)
-                            .deserializer(deserializer);
+                            .deserializer(deserializer)
+                            .metricOption(metricOption);
 
             Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
             Optional.ofNullable(collectionList).ifPresent(builder::collectionList);

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
index a95f238a0be..e25a03034a0 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.mongodb.source;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
 
 import com.ververica.cdc.connectors.base.options.StartupOptions;
@@ -54,6 +55,7 @@ public class MongoDBSourceBuilder<T> {
 
     private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory();
     private DebeziumDeserializationSchema<T> deserializer;
+    private MetricOption metricOption;
 
     /** The comma-separated list of hostname and port pairs of mongodb servers. */
     public MongoDBSourceBuilder<T> hosts(String hosts) {
@@ -189,6 +191,11 @@ public MongoDBSourceBuilder<T> deserializer(DebeziumDeserializationSchema<T> des
         return this;
     }
 
+    public MongoDBSourceBuilder<T> metricOption(MetricOption metricOption) {
+        this.metricOption = metricOption;
+        return this;
+    }
+
     /**
      * Build the {@link MongoDBSource}.
      *

From c848546479b8e993042e7734cdde39a312e0910f Mon Sep 17 00:00:00 2001
From: PeterZh6
Date: Mon, 14 Oct 2024 16:00:17 +0800
Subject: [PATCH 2/2] chore: add comment for a builder of metricOption

---
 .../apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
index e25a03034a0..07b0d9f1cf5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
@@ -191,6 +191,7 @@ public MongoDBSourceBuilder<T> deserializer(DebeziumDeserializationSchema<T> des
         return this;
     }
 
+    /** The metric option used to collect metrics when 'inlong.metric.labels' is present in Flink SQL. */
     public MongoDBSourceBuilder<T> metricOption(MetricOption metricOption) {
         this.metricOption = metricOption;
         return this;
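
---

Usage sketch (not part of the patch): metricOption is optional on both builder
paths. When it is null, open() skips creating the SourceExactlyMetric and the
connector reports no InLong metrics, matching the old behavior; in the SQL path,
MongoDBTableSource derives the MetricOption from the 'inlong.metric.labels'
table option. The Java sketch below shows the DataStream wiring for the legacy
source; the MetricOption builder calls (withInlongLabels / withRegisterMetric)
are the assumed sort-connector-base API, and every literal value is a
placeholder:

    import org.apache.flink.table.data.RowData;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
    import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
    import org.apache.inlong.sort.mongodb.DebeziumSourceFunction;
    import org.apache.inlong.sort.mongodb.MongoDBSource;

    public final class MongoCdcMetricsSketch {

        /**
         * Builds the legacy (non-incremental) MongoDB CDC source with the new
         * metricOption wired in; the caller supplies the deserializer, typically
         * a MongoDBConnectorDeserializationSchema configured with the table schema.
         */
        public static DebeziumSourceFunction<RowData> buildSource(
                DebeziumDeserializationSchema<RowData> deserializer) {
            // Labels follow the groupId=...&streamId=...&nodeId=... format carried
            // by 'inlong.metric.labels'; the values here are placeholders.
            MetricOption metricOption = MetricOption.builder()
                    .withInlongLabels("groupId=test_group&streamId=test_stream&nodeId=mongodb_node")
                    .withRegisterMetric(RegisteredMetric.ALL)
                    .build();

            return MongoDBSource.<RowData>builder()
                    .hosts("localhost:27017")                  // placeholder host
                    .databaseList("test_db")                   // placeholder database
                    .collectionList("test_db.test_collection") // placeholder collection
                    .deserializer(deserializer)
                    .metricOption(metricOption) // null is allowed and disables the metrics
                    .build();
        }
    }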