From ca6f485b8e27483c1f57066ce1debecd4a9f550a Mon Sep 17 00:00:00 2001 From: danielfromearth Date: Wed, 8 Nov 2023 16:31:19 -0500 Subject: [PATCH] change output of process_catalogs from single Catalog to list of Catalogs --- batcher/harmony/service_adapter.py | 54 ++++++++++++++++-------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/batcher/harmony/service_adapter.py b/batcher/harmony/service_adapter.py index 31bff4b..e4e9661 100644 --- a/batcher/harmony/service_adapter.py +++ b/batcher/harmony/service_adapter.py @@ -7,8 +7,8 @@ from pystac.item import Asset from batcher.harmony.util import ( + _get_item_url, _get_netcdf_urls, - _get_output_bounding_box, _get_output_date_range, ) from batcher.tempo_filename_parser import get_batch_indices @@ -48,14 +48,10 @@ def invoke(self): return self.message, self.process_catalog(self.catalog) - def process_catalog(self, catalog: pystac.Catalog): + def process_catalog(self, catalog: pystac.Catalog) -> list[pystac.Catalog]: """Converts a list of STAC catalogs into a list of lists of STAC catalogs.""" self.logger.info("process_catalog() started.") try: - result = catalog.clone() - result.id = str(uuid4()) - result.clear_children() - # Get all the items from the catalog, including from child or linked catalogs items = list(self.get_all_catalog_items(catalog)) @@ -63,6 +59,9 @@ def process_catalog(self, catalog: pystac.Catalog): # Quick return if catalog contains no items if len(items) == 0: + result = catalog.clone() + result.id = str(uuid4()) + result.clear_children() return result # # --- Get granule filepaths (urls) --- @@ -79,38 +78,43 @@ def process_catalog(self, catalog: pystac.Catalog): for k, v in zip(batch_indices, items): grouped.setdefault(k, []).append(v) - # --- Construct a STAC Catalog that holds multiple Items (which represent each TEMPO scan), - # and each Item holds multiple Assets (which represent each granule). - result.clear_items() - + # --- Construct a list of STAC Catalogs (which represent each TEMPO scan), + # and each Catalog holds multiple Items (which represent each granule). + catalogs = [] for batch_id, batch_items in grouped.items(): - batch_urls: list[str] = _get_netcdf_urls(batch_items) - bounding_box = _get_output_bounding_box(batch_items) - properties = _get_output_date_range(batch_items) - - self.logger.info(f"constructing new pystac.Item for batch_id==={batch_id}.") - - # Construct a new pystac.Item with every granule in the batch as a pystac.Asset - output_item = Item( - str(uuid4()), bbox_to_geometry(bounding_box), bounding_box, None, properties - ) + self.logger.info(f"constructing new pystac.Catalog for batch_id==={batch_id}.") + # Initialize a new, empty Catalog + batch_catalog = catalog.clone() + batch_catalog.id = str(uuid4()) + batch_catalog.clear_children() + batch_catalog.clear_items() for idx, item in enumerate(batch_items): + # Construct a new pystac.Item for each granule in the batch + output_item = Item( + str(uuid4()), + bbox_to_geometry(item.bbox), + item.bbox, + None, + _get_output_date_range([item]), + ) output_item.add_asset( f"data_{idx}", Asset( - batch_urls[idx], - title=batch_urls[idx], + _get_item_url(item), + title=_get_item_url(item), media_type="application/x-netcdf4", roles=["data"], ), ) + batch_catalog.add_item(output_item) - result.add_item(output_item) + self.logger.info("STAC catalog creation for batch_id==={batch_id} complete.") + catalogs.append(batch_catalog) - self.logger.info("STAC catalog creation complete.") + self.logger.info("All STAC catalogs are complete.") - return result + return catalogs except Exception as service_exception: self.logger.error(service_exception, exc_info=1)