From 7ff22d0be4b340aa30939ba24cdbb402ddeb6890 Mon Sep 17 00:00:00 2001 From: nathan-smit-1 Date: Wed, 25 Sep 2024 12:41:17 +0200 Subject: [PATCH] DBZ-8236 Adjusted committer.markProcessed(record) to only execute after confirmed receipt of published messages by Pubsub --- .../io/debezium/server/pubsub/PubSubChangeConsumer.java | 8 ++++++-- .../debezium/server/pubsub/PubSubLiteChangeConsumer.java | 7 ++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java index 33d786a9..7dd1a67d 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java @@ -265,8 +265,6 @@ public void handleBatch(List> records, RecordCommitt PubsubMessage message = buildPubSubMessage(record); deliveries.add(publisher.publish(message)); - - committer.markProcessed(record); } List messageIds; try { @@ -276,6 +274,12 @@ public void handleBatch(List> records, RecordCommitt throw new DebeziumException(e); } LOGGER.trace("Sent messages with ids: {}", messageIds); + + // Once publishing is confirmed, mark all records as processed + for (ChangeEvent record : records) { + committer.markProcessed(record); + } + committer.markBatchFinished(); } diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java index 6a07a23c..86798bdf 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubLiteChangeConsumer.java @@ -132,7 +132,6 @@ public void handleBatch(List> records, RecordCommitt PubsubMessage message = buildPubSubMessage(record); deliveries.add(publisher.publish(message)); - committer.markProcessed(record); } List messageIds; try { @@ -142,6 +141,12 @@ public void handleBatch(List> records, RecordCommitt throw new DebeziumException(e); } LOGGER.trace("Sent messages with ids: {}", messageIds); + + // Once publishing is confirmed, mark all records as processed + for (ChangeEvent record : records) { + committer.markProcessed(record); + } + committer.markBatchFinished(); }