mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
openssl_* modules: private key errors (#54088)
* Improve error handling, in particular with respect to private key loading problems. * Add tests to validate that modules regenerate invalid input and don't crash. * Don't crash when input is invalid. * Create 'better' broken input. * Fix paths. * Simplifying pyOpenSSL error handling.
This commit is contained in:
parent
627c5e7f50
commit
90c067e947
21 changed files with 327 additions and 228 deletions
|
@ -111,13 +111,14 @@ def load_privatekey(path, passphrase=None, check_passphrase=True, backend='pyope
|
|||
priv_key_detail,
|
||||
to_bytes(passphrase or ''))
|
||||
except crypto.Error as e:
|
||||
if len(e.args) > 0 and len(e.args[0]) > 0 and e.args[0][0][2] == 'bad decrypt':
|
||||
# This happens in case we have the wrong passphrase.
|
||||
if passphrase is not None:
|
||||
raise OpenSSLBadPassphraseError('Wrong passphrase provided for private key!')
|
||||
else:
|
||||
raise OpenSSLBadPassphraseError('No passphrase provided, but private key is password-protected!')
|
||||
raise
|
||||
if len(e.args) > 0 and len(e.args[0]) > 0:
|
||||
if e.args[0][0][2] in ('bad decrypt', 'bad password read'):
|
||||
# This happens in case we have the wrong passphrase.
|
||||
if passphrase is not None:
|
||||
raise OpenSSLBadPassphraseError('Wrong passphrase provided for private key!')
|
||||
else:
|
||||
raise OpenSSLBadPassphraseError('No passphrase provided, but private key is password-protected!')
|
||||
raise OpenSSLObjectError('Error while deserializing key: {0}'.format(e))
|
||||
if check_passphrase:
|
||||
# Next we want to make sure that the key is actually protected by
|
||||
# a passphrase (in case we did try the empty string before, make
|
||||
|
@ -131,10 +132,11 @@ def load_privatekey(path, passphrase=None, check_passphrase=True, backend='pyope
|
|||
# key isn't password-protected
|
||||
raise OpenSSLBadPassphraseError('Passphrase provided, but private key is not password-protected!')
|
||||
except crypto.Error as e:
|
||||
if passphrase is None and len(e.args) > 0 and len(e.args[0]) > 0 and e.args[0][0][2] == 'bad decrypt':
|
||||
# The key is obviously protected by the empty string.
|
||||
# Don't do this at home (if it's possible at all)...
|
||||
raise OpenSSLBadPassphraseError('No passphrase provided, but private key is password-protected!')
|
||||
if passphrase is None and len(e.args) > 0 and len(e.args[0]) > 0:
|
||||
if e.args[0][0][2] in ('bad decrypt', 'bad password read'):
|
||||
# The key is obviously protected by the empty string.
|
||||
# Don't do this at home (if it's possible at all)...
|
||||
raise OpenSSLBadPassphraseError('No passphrase provided, but private key is password-protected!')
|
||||
elif backend == 'cryptography':
|
||||
try:
|
||||
result = load_pem_private_key(priv_key_detail,
|
||||
|
|
|
@ -671,7 +671,10 @@ class Certificate(crypto_utils.OpenSSLObject):
|
|||
if not state_and_perms:
|
||||
return False
|
||||
|
||||
self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
|
||||
try:
|
||||
self.cert = crypto_utils.load_certificate(self.path, backend=self.backend)
|
||||
except Exception as dummy:
|
||||
return False
|
||||
|
||||
if self.privatekey_path:
|
||||
try:
|
||||
|
@ -1691,103 +1694,97 @@ def main():
|
|||
add_file_common_args=True,
|
||||
)
|
||||
|
||||
if module.params['state'] == 'absent':
|
||||
certificate = CertificateAbsent(module)
|
||||
try:
|
||||
if module.params['state'] == 'absent':
|
||||
certificate = CertificateAbsent(module)
|
||||
|
||||
else:
|
||||
if module.params['provider'] != 'assertonly' and module.params['csr_path'] is None:
|
||||
module.fail_json(msg='csr_path is required when provider is not assertonly')
|
||||
else:
|
||||
if module.params['provider'] != 'assertonly' and module.params['csr_path'] is None:
|
||||
module.fail_json(msg='csr_path is required when provider is not assertonly')
|
||||
|
||||
base_dir = os.path.dirname(module.params['path']) or '.'
|
||||
if not os.path.isdir(base_dir):
|
||||
module.fail_json(
|
||||
name=base_dir,
|
||||
msg='The directory %s does not exist or the file is not a directory' % base_dir
|
||||
)
|
||||
base_dir = os.path.dirname(module.params['path']) or '.'
|
||||
if not os.path.isdir(base_dir):
|
||||
module.fail_json(
|
||||
name=base_dir,
|
||||
msg='The directory %s does not exist or the file is not a directory' % base_dir
|
||||
)
|
||||
|
||||
provider = module.params['provider']
|
||||
provider = module.params['provider']
|
||||
|
||||
backend = module.params['select_crypto_backend']
|
||||
if backend == 'auto':
|
||||
# Detect what backend we can use
|
||||
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
|
||||
|
||||
# If cryptography is available we'll use it
|
||||
if can_use_cryptography:
|
||||
backend = 'cryptography'
|
||||
elif can_use_pyopenssl:
|
||||
backend = 'pyopenssl'
|
||||
|
||||
if module.params['selfsigned_version'] == 2 or module.params['ownca_version'] == 2:
|
||||
module.warn('crypto backend forced to pyopenssl. The cryptography library does not support v2 certificates')
|
||||
backend = 'pyopenssl'
|
||||
|
||||
# Fail if no backend has been found
|
||||
backend = module.params['select_crypto_backend']
|
||||
if backend == 'auto':
|
||||
module.fail_json(msg=("Can't detect none of the required Python libraries "
|
||||
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION,
|
||||
MINIMAL_PYOPENSSL_VERSION))
|
||||
# Detect what backend we can use
|
||||
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
|
||||
|
||||
if backend == 'pyopenssl':
|
||||
if not PYOPENSSL_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR)
|
||||
if module.params['provider'] in ['selfsigned', 'ownca', 'assertonly']:
|
||||
try:
|
||||
getattr(crypto.X509Req, 'get_extensions')
|
||||
except AttributeError:
|
||||
module.fail_json(msg='You need to have PyOpenSSL>=0.15')
|
||||
# If cryptography is available we'll use it
|
||||
if can_use_cryptography:
|
||||
backend = 'cryptography'
|
||||
elif can_use_pyopenssl:
|
||||
backend = 'pyopenssl'
|
||||
|
||||
if provider == 'selfsigned':
|
||||
certificate = SelfSignedCertificate(module)
|
||||
elif provider == 'acme':
|
||||
certificate = AcmeCertificate(module, 'pyopenssl')
|
||||
elif provider == 'ownca':
|
||||
certificate = OwnCACertificate(module)
|
||||
else:
|
||||
certificate = AssertOnlyCertificate(module)
|
||||
elif backend == 'cryptography':
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('cryptography'), exception=CRYPTOGRAPHY_IMP_ERR)
|
||||
if module.params['selfsigned_version'] == 2 or module.params['ownca_version'] == 2:
|
||||
module.fail_json(msg='The cryptography backend does not support v2 certificates, '
|
||||
'use select_crypto_backend=pyopenssl for v2 certificates')
|
||||
if provider == 'selfsigned':
|
||||
certificate = SelfSignedCertificateCryptography(module)
|
||||
elif provider == 'acme':
|
||||
certificate = AcmeCertificate(module, 'cryptography')
|
||||
elif provider == 'ownca':
|
||||
certificate = OwnCACertificateCryptography(module)
|
||||
else:
|
||||
certificate = AssertOnlyCertificateCryptography(module)
|
||||
if module.params['selfsigned_version'] == 2 or module.params['ownca_version'] == 2:
|
||||
module.warn('crypto backend forced to pyopenssl. The cryptography library does not support v2 certificates')
|
||||
backend = 'pyopenssl'
|
||||
|
||||
if module.params['state'] == 'present':
|
||||
# Fail if no backend has been found
|
||||
if backend == 'auto':
|
||||
module.fail_json(msg=("Can't detect none of the required Python libraries "
|
||||
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION,
|
||||
MINIMAL_PYOPENSSL_VERSION))
|
||||
|
||||
if module.check_mode:
|
||||
result = certificate.dump(check_mode=True)
|
||||
result['changed'] = module.params['force'] or not certificate.check(module)
|
||||
module.exit_json(**result)
|
||||
if backend == 'pyopenssl':
|
||||
if not PYOPENSSL_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR)
|
||||
if module.params['provider'] in ['selfsigned', 'ownca', 'assertonly']:
|
||||
try:
|
||||
getattr(crypto.X509Req, 'get_extensions')
|
||||
except AttributeError:
|
||||
module.fail_json(msg='You need to have PyOpenSSL>=0.15')
|
||||
|
||||
if provider == 'selfsigned':
|
||||
certificate = SelfSignedCertificate(module)
|
||||
elif provider == 'acme':
|
||||
certificate = AcmeCertificate(module, 'pyopenssl')
|
||||
elif provider == 'ownca':
|
||||
certificate = OwnCACertificate(module)
|
||||
else:
|
||||
certificate = AssertOnlyCertificate(module)
|
||||
elif backend == 'cryptography':
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('cryptography'), exception=CRYPTOGRAPHY_IMP_ERR)
|
||||
if module.params['selfsigned_version'] == 2 or module.params['ownca_version'] == 2:
|
||||
module.fail_json(msg='The cryptography backend does not support v2 certificates, '
|
||||
'use select_crypto_backend=pyopenssl for v2 certificates')
|
||||
if provider == 'selfsigned':
|
||||
certificate = SelfSignedCertificateCryptography(module)
|
||||
elif provider == 'acme':
|
||||
certificate = AcmeCertificate(module, 'cryptography')
|
||||
elif provider == 'ownca':
|
||||
certificate = OwnCACertificateCryptography(module)
|
||||
else:
|
||||
certificate = AssertOnlyCertificateCryptography(module)
|
||||
|
||||
if module.params['state'] == 'present':
|
||||
if module.check_mode:
|
||||
result = certificate.dump(check_mode=True)
|
||||
result['changed'] = module.params['force'] or not certificate.check(module)
|
||||
module.exit_json(**result)
|
||||
|
||||
try:
|
||||
certificate.generate(module)
|
||||
except CertificateError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
else:
|
||||
else:
|
||||
if module.check_mode:
|
||||
result = certificate.dump(check_mode=True)
|
||||
result['changed'] = os.path.exists(module.params['path'])
|
||||
module.exit_json(**result)
|
||||
|
||||
if module.check_mode:
|
||||
result = certificate.dump(check_mode=True)
|
||||
result['changed'] = os.path.exists(module.params['path'])
|
||||
module.exit_json(**result)
|
||||
|
||||
try:
|
||||
certificate.remove(module)
|
||||
except CertificateError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
result = certificate.dump()
|
||||
|
||||
module.exit_json(**result)
|
||||
result = certificate.dump()
|
||||
module.exit_json(**result)
|
||||
except crypto_utils.OpenSSLObjectError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -604,7 +604,10 @@ class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase):
|
|||
except crypto.Error:
|
||||
return False
|
||||
|
||||
csr = crypto_utils.load_certificate_request(self.path)
|
||||
try:
|
||||
csr = crypto_utils.load_certificate_request(self.path)
|
||||
except Exception as dummy:
|
||||
return False
|
||||
|
||||
return _check_subject(csr) and _check_extensions(csr) and _check_signature(csr)
|
||||
|
||||
|
@ -854,46 +857,40 @@ def main():
|
|||
'cryptography (>= {0}) and pyOpenSSL (>= {1})').format(
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION,
|
||||
MINIMAL_PYOPENSSL_VERSION))
|
||||
if backend == 'pyopenssl':
|
||||
if not PYOPENSSL_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR)
|
||||
try:
|
||||
getattr(crypto.X509Req, 'get_extensions')
|
||||
except AttributeError:
|
||||
module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs')
|
||||
csr = CertificateSigningRequestPyOpenSSL(module)
|
||||
elif backend == 'cryptography':
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('cryptography'), exception=CRYPTOGRAPHY_IMP_ERR)
|
||||
csr = CertificateSigningRequestCryptography(module)
|
||||
try:
|
||||
if backend == 'pyopenssl':
|
||||
if not PYOPENSSL_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR)
|
||||
try:
|
||||
getattr(crypto.X509Req, 'get_extensions')
|
||||
except AttributeError:
|
||||
module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs')
|
||||
csr = CertificateSigningRequestPyOpenSSL(module)
|
||||
elif backend == 'cryptography':
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('cryptography'), exception=CRYPTOGRAPHY_IMP_ERR)
|
||||
csr = CertificateSigningRequestCryptography(module)
|
||||
|
||||
if module.params['state'] == 'present':
|
||||
if module.params['state'] == 'present':
|
||||
if module.check_mode:
|
||||
result = csr.dump()
|
||||
result['changed'] = module.params['force'] or not csr.check(module)
|
||||
module.exit_json(**result)
|
||||
|
||||
if module.check_mode:
|
||||
result = csr.dump()
|
||||
result['changed'] = module.params['force'] or not csr.check(module)
|
||||
module.exit_json(**result)
|
||||
|
||||
try:
|
||||
csr.generate(module)
|
||||
except (CertificateSigningRequestError, crypto_utils.OpenSSLObjectError) as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
else:
|
||||
else:
|
||||
if module.check_mode:
|
||||
result = csr.dump()
|
||||
result['changed'] = os.path.exists(module.params['path'])
|
||||
module.exit_json(**result)
|
||||
|
||||
if module.check_mode:
|
||||
result = csr.dump()
|
||||
result['changed'] = os.path.exists(module.params['path'])
|
||||
module.exit_json(**result)
|
||||
|
||||
try:
|
||||
csr.remove(module)
|
||||
except (CertificateSigningRequestError, crypto_utils.OpenSSLObjectError) as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
result = csr.dump()
|
||||
|
||||
module.exit_json(**result)
|
||||
result = csr.dump()
|
||||
module.exit_json(**result)
|
||||
except crypto_utils.OpenSSLObjectError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -324,16 +324,16 @@ def main():
|
|||
msg="The directory '%s' does not exist or the path is not a directory" % base_dir
|
||||
)
|
||||
|
||||
pkcs12 = Pkcs(module)
|
||||
changed = False
|
||||
try:
|
||||
pkcs12 = Pkcs(module)
|
||||
changed = False
|
||||
|
||||
if module.params['state'] == 'present':
|
||||
if module.check_mode:
|
||||
result = pkcs12.dump()
|
||||
result['changed'] = module.params['force'] or not pkcs12.check(module)
|
||||
module.exit_json(**result)
|
||||
if module.params['state'] == 'present':
|
||||
if module.check_mode:
|
||||
result = pkcs12.dump()
|
||||
result['changed'] = module.params['force'] or not pkcs12.check(module)
|
||||
module.exit_json(**result)
|
||||
|
||||
try:
|
||||
if not pkcs12.check(module, perms_required=False) or module.params['force']:
|
||||
if module.params['action'] == 'export':
|
||||
if not module.params['friendly_name']:
|
||||
|
@ -346,29 +346,25 @@ def main():
|
|||
file_args = module.load_file_common_arguments(module.params)
|
||||
if module.set_fs_attributes_if_different(file_args, changed):
|
||||
changed = True
|
||||
else:
|
||||
if module.check_mode:
|
||||
result = pkcs12.dump()
|
||||
result['changed'] = os.path.exists(module.params['path'])
|
||||
module.exit_json(**result)
|
||||
|
||||
except PkcsError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
else:
|
||||
if module.check_mode:
|
||||
result = pkcs12.dump()
|
||||
result['changed'] = os.path.exists(module.params['path'])
|
||||
module.exit_json(**result)
|
||||
|
||||
if os.path.exists(module.params['path']):
|
||||
try:
|
||||
if os.path.exists(module.params['path']):
|
||||
pkcs12.remove(module)
|
||||
changed = True
|
||||
except PkcsError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
result = pkcs12.dump()
|
||||
result['changed'] = changed
|
||||
if os.path.exists(module.params['path']):
|
||||
file_mode = "%04o" % stat.S_IMODE(os.stat(module.params['path']).st_mode)
|
||||
result['mode'] = file_mode
|
||||
result = pkcs12.dump()
|
||||
result['changed'] = changed
|
||||
if os.path.exists(module.params['path']):
|
||||
file_mode = "%04o" % stat.S_IMODE(os.stat(module.params['path']).st_mode)
|
||||
result['mode'] = file_mode
|
||||
|
||||
module.exit_json(**result)
|
||||
module.exit_json(**result)
|
||||
except crypto_utils.OpenSSLObjectError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -373,9 +373,7 @@ class PrivateKeyPyOpenSSL(PrivateKeyBase):
|
|||
try:
|
||||
crypto_utils.load_privatekey(self.path, self.passphrase)
|
||||
return True
|
||||
except crypto.Error:
|
||||
return False
|
||||
except crypto_utils.OpenSSLBadPassphraseError as exc:
|
||||
except Exception as dummy:
|
||||
return False
|
||||
|
||||
def _check_size_and_type(self):
|
||||
|
@ -535,12 +533,8 @@ class PrivateKeyCryptography(PrivateKeyBase):
|
|||
backend=self.cryptography_backend
|
||||
)
|
||||
return True
|
||||
except TypeError as e:
|
||||
if 'Password' in str(e) and 'encrypted' in str(e):
|
||||
return False
|
||||
raise PrivateKeyError(e)
|
||||
except Exception as e:
|
||||
raise PrivateKeyError(e)
|
||||
except Exception as dummy:
|
||||
return False
|
||||
|
||||
def _check_size_and_type(self):
|
||||
privatekey = self._load_privatekey()
|
||||
|
@ -639,41 +633,35 @@ def main():
|
|||
'cryptography (>= {0}) and pyOpenSSL (>= {1})').format(
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION,
|
||||
MINIMAL_PYOPENSSL_VERSION))
|
||||
if backend == 'pyopenssl':
|
||||
if not PYOPENSSL_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR)
|
||||
private_key = PrivateKeyPyOpenSSL(module)
|
||||
elif backend == 'cryptography':
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('cryptography'), exception=CRYPTOGRAPHY_IMP_ERR)
|
||||
private_key = PrivateKeyCryptography(module)
|
||||
try:
|
||||
if backend == 'pyopenssl':
|
||||
if not PYOPENSSL_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('pyOpenSSL'), exception=PYOPENSSL_IMP_ERR)
|
||||
private_key = PrivateKeyPyOpenSSL(module)
|
||||
elif backend == 'cryptography':
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('cryptography'), exception=CRYPTOGRAPHY_IMP_ERR)
|
||||
private_key = PrivateKeyCryptography(module)
|
||||
|
||||
if private_key.state == 'present':
|
||||
if private_key.state == 'present':
|
||||
if module.check_mode:
|
||||
result = private_key.dump()
|
||||
result['changed'] = module.params['force'] or not private_key.check(module)
|
||||
module.exit_json(**result)
|
||||
|
||||
if module.check_mode:
|
||||
result = private_key.dump()
|
||||
result['changed'] = module.params['force'] or not private_key.check(module)
|
||||
module.exit_json(**result)
|
||||
|
||||
try:
|
||||
private_key.generate(module)
|
||||
except PrivateKeyError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
else:
|
||||
else:
|
||||
if module.check_mode:
|
||||
result = private_key.dump()
|
||||
result['changed'] = os.path.exists(module.params['path'])
|
||||
module.exit_json(**result)
|
||||
|
||||
if module.check_mode:
|
||||
result = private_key.dump()
|
||||
result['changed'] = os.path.exists(module.params['path'])
|
||||
module.exit_json(**result)
|
||||
|
||||
try:
|
||||
private_key.remove(module)
|
||||
except PrivateKeyError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
result = private_key.dump()
|
||||
|
||||
module.exit_json(**result)
|
||||
result = private_key.dump()
|
||||
module.exit_json(**result)
|
||||
except crypto_utils.OpenSSLObjectError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -235,7 +235,7 @@ class PublicKey(crypto_utils.OpenSSLObject):
|
|||
crypto.FILETYPE_ASN1,
|
||||
crypto.load_publickey(crypto.FILETYPE_PEM, publickey_content)
|
||||
)
|
||||
except (crypto.Error, ValueError):
|
||||
except Exception as dummy:
|
||||
return False
|
||||
|
||||
try:
|
||||
|
@ -293,34 +293,28 @@ def main():
|
|||
msg="The directory '%s' does not exist or the file is not a directory" % base_dir
|
||||
)
|
||||
|
||||
public_key = PublicKey(module)
|
||||
try:
|
||||
public_key = PublicKey(module)
|
||||
|
||||
if public_key.state == 'present':
|
||||
if public_key.state == 'present':
|
||||
if module.check_mode:
|
||||
result = public_key.dump()
|
||||
result['changed'] = module.params['force'] or not public_key.check(module)
|
||||
module.exit_json(**result)
|
||||
|
||||
if module.check_mode:
|
||||
result = public_key.dump()
|
||||
result['changed'] = module.params['force'] or not public_key.check(module)
|
||||
module.exit_json(**result)
|
||||
|
||||
try:
|
||||
public_key.generate(module)
|
||||
except PublicKeyError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
else:
|
||||
else:
|
||||
if module.check_mode:
|
||||
result = public_key.dump()
|
||||
result['changed'] = os.path.exists(module.params['path'])
|
||||
module.exit_json(**result)
|
||||
|
||||
if module.check_mode:
|
||||
result = public_key.dump()
|
||||
result['changed'] = os.path.exists(module.params['path'])
|
||||
module.exit_json(**result)
|
||||
|
||||
try:
|
||||
public_key.remove(module)
|
||||
except PublicKeyError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
result = public_key.dump()
|
||||
|
||||
module.exit_json(**result)
|
||||
result = public_key.dump()
|
||||
module.exit_json(**result)
|
||||
except crypto_utils.OpenSSLObjectError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -101,6 +101,6 @@
|
|||
- passphrase_error_1 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_1.msg"
|
||||
- passphrase_error_2 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_2.msg"
|
||||
- "'assphrase' in passphrase_error_2.msg or 'assword' in passphrase_error_2.msg or 'serializ' in passphrase_error_2.msg"
|
||||
- passphrase_error_3 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_3.msg"
|
||||
- "'assphrase' in passphrase_error_3.msg or 'assword' in passphrase_error_3.msg or 'serializ' in passphrase_error_3.msg"
|
||||
|
|
|
@ -202,4 +202,19 @@
|
|||
ignore_errors: yes
|
||||
register: passphrase_error_3
|
||||
|
||||
- name: Create broken certificate
|
||||
copy:
|
||||
dest: "{{ output_dir }}/ownca_broken.pem"
|
||||
content: "broken"
|
||||
- name: Regenerate broken cert
|
||||
openssl_certificate:
|
||||
path: '{{ output_dir }}/ownca_broken.pem'
|
||||
csr_path: '{{ output_dir }}/csr_ecc.csr'
|
||||
privatekey_path: '{{ output_dir }}/privatekey_ecc.pem'
|
||||
ownca_path: '{{ output_dir }}/ca_cert.pem'
|
||||
ownca_privatekey_path: '{{ output_dir }}/ca_privatekey.pem'
|
||||
provider: ownca
|
||||
ownca_digest: sha256
|
||||
register: ownca_broken
|
||||
|
||||
- import_tasks: ../tests/validate_ownca.yml
|
||||
|
|
|
@ -211,4 +211,17 @@
|
|||
ignore_errors: yes
|
||||
register: passphrase_error_3
|
||||
|
||||
- name: Create broken certificate
|
||||
copy:
|
||||
dest: "{{ output_dir }}/cert_broken.pem"
|
||||
content: "broken"
|
||||
- name: Regenerate broken cert
|
||||
openssl_certificate:
|
||||
path: '{{ output_dir }}/cert_broken.pem'
|
||||
csr_path: '{{ output_dir }}/csr_ecc.csr'
|
||||
privatekey_path: '{{ output_dir }}/privatekey_ecc.pem'
|
||||
provider: selfsigned
|
||||
selfsigned_digest: sha256
|
||||
register: selfsigned_broken
|
||||
|
||||
- import_tasks: ../tests/validate_selfsigned.yml
|
||||
|
|
|
@ -98,6 +98,11 @@
|
|||
- passphrase_error_1 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_1.msg"
|
||||
- passphrase_error_2 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_2.msg"
|
||||
- "'assphrase' in passphrase_error_2.msg or 'assword' in passphrase_error_2.msg or 'serializ' in passphrase_error_2.msg"
|
||||
- passphrase_error_3 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_3.msg"
|
||||
- "'assphrase' in passphrase_error_3.msg or 'assword' in passphrase_error_3.msg or 'serializ' in passphrase_error_3.msg"
|
||||
|
||||
- name: Verify that broken certificate will be regenerated
|
||||
assert:
|
||||
that:
|
||||
- ownca_broken is changed
|
||||
|
|
|
@ -99,6 +99,11 @@
|
|||
- passphrase_error_1 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_1.msg"
|
||||
- passphrase_error_2 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_2.msg"
|
||||
- "'assphrase' in passphrase_error_2.msg or 'assword' in passphrase_error_2.msg or 'serializ' in passphrase_error_2.msg"
|
||||
- passphrase_error_3 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_3.msg"
|
||||
- "'assphrase' in passphrase_error_3.msg or 'assword' in passphrase_error_3.msg or 'serializ' in passphrase_error_3.msg"
|
||||
|
||||
- name: Verify that broken certificate will be regenerated
|
||||
assert:
|
||||
that:
|
||||
- selfsigned_broken is changed
|
||||
|
|
|
@ -274,3 +274,17 @@
|
|||
select_crypto_backend: '{{ select_crypto_backend }}'
|
||||
ignore_errors: yes
|
||||
register: passphrase_error_3
|
||||
|
||||
- name: Create broken CSR
|
||||
copy:
|
||||
dest: "{{ output_dir }}/csrbroken.csr"
|
||||
content: "broken"
|
||||
- name: Regenerate broken CSR
|
||||
openssl_csr:
|
||||
path: '{{ output_dir }}/csrbroken.csr'
|
||||
privatekey_path: '{{ output_dir }}/privatekey2.pem'
|
||||
subject:
|
||||
commonName: This is for Ansible
|
||||
useCommonNameForSAN: no
|
||||
select_crypto_backend: '{{ select_crypto_backend }}'
|
||||
register: output_broken
|
||||
|
|
|
@ -116,6 +116,11 @@
|
|||
- passphrase_error_1 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_1.msg"
|
||||
- passphrase_error_2 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_2.msg"
|
||||
- "'assphrase' in passphrase_error_2.msg or 'assword' in passphrase_error_2.msg or 'serializ' in passphrase_error_2.msg"
|
||||
- passphrase_error_3 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_3.msg"
|
||||
- "'assphrase' in passphrase_error_3.msg or 'assword' in passphrase_error_3.msg or 'serializ' in passphrase_error_3.msg"
|
||||
|
||||
- name: Verify that broken CSR will be regenerated
|
||||
assert:
|
||||
that:
|
||||
- output_broken is changed
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
---
|
||||
- block:
|
||||
# This module generates unsafe parameters for testing purposes
|
||||
# otherwise tests would be too slow
|
||||
|
@ -41,4 +42,15 @@
|
|||
force: yes
|
||||
register: dhparam_changed_force
|
||||
|
||||
- name: Create broken params
|
||||
copy:
|
||||
dest: "{{ output_dir }}/dhbroken.pem"
|
||||
content: "broken"
|
||||
- name: Regenerate broken params
|
||||
openssl_dhparam:
|
||||
path: '{{ output_dir }}/dhbroken.pem'
|
||||
size: 512
|
||||
force: yes
|
||||
register: output_broken
|
||||
|
||||
- import_tasks: ../tests/validate.yml
|
||||
|
|
|
@ -30,3 +30,8 @@
|
|||
- dhparam_changed_512 is not changed
|
||||
- dhparam_changed_to_512 is changed
|
||||
- dhparam_changed_force is changed
|
||||
|
||||
- name: Verify that broken params will be regenerated
|
||||
assert:
|
||||
that:
|
||||
- output_broken is changed
|
||||
|
|
|
@ -100,6 +100,21 @@
|
|||
state: present
|
||||
register: p12_no_pkey
|
||||
|
||||
- name: 'Create broken PKCS#12'
|
||||
copy:
|
||||
dest: "{{ output_dir }}/broken.p12"
|
||||
content: "broken"
|
||||
- name: 'Regenerate broken PKCS#12'
|
||||
openssl_pkcs12:
|
||||
path: "{{ output_dir }}/broken.p12"
|
||||
friendly_name: 'abracadabra'
|
||||
privatekey_path: "{{ output_dir }}/ansible_pkey.pem"
|
||||
certificate_path: "{{ output_dir }}/ansible.crt"
|
||||
state: present
|
||||
force: True
|
||||
mode: 0644
|
||||
register: output_broken
|
||||
|
||||
- import_tasks: ../tests/validate.yml
|
||||
|
||||
always:
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
---
|
||||
- name: 'Install pexpect'
|
||||
pip:
|
||||
name: 'pexpect'
|
||||
|
@ -27,6 +28,11 @@
|
|||
- passphrase_error_1 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_1.msg"
|
||||
- passphrase_error_2 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_2.msg"
|
||||
- "'assphrase' in passphrase_error_2.msg or 'assword' in passphrase_error_2.msg or 'serializ' in passphrase_error_2.msg"
|
||||
- passphrase_error_3 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_3.msg"
|
||||
- "'assphrase' in passphrase_error_3.msg or 'assword' in passphrase_error_3.msg or 'serializ' in passphrase_error_3.msg"
|
||||
|
||||
- name: "Verify that broken PKCS#12 will be regenerated"
|
||||
assert:
|
||||
that:
|
||||
- output_broken is changed
|
||||
|
|
|
@ -184,6 +184,16 @@
|
|||
backup: yes
|
||||
register: passphrase_5
|
||||
|
||||
- name: Create broken key
|
||||
copy:
|
||||
dest: "{{ output_dir }}/broken"
|
||||
content: "broken"
|
||||
- name: Regenerate broken key
|
||||
openssl_privatekey:
|
||||
path: '{{ output_dir }}/broken.pem'
|
||||
select_crypto_backend: '{{ select_crypto_backend }}'
|
||||
register: output_broken
|
||||
|
||||
- name: Remove module
|
||||
openssl_privatekey:
|
||||
path: '{{ output_dir }}/privatekeypw.pem'
|
||||
|
|
|
@ -119,6 +119,11 @@
|
|||
- passphrase_4.backup_file is undefined
|
||||
- passphrase_5.backup_file is string
|
||||
|
||||
- name: Verify that broken key will be regenerated
|
||||
assert:
|
||||
that:
|
||||
- output_broken is changed
|
||||
|
||||
- name: Validate remove
|
||||
assert:
|
||||
that:
|
||||
|
|
|
@ -108,6 +108,16 @@
|
|||
ignore_errors: yes
|
||||
register: passphrase_error_3
|
||||
|
||||
- name: Create broken key
|
||||
copy:
|
||||
dest: "{{ output_dir }}/publickeybroken.pub"
|
||||
content: "broken"
|
||||
- name: Regenerate broken key
|
||||
openssl_publickey:
|
||||
path: '{{ output_dir }}/publickeybroken.pub'
|
||||
privatekey_path: '{{ output_dir }}/privatekey5.pem'
|
||||
register: output_broken
|
||||
|
||||
- import_tasks: ../tests/validate.yml
|
||||
|
||||
when: pyopenssl_version.stdout is version('16.0.0', '>=')
|
||||
|
|
|
@ -103,6 +103,11 @@
|
|||
- passphrase_error_1 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_1.msg"
|
||||
- passphrase_error_2 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_2.msg"
|
||||
- "'assphrase' in passphrase_error_2.msg or 'assword' in passphrase_error_2.msg or 'serializ' in passphrase_error_2.msg"
|
||||
- passphrase_error_3 is failed
|
||||
- "'assphrase' in passphrase_error_1.msg or 'assword' in passphrase_error_3.msg"
|
||||
- "'assphrase' in passphrase_error_3.msg or 'assword' in passphrase_error_3.msg or 'serializ' in passphrase_error_3.msg"
|
||||
|
||||
- name: Verify that broken key will be regenerated
|
||||
assert:
|
||||
that:
|
||||
- output_broken is changed
|
||||
|
|
Loading…
Reference in a new issue