mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Better vault pass prompt behav on EOF, more unit tests (#27981)
This commit is contained in:
parent
82f550e8cd
commit
220db9cdd2
2 changed files with 81 additions and 3 deletions
|
@ -263,7 +263,7 @@ class PromptVaultSecret(VaultSecret):
|
||||||
try:
|
try:
|
||||||
vault_pass = display.prompt(prompt, private=True)
|
vault_pass = display.prompt(prompt, private=True)
|
||||||
except EOFError:
|
except EOFError:
|
||||||
pass
|
break
|
||||||
b_vault_pass = to_bytes(vault_pass, errors='strict', nonstring='simplerepr').strip()
|
b_vault_pass = to_bytes(vault_pass, errors='strict', nonstring='simplerepr').strip()
|
||||||
b_vault_passwords.append(b_vault_pass)
|
b_vault_passwords.append(b_vault_pass)
|
||||||
|
|
||||||
|
@ -358,10 +358,10 @@ class ScriptVaultSecret(FileVaultSecret):
|
||||||
|
|
||||||
raise AnsibleError(msg)
|
raise AnsibleError(msg)
|
||||||
|
|
||||||
stdout, dummy = p.communicate()
|
stdout, stderr = p.communicate()
|
||||||
|
|
||||||
if p.returncode != 0:
|
if p.returncode != 0:
|
||||||
raise AnsibleError("Vault password script %s returned non-zero (%s): %s" % (filename, p.returncode, p.stderr))
|
raise AnsibleError("Vault password script %s returned non-zero (%s): %s" % (filename, p.returncode, stderr))
|
||||||
|
|
||||||
vault_pass = stdout.strip(b'\r\n')
|
vault_pass = stdout.strip(b'\r\n')
|
||||||
return vault_pass
|
return vault_pass
|
||||||
|
|
|
@ -30,6 +30,7 @@ from binascii import hexlify
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from ansible.compat.tests import unittest
|
from ansible.compat.tests import unittest
|
||||||
|
from ansible.compat.tests.mock import patch, MagicMock
|
||||||
|
|
||||||
from ansible import errors
|
from ansible import errors
|
||||||
from ansible.module_utils import six
|
from ansible.module_utils import six
|
||||||
|
@ -60,6 +61,34 @@ class TestPromptVaultSecret(unittest.TestCase):
|
||||||
secret.load()
|
secret.load()
|
||||||
self.assertIsNone(secret._bytes)
|
self.assertIsNone(secret._bytes)
|
||||||
|
|
||||||
|
@patch('ansible.parsing.vault.display.prompt', return_value='the_password')
|
||||||
|
def test_prompt_formats_none(self, mock_display_prompt):
|
||||||
|
secret = vault.PromptVaultSecret(vault_id='test_id')
|
||||||
|
secret.load()
|
||||||
|
self.assertEqual(secret._bytes, b'the_password')
|
||||||
|
|
||||||
|
@patch('ansible.parsing.vault.display.prompt', return_value='the_password')
|
||||||
|
def test_custom_prompt(self, mock_display_prompt):
|
||||||
|
secret = vault.PromptVaultSecret(vault_id='test_id',
|
||||||
|
prompt_formats=['The cow flies at midnight: '])
|
||||||
|
secret.load()
|
||||||
|
self.assertEqual(secret._bytes, b'the_password')
|
||||||
|
|
||||||
|
@patch('ansible.parsing.vault.display.prompt', side_effect=EOFError)
|
||||||
|
def test_prompt_eoferror(self, mock_display_prompt):
|
||||||
|
secret = vault.PromptVaultSecret(vault_id='test_id')
|
||||||
|
secret.load()
|
||||||
|
self.assertEqual(secret._bytes, None)
|
||||||
|
|
||||||
|
@patch('ansible.parsing.vault.display.prompt', side_effect=['first_password', 'second_password'])
|
||||||
|
def test_prompt_passwords_dont_match(self, mock_display_prompt):
|
||||||
|
secret = vault.PromptVaultSecret(vault_id='test_id',
|
||||||
|
prompt_formats=['Vault password: ',
|
||||||
|
'Confirm Vault password: '])
|
||||||
|
self.assertRaisesRegexp(errors.AnsibleError,
|
||||||
|
'Passwords do not match',
|
||||||
|
secret.load)
|
||||||
|
|
||||||
|
|
||||||
class TestFileVaultSecret(unittest.TestCase):
|
class TestFileVaultSecret(unittest.TestCase):
|
||||||
def test(self):
|
def test(self):
|
||||||
|
@ -128,6 +157,55 @@ class TestScriptVaultSecret(unittest.TestCase):
|
||||||
self.assertIsNone(secret._bytes)
|
self.assertIsNone(secret._bytes)
|
||||||
self.assertIsNone(secret._text)
|
self.assertIsNone(secret._text)
|
||||||
|
|
||||||
|
def _mock_popen(self, mock_popen, return_code=0, stdout=b'', stderr=b''):
|
||||||
|
def communicate():
|
||||||
|
return stdout, stderr
|
||||||
|
mock_popen.return_value = MagicMock(returncode=return_code)
|
||||||
|
mock_popen_instance = mock_popen.return_value
|
||||||
|
mock_popen_instance.communicate = communicate
|
||||||
|
|
||||||
|
@patch('ansible.parsing.vault.subprocess.Popen')
|
||||||
|
def test_read_file(self, mock_popen):
|
||||||
|
self._mock_popen(mock_popen)
|
||||||
|
secret = vault.ScriptVaultSecret()
|
||||||
|
with patch.object(secret, 'loader') as mock_loader:
|
||||||
|
mock_loader.is_executable = MagicMock(return_value=True)
|
||||||
|
secret.load()
|
||||||
|
|
||||||
|
@patch('ansible.parsing.vault.subprocess.Popen')
|
||||||
|
def test_read_file_os_error(self, mock_popen):
|
||||||
|
self._mock_popen(mock_popen)
|
||||||
|
mock_popen.side_effect = OSError('That is not an executable')
|
||||||
|
secret = vault.ScriptVaultSecret()
|
||||||
|
with patch.object(secret, 'loader') as mock_loader:
|
||||||
|
mock_loader.is_executable = MagicMock(return_value=True)
|
||||||
|
self.assertRaisesRegexp(errors.AnsibleError,
|
||||||
|
'Problem running vault password script.*',
|
||||||
|
secret.load)
|
||||||
|
|
||||||
|
@patch('ansible.parsing.vault.subprocess.Popen')
|
||||||
|
def test_read_file_not_executable(self, mock_popen):
|
||||||
|
self._mock_popen(mock_popen)
|
||||||
|
secret = vault.ScriptVaultSecret()
|
||||||
|
with patch.object(secret, 'loader') as mock_loader:
|
||||||
|
mock_loader.is_executable = MagicMock(return_value=False)
|
||||||
|
self.assertRaisesRegexp(vault.AnsibleVaultError,
|
||||||
|
'The vault password script .* was not executable',
|
||||||
|
secret.load)
|
||||||
|
|
||||||
|
@patch('ansible.parsing.vault.subprocess.Popen')
|
||||||
|
def test_read_file_non_zero_return_code(self, mock_popen):
|
||||||
|
stderr = b'That did not work for a random reason'
|
||||||
|
rc = 37
|
||||||
|
|
||||||
|
self._mock_popen(mock_popen, return_code=rc, stderr=stderr)
|
||||||
|
secret = vault.ScriptVaultSecret(filename='/dev/null/some_vault_secret')
|
||||||
|
with patch.object(secret, 'loader') as mock_loader:
|
||||||
|
mock_loader.is_executable = MagicMock(return_value=True)
|
||||||
|
self.assertRaisesRegexp(errors.AnsibleError,
|
||||||
|
'Vault password script.*returned non-zero \(%s\): %s' % (rc, stderr),
|
||||||
|
secret.load)
|
||||||
|
|
||||||
|
|
||||||
class TestGetFileVaultSecret(unittest.TestCase):
|
class TestGetFileVaultSecret(unittest.TestCase):
|
||||||
def test_file(self):
|
def test_file(self):
|
||||||
|
|
Loading…
Reference in a new issue