Add basic index functionality (#191)
* Add basic index functionality

* Address lint findings

* More linty stuff that pre-commit is not catching.

* Expanding comments.
delucchi-cmu authored Jan 10, 2024
1 parent 7a25cde commit 6574ad2
Showing 33 changed files with 349 additions and 23 deletions.
57 changes: 57 additions & 0 deletions src/hipscat/catalog/index/index_catalog.py
@@ -0,0 +1,57 @@
from typing import List

import numpy as np
import pyarrow.compute as pc
import pyarrow.dataset as pds
from typing_extensions import TypeAlias

from hipscat.catalog.dataset import Dataset
from hipscat.catalog.index import IndexCatalogInfo
from hipscat.io import paths
from hipscat.pixel_math import HealpixPixel
from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort


class IndexCatalog(Dataset):
    """An index into a HiPSCat catalog, enabling fast lookups on non-spatial values.

    Note that this is not a true "HiPSCat Catalog", as it is not partitioned spatially.
    """

    CatalogInfoClass: TypeAlias = IndexCatalogInfo
    catalog_info: CatalogInfoClass

    def loc_partitions(self, ids) -> List[HealpixPixel]:
        """Find the set of partitions in the primary catalog for the ids provided.

        Args:
            ids: the values of the indexing column (e.g. 87,543)
        Returns:
            partitions of leaf parquet files in the primary catalog
            that may contain rows for the id values
        """
        metadata_file = paths.get_parquet_metadata_pointer(self.catalog_base_dir)
        dataset = pds.parquet_dataset(metadata_file)

        # There's a lot happening in a few pyarrow dataset methods:
        # We create a simple pyarrow expression that roughly corresponds to a SQL statement like
        #   WHERE id_column IN (<ids>)
        # We stay in pyarrow to group by Norder/Npix, aggregating down to the unique values.
        # After that, we convert to pandas, as it handles the integer type conversions
        # (uint8 and uint64 aren't always friendly between pyarrow and the rest of python)
        # and offers easy iteration to create our HealpixPixel list.
        filtered = dataset.filter(pc.field(self.catalog_info.indexing_column).isin(ids)).to_table()
        unique_pixel_dataframe = filtered.group_by(["Norder", "Npix"]).aggregate([]).to_pandas()

        loc_partitions = [
            HealpixPixel(order, pixel)
            for order, pixel in zip(
                unique_pixel_dataframe["Norder"],
                unique_pixel_dataframe["Npix"],
            )
        ]
        # Put the partitions in stable order (by nested healpix ordering).
        argsort = get_pixel_argsort(loc_partitions)
        loc_partitions = np.array(loc_partitions)[argsort]

        return loc_partitions
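
For context, a minimal, self-contained sketch of the pyarrow pattern the comments above describe: filter with an `isin` expression, then `group_by(...).aggregate([])` to keep only the unique Norder/Npix pairs. The in-memory table and its values are illustrative stand-ins (mirroring the test expectations below), not part of this change:

```
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as pds

# Illustrative stand-in for an index catalog's parquet data.
table = pa.table(
    {
        "id": [700, 707, 707],
        "Norder": pa.array([2, 2, 2], type=pa.uint8()),
        "Npix": pa.array([184, 176, 178], type=pa.uint64()),
    }
)
dataset = pds.dataset(table)

# Roughly: SELECT DISTINCT Norder, Npix ... WHERE id IN (707)
filtered = dataset.to_table(filter=pc.field("id").isin([707]))
unique_pixels = filtered.group_by(["Norder", "Npix"]).aggregate([]).to_pandas()
print(unique_pixels)  # two rows: (2, 176) and (2, 178)
```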
2 changes: 2 additions & 0 deletions src/hipscat/pixel_math/healpix_pixel_function.py
@@ -18,6 +18,8 @@ def get_pixel_argsort(pixels: List[HealpixPixel]):
     Returns:
         array of indices that sort the pixels in breadth-first order.
     """
+    if not pixels:
+        return []
     # Construct a parallel list of exploded, high order pixels.
     highest_order = np.max(pixels).order
Binary file modified tests/data/small_sky/Norder=0/Dir=0/Npix=11.parquet
Binary file modified tests/data/small_sky/_common_metadata
Binary file modified tests/data/small_sky/_metadata
14 changes: 5 additions & 9 deletions tests/data/small_sky/catalog_info.json
@@ -1,12 +1,8 @@
 {
     "catalog_name": "small_sky",
-    "catalog_type": "source",
-    "version": "0.0.1",
-    "generation_date": "2022.12.20",
+    "catalog_type": "object",
+    "total_rows": 131,
     "epoch": "J2000",
-    "ra_kw": "ra",
-    "dec_kw": "dec",
-    "id_kw": "id",
-    "total_objects": 131,
-    "pixel_threshold": 1000000
-}
+    "ra_column": "ra",
+    "dec_column": "dec"
+}
Binary file modified tests/data/small_sky/point_map.fits
52 changes: 52 additions & 0 deletions tests/data/small_sky/provenance_info.json
@@ -0,0 +1,52 @@
{
    "catalog_name": "small_sky",
    "catalog_type": "object",
    "total_rows": 131,
    "epoch": "J2000",
    "ra_column": "ra",
    "dec_column": "dec",
    "version": "0.2.1",
    "generation_date": "2024.01.09",
    "tool_args": {
        "tool_name": "hipscat_import",
        "version": "0.2.1",
        "runtime_args": {
            "catalog_name": "small_sky",
            "output_path": "/home/delucchi/git/hipscat/tests/data/",
            "output_artifact_name": "small_sky",
            "tmp_dir": "",
            "overwrite": true,
            "dask_tmp": "",
            "dask_n_workers": 1,
            "dask_threads_per_worker": 1,
            "catalog_path": "/home/delucchi/git/hipscat/tests/data/small_sky",
            "tmp_path": "/home/delucchi/git/hipscat/tests/data/small_sky/intermediate",
            "epoch": "J2000",
            "catalog_type": "object",
            "input_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky",
            "input_paths": [
                "file:///home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky/catalog.csv"
            ],
            "input_format": "csv",
            "input_file_list": [],
            "ra_column": "ra",
            "dec_column": "dec",
            "use_hipscat_index": false,
            "sort_columns": null,
            "constant_healpix_order": -1,
            "highest_healpix_order": 7,
            "pixel_threshold": 1000000,
            "mapping_healpix_order": 7,
            "debug_stats_only": false,
            "file_reader_info": {
                "input_reader_type": "CsvReader",
                "chunksize": 500000,
                "header": "infer",
                "schema_file": null,
                "separator": ",",
                "column_names": null,
                "type_map": {}
            }
        }
    }
}
Binary file modified tests/data/small_sky_order1/Norder=1/Dir=0/Npix=44.parquet
Binary file modified tests/data/small_sky_order1/Norder=1/Dir=0/Npix=45.parquet
Binary file modified tests/data/small_sky_order1/Norder=1/Dir=0/Npix=46.parquet
Binary file modified tests/data/small_sky_order1/Norder=1/Dir=0/Npix=47.parquet
31 changes: 31 additions & 0 deletions tests/data/small_sky_order1/README.md
@@ -0,0 +1,31 @@
# Catalog description

This catalog has the same data points as the other small sky catalogs,
but the import is coerced to spread these data points over partitions at
order 1, instead of order 0.

This means there are 4 leaf partition files, instead of just 1, so the
catalog is useful for confirming reads/writes over multiple leaf partition files.

This catalog was generated with the following snippet:

```
import hipscat_import.pipeline as runner
from hipscat_import.catalog.arguments import ImportArguments

def create_order1():
    args = ImportArguments(
        input_path="tests/hipscat_import/data/small_sky",
        output_path="tests/data",
        input_format="csv",
        output_artifact_name="small_sky_order1",
        constant_healpix_order=1,
    )
    runner.pipeline(args)

if __name__ == "__main__":
    create_order1()
```

NB: Setting `constant_healpix_order` coerces the import pipeline to create
leaf partitions at order 1.
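
For reference, the resulting on-disk layout (abridged), as reflected by the files touched in this change:

```
tests/data/small_sky_order1/
├── _common_metadata
├── _metadata
├── catalog_info.json
├── point_map.fits
├── provenance_info.json
└── Norder=1/Dir=0/
    ├── Npix=44.parquet
    ├── Npix=45.parquet
    ├── Npix=46.parquet
    └── Npix=47.parquet
```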
Binary file modified tests/data/small_sky_order1/_common_metadata
Binary file modified tests/data/small_sky_order1/_metadata
14 changes: 5 additions & 9 deletions tests/data/small_sky_order1/catalog_info.json
@@ -1,12 +1,8 @@
 {
     "catalog_name": "small_sky_order1",
-    "catalog_type": "source",
-    "version": "0.0.0",
-    "generation_date": "2022.12.21",
+    "catalog_type": "object",
+    "total_rows": 131,
     "epoch": "J2000",
-    "ra_kw": "ra",
-    "dec_kw": "dec",
-    "id_kw": "id",
-    "total_objects": 131,
-    "pixel_threshold": 50
-}
+    "ra_column": "ra",
+    "dec_column": "dec"
+}
Binary file modified tests/data/small_sky_order1/point_map.fits
52 changes: 52 additions & 0 deletions tests/data/small_sky_order1/provenance_info.json
@@ -0,0 +1,52 @@
{
    "catalog_name": "small_sky_order1",
    "catalog_type": "object",
    "total_rows": 131,
    "epoch": "J2000",
    "ra_column": "ra",
    "dec_column": "dec",
    "version": "0.2.1",
    "generation_date": "2024.01.09",
    "tool_args": {
        "tool_name": "hipscat_import",
        "version": "0.2.1",
        "runtime_args": {
            "catalog_name": "small_sky_order1",
            "output_path": "tests/data",
            "output_artifact_name": "small_sky_order1",
            "tmp_dir": "",
            "overwrite": true,
            "dask_tmp": "",
            "dask_n_workers": 1,
            "dask_threads_per_worker": 1,
            "catalog_path": "tests/data/small_sky_order1",
            "tmp_path": "tests/data/small_sky_order1/intermediate",
            "epoch": "J2000",
            "catalog_type": "object",
            "input_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky",
            "input_paths": [
                "file:///home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky/catalog.csv"
            ],
            "input_format": "csv",
            "input_file_list": [],
            "ra_column": "ra",
            "dec_column": "dec",
            "use_hipscat_index": false,
            "sort_columns": null,
            "constant_healpix_order": 1,
            "highest_healpix_order": 7,
            "pixel_threshold": 1000000,
            "mapping_healpix_order": 1,
            "debug_stats_only": false,
            "file_reader_info": {
                "input_reader_type": "CsvReader",
                "chunksize": 500000,
                "header": "infer",
                "schema_file": null,
                "separator": ",",
                "column_names": null,
                "type_map": {}
            }
        }
    }
}
Binary file not shown.
Binary file added tests/data/small_sky_order1_id_index/_metadata
8 changes: 8 additions & 0 deletions tests/data/small_sky_order1_id_index/catalog_info.json
@@ -0,0 +1,8 @@
{
    "catalog_name": "small_sky_order1_id_index",
    "catalog_type": "index",
    "total_rows": 131,
    "primary_catalog": "/home/delucchi/git/hipscat/tests/data/small_sky_order1",
    "indexing_column": "id",
    "extra_columns": []
}
Binary file not shown.
31 changes: 31 additions & 0 deletions tests/data/small_sky_order1_id_index/provenance_info.json
@@ -0,0 +1,31 @@
{
    "catalog_name": "small_sky_order1_id_index",
    "catalog_type": "index",
    "total_rows": 131,
    "primary_catalog": "/home/delucchi/git/hipscat/tests/data/small_sky_order1",
    "indexing_column": "id",
    "extra_columns": [],
    "version": "0.2.1",
    "generation_date": "2024.01.09",
    "tool_args": {
        "tool_name": "hipscat_import",
        "version": "0.2.1",
        "runtime_args": {
            "catalog_name": "small_sky_order1_id_index",
            "output_path": "/home/delucchi/git/hipscat/tests/data/",
            "output_artifact_name": "small_sky_order1_id_index",
            "tmp_dir": "",
            "overwrite": true,
            "dask_tmp": "",
            "dask_n_workers": 1,
            "dask_threads_per_worker": 1,
            "catalog_path": "/home/delucchi/git/hipscat/tests/data/small_sky_order1_id_index",
            "tmp_path": "/home/delucchi/git/hipscat/tests/data/small_sky_order1_id_index/intermediate",
            "input_catalog_path": "/home/delucchi/git/hipscat/tests/data/small_sky_order1",
            "indexing_column": "id",
            "extra_columns": [],
            "include_hipscat_index": "False",
            "include_order_pixel": true
        }
    }
}
36 changes: 36 additions & 0 deletions tests/data/small_sky_source_object_index/README.md
@@ -0,0 +1,36 @@
# Catalog description

This catalog exists as an index of the SOURCE table, using the OBJECT ID
as the indexed column. This means you should be able to quickly find
partitions of SOURCES for a given OBJECT ID.

This catalog was generated using the following snippet:

```
import hipscat_import.pipeline as runner
from hipscat_import.index.arguments import IndexArguments

def create_index():
    args = IndexArguments(
        input_catalog_path="./tests/hipscat_import/data/small_sky_source_catalog/",
        indexing_column="object_id",
        output_path="./tests/data/",
        output_artifact_name="small_sky_source_object_index",
        include_hipscat_index=False,
        compute_partition_size=200_000,
    )
    runner.pipeline(args)

if __name__ == "__main__":
    create_index()
```

NB:

- Setting `compute_partition_size` to something less than `1_000_000`
  coerces the import pipeline to create smaller result partitions,
  so we get three distinct index partitions.
- Setting `include_hipscat_index=False` keeps us from needing a row for every
  source, and lets the indexing pipeline create only one row per unique
  object_id/Norder/Npix combination (see the sketch below).
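
A minimal pandas sketch of that deduplication effect; the rows are invented for illustration and are not the actual catalog contents:

```
import pandas as pd

# Several sources can share an object id within the same partition;
# their index rows collapse to a single row.
source_rows = pd.DataFrame(
    {
        "object_id": [700, 700, 700, 707],
        "Norder": [2, 2, 2, 2],
        "Npix": [184, 184, 185, 176],
    }
)
index_rows = source_rows.drop_duplicates()
print(len(index_rows))  # 3 unique object_id/Norder/Npix rows, not 4
```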
Binary file not shown.
Binary file not shown.
8 changes: 8 additions & 0 deletions tests/data/small_sky_source_object_index/catalog_info.json
@@ -0,0 +1,8 @@
{
    "catalog_name": "small_sky_source_object_index",
    "catalog_type": "index",
    "total_rows": 148,
    "primary_catalog": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky_source_catalog/",
    "indexing_column": "object_id",
    "extra_columns": []
}
Binary file not shown.
31 changes: 31 additions & 0 deletions tests/data/small_sky_source_object_index/provenance_info.json
@@ -0,0 +1,31 @@
{
    "catalog_name": "small_sky_source_object_index",
    "catalog_type": "index",
    "total_rows": 148,
    "primary_catalog": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky_source_catalog/",
    "indexing_column": "object_id",
    "extra_columns": [],
    "version": "0.2.1",
    "generation_date": "2024.01.09",
    "tool_args": {
        "tool_name": "hipscat_import",
        "version": "0.2.1",
        "runtime_args": {
            "catalog_name": "small_sky_source_object_index",
            "output_path": "./tests/data/",
            "output_artifact_name": "small_sky_source_object_index",
            "tmp_dir": "",
            "overwrite": true,
            "dask_tmp": "",
            "dask_n_workers": 1,
            "dask_threads_per_worker": 1,
            "catalog_path": "./tests/data/small_sky_source_object_index",
            "tmp_path": "./tests/data/small_sky_source_object_index/intermediate",
            "input_catalog_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky_source_catalog/",
            "indexing_column": "object_id",
            "extra_columns": [],
            "include_hipscat_index": "False",
            "include_order_pixel": true
        }
    }
}
18 changes: 18 additions & 0 deletions tests/hipscat/catalog/index/test_index_catalog.py
@@ -0,0 +1,18 @@
import os

import numpy.testing as npt

from hipscat.catalog.index.index_catalog import IndexCatalog
from hipscat.pixel_math import HealpixPixel


def test_loc_partition(test_data_dir):
    index_catalog_dir = os.path.join(test_data_dir, "small_sky_source_object_index")
    catalog = IndexCatalog.read_from_hipscat(index_catalog_dir)

    assert catalog.on_disk
    assert catalog.catalog_path == index_catalog_dir

    npt.assert_array_equal(catalog.loc_partitions([700]), [HealpixPixel(2, 184)])
    npt.assert_array_equal(catalog.loc_partitions([707]), [HealpixPixel(2, 176), HealpixPixel(2, 178)])
    npt.assert_array_equal(catalog.loc_partitions([900]), [])
5 changes: 4 additions & 1 deletion tests/hipscat/io/conftest.py
@@ -57,7 +57,10 @@ def basic_catalog_parquet_metadata():
             pa.field("dec", pa.float64()),
             pa.field("ra_error", pa.int64()),
             pa.field("dec_error", pa.int64()),
-            pa.field("__index_level_0__", pa.int64()),
+            pa.field("Norder", pa.uint8()),
+            pa.field("Dir", pa.uint64()),
+            pa.field("Npix", pa.uint64()),
+            pa.field("_hipscat_index", pa.uint64()),
         ]
     )
