Update connection status to work for local, CCloud and direct connections #123

Closed
rhauch opened this issue Oct 26, 2024 · 1 comment · Fixed by #169

rhauch commented Oct 26, 2024

Continuation of #96 and #122, and followup to #116 that added initial and limited direct connections (with GraphQL support) but did not support credentials nor unify the status to work for all kinds of connections. (The status is heavily oriented around CCloud connections.)

Note that direct connections do not yet support credentials, so direct connections will only work with Kafka clusters and SR that have no authentication. We'll track adding support for other credential types in subsequent issues:

rhauch added a commit that referenced this issue Oct 26, 2024
## Summary of Changes

Resolves #96

This adds the _initial but limited_ support for a new type of connection resource, called a `DIRECT` connection, that represents a local or remote Kafka cluster and/or Schema Registry. The extension might create a direct connection to talk to a locally running CP installation, or a remote Apache Kafka cluster with a remote Schema Registry, or even a CCloud cluster with Schema Registry. The user may have to provide credentials to successfully authenticate and connect to each remote service/cluster.

The direct connections are not necessarily the constructs we want to expose. It is likely that near-term versions of the VS Code extension will allow users to connect to local or remote Kafka and SR clusters, but it may also expose to users different kinds of connections that are all represented in the sidecar as direct connections. For example, maybe the VS Code extension allows users to connect to “Confluent Platform”, “Apache Kafka”, and “WarpStream” (in addition to CCloud and local AK+SR), and these might all be customized facades on top of direct connections.

This is the first step in full support for direct connections, and as such only limited functionality is available in this PR:
* CRUDL connection resources with `type=DIRECT`, with validation of fields for each type of connection.
* A direct connection contains zero or one `kafka_cluster` object and zero or one `schema_registry` object, allowing it to connect to a (remote or local) Kafka cluster and/or a (remote or local) Schema Registry.
* GraphQL support for the Kafka cluster and SR referenced by direct connections.

Once this PR is merged, the following will be handled as followups to complete the support for direct connections:
- #123
- #124
- #125 
- #126
- #127

After that, we will have more improvements to better support Confluent Platform: 
- #128

Other improvements may include:
- #129
- #130
- Allow direct connections to connect to Flink cluster and Kafka Connect cluster.

Also, we avoided using `javax` and instead used `jakarta` imports due to the transfer of Java EE to the Eclipse Foundation, where it became Jakarta EE. The older `javax` packages within Java EE have been deprecated. Jakarta EE uses the `jakarta` namespace, which is incompatible with the legacy javax namespace. Mixing the two can cause issues. Several projects and frameworks… have already adopted Jakarta. Additionally, upgrading to Jakarta allows you to keep up with the latest changes and modernize your applications.

https://github.com/jakartaee/platform/blob/main/namespace/mappings.adoc#mapping-javax-to-jakarta


## Pull request checklist

Please check if your PR fulfills the following (if applicable):

- Tests:
    - [x] Added new
    - [x] Updated existing
    - [x] Deleted existing
- [x] Have you validated this change locally against a running instance of the Quarkus dev server?
    ```shell
    make quarkus-dev
    ```
- [x] Have you validated this change against a locally running native executable?
    ```shell
    make mvn-package-native && ./target/ide-sidecar-*-runner
    ```
rhauch self-assigned this Nov 4, 2024
rhauch added a commit that referenced this issue Nov 4, 2024
… connections (#141)

Resolves #122

Modified the `ClusterStrategyProcessor` and `ConsumeStrategyProcessor` implementations to support proxying requests to Kafka and SR clusters defined on direct connections.

Limitations: the `status` on direct connections does not yet reflect the ability to connect (see #123), and direct connections do not yet have authentication credentials (see #124).

Also slightly refactors the ITs for these APIs that run against local connections, so that we can easily run these tests against local connections and direct connections.
rhauch added a commit that referenced this issue Nov 13, 2024
…er configs in direct connections (#152)

Resolves #124

Adds basic and API key+secret credentials to direct connections, including validating the credentials in the Connections API and using credentials when connecting to the Kafka cluster and SR defined in the direct connection spec.

### New Credentials types
The `Credentials` interface and `BasicCredentials` and `ApiKeyAndSecret` record types have methods that build the auth-related configuration properties for Kafka clients and SR clients. Each concrete `Credentials` type customizes the logic, though parameters are used to supply information not in the `Credentials` objects.

The `Credentials` interface defines three methods that will likely be overridden by each concrete subtype:
* `kafkaClientProperties(...)` -- Construct the auth-related Kafka client configuration properties. The method parameter defines connectivity options that might affect these properties.
* `schemaRegistryClientProperties(...)` -- Construct the auth-related SR client configuration properties. The method parameter defines connectivity options that might affect these properties.
* `httpClientHeaders(...)` -- Construct the auth-related HTTP headers.
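
As a rough illustration of the three methods listed above, the following is a minimal sketch and not the code from the PR. The `ConnectivityOptions` record is a hypothetical stand-in for the method parameter, `httpClientHeaders` is shown without parameters for brevity, and the `password` field is a plain `String` here even though the real record uses a redactable type. The property keys are the standard Kafka and Schema Registry client settings for SASL/PLAIN and HTTP basic auth.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the connectivity options passed to each method.
record ConnectivityOptions(boolean ssl) {}

// Sketch of the interface: every method returns only auth-related settings,
// and the defaults contribute nothing.
interface Credentials {
  default Map<String, String> kafkaClientProperties(ConnectivityOptions options) {
    return Map.of();
  }

  default Map<String, String> schemaRegistryClientProperties(ConnectivityOptions options) {
    return Map.of();
  }

  default Map<String, String> httpClientHeaders() {
    return Map.of();
  }
}

// Assumption: a plain String password keeps the sketch short. A record's
// generated toString() would print it, which is what the redactable types avoid.
record BasicCredentials(String username, String password) implements Credentials {

  @Override
  public Map<String, String> kafkaClientProperties(ConnectivityOptions options) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("security.protocol", options.ssl() ? "SASL_SSL" : "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    props.put(
        "sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + username + "\" password=\"" + password + "\";");
    return props;
  }

  @Override
  public Map<String, String> schemaRegistryClientProperties(ConnectivityOptions options) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("basic.auth.user.info", username + ":" + password);
    return props;
  }

  @Override
  public Map<String, String> httpClientHeaders() {
    String token = Base64.getEncoder()
        .encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
    return Map.of("Authorization", "Basic " + token);
  }
}
```

A caller would merge these maps into the rest of the client configuration, for example `config.putAll(credentials.kafkaClientProperties(options))`.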

### New Redactable types for write-only objects

The `BasicCredentials` record type has a `password` field, and the `ApiKeyAndSecret` record type has an `api_secret` field. Because these fields contain secrets, they must always be masked (e.g., `********`) when written to the log or returned in API responses.

To do this, this PR defines new `Password` and `ApiSecret` classes that extend a new `Redactable` abstract class representing any literal String value that must be redacted in all API responses and never logged in messages (or otherwise output by the sidecar). These are essentially write-only values that prevent external reads. The `Redactable` class includes a custom serializer that always writes a _masked_ representation consisting of exactly eight asterisk (`*`) characters _regardless of the actual literal value_. The `toString()` method outputs the same _masked_ representation, primarily to help prevent sensitive literal values from being included in logs or exception messages. There are also a few methods that can be used for validation, such as checking whether the value is empty or longer than some size. The `hashCode()` and `equals()` methods never use the value. All of these methods are marked as final to ensure subclasses do not alter the behavior.
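
The masking behavior described above might look roughly like the sketch below. It is an approximation under stated assumptions: the JSON serializer is omitted because it depends on the JSON library, the `reveal()` accessor and the method names `isEmpty`/`longerThan` are hypothetical, and identity-based `equals()` with a constant `hashCode()` is just one way to keep the raw value out of both methods.

```java
// Sketch of a write-only value: it can be constructed and validated, but it
// never renders its raw content through toString().
abstract class Redactable {
  static final String MASKED_VALUE = "********"; // exactly eight asterisks

  private final String raw;

  protected Redactable(String raw) {
    this.raw = raw == null ? "" : raw;
  }

  // Hypothetical package-private accessor for code that builds real client configs.
  final String reveal() {
    return raw;
  }

  // Validation helpers that expose facts about the value, not the value itself.
  public final boolean isEmpty() {
    return raw.isEmpty();
  }

  public final boolean longerThan(int maxLength) {
    return raw.length() > maxLength;
  }

  // Always masked, regardless of the actual value or its length.
  @Override
  public final String toString() {
    return MASKED_VALUE;
  }

  // Assumption: identity equality and a constant hash, so neither method
  // depends on (or leaks information about) the raw value.
  @Override
  public final boolean equals(Object other) {
    return this == other;
  }

  @Override
  public final int hashCode() {
    return Redactable.class.hashCode();
  }
}

final class Password extends Redactable {
  Password(String raw) {
    super(raw);
  }
}

final class ApiSecret extends Redactable {
  ApiSecret(String raw) {
    super(raw);
  }
}
```

With this shape, accidental string concatenation such as `"password=" + password` in a log or exception message yields `password=********`.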

### Building Kafka and SR client configurations

The logic to build the complete configurations for the Kafka admin, consumer and producer clients and the Schema Registry clients is moved into a new `ClientConfigurator` bean that is `@ApplicationScoped`. These methods rely upon the `Credentials` methods for the auth-related config properties and the `KafkaCluster` or `SchemaRegistry` cluster for the remaining configuration properties.

The `ClientConfigurator` bean’s methods take a boolean parameter that controls whether the resulting configuration redacts secrets. A redacted configuration can be exposed to the user, say to let them copy the connection properties and use them in their application, or it can be used in the template service. The `AdminClients`, `KafkaProducerClients`, `KafkaConsumerFactory` and `SchemaRegistryClients` beans use the configurator without redaction.

New methods have been added to the `ConnectionState` class to make it easy to get the `Credentials` for a Kafka cluster with a given ID or a Schema Registry cluster with a given ID. The `DirectConnectionState` subclass always returns the credentials for the one Kafka cluster or one SR cluster. In the future, other `ConnectionState` subclasses (e.g., for CP MDS) might need to maintain a map of credentials by cluster ID for any clusters that do not have the same MDS credentials (e.g., the Kafka or SR cluster does not delegate authN functionality to MDS).
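
To make the redaction flag concrete, here is a small sketch of how a configurator method could switch between real and masked secrets. The method name, its parameters, and the use of a static method are assumptions for illustration; the actual bean is `@ApplicationScoped` and derives these values from the `Credentials` and cluster objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for the configurator; not the bean from the PR.
final class ClientConfigurator {
  private static final String MASK = "********";

  private ClientConfigurator() {}

  // Builds a Kafka client config for API key+secret auth. When redact is true,
  // the secret is replaced by a fixed mask so the result is safe to display.
  static Map<String, String> kafkaClientConfig(
      String bootstrapServers, String apiKey, String apiSecret, boolean redact) {
    Map<String, String> config = new LinkedHashMap<>();
    config.put("bootstrap.servers", bootstrapServers);
    config.put("security.protocol", "SASL_SSL");
    config.put("sasl.mechanism", "PLAIN");
    String secret = redact ? MASK : apiSecret;
    config.put(
        "sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + apiKey + "\" password=\"" + secret + "\";");
    return config;
  }
}
```

Client factories would call this with `redact = false`, while anything that shows the configuration to a user would pass `true`.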

### Adding other types of credentials in the future

In the future, the only thing we need to do to support other types of authN credentials, such as [OAuth 2.0](#125), [mTLS](#126), [Kerberos (SASL/GSSAPI)](#127), etc., is to define new `Credentials` subtypes and implement the methods to construct the auth-related client properties using the subtype-specific credential information.

### Limitations

There are a few shortcuts taken for direct connections that will be addressed in a subsequent PR as part of #123:
* the `status` for direct connections is not accurate or useful; the sidecar will have to use an AdminClient and SR client to verify the credentials and update the status.
* the `kafka_cluster.id` and `schema_registry.id` are currently optional in the OpenAPI spec but are required until we can obtain the cluster ID of the remote system and verify it matches. The `RealDirectFetcher` will need to perform a describe-cluster using the admin client, and set the cluster ID. (We might consider removing the `id` fields from the connection spec, if we always get a good ID from the describe-cluster.)

### Testing
I've done some manual testing with `quarkus:dev` and a native executable by using the REST API to create a direct connection that uses a CCloud cluster with API key and secret, and have verified the admin client and consumer clients are built correctly and will successfully work with the remote cluster.

rhauch commented Nov 14, 2024

Every Connection resource includes a spec field and status field. The spec is the user-defined information about the connection, and the status is the system-generated information. The status field is defined in the OpenAPI spec as the ConnectionStatus schema:

    ConnectionStatus:
      required:
      - authentication
      type: object
      properties:
        authentication:
          $ref: "#/components/schemas/Authentication"

with the Authentication schema:

    Authentication:
      required:
      - status
      type: object
      properties:
        status:
          $ref: "#/components/schemas/Status"
        requires_authentication_at:
          $ref: "#/components/schemas/Instant"
        user:
          $ref: "#/components/schemas/UserInfo"
        errors:
          $ref: "#/components/schemas/AuthErrors"

and the Status schema:

    Status:
      enum:
      - NO_TOKEN
      - VALID_TOKEN
      - INVALID_TOKEN
      - FAILED
      type: string

New connection resources always have an "initial" status of (in a simplified not-quite-JSON representation):

status: {
  authentication: {
    status: NO_TOKEN
  }
}

and successfully-connected CCloud connections have this status:

status: {
  authentication: {
    status: VALID_TOKEN,
    user: ...
    errors: ...
  }
}

The errors field is used only if the status is FAILED or INVALID_TOKEN.

This design was added for CCloud connections, and the structure and terminology of the Status enum literals are very much oriented around CCloud and/or OAuth.

Meanwhile, the spec for connection resources is defined in the OpenAPI spec as:

    ConnectionSpec:
      description: The connection details that can be set or changed.
      type: object
      properties:
        id:
          description: The unique identifier of the connection resource.
          maxLength: 64
          minLength: 1
          type: string
        name:
          description: The user-supplied name of the connection resource.
          maxLength: 64
          type: string
        type:
          description: The type of connection resource.
          type: string
          allOf:
          - $ref: "#/components/schemas/ConnectionType"
        ccloud_config:
          description: The details for connecting to CCloud.
          type: object
          allOf:
          - $ref: "#/components/schemas/CCloudConfig"
        local_config:
          description: The details for connecting to Confluent Local.
          type: object
          allOf:
          - $ref: "#/components/schemas/LocalConfig"

Note the ccloud_config and local_config objects used for CCloud and local connections, respectively.
The ccloud_config object identifies the CCloud organization that is to be used. The CCloudConfig schema is defined as:

    CCloudConfig:
      description: Configuration for Confluent Cloud connections
      required:
      - organization_id
      type: object
      properties:
        organization_id:
          description: The identifier of the CCloud organization to use. The user's
            default organization is used when absent.
          maxLength: 36
          minLength: 36
          type: string

As part of adding support for direct connections, the Connection spec was expanded to include a spec.kafka_cluster and spec.schema_registry:

    ConnectionSpec:
      description: The connection details that can be set or changed.
      type: object
      properties:
        id:
          description: The unique identifier of the connection resource.
          maxLength: 64
          minLength: 1
          type: string
        name:
          description: The user-supplied name of the connection resource.
          maxLength: 64
          type: string
        type:
          description: The type of connection resource.
          type: string
          allOf:
          - $ref: "#/components/schemas/ConnectionType"
        ccloud_config:
          description: The details for connecting to CCloud.
          type: object
          allOf:
          - $ref: "#/components/schemas/CCloudConfig"
        local_config:
          description: The details for connecting to Confluent Local.
          type: object
          allOf:
          - $ref: "#/components/schemas/LocalConfig"
        kafka_cluster:
          description: "The details for connecting to a CCloud, Confluent Platform,\
            \ or Apache Kafka cluster."
          type: object
          allOf:
          - $ref: "#/components/schemas/KafkaClusterConfig"
        schema_registry:
          description: The details for connecting to a Schema Registry.
          type: object
          allOf:
          - $ref: "#/components/schemas/SchemaRegistryConfig"

Now, local connections include a local_config object, CCloud connections include a ccloud_config object, and direct connections include a kafka_cluster object (when directly connecting to a Kafka cluster) and/or a schema_registry object (when directly connecting to a Schema Registry).
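
The pairing of connection types and spec objects described above could be checked along these lines. This is only a sketch under assumptions: the `ConnectionType` literals other than `DIRECT`, the trimmed-down record, the error wording, and the rule that a config object belonging to another type is rejected are all illustrative and not taken from the sidecar's validation code.

```java
import java.util.ArrayList;
import java.util.List;

// Assumed literals; only DIRECT is named explicitly in this thread.
enum ConnectionType { LOCAL, CCLOUD, DIRECT }

// Trimmed-down spec where Object stands in for each config schema.
record ConnectionSpec(
    ConnectionType type,
    Object ccloudConfig,
    Object localConfig,
    Object kafkaCluster,
    Object schemaRegistry) {

  // Returns one message per config object that does not belong to this type.
  // Every config object stays optional, matching "zero or one" in the spec.
  List<String> validate() {
    List<String> errors = new ArrayList<>();
    if (type == null) {
      errors.add("type is required");
      return errors;
    }
    if (type != ConnectionType.CCLOUD) {
      reject(errors, "ccloud_config", ccloudConfig);
    }
    if (type != ConnectionType.LOCAL) {
      reject(errors, "local_config", localConfig);
    }
    if (type != ConnectionType.DIRECT) {
      reject(errors, "kafka_cluster", kafkaCluster);
      reject(errors, "schema_registry", schemaRegistry);
    }
    return errors;
  }

  private void reject(List<String> errors, String field, Object value) {
    if (value != null) {
      errors.add(field + " is not allowed when type is " + type);
    }
  }
}
```

Collecting all violations in a list, rather than failing on the first one, lets an API response report every problem with a spec at once.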

The status needs to change so that we can adequately reflect the connected-ness (and other errors) for each of the one or more local_config, ccloud_config, kafka_cluster and/or schema_registry objects in the spec. To do this, we will slowly evolve away from the status.authentication object and instead introduce an object in status for each spec configuration object:

    ConnectionStatus:
      required:
      - authentication
      type: object
      properties:
        ccloud:
          $ref: "#/components/schemas/CCloudStatus"
        kafka_cluster:
          $ref: "#/components/schemas/KafkaClusterStatus"
        schema_registry:
          $ref: "#/components/schemas/SchemaRegistryStatus"
        authentication:
          $ref: "#/components/schemas/Authentication"

where CCloudStatus is essentially the same as Authentication:

    CCloudStatus:
      description: The status related to CCloud.
      required:
      - state
      type: object
      properties:
        state:
          description: The state of the connection to CCloud.
          type: string
          allOf:
          - $ref: "#/components/schemas/ConnectedState"
        requires_authentication_at:
          description: "If the connection's auth context holds a valid token, this\
            \ attribute holds the time at which the user must re-authenticate because,\
            \ for instance, the refresh token reached the end of its absolute lifetime."
          type: string
          allOf:
          - $ref: "#/components/schemas/Instant"
        user:
          description: "Information about the authenticated principal, if known."
          type: object
          allOf:
          - $ref: "#/components/schemas/UserInfo"
        errors:
          description: Errors related to the connection to CCloud.
          type: object
          allOf:
          - $ref: "#/components/schemas/AuthErrors"

and the KafkaClusterStatus and SchemaRegistryStatus schemas are defined similarly:

    KafkaClusterStatus:
      description: The status related to the specified Kafka cluster.
      required:
      - state
      type: object
      properties:
        state:
          description: The state of the connection to the Kafka cluster.
          type: string
          allOf:
          - $ref: "#/components/schemas/ConnectedState"
        user:
          description: "Information about the authenticated principal, if known."
          type: object
          allOf:
          - $ref: "#/components/schemas/UserInfo"
        errors:
          description: Errors related to the connection to the Kafka cluster.
          type: object
          allOf:
          - $ref: "#/components/schemas/AuthErrors"
...
    SchemaRegistryStatus:
      description: The status related to the specified Schema Registry.
      required:
      - state
      type: object
      properties:
        state:
          description: The state of the connection to the Schema Registry.
          type: string
          allOf:
          - $ref: "#/components/schemas/ConnectedState"
        user:
          description: "Information about the authenticated principal, if known."
          type: object
          allOf:
          - $ref: "#/components/schemas/UserInfo"
        errors:
          description: Errors related to the connection to the Schema Registry.
          type: object
          allOf:
          - $ref: "#/components/schemas/AuthErrors"

The major differences between these schemas and Authentication are that the new ones use state rather than status (to avoid having a status.authentication.status object) and use more generic enum literals for ConnectedState:

    ConnectedState:
      enum:
      - NONE
      - ATTEMPTING
      - SUCCESS
      - EXPIRED
      - FAILED
      type: string

where:

  • NONE means no connection has been established yet.
  • ATTEMPTING is a transient state while the sidecar is trying to validate the corresponding configuration.
  • SUCCESS means the connection to the corresponding system was successfully established.
  • EXPIRED means the connection was previously established successfully but its credentials have since expired.
  • FAILED means the connection to the corresponding system could not be established.

The extension needs to transition from using status.authentication to status.ccloud. In the short term, both will be kept (the former will be generated from the latter) so the extension can migrate in the near future, as it adds support for status.kafka_cluster and status.schema_registry.

Once that migration has been completed and the extension is no longer using status.authentication, the status.authentication object will be removed.
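
During the transition, generating the legacy status.authentication.status value from the new state could be as simple as the mapping sketched below. The two enums mirror the schemas in this comment, but the `LegacyStatusMapper` class and the specific mapping are assumptions; in particular, treating ATTEMPTING as NO_TOKEN and EXPIRED as INVALID_TOKEN is one plausible choice, not a confirmed one.

```java
// Legacy literals from the Status schema.
enum Status { NO_TOKEN, VALID_TOKEN, INVALID_TOKEN, FAILED }

// New, more generic literals from the ConnectedState schema.
enum ConnectedState { NONE, ATTEMPTING, SUCCESS, EXPIRED, FAILED }

// Hypothetical helper that derives the legacy value from the new one.
final class LegacyStatusMapper {
  private LegacyStatusMapper() {}

  static Status toLegacyStatus(ConnectedState state) {
    return switch (state) {
      // Assumption: nothing usable exists yet in either of these states.
      case NONE, ATTEMPTING -> Status.NO_TOKEN;
      case SUCCESS -> Status.VALID_TOKEN;
      // Assumption: expired credentials are reported as an invalid token.
      case EXPIRED -> Status.INVALID_TOKEN;
      case FAILED -> Status.FAILED;
    };
  }
}
```

Because the switch expression is exhaustive over the enum, adding a new ConnectedState literal later forces a compile-time decision about its legacy equivalent.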
