mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Adding module_utils/module_helper.py + big revamp in xfconf.py to use it (#1322)
* Big revamp in xfconf.py - added plugin/module_utils/module_helper.py - scaffold class for writing modules, beyond standard AnsibleModule - automatic capture of exceptions - easier dependency testing - StateMixin to easily handle different behaviours for 'state' param - CmdMixin to easily run external commands - adapted test_xfconf.py - the args for run_command are now lists instead of a string - value and previous_value were not being tested before (because xfconf wasn't filling results - see below) - added more tests: setting value to previous_value, getting non-existent property - rewritten xfconf module, keeping the same results - original module posted results as ansible_facts, this version still does it for compatibility, but also adds to the module result * Added suggestions from the PR * Added russoz as maintainer for the module_utils/module_helper.py file * Formatting using printf-style requires special treatment Strings not containing substitution tokens must work as well. * Tidied up variables in module definition * Tests with ArgFormat and DependencyCtxMgr * pytest parameters must be in the same order, it seems * improved testing for the DependencyCtxMgr * fixed test for older pythons * Moved changed property to improve readability * Added testcase for state: absent and adjusted xfconf after it * Fixed param name environ_update in run_command() * added changelog fragment * fixed tests after run_command param change
This commit is contained in:
parent
2ebf2861b6
commit
e3fcc7de2a
6 changed files with 693 additions and 188 deletions
3
.github/BOTMETA.yml
vendored
3
.github/BOTMETA.yml
vendored
|
@ -166,6 +166,9 @@ files:
|
|||
$module_utils/memset.py:
|
||||
maintainers: glitchcrab
|
||||
labels: cloud memset
|
||||
$module_utils/module_helper.py:
|
||||
maintainers: russoz
|
||||
labels: module_helper
|
||||
$module_utils/net_tools/nios/api.py:
|
||||
maintainers: $team_networking sganesh-infoblox
|
||||
labels: infoblox networking
|
||||
|
|
5
changelogs/fragments/1322-module_helper_and_xfconf.yaml
Normal file
5
changelogs/fragments/1322-module_helper_and_xfconf.yaml
Normal file
|
@ -0,0 +1,5 @@
|
|||
minor_changes:
|
||||
- module_helper - added ModuleHelper class and a couple of convenience tools for module developers (https://github.com/ansible-collections/community.general/pull/1322).
|
||||
bugfixes:
|
||||
- xfconf - xfconf no longer passing the command args as a string, but rather as a list (https://github.com/ansible-collections/community.general/issues/1328).
|
||||
- xfconf - parameter ``value`` no longer required for state ``absent`` (https://github.com/ansible-collections/community.general/issues/1329).
|
302
plugins/module_utils/module_helper.py
Normal file
302
plugins/module_utils/module_helper.py
Normal file
|
@ -0,0 +1,302 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright: (c) 2018, Ansible Project
|
||||
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
from functools import partial, wraps
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
|
||||
|
||||
class ArgFormat(object):
|
||||
"""
|
||||
Argument formatter
|
||||
"""
|
||||
BOOLEAN = 0
|
||||
PRINTF = 1
|
||||
FORMAT = 2
|
||||
|
||||
@staticmethod
|
||||
def stars_deco(num):
|
||||
if num == 1:
|
||||
def deco(f):
|
||||
return lambda v: f(*v)
|
||||
return deco
|
||||
elif num == 2:
|
||||
def deco(f):
|
||||
return lambda v: f(**v)
|
||||
return deco
|
||||
|
||||
return lambda f: f
|
||||
|
||||
def __init__(self, name, fmt=None, style=FORMAT, stars=0):
|
||||
"""
|
||||
Creates a new formatter
|
||||
:param name: Name of the argument to be formatted
|
||||
:param fmt: Either a str to be formatted (using or not printf-style) or a callable that does that
|
||||
:param style: Whether arg_format (as str) should use printf-style formatting.
|
||||
Ignored if arg_format is None or not a str (should be callable).
|
||||
:param stars: A int with 0, 1 or 2 value, indicating to formatting the value as: value, *value or **value
|
||||
"""
|
||||
def printf_fmt(_fmt, v):
|
||||
try:
|
||||
return [_fmt % v]
|
||||
except TypeError as e:
|
||||
if e.args[0] != 'not all arguments converted during string formatting':
|
||||
raise
|
||||
return [_fmt]
|
||||
|
||||
_fmts = {
|
||||
ArgFormat.BOOLEAN: lambda _fmt, v: ([_fmt] if bool(v) else []),
|
||||
ArgFormat.PRINTF: printf_fmt,
|
||||
ArgFormat.FORMAT: lambda _fmt, v: [_fmt.format(v)],
|
||||
}
|
||||
|
||||
self.name = name
|
||||
self.stars = stars
|
||||
|
||||
if fmt is None:
|
||||
fmt = "{0}"
|
||||
style = ArgFormat.FORMAT
|
||||
|
||||
if isinstance(fmt, str):
|
||||
func = _fmts[style]
|
||||
self.arg_format = partial(func, fmt)
|
||||
elif isinstance(fmt, list) or isinstance(fmt, tuple):
|
||||
self.arg_format = lambda v: [_fmts[style](f, v)[0] for f in fmt]
|
||||
elif hasattr(fmt, '__call__'):
|
||||
self.arg_format = fmt
|
||||
else:
|
||||
raise TypeError('Parameter fmt must be either: a string, a list/tuple of '
|
||||
'strings or a function: type={0}, value={1}'.format(type(fmt), fmt))
|
||||
|
||||
if stars:
|
||||
self.arg_format = (self.stars_deco(stars))(self.arg_format)
|
||||
|
||||
def to_text(self, value):
|
||||
func = self.arg_format
|
||||
return [str(p) for p in func(value)]
|
||||
|
||||
|
||||
def cause_changes(func, on_success=True, on_failure=False):
|
||||
@wraps(func)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
try:
|
||||
func(*args, **kwargs)
|
||||
if on_success:
|
||||
self.changed = True
|
||||
except Exception as e:
|
||||
if on_failure:
|
||||
self.changed = True
|
||||
raise
|
||||
return wrapper
|
||||
|
||||
|
||||
def module_fails_on_exception(func):
|
||||
@wraps(func)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
try:
|
||||
func(self, *args, **kwargs)
|
||||
except SystemExit:
|
||||
raise
|
||||
except Exception as e:
|
||||
self.vars.msg = "Module failed with exception: {0}".format(str(e).strip())
|
||||
self.vars.exception = traceback.format_exc()
|
||||
self.module.fail_json(changed=False, msg=self.vars.msg, exception=self.vars.exception, output=self.output, vars=self.vars)
|
||||
return wrapper
|
||||
|
||||
|
||||
class DependencyCtxMgr(object):
|
||||
def __init__(self, name, msg=None):
|
||||
self.name = name
|
||||
self.msg = msg
|
||||
self.has_it = False
|
||||
self.exc_type = None
|
||||
self.exc_val = None
|
||||
self.exc_tb = None
|
||||
|
||||
def __enter__(self):
|
||||
pass
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.has_it = exc_type is None
|
||||
self.exc_type = exc_type
|
||||
self.exc_val = exc_val
|
||||
self.exc_tb = exc_tb
|
||||
return not self.has_it
|
||||
|
||||
@property
|
||||
def text(self):
|
||||
return self.msg or str(self.exc_val)
|
||||
|
||||
|
||||
class ModuleHelper(object):
|
||||
_dependencies = []
|
||||
module = {}
|
||||
facts_name = None
|
||||
|
||||
class AttrDict(dict):
|
||||
def __getattr__(self, item):
|
||||
return self[item]
|
||||
|
||||
def __init__(self, module=None):
|
||||
self.vars = ModuleHelper.AttrDict()
|
||||
self.output_dict = dict()
|
||||
self.facts_dict = dict()
|
||||
self._changed = False
|
||||
|
||||
if module:
|
||||
self.module = module
|
||||
|
||||
if not isinstance(module, AnsibleModule):
|
||||
self.module = AnsibleModule(**self.module)
|
||||
|
||||
def update_output(self, **kwargs):
|
||||
if kwargs:
|
||||
self.output_dict.update(kwargs)
|
||||
|
||||
def update_facts(self, **kwargs):
|
||||
if kwargs:
|
||||
self.facts_dict.update(kwargs)
|
||||
|
||||
def __init_module__(self):
|
||||
pass
|
||||
|
||||
def __run__(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
@property
|
||||
def changed(self):
|
||||
return self._changed
|
||||
|
||||
@changed.setter
|
||||
def changed(self, value):
|
||||
self._changed = value
|
||||
|
||||
@property
|
||||
def output(self):
|
||||
result = dict(self.vars)
|
||||
result.update(self.output_dict)
|
||||
if self.facts_name:
|
||||
result['ansible_facts'] = {self.facts_name: self.facts_dict}
|
||||
return result
|
||||
|
||||
@module_fails_on_exception
|
||||
def run(self):
|
||||
self.fail_on_missing_deps()
|
||||
self.__init_module__()
|
||||
self.__run__()
|
||||
self.module.exit_json(changed=self.changed, **self.output_dict)
|
||||
|
||||
@classmethod
|
||||
def dependency(cls, name, msg):
|
||||
cls._dependencies.append(DependencyCtxMgr(name, msg))
|
||||
return cls._dependencies[-1]
|
||||
|
||||
def fail_on_missing_deps(self):
|
||||
for d in self._dependencies:
|
||||
if not d.has_it:
|
||||
self.module.fail_json(changed=False,
|
||||
exception=d.exc_val.__traceback__.format_exc(),
|
||||
msg=d.text,
|
||||
**self.output_dict)
|
||||
|
||||
|
||||
class StateMixin(object):
|
||||
state_param = 'state'
|
||||
default_state = None
|
||||
|
||||
def _state(self):
|
||||
state = self.module.params.get(self.state_param)
|
||||
return self.default_state if state is None else state
|
||||
|
||||
def __run__(self):
|
||||
state = self._state()
|
||||
self.vars.state = state
|
||||
|
||||
# resolve aliases
|
||||
if state not in self.module.params:
|
||||
aliased = [name for name, param in self.module.argument_spec.items() if state in param.get('aliases', [])]
|
||||
if aliased:
|
||||
state = aliased[0]
|
||||
self.vars.effective_state = state
|
||||
|
||||
method = "state_{0}".format(state)
|
||||
if not hasattr(self, method):
|
||||
return self.__state_fallback__()
|
||||
func = getattr(self, method)
|
||||
return func()
|
||||
|
||||
def __state_fallback__(self):
|
||||
raise ValueError("Cannot find method for state: {0}".format(self._state()))
|
||||
|
||||
|
||||
class CmdMixin(object):
|
||||
"""
|
||||
Mixin for mapping module options to running a CLI command with its arguments.
|
||||
"""
|
||||
command = None
|
||||
command_args_formats = dict()
|
||||
check_rc = False
|
||||
force_lang = "C"
|
||||
|
||||
@property
|
||||
def module_formats(self):
|
||||
result = {}
|
||||
for param in self.module.params.keys():
|
||||
result[param] = ArgFormat(param)
|
||||
return result
|
||||
|
||||
@property
|
||||
def custom_formats(self):
|
||||
result = {}
|
||||
for param, fmt_spec in self.command_args_formats.items():
|
||||
result[param] = ArgFormat(param, **fmt_spec)
|
||||
return result
|
||||
|
||||
def _calculate_args(self, extra_params=None, params=None):
|
||||
def add_arg_formatted_param(_cmd_args, arg_format, _value):
|
||||
args = [x for x in arg_format.to_text(_value)]
|
||||
return _cmd_args + args
|
||||
|
||||
def find_format(_param):
|
||||
return self.custom_formats.get(_param, self.module_formats.get(_param))
|
||||
|
||||
extra_params = extra_params or dict()
|
||||
cmd_args = [self.module.get_bin_path(self.command)]
|
||||
param_list = params if params else self.module.params.keys()
|
||||
|
||||
for param in param_list:
|
||||
if param in self.module.argument_spec:
|
||||
if param not in self.module.params:
|
||||
continue
|
||||
fmt = find_format(param)
|
||||
value = self.module.params[param]
|
||||
else:
|
||||
if param not in extra_params:
|
||||
continue
|
||||
fmt = find_format(param)
|
||||
value = extra_params[param]
|
||||
self.cmd_args = cmd_args
|
||||
cmd_args = add_arg_formatted_param(cmd_args, fmt, value)
|
||||
|
||||
return cmd_args
|
||||
|
||||
def process_command_output(self, rc, out, err):
|
||||
return rc, out, err
|
||||
|
||||
def run_command(self, extra_params=None, params=None, *args, **kwargs):
|
||||
self.vars['cmd_args'] = self._calculate_args(extra_params, params)
|
||||
env_update = kwargs.get('environ_update', {})
|
||||
check_rc = kwargs.get('check_rc', self.check_rc)
|
||||
if self.force_lang:
|
||||
env_update.update({'LANGUAGE': self.force_lang})
|
||||
self.update_output(force_lang=self.force_lang)
|
||||
rc, out, err = self.module.run_command(self.vars['cmd_args'],
|
||||
environ_update=env_update,
|
||||
check_rc=check_rc, *args, **kwargs)
|
||||
self.update_output(rc=rc, stdout=out, stderr=err)
|
||||
return self.process_command_output(rc, out, err)
|
|
@ -7,7 +7,6 @@
|
|||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
DOCUMENTATION = '''
|
||||
module: xfconf
|
||||
author:
|
||||
|
@ -112,195 +111,149 @@ RETURN = '''
|
|||
sample: "96"
|
||||
'''
|
||||
|
||||
import traceback
|
||||
from ansible_collections.community.general.plugins.module_utils.module_helper import (
|
||||
ModuleHelper, CmdMixin, StateMixin, ArgFormat
|
||||
)
|
||||
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils.six.moves import shlex_quote
|
||||
|
||||
def fix_bool(value):
|
||||
vl = value.lower()
|
||||
return vl if vl in ("true", "false") else value
|
||||
|
||||
|
||||
@ArgFormat.stars_deco(1)
|
||||
def values_fmt(values, value_types):
|
||||
result = []
|
||||
for value, value_type in zip(values, value_types):
|
||||
if value_type == 'bool':
|
||||
value = fix_bool(value)
|
||||
result.append('--type')
|
||||
result.append('{0}'.format(value_type))
|
||||
result.append('--set')
|
||||
result.append('{0}'.format(value))
|
||||
return result
|
||||
|
||||
|
||||
class XFConfException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class XFConfProperty(object):
|
||||
SET = "present"
|
||||
GET = "get"
|
||||
RESET = "absent"
|
||||
VALID_STATES = (SET, GET, RESET)
|
||||
VALID_VALUE_TYPES = ('int', 'uint', 'bool', 'float', 'double', 'string')
|
||||
previous_value = None
|
||||
is_array = None
|
||||
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
self.channel = module.params['channel']
|
||||
self.property = module.params['property']
|
||||
self.value_type = module.params['value_type']
|
||||
self.value = module.params['value']
|
||||
self.force_array = module.params['force_array']
|
||||
|
||||
self.cmd = "{0} --channel {1} --property {2}".format(
|
||||
module.get_bin_path('xfconf-query', True),
|
||||
shlex_quote(self.channel),
|
||||
shlex_quote(self.property)
|
||||
)
|
||||
self.method_map = dict(zip((self.SET, self.GET, self.RESET),
|
||||
(self.set, self.get, self.reset)))
|
||||
|
||||
self.does_not = 'Property "{0}" does not exist on channel "{1}".'.format(self.property, self.channel)
|
||||
|
||||
def run(cmd):
|
||||
return module.run_command(cmd, check_rc=False)
|
||||
self._run = run
|
||||
|
||||
def _execute_xfconf_query(self, args=None):
|
||||
try:
|
||||
cmd = self.cmd
|
||||
if args:
|
||||
cmd = "{0} {1}".format(cmd, args)
|
||||
|
||||
self.module.debug("Running cmd={0}".format(cmd))
|
||||
rc, out, err = self._run(cmd)
|
||||
if err.rstrip() == self.does_not:
|
||||
return None
|
||||
if rc or len(err):
|
||||
raise XFConfException('xfconf-query failed with error (rc={0}): {1}'.format(rc, err))
|
||||
|
||||
return out.rstrip()
|
||||
|
||||
except OSError as exception:
|
||||
XFConfException('xfconf-query failed with exception: {0}'.format(exception))
|
||||
|
||||
def get(self):
|
||||
previous_value = self._execute_xfconf_query()
|
||||
if previous_value is None:
|
||||
return
|
||||
|
||||
if "Value is an array with" in previous_value:
|
||||
previous_value = previous_value.split("\n")
|
||||
previous_value.pop(0)
|
||||
previous_value.pop(0)
|
||||
|
||||
return previous_value
|
||||
|
||||
def reset(self):
|
||||
self._execute_xfconf_query("--reset")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _fix_bool(value):
|
||||
if value.lower() in ("true", "false"):
|
||||
value = value.lower()
|
||||
return value
|
||||
|
||||
def _make_value_args(self, value, value_type):
|
||||
if value_type == 'bool':
|
||||
value = self._fix_bool(value)
|
||||
return " --type '{1}' --set '{0}'".format(shlex_quote(value), shlex_quote(value_type))
|
||||
|
||||
def set(self):
|
||||
args = "--create"
|
||||
if self.is_array:
|
||||
args += " --force-array"
|
||||
for v in zip(self.value, self.value_type):
|
||||
args += self._make_value_args(*v)
|
||||
else:
|
||||
args += self._make_value_args(self.value, self.value_type)
|
||||
self._execute_xfconf_query(args)
|
||||
return self.value
|
||||
|
||||
def call(self, state):
|
||||
return self.method_map[state]()
|
||||
|
||||
def sanitize(self):
|
||||
self.previous_value = self.get()
|
||||
|
||||
if self.value is None and self.value_type is None:
|
||||
return
|
||||
if (self.value is None) ^ (self.value_type is None):
|
||||
raise XFConfException('Must set both "value" and "value_type"')
|
||||
|
||||
# stringify all values - in the CLI they will all be happy strings anyway
|
||||
# and by doing this here the rest of the code can be agnostic to it
|
||||
self.value = [str(v) for v in self.value]
|
||||
|
||||
values_len = len(self.value)
|
||||
types_len = len(self.value_type)
|
||||
|
||||
if types_len == 1:
|
||||
# use one single type for the entire list
|
||||
self.value_type = self.value_type * values_len
|
||||
elif types_len != values_len:
|
||||
# or complain if lists' lengths are different
|
||||
raise XFConfException('Same number of "value" and "value_type" needed')
|
||||
|
||||
# fix boolean values
|
||||
self.value = [self._fix_bool(v[0]) if v[1] == 'bool' else v[0] for v in zip(self.value, self.value_type)]
|
||||
|
||||
# calculates if it is an array
|
||||
self.is_array = self.force_array or isinstance(self.previous_value, list) or values_len > 1
|
||||
if not self.is_array:
|
||||
self.value = self.value[0]
|
||||
self.value_type = self.value_type[0]
|
||||
|
||||
|
||||
def main():
|
||||
facts_name = "xfconf"
|
||||
# Setup the Ansible module
|
||||
module = AnsibleModule(
|
||||
class XFConfProperty(CmdMixin, StateMixin, ModuleHelper):
|
||||
module = dict(
|
||||
argument_spec=dict(
|
||||
state=dict(default=XFConfProperty.SET,
|
||||
choices=XFConfProperty.VALID_STATES,
|
||||
state=dict(default="present",
|
||||
choices=("present", "get", "absent"),
|
||||
type='str'),
|
||||
channel=dict(required=True, type='str'),
|
||||
property=dict(required=True, type='str'),
|
||||
value_type=dict(required=False, type='list',
|
||||
elements='str', choices=XFConfProperty.VALID_VALUE_TYPES),
|
||||
elements='str', choices=('int', 'uint', 'bool', 'float', 'double', 'string')),
|
||||
value=dict(required=False, type='list', elements='raw'),
|
||||
force_array=dict(default=False, type='bool', aliases=['array']),
|
||||
),
|
||||
required_if=[
|
||||
('state', XFConfProperty.SET, ['value', 'value_type'])
|
||||
],
|
||||
supports_check_mode=True
|
||||
required_if=[('state', 'present', ['value', 'value_type'])],
|
||||
required_together=[('value', 'value_type')],
|
||||
supports_check_mode=True,
|
||||
)
|
||||
|
||||
# Force language to 'C' to ensure return values are always formatted in english, even in non-english environments
|
||||
module.run_command_environ_update = dict(LANGUAGE='C')
|
||||
facts_name = "xfconf"
|
||||
default_state = 'present'
|
||||
command = 'xfconf-query'
|
||||
command_args_formats = dict(
|
||||
channel=dict(fmt=('--channel', '{0}'),),
|
||||
property=dict(fmt=('--property', '{0}'),),
|
||||
is_array=dict(fmt="--force-array", style=ArgFormat.BOOLEAN),
|
||||
reset=dict(fmt="--reset", style=ArgFormat.BOOLEAN),
|
||||
create=dict(fmt="--create", style=ArgFormat.BOOLEAN),
|
||||
values_and_types=dict(fmt=values_fmt)
|
||||
)
|
||||
|
||||
state = module.params['state']
|
||||
def update_xfconf_output(self, **kwargs):
|
||||
self.update_output(**kwargs)
|
||||
self.update_facts(**kwargs)
|
||||
|
||||
try:
|
||||
xfconf = XFConfProperty(module)
|
||||
xfconf.sanitize()
|
||||
def __init_module__(self):
|
||||
self.does_not = 'Property "{0}" does not exist on channel "{1}".'.format(self.module.params['property'],
|
||||
self.module.params['channel'])
|
||||
self.vars.previous_value = self._get()
|
||||
|
||||
previous_value = xfconf.previous_value
|
||||
facts = {
|
||||
facts_name: dict(
|
||||
channel=xfconf.channel,
|
||||
property=xfconf.property,
|
||||
value_type=xfconf.value_type,
|
||||
value=previous_value,
|
||||
)
|
||||
}
|
||||
def process_command_output(self, rc, out, err):
|
||||
if err.rstrip() == self.does_not:
|
||||
return None
|
||||
if rc or len(err):
|
||||
raise XFConfException('xfconf-query failed with error (rc={0}): {1}'.format(rc, err))
|
||||
|
||||
if state == XFConfProperty.GET \
|
||||
or (previous_value is not None
|
||||
and (state, set(previous_value)) == (XFConfProperty.SET, set(xfconf.value))):
|
||||
module.exit_json(changed=False, ansible_facts=facts)
|
||||
return
|
||||
result = out.rstrip()
|
||||
if "Value is an array with" in result:
|
||||
result = result.split("\n")
|
||||
result.pop(0)
|
||||
result.pop(0)
|
||||
|
||||
# If check mode, we know a change would have occurred.
|
||||
if module.check_mode:
|
||||
new_value = xfconf.value
|
||||
return result
|
||||
|
||||
@property
|
||||
def changed(self):
|
||||
if self.vars.previous_value is None:
|
||||
return self.vars.value is not None
|
||||
elif self.vars.value is None:
|
||||
return self.vars.previous_value is not None
|
||||
else:
|
||||
new_value = xfconf.call(state)
|
||||
return set(self.vars.previous_value) != set(self.vars.value)
|
||||
|
||||
facts[facts_name].update(value=new_value, previous_value=previous_value)
|
||||
module.exit_json(changed=True, ansible_facts=facts)
|
||||
def _get(self):
|
||||
return self.run_command(params=('channel', 'property'))
|
||||
|
||||
except Exception as e:
|
||||
module.fail_json(msg="Failed with exception: {0}".format(e), exception=traceback.format_exc())
|
||||
def state_get(self):
|
||||
self.vars.value = self.vars.previous_value
|
||||
|
||||
def state_absent(self):
|
||||
self.vars.value = None
|
||||
self.run_command(params=('channel', 'property', 'reset'), extra_params={"reset": True})
|
||||
|
||||
def state_present(self):
|
||||
# stringify all values - in the CLI they will all be happy strings anyway
|
||||
# and by doing this here the rest of the code can be agnostic to it
|
||||
self.vars.value = [str(v) for v in self.module.params['value']]
|
||||
value_type = self.module.params['value_type']
|
||||
|
||||
values_len = len(self.vars.value)
|
||||
types_len = len(value_type)
|
||||
|
||||
if types_len == 1:
|
||||
# use one single type for the entire list
|
||||
value_type = value_type * values_len
|
||||
elif types_len != values_len:
|
||||
# or complain if lists' lengths are different
|
||||
raise XFConfException('Number of elements in "value" and "value_type" must be the same')
|
||||
|
||||
# fix boolean values
|
||||
self.vars.value = [fix_bool(v[0]) if v[1] == 'bool' else v[0] for v in zip(self.vars.value, value_type)]
|
||||
|
||||
# calculates if it is an array
|
||||
self.vars.is_array = \
|
||||
bool(self.module.params['force_array']) or \
|
||||
isinstance(self.vars.previous_value, list) or \
|
||||
values_len > 1
|
||||
|
||||
params = ['channel', 'property', 'create']
|
||||
if self.vars.is_array:
|
||||
params.append('is_array')
|
||||
params.append('values_and_types')
|
||||
|
||||
extra_params = dict(values_and_types=(self.vars.value, value_type))
|
||||
extra_params['create'] = True
|
||||
extra_params['is_array'] = self.vars.is_array
|
||||
|
||||
if not self.module.check_mode:
|
||||
self.run_command(params=params, extra_params=extra_params)
|
||||
|
||||
if not self.vars.is_array:
|
||||
self.vars.value = self.vars.value[0]
|
||||
|
||||
|
||||
def main():
|
||||
xfconf = XFConfProperty()
|
||||
xfconf.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
84
tests/unit/plugins/module_utils/test_module_helper.py
Normal file
84
tests/unit/plugins/module_utils/test_module_helper.py
Normal file
|
@ -0,0 +1,84 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# (c) 2020, Alexei Znamensky <russoz@gmail.com>
|
||||
# Copyright (c) 2020 Ansible Project
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_collections.community.general.plugins.module_utils.module_helper import (
|
||||
ArgFormat, DependencyCtxMgr, ModuleHelper
|
||||
)
|
||||
|
||||
|
||||
def single_lambda_2star(x, y, z):
|
||||
return ["piggies=[{0},{1},{2}]".format(x, y, z)]
|
||||
|
||||
|
||||
ARG_FORMATS = dict(
|
||||
simple_boolean_true=("--superflag", ArgFormat.BOOLEAN, 0, True, ["--superflag"]),
|
||||
simple_boolean_false=("--superflag", ArgFormat.BOOLEAN, 0, False, []),
|
||||
single_printf=("--param=%s", ArgFormat.PRINTF, 0, "potatoes", ["--param=potatoes"]),
|
||||
single_printf_no_substitution=("--param", ArgFormat.PRINTF, 0, "potatoes", ["--param"]),
|
||||
multiple_printf=(["--param", "free-%s"], ArgFormat.PRINTF, 0, "potatoes", ["--param", "free-potatoes"]),
|
||||
single_format=("--param={0}", ArgFormat.FORMAT, 0, "potatoes", ["--param=potatoes"]),
|
||||
single_format_no_substitution=("--param", ArgFormat.FORMAT, 0, "potatoes", ["--param"]),
|
||||
multiple_format=(["--param", "free-{0}"], ArgFormat.FORMAT, 0, "potatoes", ["--param", "free-potatoes"]),
|
||||
single_lambda_0star=((lambda v: ["piggies=[{0},{1},{2}]".format(v[0], v[1], v[2])]),
|
||||
None, 0, ['a', 'b', 'c'], ["piggies=[a,b,c]"]),
|
||||
single_lambda_1star=((lambda a, b, c: ["piggies=[{0},{1},{2}]".format(a, b, c)]),
|
||||
None, 1, ['a', 'b', 'c'], ["piggies=[a,b,c]"]),
|
||||
single_lambda_2star=(single_lambda_2star, None, 2, dict(z='c', x='a', y='b'), ["piggies=[a,b,c]"])
|
||||
)
|
||||
ARG_FORMATS_IDS = sorted(ARG_FORMATS.keys())
|
||||
|
||||
|
||||
@pytest.mark.parametrize('fmt, style, stars, value, expected',
|
||||
(ARG_FORMATS[tc] for tc in ARG_FORMATS_IDS),
|
||||
ids=ARG_FORMATS_IDS)
|
||||
def test_arg_format(fmt, style, stars, value, expected):
|
||||
af = ArgFormat('name', fmt, style, stars)
|
||||
actual = af.to_text(value)
|
||||
print("formatted string = {0}".format(actual))
|
||||
assert actual == expected
|
||||
|
||||
|
||||
ARG_FORMATS_FAIL = dict(
|
||||
int_fmt=(3, None, 0, "", [""]),
|
||||
bool_fmt=(True, None, 0, "", [""]),
|
||||
)
|
||||
ARG_FORMATS_FAIL_IDS = sorted(ARG_FORMATS_FAIL.keys())
|
||||
|
||||
|
||||
@pytest.mark.parametrize('fmt, style, stars, value, expected',
|
||||
(ARG_FORMATS_FAIL[tc] for tc in ARG_FORMATS_FAIL_IDS),
|
||||
ids=ARG_FORMATS_FAIL_IDS)
|
||||
def test_arg_format_fail(fmt, style, stars, value, expected):
|
||||
with pytest.raises(TypeError):
|
||||
af = ArgFormat('name', fmt, style, stars)
|
||||
actual = af.to_text(value)
|
||||
print("formatted string = {0}".format(actual))
|
||||
|
||||
|
||||
def test_dependency_ctxmgr():
|
||||
ctx = DependencyCtxMgr("POTATOES", "Potatoes must be installed")
|
||||
with ctx:
|
||||
import potatoes_that_will_never_be_there
|
||||
print("POTATOES: ctx.text={0}".format(ctx.text))
|
||||
assert ctx.text == "Potatoes must be installed"
|
||||
assert not ctx.has_it
|
||||
|
||||
ctx = DependencyCtxMgr("POTATOES2")
|
||||
with ctx:
|
||||
import potatoes_that_will_never_be_there_again
|
||||
assert not ctx.has_it
|
||||
print("POTATOES2: ctx.text={0}".format(ctx.text))
|
||||
assert ctx.text.startswith("No module named")
|
||||
assert "potatoes_that_will_never_be_there_again" in ctx.text
|
||||
|
||||
ctx = DependencyCtxMgr("TYPING")
|
||||
with ctx:
|
||||
import sys
|
||||
assert ctx.has_it
|
|
@ -9,7 +9,6 @@ __metaclass__ = type
|
|||
|
||||
import json
|
||||
|
||||
from ansible.module_utils import basic
|
||||
from ansible_collections.community.general.plugins.modules.system import xfconf
|
||||
|
||||
import pytest
|
||||
|
@ -22,7 +21,7 @@ def patch_xfconf(mocker):
|
|||
"""
|
||||
Function used for mocking some parts of redhat_subscribtion module
|
||||
"""
|
||||
mocker.patch('ansible_collections.community.general.plugins.modules.system.xfconf.AnsibleModule.get_bin_path',
|
||||
mocker.patch('ansible_collections.community.general.plugins.module_utils.module_helper.AnsibleModule.get_bin_path',
|
||||
return_value='/testbin/xfconf-query')
|
||||
|
||||
|
||||
|
@ -41,7 +40,6 @@ def test_without_required_parameters(capfd, patch_xfconf):
|
|||
|
||||
|
||||
TEST_CASES = [
|
||||
# Test the case, when the system is already registered
|
||||
[
|
||||
{'channel': 'xfwm4', 'property': '/general/inactive_opacity', 'state': 'get'},
|
||||
{
|
||||
|
@ -49,15 +47,35 @@ TEST_CASES = [
|
|||
'run_command.calls': [
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
'/testbin/xfconf-query --channel xfwm4 --property /general/inactive_opacity',
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/inactive_opacity'],
|
||||
# Was return code checked?
|
||||
{'check_rc': False},
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, '100/n', '',),
|
||||
(0, '100\n', '',),
|
||||
),
|
||||
],
|
||||
'changed': False,
|
||||
'value': '100'
|
||||
'previous_value': '100',
|
||||
'value': '100',
|
||||
}
|
||||
],
|
||||
[
|
||||
{'channel': 'xfwm4', 'property': '/general/i_dont_exist', 'state': 'get'},
|
||||
{
|
||||
'id': 'test_simple_property_get_nonexistent',
|
||||
'run_command.calls': [
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/i_dont_exist'],
|
||||
# Was return code checked?
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(1, '', 'Property "/general/i_dont_exist" does not exist on channel "xfwm4".\n',),
|
||||
),
|
||||
],
|
||||
'changed': False,
|
||||
'previous_value': None,
|
||||
'value': None,
|
||||
}
|
||||
],
|
||||
[
|
||||
|
@ -67,15 +85,16 @@ TEST_CASES = [
|
|||
'run_command.calls': [
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
'/testbin/xfconf-query --channel xfwm4 --property /general/workspace_names',
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/workspace_names'],
|
||||
# Was return code checked?
|
||||
{'check_rc': False},
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, 'Value is an array with 3 items:\n\nMain\nWork\nTmp\n', '',),
|
||||
),
|
||||
],
|
||||
'changed': False,
|
||||
'value': ['Main', 'Work', 'Tmp']
|
||||
'previous_value': ['Main', 'Work', 'Tmp'],
|
||||
'value': ['Main', 'Work', 'Tmp'],
|
||||
},
|
||||
],
|
||||
[
|
||||
|
@ -85,15 +104,16 @@ TEST_CASES = [
|
|||
'run_command.calls': [
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
'/testbin/xfconf-query --channel xfwm4 --property /general/use_compositing',
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/use_compositing'],
|
||||
# Was return code checked?
|
||||
{'check_rc': False},
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, 'true', '',),
|
||||
),
|
||||
],
|
||||
'changed': False,
|
||||
'value': True
|
||||
'previous_value': 'true',
|
||||
'value': 'true',
|
||||
},
|
||||
],
|
||||
[
|
||||
|
@ -103,15 +123,84 @@ TEST_CASES = [
|
|||
'run_command.calls': [
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
'/testbin/xfconf-query --channel xfwm4 --property /general/use_compositing',
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/use_compositing'],
|
||||
# Was return code checked?
|
||||
{'check_rc': False},
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, 'false', '',),
|
||||
),
|
||||
],
|
||||
'changed': False,
|
||||
'value': False
|
||||
'previous_value': 'false',
|
||||
'value': 'false',
|
||||
},
|
||||
],
|
||||
[
|
||||
{
|
||||
'channel': 'xfwm4',
|
||||
'property': '/general/inactive_opacity',
|
||||
'state': 'present',
|
||||
'value_type': 'int',
|
||||
'value': 90,
|
||||
},
|
||||
{
|
||||
'id': 'test_property_set_property',
|
||||
'run_command.calls': [
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/inactive_opacity'],
|
||||
# Was return code checked?
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, '100\n', '',),
|
||||
),
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/inactive_opacity',
|
||||
'--create', '--type', 'int', '--set', '90'],
|
||||
# Was return code checked?
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, '', '',),
|
||||
),
|
||||
],
|
||||
'changed': True,
|
||||
'previous_value': '100',
|
||||
'value': '90',
|
||||
},
|
||||
],
|
||||
[
|
||||
{
|
||||
'channel': 'xfwm4',
|
||||
'property': '/general/inactive_opacity',
|
||||
'state': 'present',
|
||||
'value_type': 'int',
|
||||
'value': 90,
|
||||
},
|
||||
{
|
||||
'id': 'test_property_set_property_same_value',
|
||||
'run_command.calls': [
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/inactive_opacity'],
|
||||
# Was return code checked?
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, '90\n', '',),
|
||||
),
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/inactive_opacity',
|
||||
'--create', '--type', 'int', '--set', '90'],
|
||||
# Was return code checked?
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, '', '',),
|
||||
),
|
||||
],
|
||||
'changed': False,
|
||||
'previous_value': '90',
|
||||
'value': '90',
|
||||
},
|
||||
],
|
||||
[
|
||||
|
@ -127,18 +216,19 @@ TEST_CASES = [
|
|||
'run_command.calls': [
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
'/testbin/xfconf-query --channel xfwm4 --property /general/workspace_names',
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/workspace_names'],
|
||||
# Was return code checked?
|
||||
{'check_rc': False},
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, 'Value is an array with 3 items:\n\nMain\nWork\nTmp\n', '',),
|
||||
),
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
"/testbin/xfconf-query --channel xfwm4 --property /general/workspace_names --create "
|
||||
"--force-array --type 'string' --set 'A' --type 'string' --set 'B' --type 'string' --set 'C'",
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/workspace_names',
|
||||
'--create', '--force-array', '--type', 'string', '--set', 'A', '--type', 'string', '--set', 'B',
|
||||
'--type', 'string', '--set', 'C'],
|
||||
# Was return code checked?
|
||||
{'check_rc': False},
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, '', '',),
|
||||
),
|
||||
|
@ -148,6 +238,73 @@ TEST_CASES = [
|
|||
'value': ['A', 'B', 'C'],
|
||||
},
|
||||
],
|
||||
[
|
||||
{
|
||||
'channel': 'xfwm4',
|
||||
'property': '/general/workspace_names',
|
||||
'state': 'present',
|
||||
'value_type': 'string',
|
||||
'value': ['A', 'B', 'C'],
|
||||
},
|
||||
{
|
||||
'id': 'test_property_set_array_to_same_value',
|
||||
'run_command.calls': [
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/workspace_names'],
|
||||
# Was return code checked?
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, 'Value is an array with 3 items:\n\nA\nB\nC\n', '',),
|
||||
),
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/workspace_names',
|
||||
'--create', '--force-array', '--type', 'string', '--set', 'A', '--type', 'string', '--set', 'B',
|
||||
'--type', 'string', '--set', 'C'],
|
||||
# Was return code checked?
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, '', '',),
|
||||
),
|
||||
],
|
||||
'changed': False,
|
||||
'previous_value': ['A', 'B', 'C'],
|
||||
'value': ['A', 'B', 'C'],
|
||||
},
|
||||
],
|
||||
[
|
||||
{
|
||||
'channel': 'xfwm4',
|
||||
'property': '/general/workspace_names',
|
||||
'state': 'absent',
|
||||
},
|
||||
{
|
||||
'id': 'test_property_reset_value',
|
||||
'run_command.calls': [
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/workspace_names'],
|
||||
# Was return code checked?
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, 'Value is an array with 3 items:\n\nA\nB\nC\n', '',),
|
||||
),
|
||||
(
|
||||
# Calling of following command will be asserted
|
||||
['/testbin/xfconf-query', '--channel', 'xfwm4', '--property', '/general/workspace_names',
|
||||
'--reset'],
|
||||
# Was return code checked?
|
||||
{'environ_update': {'LANGUAGE': 'C'}, 'check_rc': False},
|
||||
# Mock of returned code, stdout and stderr
|
||||
(0, '', '',),
|
||||
),
|
||||
],
|
||||
'changed': True,
|
||||
'previous_value': ['A', 'B', 'C'],
|
||||
'value': None,
|
||||
},
|
||||
],
|
||||
]
|
||||
TEST_CASES_IDS = [item[1]['id'] for item in TEST_CASES]
|
||||
|
||||
|
@ -165,7 +322,7 @@ def test_xfconf(mocker, capfd, patch_xfconf, testcase):
|
|||
# Mock function used for running commands first
|
||||
call_results = [item[2] for item in testcase['run_command.calls']]
|
||||
mock_run_command = mocker.patch(
|
||||
'ansible_collections.community.general.plugins.modules.system.xfconf.AnsibleModule.run_command',
|
||||
'ansible_collections.community.general.plugins.module_utils.module_helper.AnsibleModule.run_command',
|
||||
side_effect=call_results)
|
||||
|
||||
# Try to run test case
|
||||
|
@ -174,7 +331,8 @@ def test_xfconf(mocker, capfd, patch_xfconf, testcase):
|
|||
|
||||
out, err = capfd.readouterr()
|
||||
results = json.loads(out)
|
||||
print("results = %s" % results)
|
||||
print("testcase =\n%s" % testcase)
|
||||
print("results =\n%s" % results)
|
||||
assert 'changed' in results
|
||||
assert results['changed'] == testcase['changed']
|
||||
if 'msg' in results:
|
||||
|
|
Loading…
Reference in a new issue