diff --git a/changelogs/fragments/2411-snap-revamp-enabled-disabled-states.yml b/changelogs/fragments/2411-snap-revamp-enabled-disabled-states.yml new file mode 100644 index 0000000000..a52b377817 --- /dev/null +++ b/changelogs/fragments/2411-snap-revamp-enabled-disabled-states.yml @@ -0,0 +1,2 @@ +minor_changes: + - snap - added ``enabled`` and ``disabled`` states (https://github.com/ansible-collections/community.general/issues/1990). diff --git a/plugins/modules/packaging/os/snap.py b/plugins/modules/packaging/os/snap.py index 9776b4e50a..fab2558ccf 100644 --- a/plugins/modules/packaging/os/snap.py +++ b/plugins/modules/packaging/os/snap.py @@ -31,7 +31,7 @@ options: - Desired state of the package. required: false default: present - choices: [ absent, present ] + choices: [ absent, present, enabled, disabled ] type: str classic: description: @@ -105,151 +105,184 @@ snaps_removed: returned: When any snaps have been removed ''' -import operator import re -from ansible.module_utils.basic import AnsibleModule +from ansible_collections.community.general.plugins.module_utils.module_helper import ( + CmdStateModuleHelper, ArgFormat, ModuleHelperException +) -def validate_input_snaps(module): - """Ensure that all exist.""" - for snap_name in module.params['name']: - if not snap_exists(module, snap_name): - module.fail_json(msg="No snap matching '%s' available." % snap_name) +__state_map = dict( + present='install', + absent='remove', + info='info', # not public + list='list', # not public + enabled='enable', + disabled='disable', +) -def snap_exists(module, snap_name): - snap_path = module.get_bin_path("snap", True) - cmd_parts = [snap_path, 'info', snap_name] - cmd = ' '.join(cmd_parts) - rc, out, err = module.run_command(cmd, check_rc=False) - - return rc == 0 +def _state_map(value): + return __state_map[value] -def is_snap_installed(module, snap_name): - snap_path = module.get_bin_path("snap", True) - cmd_parts = [snap_path, 'list', snap_name] - cmd = ' '.join(cmd_parts) - rc, out, err = module.run_command(cmd, check_rc=False) +class Snap(CmdStateModuleHelper): + __disable_re = re.compile(r'(?:\S+\s+){5}(?P\S+)') + module = dict( + argument_spec={ + 'name': dict(type='list', elements='str', required=True), + 'state': dict(type='str', required=False, default='present', + choices=['absent', 'present', 'enabled', 'disabled']), + 'classic': dict(type='bool', required=False, default=False), + 'channel': dict(type='str', required=False, default='stable'), + }, + supports_check_mode=True, + ) + command = "snap" + command_args_formats = dict( + actionable_snaps=dict(fmt=lambda v: v), + state=dict(fmt=_state_map), + classic=dict(fmt="--classic", style=ArgFormat.BOOLEAN), + channel=dict(fmt=lambda v: [] if v == 'stable' else ['--channel', '{0}']), + ) + check_rc = False - return rc == 0 + @staticmethod + def _first_non_zero(a): + for elem in a: + if elem != 0: + return elem + return 0 -def get_snap_for_action(module): - """Construct a list of snaps to use for current action.""" - snaps = module.params['name'] + def _run_multiple_commands(self, commands): + outputs = [(c,) + self.run_command(params=c) for c in commands] + results = ([], [], [], []) + for output in outputs: + for i in range(4): + results[i].append(output[i]) - is_present_state = module.params['state'] == 'present' - negation_predicate = operator.not_ if is_present_state else bool + return [ + '; '.join(results[0]), + self._first_non_zero(results[1]), + '\n'.join(results[2]), + '\n'.join(results[3]), + ] - def predicate(s): - return negation_predicate(is_snap_installed(module, s)) + def snap_exists(self, snap_name): + return 0 == self.run_command(params=[{'state': 'info'}, {'name': [snap_name]}])[0] - return [s for s in snaps if predicate(s)] + def is_snap_installed(self, snap_name): + return 0 == self.run_command(params=[{'state': 'list'}, {'name': [snap_name]}])[0] + def is_snap_enabled(self, snap_name): + rc, out, err = self.run_command(params=[{'state': 'list'}, {'name': [snap_name]}]) + if rc != 0: + return None + result = out.splitlines()[1] + match = self.__disable_re.match(result) + if not match: + raise ModuleHelperException(msg="Unable to parse 'snap list {0}' output:\n{1}".format(snap_name, out)) + notes = match.group('notes') + return "disabled" not in notes.split(',') -def get_base_cmd_parts(module): - action_map = { - 'present': 'install', - 'absent': 'remove', - } + def validate_input_snaps(self): + """Ensure that all exist.""" + for snap_name in self.vars.name: + if not self.snap_exists(snap_name): + raise ModuleHelperException(msg="No snap matching '%s' available." % snap_name) - state = module.params['state'] + def state_present(self): + self.validate_input_snaps() # if snap doesnt exist, it will explode when trying to install + self.vars.meta('classic').set(output=True) + self.vars.meta('channel').set(output=True) + actionable_snaps = [s for s in self.vars.name if self.is_snap_installed(s)] + if not actionable_snaps: + return + self.changed = True + self.vars.snaps_installed = actionable_snaps + if self.module.check_mode: + return + params = ['classic', 'channel', 'state'] # get base cmd parts + has_one_pkg_params = bool(self.vars.classic) or self.vars.channel != 'stable' + has_multiple_snaps = len(actionable_snaps) > 1 + if has_one_pkg_params and has_multiple_snaps: + commands = [params + [s] for s in actionable_snaps] + else: + commands = [params + actionable_snaps] + self.vars.cmd, rc, out, err = self._run_multiple_commands(commands) + if rc == 0: + return - classic = ['--classic'] if module.params['classic'] else [] - channel = ['--channel', module.params['channel']] if module.params['channel'] and module.params['channel'] != 'stable' else [] + classic_snap_pattern = re.compile(r'^error: This revision of snap "(?P\w+)"' + r' was published using classic confinement') + match = classic_snap_pattern.match(err) + if match: + err_pkg = match.group('package_name') + msg = "Couldn't install {name} because it requires classic confinement".format(name=err_pkg) + else: + msg = "Ooops! Snap installation failed while executing '{cmd}', please examine logs and " \ + "error output for more details.".format(cmd=self.vars.cmd) + raise ModuleHelperException(msg=msg) - snap_path = module.get_bin_path("snap", True) - snap_action = action_map[state] + def state_absent(self): + self.validate_input_snaps() # if snap doesnt exist, it will be absent by definition + actionable_snaps = [s for s in self.vars.name if not self.is_snap_installed(s)] + if not actionable_snaps: + return + self.changed = True + self.vars.snaps_removed = actionable_snaps + if self.module.check_mode: + return + params = ['classic', 'channel', 'state'] # get base cmd parts + commands = [params + actionable_snaps] + self.vars.cmd, rc, out, err = self._run_multiple_commands(commands) + if rc == 0: + return + msg = "Ooops! Snap removal failed while executing '{cmd}', please examine logs and " \ + "error output for more details.".format(cmd=self.vars.cmd) + raise ModuleHelperException(msg=msg) - cmd_parts = [snap_path, snap_action] - if snap_action == 'install': - cmd_parts += classic + channel + def state_enabled(self): + self.validate_input_snaps() + actionable_snaps = [s for s in self.vars.name if self.is_snap_enabled(s) is False] + if not actionable_snaps: + return + self.changed = True + self.vars.snaps_enabled = actionable_snaps + if self.module.check_mode: + return + params = ['classic', 'channel', 'state'] # get base cmd parts + commands = [params + actionable_snaps] + self.vars.cmd, rc, out, err = self._run_multiple_commands(commands) + if rc == 0: + return + msg = "Ooops! Snap enabling failed while executing '{cmd}', please examine logs and " \ + "error output for more details.".format(cmd=self.vars.cmd) + raise ModuleHelperException(msg=msg) - return cmd_parts - - -def get_cmd_parts(module, snap_names): - """Return list of cmds to run in exec format.""" - is_install_mode = module.params['state'] == 'present' - has_multiple_snaps = len(snap_names) > 1 - - cmd_parts = get_base_cmd_parts(module) - has_one_pkg_params = '--classic' in cmd_parts or '--channel' in cmd_parts - - if not (is_install_mode and has_one_pkg_params and has_multiple_snaps): - return [cmd_parts + snap_names] - - return [cmd_parts + [s] for s in snap_names] - - -def run_cmd_for(module, snap_names): - cmds_parts = get_cmd_parts(module, snap_names) - cmd = '; '.join(' '.join(c) for c in cmds_parts) - cmd = 'sh -c "{0}"'.format(cmd) - - # Actually execute the snap command - return (cmd, ) + module.run_command(cmd, check_rc=False) - - -def execute_action(module): - is_install_mode = module.params['state'] == 'present' - exit_kwargs = { - 'classic': module.params['classic'], - 'channel': module.params['channel'], - } if is_install_mode else {} - - actionable_snaps = get_snap_for_action(module) - if not actionable_snaps: - module.exit_json(changed=False, **exit_kwargs) - - changed_def_args = { - 'changed': True, - 'snaps_{result}'. - format(result='installed' if is_install_mode - else 'removed'): actionable_snaps, - } - - if module.check_mode: - module.exit_json(**dict(changed_def_args, **exit_kwargs)) - - cmd, rc, out, err = run_cmd_for(module, actionable_snaps) - cmd_out_args = { - 'cmd': cmd, - 'rc': rc, - 'stdout': out, - 'stderr': err, - } - - if rc == 0: - module.exit_json(**dict(changed_def_args, **dict(cmd_out_args, **exit_kwargs))) - else: - msg = "Ooops! Snap installation failed while executing '{cmd}', please examine logs and error output for more details.".format(cmd=cmd) - if is_install_mode: - m = re.match(r'^error: This revision of snap "(?P\w+)" was published using classic confinement', err) - if m is not None: - err_pkg = m.group('package_name') - msg = "Couldn't install {name} because it requires classic confinement".format(name=err_pkg) - module.fail_json(msg=msg, **dict(cmd_out_args, **exit_kwargs)) + def state_disabled(self): + self.validate_input_snaps() + actionable_snaps = [s for s in self.vars.name if self.is_snap_enabled(s) is True] + if not actionable_snaps: + return + self.changed = True + self.vars.snaps_enabled = actionable_snaps + if self.module.check_mode: + return + params = ['classic', 'channel', 'state'] # get base cmd parts + commands = [params + actionable_snaps] + self.vars.cmd, rc, out, err = self._run_multiple_commands(commands) + if rc == 0: + return + msg = "Ooops! Snap disabling failed while executing '{cmd}', please examine logs and " \ + "error output for more details.".format(cmd=self.vars.cmd) + raise ModuleHelperException(msg=msg) def main(): - module_args = { - 'name': dict(type='list', elements='str', required=True), - 'state': dict(type='str', required=False, default='present', choices=['absent', 'present']), - 'classic': dict(type='bool', required=False, default=False), - 'channel': dict(type='str', required=False, default='stable'), - } - module = AnsibleModule( - argument_spec=module_args, - supports_check_mode=True, - ) - - validate_input_snaps(module) - - # Apply changes to the snaps - execute_action(module) + snap = Snap() + snap.run() if __name__ == '__main__':