Skip to content

Commit

Permalink
Add batch confirmation timeout config and exception handling for Rabb…
Browse files Browse the repository at this point in the history
…itMqStreamNativeChangeConsumer.handleBatch
  • Loading branch information
yevhenii-lopatenko authored and jpechane committed Nov 12, 2024
1 parent fbfee53 commit af0d8c3
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ public class RabbitMqStreamNativeChangeConsumer extends BaseChangeConsumer imple
@ConfigProperty(name = PROP_PREFIX + "producer.enqueueTimeout", defaultValue = "10")
int producerEnqueueTimeout;

@ConfigProperty(name = PROP_PREFIX + "batchConfirmTimeout", defaultValue = "30")
int batchConfirmTimeout;

@ConfigProperty(name = PROP_PREFIX + "null.value", defaultValue = "default")
String nullValue;

Expand Down Expand Up @@ -258,7 +261,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
}
}

if (!latch.await(producerConfirmTimeout, TimeUnit.SECONDS)) {
if (!latch.await(batchConfirmTimeout, TimeUnit.SECONDS)) {
LOGGER.warn("Timeout while waiting for batch confirmation");
hasError.set(true);
}
Expand All @@ -268,7 +271,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
committer.markBatchFinished();
}
else {
LOGGER.error("Batch processing was incomplete due to record processing errors.");
throw new DebeziumException("Batch processing was incomplete due to record processing errors.");
}
}

Expand Down

0 comments on commit af0d8c3

Please sign in to comment.