Make from_dataframe defaults consistent with hipscat-import (#379)
* Make from_dataframe defaults consistent with hipscat-import

* Code review comments.
delucchi-cmu authored Jul 23, 2024
1 parent 8b9e33a commit ad8ae51
Showing 3 changed files with 91 additions and 61 deletions.
93 changes: 42 additions & 51 deletions src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
@@ -2,6 +2,7 @@

import dataclasses
import math
import warnings
from typing import Dict, List, Tuple

import astropy.units as u
@@ -21,7 +22,7 @@
_append_partition_information_to_dataframe,
_generate_dask_dataframe,
)
from lsdb.types import DaskDFPixelMap, HealpixInfo
from lsdb.types import DaskDFPixelMap

pd.options.mode.chained_assignment = None # default='warn'

@@ -32,8 +33,9 @@ class DataframeCatalogLoader:
def __init__(
self,
dataframe: pd.DataFrame,
lowest_order: int = 3,
lowest_order: int = 0,
highest_order: int = 7,
drop_empty_siblings: bool = False,
partition_size: int | None = None,
threshold: int | None = None,
should_generate_moc: bool = True,
@@ -47,15 +49,22 @@ def __init__(
dataframe (pd.Dataframe): Catalog Pandas Dataframe.
lowest_order (int): The lowest partition order. Defaults to 0.
highest_order (int): The highest partition order. Defaults to 7.
partition_size (int): The desired partition size, in number of rows.
drop_empty_siblings (bool): When determining the final partitioning,
  if 3 of 4 pixels are empty, keep only the non-empty pixel.
partition_size (int): The desired partition size, in number of bytes in-memory.
threshold (int): The maximum number of data points per pixel.
should_generate_moc (bool): Should we generate a MOC (multi-order coverage map)
  of the data? This can improve performance when joining/crossmatching to
  other hipscatted datasets.
moc_max_order (int): if generating a MOC, what to use as the max order. Defaults to 10.
use_pyarrow_types (bool): If True, the data is backed by pyarrow, otherwise we keep the
original data types. Defaults to True.
**kwargs: Arguments to pass to the creation of the catalog info.
"""
self.dataframe = dataframe
self.lowest_order = lowest_order
self.highest_order = highest_order
self.drop_empty_siblings = drop_empty_siblings
self.threshold = self._calculate_threshold(partition_size, threshold)
self.catalog_info = self._create_catalog_info(**kwargs)
self.should_generate_moc = should_generate_moc
@@ -73,6 +82,13 @@ def _calculate_threshold(self, partition_size: int | None = None, threshold: int
Returns:
The HEALPix pixel threshold
"""
df_total_memory = self.dataframe.memory_usage(deep=True).sum()
if df_total_memory > (1 << 30) or len(self.dataframe) > 1_000_000:
warnings.warn(
"from_dataframe is not intended for large datasets. "
"Consider using hipscat-import: https://hipscat-import.readthedocs.io/",
RuntimeWarning,
)
if threshold is not None and partition_size is not None:
raise ValueError("Specify only one: threshold or partition_size")
if threshold is None:
@@ -83,7 +99,6 @@ def _calculate_threshold(self, partition_size: int | None = None, threshold: int
threshold = len(self.dataframe) // num_partitions
else:
# Each partition in memory will be of roughly 1Gib
df_total_memory = self.dataframe.memory_usage(deep=True).sum()
partition_memory = df_total_memory / len(self.dataframe)
threshold = math.ceil((1 << 30) / partition_memory)
return threshold
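
For intuition, here is a worked sketch of the default branch above: with neither `threshold` nor `partition_size` given, the loader targets roughly 1 GiB of memory per partition, so the row threshold is that budget divided by the average in-memory row size. The 200-bytes-per-row figure below is purely an illustrative assumption.

```python
import math

# Assumed average in-memory row size, in bytes (illustrative only).
partition_memory = 200
# Target ~1 GiB per partition, as in the default branch above.
threshold = math.ceil((1 << 30) / partition_memory)
print(threshold)  # 5368710 rows per partition
```
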
@@ -111,12 +126,11 @@ def load_catalog(self) -> Catalog:
Catalog object with data from the source given at loader initialization
"""
self._set_hipscat_index()
pixel_map = self._compute_pixel_map()
ddf, ddf_pixel_map, total_rows = self._generate_dask_df_and_map(pixel_map)
pixel_list = self._compute_pixel_list()
ddf, ddf_pixel_map, total_rows = self._generate_dask_df_and_map(pixel_list)
self.catalog_info = dataclasses.replace(self.catalog_info, total_rows=total_rows)
healpix_pixels = list(pixel_map.keys())
moc = self._generate_moc() if self.should_generate_moc else None
hc_structure = hc.catalog.Catalog(self.catalog_info, healpix_pixels, moc=moc)
hc_structure = hc.catalog.Catalog(self.catalog_info, pixel_list, moc=moc)
return Catalog(ddf, ddf_pixel_map, hc_structure)

def _set_hipscat_index(self):
@@ -128,41 +142,39 @@ def _set_hipscat_index(self):
)
self.dataframe.set_index(HIPSCAT_ID_COLUMN, inplace=True)

def _compute_pixel_map(self) -> Dict[HealpixPixel, HealpixInfo]:
"""Compute object histogram and generate the sorted mapping between
HEALPix pixels and the respective original pixel information. The
pixels are sorted by ascending hipscat_id.
def _compute_pixel_list(self) -> List[HealpixPixel]:
"""Compute object histogram and generate the sorted list of
HEALPix pixels. The pixels are sorted by ascending hipscat_id.
Returns:
A dictionary mapping each HEALPix pixel to the respective
information tuple. The first value of the tuple is the number
of objects in the HEALPix pixel, the second is the list of pixels.
List of HEALPix pixels for the final partitioning.
"""
raw_histogram = generate_histogram(
self.dataframe,
highest_order=self.highest_order,
ra_column=self.catalog_info.ra_column,
dec_column=self.catalog_info.dec_column,
)
pixel_map = hc.pixel_math.compute_pixel_map(
alignment = hc.pixel_math.generate_alignment(
raw_histogram,
highest_order=self.highest_order,
lowest_order=self.lowest_order,
threshold=self.threshold,
drop_empty_siblings=self.drop_empty_siblings,
)
pixels = list(pixel_map.keys())
ordered_pixels = np.array(pixels)[get_pixel_argsort(pixels)]
return {pixel: pixel_map[pixel] for pixel in ordered_pixels}
non_none_elements = alignment[alignment != np.array(None)]
pixel_list = np.unique(non_none_elements)
pixel_list = [HealpixPixel(order, pix) for (order, pix, count) in pixel_list if int(count) > 0]
return list(np.array(pixel_list)[get_pixel_argsort(pixel_list)])

def _generate_dask_df_and_map(
self, pixel_map: Dict[HealpixPixel, HealpixInfo]
self, pixel_list: List[HealpixPixel]
) -> Tuple[dd.DataFrame, DaskDFPixelMap, int]:
"""Load Dask DataFrame from HEALPix pixel Dataframes and
generate a mapping of HEALPix pixels to HEALPix Dataframes
Args:
pixel_map (Dict[HealpixPixel, HealpixInfo]): The mapping between
catalog HEALPix pixels and respective data information.
pixel_list (List[HealpixPixel]): final partitioning of data
Returns:
Tuple containing the Dask Dataframe, the mapping of HEALPix pixels
@@ -174,42 +186,21 @@ def _generate_dask_df_and_map(
# Mapping HEALPix pixels to the respective Dataframe indices
ddf_pixel_map: Dict[HealpixPixel, int] = {}

for hp_pixel_index, hp_pixel_info in enumerate(pixel_map.items()):
hp_pixel, (_, pixels) = hp_pixel_info
for hp_pixel_index, hp_pixel in enumerate(pixel_list):
# Store HEALPix pixel in map
ddf_pixel_map[hp_pixel] = hp_pixel_index
# Obtain Dataframe for current HEALPix pixel
pixel_dfs.append(self._get_dataframe_for_healpix(hp_pixel, pixels))
# Obtain Dataframe for current HEALPix pixel, using NESTED characteristics.
left_bound = healpix_to_hipscat_id(hp_pixel.order, hp_pixel.pixel)
right_bound = healpix_to_hipscat_id(hp_pixel.order, hp_pixel.pixel + 1)
pixel_df = self.dataframe.loc[
(self.dataframe.index >= left_bound) & (self.dataframe.index < right_bound)
]
pixel_dfs.append(_append_partition_information_to_dataframe(pixel_df, hp_pixel))

# Generate Dask Dataframe with the original schema and desired backend
pixel_list = list(ddf_pixel_map.keys())
ddf, total_rows = _generate_dask_dataframe(pixel_dfs, pixel_list, self.use_pyarrow_types)
return ddf, ddf_pixel_map, total_rows

def _get_dataframe_for_healpix(self, hp_pixel: HealpixPixel, pixels: List[int]) -> pd.DataFrame:
"""Computes the Pandas Dataframe containing the data points
for a certain HEALPix pixel.
Using NESTED ordering scheme, the provided list is a sequence of contiguous
pixel numbers, in ascending order, inside the HEALPix pixel. Therefore, the
corresponding points in the Dataframe will be located between the hipscat
index of the lowest numbered pixel (left_bound) and the hipscat index of the
highest numbered pixel (right_bound).
Args:
hp_pixel (HealpixPixel): The HEALPix pixel to generate the Dataframe for
pixels (List[int]): The indices of the pixels inside the HEALPix pixel
Returns:
The Pandas Dataframe containing the data points for the HEALPix pixel.
"""
left_bound = healpix_to_hipscat_id(self.highest_order, pixels[0])
right_bound = healpix_to_hipscat_id(self.highest_order, pixels[-1] + 1)
pixel_df = self.dataframe.loc[
(self.dataframe.index >= left_bound) & (self.dataframe.index < right_bound)
]
return _append_partition_information_to_dataframe(pixel_df, hp_pixel)

def _generate_moc(self):
lon = self.dataframe[self.catalog_info.ra_column].to_numpy() * u.deg
lat = self.dataframe[self.catalog_info.dec_column].to_numpy() * u.deg
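
The rewritten partition selection leans on a property of the NESTED HEALPix numbering: every row inside a pixel falls in one contiguous hipscat-index range, so a single interval comparison replaces the per-pixel bookkeeping that `_get_dataframe_for_healpix` used to carry. A minimal standalone sketch, assuming hipscat's `healpix_to_hipscat_id(order, pixel)` as imported by this module, and a dataframe already indexed by hipscat index:

```python
import pandas as pd
from hipscat.pixel_math.hipscat_id import healpix_to_hipscat_id

def rows_for_pixel(dataframe: pd.DataFrame, order: int, pixel: int) -> pd.DataFrame:
    """Select the rows of one HEALPix pixel by hipscat-index range.

    In NESTED ordering, pixel `pixel` at `order` covers the half-open
    interval [id(order, pixel), id(order, pixel + 1)) of hipscat indices.
    """
    left_bound = healpix_to_hipscat_id(order, pixel)
    right_bound = healpix_to_hipscat_id(order, pixel + 1)
    return dataframe.loc[(dataframe.index >= left_bound) & (dataframe.index < right_bound)]
```
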
34 changes: 24 additions & 10 deletions src/lsdb/loaders/dataframe/from_dataframe.py
@@ -7,10 +7,13 @@
from lsdb.loaders.dataframe.margin_catalog_generator import MarginCatalogGenerator


# pylint: disable=too-many-arguments
def from_dataframe(
dataframe: pd.DataFrame,
lowest_order: int = 3,
*,
lowest_order: int = 0,
highest_order: int = 7,
drop_empty_siblings: bool = False,
partition_size: int | None = None,
threshold: int | None = None,
margin_order: int | None = -1,
@@ -22,15 +25,25 @@ def from_dataframe(
) -> Catalog:
"""Load a catalog from a Pandas Dataframe in CSV format.
Note that this is only suitable for small datasets (< 1million rows and
< 1GB dataframe in-memory). If you need to deal with large datasets, consider
using the hipscat-import package: https://hipscat-import.readthedocs.io/
Args:
dataframe (pd.Dataframe): The catalog Pandas Dataframe.
lowest_order (int): The lowest partition order. Defaults to 3.
lowest_order (int): The lowest partition order. Defaults to 0.
highest_order (int): The highest partition order. Defaults to 7.
partition_size (int): The desired partition size, in number of rows.
drop_empty_siblings (bool): When determining the final partitioning,
  if 3 of 4 pixels are empty, keep only the non-empty pixel.
partition_size (int): The desired partition size, in number of bytes in-memory.
threshold (int): The maximum number of data points per pixel.
margin_order (int): The order at which to generate the margin cache.
margin_threshold (float): The size of the margin cache boundary, in arcseconds. If None,
the margin cache is not generated. Defaults to 5 arcseconds.
should_generate_moc (bool): Should we generate a MOC (multi-order coverage map)
  of the data? This can improve performance when joining/crossmatching to
  other hipscatted datasets.
moc_max_order (int): if generating a MOC, what to use as the max order. Defaults to 10.
use_pyarrow_types (bool): If True, the data is backed by pyarrow, otherwise we keep the
original data types. Defaults to True.
**kwargs: Arguments to pass to the creation of the catalog info.
@@ -40,13 +53,14 @@
"""
catalog = DataframeCatalogLoader(
dataframe,
lowest_order,
highest_order,
partition_size,
threshold,
should_generate_moc,
moc_max_order,
use_pyarrow_types,
lowest_order=lowest_order,
highest_order=highest_order,
drop_empty_siblings=drop_empty_siblings,
partition_size=partition_size,
threshold=threshold,
should_generate_moc=should_generate_moc,
moc_max_order=moc_max_order,
use_pyarrow_types=use_pyarrow_types,
**kwargs,
).load_catalog()
if margin_threshold:
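
With the `*` added to the signature, everything after `dataframe` is keyword-only, which is why the loader call above now names each argument. A hedged usage sketch; the column values and catalog name below are illustrative, and the two `catalog_*` kwargs are assumed to be forwarded to the catalog-info creation as the docstring describes:

```python
import pandas as pd
import lsdb

df = pd.DataFrame({"ra": [10.0, 10.1], "dec": [-30.0, -30.2]})

# Positional calls like lsdb.from_dataframe(df, 0, 7) now raise a TypeError.
catalog = lsdb.from_dataframe(
    df,
    lowest_order=0,             # new default, matching hipscat-import
    drop_empty_siblings=False,  # new option surfaced from the alignment step
    margin_threshold=None,      # skip margin-cache generation
    catalog_name="tiny_sky",
    catalog_type="object",
)
```
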
25 changes: 25 additions & 0 deletions tests/lsdb/loaders/dataframe/test_from_dataframe.py
@@ -137,6 +137,31 @@ def test_partitions_obey_threshold(small_sky_order1_df, small_sky_order1_catalog
assert all(num_pixels <= threshold for num_pixels in num_partition_pixels)


def test_from_dataframe_large_input(small_sky_order1_catalog, assert_divisions_are_correct):
"""Tests that we can initialize a catalog from a LARGE Pandas Dataframe and
that we're warned about the catalog's size"""
original_catalog_info = small_sky_order1_catalog.hc_structure.catalog_info
kwargs = {
"catalog_name": original_catalog_info.catalog_name,
"catalog_type": original_catalog_info.catalog_type,
}

rng = np.random.default_rng()
random_df = pd.DataFrame({"ra": rng.uniform(0, 60, 1_500_000), "dec": rng.uniform(0, 60, 1_500_000)})

# Load the large random dataframe, expecting a warning about its size
with pytest.warns(RuntimeWarning, match="from_dataframe is not intended for large datasets"):
catalog = lsdb.from_dataframe(random_df, margin_threshold=None, **kwargs)
assert isinstance(catalog, lsdb.Catalog)
# Catalogs have the same information
original_catalog_info.total_rows = 1_500_000
assert catalog.hc_structure.catalog_info == original_catalog_info
# Index is set to hipscat index
assert catalog._ddf.index.name == HIPSCAT_ID_COLUMN
# Divisions belong to the respective HEALPix pixels
assert_divisions_are_correct(catalog)


def test_partitions_obey_default_threshold_when_no_arguments_specified(
small_sky_order1_df, small_sky_order1_catalog
):
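
Since the new size check emits an ordinary `RuntimeWarning`, callers who deliberately pass a large frame can filter it with the standard library; a small sketch reusing the random-dataframe pattern from the test above (the `catalog_*` kwargs are illustrative):

```python
import warnings

import numpy as np
import pandas as pd
import lsdb

rng = np.random.default_rng()
big_df = pd.DataFrame({"ra": rng.uniform(0, 60, 1_500_000), "dec": rng.uniform(0, 60, 1_500_000)})

# Suppress the intentional large-dataset warning for this one call.
with warnings.catch_warnings():
    warnings.filterwarnings(
        "ignore",
        message="from_dataframe is not intended for large datasets",
        category=RuntimeWarning,
    )
    catalog = lsdb.from_dataframe(
        big_df, margin_threshold=None, catalog_name="random_sky", catalog_type="object"
    )
```
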
