# # This code is part of Ansible, but is an independent component. # # This particular file snippet, and this file snippet only, is BSD licensed. # Modules you write using this snippet, which is embedded dynamically by Ansible # still belong to the author of the module, and may assign their own license # to the complete work. # # (c) 2017 Red Hat, Inc. # # Redistribution and use in source and binary forms, with or without modification, # are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # import re import socket import sys import traceback from ansible.module_utils.basic import env_fallback from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import to_list, ComplexList from ansible.module_utils.connection import exec_command, ConnectionError from ansible.module_utils.six import iteritems from ansible.module_utils._text import to_native from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.netconf import NetconfConnection try: from ncclient.xml_ import to_xml, new_ele_ns HAS_NCCLIENT = True except ImportError: HAS_NCCLIENT = False try: from lxml import etree except ImportError: from xml.etree import ElementTree as etree _DEVICE_CLI_CONNECTION = None _DEVICE_NC_CONNECTION = None ce_provider_spec = { 'host': dict(), 'port': dict(type='int'), 'username': dict(fallback=(env_fallback, ['ANSIBLE_NET_USERNAME'])), 'password': dict(fallback=(env_fallback, ['ANSIBLE_NET_PASSWORD']), no_log=True), 'ssh_keyfile': dict(fallback=(env_fallback, ['ANSIBLE_NET_SSH_KEYFILE']), type='path'), 'use_ssl': dict(type='bool'), 'validate_certs': dict(type='bool'), 'timeout': dict(type='int'), 'transport': dict(default='cli', choices=['cli', 'netconf']), } ce_argument_spec = { 'provider': dict(type='dict', options=ce_provider_spec), } ce_top_spec = { 'host': dict(removed_in_version=2.9), 'port': dict(removed_in_version=2.9, type='int'), 'username': dict(removed_in_version=2.9), 'password': dict(removed_in_version=2.9, no_log=True), 'ssh_keyfile': dict(removed_in_version=2.9, type='path'), 'use_ssl': dict(removed_in_version=2.9, type='bool'), 'validate_certs': dict(removed_in_version=2.9, type='bool'), 'timeout': dict(removed_in_version=2.9, type='int'), 'transport': dict(removed_in_version=2.9, choices=['cli', 'netconf']), } ce_argument_spec.update(ce_top_spec) def to_string(data): return re.sub(r'|>)', r' 2 and err[0] in ["<", "["] and err[-1] in [">", "]"]: continue err.strip('.,\r\n\t ') if err: msg.append(err) if cmd: msg.insert(0, "Command: %s" % cmd) return ", ".join(msg).capitalize() + "." def to_command(module, commands): default_output = 'text' transform = ComplexList(dict( command=dict(key=True), output=dict(default=default_output), prompt=dict(), answer=dict() ), module) commands = transform(to_list(commands)) return commands def get_config(module, flags=None): flags = [] if flags is None else flags conn = get_connection(module) return conn.get_config(flags) def run_commands(module, commands, check_rc=True): conn = get_connection(module) return conn.run_commands(to_command(module, commands), check_rc) def load_config(module, config): """load_config""" conn = get_connection(module) return conn.load_config(config) def ce_unknown_host_cb(host, fingerprint): """ ce_unknown_host_cb """ return True def get_nc_set_id(xml_str): """get netconf set-id value""" result = re.findall(r'= 0 and index >= len(xml_list): return None if index < 0 and abs(index) > len(xml_list): return None ele = xml_list[index] if not ele.replace(" ", ""): xml_list.pop(index) ele = None return ele def merge_nc_xml(xml1, xml2): """merge xml1 and xml2""" xml1_list = xml1.split("")[0].split("\n") xml2_list = xml2.split("")[1].split("\n") while True: xml1_ele1 = get_xml_line(xml1_list, -1) xml1_ele2 = get_xml_line(xml1_list, -2) xml2_ele1 = get_xml_line(xml2_list, 0) xml2_ele2 = get_xml_line(xml2_list, 1) if not xml1_ele1 or not xml1_ele2 or not xml2_ele1 or not xml2_ele2: return xml1 if "xmlns" in xml2_ele1: xml2_ele1 = xml2_ele1.lstrip().split(" ")[0] + ">" if "xmlns" in xml2_ele2: xml2_ele2 = xml2_ele2.lstrip().split(" ")[0] + ">" if xml1_ele1.replace(" ", "").replace("/", "") == xml2_ele1.replace(" ", "").replace("/", ""): if xml1_ele2.replace(" ", "").replace("/", "") == xml2_ele2.replace(" ", "").replace("/", ""): xml1_list.pop() xml2_list.pop(0) else: break else: break return "\n".join(xml1_list + xml2_list) def get_nc_connection(module): global _DEVICE_NC_CONNECTION if not _DEVICE_NC_CONNECTION: load_params(module) conn = NetconfConnection(module._socket_path) _DEVICE_NC_CONNECTION = conn return _DEVICE_NC_CONNECTION def set_nc_config(module, xml_str): """ set_config """ conn = get_nc_connection(module) try: out = conn.edit_config(target='running', config=xml_str, default_operation='merge', error_option='rollback-on-error') finally: # conn.unlock(target = 'candidate') pass return to_string(to_xml(out)) def get_nc_next(module, xml_str): """ get_nc_next for exchange capability """ conn = get_nc_connection(module) result = None if xml_str is not None: response = conn.get(xml_str, if_rpc_reply=True) result = response.find('./*') set_id = response.get('set-id') while True and set_id is not None: try: fetch_node = new_ele_ns('get-next', 'http://www.huawei.com/netconf/capability/base/1.0', {'set-id': set_id}) next_xml = conn.dispatch_rpc(etree.tostring(fetch_node)) if next_xml is not None: result.extend(next_xml.find('./*')) set_id = next_xml.get('set-id') except ConnectionError: break if result is not None: return etree.tostring(result) return result def get_nc_config(module, xml_str): """ get_config """ conn = get_nc_connection(module) if xml_str is not None: response = conn.get(xml_str) else: return None return to_string(to_xml(response)) def execute_nc_action(module, xml_str): """ huawei execute-action """ conn = get_nc_connection(module) response = conn.execute_action(xml_str) return to_string(to_xml(response)) def execute_nc_cli(module, xml_str): """ huawei execute-cli """ if xml_str is not None: try: conn = get_nc_connection(module) out = conn.execute_nc_cli(command=xml_str) return to_string(to_xml(out)) except Exception as exc: raise Exception(exc) def check_ip_addr(ipaddr): """ check ip address, Supports IPv4 and IPv6 """ if not ipaddr or '\x00' in ipaddr: return False try: res = socket.getaddrinfo(ipaddr, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_NUMERICHOST) return bool(res) except socket.gaierror: err = sys.exc_info()[1] if err.args[0] == socket.EAI_NONAME: return False raise