Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
raulchen committed Dec 11, 2024
1 parent 27559ae commit aedfaa4
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions python/ray/data/tests/test_streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,12 +665,13 @@ def after_execution_fails(self, error: Exception):
self._execution_error = error

# Test the success case.
ctx = DataContext.get_current()
ds = ray.data.range(10)
ctx = ds.context
callback = CustomExecutionCallback()
add_execution_callback(callback, ctx)
assert get_execution_callbacks(ctx) == [callback]

ray.data.range(10).take_all()
ds.take_all()

assert callback._before_execution_starts_called
assert callback._after_execution_succeeds_called
Expand All @@ -680,18 +681,22 @@ def after_execution_fails(self, error: Exception):
assert get_execution_callbacks(ctx) == []

# Test the failure case.
ds = ray.data.range(10)
ctx = ds.context
ctx.raise_original_map_exception = True
callback = CustomExecutionCallback()
add_execution_callback(callback, ctx)

def map(_):
def map_fn(_):
raise ValueError("")

ray.data.range(10).map(map).take_all()
with pytest.raises(ValueError):
ds.map(map_fn).take_all()

assert callback._before_execution_starts_called
assert not callback._after_execution_succeeds_called
error = callback._execution_error
assert isinstance(error, ValueError)
assert isinstance(error, ValueError), error


if __name__ == "__main__":
Expand Down

0 comments on commit aedfaa4

Please sign in to comment.