Refactor fix update/code clean up #4042

Merged
10 changes: 4 additions & 6 deletions src/workflows/airqo_etl_utils/airqo_api.py
@@ -719,7 +719,7 @@ def get_cohorts(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
             for cohort in response.get("cohorts", [])
         ]

-    def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
+    def get_sites(self, network: str = "all") -> List[Dict[str, Any]]:
         """
         Retrieve sites given a tenant.

@@ -766,19 +766,17 @@ def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
             },
         ]
         """
-        query_params = {"tenant": str(Tenant.AIRQO)}
+        query_params = {}

-        if tenant != Tenant.ALL:
-            query_params["network"] = str(tenant)
+        if network != "all":
+            query_params["network"] = network

         response = self.__request("devices/sites", query_params)

         return [
             {
                 **site,
                 "site_id": site.get("_id", None),
-                "tenant": site.get("network", site.get("tenant", None)),
-                "location": site.get("location", None),
                 "approximate_latitude": site.get(
                     "approximate_latitude", site.get("latitude", None)
                 ),
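Editor's note: for quick reference, the call shape after this refactor replaces the Tenant enum with a plain network string, where "all" means no filter. A minimal sketch (import path taken from this diff; response fields assumed from the hunk above):

from airqo_etl_utils.airqo_api import AirQoApi

api = AirQoApi()
all_sites = api.get_sites()                    # no "network" query parameter sent
airqo_sites = api.get_sites(network="airqo")   # sends network=airqo to devices/sites
site_ids = [site["site_id"] for site in airqo_sites]  # "site_id" is injected from "_id"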
52 changes: 30 additions & 22 deletions src/workflows/airqo_etl_utils/airqo_utils.py
@@ -554,7 +554,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
         numeric_columns = data.select_dtypes(include=["number"]).columns
         numeric_columns = numeric_columns.difference(["device_number"])
         data_for_aggregation = data[["timestamp", "device_id"] + list(numeric_columns)]
-
         aggregated = (
             data_for_aggregation.groupby("device_id")
             .apply(lambda group: group.resample("1H", on="timestamp").mean())

@@ -744,20 +743,19 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
         devices = airqo_api.get_devices()

         device_lookup = {
-            device["device_number"]: device
-            for device in devices
-            if device.get("device_number")
+            device["device_id"]: device for device in devices if device.get("device_id")
         }

         for _, row in data.iterrows():
             try:
-                device_number = row["device_number"]
+                device_id = row["device_id"]

                 # Get device details from the lookup dictionary
-                device_details = device_lookup.get(device_number)
+                device_details = device_lookup.get(device_id)
                 if not device_details:
                     logger.exception(
-                        f"Device number {device_number} not found in device list."
+                        f"Device number {device_id} not found in device list."
Comment on lines +755 to +758 (Contributor):

🛠️ Refactor suggestion

Use an appropriate logging level for missing device details.

logger.exception is intended for logging exceptions along with stack traces. Since not finding a device in the lookup is not an exception but a possible data inconsistency, use logger.warning instead:

-                    logger.exception(
+                    logger.warning(
                         f"Device number {device_id} not found in device list."
                     )

Additionally, consider correcting the log message to reference "Device ID" instead of "Device number" for clarity.

                     )
                     continue
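Editor's note: as background for the suggestion above, logger.exception logs at ERROR level and appends the active traceback, so it only makes sense inside an except block; a lookup miss has no traceback to report. A standalone sketch of the distinction (hypothetical lookup data):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

device_lookup = {"aq_01": {"_id": "abc123"}}  # hypothetical device lookup

device_id = "aq_99"
if not device_lookup.get(device_id):
    # A data inconsistency, not an error: WARNING, no traceback.
    logger.warning(f"Device ID {device_id} not found in device list.")

try:
    device_lookup[device_id]
except KeyError:
    # Inside an except handler, exception() logs ERROR plus the traceback.
    logger.exception(f"Lookup for {device_id} raised")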

@@ -766,7 +764,7 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
                     "device_id": device_details["_id"],
                     "site_id": row["site_id"],
                     "device_number": device_number,
-                    "tenant": str(Tenant.AIRQO),
+                    "network": device_details["network"],
                     "location": {
                         "latitude": {"value": row["latitude"]},
                         "longitude": {"value": row["longitude"]},

@@ -832,7 +830,7 @@ def merge_aggregated_weather_data(
         airqo_api = AirQoApi()
         sites: List[Dict[str, Any]] = []

-        for site in airqo_api.get_sites(tenant=Tenant.AIRQO):
+        for site in airqo_api.get_sites(network="airqo"):
             sites.extend(
                 [
                     {
@@ -894,7 +892,8 @@ def merge_aggregated_weather_data(
         numeric_columns = measurements.select_dtypes(include=["number"]).columns
         numeric_columns = numeric_columns.difference(["device_number"])
         numeric_counts = measurements[numeric_columns].notna().sum(axis=1)
-        measurements = measurements[numeric_counts >= 1]
+        # Raws with more than 1 numeric values
+        measurements = measurements[numeric_counts > 1]
         return measurements

     @staticmethod
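Editor's note: the change from >= 1 to > 1 is behavioral, not cosmetic — rows with exactly one non-null numeric reading are now dropped. A toy sketch of the difference (column names assumed):

import numpy as np
import pandas as pd

measurements = pd.DataFrame({"pm2_5": [12.0, np.nan], "pm10": [20.0, 30.0]})
numeric_counts = measurements[["pm2_5", "pm10"]].notna().sum(axis=1)

print(len(measurements[numeric_counts >= 1]))  # 2 -- old behaviour keeps both rows
print(len(measurements[numeric_counts > 1]))   # 1 -- new behaviour drops the single-reading row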
@@ -1012,12 +1011,10 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:

         data["timestamp"] = pd.to_datetime(data["timestamp"])
         sites = AirQoApi().get_sites()
-        sites_df = pd.DataFrame(sites, columns=["_id", "city"]).rename(
-            columns={"_id": "site_id"}
-        )
+        sites_df = pd.DataFrame(sites, columns=["site_id", "city"])

         data = pd.merge(data, sites_df, on="site_id", how="left")
         data.dropna(subset=["device_id", "timestamp"], inplace=True)

         columns_to_fill = [
             "s1_pm2_5",
             "s1_pm10",
@@ -1027,9 +1024,9 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
             "humidity",
         ]

-        data[columns_to_fill] = data[columns_to_fill].fillna(0)
+        # TODO: Need to opt for a different approach eg forward fill, can't do here as df only has data of last 1 hour. Perhaps use raw data only?
+        # May have to rewrite entire pipeline flow
+        data[columns_to_fill] = data[columns_to_fill].fillna(0)
Comment (Contributor):

⚠️ Potential issue

Re-evaluate filling NaN values with zero in sensor data.

Filling missing sensor readings with zero may introduce inaccuracies, since a zero value can be misinterpreted as a valid measurement rather than missing data. Consider a method that preserves the distinction between missing and zero values, such as forward-filling or imputing with the mean or median where appropriate:

-        data[columns_to_fill] = data[columns_to_fill].fillna(0)
+        data[columns_to_fill] = data[columns_to_fill].interpolate(method='linear', limit_direction='forward', axis=0)


         # additional input columns for calibration
         data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
@@ -1052,9 +1049,12 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
             "pm2_5_pm10_mod",
         ]
         data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0)
-        data.dropna(subset=input_variables, inplace=True)

-        grouped_df = data.groupby("city", dropna=False)
+        # Explicitly filter data to calibrate.
+        to_calibrate = data["network"] == "airqo"
+        data_to_calibrate = data.loc[to_calibrate]
+        data_to_calibrate.dropna(subset=input_variables, inplace=True)
+        grouped_df = data_to_calibrate.groupby("city", dropna=False)
Comment on lines +1053 to +1057 (Contributor):

💡 Codebase verification

Missing city values are handled with a default value elsewhere in the codebase.

Missing city values are filled with the default value "Missing City" before any groupby operations, as seen in analytics/jobs/reports/main.py. The same pattern should be applied here to prevent potential issues with missing city values.

Consider adding data_to_calibrate['city'] = data_to_calibrate['city'].fillna('Missing City') before the groupby operation.

🔗 Analysis chain

Handle potential missing 'city' values when grouping data.

Grouping by 'city' when calibrating data assumes all entries have a 'city' value. If 'city' is missing for some records, this could lead to unexpected behavior. The scripts below were run against the codebase to check how the 'city' column is populated and whether any safeguards exist for missing values before the groupby operation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find records without 'city' after merging.

# Test: Check for missing 'city' values in data_to_calibrate. Expect: No records with missing 'city'.
python -c '
import pandas as pd
data = pd.read_csv("data.csv")  # Replace with the actual data source
to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]
missing_city = data_to_calibrate[data_to_calibrate["city"].isnull()]
if not missing_city.empty:
    print("Records with missing city:")
    print(missing_city)
'

Length of output: 447


Script:

#!/bin/bash
# Let's analyze the codebase to understand the data processing flow and city handling
# First, find where the data is loaded and how city is handled
rg -l "city" --type py

# Then look for specific patterns related to city column handling
rg "city" --type py -A 3 -B 3

# Also check for any data validation or preprocessing of city values
ast-grep --pattern 'def $func($$$) {
  $$$
  city$$$
  $$$
}'

Length of output: 52521


Script:

#!/bin/bash
# Let's check how the city column is populated before the groupby operation
rg -A 5 -B 5 "data\[\"city\"\]|data\['city'\]|data\.city" --type py

# Also check for any data validation or preprocessing specifically for city values
rg "dropna.*city" --type py
rg "fillna.*city" --type py

# Look for any error handling around city values
rg "try.*city.*except" --type py -A 5

Length of output: 2973
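Editor's note: a minimal sketch of the guard the verification suggests, mirroring the fillna('Missing City') pattern the bot found in analytics/jobs/reports/main.py (toy data):

import numpy as np
import pandas as pd

data_to_calibrate = pd.DataFrame({"city": ["Kampala", np.nan], "avg_pm2_5": [34.1, 50.2]})

# Fill missing city values before grouping so no group key is NaN.
data_to_calibrate["city"] = data_to_calibrate["city"].fillna("Missing City")

for city, group in data_to_calibrate.groupby("city", dropna=False):
    print(city, len(group))  # Kampala 1, then Missing City 1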


         rf_model = GCSUtils.get_trained_model_from_gcs(
             project_name=project_id,
@@ -1071,6 +1071,8 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
             ),
         )
         for city, group in grouped_df:
+            # What was the intention of this?
Comment (Contributor Author): @Mnoble-19 have you ever noticed this?

+            # If the below condition fails, the rf_model and lasso_model default to the previously ones used and the ones set as "default" outside the forloop.
             if str(city).lower() in [c.value.lower() for c in CityModel]:
                 try:
                     rf_model = GCSUtils.get_trained_model_from_gcs(
@@ -1087,6 +1089,7 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
                     )
                 except Exception as ex:
                     logger.exception(f"Error getting model: {ex}")
+                    continue
             group["pm2_5_calibrated_value"] = rf_model.predict(group[input_variables])
             group["pm10_calibrated_value"] = lasso_model.predict(group[input_variables])

@@ -1100,15 +1103,20 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
         data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
         data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
         if "pm2_5_calibrated_value" in data.columns:
-            data["pm2_5"] = data["pm2_5_calibrated_value"]
+            data.loc[to_calibrate, "pm2_5"] = data.loc[
+                to_calibrate, "pm2_5_calibrated_value"
+            ]
         else:
-            data["pm2_5_calibrated_value"] = None
-            data["pm2_5"] = None
+            data.loc[to_calibrate, "pm2_5_calibrated_value"] = None
+            data.loc[to_calibrate, "pm2_5"] = None
         if "pm10_calibrated_value" in data.columns:
-            data["pm10"] = data["pm10_calibrated_value"]
+            data.loc[to_calibrate, "pm10"] = data.loc[
+                to_calibrate, "pm10_calibrated_value"
+            ]
         else:
-            data["pm10_calibrated_value"] = None
-            data["pm10"] = None
+            data.loc[to_calibrate, "pm10_calibrated_value"] = None
+            data.loc[to_calibrate, "pm10"] = None

         data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])
         data["pm10"] = data["pm10"].fillna(data["pm10_raw_value"])
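Editor's note: the closing block uses masked assignment so only airqo rows receive calibrated values, with raw values as the fallback for everything else. A compact sketch of the pattern (toy frame):

import pandas as pd

data = pd.DataFrame({
    "network": ["airqo", "other"],
    "pm2_5_calibrated_value": [24.0, None],
    "pm2_5_raw_value": [30.0, 28.0],
})
to_calibrate = data["network"] == "airqo"
data.loc[to_calibrate, "pm2_5"] = data.loc[to_calibrate, "pm2_5_calibrated_value"]
data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])
print(data["pm2_5"].tolist())  # [24.0, 28.0] -- calibrated for airqo, raw elsewhere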
37 changes: 26 additions & 11 deletions src/workflows/airqo_etl_utils/data_validator.py
@@ -188,8 +188,8 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:
     @staticmethod
     def process_data_for_message_broker(
         data: pd.DataFrame,
-        topic: str,
         caller: str,
+        topic: str = None,
+        frequency: Frequency = Frequency.HOURLY,
     ) -> pd.DataFrame:
         """
@@ -214,16 +214,31 @@ def process_data_for_message_broker(
         data.rename(columns={"device_id": "device_name"}, inplace=True)

         devices = AirQoDataUtils.get_devices(group_id=caller)
-        devices = devices[
-            ["device_name", "site_id", "device_latitude", "device_longitude", "network"]
-        ]

-        data = pd.merge(
-            left=data,
-            right=devices,
-            on=["device_name", "site_id", "network"],
-            how="left",
-        )
+        try:
+            devices = devices[
+                [
+                    "device_name",
+                    "site_id",
+                    "device_latitude",
+                    "device_longitude",
+                    "network",
+                ]
+            ]
+
+            data = pd.merge(
+                left=data,
+                right=devices,
+                on=["device_name", "site_id", "network"],
+                how="left",
+            )
+        except KeyError as e:
+            logger.exception(
+                f"KeyError: The key(s) '{e.args}' are not available in the returned devices data."
+            )
+            return None
+        except Exception as e:
+            logger.exception(f"An error occured: {e}")
+            return None
         return data

     @staticmethod
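Editor's note on the design choice: the broad try/except works, but an explicit column check would avoid catching unrelated KeyErrors raised inside the merge itself. A hedged alternative sketch (hypothetical merge_devices helper; column names taken from the diff):

import logging

import pandas as pd

logger = logging.getLogger(__name__)

def merge_devices(data: pd.DataFrame, devices: pd.DataFrame) -> pd.DataFrame:
    # Fail fast, and loudly, if the devices frame lacks the merge columns.
    required = {"device_name", "site_id", "device_latitude", "device_longitude", "network"}
    missing = required - set(devices.columns)
    if missing:
        logger.warning(f"Returned devices data is missing columns: {missing}")
        return None
    devices = devices[sorted(required)]
    return pd.merge(left=data, right=devices, on=["device_name", "site_id", "network"], how="left")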
8 changes: 4 additions & 4 deletions src/workflows/airqo_etl_utils/data_warehouse_utils.py
@@ -106,14 +106,14 @@ def extract_hourly_weather_data(
         )

     @staticmethod
-    def extract_sites_meta_data(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
+    def extract_sites_meta_data(network: str = "all") -> pd.DataFrame:
         airqo_api = AirQoApi()
-        sites = airqo_api.get_sites(tenant=tenant)
+        sites = airqo_api.get_sites(network=network)
         sites = pd.DataFrame(sites)
         sites.rename(
             columns={
-                "latitude": "site_latitude",
-                "longitude": "site_longitude",
+                "approximate_latitude": "site_latitude",
+                "approximate_longitude": "site_longitude",
                 "description": "site_description",
                 "altitude": "site_altitude",
                 "name": "site_name",
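Editor's note: one detail worth knowing about this rename — DataFrame.rename silently skips labels that are absent, so if the API ever stops returning approximate_latitude the site_latitude column would simply never appear. Passing errors="raise" surfaces that kind of schema drift early (sketch, toy frame):

import pandas as pd

sites = pd.DataFrame({"approximate_latitude": [0.31], "approximate_longitude": [32.58]})
sites = sites.rename(
    columns={
        "approximate_latitude": "site_latitude",
        "approximate_longitude": "site_longitude",
    },
    errors="raise",  # KeyError if a source column is missing, instead of a silent no-op
)
print(list(sites.columns))  # ['site_latitude', 'site_longitude']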
32 changes: 16 additions & 16 deletions src/workflows/airqo_etl_utils/meta_data_utils.py
@@ -119,8 +119,8 @@ def merge_cohorts_and_devices(data: pd.DataFrame) -> pd.DataFrame:
         return pd.DataFrame(merged_data)

     @staticmethod
-    def extract_sites_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
-        sites = AirQoApi().get_sites(tenant=tenant)
+    def extract_sites_from_api(network: str = "all") -> pd.DataFrame:
+        sites = AirQoApi().get_sites(network=network)
         dataframe = pd.json_normalize(sites)
         dataframe = dataframe[
             [
@@ -155,8 +155,8 @@ def extract_sites_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
         return dataframe

     @staticmethod
-    def extract_sites_meta_data_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
-        sites = AirQoApi().get_sites(tenant=tenant)
+    def extract_sites_meta_data_from_api(network: str = "all") -> pd.DataFrame:
+        sites = AirQoApi().get_sites(network=network)
         dataframe = pd.json_normalize(sites)
         big_query_api = BigQueryApi()
         cols = big_query_api.get_columns(table=big_query_api.sites_meta_data_table)
@@ -167,15 +167,15 @@ def extract_sites_meta_data_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
         return dataframe

     @staticmethod
-    def update_nearest_weather_stations(tenant: Tenant) -> None:
+    def update_nearest_weather_stations(network: str) -> None:
         airqo_api = AirQoApi()
-        sites = airqo_api.get_sites(tenant=tenant)
+        sites = airqo_api.get_sites(network=network)
         sites_data = [
             {
                 "site_id": site.get("site_id", None),
-                "tenant": site.get("tenant", None),
-                "latitude": site.get("latitude", None),
-                "longitude": site.get("longitude", None),
+                "network": site.get("network", None),
+                "latitude": site.get("approximate_latitude", None),
+                "longitude": site.get("approximate_longitude", None),
             }
             for site in sites
         ]
@@ -184,24 +184,24 @@ def update_nearest_weather_stations(tenant: Tenant) -> None:
         updated_sites = [
             {
                 "site_id": site.get("site_id"),
-                "tenant": site.get("tenant"),
+                "network": site.get("network"),
                 "weather_stations": site.get("weather_stations"),
             }
             for site in updated_sites
         ]
         airqo_api.update_sites(updated_sites)

     @staticmethod
-    def update_sites_distance_measures(tenant: Tenant) -> None:
+    def update_sites_distance_measures(network: str) -> None:
         airqo_api = AirQoApi()
-        sites = airqo_api.get_sites(tenant=tenant)
+        sites = airqo_api.get_sites(network=network)
         updated_sites = []
         for site in sites:
             record = {
                 "site_id": site.get("site_id", None),
-                "tenant": site.get("tenant", None),
-                "latitude": site.get("latitude", None),
-                "longitude": site.get("longitude", None),
+                "network": site.get("network", None),
+                "latitude": site.get("approximate_latitude", None),
+                "longitude": site.get("approximate_longitude", None),
             }
             meta_data = airqo_api.get_meta_data(
                 latitude=record.get("latitude"),
@@ -212,7 +212,7 @@ def update_sites_distance_measures(tenant: Tenant) -> None:
             updated_sites.append(
                 {
                     **meta_data,
-                    **{"site_id": record["site_id"], "tenant": record["tenant"]},
+                    **{"site_id": record["site_id"], "network": record["network"]},
                 }
             )
16 changes: 14 additions & 2 deletions src/workflows/dags/airnow.py
@@ -3,6 +3,7 @@
 from airqo_etl_utils.workflows_custom_utils import AirflowUtils
 from datetime import timedelta
 from airqo_etl_utils.config import configuration
+from airflow.exceptions import AirflowFailException


 # Historical Data DAG
@@ -48,9 +49,14 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):

         data = DataValidationUtils.process_data_for_message_broker(
             data=data,
-            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
         )
+        if not data:
+            raise AirflowFailException(
+                "Processing for message broker failed. Please check if kafka is up and running."
+            )
+
         broker = MessageBrokerUtils()
         broker.publish_to_topic(
             topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
@@ -130,9 +136,15 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):

         data = DataValidationUtils.process_data_for_message_broker(
             data=data,
-            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
         )
+
+        if not data:
+            raise AirflowFailException(
+                "Processing for message broker failed. Please check if kafka is up and running."
+            )
+
         broker = MessageBrokerUtils()
         broker.publish_to_topic(
             topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
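Editor's note on the `if not data:` guard added in both DAGs: process_data_for_message_broker now returns either a DataFrame or None, and a pandas DataFrame raises ValueError in boolean context ("The truth value of a DataFrame is ambiguous"). The guard therefore only behaves as intended on the None path; an explicit check covers both (sketch):

# Sketch: explicit None/empty check instead of bool(data), which raises for DataFrames.
if data is None or data.empty:
    raise AirflowFailException(
        "Processing for message broker failed. Please check if kafka is up and running."
    )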
2 changes: 1 addition & 1 deletion src/workflows/dags/airqo_automated_tweets.py
@@ -18,7 +18,7 @@ def create_forecast_tweets():
     def retrieve_sites():
         from airqo_etl_utils.airqo_api import AirQoApi

-        return AirQoApi().get_sites(tenant=Tenant.AIRQO)
+        return AirQoApi().get_sites(network="airqo")

     @task()
     def select_forecast_sites(sites):