From 5169e86c935d0ddf7c750a2460d342e0fd3123de Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Tue, 19 Nov 2024 10:26:32 -0600 Subject: [PATCH 01/15] set up shared long poll client Signed-off-by: Josh Karpel --- python/ray/serve/_private/long_poll.py | 35 +++++++--- python/ray/serve/_private/router.py | 88 +++++++++++++++++++++++++- 2 files changed, 114 insertions(+), 9 deletions(-) diff --git a/python/ray/serve/_private/long_poll.py b/python/ray/serve/_private/long_poll.py index f3538913b76b..547bd5405899 100644 --- a/python/ray/serve/_private/long_poll.py +++ b/python/ray/serve/_private/long_poll.py @@ -77,8 +77,8 @@ def __init__( host_actor, key_listeners: Dict[KeyType, UpdateStateCallable], call_in_event_loop: AbstractEventLoop, + only_once: bool = False, ) -> None: - assert len(key_listeners) > 0 # We used to allow this to be optional, but due to Ray Client issue # we now enforce all long poll client to post callback to event loop # See https://github.com/ray-project/ray/issues/20971 @@ -87,6 +87,8 @@ def __init__( self.host_actor = host_actor self.key_listeners = key_listeners self.event_loop = call_in_event_loop + self.only_once = only_once + self.snapshot_ids: Dict[KeyType, int] = { # The initial snapshot id for each key is < 0, # but real snapshot keys in the long poll host are always >= 0, @@ -98,6 +100,23 @@ def __init__( self._poll_next() + def add_key_listeners( + self, key_listeners: Dict[KeyType, UpdateStateCallable] + ) -> None: + """Add more key listeners to the client. + The new listeners will only be included in the *next* long poll request; + the current request will continue with the existing listeners. + + If a key is already in the client, the new listener will replace the old one, + but the snapshot ID will be preserved, so the new listener will only be called + on the *next* update to that key. + """ + # Only initialize snapshot ids for *new* keys. + self.snapshot_ids.update( + {key: -1 for key in key_listeners.keys() if key not in self.key_listeners} + ) + self.key_listeners.update(key_listeners) + def _on_callback_completed(self, trigger_at: int): """Called after a single callback is completed. @@ -107,7 +126,7 @@ def _on_callback_completed(self, trigger_at: int): way to serialize the callback invocations between object versions. 
""" self._callbacks_processed_count += 1 - if self._callbacks_processed_count == trigger_at: + if not self.only_once and self._callbacks_processed_count == trigger_at: self._poll_next() def _poll_next(self): @@ -306,15 +325,15 @@ async def listen_for_change( self._count_send(LongPollState.TIME_OUT) return LongPollState.TIME_OUT else: - updated_object_key: str = async_task_to_watched_keys[done.pop()] - updated_object = { - updated_object_key: UpdatedObject( + updated_objects = {} + for task in done: + updated_object_key = async_task_to_watched_keys[task] + updated_objects[updated_object_key] = UpdatedObject( self.object_snapshots[updated_object_key], self.snapshot_ids[updated_object_key], ) - } - self._count_send(updated_object) - return updated_object + self._count_send(updated_objects) + return updated_objects async def listen_for_change_java( self, diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index 9cd8c10f5f82..cbdbe86cc84c 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -5,9 +5,12 @@ import time import uuid from abc import ABC, abstractmethod +import weakref +from asyncio import AbstractEventLoop from collections import defaultdict +from collections.abc import MutableMapping from contextlib import contextmanager -from functools import partial +from functools import lru_cache, partial from typing import Any, Coroutine, DefaultDict, Dict, List, Optional, Tuple, Union import ray @@ -398,6 +401,16 @@ def __init__( ), ) + # The Router needs to stay informed about changes to the target deployment's + # running replicas and deployment config. We do this via the long poll system. + # However, for efficiency, we don't want to create a LongPollClient for every + # DeploymentHandle, so we use a shared LongPollClient that all Routers + # register themselves with. But first, the router needs to get a fast initial + # update so that it can start serving requests, which we do with a + # LongPollClient that is told to run only once. This client gets the + # first update quickly, and then future updates are handled + # by the SharedRouterLongPollClient. + self.long_poll_client = LongPollClient( controller_handle, { @@ -411,7 +424,13 @@ def __init__( ): self.update_deployment_config, }, call_in_event_loop=self._event_loop, + only_once=True, + ) + + shared = SharedRouterLongPollClient.get_or_create( + controller_handle, self._event_loop ) + shared.register(self) def running_replicas_populated(self) -> bool: return self._running_replicas_populated @@ -684,3 +703,70 @@ def shutdown(self): asyncio.run_coroutine_threadsafe( self._asyncio_router.shutdown(), loop=self._asyncio_loop ).result() + + +class SharedRouterLongPollClient: + def __init__(self, controller_handle: ActorHandle, event_loop: AbstractEventLoop): + self.controller_handler = controller_handle + + # We use a WeakSet to store the Routers so that we don't prevent them + # from being garbage-collected. 
+ self.routers: MutableMapping[ + DeploymentID, weakref.WeakSet[Router] + ] = defaultdict(weakref.WeakSet) + + # Creating the LongPollClient implicitly starts it + self.long_poll_client = LongPollClient( + controller_handle, + key_listeners={}, + call_in_event_loop=event_loop, + ) + + @classmethod + @lru_cache(maxsize=None) + def get_or_create( + cls, controller_handle: ActorHandle, event_loop: AbstractEventLoop + ) -> "SharedRouterLongPollClient": + shared = cls(controller_handle=controller_handle, event_loop=event_loop) + logger.info(f"Started {shared}.") + return shared + + def update_running_replicas( + self, running_replicas: List[RunningReplicaInfo], deployment_id: DeploymentID + ) -> None: + for router in self.routers[deployment_id]: + router.update_running_replicas(running_replicas) + + def update_deployment_config( + self, deployment_config: DeploymentConfig, deployment_id: DeploymentID + ) -> None: + for router in self.routers[deployment_id]: + router.update_deployment_config(deployment_config) + + def register(self, router: Router) -> None: + self.routers[router.deployment_id].add(router) + + # Remove the entries for any deployment ids that no longer have any routers. + # The WeakSets will automatically lose track of Routers that get GC'd, + # but the outer dict will keep the key around, so we need to clean up manually. + for deployment_id, routers in self.routers.items(): + if not routers: + self.routers.pop(deployment_id) + + # Register the new listeners on the long poll client. + # Some of these listeners may already exist, but it's safe to add them again. + key_listeners = { + **{ + (LongPollNamespace.RUNNING_REPLICAS, deployment_id): partial( + self.update_running_replicas, deployment_id=deployment_id + ) + for deployment_id in self.routers.keys() + }, + **{ + (LongPollNamespace.DEPLOYMENT_CONFIG, deployment_id): partial( + self.update_deployment_config, deployment_id=deployment_id + ) + for deployment_id in self.routers.keys() + }, + } + self.long_poll_client.add_key_listeners(key_listeners) From ae170d3ddb1ad075a5385040b3b6f851a99965d2 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 20 Nov 2024 10:54:31 -0600 Subject: [PATCH 02/15] avoid mutating while iterating Signed-off-by: Josh Karpel --- python/ray/serve/_private/router.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index cbdbe86cc84c..e2ace81fda16 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -4,8 +4,8 @@ import threading import time import uuid -from abc import ABC, abstractmethod import weakref +from abc import ABC, abstractmethod from asyncio import AbstractEventLoop from collections import defaultdict from collections.abc import MutableMapping @@ -749,7 +749,8 @@ def register(self, router: Router) -> None: # Remove the entries for any deployment ids that no longer have any routers. # The WeakSets will automatically lose track of Routers that get GC'd, # but the outer dict will keep the key around, so we need to clean up manually. - for deployment_id, routers in self.routers.items(): + # Note the list(...) to avoid mutating self.routers while iterating over it. 
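# A small sketch (not from the patch) of why the list(...) snapshot matters:
# CPython raises RuntimeError if a dict changes size while it is being iterated.
entries = {"a": set(), "b": {1}}
try:
    for key, members in entries.items():
        if not members:
            entries.pop(key)
except RuntimeError as exc:
    print(exc)  # dictionary changed size during iteration

# Iterating over a copied list of the items makes the removal safe.
entries = {"a": set(), "b": {1}}
for key, members in list(entries.items()):
    if not members:
        entries.pop(key)
assert entries == {"b": {1}}

# The cleanup loop below uses the same pattern on self.routers.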
+ for deployment_id, routers in list(self.routers.items()): if not routers: self.routers.pop(deployment_id) From 53f97fc2a8755ee0b4ba38469c005c959f7d9727 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 20 Nov 2024 11:20:36 -0600 Subject: [PATCH 03/15] better dictionary merge Signed-off-by: Josh Karpel --- python/ray/serve/_private/router.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index e2ace81fda16..50eb47369ffd 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -757,17 +757,14 @@ def register(self, router: Router) -> None: # Register the new listeners on the long poll client. # Some of these listeners may already exist, but it's safe to add them again. key_listeners = { - **{ - (LongPollNamespace.RUNNING_REPLICAS, deployment_id): partial( - self.update_running_replicas, deployment_id=deployment_id - ) - for deployment_id in self.routers.keys() - }, - **{ - (LongPollNamespace.DEPLOYMENT_CONFIG, deployment_id): partial( - self.update_deployment_config, deployment_id=deployment_id - ) - for deployment_id in self.routers.keys() - }, + (LongPollNamespace.RUNNING_REPLICAS, deployment_id): partial( + self.update_running_replicas, deployment_id=deployment_id + ) + for deployment_id in self.routers.keys() + } | { + (LongPollNamespace.DEPLOYMENT_CONFIG, deployment_id): partial( + self.update_deployment_config, deployment_id=deployment_id + ) + for deployment_id in self.routers.keys() } self.long_poll_client.add_key_listeners(key_listeners) From d586f08a11e1e3f171bc7795f66f8336754b7064 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 20 Nov 2024 16:26:17 -0600 Subject: [PATCH 04/15] protect against empty keys Signed-off-by: Josh Karpel --- python/ray/serve/_private/long_poll.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/ray/serve/_private/long_poll.py b/python/ray/serve/_private/long_poll.py index 547bd5405899..afc75167b587 100644 --- a/python/ray/serve/_private/long_poll.py +++ b/python/ray/serve/_private/long_poll.py @@ -2,6 +2,7 @@ import logging import os import random +from asyncio import sleep from asyncio.events import AbstractEventLoop from collections import defaultdict from dataclasses import dataclass @@ -264,10 +265,14 @@ async def listen_for_change( ) -> Union[LongPollState, Dict[KeyType, UpdatedObject]]: """Listen for changed objects. - This method will returns a dictionary of updated objects. It returns + This method will return a dictionary of updated objects. It returns immediately if the snapshot_ids are outdated, otherwise it will block until there's an update. """ + if not keys_to_snapshot_ids: + await sleep(random.uniform(*self._listen_for_change_request_timeout_s)) + return {} + # If there are any keys with outdated snapshot ids, # return their updated values immediately. 
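# A simplified sketch (not the real host logic) of the snapshot-id handshake this
# comment describes: the client reports the last snapshot id it saw for each key,
# and any key whose id no longer matches the host's current id is answered
# immediately. A fresh client sends -1, which never matches because real snapshot
# ids are always >= 0.
from typing import Dict, Set

def outdated_keys(client_ids: Dict[str, int], host_ids: Dict[str, int]) -> Set[str]:
    return {key for key, seen in client_ids.items() if host_ids.get(key) != seen}

assert outdated_keys({"config": -1}, {"config": 0}) == {"config"}  # first poll answers immediately
assert outdated_keys({"config": 3}, {"config": 3}) == set()        # up to date, so the host blocks

# listen_for_change collects updated values for such keys into `updated_objects`
# and returns them, as the code below shows.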
updated_objects = {} From 9fa746b0e610d72c9d9b80111ec0d79aeef38090 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Thu, 21 Nov 2024 09:50:38 -0600 Subject: [PATCH 05/15] call _count_send Signed-off-by: Josh Karpel --- python/ray/serve/_private/long_poll.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/ray/serve/_private/long_poll.py b/python/ray/serve/_private/long_poll.py index 0d844a6b91b0..33814057ed39 100644 --- a/python/ray/serve/_private/long_poll.py +++ b/python/ray/serve/_private/long_poll.py @@ -270,9 +270,13 @@ async def listen_for_change( immediately if the snapshot_ids are outdated, otherwise it will block until there's an update. """ + # If there are no keys to listen for, just wait for the timeout and return. if not keys_to_snapshot_ids: await sleep(random.uniform(*self._listen_for_change_request_timeout_s)) - return {} + + updated_objects = {} + self._count_send(updated_objects) + return updated_objects # If there are any keys with outdated snapshot ids, # return their updated values immediately. From 83eb5cfe9e6bed401735277a2bceb3e9daa12535 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Thu, 21 Nov 2024 10:19:39 -0600 Subject: [PATCH 06/15] shorter sleep on empty keys Signed-off-by: Josh Karpel --- python/ray/serve/_private/long_poll.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/_private/long_poll.py b/python/ray/serve/_private/long_poll.py index 33814057ed39..4ac43ce4be39 100644 --- a/python/ray/serve/_private/long_poll.py +++ b/python/ray/serve/_private/long_poll.py @@ -270,9 +270,11 @@ async def listen_for_change( immediately if the snapshot_ids are outdated, otherwise it will block until there's an update. """ - # If there are no keys to listen for, just wait for the timeout and return. + # If there are no keys to listen for, + # just wait for a short time to provide backpressure, + # then return an empty update. 
if not keys_to_snapshot_ids: - await sleep(random.uniform(*self._listen_for_change_request_timeout_s)) + await sleep(1) updated_objects = {} self._count_send(updated_objects) From cfc7d1973065e296fceaf7696373c154ff61e4f3 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Thu, 21 Nov 2024 11:51:57 -0600 Subject: [PATCH 07/15] poll again if no callbacks Signed-off-by: Josh Karpel --- python/ray/serve/_private/long_poll.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/serve/_private/long_poll.py b/python/ray/serve/_private/long_poll.py index 4ac43ce4be39..7352d4658bbc 100644 --- a/python/ray/serve/_private/long_poll.py +++ b/python/ray/serve/_private/long_poll.py @@ -182,6 +182,8 @@ def _process_update(self, updates: Dict[str, UpdatedObject]): f"{list(updates.keys())}.", extra={"log_to_stderr": False}, ) + if not updates: # no updates, no callbacks to run, just poll again + self._schedule_to_event_loop(self._poll_next) for key, update in updates.items(): self.snapshot_ids[key] = update.snapshot_id callback = self.key_listeners[key] From 56b7df57a1685fe515aabe525800838c9c8a17bb Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Thu, 5 Dec 2024 11:03:39 -0600 Subject: [PATCH 08/15] rework test Signed-off-by: Josh Karpel --- python/ray/serve/tests/test_deploy.py | 33 +++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/tests/test_deploy.py b/python/ray/serve/tests/test_deploy.py index 769bff110093..2778ab4589f4 100644 --- a/python/ray/serve/tests/test_deploy.py +++ b/python/ray/serve/tests/test_deploy.py @@ -324,10 +324,16 @@ def make_nonblocking_calls(expected, expect_blocking=False): make_nonblocking_calls({"2": 2}) -def test_reconfigure_with_queries(serve_instance): +def test_reconfigure_does_not_run_while_there_are_active_queries(serve_instance): + """ + This tests checks that reconfigure can't trigger while there are active requests, + so that the actor's state is not mutated mid-request. + + https://github.com/ray-project/ray/pull/20315 + """ signal = SignalActor.remote() - @serve.deployment(max_ongoing_requests=10, num_replicas=3) + @serve.deployment(max_ongoing_requests=10, num_replicas=1) class A: def __init__(self): self.state = None @@ -340,17 +346,36 @@ async def __call__(self): return self.state["a"] handle = serve.run(A.options(version="1", user_config={"a": 1}).bind()) - responses = [handle.remote() for _ in range(30)] + responses = [handle.remote() for _ in range(10)] + + # Give the queries time to get to the replicas before the reconfigure. + time.sleep(0.1) @ray.remote(num_cpus=0) def reconfigure(): serve.run(A.options(version="1", user_config={"a": 2}).bind()) + # Start the reconfigure; + # this will not complete until the signal is released + # to allow the queries to complete. reconfigure_ref = reconfigure.remote() + + # Release the signal to allow the queries to complete. signal.send.remote() + + # Wait for the reconfigure to complete. ray.get(reconfigure_ref) - assert all([r.result() == 1 for r in responses]) + # These should all be 1 because the queries were sent before the reconfigure, + # the reconfigure blocks until they complete, + # and we just waited for the reconfigure to finish. + results = [r.result() for r in responses] + print(results) + assert all([r == 1 for r in results]) + + # If we query again, it should be 2, + # because the reconfigure will have gone through after the + # original queries completed. 
assert handle.remote().result() == 2 From 102fb09dac5ffd38ff53aef20d41620fbae9de7a Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Thu, 5 Dec 2024 15:00:11 -0600 Subject: [PATCH 09/15] use new handle Signed-off-by: Josh Karpel --- python/ray/serve/tests/test_deploy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/tests/test_deploy.py b/python/ray/serve/tests/test_deploy.py index 2778ab4589f4..914389bb5255 100644 --- a/python/ray/serve/tests/test_deploy.py +++ b/python/ray/serve/tests/test_deploy.py @@ -213,7 +213,7 @@ async def __call__(self, **kwargs): ref2.result(timeout_s=1) # Redeploy new version. - serve._run(V2.bind(), _blocking=False, name="app") + h = serve._run(V2.bind(), _blocking=False, name="app") with pytest.raises(TimeoutError): client._wait_for_application_running("app", timeout_s=2) @@ -225,7 +225,7 @@ async def __call__(self, **kwargs): vals2, pids2 = zip(*[h.remote(block=False).result() for _ in range(10)]) # Since there is one replica blocking, only one new # replica should be started up. - assert "v1" in vals2 + assert set(vals2) == {"v1", "v2"} # Signal the original call to exit. ray.get(signal.send.remote()) From ca4a1f7edce1c34452d39a2e2e0927f71243894b Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 11 Dec 2024 11:39:46 -0600 Subject: [PATCH 10/15] does a long sleep fix it? Signed-off-by: Josh Karpel --- python/ray/serve/tests/test_deploy.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/serve/tests/test_deploy.py b/python/ray/serve/tests/test_deploy.py index 914389bb5255..42100eb106ce 100644 --- a/python/ray/serve/tests/test_deploy.py +++ b/python/ray/serve/tests/test_deploy.py @@ -217,6 +217,8 @@ async def __call__(self, **kwargs): with pytest.raises(TimeoutError): client._wait_for_application_running("app", timeout_s=2) + time.sleep(60) + if RAY_SERVE_EAGERLY_START_REPLACEMENT_REPLICAS: # Two new replicas should be started. 
vals2, pids2 = zip(*[h.remote(block=False).result() for _ in range(10)]) From 9737dbdcd052d0f390c8c65810f238fbe576e6d7 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Wed, 11 Dec 2024 14:19:11 -0600 Subject: [PATCH 11/15] do not stop the dedicated client until the shared client gets an update Signed-off-by: Josh Karpel --- python/ray/serve/_private/long_poll.py | 9 +++++++-- python/ray/serve/_private/router.py | 13 ++++++------- python/ray/serve/tests/test_deploy.py | 2 -- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/python/ray/serve/_private/long_poll.py b/python/ray/serve/_private/long_poll.py index 7352d4658bbc..fd66dfa350fe 100644 --- a/python/ray/serve/_private/long_poll.py +++ b/python/ray/serve/_private/long_poll.py @@ -79,7 +79,6 @@ def __init__( host_actor, key_listeners: Dict[KeyType, UpdateStateCallable], call_in_event_loop: AbstractEventLoop, - only_once: bool = False, ) -> None: # We used to allow this to be optional, but due to Ray Client issue # we now enforce all long poll client to post callback to event loop @@ -89,7 +88,6 @@ def __init__( self.host_actor = host_actor self.key_listeners = key_listeners self.event_loop = call_in_event_loop - self.only_once = only_once self.snapshot_ids: Dict[KeyType, int] = { # The initial snapshot id for each key is < 0, @@ -102,6 +100,10 @@ def __init__( self._poll_next() + def stop(self) -> None: + """Stop the long poll client after the next RPC returns.""" + self.is_running = False + def add_key_listeners( self, key_listeners: Dict[KeyType, UpdateStateCallable] ) -> None: @@ -135,6 +137,9 @@ def _poll_next(self): """Poll the update. The callback is expected to scheduler another _poll_next call. """ + if not self.is_running: + return + self._callbacks_processed_count = 0 self._current_ref = self.host_actor.listen_for_change.remote(self.snapshot_ids) self._current_ref._on_completed(lambda update: self._process_update(update)) diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index 3345864214d3..fe04316ba1a4 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -406,10 +406,8 @@ def __init__( # However, for efficiency, we don't want to create a LongPollClient for every # DeploymentHandle, so we use a shared LongPollClient that all Routers # register themselves with. But first, the router needs to get a fast initial - # update so that it can start serving requests, which we do with a - # LongPollClient that is told to run only once. This client gets the - # first update quickly, and then future updates are handled - # by the SharedRouterLongPollClient. + # update so that it can start serving requests, which we do with a dedicated + # LongPollClient that stops running once the shared client takes over. self.long_poll_client = LongPollClient( controller_handle, @@ -424,7 +422,6 @@ def __init__( ): self.update_deployment_config, }, call_in_event_loop=self._event_loop, - only_once=True, ) shared = SharedRouterLongPollClient.get_or_create( @@ -712,7 +709,7 @@ def __init__(self, controller_handle: ActorHandle, event_loop: AbstractEventLoop # We use a WeakSet to store the Routers so that we don't prevent them # from being garbage-collected. 
self.routers: MutableMapping[ - DeploymentID, weakref.WeakSet[Router] + DeploymentID, weakref.WeakSet[AsyncioRouter] ] = defaultdict(weakref.WeakSet) # Creating the LongPollClient implicitly starts it @@ -736,14 +733,16 @@ def update_running_replicas( ) -> None: for router in self.routers[deployment_id]: router.update_running_replicas(running_replicas) + router.long_poll_client.stop() def update_deployment_config( self, deployment_config: DeploymentConfig, deployment_id: DeploymentID ) -> None: for router in self.routers[deployment_id]: router.update_deployment_config(deployment_config) + router.long_poll_client.stop() - def register(self, router: Router) -> None: + def register(self, router: AsyncioRouter) -> None: self.routers[router.deployment_id].add(router) # Remove the entries for any deployment ids that no longer have any routers. diff --git a/python/ray/serve/tests/test_deploy.py b/python/ray/serve/tests/test_deploy.py index 42100eb106ce..914389bb5255 100644 --- a/python/ray/serve/tests/test_deploy.py +++ b/python/ray/serve/tests/test_deploy.py @@ -217,8 +217,6 @@ async def __call__(self, **kwargs): with pytest.raises(TimeoutError): client._wait_for_application_running("app", timeout_s=2) - time.sleep(60) - if RAY_SERVE_EAGERLY_START_REPLACEMENT_REPLICAS: # Two new replicas should be started. vals2, pids2 = zip(*[h.remote(block=False).result() for _ in range(10)]) From 78d2b7741627483acaee0735e38848bbad24d50a Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Thu, 12 Dec 2024 09:54:21 -0600 Subject: [PATCH 12/15] fix typo Signed-off-by: Josh Karpel --- python/ray/serve/_private/long_poll.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/serve/_private/long_poll.py b/python/ray/serve/_private/long_poll.py index fd66dfa350fe..7f90007cc73f 100644 --- a/python/ray/serve/_private/long_poll.py +++ b/python/ray/serve/_private/long_poll.py @@ -130,7 +130,7 @@ def _on_callback_completed(self, trigger_at: int): way to serialize the callback invocations between object versions. """ self._callbacks_processed_count += 1 - if not self.only_once and self._callbacks_processed_count == trigger_at: + if self._callbacks_processed_count == trigger_at: self._poll_next() def _poll_next(self): From 9f14f7168e6236c994af6d4fa846428d816ba97f Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Thu, 12 Dec 2024 09:57:34 -0600 Subject: [PATCH 13/15] tidy up Signed-off-by: Josh Karpel --- python/ray/serve/_private/long_poll.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/ray/serve/_private/long_poll.py b/python/ray/serve/_private/long_poll.py index 7f90007cc73f..78b3f8d3f414 100644 --- a/python/ray/serve/_private/long_poll.py +++ b/python/ray/serve/_private/long_poll.py @@ -88,7 +88,6 @@ def __init__( self.host_actor = host_actor self.key_listeners = key_listeners self.event_loop = call_in_event_loop - self.snapshot_ids: Dict[KeyType, int] = { # The initial snapshot id for each key is < 0, # but real snapshot keys in the long poll host are always >= 0, @@ -274,8 +273,8 @@ async def listen_for_change( """Listen for changed objects. This method will return a dictionary of updated objects. It returns - immediately if the snapshot_ids are outdated, otherwise it will block - until there's an update. + immediately if any of the snapshot_ids are outdated, + otherwise it will block until there's an update. 
""" # If there are no keys to listen for, # just wait for a short time to provide backpressure, From d7dd6deb01914a249e04a5bf34e981aa35d141f7 Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Thu, 12 Dec 2024 14:12:42 -0600 Subject: [PATCH 14/15] undo test changes Signed-off-by: Josh Karpel --- python/ray/serve/tests/test_deploy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/tests/test_deploy.py b/python/ray/serve/tests/test_deploy.py index 914389bb5255..2778ab4589f4 100644 --- a/python/ray/serve/tests/test_deploy.py +++ b/python/ray/serve/tests/test_deploy.py @@ -213,7 +213,7 @@ async def __call__(self, **kwargs): ref2.result(timeout_s=1) # Redeploy new version. - h = serve._run(V2.bind(), _blocking=False, name="app") + serve._run(V2.bind(), _blocking=False, name="app") with pytest.raises(TimeoutError): client._wait_for_application_running("app", timeout_s=2) @@ -225,7 +225,7 @@ async def __call__(self, **kwargs): vals2, pids2 = zip(*[h.remote(block=False).result() for _ in range(10)]) # Since there is one replica blocking, only one new # replica should be started up. - assert set(vals2) == {"v1", "v2"} + assert "v1" in vals2 # Signal the original call to exit. ray.get(signal.send.remote()) From 0611daeeadd55e03b019aa08ec3af7d61b7d2a4b Mon Sep 17 00:00:00 2001 From: Josh Karpel Date: Mon, 13 Jan 2025 13:20:10 -0600 Subject: [PATCH 15/15] handle changes around update_running_targets Signed-off-by: Josh Karpel --- python/ray/serve/_private/router.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index 92b29e106d7b..c64144243770 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -735,11 +735,13 @@ def get_or_create( logger.info(f"Started {shared}.") return shared - def update_running_replicas( - self, running_replicas: List[RunningReplicaInfo], deployment_id: DeploymentID + def update_deployment_targets( + self, + deployment_target_info: DeploymentTargetInfo, + deployment_id: DeploymentID, ) -> None: for router in self.routers[deployment_id]: - router.update_running_replicas(running_replicas) + router.update_deployment_targets(deployment_target_info) router.long_poll_client.stop() def update_deployment_config( @@ -763,8 +765,8 @@ def register(self, router: AsyncioRouter) -> None: # Register the new listeners on the long poll client. # Some of these listeners may already exist, but it's safe to add them again. key_listeners = { - (LongPollNamespace.RUNNING_REPLICAS, deployment_id): partial( - self.update_running_replicas, deployment_id=deployment_id + (LongPollNamespace.DEPLOYMENT_TARGETS, deployment_id): partial( + self.update_deployment_targets, deployment_id=deployment_id ) for deployment_id in self.routers.keys() } | {