diff --git a/k8s/kafka/topics/kafka-topics.yaml b/k8s/kafka/topics/kafka-topics.yaml
index 5f6710189f..bd594f7b29 100644
--- a/k8s/kafka/topics/kafka-topics.yaml
+++ b/k8s/kafka/topics/kafka-topics.yaml
@@ -25,6 +25,21 @@
 spec:
   config:
     retention.ms: 18000000
+---
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaTopic
+metadata:
+  name: averaged-hourly-measurements-topic
+  namespace: message-broker
+  labels:
+    strimzi.io/cluster: kafka-cluster
+spec:
+  partitions: 3
+  replicas: 2
+  config:
+    min.insync.replicas: 2
+    retention.ms: 18000000
+
 ---
 apiVersion: kafka.strimzi.io/v1beta2
 kind: KafkaTopic
diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py
index fc427a976c..abbbc0ec52 100644
--- a/src/workflows/airqo_etl_utils/airqo_utils.py
+++ b/src/workflows/airqo_etl_utils/airqo_utils.py
@@ -1058,7 +1058,8 @@ def get_devices(group_id: str) -> pd.DataFrame:
             except json.JSONDecodeError as e:
                 logger.exception(f"Error decoding JSON: {e}")
                 continue
-            if not key or not value.get("_id") or not value.get("device_id"):
+
+            if not key or not value.get("device_id"):
                 logger.warning(
                     f"Skipping message with key: {key}, missing 'device_id'."
                 )
diff --git a/src/workflows/airqo_etl_utils/config.py b/src/workflows/airqo_etl_utils/config.py
index 2b1be34c37..cfbaeb9386 100644
--- a/src/workflows/airqo_etl_utils/config.py
+++ b/src/workflows/airqo_etl_utils/config.py
@@ -92,7 +92,7 @@ class Config:
     OPENWEATHER_DATA_BATCH_SIZE = os.getenv("OPENWEATHER_DATA_BATCH_SIZE")
     # Kafka
     BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")
-    TOPIC_PARTITIONS = os.getenv("TOPIC_PARTITIONS", "1,2,3,4").split(",")
+    TOPIC_PARTITIONS = os.getenv("TOPIC_PARTITIONS", "0,1,2").split(",")
     SCHEMA_REGISTRY_URL = os.getenv("SCHEMA_REGISTRY_URL")

     # Kafka Topics
@@ -101,6 +101,10 @@ class Config:
     HOURLY_MEASUREMENTS_TOPIC = os.getenv("HOURLY_MEASUREMENTS_TOPIC")
     BAM_MEASUREMENTS_TOPIC = os.getenv("BAM_MEASUREMENTS_TOPIC")
     DEVICES_TOPIC = os.getenv("DEVICES_TOPIC")
+    CALIBRATED_HOURLY_MEASUREMENTS_TOPIC = os.getenv(
+        "CALIBRATED_HOURLY_MEASUREMENTS_TOPIC"
+    )
+    AVERAGED_HOURLY_MEASUREMENTS_TOPIC = os.getenv("AVERAGED_HOURLY_MEASUREMENTS_TOPIC")

     # Airnow
     AIRNOW_BASE_URL = os.getenv("AIRNOW_BASE_URL")
diff --git a/src/workflows/airqo_etl_utils/data_validator.py b/src/workflows/airqo_etl_utils/data_validator.py
index 657b06c8ee..5b93ca21f9 100644
--- a/src/workflows/airqo_etl_utils/data_validator.py
+++ b/src/workflows/airqo_etl_utils/data_validator.py
@@ -165,30 +165,22 @@ def process_data_for_message_broker(
         if tenant != Tenant.ALL:
             data.loc[:, "tenant"] = str(tenant)

-        # Additional processing for hourly measurements topic
-        if topic == configuration.HOURLY_MEASUREMENTS_TOPIC:
-            data.rename(columns={"device_id": "device_name"}, inplace=True)
-
-            devices = AirQoDataUtils.get_devices(group_id=caller)
-            devices = devices[["tenant", "_id", "site_id", "latitude", "longitude"]]
-            devices.rename(
-                columns={
-                    "_id": "device_id",
-                    "latitude": "device_latitude",
-                    "longitude": "device_longitude",
-                },
-                inplace=True,
-            )
-
-            data = pd.merge(
-                left=data,
-                right=devices,
-                on=["device_name", "site_id", "tenant"],
-                how="left",
-            )
+        data.rename(columns={"device_id": "device_name"}, inplace=True)
+
+        devices = AirQoDataUtils.get_devices(group_id=caller)
+        devices = devices[
+            ["tenant", "device_name", "site_id", "device_latitude", "device_longitude"]
+        ]
+
+        data = pd.merge(
+            left=data,
+            right=devices,
+            on=["device_name", "site_id", "tenant"],
+            how="left",
+        )

-            data.rename(columns={"tenant": "network"}, inplace=True)
-            data["tenant"] = str(Tenant.AIRQO)
+        data.rename(columns={"tenant": "network"}, inplace=True)
+        data["tenant"] = str(Tenant.AIRQO)

         return data
diff --git a/src/workflows/airqo_etl_utils/date.py b/src/workflows/airqo_etl_utils/date.py
index cb8986fbb2..f9e284922b 100644
--- a/src/workflows/airqo_etl_utils/date.py
+++ b/src/workflows/airqo_etl_utils/date.py
@@ -83,12 +83,15 @@ def get_dag_date_time_values(
         return start_date_time, end_date_time

     @staticmethod
-    def get_query_date_time_values(hours=1, days=0):
-        start_date_time = datetime.now(timezone.utc) - timedelta(hours=hours)
+    def get_query_date_time_values(hours=1, days=0, **kwargs):
+
+        execution_date = kwargs["dag_run"].execution_date
+
+        start_date_time = execution_date - timedelta(hours=hours)
         end_date_time = start_date_time + timedelta(hours=hours)

         if days != 0:
-            start_date_time = datetime.now(timezone.utc) - timedelta(days=days)
+            start_date_time = execution_date - timedelta(days=days)
             end_date_time = start_date_time + timedelta(days=days)

         return date_to_str_hours(start_date_time), date_to_str_hours(end_date_time)
diff --git a/src/workflows/dags/airnow.py b/src/workflows/dags/airnow.py
index 8fe9bc3c85..e97d377c05 100644
--- a/src/workflows/dags/airnow.py
+++ b/src/workflows/dags/airnow.py
@@ -1,6 +1,8 @@
 from airflow.decorators import dag, task
 from airflow.utils.dates import days_ago
 from airqo_etl_utils.workflows_custom_utils import AirflowUtils
+from datetime import timedelta
+from airqo_etl_utils.config import configuration


 # Historical Data DAG
@@ -15,7 +17,7 @@
 def airnow_bam_historical_data():
     import pandas as pd

-    @task()
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
     def extract_bam_data(**kwargs):
         from airqo_etl_utils.date import DateUtils
         from airqo_etl_utils.airnow_utils import AirnowDataUtils
@@ -34,13 +36,28 @@
     def process_data(data: pd.DataFrame):
         return AirnowDataUtils.process_bam_data(data=data)

-    @task()
-    def send_to_message_broker(data: pd.DataFrame):
+    @task(retries=3, retry_delay=timedelta(minutes=5))
+    def send_to_message_broker(data: pd.DataFrame, **kwargs):
         from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
+        from airqo_etl_utils.data_validator import DataValidationUtils
+        from airqo_etl_utils.constants import Tenant
+        from datetime import datetime

-        MessageBrokerUtils.update_hourly_data_topic(data=data)
+        now = datetime.now()
+        unique_str = str(now.date()) + "-" + str(now.hour)

-    @task()
+        data = DataValidationUtils.process_data_for_message_broker(
+            data=data,
+            tenant=Tenant.AIRQO,
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
+            caller=kwargs["dag"].dag_id + unique_str,
+        )
+        broker = MessageBrokerUtils()
+        broker.publish_to_topic(
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
+        )
+
+    @task(retries=3, retry_delay=timedelta(minutes=5))
     def send_to_bigquery(data: pd.DataFrame):
         from airqo_etl_utils.bigquery_api import BigQueryApi
         from airqo_etl_utils.data_validator import DataValidationUtils
@@ -53,7 +70,7 @@ def send_to_bigquery(data: pd.DataFrame):
         )
         big_query_api.load_data(dataframe=processed_data, table=table)

-    @task()
+    @task(retries=3, retry_delay=timedelta(minutes=5))
     def send_to_api(data: pd.DataFrame, **kwargs):
         send_to_api_param = kwargs.get("params", {}).get("send_to_api")
         if send_to_api_param:
@@ -85,12 +102,12 @@ def send_to_api(data: pd.DataFrame, **kwargs):
 def airnow_bam_realtime_data():
     import pandas as pd

-    @task()
-    def extract_bam_data():
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
+    def extract_bam_data(**kwargs):
         from airqo_etl_utils.airnow_utils import AirnowDataUtils
         from airqo_etl_utils.date import DateUtils

-        start_date_time, end_date_time = DateUtils.get_query_date_time_values()
+        start_date_time, end_date_time = DateUtils.get_query_date_time_values(**kwargs)
         return AirnowDataUtils.extract_bam_data(
             start_date_time=start_date_time,
             end_date_time=end_date_time,
@@ -102,13 +119,28 @@
     def process_data(data: pd.DataFrame):
         return AirnowDataUtils.process_bam_data(data=data)

-    @task()
-    def send_to_message_broker(data: pd.DataFrame):
+    @task(retries=3, retry_delay=timedelta(minutes=5))
+    def send_to_message_broker(data: pd.DataFrame, **kwargs):
         from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
+        from airqo_etl_utils.data_validator import DataValidationUtils
+        from airqo_etl_utils.constants import Tenant
+        from datetime import datetime

-        MessageBrokerUtils.update_hourly_data_topic(data=data)
+        now = datetime.now()
+        unique_str = str(now.date()) + "-" + str(now.hour)

-    @task()
+        data = DataValidationUtils.process_data_for_message_broker(
+            data=data,
+            tenant=Tenant.AIRQO,
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
+            caller=kwargs["dag"].dag_id + unique_str,
+        )
+        broker = MessageBrokerUtils()
+        broker.publish_to_topic(
+            topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
+        )
+
+    @task(retries=3, retry_delay=timedelta(minutes=5))
     def send_to_bigquery(data: pd.DataFrame):
         from airqo_etl_utils.bigquery_api import BigQueryApi
         from airqo_etl_utils.data_validator import DataValidationUtils
@@ -121,7 +153,7 @@ def send_to_bigquery(data: pd.DataFrame):
         )
         big_query_api.load_data(dataframe=processed_data, table=table)

-    @task()
+    @task(retries=3, retry_delay=timedelta(minutes=5))
     def send_to_api(data: pd.DataFrame):
         from airqo_etl_utils.data_validator import DataValidationUtils
         from airqo_etl_utils.airqo_api import AirQoApi
diff --git a/src/workflows/dags/airqo_bam_measurements.py b/src/workflows/dags/airqo_bam_measurements.py
index 38163d5247..e8b2cbff32 100644
--- a/src/workflows/dags/airqo_bam_measurements.py
+++ b/src/workflows/dags/airqo_bam_measurements.py
@@ -9,6 +9,7 @@
 from airqo_etl_utils.constants import DeviceCategory, DataType
 from airqo_etl_utils.date import DateUtils
 from airqo_etl_utils.bigquery_api import BigQueryApi
+from datetime import timedelta


 @dag(
@@ -20,7 +21,7 @@
     start_date=days_ago(1),
 )
 def airqo_bam_historical_measurements():
-    @task()
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
     def extract_bam_data(**kwargs) -> pd.DataFrame:
         start_date_time, end_date_time = DateUtils.get_dag_date_time_values(
             historical=True, **kwargs
@@ -31,7 +32,7 @@ def extract_bam_data(**kwargs) -> pd.DataFrame:
             device_category=DeviceCategory.BAM,
         )

-    @task()
+    @task(retries=3, retry_delay=timedelta(minutes=5))
     def save_unclean_data(data: pd.DataFrame):
         data = AirQoDataUtils.format_data_for_bigquery(
             data=data, data_type=DataType.UNCLEAN_BAM_DATA
@@ -46,7 +47,7 @@ def save_unclean_data(data: pd.DataFrame):
     def clean_bam_data(data: pd.DataFrame) -> pd.DataFrame:
         return AirQoDataUtils.clean_bam_data(data=data)

-    @task()
+    @task(retries=3, retry_delay=timedelta(minutes=5))
     def save_clean_bam_data(data: pd.DataFrame):
         data = AirQoDataUtils.format_data_for_bigquery(
             data=data, data_type=DataType.CLEAN_BAM_DATA
@@ -76,13 +77,13 @@ def save_clean_bam_data(data: pd.DataFrame):
 def airqo_bam_realtime_measurements():
     import pandas as pd

-    @task()
-    def extract_bam_data():
+    @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
+    def extract_bam_data(**kwargs):
         from airqo_etl_utils.airqo_utils import AirQoDataUtils
         from airqo_etl_utils.date import DateUtils
         from airqo_etl_utils.constants import DeviceCategory

-        start_date_time, end_date_time = DateUtils.get_query_date_time_values()
+        start_date_time, end_date_time = DateUtils.get_query_date_time_values(**kwargs)

         return AirQoDataUtils.extract_devices_data(
             start_date_time=start_date_time,
@@ -90,7 +91,7 @@ def extract_bam_data():
             device_category=DeviceCategory.BAM,
         )

-    @task()
+    @task(retries=3, retry_delay=timedelta(minutes=5))
     def save_unclean_data(data: pd.DataFrame):
         from airqo_etl_utils.bigquery_api import BigQueryApi
         from airqo_etl_utils.constants import DataType
@@ -111,7 +112,7 @@ def clean_bam_data(data: pd.DataFrame):
         return AirQoDataUtils.clean_bam_data(data=data)

-    @task()
+    @task(retries=3, retry_delay=timedelta(minutes=5))
     def save_clean_bam_data(data: pd.DataFrame):
         from airqo_etl_utils.bigquery_api import BigQueryApi
         from airqo_etl_utils.constants import DataType
@@ -126,23 +127,37 @@ def save_clean_bam_data(data: pd.DataFrame):
             table=big_query_api.bam_measurements_table,
         )

-    @task()
-    def update_latest_data_topic(data: pd.DataFrame):
+    @task(retries=3, retry_delay=timedelta(minutes=5))
+    def update_latest_data_topic(data: pd.DataFrame, **kwargs):
         from airqo_etl_utils.airqo_utils import AirQoDataUtils
         from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
         from airqo_etl_utils.constants import DeviceCategory
+        from airqo_etl_utils.data_validator import DataValidationUtils
+        from airqo_etl_utils.constants import Tenant
+        from datetime import datetime
+
+        now = datetime.now()
+        unique_str = str(now.date()) + "-" + str(now.hour)

         data = AirQoDataUtils.process_latest_data(
             data=data, device_category=DeviceCategory.BAM
         )
-        MessageBrokerUtils.update_hourly_data_topic(data=data)
+        data = DataValidationUtils.process_data_for_message_broker(
+            data=data,
+            tenant=Tenant.AIRQO,
+            topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
+            caller=kwargs["dag"].dag_id + unique_str,
+        )
+        broker = MessageBrokerUtils()
+        broker.publish_to_topic(
+            topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC, data=data
+        )

     unclean_data = extract_bam_data()
     save_unclean_data(unclean_data)
     measurements = clean_bam_data(unclean_data)
     save_clean_bam_data(measurements)
-    if configuration.ENVIRONMENT == "staging":
-        update_latest_data_topic(measurements)
+    update_latest_data_topic(measurements)


 airqo_bam_realtime_measurements()
diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py
index d15a2dea96..31d9bd6889 100644
--- a/src/workflows/dags/airqo_measurements.py
+++ b/src/workflows/dags/airqo_measurements.py
@@ -15,6 +15,9 @@
     extract_raw_airqo_gaseous_data_doc,
 )
 from datetime import timedelta
+import logging
+
+logger = logging.getLogger(__name__)


 @dag(
@@ -419,17 +422,28 @@ def send_raw_measurements_to_bigquery(airqo_data: pd.DataFrame):
         big_query_api.load_data(data, table=big_query_api.raw_measurements_table)

     @task(retries=3, retry_delay=timedelta(minutes=5))
-    def update_latest_data_topic(data: pd.DataFrame):
+    def update_latest_data_topic(data: pd.DataFrame, **kwargs):
         from airqo_etl_utils.airqo_utils import AirQoDataUtils
         from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
-        from airqo_etl_utils.constants import DeviceCategory
+        from airqo_etl_utils.constants import DeviceCategory, Tenant
+        from airqo_etl_utils.data_validator import DataValidationUtils
+        from datetime import datetime
+
+        now = datetime.now()
+        unique_str = str(now.date()) + "-" + str(now.hour)

         data = AirQoDataUtils.process_latest_data(
             data=data, device_category=DeviceCategory.LOW_COST
         )
+        data = DataValidationUtils.process_data_for_message_broker(
+            data=data,
+            tenant=Tenant.AIRQO,
+            topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
+            caller=kwargs["dag"].dag_id + unique_str,
+        )
         broker = MessageBrokerUtils()
         broker.publish_to_topic(
-            topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
+            topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC, data=data
         )

     raw_data = extract_raw_data()