From b0aec102faa596688f9c1498b5d70a65ef2f0ff3 Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Wed, 8 Jan 2025 17:30:41 +0300
Subject: [PATCH 1/6] Optimize duplicate removal: drop data from non-active
 sites and fill missing values more accurately

---
 src/workflows/airqo_etl_utils/airqo_utils.py | 85 +++++++++++++++-----
 1 file changed, 65 insertions(+), 20 deletions(-)

diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py
index 4e1b281b3f..009d4b8ce9 100644
--- a/src/workflows/airqo_etl_utils/airqo_utils.py
+++ b/src/workflows/airqo_etl_utils/airqo_utils.py
@@ -86,34 +86,79 @@ def extract_data_from_bigquery(
 
     @staticmethod
     def remove_duplicates(data: pd.DataFrame) -> pd.DataFrame:
-        cols = data.columns.to_list()
-        cols.remove("timestamp")
-        cols.remove("device_number")
-        data.dropna(subset=cols, how="all", inplace=True)
+        """
+        Removes duplicate rows from a pandas DataFrame based on 'device_id' and 'timestamp'
+        while ensuring missing values are filled and non-duplicated data is retained.
+
+        Steps:
+        1. Drops rows where all non-essential columns (except 'timestamp', 'device_id', and 'device_number') are NaN.
+        2. Drops rows where 'site_id' is NaN (assumed to be non-deployed devices).
+        3. Identifies duplicate rows based on 'device_id' and 'timestamp'.
+        4. Fills missing values for duplicates within each 'site_id' group using forward and backward filling.
+        5. Retains only the first occurrence of duplicates.
+
+        Args:
+            data (pd.DataFrame): The input DataFrame containing 'timestamp', 'device_id', and 'site_id' columns.
+
+        Returns:
+            pd.DataFrame: A cleaned DataFrame with duplicates handled and missing values filled.
+        """
         data["timestamp"] = pd.to_datetime(data["timestamp"])
+
+        non_essential_cols = [
+            col
+            for col in data.columns
+            if col not in ["timestamp", "device_id", "device_number", "site_id"]
+        ]
+        data.dropna(subset=non_essential_cols, how="all", inplace=True)
+
+        # Drop rows where 'site_id' is NaN (non-deployed devices)
+        data.dropna(subset=["site_id"], inplace=True)
+
         data["duplicated"] = data.duplicated(
-            keep=False, subset=["device_number", "timestamp"]
+            keep=False, subset=["device_id", "timestamp"]
         )
 
-        if True not in data["duplicated"].values:
+        if not data["duplicated"].any():
+            data.drop(columns=["duplicated"], inplace=True)
             return data
 
-        duplicated_data = data.loc[data["duplicated"]]
-        not_duplicated_data = data.loc[~data["duplicated"]]
+        duplicates = data[data["duplicated"]].copy()
+        non_duplicates = data[~data["duplicated"]].copy()
 
-        for _, by_device_number in duplicated_data.groupby(by="device_number"):
-            for _, by_timestamp in by_device_number.groupby(by="timestamp"):
-                by_timestamp = by_timestamp.copy()
-                by_timestamp.fillna(inplace=True, method="ffill")
-                by_timestamp.fillna(inplace=True, method="bfill")
-                by_timestamp.drop_duplicates(
-                    subset=["device_number", "timestamp"], inplace=True, keep="first"
-                )
-                not_duplicated_data = pd.concat(
-                    [not_duplicated_data, by_timestamp], ignore_index=True
-                )
+        columns_to_fill = [
+            col
+            for col in duplicates.columns
+            if col
+            not in [
+                "device_number",
+                "device_id",
+                "timestamp",
+                "latitude",
+                "longitude",
+                "network",
+                "site_id",
+            ]
+        ]
+
+        # Fill missing values within each 'site_id' group
+        filled_duplicates = []
+        for _, group in duplicates.groupby("site_id"):
+            group = group.sort_values(by=["device_id", "timestamp"])
+            group[columns_to_fill] = (
+                group[columns_to_fill].ffill().bfill()
+            )
+            group = group.drop_duplicates(
+                subset=["device_id", "timestamp"], keep="first"
+            )
+            filled_duplicates.append(group)
+
+        duplicates = pd.concat(filled_duplicates, ignore_index=True)
+        cleaned_data = pd.concat([non_duplicates, duplicates], ignore_index=True)
+
+        cleaned_data.drop(columns=["duplicated"], inplace=True)
 
-        return not_duplicated_data
+        return cleaned_data
 
     @staticmethod
     def extract_aggregated_raw_data(
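A minimal, self-contained sketch of the fill-then-deduplicate technique PATCH 1 introduces, run against an invented toy frame (the device IDs, site IDs, and pollutant columns below are illustrative, not the production schema):

```python
import pandas as pd

data = pd.DataFrame(
    {
        "device_id": ["aq_01", "aq_01", "aq_02"],
        "site_id": ["site_a", "site_a", None],
        "timestamp": pd.to_datetime(["2025-01-08 10:00"] * 3),
        "pm2_5": [12.4, None, 9.1],
        "pm10": [None, 20.3, 15.0],
    }
)

# Rows without a site_id are treated as coming from non-deployed devices.
data = data.dropna(subset=["site_id"])

# Flag duplicates on the (device_id, timestamp) key.
dup_mask = data.duplicated(subset=["device_id", "timestamp"], keep=False)
duplicates = data[dup_mask].copy()

# Fill gaps within each site group, then keep the first occurrence per key.
fill_cols = ["pm2_5", "pm10"]
duplicates[fill_cols] = duplicates.groupby("site_id")[fill_cols].transform(
    lambda s: s.ffill().bfill()
)
duplicates = duplicates.drop_duplicates(subset=["device_id", "timestamp"], keep="first")

cleaned = pd.concat([data[~dup_mask], duplicates], ignore_index=True)
print(cleaned)  # one aq_01 row, with pm2_5=12.4 and pm10=20.3 merged from the duplicates
```

The patch follows the same idea but loops over the groups explicitly so it can sort by device_id and timestamp before filling.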
From 4d8976f522a54fd3169f9ed3a2bf23700f706392 Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Wed, 8 Jan 2025 17:32:39 +0300
Subject: [PATCH 2/6] Optimize data extraction from BigQuery and add docstrings

---
 src/workflows/airqo_etl_utils/airqo_utils.py | 35 +++++++++++++++-----
 1 file changed, 27 insertions(+), 8 deletions(-)

diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py
index 009d4b8ce9..aea2c2a15e 100644
--- a/src/workflows/airqo_etl_utils/airqo_utils.py
+++ b/src/workflows/airqo_etl_utils/airqo_utils.py
@@ -66,20 +66,39 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame:
 
     @staticmethod
     def extract_data_from_bigquery(
-        start_date_time, end_date_time, frequency: Frequency
+        start_date_time,
+        end_date_time,
+        frequency: Frequency,
+        device_network: DeviceNetwork = None,
     ) -> pd.DataFrame:
+        """
+        Extracts data from BigQuery within a specified time range and frequency,
+        with an optional filter for the device network. The data is cleaned to remove outliers.
+
+        Args:
+            start_date_time(str): The start of the time range for data extraction, in ISO 8601 format.
+            end_date_time(str): The end of the time range for data extraction, in ISO 8601 format.
+            frequency(Frequency): The frequency of the data to be extracted, e.g., RAW or HOURLY.
+            device_network(DeviceNetwork, optional): The network to filter devices by. Defaults to None (no filter).
+
+        Returns:
+            pd.DataFrame: A pandas DataFrame containing the cleaned data from BigQuery.
+
+        Raises:
+            ValueError: If the frequency is unsupported or no table is associated with it.
+        """
        bigquery_api = BigQueryApi()
-        if frequency == Frequency.RAW:
-            table = bigquery_api.raw_measurements_table
-        elif frequency == Frequency.HOURLY:
-            table = bigquery_api.hourly_measurements_table
-        else:
-            table = ""
+
+        table = {
+            Frequency.RAW: bigquery_api.raw_measurements_table,
+            Frequency.HOURLY: bigquery_api.hourly_measurements_table,
+        }.get(frequency, "")
+
         raw_data = bigquery_api.query_data(
             table=table,
             start_date_time=start_date_time,
             end_date_time=end_date_time,
-            network=DeviceNetwork.AIRQO,
+            network=device_network,
         )
 
         return DataValidationUtils.remove_outliers(raw_data)
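The if/elif chain in PATCH 2 becomes a dict lookup. The sketch below shows the same dispatch pattern with stand-in table names (in the codebase they are attributes of BigQueryApi). Note that `.get(frequency, "")` silently falls back to an empty table name; a stricter variant that actually raises the ValueError promised by the new docstring would look like this:

```python
from enum import Enum


class Frequency(Enum):
    RAW = "raw"
    HOURLY = "hourly"
    WEEKLY = "weekly"  # deliberately unsupported, to exercise the fallback


# Stand-in table names; the real ones come from BigQueryApi.
TABLES = {
    Frequency.RAW: "project.dataset.raw_measurements",
    Frequency.HOURLY: "project.dataset.hourly_measurements",
}


def table_for(frequency: Frequency) -> str:
    table = TABLES.get(frequency, "")
    if not table:
        # Fail fast instead of passing an empty table name downstream.
        raise ValueError(f"No table configured for frequency {frequency!r}")
    return table


print(table_for(Frequency.HOURLY))  # project.dataset.hourly_measurements
```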
From 402010d5c82d0ac31e312a9ed156d2f7570c0b60 Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Wed, 8 Jan 2025 17:34:27 +0300
Subject: [PATCH 3/6] Reduce the number of days queried to reduce system load

---
 src/workflows/dags/airqo_measurements.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py
index a3e7150dc5..9da49b82fd 100644
--- a/src/workflows/dags/airqo_measurements.py
+++ b/src/workflows/dags/airqo_measurements.py
@@ -221,7 +221,7 @@ def extract_raw_data(**kwargs) -> pd.DataFrame:
         from airqo_etl_utils.date import DateUtils
 
         start_date_time, end_date_time = DateUtils.get_dag_date_time_values(
-            days=14, **kwargs
+            days=1, **kwargs
         )
         return AirQoDataUtils.extract_data_from_bigquery(
             start_date_time=start_date_time,
@@ -235,7 +235,7 @@ def extract_hourly_data(**kwargs) -> pd.DataFrame:
         from airqo_etl_utils.date import DateUtils
 
         start_date_time, end_date_time = DateUtils.get_dag_date_time_values(
-            days=14, **kwargs
+            days=1, **kwargs
         )
         return AirQoDataUtils.extract_data_from_bigquery(
             start_date_time=start_date_time,

From 63535bf32667a07bbecef5547d62d8b81eb06fc0 Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Wed, 8 Jan 2025 17:34:57 +0300
Subject: [PATCH 4/6] Cleanup: log instead of raising when no BAM data is found

---
 src/workflows/airqo_etl_utils/airnow_utils.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/workflows/airqo_etl_utils/airnow_utils.py b/src/workflows/airqo_etl_utils/airnow_utils.py
index 572c4fe0f5..997d18d8b7 100644
--- a/src/workflows/airqo_etl_utils/airnow_utils.py
+++ b/src/workflows/airqo_etl_utils/airnow_utils.py
@@ -102,7 +102,8 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame:
             all_device_data.append(device_df)
 
         if not all_device_data:
-            raise ValueError("No BAM data found for the specified date range.")
+            logger.info("No BAM data found for the specified date range.")
+            return pd.DataFrame()
 
         bam_data = pd.concat(all_device_data, ignore_index=True)

From 3107678d1bc6b7b5ed67f252c42694e87c0dcfcb Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Wed, 8 Jan 2025 17:38:18 +0300
Subject: [PATCH 5/6] Add task retries for AirQo measurements cleanup tasks

---
 src/workflows/dags/airqo_measurements.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py
index 9da49b82fd..ef7a6eafd4 100644
--- a/src/workflows/dags/airqo_measurements.py
+++ b/src/workflows/dags/airqo_measurements.py
@@ -215,7 +215,7 @@ def load_data(airqo_data: pd.DataFrame):
 def airqo_cleanup_measurements():
     import pandas as pd
 
-    @task(provide_context=True)
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
     def extract_raw_data(**kwargs) -> pd.DataFrame:
         from airqo_etl_utils.airqo_utils import AirQoDataUtils
         from airqo_etl_utils.date import DateUtils
@@ -229,7 +229,7 @@ def extract_raw_data(**kwargs) -> pd.DataFrame:
-    @task(provide_context=True)
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
     def extract_hourly_data(**kwargs) -> pd.DataFrame:
         from airqo_etl_utils.airqo_utils import AirQoDataUtils
         from airqo_etl_utils.date import DateUtils
@@ -255,7 +255,7 @@ def remove_duplicated_hourly_data(data: pd.DataFrame) -> pd.DataFrame:
 
         return AirQoDataUtils.remove_duplicates(data=data)
 
-    @task()
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
     def load_raw_data(data: pd.DataFrame):
         from airqo_etl_utils.bigquery_api import BigQueryApi
 
         big_query_api = BigQueryApi()
         big_query_api.load_data(
             dataframe=data, table=big_query_api.raw_measurements_table
         )
 
-    @task()
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
     def load_hourly_data(data: pd.DataFrame):
         from airqo_etl_utils.bigquery_api import BigQueryApi
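PATCH 5's retry settings in isolation: a hedged sketch of an Airflow TaskFlow task that retries three times at five-minute intervals, assuming Airflow 2.4+ (the DAG id and task body are placeholders, not the real cleanup pipeline):

```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2025, 1, 1), catchup=False)
def retry_demo():
    @task(retries=3, retry_delay=timedelta(minutes=5))
    def flaky_extract() -> int:
        # A transient failure here (e.g. a BigQuery timeout) is retried up to
        # three times, five minutes apart, before the task is marked failed.
        return 42

    flaky_extract()


retry_demo()
```

A fixed five-minute delay is a reasonable default for rate-limit or timeout blips; `retry_exponential_backoff=True` is an option when upstream outages may last longer.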
From cc36061f641482113ef54d2c71b4de9cf4d1247b Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Thu, 9 Jan 2025 13:07:43 +0300
Subject: [PATCH 6/6] Clean up: skip the network filter when querying all
 networks

---
 src/workflows/airqo_etl_utils/bigquery_api.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/workflows/airqo_etl_utils/bigquery_api.py b/src/workflows/airqo_etl_utils/bigquery_api.py
index 581982a13e..90e6ee2fde 100644
--- a/src/workflows/airqo_etl_utils/bigquery_api.py
+++ b/src/workflows/airqo_etl_utils/bigquery_api.py
@@ -567,7 +567,7 @@ def compose_query(
         columns = ", ".join(map(str, columns)) if columns else " * "
         where_clause = f" timestamp between '{start_date_time}' and '{end_date_time}' "
 
-        if network:
+        if network and network != "all":
             where_clause += f"AND network = '{network}' "
 
         valid_cols = self.get_columns(table=table)
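To make the PATCH 6 guard concrete, here is a stripped-down sketch of the WHERE-clause composition with placeholder names (the real compose_query goes on to fetch the table's valid columns). With the extra condition, a literal "all" network behaves the same as passing no network:

```python
from typing import Optional


def compose_where(
    start_date_time: str, end_date_time: str, network: Optional[str] = None
) -> str:
    # Base filter: restrict rows to the requested time range.
    where_clause = f" timestamp between '{start_date_time}' and '{end_date_time}' "
    # Skip the network filter when no network is given or when it is "all".
    if network and network != "all":
        where_clause += f"AND network = '{network}' "
    return where_clause


print(compose_where("2025-01-01", "2025-01-02", network="airqo"))
#  timestamp between '2025-01-01' and '2025-01-02' AND network = 'airqo'
print(compose_where("2025-01-01", "2025-01-02", network="all"))
#  timestamp between '2025-01-01' and '2025-01-02'   (filter skipped)
```

The string interpolation mirrors the source; in new code, BigQuery query parameters would be the safer way to inject the date range and network values.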