diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 1e1a6247621..f988d1385e9 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -345,6 +345,16 @@ private void outputDefaultMetrics(long rowCountSize, long rowDataSize) { } } + /** + * flush audit data + * usually call this method in close method or when checkpointing + */ + public void flushAuditData() { + if (auditOperator != null) { + auditOperator.send(); + } + } + private void outputDefaultMetrics(long rowCountSize, long rowDataSize, long fetchDelay, long emitDelay) { outputDefaultMetrics(rowCountSize, rowDataSize); this.fetchDelay = fetchDelay; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java index 0cf31c206ee..b43f56e6ce5 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.iceberg.sink; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.iceberg.utils.SinkMetadataUtils; @@ -78,6 +79,11 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { this.writer = taskWriterFactory.create(); } + @Override + public void snapshotState(StateSnapshotContext context) { + writerMetrics.flushAudit(); + } + @Override public void processElement(StreamRecord element) throws Exception { T data = element.getValue(); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java index 1d627714bc2..72ca7e0cf5f 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java @@ -113,4 +113,10 @@ void outputMetricsWithEstimate(int size, long time) { sourceMetricData.outputMetrics(1, size, time); } } + + void flushAudit() { + if (sourceMetricData != null) { + sourceMetricData.flushAuditData(); + } + } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java index ad3a9b13d44..df75723cebe 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -38,6 +39,7 @@ public class IcebergSourceReader extends SingleThreadMultiplexSourceReaderBase, T, IcebergSourceSplit, IcebergSourceSplit> { + private final InlongIcebergSourceReaderMetrics metrics; public IcebergSourceReader( InlongIcebergSourceReaderMetrics metrics, ReaderFunction readerFunction, @@ -47,6 +49,7 @@ public IcebergSourceReader( new IcebergSourceRecordEmitter<>(), context.getConfiguration(), context); + this.metrics = metrics; } @Override @@ -62,6 +65,11 @@ public void start() { protected void onSplitFinished(Map finishedSplitIds) { requestSplit(Lists.newArrayList(finishedSplitIds.keySet())); } + @Override + public List snapshotState(long checkpointId) { + metrics.flushAudit(); + return super.snapshotState(checkpointId); + } @Override protected IcebergSourceSplit initializedState(IcebergSourceSplit split) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java index 252ae4580db..2210fbca02f 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java @@ -77,4 +77,10 @@ private long getDataSize(T object) { } return object.toString().getBytes(StandardCharsets.UTF_8).length; } + + void flushAudit() { + if (sourceMetricData != null) { + sourceMetricData.flushAuditData(); + } + } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index b9fb6d1b0d4..1f261cfef5d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -317,6 +317,8 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue())); } + deserializationSchema.flushAudit(); + LOG.info("Successfully save the offsets in checkpoint {}: {}.", context.getCheckpointId(), currentOffsets); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java index 4c4eaac841a..c6ec9ea9cb7 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java @@ -58,4 +58,6 @@ default void deserialize(Message message, Collector out) throws IOException { out.collect(deserialize); } } + + void flushAudit(); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java index 8ee154c5359..3f2a57d7c79 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java @@ -109,6 +109,13 @@ public void deserialize(Message message, Collector out) throws IOExcept } + @Override + public void flushAudit() { + if (sourceMetricData != null) { + sourceMetricData.flushAuditData(); + } + } + @Override public TypeInformation getProducedType() { return producedTypeInfo;