Set new and additional properties. #396

Merged
9 commits merged on Sep 30, 2024
7 changes: 0 additions & 7 deletions docs/catalogs/arguments.rst
@@ -150,13 +150,6 @@ You can find the full API documentation for
smaller_table = filter_nonsense(smaller_table)
yield smaller_table.to_pandas()

def provenance_info(self) -> dict:
provenance_info = {
"input_reader_type": "StarrReader",
"chunksize": self.chunksize,
}
return provenance_info

...

args = ImportArguments(
2 changes: 1 addition & 1 deletion docs/guide/index_table.rst
@@ -120,7 +120,7 @@ string sorting will be smart enough to collate the various strings appropriately

.. code-block:: python

divisions = [f"Gaia DR3 {i}" for i in range(10000, 99999, 12)]
divisions = [f"Gaia DR3 {i}" for i in range(10_000, 99_999, 12)]
divisions.append("Gaia DR3 999999988604363776")

Getting hints from ``_metadata``
2 changes: 2 additions & 0 deletions src/.pylintrc
@@ -280,6 +280,8 @@ ignored-parents=
# Maximum number of arguments for function / method.
max-args=10

max-positional-arguments=15

# Maximum number of attributes for a class (see R0902).
max-attributes=20

33 changes: 8 additions & 25 deletions src/hats_import/catalog/arguments.py
@@ -128,40 +128,23 @@ def _check_arguments(self):
# Basic checks complete - make more checks and create directories where necessary
self.input_paths = find_input_paths(self.input_path, "**/*.*", self.input_file_list)

def to_table_properties(self, total_rows: int) -> TableProperties:
def to_table_properties(
self, total_rows: int, highest_order: int, moc_sky_fraction: float
) -> TableProperties:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_artifact_name,
"catalog_type": self.catalog_type,
"total_rows": total_rows,
"ra_column": self.ra_column,
"dec_column": self.dec_column,
}
"hats_cols_sort": self.sort_columns,
"hats_max_rows": self.pixel_threshold,
"hats_order": highest_order,
"moc_sky_fraction": f"{moc_sky_fraction:0.5f}",
} | self.extra_property_dict()
return TableProperties(**info)

def additional_runtime_provenance_info(self) -> dict:
file_reader_info = {"type": self.file_reader}
if isinstance(self.file_reader, InputReader):
file_reader_info = self.file_reader.provenance_info()
return {
"catalog_name": self.output_artifact_name,
"catalog_type": self.catalog_type,
"input_path": self.input_path,
"input_paths": self.input_paths,
"input_file_list": self.input_file_list,
"ra_column": self.ra_column,
"dec_column": self.dec_column,
"use_healpix_29": self.use_healpix_29,
"sort_columns": self.sort_columns,
"constant_healpix_order": self.constant_healpix_order,
"lowest_healpix_order": self.lowest_healpix_order,
"highest_healpix_order": self.highest_healpix_order,
"pixel_threshold": self.pixel_threshold,
"mapping_healpix_order": self.mapping_healpix_order,
"debug_stats_only": self.debug_stats_only,
"file_reader_info": file_reader_info,
}


def check_healpix_order_range(order, field_name, lower_bound=0, upper_bound=hipscat_id.SPATIAL_INDEX_ORDER):
"""Helper method to check if the ``order`` is within the range determined by the
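As a reading aid (not part of the diff): the reworked to_table_properties() is built around a plain dict merge — base fields from the arguments, then `|` with whatever extra_property_dict() returns. The sketch below is a self-contained illustration of that pattern; only the key names are copied from the code above, and every value is a placeholder.

    # Minimal sketch of the dict-merge pattern used in to_table_properties().
    # All values are placeholders; the real code pulls them from the argument dataclass.
    base = {
        "catalog_name": "my_catalog",
        "catalog_type": "object",
        "total_rows": 1_000_000,
        "hats_order": 7,                        # highest destination HEALPix order
        "moc_sky_fraction": f"{0.25123:0.5f}",  # formatted to 5 decimal places
    }
    extra = {"hats_builder": "hats-import v0.x", "obs_regime": "Optical"}  # hypothetical extras

    info = base | extra  # right-hand keys win, so user-supplied properties can override defaults
    print(info["moc_sky_fraction"])  # "0.25123"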
15 changes: 0 additions & 15 deletions src/hats_import/catalog/file_readers.py
@@ -98,21 +98,6 @@ def read(self, input_file, read_columns=None):
DataFrame containing chunk of file info.
"""

def provenance_info(self) -> dict:
"""Create dictionary of parameters for provenance tracking.

If any `storage_options` have been provided as kwargs, we will replace the
value with ``REDACTED`` for the purpose of writing to provenance info, as it
may contain user names or API keys.

Returns:
dictionary with all argument_name -> argument_value as key -> value pairs.
"""
all_args = vars(self)
if "kwargs" in all_args and "storage_options" in all_args["kwargs"]:
all_args["kwargs"]["storage_options"] = "REDACTED"
return {"input_reader_type": type(self).__name__, **vars(self)}

def regular_file_exists(self, input_file, **_kwargs):
"""Check that the `input_file` points to a single regular file

14 changes: 7 additions & 7 deletions src/hats_import/catalog/run_import.py
@@ -7,7 +7,7 @@
import os
import pickle

import hats.io.write_metadata as io
import hats.io.file_io as io
from hats.catalog import PartitionInfo
from hats.io import paths
from hats.io.parquet_metadata import write_parquet_metadata
@@ -122,11 +122,6 @@ def run(args, client):
# All done - write out the metadata
if resume_plan.should_run_finishing:
with resume_plan.print_progress(total=5, stage_name="Finishing") as step_progress:
catalog_info = args.to_table_properties(total_rows)
catalog_info.to_properties_file(args.catalog_path)
step_progress.update(1)
## TODO - optionally write out arguments file
step_progress.update(1)
partition_info = PartitionInfo.from_healpix(resume_plan.get_destination_pixels())
partition_info_file = paths.get_partition_info_pointer(args.catalog_path)
partition_info.write_to_file(partition_info_file)
@@ -140,7 +135,12 @@ def run(args, client):
else:
partition_info.write_to_metadata_files(args.catalog_path)
step_progress.update(1)
io.write_fits_map(args.catalog_path, raw_histogram)
catalog_info = args.to_table_properties(
total_rows, partition_info.get_highest_order(), partition_info.calculate_fractional_coverage()
)
catalog_info.to_properties_file(args.catalog_path)
step_progress.update(1)
io.write_fits_image(raw_histogram, paths.get_point_map_file_pointer(args.catalog_path))
step_progress.update(1)
resume_plan.clean_resume_files()
step_progress.update(1)
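Note why the properties write moved after the partition bookkeeping: two of its new fields, hats_order and moc_sky_fraction, are derived from the final pixel list via partition_info.get_highest_order() and partition_info.calculate_fractional_coverage(). Those helpers live in the hats library; the snippet below only sketches the underlying arithmetic (a HEALPix pixel at order k is one of 12 * 4**k equal-area cells) and is not the library implementation.

    # Back-of-the-envelope version of the two derived fields, from (order, pixel) tuples.
    pixels = [(2, 10), (2, 11), (3, 48)]  # hypothetical destination pixels

    highest_order = max(order for order, _ in pixels)
    sky_fraction = sum(1 / (12 * 4**order) for order, _ in pixels)

    print(highest_order, f"{sky_fraction:0.5f}")  # 3 0.01172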
17 changes: 3 additions & 14 deletions src/hats_import/index/arguments.py
@@ -85,22 +85,11 @@ def to_table_properties(self, total_rows: int) -> TableProperties:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_artifact_name,
"total_rows": total_rows,
"catalog_type": "index",
"total_rows": total_rows,
"primary_catalog": str(self.input_catalog_path),
"indexing_column": self.indexing_column,
}
if len(self.extra_columns) > 0:
info["extra_columns"] = self.extra_columns
"extra_columns": self.extra_columns,
} | self.extra_property_dict()

return TableProperties(**info)

def additional_runtime_provenance_info(self) -> dict:
return {
"input_catalog_path": self.input_catalog_path,
"indexing_column": self.indexing_column,
"extra_columns": self.extra_columns,
"include_healpix_29": self.include_healpix_29,
"include_order_pixel": self.include_order_pixel,
"include_radec": self.include_radec,
}
2 changes: 1 addition & 1 deletion src/hats_import/index/run_index.py
@@ -1,6 +1,6 @@
"""Create columnar index of hats table using dask for parallelization"""

from hats.io import file_io, parquet_metadata, write_metadata
from hats.io import file_io, parquet_metadata

import hats_import.index.map_reduce as mr
from hats_import.index.arguments import IndexArguments
10 changes: 6 additions & 4 deletions src/hats_import/margin_cache/margin_cache.py
@@ -1,5 +1,5 @@
from hats.catalog import PartitionInfo
from hats.io import file_io, parquet_metadata, paths, write_metadata
from hats.io import file_io, parquet_metadata, paths

import hats_import.margin_cache.margin_cache_map_reduce as mcmr
from hats_import.margin_cache.margin_cache_resume_plan import MarginCachePlan
@@ -63,11 +63,13 @@ def generate_margin_cache(args, client):
partition_info = PartitionInfo.read_from_file(metadata_path)
partition_info_file = paths.get_partition_info_pointer(args.catalog_path)
partition_info.write_to_file(partition_info_file)

step_progress.update(1)
margin_catalog_info = args.to_table_properties(int(total_rows))
margin_catalog_info = args.to_table_properties(
int(total_rows),
partition_info.get_highest_order(),
partition_info.calculate_fractional_coverage(),
)
margin_catalog_info.to_properties_file(args.catalog_path)
## TODO - optionally write out arguments file
step_progress.update(1)
file_io.remove_directory(args.tmp_path, ignore_errors=True)
step_progress.update(1)
16 changes: 6 additions & 10 deletions src/hats_import/margin_cache/margin_cache_arguments.py
@@ -75,7 +75,9 @@ def _check_arguments(self):
if margin_pixel_mindist * 60.0 < self.margin_threshold:
raise ValueError("margin pixels must be larger than margin_threshold")

def to_table_properties(self, total_rows) -> TableProperties:
def to_table_properties(
self, total_rows: int, highest_order: int, moc_sky_fraction: float
) -> TableProperties:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_artifact_name,
@@ -85,13 +87,7 @@ def to_table_properties(self, total_rows) -> TableProperties:
"dec_column": self.catalog.catalog_info.dec_column,
"primary_catalog": str(self.input_catalog_path),
"margin_threshold": self.margin_threshold,
}
"hats_order": highest_order,
"moc_sky_fraction": f"{moc_sky_fraction:0.5f}",
} | self.extra_property_dict()
return TableProperties(**info)

def additional_runtime_provenance_info(self) -> dict:
return {
"input_catalog_path": self.input_catalog_path,
"margin_threshold": self.margin_threshold,
"margin_order": self.margin_order,
"debug_filter_pixel_list": self.debug_filter_pixel_list,
}
25 changes: 12 additions & 13 deletions src/hats_import/margin_cache/margin_cache_map_reduce.py
@@ -4,7 +4,6 @@
import pyarrow as pa
import pyarrow.dataset as ds
from hats import pixel_math
from hats.catalog.partition_info import PartitionInfo
from hats.io import file_io, paths
from hats.pixel_math.healpix_pixel import HealpixPixel

@@ -112,22 +111,22 @@ def _to_pixel_shard(
shard_path = paths.pixel_catalog_file(partition_dir, source_pixel)

rename_columns = {
PartitionInfo.METADATA_ORDER_COLUMN_NAME: f"margin_{PartitionInfo.METADATA_ORDER_COLUMN_NAME}",
PartitionInfo.METADATA_DIR_COLUMN_NAME: f"margin_{PartitionInfo.METADATA_DIR_COLUMN_NAME}",
PartitionInfo.METADATA_PIXEL_COLUMN_NAME: f"margin_{PartitionInfo.METADATA_PIXEL_COLUMN_NAME}",
paths.PARTITION_ORDER: paths.MARGIN_ORDER,
paths.PARTITION_DIR: paths.MARGIN_DIR,
paths.PARTITION_PIXEL: paths.MARGIN_PIXEL,
}

margin_data = margin_data.rename(columns=rename_columns)

margin_data[PartitionInfo.METADATA_ORDER_COLUMN_NAME] = pixel.order
margin_data[PartitionInfo.METADATA_DIR_COLUMN_NAME] = pixel.dir
margin_data[PartitionInfo.METADATA_PIXEL_COLUMN_NAME] = pixel.pixel
margin_data[paths.PARTITION_ORDER] = pixel.order
margin_data[paths.PARTITION_DIR] = pixel.dir
margin_data[paths.PARTITION_PIXEL] = pixel.pixel

margin_data = margin_data.astype(
{
PartitionInfo.METADATA_ORDER_COLUMN_NAME: np.uint8,
PartitionInfo.METADATA_DIR_COLUMN_NAME: np.uint64,
PartitionInfo.METADATA_PIXEL_COLUMN_NAME: np.uint64,
paths.PARTITION_ORDER: np.uint8,
paths.PARTITION_DIR: np.uint64,
paths.PARTITION_PIXEL: np.uint64,
}
)
margin_data = margin_data.sort_index()
@@ -152,9 +151,9 @@ def reduce_margin_shards(
schema = file_io.read_parquet_metadata(original_catalog_metadata).schema.to_arrow_schema()

schema = (
schema.append(pa.field("margin_Norder", pa.uint8()))
.append(pa.field("margin_Dir", pa.uint64()))
.append(pa.field("margin_Npix", pa.uint64()))
schema.append(pa.field(paths.MARGIN_ORDER, pa.uint8()))
.append(pa.field(paths.MARGIN_DIR, pa.uint64()))
.append(pa.field(paths.MARGIN_PIXEL, pa.uint64()))
)
data = ds.dataset(shard_dir, format="parquet", schema=schema)
full_df = data.to_table().to_pandas()
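For readers skimming the rename block above: mechanically, the existing partition columns are renamed to their margin_* counterparts, fresh partition columns are stamped with the pixel passed in, and the new columns are retyped. A toy pandas sketch of just that step, assuming the path constants resolve to the literal names used in the old code ("Norder"/"Dir"/"Npix" and "margin_Norder"/"margin_Dir"/"margin_Npix"):

    import numpy as np
    import pandas as pd

    # Toy stand-in for one margin shard.
    margin_data = pd.DataFrame(
        {"Norder": [3, 3], "Dir": [0, 0], "Npix": [511, 512], "flux": [1.2, 3.4]}
    )

    # Keep the original partition assignment under margin_* names ...
    margin_data = margin_data.rename(
        columns={"Norder": "margin_Norder", "Dir": "margin_Dir", "Npix": "margin_Npix"}
    )

    # ... then stamp the partition columns with the pixel this shard is written for (values invented).
    margin_data["Norder"], margin_data["Dir"], margin_data["Npix"] = 3, 0, 510

    margin_data = margin_data.astype({"Norder": np.uint8, "Dir": np.uint64, "Npix": np.uint64})
    print(margin_data.dtypes)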
62 changes: 31 additions & 31 deletions src/hats_import/runtime_arguments.py
@@ -4,12 +4,14 @@

import re
from dataclasses import dataclass
from importlib.metadata import version
from datetime import datetime, timezone
from pathlib import Path

from hats.io import file_io
from upath import UPath

import hats_import

# pylint: disable=too-many-instance-attributes


@@ -22,6 +24,11 @@ class RuntimeArguments:
"""base path where new catalog should be output"""
output_artifact_name: str = ""
"""short, convenient name for the catalog"""
addl_hats_properties: dict = None
"""Any additional keyword arguments you would like to provide when writing
the `properties` file for the final HATS table. e.g.
{"hats_cols_default":"id, mjd", "hats_cols_survey_id":"unique_id",
"creator_did": "ivo://CDS/P/2MASS/J"}"""

## Execution
tmp_dir: str | Path | UPath | None = None
@@ -103,36 +110,19 @@ def _check_arguments(self):
else:
self.resume_tmp = self.tmp_path

def provenance_info(self) -> dict:
"""Fill all known information in a dictionary for provenance tracking.

Returns:
dictionary with all argument_name -> argument_value as key -> value pairs.
"""
runtime_args = {
"catalog_name": self.output_artifact_name,
"output_path": self.output_path,
"output_artifact_name": self.output_artifact_name,
"tmp_dir": self.tmp_dir,
"dask_tmp": self.dask_tmp,
"dask_n_workers": self.dask_n_workers,
"dask_threads_per_worker": self.dask_threads_per_worker,
"catalog_path": self.catalog_path,
"tmp_path": self.tmp_path,
}

runtime_args.update(self.additional_runtime_provenance_info())
provenance_info = {
"tool_name": "hats_import",
"version": version("hats-import"),
"runtime_args": runtime_args,
}

return provenance_info

def additional_runtime_provenance_info(self):
"""Any additional runtime args to be included in provenance info from subclasses"""
return {}
def extra_property_dict(self):
properties = {}
properties["hats_builder"] = f"hats-import v{hats_import.__version__}"

now = datetime.now(tz=timezone.utc)
properties["hats_creation_date"] = now.strftime("%Y-%m-%dT%H:%M%Z")
properties["hats_estsize"] = int(_estimate_dir_size(self.catalog_path) / 1024)
properties["hats_release_date"] = "2024-09-18"
properties["hats_version"] = "v0.1"

if self.addl_hats_properties:
properties = properties | self.addl_hats_properties
return properties


def find_input_paths(input_path="", file_matcher="", input_file_list=None):
@@ -166,3 +156,13 @@ def find_input_paths(input_path="", file_matcher="", input_file_list=None):
if len(input_paths) == 0:
raise FileNotFoundError("No input files found")
return input_paths


def _estimate_dir_size(dir):
total_size = 0
for item in dir.iterdir():
if item.is_dir():
total_size += _estimate_dir_size(item)
else:
total_size += item.stat().st_size
return total_size
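End-to-end effect of the new extra_property_dict(): builder metadata is filled in automatically, and anything supplied as addl_hats_properties is merged on top. The sketch below is a standalone approximation of that merge order, not the method itself; the builder version and size figure are invented, and the example keys come from the docstring above.

    from datetime import datetime, timezone

    def extra_property_dict(addl_hats_properties=None, builder_version="0.x", est_kb=1234):
        # Fixed builder metadata first, then user-supplied properties merged on top
        # (so callers can add keys or override these defaults).
        properties = {
            "hats_builder": f"hats-import v{builder_version}",
            "hats_creation_date": datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M%Z"),
            "hats_estsize": est_kb,
            "hats_release_date": "2024-09-18",
            "hats_version": "v0.1",
        }
        if addl_hats_properties:
            properties = properties | addl_hats_properties
        return properties

    print(extra_property_dict({"hats_cols_default": "id, mjd", "creator_did": "ivo://CDS/P/2MASS/J"}))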
17 changes: 4 additions & 13 deletions src/hats_import/soap/arguments.py
@@ -65,7 +65,7 @@ def _check_arguments(self):
if self.compute_partition_size < 100_000:
raise ValueError("compute_partition_size must be at least 100_000")

def to_table_properties(self, total_rows: int) -> TableProperties:
def to_table_properties(self, total_rows=10, highest_order=4, moc_sky_fraction=22 / 7) -> TableProperties:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_artifact_name,
@@ -78,16 +78,7 @@ def to_table_properties(self, total_rows: int) -> TableProperties:
"join_column_association": "source_id",
"join_catalog": str(self.source_catalog_dir),
"contains_leaf_files": self.write_leaf_files,
}
"hats_order": highest_order,
"moc_sky_fraction": f"{moc_sky_fraction:0.5f}",
} | self.extra_property_dict()
return TableProperties(**info)

def additional_runtime_provenance_info(self) -> dict:
return {
"object_catalog_dir": self.object_catalog_dir,
"object_id_column": self.object_id_column,
"source_catalog_dir": self.source_catalog_dir,
"source_object_id_column": self.source_object_id_column,
"source_id_column": self.source_id_column,
"compute_partition_size": self.compute_partition_size,
"write_leaf_files": self.write_leaf_files,
}