diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index c3a8acc3..1f714a5d 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -6,9 +6,14 @@ package io.debezium.server.kinesis; import java.net.URI; +import java.time.Duration; import java.util.List; import java.util.Optional; +import io.debezium.DebeziumException; +import io.debezium.annotation.VisibleForTesting; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.enterprise.context.Dependent; @@ -34,6 +39,7 @@ import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.core.exception.SdkClientException; /** * Implementation of the consumer that delivers the messages into Amazon Kinesis destination. @@ -56,6 +62,11 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu private Optional endpointOverride; private Optional credentialsProfile; + private static int retries; + private static Duration retryInterval; + private static final int DEFAULT_RETRIES = 5; + private static final Long RETRY_INTERVAL = Integer.toUnsignedLong(1_000); // Default to 1s + @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") String nullKey; @@ -86,6 +97,12 @@ void connect() { LOGGER.info("Using default KinesisClient '{}'", client); } + @VisibleForTesting + void initWithConfig(Config config) { + retries = DEFAULT_RETRIES; + retryInterval = Duration.ofMillis(RETRY_INTERVAL); + } + @PreDestroy void close() { try { @@ -101,19 +118,39 @@ public void handleBatch(List> records, RecordCommitt throws InterruptedException { for (ChangeEvent record : records) { LOGGER.trace("Received event '{}'", record); + + if (record.value() != null) { + int attempts = 0; + if (!recordSent(record)) { + attempts++; + if (attempts >= retries) { + throw new DebeziumException("Exceeded maximum number of attempts to publish event " + record); + } + Metronome.sleeper(retryInterval, Clock.SYSTEM).pause(); + } + committer.markProcessed(record); + } + } + committer.markBatchFinished(); + } + + private boolean recordSent(ChangeEvent record) throws InterruptedException { + boolean sent = false; + try { Object rv = record.value(); if (rv == null) { rv = ""; } - final PutRecordRequest putRecord = PutRecordRequest.builder() .partitionKey((record.key() != null) ? getString(record.key()) : nullKey) .streamName(streamNameMapper.map(record.destination())) .data(SdkBytes.fromByteArray(getBytes(rv))) .build(); client.putRecord(putRecord); - committer.markProcessed(record); + } catch (SdkClientException exception) { + LOGGER.error("Failed to send record to {}:", record.destination(), exception); + throw new DebeziumException(exception); } - committer.markBatchFinished(); + return sent; } }