Skip to content

Commit

Permalink
[data] Fix repartition raising an error when the dataset is empty
Browse files Browse the repository at this point in the history
Signed-off-by: jukejian <[email protected]>
  • Loading branch information
Jay-ju committed Dec 12, 2024
1 parent bcb1b20 commit 23a56c6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 17 deletions.
4 changes: 2 additions & 2 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ def __init__(
)

self._object_store_memory_limit_fraction = (
data_context.object_store_memory_limit_fraction
data_context.override_object_store_memory_limit_fraction
if self.op_resource_allocator_enabled()
else data_context.object_store_memory_limit_fraction_wo_resource_reservation
else data_context.override_object_store_memory_limit_fraction * 0.5
)

def _estimate_object_store_memory(self, op, state) -> int:
Expand Down
19 changes: 4 additions & 15 deletions python/ray/data/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,11 @@

# The fraction of the object store capacity that will be used as the default object
# store memory limit for the streaming executor,
# when `ReservationOpResourceAllocator` is enabled.
# TODO: deprecate `DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION`.
# With `ReservationOpResourceAllocator` enabled, the default fraction is 0.5;
# with it disabled, the default fraction is 0.25.
DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION = float(
os.getenv("RAY_DATA_OBJECT_STORE_MEMORY_LIMIT_FRACTION", "0.5")
)
# The fraction of the object store capacity that will be used as the default object
# store memory limit for the streaming executor,
# when `ReservationOpResourceAllocator` is not enabled.
DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION_WO_RESOURCE_RESERVATION = float(
os.getenv(
"RAY_DATA_OBJECT_STORE_MEMORY_LIMIT_FRACTION_WO_RESOURCE_RESERVATION", "0.25"
)
)

DEFAULT_MAX_ERRORED_BLOCKS = 0

Expand Down Expand Up @@ -356,12 +348,9 @@ class DataContext:
retried_io_errors: List[str] = field(
default_factory=lambda: list(DEFAULT_RETRIED_IO_ERRORS)
)
object_store_memory_limit_fraction: float = (
DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION
)

object_store_memory_limit_fraction_wo_resource_reservation: float = (
DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION_WO_RESOURCE_RESERVATION
override_object_store_memory_limit_fraction: float = (
DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION
)

def __post_init__(self):
Expand Down

0 comments on commit 23a56c6

Please sign in to comment.