[Data] Re-implement APIs like select_columns with PyArrow batch format #48140

Merged

Changes from all commits (35 commits)
835e326  initial (ArturNiederfahrenhorst, Oct 21, 2024)
bccfa91  fix add_column (ArturNiederfahrenhorst, Oct 22, 2024)
3c034bf  drop columns working (ArturNiederfahrenhorst, Oct 22, 2024)
d8d6aa0  lint (ArturNiederfahrenhorst, Oct 22, 2024)
ccc41eb  lint (ArturNiederfahrenhorst, Oct 22, 2024)
4eb8cc5  fix doctests (ArturNiederfahrenhorst, Oct 23, 2024)
1a416d5  Add pa.array() to dataset/iterator docstrings (ArturNiederfahrenhorst, Oct 23, 2024)
b3abe46  lint (ArturNiederfahrenhorst, Oct 23, 2024)
9511fa8  make pandas default (ArturNiederfahrenhorst, Oct 25, 2024)
3a5ea5c  revert drop columns test (ArturNiederfahrenhorst, Oct 25, 2024)
f6ac232  revert docstrings to reflect simple pandas version (ArturNiederfahrenhorst, Oct 25, 2024)
78b73fa  replace drop_columns with drop (ArturNiederfahrenhorst, Oct 25, 2024)
d74f318  format (ArturNiederfahrenhorst, Oct 31, 2024)
7fcc901  fix select columns by removing duplicates and fixing the test (ArturNiederfahrenhorst, Oct 31, 2024)
d882402  wip (ArturNiederfahrenhorst, Oct 31, 2024)
35bbfb1  replace kwarg by arg (ArturNiederfahrenhorst, Nov 4, 2024)
f234862  cleanup (ArturNiederfahrenhorst, Nov 4, 2024)
f3cbf08  cleanup after merge (ArturNiederfahrenhorst, Nov 8, 2024)
55bc8f6  Alexey's comment (ArturNiederfahrenhorst, Nov 13, 2024)
9252d0d  minor change (ArturNiederfahrenhorst, Nov 14, 2024)
0846f80  lint (ArturNiederfahrenhorst, Nov 14, 2024)
9fe088d  richard's comments (ArturNiederfahrenhorst, Nov 18, 2024)
a79db22  Update python/ray/data/dataset.py (ArturNiederfahrenhorst, Nov 18, 2024)
aae2a7a  add numpy (ArturNiederfahrenhorst, Nov 18, 2024)
ace4348  add tests (ArturNiederfahrenhorst, Nov 18, 2024)
84bb1b5  Richard's comment + testing (ArturNiederfahrenhorst, Nov 19, 2024)
11c75a2  lint (ArturNiederfahrenhorst, Nov 19, 2024)
a306be3  Update python/ray/data/dataset.py (ArturNiederfahrenhorst, Nov 20, 2024)
cfe5900  Alexey's comments (ArturNiederfahrenhorst, Nov 20, 2024)
ddf22e4  fix doctests (ArturNiederfahrenhorst, Nov 21, 2024)
4e11ff9  Update python/ray/data/dataset.py (ArturNiederfahrenhorst, Nov 21, 2024)
783e6ca  Remove pymongoarrow datatype testing in test_mongo (ArturNiederfahrenhorst, Nov 21, 2024)
bcd0ba6  fix doctests (ArturNiederfahrenhorst, Nov 21, 2024)
149a5cf  lint (ArturNiederfahrenhorst, Nov 21, 2024)
9fea594  fix gc test (ArturNiederfahrenhorst, Nov 22, 2024)
6 changes: 6 additions & 0 deletions python/ray/data/block.py
@@ -72,6 +72,12 @@ class BlockType(Enum):
# returned from batch UDFs.
DataBatch = Union["pyarrow.Table", "pandas.DataFrame", Dict[str, np.ndarray]]

# User-facing data column type. This is the data type for data that is supplied to and
# returned from column UDFs.
DataBatchColumn = Union[
"pyarrow.ChunkedArray", "pyarrow.Array", "pandas.Series", np.ndarray
]


# A class type that implements __call__.
CallableClass = type
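The new DataBatchColumn union maps one variant to each supported batch format. A minimal sketch of column UDFs returning each variant, mirroring the usage exercised in this PR's tests:

import numpy as np
import pandas as pd
import pyarrow as pa

import ray

ds = ray.data.range(3)

# pandas batch -> pandas.Series column (the default batch format)
ds_pd = ds.add_column("ones", lambda df: pd.Series([1] * len(df)), batch_format="pandas")

# pyarrow batch -> pyarrow.Array or pyarrow.ChunkedArray column
ds_pa = ds.add_column("ones", lambda t: pa.array([1] * t.num_rows), batch_format="pyarrow")

# numpy batch -> np.ndarray column
ds_np = ds.add_column("ones", lambda b: np.ones(len(b["id"]), dtype=np.int64), batch_format="numpy")

assert ds_pa.take(1) == [{"id": 0, "ones": 1}]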
100 changes: 83 additions & 17 deletions python/ray/data/dataset.py
@@ -87,6 +87,7 @@
Block,
BlockAccessor,
DataBatch,
DataBatchColumn,
T,
U,
UserDefinedFunction,
@@ -529,7 +530,8 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
compute: This argument is deprecated. Use ``concurrency`` argument.
batch_format: If ``"default"`` or ``"numpy"``, batches are
``Dict[str, numpy.ndarray]``. If ``"pandas"``, batches are
``pandas.DataFrame``.
``pandas.DataFrame``. If ``"pyarrow"``, batches are
``pyarrow.Table``.
zero_copy_batch: Whether ``fn`` should be provided zero-copy, read-only
batches. If this is ``True`` and no copy is required for the
``batch_format`` conversion, the batch is a zero-copy, read-only
@@ -700,16 +702,21 @@ def _map_batches_without_batch_size_validation(
def add_column(
self,
col: str,
fn: Callable[
[DataBatch],
DataBatchColumn,
],
*,
batch_format: Optional[str] = "pandas",
compute: Optional[str] = None,
concurrency: Optional[Union[int, Tuple[int, int]]] = None,
**ray_remote_args,
) -> "Dataset":
"""Add the given column to the dataset.

A function generating the new column values given the batch in pandas,
pyarrow, or numpy format must be specified. This function must operate on
batches of the given ``batch_format``.

Examples:

@@ -729,18 +736,18 @@ def add_column(
id int64
new_id int64


Time complexity: O(dataset size / parallelism)

Args:
col: Name of the column to add. If the name already exists, the
column is overwritten.
fn: Map function generating the column values given a batch of
records in the specified ``batch_format``.
batch_format: If ``"pandas"`` (the default), batches are
``pandas.DataFrame``. If ``"pyarrow"``, batches are
``pyarrow.Table``. If ``"numpy"``, batches are
``Dict[str, numpy.ndarray]``.
compute: This argument is deprecated. Use ``concurrency`` argument.
concurrency: The number of Ray workers to use concurrently. For a
fixed-sized worker pool of size ``n``, specify ``concurrency=n``. For
@@ -749,17 +756,72 @@ def add_column(
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
"""
# Check that batch_format is valid.
accepted_batch_formats = ["pandas", "pyarrow", "numpy"]
if batch_format not in accepted_batch_formats:
raise ValueError(
f"batch_format argument must be one of {accepted_batch_formats}, "
f"got: {batch_format}"
)

def add_column(batch: DataBatch) -> DataBatch:
column = fn(batch)
if batch_format == "pandas":
import pandas as pd

assert isinstance(column, pd.Series), (
f"For pandas batch format, the function must return a pandas "
f"Series, got: {type(column)}"
)
if col in batch:
raise ValueError(
f"Trying to add an existing column with name" f" {col}"
)
batch.loc[:, col] = column
return batch
elif batch_format == "pyarrow":
import pyarrow as pa

assert isinstance(column, (pa.Array, pa.ChunkedArray)), (
f"For pyarrow batch format, the function must return a pyarrow "
f"Array, got: {type(column)}"
)
# Historically, this method was written for pandas batch format.
# To resolve https://github.com/ray-project/ray/issues/48090,
# we also allow pyarrow batch format which is preferred but would be
# a breaking change to enforce.

# For pyarrow, the index of the column will be -1 if it is missing in
# which case we'll want to append it
column_idx = batch.schema.get_field_index(col)
if column_idx == -1:
# Append the column to the table
return batch.append_column(col, column)
else:
raise ValueError(
f"Trying to add an existing column with name {col}"
)

else:
# batch format is assumed to be numpy since we checked at the
# beginning of the add_column function
assert isinstance(column, np.ndarray), (
f"For numpy batch format, the function must return a "
f"numpy.ndarray, got: {type(column)}"
)
if col in batch:
raise ValueError(
f"Trying to add an existing column with name" f" {col}"
)
batch[col] = column
return batch

if not callable(fn):
raise ValueError("`fn` must be callable, got {}".format(fn))

return self.map_batches(
add_column,
batch_format="pandas", # TODO(ekl) we should make this configurable.
batch_format=batch_format,
compute=compute,
concurrency=concurrency,
zero_copy_batch=False,
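Taken together, the reworked add_column accepts any of the three batch formats and now rejects overwriting an existing column. A short sketch of the resulting behavior, adapted from the tests added in this PR:

import pytest
import ray

ds = ray.data.range(3)

# Deriving a new column still works with the pandas default.
assert ds.add_column("id_copy", lambda df: df["id"]).take(1) == [{"id": 0, "id_copy": 0}]

# Overwriting an existing column now raises instead of silently replacing values.
with pytest.raises(ray.exceptions.UserCodeException, match="existing column with name id"):
    ds.add_column("id", lambda df: df["id"] + 1).take(1)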
@@ -801,7 +863,7 @@ def drop_columns(

Args:
cols: Names of the columns to drop. If any name does not exist,
an exception is raised.
an exception is raised. Column names must be unique.
compute: This argument is deprecated. Use ``concurrency`` argument.
concurrency: The number of Ray workers to use concurrently. For a fixed-sized
worker pool of size ``n``, specify ``concurrency=n``. For an autoscaling
@@ -810,12 +872,15 @@
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
""" # noqa: E501

if len(cols) != len(set(cols)):
raise ValueError(f"drop_columns expects unique column names, got: {cols}")

def drop_columns(batch):
return batch.drop(cols)

return self.map_batches(
drop_columns,
batch_format="pandas",
batch_format="pyarrow",
zero_copy_batch=True,
compute=compute,
concurrency=concurrency,
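For reference, a minimal sketch of the reworked drop_columns semantics (duplicate names are rejected eagerly, and the drop itself now runs on pyarrow Tables):

import pytest
import ray

ds = ray.data.from_items([{"col1": 1, "col2": 2, "col3": 3}])

assert ds.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}]

# The uniqueness check happens before any tasks are launched.
with pytest.raises(ValueError, match="drop_columns expects unique column names"):
    ds.drop_columns(["col1", "col1"])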
@@ -4316,7 +4381,8 @@ def to_tf(
If your model accepts additional metadata aside from features and label, specify a single additional column or a list of additional columns.
A common use case is to include sample weights in the data samples and train a ``tf.keras.Model`` with ``tf.keras.Model.fit``.

>>> ds = ds.add_column("sample weights", lambda df: 1)
>>> import pandas as pd
>>> ds = ds.add_column("sample weights", lambda df: pd.Series([1] * len(df)))
>>> ds.to_tf(feature_columns="features", label_columns="target", additional_columns="sample weights")
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.int64, name='sample weights'))>

3 changes: 2 additions & 1 deletion python/ray/data/iterator.py
@@ -734,7 +734,8 @@ def to_tf(
If your model accepts additional metadata aside from features and label, specify a single additional column or a list of additional columns.
A common use case is to include sample weights in the data samples and train a ``tf.keras.Model`` with ``tf.keras.Model.fit``.

>>> ds = ds.add_column("sample weights", lambda df: 1)
>>> import pandas as pd
>>> ds = ds.add_column("sample weights", lambda df: pd.Series([1] * len(df)))
>>> it = ds.iterator()
>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target", additional_columns="sample weights")
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.int64, name='sample weights'))>
97 changes: 90 additions & 7 deletions python/ray/data/tests/test_map.py
@@ -9,6 +9,7 @@
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pytest

@@ -330,18 +331,99 @@


def test_add_column(ray_start_regular_shared):
ds = ray.data.range(5).add_column("foo", lambda x: 1)
"""Tests the add column API."""

# Test with pyarrow batch format
ds = ray.data.range(5).add_column(
"foo", lambda x: pa.array([1] * x.num_rows), batch_format="pyarrow"
# Review comment (Contributor): Let's also test with pa.chunked_array
)
assert ds.take(1) == [{"id": 0, "foo": 1}]

# Test with chunked array batch format
ds = ray.data.range(5).add_column(
"foo", lambda x: pa.chunked_array([[1] * x.num_rows]), batch_format="pyarrow"
)
assert ds.take(1) == [{"id": 0, "foo": 1}]

ds = ray.data.range(5).add_column(
"foo", lambda x: pc.add(x["id"], 1), batch_format="pyarrow"
)
assert ds.take(1) == [{"id": 0, "foo": 1}]

# Adding a column that is already there should result in an error
with pytest.raises(
ray.exceptions.UserCodeException,
match="Trying to add an existing column with name id",
):
ds = ray.data.range(5).add_column(
"id", lambda x: pc.add(x["id"], 1), batch_format="pyarrow"
)
assert ds.take(2) == [{"id": 1}, {"id": 2}]

# Adding a column in the wrong format should result in an error
with pytest.raises(
ray.exceptions.UserCodeException, match="For pyarrow batch " "format"
):
ds = ray.data.range(5).add_column("id", lambda x: [1], batch_format="pyarrow")
assert ds.take(2) == [{"id": 1}, {"id": 2}]

# Test with numpy batch format
ds = ray.data.range(5).add_column(
"foo", lambda x: np.array([1] * len(list(x.keys())[0])), batch_format="numpy"
)
assert ds.take(1) == [{"id": 0, "foo": 1}]

ds = ray.data.range(5).add_column(
"foo", lambda x: np.add(x["id"], 1), batch_format="numpy"
)
assert ds.take(1) == [{"id": 0, "foo": 1}]

# Adding a column that is already there should result in an error
with pytest.raises(
ray.exceptions.UserCodeException,
match="Trying to add an existing column with name id",
):
ds = ray.data.range(5).add_column(
"id", lambda x: np.add(x["id"], 1), batch_format="numpy"
)
assert ds.take(2) == [{"id": 1}, {"id": 2}]

# Adding a column in the wrong format should result in an error
with pytest.raises(
ray.exceptions.UserCodeException, match="For numpy batch " "format"
):
ds = ray.data.range(5).add_column("id", lambda x: [1], batch_format="numpy")
assert ds.take(2) == [{"id": 1}, {"id": 2}]

# Test with pandas batch format
ds = ray.data.range(5).add_column("foo", lambda x: pd.Series([1] * x.shape[0]))
assert ds.take(1) == [{"id": 0, "foo": 1}]

ds = ray.data.range(5).add_column("foo", lambda x: x["id"] + 1)
assert ds.take(1) == [{"id": 0, "foo": 1}]

ds = ray.data.range(5).add_column("id", lambda x: x["id"] + 1)
assert ds.take(2) == [{"id": 1}, {"id": 2}]
# Adding a column that is already there should result in an error
with pytest.raises(
ray.exceptions.UserCodeException,
match="Trying to add an existing column with name id",
):
ds = ray.data.range(5).add_column("id", lambda x: x["id"] + 1)
assert ds.take(2) == [{"id": 1}, {"id": 2}]

# Adding a column in the wrong format should result in an error
with pytest.raises(
ray.exceptions.UserCodeException, match="For pandas batch " "format"
):
ds = ray.data.range(5).add_column("id", lambda x: [1], batch_format="pandas")
assert ds.take(2) == [{"id": 1}, {"id": 2}]

with pytest.raises(ValueError):
ds = ray.data.range(5).add_column("id", 0)

# Test that an invalid batch_format raises an error
with pytest.raises(ValueError):
ray.data.range(5).add_column("foo", lambda x: x["id"] + 1, batch_format="foo")


@pytest.mark.parametrize("names", (["foo", "bar"], {"spam": "foo", "ham": "bar"}))
def test_rename_columns(ray_start_regular_shared, names):
@@ -362,14 +444,15 @@ def test_drop_columns(ray_start_regular_shared, tmp_path):
assert ds.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}]
assert ds.drop_columns(["col1", "col3"]).take(1) == [{"col2": 2}]
assert ds.drop_columns([]).take(1) == [{"col1": 1, "col2": 2, "col3": 3}]
assert ds.drop_columns(["col1", "col2", "col3"]).take(1) == [{}]
assert ds.drop_columns(["col1", "col1", "col2", "col1"]).take(1) == [
{"col3": 3}
]
assert ds.drop_columns(["col1", "col2", "col3"]).take(1) == []
# Review comment (Contributor Author): As discussed offline, this behavior is
# arbitrary and probably has little practical relevance. Since our pyarrow
# implementation of the drop operation returns an empty list, we decided to
# just change the test in this case.
assert ds.drop_columns(["col1", "col2"]).take(1) == [{"col3": 3}]
# Test dropping non-existent column
with pytest.raises((UserCodeException, KeyError)):
ds.drop_columns(["dummy_col", "col1", "col2"]).materialize()

with pytest.raises(ValueError, match="drop_columns expects unique column names"):
ds1.drop_columns(["col1", "col2", "col2"])


def test_select_columns(ray_start_regular_shared):
# Test pandas and arrow