1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

updates ios modules to support persistent socket (#21258)

* updates all ios modules to support persistent socket
* adds ios action plugin to connect to device
* adds exec_command() to ios shared module
* fixes ios_config and ios_template local action
* update all unit test cases
* adds base test module for ios module testing
This commit is contained in:
Peter Sprygada 2017-02-13 10:22:14 -05:00 committed by John R Barker
parent 8bf69411d9
commit 7f1c43e597
19 changed files with 493 additions and 737 deletions

View file

@ -25,10 +25,31 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
from ansible.module_utils.basic import env_fallback
from ansible.module_utils.network_common import to_list
from ansible.module_utils.connection import exec_command
_DEVICE_CONFIGS = {}
ios_argument_spec = {
'host': dict(),
'port': dict(type='int'),
'username': dict(fallback=(env_fallback, ['ANSIBLE_NET_USERNAME'])),
'password': dict(fallback=(env_fallback, ['ANSIBLE_NET_PASSWORD']), no_log=True),
'ssh_keyfile': dict(fallback=(env_fallback, ['ANSIBLE_NET_SSH_KEYFILE']), type='path'),
'authorize': dict(fallback=(env_fallback, ['ANSIBLE_NET_AUTHORIZE']), type='bool'),
'auth_pass': dict(fallback=(env_fallback, ['ANSIBLE_NET_AUTH_PASS']), no_log=True),
'timeout': dict(type='int', default=10),
'provider': dict(type='dict'),
}
def check_args(module, warnings):
provider = module.params['provider'] or {}
for key in ios_argument_spec:
if key != 'provider' and module.params[key]:
warnings.append('argument %s has been deprecated and will be '
'removed in a future version' % key)
def get_config(module, flags=[]):
cmd = 'show running-config '
cmd += ' '.join(flags)
@ -37,7 +58,7 @@ def get_config(module, flags=[]):
try:
return _DEVICE_CONFIGS[cmd]
except KeyError:
rc, out, err = module.exec_command(cmd)
rc, out, err = exec_command(module, cmd)
if rc != 0:
module.fail_json(msg='unable to retrieve current config', stderr=err)
cfg = str(out).strip()
@ -48,24 +69,23 @@ def run_commands(module, commands, check_rc=True):
responses = list()
for cmd in to_list(commands):
cmd = module.jsonify(cmd)
rc, out, err = module.exec_command(cmd)
rc, out, err = exec_command(module, cmd)
if check_rc and rc != 0:
module.fail_json(msg=err, rc=rc)
responses.append(out)
return responses
def load_config(module, commands):
assert isinstance(commands, list), 'commands must be a list'
rc, out, err = module.exec_command('configure terminal')
rc, out, err = exec_command(module, 'configure terminal')
if rc != 0:
module.fail_json(msg='unable to enter configuration mode', err=err)
for command in commands:
for command in to_list(commands):
if command == 'end':
continue
rc, out, err = module.exec_command(command)
rc, out, err = exec_command(module, command)
if rc != 0:
module.fail_json(msg=err, command=command, rc=rc)
module.exec_command('end')
exec_command(module, 'end')

View file

@ -1,156 +0,0 @@
#
# This code is part of Ansible, but is an independent component.
#
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# (c) 2017 Red Hat, Inc.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import re
from ansible.module_utils.shell import CliBase
from ansible.module_utils.basic import env_fallback, get_exception
from ansible.module_utils.network_common import to_list
from ansible.module_utils.netcli import Command
from ansible.module_utils.six import iteritems
from ansible.module_utils.network import NetworkError
_DEVICE_CONFIGS = {}
_DEVICE_CONNECTION = None
ios_cli_argument_spec = {
'host': dict(),
'port': dict(type='int'),
'username': dict(fallback=(env_fallback, ['ANSIBLE_NET_USERNAME'])),
'password': dict(fallback=(env_fallback, ['ANSIBLE_NET_PASSWORD']), no_log=True),
'authorize': dict(fallback=(env_fallback, ['ANSIBLE_NET_AUTHORIZE']), type='bool'),
'auth_pass': dict(fallback=(env_fallback, ['ANSIBLE_NET_AUTH_PASS']), no_log=True),
'timeout': dict(type='int', default=10),
'provider': dict(type='dict'),
}
def check_args(module):
provider = module.params['provider'] or {}
for key in ('host', 'username', 'password'):
if not module.params[key] and not provider.get(key):
module.fail_json(msg='missing required argument %s' % key)
class Cli(CliBase):
CLI_PROMPTS_RE = [
re.compile(r"[\r\n]?[\w+\-\.:\/\[\]]+(?:\([^\)]+\)){,3}(?:>|#) ?$"),
re.compile(r"\[\w+\@[\w\-\.]+(?: [^\]])\] ?[>#\$] ?$")
]
CLI_ERRORS_RE = [
re.compile(r"% ?Error"),
re.compile(r"% ?Bad secret"),
re.compile(r"invalid input", re.I),
re.compile(r"(?:incomplete|ambiguous) command", re.I),
re.compile(r"connection timed out", re.I),
re.compile(r"[^\r\n]+ not found", re.I),
]
def __init__(self, module):
self._module = module
super(Cli, self).__init__()
provider = module.params.get('provider') or dict()
for key, value in iteritems(provider):
if key in ios_cli_argument_spec:
if module.params.get(key) is None and value is not None:
module.params[key] = value
try:
self.connect()
except NetworkError:
exc = get_exception()
self._module.fail_json(msg=str(exc))
if module.params['authorize']:
self.authorize()
def connect(self):
super(Cli, self).connect(self._module.params, kickstart=False)
self.exec_command('terminal length 0')
def authorize(self):
passwd = self._module.params['auth_pass']
if passwd:
prompt = "[\r\n]?Password: $"
self.exec_command(dict(command='enable', prompt=prompt, response=passwd))
else:
self.exec_command('enable')
def connection(module):
global _DEVICE_CONNECTION
if not _DEVICE_CONNECTION:
cli = Cli(module)
_DEVICE_CONNECTION = cli
return _DEVICE_CONNECTION
def get_config(module, flags=[]):
cmd = 'show running-config '
cmd += ' '.join(flags)
cmd = cmd.strip()
try:
return _DEVICE_CONFIGS[cmd]
except KeyError:
conn = connection(module)
out = conn.exec_command(cmd)
cfg = str(out).strip()
_DEVICE_CONFIGS[cmd] = cfg
return cfg
def run_commands(module, commands, check_rc=True):
responses = list()
conn = connection(module)
for cmd in to_list(commands):
rc, out, err = conn.exec_command(cmd)
if check_rc and rc != 0:
module.fail_json(msg=err, rc=rc)
responses.append(out)
return responses
def load_config(module, commands):
conn = connection(module)
rc, out, err = conn.exec_command('configure terminal')
if rc != 0:
module.fail_json(msg='unable to enter configuration mode', err=err)
for command in to_list(commands):
if command == 'end':
continue
rc, out, err = conn.exec_command(command)
if rc != 0:
module.fail_json(msg=err, command=command, rc=rc)
conn.exec_command('end')

View file

@ -35,7 +35,6 @@ description:
commands that are not already configured. The config source can
be a set of commands or a template.
deprecated: Deprecated in 2.2. Use M(ios_config) instead.
extends_documentation_fragment: ios
options:
src:
description:
@ -108,52 +107,15 @@ updates:
returned: always
type: list
sample: ['...', '...']
start:
description: The time the job started
returned: always
type: str
sample: "2016-11-16 10:38:15.126146"
end:
description: The time the job ended
returned: always
type: str
sample: "2016-11-16 10:38:25.595612"
delta:
description: The time elapsed to perform all operations
returned: always
type: str
sample: "0:00:10.469466"
"""
from functools import partial
from ansible.module_utils import ios
from ansible.module_utils import ios_cli
from ansible.module_utils.ios import load_config, get_config
from ansible.module_utils.ios import ios_argument_spec, check_args
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.local import LocalAnsibleModule
from ansible.module_utils.network_common import ComplexList
from ansible.module_utils.netcli import Conditional
from ansible.module_utils.six import string_types
from ansible.module_utils.netcfg import NetworkConfig, dumps
SHARED_LIB = 'ios'
def get_ansible_module():
if SHARED_LIB == 'ios':
return LocalAnsibleModule
return AnsibleModule
def invoke(name, *args, **kwargs):
obj = globals().get(SHARED_LIB)
func = getattr(obj, name)
return func(*args, **kwargs)
load_config = partial(invoke, 'load_config')
get_config = partial(invoke, 'get_config')
def check_args(module, warnings):
if SHARED_LIB == 'ios_cli':
ios_cli.check_args(module)
def get_current_config(module):
if module.params['config']:
return module.params['config']
@ -174,23 +136,20 @@ def main():
config=dict(),
)
argument_spec.update(ios_cli.ios_cli_argument_spec)
argument_spec.update(ios_argument_spec)
mutually_exclusive = [('config', 'backup'), ('config', 'force')]
cls = get_ansible_module()
module = cls(argument_spec=argument_spec,
mutually_exclusive=mutually_exclusive,
supports_check_mode=True)
warnings = list()
check_args(module, warnings)
module = AnsibleModule(argument_spec=argument_spec,
mutually_exclusive=mutually_exclusive,
supports_check_mode=True)
candidate = NetworkConfig(contents=module.params['src'], indent=1)
result = {'changed': False}
if warnings:
result['warnings'] = warnings
warnings = list()
check_args(module, warnings)
result['warnings'] = warnings
if module.params['backup']:
result['__backup__'] = get_config(module=module)
@ -210,9 +169,9 @@ def main():
result['changed'] = True
result['updates'] = commands
result['commands'] = commands
module.exit_json(**result)
if __name__ == '__main__':
SHARED_LIB = 'ios_cli'
main()

View file

@ -35,7 +35,6 @@ description:
before returning or timing out if the condition is not met.
- This module does not support running commands in configuration mode.
Please use M(ios_config) to configure IOS devices.
extends_documentation_fragment: ios
options:
commands:
description:
@ -129,52 +128,16 @@ failed_conditions:
returned: failed
type: list
sample: ['...', '...']
start:
description: The time the job started
returned: always
type: str
sample: "2016-11-16 10:38:15.126146"
end:
description: The time the job ended
returned: always
type: str
sample: "2016-11-16 10:38:25.595612"
delta:
description: The time elapsed to perform all operations
returned: always
type: str
sample: "0:00:10.469466"
"""
import time
from functools import partial
from ansible.module_utils import ios
from ansible.module_utils import ios_cli
from ansible.module_utils.ios import run_commands
from ansible.module_utils.ios import ios_argument_spec, check_args
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.local import LocalAnsibleModule
from ansible.module_utils.network_common import ComplexList
from ansible.module_utils.netcli import Conditional
from ansible.module_utils.six import string_types
SHARED_LIB = 'ios'
def get_ansible_module():
if SHARED_LIB == 'ios':
return LocalAnsibleModule
return AnsibleModule
def invoke(name, *args, **kwargs):
obj = globals().get(SHARED_LIB)
func = getattr(obj, name)
return func(*args, **kwargs)
run_commands = partial(invoke, 'run_commands')
def check_args(module, warnings):
if SHARED_LIB == 'ios_cli':
ios_cli.check_args(module)
def to_lines(stdout):
for item in stdout:
if isinstance(item, string_types):
@ -215,17 +178,17 @@ def main():
interval=dict(default=1, type='int')
)
argument_spec.update(ios_cli.ios_cli_argument_spec)
argument_spec.update(ios_argument_spec)
cls = get_ansible_module()
module = cls(argument_spec=argument_spec, supports_check_mode=True)
warnings = list()
check_args(module, warnings)
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
result = {'changed': False}
warnings = list()
check_args(module, warnings)
commands = parse_commands(module, warnings)
result['warnings'] = warnings
wait_for = module.params['wait_for'] or list()
conditionals = [Conditional(c) for c in wait_for]
@ -259,7 +222,6 @@ def main():
result = {
'changed': False,
'stdout': responses,
'warnings': warnings,
'stdout_lines': list(to_lines(responses))
}
@ -267,5 +229,4 @@ def main():
if __name__ == '__main__':
SHARED_LIB = 'ios_cli'
main()

View file

@ -33,7 +33,6 @@ description:
for segmenting configuration into sections. This module provides
an implementation for working with IOS configuration sections in
a deterministic way.
extends_documentation_fragment: ios
options:
lines:
description:
@ -202,31 +201,14 @@ backup_path:
returned: when backup is yes
type: path
sample: /playbooks/ansible/backup/ios_config.2016-07-16@22:28:34
start:
description: The time the job started
returned: always
type: str
sample: "2016-11-16 10:38:15.126146"
end:
description: The time the job ended
returned: always
type: str
sample: "2016-11-16 10:38:25.595612"
delta:
description: The time elapsed to perform all operations
returned: always
type: str
sample: "0:00:10.469466"
"""
import re
import time
from functools import partial
from ansible.module_utils import ios
from ansible.module_utils import ios_cli
from ansible.module_utils.ios import run_commands, get_config, load_config
from ansible.module_utils.ios import ios_argument_spec
from ansible.module_utils.ios import check_args as ios_check_args
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.local import LocalAnsibleModule
from ansible.module_utils.network_common import ComplexList
from ansible.module_utils.netcli import Conditional
from ansible.module_utils.six import string_types
@ -234,26 +216,8 @@ from ansible.module_utils.netcfg import NetworkConfig, dumps
from ansible.module_utils.six import iteritems
SHARED_LIB = 'ios'
def get_ansible_module():
if SHARED_LIB == 'ios':
return LocalAnsibleModule
return AnsibleModule
def invoke(name, *args, **kwargs):
obj = globals().get(SHARED_LIB)
func = getattr(obj, name)
return func(*args, **kwargs)
run_commands = partial(invoke, 'run_commands')
load_config = partial(invoke, 'load_config')
get_config = partial(invoke, 'get_config')
def check_args(module, warnings):
if SHARED_LIB == 'ios_cli':
ios_cli.check_args(module)
ios_check_args(module, warnings)
if module.params['multiline_delimiter']:
if len(module.params['multiline_delimiter']) != 1:
module.fail_json(msg='multiline_delimiter value can only be a '
@ -350,7 +314,7 @@ def main():
save=dict(type='bool', default=False),
)
argument_spec.update(ios_cli.ios_cli_argument_spec)
argument_spec.update(ios_argument_spec)
mutually_exclusive = [('lines', 'src')]
@ -358,22 +322,19 @@ def main():
('match', 'exact', ['lines']),
('replace', 'block', ['lines'])]
cls = get_ansible_module()
module = cls(argument_spec=argument_spec,
mutually_exclusive=mutually_exclusive,
required_if=required_if,
supports_check_mode=True)
warnings = list()
check_args(module, warnings)
module = AnsibleModule(argument_spec=argument_spec,
mutually_exclusive=mutually_exclusive,
required_if=required_if,
supports_check_mode=True)
if module.params['force'] is True:
module.params['match'] = 'none'
result = {'changed': False}
warnings = list()
check_args(module, warnings)
result = {'changed': False, 'warnings': warnings}
result['warnings'] = warnings
if any((module.params['lines'], module.params['src'])):
match = module.params['match']
@ -403,7 +364,7 @@ def main():
if module.params['after']:
commands.extend(module.params['after'])
result['updates'] = commands
result['commands'] = commands
result['banners'] = banners
# send the configuration commands to the device and merge
@ -428,5 +389,4 @@ def main():
if __name__ == '__main__':
SHARED_LIB = 'ios_cli'
main()

View file

@ -33,7 +33,6 @@ description:
base network fact keys with C(ansible_net_<fact>). The facts
module will always collect a base set of facts from the device
and can enable or disable collection of additional facts.
extends_documentation_fragment: ios
options:
gather_subset:
description:
@ -143,33 +142,12 @@ ansible_net_neighbors:
"""
import re
from functools import partial
from ansible.module_utils import ios
from ansible.module_utils import ios_cli
from ansible.module_utils.ios import run_commands
from ansible.module_utils.ios import ios_argument_spec, check_args
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.local import LocalAnsibleModule
from ansible.module_utils.six import iteritems
from ansible.module_utils.six.moves import zip
SHARED_LIB = 'ios'
def get_ansible_module():
if SHARED_LIB == 'ios':
return LocalAnsibleModule
return AnsibleModule
def invoke(name, *args, **kwargs):
obj = globals().get(SHARED_LIB)
func = getattr(obj, name)
return func(*args, **kwargs)
run_commands = partial(invoke, 'run_commands')
def check_args(module, warnings):
if SHARED_LIB == 'ios_cli':
ios_cli.check_args(module)
class FactsBase(object):
@ -443,13 +421,10 @@ def main():
gather_subset=dict(default=['!config'], type='list')
)
argument_spec.update(ios_cli.ios_cli_argument_spec)
argument_spec.update(ios_argument_spec)
cls = get_ansible_module()
module = cls(argument_spec=argument_spec, supports_check_mode=True)
warnings = list()
check_args(module, warnings)
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
gather_subset = module.params['gather_subset']
@ -500,9 +475,11 @@ def main():
key = 'ansible_net_%s' % key
ansible_facts[key] = value
module.exit_json(ansible_facts=ansible_facts)
warnings = list()
check_args(module, warnings)
module.exit_json(ansible_facts=ansible_facts, warnings=warnings)
if __name__ == '__main__':
SHARED_LIB = 'ios_cli'
main()

View file

@ -126,26 +126,12 @@ commands:
sample:
- hostname ios01
- ip domain name eng.ansible.com
start:
description: The time the job started
returned: always
type: str
sample: "2016-11-16 10:38:15.126146"
end:
description: The time the job ended
returned: always
type: str
sample: "2016-11-16 10:38:25.595612"
delta:
description: The time elapsed to perform all operations
returned: always
type: str
sample: "0:00:10.469466"
"""
import re
from ansible.module_utils.local import LocalAnsibleModule
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.ios import get_config, load_config
from ansible.module_utils.ios import ios_argument_spec, check_args
from ansible.module_utils.network_common import ComplexList
_CONFIGURED_VRFS = None
@ -364,11 +350,17 @@ def main():
state=dict(choices=['present', 'absent'], default='present')
)
module = LocalAnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
argument_spec.update(ios_argument_spec)
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
result = {'changed': False}
warnings = list()
check_args(module, warnings)
result['warnings'] = warnings
want = map_params_to_obj(module)
have = map_config_to_obj(module)

View file

@ -142,8 +142,9 @@ import re
from functools import partial
from ansible.module_utils.local import LocalAnsibleModule
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.ios import load_config, get_config
from ansible.module_utils.ios import ios_argument_spec, check_args
from ansible.module_utils.netcfg import NetworkConfig
from ansible.module_utils.six import iteritems
@ -327,14 +328,20 @@ def main():
state=dict(default='present', choices=['present', 'absent'])
)
argument_spec.update(ios_argument_spec)
mutually_exclusive = [('name', 'vrfs')]
module = LocalAnsibleModule(argument_spec=argument_spec,
mutually_exclusive=mutually_exclusive,
supports_check_mode=True)
module = AnsibleModule(argument_spec=argument_spec,
mutually_exclusive=mutually_exclusive,
supports_check_mode=True)
result = {'changed': False}
warnings = list()
check_args(module, warnings)
result['warnings'] = warnings
want = map_params_to_obj(module)
have = map_config_to_obj(module)

View file

@ -0,0 +1,96 @@
#
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import sys
import copy
from ansible.plugins.action.normal import ActionModule as _ActionModule
from ansible.utils.path import unfrackpath
from ansible.plugins import connection_loader
from ansible.compat.six import iteritems
from ansible.module_utils.ios import ios_argument_spec
from ansible.module_utils.basic import AnsibleFallbackNotFound
from ansible.module_utils._text import to_bytes
class ActionModule(_ActionModule):
def run(self, tmp=None, task_vars=None):
self.load_provider()
provider = self._task.args['provider']
pc = copy.deepcopy(self._play_context)
pc.connection = 'network_cli'
pc.port = provider['port'] or self._play_context.port
pc.remote_user = provider['username'] or self._play_context.connection_user
pc.password = provider['password'] or self._play_context.password
pc.become = provider['authorize'] or False
pc.become_pass = provider['auth_pass']
socket_path = self._get_socket_path(pc)
if not os.path.exists(socket_path):
# start the connection if it isn't started
connection = self._shared_loader_obj.connection_loader.get('persistent', pc, sys.stdin)
connection.exec_command('EXEC: show version')
task_vars['ansible_socket'] = socket_path
if self._play_context.become_method == 'enable':
self._play_context.become = False
self._play_context.become_method = None
return super(ActionModule, self).run(tmp, task_vars)
def _get_socket_path(self, play_context):
ssh = connection_loader.get('ssh', class_only=True)
cp = ssh._create_control_path(play_context.remote_addr, play_context.port, play_context.remote_user)
path = unfrackpath("$HOME/.ansible/pc")
return cp % dict(directory=path)
def load_provider(self):
provider = self._task.args.get('provider', {})
for key, value in iteritems(ios_argument_spec):
if key in self._task.args:
provider[key] = self._task.args[key]
elif 'fallback' in value:
provider[key] = self._fallback(value['fallback'])
elif key not in provider:
provider[key] = None
self._task.args['provider'] = provider
def _fallback(self, fallback):
strategy = fallback[0]
args = []
kwargs = {}
for item in fallback[1:]:
if isinstance(item, dict):
kwargs = item
else:
args = item
try:
return strategy(*args, **kwargs)
except AnsibleFallbackNotFound:
pass

View file

@ -1,5 +1,5 @@
#
# Copyright 2015 Peter Sprygada <psprygada@ansible.com>
# (c) 2017, Red Hat, Inc.
#
# This file is part of Ansible
#
@ -19,9 +19,94 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.plugins.action.net_config import ActionModule as _ActionModule
import os
import re
import time
import glob
from ansible.plugins.action.ios import ActionModule as _ActionModule
from ansible.module_utils._text import to_text
from ansible.module_utils.six.moves.urllib.parse import urlsplit
from ansible.utils.vars import merge_hash
PRIVATE_KEYS_RE = re.compile('__.+__')
class ActionModule(_ActionModule):
pass
def run(self, tmp=None, task_vars=None):
if self._task.args.get('src'):
try:
self._handle_template()
except ValueError as exc:
return dict(failed=True, msg=exc.message)
result = super(ActionModule, self).run(tmp, task_vars)
if self._task.args.get('backup') and result.get('__backup__'):
# User requested backup and no error occurred in module.
# NOTE: If there is a parameter error, _backup key may not be in results.
filepath = self._write_backup(task_vars['inventory_hostname'],
result['__backup__'])
result['backup_path'] = filepath
# strip out any keys that have two leading and two trailing
# underscore characters
for key in result.keys():
if PRIVATE_KEYS_RE.match(key):
del result[key]
return result
def _get_working_path(self):
cwd = self._loader.get_basedir()
if self._task._role is not None:
cwd = self._task._role._role_path
return cwd
def _write_backup(self, host, contents):
backup_path = self._get_working_path() + '/backup'
if not os.path.exists(backup_path):
os.mkdir(backup_path)
for fn in glob.glob('%s/%s*' % (backup_path, host)):
os.remove(fn)
tstamp = time.strftime("%Y-%m-%d@%H:%M:%S", time.localtime(time.time()))
filename = '%s/%s_config.%s' % (backup_path, host, tstamp)
open(filename, 'w').write(contents)
return filename
def _handle_template(self):
src = self._task.args.get('src')
working_path = self._get_working_path()
if os.path.isabs(src) or urlsplit('src').scheme:
source = src
else:
source = self._loader.path_dwim_relative(working_path, 'templates', src)
if not source:
source = self._loader.path_dwim_relative(working_path, src)
if not os.path.exists(source):
raise ValueError('path specified in src not found')
try:
with open(source, 'r') as f:
template_data = to_text(f.read())
except IOError:
return dict(failed=True, msg='unable to load src file')
# Create a template search path in the following order:
# [working_path, self_role_path, dependent_role_paths, dirname(source)]
searchpath = [working_path]
if self._task._role is not None:
searchpath.append(self._task._role._role_path)
if hasattr(self._task, "_block:"):
dep_chain = self._task._block.get_dep_chain()
if dep_chain is not None:
for role in dep_chain:
searchpath.append(role._role_path)
searchpath.append(os.path.dirname(source))
self._templar.environment.loader.searchpath = searchpath
self._task.args['src'] = self._templar.template(template_data)

View file

@ -1,5 +1,5 @@
#
# (c) 2016 Red Hat Inc.
# Copyright 2015 Peter Sprygada <psprygada@ansible.com>
#
# This file is part of Ansible
#
@ -19,7 +19,84 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.plugins.action.net_template import ActionModule as _ActionModule
import os
import time
import glob
import urlparse
from ansible.module_utils._text import to_text
from ansible.plugins.action.ios import ActionModule as _ActionModule
class ActionModule(_ActionModule):
pass
def run(self, tmp=None, task_vars=None):
try:
self._handle_template()
except (ValueError, AttributeError) as exc:
return dict(failed=True, msg=exc.message)
result = super(ActionModule, self).run(tmp, task_vars)
if self._task.args.get('backup') and result.get('__backup__'):
# User requested backup and no error occurred in module.
# NOTE: If there is a parameter error, __backup__ key may not be in results.
self._write_backup(task_vars['inventory_hostname'], result['__backup__'])
if '__backup__' in result:
del result['__backup__']
return result
def _get_working_path(self):
cwd = self._loader.get_basedir()
if self._task._role is not None:
cwd = self._task._role._role_path
return cwd
def _write_backup(self, host, contents):
backup_path = self._get_working_path() + '/backup'
if not os.path.exists(backup_path):
os.mkdir(backup_path)
for fn in glob.glob('%s/%s*' % (backup_path, host)):
os.remove(fn)
tstamp = time.strftime("%Y-%m-%d@%H:%M:%S", time.localtime(time.time()))
filename = '%s/%s_config.%s' % (backup_path, host, tstamp)
open(filename, 'w').write(contents)
def _handle_template(self):
src = self._task.args.get('src')
if not src:
raise ValueError('missing required arguments: src')
working_path = self._get_working_path()
if os.path.isabs(src) or urlparse.urlsplit(src).scheme:
source = src
else:
source = self._loader.path_dwim_relative(working_path, 'templates', src)
if not source:
source = self._loader.path_dwim_relative(working_path, src)
if not os.path.exists(source):
return
try:
with open(source, 'r') as f:
template_data = to_text(f.read())
except IOError:
return dict(failed=True, msg='unable to load src file')
# Create a template search path in the following order:
# [working_path, self_role_path, dependent_role_paths, dirname(source)]
searchpath = [working_path]
if self._task._role is not None:
searchpath.append(self._task._role._role_path)
if hasattr(self._task, "_block:"):
dep_chain = self._task._block.get_dep_chain()
if dep_chain is not None:
for role in dep_chain:
searchpath.append(role._role_path)
searchpath.append(os.path.dirname(source))
self._templar.environment.loader.searchpath = searchpath
self._task.args['src'] = self._templar.template(template_data)

View file

@ -1,92 +0,0 @@
#
# (c) 2015, Peter Sprygada <psprygada@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
class ModuleDocFragment(object):
# Standard files documentation fragment
DOCUMENTATION = """
options:
host:
description:
- Specifies the DNS host name or address for connecting to the remote
device over the specified transport. The value of host is used as
the destination address for the transport.
required: true
port:
description:
- Specifies the port to use when building the connection to the remote.
device.
required: false
default: 22
username:
description:
- Configures the username to use to authenticate the connection to
the remote device. This value is used to authenticate
the SSH session. If the value is not specified in the task, the
value of environment variable C(ANSIBLE_NET_USERNAME) will be used instead.
required: false
password:
description:
- Specifies the password to use to authenticate the connection to
the remote device. This value is used to authenticate
the SSH session. If the value is not specified in the task, the
value of environment variable C(ANSIBLE_NET_PASSWORD) will be used instead.
required: false
default: null
timeout:
description:
- Specifies the timeout in seconds for communicating with the network device
for either connecting or sending commands. If the timeout is
exceeded before the operation is completed, the module will error.
require: false
default: 10
ssh_keyfile:
description:
- Specifies the SSH key to use to authenticate the connection to
the remote device. This value is the path to the
key used to authenticate the SSH session. If the value is not specified
in the task, the value of environment variable C(ANSIBLE_NET_SSH_KEYFILE)
will be used instead.
required: false
authorize:
description:
- Instructs the module to enter privileged mode on the remote device
before sending any commands. If not specified, the device will
attempt to execute all commands in non-privileged mode. If the value
is not specified in the task, the value of environment variable
C(ANSIBLE_NET_AUTHORIZE) will be used instead.
required: false
default: no
choices: ['yes', 'no']
auth_pass:
description:
- Specifies the password to use if required to enter privileged mode
on the remote device. If I(authorize) is false, then this argument
does nothing. If the value is not specified in the task, the value of
environment variable C(ANSIBLE_NET_AUTH_PASS) will be used instead.
required: false
default: none
provider:
description:
- Convenience method that allows all I(ios) arguments to be passed as
a dict object. All constraints (required, choices, etc) must be
met either by individual arguments or values in this dict.
required: false
default: null
"""

View file

@ -73,7 +73,7 @@ class ManageNetworkCI(object):
'ansible_host=%s' % self.core_ci.connection.hostname,
'ansible_user=%s' % self.core_ci.connection.username,
'ansible_port=%s' % self.core_ci.connection.port,
'ansible_connection=network_cli',
'ansible_connection=local',
'ansible_ssh_private_key_file=%s' % self.core_ci.ssh_key.key,
'ansible_network_os=%s' % self.core_ci.platform,
]

View file

@ -0,0 +1,113 @@
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
def set_module_args(args):
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args)
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def load_fixture(name):
path = os.path.join(fixture_path, name)
if path in fixture_data:
return fixture_data[path]
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except:
pass
fixture_data[path] = data
return data
class AnsibleExitJson(Exception):
pass
class AnsibleFailJson(Exception):
pass
class TestIosModule(unittest.TestCase):
def execute_module(self, failed=False, changed=False, commands=None,
sort=True, defaults=False):
self.load_fixtures(commands)
if failed:
result = self.failed()
self.assertTrue(result['failed'], result)
else:
result = self.changed(changed)
self.assertEqual(result['changed'], changed, result)
if commands:
if sort:
self.assertEqual(sorted(commands), sorted(result['commands']), result['commands'])
else:
self.assertEqual(commands, result['commands'], result['commands'])
return result
def failed(self):
def fail_json(*args, **kwargs):
kwargs['failed'] = True
raise AnsibleFailJson(kwargs)
with patch.object(basic.AnsibleModule, 'fail_json', fail_json):
with self.assertRaises(AnsibleFailJson) as exc:
self.module.main()
result = exc.exception.args[0]
self.assertTrue(result['failed'], result)
return result
def changed(self, changed=False):
def exit_json(*args, **kwargs):
if 'changed' not in kwargs:
kwargs['changed'] = False
raise AnsibleExitJson(kwargs)
with patch.object(basic.AnsibleModule, 'exit_json', exit_json):
with self.assertRaises(AnsibleExitJson) as exc:
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], changed, result)
return result
def load_fixtures(self, commands=None):
pass

View file

@ -19,43 +19,15 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from ansible.errors import AnsibleModuleExit
from ansible.compat.tests.mock import patch
from ansible.modules.network.ios import ios_command
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
from .ios_module import TestIosModule, load_fixture, set_module_args
class TestIosCommandModule(TestIosModule):
def set_module_args(args):
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args)
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def load_fixture(name):
path = os.path.join(fixture_path, name)
if path in fixture_data:
return fixture_data[path]
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except:
pass
fixture_data[path] = data
return data
class test_iosCommandModule(unittest.TestCase):
module = ios_command
def setUp(self):
self.mock_run_commands = patch('ansible.modules.network.ios.ios_command.run_commands')
@ -64,7 +36,7 @@ class test_iosCommandModule(unittest.TestCase):
def tearDown(self):
self.mock_run_commands.stop()
def execute_module(self, failed=False, changed=False):
def load_fixtures(self, commands=None):
def load_from_file(*args, **kwargs):
module, commands = args
@ -82,18 +54,6 @@ class test_iosCommandModule(unittest.TestCase):
self.run_commands.side_effect = load_from_file
with self.assertRaises(AnsibleModuleExit) as exc:
ios_command.main()
result = exc.exception.result
if failed:
self.assertTrue(result.get('failed'))
else:
self.assertEqual(result.get('changed'), changed, result)
return result
def test_ios_command_simple(self):
set_module_args(dict(commands=['show version']))
result = self.execute_module()

View file

@ -20,43 +20,16 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from ansible.errors import AnsibleModuleExit
from ansible.compat.tests.mock import patch
from ansible.modules.network.ios import ios_config
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
from .ios_module import TestIosModule, load_fixture, set_module_args
def set_module_args(args):
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args)
class TestIosConfigModule(TestIosModule):
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def load_fixture(name):
path = os.path.join(fixture_path, name)
if path in fixture_data:
return fixture_data[path]
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except:
pass
fixture_data[path] = data
return data
class TestIosConfigModule(unittest.TestCase):
module = ios_config
def setUp(self):
self.mock_get_config = patch('ansible.modules.network.ios.ios_config.get_config')
@ -73,31 +46,11 @@ class TestIosConfigModule(unittest.TestCase):
self.mock_load_config.stop()
self.mock_run_commands.stop()
def execute_module(self, failed=False, changed=False, commands=None,
sort=True, defaults=False):
config_file = 'ios_config_defaults.cfg' if defaults else 'ios_config_config.cfg'
def load_fixtures(self, commands=None):
config_file = 'ios_config_config.cfg'
self.get_config.return_value = load_fixture(config_file)
self.load_config.return_value = None
with self.assertRaises(AnsibleModuleExit) as exc:
ios_config.main()
result = exc.exception.result
if failed:
self.assertTrue(result['failed'], result)
else:
self.assertEqual(result.get('changed'), changed, result)
if commands:
if sort:
self.assertEqual(sorted(commands), sorted(result['updates']), result['updates'])
else:
self.assertEqual(commands, result['updates'], result['updates'])
return result
def test_ios_config_unchanged(self):
src = load_fixture('ios_config_config.cfg')
set_module_args(dict(src=src))
@ -134,11 +87,6 @@ class TestIosConfigModule(unittest.TestCase):
commands = ['interface GigabitEthernet0/0', 'shutdown']
self.execute_module(changed=True, commands=commands)
def test_ios_config_defaults(self):
set_module_args(dict(lines=['no shutdown'], parents=['interface GigabitEthernet0/0'],
defaults=True))
self.execute_module(defaults=True)
def test_ios_config_before(self):
set_module_args(dict(lines=['hostname foo'], before=['test1','test2']))
commands = ['test1', 'test2', 'hostname foo']

View file

@ -20,44 +20,16 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from ansible.errors import AnsibleModuleExit
from ansible.compat.tests.mock import patch
from ansible.modules.network.ios import ios_system
from ansible.module_utils import basic
#from ansible.module_utils.local import LocalAnsibleModule
from ansible.module_utils._text import to_bytes
from .ios_module import TestIosModule, load_fixture, set_module_args
def set_module_args(args):
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args)
class TestIosSystemModule(TestIosModule):
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def load_fixture(name):
path = os.path.join(fixture_path, name)
if path in fixture_data:
return fixture_data[path]
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except:
pass
fixture_data[path] = data
return data
class TestIosSystemModule(unittest.TestCase):
module = ios_system
def setUp(self):
self.mock_get_config = patch('ansible.modules.network.ios.ios_system.get_config')
@ -70,29 +42,10 @@ class TestIosSystemModule(unittest.TestCase):
self.mock_get_config.stop()
self.mock_load_config.stop()
def execute_module(self, failed=False, changed=False, commands=None, sort=True):
def load_fixtures(self, commnads=None):
self.get_config.return_value = load_fixture('ios_system_config.cfg')
self.load_config.return_value = None
with self.assertRaises(AnsibleModuleExit) as exc:
ios_system.main()
result = exc.exception.result
if failed:
self.assertTrue(result['failed'], result)
else:
self.assertEqual(result['changed'], changed, result)
if commands:
if sort:
self.assertEqual(sorted(commands), sorted(result['commands']), result['commands'])
else:
self.assertEqual(commands, result['commands'])
return result
def test_ios_system_hostname_changed(self):
set_module_args(dict(hostname='foo'))
commands = ['hostname foo']

View file

@ -20,43 +20,15 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from ansible.errors import AnsibleModuleExit
from ansible.compat.tests.mock import patch
from ansible.modules.network.ios import _ios_template
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
from ansible.module_utils.local import LocalAnsibleModule
from .ios_module import TestIosModule, load_fixture, set_module_args
def set_module_args(args):
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args)
class TestIosTemplateModule(TestIosModule):
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def load_fixture(name):
path = os.path.join(fixture_path, name)
if path in fixture_data:
return fixture_data[path]
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except:
pass
fixture_data[path] = data
return data
class TestIosTemplateModule(unittest.TestCase):
module = _ios_template
def setUp(self):
self.mock_get_config = patch('ansible.modules.network.ios._ios_template.get_config')
@ -69,31 +41,11 @@ class TestIosTemplateModule(unittest.TestCase):
self.mock_get_config.stop()
self.mock_load_config.stop()
def execute_module(self, failed=False, changed=False, commands=None,
sort=True, defaults=False):
config_file = 'ios_template_defaults.cfg' if defaults else 'ios_template_config.cfg'
def load_fixtures(self, commands=None):
config_file = 'ios_template_config.cfg'
self.get_config.return_value = load_fixture(config_file)
self.load_config.return_value = None
with self.assertRaises(AnsibleModuleExit) as exc:
_ios_template.main()
result = exc.exception.result
if failed:
self.assertTrue(result['failed'], result)
else:
self.assertEqual(result.get('changed'), changed, result)
if commands:
if sort:
self.assertEqual(sorted(commands), sorted(result['updates']), result['updates'])
else:
self.assertEqual(commands, result['updates'], result['updates'])
return result
def test_ios_template_unchanged(self):
src = load_fixture('ios_template_config.cfg')
set_module_args(dict(src=src))
@ -114,16 +66,6 @@ class TestIosTemplateModule(unittest.TestCase):
self.execute_module(changed=True, commands=commands)
self.assertFalse(self.get_config.called)
def test_ios_template_include_defaults_false(self):
src = load_fixture('ios_template_config.cfg')
set_module_args(dict(src=src, include_defaults=False))
self.execute_module()
_, kwargs = self.get_config.call_args
# Ensure flags doesn't contain "default", or any other value
self.assertEqual(kwargs['flags'], [])
self.assertIsInstance(kwargs['module'], LocalAnsibleModule)
def test_ios_template_backup(self):
set_module_args(dict(backup=True))
result = self.execute_module()

View file

@ -20,43 +20,16 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from ansible.errors import AnsibleModuleExit
from ansible.compat.tests.mock import patch
from ansible.modules.network.ios import ios_vrf
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
from .ios_module import TestIosModule, load_fixture, set_module_args
def set_module_args(args):
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args)
class TestIosVrfModule(TestIosModule):
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def load_fixture(name):
path = os.path.join(fixture_path, name)
if path in fixture_data:
return fixture_data[path]
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except:
pass
fixture_data[path] = data
return data
class TestIosVrfModule(unittest.TestCase):
module = ios_vrf
def setUp(self):
self.mock_get_config = patch('ansible.modules.network.ios.ios_vrf.get_config')
@ -69,29 +42,10 @@ class TestIosVrfModule(unittest.TestCase):
self.mock_get_config.stop()
self.mock_load_config.stop()
def execute_module(self, failed=False, changed=False, commands=None, sort=True):
def load_fixtures(self, commands=None):
self.get_config.return_value = load_fixture('ios_vrf_config.cfg')
self.load_config.return_value = None
with self.assertRaises(AnsibleModuleExit) as exc:
ios_vrf.main()
result = exc.exception.result
if failed:
self.assertTrue(result['failed'], result)
else:
self.assertEqual(result.get('changed'), changed, result)
if commands:
if sort:
self.assertEqual(sorted(commands), sorted(result['commands']))
else:
self.assertEqual(commands, result['commands'], result['commands'])
return result
def test_ios_vrf_name(self):
set_module_args(dict(name='test_4'))
commands = ['vrf definition test_4']