-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update fix/analytics data export cleanup #4037
Changes from 5 commits
536fcd3
370f458
368c59d
c1cf659
734dd17
84577f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -413,6 +413,7 @@ def download_from_bigquery( | |
frequency (str): Data frequency (e.g., 'raw', 'daily', 'hourly'). | ||
pollutants (list): List of pollutants to include in the data. | ||
data_type (str): Type of data ('raw' or 'aggregated'). | ||
filter_columns(list) | ||
weather_fields (list): List of weather fields to retrieve. | ||
|
||
Returns: | ||
|
@@ -438,14 +439,11 @@ def download_from_bigquery( | |
weather_columns = [] | ||
for pollutant in pollutants: | ||
|
||
if pollutant == "raw": | ||
key = pollutant | ||
else: | ||
key = f"{pollutant}_{data_type}" | ||
|
||
key = f"{pollutant}_{data_type}" | ||
pollutant_mapping = BIGQUERY_FREQUENCY_MAPPER.get(frequency, {}).get( | ||
key, [] | ||
) | ||
|
||
pollutant_columns.extend( | ||
cls.get_columns( | ||
cls, | ||
|
@@ -459,6 +457,10 @@ def download_from_bigquery( | |
|
||
# TODO Clean up by use using `get_columns` helper method | ||
if pollutant in {"pm2_5", "pm10", "no2"}: | ||
if data_type == "raw": | ||
# Add dummy column to fix union column number missmatch. | ||
bam_pollutant_columns.append("-1 as pm2_5") | ||
|
||
NicholasTurner23 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if frequency in ["weekly", "monthly", "yearly"]: | ||
bam_pollutant_columns.extend( | ||
[f"ROUND(AVG({pollutant}), {decimal_places}) AS {key}_value"] | ||
|
@@ -467,6 +469,7 @@ def download_from_bigquery( | |
bam_pollutant_columns.extend( | ||
[f"ROUND({pollutant}, {decimal_places}) AS {key}_value"] | ||
) | ||
|
||
# TODO Fix query when weather data is included. Currently failing | ||
if weather_fields: | ||
for field in weather_fields: | ||
|
@@ -536,12 +539,55 @@ def download_from_bigquery( | |
drop_columns.append("datetime") | ||
sorting_cols.append("datetime") | ||
|
||
if data_type == "raw": | ||
cls.simple_data_cleaning(dataframe) | ||
|
||
dataframe.drop_duplicates(subset=drop_columns, inplace=True, keep="first") | ||
dataframe.sort_values(sorting_cols, ascending=True, inplace=True) | ||
dataframe["frequency"] = frequency | ||
dataframe = dataframe.replace(np.nan, None) | ||
return dataframe | ||
|
||
@classmethod | ||
def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame: | ||
""" | ||
Perform data cleaning on a pandas DataFrame to handle specific conditions | ||
related to "pm2_5" and "pm2_5_raw_value" columns. | ||
|
||
The cleaning process includes: | ||
1. Ensuring correct numeric data types for "pm2_5" and "pm2_5_raw_value". | ||
2. Removing "pm2_5" values where "pm2_5_raw_value" has data. | ||
3. Dropping the "pm2_5_raw_value" column if it has no data at all. | ||
4. Retaining "pm2_5" values where "pm2_5_raw_value" has no data, and removing | ||
"pm2_5" values where "pm2_5_raw_value" has data. | ||
5. Dropping any column (including "pm2_5" and "pm2_5_raw_value") if it is | ||
entirely empty. | ||
|
||
Args: | ||
cls: Class reference (used in classmethods). | ||
data (pd.DataFrame): Input pandas DataFrame with "pm2_5" and | ||
"pm2_5_raw_value" columns. | ||
|
||
Returns: | ||
pd.DataFrame: Cleaned DataFrame with updates applied in place. | ||
|
||
""" | ||
data["pm2_5_raw_value"] = pd.to_numeric( | ||
data["pm2_5_raw_value"], errors="coerce" | ||
) | ||
data["pm2_5"] = pd.to_numeric(data["pm2_5"], errors="coerce") | ||
|
||
data.loc[~data["pm2_5_raw_value"].isna(), "pm2_5"] = np.nan | ||
|
||
if data["pm2_5_raw_value"].isna().all(): | ||
data.drop(columns=["pm2_5_raw_value"], inplace=True) | ||
|
||
data["pm2_5"] = data["pm2_5"].where(data["pm2_5_raw_value"].isna(), np.nan) | ||
|
||
data.dropna(how="all", axis=1, inplace=True) | ||
|
||
return data | ||
|
||
@classmethod | ||
def data_export_query( | ||
cls, | ||
|
@@ -1273,9 +1319,7 @@ def get_d3_chart_events(self, sites, start_date, end_date, pollutant, frequency) | |
) | ||
|
||
@cache.memoize() | ||
def get_d3_chart_events_v2( | ||
self, sites, start_date, end_date, pollutant, frequency, tenant | ||
): | ||
def get_d3_chart_events_v2(self, sites, start_date, end_date, pollutant, frequency): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Update required: Inconsistent method signatures across codebase The verification reveals that there are inconsistent implementations and usages of
This indicates a potential architectural issue where two different implementations of the same method exist with different signatures, which could lead to runtime errors. 🔗 Analysis chainVerify all usages of The method Run the following script to locate all usages: Review each occurrence to confirm that the 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Description: Find all calls to 'get_d3_chart_events_v2' and check parameters.
rg --no-heading --line-number 'get_d3_chart_events_v2\(' src/ -A 1
Length of output: 888 |
||
if pollutant not in ["pm2_5", "pm10", "no2", "pm1"]: | ||
raise Exception("Invalid pollutant") | ||
|
||
|
@@ -1293,7 +1337,6 @@ def get_d3_chart_events_v2( | |
JOIN {self.BIGQUERY_SITES} ON {self.BIGQUERY_SITES}.id = {self.BIGQUERY_EVENTS}.site_id | ||
WHERE {self.BIGQUERY_EVENTS}.timestamp >= '{start_date}' | ||
AND {self.BIGQUERY_EVENTS}.timestamp <= '{end_date}' | ||
AND {self.BIGQUERY_EVENTS}.tenant = '{tenant}' | ||
AND `airqo-250220.metadata.sites`.id in UNNEST({sites}) | ||
""" | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -328,6 +328,31 @@ def filter_non_private_sites(filter_type: str, sites: List[str]) -> Dict[str, An | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logger.exception(f"Error while filtering non private devices {rex}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def validate_network(self, network_name: str) -> bool: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Validate if a given network name exists in the list of networks. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Args: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
network_name (str): The name of the network to validate. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
bool: True if the network name exists, False otherwise. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if not network_name: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
endpoint: str = "/users/networks" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
airqo_requests = AirQoRequests() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
response = airqo_requests.request(endpoint=endpoint, method="get") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if response and "networks" in response: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
networks = response["networks"] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# TODO Could add an active network filter | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return any(network.get("net_name") == network_name for network in networks) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove unnecessary The Modify the function definition to remove the -def validate_network(self, network_name: str) -> bool:
+def validate_network(network_name: str) -> bool: Also, ensure that all calls to this function are updated accordingly. 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def filter_non_private_devices(filter_type: str, devices: List[str]) -> Dict[str, Any]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FilterS out private device IDs from a provided array of device IDs. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -166,6 +166,7 @@ def post(self): | |
data_frame.drop( | ||
columns=[ | ||
"site_id", | ||
"timestamp", | ||
], | ||
inplace=True, | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix the docstring formatting for
filter_columns
parameterThe new parameter
filter_columns(list)
in the docstring is not properly formatted. It should include a colon and a descriptive explanation to match the existing docstring style.Apply this diff to correct the docstring:
filter_value (list): Filter values corresponding to the filter type. start_date (str): Start date for the data query. end_date (str): End date for the data query. frequency (str): Data frequency (e.g., 'raw', 'daily', 'hourly'). pollutants (list): List of pollutants to include in the data. data_type (str): Type of data ('raw' or 'aggregated'). +filter_columns (list): List of columns to filter. weather_fields (list): List of weather fields to retrieve.