Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dynamic resource suffix in Airlock #3243

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
42 changes: 21 additions & 21 deletions airlock_processor/StatusChangedQueueTrigger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RequestProperties(BaseModel):
new_status: str
previous_status: Optional[str]
type: str
workspace_id: str
workspace_unique_identifier_suffix: str


class ContainersCopyMetadata:
Expand Down Expand Up @@ -48,18 +48,18 @@ def handle_status_changed(request_properties: RequestProperties, stepResultEvent
new_status = request_properties.new_status
previous_status = request_properties.previous_status
req_id = request_properties.request_id
ws_id = request_properties.workspace_id
unique_suffix = request_properties.workspace_unique_identifier_suffix
request_type = request_properties.type

logging.info('Processing request with id %s. new status is "%s", type is "%s"', req_id, new_status, request_type)
logging.info(f'Processing request with id {req_id}. new status is {new_status}, previous status is {previous_status}, unique_suffix is {unique_suffix} type is {request_type}')

if new_status == constants.STAGE_DRAFT:
account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, short_workspace_id=ws_id)
account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, workspace_unique_identifier_suffix=unique_suffix)
blob_operations.create_container(account_name, req_id)
return

if new_status == constants.STAGE_CANCELLED:
storage_account_name = get_storage_account(previous_status, request_type, ws_id)
storage_account_name = get_storage_account(previous_status, request_type, unique_suffix)
container_to_delete_url = blob_operations.get_blob_url(account_name=storage_account_name, container_name=req_id)
set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_properties, container_url=container_to_delete_url)
return
Expand All @@ -69,7 +69,7 @@ def handle_status_changed(request_properties: RequestProperties, stepResultEvent

if (is_require_data_copy(new_status)):
logging.info('Request with id %s. requires data copy between storage accounts', req_id)
containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, short_workspace_id=ws_id)
containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, workspace_unique_identifier_suffix=unique_suffix)
blob_operations.create_container(containers_metadata.dest_account_name, req_id)
blob_operations.copy_data(containers_metadata.source_account_name,
containers_metadata.dest_account_name, req_id)
Expand Down Expand Up @@ -102,7 +102,7 @@ def is_require_data_copy(new_status: str):
return False


def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, short_workspace_id: str) -> ContainersCopyMetadata:
def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, workspace_unique_identifier_suffix: str) -> ContainersCopyMetadata:
# sanity
if is_require_data_copy(new_status) is False:
raise Exception("Given new status is not supported")
Expand All @@ -114,19 +114,19 @@ def get_source_dest_for_copy(new_status: str, previous_status: str, request_type
logging.error(msg)
raise Exception(msg)

source_account_name = get_storage_account(previous_status, request_type, short_workspace_id)
dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, short_workspace_id)
source_account_name = get_storage_account(previous_status, request_type, workspace_unique_identifier_suffix)
dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, workspace_unique_identifier_suffix)
return ContainersCopyMetadata(source_account_name, dest_account_name)


def get_storage_account(status: str, request_type: str, short_workspace_id: str) -> str:
def get_storage_account(status: str, request_type: str, workspace_unique_identifier_suffix: str) -> str:
tre_id = _get_tre_id()

if request_type == constants.IMPORT_TYPE:
if status == constants.STAGE_DRAFT:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL + tre_id
elif status == constants.STAGE_APPROVED:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + workspace_unique_identifier_suffix
elif status == constants.STAGE_REJECTED:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
elif status == constants.STAGE_BLOCKED_BY_SCAN:
Expand All @@ -136,43 +136,43 @@ def get_storage_account(status: str, request_type: str, short_workspace_id: str)

if request_type == constants.EXPORT_TYPE:
if status == constants.STAGE_DRAFT:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + workspace_unique_identifier_suffix
elif status == constants.STAGE_APPROVED:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
elif status == constants.STAGE_REJECTED:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + workspace_unique_identifier_suffix
elif status == constants.STAGE_BLOCKED_BY_SCAN:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + workspace_unique_identifier_suffix
elif status in [constants.STAGE_IN_REVIEW, constants.STAGE_SUBMITTED, constants.STAGE_APPROVAL_INPROGRESS, constants.STAGE_REJECTION_INPROGRESS, constants.STAGE_BLOCKING_INPROGRESS]:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + workspace_unique_identifier_suffix

error_message = f"Missing current storage account definition for status '{status}' and request type '{request_type}'."
logging.error(error_message)
raise Exception(error_message)


def get_storage_account_destination_for_copy(new_status: str, request_type: str, short_workspace_id: str) -> str:
def get_storage_account_destination_for_copy(new_status: str, request_type: str, workspace_unique_identifier_suffix: str) -> str:
tre_id = _get_tre_id()

if request_type == constants.IMPORT_TYPE:
if new_status == constants.STAGE_SUBMITTED:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
elif new_status == constants.STAGE_APPROVAL_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + workspace_unique_identifier_suffix
elif new_status == constants.STAGE_REJECTION_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
elif new_status == constants.STAGE_BLOCKING_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id

if request_type == constants.EXPORT_TYPE:
if new_status == constants.STAGE_SUBMITTED:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + workspace_unique_identifier_suffix
elif new_status == constants.STAGE_APPROVAL_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
elif new_status == constants.STAGE_REJECTION_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + workspace_unique_identifier_suffix
elif new_status == constants.STAGE_BLOCKING_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + workspace_unique_identifier_suffix

error_message = f"Missing copy destination storage account definition for status '{new_status}' and request type '{request_type}'."
logging.error(error_message)
Expand Down Expand Up @@ -218,7 +218,7 @@ def set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_pr


def get_request_files(request_properties: RequestProperties):
storage_account_name = get_storage_account(request_properties.previous_status, request_properties.type, request_properties.workspace_id)
storage_account_name = get_storage_account(request_properties.previous_status, request_properties.type, request_properties.workspace_unique_identifier_suffix)
return blob_operations.get_request_files(account_name=storage_account_name, request_id=request_properties.request_id)


Expand Down
2 changes: 1 addition & 1 deletion airlock_processor/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.7.2"
__version__ = "0.7.3"
20 changes: 10 additions & 10 deletions airlock_processor/tests/test_status_change_queue_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@

class TestPropertiesExtraction():
def test_extract_prop_valid_body_return_all_values(self):
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"456\" ,\"previous_status\":\"789\" , \"type\":\"101112\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"456\" ,\"previous_status\":\"789\" , \"type\":\"101112\", \"workspace_unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
req_prop = extract_properties(message)
assert req_prop.request_id == "123"
assert req_prop.new_status == "456"
assert req_prop.previous_status == "789"
assert req_prop.type == "101112"
assert req_prop.workspace_id == "ws1"
assert req_prop.workspace_unique_identifier_suffix == "ws1"

def test_extract_prop_missing_arg_throws(self):
message_body = "{ \"data\": { \"status\":\"456\" , \"type\":\"789\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"status\":\"456\" , \"type\":\"789\", \"workspace_unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
pytest.raises(ValidationError, extract_properties, message)

message_body = "{ \"data\": { \"request_id\":\"123\", \"type\":\"789\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\", \"type\":\"789\", \"workspace_unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
pytest.raises(ValidationError, extract_properties, message)

message_body = "{ \"data\": { \"request_id\":\"123\",\"status\":\"456\" , \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"status\":\"456\" , \"workspace_unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
pytest.raises(ValidationError, extract_properties, message)

Expand Down Expand Up @@ -72,7 +72,7 @@ class TestFileEnumeration():
@patch("StatusChangedQueueTrigger.is_require_data_copy", return_value=False)
@patch.dict(os.environ, {"TRE_ID": "tre-id"}, clear=True)
def test_get_request_files_should_be_called_on_submit_stage(self, _, mock_get_request_files, mock_set_output_event_to_report_request_files):
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, stepResultEvent=MagicMock(), dataDeletionEvent=MagicMock())
assert mock_get_request_files.called
Expand All @@ -82,7 +82,7 @@ def test_get_request_files_should_be_called_on_submit_stage(self, _, mock_get_re
@patch("StatusChangedQueueTrigger.get_request_files")
@patch("StatusChangedQueueTrigger.handle_status_changed")
def test_get_request_files_should_not_be_called_if_new_status_is_not_submit(self, _, mock_get_request_files, mock_set_output_event_to_report_failure):
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"fake-status\" ,\"previous_status\":\"None\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"fake-status\" ,\"previous_status\":\"None\" , \"type\":\"export\", \"workspace_unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, stepResultEvent=MagicMock(), dataDeletionEvent=MagicMock())
assert not mock_get_request_files.called
Expand All @@ -92,7 +92,7 @@ def test_get_request_files_should_not_be_called_if_new_status_is_not_submit(self
@patch("StatusChangedQueueTrigger.get_request_files")
@patch("StatusChangedQueueTrigger.handle_status_changed", side_effect=Exception)
def test_get_request_files_should_be_called_when_failing_during_submit_stage(self, _, mock_get_request_files, mock_set_output_event_to_report_failure):
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, stepResultEvent=MagicMock(), dataDeletionEvent=MagicMock())
assert mock_get_request_files.called
Expand All @@ -102,7 +102,7 @@ def test_get_request_files_should_be_called_when_failing_during_submit_stage(sel
@patch.dict(os.environ, {"TRE_ID": "tre-id"}, clear=True)
def test_get_request_files_called_with_correct_storage_account(self, mock_get_request_files):
source_storage_account_for_submitted_stage = constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + 'ws1'
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
request_properties = extract_properties(message)
get_request_files(request_properties)
Expand All @@ -113,7 +113,7 @@ class TestFilesDeletion():
@patch("StatusChangedQueueTrigger.set_output_event_to_trigger_container_deletion")
@patch.dict(os.environ, {"TRE_ID": "tre-id"}, clear=True)
def test_delete_request_files_should_be_called_on_cancel_stage(self, mock_set_output_event_to_trigger_container_deletion):
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"cancelled\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"cancelled\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, stepResultEvent=MagicMock(), dataDeletionEvent=MagicMock())
assert mock_set_output_event_to_trigger_container_deletion.called
Expand Down
2 changes: 1 addition & 1 deletion api_app/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.18.11"
__version__ = "0.18.12"
6 changes: 3 additions & 3 deletions api_app/api/routes/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ async def migrate_database(resources_repo=Depends(get_repository(ResourceReposit
num_updated = await airlock_migration.update_review_decision_values()
migrations.append(Migration(issueNumber="3152", status=f'Updated {num_updated} airlock requests with new reviewDecision value'))

logger.info("PR 3358 - Migrate OperationSteps of Operations")
num_updated = await resource_migration.migrate_step_id_of_operation_steps(operations_repo)
migrations.append(Migration(issueNumber="3358", status=f'Updated {num_updated} operations'))
logging.info("PR 3243 - Migrate reviewDecision of Airlock Reviews")
num_updated = await resource_migration.add_unique_identifier_suffix()
migrations.append(Migration(issueNumber="3243", status=f'Added the unique_identifier_suffix field to {num_updated} resources'))

return MigrationOutList(migrations=migrations)
except Exception as e:
Expand Down
12 changes: 4 additions & 8 deletions api_app/db/migrations/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,11 @@ async def archive_history(self, resource_history_repository: ResourceHistoryRepo

return num_updated

async def migrate_step_id_of_operation_steps(self, operations_repository: OperationRepository) -> int:
async def add_unique_identifier_suffix(self) -> int:
num_updated = 0
for operation in await operations_repository.query("SELECT * from c WHERE ARRAY_LENGTH(c.steps) > 0 AND IS_DEFINED(c.steps[0].stepId)"):
for operation_step in operation['steps']:
operation_step['templateStepId'] = operation_step['stepId']
operation_step['id'] = str(uuid.uuid4())
del operation_step['stepId']

await operations_repository.update_item_dict(operation)
for resource in await self.query("SELECT * from c WHERE NOT IS_DEFINED(c.properties.unique_identifier_suffix)"):
resource['properties']['unique_identifier_suffix'] = resource['id'][-4:]
await self.update_item_dict(resource)
num_updated = num_updated + 1

return num_updated
5 changes: 2 additions & 3 deletions api_app/event_grid/event_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@
from services.logging import logger


async def send_status_changed_event(airlock_request: AirlockRequest, previous_status: Optional[AirlockRequestStatus]):
async def send_status_changed_event(airlock_request: AirlockRequest, workspace_unique_identifier_suffix: str, previous_status: Optional[AirlockRequestStatus]):
request_id = airlock_request.id
new_status = airlock_request.status.value
previous_status = previous_status.value if previous_status else None
request_type = airlock_request.type.value
short_workspace_id = airlock_request.workspaceId[-4:]

status_changed_event = EventGridEvent(
event_type="statusChanged",
data=StatusChangedData(request_id=request_id, new_status=new_status, previous_status=previous_status, type=request_type, workspace_id=short_workspace_id).__dict__,
data=StatusChangedData(request_id=request_id, new_status=new_status, previous_status=previous_status, type=request_type, workspace_unique_identifier_suffix=workspace_unique_identifier_suffix).__dict__,
subject=f"{request_id}/statusChanged",
data_version="2.0"
)
Expand Down
2 changes: 1 addition & 1 deletion api_app/models/domain/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ class StatusChangedData(AzureTREModel):
new_status: str
previous_status: Optional[str]
type: str
workspace_id: str
workspace_unique_identifier_suffix: str
Loading
Loading