Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update fix/analytics data export cleanup #4037

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions src/analytics/api/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ def download_from_bigquery(
frequency (str): Data frequency (e.g., 'raw', 'daily', 'hourly').
pollutants (list): List of pollutants to include in the data.
data_type (str): Type of data ('raw' or 'aggregated').
filter_columns(list)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix the docstring formatting for filter_columns parameter

The new parameter filter_columns(list) in the docstring is not properly formatted. It should include a colon and a descriptive explanation to match the existing docstring style.

Apply this diff to correct the docstring:

 filter_value (list): Filter values corresponding to the filter type.
 start_date (str): Start date for the data query.
 end_date (str): End date for the data query.
 frequency (str): Data frequency (e.g., 'raw', 'daily', 'hourly').
 pollutants (list): List of pollutants to include in the data.
 data_type (str): Type of data ('raw' or 'aggregated').
+filter_columns (list): List of columns to filter.
 weather_fields (list): List of weather fields to retrieve.

Committable suggestion skipped: line range outside the PR's diff.

weather_fields (list): List of weather fields to retrieve.

Returns:
Expand All @@ -438,14 +439,11 @@ def download_from_bigquery(
weather_columns = []
for pollutant in pollutants:

if pollutant == "raw":
key = pollutant
else:
key = f"{pollutant}_{data_type}"

key = f"{pollutant}_{data_type}"
pollutant_mapping = BIGQUERY_FREQUENCY_MAPPER.get(frequency, {}).get(
key, []
)

pollutant_columns.extend(
cls.get_columns(
cls,
Expand All @@ -459,6 +457,10 @@ def download_from_bigquery(

# TODO Clean up by use using `get_columns` helper method
if pollutant in {"pm2_5", "pm10", "no2"}:
if data_type == "raw":
# Add dummy column to fix union column number missmatch.
bam_pollutant_columns.append("-1 as pm2_5")

NicholasTurner23 marked this conversation as resolved.
Show resolved Hide resolved
if frequency in ["weekly", "monthly", "yearly"]:
bam_pollutant_columns.extend(
[f"ROUND(AVG({pollutant}), {decimal_places}) AS {key}_value"]
Expand All @@ -467,6 +469,7 @@ def download_from_bigquery(
bam_pollutant_columns.extend(
[f"ROUND({pollutant}, {decimal_places}) AS {key}_value"]
)

# TODO Fix query when weather data is included. Currently failing
if weather_fields:
for field in weather_fields:
Expand Down Expand Up @@ -536,12 +539,55 @@ def download_from_bigquery(
drop_columns.append("datetime")
sorting_cols.append("datetime")

if data_type == "raw":
cls.simple_data_cleaning(dataframe)

dataframe.drop_duplicates(subset=drop_columns, inplace=True, keep="first")
dataframe.sort_values(sorting_cols, ascending=True, inplace=True)
dataframe["frequency"] = frequency
dataframe = dataframe.replace(np.nan, None)
return dataframe

@classmethod
def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame:
"""
Perform data cleaning on a pandas DataFrame to handle specific conditions
related to "pm2_5" and "pm2_5_raw_value" columns.

The cleaning process includes:
1. Ensuring correct numeric data types for "pm2_5" and "pm2_5_raw_value".
2. Removing "pm2_5" values where "pm2_5_raw_value" has data.
3. Dropping the "pm2_5_raw_value" column if it has no data at all.
4. Retaining "pm2_5" values where "pm2_5_raw_value" has no data, and removing
"pm2_5" values where "pm2_5_raw_value" has data.
5. Dropping any column (including "pm2_5" and "pm2_5_raw_value") if it is
entirely empty.

Args:
cls: Class reference (used in classmethods).
data (pd.DataFrame): Input pandas DataFrame with "pm2_5" and
"pm2_5_raw_value" columns.

Returns:
pd.DataFrame: Cleaned DataFrame with updates applied in place.

"""
data["pm2_5_raw_value"] = pd.to_numeric(
data["pm2_5_raw_value"], errors="coerce"
)
data["pm2_5"] = pd.to_numeric(data["pm2_5"], errors="coerce")

data.loc[~data["pm2_5_raw_value"].isna(), "pm2_5"] = np.nan

if data["pm2_5_raw_value"].isna().all():
data.drop(columns=["pm2_5_raw_value"], inplace=True)

data["pm2_5"] = data["pm2_5"].where(data["pm2_5_raw_value"].isna(), np.nan)

data.dropna(how="all", axis=1, inplace=True)

return data

@classmethod
def data_export_query(
cls,
Expand Down Expand Up @@ -1273,9 +1319,7 @@ def get_d3_chart_events(self, sites, start_date, end_date, pollutant, frequency)
)

@cache.memoize()
def get_d3_chart_events_v2(
self, sites, start_date, end_date, pollutant, frequency, tenant
):
def get_d3_chart_events_v2(self, sites, start_date, end_date, pollutant, frequency):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Update required: Inconsistent method signatures across codebase

The verification reveals that there are inconsistent implementations and usages of get_d3_chart_events_v2:

  • src/insights/models/events.py:1010: Method defined with tenant parameter
  • src/insights/namespaces/dashboard.py:147: Called with tenant parameter
  • src/analytics/api/models/events.py:1322: Method defined without tenant parameter
  • src/analytics/api/views/dashboard.py:229: Called without tenant parameter

This indicates a potential architectural issue where two different implementations of the same method exist with different signatures, which could lead to runtime errors.

🔗 Analysis chain

Verify all usages of get_d3_chart_events_v2 are updated

The method get_d3_chart_events_v2 no longer includes the tenant parameter. Ensure that all calls to this method have been updated to prevent potential TypeError exceptions.

Run the following script to locate all usages:

Review each occurrence to confirm that the tenant parameter is no longer being passed.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all calls to 'get_d3_chart_events_v2' and check parameters.

rg --no-heading --line-number 'get_d3_chart_events_v2\(' src/ -A 1

Length of output: 888

if pollutant not in ["pm2_5", "pm10", "no2", "pm1"]:
raise Exception("Invalid pollutant")

Expand All @@ -1293,7 +1337,6 @@ def get_d3_chart_events_v2(
JOIN {self.BIGQUERY_SITES} ON {self.BIGQUERY_SITES}.id = {self.BIGQUERY_EVENTS}.site_id
WHERE {self.BIGQUERY_EVENTS}.timestamp >= '{start_date}'
AND {self.BIGQUERY_EVENTS}.timestamp <= '{end_date}'
AND {self.BIGQUERY_EVENTS}.tenant = '{tenant}'
AND `airqo-250220.metadata.sites`.id in UNNEST({sites})
"""

Expand Down
25 changes: 25 additions & 0 deletions src/analytics/api/utils/data_formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,31 @@ def filter_non_private_sites(filter_type: str, sites: List[str]) -> Dict[str, An
logger.exception(f"Error while filtering non private devices {rex}")


def validate_network(network_name: str) -> bool:
"""
Validate if a given network name exists in the list of networks.

Args:
network_name (str): The name of the network to validate.

Returns:
bool: True if the network name exists, False otherwise.
"""
if not network_name:
return False

endpoint: str = "/users/networks"
airqo_requests = AirQoRequests()
response = airqo_requests.request(endpoint=endpoint, method="get")

if response and "networks" in response:
networks = response["networks"]
# TODO Could add an active network filter
return any(network.get("net_name") == network_name for network in networks)

return False


def filter_non_private_devices(filter_type: str, devices: List[str]) -> Dict[str, Any]:
"""
FilterS out private device IDs from a provided array of device IDs.
Expand Down
9 changes: 2 additions & 7 deletions src/analytics/api/utils/pollutants/pm_25.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,14 @@

COMMON_POLLUTANT_MAPPING = {
"pm2_5_calibrated": ["pm2_5_calibrated_value"],
"pm2_5_raw": ["pm2_5_raw_value"],
"pm2_5_raw": ["pm2_5_raw_value", "pm2_5"],
NicholasTurner23 marked this conversation as resolved.
Show resolved Hide resolved
"pm10_calibrated": ["pm10_calibrated_value"],
"pm10_raw": ["pm10_raw_value"],
"pm10_raw": ["pm10_raw_value", "pm10"],
"no2_calibrated": ["no2_calibrated_value"],
"no2_raw": ["no2_raw_value"],
}

BIGQUERY_FREQUENCY_MAPPER = {
"raw": {
"pm2_5": ["pm2_5", "s1_pm2_5", "s2_pm2_5"],
"pm10": ["pm10", "s1_pm10", "s2_pm10"],
"no2": ["no2"],
},
"daily": COMMON_POLLUTANT_MAPPING,
"hourly": COMMON_POLLUTANT_MAPPING,
"weekly": COMMON_POLLUTANT_MAPPING,
Expand Down
11 changes: 3 additions & 8 deletions src/analytics/api/views/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ def _get_validated_filter(self, json_data):
return filter_type, validated_data, error_message

def post(self):
tenant = request.args.get("tenant", "airqo")

json_data = request.get_json()

Expand All @@ -219,20 +218,16 @@ def post(self):
except Exception as e:
logger.exception(f"An error has occured; {e}")

sites = filter_non_private_sites("sites", json_data.get("sites", {})).get(
"sites", []
)

start_date = json_data["startDate"]
end_date = json_data["endDate"]
frequency = json_data["frequency"]
pollutant = json_data["pollutant"]
chart_type = json_data["chartType"]

events_model = EventsModel(tenant)
# data = events_model.get_d3_chart_events(sites, start_date, end_date, pollutant, frequency)
events_model = EventsModel("airqo")

data = events_model.get_d3_chart_events_v2(
filter_value, start_date, end_date, pollutant, frequency, tenant
filter_value, start_date, end_date, pollutant, frequency
)

if chart_type.lower() == "pie":
Expand Down
1 change: 1 addition & 0 deletions src/analytics/api/views/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def post(self):
data_frame.drop(
columns=[
"site_id",
"timestamp",
],
inplace=True,
)
Expand Down
Loading