diff --git a/src/fis/adapters/inbound/fastapi_/dummies.py b/src/fis/adapters/inbound/fastapi_/dummies.py index 7efcda9..7603540 100644 --- a/src/fis/adapters/inbound/fastapi_/dummies.py +++ b/src/fis/adapters/inbound/fastapi_/dummies.py @@ -23,8 +23,10 @@ from ghga_service_commons.api.di import DependencyDummy from fis.config import Config -from fis.core.ingest import LegacyUploadMetadataProcessor -from fis.ports.inbound.ingest import UploadMetadataProcessorPort +from fis.ports.inbound.ingest import ( + LegacyUploadMetadataProcessorPort, + UploadMetadataProcessorPort, +) config_dummy = DependencyDummy("config_dummy") upload_processor_port = DependencyDummy("upload_processor_port") @@ -37,5 +39,5 @@ ] LegacyUploadProcessor = Annotated[ - LegacyUploadMetadataProcessor, Depends(legacy_upload_processor) + LegacyUploadMetadataProcessorPort, Depends(legacy_upload_processor) ] diff --git a/src/fis/adapters/inbound/fastapi_/routes.py b/src/fis/adapters/inbound/fastapi_/routes.py index 11c36d2..1974345 100644 --- a/src/fis/adapters/inbound/fastapi_/routes.py +++ b/src/fis/adapters/inbound/fastapi_/routes.py @@ -24,6 +24,11 @@ require_token, ) from fis.core.models import EncryptedPayload +from fis.ports.inbound.ingest import ( + DecryptionError, + VaultCommunicationError, + WrongDecryptedFormatError, +) router = APIRouter() @@ -59,10 +64,7 @@ async def ingest_legacy_metadata( decrypted_metadata = await upload_metadata_processor.decrypt_payload( encrypted=encrypted_payload ) - except ( - upload_metadata_processor.DecryptionError, - upload_metadata_processor.WrongDecryptedFormatError, - ) as error: + except (DecryptionError, WrongDecryptedFormatError) as error: raise HTTPException(status_code=422, detail=str(error)) from error file_secret = decrypted_metadata.file_secret @@ -71,7 +73,7 @@ async def ingest_legacy_metadata( secret_id = await upload_metadata_processor.store_secret( file_secret=file_secret ) - except upload_metadata_processor.VaultCommunicationError as error: + except VaultCommunicationError as error: raise HTTPException(status_code=500, detail=str(error)) from error await upload_metadata_processor.populate_by_event( @@ -100,10 +102,7 @@ async def ingest_metadata( decrypted_metadata = await upload_metadata_processor.decrypt_payload( encrypted=encrypted_payload ) - except ( - upload_metadata_processor.DecryptionError, - upload_metadata_processor.WrongDecryptedFormatError, - ) as error: + except (DecryptionError, WrongDecryptedFormatError) as error: raise HTTPException(status_code=422, detail=str(error)) from error secret_id = decrypted_metadata.secret_id diff --git a/src/fis/core/ingest.py b/src/fis/core/ingest.py index 8aa9fcb..700d6f4 100644 --- a/src/fis/core/ingest.py +++ b/src/fis/core/ingest.py @@ -15,7 +15,6 @@ """Functionality relating to S3 upload metadata processing""" import json -from typing import Generic from ghga_service_commons.utils.crypt import decrypt from nacl.exceptions import CryptoError @@ -23,7 +22,13 @@ from pydantic_settings import BaseSettings from fis.core import models -from fis.ports.inbound.ingest import UploadMetadataModel, UploadMetadataProcessorPort +from fis.ports.inbound.ingest import ( + DecryptionError, + LegacyUploadMetadataProcessorPort, + UploadMetadataProcessorPort, + VaultCommunicationError, + WrongDecryptedFormatError, +) from fis.ports.outbound.event_pub import EventPublisherPort from fis.ports.outbound.vault.client import VaultAdapterPort @@ -54,13 +59,8 @@ class ServiceConfig(BaseSettings): ) -class UploadMetadataProcessorBase( - UploadMetadataProcessorPort, Generic[UploadMetadataModel] -): - """Shared implementations for current/legacy upload processor. - - Do not instantiate directly. - """ +class LegacyUploadMetadataProcessor(LegacyUploadMetadataProcessorPort): + """Handler for S3 upload metadata processing""" def __init__( self, @@ -73,8 +73,24 @@ def __init__( self._event_publisher = event_publisher self._vault_adapter = vault_adapter + async def decrypt_payload( # type: ignore + self, *, encrypted: models.EncryptedPayload + ) -> models.LegacyUploadMetadata: + """Decrypt upload metadata using private key""" + try: + decrypted = decrypt(data=encrypted.payload, key=self._config.private_key) + except (ValueError, CryptoError) as error: + raise DecryptionError() from error + + upload_metadata = json.loads(decrypted) + + try: + return models.LegacyUploadMetadata(**upload_metadata) + except ValidationError as error: + raise WrongDecryptedFormatError(cause=str(error)) from error + async def populate_by_event( - self, *, upload_metadata: UploadMetadataModel, secret_id: str + self, *, upload_metadata: models.LegacyUploadMetadata, secret_id: str ): """Send FileUploadValidationSuccess event to be processed by downstream services""" await self._event_publisher.send_file_metadata( @@ -89,31 +105,22 @@ async def store_secret(self, *, file_secret: str) -> str: try: return self._vault_adapter.store_secret(secret=file_secret) except self._vault_adapter.SecretInsertionError as error: - raise self.VaultCommunicationError(message=str(error)) from error + raise VaultCommunicationError(message=str(error)) from error -class LegacyUploadMetadataProcessor(UploadMetadataProcessorBase): +class UploadMetadataProcessor(UploadMetadataProcessorPort): """Handler for S3 upload metadata processing""" - async def decrypt_payload( # type: ignore - self, *, encrypted: models.EncryptedPayload - ) -> models.LegacyUploadMetadata: - """Decrypt upload metadata using private key""" - try: - decrypted = decrypt(data=encrypted.payload, key=self._config.private_key) - except (ValueError, CryptoError) as error: - raise self.DecryptionError() from error - - upload_metadata = json.loads(decrypted) - - try: - return models.LegacyUploadMetadata(**upload_metadata) - except ValidationError as error: - raise self.WrongDecryptedFormatError(cause=str(error)) from error - - -class UploadMetadataProcessor(UploadMetadataProcessorBase): - """Handler for S3 upload metadata processing""" + def __init__( + self, + *, + config: ServiceConfig, + event_publisher: EventPublisherPort, + vault_adapter: VaultAdapterPort, + ): + self._config = config + self._event_publisher = event_publisher + self._vault_adapter = vault_adapter async def decrypt_payload( self, *, encrypted: models.EncryptedPayload @@ -122,20 +129,38 @@ async def decrypt_payload( try: decrypted = decrypt(data=encrypted.payload, key=self._config.private_key) except (ValueError, CryptoError) as error: - raise self.DecryptionError() from error + raise DecryptionError() from error upload_metadata = json.loads(decrypted) try: return models.UploadMetadata(**upload_metadata) except ValidationError as error: - raise self.WrongDecryptedFormatError(cause=str(error)) from error + raise WrongDecryptedFormatError(cause=str(error)) from error async def decrypt_secret(self, *, encrypted: models.EncryptedPayload) -> str: """Decrypt file secret payload""" try: decrypted = decrypt(data=encrypted.payload, key=self._config.private_key) except (ValueError, CryptoError) as error: - raise self.DecryptionError() from error + raise DecryptionError() from error return decrypted + + async def populate_by_event( + self, *, upload_metadata: models.UploadMetadata, secret_id: str + ): + """Send FileUploadValidationSuccess event to be processed by downstream services""" + await self._event_publisher.send_file_metadata( + secret_id=secret_id, + source_bucket_id=self._config.source_bucket_id, + upload_metadata=upload_metadata, + s3_endpoint_alias=self._config.selected_storage_alias, + ) + + async def store_secret(self, *, file_secret: str) -> str: + """Communicate with HashiCorp Vault to store file secret and get secret ID""" + try: + return self._vault_adapter.store_secret(secret=file_secret) + except self._vault_adapter.SecretInsertionError as error: + raise VaultCommunicationError(message=str(error)) from error diff --git a/src/fis/inject.py b/src/fis/inject.py index aa97215..23a4dd6 100644 --- a/src/fis/inject.py +++ b/src/fis/inject.py @@ -30,14 +30,17 @@ from fis.adapters.outbound.vault import VaultAdapter from fis.config import Config from fis.core.ingest import LegacyUploadMetadataProcessor, UploadMetadataProcessor -from fis.ports.inbound.ingest import UploadMetadataProcessorPort +from fis.ports.inbound.ingest import ( + LegacyUploadMetadataProcessorPort, + UploadMetadataProcessorPort, +) @asynccontextmanager async def prepare_core( *, config: Config ) -> AsyncGenerator[ - tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessor], None + tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessorPort], None ]: """Constructs and initializes all core components and their outbound dependencies.""" vault_adapter = VaultAdapter(config=config) @@ -49,7 +52,7 @@ async def prepare_core( event_publisher=event_publisher, vault_adapter=vault_adapter, ), - LegacyUploadMetadataProcessor( # type: ignore [abstract] + LegacyUploadMetadataProcessor( config=config, event_publisher=event_publisher, vault_adapter=vault_adapter, @@ -61,7 +64,7 @@ def prepare_core_with_override( *, config: Config, core_override: Optional[ - tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessor] + tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessorPort] ] = None, ): """Resolve the prepare_core context manager based on config and override (if any).""" @@ -77,7 +80,7 @@ async def prepare_rest_app( *, config: Config, core_override: Optional[ - tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessor] + tuple[UploadMetadataProcessorPort, LegacyUploadMetadataProcessorPort] ] = None, ) -> AsyncGenerator[FastAPI, None]: """Construct and initialize a REST API app along with all its dependencies. diff --git a/src/fis/ports/inbound/ingest.py b/src/fis/ports/inbound/ingest.py index 6e58ce5..fca05b1 100644 --- a/src/fis/ports/inbound/ingest.py +++ b/src/fis/ports/inbound/ingest.py @@ -14,53 +14,80 @@ # limitations under the License. """Ports for S3 upload metadata ingest""" -from abc import abstractmethod -from typing import Generic, TypeVar +from abc import ABC, abstractmethod from fis.core import models -UploadMetadataModel = TypeVar("UploadMetadataModel", bound=models.UploadMetadataBase) +class DecryptionError(RuntimeError): + """Thrown when decryption with the provided private key failed""" + + def __init__(self): + message = "Could not decrypt received payload with the given key." + super().__init__(message) -class UploadMetadataProcessorPort(Generic[UploadMetadataModel]): - """Port for S3 upload metadata processor""" - class DecryptionError(RuntimeError): - """Thrown when decryption with the provided private key failed""" +class VaultCommunicationError(RuntimeError): + """Thrown when interaction with the vault resulted in an error""" - def __init__(self): - message = "Could not decrypt received payload with the given key." - super().__init__(message) + def __init__(self, *, message) -> None: + super().__init__(message) - class WrongDecryptedFormatError(RuntimeError): - """Thrown when the decrypted payload""" - def __init__(self, *, cause: str): - message = f"Decrypted payload does not conform to expected format: {cause}." - super().__init__(message) +class WrongDecryptedFormatError(RuntimeError): + """Thrown when the decrypted payload""" - class VaultCommunicationError(RuntimeError): - """Thrown when interaction with the vault resulted in an error""" + def __init__(self, *, cause: str): + message = f"Decrypted payload does not conform to expected format: {cause}." + super().__init__(message) - def __init__(self, *, message) -> None: - super().__init__(message) + +class LegacyUploadMetadataProcessorPort(ABC): + """Port for legacy S3 upload metadata processor""" + + @abstractmethod + async def decrypt_payload( + self, *, encrypted: models.EncryptedPayload + ) -> models.LegacyUploadMetadata: + """Decrypt upload metadata using private key""" + ... @abstractmethod async def populate_by_event( - self, *, upload_metadata: UploadMetadataModel, secret_id: str + self, *, upload_metadata: models.LegacyUploadMetadata, secret_id: str ): """Send FileUploadValidationSuccess event to be processed by downstream services""" + ... @abstractmethod async def store_secret(self, *, file_secret: str) -> str: """Communicate with HashiCorp Vault to store file secret and get secret ID""" + ... - @abstractmethod - async def decrypt_secret(self, *, encrypted: models.EncryptedPayload) -> str: - """Decrypt file secret payload""" + +class UploadMetadataProcessorPort(ABC): + """Port for S3 upload metadata processor""" @abstractmethod async def decrypt_payload( self, *, encrypted: models.EncryptedPayload ) -> models.UploadMetadata: """Decrypt upload metadata using private key""" + ... + + @abstractmethod + async def decrypt_secret(self, *, encrypted: models.EncryptedPayload) -> str: + """Decrypt file secret payload""" + ... + + @abstractmethod + async def populate_by_event( + self, *, upload_metadata: models.UploadMetadata, secret_id: str + ): + """Send FileUploadValidationSuccess event to be processed by downstream services""" + ... + + @abstractmethod + async def store_secret(self, *, file_secret: str) -> str: + """Communicate with HashiCorp Vault to store file secret and get secret ID""" + ... diff --git a/tests/fixtures/joint.py b/tests/fixtures/joint.py index bc0f2eb..cc40fb3 100644 --- a/tests/fixtures/joint.py +++ b/tests/fixtures/joint.py @@ -29,10 +29,12 @@ from hexkit.providers.akafka.testutils import KafkaFixture, kafka_fixture # noqa: F401 from fis.config import Config -from fis.core.ingest import LegacyUploadMetadataProcessor from fis.core.models import UploadMetadataBase from fis.inject import prepare_core, prepare_rest_app -from fis.ports.inbound.ingest import UploadMetadataProcessorPort +from fis.ports.inbound.ingest import ( + LegacyUploadMetadataProcessorPort, + UploadMetadataProcessorPort, +) from tests.fixtures.config import get_config TEST_PAYLOAD = UploadMetadataBase( @@ -59,7 +61,7 @@ class JointFixture: rest_client: httpx.AsyncClient s3_endpoint_alias: str upload_metadata_processor: UploadMetadataProcessorPort - legacy_upload_metadata_processor: LegacyUploadMetadataProcessor + legacy_upload_metadata_processor: LegacyUploadMetadataProcessorPort @pytest_asyncio.fixture diff --git a/tests/test_ingest.py b/tests/test_ingest.py index 67119b1..0076cd6 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -22,6 +22,10 @@ from ghga_service_commons.utils.crypt import encrypt, generate_key_pair from fis.core.models import EncryptedPayload, LegacyUploadMetadata, UploadMetadata +from fis.ports.inbound.ingest import ( + DecryptionError, + WrongDecryptedFormatError, +) from tests.fixtures.joint import ( # noqa: F401 JointFixture, KafkaFixture, @@ -85,9 +89,7 @@ async def test_decryption_sad(joint_fixture: JointFixture): # noqa: F811 ) ) - with pytest.raises( - joint_fixture.legacy_upload_metadata_processor.WrongDecryptedFormatError - ): + with pytest.raises(WrongDecryptedFormatError): await joint_fixture.legacy_upload_metadata_processor.decrypt_payload( encrypted=encrypted_payload ) @@ -104,7 +106,7 @@ async def test_decryption_sad(joint_fixture: JointFixture): # noqa: F811 payload=encrypt(data=payload.model_dump_json(), key=keypair2.public) ) - with pytest.raises(joint_fixture.legacy_upload_metadata_processor.DecryptionError): + with pytest.raises(DecryptionError): await joint_fixture.legacy_upload_metadata_processor.decrypt_payload( encrypted=encrypted_payload ) @@ -121,9 +123,7 @@ async def test_legacy_decryption_sad(joint_fixture: JointFixture): # noqa: F811 ) ) - with pytest.raises( - joint_fixture.upload_metadata_processor.WrongDecryptedFormatError - ): + with pytest.raises(WrongDecryptedFormatError): await joint_fixture.upload_metadata_processor.decrypt_payload( encrypted=encrypted_payload ) @@ -140,7 +140,7 @@ async def test_legacy_decryption_sad(joint_fixture: JointFixture): # noqa: F811 payload=encrypt(data=payload.model_dump_json(), key=keypair2.public) ) - with pytest.raises(joint_fixture.upload_metadata_processor.DecryptionError): + with pytest.raises(DecryptionError): await joint_fixture.upload_metadata_processor.decrypt_payload( encrypted=encrypted_payload )