diff --git a/integrations/_infra/Makefile b/integrations/_infra/Makefile
index 069dbbc5dd..4329f78d24 100644
--- a/integrations/_infra/Makefile
+++ b/integrations/_infra/Makefile
@@ -41,7 +41,7 @@ define deactivate_virtualenv
 	fi
 endef
 
-.SILENT: install install/prod install/local-core lint lint/fix run test clean
+.SILENT: install install/prod install/local-core lint lint/fix run test clean seed
 
 install:
 	$(call deactivate_virtualenv) && \
@@ -85,3 +85,11 @@ clean:
 	rm -rf .tox/
 	rm -rf docs/_build
 	rm -rf dist/
+
+seed:
+	@if [ -f "tests/seed_data.py" ]; then \
+		$(ACTIVATE) && python tests/seed_data.py; \
+	else \
+		echo "No seeding script found. Create tests/seed_data.py for this integration if needed."; \
+		exit 0; \
+	fi
diff --git a/integrations/kafka/.port/resources/blueprints.json b/integrations/kafka/.port/resources/blueprints.json
index 1e9c0259ea..334859aa06 100644
--- a/integrations/kafka/.port/resources/blueprints.json
+++ b/integrations/kafka/.port/resources/blueprints.json
@@ -85,15 +85,68 @@
     },
     "relations": {
       "cluster": {
         "target": "kafkaCluster",
         "required": true,
         "many": false
       },
       "brokers": {
         "target": "kafkaBroker",
         "required": false,
         "many": true
       }
     }
-  }
+  },
+  {
+    "identifier": "kafkaConsumerGroup",
+    "title": "Consumer Group",
+    "icon": "Kafka",
+    "schema": {
+      "properties": {
+        "state": {
+          "title": "State",
+          "type": "string",
+          "description": "The current state of the consumer group."
+        },
+        "members": {
+          "title": "Members",
+          "type": "array",
+          "description": "List of members in the consumer group.",
+          "items": {
+            "type": "string"
+          }
+        },
+        "coordinator": {
+          "title": "Coordinator",
+          "type": "number",
+          "description": "Broker ID of the coordinator for the consumer group."
+        },
+        "partition_assignor": {
+          "title": "Partition Assignor",
+          "type": "string",
+          "description": "Strategy used to assign partitions to consumers."
+        },
+        "is_simple_consumer_group": {
+          "title": "Is Simple Consumer Group",
+          "type": "boolean",
+          "description": "Indicates if the group is a simple consumer group."
+        },
+        "authorized_operations": {
+          "title": "Authorized Operations",
+          "type": "array",
+          "description": "List of operations authorized for the consumer group.",
+          "items": {
+            "type": "string"
+          }
+        }
+      }
+    },
+    "calculationProperties": {},
+    "relations": {
+      "cluster": {
+        "target": "kafkaCluster",
+        "required": true,
+        "many": false
+      }
+    }
+  }
 ]
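Reviewer note (not part of the diff): since blueprints.json is hand-edited above, it is worth confirming the file still parses and that the new blueprint declares `members` as an array of strings, because the mapping added below depends on that shape. A minimal sketch, assuming it runs from the repository root:

```python
import json

# Parse the edited blueprint file; json.load fails loudly on any syntax slip.
with open("integrations/kafka/.port/resources/blueprints.json") as f:
    blueprints = json.load(f)

group_bp = next(bp for bp in blueprints if bp["identifier"] == "kafkaConsumerGroup")

# The '[.members[].client_id]' mapping in port-app-config.yaml yields a list
# of strings, so the blueprint must declare members as array-of-string.
assert group_bp["schema"]["properties"]["members"]["items"]["type"] == "string"
print("blueprints.json OK")
```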
diff --git a/integrations/kafka/.port/resources/port-app-config.yaml b/integrations/kafka/.port/resources/port-app-config.yaml
index 1e81e0eeeb..053c493f7e 100644
--- a/integrations/kafka/.port/resources/port-app-config.yaml
+++ b/integrations/kafka/.port/resources/port-app-config.yaml
@@ -46,3 +46,21 @@ resources:
           relations:
             cluster: .cluster_name
             brokers: '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique'
+  - kind: consumer_group
+    selector:
+      query: 'true'
+    port:
+      entity:
+        mappings:
+          identifier: .cluster_name + "_" + .group_id
+          title: .group_id
+          blueprint: '"kafkaConsumerGroup"'
+          properties:
+            state: .state
+            members: '[.members[].client_id]'
+            coordinator: .coordinator
+            partition_assignor: .partition_assignor
+            is_simple_consumer_group: .is_simple_consumer_group
+            authorized_operations: .authorized_operations
+          relations:
+            cluster: .cluster_name
diff --git a/integrations/kafka/.port/spec.yaml b/integrations/kafka/.port/spec.yaml
index b4b867fc60..a11d678b45 100644
--- a/integrations/kafka/.port/spec.yaml
+++ b/integrations/kafka/.port/spec.yaml
@@ -9,6 +9,7 @@ features:
       - kind: cluster
       - kind: broker
       - kind: topic
+      - kind: consumer_group
 saas:
   enabled: false
 configurations:
diff --git a/integrations/kafka/CHANGELOG.md b/integrations/kafka/CHANGELOG.md
index f2c6ba4e19..362000dce4 100644
--- a/integrations/kafka/CHANGELOG.md
+++ b/integrations/kafka/CHANGELOG.md
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 
+## 0.1.114 (2025-01-22)
+
+
+### Improvements
+
+- Added support for ingesting consumer groups
+
+
 ## 0.1.113 (2025-01-22)
 
 
diff --git a/integrations/kafka/examples/consumer_groups.entity.json b/integrations/kafka/examples/consumer_groups.entity.json
new file mode 100644
index 0000000000..9b8c4c905a
--- /dev/null
+++ b/integrations/kafka/examples/consumer_groups.entity.json
@@ -0,0 +1,24 @@
+{
+  "blueprint": "kafkaConsumerGroup",
+  "identifier": "local-cluster_conduktor_gateway",
+  "createdAt": "2025-01-17T14:23:38.182Z",
+  "updatedBy": "",
+  "createdBy": "",
+  "icon": null,
+  "team": [],
+  "title": "conduktor_gateway",
+  "relations": {
+    "cluster": "local-cluster"
+  },
+  "properties": {
+    "partition_assignor": "range",
+    "coordinator": 0,
+    "authorized_operations": null,
+    "is_simple_consumer_group": false,
+    "members": [
+      "conduktor-gateway_6969"
+    ],
+    "state": "STABLE"
+  },
+  "updatedAt": "2025-01-17T14:23:38.182Z"
+}
\ No newline at end of file
diff --git a/integrations/kafka/examples/consumer_groups.response.json b/integrations/kafka/examples/consumer_groups.response.json
new file mode 100644
index 0000000000..61ff96ca62
--- /dev/null
+++ b/integrations/kafka/examples/consumer_groups.response.json
@@ -0,0 +1,24 @@
+{
+  "group_id": "_confluent-ksql-default_query_CTAS_PURCHASE_PER_PRODUCT_0",
+  "state": "EMPTY",
+  "members": [
+    {
+      "assignment": {
+        "topic_partitions": [
+          {
+            "partition": 0,
+            "topic": "_conduktor_gateway_license"
+          }
+        ]
+      },
+      "host": "172.23.0.6",
+      "id": "conduktor-gateway_6969-707254dd-d7d6-4e34-9ac9-d868d5fd5f56",
+      "client_id": "conduktor-gateway_6969"
+    }
+  ],
+  "cluster_name": "local-cluster",
+  "coordinator": 0,
+  "partition_assignor": "range",
+  "is_simple_consumer_group": false,
+  "authorized_operations": null
+}
\ No newline at end of file
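Reviewer note (not part of the diff): the `coordinator` mapping above is `.coordinator` rather than `.coordinator.id`, because `client.py` below already flattens the coordinator node to its broker id, as `consumer_groups.response.json` shows. A rough pure-Python mirror of the jq mappings, useful for spot-checking the response example; the jq in port-app-config.yaml remains the source of truth:

```python
import json

# Load the raw, API-shaped example shipped with this change.
with open("integrations/kafka/examples/consumer_groups.response.json") as f:
    group = json.load(f)

entity = {
    "identifier": f"{group['cluster_name']}_{group['group_id']}",  # .cluster_name + "_" + .group_id
    "title": group["group_id"],
    "properties": {
        "state": group["state"],
        "members": [m["client_id"] for m in group["members"]],  # [.members[].client_id]
        "coordinator": group["coordinator"],  # already a plain broker id
        "partition_assignor": group["partition_assignor"],
        "is_simple_consumer_group": group["is_simple_consumer_group"],
        "authorized_operations": group["authorized_operations"],
    },
    "relations": {"cluster": group["cluster_name"]},
}
print(json.dumps(entity, indent=2))
```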
"_conduktor_gateway_license" + } + ] + }, + "host": "172.23.0.6", + "id": "conduktor-gateway_6969-707254dd-d7d6-4e34-9ac9-d868d5fd5f56", + "client_id": "conduktor-gateway_6969" + } + ], + "cluster_name": "local-cluster", + "coordinator": 0, + "partition_assignor": "range", + "is_simple_consumer_group": false, + "authorized_operations": null +} \ No newline at end of file diff --git a/integrations/kafka/kafka_integration/client.py b/integrations/kafka/kafka_integration/client.py index 24d33d5206..3158fcaf2f 100644 --- a/integrations/kafka/kafka_integration/client.py +++ b/integrations/kafka/kafka_integration/client.py @@ -1,4 +1,5 @@ from typing import Any +from anyio import to_thread import confluent_kafka # type: ignore @@ -86,3 +87,59 @@ def describe_topics(self) -> list[dict[str, Any]]: logger.error(f"Failed to describe topic {topic_name}: {e}") raise e return result_topics + + async def describe_consumer_groups(self) -> list[dict[str, Any]]: + """Describe all consumer groups in the cluster.""" + result_groups: list[dict[str, Any]] = [] + + # List all consumer groups and wait for the future to complete + groups_metadata = await to_thread.run_sync( + self.kafka_admin_client.list_consumer_groups + ) + groups_result = await to_thread.run_sync(groups_metadata.result) + group_ids = [group.group_id for group in groups_result.valid] + + logger.info(f"Found {len(group_ids)} consumer groups") + if not group_ids: + return result_groups + + # Describe the consumer groups + groups_description = await to_thread.run_sync( + self.kafka_admin_client.describe_consumer_groups, group_ids + ) + + for group_id, future in groups_description.items(): + try: + group_info = await to_thread.run_sync(future.result) + members = [ + { + "id": member.member_id, + "client_id": member.client_id, + "host": member.host, + "assignment": { + "topic_partitions": [ + {"topic": tp.topic, "partition": tp.partition} + for tp in member.assignment.topic_partitions + ] + }, + } + for member in group_info.members + ] + + result_groups.append( + { + "group_id": group_id, + "state": group_info.state.name, + "members": members, + "cluster_name": self.cluster_name, + "coordinator": group_info.coordinator.id, + "partition_assignor": group_info.partition_assignor, + "is_simple_consumer_group": group_info.is_simple_consumer_group, + "authorized_operations": group_info.authorized_operations, + } + ) + except Exception as e: + logger.error(f"Failed to describe consumer group {group_id}: {e}") + raise e + + return result_groups diff --git a/integrations/kafka/main.py b/integrations/kafka/main.py index ff82f8d021..ae534cd59b 100644 --- a/integrations/kafka/main.py +++ b/integrations/kafka/main.py @@ -33,3 +33,10 @@ async def resync_topics(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: kafka_clients = init_clients() for kafka_client in kafka_clients: yield kafka_client.describe_topics() + + +@ocean.on_resync("consumer_group") +async def resync_consumer_groups(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: + kafka_clients = init_clients() + for kafka_client in kafka_clients: + yield await kafka_client.describe_consumer_groups() diff --git a/integrations/kafka/pyproject.toml b/integrations/kafka/pyproject.toml index 9cf7aabafc..f4387f6c3f 100644 --- a/integrations/kafka/pyproject.toml +++ b/integrations/kafka/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "kafka" -version = "0.1.113" +version = "0.1.114" description = "Integration to import information from a Kafka cluster into Port. 
diff --git a/integrations/kafka/tests/seed_data.py b/integrations/kafka/tests/seed_data.py
new file mode 100644
index 0000000000..d38da351cd
--- /dev/null
+++ b/integrations/kafka/tests/seed_data.py
@@ -0,0 +1,199 @@
+"""
+Kafka Load Testing Script
+
+This script creates a test environment with multiple topics, consumer groups, and active message production
+to help test the Kafka integration under load.
+
+Setup:
+    1. Ensure Kafka is running locally (see KAFKA_BOOTSTRAP_SERVERS below)
+    2. Install required packages:
+        pip install confluent-kafka
+
+Usage:
+    python integrations/kafka/tests/seed_data.py
+
+Configuration (modify constants at top of file):
+    - NUM_TOPICS: Number of topics to create (default: 50)
+    - PARTITIONS_PER_TOPIC: Partitions per topic (default: 3)
+    - NUM_CONSUMER_GROUPS: Number of consumer groups (default: 20)
+    - CONSUMERS_PER_GROUP: Number of consumers per group (default: 3)
+    - KAFKA_BOOTSTRAP_SERVERS: Comma-separated Kafka bootstrap servers
+
+The script will:
+    1. Create multiple topics with specified partitions
+    2. Start a producer continuously sending messages to random topics
+    3. Create multiple consumer groups with multiple consumers each
+    4. Process messages in each consumer group
+
+To stop:
+    Press CTRL+C for graceful shutdown
+
+Example:
+    # This will create:
+    # - 50 topics with 3 partitions each
+    # - 20 consumer groups with 3 consumers each (60 total consumers)
+    # - 1 producer sending messages to random topics
+    python integrations/kafka/tests/seed_data.py
+"""
+
+import random
+import signal
+import string
+import sys
+import threading
+import time
+from typing import Any, List, NoReturn
+
+from confluent_kafka import Consumer, KafkaError, Producer  # type: ignore
+from confluent_kafka.admin import AdminClient, NewTopic  # type: ignore
+
+# Configuration
+KAFKA_BOOTSTRAP_SERVERS = "localhost:19092,localhost:9092,localhost:9093,localhost:9094"
+NUM_TOPICS = 50
+PARTITIONS_PER_TOPIC = 3
+REPLICATION_FACTOR = 1
+NUM_CONSUMER_GROUPS = 20
+CONSUMERS_PER_GROUP = 3
+
+
+class KafkaLoadTest:
+    def __init__(self) -> None:
+        self.running = threading.Event()
+        self.producer_thread: threading.Thread | None = None
+        self.consumer_threads: List[threading.Thread] = []
+
+        # Set up signal handlers for graceful shutdown
+        signal.signal(signal.SIGINT, self.signal_handler)
+        signal.signal(signal.SIGTERM, self.signal_handler)
+
+    def signal_handler(self, signum: int, frame: Any) -> NoReturn:
+        """Handle shutdown signals"""
+        print("\nShutting down...")
+        self.running.clear()
+        if self.producer_thread:
+            self.producer_thread.join()
+        for thread in self.consumer_threads:
+            thread.join()
+        sys.exit(0)
+
+    def run(self) -> None:
+        """Main function to set up and run the Kafka load test"""
+        # Create topics
+        admin_config = {"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS}
+        admin_client = AdminClient(admin_config)
+
+        # Generate topic names
+        topic_names = [f"load-test-topic-{i}" for i in range(NUM_TOPICS)]
+        create_topics(admin_client, topic_names)
+
+        # Signal worker threads to run
+        self.running.set()
+
+        # Start producer thread
+        self.producer_thread = threading.Thread(
+            target=produce_messages, args=(topic_names, self.running)
+        )
+        self.producer_thread.start()
+
+        # Start consumer threads
+        for group_num in range(NUM_CONSUMER_GROUPS):
+            group_id = f"load-test-group-{group_num}"
+            for consumer_num in range(CONSUMERS_PER_GROUP):
+                thread = threading.Thread(
+                    target=start_consumer, args=(group_id, topic_names, self.running)
+                )
+                thread.start()
+                self.consumer_threads.append(thread)
+
+        # Keep main thread alive until a shutdown signal arrives
+        while True:
+            time.sleep(1)
+
+
+def create_topics(admin_client: AdminClient, topic_names: List[str]) -> None:
+    """Create multiple topics"""
+    new_topics = [
+        NewTopic(
+            topic,
+            num_partitions=PARTITIONS_PER_TOPIC,
+            replication_factor=REPLICATION_FACTOR,
+        )
+        for topic in topic_names
+    ]
+
+    futures = admin_client.create_topics(new_topics)
+    for topic, future in futures.items():
+        try:
+            future.result()
+            print(f"Topic {topic} created")
+        except Exception as e:
+            print(f"Failed to create topic {topic}: {e}")
+
+
+def generate_random_string(length: int = 8) -> str:
+    """Generate a random string of fixed length"""
+    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
+
+
+def delivery_report(err: Any, msg: Any) -> None:
+    """Callback for message delivery reports"""
+    if err is not None:
+        print(f"Message delivery failed: {err}")
+
+
+def produce_messages(topic_names: List[str], running: threading.Event) -> None:
+    """Continuously produce messages to topics"""
+    producer_config = {"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS}
+    producer = Producer(producer_config)
+
+    counter = 0
+    while running.is_set():
+        topic = random.choice(topic_names)
+        message = f"message-{counter}-{generate_random_string()}"
+        producer.produce(topic, message.encode("utf-8"), callback=delivery_report)
+        counter += 1
+
+        if counter % 1000 == 0:
+            print(f"Produced {counter} messages")
+
+        producer.poll(0)
+        time.sleep(0.001)  # Small delay to avoid overwhelming the brokers
+
+    producer.flush()
+
+
+def start_consumer(
+    group_id: str, topic_names: List[str], running: threading.Event
+) -> None:
+    """Start a consumer in a consumer group"""
+    consumer_config = {
+        "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
+        "group.id": group_id,
+        "auto.offset.reset": "earliest",
+    }
+
+    consumer = Consumer(consumer_config)
+    consumer.subscribe(topic_names)
+
+    try:
+        while running.is_set():
+            msg = consumer.poll(1.0)
+            if msg is None:
+                continue
+            if msg.error():
+                if msg.error().code() == KafkaError._PARTITION_EOF:
+                    continue
+                else:
+                    print(f"Consumer error: {msg.error()}")
+                    break
+
+            # Process the message (in this case, we just continue)
+            continue
+
+    finally:
+        consumer.close()
+
+
+if __name__ == "__main__":
+    load_test = KafkaLoadTest()
+    load_test.run()