diff --git a/src/main/java/io/debezium/connector/db2/Db2OffsetContext.java b/src/main/java/io/debezium/connector/db2/Db2OffsetContext.java index 621986e..e46fa43 100644 --- a/src/main/java/io/debezium/connector/db2/Db2OffsetContext.java +++ b/src/main/java/io/debezium/connector/db2/Db2OffsetContext.java @@ -10,7 +10,9 @@ import org.apache.kafka.connect.data.Schema; +import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.SnapshotRecord; +import io.debezium.connector.SnapshotType; import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; @@ -27,7 +29,6 @@ public class Db2OffsetContext extends CommonOffsetContext { private static final String EVENT_SERIAL_NO_KEY = "event_serial_no"; private final Schema sourceInfoSchema; - private boolean snapshotCompleted; private final TransactionContext transactionContext; private final IncrementalSnapshotContext incrementalSnapshotContext; @@ -37,35 +38,35 @@ public class Db2OffsetContext extends CommonOffsetContext { */ private long eventSerialNo; - public Db2OffsetContext(Db2ConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted, long eventSerialNo, + public Db2OffsetContext(Db2ConnectorConfig connectorConfig, TxLogPosition position, SnapshotType snapshot, boolean snapshotCompleted, long eventSerialNo, TransactionContext transactionContext, IncrementalSnapshotContext incrementalSnapshotContext) { - super(new SourceInfo(connectorConfig)); + super(new SourceInfo(connectorConfig), snapshotCompleted); sourceInfo.setCommitLsn(position.getCommitLsn()); sourceInfo.setChangeLsn(position.getInTxLsn()); sourceInfoSchema = sourceInfo.schema(); - this.snapshotCompleted = snapshotCompleted; if (this.snapshotCompleted) { postSnapshotCompletion(); } else { - sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE); + setSnapshot(snapshot); + sourceInfo.setSnapshot(snapshot != null ? SnapshotRecord.TRUE : SnapshotRecord.FALSE); } this.eventSerialNo = eventSerialNo; this.transactionContext = transactionContext; this.incrementalSnapshotContext = incrementalSnapshotContext; } - public Db2OffsetContext(Db2ConnectorConfig connectorConfig, TxLogPosition position, boolean snapshot, boolean snapshotCompleted) { + public Db2OffsetContext(Db2ConnectorConfig connectorConfig, TxLogPosition position, SnapshotType snapshot, boolean snapshotCompleted) { this(connectorConfig, position, snapshot, snapshotCompleted, 1, new TransactionContext(), new SignalBasedIncrementalSnapshotContext<>(false)); } @Override public Map getOffset() { - if (sourceInfo.isSnapshot()) { + if (getSnapshot().isPresent()) { return Collect.hashMapOf( - SourceInfo.SNAPSHOT_KEY, true, + AbstractSourceInfo.SNAPSHOT_KEY, getSnapshot().get().toString(), SNAPSHOT_COMPLETED_KEY, snapshotCompleted, SourceInfo.COMMIT_LSN_KEY, sourceInfo.getCommitLsn().toString()); } @@ -102,26 +103,10 @@ public void setChangePosition(TxLogPosition position, int eventCount) { sourceInfo.setChangeLsn(position.getInTxLsn()); } - @Override - public boolean isSnapshotRunning() { - return sourceInfo.isSnapshot() && !snapshotCompleted; - } - public boolean isSnapshotCompleted() { return snapshotCompleted; } - @Override - public void preSnapshotStart() { - sourceInfo.setSnapshot(SnapshotRecord.TRUE); - snapshotCompleted = false; - } - - @Override - public void preSnapshotCompletion() { - snapshotCompleted = true; - } - public static class Loader implements OffsetContext.Loader { private final Db2ConnectorConfig connectorConfig; @@ -134,7 +119,7 @@ public Loader(Db2ConnectorConfig connectorConfig) { public Db2OffsetContext load(Map offset) { final Lsn changeLsn = Lsn.valueOf((String) offset.get(SourceInfo.CHANGE_LSN_KEY)); final Lsn commitLsn = Lsn.valueOf((String) offset.get(SourceInfo.COMMIT_LSN_KEY)); - boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); + final SnapshotType snapshot = loadSnapshot((Map) offset); boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY)); // only introduced in 0.10.Beta1, so it might be not present when upgrading from earlier versions diff --git a/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java index 7ca4ff4..e051b3f 100644 --- a/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/db2/Db2SnapshotChangeEventSource.java @@ -133,7 +133,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext it = records.iterator(); it.hasNext();) { SourceRecord record = it.next(); - assertThat(record.sourceOffset().get("snapshot")).as("Snapshot phase").isEqualTo(true); + assertThat(record.sourceOffset().get("snapshot")).as("Snapshot phase").isEqualTo(SnapshotType.INITIAL.toString()); if (it.hasNext()) { assertThat(record.sourceOffset().get("snapshot_completed")).as("Snapshot in progress").isEqualTo(false); } diff --git a/src/test/java/io/debezium/connector/db2/SchemaHistoryTopicIT.java b/src/test/java/io/debezium/connector/db2/SchemaHistoryTopicIT.java index bdbcebf..f72f2d9 100644 --- a/src/test/java/io/debezium/connector/db2/SchemaHistoryTopicIT.java +++ b/src/test/java/io/debezium/connector/db2/SchemaHistoryTopicIT.java @@ -19,6 +19,7 @@ import org.junit.Test; import io.debezium.config.Configuration; +import io.debezium.connector.SnapshotType; import io.debezium.connector.db2.Db2ConnectorConfig.SnapshotMode; import io.debezium.connector.db2.util.TestHelper; import io.debezium.doc.FixFor; @@ -98,7 +99,7 @@ public void snapshotSchemaChanges() throws Exception { schemaRecords.forEach(record -> { assertThat(record.topic()).isEqualTo("testdb"); assertThat(((Struct) record.key()).getString("databaseName")).isEqualTo("TESTDB"); - assertThat(record.sourceOffset().get("snapshot")).isEqualTo(true); + assertThat(record.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString()); }); assertThat(((Struct) schemaRecords.get(0).value()).getStruct("source").getString("snapshot")).isEqualTo("true"); assertThat(((Struct) schemaRecords.get(1).value()).getStruct("source").getString("snapshot")).isEqualTo("true");