Update/integration iqair devices #4011
Conversation
📝 Walkthrough
This pull request introduces significant modifications primarily to the data extraction logic within the
Changes
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (3)
src/workflows/airqo_etl_utils/airqo_utils.py (1)
498-500: Correct the typo in the exception message
There's a minor typo in the exception message: "An error occured" should be "An error occurred". Correcting this enhances readability and professionalism.
Apply this diff to fix the typo:
- logger.exception(f"An error occured: {e} - device {device['name']}")
+ logger.exception(f"An error occurred: {e} - device {device['name']}")
src/workflows/airqo_etl_utils/config.py (1)
290-293: Well-structured configuration mapping.
The DATA_RESOLUTION_MAPPING provides a clean way to handle resolution mappings. Consider adding type hints and documentation for better maintainability.
+ # Mapping of data source resolutions to their corresponding API values
+ DATA_RESOLUTION_MAPPING: dict[str, dict[str, str]] = {
      "iqair": {"hourly": "instant", "raw": "instant", "current": "current"}
  }
src/workflows/dags/airqo_measurements.py (1)
461-461: Consider data pipeline architecture for calibration and merging.
The TODO comment raises an important architectural consideration. The decision of when to merge calibrated data with non-airqo data can impact:
- Data processing efficiency
- Calibration accuracy
- Data consistency across sources
Consider these factors:
- Merging after calibration ensures that calibration is performed on clean, source-specific data
- However, this might miss out on environmental context from other sources during calibration
Would you like me to help create a detailed architectural proposal for this data pipeline or create a GitHub issue to track this consideration?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (10)
src/workflows/airqo_etl_utils/airnow_utils.py (2 hunks)
src/workflows/airqo_etl_utils/airqo_api.py (0 hunks)
src/workflows/airqo_etl_utils/airqo_utils.py (7 hunks)
src/workflows/airqo_etl_utils/config.py (1 hunks)
src/workflows/airqo_etl_utils/constants.py (1 hunks)
src/workflows/airqo_etl_utils/data_sources.py (2 hunks)
src/workflows/airqo_etl_utils/data_warehouse_utils.py (1 hunks)
src/workflows/dags/airqo_bam_measurements.py (4 hunks)
src/workflows/dags/airqo_measurements.py (3 hunks)
src/workflows/dags/airqo_mobile_measurements.py (2 hunks)
💤 Files with no reviewable changes (1)
- src/workflows/airqo_etl_utils/airqo_api.py
🔇 Additional comments (15)
src/workflows/airqo_etl_utils/airqo_utils.py (4)
538-566: Well-structured aggregation logic for low-cost sensor data
The refactored aggregate_low_cost_sensors_data method effectively resamples and aggregates the data on an hourly basis, ensuring accurate mean calculations within device groups. The code is clear and follows best practices.
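To make the hourly aggregation described above concrete, here is a minimal sketch of one way to average readings per device per hour with pandas. It is not the PR's actual aggregate_low_cost_sensors_data implementation; the function name aggregate_hourly and the column names device_id, timestamp, and pm2_5 are assumptions chosen for illustration.

```python
import pandas as pd


def aggregate_hourly(data: pd.DataFrame) -> pd.DataFrame:
    """Average numeric columns per device over hourly windows (illustrative only)."""
    data = data.copy()
    # Assumed column names: "device_id" (string) and "timestamp".
    data["timestamp"] = pd.to_datetime(data["timestamp"])
    numeric_cols = list(data.select_dtypes(include="number").columns)
    grouped = data.groupby(["device_id", pd.Grouper(key="timestamp", freq="1h")])
    return grouped[numeric_cols].mean().reset_index()


readings = pd.DataFrame(
    {
        "device_id": ["a", "a", "a", "b"],
        "timestamp": [
            "2024-01-01 00:05",
            "2024-01-01 00:35",
            "2024-01-01 01:10",
            "2024-01-01 00:20",
        ],
        "pm2_5": [10.0, 20.0, 30.0, 5.0],  # hypothetical pollutant column
    }
)
hourly = aggregate_hourly(readings)
```

Grouping by device before bucketing by hour keeps one device's readings from leaking into another device's mean, which is the property the comment praises.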
894-897: Confirm the logic for filtering measurements based on numeric values
The code filters out rows where all numeric columns are NaN by selecting rows with at least one numeric value. Please verify that this is the intended behavior and that it doesn't inadvertently exclude valid data.
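The filter in question can be sketched as follows. This is an assumed reconstruction of the behaviour the comment describes, not the code under review; the helper name keep_rows_with_any_numeric and the sample columns are hypothetical.

```python
import pandas as pd


def keep_rows_with_any_numeric(data: pd.DataFrame) -> pd.DataFrame:
    """Keep rows where at least one numeric column holds a value."""
    numeric_cols = data.select_dtypes(include="number").columns
    # Note: with no numeric columns at all, any(axis=1) is False for every row,
    # so this would drop everything. That edge case is worth checking.
    return data[data[numeric_cols].notna().any(axis=1)]


nan = float("nan")
sample = pd.DataFrame(
    {
        "device_id": ["a", "b", "c"],  # non-numeric, ignored by the filter
        "pm2_5": [1.0, nan, nan],
        "pm10": [nan, nan, 2.0],
    }
)
filtered = keep_rows_with_any_numeric(sample)
```

Only the middle row is removed here, because it is the only one in which every numeric column is NaN.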
1019-1019: Ensure that dropping rows lacking device_id or timestamp is safe
Dropping rows where device_id or timestamp are NaN may result in data loss if these values are missing due to upstream issues. Please confirm that all necessary data is present before dropping these rows.
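One way to address this concern is to report how many rows are discarded, so that upstream gaps become visible instead of silently disappearing. The sketch below assumes a hypothetical helper drop_incomplete_rows and is not taken from the PR.

```python
import logging

import pandas as pd

logger = logging.getLogger(__name__)


def drop_incomplete_rows(
    data: pd.DataFrame, required: tuple[str, ...] = ("device_id", "timestamp")
) -> pd.DataFrame:
    """Drop rows missing any required column and log how many were removed."""
    cleaned = data.dropna(subset=list(required))
    dropped = len(data) - len(cleaned)
    if dropped:
        logger.warning(
            "Dropped %d of %d rows missing %s",
            dropped,
            len(data),
            ", ".join(required),
        )
    return cleaned


raw = pd.DataFrame(
    {
        "device_id": ["a", None, "c"],
        "timestamp": ["2024-01-01T00:00", "2024-01-01T01:00", None],
        "pm2_5": [1.0, 2.0, 3.0],  # hypothetical measurement column
    }
)
complete = drop_incomplete_rows(raw)
```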
Line range hint 287-297: Ensure all calls to extract_mobile_low_cost_sensors_data include the new resolution parameter
You've added a resolution parameter to extract_mobile_low_cost_sensors_data, which enhances control over data extraction granularity. Please verify that all calls to this method throughout the codebase are updated to include the new resolution argument to prevent any unexpected errors.
Run the following script to find any calls missing the resolution parameter:
src/workflows/dags/airqo_mobile_measurements.py (1)
16-16: Update consistent with the addition of resolution parameter
The inclusion of the Frequency import and passing Frequency.HOURLY to extract_mobile_low_cost_sensors_data aligns with the updated method signature. This change ensures that data is extracted at an hourly resolution as intended.
Also applies to: 25-27
src/workflows/airqo_etl_utils/data_warehouse_utils.py (1)
97-97: Simplify column assignment for device_category
Good job simplifying the assignment by using data["device_category"] = str(DeviceCategory.LOW_COST). This enhances code readability without changing functionality.
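Whether str(DeviceCategory.LOW_COST) produces a clean label depends on how the enum defines __str__. The sketch below uses two hypothetical enums (PlainCategory and LabelledCategory, with placeholder values) to show the difference. It does not claim to reflect how DeviceCategory is actually defined in the repository.

```python
from enum import Enum


class PlainCategory(Enum):
    """Enum without a custom __str__ (placeholder value)."""

    LOW_COST = 1


class LabelledCategory(Enum):
    """Enum whose __str__ returns a label suitable for a data column."""

    LOW_COST = 1

    def __str__(self) -> str:
        return self.name.lower()


plain_label = str(PlainCategory.LOW_COST)
custom_label = str(LabelledCategory.LOW_COST)
```

With a plain Enum the default string includes the class name, so the simplification only leaves functionality unchanged if the enum already overrides __str__.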
src/workflows/dags/airqo_bam_measurements.py (2)
24-25: Introduce resolution parameter with Frequency.HISTORICAL for historical BAM data extraction
By importing Frequency and passing resolution=Frequency.HISTORICAL, you're correctly specifying the desired data resolution for historical data extraction. This aligns with the updated method signature.
Also applies to: 35-35
82-82: Set resolution to Frequency.RAW for real-time BAM data extraction
Including the Frequency import and setting resolution=Frequency.RAW ensures that the real-time DAG extracts data at raw resolution, which is appropriate for live data processing.
Also applies to: 96-96
src/workflows/airqo_etl_utils/data_sources.py (2)
126-128: LGTM! Well-structured configuration usage.
The use of configuration.DATA_RESOLUTION_MAPPING provides a centralized way to manage resolutions, making it easier to maintain and modify resolution mappings.
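As a rough illustration of the centralized lookup, the sketch below translates an internal resolution name into the value a source's API expects. The mapping values are the ones quoted in the config.py nitpick; the helper resolve_resolution and its fallback default are assumptions, not part of the PR.

```python
# Stand-in for configuration.DATA_RESOLUTION_MAPPING, using the values quoted
# in the review's config.py comment.
DATA_RESOLUTION_MAPPING: dict[str, dict[str, str]] = {
    "iqair": {"hourly": "instant", "raw": "instant", "current": "current"}
}


def resolve_resolution(source: str, resolution: str, default: str = "instant") -> str:
    """Hypothetical helper: map an internal resolution to a source-specific one."""
    return DATA_RESOLUTION_MAPPING.get(source, {}).get(resolution, default)


hourly_value = resolve_resolution("iqair", "hourly")
current_value = resolve_resolution("iqair", "current")
fallback_value = resolve_resolution("unknown_source", "raw")
```

Keeping the translation in one dictionary means a new data source only needs a new entry rather than another conditional branch in the extraction code.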
102-102: Verify the impact of changing the default resolution parameter.
The default resolution parameter has changed from "current" to "instant". While this aligns with the new configuration mapping, we should verify existing callers.
✅ Verification successful
Let me check the context of this usage to better understand the impact.
The resolution parameter change is safe to proceed with
The only caller of the iqair method explicitly specifies the resolution parameter (resolution=resolution), so changing the default value from "current" to "instant" won't affect existing behavior. The resolution value is passed through from the caller's context, making this change non-breaking.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for existing calls to iqair method
rg -A 2 "\.iqair\("
Length of output: 303
Script:
#!/bin/bash
# Get more context around the iqair usage
rg -B 5 -A 5 "\.iqair\(" src/workflows/airqo_etl_utils/airqo_utils.py
# Check if resolution is explicitly specified
ast-grep --pattern 'iqair($_, resolution=$_)'
Length of output: 703
src/workflows/airqo_etl_utils/constants.py (3)
37-44: Documentation improvements look good.
The updated documentation clearly describes each frequency option and its purpose.
57-71: Nice modernization using match statement.
The refactoring to use Python's match statement improves readability and maintainability. However, ensure your deployment environment uses Python 3.10 or later.
✅ Verification successful
Python 3.10 requirement is satisfied for the match statement usage
Based on the Dockerfile in the same directory (src/workflows/), this service uses Python 3.10:
FROM python:3.10.14-slim-bullseye as utils-package
The match statement was introduced in Python 3.10, so the code's use of this feature is compatible with the deployment environment.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for Python version specification
rg "python_requires|Python" setup.py pyproject.toml
Length of output: 157
Script:
#!/bin/bash
# Look for Python version requirements in any configuration files
fd -e toml -e cfg -e ini -e txt | xargs rg -i "python|requires"
# Also check for runtime.txt which is commonly used to specify Python version
fd "runtime.txt" --exec cat {}
# Check for any GitHub workflow files that might specify Python version
fd -e yml -e yaml -p ".github/workflows" | xargs rg "python"
Length of output: 17045
47-54: Verify the impact of changing RAW enum value.
The RAW enum value has changed from 1 to 0, which could affect existing code that relies on the numeric value.
Consider maintaining backward compatibility by:
- Keeping RAW as 1 and introducing a new enum for value 0
- Adding a deprecation warning for direct value comparisons
✅ Verification successful
The previous search was too specific. Let's broaden our search to understand how the Frequency enum is used throughout the codebase. This will help us determine if changing the RAW value from 1 to 0 could have any impact.
Changing RAW enum value appears safe to proceed
Based on the codebase analysis, the Frequency.RAW enum is only used in equality comparisons and parameter passing, not in direct numeric comparisons. The usage is found in:
- airqo_bam_measurements.py and airqo_measurements.py: As a parameter value
- airqo_utils.py: In equality comparisons using the == operator
Since all usages are comparing against the enum symbolically rather than its numeric value, changing RAW from 1 to 0 should not impact the existing functionality.
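The reasoning can be checked with a small sketch. OldFrequency and NewFrequency are hypothetical stand-ins for the enum before and after the change; only the RAW values 1 and 0 come from the review, and the HOURLY values are placeholders.

```python
from enum import Enum


class OldFrequency(Enum):
    RAW = 1
    HOURLY = 2  # placeholder value


class NewFrequency(Enum):
    RAW = 0
    HOURLY = 1  # placeholder value


def is_raw(resolution: Enum, enum_cls: type) -> bool:
    """Symbolic comparison: independent of the member's numeric value."""
    return resolution == enum_cls.RAW


symbolic_old = is_raw(OldFrequency.RAW, OldFrequency)
symbolic_new = is_raw(NewFrequency.RAW, NewFrequency)

# Code that relies on the number itself does change meaning:
numeric_lookup = NewFrequency(1)  # now HOURLY, previously RAW

# Members of a plain Enum are always truthy, even when the value is 0.
raw_is_truthy = bool(NewFrequency.RAW)
```

Symbolic comparisons survive the renumbering, whereas anything that stores or looks up the raw integer (for example values persisted outside the code) would need a separate check.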
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for direct usage of Frequency.RAW value
rg "Frequency\.RAW\s*[!=]=\s*\d+"
Length of output: 35
Script:
#!/bin/bash
# Let's search for any usage of Frequency.RAW in the codebase
rg "Frequency\.RAW" -A 2
# Also search for potential numeric comparisons with 1 or 0 in files that mention Frequency
rg -l "Frequency" | xargs rg "(?:==|!=)\s*[01]\b" -A 2
# Let's also check if Frequency is used in database queries or data processing
ast-grep --pattern 'Frequency.RAW'
Length of output: 7645
src/workflows/dags/airqo_measurements.py (2)
156-156: LGTM! Verify resolution parameter's impact.
The addition of the resolution=Frequency.DAILY parameter aligns well with the data extraction requirements for historical raw measurements.
✅ Verification successful
Resolution parameter usage is consistent and well-integrated
The resolution=Frequency.DAILY parameter is correctly used in the historical raw measurements DAG, and its counterpart Frequency.RAW_LOW_COST is appropriately used in another context. The Frequency enum is consistently used across both Python and Java codebases, indicating proper integration of the resolution parameter.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Verify the Frequency enum contains DAILY and its usage
ast-grep --pattern 'class Frequency:
$$$
DAILY = $$$
$$$'
# Check for any other occurrences of DAILY resolution
rg "Frequency\.DAILY" -A 2
Length of output: 4048
Script:
#!/bin/bash
# Let's verify the Frequency enum definition and its usage in the Python codebase
ast-grep --pattern 'class Frequency:
$$$'
# Check for other resolution parameter usages in the DAG file
rg "resolution=" src/workflows/dags/airqo_measurements.py -A 2
Length of output: 220
315-315: LGTM! Verify resolution parameter's impact.
The addition of the resolution=Frequency.RAW_LOW_COST parameter appropriately specifies the data granularity for realtime low-cost sensor measurements.
@staticmethod
def extract_devices_data(
    start_date_time: str,
    end_date_time: str,
    device_category: DeviceCategory,
    resolution: Frequency = Frequency.RAW,
💡 Codebase verification
Most calls to extract_devices_data already include the resolution parameter, but one call needs updating
I found one call in src/workflows/dags/airqo_measurements.py that doesn't specify the resolution parameter. All other calls properly include it with various values (RAW, DAILY, HISTORICAL, etc.).
- Update the following call to include an appropriate resolution:
return AirQoDataUtils.extract_devices_data(
    start_date_time=start_date_time,
    end_date_time=end_date_time,
    device_category=DeviceCategory.LOW_COST,
)
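Because resolution has a default, a call like the one above does not raise an error; it quietly falls back to Frequency.RAW. The sketch below uses a simplified stand-in for extract_devices_data (fewer parameters, placeholder enum values, and a dictionary return value chosen purely for illustration) to show the difference between relying on the default and passing the argument explicitly.

```python
from enum import Enum


class Frequency(Enum):
    RAW = 0
    HOURLY = 1  # placeholder value


def extract_devices_data(
    start_date_time: str,
    end_date_time: str,
    resolution: Frequency = Frequency.RAW,
) -> dict:
    """Simplified stand-in that only reports which resolution would be used."""
    return {
        "start": start_date_time,
        "end": end_date_time,
        "resolution": resolution,
    }


implicit = extract_devices_data(
    start_date_time="2024-01-01T00:00:00Z",
    end_date_time="2024-01-01T01:00:00Z",
)
explicit = extract_devices_data(
    start_date_time="2024-01-01T00:00:00Z",
    end_date_time="2024-01-01T01:00:00Z",
    resolution=Frequency.HOURLY,
)
```

Passing the argument explicitly at every call site documents the intended granularity and protects the caller if the default ever changes.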
🔗 Analysis chain
Ensure all calls to extract_devices_data are updated with the resolution parameter
With the addition of resolution to the extract_devices_data method signature, please confirm that all invocations of this method include the appropriate resolution argument to maintain consistency and prevent potential errors.
Run the following script to identify calls that may need updating:
Let me search for both the function calls and its definition to verify the usage of the resolution parameter. The previous ast-grep command had incorrect syntax, so I'm using ripgrep instead to find all occurrences of this function in the codebase.
I need to see a bit more context around each function call to verify if the resolution parameter is being passed. The previous output shows the function definition has resolution as an optional parameter with a default value of Frequency.RAW, but we need to check if the calls are properly handling this parameter.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all calls to 'extract_devices_data' missing the 'resolution' parameter.
ast-grep 'extract_devices_data($_)' --lang python
Length of output: 183
Script:
#!/bin/bash
# Search for function calls to extract_devices_data using ripgrep
rg "extract_devices_data\(" -A 2
# Also search for the function definition to understand its implementation
rg "def extract_devices_data" -A 5
Length of output: 2707
Script:
#!/bin/bash
# Let's get more context around each call to see if resolution is being passed
rg "extract_devices_data\(" -B 1 -A 4
Length of output: 4089
# TODO Update if being used.
tenants = AirQoApi().get_networks(DataSource.AIRNOW)
Remove unintended continue statements that disrupt data processing
The continue statements at lines 69 and 74 within the loop over dates effectively skip the data extraction and tenant assignment steps, resulting in network_data remaining empty. This prevents the accumulation of BAM data and may cause downstream processes to fail or produce incorrect results.
Apply this diff to remove the continue statements:
for start, end in dates:
- continue
query_data = AirnowDataUtils.query_bam_data(
api_key=network_api_key, start_date_time=start, end_date_time=end
)
network_data = pd.concat([network_data, query_data], ignore_index=True)
-continue
network_data["tenant"] = tenant["network"]
Also applies to: 69-69, 74-74
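The effect of a stray continue at the top of a loop body can be reproduced in isolation. The sketch below uses hypothetical helpers (collect_buggy, collect_fixed, fake_query) and plain lists instead of DataFrames; it mirrors the shape of the loop in the diff, not its actual code.

```python
from typing import Callable

DateRange = tuple[str, str]


def collect_buggy(dates: list[DateRange], query: Callable[[str, str], list]) -> list:
    rows: list = []
    for start, end in dates:
        continue  # stray statement: nothing below it in the loop body ever runs
        rows.extend(query(start, end))
    return rows


def collect_fixed(dates: list[DateRange], query: Callable[[str, str], list]) -> list:
    rows: list = []
    for start, end in dates:
        rows.extend(query(start, end))
    return rows


def fake_query(start: str, end: str) -> list:
    """Hypothetical query that returns one record per date range."""
    return [{"start": start, "end": end}]


dates = [("2024-01-01", "2024-01-02"), ("2024-01-02", "2024-01-03")]
buggy_rows = collect_buggy(dates, fake_query)
fixed_rows = collect_fixed(dates, fake_query)
```

The buggy variant never calls the query, so the accumulator stays empty no matter how many date ranges are supplied.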
Description
[Provide a brief description of the changes made in this PR]
This removes redundant code and cleans up the tenant/network switch.
Summary by CodeRabbit
New Features
- resolution parameter in various data extraction methods, allowing for more granular control over data retrieval.
- update_latest_data_topic in the realtime measurements DAG to enhance data processing.
Bug Fixes
- thingspeak method for clearer logging of exceptions.
Refactor
- extract_devices_data method by consolidating functionality from the removed extract_devices_data_ method.
- Frequency enum with new values and improved documentation for clarity.
Documentation