Update fix/clean up #4184
Conversation
…moved, filling is done more accurately too.
Updates from airqo staging
📝 Walkthrough

This pull request introduces refinements to several utility classes and workflows in the AirQo data processing pipeline. The changes focus on improving error handling, data extraction flexibility, and task retry mechanisms across multiple Python files. The modifications enhance the robustness of data processing by adding more nuanced logging, introducing optional filtering parameters, and implementing automatic retry logic for critical tasks.
Sequence Diagram

```mermaid
sequenceDiagram
    participant DAG as Airflow DAG
    participant Extractor as Data Extractor
    participant BigQuery as BigQuery API
    participant Cleaner as Data Cleaner
    DAG->>Extractor: Extract Data
    Extractor->>BigQuery: Query with Optional Filters
    BigQuery-->>Extractor: Return Data
    Extractor->>Cleaner: Remove Duplicates
    Cleaner-->>Extractor: Cleaned Data
    Extractor->>DAG: Return Processed Data
```
Actionable comments posted: 0
🧹 Nitpick comments (3)
src/workflows/airqo_etl_utils/airnow_utils.py (1)
105-105: Consider using warning level logging for missing BAM data.

While changing from raising an exception to logging is good for handling expected cases, using `logger.warning()` instead of `logger.info()` would better indicate that missing BAM data might need attention, especially if data is expected for the specified date range.

```diff
- logger.info("No BAM data found for the specified date range.")
+ logger.warning("No BAM data found for the specified date range.")
```

src/workflows/airqo_etl_utils/airqo_utils.py (2)
127-161: Consider using a set for better readability.

The list of non-essential columns could be more concisely defined using a set.

```diff
- non_essential_cols = [
-     col
-     for col in data.columns
-     if col not in ["timestamp", "device_id", "device_number", "site_id"]
- ]
+ essential_cols = {"timestamp", "device_id", "device_number", "site_id"}
+ non_essential_cols = [col for col in data.columns if col not in essential_cols]
```
163-174: Consider adding progress logging for large datasets.

When processing large datasets, it would be helpful to log progress during the group-by operations.

```diff
  filled_duplicates = []
+ total_groups = len(duplicates.groupby("site_id"))
+ processed_groups = 0
  for _, group in duplicates.groupby("site_id"):
+     processed_groups += 1
+     if processed_groups % 100 == 0:  # Log every 100 groups
+         logger.info(f"Processing duplicates: {processed_groups}/{total_groups} groups completed")
      group = group.sort_values(by=["device_id", "timestamp"])
      group[columns_to_fill] = (
          group[columns_to_fill].fillna(method="ffill").fillna(method="bfill")
      )
      group = group.drop_duplicates(
          subset=["device_id", "timestamp"], keep="first"
      )
      filled_duplicates.append(group)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
src/workflows/airqo_etl_utils/airnow_utils.py (1 hunks)
src/workflows/airqo_etl_utils/airqo_utils.py (1 hunks)
src/workflows/airqo_etl_utils/bigquery_api.py (1 hunks)
src/workflows/dags/airqo_measurements.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (javascript)
🔇 Additional comments (6)
src/workflows/dags/airqo_measurements.py (2)
218-224: LGTM! Improved error handling with retry logic and optimized timeframe.

The changes enhance robustness and efficiency:
- Added retry mechanism (3 retries with 5-minute delays) to handle transient failures
- Reduced timeframe from 14 days to 1 day to optimize processing load

Also applies to: 232-238
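A minimal sketch of how such a retry configuration typically looks with the Airflow TaskFlow API; the task names and bodies are illustrative assumptions, not the PR's actual code.

```python
from datetime import timedelta

from airflow.decorators import task


@task(retries=3, retry_delay=timedelta(minutes=5))
def extract_raw_data(start_date_time: str, end_date_time: str):
    # On transient failures (network errors, BigQuery timeouts), Airflow
    # re-runs this task up to 3 times, waiting 5 minutes between attempts.
    ...


@task()
def transform_data(data):
    # Pure computation: no external calls, so no retries are configured.
    ...
```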
Line range hint 258-274: Verify the retry configuration across tasks.

The retry configuration (3 retries, 5-minute delays) has been consistently applied to data extraction and loading tasks. However, let's verify whether similar retry logic should be applied to other tasks in the DAG for consistency.

✅ Verification successful: retry configuration is appropriately implemented based on task responsibilities.

The retry configuration has been correctly applied to tasks that interact with external services (data extraction, API calls, database operations), while pure computational tasks appropriately omit retry logic. This pattern is consistent and follows best practices for task resilience.
🏁 Scripts executed

The following scripts were executed for the analysis:

```bash
#!/bin/bash
# Search for @task decorators without retry configuration
rg "@task\(" --type py | grep -v "retries="
```

Length of output: 10968

```bash
#!/bin/bash
# Search for task implementations in airqo_measurements.py that use external services
rg -A 5 "@task" src/workflows/dags/airqo_measurements.py
```

Length of output: 9395
src/workflows/airqo_etl_utils/bigquery_api.py (1)
570-571: LGTM! Improved network filtering logic.

The explicit check for `network != "all"` provides clearer control over network filtering in SQL queries. This change aligns well with the network-based filtering enhancements across the codebase.
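A minimal sketch of the guard described above, assuming a string-building query helper; the function and parameter names are illustrative, not the actual bigquery_api.py code.

```python
from typing import Optional


def compose_query(table: str, network: Optional[str] = None) -> str:
    query = f"SELECT * FROM `{table}` WHERE timestamp BETWEEN @start AND @end"
    if network and network != "all":
        # Constrain by network only when a specific one is requested;
        # "all" (or None) leaves the query unfiltered across networks.
        query += f" AND network = '{network}'"
    return query
```

src/workflows/airqo_etl_utils/airqo_utils.py (3)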
69-89: LGTM! Well-documented method signature with clear type hints.

The addition of the optional `device_network` parameter and comprehensive docstring improves code clarity and maintainability.
92-96: LGTM! Improved table selection logic.

Using a dictionary for table mapping is more maintainable and less error-prone than conditional statements.
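A minimal sketch of the dictionary-based table selection pattern; the data types and table IDs here are illustrative assumptions, not the actual mapping in airqo_utils.py.

```python
TABLE_MAP = {
    "raw": "project.dataset.raw_device_measurements",
    "averaged": "project.dataset.hourly_device_measurements",
}


def resolve_table(data_type: str) -> str:
    # A dict lookup replaces an if/elif chain and fails loudly on
    # unsupported inputs instead of silently falling through.
    try:
        return TABLE_MAP[data_type]
    except KeyError:
        raise ValueError(f"Unsupported data_type: {data_type!r}") from None
```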
108-124: LGTM! Comprehensive docstring for the remove_duplicates method.

The detailed docstring clearly explains the method's purpose, steps, and expectations.
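A condensed sketch of the deduplication flow the docstring describes, assuming pandas; it mirrors the fill-then-drop logic shown in the nitpick diff above, but the function boundary and exact column handling are illustrative.

```python
import pandas as pd


def remove_duplicates(data: pd.DataFrame, columns_to_fill: list[str]) -> pd.DataFrame:
    key = ["device_id", "timestamp"]
    # Rows sharing a (device_id, timestamp) pair are candidates for merging.
    dup_mask = data.duplicated(subset=key, keep=False)
    unique, duplicates = data[~dup_mask], data[dup_mask].copy()

    filled = []
    for _, group in duplicates.groupby("site_id"):
        group = group.sort_values(by=key)
        # Fill gaps from neighbouring rows, then keep one row per key pair.
        group[columns_to_fill] = group[columns_to_fill].ffill().bfill()
        filled.append(group.drop_duplicates(subset=key, keep="first"))

    return pd.concat([unique, *filled], ignore_index=True)
```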
Description
This PR does the following:
Related Issues
Summary by CodeRabbit
Bug Fixes
Chores
Refactor