1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

refactor moudule utils of hwc_utils.py (#55858)

* use navigate_value instead navigate_hash

* add async wait method

* update dict compare

* remove unuse methods

* not all modules have timeouts parameter

* navigate_value, the input data may be None
This commit is contained in:
zengchen 2019-05-09 21:04:52 +08:00 committed by jctanner
parent 34a8594c91
commit d8314e1a45
6 changed files with 305 additions and 327 deletions

View file

@ -3,6 +3,7 @@
# https://opensource.org/licenses/BSD-2-Clause) # https://opensource.org/licenses/BSD-2-Clause)
import re import re
import time
import traceback import traceback
THIRD_LIBRARIES_IMP_ERR = None THIRD_LIBRARIES_IMP_ERR = None
@ -20,59 +21,14 @@ from ansible.module_utils.basic import (AnsibleModule, env_fallback,
from ansible.module_utils._text import to_text from ansible.module_utils._text import to_text
def navigate_hash(source, path, default=None): class HwcModuleException(Exception):
if not (source and path): def __init__(self, message):
return None super(HwcModuleException, self).__init__()
key = path[0] self._message = message
path = path[1:]
if key not in source:
return default
result = source[key]
if path:
return navigate_hash(result, path, default)
else:
return result
def __str__(self):
def remove_empty_from_dict(obj): return "[HwcClientException] message=%s" % self._message
return _DictClean(
obj,
lambda v: v is not None and v != {} and v != []
)()
def remove_nones_from_dict(obj):
return _DictClean(obj, lambda v: v is not None)()
def replace_resource_dict(item, value):
""" Handles the replacement of dicts with values ->
the needed value for HWC API"""
if isinstance(item, list):
items = []
for i in item:
items.append(replace_resource_dict(i, value))
return items
else:
if not item:
return item
return item.get(value)
def are_dicts_different(expect, actual):
"""Remove all output-only from actual."""
actual_vals = {}
for k, v in actual.items():
if k in expect:
actual_vals[k] = v
expect_vals = {}
for k, v in expect.items():
if k in actual:
expect_vals[k] = v
return DictComparison(expect_vals) != DictComparison(actual_vals)
class HwcClientException(Exception): class HwcClientException(Exception):
@ -116,9 +72,9 @@ def session_method_wrapper(f):
code = r.status_code code = r.status_code
if code not in [200, 201, 202, 203, 204, 205, 206, 207, 208, 226]: if code not in [200, 201, 202, 203, 204, 205, 206, 207, 208, 226]:
msg = "" msg = ""
for i in [['message'], ['error', 'message']]: for i in ['message', 'error.message']:
try: try:
msg = navigate_hash(result, i) msg = navigate_value(result, i)
break break
except Exception: except Exception:
pass pass
@ -281,14 +237,9 @@ class HwcModule(AnsibleModule):
fallback=(env_fallback, ['ANSIBLE_HWC_PROJECT']), fallback=(env_fallback, ['ANSIBLE_HWC_PROJECT']),
), ),
region=dict( region=dict(
required=True, type='str', type='str',
fallback=(env_fallback, ['ANSIBLE_HWC_REGION']), fallback=(env_fallback, ['ANSIBLE_HWC_REGION']),
), ),
timeouts=dict(type='dict', options=dict(
create=dict(default='10m', type='str'),
update=dict(default='10m', type='str'),
delete=dict(default='10m', type='str'),
), default={}),
id=dict(type='str') id=dict(type='str')
) )
) )
@ -296,12 +247,13 @@ class HwcModule(AnsibleModule):
super(HwcModule, self).__init__(*args, **kwargs) super(HwcModule, self).__init__(*args, **kwargs)
class DictComparison(object): class _DictComparison(object):
''' This class takes in two dictionaries `a` and `b`. ''' This class takes in two dictionaries `a` and `b`.
These are dictionaries of arbitrary depth, but made up of standard These are dictionaries of arbitrary depth, but made up of standard
Python types only. Python types only.
This differ will compare all values in `a` to those in `b`. This differ will compare all values in `a` to those in `b`.
Note: Only keys in `a` will be compared. Extra keys in `b` will be ignored. If value in `a` is None, always returns True, indicating
this value is no need to compare.
Note: On all lists, order does matter. Note: On all lists, order does matter.
''' '''
@ -315,76 +267,136 @@ class DictComparison(object):
return not self.__eq__(other) return not self.__eq__(other)
def _compare_dicts(self, dict1, dict2): def _compare_dicts(self, dict1, dict2):
if len(dict1.keys()) != len(dict2.keys()): if dict1 is None:
return True
if set(dict1.keys()) != set(dict2.keys()):
return False return False
return all([ for k in dict1:
self._compare_value(dict1.get(k), dict2.get(k)) for k in dict1 if not self._compare_value(dict1.get(k), dict2.get(k)):
]) return False
return True
def _compare_lists(self, list1, list2): def _compare_lists(self, list1, list2):
"""Takes in two lists and compares them.""" """Takes in two lists and compares them."""
if list1 is None:
return True
if len(list1) != len(list2): if len(list1) != len(list2):
return False return False
difference = [] for i in range(len(list1)):
for index in range(len(list1)): if not self._compare_value(list1[i], list2[i]):
value1 = list1[index] return False
if index < len(list2):
value2 = list2[index]
difference.append(self._compare_value(value1, value2))
return all(difference) return True
def _compare_value(self, value1, value2): def _compare_value(self, value1, value2):
""" """
return: True: value1 is same as value2, otherwise False. return: True: value1 is same as value2, otherwise False.
""" """
if value1 is None:
return True
if not (value1 and value2): if not (value1 and value2):
return (not value1) and (not value2) return (not value1) and (not value2)
# Can assume non-None types at this point. # Can assume non-None types at this point.
if isinstance(value1, list): if isinstance(value1, list) and isinstance(value2, list):
return self._compare_lists(value1, value2) return self._compare_lists(value1, value2)
elif isinstance(value1, dict):
elif isinstance(value1, dict) and isinstance(value2, dict):
return self._compare_dicts(value1, value2) return self._compare_dicts(value1, value2)
# Always use to_text values to avoid unicode issues. # Always use to_text values to avoid unicode issues.
return (to_text(value1, errors='surrogate_or_strict') == to_text(
value2, errors='surrogate_or_strict'))
def wait_to_finish(target, pending, refresh, timeout, min_interval=1, delay=3):
is_last_time = False
not_found_times = 0
wait = 0
time.sleep(delay)
end = time.time() + timeout
while not is_last_time:
if time.time() > end:
is_last_time = True
obj, status = refresh()
if obj is None:
not_found_times += 1
if not_found_times > 10:
raise HwcModuleException(
"not found the object for %d times" % not_found_times)
else: else:
return (to_text(value1, errors='surrogate_or_strict') not_found_times = 0
== to_text(value2, errors='surrogate_or_strict'))
if status in target:
return obj
if pending and status not in pending:
raise HwcModuleException(
"unexpect status(%s) occured" % status)
if not is_last_time:
wait *= 2
if wait < min_interval:
wait = min_interval
elif wait > 10:
wait = 10
time.sleep(wait)
raise HwcModuleException("asycn wait timeout after %d seconds" % timeout)
class _DictClean(object): def navigate_value(data, index, array_index=None):
def __init__(self, obj, func): if array_index and (not isinstance(array_index, dict)):
self.obj = obj raise HwcModuleException("array_index must be dict")
self.keep_it = func
def __call__(self): d = data
return self._clean_dict(self.obj) for n in range(len(index)):
if d is None:
return None
def _clean_dict(self, obj): if not isinstance(d, dict):
r = {} raise HwcModuleException(
for k, v in obj.items(): "can't navigate value from a non-dict object")
v1 = v
if isinstance(v, dict):
v1 = self._clean_dict(v)
elif isinstance(v, list):
v1 = self._clean_list(v)
if self.keep_it(v1):
r[k] = v1
return r
def _clean_list(self, obj): i = index[n]
r = [] if i not in d:
for v in obj: raise HwcModuleException(
v1 = v "navigate value failed: key(%s) is not exist in dict" % i)
if isinstance(v, dict): d = d[i]
v1 = self._clean_dict(v)
elif isinstance(v, list): if not array_index:
v1 = self._clean_list(v) continue
if self.keep_it(v1):
r.append(v1) k = ".".join(index[: (n + 1)])
return r if k not in array_index:
continue
if d is None:
return None
if not isinstance(d, list):
raise HwcModuleException(
"can't navigate value from a non-list object")
j = array_index.get(k)
if j >= len(d):
raise HwcModuleException(
"navigate value failed: the index is out of list")
d = d[j]
return d
def build_path(module, path, kv=None): def build_path(module, path, kv=None):
@ -411,4 +423,12 @@ def get_region(module):
if module.params['region']: if module.params['region']:
return module.params['region'] return module.params['region']
return module.params['project_name'].split("_")[0] return module.params['project'].split("_")[0]
def is_empty_value(v):
return (not v)
def are_different_dicts(dict1, dict2):
return _DictComparison(dict1) != _DictComparison(dict2)

View file

@ -34,6 +34,27 @@ options:
type: str type: str
choices: ['present', 'absent'] choices: ['present', 'absent']
default: 'present' default: 'present'
timeouts:
description:
- The timeouts for each operations.
type: dict
version_added: '2.9'
suboptions:
create:
description:
- The timeout for create operation.
type: str
default: '15m'
update:
description:
- The timeout for update operation.
type: str
default: '15m'
delete:
description:
- The timeout for delete operation.
type: str
default: '15m'
name: name:
description: description:
- the name of vpc. - the name of vpc.
@ -110,14 +131,12 @@ RETURN = '''
# Imports # Imports
############################################################################### ###############################################################################
from ansible.module_utils.hwc_utils import (Config, HwcModule, get_region, from ansible.module_utils.hwc_utils import (Config, HwcClientException,
HwcClientException, navigate_hash, HwcClientException404, HwcModule,
HwcClientException404, are_different_dicts, is_empty_value,
remove_nones_from_dict, build_path, wait_to_finish, get_region,
remove_empty_from_dict, build_path, navigate_value)
are_dicts_different)
import re import re
import time
############################################################################### ###############################################################################
# Main # Main
@ -129,7 +148,13 @@ def main():
module = HwcModule( module = HwcModule(
argument_spec=dict( argument_spec=dict(
state=dict(default='present', choices=['present', 'absent'], type='str'), state=dict(
default='present', choices=['present', 'absent'], type='str'),
timeouts=dict(type='dict', options=dict(
create=dict(default='15m', type='str'),
update=dict(default='15m', type='str'),
delete=dict(default='15m', type='str'),
), default=dict()),
name=dict(required=True, type='str'), name=dict(required=True, type='str'),
cidr=dict(required=True, type='str') cidr=dict(required=True, type='str')
), ),
@ -156,7 +181,8 @@ def main():
if state == 'present': if state == 'present':
expect = _get_editable_properties(module) expect = _get_editable_properties(module)
current_state = response_to_hash(module, fetch) current_state = response_to_hash(module, fetch)
if are_dicts_different(expect, current_state): current = {"cidr": current_state["cidr"]}
if are_different_dicts(expect, current):
if not module.check_mode: if not module.check_mode:
fetch = update(config, self_link(module)) fetch = update(config, self_link(module))
fetch = response_to_hash(module, fetch.get('vpc')) fetch = response_to_hash(module, fetch.get('vpc'))
@ -195,8 +221,12 @@ def create(config, link):
module.fail_json(msg=msg) module.fail_json(msg=msg)
wait_done = wait_for_operation(config, 'create', r) wait_done = wait_for_operation(config, 'create', r)
v = ""
try:
v = navigate_value(wait_done, ['vpc', 'id'])
except Exception as ex:
module.fail_json(msg=str(ex))
v = navigate_hash(wait_done, ['vpc', 'id'])
url = build_path(module, 'vpcs/{op_id}', {'op_id': v}) url = build_path(module, 'vpcs/{op_id}', {'op_id': v})
return fetch_resource(module, client, url) return fetch_resource(module, client, url)
@ -268,7 +298,8 @@ def get_id_by_name(config):
elif len(ids) == 1: elif len(ids) == 1:
return ids[0] return ids[0]
else: else:
module.fail_json(msg="Multiple resources with same name are found.") module.fail_json(
msg="Multiple resources with same name are found.")
elif none_values: elif none_values:
module.fail_json( module.fail_json(
msg="Can not find id by name because url includes None.") msg="Can not find id by name because url includes None.")
@ -303,28 +334,43 @@ def self_link(module):
def resource_to_create(module): def resource_to_create(module):
request = remove_empty_from_dict({ params = dict()
u'name': module.params.get('name'),
u'cidr': module.params.get('cidr') v = module.params.get('cidr')
}) if not is_empty_value(v):
return {'vpc': request} params["cidr"] = v
v = module.params.get('name')
if not is_empty_value(v):
params["name"] = v
if not params:
return params
params = {"vpc": params}
return params
def resource_to_update(module): def resource_to_update(module):
request = remove_nones_from_dict({ params = dict()
u'name': module.params.get('name'),
u'cidr': module.params.get('cidr') v = module.params.get('cidr')
}) if not is_empty_value(v):
return {'vpc': request} params["cidr"] = v
if not params:
return params
params = {"vpc": params}
return params
def _get_editable_properties(module): def _get_editable_properties(module):
request = remove_nones_from_dict({ return {
"name": module.params.get("name"),
"cidr": module.params.get("cidr"), "cidr": module.params.get("cidr"),
}) }
return request
def response_to_hash(module, response): def response_to_hash(module, response):
@ -336,14 +382,20 @@ def response_to_hash(module, response):
u'name': response.get(u'name'), u'name': response.get(u'name'),
u'cidr': response.get(u'cidr'), u'cidr': response.get(u'cidr'),
u'status': response.get(u'status'), u'status': response.get(u'status'),
u'routes': VpcRoutesArray(response.get(u'routes', []), module).from_response(), u'routes': VpcRoutesArray(
response.get(u'routes', []), module).from_response(),
u'enable_shared_snat': response.get(u'enable_shared_snat') u'enable_shared_snat': response.get(u'enable_shared_snat')
} }
def wait_for_operation(config, op_type, op_result): def wait_for_operation(config, op_type, op_result):
module = config.module module = config.module
op_id = navigate_hash(op_result, ['vpc', 'id']) op_id = ""
try:
op_id = navigate_value(op_result, ['vpc', 'id'])
except Exception as ex:
module.fail_json(msg=str(ex))
url = build_path(module, "vpcs/{op_id}", {'op_id': op_id}) url = build_path(module, "vpcs/{op_id}", {'op_id': op_id})
timeout = 60 * int(module.params['timeouts'][op_type].rstrip('m')) timeout = 60 * int(module.params['timeouts'][op_type].rstrip('m'))
states = { states = {
@ -365,47 +417,47 @@ def wait_for_completion(op_uri, timeout, allowed_states,
complete_states, config): complete_states, config):
module = config.module module = config.module
client = config.client(get_region(module), "vpc", "project") client = config.client(get_region(module), "vpc", "project")
end = time.time() + timeout
while time.time() <= end: def _refresh_status():
r = None
try: try:
op_result = fetch_resource(module, client, op_uri) r = fetch_resource(module, client, op_uri)
except Exception: except Exception:
time.sleep(1.0) return None, ""
continue
raise_if_errors(op_result, module) status = ""
try:
status = navigate_value(r, ['vpc', 'status'])
except Exception:
return None, ""
status = navigate_hash(op_result, ['vpc', 'status']) return r, status
if status not in allowed_states:
module.fail_json(msg="Invalid async operation status %s" % status)
if status in complete_states:
return op_result
time.sleep(1.0) try:
return wait_to_finish(complete_states, allowed_states,
module.fail_json(msg="Timeout to wait completion.") _refresh_status, timeout)
except Exception as ex:
module.fail_json(msg=str(ex))
def raise_if_errors(response, module):
errors = navigate_hash(response, [])
if errors:
module.fail_json(msg=navigate_hash(response, []))
def wait_for_delete(module, client, link): def wait_for_delete(module, client, link):
end = time.time() + 60 * int(
module.params['timeouts']['delete'].rstrip('m')) def _refresh_status():
while time.time() <= end:
try: try:
client.get(link) client.get(link)
except HwcClientException404: except HwcClientException404:
return return True, "Done"
except Exception: except Exception:
pass return None, ""
time.sleep(1.0) return True, "Pending"
module.fail_json(msg="Timeout to wait for deletion to be complete.") timeout = 60 * int(module.params['timeouts']['delete'].rstrip('m'))
try:
return wait_to_finish(["Done"], ["Pending"], _refresh_status, timeout)
except Exception as ex:
module.fail_json(msg=str(ex))
class VpcRoutesArray(object): class VpcRoutesArray(object):

View file

@ -109,11 +109,10 @@ update_time:
# Imports # Imports
############################################################################### ###############################################################################
from ansible.module_utils.hwc_utils import (Config, HwcModule, build_path, from ansible.module_utils.hwc_utils import (Config, HwcClientException,
HwcClientException, navigate_hash, HwcModule, navigate_value,
remove_nones_from_dict, get_region, are_different_dicts, is_empty_value,
remove_empty_from_dict, build_path, get_region)
are_dicts_different)
import re import re
############################################################################### ###############################################################################
@ -153,7 +152,8 @@ def main():
if state == 'present': if state == 'present':
expect = _get_resource_editable_properties(module) expect = _get_resource_editable_properties(module)
current_state = response_to_hash(module, fetch) current_state = response_to_hash(module, fetch)
if are_dicts_different(expect, current_state): current = {'display_name': current_state['display_name']}
if are_different_dicts(expect, current):
if not module.check_mode: if not module.check_mode:
fetch = update(config) fetch = update(config)
fetch = response_to_hash(module, fetch) fetch = response_to_hash(module, fetch)
@ -236,7 +236,13 @@ def get_resource(config, result):
module = config.module module = config.module
client = config.client(get_region(module), "smn", "project") client = config.client(get_region(module), "smn", "project")
d = {'topic_urn': navigate_hash(result, ['topic_urn'])} v = ""
try:
v = navigate_value(result, ['topic_urn'])
except Exception as ex:
module.fail_json(msg=str(ex))
d = {'topic_urn': v}
url = build_path(module, 'notifications/topics/{topic_urn}', d) url = build_path(module, 'notifications/topics/{topic_urn}', d)
return fetch_resource(module, client, url) return fetch_resource(module, client, url)
@ -280,24 +286,33 @@ def self_link(module):
def create_resource_opts(module): def create_resource_opts(module):
request = remove_empty_from_dict({ params = dict()
u'display_name': module.params.get('display_name'),
u'name': module.params.get('name') v = module.params.get('display_name')
}) if not is_empty_value(v):
return request params["display_name"] = v
v = module.params.get('name')
if not is_empty_value(v):
params["name"] = v
return params
def update_resource_opts(module): def update_resource_opts(module):
request = remove_nones_from_dict({ params = dict()
u'display_name': module.params.get('display_name')
}) v = module.params.get('display_name')
return request if not is_empty_value(v):
params["display_name"] = v
return params
def _get_resource_editable_properties(module): def _get_resource_editable_properties(module):
return remove_nones_from_dict({ return {
"display_name": module.params.get("display_name"), "display_name": module.params.get("display_name"),
}) }
def response_to_hash(module, response): def response_to_hash(module, response):

View file

@ -41,26 +41,6 @@ options:
description: description:
- The region to which the project belongs. - The region to which the project belongs.
type: str type: str
required: true
timeouts:
description:
- The timeouts for create/update/delete operation.
type: dict
suboptions:
create:
description:
- The timeouts for create operation.
type: str
default: '10m'
update:
description:
- The timeouts for update operation.
type: str
default: '10m'
delete:
description:
- The timeouts for delete operation.
type: str
id: id:
description: description:
- The id of resource to be managed. - The id of resource to be managed.

View file

@ -21,7 +21,7 @@ import os
import sys import sys
from units.compat import unittest from units.compat import unittest
from ansible.module_utils.hwc_utils import DictComparison from ansible.module_utils.hwc_utils import are_different_dicts
class HwcDictComparisonTestCase(unittest.TestCase): class HwcDictComparisonTestCase(unittest.TestCase):
@ -30,9 +30,8 @@ class HwcDictComparisonTestCase(unittest.TestCase):
'foo': 'bar', 'foo': 'bar',
'test': 'original' 'test': 'original'
} }
d = DictComparison(value1)
d_ = d self.assertFalse(are_different_dicts(value1, value1))
self.assertTrue(d == d_)
def test_simple_different(self): def test_simple_different(self):
value1 = { value1 = {
@ -46,12 +45,10 @@ class HwcDictComparisonTestCase(unittest.TestCase):
value3 = { value3 = {
'test': 'original' 'test': 'original'
} }
dict1 = DictComparison(value1)
dict2 = DictComparison(value2) self.assertTrue(are_different_dicts(value1, value2))
dict3 = DictComparison(value3) self.assertTrue(are_different_dicts(value1, value3))
self.assertFalse(dict1 == dict2) self.assertTrue(are_different_dicts(value2, value3))
self.assertFalse(dict1 == dict3)
self.assertFalse(dict2 == dict3)
def test_nested_dictionaries_no_difference(self): def test_nested_dictionaries_no_difference(self):
value1 = { value1 = {
@ -63,9 +60,8 @@ class HwcDictComparisonTestCase(unittest.TestCase):
}, },
'test': 'original' 'test': 'original'
} }
d = DictComparison(value1)
d_ = d self.assertFalse(are_different_dicts(value1, value1))
self.assertTrue(d == d_)
def test_nested_dictionaries_with_difference(self): def test_nested_dictionaries_with_difference(self):
value1 = { value1 = {
@ -95,12 +91,9 @@ class HwcDictComparisonTestCase(unittest.TestCase):
} }
} }
dict1 = DictComparison(value1) self.assertTrue(are_different_dicts(value1, value2))
dict2 = DictComparison(value2) self.assertTrue(are_different_dicts(value1, value3))
dict3 = DictComparison(value3) self.assertTrue(are_different_dicts(value2, value3))
self.assertFalse(dict1 == dict2)
self.assertFalse(dict1 == dict3)
self.assertFalse(dict2 == dict3)
def test_arrays_strings_no_difference(self): def test_arrays_strings_no_difference(self):
value1 = { value1 = {
@ -109,9 +102,8 @@ class HwcDictComparisonTestCase(unittest.TestCase):
'bar' 'bar'
] ]
} }
d = DictComparison(value1)
d_ = d self.assertFalse(are_different_dicts(value1, value1))
self.assertTrue(d == d_)
def test_arrays_strings_with_difference(self): def test_arrays_strings_with_difference(self):
value1 = { value1 = {
@ -133,12 +125,9 @@ class HwcDictComparisonTestCase(unittest.TestCase):
] ]
} }
dict1 = DictComparison(value1) self.assertTrue(are_different_dicts(value1, value2))
dict2 = DictComparison(value2) self.assertTrue(are_different_dicts(value1, value3))
dict3 = DictComparison(value3) self.assertTrue(are_different_dicts(value2, value3))
self.assertFalse(dict1 == dict2)
self.assertFalse(dict1 == dict3)
self.assertFalse(dict2 == dict3)
def test_arrays_dicts_with_no_difference(self): def test_arrays_dicts_with_no_difference(self):
value1 = { value1 = {
@ -152,9 +141,8 @@ class HwcDictComparisonTestCase(unittest.TestCase):
} }
] ]
} }
d = DictComparison(value1)
d_ = d self.assertFalse(are_different_dicts(value1, value1))
self.assertTrue(d == d_)
def test_arrays_dicts_with_difference(self): def test_arrays_dicts_with_difference(self):
value1 = { value1 = {
@ -184,9 +172,7 @@ class HwcDictComparisonTestCase(unittest.TestCase):
} }
] ]
} }
dict1 = DictComparison(value1)
dict2 = DictComparison(value2) self.assertTrue(are_different_dicts(value1, value2))
dict3 = DictComparison(value3) self.assertTrue(are_different_dicts(value1, value3))
self.assertFalse(dict1 == dict2) self.assertTrue(are_different_dicts(value2, value3))
self.assertFalse(dict1 == dict3)
self.assertFalse(dict2 == dict3)

View file

@ -1,109 +1,34 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os
import sys
from units.compat import unittest from units.compat import unittest
from ansible.module_utils.hwc_utils import (navigate_hash, from ansible.module_utils.hwc_utils import (HwcModuleException, navigate_value)
remove_empty_from_dict,
remove_nones_from_dict,
replace_resource_dict)
class HwcUtilsTestCase(unittest.TestCase): class HwcUtilsTestCase(unittest.TestCase):
def test_navigate_hash(self): def test_navigate_value(self):
value = { value = {
'foo': { 'foo': {
'quiet': { 'quiet': {
'tree': 'test' 'tree': 'test',
"trees": [0, 1]
}, },
} }
} }
self.assertEquals(navigate_hash(value, ["foo", "quiet", "tree"]), self.assertEquals(navigate_value(value, ["foo", "quiet", "tree"]),
"test") "test")
self.assertEquals(navigate_hash(value, ["foo", "q", "tree"], 123), self.assertEquals(
123) navigate_value(value, ["foo", "quiet", "trees"],
{"foo.quiet.trees": 1}),
1)
self.assertIsNone(navigate_hash(value, [], 123)) self.assertRaisesRegexp(HwcModuleException,
r".* key\(q\) is not exist in dict",
navigate_value, value, ["foo", "q", "tree"])
def test_remove_empty_from_dict(self): self.assertRaisesRegexp(HwcModuleException,
value = { r".* the index is out of list",
'foo': { navigate_value, value,
'quiet': { ["foo", "quiet", "trees"],
'tree': 'test', {"foo.quiet.trees": 2})
'tree1': [
None,
{},
[],
'test'
],
'tree2': {},
'tree3': []
},
},
'foo1': [],
'foo2': {},
'foo3': None,
}
expect = {
'foo': {
'quiet': {
'tree': 'test',
'tree1': [
'test'
]
},
},
}
self.assertEqual(remove_empty_from_dict(value), expect)
def test_remove_nones_from_dict(self):
value = {
'foo': {
'quiet': {
'tree': 'test',
'tree1': [
None,
{},
[],
'test'
],
'tree2': {},
'tree3': []
},
},
'foo1': [],
'foo2': {},
'foo3': None,
}
expect = {
'foo': {
'quiet': {
'tree': 'test',
'tree1': [
{},
[],
'test'
],
'tree2': {},
'tree3': []
},
},
'foo1': [],
'foo2': {},
}
self.assertEqual(remove_nones_from_dict(value), expect)
def test_replace_resource_dict(self):
self.assertEqual(replace_resource_dict({'foo': 'quiet'}, 'foo'), 'quiet')
self.assertEqual(replace_resource_dict({}, 'foo'), {})
self.assertEqual(replace_resource_dict([[{'foo': 'quiet'}]], 'foo'),
[['quiet']])