Skip to content

Commit

Permalink
[INLONG-9380][Sort] Audit lost when stop job immediately after checkp…
Browse files Browse the repository at this point in the history
…oint
  • Loading branch information
vernedeng committed Dec 4, 2023
1 parent 6211eb3 commit 3e2558c
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,16 @@ private void outputDefaultMetrics(long rowCountSize, long rowDataSize) {
}
}

/**
* flush audit data
* usually call this method in close method or when checkpointing
*/
public void flushAuditData() {
if (auditOperator != null) {
auditOperator.send();
}
}

private void outputDefaultMetrics(long rowCountSize, long rowDataSize, long fetchDelay, long emitDelay) {
outputDefaultMetrics(rowCountSize, rowDataSize);
this.fetchDelay = fetchDelay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.iceberg.sink;

import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.iceberg.utils.SinkMetadataUtils;

Expand Down Expand Up @@ -78,6 +79,11 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
this.writer = taskWriterFactory.create();
}

@Override
public void snapshotState(StateSnapshotContext context) {
writerMetrics.flushAudit();
}

@Override
public void processElement(StreamRecord<T> element) throws Exception {
T data = element.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,10 @@ void outputMetricsWithEstimate(int size, long time) {
sourceMetricData.outputMetrics(1, size, time);
}
}

void flushAudit() {
if (sourceMetricData != null) {
sourceMetricData.flushAuditData();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -38,6 +39,7 @@ public class IcebergSourceReader<T>
extends
SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {

private final InlongIcebergSourceReaderMetrics<T> metrics;
public IcebergSourceReader(
InlongIcebergSourceReaderMetrics<T> metrics,
ReaderFunction<T> readerFunction,
Expand All @@ -47,6 +49,7 @@ public IcebergSourceReader(
new IcebergSourceRecordEmitter<>(),
context.getConfiguration(),
context);
this.metrics = metrics;
}

@Override
Expand All @@ -62,6 +65,11 @@ public void start() {
protected void onSplitFinished(Map<String, IcebergSourceSplit> finishedSplitIds) {
requestSplit(Lists.newArrayList(finishedSplitIds.keySet()));
}
@Override
public List<IcebergSourceSplit> snapshotState(long checkpointId) {
metrics.flushAudit();
return super.snapshotState(checkpointId);
}

@Override
protected IcebergSourceSplit initializedState(IcebergSourceSplit split) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,10 @@ private long getDataSize(T object) {
}
return object.toString().getBytes(StandardCharsets.UTF_8).length;
}

void flushAudit() {
if (sourceMetricData != null) {
sourceMetricData.flushAuditData();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue()));
}

deserializationSchema.flushAudit();

LOG.info("Successfully save the offsets in checkpoint {}: {}.",
context.getCheckpointId(), currentOffsets);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ default void deserialize(Message message, Collector<T> out) throws IOException {
out.collect(deserialize);
}
}

void flushAudit();
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ public void deserialize(Message message, Collector<RowData> out) throws IOExcept

}

@Override
public void flushAudit() {
if (sourceMetricData != null) {
sourceMetricData.flushAuditData();
}
}

@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
Expand Down

0 comments on commit 3e2558c

Please sign in to comment.