Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update fix/consolidated data cleanup #4203

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 1 addition & 112 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,71 +65,6 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame:

return DataValidationUtils.remove_outliers(hourly_uncalibrated_data)

@staticmethod
def extract_data_from_bigquery(
    datatype: str,
    start_date_time: str,
    end_date_time: str,
    frequency: Frequency,
    device_network: DeviceNetwork = None,
    dynamic_query: bool = False,
    remove_outliers: bool = True,
) -> pd.DataFrame:
    """
    Extract data from BigQuery for a given time range and frequency, optionally
    filtered by device network and optionally cleaned of outliers.

    Args:
        datatype(str): Source data asset to read from ("raw", "averaged", "consolidated" or "weather").
        start_date_time(str): Start of the extraction window, ISO 8601.
        end_date_time(str): End of the extraction window, ISO 8601.
        frequency(Frequency): Granularity of the data, e.g. RAW or HOURLY.
        device_network(DeviceNetwork, optional): Restrict results to one network. Defaults to None (no filter).
        dynamic_query (bool, optional): If True, returns averaged data grouped by `device_number`, `device_id`, and `site_id`; if False, returns raw rows. Defaults to False.
        remove_outliers (bool, optional): If True, removes outliers from the result. Defaults to True.

    Returns:
        pd.DataFrame: The (optionally cleaned) query result.

    Raises:
        ValueError: If the datatype/frequency pair maps to no known table.
    """
    bigquery_api = BigQueryApi()

    # Two-level lookup: datatype -> frequency -> BigQuery table.
    table_lookup = {
        "raw": {
            Frequency.RAW: bigquery_api.raw_measurements_table,
        },
        "averaged": {
            Frequency.HOURLY: bigquery_api.hourly_measurements_table,
            Frequency.DAILY: bigquery_api.daily_measurements_table,
        },
        "consolidated": {
            Frequency.HOURLY: bigquery_api.consolidated_data_table,
        },
        "weather": {Frequency.HOURLY: bigquery_api.hourly_weather_table},
    }

    frequencies = table_lookup.get(datatype)
    table = frequencies.get(frequency, "") if frequencies else None

    if not table:
        raise ValueError("No table information provided.")

    measurements = bigquery_api.query_data(
        table=table,
        start_date_time=start_date_time,
        end_date_time=end_date_time,
        network=device_network,
        dynamic_query=dynamic_query,
    )

    if remove_outliers:
        measurements = DataValidationUtils.remove_outliers(measurements)

    return measurements

@staticmethod
def map_and_extract_data(
data_mapping: Dict[str, Union[str, Dict[str, List[str]]]],
Expand Down Expand Up @@ -535,7 +470,7 @@ def clean_bam_data(data: pd.DataFrame) -> pd.DataFrame:

big_query_api = BigQueryApi()
required_cols = big_query_api.get_columns(
table=big_query_api.bam_measurements_table
table=big_query_api.bam_hourly_measurements_table
)

data = Utils.populate_missing_columns(data=data, columns=required_cols)
Expand Down Expand Up @@ -594,52 +529,6 @@ def clean_low_cost_sensor_data(
data.loc[is_airqo_network, "pm10"] = pm10_mean
return data

@staticmethod
def format_data_for_bigquery(
    data: pd.DataFrame, data_type: DataType
) -> pd.DataFrame:
    """
    Prepare a DataFrame for a BigQuery load: parse the timestamp column to
    datetime and populate any columns the target schema requires but the
    DataFrame lacks.

    Currently only used for BAM device measurements.

    Args:
        data(pd.DataFrame): The measurements to format.
        data_type(DataType): Determines which BigQuery table's schema to match.

    Returns:
        pd.DataFrame: The DataFrame with all required columns present.

    Raises:
        Exception: If `data_type` is not a recognized value.
    """
    data.loc[:, "timestamp"] = pd.to_datetime(data["timestamp"])

    big_query_api = BigQueryApi()
    # Dispatch table: data type -> table whose schema supplies the columns.
    # NOTE(review): both UNCLEAN and CLEAN low-cost data resolve to the raw
    # measurements table — confirm this is intentional.
    table_by_type = {
        DataType.UNCLEAN_BAM_DATA: big_query_api.raw_bam_measurements_table,
        DataType.CLEAN_BAM_DATA: big_query_api.bam_measurements_table,
        DataType.UNCLEAN_LOW_COST_DATA: big_query_api.raw_measurements_table,
        DataType.CLEAN_LOW_COST_DATA: big_query_api.raw_measurements_table,
        DataType.AGGREGATED_LOW_COST_DATA: big_query_api.hourly_measurements_table,
    }
    if data_type not in table_by_type:
        raise Exception("invalid data type")

    cols = big_query_api.get_columns(table=table_by_type[data_type])
    return Utils.populate_missing_columns(data=data, columns=cols)

@staticmethod
def process_raw_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
    """
    Prepare raw measurements for BigQuery: parse the timestamp column to
    datetime and add any missing schema columns, set to `None`.
    """
    data["timestamp"] = pd.to_datetime(data["timestamp"])
    required_cols = BigQueryApi().get_columns(
        table=BigQueryApi().raw_measurements_table
    )
    return Utils.populate_missing_columns(data=data, columns=required_cols)

@staticmethod
def process_aggregated_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
    """
    Prepare hourly-aggregated measurements for BigQuery: parse the timestamp
    column to datetime and add any missing schema columns, set to `None`.
    """
    data["timestamp"] = pd.to_datetime(data["timestamp"])
    api = BigQueryApi()
    required_cols = api.get_columns(table=api.hourly_measurements_table)
    return Utils.populate_missing_columns(data=data, columns=required_cols)

@staticmethod
def process_latest_data(
data: pd.DataFrame, device_category: DeviceCategory
Expand Down
23 changes: 23 additions & 0 deletions src/workflows/airqo_etl_utils/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from pathlib import Path

from .constants import DataType, DeviceCategory, Frequency
import pymongo as pm
import tweepy
import urllib3
Expand Down Expand Up @@ -385,6 +386,28 @@ class Config:
BIGQUERY_RAW_BAM_DATA_TABLE: "bam_raw_measurements.json",
"all": None,
}
# Central registry mapping (DataType, DeviceCategory, Frequency) -> BigQuery
# table, used by DataUtils to resolve the source/target table for extraction
# and formatting without hard-coding table names at the call sites.
DataSource = {
    DataType.RAW: {
        DeviceCategory.GENERAL: {
            Frequency.RAW: BIGQUERY_RAW_EVENTS_TABLE,
        },
        DeviceCategory.BAM: {Frequency.RAW: BIGQUERY_RAW_BAM_DATA_TABLE},
        DeviceCategory.WEATHER: {Frequency.RAW: BIGQUERY_RAW_WEATHER_TABLE},
    },
    DataType.AVERAGED: {
        DeviceCategory.GENERAL: {
            Frequency.HOURLY: BIGQUERY_HOURLY_EVENTS_TABLE,
            Frequency.DAILY: BIGQUERY_DAILY_EVENTS_TABLE,
        },
        DeviceCategory.BAM: {Frequency.HOURLY: BIGQUERY_HOURLY_BAM_EVENTS_TABLE},
        # NOTE(review): keyed by Frequency.RAW although the value is the
        # *hourly* weather table — confirm whether this should be
        # Frequency.HOURLY to match the other AVERAGED entries.
        DeviceCategory.WEATHER: {Frequency.RAW: BIGQUERY_HOURLY_WEATHER_TABLE},
    },
    DataType.CONSOLIDATED: {
        DeviceCategory.GENERAL: {
            Frequency.HOURLY: BIGQUERY_ANALYTICS_TABLE,
        }
    },
}

# Data unit tests
BUCKET_NAME_AIRQO = os.getenv("BUCKET_NAME")
Expand Down
80 changes: 49 additions & 31 deletions src/workflows/airqo_etl_utils/datautils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import numpy as np
import pandas as pd
import logging

from .config import configuration
from .bigquery_api import BigQueryApi
from .constants import (
DeviceCategory,
Expand All @@ -14,6 +16,8 @@
from .data_validator import DataValidationUtils
from typing import List, Dict, Any, Optional, Union

logger = logging.getLogger(__name__)


class DataUtils:
Device_Field_Mapping = {
Expand Down Expand Up @@ -70,40 +74,17 @@ def extract_data_from_bigquery(
bigquery_api = BigQueryApi()
table: str = None

source = {
DataType.RAW: {
DeviceCategory.GENERAL: {
Frequency.RAW: bigquery_api.raw_measurements_table,
},
DeviceCategory.BAM: {
Frequency.RAW: bigquery_api.raw_bam_measurements_table
},
DeviceCategory.WEATHER: {Frequency.RAW: bigquery_api.raw_weather_table},
},
DataType.AVERAGED: {
DeviceCategory.GENERAL: {
Frequency.HOURLY: bigquery_api.hourly_measurements_table,
Frequency.DAILY: bigquery_api.daily_measurements_table,
},
DeviceCategory.BAM: {
Frequency.HOURLY: bigquery_api.bam_hourly_measurements_table
},
DeviceCategory.WEATHER: {
Frequency.RAW: bigquery_api.hourly_weather_table
},
},
DataType.CONSOLIDATED: {
DeviceCategory.GENERAL: {
Frequency.HOURLY: bigquery_api.consolidated_data_table,
}
},
}.get(datatype, None)

if not device_category:
device_category = DeviceCategory.GENERAL

if source:
try:
source = configuration.DataSource.get(datatype)
table = source.get(device_category).get(frequency)
except KeyError as e:
logger.exception(
f"Invalid combination: {datatype}, {device_category}, {frequency}"
)
except Exception as e:
logger.exception("An unexpected error occurred during column retrieval")
Comment on lines +79 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling by including original error messages.

The current error handling loses valuable context by not including the original error message in the logs.

Apply this diff to improve error handling:

 try:
     source = configuration.DataSource.get(datatype)
     table = source.get(device_category).get(frequency)
 except KeyError as e:
     logger.exception(
-        f"Invalid combination: {datatype}, {device_category}, {frequency}"
+        f"Invalid combination: {datatype}, {device_category}, {frequency}. Error: {str(e)}"
     )
+    raise
 except Exception as e:
     logger.exception("An unexpected error occurred during column retrieval")
+    raise
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try:
source = configuration.DataSource.get(datatype)
table = source.get(device_category).get(frequency)
except KeyError as e:
logger.exception(
f"Invalid combination: {datatype}, {device_category}, {frequency}"
)
except Exception as e:
logger.exception("An unexpected error occurred during column retrieval")
try:
source = configuration.DataSource.get(datatype)
table = source.get(device_category).get(frequency)
except KeyError as e:
logger.exception(
f"Invalid combination: {datatype}, {device_category}, {frequency}. Error: {str(e)}"
)
raise
except Exception as e:
logger.exception("An unexpected error occurred during column retrieval")
raise
🧰 Tools
🪛 Ruff (0.8.2)

82-82: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


86-86: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


if not table:
raise ValueError("No table information provided.")
Expand All @@ -121,6 +102,43 @@ def extract_data_from_bigquery(

return raw_data

@staticmethod
def format_data_for_bigquery(
    data: pd.DataFrame,
    datatype: DataType,
    device_category: DeviceCategory,
    frequency: Frequency,
) -> pd.DataFrame:
    """
    Formats a pandas DataFrame for BigQuery by ensuring all required columns are
    present and the timestamp column is correctly parsed to datetime.

    Args:
        data (pd.DataFrame): The input DataFrame to be formatted.
        datatype (DataType): The type of data (e.g., raw, averaged or processed).
        device_category (DeviceCategory): The category of the device (e.g., BAM, low-cost).
        frequency (Frequency): The data frequency (e.g., raw, hourly, daily).

    Returns:
        pd.DataFrame: A DataFrame formatted for BigQuery with required columns populated.

    Raises:
        ValueError: If the combination of datatype, device_category, and
            frequency maps to no configured source table.
        Exception: For unexpected errors during column retrieval.
    """
    data.loc[:, "timestamp"] = pd.to_datetime(data["timestamp"])

    # configuration.DataSource maps the combination to a *table name*; the
    # column list must then be fetched from that table's schema (as the
    # superseded AirQoDataUtils.format_data_for_bigquery did). Passing the
    # table name itself to populate_missing_columns would be wrong.
    #
    # Chained .get() calls with dict defaults cannot raise KeyError; a missing
    # level simply yields None, which is reported as an invalid combination.
    table = (
        configuration.DataSource.get(datatype, {})
        .get(device_category, {})
        .get(frequency)
    )
    if table is None:
        message = f"Invalid combination: {datatype}, {device_category}, {frequency}"
        logger.error(message)
        raise ValueError(message)

    try:
        cols = BigQueryApi().get_columns(table=table)
    except Exception:
        # Log with traceback, then propagate — callers must not load
        # incorrectly-shaped data into BigQuery.
        logger.exception("An unexpected error occurred during column retrieval")
        raise

    return Utils.populate_missing_columns(data=data, columns=cols)
Comment on lines +130 to +140
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Initialize cols variable and improve error handling.

The cols variable could be undefined if an exception occurs, and the error handling could be more informative.

Apply this diff:

+        cols = []
         try:
             datasource = configuration.DataSource
             cols = datasource.get(datatype).get(device_category).get(frequency)
         except KeyError as e:
             logger.exception(
-                f"Invalid combination: {datatype}, {device_category}, {frequency}"
+                f"Invalid combination: {datatype}, {device_category}, {frequency}. Error: {str(e)}"
             )
+            raise
         except Exception as e:
             logger.exception("An unexpected error occurred during column retrieval")
+            raise

         return Utils.populate_missing_columns(data=data, columns=cols)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try:
datasource = configuration.DataSource
cols = datasource.get(datatype).get(device_category).get(frequency)
except KeyError as e:
logger.exception(
f"Invalid combination: {datatype}, {device_category}, {frequency}"
)
except Exception as e:
logger.exception("An unexpected error occurred during column retrieval")
return Utils.populate_missing_columns(data=data, columns=cols)
cols = []
try:
datasource = configuration.DataSource
cols = datasource.get(datatype).get(device_category).get(frequency)
except KeyError as e:
logger.exception(
f"Invalid combination: {datatype}, {device_category}, {frequency}. Error: {str(e)}"
)
raise
except Exception as e:
logger.exception("An unexpected error occurred during column retrieval")
raise
return Utils.populate_missing_columns(data=data, columns=cols)
🧰 Tools
🪛 Ruff (0.8.2)

133-133: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


137-137: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


@staticmethod
def remove_duplicates(
data: pd.DataFrame,
Expand Down
22 changes: 11 additions & 11 deletions src/workflows/dags/airqo_bam_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from airflow.utils.dates import days_ago
import pandas as pd
from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.datautils import DataUtils
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.bigquery_api import BigQueryApi
from datetime import timedelta
Expand Down Expand Up @@ -37,8 +38,8 @@ def extract_bam_data(**kwargs) -> pd.DataFrame:

@task(retries=3, retry_delay=timedelta(minutes=5))
def save_unclean_data(data: pd.DataFrame):
data = AirQoDataUtils.format_data_for_bigquery(
data=data, data_type=DataType.UNCLEAN_BAM_DATA
data = DataUtils.format_data_for_bigquery(
data, DataType.RAW, DeviceCategory.BAM, Frequency.RAW
)
big_query_api = BigQueryApi()
big_query_api.load_data(
Expand All @@ -52,13 +53,13 @@ def clean_bam_data(data: pd.DataFrame) -> pd.DataFrame:

@task(retries=3, retry_delay=timedelta(minutes=5))
def save_clean_bam_data(data: pd.DataFrame):
data = AirQoDataUtils.format_data_for_bigquery(
data=data, data_type=DataType.CLEAN_BAM_DATA
data = DataUtils.format_data_for_bigquery(
data, DataType.AVERAGED, DeviceCategory.BAM, Frequency.HOURLY
)
big_query_api = BigQueryApi()
big_query_api.load_data(
dataframe=data,
table=big_query_api.bam_measurements_table,
table=big_query_api.bam_hourly_measurements_table,
)

unclean_data = extract_bam_data()
Expand Down Expand Up @@ -98,10 +99,9 @@ def extract_bam_data(**kwargs):
@task(retries=3, retry_delay=timedelta(minutes=5))
def save_unclean_data(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.airqo_utils import AirQoDataUtils

data = AirQoDataUtils.format_data_for_bigquery(
data=data, data_type=DataType.UNCLEAN_BAM_DATA
data = DataUtils.format_data_for_bigquery(
data, DataType.RAW, DeviceCategory.BAM, Frequency.RAW
)
big_query_api = BigQueryApi()
big_query_api.load_data(
Expand All @@ -120,13 +120,13 @@ def save_clean_bam_data(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.airqo_utils import AirQoDataUtils

data = AirQoDataUtils.format_data_for_bigquery(
data=data, data_type=DataType.CLEAN_BAM_DATA
data = DataUtils.format_data_for_bigquery(
data, DataType.AVERAGED, DeviceCategory.BAM, Frequency.HOURLY
)
big_query_api = BigQueryApi()
big_query_api.load_data(
dataframe=data,
table=big_query_api.bam_measurements_table,
table=big_query_api.bam_hourly_measurements_table,
)

@task(retries=3, retry_delay=timedelta(minutes=5))
Expand Down
Loading
Loading