diff --git a/.gitignore b/.gitignore
index 88ca35d2..a4125900 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,3 +47,5 @@ charts/package/
# Ignore the templates.zip file created in main and test resources
src/main/resources/static/templates.zip
src/test/resources/static/templates.zip
+
+.cp-demo
diff --git a/.semaphore/multi-arch-builds-and-upload.yml b/.semaphore/multi-arch-builds-and-upload.yml
index ed36641c..8b361dc3 100644
--- a/.semaphore/multi-arch-builds-and-upload.yml
+++ b/.semaphore/multi-arch-builds-and-upload.yml
@@ -13,11 +13,13 @@ global_job_config:
- checkout
- make ci-bin-sem-cache-restore
- make docker-login-ci
+ - make load-cached-docker-images
epilogue:
always:
commands:
- - make ci-bin-sem-cache-store
- make store-test-results-to-semaphore
+ - make ci-bin-sem-cache-store
+ - make cache-docker-images
blocks:
- name: "Build Native Executable (MacOS AMD64)"
diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml
index 88e1f6c2..0bde9ff3 100644
--- a/.semaphore/semaphore.yml
+++ b/.semaphore/semaphore.yml
@@ -27,11 +27,13 @@ global_job_config:
- checkout
- make ci-bin-sem-cache-restore
- make docker-login-ci
+ - make load-cached-docker-images
epilogue:
always:
commands:
- - make ci-bin-sem-cache-store
- make store-test-results-to-semaphore
+ - make ci-bin-sem-cache-store
+ - make cache-docker-images
blocks:
- name: "Build JARs and Unit Test"
diff --git a/Makefile b/Makefile
index 7440c6c6..e0b1fc94 100644
--- a/Makefile
+++ b/Makefile
@@ -95,3 +95,93 @@ upload-artifacts-to-github-release:
.PHONY: collect-notices-binary
collect-notices-binary: clean mvn-package-native-sources-only
$(IDE_SIDECAR_SCRIPTS)/collect-notices-binary.sh target/native-sources/lib
+
+# Targets for managing cp-demo testcontainers used by the integration tests
+
+# Start the cp-demo testcontainers
+# Note: You do not need to run this in order to run the integration tests, however, if you want
+# to manually bring up the cp-demo environment, you may run this target. You will be
+# able to run the integration tests against the same environment, please keep that in mind!
+.PHONY: cp-demo-start
+cp-demo-start:
+ export TESTCONTAINERS_RYUK_DISABLED=true; \
+ ./mvnw -s .mvn/settings.xml \
+ -Dexec.mainClass=io.confluent.idesidecar.restapi.util.CPDemoTestEnvironment \
+ -Dexec.classpathScope=test \
+ test-compile exec:java
+
+# Stop the cp-demo testcontainers
+.PHONY: cp-demo-stop
+cp-demo-stop:
+ ./mvnw -s .mvn/settings.xml test-compile && \
+ ./mvnw -s .mvn/settings.xml \
+ -Dexec.mainClass=io.confluent.idesidecar.restapi.util.CPDemoTestEnvironment \
+ -Dexec.classpathScope=test \
+ -Dexec.args=stop \
+ exec:java
+
+
+CONFLUENT_DOCKER_TAG := $(shell yq e '.ide-sidecar.integration-tests.cp-demo.tag' src/main/resources/application.yml | sed 's/^v//')
+# See io.confluent.idesidecar.restapi.util.ConfluentLocalKafkaWithRestProxyContainer
+CONFLUENT_LOCAL_DOCKER_TAG := 7.6.0
+# See io.confluent.idesidecar.restapi.util.cpdemo.OpenldapContainer
+OSIXIA_OPENLDAP_DOCKER_TAG := 1.3.0
+# See io.confluent.idesidecar.restapi.util.cpdemo.ToolsContainer
+CNFLDEMOS_TOOLS_DOCKER_TAG := 0.3
+
+# Key for storing docker images in Semaphore CI cache
+SEMAPHORE_CP_ZOOKEEPER_DOCKER := ide-sidecar-docker-cp-zookeeper-$(CONFLUENT_DOCKER_TAG)
+SEMAPHORE_CP_SERVER_DOCKER := ide-sidecar-docker-cp-server-$(CONFLUENT_DOCKER_TAG)
+SEMAPHORE_OPENLDAP_DOCKER := ide-sidecar-docker-openldap-$(OSIXIA_OPENLDAP_DOCKER_TAG)
+SEMAPHORE_CNFLDEMOS_TOOLS_DOCKER := ide-sidecar-docker-cnfldemos-tools-$(CNFLDEMOS_TOOLS_DOCKER_TAG)
+SEMAPHORE_CONFLUENT_LOCAL_DOCKER := ide-sidecar-docker-confluent-local-$(CONFLUENT_LOCAL_DOCKER_TAG)
+
+## Cache docker images in Semaphore cache.
+.PHONY: cache-docker-images
+cache-docker-images:
+ cache has_key $(SEMAPHORE_CP_ZOOKEEPER_DOCKER) || (\
+ docker pull confluentinc/cp-zookeeper:$(CONFLUENT_DOCKER_TAG) && \
+ docker save confluentinc/cp-zookeeper:$(CONFLUENT_DOCKER_TAG) | gzip > cp-zookeeper.tgz && \
+ cache store $(SEMAPHORE_CP_ZOOKEEPER_DOCKER) cp-zookeeper.tgz && \
+ rm -rf cp-zookeeper.tgz)
+
+ cache has_key $(SEMAPHORE_CP_SERVER_DOCKER) || (\
+ docker pull confluentinc/cp-server:$(CONFLUENT_DOCKER_TAG) && \
+ docker save confluentinc/cp-server:$(CONFLUENT_DOCKER_TAG) | gzip > cp-server.tgz && \
+ cache store $(SEMAPHORE_CP_SERVER_DOCKER) cp-server.tgz && \
+ rm -rf cp-server.tgz)
+
+ cache has_key $(SEMAPHORE_OPENLDAP_DOCKER) || (\
+ docker pull osixia/openldap:$(OSIXIA_OPENLDAP_DOCKER_TAG) && \
+ docker save osixia/openldap:$(OSIXIA_OPENLDAP_DOCKER_TAG) | gzip > openldap.tgz && \
+ cache store $(SEMAPHORE_OPENLDAP_DOCKER) openldap.tgz && \
+ rm -rf openldap.tgz)
+
+ cache has_key $(SEMAPHORE_CNFLDEMOS_TOOLS_DOCKER) || (\
+ docker pull cnfldemos/tools:$(CNFLDEMOS_TOOLS_DOCKER_TAG) && \
+ docker save cnfldemos/tools:$(CNFLDEMOS_TOOLS_DOCKER_TAG) | gzip > cnfdemos-tools.tgz && \
+ cache store $(SEMAPHORE_CNFLDEMOS_TOOLS_DOCKER) cnfdemos-tools.tgz && \
+ rm -rf cnfdemos-tools.tgz)
+
+ cache has_key $(SEMAPHORE_CONFLUENT_LOCAL_DOCKER) || (\
+	docker pull confluentinc/confluent-local:$(CONFLUENT_LOCAL_DOCKER_TAG) && \
+	docker save confluentinc/confluent-local:$(CONFLUENT_LOCAL_DOCKER_TAG) | gzip > cp-local.tgz && \
+ cache store $(SEMAPHORE_CONFLUENT_LOCAL_DOCKER) cp-local.tgz && \
+ rm -rf cp-local.tgz)
+
+.PHONY: load-cached-docker-images
+load-cached-docker-images:
+	cache restore $(SEMAPHORE_CP_ZOOKEEPER_DOCKER)
+ [ -f cp-zookeeper.tgz ] && docker load -i cp-zookeeper.tgz && rm -rf cp-zookeeper.tgz || true
+
+	cache restore $(SEMAPHORE_CP_SERVER_DOCKER)
+ [ -f cp-server.tgz ] && docker load -i cp-server.tgz && rm -rf cp-server.tgz || true
+
+	cache restore $(SEMAPHORE_OPENLDAP_DOCKER)
+ [ -f openldap.tgz ] && docker load -i openldap.tgz && rm -rf openldap.tgz || true
+
+	cache restore $(SEMAPHORE_CNFLDEMOS_TOOLS_DOCKER)
+ [ -f cnfdemos-tools.tgz ] && docker load -i cnfdemos-tools.tgz && rm -rf cnfdemos-tools.tgz || true
+
+	cache restore $(SEMAPHORE_CONFLUENT_LOCAL_DOCKER)
+ [ -f cp-local.tgz ] && docker load -i cp-local.tgz && rm -rf cp-local.tgz || true
diff --git a/pom.xml b/pom.xml
index 092cfd14..aab4d63f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -413,7 +413,7 @@
integration-test
verify
-
+
true
diff --git a/src/generated/resources/openapi.json b/src/generated/resources/openapi.json
index 284b70e3..f006f27c 100644
--- a/src/generated/resources/openapi.json
+++ b/src/generated/resources/openapi.json
@@ -659,7 +659,7 @@
"components" : {
"schemas" : {
"ApiKeyAndSecret" : {
- "description" : "Basic authentication credentials",
+ "description" : "API key and secret authentication credentials",
"required" : [ "api_key", "api_secret" ],
"type" : "object",
"properties" : {
@@ -1121,19 +1121,17 @@
"$ref" : "#/components/schemas/BasicCredentials"
}, {
"$ref" : "#/components/schemas/ApiKeyAndSecret"
+ }, {
+ "$ref" : "#/components/schemas/OAuthCredentials"
} ],
"nullable" : true
},
"ssl" : {
- "description" : "Whether to communicate with the Kafka cluster over TLS/SSL. Defaults to 'true', but set to 'false' when the Kafka cluster does not support TLS/SSL.",
- "default" : true,
- "type" : "boolean",
- "nullable" : true
- },
- "verify_ssl_certificates" : {
- "description" : "Whether to verify the Kafka cluster certificates. Defaults to 'true', but set to 'false' when the Kafka cluster has self-signed certificates.",
- "default" : true,
- "type" : "boolean",
+ "description" : "The SSL configuration for connecting to the Kafka cluster. To disable, set `enabled` to false. To use the default SSL settings, set `enabled` to true and leave the `truststore` and `keystore` fields unset.",
+ "type" : "object",
+ "allOf" : [ {
+ "$ref" : "#/components/schemas/TLSConfig"
+ } ],
"nullable" : true
}
}
@@ -1166,6 +1164,42 @@
}
}
},
+ "KeyStore" : {
+ "required" : [ "path" ],
+ "type" : "object",
+ "properties" : {
+ "path" : {
+ "description" : "The path to the local key store file. Only specified if client needs to be authenticated by the server (mutual TLS).",
+ "maxLength" : 256,
+ "type" : "string"
+ },
+ "password" : {
+ "description" : "The password for the local key store file. If a password is not set, key store file configured will still be used, but integrity checking is disabled. A key store password is not supported for PEM format.",
+ "type" : "string",
+ "allOf" : [ {
+ "$ref" : "#/components/schemas/Password"
+ } ],
+ "nullable" : true
+ },
+ "type" : {
+ "description" : "The file format of the local key store file.",
+ "default" : "JKS",
+ "type" : "string",
+ "allOf" : [ {
+ "$ref" : "#/components/schemas/StoreType"
+ } ],
+ "nullable" : true
+ },
+ "key_password" : {
+ "description" : "The password of the private key in the local key store file.",
+ "type" : "string",
+ "allOf" : [ {
+ "$ref" : "#/components/schemas/Password"
+ } ],
+ "nullable" : true
+ }
+ }
+ },
"LocalConfig" : {
"description" : "Configuration when using Confluent Local and optionally a local Schema Registry.",
"type" : "object",
@@ -1177,6 +1211,46 @@
}
}
},
+ "OAuthCredentials" : {
+ "description" : "OAuth 2.0 authentication credentials",
+ "required" : [ "tokens_url", "client_id" ],
+ "type" : "object",
+ "properties" : {
+ "tokens_url" : {
+ "description" : "The URL of the OAuth 2.0 identity provider's token endpoint.",
+ "maxLength" : 256,
+ "type" : "string"
+ },
+ "client_id" : {
+ "description" : "The public identifier for the application as registered with the OAuth 2.0 identity provider.",
+ "maxLength" : 128,
+ "minLength" : 1,
+ "type" : "string"
+ },
+ "client_secret" : {
+ "description" : "The client secret known only to the application and the OAuth 2.0 identity provider.",
+ "type" : "string",
+ "allOf" : [ {
+ "$ref" : "#/components/schemas/Password"
+ } ]
+ },
+ "scope" : {
+ "description" : "The scope to use. The scope is optional and required only when your identity provider doesn't have a default scope or your groups claim is linked to a scope path to use when connecting to the external service.",
+ "maxLength" : 256,
+ "type" : "string"
+ },
+ "connect_timeout_millis" : {
+ "format" : "int32",
+ "description" : "The timeout in milliseconds when connecting to your identity provider.",
+ "minimum" : 0,
+ "type" : "integer"
+ },
+ "identityPool" : {
+          "description" : "Additional property that can be added in the request header to identify the principal ID for authorization. For example, this may be a Confluent Cloud identity pool.",
+ "type" : "string"
+ }
+ }
+ },
"ObjectMetadata" : {
"type" : "object",
"properties" : {
@@ -1375,6 +1449,16 @@
"$ref" : "#/components/schemas/BasicCredentials"
}, {
"$ref" : "#/components/schemas/ApiKeyAndSecret"
+ }, {
+ "$ref" : "#/components/schemas/OAuthCredentials"
+ } ],
+ "nullable" : true
+ },
+ "ssl" : {
+ "description" : "The SSL configuration for connecting to Schema Registry. If null, the connection will use SSL with the default settings. To disable, set `enabled` to false.",
+ "type" : "object",
+ "allOf" : [ {
+ "$ref" : "#/components/schemas/TLSConfig"
} ],
"nullable" : true
}
@@ -1475,6 +1559,43 @@
"enum" : [ "NO_TOKEN", "VALID_TOKEN", "INVALID_TOKEN", "FAILED" ],
"type" : "string"
},
+ "StoreType" : {
+ "enum" : [ "JKS", "PKCS12", "PEM", "UNKNOWN" ],
+ "type" : "string"
+ },
+ "TLSConfig" : {
+ "description" : "SSL configuration",
+ "required" : [ "enabled" ],
+ "type" : "object",
+ "properties" : {
+ "verify_hostname" : {
+ "description" : "Whether to verify the server certificate hostname. Defaults to true if not set.",
+ "default" : true,
+ "type" : "boolean"
+ },
+ "enabled" : {
+ "description" : "Whether SSL is enabled. If not set, defaults to true.",
+ "default" : true,
+ "type" : "boolean"
+ },
+ "truststore" : {
+ "description" : "The trust store configuration for authenticating the server's certificate.",
+ "type" : "object",
+ "allOf" : [ {
+ "$ref" : "#/components/schemas/TrustStore"
+ } ],
+ "nullable" : true
+ },
+ "keystore" : {
+ "description" : "The key store configuration that will identify and authenticate the client to the server, required for mutual TLS (mTLS)",
+ "type" : "object",
+ "allOf" : [ {
+ "$ref" : "#/components/schemas/KeyStore"
+ } ],
+ "nullable" : true
+ }
+ }
+ },
"Template" : {
"required" : [ "api_version", "kind", "id", "metadata", "spec" ],
"type" : "object",
@@ -1557,6 +1678,34 @@
"enum" : [ "NO_TIMESTAMP_TYPE", "CREATE_TIME", "LOG_APPEND_TIME" ],
"type" : "string"
},
+ "TrustStore" : {
+ "required" : [ "path" ],
+ "type" : "object",
+ "properties" : {
+ "path" : {
+ "description" : "The path to the local trust store file. Required for authenticating the server's certificate.",
+ "maxLength" : 256,
+ "type" : "string"
+ },
+ "password" : {
+ "description" : "The password for the local trust store file. If a password is not set, trust store file configured will still be used, but integrity checking is disabled. A trust store password is not supported for PEM format.",
+ "type" : "string",
+ "allOf" : [ {
+ "$ref" : "#/components/schemas/Password"
+ } ],
+ "nullable" : true
+ },
+ "type" : {
+ "description" : "The file format of the local trust store file",
+ "default" : "JKS",
+ "type" : "string",
+ "allOf" : [ {
+ "$ref" : "#/components/schemas/StoreType"
+ } ],
+ "nullable" : true
+ }
+ }
+ },
"UserInfo" : {
"type" : "object",
"properties" : {
diff --git a/src/generated/resources/openapi.yaml b/src/generated/resources/openapi.yaml
index 85d0fed4..ed2b413d 100644
--- a/src/generated/resources/openapi.yaml
+++ b/src/generated/resources/openapi.yaml
@@ -445,7 +445,7 @@ paths:
components:
schemas:
ApiKeyAndSecret:
- description: Basic authentication credentials
+ description: API key and secret authentication credentials
required:
- api_key
- api_secret
@@ -814,20 +814,16 @@ components:
oneOf:
- $ref: "#/components/schemas/BasicCredentials"
- $ref: "#/components/schemas/ApiKeyAndSecret"
+ - $ref: "#/components/schemas/OAuthCredentials"
nullable: true
ssl:
- description: "Whether to communicate with the Kafka cluster over TLS/SSL.\
- \ Defaults to 'true', but set to 'false' when the Kafka cluster does not\
- \ support TLS/SSL."
- default: true
- type: boolean
- nullable: true
- verify_ssl_certificates:
- description: "Whether to verify the Kafka cluster certificates. Defaults\
- \ to 'true', but set to 'false' when the Kafka cluster has self-signed\
- \ certificates."
- default: true
- type: boolean
+ description: "The SSL configuration for connecting to the Kafka cluster.\
+ \ To disable, set `enabled` to false. To use the default SSL settings,\
+ \ set `enabled` to true and leave the `truststore` and `keystore` fields\
+ \ unset."
+ type: object
+ allOf:
+ - $ref: "#/components/schemas/TLSConfig"
nullable: true
KafkaClusterStatus:
description: The status related to the specified Kafka cluster.
@@ -850,6 +846,38 @@ components:
type: object
allOf:
- $ref: "#/components/schemas/AuthErrors"
+ KeyStore:
+ required:
+ - path
+ type: object
+ properties:
+ path:
+ description: The path to the local key store file. Only specified if client
+ needs to be authenticated by the server (mutual TLS).
+ maxLength: 256
+ type: string
+ password:
+ description: "The password for the local key store file. If a password is\
+ \ not set, key store file configured will still be used, but integrity\
+ \ checking is disabled. A key store password is not supported for PEM\
+ \ format."
+ type: string
+ allOf:
+ - $ref: "#/components/schemas/Password"
+ nullable: true
+ type:
+ description: The file format of the local key store file.
+ default: JKS
+ type: string
+ allOf:
+ - $ref: "#/components/schemas/StoreType"
+ nullable: true
+ key_password:
+ description: The password of the private key in the local key store file.
+ type: string
+ allOf:
+ - $ref: "#/components/schemas/Password"
+ nullable: true
LocalConfig:
description: Configuration when using Confluent Local and optionally a local
Schema Registry.
@@ -859,6 +887,46 @@ components:
description: The URL of the Schema Registry running locally.
maxLength: 512
type: string
+ OAuthCredentials:
+ description: OAuth 2.0 authentication credentials
+ required:
+ - tokens_url
+ - client_id
+ type: object
+ properties:
+ tokens_url:
+ description: The URL of the OAuth 2.0 identity provider's token endpoint.
+ maxLength: 256
+ type: string
+ client_id:
+ description: The public identifier for the application as registered with
+ the OAuth 2.0 identity provider.
+ maxLength: 128
+ minLength: 1
+ type: string
+ client_secret:
+ description: The client secret known only to the application and the OAuth
+ 2.0 identity provider.
+ type: string
+ allOf:
+ - $ref: "#/components/schemas/Password"
+ scope:
+ description: The scope to use. The scope is optional and required only when
+ your identity provider doesn't have a default scope or your groups claim
+ is linked to a scope path to use when connecting to the external service.
+ maxLength: 256
+ type: string
+ connect_timeout_millis:
+ format: int32
+ description: The timeout in milliseconds when connecting to your identity
+ provider.
+ minimum: 0
+ type: integer
+ identityPool:
+ description: "Additional property that can be added in the request header\
+ \ to identify the principal ID for authorization. For example, this may\
+            \ be a Confluent Cloud identity pool."
+ type: string
ObjectMetadata:
type: object
properties:
@@ -1005,6 +1073,15 @@ components:
oneOf:
- $ref: "#/components/schemas/BasicCredentials"
- $ref: "#/components/schemas/ApiKeyAndSecret"
+ - $ref: "#/components/schemas/OAuthCredentials"
+ nullable: true
+ ssl:
+ description: "The SSL configuration for connecting to Schema Registry. If\
+ \ null, the connection will use SSL with the default settings. To disable,\
+ \ set `enabled` to false."
+ type: object
+ allOf:
+ - $ref: "#/components/schemas/TLSConfig"
nullable: true
SchemaRegistryStatus:
description: The status related to the specified Schema Registry.
@@ -1076,6 +1153,42 @@ components:
- INVALID_TOKEN
- FAILED
type: string
+ StoreType:
+ enum:
+ - JKS
+ - PKCS12
+ - PEM
+ - UNKNOWN
+ type: string
+ TLSConfig:
+ description: SSL configuration
+ required:
+ - enabled
+ type: object
+ properties:
+ verify_hostname:
+ description: Whether to verify the server certificate hostname. Defaults
+ to true if not set.
+ default: true
+ type: boolean
+ enabled:
+ description: "Whether SSL is enabled. If not set, defaults to true."
+ default: true
+ type: boolean
+ truststore:
+ description: The trust store configuration for authenticating the server's
+ certificate.
+ type: object
+ allOf:
+ - $ref: "#/components/schemas/TrustStore"
+ nullable: true
+ keystore:
+ description: "The key store configuration that will identify and authenticate\
+ \ the client to the server, required for mutual TLS (mTLS)"
+ type: object
+ allOf:
+ - $ref: "#/components/schemas/KeyStore"
+ nullable: true
Template:
required:
- api_version
@@ -1150,6 +1263,32 @@ components:
- CREATE_TIME
- LOG_APPEND_TIME
type: string
+ TrustStore:
+ required:
+ - path
+ type: object
+ properties:
+ path:
+ description: The path to the local trust store file. Required for authenticating
+ the server's certificate.
+ maxLength: 256
+ type: string
+ password:
+ description: "The password for the local trust store file. If a password\
+ \ is not set, trust store file configured will still be used, but integrity\
+ \ checking is disabled. A trust store password is not supported for PEM\
+ \ format."
+ type: string
+ allOf:
+ - $ref: "#/components/schemas/Password"
+ nullable: true
+ type:
+ description: The file format of the local trust store file
+ default: JKS
+ type: string
+ allOf:
+ - $ref: "#/components/schemas/StoreType"
+ nullable: true
UserInfo:
type: object
properties:
diff --git a/src/main/java/io/confluent/idesidecar/restapi/application/ProxyProcessorBeanProducers.java b/src/main/java/io/confluent/idesidecar/restapi/application/ProxyProcessorBeanProducers.java
index d2860845..dff3f4d3 100644
--- a/src/main/java/io/confluent/idesidecar/restapi/application/ProxyProcessorBeanProducers.java
+++ b/src/main/java/io/confluent/idesidecar/restapi/application/ProxyProcessorBeanProducers.java
@@ -12,6 +12,7 @@
import io.confluent.idesidecar.restapi.proxy.clusters.processors.ClusterStrategyProcessor;
import io.confluent.idesidecar.restapi.util.WebClientFactory;
import io.vertx.core.Future;
+import io.vertx.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
@@ -30,6 +31,9 @@ public class ProxyProcessorBeanProducers {
@Inject
WebClientFactory webClientFactory;
+ @Inject
+ Vertx vertx;
+
@Produces
@Singleton
@Named("clusterProxyProcessor")
@@ -45,7 +49,7 @@ public Processor> clusterProxyP
clusterInfoProcessor,
clusterStrategyProcessor,
clusterProxyProcessor,
- new ProxyRequestProcessor<>(webClientFactory),
+ new ProxyRequestProcessor<>(webClientFactory, vertx),
new EmptyProcessor<>()
);
}
diff --git a/src/main/java/io/confluent/idesidecar/restapi/clients/AdminClients.java b/src/main/java/io/confluent/idesidecar/restapi/clients/AdminClients.java
index f8907328..e68d3bb2 100644
--- a/src/main/java/io/confluent/idesidecar/restapi/clients/AdminClients.java
+++ b/src/main/java/io/confluent/idesidecar/restapi/clients/AdminClients.java
@@ -38,7 +38,7 @@ public AdminClient getClient(String connectionId, String clusterId) {
// Generate the Kafka admin client configuration
var config = configurator.getAdminClientConfig(connectionId, clusterId);
Log.debugf(
- "Creating schema registry client for connection %s and cluster %s with configuration:\n %s",
+ "Creating admin client for connection %s and cluster %s with configuration:\n %s",
connectionId,
clusterId,
config
diff --git a/src/main/java/io/confluent/idesidecar/restapi/clients/ClientConfigurator.java b/src/main/java/io/confluent/idesidecar/restapi/clients/ClientConfigurator.java
index a04b718e..6b3ca875 100644
--- a/src/main/java/io/confluent/idesidecar/restapi/clients/ClientConfigurator.java
+++ b/src/main/java/io/confluent/idesidecar/restapi/clients/ClientConfigurator.java
@@ -3,25 +3,24 @@
import io.confluent.idesidecar.restapi.cache.ClusterCache;
import io.confluent.idesidecar.restapi.connections.ConnectionState;
import io.confluent.idesidecar.restapi.connections.ConnectionStateManager;
-import io.confluent.idesidecar.restapi.credentials.Credentials;
import io.confluent.idesidecar.restapi.exceptions.ClusterNotFoundException;
import io.confluent.idesidecar.restapi.exceptions.ConnectionNotFoundException;
import io.confluent.idesidecar.restapi.kafkarest.SchemaManager;
import io.confluent.idesidecar.restapi.models.graph.KafkaCluster;
import io.confluent.idesidecar.restapi.models.graph.SchemaRegistry;
import io.confluent.idesidecar.restapi.util.CCloud;
+import io.confluent.idesidecar.restapi.util.ConfigUtil;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.quarkus.logging.Log;
-import io.confluent.idesidecar.restapi.util.ConfigUtil;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
-import org.apache.kafka.clients.CommonClientConfigs;
-import java.util.function.Supplier;
import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.CommonClientConfigs;
@ApplicationScoped
public class ClientConfigurator {
@@ -258,10 +257,20 @@ public static Map getKafkaClientConfig(
// Second, add any connection properties for Kafka cluster credentials (if defined)
var options = connection.getKafkaConnectionOptions().withRedact(redact);
- connection
- .getKafkaCredentials()
- .flatMap(creds -> creds.kafkaClientProperties(options))
- .ifPresent(props::putAll);
+
+ if (connection.getKafkaCredentials().isPresent()) {
+ connection
+ .getKafkaCredentials()
+ .flatMap(creds -> creds.kafkaClientProperties(options))
+ .ifPresent(props::putAll);
+ } else if (connection.getKafkaTLSConfig().isPresent()) {
+ // No credentials, but maybe TLS config is present
+ var tlsConfig = connection.getKafkaTLSConfig().get();
+ if (tlsConfig.enabled()) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+ tlsConfig.getProperties(redact).ifPresent(props::putAll);
+ }
+ }
// Add any auth properties for Schema Registry to the Kafka client config,
// with the "schema.registry." prefix (unless the property already starts with that)
@@ -314,11 +323,23 @@ public static Map getSchemaRegistryClientConfig(
.orElse(null);
// Add any properties for SR credentials (if defined)
- var options = new Credentials.SchemaRegistryConnectionOptions(redact, logicalId);
- connection
- .getSchemaRegistryCredentials()
- .flatMap(creds -> creds.schemaRegistryClientProperties(options))
- .ifPresent(props::putAll);
+ var options = connection
+ .getSchemaRegistryOptions()
+ .withRedact(redact)
+ .withLogicalClusterId(logicalId);
+ if (connection.getSchemaRegistryCredentials().isPresent()) {
+ connection
+ .getSchemaRegistryCredentials()
+ .flatMap(creds -> creds.schemaRegistryClientProperties(options))
+ .ifPresent(props::putAll);
+ } else if (connection.getSchemaRegistryTLSConfig().isPresent()) {
+ // No credentials, but maybe TLS config is present
+ var tlsConfig = connection.getSchemaRegistryTLSConfig().get();
+ if (tlsConfig.enabled()) {
+ tlsConfig.getProperties(redact).ifPresent(props::putAll);
+ }
+ }
+
return props;
}
diff --git a/src/main/java/io/confluent/idesidecar/restapi/clients/SchemaRegistryClients.java b/src/main/java/io/confluent/idesidecar/restapi/clients/SchemaRegistryClients.java
index c7ccd9eb..59729fc3 100644
--- a/src/main/java/io/confluent/idesidecar/restapi/clients/SchemaRegistryClients.java
+++ b/src/main/java/io/confluent/idesidecar/restapi/clients/SchemaRegistryClients.java
@@ -9,10 +9,11 @@
import io.confluent.idesidecar.restapi.util.RequestHeadersConstants;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import io.confluent.kafka.schemaregistry.client.security.SslFactory;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
-import java.util.Collections;
import java.util.Map;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -85,12 +86,21 @@ private SchemaRegistryClient createClient(
Map configurationProperties,
Map headers
) {
+ var restService = new RestService(srClusterUri);
+ restService.configure(configurationProperties);
+ restService.setHttpHeaders(headers);
+
+ var sslFactory = new SslFactory(configurationProperties);
+ if (sslFactory.sslContext() != null) {
+ restService.setSslSocketFactory(sslFactory.sslContext().getSocketFactory());
+ }
+
return new CachedSchemaRegistryClient(
- Collections.singletonList(srClusterUri),
+ restService,
SR_CACHE_SIZE,
SCHEMA_PROVIDERS,
- configurationProperties,
- headers
+ null,
+ null
);
}
}
diff --git a/src/main/java/io/confluent/idesidecar/restapi/connections/ConnectionState.java b/src/main/java/io/confluent/idesidecar/restapi/connections/ConnectionState.java
index 6b1d2124..a5537d12 100644
--- a/src/main/java/io/confluent/idesidecar/restapi/connections/ConnectionState.java
+++ b/src/main/java/io/confluent/idesidecar/restapi/connections/ConnectionState.java
@@ -2,6 +2,9 @@
import io.confluent.idesidecar.restapi.credentials.Credentials;
import io.confluent.idesidecar.restapi.credentials.Credentials.KafkaConnectionOptions;
+import io.confluent.idesidecar.restapi.credentials.CredentialsKafkaConnectionOptionsBuilder;
+import io.confluent.idesidecar.restapi.credentials.CredentialsSchemaRegistryConnectionOptionsBuilder;
+import io.confluent.idesidecar.restapi.credentials.TLSConfig;
import io.confluent.idesidecar.restapi.models.ConnectionMetadata;
import io.confluent.idesidecar.restapi.models.ConnectionSpec;
import io.confluent.idesidecar.restapi.models.ConnectionSpec.ConnectionType;
@@ -169,17 +172,32 @@ public String getInternalId() {
*/
public KafkaConnectionOptions getKafkaConnectionOptions() {
if (spec.kafkaClusterConfig() != null) {
- return new KafkaConnectionOptions(
- spec.kafkaClusterConfig().sslOrDefault(),
- spec.kafkaClusterConfig().verifySslCertificatesOrDefault(),
- false
- );
+ return CredentialsKafkaConnectionOptionsBuilder
+ .builder()
+ .redact(false)
+ .tlsConfig(spec.kafkaClusterConfig().tlsConfig())
+ .build();
}
- return new KafkaConnectionOptions(
- ConnectionSpec.KafkaClusterConfig.DEFAULT_SSL,
- ConnectionSpec.KafkaClusterConfig.DEFAULT_VERIFY_SSL_CERTIFICATES,
- false
- );
+
+ return CredentialsKafkaConnectionOptionsBuilder
+ .builder()
+ .redact(false)
+ .build();
+ }
+
+ public Credentials.SchemaRegistryConnectionOptions getSchemaRegistryOptions() {
+ if (spec.schemaRegistryConfig() != null) {
+ return CredentialsSchemaRegistryConnectionOptionsBuilder
+ .builder()
+ .redact(false)
+ .tlsConfig(spec.schemaRegistryConfig().tlsConfig())
+ .build();
+ }
+
+ return CredentialsSchemaRegistryConnectionOptionsBuilder
+ .builder()
+ .redact(false)
+ .build();
}
/**
@@ -201,4 +219,12 @@ public Optional getKafkaCredentials() {
public Optional getSchemaRegistryCredentials() {
return Optional.empty();
}
+
+ public Optional getKafkaTLSConfig() {
+ return Optional.empty();
+ }
+
+ public Optional getSchemaRegistryTLSConfig() {
+ return Optional.empty();
+ }
}
\ No newline at end of file
diff --git a/src/main/java/io/confluent/idesidecar/restapi/connections/DirectConnectionState.java b/src/main/java/io/confluent/idesidecar/restapi/connections/DirectConnectionState.java
index ee3d6867..45302758 100644
--- a/src/main/java/io/confluent/idesidecar/restapi/connections/DirectConnectionState.java
+++ b/src/main/java/io/confluent/idesidecar/restapi/connections/DirectConnectionState.java
@@ -5,6 +5,7 @@
import io.confluent.idesidecar.restapi.auth.AuthErrors;
import io.confluent.idesidecar.restapi.clients.ClientConfigurator;
import io.confluent.idesidecar.restapi.credentials.Credentials;
+import io.confluent.idesidecar.restapi.credentials.TLSConfig;
import io.confluent.idesidecar.restapi.models.ClusterType;
import io.confluent.idesidecar.restapi.models.ConnectionSpec;
import io.confluent.idesidecar.restapi.models.ConnectionSpec.ConnectionType;
@@ -17,7 +18,9 @@
import io.confluent.idesidecar.restapi.models.ConnectionStatusSchemaRegistryStatusBuilder;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.client.security.SslFactory;
import io.quarkus.logging.Log;
import io.smallrye.common.constraint.NotNull;
import io.smallrye.common.constraint.Nullable;
@@ -27,8 +30,7 @@
import java.io.IOException;
import java.net.UnknownHostException;
import java.time.Duration;
-import java.util.Collections;
-import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -137,6 +139,35 @@ public Optional getSchemaRegistryCredentials() {
return Optional.ofNullable(credentials);
}
+ @Override
+ public Optional getKafkaTLSConfig() {
+ if (spec.kafkaClusterConfig() != null) {
+ return Optional.of(
+ Objects.requireNonNullElse(
+ spec.kafkaClusterConfig().tlsConfig(),
+ // Use the default TLS configuration if none is provided
+ new TLSConfig()
+ )
+ );
+ }
+
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional getSchemaRegistryTLSConfig() {
+ if (spec.schemaRegistryConfig() != null) {
+ return Optional.of(
+ spec.schemaRegistryConfig().tlsConfig() != null
+ ? spec.schemaRegistryConfig().tlsConfig()
+ // Use the default TLS configuration if none is provided
+ : new TLSConfig()
+ );
+ }
+
+ return Optional.empty();
+ }
+
@Override
protected Future doRefreshStatus() {
return Future.join(
@@ -185,7 +216,7 @@ protected Future getKafkaConnectionStatus() {
);
} else if (cause instanceof TimeoutException) {
message = ("Unable to connect to the Kafka cluster at %s."
- + "Check the credentials or the network."
+ + " Check the credentials or the network."
).formatted(
spec.kafkaClusterConfig().bootstrapServers()
);
@@ -372,10 +403,12 @@ protected SchemaRegistryClient createSchemaRegistryClient(
false,
TIMEOUT
);
- return new CachedSchemaRegistryClient(
- Collections.singletonList(config.uri()),
- 10,
- srClientConfig
- );
+ var restService = new RestService(config.uri());
+ restService.configure(srClientConfig);
+ var sslFactory = new SslFactory(srClientConfig);
+ if (sslFactory.sslContext() != null) {
+ restService.setSslSocketFactory(sslFactory.sslContext().getSocketFactory());
+ }
+ return new CachedSchemaRegistryClient(restService, 10);
}
}
diff --git a/src/main/java/io/confluent/idesidecar/restapi/credentials/ApiKeyAndSecret.java b/src/main/java/io/confluent/idesidecar/restapi/credentials/ApiKeyAndSecret.java
index e8d6a0b9..ea30a2c0 100644
--- a/src/main/java/io/confluent/idesidecar/restapi/credentials/ApiKeyAndSecret.java
+++ b/src/main/java/io/confluent/idesidecar/restapi/credentials/ApiKeyAndSecret.java
@@ -14,9 +14,11 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+
+import org.apache.kafka.clients.CommonClientConfigs;
import org.eclipse.microprofile.openapi.annotations.media.Schema;
-@Schema(description = "Basic authentication credentials")
+@Schema(description = "API key and secret authentication credentials")
@RegisterForReflection
public record ApiKeyAndSecret(
@@ -54,15 +56,14 @@ public Optional