
[Serve] Faster bulk imperative Serve Application deploys #49168

Open
wants to merge 45 commits into master
Conversation

@JoshKarpel (Contributor) commented on Dec 9, 2024

Why are these changes needed?

Our pattern of using Ray Serve has us deploying many hundreds/thousands of apps using the imperative API (serve.run). This ends up being very slow because the Controller needs to checkpoint as part of every RPC. It would be significantly more efficient to batch the deploys so that we can checkpoint fewer times.

This PR adds a new serve.run_many() API, marked as a developer API, that can submit many applications to the Serve Controller in a single RPC, with just one checkpoint saved after all of those applications are registered. The existing code path (including serve.run()) is refactored to use bulk operations under the hood: serve.run() now calls serve.run_many().

To further help with our particular use case, where applications are deployed from an external controller that doesn't need to wait for e.g. ingress deployment creation, the new code path also offers fine-grained control over what is waited for.
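For illustration, here is a rough sketch of how the batch API might be called. The names serve.run_many, serve.RunTarget, wait_for_ingress_deployment_creation, and wait_for_applications_running all appear elsewhere in this PR, but the RunTarget field names and the exact call signature below are assumptions, not the final API.

```python
# Rough usage sketch only; RunTarget field names are assumed.
from ray import serve


@serve.deployment
class Echo:
    def __call__(self, request) -> str:
        return "hello"


targets = [
    serve.RunTarget(
        name=f"app-{i}",
        target=Echo.bind(),
        route_prefix=f"/app-{i}",
    )
    for i in range(100)
]

# Submit the whole batch in one RPC (and one checkpoint), without blocking on
# ingress deployment creation or on the applications reaching RUNNING.
serve.run_many(
    targets,
    wait_for_ingress_deployment_creation=False,
    wait_for_applications_running=False,
)
```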


Just introducing a batch API isn't sufficient to provide a meaningful speedup, though. As mentioned above, checkpointing is the slow part, and right now it is very granular: the various stateful components checkpoint themselves at the bottom of the call stack, so even a single RPC may trigger multiple checkpoints.

Below I've tried to map out all the reasons that the Application/DeploymentStateManagers might checkpoint:

graph TD;
    deployment_state_set_target_state[DeploymentState._set_target_state] --> dsm_checkpoint[DeploymentStateManager._save_checkpoint_func]
    deployment_state_deploy[DeploymentState.deploy] --> deployment_state_set_target_state
    deployment_state_manager_deploy[DeploymentStateManager.deploy] --> deployment_state_deploy
    application_state_apply_deployment_info[ApplicationState.apply_deployment_info] --> deployment_state_manager_deploy
    application_state_reconcile_target_deployments[ApplicationState._reconcile_target_deployments] --x application_state_apply_deployment_info
    application_state_update[ApplicationState.update] --> application_state_reconcile_target_deployments
    application_state_manager_update[ApplicationStateManager.update] --x application_state_update
    serve_controller_run_control_loop[ServeController.run_control_loop] --> application_state_manager_update

    deployment_state_set_target_state_deleting[DeploymentState._set_target_state_deleting] --> dsm_checkpoint
    deployment_state_delete[DeploymentState.delete] --> deployment_state_set_target_state_deleting
    deployment_state_manager_delete_deployment[DeploymentStateManager.delete_deployment] --> deployment_state_delete
    application_state_delete_deployment[ApplicationState._delete_deployment] --> deployment_state_manager_delete_deployment
    application_state_reconcile_target_deployments --> application_state_delete_deployment

    deployment_state_autoscale[DeploymentState.autoscale] --> deployment_state_set_target_state
    deployment_state_manager_update[DeploymentStateManager.update] --> deployment_state_autoscale
    serve_controller_run_control_loop --> deployment_state_manager_update

    as_set_target_state[ApplicationState._set_target_state] --> asm_checkpoint[ApplicationStateManager._save_checkpoint_func]

    as_recover_target_state_from_checkpoint[ApplicationState.recover_target_state_from_checkpoint] --> as_set_target_state
    asm_recover_from_checkpoint[ApplicationStateManager._recover_from_checkpoint] --> as_recover_target_state_from_checkpoint
    asm_init[ApplicationStateManager.__init__] --> asm_recover_from_checkpoint
    sc_init[ServeController.__init__] --> asm_init

    as_set_target_state_deleting[ApplicationState._set_target_state_deleting] --> as_set_target_state
    as_delete[ApplicationState.delete] --> as_set_target_state_deleting
    asm_delete_app[ApplicationStateManager.delete_app] --> as_delete
    sc_delete_apps[ServeController.delete_apps] --x asm_delete_app
    RPC --> sc_delete_apps

    as_clear_target_state_and_store_config[ApplicationState._clear_target_state_and_store_config] --> as_set_target_state
    as_apply_app_config[ApplicationState.apply_app_config] --> as_clear_target_state_and_store_config
    asm_apply_app_configs[ApplicationStateManager.apply_app_configs] --x as_apply_app_config
    sc_apply_config[ServeController.apply_config] --> asm_apply_app_configs
    RPC --> sc_apply_config

    as_deploy_app[ApplicationState.deploy_app] --> as_set_target_state
    asm_deploy_app[ApplicationStateManager.deploy_app] --> as_deploy_app
    sc_deploy_application[ServeController.deploy_application] --> asm_deploy_app
    RPC --> sc_deploy_application

    as_apply_app_config --> as_set_target_state

So, in addition to the batch API that the client sees, I've refactored where these checkpoints are done so that they happen at the top of those call stacks instead of at the bottom.

  • We still checkpoint before returning from an RPC that mutates state (now just before returning, rather than deep inside the call).
  • We still checkpoint after making any changes to internal state and before issuing any commands to the cluster (e.g. starting/stopping replicas), just not immediately after each individual state change. See the sketch below.
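To make the new placement concrete, here is a minimal sketch of the pattern under simplified, stand-in names: handle_deploy_rpc, run_control_loop_step, and issue_replica_commands are illustrative, not the actual ServeController methods; only save_checkpoint() matches a method name that appears in this PR's diffs.

```python
# Illustrative sketch only: checkpoints move to the top of the call stack.
class Controller:
    def handle_deploy_rpc(self, targets):
        # Mutate in-memory target state for every app in the batch...
        for t in targets:
            self.application_state_manager.deploy_app(t)
        # ...then checkpoint once, just before returning from the RPC,
        # instead of once per app deep inside deploy_app().
        self.application_state_manager.save_checkpoint()
        self.deployment_state_manager.save_checkpoint()

    def run_control_loop_step(self):
        # Reconcile internal state first.
        self.application_state_manager.update()
        self.deployment_state_manager.update()
        # Checkpoint after the internal state changes, but before any
        # commands (e.g. starting/stopping replica actors) hit the cluster.
        self.application_state_manager.save_checkpoint()
        self.deployment_state_manager.save_checkpoint()
        self.issue_replica_commands()  # hypothetical placeholder
```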

I did not change the EndpointState's checkpointing because it hasn't shown up in our flamegraphs.


Before these changes, deploying 5k Serve apps, each with one deployment, took >1 hour and would often never finish because the Serve Controller would become unresponsive and KubeRay would end up restarting the cluster.

With these changes, deploying 5k Serve apps with a batch size of 100 per API call only takes about 90 seconds!

Related issue number

Checks

  • I've signed off every commit (using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@jcotant1 added the serve (Ray Serve Related Issue) label on Dec 9, 2024
Comment on lines 70 to 71
serve.run_many
serve.RunTarget
JoshKarpel (Contributor, Author):
Should these be here, or should they get sectioned off in some "beta APIs" section?

Comment on lines +922 to +927
live_route_prefixes: Dict[str, str] = {
app_state.route_prefix: app_name
for app_name, app_state in self._application_states.items()
if app_state.route_prefix is not None
and not app_state.status == ApplicationStatus.DELETING
}
JoshKarpel (Contributor, Author):
This, along with https://github.com/ray-project/ray/pull/49168/files#diff-31975e75fad7092f05d8d5759dfeb90d8b454fc4193e4970b55914dbb87ad968R942-R946, is an actual change rather than just a loop over the existing code. For efficiency, building this mapping is pulled up a level, and the mapping is updated as we go through all the apps we're deploying; otherwise we'd need to loop over all existing apps for each new app in the batch.
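As a self-contained illustration of that idea (simplified, illustrative names only; this is not the actual ApplicationStateManager code), the live route-prefix map is built once per batch and then kept up to date while the batch is processed:

```python
from typing import Dict, Iterable, Optional, Tuple


def claim_route_prefixes(
    existing_prefixes: Dict[str, Optional[str]],  # app name -> live prefix (or None)
    batch: Iterable[Tuple[str, Optional[str]]],   # (new app name, desired prefix)
) -> None:
    # Build the live prefix -> app-name map once for the whole batch...
    live_route_prefixes = {
        prefix: name
        for name, prefix in existing_prefixes.items()
        if prefix is not None
    }
    # ...then update it in place, instead of rescanning all existing apps
    # for every new app in the batch.
    for name, prefix in batch:
        if prefix is None:
            continue
        owner = live_route_prefixes.get(prefix)
        if owner is not None and owner != name:
            raise ValueError(f"Route prefix {prefix!r} is already used by app {owner!r}")
        # Claim the prefix so later apps in the same batch also see it as taken.
        live_route_prefixes[prefix] = name
```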

else:
client = _private_api.serve_start(
http_options={"location": "EveryNode"},
global_logging_config=logging_config,
global_logging_config=t.logging_config, # implicitly uses the last target
JoshKarpel (Contributor, Author):
I'm struggling with this a bit. Is the logging config intended to be per-app or Serve-wide? Should the global logging config be a separate parameter?

Reviewer (Contributor):
The logging config can be set globally and overridden per-app

In this case, it should be empty

Comment on lines -1112 to -1113
if writeahead_checkpoints is not None:
application_state_info.update(writeahead_checkpoints)
JoshKarpel (Contributor, Author):
I removed the writeahead_checkpoints functionality after discussing with @edoakes - it sounds like this is vestigial from when the Controller was much more async, and there's no functional reason to keep it now (the checkpoint can happen after setting the target state, as long as no changes are made to the Ray cluster between them).

@JoshKarpel marked this pull request as ready for review on December 18, 2024 at 15:38
@JoshKarpel (Contributor, Author) commented:
Ready for a first look - I've left some questions in comments that need some spicier decisions made!

Comment on lines +49 to +50
route_prefix: Optional[str]
logging_config: Optional[LoggingConfig]
JoshKarpel (Contributor, Author):
These are now part of the BuiltApplication to make it easier to group things up without adding an extra intermediate data structure (that would presumably have a BuiltApplication + route_prefix + logging_config).
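Roughly the shape implied by the diff excerpt above, as a hypothetical sketch only: route_prefix and logging_config come from the excerpt, while the other fields and the LoggingConfig import path are assumptions rather than the actual BuiltApplication definition.

```python
from dataclasses import dataclass
from typing import List, Optional

from ray.serve.schema import LoggingConfig  # import path assumed


@dataclass
class BuiltApplication:
    name: str
    ingress_deployment_name: str
    deployments: List  # the app's Deployment objects
    # Carried on the built app itself so callers don't need a separate
    # wrapper holding (BuiltApplication, route_prefix, logging_config).
    route_prefix: Optional[str] = None
    logging_config: Optional[LoggingConfig] = None
```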

)[0]


@PublicAPI(stability="beta")
Reviewer (Contributor):
Let's make this a DeveloperAPI -- I'm not comfortable maintaining it as a public API with the regular guarantees on stability (yet)

JoshKarpel (Contributor, Author) replied:
Oo, didn't realize that was a thing, can do!

Comment on lines +546 to +547
wait_for_ingress_deployment_creation: bool = True,
wait_for_applications_running: bool = True,
Reviewer (Contributor):
are these options necessary for your usage?

JoshKarpel (Contributor, Author) replied:
They're a dramatic speedup for us, because otherwise the client would be waiting for the controller to actually create the actors and for those actors to become ready, which we already check for separately on each of our control-loop cycles. Not needing to wait means we can turn around and submit the next group of targets immediately instead of waiting for the next Serve Controller cycle (which is especially slow for us because we've had to increase the loop interval to give the controller more time to respond to requests, though we might be able to undo that with #48807 and https://github.com/ray-project/ray/pull/45957/files).
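A hedged sketch of the caller-side loop being described: next_batch_of_targets is a hypothetical helper, the run_many signature is the one quoted above, and serve.status() is the existing public status API used to check readiness on a later cycle instead of blocking inside run_many.

```python
import time
from typing import List

from ray import serve


def next_batch_of_targets() -> List["serve.RunTarget"]:
    """Hypothetical helper: return the next batch of RunTargets to deploy."""
    return []


# Hypothetical external control loop: submit each batch without waiting, then
# reconcile application readiness separately on later cycles.
while True:
    batch = next_batch_of_targets()
    if batch:
        serve.run_many(
            batch,
            wait_for_ingress_deployment_creation=False,
            wait_for_applications_running=False,
        )
    # Readiness/health is checked here instead of blocking inside run_many().
    app_statuses = serve.status().applications
    time.sleep(5.0)
```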

Comment on lines +2732 to +2733
# In real code this checkpoint would be done by the caller of .deploy()
dsm.save_checkpoint()
Reviewer (Contributor):
why's this needed?

JoshKarpel (Contributor, Author) replied:
Without this, no one actually saves the checkpoint that https://github.com/ray-project/ray/pull/49168/files#diff-41218c5a0b704f1be5080d99a1c21c89c421e12d28df768be56588ee7a53544dR2738 would try to use. With this PR, dsm.update() does not implicitly save a checkpoint; whoever calls DeploymentStateManager.update() is responsible for that.

@JoshKarpel changed the title from "[Serve] Faster imperative Serve Application deploys" to "[Serve] Faster bulk imperative Serve Application deploys" on Jan 7, 2025
@JoshKarpel requested a review from edoakes on January 7, 2025 at 16:22
@JoshKarpel (Contributor, Author) commented:
@zcin @edoakes this could use another round of review when you get a chance 🙇🏻

@zcin (Contributor) left a comment:
Overall LGTM!

self.application_state_manager.save_checkpoint()
# ApplicationStateManager.update() can also mutate the
# DeploymentStateManager so we need to checkpoint that as well
self.deployment_state_manager.save_checkpoint()
Reviewer (Contributor):
Why do we need to save the deployment_state_manager checkpoint twice?

JoshKarpel (Contributor, Author) replied:
This is re: https://github.com/ray-project/ray/pull/49168/files#diff-1b8f248dbc32098673b5329a85021c686d9e5f03b05c145c9354988280bf491fR404?

My thinking was that it's still best to checkpoint as soon as possible after making a batch of changes - but I'd be fine with moving these two checkpoints down to around https://github.com/ray-project/ray/pull/49168/files#diff-1b8f248dbc32098673b5329a85021c686d9e5f03b05c145c9354988280bf491fR435, after both update steps are complete.

a2, pida2 = serve.get_app_handle("a").remote().result()

assert a1 == a2
assert pida1 == pida2
Reviewer (Contributor):
Could you also add a test to test_local_testing_mode.py?

JoshKarpel (Contributor, Author) replied:
Can do - anything in particular that I should be checking there, or just an equivalent of these tests but with _local_testing_mode=True in the run calls?

Comment on lines -1853 to +1855
# Since we set up the `backoff_sequence_s` to be 999s, this 1s timeout will still
# Since we set up the `backoff_sequence_s` to be 999s, this 10s timeout will still
# capture the extra delay if it was added between scheduling loop.
done, _ = await asyncio.wait([task], timeout=1)
done, _ = await asyncio.wait([task], timeout=10)
JoshKarpel (Contributor, Author):
Got what looked like a flaky failure here on https://buildkite.com/ray-project/microcheck/builds/10134#01948a39-6ed6-48a0-9e2b-5e685c57a3f2, presumably because this timeout is short and CI is slow.

@JoshKarpel requested a review from zcin on January 21, 2025 at 20:36
Labels: serve (Ray Serve Related Issue)
Projects: None yet
4 participants