Skip to content

Commit

Permalink
[INLONG-11357][Sort] Add new source metrics for sort-connector-sqlser…
Browse files Browse the repository at this point in the history
…ver-cdc-v1.15 (#11358)
  • Loading branch information
PeterZh6 authored Oct 16, 2024
1 parent 9d9ba04 commit 9348626
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.sort.sqlserver;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import com.ververica.cdc.debezium.Validator;
import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
Expand Down Expand Up @@ -61,6 +64,8 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -199,17 +204,24 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;

private transient SourceExactlyMetric sourceExactlyMetric;

private final MetricOption metricOption;

private transient Map<Long, Long> checkpointStartTimeMap;

// ---------------------------------------------------------------------------------------

public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
Validator validator) {
Validator validator, MetricOption metricOption) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.metricOption = metricOption;
}

@Override
Expand All @@ -222,6 +234,14 @@ public void open(Configuration parameters) throws Exception {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}
if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializer)
.setSourceExactlyMetric(sourceExactlyMetric);
}
this.checkpointStartTimeMap = new HashMap<>();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -306,17 +326,33 @@ private void restoreHistoryRecordsState() throws Exception {

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException(
"Call snapshotState() on closed source, checkpoint failed.");
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
}
if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializer)
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
try {
    // The handover reporting an error is treated as "source closed": fail the checkpoint.
    if (handover.hasError()) {
        LOG.debug("snapshotState() called on closed source");
        throw new FlinkRuntimeException(
                "Call snapshotState() on closed source, checkpoint failed.");
    } else {
        snapshotOffsetState(functionSnapshotContext.getCheckpointId());
        snapshotHistoryRecordsState();
    }
    if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
        ((RowDataDebeziumDeserializeSchema) deserializer)
                .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
    }
    // Remember when this checkpoint's snapshot was taken so notifyCheckpointComplete() can
    // report the snapshot-to-checkpoint delay for the same checkpoint id.
    if (checkpointStartTimeMap != null) {
        checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
    } else {
        LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
    }

    // Only reached when every step above succeeded.
    if (sourceExactlyMetric != null) {
        sourceExactlyMetric.incNumSnapshotCreate();
    }
} catch (Exception e) {
    // A failed snapshot must be counted as an error, not as a created snapshot; this mirrors
    // the incNumDeserializeError() call on the failure path of deserialize().
    if (sourceExactlyMetric != null) {
        sourceExactlyMetric.incNumSnapshotError();
    }
    throw e;
}
}

Expand Down Expand Up @@ -498,6 +534,16 @@ public void notifyCheckpointComplete(long checkpointId) {
schema.flushAudit();
schema.updateLastCheckpointId(checkpointId);
}
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotComplete();
sourceExactlyMetric.recordSnapshotToCheckpointDelay(
System.currentTimeMillis() - snapShotStartTimeById);
}
} else {
LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
}
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,37 +139,49 @@ public void open() {

@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
if (sourceExactlyMetric != null) {
out = new MetricsCollector<>(out, sourceExactlyMetric);
long deserializeStartTime = System.currentTimeMillis();
try {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
if (sourceExactlyMetric != null) {
out = new MetricsCollector<>(out, sourceExactlyMetric);
}
emit(record, insert, out);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
emit(record, delete, out);
} else {
if (changelogMode == DebeziumChangelogMode.ALL) {
GenericRowData before = extractBeforeRow(value, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
emit(record, before, out);
}

GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
if (sourceExactlyMetric != null) {
out = new MetricsCollector<>(out, sourceExactlyMetric);
}
emit(record, after, out);
}
emit(record, insert, out);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
emit(record, delete, out);
} else {
if (changelogMode == DebeziumChangelogMode.ALL) {
GenericRowData before = extractBeforeRow(value, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
emit(record, before, out);
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeSuccess();
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
}

GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
} catch (Exception e) {
if (sourceExactlyMetric != null) {
out = new MetricsCollector<>(out, sourceExactlyMetric);
sourceExactlyMetric.incNumDeserializeError();
}
emit(record, after, out);
throw e;
}
}

Expand Down Expand Up @@ -697,4 +709,9 @@ public void updateLastCheckpointId(long checkpointId) {
sourceExactlyMetric.updateLastCheckpointId(checkpointId);
}
}

/**
 * Sets the {@link SourceExactlyMetric} this schema reports to.
 *
 * <p>Exists so that DebeziumSourceFunction can hand over the metric it creates; the schema does
 * not build one itself in this setter. The value may be {@code null} when no metric is
 * configured, and the visible use sites check for {@code null} before reporting.
 *
 * @param sourceExactlyMetric the metric instance to report to, or {@code null} for none
 */
public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) {
this.sourceExactlyMetric = sourceExactlyMetric;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.sqlserver;

import org.apache.inlong.sort.base.metric.MetricOption;

import com.ververica.cdc.connectors.sqlserver.SqlServerValidator;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import io.debezium.connector.sqlserver.SqlServerConnector;
Expand Down Expand Up @@ -51,6 +53,7 @@ public static class Builder<T> {
private Properties dbzProperties;
private StartupOptions startupOptions = StartupOptions.initial();
private DebeziumDeserializationSchema<T> deserializer;
private MetricOption metricOption;

public Builder<T> hostname(String hostname) {
this.hostname = hostname;
Expand Down Expand Up @@ -114,6 +117,12 @@ public Builder<T> startupOptions(StartupOptions startupOptions) {
return this;
}

/**
 * Sets the metric option that {@code build()} passes to the created DebeziumSourceFunction.
 *
 * <p>The option is used to instantiate SourceExactlyMetric when inlong.metric.labels is present
 * in flink sql. If it is never set, the field stays {@code null} and is passed on as such.
 *
 * @param metricOption the metric option to forward, or {@code null} for none
 * @return this builder, to allow call chaining
 */
public Builder<T> metricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName());
Expand Down Expand Up @@ -154,7 +163,7 @@ public DebeziumSourceFunction<T> build() {
}

return new DebeziumSourceFunction<>(
deserializer, props, null, new SqlServerValidator(props));
deserializer, props, null, new SqlServerValidator(props), metricOption);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.deserializer(deserializer)
.metricOption(metricOption)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
Expand Down

0 comments on commit 9348626

Please sign in to comment.