diff --git a/lib/ansible/constants.py b/lib/ansible/constants.py index 606fb3c194..82eae8127a 100644 --- a/lib/ansible/constants.py +++ b/lib/ansible/constants.py @@ -25,9 +25,9 @@ from string import ascii_letters, digits from ansible.compat.six import string_types from ansible.compat.six.moves import configparser - -from ansible.parsing.quoting import unquote from ansible.errors import AnsibleOptionsError +from ansible.module_utils._text import to_text +from ansible.parsing.quoting import unquote from ansible.utils.path import makedirs_safe BOOL_TRUE = frozenset([ "true", "t", "y", "1", "yes", "on" ]) @@ -303,8 +303,6 @@ GALAXY_IGNORE_CERTS = get_config(p, 'galaxy', 'ignore_certs', 'ANSIBL # this can be configured to blacklist SCMS but cannot add new ones unless the code is also updated GALAXY_SCMS = get_config(p, 'galaxy', 'scms', 'ANSIBLE_GALAXY_SCMS', 'git, hg', islist=True) -# characters included in auto-generated passwords -DEFAULT_PASSWORD_CHARS = ascii_letters + digits + ".,:-_" STRING_TYPE_FILTERS = get_config(p, 'jinja2', 'dont_type_filters', 'ANSIBLE_STRING_TYPE_FILTERS', ['string', 'to_json', 'to_nice_json', 'to_yaml', 'ppretty', 'json'], islist=True ) # colors @@ -329,6 +327,7 @@ DIFF_CONTEXT = get_config(p, 'diff', 'context', 'ANSIBLE_DIFF_CONTEXT', 3, integ MODULE_REQUIRE_ARGS = ['command', 'win_command', 'shell', 'win_shell', 'raw', 'script'] MODULE_NO_JSON = ['command', 'win_command', 'shell', 'win_shell', 'raw'] DEFAULT_BECOME_PASS = None +DEFAULT_PASSWORD_CHARS = to_text(ascii_letters + digits + ".,:-_", errors='strict') # characters included in auto-generated passwords DEFAULT_SUDO_PASS = None DEFAULT_REMOTE_PASS = None DEFAULT_SUBSET = None diff --git a/lib/ansible/plugins/lookup/password.py b/lib/ansible/plugins/lookup/password.py index 0776f73c24..530fd2ea90 100644 --- a/lib/ansible/plugins/lookup/password.py +++ b/lib/ansible/plugins/lookup/password.py @@ -23,23 +23,26 @@ import os import string import random -from string import ascii_letters, digits - from ansible import constants as C +from ansible.compat.six import text_type from ansible.errors import AnsibleError -from ansible.plugins.lookup import LookupBase +from ansible.module_utils._text import to_bytes, to_native, to_text from ansible.parsing.splitter import parse_kv +from ansible.plugins.lookup import LookupBase from ansible.utils.encrypt import do_encrypt from ansible.utils.path import makedirs_safe + DEFAULT_LENGTH = 20 VALID_PARAMS = frozenset(('length', 'encrypt', 'chars')) def _parse_parameters(term): - # Hacky parsing of params - # See https://github.com/ansible/ansible-modules-core/issues/1968#issuecomment-136842156 - # and the first_found lookup For how we want to fix this later + """Hacky parsing of params + + See https://github.com/ansible/ansible-modules-core/issues/1968#issuecomment-136842156 + and the first_found lookup For how we want to fix this later + """ first_split = term.split(' ', 1) if len(first_split) <= 1: # Only a single argument given, therefore it's a path @@ -50,7 +53,7 @@ def _parse_parameters(term): params = parse_kv(first_split[1]) if '_raw_params' in params: # Spaces in the path? - relpath = ' '.join((relpath, params['_raw_params'])) + relpath = u' '.join((relpath, params['_raw_params'])) del params['_raw_params'] # Check that we parsed the params correctly @@ -73,91 +76,182 @@ def _parse_parameters(term): params['chars'] = params.get('chars', None) if params['chars']: tmp_chars = [] - if ',,' in params['chars']: + if u',,' in params['chars']: tmp_chars.append(u',') - tmp_chars.extend(c for c in params['chars'].replace(',,', ',').split(',') if c) + tmp_chars.extend(c for c in params['chars'].replace(u',,', u',').split(u',') if c) params['chars'] = tmp_chars else: # Default chars for password - params['chars'] = ['ascii_letters', 'digits', ".,:-_"] + params['chars'] = [u'ascii_letters', u'digits', u".,:-_"] return relpath, params +def _read_password_file(b_path): + """Read the contents of a password file and return it + :arg b_path: A byte string containing the path to the password file + :returns: a text string containing the contents of the password file or + None if no password file was present. + """ + content = None + + if os.path.exists(b_path): + with open(b_path, 'rb') as f: + b_content = f.read().rstrip() + content = to_text(b_content, errors='surrogate_or_strict') + + return content + + +def _gen_candidate_chars(characters): + '''Generate a string containing all valid chars as defined by ``characters`` + + :arg characters: A list of character specs. The character specs are + shorthand names for sets of characters like 'digits', 'ascii_letters', + or 'punctuation' or a string to be included verbatim. + + The values of each char spec can be: + + * a name of an attribute in the 'strings' module ('digits' for example). + The value of the attribute will be added to the candidate chars. + * a string of characters. If the string isn't an attribute in 'string' + module, the string will be directly added to the candidate chars. + + For example:: + + characters=['digits', '?|']`` + + will match ``string.digits`` and add all ascii digits. ``'?|'`` will add + the question mark and pipe characters directly. Return will be the string:: + + u'0123456789?|' + ''' + chars = [] + for chars_spec in characters: + # getattr from string expands things like "ascii_letters" and "digits" + # into a set of characters. + chars.append(to_text(getattr(string, to_native(chars_spec), chars_spec), + errors='strict')) + chars = u''.join(chars).replace(u'"', u'').replace(u"'", u'') + return chars + + +def _random_password(length=DEFAULT_LENGTH, chars=C.DEFAULT_PASSWORD_CHARS): + '''Return a random password string of length containing only chars + + :kwarg length: The number of characters in the new password. Defaults to 20. + :kwarg chars: The characters to choose from. The default is all ascii + letters, ascii digits, and these symbols ``.,:-_`` + + .. note: this was moved from the old ansible utils code, as nothing + else appeared to use it. + ''' + assert isinstance(chars, text_type), '%s (%s) is not a text_type' % (chars, type(chars)) + + random_generator = random.SystemRandom() + + password = [] + while len(password) < length: + new_char = random_generator.choice(chars) + password.append(new_char) + + return u''.join(password) + + +def _random_salt(): + """Return a text string suitable for use as a salt for the hash functions we use to encrypt passwords. + """ + # Note passlib salt values must be pure ascii so we can't let the user + # configure this + salt_chars = _gen_candidate_chars(['ascii_letters', 'digits', './']) + return _random_password(length=8, chars=salt_chars) + + +def _parse_content(content): + '''parse our password data format into password and salt + + :arg content: The data read from the file + :returns: password and salt + ''' + password = content + salt = None + + salt_slug = u' salt=' + try: + sep = content.rindex(salt_slug) + except ValueError: + # No salt + pass + else: + salt = password[sep + len(salt_slug):] + password = content[:sep] + + return password, salt + + +def _format_content(password, salt, encrypt=True): + """Format the password and salt for saving + :arg password: the plaintext password to save + :arg salt: the salt to use when encrypting a password + :arg encrypt: Whether the user requests that this password is encrypted. + Note that the password is saved in clear. Encrypt just tells us if we + must save the salt value for idempotence. Defaults to True. + :returns: a text string containing the formatted information + + .. warning:: Passwords are saved in clear. This is because the playbooks + expect to get cleartext passwords from this lookup. + """ + if not encrypt and not salt: + return password + + # At this point, the calling code should have assured us that there is a salt value. + assert salt, '_format_content was called with encryption requested but no salt value' + + return u'%s salt=%s' % (password, salt) + + +def _write_password_file(b_path, content): + b_pathdir = os.path.dirname(b_path) + makedirs_safe(b_pathdir, mode=0o700) + + with open(b_path, 'wb') as f: + os.chmod(b_path, 0o600) + b_content = to_bytes(content, errors='surrogate_or_strict') + b'\n' + f.write(b_content) + + class LookupModule(LookupBase): - - def random_password(self, length=DEFAULT_LENGTH, chars=C.DEFAULT_PASSWORD_CHARS): - ''' - Return a random password string of length containing only chars. - NOTE: this was moved from the old ansible utils code, as nothing - else appeared to use it. - ''' - - password = [] - while len(password) < length: - new_char = os.urandom(1) - if new_char in chars: - password.append(new_char) - - return ''.join(password) - - def random_salt(self): - salt_chars = ascii_letters + digits + './' - return self.random_password(length=8, chars=salt_chars) - def run(self, terms, variables, **kwargs): - ret = [] for term in terms: relpath, params = _parse_parameters(term) - - # get password or create it if file doesn't exist path = self._loader.path_dwim(relpath) - if not os.path.exists(path): - pathdir = os.path.dirname(path) - try: - makedirs_safe(pathdir, mode=0o700) - except OSError as e: - raise AnsibleError("cannot create the path for the password lookup: %s (error was %s)" % (pathdir, str(e))) + b_path = to_bytes(path, errors='surrogate_or_strict') + chars = _gen_candidate_chars(params['chars']) - chars = "".join(getattr(string, c, c) for c in params['chars']).replace('"', '').replace("'", '') - password = ''.join(random.choice(chars) for _ in range(params['length'])) - - if params['encrypt'] is not None: - salt = self.random_salt() - content = '%s salt=%s' % (password, salt) - else: - content = password - with open(path, 'w') as f: - os.chmod(path, 0o600) - f.write(content + '\n') - else: - content = open(path).read().rstrip() - - password = content + changed = False + content = _read_password_file(b_path) + if content is None: + plaintext_password = _random_password(params['length'], chars) salt = None + changed = True + else: + plaintext_password, salt = _parse_content(content) - try: - sep = content.rindex(' salt=') - except ValueError: - # No salt - pass - else: - salt = password[sep + len(' salt='):] - password = content[:sep] + if params['encrypt'] and not salt: + changed = True + salt = _random_salt() - if params['encrypt'] is not None and salt is None: - # crypt requested, add salt if missing - salt = self.random_salt() - content = '%s salt=%s' % (password, salt) - with open(path, 'w') as f: - os.chmod(path, 0o600) - f.write(content + '\n') + if changed: + content = _format_content(plaintext_password, salt, encrypt=params['encrypt']) + _write_password_file(b_path, content) if params['encrypt']: - password = do_encrypt(password, params['encrypt'], salt=salt) - - ret.append(password) + password = do_encrypt(plaintext_password, params['encrypt'], + salt=to_bytes(salt, encoding='ascii', errors='strict')) + ret.append(password) + else: + ret.append(plaintext_password) return ret diff --git a/lib/ansible/utils/encrypt.py b/lib/ansible/utils/encrypt.py index 991476c5de..1ef6e29c48 100644 --- a/lib/ansible/utils/encrypt.py +++ b/lib/ansible/utils/encrypt.py @@ -67,6 +67,7 @@ except ImportError: from ansible import constants as C from ansible.errors import AnsibleError +from ansible.module_utils._text import to_text __all__ = ['do_encrypt'] @@ -89,7 +90,11 @@ def do_encrypt(result, encrypt, salt_size=None, salt=None): else: raise AnsibleError("passlib must be installed to encrypt vars_prompt values") - return result + # Hashes from passlib.hash should be represented as ascii strings of hex + # digits so this should not traceback. If it's not representable as such + # we need to traceback and then blacklist such algorithms because it may + # impact calling code. + return to_text(result, errors='strict') def key_for_hostname(hostname): # fireball mode is an implementation of ansible firing up zeromq via SSH diff --git a/test/units/plugins/lookup/test_password.py b/test/units/plugins/lookup/test_password.py index 46a5bd2be6..7dabba6ce6 100644 --- a/test/units/plugins/lookup/test_password.py +++ b/test/units/plugins/lookup/test_password.py @@ -20,117 +20,422 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type -from ansible.compat.tests import unittest +import passlib +from passlib.handlers import pbkdf2 + +from ansible.compat.six import text_type +from ansible.compat.six.moves import builtins +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import mock_open, patch +from ansible.errors import AnsibleError +from ansible.plugins import PluginLoader +from ansible.utils import encrypt + +from units.mock.loader import DictDataLoader + +from ansible.plugins.lookup import password -from ansible.plugins.lookup.password import LookupModule, _parse_parameters, DEFAULT_LENGTH DEFAULT_CHARS = sorted([u'ascii_letters', u'digits', u".,:-_"]) +DEFAULT_CANDIDATE_CHARS = u'.,:-_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' -class TestPasswordLookup(unittest.TestCase): +# Currently there isn't a new-style +old_style_params_data = ( + # Simple case + dict(term=u'/path/to/file', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), + candidate_chars=DEFAULT_CANDIDATE_CHARS, + ), - # Currently there isn't a new-style - old_style_params_data = ( - # Simple case - dict(term=u'/path/to/file', - filename=u'/path/to/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS) - ), + # Special characters in path + dict(term=u'/path/with/embedded spaces and/file', + filename=u'/path/with/embedded spaces and/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), + candidate_chars=DEFAULT_CANDIDATE_CHARS, + ), + dict(term=u'/path/with/equals/cn=com.ansible', + filename=u'/path/with/equals/cn=com.ansible', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), + candidate_chars=DEFAULT_CANDIDATE_CHARS, + ), + dict(term=u'/path/with/unicode/くらとみ/file', + filename=u'/path/with/unicode/くらとみ/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), + candidate_chars=DEFAULT_CANDIDATE_CHARS, + ), + # Mix several special chars + dict(term=u'/path/with/utf 8 and spaces/くらとみ/file', + filename=u'/path/with/utf 8 and spaces/くらとみ/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), + candidate_chars=DEFAULT_CANDIDATE_CHARS, + ), + dict(term=u'/path/with/encoding=unicode/くらとみ/file', + filename=u'/path/with/encoding=unicode/くらとみ/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), + candidate_chars=DEFAULT_CANDIDATE_CHARS, + ), + dict(term=u'/path/with/encoding=unicode/くらとみ/and spaces file', + filename=u'/path/with/encoding=unicode/くらとみ/and spaces file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS), + candidate_chars=DEFAULT_CANDIDATE_CHARS, + ), - # Special characters in path - dict(term=u'/path/with/embedded spaces and/file', - filename=u'/path/with/embedded spaces and/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS) - ), - dict(term=u'/path/with/equals/cn=com.ansible', - filename=u'/path/with/equals/cn=com.ansible', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS) - ), - dict(term=u'/path/with/unicode/くらとみ/file', - filename=u'/path/with/unicode/くらとみ/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS) - ), - # Mix several special chars - dict(term=u'/path/with/utf 8 and spaces/くらとみ/file', - filename=u'/path/with/utf 8 and spaces/くらとみ/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS) - ), - dict(term=u'/path/with/encoding=unicode/くらとみ/file', - filename=u'/path/with/encoding=unicode/くらとみ/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS) - ), - dict(term=u'/path/with/encoding=unicode/くらとみ/and spaces file', - filename=u'/path/with/encoding=unicode/くらとみ/and spaces file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=DEFAULT_CHARS) - ), + # Simple parameters + dict(term=u'/path/to/file length=42', + filename=u'/path/to/file', + params=dict(length=42, encrypt=None, chars=DEFAULT_CHARS), + candidate_chars=DEFAULT_CANDIDATE_CHARS, + ), + dict(term=u'/path/to/file encrypt=pbkdf2_sha256', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt='pbkdf2_sha256', chars=DEFAULT_CHARS), + candidate_chars=DEFAULT_CANDIDATE_CHARS, + ), + dict(term=u'/path/to/file chars=abcdefghijklmnop', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abcdefghijklmnop']), + candidate_chars=u'abcdefghijklmnop', + ), + dict(term=u'/path/to/file chars=digits,abc,def', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'abc', u'def'])), + candidate_chars=u'abcdef0123456789', + ), - # Simple parameters - dict(term=u'/path/to/file length=42', - filename=u'/path/to/file', - params=dict(length=42, encrypt=None, chars=DEFAULT_CHARS) - ), - dict(term=u'/path/to/file encrypt=pbkdf2_sha256', - filename=u'/path/to/file', - params=dict(length=DEFAULT_LENGTH, encrypt='pbkdf2_sha256', chars=DEFAULT_CHARS) - ), - dict(term=u'/path/to/file chars=abcdefghijklmnop', - filename=u'/path/to/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=[u'abcdefghijklmnop']) - ), - dict(term=u'/path/to/file chars=digits,abc,def', - filename=u'/path/to/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'abc', u'def'])) - ), - # Including comma in chars - dict(term=u'/path/to/file chars=abcdefghijklmnop,,digits', - filename=u'/path/to/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=sorted([u'abcdefghijklmnop', u',', u'digits'])) - ), - dict(term=u'/path/to/file chars=,,', - filename=u'/path/to/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=[u',']) - ), + # Including comma in chars + dict(term=u'/path/to/file chars=abcdefghijklmnop,,digits', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'abcdefghijklmnop', u',', u'digits'])), + candidate_chars = u',abcdefghijklmnop0123456789', + ), + dict(term=u'/path/to/file chars=,,', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u',']), + candidate_chars=u',', + ), - # Including = in chars - dict(term=u'/path/to/file chars=digits,=,,', - filename=u'/path/to/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'=', u','])) - ), - dict(term=u'/path/to/file chars=digits,abc=def', - filename=u'/path/to/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'abc=def'])) - ), + # Including = in chars + dict(term=u'/path/to/file chars=digits,=,,', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'=', u','])), + candidate_chars=u',=0123456789', + ), + dict(term=u'/path/to/file chars=digits,abc=def', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'abc=def'])), + candidate_chars=u'abc=def0123456789', + ), - # Including unicode in chars - dict(term=u'/path/to/file chars=digits,くらとみ,,', - filename=u'/path/to/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'くらとみ', u','])) - ), + # Including unicode in chars + dict(term=u'/path/to/file chars=digits,くらとみ,,', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'digits', u'くらとみ', u','])), + candidate_chars=u',0123456789くらとみ', + ), + # Including only unicode in chars + dict(term=u'/path/to/file chars=くらとみ', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'くらとみ'])), + candidate_chars=u'くらとみ', + ), - # Including special chars in both path and chars - # Special characters in path - dict(term=u'/path/with/embedded spaces and/file chars=abc=def', - filename=u'/path/with/embedded spaces and/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=[u'abc=def']) - ), - dict(term=u'/path/with/equals/cn=com.ansible chars=abc=def', - filename=u'/path/with/equals/cn=com.ansible', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=[u'abc=def']) - ), - dict(term=u'/path/with/unicode/くらとみ/file chars=くらとみ', - filename=u'/path/with/unicode/くらとみ/file', - params=dict(length=DEFAULT_LENGTH, encrypt=None, chars=[u'くらとみ']) - ), - ) + # Include ':' in path + dict(term=u'/path/to/file_with:colon chars=ascii_letters,digits', + filename=u'/path/to/file_with:colon', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=sorted([u'ascii_letters', u'digits'])), + candidate_chars=u'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', + ), - def setUp(self): - pass + # Including special chars in both path and chars + # Special characters in path + dict(term=u'/path/with/embedded spaces and/file chars=abc=def', + filename=u'/path/with/embedded spaces and/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abc=def']), + candidate_chars=u'abc=def', + ), + dict(term=u'/path/with/equals/cn=com.ansible chars=abc=def', + filename=u'/path/with/equals/cn=com.ansible', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'abc=def']), + candidate_chars=u'abc=def', + ), + dict(term=u'/path/with/unicode/くらとみ/file chars=くらとみ', + filename=u'/path/with/unicode/くらとみ/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'くらとみ']), + candidate_chars=u'くらとみ', + ), +) - def tearDown(self): - pass - def test_parse_parameters(self): - for testcase in self.old_style_params_data: - filename, params = _parse_parameters(testcase['term']) +class TestParseParameters(unittest.TestCase): + def test(self): + for testcase in old_style_params_data: + filename, params = password._parse_parameters(testcase['term']) params['chars'].sort() self.assertEqual(filename, testcase['filename']) self.assertEqual(params, testcase['params']) + + def test_unrecognized_value(self): + testcase = dict(term=u'/path/to/file chars=くらとみi sdfsdf', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'くらとみ']), + candidate_chars=u'くらとみ') + self.assertRaises(AnsibleError, password._parse_parameters, testcase['term']) + + def test_invalid_params(self): + testcase = dict(term=u'/path/to/file chars=くらとみi somethign_invalid=123', + filename=u'/path/to/file', + params=dict(length=password.DEFAULT_LENGTH, encrypt=None, chars=[u'くらとみ']), + candidate_chars=u'くらとみ') + self.assertRaises(AnsibleError, password._parse_parameters, testcase['term']) + + +class TestReadPasswordFile(unittest.TestCase): + def setUp(self): + self.os_path_exists = password.os.path.exists + + def tearDown(self): + password.os.path.exists = self.os_path_exists + + def test_no_password_file(self): + password.os.path.exists = lambda x: False + self.assertEqual(password._read_password_file(b'/nonexistent'), None) + + def test_with_password_file(self): + password.os.path.exists = lambda x: True + with patch.object(builtins, 'open', mock_open(read_data=b'Testing\n')) as m: + self.assertEqual(password._read_password_file(b'/etc/motd'), u'Testing') + + +class TestGenCandidateChars(unittest.TestCase): + def _assert_gen_candidate_chars(self, testcase): + expected_candidate_chars = testcase['candidate_chars'] + params = testcase['params'] + chars_spec = params['chars'] + res = password._gen_candidate_chars(chars_spec) + self.assertEquals(res, expected_candidate_chars) + + def test_gen_candidate_chars(self): + for testcase in old_style_params_data: + self._assert_gen_candidate_chars(testcase) + + +class TestRandomPassword(unittest.TestCase): + def _assert_valid_chars(self, res, chars): + for res_char in res: + self.assertIn(res_char, chars) + + def test_default(self): + res = password._random_password() + self.assertEquals(len(res), password.DEFAULT_LENGTH) + self.assertTrue(isinstance(res, text_type)) + self._assert_valid_chars(res, DEFAULT_CANDIDATE_CHARS) + + def test_zero_length(self): + res = password._random_password(length=0) + self.assertEquals(len(res), 0) + self.assertTrue(isinstance(res, text_type)) + self._assert_valid_chars(res, u',') + + def test_just_a_common(self): + res = password._random_password(length=1, chars=u',') + self.assertEquals(len(res), 1) + self.assertEquals(res, u',') + + def test_free_will(self): + # A Rush and Spinal Tap reference twofer + res = password._random_password(length=11, chars=u'a') + self.assertEquals(len(res), 11) + self.assertEquals(res, 'aaaaaaaaaaa') + self._assert_valid_chars(res, u'a') + + def test_unicode(self): + res = password._random_password(length=11, chars=u'くらとみ') + self._assert_valid_chars(res, u'くらとみ') + self.assertEquals(len(res), 11) + + def test_gen_password(self): + for testcase in old_style_params_data: + params = testcase['params'] + candidate_chars = testcase['candidate_chars'] + params_chars_spec = password._gen_candidate_chars(params['chars']) + password_string = password._random_password(length=params['length'], + chars=params_chars_spec) + self.assertEquals(len(password_string), + params['length'], + msg='generated password=%s has length (%s) instead of expected length (%s)' % + (password_string, len(password_string), params['length'])) + + for char in password_string: + self.assertIn(char, candidate_chars, + msg='%s not found in %s from chars spect %s' % + (char, candidate_chars, params['chars'])) + + +class TestRandomSalt(unittest.TestCase): + def test(self): + res = password._random_salt() + expected_salt_candidate_chars = u'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789./' + self.assertEquals(len(res), 8) + for res_char in res: + self.assertIn(res_char, expected_salt_candidate_chars) + + +class TestParseContent(unittest.TestCase): + def test_empty_password_file(self): + plaintext_password, salt = password._parse_content(u'') + self.assertEquals(plaintext_password, u'') + self.assertEquals(salt, None) + + def test(self): + expected_content = u'12345678' + file_content = expected_content + plaintext_password, salt = password._parse_content(file_content) + self.assertEquals(plaintext_password, expected_content) + self.assertEquals(salt, None) + + def test_with_salt(self): + expected_content = u'12345678 salt=87654321' + file_content = expected_content + plaintext_password, salt = password._parse_content(file_content) + self.assertEquals(plaintext_password, u'12345678') + self.assertEquals(salt, u'87654321') + + +class TestFormatContent(unittest.TestCase): + def test_no_encrypt(self): + self.assertEqual( + password._format_content(password=u'hunter42', + salt=u'87654321', + encrypt=False), + u'hunter42 salt=87654321') + + def test_no_encrypt_no_salt(self): + self.assertEqual( + password._format_content(password=u'hunter42', + salt=None, + encrypt=False), + u'hunter42') + + def test_encrypt(self): + self.assertEqual( + password._format_content(password=u'hunter42', + salt=u'87654321', + encrypt='pbkdf2_sha256'), + u'hunter42 salt=87654321') + + def test_encrypt_no_salt(self): + self.assertRaises(AssertionError, password._format_content, + u'hunter42', None, 'pbkdf2_sha256') + + +class TestWritePasswordFile(unittest.TestCase): + def setUp(self): + self.makedirs_safe = password.makedirs_safe + self.os_chmod = password.os.chmod + password.makedirs_safe = lambda path, mode: None + password.os.chmod = lambda path, mode: None + + def tearDown(self): + password.makedirs_safe = self.makedirs_safe + password.os.chmod = self.os_chmod + + def test_content_written(self): + + with patch.object(builtins, 'open', mock_open()) as m: + password._write_password_file(b'/this/is/a/test/caf\xc3\xa9', u'Testing Café') + + m.assert_called_once_with(b'/this/is/a/test/caf\xc3\xa9', 'wb') + m().write.assert_called_once_with(u'Testing Café\n'.encode('utf-8')) + + +class TestLookupModule(unittest.TestCase): + def setUp(self): + self.fake_loader = DictDataLoader({'/path/to/somewhere':'sdfsdf'}) + self.password_lookup = password.LookupModule(loader=self.fake_loader) + self.os_path_exists = password.os.path.exists + + # Different releases of passlib default to a different number of rounds + self.sha256 = passlib.registry.get_crypt_handler('pbkdf2_sha256') + sha256_for_tests = pbkdf2.create_pbkdf2_hash("sha256", 32, 20000) + passlib.registry.register_crypt_handler(sha256_for_tests, force=True) + + def tearDown(self): + password.os.path.exists = self.os_path_exists + passlib.registry.register_crypt_handler(self.sha256, force=True) + + @patch.object(PluginLoader, '_get_paths') + @patch('ansible.plugins.lookup.password._write_password_file') + def test_no_encrypt(self, mock_get_paths, mock_write_file): + mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three'] + + results = self.password_lookup.run([u'/path/to/somewhere'], + None) + + # FIXME: assert something useful + for result in results: + self.assertEquals(len(result), password.DEFAULT_LENGTH) + self.assertIsInstance(result, text_type) + + @patch.object(PluginLoader, '_get_paths') + @patch('ansible.plugins.lookup.password._write_password_file') + def test_encrypt(self, mock_get_paths, mock_write_file): + mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three'] + + results = self.password_lookup.run([u'/path/to/somewhere encrypt=pbkdf2_sha256'], + None) + + # pbkdf2 format plus hash + expected_password_length = 76 + + for result in results: + self.assertEquals(len(result), expected_password_length) + # result should have 5 parts split by '$' + str_parts = result.split('$', 5) + + # verify the result is parseable by the passlib + crypt_parts = passlib.hash.pbkdf2_sha256.parsehash(result) + + # verify it used the right algo type + self.assertEquals(str_parts[1], 'pbkdf2-sha256') + + self.assertEquals(len(str_parts), 5) + + # verify the string and parsehash agree on the number of rounds + self.assertEquals(int(str_parts[2]), crypt_parts['rounds']) + self.assertIsInstance(result, text_type) + + @patch.object(PluginLoader, '_get_paths') + @patch('ansible.plugins.lookup.password._write_password_file') + def test_password_already_created_encrypt(self, mock_get_paths, mock_write_file): + mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three'] + password.os.path.exists = lambda x: True + + with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')) as m: + results = self.password_lookup.run([u'/path/to/somewhere chars=anything encrypt=pbkdf2_sha256'], + None) + for result in results: + self.assertEqual(result, u'$pbkdf2-sha256$20000$ODc2NTQzMjE$Uikde0cv0BKaRaAXMrUQB.zvG4GmnjClwjghwIRf2gU') + + @patch.object(PluginLoader, '_get_paths') + @patch('ansible.plugins.lookup.password._write_password_file') + def test_password_already_created_no_encrypt(self, mock_get_paths, mock_write_file): + mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three'] + password.os.path.exists = lambda x: True + + with patch.object(builtins, 'open', mock_open(read_data=b'hunter42 salt=87654321\n')) as m: + results = self.password_lookup.run([u'/path/to/somewhere chars=anything'], + None) + + for result in results: + self.assertEqual(result, u'hunter42') + + @patch.object(PluginLoader, '_get_paths') + @patch('ansible.plugins.lookup.password._write_password_file') + def test_only_a(self, mock_get_paths, mock_write_file): + mock_get_paths.return_value = ['/path/one', '/path/two', '/path/three'] + + results = self.password_lookup.run([u'/path/to/somewhere chars=a'], + None) + for result in results: + self.assertEquals(result, u'a' * password.DEFAULT_LENGTH)