1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

snap - revamp + implementing enabled/disabled states (#2411) (#2542)

* revamp of snap module

* added changelog fragment

* fixed description

* Update changelogs/fragments/2411-snap-revamp-enabled-disabled-states.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

Co-authored-by: Felix Fontein <felix@fontein.de>
(cherry picked from commit 345d5f2dfa)

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2021-06-22 19:21:29 +02:00 committed by GitHub
parent dc14070e08
commit 2d237987ae
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 157 additions and 122 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- snap - added ``enabled`` and ``disabled`` states (https://github.com/ansible-collections/community.general/issues/1990).

View file

@ -31,7 +31,7 @@ options:
- Desired state of the package.
required: false
default: present
choices: [ absent, present ]
choices: [ absent, present, enabled, disabled ]
type: str
classic:
description:
@ -105,151 +105,184 @@ snaps_removed:
returned: When any snaps have been removed
'''
import operator
import re
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.general.plugins.module_utils.module_helper import (
CmdStateModuleHelper, ArgFormat, ModuleHelperException
)
def validate_input_snaps(module):
__state_map = dict(
present='install',
absent='remove',
info='info', # not public
list='list', # not public
enabled='enable',
disabled='disable',
)
def _state_map(value):
return __state_map[value]
class Snap(CmdStateModuleHelper):
__disable_re = re.compile(r'(?:\S+\s+){5}(?P<notes>\S+)')
module = dict(
argument_spec={
'name': dict(type='list', elements='str', required=True),
'state': dict(type='str', required=False, default='present',
choices=['absent', 'present', 'enabled', 'disabled']),
'classic': dict(type='bool', required=False, default=False),
'channel': dict(type='str', required=False, default='stable'),
},
supports_check_mode=True,
)
command = "snap"
command_args_formats = dict(
actionable_snaps=dict(fmt=lambda v: v),
state=dict(fmt=_state_map),
classic=dict(fmt="--classic", style=ArgFormat.BOOLEAN),
channel=dict(fmt=lambda v: [] if v == 'stable' else ['--channel', '{0}']),
)
check_rc = False
@staticmethod
def _first_non_zero(a):
for elem in a:
if elem != 0:
return elem
return 0
def _run_multiple_commands(self, commands):
outputs = [(c,) + self.run_command(params=c) for c in commands]
results = ([], [], [], [])
for output in outputs:
for i in range(4):
results[i].append(output[i])
return [
'; '.join(results[0]),
self._first_non_zero(results[1]),
'\n'.join(results[2]),
'\n'.join(results[3]),
]
def snap_exists(self, snap_name):
return 0 == self.run_command(params=[{'state': 'info'}, {'name': [snap_name]}])[0]
def is_snap_installed(self, snap_name):
return 0 == self.run_command(params=[{'state': 'list'}, {'name': [snap_name]}])[0]
def is_snap_enabled(self, snap_name):
rc, out, err = self.run_command(params=[{'state': 'list'}, {'name': [snap_name]}])
if rc != 0:
return None
result = out.splitlines()[1]
match = self.__disable_re.match(result)
if not match:
raise ModuleHelperException(msg="Unable to parse 'snap list {0}' output:\n{1}".format(snap_name, out))
notes = match.group('notes')
return "disabled" not in notes.split(',')
def validate_input_snaps(self):
"""Ensure that all exist."""
for snap_name in module.params['name']:
if not snap_exists(module, snap_name):
module.fail_json(msg="No snap matching '%s' available." % snap_name)
for snap_name in self.vars.name:
if not self.snap_exists(snap_name):
raise ModuleHelperException(msg="No snap matching '%s' available." % snap_name)
def snap_exists(module, snap_name):
snap_path = module.get_bin_path("snap", True)
cmd_parts = [snap_path, 'info', snap_name]
cmd = ' '.join(cmd_parts)
rc, out, err = module.run_command(cmd, check_rc=False)
return rc == 0
def is_snap_installed(module, snap_name):
snap_path = module.get_bin_path("snap", True)
cmd_parts = [snap_path, 'list', snap_name]
cmd = ' '.join(cmd_parts)
rc, out, err = module.run_command(cmd, check_rc=False)
return rc == 0
def get_snap_for_action(module):
"""Construct a list of snaps to use for current action."""
snaps = module.params['name']
is_present_state = module.params['state'] == 'present'
negation_predicate = operator.not_ if is_present_state else bool
def predicate(s):
return negation_predicate(is_snap_installed(module, s))
return [s for s in snaps if predicate(s)]
def get_base_cmd_parts(module):
action_map = {
'present': 'install',
'absent': 'remove',
}
state = module.params['state']
classic = ['--classic'] if module.params['classic'] else []
channel = ['--channel', module.params['channel']] if module.params['channel'] and module.params['channel'] != 'stable' else []
snap_path = module.get_bin_path("snap", True)
snap_action = action_map[state]
cmd_parts = [snap_path, snap_action]
if snap_action == 'install':
cmd_parts += classic + channel
return cmd_parts
def get_cmd_parts(module, snap_names):
"""Return list of cmds to run in exec format."""
is_install_mode = module.params['state'] == 'present'
has_multiple_snaps = len(snap_names) > 1
cmd_parts = get_base_cmd_parts(module)
has_one_pkg_params = '--classic' in cmd_parts or '--channel' in cmd_parts
if not (is_install_mode and has_one_pkg_params and has_multiple_snaps):
return [cmd_parts + snap_names]
return [cmd_parts + [s] for s in snap_names]
def run_cmd_for(module, snap_names):
cmds_parts = get_cmd_parts(module, snap_names)
cmd = '; '.join(' '.join(c) for c in cmds_parts)
cmd = 'sh -c "{0}"'.format(cmd)
# Actually execute the snap command
return (cmd, ) + module.run_command(cmd, check_rc=False)
def execute_action(module):
is_install_mode = module.params['state'] == 'present'
exit_kwargs = {
'classic': module.params['classic'],
'channel': module.params['channel'],
} if is_install_mode else {}
actionable_snaps = get_snap_for_action(module)
def state_present(self):
self.validate_input_snaps() # if snap doesnt exist, it will explode when trying to install
self.vars.meta('classic').set(output=True)
self.vars.meta('channel').set(output=True)
actionable_snaps = [s for s in self.vars.name if self.is_snap_installed(s)]
if not actionable_snaps:
module.exit_json(changed=False, **exit_kwargs)
changed_def_args = {
'changed': True,
'snaps_{result}'.
format(result='installed' if is_install_mode
else 'removed'): actionable_snaps,
}
if module.check_mode:
module.exit_json(**dict(changed_def_args, **exit_kwargs))
cmd, rc, out, err = run_cmd_for(module, actionable_snaps)
cmd_out_args = {
'cmd': cmd,
'rc': rc,
'stdout': out,
'stderr': err,
}
if rc == 0:
module.exit_json(**dict(changed_def_args, **dict(cmd_out_args, **exit_kwargs)))
return
self.changed = True
self.vars.snaps_installed = actionable_snaps
if self.module.check_mode:
return
params = ['classic', 'channel', 'state'] # get base cmd parts
has_one_pkg_params = bool(self.vars.classic) or self.vars.channel != 'stable'
has_multiple_snaps = len(actionable_snaps) > 1
if has_one_pkg_params and has_multiple_snaps:
commands = [params + [s] for s in actionable_snaps]
else:
msg = "Ooops! Snap installation failed while executing '{cmd}', please examine logs and error output for more details.".format(cmd=cmd)
if is_install_mode:
m = re.match(r'^error: This revision of snap "(?P<package_name>\w+)" was published using classic confinement', err)
if m is not None:
err_pkg = m.group('package_name')
commands = [params + actionable_snaps]
self.vars.cmd, rc, out, err = self._run_multiple_commands(commands)
if rc == 0:
return
classic_snap_pattern = re.compile(r'^error: This revision of snap "(?P<package_name>\w+)"'
r' was published using classic confinement')
match = classic_snap_pattern.match(err)
if match:
err_pkg = match.group('package_name')
msg = "Couldn't install {name} because it requires classic confinement".format(name=err_pkg)
module.fail_json(msg=msg, **dict(cmd_out_args, **exit_kwargs))
else:
msg = "Ooops! Snap installation failed while executing '{cmd}', please examine logs and " \
"error output for more details.".format(cmd=self.vars.cmd)
raise ModuleHelperException(msg=msg)
def state_absent(self):
self.validate_input_snaps() # if snap doesnt exist, it will be absent by definition
actionable_snaps = [s for s in self.vars.name if not self.is_snap_installed(s)]
if not actionable_snaps:
return
self.changed = True
self.vars.snaps_removed = actionable_snaps
if self.module.check_mode:
return
params = ['classic', 'channel', 'state'] # get base cmd parts
commands = [params + actionable_snaps]
self.vars.cmd, rc, out, err = self._run_multiple_commands(commands)
if rc == 0:
return
msg = "Ooops! Snap removal failed while executing '{cmd}', please examine logs and " \
"error output for more details.".format(cmd=self.vars.cmd)
raise ModuleHelperException(msg=msg)
def state_enabled(self):
self.validate_input_snaps()
actionable_snaps = [s for s in self.vars.name if self.is_snap_enabled(s) is False]
if not actionable_snaps:
return
self.changed = True
self.vars.snaps_enabled = actionable_snaps
if self.module.check_mode:
return
params = ['classic', 'channel', 'state'] # get base cmd parts
commands = [params + actionable_snaps]
self.vars.cmd, rc, out, err = self._run_multiple_commands(commands)
if rc == 0:
return
msg = "Ooops! Snap enabling failed while executing '{cmd}', please examine logs and " \
"error output for more details.".format(cmd=self.vars.cmd)
raise ModuleHelperException(msg=msg)
def state_disabled(self):
self.validate_input_snaps()
actionable_snaps = [s for s in self.vars.name if self.is_snap_enabled(s) is True]
if not actionable_snaps:
return
self.changed = True
self.vars.snaps_enabled = actionable_snaps
if self.module.check_mode:
return
params = ['classic', 'channel', 'state'] # get base cmd parts
commands = [params + actionable_snaps]
self.vars.cmd, rc, out, err = self._run_multiple_commands(commands)
if rc == 0:
return
msg = "Ooops! Snap disabling failed while executing '{cmd}', please examine logs and " \
"error output for more details.".format(cmd=self.vars.cmd)
raise ModuleHelperException(msg=msg)
def main():
module_args = {
'name': dict(type='list', elements='str', required=True),
'state': dict(type='str', required=False, default='present', choices=['absent', 'present']),
'classic': dict(type='bool', required=False, default=False),
'channel': dict(type='str', required=False, default='stable'),
}
module = AnsibleModule(
argument_spec=module_args,
supports_check_mode=True,
)
validate_input_snaps(module)
# Apply changes to the snaps
execute_action(module)
snap = Snap()
snap.run()
if __name__ == '__main__':