From 6b06374d14dad7e38047053291fab23f563e2098 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Tue, 15 Oct 2024 11:43:57 +0300 Subject: [PATCH 1/5] Switch from deprecated python-kafka to python-kafka-ng and implement dynamic kafka producer --- .../airqo_etl_utils/message_broker_utils.py | 309 +++++++++++++++++- src/workflows/airqo_etl_utils/setup.py | 2 +- 2 files changed, 294 insertions(+), 17 deletions(-) diff --git a/src/workflows/airqo_etl_utils/message_broker_utils.py b/src/workflows/airqo_etl_utils/message_broker_utils.py index a7f60412bf..b8262839a1 100644 --- a/src/workflows/airqo_etl_utils/message_broker_utils.py +++ b/src/workflows/airqo_etl_utils/message_broker_utils.py @@ -1,41 +1,72 @@ import json +import logging +import sys import numpy as np import pandas as pd -from kafka import KafkaProducer +from kafka import KafkaProducer, KafkaConsumer from .config import configuration from airqo_etl_utils.airqo_api import AirQoApi from airqo_etl_utils.constants import Tenant from .date import date_to_str +from typing import Any, Generator, Optional, List + +logger = logging.getLogger(__name__) + class MessageBrokerUtils: + """ + A utility class for interacting with a Kafka message broker, including publishing data to + a topic and consuming messages from a topic. This class is designed for data ingestion + and retrieval using Kafka as a transport layer. + """ + + MAX_MESSAGE_SIZE = 1 * 1024 * 1024 + def __init__(self): + """ + Initialize the MessageBrokerUtils class with configuration settings. + """ self.__partitions = configuration.TOPIC_PARTITIONS self.__bootstrap_servers = configuration.BOOTSTRAP_SERVERS - self.__partitions = [0, 1, 2] - self.bam_measurements_topic = configuration.BAM_MEASUREMENTS_TOPIC + self.partition_loads = { + int(p): 0 for p in self.__partitions + } # Initialize partition loads def __get_partition(self, current_partition) -> int: + """Get next partition to load data into -- roundrobin""" current_partition = current_partition + 1 if current_partition in self.__partitions: return current_partition return self.__partitions[0] + def __get_least_loaded_partition(self) -> int: + """Select the least loaded partition.""" + return min(self.partition_loads, key=self.partition_loads.get) + @classmethod def __on_success(cls, record_metadata): - print("\nSuccessfully sent message") - print(f"Topic : {record_metadata.topic}") - print(f"Partition : {record_metadata.partition}") - print(f"Offset : {record_metadata.offset}") + logger.info( + f"Successfully sent message: Topic-{record_metadata.topic}, Partition-{record_metadata.partition}, Offset-{record_metadata.offset}" + ) @classmethod - def __on_error(cls, exception): - print("\nFailed to send message") - print(exception) + def __on_error(cls, record_metadata): + logger.exception( + f"Failed to send message: Topic-{record_metadata.topic}, Partition-{record_metadata.partition}, Offset-{record_metadata.offset}" + ) def __send_data(self, topic: str, data: pd.DataFrame, partition: int = None): + """ + Send data in chunks to a specified Kafka topic. + + Args: + topic: Kafka topic to send the data to. + data: Pandas DataFrame containing the data to send. + partition: Optional partition to which the message should be sent. 
+ """ data.to_csv("message_broker_data.csv", index=False) producer = KafkaProducer( bootstrap_servers=self.__bootstrap_servers, @@ -63,7 +94,7 @@ def __send_data(self, topic: str, data: pd.DataFrame, partition: int = None): else self.__get_partition(current_partition=current_partition) ) kafka_message = json.dumps(message, allow_nan=True).encode("utf-8") - print(kafka_message) + producer.send( topic=topic, value=kafka_message, @@ -73,15 +104,18 @@ def __send_data(self, topic: str, data: pd.DataFrame, partition: int = None): @staticmethod def update_hourly_data_topic(data: pd.DataFrame): """ - Add extra devices meta-data such as the device names, latitude and longitude e.t.c to the data. + Update the hourly data topic with additional device metadata, + including device names, latitude, longitude, and tenant information. + + Args: + data(pandas.DataFrame): The Pandas DataFrame to update with device metadata. """ devices = AirQoApi().get_devices(tenant=Tenant.ALL) devices = pd.DataFrame(devices) devices = devices[ [ - "mongo_id", "tenant", - "name", + "device_id", "device_number", "site_id", "latitude", @@ -90,8 +124,8 @@ def update_hourly_data_topic(data: pd.DataFrame): ] devices.rename( columns={ - "name": "device_name", - "mongo_id": "device_id", + "device_id": "device_name", + "_id": "device_id", "latitude": "device_latitude", "longitude": "device_longitude", }, @@ -126,3 +160,246 @@ def update_hourly_data_topic(data: pd.DataFrame): MessageBrokerUtils().__send_data( topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data ) + + @staticmethod + def update_hourly_data_topic_(data: pd.DataFrame) -> None: + """ + Update the hourly data topic with additional device metadata, + including device names, latitude, longitude, and tenant information. + + Args: + data (pandas.DataFrame): The Pandas DataFrame to update with device metadata. + """ + # Fetch and prepare device data + devices = AirQoApi().get_devices(tenant=Tenant.ALL) + devices = pd.DataFrame(devices) + devices = devices[ + [ + "tenant", + "device_id", + "device_number", + "site_id", + "latitude", + "longitude", + ] + ] + + devices.rename( + columns={ + "device_id": "device_name", + "_id": "device_id", + "latitude": "device_latitude", + "longitude": "device_longitude", + }, + inplace=True, + ) + + data.rename( + columns={ + "device_id": "device_name", + }, + inplace=True, + ) + + data.drop(columns=["device_number"], inplace=True, errors="ignore") + + data = pd.concat( + [ + data.set_index(["device_name", "site_id", "tenant"]), + devices.set_index(["device_name", "site_id", "tenant"]), + ], + axis=1, + ).reset_index() + + data.rename(columns={"tenant": "network"}, inplace=True) + data["tenant"] = str(Tenant.AIRQO) + data["timestamp"] = pd.to_datetime(data["timestamp"]) + data["timestamp"] = data["timestamp"].apply(date_to_str) + + MessageBrokerUtils().__send_data( + topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data + ) + + def _generate_chunks( + self, dataframe_list: List[dict] + ) -> Generator[List[dict], None, None]: + """ + Generator that yields chunks of data that fit within the MAX_MESSAGE_SIZE. + + Args: + dataframe_list: List of dictionaries representing data records. + + yield: + Chunked data fitting within 1MB size. 
+ """ + chunk = [] + size = 0 + for row in dataframe_list: + row_size = sys.getsizeof(json.dumps(row)) + if size + row_size > self.MAX_MESSAGE_SIZE: + yield chunk + chunk = [] + size = 0 + chunk.append(row) + size += row_size + + if chunk: # yield the last chunk + yield chunk + + def publish_to_topic( + self, + topic: str, + data: pd.DataFrame, + partition: int = None, + column_key: str = None, + ): + """ + Publish data to a specified Kafka topic. If `column_key` is provided, each row's + key will be extracted from the specified column, and the rest of the row data will + be the message. Otherwise, the data will be split into chunks and published + across available partitions. + + Args: + topic: The Kafka topic to publish data to. + data: The data (as a Pandas DataFrame) to be published. + partition: Optionally specify the partition to publish to. If None, partitions are handled based on load. + column_key: Optional column name to be used as the message key. If None, data is chunked and sent without keys. + """ + producer = KafkaProducer( + bootstrap_servers="127.0.0.1:9094", + api_version=(3, 6, 0), + request_timeout_ms=300000, + key_serializer=lambda v: json.dumps(v).encode("utf-8"), + value_serializer=lambda v: json.dumps(v, allow_nan=True).encode("utf-8"), + ) + + logger.info(f"Preparing to publish data to topic: {topic}") + data.replace(np.nan, None, inplace=True) + dataframe_list = data.to_dict("records") + + if column_key: + logger.info(f"Using '{column_key}' as the key for messages") + for row in dataframe_list: + key = row.pop(column_key, None) + if key is None: + logger.warning( + f"No key found for column '{column_key}' in row: {row}" + ) + continue + message = json.dumps(row, allow_nan=True) + message = {key: json.dumps(row, allow_nan=True)} + + partition = self.__get_least_loaded_partition() + try: + producer.send( + topic=topic, key=key, value=message, partition=partition + ).add_callback(self.__on_success).add_errback(self.__on_error) + self.partition_loads[partition] += 1 + except Exception as e: + logger.exception( + f"Error while sending message to topic {topic} with key {key}: {e}" + ) + else: + logger.info("No key provided, splitting data into chunks and publishing") + for chunk_data in self._generate_chunks(dataframe_list): + message = {"data": chunk_data} + + partition = self.__get_least_loaded_partition() + try: + producer.send( + topic=topic, value=message, partition=partition + ).add_callback(self.__on_success).add_errback(self.__on_error) + self.partition_loads[partition] += 1 + except Exception as e: + logger.exception( + f"Error while sending message to topic {topic}: {e}" + ) + producer.flush() + + def consume_from_topic( + self, + topic: str, + group_id: str, + auto_offset_reset: str = "latest", + max_messages: Optional[int] = None, + from_beginning: bool = False, + offset: Optional[int] = None, + key_deserializer: Optional[bool] = False, + ) -> Any: + """ + Consume messages from a Kafka topic and return them. + + Args: + topic: The Kafka topic to consume from. + group_id: The consumer group ID. + auto_offset_reset: Determines where to start reading when there's no valid offset. Default is 'latest'. + max_messages: Limit on the number of messages to consume. If None, consume all available messages. + from_beginning: Whether to start consuming from the beginning of the topic. + offset: Start consuming from a specific offset if provided. + key_deserializer: Whether to deserialize the key (e.g., for device_id). Default is False. 
+ + Return: + A generator that yields consumed messages from the topic. If there are no more messages, a 'No more messages' flag is returned. + """ + consumer = KafkaConsumer( + topic, + bootstrap_servers="127.0.0.1:9094", + auto_offset_reset=auto_offset_reset, + enable_auto_commit=True, + group_id=group_id, + value_deserializer=lambda x: json.loads(x.decode("utf-8")), + key_deserializer=( + lambda x: x.decode("utf-8") if key_deserializer else None + ), + ) + + logger.info(f"Starting to consume messages from topic: {topic}") + + # Wait for partition assignment + # import time + # max_attempts = 10 + # attempts = 0 + # while not consumer.assignment() and attempts < max_attempts: + # logger.info(f"Waiting for partition assignment... attempt {attempts + 1}") + # consumer.poll(timeout_ms=100) # This triggers partition assignment + # time.sleep(1) # Add a short sleep to avoid busy waiting + # attempts += 1 + + # if not consumer.assignment(): + # logger.error("Failed to assign partitions after several attempts.") + # else: + # logger.info(f"Partitions assigned: {consumer.assignment()}") + + # if from_beginning: + # consumer.seek_to_beginning() + + if offset is not None: + for partition in consumer.assignment(): + consumer.seek(partition, offset) + + message_count = 0 + + try: + for message in consumer: + key = message.key + value = message.value + + logger.info( + f"Consumed message from topic {message.topic}, " + f"partition {message.partition}, offset {message.offset}, key: {key}" + ) + + yield {"key": key, "value": value} + + message_count += 1 + + # If a max message limit is set, stop when it's reached + if max_messages and message_count >= max_messages: + logger.info(f"Reached max message limit: {max_messages}") + break + except Exception as e: + logger.error(f"Error while consuming messages from topic {topic}: {e}") + finally: + consumer.close() + logger.info(f"No more messages to consume from topic: {topic}") + yield {"no_more_messages": True} diff --git a/src/workflows/airqo_etl_utils/setup.py b/src/workflows/airqo_etl_utils/setup.py index 71e5567775..bf058f1c92 100644 --- a/src/workflows/airqo_etl_utils/setup.py +++ b/src/workflows/airqo_etl_utils/setup.py @@ -17,7 +17,6 @@ "pandas", "requests", "simplejson", - "kafka-python", "numpy", "pyarrow", "python-dotenv", @@ -34,6 +33,7 @@ "great_expectations===0.18.18", "airflow-provider-great-expectations==0.2.8", "sqlalchemy-bigquery==1.11.0", + "kafka-python-ng", ], keywords=["python", "airflow", "AirQo"], license="MIT", From 8816fa6543951322798d935b983a7d1a3fcdef1d Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 18 Oct 2024 00:01:31 +0300 Subject: [PATCH 2/5] Add re-usable kafka consumer and update producer to both use the confluent-kafka package --- .../airqo_etl_utils/message_broker_utils.py | 239 +++++++++++------- 1 file changed, 146 insertions(+), 93 deletions(-) diff --git a/src/workflows/airqo_etl_utils/message_broker_utils.py b/src/workflows/airqo_etl_utils/message_broker_utils.py index b8262839a1..b2b2bc95cd 100644 --- a/src/workflows/airqo_etl_utils/message_broker_utils.py +++ b/src/workflows/airqo_etl_utils/message_broker_utils.py @@ -4,7 +4,8 @@ import numpy as np import pandas as pd -from kafka import KafkaProducer, KafkaConsumer +from confluent_kafka import Producer, Consumer, TopicPartition +import time from .config import configuration from airqo_etl_utils.airqo_api import AirQoApi @@ -34,6 +35,9 @@ def __init__(self): self.partition_loads = { int(p): 0 for p in self.__partitions } # Initialize partition loads + 
self.conf = { + "bootstrap.servers": "localhost:9094", + } def __get_partition(self, current_partition) -> int: """Get next partition to load data into -- roundrobin""" @@ -68,7 +72,7 @@ def __send_data(self, topic: str, data: pd.DataFrame, partition: int = None): partition: Optional partition to which the message should be sent. """ data.to_csv("message_broker_data.csv", index=False) - producer = KafkaProducer( + producer = Producer( bootstrap_servers=self.__bootstrap_servers, api_version_auto_timeout_ms=300000, retries=5, @@ -220,6 +224,24 @@ def update_hourly_data_topic_(data: pd.DataFrame) -> None: topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data ) + def __on_delivery(self, err, msg): + """ + Delivery callback for Kafka message send operations. + + Args: + err: Error information if the message failed delivery, otherwise None. + msg: The message that was sent. + + Logs: + Success or failure of the message delivery. + """ + if err is not None: + logger.error(f"Message delivery failed: {err}") + else: + logger.info( + f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}" + ) + def _generate_chunks( self, dataframe_list: List[dict] ) -> Generator[List[dict], None, None]: @@ -246,32 +268,59 @@ def _generate_chunks( if chunk: # yield the last chunk yield chunk + def _send_message( + self, producer: Producer, topic: str, key: Any, message: str, partition=None + ) -> None: + """ + Sends a message to the specified Kafka topic using the provided producer. + + Args: + producer: The Kafka Producer instance used to send the message. + topic: The Kafka topic to send the message to. + key: The key of the message (can be None if no key is needed). + message: The message to send, typically serialized to JSON. + partition: Optionally specify the partition to send the message to. If None, Kafka will decide the partition. + + Raises: + Exception: If an error occurs while sending the message, the exception is logged. + """ + try: + if partition is not None: + producer.produce( + topic=topic, + key=key, + value=message, + partition=partition, + on_delivery=self.__on_delivery, + ) + else: + producer.produce( + topic=topic, key=key, value=message, on_delivery=self.__on_delivery + ) + except Exception as e: + logger.exception(f"Error while sending message to topic {topic}: {e}") + def publish_to_topic( self, topic: str, data: pd.DataFrame, - partition: int = None, column_key: str = None, + auto_partition: bool = True, ): """ - Publish data to a specified Kafka topic. If `column_key` is provided, each row's - key will be extracted from the specified column, and the rest of the row data will - be the message. Otherwise, the data will be split into chunks and published - across available partitions. + Publishes data to a Kafka topic. If a `column_key` is provided, each row's key will be + extracted from the specified column, otherwise data is split into chunks and sent without keys. Args: topic: The Kafka topic to publish data to. - data: The data (as a Pandas DataFrame) to be published. - partition: Optionally specify the partition to publish to. If None, partitions are handled based on load. + data: A Pandas DataFrame containing the data to be published. column_key: Optional column name to be used as the message key. If None, data is chunked and sent without keys. + auto_partition: If True, Kafka will automatically select the partition. If False, partitions are selected manually based on load. 
+ + Raises: + Exception: If an error occurs while sending a message, the exception is logged. """ - producer = KafkaProducer( - bootstrap_servers="127.0.0.1:9094", - api_version=(3, 6, 0), - request_timeout_ms=300000, - key_serializer=lambda v: json.dumps(v).encode("utf-8"), - value_serializer=lambda v: json.dumps(v, allow_nan=True).encode("utf-8"), - ) + producer = Producer(self.conf) logger.info(f"Preparing to publish data to topic: {topic}") data.replace(np.nan, None, inplace=True) @@ -286,35 +335,30 @@ def publish_to_topic( f"No key found for column '{column_key}' in row: {row}" ) continue - message = json.dumps(row, allow_nan=True) - message = {key: json.dumps(row, allow_nan=True)} - - partition = self.__get_least_loaded_partition() - try: - producer.send( - topic=topic, key=key, value=message, partition=partition - ).add_callback(self.__on_success).add_errback(self.__on_error) - self.partition_loads[partition] += 1 - except Exception as e: - logger.exception( - f"Error while sending message to topic {topic} with key {key}: {e}" - ) + message = json.dumps(row, allow_nan=True).encode("utf-8") + + selected_partition = ( + None if auto_partition else self.__get_least_loaded_partition() + ) + self._send_message(producer, topic, key, message, selected_partition) + + if not auto_partition: + self.partition_loads[selected_partition] += 1 + else: logger.info("No key provided, splitting data into chunks and publishing") for chunk_data in self._generate_chunks(dataframe_list): - message = {"data": chunk_data} - - partition = self.__get_least_loaded_partition() - try: - producer.send( - topic=topic, value=message, partition=partition - ).add_callback(self.__on_success).add_errback(self.__on_error) - self.partition_loads[partition] += 1 - except Exception as e: - logger.exception( - f"Error while sending message to topic {topic}: {e}" - ) - producer.flush() + message = json.dumps({"data": chunk_data}).encode("utf-8") + + selected_partition = ( + None if auto_partition else self.__get_least_loaded_partition() + ) + self._send_message(producer, topic, None, message, selected_partition) + + if not auto_partition: + self.partition_loads[selected_partition] += 1 + + producer.close() def consume_from_topic( self, @@ -322,9 +366,11 @@ def consume_from_topic( group_id: str, auto_offset_reset: str = "latest", max_messages: Optional[int] = None, + auto_commit: bool = True, from_beginning: bool = False, offset: Optional[int] = None, - key_deserializer: Optional[bool] = False, + wait_time_sec: int = 30, + streaming: bool = False, ) -> Any: """ Consume messages from a Kafka topic and return them. @@ -334,72 +380,79 @@ def consume_from_topic( group_id: The consumer group ID. auto_offset_reset: Determines where to start reading when there's no valid offset. Default is 'latest'. max_messages: Limit on the number of messages to consume. If None, consume all available messages. + auto_commit: Whether to auto-commit offsets. from_beginning: Whether to start consuming from the beginning of the topic. offset: Start consuming from a specific offset if provided. - key_deserializer: Whether to deserialize the key (e.g., for device_id). Default is False. + wait_time_sec: How long to wait for messages (useful for one-time data requests). + streaming: If True, run as a continuous streaming job. Return: - A generator that yields consumed messages from the topic. If there are no more messages, a 'No more messages' flag is returned. + A generator that yields consumed messages from the topic. 
""" - consumer = KafkaConsumer( - topic, - bootstrap_servers="127.0.0.1:9094", - auto_offset_reset=auto_offset_reset, - enable_auto_commit=True, - group_id=group_id, - value_deserializer=lambda x: json.loads(x.decode("utf-8")), - key_deserializer=( - lambda x: x.decode("utf-8") if key_deserializer else None - ), - ) - - logger.info(f"Starting to consume messages from topic: {topic}") - - # Wait for partition assignment - # import time - # max_attempts = 10 - # attempts = 0 - # while not consumer.assignment() and attempts < max_attempts: - # logger.info(f"Waiting for partition assignment... attempt {attempts + 1}") - # consumer.poll(timeout_ms=100) # This triggers partition assignment - # time.sleep(1) # Add a short sleep to avoid busy waiting - # attempts += 1 - # if not consumer.assignment(): - # logger.error("Failed to assign partitions after several attempts.") - # else: - # logger.info(f"Partitions assigned: {consumer.assignment()}") + self.conf.update( + { + "group.id": group_id, + "auto.offset.reset": auto_offset_reset, + "enable.auto.commit": "true" if auto_commit else "false", + } + ) - # if from_beginning: - # consumer.seek_to_beginning() + consumer = Consumer(self.conf) + consumer.subscribe([topic]) + + assigned = False + while not assigned and wait_time_sec > 0: + logger.info("Waiting for partition assignment...") + msg = consumer.poll(timeout=1.0) + if msg is not None and msg.error() is None: + assigned = True + wait_time_sec -= 1 + + if from_beginning: + logger.info("Seeking to the beginning of all partitions...") + partitions = [ + TopicPartition(topic, p.partition, offset=0) + for p in consumer.assignment() + ] + consumer.assign(partitions) + for partition in partitions: + consumer.seek(partition) if offset is not None: - for partition in consumer.assignment(): - consumer.seek(partition, offset) + logger.info(f"Seeking to offset {offset} for all partitions...") + partitions = [ + TopicPartition(topic, p.partition, p.offset) + for p in consumer.assignment() + ] + consumer.assign(partitions) message_count = 0 - try: - for message in consumer: - key = message.key - value = message.value - - logger.info( - f"Consumed message from topic {message.topic}, " - f"partition {message.partition}, offset {message.offset}, key: {key}" - ) + while streaming or (message_count < max_messages if max_messages else True): + msg = consumer.poll(timeout=1.0) - yield {"key": key, "value": value} + if msg is None: + logger.info("No messages in this poll.") + if not streaming: + break + continue + if msg.error(): + logger.exception(f"Consumer error: {msg.error()}") + continue + msg_value = msg.value().decode("utf-8") + yield { + "key": msg.key().decode("utf-8"), + "value": msg_value, + } if msg.key() else {"value": msg_value} message_count += 1 - # If a max message limit is set, stop when it's reached - if max_messages and message_count >= max_messages: - logger.info(f"Reached max message limit: {max_messages}") - break except Exception as e: - logger.error(f"Error while consuming messages from topic {topic}: {e}") + logger.exception(f"Error while consuming messages from topic {topic}: {e}") + finally: consumer.close() - logger.info(f"No more messages to consume from topic: {topic}") - yield {"no_more_messages": True} + logger.info( + f"Closed consumer. 
No more messages to consume from topic: {topic}" + ) From e9973304c383e35d2ffdb72aa6e61951e0f9af97 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 18 Oct 2024 00:02:45 +0300 Subject: [PATCH 3/5] Add confluent-kafka and lz4 compression --- src/workflows/airqo_etl_utils/setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/workflows/airqo_etl_utils/setup.py b/src/workflows/airqo_etl_utils/setup.py index bf058f1c92..ba29280f31 100644 --- a/src/workflows/airqo_etl_utils/setup.py +++ b/src/workflows/airqo_etl_utils/setup.py @@ -33,7 +33,8 @@ "great_expectations===0.18.18", "airflow-provider-great-expectations==0.2.8", "sqlalchemy-bigquery==1.11.0", - "kafka-python-ng", + "confluent-kafka==2.5.0", + "lz4==4.3.3", ], keywords=["python", "airflow", "AirQo"], license="MIT", From 862a18cd9189cc984603479e1dd11ef39713651a Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 18 Oct 2024 00:25:56 +0300 Subject: [PATCH 4/5] Clean up --- src/workflows/airqo_etl_utils/message_broker_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workflows/airqo_etl_utils/message_broker_utils.py b/src/workflows/airqo_etl_utils/message_broker_utils.py index 2589a0f153..5a1ca622c4 100644 --- a/src/workflows/airqo_etl_utils/message_broker_utils.py +++ b/src/workflows/airqo_etl_utils/message_broker_utils.py @@ -36,7 +36,7 @@ def __init__(self): int(p): 0 for p in self.__partitions } # Initialize partition loads self.conf = { - "bootstrap.servers": "localhost:9094", + "bootstrap.servers": self.__bootstrap_servers, } def __get_partition(self, current_partition) -> int: From 9c732be6daf398641a3802c7dc69d18e9dfb2b9b Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 18 Oct 2024 00:37:24 +0300 Subject: [PATCH 5/5] Clean up --- .../airqo_etl_utils/message_broker.py | 77 ------------------- 1 file changed, 77 deletions(-) delete mode 100644 src/workflows/airqo_etl_utils/message_broker.py diff --git a/src/workflows/airqo_etl_utils/message_broker.py b/src/workflows/airqo_etl_utils/message_broker.py deleted file mode 100644 index f3329802e5..0000000000 --- a/src/workflows/airqo_etl_utils/message_broker.py +++ /dev/null @@ -1,77 +0,0 @@ -import simplejson -from kafka import KafkaProducer - -from .config import configuration - - -class KafkaBrokerClient: - def __init__(self): - self.__partitions = configuration.TOPIC_PARTITIONS - self.__bootstrap_servers = configuration.BOOTSTRAP_SERVERS - self.__partitions = [0, 1, 2] - self.bam_measurements_topic = configuration.BAM_MEASUREMENTS_TOPIC - # self.__schema_registry_url = configuration.SCHEMA_REGISTRY_URL - # self.__registry_client = SchemaRegistry( - # self.__schema_registry_url, - # headers={"Content-Type": "application/vnd.schemaregistry.v1+json"}, - # ) - - def get_partition(self, current_partition) -> int: - current_partition = current_partition + 1 - if current_partition in self.__partitions: - return current_partition - return self.__partitions[0] - - @staticmethod - def on_success(record_metadata): - print("\nSuccessfully sent message") - print(f"Topic : {record_metadata.topic}") - print(f"Partition : {record_metadata.partition}") - print(f"Offset : {record_metadata.offset}") - - @staticmethod - def on_error(exception): - print("\nFailed to send message") - print(exception) - - def send_data(self, topic: str, data: list, partition: int = None): - # avro_serde = AvroKeyValueSerde(self.__registry_client, self.__output_topic) - # bytes_data = avro_serde.value.serialize(measurements, schema_str) - producer 
= KafkaProducer( - bootstrap_servers=self.__bootstrap_servers, - api_version_auto_timeout_ms=300000, - retries=5, - request_timeout_ms=300000, - ) - - if len(data) > 50: - current_partition = -1 - for i in range(0, len(data), 50): - range_data = data[i : i + 50] - - message = {"data": range_data} - - current_partition = ( - partition - if partition or partition == 0 - else self.get_partition(current_partition=current_partition) - ) - - producer.send( - topic=topic, - value=simplejson.dumps(message, ignore_nan=True).encode("utf-8"), - partition=current_partition, - ).add_callback(self.on_success).add_errback(self.on_error) - - else: - value = simplejson.dumps(data, ignore_nan=True).encode("utf-8") - if partition: - producer.send( - topic=topic, - value=value, - partition=partition, - ).add_callback(self.on_success).add_errback(self.on_error) - else: - producer.send(topic=topic, value=value).add_callback( - self.on_success - ).add_errback(self.on_error)
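
Reviewer note: the sketch below shows how the API introduced in this series is intended to be called, pieced together from the `publish_to_topic` and `consume_from_topic` docstrings in PATCH 2/5. It is not part of the patches themselves; the topic name, key column, sample rows, and consumer group are illustrative placeholders, and a Kafka broker reachable at `configuration.BOOTSTRAP_SERVERS` is assumed.

    # Illustrative usage only: topic, key column, sample data, and group id are placeholders.
    import pandas as pd

    from airqo_etl_utils.message_broker_utils import MessageBrokerUtils

    broker = MessageBrokerUtils()

    # Publish: each row is keyed by the "device_id" column and the remaining
    # columns become the JSON message value; partition selection is left to
    # Kafka because auto_partition defaults to True.
    data = pd.DataFrame(
        [
            {"device_id": "aq_001", "pm2_5": 12.4, "timestamp": "2024-10-15T10:00:00Z"},
            {"device_id": "aq_002", "pm2_5": 30.1, "timestamp": "2024-10-15T10:00:00Z"},
        ]
    )
    broker.publish_to_topic(
        topic="hourly-measurements", data=data, column_key="device_id"
    )

    # Consume: the method is a generator that yields dicts with "value" and,
    # when the message was keyed, "key"; max_messages bounds a one-off run.
    for message in broker.consume_from_topic(
        topic="hourly-measurements",
        group_id="etl-review",
        auto_offset_reset="earliest",
        max_messages=5,
    ):
        print(message.get("key"), message["value"])

Because `consume_from_topic` is a generator, downstream tasks can process records as they are polled rather than materialising a whole topic in memory, and `max_messages` gives one-time data requests a natural stopping point while `streaming=True` keeps the loop open for continuous jobs.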