1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/lib/ansible/plugins/action/include_vars.py
Toshio Kuratomi 4ed88512e4 Move uses of to_bytes, to_text, to_native to use the module_utils version (#17423)
We couldn't copy to_unicode, to_bytes, to_str into module_utils because
of licensing.  So once created it we had two sets of functions that did
the same things but had different implementations.  To remedy that, this
change removes the ansible.utils.unicode versions of those functions.
2016-09-06 22:54:17 -07:00

294 lines
10 KiB
Python

# (c) 2016, Allen Sanabria <asanabria@linuxdynasty.org>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from os import path, walk
import re
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_native
from ansible.plugins.action import ActionBase
class ActionModule(ActionBase):
    """Action plugin that loads variables from a file or a directory tree.

    Either a single vars file (``file`` or the free-form argument) or a
    directory (``dir``, optionally refined with ``depth``, ``files_matching``
    and ``ignore_files``) is loaded and returned as ``ansible_facts``.  When
    ``name`` is given, the loaded variables are nested under that key.
    """

    TRANSFERS_FILES = False

    # Options that only make sense when loading a directory.
    VALID_DIR_ARGUMENTS = ['dir', 'depth', 'files_matching', 'ignore_files']
    # Options that select a single file.
    VALID_FILE_ARGUMENTS = ['file', '_raw_params']
    # Options accepted in both modes.
    GLOBAL_FILE_ARGUMENTS = ['name']
    VALID_ARGUMENTS = (
        VALID_DIR_ARGUMENTS + VALID_FILE_ARGUMENTS + GLOBAL_FILE_ARGUMENTS
    )
    # Compared against the extension *without* its leading dot.
    VALID_FILE_EXTENSIONS = ['yaml', 'yml', 'json']

    def _mutually_exclusive(self):
        """ Reject argument combinations that mix file and directory loading.

        Raises:
            AnsibleError: If both a file and a directory were requested, or
                if a directory-only option was combined with a file.
        """
        if self.source_dir and self.source_file:
            raise AnsibleError("Need to pass either file or dir")

        dir_arguments = [
            self.source_dir, self.files_matching, self.ignore_files,
            self.depth
        ]
        # A single directory-only option is enough to conflict with a file;
        # requiring all of them to be set let most bad combinations through.
        if self.source_file and any(arg is not None for arg in dir_arguments):
            err_msg = (
                "Can not include {0} with file argument"
                .format(", ".join(self.VALID_DIR_ARGUMENTS))
            )
            raise AnsibleError(err_msg)

    def _set_dir_defaults(self):
        """ Normalise the directory-only options before the tree is walked.

        Sets ``self.depth`` (int, 0 meaning unlimited), ``self.matcher``
        (compiled regex or None) and ``self.ignore_files`` (list).

        Raises:
            AnsibleError: If depth is not a non-negative integer, if
                files_matching is not a valid regular expression, or if
                ignore_files is a dictionary.
        """
        if not self.depth:
            self.depth = 0
        else:
            # key=value task arguments arrive as strings, so coerce them.
            try:
                self.depth = int(self.depth)
            except (TypeError, ValueError):
                raise AnsibleError(
                    'depth must be an integer, got {0}'.format(self.depth)
                )
            if self.depth < 0:
                raise AnsibleError(
                    'depth must not be negative, got {0}'.format(self.depth)
                )

        if self.files_matching:
            try:
                self.matcher = re.compile(r'{0}'.format(self.files_matching))
            except re.error:
                raise AnsibleError(
                    'Invalid regular expression: {0}'
                    .format(self.files_matching)
                )
        else:
            self.matcher = None

        if not self.ignore_files:
            self.ignore_files = list()
        elif isinstance(self.ignore_files, (str, type(u''))):
            # Covers both byte and unicode strings on Python 2.
            self.ignore_files = self.ignore_files.split()
        elif isinstance(self.ignore_files, dict):
            # Raise instead of returning a result dict: the caller does not
            # inspect the return value, so the failure used to be lost.
            raise AnsibleError(
                '{0} must be a list'.format(self.ignore_files)
            )

    def _set_args(self):
        """ Set instance variables based on the arguments that were passed.

        Raises:
            AnsibleError: If an unknown option was passed or the options
                mix file and directory loading.
        """
        task_args = self._task.args
        for arg in task_args:
            if arg not in self.VALID_ARGUMENTS:
                err_msg = (
                    '{0} is not a valid option in include_vars'.format(arg)
                )
                raise AnsibleError(err_msg)

        self.return_results_as_name = task_args.get('name', None)
        self.source_dir = task_args.get('dir', None)
        self.source_file = task_args.get('file', None)
        if not self.source_dir and not self.source_file:
            # Fall back to the free-form argument.
            self.source_file = task_args.get('_raw_params')

        self.depth = task_args.get('depth', None)
        self.files_matching = task_args.get('files_matching', None)
        self.ignore_files = task_args.get('ignore_files', None)

        self._mutually_exclusive()

    def run(self, tmp=None, task_vars=None):
        """ Load variables from a file, or recursively from a directory.

        Args:
            tmp: Passed through to the parent class ``run``.
            task_vars (dict): Passed through to the parent class ``run``.

        Returns:
            dict: The task result.  ``ansible_facts`` holds the loaded
            variables; ``failed`` and ``message`` are set on failure.

        Raises:
            AnsibleError: If the arguments are invalid or the file could not
                be located.
        """
        if task_vars is None:
            task_vars = dict()

        self.show_content = True
        self._set_args()

        # Initialised up front so every code path below leaves them bound.
        failed = False
        err_msg = ''
        results = dict()

        if self.source_dir:
            self._set_dir_defaults()
            self._set_root_dir()
            # isdir rather than exists: a plain file cannot be walked.
            if path.isdir(self.source_dir):
                for root_dir, filenames in self._traverse_dir_depth():
                    failed, err_msg, updated_results = (
                        self._load_files_in_dir(root_dir, filenames)
                    )
                    if failed:
                        break
                    results.update(updated_results)
            else:
                failed = True
                err_msg = (
                    '{0} directory does not exist'.format(self.source_dir)
                )
        else:
            try:
                self.source_file = self._find_needle('vars', self.source_file)
                failed, err_msg, updated_results = (
                    self._load_files(self.source_file)
                )
                if not failed:
                    results.update(updated_results)
            except AnsibleError as e:
                err_msg = to_native(e)
                raise AnsibleError(err_msg)

        if self.return_results_as_name:
            # Nest everything under the requested variable name.
            results = {self.return_results_as_name: results}

        result = super(ActionModule, self).run(tmp, task_vars)

        if failed:
            result['failed'] = failed
            result['message'] = err_msg

        result['ansible_facts'] = results
        result['_ansible_no_log'] = not self.show_content

        return result

    def _set_root_dir(self):
        """ Resolve ``self.source_dir`` against the role or the task file.

        Inside a role the directory is looked up below the role path (under
        ``vars`` unless the given path already starts with ``vars``).
        Otherwise it is joined to the directory of the file the task was
        defined in.
        """
        role = self._task._role
        if role:
            if self.source_dir.split('/')[0] == 'vars':
                path_to_use = path.join(role._role_path, self.source_dir)
                # Left untouched when the role has no such directory.
                if path.exists(path_to_use):
                    self.source_dir = path_to_use
            else:
                self.source_dir = path.join(
                    role._role_path, 'vars', self.source_dir
                )
        else:
            # dirname keeps the leading separator for files located directly
            # in the filesystem root, which a manual split/join drops.
            current_dir = path.dirname(self._task._ds._data_source)
            self.source_dir = path.join(current_dir, self.source_dir)

    def _traverse_dir_depth(self):
        """ Walk the source directory and yield its files per directory.

        Directories are yielded sorted by path and their files sorted
        alphabetically.  Do not iterate past the set depth: the source
        directory itself is level 1, its sub directories level 2, and so on.
        A depth of 0 means unlimited.

        Yields:
            Tuple (str, list): Directory path and its sorted file names.
        """
        visited = []
        for current_root, current_dirs, current_files in walk(self.source_dir):
            relative = path.relpath(current_root, self.source_dir)
            if relative == path.curdir:
                level = 1
            else:
                level = len(relative.split(path.sep)) + 1

            if self.depth and level >= self.depth:
                # Prune in place so walk() does not descend any further.
                del current_dirs[:]

            visited.append((current_root, sorted(current_files)))

        visited.sort(key=lambda entry: entry[0])
        for entry in visited:
            yield entry

    def _ignore_file(self, filename):
        """ Return True if a file matches the list of ignore_files.

        Args:
            filename (str): The filename that is being matched against.

        Returns:
            Boolean

        Raises:
            AnsibleError: If an ignore_files entry is not a valid regular
                expression.
        """
        for file_type in self.ignore_files:
            try:
                if re.search(r'{0}$'.format(file_type), filename):
                    return True
            except re.error:
                err_msg = 'Invalid regular expression: {0}'.format(file_type)
                raise AnsibleError(err_msg)
        return False

    def _is_valid_file_ext(self, source_file):
        """ Verify if source file has a valid extension.

        Args:
            source_file (str): The full path of source file or source file.

        Returns:
            Bool
        """
        # splitext only looks at the final path component, so a file that
        # has no extension is never mistaken for a valid one.
        extension = path.splitext(source_file)[1]
        return extension[1:] in self.VALID_FILE_EXTENSIONS

    def _load_files(self, filename):
        """ Loads a file and converts the output into a valid Python dict.

        Args:
            filename (str): The source file.

        Returns:
            Tuple (bool, str, dict): failed flag, error message, loaded data.
        """
        results = dict()
        if not self._is_valid_file_ext(filename):
            err_msg = (
                '{0} does not have a valid extension: {1}'
                .format(filename, ', '.join(self.VALID_FILE_EXTENSIONS))
            )
            return True, err_msg, results

        data, show_content = self._loader._get_file_contents(filename)
        # Once any loaded file asks for its content to be hidden, keep the
        # whole result hidden; a later file must not switch it back on.
        self.show_content = getattr(self, 'show_content', True) and show_content

        # Keyword arguments: the second positional parameter of load() is
        # the file name, not the show_content flag.
        data = self._loader.load(
            data, file_name=filename, show_content=show_content
        )
        if not data:
            data = dict()

        if not isinstance(data, dict):
            err_msg = (
                '{0} must be stored as a dictionary/hash'
                .format(filename)
            )
            return True, err_msg, results

        results.update(data)
        return False, '', results

    def _load_files_in_dir(self, root_dir, var_files):
        """ Load the found yml files and update/overwrite the dictionary.

        Args:
            root_dir (str): The base directory of the list of files that is being passed.
            var_files: (list): List of files to iterate over and load into a dictionary.

        Returns:
            Tuple (bool, str, dict): failed flag, error message, and the data
            loaded before any failure occurred.
        """
        results = dict()
        for filename in var_files:
            # Never include main.yml from a role, as that is the default included by the role
            if self._task._role and filename == 'main.yml':
                continue

            if self.files_matching and not self.matcher.search(filename):
                continue

            filepath = path.join(root_dir, filename)
            if not path.exists(filepath) or self._ignore_file(filename):
                continue

            failed, err_msg, loaded_data = self._load_files(filepath)
            if failed:
                # Nothing after the first failure would be loaded anyway.
                return failed, err_msg, results
            results.update(loaded_data)

        return False, '', results