From 8db874c472e07cabd59094090e8b4c644e449bd4 Mon Sep 17 00:00:00 2001
From: Myungjin Lee
Date: Sun, 16 Jun 2024 17:20:00 -0700
Subject: [PATCH] chore: clean up unnecessary code/files (#23)

There are outdated code and files that are no longer compatible with the
current API in world_manager and world_communicator. This change removes
them.
---
 examples/leader_recv.py           | 219 --------------------------
 examples/leader_send.py           | 206 ------------------------
 examples/leader_threading_recv.py | 254 ------------------------------
 multiworld/world_communicator.py  |   8 -
 4 files changed, 687 deletions(-)
 delete mode 100644 examples/leader_recv.py
 delete mode 100644 examples/leader_send.py
 delete mode 100644 examples/leader_threading_recv.py

diff --git a/examples/leader_recv.py b/examples/leader_recv.py
deleted file mode 100644
index 108e237..0000000
--- a/examples/leader_recv.py
+++ /dev/null
@@ -1,219 +0,0 @@
-# Copyright 2024 Cisco Systems, Inc. and its affiliates
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-"""
-leader_recv.py: This script is a modified version of examples/single_world.py.
-It demonstrates how to receive data from multiple worlds in a leader process.
-"""
-#!/usr/bin/env python
-
-
-import os
-import time
-from datetime import timedelta
-
-import torch.multiprocessing as mp
-
-import torch
-import torch.distributed as dist
-
-import atexit
-import copy
-
-
-def dummy(world_name, rank, size):
-    """
-    Dummy function to be implemented later.
-
-    Args:
-        world_name (str): Name of the world.
-        rank (int): Rank of the process.
-        size (int): Number of processes.
-    """
-
-    print(f"dummy function: world: {world_name}, my rank: {rank}, world size: {size}")
-
-
-def run(world_name, rank, size):
-    """
-    Function to send tensors from the leader process to the other process.
-
-    Args:
-        world_name (str): Name of the world.
-        rank (int): Rank of the process.
-        size (int): Number of processes.
-    """
-    while True:
-        # Data exchange
-        print(f"run function: world: {world_name}, my rank: {rank}, world size: {size}")
-        rank_to_send = 1 if rank == 0 else 0
-        tensor = torch.ones(1)
-
-        if world_name == "world2":
-            tensor = torch.ones(1) * 2
-
-        dist.send(tensor, dst=rank_to_send)
-        print(f"run function: world: {world_name}, my rank: {rank}, world size: {size}, tensor = {tensor}")
-
-
-def init_process(port, world_name, rank, size, fn, backend="gloo"):
-    """
-    Initialize the distributed environment.
-
-    Args:
-        port (str): Port number.
-        world_name (str): Name of the world.
-        rank (int): Rank of the process.
-        size (int): Number of processes.
-        fn (function): Function to be executed.
-        backend (str): Backend to be used for distributed communication.
- """ - os.environ["MASTER_ADDR"] = "127.0.0.1" - os.environ["MASTER_PORT"] = port - print(f"{os.getpid()} port = {port}") - store = dist.TCPStore( - "127.0.0.1", int(port), 2, True if rank == 0 else False, timedelta(seconds=30) - ) - print(f"tcp store: {store}") - dist.init_process_group(backend, rank=rank, world_size=size, store=store, group_name=world_name) - # dist.init_process_group(backend, rank=rank, world_size=size) - print("init_process_group done") - fn(world_name, rank, size) - - -@dist.WorldManager.world_initializer -def create_world(port, world_name, fn1, fn2): - """ - Create a world with the given port number and name. - - Args: - port (str): Port number. - world_name (str): Name of the world. - fn1 (function): Function to be executed in the world. - fn2 (function): Function to be executed in the world leader. - - Returns: - list: List of processes. - """ - size = 2 - processes = [] - for rank in range(size): - if rank == 0: - continue - p = mp.Process(target=init_process, args=(port, world_name, rank, size, fn1)) - p.start() - print(p.pid) - processes.append(p) - - # run leader late - init_process(port, world_name, 0, size, fn2) - - return processes - -processes = [] - -def cleanup(): - """Cleanup the spawned processes.""" - - print("Cleaning up spwaned processes") - for p in processes: - p.terminate() - - print("Cleaning up done") - - -def receive_data_request(tensor): - """ - Receive data from a process and handle runtime errors. - - Args: - tensor (torch.Tensor): Tensor to receive data. - - Returns: - dist.IRecv: Request object. - """ - try: - print("recv_data_request function: receiving data in leader") - - rank_to_recv = 1 - request = dist.irecv(tensor, src=rank_to_recv) - - return request - except RuntimeError as e: - error_message = str(e) - - if "Connection closed by peer" in error_message: - print("Ignoring Connection closed by peer error") - elif "Connection reset by peer" in error_message: - print("Ignoring Connection reset by peer error") - else: - raise e - - return None - - -def receive_data_continuous(world_manager): - """ - Receive data from multiple worlds continuously. This function is executed by the leader process. - - Args: - world_manager (dist.WorldManager): World manager object. 
- """ - bit = 0 - - while True: - world2_tensor = torch.zeros(1) - world_manager.set_world("world2") - request_world2 = receive_data_request(world2_tensor) - - world1_tensor = torch.zeros(1) - world_manager.set_world("world1") - request_world1 = receive_data_request(world1_tensor) - - if request_world2 is not None: - request_world2.wait() - print(f"receive_data_continuous function: world2_tensor = {world2_tensor}") - - if request_world1 is not None: - request_world1.wait() - print(f"receive_data_continuous function: world1_tensor = {world1_tensor}") - - time.sleep(2) - - -if __name__ == "__main__": - atexit.register(cleanup) - - world_manager = dist.WorldManager() - world_manager.add_world("world1") - world_manager.add_world("world2") - - size = 2 - mp.set_start_method("spawn") - - pset = create_world("29500", "world1", run, dummy, world_name="world1", world_manager=world_manager) - processes += pset - - pset = create_world("30500", "world2", run, dummy, world_name="world2", world_manager=world_manager) - processes += pset - - print("here") - - # send data from leader to world2 - receive_data_continuous(world_manager) - - for p in processes: - p.join() diff --git a/examples/leader_send.py b/examples/leader_send.py deleted file mode 100644 index 9a01d46..0000000 --- a/examples/leader_send.py +++ /dev/null @@ -1,206 +0,0 @@ -# Copyright 2024 Cisco Systems, Inc. and its affiliates -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 - -""" -leader_send.py: This example demonstrates how to send data from a leader process to a worker process in a distributed setup. -""" -#!/usr/bin/env python - - -import os -import time -from datetime import timedelta - -import torch.multiprocessing as mp - -import torch -import torch.distributed as dist - -import atexit -import copy - - -def dummy(world_name, rank, size): - """ - Dummy function to be implemented later. Arguments are similar to run function. - - Args: - world_name (str): Name of the world. - rank (int): Rank of the process. - size (int): Number of processes. - """ - - print(f"dummy function: world: {world_name}, my rank: {rank}, world size: {size}") - - -def run(world_name, rank, size): - """ - Distributed function to be implemented later. - - Args: - world_name (str): Name of the world. - rank (int): Rank of the process. - size (int): Number of processes. - """ - while True: - # Data exchange - print(f"run function: world: {world_name}, my rank: {rank}, world size: {size}") - rank_to_recv = 1 if rank == 0 else 0 - tensor = torch.zeros(1) - - dist.recv(tensor, src=rank_to_recv) - print(f"run function: world: {world_name}, my rank: {rank}, world size: {size}, tensor = {tensor}") - - -def init_process(port, world_name, rank, size, fn, backend="gloo"): - """ - Initialize the distributed environment. - - Args: - port (str): Port number. - world_name (str): Name of the world. - rank (int): Rank of the process. - size (int): Number of processes. - fn (function): Function to be executed. 
-        backend (str): Backend to be used.
-    """
-    os.environ["MASTER_ADDR"] = "127.0.0.1"
-    os.environ["MASTER_PORT"] = port
-    print(f"{os.getpid()} port = {port}")
-    store = dist.TCPStore(
-        "127.0.0.1", int(port), 2, True if rank == 0 else False, timedelta(seconds=30)
-    )
-    print(f"tcp store: {store}")
-    dist.init_process_group(backend, rank=rank, world_size=size, store=store, group_name=world_name)
-    # dist.init_process_group(backend, rank=rank, world_size=size)
-    print("init_process_group done")
-    fn(world_name, rank, size)
-
-
-@dist.WorldManager.world_initializer
-def create_world(port, world_name, fn1, fn2):
-    """
-    Create a world with the given port and world name.
-
-    Args:
-        port (str): Port number.
-        world_name (str): Name of the world.
-        fn1 (function): Function to be executed.
-        fn2 (function): Function to be executed.
-
-    Returns:
-        list: List of processes.
-    """
-    size = 2
-    processes = []
-    for rank in range(size):
-        if rank == 0:
-            continue
-        p = mp.Process(target=init_process, args=(port, world_name, rank, size, fn1))
-        p.start()
-        print(p.pid)
-        processes.append(p)
-
-    # run leader late
-    init_process(port, world_name, 0, size, fn2)
-
-    return processes
-
-processes = []
-
-def cleanup():
-    """Cleanup function to terminate all spawned processes."""
-    print("Cleaning up spwaned processes")
-    for p in processes:
-        p.terminate()
-
-    print("Cleaning up done")
-
-
-def send_data(tensor):
-    """
-    Send data from leader to a worker process and ignore connection errors.
-
-    Args:
-        tensor (torch.Tensor): Data to be sent.
-    """
-    try:
-        print("send_data function: sending data from leader")
-
-        rank_to_send = 1
-        # tensor = torch.ones(1)
-        dist.send(tensor, dst=rank_to_send)
-
-        print(f"send_data function: data sent from leader, tensor = {tensor}")
-    except RuntimeError as e:
-        error_message = str(e)
-
-        if "Connection closed by peer" in error_message:
-            print("Ignoring Connection closed by peer error")
-        elif "Connection reset by peer" in error_message:
-            print("Ignoring Connection reset by peer error")
-        else:
-            raise e
-
-
-def send_data_continuous(world_manager):
-    """
-    Send data from leader to a worker process continuously.
-
-    Args:
-        world_manager (dist.WorldManager): World manager object.
-    """
-    bit = 0
-
-    while True:
-        if bit == 0:
-            world_manager.set_world("world2")
-            tensor = torch.ones(1) * 2
-        else:
-            world_manager.set_world("world1")
-            tensor = torch.ones(1) * 1
-
-        send_data(tensor)
-
-        # flip bit
-        bit = 1 - bit
-
-        time.sleep(2)
-
-
-if __name__ == "__main__":
-    atexit.register(cleanup)
-
-    world_manager = dist.WorldManager()
-    world_manager.add_world("world1")
-    world_manager.add_world("world2")
-
-    size = 2
-    mp.set_start_method("spawn")
-
-    pset = create_world("29500", "world1", run, dummy, world_name="world1", world_manager=world_manager)
-    processes += pset
-
-    pset = create_world("30500", "world2", run, dummy, world_name="world2", world_manager=world_manager)
-    processes += pset
-
-    print("here")
-
-    # send data from leader to world2
-    send_data_continuous(world_manager)
-
-    for p in processes:
-        p.join()
diff --git a/examples/leader_threading_recv.py b/examples/leader_threading_recv.py
deleted file mode 100644
index 1c965fa..0000000
--- a/examples/leader_threading_recv.py
+++ /dev/null
@@ -1,254 +0,0 @@
-# Copyright 2024 Cisco Systems, Inc. and its affiliates
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-"""
-leader_threading_recv.py: This script is a modified version of examples/leader_recv.py.
-It demonstrates how to receive data from multiple worlds in a leader process using threading.
-"""
-#!/usr/bin/env python
-
-
-import argparse
-import atexit
-import os
-import time
-
-import torch
-import torch.distributed as dist
-import torch.multiprocessing as mp
-
-
-def dummy(world_name, rank, size, backend):
-    """
-    Dummy function to be implemented later.
-
-    Args:
-        world_name (str): Name of the world.
-        rank (int): Rank of the process.
-        size (int): Number of processes.
-        backend (str): Backend used for communication.
-    """
-
-    print(f"dummy function: world: {world_name}, my rank: {rank}, world size: {size}")
-
-
-def run(world_name, rank, size, backend):
-    """
-    Function to send tensors from the leader process to the other process.
-
-    Args:
-        world_name (str): Name of the world.
-        rank (int): Rank of the process.
-        size (int): Number of processes.
-        backend (str): Backend used for communication.
-    """
-    while True:
-        # Data exchange
-        print(f"run function: world: {world_name}, my rank: {rank}, world size: {size}")
-        rank_to_send = 1 if rank == 0 else 0
-        tensor = torch.ones(1)
-
-        if world_name == "world2":
-            tensor = torch.ones(1) * 2
-
-        tensor = tensor.to(f"cuda:{rank}") if backend == "nccl" else tensor
-
-        time.sleep(4)
-
-        dist.send(tensor, dst=rank_to_send)
-        print(
-            f"run function: world: {world_name}, my rank: {rank}, world size: {size}, tensor = {tensor}"
-        )
-
-
-world_manager = None
-
-
-def init_world(world_name, rank, size, fn, backend="gloo", addr="127.0.0.1", port=-1):
-    """
-    Initialize the distributed environment.
-
-    Args:
-        world_name (str): Name of the world.
-        rank (int): Rank of the process.
-        size (int): Number of processes.
-        fn (function): Function to be executed.
-        backend (str): Backend to be used.
-        addr (str): Address of the leader process.
-        port (int): Port number.
-    """
-    global world_manager
-
-    if world_manager is None:
-        # TODO: make WorldManager as singleton
-        world_manager = dist.WorldManager()
-
-    world_manager.initialize_world(
-        world_name, rank, size, backend=backend, addr=addr, port=port
-    )
-
-    fn(world_name, rank, size, backend)
-
-
-# @dist.WorldManager.world_initializer
-def create_world(world_name, addr, port, backend, fn1, fn2):
-    """
-    Create a world with the given port and world name.
-
-    Args:
-        world_name (str): Name of the world.
-        addr (str): Address of the leader process.
-        port (int): Port number.
-        backend (str): Backend to be used.
-        fn1 (function): Function to be executed in the world.
-        fn2 (function): Function to be executed in the world leader.
-
-    Returns:
-        list: List of processes.
- """ - size = 2 - processes = [] - for rank in range(size): - if rank == 0: - continue - p = mp.Process( - target=init_world, args=(world_name, rank, size, fn1, backend, addr, port) - ) - p.start() - print(p.pid) - processes.append(p) - - # run leader late - init_world(world_name, 0, size, fn2, backend, addr, port) - - return processes - - -processes = [] - - -def cleanup(): - """Cleanup function to terminate all spawned processes.""" - print("Cleaning up spwaned processes") - for p in processes: - p.terminate() - - print("Cleaning up done") - - -def receive_data_continuous(world_communicator, backend): - """ - Receive data continuously in the leader process from the other processes. - - Args: - world_communicator (dist.Communicator): Communicator object. - backend (str): Backend used for communication. - """ - bit = 0 - - while True: - world1_tensor = torch.zeros(1) - world1_tensor = ( - world1_tensor.to("cuda:0") if backend == "nccl" else world1_tensor - ) - world_communicator.recv(world1_tensor, "world1", 1) - - time.sleep(2) - - world2_tensor = torch.zeros(1) - world2_tensor = ( - world2_tensor.to("cuda:0") if backend == "nccl" else world2_tensor - ) - world_communicator.recv(world2_tensor, "world2", 1) - - time.sleep(2) - - # Empty the queue until we reach and Exception using get_nowait - try: - while True: - tensor = world_communicator.rx_q.get_nowait() - print(f"Received tensor: {tensor}") - except Exception as e: - print(e) - pass - - -def single_host(args): - """ - Run the processes on a single host. - - Args: - args (argparse.Namespace): Command line arguments. - """ - processes = [] - mp.set_start_method("spawn") - - pset = create_world("world1", "127.0.0.1", 29500, args.backend, run, dummy) - processes += pset - - pset = create_world("world2", "127.0.0.1", 30500, args.backend, run, dummy) - processes += pset - - receive_data_continuous(world_manager.communicator, args.backend) - - for p in processes: - p.join() - - -def multi_host(args): - """ - Run the processes on multiple hosts. - - Args: - args (argparse.Namespace): Command line arguments. 
- """ - size = 2 - if args.rank == 0: - init_world("world1", args.rank, size, dummy, args.backend, args.addr, 29500) - init_world("world2", args.rank, size, dummy, args.backend, args.addr, 30500) - receive_data_continuous(world_manager.communicator, args.backend) - - elif args.rank == 1: - init_world("world1", 1, size, run, args.backend, args.addr, 29500) - - elif args.rank == 2: - init_world("world2", 1, size, run, args.backend, args.addr, 30500) - - else: - print("rank error: rank should be 0, 1 or 2.") - exit(1) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--backend", default="gloo") - parser.add_argument("--addr", default="127.0.0.1") - parser.add_argument("--rank", type=int) - parser.add_argument( - "--multihost", action=argparse.BooleanOptionalAction, default=False - ) - - # https://github.com/pytorch/pytorch/blob/main/torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp#L114-L126 - # "2" is CleanUpOnly - os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "2" - - args = parser.parse_args() - atexit.register(cleanup) - - if not args.multihost: - single_host(args) - else: - multi_host(args) diff --git a/multiworld/world_communicator.py b/multiworld/world_communicator.py index b70b5e1..b866bb5 100644 --- a/multiworld/world_communicator.py +++ b/multiworld/world_communicator.py @@ -19,7 +19,6 @@ import asyncio import logging -from queue import SimpleQueue as SimpleSyncQ from typing import TYPE_CHECKING import torch.distributed as dist @@ -67,8 +66,6 @@ def __init__(self, world_manager: WorldManager): self._world_manager = world_manager self._broken_world: dict[str, bool] = {} - self._tensor_rx_q = SimpleSyncQ() - self._loop = asyncio.get_running_loop() def __del__(self): @@ -209,8 +206,3 @@ def _handle_error(self, error: RuntimeError, world_name: str) -> None: raise BrokenWorldException(f"{world_name}") raise error - - @property - def rx_q(self): - """Return the rx queue for received tensors.""" - return self._tensor_rx_q