diff --git a/src/cloud_what/providers/aws.py b/src/cloud_what/providers/aws.py index 332c8d1a89..32e1a229cb 100644 --- a/src/cloud_what/providers/aws.py +++ b/src/cloud_what/providers/aws.py @@ -285,25 +285,20 @@ def _get_metadata_from_server_imds_v2(self) -> Union[str, None]: def _get_metadata_from_server(self) -> Union[str, None]: """ - Try to get metadata from server as is described in this document: + Try to get metadata from server as described in these documents: + - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html + - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-metadata-v2-how-it-works.html - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html + IMDSv2 requires two HTTP requests (first requests a token, second obtains the metadata). + If that fails, try to fall back to IDMSv1 (which is older and can be disabled in the AWS console). - It is possible to use two versions. We will try to use version IMDSv1 first (this version requires - only one HTTP request), when the usage of IMDSv1 is forbidden, then we will try to use IMDSv2 version. - The version requires two requests (get session TOKEN and then get own metadata using token) :return: String with metadata or None """ + metadata = self._get_metadata_from_server_imds_v2() + if metadata is not None: + return metadata - if self._token_exists() is False: - # First try to get metadata using IMDSv1 - metadata = self._get_metadata_from_server_imds_v1() - - if metadata is not None: - return metadata - - # When it wasn't possible to get metadata using IMDSv1, then try to get metadata using IMDSv2 - return self._get_metadata_from_server_imds_v2() + return self._get_metadata_from_server_imds_v1() def _get_signature_from_cache_file(self) -> None: """ diff --git a/test/test_auto_registration.py b/test/test_auto_registration.py index 8bcf93f2ab..5a585845a8 100644 --- a/test/test_auto_registration.py +++ b/test/test_auto_registration.py @@ -20,7 +20,7 @@ import unittest import base64 -from mock import patch, Mock +from unittest.mock import Mock from subscription_manager.scripts.rhsmcertd_worker import _collect_cloud_info from .rhsmlib_test.test_cloud_facts import AWS_METADATA @@ -49,57 +49,71 @@ AWS_TOKEN = "ABCDEFGHIJKLMNOPQRSTVWXYZabcdefghijklmnopqrstvwxyz0123==" -def send_only_imds_v2_is_supported(request, *args, **kwargs): +def send__aws_imdsv2_only(request, *args, **kwargs): """ - Mock result, when we try to get metadata using GET method against - AWS metadata provider. This mock is for the case, when only IMDSv2 - is supported by instance. + Mock result for metadata request on AWS where only IMDSv2 is supported. + This function should be used to replace function `requests.Session.send()`. + :param request: HTTP request - :return: Mock with result + :return: Mocked server result. """ - mock_result = Mock() - - if request.method == 'PUT': - if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_TOKEN_URL: - if 'X-aws-ec2-metadata-token-ttl-seconds' in request.headers: - mock_result.status_code = 200 - mock_result.text = AWS_TOKEN - else: - mock_result.status_code = 400 - mock_result.text = 'Error: TTL for token not specified' - else: - mock_result.status_code = 400 - mock_result.text = 'Error: Invalid URL' - elif request.method == 'GET': - if 'X-aws-ec2-metadata-token' in request.headers.keys(): - if request.headers['X-aws-ec2-metadata-token'] == AWS_TOKEN: - if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_METADATA_URL: - mock_result.status_code = 200 - mock_result.text = AWS_METADATA - elif request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_SIGNATURE_URL: - mock_result.status_code = 200 - mock_result.text = AWS_SIGNATURE - else: - mock_result.status_code = 400 - mock_result.text = 'Error: Invalid URL' - else: - mock_result.status_code = 400 - mock_result.text = 'Error: Invalid metadata token provided' - else: - mock_result.status_code = 400 - mock_result.text = 'Error: IMDSv1 is not supported on this instance' - else: - mock_result.status_code = 400 - mock_result.text = 'Error: not supported request method' - - return mock_result - - -def mock_prepare_request(request): - return request + result = Mock() + + if request.method == "PUT": + if request.url != aws.AWSCloudProvider.CLOUD_PROVIDER_TOKEN_URL: + result.status_code = 400 + result.text = "Error: Invalid URL" + return result + + if "X-aws-ec2-metadata-token-ttl-seconds" not in request.headers: + result.status_code = 400 + result.text = "Error: TTL for token not specified" + return result + + result.status_code = 200 + result.text = AWS_TOKEN + return result + + if request.method == "GET": + if "X-aws-ec2-metadata-token" not in request.headers.keys(): + result.status_code = 400 + result.text = "Error: IMDSv1 is not supported on this instance" + return result + + if request.headers["X-aws-ec2-metadata-token"] != AWS_TOKEN: + result.status_code = 400 + result.text = "Error: Invalid metadata token provided" + return result + + if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_METADATA_URL: + result.status_code = 200 + result.text = AWS_METADATA + return result + + if request.url == aws.AWSCloudProvider.CLOUD_PROVIDER_SIGNATURE_URL: + result.status_code = 200 + result.text = AWS_SIGNATURE + return result + + result.status_code = 400 + result.text = "Error: Invalid URL" + return result + + result.status_code = 400 + result.text = "Error: not supported request method" + return result class TestAutomaticRegistration(unittest.TestCase): + def setUp(self): + _ = aws.AWSCloudProvider({}) + aws.AWSCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None) + aws.AWSCloudProvider._instance._get_token_from_cache_file = Mock(return_value=None) + aws.AWSCloudProvider._instance._write_token_to_cache_file = Mock() + + _ = azure.AzureCloudProvider({}) + azure.AzureCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None) + azure.AzureCloudProvider._instance.get_api_versions = Mock(return_value="") def tearDown(self): aws.AWSCloudProvider._instance = None @@ -109,17 +123,16 @@ def tearDown(self): gcp.GCPCloudProvider._instance = None gcp.GCPCloudProvider._initialized = False - @patch('cloud_what.providers.aws.requests.Session') - def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class): + def test_collect_cloud_info_one_cloud_provider_detected(self): """ Test the case, when we try to collect cloud info only for one detected cloud provider """ mock_session = Mock() - mock_session.send = send_only_imds_v2_is_supported - mock_session.prepare_request = Mock(side_effect=mock_prepare_request) - mock_session.hooks = {'response': []} - mock_session_class.return_value = mock_session + mock_session.send = send__aws_imdsv2_only + mock_session.prepare_request = Mock(side_effect=lambda request: request) + mock_session.hooks = {"response": []} + aws.AWSCloudProvider._instance._session = mock_session cloud_list = ['aws'] cloud_info = _collect_cloud_info(cloud_list, Mock()) @@ -134,26 +147,23 @@ def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class metadata = base64.b64decode(b64_metadata).decode('utf-8') self.assertEqual(metadata, AWS_METADATA) # Test signature - self.assertTrue('signature' in cloud_info) - b64_signature = cloud_info['signature'] - signature = base64.b64decode(b64_signature).decode('utf-8') - self.assertEqual( - signature, - '-----BEGIN PKCS7-----\n' + AWS_SIGNATURE + '\n-----END PKCS7-----' - ) + self.assertTrue("signature" in cloud_info) + b64_signature = cloud_info["signature"] + signature = base64.b64decode(b64_signature).decode("utf-8") + self.assertEqual(signature, "-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----") - @patch('cloud_what.providers.aws.requests.Session') - def test_collect_cloud_info_more_cloud_providers_detected(self, mock_session_class): + def test_collect_cloud_info_more_cloud_providers_detected(self): """ Test the case, when we try to collect cloud info only for more than one cloud providers, because more than one cloud providers were detected """ mock_session = Mock() - mock_session.send = send_only_imds_v2_is_supported - mock_session.prepare_request = Mock(side_effect=mock_prepare_request) - mock_session.hooks = {'response': []} - mock_session_class.return_value = mock_session + mock_session.send = send__aws_imdsv2_only + mock_session.prepare_request = Mock(side_effect=lambda request: request) + mock_session.hooks = {"response": []} + aws.AWSCloudProvider._instance._session = mock_session + azure.AzureCloudProvider._instance._session = Mock() # More cloud providers detected cloud_list = ['azure', 'aws']