From 0530d494ec39ffa2900cc73eed15adb273e115b2 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Mon, 29 Jul 2024 22:28:00 -0700 Subject: [PATCH 1/4] feature: cache responses --- xpublish_edr/__init__.py | 2 +- xpublish_edr/formats/to_covjson.py | 9 +- xpublish_edr/plugin.py | 149 +++++++++++++++++------------ xpublish_edr/query.py | 12 +++ 4 files changed, 108 insertions(+), 64 deletions(-) diff --git a/xpublish_edr/__init__.py b/xpublish_edr/__init__.py index 1711770..75668a9 100644 --- a/xpublish_edr/__init__.py +++ b/xpublish_edr/__init__.py @@ -1,5 +1,5 @@ """ -xpublish_edr is not a real package, just a set of best practices examples. +Xpublish routers for the OGC EDR API. """ from xpublish_edr.plugin import CfEdrPlugin diff --git a/xpublish_edr/formats/to_covjson.py b/xpublish_edr/formats/to_covjson.py index 3a5c4d2..0437672 100644 --- a/xpublish_edr/formats/to_covjson.py +++ b/xpublish_edr/formats/to_covjson.py @@ -16,6 +16,11 @@ import numpy as np import xarray as xr +from fastapi.responses import JSONResponse + + +class CovJSONResponse(JSONResponse): + media_type = "application/vnd.cov+json" class Domain(TypedDict): @@ -74,7 +79,7 @@ def invert_cf_dims(ds): return inverted -def to_cf_covjson(ds: xr.Dataset) -> CovJSON: +def to_cf_covjson(ds: xr.Dataset) -> JSONResponse: """Transform an xarray dataset to CoverageJSON using CF conventions""" covjson: CovJSON = { @@ -164,4 +169,4 @@ def to_cf_covjson(ds: xr.Dataset) -> CovJSON: covjson["ranges"][var] = cov_range - return covjson + return CovJSONResponse(content=covjson) diff --git a/xpublish_edr/plugin.py b/xpublish_edr/plugin.py index b2536ac..8538893 100644 --- a/xpublish_edr/plugin.py +++ b/xpublish_edr/plugin.py @@ -2,12 +2,15 @@ OGC EDR router for datasets with CF convention metadata """ import logging +from functools import cache from typing import List, Optional +import cachey import pkg_resources import xarray as xr -from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi import APIRouter, Depends, HTTPException, Request, Response from xpublish import Dependencies, Plugin, hookimpl +from xpublish.utils.cache import CostTimer from .formats.to_covjson import to_cf_covjson from .query import EDRQuery, edr_query, edr_query_params @@ -15,6 +18,12 @@ logger = logging.getLogger("cf_edr") +def cache_key_from_request(request: Request, query: EDRQuery, dataset: xr.Dataset): + """Generate a cache key from the request and query parameters""" + return (request, query, str(dataset)) + + +@cache def position_formats(): """ Return response format functions from registered @@ -70,85 +79,103 @@ def get_position( request: Request, query: EDRQuery = Depends(edr_query), dataset: xr.Dataset = Depends(deps.dataset), + cache: cachey.Cache = Depends(deps.cache), ): """ Returns position data based on WKT `Point(lon lat)` coordinates Extra selecting/slicing parameters can be provided as extra query parameters """ - try: - ds = dataset.cf.sel(X=query.point.x, Y=query.point.y, method="nearest") - except KeyError: - raise HTTPException( - status_code=404, - detail="Dataset does not have CF Convention compliant metadata", - ) + cache_key = cache_key_from_request(request, query, dataset) + response: Optional[Response] = cache.get(cache_key) - if query.z: - ds = dataset.cf.sel(Z=query.z, method="nearest") - - if query.datetime: - datetimes = query.datetime.split("/") + if response is not None: + logger.debug(f"Cache hit for {cache_key}") + return response + with CostTimer() as ct: try: - if len(datetimes) == 1: - ds = ds.cf.sel(T=datetimes[0], method="nearest") - elif len(datetimes) == 2: - ds = ds.cf.sel(T=slice(datetimes[0], datetimes[1])) - else: - raise HTTPException( - status_code=404, - detail="Invalid datetimes submitted", - ) - except ValueError as e: - logger.error("Error with datetime", exc_info=True) - raise HTTPException( - status_code=404, - detail=f"Invalid datetime ({e})", - ) from e - - if query.parameters: - try: - ds = ds.cf[query.parameters.split(",")] - except KeyError as e: + ds = dataset.cf.sel( + X=query.point.x, + Y=query.point.y, + method="nearest", + ) + except KeyError: raise HTTPException( status_code=404, - detail=f"Invalid variable: {e}", + detail="Dataset does not have CF Convention compliant metadata", ) - logger.debug(f"Dataset filtered by query params {ds}") + if query.z: + ds = dataset.cf.sel(Z=query.z, method="nearest") + + if query.datetime: + datetimes = query.datetime.split("/") + + try: + if len(datetimes) == 1: + ds = ds.cf.sel(T=datetimes[0], method="nearest") + elif len(datetimes) == 2: + ds = ds.cf.sel(T=slice(datetimes[0], datetimes[1])) + else: + raise HTTPException( + status_code=404, + detail="Invalid datetimes submitted", + ) + except ValueError as e: + logger.error("Error with datetime", exc_info=True) + raise HTTPException( + status_code=404, + detail=f"Invalid datetime ({e})", + ) from e + + if query.parameters: + try: + ds = ds.cf[query.parameters.split(",")] + except KeyError as e: + raise HTTPException( + status_code=404, + detail=f"Invalid variable: {e}", + ) - query_params = dict(request.query_params) - for query_param in request.query_params: - if query_param in edr_query_params: - del query_params[query_param] + logger.debug(f"Dataset filtered by query params {ds}") - method: Optional[str] = "nearest" + query_params = dict(request.query_params) + for query_param in request.query_params: + if query_param in edr_query_params: + del query_params[query_param] - for key, value in query_params.items(): - split_value = value.split("/") - if len(split_value) == 1: - continue - elif len(split_value) == 2: - query_params[key] = slice(split_value[0], split_value[1]) - method = None - else: - raise HTTPException(404, f"Too many values for selecting {key}") + method: Optional[str] = "nearest" - ds = ds.sel(query_params, method=method) + for key, value in query_params.items(): + split_value = value.split("/") + if len(split_value) == 1: + continue + elif len(split_value) == 2: + query_params[key] = slice(split_value[0], split_value[1]) + method = None + else: + raise HTTPException( + 404, + f"Too many values for selecting {key}", + ) - if query.format: - try: - format_fn = position_formats()[query.format] - except KeyError: - raise HTTPException( - 404, - f"{query.format} is not a valid format for EDR position queries. " - "Get `./formats` for valid formats", - ) + ds = ds.sel(query_params, method=method) - return format_fn(ds) + if query.format: + try: + format_fn = position_formats()[query.format] + except KeyError: + raise HTTPException( + 404, + f"{query.format} is not a valid format for EDR position queries. " + "Get `./formats` for valid formats", + ) + else: + format_fn = to_cf_covjson - return to_cf_covjson(ds) + response = format_fn(ds) + cache.put(cache_key, response, ct.time, int(response.headers["content-length"])) + return response return router diff --git a/xpublish_edr/query.py b/xpublish_edr/query.py index 22b1d1f..a390ab3 100644 --- a/xpublish_edr/query.py +++ b/xpublish_edr/query.py @@ -29,6 +29,18 @@ def point(self): """Shapely point from WKT query params""" return wkt.loads(self.coords) + def __hash__(self): + return hash( + ( + self.coords, + self.z, + self.datetime, + self.parameters, + self.crs, + self.format, + ), + ) + def edr_query( coords: str = Query( From 060596a96ec54ff640b7bf0cb11fbf53f576a8e0 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Mon, 29 Jul 2024 22:42:59 -0700 Subject: [PATCH 2/4] improve typing and cache key calculation --- xpublish_edr/formats/to_covjson.py | 3 ++- xpublish_edr/formats/to_csv.py | 2 +- xpublish_edr/formats/to_netcdf.py | 2 +- xpublish_edr/plugin.py | 21 ++++++++++++++++----- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/xpublish_edr/formats/to_covjson.py b/xpublish_edr/formats/to_covjson.py index 0437672..990efd9 100644 --- a/xpublish_edr/formats/to_covjson.py +++ b/xpublish_edr/formats/to_covjson.py @@ -20,6 +20,7 @@ class CovJSONResponse(JSONResponse): + # https://docs.ogc.org/cs/21-069r2/21-069r2.html#_b8b17e78-0147-4b58-8ade-a19465b57abc media_type = "application/vnd.cov+json" @@ -79,7 +80,7 @@ def invert_cf_dims(ds): return inverted -def to_cf_covjson(ds: xr.Dataset) -> JSONResponse: +def to_cf_covjson(ds: xr.Dataset) -> CovJSONResponse: """Transform an xarray dataset to CoverageJSON using CF conventions""" covjson: CovJSON = { diff --git a/xpublish_edr/formats/to_csv.py b/xpublish_edr/formats/to_csv.py index 436f81c..99e3d06 100644 --- a/xpublish_edr/formats/to_csv.py +++ b/xpublish_edr/formats/to_csv.py @@ -5,7 +5,7 @@ from fastapi import Response -def to_csv(ds: xr.Dataset): +def to_csv(ds: xr.Dataset) -> Response: """Return a CSV response from an xarray dataset""" ds = ds.squeeze() df = ds.to_pandas() diff --git a/xpublish_edr/formats/to_netcdf.py b/xpublish_edr/formats/to_netcdf.py index 1580771..01039c1 100644 --- a/xpublish_edr/formats/to_netcdf.py +++ b/xpublish_edr/formats/to_netcdf.py @@ -8,7 +8,7 @@ from fastapi import Response -def to_netcdf(ds: xr.Dataset): +def to_netcdf(ds: xr.Dataset) -> Response: """Return a NetCDF response from a dataset""" with TemporaryDirectory() as tmpdir: path = Path(tmpdir) / "position.nc" diff --git a/xpublish_edr/plugin.py b/xpublish_edr/plugin.py index 8538893..7722821 100644 --- a/xpublish_edr/plugin.py +++ b/xpublish_edr/plugin.py @@ -3,9 +3,10 @@ """ import logging from functools import cache -from typing import List, Optional +from typing import Hashable, List, Optional, Tuple import cachey +import dask import pkg_resources import xarray as xr from fastapi import APIRouter, Depends, HTTPException, Request, Response @@ -18,9 +19,14 @@ logger = logging.getLogger("cf_edr") -def cache_key_from_request(request: Request, query: EDRQuery, dataset: xr.Dataset): +def cache_key_from_request( + route: str, + request: Request, + query: EDRQuery, + dataset: xr.Dataset, +) -> Tuple[Hashable, ...]: """Generate a cache key from the request and query parameters""" - return (request, query, str(dataset)) + return (route, request, query, dask.base.tokenize(dataset)) @cache @@ -86,7 +92,7 @@ def get_position( Extra selecting/slicing parameters can be provided as extra query parameters """ - cache_key = cache_key_from_request(request, query, dataset) + cache_key = cache_key_from_request("position", request, query, dataset) response: Optional[Response] = cache.get(cache_key) if response is not None: @@ -175,7 +181,12 @@ def get_position( format_fn = to_cf_covjson response = format_fn(ds) - cache.put(cache_key, response, ct.time, int(response.headers["content-length"])) + cache.put( + cache_key, + response, + ct.time, + int(response.headers["content-length"]), + ) return response return router From 031a967592ea8b3e3ddbec6e851b19f26948e463 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 30 Jul 2024 07:58:49 -0700 Subject: [PATCH 3/4] deterministic tokenize --- xpublish_edr/plugin.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/xpublish_edr/plugin.py b/xpublish_edr/plugin.py index 26f59f7..26978f6 100644 --- a/xpublish_edr/plugin.py +++ b/xpublish_edr/plugin.py @@ -26,7 +26,9 @@ def cache_key_from_request( dataset: xr.Dataset, ) -> Tuple[Hashable, ...]: """Generate a cache key from the request and query parameters""" - return (route, request, query, dask.base.tokenize(dataset)) + with dask.config.set({"tokenize.ensure-deterministic": True}): + ds_token = dask.base.tokenize(dataset) + return (route, request, query, ds_token) @cache From ed3fe14b60388ad80942a7068725a89b0b9ec36f Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Tue, 6 Aug 2024 16:55:46 -0700 Subject: [PATCH 4/4] add docstrings to new methods/classes --- xpublish_edr/formats/to_covjson.py | 2 ++ xpublish_edr/query.py | 1 + 2 files changed, 3 insertions(+) diff --git a/xpublish_edr/formats/to_covjson.py b/xpublish_edr/formats/to_covjson.py index 990efd9..e06a9ba 100644 --- a/xpublish_edr/formats/to_covjson.py +++ b/xpublish_edr/formats/to_covjson.py @@ -20,6 +20,8 @@ class CovJSONResponse(JSONResponse): + """CovJSON response type""" + # https://docs.ogc.org/cs/21-069r2/21-069r2.html#_b8b17e78-0147-4b58-8ade-a19465b57abc media_type = "application/vnd.cov+json" diff --git a/xpublish_edr/query.py b/xpublish_edr/query.py index a390ab3..ed776b7 100644 --- a/xpublish_edr/query.py +++ b/xpublish_edr/query.py @@ -30,6 +30,7 @@ def point(self): return wkt.loads(self.coords) def __hash__(self): + """Hash based on query parameters""" return hash( ( self.coords,