From 8cd8d179804da1e864c91e0886dd4316ab36d5f6 Mon Sep 17 00:00:00 2001 From: Sam Doran Date: Thu, 30 Aug 2018 16:24:06 -0400 Subject: [PATCH] Add ability to unlock 1Password vault to lookup plugins (#44923) * Add ability to use login to 1Password vault to 1Password lookups * Adjust unit tests * Add changelog --- .../fragments/onepassword_unlock_vault.yaml | 2 + lib/ansible/plugins/lookup/onepassword.py | 68 ++++++++++++++----- lib/ansible/plugins/lookup/onepassword_raw.py | 23 +++++-- test/units/plugins/lookup/test_onepassword.py | 11 ++- 4 files changed, 81 insertions(+), 23 deletions(-) create mode 100644 changelogs/fragments/onepassword_unlock_vault.yaml diff --git a/changelogs/fragments/onepassword_unlock_vault.yaml b/changelogs/fragments/onepassword_unlock_vault.yaml new file mode 100644 index 0000000000..593c1b1ebd --- /dev/null +++ b/changelogs/fragments/onepassword_unlock_vault.yaml @@ -0,0 +1,2 @@ +minor_changes: + - onepassword/onepassword_raw - accept subdomain and vault_password to allow Ansible to unlock 1Password vaults diff --git a/lib/ansible/plugins/lookup/onepassword.py b/lib/ansible/plugins/lookup/onepassword.py index c172acd5fa..5e9aebc5f4 100644 --- a/lib/ansible/plugins/lookup/onepassword.py +++ b/lib/ansible/plugins/lookup/onepassword.py @@ -16,6 +16,7 @@ DOCUMENTATION = """ author: - Scott Buchanan - Andrew Zenk + - Sam Doran version_added: "2.6" requirements: - C(op) 1Password command line utility. See U(https://support.1password.com/command-line/) @@ -25,7 +26,7 @@ DOCUMENTATION = """ - onepassword wraps the C(op) command line utility to fetch specific field values from 1Password options: _terms: - description: identifier(s) (UUID, name or domain; case-insensitive) of item(s) to retrieve + description: identifier(s) (UUID, name, or subdomain; case-insensitive) of item(s) to retrieve required: True field: description: field to return from each matching item (case-insensitive) @@ -33,23 +34,35 @@ DOCUMENTATION = """ section: description: item section containing the field to retrieve (case-insensitive); if absent will return first match from any section default: None + subdomain: + description: The 1Password subdomain to authenticate against. + default: None + version_added: '2.7' vault: description: vault containing the item to retrieve (case-insensitive); if absent will search all vaults default: None + vault_password: + description: The password used to unlock the specified vault. + default: None + version_added: '2.7' """ EXAMPLES = """ -- name: "retrieve password for KITT" +- name: Retrieve password for KITT debug: - msg: "{{ lookup('onepassword', 'KITT') }}" + var: lookup('onepassword', 'KITT') -- name: "retrieve password for Wintermute" +- name: Retrieve password for Wintermute debug: - msg: "{{ lookup('onepassword', 'Tessier-Ashpool', section='Wintermute') }}" + var: lookup('onepassword', 'Tessier-Ashpool', section='Wintermute') -- name: "retrieve username for HAL" +- name: Retrieve username for HAL debug: - msg: "{{ lookup('onepassword', 'HAL 9000', field='username', vault='Discovery') }}" + var: lookup('onepassword', 'HAL 9000', field='username', vault='Discovery') + +- name: Retrieve password for HAL when not signed in to 1Password + debug: + var: lookup('onepassword', 'HAL 9000', subdomain='Discovery', vault_password='DmbslfLvasjdl') """ RETURN = """ @@ -64,45 +77,64 @@ from subprocess import Popen, PIPE from ansible.plugins.lookup import LookupBase from ansible.errors import AnsibleLookupError +from ansible.module_utils._text import to_bytes class OnePass(object): def __init__(self, path='op'): self._cli_path = path + self._logged_in = False + self._token = None + self._subdomain = None + self._vault_password = None @property def cli_path(self): return self._cli_path + def get_token(self): + if not self._subdomain and not self._vault_password: + raise AnsibleLookupError('Both subdomain and password are required when logging in.') + args = ['signin', self._subdomain, '--output=raw'] + rc, out, err = self._run(args, command_input=to_bytes(self._vault_password)) + self._token = out.strip() + def assert_logged_in(self): try: - self._run(["get", "account"]) + rc, out, err = self._run(['get', 'account'], ignore_errors=True) + if rc != 1: + self._logged_in = True + if not self._logged_in: + self.get_token() except OSError as e: if e.errno == errno.ENOENT: raise AnsibleLookupError("1Password CLI tool not installed in path on control machine") raise e except AnsibleLookupError: - raise AnsibleLookupError("Not logged into 1Password: please run 'op signin' first") + raise AnsibleLookupError("Not logged into 1Password: please run 'op signin' first, or provide both subdomain and vault_password.") def get_raw(self, item_id, vault=None): args = ["get", "item", item_id] if vault is not None: args += ['--vault={0}'.format(vault)] - output, dummy = self._run(args) + if not self._logged_in: + args += [to_bytes('--session=') + self._token] + rc, output, dummy = self._run(args) return output def get_field(self, item_id, field, section=None, vault=None): output = self.get_raw(item_id, vault) return self._parse_field(output, field, section) if output != '' else '' - def _run(self, args, expected_rc=0): - p = Popen([self.cli_path] + args, stdout=PIPE, stderr=PIPE, stdin=PIPE) - out, err = p.communicate() + def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False): + command = [self.cli_path] + args + p = Popen(command, stdout=PIPE, stderr=PIPE, stdin=PIPE) + out, err = p.communicate(input=command_input) rc = p.wait() - if rc != expected_rc: + if not ignore_errors and rc != expected_rc: raise AnsibleLookupError(err) - return out, err + return rc, out, err def _parse_field(self, data_json, field_name, section_title=None): data = json.loads(data_json) @@ -124,11 +156,13 @@ class LookupModule(LookupBase): def run(self, terms, variables=None, **kwargs): op = OnePass() - op.assert_logged_in() - field = kwargs.get('field', 'password') section = kwargs.get('section') vault = kwargs.get('vault') + op._subdomain = kwargs.get('subdomain') + op._vault_password = kwargs.get('vault_password') + + op.assert_logged_in() values = [] for term in terms: diff --git a/lib/ansible/plugins/lookup/onepassword_raw.py b/lib/ansible/plugins/lookup/onepassword_raw.py index 9bc5e1cabd..9150fe5be3 100644 --- a/lib/ansible/plugins/lookup/onepassword_raw.py +++ b/lib/ansible/plugins/lookup/onepassword_raw.py @@ -16,6 +16,7 @@ DOCUMENTATION = """ author: - Scott Buchanan - Andrew Zenk + - Sam Doran version_added: "2.6" requirements: - C(op) 1Password command line utility. See U(https://support.1password.com/command-line/) @@ -27,15 +28,27 @@ DOCUMENTATION = """ _terms: description: identifier(s) (UUID, name, or domain; case-insensitive) of item(s) to retrieve required: True + subdomain: + description: The 1Password subdomain to authenticate against. + default: None + version_added: '2.7' vault: description: vault containing the item to retrieve (case-insensitive); if absent will search all vaults default: None + vault_password: + description: The password used to unlock the specified vault. + default: None + version_added: '2.7' """ EXAMPLES = """ -- name: "retrieve all data about Wintermute" +- name: Retrieve all data about Wintermute debug: - msg: "{{ lookup('onepassword_raw', 'Wintermute') }}" + var: lookup('onepassword_raw', 'Wintermute') + +- name: Retrieve all data about Wintermute when not signed in to 1Password + debug: + var: lookup('onepassword_raw', 'Wintermute', subdomain='Turing', vault_password='DmbslfLvasjdl') """ RETURN = """ @@ -54,9 +67,11 @@ class LookupModule(LookupBase): def run(self, terms, variables=None, **kwargs): op = OnePass() - op.assert_logged_in() - vault = kwargs.get('vault') + op._subdomain = kwargs.get('subdomain') + op._vault_password = kwargs.get('vault_password') + + op.assert_logged_in() values = [] for term in terms: diff --git a/test/units/plugins/lookup/test_onepassword.py b/test/units/plugins/lookup/test_onepassword.py index f1d0aa71b6..374a103b50 100644 --- a/test/units/plugins/lookup/test_onepassword.py +++ b/test/units/plugins/lookup/test_onepassword.py @@ -153,7 +153,7 @@ class MockOnePass(OnePass): if key in match_fields: return entry['output'] - def _run(self, args, expected_rc=0): + def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False): parser = ArgumentParser() command_parser = parser.add_subparsers(dest='command') @@ -174,7 +174,7 @@ class MockOnePass(OnePass): if error != '': now = datetime.date.today() error = '[LOG] {0} (ERROR) {1}'.format(now.strftime('%Y/%m/%d %H:$M:$S'), error) - return output, error + return rc, output, error if args.command == 'get': if self._mock_logged_out: @@ -233,38 +233,45 @@ class TestOnePass(unittest.TestCase): def test_onepassword_get(self): op = MockOnePass() + op._logged_in = True query_generator = get_mock_query_generator() for dummy, query, dummy, field_name, field_value in query_generator: self.assertEqual(field_value, op.get_field(query, field_name)) def test_onepassword_get_raw(self): op = MockOnePass() + op._logged_in = True for entry in MOCK_ENTRIES: for query in entry['queries']: self.assertEqual(json.dumps(entry['output']), op.get_raw(query)) def test_onepassword_get_not_found(self): op = MockOnePass() + op._logged_in = True self.assertEqual('', op.get_field('a fake query', 'a fake field')) def test_onepassword_get_with_section(self): op = MockOnePass() + op._logged_in = True dummy, query, section_title, field_name, field_value = get_one_mock_query() self.assertEqual(field_value, op.get_field(query, field_name, section=section_title)) def test_onepassword_get_with_vault(self): op = MockOnePass() + op._logged_in = True entry, query, dummy, field_name, field_value = get_one_mock_query() for vault_query in [entry['vault_name'], entry['output']['vaultUuid']]: self.assertEqual(field_value, op.get_field(query, field_name, vault=vault_query)) def test_onepassword_get_with_wrong_vault(self): op = MockOnePass() + op._logged_in = True dummy, query, dummy, field_name, dummy = get_one_mock_query() self.assertEqual('', op.get_field(query, field_name, vault='a fake vault')) def test_onepassword_get_diff_case(self): op = MockOnePass() + op._logged_in = True entry, query, section_title, field_name, field_value = get_one_mock_query() self.assertEqual( field_value,