diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index 773e334cc7..1061fdae80 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -197,6 +197,8 @@ files: $inventories/virtualbox.py: {} $lookups/: labels: lookups + $lookups/bitwarden.py: + maintainers: lungj $lookups/cartesian.py: {} $lookups/chef_databag.py: {} $lookups/collection_version.py: diff --git a/plugins/lookup/bitwarden.py b/plugins/lookup/bitwarden.py new file mode 100644 index 0000000000..29a5330243 --- /dev/null +++ b/plugins/lookup/bitwarden.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +# (c) 2022, Jonathan Lung +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = """ + name: bitwarden + author: + - Jonathan Lung (@lungj) + requirements: + - bw (command line utility) + - be logged into bitwarden + short_description: Retrieve secrets from Bitwarden + version_added: 5.4.0 + description: + - Retrieve secrets from Bitwarden. + options: + _terms: + description: Key(s) to fetch values for from login info. + required: true + type: list + elements: str + field: + description: Field to fetch; leave unset to fetch whole response. + type: str +""" + +EXAMPLES = """ +- name: "Get 'password' from Bitwarden record named 'a_test'" + ansible.builtin.debug: + msg: >- + {{ lookup('community.general.bitwarden', 'a_test', field='password') }} + +- name: "Get full Bitwarden record named 'a_test'" + ansible.builtin.debug: + msg: >- + {{ lookup('community.general.bitwarden', 'a_test') }} +""" + +RETURN = """ + _raw: + description: List of requested field or JSON object of list of matches. + type: list + elements: raw +""" + +from subprocess import Popen, PIPE + +from ansible.errors import AnsibleError +from ansible.module_utils.common.text.converters import to_bytes, to_text +from ansible.parsing.ajson import AnsibleJSONDecoder +from ansible.plugins.lookup import LookupBase + + +class BitwardenException(AnsibleError): + pass + + +class Bitwarden(object): + + def __init__(self, path='bw'): + self._cli_path = path + + @property + def cli_path(self): + return self._cli_path + + @property + def logged_in(self): + out, err = self._run(['status'], stdin="") + decoded = AnsibleJSONDecoder().raw_decode(out)[0] + return decoded['status'] == 'unlocked' + + def _run(self, args, stdin=None, expected_rc=0): + p = Popen([self.cli_path] + args, stdout=PIPE, stderr=PIPE, stdin=PIPE) + out, err = p.communicate(to_bytes(stdin)) + rc = p.wait() + if rc != expected_rc: + raise BitwardenException(err) + return to_text(out, errors='surrogate_or_strict'), to_text(err, errors='surrogate_or_strict') + + def _get_matches(self, search_value, search_field="name"): + """Return matching records whose search_field is equal to key. + """ + out, err = self._run(['list', 'items', '--search', search_value]) + + # This includes things that matched in different fields. + initial_matches = AnsibleJSONDecoder().raw_decode(out)[0] + + # Filter to only include results from the right field. + return [item for item in initial_matches if item[search_field] == search_value] + + def get_field(self, field, search_value, search_field="name"): + """Return a list of the specified field for records whose search_field match search_value. + + If field is None, return the whole record for each match. + """ + matches = self._get_matches(search_value) + + if field: + return [match['login'][field] for match in matches] + + return matches + + +class LookupModule(LookupBase): + + def run(self, terms, variables=None, **kwargs): + self.set_options(var_options=variables, direct=kwargs) + field = self.get_option('field') + if not _bitwarden.logged_in: + raise AnsibleError("Not logged into Bitwarden. Run 'bw login'.") + + return [_bitwarden.get_field(field, term) for term in terms] + + +_bitwarden = Bitwarden() diff --git a/tests/unit/plugins/lookup/test_bitwarden.py b/tests/unit/plugins/lookup/test_bitwarden.py new file mode 100644 index 0000000000..893cbc06f0 --- /dev/null +++ b/tests/unit/plugins/lookup/test_bitwarden.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- +# (c) 2022, Jonathan Lung +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from argparse import ArgumentParser + +from ansible_collections.community.general.tests.unit.compat import unittest +from ansible_collections.community.general.tests.unit.compat.mock import patch + +from ansible.errors import AnsibleError +from ansible.module_utils import six +from ansible.plugins.loader import lookup_loader +from ansible_collections.community.general.plugins.lookup.bitwarden import LookupModule, Bitwarden, BitwardenException + + +MOCK_RECORDS = [ + { + "collectionIds": [], + "deletedDate": None, + "favorite": False, + "fields": [ + { + "linkedId": None, + "name": "a_new_secret", + "type": 1, + "value": "this is a new secret" + }, + { + "linkedId": None, + "name": "not so secret", + "type": 0, + "value": "not secret" + } + ], + "folderId": "3b12a9da-7c49-40b8-ad33-aede017a7ead", + "id": "90992f63-ddb6-4e76-8bfc-aede016ca5eb", + "login": { + "password": "passwordA3", + "passwordRevisionDate": "2022-07-26T23:03:23.399Z", + "totp": None, + "username": "userA" + }, + "name": "a_test", + "notes": None, + "object": "item", + "organizationId": None, + "passwordHistory": [ + { + "lastUsedDate": "2022-07-26T23:03:23.405Z", + "password": "a_new_secret: this is secret" + }, + { + "lastUsedDate": "2022-07-26T23:03:23.399Z", + "password": "passwordA2" + }, + { + "lastUsedDate": "2022-07-26T22:59:52.885Z", + "password": "passwordA" + } + ], + "reprompt": 0, + "revisionDate": "2022-07-26T23:03:23.743Z", + "type": 1 + }, + { + "collectionIds": [], + "deletedDate": None, + "favorite": False, + "folderId": None, + "id": "5ebd4d31-104c-49fc-a09c-aedf003d28ad", + "login": { + "password": "b", + "passwordRevisionDate": None, + "totp": None, + "username": "a" + }, + "name": "dupe_name", + "notes": None, + "object": "item", + "organizationId": None, + "reprompt": 0, + "revisionDate": "2022-07-27T03:42:40.353Z", + "type": 1 + }, + { + "collectionIds": [], + "deletedDate": None, + "favorite": False, + "folderId": None, + "id": "90657653-6695-496d-9431-aedf003d3015", + "login": { + "password": "d", + "passwordRevisionDate": None, + "totp": None, + "username": "c" + }, + "name": "dupe_name", + "notes": None, + "object": "item", + "organizationId": None, + "reprompt": 0, + "revisionDate": "2022-07-27T03:42:46.673Z", + "type": 1 + } +] + + +class MockBitwarden(Bitwarden): + + logged_in = True + + def _get_matches(self, search_value, search_field="name"): + return list(filter(lambda record: record[search_field] == search_value, MOCK_RECORDS)) + + +class LoggedOutMockBitwarden(MockBitwarden): + + logged_in = False + + +class TestLookupModule(unittest.TestCase): + + def setUp(self): + self.lookup = lookup_loader.get('community.general.bitwarden') + + @patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden()) + def test_bitwarden_plugin_no_match(self): + # Entry 0, "a_test" of the test input should have no duplicates. + self.assertEqual([], self.lookup.run(['not_here'], field='password')[0]) + + @patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden()) + def test_bitwarden_plugin_fields(self): + # Entry 0, "a_test" of the test input should have no duplicates. + record = MOCK_RECORDS[0] + record_name = record['name'] + for k, v in six.iteritems(record['login']): + self.assertEqual([v], + self.lookup.run([record_name], field=k)[0]) + + @patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden()) + def test_bitwarden_plugin_duplicates(self): + # There are two records with name dupe_name; we need to be order-insensitive with + # checking what was retrieved. + self.assertEqual(set(['b', 'd']), + set(self.lookup.run(['dupe_name'], field='password')[0])) + + @patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden()) + def test_bitwarden_plugin_full_item(self): + # Try to retrieve the full record of the first entry where the name is "a_name". + self.assertEqual([MOCK_RECORDS[0]], + self.lookup.run(['a_test'])[0]) + + @patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', LoggedOutMockBitwarden()) + def test_bitwarden_plugin_logged_out(self): + record = MOCK_RECORDS[0] + record_name = record['name'] + with self.assertRaises(AnsibleError): + self.lookup.run([record_name], field='password')