Skip to content

Commit

Permalink
DBZ-7032 Use Duration type
Browse files Browse the repository at this point in the history
  • Loading branch information
jpechane committed Oct 30, 2023
1 parent 860fb62 commit 3783994
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
import java.util.List;
import java.util.Optional;

import io.debezium.DebeziumException;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
Expand All @@ -26,19 +23,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;

import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.core.exception.SdkClientException;

/**
* Implementation of the consumer that delivers the messages into Amazon Kinesis destination.
Expand All @@ -61,7 +61,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu
private Optional<String> endpointOverride;
private Optional<String> credentialsProfile;
private static final int DEFAULT_RETRIES = 5;
private static final Long RETRY_INTERVAL = Integer.toUnsignedLong(1_000);
private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1);

@ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default")
String nullKey;
Expand Down Expand Up @@ -115,7 +115,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
if (attempts >= DEFAULT_RETRIES) {
throw new DebeziumException("Exceeded maximum number of attempts to publish event " + record);
}
Metronome.sleeper(Duration.ofMillis(RETRY_INTERVAL), Clock.SYSTEM).pause();
Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause();
}
committer.markProcessed(record);
}
Expand Down

0 comments on commit 3783994

Please sign in to comment.