Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update fix/clean up #4179

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions src/workflows/airqo_etl_utils/airnow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from .airnow_api import AirNowApi
from .airqo_api import AirQoApi
from .constants import DataSource, DeviceCategory, Frequency
from .constants import DataSource, DeviceCategory, Frequency, DeviceNetwork
from .data_validator import DataValidationUtils
from .date import str_to_date, date_to_str
from .utils import Utils
Expand Down Expand Up @@ -68,7 +68,9 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame:
Raises:
ValueError: If no devices are found for the BAM network or if no data is returned for the specified date range.
"""
devices = AirQoApi().get_devices_by_network(DeviceCategory.BAM)
devices = AirQoApi().get_devices_by_network(
device_network=DeviceNetwork.METONE, device_category=DeviceCategory.BAM
)
bam_data = pd.DataFrame()

if not devices:
Expand Down Expand Up @@ -119,7 +121,12 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame:
"""
air_now_data = []

devices = AirQoApi().get_devices(device_category=DeviceCategory.BAM)
devices = AirQoApi().get_devices_by_network(
device_network=DeviceNetwork.METONE, device_category=DeviceCategory.BAM
)

# Initialize pollutant values (note: pm10 and no2 are not always present)
pollutant_value = {"pm2_5": None, "pm10": None, "no2": None}

# Precompute device mapping for faster lookup
device_mapping = {}
Expand All @@ -129,17 +136,15 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame:

for _, row in data.iterrows():
try:
device_id = str(row["FullAQSCode"])
# Temp external device id
device_id_ = str(row["FullAQSCode"])

# Lookup device details based on FullAQSCode
device_details = device_mapping.get(device_id)
device_details = device_mapping.get(device_id_)
if not device_details:
logger.exception(f"Device with ID {device_id} not found")
logger.exception(f"Device with ID {device_id_} not found")
continue

# Initialize pollutant values (note: pm10 and no2 are not always present)
pollutant_value = {"pm2_5": None, "pm10": None, "no2": None}

# Get the corresponding pollutant value for the current parameter
parameter_col_name = AirnowDataUtils.parameter_column_name(
row["Parameter"]
Expand All @@ -148,16 +153,16 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame:
pollutant_value[parameter_col_name] = row["Value"]

if row["network"] != device_details.get("network"):
logger.exception(f"Network mismatch for device ID {device_id}")
logger.exception(f"Network mismatch for device ID {device_id_}")
continue

air_now_data.append(
{
"timestamp": row["UTC"],
"network": row["network"],
"site_id": device_details.get("site_id"),
"device_id": device_details.get("device_id"),
"mongo_id": device_details.get("mongo_id"),
"device_id": device_details.get("name"),
"mongo_id": device_details.get("_id"),
"device_number": device_details.get("device_number"),
"frequency": str(Frequency.HOURLY),
"latitude": row["Latitude"],
Expand Down
27 changes: 16 additions & 11 deletions src/workflows/airqo_etl_utils/airqo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ def __init__(self) -> None:

def save_events(self, measurements: List) -> None:
# Temporarily disabling usage of the API to store measurements.
if "staging" in self.AIRQO_BASE_URL_V2.lower():
return
# if "staging" in self.AIRQO_BASE_URL_V2.lower():
# return
# TODO Findout if there is a bulk post api option greater than 5.
for i in range(0, len(measurements), int(configuration.POST_EVENTS_BODY_SIZE)):
data = measurements[i : i + int(configuration.POST_EVENTS_BODY_SIZE)]
Expand All @@ -41,7 +41,7 @@ def save_events(self, measurements: List) -> None:
)

def get_maintenance_logs(
self, tenant: str, device: str, activity_type: str = None
self, network: str, device: str, activity_type: str = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Parameter rename from tenant to network needs documentation update.

The parameter has been renamed from tenant to network, but the docstring still references tenant. This inconsistency could confuse future developers.

Apply this diff to update the documentation:

        """
        Retrieve devices given a tenant and device category.

        Args:
-           - tenant: An Enum that represents site ownership.
+           - network: A string that represents the device network/manufacturer.
            - device: The name of the device.
            - activity_type: Defines if the activity logged is a maintenance or deployment activity. If not supplied returns all activities for the given device.

Also applies to: 73-73

) -> List:
"""
Retrieve devices given a tenant and device category.
Expand Down Expand Up @@ -70,7 +70,7 @@ def get_maintenance_logs(
]
"""
# Why is tenant still a parameter when it is being overriden.
params = {"tenant": str(Tenant.AIRQO), "device": device}
params = {"network": network, "device": device}

if activity_type:
params["activity_type"] = activity_type
Expand Down Expand Up @@ -241,12 +241,14 @@ def get_networks(
return networks, exception_message

def get_devices_by_network(
self, device_category: DeviceCategory = None
self, device_network: str = None, device_category: DeviceCategory = None
) -> List[Dict[str, Any]]:
"""
Retrieve devices by network based on the specified device category.

Args: device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `DeviceCategory.LOW_COST`.
Args:
network (str): This defines the network or manufacture of the device(s) to retrieve. Defaults to `None`. If not passed, devices from all networks are returned.
device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `None`. If not passed, devices from all categories are returned.
Comment on lines +244 to +251
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Documentation improvement needed for device_network parameter.

The docstring has a mismatch between the parameter name in the signature (device_network) and the documentation (network).

Apply this diff to fix the documentation:

        """
        Retrieve devices by network based on the specified device category.

        Args:
-           network (str): This defines the network or manufacture of the device(s) to retrieve. Defaults to `None`. If not passed, devices from all networks are returned.
+           device_network (str): This defines the network or manufacturer of the device(s) to retrieve. Defaults to `None`. If not passed, devices from all networks are returned.
            device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `None`. If not passed, devices from all categories are returned.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self, device_network: str = None, device_category: DeviceCategory = None
) -> List[Dict[str, Any]]:
"""
Retrieve devices by network based on the specified device category.
Args: device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `DeviceCategory.LOW_COST`.
Args:
network (str): This defines the network or manufacture of the device(s) to retrieve. Defaults to `None`. If not passed, devices from all networks are returned.
device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `None`. If not passed, devices from all categories are returned.
self, device_network: str = None, device_category: DeviceCategory = None
) -> List[Dict[str, Any]]:
"""
Retrieve devices by network based on the specified device category.
Args:
device_network (str): This defines the network or manufacturer of the device(s) to retrieve. Defaults to `None`. If not passed, devices from all networks are returned.
device_category (DeviceCategory, optional): The category of devices to retrieve. Defaults to `None`. If not passed, devices from all categories are returned.


Returns:
List[Dict[str, Any]]: A List of dictionaries containing the details of the devices. The dictionary has the following structure.
Expand Down Expand Up @@ -284,12 +286,15 @@ def get_devices_by_network(
]
"""
devices: List[Dict[str, Any]] = []
networks, error = self.get_networks()
networks: List[str] = []
params: Dict = {}

if error:
logger.error(f"Error while fetching networks: {error}")
return devices
if device_network:
networks.append({"net_name": device_network})
else:
networks, error = self.get_networks()
if error:
logger.error(f"Error while fetching networks: {error}")
return devices

if device_category:
params["category"] = str(device_category)
Expand Down
21 changes: 12 additions & 9 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .config import configuration
from .constants import (
DeviceCategory,
Tenant,
DeviceNetwork,
Frequency,
DataSource,
DataType,
Expand Down Expand Up @@ -59,7 +59,7 @@ def extract_uncalibrated_data(start_date_time, end_date_time) -> pd.DataFrame:
null_cols=["pm2_5_calibrated_value"],
start_date_time=start_date_time,
end_date_time=end_date_time,
network=str(Tenant.AIRQO),
network=DeviceNetwork.AIRQO,
)

return DataValidationUtils.remove_outliers(hourly_uncalibrated_data)
Expand All @@ -79,7 +79,7 @@ def extract_data_from_bigquery(
table=table,
start_date_time=start_date_time,
end_date_time=end_date_time,
network=str(Tenant.AIRQO),
network=DeviceNetwork.AIRQO,
)

return DataValidationUtils.remove_outliers(raw_data)
Expand Down Expand Up @@ -392,7 +392,7 @@ def merge_aggregated_mobile_devices_data_and_weather_data(
@staticmethod
def restructure_airqo_mobile_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
data["timestamp"] = pd.to_datetime(data["timestamp"])
data["tenant"] = "airqo"
data["network"] = "airqo"
big_query_api = BigQueryApi()
cols = big_query_api.get_columns(
table=big_query_api.airqo_mobile_measurements_table
Expand All @@ -404,12 +404,13 @@ def extract_devices_data(
start_date_time: str,
end_date_time: str,
device_category: DeviceCategory,
device_network: DeviceNetwork = None,
resolution: Frequency = Frequency.RAW,
device_numbers: list = None,
remove_outliers: bool = True,
) -> pd.DataFrame:
"""
Extracts sensor measurements from AirQo devices recorded between specified date and time ranges.
Extracts sensor measurements from network devices recorded between specified date and time ranges.

Retrieves sensor data from Thingspeak API for devices belonging to the specified device category (BAM or low-cost sensors).
Optionally filters data by specific device numbers and removes outliers if requested.
Expand All @@ -425,7 +426,9 @@ def extract_devices_data(
airqo_api = AirQoApi()
data_source_api = DataSourcesApis()

devices = airqo_api.get_devices_by_network(device_category=device_category)
devices = airqo_api.get_devices_by_network(
device_network=device_network, device_category=device_category
)
if not devices:
logger.exception(
"Failed to fetch devices. Please check if devices are deployed"
Expand Down Expand Up @@ -571,7 +574,7 @@ def clean_bam_data(data: pd.DataFrame) -> pd.DataFrame:
subset=["timestamp", "device_number"], keep="first", inplace=True
)

data["tenant"] = str(Tenant.AIRQO)
data["network"] = DeviceNetwork.AIRQO
data.rename(columns=configuration.AIRQO_BAM_MAPPING, inplace=True)

big_query_api = BigQueryApi()
Expand Down Expand Up @@ -899,12 +902,12 @@ def merge_aggregated_weather_data(
@staticmethod
def extract_devices_deployment_logs() -> pd.DataFrame:
airqo_api = AirQoApi()
devices = airqo_api.get_devices(network=str(Tenant.AIRQO))
devices = airqo_api.get_devices(network=DeviceNetwork.AIRQO)
devices_history = pd.DataFrame()
for device in devices:
try:
maintenance_logs = airqo_api.get_maintenance_logs(
tenant="airqo",
network="airqo",
device=device.get("name", None),
activity_type="deployment",
)
Expand Down
26 changes: 26 additions & 0 deletions src/workflows/airqo_etl_utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,32 @@ def category_from_str(category: str):
return DeviceCategory.LOW_COST


class DeviceNetwork(Enum):
"""
METONE -> Us embassy
AIRQO -> Airqo
URBANBETTER -> Urban Better
IQAIR -> Iqair
"""

METONE = 1
AIRQO = 2
URBANBETTER = 3
IQAIR = 4

def __str__(self):
if self == self.METONE:
return "metone"
elif self == self.AIRQO:
return "airqo"
elif self == self.URBANBETTER:
return "airbeam"
elif self == self.IQAIR:
return "iqair"
else:
raise LookupError("Invalid network supplied")


class Frequency(Enum):
"""
RAW -> Raw current data returned from devices
Expand Down
2 changes: 2 additions & 0 deletions src/workflows/airqo_etl_utils/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ def process_data_for_api(data: pd.DataFrame) -> list:
cols = bigquery_api.get_columns(bigquery_api.hourly_measurements_table)
cols.append("battery")
data = DataValidationUtils.fill_missing_columns(data, cols=cols)

# TODO Use DataValidation.format_data_types() to convert cleanup multipe columns.
data["device_number"] = (
data["device_number"]
.fillna("")
Expand Down
12 changes: 3 additions & 9 deletions src/workflows/dags/airqo_bam_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from airflow.utils.dates import days_ago
import pandas as pd
from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.constants import DeviceCategory, DataType
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.bigquery_api import BigQueryApi
from datetime import timedelta
from airflow.exceptions import AirflowFailException
from airqo_etl_utils.constants import Frequency, DeviceNetwork, DeviceCategory, DataType


@dag(
Expand All @@ -22,8 +22,6 @@
start_date=days_ago(1),
)
def airqo_bam_historical_measurements():
from airqo_etl_utils.constants import Frequency

@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract_bam_data(**kwargs) -> pd.DataFrame:
start_date_time, end_date_time = DateUtils.get_dag_date_time_values(
Expand All @@ -33,6 +31,7 @@ def extract_bam_data(**kwargs) -> pd.DataFrame:
start_date_time=start_date_time,
end_date_time=end_date_time,
device_category=DeviceCategory.BAM,
device_network=DeviceNetwork.AIRQO,
resolution=Frequency.HISTORICAL,
)

Expand Down Expand Up @@ -80,27 +79,25 @@ def save_clean_bam_data(data: pd.DataFrame):
)
def airqo_bam_realtime_measurements():
import pandas as pd
from airqo_etl_utils.constants import Frequency

@task(provide_context=True, retries=3, retry_delay=timedelta(minutes=5))
def extract_bam_data(**kwargs):
from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.constants import DeviceCategory

start_date_time, end_date_time = DateUtils.get_query_date_time_values(**kwargs)

return AirQoDataUtils.extract_devices_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
device_category=DeviceCategory.BAM,
device_network=DeviceNetwork.AIRQO,
resolution=Frequency.RAW,
)

@task(retries=3, retry_delay=timedelta(minutes=5))
def save_unclean_data(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.constants import DataType
from airqo_etl_utils.airqo_utils import AirQoDataUtils

data = AirQoDataUtils.format_data_for_bigquery(
Expand All @@ -121,7 +118,6 @@ def clean_bam_data(data: pd.DataFrame):
@task(retries=3, retry_delay=timedelta(minutes=5))
def save_clean_bam_data(data: pd.DataFrame):
from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.constants import DataType
from airqo_etl_utils.airqo_utils import AirQoDataUtils

data = AirQoDataUtils.format_data_for_bigquery(
Expand All @@ -137,9 +133,7 @@ def save_clean_bam_data(data: pd.DataFrame):
def update_latest_data_topic(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.airqo_utils import AirQoDataUtils
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.constants import DeviceCategory
from airqo_etl_utils.data_validator import DataValidationUtils
from airqo_etl_utils.constants import Tenant
from datetime import datetime

now = datetime.now()
Expand Down
Loading
Loading