diff --git a/k8s/analytics/values-prod.yaml b/k8s/analytics/values-prod.yaml index a2983a6786..f2fab2ee23 100644 --- a/k8s/analytics/values-prod.yaml +++ b/k8s/analytics/values-prod.yaml @@ -8,7 +8,7 @@ images: celeryWorker: eu.gcr.io/airqo-250220/airqo-analytics-celery-worker reportJob: eu.gcr.io/airqo-250220/airqo-analytics-report-job devicesSummaryJob: eu.gcr.io/airqo-250220/airqo-analytics-devices-summary-job - tag: prod-d58d02be-1729614498 + tag: prod-65aa1824-1729615373 api: name: airqo-analytics-api label: analytics-api diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml index d924926178..d782478c8f 100644 --- a/k8s/auth-service/values-prod.yaml +++ b/k8s/auth-service/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-auth-api - tag: prod-d58d02be-1729614498 + tag: prod-65aa1824-1729615373 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/auth-service/values-stage.yaml b/k8s/auth-service/values-stage.yaml index 1aec9ce57c..1a4190d1b4 100644 --- a/k8s/auth-service/values-stage.yaml +++ b/k8s/auth-service/values-stage.yaml @@ -6,7 +6,7 @@ app: replicaCount: 2 image: repository: eu.gcr.io/airqo-250220/airqo-stage-auth-api - tag: stage-b55d4116-1729602971 + tag: stage-6631dc02-1729615305 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml index 848c8c0153..fe94a71bc5 100644 --- a/k8s/device-registry/values-prod.yaml +++ b/k8s/device-registry/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-device-registry-api - tag: prod-d58d02be-1729614498 + tag: prod-65aa1824-1729615373 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/exceedance/values-prod-airqo.yaml b/k8s/exceedance/values-prod-airqo.yaml index d571cf40c4..b7a11b30eb 100644 --- a/k8s/exceedance/values-prod-airqo.yaml +++ b/k8s/exceedance/values-prod-airqo.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/airqo-exceedance-job - tag: prod-d58d02be-1729614498 + tag: prod-65aa1824-1729615373 nameOverride: '' fullnameOverride: '' diff --git a/k8s/exceedance/values-prod-kcca.yaml b/k8s/exceedance/values-prod-kcca.yaml index 23b95b6ef2..90e974b3e0 100644 --- a/k8s/exceedance/values-prod-kcca.yaml +++ b/k8s/exceedance/values-prod-kcca.yaml @@ -4,6 +4,6 @@ app: configmap: env-exceedance-production image: repository: eu.gcr.io/airqo-250220/kcca-exceedance-job - tag: prod-d58d02be-1729614498 + tag: prod-65aa1824-1729615373 nameOverride: '' fullnameOverride: '' diff --git a/k8s/predict/values-prod.yaml b/k8s/predict/values-prod.yaml index fe566878d7..d5f989352d 100644 --- a/k8s/predict/values-prod.yaml +++ b/k8s/predict/values-prod.yaml @@ -7,7 +7,7 @@ images: predictJob: eu.gcr.io/airqo-250220/airqo-predict-job trainJob: eu.gcr.io/airqo-250220/airqo-train-job predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality - tag: prod-d58d02be-1729614498 + tag: prod-65aa1824-1729615373 api: name: airqo-prediction-api label: prediction-api diff --git a/k8s/spatial/values-prod.yaml b/k8s/spatial/values-prod.yaml index f2ad886421..0f116a5c75 100644 --- a/k8s/spatial/values-prod.yaml +++ b/k8s/spatial/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-spatial-api - tag: prod-d58d02be-1729614498 + tag: prod-65aa1824-1729615373 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/k8s/workflows/values-prod.yaml b/k8s/workflows/values-prod.yaml index e98cc097ce..56d67c12c8 100644 --- a/k8s/workflows/values-prod.yaml +++ b/k8s/workflows/values-prod.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-redis containers: eu.gcr.io/airqo-250220/airqo-workflows - tag: prod-d58d02be-1729614498 + tag: prod-65aa1824-1729615373 nameOverride: '' fullnameOverride: '' podAnnotations: {} diff --git a/src/workflows/airqo_etl_utils/message_broker_utils.py b/src/workflows/airqo_etl_utils/message_broker_utils.py index 2a505ca624..1fa573e0d8 100644 --- a/src/workflows/airqo_etl_utils/message_broker_utils.py +++ b/src/workflows/airqo_etl_utils/message_broker_utils.py @@ -336,9 +336,9 @@ def publish_to_topic( logger.info(f"Preparing to publish data to topic: {topic}") data.replace(np.nan, None, inplace=True) dataframe_list = data.to_dict("records") + message_counts = 0 if column_key: logger.info(f"Using '{column_key}' as the key for messages") - message_counts = 0 for row in dataframe_list: key = row.pop(column_key, None) if key is None: @@ -351,14 +351,15 @@ def publish_to_topic( selected_partition = ( None if auto_partition else self.__get_least_loaded_partition() ) - message_counts += 1 self._send_message(producer, topic, key, message, selected_partition) if not auto_partition: self.partition_loads[selected_partition] += 1 + message_counts += 1 else: logger.info("No key provided, splitting data into chunks and publishing") for chunk_data in self._generate_chunks(dataframe_list): + message_counts += len(chunk_data) message = json.dumps({"data": chunk_data}).encode("utf-8") selected_partition = (