Populate the status for direct connections and ensure Kafka and SR proxies use credentials in headers #169
Conversation
Modifies the `DirectConnectionState` class to properly populate the status for direct connections, after verifying the ability to connect to Kafka using an AdminClient and to SR using an SR client.

Also fixed a few issues with how we compute the authentication-related headers using the credentials. These headers are used in the Kafka REST proxy and SR proxy implementations.

Some debug log messages were added to the `KafkaProducerClients`, `KafkaConsumerFactory`, `SchemaRegistryClients` and `AdminClients` beans, including logging at debug level the (redacted) configuration properties for these clients. To make it easier to log the redacted configurations, the `ClientConfigurator` class was changed to return a `Configuration` object rather than a `Map<String, Object>`. The `Configuration` object has one method to get the configuration properties as a map, while `toString()` and other methods ensure that only the redacted form of the configuration is otherwise exposed.

Finally, fixed a bug in the `BasicCredentials` and `ApiKeyAndSecret` classes' `httpClientHeaders()` implementations. This method returns the authorization-related HTTP headers, and the base64-encoded value was not being created correctly. A few unit tests were added, using obviously-fake secret values.
A few comments to help reviewers.
```java
ScramServerCallbackHandler.class,
// Schema Registry client classes that are not registered in
// https://github.com/quarkusio/quarkus/blob/3.16.3/extensions/schema-registry/confluent/common/deployment/src/main/java/io/quarkus/confluent/registry/common/ConfluentRegistryClientProcessor.java
Mode.class,
ExtendedSchema.class,
Rule.class,
RuleKind.class,
RuleMode.class,
RuleSet.class,
SchemaEntity.class,
SchemaTags.class,
SchemaRegistryServerVersion.class,
```
The Quarkus library for Confluent SR does not register all of the classes we will (or might) need at runtime, so we do this here. Most of these are related to data contracts, which were a more recent addition to SR.
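For context, a minimal, hypothetical sketch of registering additional classes for reflection when a Quarkus extension doesn't do so itself. The entity classes are those listed above; the wrapper class name and the exact import package are assumptions and should be verified against the SR client version in use:

```java
// Hypothetical sketch: registering extra Confluent SR client classes for
// reflection in a Quarkus app, so they survive native-image builds.
import io.confluent.kafka.schemaregistry.client.rest.entities.*;
import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection(targets = {
    Mode.class,
    ExtendedSchema.class,
    Rule.class,
    RuleKind.class,
    RuleMode.class,
    RuleSet.class,
    SchemaEntity.class,
    SchemaTags.class,
    SchemaRegistryServerVersion.class
})
public class SchemaRegistryReflectionConfig {
}
```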
```diff
 var config = configurator.getAdminClientConfig(connectionId, clusterId);
 Log.debugf(
     "Creating admin client for connection %s and cluster %s with configuration:\n %s",
     connectionId,
     clusterId,
     config
 );
 // Create the admin client
-return AdminClient.create(config);
+return AdminClient.create(config.asMap());
```
Changing the `ClientConfigurator.get*(...)` methods to return a `Configuration` object makes it much easier to get the configuration once, then use the redacted form in log messages and pass the non-redacted form to the client.
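For illustration, a hedged sketch of what such a wrapper might look like. Only `asMap()` and the redacting `toString()` behavior are described in this PR; the field names, constructor, and everything else here are assumptions:

```java
import java.util.Map;

// Hypothetical sketch of the Configuration wrapper described above.
public final class Configuration {

  private final Map<String, Object> properties;          // full, unredacted values
  private final Map<String, Object> redactedProperties;  // secrets masked for logging

  public Configuration(Map<String, Object> properties, Map<String, Object> redactedProperties) {
    this.properties = Map.copyOf(properties);
    this.redactedProperties = Map.copyOf(redactedProperties);
  }

  /** The one accessor for the unredacted properties, used when creating a client. */
  public Map<String, Object> asMap() {
    return properties;
  }

  /** Logging a Configuration (e.g. via Log.debugf) only ever sees the redacted form. */
  @Override
  public String toString() {
    return redactedProperties.toString();
  }
}
```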
```java
String connectionId,
MultiMap headers,
```
The places where this method is used know whether they're using the REST client for Kafka or SR. Since each type of cluster needs different credentials, and therefore different authN-related headers, it's easier for the caller to compute the appropriate headers and simply pass them into this method.
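For illustration, a hedged sketch of how a caller might compute a Basic-auth header value from credentials before passing the headers in. The helper name and signature are assumptions, not this PR's API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class AuthHeaderExample {

  // Builds the value of an HTTP `Authorization` header for Basic auth:
  // "Basic " followed by base64("username:password").
  static String basicAuthHeaderValue(String username, String password) {
    var value = "%s:%s".formatted(username, password);
    var encoded = Base64.getEncoder()
        .encodeToString(value.getBytes(StandardCharsets.UTF_8));
    return "Basic " + encoded;
  }
}
```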
```diff
-return switch (connectionType) {
-  case CCLOUD -> clusterType == ClusterType.KAFKA
-      ? confluentCloudKafkaClusterStrategy : confluentCloudSchemaRegistryClusterStrategy;
-  case LOCAL -> clusterType == ClusterType.KAFKA
-      ? confluentLocalKafkaClusterStrategy : confluentLocalSchemaRegistryClusterStrategy;
-  case DIRECT -> clusterType == ClusterType.KAFKA
-      ? directKafkaClusterStrategy : directSchemaRegistryClusterStrategy;
-  case PLATFORM -> null;
+return switch (clusterType) {
+  case KAFKA -> switch (connectionType) {
+    case CCLOUD -> confluentCloudKafkaClusterStrategy;
+    case LOCAL -> confluentLocalKafkaClusterStrategy;
+    case DIRECT -> directKafkaClusterStrategy;
+    case PLATFORM -> null;
+  };
+  case SCHEMA_REGISTRY -> switch (connectionType) {
+    case CCLOUD -> confluentCloudSchemaRegistryClusterStrategy;
+    case LOCAL -> confluentLocalSchemaRegistryClusterStrategy;
+    case DIRECT -> directSchemaRegistryClusterStrategy;
+    case PLATFORM -> null;
+  };
```
This just refactors the logic to be a bit easier to read and debug (and to set breakpoints in).
```java
/**
 * Constructs the headers for the proxied request, adding the authentication headers from the
 * credentials and the `target-sr-cluster` header set to the connection's SR cluster ID.
 *
 * @param context the context of the proxy request
 * @return the headers to be used in the proxy request to the Schema Registry
 */
@Override
public MultiMap constructProxyHeaders(ClusterProxyContext context) {
  var headers = super.constructProxyHeaders(context);
  if (context.getConnectionState() instanceof DirectConnectionState directConnectionState) {
    var srConfig = directConnectionState.getSpec().schemaRegistryConfig();
    if (srConfig != null) {
      var credentials = srConfig.credentials();
      if (credentials != null) {
        credentials.httpClientHeaders().ifPresent(map -> map.forEach(headers::add));
      }
    }
  }
  headers.add(TARGET_SR_CLUSTER_HEADER, context.getClusterId());
  return headers;
}
```
This is pretty much the same as `CCloudSchemaRegistryClusterStrategy`, but the credentials are accessed differently.
```diff
-var value = "%s:%s".formatted(key, secret.asCharArray());
+var value = "%s:%s".formatted(key, secret.asString());
```
This bug is super subtle: the `toString()` of the char array is not the same as a `String` created from the char array. Added a unit test for this.
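For illustration, a minimal standalone example of the subtlety (not code from this PR):

```java
public class CharArrayFormatDemo {
  public static void main(String[] args) {
    char[] secret = {'s', '3', 'c', 'r', '3', 't'};

    // %s receives the array as an Object, so it formats via Object#toString(),
    // producing something like "[C@1b6d3586" rather than the array's contents.
    String wrong = "%s:%s".formatted("key", secret);

    // Creating a String from the array copies its contents.
    String right = "%s:%s".formatted("key", new String(secret));

    System.out.println(wrong);  // e.g. key:[C@1b6d3586
    System.out.println(right);  // key:s3cr3t
  }
}
```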
```diff
-return new String(this.asCharArray());
+return new String(raw);
```
`this.asCharArray()` makes a copy of the raw array, and then the `String` constructor makes another copy. This change avoids the extra copy.
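A hedged sketch of the before/after. The field name `raw` appears in the diff above; the class shape and constructor are assumptions:

```java
import java.util.Arrays;

final class Secret {

  private final char[] raw;

  Secret(char[] value) {
    this.raw = Arrays.copyOf(value, value.length);
  }

  // Returns a defensive copy so callers cannot mutate the stored secret.
  char[] asCharArray() {
    return Arrays.copyOf(raw, raw.length);
  }

  // Before: new String(this.asCharArray()) copied raw twice (once in
  // asCharArray(), once in the String constructor). The String constructor
  // already copies the array it is given, so one copy suffices.
  String asString() {
    return new String(raw);
  }
}
```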
Thanks, @rhauch! I have one question about the handling of timeouts when retrieving the health status of Direct Connections and a few very minor suggestions/questions. Otherwise, your PR looks good to me. 🎉
```java
    .flatMap(creds -> creds.kafkaClientProperties(options))
    .ifPresent(props::putAll);

// Add any auth properties for Schema Registry to the Kafka client config,
// with the "schema.registry." prefix (unless the property already starts with that)
if (sr != null) {
  var additional = getSchemaRegistryClientConfig(connection, sr, redact);
  if (srUri != null) {
```
Should we also verify that `srId` is not `null`?
`srId` can be null at this point. I've clarified the JavaDoc of the method called below, and removed it from the two `ConnectionState` methods.
```java
      .state(ConnectedState.SUCCESS)
      .build()
  );
} catch (Exception e) {
```
Would it make sense to log the exception?
I was thinking no, because the message is passed to the status error, and because it is an expected situation if invalid credentials are used. (We don't want users thinking that, because the sidecar logged an exception and stack trace, there is a bug within the sidecar causing the failure to connect.)

I could add a debug log message, though users would never see that, and it's not clear it would help us when trying to debug an auth failure. So I'm inclined to not add it.
I thought having access to the exception name and stack trace could be useful for debugging connectivity/auth issues. If the exception message is sufficient, we can keep it as it is. If we decide to log the exception, we should use at least the `INFO` level so that it shows up in the log, as you pointed out.
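If we did decide to log it, a minimal sketch with the Quarkus logging API might look like this; the class, method, message text, and variable names are placeholders, not this PR's code:

```java
import io.quarkus.logging.Log;

class ConnectionCheckLogging {

  void checkConnection(String connectionId) {
    try {
      // ... attempt to connect and describe the cluster ...
    } catch (Exception e) {
      // Logged at INFO (not DEBUG) so it shows up in the default log output.
      Log.infof(e, "Failed to verify connection %s", connectionId);
    }
  }
}
```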
```java
    .builder()
    .state(ConnectedState.FAILED)
    .errors(
        new AuthErrors().withSignIn(
```
I'd suggest putting this error under `authStatusCheck`, because it occurred when checking the status of the direct connection. WDYT?
```diff
-        new AuthErrors().withSignIn(
+        new AuthErrors().withAuthStatusCheck(
```
The whole point of #162 is to deprecate and move away from the `status.authentication` object, since it only applies to CCloud. The CCloud-related status is available under a nested `status.ccloud` object (structurally identical to `status.authentication`), while direct connection errors (including auth-related errors) are recorded under `status.kafka_cluster` or `status.schema_registry` (depending upon where the error is).
```java
// and describing the cluster.
try (var adminClient = createAdminClient(kafkaConfig)) {
  var clusterDesc = adminClient.describeCluster();
  var actualClusterId = clusterDesc.clusterId().get(5, TimeUnit.SECONDS);
```
Should we make such timeouts configurable via application.yml or env vars? It would allow the extension to override them.
I don't anticipate the extension needing to change this, but I can add a configuration. How about an `ide-sidecar.connections.direct.timeout-seconds` property?
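For illustration, a hedged sketch of how such a property could be consumed via MicroProfile Config in Quarkus. The property name is from this thread; the class name, field, and 5-second default are assumptions:

```java
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.inject.ConfigProperty;

@ApplicationScoped
public class DirectConnectionConfig {

  // Bound from application.yml, or from the env var
  // IDE_SIDECAR_CONNECTIONS_DIRECT_TIMEOUT_SECONDS, falling back to 5 if unset.
  @ConfigProperty(name = "ide-sidecar.connections.direct.timeout-seconds", defaultValue = "5")
  long timeoutSeconds;

  public long timeoutSeconds() {
    return timeoutSeconds;
  }
}
```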
```java
}
// There is a Schema Registry configuration, so validate the connection by creating a
// SchemaRegistryClient and getting the mode.
try (var srClient = createSchemaRegistryClient(schemaRegistryConfig)) {
```
Do you know if `createSchemaRegistryClient` (or the constructor of `CachedSchemaRegistryClient`) uses a timeout when creating the SR client? If not, we'd probably wait for a long time when running into network issues, so we want to set a timeout.

Alternatively, we could add timeouts to the futures in lines 84 and 85, and let them fail if they exceed the timeout. If we go down that road, we'd have to build the `FAILED` state in the `getConnectionStatus()` method.

I'm leaning towards the latter option. What's your take on it?
The default timeout for all REST API calls made by the `CachedSchemaRegistryClient` is 30 seconds. It's configurable, so I can just reuse the same timeout value constant discussed above.
```java
        credentials.httpClientHeaders().ifPresent(map -> map.forEach(headers::add));
      }
    }
  }
```
Should we throw an exception or log an error if `context.getConnectionState()` is not a `DirectConnectionState`?
We're not doing that in `ConfluentCloudSchemaRegistryClusterStrategy`, and I expect that's because `ClusterStrategyProcessor` always uses it correctly, so it should never occur at runtime.

That means that throwing an exception would only help during development, but maybe that's reason enough.
Actually, if we want to add that, I think we should do it in a follow-up, since we'd also want to add it in multiple other places.
Doing it in a follow-up sounds good to me. I wanted to make sure that we don't swallow any errors.

Thanks for the review, @flippingbits. I've incorporated your feedback -- would you please take a look?
Thanks for the update, @rhauch. Your PR looks good to me.
Resolves #123

Summary of Changes

Completes the functionality of direct connections, though we will add more integration tests as follow-ons. For example, the existing ITs use `LocalTestEnvironment`, which runs containers without authN, and the recently added ITs for direct connections use no credentials. We'll need another `ConfluentPlatformTestEnvironment` that runs CP clusters with several authN mechanisms enabled, and then we'll run additional tests against that.
Pull request checklist
Please check if your PR fulfills the following (if applicable):