Added basic and API key and secret credentials for Kafka and SR cluster configs in direct connections #152

Merged · 14 commits · Nov 13, 2024
12 changes: 12 additions & 0 deletions pom.xml
@@ -209,6 +209,18 @@
<artifactId>launchdarkly-java-server-sdk</artifactId>
<version>${launch-darkly-sdk.version}</version>
</dependency>
<!-- BEGIN: Quarkus extensions for Kafka and SR clients -->
<!-- This Quarkus extension registers the many classes that may be loaded reflectively by the Kafka client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>
<!-- This Quarkus extension registers the many classes that may be loaded reflectively by the SR client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<!-- END: Quarkus extensions for Kafka and SR clients -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
82 changes: 81 additions & 1 deletion src/generated/resources/openapi.json
@@ -649,6 +649,34 @@
},
"components" : {
"schemas" : {
"ApiKeyAndSecret" : {
"description" : "Basic authentication credentials",
"required" : [ "api_key", "api_secret" ],
"type" : "object",
"properties" : {
"api_key" : {
"description" : "The API key to use when connecting to the external service.",
"maxLength" : 64,
"minLength" : 1,
"type" : "string"
},
"api_secret" : {
"description" : "The API secret to use when connecting to the external service.",
"maxLength" : 64,
"minLength" : 1,
"type" : "string",
"allOf" : [ {
"$ref" : "#/components/schemas/ApiSecret"
} ]
}
}
},
"ApiSecret" : {
"description" : "A user-provided API secret that is always masked in responses",
"maxLength" : 64,
"minLength" : 1,
"type" : "string"
},
"ApplyTemplateRequest" : {
"required" : [ "options" ],
"type" : "object",
@@ -713,12 +741,17 @@
"username" : {
"description" : "The username to use when connecting to the external service.",
"maxLength" : 64,
"minLength" : 1,
"type" : "string"
},
"password" : {
"description" : "The password to use when connecting to the external service.",
"maxLength" : 64,
"type" : "string"
"minLength" : 1,
"type" : "string",
"allOf" : [ {
"$ref" : "#/components/schemas/Password"
} ]
}
}
},
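For orientation, here is a minimal sketch of what instances of the two credential schemas above look like on the wire, written as TypeScript literals. All of the values are invented; only the field names come from the spec.

```typescript
// Illustrative instances of the two credential schemas above.
// All values are made up; only the field names come from the spec.
const basicCredentials = {
  username: "alice",
  password: "s3cret", // a Password value, always masked in responses
};

const apiKeyAndSecret = {
  api_key: "ABCDEF123456",
  api_secret: "xyz789", // an ApiSecret value, always masked in responses
};
```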
@@ -874,6 +907,9 @@
}
}
},
"Credentials" : {
"type" : "object"
},
"Error" : {
"description" : "Describes a particular error encountered while performing an operation.",
"type" : "object",
@@ -1028,6 +1064,31 @@
"maxLength" : 256,
"minLength" : 1,
"type" : "string"
},
"credentials" : {
"description" : "The credentials for the Kafka cluster, or null if no authentication is required",
"type" : "object",
"allOf" : [ {
"$ref" : "#/components/schemas/Credentials"
} ],
"oneOf" : [ {
"$ref" : "#/components/schemas/BasicCredentials"
}, {
"$ref" : "#/components/schemas/ApiKeyAndSecret"
} ],
"nullable" : true
},
"ssl" : {
"description" : "Whether to communicate with the Kafka cluster over TLS/SSL. Defaults to 'true', but set to 'false' when the Kafka cluster does not support TLS/SSL.",
"default" : true,
"type" : "boolean",
"nullable" : true
},
"verify_ssl_certificates" : {
"description" : "Whether to verify the Kafka cluster certificates. Defaults to 'true', but set to 'false' when the Kafka cluster has self-signed certificates.",
"default" : true,
"type" : "boolean",
"nullable" : true
}
}
},
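Putting the new KafkaClusterConfig fields together, a direct connection's Kafka cluster config might look like the sketch below. This is an assumption-laden illustration, not output from the sidecar: the bootstrap servers and key material are invented, and `credentials` could equally be a BasicCredentials object, or omitted when no authentication is required.

```typescript
// Hypothetical KafkaClusterConfig payload exercising the new fields.
const kafkaClusterConfig = {
  bootstrap_servers: "broker-1.example.com:9092", // invented address
  credentials: {
    api_key: "ABCDEF123456", // ApiKeyAndSecret variant; BasicCredentials also allowed
    api_secret: "xyz789",
  },
  ssl: true, // set to false when the cluster does not support TLS/SSL
  verify_ssl_certificates: false, // e.g. when the cluster uses self-signed certificates
};
```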
@@ -1171,6 +1232,12 @@
}
}
},
"Password" : {
"description" : "A user-provided password that is always masked in responses",
"maxLength" : 64,
"minLength" : 1,
"type" : "string"
},
"Preferences" : {
"required" : [ "api_version", "kind", "spec" ],
"type" : "object",
@@ -1227,6 +1294,19 @@
"maxLength" : 256,
"minLength" : 1,
"type" : "string"
},
"credentials" : {
"description" : "The credentials for the Schema Registry, or null if no authentication is required",
"type" : "object",
"allOf" : [ {
"$ref" : "#/components/schemas/Credentials"
} ],
"oneOf" : [ {
"$ref" : "#/components/schemas/BasicCredentials"
}, {
"$ref" : "#/components/schemas/ApiKeyAndSecret"
} ],
"nullable" : true
}
}
},
69 changes: 69 additions & 0 deletions src/generated/resources/openapi.yaml
@@ -436,6 +436,30 @@ paths:
$ref: "#/components/schemas/HealthResponse"
components:
schemas:
ApiKeyAndSecret:
description: API key and secret authentication credentials
required:
- api_key
- api_secret
type: object
properties:
api_key:
description: The API key to use when connecting to the external service.
maxLength: 64
minLength: 1
type: string
api_secret:
description: The API secret to use when connecting to the external service.
maxLength: 64
minLength: 1
type: string
allOf:
- $ref: "#/components/schemas/ApiSecret"
ApiSecret:
description: A user-provided API secret that is always masked in responses
maxLength: 64
minLength: 1
type: string
ApplyTemplateRequest:
required:
- options
@@ -485,11 +509,15 @@
username:
description: The username to use when connecting to the external service.
maxLength: 64
minLength: 1
type: string
password:
description: The password to use when connecting to the external service.
maxLength: 64
minLength: 1
type: string
allOf:
- $ref: "#/components/schemas/Password"
CCloudConfig:
description: Configuration for Confluent Cloud connections
required:
@@ -614,6 +642,8 @@
type: array
items:
$ref: "#/components/schemas/Connection"
Credentials:
type: object
Error:
description: Describes a particular error encountered while performing an operation.
type: object
@@ -733,6 +763,30 @@
maxLength: 256
minLength: 1
type: string
credentials:
description: "The credentials for the Kafka cluster, or null if no authentication\
\ is required"
type: object
allOf:
- $ref: "#/components/schemas/Credentials"
Contributor comment on lines +769 to +771:

TLDR: Including allOf here generates the API client code with credentials as a generic object rather than the more strongly typed union object. Let's keep it this way while the incessant bugs in the typescript-fetch template are fixed, at the cost of a bit of developer unfriendliness in trying to determine whether the opaque object is a BasicCredentials or ApiKeyAndSecret.

Upon running npx gulp apigen format in the confluentinc/vscode repo after copying over the OpenAPI spec from this branch, I see the following diff for src/clients/sidecar/models/KafkaClusterConfig.ts (among a few other changed files). You'll see that the type for credentials in the generated TypeScript interface is object | null.

Git diff for `src/clients/sidecar/models/KafkaClusterConfig.ts`


diff --git a/src/clients/sidecar/models/KafkaClusterConfig.ts b/src/clients/sidecar/models/KafkaClusterConfig.ts
index d9fb41b..d1f0695 100644
--- a/src/clients/sidecar/models/KafkaClusterConfig.ts
+++ b/src/clients/sidecar/models/KafkaClusterConfig.ts
@@ -31,6 +31,24 @@ export interface KafkaClusterConfig {
    * @memberof KafkaClusterConfig
    */
   bootstrap_servers: string;
+  /**
+   * The credentials for the Kafka cluster, or null if no authentication is required
+   * @type {object}
+   * @memberof KafkaClusterConfig
+   */
+  credentials?: object | null;
+  /**
+   * Whether to communicate with the Kafka cluster over TLS/SSL. Defaults to 'true', but set to 'false' when the Kafka cluster does not support TLS/SSL.
+   * @type {boolean}
+   * @memberof KafkaClusterConfig
+   */
+  ssl?: boolean | null;
+  /**
+   * Whether to verify the Kafka cluster certificates. Defaults to 'true', but set to 'false' when the Kafka cluster has self-signed certificates.
+   * @type {boolean}
+   * @memberof KafkaClusterConfig
+   */
+  verify_ssl_certificates?: boolean | null;
 }
 
 /**
@@ -55,6 +73,10 @@ export function KafkaClusterConfigFromJSONTyped(
   return {
     id: json["id"] == null ? undefined : json["id"],
     bootstrap_servers: json["bootstrap_servers"],
+    credentials: json["credentials"] == null ? undefined : json["credentials"],
+    ssl: json["ssl"] == null ? undefined : json["ssl"],
+    verify_ssl_certificates:
+      json["verify_ssl_certificates"] == null ? undefined : json["verify_ssl_certificates"],
   };
 }
 
@@ -65,5 +87,8 @@ export function KafkaClusterConfigToJSON(value?: KafkaClusterConfig | null): any
   return {
     id: value["id"],
     bootstrap_servers: value["bootstrap_servers"],
+    credentials: value["credentials"],
+    ssl: value["ssl"],
+    verify_ssl_certificates: value["verify_ssl_certificates"],
   };
 }

To actually declare the type of credentials as a union of BasicCredentials and ApiKeyAndSecret, we'll need to remove the allOf from the spec above (using some bespoke parameter in the @Schema annotation somewhere).

So I did exactly that: removed the allOf field from both KafkaClusterConfig and SchemaRegistryClusterConfig, then generated the clients again using npx gulp apigen format. Well, this opened a can of worms. Read on.

confluentinc/vscode uses the typescript-fetch template to generate the API client code, and we've currently pinned its version to 7.7.0. The generated type for credentials was how I'd expected it to be (see below), but unfortunately, there was a bug in one of the functions in the generated code: "Function lacks ending return statement and return type does not include 'undefined'".

// In src/clients/sidecar/models/KafkaClusterConfig.ts
...
  /**
   *
   * @type {KafkaClusterConfigCredentials}
   * @memberof KafkaClusterConfig
   */
  credentials?: KafkaClusterConfigCredentials | null;
...
// In src/clients/sidecar/models/KafkaClusterConfigCredentials.ts
...
/**
 * @type KafkaClusterConfigCredentials
 * The credentials for the Kafka cluster, or null if no authentication is required
 * @export
 */
export type KafkaClusterConfigCredentials = ApiKeyAndSecret | BasicCredentials;
...

So, I upgraded the openapi-generator-version to the latest stable version, 7.9.0, in the hope that this bug would disappear, and regenerated the API clients (a 370-file diff) with the sans-allOf OpenAPI spec. Unfortunately, there was yet another bug in the generated code. It was fixed just last week, and it will likely be a while before the fix makes it into a stable release.

All this to say: let's keep this spec as is and wait to see whether the next release of openapi-generator resolves these issues.
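In the meantime, consumers of the generated client can narrow the opaque `object | null` themselves. Here is a minimal sketch of runtime type guards, assuming the generated model types are available; the import paths below are hypothetical, but the discriminating fields come straight from the schemas in this spec.

```typescript
// Hypothetical import paths; adjust to wherever the generated models live.
import type { ApiKeyAndSecret } from "./models/ApiKeyAndSecret";
import type { BasicCredentials } from "./models/BasicCredentials";

// Narrow an opaque credentials object by its distinguishing required fields.
function isApiKeyAndSecret(c: object): c is ApiKeyAndSecret {
  return "api_key" in c && "api_secret" in c;
}

function isBasicCredentials(c: object): c is BasicCredentials {
  return "username" in c && "password" in c;
}
```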

oneOf:
- $ref: "#/components/schemas/BasicCredentials"
- $ref: "#/components/schemas/ApiKeyAndSecret"
nullable: true
ssl:
description: "Whether to communicate with the Kafka cluster over TLS/SSL.\
\ Defaults to 'true', but set to 'false' when the Kafka cluster does not\
\ support TLS/SSL."
default: true
type: boolean
nullable: true
verify_ssl_certificates:
description: "Whether to verify the Kafka cluster certificates. Defaults\
\ to 'true', but set to 'false' when the Kafka cluster has self-signed\
\ certificates."
default: true
type: boolean
nullable: true
LocalConfig:
description: Configuration when using Confluent Local and optionally a local
Schema Registry.
@@ -831,6 +885,11 @@
offset:
format: int64
type: integer
Password:
description: A user-provided password that is always masked in responses
maxLength: 64
minLength: 1
type: string
Preferences:
required:
- api_version
@@ -877,6 +936,16 @@
maxLength: 256
minLength: 1
type: string
credentials:
description: "The credentials for the Schema Registry, or null if no authentication\
\ is required"
type: object
allOf:
- $ref: "#/components/schemas/Credentials"
oneOf:
- $ref: "#/components/schemas/BasicCredentials"
- $ref: "#/components/schemas/ApiKeyAndSecret"
nullable: true
SidecarAccessToken:
type: object
properties:
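Analogous to the Kafka cluster sketch earlier, a Schema Registry config on a direct connection carries the same credentials union. In the illustration below, only the `credentials` field is taken from the SchemaRegistryConfig schema above; the `uri` field name and all values are assumptions, since the spec fragment shown here truncates the other property names.

```typescript
// Hypothetical SchemaRegistryConfig payload; `uri` is an assumed field name.
const schemaRegistryConfig = {
  uri: "https://schema-registry.example.com", // invented endpoint
  credentials: {
    username: "sr-user", // BasicCredentials variant; ApiKeyAndSecret also allowed
    password: "sr-secret", // always masked in responses
  },
};
```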
src/main/java/io/confluent/idesidecar/restapi/application/ReflectionConfiguration.java
@@ -1,6 +1,8 @@
package io.confluent.idesidecar.restapi.application;

import com.google.protobuf.Message;
import com.sun.security.auth.module.JndiLoginModule;
import com.sun.security.auth.module.KeyStoreLoginModule;
import io.confluent.idesidecar.scaffolding.models.TemplateManifest;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
@@ -16,6 +18,21 @@
import io.quarkus.runtime.annotations.RegisterForReflection;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler;
import org.apache.kafka.common.security.kerberos.KerberosClientCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramServerCallbackHandler;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/**
@@ -42,7 +59,30 @@
NullContextNameStrategy.class,
TopicNameStrategy.class,
ErrorMessage.class,
DefaultReferenceSubjectNameStrategy.class
DefaultReferenceSubjectNameStrategy.class,
// Workaround for:
// https://github.com/confluentinc/schema-registry/issues/3257
// https://github.com/quarkusio/quarkus/issues/42845
org.apache.avro.reflect.ReflectData.class,
// Kafka client login module
PlainLoginModule.class,
ScramLoginModule.class,
OAuthBearerLoginModule.class,
JndiLoginModule.class,
KeyStoreLoginModule.class,
// Kafka client callback handler implementations
AuthenticateCallbackHandler.class,
PlainServerCallbackHandler.class,
OAuthBearerLoginCallbackHandler.class,
OAuthBearerExtensionsValidatorCallback.class,
OAuthBearerValidatorCallback.class,
OAuthBearerTokenCallback.class,
KerberosClientCallbackHandler.class,
OAuthBearerSaslClientCallbackHandler.class,
OAuthBearerUnsecuredValidatorCallbackHandler.class,
OAuthBearerUnsecuredLoginCallbackHandler.class,
SaslClientCallbackHandler.class,
ScramServerCallbackHandler.class
}
)
public class ReflectionConfiguration {
AdminClients.java
@@ -3,10 +3,7 @@
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.eclipse.microprofile.config.inject.ConfigProperty;

/**
* Create an ApplicationScoped bean to cache AdminClient instances by connection ID and client ID.
@@ -18,10 +15,7 @@ public class AdminClients extends Clients<AdminClient> {
private static final String CAFFEINE_SPEC = "expireAfterAccess=5m";

@Inject
ClusterCache clusterCache;

@ConfigProperty(name = "ide-sidecar.admin-client-configs")
Map<String, String> adminClientSidecarConfigs;
ClientConfigurator configurator;

public AdminClients() {
super(CaffeineSpec.parse(CAFFEINE_SPEC));
@@ -38,16 +32,12 @@ public AdminClient getClient(String connectionId, String clusterId) {
return getClient(
connectionId,
clusterId,
() -> AdminClient.create(getAdminClientConfig(connectionId, clusterId))
() -> {
// Generate the Kafka admin client configuration
var config = configurator.getAdminClientConfig(connectionId, clusterId, false);
// Create the admin client
return AdminClient.create(config);
}
);
}

private Properties getAdminClientConfig(String connectionId, String clusterId) {
var cluster = clusterCache.getKafkaCluster(connectionId, clusterId);
var props = new Properties();
// Set AdminClient configs provided by the sidecar
props.putAll(adminClientSidecarConfigs);
props.put("bootstrap.servers", cluster.bootstrapServers());
return props;
}
}