diff --git a/src/hats/catalog/dataset/dataset.py b/src/hats/catalog/dataset/dataset.py
index 13e53343..6477e14d 100644
--- a/src/hats/catalog/dataset/dataset.py
+++ b/src/hats/catalog/dataset/dataset.py
@@ -1,12 +1,14 @@
 from __future__ import annotations
 
 from pathlib import Path
+from typing import List
 
 import pyarrow as pa
 from upath import UPath
 
 from hats.catalog.dataset.table_properties import TableProperties
 from hats.io import file_io
+from hats.io.parquet_metadata import aggregate_column_statistics
 
 # pylint: disable=too-few-public-methods
 
@@ -34,5 +36,26 @@ def __init__(
         self.catalog_path = catalog_path
         self.on_disk = catalog_path is not None
         self.catalog_base_dir = file_io.get_upath(self.catalog_path)
-
         self.schema = schema
+
+    def aggregate_column_statistics(
+        self,
+        exclude_hats_columns: bool = True,
+        exclude_columns: List[str] = None,
+        include_columns: List[str] = None,
+    ):
+        """Read footer statistics in parquet metadata, and report on global min/max values.
+
+        Args:
+            exclude_hats_columns (bool): exclude HATS spatial and partitioning fields
+                from the statistics. Defaults to True.
+            exclude_columns (List[str]): additional columns to exclude from the statistics.
+            include_columns (List[str]): if specified, only return statistics for the column
+                names provided. Defaults to None, and returns all non-hats columns.
+        """
+        return aggregate_column_statistics(
+            self.catalog_base_dir / "dataset" / "_metadata",
+            exclude_hats_columns=exclude_hats_columns,
+            exclude_columns=exclude_columns,
+            include_columns=include_columns,
+        )
diff --git a/src/hats/catalog/partition_info.py b/src/hats/catalog/partition_info.py
index a3c05ecf..d073702d 100644
--- a/src/hats/catalog/partition_info.py
+++ b/src/hats/catalog/partition_info.py
@@ -277,4 +277,4 @@ def calculate_fractional_coverage(self):
         area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
         # 41253 is the number of square degrees in a sphere
         # https://en.wikipedia.org/wiki/Square_degree
-        return (area_by_order * cov_count).sum() / 41253
+        return (area_by_order * cov_count).sum() / (360**2 / np.pi)
diff --git a/src/hats/io/validation.py b/src/hats/io/validation.py
index d44f5f2a..3717de3c 100644
--- a/src/hats/io/validation.py
+++ b/src/hats/io/validation.py
@@ -4,13 +4,14 @@
 from pathlib import Path
 
 import numpy as np
+import pyarrow.dataset as pds
 from upath import UPath
 
-import hats.pixel_math.healpix_shim as hp
 from hats.catalog.dataset.table_properties import TableProperties
+from hats.catalog.healpix_dataset.healpix_dataset import HealpixDataset
 from hats.catalog.partition_info import PartitionInfo
 from hats.io import get_common_metadata_pointer, get_parquet_metadata_pointer, get_partition_info_pointer
-from hats.io.file_io import read_parquet_dataset
+from hats.io.file_io import get_upath
 from hats.io.file_io.file_pointer import is_regular_file
 from hats.io.paths import get_healpix_from_path
 from hats.loaders import read_hats
@@ -40,6 +41,7 @@ def is_valid_catalog(
 
         True if both the properties and partition_info files are valid, False otherwise
     """
+    pointer = get_upath(pointer)
     if not strict:
         return is_catalog_info_valid(pointer) and (
             is_partition_info_valid(pointer) or is_metadata_valid(pointer)
         )
@@ -67,9 +69,6 @@ def handle_error(msg):
 
     if not is_catalog_info_valid(pointer):
         handle_error("properties file does not exist or is invalid.")
 
-    if not is_partition_info_valid(pointer):
-        handle_error("partition_info.csv file does not exist.")
-
     if not is_metadata_valid(pointer):
         handle_error("_metadata file does not exist.")
@@ -83,74 +82,63 @@ def handle_error(msg):
 
     # Load as a catalog object. Confirms that the catalog info matches type.
     catalog = read_hats(pointer)
-    expected_pixels = sort_pixels(catalog.get_healpix_pixels())
-
-    if verbose:
-        print(f"Found {len(expected_pixels)} partitions.")
-
-    ## Compare the pixels in _metadata with partition_info.csv
     metadata_file = get_parquet_metadata_pointer(pointer)
-    # Use both strategies of reading the partition info: strict and !strict.
-    metadata_pixels = sort_pixels(
-        PartitionInfo.read_from_file(metadata_file, strict=True).get_healpix_pixels()
+    ## Load as parquet dataset. Allow errors, and check pixel set against _metadata
+    # As a side effect, this confirms that we can load the directory as a valid dataset.
+    dataset = pds.parquet_dataset(
+        metadata_file.path,
+        filesystem=metadata_file.fs,
     )
-    if not np.array_equal(expected_pixels, metadata_pixels):
-        handle_error("Partition pixels differ between catalog and _metadata file (strict)")
 
-    metadata_pixels = sort_pixels(
-        PartitionInfo.read_from_file(metadata_file, strict=False).get_healpix_pixels()
-    )
-    if not np.array_equal(expected_pixels, metadata_pixels):
-        handle_error("Partition pixels differ between catalog and _metadata file (non-strict)")
+    if isinstance(catalog, HealpixDataset):
+        if not is_partition_info_valid(pointer):
+            handle_error("partition_info.csv file does not exist.")
+            return is_valid
 
-    partition_info_file = get_partition_info_pointer(pointer)
-    csv_pixels = sort_pixels(PartitionInfo.read_from_csv(partition_info_file).get_healpix_pixels())
-    if not np.array_equal(expected_pixels, csv_pixels):
-        handle_error("Partition pixels differ between catalog and partition_info.csv file")
+        expected_pixels = sort_pixels(catalog.get_healpix_pixels())
 
-    ## Load as parquet dataset. Allow errors, and check pixel set against _metadata
-    ignore_prefixes = [
-        "_common_metadata",
-        "_metadata",
-        "catalog_info.json",
-        "properties",
-        "provenance_info.json",
-        "partition_info.csv",
-        "point_map.fits",
-        "README",
-    ]
+        if verbose:
+            print(f"Found {len(expected_pixels)} partitions.")
 
-    # As a side effect, this confirms that we can load the directory as a valid dataset.
-    (dataset_path, dataset) = read_parquet_dataset(
-        pointer,
-        ignore_prefixes=ignore_prefixes,
-        exclude_invalid_files=False,
-    )
+        ## Compare the pixels in _metadata with partition_info.csv
+        # Use both strategies of reading the partition info: strict and !strict.
+        metadata_pixels = sort_pixels(
+            PartitionInfo.read_from_file(metadata_file, strict=True).get_healpix_pixels()
+        )
+        if not np.array_equal(expected_pixels, metadata_pixels):
+            handle_error("Partition pixels differ between catalog and _metadata file (strict)")
+
+        metadata_pixels = sort_pixels(
+            PartitionInfo.read_from_file(metadata_file, strict=False).get_healpix_pixels()
+        )
+        if not np.array_equal(expected_pixels, metadata_pixels):
+            handle_error("Partition pixels differ between catalog and _metadata file (non-strict)")
 
-    parquet_path_pixels = []
-    for hats_file in dataset.files:
-        relative_path = hats_file[len(dataset_path) :]
-        healpix_pixel = get_healpix_from_path(relative_path)
-        if healpix_pixel == INVALID_PIXEL:
-            handle_error(f"Could not derive partition pixel from parquet path: {relative_path}")
+        partition_info_file = get_partition_info_pointer(pointer)
+        partition_info = PartitionInfo.read_from_csv(partition_info_file)
+        csv_pixels = sort_pixels(partition_info.get_healpix_pixels())
+        if not np.array_equal(expected_pixels, csv_pixels):
+            handle_error("Partition pixels differ between catalog and partition_info.csv file")
 
-        parquet_path_pixels.append(healpix_pixel)
+        parquet_path_pixels = []
+        for hats_file in dataset.files:
+            healpix_pixel = get_healpix_from_path(hats_file)
+            if healpix_pixel == INVALID_PIXEL:
+                handle_error(f"Could not derive partition pixel from parquet path: {hats_file}")
+            parquet_path_pixels.append(healpix_pixel)
 
-    parquet_path_pixels = sort_pixels(parquet_path_pixels)
+        parquet_path_pixels = sort_pixels(parquet_path_pixels)
 
-    if not np.array_equal(expected_pixels, parquet_path_pixels):
-        handle_error("Partition pixels differ between catalog and parquet paths")
+        if not np.array_equal(expected_pixels, parquet_path_pixels):
+            handle_error("Partition pixels differ between catalog and parquet paths")
 
-    if verbose:
-        # Print a few more stats
-        pixel_orders = [p.order for p in expected_pixels]
-        cov_order, cov_count = np.unique(pixel_orders, return_counts=True)
-        area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
-        total_area = (area_by_order * cov_count).sum()
-        print(
-            f"Approximate coverage is {total_area:0.2f} sq deg, or {total_area/41253*100:0.2f} % of the sky."
-        )
+        if verbose:
+            # Print a few more stats
+            print(
+                "Approximate coverage is "
+                f"{partition_info.calculate_fractional_coverage()*100:0.2f} % of the sky."
+            )
 
     return is_valid
@@ -172,7 +160,7 @@ def is_catalog_info_valid(pointer: str | Path | UPath) -> bool:
     return True
 
 
-def is_partition_info_valid(pointer: str | Path | UPath) -> bool:
+def is_partition_info_valid(pointer: UPath) -> bool:
    """Checks if partition_info is valid for a given base catalog pointer
 
     Args:
@@ -186,7 +174,7 @@ def is_partition_info_valid(pointer: str | Path | UPath) -> bool:
     return partition_info_exists
 
 
-def is_metadata_valid(pointer: str | Path | UPath) -> bool:
+def is_metadata_valid(pointer: UPath) -> bool:
    """Checks if _metadata is valid for a given base catalog pointer
 
     Args:
@@ -200,7 +188,7 @@ def is_metadata_valid(pointer: str | Path | UPath) -> bool:
     return metadata_file_exists
 
 
-def is_common_metadata_valid(pointer: str | Path | UPath) -> bool:
+def is_common_metadata_valid(pointer: UPath) -> bool:
    """Checks if _common_metadata is valid for a given base catalog pointer
 
     Args:
diff --git a/tests/hats/catalog/test_catalog.py b/tests/hats/catalog/test_catalog.py
index f4d4e68c..46b03e5b 100644
--- a/tests/hats/catalog/test_catalog.py
+++ b/tests/hats/catalog/test_catalog.py
@@ -91,6 +91,19 @@ def test_load_catalog_small_sky_order1(small_sky_order1_dir):
     assert len(cat.get_healpix_pixels()) == 4
 
 
+def test_aggregate_column_statistics(small_sky_order1_dir):
+    cat = read_hats(small_sky_order1_dir)
+
+    result_frame = cat.aggregate_column_statistics()
+    assert len(result_frame) == 5
+
+    result_frame = cat.aggregate_column_statistics(exclude_hats_columns=False)
+    assert len(result_frame) == 9
+
+    result_frame = cat.aggregate_column_statistics(include_columns=["ra", "dec"])
+    assert len(result_frame) == 2
+
+
 def test_load_catalog_small_sky_order1_moc(small_sky_order1_dir):
     """Instantiate a catalog with 4 pixels"""
     cat = read_hats(small_sky_order1_dir)
diff --git a/tests/hats/io/test_validation.py b/tests/hats/io/test_validation.py
index 3b216bf0..ee05800b 100644
--- a/tests/hats/io/test_validation.py
+++ b/tests/hats/io/test_validation.py
@@ -2,7 +2,6 @@
 
 import os
 import shutil
-from pathlib import Path
 
 import pytest
 
@@ -65,10 +64,7 @@ def test_is_valid_catalog_strict(tmp_path, small_sky_catalog, small_sky_pixels,
     assert not is_valid_catalog(tmp_path, **flags)
 
     # This outta do it! Add parquet files that match the _metadata pixels.
-    shutil.copytree(
-        Path(small_sky_catalog.catalog_path) / "dataset" / "Norder=0",
-        tmp_path / "dataset" / "Norder=0",
-    )
+    shutil.copytree(small_sky_catalog.catalog_path / "dataset", tmp_path / "dataset", dirs_exist_ok=True)
     assert is_valid_catalog(tmp_path, **flags)
@@ -93,22 +89,20 @@ def test_is_valid_catalog_fail_fast(tmp_path, small_sky_catalog, small_sky_pixel
 
     # Having the catalog_info file is not enough
     small_sky_catalog.catalog_info.to_properties_file(tmp_path)
+    with pytest.raises(ValueError, match="_metadata"):
+        is_valid_catalog(tmp_path, **flags)
+
+    total_rows = PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(tmp_path)
     with pytest.raises(ValueError, match="partition_info.csv"):
         is_valid_catalog(tmp_path, **flags)
 
     PartitionInfo.from_healpix(small_sky_pixels).write_to_file(catalog_path=tmp_path)
-    with pytest.raises(ValueError, match="_metadata"):
-        is_valid_catalog(tmp_path, **flags)
-
-    total_rows = PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(tmp_path)
     assert total_rows == 1
-    with pytest.raises(ValueError, match="parquet paths"):
+    with pytest.raises(ValueError, match="parquet path"):
         is_valid_catalog(tmp_path, **flags)
 
-    shutil.copytree(
-        Path(small_sky_catalog.catalog_path) / "dataset" / "Norder=0",
-        tmp_path / "dataset" / "Norder=0",
-    )
+    shutil.copytree(small_sky_catalog.catalog_path / "dataset", tmp_path / "dataset", dirs_exist_ok=True)
     assert is_valid_catalog(tmp_path, **flags)
@@ -127,7 +121,6 @@ def test_is_valid_catalog_verbose_fail(tmp_path, capsys):
     captured = capsys.readouterr().out
     assert "Validating catalog at path" in captured
     assert "properties file does not exist or is invalid" in captured
-    assert "partition_info.csv file does not exist" in captured
     assert "_metadata file does not exist" in captured
     assert "_common_metadata file does not exist" in captured
@@ -145,10 +138,16 @@ def test_is_valid_catalog_verbose_success(small_sky_dir, capsys):
     captured = capsys.readouterr().out
     assert "Validating catalog at path" in captured
     assert "Found 1 partition" in captured
-    assert "Approximate coverage is 3437.75 sq deg" in captured
+    assert "Approximate coverage is 8" in captured
 
 
-def test_valid_catalog_strict_all(small_sky_source_dir, small_sky_order1_dir, small_sky_dir):
+def test_valid_catalog_strict_all(
+    small_sky_source_dir,
+    small_sky_order1_dir,
+    small_sky_dir,
+    small_sky_source_object_index_dir,
+    margin_catalog_path,
+):
     """Check that all of our object catalogs in test data are valid, using strict mechanism"""
     flags = {
         "strict": True,  # more intensive checks
@@ -158,3 +157,5 @@ def test_valid_catalog_strict_all(small_sky_source_dir, small_sky_order1_dir, sm
     assert is_valid_catalog(small_sky_source_dir, **flags)
     assert is_valid_catalog(small_sky_order1_dir, **flags)
     assert is_valid_catalog(small_sky_dir, **flags)
+    assert is_valid_catalog(small_sky_source_object_index_dir, **flags)
+    assert is_valid_catalog(margin_catalog_path, **flags)
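Reviewer note: a minimal usage sketch of the new statistics API, assuming only what the patch and tests above show (read_hats as the loader, and the new aggregate_column_statistics method on loaded catalogs). The catalog path is a placeholder, and the columns reported depend on the catalog being read.

    from hats.loaders import read_hats

    # Read an on-disk HATS catalog; replace the path with a real catalog directory.
    catalog = read_hats("path/to/catalog")

    # Global min/max statistics aggregated from the parquet footers in dataset/_metadata,
    # excluding the HATS spatial and partitioning columns by default.
    stats = catalog.aggregate_column_statistics()
    print(stats)

    # Restrict the report to specific columns, as in the new test case.
    ra_dec_stats = catalog.aggregate_column_statistics(include_columns=["ra", "dec"])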