Update fix/consolidated data cleanup #4214
Conversation
📝 Walkthrough

The pull request introduces significant refactoring of the data extraction utilities across multiple files. The changes primarily focus on simplifying data extraction methods, removing redundant code, and enhancing type safety. Key modifications include the removal of the bam_raw_measurements.json schema file.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/workflows/airqo_etl_utils/data_warehouse_utils.py (2 hunks)
- src/workflows/airqo_etl_utils/schema/bam_raw_measurements.json (0 hunks)
- src/workflows/airqo_etl_utils/weather_data_utils.py (1 hunks)
- src/workflows/dags/data_warehouse.py (10 hunks)
💤 Files with no reviewable changes (1)
- src/workflows/airqo_etl_utils/schema/bam_raw_measurements.json
🧰 Additional context used
🪛 Ruff (0.8.2)
src/workflows/dags/data_warehouse.py
56-56: Local variable `data` is assigned to but never used. Remove assignment to unused variable `data`. (F841)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Analyze (javascript)
🔇 Additional comments (6)
src/workflows/airqo_etl_utils/data_warehouse_utils.py (2)
56-61: Well-structured refactoring of data extraction! The consolidation of data extraction logic using `DataUtils.extract_data_from_bigquery` improves code maintainability and consistency.
87-87: Great enhancement to type safety! The change from string to the `DeviceNetwork` enum improves type safety and makes the API more explicit, helping prevent runtime errors from invalid network values. A minimal illustration follows.
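As a rough sketch of the benefit (the enum members and function below are hypothetical illustrations, not the project's actual definitions):

```python
from enum import Enum


class DeviceNetwork(Enum):
    # Hypothetical members for illustration; the real enum lives in the ETL utils.
    AIRQO = "airqo"
    METONE = "metone"


def extract_devices(network: DeviceNetwork) -> str:
    # With a plain str parameter, a typo like "ariqo" fails only at query time;
    # an enum-typed parameter surfaces the mistake at the call site.
    return f"querying devices on network={network.value}"


print(extract_devices(DeviceNetwork.AIRQO))
```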
src/workflows/airqo_etl_utils/weather_data_utils.py (1)

49-49: Clean simplification of return logic! The removal of unnecessary DataFrame construction for empty results simplifies the code while maintaining the required functionality; a sketch of the pattern appears below.
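A rough before/after sketch of this kind of simplification (the original code is not shown in the review, so the "before" shape here is an assumption):

```python
import pandas as pd


def to_weather_frame(records: list) -> pd.DataFrame:
    # Before (assumed shape): an empty frame was constructed explicitly.
    #     if not records:
    #         return pd.DataFrame(columns=["timestamp", "value"])
    #     return pd.DataFrame(records)
    # After: pd.DataFrame([]) already yields an empty frame, so one path suffices.
    return pd.DataFrame(records)


print(to_weather_frame([]).empty)  # True
```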
src/workflows/dags/data_warehouse.py (3)

190-194: Excellent improvement to task resilience! The addition of retry configurations with appropriate delays enhances the robustness of the DAG tasks. The longer delay (10 minutes) for certain tasks suggests careful consideration of potential transient failures; a sketch of the pattern follows below.
Also applies to: 207-211, 224-228, 241-244
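A minimal sketch of retry settings of this shape on an Airflow task (the task name and exact values are placeholders, not the DAG's actual code):

```python
from datetime import timedelta

from airflow.decorators import task


@task(
    retries=3,  # re-run the task a few times before failing the DAG run
    retry_delay=timedelta(minutes=10),  # longer delay to ride out transient outages
)
def extract_hourly_data(**context):
    ...
```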
137-142: Consistent implementation of data extraction! The use of `DataUtils.extract_data_from_bigquery` aligns well with the codebase's consolidated approach to data extraction.
316-321: Maintains consistency in data extraction approach! The implementation follows the same pattern as other data extraction tasks, contributing to a more maintainable codebase.
```python
data = DataUtils.extract_data_from_bigquery(
    DataType.AVERAGED,
    start_date_time=start_date_time,
    end_date_time=end_date_time,
    frequency=Frequency.HOURLY,
    device_category=DeviceCategory.BAM,
)
```
Remove or utilize the unused data extraction.
The data extracted using `DataUtils.extract_data_from_bigquery` is stored in the `data` variable but never used. This could lead to unnecessary computation and resource usage. Either remove the unused extraction or update the return statement to use this data:
```diff
-data = DataUtils.extract_data_from_bigquery(
-    DataType.AVERAGED,
-    start_date_time=start_date_time,
-    end_date_time=end_date_time,
-    frequency=Frequency.HOURLY,
-    device_category=DeviceCategory.BAM,
-)
-
 return DataWarehouseUtils.extract_hourly_weather_data(
     start_date_time=start_date_time, end_date_time=end_date_time
 )
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
return DataWarehouseUtils.extract_hourly_weather_data(
    start_date_time=start_date_time, end_date_time=end_date_time
)
```
🧰 Tools
🪛 Ruff (0.8.2)
56-56: Local variable `data` is assigned to but never used. Remove assignment to unused variable `data`. (F841)
Actionable comments posted: 2
🧹 Nitpick comments (3)
src/workflows/airqo_etl_utils/datautils.py (1)
128-142: LGTM! Error handling improvements look good. The changes enhance error handling and improve the code structure. The addition of the actual error message in the exception logging will help with debugging.
Consider adding error recovery or retry logic for transient BigQuery API errors:

```diff
 try:
     datasource = configuration.DataSource
     table = datasource.get(datatype).get(device_category).get(frequency)
     cols = bigquery.get_columns(table=table)
+except ConnectionError as e:
+    logger.warning(f"Transient BigQuery API error: {e}. Retrying...")
+    time.sleep(1)  # Add retry logic here
+    cols = bigquery.get_columns(table=table)
 except KeyError:
     logger.exception(
         f"Invalid combination: {datatype}, {device_category}, {frequency}"
```

src/workflows/airqo_etl_utils/config.py (1)
Line range hint 1-524: Consider enhancing the configuration structure. While the current implementation is functional, consider these improvements for better maintainability and type safety:
- Add type hints to the configuration class
- Add docstrings explaining the configuration structure
- Implement configuration validation

Example implementation:

```python
from typing import Dict, TypeVar
from dataclasses import dataclass

T = TypeVar("T", str, Dict)


@dataclass
class Config:
    """Configuration class for AirQo ETL utilities.

    Attributes:
        DataSource (Dict): Mapping of data types, device categories, and frequencies to BigQuery tables
        SCHEMA_FILE_MAPPING (Dict): Mapping of BigQuery tables to their schema files
    """

    def __post_init__(self):
        """Validate configuration values."""
        self._validate_config()

    def _validate_config(self) -> None:
        """Validate required configuration values."""
        required_env_vars = ["GOOGLE_APPLICATION_CREDENTIALS", "GOOGLE_CLOUD_PROJECT_ID"]
        missing_vars = [var for var in required_env_vars if not getattr(self, var, None)]
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
```

src/workflows/airqo_etl_utils/bigquery_api.py (1)
647-647: Address memory optimization concern. The comment questions why the DataFrame is being copied and suggests potential memory wastage. The copy appears unnecessary since we're only performing timestamp conversion.
Consider this memory-efficient approach:

```diff
-data = dataframe.copy()  # Not sure why this dataframe is being copied. # Memory wastage?
-data["timestamp"] = pd.to_datetime(data["timestamp"])
+# Convert timestamp without copying the entire DataFrame
+timestamps = pd.to_datetime(dataframe["timestamp"])
 try:
-    start_date_time = date_to_str(data["timestamp"].min())
-    end_date_time = date_to_str(data["timestamp"].max())
+    start_date_time = date_to_str(timestamps.min())
+    end_date_time = date_to_str(timestamps.max())
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/workflows/airqo_etl_utils/bigquery_api.py (1 hunks)
- src/workflows/airqo_etl_utils/config.py (1 hunks)
- src/workflows/airqo_etl_utils/datautils.py (1 hunks)
- src/workflows/dags/airqo_measurements.py (3 hunks)
🧰 Additional context used
📓 Learnings (1)
src/workflows/dags/airqo_measurements.py (4)
Learnt from: AnthonyByansi
PR: airqo-platform/AirQo-api#3262
File: src/workflows/dags/airqo_measurements.py:0-0
Timestamp: 2024-11-12T09:15:21.461Z
Learning: The dataset name `AirQodataset` was corrected to `AirQo-dataset` in multiple places within the `airqo_measurements.py` file to ensure consistency.
Learnt from: AnthonyByansi
PR: airqo-platform/AirQo-api#3262
File: src/workflows/dags/airqo_measurements.py:0-0
Timestamp: 2024-11-12T09:15:21.461Z
Learning: Ensure that dataset names are consistently used as `AirQo-dataset` in the AirQo API codebase.
Learnt from: AnthonyByansi
PR: airqo-platform/AirQo-api#3262
File: src/workflows/dags/airqo_measurements.py:113-122
Timestamp: 2024-11-12T09:15:21.461Z
Learning: The dataset name `AirQo-dataset` should be used consistently in the Great Expectations tasks within Airflow DAGs.
Learnt from: AnthonyByansi
PR: airqo-platform/AirQo-api#3262
File: src/workflows/dags/airqo_measurements.py:0-0
Timestamp: 2024-11-12T09:15:21.461Z
Learning: The dataset name should be `AirQo-dataset` for consistency in the codebase.
🪛 Ruff (0.8.2)
src/workflows/dags/airqo_measurements.py
491-491: `airqo_etl_utils.airqo_utils.AirQoDataUtils` imported but unused. Remove unused import: `airqo_etl_utils.airqo_utils.AirQoDataUtils`. (F401)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (javascript)
🔇 Additional comments (4)
src/workflows/dags/airqo_measurements.py (2)
441-445: LGTM! Parameter naming is now more consistent. The renaming of the parameter from `airqo_data` to `data` improves consistency with other methods in the codebase.
Line range hint 476-495: LGTM! Parameter naming consistency maintained. The parameter renaming aligns with the codebase's naming conventions while preserving the existing functionality.
🧰 Tools
🪛 Ruff (0.8.2)
491-491: `airqo_etl_utils.airqo_utils.AirQoDataUtils` imported but unused. Remove unused import: `airqo_etl_utils.airqo_utils.AirQoDataUtils`. (F401)
src/workflows/airqo_etl_utils/config.py (1)
403-403: LGTM! Configuration update aligns with data cleanup objectives. The change from RAW to HOURLY frequency for weather data processing is consistent with the consolidated data cleanup effort. However, it would be beneficial to verify that all dependent systems are prepared to handle hourly weather data.
Let's verify the impact on dependent systems:
✅ Verification successful
Verified: Weather frequency change is safe and well-structured
The change from RAW to HOURLY frequency for weather data is safe because:
- Both frequencies use the same schema ("weather_data.json")
- No hardcoded dependencies on RAW frequency exist in the codebase
- The BigQuery API layer is frequency-agnostic, accessing tables through configuration (see the sketch after this list)
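In miniature, that configuration-driven lookup behaves like the sketch below; the table names are placeholders, and the nested-mapping shape simply mirrors the `configuration.DataSource` access visible in the datautils diff above:

```python
from enum import Enum


class Frequency(Enum):
    RAW = "raw"
    HOURLY = "hourly"


# Placeholder mapping; only the nesting pattern matters here.
DATA_SOURCE = {
    "weather": {
        Frequency.RAW: "project.dataset.raw_weather_data",
        Frequency.HOURLY: "project.dataset.hourly_weather_data",
    }
}


def weather_table(frequency: Frequency) -> str:
    # Flipping RAW -> HOURLY changes only which table the lookup returns;
    # the query layer itself never hardcodes a frequency.
    return DATA_SOURCE["weather"][frequency]


print(weather_table(Frequency.HOURLY))
```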
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Check for direct usage of weather data frequency in the codebase
# Expected results: No direct dependencies on raw weather data frequency

# Search for weather data processing patterns
rg -l "DataType.AVERAGED.*DeviceCategory.WEATHER"

# Search for direct references to weather tables
rg -l "BIGQUERY_(RAW|HOURLY)_WEATHER_TABLE"
```

Length of output: 183
Script:
```bash
#!/bin/bash
# Check how bigquery_api.py handles weather data
rg -A 5 -B 5 "WEATHER" src/workflows/airqo_etl_utils/bigquery_api.py

# Look for any RAW frequency references specifically with weather
rg -A 3 -B 3 "RAW.*WEATHER|WEATHER.*RAW" --multiline
```

Length of output: 4126
src/workflows/airqo_etl_utils/bigquery_api.py (1)
Line range hint 789-789: Document the `fetch_raw_readings` method and review the frequency. The method lacks documentation and has a TODO comment about reviewing the frequency. This could impact data quality and system performance; a documentation sketch follows below.
Let's analyze the query frequency patterns:
Would you like me to:
- Generate comprehensive documentation for the method?
- Propose an optimal frequency based on the codebase analysis?
Also applies to: 793-793
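As a starting point, the requested documentation might look like the sketch below; the signature and parameter names are assumptions, since only the method name `fetch_raw_readings` appears in this review:

```python
import pandas as pd


class BigQueryApi:
    # Stub class; only the docstring shape below is the point of this sketch.
    def fetch_raw_readings(self, start_date_time: str, end_date_time: str) -> pd.DataFrame:
        """Fetch raw device readings from BigQuery for the given time window.

        Args:
            start_date_time: ISO-8601 start of the query window (inclusive).
            end_date_time: ISO-8601 end of the query window (exclusive).

        Returns:
            A DataFrame of raw readings; empty if no rows fall in the window.
        """
        return pd.DataFrame()  # body elided in this sketch
```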
```diff
 def send_raw_measurements_to_bigquery(data: pd.DataFrame):
     from airqo_etl_utils.airqo_utils import AirQoDataUtils
     from airqo_etl_utils.bigquery_api import BigQueryApi

     data = DataUtils.format_data_for_bigquery(
-        airqo_data, DataType.RAW, DeviceCategory.GENERAL, Frequency.RAW
+        data, DataType.RAW, DeviceCategory.GENERAL, Frequency.RAW
```
🛠️ Refactor suggestion
Remove unused import. The `AirQoDataUtils` import is not used in this method and should be removed.
Apply this diff to remove the unused import:
```diff
-from airqo_etl_utils.airqo_utils import AirQoDataUtils
 from airqo_etl_utils.bigquery_api import BigQueryApi
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def send_raw_measurements_to_bigquery(data: pd.DataFrame):
    from airqo_etl_utils.bigquery_api import BigQueryApi

    data = DataUtils.format_data_for_bigquery(
        data, DataType.RAW, DeviceCategory.GENERAL, Frequency.RAW
```
🧰 Tools
🪛 Ruff (0.8.2)
491-491: `airqo_etl_utils.airqo_utils.AirQoDataUtils` imported but unused. Remove unused import: `airqo_etl_utils.airqo_utils.AirQoDataUtils`. (F401)
```python
try:
    start_date_time = date_to_str(data["timestamp"].min())
    end_date_time = date_to_str(data["timestamp"].max())
except Exception as e:
    logger.exception(f"Time conversion error {e}")
```
🛠️ Refactor suggestion
Good addition of error handling, but consider enhancing the error recovery strategy.
The addition of error handling for timestamp conversion is a good practice. However, the current implementation continues execution after logging the error, which could lead to undefined behavior since `start_date_time` and `end_date_time` might be undefined after an exception.
Consider this enhanced implementation:
```diff
 try:
     start_date_time = date_to_str(data["timestamp"].min())
     end_date_time = date_to_str(data["timestamp"].max())
 except Exception as e:
-    logger.exception(f"Time conversion error {e}")
+    logger.exception(f"Time conversion error: {e}")
+    raise ValueError("Failed to determine date range from timestamp data") from e
```
This change:
- Provides more context in the error message
- Prevents silent failures by re-raising the exception
- Maintains the exception chain using `raise ... from e`
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
try:
    start_date_time = date_to_str(data["timestamp"].min())
    end_date_time = date_to_str(data["timestamp"].max())
except Exception as e:
    logger.exception(f"Time conversion error: {e}")
    raise ValueError("Failed to determine date range from timestamp data") from e
```
Description
Just some cleanup.
Summary by CodeRabbit
- Data Extraction
- Schema Changes
- Error Handling
- Performance