From 0bb17c43f41549c307c6977bc48cf19d7ca268a6 Mon Sep 17 00:00:00 2001 From: Brennan Paciorek <50780403+BrennanPaciorek@users.noreply.github.com> Date: Thu, 21 Nov 2024 13:29:26 -0500 Subject: [PATCH] fix(lib.validation): properly validate ca-signed keypairs (#652) Change the mechanism we use to validate keypairs to not assume that a certificate is self-signed. This causes some valid keypairs to fail. AAP-33294 --- ansible_base/lib/testing/fixtures.py | 66 +++++++++++++++++++++ ansible_base/lib/utils/validation.py | 16 +++-- test_app/tests/lib/utils/test_validation.py | 10 ++++ 3 files changed, 87 insertions(+), 5 deletions(-) diff --git a/ansible_base/lib/testing/fixtures.py b/ansible_base/lib/testing/fixtures.py index b99b58413..2177fc432 100644 --- a/ansible_base/lib/testing/fixtures.py +++ b/ansible_base/lib/testing/fixtures.py @@ -233,6 +233,72 @@ def rsa_keypair(rsa_keypair_factory): return rsa_keypair_factory() +@copy_fixture(copies=3) +@pytest.fixture +def rsa_keypair_with_signed_cert(rsa_keypair_factory): + root_keypair = rsa_keypair_factory() + root_private_key = serialization.load_pem_private_key(root_keypair.private.encode("utf-8"), password=None) + root_public_key = serialization.load_pem_public_key(root_keypair.public.encode("utf-8")) + rsa_keypair = rsa_keypair_factory() + public_key = serialization.load_pem_public_key(rsa_keypair.public.encode("utf-8")) + issuer = x509.Name( + [ + x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"California"), + x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"), + x509.NameAttribute(NameOID.COMMON_NAME, u"mycompany.com"), + ] + ) + subject = x509.Name( + [ + x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"California"), + x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"), + x509.NameAttribute(NameOID.COMMON_NAME, u"qa.mycompany.com"), + ] + ) + root_certificate = ( + x509.CertificateBuilder() + .subject_name(issuer) + .issuer_name(issuer) + .public_key(root_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.utcnow()) + .not_valid_after(datetime.utcnow() + timedelta(days=365)) + .add_extension( + x509.SubjectAlternativeName([x509.DNSName(u"mycompany.com")]), + critical=False, + ) + .sign(root_private_key, hashes.SHA256()) + ) + certificate = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.utcnow()) + .not_valid_after(datetime.utcnow() + timedelta(days=365)) + .add_extension( + x509.SubjectAlternativeName([x509.DNSName(u"qa.mycompany.com")]), + critical=False, + ) + .sign(root_private_key, hashes.SHA256()) + ) + + root_certificate_bytes = root_certificate.public_bytes(serialization.Encoding.PEM).decode("utf-8") + certificate_bytes = certificate.public_bytes(serialization.Encoding.PEM).decode("utf-8") + + RSAKeyPairWithCert = namedtuple("RSAKeyPairWithCert", ["private", "public", "certificate"]) + CertificateChain = namedtuple("CertificateChain", ["root", "subordinate"]) + + root_keypair_with_cert = RSAKeyPairWithCert(private=root_keypair.private, public=root_keypair.public, certificate=root_certificate_bytes) + subordinate_keypair_with_cert = RSAKeyPairWithCert(private=rsa_keypair.private, public=rsa_keypair.public, certificate=certificate_bytes) + return CertificateChain(root=root_keypair_with_cert, subordinate=subordinate_keypair_with_cert) + + @copy_fixture(copies=3) @pytest.fixture def rsa_keypair_with_cert(rsa_keypair_factory): diff --git a/ansible_base/lib/utils/validation.py b/ansible_base/lib/utils/validation.py index 9700a898c..247255389 100644 --- a/ansible_base/lib/utils/validation.py +++ b/ansible_base/lib/utils/validation.py @@ -1,6 +1,7 @@ import base64 import binascii import re +import secrets from urllib.parse import urlparse, urlunsplit from cryptography.exceptions import InvalidSignature @@ -79,7 +80,7 @@ def validate_cert_with_key(public_cert_string, private_key_string): # None if one of the parameters wasn't set # False if we failed to load an item (should be pre-tried by your serializer) # A ValidationError exception if the key/value don't match - # True if everything checks out + # True if everything checks out, meaning that the certificate and private key form a valid keypair if not private_key_string or not public_cert_string: return None @@ -92,11 +93,16 @@ def validate_cert_with_key(public_cert_string, private_key_string): except Exception: return False + # Generate nonce for keypair verification + nonce = secrets.token_bytes(64) + signature = private_key.sign(nonce, padding.PKCS1v15(), public_cert.signature_hash_algorithm) + try: - # We have both pieces of the puzzle, lets make sure they interlock - private_key.public_key().verify( - public_cert.signature, - public_cert.tbs_certificate_bytes, + # We have both pieces of the puzzle, lets make sure they interlock; + # do so by verifying the nonce we just signed can be verified by the provided certificate + public_cert.public_key().verify( + signature, + nonce, # Depends on the algorithm used to create the certificate padding.PKCS1v15(), public_cert.signature_hash_algorithm, diff --git a/test_app/tests/lib/utils/test_validation.py b/test_app/tests/lib/utils/test_validation.py index 68d9b88d9..c60c0fa4e 100644 --- a/test_app/tests/lib/utils/test_validation.py +++ b/test_app/tests/lib/utils/test_validation.py @@ -101,6 +101,16 @@ def test_validate_cert_with_key_mismatch(rsa_keypair_with_cert_1, rsa_keypair_wi assert "The certificate and private key do not match" in str(e.value) +def test_validate_cert_with_signed_certificate(rsa_keypair_with_signed_cert_1): + """ + Ensure that validate_cert_with_key raises a ValidationError when the cert and key don't match. + """ + keypair = rsa_keypair_with_signed_cert_1.root + assert validate_cert_with_key(keypair.certificate, keypair.private) + keypair = rsa_keypair_with_signed_cert_1.subordinate + assert validate_cert_with_key(keypair.certificate, keypair.private) + + def test_validate_image_data_with_valid_data(): """ Ensure that validate_image_data accepts valid data.