diff --git a/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java b/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java index 2a370ffa..cd5b4ca5 100644 --- a/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java +++ b/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java @@ -19,6 +19,7 @@ import jakarta.enterprise.context.Dependent; import jakarta.inject.Named; +import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -73,6 +74,9 @@ public interface ProducerBuilder { @ConfigProperty(name = PROP_PREFIX + "timeout", defaultValue = "0") Integer timeout; + @ConfigProperty(name = PROP_PRODUCER_PREFIX + "batcherBuilder", defaultValue = "DEFAULT") + String batcherBuilderConfig; + @PostConstruct void connect() { final Config config = ConfigProvider.getConfig(); @@ -112,12 +116,14 @@ private Producer createProducer(String topicName, Object value) { return pulsarClient.newProducer(Schema.STRING) .loadConf(producerConfig) .topic(topicFullName) + .batcherBuilder(getBatcherBuilder(batcherBuilderConfig)) .create(); } else { return pulsarClient.newProducer() .loadConf(producerConfig) .topic(topicFullName) + .batcherBuilder(getBatcherBuilder(batcherBuilderConfig)) .create(); } } @@ -126,6 +132,16 @@ private Producer createProducer(String topicName, Object value) { } } + private BatcherBuilder getBatcherBuilder(String configValue) { + switch (configValue) { + case "KEY_BASED": + return BatcherBuilder.KEY_BASED; + case "DEFAULT": + default: + return BatcherBuilder.DEFAULT; + } + } + @SuppressWarnings("unchecked") @Override public void handleBatch(List> records, RecordCommitter> committer)