šŸ›Autoscaling: Ensure buffer machine type remains the same over the apā€¦
Browse files Browse the repository at this point in the history
ā€¦p session (ITISFoundation#5451)
  • Loading branch information
sanderegg authored Mar 11, 2024
1 parent b7c59f0 commit 599db22
Showing 7 changed files with 245 additions and 95 deletions.
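
In short: the list of allowed EC2 instance types is now resolved once per autoscaling iteration via sorted_allowed_instance_types(app) and passed explicitly through _analyze_current_cluster, _try_attach_pending_ec2s, _scale_up_cluster and _autoscale_cluster, instead of being re-fetched inside the scale-up path. The "buffer" machine type is always derived from that pre-sorted list by the new get_machine_buffer_type() helper, and the new sort_drained_nodes() helper keeps only drained nodes of that type in the reserve, so the buffer machine type can no longer drift over the application session. A worked sketch of the two new helpers follows their diff below.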
@@ -533,7 +533,7 @@ def _list_tasks(
if task.state in task_state_to_tasks:
task_state_to_tasks[task.state].append(task.key)
else:
task_state_to_tasks[task.state] = task.key
task_state_to_tasks[task.state] = [task.key]

return dict(task_state_to_tasks)
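
This first hunk fixes a small accounting bug: when a task state was seen for the first time, the bare task key was stored instead of a one-element list, so that state mapped to a single key rather than a list (and a second task with the same state would try to .append() to it). The grouping pattern, shown standalone for clarity (an illustrative sketch, not the service code; with collections.defaultdict the if/else is unnecessary):

from collections import defaultdict
from typing import Iterable, Protocol


class TaskLike(Protocol):
    # stand-in for the dask task objects iterated by _list_tasks
    state: str
    key: str


def group_task_keys_by_state(tasks: Iterable[TaskLike]) -> dict[str, list[str]]:
    task_state_to_tasks: defaultdict[str, list[str]] = defaultdict(list)
    for task in tasks:
        task_state_to_tasks[task.state].append(task.key)  # every state maps to a list of keys
    return dict(task_state_to_tasks)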

@@ -40,7 +40,9 @@
associate_ec2_instances_with_nodes,
ec2_startup_script,
find_selected_instance_type_for_task,
get_machine_buffer_type,
node_host_name_from_ec2_private_dns,
sort_drained_nodes,
)
from ..utils.rabbitmq import post_autoscaling_status_message
from .auto_scaling_mode_base import BaseAutoscaling
@@ -56,7 +58,9 @@ def _node_not_ready(node: Node) -> bool:


async def _analyze_current_cluster(
app: FastAPI, auto_scaling_mode: BaseAutoscaling
app: FastAPI,
auto_scaling_mode: BaseAutoscaling,
allowed_instance_types: list[EC2InstanceType],
) -> Cluster:
app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
@@ -99,15 +103,14 @@ async def _analyze_current_cluster(
else:
pending_nodes.append(instance)

drained_nodes, reserve_drained_nodes = sort_drained_nodes(
app_settings, all_drained_nodes, allowed_instance_types
)
cluster = Cluster(
active_nodes=active_nodes,
pending_nodes=pending_nodes,
drained_nodes=all_drained_nodes[
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER :
],
reserve_drained_nodes=all_drained_nodes[
: app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
],
drained_nodes=drained_nodes,
reserve_drained_nodes=reserve_drained_nodes,
pending_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in pending_ec2s],
terminated_instances=terminated_ec2_instances,
disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)],
@@ -149,7 +152,10 @@ async def _cleanup_disconnected_nodes(app: FastAPI, cluster: Cluster) -> Cluster


async def _try_attach_pending_ec2s(
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
app: FastAPI,
cluster: Cluster,
auto_scaling_mode: BaseAutoscaling,
allowed_instance_types: list[EC2InstanceType],
) -> Cluster:
"""label the drained instances that connected to the swarm which are missing the monitoring labels"""
new_found_instances: list[AssociatedInstance] = []
@@ -189,14 +195,13 @@ async def _try_attach_pending_ec2s(
all_drained_nodes = (
cluster.drained_nodes + cluster.reserve_drained_nodes + new_found_instances
)
drained_nodes, reserve_drained_nodes = sort_drained_nodes(
app_settings, all_drained_nodes, allowed_instance_types
)
return dataclasses.replace(
cluster,
drained_nodes=all_drained_nodes[
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER :
],
reserve_drained_nodes=all_drained_nodes[
: app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
],
drained_nodes=drained_nodes,
reserve_drained_nodes=reserve_drained_nodes,
pending_ec2s=still_pending_ec2s,
)

@@ -496,7 +501,7 @@ async def _find_needed_instances(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
- len(cluster.reserve_drained_nodes)
):
default_instance_type = available_ec2_types[0]
default_instance_type = get_machine_buffer_type(available_ec2_types)
num_instances_per_type[default_instance_type] += num_missing_nodes

return num_instances_per_type
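
In _find_needed_instances, the reserve top-up now asks get_machine_buffer_type() for the instance type instead of indexing available_ec2_types[0] inline, so the rule for choosing the buffer type lives in a single helper. A minimal sketch of the arithmetic, with invented names and values (the real function also sizes instances for the pending tasks):

def top_up_machine_buffer(
    num_instances_per_type: dict[str, int],
    reserve_drained_count: int,
    machines_buffer: int,
    allowed_type_names: list[str],  # pre-sorted; the first entry is the buffer type
) -> dict[str, int]:
    num_missing_nodes = machines_buffer - reserve_drained_count
    if num_missing_nodes > 0:
        buffer_type = allowed_type_names[0]  # same rule as get_machine_buffer_type()
        num_instances_per_type[buffer_type] = (
            num_instances_per_type.get(buffer_type, 0) + num_missing_nodes
        )
    return num_instances_per_type


# e.g. EC2_INSTANCES_MACHINES_BUFFER=5 with only 3 reserve drained nodes of the buffer type
assert top_up_machine_buffer({}, 3, 5, ["type-a", "type-b"]) == {"type-a": 2}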
@@ -667,13 +672,12 @@ async def _scale_up_cluster(
cluster: Cluster,
unassigned_tasks: list,
auto_scaling_mode: BaseAutoscaling,
allowed_instance_types: list[EC2InstanceType],
) -> Cluster:
app_settings: ApplicationSettings = app.state.settings
assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec

allowed_instance_types = await sorted_allowed_instance_types(app)

# let's start these
if needed_ec2_instances := await _find_needed_instances(
app, unassigned_tasks, allowed_instance_types, cluster, auto_scaling_mode
@@ -865,7 +869,10 @@ async def _notify_machine_creation_progress(


async def _autoscale_cluster(
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
app: FastAPI,
cluster: Cluster,
auto_scaling_mode: BaseAutoscaling,
allowed_instance_types: list[EC2InstanceType],
) -> Cluster:
# 1. check if we have pending tasks and resolve them by activating some drained nodes
unrunnable_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
@@ -893,7 +900,11 @@ async def _autoscale_cluster(
len(queued_or_missing_instance_tasks),
)
cluster = await _scale_up_cluster(
app, cluster, queued_or_missing_instance_tasks, auto_scaling_mode
app,
cluster,
queued_or_missing_instance_tasks,
auto_scaling_mode,
allowed_instance_types,
)

elif (
@@ -946,10 +957,17 @@ async def auto_scale_cluster(
the additional load.
"""

cluster = await _analyze_current_cluster(app, auto_scaling_mode)
allowed_instance_types = await sorted_allowed_instance_types(app)
cluster = await _analyze_current_cluster(
app, auto_scaling_mode, allowed_instance_types
)
cluster = await _cleanup_disconnected_nodes(app, cluster)
cluster = await _try_attach_pending_ec2s(app, cluster, auto_scaling_mode)
cluster = await _try_attach_pending_ec2s(
app, cluster, auto_scaling_mode, allowed_instance_types
)

cluster = await _autoscale_cluster(app, cluster, auto_scaling_mode)
cluster = await _autoscale_cluster(
app, cluster, auto_scaling_mode, allowed_instance_types
)
await _notify_machine_creation_progress(app, cluster, auto_scaling_mode)
await _notify_autoscaling_status(app, cluster, auto_scaling_mode)
@@ -148,3 +148,35 @@ def find_selected_instance_type_for_task(
raise Ec2InstanceInvalidError(msg=msg)

return selected_instance


def get_machine_buffer_type(
available_ec2_types: list[EC2InstanceType],
) -> EC2InstanceType:
assert len(available_ec2_types) > 0 # nosec
return available_ec2_types[0]


DrainedNodes = list[AssociatedInstance]
BufferDrainedNodes = list[AssociatedInstance]


def sort_drained_nodes(
app_settings: ApplicationSettings,
all_drained_nodes: list[AssociatedInstance],
available_ec2_types: list[EC2InstanceType],
) -> tuple[DrainedNodes, BufferDrainedNodes]:
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
# we need to keep in reserve only the drained nodes of the right type
machine_buffer_type = get_machine_buffer_type(available_ec2_types)
# NOTE: we keep only in buffer the drained nodes with the right EC2 type, AND the right amount
buffer_drained_nodes = [
node
for node in all_drained_nodes
if node.ec2_instance.type == machine_buffer_type.name
][: app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER]
# all the others are "normal" drained nodes and may be terminated at some point
other_drained_nodes = [
node for node in all_drained_nodes if node not in buffer_drained_nodes
]
return (other_drained_nodes, buffer_drained_nodes)
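
To make the selection rule concrete, here is a self-contained sketch of the two new helpers together with a worked example. The dataclasses are simplified stand-ins for EC2InstanceType/AssociatedInstance, and the instance names and buffer size are invented:

from dataclasses import dataclass


@dataclass(frozen=True)
class InstanceType:
    name: str


@dataclass(frozen=True)
class DrainedNode:
    node_id: str
    instance_type_name: str


def get_machine_buffer_type(allowed: list[InstanceType]) -> InstanceType:
    # the buffer type is simply the first of the pre-sorted allowed types,
    # so it stays the same for as long as the allowed list does
    assert allowed
    return allowed[0]


def sort_drained_nodes(
    drained: list[DrainedNode], allowed: list[InstanceType], machines_buffer: int
) -> tuple[list[DrainedNode], list[DrainedNode]]:
    buffer_type = get_machine_buffer_type(allowed)
    # only drained nodes of the buffer type are kept in reserve, capped at the buffer size
    reserve = [n for n in drained if n.instance_type_name == buffer_type.name][:machines_buffer]
    others = [n for n in drained if n not in reserve]
    return others, reserve


# worked example: "type-a" is the buffer type, EC2_INSTANCES_MACHINES_BUFFER=2
allowed = [InstanceType("type-a"), InstanceType("type-b")]
drained = [
    DrainedNode("n1", "type-b"),
    DrainedNode("n2", "type-a"),
    DrainedNode("n3", "type-a"),
    DrainedNode("n4", "type-a"),
]
others, reserve = sort_drained_nodes(drained, allowed, machines_buffer=2)
assert [n.node_id for n in reserve] == ["n2", "n3"]  # the first two "type-a" nodes
assert [n.node_id for n in others] == ["n1", "n4"]  # everything else may be terminated later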
79 changes: 77 additions & 2 deletions services/autoscaling/tests/unit/conftest.py
@@ -21,7 +21,12 @@
import pytest
import simcore_service_autoscaling
from asgi_lifespan import LifespanManager
from aws_library.ec2.models import EC2InstanceBootSpecific, EC2InstanceData
from aws_library.ec2.models import (
EC2InstanceBootSpecific,
EC2InstanceData,
EC2InstanceType,
Resources,
)
from deepdiff import DeepDiff
from faker import Faker
from fakeredis.aioredis import FakeRedis
@@ -49,7 +54,11 @@
ApplicationSettings,
EC2Settings,
)
from simcore_service_autoscaling.models import Cluster, DaskTaskResources
from simcore_service_autoscaling.models import (
AssociatedInstance,
Cluster,
DaskTaskResources,
)
from simcore_service_autoscaling.modules.docker import AutoscalingDocker
from simcore_service_autoscaling.modules.ec2 import SimcoreEC2API
from simcore_service_autoscaling.utils.utils_docker import (
@@ -767,3 +776,69 @@ async def _change_parameters(*args, **kwargs) -> list[EC2InstanceData]:
autospec=True,
side_effect=_change_parameters,
)


@pytest.fixture
def random_fake_available_instances(faker: Faker) -> list[EC2InstanceType]:
list_of_instances = [
EC2InstanceType(
name=faker.pystr(),
resources=Resources(cpus=n, ram=ByteSize(n)),
)
for n in range(1, 30)
]
random.shuffle(list_of_instances)
return list_of_instances


@pytest.fixture
def create_associated_instance(
fake_ec2_instance_data: Callable[..., EC2InstanceData],
app_settings: ApplicationSettings,
faker: Faker,
host_cpu_count: int,
host_memory_total: ByteSize,
) -> Callable[[DockerNode, bool, dict[str, Any]], AssociatedInstance]:
def _creator(
node: DockerNode,
terminateable_time: bool,
fake_ec2_instance_data_override: dict[str, Any] | None = None,
) -> AssociatedInstance:
assert app_settings.AUTOSCALING_EC2_INSTANCES
assert (
datetime.timedelta(seconds=10)
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
), "this tests relies on the fact that the time before termination is above 10 seconds"
assert app_settings.AUTOSCALING_EC2_INSTANCES
seconds_delta = (
-datetime.timedelta(seconds=10)
if terminateable_time
else datetime.timedelta(seconds=10)
)

if fake_ec2_instance_data_override is None:
fake_ec2_instance_data_override = {}

return AssociatedInstance(
node=node,
ec2_instance=fake_ec2_instance_data(
launch_time=datetime.datetime.now(datetime.timezone.utc)
- app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
- datetime.timedelta(
days=faker.pyint(min_value=0, max_value=100),
hours=faker.pyint(min_value=0, max_value=100),
)
+ seconds_delta,
resources=Resources(cpus=host_cpu_count, ram=host_memory_total),
**fake_ec2_instance_data_override,
),
)

return _creator


@pytest.fixture
def mock_machines_buffer(monkeypatch: pytest.MonkeyPatch) -> int:
num_machines_in_buffer = 5
monkeypatch.setenv("EC2_INSTANCES_MACHINES_BUFFER", f"{num_machines_in_buffer}")
return num_machines_in_buffer
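
Note that mock_machines_buffer and create_associated_instance are not new logic: they are moved into the shared conftest.py from the test module whose diff follows, with create_associated_instance gaining an optional fake_ec2_instance_data_override argument so tests can override fields of the generated EC2 instance data. The random_fake_available_instances fixture is new and provides a shuffled list of fake instance types.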
@@ -18,7 +18,6 @@
import arrow
import pytest
from aws_library.ec2.models import EC2InstanceData, Resources
from faker import Faker
from fastapi import FastAPI
from models_library.docker import (
DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY,
@@ -117,13 +116,6 @@ def mock_compute_node_used_resources(mocker: MockerFixture) -> mock.Mock:
)


@pytest.fixture
def mock_machines_buffer(monkeypatch: pytest.MonkeyPatch) -> int:
num_machines_in_buffer = 5
monkeypatch.setenv("EC2_INSTANCES_MACHINES_BUFFER", f"{num_machines_in_buffer}")
return num_machines_in_buffer


@pytest.fixture
def with_valid_time_before_termination(
monkeypatch: pytest.MonkeyPatch,
@@ -1043,43 +1035,6 @@ async def test__find_terminateable_nodes_with_no_hosts(
assert await _find_terminateable_instances(initialized_app, active_cluster) == []


@pytest.fixture
def create_associated_instance(
fake_ec2_instance_data: Callable[..., EC2InstanceData],
app_settings: ApplicationSettings,
faker: Faker,
host_cpu_count: int,
host_memory_total: ByteSize,
) -> Callable[[Node, bool], AssociatedInstance]:
def _creator(node: Node, terminateable_time: bool) -> AssociatedInstance:
assert app_settings.AUTOSCALING_EC2_INSTANCES
assert (
datetime.timedelta(seconds=10)
< app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
), "this tests relies on the fact that the time before termination is above 10 seconds"
assert app_settings.AUTOSCALING_EC2_INSTANCES
seconds_delta = (
-datetime.timedelta(seconds=10)
if terminateable_time
else datetime.timedelta(seconds=10)
)
return AssociatedInstance(
node=node,
ec2_instance=fake_ec2_instance_data(
launch_time=datetime.datetime.now(datetime.timezone.utc)
- app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
- datetime.timedelta(
days=faker.pyint(min_value=0, max_value=100),
hours=faker.pyint(min_value=0, max_value=100),
)
+ seconds_delta,
resources=Resources(cpus=host_cpu_count, ram=host_memory_total),
),
)

return _creator


async def test__try_scale_down_cluster_with_no_nodes(
minimal_configuration: None,
with_valid_time_before_termination: datetime.timedelta,
