From be43098a824674754215c86ccaeae9ad7599c451 Mon Sep 17 00:00:00 2001 From: hpal Date: Fri, 17 Jan 2025 14:34:05 +0000 Subject: [PATCH 1/2] add consumer groups --- .../kafka/.port/resources/blueprints.json | 97 +++++++++++++++++++ .../.port/resources/port-app-config.yaml | 18 ++++ integrations/kafka/.port/spec.yaml | 1 + .../kafka/kafka_integration/client.py | 48 +++++++++ integrations/kafka/main.py | 7 ++ 5 files changed, 171 insertions(+) diff --git a/integrations/kafka/.port/resources/blueprints.json b/integrations/kafka/.port/resources/blueprints.json index a911c50f12..1b2a12ec5a 100644 --- a/integrations/kafka/.port/resources/blueprints.json +++ b/integrations/kafka/.port/resources/blueprints.json @@ -92,5 +92,102 @@ "many": true } } + }, + { + "identifier": "kafkaConsumerGroup", + "title": "Consumer Group", + "icon": "Kafka", + "schema": { + "properties": { + "state": { + "title": "State", + "type": "string", + "description": "The current state of the consumer group." + }, + "members": { + "title": "Members", + "type": "array", + "description": "List of members in the consumer group.", + "items": { + "type": "object", + "properties": { + "id": { + "title": "Member ID", + "type": "string", + "description": "Unique identifier for the consumer member." + }, + "client_id": { + "title": "Client ID", + "type": "string", + "description": "Client ID of the consumer member." + }, + "host": { + "title": "Host", + "type": "string", + "description": "Host address of the consumer member." + }, + "assignment": { + "title": "Assignment", + "type": "object", + "description": "Details of the topic partitions assigned to the member.", + "properties": { + "topic_partitions": { + "title": "Topic Partitions", + "type": "array", + "description": "List of topic-partition pairs assigned to the member.", + "items": { + "type": "object", + "properties": { + "topic": { + "title": "Topic", + "type": "string", + "description": "Name of the topic." + }, + "partition": { + "title": "Partition", + "type": "number", + "description": "Partition number within the topic." + } + } + } + } + } + } + } + } + }, + "coordinator": { + "title": "Coordinator", + "type": "number", + "description": "Broker ID of the coordinator for the consumer group." + }, + "partition_assignor": { + "title": "Partition Assignor", + "type": "string", + "description": "Strategy used to assign partitions to consumers." + }, + "is_simple_consumer_group": { + "title": "Is Simple Consumer Group", + "type": "boolean", + "description": "Indicates if the group is a simple consumer group." + }, + "authorized_operations": { + "title": "Authorized Operations", + "type": "array", + "description": "List of operations authorized for the consumer group.", + "items": { + "type": "string" + } + } + } + }, + "calculationProperties": {}, + "relations": { + "cluster": { + "target": "kafkaCluster", + "required": true, + "many": false + } + } } ] diff --git a/integrations/kafka/.port/resources/port-app-config.yaml b/integrations/kafka/.port/resources/port-app-config.yaml index 1e81e0eeeb..80b7ef7354 100644 --- a/integrations/kafka/.port/resources/port-app-config.yaml +++ b/integrations/kafka/.port/resources/port-app-config.yaml @@ -46,3 +46,21 @@ resources: relations: cluster: .cluster_name brokers: '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique' + - kind: consumer_group + selector: + query: 'true' + port: + entity: + mappings: + identifier: .cluster_name + "_" + .group_id + title: .group_id + blueprint: '"kafkaConsumerGroup"' + properties: + state: .state + members: .members + coordinator: .coordinator.id + partition_assignor: .partition_assignor + is_simple_consumer_group: .is_simple_consumer_group + authorized_operations: .authorized_operations + relations: + cluster: .cluster_name diff --git a/integrations/kafka/.port/spec.yaml b/integrations/kafka/.port/spec.yaml index b4b867fc60..a11d678b45 100644 --- a/integrations/kafka/.port/spec.yaml +++ b/integrations/kafka/.port/spec.yaml @@ -9,6 +9,7 @@ features: - kind: cluster - kind: broker - kind: topic + - kind: consumer_group saas: enabled: false configurations: diff --git a/integrations/kafka/kafka_integration/client.py b/integrations/kafka/kafka_integration/client.py index 24d33d5206..8953002b91 100644 --- a/integrations/kafka/kafka_integration/client.py +++ b/integrations/kafka/kafka_integration/client.py @@ -86,3 +86,51 @@ def describe_topics(self) -> list[dict[str, Any]]: logger.error(f"Failed to describe topic {topic_name}: {e}") raise e return result_topics + + def describe_consumer_groups(self) -> list[dict[str, Any]]: + """Describe all consumer groups in the cluster.""" + result_groups = [] + + # List all consumer groups and wait for the future to complete + groups_metadata = self.kafka_admin_client.list_consumer_groups() + groups_result = groups_metadata.result() + group_ids = [group.group_id for group in groups_result.valid] + + logger.info(f"Found {len(group_ids)} consumer groups") + if not group_ids: + return result_groups + + # Describe the consumer groups + groups_description = self.kafka_admin_client.describe_consumer_groups(group_ids) + + for group_id, future in groups_description.items(): + try: + group_info = future.result() + members = [{ + 'id': member.member_id, + 'client_id': member.client_id, + 'host': member.host, + 'assignment': { + 'topic_partitions': [ + {'topic': tp.topic, 'partition': tp.partition} + for tp in member.assignment.topic_partitions + ] + } + } for member in group_info.members] + + result_groups.append({ + 'group_id': group_id, + 'state': group_info.state.name, + 'members': members, + 'cluster_name': self.cluster_name, + 'coordinator': group_info.coordinator.id, + 'partition_assignor': group_info.partition_assignor, + 'is_simple_consumer_group': group_info.is_simple_consumer_group, + 'authorized_operations': group_info.authorized_operations + }) + except Exception as e: + logger.error(f"Failed to describe consumer group {group_id}: {e}") + raise e + + return result_groups + diff --git a/integrations/kafka/main.py b/integrations/kafka/main.py index ff82f8d021..4d6fafb2cd 100644 --- a/integrations/kafka/main.py +++ b/integrations/kafka/main.py @@ -33,3 +33,10 @@ async def resync_topics(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: kafka_clients = init_clients() for kafka_client in kafka_clients: yield kafka_client.describe_topics() + + +@ocean.on_resync("consumer_group") +async def resync_consumer_groups(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: + kafka_clients = init_clients() + for kafka_client in kafka_clients: + yield kafka_client.describe_consumer_groups() From 7f4e1a27ea93930aa002d0f263e13b1ec4da6176 Mon Sep 17 00:00:00 2001 From: hpal Date: Fri, 17 Jan 2025 14:46:12 +0000 Subject: [PATCH 2/2] add examples and --- .../examples/consumer_groups.entity.json | 36 ++++++++++++ .../examples/consumer_groups.response.json | 24 ++++++++ .../kafka/kafka_integration/client.py | 56 ++++++++++--------- 3 files changed, 90 insertions(+), 26 deletions(-) create mode 100644 integrations/kafka/examples/consumer_groups.entity.json create mode 100644 integrations/kafka/examples/consumer_groups.response.json diff --git a/integrations/kafka/examples/consumer_groups.entity.json b/integrations/kafka/examples/consumer_groups.entity.json new file mode 100644 index 0000000000..9b8c4c905a --- /dev/null +++ b/integrations/kafka/examples/consumer_groups.entity.json @@ -0,0 +1,36 @@ +{ + "blueprint": "kafkaConsumerGroup", + "identifier": "local-cluster_conduktor_gateway", + "createdAt": "2025-01-17T14:23:38.182Z", + "updatedBy": "", + "createdBy": "", + "icon": null, + "team": [], + "title": "conduktor_gateway", + "relations": { + "cluster": "local-cluster" + }, + "properties": { + "partition_assignor": "range", + "coordinator": null, + "authorized_operations": null, + "is_simple_consumer_group": false, + "members": [ + { + "assignment": { + "topic_partitions": [ + { + "partition": 0, + "topic": "_conduktor_gateway_license" + } + ] + }, + "host": "172.23.0.6", + "id": "conduktor-gateway_6969-707254dd-d7d6-4e34-9ac9-d868d5fd5f56", + "client_id": "conduktor-gateway_6969" + } + ], + "state": "STABLE" + }, + "updatedAt": "2025-01-17T14:23:38.182Z" +} \ No newline at end of file diff --git a/integrations/kafka/examples/consumer_groups.response.json b/integrations/kafka/examples/consumer_groups.response.json new file mode 100644 index 0000000000..61ff96ca62 --- /dev/null +++ b/integrations/kafka/examples/consumer_groups.response.json @@ -0,0 +1,24 @@ +{ + "group_id": "_confluent-ksql-default_query_CTAS_PURCHASE_PER_PRODUCT_0", + "state": "EMPTY", + "members": [ + { + "assignment": { + "topic_partitions": [ + { + "partition": 0, + "topic": "_conduktor_gateway_license" + } + ] + }, + "host": "172.23.0.6", + "id": "conduktor-gateway_6969-707254dd-d7d6-4e34-9ac9-d868d5fd5f56", + "client_id": "conduktor-gateway_6969" + } + ], + "cluster_name": "local-cluster", + "coordinator": 0, + "partition_assignor": "range", + "is_simple_consumer_group": false, + "authorized_operations": null +} \ No newline at end of file diff --git a/integrations/kafka/kafka_integration/client.py b/integrations/kafka/kafka_integration/client.py index 8953002b91..6429b04bca 100644 --- a/integrations/kafka/kafka_integration/client.py +++ b/integrations/kafka/kafka_integration/client.py @@ -89,8 +89,8 @@ def describe_topics(self) -> list[dict[str, Any]]: def describe_consumer_groups(self) -> list[dict[str, Any]]: """Describe all consumer groups in the cluster.""" - result_groups = [] - + result_groups: list[dict[str, Any]] = [] + # List all consumer groups and wait for the future to complete groups_metadata = self.kafka_admin_client.list_consumer_groups() groups_result = groups_metadata.result() @@ -102,35 +102,39 @@ def describe_consumer_groups(self) -> list[dict[str, Any]]: # Describe the consumer groups groups_description = self.kafka_admin_client.describe_consumer_groups(group_ids) - + for group_id, future in groups_description.items(): try: group_info = future.result() - members = [{ - 'id': member.member_id, - 'client_id': member.client_id, - 'host': member.host, - 'assignment': { - 'topic_partitions': [ - {'topic': tp.topic, 'partition': tp.partition} - for tp in member.assignment.topic_partitions - ] + members = [ + { + "id": member.member_id, + "client_id": member.client_id, + "host": member.host, + "assignment": { + "topic_partitions": [ + {"topic": tp.topic, "partition": tp.partition} + for tp in member.assignment.topic_partitions + ] + }, } - } for member in group_info.members] - - result_groups.append({ - 'group_id': group_id, - 'state': group_info.state.name, - 'members': members, - 'cluster_name': self.cluster_name, - 'coordinator': group_info.coordinator.id, - 'partition_assignor': group_info.partition_assignor, - 'is_simple_consumer_group': group_info.is_simple_consumer_group, - 'authorized_operations': group_info.authorized_operations - }) + for member in group_info.members + ] + + result_groups.append( + { + "group_id": group_id, + "state": group_info.state.name, + "members": members, + "cluster_name": self.cluster_name, + "coordinator": group_info.coordinator.id, + "partition_assignor": group_info.partition_assignor, + "is_simple_consumer_group": group_info.is_simple_consumer_group, + "authorized_operations": group_info.authorized_operations, + } + ) except Exception as e: logger.error(f"Failed to describe consumer group {group_id}: {e}") raise e - - return result_groups + return result_groups