Update fix/clean up #4184

Merged
2 changes: 1 addition & 1 deletion src/workflows/airqo_etl_utils/airnow_utils.py
@@ -102,7 +102,7 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame:
all_device_data.append(device_df)

if not all_device_data:
raise ValueError("No BAM data found for the specified date range.")
logger.info("No BAM data found for the specified date range.")

bam_data = pd.concat(all_device_data, ignore_index=True)

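For context, a minimal sketch of the empty-data guard this hunk relaxes. The early return of an empty frame is an assumption, not part of the diff; without some early exit, pd.concat on an empty list would still raise a ValueError.

import logging

import pandas as pd

logger = logging.getLogger(__name__)

def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame:
    all_device_data = []  # populated per device, as in the original function
    # ... per-device extraction elided ...
    if not all_device_data:
        # Log instead of raising, per this change; returning an empty frame is
        # an assumed follow-up so the concat below never runs with no data.
        logger.info("No BAM data found for the specified date range.")
        return pd.DataFrame()
    return pd.concat(all_device_data, ignore_index=True)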
120 changes: 92 additions & 28 deletions src/workflows/airqo_etl_utils/airqo_utils.py
@@ -66,54 +66,118 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame:

@staticmethod
def extract_data_from_bigquery(
start_date_time, end_date_time, frequency: Frequency
start_date_time,
end_date_time,
frequency: Frequency,
device_network: DeviceNetwork = None,
) -> pd.DataFrame:
"""
Extracts data from BigQuery within a specified time range and frequency,
with an optional filter for the device network. The data is cleaned to remove outliers.

Args:
start_date_time(str): The start of the time range for data extraction, in ISO 8601 format.
end_date_time(str): The end of the time range for data extraction, in ISO 8601 format.
frequency(Frequency): The frequency of the data to be extracted, e.g., RAW or HOURLY.
device_network(DeviceNetwork, optional): The network to filter devices by; defaults to None (no filter).

Returns:
pd.DataFrame: A pandas DataFrame containing the cleaned data from BigQuery.

Raises:
ValueError: If the frequency is unsupported or no table is associated with it.
"""
bigquery_api = BigQueryApi()
if frequency == Frequency.RAW:
table = bigquery_api.raw_measurements_table
elif frequency == Frequency.HOURLY:
table = bigquery_api.hourly_measurements_table
else:
table = ""

table = {
Frequency.RAW: bigquery_api.raw_measurements_table,
Frequency.HOURLY: bigquery_api.hourly_measurements_table,
}.get(frequency, "")

raw_data = bigquery_api.query_data(
table=table,
start_date_time=start_date_time,
end_date_time=end_date_time,
network=DeviceNetwork.AIRQO,
network=device_network,
)

return DataValidationUtils.remove_outliers(raw_data)
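
A hedged usage sketch of the widened signature above; the import path for the enums and the date strings are illustrative assumptions.

from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.constants import DeviceNetwork, Frequency  # path assumed

# Default: no network filter (device_network=None), all networks returned.
hourly_all = AirQoDataUtils.extract_data_from_bigquery(
    start_date_time="2024-01-01T00:00:00Z",
    end_date_time="2024-01-02T00:00:00Z",
    frequency=Frequency.HOURLY,
)

# Explicit filter, reproducing the previous hard-coded AirQo-only behaviour.
hourly_airqo = AirQoDataUtils.extract_data_from_bigquery(
    start_date_time="2024-01-01T00:00:00Z",
    end_date_time="2024-01-02T00:00:00Z",
    frequency=Frequency.HOURLY,
    device_network=DeviceNetwork.AIRQO,
)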

@staticmethod
def remove_duplicates(data: pd.DataFrame) -> pd.DataFrame:
cols = data.columns.to_list()
cols.remove("timestamp")
cols.remove("device_number")
data.dropna(subset=cols, how="all", inplace=True)
"""
Removes duplicate rows from a pandas DataFrame based on 'device_id' and 'timestamp'
while ensuring missing values are filled and non-duplicated data is retained.

Steps:
1. Drops rows where all non-essential columns (except 'timestamp', 'device_id', 'device_number', and 'site_id') are NaN.
2. Drops rows where 'site_id' is NaN (assumed to be non-deployed devices).
3. Identifies duplicate rows based on 'device_id' and 'timestamp'.
4. Fills missing values for duplicates within each 'site_id' group using forward and backward filling.
5. Retains only the first occurrence of duplicates.

Args:
data (pd.DataFrame): The input DataFrame containing 'timestamp', 'device_id', and 'site_id' columns.

Returns:
pd.DataFrame: A cleaned DataFrame with duplicates handled and missing values filled.
"""
data["timestamp"] = pd.to_datetime(data["timestamp"])

non_essential_cols = [
col
for col in data.columns
if col not in ["timestamp", "device_id", "device_number", "site_id"]
]
data.dropna(subset=non_essential_cols, how="all", inplace=True)

# Drop rows where 'site_id' is NaN (non-deployed devices)
data.dropna(subset=["site_id"], inplace=True)

data["duplicated"] = data.duplicated(
keep=False, subset=["device_number", "timestamp"]
keep=False, subset=["device_id", "timestamp"]
)

if True not in data["duplicated"].values:
if not data["duplicated"].any():
data.drop(columns=["duplicated"], inplace=True)
return data

duplicated_data = data.loc[data["duplicated"]]
not_duplicated_data = data.loc[~data["duplicated"]]
duplicates = data[data["duplicated"]].copy()
non_duplicates = data[~data["duplicated"]].copy()

for _, by_device_number in duplicated_data.groupby(by="device_number"):
for _, by_timestamp in by_device_number.groupby(by="timestamp"):
by_timestamp = by_timestamp.copy()
by_timestamp.fillna(inplace=True, method="ffill")
by_timestamp.fillna(inplace=True, method="bfill")
by_timestamp.drop_duplicates(
subset=["device_number", "timestamp"], inplace=True, keep="first"
)
not_duplicated_data = pd.concat(
[not_duplicated_data, by_timestamp], ignore_index=True
)
columns_to_fill = [
col
for col in duplicates.columns
if col
not in [
"device_number",
"device_id",
"timestamp",
"latitude",
"longitude",
"network",
"site_id",
]
]

# Fill missing values within each 'site_id' group
filled_duplicates = []
for _, group in duplicates.groupby("site_id"):
group = group.sort_values(by=["device_id", "timestamp"])
group[columns_to_fill] = (
group[columns_to_fill].fillna(method="ffill").fillna(method="bfill")
)
group = group.drop_duplicates(
subset=["device_id", "timestamp"], keep="first"
)
filled_duplicates.append(group)

duplicates = pd.concat(filled_duplicates, ignore_index=True)
cleaned_data = pd.concat([non_duplicates, duplicates], ignore_index=True)

cleaned_data.drop(columns=["duplicated"], inplace=True)

return not_duplicated_data
return cleaned_data
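
A tiny worked example of the dedup-and-fill steps described in the docstring above; the column set and values are illustrative assumptions.

import pandas as pd

from airqo_etl_utils.airqo_utils import AirQoDataUtils

frame = pd.DataFrame(
    {
        "timestamp": ["2024-01-01 10:00", "2024-01-01 10:00", "2024-01-01 11:00"],
        "device_id": ["aq_01", "aq_01", "aq_01"],
        "device_number": [1, 1, 1],
        "site_id": ["site_a", "site_a", "site_a"],
        "pm2_5": [None, 12.4, 13.1],
        "pm10": [20.5, None, 22.0],
    }
)

# The first two rows share (device_id, timestamp), so they are treated as
# duplicates: gaps are filled within the site_id group (pm2_5 <- 12.4,
# pm10 <- 20.5) and only the first occurrence is kept.
cleaned = AirQoDataUtils.remove_duplicates(data=frame)
# Expected: 2 rows, with the 10:00 row carrying pm2_5=12.4 and pm10=20.5.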

@staticmethod
def extract_aggregated_raw_data(
2 changes: 1 addition & 1 deletion src/workflows/airqo_etl_utils/bigquery_api.py
@@ -567,7 +567,7 @@ def compose_query(
columns = ", ".join(map(str, columns)) if columns else " * "
where_clause = f" timestamp between '{start_date_time}' and '{end_date_time}' "

if network:
if network and network != "all":
where_clause += f"AND network = '{network}' "

valid_cols = self.get_columns(table=table)
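A quick sketch of the clause this guard renders, following directly from the f-strings above; the variable values are illustrative.

start_date_time = "2024-01-01T00:00:00Z"
end_date_time = "2024-01-02T00:00:00Z"
where_clause = f" timestamp between '{start_date_time}' and '{end_date_time}' "

network = "airqo"
if network and network != "all":
    where_clause += f"AND network = '{network}' "
# -> " timestamp between '2024-01-01T00:00:00Z' and '2024-01-02T00:00:00Z' AND network = 'airqo' "
# With network=None or network="all", the network predicate is skipped and only
# the timestamp range is applied.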
12 changes: 6 additions & 6 deletions src/workflows/dags/airqo_measurements.py
@@ -215,27 +215,27 @@ def load_data(airqo_data: pd.DataFrame):
def airqo_cleanup_measurements():
import pandas as pd

@task(provide_context=True)
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract_raw_data(**kwargs) -> pd.DataFrame:
from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.date import DateUtils

start_date_time, end_date_time = DateUtils.get_dag_date_time_values(
days=14, **kwargs
days=1, **kwargs
)
return AirQoDataUtils.extract_data_from_bigquery(
start_date_time=start_date_time,
end_date_time=end_date_time,
frequency=Frequency.RAW,
)

@task(provide_context=True)
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract_hourly_data(**kwargs) -> pd.DataFrame:
from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.date import DateUtils

start_date_time, end_date_time = DateUtils.get_dag_date_time_values(
days=14, **kwargs
days=1, **kwargs
)
return AirQoDataUtils.extract_data_from_bigquery(
start_date_time=start_date_time,
@@ -255,7 +255,7 @@ def remove_duplicated_hourly_data(data: pd.DataFrame) -> pd.DataFrame:

return AirQoDataUtils.remove_duplicates(data=data)

@task()
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def load_raw_data(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi

@@ -264,7 +264,7 @@ def load_raw_data(data: pd.DataFrame):
dataframe=data, table=big_query_api.raw_measurements_table
)

@task()
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def load_hourly_data(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi

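The retry settings added to these tasks follow the standard Airflow TaskFlow pattern; a minimal sketch, assuming timedelta is imported at module level of the DAG file (e.g. from datetime import timedelta).

from datetime import timedelta

from airflow.decorators import task

@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def load_hourly_data(data):
    # On failure, Airflow re-runs this task up to 3 more times, waiting
    # 5 minutes between attempts, before marking it failed.
    ...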