1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Move network utils that are used by AWS modules (#45579)

* Separate networking tools that may be used by modules outside of networking so changes to networking-only utilities don't trigger AWS integration tests

* Add unit tests for moved network utils

* Add comment to prevent imports from being mistakenly removed

* Move to_bits as well
This commit is contained in:
Sloane Hertel 2018-10-02 15:08:00 -04:00 committed by ansibot
parent a1fae193c7
commit 27534e9b47
3 changed files with 192 additions and 107 deletions

View file

@ -0,0 +1,115 @@
# Copyright (c) 2016 Red Hat Inc
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
# General networking tools that may be used by all modules
from struct import pack
from socket import inet_ntoa
from ansible.module_utils.six.moves import zip
VALID_MASKS = [2**8 - 2**i for i in range(0, 9)]
def is_netmask(val):
parts = str(val).split('.')
if not len(parts) == 4:
return False
for part in parts:
try:
if int(part) not in VALID_MASKS:
raise ValueError
except ValueError:
return False
return True
def is_masklen(val):
try:
return 0 <= int(val) <= 32
except ValueError:
return False
def to_netmask(val):
""" converts a masklen to a netmask """
if not is_masklen(val):
raise ValueError('invalid value for masklen')
bits = 0
for i in range(32 - int(val), 32):
bits |= (1 << i)
return inet_ntoa(pack('>I', bits))
def to_masklen(val):
""" converts a netmask to a masklen """
if not is_netmask(val):
raise ValueError('invalid value for netmask: %s' % val)
bits = list()
for x in val.split('.'):
octet = bin(int(x)).count('1')
bits.append(octet)
return sum(bits)
def to_subnet(addr, mask, dotted_notation=False):
""" coverts an addr / mask pair to a subnet in cidr notation """
try:
if not is_masklen(mask):
raise ValueError
cidr = int(mask)
mask = to_netmask(mask)
except ValueError:
cidr = to_masklen(mask)
addr = addr.split('.')
mask = mask.split('.')
network = list()
for s_addr, s_mask in zip(addr, mask):
network.append(str(int(s_addr) & int(s_mask)))
if dotted_notation:
return '%s %s' % ('.'.join(network), to_netmask(cidr))
return '%s/%s' % ('.'.join(network), cidr)
def to_ipv6_network(addr):
""" IPv6 addresses are eight groupings. The first three groupings (48 bits) comprise the network address. """
# Split by :: to identify omitted zeros
ipv6_prefix = addr.split('::')[0]
# Get the first three groups, or as many as are found + ::
found_groups = []
for group in ipv6_prefix.split(':'):
found_groups.append(group)
if len(found_groups) == 3:
break
if len(found_groups) < 3:
found_groups.append('::')
# Concatenate network address parts
network_addr = ''
for group in found_groups:
if group != '::':
network_addr += str(group)
network_addr += str(':')
# Ensure network address ends with ::
if not network_addr.endswith('::'):
network_addr += str(':')
return network_addr
def to_bits(val):
""" converts a netmask to bits """
bits = ''
for octet in val.split('.'):
bits += bin(int(octet))[2:].zfill(8)
return str

View file

@ -25,20 +25,26 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# #
# Networking tools for network modules only
import re import re
import ast import ast
import operator import operator
import socket import socket
from itertools import chain from itertools import chain
from struct import pack from socket import inet_aton
from socket import inet_aton, inet_ntoa
from ansible.module_utils._text import to_text from ansible.module_utils._text import to_text
from ansible.module_utils.six import iteritems, string_types from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils.six.moves import zip
from ansible.module_utils.basic import AnsibleFallbackNotFound from ansible.module_utils.basic import AnsibleFallbackNotFound
# Backwards compatibility for 3rd party modules
from ansible.module_utils.common.network import (
to_bits, is_netmask, is_masklen, to_netmask, to_masklen, to_subnet, to_ipv6_network, VALID_MASKS
)
try: try:
from jinja2 import Environment, StrictUndefined from jinja2 import Environment, StrictUndefined
from jinja2.exceptions import UndefinedError from jinja2.exceptions import UndefinedError
@ -49,7 +55,6 @@ except ImportError:
OPERATORS = frozenset(['ge', 'gt', 'eq', 'neq', 'lt', 'le']) OPERATORS = frozenset(['ge', 'gt', 'eq', 'neq', 'lt', 'le'])
ALIASES = frozenset([('min', 'ge'), ('max', 'le'), ('exactly', 'eq'), ('neq', 'ne')]) ALIASES = frozenset([('min', 'ge'), ('max', 'le'), ('exactly', 'eq'), ('neq', 'ne')])
VALID_MASKS = [2**8 - 2**i for i in range(0, 9)]
def to_list(val): def to_list(val):
@ -463,106 +468,3 @@ class Template:
if marker in data: if marker in data:
return True return True
return False return False
def is_netmask(val):
parts = str(val).split('.')
if not len(parts) == 4:
return False
for part in parts:
try:
if int(part) not in VALID_MASKS:
raise ValueError
except ValueError:
return False
return True
def is_masklen(val):
try:
return 0 <= int(val) <= 32
except ValueError:
return False
def to_bits(val):
""" converts a netmask to bits """
bits = ''
for octet in val.split('.'):
bits += bin(int(octet))[2:].zfill(8)
return str
def to_netmask(val):
""" converts a masklen to a netmask """
if not is_masklen(val):
raise ValueError('invalid value for masklen')
bits = 0
for i in range(32 - int(val), 32):
bits |= (1 << i)
return inet_ntoa(pack('>I', bits))
def to_masklen(val):
""" converts a netmask to a masklen """
if not is_netmask(val):
raise ValueError('invalid value for netmask: %s' % val)
bits = list()
for x in val.split('.'):
octet = bin(int(x)).count('1')
bits.append(octet)
return sum(bits)
def to_subnet(addr, mask, dotted_notation=False):
""" coverts an addr / mask pair to a subnet in cidr notation """
try:
if not is_masklen(mask):
raise ValueError
cidr = int(mask)
mask = to_netmask(mask)
except ValueError:
cidr = to_masklen(mask)
addr = addr.split('.')
mask = mask.split('.')
network = list()
for s_addr, s_mask in zip(addr, mask):
network.append(str(int(s_addr) & int(s_mask)))
if dotted_notation:
return '%s %s' % ('.'.join(network), to_netmask(cidr))
return '%s/%s' % ('.'.join(network), cidr)
def to_ipv6_network(addr):
""" IPv6 addresses are eight groupings. The first three groupings (48 bits) comprise the network address. """
# Split by :: to identify omitted zeros
ipv6_prefix = addr.split('::')[0]
# Get the first three groups, or as many as are found + ::
found_groups = []
for group in ipv6_prefix.split(':'):
found_groups.append(group)
if len(found_groups) == 3:
break
if len(found_groups) < 3:
found_groups.append('::')
# Concatenate network address parts
network_addr = ''
for group in found_groups:
if group != '::':
network_addr += str(group)
network_addr += str(':')
# Ensure network address ends with ::
if not network_addr.endswith('::'):
network_addr += str(':')
return network_addr

View file

@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
# (c) 2017 Red Hat, Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from ansible.module_utils.common.network import (
to_masklen,
to_netmask,
to_subnet,
to_ipv6_network,
is_masklen,
is_netmask
)
def test_to_masklen():
assert 24 == to_masklen('255.255.255.0')
def test_to_masklen_invalid():
with pytest.raises(ValueError):
to_masklen('255')
def test_to_netmask():
assert '255.0.0.0' == to_netmask(8)
assert '255.0.0.0' == to_netmask('8')
def test_to_netmask_invalid():
with pytest.raises(ValueError):
to_netmask(128)
def test_to_subnet():
result = to_subnet('192.168.1.1', 24)
assert '192.168.1.0/24' == result
result = to_subnet('192.168.1.1', 24, dotted_notation=True)
assert '192.168.1.0 255.255.255.0' == result
def test_to_subnet_invalid():
with pytest.raises(ValueError):
to_subnet('foo', 'bar')
def test_is_masklen():
assert is_masklen(32)
assert not is_masklen(33)
assert not is_masklen('foo')
def test_is_netmask():
assert is_netmask('255.255.255.255')
assert not is_netmask(24)
assert not is_netmask('foo')
def test_to_ipv6_network():
assert '2001:db8::' == to_ipv6_network('2001:db8::')
assert '2001:0db8:85a3::' == to_ipv6_network('2001:0db8:85a3:0000:0000:8a2e:0370:7334')
assert '2001:0db8:85a3::' == to_ipv6_network('2001:0db8:85a3:0:0:8a2e:0370:7334')