Skip to content

Commit

Permalink
RHEL-9435: Get AWS metadata via IMDSv2
Browse files Browse the repository at this point in the history
* Card ID: RHEL-9435

Even though both versions are officially supported, the AWS teams are
tracking connections making v1 requests as WARNINGs [0].

This patch switches the order to try to use IMDSv2 first.

[0]: https://github.com/aws/aws-imds-packet-analyzer
  • Loading branch information
m-horky committed Nov 1, 2023
1 parent 65ab1a6 commit 0eab79b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 28 deletions.
23 changes: 9 additions & 14 deletions src/cloud_what/providers/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,25 +281,20 @@ def _get_metadata_from_server_imds_v2(self) -> Union[str, None]:

def _get_metadata_from_server(self) -> Union[str, None]:
"""
Try to get metadata from server as is described in this document:
Try to get metadata from server as described in these documents:
- https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
- https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-metadata-v2-how-it-works.html
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
IMDSv2 requires two HTTP requests (first requests a token, second obtains the metadata).
If that fails, try to fall back to IDMSv1 (which is older and can be disabled in the AWS console).
It is possible to use two versions. We will try to use version IMDSv1 first (this version requires
only one HTTP request), when the usage of IMDSv1 is forbidden, then we will try to use IMDSv2 version.
The version requires two requests (get session TOKEN and then get own metadata using token)
:return: String with metadata or None
"""
metadata = self._get_metadata_from_server_imds_v2()
if metadata is not None:
return metadata

if self._token_exists() is False:
# First try to get metadata using IMDSv1
metadata = self._get_metadata_from_server_imds_v1()

if metadata is not None:
return metadata

# When it wasn't possible to get metadata using IMDSv1, then try to get metadata using IMDSv2
return self._get_metadata_from_server_imds_v2()
return self._get_metadata_from_server_imds_v1()

def _get_signature_from_cache_file(self) -> None:
"""
Expand Down
33 changes: 19 additions & 14 deletions test/test_auto_registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import unittest
import base64
from unittest.mock import patch, Mock
from unittest.mock import Mock

from subscription_manager.scripts.rhsmcertd_worker import _collect_cloud_info
from .rhsmlib.facts.test_cloud_facts import AWS_METADATA
Expand Down Expand Up @@ -93,11 +93,17 @@ def send_only_imds_v2_is_supported(request, *args, **kwargs):
return mock_result


def mock_prepare_request(request):
return request
class TestAutomaticRegistration(unittest.TestCase):
def setUp(self):
_ = aws.AWSCloudProvider({})
aws.AWSCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None)
aws.AWSCloudProvider._instance._get_token_from_cache_file = Mock(return_value=None)
aws.AWSCloudProvider._instance._write_token_to_cache_file = Mock()

_ = azure.AzureCloudProvider({})
azure.AzureCloudProvider._instance._get_metadata_from_cache = Mock(return_value=None)
azure.AzureCloudProvider._instance.get_api_versions = Mock(return_value="")

class TestAutomaticRegistration(unittest.TestCase):
def tearDown(self):
aws.AWSCloudProvider._instance = None
aws.AWSCloudProvider._initialized = False
Expand All @@ -106,17 +112,16 @@ def tearDown(self):
gcp.GCPCloudProvider._instance = None
gcp.GCPCloudProvider._initialized = False

@patch("cloud_what.providers.aws.requests.Session")
def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class):
def test_collect_cloud_info_one_cloud_provider_detected(self):
"""
Test the case, when we try to collect cloud info only for
one detected cloud provider
"""
mock_session = Mock()
mock_session.send = send_only_imds_v2_is_supported
mock_session.prepare_request = Mock(side_effect=mock_prepare_request)
mock_session.send = send_only_imds_v2_is_supported()
mock_session.prepare_request = Mock(side_effect=lambda request: request)
mock_session.hooks = {"response": []}
mock_session_class.return_value = mock_session
aws.AWSCloudProvider._instance._session = mock_session

cloud_list = ["aws"]
cloud_info = _collect_cloud_info(cloud_list, Mock())
Expand All @@ -136,18 +141,18 @@ def test_collect_cloud_info_one_cloud_provider_detected(self, mock_session_class
signature = base64.b64decode(b64_signature).decode("utf-8")
self.assertEqual(signature, "-----BEGIN PKCS7-----\n" + AWS_SIGNATURE + "\n-----END PKCS7-----")

@patch("cloud_what.providers.aws.requests.Session")
def test_collect_cloud_info_more_cloud_providers_detected(self, mock_session_class):
def test_collect_cloud_info_more_cloud_providers_detected(self):
"""
Test the case, when we try to collect cloud info only for
more than one cloud providers, because more than one cloud
providers were detected
"""
mock_session = Mock()
mock_session.send = send_only_imds_v2_is_supported
mock_session.prepare_request = Mock(side_effect=mock_prepare_request)
mock_session.send = send_only_imds_v2_is_supported()
mock_session.prepare_request = Mock(side_effect=lambda request: request)
mock_session.hooks = {"response": []}
mock_session_class.return_value = mock_session
aws.AWSCloudProvider._instance._session = mock_session
azure.AzureCloudProvider._instance._session = Mock()

# More cloud providers detected
cloud_list = ["azure", "aws"]
Expand Down

0 comments on commit 0eab79b

Please sign in to comment.