1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Add openssl_privatekey_info module (#54845)

* Add openssl_privatekey_info module.

* Addressing review feedback.

* Update docs.

* Update tests.

* Work around too broad sanity checks.

* ...

* Don't die when None is returned.

* Use OpenSSL to extract RSA and DSA key data.

* Extend tests.

* Make OpenSSL code compatible to OpenSSL < 1.1.

* Rewrite tests to use result dicts instead of result lists.

* Skip ECC for too old PyOpenSSL.

* Reformulate.

* Improve return_private_key_data docs.

* Rename path_content -> content.

* Add sample.

* Cleanup.

* Add key consistency check.

* Improve description.

* Adjust minimal version.

* Fallback code for some pyOpenSSL < 16.0 versions.

* Also support Ed25519 and Ed448 keys (or not).

* Add more consistency checks.

* Verify DSA keys manually.

* Improve DSA key validation.

* Forgot one condition.

* Make validation more robust.

* Move generic arithmetic code to module_utils/crypto.py.
This commit is contained in:
Felix Fontein 2019-04-08 10:07:56 +02:00 committed by Martin Krizek
parent 4503426f32
commit 7a16703dff
10 changed files with 970 additions and 19 deletions

View file

@ -95,20 +95,34 @@ def get_fingerprint(path, passphrase=None):
privatekey = load_privatekey(path, passphrase, check_passphrase=False) privatekey = load_privatekey(path, passphrase, check_passphrase=False)
try: try:
publickey = crypto.dump_publickey(crypto.FILETYPE_ASN1, privatekey) publickey = crypto.dump_publickey(crypto.FILETYPE_ASN1, privatekey)
return get_fingerprint_of_bytes(publickey)
except AttributeError: except AttributeError:
# If PyOpenSSL < 16.0 crypto.dump_publickey() will fail. # If PyOpenSSL < 16.0 crypto.dump_publickey() will fail.
# By doing this we prevent the code from raising an error try:
# yet we return no value in the fingerprint hash. bio = crypto._new_mem_buf()
return None rc = crypto._lib.i2d_PUBKEY_bio(bio, privatekey._pkey)
if rc != 1:
crypto._raise_current_error()
publickey = crypto._bio_to_string(bio)
except AttributeError:
# By doing this we prevent the code from raising an error
# yet we return no value in the fingerprint hash.
return None
return get_fingerprint_of_bytes(publickey)
def load_privatekey(path, passphrase=None, check_passphrase=True, backend='pyopenssl'): def load_privatekey(path, passphrase=None, check_passphrase=True, content=None, backend='pyopenssl'):
"""Load the specified OpenSSL private key.""" """Load the specified OpenSSL private key.
The content can also be specified via content; in that case,
this function will not load the key from disk.
"""
try: try:
with open(path, 'rb') as b_priv_key_fh: if content is None:
priv_key_detail = b_priv_key_fh.read() with open(path, 'rb') as b_priv_key_fh:
priv_key_detail = b_priv_key_fh.read()
else:
priv_key_detail = content
if backend == 'pyopenssl': if backend == 'pyopenssl':
@ -773,3 +787,43 @@ def cryptography_get_basic_constraints(constraints):
else: else:
raise OpenSSLObjectError('Unknown basic constraint "{0}"'.format(constraint)) raise OpenSSLObjectError('Unknown basic constraint "{0}"'.format(constraint))
return ca, path_length return ca, path_length
def binary_exp_mod(f, e, m):
'''Computes f^e mod m in O(log e) multiplications modulo m.'''
# Compute len_e = floor(log_2(e))
len_e = -1
x = e
while x > 0:
x >>= 1
len_e += 1
# Compute f**e mod m
result = 1
for k in range(len_e, -1, -1):
result = (result * result) % m
if ((e >> k) & 1) != 0:
result = (result * f) % m
return result
def simple_gcd(a, b):
'''Compute GCD of its two inputs.'''
while b != 0:
a, b = b, a % b
return a
def quick_is_not_prime(n):
'''Does some quick checks to see if we can poke a hole into the primality of n.
A result of `False` does **not** mean that the number is prime; it just means
that we couldn't detect quickly whether it is not prime.
'''
if n <= 2:
return True
# The constant in the next line is the product of all primes < 200
if simple_gcd(n, 7799922041683461553249199106329813876687996789903550945093032474868511536164700810) > 1:
return True
# TODO: maybe do some iterations of Miller-Rabin to increase confidence
# (https://en.wikipedia.org/wiki/Miller%E2%80%93Rabin_primality_test)
return False

View file

@ -1777,7 +1777,7 @@ def main():
# Fail if no backend has been found # Fail if no backend has been found
if backend == 'auto': if backend == 'auto':
module.fail_json(msg=("Can't detect none of the required Python libraries " module.fail_json(msg=("Can't detect any of the required Python libraries "
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format( "cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
MINIMAL_CRYPTOGRAPHY_VERSION, MINIMAL_CRYPTOGRAPHY_VERSION,
MINIMAL_PYOPENSSL_VERSION)) MINIMAL_PYOPENSSL_VERSION))

View file

@ -637,8 +637,19 @@ class CertificateInfoPyOpenSSL(CertificateInfo):
self.cert.get_pubkey() self.cert.get_pubkey()
) )
except AttributeError: except AttributeError:
self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' try:
'Please upgrade to version 16.0 or newer, or use the cryptography backend.') # pyOpenSSL < 16.0:
bio = crypto._new_mem_buf()
if binary:
rc = crypto._lib.i2d_PUBKEY_bio(bio, self.cert.get_pubkey()._pkey)
else:
rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.cert.get_pubkey()._pkey)
if rc != 1:
crypto._raise_current_error()
return crypto._bio_to_string(bio)
except AttributeError:
self.module.warn('Your pyOpenSSL version does not support dumping public keys. '
'Please upgrade to version 16.0 or newer, or use the cryptography backend.')
def _get_serial_number(self): def _get_serial_number(self):
return self.cert.get_serial_number() return self.cert.get_serial_number()

View file

@ -880,8 +880,8 @@ def main():
# Success? # Success?
if backend == 'auto': if backend == 'auto':
module.fail_json(msg=('Can detect none of the Python libraries ' module.fail_json(msg=("Can't detect any of the required Python libraries "
'cryptography (>= {0}) and pyOpenSSL (>= {1})').format( "cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
MINIMAL_CRYPTOGRAPHY_VERSION, MINIMAL_CRYPTOGRAPHY_VERSION,
MINIMAL_PYOPENSSL_VERSION)) MINIMAL_PYOPENSSL_VERSION))
try: try:

View file

@ -56,7 +56,7 @@ options:
- Depending on the curve, you need a newer version of the cryptography backend. - Depending on the curve, you need a newer version of the cryptography backend.
type: str type: str
default: RSA default: RSA
#choices: [ DSA, ECC, RSA, X448, X25519 ] #choices: [ DSA, ECC, RSA, X448, X25519, Ed448, Ed25519 ]
choices: [ DSA, ECC, RSA ] choices: [ DSA, ECC, RSA ]
curve: curve:
description: description:
@ -246,6 +246,16 @@ else:
CRYPTOGRAPHY_HAS_X448 = True CRYPTOGRAPHY_HAS_X448 = True
except ImportError: except ImportError:
CRYPTOGRAPHY_HAS_X448 = False CRYPTOGRAPHY_HAS_X448 = False
try:
import cryptography.hazmat.primitives.asymmetric.ed25519
CRYPTOGRAPHY_HAS_ED25519 = True
except ImportError:
CRYPTOGRAPHY_HAS_ED25519 = False
try:
import cryptography.hazmat.primitives.asymmetric.ed448
CRYPTOGRAPHY_HAS_ED448 = True
except ImportError:
CRYPTOGRAPHY_HAS_ED448 = False
from ansible.module_utils import crypto as crypto_utils from ansible.module_utils import crypto as crypto_utils
from ansible.module_utils._text import to_native, to_bytes from ansible.module_utils._text import to_native, to_bytes
@ -459,6 +469,10 @@ class PrivateKeyCryptography(PrivateKeyBase):
self.module.fail_json(msg='Your cryptography version does not support X25519') self.module.fail_json(msg='Your cryptography version does not support X25519')
if not CRYPTOGRAPHY_HAS_X448 and self.type == 'X448': if not CRYPTOGRAPHY_HAS_X448 and self.type == 'X448':
self.module.fail_json(msg='Your cryptography version does not support X448') self.module.fail_json(msg='Your cryptography version does not support X448')
if not CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519':
self.module.fail_json(msg='Your cryptography version does not support Ed25519')
if not CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448':
self.module.fail_json(msg='Your cryptography version does not support Ed448')
def _generate_private_key_data(self): def _generate_private_key_data(self):
try: try:
@ -477,6 +491,10 @@ class PrivateKeyCryptography(PrivateKeyBase):
self.privatekey = cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.generate() self.privatekey = cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.generate()
if CRYPTOGRAPHY_HAS_X448 and self.type == 'X448': if CRYPTOGRAPHY_HAS_X448 and self.type == 'X448':
self.privatekey = cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.generate() self.privatekey = cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.generate()
if CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519':
self.privatekey = cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.generate()
if CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448':
self.privatekey = cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.generate()
if self.type == 'ECC' and self.curve in self.curves: if self.type == 'ECC' and self.curve in self.curves:
if self.curves[self.curve]['deprecated']: if self.curves[self.curve]['deprecated']:
self.module.warn('Elliptic curves of type {0} should not be used for new keys!'.format(self.curve)) self.module.warn('Elliptic curves of type {0} should not be used for new keys!'.format(self.curve))
@ -573,9 +591,9 @@ def main():
size=dict(type='int', default=4096), size=dict(type='int', default=4096),
type=dict(type='str', default='RSA', choices=[ type=dict(type='str', default='RSA', choices=[
'RSA', 'DSA', 'ECC', 'RSA', 'DSA', 'ECC',
# x25519 is missing serialization functions: https://github.com/pyca/cryptography/issues/4386 # FIXME: NO LONGER TRUE: x25519 is missing serialization functions: https://github.com/pyca/cryptography/issues/4386
# x448 is also missing it: https://github.com/pyca/cryptography/pull/4580#issuecomment-437913340 # FIXME: NO LONGER TRUE: x448 is also missing it: https://github.com/pyca/cryptography/pull/4580#issuecomment-437913340
# 'X448', 'X25519', # 'X448', 'X25519', 'Ed448', 'Ed25519'
]), ]),
curve=dict(type='str', choices=[ curve=dict(type='str', choices=[
'secp384r1', 'secp521r1', 'secp224r1', 'secp192r1', 'secp256k1', 'secp384r1', 'secp521r1', 'secp224r1', 'secp192r1', 'secp256k1',
@ -629,8 +647,8 @@ def main():
# Success? # Success?
if backend == 'auto': if backend == 'auto':
module.fail_json(msg=('Can detect none of the Python libraries ' module.fail_json(msg=("Can't detect any of the required Python libraries "
'cryptography (>= {0}) and pyOpenSSL (>= {1})').format( "cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
MINIMAL_CRYPTOGRAPHY_VERSION, MINIMAL_CRYPTOGRAPHY_VERSION,
MINIMAL_PYOPENSSL_VERSION)) MINIMAL_PYOPENSSL_VERSION))
try: try:

View file

@ -0,0 +1,627 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org>
# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = r'''
---
module: openssl_privatekey_info
version_added: '2.8'
short_description: Provide information for OpenSSL private keys
description:
- This module allows one to query information on OpenSSL private keys.
- In case the key consistency checks fail, the module will fail as this indicates a faked
private key. In this case, all return variables are still returned. Note that key consistency
checks are not available all key types; if none is available, C(none) is returned for
C(key_is_consistent).
- It uses the pyOpenSSL or cryptography python library to interact with OpenSSL. If both the
cryptography and PyOpenSSL libraries are available (and meet the minimum version requirements)
cryptography will be preferred as a backend over PyOpenSSL (unless the backend is forced with
C(select_crypto_backend))
requirements:
- PyOpenSSL >= 0.15 or cryptography >= 1.2.3
author:
- Felix Fontein (@felixfontein)
- Yanis Guenane (@Spredzy)
options:
path:
description:
- Remote absolute path where the private key file is loaded from.
type: path
required: true
passphrase:
description:
- The passphrase for the private key.
type: str
return_private_key_data:
description:
- Whether to return private key data.
- Only set this to C(yes) when you want private information about this key to
leave the remote machine.
- "WARNING: you have to make sure that private key data isn't accidentally logged!"
type: bool
default: no
select_crypto_backend:
description:
- Determines which crypto backend to use.
- The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl).
- If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library.
- If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
type: str
default: auto
choices: [ auto, cryptography, pyopenssl ]
seealso:
- module: openssl_privatekey
'''
EXAMPLES = r'''
- name: Generate an OpenSSL private key with the default values (4096 bits, RSA)
openssl_privatekey:
path: /etc/ssl/private/ansible.com.pem
- name: Get information on generated key
openssl_privatekey_info:
path: /etc/ssl/private/ansible.com.pem
register: result
- name: Dump information
debug:
var: result
'''
RETURN = r'''
can_load_key:
description: Whether the module was able to load the private key from disk
returned: always
type: bool
can_parse_key:
description: Whether the module was able to parse the private key
returned: always
type: bool
key_is_consistent:
description:
- Whether the key is consistent. Can also return C(none) next to C(yes) and
C(no), to indicate that consistency couldn't be checked.
- In case the check returns C(no), the module will fail.
returned: always
type: bool
public_key:
description: Private key's public key in PEM format
returned: success
type: str
sample: "-----BEGIN PUBLIC KEY-----\nMIICIjANBgkqhkiG9w0BAQEFAAOCAg8A..."
public_key_fingerprints:
description:
- Fingerprints of private key's public key.
- For every hash algorithm available, the fingerprint is computed.
returned: success
type: dict
sample: "{'sha256': 'd4:b3:aa:6d:c8:04:ce:4e:ba:f6:29:4d:92:a3:94:b0:c2:ff:bd:bf:33:63:11:43:34:0f:51:b0:95:09:2f:63',
'sha512': 'f7:07:4a:f0:b0:f0:e6:8b:95:5f:f9:e6:61:0a:32:68:f1..."
type:
description:
- The key's type.
- One of C(RSA), C(DSA), C(ECC), C(Ed25519), C(X25519), C(Ed448), or C(X448).
- Will start with C(unknown) if the key type cannot be determined.
returned: success
type: str
sample: RSA
public_data:
description:
- Public key data. Depends on key type.
returned: success
type: dict
private_data:
description:
- Private key data. Depends on key type.
returned: success and when I(return_private_key_data) is set to C(yes)
type: dict
'''
import abc
import os
import traceback
from distutils.version import LooseVersion
from ansible.module_utils import crypto as crypto_utils
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils._text import to_native, to_bytes
MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3'
MINIMAL_PYOPENSSL_VERSION = '0.15'
PYOPENSSL_IMP_ERR = None
try:
import OpenSSL
from OpenSSL import crypto
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
except ImportError:
PYOPENSSL_IMP_ERR = traceback.format_exc()
PYOPENSSL_FOUND = False
else:
PYOPENSSL_FOUND = True
CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
from cryptography.hazmat.primitives import serialization
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
try:
import cryptography.hazmat.primitives.asymmetric.x25519
CRYPTOGRAPHY_HAS_X25519 = True
except ImportError:
CRYPTOGRAPHY_HAS_X25519 = False
try:
import cryptography.hazmat.primitives.asymmetric.x448
CRYPTOGRAPHY_HAS_X448 = True
except ImportError:
CRYPTOGRAPHY_HAS_X448 = False
try:
import cryptography.hazmat.primitives.asymmetric.ed25519
CRYPTOGRAPHY_HAS_ED25519 = True
except ImportError:
CRYPTOGRAPHY_HAS_ED25519 = False
try:
import cryptography.hazmat.primitives.asymmetric.ed448
CRYPTOGRAPHY_HAS_ED448 = True
except ImportError:
CRYPTOGRAPHY_HAS_ED448 = False
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
else:
CRYPTOGRAPHY_FOUND = True
SIGNATURE_TEST_DATA = b'1234'
def _get_cryptography_key_info(key):
key_public_data = dict()
key_private_data = dict()
if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
key_type = 'RSA'
key_public_data['size'] = key.key_size
key_public_data['modulus'] = key.public_key().public_numbers().n
key_public_data['exponent'] = key.public_key().public_numbers().e
key_private_data['p'] = key.private_numbers().p
key_private_data['q'] = key.private_numbers().q
key_private_data['exponent'] = key.private_numbers().d
elif isinstance(key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey):
key_type = 'DSA'
key_public_data['size'] = key.key_size
key_public_data['p'] = key.parameters().parameter_numbers().p
key_public_data['q'] = key.parameters().parameter_numbers().q
key_public_data['g'] = key.parameters().parameter_numbers().g
key_public_data['y'] = key.public_key().public_numbers().y
key_private_data['x'] = key.private_numbers().x
elif CRYPTOGRAPHY_HAS_X25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey):
key_type = 'X25519'
elif CRYPTOGRAPHY_HAS_X448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey):
key_type = 'X448'
elif CRYPTOGRAPHY_HAS_ED25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey):
key_type = 'Ed25519'
elif CRYPTOGRAPHY_HAS_ED448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey):
key_type = 'Ed448'
elif isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
key_type = 'ECC'
key_public_data['curve'] = key.public_key().curve.name
key_public_data['x'] = key.public_key().public_numbers().x
key_public_data['y'] = key.public_key().public_numbers().y
key_public_data['exponent_size'] = key.public_key().curve.key_size
key_private_data['multiplier'] = key.private_numbers().private_value
else:
key_type = 'unknown ({0})'.format(type(key))
return key_type, key_public_data, key_private_data
def _check_dsa_consistency(key_public_data, key_private_data):
# Get parameters
p = key_public_data.get('p')
q = key_public_data.get('q')
g = key_public_data.get('g')
y = key_public_data.get('y')
x = key_private_data.get('x')
for v in (p, q, g, y, x):
if v is None:
return None
# Make sure that g is not 0, 1 or -1 in Z/pZ
if g < 2 or g >= p - 1:
return False
# Make sure that x is in range
if x < 1 or x >= q:
return False
# Check whether q divides p-1
if (p - 1) % q != 0:
return False
# Check that g**q mod p == 1
if crypto_utils.binary_exp_mod(g, q, p) != 1:
return False
# Check whether g**x mod p == y
if crypto_utils.binary_exp_mod(g, x, p) != y:
return False
# Check (quickly) whether p or q are not primes
if crypto_utils.quick_is_not_prime(q) or crypto_utils.quick_is_not_prime(p):
return False
return True
def _is_cryptography_key_consistent(key, key_public_data, key_private_data):
if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
return bool(key._backend._lib.RSA_check_key(key._rsa_cdata))
if isinstance(key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey):
result = _check_dsa_consistency(key_public_data, key_private_data)
if result is not None:
return result
try:
signature = key.sign(SIGNATURE_TEST_DATA, cryptography.hazmat.primitives.hashes.SHA256())
except AttributeError:
# sign() was added in cryptography 1.5, but we support older versions
return None
try:
key.public_key().verify(
signature,
SIGNATURE_TEST_DATA,
cryptography.hazmat.primitives.hashes.SHA256()
)
return True
except cryptography.exceptions.InvalidSignature:
return False
if isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
try:
signature = key.sign(
SIGNATURE_TEST_DATA,
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(cryptography.hazmat.primitives.hashes.SHA256())
)
except AttributeError:
# sign() was added in cryptography 1.5, but we support older versions
return None
try:
key.public_key().verify(
signature,
SIGNATURE_TEST_DATA,
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(cryptography.hazmat.primitives.hashes.SHA256())
)
return True
except cryptography.exceptions.InvalidSignature:
return False
has_simple_sign_function = False
if CRYPTOGRAPHY_HAS_ED25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey):
has_simple_sign_function = True
if CRYPTOGRAPHY_HAS_ED448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey):
has_simple_sign_function = True
if has_simple_sign_function:
signature = key.sign(SIGNATURE_TEST_DATA)
try:
key.public_key().verify(signature, SIGNATURE_TEST_DATA)
return True
except cryptography.exceptions.InvalidSignature:
return False
# For X25519 and X448, there's no test yet.
return None
class PrivateKeyInfo(crypto_utils.OpenSSLObject):
def __init__(self, module, backend):
super(PrivateKeyInfo, self).__init__(
module.params['path'],
'present',
False,
module.check_mode,
)
self.backend = backend
self.module = module
self.passphrase = module.params['passphrase']
self.return_private_key_data = module.params['return_private_key_data']
def generate(self):
# Empty method because crypto_utils.OpenSSLObject wants this
pass
def dump(self):
# Empty method because crypto_utils.OpenSSLObject wants this
pass
@abc.abstractmethod
def _get_public_key(self, binary):
pass
@abc.abstractmethod
def _get_key_info(self):
pass
@abc.abstractmethod
def _is_key_consistent(self, key_public_data, key_private_data):
pass
def get_info(self):
result = dict(
can_load_key=False,
can_parse_key=False,
key_is_consistent=None,
)
try:
with open(self.path, 'rb') as b_priv_key_fh:
priv_key_detail = b_priv_key_fh.read()
result['can_load_key'] = True
except (IOError, OSError) as exc:
self.module.fail_json(msg=to_native(exc), **result)
try:
self.key = crypto_utils.load_privatekey(
path=None,
content=priv_key_detail,
passphrase=to_bytes(self.passphrase) if self.passphrase is not None else self.passphrase,
backend=self.backend
)
result['can_parse_key'] = True
except crypto_utils.OpenSSLObjectError as exc:
self.module.fail_json(msg=to_native(exc), **result)
result['public_key'] = self._get_public_key(binary=False)
pk = self._get_public_key(binary=True)
result['public_key_fingerprints'] = crypto_utils.get_fingerprint_of_bytes(pk) if pk is not None else dict()
key_type, key_public_data, key_private_data = self._get_key_info()
result['type'] = key_type
result['public_data'] = key_public_data
if self.return_private_key_data:
result['private_data'] = key_private_data
result['key_is_consistent'] = self._is_key_consistent(key_public_data, key_private_data)
if result['key_is_consistent'] is False:
# Only fail when it is False, to avoid to fail on None (which means "we don't know")
result['key_is_consistent'] = False
self.module.fail_json(
msg="Private key is not consistent! (See "
"https://blog.hboeck.de/archives/888-How-I-tricked-Symantec-with-a-Fake-Private-Key.html)",
**result
)
return result
class PrivateKeyInfoCryptography(PrivateKeyInfo):
"""Validate the supplied private key, using the cryptography backend"""
def __init__(self, module):
super(PrivateKeyInfoCryptography, self).__init__(module, 'cryptography')
def _get_public_key(self, binary):
return self.key.public_key().public_bytes(
serialization.Encoding.DER if binary else serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo
)
def _get_key_info(self):
return _get_cryptography_key_info(self.key)
def _is_key_consistent(self, key_public_data, key_private_data):
return _is_cryptography_key_consistent(self.key, key_public_data, key_private_data)
class PrivateKeyInfoPyOpenSSL(PrivateKeyInfo):
"""validate the supplied private key."""
def __init__(self, module):
super(PrivateKeyInfoPyOpenSSL, self).__init__(module, 'pyopenssl')
def _get_public_key(self, binary):
try:
return crypto.dump_publickey(
crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM,
self.key
)
except AttributeError:
try:
# pyOpenSSL < 16.0:
bio = crypto._new_mem_buf()
if binary:
rc = crypto._lib.i2d_PUBKEY_bio(bio, self.key._pkey)
else:
rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.key._pkey)
if rc != 1:
crypto._raise_current_error()
return crypto._bio_to_string(bio)
except AttributeError:
self.module.warn('Your pyOpenSSL version does not support dumping public keys. '
'Please upgrade to version 16.0 or newer, or use the cryptography backend.')
def bigint_to_int(self, bn):
'''Convert OpenSSL BIGINT to Python integer'''
if bn == OpenSSL._util.ffi.NULL:
return None
try:
hex = OpenSSL._util.lib.BN_bn2hex(bn)
return int(OpenSSL._util.ffi.string(hex), 16)
finally:
OpenSSL._util.lib.OPENSSL_free(hex)
def _get_key_info(self):
key_public_data = dict()
key_private_data = dict()
openssl_key_type = self.key.type()
try_fallback = True
if crypto.TYPE_RSA == openssl_key_type:
key_type = 'RSA'
key_public_data['size'] = self.key.bits()
try:
# Use OpenSSL directly to extract key data
key = OpenSSL._util.lib.EVP_PKEY_get1_RSA(self.key._pkey)
key = OpenSSL._util.ffi.gc(key, OpenSSL._util.lib.RSA_free)
# OpenSSL 1.1 and newer have functions to extract the parameters
# from the EVP PKEY data structures. Older versions didn't have
# these getters, and it was common use to simply access the values
# directly. Since there's no guarantee that these data structures
# will still be accessible in the future, we use the getters for
# 1.1 and later, and directly access the values for 1.0.x and
# earlier.
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
# Get modulus and exponents
n = OpenSSL._util.ffi.new("BIGNUM **")
e = OpenSSL._util.ffi.new("BIGNUM **")
d = OpenSSL._util.ffi.new("BIGNUM **")
OpenSSL._util.lib.RSA_get0_key(key, n, e, d)
key_public_data['modulus'] = self.bigint_to_int(n[0])
key_public_data['exponent'] = self.bigint_to_int(e[0])
key_private_data['exponent'] = self.bigint_to_int(d[0])
# Get factors
p = OpenSSL._util.ffi.new("BIGNUM **")
q = OpenSSL._util.ffi.new("BIGNUM **")
OpenSSL._util.lib.RSA_get0_factors(key, p, q)
key_private_data['p'] = self.bigint_to_int(p[0])
key_private_data['q'] = self.bigint_to_int(q[0])
else:
# Get modulus and exponents
key_public_data['modulus'] = self.bigint_to_int(key.n)
key_public_data['exponent'] = self.bigint_to_int(key.e)
key_private_data['exponent'] = self.bigint_to_int(key.d)
# Get factors
key_private_data['p'] = self.bigint_to_int(key.p)
key_private_data['q'] = self.bigint_to_int(key.q)
try_fallback = False
except AttributeError:
# Use fallback if available
pass
elif crypto.TYPE_DSA == openssl_key_type:
key_type = 'DSA'
key_public_data['size'] = self.key.bits()
try:
# Use OpenSSL directly to extract key data
key = OpenSSL._util.lib.EVP_PKEY_get1_DSA(self.key._pkey)
key = OpenSSL._util.ffi.gc(key, OpenSSL._util.lib.DSA_free)
# OpenSSL 1.1 and newer have functions to extract the parameters
# from the EVP PKEY data structures. Older versions didn't have
# these getters, and it was common use to simply access the values
# directly. Since there's no guarantee that these data structures
# will still be accessible in the future, we use the getters for
# 1.1 and later, and directly access the values for 1.0.x and
# earlier.
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
# Get public parameters (primes and group element)
p = OpenSSL._util.ffi.new("BIGNUM **")
q = OpenSSL._util.ffi.new("BIGNUM **")
g = OpenSSL._util.ffi.new("BIGNUM **")
OpenSSL._util.lib.DSA_get0_pqg(key, p, q, g)
key_public_data['p'] = self.bigint_to_int(p[0])
key_public_data['q'] = self.bigint_to_int(q[0])
key_public_data['g'] = self.bigint_to_int(g[0])
# Get public and private key exponents
y = OpenSSL._util.ffi.new("BIGNUM **")
x = OpenSSL._util.ffi.new("BIGNUM **")
OpenSSL._util.lib.DSA_get0_key(key, y, x)
key_public_data['y'] = self.bigint_to_int(y[0])
key_private_data['x'] = self.bigint_to_int(x[0])
else:
# Get public parameters (primes and group element)
key_public_data['p'] = self.bigint_to_int(key.p)
key_public_data['q'] = self.bigint_to_int(key.q)
key_public_data['g'] = self.bigint_to_int(key.g)
# Get public and private key exponents
key_public_data['y'] = self.bigint_to_int(key.pub_key)
key_private_data['x'] = self.bigint_to_int(key.priv_key)
try_fallback = False
except AttributeError:
# Use fallback if available
pass
else:
# Return 'unknown'
key_type = 'unknown ({0})'.format(self.key.type())
# If needed and if possible, fall back to cryptography
if try_fallback and PYOPENSSL_VERSION >= LooseVersion('16.1.0') and CRYPTOGRAPHY_FOUND:
return _get_cryptography_key_info(self.key.to_cryptography_key())
return key_type, key_public_data, key_private_data
def _is_key_consistent(self, key_public_data, key_private_data):
openssl_key_type = self.key.type()
if crypto.TYPE_RSA == openssl_key_type:
try:
return self.key.check()
except crypto.Error:
# OpenSSL error means that key is not consistent
return False
if crypto.TYPE_DSA == openssl_key_type:
result = _check_dsa_consistency(key_public_data, key_private_data)
if result is not None:
return result
signature = crypto.sign(self.key, SIGNATURE_TEST_DATA, 'sha256')
# Verify wants a cert (where it can get the public key from)
cert = crypto.X509()
cert.set_pubkey(self.key)
try:
crypto.verify(cert, signature, SIGNATURE_TEST_DATA, 'sha256')
return True
except crypto.Error:
return False
# If needed and if possible, fall back to cryptography
if PYOPENSSL_VERSION >= LooseVersion('16.1.0') and CRYPTOGRAPHY_FOUND:
return _is_cryptography_key_consistent(self.key.to_cryptography_key(), key_public_data, key_private_data)
return None
def main():
module = AnsibleModule(
argument_spec=dict(
path=dict(type='path', required=True),
passphrase=dict(type='str', no_log=True),
return_private_key_data=dict(type='bool', default=False),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
),
supports_check_mode=True,
)
try:
base_dir = os.path.dirname(module.params['path']) or '.'
if not os.path.isdir(base_dir):
module.fail_json(
name=base_dir,
msg='The directory %s does not exist or the file is not a directory' % base_dir
)
backend = module.params['select_crypto_backend']
if backend == 'auto':
# Detect what backend we can use
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
# If cryptography is available we'll use it
if can_use_cryptography:
backend = 'cryptography'
elif can_use_pyopenssl:
backend = 'pyopenssl'
# Fail if no backend has been found
if backend == 'auto':
module.fail_json(msg=("Can't detect any of the required Python libraries "
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
MINIMAL_CRYPTOGRAPHY_VERSION,
MINIMAL_PYOPENSSL_VERSION))
if backend == 'pyopenssl':
if not PYOPENSSL_FOUND:
module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR)
privatekey = PrivateKeyInfoPyOpenSSL(module)
elif backend == 'cryptography':
if not CRYPTOGRAPHY_FOUND:
module.fail_json(msg=missing_required_lib('cryptography'), exception=CRYPTOGRAPHY_IMP_ERR)
privatekey = PrivateKeyInfoCryptography(module)
result = privatekey.get_info()
module.exit_json(**result)
except crypto_utils.OpenSSLObjectError as exc:
module.fail_json(msg=to_native(exc))
if __name__ == "__main__":
main()

View file

@ -0,0 +1,2 @@
shippable/posix/group1
destructive

View file

@ -0,0 +1,2 @@
dependencies:
- setup_openssl

View file

@ -0,0 +1,167 @@
---
- debug:
msg: "Executing tests with backend {{ select_crypto_backend }}"
- name: ({{select_crypto_backend}}) Get key 1 info
openssl_privatekey_info:
path: '{{ output_dir }}/privatekey_1.pem'
select_crypto_backend: '{{ select_crypto_backend }}'
register: result
- name: Check that RSA key info is ok
assert:
that:
- "'public_key' in result"
- "'public_key_fingerprints' in result"
- "'type' in result"
- "result.type == 'RSA'"
- "'public_data' in result"
- "2 ** (result.public_data.size - 1) < result.public_data.modulus < 2 ** result.public_data.size"
- "result.public_data.exponent > 5"
- "'private_data' not in result"
- name: Update result list
set_fact:
info_results: "{{ info_results | combine({'key1': result}) }}"
- name: ({{select_crypto_backend}}) Get key 2 info
openssl_privatekey_info:
path: '{{ output_dir }}/privatekey_2.pem'
return_private_key_data: yes
select_crypto_backend: '{{ select_crypto_backend }}'
register: result
- name: Check that RSA key info is ok
assert:
that:
- "'public_key' in result"
- "'public_key_fingerprints' in result"
- "'type' in result"
- "result.type == 'RSA'"
- "'public_data' in result"
- "result.public_data.size == 2048"
- "2 ** (result.public_data.size - 1) < result.public_data.modulus < 2 ** result.public_data.size"
- "result.public_data.exponent > 5"
- "'private_data' in result"
- "result.public_data.modulus == result.private_data.p * result.private_data.q"
- "result.private_data.exponent > 5"
- name: Update result list
set_fact:
info_results: "{{ info_results | combine({'key2': result}) }}"
- name: ({{select_crypto_backend}}) Get key 3 info (without passphrase)
openssl_privatekey_info:
path: '{{ output_dir }}/privatekey_3.pem'
return_private_key_data: yes
select_crypto_backend: '{{ select_crypto_backend }}'
ignore_errors: yes
register: result
- name: Check that loading passphrase protected key without passphrase failed
assert:
that:
- result is failed
# Check that return values are there
- result.can_load_key is defined
- result.can_parse_key is defined
# Check that return values are correct
- result.can_load_key
- not result.can_parse_key
# Check that additional data isn't there
- "'pulic_key' not in result"
- "'pulic_key_fingerprints' not in result"
- "'type' not in result"
- "'public_data' not in result"
- "'private_data' not in result"
- name: ({{select_crypto_backend}}) Get key 3 info (with passphrase)
openssl_privatekey_info:
path: '{{ output_dir }}/privatekey_3.pem'
passphrase: hunter2
return_private_key_data: yes
select_crypto_backend: '{{ select_crypto_backend }}'
register: result
- name: Check that RSA key info is ok
assert:
that:
- "'public_key' in result"
- "'public_key_fingerprints' in result"
- "'type' in result"
- "result.type == 'RSA'"
- "'public_data' in result"
- "2 ** (result.public_data.size - 1) < result.public_data.modulus < 2 ** result.public_data.size"
- "result.public_data.exponent > 5"
- "'private_data' in result"
- "result.public_data.modulus == result.private_data.p * result.private_data.q"
- "result.private_data.exponent > 5"
- name: Update result list
set_fact:
info_results: "{{ info_results | combine({'key3': result}) }}"
- name: ({{select_crypto_backend}}) Get key 4 info
openssl_privatekey_info:
path: '{{ output_dir }}/privatekey_4.pem'
return_private_key_data: yes
select_crypto_backend: '{{ select_crypto_backend }}'
register: result
- block:
- name: Check that ECC key info is ok
assert:
that:
- "'public_key' in result"
- "'public_key_fingerprints' in result"
- "'type' in result"
- "result.type == 'ECC'"
- "'public_data' in result"
- "result.public_data.curve is string"
- "result.public_data.x != 0"
- "result.public_data.y != 0"
- "result.public_data.exponent_size == (521 if (ansible_distribution == 'CentOS' and ansible_distribution_major_version == '6') else 256)"
- "'private_data' in result"
- "result.private_data.multiplier > 1024"
- name: Update result list
set_fact:
info_results: "{{ info_results | combine({'key4': result}) }}"
when: select_crypto_backend != 'pyopenssl' or (pyopenssl_version.stdout is version('16.1.0', '>=') and cryptography_version.stdout is version('0.0', '>'))
- name: Check that ECC key info is ok
assert:
that:
- "'public_key' in result"
- "'public_key_fingerprints' in result"
- "'type' in result"
- "result.type.startswith('unknown ')"
- "'public_data' in result"
- "'private_data' in result"
when: select_crypto_backend == 'pyopenssl' and not (pyopenssl_version.stdout is version('16.1.0', '>=') and cryptography_version.stdout is version('0.0', '>'))
- name: ({{select_crypto_backend}}) Get key 5 info
openssl_privatekey_info:
path: '{{ output_dir }}/privatekey_5.pem'
return_private_key_data: yes
select_crypto_backend: '{{ select_crypto_backend }}'
register: result
- name: Check that DSA key info is ok
assert:
that:
- "'public_key' in result"
- "'public_key_fingerprints' in result"
- "'type' in result"
- "result.type == 'DSA'"
- "'public_data' in result"
- "result.public_data.p > 2"
- "result.public_data.q > 2"
- "result.public_data.g >= 2"
- "result.public_data.y > 2"
- "'private_data' in result"
- "result.private_data.x > 2"
- name: Update result list
set_fact:
info_results: "{{ info_results | combine({'key5': result}) }}"

View file

@ -0,0 +1,70 @@
---
- name: Generate privatekey 1
openssl_privatekey:
path: '{{ output_dir }}/privatekey_1.pem'
- name: Generate privatekey 2 (less bits)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_2.pem'
type: RSA
size: 2048
- name: Generate privatekey 3 (with password)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_3.pem'
passphrase: hunter2
cipher: auto
select_crypto_backend: cryptography
- name: Generate privatekey 4 (ECC)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_4.pem'
type: ECC
curve: "{{ (ansible_distribution == 'CentOS' and ansible_distribution_major_version == '6') | ternary('secp521r1', 'secp256k1') }}"
# ^ cryptography on CentOS6 doesn't support secp256k1, so we use secp521r1 instead
select_crypto_backend: cryptography
- name: Generate privatekey 5 (DSA)
openssl_privatekey:
path: '{{ output_dir }}/privatekey_5.pem'
type: DSA
size: 1024
- name: Prepare result list
set_fact:
info_results: {}
- name: Running tests with pyOpenSSL backend
include_tasks: impl.yml
vars:
select_crypto_backend: pyopenssl
when: pyopenssl_version.stdout is version('0.15', '>=')
- name: Prepare result list
set_fact:
pyopenssl_info_results: "{{ info_results }}"
info_results: {}
- name: Running tests with cryptography backend
include_tasks: impl.yml
vars:
select_crypto_backend: cryptography
when: cryptography_version.stdout is version('1.2.3', '>=')
- name: Prepare result list
set_fact:
cryptography_info_results: "{{ info_results }}"
- block:
- name: Dump pyOpenSSL results
debug:
var: pyopenssl_info_results
- name: Dump cryptography results
debug:
var: cryptography_info_results
- name: Compare results
assert:
that:
- pyopenssl_info_results[item] == cryptography_info_results[item]
loop: "{{ pyopenssl_info_results.keys() | intersect(cryptography_info_results.keys()) | list }}"
when: pyopenssl_version.stdout is version('0.15', '>=') and cryptography_version.stdout is version('1.2.3', '>=')