Added basic and API key and secret credentials for Kafka and SR cluster configs in direct connections (#152)

Resolves #124

Adds basic and API key+secret credentials to direct connections, including validating the credentials in the Connections API and using the credentials when connecting to the Kafka cluster and Schema Registry (SR) defined in the direct connection spec.

### New Credentials types
The `Credentials` interface and the `BasicCredentials` and `ApiKeyAndSecret` record types have methods that build the auth-related configuration properties for Kafka and SR clients. Each concrete `Credentials` type customizes the logic; method parameters supply any information that is not carried in the `Credentials` objects themselves.

The `Credentials` interface defines three methods that will likely be overridden by each concrete subtype (a minimal sketch follows the list):
* `kafkaClientProperties(...)` -- Construct the auth-related Kafka client configuration properties. The method parameter defines connectivity options that might affect these properties.
* `schemaRegistryClientProperties(...)` -- Construct the auth-related SR client configuration properties. The method parameter defines connectivity options that might affect these properties.
* `httpClientHeaders(...)` -- Construct the auth-related HTTP headers.
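
As a rough illustration of this contract (not the sidecar's actual signatures), the interface could look something like the following; the nested `ConnectionOptions` record and the exact parameter and return types are assumptions for illustration only.

```java
import java.util.Map;

// Minimal sketch of the Credentials contract described above. The nested
// ConnectionOptions record and the exact signatures are illustrative
// assumptions, not the actual sidecar API.
public interface Credentials {

  // Hypothetical connectivity options (e.g., TLS settings) that may affect
  // the generated auth-related properties.
  record ConnectionOptions(boolean ssl, boolean verifyCertificates) {}

  /** Build the auth-related Kafka client configuration properties. */
  Map<String, Object> kafkaClientProperties(ConnectionOptions options);

  /** Build the auth-related Schema Registry client configuration properties. */
  Map<String, Object> schemaRegistryClientProperties(ConnectionOptions options);

  /** Build the auth-related HTTP headers (e.g., an Authorization header). */
  Map<String, String> httpClientHeaders();
}
```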

### New Redactable types for write-only objects

The `BasicCredentials` record type has a `password` field, and the `ApiKeyAndSecret` record type has an `api_secret` field. Because these fields contain secrets, they must always be masked (e.g., `********`) when written to logs or included in API responses.

To do this, this PR defines new `Password` and `ApiSecret` classes that extend a new `Redactable` abstract class representing any literal string value that must be redacted in all API responses and never logged in messages (or otherwise output by the sidecar). These are essentially write-only values that prevent external reads. The `Redactable` class includes a custom serializer that always writes a _masked_ representation consisting of exactly eight asterisk (`*`) characters, _regardless of the actual literal value_. The `toString()` method outputs the same _masked_ representation, primarily to help prevent sensitive literal values from being included in logs or exception messages. There are also a few methods that can be used for validation, such as checking whether the value is empty or longer than some size. The `hashCode()` and `equals()` methods never use the value. All of these methods are marked as `final` to ensure subclasses do not alter this behavior.
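
As a rough sketch of the masking behavior (assuming Jackson is the serialization layer; the validation helpers are omitted), such a base class could look like this:

```java
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.io.IOException;

// Sketch only: the real class also provides validation helpers (e.g., checking
// emptiness or maximum length) that are omitted here.
@JsonSerialize(using = Redactable.RedactingSerializer.class)
public abstract class Redactable {

  public static final String MASKED = "********";

  private final String value;

  protected Redactable(String value) {
    this.value = value;
  }

  /** The raw value, intended only for building client configurations. */
  protected final String asString() {
    return value;
  }

  /** Always print the masked form so secrets never reach logs or exceptions. */
  @Override
  public final String toString() {
    return MASKED;
  }

  /** Never derive hashCode/equals from the secret value. */
  @Override
  public final int hashCode() {
    return getClass().hashCode();
  }

  @Override
  public final boolean equals(Object obj) {
    return obj != null && getClass() == obj.getClass();
  }

  /** Serializer that writes exactly eight asterisks, regardless of the value. */
  public static final class RedactingSerializer extends JsonSerializer<Redactable> {
    @Override
    public void serialize(Redactable v, JsonGenerator gen, SerializerProvider sp)
        throws IOException {
      gen.writeString(MASKED);
    }
  }
}
```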

### Building Kafka and SR client configurations

The logic to build the complete configurations for the Kafka admin, consumer, and producer clients and the Schema Registry clients has been moved into a new `ClientConfigurator` bean that is `@ApplicationScoped`. Its methods rely upon the `Credentials` methods for the auth-related config properties, and upon the `KafkaCluster` or `SchemaRegistry` cluster for the remaining configuration properties.

The `ClientConfigurator` bean's methods take a boolean parameter that controls whether the resulting configuration should redact secrets, so that the configuration can expose the connection properties to the user (say, to let them copy the connection properties into their own application) or be used as the generated-but-redacted connection configs in the template service. The `AdminClients`, `KafkaProducerClients`, `KafkaConsumerFactory`, and `SchemaRegistryClients` beans use the configurator and do not redact the configuration.
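
For illustration, the merge-and-redact idea might look roughly like the following; the parameter list is simplified here, since the real methods take a connection ID and cluster ID and resolve the cluster and credentials internally.

```java
import java.util.Map;
import java.util.Properties;

// Simplified sketch of how a configurator could combine cluster connection
// properties with credential-derived auth properties. Not the actual
// ClientConfigurator implementation.
public class ClientConfiguratorSketch {

  public Properties adminClientConfig(
      String bootstrapServers,
      Map<String, Object> authProperties, // from Credentials.kafkaClientProperties(...)
      boolean redact
  ) {
    var props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.putAll(authProperties);
    if (redact && props.containsKey("sasl.jaas.config")) {
      // Mask secret-bearing entries so the config can be shown to users or
      // used by the template service without leaking credentials.
      props.put("sasl.jaas.config", "********");
    }
    return props;
  }
}
```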

New methods have been added to the `ConnectionState` class to make it easy to get the `Credentials` for a Kafka cluster or Schema Registry cluster with a given ID. The `DirectConnectionState` subclass always returns the credentials for its one Kafka cluster or one SR cluster. In the future, other `ConnectionState` subclasses (e.g., for CP MDS) might need to maintain a map of credentials by cluster ID for any clusters that do not have the same MDS credentials (e.g., when the Kafka or SR cluster does not delegate authN functionality to MDS).
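
A hypothetical shape for that lookup, using the `Credentials` type sketched earlier (the method names and structure are assumptions, not the actual sidecar classes):

```java
import java.util.Optional;

// Illustrative sketch: a direct connection has at most one Kafka cluster and
// one Schema Registry, so it can return its single set of credentials
// regardless of the requested cluster ID.
abstract class ConnectionStateSketch {

  abstract Optional<Credentials> getKafkaCredentials(String clusterId);

  abstract Optional<Credentials> getSchemaRegistryCredentials(String clusterId);
}

class DirectConnectionStateSketch extends ConnectionStateSketch {

  private final Optional<Credentials> kafkaCredentials;
  private final Optional<Credentials> schemaRegistryCredentials;

  DirectConnectionStateSketch(Credentials kafka, Credentials schemaRegistry) {
    this.kafkaCredentials = Optional.ofNullable(kafka);
    this.schemaRegistryCredentials = Optional.ofNullable(schemaRegistry);
  }

  @Override
  Optional<Credentials> getKafkaCredentials(String clusterId) {
    return kafkaCredentials; // always the one Kafka cluster's credentials
  }

  @Override
  Optional<Credentials> getSchemaRegistryCredentials(String clusterId) {
    return schemaRegistryCredentials; // always the one SR cluster's credentials
  }
}
```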

### Adding other types of credentials in the future

In the future, the only thing we need to do to support other types of authN credentials, such as [OAuth 2.0](#125), [mTLS](#126), [Kerberos (SASL/GSSAPI)](#127), etc., is to define new `Credentials` subtypes and implement the methods to construct the auth-related client properties using the subtype-specific credential information.
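
As an example of that extension pattern, here is a hedged sketch of how a basic-auth subtype could map onto the standard Kafka SASL/PLAIN and Schema Registry basic-auth client settings; the keys shown are the stock client config names, and the sidecar's actual `BasicCredentials` implementation may differ in structure and option handling.

```java
import java.util.Map;

// Illustrative mapping only: standard Kafka SASL/PLAIN and Schema Registry
// basic-auth settings; not the sidecar's actual BasicCredentials record.
record BasicCredentialsSketch(String username, String password) {

  Map<String, Object> kafkaClientProperties(boolean ssl) {
    var jaas = String.format(
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"%s\" password=\"%s\";",
        username, password);
    return Map.of(
        "security.protocol", ssl ? "SASL_SSL" : "SASL_PLAINTEXT",
        "sasl.mechanism", "PLAIN",
        "sasl.jaas.config", jaas
    );
  }

  Map<String, Object> schemaRegistryClientProperties() {
    return Map.of(
        "basic.auth.credentials.source", "USER_INFO",
        "basic.auth.user.info", username + ":" + password
    );
  }
}
```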

### Limitations

There are a few shortcuts taken for direct connections that will be addressed in a subsequent PR as part of #123:
* the `status` for direct connections is not yet accurate or useful; a follow-up will have to use an AdminClient and SR client to verify the credentials and update the status.
* the `kafka_cluster.id` and `schema_registry.id` are currently optional in the OpenAPI spec but remain required in practice until we can obtain the cluster ID of the remote system and verify it matches. The `RealDirectFetcher` will need to perform a describe-cluster call using the admin client and set the cluster ID (see the sketch below). (We might consider removing the `id` fields from the connection spec if we always get a good ID from describe-cluster.)
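
For reference, fetching the remote cluster ID via the admin client could look like this sketch; error handling, timeouts, and the wiring into `RealDirectFetcher` are omitted.

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;

// Sketch of how a fetcher could obtain the remote Kafka cluster ID so it can
// be verified against (or used to populate) the connection spec.
public class DescribeClusterSketch {

  public static String fetchClusterId(Properties adminConfig)
      throws ExecutionException, InterruptedException {
    try (AdminClient admin = AdminClient.create(adminConfig)) {
      // describeCluster() returns futures; clusterId() resolves to the broker-reported ID.
      return admin.describeCluster().clusterId().get();
    }
  }
}
```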

### Testing
I've done some manual testing with `quarkus:dev` and the native executable by using the REST API to create a direct connection that uses a CCloud cluster with an API key and secret, and have verified that the admin client and consumer clients are built correctly and work successfully with the remote cluster.
rhauch authored Nov 13, 2024
1 parent 284eb31 commit 0a3f965
Showing 48 changed files with 2,308 additions and 162 deletions.
12 changes: 12 additions & 0 deletions pom.xml
@@ -209,6 +209,18 @@
<artifactId>launchdarkly-java-server-sdk</artifactId>
<version>${launch-darkly-sdk.version}</version>
</dependency>
<!-- BEGIN: Quarkus extensions for Kafka and SR clients -->
<!-- This Quarkus extension registers the many classes that may be loaded reflectively by the Kafka client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>
<!-- This Quarkus extension registers the many classes that may be loaded reflectively by the SR client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<!-- END: Quarkus extensions for Kafka and SR clients -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
82 changes: 81 additions & 1 deletion src/generated/resources/openapi.json
@@ -649,6 +649,34 @@
},
"components" : {
"schemas" : {
"ApiKeyAndSecret" : {
"description" : "Basic authentication credentials",
"required" : [ "api_key", "api_secret" ],
"type" : "object",
"properties" : {
"api_key" : {
"description" : "The API key to use when connecting to the external service.",
"maxLength" : 64,
"minLength" : 1,
"type" : "string"
},
"api_secret" : {
"description" : "The API secret to use when connecting to the external service.",
"maxLength" : 64,
"minLength" : 1,
"type" : "string",
"allOf" : [ {
"$ref" : "#/components/schemas/ApiSecret"
} ]
}
}
},
"ApiSecret" : {
"description" : "A user-provided API secret that is always masked in responses",
"maxLength" : 64,
"minLength" : 1,
"type" : "string"
},
"ApplyTemplateRequest" : {
"required" : [ "options" ],
"type" : "object",
@@ -713,12 +741,17 @@
"username" : {
"description" : "The username to use when connecting to the external service.",
"maxLength" : 64,
"minLength" : 1,
"type" : "string"
},
"password" : {
"description" : "The password to use when connecting to the external service.",
"maxLength" : 64,
"type" : "string"
"minLength" : 1,
"type" : "string",
"allOf" : [ {
"$ref" : "#/components/schemas/Password"
} ]
}
}
},
@@ -874,6 +907,9 @@
}
}
},
"Credentials" : {
"type" : "object"
},
"Error" : {
"description" : "Describes a particular error encountered while performing an operation.",
"type" : "object",
@@ -1028,6 +1064,31 @@
"maxLength" : 256,
"minLength" : 1,
"type" : "string"
},
"credentials" : {
"description" : "The credentials for the Kafka cluster, or null if no authentication is required",
"type" : "object",
"allOf" : [ {
"$ref" : "#/components/schemas/Credentials"
} ],
"oneOf" : [ {
"$ref" : "#/components/schemas/BasicCredentials"
}, {
"$ref" : "#/components/schemas/ApiKeyAndSecret"
} ],
"nullable" : true
},
"ssl" : {
"description" : "Whether to communicate with the Kafka cluster over TLS/SSL. Defaults to 'true', but set to 'false' when the Kafka cluster does not support TLS/SSL.",
"default" : true,
"type" : "boolean",
"nullable" : true
},
"verify_ssl_certificates" : {
"description" : "Whether to verify the Kafka cluster certificates. Defaults to 'true', but set to 'false' when the Kafka cluster has self-signed certificates.",
"default" : true,
"type" : "boolean",
"nullable" : true
}
}
},
@@ -1171,6 +1232,12 @@
}
}
},
"Password" : {
"description" : "A user-provided password that is always masked in responses",
"maxLength" : 64,
"minLength" : 1,
"type" : "string"
},
"Preferences" : {
"required" : [ "api_version", "kind", "spec" ],
"type" : "object",
@@ -1227,6 +1294,19 @@
"maxLength" : 256,
"minLength" : 1,
"type" : "string"
},
"credentials" : {
"description" : "The credentials for the Schema Registry, or null if no authentication is required",
"type" : "object",
"allOf" : [ {
"$ref" : "#/components/schemas/Credentials"
} ],
"oneOf" : [ {
"$ref" : "#/components/schemas/BasicCredentials"
}, {
"$ref" : "#/components/schemas/ApiKeyAndSecret"
} ],
"nullable" : true
}
}
},
69 changes: 69 additions & 0 deletions src/generated/resources/openapi.yaml
@@ -436,6 +436,30 @@ paths:
$ref: "#/components/schemas/HealthResponse"
components:
schemas:
ApiKeyAndSecret:
description: Basic authentication credentials
required:
- api_key
- api_secret
type: object
properties:
api_key:
description: The API key to use when connecting to the external service.
maxLength: 64
minLength: 1
type: string
api_secret:
description: The API secret to use when connecting to the external service.
maxLength: 64
minLength: 1
type: string
allOf:
- $ref: "#/components/schemas/ApiSecret"
ApiSecret:
description: A user-provided API secret that is always masked in responses
maxLength: 64
minLength: 1
type: string
ApplyTemplateRequest:
required:
- options
@@ -485,11 +509,15 @@
username:
description: The username to use when connecting to the external service.
maxLength: 64
minLength: 1
type: string
password:
description: The password to use when connecting to the external service.
maxLength: 64
minLength: 1
type: string
allOf:
- $ref: "#/components/schemas/Password"
CCloudConfig:
description: Configuration for Confluent Cloud connections
required:
@@ -614,6 +642,8 @@
type: array
items:
$ref: "#/components/schemas/Connection"
Credentials:
type: object
Error:
description: Describes a particular error encountered while performing an operation.
type: object
@@ -733,6 +763,30 @@
maxLength: 256
minLength: 1
type: string
credentials:
description: "The credentials for the Kafka cluster, or null if no authentication\
\ is required"
type: object
allOf:
- $ref: "#/components/schemas/Credentials"
oneOf:
- $ref: "#/components/schemas/BasicCredentials"
- $ref: "#/components/schemas/ApiKeyAndSecret"
nullable: true
ssl:
description: "Whether to communicate with the Kafka cluster over TLS/SSL.\
\ Defaults to 'true', but set to 'false' when the Kafka cluster does not\
\ support TLS/SSL."
default: true
type: boolean
nullable: true
verify_ssl_certificates:
description: "Whether to verify the Kafka cluster certificates. Defaults\
\ to 'true', but set to 'false' when the Kafka cluster has self-signed\
\ certificates."
default: true
type: boolean
nullable: true
LocalConfig:
description: Configuration when using Confluent Local and optionally a local
Schema Registry.
@@ -831,6 +885,11 @@
offset:
format: int64
type: integer
Password:
description: A user-provided password that is always masked in responses
maxLength: 64
minLength: 1
type: string
Preferences:
required:
- api_version
@@ -877,6 +936,16 @@
maxLength: 256
minLength: 1
type: string
credentials:
description: "The credentials for the Schema Registry, or null if no authentication\
\ is required"
type: object
allOf:
- $ref: "#/components/schemas/Credentials"
oneOf:
- $ref: "#/components/schemas/BasicCredentials"
- $ref: "#/components/schemas/ApiKeyAndSecret"
nullable: true
SidecarAccessToken:
type: object
properties:
@@ -1,6 +1,8 @@
package io.confluent.idesidecar.restapi.application;

import com.google.protobuf.Message;
import com.sun.security.auth.module.JndiLoginModule;
import com.sun.security.auth.module.KeyStoreLoginModule;
import io.confluent.idesidecar.scaffolding.models.TemplateManifest;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
@@ -16,6 +18,21 @@
import io.quarkus.runtime.annotations.RegisterForReflection;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler;
import org.apache.kafka.common.security.kerberos.KerberosClientCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramServerCallbackHandler;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/**
Expand All @@ -42,7 +59,30 @@
NullContextNameStrategy.class,
TopicNameStrategy.class,
ErrorMessage.class,
DefaultReferenceSubjectNameStrategy.class
DefaultReferenceSubjectNameStrategy.class,
// Workaround for:
// https://github.com/confluentinc/schema-registry/issues/3257
// https://github.com/quarkusio/quarkus/issues/42845
org.apache.avro.reflect.ReflectData.class,
// Kafka client login module
PlainLoginModule.class,
ScramLoginModule.class,
OAuthBearerLoginModule.class,
JndiLoginModule.class,
KeyStoreLoginModule.class,
// Kafka client callback handler implementations
AuthenticateCallbackHandler.class,
PlainServerCallbackHandler.class,
OAuthBearerLoginCallbackHandler.class,
OAuthBearerExtensionsValidatorCallback.class,
OAuthBearerValidatorCallback.class,
OAuthBearerTokenCallback.class,
KerberosClientCallbackHandler.class,
OAuthBearerSaslClientCallbackHandler.class,
OAuthBearerUnsecuredValidatorCallbackHandler.class,
OAuthBearerUnsecuredLoginCallbackHandler.class,
SaslClientCallbackHandler.class,
ScramServerCallbackHandler.class
}
)
public class ReflectionConfiguration {
@@ -3,10 +3,7 @@
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.eclipse.microprofile.config.inject.ConfigProperty;

/**
* Create an ApplicationScoped bean to cache AdminClient instances by connection ID and client ID.
Expand All @@ -18,10 +15,7 @@ public class AdminClients extends Clients<AdminClient> {
private static final String CAFFEINE_SPEC = "expireAfterAccess=5m";

@Inject
ClusterCache clusterCache;

@ConfigProperty(name = "ide-sidecar.admin-client-configs")
Map<String, String> adminClientSidecarConfigs;
ClientConfigurator configurator;

public AdminClients() {
super(CaffeineSpec.parse(CAFFEINE_SPEC));
@@ -38,16 +32,12 @@ public AdminClient getClient(String connectionId, String clusterId) {
return getClient(
connectionId,
clusterId,
() -> AdminClient.create(getAdminClientConfig(connectionId, clusterId))
() -> {
// Generate the Kafka admin client configuration
var config = configurator.getAdminClientConfig(connectionId, clusterId, false);
// Create the admin client
return AdminClient.create(config);
}
);
}

private Properties getAdminClientConfig(String connectionId, String clusterId) {
var cluster = clusterCache.getKafkaCluster(connectionId, clusterId);
var props = new Properties();
// Set AdminClient configs provided by the sidecar
props.putAll(adminClientSidecarConfigs);
props.put("bootstrap.servers", cluster.bootstrapServers());
return props;
}
}