1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
Fixes several bugs exposed in #34893
* Fixes relative path handling in copy so that it splits directories and
  reconstructs the correct file path
* Return failed in the proper circumstances
This commit is contained in:
Toshio Kuratomi 2018-05-16 09:09:32 -07:00 committed by Adam Miller
parent 4373b155a5
commit ca4147f2cc
4 changed files with 332 additions and 4 deletions

View file

@ -237,19 +237,26 @@ import errno
import tempfile
import traceback
# import module snippets
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_bytes, to_native
class AnsibleModuleError(Exception):
def __init__(self, results):
self.results = results
def split_pre_existing_dir(dirname):
'''
Return the first pre-existing directory and a list of the new directories that will be created.
'''
head, tail = os.path.split(dirname)
b_head = to_bytes(head, errors='surrogate_or_strict')
if head == '':
return ('.', [tail])
if not os.path.exists(b_head):
if head == '/':
raise AnsibleModuleError(results={'msg': "The '/' directory doesn't exist on this machine."})
(pre_existing_dir, new_directory_list) = split_pre_existing_dir(head)
else:
return (head, [tail])
@ -342,8 +349,13 @@ def main():
b_dest = to_bytes(dest, errors='surrogate_or_strict')
dirname = os.path.dirname(dest)
b_dirname = to_bytes(dirname, errors='surrogate_or_strict')
if not os.path.exists(b_dirname) and os.path.isabs(b_dirname):
(pre_existing_dir, new_directory_list) = split_pre_existing_dir(dirname)
if not os.path.exists(b_dirname):
try:
(pre_existing_dir, new_directory_list) = split_pre_existing_dir(dirname)
except AnsibleModuleError as e:
e.result['msg'] += ' Could not copy to {0}'.format(dest)
module.fail_json(**e.results)
os.makedirs(b_dirname)
directory_args = module.load_file_common_arguments(module.params)
directory_mode = module.params["directory_mode"]
@ -438,5 +450,6 @@ def main():
module.exit_json(**res_args)
if __name__ == '__main__':
main()

View file

@ -492,6 +492,10 @@ class ActionModule(ActionBase):
if module_return is None:
continue
if module_return.get('failed'):
result.update(module_return)
return result
paths = os.path.split(source_rel)
dir_path = ''
for dir_component in paths:
@ -517,6 +521,11 @@ class ActionModule(ActionBase):
del new_module_args['src']
module_return = self._execute_module(module_name='file', module_args=new_module_args, task_vars=task_vars)
if module_return.get('failed'):
result.update(module_return)
return result
module_executed = True
changed = changed or module_return.get('changed', False)

View file

@ -1067,6 +1067,97 @@
- "not copied_stat.results[1].stat.exists"
- "copied_stat.results[2].stat.exists"
#
# Recursive copy with relative paths (#34893)
#
- name: Create a directory to copy
file:
path: 'source_recursive'
state: directory
- name: Create a file inside of the directory
copy:
content: "testing"
dest: 'source_recursive/file'
- name: Create a directory to place the test output in
file:
path: 'destination'
state: directory
- name: Copy the directory and files within (no trailing slash)
copy:
src: 'source_recursive'
dest: 'destination'
- name: Stat the recursively copied directory
stat:
path: "destination/{{ item }}"
register: copied_stat
with_items:
- "source_recursive"
- "source_recursive/file"
- "file"
- debug:
var: copied_stat
verbosity: 1
- name: Assert with no trailing slash, directory and file is copied
assert:
that:
- "copied_stat.results[0].stat.exists"
- "copied_stat.results[1].stat.exists"
- "not copied_stat.results[2].stat.exists"
- name: Cleanup
file:
path: 'destination'
state: absent
# Try again with no trailing slash
- name: Create a directory to place the test output in
file:
path: 'destination'
state: directory
- name: Copy just the files inside of the directory
copy:
src: 'source_recursive/'
dest: 'destination'
- name: Stat the recursively copied directory
stat:
path: "destination/{{ item }}"
register: copied_stat
with_items:
- "source_recursive"
- "source_recursive/file"
- "file"
- debug:
var: copied_stat
verbosity: 1
- name: Assert with trailing slash, only the file is copied
assert:
that:
- "not copied_stat.results[0].stat.exists"
- "not copied_stat.results[1].stat.exists"
- "copied_stat.results[2].stat.exists"
- name: Cleanup
file:
path: 'destination'
state: absent
- name: Cleanup
file:
path: 'source_recursive'
state: absent
#
# issue 8394
#

View file

@ -0,0 +1,215 @@
# -*- coding: utf-8 -*-
# Copyright:
# (c) 2018 Ansible Project
# License: GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import (absolute_import, division)
__metaclass__ = type
import pytest
from ansible.modules.files.copy import AnsibleModuleError, split_pre_existing_dir
from ansible.module_utils.basic import AnsibleModule
THREE_DIRS_DATA = (('/dir1/dir2',
# 0 existing dirs: error (because / should always exist)
None,
# 1 existing dir:
('/', ['dir1', 'dir2']),
# 2 existing dirs:
('/dir1', ['dir2']),
# 3 existing dirs:
('/dir1/dir2', [])
),
('/dir1/dir2/',
# 0 existing dirs: error (because / should always exist)
None,
# 1 existing dir:
('/', ['dir1', 'dir2']),
# 2 existing dirs:
('/dir1', ['dir2']),
# 3 existing dirs:
('/dir1/dir2', [])
),
)
TWO_DIRS_DATA = (('dir1/dir2',
# 0 existing dirs:
('.', ['dir1', 'dir2']),
# 1 existing dir:
('dir1', ['dir2']),
# 2 existing dirs:
('dir1/dir2', []),
# 3 existing dirs: Same as 2 because we never get to the third
),
('dir1/dir2/',
# 0 existing dirs:
('.', ['dir1', 'dir2']),
# 1 existing dir:
('dir1', ['dir2']),
# 2 existing dirs:
('dir1/dir2', []),
# 3 existing dirs: Same as 2 because we never get to the third
),
('/dir1',
# 0 existing dirs: error (because / should always exist)
None,
# 1 existing dir:
('/', ['dir1']),
# 2 existing dirs:
('/dir1', []),
# 3 existing dirs: Same as 2 because we never get to the third
),
('/dir1/',
# 0 existing dirs: error (because / should always exist)
None,
# 1 existing dir:
('/', ['dir1']),
# 2 existing dirs:
('/dir1', []),
# 3 existing dirs: Same as 2 because we never get to the third
),
) + THREE_DIRS_DATA
ONE_DIR_DATA = (('dir1',
# 0 existing dirs:
('.', ['dir1']),
# 1 existing dir:
('dir1', []),
# 2 existing dirs: Same as 1 because we never get to the third
),
('dir1/',
# 0 existing dirs:
('.', ['dir1']),
# 1 existing dir:
('dir1', []),
# 2 existing dirs: Same as 1 because we never get to the third
),
) + TWO_DIRS_DATA
@pytest.mark.parametrize('directory, expected', ((d[0], d[4]) for d in THREE_DIRS_DATA))
def test_split_pre_existing_dir_three_levels_exist(directory, expected, mocker):
mocker.patch('os.path.exists', side_effect=[True, True, True])
split_pre_existing_dir(directory) == expected
@pytest.mark.parametrize('directory, expected', ((d[0], d[3]) for d in TWO_DIRS_DATA))
def test_split_pre_existing_dir_two_levels_exist(directory, expected, mocker):
mocker.patch('os.path.exists', side_effect=[True, True, False])
split_pre_existing_dir(directory) == expected
@pytest.mark.parametrize('directory, expected', ((d[0], d[2]) for d in ONE_DIR_DATA))
def test_split_pre_existing_dir_one_level_exists(directory, expected, mocker):
mocker.patch('os.path.exists', side_effect=[True, False, False])
split_pre_existing_dir(directory) == expected
@pytest.mark.parametrize('directory', (d[0] for d in ONE_DIR_DATA if d[1] is None))
def test_split_pre_existing_dir_root_does_not_exist(directory, mocker):
mocker.patch('os.path.exists', return_value=False)
with pytest.raises(AnsibleModuleError) as excinfo:
split_pre_existing_dir(directory)
assert excinfo.value.results['msg'].startswith("The '/' directory doesn't exist on this machine.")
@pytest.mark.parametrize('directory, expected', ((d[0], d[1]) for d in ONE_DIR_DATA if not d[0].startswith('/')))
def test_split_pre_existing_dir_working_dir_exists(directory, expected, mocker):
mocker.patch('os.path.exists', return_value=False)
split_pre_existing_dir(directory) == expected
#
# Info helpful for making new test cases:
#
# base_mode = {'dir no perms': 0o040000,
# 'file no perms': 0o100000,
# 'dir all perms': 0o400000 | 0o777,
# 'file all perms': 0o100000, | 0o777}
#
# perm_bits = {'x': 0b001,
# 'w': 0b010,
# 'r': 0b100}
#
# role_shift = {'u': 6,
# 'g': 3,
# 'o': 0}
DATA = ( # Going from no permissions to setting all for user, group, and/or other
(0o040000, u'a+rwx', 0o0777),
(0o040000, u'u+rwx,g+rwx,o+rwx', 0o0777),
(0o040000, u'o+rwx', 0o0007),
(0o040000, u'g+rwx', 0o0070),
(0o040000, u'u+rwx', 0o0700),
# Going from all permissions to none for user, group, and/or other
(0o040777, u'a-rwx', 0o0000),
(0o040777, u'u-rwx,g-rwx,o-rwx', 0o0000),
(0o040777, u'o-rwx', 0o0770),
(0o040777, u'g-rwx', 0o0707),
(0o040777, u'u-rwx', 0o0077),
# now using absolute assignment from None to a set of perms
(0o040000, u'a=rwx', 0o0777),
(0o040000, u'u=rwx,g=rwx,o=rwx', 0o0777),
(0o040000, u'o=rwx', 0o0007),
(0o040000, u'g=rwx', 0o0070),
(0o040000, u'u=rwx', 0o0700),
# X effect on files and dirs
(0o040000, u'a+X', 0o0111),
(0o100000, u'a+X', 0),
(0o040000, u'a=X', 0o0111),
(0o100000, u'a=X', 0),
(0o040777, u'a-X', 0o0666),
# Same as chmod but is it a bug?
# chmod a-X statfile <== removes execute from statfile
(0o100777, u'a-X', 0o0666),
# Multiple permissions
(0o040000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0755),
(0o100000, u'u=rw-x+X,g=r-x+X,o=r-x+X', 0o0644),
)
UMASK_DATA = (
(0o100000, '+rwx', 0o770),
(0o100777, '-rwx', 0o007),
)
INVALID_DATA = (
(0o040000, u'a=foo', "bad symbolic permission for mode: a=foo"),
(0o040000, u'f=rwx', "bad symbolic permission for mode: f=rwx"),
)
@pytest.mark.parametrize('stat_info, mode_string, expected', DATA)
def test_good_symbolic_modes(mocker, stat_info, mode_string, expected):
mock_stat = mocker.MagicMock()
mock_stat.st_mode = stat_info
assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected
@pytest.mark.parametrize('stat_info, mode_string, expected', UMASK_DATA)
def test_umask_with_symbolic_modes(mocker, stat_info, mode_string, expected):
mock_umask = mocker.patch('os.umask')
mock_umask.return_value = 0o7
mock_stat = mocker.MagicMock()
mock_stat.st_mode = stat_info
assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == expected
@pytest.mark.parametrize('stat_info, mode_string, expected', INVALID_DATA)
def test_invalid_symbolic_modes(mocker, stat_info, mode_string, expected):
mock_stat = mocker.MagicMock()
mock_stat.st_mode = stat_info
with pytest.raises(ValueError) as exc:
assert AnsibleModule._symbolic_mode_to_octal(mock_stat, mode_string) == 'blah'
assert exc.match(expected)