Skip to content

Commit

Permalink
fix data loss for RabbitMqStreamNativeChangeConsumer.java
Browse files Browse the repository at this point in the history
  • Loading branch information
yevhenii-lopatenko authored and jpechane committed Nov 12, 2024
1 parent 6dba97b commit fbfee53
Showing 1 changed file with 33 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
Expand Down Expand Up @@ -38,7 +41,6 @@
* Implementation of the consumer that delivers the messages into RabbitMQ Stream destination.
*
* @author Olivier Boudet
*
*/
@Named("rabbitmqstream")
@Dependent
Expand Down Expand Up @@ -200,6 +202,9 @@ void close() {
public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {

CountDownLatch latch = new CountDownLatch(records.size());
AtomicBoolean hasError = new AtomicBoolean(false);

for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Received event '{}'", record);

Expand Down Expand Up @@ -230,17 +235,41 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
producer.send(
producer.messageBuilder().addData(getBytes(value)).build(),
confirmationStatus -> {
try {
if (confirmationStatus.isConfirmed()) {
committer.markProcessed(record);
}
else {
LOGGER.error("Failed to confirm message delivery for event '{}'", record);
hasError.set(true);
}
}
catch (Exception e) {
LOGGER.error("Failed to process record '{}': {}", record, e.getMessage());
hasError.set(true);
}
finally {
latch.countDown();
}
});
}
catch (StreamException e) {
throw new DebeziumException(e);
}
}

committer.markProcessed(record);
if (!latch.await(producerConfirmTimeout, TimeUnit.SECONDS)) {
LOGGER.warn("Timeout while waiting for batch confirmation");
hasError.set(true);
}

LOGGER.trace("Sent messages");
committer.markBatchFinished();
if (!hasError.get()) {
LOGGER.trace("All messages sent successfully");
committer.markBatchFinished();
}
else {
LOGGER.error("Batch processing was incomplete due to record processing errors.");
}
}

private Map<String, Object> convertRabbitMqHeaders(ChangeEvent<Object, Object> record) {
Expand Down

0 comments on commit fbfee53

Please sign in to comment.