1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/plugins/filter/lists.py

211 lines
5.1 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from ansible.errors import AnsibleFilterError
from ansible.module_utils.common.collections import is_sequence
def remove_duplicates(lst):
seen = set()
seen_add = seen.add
result = []
for item in lst:
try:
if item not in seen:
seen_add(item)
result.append(item)
except TypeError:
# This happens for unhashable values `item`. If this happens,
# convert `seen` to a list and continue.
seen = list(seen)
seen_add = seen.append
if item not in seen:
seen_add(item)
result.append(item)
return result
def flatten_list(lst):
result = []
for sublist in lst:
if not is_sequence(sublist):
msg = ("All arguments must be lists. %s is %s")
raise AnsibleFilterError(msg % (sublist, type(sublist)))
if len(sublist) > 0:
if all(is_sequence(sub) for sub in sublist):
for item in sublist:
result.append(item)
else:
result.append(sublist)
return result
def lists_union(*args, **kwargs):
lists = args
flatten = kwargs.pop('flatten', False)
if kwargs:
# Some unused kwargs remain
raise AnsibleFilterError(
"lists_union() got unexpected keywords arguments: {0}".format(
", ".join(kwargs.keys())
)
)
if flatten:
lists = flatten_list(args)
if not lists:
return []
if len(lists) == 1:
return lists[0]
a = lists[0]
for b in lists[1:]:
a = do_union(a, b)
return remove_duplicates(a)
def do_union(a, b):
return a + b
def lists_intersect(*args, **kwargs):
lists = args
flatten = kwargs.pop('flatten', False)
if kwargs:
# Some unused kwargs remain
raise AnsibleFilterError(
"lists_intersect() got unexpected keywords arguments: {0}".format(
", ".join(kwargs.keys())
)
)
if flatten:
lists = flatten_list(args)
if not lists:
return []
if len(lists) == 1:
return lists[0]
a = remove_duplicates(lists[0])
for b in lists[1:]:
a = do_intersect(a, b)
return a
def do_intersect(a, b):
isect = []
try:
other = set(b)
isect = [item for item in a if item in other]
except TypeError:
# This happens for unhashable values,
# use a list instead and redo.
other = list(b)
isect = [item for item in a if item in other]
return isect
def lists_difference(*args, **kwargs):
lists = args
flatten = kwargs.pop('flatten', False)
if kwargs:
# Some unused kwargs remain
raise AnsibleFilterError(
"lists_difference() got unexpected keywords arguments: {0}".format(
", ".join(kwargs.keys())
)
)
if flatten:
lists = flatten_list(args)
if not lists:
return []
if len(lists) == 1:
return lists[0]
a = remove_duplicates(lists[0])
for b in lists[1:]:
a = do_difference(a, b)
return a
def do_difference(a, b):
diff = []
try:
other = set(b)
diff = [item for item in a if item not in other]
except TypeError:
# This happens for unhashable values,
# use a list instead and redo.
other = list(b)
diff = [item for item in a if item not in other]
return diff
def lists_symmetric_difference(*args, **kwargs):
lists = args
flatten = kwargs.pop('flatten', False)
if kwargs:
# Some unused kwargs remain
raise AnsibleFilterError(
"lists_difference() got unexpected keywords arguments: {0}".format(
", ".join(kwargs.keys())
)
)
if flatten:
lists = flatten_list(args)
if not lists:
return []
if len(lists) == 1:
return lists[0]
a = lists[0]
for b in lists[1:]:
a = do_symmetric_difference(a, b)
return a
def do_symmetric_difference(a, b):
sym_diff = []
union = lists_union(a, b)
try:
isect = set(a) & set(b)
sym_diff = [item for item in union if item not in isect]
except TypeError:
# This happens for unhashable values,
# build the intersection of `a` and `b` backed
# by a list instead of a set and redo.
isect = lists_intersect(a, b)
sym_diff = [item for item in union if item not in isect]
return sym_diff
class FilterModule(object):
''' Ansible lists jinja2 filters '''
def filters(self):
return {
'lists_union': lists_union,
'lists_intersect': lists_intersect,
'lists_difference': lists_difference,
'lists_symmetric_difference': lists_symmetric_difference,
}