diff --git a/lib/ansible/module_utils/common/network.py b/lib/ansible/module_utils/common/network.py new file mode 100644 index 0000000000..45fba7df20 --- /dev/null +++ b/lib/ansible/module_utils/common/network.py @@ -0,0 +1,115 @@ +# Copyright (c) 2016 Red Hat Inc +# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause) + +# General networking tools that may be used by all modules + +from struct import pack +from socket import inet_ntoa + +from ansible.module_utils.six.moves import zip + + +VALID_MASKS = [2**8 - 2**i for i in range(0, 9)] + + +def is_netmask(val): + parts = str(val).split('.') + if not len(parts) == 4: + return False + for part in parts: + try: + if int(part) not in VALID_MASKS: + raise ValueError + except ValueError: + return False + return True + + +def is_masklen(val): + try: + return 0 <= int(val) <= 32 + except ValueError: + return False + + +def to_netmask(val): + """ converts a masklen to a netmask """ + if not is_masklen(val): + raise ValueError('invalid value for masklen') + + bits = 0 + for i in range(32 - int(val), 32): + bits |= (1 << i) + + return inet_ntoa(pack('>I', bits)) + + +def to_masklen(val): + """ converts a netmask to a masklen """ + if not is_netmask(val): + raise ValueError('invalid value for netmask: %s' % val) + + bits = list() + for x in val.split('.'): + octet = bin(int(x)).count('1') + bits.append(octet) + + return sum(bits) + + +def to_subnet(addr, mask, dotted_notation=False): + """ coverts an addr / mask pair to a subnet in cidr notation """ + try: + if not is_masklen(mask): + raise ValueError + cidr = int(mask) + mask = to_netmask(mask) + except ValueError: + cidr = to_masklen(mask) + + addr = addr.split('.') + mask = mask.split('.') + + network = list() + for s_addr, s_mask in zip(addr, mask): + network.append(str(int(s_addr) & int(s_mask))) + + if dotted_notation: + return '%s %s' % ('.'.join(network), to_netmask(cidr)) + return '%s/%s' % ('.'.join(network), cidr) + + +def to_ipv6_network(addr): + """ IPv6 addresses are eight groupings. The first three groupings (48 bits) comprise the network address. """ + + # Split by :: to identify omitted zeros + ipv6_prefix = addr.split('::')[0] + + # Get the first three groups, or as many as are found + :: + found_groups = [] + for group in ipv6_prefix.split(':'): + found_groups.append(group) + if len(found_groups) == 3: + break + if len(found_groups) < 3: + found_groups.append('::') + + # Concatenate network address parts + network_addr = '' + for group in found_groups: + if group != '::': + network_addr += str(group) + network_addr += str(':') + + # Ensure network address ends with :: + if not network_addr.endswith('::'): + network_addr += str(':') + return network_addr + + +def to_bits(val): + """ converts a netmask to bits """ + bits = '' + for octet in val.split('.'): + bits += bin(int(octet))[2:].zfill(8) + return str diff --git a/lib/ansible/module_utils/network/common/utils.py b/lib/ansible/module_utils/network/common/utils.py index fddeb6940b..4cebe64263 100644 --- a/lib/ansible/module_utils/network/common/utils.py +++ b/lib/ansible/module_utils/network/common/utils.py @@ -25,20 +25,26 @@ # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # + +# Networking tools for network modules only + import re import ast import operator import socket from itertools import chain -from struct import pack -from socket import inet_aton, inet_ntoa +from socket import inet_aton from ansible.module_utils._text import to_text from ansible.module_utils.six import iteritems, string_types -from ansible.module_utils.six.moves import zip from ansible.module_utils.basic import AnsibleFallbackNotFound +# Backwards compatibility for 3rd party modules +from ansible.module_utils.common.network import ( + to_bits, is_netmask, is_masklen, to_netmask, to_masklen, to_subnet, to_ipv6_network, VALID_MASKS +) + try: from jinja2 import Environment, StrictUndefined from jinja2.exceptions import UndefinedError @@ -49,7 +55,6 @@ except ImportError: OPERATORS = frozenset(['ge', 'gt', 'eq', 'neq', 'lt', 'le']) ALIASES = frozenset([('min', 'ge'), ('max', 'le'), ('exactly', 'eq'), ('neq', 'ne')]) -VALID_MASKS = [2**8 - 2**i for i in range(0, 9)] def to_list(val): @@ -463,106 +468,3 @@ class Template: if marker in data: return True return False - - -def is_netmask(val): - parts = str(val).split('.') - if not len(parts) == 4: - return False - for part in parts: - try: - if int(part) not in VALID_MASKS: - raise ValueError - except ValueError: - return False - return True - - -def is_masklen(val): - try: - return 0 <= int(val) <= 32 - except ValueError: - return False - - -def to_bits(val): - """ converts a netmask to bits """ - bits = '' - for octet in val.split('.'): - bits += bin(int(octet))[2:].zfill(8) - return str - - -def to_netmask(val): - """ converts a masklen to a netmask """ - if not is_masklen(val): - raise ValueError('invalid value for masklen') - - bits = 0 - for i in range(32 - int(val), 32): - bits |= (1 << i) - - return inet_ntoa(pack('>I', bits)) - - -def to_masklen(val): - """ converts a netmask to a masklen """ - if not is_netmask(val): - raise ValueError('invalid value for netmask: %s' % val) - - bits = list() - for x in val.split('.'): - octet = bin(int(x)).count('1') - bits.append(octet) - - return sum(bits) - - -def to_subnet(addr, mask, dotted_notation=False): - """ coverts an addr / mask pair to a subnet in cidr notation """ - try: - if not is_masklen(mask): - raise ValueError - cidr = int(mask) - mask = to_netmask(mask) - except ValueError: - cidr = to_masklen(mask) - - addr = addr.split('.') - mask = mask.split('.') - - network = list() - for s_addr, s_mask in zip(addr, mask): - network.append(str(int(s_addr) & int(s_mask))) - - if dotted_notation: - return '%s %s' % ('.'.join(network), to_netmask(cidr)) - return '%s/%s' % ('.'.join(network), cidr) - - -def to_ipv6_network(addr): - """ IPv6 addresses are eight groupings. The first three groupings (48 bits) comprise the network address. """ - - # Split by :: to identify omitted zeros - ipv6_prefix = addr.split('::')[0] - - # Get the first three groups, or as many as are found + :: - found_groups = [] - for group in ipv6_prefix.split(':'): - found_groups.append(group) - if len(found_groups) == 3: - break - if len(found_groups) < 3: - found_groups.append('::') - - # Concatenate network address parts - network_addr = '' - for group in found_groups: - if group != '::': - network_addr += str(group) - network_addr += str(':') - - # Ensure network address ends with :: - if not network_addr.endswith('::'): - network_addr += str(':') - return network_addr diff --git a/test/units/module_utils/common/test_network.py b/test/units/module_utils/common/test_network.py new file mode 100644 index 0000000000..1267d0cef0 --- /dev/null +++ b/test/units/module_utils/common/test_network.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# (c) 2017 Red Hat, Inc. +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make coding more python3-ish +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import pytest + +from ansible.module_utils.common.network import ( + to_masklen, + to_netmask, + to_subnet, + to_ipv6_network, + is_masklen, + is_netmask +) + + +def test_to_masklen(): + assert 24 == to_masklen('255.255.255.0') + + +def test_to_masklen_invalid(): + with pytest.raises(ValueError): + to_masklen('255') + + +def test_to_netmask(): + assert '255.0.0.0' == to_netmask(8) + assert '255.0.0.0' == to_netmask('8') + + +def test_to_netmask_invalid(): + with pytest.raises(ValueError): + to_netmask(128) + + +def test_to_subnet(): + result = to_subnet('192.168.1.1', 24) + assert '192.168.1.0/24' == result + + result = to_subnet('192.168.1.1', 24, dotted_notation=True) + assert '192.168.1.0 255.255.255.0' == result + + +def test_to_subnet_invalid(): + with pytest.raises(ValueError): + to_subnet('foo', 'bar') + + +def test_is_masklen(): + assert is_masklen(32) + assert not is_masklen(33) + assert not is_masklen('foo') + + +def test_is_netmask(): + assert is_netmask('255.255.255.255') + assert not is_netmask(24) + assert not is_netmask('foo') + + +def test_to_ipv6_network(): + assert '2001:db8::' == to_ipv6_network('2001:db8::') + assert '2001:0db8:85a3::' == to_ipv6_network('2001:0db8:85a3:0000:0000:8a2e:0370:7334') + assert '2001:0db8:85a3::' == to_ipv6_network('2001:0db8:85a3:0:0:8a2e:0370:7334')