From 409f8a15bd90c87eb26ac43f1fe759346336e254 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Mon, 14 Jan 2019 14:33:51 +0100 Subject: [PATCH] openssl_csr: Allow to use cryptography as backend (#50324) * Allow to use cryptography as backend for openssl_csr. * Use different curve. * Adding changelog. Includes changelog fragment for #49416, which didn't include one. --- changelogs/fragments/openssl-cryptography.yml | 3 + lib/ansible/modules/crypto/openssl_csr.py | 586 +++++++++++++++--- .../targets/openssl_csr/tasks/impl.yml | 148 +++++ .../targets/openssl_csr/tasks/main.yml | 161 +---- 4 files changed, 686 insertions(+), 212 deletions(-) create mode 100644 changelogs/fragments/openssl-cryptography.yml create mode 100644 test/integration/targets/openssl_csr/tasks/impl.yml diff --git a/changelogs/fragments/openssl-cryptography.yml b/changelogs/fragments/openssl-cryptography.yml new file mode 100644 index 0000000000..394c6adfe9 --- /dev/null +++ b/changelogs/fragments/openssl-cryptography.yml @@ -0,0 +1,3 @@ +minor_changes: +- "openssl_csr - now works with both PyOpenSSL and cryptography Python libraries. Autodetection can be overridden with ``select_crypto_backend`` option." +- "openssl_privatekey - now works with both PyOpenSSL and cryptography Python libraries. Autodetection can be overridden with ``select_crypto_backend`` option." diff --git a/lib/ansible/modules/crypto/openssl_csr.py b/lib/ansible/modules/crypto/openssl_csr.py index e67c9dd0b7..e8add2f2c5 100644 --- a/lib/ansible/modules/crypto/openssl_csr.py +++ b/lib/ansible/modules/crypto/openssl_csr.py @@ -25,7 +25,9 @@ description: the subjectAltName, keyUsage, extendedKeyUsage, basicConstraints and OCSP Must Staple extensions." requirements: - - "python-pyOpenSSL >= 0.15" + - "One of the following Python libraries:" + - "cryptography >= 1.3" + - "pyOpenSSL >= 0.15" options: state: required: false @@ -169,6 +171,22 @@ options: are required to reject such certificates (see U(https://tools.ietf.org/html/rfc7633#section-4))." version_added: 2.5 + select_crypto_backend: + description: + - "Determines which crypto backend to use. The default choice is C(auto), + which tries to use C(cryptography) if available, and falls back to + C(pyopenssl)." + - "If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) + library." + - "If set to C(cryptography), will try to use the + L(cryptography,https://cryptography.io/) library." + type: str + default: 'auto' + choices: + - auto + - cryptography + - pyopenssl + version_added: "2.8" extends_documentation_fragment: files notes: @@ -289,37 +307,59 @@ ocsp_must_staple: sample: false ''' +import abc import os +from distutils.version import LooseVersion from ansible.module_utils import crypto as crypto_utils from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils._text import to_native, to_bytes +from ansible.module_utils._text import to_native, to_bytes, to_text + +MINIMAL_PYOPENSSL_VERSION = '0.15' +MINIMAL_CRYPTOGRAPHY_VERSION = '1.3' try: import OpenSSL from OpenSSL import crypto + PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) except ImportError: - pyopenssl_found = False + PYOPENSSL_FOUND = False else: - pyopenssl_found = True + PYOPENSSL_FOUND = True if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: # OpenSSL 1.1.0 or newer - MUST_STAPLE_NAME = b"tlsfeature" - MUST_STAPLE_VALUE = b"status_request" + OPENSSL_MUST_STAPLE_NAME = b"tlsfeature" + OPENSSL_MUST_STAPLE_VALUE = b"status_request" else: # OpenSSL 1.0.x or older - MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" - MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" + OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" + OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" + +try: + import cryptography + import cryptography.x509 + import cryptography.x509.oid + import cryptography.exceptions + import cryptography.hazmat.backends + import cryptography.hazmat.primitives.serialization + import cryptography.hazmat.primitives.hashes + CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) +except ImportError: + CRYPTOGRAPHY_FOUND = False +else: + CRYPTOGRAPHY_FOUND = True + CRYPTOGRAPHY_MUST_STAPLE_NAME = cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24") + CRYPTOGRAPHY_MUST_STAPLE_VALUE = b"\x30\x03\x02\x01\x05" class CertificateSigningRequestError(crypto_utils.OpenSSLObjectError): pass -class CertificateSigningRequest(crypto_utils.OpenSSLObject): +class CertificateSigningRequestBase(crypto_utils.OpenSSLObject): def __init__(self, module): - super(CertificateSigningRequest, self).__init__( + super(CertificateSigningRequestBase, self).__init__( module.params['path'], module.params['state'], module.params['force'], @@ -358,53 +398,22 @@ class CertificateSigningRequest(crypto_utils.OpenSSLObject): if not self.subjectAltName: for sub in self.subject: - if OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])) == 13: # 13 is the NID for "commonName" + if sub[0] in ('commonName', 'CN'): self.subjectAltName = ['DNS:%s' % sub[1]] break + @abc.abstractmethod + def _generate_csr(self): + pass + def generate(self, module): '''Generate the certificate signing request.''' - if not self.check(module, perms_required=False) or self.force: - req = crypto.X509Req() - req.set_version(self.version - 1) - subject = req.get_subject() - for entry in self.subject: - if entry[1] is not None: - # Workaround for https://github.com/pyca/pyopenssl/issues/165 - nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(entry[0])) - OpenSSL._util.lib.X509_NAME_add_entry_by_NID(subject._name, nid, OpenSSL._util.lib.MBSTRING_UTF8, to_bytes(entry[1]), -1, -1, 0) - - extensions = [] - if self.subjectAltName: - altnames = ', '.join(self.subjectAltName) - extensions.append(crypto.X509Extension(b"subjectAltName", self.subjectAltName_critical, altnames.encode('ascii'))) - - if self.keyUsage: - usages = ', '.join(self.keyUsage) - extensions.append(crypto.X509Extension(b"keyUsage", self.keyUsage_critical, usages.encode('ascii'))) - - if self.extendedKeyUsage: - usages = ', '.join(self.extendedKeyUsage) - extensions.append(crypto.X509Extension(b"extendedKeyUsage", self.extendedKeyUsage_critical, usages.encode('ascii'))) - - if self.basicConstraints: - usages = ', '.join(self.basicConstraints) - extensions.append(crypto.X509Extension(b"basicConstraints", self.basicConstraints_critical, usages.encode('ascii'))) - - if self.ocspMustStaple: - extensions.append(crypto.X509Extension(MUST_STAPLE_NAME, self.ocspMustStaple_critical, MUST_STAPLE_VALUE)) - - if extensions: - req.add_extensions(extensions) - - req.set_pubkey(self.privatekey) - req.sign(self.privatekey, self.digest) - self.request = req + result = self._generate_csr() try: csr_file = open(self.path, 'wb') - csr_file.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request)) + csr_file.write(result) csr_file.close() except (IOError, OSError) as exc: raise CertificateSigningRequestError(exc) @@ -415,12 +424,91 @@ class CertificateSigningRequest(crypto_utils.OpenSSLObject): if module.set_fs_attributes_if_different(file_args, False): self.changed = True + @abc.abstractmethod + def _load_private_key(self): + pass + + @abc.abstractmethod + def _check_csr(self): + pass + def check(self, module, perms_required=True): """Ensure the resource is in its desired state.""" - state_and_perms = super(CertificateSigningRequest, self).check(module, perms_required) + state_and_perms = super(CertificateSigningRequestBase, self).check(module, perms_required) + self._load_private_key() + + if not state_and_perms: + return False + + return self._check_csr() + + def dump(self): + '''Serialize the object into a dictionary.''' + + result = { + 'privatekey': self.privatekey_path, + 'filename': self.path, + 'subject': self.subject, + 'subjectAltName': self.subjectAltName, + 'keyUsage': self.keyUsage, + 'extendedKeyUsage': self.extendedKeyUsage, + 'basicConstraints': self.basicConstraints, + 'ocspMustStaple': self.ocspMustStaple, + 'changed': self.changed + } + + return result + + +class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase): + + def __init__(self, module): + super(CertificateSigningRequestPyOpenSSL, self).__init__(module) + + def _generate_csr(self): + req = crypto.X509Req() + req.set_version(self.version - 1) + subject = req.get_subject() + for entry in self.subject: + if entry[1] is not None: + # Workaround for https://github.com/pyca/pyopenssl/issues/165 + nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(entry[0])) + OpenSSL._util.lib.X509_NAME_add_entry_by_NID(subject._name, nid, OpenSSL._util.lib.MBSTRING_UTF8, to_bytes(entry[1]), -1, -1, 0) + + extensions = [] + if self.subjectAltName: + altnames = ', '.join(self.subjectAltName) + extensions.append(crypto.X509Extension(b"subjectAltName", self.subjectAltName_critical, altnames.encode('ascii'))) + + if self.keyUsage: + usages = ', '.join(self.keyUsage) + extensions.append(crypto.X509Extension(b"keyUsage", self.keyUsage_critical, usages.encode('ascii'))) + + if self.extendedKeyUsage: + usages = ', '.join(self.extendedKeyUsage) + extensions.append(crypto.X509Extension(b"extendedKeyUsage", self.extendedKeyUsage_critical, usages.encode('ascii'))) + + if self.basicConstraints: + usages = ', '.join(self.basicConstraints) + extensions.append(crypto.X509Extension(b"basicConstraints", self.basicConstraints_critical, usages.encode('ascii'))) + + if self.ocspMustStaple: + extensions.append(crypto.X509Extension(OPENSSL_MUST_STAPLE_NAME, self.ocspMustStaple_critical, OPENSSL_MUST_STAPLE_VALUE)) + + if extensions: + req.add_extensions(extensions) + + req.set_pubkey(self.privatekey) + req.sign(self.privatekey, self.digest) + self.request = req + + return crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request) + + def _load_private_key(self): self.privatekey = crypto_utils.load_privatekey(self.privatekey_path, self.privatekey_passphrase) + def _check_csr(self): def _check_subject(csr): subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in self.subject] current_subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in csr.get_subject().get_components()] @@ -476,7 +564,7 @@ class CertificateSigningRequest(crypto_utils.OpenSSLObject): return _check_keyUsage_(extensions, b'basicConstraints', self.basicConstraints, self.basicConstraints_critical) def _check_ocspMustStaple(extensions): - oms_ext = [ext for ext in extensions if to_bytes(ext.get_short_name()) == MUST_STAPLE_NAME and to_bytes(ext) == MUST_STAPLE_VALUE] + oms_ext = [ext for ext in extensions if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE] if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000: # Older versions of libssl don't know about OCSP Must Staple oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05']) @@ -497,29 +585,345 @@ class CertificateSigningRequest(crypto_utils.OpenSSLObject): except crypto.Error: return False - if not state_and_perms: - return False - csr = crypto_utils.load_certificate_request(self.path) return _check_subject(csr) and _check_extensions(csr) and _check_signature(csr) - def dump(self): - '''Serialize the object into a dictionary.''' - result = { - 'privatekey': self.privatekey_path, - 'filename': self.path, - 'subject': self.subject, - 'subjectAltName': self.subjectAltName, - 'keyUsage': self.keyUsage, - 'extendedKeyUsage': self.extendedKeyUsage, - 'basicConstraints': self.basicConstraints, - 'ocspMustStaple': self.ocspMustStaple, - 'changed': self.changed - } +class CertificateSigningRequestCryptography(CertificateSigningRequestBase): - return result + def __init__(self, module): + super(CertificateSigningRequestCryptography, self).__init__(module) + self.cryptography_backend = cryptography.hazmat.backends.default_backend() + + def _get_name_oid(self, id): + if id in ('CN', 'commonName'): + return cryptography.x509.oid.NameOID.COMMON_NAME + if id in ('C', 'countryName'): + return cryptography.x509.oid.NameOID.COUNTRY_NAME + if id in ('L', 'localityName'): + return cryptography.x509.oid.NameOID.LOCALITY_NAME + if id in ('ST', 'stateOrProvinceName'): + return cryptography.x509.oid.NameOID.STATE_OR_PROVINCE_NAME + if id in ('street', 'streetAddress'): + return cryptography.x509.oid.NameOID.STREET_ADDRESS + if id in ('O', 'organizationName'): + return cryptography.x509.oid.NameOID.ORGANIZATION_NAME + if id in ('OU', 'organizationalUnitName'): + return cryptography.x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME + if id in ('serialNumber', ): + return cryptography.x509.oid.NameOID.SERIAL_NUMBER + if id in ('SN', 'surname'): + return cryptography.x509.oid.NameOID.SURNAME + if id in ('GN', 'givenName'): + return cryptography.x509.oid.NameOID.GIVEN_NAME + if id in ('title', ): + return cryptography.x509.oid.NameOID.TITLE + if id in ('generationQualifier', ): + return cryptography.x509.oid.NameOID.GENERATION_QUALIFIER + if id in ('x500UniqueIdentifier', ): + return cryptography.x509.oid.NameOID.X500_UNIQUE_IDENTIFIER + if id in ('dnQualifier', ): + return cryptography.x509.oid.NameOID.DN_QUALIFIER + if id in ('pseudonym', ): + return cryptography.x509.oid.NameOID.PSEUDONYM + if id in ('UID', 'userId'): + return cryptography.x509.oid.NameOID.USER_ID + if id in ('DC', 'domainComponent'): + return cryptography.x509.oid.NameOID.DOMAIN_COMPONENT + if id in ('emailAddress', ): + return cryptography.x509.oid.NameOID.EMAIL_ADDRESS + if id in ('jurisdictionC', 'jurisdictionCountryName'): + return cryptography.x509.oid.NameOID.JURISDICTION_COUNTRY_NAME + if id in ('jurisdictionL', 'jurisdictionLocalityName'): + return cryptography.x509.oid.NameOID.JURISDICTION_LOCALITY_NAME + if id in ('jurisdictionST', 'jurisdictionStateOrProvinceName'): + return cryptography.x509.oid.NameOID.JURISDICTION_STATE_OR_PROVINCE_NAME + if id in ('businessCategory', ): + return cryptography.x509.oid.NameOID.BUSINESS_CATEGORY + if id in ('postalAddress', ): + return cryptography.x509.oid.NameOID.POSTAL_ADDRESS + if id in ('postalCode', ): + return cryptography.x509.oid.NameOID.POSTAL_CODE + raise CertificateSigningRequestError('Unknown subject field identifier "{0}"'.format(id)) + + def _get_san(self, name): + if name.startswith('DNS:'): + return cryptography.x509.DNSName(to_text(name[4:])) + if name.startswith('IP:'): + return cryptography.x509.IPAddress(to_text(name[3:])) + if name.startswith('email:'): + return cryptography.x509.RFC822Name(to_text(name[6:])) + if name.startswith('URI:'): + return cryptography.x509.UniformResourceIdentifier(to_text(name[4:])) + if name.startswith('DirName:'): + return cryptography.x509.DirectoryName(to_text(name[8:])) + if ':' not in name: + raise CertificateSigningRequestError('Cannot parse Subject Alternative Name "{0}" (forgot "DNS:" prefix?)'.format(name)) + raise CertificateSigningRequestError('Cannot parse Subject Alternative Name "{0}" (potentially unsupported by cryptography backend)'.format(name)) + + def _get_keyusage(self, usage): + if usage in ('Digital Signature', 'digitalSignature'): + return 'digital_signature' + if usage in ('Non Repudiation', 'nonRepudiation'): + return 'content_commitment' + if usage in ('Key Encipherment', 'keyEncipherment'): + return 'key_encipherment' + if usage in ('Data Encipherment', 'dataEncipherment'): + return 'data_encipherment' + if usage in ('Key Agreement', 'keyAgreement'): + return 'key_agreement' + if usage in ('Certificate Sign', 'keyCertSign'): + return 'key_cert_sign' + if usage in ('CRL Sign', 'cRLSign'): + return 'crl_sign' + if usage in ('Encipher Only', 'encipherOnly'): + return 'encipher_only' + if usage in ('Decipher Only', 'decipherOnly'): + return 'decipher_only' + raise CertificateSigningRequestError('Unknown key usage "{0}"'.format(usage)) + + def _get_ext_keyusage(self, usage): + if usage in ('serverAuth', 'TLS Web Server Authentication'): + return cryptography.x509.oid.ExtendedKeyUsageOID.SERVER_AUTH + if usage in ('clientAuth', 'TLS Web Client Authentication'): + return cryptography.x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH + if usage in ('codeSigning', 'Code Signing'): + return cryptography.x509.oid.ExtendedKeyUsageOID.CODE_SIGNING + if usage in ('emailProtection', 'E-mail Protection'): + return cryptography.x509.oid.ExtendedKeyUsageOID.EMAIL_PROTECTION + if usage in ('timeStamping', 'Time Stamping'): + return cryptography.x509.oid.ExtendedKeyUsageOID.TIME_STAMPING + if usage in ('OCSPSigning', 'OCSP Signing'): + return cryptography.x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING + if usage in ('anyExtendedKeyUsage', 'Any Extended Key Usage'): + return cryptography.x509.oid.ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE + if usage in ('qcStatements', ): + return cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.3") + if usage in ('DVCS', ): + return cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.10") + if usage in ('IPSec User', 'ipsecUser'): + return cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.3.7") + if usage in ('Biometric Info', 'biometricInfo'): + return cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.2") + # FIXME need some more, probably all from https://www.iana.org/assignments/smi-numbers/smi-numbers.xhtml#smi-numbers-1.3.6.1.5.5.7.3 + raise CertificateSigningRequestError('Unknown extended key usage "{0}"'.format(usage)) + + def _get_basic_constraints(self, constraints): + ca = False + path_length = None + if constraints: + for constraint in constraints: + if constraint.startswith('CA:'): + if constraint == 'CA:TRUE': + ca = True + elif constraint == 'CA:FALSE': + ca = False + else: + raise CertificateSigningRequestError('Unknown basic constraint value "{0}" for CA'.format(constraint[3:])) + elif constraint.startswith('pathlen:'): + v = constraint[len('pathlen:'):] + try: + path_length = int(v) + except Exception as e: + raise CertificateSigningRequestError('Cannot parse path length constraint "{0}" ({1})'.format(v, e)) + else: + raise CertificateSigningRequestError('Unknown basic constraint "{0}"'.format(constraint)) + return ca, path_length + + def _parse_key_usage(self): + params = dict( + digital_signature=False, + content_commitment=False, + key_encipherment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ) + for usage in self.keyUsage: + params[self._get_keyusage(usage)] = True + return params + + def _generate_csr(self): + csr = cryptography.x509.CertificateSigningRequestBuilder() + csr = csr.subject_name(cryptography.x509.Name([ + cryptography.x509.NameAttribute(self._get_name_oid(entry[0]), to_text(entry[1])) for entry in self.subject + ])) + + if self.subjectAltName: + csr = csr.add_extension(cryptography.x509.SubjectAlternativeName([ + self._get_san(name) for name in self.subjectAltName + ]), critical=self.subjectAltName_critical) + + if self.keyUsage: + params = self._parse_key_usage() + csr = csr.add_extension(cryptography.x509.KeyUsage(**params), critical=self.keyUsage_critical) + + if self.extendedKeyUsage: + usages = [self._get_ext_keyusage(usage) for usage in self.extendedKeyUsage] + csr = csr.add_extension(cryptography.x509.ExtendedKeyUsage(usages), critical=self.extendedKeyUsage_critical) + + if self.basicConstraints: + params = {} + ca, path_length = self._get_basic_constraints(self.basicConstraints) + csr = csr.add_extension(cryptography.x509.BasicConstraints(ca, path_length), critical=self.basicConstraints_critical) + + if self.ocspMustStaple: + try: + # This only works with cryptography >= 2.1 + csr = csr.add_extension(cryptography.x509.TLSFeature([cryptography.x509.TLSFeatureType.status_request]), critical=self.ocspMustStaple_critical) + except AttributeError as dummy: + csr = csr.add_extension( + cryptography.x509.UnrecognizedExtension(CRYPTOGRAPHY_MUST_STAPLE_NAME, CRYPTOGRAPHY_MUST_STAPLE_VALUE), + critical=self.ocspMustStaple_critical + ) + + digest = None + if self.digest == 'sha256': + digest = cryptography.hazmat.primitives.hashes.SHA256() + elif self.digest == 'sha384': + digest = cryptography.hazmat.primitives.hashes.SHA384() + elif self.digest == 'sha512': + digest = cryptography.hazmat.primitives.hashes.SHA512() + elif self.digest == 'sha1': + digest = cryptography.hazmat.primitives.hashes.SHA1() + elif self.digest == 'md5': + digest = cryptography.hazmat.primitives.hashes.MD5() + # FIXME + else: + raise CertificateSigningRequestError('Unsupported digest "{0}"'.format(self.digest)) + self.request = csr.sign(self.privatekey, digest, self.cryptography_backend) + + return self.request.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.PEM) + + def _load_private_key(self): + try: + with open(self.privatekey_path, 'rb') as f: + self.privatekey = cryptography.hazmat.primitives.serialization.load_pem_private_key( + f.read(), + None if self.privatekey_passphrase is None else to_bytes(self.privatekey_passphrase), + backend=self.cryptography_backend + ) + except Exception as e: + raise CertificateSigningRequestError(e) + + def _check_csr(self): + def _check_subject(csr): + subject = [(self._get_name_oid(entry[0]), entry[1]) for entry in self.subject] + current_subject = [(sub.oid, sub.value) for sub in csr.subject] + return set(subject) == set(current_subject) + + def _find_extension(extensions, type): + return next( + (ext for ext in extensions if isinstance(ext.value, type)), + None + ) + + def _check_subjectAltName(extensions): + current_altnames_ext = _find_extension(extensions, cryptography.x509.SubjectAlternativeName) + current_altnames = [str(altname) for altname in current_altnames_ext.value] if current_altnames_ext else [] + altnames = [str(self._get_san(altname)) for altname in self.subjectAltName] + if set(altnames) != set(current_altnames): + return False + if altnames: + if current_altnames_ext.critical != self.subjectAltName_critical: + return False + return True + + def _check_keyUsage(extensions): + current_keyusage_ext = _find_extension(extensions, cryptography.x509.KeyUsage) + if not self.keyUsage: + return current_keyusage_ext is None + elif current_keyusage_ext is None: + return False + params = self._parse_key_usage() + for param in params: + if getattr(current_keyusage_ext.value, '_' + param) != params[param]: + return False + if current_keyusage_ext.critical != self.keyUsage_critical: + return False + return True + + def _check_extenededKeyUsage(extensions): + current_usages_ext = _find_extension(extensions, cryptography.x509.ExtendedKeyUsage) + current_usages = [str(usage) for usage in current_usages_ext.value] if current_usages_ext else [] + usages = [str(self._get_ext_keyusage(usage)) for usage in self.extendedKeyUsage] if self.extendedKeyUsage else [] + if set(current_usages) != set(usages): + return False + if usages: + if current_usages_ext.critical != self.extendedKeyUsage_critical: + return False + return True + + def _check_basicConstraints(extensions): + bc_ext = _find_extension(extensions, cryptography.x509.BasicConstraints) + current_ca = bc_ext.ca if bc_ext else False + current_path_length = bc_ext.path_length if bc_ext else None + ca, path_length = self._get_basic_constraints(self.basicConstraints) + # Check CA flag + if ca != current_ca: + return False + # Check path length + if path_length != current_path_length: + return False + # Check criticality + if self.basicConstraints: + if bc_ext.critical != self.basicConstraints_critical: + return False + return True + + def _check_ocspMustStaple(extensions): + try: + # This only works with cryptography >= 2.1 + tlsfeature_ext = _find_extension(extensions, cryptography.x509.TLSFeature) + has_tlsfeature = True + except AttributeError as dummy: + tlsfeature_ext = next( + (ext for ext in extensions if ext.value.oid == CRYPTOGRAPHY_MUST_STAPLE_NAME), + None + ) + has_tlsfeature = False + if self.ocspMustStaple: + if not tlsfeature_ext or tlsfeature_ext.critical != self.ocspMustStaple_critical: + return False + if has_tlsfeature: + return cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value + else: + return tlsfeature_ext.value.value == CRYPTOGRAPHY_MUST_STAPLE_VALUE + else: + return tlsfeature_ext is None + + def _check_extensions(csr): + extensions = csr.extensions + return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and + _check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and + _check_ocspMustStaple(extensions)) + + def _check_signature(csr): + if not csr.is_signature_valid: + return False + # To check whether public key of CSR belongs to private key, + # encode both public keys and compare PEMs. + key_a = csr.public_key().public_bytes( + cryptography.hazmat.primitives.serialization.Encoding.PEM, + cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo + ) + key_b = self.privatekey.public_key().public_bytes( + cryptography.hazmat.primitives.serialization.Encoding.PEM, + cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo + ) + return key_a == key_b + + try: + with open(self.path, 'rb') as f: + csr = cryptography.x509.load_pem_x509_csr(f.read(), self.cryptography_backend) + except Exception as dummy: + return False + + return _check_subject(csr) and _check_extensions(csr) and _check_signature(csr) def main(): @@ -550,24 +954,54 @@ def main(): basicConstraints_critical=dict(aliases=['basic_constraints_critical'], default=False, type='bool'), ocspMustStaple=dict(aliases=['ocsp_must_staple'], default=False, type='bool'), ocspMustStaple_critical=dict(aliases=['ocsp_must_staple_critical'], default=False, type='bool'), + select_crypto_backend=dict(required=False, choices=['auto', 'pyopenssl', 'cryptography'], default='auto', type='str'), ), add_file_common_args=True, supports_check_mode=True, ) - if not pyopenssl_found: - module.fail_json(msg='the python pyOpenSSL module is required') - - try: - getattr(crypto.X509Req, 'get_extensions') - except AttributeError: - module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs') - base_dir = os.path.dirname(module.params['path']) or '.' if not os.path.isdir(base_dir): module.fail_json(name=base_dir, msg='The directory %s does not exist or the file is not a directory' % base_dir) - csr = CertificateSigningRequest(module) + backend = module.params['select_crypto_backend'] + if backend == 'auto': + # Detection what is possible + can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) + can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) + + # Decision + if module.params['cipher'] and module.params['passphrase'] and module.params['cipher'] != 'auto': + # First try pyOpenSSL, then cryptography + if can_use_pyopenssl: + backend = 'pyopenssl' + elif can_use_cryptography: + backend = 'cryptography' + else: + # First try cryptography, then pyOpenSSL + if can_use_cryptography: + backend = 'cryptography' + elif can_use_pyopenssl: + backend = 'pyopenssl' + + # Success? + if backend == 'auto': + module.fail_json(msg=('Can detect none of the Python libraries ' + 'cryptography (>= {0}) and pyOpenSSL (>= {1})').format( + MINIMAL_CRYPTOGRAPHY_VERSION, + MINIMAL_PYOPENSSL_VERSION)) + if backend == 'pyopenssl': + if not PYOPENSSL_FOUND: + module.fail_json(msg='The Python pyOpenSSL library is required') + try: + getattr(crypto.X509Req, 'get_extensions') + except AttributeError: + module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs') + csr = CertificateSigningRequestPyOpenSSL(module) + elif backend == 'cryptography': + if not CRYPTOGRAPHY_FOUND: + module.fail_json(msg='The Python cryptography library is required') + csr = CertificateSigningRequestCryptography(module) if module.params['state'] == 'present': diff --git a/test/integration/targets/openssl_csr/tasks/impl.yml b/test/integration/targets/openssl_csr/tasks/impl.yml new file mode 100644 index 0000000000..ffc4bf32e1 --- /dev/null +++ b/test/integration/targets/openssl_csr/tasks/impl.yml @@ -0,0 +1,148 @@ +--- +- name: Generate privatekey + openssl_privatekey: + path: '{{ output_dir }}/privatekey.pem' + +- name: Generate CSR (check mode) + openssl_csr: + path: '{{ output_dir }}/csr.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + check_mode: yes + register: generate_csr_check + +- name: Generate CSR + openssl_csr: + path: '{{ output_dir }}/csr.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + register: generate_csr + +- name: Generate CSR (idempotent) + openssl_csr: + path: '{{ output_dir }}/csr.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + register: generate_csr_check_idempotent + +- name: Generate CSR (idempotent, check mode) + openssl_csr: + path: '{{ output_dir }}/csr.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + check_mode: yes + register: generate_csr_check_idempotent_check + +# keyUsage longname and shortname should be able to be used +# interchangeably. Hence the long name is specified here +# but the short name is used to test idempotency for ipsecuser +# and vice-versa for biometricInfo +- name: Generate CSR with KU and XKU + openssl_csr: + path: '{{ output_dir }}/csr_ku_xku.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + CN: www.ansible.com + keyUsage: + - digitalSignature + - keyAgreement + extendedKeyUsage: + - qcStatements + - DVCS + - IPSec User + - biometricInfo + select_crypto_backend: '{{ select_crypto_backend }}' + +- name: Generate CSR with KU and XKU (test idempotency) + openssl_csr: + path: '{{ output_dir }}/csr_ku_xku.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: 'www.ansible.com' + keyUsage: + - Key Agreement + - digitalSignature + extendedKeyUsage: + - ipsecUser + - qcStatements + - DVCS + - Biometric Info + select_crypto_backend: '{{ select_crypto_backend }}' + register: csr_ku_xku + +- name: Generate CSR with KU and XKU (test XKU change) + openssl_csr: + path: '{{ output_dir }}/csr_ku_xku.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: 'www.ansible.com' + keyUsage: + - digitalSignature + - keyAgreement + extendedKeyUsage: + - ipsecUser + - qcStatements + - Biometric Info + select_crypto_backend: '{{ select_crypto_backend }}' + register: csr_ku_xku_change + +- name: Generate CSR with KU and XKU (test KU change) + openssl_csr: + path: '{{ output_dir }}/csr_ku_xku.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: 'www.ansible.com' + keyUsage: + - digitalSignature + extendedKeyUsage: + - ipsecUser + - qcStatements + - Biometric Info + select_crypto_backend: '{{ select_crypto_backend }}' + register: csr_ku_xku_change_2 + +- name: Generate CSR with old API + openssl_csr: + path: '{{ output_dir }}/csr_oldapi.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + commonName: www.ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + +- name: Generate CSR with OCSP Must Staple + openssl_csr: + path: '{{ output_dir }}/csr_ocsp.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject_alt_name: "DNS:www.ansible.com" + ocsp_must_staple: true + select_crypto_backend: '{{ select_crypto_backend }}' + +- name: Generate CSR with OCSP Must Staple (test idempotency) + openssl_csr: + path: '{{ output_dir }}/csr_ocsp.csr' + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject_alt_name: "DNS:www.ansible.com" + ocsp_must_staple: true + select_crypto_backend: '{{ select_crypto_backend }}' + register: csr_ocsp_idempotency + +- name: Generate ECC privatekey + openssl_privatekey: + path: '{{ output_dir }}/privatekey2.pem' + type: ECC + curve: secp384r1 + +- name: Generate CSR with ECC privatekey + openssl_csr: + path: '{{ output_dir }}/csr2.csr' + privatekey_path: '{{ output_dir }}/privatekey2.pem' + subject: + commonName: www.ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' diff --git a/test/integration/targets/openssl_csr/tasks/main.yml b/test/integration/targets/openssl_csr/tasks/main.yml index 2c7806dd19..ceed31702f 100644 --- a/test/integration/targets/openssl_csr/tasks/main.yml +++ b/test/integration/targets/openssl_csr/tasks/main.yml @@ -1,141 +1,30 @@ --- - block: - - name: Generate privatekey - openssl_privatekey: - path: '{{ output_dir }}/privatekey.pem' + - name: Running tests with pyOpenSSL backend + include_tasks: impl.yml + vars: + select_crypto_backend: pyopenssl - - name: Generate CSR (check mode) - openssl_csr: - path: '{{ output_dir }}/csr.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: www.ansible.com - check_mode: yes - register: generate_csr_check - - - name: Generate CSR - openssl_csr: - path: '{{ output_dir }}/csr.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: www.ansible.com - register: generate_csr - - - name: Generate CSR (idempotent) - openssl_csr: - path: '{{ output_dir }}/csr.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: www.ansible.com - register: generate_csr_check_idempotent - - - name: Generate CSR (idempotent, check mode) - openssl_csr: - path: '{{ output_dir }}/csr.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: www.ansible.com - check_mode: yes - register: generate_csr_check_idempotent_check - - # keyUsage longname and shortname should be able to be used - # interchangeably. Hence the long name is specified here - # but the short name is used to test idempotency for ipsecuser - # and vice-versa for biometricInfo - - name: Generate CSR with KU and XKU - openssl_csr: - path: '{{ output_dir }}/csr_ku_xku.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - CN: www.ansible.com - keyUsage: - - digitalSignature - - keyAgreement - extendedKeyUsage: - - qcStatements - - DVCS - - IPSec User - - biometricInfo - - - name: Generate CSR with KU and XKU (test idempotency) - openssl_csr: - path: '{{ output_dir }}/csr_ku_xku.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: 'www.ansible.com' - keyUsage: - - Key Agreement - - digitalSignature - extendedKeyUsage: - - ipsecUser - - qcStatements - - DVCS - - Biometric Info - register: csr_ku_xku - - - name: Generate CSR with KU and XKU (test XKU change) - openssl_csr: - path: '{{ output_dir }}/csr_ku_xku.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: 'www.ansible.com' - keyUsage: - - digitalSignature - - keyAgreement - extendedKeyUsage: - - ipsecUser - - qcStatements - - Biometric Info - register: csr_ku_xku_change - - - name: Generate CSR with KU and XKU (test KU change) - openssl_csr: - path: '{{ output_dir }}/csr_ku_xku.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject: - commonName: 'www.ansible.com' - keyUsage: - - digitalSignature - extendedKeyUsage: - - ipsecUser - - qcStatements - - Biometric Info - register: csr_ku_xku_change_2 - - - name: Generate CSR with old API - openssl_csr: - path: '{{ output_dir }}/csr_oldapi.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - commonName: www.ansible.com - - - name: Generate CSR with OCSP Must Staple - openssl_csr: - path: '{{ output_dir }}/csr_ocsp.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject_alt_name: "DNS:www.ansible.com" - ocsp_must_staple: true - - - name: Generate CSR with OCSP Must Staple (test idempotency) - openssl_csr: - path: '{{ output_dir }}/csr_ocsp.csr' - privatekey_path: '{{ output_dir }}/privatekey.pem' - subject_alt_name: "DNS:www.ansible.com" - ocsp_must_staple: true - register: csr_ocsp_idempotency - - - name: Generate ECC privatekey - openssl_privatekey: - path: '{{ output_dir }}/privatekey2.pem' - type: ECC - curve: secp256k1 - - - name: Generate CSR with ECC privatekey - openssl_csr: - path: '{{ output_dir }}/csr2.csr' - privatekey_path: '{{ output_dir }}/privatekey2.pem' - subject: - commonName: www.ansible.com - - - import_tasks: ../tests/validate.yml + - import_tasks: ../tests/validate.yml when: pyopenssl_version.stdout is version('0.15', '>=') + +- name: Remove output directory + file: + path: "{{ output_dir }}" + state: absent + +- name: Re-create output directory + file: + path: "{{ output_dir }}" + state: directory + +- block: + - name: Running tests with cryptography backend + include_tasks: impl.yml + vars: + select_crypto_backend: cryptography + + - import_tasks: ../tests/validate.yml + + when: cryptography_version.stdout is version('1.3', '>=')