Skip to content

Commit

Permalink
fix(lib.validation): properly validate ca-signed keypairs (#652)
Browse files Browse the repository at this point in the history
Change the mechanism we use to validate keypairs to not assume that a 
certificate is self-signed. This causes some valid keypairs to fail.

AAP-33294
  • Loading branch information
BrennanPaciorek authored Nov 21, 2024
1 parent aa092d6 commit 0bb17c4
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 5 deletions.
66 changes: 66 additions & 0 deletions ansible_base/lib/testing/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,72 @@ def rsa_keypair(rsa_keypair_factory):
return rsa_keypair_factory()


@copy_fixture(copies=3)
@pytest.fixture
def rsa_keypair_with_signed_cert(rsa_keypair_factory):
root_keypair = rsa_keypair_factory()
root_private_key = serialization.load_pem_private_key(root_keypair.private.encode("utf-8"), password=None)
root_public_key = serialization.load_pem_public_key(root_keypair.public.encode("utf-8"))
rsa_keypair = rsa_keypair_factory()
public_key = serialization.load_pem_public_key(rsa_keypair.public.encode("utf-8"))
issuer = x509.Name(
[
x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"California"),
x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"),
x509.NameAttribute(NameOID.COMMON_NAME, u"mycompany.com"),
]
)
subject = x509.Name(
[
x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"California"),
x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"),
x509.NameAttribute(NameOID.COMMON_NAME, u"qa.mycompany.com"),
]
)
root_certificate = (
x509.CertificateBuilder()
.subject_name(issuer)
.issuer_name(issuer)
.public_key(root_public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.utcnow())
.not_valid_after(datetime.utcnow() + timedelta(days=365))
.add_extension(
x509.SubjectAlternativeName([x509.DNSName(u"mycompany.com")]),
critical=False,
)
.sign(root_private_key, hashes.SHA256())
)
certificate = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.utcnow())
.not_valid_after(datetime.utcnow() + timedelta(days=365))
.add_extension(
x509.SubjectAlternativeName([x509.DNSName(u"qa.mycompany.com")]),
critical=False,
)
.sign(root_private_key, hashes.SHA256())
)

root_certificate_bytes = root_certificate.public_bytes(serialization.Encoding.PEM).decode("utf-8")
certificate_bytes = certificate.public_bytes(serialization.Encoding.PEM).decode("utf-8")

RSAKeyPairWithCert = namedtuple("RSAKeyPairWithCert", ["private", "public", "certificate"])
CertificateChain = namedtuple("CertificateChain", ["root", "subordinate"])

root_keypair_with_cert = RSAKeyPairWithCert(private=root_keypair.private, public=root_keypair.public, certificate=root_certificate_bytes)
subordinate_keypair_with_cert = RSAKeyPairWithCert(private=rsa_keypair.private, public=rsa_keypair.public, certificate=certificate_bytes)
return CertificateChain(root=root_keypair_with_cert, subordinate=subordinate_keypair_with_cert)


@copy_fixture(copies=3)
@pytest.fixture
def rsa_keypair_with_cert(rsa_keypair_factory):
Expand Down
16 changes: 11 additions & 5 deletions ansible_base/lib/utils/validation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import binascii
import re
import secrets
from urllib.parse import urlparse, urlunsplit

from cryptography.exceptions import InvalidSignature
Expand Down Expand Up @@ -79,7 +80,7 @@ def validate_cert_with_key(public_cert_string, private_key_string):
# None if one of the parameters wasn't set
# False if we failed to load an item (should be pre-tried by your serializer)
# A ValidationError exception if the key/value don't match
# True if everything checks out
# True if everything checks out, meaning that the certificate and private key form a valid keypair

if not private_key_string or not public_cert_string:
return None
Expand All @@ -92,11 +93,16 @@ def validate_cert_with_key(public_cert_string, private_key_string):
except Exception:
return False

# Generate nonce for keypair verification
nonce = secrets.token_bytes(64)
signature = private_key.sign(nonce, padding.PKCS1v15(), public_cert.signature_hash_algorithm)

try:
# We have both pieces of the puzzle, lets make sure they interlock
private_key.public_key().verify(
public_cert.signature,
public_cert.tbs_certificate_bytes,
# We have both pieces of the puzzle, lets make sure they interlock;
# do so by verifying the nonce we just signed can be verified by the provided certificate
public_cert.public_key().verify(
signature,
nonce,
# Depends on the algorithm used to create the certificate
padding.PKCS1v15(),
public_cert.signature_hash_algorithm,
Expand Down
10 changes: 10 additions & 0 deletions test_app/tests/lib/utils/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ def test_validate_cert_with_key_mismatch(rsa_keypair_with_cert_1, rsa_keypair_wi
assert "The certificate and private key do not match" in str(e.value)


def test_validate_cert_with_signed_certificate(rsa_keypair_with_signed_cert_1):
"""
Ensure that validate_cert_with_key raises a ValidationError when the cert and key don't match.
"""
keypair = rsa_keypair_with_signed_cert_1.root
assert validate_cert_with_key(keypair.certificate, keypair.private)
keypair = rsa_keypair_with_signed_cert_1.subordinate
assert validate_cert_with_key(keypair.certificate, keypair.private)


def test_validate_image_data_with_valid_data():
"""
Ensure that validate_image_data accepts valid data.
Expand Down

0 comments on commit 0bb17c4

Please sign in to comment.