Skip to content

Commit

Permalink
add endpoint override support
Browse files Browse the repository at this point in the history
  • Loading branch information
koneru9999 committed Dec 4, 2023
1 parent e71a359 commit ea03bde
Showing 1 changed file with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.server.sqs;

import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -51,10 +52,12 @@ public class SqsChangeConsumer extends BaseChangeConsumer implements ChangeConsu

protected static final String PROP_PREFIX = "debezium.sink.sqs.";
protected static final String PROP_REGION_NAME = PROP_PREFIX + "region";
private static final String PROP_ENDPOINT_NAME = PROP_PREFIX + "endpoint";
private static final String PROP_QUEUE_URL = PROP_PREFIX + "queue.url";
private static final String PROP_CREDENTIALS_PROFILE = PROP_PREFIX + "credentials.profile";

private String region;
private Optional<String> endpointOverride;
private String queueUrl;

private Boolean isFifoQueue;
Expand Down Expand Up @@ -84,10 +87,12 @@ void connect() {

final Config config = ConfigProvider.getConfig();
region = config.getValue(PROP_REGION_NAME, String.class);
endpointOverride = config.getOptionalValue(PROP_ENDPOINT_NAME, String.class);
queueUrl = config.getValue(PROP_QUEUE_URL, String.class);
isFifoQueue = queueUrl.endsWith(".fifo");
final SqsClientBuilder builder = SqsClient.builder()
.region(Region.of(region));
endpointOverride.ifPresent(endpoint -> builder.endpointOverride(URI.create(endpoint)));
credentialsProfile.ifPresent(profile -> builder.credentialsProvider(ProfileCredentialsProvider.create(profile)));

client = builder.build();
Expand Down

0 comments on commit ea03bde

Please sign in to comment.