Commit c0cf056
Merge pull request #4042 from NicholasTurner23/refactor-fix-update/code-clean-up

Refactor fix update/code clean up
Baalmart authored Dec 11, 2024
2 parents 741573b + 88f3ffe commit c0cf056
Showing 12 changed files with 134 additions and 70 deletions.
10 changes: 4 additions & 6 deletions src/workflows/airqo_etl_utils/airqo_api.py
@@ -719,7 +719,7 @@ def get_cohorts(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
for cohort in response.get("cohorts", [])
]

-def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
+def get_sites(self, network: str = "all") -> List[Dict[str, Any]]:
"""
Retrieve sites given a tenant.
@@ -766,19 +766,17 @@ def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
},
]
"""
query_params = {"tenant": str(Tenant.AIRQO)}
query_params = {}

-if tenant != Tenant.ALL:
-    query_params["network"] = str(tenant)
+if network != "all":
+    query_params["network"] = network

response = self.__request("devices/sites", query_params)

return [
{
**site,
"site_id": site.get("_id", None),
"tenant": site.get("network", site.get("tenant", None)),
"location": site.get("location", None),
"approximate_latitude": site.get(
"approximate_latitude", site.get("latitude", None)
),
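The refactor swaps the Tenant enum for a plain network string, and an omitted or "all" argument now sends no filter at all. A minimal usage sketch of the new signature (import path as used elsewhere in this repo; the variable names are illustrative):

    from airqo_etl_utils.airqo_api import AirQoApi

    api = AirQoApi()
    all_sites = api.get_sites()                    # "all" default: no query params
    airqo_sites = api.get_sites(network="airqo")   # sends {"network": "airqo"}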
52 changes: 30 additions & 22 deletions src/workflows/airqo_etl_utils/airqo_utils.py
@@ -554,7 +554,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
numeric_columns = data.select_dtypes(include=["number"]).columns
numeric_columns = numeric_columns.difference(["device_number"])
data_for_aggregation = data[["timestamp", "device_id"] + list(numeric_columns)]

aggregated = (
data_for_aggregation.groupby("device_id")
.apply(lambda group: group.resample("1H", on="timestamp").mean())
@@ -744,20 +743,19 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
devices = airqo_api.get_devices()

device_lookup = {
device["device_number"]: device
for device in devices
if device.get("device_number")
device["device_id"]: device for device in devices if device.get("device_id")
}

for _, row in data.iterrows():
try:
-device_number = row["device_number"]
+device_id = row["device_id"]

# Get device details from the lookup dictionary
-device_details = device_lookup.get(device_number)
+device_details = device_lookup.get(device_id)
if not device_details:
logger.exception(
f"Device number {device_number} not found in device list."
f"Device number {device_id} not found in device list."
)
continue
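The lookup is now keyed by device_id instead of device_number, with missing ids filtered out up front. A tiny self-contained sketch of the pattern (the device records below are hypothetical; real ones come from AirQoApi().get_devices()):

    devices = [
        {"device_id": "aq_29", "network": "airqo"},
        {"network": "iqair"},  # no device_id, so the comprehension skips it
    ]
    device_lookup = {
        device["device_id"]: device for device in devices if device.get("device_id")
    }
    assert device_lookup.get("aq_29") == {"device_id": "aq_29", "network": "airqo"}
    assert device_lookup.get("aq_99") is None  # triggers the `continue` branch above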

@@ -766,7 +764,7 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
"device_id": device_details["_id"],
"site_id": row["site_id"],
"device_number": device_number,
"tenant": str(Tenant.AIRQO),
"network": device_details["network"],
"location": {
"latitude": {"value": row["latitude"]},
"longitude": {"value": row["longitude"]},
@@ -832,7 +830,7 @@ def merge_aggregated_weather_data(
airqo_api = AirQoApi()
sites: List[Dict[str, Any]] = []

-for site in airqo_api.get_sites(tenant=Tenant.AIRQO):
+for site in airqo_api.get_sites(network="airqo"):
sites.extend(
[
{
@@ -894,7 +892,8 @@ def merge_aggregated_weather_data(
numeric_columns = measurements.select_dtypes(include=["number"]).columns
numeric_columns = numeric_columns.difference(["device_number"])
numeric_counts = measurements[numeric_columns].notna().sum(axis=1)
-measurements = measurements[numeric_counts >= 1]
+# Keep only rows with more than one numeric value.
+measurements = measurements[numeric_counts > 1]
return measurements

@staticmethod
@@ -1012,12 +1011,10 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:

data["timestamp"] = pd.to_datetime(data["timestamp"])
sites = AirQoApi().get_sites()
sites_df = pd.DataFrame(sites, columns=["_id", "city"]).rename(
columns={"_id": "site_id"}
)
sites_df = pd.DataFrame(sites, columns=["site_id", "city"])

data = pd.merge(data, sites_df, on="site_id", how="left")
data.dropna(subset=["device_id", "timestamp"], inplace=True)

columns_to_fill = [
"s1_pm2_5",
"s1_pm10",
@@ -1027,9 +1024,9 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
"humidity",
]

-data[columns_to_fill] = data[columns_to_fill].fillna(0)
+# TODO: Need to opt for a different approach, e.g. forward fill; can't do that here as the df only has the last 1 hour of data. Perhaps use raw data only?
+# May have to rewrite the entire pipeline flow.
+data[columns_to_fill] = data[columns_to_fill].fillna(0)

# additional input columns for calibration
data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
@@ -1052,9 +1049,12 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
"pm2_5_pm10_mod",
]
data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0)
-data.dropna(subset=input_variables, inplace=True)

-grouped_df = data.groupby("city", dropna=False)
+# Explicitly filter data to calibrate.
+to_calibrate = data["network"] == "airqo"
+data_to_calibrate = data.loc[to_calibrate]
+data_to_calibrate.dropna(subset=input_variables, inplace=True)
+grouped_df = data_to_calibrate.groupby("city", dropna=False)

rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
@@ -1071,6 +1071,8 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
),
)
for city, group in grouped_df:
+# What was the intention of this?
+# If the condition below fails, rf_model and lasso_model default to the ones previously used, i.e. those set as "default" outside the for loop.
if str(city).lower() in [c.value.lower() for c in CityModel]:
try:
rf_model = GCSUtils.get_trained_model_from_gcs(
@@ -1087,6 +1089,7 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
)
except Exception as ex:
logger.exception(f"Error getting model: {ex}")
+continue
group["pm2_5_calibrated_value"] = rf_model.predict(group[input_variables])
group["pm10_calibrated_value"] = lasso_model.predict(group[input_variables])

@@ -1100,15 +1103,20 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
if "pm2_5_calibrated_value" in data.columns:
data["pm2_5"] = data["pm2_5_calibrated_value"]
data.loc[to_calibrate, "pm2_5"] = data.loc[
to_calibrate, "pm2_5_calibrated_value"
]
else:
data["pm2_5_calibrated_value"] = None
data["pm2_5"] = None
data.loc[to_calibrate, "pm2_5_calibrated_value"] = None
data.loc[to_calibrate, "pm2_5"] = None
if "pm10_calibrated_value" in data.columns:
data["pm10"] = data["pm10_calibrated_value"]
data.loc[to_calibrate, "pm10"] = data.loc[
to_calibrate, "pm10_calibrated_value"
]
else:
data["pm10_calibrated_value"] = None
data["pm10"] = None
data.loc[to_calibrate, "pm10_calibrated_value"] = None
data.loc[to_calibrate, "pm10"] = None

data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])
data["pm10"] = data["pm10"].fillna(data["pm10_raw_value"])

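Calibration is now restricted to rows whose network is "airqo", with everything else falling back to the raw sensor mean. A self-contained pandas sketch of that masked-assignment-plus-fallback pattern (toy frame; not the module's full pipeline):

    import numpy as np
    import pandas as pd

    data = pd.DataFrame(
        {
            "network": ["airqo", "iqair", "airqo"],
            "s1_pm2_5": [10.0, 12.0, 14.0],
            "s2_pm2_5": [11.0, 13.0, 15.0],
            "pm2_5_calibrated_value": [9.5, np.nan, 13.8],
        }
    )
    data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)

    to_calibrate = data["network"] == "airqo"  # same boolean mask as in the diff
    data.loc[to_calibrate, "pm2_5"] = data.loc[to_calibrate, "pm2_5_calibrated_value"]
    # Non-airqo rows (left NaN by the masked write) fall back to the raw mean.
    data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])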
37 changes: 26 additions & 11 deletions src/workflows/airqo_etl_utils/data_validator.py
@@ -188,8 +188,8 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:
@staticmethod
def process_data_for_message_broker(
data: pd.DataFrame,
-topic: str,
caller: str,
+topic: str = None,
+frequency: Frequency = Frequency.HOURLY,
) -> pd.DataFrame:
"""
@@ -214,16 +214,31 @@ def process_data_for_message_broker(
data.rename(columns={"device_id": "device_name"}, inplace=True)

devices = AirQoDataUtils.get_devices(group_id=caller)
-devices = devices[
-    ["device_name", "site_id", "device_latitude", "device_longitude", "network"]
-]

-data = pd.merge(
-    left=data,
-    right=devices,
-    on=["device_name", "site_id", "network"],
-    how="left",
-)
+try:
+    devices = devices[
+        [
+            "device_name",
+            "site_id",
+            "device_latitude",
+            "device_longitude",
+            "network",
+        ]
+    ]
+
+    data = pd.merge(
+        left=data,
+        right=devices,
+        on=["device_name", "site_id", "network"],
+        how="left",
+    )
+except KeyError as e:
+    logger.exception(
+        f"KeyError: The key(s) '{e.args}' are not available in the returned devices data."
+    )
+    return None
+except Exception as e:
+    logger.exception(f"An error occurred: {e}")
+    return None
return data

@staticmethod
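process_data_for_message_broker now degrades to None instead of raising when expected device columns are missing. A stripped-down sketch of that guard (the function name and toy frame are hypothetical):

    import logging
    from typing import Optional

    import pandas as pd

    logger = logging.getLogger(__name__)


    def select_device_columns(devices: pd.DataFrame) -> Optional[pd.DataFrame]:
        try:
            return devices[["device_name", "site_id", "network"]]
        except KeyError as e:
            # e.args carries the missing key(s), as in the log message above.
            logger.exception(f"KeyError: The key(s) '{e.args}' are not available.")
            return None


    assert select_device_columns(pd.DataFrame({"device_name": ["aq_01"]})) is None

Callers then treat None as the failure signal, as the airnow DAGs below do.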
8 changes: 4 additions & 4 deletions src/workflows/airqo_etl_utils/data_warehouse_utils.py
@@ -106,14 +106,14 @@ def extract_hourly_weather_data(
)

@staticmethod
-def extract_sites_meta_data(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
+def extract_sites_meta_data(network: str = "all") -> pd.DataFrame:
airqo_api = AirQoApi()
-sites = airqo_api.get_sites(tenant=tenant)
+sites = airqo_api.get_sites(network=network)
sites = pd.DataFrame(sites)
sites.rename(
columns={
"latitude": "site_latitude",
"longitude": "site_longitude",
"approximate_latitude": "site_latitude",
"approximate_longitude": "site_longitude",
"description": "site_description",
"altitude": "site_altitude",
"name": "site_name",
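The warehouse metadata now maps the approximate coordinates onto the site_latitude/site_longitude schema columns. A quick sketch of the rename (values are illustrative):

    import pandas as pd

    sites = pd.DataFrame(
        [{"approximate_latitude": 0.31, "approximate_longitude": 32.58}]
    )
    sites = sites.rename(
        columns={
            "approximate_latitude": "site_latitude",
            "approximate_longitude": "site_longitude",
        }
    )
    assert list(sites.columns) == ["site_latitude", "site_longitude"]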
32 changes: 16 additions & 16 deletions src/workflows/airqo_etl_utils/meta_data_utils.py
@@ -119,8 +119,8 @@ def merge_cohorts_and_devices(data: pd.DataFrame) -> pd.DataFrame:
return pd.DataFrame(merged_data)

@staticmethod
-def extract_sites_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
-    sites = AirQoApi().get_sites(tenant=tenant)
+def extract_sites_from_api(network: str = "all") -> pd.DataFrame:
+    sites = AirQoApi().get_sites(network=network)
dataframe = pd.json_normalize(sites)
dataframe = dataframe[
[
@@ -155,8 +155,8 @@ def extract_sites_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
return dataframe

@staticmethod
-def extract_sites_meta_data_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
-    sites = AirQoApi().get_sites(tenant=tenant)
+def extract_sites_meta_data_from_api(network: str = "all") -> pd.DataFrame:
+    sites = AirQoApi().get_sites(network=network)
dataframe = pd.json_normalize(sites)
big_query_api = BigQueryApi()
cols = big_query_api.get_columns(table=big_query_api.sites_meta_data_table)
@@ -167,15 +167,15 @@ def extract_sites_meta_data_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
return dataframe

@staticmethod
-def update_nearest_weather_stations(tenant: Tenant) -> None:
+def update_nearest_weather_stations(network: str) -> None:
airqo_api = AirQoApi()
-sites = airqo_api.get_sites(tenant=tenant)
+sites = airqo_api.get_sites(network=network)
sites_data = [
{
"site_id": site.get("site_id", None),
"tenant": site.get("tenant", None),
"latitude": site.get("latitude", None),
"longitude": site.get("longitude", None),
"network": site.get("network", None),
"latitude": site.get("approximate_latitude", None),
"longitude": site.get("approximate_longitude", None),
}
for site in sites
]
@@ -184,24 +184,24 @@ def update_nearest_weather_stations(tenant: Tenant) -> None:
updated_sites = [
{
"site_id": site.get("site_id"),
"tenant": site.get("tenant"),
"network": site.get("network"),
"weather_stations": site.get("weather_stations"),
}
for site in updated_sites
]
airqo_api.update_sites(updated_sites)

@staticmethod
-def update_sites_distance_measures(tenant: Tenant) -> None:
+def update_sites_distance_measures(network: str) -> None:
    airqo_api = AirQoApi()
-    sites = airqo_api.get_sites(tenant=tenant)
+    sites = airqo_api.get_sites(network=network)
updated_sites = []
for site in sites:
record = {
"site_id": site.get("site_id", None),
"tenant": site.get("tenant", None),
"latitude": site.get("latitude", None),
"longitude": site.get("longitude", None),
"network": site.get("network", None),
"latitude": site.get("approximate_latitude", None),
"longitude": site.get("approximate_longitude", None),
}
meta_data = airqo_api.get_meta_data(
latitude=record.get("latitude"),
@@ -212,7 +212,7 @@ def update_sites_distance_measures(tenant: Tenant) -> None:
updated_sites.append(
{
**meta_data,
**{"site_id": record["site_id"], "tenant": record["tenant"]},
**{"site_id": record["site_id"], "network": record["network"]},
}
)

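Throughout this file the per-site records swap tenant for network and read the approximate coordinates. The resulting record shape, sketched with a hypothetical API payload (real ones come from airqo_api.get_sites()):

    site = {
        "site_id": "site_001",
        "network": "airqo",
        "approximate_latitude": 0.31,
        "approximate_longitude": 32.58,
    }
    record = {
        "site_id": site.get("site_id", None),
        "network": site.get("network", None),
        "latitude": site.get("approximate_latitude", None),
        "longitude": site.get("approximate_longitude", None),
    }
    assert record["latitude"] == 0.31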
16 changes: 14 additions & 2 deletions src/workflows/dags/airnow.py
@@ -3,6 +3,7 @@
from airqo_etl_utils.workflows_custom_utils import AirflowUtils
from datetime import timedelta
from airqo_etl_utils.config import configuration
+from airflow.exceptions import AirflowFailException


# Historical Data DAG
@@ -48,9 +49,14 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):

data = DataValidationUtils.process_data_for_message_broker(
data=data,
-topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
+topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
)
+if data is None:
+    raise AirflowFailException(
+        "Processing for message broker failed. Please check if Kafka is up and running."
+    )

broker = MessageBrokerUtils()
broker.publish_to_topic(
topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
@@ -130,9 +136,15 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):

data = DataValidationUtils.process_data_for_message_broker(
data=data,
-topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
+topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
)

+if data is None:
+    raise AirflowFailException(
+        "Processing for message broker failed. Please check if Kafka is up and running."
+    )

broker = MessageBrokerUtils()
broker.publish_to_topic(
topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
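Both DAGs now fail fast when broker preprocessing signals failure by returning None. The explicit None comparison matters: `if not data:` would raise ValueError on a returned DataFrame, whose truth value is ambiguous in pandas. A sketch of the check (AirflowFailException is the real Airflow exception class; the helper is hypothetical):

    import pandas as pd
    from airflow.exceptions import AirflowFailException


    def ensure_broker_data(data) -> pd.DataFrame:
        # None is the failure signal from process_data_for_message_broker.
        if data is None:
            raise AirflowFailException(
                "Processing for message broker failed. Please check if Kafka is up and running."
            )
        return data


    ensure_broker_data(pd.DataFrame({"pm2_5": [12.3]}))  # passes through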
2 changes: 1 addition & 1 deletion src/workflows/dags/airqo_automated_tweets.py
@@ -18,7 +18,7 @@ def create_forecast_tweets():
def retrieve_sites():
from airqo_etl_utils.airqo_api import AirQoApi

-return AirQoApi().get_sites(tenant=Tenant.AIRQO)
+return AirQoApi().get_sites(network="airqo")

@task()
def select_forecast_sites(sites):