diff --git a/debezium-server-nats-jetstream/src/main/java/io/debezium/server/nats/jetstream/NatsJetStreamChangeConsumer.java b/debezium-server-nats-jetstream/src/main/java/io/debezium/server/nats/jetstream/NatsJetStreamChangeConsumer.java index ec30d9a0..bb70202a 100644 --- a/debezium-server-nats-jetstream/src/main/java/io/debezium/server/nats/jetstream/NatsJetStreamChangeConsumer.java +++ b/debezium-server-nats-jetstream/src/main/java/io/debezium/server/nats/jetstream/NatsJetStreamChangeConsumer.java @@ -55,6 +55,8 @@ public class NatsJetStreamChangeConsumer extends BaseChangeConsumer private static final String PROP_AUTH_JWT = PROP_PREFIX + "auth.jwt"; private static final String PROP_AUTH_SEED = PROP_PREFIX + "auth.seed"; + private static final String PROP_AUTH_USER = PROP_PREFIX + "auth.user"; + private static final String PROP_AUTH_PASSWORD = PROP_PREFIX + "auth.password"; private Connection nc; private JetStream js; @@ -68,6 +70,12 @@ public class NatsJetStreamChangeConsumer extends BaseChangeConsumer @ConfigProperty(name = PROP_AUTH_SEED) Optional seed; + @ConfigProperty(name = PROP_AUTH_USER) + Optional user; + + @ConfigProperty(name = PROP_AUTH_PASSWORD) + Optional password; + @Inject @CustomConsumerBuilder Instance customStreamingConnection; @@ -94,6 +102,9 @@ void connect() { natsOptionsBuilder .authHandler(Nats.staticCredentials(jwt.get().toCharArray(), seed.get().toCharArray())); } + else if (user.isPresent() && password.isPresent()) { + natsOptionsBuilder.userInfo(user.get(), password.get()); + } nc = Nats.connect(natsOptionsBuilder.build());