1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

snap - revamp + implementing enabled/disabled states (#2411)

* revamp of snap module

* added changelog fragment

* fixed description

* Update changelogs/fragments/2411-snap-revamp-enabled-disabled-states.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
Alexei Znamensky 2021-05-18 00:03:15 +12:00 committed by GitHub
parent 350380ba8c
commit 345d5f2dfa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 157 additions and 122 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- snap - added ``enabled`` and ``disabled`` states (https://github.com/ansible-collections/community.general/issues/1990).

View file

@ -31,7 +31,7 @@ options:
- Desired state of the package. - Desired state of the package.
required: false required: false
default: present default: present
choices: [ absent, present ] choices: [ absent, present, enabled, disabled ]
type: str type: str
classic: classic:
description: description:
@ -105,151 +105,184 @@ snaps_removed:
returned: When any snaps have been removed returned: When any snaps have been removed
''' '''
import operator
import re import re
from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.general.plugins.module_utils.module_helper import (
CmdStateModuleHelper, ArgFormat, ModuleHelperException
)
def validate_input_snaps(module): __state_map = dict(
"""Ensure that all exist.""" present='install',
for snap_name in module.params['name']: absent='remove',
if not snap_exists(module, snap_name): info='info', # not public
module.fail_json(msg="No snap matching '%s' available." % snap_name) list='list', # not public
enabled='enable',
disabled='disable',
)
def snap_exists(module, snap_name): def _state_map(value):
snap_path = module.get_bin_path("snap", True) return __state_map[value]
cmd_parts = [snap_path, 'info', snap_name]
cmd = ' '.join(cmd_parts)
rc, out, err = module.run_command(cmd, check_rc=False)
return rc == 0
def is_snap_installed(module, snap_name): class Snap(CmdStateModuleHelper):
snap_path = module.get_bin_path("snap", True) __disable_re = re.compile(r'(?:\S+\s+){5}(?P<notes>\S+)')
cmd_parts = [snap_path, 'list', snap_name] module = dict(
cmd = ' '.join(cmd_parts) argument_spec={
rc, out, err = module.run_command(cmd, check_rc=False) 'name': dict(type='list', elements='str', required=True),
'state': dict(type='str', required=False, default='present',
choices=['absent', 'present', 'enabled', 'disabled']),
'classic': dict(type='bool', required=False, default=False),
'channel': dict(type='str', required=False, default='stable'),
},
supports_check_mode=True,
)
command = "snap"
command_args_formats = dict(
actionable_snaps=dict(fmt=lambda v: v),
state=dict(fmt=_state_map),
classic=dict(fmt="--classic", style=ArgFormat.BOOLEAN),
channel=dict(fmt=lambda v: [] if v == 'stable' else ['--channel', '{0}']),
)
check_rc = False
return rc == 0 @staticmethod
def _first_non_zero(a):
for elem in a:
if elem != 0:
return elem
return 0
def get_snap_for_action(module): def _run_multiple_commands(self, commands):
"""Construct a list of snaps to use for current action.""" outputs = [(c,) + self.run_command(params=c) for c in commands]
snaps = module.params['name'] results = ([], [], [], [])
for output in outputs:
for i in range(4):
results[i].append(output[i])
is_present_state = module.params['state'] == 'present' return [
negation_predicate = operator.not_ if is_present_state else bool '; '.join(results[0]),
self._first_non_zero(results[1]),
'\n'.join(results[2]),
'\n'.join(results[3]),
]
def predicate(s): def snap_exists(self, snap_name):
return negation_predicate(is_snap_installed(module, s)) return 0 == self.run_command(params=[{'state': 'info'}, {'name': [snap_name]}])[0]
return [s for s in snaps if predicate(s)] def is_snap_installed(self, snap_name):
return 0 == self.run_command(params=[{'state': 'list'}, {'name': [snap_name]}])[0]
def is_snap_enabled(self, snap_name):
rc, out, err = self.run_command(params=[{'state': 'list'}, {'name': [snap_name]}])
if rc != 0:
return None
result = out.splitlines()[1]
match = self.__disable_re.match(result)
if not match:
raise ModuleHelperException(msg="Unable to parse 'snap list {0}' output:\n{1}".format(snap_name, out))
notes = match.group('notes')
return "disabled" not in notes.split(',')
def get_base_cmd_parts(module): def validate_input_snaps(self):
action_map = { """Ensure that all exist."""
'present': 'install', for snap_name in self.vars.name:
'absent': 'remove', if not self.snap_exists(snap_name):
} raise ModuleHelperException(msg="No snap matching '%s' available." % snap_name)
state = module.params['state'] def state_present(self):
self.validate_input_snaps() # if snap doesnt exist, it will explode when trying to install
self.vars.meta('classic').set(output=True)
self.vars.meta('channel').set(output=True)
actionable_snaps = [s for s in self.vars.name if self.is_snap_installed(s)]
if not actionable_snaps:
return
self.changed = True
self.vars.snaps_installed = actionable_snaps
if self.module.check_mode:
return
params = ['classic', 'channel', 'state'] # get base cmd parts
has_one_pkg_params = bool(self.vars.classic) or self.vars.channel != 'stable'
has_multiple_snaps = len(actionable_snaps) > 1
if has_one_pkg_params and has_multiple_snaps:
commands = [params + [s] for s in actionable_snaps]
else:
commands = [params + actionable_snaps]
self.vars.cmd, rc, out, err = self._run_multiple_commands(commands)
if rc == 0:
return
classic = ['--classic'] if module.params['classic'] else [] classic_snap_pattern = re.compile(r'^error: This revision of snap "(?P<package_name>\w+)"'
channel = ['--channel', module.params['channel']] if module.params['channel'] and module.params['channel'] != 'stable' else [] r' was published using classic confinement')
match = classic_snap_pattern.match(err)
if match:
err_pkg = match.group('package_name')
msg = "Couldn't install {name} because it requires classic confinement".format(name=err_pkg)
else:
msg = "Ooops! Snap installation failed while executing '{cmd}', please examine logs and " \
"error output for more details.".format(cmd=self.vars.cmd)
raise ModuleHelperException(msg=msg)
snap_path = module.get_bin_path("snap", True) def state_absent(self):
snap_action = action_map[state] self.validate_input_snaps() # if snap doesnt exist, it will be absent by definition
actionable_snaps = [s for s in self.vars.name if not self.is_snap_installed(s)]
if not actionable_snaps:
return
self.changed = True
self.vars.snaps_removed = actionable_snaps
if self.module.check_mode:
return
params = ['classic', 'channel', 'state'] # get base cmd parts
commands = [params + actionable_snaps]
self.vars.cmd, rc, out, err = self._run_multiple_commands(commands)
if rc == 0:
return
msg = "Ooops! Snap removal failed while executing '{cmd}', please examine logs and " \
"error output for more details.".format(cmd=self.vars.cmd)
raise ModuleHelperException(msg=msg)
cmd_parts = [snap_path, snap_action] def state_enabled(self):
if snap_action == 'install': self.validate_input_snaps()
cmd_parts += classic + channel actionable_snaps = [s for s in self.vars.name if self.is_snap_enabled(s) is False]
if not actionable_snaps:
return
self.changed = True
self.vars.snaps_enabled = actionable_snaps
if self.module.check_mode:
return
params = ['classic', 'channel', 'state'] # get base cmd parts
commands = [params + actionable_snaps]
self.vars.cmd, rc, out, err = self._run_multiple_commands(commands)
if rc == 0:
return
msg = "Ooops! Snap enabling failed while executing '{cmd}', please examine logs and " \
"error output for more details.".format(cmd=self.vars.cmd)
raise ModuleHelperException(msg=msg)
return cmd_parts def state_disabled(self):
self.validate_input_snaps()
actionable_snaps = [s for s in self.vars.name if self.is_snap_enabled(s) is True]
def get_cmd_parts(module, snap_names): if not actionable_snaps:
"""Return list of cmds to run in exec format.""" return
is_install_mode = module.params['state'] == 'present' self.changed = True
has_multiple_snaps = len(snap_names) > 1 self.vars.snaps_enabled = actionable_snaps
if self.module.check_mode:
cmd_parts = get_base_cmd_parts(module) return
has_one_pkg_params = '--classic' in cmd_parts or '--channel' in cmd_parts params = ['classic', 'channel', 'state'] # get base cmd parts
commands = [params + actionable_snaps]
if not (is_install_mode and has_one_pkg_params and has_multiple_snaps): self.vars.cmd, rc, out, err = self._run_multiple_commands(commands)
return [cmd_parts + snap_names] if rc == 0:
return
return [cmd_parts + [s] for s in snap_names] msg = "Ooops! Snap disabling failed while executing '{cmd}', please examine logs and " \
"error output for more details.".format(cmd=self.vars.cmd)
raise ModuleHelperException(msg=msg)
def run_cmd_for(module, snap_names):
cmds_parts = get_cmd_parts(module, snap_names)
cmd = '; '.join(' '.join(c) for c in cmds_parts)
cmd = 'sh -c "{0}"'.format(cmd)
# Actually execute the snap command
return (cmd, ) + module.run_command(cmd, check_rc=False)
def execute_action(module):
is_install_mode = module.params['state'] == 'present'
exit_kwargs = {
'classic': module.params['classic'],
'channel': module.params['channel'],
} if is_install_mode else {}
actionable_snaps = get_snap_for_action(module)
if not actionable_snaps:
module.exit_json(changed=False, **exit_kwargs)
changed_def_args = {
'changed': True,
'snaps_{result}'.
format(result='installed' if is_install_mode
else 'removed'): actionable_snaps,
}
if module.check_mode:
module.exit_json(**dict(changed_def_args, **exit_kwargs))
cmd, rc, out, err = run_cmd_for(module, actionable_snaps)
cmd_out_args = {
'cmd': cmd,
'rc': rc,
'stdout': out,
'stderr': err,
}
if rc == 0:
module.exit_json(**dict(changed_def_args, **dict(cmd_out_args, **exit_kwargs)))
else:
msg = "Ooops! Snap installation failed while executing '{cmd}', please examine logs and error output for more details.".format(cmd=cmd)
if is_install_mode:
m = re.match(r'^error: This revision of snap "(?P<package_name>\w+)" was published using classic confinement', err)
if m is not None:
err_pkg = m.group('package_name')
msg = "Couldn't install {name} because it requires classic confinement".format(name=err_pkg)
module.fail_json(msg=msg, **dict(cmd_out_args, **exit_kwargs))
def main(): def main():
module_args = { snap = Snap()
'name': dict(type='list', elements='str', required=True), snap.run()
'state': dict(type='str', required=False, default='present', choices=['absent', 'present']),
'classic': dict(type='bool', required=False, default=False),
'channel': dict(type='str', required=False, default='stable'),
}
module = AnsibleModule(
argument_spec=module_args,
supports_check_mode=True,
)
validate_input_snaps(module)
# Apply changes to the snaps
execute_action(module)
if __name__ == '__main__': if __name__ == '__main__':