From fbd16d9585f10d4070305056351f1156801b38a5 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Fri, 15 Nov 2024 16:21:58 -0800 Subject: [PATCH 01/15] Introduce --batch-size-to-kill to release chaos test framework Signed-off-by: Jiajun Yao --- python/ray/_private/test_utils.py | 70 ++++++++++++++-------------- release/nightly_tests/setup_chaos.py | 7 +-- release/release_tests.yaml | 2 +- 3 files changed, 39 insertions(+), 40 deletions(-) diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 1eb26e0fad25..1d4cd0922e51 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1443,6 +1443,7 @@ def __init__( head_node_id, kill_interval_s: float = 60, max_to_kill: int = 2, + batch_size_to_kill: int = 1, kill_filter_fn: Optional[Callable] = None, ): self.kill_interval_s = kill_interval_s @@ -1451,6 +1452,7 @@ def __init__( self.killed = set() self.done = ray._private.utils.get_or_create_event_loop().create_future() self.max_to_kill = max_to_kill + self.batch_size_to_kill = batch_size_to_kill self.kill_filter_fn = kill_filter_fn self.kill_immediately_after_found = False # -- logger. -- @@ -1462,7 +1464,7 @@ def ready(self): async def run(self): self.is_running = True while self.is_running: - to_kill = await self._find_resource_to_kill() + to_kills = await self._find_resources_to_kill() if not self.is_running: break @@ -1473,14 +1475,15 @@ async def run(self): sleep_interval = random.random() * self.kill_interval_s time.sleep(sleep_interval) - self._kill_resource(*to_kill) + for to_kill in to_kills: + self._kill_resource(*to_kill) if len(self.killed) >= self.max_to_kill: break await asyncio.sleep(self.kill_interval_s - sleep_interval) self.done.set_result(True) - async def _find_resource_to_kill(self): + async def _find_resources_to_kill(self): raise NotImplementedError def _kill_resource(self, *args): @@ -1498,38 +1501,35 @@ async def get_total_killed(self): class NodeKillerBase(ResourceKillerActor): - async def _find_resource_to_kill(self): - node_to_kill_ip = None - node_to_kill_port = None - node_id = None - while node_to_kill_port is None and self.is_running: - nodes = ray.nodes() - alive_nodes = self._get_alive_nodes(nodes) + async def _find_resources_to_kill(self): + nodes_to_kill = [] + while not nodes_to_kill and self.is_running: + candidates = [ + node + for node in ray.nodes() + if node["Alive"] and (node["NodeID"] != self.head_node_id) + ] if self.kill_filter_fn is not None: - nodes = list(filter(self.kill_filter_fn(), nodes)) - for node in nodes: - node_id = node["NodeID"] - # make sure at least 1 worker node is alive. - if ( - node["Alive"] - and node_id != self.head_node_id - and node_id not in self.killed - and alive_nodes > 2 - ): - node_to_kill_ip = node["NodeManagerAddress"] - node_to_kill_port = node["NodeManagerPort"] - break - # Give the cluster some time to start. - await asyncio.sleep(0.1) + candidates = list(filter(self.kill_filter_fn(), candidates)) - return node_id, node_to_kill_ip, node_to_kill_port + # make sure at least 1 worker node is alive. + if (len(candidates) - 1) <= self.batch_size_to_kill: + # Give the cluster some time to start. 
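+                # (The scan resumes below only once len(candidates) - 1
+                # exceeds batch_size_to_kill, so a worker outlives each batch.)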
+ await asyncio.sleep(1) + continue + + for candidate in candidates: + nodes_to_kill.append( + ( + candidate["NodeID"], + candidate["NodeManagerAddress"], + candidate["NodeManagerPort"], + ) + ) + if len(nodes_to_kill) == self.batch_size_to_kill: + break - def _get_alive_nodes(self, nodes): - alive_nodes = 0 - for node in nodes: - if node["Alive"]: - alive_nodes += 1 - return alive_nodes + return nodes_to_kill @ray.remote(num_cpus=0) @@ -1619,7 +1619,7 @@ def __init__( ] ) - async def _find_resource_to_kill(self): + async def _find_resources_to_kill(self): from ray.util.state.common import StateResource process_to_kill_task_id = None @@ -1644,7 +1644,7 @@ async def _find_resource_to_kill(self): # Give the cluster some time to start. await asyncio.sleep(0.1) - return process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id + return [(process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id)] def _kill_resource( self, process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id @@ -1681,6 +1681,7 @@ def get_and_run_resource_killer( lifetime=None, no_start=False, max_to_kill=2, + batch_size_to_kill=1, kill_delay_s=0, kill_filter_fn=None, ): @@ -1699,6 +1700,7 @@ def get_and_run_resource_killer( head_node_id, kill_interval_s=kill_interval_s, max_to_kill=max_to_kill, + batch_size_to_kill=batch_size_to_kill, kill_filter_fn=kill_filter_fn, ) print("Waiting for ResourceKiller to be ready...") diff --git a/release/nightly_tests/setup_chaos.py b/release/nightly_tests/setup_chaos.py index b88c81ed3db2..af441369aebc 100644 --- a/release/nightly_tests/setup_chaos.py +++ b/release/nightly_tests/setup_chaos.py @@ -14,9 +14,6 @@ def parse_script_args(): parser = argparse.ArgumentParser() - # '--kill-workers' to be deprecated in favor of '--chaos' - parser.add_argument("--kill-workers", action="store_true", default=False) - parser.add_argument( "--chaos", type=str, @@ -29,6 +26,7 @@ def parse_script_args(): parser.add_argument("--kill-interval", type=int, default=60) parser.add_argument("--max-to-kill", type=int, default=2) + parser.add_argument("--batch-size-to-kill", type=int, default=1) parser.add_argument( "--no-start", action="store_true", @@ -94,8 +92,6 @@ def _filter_fn(node): def get_chaos_killer(args): if args.chaos != "": chaos_type = args.chaos - elif args.kill_workers: - chaos_type = "KillWorker" else: chaos_type = "KillRaylet" # default @@ -125,6 +121,7 @@ def main(): lifetime="detached", no_start=args.no_start, max_to_kill=args.max_to_kill, + batch_size_to_kill=args.batch_size_to_kill, kill_delay_s=args.kill_delay, kill_filter_fn=kill_filter_fn, ) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 33dc2486eae6..4da008bca61c 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -4177,7 +4177,7 @@ run: timeout: 18000 - prepare: python setup_chaos.py --kill-workers --kill-interval 100 --max-to-kill 3 --task-names "ReadImage->Map(wnid_to_index)->Map(crop_and_flip_image)" + prepare: python setup_chaos.py --chaos KillWorker --kill-interval 100 --max-to-kill 3 --task-names "ReadImage->Map(wnid_to_index)->Map(crop_and_flip_image)" script: python dataset/multi_node_train_benchmark.py --num-workers 4 --file-type image --use-gpu --num-epochs 1 variations: From 2b1901af1913836b7206a2d3777dd1bd5fc4e4df Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Fri, 15 Nov 2024 23:12:10 -0800 Subject: [PATCH 02/15] up Signed-off-by: Jiajun Yao --- release/nightly_tests/setup_chaos.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
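Before the follow-up commits below, it may help to exercise the batch-selection loop from PATCH 01 outside a live cluster. The following is a minimal, self-contained sketch of that logic only; `fake_nodes`, `HEAD_NODE_ID`, `BATCH_SIZE_TO_KILL`, and `find_resources_to_kill` are illustrative stand-ins for `ray.nodes()` output and the actor's fields, not Ray APIs.

    import asyncio

    HEAD_NODE_ID = "head"       # stand-in for self.head_node_id
    BATCH_SIZE_TO_KILL = 2      # stand-in for self.batch_size_to_kill

    # Fake ray.nodes() output: one head node, four live workers, one dead worker.
    fake_nodes = [
        {"NodeID": "head", "Alive": True,  "NodeManagerAddress": "10.0.0.1", "NodeManagerPort": 1},
        {"NodeID": "w1",   "Alive": True,  "NodeManagerAddress": "10.0.0.2", "NodeManagerPort": 2},
        {"NodeID": "w2",   "Alive": True,  "NodeManagerAddress": "10.0.0.3", "NodeManagerPort": 3},
        {"NodeID": "w3",   "Alive": True,  "NodeManagerAddress": "10.0.0.4", "NodeManagerPort": 4},
        {"NodeID": "w4",   "Alive": True,  "NodeManagerAddress": "10.0.0.5", "NodeManagerPort": 5},
        {"NodeID": "w5",   "Alive": False, "NodeManagerAddress": "10.0.0.6", "NodeManagerPort": 6},
    ]

    async def find_resources_to_kill():
        nodes_to_kill = []
        while not nodes_to_kill:
            # Workers only: drop the head node and anything already dead.
            candidates = [
                n for n in fake_nodes
                if n["Alive"] and n["NodeID"] != HEAD_NODE_ID
            ]
            # Same guard as PATCH 01: proceed only when a worker will survive.
            if (len(candidates) - 1) <= BATCH_SIZE_TO_KILL:
                await asyncio.sleep(1)
                continue
            # Take nodes until the batch is full.
            for c in candidates:
                nodes_to_kill.append(
                    (c["NodeID"], c["NodeManagerAddress"], c["NodeManagerPort"])
                )
                if len(nodes_to_kill) == BATCH_SIZE_TO_KILL:
                    break
        return nodes_to_kill

    # Four live workers, batch of two: ('w1', ...) and ('w2', ...) come back.
    print(asyncio.run(find_resources_to_kill()))

With only three live workers the guard would sleep forever in this sketch, which mirrors the real actor's behavior: it keeps rescanning until enough nodes are up, presumably waiting for replacements before killing again.
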
diff --git a/release/nightly_tests/setup_chaos.py b/release/nightly_tests/setup_chaos.py index af441369aebc..0dc474101b27 100644 --- a/release/nightly_tests/setup_chaos.py +++ b/release/nightly_tests/setup_chaos.py @@ -126,7 +126,7 @@ def main(): kill_filter_fn=kill_filter_fn, ) print( - f"Successfully deployed a {'worker' if args.kill_workers else 'node'} killer." + f"Successfully deployed a {resource_killer_cls} killer." ) From e80e29017c260a9180ac6efa05caf3540d228900 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 10 Dec 2024 14:21:10 -0800 Subject: [PATCH 03/15] up Signed-off-by: Jiajun Yao --- python/ray/_private/test_utils.py | 70 ++++++++++++++-------------- release/nightly_tests/setup_chaos.py | 9 ++-- release/release_tests.yaml | 2 +- 3 files changed, 41 insertions(+), 40 deletions(-) diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 1d4cd0922e51..1eb26e0fad25 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1443,7 +1443,6 @@ def __init__( head_node_id, kill_interval_s: float = 60, max_to_kill: int = 2, - batch_size_to_kill: int = 1, kill_filter_fn: Optional[Callable] = None, ): self.kill_interval_s = kill_interval_s @@ -1452,7 +1451,6 @@ def __init__( self.killed = set() self.done = ray._private.utils.get_or_create_event_loop().create_future() self.max_to_kill = max_to_kill - self.batch_size_to_kill = batch_size_to_kill self.kill_filter_fn = kill_filter_fn self.kill_immediately_after_found = False # -- logger. -- @@ -1464,7 +1462,7 @@ def ready(self): async def run(self): self.is_running = True while self.is_running: - to_kills = await self._find_resources_to_kill() + to_kill = await self._find_resource_to_kill() if not self.is_running: break @@ -1475,15 +1473,14 @@ async def run(self): sleep_interval = random.random() * self.kill_interval_s time.sleep(sleep_interval) - for to_kill in to_kills: - self._kill_resource(*to_kill) + self._kill_resource(*to_kill) if len(self.killed) >= self.max_to_kill: break await asyncio.sleep(self.kill_interval_s - sleep_interval) self.done.set_result(True) - async def _find_resources_to_kill(self): + async def _find_resource_to_kill(self): raise NotImplementedError def _kill_resource(self, *args): @@ -1501,35 +1498,38 @@ async def get_total_killed(self): class NodeKillerBase(ResourceKillerActor): - async def _find_resources_to_kill(self): - nodes_to_kill = [] - while not nodes_to_kill and self.is_running: - candidates = [ - node - for node in ray.nodes() - if node["Alive"] and (node["NodeID"] != self.head_node_id) - ] + async def _find_resource_to_kill(self): + node_to_kill_ip = None + node_to_kill_port = None + node_id = None + while node_to_kill_port is None and self.is_running: + nodes = ray.nodes() + alive_nodes = self._get_alive_nodes(nodes) if self.kill_filter_fn is not None: - candidates = list(filter(self.kill_filter_fn(), candidates)) - - # make sure at least 1 worker node is alive. - if (len(candidates) - 1) <= self.batch_size_to_kill: - # Give the cluster some time to start. - await asyncio.sleep(1) - continue - - for candidate in candidates: - nodes_to_kill.append( - ( - candidate["NodeID"], - candidate["NodeManagerAddress"], - candidate["NodeManagerPort"], - ) - ) - if len(nodes_to_kill) == self.batch_size_to_kill: + nodes = list(filter(self.kill_filter_fn(), nodes)) + for node in nodes: + node_id = node["NodeID"] + # make sure at least 1 worker node is alive. 
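+                # (alive_nodes counts the head too, so > 2 leaves at least
+                # one worker standing after a single kill.)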
+ if ( + node["Alive"] + and node_id != self.head_node_id + and node_id not in self.killed + and alive_nodes > 2 + ): + node_to_kill_ip = node["NodeManagerAddress"] + node_to_kill_port = node["NodeManagerPort"] break + # Give the cluster some time to start. + await asyncio.sleep(0.1) + + return node_id, node_to_kill_ip, node_to_kill_port - return nodes_to_kill + def _get_alive_nodes(self, nodes): + alive_nodes = 0 + for node in nodes: + if node["Alive"]: + alive_nodes += 1 + return alive_nodes @ray.remote(num_cpus=0) @@ -1619,7 +1619,7 @@ def __init__( ] ) - async def _find_resources_to_kill(self): + async def _find_resource_to_kill(self): from ray.util.state.common import StateResource process_to_kill_task_id = None @@ -1644,7 +1644,7 @@ async def _find_resources_to_kill(self): # Give the cluster some time to start. await asyncio.sleep(0.1) - return [(process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id)] + return process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id def _kill_resource( self, process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id @@ -1681,7 +1681,6 @@ def get_and_run_resource_killer( lifetime=None, no_start=False, max_to_kill=2, - batch_size_to_kill=1, kill_delay_s=0, kill_filter_fn=None, ): @@ -1700,7 +1699,6 @@ def get_and_run_resource_killer( head_node_id, kill_interval_s=kill_interval_s, max_to_kill=max_to_kill, - batch_size_to_kill=batch_size_to_kill, kill_filter_fn=kill_filter_fn, ) print("Waiting for ResourceKiller to be ready...") diff --git a/release/nightly_tests/setup_chaos.py b/release/nightly_tests/setup_chaos.py index 0dc474101b27..b88c81ed3db2 100644 --- a/release/nightly_tests/setup_chaos.py +++ b/release/nightly_tests/setup_chaos.py @@ -14,6 +14,9 @@ def parse_script_args(): parser = argparse.ArgumentParser() + # '--kill-workers' to be deprecated in favor of '--chaos' + parser.add_argument("--kill-workers", action="store_true", default=False) + parser.add_argument( "--chaos", type=str, @@ -26,7 +29,6 @@ def parse_script_args(): parser.add_argument("--kill-interval", type=int, default=60) parser.add_argument("--max-to-kill", type=int, default=2) - parser.add_argument("--batch-size-to-kill", type=int, default=1) parser.add_argument( "--no-start", action="store_true", @@ -92,6 +94,8 @@ def _filter_fn(node): def get_chaos_killer(args): if args.chaos != "": chaos_type = args.chaos + elif args.kill_workers: + chaos_type = "KillWorker" else: chaos_type = "KillRaylet" # default @@ -121,12 +125,11 @@ def main(): lifetime="detached", no_start=args.no_start, max_to_kill=args.max_to_kill, - batch_size_to_kill=args.batch_size_to_kill, kill_delay_s=args.kill_delay, kill_filter_fn=kill_filter_fn, ) print( - f"Successfully deployed a {resource_killer_cls} killer." + f"Successfully deployed a {'worker' if args.kill_workers else 'node'} killer." 
) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 4da008bca61c..33dc2486eae6 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -4177,7 +4177,7 @@ run: timeout: 18000 - prepare: python setup_chaos.py --chaos KillWorker --kill-interval 100 --max-to-kill 3 --task-names "ReadImage->Map(wnid_to_index)->Map(crop_and_flip_image)" + prepare: python setup_chaos.py --kill-workers --kill-interval 100 --max-to-kill 3 --task-names "ReadImage->Map(wnid_to_index)->Map(crop_and_flip_image)" script: python dataset/multi_node_train_benchmark.py --num-workers 4 --file-type image --use-gpu --num-epochs 1 variations: From 0de1034f6c3dc11f02ce187630b460bac083058b Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 10 Dec 2024 16:06:44 -0800 Subject: [PATCH 04/15] up Signed-off-by: Jiajun Yao --- .../chaos_test/test_update_compute_config.py | 37 +++++++++++++++++++ release/release_tests.yaml | 19 ++++++++++ 2 files changed, 56 insertions(+) create mode 100644 release/nightly_tests/chaos_test/test_update_compute_config.py diff --git a/release/nightly_tests/chaos_test/test_update_compute_config.py b/release/nightly_tests/chaos_test/test_update_compute_config.py new file mode 100644 index 000000000000..aa8a7f15e3db --- /dev/null +++ b/release/nightly_tests/chaos_test/test_update_compute_config.py @@ -0,0 +1,37 @@ +import os +import requests +import anyscale +import time + +cluster_id = os.environ["ANYSCALE_CLUSTER_ID"] +print(f"cluster id: {cluster_id}") + +sdk = anyscale.AnyscaleSDK() +cluster = sdk.get_cluster(cluster_id) +print(f"cluster: {cluster}") + +existing_compute_config = anyscale.compute_config.get( + name="", _id=cluster.result.cluster_compute_id +).config +compute_config_dict = existing_compute_config.to_dict() +print(f"compute config {compute_config_dict}") +compute_config_dict["worker_nodes"][0]["max_nodes"] = 11 +new_compute_config = anyscale.compute_config.models.ComputeConfig.from_dict( + compute_config_dict +) +new_compute_config_name = anyscale.compute_config.create(new_compute_config, name=None) +new_compute_config_id = anyscale.compute_config.get(name=new_compute_config_name).id +print(f"new compute config {new_compute_config_id}") + +response = requests.put( + f"https://console.anyscale.com/api/v2/sessions/{cluster_id}/cluster_config_with_session_idle_timeout", + params={ + "build_id": cluster.result.cluster_environment_build_id, + "compute_template_id": new_compute_config_id, + }, + headers={"Authorization": f"Bearer {os.environ['ANYSCALE_CLI_TOKEN']}"}, +) +print(response) + + +time.sleep(1000000) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 21e28449d195..3791415cf0a2 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3737,6 +3737,25 @@ # Core Chaos tests ################## +- name: chaos_update_compute_config + group: core-nightly-test + working_dir: nightly_tests + + frequency: nightly + team: core + cluster: + byod: {} + cluster_compute: chaos_test/compute_template.yaml + + run: + timeout: 3600 + wait_for_nodes: + num_nodes: 10 + script: python chaos_test/test_update_compute_config.py + + variations: + - __suffix__: aws + - name: chaos_many_tasks_kill_raylet group: core-nightly-test working_dir: nightly_tests From f046f01c0511964df70d1659ca730ec401e30648 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 10 Dec 2024 22:11:37 -0800 Subject: [PATCH 05/15] up Signed-off-by: Jiajun Yao --- release/nightly_tests/chaos_test/test_update_compute_config.py | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/release/nightly_tests/chaos_test/test_update_compute_config.py b/release/nightly_tests/chaos_test/test_update_compute_config.py index aa8a7f15e3db..a35cfaf8a02d 100644 --- a/release/nightly_tests/chaos_test/test_update_compute_config.py +++ b/release/nightly_tests/chaos_test/test_update_compute_config.py @@ -24,7 +24,7 @@ print(f"new compute config {new_compute_config_id}") response = requests.put( - f"https://console.anyscale.com/api/v2/sessions/{cluster_id}/cluster_config_with_session_idle_timeout", + f"https://console.anyscale-staging.com/api/v2/sessions/{cluster_id}/cluster_config_with_session_idle_timeout", params={ "build_id": cluster.result.cluster_environment_build_id, "compute_template_id": new_compute_config_id, From 73685575652531e422b4b14a00557ffa79729aac Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 11 Dec 2024 10:16:15 -0800 Subject: [PATCH 06/15] up Signed-off-by: Jiajun Yao --- release/nightly_tests/chaos_test/compute_template.yaml | 1 - release/nightly_tests/chaos_test/test_update_compute_config.py | 1 - 2 files changed, 2 deletions(-) diff --git a/release/nightly_tests/chaos_test/compute_template.yaml b/release/nightly_tests/chaos_test/compute_template.yaml index f91504fb6937..308aa48c59bd 100644 --- a/release/nightly_tests/chaos_test/compute_template.yaml +++ b/release/nightly_tests/chaos_test/compute_template.yaml @@ -5,7 +5,6 @@ advanced_configurations_json: IamInstanceProfile: {"Name": "ray-autoscaler-v1"} head_node_type: - name: head_node instance_type: m5.16xlarge resources: cpu: 0 diff --git a/release/nightly_tests/chaos_test/test_update_compute_config.py b/release/nightly_tests/chaos_test/test_update_compute_config.py index a35cfaf8a02d..14928dcbba56 100644 --- a/release/nightly_tests/chaos_test/test_update_compute_config.py +++ b/release/nightly_tests/chaos_test/test_update_compute_config.py @@ -33,5 +33,4 @@ ) print(response) - time.sleep(1000000) From 28bf2fec8dc2a5d898b54c4cba534044c34c5ef1 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 11 Dec 2024 10:54:53 -0800 Subject: [PATCH 07/15] up Signed-off-by: Jiajun Yao --- release/nightly_tests/chaos_test/compute_template.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/release/nightly_tests/chaos_test/compute_template.yaml b/release/nightly_tests/chaos_test/compute_template.yaml index 308aa48c59bd..edd0fba557be 100644 --- a/release/nightly_tests/chaos_test/compute_template.yaml +++ b/release/nightly_tests/chaos_test/compute_template.yaml @@ -5,6 +5,7 @@ advanced_configurations_json: IamInstanceProfile: {"Name": "ray-autoscaler-v1"} head_node_type: + name: head-node instance_type: m5.16xlarge resources: cpu: 0 @@ -12,7 +13,7 @@ head_node_type: head: 1 worker_node_types: - - name: worker_node + - name: worker-node instance_type: m5.4xlarge min_workers: 9 max_workers: 9 From 4103637852fb502e13c3a7128f5719ae2da3582b Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 11 Dec 2024 13:40:07 -0800 Subject: [PATCH 08/15] up Signed-off-by: Jiajun Yao --- .../chaos_test/compute_template.yaml | 4 +-- .../chaos_test/test_update_compute_config.py | 36 ------------------- release/release_tests.yaml | 19 ---------- 3 files changed, 2 insertions(+), 57 deletions(-) delete mode 100644 release/nightly_tests/chaos_test/test_update_compute_config.py diff --git a/release/nightly_tests/chaos_test/compute_template.yaml b/release/nightly_tests/chaos_test/compute_template.yaml index edd0fba557be..f91504fb6937 100644 --- 
a/release/nightly_tests/chaos_test/compute_template.yaml +++ b/release/nightly_tests/chaos_test/compute_template.yaml @@ -5,7 +5,7 @@ advanced_configurations_json: IamInstanceProfile: {"Name": "ray-autoscaler-v1"} head_node_type: - name: head-node + name: head_node instance_type: m5.16xlarge resources: cpu: 0 @@ -13,7 +13,7 @@ head_node_type: head: 1 worker_node_types: - - name: worker-node + - name: worker_node instance_type: m5.4xlarge min_workers: 9 max_workers: 9 diff --git a/release/nightly_tests/chaos_test/test_update_compute_config.py b/release/nightly_tests/chaos_test/test_update_compute_config.py deleted file mode 100644 index 14928dcbba56..000000000000 --- a/release/nightly_tests/chaos_test/test_update_compute_config.py +++ /dev/null @@ -1,36 +0,0 @@ -import os -import requests -import anyscale -import time - -cluster_id = os.environ["ANYSCALE_CLUSTER_ID"] -print(f"cluster id: {cluster_id}") - -sdk = anyscale.AnyscaleSDK() -cluster = sdk.get_cluster(cluster_id) -print(f"cluster: {cluster}") - -existing_compute_config = anyscale.compute_config.get( - name="", _id=cluster.result.cluster_compute_id -).config -compute_config_dict = existing_compute_config.to_dict() -print(f"compute config {compute_config_dict}") -compute_config_dict["worker_nodes"][0]["max_nodes"] = 11 -new_compute_config = anyscale.compute_config.models.ComputeConfig.from_dict( - compute_config_dict -) -new_compute_config_name = anyscale.compute_config.create(new_compute_config, name=None) -new_compute_config_id = anyscale.compute_config.get(name=new_compute_config_name).id -print(f"new compute config {new_compute_config_id}") - -response = requests.put( - f"https://console.anyscale-staging.com/api/v2/sessions/{cluster_id}/cluster_config_with_session_idle_timeout", - params={ - "build_id": cluster.result.cluster_environment_build_id, - "compute_template_id": new_compute_config_id, - }, - headers={"Authorization": f"Bearer {os.environ['ANYSCALE_CLI_TOKEN']}"}, -) -print(response) - -time.sleep(1000000) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 3791415cf0a2..21e28449d195 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3737,25 +3737,6 @@ # Core Chaos tests ################## -- name: chaos_update_compute_config - group: core-nightly-test - working_dir: nightly_tests - - frequency: nightly - team: core - cluster: - byod: {} - cluster_compute: chaos_test/compute_template.yaml - - run: - timeout: 3600 - wait_for_nodes: - num_nodes: 10 - script: python chaos_test/test_update_compute_config.py - - variations: - - __suffix__: aws - - name: chaos_many_tasks_kill_raylet group: core-nightly-test working_dir: nightly_tests From 5cac1fec53ec6b7c47d8d7f23f9265ec3daf74f8 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 11 Dec 2024 13:51:43 -0800 Subject: [PATCH 09/15] up Signed-off-by: Jiajun Yao --- python/ray/_private/test_utils.py | 70 ++++++++++++++-------------- release/nightly_tests/setup_chaos.py | 11 ++--- 2 files changed, 39 insertions(+), 42 deletions(-) diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 7bf0de943269..e7416ad20525 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1506,6 +1506,7 @@ def __init__( head_node_id, kill_interval_s: float = 60, max_to_kill: int = 2, + batch_size_to_kill: int = 1, kill_filter_fn: Optional[Callable] = None, ): self.kill_interval_s = kill_interval_s @@ -1514,6 +1515,7 @@ def __init__( self.killed = set() self.done = 
ray._private.utils.get_or_create_event_loop().create_future() self.max_to_kill = max_to_kill + self.batch_size_to_kill = batch_size_to_kill self.kill_filter_fn = kill_filter_fn self.kill_immediately_after_found = False # -- logger. -- @@ -1525,7 +1527,7 @@ def ready(self): async def run(self): self.is_running = True while self.is_running: - to_kill = await self._find_resource_to_kill() + to_kills = await self._find_resources_to_kill() if not self.is_running: break @@ -1536,14 +1538,15 @@ async def run(self): sleep_interval = random.random() * self.kill_interval_s time.sleep(sleep_interval) - self._kill_resource(*to_kill) + for to_kill in to_kills: + self._kill_resource(*to_kill) if len(self.killed) >= self.max_to_kill: break await asyncio.sleep(self.kill_interval_s - sleep_interval) self.done.set_result(True) - async def _find_resource_to_kill(self): + async def _find_resources_to_kill(self): raise NotImplementedError def _kill_resource(self, *args): @@ -1561,38 +1564,35 @@ async def get_total_killed(self): class NodeKillerBase(ResourceKillerActor): - async def _find_resource_to_kill(self): - node_to_kill_ip = None - node_to_kill_port = None - node_id = None - while node_to_kill_port is None and self.is_running: - nodes = ray.nodes() - alive_nodes = self._get_alive_nodes(nodes) + async def _find_resources_to_kill(self): + nodes_to_kill = [] + while not nodes_to_kill and self.is_running: + candidates = [ + node + for node in ray.nodes() + if node["Alive"] and (node["NodeID"] != self.head_node_id) + ] if self.kill_filter_fn is not None: - nodes = list(filter(self.kill_filter_fn(), nodes)) - for node in nodes: - node_id = node["NodeID"] - # make sure at least 1 worker node is alive. - if ( - node["Alive"] - and node_id != self.head_node_id - and node_id not in self.killed - and alive_nodes > 2 - ): - node_to_kill_ip = node["NodeManagerAddress"] - node_to_kill_port = node["NodeManagerPort"] - break - # Give the cluster some time to start. - await asyncio.sleep(0.1) + candidates = list(filter(self.kill_filter_fn(), candidates)) - return node_id, node_to_kill_ip, node_to_kill_port + # make sure at least 1 worker node is alive. + if (len(candidates) - 1) <= self.batch_size_to_kill: + # Give the cluster some time to start. + await asyncio.sleep(1) + continue + + for candidate in candidates: + nodes_to_kill.append( + ( + candidate["NodeID"], + candidate["NodeManagerAddress"], + candidate["NodeManagerPort"], + ) + ) + if len(nodes_to_kill) == self.batch_size_to_kill: + break - def _get_alive_nodes(self, nodes): - alive_nodes = 0 - for node in nodes: - if node["Alive"]: - alive_nodes += 1 - return alive_nodes + return nodes_to_kill @ray.remote(num_cpus=0) @@ -1682,7 +1682,7 @@ def __init__( ] ) - async def _find_resource_to_kill(self): + async def _find_resources_to_kill(self): from ray.util.state.common import StateResource process_to_kill_task_id = None @@ -1707,7 +1707,7 @@ async def _find_resource_to_kill(self): # Give the cluster some time to start. 
await asyncio.sleep(0.1) - return process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id + return [(process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id)] def _kill_resource( self, process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id @@ -1744,6 +1744,7 @@ def get_and_run_resource_killer( lifetime=None, no_start=False, max_to_kill=2, + batch_size_to_kill=1, kill_delay_s=0, kill_filter_fn=None, ): @@ -1762,6 +1763,7 @@ def get_and_run_resource_killer( head_node_id, kill_interval_s=kill_interval_s, max_to_kill=max_to_kill, + batch_size_to_kill=batch_size_to_kill, kill_filter_fn=kill_filter_fn, ) print("Waiting for ResourceKiller to be ready...") diff --git a/release/nightly_tests/setup_chaos.py b/release/nightly_tests/setup_chaos.py index b88c81ed3db2..40d177f88154 100644 --- a/release/nightly_tests/setup_chaos.py +++ b/release/nightly_tests/setup_chaos.py @@ -14,9 +14,6 @@ def parse_script_args(): parser = argparse.ArgumentParser() - # '--kill-workers' to be deprecated in favor of '--chaos' - parser.add_argument("--kill-workers", action="store_true", default=False) - parser.add_argument( "--chaos", type=str, @@ -29,6 +26,7 @@ def parse_script_args(): parser.add_argument("--kill-interval", type=int, default=60) parser.add_argument("--max-to-kill", type=int, default=2) + parser.add_argument("--batch-size-to-kill", type=int, default=1) parser.add_argument( "--no-start", action="store_true", @@ -94,8 +92,6 @@ def _filter_fn(node): def get_chaos_killer(args): if args.chaos != "": chaos_type = args.chaos - elif args.kill_workers: - chaos_type = "KillWorker" else: chaos_type = "KillRaylet" # default @@ -125,12 +121,11 @@ def main(): lifetime="detached", no_start=args.no_start, max_to_kill=args.max_to_kill, + batch_size_to_kill=args.batch_size_to_kill, kill_delay_s=args.kill_delay, kill_filter_fn=kill_filter_fn, ) - print( - f"Successfully deployed a {'worker' if args.kill_workers else 'node'} killer." - ) + print(f"Successfully deployed a {resource_killer_cls} killer.") main() From f1900eabd226816c2be619fb1795c763ea66ff68 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 11 Dec 2024 14:05:45 -0800 Subject: [PATCH 10/15] up Signed-off-by: Jiajun Yao --- python/ray/_private/test_utils.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index e7416ad20525..711a70327017 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1572,16 +1572,17 @@ async def _find_resources_to_kill(self): for node in ray.nodes() if node["Alive"] and (node["NodeID"] != self.head_node_id) ] - if self.kill_filter_fn is not None: + if self.kill_filter_fn: candidates = list(filter(self.kill_filter_fn(), candidates)) - # make sure at least 1 worker node is alive. - if (len(candidates) - 1) <= self.batch_size_to_kill: + # Ensure at least one worker node remains alive. + if len(candidates) < self.batch_size_to_kill + 1: # Give the cluster some time to start. await asyncio.sleep(1) continue - for candidate in candidates: + # Collect nodes to kill, limited by batch size. 
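+            # (The guard above ensured at least batch_size_to_kill
+            # candidates remain, so this slice returns a full batch.)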
+ for candidate in candidates[: self.batch_size_to_kill]: nodes_to_kill.append( ( candidate["NodeID"], @@ -1589,8 +1590,6 @@ async def _find_resources_to_kill(self): candidate["NodeManagerPort"], ) ) - if len(nodes_to_kill) == self.batch_size_to_kill: - break return nodes_to_kill From fe466771e180b6e5c025850cbb52774f48112f51 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 11 Dec 2024 14:30:40 -0800 Subject: [PATCH 11/15] up Signed-off-by: Jiajun Yao --- release/nightly_tests/dataset/autoscaling_gpu_compute.yaml | 3 ++- release/release_data_tests.yaml | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/release/nightly_tests/dataset/autoscaling_gpu_compute.yaml b/release/nightly_tests/dataset/autoscaling_gpu_compute.yaml index c30c93c22c67..447767ea0b77 100644 --- a/release/nightly_tests/dataset/autoscaling_gpu_compute.yaml +++ b/release/nightly_tests/dataset/autoscaling_gpu_compute.yaml @@ -3,7 +3,8 @@ cloud_id: {{env["ANYSCALE_CLOUD_ID"]}} region: us-west-2 -max_workers: 10 +advanced_configurations_json: + IamInstanceProfile: {"Name": "ray-autoscaler-v1"} head_node_type: name: head_node diff --git a/release/release_data_tests.yaml b/release/release_data_tests.yaml index 93bb9bf9d4a0..feb1695777b2 100644 --- a/release/release_data_tests.yaml +++ b/release/release_data_tests.yaml @@ -238,7 +238,7 @@ run: timeout: 3600 script: > - python dataset/multi_node_train_benchmark.py --num-workers 16 --file-type parquet + python dataset/multi_node_train_benchmark.py --num-workers 16 --file-type parquet --target-worker-gb 50 --use-gpu variations: @@ -246,7 +246,7 @@ - __suffix__: chaos run: prepare: > - python setup_chaos.py --kill-interval 200 --max-to-kill 1 --task-names + python setup_chaos.py --kill-interval 200 --max-to-kill 1 --task-names "_RayTrainWorker__execute.get_next" ######################### @@ -416,7 +416,7 @@ run: timeout: 1800 - prepare: python setup_chaos.py --max-to-kill 2 --kill-delay 30 + prepare: python setup_chaos.py --chaos TerminateEC2Instance --batch-size-to-kill 2 --max-to-kill 6 --kill-delay 30 script: > python dataset/gpu_batch_inference.py --data-directory 300G-image-data-synthetic-raw-parquet --data-format parquet From 5f0f9edb49c15692b4e00350f135acae87bba170 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 11 Dec 2024 14:43:44 -0800 Subject: [PATCH 12/15] up Signed-off-by: Jiajun Yao --- .../nightly_tests/chaos_test/test_chaos_basic.py | 2 +- .../nightly_tests/dataset/gpu_batch_inference.py | 14 ++++++++++++++ release/release_data_tests.yaml | 2 +- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/release/nightly_tests/chaos_test/test_chaos_basic.py b/release/nightly_tests/chaos_test/test_chaos_basic.py index df2329b74565..be28adf9c394 100644 --- a/release/nightly_tests/chaos_test/test_chaos_basic.py +++ b/release/nightly_tests/chaos_test/test_chaos_basic.py @@ -205,7 +205,7 @@ def main(): node_killer.run.remote() workload(total_num_cpus, args.smoke) print(f"Runtime when there are many failures: {time.time() - start}") - print(f"Total node failures: " f"{ray.get(node_killer.get_total_killed.remote())}") + print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}") node_killer.stop_run.remote() used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) print("Memory usage with failures.") diff --git a/release/nightly_tests/dataset/gpu_batch_inference.py b/release/nightly_tests/dataset/gpu_batch_inference.py index 32308a24e1f7..7a23e68a9437 100644 --- 
a/release/nightly_tests/dataset/gpu_batch_inference.py +++ b/release/nightly_tests/dataset/gpu_batch_inference.py @@ -31,6 +31,11 @@ def parse_args(): action="store_true", default=False, ) + parser.add_argument( + "--chaos-test", + action="store_true", + default=False, + ) return parser.parse_args() @@ -38,6 +43,7 @@ def main(args): data_directory: str = args.data_directory data_format: str = args.data_format smoke_test: bool = args.smoke_test + chaos_test: bool = args.chaos_test data_url = f"s3://anonymous@air-example-data-2/{data_directory}" print(f"Running GPU batch prediction with data from {data_url}") @@ -124,6 +130,14 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: "Throughput w/o metadata fetching (img/sec): ", throughput_without_metadata_fetch, ) + if chaos_test: + resource_killer = ray.get_actor( + "ResourceKiller", namespace="release_test_namespace" + ) + resource_killer.stop_run.remote() + killed = ray.get(resource_killer.get_total_killed.remote()) + assert killed + print(f"Total chaos killed: {killed}") # For structured output integration with internal tooling results = { diff --git a/release/release_data_tests.yaml b/release/release_data_tests.yaml index feb1695777b2..45017ccb1f7a 100644 --- a/release/release_data_tests.yaml +++ b/release/release_data_tests.yaml @@ -419,7 +419,7 @@ prepare: python setup_chaos.py --chaos TerminateEC2Instance --batch-size-to-kill 2 --max-to-kill 6 --kill-delay 30 script: > python dataset/gpu_batch_inference.py - --data-directory 300G-image-data-synthetic-raw-parquet --data-format parquet + --data-directory 300G-image-data-synthetic-raw-parquet --data-format parquet --chaos-test # 10 TB image classification parquet data with autoscaling heterogenous cluster # 10 g4dn.12xlarge, 10 m5.16xlarge From e4603588e7ba697d5cc0012c4e6c17c021a1ddaf Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 11 Dec 2024 15:58:10 -0800 Subject: [PATCH 13/15] up Signed-off-by: Jiajun Yao --- python/ray/_private/test_utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 711a70327017..47e73da9ed54 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1570,7 +1570,9 @@ async def _find_resources_to_kill(self): candidates = [ node for node in ray.nodes() - if node["Alive"] and (node["NodeID"] != self.head_node_id) + if node["Alive"] + and (node["NodeID"] != self.head_node_id) + and (node["NodeID"] not in self.killed) ] if self.kill_filter_fn: candidates = list(filter(self.kill_filter_fn(), candidates)) From c3feadb71e01df0dc4b13cd85b363d0c0111e926 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 12 Dec 2024 09:28:04 -0800 Subject: [PATCH 14/15] fix Signed-off-by: Jiajun Yao --- python/ray/_private/test_utils.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 47e73da9ed54..d4309e007dc0 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1664,9 +1664,16 @@ def __init__( head_node_id, kill_interval_s: float = 60, max_to_kill: int = 2, + batch_size_to_kill: int = 1, kill_filter_fn: Optional[Callable] = None, ): - super().__init__(head_node_id, kill_interval_s, max_to_kill, kill_filter_fn) + super().__init__( + head_node_id, + kill_interval_s, + max_to_kill, + batch_size_to_kill, + kill_filter_fn, + ) # Kill worker immediately so that the task does # not finish 
successfully on its own. From b2cb5b563c89ba3e3e473ced88b757f30f912434 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 12 Dec 2024 11:09:56 -0800 Subject: [PATCH 15/15] fix Signed-off-by: Jiajun Yao --- python/ray/_private/test_utils.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index d4309e007dc0..4e748cc5222c 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1567,7 +1567,7 @@ class NodeKillerBase(ResourceKillerActor): async def _find_resources_to_kill(self): nodes_to_kill = [] while not nodes_to_kill and self.is_running: - candidates = [ + worker_nodes = [ node for node in ray.nodes() if node["Alive"] @@ -1575,10 +1575,12 @@ async def _find_resources_to_kill(self): and (node["NodeID"] not in self.killed) ] if self.kill_filter_fn: - candidates = list(filter(self.kill_filter_fn(), candidates)) + candidates = list(filter(self.kill_filter_fn(), worker_nodes)) + else: + candidates = worker_nodes # Ensure at least one worker node remains alive. - if len(candidates) < self.batch_size_to_kill + 1: + if len(worker_nodes) < self.batch_size_to_kill + 1: # Give the cluster some time to start. await asyncio.sleep(1) continue
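
Taken together, PATCH 10 and PATCH 15 settle on one invariant: the survive-one-worker guard runs against the unfiltered worker pool, while the kill batch is drawn from the (possibly filtered) candidates. The sketch below condenses just that decision; `simulate_pick` and the lambda filter are illustrative names, not part of the patch, and the plain strings stand in for node dicts.

    def simulate_pick(workers, batch_size, kill_filter=None):
        """Mirror of the final selection logic, over plain strings."""
        candidates = [w for w in workers if kill_filter(w)] if kill_filter else workers
        # Guard on the unfiltered pool: at least one worker must survive
        # the batch no matter how narrow the filter is.
        if len(workers) < batch_size + 1:
            return []  # the real actor sleeps and rescans instead of returning
        return candidates[:batch_size]

    workers = ["w1", "w2", "w3", "w4"]

    print(simulate_pick(workers, 2))                                   # ['w1', 'w2']
    print(simulate_pick(workers, 2, kill_filter=lambda w: w != "w1"))  # ['w2', 'w3']
    print(simulate_pick(workers[:2], 2))                               # [] -> guard trips

One subtlety the sketch preserves: when the filter matches fewer than batch_size nodes, the returned batch is simply smaller, and an empty batch makes the real actor's while-loop rescan. On the CLI side, the new knob flows through setup_chaos.py unchanged, as in the release_data_tests.yaml entry above: python setup_chaos.py --chaos TerminateEC2Instance --batch-size-to-kill 2 --max-to-kill 6 --kill-delay 30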