-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update fix/consolidated data cleanup #4203
Update fix/consolidated data cleanup #4203
Conversation
📝 WalkthroughWalkthroughThis pull request introduces significant refactoring of data extraction and processing utilities in the AirQo ETL workflows. The changes primarily involve consolidating data handling methods from Changes
Possibly related PRs
Suggested Labels
Suggested Reviewers
Poem
Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (3)
src/workflows/airqo_etl_utils/datautils.py (2)
105-110
: Add type hints to the static method decorator.The method is missing the @staticmethod decorator and return type hint.
Apply this diff:
+ @staticmethod def format_data_for_bigquery( data: pd.DataFrame, datatype: DataType, device_category: DeviceCategory, frequency: Frequency, - ) -> pd.DataFrame: + ) -> pd.DataFrame:
Line range hint
242-243
: Address the TODO comment and consider moving parameter mappings to configuration.The TODO comment about cleaning up the code should be addressed, and the parameter mappings could be moved to the configuration file for better maintainability.
Would you like me to:
- Generate a cleaner implementation of this method?
- Create a configuration structure for the parameter mappings?
- Open a GitHub issue to track this cleanup task?
src/workflows/airqo_etl_utils/config.py (1)
389-410
: Add docstring to explain the DataSource configuration structure.The DataSource dictionary would benefit from documentation explaining its structure and usage.
Add this docstring above the DataSource dictionary:
+ """ + Mapping of data types to their corresponding BigQuery tables. + + Structure: + DataSource = { + DataType: { + DeviceCategory: { + Frequency: table_name + } + } + } + """ DataSource = { DataType.RAW: { DeviceCategory.GENERAL: {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
src/workflows/airqo_etl_utils/airqo_utils.py
(1 hunks)src/workflows/airqo_etl_utils/config.py
(2 hunks)src/workflows/airqo_etl_utils/datautils.py
(4 hunks)src/workflows/dags/airqo_bam_measurements.py
(5 hunks)src/workflows/dags/airqo_measurements.py
(7 hunks)src/workflows/dags/daily_measurements.py
(4 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
src/workflows/airqo_etl_utils/datautils.py
82-82: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
86-86: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
133-133: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
137-137: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
src/workflows/dags/airqo_measurements.py
483-483: Undefined name data
(F821)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Analyze (javascript)
🔇 Additional comments (10)
src/workflows/airqo_etl_utils/airqo_utils.py (1)
473-473
: Ensure consistency in table references for BAM data.The table reference in the
clean_bam_data
method has been updated tobam_hourly_measurements_table
. Please verify that all occurrences ofbam_measurements_table
have been updated across the codebase to maintain consistency.Run the following script to check for any remaining references to the old table name:
src/workflows/dags/daily_measurements.py (3)
38-42
: Update data extraction to useDataUtils
with device category parameter.The
extract
function now utilizesDataUtils.extract_data_from_bigquery
withdevice_category
set toDeviceCategory.GENERAL
. This change aligns with the refactored data utilities and ensures proper categorization of devices during data extraction.
74-78
: Duplicate of previous comment regardingDataUtils
usage.
113-117
: Duplicate of previous comment regardingDataUtils
usage.src/workflows/dags/airqo_bam_measurements.py (6)
41-42
: Switch toDataUtils.format_data_for_bigquery
for data formatting.The code now uses
DataUtils.format_data_for_bigquery
to format data before loading into BigQuery, providing consistency across data processing tasks.
56-57
: Duplicate of previous comment regardingDataUtils
usage.
103-104
: Duplicate of previous comment regardingDataUtils
usage.
123-124
: Duplicate of previous comment regardingDataUtils
usage.
129-129
: Ensure consistency in BAM table references.The table reference has been updated to
bam_hourly_measurements_table
. Please verify that all instances ofbam_measurements_table
have been updated throughout the codebase to maintain consistency.Run the following script to find any remaining references to the old table name:
62-62
: Ensure consistency in BAM table references.The table reference has been updated to
bam_hourly_measurements_table
. Please verify that all instances ofbam_measurements_table
have been updated throughout the codebase to maintain consistency.Run the following script to find any remaining references to the old table name:
try: | ||
source = configuration.DataSource.get(datatype) | ||
table = source.get(device_category).get(frequency) | ||
except KeyError as e: | ||
logger.exception( | ||
f"Invalid combination: {datatype}, {device_category}, {frequency}" | ||
) | ||
except Exception as e: | ||
logger.exception("An unexpected error occurred during column retrieval") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance error handling by including original error messages.
The current error handling loses valuable context by not including the original error message in the logs.
Apply this diff to improve error handling:
try:
source = configuration.DataSource.get(datatype)
table = source.get(device_category).get(frequency)
except KeyError as e:
logger.exception(
- f"Invalid combination: {datatype}, {device_category}, {frequency}"
+ f"Invalid combination: {datatype}, {device_category}, {frequency}. Error: {str(e)}"
)
+ raise
except Exception as e:
logger.exception("An unexpected error occurred during column retrieval")
+ raise
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
try: | |
source = configuration.DataSource.get(datatype) | |
table = source.get(device_category).get(frequency) | |
except KeyError as e: | |
logger.exception( | |
f"Invalid combination: {datatype}, {device_category}, {frequency}" | |
) | |
except Exception as e: | |
logger.exception("An unexpected error occurred during column retrieval") | |
try: | |
source = configuration.DataSource.get(datatype) | |
table = source.get(device_category).get(frequency) | |
except KeyError as e: | |
logger.exception( | |
f"Invalid combination: {datatype}, {device_category}, {frequency}. Error: {str(e)}" | |
) | |
raise | |
except Exception as e: | |
logger.exception("An unexpected error occurred during column retrieval") | |
raise |
🧰 Tools
🪛 Ruff (0.8.2)
82-82: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
86-86: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
try: | ||
datasource = configuration.DataSource | ||
cols = datasource.get(datatype).get(device_category).get(frequency) | ||
except KeyError as e: | ||
logger.exception( | ||
f"Invalid combination: {datatype}, {device_category}, {frequency}" | ||
) | ||
except Exception as e: | ||
logger.exception("An unexpected error occurred during column retrieval") | ||
|
||
return Utils.populate_missing_columns(data=data, columns=cols) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initialize cols variable and improve error handling.
The cols variable could be undefined if an exception occurs, and the error handling could be more informative.
Apply this diff:
+ cols = []
try:
datasource = configuration.DataSource
cols = datasource.get(datatype).get(device_category).get(frequency)
except KeyError as e:
logger.exception(
- f"Invalid combination: {datatype}, {device_category}, {frequency}"
+ f"Invalid combination: {datatype}, {device_category}, {frequency}. Error: {str(e)}"
)
+ raise
except Exception as e:
logger.exception("An unexpected error occurred during column retrieval")
+ raise
return Utils.populate_missing_columns(data=data, columns=cols)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
try: | |
datasource = configuration.DataSource | |
cols = datasource.get(datatype).get(device_category).get(frequency) | |
except KeyError as e: | |
logger.exception( | |
f"Invalid combination: {datatype}, {device_category}, {frequency}" | |
) | |
except Exception as e: | |
logger.exception("An unexpected error occurred during column retrieval") | |
return Utils.populate_missing_columns(data=data, columns=cols) | |
cols = [] | |
try: | |
datasource = configuration.DataSource | |
cols = datasource.get(datatype).get(device_category).get(frequency) | |
except KeyError as e: | |
logger.exception( | |
f"Invalid combination: {datatype}, {device_category}, {frequency}. Error: {str(e)}" | |
) | |
raise | |
except Exception as e: | |
logger.exception("An unexpected error occurred during column retrieval") | |
raise | |
return Utils.populate_missing_columns(data=data, columns=cols) |
🧰 Tools
🪛 Ruff (0.8.2)
133-133: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
137-137: Local variable e
is assigned to but never used
Remove assignment to unused variable e
(F841)
data = DataUtils.format_data_for_bigquery( | ||
data, DataType.AVERAGED, DeviceCategory.GENERAL, Frequency.HOURLY | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix undefined variable in send_hourly_measurements_to_bigquery.
The variable 'data' is undefined. It should be 'airqo_data' based on the method parameter.
Apply this diff:
- data = DataUtils.format_data_for_bigquery(
- data, DataType.AVERAGED, DeviceCategory.GENERAL, Frequency.HOURLY
+ data = DataUtils.format_data_for_bigquery(
+ airqo_data, DataType.AVERAGED, DeviceCategory.GENERAL, Frequency.HOURLY
)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
data = DataUtils.format_data_for_bigquery( | |
data, DataType.AVERAGED, DeviceCategory.GENERAL, Frequency.HOURLY | |
) | |
data = DataUtils.format_data_for_bigquery( | |
airqo_data, DataType.AVERAGED, DeviceCategory.GENERAL, Frequency.HOURLY | |
) |
🧰 Tools
🪛 Ruff (0.8.2)
483-483: Undefined name data
(F821)
Description
This PR removes redundant code by centralizing data functionality in a single file.
Related Issues
Summary by CodeRabbit
Release Notes
Refactor
AirQoDataUtils
with a newDataUtils
class across multiple workflowsBug Fixes
New Features
DataSource
dictionary