Skip to content

Commit

Permalink
Replace FilePointer with universal pathlib (#413)
Browse files Browse the repository at this point in the history
* Checkpoint upath

* Remove storage options and clean up imports.

* Better type hints

* Add columns kwarg.

* Add pip install s3fs

* Clear output again.
  • Loading branch information
delucchi-cmu authored Sep 16, 2024
1 parent a94b85b commit cb30209
Show file tree
Hide file tree
Showing 15 changed files with 223 additions and 128 deletions.
2 changes: 1 addition & 1 deletion docs/tutorials/exporting_results.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"\n",
"You can save the catalogs that result from running your workflow to disk, in parquet format, using the `to_hipscat` call. \n",
"\n",
"You must provide a `base_catalog_path`, which is the output path for your catalog directory, and (optionally) a name for your catalog, `catalog_name`. The `catalog_name` is the catalog's internal name and therefore may differ from the catalog's base directory name. If the directory already exists and you want to overwrite its content set the `overwrite` flag to True. Do not forget to provide the necessary credentials, as `storage_options`, when trying to export the catalog to protected remote storage.\n",
"You must provide a `base_catalog_path`, which is the output path for your catalog directory, and (optionally) a name for your catalog, `catalog_name`. The `catalog_name` is the catalog's internal name and therefore may differ from the catalog's base directory name. If the directory already exists and you want to overwrite its content set the `overwrite` flag to True. Do not forget to provide the necessary credentials, as `storage_options` to the UPath construction, when trying to export the catalog to protected remote storage.\n",
"\n",
"For example, to save a catalog that contains the results of crossmatching Gaia with ZTF to `\"./my_catalogs/gaia_x_ztf\"` one could run:\n",
"```python\n",
Expand Down
141 changes: 141 additions & 0 deletions docs/tutorials/remote_data.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Accessing Remote Data\n",
"\n",
"If you're accessing hipscat catalogs on a local file system, a typical path string like `\"/path/to/catalogs\"` will be sufficient. This tutorial will help you get started if you need to access data over HTTP/S, cloud storage, or have some additional parameters for connecting to your data.\n",
"\n",
"We use [`fsspec`](https://github.com/fsspec/filesystem_spec) and [`universal_pathlib`](https://github.com/fsspec/universal_pathlib) to create connections to remote sources for data. Please refer to their documentation for a list of supported filesystems and any filesystem-specific parameters.\n",
"\n",
"Below, we provide some a basic workflow for accessing remote data, as well as filesystem-specific hints."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## HTTP / HTTPS\n",
"\n",
"Firstly, make sure to install the fsspect http package:\n",
"\n",
"```\n",
"pip install aiohttp \n",
"```\n",
"OR\n",
"```\n",
"conda install aiohttp\n",
"```\n",
"\n",
"Occasionally, with HTTPS data, you may see issues with missing certificates. If you encounter a `FileNotFoundError`, but you're pretty sure the file should be found:\n",
"\n",
"1. Check your network and server availability\n",
"2. On Linux be sure that openSSL and ca-certificates are in place\n",
"3. On Mac run `/Applications/Python\\ 3.*/Install\\ Certificates.command`"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from upath import UPath\n",
"\n",
"test_path = UPath(\"https://data.lsdb.io/unstable/gaia_dr3/gaia/\")\n",
"test_path.exists()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import lsdb\n",
"\n",
"cat = lsdb.read_hipscat(\"https://data.lsdb.io/unstable/gaia_dr3/gaia/\")\n",
"cat"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## S3 and cloud resources\n",
"\n",
"You'll want to install the `s3fs` package\n",
"\n",
"```\n",
"pip install s3fs\n",
"```\n",
"\n",
"(or `adlfs` or `gcsfs` for your cloud provider).\n",
"\n",
"You can pass your credentials once when creating the `UPath` instance, and use that configuration for any other paths constructed from that instance. In the case of PanStarrs, this is hosted in a public cloud bucket. You are still required to pass `anon = True` for public data buckets, in lieu of credentials. You can confirm that the path and credentials are good, before incurring any further expensive data reads."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install s3fs --quiet"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"panstarrs_path = UPath(\"s3://stpubdata/panstarrs/ps1/public/hipscat/\", anon=True)\n",
"test_path.exists()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cat = lsdb.read_hipscat(panstarrs_path / \"otmo\")\n",
"cat"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cat = lsdb.read_hipscat(panstarrs_path / \"detection\")\n",
"cat"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "hipscatenv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
10 changes: 5 additions & 5 deletions src/lsdb/catalog/dataset/healpix_dataset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import warnings
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Tuple

import dask
Expand All @@ -21,6 +22,7 @@
from pandas._typing import AnyAll, Axis, IndexLabel
from pandas.api.extensions import no_default
from typing_extensions import Self
from upath import UPath

from lsdb import io
from lsdb.catalog.dataset.dataset import Dataset
Expand Down Expand Up @@ -110,7 +112,7 @@ def get_partition_index(self, order: int, pixel: int) -> int:
ValueError: if no data exists for the specified pixel
"""
hp_pixel = HealpixPixel(order, pixel)
if not hp_pixel in self._ddf_pixel_map:
if hp_pixel not in self._ddf_pixel_map:
raise ValueError(f"Pixel at order {order} pixel {pixel} not in Catalog")
partition_index = self._ddf_pixel_map[hp_pixel]
return partition_index
Expand Down Expand Up @@ -411,10 +413,9 @@ def plot_pixels(self, projection: str = "moll", **kwargs):

def to_hipscat(
self,
base_catalog_path: str,
base_catalog_path: str | Path | UPath,
catalog_name: str | None = None,
overwrite: bool = False,
storage_options: dict | None = None,
**kwargs,
):
"""Saves the catalog to disk in HiPSCat format
Expand All @@ -423,10 +424,9 @@ def to_hipscat(
base_catalog_path (str): Location where catalog is saved to
catalog_name (str): The name of the catalog to be saved
overwrite (bool): If True existing catalog is overwritten
storage_options (dict): Dictionary that contains abstract filesystem credentials
**kwargs: Arguments to pass to the parquet write operations
"""
io.to_hipscat(self, base_catalog_path, catalog_name, overwrite, storage_options, **kwargs)
io.to_hipscat(self, base_catalog_path, catalog_name, overwrite, **kwargs)

def dropna(
self,
Expand Down
62 changes: 23 additions & 39 deletions src/lsdb/io/to_hipscat.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import dataclasses
from copy import copy
from importlib.metadata import version
from typing import TYPE_CHECKING, Any, Dict, Union
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Union

import dask
import hipscat as hc
import nested_pandas as npd
from hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset as HCHealpixDataset
from hipscat.io import FilePointer
from hipscat.pixel_math import HealpixPixel
from upath import UPath

from lsdb.types import HealpixInfo

Expand All @@ -22,8 +23,7 @@
def perform_write(
df: npd.NestedFrame,
hp_pixel: HealpixPixel,
base_catalog_dir: FilePointer,
storage_options: dict | None = None,
base_catalog_dir: str | Path | UPath,
**kwargs,
) -> int:
"""Performs a write of a pandas dataframe to a single parquet file, following the hipscat structure.
Expand All @@ -33,8 +33,7 @@ def perform_write(
Args:
df (npd.NestedFrame): dataframe to write to file
hp_pixel (HealpixPixel): HEALPix pixel of file to be written
base_catalog_dir (FilePointer): Location of the base catalog directory to write to
storage_options (dict): fsspec storage options
base_catalog_dir (path-like): Location of the base catalog directory to write to
**kwargs: other kwargs to pass to pd.to_parquet method
Returns:
Expand All @@ -43,19 +42,18 @@ def perform_write(
if len(df) == 0:
return 0
pixel_dir = hc.io.pixel_directory(base_catalog_dir, hp_pixel.order, hp_pixel.pixel)
hc.io.file_io.make_directory(pixel_dir, exist_ok=True, storage_options=storage_options)
pixel_path = hc.io.paths.pixel_catalog_file(base_catalog_dir, hp_pixel.order, hp_pixel.pixel)
hc.io.file_io.write_dataframe_to_parquet(df, pixel_path, storage_options=storage_options, **kwargs)
hc.io.file_io.make_directory(pixel_dir, exist_ok=True)
pixel_path = hc.io.paths.pixel_catalog_file(base_catalog_dir, hp_pixel)
hc.io.file_io.write_dataframe_to_parquet(df, pixel_path, **kwargs)
return len(df)


# pylint: disable=W0212
def to_hipscat(
catalog: HealpixDataset,
base_catalog_path: str,
base_catalog_path: str | Path | UPath,
catalog_name: Union[str, None] = None,
overwrite: bool = False,
storage_options: dict | None = None,
**kwargs,
):
"""Writes a catalog to disk, in HiPSCat format. The output catalog comprises
Expand All @@ -67,61 +65,48 @@ def to_hipscat(
base_catalog_path (str): Location where catalog is saved to
catalog_name (str): The name of the output catalog
overwrite (bool): If True existing catalog is overwritten
storage_options (dict): Dictionary that contains abstract filesystem credentials
**kwargs: Arguments to pass to the parquet write operations
"""
base_catalog_dir_fp = hc.io.get_file_pointer_from_path(base_catalog_path)
# Create the output directory for the catalog
if hc.io.file_io.directory_has_contents(base_catalog_dir_fp, storage_options=storage_options):
if hc.io.file_io.directory_has_contents(base_catalog_path):
if not overwrite:
raise ValueError(
f"base_catalog_path ({base_catalog_dir_fp}) contains files."
f"base_catalog_path ({str(base_catalog_path)}) contains files."
" choose a different directory or set overwrite to True."
)
hc.io.file_io.remove_directory(base_catalog_dir_fp, storage_options=storage_options)
hc.io.file_io.make_directory(base_catalog_dir_fp, exist_ok=True, storage_options=storage_options)
hc.io.file_io.remove_directory(base_catalog_path)
hc.io.file_io.make_directory(base_catalog_path, exist_ok=True)
# Save partition parquet files
pixel_to_partition_size_map = write_partitions(
catalog, base_catalog_dir_fp, storage_options=storage_options, **kwargs
)
pixel_to_partition_size_map = write_partitions(catalog, base_catalog_path, **kwargs)
# Save parquet metadata
hc.io.write_parquet_metadata(base_catalog_path, storage_options=storage_options)
hc.io.write_parquet_metadata(base_catalog_path)
# Save partition info
partition_info = _get_partition_info_dict(pixel_to_partition_size_map)
hc.io.write_partition_info(base_catalog_dir_fp, partition_info, storage_options=storage_options)
hc.io.write_partition_info(base_catalog_path, partition_info)
# Save catalog info
new_hc_structure = create_modified_catalog_structure(
catalog.hc_structure,
base_catalog_path,
catalog_name if catalog_name else catalog.hc_structure.catalog_name,
total_rows=sum(pi[0] for pi in partition_info.values()),
)
hc.io.write_catalog_info(
catalog_base_dir=base_catalog_path,
dataset_info=new_hc_structure.catalog_info,
storage_options=storage_options,
)
hc.io.write_catalog_info(catalog_base_dir=base_catalog_path, dataset_info=new_hc_structure.catalog_info)
# Save provenance info
hc.io.write_metadata.write_provenance_info(
catalog_base_dir=base_catalog_dir_fp,
catalog_base_dir=base_catalog_path,
dataset_info=new_hc_structure.catalog_info,
tool_args=_get_provenance_info(new_hc_structure),
storage_options=storage_options,
)


def write_partitions(
catalog: HealpixDataset,
base_catalog_dir_fp: FilePointer,
storage_options: Union[Dict[Any, Any], None] = None,
**kwargs,
catalog: HealpixDataset, base_catalog_dir_fp: str | Path | UPath, **kwargs
) -> Dict[HealpixPixel, int]:
"""Saves catalog partitions as parquet to disk
Args:
catalog (HealpixDataset): A catalog to export
base_catalog_dir_fp (FilePointer): Path to the base directory of the catalog
storage_options (dict): Dictionary that contains abstract filesystem credentials
base_catalog_dir_fp (UPath): Path to the base directory of the catalog
**kwargs: Arguments to pass to the parquet write operations
Returns:
Expand All @@ -138,7 +123,6 @@ def write_partitions(
partitions[partition_index],
pixel,
base_catalog_dir_fp,
storage_options=storage_options,
**kwargs,
)
)
Expand Down Expand Up @@ -174,13 +158,13 @@ def _get_partition_info_dict(ddf_points_map: Dict[HealpixPixel, int]) -> Dict[He


def create_modified_catalog_structure(
catalog_structure: HCHealpixDataset, catalog_base_dir: str, catalog_name: str, **kwargs
catalog_structure: HCHealpixDataset, catalog_base_dir: str | Path | UPath, catalog_name: str, **kwargs
) -> HCHealpixDataset:
"""Creates a modified version of the HiPSCat catalog structure
Args:
catalog_structure (hc.catalog.Catalog): HiPSCat catalog structure
catalog_base_dir (str): Base location for the catalog
catalog_base_dir (UPath): Base location for the catalog
catalog_name (str): The name of the catalog to be saved
**kwargs: The remaining parameters to be updated in the catalog info object
Expand All @@ -190,7 +174,7 @@ def create_modified_catalog_structure(
new_hc_structure = copy(catalog_structure)
new_hc_structure.catalog_name = catalog_name
new_hc_structure.catalog_path = catalog_base_dir
new_hc_structure.catalog_base_dir = hc.io.file_io.get_file_pointer_from_path(catalog_base_dir)
new_hc_structure.catalog_base_dir = hc.io.file_io.get_upath(catalog_base_dir)
new_hc_structure.on_disk = True
new_hc_structure.catalog_info = dataclasses.replace(
new_hc_structure.catalog_info, catalog_name=catalog_name, **kwargs
Expand Down
Loading

0 comments on commit cb30209

Please sign in to comment.