From fff6c3f4591563f773360cef3cdd91177790512c Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Wed, 11 Dec 2024 10:57:11 +0300
Subject: [PATCH 1/4] Gracefully handle failure to get devices data from
 Kafka.

---
 .../airqo_etl_utils/data_validator.py   | 35 +++++++++++++------
 src/workflows/dags/airnow.py            | 12 +++++++
 src/workflows/dags/kcca_measurements.py |  6 ++++
 3 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/src/workflows/airqo_etl_utils/data_validator.py b/src/workflows/airqo_etl_utils/data_validator.py
index 61b03a3a1b..3fc90a151a 100644
--- a/src/workflows/airqo_etl_utils/data_validator.py
+++ b/src/workflows/airqo_etl_utils/data_validator.py
@@ -214,16 +214,31 @@ def process_data_for_message_broker(
         data.rename(columns={"device_id": "device_name"}, inplace=True)
 
         devices = AirQoDataUtils.get_devices(group_id=caller)
-        devices = devices[
-            ["device_name", "site_id", "device_latitude", "device_longitude", "network"]
-        ]
-
-        data = pd.merge(
-            left=data,
-            right=devices,
-            on=["device_name", "site_id", "network"],
-            how="left",
-        )
+        try:
+            devices = devices[
+                [
+                    "device_name",
+                    "site_id",
+                    "device_latitude",
+                    "device_longitude",
+                    "network",
+                ]
+            ]
+
+            data = pd.merge(
+                left=data,
+                right=devices,
+                on=["device_name", "site_id", "network"],
+                how="left",
+            )
+        except KeyError as e:
+            logger.exception(
+                f"KeyError: the key(s) {e.args} are missing from the returned devices data."
+            )
+            return None
+        except Exception as e:
+            logger.exception(f"An error occurred: {e}")
+            return None
         return data
 
     @staticmethod
diff --git a/src/workflows/dags/airnow.py b/src/workflows/dags/airnow.py
index 4546f40a37..65fb5023a6 100644
--- a/src/workflows/dags/airnow.py
+++ b/src/workflows/dags/airnow.py
@@ -3,6 +3,7 @@
 from airqo_etl_utils.workflows_custom_utils import AirflowUtils
 from datetime import timedelta
 from airqo_etl_utils.config import configuration
+from airflow.exceptions import AirflowFailException
 
 
 # Historical Data DAG
@@ -51,6 +52,11 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):
             topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
         )
+        if data is None:
+            raise AirflowFailException(
+                "Processing for message broker failed. Please check if Kafka is up and running."
+            )
+
         broker = MessageBrokerUtils()
         broker.publish_to_topic(
             topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
@@ -133,6 +139,12 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):
             topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
         )
+
+        if data is None:
+            raise AirflowFailException(
+                "Processing for message broker failed. Please check if Kafka is up and running."
+            )
+
         broker = MessageBrokerUtils()
         broker.publish_to_topic(
             topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
diff --git a/src/workflows/dags/kcca_measurements.py b/src/workflows/dags/kcca_measurements.py
index 6588fe03c3..d72a0f0179 100644
--- a/src/workflows/dags/kcca_measurements.py
+++ b/src/workflows/dags/kcca_measurements.py
@@ -1,6 +1,7 @@
 from airflow.decorators import dag, task
 
 from airqo_etl_utils.workflows_custom_utils import AirflowUtils
+from airflow.exceptions import AirflowFailException
 from airqo_etl_utils.config import configuration
 
 
@@ -56,6 +57,11 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):
             caller=kwargs["dag"].dag_id,
         )
 
+        if data is None:
+            raise AirflowFailException(
+                "Processing for message broker failed. Please check if Kafka is up and running."
+            )
+
         MessageBrokerUtils.update_hourly_data_topic(data=data)
 
     @task()

From cc7f77ecae6729eaffa20fc5900eda1506232443 Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Wed, 11 Dec 2024 13:35:27 +0300
Subject: [PATCH 2/4] Updates sites tenant/network

---
 src/workflows/airqo_etl_utils/airqo_api.py   | 10 +++---
 src/workflows/airqo_etl_utils/airqo_utils.py |  2 +-
 .../airqo_etl_utils/data_warehouse_utils.py  |  8 ++---
 .../airqo_etl_utils/meta_data_utils.py       | 32 +++++++++----------
 src/workflows/dags/airqo_automated_tweets.py |  2 +-
 src/workflows/dags/airqo_bam_measurements.py |  7 ++++
 src/workflows/dags/airqo_measurements.py     | 19 +++++++++++
 src/workflows/dags/meta_data.py              |  3 +-
 src/workflows/dags/weather_measurements.py   |  2 +-
 9 files changed, 54 insertions(+), 31 deletions(-)

diff --git a/src/workflows/airqo_etl_utils/airqo_api.py b/src/workflows/airqo_etl_utils/airqo_api.py
index 598a026abd..6197d525fc 100644
--- a/src/workflows/airqo_etl_utils/airqo_api.py
+++ b/src/workflows/airqo_etl_utils/airqo_api.py
@@ -719,7 +719,7 @@ def get_cohorts(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
             for cohort in response.get("cohorts", [])
         ]
 
-    def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
+    def get_sites(self, network: str = "all") -> List[Dict[str, Any]]:
         """
         Retrieve sites given a tenant.
 
@@ -766,10 +766,10 @@ def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
             },
         ]
         """
-        query_params = {"tenant": str(Tenant.AIRQO)}
+        query_params = {}
 
-        if tenant != Tenant.ALL:
-            query_params["network"] = str(tenant)
+        if network != "all":
+            query_params["network"] = network
 
         response = self.__request("devices/sites", query_params)
 
@@ -777,8 +777,6 @@ def get_sites(self, tenant: Tenant = Tenant.ALL) -> List[Dict[str, Any]]:
             {
                 **site,
                 "site_id": site.get("_id", None),
-                "tenant": site.get("network", site.get("tenant", None)),
-                "location": site.get("location", None),
                 "approximate_latitude": site.get(
                     "approximate_latitude", site.get("latitude", None)
                 ),
diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py
index 8e24b25074..dd392d3d58 100644
--- a/src/workflows/airqo_etl_utils/airqo_utils.py
+++ b/src/workflows/airqo_etl_utils/airqo_utils.py
@@ -832,7 +832,7 @@ def merge_aggregated_weather_data(
         airqo_api = AirQoApi()
         sites: List[Dict[str, Any]] = []
 
-        for site in airqo_api.get_sites(tenant=Tenant.AIRQO):
+        for site in airqo_api.get_sites(network="airqo"):
             sites.extend(
                 [
                     {
diff --git a/src/workflows/airqo_etl_utils/data_warehouse_utils.py b/src/workflows/airqo_etl_utils/data_warehouse_utils.py
index 19b3082586..00434bd2ec 100644
--- a/src/workflows/airqo_etl_utils/data_warehouse_utils.py
+++ b/src/workflows/airqo_etl_utils/data_warehouse_utils.py
@@ -106,14 +106,14 @@ def extract_hourly_weather_data(
         )
 
     @staticmethod
-    def extract_sites_meta_data(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
+    def extract_sites_meta_data(network: str = "all") -> pd.DataFrame:
         airqo_api = AirQoApi()
-        sites = airqo_api.get_sites(tenant=tenant)
+        sites = airqo_api.get_sites(network=network)
         sites = pd.DataFrame(sites)
         sites.rename(
             columns={
-                "latitude": "site_latitude",
-                "longitude": "site_longitude",
+                "approximate_latitude": "site_latitude",
+                "approximate_longitude": "site_longitude",
                 "description": "site_description",
                 "altitude": "site_altitude",
                 "name": "site_name",
diff --git a/src/workflows/airqo_etl_utils/meta_data_utils.py b/src/workflows/airqo_etl_utils/meta_data_utils.py
index 2a997c6e83..328bb73afb 100644
--- a/src/workflows/airqo_etl_utils/meta_data_utils.py
+++ b/src/workflows/airqo_etl_utils/meta_data_utils.py
@@ -119,8 +119,8 @@ def merge_cohorts_and_devices(data: pd.DataFrame) -> pd.DataFrame:
         return pd.DataFrame(merged_data)
 
     @staticmethod
-    def extract_sites_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
-        sites = AirQoApi().get_sites(tenant=tenant)
+    def extract_sites_from_api(network: str = "all") -> pd.DataFrame:
+        sites = AirQoApi().get_sites(network=network)
         dataframe = pd.json_normalize(sites)
         dataframe = dataframe[
             [
@@ -155,8 +155,8 @@ def extract_sites_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
         return dataframe
 
     @staticmethod
-    def extract_sites_meta_data_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
-        sites = AirQoApi().get_sites(tenant=tenant)
+    def extract_sites_meta_data_from_api(network: str = "all") -> pd.DataFrame:
+        sites = AirQoApi().get_sites(network=network)
         dataframe = pd.json_normalize(sites)
         big_query_api = BigQueryApi()
         cols = big_query_api.get_columns(table=big_query_api.sites_meta_data_table)
@@ -167,15 +167,15 @@ def extract_sites_meta_data_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFram
         return dataframe
 
     @staticmethod
-    def update_nearest_weather_stations(tenant: Tenant) -> None:
+    def update_nearest_weather_stations(network: str) -> None:
         airqo_api = AirQoApi()
-        sites = airqo_api.get_sites(tenant=tenant)
+        sites = airqo_api.get_sites(network=network)
         sites_data = [
             {
                 "site_id": site.get("site_id", None),
-                "tenant": site.get("tenant", None),
-                "latitude": site.get("latitude", None),
-                "longitude": site.get("longitude", None),
+                "network": site.get("network", None),
+                "latitude": site.get("approximate_latitude", None),
+                "longitude": site.get("approximate_longitude", None),
             }
             for site in sites
         ]
@@ -184,7 +184,7 @@ def update_nearest_weather_stations(tenant: Tenant) -> None:
         updated_sites = [
             {
                 "site_id": site.get("site_id"),
-                "tenant": site.get("tenant"),
+                "network": site.get("network"),
                 "weather_stations": site.get("weather_stations"),
             }
             for site in updated_sites
@@ -192,16 +192,16 @@ def update_nearest_weather_stations(tenant: Tenant) -> None:
         airqo_api.update_sites(updated_sites)
 
     @staticmethod
-    def update_sites_distance_measures(tenant: Tenant) -> None:
+    def update_sites_distance_measures(network: str) -> None:
         airqo_api = AirQoApi()
-        sites = airqo_api.get_sites(tenant=tenant)
+        sites = airqo_api.get_sites(network=network)
         updated_sites = []
         for site in sites:
             record = {
                 "site_id": site.get("site_id", None),
-                "tenant": site.get("tenant", None),
-                "latitude": site.get("latitude", None),
-                "longitude": site.get("longitude", None),
+                "network": site.get("network", None),
+                "latitude": site.get("approximate_latitude", None),
+                "longitude": site.get("approximate_longitude", None),
             }
             meta_data = airqo_api.get_meta_data(
                 latitude=record.get("latitude"),
@@ -212,7 +212,7 @@ def update_sites_distance_measures(tenant: Tenant) -> None:
             updated_sites.append(
                 {
                     **meta_data,
-                    **{"site_id": record["site_id"], "tenant": record["tenant"]},
+                    **{"site_id": record["site_id"], "network": record["network"]},
                 }
             )
 
diff --git a/src/workflows/dags/airqo_automated_tweets.py b/src/workflows/dags/airqo_automated_tweets.py
index 0379e594d5..121a832672 100644
--- a/src/workflows/dags/airqo_automated_tweets.py
+++ b/src/workflows/dags/airqo_automated_tweets.py
@@ -18,7 +18,7 @@ def create_forecast_tweets():
     def retrieve_sites():
         from airqo_etl_utils.airqo_api import AirQoApi
 
-        return AirQoApi().get_sites(tenant=Tenant.AIRQO)
+        return AirQoApi().get_sites(network="airqo")
 
     @task()
     def select_forecast_sites(sites):
diff --git a/src/workflows/dags/airqo_bam_measurements.py b/src/workflows/dags/airqo_bam_measurements.py
index 9cd0ca5636..7b3c4d6a6d 100644
--- a/src/workflows/dags/airqo_bam_measurements.py
+++ b/src/workflows/dags/airqo_bam_measurements.py
@@ -10,6 +10,7 @@
 from airqo_etl_utils.date import DateUtils
 from airqo_etl_utils.bigquery_api import BigQueryApi
 from datetime import timedelta
+from airflow.exceptions import AirflowFailException
 
 
 @dag(
@@ -152,6 +153,12 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
             topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
         )
+
+        if data is None:
+            raise AirflowFailException(
+                "Processing for message broker failed. Please check if Kafka is up and running."
+            )
+
         broker = MessageBrokerUtils()
         broker.publish_to_topic(
             topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC, data=data
diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py
index 058222d21e..e48f7d8405 100644
--- a/src/workflows/dags/airqo_measurements.py
+++ b/src/workflows/dags/airqo_measurements.py
@@ -3,6 +3,7 @@
 from airqo_etl_utils.config import configuration
 from airqo_etl_utils.workflows_custom_utils import AirflowUtils
 from airqo_etl_utils.constants import Frequency
+from airflow.exceptions import AirflowFailException
 from dag_docs import (
     airqo_realtime_low_cost_measurements_doc,
     airqo_historical_hourly_measurements_doc,
@@ -112,6 +113,12 @@ def send_hourly_measurements_to_message_broker(
             topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
         )
+
+        if data is None:
+            raise AirflowFailException(
+                "Processing for message broker failed. Please check if Kafka is up and running."
+            )
+
         broker = MessageBrokerUtils()
         broker.publish_to_topic(
             topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
@@ -398,6 +405,12 @@ def send_hourly_measurements_to_message_broker(data: pd.DataFrame, **kwargs):
             topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
         )
+
+        if data is None:
+            raise AirflowFailException(
+                "Processing for message broker failed. Please check if Kafka is up and running."
+            )
+
         broker = MessageBrokerUtils()
         broker.publish_to_topic(
             topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
@@ -443,6 +456,12 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
             topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
         )
+
+        if data is None:
+            raise AirflowFailException(
+                "Processing for message broker failed. Please check if Kafka is up and running."
+            )
+
         broker = MessageBrokerUtils()
         broker.publish_to_topic(
             topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC, data=data
diff --git a/src/workflows/dags/meta_data.py b/src/workflows/dags/meta_data.py
index 64732a6024..914c038dfc 100644
--- a/src/workflows/dags/meta_data.py
+++ b/src/workflows/dags/meta_data.py
@@ -211,9 +211,8 @@ def meta_data_update_microservice_sites_meta_data():
     @task()
     def update_nearest_weather_stations() -> None:
         from airqo_etl_utils.meta_data_utils import MetaDataUtils
-        from airqo_etl_utils.constants import Tenant
 
-        MetaDataUtils.update_nearest_weather_stations(tenant=Tenant.ALL)
+        MetaDataUtils.update_nearest_weather_stations(network="all")
 
     @task()
     def update_distance_measures() -> None:
diff --git a/src/workflows/dags/weather_measurements.py b/src/workflows/dags/weather_measurements.py
index 2ece7b7009..6372659bd4 100644
--- a/src/workflows/dags/weather_measurements.py
+++ b/src/workflows/dags/weather_measurements.py
@@ -244,7 +244,7 @@ def openweathermap_data():
     def retrieve_sites():
         from airqo_etl_utils.airqo_api import AirQoApi
 
-        return AirQoApi().get_sites(tenant=Tenant.AIRQO)
+        return AirQoApi().get_sites(network="airqo")
 
     @task()
     def retrieve_weather_data(sites):

From b82fe30d5d81e1b68f604b112743173a5baf6fbc Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Wed, 11 Dec 2024 17:01:15 +0300
Subject: [PATCH 3/4] Explicitly filter data to calibrate while retaining the
 rest

---
 src/workflows/airqo_etl_utils/airqo_utils.py | 50 ++++++++++++--------
 1 file changed, 29 insertions(+), 21 deletions(-)

diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py
index dd392d3d58..25b73e4a8a 100644
--- a/src/workflows/airqo_etl_utils/airqo_utils.py
+++ b/src/workflows/airqo_etl_utils/airqo_utils.py
@@ -554,7 +554,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
         numeric_columns = data.select_dtypes(include=["number"]).columns
         numeric_columns = numeric_columns.difference(["device_number"])
         data_for_aggregation = data[["timestamp", "device_id"] + list(numeric_columns)]
-
         aggregated = (
             data_for_aggregation.groupby("device_id")
             .apply(lambda group: group.resample("1H", on="timestamp").mean())
@@ -744,20 +743,19 @@ def process_data_for_api(data: pd.DataFrame, frequency: Frequency) -> list:
         devices = airqo_api.get_devices()
 
         device_lookup = {
-            device["device_number"]: device
-            for device in devices
-            if device.get("device_number")
+            device["device_id"]: device for device in devices if device.get("device_id")
         }
 
         for _, row in data.iterrows():
             try:
                 device_number = row["device_number"]
+                device_id = row["device_id"]
 
                 # Get device details from the lookup dictionary
-                device_details = device_lookup.get(device_number)
+                device_details = device_lookup.get(device_id)
                 if not device_details:
                     logger.exception(
-                        f"Device number {device_number} not found in device list."
+                        f"Device id {device_id} not found in device list."
                     )
                     continue
 
                 row_data = {
                     "device": device_details["device_name"],
                     "device_id": device_details["_id"],
                     "site_id": row["site_id"],
                     "device_number": device_number,
-                    "tenant": str(Tenant.AIRQO),
+                    "network": device_details["network"],
                     "location": {
                         "latitude": {"value": row["latitude"]},
                         "longitude": {"value": row["longitude"]},
@@ -894,7 +892,8 @@ def merge_aggregated_weather_data(
         numeric_columns = measurements.select_dtypes(include=["number"]).columns
         numeric_columns = numeric_columns.difference(["device_number"])
         numeric_counts = measurements[numeric_columns].notna().sum(axis=1)
-        measurements = measurements[numeric_counts >= 1]
+        # Keep only rows with more than one numeric value.
+        measurements = measurements[numeric_counts > 1]
         return measurements
 
     @staticmethod
@@ -1012,12 +1011,10 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
         data["timestamp"] = pd.to_datetime(data["timestamp"])
 
         sites = AirQoApi().get_sites()
-        sites_df = pd.DataFrame(sites, columns=["_id", "city"]).rename(
-            columns={"_id": "site_id"}
-        )
+        sites_df = pd.DataFrame(sites, columns=["site_id", "city"])
+
         data = pd.merge(data, sites_df, on="site_id", how="left")
         data.dropna(subset=["device_id", "timestamp"], inplace=True)
-
         columns_to_fill = [
             "s1_pm2_5",
             "s1_pm10",
             "s2_pm2_5",
             "s2_pm10",
             "temperature",
             "humidity",
         ]
 
-        data[columns_to_fill] = data[columns_to_fill].fillna(0)
         # TODO: Need to opt for a different approach eg forward fill, can't do here as df only has data of last 1 hour. Perhaps use raw data only?
         # May have to rewrite entire pipeline flow
+        data[columns_to_fill] = data[columns_to_fill].fillna(0)
 
         # additional input columns for calibration
         data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
@@ -1052,9 +1049,12 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
             "pm2_5_pm10_mod",
         ]
         data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0)
-        data.dropna(subset=input_variables, inplace=True)
 
-        grouped_df = data.groupby("city", dropna=False)
+        # Explicitly select the data to calibrate; the rest passes through unchanged.
+        to_calibrate = data["network"] == "airqo"
+        data_to_calibrate = data.loc[to_calibrate].copy()
+        data_to_calibrate.dropna(subset=input_variables, inplace=True)
+        grouped_df = data_to_calibrate.groupby("city", dropna=False)
 
         rf_model = GCSUtils.get_trained_model_from_gcs(
             project_name=project_id,
@@ -1071,6 +1071,8 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
             ),
         )
         for city, group in grouped_df:
+            # TODO: What is the intended fallback behaviour here? If the condition below fails,
+            # rf_model and lasso_model fall back to the "default" models loaded before the loop, or to those used for the previous city.
             if str(city).lower() in [c.value.lower() for c in CityModel]:
                 try:
                     rf_model = GCSUtils.get_trained_model_from_gcs(
@@ -1087,6 +1089,7 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
                     )
                 except Exception as ex:
                     logger.exception(f"Error getting model: {ex}")
+                    continue
 
             group["pm2_5_calibrated_value"] = rf_model.predict(group[input_variables])
             group["pm10_calibrated_value"] = lasso_model.predict(group[input_variables])
@@ -1100,15 +1103,20 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
         data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
         data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)
         if "pm2_5_calibrated_value" in data.columns:
-            data["pm2_5"] = data["pm2_5_calibrated_value"]
+            data.loc[to_calibrate, "pm2_5"] = data.loc[
+                to_calibrate, "pm2_5_calibrated_value"
+            ]
         else:
-            data["pm2_5_calibrated_value"] = None
-            data["pm2_5"] = None
+            data.loc[to_calibrate, "pm2_5_calibrated_value"] = None
+            data.loc[to_calibrate, "pm2_5"] = None
         if "pm10_calibrated_value" in data.columns:
-            data["pm10"] = data["pm10_calibrated_value"]
+            data.loc[to_calibrate, "pm10"] = data.loc[
+                to_calibrate, "pm10_calibrated_value"
+            ]
         else:
-            data["pm10_calibrated_value"] = None
-            data["pm10"] = None
+            data.loc[to_calibrate, "pm10_calibrated_value"] = None
+            data.loc[to_calibrate, "pm10"] = None
+
         data["pm2_5"] = data["pm2_5"].fillna(data["pm2_5_raw_value"])
         data["pm10"] = data["pm10"].fillna(data["pm10_raw_value"])
 

From 01830313aea938e35874d4bd348aa60b3e1bfc50 Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Wed, 11 Dec 2024 17:02:58 +0300
Subject: [PATCH 4/4] Updates/cleanup

---
 src/workflows/airqo_etl_utils/data_validator.py | 2 +-
 src/workflows/dags/airnow.py                    | 4 ++--
 src/workflows/dags/airqo_bam_measurements.py    | 2 +-
 src/workflows/dags/airqo_measurements.py        | 6 +++---
 src/workflows/dags/kcca_measurements.py         | 2 +-
 5 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/workflows/airqo_etl_utils/data_validator.py b/src/workflows/airqo_etl_utils/data_validator.py
index 3fc90a151a..ae128a7220 100644
--- a/src/workflows/airqo_etl_utils/data_validator.py
+++ b/src/workflows/airqo_etl_utils/data_validator.py
@@ -188,8 +188,8 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:
     @staticmethod
     def process_data_for_message_broker(
         data: pd.DataFrame,
-        topic: str,
         caller: str,
+        topic: str = None,
         frequency: Frequency = Frequency.HOURLY,
     ) -> pd.DataFrame:
         """
diff --git a/src/workflows/dags/airnow.py b/src/workflows/dags/airnow.py
index 65fb5023a6..4919846150 100644
--- a/src/workflows/dags/airnow.py
+++ b/src/workflows/dags/airnow.py
@@ -49,8 +49,8 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):
 
         data = DataValidationUtils.process_data_for_message_broker(
             data=data,
-            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
         )
         if data is None:
             raise AirflowFailException(
@@ -136,8 +136,8 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):
 
         data = DataValidationUtils.process_data_for_message_broker(
             data=data,
-            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
         )
 
         if data is None:
diff --git a/src/workflows/dags/airqo_bam_measurements.py b/src/workflows/dags/airqo_bam_measurements.py
index 7b3c4d6a6d..148b0e72cd 100644
--- a/src/workflows/dags/airqo_bam_measurements.py
+++ b/src/workflows/dags/airqo_bam_measurements.py
@@ -150,8 +150,8 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
         )
         data = DataValidationUtils.process_data_for_message_broker(
             data=data,
-            topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
+            topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
         )
 
         if data is None:
diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py
index e48f7d8405..b551293aeb 100644
--- a/src/workflows/dags/airqo_measurements.py
+++ b/src/workflows/dags/airqo_measurements.py
@@ -110,8 +110,8 @@ def send_hourly_measurements_to_message_broker(
 
         data = DataValidationUtils.process_data_for_message_broker(
             data=data,
-            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
         )
 
         if data is None:
@@ -402,8 +402,8 @@ def send_hourly_measurements_to_message_broker(data: pd.DataFrame, **kwargs):
 
         data = DataValidationUtils.process_data_for_message_broker(
             data=data,
-            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
         )
 
         if data is None:
@@ -453,8 +453,8 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
         )
         data = DataValidationUtils.process_data_for_message_broker(
             data=data,
-            topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id + unique_str,
+            topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
        )
 
         if data is None:
diff --git a/src/workflows/dags/kcca_measurements.py b/src/workflows/dags/kcca_measurements.py
index d72a0f0179..15c8354655 100644
--- a/src/workflows/dags/kcca_measurements.py
+++ b/src/workflows/dags/kcca_measurements.py
@@ -53,8 +53,8 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):
 
         data = DataValidationUtils.process_data_for_message_broker(
             data=data,
-            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
             caller=kwargs["dag"].dag_id,
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
         )
 
         if data is None:
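
Note on the None guards in these patches: a pandas DataFrame cannot be truth-tested (bool(df) raises "ValueError: The truth value of a DataFrame is ambiguous"), so the DAG tasks check `data is None`, which is what `process_data_for_message_broker` now returns when the devices lookup fails (PATCH 1). A minimal, self-contained sketch of the pattern follows; `publish_or_fail` and the sample frame are hypothetical names for illustration, not part of the patches:

    from typing import Optional

    import pandas as pd

    def publish_or_fail(data: Optional[pd.DataFrame]) -> None:
        # Mirrors the DAG tasks: None signals a failed processing step,
        # while any DataFrame (even an empty one) is handed to the broker.
        if data is None:
            raise RuntimeError(
                "Processing for message broker failed. Please check if Kafka is up and running."
            )
        print(f"Publishing {len(data)} rows")

    df = pd.DataFrame({"pm2_5": [12.3, 45.6]})

    # "if not df:" would raise here, because truth-testing a DataFrame is ambiguous.
    try:
        bool(df)
    except ValueError as e:
        print(f"bool(DataFrame) raises: {e}")

    publish_or_fail(df)  # Publishing 2 rows
    try:
        publish_or_fail(None)
    except RuntimeError as e:
        print(e)  # the failure message above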