This repository has been archived by the owner on Jun 28, 2024. It is now read-only.

Replaced cumbersome shared generic base class #22

Merged · 1 commit · Feb 15, 2024
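In short, the previous `UploadMetadataProcessorBase` was a single shared implementation class, generic over the upload metadata model, with its exception types nested inside the port. This PR splits it into two independent abstract ports (`LegacyUploadMetadataProcessorPort` and `UploadMetadataProcessorPort`) with module-level exceptions and two self-contained implementations. The sketch below is a simplified illustration of that before/after shape, not the actual service code; the model classes and the `bytes` payload type are placeholder stand-ins for the real `fis.core.models` types.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar


# --- Before: one shared base class, generic over the metadata model ---
class UploadMetadataBase:  # stand-in for fis.core.models.UploadMetadataBase
    ...


class LegacyUploadMetadata(UploadMetadataBase):  # stand-in for the legacy model
    ...


class UploadMetadata(UploadMetadataBase):  # stand-in for the current model
    ...


UploadMetadataModel = TypeVar("UploadMetadataModel", bound=UploadMetadataBase)


class UploadMetadataProcessorBase(Generic[UploadMetadataModel]):
    """Shared implementation; exception types were nested classes on the port."""

    class DecryptionError(RuntimeError):
        """Raised when the payload cannot be decrypted."""

    async def populate_by_event(
        self, *, upload_metadata: UploadMetadataModel, secret_id: str
    ) -> None:
        ...  # shared logic lived here


# --- After: two independent abstract ports, exceptions at module level ---
class DecryptionError(RuntimeError):
    """Raised when the payload cannot be decrypted."""


class LegacyUploadMetadataProcessorPort(ABC):
    @abstractmethod
    async def decrypt_payload(self, *, encrypted: bytes) -> LegacyUploadMetadata:
        """Decrypt and validate a legacy payload."""


class UploadMetadataProcessorPort(ABC):
    @abstractmethod
    async def decrypt_payload(self, *, encrypted: bytes) -> UploadMetadata:
        """Decrypt and validate a current-format payload."""
```

The trade-off is some duplicated implementation code in `core/ingest.py` in exchange for simpler, non-generic ports, module-level exceptions that routes can catch directly, and construction sites that no longer need `# type: ignore [abstract]`.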
8 changes: 5 additions & 3 deletions src/fis/adapters/inbound/fastapi_/dummies.py
@@ -23,8 +23,10 @@
from ghga_service_commons.api.di import DependencyDummy

from fis.config import Config
from fis.core.ingest import LegacyUploadMetadataProcessor
from fis.ports.inbound.ingest import UploadMetadataProcessorPort
from fis.ports.inbound.ingest import (
LegacyUploadMetadataProcessorPort,
UploadMetadataProcessorPort,
)

config_dummy = DependencyDummy("config_dummy")
upload_processor_port = DependencyDummy("upload_processor_port")
@@ -37,5 +39,5 @@
]

LegacyUploadProcessor = Annotated[
LegacyUploadMetadataProcessor, Depends(legacy_upload_processor)
LegacyUploadMetadataProcessorPort, Depends(legacy_upload_processor)
]
17 changes: 8 additions & 9 deletions src/fis/adapters/inbound/fastapi_/routes.py
@@ -24,6 +24,11 @@
require_token,
)
from fis.core.models import EncryptedPayload
from fis.ports.inbound.ingest import (
DecryptionError,
VaultCommunicationError,
WrongDecryptedFormatError,
)

router = APIRouter()

@@ -59,10 +64,7 @@ async def ingest_legacy_metadata(
decrypted_metadata = await upload_metadata_processor.decrypt_payload(
encrypted=encrypted_payload
)
except (
upload_metadata_processor.DecryptionError,
upload_metadata_processor.WrongDecryptedFormatError,
) as error:
except (DecryptionError, WrongDecryptedFormatError) as error:
raise HTTPException(status_code=422, detail=str(error)) from error

file_secret = decrypted_metadata.file_secret
@@ -71,7 +73,7 @@
secret_id = await upload_metadata_processor.store_secret(
file_secret=file_secret
)
except upload_metadata_processor.VaultCommunicationError as error:
except VaultCommunicationError as error:
raise HTTPException(status_code=500, detail=str(error)) from error

await upload_metadata_processor.populate_by_event(
@@ -100,10 +102,7 @@ async def ingest_metadata(
decrypted_metadata = await upload_metadata_processor.decrypt_payload(
encrypted=encrypted_payload
)
except (
upload_metadata_processor.DecryptionError,
upload_metadata_processor.WrongDecryptedFormatError,
) as error:
except (DecryptionError, WrongDecryptedFormatError) as error:
raise HTTPException(status_code=422, detail=str(error)) from error

secret_id = decrypted_metadata.secret_id
93 changes: 59 additions & 34 deletions src/fis/core/ingest.py
@@ -15,15 +15,20 @@
"""Functionality relating to S3 upload metadata processing"""

import json
from typing import Generic

from ghga_service_commons.utils.crypt import decrypt
from nacl.exceptions import CryptoError
from pydantic import Field, ValidationError
from pydantic_settings import BaseSettings

from fis.core import models
from fis.ports.inbound.ingest import UploadMetadataModel, UploadMetadataProcessorPort
from fis.ports.inbound.ingest import (
DecryptionError,
LegacyUploadMetadataProcessorPort,
UploadMetadataProcessorPort,
VaultCommunicationError,
WrongDecryptedFormatError,
)
from fis.ports.outbound.event_pub import EventPublisherPort
from fis.ports.outbound.vault.client import VaultAdapterPort

@@ -54,13 +59,8 @@ class ServiceConfig(BaseSettings):
)


class UploadMetadataProcessorBase(
UploadMetadataProcessorPort, Generic[UploadMetadataModel]
):
"""Shared implementations for current/legacy upload processor.

Do not instantiate directly.
"""
class LegacyUploadMetadataProcessor(LegacyUploadMetadataProcessorPort):
"""Handler for S3 upload metadata processing"""

def __init__(
self,
@@ -73,8 +73,24 @@ def __init__(
self._event_publisher = event_publisher
self._vault_adapter = vault_adapter

async def decrypt_payload( # type: ignore
self, *, encrypted: models.EncryptedPayload
) -> models.LegacyUploadMetadata:
"""Decrypt upload metadata using private key"""
try:
decrypted = decrypt(data=encrypted.payload, key=self._config.private_key)
except (ValueError, CryptoError) as error:
raise DecryptionError() from error

upload_metadata = json.loads(decrypted)

try:
return models.LegacyUploadMetadata(**upload_metadata)
except ValidationError as error:
raise WrongDecryptedFormatError(cause=str(error)) from error

async def populate_by_event(
self, *, upload_metadata: UploadMetadataModel, secret_id: str
self, *, upload_metadata: models.LegacyUploadMetadata, secret_id: str
):
"""Send FileUploadValidationSuccess event to be processed by downstream services"""
await self._event_publisher.send_file_metadata(
@@ -89,31 +105,22 @@ async def store_secret(self, *, file_secret: str) -> str:
try:
return self._vault_adapter.store_secret(secret=file_secret)
except self._vault_adapter.SecretInsertionError as error:
raise self.VaultCommunicationError(message=str(error)) from error
raise VaultCommunicationError(message=str(error)) from error


class LegacyUploadMetadataProcessor(UploadMetadataProcessorBase):
class UploadMetadataProcessor(UploadMetadataProcessorPort):
"""Handler for S3 upload metadata processing"""

async def decrypt_payload( # type: ignore
self, *, encrypted: models.EncryptedPayload
) -> models.LegacyUploadMetadata:
"""Decrypt upload metadata using private key"""
try:
decrypted = decrypt(data=encrypted.payload, key=self._config.private_key)
except (ValueError, CryptoError) as error:
raise self.DecryptionError() from error

upload_metadata = json.loads(decrypted)

try:
return models.LegacyUploadMetadata(**upload_metadata)
except ValidationError as error:
raise self.WrongDecryptedFormatError(cause=str(error)) from error


class UploadMetadataProcessor(UploadMetadataProcessorBase):
"""Handler for S3 upload metadata processing"""
def __init__(
self,
*,
config: ServiceConfig,
event_publisher: EventPublisherPort,
vault_adapter: VaultAdapterPort,
):
self._config = config
self._event_publisher = event_publisher
self._vault_adapter = vault_adapter

async def decrypt_payload(
self, *, encrypted: models.EncryptedPayload
@@ -122,20 +129,38 @@ async def decrypt_payload(
try:
decrypted = decrypt(data=encrypted.payload, key=self._config.private_key)
except (ValueError, CryptoError) as error:
raise self.DecryptionError() from error
raise DecryptionError() from error

upload_metadata = json.loads(decrypted)

try:
return models.UploadMetadata(**upload_metadata)
except ValidationError as error:
raise self.WrongDecryptedFormatError(cause=str(error)) from error
raise WrongDecryptedFormatError(cause=str(error)) from error

async def decrypt_secret(self, *, encrypted: models.EncryptedPayload) -> str:
"""Decrypt file secret payload"""
try:
decrypted = decrypt(data=encrypted.payload, key=self._config.private_key)
except (ValueError, CryptoError) as error:
raise self.DecryptionError() from error
raise DecryptionError() from error

return decrypted

async def populate_by_event(
self, *, upload_metadata: models.UploadMetadata, secret_id: str
):
"""Send FileUploadValidationSuccess event to be processed by downstream services"""
await self._event_publisher.send_file_metadata(
secret_id=secret_id,
source_bucket_id=self._config.source_bucket_id,
upload_metadata=upload_metadata,
s3_endpoint_alias=self._config.selected_storage_alias,
)

async def store_secret(self, *, file_secret: str) -> str:
"""Communicate with HashiCorp Vault to store file secret and get secret ID"""
try:
return self._vault_adapter.store_secret(secret=file_secret)
except self._vault_adapter.SecretInsertionError as error:
raise VaultCommunicationError(message=str(error)) from error
13 changes: 8 additions & 5 deletions src/fis/inject.py
@@ -30,14 +30,17 @@
from fis.adapters.outbound.vault import VaultAdapter
from fis.config import Config
from fis.core.ingest import LegacyUploadMetadataProcessor, UploadMetadataProcessor
from fis.ports.inbound.ingest import UploadMetadataProcessorPort
from fis.ports.inbound.ingest import (
LegacyUploadMetadataProcessorPort,
UploadMetadataProcessorPort,
)


@asynccontextmanager
async def prepare_core(
*, config: Config
) -> AsyncGenerator[
tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessor], None
tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessorPort], None
]:
"""Constructs and initializes all core components and their outbound dependencies."""
vault_adapter = VaultAdapter(config=config)
@@ -49,7 +52,7 @@ async def prepare_core(
event_publisher=event_publisher,
vault_adapter=vault_adapter,
),
LegacyUploadMetadataProcessor( # type: ignore [abstract]
LegacyUploadMetadataProcessor(
config=config,
event_publisher=event_publisher,
vault_adapter=vault_adapter,
@@ -61,7 +64,7 @@ def prepare_core_with_override(
*,
config: Config,
core_override: Optional[
tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessor]
tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessorPort]
] = None,
):
"""Resolve the prepare_core context manager based on config and override (if any)."""
@@ -77,7 +80,7 @@ async def prepare_rest_app(
*,
config: Config,
core_override: Optional[
tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessor]
tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessorPort]
] = None,
) -> AsyncGenerator[FastAPI, None]:
"""Construct and initialize a REST API app along with all its dependencies.
73 changes: 50 additions & 23 deletions src/fis/ports/inbound/ingest.py
@@ -14,53 +14,80 @@
# limitations under the License.
"""Ports for S3 upload metadata ingest"""

from abc import abstractmethod
from typing import Generic, TypeVar
from abc import ABC, abstractmethod

from fis.core import models

UploadMetadataModel = TypeVar("UploadMetadataModel", bound=models.UploadMetadataBase)

class DecryptionError(RuntimeError):
"""Thrown when decryption with the provided private key failed"""

def __init__(self):
message = "Could not decrypt received payload with the given key."
super().__init__(message)

class UploadMetadataProcessorPort(Generic[UploadMetadataModel]):
"""Port for S3 upload metadata processor"""

class DecryptionError(RuntimeError):
"""Thrown when decryption with the provided private key failed"""
class VaultCommunicationError(RuntimeError):
"""Thrown when interaction with the vault resulted in an error"""

def __init__(self):
message = "Could not decrypt received payload with the given key."
super().__init__(message)
def __init__(self, *, message) -> None:
super().__init__(message)

class WrongDecryptedFormatError(RuntimeError):
"""Thrown when the decrypted payload"""

def __init__(self, *, cause: str):
message = f"Decrypted payload does not conform to expected format: {cause}."
super().__init__(message)
class WrongDecryptedFormatError(RuntimeError):
"""Thrown when the decrypted payload"""

class VaultCommunicationError(RuntimeError):
"""Thrown when interaction with the vault resulted in an error"""
def __init__(self, *, cause: str):
message = f"Decrypted payload does not conform to expected format: {cause}."
super().__init__(message)

def __init__(self, *, message) -> None:
super().__init__(message)

class LegacyUploadMetadataProcessorPort(ABC):
"""Port for legacy S3 upload metadata processor"""

@abstractmethod
async def decrypt_payload(
self, *, encrypted: models.EncryptedPayload
) -> models.LegacyUploadMetadata:
"""Decrypt upload metadata using private key"""
...

@abstractmethod
async def populate_by_event(
self, *, upload_metadata: UploadMetadataModel, secret_id: str
self, *, upload_metadata: models.LegacyUploadMetadata, secret_id: str
):
"""Send FileUploadValidationSuccess event to be processed by downstream services"""
...

@abstractmethod
async def store_secret(self, *, file_secret: str) -> str:
"""Communicate with HashiCorp Vault to store file secret and get secret ID"""
...

@abstractmethod
async def decrypt_secret(self, *, encrypted: models.EncryptedPayload) -> str:
"""Decrypt file secret payload"""

class UploadMetadataProcessorPort(ABC):
"""Port for S3 upload metadata processor"""

@abstractmethod
async def decrypt_payload(
self, *, encrypted: models.EncryptedPayload
) -> models.UploadMetadata:
"""Decrypt upload metadata using private key"""
...

@abstractmethod
async def decrypt_secret(self, *, encrypted: models.EncryptedPayload) -> str:
"""Decrypt file secret payload"""
...

@abstractmethod
async def populate_by_event(
self, *, upload_metadata: models.UploadMetadata, secret_id: str
):
"""Send FileUploadValidationSuccess event to be processed by downstream services"""
...

@abstractmethod
async def store_secret(self, *, file_secret: str) -> str:
"""Communicate with HashiCorp Vault to store file secret and get secret ID"""
...
8 changes: 5 additions & 3 deletions tests/fixtures/joint.py
@@ -29,10 +29,12 @@
from hexkit.providers.akafka.testutils import KafkaFixture, kafka_fixture # noqa: F401

from fis.config import Config
from fis.core.ingest import LegacyUploadMetadataProcessor
from fis.core.models import UploadMetadataBase
from fis.inject import prepare_core, prepare_rest_app
from fis.ports.inbound.ingest import UploadMetadataProcessorPort
from fis.ports.inbound.ingest import (
LegacyUploadMetadataProcessorPort,
UploadMetadataProcessorPort,
)
from tests.fixtures.config import get_config

TEST_PAYLOAD = UploadMetadataBase(
@@ -59,7 +61,7 @@ class JointFixture:
rest_client: httpx.AsyncClient
s3_endpoint_alias: str
upload_metadata_processor: UploadMetadataProcessorPort
legacy_upload_metadata_processor: LegacyUploadMetadataProcessor
legacy_upload_metadata_processor: LegacyUploadMetadataProcessorPort


@pytest_asyncio.fixture