1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/plugins/lookup/bitwarden.py
Christoph bf117c839c
Clarify Error message when bitwarden vault not unlocked (#5811)
* Clarify Error message when vault not unlocked

You can be logged into the Bitwarden-CLI, but it can still be locked. This took me several hours to debug, since every time I ran 'bw login' it told me, that I am already logged in.
If you run 'bw unlock' without being logged in, you are prompted to log in. 
This clarifies the Error occurring and can drastically reduce debugging time, since you don't have to look into the source code to get an understanding of whats wrong.

* RM: negation

Nobody needs negation

* Update function name

* FIX: tests

* ADD: changelog

* Update changelogs/fragments/5811-clarify-bitwarden-error.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

Co-authored-by: Felix Fontein <felix@fontein.de>
2023-01-22 17:29:11 +01:00

144 lines
4.9 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2022, Jonathan Lung <lungj@heresjono.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = """
name: bitwarden
author:
- Jonathan Lung (@lungj) <lungj@heresjono.com>
requirements:
- bw (command line utility)
- be logged into bitwarden
short_description: Retrieve secrets from Bitwarden
version_added: 5.4.0
description:
- Retrieve secrets from Bitwarden.
options:
_terms:
description: Key(s) to fetch values for from login info.
required: true
type: list
elements: str
search:
description: Field to retrieve, for example C(name) or C(id).
type: str
default: name
version_added: 5.7.0
field:
description: Field to fetch; leave unset to fetch whole response.
type: str
"""
EXAMPLES = """
- name: "Get 'password' from Bitwarden record named 'a_test'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', field='password') }}
- name: "Get 'password' from Bitwarden record with id 'bafba515-af11-47e6-abe3-af1200cd18b2'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'bafba515-af11-47e6-abe3-af1200cd18b2', search='id', field='password') }}
- name: "Get full Bitwarden record named 'a_test'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test') }}
- name: "Get custom field 'api_key' from Bitwarden record named 'a_test'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', field='api_key') }}
"""
RETURN = """
_raw:
description: List of requested field or JSON object of list of matches.
type: list
elements: raw
"""
from subprocess import Popen, PIPE
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.parsing.ajson import AnsibleJSONDecoder
from ansible.plugins.lookup import LookupBase
class BitwardenException(AnsibleError):
pass
class Bitwarden(object):
def __init__(self, path='bw'):
self._cli_path = path
@property
def cli_path(self):
return self._cli_path
@property
def unlocked(self):
out, err = self._run(['status'], stdin="")
decoded = AnsibleJSONDecoder().raw_decode(out)[0]
return decoded['status'] == 'unlocked'
def _run(self, args, stdin=None, expected_rc=0):
p = Popen([self.cli_path] + args, stdout=PIPE, stderr=PIPE, stdin=PIPE)
out, err = p.communicate(to_bytes(stdin))
rc = p.wait()
if rc != expected_rc:
raise BitwardenException(err)
return to_text(out, errors='surrogate_or_strict'), to_text(err, errors='surrogate_or_strict')
def _get_matches(self, search_value, search_field):
"""Return matching records whose search_field is equal to key.
"""
out, err = self._run(['list', 'items', '--search', search_value])
# This includes things that matched in different fields.
initial_matches = AnsibleJSONDecoder().raw_decode(out)[0]
# Filter to only include results from the right field.
return [item for item in initial_matches if item[search_field] == search_value]
def get_field(self, field, search_value, search_field="name"):
"""Return a list of the specified field for records whose search_field match search_value.
If field is None, return the whole record for each match.
"""
matches = self._get_matches(search_value, search_field)
if field in ['autofillOnPageLoad', 'password', 'passwordRevisionDate', 'totp', 'uris', 'username']:
return [match['login'][field] for match in matches]
elif not field:
return matches
else:
custom_field_matches = []
for match in matches:
for custom_field in match['fields']:
if custom_field['name'] == field:
custom_field_matches.append(custom_field['value'])
if matches and not custom_field_matches:
raise AnsibleError("Custom field {field} does not exist in {search_value}".format(field=field, search_value=search_value))
return custom_field_matches
class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs):
self.set_options(var_options=variables, direct=kwargs)
field = self.get_option('field')
search_field = self.get_option('search')
if not _bitwarden.unlocked:
raise AnsibleError("Bitwarden Vault locked. Run 'bw unlock'.")
return [_bitwarden.get_field(field, term, search_field) for term in terms]
_bitwarden = Bitwarden()