1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

java_keystore: improve error handling and returned results (#2183)

* java_keystore - improve error handling and returned results

* set check_rc=False to return results as documented when module fails
* set LANG, LC_ALL and LC_MESSAGES to C to rely keytool output parsing
* fix pylint's `no-else-return` and `unused-variable` hints
* update related unit tests accordingly
* add a changelog fragment

update unit test (remove stdout_lines from returned dict)

fix unit test: failure is now expected when alias does not exist

* Update changelogs/fragments/2183-java_keystore_improve_error_handling.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

* fix integration test: overwrite keystore at the same location

Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
quidame 2021-04-12 21:23:43 +02:00 committed by GitHub
parent 7356451aa1
commit 89b7e7191f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 96 additions and 81 deletions

View file

@ -0,0 +1,6 @@
---
bugfixes:
- "java_keystore - improve error handling and return ``cmd`` as documented.
Force ``LANG``, ``LC_ALL`` and ``LC_MESSAGES`` environment variables to ``C`` to rely
on ``keytool`` output parsing. Fix pylint's ``unused-variable`` and ``no-else-return``
hints (https://github.com/ansible-collections/community.general/pull/2183)."

View file

@ -131,16 +131,14 @@ def read_certificate_fingerprint(module, openssl_bin, certificate_path):
if rc != 0:
return module.fail_json(msg=current_certificate_fingerprint_out,
err=current_certificate_fingerprint_err,
rc=rc,
cmd=current_certificate_fingerprint_cmd)
cmd=current_certificate_fingerprint_cmd,
rc=rc)
current_certificate_match = re.search(r"=([\w:]+)", current_certificate_fingerprint_out)
if not current_certificate_match:
return module.fail_json(
msg="Unable to find the current certificate fingerprint in %s" % current_certificate_fingerprint_out,
rc=rc,
cmd=current_certificate_fingerprint_err
)
return module.fail_json(msg="Unable to find the current certificate fingerprint in %s" % current_certificate_fingerprint_out,
cmd=current_certificate_fingerprint_cmd,
rc=rc)
return current_certificate_match.group(1)
@ -150,31 +148,36 @@ def read_stored_certificate_fingerprint(module, keytool_bin, alias, keystore_pat
(rc, stored_certificate_fingerprint_out, stored_certificate_fingerprint_err) = run_commands(
module, stored_certificate_fingerprint_cmd, environ_update=dict(STOREPASS=keystore_password))
if rc != 0:
if "keytool error: java.lang.Exception: Alias <%s> does not exist" % alias not in stored_certificate_fingerprint_out:
return module.fail_json(msg=stored_certificate_fingerprint_out,
err=stored_certificate_fingerprint_err,
rc=rc,
cmd=stored_certificate_fingerprint_cmd)
else:
return None
else:
stored_certificate_match = re.search(r"SHA256: ([\w:]+)", stored_certificate_fingerprint_out)
if not stored_certificate_match:
return module.fail_json(
msg="Unable to find the stored certificate fingerprint in %s" % stored_certificate_fingerprint_out,
rc=rc,
cmd=stored_certificate_fingerprint_cmd
)
# First intention was to not fail, and overwrite the keystore instead,
# in case of alias mismatch; but an issue in error handling caused the
# module to fail anyway.
# See: https://github.com/ansible-collections/community.general/issues/1671
# And: https://github.com/ansible-collections/community.general/pull/2183
# if "keytool error: java.lang.Exception: Alias <%s> does not exist" % alias in stored_certificate_fingerprint_out:
# return "alias mismatch"
# if re.match(r'keytool error: java\.io\.IOException: [Kk]eystore( was tampered with, or)? password was incorrect',
# stored_certificate_fingerprint_out):
# return "password mismatch"
return module.fail_json(msg=stored_certificate_fingerprint_out,
err=stored_certificate_fingerprint_err,
cmd=stored_certificate_fingerprint_cmd,
rc=rc)
return stored_certificate_match.group(1)
stored_certificate_match = re.search(r"SHA256: ([\w:]+)", stored_certificate_fingerprint_out)
if not stored_certificate_match:
return module.fail_json(msg="Unable to find the stored certificate fingerprint in %s" % stored_certificate_fingerprint_out,
cmd=stored_certificate_fingerprint_cmd,
rc=rc)
return stored_certificate_match.group(1)
def run_commands(module, cmd, data=None, environ_update=None, check_rc=True):
def run_commands(module, cmd, data=None, environ_update=None, check_rc=False):
return module.run_command(cmd, check_rc=check_rc, data=data, environ_update=environ_update)
def create_path():
tmpfd, tmpfile = tempfile.mkstemp()
dummy, tmpfile = tempfile.mkstemp()
os.remove(tmpfile)
return tmpfile
@ -206,58 +209,57 @@ def cert_changed(module, openssl_bin, keytool_bin, keystore_path, keystore_pass,
def create_jks(module, name, openssl_bin, keytool_bin, keystore_path, password, keypass):
if module.check_mode:
module.exit_json(changed=True)
else:
certificate_path = create_tmp_certificate(module)
private_key_path = create_tmp_private_key(module)
keystore_p12_path = create_path()
try:
if os.path.exists(keystore_path):
os.remove(keystore_path)
return module.exit_json(changed=True)
export_p12_cmd = [openssl_bin, "pkcs12", "-export", "-name", name, "-in", certificate_path,
"-inkey", private_key_path, "-out", keystore_p12_path, "-passout", "stdin"]
certificate_path = create_tmp_certificate(module)
private_key_path = create_tmp_private_key(module)
keystore_p12_path = create_path()
try:
if os.path.exists(keystore_path):
os.remove(keystore_path)
# when keypass is provided, add -passin
cmd_stdin = ""
if keypass:
export_p12_cmd.append("-passin")
export_p12_cmd.append("stdin")
cmd_stdin = "%s\n" % keypass
export_p12_cmd = [openssl_bin, "pkcs12", "-export", "-name", name, "-in", certificate_path,
"-inkey", private_key_path, "-out", keystore_p12_path, "-passout", "stdin"]
cmd_stdin += "%s\n%s" % (password, password)
(rc, export_p12_out, export_p12_err) = run_commands(module, export_p12_cmd, data=cmd_stdin)
if rc != 0:
return module.fail_json(msg=export_p12_out,
rc=rc,
cmd=export_p12_cmd)
# when keypass is provided, add -passin
cmd_stdin = ""
if keypass:
export_p12_cmd.append("-passin")
export_p12_cmd.append("stdin")
cmd_stdin = "%s\n" % keypass
cmd_stdin += "%s\n%s" % (password, password)
import_keystore_cmd = [keytool_bin, "-importkeystore",
"-destkeystore", keystore_path,
"-srckeystore", keystore_p12_path,
"-srcstoretype", "pkcs12",
"-alias", name,
"-deststorepass:env", "STOREPASS",
"-srcstorepass:env", "STOREPASS",
"-noprompt"]
(rc, export_p12_out, dummy) = run_commands(module, export_p12_cmd, data=cmd_stdin)
if rc != 0:
return module.fail_json(msg=export_p12_out,
cmd=export_p12_cmd,
rc=rc)
(rc, import_keystore_out, import_keystore_err) = run_commands(module, import_keystore_cmd, data=None,
environ_update=dict(STOREPASS=password))
if rc == 0:
update_jks_perm(module, keystore_path)
return module.exit_json(changed=True,
msg=import_keystore_out,
rc=rc,
cmd=import_keystore_cmd,
stdout_lines=import_keystore_out)
else:
return module.fail_json(msg=import_keystore_out,
rc=rc,
cmd=import_keystore_cmd)
finally:
os.remove(certificate_path)
os.remove(private_key_path)
os.remove(keystore_p12_path)
import_keystore_cmd = [keytool_bin, "-importkeystore",
"-destkeystore", keystore_path,
"-srckeystore", keystore_p12_path,
"-srcstoretype", "pkcs12",
"-alias", name,
"-deststorepass:env", "STOREPASS",
"-srcstorepass:env", "STOREPASS",
"-noprompt"]
(rc, import_keystore_out, dummy) = run_commands(module, import_keystore_cmd, data=None,
environ_update=dict(STOREPASS=password))
if rc != 0:
return module.fail_json(msg=import_keystore_out,
cmd=import_keystore_cmd,
rc=rc)
update_jks_perm(module, keystore_path)
return module.exit_json(changed=True,
msg=import_keystore_out,
cmd=import_keystore_cmd,
rc=rc)
finally:
os.remove(certificate_path)
os.remove(private_key_path)
os.remove(keystore_p12_path)
def update_jks_perm(module, keystore_path):
@ -289,7 +291,7 @@ def process_jks(module):
else:
if not module.check_mode:
update_jks_perm(module, keystore_path)
return module.exit_json(changed=False)
module.exit_json(changed=False)
else:
create_jks(module, name, openssl_bin, keytool_bin, keystore_path, password, keypass)
@ -317,6 +319,7 @@ def main():
add_file_common_args=spec.add_file_common_args,
supports_check_mode=spec.supports_check_mode
)
module.run_command_environ_update = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C')
process_jks(module)

View file

@ -63,11 +63,11 @@
- name: Create a Java key store for the given certificates (check mode)
community.general.java_keystore: &create_key_store_data
name: example
certificate: "{{lookup('file', output_dir ~ '/' ~ item.name ~ '.pem') }}"
private_key: "{{lookup('file', output_dir ~ '/' ~ (item.keyname | default(item.name)) ~ '.key') }}"
certificate: "{{ lookup('file', output_dir ~ '/' ~ item.name ~ '.pem') }}"
private_key: "{{ lookup('file', output_dir ~ '/' ~ (item.keyname | default(item.name)) ~ '.key') }}"
private_key_passphrase: "{{ item.passphrase | default(omit) }}"
password: changeit
dest: "{{ output_dir ~ '/' ~ item.name ~ '.jks' }}"
dest: "{{ output_dir ~ '/' ~ (item.keyname | default(item.name)) ~ '.jks' }}"
loop: &create_key_store_loop
- name: cert
- name: cert-pw

View file

@ -80,8 +80,7 @@ class TestCreateJavaKeystore(ModuleTestCase):
"-srckeystore", "/tmp/tmpgrzm2ah7", "-srcstoretype", "pkcs12", "-alias", "test",
"-deststorepass:env", "STOREPASS", "-srcstorepass:env", "STOREPASS", "-noprompt"],
msg='',
rc=0,
stdout_lines=''
rc=0
)
def test_create_jks_keypass_fail_export_pkcs12(self):
@ -237,7 +236,7 @@ class TestCertChanged(ModuleTestCase):
result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo')
self.assertTrue(result, 'Fingerprint mismatch')
def test_cert_changed_alias_does_not_exist(self):
def test_cert_changed_fail_alias_does_not_exist(self):
set_module_args(dict(
certificate='cert-foo',
private_key='private-foo',
@ -251,12 +250,19 @@ class TestCertChanged(ModuleTestCase):
supports_check_mode=self.spec.supports_check_mode
)
module.fail_json = Mock()
with patch('os.remove', return_value=True):
self.create_file.side_effect = ['/tmp/placeholder']
self.run_commands.side_effect = [(0, 'foo=abcd:1234:efgh', ''),
(1, 'keytool error: java.lang.Exception: Alias <foo> does not exist', '')]
result = cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo')
self.assertTrue(result, 'Certificate does not exist')
cert_changed(module, "openssl", "keytool", "/path/to/keystore.jks", "changeit", 'foo')
module.fail_json.assert_called_once_with(
cmd=["keytool", "-list", "-alias", "foo", "-keystore", "/path/to/keystore.jks", "-storepass:env", "STOREPASS", "-v"],
msg='keytool error: java.lang.Exception: Alias <foo> does not exist',
err='',
rc=1
)
def test_cert_changed_fail_read_cert(self):
set_module_args(dict(