1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/plugins/module_utils/network/f5/urls.py
Felix Fontein 25394eeafb
Fix imports and installing dependencies in CI, part 1 (#41)
* Fix ovirt collection name (ovirt.ovirt_collection, not ovirt.ovirt).

* Fix kubernetes module_utils references.

* Fix broken f5 imports on community.general side. The imports in that collection are still broken and will still cause failures.

* Fix Cisco ACI and MSO modules imports.

* Fix check_point.mgmt dependency, fix imports.

* Fix fortimanager imports.

* Fix cisco intersight imports.

* Fix ovirt module docs fragments.

* Fix usage of _ in unit tests to avoid sanity failures.

* Fix Cisco module docs fragments.

* Fix netapp.ontap module docs fragment name.

* Fix documentation.

* Fix some boilerplate (the ones not mentioned in ignore.txt).
2020-03-24 22:14:53 +00:00

122 lines
4.5 KiB
Python

# -*- coding: utf-8 -*-
#
# Copyright (c) 2017, F5 Networks Inc.
# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import re
try:
from library.module_utils.network.f5.common import F5ModuleError
except ImportError:
from ansible_collections.f5networks.f5_modules.plugins.module_utils.common import F5ModuleError
_CLEAN_HEADER_REGEX_BYTE = re.compile(b'^\\S[^\\r\\n]*$|^$')
_CLEAN_HEADER_REGEX_STR = re.compile(r'^\S[^\r\n]*$|^$')
def check_header_validity(header):
"""Verifies that header value is a string which doesn't contain
leading whitespace or return characters.
NOTE: This is a slightly modified version of the original function
taken from the requests library:
http://docs.python-requests.org/en/master/_modules/requests/utils/
:param header: string containing ':'.
"""
try:
name, value = header.split(':')
except ValueError:
raise F5ModuleError('Invalid header format: {0}'.format(header))
if name == '':
raise F5ModuleError('Invalid header format: {0}'.format(header))
if isinstance(value, bytes):
pat = _CLEAN_HEADER_REGEX_BYTE
else:
pat = _CLEAN_HEADER_REGEX_STR
try:
if not pat.match(value):
raise F5ModuleError("Invalid return character or leading space in header: %s" % name)
except TypeError:
raise F5ModuleError("Value for header {%s: %s} must be of type str or "
"bytes, not %s" % (name, value, type(value)))
def build_service_uri(base_uri, partition, name):
"""Build the proper uri for a service resource.
This follows the scheme:
<base_uri>/~<partition>~<<name>.app>~<name>
:param base_uri: str -- base uri of the REST endpoint
:param partition: str -- partition for the service
:param name: str -- name of the service
:returns: str -- uri to access the service
"""
name = name.replace('/', '~')
return '%s~%s~%s.app~%s' % (base_uri, partition, name, name)
def parseStats(entry):
if 'description' in entry:
return entry['description']
elif 'value' in entry:
return entry['value']
elif 'entries' in entry or 'nestedStats' in entry and 'entries' in entry['nestedStats']:
if 'entries' in entry:
entries = entry['entries']
else:
entries = entry['nestedStats']['entries']
result = None
for name in entries:
entry = entries[name]
if 'https://localhost' in name:
name = name.split('/')
name = name[-1]
if result and isinstance(result, list):
result.append(parseStats(entry))
elif result and isinstance(result, dict):
result[name] = parseStats(entry)
else:
try:
int(name)
result = list()
result.append(parseStats(entry))
except ValueError:
result = dict()
result[name] = parseStats(entry)
else:
if '.' in name:
names = name.split('.')
key = names[0]
value = names[1]
if result is None:
# result can be None if this branch is reached first
#
# For example, the mgmt/tm/net/trunk/NAME/stats API
# returns counters.bitsIn before anything else.
result = dict()
result[key] = dict()
elif key not in result:
result[key] = dict()
elif result[key] is None:
result[key] = dict()
result[key][value] = parseStats(entry)
else:
if result and isinstance(result, list):
result.append(parseStats(entry))
elif result and isinstance(result, dict):
result[name] = parseStats(entry)
else:
try:
int(name)
result = list()
result.append(parseStats(entry))
except ValueError:
result = dict()
result[name] = parseStats(entry)
return result