diff --git a/lib/ansible/plugins/filter/cast_type.py b/lib/ansible/plugins/filter/cast_type.py new file mode 100644 index 0000000000..d9a6d2cfd2 --- /dev/null +++ b/lib/ansible/plugins/filter/cast_type.py @@ -0,0 +1,63 @@ +# Author Ken Celenza +# Author Jason Edelman + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.errors import AnsibleError, AnsibleFilterError + + +def cast_list_to_dict(data, key): + new_obj = {} + + if not isinstance(data, list): + raise AnsibleFilterError("Type is not a valid list") + for item in data: + if not isinstance(item, dict): + raise AnsibleFilterError("List item is not a valid dict") + try: + key_elem = item.get(key) + except Exception as e: + raise AnsibleFilterError(str(e)) + if new_obj.get(key_elem): + raise AnsibleFilterError("Key {0} is not unique, cannot correctly turn into dict".format(key_elem)) + elif not key_elem: + raise AnsibleFilterError("Key {0} was not found".format(key)) + else: + new_obj[key_elem] = item + return new_obj + + +def cast_dict_to_list(data, key_name): + new_obj = [] + + if not isinstance(data, dict): + raise AnsibleFilterError("Type is not a valid dict") + for key, value in data.items(): + if not isinstance(value, dict): + raise AnsibleFilterError("Type of key {0} value {1} is not a valid dict".format(key, value)) + if value.get(key_name): + raise AnsibleFilterError("Key name {0} is already in use, cannot correctly turn into dict".format(key_name)) + value[key_name] = key + new_obj.append(value) + return new_obj + + +class FilterModule(object): + '''Convert a list to a dictionary provided a key that exists in all dicts. + If it does not, that dict is omitted + ''' + def filters(self): + return { + 'cast_list_to_dict': cast_list_to_dict, + 'cast_dict_to_list': cast_dict_to_list, + } + + +if __name__ == "__main__": + list_data = [{"proto": "eigrp", "state": "enabled"}, {"proto": "ospf", "state": "enabled"}] + print(cast_list_to_dict(list_data, 'proto')) + + dict_data = {'eigrp': {'state': 'enabled', 'as': '1'}, 'ospf': {'state': 'enabled', 'as': '2'}} + print(cast_dict_to_list(dict_data, 'proto')) diff --git a/test/units/plugins/filter/test_cast_types.py b/test/units/plugins/filter/test_cast_types.py new file mode 100644 index 0000000000..2edaa40fe2 --- /dev/null +++ b/test/units/plugins/filter/test_cast_types.py @@ -0,0 +1,63 @@ +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.compat.tests import unittest +from ansible.plugins.filter.cast_type import (cast_list_to_dict, cast_dict_to_list) +from ansible.errors import AnsibleError, AnsibleFilterError + + +class TestTypeFilter(unittest.TestCase): + def test_cast_list_to_dict(self): + # Good test + list_original = [{"proto": "eigrp", "state": "enabled"}, {"proto": "ospf", "state": "enabled"}] + key = 'proto' + dict_return = {'eigrp': {'state': 'enabled', 'proto': 'eigrp'}, 'ospf': {'state': 'enabled', 'proto': 'ospf'}} + self.assertEqual(cast_list_to_dict(list_original, key), dict_return) + + # Fail when key is not found + key = 'key_not_to_be_found' + self.assertRaisesRegexp(AnsibleFilterError, 'was not found', cast_list_to_dict, list_original, key) + + # Fail when key is duplicated + list_original = [{"proto": "eigrp", "state": "enabled"}, {"proto": "ospf", "state": "enabled"}, {"proto": "ospf", "state": "enabled"}] + key = 'proto' + self.assertRaisesRegexp(AnsibleFilterError, 'is not unique', cast_list_to_dict, list_original, key) + + # Fail when list item is not a dict + list_original = [{"proto": "eigrp", "state": "enabled"}, "ospf"] + key = 'proto' + self.assertRaisesRegexp(AnsibleFilterError, 'List item is not a valid dict', cast_list_to_dict, list_original, key) + + # Fail when a non list is sent + list_original = {"proto": "eigrp", "state": "enabled"} + key = 'proto' + self.assertRaisesRegexp(AnsibleFilterError, 'not a valid list', cast_list_to_dict, list_original, key) + + def test_cast_dict_to_list(self): + # Good test + dict_original = {'eigrp': {'state': 'enabled', 'as': '1'}, 'ospf': {'state': 'enabled', 'as': '2'}} + key_name = 'proto' + list_return = [{'state': 'enabled', 'proto': 'ospf', 'as': '2'}, {'state': 'enabled', 'proto': 'eigrp', 'as': '1'}] + actual_return = cast_dict_to_list(dict_original, key_name) + + try: + _assertItemsEqual = self.assertCountEqual + _assertItemsEqual(actual_return, list_return) + except AttributeError: + self.assertEqual(sorted(actual_return), sorted(list_return)) + + # Fail when dict key is already used + dict_original = {'eigrp': {'state': 'enabled', 'as': '1', 'proto': 'bgp'}, 'ospf': {'state': 'enabled', 'as': '2'}} + key_name = 'proto' + self.assertRaisesRegexp(AnsibleFilterError, ' already in use, cannot correctly turn into dict', cast_dict_to_list, dict_original, key_name) + + # Fail when sending a non-dict + dict_original = [{'eigrp': {'state': 'enabled', 'as': '1'}, 'ospf': {'state': 'enabled', 'as': '2'}}] + key_name = 'proto' + self.assertRaisesRegexp(AnsibleFilterError, 'Type is not a valid dict', cast_dict_to_list, dict_original, key_name) + + # Fail when dict value is not a dict + dict_original = {'eigrp': [{'state': 'enabled', 'as': '1'}], 'ospf': {'state': 'enabled', 'as': '2'}} + key_name = 'proto' + self.assertRaisesRegexp(AnsibleFilterError, 'Type of key', cast_dict_to_list, dict_original, key_name)