Add deserialization and checkpoint-snapshot metrics to the postgres-cdc source.

SourceExactlyMetric gains counters and gauges for deserialize success/error/lag
and for snapshot create/error/complete/lag. DebeziumSourceFunction now owns the
metric (created in open() from the MetricOption handed over by
PostgreSQLSource.Builder) and injects it into RowDataDebeziumDeserializeSchema
through a setter.

diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 8fd698fa126..219ef02dcaf 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -67,6 +67,20 @@ public final class Constants {
 
     public static final String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag";
 
+    public static final String DESERIALIZE_TIME_LAG = "deserializeTimeLag";
+
+    public static final String NUM_DESERIALIZE_SUCCESS = "numDeserializeSuccess";
+
+    public static final String NUM_DESERIALIZE_ERROR = "numDeserializeError";
+
+    public static final String NUM_SNAPSHOT_CREATE = "numSnapshotCreate";
+
+    public static final String NUM_SNAPSHOT_ERROR = "numSnapshotError";
+
+    public static final String NUM_SNAPSHOT_COMPLETE = "numSnapshotComplete";
+
+    public static final String SNAPSHOT_TO_CHECKPOINT_TIME_LAG = "snapshotToCheckpointTimeLag";
+
     /**
      * Timestamp when the read phase changed
      */
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
index 19f9f1eda92..a0b71c554c8 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
@@ -33,12 +33,19 @@
 import static org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;
 import static org.apache.inlong.sort.base.Constants.CURRENT_EMIT_EVENT_TIME_LAG;
 import static org.apache.inlong.sort.base.Constants.CURRENT_FETCH_EVENT_TIME_LAG;
+import static org.apache.inlong.sort.base.Constants.DESERIALIZE_TIME_LAG;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_ERROR;
+import static org.apache.inlong.sort.base.Constants.NUM_DESERIALIZE_SUCCESS;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_COMPLETE;
+import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_CREATE;
+import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_ERROR;
+import static org.apache.inlong.sort.base.Constants.SNAPSHOT_TO_CHECKPOINT_TIME_LAG;
 import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
 
 public class SourceExactlyMetric implements MetricData, Serializable, SourceMetricsReporter {
@@ -50,6 +57,13 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr
     private Counter numBytesIn;
     private Counter numRecordsInForMeter;
     private Counter numBytesInForMeter;
+    private Counter numDeserializeSuccess;
+    private Counter numDeserializeError;
+    private Gauge deserializeTimeLag;
+    private Counter numSnapshotCreate;
+    private Counter numSnapshotError;
+    private Counter numSnapshotComplete;
+    private Gauge snapshotToCheckpointTimeLag;
     private Meter numRecordsInPerSecond;
     private Meter numBytesInPerSecond;
     private AuditReporterImpl auditReporter;
@@ -80,6 +94,17 @@ public class SourceExactlyMetric implements MetricData, Serializable, SourceMetr
      */
     private volatile long emitDelay = 0L;
 
+    /**
+     * deserializeDelay = deserializeEndTime - deserializeStartTime, where deserializeStartTime is the time
+     * deserialize is called and deserializeEndTime is the time the record has been emitted
+     */
+    private volatile long deserializeDelay = 0L;
+
+    /**
+     * snapshotToCheckpointDelay = snapShotCompleteTime - snapShotStartTimeById, where snapShotCompleteTime is the time the logic of notifyCheckpointComplete finishes
+     */
+    private volatile long snapshotToCheckpointDelay = 0L;
+
     public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
         this.labels = option.getLabels();
@@ -98,6 +123,13 @@ public SourceExactlyMetric(MetricOption option, MetricGroup metricGroup) {
                 registerMetricsForNumRecordsInPerSecond();
                 registerMetricsForCurrentFetchEventTimeLag();
                 registerMetricsForCurrentEmitEventTimeLag();
+                registerMetricsForDeserializeTimeLag();
+                registerMetricsForNumSnapshotComplete(new ThreadSafeCounter());
+                registerMetricsForNumDeserializeSuccess(new ThreadSafeCounter());
+                registerMetricsForNumDeserializeError(new ThreadSafeCounter());
+                registerMetricsForNumSnapshotCreate(new ThreadSafeCounter());
+                registerMetricsForNumSnapshotError(new ThreadSafeCounter());
+                registerMetricsForSnapshotToCheckpointTimeLag();
                 break;
 
         }
@@ -178,6 +210,58 @@ public void registerMetricsForCurrentFetchEventTimeLag() {
     public void registerMetricsForCurrentEmitEventTimeLag() {
         currentEmitEventTimeLag = registerGauge(CURRENT_EMIT_EVENT_TIME_LAG, (Gauge) this::getEmitDelay);
     }
+    public void registerMetricsForDeserializeTimeLag() {
+        deserializeTimeLag = registerGauge(DESERIALIZE_TIME_LAG, (Gauge) this::getDeserializeDelay);
+    }
+
+    public void registerMetricsForNumDeserializeSuccess(Counter counter) {
+        numDeserializeSuccess = registerCounter(NUM_DESERIALIZE_SUCCESS, counter);
+    }
+
+    public void registerMetricsForNumDeserializeError(Counter counter) {
+        numDeserializeError = registerCounter(NUM_DESERIALIZE_ERROR, counter);
+    }
+
+    public void registerMetricsForNumSnapshotCreate(Counter counter) {
+        numSnapshotCreate = registerCounter(NUM_SNAPSHOT_CREATE, counter);
+    }
+
+    public void registerMetricsForNumSnapshotError(Counter counter) {
+        numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter);
+    }
+
+    public void registerMetricsForNumSnapshotComplete(Counter counter) {
+        numSnapshotComplete = registerCounter(NUM_SNAPSHOT_COMPLETE, counter);
+    }
+
+    public void registerMetricsForSnapshotToCheckpointTimeLag() {
+        snapshotToCheckpointTimeLag =
+                registerGauge(SNAPSHOT_TO_CHECKPOINT_TIME_LAG, (Gauge) this::getSnapshotToCheckpointDelay);
+    }
+
+    public Gauge getDeserializeTimeLag() {
+        return deserializeTimeLag;
+    }
+
+    public Gauge getSnapshotToCheckpointTimeLag() {
+        return snapshotToCheckpointTimeLag;
+    }
+
+    public Counter getNumDeserializeSuccess() {
+        return numDeserializeSuccess;
+    }
+
+    public Counter getNumDeserializeError() {
+        return numDeserializeError;
+    }
+
+    public Counter getNumSnapshotCreate() {
+        return numSnapshotCreate;
+    }
+
+    public Counter getNumSnapshotError() {
+        return numSnapshotError;
+    }
 
     public Counter getNumRecordsIn() {
         return numRecordsIn;
@@ -211,6 +295,26 @@ public long getEmitDelay() {
         return emitDelay;
     }
 
+    public long getDeserializeDelay() {
+        return deserializeDelay;
+    }
+
+    public long getSnapshotToCheckpointDelay() {
+        return snapshotToCheckpointDelay;
+    }
+
+    public Counter getNumSnapshotComplete() {
+        return numSnapshotComplete;
+    }
+
+    public void recordDeserializeDelay(long deserializeDelay) {
+        this.deserializeDelay = deserializeDelay;
+    }
+
+    public void recordSnapshotToCheckpointDelay(long snapshotToCheckpointDelay) {
+        this.snapshotToCheckpointDelay = snapshotToCheckpointDelay;
+    }
+
     @Override
     public MetricGroup getMetricGroup() {
         return metricGroup;
@@ -262,6 +366,36 @@ private void outputDefaultMetrics(long rowCountSize, long rowDataSize) {
         }
     }
 
+    public void incNumDeserializeSuccess() {
+        if (numDeserializeSuccess != null) {
+            numDeserializeSuccess.inc();
+        }
+    }
+
+    public void incNumDeserializeError() {
+        if (numDeserializeError != null) {
+            numDeserializeError.inc();
+        }
+    }
+
+    public void incNumSnapshotCreate() {
+        if (numSnapshotCreate != null) {
+            numSnapshotCreate.inc();
+        }
+    }
+
+    public void incNumSnapshotError() {
+        if (numSnapshotError != null) {
+            numSnapshotError.inc();
+        }
+    }
+
+    public void incNumSnapshotComplete() {
+        if (numSnapshotComplete != null) {
+            numSnapshotComplete.inc();
+        }
+    }
+
     /**
      * flush audit data
      * usually call this method in close method or when checkpointing
@@ -292,6 +426,13 @@ public String toString() {
                 + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
                 + ", currentFetchEventTimeLag=" + currentFetchEventTimeLag.getValue()
                 + ", currentEmitEventTimeLag=" + currentEmitEventTimeLag.getValue()
+                + ", deserializeTimeLag=" + deserializeTimeLag.getValue()
+                + ", numDeserializeSuccess=" + numDeserializeSuccess.getCount()
+                + ", numDeserializeError=" + numDeserializeError.getCount()
+                + ", numSnapshotCreate=" + numSnapshotCreate.getCount()
+                + ", numSnapshotError=" + numSnapshotError.getCount()
+                + ", snapshotToCheckpointTimeLag=" + snapshotToCheckpointTimeLag.getValue()
+                + ", numSnapshotComplete=" + numSnapshotComplete.getCount()
                 + ", auditReporter=" + auditReporter + '}';
     }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
index 5efc6c6ea5d..b3ee727c0d5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.postgre;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
+
 import com.ververica.cdc.debezium.Validator;
 import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
 import com.ververica.cdc.debezium.internal.DebeziumOffset;
@@ -62,6 +65,8 @@
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -198,17 +203,27 @@ public class DebeziumSourceFunction extends RichSourceFunction
     /** Buffer the events from the source and record the errors from the debezium. */
     private transient Handover handover;
 
+    /** Self-defined Flink metrics. */
+    private transient SourceExactlyMetric sourceExactlyMetric;
+
+    private final MetricOption metricOption;
+
+    /** Start time (epoch millis) of each pending checkpoint, keyed by checkpoint id. */
+    private transient Map<Long, Long> checkpointStartTimeMap;
+
     // ---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
             DebeziumDeserializationSchema deserializer,
             Properties properties,
             @Nullable DebeziumOffset specificOffset,
-            Validator validator) {
+            Validator validator,
+            MetricOption metricOption) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
+        this.metricOption = metricOption;
     }
 
     @Override
@@ -221,6 +236,15 @@ public void open(Configuration parameters) throws Exception {
         this.executor = Executors.newSingleThreadExecutor(threadFactory);
         this.handover = new Handover();
         this.changeConsumer = new DebeziumChangeConsumer(handover);
+        if (metricOption != null) {
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
+        }
+        if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+            ((RowDataDebeziumDeserializeSchema) deserializer)
+                    .setSourceExactlyMetric(sourceExactlyMetric);
+        }
+        // the map is transient and not part of the checkpointed state, so start from an empty one on every open()
+        checkpointStartTimeMap = new HashMap<>();
     }
 
     // ------------------------------------------------------------------------
@@ -305,6 +329,17 @@ private void restoreHistoryRecordsState() throws Exception {
 
     @Override
     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        try {
+            doSnapshotState(functionSnapshotContext);
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotError();
+            }
+            throw e;
+        }
+    }
+
+    private void doSnapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
         if (handover.hasError()) {
             LOG.debug("snapshotState() called on closed source");
             throw new FlinkRuntimeException(
@@ -317,6 +352,15 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw
             ((RowDataDebeziumDeserializeSchema) deserializer)
                     .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
         }
+        // record the start time of each checkpoint
+        if (checkpointStartTimeMap != null) {
+            checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
+        } else {
+            LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
+        }
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.incNumSnapshotCreate();
+        }
     }
 
     private void snapshotOffsetState(long checkpointId) throws Exception {
@@ -496,6 +540,20 @@ public void notifyCheckpointComplete(long checkpointId) {
                 schema.flushAudit();
                 schema.updateLastCheckpointId(checkpointId);
             }
+            // get the start time of the currently completed checkpoint
+            if (checkpointStartTimeMap != null) {
+                Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
+                // earlier checkpoints that were aborted or subsumed may never be notified, so drop
+                // their start times here to keep the map from growing without bound
+                checkpointStartTimeMap.keySet().removeIf(id -> id < checkpointId);
+                if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
+                    sourceExactlyMetric.incNumSnapshotComplete();
+                    sourceExactlyMetric
+                            .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
+                }
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
+            }
         } catch (Exception e) {
             // ignore exception if we are no longer running
             LOG.warn("Ignore error when committing offset to database.", e);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java
index 040541b8272..cbd1eb2679d 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.postgre;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+
 import com.ververica.cdc.debezium.Validator;
 
 import io.debezium.connector.postgresql.PostgresConnector;
@@ -53,6 +55,7 @@ public static class Builder {
         private String[] tableList;
         private Properties dbzProperties;
         private DebeziumDeserializationSchema deserializer;
+        private MetricOption metricOption;
 
         /**
          * The name of the Postgres logical decoding plug-in installed on the server. Supported
@@ -146,6 +149,12 @@ public Builder deserializer(DebeziumDeserializationSchema deserializer) {
             return this;
         }
 
+        /** The metric option used to instantiate SourceExactlyMetric; no metric is created when it is null. */
+        public Builder metricOption(MetricOption metricOption) {
+            this.metricOption = metricOption;
+            return this;
+        }
+
         public DebeziumSourceFunction build() {
             Properties props = new Properties();
             props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
@@ -178,7 +187,7 @@ public DebeziumSourceFunction build() {
             }
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, null, Validator.getDefaultValidator());
+                    deserializer, props, null, Validator.getDefaultValidator(), metricOption);
         }
     }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
index 6e4bd7c9229..cabfe255fc4 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
@@ -135,7 +135,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                                 PostgreSQLDeserializationConverterFactory.instance())
                         .setValueValidator(new PostgresValueValidator(schemaName, tableName))
                         .setChangelogMode(changelogMode)
-                        .setMetricOption(metricOption)
                         .build();
         DebeziumSourceFunction sourceFunction =
                 PostgreSQLSource.builder()
@@ -150,6 +149,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                         .slotName(slotName)
                         .debeziumProperties(dbzProperties)
                         .deserializer(deserializer)
+                        .metricOption(metricOption)
                         .build();
         return SourceFunctionProvider.of(sourceFunction, false);
     }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
index fdf2d013279..8afca47c949 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.postgre;
 
-import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
 import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 
@@ -100,7 +99,8 @@ public interface ValueValidator extends Serializable {
 
     /** Changelog Mode to use for encoding changes in Flink internal data structure. */
     private final DebeziumChangelogMode changelogMode;
-    private final MetricOption metricOption;
+
+    /** Self-defined Flink metrics; injected by DebeziumSourceFunction through the setter and may be null. */
     private SourceExactlyMetric sourceExactlyMetric;
 
     /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
@@ -115,8 +115,7 @@ public static Builder newBuilder() {
             ValueValidator validator,
             ZoneId serverTimeZone,
             DeserializationRuntimeConverterFactory userDefinedConverterFactory,
-            DebeziumChangelogMode changelogMode,
-            MetricOption metricOption) {
+            DebeziumChangelogMode changelogMode) {
         this.hasMetadata = checkNotNull(metadataConverters).length > 0;
         this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
         this.physicalConverter =
@@ -127,18 +126,27 @@ public static Builder newBuilder() {
         this.resultTypeInfo = checkNotNull(resultTypeInfo);
         this.validator = checkNotNull(validator);
         this.changelogMode = checkNotNull(changelogMode);
-        this.metricOption = metricOption;
     }
 
     @Override
     public void open() {
-        if (metricOption != null) {
-            sourceExactlyMetric = new SourceExactlyMetric(metricOption);
-        }
     }
 
     @Override
     public void deserialize(SourceRecord record, Collector out) throws Exception {
+        long deserializeStartTime = System.currentTimeMillis();
+        try {
+            doDeserialize(record, out, deserializeStartTime);
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeError();
+            }
+            throw e;
+        }
+    }
+
+    private void doDeserialize(SourceRecord record, Collector out, long deserializeStartTime)
+            throws Exception {
         Envelope.Operation op = Envelope.operationFor(record);
         Struct value = (Struct) record.value();
         Schema valueSchema = record.valueSchema();
@@ -174,6 +182,10 @@ public void deserialize(SourceRecord record, Collector out) throws Exce
             }
             emit(record, after, out);
         }
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.incNumDeserializeSuccess();
+            sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
+        }
     }
 
     private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
@@ -219,7 +231,6 @@ public static class Builder {
         private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
                 DeserializationRuntimeConverterFactory.DEFAULT;
         private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
-        private MetricOption metricOption;
 
         public Builder setPhysicalRowType(RowType physicalRowType) {
             this.physicalRowType = physicalRowType;
@@ -251,10 +262,6 @@ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) {
             this.changelogMode = changelogMode;
             return this;
         }
-        public Builder setMetricOption(MetricOption metricOption) {
-            this.metricOption = metricOption;
-            return this;
-        }
 
         public RowDataDebeziumDeserializeSchema build() {
             return new RowDataDebeziumDeserializeSchema(
@@ -264,8 +271,7 @@ public RowDataDebeziumDeserializeSchema build() {
                     validator,
                     serverTimeZone,
                     userDefinedConverterFactory,
-                    changelogMode,
-                    metricOption);
+                    changelogMode);
         }
     }
 
@@ -704,4 +710,9 @@ public void updateLastCheckpointId(long checkpointId) {
             sourceExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
+
+    /** Lets DebeziumSourceFunction inject the metric it creates in open(); the argument may be null. */
+    public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) {
+        this.sourceExactlyMetric = sourceExactlyMetric;
+    }
 }