Skip to content

Commit

Permalink
Merge pull request #4043 from airqo-platform/staging
Browse files Browse the repository at this point in the history
move to production
  • Loading branch information
Baalmart authored Dec 11, 2024
2 parents 3c9423e + c0cf056 commit f90b963
Show file tree
Hide file tree
Showing 21 changed files with 143 additions and 79 deletions.
2 changes: 1 addition & 1 deletion k8s/analytics/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ images:
celeryWorker: eu.gcr.io/airqo-250220/airqo-analytics-celery-worker
reportJob: eu.gcr.io/airqo-250220/airqo-analytics-report-job
devicesSummaryJob: eu.gcr.io/airqo-250220/airqo-analytics-devices-summary-job
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
api:
name: airqo-analytics-api
label: analytics-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-stage-device-registry-api
tag: stage-c1e369be-1733844028
tag: stage-9f584165-1733862604
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
api:
name: airqo-prediction-api
label: prediction-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-spatial-api
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/website/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-website-api
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-5e45c956-1733849508
tag: prod-3c9423e2-1733862695
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
10 changes: 4 additions & 6 deletions src/workflows/airqo_etl_utils/airqo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ def get_cohorts(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
for cohort in response.get("cohorts", [])
]

def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
def get_sites(self, network: str = "all") -> List[Dict[str, Any]]:
"""
Retrieve sites given a tenant.
Expand Down Expand Up @@ -766,19 +766,17 @@ def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
},
]
"""
query_params = {"tenant": str(Tenant.AIRQO)}
query_params = {}

if tenant != Tenant.ALL:
query_params["network"] = str(tenant)
if network != "all":
query_params["network"] = network

response = self.__request("devices/sites", query_params)

return [
{
**site,
"site_id": site.get("_id", None),
"tenant": site.get("network", site.get("tenant", None)),
"location": site.get("location", None),
"approximate_latitude": site.get(
"approximate_latitude", site.get("latitude", None)
),
Expand Down
52 changes: 30 additions & 22 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
numeric_columns = data.select_dtypes(include=["number"]).columns
numeric_columns = numeric_columns.difference(["device_number"])
data_for_aggregation = data[["timestamp", "device_id"] + list(numeric_columns)]

aggregated = (
data_for_aggregation.groupby("device_id")
.apply(lambda group: group.resample("1H", on="timestamp").mean())
Expand Down Expand Up @@ -744,20 +743,19 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
devices = airqo_api.get_devices()

device_lookup = {
device["device_number"]: device
for device in devices
if device.get("device_number")
device["device_id"]: device for device in devices if device.get("device_id")
}

for _, row in data.iterrows():
try:
device_number = row["device_number"]
device_id = row["device_id"]

# Get device details from the lookup dictionary
device_details = device_lookup.get(device_number)
device_details = device_lookup.get(device_id)
if not device_details:
logger.exception(
f"Device number {device_number} not found in device list."
f"Device number {device_id} not found in device list."
)
continue

Expand All @@ -766,7 +764,7 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
"device_id": device_details["_id"],
"site_id": row["site_id"],
"device_number": device_number,
"tenant": str(Tenant.AIRQO),
"network": device_details["network"],
"location": {
"latitude": {"value": row["latitude"]},
"longitude": {"value": row["longitude"]},
Expand Down Expand Up @@ -832,7 +830,7 @@ def merge_aggregated_weather_data(
airqo_api = AirQoApi()
sites: List[Dict[str, Any]] = []

for site in airqo_api.get_sites(tenant=Tenant.AIRQO):
for site in airqo_api.get_sites(network="airqo"):
sites.extend(
[
{
Expand Down Expand Up @@ -894,7 +892,8 @@ def merge_aggregated_weather_data(
numeric_columns = measurements.select_dtypes(include=["number"]).columns
numeric_columns = numeric_columns.difference(["device_number"])
numeric_counts = measurements[numeric_columns].notna().sum(axis=1)
measurements = measurements[numeric_counts >= 1]
# Raws with more than 1 numeric values
measurements = measurements[numeric_counts > 1]
return measurements

@staticmethod
Expand Down Expand Up @@ -1012,12 +1011,10 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:

data["timestamp"] = pd.to_datetime(data["timestamp"])
sites = AirQoApi().get_sites()
sites_df = pd.DataFrame(sites, columns=["_id", "city"]).rename(
columns={"_id": "site_id"}
)
sites_df = pd.DataFrame(sites, columns=["site_id", "city"])

data = pd.merge(data, sites_df, on="site_id", how="left")
data.dropna(subset=["device_id", "timestamp"], inplace=True)

columns_to_fill = [
"s1_pm2_5",
"s1_pm10",
Expand All @@ -1027,9 +1024,9 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
"humidity",
]

data[columns_to_fill] = data[columns_to_fill].fillna(0)
# TODO: Need to opt for a different approach eg forward fill, can't do here as df only has data of last 1 hour. Perhaps use raw data only?
# May have to rewrite entire pipeline flow
data[columns_to_fill] = data[columns_to_fill].fillna(0)

# additional input columns for calibration
data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
Expand All @@ -1052,9 +1049,12 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
"pm2_5_pm10_mod",
]
data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0)
data.dropna(subset=input_variables, inplace=True)

grouped_df = data.groupby("city", dropna=False)
# Explicitly filter data to calibrate.
to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]
data_to_calibrate.dropna(subset=input_variables, inplace=True)
grouped_df = data_to_calibrate.groupby("city", dropna=False)

rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
Expand All @@ -1071,6 +1071,8 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
),
)
for city, group in grouped_df:
# What was the intention of this?
# If the below condition fails, the rf_model and lasso_model default to the previously ones used and the ones set as "default" outside the forloop.
if str(city).lower() in [c.value.lower() for c in CityModel]:
try:
rf_model = GCSUtils.get_trained_model_from_gcs(
Expand All @@ -1087,6 +1089,7 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
)
except Exception as ex:
logger.exception(f"Error getting model: {ex}")
continue
group["pm2_5_calibrated_value"] = rf_model.predict(group[input_variables])
group["pm10_calibrated_value"] = lasso_model.predict(group[input_variables])

Expand All @@ -1100,15 +1103,20 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
if "pm2_5_calibrated_value" in data.columns:
data["pm2_5"] = data["pm2_5_calibrated_value"]
data.loc[to_calibrate, "pm2_5"] = data.loc[
to_calibrate, "pm2_5_calibrated_value"
]
else:
data["pm2_5_calibrated_value"] = None
data["pm2_5"] = None
data.loc[to_calibrate, "pm2_5_calibrated_value"] = None
data.loc[to_calibrate, "pm2_5"] = None
if "pm10_calibrated_value" in data.columns:
data["pm10"] = data["pm10_calibrated_value"]
data.loc[to_calibrate, "pm10"] = data.loc[
to_calibrate, "pm10_calibrated_value"
]
else:
data["pm10_calibrated_value"] = None
data["pm10"] = None
data.loc[to_calibrate, "pm10_calibrated_value"] = None
data.loc[to_calibrate, "pm10"] = None

data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])
data["pm10"] = data["pm10"].fillna(data["pm10_raw_value"])

Expand Down
37 changes: 26 additions & 11 deletions src/workflows/airqo_etl_utils/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:
@staticmethod
def process_data_for_message_broker(
data: pd.DataFrame,
topic: str,
caller: str,
topic: str = None,
frequency: Frequency = Frequency.HOURLY,
) -> pd.DataFrame:
"""
Expand All @@ -214,16 +214,31 @@ def process_data_for_message_broker(
data.rename(columns={"device_id": "device_name"}, inplace=True)

devices = AirQoDataUtils.get_devices(group_id=caller)
devices = devices[
["device_name", "site_id", "device_latitude", "device_longitude", "network"]
]

data = pd.merge(
left=data,
right=devices,
on=["device_name", "site_id", "network"],
how="left",
)
try:
devices = devices[
[
"device_name",
"site_id",
"device_latitude",
"device_longitude",
"network",
]
]

data = pd.merge(
left=data,
right=devices,
on=["device_name", "site_id", "network"],
how="left",
)
except KeyError as e:
logger.exception(
f"KeyError: The key(s) '{e.args}' are not available in the returned devices data."
)
return None
except Exception as e:
logger.exception(f"An error occured: {e}")
return None
return data

@staticmethod
Expand Down
8 changes: 4 additions & 4 deletions src/workflows/airqo_etl_utils/data_warehouse_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ def extract_hourly_weather_data(
)

@staticmethod
def extract_sites_meta_data(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
def extract_sites_meta_data(network: str = "all") -> pd.DataFrame:
airqo_api = AirQoApi()
sites = airqo_api.get_sites(tenant=tenant)
sites = airqo_api.get_sites(network=network)
sites = pd.DataFrame(sites)
sites.rename(
columns={
"latitude": "site_latitude",
"longitude": "site_longitude",
"approximate_latitude": "site_latitude",
"approximate_longitude": "site_longitude",
"description": "site_description",
"altitude": "site_altitude",
"name": "site_name",
Expand Down
Loading

0 comments on commit f90b963

Please sign in to comment.