Skip to content

Commit

Permalink
DBZ-7032 Implement Retries for Amazon Kinesis Event Transmittion
Browse files Browse the repository at this point in the history
Add spaces on the conditions
  • Loading branch information
ilyasahsan123 committed Oct 13, 2023
1 parent 128fd02 commit b610ffa
Showing 1 changed file with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@
package io.debezium.server.kinesis;

import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Optional;

import io.debezium.DebeziumException;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
Expand All @@ -34,6 +39,7 @@
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.core.exception.SdkClientException;

/**
* Implementation of the consumer that delivers the messages into Amazon Kinesis destination.
Expand All @@ -56,6 +62,11 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu
private Optional<String> endpointOverride;
private Optional<String> credentialsProfile;

private static int retries;
private static Duration retryInterval;
private static final int DEFAULT_RETRIES = 5;
private static final Long RETRY_INTERVAL = Integer.toUnsignedLong(1_000); // Default to 1s

@ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default")
String nullKey;

Expand Down Expand Up @@ -86,6 +97,12 @@ void connect() {
LOGGER.info("Using default KinesisClient '{}'", client);
}

@VisibleForTesting
void initWithConfig(Config config) {
retries = DEFAULT_RETRIES;
retryInterval = Duration.ofMillis(RETRY_INTERVAL);
}

@PreDestroy
void close() {
try {
Expand All @@ -101,19 +118,39 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
throws InterruptedException {
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Received event '{}'", record);

if (record.value() != null) {
int attempts = 0;
if (!recordSent(record)) {
attempts++;
if (attempts >= retries) {
throw new DebeziumException("Exceeded maximum number of attempts to publish event " + record);
}
Metronome.sleeper(retryInterval, Clock.SYSTEM).pause();
}
committer.markProcessed(record);
}
}
committer.markBatchFinished();
}

private boolean recordSent(ChangeEvent<Object, Object> record) throws InterruptedException {
boolean sent = false;
try {
Object rv = record.value();
if (rv == null) {
rv = "";
}

final PutRecordRequest putRecord = PutRecordRequest.builder()
.partitionKey((record.key() != null) ? getString(record.key()) : nullKey)
.streamName(streamNameMapper.map(record.destination()))
.data(SdkBytes.fromByteArray(getBytes(rv)))
.build();
client.putRecord(putRecord);
committer.markProcessed(record);
} catch (SdkClientException exception) {
LOGGER.error("Failed to send record to {}:", record.destination(), exception);
throw new DebeziumException(exception);
}
committer.markBatchFinished();
return sent;
}
}

0 comments on commit b610ffa

Please sign in to comment.