diff --git a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py index 91873a69d08..61aed94151a 100644 --- a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py @@ -6,7 +6,6 @@ import asyncio import logging from collections.abc import AsyncIterator, Awaitable, Callable -from contextlib import suppress import aio_pika import pytest @@ -141,12 +140,11 @@ async def ensure_parametrized_queue_is_empty( rabbitmq_client = create_rabbitmq_client("pytest-purger") async def _queue_messages_purger() -> None: - with suppress(aio_pika.exceptions.ChannelClosed): - assert rabbitmq_client._channel_pool # noqa: SLF001 - async with rabbitmq_client._channel_pool.acquire() as channel: # noqa: SLF001 - assert isinstance(channel, aio_pika.RobustChannel) - queue = await channel.get_queue(queue_name) - await queue.purge() + assert rabbitmq_client._channel_pool # noqa: SLF001 + async with rabbitmq_client._channel_pool.acquire() as channel: # noqa: SLF001 + assert isinstance(channel, aio_pika.RobustChannel) + queue = await channel.get_queue(queue_name) + await queue.purge() await _queue_messages_purger() yield diff --git a/packages/pytest-simcore/src/pytest_simcore/redis_service.py b/packages/pytest-simcore/src/pytest_simcore/redis_service.py index 7793228d3c4..05aec86a234 100644 --- a/packages/pytest-simcore/src/pytest_simcore/redis_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/redis_service.py @@ -10,7 +10,6 @@ import tenacity from pytest_mock import MockerFixture from redis.asyncio import Redis, from_url -from servicelib.redis import _constants as redis_constants from settings_library.basic_types import PortInt from settings_library.redis import RedisDatabase, RedisSettings from tenacity.before_sleep import before_sleep_log @@ -119,4 +118,6 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None: @pytest.fixture def mock_redis_socket_timeout(mocker: MockerFixture) -> None: # lowered to allow CI to properly shutdown RedisClientSDK instances - mocker.patch.object(redis_constants, "DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1)) + mocker.patch( + "servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=0.25) + ) diff --git a/packages/service-library/src/servicelib/async_utils.py b/packages/service-library/src/servicelib/async_utils.py index 0b1a6b27ca5..c6466df0a70 100644 --- a/packages/service-library/src/servicelib/async_utils.py +++ b/packages/service-library/src/servicelib/async_utils.py @@ -1,15 +1,22 @@ import asyncio +import contextlib +import datetime import logging from collections import deque +from collections.abc import Awaitable, Callable, Coroutine from contextlib import suppress from dataclasses import dataclass from functools import wraps -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Deque +from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar from . 
import tracing from .utils_profiling_middleware import dont_profile, is_profiling, profile_context -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) + +P = ParamSpec("P") +R = TypeVar("R") + if TYPE_CHECKING: Queue = asyncio.Queue @@ -54,7 +61,7 @@ async def _safe_cancel(context: Context) -> None: await context.task except RuntimeError as e: if "Event loop is closed" in f"{e}": - logger.warning("event loop is closed and could not cancel %s", context) + _logger.warning("event loop is closed and could not cancel %s", context) else: raise @@ -65,7 +72,7 @@ async def cancel_sequential_workers() -> None: await _safe_cancel(context) _sequential_jobs_contexts.clear() - logger.info("All run_sequentially_in_context pending workers stopped") + _logger.info("All run_sequentially_in_context pending workers stopped") # NOTE: If you get funny mismatches with mypy in returned values it might be due to this decorator. @@ -118,25 +125,25 @@ def _get_context(args: Any, kwargs: dict) -> Context: arg_names = decorated_function.__code__.co_varnames[ : decorated_function.__code__.co_argcount ] - search_args = dict(zip(arg_names, args)) + search_args = dict(zip(arg_names, args, strict=False)) search_args.update(kwargs) - key_parts: Deque[str] = deque() + key_parts: deque[str] = deque() for arg in target_args: sub_args = arg.split(".") main_arg = sub_args[0] if main_arg not in search_args: - raise ValueError( + msg = ( f"Expected '{main_arg}' in '{decorated_function.__name__}'" f" arguments. Got '{search_args}'" ) + raise ValueError(msg) context_key = search_args[main_arg] for attribute in sub_args[1:]: potential_key = getattr(context_key, attribute) if not potential_key: - raise ValueError( - f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments." - ) + msg = f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments." + raise ValueError(msg) context_key = potential_key key_parts.append(f"{decorated_function.__name__}_{context_key}") @@ -205,3 +212,40 @@ async def worker(in_q: Queue[QueueElement], out_q: Queue) -> None: return wrapper return decorator + + +def delayed_start( + delay: datetime.timedelta, +) -> Callable[ + [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]] +]: + def _decorator( + func: Callable[P, Coroutine[Any, Any, R]], + ) -> Callable[P, Coroutine[Any, Any, R]]: + @wraps(func) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + await asyncio.sleep(delay.total_seconds()) + return await func(*args, **kwargs) + + return _wrapper + + return _decorator + + +async def cancel_wait_task( + task: asyncio.Task, + *, + max_delay: float | None = None, +) -> None: + """Cancel an asyncio.Task and wait for it to finish. + + :param task: task to be canceled + :param max_delay: duration (in seconds) to wait before giving + up the cancellation. If None it waits forever. + :raises TimeoutError: raised if the task cannot be cancelled in time. 
+ """ + + task.cancel() + async with asyncio.timeout(max_delay): + with contextlib.suppress(asyncio.CancelledError): + await task diff --git a/packages/service-library/src/servicelib/background_task.py b/packages/service-library/src/servicelib/background_task.py index 96675d2f74e..793d05b1f9b 100644 --- a/packages/service-library/src/servicelib/background_task.py +++ b/packages/service-library/src/servicelib/background_task.py @@ -1,136 +1,115 @@ import asyncio import contextlib import datetime +import functools import logging -from collections.abc import AsyncIterator, Awaitable, Callable -from typing import Final +from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine +from typing import Any, Final, ParamSpec, TypeVar -from common_library.errors_classes import OsparcErrorMixin -from tenacity import TryAgain -from tenacity.asyncio import AsyncRetrying -from tenacity.stop import stop_after_attempt +from tenacity import TryAgain, before_sleep_log, retry, retry_if_exception_type from tenacity.wait import wait_fixed -from .decorators import async_delayed -from .logging_utils import log_catch, log_context +from .async_utils import cancel_wait_task, delayed_start +from .logging_utils import log_context _logger = logging.getLogger(__name__) _DEFAULT_STOP_TIMEOUT_S: Final[int] = 5 -_MAX_TASK_CANCELLATION_ATTEMPTS: Final[int] = 3 - - -class PeriodicTaskCancellationError(OsparcErrorMixin, Exception): - msg_template: str = "Could not cancel task '{task_name}'" class SleepUsingAsyncioEvent: - """Sleep strategy that waits on an event to be set.""" + """Sleep strategy that waits on an event to be set or sleeps.""" def __init__(self, event: "asyncio.Event") -> None: self.event = event - async def __call__(self, timeout: float | None) -> None: + async def __call__(self, delay: float | None) -> None: with contextlib.suppress(TimeoutError): - await asyncio.wait_for(self.event.wait(), timeout=timeout) + await asyncio.wait_for(self.event.wait(), timeout=delay) self.event.clear() -async def _periodic_scheduled_task( - task: Callable[..., Awaitable[None]], +P = ParamSpec("P") +R = TypeVar("R") + + +def periodic( *, interval: datetime.timedelta, - task_name: str, - early_wake_up_event: asyncio.Event | None, - **task_kwargs, -) -> None: - # NOTE: This retries forever unless cancelled - nap = ( - asyncio.sleep - if early_wake_up_event is None - else SleepUsingAsyncioEvent(early_wake_up_event) - ) - async for attempt in AsyncRetrying( - sleep=nap, - wait=wait_fixed(interval.total_seconds()), - ): - with attempt: - with log_context( - _logger, - logging.DEBUG, - msg=f"iteration {attempt.retry_state.attempt_number} of '{task_name}'", - ), log_catch(_logger): - await task(**task_kwargs) + raise_on_error: bool = False, + early_wake_up_event: asyncio.Event | None = None, +) -> Callable[ + [Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]] +]: + """Calls the function periodically with a given interval. + + Arguments: + interval -- the interval between calls + + Keyword Arguments: + raise_on_error -- If False the function will be retried indefinitely unless cancelled. + If True the function will be retried indefinitely unless cancelled + or an exception is raised. (default: {False}) + early_wake_up_event -- allows to awaken the function before the interval has passed. 
(default: {None}) + + Returns: + coroutine that will be called periodically (runs forever) + """ + def _decorator( + func: Callable[P, Coroutine[Any, Any, None]], + ) -> Callable[P, Coroutine[Any, Any, None]]: + nap = ( + asyncio.sleep + if early_wake_up_event is None + else SleepUsingAsyncioEvent(early_wake_up_event) + ) + + @retry( + sleep=nap, + wait=wait_fixed(interval.total_seconds()), + reraise=True, + retry=( + retry_if_exception_type(TryAgain) + if raise_on_error + else retry_if_exception_type() + ), + before_sleep=before_sleep_log(_logger, logging.DEBUG), + ) + @functools.wraps(func) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None: + await func(*args, **kwargs) raise TryAgain + return _wrapper -def start_periodic_task( + return _decorator + + +def create_periodic_task( task: Callable[..., Awaitable[None]], *, interval: datetime.timedelta, task_name: str, + raise_on_error: bool = False, wait_before_running: datetime.timedelta = datetime.timedelta(0), early_wake_up_event: asyncio.Event | None = None, **kwargs, ) -> asyncio.Task: - with log_context( - _logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'" - ): - delayed_periodic_scheduled_task = async_delayed(wait_before_running)( - _periodic_scheduled_task - ) - return asyncio.create_task( - delayed_periodic_scheduled_task( - task, - interval=interval, - task_name=task_name, - early_wake_up_event=early_wake_up_event, - **kwargs, - ), - name=task_name, - ) - + @delayed_start(wait_before_running) + @periodic( + interval=interval, + raise_on_error=raise_on_error, + early_wake_up_event=early_wake_up_event, + ) + async def _() -> None: + await task(**kwargs) -async def cancel_task( - task: asyncio.Task, - *, - timeout: float | None, - cancellation_attempts: int = _MAX_TASK_CANCELLATION_ATTEMPTS, -) -> None: - """Reliable task cancellation. Some libraries will just hang without - cancelling the task. It is important to retry the operation to provide - a timeout in that situation to avoid forever pending tasks. - - :param task: task to be canceled - :param timeout: total duration (in seconds) to wait before giving - up the cancellation. If None it waits forever. - :raises TryAgain: raised if cannot cancel the task. - """ - async for attempt in AsyncRetrying( - stop=stop_after_attempt(cancellation_attempts), reraise=True - ): - with attempt: - task.cancel() - _, pending = await asyncio.wait((task,), timeout=timeout) - if pending: - task_name = task.get_name() - _logger.info( - "tried to cancel '%s' but timed-out! 
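Taken together, `delayed_start`, `periodic`, and `create_periodic_task` replace the old `async_delayed`/`_periodic_scheduled_task`/`start_periodic_task` trio. A minimal usage sketch; the heartbeat job, interval, and delays are illustrative, not part of this PR:

```python
import asyncio
import datetime

from servicelib.async_utils import cancel_wait_task
from servicelib.background_task import create_periodic_task


async def _heartbeat() -> None:
    print("beat")  # hypothetical periodic job


async def main() -> None:
    task = create_periodic_task(
        _heartbeat,
        interval=datetime.timedelta(seconds=5),
        task_name="heartbeat",
        wait_before_running=datetime.timedelta(seconds=1),  # applied via delayed_start
    )
    await asyncio.sleep(12)  # let a few iterations run
    # replaces the removed stop_periodic_task()/cancel_task() helpers
    await cancel_wait_task(task, max_delay=5)


asyncio.run(main())
```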
%s", task_name, pending - ) - raise PeriodicTaskCancellationError(task_name=task_name) - - -async def stop_periodic_task( - asyncio_task: asyncio.Task, *, timeout: float | None = None -) -> None: with log_context( - _logger, - logging.DEBUG, - msg=f"cancel periodic background task '{asyncio_task.get_name()}'", + _logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'" ): - await cancel_task(asyncio_task, timeout=timeout) + return asyncio.create_task(_(), name=task_name) @contextlib.asynccontextmanager @@ -140,16 +119,21 @@ async def periodic_task( interval: datetime.timedelta, task_name: str, stop_timeout: float = _DEFAULT_STOP_TIMEOUT_S, + raise_on_error: bool = False, **kwargs, ) -> AsyncIterator[asyncio.Task]: asyncio_task: asyncio.Task | None = None try: - asyncio_task = start_periodic_task( - task, interval=interval, task_name=task_name, **kwargs + asyncio_task = create_periodic_task( + task, + interval=interval, + task_name=task_name, + raise_on_error=raise_on_error, + **kwargs, ) yield asyncio_task finally: if asyncio_task is not None: # NOTE: this stopping is shielded to prevent the cancellation to propagate # into the stopping procedure - await asyncio.shield(stop_periodic_task(asyncio_task, timeout=stop_timeout)) + await asyncio.shield(cancel_wait_task(asyncio_task, max_delay=stop_timeout)) diff --git a/packages/service-library/src/servicelib/background_task_utils.py b/packages/service-library/src/servicelib/background_task_utils.py new file mode 100644 index 00000000000..e275bb1306b --- /dev/null +++ b/packages/service-library/src/servicelib/background_task_utils.py @@ -0,0 +1,50 @@ +import datetime +import functools +from collections.abc import Callable, Coroutine +from typing import Any, ParamSpec, TypeVar + +from .background_task import periodic +from .redis import RedisClientSDK, exclusive + +P = ParamSpec("P") +R = TypeVar("R") + + +def exclusive_periodic( + redis_client: RedisClientSDK | Callable[..., RedisClientSDK], + *, + task_interval: datetime.timedelta, + retry_after: datetime.timedelta = datetime.timedelta(seconds=1), +) -> Callable[ + [Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]] +]: + """decorates a function to become exclusive and periodic. + + Arguments: + client -- The Redis client + task_interval -- the task interval, i.e. how often the task should run + retry_after -- in case the exclusive lock cannot be acquired or is lost, this is the retry interval + + Raises: + Nothing + + Returns: + Nothing, a periodic method does not return anything as it runs forever. + """ + + def _decorator( + coro: Callable[P, Coroutine[Any, Any, None]], + ) -> Callable[P, Coroutine[Any, Any, None]]: + @periodic(interval=retry_after) + @exclusive( + redis_client, + lock_key=f"lock:exclusive_periodic_task:{coro.__module__}.{coro.__name__}", + ) + @periodic(interval=task_interval) + @functools.wraps(coro) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None: + return await coro(*args, **kwargs) + + return _wrapper + + return _decorator diff --git a/packages/service-library/src/servicelib/decorators.py b/packages/service-library/src/servicelib/decorators.py index 825171145ff..a0d3cae59ff 100644 --- a/packages/service-library/src/servicelib/decorators.py +++ b/packages/service-library/src/servicelib/decorators.py @@ -1,16 +1,13 @@ -""" General purpose decorators +"""General purpose decorators IMPORTANT: lowest level module I order to avoid cyclic dependences, please DO NOT IMPORT ANYTHING from . 
""" -import asyncio -import datetime + import logging -from collections.abc import Callable, Coroutine from copy import deepcopy from functools import wraps -from typing import Any _logger = logging.getLogger(__name__) @@ -36,17 +33,3 @@ def safe_func(*args, **kwargs): return safe_func return decorate - - -def async_delayed( - interval: datetime.timedelta, -) -> Callable[..., Callable[..., Coroutine]]: - def decorator(func) -> Callable[..., Coroutine]: - @wraps(func) - async def wrapper(*args, **kwargs) -> Any: - await asyncio.sleep(interval.total_seconds()) - return await func(*args, **kwargs) - - return wrapper - - return decorator diff --git a/packages/service-library/src/servicelib/redis/__init__.py b/packages/service-library/src/servicelib/redis/__init__.py index fe2455409d5..8d78d47ece5 100644 --- a/packages/service-library/src/servicelib/redis/__init__.py +++ b/packages/service-library/src/servicelib/redis/__init__.py @@ -1,7 +1,6 @@ from ._client import RedisClientSDK from ._clients_manager import RedisClientsManager from ._decorators import exclusive -from ._distributed_locks_utils import start_exclusive_periodic_task from ._errors import ( CouldNotAcquireLockError, CouldNotConnectToRedisError, @@ -19,7 +18,6 @@ "RedisClientSDK", "RedisClientsManager", "RedisManagerDBConfig", - "start_exclusive_periodic_task", ) # nopycln: file diff --git a/packages/service-library/src/servicelib/redis/_client.py b/packages/service-library/src/servicelib/redis/_client.py index 6e87d122cce..38a1a5f6f31 100644 --- a/packages/service-library/src/servicelib/redis/_client.py +++ b/packages/service-library/src/servicelib/redis/_client.py @@ -1,32 +1,25 @@ import asyncio -import contextlib import datetime import logging from asyncio import Task -from collections.abc import AsyncIterator from dataclasses import dataclass, field from uuid import uuid4 import redis.asyncio as aioredis import redis.exceptions -from pydantic import NonNegativeFloat from redis.asyncio.lock import Lock from redis.asyncio.retry import Retry from redis.backoff import ExponentialBackoff -from tenacity import retry -from yarl import URL -from ..background_task import periodic_task +from ..async_utils import cancel_wait_task +from ..background_task import periodic from ..logging_utils import log_catch -from ..retry_policies import RedisRetryPolicyUponInitialization from ._constants import ( DEFAULT_DECODE_RESPONSES, DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_LOCK_TTL, DEFAULT_SOCKET_TIMEOUT, ) -from ._errors import CouldNotAcquireLockError, CouldNotConnectToRedisError -from ._utils import auto_extend_lock, cancel_or_warn _logger = logging.getLogger(__name__) @@ -40,8 +33,8 @@ class RedisClientSDK: _client: aioredis.Redis = field(init=False) _health_check_task: Task | None = None + _health_check_task_started_event: asyncio.Event | None = None _is_healthy: bool = False - _continue_health_checking: bool = True @property def redis(self) -> aioredis.Redis: @@ -55,27 +48,27 @@ def __post_init__(self) -> None: retry_on_error=[ redis.exceptions.BusyLoadingError, redis.exceptions.ConnectionError, - redis.exceptions.TimeoutError, ], + retry_on_timeout=True, socket_timeout=DEFAULT_SOCKET_TIMEOUT.total_seconds(), - socket_connect_timeout=DEFAULT_SOCKET_TIMEOUT.total_seconds(), encoding="utf-8", decode_responses=self.decode_responses, client_name=self.client_name, ) + # NOTE: connection is done here already + self._is_healthy = False + self._health_check_task_started_event = asyncio.Event() - 
@retry(**RedisRetryPolicyUponInitialization(_logger).kwargs) - async def setup(self) -> None: - if not await self.ping(): - await self.shutdown() - url_safe: URL = URL(self.redis_dsn).with_password("???") - raise CouldNotConnectToRedisError(dsn=f"{url_safe}") + @periodic(interval=self.health_check_interval) + async def _periodic_check_health() -> None: + assert self._health_check_task_started_event # nosec + self._health_check_task_started_event.set() + self._is_healthy = await self.ping() self._health_check_task = asyncio.create_task( - self._check_health(), + _periodic_check_health(), name=f"redis_service_health_check_{self.redis_dsn}__{uuid4()}", ) - self._is_healthy = True _logger.info( "Connection to %s succeeded with %s", @@ -85,9 +78,9 @@ async def setup(self) -> None: async def shutdown(self) -> None: if self._health_check_task: - self._continue_health_checking = False - await cancel_or_warn(self._health_check_task) - self._health_check_task = None + assert self._health_check_task_started_event # nosec + await self._health_check_task_started_event.wait() # NOTE: wait for the health check task to have started once before we can cancel it + await cancel_wait_task(self._health_check_task) await self._client.aclose(close_connection_pool=True) @@ -97,14 +90,6 @@ async def ping(self) -> bool: return True return False - async def _check_health(self) -> None: - sleep_s = self.health_check_interval.total_seconds() - - while self._continue_health_checking: - with log_catch(_logger, reraise=False): - self._is_healthy = await self.ping() - await asyncio.sleep(sleep_s) - @property def is_healthy(self) -> bool: """Returns the result of the last health check. @@ -117,77 +102,15 @@ def is_healthy(self) -> bool: """ return self._is_healthy - @contextlib.asynccontextmanager - async def lock_context( - self, - lock_key: str, - lock_value: bytes | str | None = None, - *, - blocking: bool = False, - blocking_timeout_s: NonNegativeFloat = 5, - ) -> AsyncIterator[Lock]: - """Tries to acquire a lock. - - :param lock_key: unique name of the lock - :param lock_value: content of the lock, defaults to None - :param blocking: should block here while acquiring the lock, defaults to False - :param blocking_timeout_s: time to wait while acquire a lock before giving up, defaults to 5 - - :raises CouldNotAcquireLockError: reasons why lock acquisition fails: - 1. `blocking==False` the lock was already acquired by some other entity - 2. `blocking==True` timeouts out while waiting for lock to be free (another entity holds the lock) - """ - - total_lock_duration: datetime.timedelta = DEFAULT_LOCK_TTL - lock_unique_id = f"lock_extender_{lock_key}_{uuid4()}" - - ttl_lock: Lock = self._client.lock( - name=lock_key, - timeout=total_lock_duration.total_seconds(), - blocking=blocking, - blocking_timeout=blocking_timeout_s, + def create_lock( + self, lock_name: str, *, ttl: datetime.timedelta | None = DEFAULT_LOCK_TTL + ) -> Lock: + return self._client.lock( + name=lock_name, + timeout=ttl.total_seconds() if ttl is not None else None, + blocking=False, ) - if not await ttl_lock.acquire(token=lock_value): - raise CouldNotAcquireLockError(lock=ttl_lock) - - try: - async with periodic_task( - auto_extend_lock, - interval=total_lock_duration / 2, - task_name=lock_unique_id, - lock=ttl_lock, - stop_timeout=0.1, - ): - # lock is in use now - yield ttl_lock - finally: - # NOTE Why is this error suppressed? 
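With `setup()` removed, the client connects in `__post_init__`, and the started-event guard guarantees that `shutdown()` never tries to cancel a health-check task that has not yet run once. A minimal lifecycle sketch, assuming a running event loop; the DSN is illustrative:

```python
from servicelib.redis import RedisClientSDK


async def lifecycle_demo() -> None:
    # no explicit setup() anymore: __post_init__ builds the connection
    # and spawns the periodic health-check task
    client = RedisClientSDK("redis://localhost:6379", client_name="demo")
    try:
        assert await client.ping()
        _ = client.is_healthy  # reflects the *last* periodic health check
    finally:
        # waits until the health-check task has started at least once,
        # then cancels it and closes the connection pool
        await client.shutdown()
```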
Given the following situation: - # - 250 locks are acquired in parallel with the option `blocking=True`, - # meaning: it will wait for the lock to be free before acquiring it - # - when the lock is acquired the `_extend_lock` task is started - # in the background, extending the lock at a fixed interval of time, - # which is half of the duration of the lock's TTL. - # - before the task is released the lock extension task is cancelled - # Here is where the issue occurs: - # - some time passes between the task's cancellation and - # the call to release the lock - # - if the TTL is too small, 1/2 of the TTL might be just shorter than - # the time it passes to between the task is canceled and the task lock is released - # - this means that the lock will expire and be considered as not owned any longer - # For example: in one of the failing tests the TTL is set to `0.25` seconds, - # and half of that is `0.125` seconds. - - # Above implies that only one "task" `owns` and `extends` the lock at a time. - # The issue appears to be related some timings (being too low). - try: - await ttl_lock.release() - except redis.exceptions.LockNotOwnedError: - # if this appears outside tests it can cause issues since something might be happening - _logger.warning( - "Attention: lock is no longer owned. This is unexpected and requires investigation" - ) - async def lock_value(self, lock_name: str) -> str | None: output: str | None = await self._client.get(lock_name) return output diff --git a/packages/service-library/src/servicelib/redis/_clients_manager.py b/packages/service-library/src/servicelib/redis/_clients_manager.py index 01d34781cf2..60b93360b88 100644 --- a/packages/service-library/src/servicelib/redis/_clients_manager.py +++ b/packages/service-library/src/servicelib/redis/_clients_manager.py @@ -1,3 +1,4 @@ +import asyncio from dataclasses import dataclass, field from settings_library.redis import RedisDatabase, RedisSettings @@ -27,14 +28,11 @@ async def setup(self) -> None: client_name=f"{self.client_name}", ) - for client in self._client_sdks.values(): - await client.setup() - async def shutdown(self) -> None: - # NOTE: somehow using logged_gather is not an option - # doing so will make the shutdown procedure hang - for client in self._client_sdks.values(): - await client.shutdown() + await asyncio.gather( + *[client.shutdown() for client in self._client_sdks.values()], + return_exceptions=True, + ) def client(self, database: RedisDatabase) -> RedisClientSDK: return self._client_sdks[database] diff --git a/packages/service-library/src/servicelib/redis/_decorators.py b/packages/service-library/src/servicelib/redis/_decorators.py index 53c952b3991..6d686a33af5 100644 --- a/packages/service-library/src/servicelib/redis/_decorators.py +++ b/packages/service-library/src/servicelib/redis/_decorators.py @@ -1,56 +1,144 @@ +import asyncio +import contextlib import functools import logging -from collections.abc import Awaitable, Callable -from typing import ParamSpec, TypeVar +import socket +from collections.abc import Callable, Coroutine +from datetime import timedelta +from typing import Any, Final, ParamSpec, TypeVar +import arrow +import redis.exceptions +from redis.asyncio.lock import Lock + +from ..background_task import periodic from ._client import RedisClientSDK +from ._constants import DEFAULT_LOCK_TTL +from ._errors import CouldNotAcquireLockError, LockLostError +from ._utils import auto_extend_lock _logger = logging.getLogger(__file__) P = ParamSpec("P") R = TypeVar("R") +_EXCLUSIVE_TASK_NAME: 
Final[str] = "exclusive/{module_name}.{func_name}" +_EXCLUSIVE_AUTO_EXTEND_TASK_NAME: Final[ + str +] = "exclusive/autoextend_lock_{redis_lock_key}" + + +@periodic(interval=DEFAULT_LOCK_TTL / 2, raise_on_error=True) +async def _periodic_auto_extender(lock: Lock, started_event: asyncio.Event) -> None: + await auto_extend_lock(lock) + started_event.set() + def exclusive( - redis: RedisClientSDK | Callable[..., RedisClientSDK], + redis_client: RedisClientSDK | Callable[..., RedisClientSDK], *, lock_key: str | Callable[..., str], lock_value: bytes | str | None = None, -) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: + blocking: bool = False, + blocking_timeout: timedelta | None = None, +) -> Callable[ + [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]] +]: """ Define a method to run exclusively across processes by leveraging a Redis Lock. - parameters: - redis: the redis client SDK - lock_key: a string as the name of the lock (good practice: app_name:lock_name) - lock_value: some additional data that can be retrieved by another client + Arguments: + redis -- the redis client + lock_key -- a string as the name of the lock (good practice: app_name:lock_name) + lock_value -- some additional data that can be retrieved by another client if None, + it will be automatically filled with the current time and the client name Raises: - ValueError if used incorrectly - CouldNotAcquireLockError if the lock could not be acquired + - LockLostError if the lock was lost (e.g. due to Redis restart, or TTL was not extended in time) """ if not lock_key: msg = "lock_key cannot be empty string!" raise ValueError(msg) - def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]: - @functools.wraps(func) - async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + def _decorator( + coro: Callable[P, Coroutine[Any, Any, R]], + ) -> Callable[P, Coroutine[Any, Any, R]]: + @functools.wraps(coro) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: redis_lock_key = ( lock_key(*args, **kwargs) if callable(lock_key) else lock_key ) assert isinstance(redis_lock_key, str) # nosec - redis_client = redis(*args, **kwargs) if callable(redis) else redis - assert isinstance(redis_client, RedisClientSDK) # nosec + client = ( + redis_client(*args, **kwargs) + if callable(redis_client) + else redis_client + ) + assert isinstance(client, RedisClientSDK) # nosec + nonlocal lock_value + if lock_value is None: + lock_value = f"locked since {arrow.utcnow().format()} by {client.client_name} on {socket.gethostname()}" - async with redis_client.lock_context( - lock_key=redis_lock_key, lock_value=lock_value + lock = client.create_lock(redis_lock_key, ttl=DEFAULT_LOCK_TTL) + if not await lock.acquire( + token=lock_value, + blocking=blocking, + blocking_timeout=( + blocking_timeout.total_seconds() if blocking_timeout else None + ), ): - return await func(*args, **kwargs) + raise CouldNotAcquireLockError(lock=lock) + + try: + async with asyncio.TaskGroup() as tg: + started_event = asyncio.Event() + # first create a task that will auto-extend the lock + auto_extend_lock_task = tg.create_task( + _periodic_auto_extender(lock, started_event), + name=_EXCLUSIVE_AUTO_EXTEND_TASK_NAME.format( + redis_lock_key=redis_lock_key + ), + ) + # NOTE: In case the work thread is raising right away, + # this ensures the extend task ran once and ensure cancellation works + await started_event.wait() + + # then the task that runs the user code + assert asyncio.iscoroutinefunction(coro) # 
nosec + work_task = tg.create_task( + coro(*args, **kwargs), + name=_EXCLUSIVE_TASK_NAME.format( + module_name=coro.__module__, func_name=coro.__name__ + ), + ) + + res = await work_task + auto_extend_lock_task.cancel() + return res + + except BaseExceptionGroup as eg: + # Separate exceptions into LockLostError and others + lock_lost_errors, other_errors = eg.split(LockLostError) + + # If there are any other errors, re-raise them + if other_errors: + assert len(other_errors.exceptions) == 1 # nosec + raise other_errors.exceptions[0] from eg + + assert lock_lost_errors is not None # nosec + assert len(lock_lost_errors.exceptions) == 1 # nosec + raise lock_lost_errors.exceptions[0] from eg + finally: + with contextlib.suppress(redis.exceptions.LockNotOwnedError): + # in the case where the lock would have been lost, + # this would raise again and is not necessary + await lock.release() - return wrapper + return _wrapper - return decorator + return _decorator diff --git a/packages/service-library/src/servicelib/redis/_distributed_locks_utils.py b/packages/service-library/src/servicelib/redis/_distributed_locks_utils.py deleted file mode 100644 index 2560e88c41f..00000000000 --- a/packages/service-library/src/servicelib/redis/_distributed_locks_utils.py +++ /dev/null @@ -1,79 +0,0 @@ -import asyncio -import datetime -import logging -from collections.abc import Awaitable, Callable - -import arrow -from servicelib.background_task import start_periodic_task - -from ._client import RedisClientSDK -from ._decorators import exclusive -from ._errors import CouldNotAcquireLockError - -_logger = logging.getLogger(__name__) - - -async def _exclusive_task_starter( - client: RedisClientSDK, - usr_tsk_task: Callable[..., Awaitable[None]], - *, - usr_tsk_interval: datetime.timedelta, - usr_tsk_task_name: str, - **kwargs, -) -> None: - lock_key = f"lock:exclusive_task_starter:{usr_tsk_task_name}" - lock_value = f"locked since {arrow.utcnow().format()}" - - try: - await exclusive(client, lock_key=lock_key, lock_value=lock_value)( - start_periodic_task - )( - usr_tsk_task, - interval=usr_tsk_interval, - task_name=usr_tsk_task_name, - **kwargs, - ) - except CouldNotAcquireLockError: - _logger.debug( - "Could not acquire lock '%s' with value '%s'", lock_key, lock_value - ) - except Exception as e: - _logger.exception(e) # noqa: TRY401 - raise - - -def start_exclusive_periodic_task( - client: RedisClientSDK, - task: Callable[..., Awaitable[None]], - *, - task_period: datetime.timedelta, - retry_after: datetime.timedelta = datetime.timedelta(seconds=1), - task_name: str, - **kwargs, -) -> asyncio.Task: - """ - Ensures that only 1 process periodically ever runs ``task`` at all times. - If one process dies, another process will run the ``task``. - - Creates a background task that periodically tries to start the user ``task``. - Before the ``task`` is scheduled for periodic background execution, it acquires a lock. - Subsequent calls to ``start_exclusive_periodic_task`` will not allow the same ``task`` - to start since the lock will prevent the scheduling. - - Q&A: - - Why is `_exclusive_task_starter` run as a task? - This is usually used at setup time and cannot block the setup process forever - - Why is `_exclusive_task_starter` task a periodic task? 
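A usage sketch of the reworked decorator; the lock key and wrapped function are illustrative. Besides `CouldNotAcquireLockError` at entry, callers must now also be prepared for `LockLostError` surfacing from the auto-extender:

```python
from servicelib.redis import CouldNotAcquireLockError, RedisClientSDK, exclusive


async def run_critical_section(client: RedisClientSDK) -> int | None:
    @exclusive(client, lock_key="app_name:critical_section")
    async def _critical_section() -> int:
        # the lock is auto-extended every DEFAULT_LOCK_TTL / 2 while this runs;
        # if extension fails, LockLostError propagates out of the call
        return 42

    try:
        return await _critical_section()
    except CouldNotAcquireLockError:
        return None  # another process holds the lock (blocking=False by default)
```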
- If Redis connectivity is lost, the periodic `_exclusive_task_starter` ensures the lock is - reacquired - """ - return start_periodic_task( - _exclusive_task_starter, - interval=retry_after, - task_name=f"exclusive_task_starter_{task_name}", - client=client, - usr_tsk_task=task, - usr_tsk_interval=task_period, - usr_tsk_task_name=task_name, - **kwargs, - ) diff --git a/packages/service-library/src/servicelib/redis/_errors.py b/packages/service-library/src/servicelib/redis/_errors.py index 1a62f8f2e5a..998a9c1cb51 100644 --- a/packages/service-library/src/servicelib/redis/_errors.py +++ b/packages/service-library/src/servicelib/redis/_errors.py @@ -14,4 +14,8 @@ class CouldNotConnectToRedisError(BaseRedisError): class LockLostError(BaseRedisError): - msg_template: str = "Lock {lock.name} has been lost" + msg_template: str = ( + "Lock {lock.name} has been lost (e.g. it could not be auto-extended!) " + "TIP: check the connection to the Redis DBs or look for synchronous " + "code that might block the auto-extender task. Somehow the distributed lock disappeared!" + ) diff --git a/packages/service-library/src/servicelib/redis/_utils.py b/packages/service-library/src/servicelib/redis/_utils.py index 76fe12cb10e..52d112ca4fe 100644 --- a/packages/service-library/src/servicelib/redis/_utils.py +++ b/packages/service-library/src/servicelib/redis/_utils.py @@ -1,4 +1,3 @@ -import asyncio import logging from collections.abc import Awaitable from typing import Any @@ -7,22 +6,21 @@ from redis.asyncio.lock import Lock from ..logging_utils import log_context -from ._constants import SHUTDOWN_TIMEOUT_S from ._errors import LockLostError _logger = logging.getLogger(__name__) -async def cancel_or_warn(task: asyncio.Task) -> None: - if not task.cancelled(): - task.cancel() - _, pending = await asyncio.wait((task,), timeout=SHUTDOWN_TIMEOUT_S) - if pending: - task_name = task.get_name() - _logger.warning("Could not cancel task_name=%s pending=%s", task_name, pending) +async def auto_extend_lock(lock: Lock) -> None: + """Automatically extends a distributed lock's TTL (time to live) by re-acquiring the lock + Arguments: + lock -- the lock to auto-extend -async def auto_extend_lock(lock: Lock) -> None: + Raises: + LockLostError: in case the lock is not available anymore + LockError: in case of wrong usage (no timeout or lock was not previously acquired) + """ try: with log_context(_logger, logging.DEBUG, f"Autoextend lock {lock.name!r}"): await lock.reacquire() diff --git a/packages/service-library/tests/conftest.py b/packages/service-library/tests/conftest.py index b6af04f6b78..979a3731071 100644 --- a/packages/service-library/tests/conftest.py +++ b/packages/service-library/tests/conftest.py @@ -87,7 +87,6 @@ async def _( assert client assert client.redis_dsn == redis_resources_dns assert client.client_name == "pytest" - await client.setup() yield client diff --git a/packages/service-library/tests/deferred_tasks/example_app.py b/packages/service-library/tests/deferred_tasks/example_app.py index 991aa2efe8e..9adb654e896 100644 --- a/packages/service-library/tests/deferred_tasks/example_app.py +++ b/packages/service-library/tests/deferred_tasks/example_app.py @@ -95,7 +95,6 @@ def __init__( ) async def setup(self) -> None: - await self._redis_client.setup() await self._manager.setup() diff --git a/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py b/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py index 3aa5b53e7f5..cc19133b6b2 100644 --- 
a/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py +++ b/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py @@ -57,7 +57,6 @@ async def redis_client_sdk( decode_responses=False, client_name="pytest", ) - await sdk.setup() yield sdk await sdk.shutdown() @@ -430,7 +429,7 @@ async def test_deferred_manager_code_times_out( get_mocked_deferred_handler: Callable[ [int, timedelta, Callable[[DeferredContext], Awaitable[Any]]], tuple[dict[MockKeys, Mock], type[BaseDeferredHandler]], - ] + ], ): async def _run_that_times_out(_: DeferredContext) -> None: await asyncio.sleep(1e6) diff --git a/packages/service-library/tests/deferred_tasks/test_deferred_tasks.py b/packages/service-library/tests/deferred_tasks/test_deferred_tasks.py index 8f09e436885..daccac23c61 100644 --- a/packages/service-library/tests/deferred_tasks/test_deferred_tasks.py +++ b/packages/service-library/tests/deferred_tasks/test_deferred_tasks.py @@ -22,7 +22,6 @@ from pytest_mock import MockerFixture from servicelib.rabbitmq import RabbitMQClient from servicelib.redis import RedisClientSDK -from servicelib.redis import _constants as redis_client_constants from servicelib.sequences_utils import partition_gen from settings_library.rabbit import RabbitSettings from settings_library.redis import RedisSettings @@ -389,9 +388,8 @@ async def pause_redis(self) -> AsyncIterator[None]: @pytest.fixture def mock_default_socket_timeout(mocker: MockerFixture) -> None: - mocker.patch.object( - redis_client_constants, - "DEFAULT_SOCKET_TIMEOUT", + mocker.patch( + "servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT", datetime.timedelta(seconds=0.25), ) diff --git a/packages/service-library/tests/redis/conftest.py b/packages/service-library/tests/redis/conftest.py new file mode 100644 index 00000000000..ae6d04c2085 --- /dev/null +++ b/packages/service-library/tests/redis/conftest.py @@ -0,0 +1,32 @@ +import datetime +from collections.abc import AsyncIterator, Callable +from contextlib import AbstractAsyncContextManager + +import pytest +from faker import Faker +from pytest_mock import MockerFixture +from servicelib.redis import _constants as redis_constants +from servicelib.redis._client import RedisClientSDK +from settings_library.redis import RedisDatabase + + +@pytest.fixture +async def redis_client_sdk( + get_redis_client_sdk: Callable[ + [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] + ], +) -> AsyncIterator[RedisClientSDK]: + async with get_redis_client_sdk(RedisDatabase.RESOURCES) as client: + yield client + + +@pytest.fixture +def lock_name(faker: Faker) -> str: + return faker.pystr() + + +@pytest.fixture +def with_short_default_redis_lock_ttl(mocker: MockerFixture) -> datetime.timedelta: + short_ttl = datetime.timedelta(seconds=0.25) + mocker.patch.object(redis_constants, "DEFAULT_LOCK_TTL", short_ttl) + return short_ttl diff --git a/packages/service-library/tests/redis/test_client.py b/packages/service-library/tests/redis/test_client.py new file mode 100644 index 00000000000..210c857bb9b --- /dev/null +++ b/packages/service-library/tests/redis/test_client.py @@ -0,0 +1,143 @@ +# pylint:disable=unused-variable +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name +# pylint:disable=protected-access + + +import asyncio +import datetime +from collections.abc import Callable +from contextlib import AbstractAsyncContextManager + +import pytest +from redis.exceptions import LockError, LockNotOwnedError +from servicelib.redis import RedisClientSDK +from 
settings_library.redis import RedisDatabase, RedisSettings +from tenacity import ( + AsyncRetrying, + retry_if_exception_type, + stop_after_delay, + wait_fixed, +) + +pytest_simcore_core_services_selection = [ + "redis", +] + +pytest_simcore_ops_services_selection = [ + "redis-commander", +] + + +@pytest.fixture +def redis_lock_ttl() -> datetime.timedelta: + return datetime.timedelta(seconds=1) + + +async def test_redis_lock_no_ttl(redis_client_sdk: RedisClientSDK, lock_name: str): + lock = redis_client_sdk.create_lock(lock_name, ttl=None) + assert await lock.locked() is False + + lock_acquired = await lock.acquire(blocking=False) + assert lock_acquired is True + assert await lock.locked() is True + assert await lock.owned() is True + with pytest.raises(LockError): + # a lock with no ttl cannot be reacquired + await lock.reacquire() + with pytest.raises(LockError): + # a lock with no ttl cannot be extended + await lock.extend(2) + + # try to acquire the lock a second time + same_lock = redis_client_sdk.create_lock(lock_name, ttl=None) + assert await same_lock.locked() is True + assert await same_lock.owned() is False + assert await same_lock.acquire(blocking=False) is False + + # now release the lock + await lock.release() + assert not await lock.locked() + assert not await lock.owned() + + +async def test_redis_lock_context_manager_no_ttl( + redis_client_sdk: RedisClientSDK, lock_name: str +): + lock = redis_client_sdk.create_lock(lock_name, ttl=None) + assert not await lock.locked() + + async with lock: + assert await lock.locked() + assert await lock.owned() + with pytest.raises(LockError): + # a lock with no timeout cannot be reacquired + await lock.reacquire() + + with pytest.raises(LockError): + # a lock with no timeout cannot be extended + await lock.extend(2) + + # try to acquire the lock a second time + same_lock = redis_client_sdk.create_lock(lock_name, ttl=None) + assert await same_lock.locked() + assert not await same_lock.owned() + assert await same_lock.acquire() is False + with pytest.raises(LockError): + async with same_lock: + ... 
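The TTL case, for contrast with the no-TTL tests: a lock created with a TTL can be reacquired to reset it, which is exactly what the auto-extender relies on. A sketch, assuming a running event loop; the lock name and TTL are illustrative:

```python
import datetime

from servicelib.redis import RedisClientSDK


async def ttl_demo(client: RedisClientSDK) -> None:
    lock = client.create_lock("demo-lock", ttl=datetime.timedelta(seconds=10))
    if await lock.acquire(blocking=False):
        try:
            # allowed because the lock has a timeout; resets the TTL,
            # as auto_extend_lock() does every DEFAULT_LOCK_TTL / 2
            await lock.reacquire()
        finally:
            await lock.release()
```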
+ assert not await lock.locked() + + +async def test_redis_lock_with_ttl( + redis_client_sdk: RedisClientSDK, lock_name: str, redis_lock_ttl: datetime.timedelta +): + ttl_lock = redis_client_sdk.create_lock(lock_name, ttl=redis_lock_ttl) + assert not await ttl_lock.locked() + + with pytest.raises(LockNotOwnedError): # noqa: PT012 + # this raises as the lock is lost + async with ttl_lock: + assert await ttl_lock.locked() + assert await ttl_lock.owned() + await asyncio.sleep(2 * redis_lock_ttl.total_seconds()) + assert not await ttl_lock.locked() + + +async def test_redis_client_sdk_setup_shutdown( + mock_redis_socket_timeout: None, redis_service: RedisSettings +): + # setup + redis_resources_dns = redis_service.build_redis_dsn(RedisDatabase.RESOURCES) + client = RedisClientSDK(redis_resources_dns, client_name="pytest") + assert client + assert client.redis_dsn == redis_resources_dns + + # ensure health check task sets the health to True + client._is_healthy = False # noqa: SLF001 + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), + stop=stop_after_delay(10), + reraise=True, + retry=retry_if_exception_type(AssertionError), + ): + with attempt: + assert client.is_healthy is True + + # cleanup + await client.redis.flushall() + await client.shutdown() + + +async def test_regression_fails_on_redis_service_outage( + mock_redis_socket_timeout: None, + paused_container: Callable[[str], AbstractAsyncContextManager[None]], + redis_client_sdk: RedisClientSDK, +): + assert await redis_client_sdk.ping() is True + + async with paused_container("redis"): + # no connection available any longer should not hang but timeout + assert await redis_client_sdk.ping() is False + + assert await redis_client_sdk.ping() is True diff --git a/packages/service-library/tests/redis/test_clients_manager.py b/packages/service-library/tests/redis/test_clients_manager.py new file mode 100644 index 00000000000..eeb110557e3 --- /dev/null +++ b/packages/service-library/tests/redis/test_clients_manager.py @@ -0,0 +1,33 @@ +# pylint:disable=unused-variable +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name +# pylint:disable=protected-access + +from servicelib.redis._clients_manager import RedisClientsManager +from servicelib.redis._models import RedisManagerDBConfig +from settings_library.redis import RedisDatabase, RedisSettings + +pytest_simcore_core_services_selection = [ + "redis", +] +pytest_simcore_ops_services_selection = [ + "redis-commander", +] + + +async def test_redis_client_sdks_manager( + mock_redis_socket_timeout: None, + redis_service: RedisSettings, +): + all_redis_configs: set[RedisManagerDBConfig] = { + RedisManagerDBConfig(database=db) for db in RedisDatabase + } + manager = RedisClientsManager( + databases_configs=all_redis_configs, + settings=redis_service, + client_name="pytest", + ) + + async with manager: + for config in all_redis_configs: + assert manager.client(config.database) diff --git a/packages/service-library/tests/redis/test_decorators.py b/packages/service-library/tests/redis/test_decorators.py new file mode 100644 index 00000000000..643cfef99d8 --- /dev/null +++ b/packages/service-library/tests/redis/test_decorators.py @@ -0,0 +1,312 @@ +# pylint:disable=unused-variable +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name +# pylint:disable=protected-access + + +import asyncio +import datetime +from collections.abc import Awaitable, Callable +from typing import Final + +import pytest +from faker import Faker +from servicelib.redis import 
CouldNotAcquireLockError, RedisClientSDK, exclusive +from servicelib.redis._decorators import ( + _EXCLUSIVE_AUTO_EXTEND_TASK_NAME, + _EXCLUSIVE_TASK_NAME, +) +from servicelib.redis._errors import LockLostError +from servicelib.utils import limited_gather, logged_gather + +pytest_simcore_core_services_selection = [ + "redis", +] +pytest_simcore_ops_services_selection = [ + "redis-commander", +] + + +def _assert_exclusive_tasks_are_cancelled(lock_name: str, func: Callable) -> None: + assert _EXCLUSIVE_AUTO_EXTEND_TASK_NAME.format(redis_lock_key=lock_name) not in [ + t.get_name() for t in asyncio.tasks.all_tasks() + ], "the auto extend lock task was not properly stopped!" + assert _EXCLUSIVE_TASK_NAME.format( + module_name=func.__module__, func_name=func.__name__ + ) not in [ + t.get_name() for t in asyncio.tasks.all_tasks() + ], "the exclusive task was not properly stopped!" + + +async def _is_locked(redis_client_sdk: RedisClientSDK, lock_name: str) -> bool: + lock = redis_client_sdk.redis.lock(lock_name) + return await lock.locked() + + +def _exclusive_sleeping_task( + redis_client_sdk: RedisClientSDK | Callable[..., RedisClientSDK], + lock_name: str | Callable[..., str], + sleep_duration: float, +) -> Callable[..., Awaitable[float]]: + @exclusive(redis_client_sdk, lock_key=lock_name) + async def _() -> float: + resolved_client = ( + redis_client_sdk() if callable(redis_client_sdk) else redis_client_sdk + ) + resolved_lock_name = lock_name() if callable(lock_name) else lock_name + assert await _is_locked(resolved_client, resolved_lock_name) + await asyncio.sleep(sleep_duration) + assert await _is_locked(resolved_client, resolved_lock_name) + return sleep_duration + + return _ + + +@pytest.fixture +def sleep_duration(faker: Faker) -> float: + return faker.pyfloat(min_value=0.2, max_value=0.8) + + +async def test_exclusive_with_empty_lock_key_raises(redis_client_sdk: RedisClientSDK): + with pytest.raises(ValueError, match="lock_key cannot be empty"): + + @exclusive(redis_client_sdk, lock_key="") + async def _(): + pass + + +async def test_exclusive_decorator( + redis_client_sdk: RedisClientSDK, + lock_name: str, + sleep_duration: float, +): + for _ in range(3): + assert ( + await _exclusive_sleeping_task( + redis_client_sdk, lock_name, sleep_duration + )() + == sleep_duration + ) + + +async def test_exclusive_decorator_with_key_builder( + redis_client_sdk: RedisClientSDK, + lock_name: str, + sleep_duration: float, +): + def _get_lock_name(*args, **kwargs) -> str: + assert args is not None + assert kwargs is not None + return lock_name + + for _ in range(3): + assert ( + await _exclusive_sleeping_task( + redis_client_sdk, _get_lock_name, sleep_duration + )() + == sleep_duration + ) + + +async def test_exclusive_decorator_with_client_builder( + redis_client_sdk: RedisClientSDK, + lock_name: str, + sleep_duration: float, +): + def _get_redis_client_builder(*args, **kwargs) -> RedisClientSDK: + assert args is not None + assert kwargs is not None + return redis_client_sdk + + for _ in range(3): + assert ( + await _exclusive_sleeping_task( + _get_redis_client_builder, lock_name, sleep_duration + )() + == sleep_duration + ) + + +async def _acquire_lock_and_exclusively_sleep( + redis_client_sdk: RedisClientSDK, + lock_name: str | Callable[..., str], + sleep_duration: float, +) -> None: + redis_lock_name = lock_name() if callable(lock_name) else lock_name + + @exclusive(redis_client_sdk, lock_key=lock_name) + async def _() -> float: + assert await _is_locked(redis_client_sdk, redis_lock_name) + await 
asyncio.sleep(sleep_duration) + assert await _is_locked(redis_client_sdk, redis_lock_name) + return sleep_duration + + assert await _() == sleep_duration + + assert not await _is_locked(redis_client_sdk, redis_lock_name) + + +async def test_exclusive_parallel_lock_is_released_and_reacquired( + redis_client_sdk: RedisClientSDK, + lock_name: str, +): + parallel_tasks = 10 + results = await logged_gather( + *[ + _acquire_lock_and_exclusively_sleep( + redis_client_sdk, lock_name, sleep_duration=1 + ) + for _ in range(parallel_tasks) + ], + reraise=False, + ) + assert results.count(None) == 1 + assert [isinstance(x, CouldNotAcquireLockError) for x in results].count( + True + ) == parallel_tasks - 1 + + # check lock is released + assert not await _is_locked(redis_client_sdk, lock_name) + + +async def test_exclusive_raises_if_lock_is_lost( + redis_client_sdk: RedisClientSDK, + lock_name: str, +): + started_event = asyncio.Event() + + @exclusive(redis_client_sdk, lock_key=lock_name) + async def _sleeper(time_to_sleep: datetime.timedelta) -> datetime.timedelta: + started_event.set() + await asyncio.sleep(time_to_sleep.total_seconds()) + return time_to_sleep + + exclusive_task = asyncio.create_task(_sleeper(datetime.timedelta(seconds=10))) + await asyncio.wait_for(started_event.wait(), timeout=2) + # let's simulate a lost lock by forcefully deleting it + await redis_client_sdk.redis.delete(lock_name) + + with pytest.raises(LockLostError): + await exclusive_task + + _assert_exclusive_tasks_are_cancelled(lock_name, _sleeper) + + +@pytest.fixture +def lock_data(faker: Faker) -> str: + return faker.text() + + +async def test_exclusive_with_lock_value( + redis_client_sdk: RedisClientSDK, lock_name: str, lock_data: str +): + started_event = asyncio.Event() + + @exclusive(redis_client_sdk, lock_key=lock_name, lock_value=lock_data) + async def _sleeper(time_to_sleep: datetime.timedelta) -> datetime.timedelta: + started_event.set() + await asyncio.sleep(time_to_sleep.total_seconds()) + return time_to_sleep + + # initial state + assert await _is_locked(redis_client_sdk, lock_name) is False + assert await redis_client_sdk.lock_value(lock_name) is None + + # run the exclusive task + exclusive_task = asyncio.create_task(_sleeper(datetime.timedelta(seconds=3))) + await asyncio.wait_for(started_event.wait(), timeout=2) + # expected + assert await _is_locked(redis_client_sdk, lock_name) is True + assert await redis_client_sdk.lock_value(lock_name) == lock_data + # now let the task finish + assert await exclusive_task == datetime.timedelta(seconds=3) + # expected + assert await _is_locked(redis_client_sdk, lock_name) is False + assert await redis_client_sdk.lock_value(lock_name) is None + + _assert_exclusive_tasks_are_cancelled(lock_name, _sleeper) + + +async def test_exclusive_task_erroring_releases_lock( + redis_client_sdk: RedisClientSDK, lock_name: str +): + @exclusive(redis_client_sdk, lock_key=lock_name) + async def _raising_func() -> None: + msg = "Expected error" + raise RuntimeError(msg) + + # initial state + assert await _is_locked(redis_client_sdk, lock_name) is False + assert await redis_client_sdk.lock_value(lock_name) is None + + with pytest.raises(RuntimeError): + await _raising_func() + + assert await redis_client_sdk.lock_value(lock_name) is None + + _assert_exclusive_tasks_are_cancelled(lock_name, _raising_func) + + +async def test_lock_acquired_in_parallel_to_update_same_resource( + with_short_default_redis_lock_ttl: datetime.timedelta, + redis_client_sdk: RedisClientSDK, + lock_name: str, +): + 
INCREASE_OPERATIONS: Final[int] = 250 + INCREASE_BY: Final[int] = 10 + + class RaceConditionCounter: + def __init__(self) -> None: + self.value: int = 0 + + async def race_condition_increase(self, by: int) -> None: + current_value = self.value + current_value += by + # most likely situation which creates issues + await asyncio.sleep(with_short_default_redis_lock_ttl.total_seconds() / 2) + self.value = current_value + + counter = RaceConditionCounter() + # ensures it does not time out before acquiring the lock + time_for_all_inc_counter_calls_to_finish = ( + with_short_default_redis_lock_ttl * INCREASE_OPERATIONS * 10 + ) + + @exclusive( + redis_client_sdk, + lock_key=lock_name, + blocking=True, + blocking_timeout=time_for_all_inc_counter_calls_to_finish, + ) + async def _inc_counter() -> None: + await counter.race_condition_increase(INCREASE_BY) + + await limited_gather( + *(_inc_counter() for _ in range(INCREASE_OPERATIONS)), limit=15 + ) + assert counter.value == INCREASE_BY * INCREASE_OPERATIONS + + _assert_exclusive_tasks_are_cancelled(lock_name, _inc_counter) + + +async def test_cancelling_exclusive_task_cancels_properly( + redis_client_sdk: RedisClientSDK, lock_name: str +): + started_event = asyncio.Event() + + @exclusive(redis_client_sdk, lock_key=lock_name) + async def _sleep_task(time_to_sleep: datetime.timedelta) -> datetime.timedelta: + started_event.set() + await asyncio.sleep(time_to_sleep.total_seconds()) + return time_to_sleep + + exclusive_task = asyncio.create_task(_sleep_task(datetime.timedelta(seconds=10))) + await asyncio.wait_for(started_event.wait(), timeout=2) + exclusive_task.cancel() + + with pytest.raises(asyncio.CancelledError): + await exclusive_task + + assert not await _is_locked(redis_client_sdk, lock_name) + + _assert_exclusive_tasks_are_cancelled(lock_name, _sleep_task) diff --git a/packages/service-library/tests/redis/test_redis.py b/packages/service-library/tests/redis/test_redis.py deleted file mode 100644 index 6a88641f844..00000000000 --- a/packages/service-library/tests/redis/test_redis.py +++ /dev/null @@ -1,349 +0,0 @@ -# pylint:disable=unused-variable -# pylint:disable=unused-argument -# pylint:disable=redefined-outer-name -# pylint:disable=protected-access - - -import asyncio -import datetime -from collections.abc import AsyncIterator, Callable -from contextlib import AbstractAsyncContextManager -from typing import Final - -import pytest -from faker import Faker -from pytest_mock import MockerFixture -from redis.exceptions import LockError, LockNotOwnedError -from servicelib.redis import ( - CouldNotAcquireLockError, - LockLostError, - RedisClientSDK, - RedisClientsManager, - RedisManagerDBConfig, -) -from servicelib.redis import _constants as redis_constants -from servicelib.utils import limited_gather -from settings_library.redis import RedisDatabase, RedisSettings -from tenacity import ( - AsyncRetrying, - retry_if_exception_type, - stop_after_delay, - wait_fixed, -) - -pytest_simcore_core_services_selection = [ - "redis", -] - -pytest_simcore_ops_services_selection = [ - "redis-commander", -] - - -async def _is_locked(redis_client_sdk: RedisClientSDK, lock_name: str) -> bool: - lock = redis_client_sdk.redis.lock(lock_name) - return await lock.locked() - - -@pytest.fixture 
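The same blocking pattern outside a test, for serializing writers on one shared resource; all names here are illustrative:

```python
import datetime

from servicelib.redis import RedisClientSDK, exclusive


def make_serialized_writer(client: RedisClientSDK):
    @exclusive(
        client,
        lock_key="demo:shared-counter",
        blocking=True,  # wait for the lock instead of raising right away
        blocking_timeout=datetime.timedelta(seconds=60),
    )
    async def update_shared_resource() -> None:
        ...  # read-modify-write is safe while the lock is held

    return update_shared_resource
```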
-def lock_timeout() -> datetime.timedelta: - return datetime.timedelta(seconds=1) - - -@pytest.fixture -def mock_default_lock_ttl(mocker: MockerFixture) -> None: - mocker.patch.object( - redis_constants, "DEFAULT_LOCK_TTL", datetime.timedelta(seconds=0.25) - ) - - -async def test_redis_key_encode_decode(redis_client_sdk: RedisClientSDK, faker: Faker): - key = faker.pystr() - value = faker.pystr() - await redis_client_sdk.redis.set(key, value) - val = await redis_client_sdk.redis.get(key) - assert val == value - await redis_client_sdk.redis.delete(key) - - -async def test_redis_lock_acquisition(redis_client_sdk: RedisClientSDK, faker: Faker): - lock_name = faker.pystr() - lock = redis_client_sdk.redis.lock(lock_name) - assert await lock.locked() is False - - # Try to acquire the lock: - lock_acquired = await lock.acquire(blocking=False) - assert lock_acquired is True - assert await lock.locked() is True - assert await lock.owned() is True - with pytest.raises(LockError): - # a lock with no timeout cannot be reacquired - await lock.reacquire() - with pytest.raises(LockError): - # a lock with no timeout cannot be extended - await lock.extend(2) - - # try to acquire the lock a second time - same_lock = redis_client_sdk.redis.lock(lock_name) - assert await same_lock.locked() is True - assert await same_lock.owned() is False - assert await same_lock.acquire(blocking=False) is False - - # now release the lock - await lock.release() - assert not await lock.locked() - assert not await lock.owned() - - -async def test_redis_lock_context_manager( - redis_client_sdk: RedisClientSDK, faker: Faker -): - lock_name = faker.pystr() - lock = redis_client_sdk.redis.lock(lock_name) - assert not await lock.locked() - - async with lock: - assert await lock.locked() - assert await lock.owned() - with pytest.raises(LockError): - # a lock with no timeout cannot be reacquired - await lock.reacquire() - - with pytest.raises(LockError): - # a lock with no timeout cannot be extended - await lock.extend(2) - - # try to acquire the lock a second time - same_lock = redis_client_sdk.redis.lock(lock_name, blocking_timeout=1) - assert await same_lock.locked() - assert not await same_lock.owned() - assert await same_lock.acquire() is False - with pytest.raises(LockError): - async with same_lock: - ... 
-    assert not await lock.locked()
-
-
-async def test_redis_lock_with_ttl(
-    redis_client_sdk: RedisClientSDK, faker: Faker, lock_timeout: datetime.timedelta
-):
-    ttl_lock = redis_client_sdk.redis.lock(
-        faker.pystr(), timeout=lock_timeout.total_seconds()
-    )
-    assert not await ttl_lock.locked()
-
-    with pytest.raises(LockNotOwnedError):  # noqa: PT012
-        # this raises as the lock is lost
-        async with ttl_lock:
-            assert await ttl_lock.locked()
-            assert await ttl_lock.owned()
-            await asyncio.sleep(2 * lock_timeout.total_seconds())
-            assert not await ttl_lock.locked()
-
-
-async def test_lock_context(
-    redis_client_sdk: RedisClientSDK, faker: Faker, lock_timeout: datetime.timedelta
-):
-    lock_name = faker.pystr()
-    assert await _is_locked(redis_client_sdk, lock_name) is False
-    async with redis_client_sdk.lock_context(lock_name) as ttl_lock:
-        assert await _is_locked(redis_client_sdk, lock_name) is True
-        assert await ttl_lock.owned() is True
-        await asyncio.sleep(5 * lock_timeout.total_seconds())
-        assert await _is_locked(redis_client_sdk, lock_name) is True
-        assert await ttl_lock.owned() is True
-    assert await _is_locked(redis_client_sdk, lock_name) is False
-    assert await ttl_lock.owned() is False
-
-
-@pytest.mark.xfail(reason="This test shows an issue, that will be fixed in the next PR")
-async def test_lock_context_raises_if_lock_is_lost(
-    redis_client_sdk: RedisClientSDK, faker: Faker
-):
-    lock_name = faker.pystr()
-    with pytest.raises(LockLostError):  # noqa: PT012
-        async with redis_client_sdk.lock_context(lock_name) as ttl_lock:
-            assert await _is_locked(redis_client_sdk, lock_name) is True
-            assert await ttl_lock.owned() is True
-            # let's simulate lost lock by forcefully deleting it
-            await redis_client_sdk._client.delete(lock_name)  # noqa: SLF001
-            # now we wait for the exception to be raised
-            await asyncio.sleep(20)
-
-
-async def test_lock_context_with_already_locked_lock_raises(
-    redis_client_sdk: RedisClientSDK, faker: Faker
-):
-    lock_name = faker.pystr()
-    assert await _is_locked(redis_client_sdk, lock_name) is False
-    async with redis_client_sdk.lock_context(lock_name) as lock:
-        assert await _is_locked(redis_client_sdk, lock_name) is True
-
-        assert isinstance(lock.name, str)
-
-        # case where it gives up immediately to acquire the lock, without waiting
-        with pytest.raises(CouldNotAcquireLockError):
-            async with redis_client_sdk.lock_context(lock.name, blocking=False):
-                ...
-
-        # case where the lock waits up to blocking_timeout_s before giving up on
-        # lock acquisition
-        with pytest.raises(CouldNotAcquireLockError):
-            async with redis_client_sdk.lock_context(
-                lock.name, blocking=True, blocking_timeout_s=0.1
-            ):
-                ...
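
One subtlety behind the TTL tests above: a redis-py lock created with `timeout=` simply expires unless it is periodically re-armed, which is evidently why the removed `lock_context` (and its `exclusive` successor, whose auto-extend tasks the new tests assert are cancelled) keeps the lock alive while the guarded block runs. A rough sketch of that re-arming idea in plain redis-py asyncio, assuming a reachable `Redis` client; the loop stands in for real work:

```python
import asyncio

from redis.asyncio import Redis


async def hold_lock_while_working(redis: Redis, name: str, ttl_s: float = 10.0) -> None:
    lock = redis.lock(name, timeout=ttl_s)  # the lock auto-expires after ttl_s
    if not await lock.acquire(blocking=False):
        msg = f"'{name}' is already locked"
        raise RuntimeError(msg)
    try:
        for _ in range(4):  # stand-in for long-running work
            await asyncio.sleep(ttl_s / 2)
            await lock.reacquire()  # re-arms the TTL; only valid on TTL'd locks
    finally:
        await lock.release()
```
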
-
-        assert await lock.locked() is True
-    assert await _is_locked(redis_client_sdk, lock_name) is False
-
-
-async def test_lock_context_with_data(redis_client_sdk: RedisClientSDK, faker: Faker):
-    lock_data = faker.text()
-    lock_name = faker.pystr()
-    assert await _is_locked(redis_client_sdk, lock_name) is False
-    assert await redis_client_sdk.lock_value(lock_name) is None
-    async with redis_client_sdk.lock_context(lock_name, lock_value=lock_data):
-        assert await _is_locked(redis_client_sdk, lock_name) is True
-        assert await redis_client_sdk.lock_value(lock_name) == lock_data
-    assert await _is_locked(redis_client_sdk, lock_name) is False
-    assert await redis_client_sdk.lock_value(lock_name) is None
-
-
-async def test_lock_context_released_after_error(
-    redis_client_sdk: RedisClientSDK, faker: Faker
-):
-    lock_name = faker.pystr()
-
-    assert await redis_client_sdk.lock_value(lock_name) is None
-
-    with pytest.raises(RuntimeError):  # noqa: PT012
-        async with redis_client_sdk.lock_context(lock_name):
-            assert await redis_client_sdk.redis.get(lock_name) is not None
-            msg = "Expected error"
-            raise RuntimeError(msg)
-
-    assert await redis_client_sdk.lock_value(lock_name) is None
-
-
-async def test_lock_acquired_in_parallel_to_update_same_resource(
-    mock_default_lock_ttl: None,
-    get_redis_client_sdk: Callable[
-        [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK]
-    ],
-    faker: Faker,
-):
-    INCREASE_OPERATIONS: Final[int] = 250
-    INCREASE_BY: Final[int] = 10
-
-    class RaceConditionCounter:
-        def __init__(self):
-            self.value: int = 0
-
-        async def race_condition_increase(self, by: int) -> None:
-            current_value = self.value
-            current_value += by
-            # most likely situation which creates issues
-            await asyncio.sleep(redis_constants.DEFAULT_LOCK_TTL.total_seconds() / 2)
-            self.value = current_value
-
-    counter = RaceConditionCounter()
-    lock_name: str = faker.pystr()
-    # ensures it does not time out before acquiring the lock
-    time_for_all_inc_counter_calls_to_finish_s: float = (
-        redis_constants.DEFAULT_LOCK_TTL.total_seconds() * INCREASE_OPERATIONS * 10
-    )
-
-    async def _inc_counter() -> None:
-        async with get_redis_client_sdk(  # noqa: SIM117
-            RedisDatabase.RESOURCES
-        ) as redis_client_sdk:
-            async with redis_client_sdk.lock_context(
-                lock_key=lock_name,
-                blocking=True,
-                blocking_timeout_s=time_for_all_inc_counter_calls_to_finish_s,
-            ):
-                await counter.race_condition_increase(INCREASE_BY)
-
-    await limited_gather(
-        *(_inc_counter() for _ in range(INCREASE_OPERATIONS)), limit=15
-    )
-    assert counter.value == INCREASE_BY * INCREASE_OPERATIONS
-
-
-async def test_redis_client_sdks_manager(
-    mock_redis_socket_timeout: None, redis_service: RedisSettings
-):
-    all_redis_configs: set[RedisManagerDBConfig] = {
-        RedisManagerDBConfig(database=db) for db in RedisDatabase
-    }
-    manager = RedisClientsManager(
-        databases_configs=all_redis_configs,
-        settings=redis_service,
-        client_name="pytest",
-    )
-
-    async with manager:
-        for config in all_redis_configs:
-            assert manager.client(config.database)
-
-
-async def test_redis_client_sdk_setup_shutdown(
-    mock_redis_socket_timeout: None, redis_service: RedisSettings
-):
-    # setup
-    redis_resources_dns = redis_service.build_redis_dsn(RedisDatabase.RESOURCES)
-    client = RedisClientSDK(redis_resources_dns, client_name="pytest")
-    assert client
-    assert client.redis_dsn == redis_resources_dns
-
-    # ensure nothing happens if shutdown is called before setup
-    await client.shutdown()
-
-    await client.setup()
-
-    # ensure health check task sets the health to True
-    client._is_healthy = False  # noqa: SLF001
-    async for attempt in AsyncRetrying(
-        wait=wait_fixed(0.1),
-        stop=stop_after_delay(10),
-        reraise=True,
-        retry=retry_if_exception_type(AssertionError),
-    ):
-        with attempt:
-            assert client.is_healthy is True
-
-    # cleanup
-    await client.redis.flushall()
-    await client.shutdown()
-
-
-@pytest.fixture
-def mock_default_socket_timeout(mocker: MockerFixture) -> None:
-    mocker.patch.object(
-        redis_constants, "DEFAULT_SOCKET_TIMEOUT", datetime.timedelta(seconds=0.25)
-    )
-
-
-async def test_regression_fails_on_redis_service_outage(
-    mock_default_socket_timeout: None,
-    paused_container: Callable[[str], AbstractAsyncContextManager[None]],
-    redis_client_sdk: RedisClientSDK,
-):
-    assert await redis_client_sdk.ping() is True
-
-    async with paused_container("redis"):
-        # no connection available any longer should not hang but timeout
-        assert await redis_client_sdk.ping() is False
-
-    assert await redis_client_sdk.ping() is True
diff --git a/packages/service-library/tests/redis/test_redis__reconection.py b/packages/service-library/tests/redis/test_redis__reconection.py
deleted file mode 100644
index 8fe5a718527..00000000000
--- a/packages/service-library/tests/redis/test_redis__reconection.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# pylint:disable=unused-argument
-
-import docker
-from servicelib.redis import RedisClientSDK
-from settings_library.redis import RedisDatabase, RedisSettings
-from tenacity.asyncio import AsyncRetrying
-from tenacity.stop import stop_after_delay
-from tenacity.wait import wait_fixed
-
-pytest_simcore_core_services_selection = [
-    "redis",
-]
-
-
-# NOTE: keep ONLY this test in the file!
-# It breaks the service `redis` from `pytest_simcore_core_services_selection`
-# since the service is being removed.
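
Both the outage regression above and the reconnection test that follows verify the same contract: once the (shortened) socket timeout fires, `RedisClientSDK.ping()` returns `False` promptly instead of hanging. A sketch of how a caller could poll for recovery with the same tenacity pattern these tests use; the helper name is an illustrative assumption, not part of this diff:

```python
from servicelib.redis import RedisClientSDK
from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_delay, wait_fixed


async def wait_until_redis_responds(client: RedisClientSDK) -> None:
    # retries until ping() reports a healthy connection again,
    # giving up after 60 seconds
    async for attempt in AsyncRetrying(
        wait=wait_fixed(0.5),
        stop=stop_after_delay(60),
        reraise=True,
        retry=retry_if_exception_type(AssertionError),
    ):
        with attempt:
            assert await client.ping() is True
```
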
-async def test_redis_client_sdk_lost_connection( - mock_redis_socket_timeout: None, - redis_service: RedisSettings, - docker_client: docker.client.DockerClient, -): - redis_client_sdk = RedisClientSDK( - redis_service.build_redis_dsn(RedisDatabase.RESOURCES), client_name="pytest" - ) - assert redis_client_sdk.client_name == "pytest" - await redis_client_sdk.setup() - - assert await redis_client_sdk.ping() is True - # now let's put down the rabbit service - for rabbit_docker_service in ( - docker_service - for docker_service in docker_client.services.list() - if "redis" in docker_service.name # type: ignore - ): - rabbit_docker_service.remove() # type: ignore - - # check that connection was lost - async for attempt in AsyncRetrying( - stop=stop_after_delay(60), wait=wait_fixed(0.5), reraise=True - ): - with attempt: - assert await redis_client_sdk.ping() is False - - await redis_client_sdk.shutdown() diff --git a/packages/service-library/tests/redis/test_redis_utils.py b/packages/service-library/tests/redis/test_redis_utils.py deleted file mode 100644 index 5df925ad2ca..00000000000 --- a/packages/service-library/tests/redis/test_redis_utils.py +++ /dev/null @@ -1,269 +0,0 @@ -# pylint:disable=redefined-outer-name - -import asyncio -from collections.abc import Callable -from contextlib import AbstractAsyncContextManager -from datetime import timedelta -from itertools import chain -from typing import Awaitable -from unittest.mock import Mock - -import arrow -import pytest -from faker import Faker -from servicelib.background_task import stop_periodic_task -from servicelib.redis import ( - CouldNotAcquireLockError, - RedisClientSDK, - exclusive, - start_exclusive_periodic_task, -) -from servicelib.utils import logged_gather -from settings_library.redis import RedisDatabase -from tenacity.asyncio import AsyncRetrying -from tenacity.retry import retry_if_exception_type -from tenacity.stop import stop_after_delay -from tenacity.wait import wait_fixed - -pytest_simcore_core_services_selection = [ - "redis", -] - - -async def _is_locked(redis_client_sdk: RedisClientSDK, lock_name: str) -> bool: - lock = redis_client_sdk.redis.lock(lock_name) - return await lock.locked() - - -@pytest.fixture -def lock_name(faker: Faker) -> str: - return faker.pystr() - - -def _exclusive_sleeping_task( - redis_client_sdk: RedisClientSDK | Callable[..., RedisClientSDK], - lock_name: str | Callable[..., str], - sleep_duration: float, -) -> Callable[..., Awaitable[float]]: - @exclusive(redis_client_sdk, lock_key=lock_name) - async def _() -> float: - resolved_client = ( - redis_client_sdk() if callable(redis_client_sdk) else redis_client_sdk - ) - resolved_lock_name = lock_name() if callable(lock_name) else lock_name - assert await _is_locked(resolved_client, resolved_lock_name) - await asyncio.sleep(sleep_duration) - assert await _is_locked(resolved_client, resolved_lock_name) - return sleep_duration - - return _ - - -@pytest.fixture -def sleep_duration(faker: Faker) -> float: - return faker.pyfloat(positive=True, min_value=0.2, max_value=0.8) - - -async def test_exclusive_decorator( - get_redis_client_sdk: Callable[ - [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ], - lock_name: str, - sleep_duration: float, -): - async with get_redis_client_sdk(RedisDatabase.RESOURCES) as redis_client: - for _ in range(3): - assert ( - await _exclusive_sleeping_task( - redis_client, lock_name, sleep_duration - )() - == sleep_duration - ) - - -async def test_exclusive_decorator_with_key_builder( - 
get_redis_client_sdk: Callable[ - [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ], - lock_name: str, - sleep_duration: float, -): - def _get_lock_name(*args, **kwargs) -> str: - assert args is not None - assert kwargs is not None - return lock_name - - async with get_redis_client_sdk(RedisDatabase.RESOURCES) as redis_client: - for _ in range(3): - assert ( - await _exclusive_sleeping_task( - redis_client, _get_lock_name, sleep_duration - )() - == sleep_duration - ) - - -async def test_exclusive_decorator_with_client_builder( - get_redis_client_sdk: Callable[ - [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ], - lock_name: str, - sleep_duration: float, -): - async with get_redis_client_sdk(RedisDatabase.RESOURCES) as redis_client: - - def _get_redis_client_builder(*args, **kwargs) -> RedisClientSDK: - assert args is not None - assert kwargs is not None - return redis_client - - for _ in range(3): - assert ( - await _exclusive_sleeping_task( - _get_redis_client_builder, lock_name, sleep_duration - )() - == sleep_duration - ) - - -async def _acquire_lock_and_exclusively_sleep( - get_redis_client_sdk: Callable[ - [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ], - lock_name: str | Callable[..., str], - sleep_duration: float, -) -> None: - async with get_redis_client_sdk(RedisDatabase.RESOURCES) as redis_client_sdk: - redis_lock_name = lock_name() if callable(lock_name) else lock_name - - @exclusive(redis_client_sdk, lock_key=lock_name) - async def _() -> float: - assert await _is_locked(redis_client_sdk, redis_lock_name) - await asyncio.sleep(sleep_duration) - assert await _is_locked(redis_client_sdk, redis_lock_name) - return sleep_duration - - assert await _() == sleep_duration - - assert not await _is_locked(redis_client_sdk, redis_lock_name) - - -async def test_exclusive_parallel_lock_is_released_and_reacquired( - get_redis_client_sdk: Callable[ - [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ], - lock_name: str, -): - parallel_tasks = 10 - results = await logged_gather( - *[ - _acquire_lock_and_exclusively_sleep( - get_redis_client_sdk, lock_name, sleep_duration=1 - ) - for _ in range(parallel_tasks) - ], - reraise=False, - ) - assert results.count(None) == 1 - assert [isinstance(x, CouldNotAcquireLockError) for x in results].count( - True - ) == parallel_tasks - 1 - - # check lock is released - async with get_redis_client_sdk(RedisDatabase.RESOURCES) as redis_client_sdk: - assert not await _is_locked(redis_client_sdk, lock_name) - - -async def _sleep_task(sleep_interval: float, on_sleep_events: Mock) -> None: - on_sleep_events(arrow.utcnow()) - await asyncio.sleep(sleep_interval) - on_sleep_events(arrow.utcnow()) - - -async def _assert_on_sleep_done(on_sleep_events: Mock, *, stop_after: float): - async for attempt in AsyncRetrying( - wait=wait_fixed(0.1), - stop=stop_after_delay(stop_after), - reraise=True, - retry=retry_if_exception_type(AssertionError), - ): - with attempt: - assert on_sleep_events.call_count == 2 - - -async def _assert_task_completes_once( - get_redis_client_sdk: Callable[ - [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ], - stop_after: float, -) -> tuple[float, ...]: - async with get_redis_client_sdk(RedisDatabase.RESOURCES) as redis_client_sdk: - sleep_events = Mock() - - started_task = start_exclusive_periodic_task( - redis_client_sdk, - _sleep_task, - task_period=timedelta(seconds=1), - task_name="long_running", - sleep_interval=1, - on_sleep_events=sleep_events, - ) - - await 
_assert_on_sleep_done(sleep_events, stop_after=stop_after) - - await stop_periodic_task(started_task, timeout=5) - - events_timestamps: tuple[float, ...] = tuple( - x.args[0].timestamp() for x in sleep_events.call_args_list - ) - return events_timestamps - - -async def test_start_exclusive_periodic_task_single( - get_redis_client_sdk: Callable[ - [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ], -): - await _assert_task_completes_once(get_redis_client_sdk, stop_after=2) - - -def _check_elements_lower(lst: list) -> bool: - # False when lst[x] => lst[x+1] otherwise True - return all(lst[i] < lst[i + 1] for i in range(len(lst) - 1)) - - -def test__check_elements_lower(): - assert _check_elements_lower([1, 2, 3, 4, 5]) - assert not _check_elements_lower([1, 2, 3, 3, 4, 5]) - assert not _check_elements_lower([1, 2, 3, 5, 4]) - assert not _check_elements_lower([2, 1, 3, 4, 5]) - assert not _check_elements_lower([1, 2, 4, 3, 5]) - - -async def test_start_exclusive_periodic_task_parallel_all_finish( - get_redis_client_sdk: Callable[ - [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] - ], -): - parallel_tasks = 10 - results: list[tuple[float, float]] = await logged_gather( - *[ - _assert_task_completes_once(get_redis_client_sdk, stop_after=60) - for _ in range(parallel_tasks) - ], - reraise=False, - ) - - # check no error occurred - assert [isinstance(x, tuple) for x in results].count(True) == parallel_tasks - assert [x[0] < x[1] for x in results].count(True) == parallel_tasks - - # sort by start time (task start order is not equal to the task lock acquisition order) - sorted_results: list[tuple[float, float]] = sorted(results, key=lambda x: x[0]) - - # pylint:disable=unnecessary-comprehension - flattened_results: list[float] = [x for x in chain(*sorted_results)] # noqa: C416 - - # NOTE all entries should be in increasing order; - # this means that the `_sleep_task` ran sequentially - assert _check_elements_lower(flattened_results) diff --git a/packages/service-library/tests/redis/test_utils.py b/packages/service-library/tests/redis/test_utils.py new file mode 100644 index 00000000000..25eb129c853 --- /dev/null +++ b/packages/service-library/tests/redis/test_utils.py @@ -0,0 +1,26 @@ +# pylint:disable=unused-variable +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name +# pylint:disable=protected-access + + +from faker import Faker +from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types + +pytest_simcore_core_services_selection = [ + "redis", +] + +pytest_simcore_ops_services_selection = [ + "redis-commander", +] + + +async def test_handle_redis_returns_union_types( + redis_client_sdk: RedisClientSDK, faker: Faker +): + await handle_redis_returns_union_types( + redis_client_sdk.redis.hset( + faker.pystr(), mapping={faker.pystr(): faker.pystr()} + ) + ) diff --git a/packages/service-library/tests/test_async_utils.py b/packages/service-library/tests/test_async_utils.py index 902cbcc9b82..9bb1b4fff45 100644 --- a/packages/service-library/tests/test_async_utils.py +++ b/packages/service-library/tests/test_async_utils.py @@ -7,6 +7,7 @@ import random from collections import deque from dataclasses import dataclass +from datetime import timedelta from time import time from typing import Any @@ -14,6 +15,7 @@ from faker import Faker from servicelib.async_utils import ( _sequential_jobs_contexts, + delayed_start, run_sequentially_in_context, ) @@ -135,7 +137,6 @@ async def test_context_aware_wrong_target_args_name( expected_param_name: 
str, ensure_run_in_sequence_context_is_empty: None, # pylint: disable=unused-argument ) -> None: - # pylint: disable=unused-argument @run_sequentially_in_context(target_args=[expected_param_name]) async def target_function(the_param: Any) -> None: @@ -224,3 +225,20 @@ async def test_multiple_context_calls(context_param: int) -> int: assert i == await test_multiple_context_calls(i) assert len(_sequential_jobs_contexts) == RETRIES + + +async def test_with_delay(): + @delayed_start(timedelta(seconds=0.2)) + async def decorated_awaitable() -> int: + return 42 + + assert await decorated_awaitable() == 42 + + async def another_awaitable() -> int: + return 42 + + decorated_another_awaitable = delayed_start(timedelta(seconds=0.2))( + another_awaitable + ) + + assert await decorated_another_awaitable() == 42 diff --git a/packages/service-library/tests/test_background_task.py b/packages/service-library/tests/test_background_task.py index 8d69b5f5e2b..0f23035318f 100644 --- a/packages/service-library/tests/test_background_task.py +++ b/packages/service-library/tests/test_background_task.py @@ -9,15 +9,21 @@ from collections.abc import AsyncIterator, Awaitable, Callable from typing import Final from unittest import mock +from unittest.mock import AsyncMock import pytest from faker import Faker from pytest_mock.plugin import MockerFixture -from servicelib.background_task import ( - periodic_task, - start_periodic_task, - stop_periodic_task, -) +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task, periodic, periodic_task + +pytest_simcore_core_services_selection = [ + "redis", +] +pytest_simcore_ops_services_selection = [ + "redis-commander", +] + _FAST_POLL_INTERVAL: Final[int] = 1 _VERY_SLOW_POLL_INTERVAL: Final[int] = 100 @@ -58,7 +64,7 @@ async def _creator( task: Callable[..., Awaitable], early_wake_up_event: asyncio.Event | None, ) -> asyncio.Task: - background_task = start_periodic_task( + background_task = create_periodic_task( task, interval=interval, task_name=faker.pystr(), @@ -71,7 +77,7 @@ async def _creator( yield _creator # cleanup await asyncio.gather( - *(stop_periodic_task(t, timeout=stop_task_timeout) for t in created_tasks) + *(cancel_wait_task(t, max_delay=stop_task_timeout) for t in created_tasks) ) @@ -177,3 +183,24 @@ async def test_periodic_task_context_manager( assert asyncio_task.cancelled() is False assert asyncio_task.done() is False assert asyncio_task.cancelled() is True + + +async def test_periodic_decorator(): + # This mock function will allow us to test if the function is called periodically + mock_func = AsyncMock() + + @periodic(interval=datetime.timedelta(seconds=0.1)) + async def _func() -> None: + await mock_func() + + task = asyncio.create_task(_func()) + + # Give some time for the periodic calls to happen + await asyncio.sleep(0.5) + + # Once enough time has passed, cancel the task + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + assert mock_func.call_count > 1 diff --git a/packages/service-library/tests/test_background_task_utils.py b/packages/service-library/tests/test_background_task_utils.py new file mode 100644 index 00000000000..9a03a6c3541 --- /dev/null +++ b/packages/service-library/tests/test_background_task_utils.py @@ -0,0 +1,125 @@ +# pylint: disable=no-value-for-parameter +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +import asyncio +import datetime +from collections.abc import AsyncIterator, 
Callable +from contextlib import AbstractAsyncContextManager +from itertools import chain +from unittest import mock + +import arrow +import pytest +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task_utils import exclusive_periodic +from servicelib.redis import RedisClientSDK +from settings_library.redis import RedisDatabase +from tenacity import ( + AsyncRetrying, + retry_if_exception_type, + stop_after_delay, + wait_fixed, +) + +pytest_simcore_core_services_selection = [ + "redis", +] +pytest_simcore_ops_services_selection = [ + "redis-commander", +] + + +@pytest.fixture +async def redis_client_sdk( + get_redis_client_sdk: Callable[ + [RedisDatabase], AbstractAsyncContextManager[RedisClientSDK] + ], +) -> AsyncIterator[RedisClientSDK]: + async with get_redis_client_sdk(RedisDatabase.RESOURCES) as client: + yield client + + +async def _assert_on_sleep_done(on_sleep_events: mock.Mock, *, stop_after: float): + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), + stop=stop_after_delay(stop_after), + reraise=True, + retry=retry_if_exception_type(AssertionError), + ): + with attempt: + assert on_sleep_events.call_count == 2 + print("sleep was done with", on_sleep_events.call_count, " counts") + + +async def _assert_task_completes_once( + redis_client_sdk: RedisClientSDK, + stop_after: float, +) -> tuple[float, ...]: + @exclusive_periodic(redis_client_sdk, task_interval=datetime.timedelta(seconds=1)) + async def _sleep_task(sleep_interval: float, on_sleep_events: mock.Mock) -> None: + on_sleep_events(arrow.utcnow()) + await asyncio.sleep(sleep_interval) + print("Slept for", sleep_interval) + on_sleep_events(arrow.utcnow()) + + sleep_events = mock.Mock() + + task = asyncio.create_task(_sleep_task(1, sleep_events), name="pytest_sleep_task") + + await _assert_on_sleep_done(sleep_events, stop_after=stop_after) + + await cancel_wait_task(task, max_delay=5) + + events_timestamps: tuple[float, ...] 
= tuple( + x.args[0].timestamp() for x in sleep_events.call_args_list + ) + return events_timestamps + + +def _check_elements_lower(lst: list) -> bool: + # False when lst[x] => lst[x+1] otherwise True + return all(lst[i] < lst[i + 1] for i in range(len(lst) - 1)) + + +def test__check_elements_lower(): + assert _check_elements_lower([1, 2, 3, 4, 5]) + assert not _check_elements_lower([1, 2, 3, 3, 4, 5]) + assert not _check_elements_lower([1, 2, 3, 5, 4]) + assert not _check_elements_lower([2, 1, 3, 4, 5]) + assert not _check_elements_lower([1, 2, 4, 3, 5]) + + +async def test_exclusive_periodic_decorator_single( + redis_client_sdk: RedisClientSDK, +): + await _assert_task_completes_once(redis_client_sdk, stop_after=2) + + +async def test_exclusive_periodic_decorator_parallel_all_finish( + redis_client_sdk: RedisClientSDK, +): + parallel_tasks = 10 + results = await asyncio.gather( + *[ + _assert_task_completes_once(redis_client_sdk, stop_after=60) + for _ in range(parallel_tasks) + ], + return_exceptions=True, + ) + + # check no error occurred + assert [isinstance(x, tuple) for x in results].count(True) == parallel_tasks + assert [isinstance(x, Exception) for x in results].count(True) == 0 + valid_results = [x for x in results if isinstance(x, tuple)] + assert [x[0] < x[1] for x in valid_results].count(True) == parallel_tasks + + # sort by start time (task start order is not equal to the task lock acquisition order) + sorted_results = sorted(valid_results, key=lambda x: x[0]) + flattened_results = list(chain(*sorted_results)) + + # NOTE all entries should be in increasing order; + # this means that the `_sleep_task` ran sequentially + assert _check_elements_lower(flattened_results) diff --git a/packages/service-library/tests/test_decorators.py b/packages/service-library/tests/test_decorators.py index af120e5dc4f..c8e368a57a3 100644 --- a/packages/service-library/tests/test_decorators.py +++ b/packages/service-library/tests/test_decorators.py @@ -2,9 +2,8 @@ # pylint:disable=unused-argument # pylint:disable=redefined-outer-name -from datetime import timedelta -from servicelib.decorators import async_delayed, safe_return +from servicelib.decorators import safe_return def test_safe_return_decorator(): @@ -28,20 +27,3 @@ def return_mutable(): assert return_mutable() == some_mutable_return # contains the same assert return_mutable() is not some_mutable_return # but is not the same - - -async def test_async_delayed(): - @async_delayed(timedelta(seconds=0.2)) - async def decorated_awaitable() -> int: - return 42 - - assert await decorated_awaitable() == 42 - - async def another_awaitable() -> int: - return 42 - - decorated_another_awaitable = async_delayed(timedelta(seconds=0.2))( - another_awaitable - ) - - assert await decorated_another_awaitable() == 42 diff --git a/services/agent/src/simcore_service_agent/services/volumes_manager.py b/services/agent/src/simcore_service_agent/services/volumes_manager.py index fa5a0cd1b17..d26062a936e 100644 --- a/services/agent/src/simcore_service_agent/services/volumes_manager.py +++ b/services/agent/src/simcore_service_agent/services/volumes_manager.py @@ -9,7 +9,8 @@ from fastapi import FastAPI from models_library.projects_nodes_io import NodeID from pydantic import NonNegativeFloat -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.fastapi.app_state import SingletonInAppStateMixin from 
servicelib.logging_utils import log_context from servicelib.rabbitmq.rpc_interfaces.agent.errors import ( @@ -45,12 +46,12 @@ class VolumesManager( # pylint:disable=too-many-instance-attributes app_state_name: str = "volumes_manager" async def setup(self) -> None: - self._task_bookkeeping = start_periodic_task( + self._task_bookkeeping = create_periodic_task( self._bookkeeping_task, interval=self.book_keeping_interval, task_name="volumes bookkeeping", ) - self._task_periodic_volume_cleanup = start_periodic_task( + self._task_periodic_volume_cleanup = create_periodic_task( self._bookkeeping_task, interval=self.volume_cleanup_interval, task_name="volume cleanup", @@ -60,10 +61,10 @@ async def shutdown(self) -> None: await self.docker.close() if self._task_bookkeeping: - await stop_periodic_task(self._task_bookkeeping) + await cancel_wait_task(self._task_bookkeeping) if self._task_periodic_volume_cleanup: - await stop_periodic_task(self._task_periodic_volume_cleanup) + await cancel_wait_task(self._task_periodic_volume_cleanup) async def _bookkeeping_task(self) -> None: with log_context(_logger, logging.DEBUG, "volume bookkeeping"): diff --git a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py index 7d435cd086c..16acff288b0 100644 --- a/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py +++ b/services/api-server/src/simcore_service_api_server/core/_prometheus_instrumentation.py @@ -6,7 +6,8 @@ from fastapi import FastAPI from prometheus_client import CollectorRegistry, Gauge from pydantic import PositiveInt -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.fastapi.prometheus_instrumentation import ( setup_prometheus_instrumentation as setup_rest_instrumentation, ) @@ -67,7 +68,7 @@ async def on_startup() -> None: registry=instrumentator.registry ) await wait_till_log_distributor_ready(app) - app.state.instrumentation_task = start_periodic_task( + app.state.instrumentation_task = create_periodic_task( task=_collect_prometheus_metrics_task, interval=timedelta( seconds=app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS @@ -79,7 +80,7 @@ async def on_startup() -> None: async def on_shutdown() -> None: assert app.state.instrumentation_task # nosec with log_catch(_logger, reraise=False): - await stop_periodic_task(app.state.instrumentation_task) + await cancel_wait_task(app.state.instrumentation_task) app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) diff --git a/services/api-server/src/simcore_service_api_server/core/health_checker.py b/services/api-server/src/simcore_service_api_server/core/health_checker.py index a05e046b2e8..b5a5180b12b 100644 --- a/services/api-server/src/simcore_service_api_server/core/health_checker.py +++ b/services/api-server/src/simcore_service_api_server/core/health_checker.py @@ -9,7 +9,8 @@ from models_library.rabbitmq_messages import LoggerRabbitMessage from models_library.users import UserID from pydantic import NonNegativeInt, PositiveFloat, PositiveInt -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.fastapi.dependencies import 
get_app from servicelib.logging_utils import log_catch from servicelib.rabbitmq import RabbitMQClient @@ -53,7 +54,7 @@ async def setup(self, health_check_task_period_seconds: PositiveFloat): await self._log_distributor.register( job_id=self._dummy_job_id, queue=self._dummy_queue ) - self._background_task = start_periodic_task( + self._background_task = create_periodic_task( task=self._background_task_method, interval=timedelta(seconds=health_check_task_period_seconds), task_name="api_server_health_check_task", @@ -62,8 +63,8 @@ async def setup(self, health_check_task_period_seconds: PositiveFloat): async def teardown(self): if self._background_task: with log_catch(_logger, reraise=False): - await stop_periodic_task( - self._background_task, timeout=self._timeout_seconds + await cancel_wait_task( + self._background_task, max_delay=self._timeout_seconds ) await self._log_distributor.deregister(job_id=self._dummy_job_id) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py index 28a37953a52..5ebc6a190f8 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_task.py @@ -3,7 +3,8 @@ from typing import Final from fastapi import FastAPI -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.redis import exclusive from ..core.settings import ApplicationSettings @@ -24,7 +25,7 @@ async def _startup() -> None: lock_key, lock_value = create_lock_key_and_value(app) assert lock_key # nosec assert lock_value # nosec - app.state.autoscaler_task = start_periodic_task( + app.state.autoscaler_task = create_periodic_task( exclusive(get_redis_client(app), lock_key=lock_key, lock_value=lock_value)( auto_scale_cluster ), @@ -43,7 +44,7 @@ async def _startup() -> None: def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]: async def _stop() -> None: - await stop_periodic_task(app.state.autoscaler_task) + await cancel_wait_task(app.state.autoscaler_task) return _stop diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py b/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py index a4a0eefe11d..2985e2ffcc4 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_task.py @@ -3,7 +3,8 @@ from typing import Final from fastapi import FastAPI -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.redis import exclusive from ..core.settings import ApplicationSettings @@ -25,7 +26,7 @@ async def _startup() -> None: assert lock_value # nosec assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec - app.state.buffers_pool_task = start_periodic_task( + app.state.buffers_pool_task = create_periodic_task( exclusive( get_redis_client(app), lock_key=f"{lock_key}_buffers_pool", @@ -43,7 +44,7 @@ async def _startup() -> None: def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]: async def _stop() -> None: if hasattr(app.state, 
"buffers_pool_task"): - await stop_periodic_task(app.state.buffers_pool_task) + await cancel_wait_task(app.state.buffers_pool_task) return _stop diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/redis.py b/services/autoscaling/src/simcore_service_autoscaling/modules/redis.py index 60ce15df956..c0cf7a15e07 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/redis.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/redis.py @@ -15,10 +15,9 @@ async def on_startup() -> None: app.state.redis_client_sdk = None settings: RedisSettings = app.state.settings.AUTOSCALING_REDIS redis_locks_dsn = settings.build_redis_dsn(RedisDatabase.LOCKS) - app.state.redis_client_sdk = client = RedisClientSDK( + app.state.redis_client_sdk = RedisClientSDK( redis_locks_dsn, client_name=APP_NAME ) - await client.setup() async def on_shutdown() -> None: redis_client_sdk: None | RedisClientSDK = app.state.redis_client_sdk diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 9b7489268e6..a3f554493b1 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -322,12 +322,12 @@ def mocked_ec2_instances_envs( @pytest.fixture def disable_dynamic_service_background_task(mocker: MockerFixture) -> None: mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_task.start_periodic_task", + "simcore_service_autoscaling.modules.auto_scaling_task.create_periodic_task", autospec=True, ) mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_task.stop_periodic_task", + "simcore_service_autoscaling.modules.auto_scaling_task.cancel_wait_task", autospec=True, ) @@ -335,12 +335,12 @@ def disable_dynamic_service_background_task(mocker: MockerFixture) -> None: @pytest.fixture def disable_buffers_pool_background_task(mocker: MockerFixture) -> None: mocker.patch( - "simcore_service_autoscaling.modules.buffer_machines_pool_task.start_periodic_task", + "simcore_service_autoscaling.modules.buffer_machines_pool_task.create_periodic_task", autospec=True, ) mocker.patch( - "simcore_service_autoscaling.modules.buffer_machines_pool_task.stop_periodic_task", + "simcore_service_autoscaling.modules.buffer_machines_pool_task.cancel_wait_task", autospec=True, ) @@ -826,7 +826,7 @@ def aws_instance_private_dns() -> str: @pytest.fixture def fake_localhost_ec2_instance_data( - fake_ec2_instance_data: Callable[..., EC2InstanceData] + fake_ec2_instance_data: Callable[..., EC2InstanceData], ) -> EC2InstanceData: local_ip = get_localhost_ip() fake_local_ec2_private_dns = f"ip-{local_ip.replace('.', '-')}.ec2.internal" @@ -1126,7 +1126,7 @@ def buffer_count( ], ) -> int: def _by_buffer_count( - instance_type_and_settings: tuple[InstanceTypeType, EC2InstanceBootSpecific] + instance_type_and_settings: tuple[InstanceTypeType, EC2InstanceBootSpecific], ) -> bool: _, boot_specific = instance_type_and_settings return boot_specific.buffer_count > 0 diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py index cb5caa71c21..c540d7b160f 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py @@ -3,7 +3,8 @@ from collections.abc import Awaitable, Callable from fastapi import 
FastAPI -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.redis import exclusive from .._meta import APP_NAME @@ -22,7 +23,7 @@ async def _startup() -> None: lock_key = f"{APP_NAME}:clusters-management_lock" lock_value = json.dumps({}) - app.state.clusters_cleaning_task = start_periodic_task( + app.state.clusters_cleaning_task = create_periodic_task( exclusive(get_redis_client(app), lock_key=lock_key, lock_value=lock_value)( check_clusters ), @@ -36,7 +37,7 @@ async def _startup() -> None: def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]: async def _stop() -> None: - await stop_periodic_task(app.state.clusters_cleaning_task, timeout=5) + await cancel_wait_task(app.state.clusters_cleaning_task, max_delay=5) return _stop diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/redis.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/redis.py index a0a0d6a8745..8e2d5b71e33 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/redis.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/redis.py @@ -16,10 +16,9 @@ async def on_startup() -> None: app.state.redis_client_sdk = None settings: RedisSettings = get_application_settings(app).CLUSTERS_KEEPER_REDIS redis_locks_dsn = settings.build_redis_dsn(RedisDatabase.LOCKS) - app.state.redis_client_sdk = client = RedisClientSDK( + app.state.redis_client_sdk = RedisClientSDK( redis_locks_dsn, client_name=APP_NAME ) - await client.setup() async def on_shutdown() -> None: redis_client_sdk: None | RedisClientSDK = app.state.redis_client_sdk diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py index 93240eeb564..b7552dcfb09 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py @@ -1,3 +1,5 @@ +import datetime + from aws_library.ec2 import EC2InstanceData from aws_library.ec2._errors import EC2InstanceNotFoundError from fastapi import FastAPI @@ -5,6 +7,8 @@ from models_library.users import UserID from models_library.wallets import WalletID from servicelib.rabbitmq import RPCRouter +from servicelib.redis._client import RedisClientSDK +from servicelib.redis._decorators import exclusive from ..core.settings import get_application_settings from ..modules import clusters @@ -16,7 +20,33 @@ router = RPCRouter() +def _get_app_from_args(*args, **kwargs) -> FastAPI: + assert kwargs is not None # nosec + if args: + app = args[0] + else: + assert "app" in kwargs # nosec + app = kwargs["app"] + assert isinstance(app, FastAPI) # nosec + return app + + +def _get_redis_client_from_app(*args, **kwargs) -> RedisClientSDK: + app = _get_app_from_args(*args, **kwargs) + return get_redis_client(app) + + +def _get_redis_lock_key(*_args, user_id: UserID, wallet_id: WalletID | None) -> str: + return f"get_or_create_cluster-{user_id=}-{wallet_id=}" + + @router.expose() +@exclusive( + _get_redis_client_from_app, + lock_key=_get_redis_lock_key, + blocking=True, + blocking_timeout=datetime.timedelta(seconds=10), +) async def get_or_create_cluster( app: FastAPI, *, user_id: UserID, wallet_id: WalletID | None ) -> OnDemandCluster: @@ -26,32 +56,28 @@ async def 
get_or_create_cluster( Calling several time will always return the same cluster. """ ec2_instance: EC2InstanceData | None = None - redis = get_redis_client(app) dask_scheduler_ready = False cluster_auth = get_scheduler_auth(app) - async with redis.lock_context( - f"get_or_create_cluster-{user_id=}-{wallet_id=}", - blocking=True, - blocking_timeout_s=10, - ): - try: - ec2_instance = await clusters.get_cluster( - app, user_id=user_id, wallet_id=wallet_id - ) - except EC2InstanceNotFoundError: - new_ec2_instances = await clusters.create_cluster( - app, user_id=user_id, wallet_id=wallet_id - ) - assert new_ec2_instances # nosec - assert len(new_ec2_instances) == 1 # nosec - ec2_instance = new_ec2_instances[0] - dask_scheduler_ready = bool( - ec2_instance.state == "running" - and await ping_scheduler(get_scheduler_url(ec2_instance), cluster_auth) + try: + ec2_instance = await clusters.get_cluster( + app, user_id=user_id, wallet_id=wallet_id + ) + except EC2InstanceNotFoundError: + new_ec2_instances = await clusters.create_cluster( + app, user_id=user_id, wallet_id=wallet_id ) - if dask_scheduler_ready: - await clusters.cluster_heartbeat(app, user_id=user_id, wallet_id=wallet_id) + assert new_ec2_instances # nosec + assert len(new_ec2_instances) == 1 # nosec + ec2_instance = new_ec2_instances[0] + + dask_scheduler_ready = bool( + ec2_instance.state == "running" + and await ping_scheduler(get_scheduler_url(ec2_instance), cluster_auth) + ) + if dask_scheduler_ready: + await clusters.cluster_heartbeat(app, user_id=user_id, wallet_id=wallet_id) + assert ec2_instance is not None # nosec app_settings = get_application_settings(app) assert app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES # nosec diff --git a/services/clusters-keeper/tests/unit/conftest.py b/services/clusters-keeper/tests/unit/conftest.py index 890968f4d83..77fbff6bd65 100644 --- a/services/clusters-keeper/tests/unit/conftest.py +++ b/services/clusters-keeper/tests/unit/conftest.py @@ -206,12 +206,12 @@ def disable_clusters_management_background_task( mocker: MockerFixture, ) -> Iterator[None]: start_background_task = mocker.patch( - "simcore_service_clusters_keeper.modules.clusters_management_task.start_periodic_task", + "simcore_service_clusters_keeper.modules.clusters_management_task.create_periodic_task", autospec=True, ) stop_background_task = mocker.patch( - "simcore_service_clusters_keeper.modules.clusters_management_task.stop_periodic_task", + "simcore_service_clusters_keeper.modules.clusters_management_task.cancel_wait_task", autospec=True, ) diff --git a/services/director-v2/src/simcore_service_director_v2/core/settings.py b/services/director-v2/src/simcore_service_director_v2/core/settings.py index 6b9c64de4a3..249bbe836ec 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/core/settings.py @@ -16,6 +16,7 @@ NoAuthentication, ) from pydantic import ( + AfterValidator, AliasChoices, AnyHttpUrl, AnyUrl, @@ -251,10 +252,14 @@ class AppSettings(BaseApplicationSettings, MixinLoggingSettings): description="resource usage tracker service client's plugin", ) - DIRECTOR_V2_PUBLIC_API_BASE_URL: AnyHttpUrl = Field( - ..., - description="Base URL used to access the public api e.g. http://127.0.0.1:6000 for development or https://api.osparc.io", - ) + DIRECTOR_V2_PUBLIC_API_BASE_URL: Annotated[ + AnyHttpUrl | str, + AfterValidator(lambda v: f"{v}".rstrip("/")), + Field( + ..., + description="Base URL used to access the public api e.g. 
http://127.0.0.1:6000 for development or https://api.osparc.io", + ), + ] DIRECTOR_V2_TRACING: TracingSettings | None = Field( json_schema_extra={"auto_default_from_env": True}, description="settings for opentelemetry tracing", diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py index 87fd24fe8ad..509df01c27f 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py @@ -6,7 +6,8 @@ from fastapi import FastAPI from models_library.projects import ProjectID from models_library.users import UserID -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.exception_utils import silence_exceptions from servicelib.logging_utils import log_context from servicelib.redis import CouldNotAcquireLockError, exclusive @@ -155,7 +156,7 @@ async def schedule_all_pipelines(app: FastAPI) -> None: async def setup_manager(app: FastAPI) -> None: - app.state.scheduler_manager = start_periodic_task( + app.state.scheduler_manager = create_periodic_task( silence_exceptions((CouldNotAcquireLockError,))(schedule_all_pipelines), interval=SCHEDULER_INTERVAL, task_name=MODULE_NAME_SCHEDULER, @@ -164,4 +165,4 @@ async def setup_manager(app: FastAPI) -> None: async def shutdown_manager(app: FastAPI) -> None: - await stop_periodic_task(app.state.scheduler_manager) + await cancel_wait_task(app.state.scheduler_manager) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py index 2bfd2f2f5a4..914629bcef5 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py @@ -33,7 +33,7 @@ def _unique_key_builder( @exclusive( - redis=get_redis_client_from_app, + get_redis_client_from_app, lock_key=get_redis_lock_key( MODULE_NAME_WORKER, unique_lock_key_builder=_unique_key_builder ), diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py index a65c2cd84ff..6860717238d 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py @@ -41,11 +41,8 @@ from models_library.users import UserID from models_library.wallets import WalletID from pydantic import NonNegativeFloat -from servicelib.background_task import ( - cancel_task, - start_periodic_task, - stop_periodic_task, -) +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.fastapi.long_running_tasks.client import ProgressCallback from servicelib.fastapi.long_running_tasks.server import TaskProgress from servicelib.redis import RedisClientsManager, exclusive @@ -107,7 +104,7 @@ async def start(self) -> None: settings: DynamicServicesSchedulerSettings = ( 
self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER ) - self._scheduler_task = start_periodic_task( + self._scheduler_task = create_periodic_task( exclusive( redis_clients_manager.client(RedisDatabase.LOCKS), lock_key=f"{__name__}.{self.__class__.__name__}", @@ -129,7 +126,7 @@ async def shutdown(self) -> None: self._to_observe = {} if self._scheduler_task is not None: - await stop_periodic_task(self._scheduler_task, timeout=5) + await cancel_wait_task(self._scheduler_task, max_delay=5) self._scheduler_task = None if self._trigger_observation_queue_task is not None: @@ -368,7 +365,7 @@ async def mark_service_for_removal( self._service_observation_task[service_name] ) if isinstance(service_task, asyncio.Task): - await cancel_task(service_task, timeout=10) + await cancel_wait_task(service_task, max_delay=10) if skip_observation_recreation: return diff --git a/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py b/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py index 2249937341d..22a7bef360d 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py @@ -1,6 +1,4 @@ -""" Substitution of osparc variables and secrets - -""" +"""Substitution of osparc variables and secrets""" import functools import logging @@ -24,6 +22,7 @@ from pydantic import BaseModel from servicelib.fastapi.app_state import SingletonInAppStateMixin from servicelib.logging_utils import log_context +from simcore_service_director_v2.core.settings import get_application_settings from ...utils.db import get_repository from ...utils.osparc_variables import ( @@ -194,6 +193,7 @@ async def resolve_and_substitute_session_variables_in_model( # if it raises an error vars need replacement raise_if_unresolved_osparc_variable_identifier_found(model) except UnresolvedOsparcVariableIdentifierError: + app_settings = get_application_settings(app) table = OsparcSessionVariablesTable.get_from_app_state(app) identifiers = await resolve_variables_from_context( table.copy(), @@ -204,7 +204,7 @@ async def resolve_and_substitute_session_variables_in_model( project_id=project_id, node_id=node_id, run_id=service_run_id, - api_server_base_url=app.state.settings.DIRECTOR_V2_PUBLIC_API_BASE_URL, + api_server_base_url=app_settings.DIRECTOR_V2_PUBLIC_API_BASE_URL, ), ) _logger.debug("replacing with the identifiers=%s", identifiers) @@ -238,6 +238,7 @@ async def resolve_and_substitute_session_variables_in_specs( identifiers_to_replace, ) if identifiers_to_replace: + app_settings = get_application_settings(app) environs = await resolve_variables_from_context( table.copy(include=identifiers_to_replace), context=ContextDict( @@ -247,7 +248,7 @@ async def resolve_and_substitute_session_variables_in_specs( project_id=project_id, node_id=node_id, run_id=service_run_id, - api_server_base_url=app.state.settings.DIRECTOR_V2_PUBLIC_API_BASE_URL, + api_server_base_url=app_settings.DIRECTOR_V2_PUBLIC_API_BASE_URL, ), ) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py index 194a308f8b9..ea685777a0d 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py +++ 
b/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py @@ -5,7 +5,8 @@ from typing import Final, Generic, TypeVar from pydantic import NonNegativeInt -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.logging_utils import log_catch, log_context from servicelib.redis import RedisClientSDK from servicelib.utils import logged_gather @@ -67,7 +68,7 @@ def __init__( self._cleanup_task: Task | None = None async def setup(self) -> None: - self._cleanup_task = start_periodic_task( + self._cleanup_task = create_periodic_task( self._cleanup_unused_identifiers, interval=self.cleanup_interval, task_name="cleanup_unused_identifiers_task", @@ -75,7 +76,7 @@ async def setup(self) -> None: async def shutdown(self) -> None: if self._cleanup_task: - await stop_periodic_task(self._cleanup_task, timeout=5) + await cancel_wait_task(self._cleanup_task, max_delay=5) @classmethod def class_path(cls) -> str: @@ -170,9 +171,12 @@ async def remove(self, identifier: Identifier, *, reraise: bool = False) -> None ) return - with log_context( - _logger, logging.DEBUG, f"{self.__class__}: removing {identifier}" - ), log_catch(_logger, reraise=reraise): + with ( + log_context( + _logger, logging.DEBUG, f"{self.__class__}: removing {identifier}" + ), + log_catch(_logger, reraise=reraise), + ): await self._destroy(identifier, cleanup_context) await self._redis_client_sdk.redis.delete(self._to_redis_key(identifier)) diff --git a/services/director-v2/tests/unit/test_utils_distributed_identifier.py b/services/director-v2/tests/unit/test_utils_distributed_identifier.py index 88b143612af..c7ad46b74a9 100644 --- a/services/director-v2/tests/unit/test_utils_distributed_identifier.py +++ b/services/director-v2/tests/unit/test_utils_distributed_identifier.py @@ -176,7 +176,6 @@ async def redis_client_sdk( client = RedisClientSDK(redis_resources_dns, client_name="pytest") assert client assert client.redis_dsn == redis_resources_dns - await client.setup() # cleanup, previous run's leftovers await client.redis.flushall() diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/conftest.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/conftest.py index 8f1c2898222..0804a848d35 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/conftest.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/conftest.py @@ -28,6 +28,7 @@ def mock_env( postgres_host_config: dict[str, str], rabbit_service: RabbitSettings, redis_service: RedisSettings, + rabbit_env_vars_dict: EnvVarsDict, ) -> EnvVarsDict: return mock_env | setenvs_from_dict( monkeypatch, @@ -35,7 +36,8 @@ def mock_env( | { "COMPUTATIONAL_BACKEND_ENABLED": True, "COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED": True, - }, + } + | rabbit_env_vars_dict, ) diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py index 8a66e543ed1..608204a3c73 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py @@ -12,6 +12,7 @@ from unittest import mock import pytest +from tenacity import retry, stop_after_delay, wait_fixed from _helpers import PublishedProject from fastapi import FastAPI from pytest_mock import MockerFixture @@ -25,6 +26,7 @@ from 
simcore_service_director_v2.modules.comp_scheduler._worker import (
     _get_scheduler_worker,
 )
+from settings_library.rabbit import RabbitSettings

 pytest_simcore_core_services_selection = ["postgres", "rabbit", "redis"]
 pytest_simcore_ops_services_selection = ["adminer"]
@@ -93,11 +95,13 @@ def with_scheduling_concurrency(
     )


 @pytest.mark.parametrize("scheduling_concurrency", [1, 50, 100])
 @pytest.mark.parametrize(
     "queue_name", [SchedulePipelineRabbitMessage.get_channel_name()]
 )
 async def test_worker_scheduling_parallelism(
+    rabbit_service: RabbitSettings,
+    ensure_parametrized_queue_is_empty: None,
     scheduling_concurrency: int,
     with_scheduling_concurrency: EnvVarsDict,
     with_disabled_auto_scheduling: mock.Mock,
@@ -105,7 +109,6 @@ async def test_worker_scheduling_parallelism(
     initialized_app: FastAPI,
     publish_project: Callable[[], Awaitable[PublishedProject]],
     run_metadata: RunMetadataDict,
-    ensure_parametrized_queue_is_empty: None,
 ):
     with_disabled_auto_scheduling.assert_called_once()
@@ -125,8 +128,15 @@ async def _project_pipeline_creation_workflow() -> None:
             use_on_demand_clusters=False,
         )

+    # whatever scheduling concurrency we use here, we shall always see the same number of calls to the scheduler
     await asyncio.gather(
         *(_project_pipeline_creation_workflow() for _ in range(scheduling_concurrency))
     )

+    # the call to run the pipeline is async so we need to wait here
     mocked_scheduler_api.assert_called()
-    assert mocked_scheduler_api.call_count == scheduling_concurrency
+
+    @retry(stop=stop_after_delay(5), reraise=True, wait=wait_fixed(0.5))
+    def _assert_expected_called():
+        assert mocked_scheduler_api.call_count == scheduling_concurrency
+
+    _assert_expected_called()
diff --git a/services/director/src/simcore_service_director/registry_proxy.py b/services/director/src/simcore_service_director/registry_proxy.py
index a1adfe977a0..b0df79d6c4d 100644
--- a/services/director/src/simcore_service_director/registry_proxy.py
+++ b/services/director/src/simcore_service_director/registry_proxy.py
@@ -10,7 +10,8 @@
 import httpx
 from aiocache import Cache, SimpleMemoryCache  # type: ignore[import-untyped]
 from fastapi import FastAPI, status
-from servicelib.background_task import start_periodic_task, stop_periodic_task
+from servicelib.async_utils import cancel_wait_task
+from servicelib.background_task import create_periodic_task
 from servicelib.logging_utils import log_catch, log_context
 from servicelib.utils import limited_as_completed
 from tenacity import retry
@@ -247,7 +248,7 @@ async def on_startup() -> None:
         app_settings = get_application_settings(app)
         app.state.auto_cache_task = None
         if app_settings.DIRECTOR_REGISTRY_CACHING:
-            app.state.auto_cache_task = start_periodic_task(
+            app.state.auto_cache_task = create_periodic_task(
                 _list_all_services_task,
                 interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2,
                 task_name="director-auto-cache-task",
@@ -256,7 +257,7 @@ async def on_shutdown() -> None:
         if app.state.auto_cache_task:
-            await stop_periodic_task(app.state.auto_cache_task)
+            await cancel_wait_task(app.state.auto_cache_task)

     app.add_event_handler("startup", on_startup)
     app.add_event_handler("shutdown", on_shutdown)
@@ -594,7 +595,6 @@ async def get_service_extras(
         # discrete resources (custom made ones) ---
         # check if the service requires GPU support
         if not invalid_with_msg and _validate_kind(entry, "VRAM"):
-
             result["node_requirements"]["GPU"] = 1
         if not invalid_with_msg and _validate_kind(entry, "MPI"):
result["node_requirements"]["MPI"] = 1 diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py index c404535c1e4..750b0dbdc63 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py @@ -1,3 +1,4 @@ +import asyncio import logging from datetime import timedelta from functools import cached_property @@ -7,8 +8,8 @@ from fastapi import FastAPI from models_library.projects_nodes_io import NodeID from pydantic import NonNegativeFloat, NonNegativeInt -from servicelib.background_task import stop_periodic_task -from servicelib.redis import start_exclusive_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task_utils import exclusive_periodic from servicelib.utils import limited_gather from settings_library.redis import RedisDatabase @@ -133,14 +134,19 @@ async def _worker_check_services_require_status_update(self) -> None: ) async def setup(self) -> None: - self.app.state.status_monitor_background_task = start_exclusive_periodic_task( + @exclusive_periodic( get_redis_client(self.app, RedisDatabase.LOCKS), - self._worker_check_services_require_status_update, - task_period=_INTERVAL_BETWEEN_CHECKS, + task_interval=_INTERVAL_BETWEEN_CHECKS, retry_after=_INTERVAL_BETWEEN_CHECKS, - task_name="periodic_service_status_update", + ) + async def _periodic_check_services_require_status_update() -> None: + await self._worker_check_services_require_status_update() + + self.app.state.status_monitor_background_task = asyncio.create_task( + _periodic_check_services_require_status_update(), + name="periodic_service_status_update", ) async def shutdown(self) -> None: if getattr(self.app.state, "status_monitor_background_task", None): - await stop_periodic_task(self.app.state.status_monitor_background_task) + await cancel_wait_task(self.app.state.status_monitor_background_task) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py index adf046dd468..9a669aacc6b 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py @@ -12,7 +12,8 @@ from models_library.rabbitmq_messages import ProgressType from pydantic import PositiveFloat from servicelib import progress_bar -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.logging_utils import log_catch, log_context from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB @@ -194,7 +195,7 @@ def set_all_ports_for_upload(self) -> None: self._schedule_all_ports_for_upload = True async def start(self) -> None: - self._task_scheduler_worker = start_periodic_task( + self._task_scheduler_worker = create_periodic_task( self._scheduler_worker, interval=timedelta(seconds=self.task_monitor_interval_s), task_name="outputs_manager_scheduler_worker", @@ -204,8 +205,8 @@ async def shutdown(self) -> None: with log_context(_logger, logging.INFO, f"{OutputsManager.__name__} shutdown"): 
await self._uploading_task_cancel() if self._task_scheduler_worker is not None: - await stop_periodic_task( - self._task_scheduler_worker, timeout=self.task_monitor_interval_s + await cancel_wait_task( + self._task_scheduler_worker, max_delay=self.task_monitor_interval_s ) async def port_key_content_changed(self, port_key: str) -> None: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/prometheus_metrics.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/prometheus_metrics.py index 5a4d09ca71d..df5ae853d24 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/prometheus_metrics.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/prometheus_metrics.py @@ -9,7 +9,7 @@ from fastapi import FastAPI, status from models_library.callbacks_mapping import CallbacksMapping, UserServiceCommand from pydantic import BaseModel, NonNegativeFloat, NonNegativeInt -from servicelib.background_task import cancel_task +from servicelib.async_utils import cancel_wait_task from servicelib.logging_utils import log_context from servicelib.sequences_utils import pairwise from simcore_service_dynamic_sidecar.core.errors import ( @@ -143,8 +143,8 @@ async def start(self) -> None: async def stop(self) -> None: with log_context(_logger, logging.INFO, "shutdown service metrics recovery"): if self._metrics_recovery_task: - await cancel_task( - self._metrics_recovery_task, timeout=_TASK_CANCELLATION_TIMEOUT_S + await cancel_wait_task( + self._metrics_recovery_task, max_delay=_TASK_CANCELLATION_TIMEOUT_S ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py index 6da3aa3f00c..eecbfd2089e 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py @@ -14,7 +14,8 @@ from models_library.services import ServiceType from models_library.services_creation import CreateServiceMetricsAdditionalParams from pydantic import NonNegativeFloat -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.logging_utils import log_context from ...core.docker_utils import ( @@ -49,7 +50,7 @@ async def _start_heart_beat_task(app: FastAPI) -> None: raise RuntimeError(msg) with log_context(_logger, logging.DEBUG, "starting heart beat task"): - resource_tracking.heart_beat_task = start_periodic_task( + resource_tracking.heart_beat_task = create_periodic_task( _heart_beat_task, app=app, interval=resource_tracking_settings.RESOURCE_TRACKING_HEARTBEAT_INTERVAL, @@ -61,8 +62,8 @@ async def _start_heart_beat_task(app: FastAPI) -> None: async def stop_heart_beat_task(app: FastAPI) -> None: resource_tracking: ResourceTrackingState = app.state.resource_tracking if resource_tracking.heart_beat_task: - await stop_periodic_task( - resource_tracking.heart_beat_task, timeout=_STOP_WORKER_TIMEOUT_S + await cancel_wait_task( + resource_tracking.heart_beat_task, max_delay=_STOP_WORKER_TIMEOUT_S ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py 
index f1f34e0a6e7..45f16614068 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py @@ -14,7 +14,8 @@ ) from models_library.projects_nodes_io import NodeID from models_library.users import UserID -from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task import create_periodic_task from servicelib.logging_utils import log_context from servicelib.utils import logged_gather @@ -120,7 +121,7 @@ def _replace_incoming_usage( @staticmethod def _get_grouped_usage_to_folder_names( - local_disk_usage: dict[str, DiskUsage] + local_disk_usage: dict[str, DiskUsage], ) -> dict[DiskUsage, set[str]]: """Groups all paths that have the same metrics together""" usage_to_folder_names: dict[DiskUsage, set[str]] = {} @@ -174,13 +175,13 @@ async def _monitor(self) -> None: self._last_usage = supported_usage async def setup(self) -> None: - self._monitor_task = start_periodic_task( + self._monitor_task = create_periodic_task( self._monitor, interval=self.interval, task_name="monitor_disk_usage" ) async def shutdown(self) -> None: if self._monitor_task: - await stop_periodic_task(self._monitor_task) + await cancel_wait_task(self._monitor_task) def set_disk_usage_for_path(self, overwrite_usage: dict[str, DiskUsage]) -> None: """ diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py index 8ac475a4742..d932efec51a 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py @@ -1,5 +1,5 @@ import logging -from datetime import datetime, timezone +from datetime import UTC, datetime from fastapi import FastAPI from models_library.projects import ProjectID @@ -29,7 +29,7 @@ async def removal_policy_task(app: FastAPI) -> None: assert app_settings # nosec efs_manager: EfsManager = app.state.efs_manager - base_start_timestamp = datetime.now(tz=timezone.utc) + base_start_timestamp = datetime.now(tz=UTC) efs_project_ids: list[ ProjectID @@ -60,9 +60,9 @@ async def removal_policy_task(app: FastAPI) -> None: logging.INFO, msg=f"Removing data for project {project_id} started, project last change date {_project_last_change_date}, efs removal policy task age limit timedelta {app_settings.EFS_REMOVAL_POLICY_TASK_AGE_LIMIT_TIMEDELTA}", ): - redis_lock = get_redis_lock_client(app).redis.lock( + redis_lock = get_redis_lock_client(app).create_lock( PROJECT_REDIS_LOCK_KEY.format(project_id), - timeout=PROJECT_LOCK_TIMEOUT.total_seconds(), + ttl=PROJECT_LOCK_TIMEOUT, ) async with lock_project( redis_lock, diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py index 58218a79cb1..2f42bc5d870 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py @@ -5,18 +5,24 @@ from typing import TypedDict from fastapi import FastAPI -from servicelib.background_task import stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from 
servicelib.background_task_utils import exclusive_periodic
 from servicelib.logging_utils import log_catch, log_context
-from servicelib.redis import start_exclusive_periodic_task
 
 from .background_tasks import removal_policy_task
 from .modules.redis import get_redis_lock_client
 
-_logger = logging.getLogger(__name__)
-_SEC = 1  # in s
-_MIN = 60 * _SEC  # in s
-_HOUR = 60 * _MIN  # in s
+@exclusive_periodic(
+    get_redis_lock_client,
+    task_interval=timedelta(hours=1),
+    retry_after=timedelta(minutes=5),
+)
+async def periodic_removal_policy_task(app: FastAPI) -> None:
+    await removal_policy_task(app)
+
+
+_logger = logging.getLogger(__name__)
 
 
 class EfsGuardianBackgroundTask(TypedDict):
@@ -26,7 +32,7 @@ class EfsGuardianBackgroundTask(TypedDict):
 
 _EFS_GUARDIAN_BACKGROUND_TASKS = [
     EfsGuardianBackgroundTask(
-        name="efs_removal_policy_task", task_func=removal_policy_task
+        name="efs_removal_policy_task", task_func=periodic_removal_policy_task
     )
 ]
 
@@ -41,15 +47,9 @@ async def _startup() -> None:
 
         # Setup periodic tasks
         for task in _EFS_GUARDIAN_BACKGROUND_TASKS:
-            exclusive_task = start_exclusive_periodic_task(
-                get_redis_lock_client(app),
-                task["task_func"],
-                task_period=timedelta(seconds=1 * _HOUR),
-                retry_after=timedelta(seconds=5 * _MIN),
-                task_name=task["name"],
-                app=app,
+            app.state.efs_guardian_background_tasks.append(
+                asyncio.create_task(task["task_func"](app), name=task["name"])
             )
-            app.state.efs_guardian_background_tasks.append(exclusive_task)
 
     return _startup
 
@@ -66,7 +66,7 @@ async def _stop() -> None:
             if _app.state.efs_guardian_background_tasks:
                 await asyncio.gather(
                     *[
-                        stop_periodic_task(task)
+                        cancel_wait_task(task)
                         for task in _app.state.efs_guardian_background_tasks
                     ]
                 )
diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py
index 4876e5b8b21..78d1462378a 100644
--- a/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py
+++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py
@@ -15,10 +15,9 @@ async def on_startup() -> None:
         app.state.redis_lock_client_sdk = None
         settings: RedisSettings = app.state.settings.EFS_GUARDIAN_REDIS
         redis_locks_dsn = settings.build_redis_dsn(RedisDatabase.LOCKS)
-        app.state.redis_lock_client_sdk = lock_client = RedisClientSDK(
+        app.state.redis_lock_client_sdk = RedisClientSDK(
             redis_locks_dsn, client_name=APP_NAME
         )
-        await lock_client.setup()
 
     async def on_shutdown() -> None:
         redis_lock_client_sdk: None | RedisClientSDK = app.state.redis_lock_client_sdk
diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py
index 4de25a56c03..77907bc51a5 100644
--- a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py
+++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py
@@ -1,3 +1,4 @@
+import datetime
 import logging
 
 from fastapi import FastAPI
@@ -8,6 +9,7 @@ from servicelib.rabbitmq.rpc_interfaces.dynamic_sidecar.disk_usage import (
     update_disk_usage,
 )
+from servicelib.redis._decorators import exclusive
 from servicelib.utils import fire_and_forget_task
 
 from ..core.settings import get_application_settings
@@ -77,13 +79,13 @@ async def process_dynamic_service_running_message(app: FastAPI, data: bytes) ->
         msg = f"Removing write permissions inside of EFS starts for project ID: 
{rabbit_message.project_id}, node ID: {rabbit_message.node_id}, current user: {rabbit_message.user_id}, size: {size}, upper limit: {settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES}" with log_context(_logger, logging.WARNING, msg=msg): redis = get_redis_lock_client(app) - async with redis.lock_context( - f"efs_remove_write_permissions-{rabbit_message.project_id=}-{rabbit_message.node_id=}", + await exclusive( + redis, + lock_key=f"efs_remove_write_permissions-{rabbit_message.project_id=}-{rabbit_message.node_id=}", blocking=True, - blocking_timeout_s=10, - ): - await efs_manager.remove_project_node_data_write_permissions( - project_id=rabbit_message.project_id, node_id=rabbit_message.node_id - ) + blocking_timeout=datetime.timedelta(seconds=10), + )(efs_manager.remove_project_node_data_write_permissions)( + project_id=rabbit_message.project_id, node_id=rabbit_message.node_id + ) return True diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py index 98a18522e9e..e295f8a03ea 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py @@ -126,7 +126,7 @@ async def _close_unhealthy_service( ) -async def periodic_check_of_running_services_task(app: FastAPI) -> None: +async def check_running_services(app: FastAPI) -> None: _logger.info("Periodic check started") # This check runs across all products diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py index c1f407a7765..abaefe1e9b7 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check_setup.py @@ -1,16 +1,15 @@ +import asyncio import logging from collections.abc import Awaitable, Callable from typing import TypedDict from fastapi import FastAPI -from servicelib.background_task import stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_catch, log_context -from servicelib.redis import start_exclusive_periodic_task from ..core.settings import ApplicationSettings -from .background_task_periodic_heartbeat_check import ( - periodic_check_of_running_services_task, -) +from .background_task_periodic_heartbeat_check import check_running_services from .modules.redis import get_redis_lock_client _logger = logging.getLogger(__name__) @@ -38,17 +37,19 @@ async def _startup() -> None: app.state.rut_background_task__periodic_check_of_running_services = None - # Setup periodic task - exclusive_task = start_exclusive_periodic_task( + @exclusive_periodic( get_redis_lock_client(app), - periodic_check_of_running_services_task, - task_period=app_settings.RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC, + 
task_interval=app_settings.RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC, retry_after=app_settings.RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_INTERVAL_SEC, - task_name=_TASK_NAME_PERIODICALY_CHECK_RUNNING_SERVICES, - app=app, ) + async def _periodic_check_running_services() -> None: + await check_running_services(app) + app.state.rut_background_task__periodic_check_of_running_services = ( - exclusive_task + asyncio.create_task( + _periodic_check_running_services(), + name=_TASK_NAME_PERIODICALY_CHECK_RUNNING_SERVICES, + ) ) return _startup @@ -68,7 +69,7 @@ async def _stop() -> None: ): assert _app # nosec if _app.state.rut_background_task__periodic_check_of_running_services: - await stop_periodic_task( + await cancel_wait_task( _app.state.rut_background_task__periodic_check_of_running_services ) diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/redis.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/redis.py index be7724a5667..e2790b2a4e9 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/redis.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/redis.py @@ -21,10 +21,9 @@ async def on_startup() -> None: app.state.redis_client_sdk = None settings: RedisSettings = app.state.settings.RESOURCE_USAGE_TRACKER_REDIS redis_locks_dsn = settings.build_redis_dsn(RedisDatabase.LOCKS) - app.state.redis_client_sdk = client = RedisClientSDK( + app.state.redis_client_sdk = RedisClientSDK( redis_locks_dsn, client_name=APP_NAME ) - await client.setup() async def on_shutdown() -> None: with log_context( diff --git a/services/resource-usage-tracker/tests/unit/conftest.py b/services/resource-usage-tracker/tests/unit/conftest.py index 6e4bfcd6b98..1a6a864a447 100644 --- a/services/resource-usage-tracker/tests/unit/conftest.py +++ b/services/resource-usage-tracker/tests/unit/conftest.py @@ -202,12 +202,12 @@ def mocked_prometheus_with_query( @pytest.fixture def disabled_tracker_background_task(mocker: MockerFixture) -> dict[str, mock.Mock]: mocked_start = mocker.patch( - "simcore_service_resource_usage_tracker.modules.prometheus_containers.plugin.start_periodic_task", + "simcore_service_resource_usage_tracker.modules.prometheus_containers.plugin.create_periodic_task", autospec=True, ) mocked_stop = mocker.patch( - "simcore_service_resource_usage_tracker.modules.prometheus_containers.plugin.stop_periodic_task", + "simcore_service_resource_usage_tracker.modules.prometheus_containers.plugin.cancel_wait_task", autospec=True, ) return {"start_task": mocked_start, "stop_task": mocked_stop} diff --git a/services/resource-usage-tracker/tests/unit/with_dbs/test_background_task_periodic_heartbeat_check.py b/services/resource-usage-tracker/tests/unit/with_dbs/test_background_task_periodic_heartbeat_check.py index 8ebe34bbd2d..42249956a8e 100644 --- a/services/resource-usage-tracker/tests/unit/with_dbs/test_background_task_periodic_heartbeat_check.py +++ b/services/resource-usage-tracker/tests/unit/with_dbs/test_background_task_periodic_heartbeat_check.py @@ -1,5 +1,5 @@ from collections.abc import Callable, Iterator -from datetime import datetime, timedelta, timezone +from datetime import UTC, datetime, timedelta import pytest import sqlalchemy as sa @@ -21,7 +21,7 @@ ) from simcore_service_resource_usage_tracker.models.service_runs import ServiceRunDB from 
simcore_service_resource_usage_tracker.services.background_task_periodic_heartbeat_check import (
-    periodic_check_of_running_services_task,
+    check_running_services,
 )
 
 pytest_simcore_core_services_selection = ["postgres", "rabbit"]
@@ -33,8 +33,8 @@
 _SERVICE_RUN_ID_S4L_10_MIN_OLD = "2"
 _SERVICE_RUN_ID_OSPARC_NOW = "3"
 
-_LAST_HEARTBEAT_10_MIN_OLD = datetime.now(tz=timezone.utc) - timedelta(minutes=10)
-_LAST_HEARTBEAT_NOW = datetime.now(tz=timezone.utc)
+_LAST_HEARTBEAT_10_MIN_OLD = datetime.now(tz=UTC) - timedelta(minutes=10)
+_LAST_HEARTBEAT_NOW = datetime.now(tz=UTC)
 
 
 @pytest.fixture()
@@ -131,11 +131,11 @@ async def test_process_event_functions(
     app_settings: ApplicationSettings = initialized_app.state.settings
     for _ in range(app_settings.RESOURCE_USAGE_TRACKER_MISSED_HEARTBEAT_COUNTER_FAIL):
-        await periodic_check_of_running_services_task(initialized_app)
+        await check_running_services(initialized_app)
 
     # NOTE: since we check that the modified field is older than some
     # threshold, we need to artificially age this field in this test
     with postgres_db.connect() as con:
-        fake_old_modified_at = datetime.now(tz=timezone.utc) - timedelta(minutes=5)
+        fake_old_modified_at = datetime.now(tz=UTC) - timedelta(minutes=5)
         update_stmt = resource_tracker_service_runs.update().values(
             modified=fake_old_modified_at
         )
@@ -160,7 +160,7 @@ async def test_process_event_functions(
         assert service_run.service_run_status == ServiceRunStatus.RUNNING
 
     # Now we call the function one more time and it should consider some of the running services as unhealthy
-    await periodic_check_of_running_services_task(initialized_app)
+    await check_running_services(initialized_app)
 
     with postgres_db.connect() as con:
         result = con.execute(sa.select(resource_tracker_service_runs))
diff --git a/services/static-webserver/client/source/class/osparc/desktop/wallets/MembersList.js b/services/static-webserver/client/source/class/osparc/desktop/wallets/MembersList.js
index fe239b9f7ef..55177a4f264 100644
--- a/services/static-webserver/client/source/class/osparc/desktop/wallets/MembersList.js
+++ b/services/static-webserver/client/source/class/osparc/desktop/wallets/MembersList.js
@@ -106,7 +106,7 @@ qx.Class.define("osparc.desktop.wallets.MembersList", {
     },
 
     __createIntroText: function() {
-      const msg = this.tr("Only Accountants of an organization can share a wallet with the entire organization and members.");
+      const msg = this.tr("Only Accountants of an Organization can share a wallet with other users.");
       const intro = new qx.ui.basic.Label().set({
         value: msg,
         alignX: "left",
diff --git a/services/static-webserver/client/source/class/osparc/filter/SearchingCollaborators.js b/services/static-webserver/client/source/class/osparc/filter/SearchingCollaborators.js
new file mode 100644
index 00000000000..084dd9a8e1d
--- /dev/null
+++ b/services/static-webserver/client/source/class/osparc/filter/SearchingCollaborators.js
@@ -0,0 +1,24 @@
+/*
+ * oSPARC - The SIMCORE frontend - https://osparc.io
+ * Copyright: 2025 IT'IS Foundation - https://itis.swiss
+ * License: MIT - https://opensource.org/licenses/MIT
+ * Authors: Odei Maiz (odeimaiz)
+ */
+
+qx.Class.define("osparc.filter.SearchingCollaborators", {
+  extend: qx.ui.basic.Atom,
+
+  construct: function() {
+    this.base(arguments);
+
+    this.set({
+      label: this.tr("Searching..."),
+      icon: "@FontAwesome5Solid/circle-notch/14",
+      appearance: "tagbutton",
+      gap: 10,
+    });
+
+    this.getChildControl("icon").getContentElement().addClass("rotate");
this.getChildControl("label").setTextColor("text"); + }, +}); diff --git a/services/static-webserver/client/source/class/osparc/share/NewCollaboratorsManager.js b/services/static-webserver/client/source/class/osparc/share/NewCollaboratorsManager.js index 412fef60293..7f757b831a3 100644 --- a/services/static-webserver/client/source/class/osparc/share/NewCollaboratorsManager.js +++ b/services/static-webserver/client/source/class/osparc/share/NewCollaboratorsManager.js @@ -44,8 +44,9 @@ qx.Class.define("osparc.share.NewCollaboratorsManager", { __resourceData: null, __showOrganizations: null, __textFilter: null, - __searchButton: null, __collabButtonsContainer: null, + __searchingCollaborators: null, + __searchDelayer: null, __shareButton: null, __selectedCollaborators: null, __potentialCollaborators: null, @@ -72,17 +73,24 @@ qx.Class.define("osparc.share.NewCollaboratorsManager", { })); const filter = this.__textFilter = new osparc.filter.TextFilter("name", "collaboratorsManager"); filter.setCompact(true); - this.addListener("appear", () => filter.getChildControl("textfield").focus()); + const filterTextField = filter.getChildControl("textfield"); + filterTextField.setPlaceholder(this.tr("Search")); + this.addListener("appear", () => filterTextField.focus()); toolbar.add(filter, { flex: 1 }); - const searchButton = this.__searchButton = new osparc.ui.form.FetchButton(this.tr("Search"), "@FontAwesome5Solid/search/12").set({ - maxHeight: 30, + filterTextField.addListener("input", e => { + const filterValue = e.getData(); + if (this.__searchDelayer) { + clearTimeout(this.__searchDelayer); + } + if (filterValue.length > 3) { + const waitBeforeSearching = 1000; + this.__searchDelayer = setTimeout(() => { + this.__searchUsers(); + }, waitBeforeSearching); + } }); - const command = new qx.ui.command.Command("Enter"); - searchButton.setCommand(command); - searchButton.addListener("execute", () => this.__searchUsers(), this); - toolbar.add(searchButton); this.add(toolbar); const collabButtonsContainer = this.__collabButtonsContainer = new qx.ui.container.Composite(new qx.ui.layout.VBox()); @@ -92,6 +100,10 @@ qx.Class.define("osparc.share.NewCollaboratorsManager", { flex: 1 }); + const searchingCollaborators = this.__searchingCollaborators = new osparc.filter.SearchingCollaborators(); + searchingCollaborators.exclude(); + this.__collabButtonsContainer.add(searchingCollaborators); + const buttons = new qx.ui.container.Composite(new qx.ui.layout.HBox().set({ alignX: "right" })); @@ -105,8 +117,8 @@ qx.Class.define("osparc.share.NewCollaboratorsManager", { }, __searchUsers: function() { + this.__searchingCollaborators.show(); const text = this.__textFilter.getChildControl("textfield").getValue(); - this.__searchButton.setFetching(true); osparc.store.Users.getInstance().searchUsers(text) .then(users => { users.forEach(user => user["collabType"] = 2); @@ -116,7 +128,7 @@ qx.Class.define("osparc.share.NewCollaboratorsManager", { console.error(err); osparc.FlashMessenger.getInstance().logAs(err.message, "ERROR"); }) - .finally(() => this.__searchButton.setFetching(false)); + .finally(() => this.__searchingCollaborators.exclude()); }, __showProductEveryone: function() { @@ -208,6 +220,10 @@ qx.Class.define("osparc.share.NewCollaboratorsManager", { } this.__collabButtonsContainer.add(this.__collaboratorButton(potentialCollaborator)); }); + + // move it to last position + this.__collabButtonsContainer.remove(this.__searchingCollaborators); + this.__collabButtonsContainer.add(this.__searchingCollaborators); }, 
__shareClicked: function() { diff --git a/services/static-webserver/client/source/class/osparc/ui/hint/Hint.js b/services/static-webserver/client/source/class/osparc/ui/hint/Hint.js index d3f5d2d21d1..cc3bc8f7333 100644 --- a/services/static-webserver/client/source/class/osparc/ui/hint/Hint.js +++ b/services/static-webserver/client/source/class/osparc/ui/hint/Hint.js @@ -71,6 +71,8 @@ qx.Class.define("osparc.ui.hint.Hint", { setText: function(text) { this.getChildControl("label").setValue(text); + // After setting the text, the dimensions of the Hint changed: recenter it + setTimeout(() => this.updatePosition(), 10); } } }); diff --git a/services/storage/src/simcore_service_storage/dsm_cleaner.py b/services/storage/src/simcore_service_storage/dsm_cleaner.py index 67d859a4032..fe3fcf897ea 100644 --- a/services/storage/src/simcore_service_storage/dsm_cleaner.py +++ b/services/storage/src/simcore_service_storage/dsm_cleaner.py @@ -24,9 +24,9 @@ from typing import cast from aiohttp import web -from servicelib.background_task import stop_periodic_task +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task_utils import exclusive_periodic from servicelib.logging_utils import log_catch, log_context -from servicelib.redis import start_exclusive_periodic_task from .constants import APP_CONFIG_KEY, APP_DSM_KEY from .dsm_factory import DataManagerProvider @@ -45,14 +45,7 @@ async def dsm_cleaner_task(app: web.Application) -> None: simcore_s3_dsm: SimcoreS3DataManager = cast( SimcoreS3DataManager, dsm.get(SimcoreS3DataManager.get_location_id()) ) - try: - await simcore_s3_dsm.clean_expired_uploads() - - except asyncio.CancelledError: # noqa: PERF203 - _logger.info("cancelled dsm cleaner task") - raise - except Exception: # pylint: disable=broad-except - _logger.exception("Unhandled error in dsm cleaner task, restarting task...") + await simcore_s3_dsm.clean_expired_uploads() def setup_dsm_cleaner(app: web.Application): @@ -64,17 +57,19 @@ async def _setup(app: web.Application): cfg: Settings = app[APP_CONFIG_KEY] assert cfg.STORAGE_CLEANER_INTERVAL_S # nosec - storage_background_task = start_exclusive_periodic_task( + @exclusive_periodic( get_redis_client(app), - dsm_cleaner_task, - task_period=timedelta(seconds=cfg.STORAGE_CLEANER_INTERVAL_S), + task_interval=timedelta(seconds=cfg.STORAGE_CLEANER_INTERVAL_S), retry_after=timedelta(minutes=5), - task_name=_TASK_NAME_PERIODICALY_CLEAN_DSM, - app=app, ) + async def _periodic_dsm_clean() -> None: + await dsm_cleaner_task(app) + storage_background_task = asyncio.create_task( + _periodic_dsm_clean(), name=_TASK_NAME_PERIODICALY_CLEAN_DSM + ) yield - await stop_periodic_task(storage_background_task) + await cancel_wait_task(storage_background_task) app.cleanup_ctx.append(_setup) diff --git a/services/storage/src/simcore_service_storage/redis.py b/services/storage/src/simcore_service_storage/redis.py index f18f891ec19..9bf600e3907 100644 --- a/services/storage/src/simcore_service_storage/redis.py +++ b/services/storage/src/simcore_service_storage/redis.py @@ -24,7 +24,6 @@ async def _setup(app: web.Application): app[_APP_REDIS_KEY] = client = RedisClientSDK( redis_locks_dsn, client_name=APP_NAME ) - await client.setup() yield diff --git a/services/storage/tests/unit/test_dsm_cleaner.py b/services/storage/tests/unit/test_dsm_cleaner.py index 95b883ede7f..70134dbdb88 100644 --- a/services/storage/tests/unit/test_dsm_cleaner.py +++ b/services/storage/tests/unit/test_dsm_cleaner.py @@ -37,9 +37,7 @@ def 
short_dsm_cleaner_interval(monkeypatch: pytest.MonkeyPatch) -> int:
 async def test_setup_dsm_cleaner(client: TestClient):
     all_tasks = asyncio.all_tasks()
     assert any(
-        t.get_name().startswith(
-            f"exclusive_task_starter_{_TASK_NAME_PERIODICALY_CLEAN_DSM}"
-        )
+        t.get_name().startswith(f"{_TASK_NAME_PERIODICALY_CLEAN_DSM}")
         for t in all_tasks
     )
 
@@ -47,9 +45,7 @@ async def test_disable_dsm_cleaner(disable_dsm_cleaner, client: TestClient):
     all_tasks = asyncio.all_tasks()
     assert not any(
-        t.get_name().startswith(
-            f"exclusive_task_starter_{_TASK_NAME_PERIODICALY_CLEAN_DSM}"
-        )
+        t.get_name().startswith(f"{_TASK_NAME_PERIODICALY_CLEAN_DSM}")
         for t in all_tasks
     )
diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py
index 3edd4c50e39..d4507c565e4 100644
--- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py
+++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py
@@ -1,10 +1,10 @@
-""" Interface to other subsystems
+"""Interface to other subsystems
 
-    - Data validation
-    - Operations on projects
-        - are NOT handlers, therefore do not return web.Response
-        - return data and successful HTTP responses (or raise them)
-        - upon failure raise errors that can be also HTTP reponses
+- Data validation
+- Operations on projects
+    - are NOT handlers, therefore do not return web.Response
+    - return data and successful HTTP responses (or raise them)
+    - upon failure raise errors that can also be HTTP responses
 """
 
 import asyncio
@@ -53,7 +53,7 @@
     PricingAndHardwareInfoTuple,
     PricingInfo,
 )
-from models_library.services import DynamicServiceKey, ServiceKey, ServiceVersion
+from models_library.services import ServiceKey, ServiceVersion
 from models_library.services_resources import (
     DEFAULT_SINGLE_SERVICE_NAME,
     ServiceResourcesDict,
@@ -81,6 +81,7 @@
     ServiceWaitingForManualInterventionError,
     ServiceWasNotFoundError,
 )
+from servicelib.redis._decorators import exclusive
 from servicelib.utils import fire_and_forget_task, logged_gather
 from simcore_postgres_database.models.users import UserRole
 from simcore_postgres_database.utils_projects_nodes import (
@@ -88,7 +89,6 @@
     ProjectNodesNodeNotFoundError,
 )
 from simcore_postgres_database.webserver_models import ProjectType
-from simcore_service_webserver.projects._db_utils import PermissionStr
 
 from ..application_settings import get_application_settings
 from ..catalog import client as catalog_client
@@ -126,6 +126,7 @@
     check_user_project_permission,
     has_user_project_access_rights,
 )
+from ._db_utils import PermissionStr
 from ._nodes_utils import set_reservation_same_as_limit, validate_new_service_resources
 from ._wallets_api import connect_wallet_to_project, get_project_wallet
 from .db import APP_PROJECT_DBAPI, ProjectDBAPI
@@ -488,11 +489,7 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool:
     except KeyError as exc:
         raise InvalidKeysInResourcesSpecsError(missing_key=f"{exc}") from exc
 
-    except (
-        RemoteMethodNotRegisteredError,
-        RPCServerError,
-        asyncio.TimeoutError,
-    ) as exc:
+    except (TimeoutError, RemoteMethodNotRegisteredError, RPCServerError) as exc:
         raise ClustersKeeperNotAvailableError from exc
 
 
@@ -503,7 +500,6 @@ async def _check_project_node_has_all_required_inputs(
     project_uuid: ProjectID,
     node_id: NodeID,
 ) -> None:
-
     product_name = await db.get_project_product(project_uuid)
     await check_user_project_permission(
         app,
@@ -560,7 +556,7 @@ 
def _check_required_input(required_input_key: KeyIDStr) -> None: ) -async def _start_dynamic_service( +async def _start_dynamic_service( # noqa: C901 request: web.Request, *, service_key: ServiceKey, @@ -599,17 +595,17 @@ async def _start_dynamic_service( request.app, project_id=project_uuid, user_id=user_id, permission="write" ) - lock_key = _nodes_api.get_service_start_lock_key(user_id, project_uuid) - redis_client_sdk = get_redis_lock_manager_client_sdk(request.app) - project_settings: ProjectsSettings = get_plugin_settings(request.app) - - async with redis_client_sdk.lock_context( - lock_key, + @exclusive( + get_redis_lock_manager_client_sdk(request.app), + lock_key=_nodes_api.get_service_start_lock_key(user_id, project_uuid), blocking=True, - blocking_timeout_s=_nodes_api.get_total_project_dynamic_nodes_creation_interval( - project_settings.PROJECTS_MAX_NUM_RUNNING_DYNAMIC_NODES + blocking_timeout=datetime.timedelta( + seconds=_nodes_api.get_total_project_dynamic_nodes_creation_interval( + get_plugin_settings(request.app).PROJECTS_MAX_NUM_RUNNING_DYNAMIC_NODES + ) ), - ): + ) + async def _() -> None: project_running_nodes = await dynamic_scheduler_api.list_dynamic_services( request.app, user_id=user_id, project_id=project_uuid ) @@ -742,7 +738,7 @@ async def _start_dynamic_service( can_save=save_state, project_id=project_uuid, user_id=user_id, - service_key=DynamicServiceKey(service_key), + service_key=service_key, service_version=service_version, service_uuid=node_uuid, request_dns=extract_dns_without_default_port(request.url), @@ -759,6 +755,8 @@ async def _start_dynamic_service( ), ) + await _() + async def add_project_node( request: web.Request,
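
The recurring refactor across director, director-v2, dynamic-sidecar and storage replaces the `start_periodic_task`/`stop_periodic_task` pair with `create_periodic_task` from `servicelib.background_task` plus `cancel_wait_task` from `servicelib.async_utils`, renaming the `timeout` keyword to `max_delay`. A minimal sketch of the new lifecycle, assuming the signatures used in the hunks above; `_tick` and the one-second interval are hypothetical:

```python
import asyncio
from datetime import timedelta

from servicelib.async_utils import cancel_wait_task
from servicelib.background_task import create_periodic_task


async def _tick() -> None:
    # hypothetical periodic job
    print("tick")


async def main() -> None:
    # schedule _tick to run every second; returns a regular asyncio.Task
    task = create_periodic_task(
        _tick, interval=timedelta(seconds=1), task_name="example-tick"
    )
    await asyncio.sleep(3)
    # cancel the task and wait at most 5 seconds for it to finish
    await cancel_wait_task(task, max_delay=5)


asyncio.run(main())
```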
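The `start_exclusive_periodic_task` helper disappears the same way: call sites in dsm_cleaner, efs-guardian, the resource-usage-tracker and the status monitor now decorate a local coroutine with `@exclusive_periodic` from `servicelib.background_task_utils` and schedule it themselves with `asyncio.create_task`, so the task name and the Redis-backed exclusivity are configured separately. A sketch under the same assumptions; `redis_client` stands in for the app's `RedisClientSDK` on the LOCKS database and the job body is hypothetical:

```python
import asyncio
from datetime import timedelta

from servicelib.async_utils import cancel_wait_task
from servicelib.background_task_utils import exclusive_periodic
from servicelib.redis import RedisClientSDK


def start_background_job(redis_client: RedisClientSDK) -> asyncio.Task:
    @exclusive_periodic(
        redis_client,  # a distributed lock ensures a single runner per period
        task_interval=timedelta(hours=1),
        retry_after=timedelta(minutes=5),  # retry acquiring the lock after 5 min
    )
    async def _job() -> None:
        ...  # hypothetical periodic work

    return asyncio.create_task(_job(), name="example_exclusive_job")


async def stop_background_job(task: asyncio.Task) -> None:
    # shutdown mirrors the non-exclusive case
    await cancel_wait_task(task)
```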
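Likewise, the `lock_context` context manager is replaced in `process_messages.py` and `projects_api.py` by the `exclusive` decorator, whose `blocking_timeout` takes a `datetime.timedelta` instead of the old `blocking_timeout_s` seconds. A hedged sketch of the call shape, with the client and the protected coroutine as placeholders:

```python
import datetime

from servicelib.redis import RedisClientSDK
from servicelib.redis._decorators import exclusive


async def run_exclusively(redis_client: RedisClientSDK) -> None:
    async def _critical_section() -> None:
        ...  # work that must not run concurrently across replicas

    # exclusive(...) returns a decorator; applying it and calling the result
    # runs the coroutine while holding the distributed lock
    await exclusive(
        redis_client,
        lock_key="example-lock-key",
        blocking=True,
        blocking_timeout=datetime.timedelta(seconds=10),
    )(_critical_section)()
```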
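On the same theme, efs-guardian's `removal_policy_task` no longer builds a raw `redis.lock(..., timeout=<seconds>)` but asks the SDK for one via `create_lock(..., ttl=<timedelta>)`, keeping durations typed end to end. A small sketch; the key format and TTL are illustrative placeholders:

```python
from datetime import timedelta

from servicelib.redis import RedisClientSDK


def make_project_lock(redis_client_sdk: RedisClientSDK, project_id: str):
    # ttl is a timedelta handled by the SDK; no manual total_seconds() needed
    return redis_client_sdk.create_lock(
        f"example_project_lock:{project_id}", ttl=timedelta(seconds=30)
    )
```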
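Finally, since `test_worker_scheduling_parallelism` now dispatches pipeline runs asynchronously over RabbitMQ, the exact call count is only eventually correct, so the test polls the mock with tenacity rather than asserting immediately. The same pattern isolated as a reusable helper; the names and the 5s/0.5s budget mirror the test but are otherwise illustrative:

```python
from unittest import mock

from tenacity import retry, stop_after_delay, wait_fixed


def assert_eventually_called_n_times(mocked: mock.Mock, expected: int) -> None:
    # re-run the assertion every 0.5s for up to 5s before failing for good
    @retry(stop=stop_after_delay(5), reraise=True, wait=wait_fixed(0.5))
    def _check() -> None:
        assert mocked.call_count == expected

    _check()
```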