diff --git a/lib/ansible/cli/__init__.py b/lib/ansible/cli/__init__.py index 820d549bdf..bb9c480f40 100644 --- a/lib/ansible/cli/__init__.py +++ b/lib/ansible/cli/__init__.py @@ -34,7 +34,7 @@ from abc import ABCMeta, abstractmethod import ansible from ansible import constants as C -from ansible.errors import AnsibleError, AnsibleOptionsError +from ansible.errors import AnsibleOptionsError from ansible.inventory.manager import InventoryManager from ansible.module_utils.six import with_metaclass, string_types from ansible.module_utils._text import to_bytes, to_text @@ -43,6 +43,7 @@ from ansible.release import __version__ from ansible.utils.path import unfrackpath from ansible.utils.vars import load_extra_vars, load_options_vars from ansible.vars.manager import VariableManager +from ansible.parsing.vault import PromptVaultSecret, get_file_vault_secret try: from __main__ import display @@ -168,37 +169,89 @@ class CLI(with_metaclass(ABCMeta, object)): display.v(u"No config file found; using defaults") @staticmethod - def ask_vault_passwords(): - ''' prompt for vault password and/or password change ''' + def split_vault_id(vault_id): + # return (before_@, after_@) + # if no @, return whole string as after_ + if '@' not in vault_id: + return (None, vault_id) - vault_pass = None - try: - vault_pass = getpass.getpass(prompt="Vault password: ") - - except EOFError: - pass - - # enforce no newline chars at the end of passwords - if vault_pass: - vault_pass = to_bytes(vault_pass, errors='surrogate_or_strict', nonstring='simplerepr').strip() - - return vault_pass + parts = vault_id.split('@', 1) + ret = tuple(parts) + return ret @staticmethod - def ask_new_vault_passwords(): - new_vault_pass = None - try: - new_vault_pass = getpass.getpass(prompt="New Vault password: ") - new_vault_pass2 = getpass.getpass(prompt="Confirm New Vault password: ") - if new_vault_pass != new_vault_pass2: - raise AnsibleError("Passwords do not match") - except EOFError: - pass + def build_vault_ids(vault_ids, vault_password_files=None, ask_vault_pass=None): + vault_password_files = vault_password_files or [] + vault_ids = vault_ids or [] - if new_vault_pass: - new_vault_pass = to_bytes(new_vault_pass, errors='surrogate_or_strict', nonstring='simplerepr').strip() + # convert vault_password_files into vault_ids slugs + for password_file in vault_password_files: + id_slug = u'%s@%s' % (C.DEFAULT_VAULT_IDENTITY, password_file) - return new_vault_pass + # note this makes --vault-id higher precendence than --vault-password-file + # if we want to intertwingle them in order probably need a cli callback to populate vault_ids + # used by --vault-id and --vault-password-file + vault_ids.append(id_slug) + + if ask_vault_pass: + id_slug = u'%s@%s' % (C.DEFAULT_VAULT_IDENTITY, u'prompt') + vault_ids.append(id_slug) + + return vault_ids + + # TODO: remove the now unused args + @staticmethod + def setup_vault_secrets(loader, vault_ids, vault_password_files=None, + ask_vault_pass=None, create_new_password=False): + # list of tuples + vault_secrets = [] + + if create_new_password: + prompt_formats = ['New vault password (%s): ', + 'Confirm vew vault password (%s): '] + else: + prompt_formats = ['Vault password (%s): '] + + vault_ids = CLI.build_vault_ids(vault_ids, + vault_password_files, + ask_vault_pass) + + for index, vault_id_slug in enumerate(vault_ids): + vault_id_name, vault_id_value = CLI.split_vault_id(vault_id_slug) + if vault_id_value == 'prompt': + # TODO: we could assume --vault-id=prompt implies --ask-vault-pass + # if not, we need to 'if ask_vault_pass' here + if vault_id_name: + prompted_vault_secret = PromptVaultSecret(prompt_formats=prompt_formats, vault_id=vault_id_name) + prompted_vault_secret.load() + vault_secrets.append((vault_id_name, prompted_vault_secret)) + else: + prompted_vault_secret = PromptVaultSecret(prompt_formats=prompt_formats, + vault_id=C.DEFAULT_VAULT_IDENTITY) + prompted_vault_secret.load() + vault_secrets.append((C.DEFAULT_VAULT_IDENTITY, prompted_vault_secret)) + + # update loader with new secrets incrementally, so we can load a vault password + # that is encrypted with a vault secret provided earlier + loader.set_vault_secrets(vault_secrets) + continue + + # assuming anything else is a password file + display.vvvvv('Reading vault password file: %s' % vault_id_value) + # read vault_pass from a file + file_vault_secret = get_file_vault_secret(filename=vault_id_value, + vault_id_name=vault_id_name, + loader=loader) + file_vault_secret.load() + if vault_id_name: + vault_secrets.append((vault_id_name, file_vault_secret)) + else: + vault_secrets.append((C.DEFAULT_VAULT_IDENTITY, file_vault_secret)) + + # update loader with as-yet-known vault secrets + loader.set_vault_secrets(vault_secrets) + + return vault_secrets def ask_passwords(self): ''' prompt for connection and become passwords if needed ''' @@ -260,7 +313,7 @@ class CLI(with_metaclass(ABCMeta, object)): if vault_opts: # Check for vault related conflicts - if (op.ask_vault_pass and op.vault_password_file): + if (op.ask_vault_pass and op.vault_password_files): self.parser.error("--ask-vault-pass and --vault-password-file are mutually exclusive") if runas_opts: @@ -278,12 +331,14 @@ class CLI(with_metaclass(ABCMeta, object)): @staticmethod def unfrack_paths(option, opt, value, parser): + paths = getattr(parser.values, option.dest) if isinstance(value, string_types): - setattr(parser.values, option.dest, [unfrackpath(x) for x in value.split(os.pathsep)]) + paths.extend([unfrackpath(x) for x in value.split(os.pathsep)]) elif isinstance(value, list): - setattr(parser.values, option.dest, [unfrackpath(x) for x in value]) + paths.extend([unfrackpath(x) for x in value]) else: pass # FIXME: should we raise options error? + setattr(parser.values, option.dest, paths) @staticmethod def unfrack_path(option, opt, value, parser): @@ -324,13 +379,17 @@ class CLI(with_metaclass(ABCMeta, object)): if vault_opts: parser.add_option('--ask-vault-pass', default=C.DEFAULT_ASK_VAULT_PASS, dest='ask_vault_pass', action='store_true', help='ask for vault password') - parser.add_option('--vault-password-file', default=C.DEFAULT_VAULT_PASSWORD_FILE, dest='vault_password_file', - help="vault password file", action="callback", callback=CLI.unfrack_path, type='string') - parser.add_option('--new-vault-password-file', dest='new_vault_password_file', - help="new vault password file for rekey", action="callback", callback=CLI.unfrack_path, type='string') + parser.add_option('--vault-password-file', default=[], dest='vault_password_files', + help="vault password file", action="callback", callback=CLI.unfrack_paths, type='string') + parser.add_option('--new-vault-password-file', default=[], dest='new_vault_password_files', + help="new vault password file for rekey", action="callback", callback=CLI.unfrack_paths, type='string') parser.add_option('--output', default=None, dest='output_file', help='output file name for encrypt or decrypt; use - for stdout', - action="callback", callback=CLI.unfrack_path, type='string') + action="callback", callback=CLI.unfrack_path, type='string'), + parser.add_option('--vault-id', default=[], dest='vault_ids', action='append', type='string', + help='the vault identity to use') + parser.add_option('--new-vault-id', default=None, dest='new_vault_id', type='string', + help='the new vault identity to use for rekey') if subset_opts: parser.add_option('-t', '--tags', dest='tags', default=[], action='append', @@ -649,54 +708,17 @@ class CLI(with_metaclass(ABCMeta, object)): return t - @staticmethod - def read_vault_password_file(vault_password_file, loader): - """ - Read a vault password from a file or if executable, execute the script and - retrieve password from STDOUT - """ - - this_path = os.path.realpath(os.path.expanduser(vault_password_file)) - if not os.path.exists(this_path): - raise AnsibleError("The vault password file %s was not found" % this_path) - - if loader.is_executable(this_path): - try: - # STDERR not captured to make it easier for users to prompt for input in their scripts - p = subprocess.Popen(this_path, stdout=subprocess.PIPE) - except OSError as e: - raise AnsibleError("Problem running vault password script %s (%s). If this is not a script, " - "remove the executable bit from the file." % (' '.join(this_path), e)) - stdout, stderr = p.communicate() - if p.returncode != 0: - raise AnsibleError("Vault password script %s returned non-zero (%s): %s" % (this_path, p.returncode, p.stderr)) - vault_pass = stdout.strip(b'\r\n') - else: - try: - f = open(this_path, "rb") - vault_pass = f.read().strip() - f.close() - except (OSError, IOError) as e: - raise AnsibleError("Could not read vault password file %s: %s" % (this_path, e)) - - return vault_pass - @staticmethod def _play_prereqs(options): # all needs loader loader = DataLoader() - # vault - b_vault_pass = None - if options.vault_password_file: - # read vault_pass from a file - b_vault_pass = CLI.read_vault_password_file(options.vault_password_file, loader=loader) - elif options.ask_vault_pass: - b_vault_pass = CLI.ask_vault_passwords() - - if b_vault_pass is not None: - loader.set_vault_password(b_vault_pass) + vault_secrets = CLI.setup_vault_secrets(loader, + vault_ids=options.vault_ids, + vault_password_files=options.vault_password_files, + ask_vault_pass=options.ask_vault_pass) + loader.set_vault_secrets(vault_secrets) # create the inventory, and filter it based on the subset specified (if any) inventory = InventoryManager(loader=loader, sources=options.inventory) diff --git a/lib/ansible/cli/console.py b/lib/ansible/cli/console.py index a70f7020e8..cb15d7970b 100644 --- a/lib/ansible/cli/console.py +++ b/lib/ansible/cli/console.py @@ -415,6 +415,12 @@ class ConsoleCLI(CLI, cmd.Cmd): self.loader, self.inventory, self.variable_manager = self._play_prereqs(self.options) + vault_secrets = self.setup_vault_secrets(self.loader, + vault_id=self.options.vault_ids, + vault_password_files=self.options.vault_password_files, + ask_vault_pass=self.options.ask_vault_pass) + self.loader.set_vault_secrets(vault_secrets) + no_hosts = False if len(self.inventory.list_hosts()) == 0: # Empty inventory diff --git a/lib/ansible/cli/pull.py b/lib/ansible/cli/pull.py index 98f3a9ebfc..5ebb51881f 100644 --- a/lib/ansible/cli/pull.py +++ b/lib/ansible/cli/pull.py @@ -226,8 +226,9 @@ class PullCLI(CLI): # Build playbook command cmd = '%s/ansible-playbook %s %s' % (bin_path, base_opts, playbook) - if self.options.vault_password_file: - cmd += " --vault-password-file=%s" % self.options.vault_password_file + if self.options.vault_password_files: + for vault_password_file in self.options.vault_password_files: + cmd += " --vault-password-file=%s" % vault_password_file if inv_opts: cmd += ' %s' % inv_opts for ev in self.options.extra_vars: diff --git a/lib/ansible/cli/vault.py b/lib/ansible/cli/vault.py index ce3da2b8fe..1f28cfb8ce 100644 --- a/lib/ansible/cli/vault.py +++ b/lib/ansible/cli/vault.py @@ -23,10 +23,10 @@ import os import sys from ansible.cli import CLI -from ansible.errors import AnsibleError, AnsibleOptionsError +from ansible.errors import AnsibleOptionsError from ansible.module_utils._text import to_text, to_bytes from ansible.parsing.dataloader import DataLoader -from ansible.parsing.vault import VaultEditor +from ansible.parsing.vault import VaultEditor, VaultLib, match_encrypt_secret try: from __main__ import display @@ -59,6 +59,12 @@ class VaultCLI(CLI): self.b_vault_pass = None self.b_new_vault_pass = None self.encrypt_string_read_stdin = False + + self.encrypt_secret = None + self.encrypt_vault_id = None + self.new_encrypt_secret = None + self.new_encrypt_vault_id = None + super(VaultCLI, self).__init__(args) def set_action(self): @@ -108,6 +114,11 @@ class VaultCLI(CLI): can_output = ['encrypt', 'decrypt', 'encrypt_string'] + if self.options.vault_ids: + for vault_id in self.options.vault_ids: + if u';' in vault_id: + raise AnsibleOptionsError("'%s' is not a valid vault id. The character ';' is not allowed in vault ids" % vault_id) + if self.action not in can_output: if self.options.output_file: raise AnsibleOptionsError("The --output option can be used only with ansible-vault %s" % '/'.join(can_output)) @@ -132,43 +143,79 @@ class VaultCLI(CLI): raise AnsibleOptionsError('The --prompt option is not supported if also reading input from stdin') def run(self): - super(VaultCLI, self).run() loader = DataLoader() # set default restrictive umask old_umask = os.umask(0o077) - if self.options.vault_password_file: - # read vault_pass from a file - self.b_vault_pass = CLI.read_vault_password_file(self.options.vault_password_file, loader) + vault_ids = self.options.vault_ids - if self.options.new_vault_password_file: - # for rekey only - self.b_new_vault_pass = CLI.read_vault_password_file(self.options.new_vault_password_file, loader) + # there are 3 types of actions, those that just 'read' (decrypt, view) and only + # need to ask for a password once, and those that 'write' (create, encrypt) that + # ask for a new password and confirm it, and 'read/write (rekey) that asks for the + # old password, then asks for a new one and confirms it. - if not self.b_vault_pass or self.options.ask_vault_pass: - # the 'read' options don't need to ask for password confirmation. - # 'edit' is read/write, but the decrypt will confirm. - if self.action in ['decrypt', 'edit', 'view', 'rekey']: - self.b_vault_pass = self.ask_vault_passwords() - else: - self.b_vault_pass = self.ask_new_vault_passwords() + # TODO: instead of prompting for these before, we could let VaultEditor + # call a callback when it needs it. + if self.action in ['decrypt', 'view', 'rekey']: + vault_secrets = self.setup_vault_secrets(loader, + vault_ids=vault_ids, + vault_password_files=self.options.vault_password_files, + ask_vault_pass=self.options.ask_vault_pass) - if not self.b_vault_pass: - raise AnsibleOptionsError("A password is required to use Ansible's Vault") + if not vault_secrets: + raise AnsibleOptionsError("A vault password is required to use Ansible's Vault") - if self.action == 'rekey': - if not self.b_new_vault_pass: - self.b_new_vault_pass = self.ask_new_vault_passwords() - if not self.b_new_vault_pass: - raise AnsibleOptionsError("A password is required to rekey Ansible's Vault") + if self.action in ['encrypt', 'encrypt_string', 'create', 'edit']: + if len(vault_ids) > 1: + raise AnsibleOptionsError("Only one --vault-id can be used for encryption") - if self.action == 'encrypt_string': - if self.options.encrypt_string_prompt: - self.encrypt_string_prompt = True + vault_secrets = None + vault_secrets = \ + self.setup_vault_secrets(loader, + vault_ids=vault_ids, + vault_password_files=self.options.vault_password_files, + ask_vault_pass=self.options.ask_vault_pass, + create_new_password=True) - self.editor = VaultEditor(self.b_vault_pass) + if not vault_secrets: + raise AnsibleOptionsError("A vault password is required to use Ansible's Vault") + + encrypt_secret = match_encrypt_secret(vault_secrets) + # only one secret for encrypt for now, use the first vault_id and use its first secret + # self.encrypt_vault_id = list(vault_secrets.keys())[0] + # self.encrypt_secret = vault_secrets[self.encrypt_vault_id][0] + self.encrypt_vault_id = encrypt_secret[0] + self.encrypt_secret = encrypt_secret[1] + + if self.action in ['rekey']: + new_vault_ids = [] + if self.options.new_vault_id: + new_vault_ids.append(self.options.new_vault_id) + + new_vault_secrets = \ + self.setup_vault_secrets(loader, + vault_ids=new_vault_ids, + vault_password_files=self.options.new_vault_password_files, + ask_vault_pass=self.options.ask_vault_pass, + create_new_password=True) + + if not new_vault_secrets: + raise AnsibleOptionsError("A new vault password is required to use Ansible's Vault rekey") + + # There is only one new_vault_id currently and one new_vault_secret + new_encrypt_secret = match_encrypt_secret(new_vault_secrets) + + self.new_encrypt_vault_id = new_encrypt_secret[0] + self.new_encrypt_secret = new_encrypt_secret[1] + + loader.set_vault_secrets(vault_secrets) + self.secrets = vault_secrets + + # FIXME: do we need to create VaultEditor here? its not reused + vault = VaultLib(self.secrets) + self.editor = VaultEditor(vault) self.execute() @@ -182,7 +229,10 @@ class VaultCLI(CLI): display.display("Reading plaintext input from stdin", stderr=True) for f in self.args or ['-']: - self.editor.encrypt_file(f, output_file=self.options.output_file) + # Fixme: use the correct vau + self.editor.encrypt_file(f, self.encrypt_secret, + vault_id=self.encrypt_vault_id, + output_file=self.options.output_file) if sys.stdout.isatty(): display.display("Encryption successful", stderr=True) @@ -227,6 +277,8 @@ class VaultCLI(CLI): if name_prompt_response != "": name = name_prompt_response + # TODO: could prompt for which vault_id to use for each plaintext string + # currently, it will just be the default # could use private=True for shadowed input if useful prompt_response = display.prompt(msg) @@ -282,8 +334,9 @@ class VaultCLI(CLI): b_plaintext = to_bytes(plaintext) b_plaintext_list.append((b_plaintext, self.FROM_ARGS, name)) + # TODO: specify vault_id per string? # Format the encrypted strings and any corresponding stderr output - outputs = self._format_output_vault_strings(b_plaintext_list) + outputs = self._format_output_vault_strings(b_plaintext_list, vault_id=self.encrypt_vault_id) for output in outputs: err = output.get('err', None) @@ -297,7 +350,7 @@ class VaultCLI(CLI): # TODO: offer block or string ala eyaml - def _format_output_vault_strings(self, b_plaintext_list): + def _format_output_vault_strings(self, b_plaintext_list, vault_id=None): # If we are only showing one item in the output, we don't need to included commented # delimiters in the text show_delimiter = False @@ -313,7 +366,9 @@ class VaultCLI(CLI): for index, b_plaintext_info in enumerate(b_plaintext_list): # (the text itself, which input it came from, its name) b_plaintext, src, name = b_plaintext_info - b_ciphertext = self.editor.encrypt_bytes(b_plaintext) + + b_ciphertext = self.editor.encrypt_bytes(b_plaintext, self.encrypt_secret, + vault_id=vault_id) # block formatting yaml_text = self.format_ciphertext_yaml(b_ciphertext, name=name) @@ -347,7 +402,8 @@ class VaultCLI(CLI): if len(self.args) > 1: raise AnsibleOptionsError("ansible-vault create can take only one filename argument") - self.editor.create_file(self.args[0]) + self.editor.create_file(self.args[0], self.encrypt_secret, + vault_id=self.encrypt_vault_id) def execute_edit(self): ''' open and decrypt an existing vaulted file in an editor, that will be encryped again when closed''' @@ -363,15 +419,14 @@ class VaultCLI(CLI): # unicode here because we are displaying it and therefore can make # the decision that the display doesn't have to be precisely what # the input was (leave that to decrypt instead) - self.pager(to_text(self.editor.plaintext(f))) + plaintext = self.editor.plaintext(f) + self.pager(to_text(plaintext)) def execute_rekey(self): ''' re-encrypt a vaulted file with a new secret, the previous secret is required ''' for f in self.args: - if not (os.path.isfile(f)): - raise AnsibleError(f + " does not exist") - - for f in self.args: - self.editor.rekey_file(f, self.b_new_vault_pass) + # FIXME: plumb in vault_id, use the default new_vault_secret for now + self.editor.rekey_file(f, self.new_encrypt_secret, + self.new_encrypt_vault_id) display.display("Rekey successful", stderr=True) diff --git a/lib/ansible/config/data/config.yml b/lib/ansible/config/data/config.yml index a1be87bdda..a4a2f9753d 100644 --- a/lib/ansible/config/data/config.yml +++ b/lib/ansible/config/data/config.yml @@ -1072,6 +1072,22 @@ DEFAULT_VAR_COMPRESSION_LEVEL: value_type: integer vars: [] yaml: {key: defaults.var_compression_level} +DEFAULT_VAULT_ID_MATCH: + default: False + desc: 'If true, decrypting vaults with a vault id will only try the password from the matching vault-id' + env: [{name: ANSIBLE_VAULT_ID_MATCH}] + ini: + - {key: vault_id_match, section: defaults} + vars: [] + yaml: {key: defaults.vault_id_match} +DEFAULT_VAULT_IDENTITY: + default: default + desc: 'TODO: write it' + env: [{name: ANSIBLE_VAULT_IDENTITY}] + ini: + - {key: vault_identity, section: defaults} + vars: [] + yaml: {key: defaults.vault_identity} DEFAULT_VAULT_PASSWORD_FILE: default: ~ desc: 'TODO: write it' diff --git a/lib/ansible/parsing/dataloader.py b/lib/ansible/parsing/dataloader.py index 2844a890dd..13aa273db2 100644 --- a/lib/ansible/parsing/dataloader.py +++ b/lib/ansible/parsing/dataloader.py @@ -26,12 +26,13 @@ import re import tempfile from yaml import YAMLError +from ansible.module_utils.six import text_type, string_types from ansible.errors import AnsibleFileNotFound, AnsibleParserError from ansible.errors.yaml_strings import YAML_SYNTAX_ERROR from ansible.module_utils.basic import is_executable from ansible.module_utils.six import binary_type, text_type from ansible.module_utils._text import to_bytes, to_native, to_text -from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file +from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file, parse_vaulttext_envelope from ansible.parsing.quoting import unquote from ansible.parsing.yaml.loader import AnsibleLoader from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject, AnsibleUnicode @@ -73,11 +74,16 @@ class DataLoader: self._tempfiles = set() # initialize the vault stuff with an empty password - self.set_vault_password(None) + # TODO: replace with a ref to something that can get the password + # a creds/auth provider + # self.set_vault_password(None) + self._vaults = {} + self._vault = VaultLib() + self.set_vault_secrets(None) - def set_vault_password(self, b_vault_password): - self._b_vault_password = b_vault_password - self._vault = VaultLib(b_password=b_vault_password) + # TODO: since we can query vault_secrets late, we could provide this to DataLoader init + def set_vault_secrets(self, vault_secrets): + self._vault.secrets = vault_secrets def load(self, data, file_name='', show_content=True): ''' @@ -170,7 +176,7 @@ class DataLoader: def _safe_load(self, stream, file_name=None): ''' Implements yaml.safe_load(), except using our custom loader class. ''' - loader = AnsibleLoader(stream, file_name, self._b_vault_password) + loader = AnsibleLoader(stream, file_name, self._vault.secrets) try: return loader.get_single_data() finally: @@ -206,6 +212,8 @@ class DataLoader: with open(b_file_name, 'rb') as f: data = f.read() if is_encrypted(data): + # FIXME: plugin vault selector + b_ciphertext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(data) data = self._vault.decrypt(data, filename=b_file_name) show_content = False @@ -362,7 +370,6 @@ class DataLoader: b_upath = to_bytes(upath, errors='surrogate_or_strict') b_mydir = os.path.dirname(b_upath) - # FIXME: this detection fails with non main.yml roles # if path is in role and 'tasks' not there already, add it into the search if is_role or self._is_role(path): if b_mydir.endswith(b'tasks'): @@ -439,8 +446,8 @@ class DataLoader: # the decrypt call would throw an error, but we check first # since the decrypt function doesn't know the file name data = f.read() - if not self._b_vault_password: - raise AnsibleParserError("A vault password must be specified to decrypt %s" % to_native(file_path)) + if not self._vault.secrets: + raise AnsibleParserError("A vault password or secret must be specified to decrypt %s" % to_native(file_path)) data = self._vault.decrypt(data, filename=real_path) # Make a temp file diff --git a/lib/ansible/parsing/vault/__init__.py b/lib/ansible/parsing/vault/__init__.py index e281df3384..c69910f2d7 100644 --- a/lib/ansible/parsing/vault/__init__.py +++ b/lib/ansible/parsing/vault/__init__.py @@ -23,6 +23,7 @@ import os import random import shlex import shutil +import subprocess import sys import tempfile import warnings @@ -72,6 +73,7 @@ except ImportError: pass from ansible.errors import AnsibleError +from ansible import constants as C from ansible.module_utils.six import PY3, binary_type from ansible.module_utils.six.moves import zip from ansible.module_utils._text import to_bytes, to_text @@ -142,12 +144,267 @@ def is_encrypted_file(file_obj, start_pos=0, count=-1): file_obj.seek(current_position) -class VaultLib: +def parse_vaulttext_envelope(b_vaulttext_envelope, default_vault_id=None): + """Retrieve information about the Vault and clean the data - def __init__(self, b_password): - self.b_password = to_bytes(b_password, errors='strict', encoding='utf-8') + When data is saved, it has a header prepended and is formatted into 80 + character lines. This method extracts the information from the header + and then removes the header and the inserted newlines. The string returned + is suitable for processing by the Cipher classes. + + :arg b_vaulttext: byte str containing the data from a save file + :returns: a byte str suitable for passing to a Cipher class's + decrypt() function. + """ + # used by decrypt + default_vault_id = default_vault_id or C.DEFAULT_VAULT_IDENTITY + + b_tmpdata = b_vaulttext_envelope.split(b'\n') + b_tmpheader = b_tmpdata[0].strip().split(b';') + + b_version = b_tmpheader[1].strip() + cipher_name = to_text(b_tmpheader[2].strip()) + vault_id = default_vault_id + # vault_id = None + + # Only attempt to find vault_id if the vault file is version 1.2 or newer + # if self.b_version == b'1.2': + if len(b_tmpheader) >= 4: + vault_id = to_text(b_tmpheader[3].strip()) + + b_ciphertext = b''.join(b_tmpdata[1:]) + + return b_ciphertext, b_version, cipher_name, vault_id + + +def format_vaulttext_envelope(b_ciphertext, cipher_name, version=None, vault_id=None): + """ Add header and format to 80 columns + + :arg b_ciphertext: the encrypted and hexlified data as a byte string + :arg cipher_name: unicode cipher name (for ex, u'AES256') + :arg version: unicode vault version (for ex, '1.2'). Optional ('1.1' is default) + :arg vault_id: unicode vault identifier. If provided, the version will be bumped to 1.2. + :returns: a byte str that should be dumped into a file. It's + formatted to 80 char columns and has the header prepended + """ + + if not cipher_name: + raise AnsibleError("the cipher must be set before adding a header") + + version = version or '1.1' + + # If we specify a vault_id, use format version 1.2. For no vault_id, stick to 1.1 + if vault_id and vault_id != u'default': + version = '1.2' + + b_version = to_bytes(version, 'utf-8', errors='strict') + b_vault_id = to_bytes(vault_id, 'utf-8', errors='strict') + b_cipher_name = to_bytes(cipher_name, 'utf-8', errors='strict') + + header_parts = [b_HEADER, + b_version, + b_cipher_name] + + if b_version == b'1.2' and b_vault_id: + header_parts.append(b_vault_id) + + header = b';'.join(header_parts) + + b_vaulttext = [header] + b_vaulttext += [b_ciphertext[i:i + 80] for i in range(0, len(b_ciphertext), 80)] + b_vaulttext += [b''] + b_vaulttext = b'\n'.join(b_vaulttext) + + return b_vaulttext + + +class VaultSecret: + '''Opaque/abstract objects for a single vault secret. ie, a password or a key.''' + def __init__(self, _bytes=None): + # FIXME: ? that seems wrong... Unset etc? + self._bytes = _bytes + + @property + def bytes(self): + '''The secret as a bytestring. + + Sub classes that store text types will need to override to encode the text to bytes. + ''' + return self._bytes + + def load(self): + return self._bytes + + +class PromptVaultSecret(VaultSecret): + default_prompt_formats = ["Vault password (%s): "] + + def __init__(self, _bytes=None, vault_id=None, prompt_formats=None): + self._bytes = _bytes + self.vault_id = vault_id + + if prompt_formats is None: + self.prompt_formats = self.default_prompt_formats + else: + self.prompt_formats = prompt_formats + + @property + def bytes(self): + return self._bytes + + def load(self): + self._bytes = self.ask_vault_passwords() + + def ask_vault_passwords(self): + b_vault_passwords = [] + + for prompt_format in self.prompt_formats: + prompt = prompt_format % self.vault_id + try: + vault_pass = display.prompt(prompt, private=True) + except EOFError: + pass + b_vault_pass = to_bytes(vault_pass, errors='strict', nonstring='simplerepr').strip() + b_vault_passwords.append(b_vault_pass) + + # Make sure the passwords match by comparing them all to the first password + for b_vault_password in b_vault_passwords: + self.confirm(b_vault_passwords[0], b_vault_password) + + if b_vault_passwords: + return b_vault_passwords[0] + + return None + + def confirm(self, b_vault_pass_1, b_vault_pass_2): + # enforce no newline chars at the end of passwords + + if b_vault_pass_1 != b_vault_pass_2: + # FIXME: more specific exception + raise AnsibleError("Passwords do not match") + + +def get_file_vault_secret(filename=None, vault_id_name=None, encoding=None, loader=None): + this_path = os.path.realpath(os.path.expanduser(filename)) + + if not os.path.exists(this_path): + raise AnsibleError("The vault password file %s was not found" % this_path) + + if loader.is_executable(this_path): + # TODO: pass vault_id_name to script via cli + return ScriptVaultSecret(filename=this_path, encoding=encoding, loader=loader) + else: + return FileVaultSecret(filename=this_path, encoding=encoding, loader=loader) + + +# TODO: mv these classes to a seperate file so we don't pollute vault with 'subprocess' etc +class FileVaultSecret(VaultSecret): + def __init__(self, filename=None, encoding=None, loader=None): + super(FileVaultSecret, self).__init__() + self.filename = filename + self.loader = loader + + self.encoding = encoding or 'utf8' + + # We could load from file here, but that is eventually a pain to test + self._bytes = None + self._text = None + + @property + def bytes(self): + if self._bytes: + return self._bytes + if self._text: + return self._text.encode(self.encoding) + return None + + def load(self): + self._bytes = self.read_file(self.filename, self.loader) + + @staticmethod + def read_file(filename, loader): + """ + Read a vault password from a file or if executable, execute the script and + retrieve password from STDOUT + """ + + try: + f = open(filename, "rb") + vault_pass = f.read().strip() + f.close() + except (OSError, IOError) as e: + raise AnsibleError("Could not read vault password file %s: %s" % (filename, e)) + + return vault_pass + + def __repr__(self): + if self.filename: + return "%s(filename='%s')" % (self.__class__.__name__, self.filename) + return "%s()" % (self.__class__.__name__) + + +class ScriptVaultSecret(FileVaultSecret): + + @staticmethod + def read_file(filename, loader): + if not loader.is_executable(filename): + raise AnsibleVaultError("The vault password script %s was not executable" % filename) + + try: + # STDERR not captured to make it easier for users to prompt for input in their scripts + p = subprocess.Popen(filename, stdout=subprocess.PIPE) + except OSError as e: + msg_format = "Problem running vault password script %s (%s)." + "If this is not a script, remove the executable bit from the file." + msg = msg_format % (' '.join(filename), e) + + raise AnsibleError(msg) + + stdout, stderr = p.communicate() + + if p.returncode != 0: + raise AnsibleError("Vault password script %s returned non-zero (%s): %s" % (filename, p.returncode, p.stderr)) + + vault_pass = stdout.strip(b'\r\n') + return vault_pass + + +def match_secrets(secrets, target_vault_ids): + '''Find all VaultSecret objects that are mapped to any of the target_vault_ids in secrets''' + if not secrets: + return [] + + matches = [(vault_id, secret) for vault_id, secret in secrets if vault_id in target_vault_ids] + return matches + + +def match_best_secret(secrets, target_vault_ids): + '''Find the best secret from secrets that matches target_vault_ids + + Since secrets should be ordered so the early secrets are 'better' than later ones, this + just finds all the matches, then returns the first secret''' + matches = match_secrets(secrets, target_vault_ids) + if matches: + return matches[0] + # raise exception? + return None + + +def match_encrypt_secret(secrets): + '''Find the best/first/only secret in secrets to use for encrypting''' + + # ie, consider all of the available secrets as matches + _vault_id_matchers = [_vault_id for _vault_id, _vault_secret in secrets] + best_secret = match_best_secret(secrets, _vault_id_matchers) + # can be empty list sans any tuple + return best_secret + + +class VaultLib: + def __init__(self, secrets=None): + self.secrets = secrets or [] self.cipher_name = None - self.b_version = b'1.1' + self.b_version = b'1.2' @staticmethod def is_encrypted(data): @@ -169,7 +426,7 @@ class VaultLib: display.deprecated(u'vault.VaultLib.is_encrypted_file is deprecated. Use vault.is_encrypted_file instead', version='2.4') return is_encrypted_file(file_obj) - def encrypt(self, plaintext): + def encrypt(self, plaintext, secret=None, vault_id=None): """Vault encrypt a piece of data. :arg plaintext: a text or byte string to encrypt. @@ -181,6 +438,13 @@ class VaultLib: If the string passed in is a text string, it will be encoded to UTF-8 before encryption. """ + + if secret is None: + if self.secrets: + secret_vault_id, secret = match_encrypt_secret(self.secrets) + else: + raise AnsibleVaultError("A vault password must be specified to encrypt data") + b_plaintext = to_bytes(plaintext, errors='surrogate_or_strict') if is_encrypted(b_plaintext): @@ -195,10 +459,13 @@ class VaultLib: raise AnsibleError(u"{0} cipher could not be found".format(self.cipher_name)) # encrypt data - b_ciphertext = this_cipher.encrypt(b_plaintext, self.b_password) + display.vvvvv('Encrypting with vault secret %s' % secret) + b_ciphertext = this_cipher.encrypt(b_plaintext, secret) # format the data for output to the file - b_vaulttext = self._format_output(b_ciphertext) + b_vaulttext = format_vaulttext_envelope(b_ciphertext, + self.cipher_name, + vault_id=vault_id) return b_vaulttext def decrypt(self, vaulttext, filename=None): @@ -213,8 +480,8 @@ class VaultLib: """ b_vaulttext = to_bytes(vaulttext, errors='strict', encoding='utf-8') - if self.b_password is None: - raise AnsibleError("A vault password must be specified to decrypt data") + if self.secrets is None: + raise AnsibleVaultError("A vault password must be specified to decrypt data") if not is_encrypted(b_vaulttext): msg = "input is not vault encrypted data" @@ -222,17 +489,70 @@ class VaultLib: msg += "%s is not a vault encrypted file" % filename raise AnsibleError(msg) - # clean out header - b_vaulttext = self._split_header(b_vaulttext) + b_vaulttext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext) - # create the cipher object - if self.cipher_name in CIPHER_WHITELIST: - this_cipher = CIPHER_MAPPING[self.cipher_name]() + # create the cipher object, note that the cipher used for decrypt can + # be different than the cipher used for encrypt + if cipher_name in CIPHER_WHITELIST: + this_cipher = CIPHER_MAPPING[cipher_name]() else: - raise AnsibleError("{0} cipher could not be found".format(self.cipher_name)) + raise AnsibleError("{0} cipher could not be found".format(cipher_name)) + + b_plaintext = None + + if not self.secrets: + raise AnsibleVaultError('Attempting to decrypt but no vault secrets found') + + # WARNING: Currently, the vault id is not required to match the vault id in the vault blob to + # decrypt a vault properly. The vault id in the vault blob is not part of the encrypted + # or signed vault payload. There is no cryptographic checking/verification/validation of the + # vault blobs vault id. It can be tampered with and changed. The vault id is just a nick + # name to use to pick the best secret and provide some ux/ui info. + + # iterate over all the applicable secrets (all of them by default) until one works... + # if we specify a vault_id, only the corresponding vault secret is checked and + # we check it first. + + vault_id_matchers = [] + + if vault_id: + display.vvvvv('Found a vault_id (%s) in the vaulttext' % (vault_id)) + vault_id_matchers.append(vault_id) + _matches = match_secrets(self.secrets, vault_id_matchers) + if _matches: + display.vvvvv('We have a secret associated with vault id (%s), will try to use to decrypt %s' % (vault_id, filename)) + else: + display.vvvvv('Found a vault_id (%s) in the vault text, but we do not have a associated secret (--vault-id)' % (vault_id)) + + # Not adding the other secrets to vault_secret_ids enforces a match between the vault_id from the vault_text and + # the known vault secrets. + if not C.DEFAULT_VAULT_ID_MATCH: + # Add all of the known vault_ids as candidates for decrypting a vault. + vault_id_matchers.extend([_vault_id for _vault_id, _secret in self.secrets if _vault_id != vault_id]) + + matched_secrets = match_secrets(self.secrets, vault_id_matchers) + + # for vault_secret_id in vault_secret_ids: + for vault_secret_id, vault_secret in matched_secrets: + display.vvvvv('Trying to use vault secret=(%s) id=%s to decrypt %s' % (vault_secret, vault_secret_id, filename)) + + try: + # secret = self.secrets[vault_secret_id] + display.vvvv('Trying secret %s for vault_id=%s' % (vault_secret, vault_secret_id)) + b_plaintext = this_cipher.decrypt(b_vaulttext, vault_secret) + if b_plaintext is not None: + display.vvvvv('decrypt succesful with secret=%s and vault_id=%s' % (vault_secret, vault_secret_id)) + break + except AnsibleError as e: + display.vvvv('Tried to use the vault secret (%s) to decrypt (%s) but it failed. Error: %s' % + (vault_secret_id, filename, e)) + continue + else: + msg = "Decryption failed (no vault secrets would found that could decrypt)" + if filename: + msg += " on %s" % filename + raise AnsibleVaultError(msg) - # try to unencrypt vaulttext - b_plaintext = this_cipher.decrypt(b_vaulttext, self.b_password) if b_plaintext is None: msg = "Decryption failed" if filename: @@ -241,54 +561,12 @@ class VaultLib: return b_plaintext - def _format_output(self, b_ciphertext): - """ Add header and format to 80 columns - - :arg b_vaulttext: the encrypted and hexlified data as a byte string - :returns: a byte str that should be dumped into a file. It's - formatted to 80 char columns and has the header prepended - """ - - if not self.cipher_name: - raise AnsibleError("the cipher must be set before adding a header") - - header = b';'.join([b_HEADER, self.b_version, - to_bytes(self.cipher_name, 'utf-8', errors='strict')]) - b_vaulttext = [header] - b_vaulttext += [b_ciphertext[i:i + 80] for i in range(0, len(b_ciphertext), 80)] - b_vaulttext += [b''] - b_vaulttext = b'\n'.join(b_vaulttext) - - return b_vaulttext - - def _split_header(self, b_vaulttext): - """Retrieve information about the Vault and clean the data - - When data is saved, it has a header prepended and is formatted into 80 - character lines. This method extracts the information from the header - and then removes the header and the inserted newlines. The string returned - is suitable for processing by the Cipher classes. - - :arg b_vaulttext: byte str containing the data from a save file - :returns: a byte str suitable for passing to a Cipher class's - decrypt() function. - """ - # used by decrypt - - b_tmpdata = b_vaulttext.split(b'\n') - b_tmpheader = b_tmpdata[0].strip().split(b';') - - self.b_version = b_tmpheader[1].strip() - self.cipher_name = to_text(b_tmpheader[2].strip()) - b_ciphertext = b''.join(b_tmpdata[1:]) - - return b_ciphertext - class VaultEditor: - def __init__(self, b_password): - self.vault = VaultLib(b_password) + def __init__(self, vault=None): + # TODO: it may be more useful to just make VaultSecrets and index of VaultLib objects... + self.vault = vault or VaultLib() # TODO: mv shred file stuff to it's own class def _shred_file_custom(self, tmp_path): @@ -358,7 +636,8 @@ class VaultEditor: os.remove(tmp_path) - def _edit_file_helper(self, filename, existing_data=None, force_save=False): + def _edit_file_helper(self, filename, secret, + existing_data=None, force_save=False, vault_id=None): # Create a tempfile fd, tmp_path = tempfile.mkstemp() @@ -385,7 +664,7 @@ class VaultEditor: # encrypt new data and write out to tmp # An existing vaultfile will always be UTF-8, # so decode to unicode here - b_ciphertext = self.vault.encrypt(b_tmpdata) + b_ciphertext = self.vault.encrypt(b_tmpdata, secret, vault_id=vault_id) self.write_data(b_ciphertext, tmp_path) # shuffle tmp file into place @@ -399,13 +678,13 @@ class VaultEditor: real_path = os.path.realpath(filename) return real_path - def encrypt_bytes(self, b_plaintext): + def encrypt_bytes(self, b_plaintext, secret, vault_id=None): - b_ciphertext = self.vault.encrypt(b_plaintext) + b_ciphertext = self.vault.encrypt(b_plaintext, secret, vault_id=vault_id) return b_ciphertext - def encrypt_file(self, filename, output_file=None): + def encrypt_file(self, filename, secret, vault_id=None, output_file=None): # A file to be encrypted into a vaultfile could be any encoding # so treat the contents as a byte string. @@ -414,7 +693,7 @@ class VaultEditor: filename = self._real_path(filename) b_plaintext = self.read_data(filename) - b_ciphertext = self.vault.encrypt(b_plaintext) + b_ciphertext = self.vault.encrypt(b_plaintext, secret, vault_id=vault_id) self.write_data(b_ciphertext, output_file or filename) def decrypt_file(self, filename, output_file=None): @@ -425,12 +704,12 @@ class VaultEditor: ciphertext = self.read_data(filename) try: - plaintext = self.vault.decrypt(ciphertext) + plaintext = self.vault.decrypt(ciphertext, filename=filename) except AnsibleError as e: raise AnsibleError("%s for %s" % (to_bytes(e), to_bytes(filename))) self.write_data(plaintext, output_file or filename, shred=False) - def create_file(self, filename): + def create_file(self, filename, secret, vault_id=None): """ create a new encrypted file """ # FIXME: If we can raise an error here, we can probably just make it @@ -438,58 +717,80 @@ class VaultEditor: if os.path.isfile(filename): raise AnsibleError("%s exists, please use 'edit' instead" % filename) - self._edit_file_helper(filename) + self._edit_file_helper(filename, secret, vault_id=vault_id) def edit_file(self, filename): # follow the symlink filename = self._real_path(filename) - ciphertext = self.read_data(filename) + b_vaulttext = self.read_data(filename) + + # vault or yaml files are always utf8 + vaulttext = to_text(b_vaulttext) try: - plaintext = self.vault.decrypt(ciphertext) + # vaulttext gets converted back to bytes, but alas + plaintext = self.vault.decrypt(vaulttext) except AnsibleError as e: raise AnsibleError("%s for %s" % (to_bytes(e), to_bytes(filename))) + # Figure out the vault id from the file, to select the right secret to re-encrypt it + # (duplicates parts of decrypt, but alas...) + b_ciphertext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vaulttext) + + # if we could decrypt, the vault_id should be in secrets + # though we could have multiple secrets for a given vault_id, pick the first one + secrets = match_secrets(self.vault.secrets, [vault_id]) + secret = secrets[0][1] if self.vault.cipher_name not in CIPHER_WRITE_WHITELIST: # we want to get rid of files encrypted with the AES cipher - self._edit_file_helper(filename, existing_data=plaintext, force_save=True) + self._edit_file_helper(filename, secret, existing_data=plaintext, force_save=True) else: - self._edit_file_helper(filename, existing_data=plaintext, force_save=False) + self._edit_file_helper(filename, secret, existing_data=plaintext, force_save=False) def plaintext(self, filename): - ciphertext = self.read_data(filename) + b_vaulttext = self.read_data(filename) + vaulttext = to_text(b_vaulttext) try: - plaintext = self.vault.decrypt(ciphertext) + plaintext = self.vault.decrypt(vaulttext) + return plaintext except AnsibleError as e: - raise AnsibleError("%s for %s" % (to_bytes(e), to_bytes(filename))) + raise AnsibleVaultError("%s for %s" % (to_bytes(e), to_bytes(filename))) - return plaintext - - def rekey_file(self, filename, b_new_password): + # FIXME/TODO: make this use VaultSecret + def rekey_file(self, filename, new_vault_secret, new_vault_id=None): # follow the symlink filename = self._real_path(filename) prev = os.stat(filename) - ciphertext = self.read_data(filename) + b_vaulttext = self.read_data(filename) + vaulttext = to_text(b_vaulttext) try: - plaintext = self.vault.decrypt(ciphertext) + plaintext = self.vault.decrypt(vaulttext) except AnsibleError as e: raise AnsibleError("%s for %s" % (to_bytes(e), to_bytes(filename))) # This is more or less an assert, see #18247 - if b_new_password is None: + if new_vault_secret is None: raise AnsibleError('The value for the new_password to rekey %s with is not valid' % filename) - new_vault = VaultLib(b_new_password) - new_ciphertext = new_vault.encrypt(plaintext) + # FIXME: VaultContext...? could rekey to a different vault_id in the same VaultSecrets - self.write_data(new_ciphertext, filename) + # Need a new VaultLib because the new vault data can be a different + # vault lib format or cipher (for ex, when we migrate 1.0 style vault data to + # 1.1 style data we change the version and the cipher). This is where a VaultContext might help + + # the new vault will only be used for encrypting, so it doesn't need the vault secrets + # (we will pass one in directly to encrypt) + new_vault = VaultLib(secrets={}) + b_new_vaulttext = new_vault.encrypt(plaintext, new_vault_secret, vault_id=new_vault_id) + + self.write_data(b_new_vaulttext, filename) # preserve permissions os.chmod(filename, prev.st_mode) @@ -565,8 +866,8 @@ class VaultEditor: os.chown(dest, prev.st_uid, prev.st_gid) def _editor_shell_command(self, filename): - EDITOR = os.environ.get('EDITOR', 'vi') - editor = shlex.split(EDITOR) + env_editor = os.environ.get('EDITOR', 'vi') + editor = shlex.split(env_editor) editor.append(filename) return editor @@ -612,15 +913,26 @@ class VaultAES: raise AnsibleError("Encryption disabled for deprecated VaultAES class") + @staticmethod + def _parse_plaintext_envelope(b_envelope): + # split out sha and verify decryption + b_split_data = b_envelope.split(b"\n", 1) + b_this_sha = b_split_data[0] + b_plaintext = b_split_data[1] + b_test_sha = to_bytes(sha256(b_plaintext).hexdigest()) + + return b_plaintext, b_this_sha, b_test_sha + @classmethod def _decrypt_cryptography(cls, b_salt, b_ciphertext, b_password, key_length): + bs = algorithms.AES.block_size // 8 b_key, b_iv = cls._aes_derive_key_and_iv(b_password, b_salt, key_length, bs) cipher = C_Cipher(algorithms.AES(b_key), modes.CBC(b_iv), CRYPTOGRAPHY_BACKEND).decryptor() unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder() try: - b_plaintext = unpadder.update( + b_plaintext_envelope = unpadder.update( cipher.update(b_ciphertext) + cipher.finalize() ) + unpadder.finalize() except ValueError: @@ -628,11 +940,7 @@ class VaultAES: # password was given raise AnsibleError("Decryption failed") - # split out sha and verify decryption - b_split_data = b_plaintext.split(b"\n", 1) - b_this_sha = b_split_data[0] - b_plaintext = b_split_data[1] - b_test_sha = to_bytes(sha256(b_plaintext).hexdigest()) + b_plaintext, b_this_sha, b_test_sha = cls._parse_plaintext_envelope(b_plaintext_envelope) if b_this_sha != b_test_sha: raise AnsibleError("Decryption failed") @@ -646,6 +954,7 @@ class VaultAES: out_file = BytesIO() bs = AES_pycrypto.block_size + b_key, b_iv = cls._aes_derive_key_and_iv(b_password, b_salt, key_length, bs) cipher = AES_pycrypto.new(b_key, AES_pycrypto.MODE_CBC, b_iv) b_next_chunk = b'' @@ -667,14 +976,10 @@ class VaultAES: # reset the stream pointer to the beginning out_file.seek(0) - b_out_data = out_file.read() + b_plaintext_envelope = out_file.read() out_file.close() - # split out sha and verify decryption - b_split_data = b_out_data.split(b"\n", 1) - b_this_sha = b_split_data[0] - b_plaintext = b_split_data[1] - b_test_sha = to_bytes(sha256(b_plaintext).hexdigest()) + b_plaintext, b_this_sha, b_test_sha = cls._parse_plaintext_envelope(b_plaintext_envelope) if b_this_sha != b_test_sha: raise AnsibleError("Decryption failed") @@ -682,7 +987,7 @@ class VaultAES: return b_plaintext @classmethod - def decrypt(cls, b_vaulttext, b_password, key_length=32): + def decrypt(cls, b_vaulttext, secret, key_length=32, vault_id=None): """ Decrypt the given data and return it :arg b_data: A byte string containing the encrypted data @@ -700,6 +1005,8 @@ class VaultAES: b_salt = b_vaultdata[len(b'Salted__'):16] b_ciphertext = b_vaultdata[16:] + b_password = secret.bytes + if HAS_CRYPTOGRAPHY: b_plaintext = cls._decrypt_cryptography(b_salt, b_ciphertext, b_password, key_length) elif HAS_PYCRYPTO: @@ -789,7 +1096,7 @@ class VaultAES256: hmac.update(b_ciphertext) b_hmac = hmac.finalize() - return hexlify(b_hmac), hexlify(b_ciphertext) + return to_bytes(hexlify(b_hmac), errors='surrogate_or_strict'), hexlify(b_ciphertext) @staticmethod def _encrypt_pycrypto(b_plaintext, b_salt, b_key1, b_key2, b_iv): @@ -820,8 +1127,11 @@ class VaultAES256: return to_bytes(hmac.hexdigest(), errors='surrogate_or_strict'), hexlify(b_ciphertext) @classmethod - def encrypt(cls, b_plaintext, b_password): + def encrypt(cls, b_plaintext, secret): + if secret is None: + raise AnsibleVaultError('The secret passed to encrypt() was None') b_salt = os.urandom(32) + b_password = secret.bytes b_key1, b_key2, b_iv = cls._gen_key_initctr(b_password, b_salt) if HAS_CRYPTOGRAPHY: @@ -837,15 +1147,16 @@ class VaultAES256: b_vaulttext = hexlify(b_vaulttext) return b_vaulttext - @staticmethod - def _decrypt_cryptography(b_ciphertext, b_crypted_hmac, b_key1, b_key2, b_iv): + @classmethod + def _decrypt_cryptography(cls, b_ciphertext, b_crypted_hmac, b_key1, b_key2, b_iv): + # b_key1, b_key2, b_iv = self._gen_key_initctr(b_password, b_salt) # EXIT EARLY IF DIGEST DOESN'T MATCH hmac = HMAC(b_key2, hashes.SHA256(), CRYPTOGRAPHY_BACKEND) hmac.update(b_ciphertext) try: hmac.verify(unhexlify(b_crypted_hmac)) - except InvalidSignature: - return None + except InvalidSignature as e: + raise AnsibleVaultError('HMAC verification failed: %s' % e) cipher = C_Cipher(algorithms.AES(b_key1), modes.CTR(b_iv), CRYPTOGRAPHY_BACKEND) decryptor = cipher.decryptor() @@ -904,12 +1215,19 @@ class VaultAES256: return b_plaintext @classmethod - def decrypt(cls, b_vaulttext, b_password): + def decrypt(cls, b_vaulttext, secret): # SPLIT SALT, DIGEST, AND DATA b_vaulttext = unhexlify(b_vaulttext) b_salt, b_crypted_hmac, b_ciphertext = b_vaulttext.split(b"\n", 2) b_salt = unhexlify(b_salt) b_ciphertext = unhexlify(b_ciphertext) + + # TODO: would be nice if a VaultSecret could be passed directly to _decrypt_* + # (move _gen_key_initctr() to a AES256 VaultSecret or VaultContext impl?) + # though, likely needs to be python cryptography specific impl that basically + # creates a Cipher() with b_key1, a Mode.CTR() with b_iv, and a HMAC() with sign key b_key2 + b_password = secret.bytes + b_key1, b_key2, b_iv = cls._gen_key_initctr(b_password, b_salt) if HAS_CRYPTOGRAPHY: @@ -921,6 +1239,7 @@ class VaultAES256: return b_plaintext + # Keys could be made bytes later if the code that gets the data is more # naturally byte-oriented CIPHER_MAPPING = { diff --git a/lib/ansible/parsing/yaml/constructor.py b/lib/ansible/parsing/yaml/constructor.py index 6753089283..52ad8fc0f0 100644 --- a/lib/ansible/parsing/yaml/constructor.py +++ b/lib/ansible/parsing/yaml/constructor.py @@ -23,9 +23,10 @@ from yaml.constructor import SafeConstructor, ConstructorError from yaml.nodes import MappingNode from ansible.module_utils._text import to_bytes -from ansible.parsing.vault import VaultLib -from ansible.parsing.yaml.objects import AnsibleMapping, AnsibleSequence, AnsibleUnicode, AnsibleVaultEncryptedUnicode +from ansible.parsing.yaml.objects import AnsibleMapping, AnsibleSequence, AnsibleUnicode +from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode from ansible.utils.unsafe_proxy import wrap_var +from ansible.parsing.vault import VaultLib, parse_vaulttext_envelope try: @@ -36,12 +37,12 @@ except ImportError: class AnsibleConstructor(SafeConstructor): - def __init__(self, file_name=None, b_vault_password=None): - self._b_vault_password = b_vault_password + def __init__(self, file_name=None, vault_secrets=None): self._ansible_file_name = file_name super(AnsibleConstructor, self).__init__() self._vaults = {} - self._vaults['default'] = VaultLib(b_password=self._b_vault_password) + self.vault_secrets = vault_secrets or [] + self._vaults['default'] = VaultLib(secrets=self.vault_secrets) def construct_yaml_map(self, node): data = AnsibleMapping() @@ -96,17 +97,16 @@ class AnsibleConstructor(SafeConstructor): def construct_vault_encrypted_unicode(self, node): value = self.construct_scalar(node) - ciphertext_data = to_bytes(value) - - if self._b_vault_password is None: + b_ciphertext_data = to_bytes(value) + # could pass in a key id here to choose the vault to associate with + # TODO/FIXME: plugin vault selector + vault = self._vaults['default'] + if vault.secrets is None: raise ConstructorError(context=None, context_mark=None, problem="found !vault but no vault password provided", problem_mark=node.start_mark, note=None) - - # could pass in a key id here to choose the vault to associate with - vault = self._vaults['default'] - ret = AnsibleVaultEncryptedUnicode(ciphertext_data) + ret = AnsibleVaultEncryptedUnicode(b_ciphertext_data) ret.vault = vault return ret diff --git a/lib/ansible/parsing/yaml/loader.py b/lib/ansible/parsing/yaml/loader.py index 7ff309d82c..b6650041c4 100644 --- a/lib/ansible/parsing/yaml/loader.py +++ b/lib/ansible/parsing/yaml/loader.py @@ -32,9 +32,9 @@ from ansible.parsing.yaml.constructor import AnsibleConstructor if HAVE_PYYAML_C: class AnsibleLoader(CParser, AnsibleConstructor, Resolver): - def __init__(self, stream, file_name=None, vault_password=None): + def __init__(self, stream, file_name=None, vault_secrets=None): CParser.__init__(self, stream) - AnsibleConstructor.__init__(self, file_name=file_name, b_vault_password=vault_password) + AnsibleConstructor.__init__(self, file_name=file_name, vault_secrets=vault_secrets) Resolver.__init__(self) else: from yaml.composer import Composer @@ -43,10 +43,10 @@ else: from yaml.parser import Parser class AnsibleLoader(Reader, Scanner, Parser, Composer, AnsibleConstructor, Resolver): - def __init__(self, stream, file_name=None, vault_password=None): + def __init__(self, stream, file_name=None, vault_secrets=None): Reader.__init__(self, stream) Scanner.__init__(self) Parser.__init__(self) Composer.__init__(self) - AnsibleConstructor.__init__(self, file_name=file_name, b_vault_password=vault_password) + AnsibleConstructor.__init__(self, file_name=file_name, vault_secrets=vault_secrets) Resolver.__init__(self) diff --git a/lib/ansible/parsing/yaml/objects.py b/lib/ansible/parsing/yaml/objects.py index bd02090c01..45cbb4d5dd 100644 --- a/lib/ansible/parsing/yaml/objects.py +++ b/lib/ansible/parsing/yaml/objects.py @@ -76,11 +76,11 @@ class AnsibleVaultEncryptedUnicode(yaml.YAMLObject, AnsibleBaseYAMLObject): yaml_tag = u'!vault' @classmethod - def from_plaintext(cls, seq, vault): + def from_plaintext(cls, seq, vault, secret): if not vault: raise vault.AnsibleVaultError('Error creating AnsibleVaultEncryptedUnicode, invalid vault (%s) provided' % vault) - ciphertext = vault.encrypt(seq) + ciphertext = vault.encrypt(seq, secret) avu = cls(ciphertext) avu.vault = vault return avu diff --git a/test/integration/targets/vault/encrypted_file_encrypted_var_password b/test/integration/targets/vault/encrypted_file_encrypted_var_password new file mode 100644 index 0000000000..57bc06e3c1 --- /dev/null +++ b/test/integration/targets/vault/encrypted_file_encrypted_var_password @@ -0,0 +1 @@ +test-encrypted-file-password diff --git a/test/integration/targets/vault/example1_password b/test/integration/targets/vault/example1_password new file mode 100644 index 0000000000..e723c8f9ae --- /dev/null +++ b/test/integration/targets/vault/example1_password @@ -0,0 +1 @@ +example1 diff --git a/test/integration/targets/vault/example2_password b/test/integration/targets/vault/example2_password new file mode 100644 index 0000000000..7b010f8743 --- /dev/null +++ b/test/integration/targets/vault/example2_password @@ -0,0 +1 @@ +example2 diff --git a/test/integration/targets/vault/example3_password b/test/integration/targets/vault/example3_password new file mode 100644 index 0000000000..f5bc5a8c41 --- /dev/null +++ b/test/integration/targets/vault/example3_password @@ -0,0 +1 @@ +example3 diff --git a/test/integration/targets/vault/format_1_2_AES256.yml b/test/integration/targets/vault/format_1_2_AES256.yml new file mode 100644 index 0000000000..1e3795fba2 --- /dev/null +++ b/test/integration/targets/vault/format_1_2_AES256.yml @@ -0,0 +1,6 @@ +$ANSIBLE_VAULT;1.2;AES256;test_vault_id +30383835613535356232333534303264656530633664616233386138396563623939626136366537 +3635323530646538626138383136636437616637616430610a386661346563346136326637656461 +64393364343964633364336666333630383164643662343930663432316333633537353938376437 +6134656262373731390a363166356461376663313532343733326438386632623930313366643038 +6133 diff --git a/test/integration/targets/vault/password-script.py b/test/integration/targets/vault/password-script.py new file mode 100755 index 0000000000..ceb2ac1713 --- /dev/null +++ b/test/integration/targets/vault/password-script.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . +# +# ansible-vault is a script that encrypts/decrypts YAML files. See +# http://docs.ansible.com/playbooks_vault.html for more details. + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import sys + +PASSWORD = 'test-vault-password' + + +def main(args): + print(PASSWORD) + return 0 + + +if __name__ == '__main__': + sys.exit(main(sys.argv[:])) diff --git a/test/integration/targets/vault/roles/test_vault_embedded_ids/tasks/main.yml b/test/integration/targets/vault/roles/test_vault_embedded_ids/tasks/main.yml new file mode 100644 index 0000000000..9aeaf24060 --- /dev/null +++ b/test/integration/targets/vault/roles/test_vault_embedded_ids/tasks/main.yml @@ -0,0 +1,29 @@ +--- +- name: set a fact from vault_encrypted_example1_releases + set_fact: + example1_releases: "{{ vault_encrypted_example1_releases }}" + +- name: Assert that a embedded vault of a multiline string with a vault id works + assert: + that: + - "vault_encrypted_example1_releases is defined" + - "example1_releases is defined" + - "example1_releases.startswith('Ansible Releases')" + # - '"{{ vault_encrypted_example1_releases }}" == "Setec Astronomy"' + +- name: Assert that a embedded vault with a different vault id works + assert: + that: + - "vault_encrypted_example2_hello == 'Hello world'" + +- name: Assert that a embedded vault with no vault id and format 1.2 works + assert: + that: + - "vault_encrypted_example3_foobar == 'Foobar'" + #- name: Assert that a multi line embedded vault works, including new line + # assert: + # that: + # - vault_encrypted_var == "Setec Astronomy\n" + +# TODO: add a expected fail here +# - debug: var=vault_encrypted_one_line_var_with_embedded_template diff --git a/test/integration/targets/vault/roles/test_vault_embedded_ids/vars/main.yml b/test/integration/targets/vault/roles/test_vault_embedded_ids/vars/main.yml new file mode 100644 index 0000000000..9c8fa4b225 --- /dev/null +++ b/test/integration/targets/vault/roles/test_vault_embedded_ids/vars/main.yml @@ -0,0 +1,194 @@ +vault_encrypted_example2_hello: !vault | + $ANSIBLE_VAULT;1.2;AES256;example2 + 30383930326535616363383537613266376364323738313835353566633533353364363837383638 + 3737633764613862343666346337353964613138653036610a313663393231386139343835626436 + 66633336303866323335616661366363333463616530326635383836656432396665313338313737 + 6539616630663262650a383762303362356438616261646564303230633930336563373566623235 + 3566 +vault_encrypted_example1_releases: !vault | + $ANSIBLE_VAULT;1.2;AES256;example1 + 63643833646565393535303862343135326261343362396234656137313731313864316539616462 + 3333313439353638393963643535633835643035383331340a393639386166313838326336363032 + 65396565616531663839316132646230316561613865333437653666323034396337626431663931 + 3339363233356438350avault_encrypted_example3_foobar: !vault | + $ANSIBLE_VAULT;1.1;AES256 + 37336431373836376339373763306436396334623061366266353763363766313063363230636138 + 3665663061366436306232323636376261303064616339620a333365323266643364396136626665 + 62363862653134623665326635396563643832636234386266616436626334363839326434383431 + 3330373333366233380a363431386334636164643936313430623661633265346632343331373866 + 3732 +# We dont have a secret for this vaulttext, but nothing references it +# so nothing should ever try to decrypt it. So this is testing that +# we dont require all vaulted vars to be decrypted. +vault_encrypted_example4_unknown_password: !vault | + $ANSIBLE_VAULT;1.1;AES256 + 64316436303566666563393931613833316533346539373635663031376664366131353264366132 + 3637623935356263643639313562366434383234633232660a353636666134353030646539643139 + 65376235333932353531356666363434313066366161383532363166653762326533323233623431 + 3934393962633637330a356337626634343736313339316365373239663031663938353063326665 + 30643339386131663336366531663031383030313936356631613432336338313962 diff --git a/test/integration/targets/vault/roles/test_vault_file_encrypted_embedded/README.md b/test/integration/targets/vault/roles/test_vault_file_encrypted_embedded/README.md new file mode 100644 index 0000000000..4a75cece97 --- /dev/null +++ b/test/integration/targets/vault/roles/test_vault_file_encrypted_embedded/README.md @@ -0,0 +1 @@ +file is encrypted with password of 'test-encrypted-file-password' diff --git a/test/integration/targets/vault/roles/test_vault_file_encrypted_embedded/tasks/main.yml b/test/integration/targets/vault/roles/test_vault_file_encrypted_embedded/tasks/main.yml new file mode 100644 index 0000000000..e09004a1d9 --- /dev/null +++ b/test/integration/targets/vault/roles/test_vault_file_encrypted_embedded/tasks/main.yml @@ -0,0 +1,13 @@ +--- +- name: Assert that a vault encrypted file with embedded vault of a string with no newline works + assert: + that: + - '"{{ vault_file_encrypted_with_encrypted_one_line_var }}" == "Setec Astronomy"' + +- name: Assert that a vault encrypted file with multi line embedded vault works, including new line + assert: + that: + - vault_file_encrypted_with_encrypted_var == "Setec Astronomy\n" + +# TODO: add a expected fail here +# - debug: var=vault_encrypted_one_line_var_with_embedded_template diff --git a/test/integration/targets/vault/roles/test_vault_file_encrypted_embedded/vars/main.yml b/test/integration/targets/vault/roles/test_vault_file_encrypted_embedded/vars/main.yml new file mode 100644 index 0000000000..89cc4a0f8b --- /dev/null +++ b/test/integration/targets/vault/roles/test_vault_file_encrypted_embedded/vars/main.yml @@ -0,0 +1,76 @@ +$ANSIBLE_VAULT;1.1;AES256 +31613535653961393639346266636234373833316530373965356161373735666662613137386466 +3365303539306132613861646362396161323962373839640a653030376530316136643961623665 +65643665616338363432383264363730386538353635663339633932353933653132343430613332 +6136663837306333370adiff --git a/test/integration/targets/vault/runme.sh b/test/integration/targets/vault/runme.sh index c06e50788e..e0a9984454 100755 --- a/test/integration/targets/vault/runme.sh +++ b/test/integration/targets/vault/runme.sh @@ -9,6 +9,9 @@ trap 'rm -rf "${MYTMPDIR}"' EXIT TEST_FILE="${MYTMPDIR}/test_file" echo "This is a test file" > "${TEST_FILE}" +TEST_FILE_1_2="${MYTMPDIR}/test_file_1_2" +echo "This is a test file for format 1.2" > "${TEST_FILE_1_2}" + TEST_FILE_OUTPUT="${MYTMPDIR}/test_file_output" # old format @@ -38,13 +41,111 @@ set -eux # new format, view ansible-vault view "$@" --vault-password-file vault-password format_1_1_AES256.yml +# new format, view with vault-id +ansible-vault view "$@" --vault-id=vault-password format_1_1_AES256.yml + +# new format, view, using password script +ansible-vault view "$@" --vault-password-file password-script.py format_1_1_AES256.yml + +# new format, view, using password script with vault-id +ansible-vault view "$@" --vault-id password-script.py format_1_1_AES256.yml + +# new 1.2 format, view +ansible-vault view "$@" --vault-password-file vault-password format_1_2_AES256.yml + +# new 1.2 format, view with vault-id +ansible-vault view "$@" --vault-id=test_vault_id@vault-password format_1_2_AES256.yml + +# new 1,2 format, view, using password script +ansible-vault view "$@" --vault-password-file password-script.py format_1_2_AES256.yml + +# new 1.2 format, view, using password script with vault-id +ansible-vault view "$@" --vault-id password-script.py format_1_2_AES256.yml + +# new 1.2 format, view, ENFORCE_IDENTITY_MATCH=true, should fail, no 'test_vault_id' vault_id +ANSIBLE_VAULT_ID_MATCH=1 ansible-vault view "$@" --vault-password-file vault-password format_1_2_AES256.yml && : +WRONG_RC=$? +echo "rc was $WRONG_RC (1 is expected)" +[ $WRONG_RC -eq 1 ] + +# new 1.2 format, view with vault-id, ENFORCE_IDENTITY_MATCH=true, should work, 'test_vault_id' is provided +ANSIBLE_VAULT_ID_MATCH=1 ansible-vault view "$@" --vault-id=test_vault_id@vault-password format_1_2_AES256.yml + +# new 1,2 format, view, using password script, ENFORCE_IDENTITY_MATCH=true, should fail, no 'test_vault_id' +ANSIBLE_VAULT_ID_MATCH=1 ansible-vault view "$@" --vault-password-file password-script.py format_1_2_AES256.yml && : +WRONG_RC=$? +echo "rc was $WRONG_RC (1 is expected)" +[ $WRONG_RC -eq 1 ] + + +# new 1.2 format, view, using password script with vault-id, ENFORCE_IDENTITY_MATCH=true, should fail +ANSIBLE_VAULT_ID_MATCH=1 ansible-vault view "$@" --vault-id password-script.py format_1_2_AES256.yml && : +WRONG_RC=$? +echo "rc was $WRONG_RC (1 is expected)" +[ $WRONG_RC -eq 1 ] + +# new 1.2 format, view, using password script with vault-id, ENFORCE_IDENTITY_MATCH=true, 'test_vault_id' provided should work +ANSIBLE_VAULT_ID_MATCH=1 ansible-vault view "$@" --vault-id=test_vault_id@password-script.py format_1_2_AES256.yml + # encrypt it ansible-vault encrypt "$@" --vault-password-file vault-password "${TEST_FILE}" ansible-vault view "$@" --vault-password-file vault-password "${TEST_FILE}" +# view with multiple vault-password files, including a wrong one +ansible-vault view "$@" --vault-password-file vault-password --vault-password-file vault-password-wrong "${TEST_FILE}" + +# view with multiple vault-password files, including a wrong one, using vault-id +ansible-vault view "$@" --vault-id vault-password --vault-id vault-password-wrong "${TEST_FILE}" + +# And with the password files specified in a different order +ansible-vault view "$@" --vault-password-file vault-password-wrong --vault-password-file vault-password "${TEST_FILE}" + +# And with the password files specified in a different order, using vault-id +ansible-vault view "$@" --vault-id vault-password-wrong --vault-id vault-password "${TEST_FILE}" + +# And with the password files specified in a different order, using --vault-id and non default vault_ids +ansible-vault view "$@" --vault-id test_vault_id@vault-password-wrong --vault-id test_vault_id@vault-password "${TEST_FILE}" + ansible-vault decrypt "$@" --vault-password-file vault-password "${TEST_FILE}" +# encrypt it, using a vault_id so we write a 1.2 format file +ansible-vault encrypt "$@" --vault-id test_vault_1_2@vault-password "${TEST_FILE_1_2}" + +ansible-vault view "$@" --vault-id vault-password "${TEST_FILE_1_2}" +ansible-vault view "$@" --vault-id test_vault_1_2@vault-password "${TEST_FILE_1_2}" + +# view with multiple vault-password files, including a wrong one +ansible-vault view "$@" --vault-id vault-password --vault-id wrong_password@vault-password-wrong "${TEST_FILE_1_2}" + +# And with the password files specified in a different order, using vault-id +ansible-vault view "$@" --vault-id vault-password-wrong --vault-id vault-password "${TEST_FILE_1_2}" + +# And with the password files specified in a different order, using --vault-id and non default vault_ids +ansible-vault view "$@" --vault-id test_vault_id@vault-password-wrong --vault-id test_vault_id@vault-password "${TEST_FILE_1_2}" + +ansible-vault decrypt "$@" --vault-id test_vault_1_2@vault-password "${TEST_FILE_1_2}" + +# multiple vault passwords +ansible-vault view "$@" --vault-password-file vault-password --vault-password-file vault-password-wrong format_1_1_AES256.yml + +# multiple vault passwords, --vault-id +ansible-vault view "$@" --vault-id test_vault_id@vault-password --vault-id test_vault_id@vault-password-wrong format_1_1_AES256.yml + +# encrypt it, with password from password script +ansible-vault encrypt "$@" --vault-password-file password-script.py "${TEST_FILE}" + +ansible-vault view "$@" --vault-password-file password-script.py "${TEST_FILE}" + +ansible-vault decrypt "$@" --vault-password-file password-script.py "${TEST_FILE}" + +# encrypt it, with password from password script +ansible-vault encrypt "$@" --vault-id test_vault_id@password-script.py "${TEST_FILE}" + +ansible-vault view "$@" --vault-id test_vault_id@password-script.py "${TEST_FILE}" + +ansible-vault decrypt "$@" --vault-id test_vault_id@password-script.py "${TEST_FILE}" + # new password file for rekeyed file NEW_VAULT_PASSWORD="${MYTMPDIR}/new-vault-password" echo "newpassword" > "${NEW_VAULT_PASSWORD}" @@ -55,6 +156,18 @@ ansible-vault rekey "$@" --vault-password-file vault-password --new-vault-passwo ansible-vault view "$@" --vault-password-file "${NEW_VAULT_PASSWORD}" "${TEST_FILE}" +# view with old password file and new password file +ansible-vault view "$@" --vault-password-file "${NEW_VAULT_PASSWORD}" --vault-password-file vault-password "${TEST_FILE}" + +# view with old password file and new password file, different order +ansible-vault view "$@" --vault-password-file vault-password --vault-password-file "${NEW_VAULT_PASSWORD}" "${TEST_FILE}" + +# view with old password file and new password file and another wrong +ansible-vault view "$@" --vault-password-file "${NEW_VAULT_PASSWORD}" --vault-password-file vault-password-wrong --vault-password-file vault-password "${TEST_FILE}" + +# view with old password file and new password file and another wrong, using --vault-id +ansible-vault view "$@" --vault-id "tmp_new_password@${NEW_VAULT_PASSWORD}" --vault-id wrong_password@vault-password-wrong --vault-id myorg@vault-password "${TEST_FILE}" + ansible-vault decrypt "$@" --vault-password-file "${NEW_VAULT_PASSWORD}" "${TEST_FILE}" # reading/writing to/from stdin/stdin (See https://github.com/ansible/ansible/issues/23567) @@ -66,6 +179,10 @@ ansible-vault encrypt_string "$@" --vault-password-file "${NEW_VAULT_PASSWORD}" ansible-vault encrypt_string "$@" --vault-password-file "${NEW_VAULT_PASSWORD}" --name "blippy" "a test string names blippy" +ansible-vault encrypt_string "$@" --vault-id "${NEW_VAULT_PASSWORD}" "a test string" + +ansible-vault encrypt_string "$@" --vault-id "${NEW_VAULT_PASSWORD}" --name "blippy" "a test string names blippy" + # from stdin ansible-vault encrypt_string "$@" --vault-password-file "${NEW_VAULT_PASSWORD}" < "${TEST_FILE}" @@ -85,3 +202,69 @@ ansible-playbook test_vault_embedded.yml -i ../../inventory -v "$@" --vault-pass ansible-playbook test_vault_embedded.yml -i ../../inventory -v "$@" --vault-password-file vault-password ansible-playbook test_vaulted_inventory.yml -i vaulted.inventory -v "$@" --vault-password-file vault-password ansible-playbook test_vaulted_template.yml -i ../../inventory -v "$@" --vault-password-file vault-password + +# test with password from password script +ansible-playbook test_vault.yml -i ../../inventory -v "$@" --vault-password-file password-script.py +ansible-playbook test_vault_embedded.yml -i ../../inventory -v "$@" --vault-password-file password-script.py + +# with multiple password files +ansible-playbook test_vault.yml -i ../../inventory -v "$@" --vault-password-file vault-password --vault-password-file vault-password-wrong +ansible-playbook test_vault.yml -i ../../inventory -v "$@" --vault-password-file vault-password-wrong --vault-password-file vault-password + +ansible-playbook test_vault_embedded.yml -i ../../inventory -v "$@" --vault-password-file vault-password --vault-password-file vault-password-wrong --syntax-check +ansible-playbook test_vault_embedded.yml -i ../../inventory -v "$@" --vault-password-file vault-password-wrong --vault-password-file vault-password + +# test that we can have a vault encrypted yaml file that includes embedded vault vars +# that were encrypted with a different vault secret +ansible-playbook test_vault_file_encrypted_embedded.yml -i ../../inventory "$@" --vault-id encrypted_file_encrypted_var_password --vault-id vault-password + +# with multiple password files, --vault-id, ordering +ansible-playbook test_vault.yml -i ../../inventory -v "$@" --vault-id vault-password --vault-id vault-password-wrong +ansible-playbook test_vault.yml -i ../../inventory -v "$@" --vault-id vault-password-wrong --vault-id vault-password + +ansible-playbook test_vault_embedded.yml -i ../../inventory -v "$@" --vault-id vault-password --vault-id vault-password-wrong --syntax-check +ansible-playbook test_vault_embedded.yml -i ../../inventory -v "$@" --vault-id vault-password-wrong --vault-id vault-password + +# test with multiple password files, including a script, and a wrong password +ansible-playbook test_vault_embedded.yml -i ../../inventory -v "$@" --vault-password-file vault-password-wrong --vault-password-file password-script.py --vault-password-file vault-password + +# test with multiple password files, including a script, and a wrong password, and a mix of --vault-id and --vault-password-file +ansible-playbook test_vault_embedded.yml -i ../../inventory -v "$@" --vault-password-file vault-password-wrong --vault-id password-script.py --vault-id vault-password + +# test with multiple password files, including a script, and a wrong password, and a mix of --vault-id and --vault-password-file +ansible-playbook test_vault_embedded_ids.yml -i ../../inventory -v "$@" \ + --vault-password-file vault-password-wrong \ + --vault-id password-script.py --vault-id example1@example1_password \ + --vault-id example2@example2_password --vault-password-file example3_password \ + --vault-id vault-password + +# with wrong password +ansible-playbook test_vault.yml -i ../../inventory -v "$@" --vault-password-file vault-password-wrong && : +WRONG_RC=$? +echo "rc was $WRONG_RC (1 is expected)" +[ $WRONG_RC -eq 1 ] + +# with multiple wrong passwords +ansible-playbook test_vault.yml -i ../../inventory -v "$@" --vault-password-file vault-password-wrong --vault-password-file vault-password-wrong && : +WRONG_RC=$? +echo "rc was $WRONG_RC (1 is expected)" +[ $WRONG_RC -eq 1 ] + +# with wrong password, --vault-id +ansible-playbook test_vault.yml -i ../../inventory -v "$@" --vault-id vault-password-wrong && : +WRONG_RC=$? +echo "rc was $WRONG_RC (1 is expected)" +[ $WRONG_RC -eq 1 ] + +# with multiple wrong passwords with --vault-id +ansible-playbook test_vault.yml -i ../../inventory -v "$@" --vault-id vault-password-wrong --vault-id vault-password-wrong && : +WRONG_RC=$? +echo "rc was $WRONG_RC (1 is expected)" +[ $WRONG_RC -eq 1 ] + +# with multiple wrong passwords with --vault-id +ansible-playbook test_vault.yml -i ../../inventory -v "$@" --vault-id wrong1@vault-password-wrong --vault-id wrong2@vault-password-wrong && : +WRONG_RC=$? +echo "rc was $WRONG_RC (1 is expected)" +[ $WRONG_RC -eq 1 ] + diff --git a/test/integration/targets/vault/test_vault_embedded_ids.yml b/test/integration/targets/vault/test_vault_embedded_ids.yml new file mode 100644 index 0000000000..23ebbb969f --- /dev/null +++ b/test/integration/targets/vault/test_vault_embedded_ids.yml @@ -0,0 +1,4 @@ +- hosts: testhost + gather_facts: False + roles: + - { role: test_vault_embedded_ids, tags: test_vault_embedded_ids} diff --git a/test/integration/targets/vault/test_vault_file_encrypted_embedded.yml b/test/integration/targets/vault/test_vault_file_encrypted_embedded.yml new file mode 100644 index 0000000000..685d20ef7d --- /dev/null +++ b/test/integration/targets/vault/test_vault_file_encrypted_embedded.yml @@ -0,0 +1,4 @@ +- hosts: testhost + gather_facts: False + roles: + - { role: test_vault_file_encrypted_embedded, tags: test_vault_file_encrypted_embedded} diff --git a/test/units/cli/test_cli.py b/test/units/cli/test_cli.py index 69e929b61b..4692f128ba 100644 --- a/test/units/cli/test_cli.py +++ b/test/units/cli/test_cli.py @@ -20,8 +20,12 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch, MagicMock + +from units.mock.loader import DictDataLoader from ansible.release import __version__ +from ansible.parsing import vault from ansible import cli @@ -39,3 +43,43 @@ class TestCliVersion(unittest.TestCase): def test_version_info_gitinfo(self): version_info = cli.CLI.version_info(gitinfo=True) self.assertIn('python version', version_info['string']) + + +class TestCliSetupVaultSecrets(unittest.TestCase): + def test(self): + res = cli.CLI.setup_vault_secrets(None, None) + self.assertIsInstance(res, list) + + @patch('ansible.cli.get_file_vault_secret') + def test_password_file(self, mock_file_secret): + filename = '/dev/null/secret' + mock_file_secret.return_value = MagicMock(bytes=b'file1_password', + vault_id='file1', + filename=filename) + fake_loader = DictDataLoader({}) + res = cli.CLI.setup_vault_secrets(loader=fake_loader, + vault_ids=['secret1@%s' % filename, 'secret2'], + vault_password_files=[filename]) + self.assertIsInstance(res, list) + matches = vault.match_secrets(res, ['secret1']) + self.assertIn('secret1', [x[0] for x in matches]) + match = matches[0][1] + self.assertEqual(match.bytes, b'file1_password') + + @patch('ansible.cli.PromptVaultSecret') + def test_prompt(self, mock_prompt_secret): + mock_prompt_secret.return_value = MagicMock(bytes=b'prompt1_password', + vault_id='prompt1') + + fake_loader = DictDataLoader({}) + res = cli.CLI.setup_vault_secrets(loader=fake_loader, + vault_ids=['prompt1@prompt'], + ask_vault_pass=True) + + self.assertIsInstance(res, list) + matches = vault.match_secrets(res, ['prompt1']) + self.assertIn('prompt1', [x[0] for x in matches]) + # self.assertIn('prompt1', res) + # self.assertIn('secret1', res) + match = matches[0][1] + self.assertEqual(match.bytes, b'prompt1_password') diff --git a/test/units/cli/test_vault.py b/test/units/cli/test_vault.py new file mode 100644 index 0000000000..4f703121a5 --- /dev/null +++ b/test/units/cli/test_vault.py @@ -0,0 +1,166 @@ +# (c) 2017, Adrian Likins +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.compat.tests import unittest +from ansible.compat.tests.mock import patch +from units.mock.vault_helper import TextVaultSecret + +from ansible import errors +from ansible.cli.vault import VaultCLI + + +# TODO: make these tests assert something, likely by verifing +# mock calls + + +class TestVaultCli(unittest.TestCase): + def test_parse_empty(self): + cli = VaultCLI([]) + self.assertRaisesRegexp(errors.AnsibleOptionsError, + '.*Missing required action.*', + cli.parse) + + # FIXME: something weird seems to be afoot when parsing actions + # cli = VaultCLI(args=['view', '/dev/null/foo', 'mysecret3']) + # will skip '/dev/null/foo'. something in cli.CLI.set_action() ? + # maybe we self.args gets modified in a loop? + def test_parse_view_file(self): + cli = VaultCLI(args=['ansible-vault', 'view', '/dev/null/foo']) + cli.parse() + + def test_view_missing_file_no_secret(self): + cli = VaultCLI(args=['ansible-vault', 'view', '/dev/null/foo']) + cli.parse() + self.assertRaisesRegexp(errors.AnsibleOptionsError, + "A vault password is required to use Ansible's Vault", + cli.run) + + def test_encrypt_missing_file_no_secret(self): + cli = VaultCLI(args=['ansible-vault', 'encrypt', '/dev/null/foo']) + cli.parse() + self.assertRaisesRegexp(errors.AnsibleOptionsError, + "A vault password is required to use Ansible's Vault", + cli.run) + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + def test_encrypt(self, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', 'encrypt', '/dev/null/foo']) + cli.parse() + cli.run() + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + def test_encrypt_string(self, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', 'encrypt_string', + 'some string to encrypt']) + cli.parse() + cli.run() + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + @patch('ansible.cli.vault.display.prompt', return_value='a_prompt') + def test_encrypt_string_prompt(self, mock_display, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', + 'encrypt_string', + '--prompt', + 'some string to encrypt']) + cli.parse() + cli.run() + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + @patch('ansible.cli.vault.sys.stdin.read', return_value='This is data from stdin') + def test_encrypt_string_stdin(self, mock_stdin_read, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', + 'encrypt_string', + '--stdin-name', + 'the_var_from_stdin', + '-']) + cli.parse() + cli.run() + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + def test_encrypt_string_names(self, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', 'encrypt_string', + '--name', 'foo1', + '--name', 'foo2', + 'some string to encrypt']) + cli.parse() + cli.run() + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + def test_encrypt_string_more_args_than_names(self, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', 'encrypt_string', + '--name', 'foo1', + 'some string to encrypt', + 'other strings', + 'a few more string args']) + cli.parse() + cli.run() + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + def test_create(self, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', 'create', '/dev/null/foo']) + cli.parse() + cli.run() + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + def test_edit(self, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', 'edit', '/dev/null/foo']) + cli.parse() + cli.run() + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + def test_decrypt(self, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', 'decrypt', '/dev/null/foo']) + cli.parse() + cli.run() + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + def test_view(self, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', 'view', '/dev/null/foo']) + cli.parse() + cli.run() + + @patch('ansible.cli.vault.VaultCLI.setup_vault_secrets') + @patch('ansible.cli.vault.VaultEditor') + def test_rekey(self, mock_vault_editor, mock_setup_vault_secrets): + mock_setup_vault_secrets.return_value = [('default', TextVaultSecret('password'))] + cli = VaultCLI(args=['ansible-vault', 'rekey', '/dev/null/foo']) + cli.parse() + cli.run() diff --git a/test/units/mock/loader.py b/test/units/mock/loader.py index e120fe00ee..d05dd14505 100644 --- a/test/units/mock/loader.py +++ b/test/units/mock/loader.py @@ -35,6 +35,7 @@ class DictDataLoader(DataLoader): self._file_mapping = file_mapping self._build_known_directories() + self._vault_secrets = None def load_from_file(self, path, unsafe=False): if path in self._file_mapping: @@ -98,3 +99,6 @@ class DictDataLoader(DataLoader): def get_basedir(self): return os.getcwd() + + def set_vault_secrets(self, vault_secrets): + self._vault_secrets = vault_secrets diff --git a/test/units/mock/vault_helper.py b/test/units/mock/vault_helper.py new file mode 100644 index 0000000000..dcce9c7841 --- /dev/null +++ b/test/units/mock/vault_helper.py @@ -0,0 +1,39 @@ +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.module_utils._text import to_bytes + +from ansible.parsing.vault import VaultSecret + + +class TextVaultSecret(VaultSecret): + '''A secret piece of text. ie, a password. Tracks text encoding. + + The text encoding of the text may not be the default text encoding so + we keep track of the encoding so we encode it to the same bytes.''' + + def __init__(self, text, encoding=None, errors=None, _bytes=None): + super(TextVaultSecret, self).__init__() + self.text = text + self.encoding = encoding or 'utf-8' + self._bytes = _bytes + self.errors = errors or 'strict' + + @property + def bytes(self): + '''The text encoded with encoding, unless we specifically set _bytes.''' + return self._bytes or to_bytes(self.text, encoding=self.encoding, errors=self.errors) diff --git a/test/units/parsing/test_dataloader.py b/test/units/parsing/test_dataloader.py index 92d930cfbf..a7a16157c3 100644 --- a/test/units/parsing/test_dataloader.py +++ b/test/units/parsing/test_dataloader.py @@ -26,6 +26,8 @@ from ansible.compat.tests.mock import patch, mock_open from ansible.errors import AnsibleParserError, yaml_strings from ansible.module_utils._text import to_text from ansible.module_utils.six import PY3 + +from units.mock.vault_helper import TextVaultSecret from ansible.parsing.dataloader import DataLoader from units.mock.path import mock_unfrackpath_noop @@ -118,7 +120,8 @@ class TestDataLoaderWithVault(unittest.TestCase): def setUp(self): self._loader = DataLoader() - self._loader.set_vault_password('ansible') + vault_secrets = [('default', TextVaultSecret('ansible'))] + self._loader.set_vault_secrets(vault_secrets) def tearDown(self): pass diff --git a/test/units/parsing/vault/test_vault.py b/test/units/parsing/vault/test_vault.py index c657e82936..befb4a0e5a 100644 --- a/test/units/parsing/vault/test_vault.py +++ b/test/units/parsing/vault/test_vault.py @@ -24,6 +24,7 @@ __metaclass__ = type import binascii import io import os +import tempfile from binascii import hexlify import pytest @@ -33,9 +34,141 @@ from ansible.compat.tests import unittest from ansible import errors from ansible.module_utils import six from ansible.module_utils._text import to_bytes, to_text -from ansible.parsing.vault import VaultLib from ansible.parsing import vault +from units.mock.loader import DictDataLoader +from units.mock.vault_helper import TextVaultSecret + + +class TestVaultSecret(unittest.TestCase): + def test(self): + secret = vault.VaultSecret() + secret.load() + self.assertIsNone(secret._bytes) + + def test_bytes(self): + some_text = u'私はガラスを食べられます。それは私を傷つけません。' + _bytes = to_bytes(some_text) + secret = vault.VaultSecret(_bytes) + secret.load() + self.assertEqual(secret.bytes, _bytes) + + +class TestPromptVaultSecret(unittest.TestCase): + def test_empty_prompt_formats(self): + secret = vault.PromptVaultSecret(vault_id='test_id', prompt_formats=[]) + secret.load() + self.assertIsNone(secret._bytes) + + +class TestFileVaultSecret(unittest.TestCase): + def test(self): + secret = vault.FileVaultSecret() + self.assertIsNone(secret._bytes) + self.assertIsNone(secret._text) + + def test_repr_empty(self): + secret = vault.FileVaultSecret() + self.assertEqual(repr(secret), "FileVaultSecret()") + + def test_repr(self): + tmp_file = tempfile.NamedTemporaryFile(delete=False) + fake_loader = DictDataLoader({tmp_file.name: 'sdfadf'}) + + secret = vault.FileVaultSecret(loader=fake_loader, filename=tmp_file.name) + filename = tmp_file.name + tmp_file.close() + self.assertEqual(repr(secret), "FileVaultSecret(filename='%s')" % filename) + + def test_empty_bytes(self): + secret = vault.FileVaultSecret() + self.assertIsNone(secret.bytes) + + def test_file(self): + password = 'some password' + + tmp_file = tempfile.NamedTemporaryFile(delete=False) + tmp_file.write(to_bytes(password)) + tmp_file.close() + + fake_loader = DictDataLoader({tmp_file.name: 'sdfadf'}) + + secret = vault.FileVaultSecret(loader=fake_loader, filename=tmp_file.name) + secret.load() + + os.unlink(tmp_file.name) + + self.assertEqual(secret.bytes, to_bytes(password)) + + def test_file_not_a_directory(self): + filename = '/dev/null/foobar' + fake_loader = DictDataLoader({filename: 'sdfadf'}) + + secret = vault.FileVaultSecret(loader=fake_loader, filename=filename) + self.assertRaisesRegexp(errors.AnsibleError, + '.*Could not read vault password file.*/dev/null/foobar.*Not a directory', + secret.load) + + def test_file_not_found(self): + tmp_file = tempfile.NamedTemporaryFile() + filename = tmp_file.name + tmp_file.close() + + fake_loader = DictDataLoader({filename: 'sdfadf'}) + + secret = vault.FileVaultSecret(loader=fake_loader, filename=filename) + self.assertRaisesRegexp(errors.AnsibleError, + '.*Could not read vault password file.*%s.*' % filename, + secret.load) + + +class TestScriptVaultSecret(unittest.TestCase): + def test(self): + secret = vault.ScriptVaultSecret() + self.assertIsNone(secret._bytes) + self.assertIsNone(secret._text) + + +class TestGetFileVaultSecret(unittest.TestCase): + def test_file(self): + password = 'some password' + + tmp_file = tempfile.NamedTemporaryFile(delete=False) + tmp_file.write(to_bytes(password)) + tmp_file.close() + + fake_loader = DictDataLoader({tmp_file.name: 'sdfadf'}) + + secret = vault.get_file_vault_secret(filename=tmp_file.name, loader=fake_loader) + secret.load() + + os.unlink(tmp_file.name) + + self.assertEqual(secret.bytes, to_bytes(password)) + + def test_file_not_a_directory(self): + filename = '/dev/null/foobar' + fake_loader = DictDataLoader({filename: 'sdfadf'}) + + self.assertRaisesRegexp(errors.AnsibleError, + '.*The vault password file %s was not found.*' % filename, + vault.get_file_vault_secret, + filename=filename, + loader=fake_loader) + + def test_file_not_found(self): + tmp_file = tempfile.NamedTemporaryFile() + filename = tmp_file.name + tmp_file.close() + + fake_loader = DictDataLoader({filename: 'sdfadf'}) + + self.assertRaisesRegexp(errors.AnsibleError, + '.*The vault password file %s was not found.*' % filename, + vault.get_file_vault_secret, + filename=filename, + loader=fake_loader) + class TestVaultIsEncrypted(unittest.TestCase): def test_bytes_not_encrypted(self): @@ -251,11 +384,59 @@ class TestVaultCipherAes256PyCrypto(TestVaultCipherAes256): super(TestVaultCipherAes256PyCrypto, self).tearDown() +class TestMatchSecrets(unittest.TestCase): + def test_empty_tuple(self): + secrets = [tuple()] + vault_ids = ['vault_id_1'] + self.assertRaises(ValueError, + vault.match_secrets, + secrets, vault_ids) + + def test_empty_secrets(self): + matches = vault.match_secrets([], ['vault_id_1']) + self.assertEqual(matches, []) + + def test_single_match(self): + secret = TextVaultSecret('password') + matches = vault.match_secrets([('default', secret)], ['default']) + self.assertEquals(matches, [('default', secret)]) + + def test_no_matches(self): + secret = TextVaultSecret('password') + matches = vault.match_secrets([('default', secret)], ['not_default']) + self.assertEquals(matches, []) + + def test_multiple_matches(self): + secrets = [('vault_id1', TextVaultSecret('password1')), + ('vault_id2', TextVaultSecret('password2')), + ('vault_id1', TextVaultSecret('password3')), + ('vault_id4', TextVaultSecret('password4'))] + vault_ids = ['vault_id1', 'vault_id4'] + matches = vault.match_secrets(secrets, vault_ids) + + self.assertEqual(len(matches), 3) + expected = [('vault_id1', TextVaultSecret('password1')), + ('vault_id1', TextVaultSecret('password3')), + ('vault_id4', TextVaultSecret('password4'))] + self.assertEqual([x for x, y in matches], + [a for a, b in expected]) + + @pytest.mark.skipif(not vault.HAS_CRYPTOGRAPHY, reason="Skipping cryptography tests because cryptography is not installed") class TestVaultLib(unittest.TestCase): def setUp(self): - self.v = VaultLib('test-vault-password') + self.vault_password = "test-vault-password" + text_secret = TextVaultSecret(self.vault_password) + self.vault_secrets = [('default', text_secret), + ('test_id', text_secret)] + self.v = vault.VaultLib(self.vault_secrets) + + def _vault_secrets(self, vault_id, secret): + return [(vault_id, secret)] + + def _vault_secrets_from_password(self, vault_id, password): + return [(vault_id, TextVaultSecret(password))] def test_encrypt(self): plaintext = u'Some text to encrypt in a café' @@ -266,6 +447,15 @@ class TestVaultLib(unittest.TestCase): b_header = b'$ANSIBLE_VAULT;1.1;AES256\n' self.assertEqual(b_vaulttext[:len(b_header)], b_header) + def test_encrypt_vault_id(self): + plaintext = u'Some text to encrypt in a café' + b_vaulttext = self.v.encrypt(plaintext, vault_id='test_id') + + self.assertIsInstance(b_vaulttext, six.binary_type) + + b_header = b'$ANSIBLE_VAULT;1.2;AES256;test_id\n' + self.assertEqual(b_vaulttext[:len(b_header)], b_header) + def test_encrypt_bytes(self): plaintext = to_bytes(u'Some text to encrypt in a café') @@ -276,38 +466,60 @@ class TestVaultLib(unittest.TestCase): b_header = b'$ANSIBLE_VAULT;1.1;AES256\n' self.assertEqual(b_vaulttext[:len(b_header)], b_header) + def test_encrypt_no_secret_empty_secrets(self): + vault_secrets = [] + v = vault.VaultLib(vault_secrets) + + plaintext = u'Some text to encrypt in a café' + self.assertRaisesRegexp(vault.AnsibleVaultError, + '.*A vault password must be specified to encrypt data.*', + v.encrypt, + plaintext) + def test_is_encrypted(self): self.assertFalse(self.v.is_encrypted(b"foobar"), msg="encryption check on plaintext yielded false positive") b_data = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible") self.assertTrue(self.v.is_encrypted(b_data), msg="encryption check on headered text failed") - def test_format_output(self): - self.v.cipher_name = "TEST" + def test_format_vaulttext_envelope(self): + cipher_name = "TEST" b_ciphertext = b"ansible" - b_vaulttext = self.v._format_output(b_ciphertext) + b_vaulttext = vault.format_vaulttext_envelope(b_ciphertext, + cipher_name, + version=self.v.b_version, + vault_id='default') b_lines = b_vaulttext.split(b'\n') self.assertGreater(len(b_lines), 1, msg="failed to properly add header") b_header = b_lines[0] - self.assertTrue(b_header.endswith(b';TEST'), msg="header does not end with cipher name") + # self.assertTrue(b_header.endswith(b';TEST'), msg="header does not end with cipher name") b_header_parts = b_header.split(b';') - self.assertEqual(len(b_header_parts), 3, msg="header has the wrong number of parts") + self.assertEqual(len(b_header_parts), 4, msg="header has the wrong number of parts") self.assertEqual(b_header_parts[0], b'$ANSIBLE_VAULT', msg="header does not start with $ANSIBLE_VAULT") self.assertEqual(b_header_parts[1], self.v.b_version, msg="header version is incorrect") self.assertEqual(b_header_parts[2], b'TEST', msg="header does not end with cipher name") - def test_split_header(self): + # And just to verify, lets parse the results and compare + b_ciphertext2, b_version2, cipher_name2, vault_id2 = \ + vault.parse_vaulttext_envelope(b_vaulttext) + self.assertEqual(b_ciphertext, b_ciphertext2) + self.assertEqual(self.v.b_version, b_version2) + self.assertEqual(cipher_name, cipher_name2) + self.assertEqual('default', vault_id2) + + def test_parse_vaulttext_envelope(self): b_vaulttext = b"$ANSIBLE_VAULT;9.9;TEST\nansible" - b_ciphertext = self.v._split_header(b_vaulttext) + b_ciphertext, b_version, cipher_name, vault_id = vault.parse_vaulttext_envelope(b_vaulttext) b_lines = b_ciphertext.split(b'\n') self.assertEqual(b_lines[0], b"ansible", msg="Payload was not properly split from the header") - self.assertEqual(self.v.cipher_name, u'TEST', msg="cipher name was not properly set") - self.assertEqual(self.v.b_version, b"9.9", msg="version was not properly set") + self.assertEqual(cipher_name, u'TEST', msg="cipher name was not properly set") + self.assertEqual(b_version, b"9.9", msg="version was not properly set") def test_encrypt_decrypt_aes(self): self.v.cipher_name = u'AES' - self.v.b_password = b'ansible' + vault_secrets = self._vault_secrets_from_password('default', 'ansible') + self.v.secrets = vault_secrets # AES encryption code has been removed, so this is old output for # AES-encrypted 'foobar' with password 'ansible'. b_vaulttext = b'''$ANSIBLE_VAULT;1.1;AES @@ -326,6 +538,67 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e self.assertNotEqual(b_vaulttext, b"foobar", msg="encryption failed") self.assertEqual(b_plaintext, b"foobar", msg="decryption failed") + def test_encrypt_decrypt_aes256_none_secrets(self): + vault_secrets = self._vault_secrets_from_password('default', 'ansible') + v = vault.VaultLib(vault_secrets) + + plaintext = u"foobar" + b_vaulttext = v.encrypt(plaintext) + + # VaultLib will default to empty {} if secrets is None + v_none = vault.VaultLib(None) + # so set secrets None explicitly + v_none.secrets = None + self.assertRaisesRegexp(vault.AnsibleVaultError, + '.*A vault password must be specified to decrypt data.*', + v_none.decrypt, + b_vaulttext) + + def test_encrypt_decrypt_aes256_empty_secrets(self): + vault_secrets = self._vault_secrets_from_password('default', 'ansible') + v = vault.VaultLib(vault_secrets) + + plaintext = u"foobar" + b_vaulttext = v.encrypt(plaintext) + + vault_secrets_empty = [] + v_none = vault.VaultLib(vault_secrets_empty) + + self.assertRaisesRegexp(vault.AnsibleVaultError, + '.*Attempting to decrypt but no vault secrets found.*', + v_none.decrypt, + b_vaulttext) + + def test_encrypt_decrypt_aes256_multiple_secrets_all_wrong(self): + plaintext = u'Some text to encrypt in a café' + b_vaulttext = self.v.encrypt(plaintext) + + vault_secrets = [('default', TextVaultSecret('another-wrong-password')), + ('wrong-password', TextVaultSecret('wrong-password'))] + + v_multi = vault.VaultLib(vault_secrets) + self.assertRaisesRegexp(errors.AnsibleError, + '.*Decryption failed.*', + v_multi.decrypt, + b_vaulttext, + filename='/dev/null/fake/filename') + + def test_encrypt_decrypt_aes256_multiple_secrets_one_valid(self): + plaintext = u'Some text to encrypt in a café' + b_vaulttext = self.v.encrypt(plaintext) + + correct_secret = TextVaultSecret(self.vault_password) + wrong_secret = TextVaultSecret('wrong-password') + + vault_secrets = [('default', wrong_secret), + ('corect_secret', correct_secret), + ('wrong_secret', wrong_secret)] + + v_multi = vault.VaultLib(vault_secrets) + b_plaintext = v_multi.decrypt(b_vaulttext) + self.assertNotEqual(b_vaulttext, to_bytes(plaintext), msg="encryption failed") + self.assertEqual(b_plaintext, to_bytes(plaintext), msg="decryption failed") + def test_encrypt_decrypt_aes256_existing_vault(self): self.v.cipher_name = u'AES256' b_orig_plaintext = b"Setec Astronomy" @@ -380,6 +653,27 @@ fe3db930508b65e0ff5947e4386b79af8ab094017629590ef6ba486814cf70f8e4ab0ed0c7d2587e # assert we throw an error self.v.decrypt(b_invalid_ciphertext) + def test_decrypt_non_default_1_2(self): + b_expected_plaintext = to_bytes('foo bar\n') + vaulttext = '''$ANSIBLE_VAULT;1.2;AES256;ansible_devel +65616435333934613466373335363332373764363365633035303466643439313864663837393234 +3330656363343637313962633731333237313636633534630a386264363438363362326132363239 +39363166646664346264383934393935653933316263333838386362633534326664646166663736 +6462303664383765650a356637643633366663643566353036303162386237336233393065393164 +6264''' + + vault_secrets = self._vault_secrets_from_password('default', 'ansible') + v = vault.VaultLib(vault_secrets) + + b_vaulttext = to_bytes(vaulttext) + + b_plaintext = v.decrypt(b_vaulttext) + self.assertEqual(b_expected_plaintext, b_plaintext) + + b_ciphertext, b_version, cipher_name, vault_id = vault.parse_vaulttext_envelope(b_vaulttext) + self.assertEqual('ansible_devel', vault_id) + self.assertEqual(b'1.2', b_version) + def test_encrypt_encrypted(self): self.v.cipher_name = u'AES' b_vaulttext = b"$ANSIBLE_VAULT;9.9;TEST\n%s" % hexlify(b"ansible") diff --git a/test/units/parsing/vault/test_vault_editor.py b/test/units/parsing/vault/test_vault_editor.py index e216b67a54..30c68f440d 100644 --- a/test/units/parsing/vault/test_vault_editor.py +++ b/test/units/parsing/vault/test_vault_editor.py @@ -30,8 +30,11 @@ from ansible.compat.tests.mock import patch from ansible import errors from ansible.parsing import vault +from ansible.parsing.vault import VaultLib, VaultEditor, match_encrypt_secret + from ansible.module_utils._text import to_bytes, to_text +from units.mock.vault_helper import TextVaultSecret v10_data = """$ANSIBLE_VAULT;1.0;AES 53616c7465645f5fd0026926a2d415a28a2622116273fbc90e377225c12a347e1daf4456d36a77f9 @@ -52,6 +55,14 @@ class TestVaultEditor(unittest.TestCase): def setUp(self): self._test_dir = None + self.vault_password = "test-vault-password" + vault_secret = TextVaultSecret(self.vault_password) + self.vault_secrets = [('vault_secret', vault_secret), + ('default', vault_secret)] + + @property + def vault_secret(self): + return match_encrypt_secret(self.vault_secrets)[1] def tearDown(self): if self._test_dir: @@ -59,6 +70,11 @@ class TestVaultEditor(unittest.TestCase): # shutil.rmtree(self._test_dir) self._test_dir = None + def _secrets(self, password): + vault_secret = TextVaultSecret(password) + vault_secrets = [('default', vault_secret)] + return vault_secrets + def test_methods_exist(self): v = vault.VaultEditor(None) slots = ['create_file', @@ -83,6 +99,11 @@ class TestVaultEditor(unittest.TestCase): opened_file.close() return file_path + def _vault_editor(self, vault_secrets=None): + if vault_secrets is None: + vault_secrets = self._secrets(self.vault_password) + return VaultEditor(VaultLib(vault_secrets)) + @patch('ansible.parsing.vault.call') def test_edit_file_helper_empty_target(self, mock_sp_call): self._test_dir = self._create_test_dir() @@ -91,9 +112,9 @@ class TestVaultEditor(unittest.TestCase): src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents) mock_sp_call.side_effect = self._faux_command - ve = vault.VaultEditor('password') + ve = self._vault_editor() - b_ciphertext = ve._edit_file_helper(src_file_path) + b_ciphertext = ve._edit_file_helper(src_file_path, self.vault_secret) self.assertNotEqual(src_contents, b_ciphertext) @@ -107,12 +128,13 @@ class TestVaultEditor(unittest.TestCase): error_txt = 'calling editor raised an exception' mock_sp_call.side_effect = errors.AnsibleError(error_txt) - ve = vault.VaultEditor('password') + ve = self._vault_editor() self.assertRaisesRegexp(errors.AnsibleError, error_txt, ve._edit_file_helper, - src_file_path) + src_file_path, + self.vault_secret) @patch('ansible.parsing.vault.call') def test_edit_file_helper_symlink_target(self, mock_sp_call): @@ -126,9 +148,9 @@ class TestVaultEditor(unittest.TestCase): os.symlink(src_file_path, src_file_link_path) mock_sp_call.side_effect = self._faux_command - ve = vault.VaultEditor('password') + ve = self._vault_editor() - b_ciphertext = ve._edit_file_helper(src_file_link_path) + b_ciphertext = ve._edit_file_helper(src_file_link_path, self.vault_secret) self.assertNotEqual(src_file_contents, b_ciphertext, 'b_ciphertext should be encrypted and not equal to src_contents') @@ -160,9 +182,9 @@ class TestVaultEditor(unittest.TestCase): self._faux_editor(editor_args, src_file_contents) mock_sp_call.side_effect = faux_editor - ve = vault.VaultEditor('password') + ve = self._vault_editor() - ve._edit_file_helper(src_file_path, existing_data=src_file_contents) + ve._edit_file_helper(src_file_path, self.vault_secret, existing_data=src_file_contents) new_target_file = open(src_file_path, 'rb') new_target_file_contents = new_target_file.read() @@ -193,13 +215,17 @@ class TestVaultEditor(unittest.TestCase): src_file_contents = to_bytes("some info in a file\nyup.") src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) - ve = vault.VaultEditor('password') - ve.encrypt_file(src_file_path) + ve = self._vault_editor() + ve.encrypt_file(src_file_path, self.vault_secret) + # FIXME: update to just set self._secrets or just a new vault secret id new_password = 'password2:electricbugaloo' - ve.rekey_file(src_file_path, new_password) + new_vault_secret = TextVaultSecret(new_password) + new_vault_secrets = [('default', new_vault_secret)] + ve.rekey_file(src_file_path, vault.match_encrypt_secret(new_vault_secrets)[1]) - new_ve = vault.VaultEditor(new_password) + # FIXME: can just update self._secrets here + new_ve = vault.VaultEditor(VaultLib(new_vault_secrets)) self._assert_file_is_encrypted(new_ve, src_file_path, src_file_contents) def test_rekey_file_no_new_password(self): @@ -208,8 +234,8 @@ class TestVaultEditor(unittest.TestCase): src_file_contents = to_bytes("some info in a file\nyup.") src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) - ve = vault.VaultEditor('password') - ve.encrypt_file(src_file_path) + ve = self._vault_editor() + ve.encrypt_file(src_file_path, self.vault_secret) self.assertRaisesRegexp(errors.AnsibleError, 'The value for the new_password to rekey', @@ -223,7 +249,7 @@ class TestVaultEditor(unittest.TestCase): src_file_contents = to_bytes("some info in a file\nyup.") src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) - ve = vault.VaultEditor('password') + ve = self._vault_editor() new_password = 'password2:electricbugaloo' self.assertRaisesRegexp(errors.AnsibleError, @@ -237,11 +263,11 @@ class TestVaultEditor(unittest.TestCase): src_file_contents = to_bytes("some info in a file\nyup.") src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) - ve = vault.VaultEditor('password') - ve.encrypt_file(src_file_path) + ve = self._vault_editor() + ve.encrypt_file(src_file_path, self.vault_secret) res = ve.plaintext(src_file_path) - self.assertEquals(src_file_contents, res) + self.assertEqual(src_file_contents, res) def test_plaintext_not_encrypted(self): self._test_dir = self._create_test_dir() @@ -249,7 +275,7 @@ class TestVaultEditor(unittest.TestCase): src_file_contents = to_bytes("some info in a file\nyup.") src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) - ve = vault.VaultEditor('password') + ve = self._vault_editor() self.assertRaisesRegexp(errors.AnsibleError, 'input is not vault encrypted data', ve.plaintext, @@ -260,8 +286,8 @@ class TestVaultEditor(unittest.TestCase): src_file_contents = to_bytes("some info in a file\nyup.") src_file_path = self._create_file(self._test_dir, 'src_file', content=src_file_contents) - ve = vault.VaultEditor('password') - ve.encrypt_file(src_file_path) + ve = self._vault_editor() + ve.encrypt_file(src_file_path, self.vault_secret) self._assert_file_is_encrypted(ve, src_file_path, src_file_contents) @@ -274,8 +300,8 @@ class TestVaultEditor(unittest.TestCase): src_file_link_path = os.path.join(self._test_dir, 'a_link_to_dest_file') os.symlink(src_file_path, src_file_link_path) - ve = vault.VaultEditor('password') - ve.encrypt_file(src_file_link_path) + ve = self._vault_editor() + ve.encrypt_file(src_file_link_path, self.vault_secret) self._assert_file_is_encrypted(ve, src_file_path, src_file_contents) self._assert_file_is_encrypted(ve, src_file_link_path, src_file_contents) @@ -296,9 +322,9 @@ class TestVaultEditor(unittest.TestCase): mock_sp_call.side_effect = faux_editor - ve = vault.VaultEditor('password') + ve = self._vault_editor() - ve.encrypt_file(src_file_path) + ve.encrypt_file(src_file_path, self.vault_secret) ve.edit_file(src_file_path) new_src_file = open(src_file_path, 'rb') @@ -308,7 +334,6 @@ class TestVaultEditor(unittest.TestCase): self.assertEqual(src_file_plaintext, new_src_contents) new_stat = os.stat(src_file_path) - print(new_stat) @patch('ansible.parsing.vault.call') def test_edit_file_symlink(self, mock_sp_call): @@ -324,9 +349,9 @@ class TestVaultEditor(unittest.TestCase): mock_sp_call.side_effect = faux_editor - ve = vault.VaultEditor('password') + ve = self._vault_editor() - ve.encrypt_file(src_file_path) + ve.encrypt_file(src_file_path, self.vault_secret) src_file_link_path = os.path.join(self._test_dir, 'a_link_to_dest_file') @@ -360,7 +385,7 @@ class TestVaultEditor(unittest.TestCase): mock_sp_call.side_effect = faux_editor - ve = vault.VaultEditor('password') + ve = self._vault_editor() self.assertRaisesRegexp(errors.AnsibleError, 'input is not vault encrypted data', ve.edit_file, @@ -371,18 +396,19 @@ class TestVaultEditor(unittest.TestCase): src_contents = to_bytes("some info in a file\nyup.") src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents) - ve = vault.VaultEditor('password') + ve = self._vault_editor() self.assertRaisesRegexp(errors.AnsibleError, 'please use .edit. instead', ve.create_file, - src_file_path) + src_file_path, + self.vault_secret) def test_decrypt_file_exception(self): self._test_dir = self._create_test_dir() src_contents = to_bytes("some info in a file\nyup.") src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents) - ve = vault.VaultEditor('password') + ve = self._vault_editor() self.assertRaisesRegexp(errors.AnsibleError, 'input is not vault encrypted data', ve.decrypt_file, @@ -398,8 +424,9 @@ class TestVaultEditor(unittest.TestCase): tmp_file = tempfile.NamedTemporaryFile() os.unlink(tmp_file.name) - ve = vault.VaultEditor("ansible") - ve.create_file(tmp_file.name) + _secrets = self._secrets('ansible') + ve = self._vault_editor(_secrets) + ve.create_file(tmp_file.name, vault.match_encrypt_secret(_secrets)[1]) self.assertTrue(os.path.exists(tmp_file.name)) @@ -409,7 +436,7 @@ class TestVaultEditor(unittest.TestCase): with v10_file as f: f.write(to_bytes(v10_data)) - ve = vault.VaultEditor("ansible") + ve = self._vault_editor(self._secrets("ansible")) # make sure the password functions for the cipher error_hit = False @@ -417,6 +444,7 @@ class TestVaultEditor(unittest.TestCase): ve.decrypt_file(v10_file.name) except errors.AnsibleError: error_hit = True + raise # verify decrypted content f = open(v10_file.name, "rb") @@ -426,7 +454,7 @@ class TestVaultEditor(unittest.TestCase): os.unlink(v10_file.name) assert error_hit is False, "error decrypting 1.0 file" - self.assertEquals(fdata.strip(), "foo") + self.assertEqual(fdata.strip(), "foo") assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip() def test_decrypt_1_1(self): @@ -434,13 +462,14 @@ class TestVaultEditor(unittest.TestCase): with v11_file as f: f.write(to_bytes(v11_data)) - ve = vault.VaultEditor("ansible") + ve = self._vault_editor(self._secrets("ansible")) # make sure the password functions for the cipher error_hit = False try: ve.decrypt_file(v11_file.name) except errors.AnsibleError: + raise error_hit = True # verify decrypted content @@ -450,21 +479,23 @@ class TestVaultEditor(unittest.TestCase): os.unlink(v11_file.name) - assert error_hit is False, "error decrypting 1.0 file" - assert fdata.strip() == "foo", "incorrect decryption of 1.0 file: %s" % fdata.strip() + assert error_hit is False, "error decrypting 1.1 file" + assert fdata.strip() == "foo", "incorrect decryption of 1.1 file: %s" % fdata.strip() def test_rekey_migration(self): v10_file = tempfile.NamedTemporaryFile(delete=False) with v10_file as f: f.write(to_bytes(v10_data)) - ve = vault.VaultEditor("ansible") + ve = self._vault_editor(self._secrets("ansible")) # make sure the password functions for the cipher error_hit = False + new_secrets = self._secrets("ansible2") try: - ve.rekey_file(v10_file.name, 'ansible2') + ve.rekey_file(v10_file.name, vault.match_encrypt_secret(new_secrets)[1]) except errors.AnsibleError: + raise error_hit = True # verify decrypted content @@ -475,30 +506,31 @@ class TestVaultEditor(unittest.TestCase): assert error_hit is False, "error rekeying 1.0 file to 1.1" # ensure filedata can be decrypted, is 1.1 and is AES256 - vl = vault.VaultLib("ansible2") + vl = VaultLib(new_secrets) dec_data = None error_hit = False try: dec_data = vl.decrypt(fdata) except errors.AnsibleError: + raise error_hit = True os.unlink(v10_file.name) - assert vl.cipher_name == "AES256", "wrong cipher name set after rekey: %s" % vl.cipher_name + self.assertIn(b'AES256', fdata, 'AES256 was not found in vault file %s' % to_text(fdata)) assert error_hit is False, "error decrypting migrated 1.0 file" assert dec_data.strip() == b"foo", "incorrect decryption of rekeyed/migrated file: %s" % dec_data def test_real_path_dash(self): filename = '-' - ve = vault.VaultEditor('password') + ve = self._vault_editor() res = ve._real_path(filename) self.assertEqual(res, '-') def test_real_path_dev_null(self): filename = '/dev/null' - ve = vault.VaultEditor('password') + ve = self._vault_editor() res = ve._real_path(filename) self.assertEqual(res, '/dev/null') @@ -510,7 +542,7 @@ class TestVaultEditor(unittest.TestCase): os.symlink(file_path, file_link_path) - ve = vault.VaultEditor('password') + ve = self._vault_editor() res = ve._real_path(file_link_path) self.assertEqual(res, file_path) diff --git a/test/units/parsing/yaml/test_dumper.py b/test/units/parsing/yaml/test_dumper.py index 62948fc16a..ba8fa523fb 100644 --- a/test/units/parsing/yaml/test_dumper.py +++ b/test/units/parsing/yaml/test_dumper.py @@ -19,12 +19,6 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type import io -import yaml - -try: - from _yaml import ParserError -except ImportError: - from yaml.parser import ParserError from ansible.compat.tests import unittest from ansible.parsing import vault @@ -32,12 +26,15 @@ from ansible.parsing.yaml import dumper, objects from ansible.parsing.yaml.loader import AnsibleLoader from units.mock.yaml_helper import YamlTestUtils +from units.mock.vault_helper import TextVaultSecret class TestAnsibleDumper(unittest.TestCase, YamlTestUtils): def setUp(self): self.vault_password = "hunter42" - self.good_vault = vault.VaultLib(self.vault_password) + vault_secret = TextVaultSecret(self.vault_password) + self.vault_secrets = [('vault_secret', vault_secret)] + self.good_vault = vault.VaultLib(self.vault_secrets) self.vault = self.good_vault self.stream = self._build_stream() self.dumper = dumper.AnsibleDumper @@ -48,11 +45,12 @@ class TestAnsibleDumper(unittest.TestCase, YamlTestUtils): return stream def _loader(self, stream): - return AnsibleLoader(stream, vault_password=self.vault_password) + return AnsibleLoader(stream, vault_secrets=self.vault.secrets) def test(self): plaintext = 'This is a string we are going to encrypt.' - avu = objects.AnsibleVaultEncryptedUnicode.from_plaintext(plaintext, vault=self.vault) + avu = objects.AnsibleVaultEncryptedUnicode.from_plaintext(plaintext, vault=self.vault, + secret=vault.match_secrets(self.vault_secrets, ['vault_secret'])[0][1]) yaml_out = self._dump_string(avu, dumper=self.dumper) stream = self._build_stream(yaml_out) @@ -60,4 +58,4 @@ class TestAnsibleDumper(unittest.TestCase, YamlTestUtils): data_from_yaml = loader.get_single_data() - self.assertEquals(plaintext, data_from_yaml.data) + self.assertEqual(plaintext, data_from_yaml.data) diff --git a/test/units/parsing/yaml/test_loader.py b/test/units/parsing/yaml/test_loader.py index e78bce1ed5..6865d69092 100644 --- a/test/units/parsing/yaml/test_loader.py +++ b/test/units/parsing/yaml/test_loader.py @@ -34,6 +34,7 @@ from ansible.parsing.yaml.objects import AnsibleVaultEncryptedUnicode from ansible.parsing.yaml.dumper import AnsibleDumper from units.mock.yaml_helper import YamlTestUtils +from units.mock.vault_helper import TextVaultSecret try: from _yaml import ParserError @@ -176,25 +177,35 @@ class TestAnsibleLoaderBasic(unittest.TestCase): class TestAnsibleLoaderVault(unittest.TestCase, YamlTestUtils): def setUp(self): self.vault_password = "hunter42" - self.vault = vault.VaultLib(self.vault_password) + vault_secret = TextVaultSecret(self.vault_password) + self.vault_secrets = [('vault_secret', vault_secret), + ('default', vault_secret)] + self.vault = vault.VaultLib(self.vault_secrets) + + @property + def vault_secret(self): + return vault.match_encrypt_secret(self.vault_secrets)[1] def test_wrong_password(self): plaintext = u"Ansible" bob_password = "this is a different password" - bobs_vault = vault.VaultLib(bob_password) + bobs_secret = TextVaultSecret(bob_password) + bobs_secrets = [('default', bobs_secret)] - ciphertext = bobs_vault.encrypt(plaintext) + bobs_vault = vault.VaultLib(bobs_secrets) + + ciphertext = bobs_vault.encrypt(plaintext, vault.match_encrypt_secret(bobs_secrets)[1]) try: self.vault.decrypt(ciphertext) except Exception as e: self.assertIsInstance(e, errors.AnsibleError) - self.assertEqual(e.message, 'Decryption failed') + self.assertEqual(e.message, 'Decryption failed (no vault secrets would found that could decrypt)') def _encrypt_plaintext(self, plaintext): # Construct a yaml repr of a vault by hand - vaulted_var_bytes = self.vault.encrypt(plaintext) + vaulted_var_bytes = self.vault.encrypt(plaintext, self.vault_secret) # add yaml tag vaulted_var = vaulted_var_bytes.decode() @@ -213,7 +224,7 @@ class TestAnsibleLoaderVault(unittest.TestCase, YamlTestUtils): return stream def _loader(self, stream): - return AnsibleLoader(stream, vault_password=self.vault_password) + return AnsibleLoader(stream, vault_secrets=self.vault.secrets) def _load_yaml(self, yaml_text, password): stream = self._build_stream(yaml_text) @@ -224,11 +235,11 @@ class TestAnsibleLoaderVault(unittest.TestCase, YamlTestUtils): return data_from_yaml def test_dump_load_cycle(self): - avu = AnsibleVaultEncryptedUnicode.from_plaintext('The plaintext for test_dump_load_cycle.', vault=self.vault) + avu = AnsibleVaultEncryptedUnicode.from_plaintext('The plaintext for test_dump_load_cycle.', self.vault, self.vault_secret) self._dump_load_cycle(avu) def test_embedded_vault_from_dump(self): - avu = AnsibleVaultEncryptedUnicode.from_plaintext('setec astronomy', vault=self.vault) + avu = AnsibleVaultEncryptedUnicode.from_plaintext('setec astronomy', self.vault, self.vault_secret) blip = {'stuff1': [{'a dict key': 24}, {'shhh-ssh-secrets': avu, 'nothing to see here': 'move along'}], @@ -239,7 +250,6 @@ class TestAnsibleLoaderVault(unittest.TestCase, YamlTestUtils): self._dump_stream(blip, stream, dumper=AnsibleDumper) - print(stream.getvalue()) stream.seek(0) stream.seek(0) @@ -247,6 +257,7 @@ class TestAnsibleLoaderVault(unittest.TestCase, YamlTestUtils): loader = self._loader(stream) data_from_yaml = loader.get_data() + stream2 = NameStringIO(u'') # verify we can dump the object again self._dump_stream(data_from_yaml, stream2, dumper=AnsibleDumper) @@ -266,20 +277,20 @@ class TestAnsibleLoaderVault(unittest.TestCase, YamlTestUtils): data_from_yaml = self._load_yaml(yaml_text, self.vault_password) vault_string = data_from_yaml['the_secret'] - self.assertEquals(plaintext_var, data_from_yaml['the_secret']) + self.assertEqual(plaintext_var, data_from_yaml['the_secret']) test_dict = {} test_dict[vault_string] = 'did this work?' - self.assertEquals(vault_string.data, vault_string) + self.assertEqual(vault_string.data, vault_string) # This looks weird and useless, but the object in question has a custom __eq__ - self.assertEquals(vault_string, vault_string) + self.assertEqual(vault_string, vault_string) another_vault_string = data_from_yaml['another_secret'] different_vault_string = data_from_yaml['different_secret'] - self.assertEquals(vault_string, another_vault_string) + self.assertEqual(vault_string, another_vault_string) self.assertNotEquals(vault_string, different_vault_string) # More testing of __eq__/__ne__ @@ -288,8 +299,8 @@ class TestAnsibleLoaderVault(unittest.TestCase, YamlTestUtils): # Note this is a compare of the str/unicode of these, they are different types # so we want to test self == other, and other == self etc - self.assertEquals(plaintext_var, vault_string) - self.assertEquals(vault_string, plaintext_var) + self.assertEqual(plaintext_var, vault_string) + self.assertEqual(vault_string, plaintext_var) self.assertFalse(plaintext_var != vault_string) self.assertFalse(vault_string != plaintext_var) diff --git a/test/units/parsing/yaml/test_objects.py b/test/units/parsing/yaml/test_objects.py index 99d27ad8b7..71fa7670b8 100644 --- a/test/units/parsing/yaml/test_objects.py +++ b/test/units/parsing/yaml/test_objects.py @@ -21,6 +21,7 @@ __metaclass__ = type from ansible.compat.tests import unittest +from ansible.errors import AnsibleError from ansible.parsing import vault from ansible.parsing.yaml.loader import AnsibleLoader @@ -29,6 +30,7 @@ from ansible.parsing.yaml.loader import AnsibleLoader from ansible.parsing.yaml import objects from units.mock.yaml_helper import YamlTestUtils +from units.mock.vault_helper import TextVaultSecret class TestAnsibleVaultUnicodeNoVault(unittest.TestCase, YamlTestUtils): @@ -68,16 +70,22 @@ class TestAnsibleVaultUnicodeNoVault(unittest.TestCase, YamlTestUtils): class TestAnsibleVaultEncryptedUnicode(unittest.TestCase, YamlTestUtils): def setUp(self): - self.vault_password = "hunter42" - self.good_vault = vault.VaultLib(self.vault_password) + self.good_vault_password = "hunter42" + good_vault_secret = TextVaultSecret(self.good_vault_password) + self.good_vault_secrets = [('good_vault_password', good_vault_secret)] + self.good_vault = vault.VaultLib(self.good_vault_secrets) + # TODO: make this use two vault secret identities instead of two vaultSecrets self.wrong_vault_password = 'not-hunter42' - self.wrong_vault = vault.VaultLib(self.wrong_vault_password) + wrong_vault_secret = TextVaultSecret(self.wrong_vault_password) + self.wrong_vault_secrets = [('wrong_vault_password', wrong_vault_secret)] + self.wrong_vault = vault.VaultLib(self.wrong_vault_secrets) self.vault = self.good_vault + self.vault_secrets = self.good_vault_secrets def _loader(self, stream): - return AnsibleLoader(stream, vault_password=self.vault_password) + return AnsibleLoader(stream, vault_secrets=self.vault_secrets) def test_dump_load_cycle(self): aveu = self._from_plaintext('the test string for TestAnsibleVaultEncryptedUnicode.test_dump_load_cycle') @@ -86,12 +94,13 @@ class TestAnsibleVaultEncryptedUnicode(unittest.TestCase, YamlTestUtils): def assert_values(self, avu, seq): self.assertIsInstance(avu, objects.AnsibleVaultEncryptedUnicode) - self.assertEquals(avu, seq) + self.assertEqual(avu, seq) self.assertTrue(avu.vault is self.vault) self.assertIsInstance(avu.vault, vault.VaultLib) def _from_plaintext(self, seq): - return objects.AnsibleVaultEncryptedUnicode.from_plaintext(seq, vault=self.vault) + id_secret = vault.match_encrypt_secret(self.good_vault_secrets) + return objects.AnsibleVaultEncryptedUnicode.from_plaintext(seq, vault=self.vault, secret=id_secret[1]) def _from_ciphertext(self, ciphertext): avu = objects.AnsibleVaultEncryptedUnicode(ciphertext) @@ -126,7 +135,7 @@ class TestAnsibleVaultEncryptedUnicode(unittest.TestCase, YamlTestUtils): avu = self._from_plaintext(seq) b_avu = avu.encode('utf-8', 'strict') self.assertIsInstance(avu, objects.AnsibleVaultEncryptedUnicode) - self.assertEquals(b_avu, seq.encode('utf-8', 'strict')) + self.assertEqual(b_avu, seq.encode('utf-8', 'strict')) self.assertTrue(avu.vault is self.vault) self.assertIsInstance(avu.vault, vault.VaultLib) @@ -135,4 +144,8 @@ class TestAnsibleVaultEncryptedUnicode(unittest.TestCase, YamlTestUtils): seq = '' self.vault = self.wrong_vault avu = self._from_plaintext(seq) - self.assert_values(avu, seq) + + def compare(avu, seq): + return avu == seq + + self.assertRaises(AnsibleError, compare, avu, seq)