Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dynamic resource suffix in Airlock #3243

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
42 changes: 21 additions & 21 deletions airlock_processor/StatusChangedQueueTrigger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RequestProperties(BaseModel):
new_status: str
previous_status: Optional[str]
type: str
workspace_id: str
unique_identifier_suffix: str
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Across much of this change, isn't the unique_identifier_suffix always the workspace one?
If so, shouldn't we call it like that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed



class ContainersCopyMetadata:
Expand Down Expand Up @@ -48,18 +48,18 @@ def handle_status_changed(request_properties: RequestProperties, stepResultEvent
new_status = request_properties.new_status
previous_status = request_properties.previous_status
req_id = request_properties.request_id
ws_id = request_properties.workspace_id
unique_suffix = request_properties.unique_identifier_suffix
request_type = request_properties.type

logging.info('Processing request with id %s. new status is "%s", type is "%s"', req_id, new_status, request_type)
logging.info(f'Processing request with id {req_id}. new status is {new_status}, previous status is {previous_status}, unique_suffix is {unique_suffix} type is {request_type}')

if new_status == constants.STAGE_DRAFT:
account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, short_workspace_id=ws_id)
account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, unique_identifier_suffix=unique_suffix)
blob_operations.create_container(account_name, req_id)
return

if new_status == constants.STAGE_CANCELLED:
storage_account_name = get_storage_account(previous_status, request_type, ws_id)
storage_account_name = get_storage_account(previous_status, request_type, unique_suffix)
container_to_delete_url = blob_operations.get_blob_url(account_name=storage_account_name, container_name=req_id)
set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_properties, container_url=container_to_delete_url)
return
Expand All @@ -69,7 +69,7 @@ def handle_status_changed(request_properties: RequestProperties, stepResultEvent

if (is_require_data_copy(new_status)):
logging.info('Request with id %s. requires data copy between storage accounts', req_id)
containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, short_workspace_id=ws_id)
containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, unique_identifier_suffix=unique_suffix)
blob_operations.create_container(containers_metadata.dest_account_name, req_id)
blob_operations.copy_data(containers_metadata.source_account_name,
containers_metadata.dest_account_name, req_id)
Expand Down Expand Up @@ -102,7 +102,7 @@ def is_require_data_copy(new_status: str):
return False


def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, short_workspace_id: str) -> ContainersCopyMetadata:
def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, unique_identifier_suffix: str) -> ContainersCopyMetadata:
# sanity
if is_require_data_copy(new_status) is False:
raise Exception("Given new status is not supported")
Expand All @@ -114,19 +114,19 @@ def get_source_dest_for_copy(new_status: str, previous_status: str, request_type
logging.error(msg)
raise Exception(msg)

source_account_name = get_storage_account(previous_status, request_type, short_workspace_id)
dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, short_workspace_id)
source_account_name = get_storage_account(previous_status, request_type, unique_identifier_suffix)
dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, unique_identifier_suffix)
return ContainersCopyMetadata(source_account_name, dest_account_name)


def get_storage_account(status: str, request_type: str, short_workspace_id: str) -> str:
def get_storage_account(status: str, request_type: str, unique_identifier_suffix: str) -> str:
tre_id = _get_tre_id()

if request_type == constants.IMPORT_TYPE:
if status == constants.STAGE_DRAFT:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL + tre_id
elif status == constants.STAGE_APPROVED:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + unique_identifier_suffix
elif status == constants.STAGE_REJECTED:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
elif status == constants.STAGE_BLOCKED_BY_SCAN:
Expand All @@ -136,43 +136,43 @@ def get_storage_account(status: str, request_type: str, short_workspace_id: str)

if request_type == constants.EXPORT_TYPE:
if status == constants.STAGE_DRAFT:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + unique_identifier_suffix
elif status == constants.STAGE_APPROVED:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
elif status == constants.STAGE_REJECTED:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + unique_identifier_suffix
elif status == constants.STAGE_BLOCKED_BY_SCAN:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + unique_identifier_suffix
elif status in [constants.STAGE_IN_REVIEW, constants.STAGE_SUBMITTED, constants.STAGE_APPROVAL_INPROGRESS, constants.STAGE_REJECTION_INPROGRESS, constants.STAGE_BLOCKING_INPROGRESS]:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + unique_identifier_suffix

error_message = f"Missing current storage account definition for status '{status}' and request type '{request_type}'."
logging.error(error_message)
raise Exception(error_message)


def get_storage_account_destination_for_copy(new_status: str, request_type: str, short_workspace_id: str) -> str:
def get_storage_account_destination_for_copy(new_status: str, request_type: str, unique_identifier_suffix: str) -> str:
tre_id = _get_tre_id()

if request_type == constants.IMPORT_TYPE:
if new_status == constants.STAGE_SUBMITTED:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
elif new_status == constants.STAGE_APPROVAL_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + unique_identifier_suffix
elif new_status == constants.STAGE_REJECTION_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
elif new_status == constants.STAGE_BLOCKING_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id

if request_type == constants.EXPORT_TYPE:
if new_status == constants.STAGE_SUBMITTED:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + unique_identifier_suffix
elif new_status == constants.STAGE_APPROVAL_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
elif new_status == constants.STAGE_REJECTION_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + unique_identifier_suffix
elif new_status == constants.STAGE_BLOCKING_INPROGRESS:
return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + unique_identifier_suffix

error_message = f"Missing copy destination storage account definition for status '{new_status}' and request type '{request_type}'."
logging.error(error_message)
Expand Down Expand Up @@ -218,7 +218,7 @@ def set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_pr


def get_request_files(request_properties: RequestProperties):
storage_account_name = get_storage_account(request_properties.previous_status, request_properties.type, request_properties.workspace_id)
storage_account_name = get_storage_account(request_properties.previous_status, request_properties.type, request_properties.unique_identifier_suffix)
return blob_operations.get_request_files(account_name=storage_account_name, request_id=request_properties.request_id)


Expand Down
2 changes: 1 addition & 1 deletion airlock_processor/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.13"
__version__ = "0.5.0"
20 changes: 10 additions & 10 deletions airlock_processor/tests/test_status_change_queue_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@

class TestPropertiesExtraction():
def test_extract_prop_valid_body_return_all_values(self):
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"456\" ,\"previous_status\":\"789\" , \"type\":\"101112\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"456\" ,\"previous_status\":\"789\" , \"type\":\"101112\", \"unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
req_prop = extract_properties(message)
assert req_prop.request_id == "123"
assert req_prop.new_status == "456"
assert req_prop.previous_status == "789"
assert req_prop.type == "101112"
assert req_prop.workspace_id == "ws1"
assert req_prop.unique_identifier_suffix == "ws1"

def test_extract_prop_missing_arg_throws(self):
message_body = "{ \"data\": { \"status\":\"456\" , \"type\":\"789\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"status\":\"456\" , \"type\":\"789\", \"unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
pytest.raises(ValidationError, extract_properties, message)

message_body = "{ \"data\": { \"request_id\":\"123\", \"type\":\"789\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\", \"type\":\"789\", \"unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
pytest.raises(ValidationError, extract_properties, message)

message_body = "{ \"data\": { \"request_id\":\"123\",\"status\":\"456\" , \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"status\":\"456\" , \"unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
pytest.raises(ValidationError, extract_properties, message)

Expand Down Expand Up @@ -72,7 +72,7 @@ class TestFileEnumeration():
@patch("StatusChangedQueueTrigger.is_require_data_copy", return_value=False)
@patch.dict(os.environ, {"TRE_ID": "tre-id"}, clear=True)
def test_get_request_files_should_be_called_on_submit_stage(self, _, mock_get_request_files, mock_set_output_event_to_report_request_files):
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, stepResultEvent=MagicMock(), dataDeletionEvent=MagicMock())
assert mock_get_request_files.called
Expand All @@ -82,7 +82,7 @@ def test_get_request_files_should_be_called_on_submit_stage(self, _, mock_get_re
@patch("StatusChangedQueueTrigger.get_request_files")
@patch("StatusChangedQueueTrigger.handle_status_changed")
def test_get_request_files_should_not_be_called_if_new_status_is_not_submit(self, _, mock_get_request_files, mock_set_output_event_to_report_failure):
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"fake-status\" ,\"previous_status\":\"None\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"fake-status\" ,\"previous_status\":\"None\" , \"type\":\"export\", \"unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, stepResultEvent=MagicMock(), dataDeletionEvent=MagicMock())
assert not mock_get_request_files.called
Expand All @@ -92,7 +92,7 @@ def test_get_request_files_should_not_be_called_if_new_status_is_not_submit(self
@patch("StatusChangedQueueTrigger.get_request_files")
@patch("StatusChangedQueueTrigger.handle_status_changed", side_effect=Exception)
def test_get_request_files_should_be_called_when_failing_during_submit_stage(self, _, mock_get_request_files, mock_set_output_event_to_report_failure):
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, stepResultEvent=MagicMock(), dataDeletionEvent=MagicMock())
assert mock_get_request_files.called
Expand All @@ -102,7 +102,7 @@ def test_get_request_files_should_be_called_when_failing_during_submit_stage(sel
@patch.dict(os.environ, {"TRE_ID": "tre-id"}, clear=True)
def test_get_request_files_called_with_correct_storage_account(self, mock_get_request_files):
source_storage_account_for_submitted_stage = constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + 'ws1'
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"submitted\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
request_properties = extract_properties(message)
get_request_files(request_properties)
Expand All @@ -113,7 +113,7 @@ class TestFilesDeletion():
@patch("StatusChangedQueueTrigger.set_output_event_to_trigger_container_deletion")
@patch.dict(os.environ, {"TRE_ID": "tre-id"}, clear=True)
def test_delete_request_files_should_be_called_on_cancel_stage(self, mock_set_output_event_to_trigger_container_deletion):
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"cancelled\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message_body = "{ \"data\": { \"request_id\":\"123\",\"new_status\":\"cancelled\" ,\"previous_status\":\"draft\" , \"type\":\"export\", \"unique_identifier_suffix\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, stepResultEvent=MagicMock(), dataDeletionEvent=MagicMock())
assert mock_set_output_event_to_trigger_container_deletion.called
Expand Down
2 changes: 1 addition & 1 deletion api_app/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.13.1"
__version__ = "0.14.0"
4 changes: 4 additions & 0 deletions api_app/api/routes/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ async def migrate_database(resources_repo=Depends(get_repository(ResourceReposit
num_updated = await airlock_migration.update_review_decision_values()
migrations.append(Migration(issueNumber="3152", status=f'Updated {num_updated} airlock requests with new reviewDecision value'))

logging.info("PR 3243 - Migrate reviewDecision of Airlock Reviews")
num_updated = await resource_migration.add_unique_identifier_suffix()
migrations.append(Migration(issueNumber="3243", status=f'Added the unique_identifier_suffix field to {num_updated} resources'))

return MigrationOutList(migrations=migrations)
except Exception as e:
logging.exception("Failed to migrate database")
Expand Down
9 changes: 9 additions & 0 deletions api_app/db/migrations/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,12 @@ async def archive_history(self, resource_history_repository: ResourceHistoryRepo
num_updated = num_updated + 1

return num_updated

async def add_unique_identifier_suffix(self) -> int:
num_updated = 0
for resource in await self.query("SELECT * from c WHERE NOT IS_DEFINED(c.properties.unique_identifier_suffix)"):
resource['properties']['unique_identifier_suffix'] = resource['id'][-4:]
await self.update_item_dict(resource)
num_updated = num_updated + 1

return num_updated
5 changes: 2 additions & 3 deletions api_app/event_grid/event_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@
from models.domain.workspace import Workspace


async def send_status_changed_event(airlock_request: AirlockRequest, previous_status: Optional[AirlockRequestStatus]):
async def send_status_changed_event(airlock_request: AirlockRequest, unique_identifier_suffix: str, previous_status: Optional[AirlockRequestStatus]):
request_id = airlock_request.id
new_status = airlock_request.status.value
previous_status = previous_status.value if previous_status else None
request_type = airlock_request.type.value
short_workspace_id = airlock_request.workspaceId[-4:]

status_changed_event = EventGridEvent(
event_type="statusChanged",
data=StatusChangedData(request_id=request_id, new_status=new_status, previous_status=previous_status, type=request_type, workspace_id=short_workspace_id).__dict__,
data=StatusChangedData(request_id=request_id, new_status=new_status, previous_status=previous_status, type=request_type, unique_identifier_suffix=unique_identifier_suffix).__dict__,
subject=f"{request_id}/statusChanged",
data_version="2.0"
)
Expand Down
2 changes: 1 addition & 1 deletion api_app/models/domain/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ class StatusChangedData(AzureTREModel):
new_status: str
previous_status: Optional[str]
type: str
workspace_id: str
unique_identifier_suffix: str
Loading