
Merge pull request #3783 from airqo-platform/staging
move to production
Baalmart authored Oct 28, 2024
2 parents bbf47c3 + f4c5a75 commit cf1605b
Showing 10 changed files with 24 additions and 21 deletions.
2 changes: 1 addition & 1 deletion k8s/analytics/values-prod.yaml
@@ -8,7 +8,7 @@ images:
   celeryWorker: eu.gcr.io/airqo-250220/airqo-analytics-celery-worker
   reportJob: eu.gcr.io/airqo-250220/airqo-analytics-report-job
   devicesSummaryJob: eu.gcr.io/airqo-250220/airqo-analytics-devices-summary-job
-  tag: prod-6498d7df-1729853632
+  tag: prod-bbf47c3d-1729854005
 api:
   name: airqo-analytics-api
   label: analytics-api
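
All six production values files bump the image tag from prod-6498d7df-1729853632 to prod-bbf47c3d-1729854005, and the stage workflows file moves to stage-b3d5eb96-1729853938. The tags look like <env>-<short commit sha>-<unix timestamp>; note that bbf47c3d appears to extend the merged parent bbf47c3 above. A hypothetical sketch of how a CI step might mint such a tag — the scheme is inferred from these files, not taken from the repository's CI definition:

import subprocess
import time

# Assumed tag layout: <env>-<8-char sha>-<unix seconds>, inferred from the
# values files in this commit; the real pipeline may differ.
short_sha = subprocess.check_output(
    ["git", "rev-parse", "--short=8", "HEAD"], text=True
).strip()
tag = f"prod-{short_sha}-{int(time.time())}"
print(tag)  # e.g. prod-bbf47c3d-1729854005
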
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
@@ -6,7 +6,7 @@ app:
   replicaCount: 3
   image:
     repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
-    tag: prod-6498d7df-1729853632
+    tag: prod-bbf47c3d-1729854005
   nameOverride: ''
   fullnameOverride: ''
   podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
@@ -4,6 +4,6 @@ app:
   configmap: env-exceedance-production
   image:
     repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
-    tag: prod-6498d7df-1729853632
+    tag: prod-bbf47c3d-1729854005
   nameOverride: ''
   fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
@@ -4,6 +4,6 @@ app:
   configmap: env-exceedance-production
   image:
     repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
-    tag: prod-6498d7df-1729853632
+    tag: prod-bbf47c3d-1729854005
   nameOverride: ''
   fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
@@ -7,7 +7,7 @@ images:
   predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
   trainJob: eu.gcr.io/airqo-250220/airqo-train-job
   predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
-  tag: prod-6498d7df-1729853632
+  tag: prod-bbf47c3d-1729854005
 api:
   name: airqo-prediction-api
   label: prediction-api
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
@@ -10,7 +10,7 @@ images:
   initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
   redisContainer: eu.gcr.io/airqo-250220/airqo-redis
   containers: eu.gcr.io/airqo-250220/airqo-workflows
-  tag: prod-6498d7df-1729853632
+  tag: prod-bbf47c3d-1729854005
 nameOverride: ''
 fullnameOverride: ''
 podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/workflows/values-stage.yaml
@@ -10,7 +10,7 @@ images:
   initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom
   redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis
   containers: eu.gcr.io/airqo-250220/airqo-stage-workflows
-  tag: stage-3aaeff8b-1729851291
+  tag: stage-b3d5eb96-1729853938
 nameOverride: ''
 fullnameOverride: ''
 podAnnotations: {}
7 changes: 6 additions & 1 deletion src/workflows/airqo_etl_utils/airqo_api.py
@@ -185,7 +185,12 @@ def get_devices(
         params["network"] = str(tenant)
 
         # Note: There is an option of using <api/v2/devices> if more device details are required as shown in the doc string return payload.
-        response = self.__request("devices/summary", params)
+        try:
+            response = self.__request("devices/summary", params)
+        except Exception as e:
+            logger.exception(f"Failed to fetch devices: {e}")
+            return []
+
         devices = [
             {
                 "device_id": device.pop("name"),
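
The change above makes get_devices fail soft: a network or server error now logs the exception and yields an empty list instead of crashing the calling task. A minimal standalone sketch of the same pattern, assuming a hypothetical requests-based client and response shape (the real self.__request helper is not shown in this diff):

import logging
from typing import Any, Dict, List

import requests

logger = logging.getLogger(__name__)

def fetch_devices_summary(base_url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    # base_url and the "devices" response key are assumptions for illustration.
    try:
        response = requests.get(f"{base_url}/devices/summary", params=params, timeout=10)
        response.raise_for_status()
        return response.json().get("devices", [])
    except Exception as e:
        # Same fail-soft behaviour as the change above: log with traceback and
        # return an empty list so the caller can skip work instead of crashing.
        logger.exception(f"Failed to fetch devices: {e}")
        return []
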
20 changes: 11 additions & 9 deletions src/workflows/airqo_etl_utils/data_validator.py
@@ -121,7 +121,7 @@ def remove_outliers(data: pd.DataFrame) -> pd.DataFrame:
     def fill_missing_columns(data: pd.DataFrame, cols: list) -> pd.DataFrame:
         for col in cols:
             if col not in list(data.columns):
-                print(f"{col} missing in dataframe")
+                logger.warning(f"{col} missing in dataframe")
                 data.loc[:, col] = None
 
         return data
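
Swapping print for logger.warning routes the message through the logging pipeline (levels, handlers, timestamps) instead of bare stdout, and presumes a module-level logger already exists in data_validator.py. A self-contained sketch of the helper as changed, with an illustrative logger setup:

import logging

import pandas as pd

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.WARNING)

def fill_missing_columns(data: pd.DataFrame, cols: list) -> pd.DataFrame:
    for col in cols:
        if col not in data.columns:
            # Emits e.g. "WARNING:__main__:pm2_5 missing in dataframe"
            logger.warning(f"{col} missing in dataframe")
            data.loc[:, col] = None
    return data

df = fill_missing_columns(pd.DataFrame({"device_id": ["aq_01"]}), ["device_id", "pm2_5"])
print(df)  # pm2_5 column added, filled with None
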
@@ -271,8 +271,7 @@ def process_data_for_api(data: pd.DataFrame) -> list:
                 restructured_data.append(row_data)
 
             except Exception as ex:
-                logger.exception(f"Error ocurred: {e}")
-                print(ex)
+                logger.exception(f"Error occurred: {ex}")
 
         return restructured_data
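
The dropped line referenced e inside a handler that binds the exception to ex, so logging the error would itself raise NameError (assuming no other e in scope) and hide the original failure. A minimal reproduction of the pitfall:

try:
    1 / 0
except Exception as ex:
    # f"Error occurred: {e}" would raise NameError: name 'e' is not defined,
    # masking the ZeroDivisionError we actually want to report.
    print(f"Error occurred: {ex}")  # -> division by zero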

@@ -304,14 +303,17 @@ def transform_devices(devices: List[Dict[str, Any]], taskinstance) -> pd.DataFrame:
         )
 
         # Convert devices DataFrame to JSON for consistency since JSON stores metadata and compute checksum
-        devices_json = devices.to_json(orient="records", date_format="iso")
-        api_devices_checksum = hashlib.md5(devices_json.encode()).hexdigest()
+        if not devices.empty:
+            devices_json = devices.to_json(orient="records", date_format="iso")
+            api_devices_checksum = hashlib.md5(devices_json.encode()).hexdigest()
 
-        previous_checksum = taskinstance.xcom_pull(key="devices_checksum")
+            previous_checksum = taskinstance.xcom_pull(key="devices_checksum")
 
-        if previous_checksum == api_devices_checksum:
-            return pd.DataFrame()
+            if previous_checksum == api_devices_checksum:
+                return pd.DataFrame()
 
-        taskinstance.xcom_push(key="devices_checksum", value=api_devices_checksum)
+            taskinstance.xcom_push(key="devices_checksum", value=api_devices_checksum)
+        else:
+            logger.warning("No devices returned.")
 
         return devices
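
The restructured block only computes the checksum and touches XCom when the API actually returned devices, and short-circuits with an empty DataFrame when nothing changed since the previous run. The core change-detection idea, extracted into a hypothetical helper (the XCom key and behaviour mirror the diff; the helper name is illustrative):

import hashlib

import pandas as pd

def devices_changed(devices: pd.DataFrame, taskinstance) -> bool:
    """Return True when the device list differs from the previous run's snapshot."""
    devices_json = devices.to_json(orient="records", date_format="iso")
    checksum = hashlib.md5(devices_json.encode()).hexdigest()
    if taskinstance.xcom_pull(key="devices_checksum") == checksum:
        return False  # unchanged payload; caller can return an empty DataFrame
    taskinstance.xcom_push(key="devices_checksum", value=checksum)
    return True
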
4 changes: 0 additions & 4 deletions src/workflows/airqo_etl_utils/message_broker_utils.py
@@ -234,7 +234,6 @@ def consume_from_topic(
"fetch.message.max.bytes": 2 * 1024 * 1024,
}
)

consumer = Consumer(consumer_config)
consumer.subscribe([topic])

@@ -245,20 +244,17 @@
             if msg is not None and msg.error() is None:
                 assigned = True
             wait_time_sec -= 1
-
         if offset is not None:
             logger.info(f"Seeking to offset {offset} for all partitions...")
             partitions = [
                 TopicPartition(topic, p.partition, p.offset)
                 for p in consumer.assignment()
             ]
             consumer.assign(partitions)
-
         message_count = 0
         try:
             while streaming or (message_count < max_messages if max_messages else True):
                 msg = consumer.poll(timeout=1.0)
-
                 if msg is None:
                     logger.info("No messages in this poll.")
                     if not streaming:
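
The removed lines in this file are blank-line cleanup only; the surrounding code shows the confluent-kafka pattern of subscribing, waiting for a partition assignment, then seeking by re-assigning TopicPartition objects at an explicit offset. A compact sketch of that pattern, assuming a local broker and an illustrative topic and group id (error handling trimmed):

from confluent_kafka import Consumer, TopicPartition

# Broker address, topic name, and group id below are assumptions.
consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "example-group",
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["example-topic"])

# Poll until the group coordinator hands us partitions (the real code bounds
# this wait with wait_time_sec), then rewind every partition to offset 0.
while not consumer.assignment():
    consumer.poll(timeout=1.0)
consumer.assign(
    [TopicPartition("example-topic", p.partition, 0) for p in consumer.assignment()]
)

msg = consumer.poll(timeout=5.0)
if msg is not None and msg.error() is None:
    print(msg.value())
consumer.close()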
