mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
102a0857db
New filters lists_union, lists_intersect, lists_difference and lists_symmetric_difference added. Signed-off-by: Christoph Fiehe <c.fiehe@eurodata.de> Co-authored-by: Christoph Fiehe <c.fiehe@eurodata.de>
210 lines
5.1 KiB
Python
210 lines
5.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright (c) Ansible Project
|
|
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
__metaclass__ = type
|
|
|
|
from ansible.errors import AnsibleFilterError
|
|
from ansible.module_utils.common.collections import is_sequence
|
|
|
|
|
|
def remove_duplicates(lst):
|
|
seen = set()
|
|
seen_add = seen.add
|
|
result = []
|
|
for item in lst:
|
|
try:
|
|
if item not in seen:
|
|
seen_add(item)
|
|
result.append(item)
|
|
except TypeError:
|
|
# This happens for unhashable values `item`. If this happens,
|
|
# convert `seen` to a list and continue.
|
|
seen = list(seen)
|
|
seen_add = seen.append
|
|
if item not in seen:
|
|
seen_add(item)
|
|
result.append(item)
|
|
return result
|
|
|
|
|
|
def flatten_list(lst):
|
|
result = []
|
|
for sublist in lst:
|
|
if not is_sequence(sublist):
|
|
msg = ("All arguments must be lists. %s is %s")
|
|
raise AnsibleFilterError(msg % (sublist, type(sublist)))
|
|
if len(sublist) > 0:
|
|
if all(is_sequence(sub) for sub in sublist):
|
|
for item in sublist:
|
|
result.append(item)
|
|
else:
|
|
result.append(sublist)
|
|
return result
|
|
|
|
|
|
def lists_union(*args, **kwargs):
|
|
lists = args
|
|
flatten = kwargs.pop('flatten', False)
|
|
|
|
if kwargs:
|
|
# Some unused kwargs remain
|
|
raise AnsibleFilterError(
|
|
"lists_union() got unexpected keywords arguments: {0}".format(
|
|
", ".join(kwargs.keys())
|
|
)
|
|
)
|
|
|
|
if flatten:
|
|
lists = flatten_list(args)
|
|
|
|
if not lists:
|
|
return []
|
|
|
|
if len(lists) == 1:
|
|
return lists[0]
|
|
|
|
a = lists[0]
|
|
for b in lists[1:]:
|
|
a = do_union(a, b)
|
|
return remove_duplicates(a)
|
|
|
|
|
|
def do_union(a, b):
|
|
return a + b
|
|
|
|
|
|
def lists_intersect(*args, **kwargs):
|
|
lists = args
|
|
flatten = kwargs.pop('flatten', False)
|
|
|
|
if kwargs:
|
|
# Some unused kwargs remain
|
|
raise AnsibleFilterError(
|
|
"lists_intersect() got unexpected keywords arguments: {0}".format(
|
|
", ".join(kwargs.keys())
|
|
)
|
|
)
|
|
|
|
if flatten:
|
|
lists = flatten_list(args)
|
|
|
|
if not lists:
|
|
return []
|
|
|
|
if len(lists) == 1:
|
|
return lists[0]
|
|
|
|
a = remove_duplicates(lists[0])
|
|
for b in lists[1:]:
|
|
a = do_intersect(a, b)
|
|
return a
|
|
|
|
|
|
def do_intersect(a, b):
|
|
isect = []
|
|
try:
|
|
other = set(b)
|
|
isect = [item for item in a if item in other]
|
|
except TypeError:
|
|
# This happens for unhashable values,
|
|
# use a list instead and redo.
|
|
other = list(b)
|
|
isect = [item for item in a if item in other]
|
|
return isect
|
|
|
|
|
|
def lists_difference(*args, **kwargs):
|
|
lists = args
|
|
flatten = kwargs.pop('flatten', False)
|
|
|
|
if kwargs:
|
|
# Some unused kwargs remain
|
|
raise AnsibleFilterError(
|
|
"lists_difference() got unexpected keywords arguments: {0}".format(
|
|
", ".join(kwargs.keys())
|
|
)
|
|
)
|
|
|
|
if flatten:
|
|
lists = flatten_list(args)
|
|
|
|
if not lists:
|
|
return []
|
|
|
|
if len(lists) == 1:
|
|
return lists[0]
|
|
|
|
a = remove_duplicates(lists[0])
|
|
for b in lists[1:]:
|
|
a = do_difference(a, b)
|
|
return a
|
|
|
|
|
|
def do_difference(a, b):
|
|
diff = []
|
|
try:
|
|
other = set(b)
|
|
diff = [item for item in a if item not in other]
|
|
except TypeError:
|
|
# This happens for unhashable values,
|
|
# use a list instead and redo.
|
|
other = list(b)
|
|
diff = [item for item in a if item not in other]
|
|
return diff
|
|
|
|
|
|
def lists_symmetric_difference(*args, **kwargs):
|
|
lists = args
|
|
flatten = kwargs.pop('flatten', False)
|
|
|
|
if kwargs:
|
|
# Some unused kwargs remain
|
|
raise AnsibleFilterError(
|
|
"lists_difference() got unexpected keywords arguments: {0}".format(
|
|
", ".join(kwargs.keys())
|
|
)
|
|
)
|
|
|
|
if flatten:
|
|
lists = flatten_list(args)
|
|
|
|
if not lists:
|
|
return []
|
|
|
|
if len(lists) == 1:
|
|
return lists[0]
|
|
|
|
a = lists[0]
|
|
for b in lists[1:]:
|
|
a = do_symmetric_difference(a, b)
|
|
return a
|
|
|
|
|
|
def do_symmetric_difference(a, b):
|
|
sym_diff = []
|
|
union = lists_union(a, b)
|
|
try:
|
|
isect = set(a) & set(b)
|
|
sym_diff = [item for item in union if item not in isect]
|
|
except TypeError:
|
|
# This happens for unhashable values,
|
|
# build the intersection of `a` and `b` backed
|
|
# by a list instead of a set and redo.
|
|
isect = lists_intersect(a, b)
|
|
sym_diff = [item for item in union if item not in isect]
|
|
return sym_diff
|
|
|
|
|
|
class FilterModule(object):
|
|
''' Ansible lists jinja2 filters '''
|
|
|
|
def filters(self):
|
|
return {
|
|
'lists_union': lists_union,
|
|
'lists_intersect': lists_intersect,
|
|
'lists_difference': lists_difference,
|
|
'lists_symmetric_difference': lists_symmetric_difference,
|
|
}
|