From 0f172229e089a28c6555559feb3a0ee6dab2fe2a Mon Sep 17 00:00:00 2001 From: Sean McGuire Date: Tue, 11 Jun 2024 15:23:50 -0400 Subject: [PATCH 1/3] map partitions supports non dataframe results --- src/lsdb/catalog/dataset/healpix_dataset.py | 10 +++++++++- tests/lsdb/catalog/test_catalog.py | 13 +++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/lsdb/catalog/dataset/healpix_dataset.py b/src/lsdb/catalog/dataset/healpix_dataset.py index 52c0684d..8c6a3ad3 100644 --- a/src/lsdb/catalog/dataset/healpix_dataset.py +++ b/src/lsdb/catalog/dataset/healpix_dataset.py @@ -223,6 +223,12 @@ def map_partitions( " the partitions in place will not work. If the function does not work for empty inputs, " "please specify a `meta` argument." ) + if not isinstance(meta, pd.DataFrame): + warnings.warn( + "output of the function must be a DataFrame to generate an LSDB `Catalog`. `map_partitions` " + "will return a dask object instead of a Catalog.", + RuntimeWarning, + ) if include_pixel: pixels = self.get_ordered_healpix_pixels() @@ -234,7 +240,9 @@ def apply_func(df, *args, partition_info=None, **kwargs): output_ddf = self._ddf.map_partitions(apply_func, *args, meta=meta, **kwargs) else: output_ddf = self._ddf.map_partitions(func, *args, meta=meta, **kwargs) - return self.__class__(output_ddf, self._ddf_pixel_map, self.hc_structure) + if isinstance(meta, pd.DataFrame): + return self.__class__(output_ddf, self._ddf_pixel_map, self.hc_structure) + return output_ddf def prune_empty_partitions(self, persist: bool = False) -> Self: """Prunes the catalog of its empty partitions diff --git a/tests/lsdb/catalog/test_catalog.py b/tests/lsdb/catalog/test_catalog.py index 25034fcf..c560954a 100644 --- a/tests/lsdb/catalog/test_catalog.py +++ b/tests/lsdb/catalog/test_catalog.py @@ -531,6 +531,19 @@ def add_col(df): assert np.all(mapcomp["a"] == mapcomp["ra"] + 1) +def test_map_partitions_non_df(small_sky_order1_catalog): + def get_col(df): + return df["ra"] + 1 + + with pytest.warns(RuntimeWarning, match="DataFrame"): + mapped = small_sky_order1_catalog.map_partitions(get_col) + + assert not isinstance(mapped, Catalog) + assert isinstance(mapped, dd.core.Series) + mapcomp = mapped.compute() + assert np.all(mapcomp == small_sky_order1_catalog.compute()["ra"] + 1) + + def test_non_working_empty_raises(small_sky_order1_catalog): def add_col(df): if len(df) == 0: From fdff109b3177d5cd4cc443863139276f689638c7 Mon Sep 17 00:00:00 2001 From: Sean McGuire Date: Tue, 11 Jun 2024 15:48:27 -0400 Subject: [PATCH 2/3] support all metas --- src/lsdb/catalog/dataset/healpix_dataset.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/lsdb/catalog/dataset/healpix_dataset.py b/src/lsdb/catalog/dataset/healpix_dataset.py index 8c6a3ad3..552c5022 100644 --- a/src/lsdb/catalog/dataset/healpix_dataset.py +++ b/src/lsdb/catalog/dataset/healpix_dataset.py @@ -223,12 +223,6 @@ def map_partitions( " the partitions in place will not work. If the function does not work for empty inputs, " "please specify a `meta` argument." ) - if not isinstance(meta, pd.DataFrame): - warnings.warn( - "output of the function must be a DataFrame to generate an LSDB `Catalog`. `map_partitions` " - "will return a dask object instead of a Catalog.", - RuntimeWarning, - ) if include_pixel: pixels = self.get_ordered_healpix_pixels() @@ -240,8 +234,14 @@ def apply_func(df, *args, partition_info=None, **kwargs): output_ddf = self._ddf.map_partitions(apply_func, *args, meta=meta, **kwargs) else: output_ddf = self._ddf.map_partitions(func, *args, meta=meta, **kwargs) - if isinstance(meta, pd.DataFrame): + + if isinstance(output_ddf, dd.core.DataFrame): return self.__class__(output_ddf, self._ddf_pixel_map, self.hc_structure) + warnings.warn( + "output of the function must be a DataFrame to generate an LSDB `Catalog`. `map_partitions` " + "will return a dask object instead of a Catalog.", + RuntimeWarning, + ) return output_ddf def prune_empty_partitions(self, persist: bool = False) -> Self: From b329cd4e77f1bdf3372eb9c21630c647dc5f65ad Mon Sep 17 00:00:00 2001 From: Sean McGuire Date: Fri, 14 Jun 2024 14:31:15 -0400 Subject: [PATCH 3/3] fix types and docstring --- src/lsdb/catalog/dataset/healpix_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lsdb/catalog/dataset/healpix_dataset.py b/src/lsdb/catalog/dataset/healpix_dataset.py index 552c5022..63f6fabe 100644 --- a/src/lsdb/catalog/dataset/healpix_dataset.py +++ b/src/lsdb/catalog/dataset/healpix_dataset.py @@ -179,7 +179,7 @@ def map_partitions( meta: pd.DataFrame | pd.Series | Dict | Iterable | Tuple | None = None, include_pixel: bool = False, **kwargs, - ) -> Self: + ) -> Self | dd.core.Series: """Applies a function to each partition in the catalog. The ra and dec of each row is assumed to remain unchanged. @@ -210,7 +210,7 @@ def map_partitions( Returns: A new catalog with each partition replaced with the output of the function applied to the original - partition. + partition. If the function returns a non dataframe output, a dask Series will be returned. """ if meta is None: if include_pixel: