Skip to content

Commit

Permalink
squashme: minor fixes for data connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
olevski committed Oct 23, 2024
1 parent 44902b9 commit aae3f82
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 50 deletions.
1 change: 1 addition & 0 deletions bases/renku_data_services/data_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
rp_repo=config.rp_repo,
data_connector_repo=config.data_connector_repo,
data_connector_project_link_repo=config.data_connector_to_project_link_repo,
data_connector_secret_repo=config.data_connector_secret_repo,
internal_gitlab_authenticator=config.gitlab_authenticator,
)
platform_config = PlatformConfigBP(
Expand Down
1 change: 1 addition & 0 deletions components/renku_data_services/app_config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ def data_connector_secret_repo(self) -> DataConnectorSecretRepository:
data_connector_repo=self.data_connector_repo,
user_repo=self.kc_user_repo,
secret_service_public_key=self.secrets_service_public_key,
authz=self.authz,
)
return self._data_connector_secret_repo

Expand Down
69 changes: 66 additions & 3 deletions components/renku_data_services/data_connectors/db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
"""Adapters for data connectors database classes."""

from collections.abc import Callable
from typing import TypeVar
from collections.abc import AsyncIterator, Callable
from typing import TypeVar, cast

from cryptography.hazmat.primitives.asymmetric import rsa
from sqlalchemy import Select, delete, func, select
from sqlalchemy import Select, delete, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from ulid import ULID

Expand Down Expand Up @@ -478,11 +478,74 @@ def __init__(
data_connector_repo: DataConnectorRepository,
user_repo: UserRepo,
secret_service_public_key: rsa.RSAPublicKey,
authz: Authz,
) -> None:
self.session_maker = session_maker
self.data_connector_repo = data_connector_repo
self.user_repo = user_repo
self.secret_service_public_key = secret_service_public_key
self.authz = authz

async def get_data_connectors_with_secrets(
    self,
    user: base_models.APIUser,
    project_id: ULID,
) -> AsyncIterator[models.DataConnectorWithSecrets]:
    """Get all data connectors and their secrets for a project.

    Yields one ``DataConnectorWithSecrets`` per data connector linked to the
    project; connectors for which the user has no saved secrets are yielded
    with an empty secrets list.

    Raises:
        errors.UnauthorizedError: if the user is anonymous (has no id).
        errors.MissingResourceError: if the project does not exist or the
            user lacks read permission on it.
    """
    if user.id is None:
        raise errors.UnauthorizedError(message="You do not have the required permissions for this operation.")

    can_read_project = await self.authz.has_permission(user, ResourceType.project, project_id, Scope.READ)
    if not can_read_project:
        # NOTE: fixed wording of the original message ("The project ID with ... you dont have permission")
        raise errors.MissingResourceError(
            message=f"The project with ID {project_id} does not exist or you do not have permission to access it"
        )

    async with self.session_maker() as session:
        stmt = (
            select(schemas.DataConnectorORM, schemas.DataConnectorSecretORM)
            .select_from(schemas.DataConnectorORM)  # NOTE: Makes sure the FROM statement is as expected
            .join(
                target=schemas.DataConnectorToProjectLinkORM,
                onclause=schemas.DataConnectorORM.id == schemas.DataConnectorToProjectLinkORM.data_connector_id,
            )
            .join(
                target=schemas.DataConnectorSecretORM,
                onclause=schemas.DataConnectorORM.id == schemas.DataConnectorSecretORM.data_connector_id,
                isouter=True,  # NOTE: enables us to select data connectors with and without secrets
            )
            .where(schemas.DataConnectorToProjectLinkORM.project_id == project_id)
            .where(
                or_(
                    schemas.DataConnectorSecretORM.user_id == user.id,
                    # NOTE: the user_id field on a connector secret is non-nullable, but
                    # since we are doing an outer join this allows us to include data connectors
                    # without secrets.
                    schemas.DataConnectorSecretORM.user_id.is_(None),
                )
            )
            # NOTE: Rows must arrive grouped by data connector for the streaming group-by below
            .order_by(schemas.DataConnectorORM.id)
            .order_by(schemas.DataConnectorSecretORM.secret_id)
        )
        results = await session.stream(stmt)
        dc_current: models.DataConnector | None = None
        dc_secrets: list[models.DataConnectorSecret] = []
        async for res in results:
            # NOTE: sqlalchemy does not set the types right for outer joins
            dc, sec = cast(tuple[schemas.DataConnectorORM, schemas.DataConnectorSecretORM | None], res.t)
            if dc_current is None or dc.id != dc_current.id:
                # A new data connector starts: flush the previous one (if any) and begin accumulating
                if dc_current is not None:
                    yield models.DataConnectorWithSecrets(dc_current, dc_secrets)
                dc_current = dc.dump()
                dc_secrets = [sec.dump()] if sec else []
            elif sec:
                dc_secrets.append(sec.dump())
        if dc_current is None:
            # There are no data connectors at all returned from the DB
            return
        yield models.DataConnectorWithSecrets(dc_current, dc_secrets)

async def get_data_connector_secrets(
self,
Expand Down
8 changes: 8 additions & 0 deletions components/renku_data_services/data_connectors/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,11 @@ class DataConnectorPermissions:
write: bool
delete: bool
change_membership: bool


@dataclass
class DataConnectorWithSecrets:
    """A data connector with its secrets."""

    # The data connector itself
    data_connector: DataConnector
    # The current user's saved secrets for this data connector; empty when there are none
    secrets: list[DataConnectorSecret] = field(default_factory=list)
4 changes: 0 additions & 4 deletions components/renku_data_services/notebooks/api.spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,6 @@ components:
additionalProperties: true
readonly:
type: boolean
default: true
source_path:
type: string
target_path:
Expand All @@ -1026,9 +1025,6 @@ components:
- "$ref": "#/components/schemas/Ulid"
- description: The storage ID is used to know which storage config from the DB should be overriden
required:
- configuration
- source_path
- target_path
- storage_id
ServerName:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from configparser import ConfigParser
from io import StringIO
from pathlib import PurePosixPath
from typing import Any, Final, Optional, Self
from typing import Any, Final, Optional, Protocol, Self

from kubernetes import client
from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema
Expand Down Expand Up @@ -36,6 +36,15 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None:
raise ValidationError("'storage_id' cannot be used together with 'source_path' or 'target_path'")


class RCloneStorageRequestOverride(Protocol):
    """A structural protocol for objects carrying overrides to a data connector request.

    A ``None`` value on any attribute means "do not override that field".
    """

    # Overridden rclone source path, or None to keep the existing one
    source_path: str | None = None
    # Overridden mount/target path, or None to keep the existing one
    target_path: str | None = None
    # Overridden rclone configuration mapping, or None to keep the existing one
    configuration: dict[str, Any] | None = None
    # Overridden read-only flag; None (as opposed to False) keeps the existing value
    readonly: bool | None = None


class RCloneStorage(ICloudStorageRequest):
"""RClone based storage."""

Expand Down Expand Up @@ -221,6 +230,17 @@ def _stringify(value: Any) -> str:
parser.write(stringio)
return stringio.getvalue()

def with_override(self, override: RCloneStorageRequestOverride) -> "RCloneStorage":
    """Return a copy of this storage with any set override fields applied.

    Truthy ``source_path``/``target_path``/``configuration`` values replace the
    current ones; ``readonly`` is replaced whenever it is not ``None``.
    """
    # readonly is a tri-state override: only None means "keep the current value"
    effective_readonly = self.readonly if override.readonly is None else override.readonly
    return RCloneStorage(
        source_path=override.source_path or self.source_path,
        mount_folder=override.target_path or self.mount_folder,
        readonly=effective_readonly,
        configuration=override.configuration or self.configuration,
        name=self.name,
        config=self.config,
    )


class LaunchNotebookResponseCloudStorage(RCloneStorageRequest):
"""Notebook launch response with cloud storage attached."""
Expand Down
10 changes: 5 additions & 5 deletions components/renku_data_services/notebooks/apispec.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: api.spec.yaml
# timestamp: 2024-10-07T22:25:48+00:00
# timestamp: 2024-10-23T09:06:05+00:00

from __future__ import annotations

Expand Down Expand Up @@ -259,10 +259,10 @@ class SessionLogsResponse(RootModel[Optional[Dict[str, str]]]):


class SessionCloudStoragePost(BaseAPISpec):
configuration: Dict[str, Any]
readonly: bool = True
source_path: str
target_path: str
configuration: Optional[Dict[str, Any]] = None
readonly: Optional[bool] = None
source_path: Optional[str] = None
target_path: Optional[str] = None
storage_id: str = Field(
...,
description="ULID identifier",
Expand Down
94 changes: 57 additions & 37 deletions components/renku_data_services/notebooks/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import base64
import os
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Any
from urllib.parse import urljoin, urlparse

import httpx
from kubernetes.client import V1ObjectMeta, V1Secret
from sanic import Request, empty, exceptions, json
from sanic.response import HTTPResponse, JSONResponse
Expand All @@ -19,7 +21,12 @@
from renku_data_services.base_api.blueprint import BlueprintFactoryResponse, CustomBlueprint
from renku_data_services.base_models import AnonymousAPIUser, APIUser, AuthenticatedAPIUser, Authenticator
from renku_data_services.crc.db import ResourcePoolRepository
from renku_data_services.data_connectors.db import DataConnectorProjectLinkRepository, DataConnectorRepository
from renku_data_services.data_connectors.db import (
DataConnectorProjectLinkRepository,
DataConnectorRepository,
DataConnectorSecretRepository,
)
from renku_data_services.data_connectors.models import DataConnectorSecret
from renku_data_services.errors import errors
from renku_data_services.notebooks import apispec, core
from renku_data_services.notebooks.api.amalthea_patches import git_proxy, init_containers
Expand Down Expand Up @@ -242,6 +249,7 @@ class NotebooksNewBP(CustomBlueprint):
rp_repo: ResourcePoolRepository
data_connector_repo: DataConnectorRepository
data_connector_project_link_repo: DataConnectorProjectLinkRepository
data_connector_secret_repo: DataConnectorSecretRepository

def start(self) -> BlueprintFactoryResponse:
"""Start a session with the new operator."""
Expand Down Expand Up @@ -271,62 +279,56 @@ async def _handler(
resource_class_id = body.resource_class_id or default_resource_class.id
await self.nb_config.crc_validator.validate_class_storage(user, resource_class_id, body.disk_storage)
work_dir = environment.working_directory
# TODO: Wait for pitch on users secrets to implement this
# user_secrets: K8sUserSecrets | None = None
# if body.user_secrets:
# user_secrets = K8sUserSecrets(
# name=server_name,
# user_secret_ids=body.user_secrets.user_secret_ids,
# mount_path=body.user_secrets.mount_path,
# )

# TODO
data_connector_links = await self.data_connector_project_link_repo.get_links_to(
user=user, project_id=project.id
)
data_connectors = [
await self.data_connector_repo.get_data_connector(user=user, data_connector_id=link.data_connector_id)
for link in data_connector_links
]
# TODO: handle secrets?
cloud_storage: dict[str, RCloneStorage] = {
str(dc.id): RCloneStorage(
source_path=dc.storage.source_path,
mount_folder=(work_dir / dc.storage.target_path).as_posix(),
configuration=dc.storage.configuration,
readonly=dc.storage.readonly,
data_connectors_stream = self.data_connector_secret_repo.get_data_connectors_with_secrets(user, project.id)
dcs: dict[str, RCloneStorage] = {}
dcs_secrets: dict[str, list[DataConnectorSecret]] = {}
async for dc in data_connectors_stream:
dcs[str(dc.data_connector.id)] = RCloneStorage(
source_path=dc.data_connector.storage.source_path,
mount_folder=dc.data_connector.storage.target_path
if PurePosixPath(dc.data_connector.storage.target_path).is_absolute()
else (work_dir / dc.data_connector.storage.target_path).as_posix(),
configuration=dc.data_connector.storage.configuration,
readonly=dc.data_connector.storage.readonly,
config=self.nb_config,
name=dc.name,
name=dc.data_connector.name,
)
for dc in data_connectors
}
cloud_storage_request: dict[str, RCloneStorage] = {
s.storage_id: RCloneStorage(
source_path=s.source_path,
mount_folder=(work_dir / s.target_path).as_posix(),
configuration=s.configuration,
readonly=s.readonly,
config=self.nb_config,
name=None,
)
for s in body.cloudstorage or []
}
dcs_secrets[str(dc.data_connector.id)] = dc.secrets
# NOTE: Check the cloud storage in the request body and if any match
# then overwrite the projects cloud storages
# NOTE: Cloud storages in the session launch request body that are not form the DB will cause a 422 error
for csr_id, csr in cloud_storage_request.items():
if csr_id not in cloud_storage:
# NOTE: Cloud storages in the session launch request body that are not from the DB will cause a 404 error
# NOTE: Overriding the configuration when a saved secret is there will cause a 422 error
cloud_storage_overrides = body.cloudstorage or []
for csr in cloud_storage_overrides:
csr_id = csr.storage_id
if csr_id not in dcs:
raise errors.MissingResourceError(
message=f"You have requested a cloud storage with ID {csr_id} which does not exist "
"or you dont have access to.",
quiet=True,
)
cloud_storage[csr_id] = csr
if csr.target_path is not None and not PurePosixPath(csr.target_path).is_absolute():
csr.target_path = (work_dir / csr.target_path).as_posix()
if csr_id in dcs_secrets and csr.configuration is not None:
raise errors.ValidationError(
message=f"Overriding the storage configuration for storage with ID {csr_id} "
"is not allowed because the storage has an associated saved secret.",
)
dcs[csr_id] = dcs[csr_id].with_override(csr)
repositories = [Repository(url=i) for i in project.repositories]
secrets_to_create: list[V1Secret] = []
# Generate the cloud storage secrets
data_sources: list[DataSource] = []
for ics, cs in enumerate(cloud_storage.values()):
secret_name = f"{server_name}-ds-{ics}"
for cs_id, cs in dcs.items():
secret_name = f"{server_name}-ds-{cs_id}"
secrets_to_create.append(cs.secret(secret_name, self.nb_config.k8s_client.preferred_namespace))
data_sources.append(
DataSource(mountPath=cs.mount_folder, secretRef=SecretRefWhole(name=secret_name, adopt=True))
Expand Down Expand Up @@ -470,6 +472,24 @@ async def _handler(
for s in secrets_to_create:
await self.nb_config.k8s_v2_client.delete_secret(s.metadata.name)
raise errors.ProgrammingError(message="Could not start the amalthea session")
else:
owner_reference = {
"apiVersion": manifest.apiVersion,
"kind": manifest.kind,
"name": manifest.metadata.name,
"uid": manifest.metadata.uid,
}
secrets_url = self.nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes"
headers = {"Authorization": f"bearer {user.access_token}"}
for s_id, secrets in dcs_secrets.items():
request_data = {
"name": f"{server_name}-ds-{s_id}-secrets",
"namespace": self.nb_config.k8s_v2_client.preferred_namespace,
"secret_ids": [str(secret.secret_id) for secret in secrets],
"owner_references": [owner_reference],
}
async with httpx.AsyncClient(timeout=10) as client:
await client.post(secrets_url, headers=headers, json=request_data)

return json(manifest.as_apispec().model_dump(mode="json", exclude_none=True), 201)

Expand Down

0 comments on commit aae3f82

Please sign in to comment.