
Update/kafka implementations [WIP] #3683

Merged

Conversation


@NicholasTurner23 (Contributor) commented Oct 17, 2024

Description

This PR adds a dynamic Kafka producer and consumer to be used in the Kafka integration.

Related Issues

  • JIRA cards:
    • OPS-291

Summary by CodeRabbit

  • New Features

    • Enhanced message delivery and error handling for Kafka interactions.
    • Introduced a method for selecting the least loaded partition for message sending.
    • Added functionality to update hourly data topics with device metadata.
    • Improved message publishing with optional keys and automatic partition selection.
  • Bug Fixes

    • Improved logging for message delivery success and error handling.
  • Chores

    • Updated package dependencies by removing kafka-python and adding confluent-kafka and lz4.

coderabbitai bot commented Oct 17, 2024

📝 Walkthrough

The changes involve significant enhancements to the MessageBrokerUtils class for improved Kafka interaction, including new methods and updated logging practices. The setup.py file has been modified to remove the kafka-python dependency and introduce confluent-kafka and lz4. These updates aim to streamline message handling and improve package dependencies.

Changes

| File Path | Change Summary |
| --- | --- |
| src/workflows/airqo_etl_utils/message_broker_utils.py | Enhanced `MessageBrokerUtils` class for Kafka interaction. Added `__get_least_loaded_partition` method. Updated `__send_data` and `publish_to_topic` method signatures. Introduced new methods: `update_hourly_data_topic_`, `__on_delivery`, `_generate_chunks`, `_send_message`. Improved logging in delivery success/error handling methods. Updated `consume_from_topic` with additional parameters. |
| src/workflows/airqo_etl_utils/setup.py | Removed `kafka-python` dependency. Added `confluent-kafka==2.5.0` and `lz4==4.3.3` dependencies. |
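
The `__get_least_loaded_partition` helper listed in the table above is not shown in this thread. As a rough, illustrative sketch only (not the project's actual implementation), a load-based partition picker built around a `partition_loads` counter, the attribute name used later in `publish_to_topic`, could look like this:

```python
# Illustrative sketch only; not the AirQo implementation.
# Assumes a dict mapping partition id -> messages sent so far,
# matching the `partition_loads` counter referenced in publish_to_topic.
class PartitionBalancer:
    def __init__(self, num_partitions: int = 3):
        # Hypothetical partition count; a real implementation would read topic metadata.
        self.partition_loads = {p: 0 for p in range(num_partitions)}

    def get_least_loaded_partition(self) -> int:
        """Return the partition that has received the fewest messages so far."""
        return min(self.partition_loads, key=self.partition_loads.get)

    def record_send(self, partition: int) -> None:
        """Increment the load counter after sending a message to `partition`."""
        self.partition_loads[partition] += 1
```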

Suggested reviewers

  • Benjamin Ssempala
  • Baalmart

🎉 In the land of code where changes flow,
New methods sprout, and dependencies grow.
Kafka's charm now shines so bright,
With logging updates, all feels right!
So here's to the tweaks, both big and small,
In the world of ETL, we’re standing tall! 🎈


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 862a18c and 9c732be.

📒 Files selected for processing (1)
  • src/workflows/airqo_etl_utils/message_broker.py (0 hunks)
💤 Files with no reviewable changes (1)
  • src/workflows/airqo_etl_utils/message_broker.py


@coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Outside diff range and nitpick comments (1)
src/workflows/airqo_etl_utils/message_broker_utils.py (1)

Line range hint 75-101: Update Producer initialization and message sending for confluent_kafka

In the __send_data method, the initialization of the Producer and the message sending logic are using syntax from kafka-python, not confluent_kafka. This can lead to runtime errors.

Specific issues:

  • The Producer from confluent_kafka should be initialized with a configuration dictionary.
  • The producer.send() method and add_callback/add_errback are not available in confluent_kafka. Instead, use producer.produce() with the callback parameter.
  • Call producer.flush() after sending messages to ensure they're delivered.

Apply this diff to correct the Producer usage:

             data.to_csv("message_broker_data.csv", index=False)
-            producer = Producer(
-                bootstrap_servers=self.__bootstrap_servers,
-                api_version_auto_timeout_ms=300000,
-                retries=5,
-                request_timeout_ms=300000,
-            )
+            producer = Producer(self.conf)

             print("Dataframe info : ")
             print(data.info())
             print("Dataframe description : ")
             print(data.describe())

             chunks = int(len(data) / 50)
             chunks = chunks if chunks > 0 else 1
             dataframes = np.array_split(data, chunks)
             current_partition = -1
             for dataframe in dataframes:
                 dataframe = pd.DataFrame(dataframe).replace(np.nan, None)
                 message = {"data": dataframe.to_dict("records")}

                 current_partition = (
                     partition
                     if partition or partition == 0
                     else self.__get_partition(current_partition=current_partition)
                 )
                 kafka_message = json.dumps(message, allow_nan=True).encode("utf-8")

-                producer.send(
+                producer.produce(
                     topic=topic,
                     value=kafka_message,
                     partition=current_partition,
-                ).add_callback(self.__on_success).add_errback(self.__on_error)
+                    callback=self.__on_delivery,
+                )
+
+            producer.flush()

This ensures the Producer is properly initialized and messages are sent correctly using confluent_kafka.
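
For context, `self.conf` in the corrected code above is the confluent_kafka configuration dictionary built elsewhere in the class and not shown in this diff. A minimal sketch of what such a configuration might contain is below; the keys and values are assumptions rather than the project's actual settings, and `compression.type: lz4` is only a guess consistent with the new `lz4` dependency in setup.py:

```python
# Hypothetical confluent_kafka Producer configuration; the real `self.conf`
# used by MessageBrokerUtils is not part of this PR excerpt.
from confluent_kafka import Producer

conf = {
    "bootstrap.servers": "broker-1:9092,broker-2:9092",  # placeholder broker addresses
    "retries": 5,                       # librdkafka alias for message.send.max.retries
    "request.timeout.ms": 300000,
    "compression.type": "lz4",          # assumed, given the lz4 dependency added in setup.py
}

producer = Producer(conf)  # no network activity until produce() is called
```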

🧰 Tools
🪛 Ruff

8-8: time imported but unused

Remove unused import: time

(F401)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 1144763 and 862a18c.

📒 Files selected for processing (2)
  • src/workflows/airqo_etl_utils/message_broker_utils.py (3 hunks)
  • src/workflows/airqo_etl_utils/setup.py (1 hunks)
🧰 Additional context used
🪛 Ruff
src/workflows/airqo_etl_utils/message_broker_utils.py

8-8: time imported but unused

Remove unused import: time

(F401)

🔇 Additional comments (1)
src/workflows/airqo_etl_utils/message_broker_utils.py (1)

227-244: Confirm proper logging in __on_delivery callback

The __on_delivery method correctly handles success and error scenarios using the logging framework. This enhances visibility into message delivery outcomes.
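
The body of `__on_delivery` is not included in this excerpt. A typical confluent_kafka delivery callback of the kind described here might look like the following sketch; the logger name and message wording are illustrative, not the project's actual code:

```python
import logging

logger = logging.getLogger(__name__)


def on_delivery(err, msg):
    """Delivery report callback passed to Producer.produce()."""
    if err is not None:
        # Delivery failed after all retries were exhausted.
        logger.error(f"Delivery failed for topic {msg.topic()}: {err}")
    else:
        # Delivery succeeded; msg carries topic/partition/offset metadata.
        logger.info(
            f"Delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}"
        )
```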

Comment on lines +36 to +37
"confluent-kafka==2.5.0",
"lz4==4.3.3",

💡 Codebase verification

Incomplete Removal of kafka-python Detected

It appears that kafka-python is still being utilized in several parts of the codebase:

  • src/workflows/airqo_etl_utils/message_broker.py:from kafka import KafkaProducer
  • src/kafka-connectors/bigquery-connector/message_broker.py:from kafka import KafkaConsumer
  • src/meta-data/message-broker.py:from kafka import KafkaConsumer
  • src/gp-model/message-broker.py:from kafka import KafkaConsumer

While confluent-kafka has been introduced:

  • src/workflows/airqo_etl_utils/message_broker_utils.py:from confluent_kafka import Producer, Consumer, TopicPartition

Recommendations:

  1. Update Imports: Replace all instances of kafka-python imports with confluent-kafka equivalents in the listed files (an illustrative sketch follows this list).
  2. Remove Dependency: Ensure that kafka-python is removed from the install_requires section in setup.py to prevent potential conflicts.
  3. Refactor Code: Verify that the Kafka-related code aligns with the confluent-kafka library's API, making necessary adjustments for compatibility.
  4. Add Documentation: Consider updating comments or documentation to reflect the transition to confluent-kafka for future maintainers.
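
As an illustration of recommendation 1, the sketch below shows, under assumed placeholder topic, group, and broker names, how a kafka-python consumer typically maps onto the confluent-kafka API; it is not taken from the files listed above:

```python
# Before (kafka-python), shown for comparison:
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer("my-topic", bootstrap_servers="broker:9092", group_id="my-group")
#   for record in consumer:
#       handle(record.value)

# After (confluent-kafka):
from confluent_kafka import Consumer


def handle(value: bytes) -> None:
    print(value)  # placeholder for real processing


consumer = Consumer(
    {
        "bootstrap.servers": "broker:9092",  # placeholder address
        "group.id": "my-group",
        "auto.offset.reset": "latest",
    }
)
consumer.subscribe(["my-topic"])

try:
    while True:
        msg = consumer.poll(timeout=1.0)  # poll() replaces the iterator interface
        if msg is None:
            continue
        if msg.error():
            continue  # log or inspect the error as appropriate
        handle(msg.value())
finally:
    consumer.close()
```
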
🔗 Analysis chain

Excellent choice of Kafka client upgrade, but let's ensure compatibility.

The addition of confluent-kafka==2.5.0 and lz4==4.3.3 is a positive step towards improving your Kafka integration. The confluent-kafka library offers better performance and more features compared to kafka-python. The inclusion of lz4 suggests you're planning to implement message compression, which is great for optimizing network bandwidth and storage.

However, I have a few points to consider:

  1. The removal of kafka-python isn't visible in this diff. Was it removed in a separate commit? If not, you might want to explicitly remove it to avoid potential conflicts.

  2. The switch from kafka-python to confluent-kafka likely requires changes in your Kafka-related code. Have all relevant parts of the codebase been updated to use the new library?

  3. Consider adding a comment explaining the purpose of lz4 for future maintainers.

Let's verify the Kafka client usage in the codebase:

This will help ensure that the transition to the new Kafka client is complete across the codebase.


Need to Correct the File Type in the Verification Script

It appears there was an issue with the previous script regarding the file type specification. Let's adjust the script to use the correct file type identifier for Python. Please execute the following updated script to verify the usage of kafka-python and confluent-kafka in the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining usage of kafka-python and new usage of confluent-kafka

echo "Checking for kafka-python usage:"
rg --type python 'from\s+kafka\s+import' || echo "No kafka-python usage found."

echo -e "\nChecking for confluent-kafka usage:"
rg --type python 'from\s+confluent_kafka\s+import' || echo "No confluent-kafka usage found."

Length of output: 459


Script:

#!/bin/bash
# Description: Check for any remaining usage of kafka-python and new usage of confluent-kafka

echo "Checking for kafka-python usage:"
rg --type py 'from\s+kafka\s+import' || echo "No kafka-python usage found."

echo -e "\nChecking for confluent-kafka usage:"
rg --type py 'from\s+confluent_kafka\s+import' || echo "No confluent-kafka usage found."

Length of output: 663


import numpy as np
import pandas as pd
from kafka import KafkaProducer
from confluent_kafka import Producer, Consumer, TopicPartition
import time

⚠️ Potential issue

Remove unused import time

The time module is imported but not used anywhere in the code. Removing unused imports helps keep the code clean and maintainable.

Apply this diff to remove the unused import:

 import json
 import logging
 import sys

 import numpy as np
 import pandas as pd
-from confluent_kafka import Producer, Consumer, TopicPartition
-import time
+from confluent_kafka import Producer, Consumer, TopicPartition

 from .config import configuration

Committable suggestion was skipped due to low confidence.

🧰 Tools
🪛 Ruff

8-8: time imported but unused

Remove unused import: time

(F401)

Comment on lines +271 to +302
    def _send_message(
        self, producer: Producer, topic: str, key: Any, message: str, partition=None
    ) -> None:
        """
        Sends a message to the specified Kafka topic using the provided producer.

        Args:
            producer: The Kafka Producer instance used to send the message.
            topic: The Kafka topic to send the message to.
            key: The key of the message (can be None if no key is needed).
            message: The message to send, typically serialized to JSON.
            partition: Optionally specify the partition to send the message to. If None, Kafka will decide the partition.

        Raises:
            Exception: If an error occurs while sending the message, the exception is logged.
        """
        try:
            if partition is not None:
                producer.produce(
                    topic=topic,
                    key=key,
                    value=message,
                    partition=partition,
                    on_delivery=self.__on_delivery,
                )
            else:
                producer.produce(
                    topic=topic, key=key, value=message, on_delivery=self.__on_delivery
                )
        except Exception as e:
            logger.exception(f"Error while sending message to topic {topic}: {e}")

@coderabbitai bot commented Oct 17, 2024

⚠️ Potential issue

Ensure messages are flushed in _send_message method

After producing messages with producer.produce(), it's important to call producer.poll(0) to serve the delivery callback queue, and producer.flush() after all messages are produced to ensure they're delivered.

Apply this diff to flush messages:

             except Exception as e:
                 logger.exception(f"Error while sending message to topic {topic}: {e}")

+            producer.poll(0)

And after all messages are sent (outside the method), ensure producer.flush() is called.

Committable suggestion was skipped due to low confidence.
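
To make the pattern concrete, here is a minimal, self-contained sketch of the usual confluent_kafka produce loop: `poll(0)` after each `produce()` to serve delivery reports, and a single `flush()` at the end. Broker address, topic name, and payloads are placeholders:

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "broker:9092"})  # placeholder address


def on_delivery(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")


for i in range(10):
    producer.produce("demo-topic", value=f"message-{i}".encode("utf-8"), on_delivery=on_delivery)
    producer.poll(0)  # serve the delivery-report queue without blocking

producer.flush()  # block until all outstanding messages are delivered or fail
```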

@NicholasTurner23 (Contributor Author)

This is being done in the caller method.

@coderabbitai bot

@NicholasTurner23, thank you for the clarification. I understand that message flushing is handled in the caller method.


✏️ Learnings added
Learnt from: NicholasTurner23
PR: airqo-platform/AirQo-api#3683
File: src/workflows/airqo_etl_utils/message_broker_utils.py:271-302
Timestamp: 2024-10-17T21:49:15.884Z
Learning: In the `MessageBrokerUtils` class, messages are flushed in the caller method after invoking `_send_message`, so it's not necessary to call `producer.flush()` within the `_send_message` method.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines +303 to +362
    def publish_to_topic(
        self,
        topic: str,
        data: pd.DataFrame,
        column_key: str = None,
        auto_partition: bool = True,
    ):
        """
        Publishes data to a Kafka topic. If a `column_key` is provided, each row's key will be
        extracted from the specified column, otherwise data is split into chunks and sent without keys.

        Args:
            topic: The Kafka topic to publish data to.
            data: A Pandas DataFrame containing the data to be published.
            column_key: Optional column name to be used as the message key. If None, data is chunked and sent without keys.
            auto_partition: If True, Kafka will automatically select the partition. If False, partitions are selected manually based on load.

        Raises:
            Exception: If an error occurs while sending a message, the exception is logged.
        """
        producer = Producer(self.conf)

        logger.info(f"Preparing to publish data to topic: {topic}")
        data.replace(np.nan, None, inplace=True)
        dataframe_list = data.to_dict("records")

        if column_key:
            logger.info(f"Using '{column_key}' as the key for messages")
            for row in dataframe_list:
                key = row.pop(column_key, None)
                if key is None:
                    logger.warning(
                        f"No key found for column '{column_key}' in row: {row}"
                    )
                    continue
                message = json.dumps(row, allow_nan=True).encode("utf-8")

                selected_partition = (
                    None if auto_partition else self.__get_least_loaded_partition()
                )
                self._send_message(producer, topic, key, message, selected_partition)

                if not auto_partition:
                    self.partition_loads[selected_partition] += 1

        else:
            logger.info("No key provided, splitting data into chunks and publishing")
            for chunk_data in self._generate_chunks(dataframe_list):
                message = json.dumps({"data": chunk_data}).encode("utf-8")

                selected_partition = (
                    None if auto_partition else self.__get_least_loaded_partition()
                )
                self._send_message(producer, topic, None, message, selected_partition)

                if not auto_partition:
                    self.partition_loads[selected_partition] += 1

        producer.close()

@coderabbitai bot commented Oct 17, 2024

⚠️ Potential issue

Call producer.flush() after publishing messages

In the publish_to_topic method, after all messages are sent, it's crucial to call producer.flush() to ensure all messages have been delivered before closing the producer.

Apply this diff to flush and close the producer appropriately:

             producer = Producer(self.conf)

             logger.info(f"Preparing to publish data to topic: {topic}")
             data.replace(np.nan, None, inplace=True)
             dataframe_list = data.to_dict("records")

             if column_key:
                 logger.info(f"Using '{column_key}' as the key for messages")
                 for row in dataframe_list:
                     key = row.pop(column_key, None)
                     if key is None:
                         logger.warning(
                             f"No key found for column '{column_key}' in row: {row}"
                         )
                         continue
                     message = json.dumps(row, allow_nan=True).encode("utf-8")

                     selected_partition = (
                         None if auto_partition else self.__get_least_loaded_partition()
                     )
                     self._send_message(producer, topic, key, message, selected_partition)

                     if not auto_partition:
                         self.partition_loads[selected_partition] += 1

             else:
                 logger.info("No key provided, splitting data into chunks and publishing")
                 for chunk_data in self._generate_chunks(dataframe_list):
                     message = json.dumps({"data": chunk_data}).encode("utf-8")

                     selected_partition = (
                         None if auto_partition else self.__get_least_loaded_partition()
                     )
                     self._send_message(producer, topic, None, message, selected_partition)

                     if not auto_partition:
                         self.partition_loads[selected_partition] += 1

-            producer.close()
+            producer.flush()
+            producer.close()

This ensures all messages are successfully sent before the producer is closed.


@NicholasTurner23 (Contributor Author)

The close() method also performs a flush if I am not mistaken.

@coderabbitai bot

@NicholasTurner23, you are correct. The close() method of Producer performs a flush internally. Thank you for the clarification.


✏️ Learnings added
Learnt from: NicholasTurner23
PR: airqo-platform/AirQo-api#3683
File: src/workflows/airqo_etl_utils/message_broker_utils.py:303-362
Timestamp: 2024-10-17T21:44:36.359Z
Learning: In the `confluent_kafka` Producer class, calling `close()` performs a flush internally, so an explicit `flush()` before closing is unnecessary.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
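
For orientation, a hypothetical caller of `publish_to_topic` as defined above might look like the sketch below. The import path is inferred from the file location, the no-argument constructor is an assumption, and the topic name and DataFrame contents are made up for the example:

```python
import pandas as pd

from airqo_etl_utils.message_broker_utils import MessageBrokerUtils  # assumed import path

# Illustrative data; column names are placeholders.
data = pd.DataFrame(
    [
        {"device_id": "aq_001", "pm2_5": 12.4, "timestamp": "2024-10-17T00:00:00Z"},
        {"device_id": "aq_002", "pm2_5": 35.1, "timestamp": "2024-10-17T00:00:00Z"},
    ]
)

broker = MessageBrokerUtils()  # assumes a no-argument constructor

# Keyed publish: each row is sent individually, keyed by the device_id column.
broker.publish_to_topic(topic="hourly-measurements", data=data, column_key="device_id")

# Un-keyed publish with manual partition selection based on load.
broker.publish_to_topic(topic="hourly-measurements", data=data, auto_partition=False)
```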

Comment on lines +363 to +458
    def consume_from_topic(
        self,
        topic: str,
        group_id: str,
        auto_offset_reset: str = "latest",
        max_messages: Optional[int] = None,
        auto_commit: bool = True,
        from_beginning: bool = False,
        offset: Optional[int] = None,
        wait_time_sec: int = 30,
        streaming: bool = False,
    ) -> Any:
        """
        Consume messages from a Kafka topic and return them.

        Args:
            topic: The Kafka topic to consume from.
            group_id: The consumer group ID.
            auto_offset_reset: Determines where to start reading when there's no valid offset. Default is 'latest'.
            max_messages: Limit on the number of messages to consume. If None, consume all available messages.
            auto_commit: Whether to auto-commit offsets.
            from_beginning: Whether to start consuming from the beginning of the topic.
            offset: Start consuming from a specific offset if provided.
            wait_time_sec: How long to wait for messages (useful for one-time data requests).
            streaming: If True, run as a continuous streaming job.

        Return:
            A generator that yields consumed messages from the topic.
        """

        self.conf.update(
            {
                "group.id": group_id,
                "auto.offset.reset": auto_offset_reset,
                "enable.auto.commit": "true" if auto_commit else "false",
            }
        )

        consumer = Consumer(self.conf)
        consumer.subscribe([topic])

        assigned = False
        while not assigned and wait_time_sec > 0:
            logger.info("Waiting for partition assignment...")
            msg = consumer.poll(timeout=1.0)
            if msg is not None and msg.error() is None:
                assigned = True
            wait_time_sec -= 1

        if from_beginning:
            logger.info("Seeking to the beginning of all partitions...")
            partitions = [
                TopicPartition(topic, p.partition, offset=0)
                for p in consumer.assignment()
            ]
            consumer.assign(partitions)
            for partition in partitions:
                consumer.seek(partition)

        if offset is not None:
            logger.info(f"Seeking to offset {offset} for all partitions...")
            partitions = [
                TopicPartition(topic, p.partition, p.offset)
                for p in consumer.assignment()
            ]
            consumer.assign(partitions)

        message_count = 0
        try:
            while streaming or (message_count < max_messages if max_messages else True):
                msg = consumer.poll(timeout=1.0)

                if msg is None:
                    logger.info("No messages in this poll.")
                    if not streaming:
                        break
                    continue

                if msg.error():
                    logger.exception(f"Consumer error: {msg.error()}")
                    continue
                msg_value = msg.value().decode("utf-8")
                yield {
                    "key": msg.key().decode("utf-8"),
                    "value": msg_value,
                } if msg.key() else {"value": msg_value}
                message_count += 1

        except Exception as e:
            logger.exception(f"Error while consuming messages from topic {topic}: {e}")

        finally:
            consumer.close()
            logger.info(
                f"Closed consumer. No more messages to consume from topic: {topic}"
            )
@coderabbitai bot commented Oct 17, 2024

⚠️ Potential issue

Validate loop termination conditions in consume_from_topic

In the consume_from_topic method, when streaming is False and max_messages is None, the loop might run indefinitely.

Ensure that the loop exits appropriately by modifying the condition:

             try:
                 while streaming or (message_count < max_messages if max_messages else False):
                     msg = consumer.poll(timeout=1.0)

This change prevents an infinite loop when not streaming and max_messages is not set.
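
Purely to spell out how the suggested condition behaves, it can be read as the small helper below; the names are illustrative and this is not part of the PR:

```python
from typing import Optional


def should_keep_polling(streaming: bool, message_count: int, max_messages: Optional[int]) -> bool:
    """Explicit form of `streaming or (message_count < max_messages if max_messages else False)`."""
    if streaming:
        return True  # streaming jobs keep polling indefinitely
    if max_messages:
        return message_count < max_messages  # bounded one-off consumption
    return False  # neither streaming nor bounded: do not loop unconditionally
```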


@NicholasTurner23 (Contributor Author)

The consume method can be used for streaming, where it runs indefinitely, as well as for consuming data once when triggered. At least that is the intention.

@coderabbitai bot

@NicholasTurner23, thank you for the clarification. I understand that the consume_from_topic method is intended to run indefinitely when streaming is enabled, for continuous streaming, and to consume data once when triggered otherwise. I've noted this for future reviews.

🐰❓


✏️ Learnings added
Learnt from: NicholasTurner23
PR: airqo-platform/AirQo-api#3683
File: src/workflows/airqo_etl_utils/message_broker_utils.py:363-458
Timestamp: 2024-10-17T21:48:34.724Z
Learning: In the `src/workflows/airqo_etl_utils/message_broker_utils.py` file, the `consume_from_topic` method is intentionally designed to run indefinitely when `streaming` is `True`, for continuous streaming, and to consume data once when triggered otherwise.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
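
As a usage illustration of the two modes this learning describes, a hypothetical caller might look like the sketch below; the import path is inferred from the file location, and the no-argument constructor, topic name, group ids, and `process` function are all placeholders:

```python
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils  # assumed import path


def process(payload: str) -> None:
    print(payload)  # placeholder for real handling


broker = MessageBrokerUtils()  # assumes a no-argument constructor

# One-off consumption: stops after 100 messages, or earlier if a poll returns nothing.
for msg in broker.consume_from_topic(topic="hourly-measurements", group_id="etl-batch", max_messages=100):
    process(msg["value"])

# Continuous streaming: runs until the process is stopped externally.
for msg in broker.consume_from_topic(topic="hourly-measurements", group_id="etl-stream", streaming=True):
    process(msg["value"])
```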

@Baalmart (Contributor) left a comment

Thank you, @NicholasTurner23, for this amazing contribution. I sincerely appreciate your efforts.

Regarding these proposed changes:

  1. Backward Compatibility: Could you please provide insights on the potential impact of these modifications on existing functionality? Specifically:
     • Are there any known issues or conflicts with currently deployed systems?
     • Have compatibility tests been conducted with our production environment?
  2. Critical Security Systems: Our organization has several critical security systems heavily reliant on Kafka. These systems need to remain operational throughout the ongoing work. Therefore, I would like to request:
     • Confirmation that the proposed changes will not disrupt the current Kafka-based systems.
     • Any mitigation strategies or rollback plans in place to ensure uninterrupted service during the transition period.

@NicholasTurner23 (Contributor Author)

> Thank you, @NicholasTurner23, for this amazing contribution. I sincerely appreciate your efforts.
>
> Regarding these proposed changes:
>
> 1. **Backward Compatibility:** Could you please provide insights on the potential impact of these modifications on existing functionality? Specifically:

At the moment, there is no impact on existing functionality. These changes are for the workflows application(s).

> 2. **Critical Security Systems:** Our organization has several critical security systems heavily reliant on Kafka. These systems need to remain operational throughout the ongoing work. Therefore, I would like to request:

There is no interaction with other Kafka systems. These changes only affect the workflows application(s).
