From 6b41eb9c8bceae831b51494453e16f061c56fe6e Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi
Date: Fri, 25 Oct 2024 16:43:55 -0400
Subject: [PATCH 1/5] Change validation for improved dataset reads. Add statistics method on catalog.

---
 src/hats/catalog/dataset/dataset.py | 25 +++++++++++++++++++++++-
 src/hats/io/validation.py | 30 ++++++++++-------------------
 tests/hats/catalog/test_catalog.py | 13 +++++++++++++
 tests/hats/io/test_validation.py | 13 +++----------
 4 files changed, 50 insertions(+), 31 deletions(-)

diff --git a/src/hats/catalog/dataset/dataset.py b/src/hats/catalog/dataset/dataset.py
index fc567f60..27975753 100644
--- a/src/hats/catalog/dataset/dataset.py
+++ b/src/hats/catalog/dataset/dataset.py
@@ -1,13 +1,14 @@
 from __future__ import annotations
 
 from pathlib import Path
-from typing import Tuple
+from typing import List, Tuple
 
 from typing_extensions import Self
 from upath import UPath
 
 from hats.catalog.dataset.table_properties import TableProperties
 from hats.io import file_io
+from hats.io.parquet_metadata import aggregate_column_statistics
 
 
 class Dataset:
@@ -29,6 +30,28 @@ def __init__(self, catalog_info: TableProperties, catalog_path: str | Path | UPa
         self.on_disk = catalog_path is not None
         self.catalog_base_dir = file_io.get_upath(self.catalog_path)
 
+    def aggregate_column_statistics(
+        self,
+        exclude_hats_columns: bool = True,
+        exclude_columns: List[str] = None,
+        include_columns: List[str] = None,
+    ):
+        """Read footer statistics in parquet metadata, and report on global min/max values.
+
+        Args:
+            exclude_hats_columns (bool): exclude HATS spatial and partitioning fields
+                from the statistics. Defaults to True.
+            exclude_columns (List[str]): additional columns to exclude from the statistics.
+            include_columns (List[str]): if specified, only return statistics for the column
+                names provided. Defaults to None, and returns all non-hats columns.
+        """
+        return aggregate_column_statistics(
+            self.catalog_base_dir / "dataset" / "_metadata",
+            exclude_hats_columns=exclude_hats_columns,
+            exclude_columns=exclude_columns,
+            include_columns=include_columns,
+        )
+
     @classmethod
     def read_hats(cls, catalog_path: str | Path | UPath) -> Self:
         """Reads a HATS Catalog from a HATS directory
diff --git a/src/hats/io/validation.py b/src/hats/io/validation.py
index d44f5f2a..84456635 100644
--- a/src/hats/io/validation.py
+++ b/src/hats/io/validation.py
@@ -4,13 +4,14 @@
 from pathlib import Path
 
 import numpy as np
+import pyarrow.dataset as pds
 from upath import UPath
 
 import hats.pixel_math.healpix_shim as hp
 from hats.catalog.dataset.table_properties import TableProperties
 from hats.catalog.partition_info import PartitionInfo
 from hats.io import get_common_metadata_pointer, get_parquet_metadata_pointer, get_partition_info_pointer
-from hats.io.file_io import read_parquet_dataset
+from hats.io.file_io import get_upath
 from hats.io.file_io.file_pointer import is_regular_file
 from hats.io.paths import get_healpix_from_path
 from hats.loaders import read_hats
@@ -40,6 +41,7 @@ def is_valid_catalog(
 
         True if both the properties and partition_info files are valid, False otherwise
     """
+    pointer = get_upath(pointer)
    if not strict:
         return is_catalog_info_valid(pointer) and (
             is_partition_info_valid(pointer) or is_metadata_valid(pointer)
@@ -110,31 +112,19 @@ def handle_error(msg):
         handle_error("Partition pixels differ between catalog and partition_info.csv file")
 
     ## Load as parquet dataset. Allow errors, and check pixel set against _metadata
-    ignore_prefixes = [
-        "_common_metadata",
-        "_metadata",
-        "catalog_info.json",
-        "properties",
-        "provenance_info.json",
-        "partition_info.csv",
-        "point_map.fits",
-        "README",
-    ]
-
     # As a side effect, this confirms that we can load the directory as a valid dataset.
-    (dataset_path, dataset) = read_parquet_dataset(
-        pointer,
-        ignore_prefixes=ignore_prefixes,
-        exclude_invalid_files=False,
+    dataset = pds.parquet_dataset(
+        metadata_file.path,
+        filesystem=metadata_file.fs,
     )
 
     parquet_path_pixels = []
+    dataset_path = str(pointer / "dataset")
     for hats_file in dataset.files:
         relative_path = hats_file[len(dataset_path) :]
         healpix_pixel = get_healpix_from_path(relative_path)
         if healpix_pixel == INVALID_PIXEL:
             handle_error(f"Could not derive partition pixel from parquet path: {relative_path}")
-
         parquet_path_pixels.append(healpix_pixel)
 
     parquet_path_pixels = sort_pixels(parquet_path_pixels)
@@ -172,7 +162,7 @@ def is_catalog_info_valid(pointer: str | Path | UPath) -> bool:
     return True
 
 
-def is_partition_info_valid(pointer: str | Path | UPath) -> bool:
+def is_partition_info_valid(pointer: UPath) -> bool:
     """Checks if partition_info is valid for a given base catalog pointer
 
     Args:
@@ -186,7 +176,7 @@ def is_partition_info_valid(pointer: str | Path | UPath) -> bool:
     return partition_info_exists
 
 
-def is_metadata_valid(pointer: str | Path | UPath) -> bool:
+def is_metadata_valid(pointer: UPath) -> bool:
     """Checks if _metadata is valid for a given base catalog pointer
 
     Args:
@@ -200,7 +190,7 @@ def is_metadata_valid(pointer: str | Path | UPath) -> bool:
     return metadata_file_exists
 
 
-def is_common_metadata_valid(pointer: str | Path | UPath) -> bool:
+def is_common_metadata_valid(pointer: UPath) -> bool:
     """Checks if _common_metadata is valid for a given base catalog pointer
 
     Args:
diff --git a/tests/hats/catalog/test_catalog.py b/tests/hats/catalog/test_catalog.py
index 9ac1c213..1ce71eb0 100644
--- a/tests/hats/catalog/test_catalog.py
+++ b/tests/hats/catalog/test_catalog.py
@@ -91,6 +91,19 @@ def test_load_catalog_small_sky_order1(small_sky_order1_dir):
     assert len(cat.get_healpix_pixels()) == 4
 
 
+def test_aggregate_column_statistics(small_sky_order1_dir):
+    cat = read_hats(small_sky_order1_dir)
+
+    result_frame = cat.aggregate_column_statistics()
+    assert len(result_frame) == 5
+
+    result_frame = cat.aggregate_column_statistics(exclude_hats_columns=False)
+    assert len(result_frame) == 9
+
+    result_frame = cat.aggregate_column_statistics(include_columns=["ra", "dec"])
+    assert len(result_frame) == 2
+
+
 def test_load_catalog_small_sky_order1_moc(small_sky_order1_dir):
     """Instantiate a catalog with 4 pixels"""
     cat = read_hats(small_sky_order1_dir)
diff --git a/tests/hats/io/test_validation.py b/tests/hats/io/test_validation.py
index 3b216bf0..9a5ca722 100644
--- a/tests/hats/io/test_validation.py
+++ b/tests/hats/io/test_validation.py
@@ -2,7 +2,6 @@
 
 import os
 import shutil
-from pathlib import Path
 
 import pytest
 
@@ -65,10 +64,7 @@ def test_is_valid_catalog_strict(tmp_path, small_sky_catalog, small_sky_pixels,
     assert not is_valid_catalog(tmp_path, **flags)
 
     # This outta do it! Add parquet files that match the _metadata pixels.
-    shutil.copytree(
-        Path(small_sky_catalog.catalog_path) / "dataset" / "Norder=0",
-        tmp_path / "dataset" / "Norder=0",
-    )
+    shutil.copytree(small_sky_catalog.catalog_path / "dataset", tmp_path / "dataset", dirs_exist_ok=True)
 
     assert is_valid_catalog(tmp_path, **flags)
 
@@ -102,13 +98,10 @@ def test_is_valid_catalog_fail_fast(tmp_path, small_sky_catalog, small_sky_pixel
 
     total_rows = PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(tmp_path)
     assert total_rows == 1
-    with pytest.raises(ValueError, match="parquet paths"):
+    with pytest.raises(ValueError, match="parquet path"):
         is_valid_catalog(tmp_path, **flags)
 
-    shutil.copytree(
-        Path(small_sky_catalog.catalog_path) / "dataset" / "Norder=0",
-        tmp_path / "dataset" / "Norder=0",
-    )
+    shutil.copytree(small_sky_catalog.catalog_path / "dataset", tmp_path / "dataset", dirs_exist_ok=True)
 
     assert is_valid_catalog(tmp_path, **flags)
 
From a17a44fc324270696f169cc6aaadbb9065f045c2 Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi
Date: Thu, 31 Oct 2024 12:38:03 -0400
Subject: [PATCH 2/5] Skip pixel validation for index catalogs.

---
 src/hats/io/validation.py | 95 ++++++++++++++++----------------
 tests/hats/io/test_validation.py | 18 ++++--
 2 files changed, 62 insertions(+), 51 deletions(-)

diff --git a/src/hats/io/validation.py b/src/hats/io/validation.py
index 84456635..ed49cfff 100644
--- a/src/hats/io/validation.py
+++ b/src/hats/io/validation.py
@@ -9,6 +9,7 @@
 
 import hats.pixel_math.healpix_shim as hp
 from hats.catalog.dataset.table_properties import TableProperties
+from hats.catalog.healpix_dataset.healpix_dataset import HealpixDataset
 from hats.catalog.partition_info import PartitionInfo
 from hats.io import get_common_metadata_pointer, get_parquet_metadata_pointer, get_partition_info_pointer
 from hats.io.file_io import get_upath
@@ -69,9 +70,6 @@ def handle_error(msg):
     if not is_catalog_info_valid(pointer):
         handle_error("properties file does not exist or is invalid.")
 
-    if not is_partition_info_valid(pointer):
-        handle_error("partition_info.csv file does not exist.")
-
     if not is_metadata_valid(pointer):
         handle_error("_metadata file does not exist.")
 
@@ -85,32 +83,8 @@ def handle_error(msg):
 
     # Load as a catalog object. Confirms that the catalog info matches type.
     catalog = read_hats(pointer)
-    expected_pixels = sort_pixels(catalog.get_healpix_pixels())
-
-    if verbose:
-        print(f"Found {len(expected_pixels)} partitions.")
-
-    ## Compare the pixels in _metadata with partition_info.csv
     metadata_file = get_parquet_metadata_pointer(pointer)
-    # Use both strategies of reading the partition info: strict and !strict.
-    metadata_pixels = sort_pixels(
-        PartitionInfo.read_from_file(metadata_file, strict=True).get_healpix_pixels()
-    )
-    if not np.array_equal(expected_pixels, metadata_pixels):
-        handle_error("Partition pixels differ between catalog and _metadata file (strict)")
-
-    metadata_pixels = sort_pixels(
-        PartitionInfo.read_from_file(metadata_file, strict=False).get_healpix_pixels()
-    )
-    if not np.array_equal(expected_pixels, metadata_pixels):
-        handle_error("Partition pixels differ between catalog and _metadata file (non-strict)")
-
-    partition_info_file = get_partition_info_pointer(pointer)
-    csv_pixels = sort_pixels(PartitionInfo.read_from_csv(partition_info_file).get_healpix_pixels())
-    if not np.array_equal(expected_pixels, csv_pixels):
-        handle_error("Partition pixels differ between catalog and partition_info.csv file")
-
     ## Load as parquet dataset. Allow errors, and check pixel set against _metadata
     # As a side effect, this confirms that we can load the directory as a valid dataset.
     dataset = pds.parquet_dataset(
@@ -118,29 +92,58 @@ def handle_error(msg):
         metadata_file.path,
         filesystem=metadata_file.fs,
     )
 
-    parquet_path_pixels = []
-    dataset_path = str(pointer / "dataset")
-    for hats_file in dataset.files:
-        relative_path = hats_file[len(dataset_path) :]
-        healpix_pixel = get_healpix_from_path(relative_path)
-        if healpix_pixel == INVALID_PIXEL:
-            handle_error(f"Could not derive partition pixel from parquet path: {relative_path}")
-        parquet_path_pixels.append(healpix_pixel)
+    if isinstance(catalog, HealpixDataset):
+        if not is_partition_info_valid(pointer):
+            handle_error("partition_info.csv file does not exist.")
+            return is_valid
 
-    parquet_path_pixels = sort_pixels(parquet_path_pixels)
+        expected_pixels = sort_pixels(catalog.get_healpix_pixels())
 
-    if not np.array_equal(expected_pixels, parquet_path_pixels):
-        handle_error("Partition pixels differ between catalog and parquet paths")
+        if verbose:
+            print(f"Found {len(expected_pixels)} partitions.")
 
-    if verbose:
-        # Print a few more stats
-        pixel_orders = [p.order for p in expected_pixels]
-        cov_order, cov_count = np.unique(pixel_orders, return_counts=True)
-        area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
-        total_area = (area_by_order * cov_count).sum()
-        print(
-            f"Approximate coverage is {total_area:0.2f} sq deg, or {total_area/41253*100:0.2f} % of the sky."
+        ## Compare the pixels in _metadata with partition_info.csv
+        # Use both strategies of reading the partition info: strict and !strict.
+        metadata_pixels = sort_pixels(
+            PartitionInfo.read_from_file(metadata_file, strict=True).get_healpix_pixels()
+        )
+        if not np.array_equal(expected_pixels, metadata_pixels):
+            handle_error("Partition pixels differ between catalog and _metadata file (strict)")
+
+        metadata_pixels = sort_pixels(
+            PartitionInfo.read_from_file(metadata_file, strict=False).get_healpix_pixels()
         )
+        if not np.array_equal(expected_pixels, metadata_pixels):
+            handle_error("Partition pixels differ between catalog and _metadata file (non-strict)")
+
+        partition_info_file = get_partition_info_pointer(pointer)
+        csv_pixels = sort_pixels(PartitionInfo.read_from_csv(partition_info_file).get_healpix_pixels())
+        if not np.array_equal(expected_pixels, csv_pixels):
+            handle_error("Partition pixels differ between catalog and partition_info.csv file")
+
+        parquet_path_pixels = []
+        dataset_path = str(pointer / "dataset")
+        for hats_file in dataset.files:
+            relative_path = hats_file[len(dataset_path) :]
+            healpix_pixel = get_healpix_from_path(relative_path)
+            if healpix_pixel == INVALID_PIXEL:
+                handle_error(f"Could not derive partition pixel from parquet path: {relative_path}")
+            parquet_path_pixels.append(healpix_pixel)
+
+        parquet_path_pixels = sort_pixels(parquet_path_pixels)
+
+        if not np.array_equal(expected_pixels, parquet_path_pixels):
+            handle_error("Partition pixels differ between catalog and parquet paths")
+
+        if verbose:
+            # Print a few more stats
+            pixel_orders = [p.order for p in expected_pixels]
+            cov_order, cov_count = np.unique(pixel_orders, return_counts=True)
+            area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
+            total_area = (area_by_order * cov_count).sum()
+            print(
+                f"Approximate coverage is {total_area:0.2f} sq deg, or {total_area/41253*100:0.2f} % of the sky."
+            )
 
     return is_valid
diff --git a/tests/hats/io/test_validation.py b/tests/hats/io/test_validation.py
index 9a5ca722..dc6f5355 100644
--- a/tests/hats/io/test_validation.py
+++ b/tests/hats/io/test_validation.py
@@ -89,14 +89,15 @@ def test_is_valid_catalog_fail_fast(tmp_path, small_sky_catalog, small_sky_pixel
 
     # Having the catalog_info file is not enough
     small_sky_catalog.catalog_info.to_properties_file(tmp_path)
+    with pytest.raises(ValueError, match="_metadata"):
+        is_valid_catalog(tmp_path, **flags)
+
+    total_rows = PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(tmp_path)
     with pytest.raises(ValueError, match="partition_info.csv"):
         is_valid_catalog(tmp_path, **flags)
 
     PartitionInfo.from_healpix(small_sky_pixels).write_to_file(catalog_path=tmp_path)
-    with pytest.raises(ValueError, match="_metadata"):
-        is_valid_catalog(tmp_path, **flags)
 
-    total_rows = PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(tmp_path)
     assert total_rows == 1
     with pytest.raises(ValueError, match="parquet path"):
         is_valid_catalog(tmp_path, **flags)
@@ -120,7 +121,6 @@ def test_is_valid_catalog_verbose_fail(tmp_path, capsys):
     captured = capsys.readouterr().out
     assert "Validating catalog at path" in captured
     assert "properties file does not exist or is invalid" in captured
-    assert "partition_info.csv file does not exist" in captured
     assert "_metadata file does not exist" in captured
     assert "_common_metadata file does not exist" in captured
 
@@ -141,7 +141,13 @@ def test_is_valid_catalog_verbose_success(small_sky_dir, capsys):
     assert "Approximate coverage is 3437.75 sq deg" in captured
 
 
-def test_valid_catalog_strict_all(small_sky_source_dir, small_sky_order1_dir, small_sky_dir):
+def test_valid_catalog_strict_all(
+    small_sky_source_dir,
+    small_sky_order1_dir,
+    small_sky_dir,
+    small_sky_source_object_index_dir,
+    margin_catalog_path,
+):
     """Check that all of our object catalogs in test data are valid, using strict mechanism"""
     flags = {
         "strict": True,  # more intensive checks
@@ -151,3 +157,5 @@ def test_valid_catalog_strict_all(small_sky_source_dir, small_sky_order1_dir, sm
     assert is_valid_catalog(small_sky_source_dir, **flags)
     assert is_valid_catalog(small_sky_order1_dir, **flags)
     assert is_valid_catalog(small_sky_dir, **flags)
+    assert is_valid_catalog(small_sky_source_object_index_dir, **flags)
+    assert is_valid_catalog(margin_catalog_path, **flags)
From 7c6992e43aba3ef47efae4dcc263a0cba71258df Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi
Date: Thu, 31 Oct 2024 12:51:46 -0400
Subject: [PATCH 3/5] Pylint from main merge.

---
 src/hats/catalog/dataset/dataset.py | 1 +
 src/hats/io/validation.py | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/hats/catalog/dataset/dataset.py b/src/hats/catalog/dataset/dataset.py
index c531fda3..6477e14d 100644
--- a/src/hats/catalog/dataset/dataset.py
+++ b/src/hats/catalog/dataset/dataset.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 from pathlib import Path
+from typing import List
 
 import pyarrow as pa
 from upath import UPath
diff --git a/src/hats/io/validation.py b/src/hats/io/validation.py
index ed49cfff..510ee910 100644
--- a/src/hats/io/validation.py
+++ b/src/hats/io/validation.py
@@ -142,7 +142,8 @@ def handle_error(msg):
             area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
             total_area = (area_by_order * cov_count).sum()
             print(
-                f"Approximate coverage is {total_area:0.2f} sq deg, or {total_area/41253*100:0.2f} % of the sky."
+                f"Approximate coverage is {total_area:0.2f} sq deg, "
+                "or {total_area/41253*100:0.2f} % of the sky."
             )
 
     return is_valid
From ab520f50f3cedb12d4779e0664994d6c7e67f429 Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi
Date: Thu, 31 Oct 2024 13:38:51 -0400
Subject: [PATCH 4/5] Comments from code review.

---
 src/hats/catalog/partition_info.py | 2 +-
 src/hats/io/validation.py | 17 ++++++-----------
 tests/hats/io/test_validation.py | 2 +-
 3 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/src/hats/catalog/partition_info.py b/src/hats/catalog/partition_info.py
index a3c05ecf..d073702d 100644
--- a/src/hats/catalog/partition_info.py
+++ b/src/hats/catalog/partition_info.py
@@ -277,4 +277,4 @@ def calculate_fractional_coverage(self):
         area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
         # 41253 is the number of square degrees in a sphere
         # https://en.wikipedia.org/wiki/Square_degree
-        return (area_by_order * cov_count).sum() / 41253
+        return (area_by_order * cov_count).sum() / (360**2 / np.pi)
diff --git a/src/hats/io/validation.py b/src/hats/io/validation.py
index 510ee910..800cb606 100644
--- a/src/hats/io/validation.py
+++ b/src/hats/io/validation.py
@@ -117,17 +117,16 @@ def handle_error(msg):
             handle_error("Partition pixels differ between catalog and _metadata file (non-strict)")
 
         partition_info_file = get_partition_info_pointer(pointer)
-        csv_pixels = sort_pixels(PartitionInfo.read_from_csv(partition_info_file).get_healpix_pixels())
+        partition_info = PartitionInfo.read_from_csv(partition_info_file)
+        csv_pixels = sort_pixels(partition_info.get_healpix_pixels())
         if not np.array_equal(expected_pixels, csv_pixels):
             handle_error("Partition pixels differ between catalog and partition_info.csv file")
 
         parquet_path_pixels = []
-        dataset_path = str(pointer / "dataset")
         for hats_file in dataset.files:
-            relative_path = hats_file[len(dataset_path) :]
-            healpix_pixel = get_healpix_from_path(relative_path)
+            healpix_pixel = get_healpix_from_path(hats_file)
             if healpix_pixel == INVALID_PIXEL:
-                handle_error(f"Could not derive partition pixel from parquet path: {relative_path}")
+                handle_error(f"Could not derive partition pixel from parquet path: {hats_file}")
             parquet_path_pixels.append(healpix_pixel)
 
         parquet_path_pixels = sort_pixels(parquet_path_pixels)
@@ -137,13 +136,9 @@ def handle_error(msg):
 
         if verbose:
            # Print a few more stats
-            pixel_orders = [p.order for p in expected_pixels]
-            cov_order, cov_count = np.unique(pixel_orders, return_counts=True)
-            area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
-            total_area = (area_by_order * cov_count).sum()
             print(
-                f"Approximate coverage is {total_area:0.2f} sq deg, "
-                "or {total_area/41253*100:0.2f} % of the sky."
+                "Approximate coverage is "
+                f"{partition_info.calculate_fractional_coverage()*100:0.2f} % of the sky."
) return is_valid diff --git a/tests/hats/io/test_validation.py b/tests/hats/io/test_validation.py index dc6f5355..ee05800b 100644 --- a/tests/hats/io/test_validation.py +++ b/tests/hats/io/test_validation.py @@ -138,7 +138,7 @@ def test_is_valid_catalog_verbose_success(small_sky_dir, capsys): captured = capsys.readouterr().out assert "Validating catalog at path" in captured assert "Found 1 partition" in captured - assert "Approximate coverage is 3437.75 sq deg" in captured + assert "Approximate coverage is 8" in captured def test_valid_catalog_strict_all( From 1636208b246d275f2b2750ce0a19de05ca5bd5c6 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi Date: Thu, 31 Oct 2024 13:52:59 -0400 Subject: [PATCH 5/5] pyyyylinttttt --- src/hats/io/validation.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/hats/io/validation.py b/src/hats/io/validation.py index 800cb606..3717de3c 100644 --- a/src/hats/io/validation.py +++ b/src/hats/io/validation.py @@ -7,7 +7,6 @@ import pyarrow.dataset as pds from upath import UPath -import hats.pixel_math.healpix_shim as hp from hats.catalog.dataset.table_properties import TableProperties from hats.catalog.healpix_dataset.healpix_dataset import HealpixDataset from hats.catalog.partition_info import PartitionInfo