Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Adding director-v0 client to dynamic-scheduler #7001

Merged
merged 21 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ class SimcoreServiceLabels(DynamicSidecarServiceLabels):
settings: Annotated[
Json[SimcoreServiceSettingsLabel],
Field(
default_factory=dict,
default_factory=lambda: SimcoreServiceSettingsLabel.model_validate([]),
alias="simcore.service.settings",
description=(
"Json encoded. Contains setting like environment variables and "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ def __init__(
self,
*,
total_retry_interval: float,
tracing_settings: TracingSettings | None,
base_url: URLTypes | None = None,
default_http_client_timeout: TimeoutTypes | None = None,
extra_allowed_method_names: set[str] | None = None,
tracing_settings: TracingSettings | None,
) -> None:
_assert_public_interface(self, extra_allowed_method_names)

Expand Down
25 changes: 25 additions & 0 deletions packages/settings-library/src/settings_library/director_v0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from functools import cached_property

from pydantic import AnyHttpUrl, Field, TypeAdapter
from settings_library.base import BaseCustomSettings
from settings_library.basic_types import PortInt, VersionTag


class DirectorV0Settings(BaseCustomSettings):
DIRECTOR_ENABLED: bool = True

DIRECTOR_HOST: str = "director"
GitHK marked this conversation as resolved.
Show resolved Hide resolved
DIRECTOR_PORT: PortInt = TypeAdapter(PortInt).validate_python(8000)
DIRECTOR_VTAG: VersionTag = Field(
default="v0", description="Director-v0 service API's version tag"
)

@cached_property
def endpoint(self) -> str:
url = AnyHttpUrl.build( # pylint: disable=no-member
scheme="http",
host=self.DIRECTOR_HOST,
port=self.DIRECTOR_PORT,
path=f"{self.DIRECTOR_VTAG}",
)
return f"{url}"
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI:
if settings.DIRECTOR_V2_TRACING:
setup_tracing(app, settings.DIRECTOR_V2_TRACING, APP_NAME)

if settings.DIRECTOR_V0.DIRECTOR_V0_ENABLED:
if settings.DIRECTOR_V0.DIRECTOR_ENABLED:
director_v0.setup(
app,
director_v0_settings=settings.DIRECTOR_V0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from common_library.pydantic_validators import validate_numeric_string_as_timedelta
from fastapi import FastAPI
from models_library.basic_types import LogLevel, PortInt, VersionTag
from models_library.basic_types import LogLevel, PortInt
from models_library.clusters import (
BaseCluster,
ClusterAuthentication,
Expand All @@ -29,6 +29,7 @@
from settings_library.application import BaseApplicationSettings
from settings_library.base import BaseCustomSettings
from settings_library.catalog import CatalogSettings
from settings_library.director_v0 import DirectorV0Settings
from settings_library.docker_registry import RegistrySettings
from settings_library.http_client_request import ClientRequestSettings
from settings_library.node_ports import (
Expand All @@ -50,26 +51,6 @@
from .dynamic_services_settings import DynamicServicesSettings


class DirectorV0Settings(BaseCustomSettings):
DIRECTOR_V0_ENABLED: bool = True

DIRECTOR_HOST: str = "director"
DIRECTOR_PORT: PortInt = PortInt(8080)
DIRECTOR_V0_VTAG: VersionTag = Field(
default="v0", description="Director-v0 service API's version tag"
)

@cached_property
def endpoint(self) -> str:
url = AnyHttpUrl.build( # pylint: disable=no-member
scheme="http",
host=self.DIRECTOR_HOST,
port=self.DIRECTOR_PORT,
path=f"{self.DIRECTOR_V0_VTAG}",
)
return f"{url}"


class ComputationalBackendSettings(BaseCustomSettings):
COMPUTATIONAL_BACKEND_ENABLED: bool = Field(
default=True,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from models_library.users import UserID
from servicelib.fastapi.tracing import setup_httpx_client_tracing
from servicelib.logging_utils import log_decorator
from settings_library.director_v0 import DirectorV0Settings
from settings_library.tracing import TracingSettings

from ..core.settings import DirectorV0Settings
from ..utils.client_decorators import handle_errors, handle_retry
from ..utils.clients import unenvelope_or_raise_error

Expand Down
2 changes: 1 addition & 1 deletion services/director-v2/tests/unit/test_modules_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def minimal_catalog_config(
"""set a minimal configuration for testing the director connection only"""
monkeypatch.setenv("DIRECTOR_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED", "false")
monkeypatch.setenv("DIRECTOR_V0_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_ENABLED", "0")
monkeypatch.setenv("COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED", "0")
monkeypatch.setenv("COMPUTATIONAL_BACKEND_ENABLED", "0")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def _minimal_dask_config(
"""set a minimal configuration for testing the dask connection only"""
monkeypatch.setenv("DIRECTOR_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SIDECAR_ENABLED", "false")
monkeypatch.setenv("DIRECTOR_V0_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_V2_CATALOG", "null")
monkeypatch.setenv("COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED", "1")
monkeypatch.setenv("COMPUTATIONAL_BACKEND_ENABLED", "0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def minimal_dask_config(
"""set a minimal configuration for testing the dask connection only"""
monkeypatch.setenv("DIRECTOR_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SIDECAR_ENABLED", "false")
monkeypatch.setenv("DIRECTOR_V0_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_V2_CATALOG", "null")
monkeypatch.setenv("COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED", "1")
monkeypatch.setenv("COMPUTATIONAL_BACKEND_ENABLED", "0")
Expand Down
1 change: 0 additions & 1 deletion services/director-v2/tests/unit/test_modules_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def mock_env(
"S3_SECRET_KEY": faker.pystr(),
"S3_BUCKET_NAME": faker.pystr(),
"DIRECTOR_ENABLED": "0",
"DIRECTOR_V0_ENABLED": "0",
"DIRECTOR_V2_CATALOG": "null",
"COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED": "0",
"COMPUTATIONAL_BACKEND_ENABLED": "0",
Expand Down
2 changes: 1 addition & 1 deletion services/director-v2/tests/unit/test_modules_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def minimal_storage_config(
"""set a minimal configuration for testing the director connection only"""
monkeypatch.setenv("DIRECTOR_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED", "false")
monkeypatch.setenv("DIRECTOR_V0_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_ENABLED", "0")
monkeypatch.setenv("DIRECTOR_V2_CATALOG", "null")
monkeypatch.setenv("COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED", "0")
monkeypatch.setenv("COMPUTATIONAL_BACKEND_ENABLED", "0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..api.rest.routes import setup_rest_api
from ..api.rpc.routes import setup_rpc_api_routes
from ..services.deferred_manager import setup_deferred_manager
from ..services.director_v0 import setup_director_v0
from ..services.director_v2 import setup_director_v2
from ..services.notifier import setup_notifier
from ..services.rabbitmq import setup_rabbitmq
Expand Down Expand Up @@ -62,6 +63,7 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
# PLUGINS SETUP

setup_director_v2(app)
setup_director_v0(app)

setup_rabbitmq(app)
setup_rpc_api_routes(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from servicelib.logging_utils_filtering import LoggerName, MessageSubstring
from settings_library.application import BaseApplicationSettings
from settings_library.basic_types import LogLevel, VersionTag
from settings_library.director_v0 import DirectorV0Settings
from settings_library.director_v2 import DirectorV2Settings
from settings_library.http_client_request import ClientRequestSettings
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings
from settings_library.tracing import TracingSettings
Expand Down Expand Up @@ -128,6 +130,15 @@ class ApplicationSettings(_BaseApplicationSettings):
default=True, description="If true, it displays swagger doc at /doc"
)

CLIENT_REQUEST: ClientRequestSettings = Field(
json_schema_extra={"auto_default_from_env": True}
)

DYNAMIC_SCHEDULER_DIRECTOR_V0_SETTINGS: DirectorV0Settings = Field(
json_schema_extra={"auto_default_from_env": True},
description="settings for director service",
)

DYNAMIC_SCHEDULER_DIRECTOR_V2_SETTINGS: DirectorV2Settings = Field(
json_schema_extra={"auto_default_from_env": True},
description="settings for director-v2 service",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from ._public_client import DirectorV0PublicClient
from ._setup import setup_director_v0

__all__: tuple[str, ...] = (
"DirectorV0PublicClient",
"setup_director_v0",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging
from typing import Any, cast

import httpx
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services_service import (
RunningDynamicServiceDetails,
)
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID
from pydantic import TypeAdapter
from servicelib.fastapi.app_state import SingletonInAppStateMixin

from ._thin_client import DirectorV0ThinClient

logger = logging.getLogger(__name__)


def _unenvelope_or_raise_error(resp: httpx.Response) -> dict | list:
"""
Director responses are enveloped
If successful response, we un-envelop it and return data as a dict
If error, is detected raise an ValueError
"""
body = resp.json()
if "data" in body:
return body["data"] # type: ignore[no-any-return]

msg = f"Unexpected, data was not returned: {body=}"
raise ValueError(msg)


class DirectorV0PublicClient(SingletonInAppStateMixin):
app_state_name: str = "director_v0_public_client"

def __init__(self, app: FastAPI) -> None:
self.app = app

async def get_running_service_details(
self, node_id: NodeID
) -> RunningDynamicServiceDetails:
response = await DirectorV0ThinClient.get_from_app_state(
self.app
).get_running_interactive_service_details(node_id)
return TypeAdapter(RunningDynamicServiceDetails).validate_python(
_unenvelope_or_raise_error(response)
)

async def get_running_services(
self, user_id: UserID | None = None, project_id: ProjectID | None = None
) -> list[RunningDynamicServiceDetails]:
response = await DirectorV0ThinClient.get_from_app_state(
self.app
).get_running_interactive_services(user_id=user_id, project_id=project_id)
return [
RunningDynamicServiceDetails(**x)
for x in cast(list[dict[str, Any]], _unenvelope_or_raise_error(response))
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from fastapi import FastAPI

from ._public_client import DirectorV0PublicClient
from ._thin_client import DirectorV0ThinClient


def setup_director_v0(app: FastAPI) -> None:
async def _on_startup() -> None:
thin_client = DirectorV0ThinClient(app)
thin_client.set_to_app_state(app)
thin_client.attach_lifespan_to(app)

public_client = DirectorV0PublicClient(app)
public_client.set_to_app_state(app)

async def _on_shutdown() -> None:
DirectorV0PublicClient.pop_from_app_state(app)
DirectorV0ThinClient.pop_from_app_state(app)

app.add_event_handler("startup", _on_startup)
app.add_event_handler("shutdown", _on_shutdown)
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from common_library.exclude import as_dict_exclude_none
from fastapi import FastAPI, status
from httpx import Response
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID
from servicelib.fastapi.app_state import SingletonInAppStateMixin
from servicelib.fastapi.http_client import AttachLifespanMixin
from servicelib.fastapi.http_client_thin import (
BaseThinClient,
expect_status,
retry_on_errors,
)
from yarl import URL

from ...core.settings import ApplicationSettings


class DirectorV0ThinClient(
SingletonInAppStateMixin, BaseThinClient, AttachLifespanMixin
):
app_state_name: str = "director_v0_thin_client"

def __init__(self, app: FastAPI) -> None:
settings: ApplicationSettings = app.state.settings
assert settings.CLIENT_REQUEST.HTTP_CLIENT_REQUEST_TOTAL_TIMEOUT # nosec

super().__init__(
total_retry_interval=int(
settings.CLIENT_REQUEST.HTTP_CLIENT_REQUEST_TOTAL_TIMEOUT
),
extra_allowed_method_names={
"attach_lifespan_to",
"get_from_app_state",
"pop_from_app_state",
"set_to_app_state",
},
base_url=settings.DYNAMIC_SCHEDULER_DIRECTOR_V0_SETTINGS.endpoint,
tracing_settings=settings.DYNAMIC_SCHEDULER_TRACING,
)

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def get_running_interactive_service_details(
self, node_id: NodeID
) -> Response:
return await self.client.get(f"/running_interactive_services/{node_id}")

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def get_running_interactive_services(
self, user_id: UserID | None, project_id: ProjectID | None
) -> Response:
request_url = URL("/running_interactive_services").with_query(
as_dict_exclude_none(user_id=user_id, study_id=project_id)
)
return await self.client.get(f"{request_url}")
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from models_library.users import UserID
from pydantic import NonNegativeInt, TypeAdapter
from servicelib.fastapi.app_state import SingletonInAppStateMixin
from servicelib.fastapi.http_client import AttachLifespanMixin, HasClientSetupInterface
from servicelib.fastapi.http_client import AttachLifespanMixin
from servicelib.fastapi.http_client_thin import UnexpectedStatusError
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import (
ServiceWaitingForManualInterventionError,
Expand All @@ -27,9 +27,7 @@
from ._thin_client import DirectorV2ThinClient


class DirectorV2Client(
SingletonInAppStateMixin, AttachLifespanMixin, HasClientSetupInterface
):
class DirectorV2Client(SingletonInAppStateMixin, AttachLifespanMixin):
app_state_name: str = "director_v2_client"

def __init__(self, app: FastAPI) -> None:
Expand Down Expand Up @@ -145,5 +143,4 @@ async def update_projects_networks(self, *, project_id: ProjectID) -> None:

def setup_director_v2(app: FastAPI) -> None:
public_client = DirectorV2Client(app)
public_client.thin_client.attach_lifespan_to(app)
public_client.set_to_app_state(app)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
X_DYNAMIC_SIDECAR_REQUEST_SCHEME,
X_SIMCORE_USER_AGENT,
)
from servicelib.fastapi.http_client import AttachLifespanMixin
from servicelib.fastapi.http_client_thin import (
BaseThinClient,
expect_status,
Expand All @@ -32,7 +31,7 @@
from ...core.settings import ApplicationSettings


class DirectorV2ThinClient(BaseThinClient, AttachLifespanMixin):
class DirectorV2ThinClient(BaseThinClient):
def __init__(self, app: FastAPI) -> None:
settings: ApplicationSettings = app.state.settings
super().__init__(
Expand Down
Loading
Loading