From e4be885f2cc2407447d70b97b7f9e8bbcb08ab1b Mon Sep 17 00:00:00 2001 From: martin Date: Tue, 18 Jun 2024 16:55:37 +0100 Subject: [PATCH] first stab at snapshotting --- .../As400ChangeEventSourceFactory.java | 30 +++++++++++++++++++ .../db2as400/As400OffsetContext.java | 13 ++++++-- .../As400StreamingChangeEventSource.java | 13 ++++++++ .../db2/journal/retrieve/RetrieveJournal.java | 2 +- .../AS400JDBCConnectionForcedCcsid.java | 2 -- 5 files changed, 55 insertions(+), 5 deletions(-) diff --git a/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400ChangeEventSourceFactory.java b/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400ChangeEventSourceFactory.java index 0009227..f1b63f3 100644 --- a/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400ChangeEventSourceFactory.java +++ b/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400ChangeEventSourceFactory.java @@ -5,17 +5,24 @@ */ package io.debezium.connector.db2as400; +import java.util.Optional; + import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource; +import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource; import io.debezium.pipeline.source.spi.ChangeEventSourceFactory; +import io.debezium.pipeline.source.spi.DataChangeEventListener; import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; import io.debezium.snapshot.SnapshotterService; +import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Clock; +import io.debezium.util.Strings; public class As400ChangeEventSourceFactory implements ChangeEventSourceFactory { @@ -59,4 +66,27 @@ public StreamingChangeEventSource getStreami return new As400StreamingChangeEventSource(configuration, rpcConnection, jdbcConnectionFactory.mainConnection(), dispatcher, errorHandler, clock, schema); } + + @Override + public Optional> getIncrementalSnapshotChangeEventSource(As400OffsetContext offsetContext, + SnapshotProgressListener snapshotProgressListener, + DataChangeEventListener dataChangeEventListener, + NotificationService notificationService) { + // If no data collection id is provided, don't return an instance as the implementation requires + // that a signal data collection id be provided to work. + if (Strings.isNullOrEmpty(configuration.getSignalingDataCollectionId())) { + return Optional.empty(); + } + final SignalBasedIncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<>( + configuration, + jdbcConnectionFactory.mainConnection(), + dispatcher, + schema, + clock, + snapshotProgressListener, + dataChangeEventListener, + notificationService); + return Optional.of(incrementalSnapshotChangeEventSource); + } + } diff --git a/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400OffsetContext.java b/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400OffsetContext.java index 71b0999..1cb39b1 100644 --- a/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400OffsetContext.java +++ b/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400OffsetContext.java @@ -20,9 +20,12 @@ import io.debezium.connector.SnapshotRecord; import io.debezium.ibmi.db2.journal.retrieve.JournalProcessedPosition; import io.debezium.ibmi.db2.journal.retrieve.JournalReceiver; +import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; +import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.TableId; import io.debezium.spi.schema.DataCollectionId; public class As400OffsetContext implements OffsetContext { @@ -50,6 +53,7 @@ public class As400OffsetContext implements OffsetContext { private final String inclueTables; private boolean hasNewTables = false; private volatile boolean snapshotComplete = false; + private final IncrementalSnapshotContext incrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext<>(); public As400OffsetContext(As400ConnectorConfig connectorConfig) { super(); @@ -70,7 +74,7 @@ public As400OffsetContext(As400ConnectorConfig connectorConfig, JournalProcessed } public As400OffsetContext(As400ConnectorConfig connectorConfig, JournalProcessedPosition position, String includeTables, - boolean snapshotComplete) { + boolean snapshotComplete, IncrementalSnapshotContext incrementalSnapshotContext) { super(); partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); this.position = position; @@ -209,7 +213,7 @@ public As400OffsetContext load(Map map) { Instant time = (TimeStr == null) ? Instant.ofEpochSecond(0) : Instant.ofEpochSecond(Long.parseLong(TimeStr)); position = new JournalProcessedPosition(offset, new JournalReceiver(receiver, receiverLibrary), time, processed); } - return new As400OffsetContext(connectorConfig, position, inclueTables, snapshotComplete); + return new As400OffsetContext(connectorConfig, position, inclueTables, snapshotComplete, SignalBasedIncrementalSnapshotContext.load(map)); } } @@ -230,4 +234,9 @@ public String toString() { public void markSnapshotRecord(SnapshotRecord record) { sourceInfo.setSnapshot(record); } + + @Override + public IncrementalSnapshotContext getIncrementalSnapshotContext() { + return incrementalSnapshotContext; + } } diff --git a/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400StreamingChangeEventSource.java b/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400StreamingChangeEventSource.java index 0ed7bdb..f193b38 100644 --- a/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400StreamingChangeEventSource.java +++ b/debezium-connector-ibmi/src/main/java/io/debezium/connector/db2as400/As400StreamingChangeEventSource.java @@ -21,6 +21,7 @@ import io.debezium.DebeziumException; import io.debezium.connector.db2as400.As400RpcConnection.BlockingReceiverConsumer; +import io.debezium.connector.db2as400.As400RpcConnection.RpcException; import io.debezium.data.Envelope.Operation; import io.debezium.ibmi.db2.journal.retrieve.JournalEntryType; import io.debezium.ibmi.db2.journal.retrieve.JournalProcessedPosition; @@ -336,4 +337,16 @@ private BlockingReceiverConsumer processJournalEntries(As400Partition partition, private boolean ignore(JournalEntryType journalCode) { return journalCode == JournalEntryType.OPEN || journalCode == JournalEntryType.CLOSE; } + + @Override + public As400OffsetContext getOffsetContext() { + // TODO should this be the last process position? + try { + return new As400OffsetContext(connectorConfig, new JournalProcessedPosition(dataConnection.getCurrentPosition(), null, true)); + } + catch (RpcException e) { + log.error("failed to retrieve journal position", e); + } + return null; + } } diff --git a/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java b/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java index 4462c80..bbe6a65 100644 --- a/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java +++ b/journal-parsing/src/main/java/io/debezium/ibmi/db2/journal/retrieve/RetrieveJournal.java @@ -99,7 +99,7 @@ public boolean retrieveJournal(JournalProcessedPosition previousPosition, final this.header = new FirstHeader(0, 0, 0, OffsetStatus.NOT_CALLED, new JournalProcessedPosition(range.end(), Instant.EPOCH, true)); - log.debug("start equals end - range {}", range); + log.trace("start equals end - range {}", range); return true; } diff --git a/jt400-override-ccsid/src/main/java/com/ibm/as400/access/AS400JDBCConnectionForcedCcsid.java b/jt400-override-ccsid/src/main/java/com/ibm/as400/access/AS400JDBCConnectionForcedCcsid.java index 3096a39..1e8a305 100644 --- a/jt400-override-ccsid/src/main/java/com/ibm/as400/access/AS400JDBCConnectionForcedCcsid.java +++ b/jt400-override-ccsid/src/main/java/com/ibm/as400/access/AS400JDBCConnectionForcedCcsid.java @@ -23,8 +23,6 @@ public AS400JDBCConnectionForcedCcsid() { @Override public ConvTable getConverter(int ccsid) throws SQLException { if (this.fromCcsid != null && fromCcsid.intValue() == ccsid && toCcsid != null) { - log.fine(() -> String.format("requested ccsid %d using replacement ccsid %d\n\t%s", ccsid, toCcsid, - getStack())); return super.getConverter(this.toCcsid); } log.fine(() -> String.format("requested ccsid %d using parent converter\n\t%s", ccsid, getStack()));