Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Faster bulk imperative Serve Application deploys #49168

Open
wants to merge 50 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
d8c1a17
add bulk version of serve.run
JoshKarpel Dec 9, 2024
cc9ac90
do not wait for deployments to be created before returning
JoshKarpel Dec 9, 2024
32206ab
fix import
JoshKarpel Dec 9, 2024
aa332e8
fix call
JoshKarpel Dec 9, 2024
bd5c479
bring back checkpointing for single changes
JoshKarpel Dec 9, 2024
04f9a37
fix blocking behavior in public API
JoshKarpel Dec 10, 2024
0465f8d
restore wait, fix API tags
JoshKarpel Dec 10, 2024
4e10d33
only wait for deployment creation if blocking
JoshKarpel Dec 10, 2024
b462413
fix import
JoshKarpel Dec 10, 2024
c5e94ad
no *args
JoshKarpel Dec 10, 2024
3786023
make route_prefix default to None in build_app
JoshKarpel Dec 11, 2024
2ab44de
guess we have to wait
JoshKarpel Dec 11, 2024
c4fabcc
ah, blocking should not be passed down
JoshKarpel Dec 11, 2024
d43f98a
fine-grained control of blocking
JoshKarpel Dec 11, 2024
48a6326
Merge branch 'master' into batched-serve-run
JoshKarpel Dec 11, 2024
74e0992
docs
JoshKarpel Dec 11, 2024
6e87175
fix local testing mode
JoshKarpel Dec 11, 2024
acc9fbf
fix docstring
JoshKarpel Dec 11, 2024
6e987b0
rename
JoshKarpel Dec 12, 2024
51cda35
tidy up diff
JoshKarpel Dec 12, 2024
c5eb119
optimize checking for existing prefixes
JoshKarpel Dec 12, 2024
357113a
allow redeploying app with same prefix if same name
JoshKarpel Dec 12, 2024
537b696
woops
JoshKarpel Dec 12, 2024
827b26e
add tests
JoshKarpel Dec 12, 2024
471d085
Merge branch 'master' into batched-serve-run
JoshKarpel Dec 12, 2024
ad6c34e
also need to defer deployment state checkpointing
JoshKarpel Dec 13, 2024
2ab4d03
remove one more writeahead
JoshKarpel Dec 13, 2024
10c9fd9
move checkpointing further up
JoshKarpel Dec 13, 2024
6726c13
pull checkpoint up to top-level only for application and deployment s…
JoshKarpel Dec 16, 2024
1c9ffb8
remove more granular checkpoints
JoshKarpel Dec 16, 2024
19715e8
remove more granular checkpoints
JoshKarpel Dec 16, 2024
aaadf3b
remove more granular checkpoints
JoshKarpel Dec 16, 2024
f93f6c5
Merge branch 'master' into batched-serve-run
JoshKarpel Dec 16, 2024
3549013
Merge branch 'master' into batched-serve-run
JoshKarpel Dec 17, 2024
47c10d1
literals
JoshKarpel Dec 16, 2024
2a42e89
fix test and stop writing checkpoints once shutdown is called
JoshKarpel Dec 17, 2024
087f9e6
Merge branch 'master' into batched-serve-run
JoshKarpel Dec 18, 2024
ec4525d
Merge branch 'master' into batched-serve-run
JoshKarpel Jan 6, 2025
71cb770
mark new functions as developer APIs and remove from docs
JoshKarpel Jan 6, 2025
db0ef46
Merge branch 'master' into batched-serve-run
JoshKarpel Jan 7, 2025
434748f
Merge branch 'master' into batched-serve-run
JoshKarpel Jan 13, 2025
12fa28f
Merge branch 'master' into batched-serve-run
JoshKarpel Jan 14, 2025
d47b31f
add local testing mode tests
JoshKarpel Jan 21, 2025
5317cc6
Merge branch 'master' into batched-serve-run
JoshKarpel Jan 21, 2025
a99d60d
increase timeout to deflake test
JoshKarpel Jan 21, 2025
8c1609b
Merge branch 'master' into batched-serve-run
JoshKarpel Jan 22, 2025
d1a0cff
Merge branch 'master' into batched-serve-run
JoshKarpel Jan 22, 2025
7bb938d
remove unused fixture
JoshKarpel Jan 22, 2025
3989035
Merge branch 'master' into batched-serve-run
JoshKarpel Jan 22, 2025
646d715
Merge branch 'master' into batched-serve-run
JoshKarpel Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions python/ray/serve/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from ray.serve.api import (
Application,
Deployment,
RunTarget,
_run,
_run_many,
delete,
deployment,
get_app_handle,
Expand All @@ -15,6 +17,7 @@
ingress,
multiplexed,
run,
run_many,
shutdown,
start,
status,
Expand All @@ -38,6 +41,7 @@

__all__ = [
"_run",
"_run_many",
"batch",
"start",
"HTTPOptions",
Expand All @@ -46,6 +50,8 @@
"ingress",
"deployment",
"run",
"run_many",
"RunTarget",
"delete",
"Application",
"Deployment",
Expand Down
127 changes: 63 additions & 64 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from copy import deepcopy
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Callable, Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple

import ray
from ray import cloudpickle
Expand Down Expand Up @@ -222,7 +222,6 @@ def __init__(
name: str,
deployment_state_manager: DeploymentStateManager,
endpoint_state: EndpointState,
save_checkpoint_func: Callable,
logging_config: LoggingConfig,
):
"""
Expand All @@ -231,11 +230,6 @@ def __init__(
deployment_state_manager: State manager for all deployments
in the cluster.
endpoint_state: State manager for endpoints in the system.
save_checkpoint_func: Function that can be called to write
a checkpoint of the application state. This should be
called in self._set_target_state() before actually
setting the target state so that the controller can
properly recover application states if it crashes.
"""

self._name = name
Expand All @@ -261,7 +255,6 @@ def __init__(
deleting=False,
api_type=APIType.UNKNOWN,
)
self._save_checkpoint_func = save_checkpoint_func
self._logging_config = logging_config

@property
Expand Down Expand Up @@ -364,11 +357,6 @@ def _set_target_state(
api_type=api_type,
)

# Checkpoint ahead, so that if the controller crashes before we
# write to the target state, the target state will be recovered
# after the controller recovers
self._save_checkpoint_func(writeahead_checkpoints={self._name: target_state})
# Set target state
self._target_state = target_state

def _set_target_state_deleting(self):
Expand All @@ -377,7 +365,7 @@ def _set_target_state_deleting(self):
Wipes the target deployment infos, code version, and config.
"""
self._set_target_state(
deployment_infos=dict(),
deployment_infos={},
api_type=self._target_state.api_type,
code_version=None,
target_config=None,
Expand Down Expand Up @@ -889,7 +877,10 @@ def __init__(
self._endpoint_state = endpoint_state
self._kv_store = kv_store
self._logging_config = logging_config
self._application_states: Dict[str, ApplicationState] = dict()

self._shutting_down = False

self._application_states: Dict[str, ApplicationState] = {}
self._recover_from_checkpoint()

def _recover_from_checkpoint(self):
Expand All @@ -902,7 +893,6 @@ def _recover_from_checkpoint(self):
app_name,
self._deployment_state_manager,
self._endpoint_state,
self._save_checkpoint_func,
self._logging_config,
)
app_state.recover_target_state_from_checkpoint(checkpoint_data)
Expand All @@ -914,6 +904,53 @@ def delete_app(self, name: str) -> None:
return
self._application_states[name].delete()

def deploy_apps(self, name_to_deployment_args: Dict[str, List[Dict]]) -> None:
live_route_prefixes: Dict[str, str] = {
app_state.route_prefix: app_name
for app_name, app_state in self._application_states.items()
if app_state.route_prefix is not None
and not app_state.status == ApplicationStatus.DELETING
}
Comment on lines +908 to +913
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This, along with https://github.com/ray-project/ray/pull/49168/files#diff-31975e75fad7092f05d8d5759dfeb90d8b454fc4193e4970b55914dbb87ad968R942-R946 , is an actual change that isn't just looping over existing code. For efficiency, creating this mapping is pulled up a level, and the mapping is updated as we go through all the apps we're deploying. Otherwise we'd need to loop over all existing apps for each new app in the batch.


for name, deployment_args in name_to_deployment_args.items():
for deploy_param in deployment_args:
# Make sure route_prefix is not being used by other application.
deploy_app_prefix = deploy_param.get("route_prefix")
if deploy_app_prefix is None:
continue

existing_app_name = live_route_prefixes.get(deploy_app_prefix)
# It's ok to redeploy an app with the same prefix
# if it has the same name as the app already using that prefix.
if existing_app_name is not None and existing_app_name != name:
raise RayServeException(
f"Prefix {deploy_app_prefix} is being used by application "
f'"{existing_app_name}". Failed to deploy application "{name}".'
)

# We might be deploying more than one app,
# so we need to add this app's prefix to the
# set of live route prefixes that we're checking
# against during this batch operation.
live_route_prefixes[deploy_app_prefix] = name

if name not in self._application_states:
self._application_states[name] = ApplicationState(
name,
self._deployment_state_manager,
self._endpoint_state,
self._logging_config,
)
ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

deployment_infos = {
params["deployment_name"]: deploy_args_to_deployment_info(
**params, app_name=name
)
for params in deployment_args
}
self._application_states[name].deploy_app(deployment_infos)

def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:
"""Deploy the specified app to the list of deployment arguments.

Expand All @@ -928,45 +965,7 @@ def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:
RayServeException: If the list of deployments is trying to
use a route prefix that is already used by another application
"""

# Make sure route_prefix is not being used by other application.
live_route_prefixes: Dict[str, str] = {
app_state.route_prefix: app_name
for app_name, app_state in self._application_states.items()
if app_state.route_prefix is not None
and not app_state.status == ApplicationStatus.DELETING
and name != app_name
}

for deploy_param in deployment_args:
deploy_app_prefix = deploy_param.get("route_prefix", None)
if deploy_app_prefix is None:
continue

app_name = live_route_prefixes.get(deploy_app_prefix)
if app_name is not None:
raise RayServeException(
f"Prefix {deploy_app_prefix} is being used by application "
f'"{app_name}". Failed to deploy application "{name}".'
)

if name not in self._application_states:
self._application_states[name] = ApplicationState(
name,
self._deployment_state_manager,
self._endpoint_state,
self._save_checkpoint_func,
self._logging_config,
)
ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

deployment_infos = {
params["deployment_name"]: deploy_args_to_deployment_info(
**params, app_name=name
)
for params in deployment_args
}
self._application_states[name].deploy_app(deployment_infos)
self.deploy_apps({name: deployment_args})

def apply_app_configs(
self,
Expand All @@ -990,7 +989,6 @@ def apply_app_configs(
app_config.name,
self._deployment_state_manager,
endpoint_state=self._endpoint_state,
save_checkpoint_func=self._save_checkpoint_func,
logging_config=self._logging_config,
)

Expand Down Expand Up @@ -1084,6 +1082,8 @@ def update(self):
ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

def shutdown(self) -> None:
self._shutting_down = True

for app_state in self._application_states.values():
app_state.delete()

Expand All @@ -1095,23 +1095,22 @@ def is_ready_for_shutdown(self) -> bool:
Iterate through all application states and check if all their applications
are deleted.
"""
return all(
return self._shutting_down and all(
app_state.is_deleted() for app_state in self._application_states.values()
)

def _save_checkpoint_func(
self, *, writeahead_checkpoints: Optional[Dict[str, ApplicationTargetState]]
) -> None:
def save_checkpoint(self) -> None:
"""Write a checkpoint of all application states."""
if self._shutting_down:
# Once we're told to shut down, stop writing checkpoints.
# Calling .shutdown() deletes any existing checkpoint.
return

application_state_info = {
app_name: app_state.get_checkpoint_data()
for app_name, app_state in self._application_states.items()
}

if writeahead_checkpoints is not None:
application_state_info.update(writeahead_checkpoints)
Comment on lines -1112 to -1113
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the writeahead_checkpoints functionality after discussing with @edoakes - it sounds this is vestigial from when the Controller was much more async, and there's no functional reason to do it now (the checkpoint can happen after setting the target state, as long as no changes are made to the Ray cluster between them).


self._kv_store.put(
CHECKPOINT_KEY,
cloudpickle.dumps(application_state_info),
Expand Down Expand Up @@ -1254,7 +1253,7 @@ def override_deployment_info(
ServeUsageTag.AUTO_NUM_REPLICAS_USED.record("1")

# What to pass to info.update
override_options = dict()
override_options = {}

# Merge app-level and deployment-level runtime_envs.
replica_config = info.replica_config
Expand Down
9 changes: 8 additions & 1 deletion python/ray/serve/_private/build_app.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging
from copy import deepcopy
from dataclasses import dataclass
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union

from ray.dag.py_obj_scanner import _PyObjScanner
from ray.serve._private.constants import SERVE_LOGGER_NAME
from ray.serve.deployment import Application, Deployment
from ray.serve.handle import DeploymentHandle
from ray.serve.schema import LoggingConfig

logger = logging.getLogger(SERVE_LOGGER_NAME)

Expand Down Expand Up @@ -46,6 +47,8 @@ def __contains__(self, key: object):
class BuiltApplication:
# Name of the application.
name: str
route_prefix: Optional[str]
logging_config: Optional[LoggingConfig]
Comment on lines +50 to +51
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are now part of the BuiltApplication to make it easier to group things up without adding an extra intermediate data structure (that would presumably have a BuiltApplication + route_prefix + logging_config).

# Name of the application's 'ingress' deployment
# (the one exposed over gRPC/HTTP/handle).
ingress_deployment_name: str
Expand All @@ -69,6 +72,8 @@ def build_app(
app: Application,
*,
name: str,
route_prefix: Optional[str] = None,
logging_config: Optional[Union[Dict, LoggingConfig]] = None,
default_runtime_env: Optional[Dict[str, Any]] = None,
make_deployment_handle: Optional[
Callable[[Deployment, str], DeploymentHandle]
Expand Down Expand Up @@ -99,6 +104,8 @@ def build_app(
)
return BuiltApplication(
name=name,
route_prefix=route_prefix,
logging_config=logging_config,
ingress_deployment_name=app._bound_deployment.name,
deployments=deployments,
deployment_handles={
Expand Down
Loading
Loading