move to production #4043

Merged
merged 15 commits into from
Dec 11, 2024
Changes from all commits
15 commits
10c64f1
Update device registry staging image tag to stage-9f584165-1733862604
github-actions[bot] Dec 10, 2024
817fd02
Update AirQo exceedance production image tag to prod-3c9423e2-1733862695
github-actions[bot] Dec 10, 2024
4470535
Update KCCA exceedance production image tag to prod-3c9423e2-1733862695
github-actions[bot] Dec 10, 2024
c1213ea
Update device registry production image tag to prod-3c9423e2-1733862695
github-actions[bot] Dec 10, 2024
a4fcc3d
Update website production image tag to prod-3c9423e2-1733862695
github-actions[bot] Dec 10, 2024
b21851d
Update workflows prod image tag to prod-3c9423e2-1733862695
github-actions[bot] Dec 10, 2024
74a7839
Update analytics production image tag to prod-3c9423e2-1733862695
github-actions[bot] Dec 10, 2024
8441af6
Update predict production image tag to prod-3c9423e2-1733862695
github-actions[bot] Dec 10, 2024
741573b
Update spatial production image tag to prod-3c9423e2-1733862695
github-actions[bot] Dec 10, 2024
fff6c3f
Gracefully handle error due to failure to get devices data from kafka.
NicholasTurner23 Dec 11, 2024
cc7f77e
Updates sites tenant/network
NicholasTurner23 Dec 11, 2024
b82fe30
Explicitly filter data to calibrate while maintaining the rest
NicholasTurner23 Dec 11, 2024
0183031
Updates/cleanup
NicholasTurner23 Dec 11, 2024
88f3ffe
Merge branch 'staging' into refactor-fix-update/code-clean-up
NicholasTurner23 Dec 11, 2024
c0cf056
Merge pull request #4042 from NicholasTurner23/refactor-fix-update/co…
Baalmart Dec 11, 2024
2 changes: 1 addition & 1 deletion k8s/analytics/values-prod.yaml
Expand Up @@ -8,7 +8,7 @@ images:
celeryWorker: eu.gcr.io/airqo-250220/airqo-analytics-celery-worker
reportJob: eu.gcr.io/airqo-250220/airqo-analytics-report-job
devicesSummaryJob: eu.gcr.io/airqo-250220/airqo-analytics-devices-summary-job
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
api:
name: airqo-analytics-api
label: analytics-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-stage.yaml
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-stage-device-registry-api
tag: stage-c1e369be-1733844028
tag: stage-9f584165-1733862604
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
Expand Up @@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
api:
name: airqo-prediction-api
label: prediction-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-spatial-api
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/website/values-prod.yaml
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-website-api
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
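
For context, every k8s change above is the same mechanical bump: the production image tag moves from prod-5e45c956-1733849508 to prod-3c9423e2-1733862695, and the device registry staging tag moves to stage-9f584165-1733862604. The tags appear to follow an <env>-<short commit sha>-<unix build timestamp> layout; the sketch below decodes one on that assumption, which is inferred from the values in this PR rather than documented in the diff.

from datetime import datetime, timezone

def parse_image_tag(tag: str) -> dict:
    # Assumed layout: <env>-<short sha>-<unix timestamp>, e.g. prod-3c9423e2-1733862695.
    env, short_sha, ts = tag.split("-")
    return {
        "environment": env,
        "commit": short_sha,
        "built_at": datetime.fromtimestamp(int(ts), tz=timezone.utc),
    }

print(parse_image_tag("prod-3c9423e2-1733862695"))
# -> environment 'prod', commit '3c9423e2', built 10 Dec 2024 (UTC)
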
10 changes: 4 additions & 6 deletions src/workflows/airqo_etl_utils/airqo_api.py
Expand Up @@ -719,7 +719,7 @@ def get_cohorts(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
for cohort in response.get("cohorts", [])
]

def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
def get_sites(self, network: str = "all") -> List[Dict[str, Any]]:
"""
Retrieve sites given a tenant.

Expand Down Expand Up @@ -766,19 +766,17 @@ def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
},
]
"""
query_params = {"tenant": str(Tenant.AIRQO)}
query_params = {}

if tenant != Tenant.ALL:
query_params["network"] = str(tenant)
if network != "all":
query_params["network"] = network

response = self.__request("devices/sites", query_params)

return [
{
**site,
"site_id": site.get("_id", None),
"tenant": site.get("network", site.get("tenant", None)),
"location": site.get("location", None),
"approximate_latitude": site.get(
"approximate_latitude", site.get("latitude", None)
),
Expand Down
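
For orientation, here is a minimal sketch of how a caller would use the reworked get_sites signature. The AirQoApi class, the devices/sites endpoint, and the returned fields follow the diff above; the import path and the surrounding script are illustrative assumptions.

from airqo_etl_utils.airqo_api import AirQoApi  # assumed import path

api = AirQoApi()

# Default (network="all"): no network filter is sent to devices/sites.
all_sites = api.get_sites()

# Restrict to one network: the "network" query parameter is added to the request.
airqo_sites = api.get_sites(network="airqo")

# Each returned dict still carries "site_id" (copied from "_id") and the
# approximate_* coordinate fallbacks, but no longer the synthesised
# "tenant" and "location" keys removed in this PR.
for site in airqo_sites:
    print(site["site_id"], site.get("approximate_latitude"))
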
52 changes: 30 additions & 22 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Expand Up @@ -554,7 +554,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
numeric_columns = data.select_dtypes(include=["number"]).columns
numeric_columns = numeric_columns.difference(["device_number"])
data_for_aggregation = data[["timestamp", "device_id"] + list(numeric_columns)]

aggregated = (
data_for_aggregation.groupby("device_id")
.apply(lambda group: group.resample("1H", on="timestamp").mean())
Expand Down Expand Up @@ -744,20 +743,19 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
devices = airqo_api.get_devices()

device_lookup = {
device["device_number"]: device
for device in devices
if device.get("device_number")
device["device_id"]: device for device in devices if device.get("device_id")
}

for _, row in data.iterrows():
try:
device_number = row["device_number"]
device_id = row["device_id"]

# Get device details from the lookup dictionary
device_details = device_lookup.get(device_number)
device_details = device_lookup.get(device_id)
if not device_details:
logger.exception(
f"Device number {device_number} not found in device list."
f"Device number {device_id} not found in device list."
)
continue

Expand All @@ -766,7 +764,7 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
"device_id": device_details["_id"],
"site_id": row["site_id"],
"device_number": device_number,
"tenant": str(Tenant.AIRQO),
"network": device_details["network"],
"location": {
"latitude": {"value": row["latitude"]},
"longitude": {"value": row["longitude"]},
Expand Down Expand Up @@ -832,7 +830,7 @@ def merge_aggregated_weather_data(
airqo_api = AirQoApi()
sites: List[Dict[str, Any]] = []

for site in airqo_api.get_sites(tenant=Tenant.AIRQO):
for site in airqo_api.get_sites(network="airqo"):
sites.extend(
[
{
Expand Down Expand Up @@ -894,7 +892,8 @@ def merge_aggregated_weather_data(
numeric_columns = measurements.select_dtypes(include=["number"]).columns
numeric_columns = numeric_columns.difference(["device_number"])
numeric_counts = measurements[numeric_columns].notna().sum(axis=1)
measurements = measurements[numeric_counts >= 1]
# Keep only rows with more than one numeric value
measurements = measurements[numeric_counts > 1]
return measurements

@staticmethod
Expand Down Expand Up @@ -1012,12 +1011,10 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:

data["timestamp"] = pd.to_datetime(data["timestamp"])
sites = AirQoApi().get_sites()
sites_df = pd.DataFrame(sites, columns=["_id", "city"]).rename(
columns={"_id": "site_id"}
)
sites_df = pd.DataFrame(sites, columns=["site_id", "city"])

data = pd.merge(data, sites_df, on="site_id", how="left")
data.dropna(subset=["device_id", "timestamp"], inplace=True)

columns_to_fill = [
"s1_pm2_5",
"s1_pm10",
Expand All @@ -1027,9 +1024,9 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
"humidity",
]

data[columns_to_fill] = data[columns_to_fill].fillna(0)
# TODO: Consider a different approach, e.g. forward fill; it can't be done here because the df only holds the last 1 hour of data. Perhaps use raw data only?
# May require rewriting the entire pipeline flow.
data[columns_to_fill] = data[columns_to_fill].fillna(0)

# additional input columns for calibration
data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
Expand All @@ -1052,9 +1049,12 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
"pm2_5_pm10_mod",
]
data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0)
data.dropna(subset=input_variables, inplace=True)

grouped_df = data.groupby("city", dropna=False)
# Explicitly filter data to calibrate.
to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]
data_to_calibrate.dropna(subset=input_variables, inplace=True)
grouped_df = data_to_calibrate.groupby("city", dropna=False)

rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
Expand All @@ -1071,6 +1071,8 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
),
)
for city, group in grouped_df:
# What was the intention of this?
# If the condition below fails, rf_model and lasso_model fall back to the ones previously used, i.e. the defaults set outside the for loop.
if str(city).lower() in [c.value.lower() for c in CityModel]:
try:
rf_model = GCSUtils.get_trained_model_from_gcs(
Expand All @@ -1087,6 +1089,7 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
)
except Exception as ex:
logger.exception(f"Error getting model: {ex}")
continue
group["pm2_5_calibrated_value"] = rf_model.predict(group[input_variables])
group["pm10_calibrated_value"] = lasso_model.predict(group[input_variables])

Expand All @@ -1100,15 +1103,20 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
if "pm2_5_calibrated_value" in data.columns:
data["pm2_5"] = data["pm2_5_calibrated_value"]
data.loc[to_calibrate, "pm2_5"] = data.loc[
to_calibrate, "pm2_5_calibrated_value"
]
else:
data["pm2_5_calibrated_value"] = None
data["pm2_5"] = None
data.loc[to_calibrate, "pm2_5_calibrated_value"] = None
data.loc[to_calibrate, "pm2_5"] = None
if "pm10_calibrated_value" in data.columns:
data["pm10"] = data["pm10_calibrated_value"]
data.loc[to_calibrate, "pm10"] = data.loc[
to_calibrate, "pm10_calibrated_value"
]
else:
data["pm10_calibrated_value"] = None
data["pm10"] = None
data.loc[to_calibrate, "pm10_calibrated_value"] = None
data.loc[to_calibrate, "pm10"] = None

data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])
data["pm10"] = data["pm10"].fillna(data["pm10_raw_value"])

Expand Down
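
The calibration changes above follow a mask-and-writeback pattern: only rows whose network is "airqo" are passed through the models, the calibrated columns are written back with .loc so rows from other networks are left untouched, and every row finally falls back to its raw value where no calibrated value exists. Below is a simplified, self-contained illustration of that pattern, using toy values and a stand-in for the trained model rather than the production code.

import pandas as pd

def calibrate_airqo_rows(data: pd.DataFrame) -> pd.DataFrame:
    to_calibrate = data["network"] == "airqo"

    # Run the (placeholder) model only on the airqo slice.
    subset = data.loc[to_calibrate].copy()
    subset["pm2_5_calibrated_value"] = subset["pm2_5_raw_value"] * 0.9  # stand-in for rf_model.predict

    # Write results back only for the masked rows; other networks stay NaN here.
    data.loc[to_calibrate, "pm2_5_calibrated_value"] = subset["pm2_5_calibrated_value"]
    data.loc[to_calibrate, "pm2_5"] = data.loc[to_calibrate, "pm2_5_calibrated_value"]

    # Every row, calibrated or not, falls back to its raw value when pm2_5 is missing.
    data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])
    return data

df = pd.DataFrame({"network": ["airqo", "iqair"], "pm2_5_raw_value": [30.0, 42.0]})
print(calibrate_airqo_rows(df))
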
37 changes: 26 additions & 11 deletions src/workflows/airqo_etl_utils/data_validator.py
Expand Up @@ -188,8 +188,8 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:
@staticmethod
def process_data_for_message_broker(
data: pd.DataFrame,
topic: str,
caller: str,
topic: str = None,
frequency: Frequency = Frequency.HOURLY,
) -> pd.DataFrame:
"""
Expand All @@ -214,16 +214,31 @@ def process_data_for_message_broker(
data.rename(columns={"device_id": "device_name"}, inplace=True)

devices = AirQoDataUtils.get_devices(group_id=caller)
devices = devices[
["device_name", "site_id", "device_latitude", "device_longitude", "network"]
]

data = pd.merge(
left=data,
right=devices,
on=["device_name", "site_id", "network"],
how="left",
)
try:
devices = devices[
[
"device_name",
"site_id",
"device_latitude",
"device_longitude",
"network",
]
]

data = pd.merge(
left=data,
right=devices,
on=["device_name", "site_id", "network"],
how="left",
)
except KeyError as e:
logger.exception(
f"KeyError: The key(s) '{e.args}' are not available in the returned devices data."
)
return None
except Exception as e:
logger.exception(f"An error occured: {e}")
return None
return data

@staticmethod
Expand Down
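
A short sketch of how a calling task might handle the None that process_data_for_message_broker can now return instead of raising. The DataValidationUtils class name, the topic string, and the wrapper function are assumptions for illustration only.

import logging
from typing import Optional

import pandas as pd

from airqo_etl_utils.data_validator import DataValidationUtils  # assumed class/module name

logger = logging.getLogger(__name__)

def publish_hourly_measurements(data: pd.DataFrame, caller: str) -> Optional[list]:
    prepared = DataValidationUtils.process_data_for_message_broker(
        data=data,
        caller=caller,
        topic="hourly-measurements-topic",  # hypothetical topic name
    )
    if prepared is None:
        # None is returned when the devices frame lacks the expected columns
        # (KeyError) or any other error occurs while merging device metadata.
        logger.warning("Skipping publish: device metadata could not be merged.")
        return None
    # Hand the merged frame on to whatever broker-publishing step follows.
    return prepared.to_dict("records")
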
8 changes: 4 additions & 4 deletions src/workflows/airqo_etl_utils/data_warehouse_utils.py
Expand Up @@ -106,14 +106,14 @@ def extract_hourly_weather_data(
)

@staticmethod
def extract_sites_meta_data(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
def extract_sites_meta_data(network: str = "all") -> pd.DataFrame:
airqo_api = AirQoApi()
sites = airqo_api.get_sites(tenant=tenant)
sites = airqo_api.get_sites(network=network)
sites = pd.DataFrame(sites)
sites.rename(
columns={
"latitude": "site_latitude",
"longitude": "site_longitude",
"approximate_latitude": "site_latitude",
"approximate_longitude": "site_longitude",
"description": "site_description",
"altitude": "site_altitude",
"name": "site_name",
Expand Down
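
To make the column mapping change above concrete, here is a toy illustration of the rename applied by extract_sites_meta_data: site coordinates are now taken from the approximate_* fields returned by get_sites rather than the raw latitude/longitude. The example row is fabricated for illustration, not a real API response.

import pandas as pd

# A row shaped like the dicts returned by AirQoApi.get_sites(network="airqo").
sites = pd.DataFrame(
    [
        {
            "site_id": "site-001",  # illustrative id
            "approximate_latitude": 0.3136,
            "approximate_longitude": 32.5811,
            "description": "Example site",
            "name": "example_site_01",
        }
    ]
)

sites = sites.rename(
    columns={
        "approximate_latitude": "site_latitude",
        "approximate_longitude": "site_longitude",
        "description": "site_description",
        "name": "site_name",
    }
)
print(sites[["site_id", "site_latitude", "site_longitude", "site_name"]])
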