1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

bitwarden - add support for filtering by organization_id (#8188)

* bitwarden - add support for filtering by organization_id

* Update changelogs/fragments/8188-bitwarden-add-organization_id.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

* implement PR discussion result on wording

* rewrite search_field filtering

To correctly handle organization_id and collection_id by passing both to bw.
Tests needed to be extended to filter organizations / collections and
the testdata needed changes to reflect that a collection always belongs to a
single organizaion

---------

Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
Kai 2024-04-20 12:12:45 +02:00 committed by GitHub
parent 7fd37ea247
commit 865de5baa0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 100 additions and 28 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- bitwarden lookup plugin - add support to filter by organization ID (https://github.com/ansible-collections/community.general/pull/8188).

View file

@ -29,7 +29,7 @@ DOCUMENTATION = """
- Field to retrieve, for example V(name) or V(id). - Field to retrieve, for example V(name) or V(id).
- If set to V(id), only zero or one element can be returned. - If set to V(id), only zero or one element can be returned.
Use the Jinja C(first) filter to get the only list element. Use the Jinja C(first) filter to get the only list element.
- When O(collection_id) is set, this field can be undefined to retrieve the whole collection records. - If set to V(None) or V(''), or if O(_terms) is empty, records are not filtered by fields.
type: str type: str
default: name default: name
version_added: 5.7.0 version_added: 5.7.0
@ -40,6 +40,10 @@ DOCUMENTATION = """
description: Collection ID to filter results by collection. Leave unset to skip filtering. description: Collection ID to filter results by collection. Leave unset to skip filtering.
type: str type: str
version_added: 6.3.0 version_added: 6.3.0
organization_id:
description: Organization ID to filter results by organization. Leave unset to skip filtering.
type: str
version_added: 8.5.0
bw_session: bw_session:
description: Pass session key instead of reading from env. description: Pass session key instead of reading from env.
type: str type: str
@ -142,45 +146,44 @@ class Bitwarden(object):
raise BitwardenException(err) raise BitwardenException(err)
return to_text(out, errors='surrogate_or_strict'), to_text(err, errors='surrogate_or_strict') return to_text(out, errors='surrogate_or_strict'), to_text(err, errors='surrogate_or_strict')
def _get_matches(self, search_value, search_field, collection_id=None): def _get_matches(self, search_value, search_field, collection_id=None, organization_id=None):
"""Return matching records whose search_field is equal to key. """Return matching records whose search_field is equal to key.
""" """
# Prepare set of params for Bitwarden CLI # Prepare set of params for Bitwarden CLI
if search_value:
if search_field == 'id': if search_field == 'id':
params = ['get', 'item', search_value] params = ['get', 'item', search_value]
else: else:
params = ['list', 'items', '--search', search_value] params = ['list', 'items']
if search_value:
params.extend(['--search', search_value])
if collection_id: if collection_id:
params.extend(['--collectionid', collection_id]) params.extend(['--collectionid', collection_id])
else: if organization_id:
if not collection_id: params.extend(['--organizationid', organization_id])
raise AnsibleError("search_value is required if collection_id is not set.")
params = ['list', 'items', '--collectionid', collection_id]
out, err = self._run(params) out, err = self._run(params)
# This includes things that matched in different fields. # This includes things that matched in different fields.
initial_matches = AnsibleJSONDecoder().raw_decode(out)[0] initial_matches = AnsibleJSONDecoder().raw_decode(out)[0]
if search_field == 'id' or not search_value: if search_field == 'id':
if initial_matches is None: if initial_matches is None:
initial_matches = [] initial_matches = []
else: else:
initial_matches = [initial_matches] initial_matches = [initial_matches]
# Filter to only include results from the right field. # Filter to only include results from the right field.
return [item for item in initial_matches if item[search_field] == search_value] return [item for item in initial_matches if not search_value or item[search_field] == search_value]
def get_field(self, field, search_value=None, search_field="name", collection_id=None): def get_field(self, field, search_value, search_field="name", collection_id=None, organization_id=None):
"""Return a list of the specified field for records whose search_field match search_value """Return a list of the specified field for records whose search_field match search_value
and filtered by collection if collection has been provided. and filtered by collection if collection has been provided.
If field is None, return the whole record for each match. If field is None, return the whole record for each match.
""" """
matches = self._get_matches(search_value, search_field, collection_id) matches = self._get_matches(search_value, search_field, collection_id, organization_id)
if not field: if not field:
return matches return matches
field_matches = [] field_matches = []
@ -215,15 +218,16 @@ class LookupModule(LookupBase):
field = self.get_option('field') field = self.get_option('field')
search_field = self.get_option('search') search_field = self.get_option('search')
collection_id = self.get_option('collection_id') collection_id = self.get_option('collection_id')
organization_id = self.get_option('organization_id')
_bitwarden.session = self.get_option('bw_session') _bitwarden.session = self.get_option('bw_session')
if not _bitwarden.unlocked: if not _bitwarden.unlocked:
raise AnsibleError("Bitwarden Vault locked. Run 'bw unlock'.") raise AnsibleError("Bitwarden Vault locked. Run 'bw unlock'.")
if not terms: if not terms:
return [_bitwarden.get_field(field, None, search_field, collection_id)] terms = [None]
return [_bitwarden.get_field(field, term, search_field, collection_id) for term in terms] return [_bitwarden.get_field(field, term, search_field, collection_id, organization_id) for term in terms]
_bitwarden = Bitwarden() _bitwarden = Bitwarden()

View file

@ -6,6 +6,7 @@
from __future__ import (absolute_import, division, print_function) from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
import re
from ansible_collections.community.general.tests.unit.compat import unittest from ansible_collections.community.general.tests.unit.compat import unittest
from ansible_collections.community.general.tests.unit.compat.mock import patch from ansible_collections.community.general.tests.unit.compat.mock import patch
@ -13,8 +14,10 @@ from ansible.errors import AnsibleError
from ansible.module_utils import six from ansible.module_utils import six
from ansible.plugins.loader import lookup_loader from ansible.plugins.loader import lookup_loader
from ansible_collections.community.general.plugins.lookup.bitwarden import Bitwarden from ansible_collections.community.general.plugins.lookup.bitwarden import Bitwarden
from ansible.parsing.ajson import AnsibleJSONEncoder
MOCK_COLLECTION_ID = "3b12a9da-7c49-40b8-ad33-aede017a7ead" MOCK_COLLECTION_ID = "3b12a9da-7c49-40b8-ad33-aede017a7ead"
MOCK_ORGANIZATION_ID = "292ba0c6-f289-11ee-9301-ef7b639ccd2a"
MOCK_RECORDS = [ MOCK_RECORDS = [
{ {
@ -48,7 +51,7 @@ MOCK_RECORDS = [
"name": "a_test", "name": "a_test",
"notes": None, "notes": None,
"object": "item", "object": "item",
"organizationId": None, "organizationId": MOCK_ORGANIZATION_ID,
"passwordHistory": [ "passwordHistory": [
{ {
"lastUsedDate": "2022-07-26T23:03:23.405Z", "lastUsedDate": "2022-07-26T23:03:23.405Z",
@ -68,9 +71,7 @@ MOCK_RECORDS = [
"type": 1 "type": 1
}, },
{ {
"collectionIds": [ "collectionIds": [],
MOCK_COLLECTION_ID
],
"deletedDate": None, "deletedDate": None,
"favorite": False, "favorite": False,
"folderId": None, "folderId": None,
@ -106,10 +107,30 @@ MOCK_RECORDS = [
"name": "dupe_name", "name": "dupe_name",
"notes": None, "notes": None,
"object": "item", "object": "item",
"organizationId": None, "organizationId": MOCK_ORGANIZATION_ID,
"reprompt": 0, "reprompt": 0,
"revisionDate": "2022-07-27T03:42:46.673Z", "revisionDate": "2022-07-27T03:42:46.673Z",
"type": 1 "type": 1
},
{
"collectionIds": [],
"deletedDate": None,
"favorite": False,
"folderId": None,
"id": "2bf517be-fb13-11ee-be89-a345aa369a94",
"login": {
"password": "e",
"passwordRevisionDate": None,
"totp": None,
"username": "f"
},
"name": "non_collection_org_record",
"notes": None,
"object": "item",
"organizationId": MOCK_ORGANIZATION_ID,
"reprompt": 0,
"revisionDate": "2024-14-15T11:30:00.000Z",
"type": 1
} }
] ]
@ -118,11 +139,41 @@ class MockBitwarden(Bitwarden):
unlocked = True unlocked = True
def _get_matches(self, search_value=None, search_field="name", collection_id=None): def _run(self, args, stdin=None, expected_rc=0):
if not search_value and collection_id: if args[0] == 'get':
return list(filter(lambda record: collection_id in record['collectionIds'], MOCK_RECORDS)) if args[1] == 'item':
for item in MOCK_RECORDS:
if item.get('id') == args[2]:
return AnsibleJSONEncoder().encode(item), ''
if args[0] == 'list':
if args[1] == 'items':
try:
search_value = args[args.index('--search') + 1]
except ValueError:
search_value = None
return list(filter(lambda record: record[search_field] == search_value, MOCK_RECORDS)) try:
collection_to_filter = args[args.index('--collectionid') + 1]
except ValueError:
collection_to_filter = None
try:
organization_to_filter = args[args.index('--organizationid') + 1]
except ValueError:
organization_to_filter = None
items = []
for item in MOCK_RECORDS:
if search_value and not re.search(search_value, item.get('name')):
continue
if collection_to_filter and collection_to_filter not in item.get('collectionIds', []):
continue
if organization_to_filter and item.get('organizationId') != organization_to_filter:
continue
items.append(item)
return AnsibleJSONEncoder().encode(items), ''
return '[]', ''
class LoggedOutMockBitwarden(MockBitwarden): class LoggedOutMockBitwarden(MockBitwarden):
@ -194,4 +245,19 @@ class TestLookupModule(unittest.TestCase):
@patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden()) @patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden())
def test_bitwarden_plugin_full_collection(self): def test_bitwarden_plugin_full_collection(self):
# Try to retrieve the full records of the given collection. # Try to retrieve the full records of the given collection.
self.assertEqual(MOCK_RECORDS, self.lookup.run(None, collection_id=MOCK_COLLECTION_ID)[0]) self.assertEqual([MOCK_RECORDS[0], MOCK_RECORDS[2]], self.lookup.run(None, collection_id=MOCK_COLLECTION_ID)[0])
@patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden())
def test_bitwarden_plugin_full_organization(self):
self.assertEqual([MOCK_RECORDS[0], MOCK_RECORDS[2], MOCK_RECORDS[3]],
self.lookup.run(None, organization_id=MOCK_ORGANIZATION_ID)[0])
@patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden())
def test_bitwarden_plugin_filter_organization(self):
self.assertEqual([MOCK_RECORDS[2]],
self.lookup.run(['dupe_name'], organization_id=MOCK_ORGANIZATION_ID)[0])
@patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden())
def test_bitwarden_plugin_full_collection_organization(self):
self.assertEqual([MOCK_RECORDS[0], MOCK_RECORDS[2]], self.lookup.run(None,
collection_id=MOCK_COLLECTION_ID, organization_id=MOCK_ORGANIZATION_ID)[0])