From ad8ae51050089e44ef6654531f9b577778a67995 Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Date: Tue, 23 Jul 2024 18:20:37 -0400
Subject: [PATCH] Make from_dataframe defaults consistent with hipscat-import
 (#379)

* Make from_dataframe defaults consistent with hipscat-import

* Code review comments.
---
 .../dataframe/dataframe_catalog_loader.py    | 93 +++++++++----------
 src/lsdb/loaders/dataframe/from_dataframe.py | 34 +++++--
 .../loaders/dataframe/test_from_dataframe.py | 25 +++++
 3 files changed, 91 insertions(+), 61 deletions(-)

diff --git a/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py b/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
index dfa109c8..d3406837 100644
--- a/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
+++ b/src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
@@ -2,6 +2,7 @@
 
 import dataclasses
 import math
+import warnings
 from typing import Dict, List, Tuple
 
 import astropy.units as u
@@ -21,7 +22,7 @@
     _append_partition_information_to_dataframe,
     _generate_dask_dataframe,
 )
-from lsdb.types import DaskDFPixelMap, HealpixInfo
+from lsdb.types import DaskDFPixelMap
 
 pd.options.mode.chained_assignment = None  # default='warn'
 
@@ -32,8 +33,9 @@ class DataframeCatalogLoader:
     def __init__(
         self,
         dataframe: pd.DataFrame,
-        lowest_order: int = 3,
+        lowest_order: int = 0,
         highest_order: int = 7,
+        drop_empty_siblings: bool = False,
         partition_size: int | None = None,
         threshold: int | None = None,
         should_generate_moc: bool = True,
@@ -47,8 +49,14 @@ def __init__(
             dataframe (pd.Dataframe): Catalog Pandas Dataframe.
-            lowest_order (int): The lowest partition order. Defaults to 3.
+            lowest_order (int): The lowest partition order. Defaults to 0.
             highest_order (int): The highest partition order. Defaults to 7.
-            partition_size (int): The desired partition size, in number of rows.
+            drop_empty_siblings (bool): When determining the final partitioning,
+                if 3 of 4 pixels are empty, keep only the non-empty pixel.
+            partition_size (int): The desired partition size, in number of bytes in-memory.
             threshold (int): The maximum number of data points per pixel.
+            should_generate_moc (bool): Should we generate a MOC (multi-order coverage map)
+                of the data. Can improve performance when joining/crossmatching to
+                other hipscatted datasets.
+            moc_max_order (int): If generating a MOC, what to use as the max order. Defaults to 10.
             use_pyarrow_types (bool): If True, the data is backed by pyarrow, otherwise we keep the
                 original data types. Defaults to True.
             **kwargs: Arguments to pass to the creation of the catalog info.
         """
@@ -56,6 +64,7 @@ def __init__(
         self.dataframe = dataframe
         self.lowest_order = lowest_order
         self.highest_order = highest_order
+        self.drop_empty_siblings = drop_empty_siblings
         self.threshold = self._calculate_threshold(partition_size, threshold)
         self.catalog_info = self._create_catalog_info(**kwargs)
         self.should_generate_moc = should_generate_moc
@@ -73,6 +82,13 @@ def _calculate_threshold(self, partition_size: int | None = None, threshold: int
         Returns:
             The HEALPix pixel threshold
         """
+        df_total_memory = self.dataframe.memory_usage(deep=True).sum()
+        if df_total_memory > (1 << 30) or len(self.dataframe) > 1_000_000:
+            warnings.warn(
+                "from_dataframe is not intended for large datasets. "
" + "Consider using hipscat-import: https://hipscat-import.readthedocs.io/", + RuntimeWarning, + ) if threshold is not None and partition_size is not None: raise ValueError("Specify only one: threshold or partition_size") if threshold is None: @@ -83,7 +99,6 @@ def _calculate_threshold(self, partition_size: int | None = None, threshold: int threshold = len(self.dataframe) // num_partitions else: # Each partition in memory will be of roughly 1Gib - df_total_memory = self.dataframe.memory_usage(deep=True).sum() partition_memory = df_total_memory / len(self.dataframe) threshold = math.ceil((1 << 30) / partition_memory) return threshold @@ -111,12 +126,11 @@ def load_catalog(self) -> Catalog: Catalog object with data from the source given at loader initialization """ self._set_hipscat_index() - pixel_map = self._compute_pixel_map() - ddf, ddf_pixel_map, total_rows = self._generate_dask_df_and_map(pixel_map) + pixel_list = self._compute_pixel_list() + ddf, ddf_pixel_map, total_rows = self._generate_dask_df_and_map(pixel_list) self.catalog_info = dataclasses.replace(self.catalog_info, total_rows=total_rows) - healpix_pixels = list(pixel_map.keys()) moc = self._generate_moc() if self.should_generate_moc else None - hc_structure = hc.catalog.Catalog(self.catalog_info, healpix_pixels, moc=moc) + hc_structure = hc.catalog.Catalog(self.catalog_info, pixel_list, moc=moc) return Catalog(ddf, ddf_pixel_map, hc_structure) def _set_hipscat_index(self): @@ -128,15 +142,12 @@ def _set_hipscat_index(self): ) self.dataframe.set_index(HIPSCAT_ID_COLUMN, inplace=True) - def _compute_pixel_map(self) -> Dict[HealpixPixel, HealpixInfo]: - """Compute object histogram and generate the sorted mapping between - HEALPix pixels and the respective original pixel information. The - pixels are sorted by ascending hipscat_id. + def _compute_pixel_list(self) -> List[HealpixPixel]: + """Compute object histogram and generate the sorted list of + HEALPix pixels. The pixels are sorted by ascending hipscat_id. Returns: - A dictionary mapping each HEALPix pixel to the respective - information tuple. The first value of the tuple is the number - of objects in the HEALPix pixel, the second is the list of pixels. + List of HEALPix pixels for the final partitioning. 
""" raw_histogram = generate_histogram( self.dataframe, @@ -144,25 +155,26 @@ def _compute_pixel_map(self) -> Dict[HealpixPixel, HealpixInfo]: ra_column=self.catalog_info.ra_column, dec_column=self.catalog_info.dec_column, ) - pixel_map = hc.pixel_math.compute_pixel_map( + alignment = hc.pixel_math.generate_alignment( raw_histogram, highest_order=self.highest_order, lowest_order=self.lowest_order, threshold=self.threshold, + drop_empty_siblings=self.drop_empty_siblings, ) - pixels = list(pixel_map.keys()) - ordered_pixels = np.array(pixels)[get_pixel_argsort(pixels)] - return {pixel: pixel_map[pixel] for pixel in ordered_pixels} + non_none_elements = alignment[alignment != np.array(None)] + pixel_list = np.unique(non_none_elements) + pixel_list = [HealpixPixel(order, pix) for (order, pix, count) in pixel_list if int(count) > 0] + return list(np.array(pixel_list)[get_pixel_argsort(pixel_list)]) def _generate_dask_df_and_map( - self, pixel_map: Dict[HealpixPixel, HealpixInfo] + self, pixel_list: List[HealpixPixel] ) -> Tuple[dd.DataFrame, DaskDFPixelMap, int]: """Load Dask DataFrame from HEALPix pixel Dataframes and generate a mapping of HEALPix pixels to HEALPix Dataframes Args: - pixel_map (Dict[HealpixPixel, HealpixInfo]): The mapping between - catalog HEALPix pixels and respective data information. + pixel_list (List[HealpixPixel]): final partitioning of data Returns: Tuple containing the Dask Dataframe, the mapping of HEALPix pixels @@ -174,42 +186,21 @@ def _generate_dask_df_and_map( # Mapping HEALPix pixels to the respective Dataframe indices ddf_pixel_map: Dict[HealpixPixel, int] = {} - for hp_pixel_index, hp_pixel_info in enumerate(pixel_map.items()): - hp_pixel, (_, pixels) = hp_pixel_info + for hp_pixel_index, hp_pixel in enumerate(pixel_list): # Store HEALPix pixel in map ddf_pixel_map[hp_pixel] = hp_pixel_index - # Obtain Dataframe for current HEALPix pixel - pixel_dfs.append(self._get_dataframe_for_healpix(hp_pixel, pixels)) + # Obtain Dataframe for current HEALPix pixel, using NESTED characteristics. + left_bound = healpix_to_hipscat_id(hp_pixel.order, hp_pixel.pixel) + right_bound = healpix_to_hipscat_id(hp_pixel.order, hp_pixel.pixel + 1) + pixel_df = self.dataframe.loc[ + (self.dataframe.index >= left_bound) & (self.dataframe.index < right_bound) + ] + pixel_dfs.append(_append_partition_information_to_dataframe(pixel_df, hp_pixel)) # Generate Dask Dataframe with the original schema and desired backend - pixel_list = list(ddf_pixel_map.keys()) ddf, total_rows = _generate_dask_dataframe(pixel_dfs, pixel_list, self.use_pyarrow_types) return ddf, ddf_pixel_map, total_rows - def _get_dataframe_for_healpix(self, hp_pixel: HealpixPixel, pixels: List[int]) -> pd.DataFrame: - """Computes the Pandas Dataframe containing the data points - for a certain HEALPix pixel. - - Using NESTED ordering scheme, the provided list is a sequence of contiguous - pixel numbers, in ascending order, inside the HEALPix pixel. Therefore, the - corresponding points in the Dataframe will be located between the hipscat - index of the lowest numbered pixel (left_bound) and the hipscat index of the - highest numbered pixel (right_bound). - - Args: - hp_pixel (HealpixPixel): The HEALPix pixel to generate the Dataframe for - pixels (List[int]): The indices of the pixels inside the HEALPix pixel - - Returns: - The Pandas Dataframe containing the data points for the HEALPix pixel. 
- """ - left_bound = healpix_to_hipscat_id(self.highest_order, pixels[0]) - right_bound = healpix_to_hipscat_id(self.highest_order, pixels[-1] + 1) - pixel_df = self.dataframe.loc[ - (self.dataframe.index >= left_bound) & (self.dataframe.index < right_bound) - ] - return _append_partition_information_to_dataframe(pixel_df, hp_pixel) - def _generate_moc(self): lon = self.dataframe[self.catalog_info.ra_column].to_numpy() * u.deg lat = self.dataframe[self.catalog_info.dec_column].to_numpy() * u.deg diff --git a/src/lsdb/loaders/dataframe/from_dataframe.py b/src/lsdb/loaders/dataframe/from_dataframe.py index d207501f..164f7e9d 100644 --- a/src/lsdb/loaders/dataframe/from_dataframe.py +++ b/src/lsdb/loaders/dataframe/from_dataframe.py @@ -7,10 +7,13 @@ from lsdb.loaders.dataframe.margin_catalog_generator import MarginCatalogGenerator +# pylint: disable=too-many-arguments def from_dataframe( dataframe: pd.DataFrame, - lowest_order: int = 3, + *, + lowest_order: int = 0, highest_order: int = 7, + drop_empty_siblings: bool = False, partition_size: int | None = None, threshold: int | None = None, margin_order: int | None = -1, @@ -22,15 +25,25 @@ def from_dataframe( ) -> Catalog: """Load a catalog from a Pandas Dataframe in CSV format. + Note that this is only suitable for small datasets (< 1million rows and + < 1GB dataframe in-memory). If you need to deal with large datasets, consider + using the hipscat-import package: https://hipscat-import.readthedocs.io/ + Args: dataframe (pd.Dataframe): The catalog Pandas Dataframe. - lowest_order (int): The lowest partition order. Defaults to 3. + lowest_order (int): The lowest partition order. Defaults to 0. highest_order (int): The highest partition order. Defaults to 7. - partition_size (int): The desired partition size, in number of rows. + drop_empty_siblings (bool): When determining final partitionining, + if 3 of 4 pixels are empty, keep only the non-empty pixel + partition_size (int): The desired partition size, in number of bytes in-memory. threshold (int): The maximum number of data points per pixel. margin_order (int): The order at which to generate the margin cache. margin_threshold (float): The size of the margin cache boundary, in arcseconds. If None, the margin cache is not generated. Defaults to 5 arcseconds. + should_generate_moc (bool): should we generate a MOC (multi-order coverage map) + of the data. can improve performance when joining/crossmatching to + other hipscatted datasets. + moc_max_order (int): if generating a MOC, what to use as the max order. Defaults to 10. use_pyarrow_types (bool): If True, the data is backed by pyarrow, otherwise we keep the original data types. Defaults to True. **kwargs: Arguments to pass to the creation of the catalog info. 
@@ -40,13 +53,14 @@ def from_dataframe(
     """
     catalog = DataframeCatalogLoader(
         dataframe,
-        lowest_order,
-        highest_order,
-        partition_size,
-        threshold,
-        should_generate_moc,
-        moc_max_order,
-        use_pyarrow_types,
+        lowest_order=lowest_order,
+        highest_order=highest_order,
+        drop_empty_siblings=drop_empty_siblings,
+        partition_size=partition_size,
+        threshold=threshold,
+        should_generate_moc=should_generate_moc,
+        moc_max_order=moc_max_order,
+        use_pyarrow_types=use_pyarrow_types,
         **kwargs,
     ).load_catalog()
     if margin_threshold:
diff --git a/tests/lsdb/loaders/dataframe/test_from_dataframe.py b/tests/lsdb/loaders/dataframe/test_from_dataframe.py
index 8227d4ea..03e9d3fc 100644
--- a/tests/lsdb/loaders/dataframe/test_from_dataframe.py
+++ b/tests/lsdb/loaders/dataframe/test_from_dataframe.py
@@ -137,6 +137,31 @@ def test_partitions_obey_threshold(small_sky_order1_df, small_sky_order1_catalog
     assert all(num_pixels <= threshold for num_pixels in num_partition_pixels)
 
 
+def test_from_dataframe_large_input(small_sky_order1_catalog, assert_divisions_are_correct):
+    """Tests that we can initialize a catalog from a LARGE Pandas Dataframe and
+    that we're warned about the catalog's size"""
+    original_catalog_info = small_sky_order1_catalog.hc_structure.catalog_info
+    kwargs = {
+        "catalog_name": original_catalog_info.catalog_name,
+        "catalog_type": original_catalog_info.catalog_type,
+    }
+
+    rng = np.random.default_rng()
+    random_df = pd.DataFrame({"ra": rng.uniform(0, 60, 1_500_000), "dec": rng.uniform(0, 60, 1_500_000)})
+
+    # Generate the catalog from the large random dataframe, expecting a size warning
+    with pytest.warns(RuntimeWarning, match="from_dataframe is not intended for large datasets"):
+        catalog = lsdb.from_dataframe(random_df, margin_threshold=None, **kwargs)
+    assert isinstance(catalog, lsdb.Catalog)
+    # Catalogs have the same information
+    original_catalog_info.total_rows = 1_500_000
+    assert catalog.hc_structure.catalog_info == original_catalog_info
+    # Index is set to hipscat index
+    assert catalog._ddf.index.name == HIPSCAT_ID_COLUMN
+    # Divisions belong to the respective HEALPix pixels
+    assert_divisions_are_correct(catalog)
+
+
 def test_partitions_obey_default_threshold_when_no_arguments_specified(
     small_sky_order1_df, small_sky_order1_catalog
 ):
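
Example usage (a minimal sketch of the interface this patch settles on; the
column names, the catalog_name/catalog_type values, and the data below are
illustrative assumptions, not part of the patch):

    import numpy as np
    import pandas as pd

    import lsdb

    # A small synthetic catalog: stays well under the new warning thresholds
    # (1 million rows / 1 GiB in-memory), so no RuntimeWarning is raised.
    rng = np.random.default_rng(seed=42)
    df = pd.DataFrame(
        {
            "ra": rng.uniform(0, 60, 10_000),
            "dec": rng.uniform(-30, 30, 10_000),
        }
    )

    # All partitioning arguments are now keyword-only; lowest_order defaults
    # to 0 and drop_empty_siblings to False, matching hipscat-import.
    catalog = lsdb.from_dataframe(
        df,
        catalog_name="demo",    # passed through **kwargs to the catalog info
        catalog_type="object",  # (illustrative values)
        highest_order=7,
        drop_empty_siblings=True,
        margin_threshold=None,  # skip margin cache generation
    )
    print(catalog)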