Improve catalog validation and column statistics #404

Merged (8 commits) on Oct 31, 2024
25 changes: 24 additions & 1 deletion src/hats/catalog/dataset/dataset.py
@@ -1,12 +1,14 @@
from __future__ import annotations

from pathlib import Path
from typing import List

import pyarrow as pa
from upath import UPath

from hats.catalog.dataset.table_properties import TableProperties
from hats.io import file_io
from hats.io.parquet_metadata import aggregate_column_statistics


# pylint: disable=too-few-public-methods
@@ -34,5 +36,26 @@ def __init__(
self.catalog_path = catalog_path
self.on_disk = catalog_path is not None
self.catalog_base_dir = file_io.get_upath(self.catalog_path)

self.schema = schema

def aggregate_column_statistics(
self,
exclude_hats_columns: bool = True,
exclude_columns: List[str] = None,
include_columns: List[str] = None,
):
"""Read footer statistics in parquet metadata, and report on global min/max values.

Args:
exclude_hats_columns (bool): exclude HATS spatial and partitioning fields
from the statistics. Defaults to True.
exclude_columns (List[str]): additional columns to exclude from the statistics.
include_columns (List[str]): if specified, only return statistics for the column
names provided. Defaults to None, and returns all non-hats columns.
"""
return aggregate_column_statistics(
self.catalog_base_dir / "dataset" / "_metadata",
exclude_hats_columns=exclude_hats_columns,
exclude_columns=exclude_columns,
include_columns=include_columns,
)
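
For context, a minimal usage sketch of the new method, assuming a HATS catalog already written to disk (the path and printout here are illustrative, not part of the change):

from hats.loaders import read_hats

# Hypothetical path to an existing HATS catalog on local disk.
catalog = read_hats("/data/hats/my_catalog")

# Global min/max per column, skipping HATS spatial and partitioning fields.
stats = catalog.aggregate_column_statistics()

# Restrict the report to a couple of columns of interest.
radec_stats = catalog.aggregate_column_statistics(include_columns=["ra", "dec"])
print(radec_stats)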
2 changes: 1 addition & 1 deletion src/hats/catalog/partition_info.py
@@ -277,4 +277,4 @@ def calculate_fractional_coverage(self):
area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
# 41253 is the number of square degrees in a sphere
# https://en.wikipedia.org/wiki/Square_degree
return (area_by_order * cov_count).sum() / 41253
return (area_by_order * cov_count).sum() / (360**2 / np.pi)
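
The change above replaces the rounded constant 41253 with the exact full-sky area in square degrees. Both values come from the same identity, sketched here for reference:

import numpy as np

# A full sphere subtends 4*pi steradians, and one steradian is (180/pi)**2
# square degrees, so the whole sky covers 4*pi*(180/pi)**2 = 360**2/pi deg^2.
full_sky_sq_deg = 360**2 / np.pi
print(full_sky_sq_deg)  # ~41252.96, which the old code rounded to 41253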
116 changes: 52 additions & 64 deletions src/hats/io/validation.py
@@ -4,13 +4,14 @@
from pathlib import Path

import numpy as np
import pyarrow.dataset as pds
from upath import UPath

import hats.pixel_math.healpix_shim as hp
from hats.catalog.dataset.table_properties import TableProperties
from hats.catalog.healpix_dataset.healpix_dataset import HealpixDataset
from hats.catalog.partition_info import PartitionInfo
from hats.io import get_common_metadata_pointer, get_parquet_metadata_pointer, get_partition_info_pointer
from hats.io.file_io import read_parquet_dataset
from hats.io.file_io import get_upath
from hats.io.file_io.file_pointer import is_regular_file
from hats.io.paths import get_healpix_from_path
from hats.loaders import read_hats
@@ -40,6 +41,7 @@
True if both the properties and partition_info files are
valid, False otherwise
"""
pointer = get_upath(pointer)
if not strict:
return is_catalog_info_valid(pointer) and (
is_partition_info_valid(pointer) or is_metadata_valid(pointer)
@@ -67,9 +69,6 @@
if not is_catalog_info_valid(pointer):
handle_error("properties file does not exist or is invalid.")

if not is_partition_info_valid(pointer):
handle_error("partition_info.csv file does not exist.")

if not is_metadata_valid(pointer):
handle_error("_metadata file does not exist.")

@@ -83,74 +82,63 @@

# Load as a catalog object. Confirms that the catalog info matches type.
catalog = read_hats(pointer)
expected_pixels = sort_pixels(catalog.get_healpix_pixels())

if verbose:
print(f"Found {len(expected_pixels)} partitions.")

## Compare the pixels in _metadata with partition_info.csv
metadata_file = get_parquet_metadata_pointer(pointer)

# Use both strategies of reading the partition info: strict and !strict.
metadata_pixels = sort_pixels(
PartitionInfo.read_from_file(metadata_file, strict=True).get_healpix_pixels()
## Load as parquet dataset. Allow errors, and check pixel set against _metadata
# As a side effect, this confirms that we can load the directory as a valid dataset.
dataset = pds.parquet_dataset(
metadata_file.path,
filesystem=metadata_file.fs,
)
if not np.array_equal(expected_pixels, metadata_pixels):
handle_error("Partition pixels differ between catalog and _metadata file (strict)")

metadata_pixels = sort_pixels(
PartitionInfo.read_from_file(metadata_file, strict=False).get_healpix_pixels()
)
if not np.array_equal(expected_pixels, metadata_pixels):
handle_error("Partition pixels differ between catalog and _metadata file (non-strict)")
if isinstance(catalog, HealpixDataset):
if not is_partition_info_valid(pointer):
handle_error("partition_info.csv file does not exist.")
return is_valid

partition_info_file = get_partition_info_pointer(pointer)
csv_pixels = sort_pixels(PartitionInfo.read_from_csv(partition_info_file).get_healpix_pixels())
if not np.array_equal(expected_pixels, csv_pixels):
handle_error("Partition pixels differ between catalog and partition_info.csv file")
expected_pixels = sort_pixels(catalog.get_healpix_pixels())

## Load as parquet dataset. Allow errors, and check pixel set against _metadata
ignore_prefixes = [
"_common_metadata",
"_metadata",
"catalog_info.json",
"properties",
"provenance_info.json",
"partition_info.csv",
"point_map.fits",
"README",
]
if verbose:
print(f"Found {len(expected_pixels)} partitions.")

# As a side effect, this confirms that we can load the directory as a valid dataset.
(dataset_path, dataset) = read_parquet_dataset(
pointer,
ignore_prefixes=ignore_prefixes,
exclude_invalid_files=False,
)
## Compare the pixels in _metadata with partition_info.csv
# Use both strategies of reading the partition info: strict and !strict.
metadata_pixels = sort_pixels(
PartitionInfo.read_from_file(metadata_file, strict=True).get_healpix_pixels()
)
if not np.array_equal(expected_pixels, metadata_pixels):
handle_error("Partition pixels differ between catalog and _metadata file (strict)")

metadata_pixels = sort_pixels(
PartitionInfo.read_from_file(metadata_file, strict=False).get_healpix_pixels()
)
if not np.array_equal(expected_pixels, metadata_pixels):
handle_error("Partition pixels differ between catalog and _metadata file (non-strict)")

parquet_path_pixels = []
for hats_file in dataset.files:
relative_path = hats_file[len(dataset_path) :]
healpix_pixel = get_healpix_from_path(relative_path)
if healpix_pixel == INVALID_PIXEL:
handle_error(f"Could not derive partition pixel from parquet path: {relative_path}")
partition_info_file = get_partition_info_pointer(pointer)
partition_info = PartitionInfo.read_from_csv(partition_info_file)
csv_pixels = sort_pixels(partition_info.get_healpix_pixels())
if not np.array_equal(expected_pixels, csv_pixels):
handle_error("Partition pixels differ between catalog and partition_info.csv file")

parquet_path_pixels.append(healpix_pixel)
parquet_path_pixels = []
for hats_file in dataset.files:
healpix_pixel = get_healpix_from_path(hats_file)
if healpix_pixel == INVALID_PIXEL:
handle_error(f"Could not derive partition pixel from parquet path: {hats_file}")
parquet_path_pixels.append(healpix_pixel)

parquet_path_pixels = sort_pixels(parquet_path_pixels)
parquet_path_pixels = sort_pixels(parquet_path_pixels)

if not np.array_equal(expected_pixels, parquet_path_pixels):
handle_error("Partition pixels differ between catalog and parquet paths")
if not np.array_equal(expected_pixels, parquet_path_pixels):
handle_error("Partition pixels differ between catalog and parquet paths")

if verbose:
# Print a few more stats
pixel_orders = [p.order for p in expected_pixels]
cov_order, cov_count = np.unique(pixel_orders, return_counts=True)
area_by_order = [hp.nside2pixarea(hp.order2nside(order), degrees=True) for order in cov_order]
total_area = (area_by_order * cov_count).sum()
print(
f"Approximate coverage is {total_area:0.2f} sq deg, or {total_area/41253*100:0.2f} % of the sky."
)
if verbose:
# Print a few more stats
print(
"Approximate coverage is "
f"{partition_info.calculate_fractional_coverage()*100:0.2f} % of the sky."
)

return is_valid

@@ -172,7 +160,7 @@
return True


def is_partition_info_valid(pointer: str | Path | UPath) -> bool:
def is_partition_info_valid(pointer: UPath) -> bool:
"""Checks if partition_info is valid for a given base catalog pointer

Args:
Expand All @@ -186,7 +174,7 @@
return partition_info_exists


def is_metadata_valid(pointer: str | Path | UPath) -> bool:
def is_metadata_valid(pointer: UPath) -> bool:
"""Checks if _metadata is valid for a given base catalog pointer

Args:
Expand All @@ -200,7 +188,7 @@
return metadata_file_exists


def is_common_metadata_valid(pointer: str | Path | UPath) -> bool:
def is_common_metadata_valid(pointer: UPath) -> bool:
"""Checks if _common_metadata is valid for a given base catalog pointer

Args:
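
As exercised in the tests below, a hedged sketch of how the refactored validation entry point is typically called (the path is hypothetical, and the keyword names mirror those used in tests/hats/io/test_validation.py):

from hats.io.validation import is_valid_catalog

# Default mode only checks that the properties and partition metadata exist.
ok = is_valid_catalog("/data/hats/my_catalog")

# Strict mode re-reads _metadata, partition_info.csv, and the parquet paths,
# and checks that they all agree on the set of HEALPix partition pixels.
ok = is_valid_catalog("/data/hats/my_catalog", strict=True, verbose=True)
print(ok)
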
13 changes: 13 additions & 0 deletions tests/hats/catalog/test_catalog.py
@@ -91,6 +91,19 @@ def test_load_catalog_small_sky_order1(small_sky_order1_dir):
assert len(cat.get_healpix_pixels()) == 4


def test_aggregate_column_statistics(small_sky_order1_dir):
cat = read_hats(small_sky_order1_dir)

result_frame = cat.aggregate_column_statistics()
assert len(result_frame) == 5

result_frame = cat.aggregate_column_statistics(exclude_hats_columns=False)
assert len(result_frame) == 9

result_frame = cat.aggregate_column_statistics(include_columns=["ra", "dec"])
assert len(result_frame) == 2


def test_load_catalog_small_sky_order1_moc(small_sky_order1_dir):
"""Instantiate a catalog with 4 pixels"""
cat = read_hats(small_sky_order1_dir)
33 changes: 17 additions & 16 deletions tests/hats/io/test_validation.py
@@ -2,7 +2,6 @@

import os
import shutil
from pathlib import Path

import pytest

@@ -65,10 +64,7 @@ def test_is_valid_catalog_strict(tmp_path, small_sky_catalog, small_sky_pixels,
assert not is_valid_catalog(tmp_path, **flags)

# This outta do it! Add parquet files that match the _metadata pixels.
shutil.copytree(
Path(small_sky_catalog.catalog_path) / "dataset" / "Norder=0",
tmp_path / "dataset" / "Norder=0",
)
shutil.copytree(small_sky_catalog.catalog_path / "dataset", tmp_path / "dataset", dirs_exist_ok=True)

assert is_valid_catalog(tmp_path, **flags)

@@ -93,22 +89,20 @@ def test_is_valid_catalog_fail_fast(tmp_path, small_sky_catalog, small_sky_pixel

# Having the catalog_info file is not enough
small_sky_catalog.catalog_info.to_properties_file(tmp_path)
with pytest.raises(ValueError, match="_metadata"):
is_valid_catalog(tmp_path, **flags)

total_rows = PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(tmp_path)
with pytest.raises(ValueError, match="partition_info.csv"):
is_valid_catalog(tmp_path, **flags)

PartitionInfo.from_healpix(small_sky_pixels).write_to_file(catalog_path=tmp_path)
with pytest.raises(ValueError, match="_metadata"):
is_valid_catalog(tmp_path, **flags)

total_rows = PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(tmp_path)
assert total_rows == 1
with pytest.raises(ValueError, match="parquet paths"):
with pytest.raises(ValueError, match="parquet path"):
is_valid_catalog(tmp_path, **flags)

shutil.copytree(
Path(small_sky_catalog.catalog_path) / "dataset" / "Norder=0",
tmp_path / "dataset" / "Norder=0",
)
shutil.copytree(small_sky_catalog.catalog_path / "dataset", tmp_path / "dataset", dirs_exist_ok=True)
assert is_valid_catalog(tmp_path, **flags)


@@ -127,7 +121,6 @@ def test_is_valid_catalog_verbose_fail(tmp_path, capsys):
captured = capsys.readouterr().out
assert "Validating catalog at path" in captured
assert "properties file does not exist or is invalid" in captured
assert "partition_info.csv file does not exist" in captured
assert "_metadata file does not exist" in captured
assert "_common_metadata file does not exist" in captured

@@ -145,10 +138,16 @@ def test_is_valid_catalog_verbose_success(small_sky_dir, capsys):
captured = capsys.readouterr().out
assert "Validating catalog at path" in captured
assert "Found 1 partition" in captured
assert "Approximate coverage is 3437.75 sq deg" in captured
assert "Approximate coverage is 8" in captured


def test_valid_catalog_strict_all(small_sky_source_dir, small_sky_order1_dir, small_sky_dir):
def test_valid_catalog_strict_all(
small_sky_source_dir,
small_sky_order1_dir,
small_sky_dir,
small_sky_source_object_index_dir,
margin_catalog_path,
):
"""Check that all of our object catalogs in test data are valid, using strict mechanism"""
flags = {
"strict": True, # more intensive checks
@@ -158,3 +157,5 @@
assert is_valid_catalog(small_sky_source_dir, **flags)
assert is_valid_catalog(small_sky_order1_dir, **flags)
assert is_valid_catalog(small_sky_dir, **flags)
assert is_valid_catalog(small_sky_source_object_index_dir, **flags)
assert is_valid_catalog(margin_catalog_path, **flags)