Skip to content

Commit

Permalink
Merge pull request #4203 from NicholasTurner23/update-fix/Consolidate…
Browse files Browse the repository at this point in the history
…d-data-cleanup

Update fix/consolidated data cleanup
  • Loading branch information
Baalmart authored Jan 18, 2025
2 parents b3a838a + a4e9bcd commit 3ab8f93
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 165 deletions.
113 changes: 1 addition & 112 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,71 +65,6 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame:

return DataValidationUtils.remove_outliers(hourly_uncalibrated_data)

@staticmethod
def extract_data_from_bigquery(
    datatype: str,
    start_date_time: str,
    end_date_time: str,
    frequency: Frequency,
    device_network: DeviceNetwork = None,
    dynamic_query: bool = False,
    remove_outliers: bool = True,
) -> pd.DataFrame:
    """
    Extracts data from BigQuery within a specified time range and frequency,
    with an optional filter for the device network. The data is cleaned to remove outliers.

    Args:
        datatype(str): The type of data to extract determined by the source data asset.
        start_date_time(str): The start of the time range for data extraction, in ISO 8601 format.
        end_date_time(str): The end of the time range for data extraction, in ISO 8601 format.
        frequency(Frequency): The frequency of the data to be extracted, e.g., RAW or HOURLY.
        device_network(DeviceNetwork, optional): The network to filter devices, default is None (no filter).
        dynamic_query (bool, optional): Determines the type of data returned. If True, returns averaged data grouped by `device_number`, `device_id`, and `site_id`. If False, returns raw data without aggregation. Defaults to False.
        remove_outliers (bool, optional): If True, removes outliers from the extracted data. Defaults to True.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the cleaned data from BigQuery.

    Raises:
        ValueError: If the datatype/frequency combination has no table associated with it.
    """
    bigquery_api = BigQueryApi()

    # Two-level lookup: datatype -> frequency -> source table name.
    source = {
        "raw": {
            Frequency.RAW: bigquery_api.raw_measurements_table,
        },
        "averaged": {
            Frequency.HOURLY: bigquery_api.hourly_measurements_table,
            Frequency.DAILY: bigquery_api.daily_measurements_table,
        },
        "consolidated": {
            Frequency.HOURLY: bigquery_api.consolidated_data_table,
        },
        "weather": {Frequency.HOURLY: bigquery_api.hourly_weather_table},
    }.get(datatype)

    table = source.get(frequency) if source else None

    if not table:
        # Fail loudly with the offending combination instead of the previous
        # generic "No table information provided." message.
        raise ValueError(
            f"No table configured for datatype={datatype!r} at frequency={frequency}."
        )

    raw_data = bigquery_api.query_data(
        table=table,
        start_date_time=start_date_time,
        end_date_time=end_date_time,
        network=device_network,
        dynamic_query=dynamic_query,
    )

    if remove_outliers:
        raw_data = DataValidationUtils.remove_outliers(raw_data)

    return raw_data

@staticmethod
def map_and_extract_data(
data_mapping: Dict[str, Union[str, Dict[str, List[str]]]],
Expand Down Expand Up @@ -535,7 +470,7 @@ def clean_bam_data(data: pd.DataFrame) -> pd.DataFrame:

big_query_api = BigQueryApi()
required_cols = big_query_api.get_columns(
table=big_query_api.bam_measurements_table
table=big_query_api.bam_hourly_measurements_table
)

data = Utils.populate_missing_columns(data=data, columns=required_cols)
Expand Down Expand Up @@ -594,52 +529,6 @@ def clean_low_cost_sensor_data(
data.loc[is_airqo_network, "pm10"] = pm10_mean
return data

@staticmethod
def format_data_for_bigquery(
    data: pd.DataFrame, data_type: DataType
) -> pd.DataFrame:
    """
    Prepares a DataFrame for loading into BigQuery.

    Converts the `timestamp` column to datetime and populates any columns
    required by the destination table (missing columns are added as `None`).
    Currently only used for BAM device measurements.

    Args:
        data(pd.DataFrame): The measurements to format.
        data_type(DataType): Selects the destination table whose schema the
            data must match.

    Returns:
        pd.DataFrame: The input data with all required columns present.

    Raises:
        ValueError: If `data_type` is not a recognised DataType.
    """
    data.loc[:, "timestamp"] = pd.to_datetime(data["timestamp"])

    big_query_api = BigQueryApi()
    # Dispatch table replaces the previous if/elif chain. Note that both
    # UNCLEAN and CLEAN low-cost data intentionally target the raw
    # measurements table, as in the original branches.
    table_for_type = {
        DataType.UNCLEAN_BAM_DATA: big_query_api.raw_bam_measurements_table,
        DataType.CLEAN_BAM_DATA: big_query_api.bam_measurements_table,
        DataType.UNCLEAN_LOW_COST_DATA: big_query_api.raw_measurements_table,
        DataType.CLEAN_LOW_COST_DATA: big_query_api.raw_measurements_table,
        DataType.AGGREGATED_LOW_COST_DATA: big_query_api.hourly_measurements_table,
    }
    try:
        table = table_for_type[data_type]
    except KeyError as e:
        # ValueError (a subclass of the previously-raised Exception) with
        # the offending value, instead of a bare "invalid data type".
        raise ValueError(f"invalid data type: {data_type}") from e

    cols = big_query_api.get_columns(table=table)
    return Utils.populate_missing_columns(data=data, columns=cols)

@staticmethod
def process_raw_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
    """
    Prepares raw measurement data for loading into BigQuery.

    Converts the `timestamp` column to datetime, then adds any columns
    required by the raw measurements table and sets them to `None`.
    """
    data["timestamp"] = pd.to_datetime(data["timestamp"])
    api = BigQueryApi()
    required_columns = api.get_columns(table=api.raw_measurements_table)
    return Utils.populate_missing_columns(data=data, columns=required_columns)

@staticmethod
def process_aggregated_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
    """
    Prepares aggregated (hourly) measurement data for loading into BigQuery.

    Converts the `timestamp` column to datetime, then adds any columns
    required by the hourly measurements table and sets them to `None`.
    """
    data["timestamp"] = pd.to_datetime(data["timestamp"])
    api = BigQueryApi()
    required_columns = api.get_columns(table=api.hourly_measurements_table)
    return Utils.populate_missing_columns(data=data, columns=required_columns)

@staticmethod
def process_latest_data(
data: pd.DataFrame, device_category: DeviceCategory
Expand Down
23 changes: 23 additions & 0 deletions src/workflows/airqo_etl_utils/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from pathlib import Path

from .constants import DataType, DeviceCategory, Frequency
import pymongo as pm
import tweepy
import urllib3
Expand Down Expand Up @@ -385,6 +386,28 @@ class Config:
BIGQUERY_RAW_BAM_DATA_TABLE: "bam_raw_measurements.json",
"all": None,
}
# Maps DataType -> DeviceCategory -> Frequency to the BigQuery table that
# stores that slice of data. Consumed by DataUtils (extract/format helpers)
# to resolve which table a given request should target.
DataSource = {
    DataType.RAW: {
        DeviceCategory.GENERAL: {
            Frequency.RAW: BIGQUERY_RAW_EVENTS_TABLE,
        },
        DeviceCategory.BAM: {Frequency.RAW: BIGQUERY_RAW_BAM_DATA_TABLE},
        DeviceCategory.WEATHER: {Frequency.RAW: BIGQUERY_RAW_WEATHER_TABLE},
    },
    DataType.AVERAGED: {
        DeviceCategory.GENERAL: {
            Frequency.HOURLY: BIGQUERY_HOURLY_EVENTS_TABLE,
            Frequency.DAILY: BIGQUERY_DAILY_EVENTS_TABLE,
        },
        DeviceCategory.BAM: {Frequency.HOURLY: BIGQUERY_HOURLY_BAM_EVENTS_TABLE},
        # NOTE(review): keyed by Frequency.RAW yet points at the *hourly*
        # weather table — looks like it should be Frequency.HOURLY; confirm
        # against callers before changing the key.
        DeviceCategory.WEATHER: {Frequency.RAW: BIGQUERY_HOURLY_WEATHER_TABLE},
    },
    DataType.CONSOLIDATED: {
        DeviceCategory.GENERAL: {
            Frequency.HOURLY: BIGQUERY_ANALYTICS_TABLE,
        }
    },
}

# Data unit tests
BUCKET_NAME_AIRQO = os.getenv("BUCKET_NAME")
Expand Down
80 changes: 49 additions & 31 deletions src/workflows/airqo_etl_utils/datautils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import numpy as np
import pandas as pd
import logging

from .config import configuration
from .bigquery_api import BigQueryApi
from .constants import (
DeviceCategory,
Expand All @@ -14,6 +16,8 @@
from .data_validator import DataValidationUtils
from typing import List, Dict, Any, Optional, Union

logger = logging.getLogger(__name__)


class DataUtils:
Device_Field_Mapping = {
Expand Down Expand Up @@ -70,40 +74,17 @@ def extract_data_from_bigquery(
bigquery_api = BigQueryApi()
table: str = None

source = {
DataType.RAW: {
DeviceCategory.GENERAL: {
Frequency.RAW: bigquery_api.raw_measurements_table,
},
DeviceCategory.BAM: {
Frequency.RAW: bigquery_api.raw_bam_measurements_table
},
DeviceCategory.WEATHER: {Frequency.RAW: bigquery_api.raw_weather_table},
},
DataType.AVERAGED: {
DeviceCategory.GENERAL: {
Frequency.HOURLY: bigquery_api.hourly_measurements_table,
Frequency.DAILY: bigquery_api.daily_measurements_table,
},
DeviceCategory.BAM: {
Frequency.HOURLY: bigquery_api.bam_hourly_measurements_table
},
DeviceCategory.WEATHER: {
Frequency.RAW: bigquery_api.hourly_weather_table
},
},
DataType.CONSOLIDATED: {
DeviceCategory.GENERAL: {
Frequency.HOURLY: bigquery_api.consolidated_data_table,
}
},
}.get(datatype, None)

if not device_category:
device_category = DeviceCategory.GENERAL

if source:
try:
source = configuration.DataSource.get(datatype)
table = source.get(device_category).get(frequency)
except KeyError as e:
logger.exception(
f"Invalid combination: {datatype}, {device_category}, {frequency}"
)
except Exception as e:
logger.exception("An unexpected error occurred during column retrieval")

if not table:
raise ValueError("No table information provided.")
Expand All @@ -121,6 +102,43 @@ def extract_data_from_bigquery(

return raw_data

@staticmethod
def format_data_for_bigquery(
    data: pd.DataFrame,
    datatype: DataType,
    device_category: DeviceCategory,
    frequency: Frequency,
) -> pd.DataFrame:
    """
    Formats a pandas DataFrame for BigQuery by ensuring all required columns are present
    and the timestamp column is correctly parsed to datetime.

    Args:
        data (pd.DataFrame): The input DataFrame to be formatted.
        datatype (DataType): The type of data (e.g., raw, averaged or processed).
        device_category (DeviceCategory): The category of the device (e.g., BAM, low-cost).
        frequency (Frequency): The data frequency (e.g., raw, hourly, daily).

    Returns:
        pd.DataFrame: A DataFrame formatted for BigQuery with required columns populated.

    Raises:
        ValueError: If the combination of datatype, device_category, and frequency
            has no table configured in `configuration.DataSource`.
    """
    data.loc[:, "timestamp"] = pd.to_datetime(data["timestamp"])

    try:
        # Index with [] (not .get) so an invalid combination raises KeyError
        # here; the previous .get() chain could never raise KeyError and an
        # invalid combination surfaced later as AttributeError/NameError.
        table = configuration.DataSource[datatype][device_category][frequency]
    except KeyError as e:
        logger.exception(
            f"Invalid combination: {datatype}, {device_category}, {frequency}"
        )
        # Re-raise so execution never reaches the return with an unbound name.
        raise ValueError(
            f"No table configured for: {datatype}, {device_category}, {frequency}"
        ) from e

    # DataSource resolves to a *table name*; the column list must come from
    # the table schema, as in the previous AirQoDataUtils implementation.
    cols = BigQueryApi().get_columns(table=table)
    return Utils.populate_missing_columns(data=data, columns=cols)

@staticmethod
def remove_duplicates(
data: pd.DataFrame,
Expand Down
22 changes: 11 additions & 11 deletions src/workflows/dags/airqo_bam_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from airflow.utils.dates import days_ago
import pandas as pd
from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.datautils import DataUtils
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.bigquery_api import BigQueryApi
from datetime import timedelta
Expand Down Expand Up @@ -37,8 +38,8 @@ def extract_bam_data(**kwargs) -> pd.DataFrame:

@task(retries=3, retry_delay=timedelta(minutes=5))
def save_unclean_data(data: pd.DataFrame):
data = AirQoDataUtils.format_data_for_bigquery(
data=data, data_type=DataType.UNCLEAN_BAM_DATA
data = DataUtils.format_data_for_bigquery(
data, DataType.RAW, DeviceCategory.BAM, Frequency.RAW
)
big_query_api = BigQueryApi()
big_query_api.load_data(
Expand All @@ -52,13 +53,13 @@ def clean_bam_data(data: pd.DataFrame) -> pd.DataFrame:

@task(retries=3, retry_delay=timedelta(minutes=5))
def save_clean_bam_data(data: pd.DataFrame):
data = AirQoDataUtils.format_data_for_bigquery(
data=data, data_type=DataType.CLEAN_BAM_DATA
data = DataUtils.format_data_for_bigquery(
data, DataType.AVERAGED, DeviceCategory.BAM, Frequency.HOURLY
)
big_query_api = BigQueryApi()
big_query_api.load_data(
dataframe=data,
table=big_query_api.bam_measurements_table,
table=big_query_api.bam_hourly_measurements_table,
)

unclean_data = extract_bam_data()
Expand Down Expand Up @@ -98,10 +99,9 @@ def extract_bam_data(**kwargs):
@task(retries=3, retry_delay=timedelta(minutes=5))
def save_unclean_data(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.airqo_utils import AirQoDataUtils

data = AirQoDataUtils.format_data_for_bigquery(
data=data, data_type=DataType.UNCLEAN_BAM_DATA
data = DataUtils.format_data_for_bigquery(
data, DataType.RAW, DeviceCategory.BAM, Frequency.RAW
)
big_query_api = BigQueryApi()
big_query_api.load_data(
Expand All @@ -120,13 +120,13 @@ def save_clean_bam_data(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.airqo_utils import AirQoDataUtils

data = AirQoDataUtils.format_data_for_bigquery(
data=data, data_type=DataType.CLEAN_BAM_DATA
data = DataUtils.format_data_for_bigquery(
data, DataType.AVERAGED, DeviceCategory.BAM, Frequency.HOURLY
)
big_query_api = BigQueryApi()
big_query_api.load_data(
dataframe=data,
table=big_query_api.bam_measurements_table,
table=big_query_api.bam_hourly_measurements_table,
)

@task(retries=3, retry_delay=timedelta(minutes=5))
Expand Down
Loading

0 comments on commit 3ab8f93

Please sign in to comment.