1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

adds new filter plugins for network use cases (#27695)

* adds new filter plugins for network use cases

* adds parse_cli filter
* adds parse_cli_textfsm filter
* adds Template class to network_common
* adds conditional function to network_common

* fix up PEP8 issues
This commit is contained in:
Peter Sprygada 2017-08-04 07:47:12 -04:00 committed by GitHub
parent 19b1361184
commit 7b604368d3
3 changed files with 282 additions and 2 deletions

View file

@ -25,11 +25,25 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# #
import re
import ast
import operator
from itertools import chain from itertools import chain
from ansible.module_utils.six import iteritems from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils.basic import AnsibleFallbackNotFound from ansible.module_utils.basic import AnsibleFallbackNotFound
from ansible.module_utils.six import iteritems
try:
from jinja2 import Environment
from jinja2.exceptions import UndefinedError
HAS_JINJA2 = True
except ImportError:
HAS_JINJA2 = False
OPERATORS = frozenset(['ge', 'gt', 'eq', 'neq', 'lt', 'le'])
ALIASES = frozenset([('min', 'ge'), ('max', 'le'), ('exactly', 'eq'), ('neq', 'ne')])
def to_list(val): def to_list(val):
@ -281,3 +295,75 @@ def dict_merge(base, other):
combined[key] = other.get(key) combined[key] = other.get(key)
return combined return combined
def conditional(expr, val, cast=None):
match = re.match('^(.+)\((.+)\)$', str(expr), re.I)
if match:
op, arg = match.groups()
else:
op = 'eq'
assert (' ' not in str(expr)), 'invalid expression: cannot contain spaces'
arg = expr
if cast is None and val is not None:
arg = type(val)(arg)
elif callable(cast):
arg = cast(arg)
val = cast(val)
op = next((oper for alias, oper in ALIASES if op == alias), op)
if not hasattr(operator, op) and op not in OPERATORS:
raise ValueError('unknown operator: %s' % op)
func = getattr(operator, op)
return func(val, arg)
def ternary(value, true_val, false_val):
''' value ? true_val : false_val '''
if value:
return true_val
else:
return false_val
class Template:
def __init__(self):
if not HAS_JINJA2:
raise ImportError("jinja2 is required but does not appear to be installed. "
"It can be installed using `pip install jinja2`")
self.env = Environment()
self.env.filters.update({'ternary': ternary})
def __call__(self, value, variables=None):
variables = variables or {}
if not self.contains_vars(value):
return value
value = self.env.from_string(value).render(variables)
if value:
try:
return ast.literal_eval(value)
except ValueError:
return str(value)
else:
return None
def can_template(self, tmpl):
try:
self(tmpl)
return True
except:
return False
def contains_vars(self, data):
if isinstance(data, string_types):
for marker in (self.env.block_start_string, self.env.variable_start_string, self.env.comment_start_string):
if marker in data:
return True
return False

View file

@ -0,0 +1,172 @@
#
# {c) 2017 Red Hat, Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import re
import os
import json
from collections import Mapping
from ansible.module_utils.network_common import Template
from ansible.module_utils.six import iteritems
from ansible.errors import AnsibleError
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
try:
import textfsm
HAS_TEXTFSM = True
except ImportError:
HAS_TEXTFSM = False
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display
display = Display()
def re_matchall(regex, value):
objects = list()
for match in re.findall(regex.pattern, value, re.M):
obj = {}
if regex.groupindex:
for name, index in iteritems(regex.groupindex):
obj[name] = match[index - 1]
objects.append(obj)
return objects
def re_search(regex, value):
obj = {}
match = regex.search(value, re.M)
if match:
items = list(match.groups())
if regex.groupindex:
for name, index in iteritems(regex.groupindex):
obj[name] = items[index - 1]
return obj
def parse_cli(output, tmpl):
try:
template = Template()
except ImportError as exc:
raise AnsibleError(str(exc))
spec = yaml.load(open(tmpl).read())
obj = {}
for name, attrs in iteritems(spec['attributes']):
value = attrs['value']
if template.can_template(value):
value = template(value, spec)
if 'items' in attrs:
regexp = re.compile(attrs['items'])
when = attrs.get('when')
conditional = "{%% if %s %%}True{%% else %%}False{%% endif %%}" % when
if isinstance(value, Mapping) and 'key' not in value:
values = list()
for item in re_matchall(regexp, output):
entry = {}
for item_key, item_value in iteritems(value):
entry[item_key] = template(item_value, {'item': item})
if when:
if template(conditional, {'item': entry}):
values.append(entry)
else:
values.append(entry)
obj[name] = values
elif isinstance(value, Mapping):
values = dict()
for item in re_matchall(regexp, output):
entry = {}
for item_key, item_value in iteritems(value['values']):
entry[item_key] = template(item_value, {'item': item})
key = template(value['key'], {'item': item})
if when:
if template(conditional, {'item': {'key': key, 'value': entry}}):
values[key] = entry
else:
values[key] = entry
obj[name] = values
else:
item = re_search(regexp, output)
obj[name] = template(value, {'item': item})
else:
obj[name] = value
return obj
def parse_cli_textfsm(value, template):
if not HAS_TEXTFSM:
raise AnsibleError('parse_cli_textfsm filter requires TextFSM library to be installed')
if not os.path.exists(template):
raise AnsibleError('unable to locate parse_cli template: %s' % template)
try:
template = open(template)
except IOError as exc:
raise AnsibleError(str(exc))
re_table = textfsm.TextFSM(template)
fsm_results = re_table.ParseText(value)
results = list()
for item in fsm_results:
results.append(dict(zip(re_table.header, item)))
return results
class FilterModule(object):
"""Filters for working with output from network devices"""
filter_map = {
'parse_cli': parse_cli,
'parse_cli_textfsm': parse_cli_textfsm
}
def filters(self):
return self.filter_map

View file

@ -25,6 +25,7 @@ from ansible.compat.tests import unittest
from ansible.module_utils.network_common import to_list, sort_list from ansible.module_utils.network_common import to_list, sort_list
from ansible.module_utils.network_common import dict_diff, dict_merge from ansible.module_utils.network_common import dict_diff, dict_merge
from ansible.module_utils.network_common import conditional, Template
class TestModuleUtilsNetworkCommon(unittest.TestCase): class TestModuleUtilsNetworkCommon(unittest.TestCase):
@ -127,3 +128,24 @@ class TestModuleUtilsNetworkCommon(unittest.TestCase):
self.assertIn('b2', result) self.assertIn('b2', result)
self.assertTrue(result['b3']) self.assertTrue(result['b3'])
self.assertTrue(result['b4']) self.assertTrue(result['b4'])
def test_conditional(self):
self.assertTrue(conditional(10, 10))
self.assertTrue(conditional('10', '10'))
self.assertTrue(conditional('foo', 'foo'))
self.assertTrue(conditional(True, True))
self.assertTrue(conditional(False, False))
self.assertTrue(conditional(None, None))
self.assertTrue(conditional("ge(1)", 1))
self.assertTrue(conditional("gt(1)", 2))
self.assertTrue(conditional("le(2)", 2))
self.assertTrue(conditional("lt(3)", 2))
self.assertTrue(conditional("eq(1)", 1))
self.assertTrue(conditional("neq(0)", 1))
self.assertTrue(conditional("min(1)", 1))
self.assertTrue(conditional("max(1)", 1))
self.assertTrue(conditional("exactly(1)", 1))
def test_template(self):
tmpl = Template()
self.assertEqual('foo', tmpl('{{ test }}', {'test': 'foo'}))