From fbfee538b84056585fcaea794aa24d1e8e4282e7 Mon Sep 17 00:00:00 2001 From: yevhenii-lopatenko Date: Sun, 3 Nov 2024 23:33:05 +0100 Subject: [PATCH] fix data loss for RabbitMqStreamNativeChangeConsumer.java --- .../RabbitMqStreamNativeChangeConsumer.java | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java b/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java index e0bc18f2..70de2942 100644 --- a/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java +++ b/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamNativeChangeConsumer.java @@ -10,6 +10,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -38,7 +41,6 @@ * Implementation of the consumer that delivers the messages into RabbitMQ Stream destination. * * @author Olivier Boudet - * */ @Named("rabbitmqstream") @Dependent @@ -200,6 +202,9 @@ void close() { public void handleBatch(List> records, RecordCommitter> committer) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(records.size()); + AtomicBoolean hasError = new AtomicBoolean(false); + for (ChangeEvent record : records) { LOGGER.trace("Received event '{}'", record); @@ -230,17 +235,41 @@ public void handleBatch(List> records, RecordCommitt producer.send( producer.messageBuilder().addData(getBytes(value)).build(), confirmationStatus -> { + try { + if (confirmationStatus.isConfirmed()) { + committer.markProcessed(record); + } + else { + LOGGER.error("Failed to confirm message delivery for event '{}'", record); + hasError.set(true); + } + } + catch (Exception e) { + LOGGER.error("Failed to process record '{}': {}", record, e.getMessage()); + hasError.set(true); + } + finally { + latch.countDown(); + } }); } catch (StreamException e) { throw new DebeziumException(e); } + } - committer.markProcessed(record); + if (!latch.await(producerConfirmTimeout, TimeUnit.SECONDS)) { + LOGGER.warn("Timeout while waiting for batch confirmation"); + hasError.set(true); } - LOGGER.trace("Sent messages"); - committer.markBatchFinished(); + if (!hasError.get()) { + LOGGER.trace("All messages sent successfully"); + committer.markBatchFinished(); + } + else { + LOGGER.error("Batch processing was incomplete due to record processing errors."); + } } private Map convertRabbitMqHeaders(ChangeEvent record) {