Skip to content

Commit

Permalink
[data] add column relaxation (#49221)
Browse files Browse the repository at this point in the history
## Why are these changes needed?

Aligns `ray.data.Dataset.add_column` to its docstring and allows more
flexibility for adding columns to pandas dataframes.

## Related issue number

Closes #49114

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit test
   - [ ] Release tests
   - [ ] This PR is not tested :(
`python -m pytest -v -s
python\ray\data\tests\test_map.py::test_add_column`

---------

Signed-off-by: Ivan Webber <[email protected]>
Signed-off-by: Ivan Webber <[email protected]>
  • Loading branch information
ivanthewebber authored Dec 14, 2024
1 parent 08b4309 commit 5da66ea
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 46 deletions.
17 changes: 1 addition & 16 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
import time
import warnings
from collections.abc import Sequence
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -765,20 +764,9 @@ def add_column(
f"got: {batch_format}"
)

def _raise_duplicate_column_error(col: str):
raise ValueError(f"Trying to add an existing column with name {col!r}")

def add_column(batch: DataBatch) -> DataBatch:
column = fn(batch)
if batch_format == "pandas":
import pandas as pd

assert isinstance(column, (pd.Series, Sequence)), (
f"For pandas batch format, the function must return a pandas "
f"Series or sequence, got: {type(column)}"
)
if col in batch:
_raise_duplicate_column_error(col)
batch.loc[:, col] = column
return batch
elif batch_format == "pyarrow":
Expand All @@ -797,10 +785,9 @@ def add_column(batch: DataBatch) -> DataBatch:
# which case we'll want to append it
column_idx = batch.schema.get_field_index(col)
if column_idx == -1:
# Append the column to the table
return batch.append_column(col, column)
else:
_raise_duplicate_column_error(col)
return batch.set_column(column_idx, col, column)

else:
# batch format is assumed to be numpy since we checked at the
Expand All @@ -809,8 +796,6 @@ def add_column(batch: DataBatch) -> DataBatch:
f"For numpy batch format, the function must return a "
f"numpy.ndarray, got: {type(column)}"
)
if col in batch:
_raise_duplicate_column_error(col)
batch[col] = column
return batch

Expand Down
49 changes: 19 additions & 30 deletions python/ray/data/tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,15 +350,11 @@ def test_add_column(ray_start_regular_shared):
)
assert ds.take(1) == [{"id": 0, "foo": 1}]

# Adding a column that is already there should result in an error
with pytest.raises(
ray.exceptions.UserCodeException,
match="Trying to add an existing column with name 'id'",
):
ds = ray.data.range(5).add_column(
"id", lambda x: pc.add(x["id"], 1), batch_format="pyarrow"
)
assert ds.take(2) == [{"id": 1}, {"id": 2}]
# Adding a column that is already there should not result in an error
ds = ray.data.range(5).add_column(
"id", lambda x: pc.add(x["id"], 1), batch_format="pyarrow"
)
assert ds.take(2) == [{"id": 1}, {"id": 2}]

# Adding a column in the wrong format should result in an error
with pytest.raises(
Expand All @@ -378,15 +374,11 @@ def test_add_column(ray_start_regular_shared):
)
assert ds.take(1) == [{"id": 0, "foo": 1}]

# Adding a column that is already there should result in an error
with pytest.raises(
ray.exceptions.UserCodeException,
match="Trying to add an existing column with name 'id'",
):
ds = ray.data.range(5).add_column(
"id", lambda x: np.add(x["id"], 1), batch_format="numpy"
)
assert ds.take(2) == [{"id": 1}, {"id": 2}]
# Adding a column that is already there should not result in an error
ds = ray.data.range(5).add_column(
"id", lambda x: np.add(x["id"], 1), batch_format="numpy"
)
assert ds.take(2) == [{"id": 1}, {"id": 2}]

# Adding a column in the wrong format should result in an error
with pytest.raises(
Expand All @@ -402,23 +394,20 @@ def test_add_column(ray_start_regular_shared):
ds = ray.data.range(5).add_column("foo", lambda x: x["id"] + 1)
assert ds.take(1) == [{"id": 0, "foo": 1}]

# Adding a column that is already there should result in an error
with pytest.raises(
ray.exceptions.UserCodeException,
match="Trying to add an existing column with name 'id'",
):
ds = ray.data.range(5).add_column("id", lambda x: x["id"] + 1)
assert ds.take(2) == [{"id": 1}, {"id": 2}]
# Adding a column that is already there should not result in an error
ds = ray.data.range(5).add_column("id", lambda x: x["id"] + 1)
assert ds.take(2) == [{"id": 1}, {"id": 2}]

# Adding a column in the wrong format should result in an error
with pytest.raises(
ray.exceptions.UserCodeException, match="For pandas batch format"
):
# Adding a column in the wrong format may result in an error
with pytest.raises(ray.exceptions.UserCodeException):
ds = ray.data.range(5).add_column(
"id", lambda x: np.array([1]), batch_format="pandas"
"id", lambda x: range(7), batch_format="pandas"
)
assert ds.take(2) == [{"id": 1}, {"id": 2}]

ds = ray.data.range(5).add_column("const", lambda _: 3, batch_format="pandas")
assert ds.take(2) == [{"id": 0, "const": 3}, {"id": 1, "const": 3}]

with pytest.raises(ValueError):
ds = ray.data.range(5).add_column("id", 0)

Expand Down

0 comments on commit 5da66ea

Please sign in to comment.