diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
index a0b71c554c8..a5252402c80 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
@@ -372,6 +372,12 @@ public void incNumDeserializeSuccess() {
         }
     }
 
+    public void decNumDeserializeSuccess() {
+        if (numDeserializeSuccess != null) {
+            numDeserializeSuccess.dec();
+        }
+    }
+
     public void incNumDeserializeError() {
         if (numDeserializeError != null) {
             numDeserializeError.inc();
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
index f535082ea28..3c75793f93f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.pulsar.source.reader;
 
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
 
 import org.apache.flink.annotation.Internal;
@@ -70,6 +71,10 @@ public class PulsarOrderedSourceReader extends PulsarSourceReaderBase
     private final AtomicReference cursorCommitThrowable = new AtomicReference<>();
     private final PulsarDeserializationSchema deserializationSchema;
     private ScheduledExecutorService cursorScheduler;
+    private SourceExactlyMetric sourceExactlyMetric;
+
+    /** The map to store the start time of each checkpoint. */
+    private transient Map checkpointStartTimeMap;
 
     public PulsarOrderedSourceReader(
             FutureCompletingBlockingQueue>> elementsQueue,
@@ -90,6 +95,12 @@ public PulsarOrderedSourceReader(
         this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
         this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
         this.deserializationSchema = deserializationSchema;
+        // get SourceExactlyMetric instance from deserializationSchema
+        if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+            this.sourceExactlyMetric =
+                    ((PulsarTableDeserializationSchema) deserializationSchema).getSourceExactlyMetric();
+        }
+        this.checkpointStartTimeMap = new HashMap<>();
     }
 
     @Override
@@ -131,25 +142,40 @@ protected void onSplitFinished(Map finishedSp
 
     @Override
     public List snapshotState(long checkpointId) {
-        if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
-            ((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
-        }
-        List splits = super.snapshotState(checkpointId);
+        try {
+            // record the start time of each checkpoint
+            if (checkpointStartTimeMap != null) {
+                checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis());
+            }
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotCreate();
+            }
 
-        // Perform a snapshot for these splits.
-        Map cursors =
-                cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
-        // Put the cursors of the active splits.
-        for (PulsarPartitionSplit split : splits) {
-            MessageId latestConsumedId = split.getLatestConsumedId();
-            if (latestConsumedId != null) {
-                cursors.put(split.getPartition(), latestConsumedId);
+            if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+                ((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
             }
-        }
-        // Put cursors of all the finished splits.
-        cursors.putAll(cursorsOfFinishedSplits);
+            List splits = super.snapshotState(checkpointId);
+
+            // Perform a snapshot for these splits.
+            Map cursors =
+                    cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
+            // Put the cursors of the active splits.
+            for (PulsarPartitionSplit split : splits) {
+                MessageId latestConsumedId = split.getLatestConsumedId();
+                if (latestConsumedId != null) {
+                    cursors.put(split.getPartition(), latestConsumedId);
+                }
+            }
+            // Put cursors of all the finished splits.
+            cursors.putAll(cursorsOfFinishedSplits);
 
-        return splits;
+            return splits;
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotError();
+            }
+            throw e;
+        }
     }
 
     @Override
@@ -170,6 +196,17 @@ public void notifyCheckpointComplete(long checkpointId) {
                 pulsarTableDeserializationSchema.flushAudit();
                 pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
             }
+            // get the start time of the currently completed checkpoint
+            if (checkpointStartTimeMap != null) {
+                Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
+                if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
+                    sourceExactlyMetric.incNumSnapshotComplete();
+                    sourceExactlyMetric
+                            .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
+                }
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
+            }
         } catch (Exception e) {
             LOG.error("Failed to acknowledge cursors for checkpoint {}", checkpointId, e);
             cursorCommitThrowable.compareAndSet(null, e);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
index 2ccf74fe3aa..adf15de0b11 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.pulsar.source.reader;
 
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
 
 import org.apache.flink.annotation.Internal;
@@ -41,6 +42,7 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -66,6 +68,11 @@ public class PulsarUnorderedSourceReader extends PulsarSourceReaderBase
     private final PulsarDeserializationSchema deserializationSchema;
     private boolean started = false;
 
+    private SourceExactlyMetric sourceExactlyMetric;
+
+    /** The map to store the start time of each checkpoint. */
+    private transient Map checkpointStartTimeMap;
+
     public PulsarUnorderedSourceReader(
             FutureCompletingBlockingQueue>> elementsQueue,
             Supplier> splitReaderSupplier,
@@ -86,6 +93,11 @@ public PulsarUnorderedSourceReader(
         this.transactionsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
         this.transactionsOfFinishedSplits = Collections.synchronizedList(new ArrayList<>());
         this.deserializationSchema = deserializationSchema;
+        if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+            this.sourceExactlyMetric =
+                    ((PulsarTableDeserializationSchema) deserializationSchema).getSourceExactlyMetric();
+        }
+        this.checkpointStartTimeMap = new HashMap<>();
     }
 
     @Override
@@ -141,26 +153,41 @@ protected void onSplitFinished(Map finishedSp
 
     @Override
     public List snapshotState(long checkpointId) {
-        LOG.debug("Trigger the new transaction for downstream readers.");
-        if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
-            ((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
-        }
-        List splits =
-                ((PulsarUnorderedFetcherManager) splitFetcherManager).snapshotState();
+        try {
+            // record the start time of each checkpoint
+            if (checkpointStartTimeMap != null) {
+                checkpointStartTimeMap.put(checkpointId, System.currentTimeMillis());
+            }
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotCreate();
+            }
+            LOG.debug("Trigger the new transaction for downstream readers.");
+            if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+                ((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
+            }
+            List splits =
+                    ((PulsarUnorderedFetcherManager) splitFetcherManager).snapshotState();
 
-        if (coordinatorClient == null) {
-            return splits;
-        }
-        // Snapshot the transaction status and commit it after checkpoint finishing.
-        List txnIDs =
-                transactionsToCommit.computeIfAbsent(checkpointId, id -> new ArrayList<>());
-        for (PulsarPartitionSplit split : splits) {
-            TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
-            if (uncommittedTransactionId != null) {
-                txnIDs.add(uncommittedTransactionId);
+            if (coordinatorClient == null) {
+                return splits;
+            }
+            // Snapshot the transaction status and commit it after checkpoint finishing.
+            List txnIDs =
+                    transactionsToCommit.computeIfAbsent(checkpointId, id -> new ArrayList<>());
+            for (PulsarPartitionSplit split : splits) {
+                TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
+                if (uncommittedTransactionId != null) {
+                    txnIDs.add(uncommittedTransactionId);
+                }
             }
+            return splits;
+        } catch (Exception e) {
+            // the metric is only set for PulsarTableDeserializationSchema; an NPE here would mask e
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotError();
+            }
+            throw e;
         }
-        return splits;
     }
 
     @Override
@@ -188,5 +215,16 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
             pulsarTableDeserializationSchema.flushAudit();
             pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
         }
+        // get the start time of the currently completed checkpoint
+        if (checkpointStartTimeMap != null) {
+            Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
+            if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotComplete();
+                sourceExactlyMetric
+                        .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
+            }
+        } else {
+            LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
+        }
     }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
index c05f485af6f..5fef1d95c52 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -86,7 +86,7 @@ public void open(DeserializationSchema.InitializationContext context, SourceConf
             keyDeserialization.open(context);
         }
         if (metricOption != null) {
-            sourceExactlyMetric = new SourceExactlyMetric(metricOption);
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption, context.getMetricGroup());
         }
         valueDeserialization.open(context);
     }
@@ -94,27 +94,43 @@ public void open(DeserializationSchema.InitializationContext context, SourceConf
     @Override
     public void deserialize(Message message, Collector collector)
             throws IOException {
-        // Get the key row data
-        List keyRowData = new ArrayList<>();
-        if (keyDeserialization != null) {
-            keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData));
+        try {
+            long deserializeStartTime = System.currentTimeMillis();
+            // increase the number of deserialize success first, if deserialize failed, decrease it
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeSuccess();
+            }
+            // Get the key row data
+            List keyRowData = new ArrayList<>();
+            if (keyDeserialization != null) {
+                keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData));
+            }
+
+            // Get the value row data
+            List valueRowData = new ArrayList<>();
+
+            if (upsertMode && message.getData().length == 0) {
+                rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector);
+                return;
+            }
+
+            MetricsCollector metricsCollector =
+                    new MetricsCollector<>(collector, sourceExactlyMetric);
+
+            valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData));
+
+            rowDataConverter.projectToProducedRowAndCollect(
+                    message, keyRowData, valueRowData, metricsCollector);
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
+            }
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeError();
+                sourceExactlyMetric.decNumDeserializeSuccess();
+            }
+            throw e;
         }
-
-        // Get the value row data
-        List valueRowData = new ArrayList<>();
-
-        if (upsertMode && message.getData().length == 0) {
-            rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector);
-            return;
-        }
-
-        MetricsCollector metricsCollector =
-                new MetricsCollector<>(collector, sourceExactlyMetric);
-
-        valueDeserialization.deserialize(message.getData(), new ListCollector<>(valueRowData));
-
-        rowDataConverter.projectToProducedRowAndCollect(
-                message, keyRowData, valueRowData, metricsCollector);
     }
 
     @Override
@@ -139,4 +155,9 @@ public void updateLastCheckpointId(long checkpointId) {
             sourceExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
+
+    /** getter for PulsarSourceReader to record metrics */
+    public SourceExactlyMetric getSourceExactlyMetric() {
+        return sourceExactlyMetric;
+    }
 }
\ No newline at end of file