Commit 752fe99: up
Signed-off-by: Rui Qiao <[email protected]>
ruisearch42 committed Jan 22, 2025
1 parent 758c57c commit 752fe99
Showing 2 changed files with 24 additions and 22 deletions.
python/ray/dag/compiled_dag_node.py (40 changes: 18 additions & 22 deletions)
@@ -912,7 +912,7 @@ def __init__(
"ray.actor.ActorHandle", List[_DAGNodeOperation]
] = defaultdict(list)
# Mapping from the actor handle to the node ID that the actor is on.
- self.actor_to_node_id: Dict["ray.actor.ActorHandle", str] = {}
+ self.actor_to_node_id: Dict[Optional["ray.actor.ActorHandle"], str] = {}

# This is set to true when type hint of `transport="nccl"` is used.
self._use_default_nccl_group = False
@@ -1086,7 +1086,7 @@ def _preprocess(self) -> None:
)

if actor_handle not in self.actor_to_gpu_ids:
- self.actor_to_gpu_ids[actor_handle] = self._get_gpu_ids(
+ self.actor_to_gpu_ids[actor_handle] = CompiledDAG._get_gpu_ids(
actor_handle
)

@@ -1132,17 +1132,23 @@ def _preprocess(self) -> None:
"supported for NCCL collective operations. Please set "
"overlap_gpu_communication=False."
)
- elif isinstance(dag_node, InputNode):
+ elif isinstance(dag_node, InputNode) or isinstance(
+     dag_node, InputAttributeNode
+ ):
if dag_node.type_hint.requires_nccl():
raise ValueError(
"DAG inputs cannot be transferred via NCCL because "
"the driver cannot participate in the NCCL group"
)
+ if isinstance(dag_node.type_hint, AutoTransportType):
+     # Currently driver on GPU is not supported, so we always
+     # use shared memory to transfer tensors.
+     dag_node.type_hint = TorchTensorType()

if type(dag_node.type_hint) is ChannelOutputType:
# No type hint specified by the user. Replace
# with the default type hint for this DAG.
- dag_node.type_hint(self._default_type_hint)
+ dag_node.type_hint = self._default_type_hint

for _, val in task.kwargs.items():
if isinstance(val, DAGNode):
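
Note: with this change, InputNode and InputAttributeNode inputs get the same treatment, and an AutoTransportType hint on a DAG input is resolved to the shared-memory TorchTensorType because the driver cannot participate in a NCCL group. A minimal usage sketch of that behavior (not part of this commit; the GPUWorker actor is made up, and it assumes a Ray build with compiled graphs and a node with one GPU available):

    import ray
    from ray.dag import InputNode

    @ray.remote(num_gpus=1)
    class GPUWorker:
        def echo(self, x):
            return x

    worker = GPUWorker.remote()
    with InputNode() as inp:
        dag = worker.echo.bind(inp)

    # The driver cannot join a NCCL group, so even tensor inputs reach the
    # actor over a shared-memory channel (the TorchTensorType default).
    compiled_dag = dag.experimental_compile()
    print(ray.get(compiled_dag.execute(1)))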
@@ -1239,12 +1245,6 @@ def _preprocess(self) -> None:
for task in auto_transport_tasks:
writer = task.dag_node._get_actor_handle()
readers = task.downstream_task_idxs.values()
- if any(reader is None for reader in readers):
-     # None means reader is the driver, currently driver on GPU
-     # is not supported, so we always use shared memory to transfer
-     # tensors.
-     task.dag_node.type_hint = TorchTensorType()
-     continue
writer_and_node = (writer, self._get_node_id(writer))
reader_and_node_list = [
(reader, self._get_node_id(reader)) for reader in readers
@@ -1343,38 +1343,39 @@ def _preprocess(self) -> None:
self._input_num_positional_args = max(input_positional_args) + 1
self._input_kwargs = tuple(input_kwargs)

- def _get_gpu_ids(self, actor_handle: "ray.actor.ActorHandle") -> List[str]:
+ @staticmethod
+ def _get_gpu_ids(actor_handle: "ray.actor.ActorHandle") -> List[str]:
"""
Get the GPU IDs of an actor handle.
"""
accelerator_ids = ray.get(
actor_handle.__ray_call__.remote(
- lambda self: ray.get_runtime_context().get_accelerator_ids()
+ lambda: ray.get_runtime_context().get_accelerator_ids()
)
)
return accelerator_ids.get("GPU", [])

- def _get_node_id(self, actor_handle: "ray.actor.ActorHandle") -> str:
+ def _get_node_id(self, actor_handle: Optional["ray.actor.ActorHandle"]) -> str:
"""
Get the node ID of an actor handle and cache it.
Args:
- actor_handle: The actor handle.
+ actor_handle: The actor handle, or None if the actor handle is the
+     driver.
Returns:
- The node ID of the actor handle.
+ The node ID of the actor handle or driver.
"""
if actor_handle in self.actor_to_node_id:
return self.actor_to_node_id[actor_handle]
node_id = None
- if actor_handle == self._proxy_actor:
+ if actor_handle == self._proxy_actor or actor_handle is None:
node_id = ray.get_runtime_context().get_node_id()
else:
node_id = ray.get(
actor_handle.__ray_call__.remote(
lambda self: ray.get_runtime_context().get_node_id()
)
)
assert node_id is not None
self.actor_to_node_id[actor_handle] = node_id
return node_id
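
The None case above is why actor_to_node_id and _get_node_id now take an Optional handle: None stands for the driver, whose node ID comes from the local runtime context rather than a remote call on an actor. A small standalone sketch of that idea (not the Ray-internal code path; NodeReporter and resolve_node_id are made up for illustration):

    from typing import Optional

    import ray

    @ray.remote
    class NodeReporter:
        def node_id(self) -> str:
            # Runs in the actor process, so this reports the actor's node.
            return ray.get_runtime_context().get_node_id()

    def resolve_node_id(actor: Optional["ray.actor.ActorHandle"]) -> str:
        if actor is None:
            # None means "the driver": read the driver's own runtime context,
            # mirroring the `actor_handle is None` branch added above.
            return ray.get_runtime_context().get_node_id()
        return ray.get(actor.node_id.remote())

    ray.init()
    reporter = NodeReporter.remote()
    print(resolve_node_id(None))      # driver's node ID
    print(resolve_node_id(reporter))  # actor's node ID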

@@ -1555,11 +1556,6 @@ def _get_or_compile(
input_node_to_reader_and_node_set[input_dag_node]
)

- if isinstance(input_dag_node.type_hint, AutoTransportType):
-     # Currently driver on GPU is not supported, so we always
-     # use shared memory to transfer tensors.
-     input_dag_node.type_hint = TorchTensorType()

output_channel = do_allocate_channel(
self,
reader_and_node_list,
python/ray/experimental/channel/auto_transport_type.py (6 changes: 6 additions & 0 deletions)
@@ -101,6 +101,12 @@ def resolve(
writer = writer_and_node[0]
readers = [reader for reader, _ in reader_and_node_list]

+ if any(reader is None for reader in readers):
+     # None means reader is the driver, currently driver on GPU
+     # is not supported, so we always use shared memory to transfer
+     # tensors.
+     return TorchTensorType()

# Case 1: writer and readers don't both use GPU, use shared memory
# to transport the tensors
if not (self._use_gpu(writer) and self._use_gpu(readers)):
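
This moves the driver special case out of _preprocess and into AutoTransportType.resolve: a None reader is the driver, which cannot join a NCCL group, so resolution falls back to shared memory before any GPU checks run. A simplified standalone sketch of the resulting decision order (not the actual resolve() body; choose_transport and its arguments are made up, and the return strings are just labels):

    from typing import List, Optional

    def choose_transport(
        readers: List[Optional[object]],
        writer_uses_gpu: bool,
        readers_use_gpu: bool,
    ) -> str:
        if any(reader is None for reader in readers):
            # A None reader is the driver; driver on GPU is not supported,
            # so always fall back to shared memory.
            return "shared_memory"
        if writer_uses_gpu and readers_use_gpu:
            # Writer and all readers have GPUs: tensors can travel over NCCL.
            return "nccl"
        return "shared_memory"

    assert choose_transport([None], True, True) == "shared_memory"
    assert choose_transport(["actor"], True, True) == "nccl"
    assert choose_transport(["actor"], True, False) == "shared_memory"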
