Update/kafka implementations #3784
Conversation
Updates from airqo staging
📝 Walkthrough

The changes in this pull request involve modifications to the …

Changes
Possibly related PRs
Suggested reviewers
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
src/workflows/airqo_etl_utils/message_broker_utils.py (3)
`235-236`: **LGTM! The timeout configurations are well-calibrated.**

The new timeout configurations are reasonable and align with the PR's objective:

- `api.version.request.timeout.ms`: 15s for API version negotiation
- `socket.timeout.ms`: 30s for socket operations

Consider adding inline documentation to explain these timeout values and their implications. For example:

```diff
 consumer_config.update(
     {
         "group.id": group_id,
         "auto.offset.reset": auto_offset_reset,
         "enable.auto.commit": "true" if auto_commit else "false",
         "fetch.message.max.bytes": 2 * 1024 * 1024,
-        "api.version.request.timeout.ms": 15000,
-        "socket.timeout.ms": 30000,
+        # Allow 15s for broker version compatibility checks
+        "api.version.request.timeout.ms": 15000,
+        # Allow 30s for socket operations (connect, read, write)
+        "socket.timeout.ms": 30000,
     }
 )
```
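To see the suggested settings in context, here is a minimal, runnable sketch of a config builder that carries the proposed comments. The helper name `build_consumer_config` and its defaults are illustrative, not part of the PR; the keys and values mirror the diff above.

```python
def build_consumer_config(group_id, auto_offset_reset="earliest", auto_commit=True):
    """Assemble a confluent_kafka consumer config dict with documented timeouts."""
    return {
        "group.id": group_id,
        "auto.offset.reset": auto_offset_reset,
        "enable.auto.commit": "true" if auto_commit else "false",
        # Cap fetched message size at 2 MiB
        "fetch.message.max.bytes": 2 * 1024 * 1024,
        # Allow 15s for broker version compatibility checks
        "api.version.request.timeout.ms": 15000,
        # Allow 30s for socket operations (connect, read, write)
        "socket.timeout.ms": 30000,
    }

config = build_consumer_config("airqo-etl")
print(config["socket.timeout.ms"])  # 30000
```

The dict can be passed straight to `confluent_kafka.Consumer(config)` once a broker is reachable.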
Line range hint `273-275`: **Consider enhancing error handling for timeout scenarios.**

Given the new timeout configurations, it would be beneficial to add specific error handling for timeout-related exceptions to help diagnose connection issues.

Consider updating the error handling as follows:

```diff
 try:
     while streaming or (message_count < max_messages if max_messages else True):
         msg = consumer.poll(timeout=1.0)
         if msg is None:
             logger.info("No messages in this poll.")
             if not streaming:
                 break
             continue
         if msg.error():
-            logger.exception(f"Consumer error: {msg.error()}")
+            error_code = msg.error().code()
+            if error_code in (
+                KafkaError._TIMED_OUT,
+                KafkaError._TRANSPORT,
+            ):
+                logger.warning(
+                    f"Timeout or transport error occurred: {msg.error()}. "
+                    f"This might be due to network issues or broker unavailability."
+                )
+            else:
+                logger.exception(f"Consumer error: {msg.error()}")
             continue
```

Don't forget to add the import at the top:

```python
from confluent_kafka import KafkaError
```
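The triage logic itself can be exercised without a broker. In this sketch the constants are stand-ins so the code runs without `confluent_kafka` installed; in real code the values come from `KafkaError._TIMED_OUT` and `KafkaError._TRANSPORT`, and `classify_consumer_error` is an illustrative helper, not part of the PR.

```python
# Stand-ins for confluent_kafka's KafkaError._TIMED_OUT / KafkaError._TRANSPORT,
# used here only so the triage logic can run without the library installed.
TIMED_OUT = -185
TRANSPORT = -195

def classify_consumer_error(error_code: int) -> str:
    """Return the log level appropriate for a consumer error code."""
    if error_code in (TIMED_OUT, TRANSPORT):
        # Transient: likely network issues or broker unavailability.
        return "warning"
    # Anything else is unexpected and deserves a full traceback.
    return "exception"

print(classify_consumer_error(TIMED_OUT))  # warning
```

Keeping the classification in a small pure function like this makes the warning-vs-exception split unit-testable independently of the poll loop.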
Line range hint `261-267`: **Enhance partition assignment wait logic.**

The current wait logic could be improved to provide better visibility into the partition assignment process and handle timeouts more gracefully.

Consider updating the wait logic as follows:

```diff
 assigned = False
+start_time = time.time()
 while not assigned and wait_time_sec > 0:
-    logger.info("Waiting for partition assignment...")
+    elapsed = int(time.time() - start_time)
+    logger.info(
+        f"Waiting for partition assignment... "
+        f"({elapsed}s elapsed, {wait_time_sec - elapsed}s remaining)"
+    )
     msg = consumer.poll(timeout=1.0)
     if msg is not None and msg.error() is None:
         assigned = True
     wait_time_sec -= 1
+if not assigned:
+    logger.warning(
+        f"Partition assignment wait time ({wait_time_sec}s) expired. "
+        f"This might indicate connectivity issues or broker unavailability."
+    )
```

Don't forget to add the import at the top:

```python
import time
```
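The suggested wait logic can be exercised without a live broker by injecting a fake `poll` callable. Everything in this sketch (`wait_for_assignment`, `FakeMessage`, `flaky_poll`) is illustrative and not from the PR; it only mirrors the loop's shape.

```python
import time

class FakeMessage:
    """Stand-in for a Kafka message whose error() returns None."""
    def error(self):
        return None

def wait_for_assignment(poll, wait_time_sec, log=print):
    """Poll until a clean message arrives or the budget expires.

    `poll` mimics consumer.poll(timeout=...) and is injected for testability.
    """
    assigned = False
    start_time = time.time()
    while not assigned and wait_time_sec > 0:
        elapsed = int(time.time() - start_time)
        log(f"Waiting for partition assignment... ({elapsed}s elapsed)")
        msg = poll(timeout=0.01)
        if msg is not None and msg.error() is None:
            assigned = True
        wait_time_sec -= 1
    if not assigned:
        log("Partition assignment wait expired; broker may be unreachable.")
    return assigned

# A fake poll that succeeds on the third attempt.
attempts = {"n": 0}
def flaky_poll(timeout):
    attempts["n"] += 1
    return FakeMessage() if attempts["n"] >= 3 else None

print(wait_for_assignment(flaky_poll, 10, log=lambda m: None))  # True
```

Passing `poll` and `log` as parameters keeps the loop decoupled from `confluent_kafka`, so both the success and the expiry paths can be unit-tested.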
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- src/workflows/airqo_etl_utils/airqo_utils.py (1 hunks)
- src/workflows/airqo_etl_utils/message_broker_utils.py (1 hunks)
🔇 Additional comments (2)
src/workflows/airqo_etl_utils/airqo_utils.py (2)
`1061-1062`: **Note about consumer wait time configuration**

While these changes look good, I notice that the PR's primary objective of increasing consumer wait time for connection and network setup isn't reflected in this file. This configuration is likely handled in the `MessageBrokerUtils` class.

```shell
#!/bin/bash
# Description: Locate consumer configuration settings
# Test: Search for consumer configuration in MessageBrokerUtils

# Check for consumer configuration
rg -A 10 "consumer.*config" "src/workflows/airqo_etl_utils/message_broker_utils.py"

# Check for timeout settings
rg -A 5 "timeout|wait" "src/workflows/airqo_etl_utils/message_broker_utils.py"
```
`1061-1062`: **Verify Kafka message schema change**

The change from checking `device_id` to `_id` appears correct, and the warning message has been appropriately updated. However, let's verify this change against the Kafka topic schema.

✅ Verification successful

This will help us understand the device identification patterns and verify the schema change from `device_id` to `_id`.

Based on the search results, I can now provide a definitive response about the schema change from `device_id` to `_id`:

**Schema change from `device_id` to `_id` is correct and consistent**

The codebase shows that the change is part of a deliberate mapping where:

- The device's name/identifier is stored as `device_id` in the message
- The device's MongoDB `_id` is used as the unique identifier in the transformed data
- This is evidenced in the code where `device_details["device_id"]` is mapped to `"device"` and `device_details["_id"]` is mapped to `"device_id"` in the transformed structure

The warning message at lines 1061-1062 correctly checks for the presence of `_id` as it's working with the transformed data structure.

🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Description: Verify the Kafka message schema change from device_id to _id
# Test: Search for other occurrences of device_id and _id in the codebase

# Check for device_id usage in message handling
rg -A 5 "device_id.*message"

# Check for _id usage in Kafka related code
rg -A 5 "_id.*kafka"

# Check for device schema definitions
fd -e json -e yaml -e yml | xargs rg -l "device.*schema"
```

Length of output: 2818
Script:
```shell
#!/bin/bash
# Let's search for device-related patterns in the codebase

# Search for device_id usage patterns
rg -A 5 "device_id" src/workflows/airqo_etl_utils/

# Search for _id usage patterns specifically in airqo_utils.py
rg -A 5 "_id" src/workflows/airqo_etl_utils/airqo_utils.py

# Look for device message processing
rg -A 5 "def get_devices" src/workflows/airqo_etl_utils/airqo_utils.py

# Search for device schema or message structure
rg -A 10 "device.*message" src/workflows/airqo_etl_utils/
```

Length of output: 72914
Description
Increase consumer wait time for connection/network setup.
Summary by CodeRabbit
New Features
Bug Fixes
Documentation