Merge branch 'hjiang/fix-windows-dependency' of github.com:dentiny/ray into hjiang/fix-windows-dependency
dentiny committed Jan 22, 2025
2 parents 5925ecd + 263ec94 commit a429d6f
Showing 14 changed files with 370 additions and 48 deletions.
72 changes: 72 additions & 0 deletions doc/source/ray-core/compiled-graph/execution.rst
@@ -0,0 +1,72 @@
Execution and failure semantics
===============================

Like classic Ray Core, Ray Compiled Graph propagates exceptions to the final output.
In particular:

- **Application exceptions**: If an application task throws an exception, Compiled Graph
  wraps the exception in a :class:`RayTaskError <ray.exceptions.RayTaskError>` and
  raises it when the caller calls :func:`ray.get() <ray.get>` on the result. The thrown
  exception inherits from both :class:`RayTaskError <ray.exceptions.RayTaskError>`
  and the original exception class, as the sketch after this list shows.

- **System exceptions**: System exceptions include actor death or unexpected errors
  such as network errors. For actor death, Compiled Graph raises an
  :class:`ActorDiedError <ray.exceptions.ActorDiedError>`, and for other errors, it
  raises a :class:`RayChannelError <ray.exceptions.RayChannelError>`.
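
For example, the following minimal sketch shows the application-exception case. The
``FailingActor`` class and its ``process`` method are hypothetical names, not part of the
example further below:

.. code-block:: python

    import ray
    from ray.dag import InputNode

    @ray.remote
    class FailingActor:
        def process(self, x):
            if x == 0:
                # Simulate an application-level failure.
                raise ValueError("cannot process 0")
            return x * 2

    actor = FailingActor.remote()
    with InputNode() as inp:
        dag = actor.process.bind(inp)

    compiled_dag = dag.experimental_compile()
    ref = compiled_dag.execute(0)
    try:
        ray.get(ref)
    except ray.exceptions.RayTaskError as e:
        # The raised error also inherits from the original exception class.
        assert isinstance(e, ValueError)

    # The graph remains executable after an application exception.
    assert ray.get(compiled_dag.execute(5)) == 10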

Ray Compiled Graph remains executable after application exceptions. However, Compiled Graph
automatically shuts down in the case of system exceptions. If an actor's death causes
the Compiled Graph to shut down, this shutdown doesn't affect the remaining actors. See the
following code as an example:

.. testcode::

    import ray
    from ray.dag import InputNode, MultiOutputNode

    @ray.remote
    class EchoActor:
        def echo(self, msg):
            return msg

    actors = [EchoActor.remote() for _ in range(4)]
    with InputNode() as inp:
        outputs = [actor.echo.bind(inp) for actor in actors]
        dag = MultiOutputNode(outputs)

    compiled_dag = dag.experimental_compile()
    # Kill one of the actors to simulate unexpected actor death.
    ray.kill(actors[0])
    ref = compiled_dag.execute(1)

    live_actors = []
    try:
        ray.get(ref)
    except ray.exceptions.ActorDiedError:
        # At this point, the Compiled Graph is shutting down.
        for actor in actors:
            try:
                # Check for live actors.
                ray.get(actor.echo.remote("ping"))
                live_actors.append(actor)
            except ray.exceptions.RayActorError:
                pass

    # Optionally, use the live actors to create a new Compiled Graph.
    assert live_actors == actors[1:]

Timeouts
--------

Some errors, such as network errors, require additional handling to avoid hanging.
To address these cases, Compiled Graph allows configurable timeouts for
``compiled_dag.execute()`` and :func:`ray.get() <ray.get>`.

The default timeout is 10 seconds for both. Set the following environment variables
to change the default timeout:

- ``RAY_CGRAPH_submit_timeout``: Timeout for ``compiled_dag.execute()``.
- ``RAY_CGRAPH_get_timeout``: Timeout for :func:`ray.get() <ray.get>`.

:func:`ray.get() <ray.get>` also accepts a ``timeout`` argument to set the timeout on a per-call basis.
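
For example, here's a minimal sketch that combines both mechanisms. It assumes the
environment variables take effect when set in the driver before the graph is compiled:

.. code-block:: python

    import os

    # Raise the default timeouts to 40 seconds (assumed to be read at compile time).
    os.environ["RAY_CGRAPH_submit_timeout"] = "40"
    os.environ["RAY_CGRAPH_get_timeout"] = "40"

    import ray
    from ray.dag import InputNode

    @ray.remote
    class EchoActor:
        def echo(self, msg):
            return msg

    actor = EchoActor.remote()
    with InputNode() as inp:
        dag = actor.echo.bind(inp)

    compiled_dag = dag.experimental_compile()
    ref = compiled_dag.execute("hello")
    # The per-call timeout (in seconds) overrides the default for this call only.
    print(ray.get(ref, timeout=5))
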
99 changes: 99 additions & 0 deletions doc/source/ray-core/compiled-graph/overlap.rst
@@ -0,0 +1,99 @@
Overlap communication and computation
======================================

Compiled Graph currently provides experimental support for GPU communication and computation overlap. When you turn this feature on, it automatically overlaps the GPU communication with computation operations, thereby hiding the communication overhead and improving performance.

To enable this feature, specify ``_overlap_gpu_communication=True`` when calling ``dag.experimental_compile()``.

The following code has GPU communication and computation operations that benefit
from overlapping.

.. testcode::

    import ray
    import time
    import torch
    from ray.dag import InputNode, MultiOutputNode
    from ray.experimental.channel.torch_tensor_type import TorchTensorType
    from ray.air._internal import torch_utils

    @ray.remote(num_cpus=0, num_gpus=1)
    class TorchTensorWorker:
        def __init__(self):
            self.device = torch_utils.get_devices()[0]

        def send(self, shape, dtype, value: int, send_tensor=True):
            if not send_tensor:
                return 1
            return torch.ones(shape, dtype=dtype, device=self.device) * value

        def recv_and_matmul(self, two_d_tensor):
            """
            Receive the tensor and do some expensive computation (matmul).

            Args:
                two_d_tensor: a 2D tensor that has the same size for its dimensions
            """
            # Check that the tensor is a square 2D matrix loaded onto the correct device.
            assert two_d_tensor.dim() == 2
            assert two_d_tensor.size(0) == two_d_tensor.size(1)
            assert two_d_tensor.device == self.device
            torch.matmul(two_d_tensor, two_d_tensor)
            return (two_d_tensor[0][0].item(), two_d_tensor.shape, two_d_tensor.dtype)

    def test(overlap_gpu_communication):
        num_senders = 3
        senders = [TorchTensorWorker.remote() for _ in range(num_senders)]
        receiver = TorchTensorWorker.remote()

        shape = (10000, 10000)
        dtype = torch.float16

        with InputNode() as inp:
            branches = [sender.send.bind(shape, dtype, inp) for sender in senders]
            branches = [
                branch.with_type_hint(
                    TorchTensorType(
                        transport="nccl", _static_shape=True, _direct_return=True
                    )
                )
                for branch in branches
            ]
            branches = [receiver.recv_and_matmul.bind(branch) for branch in branches]
            dag = MultiOutputNode(branches)

        compiled_dag = dag.experimental_compile(
            _overlap_gpu_communication=overlap_gpu_communication
        )

        start = time.monotonic()
        for i in range(5):
            ref = compiled_dag.execute(i)
            result = ray.get(ref)
            assert result == [(i, shape, dtype)] * num_senders
        duration = time.monotonic() - start
        print(f"{overlap_gpu_communication=}, {duration=}")

    if __name__ == "__main__":
        for overlap_gpu_communication in [False, True]:
            test(overlap_gpu_communication)

The output of the preceding code includes the following two lines:

.. testoutput::

    overlap_gpu_communication=False, duration=1.0670117866247892
    overlap_gpu_communication=True, duration=0.9211348341777921

The actual performance numbers may vary on different hardware, but enabling ``_overlap_gpu_communication`` improves latency by about 14% for this example.

To verify that Compiled Graph overlaps the communication and computation operations,
:ref:`visualize the execution schedule <execution-schedule>` by setting the environment variable
``RAY_CGRAPH_VISUALIZE_SCHEDULE=1``.
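
For example, a minimal sketch that sets the variable from the driver script, assuming it
must be set before ``dag.experimental_compile()`` runs (setting it in the shell before
launching the script works as well):

.. code-block:: python

    import os

    # Enable execution-schedule visualization; set before compiling the graph.
    os.environ["RAY_CGRAPH_VISUALIZE_SCHEDULE"] = "1"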

.. image:: ../../images/compiled_graph_schedule_overlap.png
    :alt: Execution Schedule with GPU Communication Overlap Enabled
    :align: center

Red nodes denote operations whose execution order in the optimized schedule differs from
the original order because of ``_overlap_gpu_communication``.
2 changes: 2 additions & 0 deletions doc/source/ray-core/compiled-graph/ray-compiled-graph.rst
@@ -66,5 +66,7 @@ Learn more details about Ray Compiled Graph from the following links.
:maxdepth: 1

quickstart
execution
visualization
profiling
overlap
2 changes: 2 additions & 0 deletions doc/source/ray-core/compiled-graph/visualization.rst
@@ -50,6 +50,8 @@ The visualization for the preceding code is shown below:

Note that tasks of the same actor are shown in the same color.

.. _execution-schedule:

Execution schedule
------------------

18 changes: 16 additions & 2 deletions python/ray/_private/runtime_env/packaging.py
@@ -628,6 +628,7 @@ def upload_package_if_needed(
package_file = package_file.with_name(
f"{time.time_ns()}_{os.getpid()}_{package_file.name}"
)

create_package(
module_path,
package_file,
@@ -656,6 +657,7 @@ async def download_and_unpack_package(
base_directory: str,
gcs_aio_client: Optional["GcsAioClient"] = None, # noqa: F821
logger: Optional[logging.Logger] = default_logger,
overwrite: bool = False,
) -> str:
"""Download the package corresponding to this URI and unpack it if zipped.
@@ -668,6 +670,7 @@
directory for the unpacked files.
gcs_aio_client: Client to use for downloading from the GCS.
logger: The logger to use.
overwrite: If True, overwrite the existing package.
Returns:
Path to the local directory containing the unpacked package files.
@@ -695,10 +698,21 @@

local_dir = get_local_dir_from_uri(pkg_uri, base_directory)
assert local_dir != pkg_file, "Invalid pkg_file!"
if local_dir.exists():

download_package: bool = True
if local_dir.exists() and not overwrite:
download_package = False
assert local_dir.is_dir(), f"{local_dir} is not a directory"
else:
elif local_dir.exists():
logger.info(f"Removing {local_dir} with pkg_file {pkg_file}")
shutil.rmtree(local_dir)

if download_package:
protocol, _ = parse_uri(pkg_uri)
logger.info(
f"Downloading package from {pkg_uri} to {pkg_file} "
f"with protocol {protocol}"
)
if protocol == Protocol.GCS:
if gcs_aio_client is None:
raise ValueError(
6 changes: 5 additions & 1 deletion python/ray/_private/runtime_env/working_dir.py
@@ -161,7 +161,11 @@ async def create(
logger: logging.Logger = default_logger,
) -> int:
local_dir = await download_and_unpack_package(
uri, self._resources_dir, self._gcs_aio_client, logger=logger
uri,
self._resources_dir,
self._gcs_aio_client,
logger=logger,
overwrite=True,
)
return get_directory_size_bytes(local_dir)

6 changes: 3 additions & 3 deletions python/ray/exceptions.py
@@ -373,9 +373,9 @@ class ActorDiedError(RayActorError):
cause: The cause of the actor error. `RayTaskError` type means
the actor has died because of an exception within `__init__`.
`ActorDiedErrorContext` means the actor has died because of
unexepected system error. None means the cause is not known.
Theoretically, this should not happen,
but it is there as a safety check.
an unexpected system error. None means the cause isn't known.
Theoretically, this shouldn't happen,
but it's there as a safety check.
"""

BASE_ERROR_MSG = "The actor died unexpectedly before finishing this task."
6 changes: 6 additions & 0 deletions python/ray/serve/_private/constants.py
@@ -382,3 +382,9 @@
RAY_SERVE_PROXY_GC_THRESHOLD = int(
os.environ.get("RAY_SERVE_PROXY_GC_THRESHOLD", "10000")
)

# Interval at which cached metrics will be exported using the Ray metric API.
# Set to `0` to disable caching entirely.
RAY_SERVE_METRICS_EXPORT_INTERVAL_MS = int(
os.environ.get("RAY_SERVE_METRICS_EXPORT_INTERVAL_MS", "100")
)