Refactor fix update/code clean up #4042
Conversation
Updates from airqo staging
📝 Walkthrough

The changes in this pull request involve significant modifications across multiple files.

Changes
Possibly related PRs
Suggested labels
Suggested reviewers
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (7)
src/workflows/airqo_etl_utils/airqo_utils.py (5)
746-746: Ensure the uniqueness of `device_id` in the device lookup.

While creating the `device_lookup` dictionary, it's crucial to ensure that the `device_id` values are unique across all devices. If there are duplicate `device_id`s, later retrievals may not point to the intended device details.

To safeguard against potential duplicates, consider adding a check or handling mechanism:
```python
device_lookup = {}
for device in devices:
    device_id = device.get("device_id")
    if device_id:
        if device_id in device_lookup:
            logger.warning(f"Duplicate device_id found: {device_id}")
        device_lookup[device_id] = device
```
Line range hint 833-846: Handle potential missing `weather_stations` in site data.

When iterating over `site.get("weather_stations", [])`, ensure that `weather_stations` is indeed a list to prevent possible `TypeError` exceptions if `None` or unexpected types are encountered.

To enhance robustness, you can safeguard the iteration:
```python
weather_stations = site.get("weather_stations") or []
for station in weather_stations:
    ...  # proceed with processing
```
895-895: Correct typo in comment for clarity.

There's a typographical error in the comment which might cause confusion.

Apply this diff to correct the typo:
```diff
- # Raws with more than 1 numeric values
+ # Rows with more than 1 numeric value
```
Line range hint 1074-1092: Clarify the intention behind the model assignment logic.

The comments and code suggest possible confusion regarding the fallback mechanism for calibration models.

To improve readability and maintainability, consider refactoring the code to make the logic explicit:
```python
# Load default models outside the loop
default_rf_model = GCSUtils.get_trained_model_from_gcs(...)
default_lasso_model = GCSUtils.get_trained_model_from_gcs(...)

for city, group in grouped_df:
    if str(city).lower() in [c.value.lower() for c in CityModel]:
        try:
            rf_model = GCSUtils.get_trained_model_from_gcs(...)
            lasso_model = GCSUtils.get_trained_model_from_gcs(...)
        except Exception as ex:
            logger.exception(f"Error getting model for city {city}: {ex}")
            rf_model = default_rf_model
            lasso_model = default_lasso_model
    else:
        rf_model = default_rf_model
        lasso_model = default_lasso_model
    # Proceed with calibration using rf_model and lasso_model
```

This approach ensures that the default models are used when city-specific models are unavailable or an error occurs, and it clarifies the flow of model selection.
1106-1119: Optimize calibration value assignments and handle missing columns.

The repeated checks for the existence of 'pm2_5_calibrated_value' and 'pm10_calibrated_value' columns can be streamlined. Additionally, setting these values to `None` when missing may be unnecessary if the columns don't exist.

Refactor the code as follows:
if "pm2_5_calibrated_value" in data.columns: data.loc[to_calibrate, "pm2_5"] = data.loc[to_calibrate, "pm2_5_calibrated_value"] if "pm10_calibrated_value" in data.columns: data.loc[to_calibrate, "pm10"] = data.loc[to_calibrate, "pm10_calibrated_value"]Since you're filling missing 'pm2_5' and 'pm10' values later using the raw values, explicitly setting them to
None
may not be necessary.src/workflows/airqo_etl_utils/meta_data_utils.py (1)
170-178: Good use of approximate coordinates for weather station proximity.

Using approximate_latitude/longitude is more appropriate for weather station calculations, as it provides a more stable reference point.

Consider caching the weather station calculations if they're frequently accessed, since these coordinates rarely change.
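A loose sketch of that caching idea (the function and argument names here are hypothetical, not code from meta_data_utils.py; it assumes the candidate stations are passed as hashable tuples):

```python
from functools import lru_cache

@lru_cache(maxsize=1024)
def nearest_station(lat: float, lon: float, stations: tuple) -> dict:
    # Hypothetical helper: because approximate coordinates rarely change,
    # repeated proximity lookups for the same site can reuse the cached result.
    # `stations` is a tuple of (station_code, station_lat, station_lon) triples;
    # tuples are used because lru_cache requires hashable arguments.
    def squared_distance(station):
        _, s_lat, s_lon = station
        return (s_lat - lat) ** 2 + (s_lon - lon) ** 2

    code, s_lat, s_lon = min(stations, key=squared_distance)
    return {"station_code": code, "latitude": s_lat, "longitude": s_lon}
```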
src/workflows/airqo_etl_utils/data_validator.py (1)
217-241: Enhance error handling with more specific error messages.

While the addition of error handling is good, we can improve it:
- The KeyError message could be more specific about the expected keys
- The generic Exception catch could log more context
Consider this improvement:
```diff
  try:
      devices = devices[
          [
              "device_name",
              "site_id",
              "device_latitude",
              "device_longitude",
              "network",
          ]
      ]
      data = pd.merge(
          left=data,
          right=devices,
          on=["device_name", "site_id", "network"],
          how="left",
      )
  except KeyError as e:
-     logger.exception(
-         f"KeyError: The key(s) '{e.args}' are not available in the returned devices data."
-     )
+     logger.exception(
+         f"KeyError: Missing required column(s) {e.args}. Expected columns: device_name, site_id, device_latitude, device_longitude, network"
+     )
      return None
  except Exception as e:
-     logger.exception(f"An error occured: {e}")
+     logger.exception(f"Error during device data merge: {e}. Data shape: {data.shape}, Devices shape: {devices.shape}")
      return None
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (12)
- src/workflows/airqo_etl_utils/airqo_api.py (2 hunks)
- src/workflows/airqo_etl_utils/airqo_utils.py (10 hunks)
- src/workflows/airqo_etl_utils/data_validator.py (2 hunks)
- src/workflows/airqo_etl_utils/data_warehouse_utils.py (1 hunks)
- src/workflows/airqo_etl_utils/meta_data_utils.py (5 hunks)
- src/workflows/dags/airnow.py (3 hunks)
- src/workflows/dags/airqo_automated_tweets.py (1 hunks)
- src/workflows/dags/airqo_bam_measurements.py (2 hunks)
- src/workflows/dags/airqo_measurements.py (4 hunks)
- src/workflows/dags/kcca_measurements.py (2 hunks)
- src/workflows/dags/meta_data.py (1 hunks)
- src/workflows/dags/weather_measurements.py (1 hunks)
🔇 Additional comments (21)
src/workflows/airqo_etl_utils/airqo_utils.py (1)
1014-1015: Ensure 'city' exists in site data before creating the DataFrame.

When creating `sites_df` with `pd.DataFrame(sites, columns=["site_id", "city"])`, if any site lacks the 'city' key, it could lead to a `KeyError`.

Run the following script to verify that all sites contain the 'city' key:
src/workflows/dags/airqo_automated_tweets.py (1)
21-21: Confirm the change from `tenant` to `network` parameter.

The `get_sites` method now uses the `network` parameter. Ensure that this change is consistent with the updated method signature in `AirQoApi` and that all other calls to `get_sites` are updated accordingly.
src/workflows/dags/kcca_measurements.py (2)
4-4: Add missing import for `AirflowFailException`.

Good catch adding the import for `AirflowFailException` to enable proper error handling.
60-63: Enhance error handling when data processing fails.

Raising an `AirflowFailException` when `data` is empty ensures that the DAG fails gracefully, highlighting issues with the message broker.
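As a rough sketch of that pattern (the task name and message here are illustrative, not the actual kcca_measurements DAG code):

```python
import pandas as pd
from airflow.exceptions import AirflowFailException

def send_measurements_to_broker(data: pd.DataFrame) -> None:
    # Fail the task explicitly instead of publishing nothing, so a broken
    # message broker surfaces as a failed DAG run rather than a silent no-op.
    if data is None or data.empty:
        raise AirflowFailException(
            "Processing failed: no data to publish. Check that the message broker is up and running."
        )
    # ... publish `data` to the message broker here ...
```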
src/workflows/airqo_etl_utils/data_warehouse_utils.py (2)
109-111: Update method signature to use `network` instead of `tenant`.

Changing the parameter from `tenant` to `network` aligns with the updates in the `AirQoApi` and promotes consistency across the codebase.
115-116: Verify renaming of latitude and longitude columns.

Renaming "approximate_latitude" to "site_latitude" and "approximate_longitude" to "site_longitude" enhances clarity. However, confirm that these columns exist in the data returned by `airqo_api.get_sites()`.

Run the following script to verify the presence of these columns:
✅ Verification successful
Column renaming is correctly aligned with the data sources
Based on the code analysis:
- The sites API response includes `approximate_latitude` and `approximate_longitude` fields, as shown in the `get_sites()` method implementation
- The data warehouse schema includes `site_latitude` and `site_longitude` fields
- The mapping in the code correctly transforms the source fields to the target schema fields
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Check for the existence of 'approximate_latitude' and 'approximate_longitude' in site data.
# Test: Expect both columns to be present.
python -c '
from airqo_etl_utils.airqo_api import AirQoApi
sites = AirQoApi().get_sites()
if all("approximate_latitude" in site and "approximate_longitude" in site for site in sites):
    print("Both columns are present in all site data.")
else:
    print("Some sites are missing the required columns.")
'
```
Length of output: 404
Script:
```bash
#!/bin/bash
# Let's examine the API implementation to understand the data structure
rg -A 10 "def get_sites"
# Also search for any site-related data structures or models
rg -l "site_latitude|site_longitude|approximate_latitude|approximate_longitude"
# Check the schema or data structure definitions
ast-grep --pattern 'site = {
  $$$
}'
```
Length of output: 4918
Script:
```bash
#!/bin/bash
# Let's check the schema files and API implementation for the column names
cat src/workflows/airqo_etl_utils/schema/sites.json
cat src/workflows/airqo_etl_utils/airqo_api.py
# Also check the data warehouse schema since that's where the mapping is being used
cat src/workflows/airqo_etl_utils/schema/data_warehouse.json
```
Length of output: 40360
src/workflows/dags/airqo_bam_measurements.py (2)
13-13: Well-implemented error handling enhancement!

The addition of `AirflowFailException` and its implementation provide clear failure signaling when the message broker processing fails. This is a good practice, as it prevents silent failures and makes debugging easier.
Also applies to: 157-160
154-154: LGTM! Parameter ordering is consistent.
The reordering of the topic parameter maintains functionality while improving consistency with other similar calls in the codebase.
src/workflows/dags/airnow.py (1)
6-6: Consistent error handling implementation across DAGs!

The error handling is well-structured and consistent between the historical and realtime DAGs. This uniformity makes the code more maintainable and predictable.
Also applies to: 55-58, 143-146
src/workflows/dags/meta_data.py (1)
215-215: 🛠️ Refactor suggestion

Parameter naming inconsistency detected.

While the change from `tenant` to `network` in `update_nearest_weather_stations` appears intentional, other similar methods (`update_distance_measures`, `refresh_airqlouds`, `refresh_grids`) still use the `tenant` parameter. This inconsistency could lead to confusion.
Let's verify if this is part of a gradual migration:
Consider updating all related methods to use consistent parameter naming. If this is part of a gradual migration, please add a TODO comment explaining the transition plan.
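For illustration, such a transition note could look like the following hypothetical sketch (the signature is paraphrased, not the actual method):

```python
def update_distance_measures(self, tenant: str) -> None:
    # TODO: migrate `tenant` -> `network` to match update_nearest_weather_stations(),
    # as part of the gradual tenant-to-network transition in this refactor.
    ...
```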
src/workflows/airqo_etl_utils/meta_data_utils.py (3)
158-159: LGTM! Consistent parameter update.

The change maintains consistency with the `extract_sites_from_api` method.
195-204: LGTM! Consistent coordinate and network parameter updates.
The changes align with the previous method updates.
122-123: LGTM! Parameter change aligns with network-based filtering.
The change from tenant-based to network-based filtering is consistent with the PR objectives.
Let's verify the network values being used across the codebase:
✅ Verification successful
Let me verify the usage of the default "all" value by searching for other calls to get_sites.
Let me check the implementation of the AirQoApi.get_sites method to understand the default behavior.
Parameter change is well-implemented and consistent across the codebase
The verification shows that:
- The `network` parameter is properly implemented in `AirQoApi.get_sites()` with "all" as the default
- Most direct calls specify `network="airqo"` explicitly
- When `network="all"`, the API request is made without the network filter, which is the expected behavior
- The docstring and implementation align with the network-based filtering approach
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Search for network parameter usage in function calls
rg -A 2 "get_sites\(.*network="
```
Length of output: 2012
Script:
```bash
#!/bin/bash
# Search for get_sites calls without network parameter
rg "get_sites\([^\n]*\)" --no-filename | grep -v "network="
```
Length of output: 412
Script:
```bash
#!/bin/bash
# Search for get_sites method implementation in AirQoApi
ast-grep --pattern 'def get_sites($$$) -> $_:
  $$$'
```
Length of output: 6120
src/workflows/dags/weather_measurements.py (1)
247-247: LGTM! Consistent with the network-based filtering approach.
The change aligns with the broader refactoring effort.
src/workflows/airqo_etl_utils/data_validator.py (1)
192-192: LGTM! Optional topic parameter provides more flexibility.
Making the topic parameter optional with a default value of None is a good change.
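A minimal sketch of what such an optional parameter can look like (the function name and body here are illustrative, not the actual data_validator.py method):

```python
from typing import Optional

import pandas as pd

def format_for_broker(data: pd.DataFrame, topic: Optional[str] = None) -> pd.DataFrame:
    # When no topic is supplied, callers get the generic formatting;
    # topic-specific shaping only happens when a topic is explicitly passed.
    if topic is not None:
        data = data.assign(topic=topic)
    return data
```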
src/workflows/dags/airqo_measurements.py (4)
6-6: LGTM: Good addition of AirflowFailException import
The addition of AirflowFailException is appropriate for proper error handling in Airflow tasks.
114-120: Well-structured error handling for message broker processing

The addition of explicit error handling with AirflowFailException is a good practice. The error message clearly indicates the potential issue with the Kafka service.
406-413: Consistent error handling pattern maintained
Good consistency in error handling implementation across similar functions. The error message remains clear and actionable.
457-464: Consistent error handling maintained across tasks
The error handling pattern is consistently applied across all message broker related tasks, which is a good practice.
src/workflows/airqo_etl_utils/airqo_api.py (2)
Line range hint 722-769: Well-documented parameter change from tenant to network
The method signature change aligns with the PR objectives. The documentation is thorough and the return type structure remains consistent.
771-772: Clean implementation of network parameter handling
The query parameter logic is simplified and properly handles the "all" case. The implementation is consistent with the method signature change.
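Roughly, the "all" handling described above can be sketched like this (an illustrative reconstruction, not the actual airqo_api.py code; the client class and request helper are hypothetical):

```python
class SitesClient:
    """Illustrative client, not the real AirQoApi class."""

    def __init__(self, fetch):
        self._fetch = fetch  # injected request helper, e.g. a thin wrapper around requests.get

    def get_sites(self, network: str = "all") -> list:
        # Only add the network filter when a specific network is requested;
        # "all" means the API is queried without any network restriction.
        query_params = {}
        if network and network.lower() != "all":
            query_params["network"] = network
        return self._fetch("devices/sites", params=query_params)
```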
```diff
  device_details = device_lookup.get(device_id)
  if not device_details:
      logger.exception(
-         f"Device number {device_number} not found in device list."
+         f"Device number {device_id} not found in device list."
```
🛠️ Refactor suggestion
Use appropriate logging level for missing device details.
The use of `logger.exception` is intended for logging exceptions along with stack traces. Since not finding a device in the lookup is not an exception but a possible data inconsistency, consider using `logger.warning` instead.
Apply this change to adjust the logging level:
```diff
- logger.exception(
+ logger.warning(
      f"Device number {device_id} not found in device list."
  )
```
Additionally, consider correcting the log message to reference "Device ID" instead of "Device number" for clarity.
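For illustration only, the reworded message might read as follows (this is not the committable suggestion below):

```python
logger.warning(f"Device ID {device_id} not found in device list.")  # assumes the surrounding `logger` and `device_id`
```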
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
device_details = device_lookup.get(device_id)
if not device_details:
    logger.warning(
        f"Device number {device_id} not found in device list."
    )
```
```python
# TODO: Need to opt for a different approach eg forward fill, can't do here as df only has data of last 1 hour. Perhaps use raw data only?
# May have to rewrite entire pipeline flow
data[columns_to_fill] = data[columns_to_fill].fillna(0)
```
Re-evaluate filling NaN values with zero in sensor data.
Filling missing sensor readings with zero may introduce inaccuracies since a zero value could be misinterpreted as a valid measurement rather than missing data.
Consider using a method that preserves the distinction between missing and zero values, such as forward-filling or imputing with the mean or median where appropriate.
```diff
- data[columns_to_fill] = data[columns_to_fill].fillna(0)
+ data[columns_to_fill] = data[columns_to_fill].interpolate(method='linear', limit_direction='forward', axis=0)
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
data[columns_to_fill] = data[columns_to_fill].interpolate(method='linear', limit_direction='forward', axis=0)
```
```python
# Explicitly filter data to calibrate.
to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]
data_to_calibrate.dropna(subset=input_variables, inplace=True)
grouped_df = data_to_calibrate.groupby("city", dropna=False)
```
💡 Codebase verification
Missing city values are handled with a default value
The codebase shows that missing city values are handled by filling them with a default value "Missing City" before any groupby operations, as seen in analytics/jobs/reports/main.py. This same pattern should be applied here to prevent potential issues with missing city values.
Consider adding:

```python
data_to_calibrate['city'] = data_to_calibrate['city'].fillna('Missing City')
```

before the groupby operation.
🔗 Analysis chain
Handle potential missing 'city' values when grouping data.
Grouping by 'city' when calibrating data assumes all entries have a 'city' value. If 'city' is missing for some records, this could lead to unexpected behavior.
Run the following script to identify records with missing 'city' values:
Let me analyze the codebase to understand how the city column is handled before and after the grouping operation. This will help us determine if there are any existing safeguards or potential issues with missing city values.
This will help us understand how the city column is handled and if there are any existing safeguards for missing values before the groupby operation.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Find records without 'city' after merging.
# Test: Check for missing 'city' values in data_to_calibrate. Expect: No records with missing 'city'.
python -c '
import pandas as pd
data = pd.read_csv("data.csv")  # Replace with the actual data source
to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]
missing_city = data_to_calibrate[data_to_calibrate["city"].isnull()]
if not missing_city.empty:
    print("Records with missing city:")
    print(missing_city)
'
```
Length of output: 447
Script:
```bash
#!/bin/bash
# Let's analyze the codebase to understand the data processing flow and city handling
# First, find where the data is loaded and how city is handled
rg -l "city" --type py
# Then look for specific patterns related to city column handling
rg "city" --type py -A 3 -B 3
# Also check for any data validation or preprocessing of city values
ast-grep --pattern 'def $func($$$) {
  $$$
  city$$$
  $$$
}'
```
Length of output: 52521
Script:
```bash
#!/bin/bash
# Let's check how the city column is populated before the groupby operation
rg -A 5 -B 5 "data\[\"city\"\]|data\['city'\]|data\.city" --type py
# Also check for any data validation or preprocessing specifically for city values
rg "dropna.*city" --type py
rg "fillna.*city" --type py
# Look for any error handling around city values
rg "try.*city.*except" --type py -A 5
```
Length of output: 2973
```diff
@@ -1071,6 +1071,8 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
          ),
      )
      for city, group in grouped_df:
+         # What was the intention of this?
```
@Mnoble-19 have you ever noticed this?
thanks @NicholasTurner23
Description
[Provide a brief description of the changes made in this PR]
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Chores