From 7a16703dff2840457ba2dfd0557a1ef731eb6f25 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Mon, 8 Apr 2019 10:07:56 +0200 Subject: [PATCH] Add openssl_privatekey_info module (#54845) * Add openssl_privatekey_info module. * Addressing review feedback. * Update docs. * Update tests. * Work around too broad sanity checks. * ... * Don't die when None is returned. * Use OpenSSL to extract RSA and DSA key data. * Extend tests. * Make OpenSSL code compatible to OpenSSL < 1.1. * Rewrite tests to use result dicts instead of result lists. * Skip ECC for too old PyOpenSSL. * Reformulate. * Improve return_private_key_data docs. * Rename path_content -> content. * Add sample. * Cleanup. * Add key consistency check. * Improve description. * Adjust minimal version. * Fallback code for some pyOpenSSL < 16.0 versions. * Also support Ed25519 and Ed448 keys (or not). * Add more consistency checks. * Verify DSA keys manually. * Improve DSA key validation. * Forgot one condition. * Make validation more robust. * Move generic arithmetic code to module_utils/crypto.py. --- lib/ansible/module_utils/crypto.py | 70 +- .../modules/crypto/openssl_certificate.py | 2 +- .../crypto/openssl_certificate_info.py | 15 +- lib/ansible/modules/crypto/openssl_csr.py | 4 +- .../modules/crypto/openssl_privatekey.py | 30 +- .../modules/crypto/openssl_privatekey_info.py | 627 ++++++++++++++++++ .../targets/openssl_privatekey_info/aliases | 2 + .../openssl_privatekey_info/meta/main.yml | 2 + .../openssl_privatekey_info/tasks/impl.yml | 167 +++++ .../openssl_privatekey_info/tasks/main.yml | 70 ++ 10 files changed, 970 insertions(+), 19 deletions(-) create mode 100644 lib/ansible/modules/crypto/openssl_privatekey_info.py create mode 100644 test/integration/targets/openssl_privatekey_info/aliases create mode 100644 test/integration/targets/openssl_privatekey_info/meta/main.yml create mode 100644 test/integration/targets/openssl_privatekey_info/tasks/impl.yml create mode 100644 test/integration/targets/openssl_privatekey_info/tasks/main.yml diff --git a/lib/ansible/module_utils/crypto.py b/lib/ansible/module_utils/crypto.py index 8f036993ae..765b782c1d 100644 --- a/lib/ansible/module_utils/crypto.py +++ b/lib/ansible/module_utils/crypto.py @@ -95,20 +95,34 @@ def get_fingerprint(path, passphrase=None): privatekey = load_privatekey(path, passphrase, check_passphrase=False) try: publickey = crypto.dump_publickey(crypto.FILETYPE_ASN1, privatekey) - return get_fingerprint_of_bytes(publickey) except AttributeError: # If PyOpenSSL < 16.0 crypto.dump_publickey() will fail. - # By doing this we prevent the code from raising an error - # yet we return no value in the fingerprint hash. - return None + try: + bio = crypto._new_mem_buf() + rc = crypto._lib.i2d_PUBKEY_bio(bio, privatekey._pkey) + if rc != 1: + crypto._raise_current_error() + publickey = crypto._bio_to_string(bio) + except AttributeError: + # By doing this we prevent the code from raising an error + # yet we return no value in the fingerprint hash. + return None + return get_fingerprint_of_bytes(publickey) -def load_privatekey(path, passphrase=None, check_passphrase=True, backend='pyopenssl'): - """Load the specified OpenSSL private key.""" +def load_privatekey(path, passphrase=None, check_passphrase=True, content=None, backend='pyopenssl'): + """Load the specified OpenSSL private key. + + The content can also be specified via content; in that case, + this function will not load the key from disk. + """ try: - with open(path, 'rb') as b_priv_key_fh: - priv_key_detail = b_priv_key_fh.read() + if content is None: + with open(path, 'rb') as b_priv_key_fh: + priv_key_detail = b_priv_key_fh.read() + else: + priv_key_detail = content if backend == 'pyopenssl': @@ -773,3 +787,43 @@ def cryptography_get_basic_constraints(constraints): else: raise OpenSSLObjectError('Unknown basic constraint "{0}"'.format(constraint)) return ca, path_length + + +def binary_exp_mod(f, e, m): + '''Computes f^e mod m in O(log e) multiplications modulo m.''' + # Compute len_e = floor(log_2(e)) + len_e = -1 + x = e + while x > 0: + x >>= 1 + len_e += 1 + # Compute f**e mod m + result = 1 + for k in range(len_e, -1, -1): + result = (result * result) % m + if ((e >> k) & 1) != 0: + result = (result * f) % m + return result + + +def simple_gcd(a, b): + '''Compute GCD of its two inputs.''' + while b != 0: + a, b = b, a % b + return a + + +def quick_is_not_prime(n): + '''Does some quick checks to see if we can poke a hole into the primality of n. + + A result of `False` does **not** mean that the number is prime; it just means + that we couldn't detect quickly whether it is not prime. + ''' + if n <= 2: + return True + # The constant in the next line is the product of all primes < 200 + if simple_gcd(n, 7799922041683461553249199106329813876687996789903550945093032474868511536164700810) > 1: + return True + # TODO: maybe do some iterations of Miller-Rabin to increase confidence + # (https://en.wikipedia.org/wiki/Miller%E2%80%93Rabin_primality_test) + return False diff --git a/lib/ansible/modules/crypto/openssl_certificate.py b/lib/ansible/modules/crypto/openssl_certificate.py index 845676abb7..242fd09829 100644 --- a/lib/ansible/modules/crypto/openssl_certificate.py +++ b/lib/ansible/modules/crypto/openssl_certificate.py @@ -1777,7 +1777,7 @@ def main(): # Fail if no backend has been found if backend == 'auto': - module.fail_json(msg=("Can't detect none of the required Python libraries " + module.fail_json(msg=("Can't detect any of the required Python libraries " "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( MINIMAL_CRYPTOGRAPHY_VERSION, MINIMAL_PYOPENSSL_VERSION)) diff --git a/lib/ansible/modules/crypto/openssl_certificate_info.py b/lib/ansible/modules/crypto/openssl_certificate_info.py index d098fae410..697acda6d6 100644 --- a/lib/ansible/modules/crypto/openssl_certificate_info.py +++ b/lib/ansible/modules/crypto/openssl_certificate_info.py @@ -637,8 +637,19 @@ class CertificateInfoPyOpenSSL(CertificateInfo): self.cert.get_pubkey() ) except AttributeError: - self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' - 'Please upgrade to version 16.0 or newer, or use the cryptography backend.') + try: + # pyOpenSSL < 16.0: + bio = crypto._new_mem_buf() + if binary: + rc = crypto._lib.i2d_PUBKEY_bio(bio, self.cert.get_pubkey()._pkey) + else: + rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.cert.get_pubkey()._pkey) + if rc != 1: + crypto._raise_current_error() + return crypto._bio_to_string(bio) + except AttributeError: + self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' + 'Please upgrade to version 16.0 or newer, or use the cryptography backend.') def _get_serial_number(self): return self.cert.get_serial_number() diff --git a/lib/ansible/modules/crypto/openssl_csr.py b/lib/ansible/modules/crypto/openssl_csr.py index b3fd25cd33..7c8c4b3f3d 100644 --- a/lib/ansible/modules/crypto/openssl_csr.py +++ b/lib/ansible/modules/crypto/openssl_csr.py @@ -880,8 +880,8 @@ def main(): # Success? if backend == 'auto': - module.fail_json(msg=('Can detect none of the Python libraries ' - 'cryptography (>= {0}) and pyOpenSSL (>= {1})').format( + module.fail_json(msg=("Can't detect any of the required Python libraries " + "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( MINIMAL_CRYPTOGRAPHY_VERSION, MINIMAL_PYOPENSSL_VERSION)) try: diff --git a/lib/ansible/modules/crypto/openssl_privatekey.py b/lib/ansible/modules/crypto/openssl_privatekey.py index 494b9c90c6..ed6155b0d6 100644 --- a/lib/ansible/modules/crypto/openssl_privatekey.py +++ b/lib/ansible/modules/crypto/openssl_privatekey.py @@ -56,7 +56,7 @@ options: - Depending on the curve, you need a newer version of the cryptography backend. type: str default: RSA - #choices: [ DSA, ECC, RSA, X448, X25519 ] + #choices: [ DSA, ECC, RSA, X448, X25519, Ed448, Ed25519 ] choices: [ DSA, ECC, RSA ] curve: description: @@ -246,6 +246,16 @@ else: CRYPTOGRAPHY_HAS_X448 = True except ImportError: CRYPTOGRAPHY_HAS_X448 = False + try: + import cryptography.hazmat.primitives.asymmetric.ed25519 + CRYPTOGRAPHY_HAS_ED25519 = True + except ImportError: + CRYPTOGRAPHY_HAS_ED25519 = False + try: + import cryptography.hazmat.primitives.asymmetric.ed448 + CRYPTOGRAPHY_HAS_ED448 = True + except ImportError: + CRYPTOGRAPHY_HAS_ED448 = False from ansible.module_utils import crypto as crypto_utils from ansible.module_utils._text import to_native, to_bytes @@ -459,6 +469,10 @@ class PrivateKeyCryptography(PrivateKeyBase): self.module.fail_json(msg='Your cryptography version does not support X25519') if not CRYPTOGRAPHY_HAS_X448 and self.type == 'X448': self.module.fail_json(msg='Your cryptography version does not support X448') + if not CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519': + self.module.fail_json(msg='Your cryptography version does not support Ed25519') + if not CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448': + self.module.fail_json(msg='Your cryptography version does not support Ed448') def _generate_private_key_data(self): try: @@ -477,6 +491,10 @@ class PrivateKeyCryptography(PrivateKeyBase): self.privatekey = cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.generate() if CRYPTOGRAPHY_HAS_X448 and self.type == 'X448': self.privatekey = cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.generate() + if CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519': + self.privatekey = cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.generate() + if CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448': + self.privatekey = cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.generate() if self.type == 'ECC' and self.curve in self.curves: if self.curves[self.curve]['deprecated']: self.module.warn('Elliptic curves of type {0} should not be used for new keys!'.format(self.curve)) @@ -573,9 +591,9 @@ def main(): size=dict(type='int', default=4096), type=dict(type='str', default='RSA', choices=[ 'RSA', 'DSA', 'ECC', - # x25519 is missing serialization functions: https://github.com/pyca/cryptography/issues/4386 - # x448 is also missing it: https://github.com/pyca/cryptography/pull/4580#issuecomment-437913340 - # 'X448', 'X25519', + # FIXME: NO LONGER TRUE: x25519 is missing serialization functions: https://github.com/pyca/cryptography/issues/4386 + # FIXME: NO LONGER TRUE: x448 is also missing it: https://github.com/pyca/cryptography/pull/4580#issuecomment-437913340 + # 'X448', 'X25519', 'Ed448', 'Ed25519' ]), curve=dict(type='str', choices=[ 'secp384r1', 'secp521r1', 'secp224r1', 'secp192r1', 'secp256k1', @@ -629,8 +647,8 @@ def main(): # Success? if backend == 'auto': - module.fail_json(msg=('Can detect none of the Python libraries ' - 'cryptography (>= {0}) and pyOpenSSL (>= {1})').format( + module.fail_json(msg=("Can't detect any of the required Python libraries " + "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( MINIMAL_CRYPTOGRAPHY_VERSION, MINIMAL_PYOPENSSL_VERSION)) try: diff --git a/lib/ansible/modules/crypto/openssl_privatekey_info.py b/lib/ansible/modules/crypto/openssl_privatekey_info.py new file mode 100644 index 0000000000..54f0112045 --- /dev/null +++ b/lib/ansible/modules/crypto/openssl_privatekey_info.py @@ -0,0 +1,627 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2016-2017, Yanis Guenane +# Copyright: (c) 2017, Markus Teufelberger +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +ANSIBLE_METADATA = {'metadata_version': '1.1', + 'status': ['preview'], + 'supported_by': 'community'} + +DOCUMENTATION = r''' +--- +module: openssl_privatekey_info +version_added: '2.8' +short_description: Provide information for OpenSSL private keys +description: + - This module allows one to query information on OpenSSL private keys. + - In case the key consistency checks fail, the module will fail as this indicates a faked + private key. In this case, all return variables are still returned. Note that key consistency + checks are not available all key types; if none is available, C(none) is returned for + C(key_is_consistent). + - It uses the pyOpenSSL or cryptography python library to interact with OpenSSL. If both the + cryptography and PyOpenSSL libraries are available (and meet the minimum version requirements) + cryptography will be preferred as a backend over PyOpenSSL (unless the backend is forced with + C(select_crypto_backend)) +requirements: + - PyOpenSSL >= 0.15 or cryptography >= 1.2.3 +author: + - Felix Fontein (@felixfontein) + - Yanis Guenane (@Spredzy) +options: + path: + description: + - Remote absolute path where the private key file is loaded from. + type: path + required: true + passphrase: + description: + - The passphrase for the private key. + type: str + return_private_key_data: + description: + - Whether to return private key data. + - Only set this to C(yes) when you want private information about this key to + leave the remote machine. + - "WARNING: you have to make sure that private key data isn't accidentally logged!" + type: bool + default: no + + select_crypto_backend: + description: + - Determines which crypto backend to use. + - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl). + - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library. + - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + type: str + default: auto + choices: [ auto, cryptography, pyopenssl ] + +seealso: +- module: openssl_privatekey +''' + +EXAMPLES = r''' +- name: Generate an OpenSSL private key with the default values (4096 bits, RSA) + openssl_privatekey: + path: /etc/ssl/private/ansible.com.pem + +- name: Get information on generated key + openssl_privatekey_info: + path: /etc/ssl/private/ansible.com.pem + register: result + +- name: Dump information + debug: + var: result +''' + +RETURN = r''' +can_load_key: + description: Whether the module was able to load the private key from disk + returned: always + type: bool +can_parse_key: + description: Whether the module was able to parse the private key + returned: always + type: bool +key_is_consistent: + description: + - Whether the key is consistent. Can also return C(none) next to C(yes) and + C(no), to indicate that consistency couldn't be checked. + - In case the check returns C(no), the module will fail. + returned: always + type: bool +public_key: + description: Private key's public key in PEM format + returned: success + type: str + sample: "-----BEGIN PUBLIC KEY-----\nMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A..." +public_key_fingerprints: + description: + - Fingerprints of private key's public key. + - For every hash algorithm available, the fingerprint is computed. + returned: success + type: dict + sample: "{'sha256': 'd4:b3:aa:6d:c8:04:ce:4e:ba:f6:29:4d:92:a3:94:b0:c2:ff:bd:bf:33:63:11:43:34:0f:51:b0:95:09:2f:63', + 'sha512': 'f7:07:4a:f0:b0:f0:e6:8b:95:5f:f9:e6:61:0a:32:68:f1..." +type: + description: + - The key's type. + - One of C(RSA), C(DSA), C(ECC), C(Ed25519), C(X25519), C(Ed448), or C(X448). + - Will start with C(unknown) if the key type cannot be determined. + returned: success + type: str + sample: RSA +public_data: + description: + - Public key data. Depends on key type. + returned: success + type: dict +private_data: + description: + - Private key data. Depends on key type. + returned: success and when I(return_private_key_data) is set to C(yes) + type: dict +''' + + +import abc +import os +import traceback +from distutils.version import LooseVersion + +from ansible.module_utils import crypto as crypto_utils +from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils._text import to_native, to_bytes + +MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3' +MINIMAL_PYOPENSSL_VERSION = '0.15' + +PYOPENSSL_IMP_ERR = None +try: + import OpenSSL + from OpenSSL import crypto + PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) +except ImportError: + PYOPENSSL_IMP_ERR = traceback.format_exc() + PYOPENSSL_FOUND = False +else: + PYOPENSSL_FOUND = True + +CRYPTOGRAPHY_IMP_ERR = None +try: + import cryptography + from cryptography.hazmat.primitives import serialization + CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) + try: + import cryptography.hazmat.primitives.asymmetric.x25519 + CRYPTOGRAPHY_HAS_X25519 = True + except ImportError: + CRYPTOGRAPHY_HAS_X25519 = False + try: + import cryptography.hazmat.primitives.asymmetric.x448 + CRYPTOGRAPHY_HAS_X448 = True + except ImportError: + CRYPTOGRAPHY_HAS_X448 = False + try: + import cryptography.hazmat.primitives.asymmetric.ed25519 + CRYPTOGRAPHY_HAS_ED25519 = True + except ImportError: + CRYPTOGRAPHY_HAS_ED25519 = False + try: + import cryptography.hazmat.primitives.asymmetric.ed448 + CRYPTOGRAPHY_HAS_ED448 = True + except ImportError: + CRYPTOGRAPHY_HAS_ED448 = False +except ImportError: + CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() + CRYPTOGRAPHY_FOUND = False +else: + CRYPTOGRAPHY_FOUND = True + +SIGNATURE_TEST_DATA = b'1234' + + +def _get_cryptography_key_info(key): + key_public_data = dict() + key_private_data = dict() + if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey): + key_type = 'RSA' + key_public_data['size'] = key.key_size + key_public_data['modulus'] = key.public_key().public_numbers().n + key_public_data['exponent'] = key.public_key().public_numbers().e + key_private_data['p'] = key.private_numbers().p + key_private_data['q'] = key.private_numbers().q + key_private_data['exponent'] = key.private_numbers().d + elif isinstance(key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey): + key_type = 'DSA' + key_public_data['size'] = key.key_size + key_public_data['p'] = key.parameters().parameter_numbers().p + key_public_data['q'] = key.parameters().parameter_numbers().q + key_public_data['g'] = key.parameters().parameter_numbers().g + key_public_data['y'] = key.public_key().public_numbers().y + key_private_data['x'] = key.private_numbers().x + elif CRYPTOGRAPHY_HAS_X25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey): + key_type = 'X25519' + elif CRYPTOGRAPHY_HAS_X448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey): + key_type = 'X448' + elif CRYPTOGRAPHY_HAS_ED25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey): + key_type = 'Ed25519' + elif CRYPTOGRAPHY_HAS_ED448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey): + key_type = 'Ed448' + elif isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey): + key_type = 'ECC' + key_public_data['curve'] = key.public_key().curve.name + key_public_data['x'] = key.public_key().public_numbers().x + key_public_data['y'] = key.public_key().public_numbers().y + key_public_data['exponent_size'] = key.public_key().curve.key_size + key_private_data['multiplier'] = key.private_numbers().private_value + else: + key_type = 'unknown ({0})'.format(type(key)) + return key_type, key_public_data, key_private_data + + +def _check_dsa_consistency(key_public_data, key_private_data): + # Get parameters + p = key_public_data.get('p') + q = key_public_data.get('q') + g = key_public_data.get('g') + y = key_public_data.get('y') + x = key_private_data.get('x') + for v in (p, q, g, y, x): + if v is None: + return None + # Make sure that g is not 0, 1 or -1 in Z/pZ + if g < 2 or g >= p - 1: + return False + # Make sure that x is in range + if x < 1 or x >= q: + return False + # Check whether q divides p-1 + if (p - 1) % q != 0: + return False + # Check that g**q mod p == 1 + if crypto_utils.binary_exp_mod(g, q, p) != 1: + return False + # Check whether g**x mod p == y + if crypto_utils.binary_exp_mod(g, x, p) != y: + return False + # Check (quickly) whether p or q are not primes + if crypto_utils.quick_is_not_prime(q) or crypto_utils.quick_is_not_prime(p): + return False + return True + + +def _is_cryptography_key_consistent(key, key_public_data, key_private_data): + if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey): + return bool(key._backend._lib.RSA_check_key(key._rsa_cdata)) + if isinstance(key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey): + result = _check_dsa_consistency(key_public_data, key_private_data) + if result is not None: + return result + try: + signature = key.sign(SIGNATURE_TEST_DATA, cryptography.hazmat.primitives.hashes.SHA256()) + except AttributeError: + # sign() was added in cryptography 1.5, but we support older versions + return None + try: + key.public_key().verify( + signature, + SIGNATURE_TEST_DATA, + cryptography.hazmat.primitives.hashes.SHA256() + ) + return True + except cryptography.exceptions.InvalidSignature: + return False + if isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey): + try: + signature = key.sign( + SIGNATURE_TEST_DATA, + cryptography.hazmat.primitives.asymmetric.ec.ECDSA(cryptography.hazmat.primitives.hashes.SHA256()) + ) + except AttributeError: + # sign() was added in cryptography 1.5, but we support older versions + return None + try: + key.public_key().verify( + signature, + SIGNATURE_TEST_DATA, + cryptography.hazmat.primitives.asymmetric.ec.ECDSA(cryptography.hazmat.primitives.hashes.SHA256()) + ) + return True + except cryptography.exceptions.InvalidSignature: + return False + has_simple_sign_function = False + if CRYPTOGRAPHY_HAS_ED25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey): + has_simple_sign_function = True + if CRYPTOGRAPHY_HAS_ED448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey): + has_simple_sign_function = True + if has_simple_sign_function: + signature = key.sign(SIGNATURE_TEST_DATA) + try: + key.public_key().verify(signature, SIGNATURE_TEST_DATA) + return True + except cryptography.exceptions.InvalidSignature: + return False + # For X25519 and X448, there's no test yet. + return None + + +class PrivateKeyInfo(crypto_utils.OpenSSLObject): + def __init__(self, module, backend): + super(PrivateKeyInfo, self).__init__( + module.params['path'], + 'present', + False, + module.check_mode, + ) + self.backend = backend + self.module = module + + self.passphrase = module.params['passphrase'] + self.return_private_key_data = module.params['return_private_key_data'] + + def generate(self): + # Empty method because crypto_utils.OpenSSLObject wants this + pass + + def dump(self): + # Empty method because crypto_utils.OpenSSLObject wants this + pass + + @abc.abstractmethod + def _get_public_key(self, binary): + pass + + @abc.abstractmethod + def _get_key_info(self): + pass + + @abc.abstractmethod + def _is_key_consistent(self, key_public_data, key_private_data): + pass + + def get_info(self): + result = dict( + can_load_key=False, + can_parse_key=False, + key_is_consistent=None, + ) + try: + with open(self.path, 'rb') as b_priv_key_fh: + priv_key_detail = b_priv_key_fh.read() + result['can_load_key'] = True + except (IOError, OSError) as exc: + self.module.fail_json(msg=to_native(exc), **result) + try: + self.key = crypto_utils.load_privatekey( + path=None, + content=priv_key_detail, + passphrase=to_bytes(self.passphrase) if self.passphrase is not None else self.passphrase, + backend=self.backend + ) + result['can_parse_key'] = True + except crypto_utils.OpenSSLObjectError as exc: + self.module.fail_json(msg=to_native(exc), **result) + + result['public_key'] = self._get_public_key(binary=False) + pk = self._get_public_key(binary=True) + result['public_key_fingerprints'] = crypto_utils.get_fingerprint_of_bytes(pk) if pk is not None else dict() + + key_type, key_public_data, key_private_data = self._get_key_info() + result['type'] = key_type + result['public_data'] = key_public_data + if self.return_private_key_data: + result['private_data'] = key_private_data + + result['key_is_consistent'] = self._is_key_consistent(key_public_data, key_private_data) + if result['key_is_consistent'] is False: + # Only fail when it is False, to avoid to fail on None (which means "we don't know") + result['key_is_consistent'] = False + self.module.fail_json( + msg="Private key is not consistent! (See " + "https://blog.hboeck.de/archives/888-How-I-tricked-Symantec-with-a-Fake-Private-Key.html)", + **result + ) + return result + + +class PrivateKeyInfoCryptography(PrivateKeyInfo): + """Validate the supplied private key, using the cryptography backend""" + def __init__(self, module): + super(PrivateKeyInfoCryptography, self).__init__(module, 'cryptography') + + def _get_public_key(self, binary): + return self.key.public_key().public_bytes( + serialization.Encoding.DER if binary else serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo + ) + + def _get_key_info(self): + return _get_cryptography_key_info(self.key) + + def _is_key_consistent(self, key_public_data, key_private_data): + return _is_cryptography_key_consistent(self.key, key_public_data, key_private_data) + + +class PrivateKeyInfoPyOpenSSL(PrivateKeyInfo): + """validate the supplied private key.""" + + def __init__(self, module): + super(PrivateKeyInfoPyOpenSSL, self).__init__(module, 'pyopenssl') + + def _get_public_key(self, binary): + try: + return crypto.dump_publickey( + crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM, + self.key + ) + except AttributeError: + try: + # pyOpenSSL < 16.0: + bio = crypto._new_mem_buf() + if binary: + rc = crypto._lib.i2d_PUBKEY_bio(bio, self.key._pkey) + else: + rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.key._pkey) + if rc != 1: + crypto._raise_current_error() + return crypto._bio_to_string(bio) + except AttributeError: + self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' + 'Please upgrade to version 16.0 or newer, or use the cryptography backend.') + + def bigint_to_int(self, bn): + '''Convert OpenSSL BIGINT to Python integer''' + if bn == OpenSSL._util.ffi.NULL: + return None + try: + hex = OpenSSL._util.lib.BN_bn2hex(bn) + return int(OpenSSL._util.ffi.string(hex), 16) + finally: + OpenSSL._util.lib.OPENSSL_free(hex) + + def _get_key_info(self): + key_public_data = dict() + key_private_data = dict() + openssl_key_type = self.key.type() + try_fallback = True + if crypto.TYPE_RSA == openssl_key_type: + key_type = 'RSA' + key_public_data['size'] = self.key.bits() + + try: + # Use OpenSSL directly to extract key data + key = OpenSSL._util.lib.EVP_PKEY_get1_RSA(self.key._pkey) + key = OpenSSL._util.ffi.gc(key, OpenSSL._util.lib.RSA_free) + # OpenSSL 1.1 and newer have functions to extract the parameters + # from the EVP PKEY data structures. Older versions didn't have + # these getters, and it was common use to simply access the values + # directly. Since there's no guarantee that these data structures + # will still be accessible in the future, we use the getters for + # 1.1 and later, and directly access the values for 1.0.x and + # earlier. + if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: + # Get modulus and exponents + n = OpenSSL._util.ffi.new("BIGNUM **") + e = OpenSSL._util.ffi.new("BIGNUM **") + d = OpenSSL._util.ffi.new("BIGNUM **") + OpenSSL._util.lib.RSA_get0_key(key, n, e, d) + key_public_data['modulus'] = self.bigint_to_int(n[0]) + key_public_data['exponent'] = self.bigint_to_int(e[0]) + key_private_data['exponent'] = self.bigint_to_int(d[0]) + # Get factors + p = OpenSSL._util.ffi.new("BIGNUM **") + q = OpenSSL._util.ffi.new("BIGNUM **") + OpenSSL._util.lib.RSA_get0_factors(key, p, q) + key_private_data['p'] = self.bigint_to_int(p[0]) + key_private_data['q'] = self.bigint_to_int(q[0]) + else: + # Get modulus and exponents + key_public_data['modulus'] = self.bigint_to_int(key.n) + key_public_data['exponent'] = self.bigint_to_int(key.e) + key_private_data['exponent'] = self.bigint_to_int(key.d) + # Get factors + key_private_data['p'] = self.bigint_to_int(key.p) + key_private_data['q'] = self.bigint_to_int(key.q) + try_fallback = False + except AttributeError: + # Use fallback if available + pass + elif crypto.TYPE_DSA == openssl_key_type: + key_type = 'DSA' + key_public_data['size'] = self.key.bits() + + try: + # Use OpenSSL directly to extract key data + key = OpenSSL._util.lib.EVP_PKEY_get1_DSA(self.key._pkey) + key = OpenSSL._util.ffi.gc(key, OpenSSL._util.lib.DSA_free) + # OpenSSL 1.1 and newer have functions to extract the parameters + # from the EVP PKEY data structures. Older versions didn't have + # these getters, and it was common use to simply access the values + # directly. Since there's no guarantee that these data structures + # will still be accessible in the future, we use the getters for + # 1.1 and later, and directly access the values for 1.0.x and + # earlier. + if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: + # Get public parameters (primes and group element) + p = OpenSSL._util.ffi.new("BIGNUM **") + q = OpenSSL._util.ffi.new("BIGNUM **") + g = OpenSSL._util.ffi.new("BIGNUM **") + OpenSSL._util.lib.DSA_get0_pqg(key, p, q, g) + key_public_data['p'] = self.bigint_to_int(p[0]) + key_public_data['q'] = self.bigint_to_int(q[0]) + key_public_data['g'] = self.bigint_to_int(g[0]) + # Get public and private key exponents + y = OpenSSL._util.ffi.new("BIGNUM **") + x = OpenSSL._util.ffi.new("BIGNUM **") + OpenSSL._util.lib.DSA_get0_key(key, y, x) + key_public_data['y'] = self.bigint_to_int(y[0]) + key_private_data['x'] = self.bigint_to_int(x[0]) + else: + # Get public parameters (primes and group element) + key_public_data['p'] = self.bigint_to_int(key.p) + key_public_data['q'] = self.bigint_to_int(key.q) + key_public_data['g'] = self.bigint_to_int(key.g) + # Get public and private key exponents + key_public_data['y'] = self.bigint_to_int(key.pub_key) + key_private_data['x'] = self.bigint_to_int(key.priv_key) + try_fallback = False + except AttributeError: + # Use fallback if available + pass + else: + # Return 'unknown' + key_type = 'unknown ({0})'.format(self.key.type()) + # If needed and if possible, fall back to cryptography + if try_fallback and PYOPENSSL_VERSION >= LooseVersion('16.1.0') and CRYPTOGRAPHY_FOUND: + return _get_cryptography_key_info(self.key.to_cryptography_key()) + return key_type, key_public_data, key_private_data + + def _is_key_consistent(self, key_public_data, key_private_data): + openssl_key_type = self.key.type() + if crypto.TYPE_RSA == openssl_key_type: + try: + return self.key.check() + except crypto.Error: + # OpenSSL error means that key is not consistent + return False + if crypto.TYPE_DSA == openssl_key_type: + result = _check_dsa_consistency(key_public_data, key_private_data) + if result is not None: + return result + signature = crypto.sign(self.key, SIGNATURE_TEST_DATA, 'sha256') + # Verify wants a cert (where it can get the public key from) + cert = crypto.X509() + cert.set_pubkey(self.key) + try: + crypto.verify(cert, signature, SIGNATURE_TEST_DATA, 'sha256') + return True + except crypto.Error: + return False + # If needed and if possible, fall back to cryptography + if PYOPENSSL_VERSION >= LooseVersion('16.1.0') and CRYPTOGRAPHY_FOUND: + return _is_cryptography_key_consistent(self.key.to_cryptography_key(), key_public_data, key_private_data) + return None + + +def main(): + module = AnsibleModule( + argument_spec=dict( + path=dict(type='path', required=True), + passphrase=dict(type='str', no_log=True), + return_private_key_data=dict(type='bool', default=False), + select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), + ), + supports_check_mode=True, + ) + + try: + base_dir = os.path.dirname(module.params['path']) or '.' + if not os.path.isdir(base_dir): + module.fail_json( + name=base_dir, + msg='The directory %s does not exist or the file is not a directory' % base_dir + ) + + backend = module.params['select_crypto_backend'] + if backend == 'auto': + # Detect what backend we can use + can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) + can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) + + # If cryptography is available we'll use it + if can_use_cryptography: + backend = 'cryptography' + elif can_use_pyopenssl: + backend = 'pyopenssl' + + # Fail if no backend has been found + if backend == 'auto': + module.fail_json(msg=("Can't detect any of the required Python libraries " + "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( + MINIMAL_CRYPTOGRAPHY_VERSION, + MINIMAL_PYOPENSSL_VERSION)) + + if backend == 'pyopenssl': + if not PYOPENSSL_FOUND: + module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR) + privatekey = PrivateKeyInfoPyOpenSSL(module) + elif backend == 'cryptography': + if not CRYPTOGRAPHY_FOUND: + module.fail_json(msg=missing_required_lib('cryptography'), exception=CRYPTOGRAPHY_IMP_ERR) + privatekey = PrivateKeyInfoCryptography(module) + + result = privatekey.get_info() + module.exit_json(**result) + except crypto_utils.OpenSSLObjectError as exc: + module.fail_json(msg=to_native(exc)) + + +if __name__ == "__main__": + main() diff --git a/test/integration/targets/openssl_privatekey_info/aliases b/test/integration/targets/openssl_privatekey_info/aliases new file mode 100644 index 0000000000..6eae8bd8dd --- /dev/null +++ b/test/integration/targets/openssl_privatekey_info/aliases @@ -0,0 +1,2 @@ +shippable/posix/group1 +destructive diff --git a/test/integration/targets/openssl_privatekey_info/meta/main.yml b/test/integration/targets/openssl_privatekey_info/meta/main.yml new file mode 100644 index 0000000000..800aff6428 --- /dev/null +++ b/test/integration/targets/openssl_privatekey_info/meta/main.yml @@ -0,0 +1,2 @@ +dependencies: + - setup_openssl diff --git a/test/integration/targets/openssl_privatekey_info/tasks/impl.yml b/test/integration/targets/openssl_privatekey_info/tasks/impl.yml new file mode 100644 index 0000000000..2bb8d489ff --- /dev/null +++ b/test/integration/targets/openssl_privatekey_info/tasks/impl.yml @@ -0,0 +1,167 @@ +--- +- debug: + msg: "Executing tests with backend {{ select_crypto_backend }}" + +- name: ({{select_crypto_backend}}) Get key 1 info + openssl_privatekey_info: + path: '{{ output_dir }}/privatekey_1.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: Check that RSA key info is ok + assert: + that: + - "'public_key' in result" + - "'public_key_fingerprints' in result" + - "'type' in result" + - "result.type == 'RSA'" + - "'public_data' in result" + - "2 ** (result.public_data.size - 1) < result.public_data.modulus < 2 ** result.public_data.size" + - "result.public_data.exponent > 5" + - "'private_data' not in result" + +- name: Update result list + set_fact: + info_results: "{{ info_results | combine({'key1': result}) }}" + +- name: ({{select_crypto_backend}}) Get key 2 info + openssl_privatekey_info: + path: '{{ output_dir }}/privatekey_2.pem' + return_private_key_data: yes + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: Check that RSA key info is ok + assert: + that: + - "'public_key' in result" + - "'public_key_fingerprints' in result" + - "'type' in result" + - "result.type == 'RSA'" + - "'public_data' in result" + - "result.public_data.size == 2048" + - "2 ** (result.public_data.size - 1) < result.public_data.modulus < 2 ** result.public_data.size" + - "result.public_data.exponent > 5" + - "'private_data' in result" + - "result.public_data.modulus == result.private_data.p * result.private_data.q" + - "result.private_data.exponent > 5" + +- name: Update result list + set_fact: + info_results: "{{ info_results | combine({'key2': result}) }}" + +- name: ({{select_crypto_backend}}) Get key 3 info (without passphrase) + openssl_privatekey_info: + path: '{{ output_dir }}/privatekey_3.pem' + return_private_key_data: yes + select_crypto_backend: '{{ select_crypto_backend }}' + ignore_errors: yes + register: result + +- name: Check that loading passphrase protected key without passphrase failed + assert: + that: + - result is failed + # Check that return values are there + - result.can_load_key is defined + - result.can_parse_key is defined + # Check that return values are correct + - result.can_load_key + - not result.can_parse_key + # Check that additional data isn't there + - "'pulic_key' not in result" + - "'pulic_key_fingerprints' not in result" + - "'type' not in result" + - "'public_data' not in result" + - "'private_data' not in result" + +- name: ({{select_crypto_backend}}) Get key 3 info (with passphrase) + openssl_privatekey_info: + path: '{{ output_dir }}/privatekey_3.pem' + passphrase: hunter2 + return_private_key_data: yes + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: Check that RSA key info is ok + assert: + that: + - "'public_key' in result" + - "'public_key_fingerprints' in result" + - "'type' in result" + - "result.type == 'RSA'" + - "'public_data' in result" + - "2 ** (result.public_data.size - 1) < result.public_data.modulus < 2 ** result.public_data.size" + - "result.public_data.exponent > 5" + - "'private_data' in result" + - "result.public_data.modulus == result.private_data.p * result.private_data.q" + - "result.private_data.exponent > 5" + +- name: Update result list + set_fact: + info_results: "{{ info_results | combine({'key3': result}) }}" + +- name: ({{select_crypto_backend}}) Get key 4 info + openssl_privatekey_info: + path: '{{ output_dir }}/privatekey_4.pem' + return_private_key_data: yes + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- block: + - name: Check that ECC key info is ok + assert: + that: + - "'public_key' in result" + - "'public_key_fingerprints' in result" + - "'type' in result" + - "result.type == 'ECC'" + - "'public_data' in result" + - "result.public_data.curve is string" + - "result.public_data.x != 0" + - "result.public_data.y != 0" + - "result.public_data.exponent_size == (521 if (ansible_distribution == 'CentOS' and ansible_distribution_major_version == '6') else 256)" + - "'private_data' in result" + - "result.private_data.multiplier > 1024" + + - name: Update result list + set_fact: + info_results: "{{ info_results | combine({'key4': result}) }}" + when: select_crypto_backend != 'pyopenssl' or (pyopenssl_version.stdout is version('16.1.0', '>=') and cryptography_version.stdout is version('0.0', '>')) + +- name: Check that ECC key info is ok + assert: + that: + - "'public_key' in result" + - "'public_key_fingerprints' in result" + - "'type' in result" + - "result.type.startswith('unknown ')" + - "'public_data' in result" + - "'private_data' in result" + when: select_crypto_backend == 'pyopenssl' and not (pyopenssl_version.stdout is version('16.1.0', '>=') and cryptography_version.stdout is version('0.0', '>')) + +- name: ({{select_crypto_backend}}) Get key 5 info + openssl_privatekey_info: + path: '{{ output_dir }}/privatekey_5.pem' + return_private_key_data: yes + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: Check that DSA key info is ok + assert: + that: + - "'public_key' in result" + - "'public_key_fingerprints' in result" + - "'type' in result" + - "result.type == 'DSA'" + - "'public_data' in result" + - "result.public_data.p > 2" + - "result.public_data.q > 2" + - "result.public_data.g >= 2" + - "result.public_data.y > 2" + - "'private_data' in result" + - "result.private_data.x > 2" + +- name: Update result list + set_fact: + info_results: "{{ info_results | combine({'key5': result}) }}" diff --git a/test/integration/targets/openssl_privatekey_info/tasks/main.yml b/test/integration/targets/openssl_privatekey_info/tasks/main.yml new file mode 100644 index 0000000000..34c45b8748 --- /dev/null +++ b/test/integration/targets/openssl_privatekey_info/tasks/main.yml @@ -0,0 +1,70 @@ +--- +- name: Generate privatekey 1 + openssl_privatekey: + path: '{{ output_dir }}/privatekey_1.pem' + +- name: Generate privatekey 2 (less bits) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_2.pem' + type: RSA + size: 2048 + +- name: Generate privatekey 3 (with password) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_3.pem' + passphrase: hunter2 + cipher: auto + select_crypto_backend: cryptography + +- name: Generate privatekey 4 (ECC) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_4.pem' + type: ECC + curve: "{{ (ansible_distribution == 'CentOS' and ansible_distribution_major_version == '6') | ternary('secp521r1', 'secp256k1') }}" + # ^ cryptography on CentOS6 doesn't support secp256k1, so we use secp521r1 instead + select_crypto_backend: cryptography + +- name: Generate privatekey 5 (DSA) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_5.pem' + type: DSA + size: 1024 + +- name: Prepare result list + set_fact: + info_results: {} + +- name: Running tests with pyOpenSSL backend + include_tasks: impl.yml + vars: + select_crypto_backend: pyopenssl + when: pyopenssl_version.stdout is version('0.15', '>=') + +- name: Prepare result list + set_fact: + pyopenssl_info_results: "{{ info_results }}" + info_results: {} + +- name: Running tests with cryptography backend + include_tasks: impl.yml + vars: + select_crypto_backend: cryptography + when: cryptography_version.stdout is version('1.2.3', '>=') + +- name: Prepare result list + set_fact: + cryptography_info_results: "{{ info_results }}" + +- block: + - name: Dump pyOpenSSL results + debug: + var: pyopenssl_info_results + - name: Dump cryptography results + debug: + var: cryptography_info_results + - name: Compare results + assert: + that: + - pyopenssl_info_results[item] == cryptography_info_results[item] + loop: "{{ pyopenssl_info_results.keys() | intersect(cryptography_info_results.keys()) | list }}" + when: pyopenssl_version.stdout is version('0.15', '>=') and cryptography_version.stdout is version('1.2.3', '>=')