diff --git a/k8s/analytics/values-prod.yaml b/k8s/analytics/values-prod.yaml
index 74304224cb..478e8a4885 100644
--- a/k8s/analytics/values-prod.yaml
+++ b/k8s/analytics/values-prod.yaml
@@ -8,7 +8,7 @@ images:
   celeryWorker: eu.gcr.io/airqo-250220/airqo-analytics-celery-worker
   reportJob: eu.gcr.io/airqo-250220/airqo-analytics-report-job
   devicesSummaryJob: eu.gcr.io/airqo-250220/airqo-analytics-devices-summary-job
-  tag: prod-24272a02-1731937745
+  tag: prod-9ef437cc-1732110711
 api:
   name: airqo-analytics-api
   label: analytics-api
@@ -17,8 +17,8 @@ api:
   podAnnotations: {}
   resources:
     limits:
-      cpu: 100m
-      memory: 600Mi
+      cpu: 1000m
+      memory: 2000Mi
     requests:
       cpu: 10m
       memory: 250Mi
diff --git a/k8s/analytics/values-stage.yaml b/k8s/analytics/values-stage.yaml
index c4dca0a5e7..0a818364ea 100644
--- a/k8s/analytics/values-stage.yaml
+++ b/k8s/analytics/values-stage.yaml
@@ -8,7 +8,7 @@ images:
   celeryWorker: eu.gcr.io/airqo-250220/airqo-stage-analytics-celery-worker
   reportJob: eu.gcr.io/airqo-250220/airqo-stage-analytics-report-job
   devicesSummaryJob: eu.gcr.io/airqo-250220/airqo-stage-analytics-devices-summary-job
-  tag: stage-5ba65cfa-1731092285
+  tag: stage-ddf20bb7-1732102979
 api:
   name: airqo-stage-analytics-api
   label: sta-alytics-api
@@ -17,8 +17,8 @@ api:
   podAnnotations: {}
   resources:
     limits:
-      cpu: 100m
-      memory: 600Mi
+      cpu: 500m
+      memory: 1000Mi
     requests:
       cpu: 10m
       memory: 250Mi
diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml
index 7c5b7f13cf..f15c1e3ebf 100644
--- a/k8s/auth-service/values-prod.yaml
+++ b/k8s/auth-service/values-prod.yaml
@@ -6,7 +6,7 @@ app:
   replicaCount: 3
   image:
     repository: eu.gcr.io/airqo-250220/airqo-auth-api
-    tag: prod-24272a02-1731937745
+    tag: prod-9ef437cc-1732110711
   nameOverride: ''
   fullnameOverride: ''
   podAnnotations: {}
diff --git a/k8s/calibrate/values-prod.yaml b/k8s/calibrate/values-prod.yaml
index f2358e5f1f..61d33022a4 100644
--- a/k8s/calibrate/values-prod.yaml
+++ b/k8s/calibrate/values-prod.yaml
@@ -6,11 +6,11 @@ app:
   initContainer:
     image:
       repository: eu.gcr.io/airqo-250220/airqo-calibrate-pickle-file
-      tag: prod-24272a02-1731937745
+      tag: prod-8aa5ca9a-1732095343
   replicaCount: 3
   image:
     repository: eu.gcr.io/airqo-250220/airqo-calibrate-api
-    tag: prod-24272a02-1731937745
+    tag: prod-8aa5ca9a-1732095343
   nameOverride: ''
   fullnameOverride: ''
   podAnnotations: {}
diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml
index 3a08fe5f0a..85d58a560d 100644
--- a/k8s/device-registry/values-prod.yaml
+++ b/k8s/device-registry/values-prod.yaml
@@ -6,7 +6,7 @@ app:
   replicaCount: 3
   image:
     repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
-    tag: prod-24272a02-1731937745
+    tag: prod-9ef437cc-1732110711
   nameOverride: ''
   fullnameOverride: ''
   podAnnotations: {}
diff --git a/k8s/device-registry/values-stage.yaml b/k8s/device-registry/values-stage.yaml
index d1aaa22a88..dd2c04fd7d 100644
--- a/k8s/device-registry/values-stage.yaml
+++ b/k8s/device-registry/values-stage.yaml
@@ -6,7 +6,7 @@ app:
   replicaCount: 2
   image:
     repository: eu.gcr.io/airqo-250220/airqo-stage-device-registry-api
-    tag: stage-4292371f-1731851989
+    tag: stage-09945614-1732110667
   nameOverride: ''
   fullnameOverride: ''
   podAnnotations: {}
diff --git a/k8s/exceedance/values-prod-airqo.yaml b/k8s/exceedance/values-prod-airqo.yaml
index 1cd8bd092d..d7857a926d 100644
--- a/k8s/exceedance/values-prod-airqo.yaml
+++ b/k8s/exceedance/values-prod-airqo.yaml
@@ -4,6 +4,6 @@ app:
   configmap: env-exceedance-production
  image:
    repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
-    tag: prod-24272a02-1731937745
+    tag: prod-9ef437cc-1732110711
   nameOverride: ''
   fullnameOverride: ''
diff --git a/k8s/exceedance/values-prod-kcca.yaml b/k8s/exceedance/values-prod-kcca.yaml
index 57a49574db..5c224b6857 100644
--- a/k8s/exceedance/values-prod-kcca.yaml
+++ b/k8s/exceedance/values-prod-kcca.yaml
@@ -4,6 +4,6 @@ app:
   configmap: env-exceedance-production
   image:
     repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
-    tag: prod-24272a02-1731937745
+    tag: prod-9ef437cc-1732110711
   nameOverride: ''
   fullnameOverride: ''
diff --git a/k8s/locate/values-prod.yaml b/k8s/locate/values-prod.yaml
index d96723327a..2a64afcf72 100644
--- a/k8s/locate/values-prod.yaml
+++ b/k8s/locate/values-prod.yaml
@@ -6,7 +6,7 @@ app:
   replicaCount: 3
   image:
     repository: eu.gcr.io/airqo-250220/airqo-locate-api
-    tag: prod-24272a02-1731937745
+    tag: prod-9ef437cc-1732110711
   nameOverride: ''
   fullnameOverride: ''
   podAnnotations: {}
diff --git a/k8s/predict/values-prod.yaml b/k8s/predict/values-prod.yaml
index c6e89c3f43..eac19ba34e 100644
--- a/k8s/predict/values-prod.yaml
+++ b/k8s/predict/values-prod.yaml
@@ -7,7 +7,7 @@ images:
   predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
   trainJob: eu.gcr.io/airqo-250220/airqo-train-job
   predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
-  tag: prod-24272a02-1731937745
+  tag: prod-9ef437cc-1732110711
 api:
   name: airqo-prediction-api
   label: prediction-api
diff --git a/k8s/website/values-stage.yaml b/k8s/website/values-stage.yaml
index 0fed1fa0e1..45a3fa7f09 100644
--- a/k8s/website/values-stage.yaml
+++ b/k8s/website/values-stage.yaml
@@ -6,7 +6,7 @@ app:
   replicaCount: 2
   image:
     repository: eu.gcr.io/airqo-250220/airqo-stage-website-api
-    tag: stage-3fbb8b49-1731943981
+    tag: stage-18f9b58d-1731945154
   nameOverride: ''
   fullnameOverride: ''
   podAnnotations: {}
diff --git a/src/analytics/api/models/events.py b/src/analytics/api/models/events.py
index f527889cc8..4061402faa 100644
--- a/src/analytics/api/models/events.py
+++ b/src/analytics/api/models/events.py
@@ -15,6 +15,11 @@
 class EventsModel(BasePyMongoModel):
+    """
+    This class manages data retrieval and query construction for events data, integrating device, site,
+    and airqloud information for specified pollutants and weather fields.
+    """
+
     BIGQUERY_AIRQLOUDS_SITES = f"`{CONFIGURATIONS.BIGQUERY_AIRQLOUDS_SITES}`"
     BIGQUERY_AIRQLOUDS = f"`{CONFIGURATIONS.BIGQUERY_AIRQLOUDS}`"
     BIGQUERY_GRIDS = f"`{CONFIGURATIONS.BIGQUERY_GRIDS}`"
@@ -37,9 +42,353 @@ class EventsModel(BasePyMongoModel):
     DEVICES_SUMMARY_TABLE = CONFIGURATIONS.DEVICES_SUMMARY_TABLE
 
     def __init__(self, tenant):
+        """
+        Initializes the EventsModel with default settings and mappings for limit thresholds,
+        and specifies collections and BigQuery table references.
+
+        Args:
+            tenant (str): The tenant identifier for managing database collections.
+ """ self.limit_mapper = {"pm2_5": 500.5, "pm10": 604.5, "no2": 2049} + self.sites_table = self.BIGQUERY_SITES + self.airqlouds_sites_table = self.BIGQUERY_AIRQLOUDS_SITES + self.devices_table = self.BIGQUERY_DEVICES + self.airqlouds_table = self.BIGQUERY_AIRQLOUDS super().__init__(tenant, collection_name="events") + @property + def device_info_query(self): + """Generates a device information query including site_id, tenant, and approximate location details.""" + return ( + f"{self.devices_table}.site_id AS site_id, " + f"{self.devices_table}.tenant AS tenant " + ) + + @property + def device_info_query_airqloud(self): + """Generates a device information query specifically for airqlouds, excluding the site_id.""" + return f"{self.devices_table}.tenant AS tenant " + + @property + def site_info_query(self): + """Generates a site information query to retrieve site name and approximate location details.""" + return f"{self.sites_table}.name AS site_name " + + @property + def airqloud_info_query(self): + """Generates an Airqloud information query to retrieve the airqloud name.""" + return f"{self.airqlouds_table}.name AS airqloud_name" + + def add_device_join(self, data_query, filter_clause=""): + """ + Joins device information with a given data query based on device_name. + + Args: + data_query (str): The data query to join with device information. + filter_clause (str): Optional SQL filter clause. + + Returns: + str: Modified query with device join. + """ + return ( + f"SELECT {self.device_info_query}, data.* " + f"FROM {self.devices_table} " + f"RIGHT JOIN ({data_query}) data ON data.device_name = {self.devices_table}.device_id " + f"{filter_clause}" + ) + + def add_device_join_to_airqlouds(self, data_query, filter_clause=""): + """ + Joins device information with airqloud data based on site_id. + + Args: + data_query (str): The data query to join with airqloud device information. + filter_clause (str): Optional SQL filter clause. + + Returns: + str: Modified query with device-airqloud join. + """ + return ( + f"SELECT {self.device_info_query_airqloud}, data.* " + f"FROM {self.devices_table} " + f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.devices_table}.site_id " + f"{filter_clause}" + ) + + def add_site_join(self, data_query): + """ + Joins site information with the given data query based on site_id. + + Args: + data_query (str): The data query to join with site information. + + Returns: + str: Modified query with site join. + """ + return ( + f"SELECT {self.site_info_query}, data.* " + f"FROM {self.sites_table} " + f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.sites_table}.id " + ) + + def add_airqloud_join(self, data_query): + """ + Joins Airqloud information with the provided data query based on airqloud_id. + + Args: + data_query (str): The data query to join with Airqloud information. + + Returns: + str: Modified query with Airqloud join. + """ + return ( + f"SELECT {self.airqloud_info_query}, data.* " + f"FROM {self.airqlouds_table} " + f"RIGHT JOIN ({data_query}) data ON data.airqloud_id = {self.airqlouds_table}.id " + ) + + def get_time_grouping(self, frequency): + """ + Determines the appropriate time grouping fields based on the frequency. + + Args: + frequency (str): Frequency like 'raw', 'daily', 'hourly', 'weekly', etc. + + Returns: + str: The time grouping clause for the SQL query. 
+ """ + grouping_map = { + "weekly": "TIMESTAMP_TRUNC(timestamp, WEEK(MONDAY)) AS week", + "monthly": "TIMESTAMP_TRUNC(timestamp, MONTH) AS month", + "yearly": "EXTRACT(YEAR FROM timestamp) AS year", + } + + return grouping_map.get(frequency, "timestamp") + + def get_device_query( + self, + data_table, + filter_value, + pollutants_query, + bam_pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific devices. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of device IDs to filter by. + pollutants_query (str): The SQL query for standard pollutants. + bam_pollutants_query (str): The SQL query for BAM pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve device-specific data, + including BAM data if applicable. + """ + query = ( + f"{pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name " + f"FROM {data_table} " + f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + query += " GROUP BY ALL" + + query = self.add_site_join(query) + if frequency in ["hourly", "weekly", "monthly", "yearly"]: + bam_query = ( + f"{bam_pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name " + f"FROM {self.BIGQUERY_BAM_DATA} " + f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id " + f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + bam_query += " GROUP BY ALL" + bam_query = self.add_site_join(bam_query) + query = f"{query} UNION ALL {bam_query}" + + return query + + def get_site_query( + self, + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific sites. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of site IDs to filter by. + pollutants_query (str): The SQL query for pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve site-specific data. 
+ """ + query = ( + f"{pollutants_query}, {time_grouping}, {self.site_info_query}, {data_table}.device_id AS device_name " + f"FROM {data_table} " + f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + f"AND {self.sites_table}.id IN UNNEST(@filter_value) " + ) + if frequency in ["weekly", "monthly", "yearly"]: + query += " GROUP BY ALL" + return self.add_device_join(query) + + def get_airqloud_query( + self, + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ): + """ + Constructs a SQL query to retrieve data for specific AirQlouds. + + Args: + data_table (str): The name of the data table containing measurements. + filter_value (str): The list of AirQloud IDs to filter by. + pollutants_query (str): The SQL query for pollutants. + time_grouping (str): The time grouping clause based on frequency. + start_date (str): The start date for the query range. + end_date (str): The end date for the query range. + frequency (str): The frequency of the data (e.g., 'raw', 'daily', 'weekly'). + + Returns: + str: The SQL query string to retrieve AirQloud-specific data. + """ + meta_data_query = ( + f"SELECT {self.airqlouds_sites_table}.airqloud_id, " + f"{self.airqlouds_sites_table}.site_id AS site_id " + f"FROM {self.airqlouds_sites_table} " + f"WHERE {self.airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) " + ) + meta_data_query = self.add_airqloud_join(meta_data_query) + meta_data_query = self.add_site_join(meta_data_query) + meta_data_query = self.add_device_join_to_airqlouds(meta_data_query) + + query = ( + f"{pollutants_query}, {time_grouping}, {data_table}.device_id AS device_name, meta_data.* " + f"FROM {data_table} " + f"RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id " + f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' " + ) + order_by_clause = ( + f"ORDER BY {data_table}.timestamp" + if frequency not in ["weekly", "monthly", "yearly"] + else "GROUP BY ALL" + ) + + return query + order_by_clause + + def build_query( + self, + data_table, + filter_type, + filter_value, + pollutants_query, + bam_pollutants_query, + start_date, + end_date, + frequency=None, + ): + """ + Builds a SQL query to retrieve pollutant and weather data with associated device or site information. + + Args: + data_table (str): The table name containing the main data records. + filter_type (str): Type of filter (e.g., devices, sites, airqlouds). + filter_value (list): Filter values corresponding to the filter type. + pollutants_query (str): Query for pollutant data. + bam_pollutants_query (str): Query for BAM pollutant data. + start_date (str): Start date for data retrieval. + end_date (str): End date for data retrieval. + frequency (str): Optional frequency filter. + + Returns: + str: Final constructed SQL query. + """ + time_grouping = self.get_time_grouping(frequency) + + # TODO Find a better way to do this. 
+ if frequency in ["weekly", "monthly", "yearly"]: + # Drop datetime alias + pollutants_query = pollutants_query.replace( + f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime", + "", + ) + bam_pollutants_query = bam_pollutants_query.replace( + f", FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {self.BIGQUERY_BAM_DATA}.timestamp) AS datetime", + "", + ) + + if filter_type in ["devices", "device_ids", "device_names"]: + return self.get_device_query( + data_table, + filter_value, + pollutants_query, + bam_pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ) + elif filter_type in ["sites", "site_names", "site_ids"]: + return self.get_site_query( + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ) + elif filter_type == "airqlouds": + return self.get_airqloud_query( + data_table, + filter_value, + pollutants_query, + time_grouping, + start_date, + end_date, + frequency, + ) + else: + raise ValueError("Invalid filter type") + + def get_columns(cls, mapping, frequency, data_type, decimal_places, data_table): + if frequency in ["weekly", "monthly", "yearly"]: + return [ + f"ROUND(AVG({data_table}.{col}), {decimal_places}) AS {col}" + for col in mapping + ] + return [ + f"ROUND({data_table}.{col}, {decimal_places}) AS {col}" for col in mapping + ] + @classmethod @cache.memoize() def download_from_bigquery( @@ -50,24 +399,36 @@ def download_from_bigquery( end_date, frequency, pollutants, + data_type, weather_fields, ) -> pd.DataFrame: + """ + Retrieves data from BigQuery with specified filters, frequency, pollutants, and weather fields. + + Args: + filter_type (str): Type of filter to apply (e.g., 'devices', 'sites', 'airqlouds'). + filter_value (list): Filter values (IDs or names) for the selected filter type. + start_date (str): Start date for the data query. + end_date (str): End date for the data query. + frequency (str): Data frequency (e.g., 'raw', 'daily', 'hourly'). + pollutants (list): List of pollutants to include in the data. + data_type (str): Type of data ('raw' or 'aggregated'). + weather_fields (list): List of weather fields to retrieve. + + Returns: + pd.DataFrame: Retrieved data in DataFrame format, with duplicates removed and sorted by timestamp. + """ decimal_places = cls.DATA_EXPORT_DECIMAL_PLACES - # Data sources - sites_table = cls.BIGQUERY_SITES - airqlouds_sites_table = cls.BIGQUERY_AIRQLOUDS_SITES - devices_table = cls.BIGQUERY_DEVICES - airqlouds_table = cls.BIGQUERY_AIRQLOUDS - - sorting_cols = ["site_id", "datetime", "device_name"] + sorting_cols = ["site_id", "device_name"] - # Define table mapping for dynamic selection based on frequency data_table = { "raw": cls.BIGQUERY_RAW_DATA, "daily": cls.BIGQUERY_DAILY_DATA, "hourly": cls.BIGQUERY_HOURLY_DATA, - }.get(frequency) + }.get( + frequency, cls.BIGQUERY_HOURLY_DATA + ) # Return hourly if the frequency is weekly, monthly yearly. 
 
         if not data_table:
             raise ValueError("Invalid frequency")
 
@@ -76,180 +437,74 @@ def download_from_bigquery(
         bam_pollutant_columns = []
         weather_columns = []
         for pollutant in pollutants:
-            pollutant_mapping = BIGQUERY_FREQUENCY_MAPPER.get(frequency).get(
-                pollutant, []
+
+            if pollutant == "raw":
+                key = pollutant
+            else:
+                key = f"{pollutant}_{data_type}"
+
+            pollutant_mapping = BIGQUERY_FREQUENCY_MAPPER.get(frequency, {}).get(
+                key, []
             )
             pollutant_columns.extend(
-                [
-                    f"ROUND({data_table}.{mapping}, {decimal_places}) AS {mapping}"
-                    for mapping in pollutant_mapping
-                ]
-            )
-
-            if pollutant == "pm2_5":
-                bam_pollutant_columns.extend(
-                    ["pm2_5 as pm2_5_raw_value", "pm2_5 as pm2_5_calibrated_value"]
-                )
-            elif pollutant == "pm10":
-                bam_pollutant_columns.extend(
-                    ["pm10 as pm10_raw_value", "pm10 as pm10_calibrated_value"]
-                )
-            elif pollutant == "no2":
-                bam_pollutant_columns.extend(
-                    ["no2 as no2_raw_value", "no2 as no2_calibrated_value"]
+                cls.get_columns(
+                    cls,
+                    pollutant_mapping,
+                    frequency,
+                    data_type,
+                    decimal_places,
+                    data_table,
                 )
+            )
 
-        if weather_fields is not None:
+            # TODO Clean up by using the `get_columns` helper method
+            if pollutant in {"pm2_5", "pm10", "no2"}:
+                if frequency in ["weekly", "monthly", "yearly"]:
+                    bam_pollutant_columns.extend(
+                        [f"ROUND(AVG({pollutant}), {decimal_places}) AS {key}_value"]
+                    )
+                else:
+                    bam_pollutant_columns.extend(
+                        [f"ROUND({pollutant}, {decimal_places}) AS {key}_value"]
+                    )
+
+        # TODO Fix query when weather data is included. Currently failing
+        if weather_fields:
             for field in weather_fields:
-                weather_mapping = WEATHER_FIELDS_MAPPER.get(field, None)
-                weather_columns.extend(
-                    [
-                        f"ROUND({data_table}.{weather_mapping}, {decimal_places}) AS {weather_mapping}"
-                    ]
-                )
+                weather_mapping = WEATHER_FIELDS_MAPPER.get(field)
+                if weather_mapping:
+                    weather_columns.extend(
+                        cls.get_columns(
+                            cls,
+                            weather_mapping,
+                            frequency,
+                            data_type,
+                            decimal_places,
+                            data_table,
+                        )
+                    )
+
+        selected_columns = set(pollutant_columns + weather_columns)
         pollutants_query = (
-            f" SELECT {', '.join(map(str, set(pollutant_columns + weather_columns)))} ,"
-            f" FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime "
+            f"SELECT {', '.join(selected_columns)}, "
+            f"FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {data_table}.timestamp) AS datetime "
         )
+
+        bam_selected_columns = set(bam_pollutant_columns)
         bam_pollutants_query = (
-            f" SELECT {', '.join(map(str, set(bam_pollutant_columns)))} ,"
-            f" FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {cls.BIGQUERY_BAM_DATA}.timestamp) AS datetime "
+            f"SELECT {', '.join(bam_selected_columns)}, "
+            f"FORMAT_DATETIME('%Y-%m-%d %H:%M:%S', {cls.BIGQUERY_BAM_DATA}.timestamp) AS datetime "
+        )
+        instance = cls("build_query")
+        query = instance.build_query(
+            data_table,
+            filter_type,
+            filter_value,
+            pollutants_query,
+            bam_pollutants_query,
+            start_date,
+            end_date,
+            frequency=frequency,
         )
-
-        if filter_type == "devices":
-            # Adding device information, start and end times
-            query = (
-                f" {pollutants_query} , "
-                f" {devices_table}.device_id AS device_name , "
-                f" {devices_table}.site_id AS site_id , "
-                f" {devices_table}.tenant AS tenant , "
-                f" {devices_table}.approximate_latitude AS device_latitude , "
-                f" {devices_table}.approximate_longitude AS device_longitude , "
-                f" FROM {data_table} "
-                f" JOIN {devices_table} ON {devices_table}.device_id = {data_table}.device_id "
-                f" WHERE {data_table}.timestamp >= '{start_date}' "
-                f" AND {data_table}.timestamp <= '{end_date}' "
'{end_date}' " - f" AND {devices_table}.device_id IN UNNEST(@filter_value) " - ) - - bam_query = ( - f" {bam_pollutants_query} , " - f" {devices_table}.device_id AS device_name , " - f" {devices_table}.site_id AS site_id , " - f" {devices_table}.tenant AS tenant , " - f" {devices_table}.approximate_latitude AS device_latitude , " - f" {devices_table}.approximate_longitude AS device_longitude , " - f" FROM {cls.BIGQUERY_BAM_DATA} " - f" JOIN {devices_table} ON {devices_table}.device_id = {cls.BIGQUERY_BAM_DATA}.device_id " - f" WHERE {cls.BIGQUERY_BAM_DATA}.timestamp >= '{start_date}' " - f" AND {cls.BIGQUERY_BAM_DATA}.timestamp <= '{end_date}' " - f" AND {devices_table}.device_id IN UNNEST(@filter_value) " - ) - - # Adding site information - query = ( - f" SELECT " - f" {sites_table}.name AS site_name , " - f" {sites_table}.approximate_latitude AS site_latitude , " - f" {sites_table}.approximate_longitude AS site_longitude , " - f" data.* " - f" FROM {sites_table} " - f" RIGHT JOIN ({query}) data ON data.site_id = {sites_table}.id " - ) - - bam_query = ( - f" SELECT " - f" {sites_table}.name AS site_name , " - f" {sites_table}.approximate_latitude AS site_latitude , " - f" {sites_table}.approximate_longitude AS site_longitude , " - f" data.* " - f" FROM {sites_table} " - f" RIGHT JOIN ({bam_query}) data ON data.site_id = {sites_table}.id " - ) - - if frequency == "hourly": - query = f"{query} UNION ALL {bam_query}" - - elif filter_type == "sites": - # Adding site information, start and end times - query = ( - f" {pollutants_query} , " - f" {sites_table}.tenant AS tenant , " - f" {sites_table}.id AS site_id , " - f" {sites_table}.name AS site_name , " - f" {sites_table}.approximate_latitude AS site_latitude , " - f" {sites_table}.approximate_longitude AS site_longitude , " - f" {data_table}.device_id AS device_name , " - f" FROM {data_table} " - f" JOIN {sites_table} ON {sites_table}.id = {data_table}.site_id " - f" WHERE {data_table}.timestamp >= '{start_date}' " - f" AND {data_table}.timestamp <= '{end_date}' " - f" AND {sites_table}.id IN UNNEST(@filter_value) " - ) - - # Adding device information - query = ( - f" SELECT " - f" {devices_table}.approximate_latitude AS device_latitude , " - f" {devices_table}.approximate_longitude AS device_longitude , " - # f" {devices_table}.device_id AS device_name , " #this column creates a duplicate column - f" data.* " - f" FROM {devices_table} " - f" RIGHT JOIN ({query}) data ON data.device_name = {devices_table}.device_id " - ) - elif filter_type == "airqlouds": - sorting_cols = ["airqloud_id", "site_id", "datetime", "device_name"] - - meta_data_query = ( - f" SELECT {airqlouds_sites_table}.tenant , " - f" {airqlouds_sites_table}.airqloud_id , " - f" {airqlouds_sites_table}.site_id , " - f" FROM {airqlouds_sites_table} " - f" WHERE {airqlouds_sites_table}.airqloud_id IN UNNEST(@filter_value) " - ) - - # Adding airqloud information - meta_data_query = ( - f" SELECT " - f" {airqlouds_table}.name AS airqloud_name , " - f" meta_data.* " - f" FROM {airqlouds_table} " - f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.airqloud_id = {airqlouds_table}.id " - ) - - # Adding site information - meta_data_query = ( - f" SELECT " - f" {sites_table}.approximate_latitude AS site_latitude , " - f" {sites_table}.approximate_longitude AS site_longitude , " - f" {sites_table}.name AS site_name , " - f" meta_data.* " - f" FROM {sites_table} " - f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {sites_table}.id " - ) - - # Adding device 
-            meta_data_query = (
-                f" SELECT "
-                f" {devices_table}.approximate_latitude AS device_latitude , "
-                f" {devices_table}.approximate_longitude AS device_longitude , "
-                f" {devices_table}.device_id AS device_name , "
-                f" meta_data.* "
-                f" FROM {devices_table} "
-                f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {devices_table}.site_id "
-            )
-
-            # Adding start and end times
-            query = (
-                f" {pollutants_query} , "
-                f" meta_data.* "
-                f" FROM {data_table} "
-                f" RIGHT JOIN ({meta_data_query}) meta_data ON meta_data.site_id = {data_table}.site_id "
-                f" WHERE {data_table}.timestamp >= '{start_date}' "
-                f" AND {data_table}.timestamp <= '{end_date}' "
-                f" ORDER BY {data_table}.timestamp "
-            )
-
         job_config = bigquery.QueryJobConfig(
             query_parameters=[
                 bigquery.ArrayQueryParameter("filter_value", "STRING", filter_value),
@@ -269,9 +524,15 @@ def download_from_bigquery(
         if len(dataframe) == 0:
             return dataframe
 
-        dataframe.drop_duplicates(
-            subset=["datetime", "device_name"], inplace=True, keep="first"
-        )
+        drop_columns = ["device_name"]
+        if frequency in ["weekly", "monthly", "yearly"]:
+            drop_columns.append(frequency[:-2])
+            sorting_cols.append(frequency[:-2])
+        else:
+            drop_columns.append("datetime")
+            sorting_cols.append("datetime")
+
+        dataframe.drop_duplicates(subset=drop_columns, inplace=True, keep="first")
        dataframe.sort_values(sorting_cols, ascending=True, inplace=True)
        dataframe["frequency"] = frequency
        dataframe = dataframe.replace(np.nan, None)
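
The events.py refactor above replaces the old inline SQL branches with small, composable builders: build_query dispatches on filter_type to a device, site, or airqloud builder, and get_time_grouping swaps the datetime column for a week/month/year bucket so coarser frequencies can be aggregated with GROUP BY ALL. Below is a minimal, self-contained sketch of that dispatch pattern; the table names and query fragments are placeholders, not the real BigQuery schema.

# Illustrative sketch of the EventsModel.build_query dispatch, with placeholder SQL.
GROUPING_MAP = {
    "weekly": "TIMESTAMP_TRUNC(timestamp, WEEK(MONDAY)) AS week",
    "monthly": "TIMESTAMP_TRUNC(timestamp, MONTH) AS month",
    "yearly": "EXTRACT(YEAR FROM timestamp) AS year",
}

def get_time_grouping(frequency: str) -> str:
    # raw/hourly/daily keep the plain timestamp; coarser frequencies are bucketed.
    return GROUPING_MAP.get(frequency, "timestamp")

def build_query(filter_type: str, frequency: str) -> str:
    grouping = get_time_grouping(frequency)
    builders = {
        "devices": f"SELECT {grouping}, ... FROM hourly_data JOIN devices_table ...",
        "sites": f"SELECT {grouping}, ... FROM hourly_data JOIN sites_table ...",
        "airqlouds": f"SELECT {grouping}, ... FROM hourly_data JOIN airqloud_meta ...",
    }
    try:
        return builders[filter_type]
    except KeyError:
        # Mirrors the ValueError raised by the real build_query.
        raise ValueError("Invalid filter type") from None

print(build_query("devices", "weekly"))
# SELECT TIMESTAMP_TRUNC(timestamp, WEEK(MONDAY)) AS week, ... FROM hourly_data JOIN devices_table ...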
diff --git a/src/analytics/api/utils/data_formatters.py b/src/analytics/api/utils/data_formatters.py
index e609953e5e..bace9559de 100644
--- a/src/analytics/api/utils/data_formatters.py
+++ b/src/analytics/api/utils/data_formatters.py
@@ -1,6 +1,6 @@
 from enum import Enum
 from typing import Any, List, Union, Dict
-import logging
+from werkzeug.exceptions import BadRequest
 
 import pandas as pd
 import requests
@@ -15,7 +15,8 @@
     AQCSV_DATA_STATUS_MAPPER,
 )
 from api.utils.http import AirQoRequests
-from config import Config
+
+import logging
 
 logger = logging.getLogger(__name__)
 
@@ -294,7 +295,7 @@ def device_category_to_str(device_category: str) -> str:
     return ""
 
 
-def filter_non_private_sites(sites: List[str]) -> Dict[str, Any]:
+def filter_non_private_sites(filter_type: str, sites: List[str]) -> Dict[str, Any]:
     """
     Filters out private site IDs from a provided array of site IDs.
 
@@ -313,17 +314,21 @@ def filter_non_private_sites(sites: List[str]) -> Dict[str, Any]:
     try:
         airqo_requests = AirQoRequests()
         response = airqo_requests.request(
-            endpoint=endpoint, body={"sites": sites}, method="post"
+            endpoint=endpoint, body={filter_type: sites}, method="post"
         )
-        if response and response.get("status", None) == "success":
-            return response.get("data")
+        if response and response.get("status") == "success":
+            return airqo_requests.create_response(
+                message="Successfully returned data.",
+                data=response.get("data"),
+                success=True,
+            )
         else:
-            raise RuntimeError(response.get("message"))
-    except RuntimeError as rex:
-        raise RuntimeError(f"Error while filtering non private sites {rex}")
+            return airqo_requests.create_response(response, success=False)
+    except Exception as rex:
+        logger.exception(f"Error while filtering non private sites {rex}")
 
 
-def filter_non_private_devices(devices: List[str]) -> Dict[str, Any]:
+def filter_non_private_devices(filter_type: str, devices: List[str]) -> Dict[str, Any]:
     """
     Filters out private device IDs from a provided array of device IDs.
@@ -341,11 +346,15 @@ def filter_non_private_devices(devices: List[str]) -> Dict[str, Any]:
     try:
         airqo_requests = AirQoRequests()
         response = airqo_requests.request(
-            endpoint=endpoint, body={"devices": devices}, method="post"
+            endpoint=endpoint, body={filter_type: devices}, method="post"
         )
-        if response and response.get("status", None) == "success":
-            return response.get("data")
+        if response and response.get("status") == "success":
+            return airqo_requests.create_response(
+                message="Successfully returned data.",
+                data=response.get("data"),
+                success=True,
+            )
         else:
-            raise RuntimeError(response.get("message"))
-    except RuntimeError as rex:
-        raise RuntimeError(f"Error while filtering non private devices {rex}")
+            return airqo_requests.create_response(response, success=False)
+    except Exception as rex:
+        logger.exception(f"Error while filtering non private devices {rex}")
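
Note that filter_non_private_sites and filter_non_private_devices no longer raise RuntimeError; they now return the same envelope that AirQoRequests.create_response produces, so callers branch on its status field instead of catching exceptions. A hedged sketch of how a caller can unpack that envelope follows; the key names mirror the pattern visible in this diff and should be treated as assumptions.

from typing import Any, Dict, List, Optional

def extract_filtered_ids(envelope: Optional[Dict[str, Any]], key: str) -> List[str]:
    """Pull the filtered ID list out of a create_response-style envelope.

    Assumes the success shape {"status": "success", "data": {key: [...]}},
    which is how dashboard.py and data.py unpack the helpers in this diff.
    """
    if envelope and envelope.get("status") == "success":
        return envelope.get("data", {}).get(key, [])
    # On failure the envelope carries a message instead of data.
    raise ValueError((envelope or {}).get("message", "Validation failed"))

# Example with a stubbed success envelope:
ok = {"status": "success", "data": {"sites": ["site-1", "site-2"]}}
assert extract_filtered_ids(ok, "sites") == ["site-1", "site-2"]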
diff --git a/src/analytics/api/utils/http.py b/src/analytics/api/utils/http.py
index 4dde800778..d858ad74c8 100644
--- a/src/analytics/api/utils/http.py
+++ b/src/analytics/api/utils/http.py
@@ -163,7 +163,7 @@ def request(self, endpoint, params=None, body=None, method="get", base_url=None):
                 success=True,
             )
         else:
-            return self.create_response(f"Error: {response.status}", success=False)
+            return self.create_response(f"Error: {response.data}", success=False)
 
     except urllib3.exceptions.HTTPError as ex:
         logger.exception(f"HTTPError: {ex}")
diff --git a/src/analytics/api/utils/pollutants/pm_25.py b/src/analytics/api/utils/pollutants/pm_25.py
index e585d5abd7..6e3f2a986b 100644
--- a/src/analytics/api/utils/pollutants/pm_25.py
+++ b/src/analytics/api/utils/pollutants/pm_25.py
@@ -54,22 +54,26 @@
     "no2": ["no2_calibrated_value", "no2_raw_value"],
 }
 
+COMMON_POLLUTANT_MAPPING = {
+    "pm2_5_calibrated": ["pm2_5_calibrated_value"],
+    "pm2_5_raw": ["pm2_5_raw_value"],
+    "pm10_calibrated": ["pm10_calibrated_value"],
+    "pm10_raw": ["pm10_raw_value"],
+    "no2_calibrated": ["no2_calibrated_value"],
+    "no2_raw": ["no2_raw_value"],
+}
+
 BIGQUERY_FREQUENCY_MAPPER = {
     "raw": {
         "pm2_5": ["pm2_5", "s1_pm2_5", "s2_pm2_5"],
         "pm10": ["pm10", "s1_pm10", "s2_pm10"],
         "no2": ["no2"],
     },
-    "daily": {
-        "pm2_5": ["pm2_5_calibrated_value", "pm2_5_raw_value"],
-        "pm10": ["pm10_calibrated_value", "pm10_raw_value"],
-        "no2": ["no2_calibrated_value", "no2_raw_value"],
-    },
-    "hourly": {
-        "pm2_5": ["pm2_5_calibrated_value", "pm2_5_raw_value"],
-        "pm10": ["pm10_calibrated_value", "pm10_raw_value"],
-        "no2": ["no2_calibrated_value", "no2_raw_value"],
-    },
+    "daily": COMMON_POLLUTANT_MAPPING,
+    "hourly": COMMON_POLLUTANT_MAPPING,
+    "weekly": COMMON_POLLUTANT_MAPPING,
+    "monthly": COMMON_POLLUTANT_MAPPING,
+    "yearly": COMMON_POLLUTANT_MAPPING,
 }
 
 PM_COLOR_CATEGORY = {
@@ -113,9 +117,9 @@
 }
 
 WEATHER_FIELDS_MAPPER = {
-    "temperature": "device_temperature",
-    "humidity": "device_humidity",
-    "wind_speed": "wind_speed",
+    "temperature": ["device_temperature"],
+    "humidity": ["device_humidity"],
+    "wind_speed": ["wind_speed"],
 }
diff --git a/src/analytics/api/views/dashboard.py b/src/analytics/api/views/dashboard.py
index 5daec57382..62d31bc01e 100644
--- a/src/analytics/api/views/dashboard.py
+++ b/src/analytics/api/views/dashboard.py
@@ -1,6 +1,7 @@
 # Third-party libraries
 import math
+from typing import List
 
 from flasgger import swag_from
 from flask import request
@@ -27,6 +28,10 @@
 from api.utils.request_validators import validate_request_json
 from main import rest_api_v2
 
+import logging
+
+logger = logging.getLogger(__name__)
+
 
 @rest_api_v2.route("/dashboard/chart/data")
 class ChartDataResource(Resource):
@@ -145,13 +150,79 @@ class D3ChartDataResource(Resource):
         "pollutant|required:str",
         "chartType|required:str",
     )
+    def _get_validated_filter(self, json_data):
+        """
+        Validates that exactly one of 'airqlouds', 'sites', or 'devices' is provided in the request,
+        and applies filtering if necessary.
+
+        Args:
+            json_data (dict): JSON payload from the request.
+
+        Returns:
+            tuple: The name of the filter ("sites", "devices", or "airqlouds") and its validated value if valid.
+
+        Raises:
+            ValueError: If more than one or none of the filters are provided.
+        """
+        error_message: str = ""
+        validated_data: List[str] = None
+
+        # TODO Liaise with device registry to clean up this makeshift implementation
+        devices = ["devices", "device_ids", "device_names"]
+        sites = ["sites", "site_names", "site_ids"]
+
+        valid_filters = [
+            "sites",
+            "site_names",
+            "site_ids",
+            "devices",
+            "device_ids",
+            "airqlouds",
+            "device_names",
+        ]
+        provided_filters = [key for key in valid_filters if json_data.get(key)]
+        if len(provided_filters) != 1:
+            raise ValueError(
+                "Specify exactly one of 'airqlouds', 'sites', 'device_names', or 'devices' in the request body."
+            )
+        filter_type = provided_filters[0]
+        filter_value = json_data.get(filter_type)
+
+        if filter_type in sites:
+            validated_value = filter_non_private_sites(filter_type, filter_value)
+        elif filter_type in devices:
+            validated_value = filter_non_private_devices(filter_type, filter_value)
+        else:
+            return filter_type, filter_value, None
+
+        if validated_value and validated_value.get("status") == "success":
+            # TODO This should be cleaned up.
+            validated_data = validated_value.get("data", {}).get(
+                "sites" if filter_type in sites else "devices", []
+            )
+        else:
+            error_message = validated_value.get("message", "Validation failed")
+
+        return filter_type, validated_data, error_message
+
     def post(self):
         tenant = request.args.get("tenant", "airqo")
         json_data = request.get_json()
-        sites = filter_non_private_sites(sites=json_data.get("sites", {})).get(
+
+        try:
+            filter_type, filter_value, error_message = self._get_validated_filter(
+                json_data
+            )
+            if error_message:
+                return error_message, AirQoRequests.Status.HTTP_400_BAD_REQUEST
+        except Exception as e:
+            logger.exception(f"An error has occurred; {e}")
+
+        sites = filter_non_private_sites("sites", json_data.get("sites", {})).get(
             "sites", []
         )
+
         start_date = json_data["startDate"]
         end_date = json_data["endDate"]
         frequency = json_data["frequency"]
@@ -161,7 +232,7 @@ def post(self):
         events_model = EventsModel(tenant)
         # data = events_model.get_d3_chart_events(sites, start_date, end_date, pollutant, frequency)
         data = events_model.get_d3_chart_events_v2(
-            sites, start_date, end_date, pollutant, frequency, tenant
+            filter_value, start_date, end_date, pollutant, frequency, tenant
         )
 
         if chart_type.lower() == "pie":
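
With the expanded filter keys (site_ids, site_names, device_ids, device_names) and the new weekly/monthly/yearly frequencies, a request to the data-export endpoint might look like the following. This is an illustrative payload only: the host, route, and any field names not visible in this diff are assumptions.

import requests

# Hypothetical host; the field names below come from the validators in data.py.
url = "https://api.example.com/api/v2/analytics/data-download"

payload = {
    "startDateTime": "2024-01-01T00:00:00Z",
    "endDateTime": "2024-03-31T23:59:59Z",
    "device_names": ["aq_g5_01"],  # exactly one filter key may be present
    "frequency": "weekly",         # newly accepted alongside raw/hourly/daily
    "datatype": "calibrated",
    "pollutants": ["pm2_5"],
}

resp = requests.post(url, json=payload, timeout=60)
print(resp.status_code)
print(resp.json() if resp.ok else resp.text)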
"outputFormat|optional:str", "pollutants|optional:list", "sites|optional:list", + "site_ids|optional:list", + "site_names|optional:list", + "device_ids|optional:list", "devices|optional:list", + "device_names|optional:list", "airqlouds|optional:list", "datatype|optional:str", "minimum|optional:bool", @@ -82,21 +87,21 @@ def post(self): "download_types": ["csv", "json"], "data_types": ["calibrated", "raw"], "output_formats": ["airqo-standard", "aqcsv"], - "frequencies": ["hourly", "daily", "raw"], + "frequencies": ["hourly", "daily", "raw", "weekly", "monthly", "yearly"], } json_data = request.get_json() start_date = json_data["startDateTime"] end_date = json_data["endDateTime"] - try: - filter_type, filter_value = self._get_validated_filter(json_data) - except ValueError as e: - return ( - AirQoRequests.create_response(f"An error occured: {e}", success=False), - AirQoRequests.Status.HTTP_400_BAD_REQUEST, + filter_type, filter_value, error_message = self._get_validated_filter( + json_data ) + if error_message: + return error_message, AirQoRequests.Status.HTTP_400_BAD_REQUEST + except Exception as e: + logger.exception(f"An error has occured; {e}") try: frequency = self._get_valid_option( @@ -144,6 +149,7 @@ def post(self): end_date=end_date, frequency=frequency, pollutants=pollutants, + data_type=data_type, weather_fields=weather_fields, ) @@ -153,13 +159,10 @@ def post(self): AirQoRequests.Status.HTTP_404_NOT_FOUND, ) if minimum_output: + # Drop unnecessary columns data_frame.drop( columns=[ - "device_latitude", - "device_longitude", "site_id", - "site_latitude", - "site_longitude", ], inplace=True, ) @@ -183,11 +186,10 @@ def post(self): records, "csv", file_name=f"{frequency}-air-quality{postfix}data" ) except Exception as ex: - print(ex) - traceback.print_exc() + logger.exception(f"An error occurred: {ex}") return ( AirQoRequests.create_response( - f"An Error occurred while processing your request. Please contact support", + f"An Error occurred while processing your request. Please contact support. {ex}", success=False, ), AirQoRequests.Status.HTTP_500_INTERNAL_SERVER_ERROR, @@ -195,8 +197,8 @@ def post(self): def _get_validated_filter(self, json_data): """ - Ensures that only one of 'airqlouds', 'sites', or 'devices' is provided in the request. - Calls filter_non_private_* only after confirming exclusivity. + Validates that exactly one of 'airqlouds', 'sites', or 'devices' is provided in the request, + and applies filtering if necessary. Args: json_data (dict): JSON payload from the request. @@ -207,31 +209,46 @@ def _get_validated_filter(self, json_data): Raises: ValueError: If more than one or none of the filters are provided. """ - provided_filters = [ - key for key in ["sites", "devices", "airqlouds"] if json_data.get(key) + error_message: str = "" + validated_data: List[str] = None + + # TODO Lias with device registry to cleanup this makeshift implementation + devices = ["devices", "device_ids", "device_names"] + sites = ["sites", "site_names", "site_ids"] + + valid_filters = [ + "sites", + "site_names", + "site_ids", + "devices", + "device_ids", + "airqlouds", + "device_names", ] - + provided_filters = [key for key in valid_filters if json_data.get(key)] if len(provided_filters) != 1: raise ValueError( - "Specify exactly one of 'airqlouds', 'sites', or 'devices' in the request body." + "Specify exactly one of 'airqlouds', 'sites', 'device_names', or 'devices' in the request body." 
             )
-
         filter_type = provided_filters[0]
         filter_value = json_data.get(filter_type)
 
-        if filter_type == "sites":
-            validated_value = filter_non_private_sites(sites=filter_value).get(
-                "sites", []
-            )
-        elif filter_type == "devices":
-            validated_value = filter_non_private_devices(devices=filter_value).get(
-                "devices", []
+        if filter_type in sites:
+            validated_value = filter_non_private_sites(filter_type, filter_value)
+        elif filter_type in devices:
+            validated_value = filter_non_private_devices(filter_type, filter_value)
+        else:
+            return filter_type, filter_value, None
+
+        if validated_value and validated_value.get("status") == "success":
+            # TODO This should be cleaned up.
+            validated_data = validated_value.get("data", {}).get(
+                "sites" if filter_type in sites else "devices", []
             )
         else:
-            # No additional processing is needed for 'airqlouds'
-            validated_value = filter_value
+            error_message = validated_value.get("message", "Validation failed")
 
-        return filter_type, validated_value
+        return filter_type, validated_data, error_message
 
     def _get_valid_option(self, option, valid_options, option_name):
         """
diff --git a/src/website/README.md b/src/website/README.md
index a1ce487c73..d50f5ebfdb 100644
--- a/src/website/README.md
+++ b/src/website/README.md
@@ -1 +1 @@
-# New Website Backend .
\ No newline at end of file
+# New Website Backend