diff --git a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py index d9b98025..18374e37 100644 --- a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py +++ b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py @@ -79,18 +79,29 @@ def _get_pixel_tree_from_pixels(pixels: PixelInputTypes) -> PixelTree: @classmethod def _read_args( - cls, catalog_base_dir: FilePointer, storage_options: Union[Dict[Any, Any], None] = None + cls, + catalog_base_dir: FilePointer, + storage_options: Union[Dict[Any, Any], None] = None, ) -> Tuple[CatalogInfoClass, PartitionInfo]: args = super()._read_args(catalog_base_dir, storage_options=storage_options) - partition_info_file = paths.get_partition_info_pointer(catalog_base_dir) - partition_info = PartitionInfo.read_from_file(partition_info_file, storage_options=storage_options) + metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir) + if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options): + partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options) + else: + partition_info_file = paths.get_partition_info_pointer(catalog_base_dir) + partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options) return args + (partition_info,) @classmethod - def _check_files_exist( - cls, catalog_base_dir: FilePointer, storage_options: Union[Dict[Any, Any], None] = None - ): + def _check_files_exist(cls, catalog_base_dir: FilePointer, storage_options: dict = None): super()._check_files_exist(catalog_base_dir, storage_options=storage_options) + partition_info_file = paths.get_partition_info_pointer(catalog_base_dir) - if not file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options): - raise FileNotFoundError(f"No partition info found where expected: {str(partition_info_file)}") + metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir) + if not ( + file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options) + or file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options) + ): + raise FileNotFoundError( + f"_metadata or partition info file is required in catalog directory {catalog_base_dir}" + ) diff --git a/src/hipscat/catalog/partition_info.py b/src/hipscat/catalog/partition_info.py index b87a3006..49a3ce9b 100644 --- a/src/hipscat/catalog/partition_info.py +++ b/src/hipscat/catalog/partition_info.py @@ -1,10 +1,18 @@ """Container class to hold per-partition metadata""" -from typing import Any, Dict, List, Union +from __future__ import annotations + +from typing import List import numpy as np import pandas as pd +import pyarrow as pa from hipscat.io import FilePointer, file_io +from hipscat.io.parquet_metadata import ( + read_row_group_fragments, + row_group_stat_single_value, + write_parquet_metadata_for_batches, +) from hipscat.pixel_math import HealpixPixel @@ -44,15 +52,59 @@ def write_to_file(self, partition_info_file: FilePointer): """ file_io.write_dataframe_to_csv(self.as_dataframe(), partition_info_file, index=False) + def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None): + """Generate parquet metadata, using the known partitions. 
+ + Args: + catalog_path (FilePointer): base path for the catalog + storage_options (dict): dictionary that contains abstract filesystem credentials + """ + batches = [ + pa.RecordBatch.from_arrays( + [[pixel.order], [pixel.dir], [pixel.pixel]], + names=[ + self.METADATA_ORDER_COLUMN_NAME, + self.METADATA_DIR_COLUMN_NAME, + self.METADATA_PIXEL_COLUMN_NAME, + ], + ) + for pixel in self.get_healpix_pixels() + ] + + write_parquet_metadata_for_batches(batches, catalog_path, storage_options) + + @classmethod + def read_from_file(cls, metadata_file: FilePointer, storage_options: dict = None) -> PartitionInfo: + """Read partition info from a `_metadata` file to create an object + + Args: + metadata_file (FilePointer): FilePointer to the `_metadata` file + storage_options (dict): dictionary that contains abstract filesystem credentials + + Returns: + A `PartitionInfo` object with the data from the file + """ + pixel_list = [ + HealpixPixel( + row_group_stat_single_value(row_group, cls.METADATA_ORDER_COLUMN_NAME), + row_group_stat_single_value(row_group, cls.METADATA_PIXEL_COLUMN_NAME), + ) + for row_group in read_row_group_fragments(metadata_file, storage_options) + ] + ## Remove duplicates, preserving order. + ## In the case of association partition join info, we may have multiple entries + ## for the primary order/pixels. + pixel_list = list(dict.fromkeys(pixel_list)) + + return cls(pixel_list) + @classmethod - def read_from_file( - cls, partition_info_file: FilePointer, storage_options: Union[Dict[Any, Any], None] = None - ): + def read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict = None) -> PartitionInfo: """Read partition info from a `partition_info.csv` file to create an object Args: - partition_info_file: FilePointer to the `partition_info.csv` file - storage_options: dictionary that contains abstract filesystem credentials + partition_info_file (FilePointer): FilePointer to the `partition_info.csv` file + storage_options (dict): dictionary that contains abstract filesystem credentials Returns: A `PartitionInfo` object with the data from the file @@ -92,7 +144,7 @@ def as_dataframe(self): return pd.DataFrame.from_dict(partition_info_dict) @classmethod - def from_healpix(cls, healpix_pixels: List[HealpixPixel]): + def from_healpix(cls, healpix_pixels: List[HealpixPixel]) -> PartitionInfo: """Create a partition info object from a list of constituent healpix pixels. 
Args: diff --git a/src/hipscat/io/__init__.py b/src/hipscat/io/__init__.py index 3f046d75..78098bf5 100644 --- a/src/hipscat/io/__init__.py +++ b/src/hipscat/io/__init__.py @@ -1,6 +1,11 @@ """Utilities for reading and writing catalog files""" from .file_io import FilePointer, get_file_pointer_from_path +from .parquet_metadata import ( + read_row_group_fragments, + row_group_stat_single_value, + write_parquet_metadata_for_batches, +) from .paths import ( create_hive_directory_name, create_hive_parquet_file_name, diff --git a/src/hipscat/io/parquet_metadata.py b/src/hipscat/io/parquet_metadata.py new file mode 100644 index 00000000..9aa84b3c --- /dev/null +++ b/src/hipscat/io/parquet_metadata.py @@ -0,0 +1,122 @@ +"""Utility functions for handling parquet metadata files""" +import tempfile +from typing import List + +import pyarrow as pa +import pyarrow.dataset as pds +import pyarrow.parquet as pq + +from hipscat.io import file_io, paths +from hipscat.io.file_io.file_pointer import get_fs, strip_leading_slash_for_pyarrow + + +def row_group_stat_single_value(row_group, stat_key: str): + """Convenience method to find the min and max inside a statistics dictionary, + and raise an error if they're unequal. + + Args: + row_group: dataset fragment row group + stat_key (str): column name of interest. + Returns: + The value of the specified row group statistic + """ + if stat_key not in row_group.statistics: + raise ValueError(f"row group doesn't have expected key {stat_key}") + stat_dict = row_group.statistics[stat_key] + min_val = stat_dict["min"] + max_val = stat_dict["max"] + if min_val != max_val: + raise ValueError(f"stat min != max ({min_val} != {max_val})") + return min_val + + +def write_parquet_metadata(catalog_path: str, storage_options: dict = None, output_path: str = None): + """Generate parquet metadata, using the already-partitioned parquet files + for this catalog. + + For more information on the general parquet metadata files, and why we write them, see + https://arrow.apache.org/docs/python/parquet.html#writing-metadata-and-common-metadata-files + + Args: + catalog_path (str): base path for the catalog + storage_options: dictionary that contains abstract filesystem credentials + output_path (str): base path for writing out metadata files + defaults to `catalog_path` if unspecified + """ + ignore_prefixes = [ + "intermediate", + "_common_metadata", + "_metadata", + ] + + dataset = file_io.read_parquet_dataset( + catalog_path, + storage_options=storage_options, + ignore_prefixes=ignore_prefixes, + exclude_invalid_files=True, + ) + metadata_collector = [] + + for hips_file in dataset.files: + hips_file_pointer = file_io.get_file_pointer_from_path(hips_file, include_protocol=catalog_path) + single_metadata = file_io.read_parquet_metadata(hips_file_pointer, storage_options=storage_options) + + # Users must set the file path of each chunk before combining the metadata. 
+ relative_path = hips_file[len(catalog_path) :] + single_metadata.set_file_path(relative_path) + metadata_collector.append(single_metadata) + + ## Write out the two metadata files + if output_path is None: + output_path = catalog_path + catalog_base_dir = file_io.get_file_pointer_from_path(output_path) + metadata_file_pointer = paths.get_parquet_metadata_pointer(catalog_base_dir) + common_metadata_file_pointer = paths.get_common_metadata_pointer(catalog_base_dir) + + file_io.write_parquet_metadata( + dataset.schema, + metadata_file_pointer, + metadata_collector=metadata_collector, + write_statistics=True, + storage_options=storage_options, + ) + file_io.write_parquet_metadata( + dataset.schema, common_metadata_file_pointer, storage_options=storage_options + ) + + +def write_parquet_metadata_for_batches( + batches: List[pa.RecordBatch], output_path: str = None, storage_options: dict = None +): + """Write parquet metadata files for some pyarrow table batches. + This writes the batches to a temporary parquet dataset using local storage, and + generates the metadata for the partitioned catalog parquet files. + + Args: + batches (List[pa.RecordBatch]): create one batch per group of data (partition or row group) + output_path (str): base path for writing out metadata files + defaults to `catalog_path` if unspecified + storage_options: dictionary that contains abstract filesystem credentials + """ + + temp_info_table = pa.Table.from_batches(batches) + + with tempfile.TemporaryDirectory() as temp_pq_file: + pq.write_to_dataset(temp_info_table, temp_pq_file) + write_parquet_metadata(temp_pq_file, storage_options=storage_options, output_path=output_path) + + +def read_row_group_fragments(metadata_file: str, storage_options: dict = None): + """Generator for metadata fragment row groups in a parquet metadata file. + + Args: + metadata_file (str): path to `_metadata` file. 
+ storage_options: dictionary that contains abstract filesystem credentials + """ + file_system, dir_pointer = get_fs(file_pointer=metadata_file, storage_options=storage_options) + dir_pointer = strip_leading_slash_for_pyarrow(dir_pointer, file_system.protocol) + dataset = pds.parquet_dataset(dir_pointer, filesystem=file_system) + + for frag in dataset.get_fragments(): + for row_group in frag.row_groups: + yield row_group diff --git a/src/hipscat/io/validation.py b/src/hipscat/io/validation.py index 51339bdb..877e7a98 100644 --- a/src/hipscat/io/validation.py +++ b/src/hipscat/io/validation.py @@ -1,5 +1,5 @@ from hipscat.catalog.dataset.catalog_info_factory import from_catalog_dir -from hipscat.io import get_partition_info_pointer +from hipscat.io import get_parquet_metadata_pointer, get_partition_info_pointer from hipscat.io.file_io.file_pointer import FilePointer, is_regular_file @@ -7,20 +7,20 @@ def is_valid_catalog(pointer: FilePointer) -> bool: """Checks if a catalog is valid for a given base catalog pointer Args: - pointer: pointer to base catalog directory + pointer (FilePointer): pointer to base catalog directory Returns: True if both the catalog_info and partition_info files are valid, False otherwise """ - return is_catalog_info_valid(pointer) and is_partition_info_valid(pointer) + return is_catalog_info_valid(pointer) and (is_partition_info_valid(pointer) or is_metadata_valid(pointer)) -def is_catalog_info_valid(pointer): +def is_catalog_info_valid(pointer: FilePointer) -> bool: """Checks if catalog_info is valid for a given base catalog pointer Args: - pointer: pointer to base catalog directory + pointer (FilePointer): pointer to base catalog directory Returns: True if the catalog_info file exists, and it is correctly formatted, @@ -34,11 +34,11 @@ def is_catalog_info_valid(pointer): return is_valid -def is_partition_info_valid(pointer): +def is_partition_info_valid(pointer: FilePointer) -> bool: """Checks if partition_info is valid for a given base catalog pointer Args: - pointer: pointer to base catalog directory + pointer (FilePointer): pointer to base catalog directory Returns: True if the partition_info file exists, False otherwise @@ -46,3 +46,17 @@ def is_partition_info_valid(pointer): partition_info_pointer = get_partition_info_pointer(pointer) partition_info_exists = is_regular_file(partition_info_pointer) return partition_info_exists + + +def is_metadata_valid(pointer: FilePointer) -> bool: + """Checks if _metadata is valid for a given base catalog pointer + + Args: + pointer (FilePointer): pointer to base catalog directory + + Returns: + True if the _metadata file exists, False otherwise + """ + metadata_file = get_parquet_metadata_pointer(pointer) + metadata_file_exists = is_regular_file(metadata_file) + return metadata_file_exists diff --git a/src/hipscat/io/write_metadata.py b/src/hipscat/io/write_metadata.py index d1ed831c..37fc61ce 100644 --- a/src/hipscat/io/write_metadata.py +++ b/src/hipscat/io/write_metadata.py @@ -10,6 +10,7 @@ import pandas as pd from hipscat.io import file_io, paths +from hipscat.io.parquet_metadata import write_parquet_metadata as wpm def write_json_file( @@ -116,41 +117,10 @@ def write_parquet_metadata(catalog_path, storage_options: Union[Dict[Any, Any], catalog_path (str): base path for the catalog storage_options: dictionary that contains abstract filesystem credentials """ - - ignore_prefixes = [ - "intermediate", - "_common_metadata", - "_metadata", - ] - - dataset = file_io.read_parquet_dataset( - catalog_path, - 
storage_options=storage_options, - ignore_prefixes=ignore_prefixes, - exclude_invalid_files=True, - ) - metadata_collector = [] - - for hips_file in dataset.files: - hips_file_pointer = file_io.get_file_pointer_from_path(hips_file, include_protocol=catalog_path) - single_metadata = file_io.read_parquet_metadata(hips_file_pointer, storage_options=storage_options) - relative_path = hips_file[len(catalog_path) :] - single_metadata.set_file_path(relative_path) - metadata_collector.append(single_metadata) - - ## Write out the two metadata files - catalog_base_dir = file_io.get_file_pointer_from_path(catalog_path) - metadata_file_pointer = paths.get_parquet_metadata_pointer(catalog_base_dir) - common_metadata_file_pointer = paths.get_common_metadata_pointer(catalog_base_dir) - - file_io.write_parquet_metadata( - dataset.schema, - metadata_file_pointer, - metadata_collector=metadata_collector, + wpm( + catalog_path=catalog_path, storage_options=storage_options, - ) - file_io.write_parquet_metadata( - dataset.schema, common_metadata_file_pointer, storage_options=storage_options + output_path=catalog_path, ) diff --git a/src/hipscat/pixel_math/healpix_pixel.py b/src/hipscat/pixel_math/healpix_pixel.py index 1b91e84e..08e578fa 100644 --- a/src/hipscat/pixel_math/healpix_pixel.py +++ b/src/hipscat/pixel_math/healpix_pixel.py @@ -53,7 +53,7 @@ def convert_to_lower_order(self, delta_order: int) -> HealpixPixel: if delta_order < 0: raise ValueError("delta order cannot be below zero") new_order = self.order - delta_order - new_pixel = math.floor(self.pixel / 4**delta_order) + new_pixel = math.floor(self.pixel / 4 ** delta_order) return HealpixPixel(new_order, new_pixel) def convert_to_higher_order(self, delta_order: int) -> List[HealpixPixel]: @@ -77,6 +77,21 @@ def convert_to_higher_order(self, delta_order: int) -> List[HealpixPixel]: raise ValueError("delta order cannot be below zero") pixels = [] new_order = self.order + delta_order - for new_pixel in range(self.pixel * 4**delta_order, (self.pixel + 1) * 4**delta_order): + for new_pixel in range(self.pixel * 4 ** delta_order, (self.pixel + 1) * 4 ** delta_order): pixels.append(HealpixPixel(new_order, new_pixel)) return pixels + + @property + def dir(self) -> int: + """Directory number for the pixel. + + This is necessary for file systems that limit to 10,000 subdirectories. 
+ The directory name will take the HiPS standard form of:: + + /Norder=/Dir= + + Where the directory number is calculated using integer division as:: + + (pixel_number/10000)*10000 + """ + return int(self.pixel / 10_000) * 10_000 diff --git a/tests/conftest.py b/tests/conftest.py index e07e6c76..58595a72 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -216,6 +216,32 @@ def source_catalog_info_file(test_data_dir) -> str: return os.path.join(test_data_dir, "small_sky_source", "catalog_info.json") +@pytest.fixture +def small_sky_source_dir(test_data_dir) -> str: + return os.path.join(test_data_dir, "small_sky_source") + + +@pytest.fixture +def small_sky_source_pixels(): + """Source catalog pixels""" + return [ + HealpixPixel(0, 4), + HealpixPixel(1, 47), + HealpixPixel(2, 176), + HealpixPixel(2, 177), + HealpixPixel(2, 178), + HealpixPixel(2, 179), + HealpixPixel(2, 180), + HealpixPixel(2, 181), + HealpixPixel(2, 182), + HealpixPixel(2, 183), + HealpixPixel(2, 184), + HealpixPixel(2, 185), + HealpixPixel(2, 186), + HealpixPixel(2, 187), + ] + + @pytest.fixture def association_catalog_info(association_catalog_info_data) -> AssociationCatalogInfo: return AssociationCatalogInfo(**association_catalog_info_data) diff --git a/tests/data/pixel_trees/aligned_2_3_inner.csv b/tests/data/pixel_trees/aligned_2_3_inner.csv deleted file mode 100644 index 696dc40e..00000000 --- a/tests/data/pixel_trees/aligned_2_3_inner.csv +++ /dev/null @@ -1,11 +0,0 @@ -Norder,Dir,Npix -1,0,33 -1,0,35 -1,0,40 -1,0,42 -1,0,43 -1,0,44 -1,0,46 -2,0,128 -2,0,130 -2,0,131 \ No newline at end of file diff --git a/tests/data/pixel_trees/aligned_2_3_left.csv b/tests/data/pixel_trees/aligned_2_3_left.csv deleted file mode 100644 index a7347b36..00000000 --- a/tests/data/pixel_trees/aligned_2_3_left.csv +++ /dev/null @@ -1,13 +0,0 @@ -Norder,Dir,Npix -1,0,33 -1,0,35 -1,0,40 -1,0,41 -1,0,42 -1,0,43 -1,0,44 -1,0,45 -1,0,46 -2,0,128 -2,0,130 -2,0,131 \ No newline at end of file diff --git a/tests/data/pixel_trees/aligned_2_3_outer.csv b/tests/data/pixel_trees/aligned_2_3_outer.csv deleted file mode 100644 index 6902c1b5..00000000 --- a/tests/data/pixel_trees/aligned_2_3_outer.csv +++ /dev/null @@ -1,18 +0,0 @@ -Norder,Dir,Npix -1,0,33 -1,0,34 -1,0,35 -1,0,36 -1,0,37 -1,0,40 -1,0,41 -1,0,42 -1,0,43 -1,0,44 -1,0,45 -1,0,46 -1,0,47 -2,0,128 -2,0,129 -2,0,130 -2,0,131 \ No newline at end of file diff --git a/tests/data/pixel_trees/aligned_2_3_right.csv b/tests/data/pixel_trees/aligned_2_3_right.csv deleted file mode 100644 index df302ddc..00000000 --- a/tests/data/pixel_trees/aligned_2_3_right.csv +++ /dev/null @@ -1,16 +0,0 @@ -Norder,Dir,Npix -1,0,33 -1,0,34 -1,0,35 -1,0,36 -1,0,37 -1,0,40 -1,0,42 -1,0,43 -1,0,44 -1,0,46 -1,0,47 -2,0,128 -2,0,129 -2,0,130 -2,0,131 \ No newline at end of file diff --git a/tests/data/small_sky/_common_metadata b/tests/data/small_sky/_common_metadata new file mode 100644 index 00000000..a1505a28 Binary files /dev/null and b/tests/data/small_sky/_common_metadata differ diff --git a/tests/data/small_sky/_metadata b/tests/data/small_sky/_metadata new file mode 100644 index 00000000..63b03188 Binary files /dev/null and b/tests/data/small_sky/_metadata differ diff --git a/tests/data/small_sky/partition_info.csv b/tests/data/small_sky/partition_info.csv deleted file mode 100644 index ed015721..00000000 --- a/tests/data/small_sky/partition_info.csv +++ /dev/null @@ -1,2 +0,0 @@ -Norder,Dir,Npix,num_rows -0,0,11,131 diff --git a/tests/data/small_sky_order1/_common_metadata 
b/tests/data/small_sky_order1/_common_metadata new file mode 100644 index 00000000..06ad5441 Binary files /dev/null and b/tests/data/small_sky_order1/_common_metadata differ diff --git a/tests/data/small_sky_order1/_metadata b/tests/data/small_sky_order1/_metadata new file mode 100644 index 00000000..dddd6754 Binary files /dev/null and b/tests/data/small_sky_order1/_metadata differ diff --git a/tests/data/small_sky_order1/partition_info.csv b/tests/data/small_sky_order1/partition_info.csv deleted file mode 100644 index d15927f2..00000000 --- a/tests/data/small_sky_order1/partition_info.csv +++ /dev/null @@ -1,5 +0,0 @@ -Norder,Dir,Npix,num_rows -1,0,44,42 -1,0,45,29 -1,0,46,42 -1,0,47,18 diff --git a/tests/data/small_sky_source/_common_metadata b/tests/data/small_sky_source/_common_metadata new file mode 100644 index 00000000..aa1aa280 Binary files /dev/null and b/tests/data/small_sky_source/_common_metadata differ diff --git a/tests/data/small_sky_source/_metadata b/tests/data/small_sky_source/_metadata new file mode 100644 index 00000000..f5083342 Binary files /dev/null and b/tests/data/small_sky_source/_metadata differ diff --git a/tests/data/small_sky_source/partition_info.csv b/tests/data/small_sky_source/partition_info.csv deleted file mode 100644 index 7a5f4e9f..00000000 --- a/tests/data/small_sky_source/partition_info.csv +++ /dev/null @@ -1,15 +0,0 @@ -Norder,Dir,Npix,num_rows -0,0,4,50 -1,0,47,2395 -2,0,176,385 -2,0,177,1510 -2,0,178,1634 -2,0,179,1773 -2,0,180,655 -2,0,181,903 -2,0,182,1246 -2,0,183,1143 -2,0,184,1390 -2,0,185,2942 -2,0,186,452 -2,0,187,683 diff --git a/tests/data/small_sky_source/point_map.fits b/tests/data/small_sky_source/point_map.fits new file mode 100644 index 00000000..e0ac82b9 Binary files /dev/null and b/tests/data/small_sky_source/point_map.fits differ diff --git a/tests/data/small_sky_to_small_sky_order1/_common_metadata b/tests/data/small_sky_to_small_sky_order1/_common_metadata new file mode 100644 index 00000000..94381647 Binary files /dev/null and b/tests/data/small_sky_to_small_sky_order1/_common_metadata differ diff --git a/tests/data/small_sky_to_small_sky_order1/_metadata b/tests/data/small_sky_to_small_sky_order1/_metadata new file mode 100644 index 00000000..44e48186 Binary files /dev/null and b/tests/data/small_sky_to_small_sky_order1/_metadata differ diff --git a/tests/hipscat/catalog/test_catalog.py b/tests/hipscat/catalog/test_catalog.py index 51bbe0b1..67e2d881 100644 --- a/tests/hipscat/catalog/test_catalog.py +++ b/tests/hipscat/catalog/test_catalog.py @@ -140,7 +140,7 @@ def test_empty_directory(tmp_path): os.makedirs(catalog_path, exist_ok=True) ## Path exists but there's nothing there - with pytest.raises(FileNotFoundError): + with pytest.raises(FileNotFoundError, match="catalog info"): Catalog.read_from_hipscat(catalog_path) ## catalog_info file exists - getting closer @@ -148,13 +148,12 @@ def test_empty_directory(tmp_path): with open(file_name, "w", encoding="utf-8") as metadata_file: metadata_file.write('{"catalog_name":"empty", "catalog_type":"source"}') - with pytest.raises(FileNotFoundError): + with pytest.raises(FileNotFoundError, match="metadata"): Catalog.read_from_hipscat(catalog_path) - ## partition_info file exists - enough to create a catalog - file_name = os.path.join(catalog_path, "partition_info.csv") - with open(file_name, "w", encoding="utf-8") as metadata_file: - metadata_file.write("Norder,Dir,Npix") + ## Now we create the needed _metadata and everything is right. 
+ part_info = PartitionInfo.from_healpix([HealpixPixel(0, 11)]) + part_info.write_to_metadata_files(catalog_path=catalog_path) catalog = Catalog.read_from_hipscat(catalog_path) assert catalog.catalog_name == "empty" diff --git a/tests/hipscat/catalog/test_partition_info.py b/tests/hipscat/catalog/test_partition_info.py index a0c2a8a6..3a5d5121 100644 --- a/tests/hipscat/catalog/test_partition_info.py +++ b/tests/hipscat/catalog/test_partition_info.py @@ -10,7 +10,7 @@ def test_load_partition_info_small_sky(small_sky_dir): """Instantiate the partition info for catalog with 1 pixel""" - partition_info_file = paths.get_partition_info_pointer(small_sky_dir) + partition_info_file = paths.get_parquet_metadata_pointer(small_sky_dir) partitions = PartitionInfo.read_from_file(partition_info_file) order_pixel_pairs = partitions.get_healpix_pixels() @@ -19,9 +19,22 @@ def test_load_partition_info_small_sky(small_sky_dir): assert order_pixel_pairs == expected +def test_load_partition_info_from_metadata(small_sky_dir, small_sky_source_dir, small_sky_source_pixels): + """Instantiate the partition info for catalogs via the `_metadata` file""" + metadata_file = paths.get_parquet_metadata_pointer(small_sky_dir) + partitions = PartitionInfo.read_from_file(metadata_file) + + assert partitions.get_healpix_pixels() == [HealpixPixel(0, 11)] + + metadata_file = paths.get_parquet_metadata_pointer(small_sky_source_dir) + partitions = PartitionInfo.read_from_file(metadata_file) + + assert partitions.get_healpix_pixels() == small_sky_source_pixels + + def test_load_partition_info_small_sky_order1(small_sky_order1_dir): """Instantiate the partition info for catalog with 4 pixels""" - partition_info_file = paths.get_partition_info_pointer(small_sky_order1_dir) + partition_info_file = paths.get_parquet_metadata_pointer(small_sky_order1_dir) partitions = PartitionInfo.read_from_file(partition_info_file) order_pixel_pairs = partitions.get_healpix_pixels() @@ -36,15 +49,20 @@ def test_load_partition_info_small_sky_order1(small_sky_order1_dir): def test_load_partition_no_file(tmp_path): - wrong_path = os.path.join(tmp_path, "wrong.csv") + wrong_path = os.path.join(tmp_path, "_metadata") wrong_pointer = file_io.get_file_pointer_from_path(wrong_path) with pytest.raises(FileNotFoundError): PartitionInfo.read_from_file(wrong_pointer) + wrong_path = os.path.join(tmp_path, "partition_info.csv") + wrong_pointer = file_io.get_file_pointer_from_path(wrong_path) + with pytest.raises(FileNotFoundError): + PartitionInfo.read_from_csv(wrong_pointer) + def test_get_highest_order(small_sky_order1_dir): """test the `get_highest_order` method""" - partition_info_file = paths.get_partition_info_pointer(small_sky_order1_dir) + partition_info_file = paths.get_parquet_metadata_pointer(small_sky_order1_dir) partitions = PartitionInfo.read_from_file(partition_info_file) highest_order = partitions.get_highest_order() @@ -54,10 +72,10 @@ def test_get_highest_order(small_sky_order1_dir): def test_write_to_file(tmp_path, small_sky_pixels): """Write out the partition info to file and make sure we can read it again.""" - partition_info_pointer = paths.get_partition_info_pointer(tmp_path) + partition_info_pointer = paths.get_parquet_metadata_pointer(tmp_path) partition_info = PartitionInfo.from_healpix(small_sky_pixels) partition_info.write_to_file(partition_info_pointer) - new_partition_info = PartitionInfo.read_from_file(partition_info_pointer) + new_partition_info = PartitionInfo.read_from_csv(partition_info_pointer) assert 
partition_info.get_healpix_pixels() == new_partition_info.get_healpix_pixels() diff --git a/tests/hipscat/io/conftest.py b/tests/hipscat/io/conftest.py index 1e787eda..2f154357 100644 --- a/tests/hipscat/io/conftest.py +++ b/tests/hipscat/io/conftest.py @@ -2,9 +2,11 @@ import re +import numpy.testing as npt import pyarrow as pa import pytest +from hipscat.io import file_io from hipscat.io.file_io.file_io import load_text_file from hipscat.io.file_io.file_pointer import does_file_or_directory_exist @@ -58,3 +60,32 @@ def basic_catalog_parquet_metadata(): pa.field("__index_level_0__", pa.int64()), ] ) + + +@pytest.fixture +def check_parquet_schema(): + def check_parquet_schema(file_name, expected_schema, expected_num_row_groups=1): + """Check parquet schema against expectations""" + assert file_io.does_file_or_directory_exist(file_name), f"file not found [{file_name}]" + + single_metadata = file_io.read_parquet_metadata(file_name) + schema = single_metadata.schema.to_arrow_schema() + + assert len(schema) == len( + expected_schema + ), f"object list not the same size ({len(schema)} vs {len(expected_schema)})" + + npt.assert_array_equal(schema.names, expected_schema.names) + + assert schema.equals(expected_schema, check_metadata=False) + + parquet_file = file_io.read_parquet_file(file_name) + assert parquet_file.metadata.num_row_groups == expected_num_row_groups + + for row_index in range(0, parquet_file.metadata.num_row_groups): + row_md = parquet_file.metadata.row_group(row_index) + for column_index in range(0, row_md.num_columns): + column_metadata = row_md.column(column_index) + assert column_metadata.file_path.endswith(".parquet") + + return check_parquet_schema diff --git a/tests/hipscat/io/file_io/test_file_io.py b/tests/hipscat/io/file_io/test_file_io.py index 6dbd5d68..005f1935 100644 --- a/tests/hipscat/io/file_io/test_file_io.py +++ b/tests/hipscat/io/file_io/test_file_io.py @@ -8,7 +8,6 @@ from hipscat.io.file_io import ( delete_file, get_file_pointer_from_path, - load_csv_to_pandas, load_json_file, load_parquet_to_pandas, make_directory, @@ -91,14 +90,6 @@ def test_load_json(small_sky_dir): assert loaded_json_dict == json_dict -def test_load_csv_to_pandas(small_sky_dir): - partition_info_path = os.path.join(small_sky_dir, "partition_info.csv") - csv_df = pd.read_csv(partition_info_path) - partition_info_pointer = get_file_pointer_from_path(partition_info_path) - loaded_df = load_csv_to_pandas(partition_info_pointer) - pd.testing.assert_frame_equal(csv_df, loaded_df) - - def test_load_parquet_to_pandas(small_sky_dir): pixel_data_path = pixel_catalog_file(small_sky_dir, 0, 11) parquet_df = pd.read_parquet(pixel_data_path) diff --git a/tests/hipscat/io/file_io/test_file_pointers.py b/tests/hipscat/io/file_io/test_file_pointers.py index bafe41c2..7bc75550 100644 --- a/tests/hipscat/io/file_io/test_file_pointers.py +++ b/tests/hipscat/io/file_io/test_file_pointers.py @@ -49,8 +49,8 @@ def test_append_paths_to_pointer(tmp_path): def test_is_regular_file(small_sky_dir): - partition_info_file = os.path.join(small_sky_dir, "partition_info.csv") - assert is_regular_file(partition_info_file) + catalog_info_file = os.path.join(small_sky_dir, "catalog_info.json") + assert is_regular_file(catalog_info_file) assert not is_regular_file(small_sky_dir) @@ -60,10 +60,10 @@ def test_is_regular_file(small_sky_dir): def test_find_files_matching_path(small_sky_dir): ## no_wildcard - assert len(find_files_matching_path(small_sky_dir, "partition_info.csv")) == 1 + assert 
len(find_files_matching_path(small_sky_dir, "catalog_info.json")) == 1 ## wilcard in the name - assert len(find_files_matching_path(small_sky_dir, "*.csv")) == 1 + assert len(find_files_matching_path(small_sky_dir, "*.json")) == 1 def test_find_files_matching_path_directory(small_sky_order1_dir): @@ -85,12 +85,13 @@ def test_get_directory_contents(small_sky_order1_dir, tmp_path): if not content.startswith("/"): small_sky_contents[i] = f"/{content}" - assert len(small_sky_contents) == 4 + assert len(small_sky_contents) == 5 expected = [ os.path.join(small_sky_order1_dir, "Norder=1"), + os.path.join(small_sky_order1_dir, "_common_metadata"), + os.path.join(small_sky_order1_dir, "_metadata"), os.path.join(small_sky_order1_dir, "catalog_info.json"), - os.path.join(small_sky_order1_dir, "partition_info.csv"), os.path.join(small_sky_order1_dir, "point_map.fits"), ] diff --git a/tests/hipscat/io/test_parquet_metadata.py b/tests/hipscat/io/test_parquet_metadata.py new file mode 100644 index 00000000..b7e2bd18 --- /dev/null +++ b/tests/hipscat/io/test_parquet_metadata.py @@ -0,0 +1,119 @@ +"""Tests of file IO (reads and writes)""" + +import os +import shutil + +import pandas as pd +import pyarrow as pa +import pytest + +from hipscat.io import file_io, paths +from hipscat.io.parquet_metadata import ( + read_row_group_fragments, + row_group_stat_single_value, + write_parquet_metadata, +) + + +def test_write_parquet_metadata( + tmp_path, small_sky_dir, basic_catalog_parquet_metadata, check_parquet_schema +): + """Copy existing catalog and create new metadata files for it""" + catalog_base_dir = os.path.join(tmp_path, "catalog") + shutil.copytree( + small_sky_dir, + catalog_base_dir, + ) + write_parquet_metadata(catalog_base_dir) + check_parquet_schema(os.path.join(catalog_base_dir, "_metadata"), basic_catalog_parquet_metadata) + ## _common_metadata has 0 row groups + check_parquet_schema( + os.path.join(catalog_base_dir, "_common_metadata"), + basic_catalog_parquet_metadata, + 0, + ) + ## Re-write - should still have the same properties. 
+ write_parquet_metadata(catalog_base_dir) + check_parquet_schema(os.path.join(catalog_base_dir, "_metadata"), basic_catalog_parquet_metadata) + ## _common_metadata has 0 row groups + check_parquet_schema( + os.path.join(catalog_base_dir, "_common_metadata"), + basic_catalog_parquet_metadata, + 0, + ) + + +def test_write_parquet_metadata_order1( + tmp_path, small_sky_order1_dir, basic_catalog_parquet_metadata, check_parquet_schema +): + """Copy existing catalog and create new metadata files for it, + using a catalog with multiple files.""" + temp_path = os.path.join(tmp_path, "catalog") + shutil.copytree( + small_sky_order1_dir, + temp_path, + ) + + write_parquet_metadata(temp_path) + ## 4 row groups for 4 partitioned parquet files + check_parquet_schema( + os.path.join(temp_path, "_metadata"), + basic_catalog_parquet_metadata, + 4, + ) + ## _common_metadata has 0 row groups + check_parquet_schema( + os.path.join(temp_path, "_common_metadata"), + basic_catalog_parquet_metadata, + 0, + ) + + +def test_write_index_parquet_metadata(tmp_path, check_parquet_schema): + """Create an index-like catalog, and test metadata creation.""" + temp_path = os.path.join(tmp_path, "index") + + index_parquet_path = os.path.join(temp_path, "Parts=0", "part_000_of_001.parquet") + file_io.make_directory(os.path.join(temp_path, "Parts=0")) + basic_index = pd.DataFrame({"_hipscat_id": [4000, 4001], "ps1_objid": [700, 800]}) + file_io.write_dataframe_to_parquet(basic_index, index_parquet_path) + + index_catalog_parquet_metadata = pa.schema( + [ + pa.field("_hipscat_id", pa.int64()), + pa.field("ps1_objid", pa.int64()), + ] + ) + + write_parquet_metadata(temp_path) + check_parquet_schema(os.path.join(tmp_path, "index", "_metadata"), index_catalog_parquet_metadata) + ## _common_metadata has 0 row groups + check_parquet_schema( + os.path.join(tmp_path, "index", "_common_metadata"), + index_catalog_parquet_metadata, + 0, + ) + + +def test_row_group_fragments(small_sky_order1_dir): + partition_info_file = paths.get_parquet_metadata_pointer(small_sky_order1_dir) + + num_row_groups = 0 + for _ in read_row_group_fragments(partition_info_file): + num_row_groups += 1 + + assert num_row_groups == 4 + + +def test_row_group_stats(small_sky_dir): + partition_info_file = paths.get_parquet_metadata_pointer(small_sky_dir) + first_row_group = next(read_row_group_fragments(partition_info_file)) + + assert row_group_stat_single_value(first_row_group, "Norder") == 0 + assert row_group_stat_single_value(first_row_group, "Npix") == 11 + + with pytest.raises(ValueError, match="doesn't have expected key"): + row_group_stat_single_value(first_row_group, "NOT HERE") + + with pytest.raises(ValueError, match="stat min != max"): + row_group_stat_single_value(first_row_group, "ra") diff --git a/tests/hipscat/io/test_validation.py b/tests/hipscat/io/test_validation.py index a6a94234..03a4d959 100644 --- a/tests/hipscat/io/test_validation.py +++ b/tests/hipscat/io/test_validation.py @@ -16,10 +16,9 @@ def test_is_valid_catalog(tmp_path, small_sky_catalog, small_sky_pixels): write_catalog_info(catalog_dir_pointer, small_sky_catalog.catalog_info) assert not is_valid_catalog(catalog_dir_pointer) - # The catalog is valid if both the catalog_info and partition_info files exist, + # The catalog is valid if both the catalog_info and _metadata files exist, # and the catalog_info is in a valid format - partition_info_pointer = paths.get_partition_info_pointer(catalog_dir_pointer) - 
PartitionInfo.from_healpix(small_sky_pixels).write_to_file(partition_info_pointer) + PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(catalog_dir_pointer) assert is_valid_catalog(catalog_dir_pointer) # A partition_info file alone is also not enough diff --git a/tests/hipscat/io/test_write_metadata.py b/tests/hipscat/io/test_write_metadata.py index 1ee6b910..5f563edc 100644 --- a/tests/hipscat/io/test_write_metadata.py +++ b/tests/hipscat/io/test_write_metadata.py @@ -4,8 +4,6 @@ import shutil import numpy.testing as npt -import pandas as pd -import pyarrow as pa import hipscat.io.write_metadata as io import hipscat.pixel_math as hist @@ -145,7 +143,9 @@ def test_write_partition_info_float(assert_text_file_matches, tmp_path): assert_text_file_matches(expected_lines, metadata_filename) -def test_write_parquet_metadata(tmp_path, small_sky_dir, basic_catalog_parquet_metadata): +def test_write_parquet_metadata( + tmp_path, small_sky_dir, basic_catalog_parquet_metadata, check_parquet_schema +): """Copy existing catalog and create new metadata files for it""" catalog_base_dir = os.path.join(tmp_path, "catalog") shutil.copytree( @@ -171,81 +171,6 @@ def test_write_parquet_metadata(tmp_path, small_sky_dir, basic_catalog_parquet_m ) -def test_write_parquet_metadata_order1(tmp_path, small_sky_order1_dir, basic_catalog_parquet_metadata): - """Copy existing catalog and create new metadata files for it, - using a catalog with multiple files.""" - temp_path = os.path.join(tmp_path, "catalog") - shutil.copytree( - small_sky_order1_dir, - temp_path, - ) - - io.write_parquet_metadata(temp_path) - ## 4 row groups for 4 partitioned parquet files - check_parquet_schema( - os.path.join(temp_path, "_metadata"), - basic_catalog_parquet_metadata, - 4, - ) - ## _common_metadata has 0 row groups - check_parquet_schema( - os.path.join(temp_path, "_common_metadata"), - basic_catalog_parquet_metadata, - 0, - ) - - -def test_write_index_parquet_metadata(tmp_path): - """Create an index-like catalog, and test metadata creation.""" - temp_path = os.path.join(tmp_path, "index") - - index_parquet_path = os.path.join(temp_path, "Parts=0", "part_000_of_001.parquet") - file_io.make_directory(os.path.join(temp_path, "Parts=0")) - basic_index = pd.DataFrame({"_hipscat_id": [4000, 4001], "ps1_objid": [700, 800]}) - file_io.write_dataframe_to_parquet(basic_index, index_parquet_path) - - index_catalog_parquet_metadata = pa.schema( - [ - pa.field("_hipscat_id", pa.int64()), - pa.field("ps1_objid", pa.int64()), - ] - ) - - io.write_parquet_metadata(temp_path) - check_parquet_schema(os.path.join(tmp_path, "index", "_metadata"), index_catalog_parquet_metadata) - ## _common_metadata has 0 row groups - check_parquet_schema( - os.path.join(tmp_path, "index", "_common_metadata"), - index_catalog_parquet_metadata, - 0, - ) - - -def check_parquet_schema(file_name, expected_schema, expected_num_row_groups=1): - """Check parquet schema against expectations""" - assert file_io.does_file_or_directory_exist(file_name), f"file not found [{file_name}]" - - single_metadata = file_io.read_parquet_metadata(file_name) - schema = single_metadata.schema.to_arrow_schema() - - assert len(schema) == len( - expected_schema - ), f"object list not the same size ({len(schema)} vs {len(expected_schema)})" - - npt.assert_array_equal(schema.names, expected_schema.names) - - assert schema.equals(expected_schema, check_metadata=False) - - parquet_file = file_io.read_parquet_file(file_name) - assert parquet_file.metadata.num_row_groups == 
expected_num_row_groups - - for row_index in range(0, parquet_file.metadata.num_row_groups): - row_md = parquet_file.metadata.row_group(row_index) - for column_index in range(0, row_md.num_columns): - column_metadata = row_md.column(column_index) - assert column_metadata.file_path.endswith(".parquet") - - def test_read_write_fits_point_map(tmp_path): """Check that we write and can read a FITS file for spatial distribution.""" initial_histogram = hist.empty_histogram(1) diff --git a/tests/hipscat/pixel_tree/conftest.py b/tests/hipscat/pixel_tree/conftest.py index 171ab7ce..a2b91384 100644 --- a/tests/hipscat/pixel_tree/conftest.py +++ b/tests/hipscat/pixel_tree/conftest.py @@ -2,7 +2,6 @@ import pytest -from hipscat.catalog import PartitionInfo from hipscat.pixel_math import HealpixPixel from hipscat.pixel_tree.pixel_tree_builder import PixelTreeBuilder @@ -52,44 +51,86 @@ def pixel_tree_3(): @pytest.fixture -def aligned_trees_2_3_inner_path(pixel_trees_dir): - return os.path.join(pixel_trees_dir, "aligned_2_3_inner.csv") - - -@pytest.fixture -def aligned_trees_2_3_left_path(pixel_trees_dir): - return os.path.join(pixel_trees_dir, "aligned_2_3_left.csv") - - -@pytest.fixture -def aligned_trees_2_3_right_path(pixel_trees_dir): - return os.path.join(pixel_trees_dir, "aligned_2_3_right.csv") - - -@pytest.fixture -def aligned_trees_2_3_outer_path(pixel_trees_dir): - return os.path.join(pixel_trees_dir, "aligned_2_3_outer.csv") - - -@pytest.fixture -def aligned_trees_2_3_inner(aligned_trees_2_3_inner_path): - partition_info = PartitionInfo.read_from_file(aligned_trees_2_3_inner_path) - return PixelTreeBuilder.from_healpix(partition_info.get_healpix_pixels()) +def aligned_trees_2_3_inner(): + return PixelTreeBuilder.from_healpix( + [ + HealpixPixel(1, 33), + HealpixPixel(1, 35), + HealpixPixel(1, 40), + HealpixPixel(1, 42), + HealpixPixel(1, 43), + HealpixPixel(1, 44), + HealpixPixel(1, 46), + HealpixPixel(2, 128), + HealpixPixel(2, 130), + HealpixPixel(2, 131), + ] + ) @pytest.fixture -def aligned_trees_2_3_left(aligned_trees_2_3_left_path): - partition_info = PartitionInfo.read_from_file(aligned_trees_2_3_left_path) - return PixelTreeBuilder.from_healpix(partition_info.get_healpix_pixels()) +def aligned_trees_2_3_left(): + return PixelTreeBuilder.from_healpix( + [ + HealpixPixel(1, 33), + HealpixPixel(1, 35), + HealpixPixel(1, 40), + HealpixPixel(1, 41), + HealpixPixel(1, 42), + HealpixPixel(1, 43), + HealpixPixel(1, 44), + HealpixPixel(1, 45), + HealpixPixel(1, 46), + HealpixPixel(2, 128), + HealpixPixel(2, 130), + HealpixPixel(2, 131), + ] + ) @pytest.fixture -def aligned_trees_2_3_right(aligned_trees_2_3_right_path): - partition_info = PartitionInfo.read_from_file(aligned_trees_2_3_right_path) - return PixelTreeBuilder.from_healpix(partition_info.get_healpix_pixels()) +def aligned_trees_2_3_right(): + return PixelTreeBuilder.from_healpix( + [ + HealpixPixel(1, 33), + HealpixPixel(1, 34), + HealpixPixel(1, 35), + HealpixPixel(1, 36), + HealpixPixel(1, 37), + HealpixPixel(1, 40), + HealpixPixel(1, 42), + HealpixPixel(1, 43), + HealpixPixel(1, 44), + HealpixPixel(1, 46), + HealpixPixel(1, 47), + HealpixPixel(2, 128), + HealpixPixel(2, 129), + HealpixPixel(2, 130), + HealpixPixel(2, 131), + ] + ) @pytest.fixture -def aligned_trees_2_3_outer(aligned_trees_2_3_outer_path): - partition_info = PartitionInfo.read_from_file(aligned_trees_2_3_outer_path) - return PixelTreeBuilder.from_healpix(partition_info.get_healpix_pixels()) +def aligned_trees_2_3_outer(): + return PixelTreeBuilder.from_healpix( + 
[ + HealpixPixel(1, 33), + HealpixPixel(1, 34), + HealpixPixel(1, 35), + HealpixPixel(1, 36), + HealpixPixel(1, 37), + HealpixPixel(1, 40), + HealpixPixel(1, 41), + HealpixPixel(1, 42), + HealpixPixel(1, 43), + HealpixPixel(1, 44), + HealpixPixel(1, 45), + HealpixPixel(1, 46), + HealpixPixel(1, 47), + HealpixPixel(2, 128), + HealpixPixel(2, 129), + HealpixPixel(2, 130), + HealpixPixel(2, 131), + ] + )
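
Reviewer note (not part of the patch): a minimal sketch of how the `_metadata`-based partition info handling introduced above might be exercised. It assumes a hipscat-formatted catalog at the hypothetical path "./my_catalog" and uses only functions added or modified in this diff.

# Usage sketch, not included in the patch. "./my_catalog" is a hypothetical path.
from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io import paths
from hipscat.pixel_math import HealpixPixel

catalog_base_dir = "./my_catalog"

# Preferred path: load partition info from the catalog's parquet _metadata file.
metadata_pointer = paths.get_parquet_metadata_pointer(catalog_base_dir)
partition_info = PartitionInfo.read_from_file(metadata_pointer)
print(partition_info.get_healpix_pixels())

# Legacy fallback: load from partition_info.csv, if the catalog still ships one.
csv_pointer = paths.get_partition_info_pointer(catalog_base_dir)
partition_info = PartitionInfo.read_from_csv(csv_pointer)

# Writing: regenerate _metadata and _common_metadata from the known pixel list.
new_info = PartitionInfo.from_healpix([HealpixPixel(0, 11)])
new_info.write_to_metadata_files(catalog_path=catalog_base_dir)

This mirrors the fallback order used in HealpixDataset._read_args: the _metadata file is consulted first, and partition_info.csv is only read when no parquet metadata is present.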