Update kafka producer config #3726
Conversation
📝 Walkthrough: The changes in this pull request focus on enhancing the Kafka producer configuration.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (2)
src/workflows/airqo_etl_utils/message_broker_utils.py (2)
324-333: Review Kafka Producer Configuration Parameters

The updated configuration includes parameters like `retries`, `batch.num.messages`, `retry.backoff.ms`, `debug`, and `message.timeout.ms`. This enhances reliability, but please consider the following:

- `retry.backoff.ms`: A value of 80,000 ms (80 seconds) might introduce long delays between retries. Ensure this aligns with your application's requirements.
- `debug`: Setting `debug` to `"msg"` enables verbose logging, which can impact performance. Consider using it only during development or troubleshooting.
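For context, a librdkafka-style configuration touching these keys might look like the sketch below. The values are illustrative only (chosen to avoid the long backoff flagged above), not the PR's actual settings:

```python
# Hypothetical producer configuration sketch; every value here is an
# illustrative assumption, not taken from the project.
producer_config = {
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "retries": 5,                  # resend transiently failed messages
    "batch.num.messages": 1000,    # cap on messages batched per message set
    "retry.backoff.ms": 1000,      # far shorter than the 80 s flagged above
    "message.timeout.ms": 300000,  # total time to deliver before erroring out
    # "debug": "msg",              # verbose message tracing; enable only
    #                              # while troubleshooting
}
# A producer would then be built as confluent_kafka.Producer(producer_config).
```

Keeping `debug` commented out by default avoids the logging overhead in normal runs.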
Line range hint 340-371: Ensure `message_counts` is Updated in All Scenarios

Currently, `message_counts` is incremented only when a `column_key` is provided. If `column_key` is `None` and data is sent in chunks, `message_counts` is not updated, leading to inaccurate message count logging.

Apply this diff to increment `message_counts` in both scenarios:

```diff
 message_counts = 0
 if column_key:
     # Existing code
     message_counts += 1
 else:
     # Existing code
+    message_counts += len(chunk_data)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- src/workflows/airqo_etl_utils/message_broker_utils.py (7 hunks)
🧰 Additional context used
🔇 Additional comments (2)
src/workflows/airqo_etl_utils/message_broker_utils.py (2)
34-39: Configurations Initialized Correctly

The `partition_loads` and `config` dictionaries are appropriately initialized, enhancing the configuration management for the Kafka producer.
300-300: Appropriate Addition of `producer.poll(1.0)`

Adding `producer.poll(1.0)` ensures that the producer processes delivery reports, enhancing error handling and message delivery confirmation.
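As a rough illustration of why polling matters: confluent-kafka queues delivery reports and only invokes a registered callback from `poll()` (or `flush()`). The sketch below simulates that callback contract locally; `delivery_report` and the payloads are hypothetical and no broker is involved:

```python
delivered = []

def delivery_report(err, msg):
    # In real use the client passes a KafkaError (or None) and a Message
    # object; plain values stand in for them here.
    if err is not None:
        delivered.append(("failed", str(err)))
    else:
        delivered.append(("ok", msg))

# poll() would drain the report queue and fire the callback per message;
# we simulate two successful deliveries.
for payload in [b"m1", b"m2"]:
    delivery_report(None, payload)

print(delivered)  # → [('ok', b'm1'), ('ok', b'm2')]
```

Without a periodic `poll()`, such callbacks would simply never run and failures would go unnoticed.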
```python
logger.info("Dataframe info : ")
logger.info(data.info())
logger.info("Dataframe description : ")
logger.info(data.describe())
```
Fix Logging of DataFrame Information

The method `data.info()` prints its summary to the console and returns `None`, so `logger.info(data.info())` will log `None`. `data.describe()` returns a DataFrame, but logging it inline produces an awkward record without a leading newline. Capture the output of `info()` in a buffer and format the `describe()` result on its own lines.
Apply this diff to fix the logging:

```diff
+import io
 logger.info("Dataframe info : ")
-logger.info(data.info())
+buffer = io.StringIO()
+data.info(buf=buffer)
+logger.info(buffer.getvalue())
 logger.info("Dataframe description : ")
-logger.info(data.describe())
+logger.info(f"\n{data.describe()}")
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
import io

logger.info("Dataframe info : ")
buffer = io.StringIO()
data.info(buf=buffer)
logger.info(buffer.getvalue())
logger.info("Dataframe description : ")
logger.info(f"\n{data.describe()}")
```
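The same buffering idea works for any function that prints rather than returns. Here is a stdlib-only sketch (the `print_summary` stand-in is hypothetical) using `contextlib.redirect_stdout`, a general fallback for functions that lack a `buf=` parameter:

```python
import io
import contextlib

def print_summary():
    # Stand-in for data.info(): writes to stdout and returns None.
    print("rows: 10, columns: 3")

buffer = io.StringIO()
with contextlib.redirect_stdout(buffer):
    print_summary()

captured = buffer.getvalue()
print(captured.strip())  # → rows: 10, columns: 3
```

When the function does accept a buffer (as `DataFrame.info(buf=...)` does), passing it directly is simpler than redirecting stdout.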
```python
self.config.update(
    {
        "group.id": group_id,
        "auto.offset.reset": auto_offset_reset,
        "enable.auto.commit": "true" if auto_commit else "false",
    }
)
```

```diff
-consumer = Consumer(self.conf)
+consumer = Consumer(self.config)
```
Avoid Modifying Shared `self.config` for Consumer Settings

Updating `self.config` with consumer-specific parameters can unintentionally affect producer configurations elsewhere in the class. Consider using a separate configuration for the consumer:
```diff
 def consume_from_topic(
     self,
     # existing parameters
 ):
-    self.config.update(
-        {
-            "group.id": group_id,
-            "auto.offset.reset": auto_offset_reset,
-            "enable.auto.commit": "true" if auto_commit else "false",
-        }
-    )
+    consumer_config = self.config.copy()
+    consumer_config.update(
+        {
+            "group.id": group_id,
+            "auto.offset.reset": auto_offset_reset,
+            "enable.auto.commit": "true" if auto_commit else "false",
+        }
+    )
-    consumer = Consumer(self.config)
+    consumer = Consumer(consumer_config)
```

This ensures that producer configurations remain unaffected.
📝 Committable suggestion

```python
consumer_config = self.config.copy()
consumer_config.update(
    {
        "group.id": group_id,
        "auto.offset.reset": auto_offset_reset,
        "enable.auto.commit": "true" if auto_commit else "false",
    }
)
consumer = Consumer(consumer_config)
```
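The isolation this suggestion buys can be demonstrated with plain dictionaries (values below are hypothetical, not the project's settings):

```python
# A shallow copy keeps consumer-only keys out of the shared config.
shared_config = {"bootstrap.servers": "localhost:9092"}

consumer_config = shared_config.copy()
consumer_config.update({"group.id": "etl-group", "enable.auto.commit": "false"})

print("group.id" in shared_config)   # → False (producer config untouched)
print(consumer_config["group.id"])   # → etl-group
```

Because `dict.copy()` is shallow, this is safe for flat string-valued configs like these; nested values would still be shared.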
thanks @NicholasTurner23
Description
Improve Kafka producer persistence by adding retries and other configuration options.
Summary by CodeRabbit

- New Features
- Bug Fixes
- Documentation