Update/kafka implementations #3760
Conversation
Updates from airqo staging
📝 Walkthrough

The changes in this pull request involve modifications to several classes and functions within the AirQo ETL workflow. Key updates include the renaming of parameters in the `transform_devices` method and the use of a unique `caller` identifier when publishing to the message broker.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (5)
src/workflows/dags/airqo_kafka_workflows.py (3)
Line range hint 36-40: Consider enhancing error handling for Kafka publishing.

The Kafka producer implementation could benefit from additional error handling and retry logic for robustness. Consider wrapping the publish operation in a try-except block:

```diff
 if not devices.empty:
     broker = MessageBrokerUtils()
+    try:
         broker.publish_to_topic(
             data=devices,
             topic=configuration.DEVICES_TOPIC,
             column_key="device_name",
         )
+    except Exception as e:
+        logging.error(f"Failed to publish devices to Kafka: {str(e)}")
+        raise
```
Line range hint 1-47: Consider adding monitoring for Kafka operations.

To ensure reliable data flow, consider adding monitoring and metrics collection for the Kafka operations.
Recommendations:
- Add metrics for successful/failed message publications
- Monitor message sizes and processing times
- Set up alerts for publishing failures
- Add logging for tracking message flow
Would you like me to provide a detailed implementation for any of these monitoring aspects?
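As a rough illustration of the logging and timing recommendations above, here is a minimal sketch; the `publish_to_topic` call mirrors the one in this DAG, while the wrapper function, logger wiring, and metric wording are illustrative assumptions rather than the project's actual monitoring setup:

```python
import logging
import time

logger = logging.getLogger(__name__)


def publish_with_metrics(broker, devices, topic):
    """Publish a devices DataFrame to Kafka and log basic timing/size metrics (sketch)."""
    start = time.monotonic()
    try:
        broker.publish_to_topic(
            data=devices,
            topic=topic,
            column_key="device_name",
        )
        elapsed = time.monotonic() - start
        logger.info("Published %d rows to %s in %.2fs", len(devices), topic, elapsed)
    except Exception:
        logger.exception("Failed to publish %d rows to %s", len(devices), topic)
        raise
```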
Consumer group IDs are not configured, potential message processing overlap risk
After examining the Kafka consumer implementations across the codebase, I found that none of the consumers specify a `group_id` in their configurations. This means they're using auto-generated group IDs, which could lead to:
- Inconsistent message processing across service restarts
- Multiple instances of the same service potentially processing the same messages
- No guaranteed message ordering within consumer groups
Key locations requiring attention:
- `src/meta-data/message-broker.py`: Missing group_id in KafkaConsumer configuration
- `src/gp-model/message-broker.py`: Missing group_id in KafkaConsumer configuration
- `src/kafka-connectors/bigquery-connector/message_broker.py`: Missing group_id in all three consumer implementations

Each consumer should be configured with a unique, stable group ID to ensure proper message distribution and processing semantics.
🔗 Analysis chain
Line range hint 34-40: Verify Kafka producer configuration for unique consumer groups.

While the producer implementation looks correct, the PR's objective mentions ensuring unique consumer group IDs, but this file only handles the producer side. Please verify if additional changes are needed in the consumer implementations.
Let's search for Kafka consumer configurations in the codebase:
Let's examine the consumer implementations to verify the configuration:
Let's examine the consumer implementations to understand how group IDs are configured. This will help us determine if the consumer-side changes are needed to ensure unique consumer groups.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Search for Kafka consumer configurations
echo "Searching for Kafka consumer configurations..."
rg -l "KafkaConsumer|consumer_group|group[._]id.*kafka"

# Search for MessageBrokerUtils consumer-related methods
ast-grep --pattern 'class MessageBrokerUtils { $$$ consume($$$) { $$$ } $$$ }'
```

Length of output: 419
Script:
```bash
#!/bin/bash
# Check consumer configurations in the identified files
echo "=== Checking message-broker.py in meta-data ==="
cat "src/meta-data/message-broker.py"

echo -e "\n=== Checking message-broker.py in gp-model ==="
cat "src/gp-model/message-broker.py"

echo -e "\n=== Checking message_broker.py in kafka-connectors ==="
cat "src/kafka-connectors/bigquery-connector/message_broker.py"

echo -e "\n=== Checking Kafka consumer test ==="
cat "src/incentives/bin/test/ut_kafka-consumer.js"
```

Length of output: 17545
src/workflows/airqo_etl_utils/message_broker_utils.py (1)
Line range hint 208-266: Consider adding unique consumer group ID generation.

To support the PR's objective of ensuring unique consumer group IDs, consider adding a helper method to generate unique identifiers. This would prevent potential message consumption issues due to shared consumer groups.

Add this helper method before the `consume_from_topic` method:

```python
def generate_unique_group_id(self, base_group_id: str) -> str:
    """
    Generate a unique consumer group ID to prevent shared consumer groups.

    Args:
        base_group_id: Base identifier for the consumer group

    Returns:
        A unique consumer group ID combining the base ID with a timestamp
    """
    import time
    import uuid

    unique_suffix = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
    return f"{base_group_id}_{unique_suffix}"
```

Then update the `consume_from_topic` method to use it:

```diff
 def consume_from_topic(
     self,
     topic: str,
     group_id: str,
     ...
 ):
+    unique_group_id = self.generate_unique_group_id(group_id)
     consumer_config = self.config.copy()
     consumer_config.update({
-        "group.id": group_id,
+        "group.id": unique_group_id,
```

src/workflows/airqo_etl_utils/data_validator.py (1)
Line range hint 279-294: Consider following Python naming conventions for parameters.

The parameter `taskinstance` should follow Python's snake_case naming convention as `task_instance`. Additionally, the docstring's parameter type hint for `devices` should be updated to match the actual type hint in the signature (`List[Dict[str, Any]]` instead of `pd.DataFrame`).

```diff
-    def transform_devices(devices: List[Dict[str, Any]], taskinstance) -> pd.DataFrame:
+    def transform_devices(devices: List[Dict[str, Any]], task_instance) -> pd.DataFrame:
         """
         Transforms and processes the devices DataFrame. If the checksum of the devices data
         has not changed since the last execution, it returns an empty DataFrame. Otherwise,
         it updates the checksum in XCom and returns the transformed DataFrame.

         Args:
-            devices (pd.DataFrame): A Pandas DataFrame containing the devices data.
+            devices (List[Dict[str, Any]]): A list of dictionaries containing the devices data.
-            task_instance: The Airflow task instance used to pull and push XCom values.
+            task_instance (TaskInstance): The Airflow task instance used to pull and push XCom values.

         Returns:
             pd.DataFrame: Transformed DataFrame if the devices data has changed since the last
             execution; otherwise, an empty DataFrame.
         """
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- src/workflows/airqo_etl_utils/data_validator.py (3 hunks)
- src/workflows/airqo_etl_utils/message_broker_utils.py (2 hunks)
- src/workflows/dags/airqo_kafka_workflows.py (1 hunks)
- src/workflows/dags/airqo_measurements.py (2 hunks)
🔇 Additional comments (4)
src/workflows/dags/airqo_kafka_workflows.py (1)
31-33: LGTM: Clean transition to static method call.

The update to use `DataValidationUtils.transform_devices` as a static method call improves code clarity and follows Python best practices for utility methods.

src/workflows/airqo_etl_utils/message_broker_utils.py (1)
208-208: Verify the impact of increased wait time.

The default wait time for partition assignment has been increased from 30 to 40 seconds. While this might improve reliability, it could impact performance.
Let's analyze the consumer group behavior with both wait times:
✅ Verification successful
Based on the search results, I can see the full context of how the wait time is used in the message broker consumer. The increased wait time is part of the partition assignment process where the consumer waits for topic partitions to be assigned before starting to consume messages.
Looking at the code:
- The wait time is used in a polling loop that checks for partition assignment
- The consumer polls with a 1-second timeout in each iteration
- The total wait time (40 seconds) is decremented each iteration (see the sketch after this analysis)
- Other timeout configurations like `message.timeout.ms` and `request.timeout.ms` are set to 300000ms (5 minutes)

The increased wait time from 30 to 40 seconds appears reasonable because:
- It provides more buffer time for partition rebalancing in case of network latency or high load
- The 40-second timeout is still well below other timeout configurations
- The polling happens in 1-second intervals, allowing for early exit if partitions are assigned sooner
The increased partition assignment wait time is appropriate and unlikely to cause performance issues
- The change improves reliability by allowing more time for partition assignment
- The incremental polling approach means it won't unnecessarily wait the full 40 seconds
- The timeout is still conservative compared to other message broker timeouts
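To make the described polling behaviour concrete, here is a minimal sketch of such a partition-assignment wait loop. It assumes a confluent-kafka `Consumer` (suggested by config keys like `bootstrap.servers` and `group.id` in this file); the function name, variable names, and config values are illustrative and not copied from the project:

```python
from confluent_kafka import Consumer


def wait_for_assignment(consumer: Consumer, wait_time_sec: int = 40) -> bool:
    """Poll until the consumer has partitions assigned or the wait time runs out."""
    remaining = wait_time_sec
    while remaining > 0:
        consumer.poll(timeout=1.0)      # 1-second poll drives the group rebalance protocol
        if consumer.assignment():       # non-empty once partitions are assigned
            return True
        remaining -= 1                  # decrement per iteration, allowing early exit
    return False


# Usage sketch with placeholder config
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["example-topic"])
assigned = wait_for_assignment(consumer)
```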
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Check consumer group lag and processing patterns
# This will help understand if the increased wait time is necessary

# Search for any timing-related issues or timeout patterns in the codebase
rg -g '!*.{log,json,md}' -A 3 'timeout|wait_time|partition.*assign'

# Look for any consumer configuration patterns
rg -g '!*.{log,json,md}' 'group.id|auto.offset.reset|enable.auto.commit'
```

Length of output: 36434
src/workflows/airqo_etl_utils/data_validator.py (1)
Line range hint 295-309: LGTM! Clear and explicit data transformation.

The explicit DataFrame conversion and column renaming logic is well-structured and maintainable.
src/workflows/dags/airqo_measurements.py (1)
108-108: Verify the impact on Kafka consumer group management

The concatenation of DAG ID with the timestamp creates unique consumer groups per hour, which aligns with the PR objective. However, let's verify that this doesn't lead to an excessive number of consumer groups in Kafka.
Also applies to: 381-381
✅ Verification successful
Let me gather more specific information about the consumer group implementation.
This will help us understand:
- How consumer groups are configured in MessageBrokerUtils
- The default Kafka consumer settings
- The context of how the caller ID is used in the DAG
Consumer group implementation is well-managed and safe
The concatenation of DAG ID with timestamp in the caller parameter doesn't affect Kafka consumer groups. The code analysis reveals:
- The `caller` parameter is only used for publishing messages, not for consumer group configuration
- Consumer groups are explicitly managed in `MessageBrokerUtils` with a dedicated `group_id` parameter
- The view service uses a static consumer group ID `view-service`, as seen in the message broker properties

The current implementation maintains proper separation between message publishing and consumption patterns, ensuring no proliferation of consumer groups.
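To illustrate the general distinction this analysis relies on (publisher-side metadata versus consumer-side group membership), here is a small generic sketch using confluent-kafka. It is not the project's `MessageBrokerUtils`; the broker address, topic, header name, and group ID are placeholders:

```python
from confluent_kafka import Consumer, Producer

# Publisher side: a "caller" tag travels as message metadata and has no effect on consumer groups.
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce(
    "hourly-measurements",                                      # placeholder topic
    value=b'{"pm2_5": 12.4}',
    headers=[("caller", b"airqo_hourly_measurements-2024-01-01-10")],
)
producer.flush()

# Consumer side: group membership is decided solely by group.id in the consumer config.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "view-service",   # static, per-service group ID
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["hourly-measurements"])
msg = consumer.poll(timeout=5.0)
```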
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Check for potential consumer group proliferation
# Look for other instances where consumer groups might be created

# Search for consumer group related configurations
rg -l "group.*id"

# Search for other message broker configurations
ast-grep --pattern 'MessageBrokerUtils.$$$'
```

Length of output: 2603
Script:
```bash
#!/bin/bash
# Check the MessageBrokerUtils implementation for consumer group handling
rg -A 10 "group.*id" src/workflows/airqo_etl_utils/message_broker_utils.py

# Look for actual Kafka consumer configurations
cat src/view/src/main/resources/application-messageBroker.properties

# Check the context around the caller usage in airqo_measurements.py
rg -A 5 "caller.*=.*dag" src/workflows/dags/airqo_measurements.py
```

Length of output: 2636
```diff
@@ -35,7 +35,7 @@ def __init__(self):
         # Note: This should be updated in case the number of partions used changes.
         self.partition_loads = {int(p): 0 for p in self.__partitions}
         self.config = {
-            "bootstrap.servers": self.__bootstrap_servers,
+            "bootstrap.servers": "35.187.179.27:30200,34.79.78.204:30200,34.78.90.92:30200",
```
Critical: Remove hardcoded Kafka bootstrap servers.
The bootstrap servers should not be hardcoded in the source code. This practice:
- Makes deployment across different environments challenging
- Exposes sensitive infrastructure information
- Violates configuration management best practices
Replace the hardcoded servers with configuration reference:
- "bootstrap.servers": "35.187.179.27:30200,34.79.78.204:30200,34.78.90.92:30200",
+ "bootstrap.servers": self.__bootstrap_servers,
Committable suggestion was skipped due to low confidence.
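As a rough illustration of the configuration-driven approach being recommended, a minimal sketch is shown below; the environment variable name and default value are assumptions, and this is not the project's actual `MessageBrokerUtils` implementation:

```python
import os


class MessageBrokerUtils:
    def __init__(self):
        # Read brokers from the environment (assumed variable name) instead of hardcoding IPs
        self.__bootstrap_servers = os.getenv(
            "KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"
        )
        self.config = {
            "bootstrap.servers": self.__bootstrap_servers,
        }
```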
```diff
         previous_checksum = taskinstance.xcom_pull(key="devices_checksum")

         if previous_checksum == api_devices_checksum:
             return pd.DataFrame()

-        task_instance.xcom_push(key="devices_checksum", value=api_devices_checksum)
+        taskinstance.xcom_push(key="devices_checksum", value=api_devices_checksum)
```
🛠️ Refactor suggestion
Consider adding error handling for XCom operations.
The XCom operations could fail silently if there are issues with the Airflow context or XCom backend. Consider adding try-except blocks to handle potential errors gracefully.
```diff
-        previous_checksum = taskinstance.xcom_pull(key="devices_checksum")
+        try:
+            previous_checksum = taskinstance.xcom_pull(key="devices_checksum")
+        except Exception as e:
+            logger.warning(f"Failed to pull previous checksum from XCom: {e}")
+            previous_checksum = None

         if previous_checksum == api_devices_checksum:
             return pd.DataFrame()

-        taskinstance.xcom_push(key="devices_checksum", value=api_devices_checksum)
+        try:
+            taskinstance.xcom_push(key="devices_checksum", value=api_devices_checksum)
+        except Exception as e:
+            logger.error(f"Failed to push new checksum to XCom: {e}")
```
Committable suggestion was skipped due to low confidence.
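For context on the checksum comparison shown above, here is a generic sketch of how such a content checksum over a device list can be computed; the hashing scheme (JSON-serialised payload hashed with SHA-256) and the function name are assumptions for illustration, not necessarily what `transform_devices` actually does:

```python
import hashlib
import json
from typing import Any, Dict, List


def compute_devices_checksum(devices: List[Dict[str, Any]]) -> str:
    """Deterministic checksum of a device list, used to skip unchanged data."""
    payload = json.dumps(devices, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


devices = [{"device_name": "aq_01", "latitude": 0.31}]
print(compute_devices_checksum(devices))
```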
```python
from datetime import datetime

now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour)
```
🛠️ Refactor suggestion
Consider using a more robust unique identifier format
While using date and hour for uniqueness is a good start, there could be edge cases where multiple DAG runs occur within the same hour. Consider enhancing the unique string generation to include minutes and a random component:
```diff
-unique_str = str(now.date()) + "-" + str(now.hour)
+unique_str = f"{now.strftime('%Y-%m-%d-%H-%M')}-{hash(now.timestamp())}"
```
This would provide better guarantees of uniqueness while maintaining temporal ordering.
Also applies to: 372-375
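As a side note on the suggested format: since `hash(now.timestamp())` derives entirely from the timestamp, two callers constructing the string in the same instant would still collide; a random UUID fragment avoids that. The sketch below is only an illustrative variant, not part of the review's suggestion:

```python
import uuid
from datetime import datetime


def generate_unique_str() -> str:
    """Timestamp plus a random UUID fragment: temporally ordered and collision-resistant."""
    now = datetime.now()
    return f"{now.strftime('%Y-%m-%d-%H-%M')}-{uuid.uuid4().hex[:8]}"


print(generate_unique_str())  # e.g. 2024-01-01-10-30-1a2b3c4d
```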
```diff
 from datetime import datetime

 now = datetime.now()
 unique_str = str(now.date()) + "-" + str(now.hour)

 data = DataValidationUtils.process_data_for_message_broker(
     data=data,
     tenant=Tenant.AIRQO,
     topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
-    caller=kwargs["dag"].dag_id,
+    caller=kwargs["dag"].dag_id + unique_str,
```
🛠️ Refactor suggestion
Consider extracting unique string generation to a utility function
The unique string generation logic is duplicated in both DAGs. Consider extracting this to a utility function in `airqo_etl_utils` to maintain DRY principles and ensure consistent implementation across all DAGs.

Example implementation:

```python
# In airqo_etl_utils/date.py
def generate_unique_caller_id(dag_id: str) -> str:
    now = datetime.now()
    unique_str = f"{now.strftime('%Y-%m-%d-%H-%M')}-{hash(now.timestamp())}"
    return f"{dag_id}{unique_str}"
```

Then in the DAGs:

```diff
-now = datetime.now()
-unique_str = str(now.date()) + "-" + str(now.hour)
-caller=kwargs["dag"].dag_id + unique_str,
+caller=generate_unique_caller_id(kwargs["dag"].dag_id),
```
Also applies to: 372-381
Description
Summary by CodeRabbit
- The `caller` identifier now includes a unique string based on the current date and hour.
- The `transform_devices` method is now used as a static method, improving clarity and usage.
- Parameters were renamed in the `transform_devices` method.