From 048f15fe683b24a52928b6afebda04a9281fa916 Mon Sep 17 00:00:00 2001 From: "patchback[bot]" <45432694+patchback[bot]@users.noreply.github.com> Date: Fri, 14 May 2021 22:47:26 +0200 Subject: [PATCH] java_keystore: New ssl_backend option for cryptography (#2485) (#2513) * Adding cryptography as a backend for OpenSSL operations * Updating unit tests and adding changelog fragment * Allowing private key password option when using unprotected key * Incorporating suggestions from initial review * Centralizing module exit path (cherry picked from commit a385cbb11dd22953451890c9c2157538977972a8) Co-authored-by: Ajpantuso --- ...85-java_keystore-ssl_backend-parameter.yml | 2 + plugins/modules/system/java_keystore.py | 461 ++++++++++++------ .../targets/java_keystore/tasks/main.yml | 13 + .../targets/java_keystore/tasks/tests.yml | 1 + .../modules/system/test_java_keystore.py | 168 ++++--- 5 files changed, 414 insertions(+), 231 deletions(-) create mode 100644 changelogs/fragments/2485-java_keystore-ssl_backend-parameter.yml diff --git a/changelogs/fragments/2485-java_keystore-ssl_backend-parameter.yml b/changelogs/fragments/2485-java_keystore-ssl_backend-parameter.yml new file mode 100644 index 0000000000..b446476f82 --- /dev/null +++ b/changelogs/fragments/2485-java_keystore-ssl_backend-parameter.yml @@ -0,0 +1,2 @@ +minor_changes: + - java_keystore - added ``ssl_backend`` parameter for using the cryptography library instead of the OpenSSL binary (https://github.com/ansible-collections/community.general/pull/2485). diff --git a/plugins/modules/system/java_keystore.py b/plugins/modules/system/java_keystore.py index ebfe6abdd7..78bcfb6af6 100644 --- a/plugins/modules/system/java_keystore.py +++ b/plugins/modules/system/java_keystore.py @@ -88,9 +88,19 @@ options: description: - Mode the file should be. required: false + ssl_backend: + description: + - Backend for loading private keys and certificates. + type: str + default: openssl + choices: + - openssl + - cryptography + version_added: 3.1.0 requirements: - - openssl in PATH + - openssl in PATH (when I(ssl_backend=openssl)) - keytool in PATH + - cryptography >= 3.0 (when I(ssl_backend=cryptography)) author: - Guillaume Grossetie (@Mogztter) - quidame (@quidame) @@ -164,55 +174,283 @@ import os import re import tempfile -from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.six import PY2 +from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text + +try: + from cryptography.hazmat.primitives.serialization.pkcs12 import serialize_key_and_certificates + from cryptography.hazmat.primitives.serialization import ( + BestAvailableEncryption, + NoEncryption, + load_pem_private_key, + load_der_private_key, + ) + from cryptography.x509 import ( + load_pem_x509_certificate, + load_der_x509_certificate, + ) + from cryptography.hazmat.primitives import hashes + from cryptography.exceptions import UnsupportedAlgorithm + from cryptography.hazmat.backends.openssl import backend + HAS_CRYPTOGRAPHY_PKCS12 = True +except ImportError: + HAS_CRYPTOGRAPHY_PKCS12 = False -def read_certificate_fingerprint(module, openssl_bin, certificate_path): - current_certificate_fingerprint_cmd = [openssl_bin, "x509", "-noout", "-in", certificate_path, "-fingerprint", "-sha256"] - (rc, current_certificate_fingerprint_out, current_certificate_fingerprint_err) = run_commands(module, current_certificate_fingerprint_cmd) - if rc != 0: - return module.fail_json(msg=current_certificate_fingerprint_out, - err=current_certificate_fingerprint_err, - cmd=current_certificate_fingerprint_cmd, - rc=rc) +class JavaKeystore: + def __init__(self, module): + self.module = module - current_certificate_match = re.search(r"=([\w:]+)", current_certificate_fingerprint_out) - if not current_certificate_match: - return module.fail_json(msg="Unable to find the current certificate fingerprint in %s" % current_certificate_fingerprint_out, - cmd=current_certificate_fingerprint_cmd, - rc=rc) + self.keytool_bin = module.get_bin_path('keytool', True) - return current_certificate_match.group(1) - - -def read_stored_certificate_fingerprint(module, keytool_bin, alias, keystore_path, keystore_password): - stored_certificate_fingerprint_cmd = [keytool_bin, "-list", "-alias", alias, "-keystore", keystore_path, "-storepass:env", "STOREPASS", "-v"] - (rc, stored_certificate_fingerprint_out, stored_certificate_fingerprint_err) = run_commands( - module, stored_certificate_fingerprint_cmd, environ_update=dict(STOREPASS=keystore_password)) - if rc != 0: - if "keytool error: java.lang.Exception: Alias <%s> does not exist" % alias in stored_certificate_fingerprint_out: - return "alias mismatch" - if re.match(r'keytool error: java\.io\.IOException: [Kk]eystore( was tampered with, or)? password was incorrect', - stored_certificate_fingerprint_out): - return "password mismatch" - return module.fail_json(msg=stored_certificate_fingerprint_out, - err=stored_certificate_fingerprint_err, - cmd=stored_certificate_fingerprint_cmd, - rc=rc) - - stored_certificate_match = re.search(r"SHA256: ([\w:]+)", stored_certificate_fingerprint_out) - if not stored_certificate_match: - return module.fail_json(msg="Unable to find the stored certificate fingerprint in %s" % stored_certificate_fingerprint_out, - cmd=stored_certificate_fingerprint_cmd, - rc=rc) - - return stored_certificate_match.group(1) - - -def run_commands(module, cmd, data=None, environ_update=None, check_rc=False): - return module.run_command(cmd, check_rc=check_rc, data=data, environ_update=environ_update) + self.certificate = module.params['certificate'] + self.keypass = module.params['private_key_passphrase'] + self.keystore_path = module.params['dest'] + self.name = module.params['name'] + self.password = module.params['password'] + self.private_key = module.params['private_key'] + self.ssl_backend = module.params['ssl_backend'] + + if self.ssl_backend == 'openssl': + self.openssl_bin = module.get_bin_path('openssl', True) + else: + if not HAS_CRYPTOGRAPHY_PKCS12: + self.module.fail_json(msg=missing_required_lib('cryptography >= 3.0')) + + if module.params['certificate_path'] is None: + self.certificate_path = create_file(self.certificate) + self.module.add_cleanup_file(self.certificate_path) + else: + self.certificate_path = module.params['certificate_path'] + + if module.params['private_key_path'] is None: + self.private_key_path = create_file(self.private_key) + self.module.add_cleanup_file(self.private_key_path) + else: + self.private_key_path = module.params['private_key_path'] + + def update_permissions(self): + try: + file_args = self.module.load_file_common_arguments(self.module.params, path=self.keystore_path) + except TypeError: + # The path argument is only supported in Ansible-base 2.10+. Fall back to + # pre-2.10 behavior for older Ansible versions. + self.module.params['path'] = self.keystore_path + file_args = self.module.load_file_common_arguments(self.module.params) + return self.module.set_fs_attributes_if_different(file_args, False) + + def read_certificate_fingerprint(self, cert_format='PEM'): + if self.ssl_backend == 'cryptography': + if cert_format == 'PEM': + cert_loader = load_pem_x509_certificate + else: + cert_loader = load_der_x509_certificate + + try: + with open(self.certificate_path, 'rb') as cert_file: + cert = cert_loader( + cert_file.read(), + backend=backend + ) + except (OSError, ValueError) as e: + self.module.fail_json(msg="Unable to read the provided certificate: %s" % to_native(e)) + + fp = hex_decode(cert.fingerprint(hashes.SHA256())).upper() + fingerprint = ':'.join([fp[i:i + 2] for i in range(0, len(fp), 2)]) + else: + current_certificate_fingerprint_cmd = [ + self.openssl_bin, "x509", "-noout", "-in", self.certificate_path, "-fingerprint", "-sha256" + ] + (rc, current_certificate_fingerprint_out, current_certificate_fingerprint_err) = self.module.run_command( + current_certificate_fingerprint_cmd, + environ_update=None, + check_rc=False + ) + if rc != 0: + return self.module.fail_json( + msg=current_certificate_fingerprint_out, + err=current_certificate_fingerprint_err, + cmd=current_certificate_fingerprint_cmd, + rc=rc + ) + + current_certificate_match = re.search(r"=([\w:]+)", current_certificate_fingerprint_out) + if not current_certificate_match: + return self.module.fail_json( + msg="Unable to find the current certificate fingerprint in %s" % ( + current_certificate_fingerprint_out + ), + cmd=current_certificate_fingerprint_cmd, + rc=rc + ) + + fingerprint = current_certificate_match.group(1) + return fingerprint + + def read_stored_certificate_fingerprint(self): + stored_certificate_fingerprint_cmd = [ + self.keytool_bin, "-list", "-alias", self.name, "-keystore", + self.keystore_path, "-storepass:env", "STOREPASS", "-v" + ] + (rc, stored_certificate_fingerprint_out, stored_certificate_fingerprint_err) = self.module.run_command( + stored_certificate_fingerprint_cmd, environ_update=dict(STOREPASS=self.password), check_rc=False) + if rc != 0: + if "keytool error: java.lang.Exception: Alias <%s> does not exist" % self.name \ + in stored_certificate_fingerprint_out: + return "alias mismatch" + if re.match( + r'keytool error: java\.io\.IOException: ' + + '[Kk]eystore( was tampered with, or)? password was incorrect', + stored_certificate_fingerprint_out + ): + return "password mismatch" + return self.module.fail_json( + msg=stored_certificate_fingerprint_out, + err=stored_certificate_fingerprint_err, + cmd=stored_certificate_fingerprint_cmd, + rc=rc + ) + + stored_certificate_match = re.search(r"SHA256: ([\w:]+)", stored_certificate_fingerprint_out) + if not stored_certificate_match: + return self.module.fail_json( + msg="Unable to find the stored certificate fingerprint in %s" % stored_certificate_fingerprint_out, + cmd=stored_certificate_fingerprint_cmd, + rc=rc + ) + + return stored_certificate_match.group(1) + + def cert_changed(self): + current_certificate_fingerprint = self.read_certificate_fingerprint() + stored_certificate_fingerprint = self.read_stored_certificate_fingerprint() + return current_certificate_fingerprint != stored_certificate_fingerprint + + def cryptography_create_pkcs12_bundle(self, keystore_p12_path, key_format='PEM', cert_format='PEM'): + if key_format == 'PEM': + key_loader = load_pem_private_key + else: + key_loader = load_der_private_key + + if cert_format == 'PEM': + cert_loader = load_pem_x509_certificate + else: + cert_loader = load_der_x509_certificate + + try: + with open(self.private_key_path, 'rb') as key_file: + private_key = key_loader( + key_file.read(), + password=to_bytes(self.keypass), + backend=backend + ) + except TypeError: + # Re-attempt with no password to match existing behavior + try: + with open(self.private_key_path, 'rb') as key_file: + private_key = key_loader( + key_file.read(), + password=None, + backend=backend + ) + except (OSError, TypeError, ValueError, UnsupportedAlgorithm) as e: + self.module.fail_json( + msg="The following error occurred while loading the provided private_key: %s" % to_native(e) + ) + except (OSError, ValueError, UnsupportedAlgorithm) as e: + self.module.fail_json( + msg="The following error occurred while loading the provided private_key: %s" % to_native(e) + ) + try: + with open(self.certificate_path, 'rb') as cert_file: + cert = cert_loader( + cert_file.read(), + backend=backend + ) + except (OSError, ValueError, UnsupportedAlgorithm) as e: + self.module.fail_json( + msg="The following error occurred while loading the provided certificate: %s" % to_native(e) + ) + + if self.password: + encryption = BestAvailableEncryption(to_bytes(self.password)) + else: + encryption = NoEncryption() + + pkcs12_bundle = serialize_key_and_certificates( + name=to_bytes(self.name), + key=private_key, + cert=cert, + cas=None, + encryption_algorithm=encryption + ) + + with open(keystore_p12_path, 'wb') as p12_file: + p12_file.write(pkcs12_bundle) + + def openssl_create_pkcs12_bundle(self, keystore_p12_path): + export_p12_cmd = [self.openssl_bin, "pkcs12", "-export", "-name", self.name, "-in", self.certificate_path, + "-inkey", self.private_key_path, "-out", keystore_p12_path, "-passout", "stdin"] + + # when keypass is provided, add -passin + cmd_stdin = "" + if self.keypass: + export_p12_cmd.append("-passin") + export_p12_cmd.append("stdin") + cmd_stdin = "%s\n" % self.keypass + cmd_stdin += "%s\n%s" % (self.password, self.password) + + (rc, export_p12_out, dummy) = self.module.run_command( + export_p12_cmd, data=cmd_stdin, environ_update=None, check_rc=False + ) + + if rc != 0: + self.module.fail_json(msg=export_p12_out, cmd=export_p12_cmd, rc=rc) + + def create(self): + if self.module.check_mode: + return {'changed': True} + + if os.path.exists(self.keystore_path): + os.remove(self.keystore_path) + + keystore_p12_path = create_path() + self.module.add_cleanup_file(keystore_p12_path) + + if self.ssl_backend == 'cryptography': + self.cryptography_create_pkcs12_bundle(keystore_p12_path) + else: + self.openssl_create_pkcs12_bundle(keystore_p12_path) + + import_keystore_cmd = [self.keytool_bin, "-importkeystore", + "-destkeystore", self.keystore_path, + "-srckeystore", keystore_p12_path, + "-srcstoretype", "pkcs12", + "-alias", self.name, + "-deststorepass:env", "STOREPASS", + "-srcstorepass:env", "STOREPASS", + "-noprompt"] + + (rc, import_keystore_out, dummy) = self.module.run_command( + import_keystore_cmd, data=None, environ_update=dict(STOREPASS=self.password), check_rc=False + ) + if rc != 0: + return self.module.fail_json(msg=import_keystore_out, cmd=import_keystore_cmd, rc=rc) + + self.update_permissions() + return { + 'changed': True, + 'msg': import_keystore_out, + 'cmd': import_keystore_cmd, + 'rc': rc + } + + def exists(self): + return os.path.exists(self.keystore_path) +# Utility functions def create_path(): dummy, tmpfile = tempfile.mkstemp() os.remove(tmpfile) @@ -226,123 +464,11 @@ def create_file(content): return tmpfile -def create_tmp_certificate(module): - return create_file(module.params['certificate']) - - -def create_tmp_private_key(module): - return create_file(module.params['private_key']) - - -def cert_changed(module, openssl_bin, keytool_bin, keystore_path, keystore_pass, alias): - certificate_path = module.params['certificate_path'] - if certificate_path is None: - certificate_path = create_tmp_certificate(module) - try: - current_certificate_fingerprint = read_certificate_fingerprint(module, openssl_bin, certificate_path) - stored_certificate_fingerprint = read_stored_certificate_fingerprint(module, keytool_bin, alias, keystore_path, keystore_pass) - return current_certificate_fingerprint != stored_certificate_fingerprint - finally: - if module.params['certificate_path'] is None: - os.remove(certificate_path) - - -def create_jks(module, name, openssl_bin, keytool_bin, keystore_path, password, keypass): - if module.check_mode: - return module.exit_json(changed=True) - - certificate_path = module.params['certificate_path'] - if certificate_path is None: - certificate_path = create_tmp_certificate(module) - - private_key_path = module.params['private_key_path'] - if private_key_path is None: - private_key_path = create_tmp_private_key(module) - - keystore_p12_path = create_path() - - try: - if os.path.exists(keystore_path): - os.remove(keystore_path) - - export_p12_cmd = [openssl_bin, "pkcs12", "-export", "-name", name, "-in", certificate_path, - "-inkey", private_key_path, "-out", keystore_p12_path, "-passout", "stdin"] - - # when keypass is provided, add -passin - cmd_stdin = "" - if keypass: - export_p12_cmd.append("-passin") - export_p12_cmd.append("stdin") - cmd_stdin = "%s\n" % keypass - cmd_stdin += "%s\n%s" % (password, password) - - (rc, export_p12_out, dummy) = run_commands(module, export_p12_cmd, data=cmd_stdin) - if rc != 0: - return module.fail_json(msg=export_p12_out, - cmd=export_p12_cmd, - rc=rc) - - import_keystore_cmd = [keytool_bin, "-importkeystore", - "-destkeystore", keystore_path, - "-srckeystore", keystore_p12_path, - "-srcstoretype", "pkcs12", - "-alias", name, - "-deststorepass:env", "STOREPASS", - "-srcstorepass:env", "STOREPASS", - "-noprompt"] - - (rc, import_keystore_out, dummy) = run_commands(module, import_keystore_cmd, data=None, - environ_update=dict(STOREPASS=password)) - if rc != 0: - return module.fail_json(msg=import_keystore_out, - cmd=import_keystore_cmd, - rc=rc) - - update_jks_perm(module, keystore_path) - return module.exit_json(changed=True, - msg=import_keystore_out, - cmd=import_keystore_cmd, - rc=rc) - finally: - if module.params['certificate_path'] is None: - os.remove(certificate_path) - if module.params['private_key_path'] is None: - os.remove(private_key_path) - os.remove(keystore_p12_path) - - -def update_jks_perm(module, keystore_path): - try: - file_args = module.load_file_common_arguments(module.params, path=keystore_path) - except TypeError: - # The path argument is only supported in Ansible-base 2.10+. Fall back to - # pre-2.10 behavior for older Ansible versions. - module.params['path'] = keystore_path - file_args = module.load_file_common_arguments(module.params) - module.set_fs_attributes_if_different(file_args, False) - - -def process_jks(module): - name = module.params['name'] - password = module.params['password'] - keypass = module.params['private_key_passphrase'] - keystore_path = module.params['dest'] - force = module.params['force'] - openssl_bin = module.get_bin_path('openssl', True) - keytool_bin = module.get_bin_path('keytool', True) - - if os.path.exists(keystore_path): - if force: - create_jks(module, name, openssl_bin, keytool_bin, keystore_path, password, keypass) - else: - if cert_changed(module, openssl_bin, keytool_bin, keystore_path, password, name): - create_jks(module, name, openssl_bin, keytool_bin, keystore_path, password, keypass) - else: - if not module.check_mode: - update_jks_perm(module, keystore_path) - module.exit_json(changed=False) +def hex_decode(s): + if PY2: + return s.decode('hex') else: - create_jks(module, name, openssl_bin, keytool_bin, keystore_path, password, keypass) + return s.hex() class ArgumentSpec(object): @@ -358,6 +484,7 @@ class ArgumentSpec(object): private_key_path=dict(type='path', no_log=False), private_key_passphrase=dict(type='str', no_log=True), password=dict(type='str', required=True, no_log=True), + ssl_backend=dict(type='str', default='openssl', choices=['openssl', 'cryptography']), force=dict(type='bool', default=False), ) choose_between = ( @@ -379,7 +506,19 @@ def main(): add_file_common_args=spec.add_file_common_args, ) module.run_command_environ_update = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C') - process_jks(module) + + result = dict() + jks = JavaKeystore(module) + + if jks.exists(): + if module.params['force'] or jks.cert_changed(): + result = jks.create() + else: + result['changed'] = jks.update_permissions() + else: + result = jks.create() + + module.exit_json(**result) if __name__ == '__main__': diff --git a/tests/integration/targets/java_keystore/tasks/main.yml b/tests/integration/targets/java_keystore/tasks/main.yml index 358222aea8..b5f1f01624 100644 --- a/tests/integration/targets/java_keystore/tasks/main.yml +++ b/tests/integration/targets/java_keystore/tasks/main.yml @@ -9,12 +9,22 @@ - name: Include tasks to create ssl materials on the controller include_tasks: prepare.yml +- set_fact: + ssl_backends: ['openssl'] + +- set_fact: + ssl_backends: "{{ ssl_backends + ['cryptography'] }}" + when: cryptography_version.stdout is version('3.0', '>=') + - when: has_java_keytool block: - name: Include tasks to play with 'certificate' and 'private_key' contents include_tasks: tests.yml vars: remote_cert: false + loop: "{{ ssl_backends }}" + loop_control: + loop_var: ssl_backend - name: Include tasks to create ssl materials on the remote host include_tasks: prepare.yml @@ -23,3 +33,6 @@ include_tasks: tests.yml vars: remote_cert: true + loop: "{{ ssl_backends }}" + loop_control: + loop_var: ssl_backend diff --git a/tests/integration/targets/java_keystore/tasks/tests.yml b/tests/integration/targets/java_keystore/tasks/tests.yml index e0de1c6836..b892dd1d29 100644 --- a/tests/integration/targets/java_keystore/tasks/tests.yml +++ b/tests/integration/targets/java_keystore/tasks/tests.yml @@ -23,6 +23,7 @@ private_key_path: "{{ omit if not remote_cert else output_dir ~ '/' ~ (item.keyname | d(item.name)) ~ '.key' }}" private_key_passphrase: "{{ item.passphrase | d(omit) }}" password: changeit + ssl_backend: "{{ ssl_backend }}" loop: "{{ java_keystore_certs }}" check_mode: yes register: result_check diff --git a/tests/unit/plugins/modules/system/test_java_keystore.py b/tests/unit/plugins/modules/system/test_java_keystore.py index ec14b3734d..5e99074c95 100644 --- a/tests/unit/plugins/modules/system/test_java_keystore.py +++ b/tests/unit/plugins/modules/system/test_java_keystore.py @@ -14,7 +14,7 @@ from ansible_collections.community.general.tests.unit.plugins.modules.utils impo from ansible_collections.community.general.tests.unit.compat.mock import patch from ansible_collections.community.general.tests.unit.compat.mock import Mock from ansible.module_utils.basic import AnsibleModule -from ansible_collections.community.general.plugins.modules.system.java_keystore import create_jks, cert_changed, ArgumentSpec +from ansible_collections.community.general.plugins.modules.system.java_keystore import JavaKeystore, ArgumentSpec class TestCreateJavaKeystore(ModuleTestCase): @@ -28,14 +28,16 @@ class TestCreateJavaKeystore(ModuleTestCase): self.spec = ArgumentSpec() self.mock_create_file = patch('ansible_collections.community.general.plugins.modules.system.java_keystore.create_file') self.mock_create_path = patch('ansible_collections.community.general.plugins.modules.system.java_keystore.create_path') - self.mock_run_commands = patch('ansible_collections.community.general.plugins.modules.system.java_keystore.run_commands') + self.mock_run_command = patch('ansible.module_utils.basic.AnsibleModule.run_command') + self.mock_get_bin_path = patch('ansible.module_utils.basic.AnsibleModule.get_bin_path') self.mock_os_path_exists = patch('os.path.exists', side_effect=lambda path: True if path == '/path/to/keystore.jks' else orig_exists(path)) self.mock_selinux_context = patch('ansible.module_utils.basic.AnsibleModule.selinux_context', side_effect=lambda path: ['unconfined_u', 'object_r', 'user_home_t', 's0']) self.mock_is_special_selinux_path = patch('ansible.module_utils.basic.AnsibleModule.is_special_selinux_path', side_effect=lambda path: (False, None)) - self.run_commands = self.mock_run_commands.start() + self.run_command = self.mock_run_command.start() + self.get_bin_path = self.mock_get_bin_path.start() self.create_file = self.mock_create_file.start() self.create_path = self.mock_create_path.start() self.selinux_context = self.mock_selinux_context.start() @@ -47,7 +49,8 @@ class TestCreateJavaKeystore(ModuleTestCase): super(TestCreateJavaKeystore, self).tearDown() self.mock_create_file.stop() self.mock_create_path.stop() - self.mock_run_commands.stop() + self.mock_run_command.stop() + self.mock_get_bin_path.stop() self.mock_selinux_context.stop() self.mock_is_special_selinux_path.stop() self.mock_os_path_exists.stop() @@ -57,7 +60,38 @@ class TestCreateJavaKeystore(ModuleTestCase): certificate='cert-foo', private_key='private-foo', dest='/path/to/keystore.jks', - name='foo', + name='test', + password='changeit' + )) + + module = AnsibleModule( + argument_spec=self.spec.argument_spec, + supports_check_mode=self.spec.supports_check_mode + ) + + with patch('os.remove', return_value=True): + self.create_path.side_effect = ['/tmp/tmpgrzm2ah7'] + self.create_file.side_effect = ['/tmp/etacifitrec', '/tmp/yek_etavirp', ''] + self.run_command.side_effect = [(0, '', ''), (0, '', '')] + self.get_bin_path.side_effect = ['keytool', 'openssl', ''] + jks = JavaKeystore(module) + assert jks.create() == { + 'changed': True, + 'cmd': ["keytool", "-importkeystore", + "-destkeystore", "/path/to/keystore.jks", + "-srckeystore", "/tmp/tmpgrzm2ah7", "-srcstoretype", "pkcs12", "-alias", "test", + "-deststorepass:env", "STOREPASS", "-srcstorepass:env", "STOREPASS", "-noprompt"], + 'msg': '', + 'rc': 0 + } + + def test_create_jks_keypass_fail_export_pkcs12(self): + set_module_args(dict( + certificate='cert-foo', + private_key='private-foo', + private_key_passphrase='passphrase-foo', + dest='/path/to/keystore.jks', + name='test', password='changeit' )) @@ -67,44 +101,15 @@ class TestCreateJavaKeystore(ModuleTestCase): ) module.exit_json = Mock() - - with patch('os.remove', return_value=True): - self.create_path.side_effect = ['/tmp/tmpgrzm2ah7'] - self.create_file.side_effect = ['/tmp/etacifitrec', '/tmp/yek_etavirp'] - self.run_commands.side_effect = [(0, '', ''), (0, '', '')] - create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit", "") - module.exit_json.assert_called_once_with( - changed=True, - cmd=["keytool", "-importkeystore", - "-destkeystore", "/path/to/keystore.jks", - "-srckeystore", "/tmp/tmpgrzm2ah7", "-srcstoretype", "pkcs12", "-alias", "test", - "-deststorepass:env", "STOREPASS", "-srcstorepass:env", "STOREPASS", "-noprompt"], - msg='', - rc=0 - ) - - def test_create_jks_keypass_fail_export_pkcs12(self): - set_module_args(dict( - certificate='cert-foo', - private_key='private-foo', - private_key_passphrase='passphrase-foo', - dest='/path/to/keystore.jks', - name='foo', - password='changeit' - )) - - module = AnsibleModule( - argument_spec=self.spec.argument_spec, - supports_check_mode=self.spec.supports_check_mode - ) - module.fail_json = Mock() with patch('os.remove', return_value=True): self.create_path.side_effect = ['/tmp/tmp1cyp12xa'] - self.create_file.side_effect = ['/tmp/tmpvalcrt32', '/tmp/tmpwh4key0c'] - self.run_commands.side_effect = [(1, '', ''), (0, '', '')] - create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit", "passphrase-foo") + self.create_file.side_effect = ['/tmp/tmpvalcrt32', '/tmp/tmpwh4key0c', ''] + self.run_command.side_effect = [(1, '', ''), (0, '', '')] + self.get_bin_path.side_effect = ['keytool', 'openssl', ''] + jks = JavaKeystore(module) + jks.create() module.fail_json.assert_called_once_with( cmd=["openssl", "pkcs12", "-export", "-name", "test", "-in", "/tmp/tmpvalcrt32", @@ -121,7 +126,7 @@ class TestCreateJavaKeystore(ModuleTestCase): certificate='cert-foo', private_key='private-foo', dest='/path/to/keystore.jks', - name='foo', + name='test', password='changeit' )) @@ -130,13 +135,16 @@ class TestCreateJavaKeystore(ModuleTestCase): supports_check_mode=self.spec.supports_check_mode ) + module.exit_json = Mock() module.fail_json = Mock() with patch('os.remove', return_value=True): self.create_path.side_effect = ['/tmp/tmp1cyp12xa'] - self.create_file.side_effect = ['/tmp/tmpvalcrt32', '/tmp/tmpwh4key0c'] - self.run_commands.side_effect = [(1, '', ''), (0, '', '')] - create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit", "") + self.create_file.side_effect = ['/tmp/tmpvalcrt32', '/tmp/tmpwh4key0c', ''] + self.run_command.side_effect = [(1, '', ''), (0, '', '')] + self.get_bin_path.side_effect = ['keytool', 'openssl', ''] + jks = JavaKeystore(module) + jks.create() module.fail_json.assert_called_once_with( cmd=["openssl", "pkcs12", "-export", "-name", "test", "-in", "/tmp/tmpvalcrt32", @@ -152,7 +160,7 @@ class TestCreateJavaKeystore(ModuleTestCase): certificate='cert-foo', private_key='private-foo', dest='/path/to/keystore.jks', - name='foo', + name='test', password='changeit' )) @@ -161,13 +169,16 @@ class TestCreateJavaKeystore(ModuleTestCase): supports_check_mode=self.spec.supports_check_mode ) + module.exit_json = Mock() module.fail_json = Mock() with patch('os.remove', return_value=True): self.create_path.side_effect = ['/tmp/tmpgrzm2ah7'] - self.create_file.side_effect = ['/tmp/etacifitrec', '/tmp/yek_etavirp'] - self.run_commands.side_effect = [(0, '', ''), (1, '', '')] - create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit", "") + self.create_file.side_effect = ['/tmp/etacifitrec', '/tmp/yek_etavirp', ''] + self.run_command.side_effect = [(0, '', ''), (1, '', '')] + self.get_bin_path.side_effect = ['keytool', 'openssl', ''] + jks = JavaKeystore(module) + jks.create() module.fail_json.assert_called_once_with( cmd=["keytool", "-importkeystore", "-destkeystore", "/path/to/keystore.jks", @@ -186,15 +197,18 @@ class TestCertChanged(ModuleTestCase): super(TestCertChanged, self).setUp() self.spec = ArgumentSpec() self.mock_create_file = patch('ansible_collections.community.general.plugins.modules.system.java_keystore.create_file') - self.mock_run_commands = patch('ansible_collections.community.general.plugins.modules.system.java_keystore.run_commands') - self.run_commands = self.mock_run_commands.start() + self.mock_run_command = patch('ansible.module_utils.basic.AnsibleModule.run_command') + self.mock_get_bin_path = patch('ansible.module_utils.basic.AnsibleModule.get_bin_path') + self.run_command = self.mock_run_command.start() self.create_file = self.mock_create_file.start() + self.get_bin_path = self.mock_get_bin_path.start() def tearDown(self): """Teardown.""" super(TestCertChanged, self).tearDown() self.mock_create_file.stop() - self.mock_run_commands.stop() + self.mock_run_command.stop() + self.mock_get_bin_path.stop() def test_cert_unchanged_same_fingerprint(self): set_module_args(dict( @@ -211,9 +225,11 @@ class TestCertChanged(ModuleTestCase): ) with patch('os.remove', return_value=True): - self.create_file.side_effect = ['/tmp/placeholder'] - self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''), (0, 'SHA256: abcd:1234:efgh', '')] - result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') + self.create_file.side_effect = ['/tmp/placeholder', ''] + self.run_command.side_effect = [(0, 'foo=abcd:1234:efgh', ''), (0, 'SHA256: abcd:1234:efgh', '')] + self.get_bin_path.side_effect = ['keytool', 'openssl', ''] + jks = JavaKeystore(module) + result = jks.cert_changed() self.assertFalse(result, 'Fingerprint is identical') def test_cert_changed_fingerprint_mismatch(self): @@ -231,9 +247,11 @@ class TestCertChanged(ModuleTestCase): ) with patch('os.remove', return_value=True): - self.create_file.side_effect = ['/tmp/placeholder'] - self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''), (0, 'SHA256: wxyz:9876:stuv', '')] - result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') + self.create_file.side_effect = ['/tmp/placeholder', ''] + self.run_command.side_effect = [(0, 'foo=abcd:1234:efgh', ''), (0, 'SHA256: wxyz:9876:stuv', '')] + self.get_bin_path.side_effect = ['keytool', 'openssl', ''] + jks = JavaKeystore(module) + result = jks.cert_changed() self.assertTrue(result, 'Fingerprint mismatch') def test_cert_changed_fail_alias_does_not_exist(self): @@ -251,10 +269,12 @@ class TestCertChanged(ModuleTestCase): ) with patch('os.remove', return_value=True): - self.create_file.side_effect = ['/tmp/placeholder'] - self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''), - (1, 'keytool error: java.lang.Exception: Alias does not exist', '')] - result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') + self.create_file.side_effect = ['/tmp/placeholder', ''] + self.run_command.side_effect = [(0, 'foo=abcd:1234:efgh', ''), + (1, 'keytool error: java.lang.Exception: Alias does not exist', '')] + self.get_bin_path.side_effect = ['keytool', 'openssl', ''] + jks = JavaKeystore(module) + result = jks.cert_changed() self.assertTrue(result, 'Alias mismatch detected') def test_cert_changed_password_mismatch(self): @@ -272,10 +292,12 @@ class TestCertChanged(ModuleTestCase): ) with patch('os.remove', return_value=True): - self.create_file.side_effect = ['/tmp/placeholder'] - self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''), - (1, 'keytool error: java.io.IOException: Keystore password was incorrect', '')] - result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') + self.create_file.side_effect = ['/tmp/placeholder', ''] + self.run_command.side_effect = [(0, 'foo=abcd:1234:efgh', ''), + (1, 'keytool error: java.io.IOException: Keystore password was incorrect', '')] + self.get_bin_path.side_effect = ['keytool', 'openssl', ''] + jks = JavaKeystore(module) + result = jks.cert_changed() self.assertTrue(result, 'Password mismatch detected') def test_cert_changed_fail_read_cert(self): @@ -292,12 +314,15 @@ class TestCertChanged(ModuleTestCase): supports_check_mode=self.spec.supports_check_mode ) + module.exit_json = Mock() module.fail_json = Mock() with patch('os.remove', return_value=True): - self.create_file.side_effect = ['/tmp/tmpdj6bvvme'] - self.run_commands.side_effect = [(1, '', 'Oops'), (0, 'SHA256: wxyz:9876:stuv', '')] - cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') + self.create_file.side_effect = ['/tmp/tmpdj6bvvme', ''] + self.run_command.side_effect = [(1, '', 'Oops'), (0, 'SHA256: wxyz:9876:stuv', '')] + self.get_bin_path.side_effect = ['keytool', 'openssl', ''] + jks = JavaKeystore(module) + jks.cert_changed() module.fail_json.assert_called_once_with( cmd=["openssl", "x509", "-noout", "-in", "/tmp/tmpdj6bvvme", "-fingerprint", "-sha256"], msg='', @@ -319,12 +344,15 @@ class TestCertChanged(ModuleTestCase): supports_check_mode=self.spec.supports_check_mode ) + module.exit_json = Mock() module.fail_json = Mock(return_value=True) with patch('os.remove', return_value=True): - self.create_file.side_effect = ['/tmp/placeholder'] - self.run_commands.side_effect = [(0, 'foo: wxyz:9876:stuv', ''), (1, '', 'Oops')] - cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo') + self.create_file.side_effect = ['/tmp/placeholder', ''] + self.run_command.side_effect = [(0, 'foo: wxyz:9876:stuv', ''), (1, '', 'Oops')] + self.get_bin_path.side_effect = ['keytool', 'openssl', ''] + jks = JavaKeystore(module) + jks.cert_changed() module.fail_json.assert_called_with( cmd=["keytool", "-list", "-alias", "foo", "-keystore", "/path/to/keystore.jks", "-storepass:env", "STOREPASS", "-v"], msg='',