Skip to content

Commit

Permalink
Merge pull request #3788 from NicholasTurner23/update-fix/pipeline-task-retries
Browse files Browse the repository at this point in the history

Update fix/pipeline task retries
  • Loading branch information
Baalmart authored Oct 29, 2024
2 parents 56da7ef + c48d844 commit 0b2c004
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 58 deletions.
15 changes: 15 additions & 0 deletions k8s/kafka/topics/kafka-topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ spec:
config:
retention.ms: 18000000

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: averaged-hourly-measurements-topic
namespace: message-broker
labels:
strimzi.io/cluster: kafka-cluster
spec:
partitions: 3
replicas: 2
config:
min.insync.replicas: 2
retention.ms: 18000000

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
Expand Down
3 changes: 2 additions & 1 deletion src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,8 @@ def get_devices(group_id: str) -> pd.DataFrame:
except json.JSONDecodeError as e:
logger.exception(f"Error decoding JSON: {e}")
continue
if not key or not value.get("_id") or not value.get("device_id"):

if not key or not value.get("device_id"):
logger.warning(
f"Skipping message with key: {key}, missing 'device_id'."
)
Expand Down
6 changes: 5 additions & 1 deletion src/workflows/airqo_etl_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class Config:
OPENWEATHER_DATA_BATCH_SIZE = os.getenv("OPENWEATHER_DATA_BATCH_SIZE")
# Kafka
BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")
TOPIC_PARTITIONS = os.getenv("TOPIC_PARTITIONS", "1,2,3,4").split(",")
TOPIC_PARTITIONS = os.getenv("TOPIC_PARTITIONS", "0,1,2").split(",")
SCHEMA_REGISTRY_URL = os.getenv("SCHEMA_REGISTRY_URL")

# Kafka Topics
Expand All @@ -101,6 +101,10 @@ class Config:
HOURLY_MEASUREMENTS_TOPIC = os.getenv("HOURLY_MEASUREMENTS_TOPIC")
BAM_MEASUREMENTS_TOPIC = os.getenv("BAM_MEASUREMENTS_TOPIC")
DEVICES_TOPIC = os.getenv("DEVICES_TOPIC")
CALIBRATED_HOURLY_MEASUREMENTS_TOPIC = os.getenv(
"CALIBRATED_HOURLY_MEASUREMENTS_TOPIC"
)
AVERAGED_HOURLY_MEASUREMENTS_TOPIC = os.getenv("AVERAGED_HOURLY_MEASUREMENTS_TOPIC")

# Airnow
AIRNOW_BASE_URL = os.getenv("AIRNOW_BASE_URL")
Expand Down
38 changes: 15 additions & 23 deletions src/workflows/airqo_etl_utils/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,30 +165,22 @@ def process_data_for_message_broker(
if tenant != Tenant.ALL:
data.loc[:, "tenant"] = str(tenant)

# Additional processing for hourly measurements topic
if topic == configuration.HOURLY_MEASUREMENTS_TOPIC:
data.rename(columns={"device_id": "device_name"}, inplace=True)

devices = AirQoDataUtils.get_devices(group_id=caller)
devices = devices[["tenant", "_id", "site_id", "latitude", "longitude"]]
devices.rename(
columns={
"_id": "device_id",
"latitude": "device_latitude",
"longitude": "device_longitude",
},
inplace=True,
)

data = pd.merge(
left=data,
right=devices,
on=["device_name", "site_id", "tenant"],
how="left",
)
data.rename(columns={"device_id": "device_name"}, inplace=True)

devices = AirQoDataUtils.get_devices(group_id=caller)
devices = devices[
["tenant", "device_name", "site_id", "device_latitude", "device_longitude"]
]

data = pd.merge(
left=data,
right=devices,
on=["device_name", "site_id", "tenant"],
how="left",
)

data.rename(columns={"tenant": "network"}, inplace=True)
data["tenant"] = str(Tenant.AIRQO)
data.rename(columns={"tenant": "network"}, inplace=True)
data["tenant"] = str(Tenant.AIRQO)

return data

Expand Down
9 changes: 6 additions & 3 deletions src/workflows/airqo_etl_utils/date.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,15 @@ def get_dag_date_time_values(
return start_date_time, end_date_time

@staticmethod
def get_query_date_time_values(hours=1, days=0):
start_date_time = datetime.now(timezone.utc) - timedelta(hours=hours)
def get_query_date_time_values(hours=1, days=0, **kwargs):

execution_date = kwargs["dag_run"].execution_date

start_date_time = execution_date - timedelta(hours=hours)
end_date_time = start_date_time + timedelta(hours=hours)

if days != 0:
start_date_time = datetime.now(timezone.utc) - timedelta(days=days)
start_date_time = execution_date - timedelta(days=days)
end_date_time = start_date_time + timedelta(days=days)

return date_to_str_hours(start_date_time), date_to_str_hours(end_date_time)
Expand Down
60 changes: 46 additions & 14 deletions src/workflows/dags/airnow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airqo_etl_utils.workflows_custom_utils import AirflowUtils
from datetime import timedelta
from airqo_etl_utils.config import configuration


# Historical Data DAG
Expand All @@ -15,7 +17,7 @@
def airnow_bam_historical_data():
import pandas as pd

@task()
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract_bam_data(**kwargs):
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.airnow_utils import AirnowDataUtils
Expand All @@ -34,13 +36,28 @@ def process_data(data: pd.DataFrame):

return AirnowDataUtils.process_bam_data(data=data)

@task()
def send_to_message_broker(data: pd.DataFrame):
@task(retries=3, retry_delay=timedelta(minutes=5))
def send_to_message_broker(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.data_validator import DataValidationUtils
from airqo_etl_utils.constants import Tenant
from datetime import datetime

MessageBrokerUtils.update_hourly_data_topic(data=data)
now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour)

@task()
data = DataValidationUtils.process_data_for_message_broker(
data=data,
tenant=Tenant.AIRQO,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
)
broker = MessageBrokerUtils()
broker.publish_to_topic(
topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
)

@task(retries=3, retry_delay=timedelta(minutes=5))
def send_to_bigquery(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.data_validator import DataValidationUtils
Expand All @@ -53,7 +70,7 @@ def send_to_bigquery(data: pd.DataFrame):
)
big_query_api.load_data(dataframe=processed_data, table=table)

@task()
@task(retries=3, retry_delay=timedelta(minutes=5))
def send_to_api(data: pd.DataFrame, **kwargs):
send_to_api_param = kwargs.get("params", {}).get("send_to_api")
if send_to_api_param:
Expand Down Expand Up @@ -85,12 +102,12 @@ def send_to_api(data: pd.DataFrame, **kwargs):
def airnow_bam_realtime_data():
import pandas as pd

@task()
def extract_bam_data():
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract_bam_data(**kwargs):
from airqo_etl_utils.airnow_utils import AirnowDataUtils
from airqo_etl_utils.date import DateUtils

start_date_time, end_date_time = DateUtils.get_query_date_time_values()
start_date_time, end_date_time = DateUtils.get_query_date_time_values(**kwargs)
return AirnowDataUtils.extract_bam_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
Expand All @@ -102,13 +119,28 @@ def process_data(data: pd.DataFrame):

return AirnowDataUtils.process_bam_data(data=data)

@task()
def send_to_message_broker(data: pd.DataFrame):
@task(retries=3, retry_delay=timedelta(minutes=5))
def send_to_message_broker(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.data_validator import DataValidationUtils
from airqo_etl_utils.constants import Tenant
from datetime import datetime

MessageBrokerUtils.update_hourly_data_topic(data=data)
now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour)

@task()
data = DataValidationUtils.process_data_for_message_broker(
data=data,
tenant=Tenant.AIRQO,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
)
broker = MessageBrokerUtils()
broker.publish_to_topic(
topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
)

@task(retries=3, retry_delay=timedelta(minutes=5))
def send_to_bigquery(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.data_validator import DataValidationUtils
Expand All @@ -121,7 +153,7 @@ def send_to_bigquery(data: pd.DataFrame):
)
big_query_api.load_data(dataframe=processed_data, table=table)

@task()
@task(retries=3, retry_delay=timedelta(minutes=5))
def send_to_api(data: pd.DataFrame):
from airqo_etl_utils.data_validator import DataValidationUtils
from airqo_etl_utils.airqo_api import AirQoApi
Expand Down
41 changes: 28 additions & 13 deletions src/workflows/dags/airqo_bam_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from airqo_etl_utils.constants import DeviceCategory, DataType
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.bigquery_api import BigQueryApi
from datetime import timedelta


@dag(
Expand All @@ -20,7 +21,7 @@
start_date=days_ago(1),
)
def airqo_bam_historical_measurements():
@task()
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract_bam_data(**kwargs) -> pd.DataFrame:
start_date_time, end_date_time = DateUtils.get_dag_date_time_values(
historical=True, **kwargs
Expand All @@ -31,7 +32,7 @@ def extract_bam_data(**kwargs) -> pd.DataFrame:
device_category=DeviceCategory.BAM,
)

@task()
@task(retries=3, retry_delay=timedelta(minutes=5))
def save_unclean_data(data: pd.DataFrame):
data = AirQoDataUtils.format_data_for_bigquery(
data=data, data_type=DataType.UNCLEAN_BAM_DATA
Expand All @@ -46,7 +47,7 @@ def save_unclean_data(data: pd.DataFrame):
def clean_bam_data(data: pd.DataFrame) -> pd.DataFrame:
return AirQoDataUtils.clean_bam_data(data=data)

@task()
@task(retries=3, retry_delay=timedelta(minutes=5))
def save_clean_bam_data(data: pd.DataFrame):
data = AirQoDataUtils.format_data_for_bigquery(
data=data, data_type=DataType.CLEAN_BAM_DATA
Expand Down Expand Up @@ -76,21 +77,21 @@ def save_clean_bam_data(data: pd.DataFrame):
def airqo_bam_realtime_measurements():
import pandas as pd

@task()
def extract_bam_data():
@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract_bam_data(**kwargs):
from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.constants import DeviceCategory

start_date_time, end_date_time = DateUtils.get_query_date_time_values()
start_date_time, end_date_time = DateUtils.get_query_date_time_values(**kwargs)

return AirQoDataUtils.extract_devices_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
device_category=DeviceCategory.BAM,
)

@task()
@task(retries=3, retry_delay=timedelta(minutes=5))
def save_unclean_data(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.constants import DataType
Expand All @@ -111,7 +112,7 @@ def clean_bam_data(data: pd.DataFrame):

return AirQoDataUtils.clean_bam_data(data=data)

@task()
@task(retries=3, retry_delay=timedelta(minutes=5))
def save_clean_bam_data(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.constants import DataType
Expand All @@ -126,23 +127,37 @@ def save_clean_bam_data(data: pd.DataFrame):
table=big_query_api.bam_measurements_table,
)

@task()
def update_latest_data_topic(data: pd.DataFrame):
@task(retries=3, retry_delay=timedelta(minutes=5))
def update_latest_data_topic(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.constants import DeviceCategory
from airqo_etl_utils.data_validator import DataValidationUtils
from airqo_etl_utils.constants import Tenant
from datetime import datetime

now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour)

data = AirQoDataUtils.process_latest_data(
data=data, device_category=DeviceCategory.BAM
)
MessageBrokerUtils.update_hourly_data_topic(data=data)
data = DataValidationUtils.process_data_for_message_broker(
data=data,
tenant=Tenant.AIRQO,
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
)
broker = MessageBrokerUtils()
broker.publish_to_topic(
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC, data=data
)

unclean_data = extract_bam_data()
save_unclean_data(unclean_data)
measurements = clean_bam_data(unclean_data)
save_clean_bam_data(measurements)
if configuration.ENVIRONMENT == "staging":
update_latest_data_topic(measurements)
update_latest_data_topic(measurements)


airqo_bam_realtime_measurements()
Expand Down
20 changes: 17 additions & 3 deletions src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
extract_raw_airqo_gaseous_data_doc,
)
from datetime import timedelta
import logging

logger = logging.getLogger(__name__)


@dag(
Expand Down Expand Up @@ -419,17 +422,28 @@ def send_raw_measurements_to_bigquery(airqo_data: pd.DataFrame):
big_query_api.load_data(data, table=big_query_api.raw_measurements_table)

@task(retries=3, retry_delay=timedelta(minutes=5))
def update_latest_data_topic(data: pd.DataFrame):
def update_latest_data_topic(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.constants import DeviceCategory
from airqo_etl_utils.constants import DeviceCategory, Tenant
from airqo_etl_utils.data_validator import DataValidationUtils
from datetime import datetime

now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour)

data = AirQoDataUtils.process_latest_data(
data=data, device_category=DeviceCategory.LOW_COST
)
data = DataValidationUtils.process_data_for_message_broker(
data=data,
tenant=Tenant.AIRQO,
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
)
broker = MessageBrokerUtils()
broker.publish_to_topic(
topic=configuration.HOURLY_MEASUREMENTS_TOPIC, data=data
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC, data=data
)

raw_data = extract_raw_data()
Expand Down

0 comments on commit 0b2c004

Please sign in to comment.