1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

cmd_runner module utils: fix case for as_fixed() format (#5538) (#5562)

* cmd_runner module utils: fix case for as_fixed() format

* add changelog fragment

* simplified test_cmd_runner

* fix handling empty default for `as_map()`

* add changelog fragment

* MissingArgumentValue is reraised in run()

(cherry picked from commit e87ca10b61)

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2022-11-16 06:58:30 +01:00 committed by GitHub
parent 57a4195b0d
commit c681249364
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 69 deletions

View file

@ -0,0 +1,3 @@
bugfixes:
- cmd_runner module utils - formatting arguments ``cmd_runner_fmt.as_fixed()`` was expecting an non-existing argument (https://github.com/ansible-collections/community.general/pull/5538).
- cmd_runner module utils - fixed bug when handling default cases in ``cmd_runner_fmt.as_map()`` (https://github.com/ansible-collections/community.general/pull/5538).

View file

@ -88,9 +88,10 @@ class FormatError(CmdRunnerException):
class _ArgFormat(object): class _ArgFormat(object):
def __init__(self, func, ignore_none=None): def __init__(self, func, ignore_none=None, ignore_missing_value=False):
self.func = func self.func = func
self.ignore_none = ignore_none self.ignore_none = ignore_none
self.ignore_missing_value = ignore_missing_value
def __call__(self, value, ctx_ignore_none): def __call__(self, value, ctx_ignore_none):
ignore_none = self.ignore_none if self.ignore_none is not None else ctx_ignore_none ignore_none = self.ignore_none if self.ignore_none is not None else ctx_ignore_none
@ -127,7 +128,7 @@ class _Format(object):
@staticmethod @staticmethod
def as_fixed(args): def as_fixed(args):
return _ArgFormat(lambda value: _ensure_list(args), ignore_none=False) return _ArgFormat(lambda value: _ensure_list(args), ignore_none=False, ignore_missing_value=True)
@staticmethod @staticmethod
def as_func(func, ignore_none=None): def as_func(func, ignore_none=None):
@ -135,14 +136,15 @@ class _Format(object):
@staticmethod @staticmethod
def as_map(_map, default=None, ignore_none=None): def as_map(_map, default=None, ignore_none=None):
if default is None:
default = []
return _ArgFormat(lambda value: _ensure_list(_map.get(value, default)), ignore_none=ignore_none) return _ArgFormat(lambda value: _ensure_list(_map.get(value, default)), ignore_none=ignore_none)
@staticmethod @staticmethod
def as_default_type(_type, arg="", ignore_none=None): def as_default_type(_type, arg="", ignore_none=None):
fmt = _Format fmt = _Format
if _type == "dict": if _type == "dict":
return fmt.as_func(lambda d: ["--{0}={1}".format(*a) for a in iteritems(d)], return fmt.as_func(lambda d: ["--{0}={1}".format(*a) for a in iteritems(d)], ignore_none=ignore_none)
ignore_none=ignore_none)
if _type == "list": if _type == "list":
return fmt.as_func(lambda value: ["--{0}".format(x) for x in value], ignore_none=ignore_none) return fmt.as_func(lambda value: ["--{0}".format(x) for x in value], ignore_none=ignore_none)
if _type == "bool": if _type == "bool":
@ -261,10 +263,13 @@ class _CmdRunnerContext(object):
for arg_name in self.args_order: for arg_name in self.args_order:
value = None value = None
try: try:
value = named_args[arg_name] if arg_name in named_args:
value = named_args[arg_name]
elif not runner.arg_formats[arg_name].ignore_missing_value:
raise MissingArgumentValue(self.args_order, arg_name)
self.cmd.extend(runner.arg_formats[arg_name](value, ctx_ignore_none=self.ignore_value_none)) self.cmd.extend(runner.arg_formats[arg_name](value, ctx_ignore_none=self.ignore_value_none))
except KeyError: except MissingArgumentValue:
raise MissingArgumentValue(self.args_order, arg_name) raise
except Exception as e: except Exception as e:
raise FormatError(arg_name, value, runner.arg_formats[arg_name], e) raise FormatError(arg_name, value, runner.arg_formats[arg_name], e)

View file

@ -240,6 +240,60 @@ TC_RUNNER = dict(
), ),
), ),
), ),
aa_bb_fixed=(
dict(
args_bundle=dict(
aa=dict(type="int", value=11, fmt_func=fmt.as_opt_eq_val, fmt_arg="--answer"),
bb=dict(fmt_func=fmt.as_fixed, fmt_arg=["fixed", "args"]),
),
runner_init_args=dict(),
runner_ctx_args=dict(args_order=['aa', 'bb']),
),
dict(runner_ctx_run_args=dict(), rc=0, out="", err=""),
dict(
run_info=dict(
cmd=['/mock/bin/testing', '--answer=11', 'fixed', 'args'],
environ_update={'LANGUAGE': 'C', 'LC_ALL': 'C'},
args_order=('aa', 'bb'),
),
),
),
aa_bb_map=(
dict(
args_bundle=dict(
aa=dict(type="int", value=11, fmt_func=fmt.as_opt_eq_val, fmt_arg="--answer"),
bb=dict(fmt_func=fmt.as_map, fmt_arg={"v1": 111, "v2": 222}),
),
runner_init_args=dict(),
runner_ctx_args=dict(args_order=['aa', 'bb']),
),
dict(runner_ctx_run_args=dict(bb="v2"), rc=0, out="", err=""),
dict(
run_info=dict(
cmd=['/mock/bin/testing', '--answer=11', '222'],
environ_update={'LANGUAGE': 'C', 'LC_ALL': 'C'},
args_order=('aa', 'bb'),
),
),
),
aa_bb_map_default=(
dict(
args_bundle=dict(
aa=dict(type="int", value=11, fmt_func=fmt.as_opt_eq_val, fmt_arg="--answer"),
bb=dict(fmt_func=fmt.as_map, fmt_arg={"v1": 111, "v2": 222}),
),
runner_init_args=dict(),
runner_ctx_args=dict(args_order=['aa', 'bb']),
),
dict(runner_ctx_run_args=dict(bb="v123456789"), rc=0, out="", err=""),
dict(
run_info=dict(
cmd=['/mock/bin/testing', '--answer=11'],
environ_update={'LANGUAGE': 'C', 'LC_ALL': 'C'},
args_order=('aa', 'bb'),
),
),
),
) )
TC_RUNNER_IDS = sorted(TC_RUNNER.keys()) TC_RUNNER_IDS = sorted(TC_RUNNER.keys())
@ -301,70 +355,16 @@ def test_runner_context(runner_input, cmd_execution, expected):
results = ctx.run(**cmd_execution['runner_ctx_run_args']) results = ctx.run(**cmd_execution['runner_ctx_run_args'])
_assert_run(runner_input, cmd_execution, expected, ctx, results) _assert_run(runner_input, cmd_execution, expected, ctx, results)
with pytest.raises(exc):
with runner(**runner_input['runner_ctx_args']) as ctx2:
results2 = ctx2.run(**cmd_execution['runner_ctx_run_args'])
_assert_run(runner_input, cmd_execution, expected, ctx2, results2)
else: else:
with runner.context(**runner_input['runner_ctx_args']) as ctx: with runner.context(**runner_input['runner_ctx_args']) as ctx:
results = ctx.run(**cmd_execution['runner_ctx_run_args']) results = ctx.run(**cmd_execution['runner_ctx_run_args'])
_assert_run(runner_input, cmd_execution, expected, ctx, results) _assert_run(runner_input, cmd_execution, expected, ctx, results)
with runner(**runner_input['runner_ctx_args']) as ctx2:
@pytest.mark.parametrize('runner_input, cmd_execution, expected', results2 = ctx2.run(**cmd_execution['runner_ctx_run_args'])
(TC_RUNNER[tc] for tc in TC_RUNNER_IDS), _assert_run(runner_input, cmd_execution, expected, ctx2, results2)
ids=TC_RUNNER_IDS)
def test_runner_callable(runner_input, cmd_execution, expected):
arg_spec = {}
params = {}
arg_formats = {}
for k, v in runner_input['args_bundle'].items():
try:
arg_spec[k] = {'type': v['type']}
except KeyError:
pass
try:
params[k] = v['value']
except KeyError:
pass
try:
arg_formats[k] = v['fmt_func'](v['fmt_arg'])
except KeyError:
pass
orig_results = tuple(cmd_execution[x] for x in ('rc', 'out', 'err'))
print("arg_spec={0}\nparams={1}\narg_formats={2}\n".format(
arg_spec,
params,
arg_formats,
))
module = MagicMock()
type(module).argument_spec = PropertyMock(return_value=arg_spec)
type(module).params = PropertyMock(return_value=params)
module.get_bin_path.return_value = '/mock/bin/testing'
module.run_command.return_value = orig_results
runner = CmdRunner(
module=module,
command="testing",
arg_formats=arg_formats,
**runner_input['runner_init_args']
)
def _assert_run_info(actual, expected):
reduced = dict((k, actual[k]) for k in expected.keys())
assert reduced == expected, "{0}".format(reduced)
def _assert_run(runner_input, cmd_execution, expected, ctx, results):
_assert_run_info(ctx.run_info, expected['run_info'])
assert results == expected.get('results', orig_results)
exc = expected.get("exc")
if exc:
with pytest.raises(exc):
with runner(**runner_input['runner_ctx_args']) as ctx:
results = ctx.run(**cmd_execution['runner_ctx_run_args'])
_assert_run(runner_input, cmd_execution, expected, ctx, results)
else:
with runner(**runner_input['runner_ctx_args']) as ctx:
results = ctx.run(**cmd_execution['runner_ctx_run_args'])
_assert_run(runner_input, cmd_execution, expected, ctx, results)