1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/lib/ansible/module_utils/junos.py
Ricardo Carrillo Cruz 64add28657 Platform agnostic net_system module (#24953)
* Platform agnostic net_system module

Also refactor the action network plugins for better code re-use

Still more refactoring to do once the connection plugin work is complete

* Replace importlib for imp

importlib is not available on 2.6, so we need to stick to imp

* Load action plugin via module metadata

* Better error message if no implementation is found

Now the plugin will show the module name and the network OS in the
error message

* Fix typo on documentation author line

* Fix pep8 issues

* Add missing options key on doc string and stringify version

* Return None in case module has no metadata

* Read module metadata only if it's a python module

Check for module suffix, if it's .py then read metadata.
Otherwise this fails on non-python modules, like Windows PS for example.

* Read metadata variable only if it's a python module

Fix referencing a variable before assignment

* Add action_handler to validate_modules metadata schema

* Pull metadata with plugin_docs get_docstring

Using load_source from PluginLoader is troublesome, it is not guaranteed
a module may be importable at the controller, e.g. if a module depends
on module_utils functions it won't work, because module_utils is not
in the sys path.
Rather than putting that module dependencies introspection, just
use plain parsing like plugin_docs get_docstring does as we only care
about reading ANSIBLE_METADATA.

* Add platform agnostic group of groups for integration tests

This will be the target for platform agnostic integration tests.

* Add integration tests for net_system

* Switch to action plugin inheritance from metadata driven action handler

As the metadata action driven action handler work is being worked on
on its standalone proposal+PR, let's just go back to have one
action handler per platform agnostic module.
Those action plugins will inherit from net_base.

* Add blank line to fix pep8

* Add aliases file to net_system integration test

This will avoid CI failure

* Fix integration tests for net_system

* Give more precedence to task network_os over inventory network_os
2017-06-02 14:06:38 +02:00

215 lines
7 KiB
Python

#
# (c) 2017 Red Hat, Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from contextlib import contextmanager
from xml.etree.ElementTree import Element, SubElement, fromstring
from ansible.module_utils.basic import env_fallback, return_values
from ansible.module_utils.netconf import send_request, children
from ansible.module_utils.netconf import discard_changes, validate
from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_text
ACTIONS = frozenset(['merge', 'override', 'replace', 'update', 'set'])
JSON_ACTIONS = frozenset(['merge', 'override', 'update'])
FORMATS = frozenset(['xml', 'text', 'json'])
CONFIG_FORMATS = frozenset(['xml', 'text', 'json', 'set'])
junos_argument_spec = {
'host': dict(),
'port': dict(type='int'),
'username': dict(fallback=(env_fallback, ['ANSIBLE_NET_USERNAME'])),
'password': dict(fallback=(env_fallback, ['ANSIBLE_NET_PASSWORD']), no_log=True),
'ssh_keyfile': dict(fallback=(env_fallback, ['ANSIBLE_NET_SSH_KEYFILE']), type='path'),
'timeout': dict(type='int'),
'provider': dict(type='dict'),
'transport': dict()
}
# Add argument's default value here
ARGS_DEFAULT_VALUE = {
'timeout': 10
}
def get_argspec():
return junos_argument_spec
def check_args(module, warnings):
provider = module.params['provider'] or {}
for key in junos_argument_spec:
if key not in ('provider',) and module.params[key]:
warnings.append('argument %s has been deprecated and will be '
'removed in a future version' % key)
# set argument's default value if not provided in input
# This is done to avoid unwanted argument deprecation warning
# in case argument is not given as input (outside provider).
for key in ARGS_DEFAULT_VALUE:
if not module.params.get(key, None):
module.params[key] = ARGS_DEFAULT_VALUE[key]
if provider:
for param in ('password',):
if provider.get(param):
module.no_log_values.update(return_values(provider[param]))
def _validate_rollback_id(module, value):
try:
if not 0 <= int(value) <= 49:
raise ValueError
except ValueError:
module.fail_json(msg='rollback must be between 0 and 49')
def load_configuration(module, candidate=None, action='merge', rollback=None, format='xml'):
if all((candidate is None, rollback is None)):
module.fail_json(msg='one of candidate or rollback must be specified')
elif all((candidate is not None, rollback is not None)):
module.fail_json(msg='candidate and rollback are mutually exclusive')
if format not in FORMATS:
module.fail_json(msg='invalid format specified')
if format == 'json' and action not in JSON_ACTIONS:
module.fail_json(msg='invalid action for format json')
elif format in ('text', 'xml') and action not in ACTIONS:
module.fail_json(msg='invalid action format %s' % format)
if action == 'set' and not format == 'text':
module.fail_json(msg='format must be text when action is set')
if rollback is not None:
_validate_rollback_id(module, rollback)
xattrs = {'rollback': str(rollback)}
else:
xattrs = {'action': action, 'format': format}
obj = Element('load-configuration', xattrs)
if candidate is not None:
lookup = {'xml': 'configuration', 'text': 'configuration-text',
'set': 'configuration-set', 'json': 'configuration-json'}
if action == 'set':
cfg = SubElement(obj, 'configuration-set')
else:
cfg = SubElement(obj, lookup[format])
if isinstance(candidate, string_types):
if format == 'xml':
cfg.append(fromstring(candidate))
else:
cfg.text = to_text(candidate, encoding='latin1')
else:
cfg.append(candidate)
return send_request(module, obj)
def get_configuration(module, compare=False, format='xml', rollback='0'):
if format not in CONFIG_FORMATS:
module.fail_json(msg='invalid config format specified')
xattrs = {'format': format}
if compare:
_validate_rollback_id(module, rollback)
xattrs['compare'] = 'rollback'
xattrs['rollback'] = str(rollback)
return send_request(module, Element('get-configuration', xattrs))
def commit_configuration(module, confirm=False, check=False, comment=None, confirm_timeout=None):
obj = Element('commit-configuration')
if confirm:
SubElement(obj, 'confirmed')
if check:
SubElement(obj, 'check')
if comment:
subele = SubElement(obj, 'log')
subele.text = str(comment)
if confirm_timeout:
subele = SubElement(obj, 'confirm-timeout')
subele.text = str(confirm_timeout)
return send_request(module, obj)
def command(module, command, format='text', rpc_only=False):
xattrs = {'format': format}
if rpc_only:
command += ' | display xml rpc'
xattrs['format'] = 'text'
return send_request(module, Element('command', xattrs, text=command))
def lock_configuration(x):
return send_request(x, Element('lock-configuration'))
def unlock_configuration(x):
return send_request(x, Element('unlock-configuration'))
@contextmanager
def locked_config(module):
try:
lock_configuration(module)
yield
finally:
unlock_configuration(module)
def get_diff(module):
reply = get_configuration(module, compare=True, format='text')
output = reply.find('.//configuration-output')
if output is not None:
return to_text(output.text, encoding='latin1').strip()
def load_config(module, candidate, warnings, action='merge', commit=False, format='xml',
comment=None, confirm=False, confirm_timeout=None):
if not candidate:
return
with locked_config(module):
if isinstance(candidate, list):
candidate = '\n'.join(candidate)
reply = load_configuration(module, candidate, action=action, format=format)
if isinstance(reply, list):
warnings.extend(reply)
validate(module)
diff = get_diff(module)
if diff:
if commit:
commit_configuration(module, confirm=confirm, comment=comment,
confirm_timeout=confirm_timeout)
else:
discard_changes(module)
return diff
def get_param(module, key):
return module.params[key] or module.params['provider'].get(key)