Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Release] Introduce --batch-size-to-kill to release chaos test framework #48765

Merged
merged 17 commits into from
Dec 12, 2024
73 changes: 38 additions & 35 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,7 @@ def __init__(
head_node_id,
kill_interval_s: float = 60,
max_to_kill: int = 2,
batch_size_to_kill: int = 1,
kill_filter_fn: Optional[Callable] = None,
):
self.kill_interval_s = kill_interval_s
Expand All @@ -1514,6 +1515,7 @@ def __init__(
self.killed = set()
self.done = ray._private.utils.get_or_create_event_loop().create_future()
self.max_to_kill = max_to_kill
self.batch_size_to_kill = batch_size_to_kill
self.kill_filter_fn = kill_filter_fn
self.kill_immediately_after_found = False
# -- logger. --
Expand All @@ -1525,7 +1527,7 @@ def ready(self):
async def run(self):
self.is_running = True
while self.is_running:
to_kill = await self._find_resource_to_kill()
to_kills = await self._find_resources_to_kill()

if not self.is_running:
break
Expand All @@ -1536,14 +1538,15 @@ async def run(self):
sleep_interval = random.random() * self.kill_interval_s
time.sleep(sleep_interval)

self._kill_resource(*to_kill)
for to_kill in to_kills:
self._kill_resource(*to_kill)
if len(self.killed) >= self.max_to_kill:
break
await asyncio.sleep(self.kill_interval_s - sleep_interval)

self.done.set_result(True)

async def _find_resource_to_kill(self):
async def _find_resources_to_kill(self):
raise NotImplementedError

def _kill_resource(self, *args):
Expand All @@ -1561,38 +1564,36 @@ async def get_total_killed(self):


class NodeKillerBase(ResourceKillerActor):
async def _find_resource_to_kill(self):
node_to_kill_ip = None
node_to_kill_port = None
node_id = None
while node_to_kill_port is None and self.is_running:
nodes = ray.nodes()
alive_nodes = self._get_alive_nodes(nodes)
if self.kill_filter_fn is not None:
nodes = list(filter(self.kill_filter_fn(), nodes))
for node in nodes:
node_id = node["NodeID"]
# make sure at least 1 worker node is alive.
if (
node["Alive"]
and node_id != self.head_node_id
and node_id not in self.killed
and alive_nodes > 2
):
node_to_kill_ip = node["NodeManagerAddress"]
node_to_kill_port = node["NodeManagerPort"]
break
# Give the cluster some time to start.
await asyncio.sleep(0.1)
async def _find_resources_to_kill(self):
nodes_to_kill = []
while not nodes_to_kill and self.is_running:
candidates = [
node
for node in ray.nodes()
if node["Alive"]
and (node["NodeID"] != self.head_node_id)
and (node["NodeID"] not in self.killed)
]
if self.kill_filter_fn:
candidates = list(filter(self.kill_filter_fn(), candidates))

return node_id, node_to_kill_ip, node_to_kill_port
# Ensure at least one worker node remains alive.
if len(candidates) < self.batch_size_to_kill + 1:
# Give the cluster some time to start.
await asyncio.sleep(1)
continue

# Collect nodes to kill, limited by batch size.
for candidate in candidates[: self.batch_size_to_kill]:
nodes_to_kill.append(
(
candidate["NodeID"],
candidate["NodeManagerAddress"],
candidate["NodeManagerPort"],
)
)

def _get_alive_nodes(self, nodes):
alive_nodes = 0
for node in nodes:
if node["Alive"]:
alive_nodes += 1
return alive_nodes
return nodes_to_kill


@ray.remote(num_cpus=0)
Expand Down Expand Up @@ -1682,7 +1683,7 @@ def __init__(
]
)

async def _find_resource_to_kill(self):
async def _find_resources_to_kill(self):
from ray.util.state.common import StateResource

process_to_kill_task_id = None
Expand All @@ -1707,7 +1708,7 @@ async def _find_resource_to_kill(self):
# Give the cluster some time to start.
await asyncio.sleep(0.1)

return process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id
return [(process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id)]

def _kill_resource(
self, process_to_kill_task_id, process_to_kill_pid, process_to_kill_node_id
Expand Down Expand Up @@ -1744,6 +1745,7 @@ def get_and_run_resource_killer(
lifetime=None,
no_start=False,
max_to_kill=2,
batch_size_to_kill=1,
kill_delay_s=0,
kill_filter_fn=None,
):
Expand All @@ -1762,6 +1764,7 @@ def get_and_run_resource_killer(
head_node_id,
kill_interval_s=kill_interval_s,
max_to_kill=max_to_kill,
batch_size_to_kill=batch_size_to_kill,
kill_filter_fn=kill_filter_fn,
)
print("Waiting for ResourceKiller to be ready...")
Expand Down
2 changes: 1 addition & 1 deletion release/nightly_tests/chaos_test/test_chaos_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def main():
node_killer.run.remote()
workload(total_num_cpus, args.smoke)
print(f"Runtime when there are many failures: {time.time() - start}")
print(f"Total node failures: " f"{ray.get(node_killer.get_total_killed.remote())}")
print(f"Total node failures: {ray.get(node_killer.get_total_killed.remote())}")
node_killer.stop_run.remote()
used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
print("Memory usage with failures.")
Expand Down
3 changes: 2 additions & 1 deletion release/nightly_tests/dataset/autoscaling_gpu_compute.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2

max_workers: 10
advanced_configurations_json:
IamInstanceProfile: {"Name": "ray-autoscaler-v1"}
bveeramani marked this conversation as resolved.
Show resolved Hide resolved

head_node_type:
name: head_node
Expand Down
14 changes: 14 additions & 0 deletions release/nightly_tests/dataset/gpu_batch_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ def parse_args():
action="store_true",
default=False,
)
parser.add_argument(
"--chaos-test",
action="store_true",
default=False,
)
return parser.parse_args()


def main(args):
data_directory: str = args.data_directory
data_format: str = args.data_format
smoke_test: bool = args.smoke_test
chaos_test: bool = args.chaos_test
data_url = f"s3://anonymous@air-example-data-2/{data_directory}"

print(f"Running GPU batch prediction with data from {data_url}")
Expand Down Expand Up @@ -124,6 +130,14 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
"Throughput w/o metadata fetching (img/sec): ",
throughput_without_metadata_fetch,
)
if chaos_test:
resource_killer = ray.get_actor(
"ResourceKiller", namespace="release_test_namespace"
)
resource_killer.stop_run.remote()
killed = ray.get(resource_killer.get_total_killed.remote())
assert killed
print(f"Total chaos killed: {killed}")

# For structured output integration with internal tooling
results = {
Expand Down
11 changes: 3 additions & 8 deletions release/nightly_tests/setup_chaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
def parse_script_args():
parser = argparse.ArgumentParser()

# '--kill-workers' to be deprecated in favor of '--chaos'
parser.add_argument("--kill-workers", action="store_true", default=False)

parser.add_argument(
"--chaos",
type=str,
Expand All @@ -29,6 +26,7 @@ def parse_script_args():

parser.add_argument("--kill-interval", type=int, default=60)
parser.add_argument("--max-to-kill", type=int, default=2)
parser.add_argument("--batch-size-to-kill", type=int, default=1)
parser.add_argument(
"--no-start",
action="store_true",
Expand Down Expand Up @@ -94,8 +92,6 @@ def _filter_fn(node):
def get_chaos_killer(args):
if args.chaos != "":
chaos_type = args.chaos
elif args.kill_workers:
chaos_type = "KillWorker"
else:
chaos_type = "KillRaylet" # default

Expand Down Expand Up @@ -125,12 +121,11 @@ def main():
lifetime="detached",
no_start=args.no_start,
max_to_kill=args.max_to_kill,
batch_size_to_kill=args.batch_size_to_kill,
kill_delay_s=args.kill_delay,
kill_filter_fn=kill_filter_fn,
)
print(
f"Successfully deployed a {'worker' if args.kill_workers else 'node'} killer."
)
print(f"Successfully deployed a {resource_killer_cls} killer.")


main()
8 changes: 4 additions & 4 deletions release/release_data_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@
run:
timeout: 3600
script: >
python dataset/multi_node_train_benchmark.py --num-workers 16 --file-type parquet
python dataset/multi_node_train_benchmark.py --num-workers 16 --file-type parquet
--target-worker-gb 50 --use-gpu

variations:
- __suffix__: regular
- __suffix__: chaos
run:
prepare: >
python setup_chaos.py --kill-interval 200 --max-to-kill 1 --task-names
python setup_chaos.py --kill-interval 200 --max-to-kill 1 --task-names
"_RayTrainWorker__execute.get_next"

#########################
Expand Down Expand Up @@ -416,10 +416,10 @@

run:
timeout: 1800
prepare: python setup_chaos.py --max-to-kill 2 --kill-delay 30
prepare: python setup_chaos.py --chaos TerminateEC2Instance --batch-size-to-kill 2 --max-to-kill 6 --kill-delay 30
script: >
python dataset/gpu_batch_inference.py
--data-directory 300G-image-data-synthetic-raw-parquet --data-format parquet
--data-directory 300G-image-data-synthetic-raw-parquet --data-format parquet --chaos-test

# 10 TB image classification parquet data with autoscaling heterogenous cluster
# 10 g4dn.12xlarge, 10 m5.16xlarge
Expand Down
Loading