diff --git a/src/analytics/api/models/events.py b/src/analytics/api/models/events.py index f4884aa24a..34dd31dc06 100644 --- a/src/analytics/api/models/events.py +++ b/src/analytics/api/models/events.py @@ -589,7 +589,7 @@ def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame: data.loc[data["pm2_5_raw_value"] != 0, "pm2_5"] = np.nan - if (data["pm2_5_raw_value"] == 0).all(): + if ((data["pm2_5_raw_value"] == 0) | (data["pm2_5_raw_value"].isna())).all(): data.drop(columns=["pm2_5_raw_value"], inplace=True) zero_columns = data.loc[:, (data == 0).all()].columns diff --git a/src/workflows/airqo_etl_utils/bigquery_api.py b/src/workflows/airqo_etl_utils/bigquery_api.py index 761113a622..581982a13e 100644 --- a/src/workflows/airqo_etl_utils/bigquery_api.py +++ b/src/workflows/airqo_etl_utils/bigquery_api.py @@ -604,14 +604,44 @@ def reload_data( self, dataframe: pd.DataFrame, table: str, - tenant: Tenant = Tenant.ALL, + network: str = "all", start_date_time: str = None, end_date_time: str = None, where_fields: dict = None, null_cols: list = None, ) -> None: + """ + Reloads data into a specified table in BigQuery by: + 1. Deleting existing records in the table based on the provided date range, + network, and optional filtering criteria. + 2. Inserting new records from the provided DataFrame. + + Args: + dataframe (pd.DataFrame): The data to be reloaded into the table. + table (str): The target table in BigQuery. + network (str, optional): The network filter to be applied. Defaults to "all". + start_date_time (str, optional): The start of the date range for deletion. + If None, inferred from the DataFrame's earliest timestamp. + end_date_time (str, optional): The end of the date range for deletion. + If None, inferred from the DataFrame's latest timestamp. + where_fields (dict, optional): Additional fields and values for filtering rows to delete. + null_cols (list, optional): Columns to filter on `NULL` values during deletion. 
+ + Returns: + None: The function performs operations directly on the BigQuery table. + + Raises: + ValueError: If `timestamp` column is missing in the DataFrame. + """ + if start_date_time is None or end_date_time is None: - data = dataframe.copy() + if "timestamp" not in dataframe.columns: + raise ValueError( + "The DataFrame must contain a 'timestamp' column to derive the date range." + ) + data = ( + dataframe.copy() + ) # Copy so the in-place timestamp conversion below does not mutate the caller's DataFrame. data["timestamp"] = pd.to_datetime(data["timestamp"]) start_date_time = date_to_str(data["timestamp"].min()) end_date_time = date_to_str(data["timestamp"].max()) @@ -619,7 +649,7 @@ def reload_data( query = self.compose_query( QueryType.DELETE, table=table, - tenant=tenant, + network=network, start_date_time=start_date_time, end_date_time=end_date_time, where_fields=where_fields, diff --git a/src/workflows/airqo_etl_utils/daily_data_utils.py b/src/workflows/airqo_etl_utils/daily_data_utils.py index 804382c8e2..27ff183025 100644 --- a/src/workflows/airqo_etl_utils/daily_data_utils.py +++ b/src/workflows/airqo_etl_utils/daily_data_utils.py @@ -8,31 +8,47 @@ class DailyDataUtils: @staticmethod def average_data(data: pd.DataFrame) -> pd.DataFrame: - averaged_data = pd.DataFrame() - data["timestamp"] = data["timestamp"].apply(pd.to_datetime) - - for _, by_tenant in data.groupby("tenant"): - tenant = by_tenant.iloc[0]["tenant"] - del by_tenant["tenant"] - for _, by_device in by_tenant.groupby("device_id"): - site_id = by_device.iloc[0]["site_id"] - device_id = by_device.iloc[0]["device_id"] - device_number = by_device.iloc[0]["device_number"] - - del by_device["site_id"] - del by_device["device_id"] - del by_device["device_number"] - - device_averages = by_device.resample("1D", on="timestamp").mean() - device_averages["timestamp"] = device_averages.index - device_averages["device_id"] = device_id - device_averages["site_id"] = site_id - device_averages["device_number"] = device_number - 
device_averages["tenant"] = tenant - - averaged_data = pd.concat( - [averaged_data, device_averages], ignore_index=True - ) + """ + Averages data in a pandas DataFrame on a daily basis for each device, + grouped by network and device ID. The function resamples data + to compute daily averages for numerical columns. + + Args: + data (pd.DataFrame): A pandas DataFrame containing the following columns: + - "timestamp": Timestamps of the data. + - "network": The network the data belongs to. + - "device_id": Unique identifier for the device. + - "site_id": Unique identifier for the site associated with the device. + - "device_number": Device number. + + Returns: + pd.DataFrame: A DataFrame containing daily averages for each device, + including metadata columns such as "network", "device_id", "site_id", + and "device_number". + """ + data["timestamp"] = pd.to_datetime(data["timestamp"]) + + averaged_data_list = [] + + for (network, device_id), group in data.groupby(["network", "device_id"]): + network = group["network"].iloc[0] + site_id = group["site_id"].iloc[0] + device_number = group["device_number"].iloc[0] + + device_averages = ( + group.resample("1D", on="timestamp") + .mean(numeric_only=True) + .reset_index() + ) + + device_averages["network"] = network + device_averages["device_id"] = device_id + device_averages["site_id"] = site_id + device_averages["device_number"] = device_number + + averaged_data_list.append(device_averages) + + averaged_data = pd.concat(averaged_data_list, ignore_index=True) return averaged_data @@ -77,7 +93,7 @@ def cleanup_and_reload( ) bigquery_api.reload_data( - tenant=Tenant.ALL, + network="all", table=table, dataframe=data, start_date_time=start_date_time,