1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Add plugin_requirements_info action.

This commit is contained in:
Felix Fontein 2023-12-11 23:16:57 +01:00
parent 0c7b9e50b5
commit 4eab1a9062
3 changed files with 553 additions and 0 deletions

View file

@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2023, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.common._collections_compat import Mapping
from ansible.plugins.action import ActionBase
from ansible.utils.display import Display
from ansible.plugins.loader import connection_loader
from ansible.constants import DOCUMENTABLE_PLUGINS
from ansible.module_utils.common.validation import (
check_type_bool,
check_type_dict,
check_type_list,
check_type_str,
)
from ansible_collections.community.general.plugins.plugin_utils._dependencies import (
LoadingError,
UnknownPlugin,
UnknownPluginType,
retrieve_plugin_dependencies,
get_needed_facts,
get_used_facts,
Requirements,
RequirementFinder,
)
display = Display()
class TimedOutException(Exception):
pass
class ActionModule(ActionBase):
TRANSFERS_FILES = False
_VALID_ARGS = frozenset((
'plugins',
'modules_on_remote',
))
def __init__(self, *args, **kwargs):
super(ActionModule, self).__init__(*args, **kwargs)
def _load_facts(self, task_vars, hostname):
display.vvv('<{host}> {action}: running setup module to get facts'.format(host=hostname, action=self._task.action))
module_output = self._execute_module(
task_vars=task_vars,
module_name='ansible.legacy.setup',
module_args={'gather_subset': 'min'})
if module_output.get('failed', False):
raise AnsibleError('Failed to determine system distribution. {0}, {1}'.format(
to_native(module_output['module_stdout']).strip(),
to_native(module_output['module_stderr']).strip()))
result = {}
used_facts = get_used_facts()
for k, v in module_output['ansible_facts'].items():
if k.startswith('ansible_'):
k = k[8:]
if k in used_facts:
result[k] = v
return result
def _extract_facts(self, task_vars):
if not isinstance(task_vars, Mapping):
return None
if 'ansible_facts' not in task_vars:
return None
ansible_facts = task_vars['ansible_facts']
needed_facts = get_needed_facts()
if any(k not in ansible_facts for k in needed_facts):
return None
used_facts = get_used_facts()
return {k: ansible_facts[k] for k in used_facts if k in ansible_facts}
def _get_facts(self, local, task_vars, hostname):
if local and self._connection.transport != 'local':
format_vars = dict(host=hostname, action=self._task.action)
result = self._extract_facts({'ansible_facts': self._templar.template("{{hostvars['localhost']['ansible_facts']}}")})
if result:
display.vvv('<{host}> {action}: already have local facts'.format(**format_vars))
return result
original_connection = self._connection
try:
display.vvv('<{host}> {action}: getting hold of local connection...'.format(**format_vars))
self._connection = connection_loader.get('ansible.legacy.local', self._play_context)
display.vvv('<{host}> {action}: retrieving local facts...'.format(**format_vars))
return self._load_facts(task_vars, hostname)
finally:
self._connection = original_connection
elif not local and self._connection.transport == 'local':
raise AnsibleError('Cannot retrieve remote facts if connection is local')
format_vars = dict(host=hostname, action=self._task.action, local_remote='local' if local else 'remote')
result = self._extract_facts(task_vars)
if result:
display.vvv('<{host}> {action}: already have {local_remote} facts'.format(**format_vars))
return result
display.vvv('<{host}> {action}: retrieving {local_remote} facts...'.format(**format_vars))
return self._load_facts(task_vars, hostname)
def run(self, tmp=None, task_vars=None):
self._supports_check_mode = True
if task_vars is None:
task_vars = {}
result = super(ActionModule, self).run(tmp, task_vars)
if result.get('skipped', False) or result.get('failed', False):
return result
try:
if 'plugins' not in self._task.args:
raise TypeError('missing required arguments: plugins')
modules_on_remote = check_type_bool(self._task.args.get('modules_on_remote', True))
plugins = []
for plugin in [check_type_dict(plug) for plug in check_type_list(self._task.args['plugins'])]:
if 'name' not in plugin:
raise TypeError('missing required arguments: name found in plugins')
plugin_type = check_type_str(plugin.get('type', 'module'))
if plugin_type not in DOCUMENTABLE_PLUGINS:
raise TypeError('unknown plugin type %s' % plugin_type)
plugins.append({
'name': check_type_str(plugin.get('name')),
'type': plugin_type,
})
except TypeError as exc:
result['failed'] = True
result['msg'] = to_native(exc)
return result
hostname = task_vars.get('inventory_hostname')
need_remote_facts = modules_on_remote and any(plugin['type'] == 'module' for plugin in plugins)
need_local_facts = (plugins and not modules_on_remote) or any(plugin['type'] != 'module' for plugin in plugins)
if self._connection.transport == 'local':
need_remote_facts = False
need_local_facts = bool(plugins)
controller_ansible_facts = self._get_facts(local=True, task_vars=task_vars, hostname=hostname) if need_local_facts else {}
remote_ansible_facts = self._get_facts(local=False, task_vars=task_vars, hostname=hostname) if need_remote_facts else {}
try:
requirement_finder = RequirementFinder(self._templar, controller_ansible_facts, remote_ansible_facts)
all_deps = Requirements()
for plugin in plugins:
display.vvv('<{host}> {action}: retrieving installable requirements of {type} {name}'.format(host=hostname, action=self._task.action, **plugin))
installable_requirements = retrieve_plugin_dependencies(plugin['name'], plugin['type'])
deps = requirement_finder.find(installable_requirements, modules_for_remote=modules_on_remote and self._connection.transport != 'local')
all_deps.merge(deps)
result['python'] = sorted(all_deps.python)
result['system'] = sorted(all_deps.system)
result['changed'] = False
return result
except UnknownPluginType as exc:
result['failed'] = True
result['msg'] = 'Unknown plugin type: %s' % to_native(exc)
return result
except UnknownPlugin as exc:
result['failed'] = True
result['msg'] = 'Unknown plugin: %s' % to_native(exc)
return result
except LoadingError as exc:
result['failed'] = True
result['msg'] = to_native(exc)
return result

View file

@ -0,0 +1,112 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2023, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = r'''
module: plugin_requirements_info
short_description: Gather requirements for one or multiple plugins
description:
- Gather requirements for one or multiple plugins.
version_added: 8.2.0
extends_documentation_fragment:
- community.general.attributes
- community.general.attributes.info_module
- community.general.attributes.flow
attributes:
action:
support: full
async:
support: none
requirements: []
installable_requirements: []
options:
plugins:
description:
- A list of plugins to query requirements for.
required: true
type: list
elements: dict
suboptions:
name:
description:
- The name of the plugin.
required: true
type: str
type:
description:
- The type of the plugin.
- Not all types are supported by all versions of ansible-core. Generally C(ansible-doc -t <type>) must work.
default: 'module'
type: str
choices:
# CONFIGURABLE_PLUGINS
- become
- cache
- callback
- cliconf
- connection
- httpapi
- inventory
- lookup
- netconf
- shell
- vars
# DOCUMENTABLE_PLUGINS
- module
- strategy
- test
- filter
modules_on_remote:
description:
- Whether to assume that modules run on the remote, and not the controller.
- Set to V(false) if you plan to run the module(s) on the controller.
type: bool
default: true
author:
- Felix Fontein (@felixfontein)
'''
EXAMPLES = r'''
- name: Unconditionally shut down the machine with all defaults
community.general.plugin_requirements_info:
plugins:
- name: community.general.plugin_requirements_info
register: requirements
- name: Install system requirements
ansible.builtin.package:
name: "{{ requirements.system }}"
when: "{{ requirements.system }}"
- name: Install Python requirements
ansible.builtin.pip:
name: "{{ requirements.python }}"
when: "{{ requirements.python }}"
'''
RETURN = r'''
system:
description:
- A list of system requirements.
type: list
elements: str
returned: success
sample:
- openssl
python:
description:
- A list of Python requirements.
type: list
elements: str
returned: success
sample:
- cryptography
'''

View file

@ -0,0 +1,265 @@
# Copyright (c) 2023, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import ansible.plugins.loader as plugin_loader
from ansible import __version__ as ansible_version
from ansible.errors import AnsiblePluginNotFound
from ansible.module_utils.six import raise_from
from ansible.plugins.loader import fragment_loader
from ansible.template import Templar
from ansible.utils.plugin_docs import get_plugin_docs
from ansible_collections.community.general.plugins.module_utils.version import LooseVersion
# Whether Templar has a cache, which can be controlled by Templar.template()'s cache option.
# The cache was removed for ansible-core 2.14 (https://github.com/ansible/ansible/pull/78419)
_TEMPLAR_HAS_TEMPLATE_CACHE = LooseVersion(ansible_version) < LooseVersion('2.14.0')
class LoadingError(Exception):
pass
class UnknownPlugin(LoadingError):
pass
class UnknownPluginType(UnknownPlugin):
pass
class Requirements(object):
def __init__(self, system=None, python=None):
self.system = set(system if system else [])
self.python = set(python if python else [])
def merge(self, other):
self.system.update(other.system)
self.python.update(other.python)
def __repr__(self):
return 'Requirements(system={system!r}, python={python!r})'.format(
system=sorted(self.system),
python=sorted(self.python),
)
def _check(value, name, acceptable_types, accept_none=False):
if accept_none and value is None:
return value
if isinstance(value, acceptable_types):
return value
if not isinstance(acceptable_types, tuple):
acceptable_types = (acceptable_types, )
raise LoadingError(
'{name} {value!r} is not one of types {acceptable_types}{or_none}'.format(
value=value,
name=name,
acceptable_types=', '.join(str(t) for t in acceptable_types),
or_none=', or none' if accept_none else ''
)
)
def _check_list(value, name, element_types, accept_none=False, none_result=None):
if accept_none and value is None:
return none_result
if isinstance(value, element_types):
value = [value]
if not isinstance(value, list):
raise LoadingError('{name} {value!r} is not a list'.format(value=value, name=name))
for i, v in enumerate(value):
_check(v, '{0}[{1}]'.format(name, i + 1), element_types)
return value
class InstallableBlock(object):
def __init__(self, when, system, python):
self.when = when
self.system = system
self.python = python
def __repr__(self):
return 'InstallableBlock(when={when!r}, system={system!r}, python={python!r})'.format(
when=self.when,
system=self.system,
python=self.python,
)
@classmethod
def parse(cls, block):
_check(block, 'block', dict)
when = _check(block.get('when'), 'when', (str, bool), accept_none=True)
system = _check_list(block.get('system'), 'system', str, accept_none=True, none_result=[])
python = _check_list(block.get('python'), 'python', str, accept_none=True, none_result=[])
return cls(when, system, python)
class InstallableRequirement(object):
def __init__(self, name, blocks):
self.name = name
self.blocks = blocks
@classmethod
def parse(cls, requirement):
_check(requirement, 'requirement', dict)
name = _check(requirement.get('name'), 'name', str)
blocks = _check_list(requirement.get('blocks'), 'blocks', dict)
return cls(name, [InstallableBlock.parse(block) for block in blocks])
class InstallableRequirements(object):
def __init__(self, plugin_name, plugin_type, requirements):
self.plugin_name = plugin_name
self.plugin_type = plugin_type
self.requirements = requirements
@classmethod
def parse(cls, plugin_name, plugin_type, reqs):
return cls(
_check(plugin_name, 'plugin_name', str),
_check(plugin_type, 'plugin_type', str),
[InstallableRequirement.parse(req) for req in _check(reqs, 'requirements', list)]
)
def retrieve_plugin_dependencies(plugin_name, plugin_type):
loader = getattr(plugin_loader, '%s_loader' % plugin_type, None)
if loader is None:
raise UnknownPluginType(plugin_type)
try:
doc, dummy1, dummy2, dummy3 = get_plugin_docs(plugin_name, plugin_type, loader, fragment_loader, verbose=False)
except AnsiblePluginNotFound as e:
raise_from(UnknownPlugin(plugin_name), e)
except Exception as e:
raise_from(LoadingError('Error while loading documentation of {0} {1}: {2}'.format(plugin_type, plugin_name, e)), e)
if not doc:
raise LoadingError('Found no documentation of {0} {1}'.format(plugin_type, plugin_name))
reqs = doc.get('installable_requirements')
if not isinstance(reqs, list):
raise LoadingError('Found no installable requirements information for {0} {1}'.format(plugin_type, plugin_name))
try:
return InstallableRequirements.parse(plugin_name, plugin_type, reqs)
except LoadingError as e:
raise_from(LoadingError('Error while parsing installable requirements for {0} {1}: {2}'.format(plugin_type, plugin_name, e)), e)
_REQUIRED_FACTS = (
'architecture',
'distribution',
'distribution_major_version',
'distribution_release',
'distribution_version',
'os_family',
'pkg_mgr',
'python',
'python_version',
'selinux',
'selinux_python_present',
'service_mgr',
'system',
)
_ACCEPTABLE_FACTS = tuple(sorted(_REQUIRED_FACTS + (
'distribution_minor_version',
)))
def get_needed_facts():
return _REQUIRED_FACTS
def get_used_facts():
return _ACCEPTABLE_FACTS
class RequirementFinder(object):
@staticmethod
def _massage_facts(ansible_facts):
result = {}
for fact in _ACCEPTABLE_FACTS:
if fact in ansible_facts:
result[fact] = ansible_facts[fact]
return result
def __init__(self, templar, controller_ansible_facts, remote_ansible_facts):
self.templar = Templar(loader=templar._loader)
self.controller_ansible_facts = self._massage_facts(controller_ansible_facts)
self.remote_ansible_facts = self._massage_facts(remote_ansible_facts)
def _match_block(self, block, ansible_facts):
if block.when is None:
return True
if isinstance(block.when, bool):
return block.when
self.templar.available_variables = {'ansible_facts': ansible_facts}
expression = "{0}{1}{2}".format("{{", block.when, "}}")
kwargs = {
'fail_on_undefined': True,
'disable_lookups': True,
}
if _TEMPLAR_HAS_TEMPLATE_CACHE:
kwargs['cache'] = False
value = self.templar.template(expression, **kwargs)
if isinstance(value, bool):
return value
raise LoadingError('Expression evaluated to {value!r}, expected boolean'.format(value=value))
def _find(self, req, ansible_facts, plugin_name, plugin_type):
for block in req.blocks:
try:
if self._match_block(block, ansible_facts):
return Requirements(system=block.system, python=block.python)
except LoadingError:
raise
except Exception as e:
raise_from(LoadingError('Error while matching block {block!r} for {req!r} of {type} {plugin}: {exc}'.format(
block=block,
req=req.name,
type=plugin_type,
plugin=plugin_name,
exc=e,
)), e)
raise LoadingError('Cannot find matching block for "{req}" of {type} {plugin}'.format(
req=req.name,
type=plugin_type,
plugin=plugin_name,
))
def find(self, installable_requirements, modules_for_remote=True):
ansible_facts = (
self.remote_ansible_facts
if installable_requirements.plugin_type == 'module' and modules_for_remote
else self.controller_ansible_facts
)
result = Requirements()
for req in installable_requirements.requirements:
result.merge(self._find(
req,
ansible_facts,
installable_requirements.plugin_name,
installable_requirements.plugin_type,
))
return result