1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

feat(lookup/bitwarden): add support for fetching all items from a collection (#8013)

Signed-off-by: Emilien Escalle <emilien.escalle@escemi.com>
This commit is contained in:
Emilien Escalle 2024-03-24 18:04:36 +01:00 committed by GitHub
parent 9f5193e40b
commit f5fa16c881
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 51 additions and 14 deletions

View file

@ -0,0 +1,2 @@
minor_changes:
- "bitwarden lookup plugin - allows to fetch all records of a given collection ID, by allowing to pass an empty value for ``search_value`` when ``collection_id`` is provided (https://github.com/ansible-collections/community.general/pull/8013)."

View file

@ -29,6 +29,7 @@ DOCUMENTATION = """
- Field to retrieve, for example V(name) or V(id).
- If set to V(id), only zero or one element can be returned.
Use the Jinja C(first) filter to get the only list element.
- When O(collection_id) is set, this field can be undefined to retrieve the whole collection records.
type: str
default: name
version_added: 5.7.0
@ -75,6 +76,11 @@ EXAMPLES = """
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', field='password', bw_session='bXZ9B5TXi6...') }}
- name: "Get all Bitwarden records from collection"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', None, collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }}
"""
RETURN = """
@ -136,32 +142,39 @@ class Bitwarden(object):
raise BitwardenException(err)
return to_text(out, errors='surrogate_or_strict'), to_text(err, errors='surrogate_or_strict')
def _get_matches(self, search_value, search_field, collection_id):
def _get_matches(self, search_value, search_field, collection_id=None):
"""Return matching records whose search_field is equal to key.
"""
# Prepare set of params for Bitwarden CLI
if search_field == 'id':
params = ['get', 'item', search_value]
if search_value:
if search_field == 'id':
params = ['get', 'item', search_value]
else:
params = ['list', 'items', '--search', search_value]
if collection_id:
params.extend(['--collectionid', collection_id])
else:
params = ['list', 'items', '--search', search_value]
if not collection_id:
raise AnsibleError("search_value is required if collection_id is not set.")
if collection_id:
params.extend(['--collectionid', collection_id])
params = ['list', 'items', '--collectionid', collection_id]
out, err = self._run(params)
# This includes things that matched in different fields.
initial_matches = AnsibleJSONDecoder().raw_decode(out)[0]
if search_field == 'id':
if search_field == 'id' or not search_value:
if initial_matches is None:
initial_matches = []
else:
initial_matches = [initial_matches]
# Filter to only include results from the right field.
return [item for item in initial_matches if item[search_field] == search_value]
def get_field(self, field, search_value, search_field="name", collection_id=None):
def get_field(self, field, search_value=None, search_field="name", collection_id=None):
"""Return a list of the specified field for records whose search_field match search_value
and filtered by collection if collection has been provided.
@ -188,14 +201,16 @@ class Bitwarden(object):
if field in match:
field_matches.append(match[field])
continue
if matches and not field_matches:
raise AnsibleError("field {field} does not exist in {search_value}".format(field=field, search_value=search_value))
return field_matches
class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs):
def run(self, terms=None, variables=None, **kwargs):
self.set_options(var_options=variables, direct=kwargs)
field = self.get_option('field')
search_field = self.get_option('search')
@ -205,6 +220,9 @@ class LookupModule(LookupBase):
if not _bitwarden.unlocked:
raise AnsibleError("Bitwarden Vault locked. Run 'bw unlock'.")
if not terms:
return [_bitwarden.get_field(field, None, search_field, collection_id)]
return [_bitwarden.get_field(field, term, search_field, collection_id) for term in terms]

View file

@ -14,10 +14,13 @@ from ansible.module_utils import six
from ansible.plugins.loader import lookup_loader
from ansible_collections.community.general.plugins.lookup.bitwarden import Bitwarden
MOCK_COLLECTION_ID = "3b12a9da-7c49-40b8-ad33-aede017a7ead"
MOCK_RECORDS = [
{
"collectionIds": [],
"collectionIds": [
MOCK_COLLECTION_ID
],
"deletedDate": None,
"favorite": False,
"fields": [
@ -65,7 +68,9 @@ MOCK_RECORDS = [
"type": 1
},
{
"collectionIds": [],
"collectionIds": [
MOCK_COLLECTION_ID
],
"deletedDate": None,
"favorite": False,
"folderId": None,
@ -85,7 +90,9 @@ MOCK_RECORDS = [
"type": 1
},
{
"collectionIds": [],
"collectionIds": [
MOCK_COLLECTION_ID
],
"deletedDate": None,
"favorite": False,
"folderId": None,
@ -111,7 +118,10 @@ class MockBitwarden(Bitwarden):
unlocked = True
def _get_matches(self, search_value, search_field="name", collection_id=None):
def _get_matches(self, search_value=None, search_field="name", collection_id=None):
if not search_value and collection_id:
return list(filter(lambda record: collection_id in record['collectionIds'], MOCK_RECORDS))
return list(filter(lambda record: record[search_field] == search_value, MOCK_RECORDS))
@ -156,9 +166,11 @@ class TestLookupModule(unittest.TestCase):
def test_bitwarden_plugin_unlocked(self):
record = MOCK_RECORDS[0]
record_name = record['name']
with self.assertRaises(AnsibleError):
with self.assertRaises(AnsibleError) as raised_error:
self.lookup.run([record_name], field='password')
self.assertEqual("Bitwarden Vault locked. Run 'bw unlock'.", str(raised_error.exception))
def test_bitwarden_plugin_without_session_option(self):
mock_bitwarden = MockBitwarden()
with patch("ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden", mock_bitwarden):
@ -178,3 +190,8 @@ class TestLookupModule(unittest.TestCase):
self.lookup.run([record_name], field=None, bw_session=session)
self.assertEqual(mock_bitwarden.session, session)
@patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden())
def test_bitwarden_plugin_full_collection(self):
# Try to retrieve the full records of the given collection.
self.assertEqual(MOCK_RECORDS, self.lookup.run(None, collection_id=MOCK_COLLECTION_ID)[0])