Commit

Merge remote-tracking branch 'upstream/master' into pr-osparc-ooil-extensions

Andrei Neagu committed Jan 15, 2025
2 parents 0d2cb28 + a63e747 commit 2348feb
Showing 72 changed files with 1,384 additions and 1,274 deletions.
12 changes: 5 additions & 7 deletions packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
@@ -6,7 +6,6 @@
 import asyncio
 import logging
 from collections.abc import AsyncIterator, Awaitable, Callable
-from contextlib import suppress
 
 import aio_pika
 import pytest
@@ -141,12 +140,11 @@ async def ensure_parametrized_queue_is_empty(
     rabbitmq_client = create_rabbitmq_client("pytest-purger")
 
     async def _queue_messages_purger() -> None:
-        with suppress(aio_pika.exceptions.ChannelClosed):
-            assert rabbitmq_client._channel_pool  # noqa: SLF001
-            async with rabbitmq_client._channel_pool.acquire() as channel:  # noqa: SLF001
-                assert isinstance(channel, aio_pika.RobustChannel)
-                queue = await channel.get_queue(queue_name)
-                await queue.purge()
+        assert rabbitmq_client._channel_pool  # noqa: SLF001
+        async with rabbitmq_client._channel_pool.acquire() as channel:  # noqa: SLF001
+            assert isinstance(channel, aio_pika.RobustChannel)
+            queue = await channel.get_queue(queue_name)
+            await queue.purge()
 
     await _queue_messages_purger()
     yield
5 changes: 3 additions & 2 deletions packages/pytest-simcore/src/pytest_simcore/redis_service.py
@@ -10,7 +10,6 @@
 import tenacity
 from pytest_mock import MockerFixture
 from redis.asyncio import Redis, from_url
-from servicelib.redis import _constants as redis_constants
 from settings_library.basic_types import PortInt
 from settings_library.redis import RedisDatabase, RedisSettings
 from tenacity.before_sleep import before_sleep_log
@@ -119,4 +118,6 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None:
 @pytest.fixture
 def mock_redis_socket_timeout(mocker: MockerFixture) -> None:
     # lowered to allow CI to properly shutdown RedisClientSDK instances
-    mocker.patch.object(redis_constants, "DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=1))
+    mocker.patch(
+        "servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=0.25)
+    )
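
Note: the updated fixture patches the constant by its dotted import path instead of going through the removed servicelib.redis._constants module. A minimal sketch of the pattern (hypothetical test, not part of the diff; it assumes servicelib.redis._client defines DEFAULT_SOCKET_TIMEOUT and reads it when a client is created):

    from datetime import timedelta

    from pytest_mock import MockerFixture


    def test_with_short_socket_timeout(mocker: MockerFixture) -> None:
        # patch the name in the module that looks it up; pytest-mock
        # restores the original value automatically after the test
        mocker.patch(
            "servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT",
            timedelta(seconds=0.25),
        )
        # ... exercise code that creates RedisClientSDK instances here ...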
64 changes: 54 additions & 10 deletions packages/service-library/src/servicelib/async_utils.py
@@ -1,15 +1,22 @@
 import asyncio
+import contextlib
+import datetime
 import logging
 from collections import deque
+from collections.abc import Awaitable, Callable, Coroutine
+from contextlib import suppress
 from dataclasses import dataclass
 from functools import wraps
-from typing import TYPE_CHECKING, Any, Awaitable, Callable, Deque
+from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar
 
 from . import tracing
 from .utils_profiling_middleware import dont_profile, is_profiling, profile_context
 
-logger = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
+
+P = ParamSpec("P")
+R = TypeVar("R")
 
 
 if TYPE_CHECKING:
     Queue = asyncio.Queue
@@ -54,7 +61,7 @@ async def _safe_cancel(context: Context) -> None:
         await context.task
     except RuntimeError as e:
         if "Event loop is closed" in f"{e}":
-            logger.warning("event loop is closed and could not cancel %s", context)
+            _logger.warning("event loop is closed and could not cancel %s", context)
         else:
             raise
 
@@ -65,7 +72,7 @@ async def cancel_sequential_workers() -> None:
         await _safe_cancel(context)
 
     _sequential_jobs_contexts.clear()
-    logger.info("All run_sequentially_in_context pending workers stopped")
+    _logger.info("All run_sequentially_in_context pending workers stopped")
 
 
 # NOTE: If you get funny mismatches with mypy in returned values it might be due to this decorator.
@@ -118,25 +125,25 @@ def _get_context(args: Any, kwargs: dict) -> Context:
             arg_names = decorated_function.__code__.co_varnames[
                 : decorated_function.__code__.co_argcount
             ]
-            search_args = dict(zip(arg_names, args))
+            search_args = dict(zip(arg_names, args, strict=False))
             search_args.update(kwargs)
 
-            key_parts: Deque[str] = deque()
+            key_parts: deque[str] = deque()
             for arg in target_args:
                 sub_args = arg.split(".")
                 main_arg = sub_args[0]
                 if main_arg not in search_args:
-                    raise ValueError(
+                    msg = (
                         f"Expected '{main_arg}' in '{decorated_function.__name__}'"
                         f" arguments. Got '{search_args}'"
                     )
+                    raise ValueError(msg)
                 context_key = search_args[main_arg]
                 for attribute in sub_args[1:]:
                     potential_key = getattr(context_key, attribute)
                     if not potential_key:
-                        raise ValueError(
-                            f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments."
-                        )
+                        msg = f"Expected '{attribute}' attribute in '{context_key.__name__}' arguments."
+                        raise ValueError(msg)
                     context_key = potential_key
 
             key_parts.append(f"{decorated_function.__name__}_{context_key}")
@@ -205,3 +212,40 @@ async def worker(in_q: Queue[QueueElement], out_q: Queue) -> None:
         return wrapper
 
     return decorator
+
+
+def delayed_start(
+    delay: datetime.timedelta,
+) -> Callable[
+    [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
+]:
+    def _decorator(
+        func: Callable[P, Coroutine[Any, Any, R]],
+    ) -> Callable[P, Coroutine[Any, Any, R]]:
+        @wraps(func)
+        async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+            await asyncio.sleep(delay.total_seconds())
+            return await func(*args, **kwargs)
+
+        return _wrapper
+
+    return _decorator
+
+
+async def cancel_wait_task(
+    task: asyncio.Task,
+    *,
+    max_delay: float | None = None,
+) -> None:
+    """Cancel an asyncio.Task and wait for it to finish.
+
+    :param task: task to be canceled
+    :param max_delay: duration (in seconds) to wait before giving
+        up the cancellation. If None it waits forever.
+    :raises TimeoutError: raised if the task cannot be cancelled.
+    """
+
+    task.cancel()
+    async with asyncio.timeout(max_delay):
+        with contextlib.suppress(asyncio.CancelledError):
+            await task
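
For orientation, a minimal usage sketch of the two new helpers (hypothetical example, not part of the diff; asyncio.timeout requires Python 3.11+):

    import asyncio
    import datetime

    from servicelib.async_utils import cancel_wait_task, delayed_start


    @delayed_start(datetime.timedelta(seconds=2))
    async def _heartbeat() -> None:
        # the first iteration only starts after the 2-second delay
        while True:
            print("alive")
            await asyncio.sleep(1)


    async def main() -> None:
        task = asyncio.create_task(_heartbeat(), name="heartbeat")
        await asyncio.sleep(5)
        # cancel and wait at most 3 seconds; raises TimeoutError if the
        # task keeps ignoring the cancellation
        await cancel_wait_task(task, max_delay=3)


    asyncio.run(main())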
176 changes: 80 additions & 96 deletions packages/service-library/src/servicelib/background_task.py
@@ -1,136 +1,115 @@
 import asyncio
 import contextlib
 import datetime
+import functools
 import logging
-from collections.abc import AsyncIterator, Awaitable, Callable
-from typing import Final
+from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
+from typing import Any, Final, ParamSpec, TypeVar
 
-from common_library.errors_classes import OsparcErrorMixin
-from tenacity import TryAgain
-from tenacity.asyncio import AsyncRetrying
-from tenacity.stop import stop_after_attempt
+from tenacity import TryAgain, before_sleep_log, retry, retry_if_exception_type
 from tenacity.wait import wait_fixed
 
-from .decorators import async_delayed
-from .logging_utils import log_catch, log_context
+from .async_utils import cancel_wait_task, delayed_start
+from .logging_utils import log_context
 
 _logger = logging.getLogger(__name__)
 
 
 _DEFAULT_STOP_TIMEOUT_S: Final[int] = 5
-_MAX_TASK_CANCELLATION_ATTEMPTS: Final[int] = 3
 
 
-class PeriodicTaskCancellationError(OsparcErrorMixin, Exception):
-    msg_template: str = "Could not cancel task '{task_name}'"
-
-
 class SleepUsingAsyncioEvent:
-    """Sleep strategy that waits on an event to be set."""
+    """Sleep strategy that waits on an event to be set or sleeps."""
 
     def __init__(self, event: "asyncio.Event") -> None:
         self.event = event
 
-    async def __call__(self, timeout: float | None) -> None:
+    async def __call__(self, delay: float | None) -> None:
         with contextlib.suppress(TimeoutError):
-            await asyncio.wait_for(self.event.wait(), timeout=timeout)
+            await asyncio.wait_for(self.event.wait(), timeout=delay)
             self.event.clear()
 
 
-async def _periodic_scheduled_task(
-    task: Callable[..., Awaitable[None]],
+P = ParamSpec("P")
+R = TypeVar("R")
+
+
+def periodic(
     *,
     interval: datetime.timedelta,
-    task_name: str,
-    early_wake_up_event: asyncio.Event | None,
-    **task_kwargs,
-) -> None:
-    # NOTE: This retries forever unless cancelled
-    nap = (
-        asyncio.sleep
-        if early_wake_up_event is None
-        else SleepUsingAsyncioEvent(early_wake_up_event)
-    )
-    async for attempt in AsyncRetrying(
-        sleep=nap,
-        wait=wait_fixed(interval.total_seconds()),
-    ):
-        with attempt:
-            with log_context(
-                _logger,
-                logging.DEBUG,
-                msg=f"iteration {attempt.retry_state.attempt_number} of '{task_name}'",
-            ), log_catch(_logger):
-                await task(**task_kwargs)
+    raise_on_error: bool = False,
+    early_wake_up_event: asyncio.Event | None = None,
+) -> Callable[
+    [Callable[P, Coroutine[Any, Any, None]]], Callable[P, Coroutine[Any, Any, None]]
+]:
+    """Calls the function periodically with a given interval.
+
+    Arguments:
+        interval -- the interval between calls
+
+    Keyword Arguments:
+        raise_on_error -- If False the function will be retried indefinitely unless cancelled.
+            If True the function will be retried indefinitely unless cancelled
+            or an exception is raised. (default: {False})
+        early_wake_up_event -- allows waking the function up before the interval has passed. (default: {None})
+
+    Returns:
+        coroutine that will be called periodically (runs forever)
+    """
+
+    def _decorator(
+        func: Callable[P, Coroutine[Any, Any, None]],
+    ) -> Callable[P, Coroutine[Any, Any, None]]:
+        nap = (
+            asyncio.sleep
+            if early_wake_up_event is None
+            else SleepUsingAsyncioEvent(early_wake_up_event)
+        )
+
+        @retry(
+            sleep=nap,
+            wait=wait_fixed(interval.total_seconds()),
+            reraise=True,
+            retry=(
+                retry_if_exception_type(TryAgain)
+                if raise_on_error
+                else retry_if_exception_type()
+            ),
+            before_sleep=before_sleep_log(_logger, logging.DEBUG),
+        )
+        @functools.wraps(func)
+        async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> None:
+            await func(*args, **kwargs)
             raise TryAgain
 
+        return _wrapper
 
-def start_periodic_task(
+    return _decorator
+
+
+def create_periodic_task(
     task: Callable[..., Awaitable[None]],
     *,
     interval: datetime.timedelta,
     task_name: str,
+    raise_on_error: bool = False,
     wait_before_running: datetime.timedelta = datetime.timedelta(0),
     early_wake_up_event: asyncio.Event | None = None,
     **kwargs,
 ) -> asyncio.Task:
-    with log_context(
-        _logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'"
-    ):
-        delayed_periodic_scheduled_task = async_delayed(wait_before_running)(
-            _periodic_scheduled_task
-        )
-        return asyncio.create_task(
-            delayed_periodic_scheduled_task(
-                task,
-                interval=interval,
-                task_name=task_name,
-                early_wake_up_event=early_wake_up_event,
-                **kwargs,
-            ),
-            name=task_name,
-        )
+    @delayed_start(wait_before_running)
+    @periodic(
+        interval=interval,
+        raise_on_error=raise_on_error,
+        early_wake_up_event=early_wake_up_event,
+    )
+    async def _() -> None:
+        await task(**kwargs)
 
-
-async def cancel_task(
-    task: asyncio.Task,
-    *,
-    timeout: float | None,
-    cancellation_attempts: int = _MAX_TASK_CANCELLATION_ATTEMPTS,
-) -> None:
-    """Reliable task cancellation. Some libraries will just hang without
-    cancelling the task. It is important to retry the operation to provide
-    a timeout in that situation to avoid forever pending tasks.
-
-    :param task: task to be canceled
-    :param timeout: total duration (in seconds) to wait before giving
-        up the cancellation. If None it waits forever.
-    :raises TryAgain: raised if cannot cancel the task.
-    """
-    async for attempt in AsyncRetrying(
-        stop=stop_after_attempt(cancellation_attempts), reraise=True
-    ):
-        with attempt:
-            task.cancel()
-            _, pending = await asyncio.wait((task,), timeout=timeout)
-            if pending:
-                task_name = task.get_name()
-                _logger.info(
-                    "tried to cancel '%s' but timed-out! %s", task_name, pending
-                )
-                raise PeriodicTaskCancellationError(task_name=task_name)
-
-
-async def stop_periodic_task(
-    asyncio_task: asyncio.Task, *, timeout: float | None = None
-) -> None:
     with log_context(
-        _logger,
-        logging.DEBUG,
-        msg=f"cancel periodic background task '{asyncio_task.get_name()}'",
+        _logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'"
     ):
-        await cancel_task(asyncio_task, timeout=timeout)
+        return asyncio.create_task(_(), name=task_name)
 
 
 @contextlib.asynccontextmanager
@@ -140,16 +119,21 @@ async def periodic_task(
     interval: datetime.timedelta,
     task_name: str,
     stop_timeout: float = _DEFAULT_STOP_TIMEOUT_S,
+    raise_on_error: bool = False,
     **kwargs,
 ) -> AsyncIterator[asyncio.Task]:
     asyncio_task: asyncio.Task | None = None
     try:
-        asyncio_task = start_periodic_task(
-            task, interval=interval, task_name=task_name, **kwargs
+        asyncio_task = create_periodic_task(
+            task,
+            interval=interval,
+            task_name=task_name,
+            raise_on_error=raise_on_error,
+            **kwargs,
         )
         yield asyncio_task
     finally:
         if asyncio_task is not None:
             # NOTE: this stopping is shielded to prevent the cancellation from propagating
             # into the stopping procedure
-            await asyncio.shield(stop_periodic_task(asyncio_task, timeout=stop_timeout))
+            await asyncio.shield(cancel_wait_task(asyncio_task, max_delay=stop_timeout))
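
For orientation, a usage sketch of the reworked API (hypothetical example, not part of the diff): the periodic decorator turns a coroutine function into a loop that runs forever, periodic_task/create_periodic_task schedule such a loop as a named task, and cancel_wait_task stops it.

    import asyncio
    import datetime

    from servicelib.async_utils import cancel_wait_task
    from servicelib.background_task import periodic, periodic_task


    @periodic(interval=datetime.timedelta(seconds=10))
    async def _poll_status() -> None:
        # with raise_on_error=False (the default) any exception is logged
        # by tenacity and retried; only cancellation ends the loop
        print("polling...")


    async def _push_metrics() -> None:
        print("pushing metrics...")


    async def main() -> None:
        # decorator style: schedule the forever-running coroutine yourself
        task = asyncio.create_task(_poll_status(), name="status-poller")
        await asyncio.sleep(25)
        await cancel_wait_task(task, max_delay=5)

        # context-manager style: creation and shielded cancellation are handled
        async with periodic_task(
            _push_metrics,
            interval=datetime.timedelta(seconds=5),
            task_name="metrics-pusher",
        ):
            await asyncio.sleep(12)


    asyncio.run(main())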