diff --git a/src/workflows/airqo_etl_utils/airnow_utils.py b/src/workflows/airqo_etl_utils/airnow_utils.py index d06b1aeb27..572c4fe0f5 100644 --- a/src/workflows/airqo_etl_utils/airnow_utils.py +++ b/src/workflows/airqo_etl_utils/airnow_utils.py @@ -4,7 +4,7 @@ from .airnow_api import AirNowApi from .airqo_api import AirQoApi -from .constants import DataSource, DeviceCategory, Frequency +from .constants import DataSource, DeviceCategory, Frequency, DeviceNetwork from .data_validator import DataValidationUtils from .date import str_to_date, date_to_str from .utils import Utils @@ -68,7 +68,9 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame: Raises: ValueError: If no devices are found for the BAM network or if no data is returned for the specified date range. """ - devices = AirQoApi().get_devices_by_network(DeviceCategory.BAM) + devices = AirQoApi().get_devices_by_network( + device_network=DeviceNetwork.METONE, device_category=DeviceCategory.BAM + ) bam_data = pd.DataFrame() if not devices: @@ -119,7 +121,12 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame: """ air_now_data = [] - devices = AirQoApi().get_devices(device_category=DeviceCategory.BAM) + devices = AirQoApi().get_devices_by_network( + device_network=DeviceNetwork.METONE, device_category=DeviceCategory.BAM + ) + + # Initialize pollutant values (note: pm10 and no2 are not always present) + pollutant_value = {"pm2_5": None, "pm10": None, "no2": None} # Precompute device mapping for faster lookup device_mapping = {} @@ -129,17 +136,15 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame: for _, row in data.iterrows(): try: - device_id = str(row["FullAQSCode"]) + # Temp external device id + device_id_ = str(row["FullAQSCode"]) # Lookup device details based on FullAQSCode - device_details = device_mapping.get(device_id) + device_details = device_mapping.get(device_id_) if not device_details: - logger.exception(f"Device with ID {device_id} not found") + logger.exception(f"Device with ID {device_id_} not found") continue - # Initialize pollutant values (note: pm10 and no2 are not always present) - pollutant_value = {"pm2_5": None, "pm10": None, "no2": None} - # Get the corresponding pollutant value for the current parameter parameter_col_name = AirnowDataUtils.parameter_column_name( row["Parameter"] @@ -148,7 +153,7 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame: pollutant_value[parameter_col_name] = row["Value"] if row["network"] != device_details.get("network"): - logger.exception(f"Network mismatch for device ID {device_id}") + logger.exception(f"Network mismatch for device ID {device_id_}") continue air_now_data.append( @@ -156,8 +161,8 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame: "timestamp": row["UTC"], "network": row["network"], "site_id": device_details.get("site_id"), - "device_id": device_details.get("device_id"), - "mongo_id": device_details.get("mongo_id"), + "device_id": device_details.get("name"), + "mongo_id": device_details.get("_id"), "device_number": device_details.get("device_number"), "frequency": str(Frequency.HOURLY), "latitude": row["Latitude"], diff --git a/src/workflows/airqo_etl_utils/airqo_api.py b/src/workflows/airqo_etl_utils/airqo_api.py index 48ba58abfe..76740f6934 100644 --- a/src/workflows/airqo_etl_utils/airqo_api.py +++ b/src/workflows/airqo_etl_utils/airqo_api.py @@ -26,8 +26,8 @@ def __init__(self) -> None: def save_events(self, measurements: List) -> None: # Temporarily disabling usage of the API to store measurements. - if "staging" in self.AIRQO_BASE_URL_V2.lower(): - return + # if "staging" in self.AIRQO_BASE_URL_V2.lower(): + # return # TODO Findout if there is a bulk post api option greater than 5. for i in range(0, len(measurements), int(configuration.POST_EVENTS_BODY_SIZE)): data = measurements[i : i + int(configuration.POST_EVENTS_BODY_SIZE)] @@ -41,7 +41,7 @@ def save_events(self, measurements: List) -> None: ) def get_maintenance_logs( - self, tenant: str, device: str, activity_type: str = None + self, network: str, device: str, activity_type: str = None ) -> List: """ Retrieve devices given a tenant and device category. @@ -70,7 +70,7 @@ def get_maintenance_logs( ] """ # Why is tenant still a parameter when it is being overriden. - params = {"tenant": str(Tenant.AIRQO), "device": device} + params = {"network": network, "device": device} if activity_type: params["activity_type"] = activity_type @@ -241,12 +241,14 @@ def get_networks( return networks, exception_message def get_devices_by_network( - self, device_category: DeviceCategory = None + self, device_network: str = None, device_category: DeviceCategory = None ) -> List[Dict[str, Any]]: """ Retrieve devices by network based on the specified device category. - Args: device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `DeviceCategory.LOW_COST`. + Args: + network (str): This defines the network or manufacture of the device(s) to retrieve. Defaults to `None`. If not passed, devices from all networks are returned. + device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `None`. If not passed, devices from all categories are returned. Returns: List[Dict[str, Any]]: A List of dictionaries containing the details of the devices. The dictionary has the following structure. @@ -284,12 +286,15 @@ def get_devices_by_network( ] """ devices: List[Dict[str, Any]] = [] - networks, error = self.get_networks() + networks: List[str] = [] params: Dict = {} - - if error: - logger.error(f"Error while fetching networks: {error}") - return devices + if device_network: + networks.append({"net_name": device_network}) + else: + networks, error = self.get_networks() + if error: + logger.error(f"Error while fetching networks: {error}") + return devices if device_category: params["category"] = str(device_category) diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index 25b73e4a8a..4e1b281b3f 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -8,7 +8,7 @@ from .config import configuration from .constants import ( DeviceCategory, - Tenant, + DeviceNetwork, Frequency, DataSource, DataType, @@ -59,7 +59,7 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame: null_cols=["pm2_5_calibrated_value"], start_date_time=start_date_time, end_date_time=end_date_time, - network=str(Tenant.AIRQO), + network=DeviceNetwork.AIRQO, ) return DataValidationUtils.remove_outliers(hourly_uncalibrated_data) @@ -79,7 +79,7 @@ def extract_data_from_bigquery( table=table, start_date_time=start_date_time, end_date_time=end_date_time, - network=str(Tenant.AIRQO), + network=DeviceNetwork.AIRQO, ) return DataValidationUtils.remove_outliers(raw_data) @@ -392,7 +392,7 @@ def merge_aggregated_mobile_devices_data_and_weather_data( @staticmethod def restructure_airqo_mobile_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame: data["timestamp"] = pd.to_datetime(data["timestamp"]) - data["tenant"] = "airqo" + data["network"] = "airqo" big_query_api = BigQueryApi() cols = big_query_api.get_columns( table=big_query_api.airqo_mobile_measurements_table @@ -404,12 +404,13 @@ def extract_devices_data( start_date_time: str, end_date_time: str, device_category: DeviceCategory, + device_network: DeviceNetwork = None, resolution: Frequency = Frequency.RAW, device_numbers: list = None, remove_outliers: bool = True, ) -> pd.DataFrame: """ - Extracts sensor measurements from AirQo devices recorded between specified date and time ranges. + Extracts sensor measurements from network devices recorded between specified date and time ranges. Retrieves sensor data from Thingspeak API for devices belonging to the specified device category (BAM or low-cost sensors). Optionally filters data by specific device numbers and removes outliers if requested. @@ -425,7 +426,9 @@ def extract_devices_data( airqo_api = AirQoApi() data_source_api = DataSourcesApis() - devices = airqo_api.get_devices_by_network(device_category=device_category) + devices = airqo_api.get_devices_by_network( + device_network=device_network, device_category=device_category + ) if not devices: logger.exception( "Failed to fetch devices. Please check if devices are deployed" @@ -571,7 +574,7 @@ def clean_bam_data(data: pd.DataFrame) -> pd.DataFrame: subset=["timestamp", "device_number"], keep="first", inplace=True ) - data["tenant"] = str(Tenant.AIRQO) + data["network"] = DeviceNetwork.AIRQO data.rename(columns=configuration.AIRQO_BAM_MAPPING, inplace=True) big_query_api = BigQueryApi() @@ -899,12 +902,12 @@ def merge_aggregated_weather_data( @staticmethod def extract_devices_deployment_logs() -> pd.DataFrame: airqo_api = AirQoApi() - devices = airqo_api.get_devices(network=str(Tenant.AIRQO)) + devices = airqo_api.get_devices(network=DeviceNetwork.AIRQO) devices_history = pd.DataFrame() for device in devices: try: maintenance_logs = airqo_api.get_maintenance_logs( - tenant="airqo", + network="airqo", device=device.get("name", None), activity_type="deployment", ) diff --git a/src/workflows/airqo_etl_utils/constants.py b/src/workflows/airqo_etl_utils/constants.py index cbec15ab49..3398dae310 100644 --- a/src/workflows/airqo_etl_utils/constants.py +++ b/src/workflows/airqo_etl_utils/constants.py @@ -47,6 +47,32 @@ def category_from_str(category: str): return DeviceCategory.LOW_COST +class DeviceNetwork(Enum): + """ + METONE -> Us embassy + AIRQO -> Airqo + URBANBETTER -> Urban Better + IQAIR -> Iqair + """ + + METONE = 1 + AIRQO = 2 + URBANBETTER = 3 + IQAIR = 4 + + def __str__(self): + if self == self.METONE: + return "metone" + elif self == self.AIRQO: + return "airqo" + elif self == self.URBANBETTER: + return "airbeam" + elif self == self.IQAIR: + return "iqair" + else: + raise LookupError("Invalid network supplied") + + class Frequency(Enum): """ RAW -> Raw current data returned from devices diff --git a/src/workflows/airqo_etl_utils/data_validator.py b/src/workflows/airqo_etl_utils/data_validator.py index 6b7c3af043..7bac7358af 100644 --- a/src/workflows/airqo_etl_utils/data_validator.py +++ b/src/workflows/airqo_etl_utils/data_validator.py @@ -274,6 +274,8 @@ def process_data_for_api(data: pd.DataFrame) -> list: cols = bigquery_api.get_columns(bigquery_api.hourly_measurements_table) cols.append("battery") data = DataValidationUtils.fill_missing_columns(data, cols=cols) + + # TODO Use DataValidation.format_data_types() to convert cleanup multipe columns. data["device_number"] = ( data["device_number"] .fillna("") diff --git a/src/workflows/dags/airqo_bam_measurements.py b/src/workflows/dags/airqo_bam_measurements.py index 148b0e72cd..05fe454fb9 100644 --- a/src/workflows/dags/airqo_bam_measurements.py +++ b/src/workflows/dags/airqo_bam_measurements.py @@ -6,11 +6,11 @@ from airflow.utils.dates import days_ago import pandas as pd from airqo_etl_utils.airqo_utils import AirQoDataUtils -from airqo_etl_utils.constants import DeviceCategory, DataType from airqo_etl_utils.date import DateUtils from airqo_etl_utils.bigquery_api import BigQueryApi from datetime import timedelta from airflow.exceptions import AirflowFailException +from airqo_etl_utils.constants import Frequency, DeviceNetwork, DeviceCategory, DataType @dag( @@ -22,8 +22,6 @@ start_date=days_ago(1), ) def airqo_bam_historical_measurements(): - from airqo_etl_utils.constants import Frequency - @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) def extract_bam_data(**kwargs) -> pd.DataFrame: start_date_time, end_date_time = DateUtils.get_dag_date_time_values( @@ -33,6 +31,7 @@ def extract_bam_data(**kwargs) -> pd.DataFrame: start_date_time=start_date_time, end_date_time=end_date_time, device_category=DeviceCategory.BAM, + device_network=DeviceNetwork.AIRQO, resolution=Frequency.HISTORICAL, ) @@ -80,13 +79,11 @@ def save_clean_bam_data(data: pd.DataFrame): ) def airqo_bam_realtime_measurements(): import pandas as pd - from airqo_etl_utils.constants import Frequency @task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5)) def extract_bam_data(**kwargs): from airqo_etl_utils.airqo_utils import AirQoDataUtils from airqo_etl_utils.date import DateUtils - from airqo_etl_utils.constants import DeviceCategory start_date_time, end_date_time = DateUtils.get_query_date_time_values(**kwargs) @@ -94,13 +91,13 @@ def extract_bam_data(**kwargs): start_date_time=start_date_time, end_date_time=end_date_time, device_category=DeviceCategory.BAM, + device_network=DeviceNetwork.AIRQO, resolution=Frequency.RAW, ) @task(retries=3, retry_delay=timedelta(minutes=5)) def save_unclean_data(data: pd.DataFrame): from airqo_etl_utils.bigquery_api import BigQueryApi - from airqo_etl_utils.constants import DataType from airqo_etl_utils.airqo_utils import AirQoDataUtils data = AirQoDataUtils.format_data_for_bigquery( @@ -121,7 +118,6 @@ def clean_bam_data(data: pd.DataFrame): @task(retries=3, retry_delay=timedelta(minutes=5)) def save_clean_bam_data(data: pd.DataFrame): from airqo_etl_utils.bigquery_api import BigQueryApi - from airqo_etl_utils.constants import DataType from airqo_etl_utils.airqo_utils import AirQoDataUtils data = AirQoDataUtils.format_data_for_bigquery( @@ -137,9 +133,7 @@ def save_clean_bam_data(data: pd.DataFrame): def update_latest_data_topic(data: pd.DataFrame, **kwargs): from airqo_etl_utils.airqo_utils import AirQoDataUtils from airqo_etl_utils.message_broker_utils import MessageBrokerUtils - from airqo_etl_utils.constants import DeviceCategory from airqo_etl_utils.data_validator import DataValidationUtils - from airqo_etl_utils.constants import Tenant from datetime import datetime now = datetime.now() diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py index b551293aeb..a3e7150dc5 100644 --- a/src/workflows/dags/airqo_measurements.py +++ b/src/workflows/dags/airqo_measurements.py @@ -2,7 +2,6 @@ from airqo_etl_utils.config import configuration from airqo_etl_utils.workflows_custom_utils import AirflowUtils -from airqo_etl_utils.constants import Frequency from airflow.exceptions import AirflowFailException from dag_docs import ( airqo_realtime_low_cost_measurements_doc, @@ -16,6 +15,7 @@ send_raw_measurements_to_bigquery_doc, extract_raw_airqo_gaseous_data_doc, ) +from airqo_etl_utils.constants import DeviceNetwork, DeviceCategory, Frequency from datetime import timedelta import logging @@ -102,7 +102,6 @@ def send_hourly_measurements_to_message_broker( ) -> None: from airqo_etl_utils.message_broker_utils import MessageBrokerUtils from airqo_etl_utils.data_validator import DataValidationUtils - from airqo_etl_utils.constants import Tenant from datetime import datetime now = datetime.now() @@ -151,7 +150,6 @@ def airqo_historical_raw_measurements(): def extract_raw_data(**kwargs): from airqo_etl_utils.date import DateUtils from airqo_etl_utils.airqo_utils import AirQoDataUtils - from airqo_etl_utils.constants import DeviceCategory start_date_time, end_date_time = DateUtils.get_dag_date_time_values( historical=True, days=2, **kwargs @@ -166,7 +164,6 @@ def extract_raw_data(**kwargs): @task() def clean_data_raw_data(data: pd.DataFrame): from airqo_etl_utils.airqo_utils import AirQoDataUtils - from airqo_etl_utils.constants import DeviceCategory return AirQoDataUtils.clean_low_cost_sensor_data( data=data, device_category=DeviceCategory.LOW_COST @@ -217,7 +214,6 @@ def load_data(airqo_data: pd.DataFrame): ) def airqo_cleanup_measurements(): import pandas as pd - from airqo_etl_utils.constants import Frequency @task(provide_context=True) def extract_raw_data(**kwargs) -> pd.DataFrame: @@ -307,7 +303,6 @@ def airqo_realtime_measurements(): ) def extract_raw_data(**kwargs) -> pd.DataFrame: from airqo_etl_utils.airqo_utils import AirQoDataUtils - from airqo_etl_utils.constants import DeviceCategory execution_date = kwargs["dag_run"].execution_date hour_of_day = execution_date - timedelta(hours=1) @@ -326,7 +321,6 @@ def extract_raw_data(**kwargs) -> pd.DataFrame: def clean_data_raw_data(data: pd.DataFrame) -> pd.DataFrame: # TODO Future possibility of adding gx data quality checks here to measure data quality from airqo_etl_utils.airqo_utils import AirQoDataUtils - from airqo_etl_utils.constants import DeviceCategory return AirQoDataUtils.clean_low_cost_sensor_data( data=data, device_category=DeviceCategory.LOW_COST @@ -394,7 +388,6 @@ def send_hourly_measurements_to_api(airqo_data: pd.DataFrame): def send_hourly_measurements_to_message_broker(data: pd.DataFrame, **kwargs): from airqo_etl_utils.message_broker_utils import MessageBrokerUtils from airqo_etl_utils.data_validator import DataValidationUtils - from airqo_etl_utils.constants import Tenant from datetime import datetime now = datetime.now() @@ -441,7 +434,6 @@ def send_raw_measurements_to_bigquery(airqo_data: pd.DataFrame): def update_latest_data_topic(data: pd.DataFrame, **kwargs): from airqo_etl_utils.airqo_utils import AirQoDataUtils from airqo_etl_utils.message_broker_utils import MessageBrokerUtils - from airqo_etl_utils.constants import DeviceCategory, Tenant from airqo_etl_utils.data_validator import DataValidationUtils from datetime import datetime @@ -503,7 +495,6 @@ def airqo_raw_data_measurements(): ) def extract_raw_data(**kwargs): from airqo_etl_utils.airqo_utils import AirQoDataUtils - from airqo_etl_utils.constants import DeviceCategory from airqo_etl_utils.date import date_to_str_hours from datetime import datetime, timedelta @@ -524,7 +515,6 @@ def extract_raw_data(**kwargs): ) def clean_data_raw_data(data: pd.DataFrame): from airqo_etl_utils.airqo_utils import AirQoDataUtils - from airqo_etl_utils.constants import DeviceCategory return AirQoDataUtils.clean_low_cost_sensor_data( data=data, device_category=DeviceCategory.LOW_COST @@ -567,7 +557,6 @@ def airqo_gaseous_realtime_measurements(): ) def extract_raw_data(**kwargs) -> pd.DataFrame: from airqo_etl_utils.airqo_utils import AirQoDataUtils - from airqo_etl_utils.constants import DeviceCategory from airqo_etl_utils.date import date_to_str_hours from datetime import datetime, timedelta @@ -580,6 +569,7 @@ def extract_raw_data(**kwargs) -> pd.DataFrame: start_date_time=start_date_time, end_date_time=end_date_time, device_category=DeviceCategory.LOW_COST_GAS, + device_network=DeviceNetwork.AIRQO, remove_outliers=False, ) @@ -588,7 +578,6 @@ def extract_raw_data(**kwargs) -> pd.DataFrame: ) def clean_data_raw_data(data: pd.DataFrame): from airqo_etl_utils.airqo_utils import AirQoDataUtils - from airqo_etl_utils.constants import DeviceCategory return AirQoDataUtils.clean_low_cost_sensor_data( data=data, device_category=DeviceCategory.LOW_COST_GAS