diff --git a/k8s/analytics/values-prod.yaml b/k8s/analytics/values-prod.yaml index 51bbfd964c..daedda9bf0 100644 --- a/k8s/analytics/values-prod.yaml +++ b/k8s/analytics/values-prod.yaml @@ -8,7 +8,7 @@ images: celeryWorker: eu.gcr.io/airqo-250220/airqo-analytics-celery-worker reportJob: eu.gcr.io/airqo-250220/airqo-analytics-report-job devicesSummaryJob: eu.gcr.io/airqo-250220/airqo-analytics-devices-summary-job - tag: prod-a75ee27d-1729690126 + tag: prod-9cf6d750-1729798017 api: name: airqo-analytics-api label: analytics-api diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml index 7301b8176b..bc7dbfecee 100644 --- a/k8s/auth-service/values-prod.yaml +++ b/k8s/auth-service/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-auth-api - tag: prod-a75ee27d-1729690126 + tag: prod-9cf6d750-1729798017 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml index 63694c4216..1420d2cd52 100644 --- a/k8s/device-registry/values-prod.yaml +++ b/k8s/device-registry/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-device-registry-api - tag: prod-a75ee27d-1729690126 + tag: prod-9cf6d750-1729798017 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/exceedance/values-prod-airqo.yaml b/k8s/exceedance/values-prod-airqo.yaml index 69f103ef9c..7d002d3770 100644 --- a/k8s/exceedance/values-prod-airqo.yaml +++ b/k8s/exceedance/values-prod-airqo.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/airqo-exceedance-job - tag: prod-a75ee27d-1729690126 + tag: prod-9cf6d750-1729798017 nameOverride: '' fullnameOverride: '' diff --git a/k8s/exceedance/values-prod-kcca.yaml b/k8s/exceedance/values-prod-kcca.yaml index a77882d0c3..34a0824a9a 100644 --- a/k8s/exceedance/values-prod-kcca.yaml +++ b/k8s/exceedance/values-prod-kcca.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/kcca-exceedance-job - tag: prod-a75ee27d-1729690126 + tag: prod-9cf6d750-1729798017 nameOverride: '' fullnameOverride: '' diff --git a/k8s/predict/values-prod.yaml b/k8s/predict/values-prod.yaml index 484efcc07a..e96473f824 100644 --- a/k8s/predict/values-prod.yaml +++ b/k8s/predict/values-prod.yaml @@ -7,7 +7,7 @@ images: predictJob: eu.gcr.io/airqo-250220/airqo-predict-job trainJob: eu.gcr.io/airqo-250220/airqo-train-job predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality - tag: prod-a75ee27d-1729690126 + tag: prod-9cf6d750-1729798017 api: name: airqo-prediction-api label: prediction-api diff --git a/k8s/workflows/values-prod.yaml b/k8s/workflows/values-prod.yaml index 712961055f..51f44d1118 100644 --- a/k8s/workflows/values-prod.yaml +++ b/k8s/workflows/values-prod.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-redis containers: eu.gcr.io/airqo-250220/airqo-workflows - tag: prod-a75ee27d-1729690126 + tag: prod-9cf6d750-1729798017 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/workflows/values-stage.yaml b/k8s/workflows/values-stage.yaml index 98da251750..37b1433dae 100644 --- a/k8s/workflows/values-stage.yaml +++ b/k8s/workflows/values-stage.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis containers: eu.gcr.io/airqo-250220/airqo-stage-workflows - tag: stage-8e811f9a-1729782865 + tag: stage-cf11c99f-1729797943 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/src/workflows/airqo_etl_utils/data_validator.py b/src/workflows/airqo_etl_utils/data_validator.py index 042cc831c7..5178127127 100644 --- a/src/workflows/airqo_etl_utils/data_validator.py +++ b/src/workflows/airqo_etl_utils/data_validator.py @@ -277,8 +277,23 @@ def process_data_for_api(data: pd.DataFrame) -> list: return restructured_data def transform_devices(devices: List[Dict[str, Any]], task_instance) -> pd.DataFrame: + """ + Transforms and processes the devices DataFrame. If the checksum of the + devices data has not changed since the last execution, it returns an empty DataFrame. + Otherwise, it updates the checksum in XCom and returns the transformed DataFrame. + + Args: + devices (pd.DataFrame): A Pandas DataFrame containing the devices data. + task_instance: The Airflow task instance used to pull and push XCom values. + + Returns: + pd.DataFrame: Transformed DataFrame if the devices data has changed since + the last execution; otherwise, an empty DataFrame. + """ + from .airqo_utils import ( + AirQoDataUtils, + ) import hashlib - from .airqo_utils import AirQoDataUtils devices.rename( columns={ @@ -290,12 +305,15 @@ def transform_devices(devices: List[Dict[str, Any]], task_instance) -> pd.DataFr inplace=True, ) + # Convert devices DataFrame to JSON for consistency since JSON stores metadata and compute checksum devices_json = devices.to_json(orient="records", date_format="iso") api_devices_checksum = hashlib.md5(devices_json.encode()).hexdigest() - previous_cheksum = task_instance.xcom_pull(key="devices_checksum") - if previous_cheksum and previous_cheksum == api_devices_checksum: + previous_checksum = task_instance.xcom_pull(key="devices_checksum") + + if previous_checksum == api_devices_checksum: return pd.DataFrame() task_instance.xcom_push(key="devices_checksum", value=api_devices_checksum) + return devices diff --git a/src/workflows/dags/airqo_kafka_workflows.py b/src/workflows/dags/airqo_kafka_workflows.py index ac25315c36..a670ff2b93 100644 --- a/src/workflows/dags/airqo_kafka_workflows.py +++ b/src/workflows/dags/airqo_kafka_workflows.py @@ -12,7 +12,7 @@ doc_md=extract_store_devices_data_in_kafka, default_args=AirflowUtils.dag_default_configs(), catchup=False, - tags=["devices", "hourly", "low cost"], + tags=["devices", "kafka"], ) def airqo_devices_data(): import pandas as pd @@ -30,10 +30,16 @@ def send_device_data_to_broker(devices: pd.DataFrame, **kwargs) -> None: devices = DataValidationUtils.transform_devices( devices=devices, taskinstance=kwargs["ti"] ) - broker = MessageBrokerUtils() - broker.publish_to_topic( - data=devices, topic=configuration.DEVICES_TOPIC, column_key="device_name" - ) + if not devices.empty: + broker = MessageBrokerUtils() + broker.publish_to_topic( + data=devices, + topic=configuration.DEVICES_TOPIC, + column_key="device_name", + ) extracted_device = extract_devices() send_device_data_to_broker(extracted_device) + + +airqo_devices_data()