mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
b60854357b
* Unify login behavior between 1Password lookup plugins and module - Use the same names for all credential aspects - Only require the minimal amount of information for each - Add more examples * Change parameter terms - use terms in line with 1Password documentation. - update examples - update tests * Improve error messages in lookup plugin * Unify onepassword_facts with lookup plugins - use same methods and logic for signing in or reusing existing session - unify terms with lookup plugins * Change rc test for determing login An rc other than 1 can be returned when a current login session does not exist. * Create AnsibleModuleError class ansible.errors is not available to modules, so create an AnsibleModuleError class within the module Do not user os.path.expanduser since this is already done by virtue of the type being "path" in the argument spec. * Add note about risk with fact caching sensitive data * Add note on op version that was used for testing
320 lines
11 KiB
Python
320 lines
11 KiB
Python
# (c) 2018, Scott Buchanan <sbuchanan@ri.pn>
|
|
# (c) 2016, Andrew Zenk <azenk@umn.edu> (test_lastpass.py used as starting point)
|
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
import json
|
|
import datetime
|
|
|
|
try:
|
|
from urllib.parse import urlparse
|
|
except ImportError:
|
|
from urlparse import urlparse
|
|
|
|
from argparse import ArgumentParser
|
|
|
|
|
|
from ansible.compat.tests import unittest
|
|
from ansible.compat.tests.mock import patch
|
|
from ansible.errors import AnsibleError
|
|
from ansible.plugins.lookup.onepassword import OnePass, LookupModule
|
|
from ansible.plugins.lookup.onepassword_raw import LookupModule as OnePasswordRawLookup
|
|
|
|
|
|
# Intentionally excludes metadata leaf nodes that would exist in real output if not relevant.
|
|
MOCK_ENTRIES = [
|
|
{
|
|
'vault_name': 'Acme "Quot\'d" Servers',
|
|
'queries': [
|
|
'0123456789',
|
|
'Mock "Quot\'d" Server'
|
|
],
|
|
'output': {
|
|
'uuid': '0123456789',
|
|
'vaultUuid': '2468',
|
|
'overview': {
|
|
'title': 'Mock "Quot\'d" Server'
|
|
},
|
|
'details': {
|
|
'sections': [{
|
|
'title': '',
|
|
'fields': [
|
|
{'t': 'username', 'v': 'jamesbond'},
|
|
{'t': 'password', 'v': 't0pS3cret'},
|
|
{'t': 'notes', 'v': 'Test note with\nmultiple lines and trailing space.\n\n'},
|
|
{'t': 'tricksy "quot\'d" field\\', 'v': '"quot\'d" value'}
|
|
]
|
|
}]
|
|
}
|
|
}
|
|
},
|
|
{
|
|
'vault_name': 'Acme Logins',
|
|
'queries': [
|
|
'9876543210',
|
|
'Mock Website',
|
|
'acme.com'
|
|
],
|
|
'output': {
|
|
'uuid': '9876543210',
|
|
'vaultUuid': '1357',
|
|
'overview': {
|
|
'title': 'Mock Website',
|
|
'URLs': [
|
|
{'l': 'website', 'u': 'https://acme.com/login'}
|
|
]
|
|
},
|
|
'details': {
|
|
'sections': [{
|
|
'title': '',
|
|
'fields': [
|
|
{'t': 'password', 'v': 't0pS3cret'}
|
|
]
|
|
}]
|
|
}
|
|
}
|
|
},
|
|
{
|
|
'vault_name': 'Acme Logins',
|
|
'queries': [
|
|
'864201357'
|
|
],
|
|
'output': {
|
|
'uuid': '864201357',
|
|
'vaultUuid': '1357',
|
|
'overview': {
|
|
'title': 'Mock Something'
|
|
},
|
|
'details': {
|
|
'fields': [
|
|
{
|
|
'value': 'jbond@mi6.gov.uk',
|
|
'name': 'emailAddress'
|
|
},
|
|
{
|
|
'name': 'password',
|
|
'value': 'vauxhall'
|
|
}
|
|
]
|
|
}
|
|
}
|
|
},
|
|
]
|
|
|
|
|
|
def get_mock_query_generator(require_field=None):
|
|
def _process_field(field, section_title=None):
|
|
field_name = field.get('name', field.get('t'))
|
|
field_value = field.get('value', field.get('v'))
|
|
|
|
if require_field is None or field_name == require_field:
|
|
return entry, query, section_title, field_name, field_value
|
|
|
|
for entry in MOCK_ENTRIES:
|
|
for query in entry['queries']:
|
|
for field in entry['output']['details'].get('fields', []):
|
|
fixture = _process_field(field)
|
|
if fixture:
|
|
yield fixture
|
|
for section in entry['output']['details'].get('sections', []):
|
|
for field in section['fields']:
|
|
fixture = _process_field(field, section['title'])
|
|
if fixture:
|
|
yield fixture
|
|
|
|
|
|
def get_one_mock_query(require_field=None):
|
|
generator = get_mock_query_generator(require_field)
|
|
return next(generator)
|
|
|
|
|
|
class MockOnePass(OnePass):
|
|
|
|
_mock_logged_out = False
|
|
_mock_timed_out = False
|
|
|
|
def _lookup_mock_entry(self, key, vault=None):
|
|
for entry in MOCK_ENTRIES:
|
|
if vault is not None and vault.lower() != entry['vault_name'].lower() and vault.lower() != entry['output']['vaultUuid'].lower():
|
|
continue
|
|
|
|
match_fields = [
|
|
entry['output']['uuid'],
|
|
entry['output']['overview']['title']
|
|
]
|
|
|
|
# Note that exactly how 1Password matches on domains in non-trivial cases is neither documented
|
|
# nor obvious, so this may not precisely match the real behavior.
|
|
urls = entry['output']['overview'].get('URLs')
|
|
if urls is not None:
|
|
match_fields += [urlparse(url['u']).netloc for url in urls]
|
|
|
|
if key in match_fields:
|
|
return entry['output']
|
|
|
|
def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False):
|
|
parser = ArgumentParser()
|
|
|
|
command_parser = parser.add_subparsers(dest='command')
|
|
|
|
get_parser = command_parser.add_parser('get')
|
|
get_options = ArgumentParser(add_help=False)
|
|
get_options.add_argument('--vault')
|
|
get_type_parser = get_parser.add_subparsers(dest='object_type')
|
|
get_type_parser.add_parser('account', parents=[get_options])
|
|
get_item_parser = get_type_parser.add_parser('item', parents=[get_options])
|
|
get_item_parser.add_argument('item_id')
|
|
|
|
args = parser.parse_args(args)
|
|
|
|
def mock_exit(output='', error='', rc=0):
|
|
if rc != expected_rc:
|
|
raise AnsibleError(error)
|
|
if error != '':
|
|
now = datetime.date.today()
|
|
error = '[LOG] {0} (ERROR) {1}'.format(now.strftime('%Y/%m/%d %H:$M:$S'), error)
|
|
return rc, output, error
|
|
|
|
if args.command == 'get':
|
|
if self._mock_logged_out:
|
|
return mock_exit(error='You are not currently signed in. Please run `op signin --help` for instructions', rc=1)
|
|
|
|
if self._mock_timed_out:
|
|
return mock_exit(error='401: Authentication required.', rc=1)
|
|
|
|
if args.object_type == 'item':
|
|
mock_entry = self._lookup_mock_entry(args.item_id, args.vault)
|
|
|
|
if mock_entry is None:
|
|
return mock_exit(error='Item {0} not found'.format(args.item_id))
|
|
|
|
return mock_exit(output=json.dumps(mock_entry))
|
|
|
|
if args.object_type == 'account':
|
|
# Since we don't actually ever use this output, don't bother mocking output.
|
|
return mock_exit()
|
|
|
|
raise AnsibleError('Unsupported command string passed to OnePass mock: {0}'.format(args))
|
|
|
|
|
|
class LoggedOutMockOnePass(MockOnePass):
|
|
|
|
_mock_logged_out = True
|
|
|
|
|
|
class TimedOutMockOnePass(MockOnePass):
|
|
|
|
_mock_timed_out = True
|
|
|
|
|
|
class TestOnePass(unittest.TestCase):
|
|
|
|
def test_onepassword_cli_path(self):
|
|
op = MockOnePass(path='/dev/null')
|
|
self.assertEqual('/dev/null', op.cli_path)
|
|
|
|
def test_onepassword_logged_in(self):
|
|
op = MockOnePass()
|
|
try:
|
|
op.assert_logged_in()
|
|
except:
|
|
self.fail()
|
|
|
|
def test_onepassword_logged_out(self):
|
|
op = LoggedOutMockOnePass()
|
|
with self.assertRaises(AnsibleError):
|
|
op.assert_logged_in()
|
|
|
|
def test_onepassword_timed_out(self):
|
|
op = TimedOutMockOnePass()
|
|
with self.assertRaises(AnsibleError):
|
|
op.assert_logged_in()
|
|
|
|
def test_onepassword_get(self):
|
|
op = MockOnePass()
|
|
op.logged_in = True
|
|
query_generator = get_mock_query_generator()
|
|
for dummy, query, dummy, field_name, field_value in query_generator:
|
|
self.assertEqual(field_value, op.get_field(query, field_name))
|
|
|
|
def test_onepassword_get_raw(self):
|
|
op = MockOnePass()
|
|
op.logged_in = True
|
|
for entry in MOCK_ENTRIES:
|
|
for query in entry['queries']:
|
|
self.assertEqual(json.dumps(entry['output']), op.get_raw(query))
|
|
|
|
def test_onepassword_get_not_found(self):
|
|
op = MockOnePass()
|
|
op.logged_in = True
|
|
self.assertEqual('', op.get_field('a fake query', 'a fake field'))
|
|
|
|
def test_onepassword_get_with_section(self):
|
|
op = MockOnePass()
|
|
op.logged_in = True
|
|
dummy, query, section_title, field_name, field_value = get_one_mock_query()
|
|
self.assertEqual(field_value, op.get_field(query, field_name, section=section_title))
|
|
|
|
def test_onepassword_get_with_vault(self):
|
|
op = MockOnePass()
|
|
op.logged_in = True
|
|
entry, query, dummy, field_name, field_value = get_one_mock_query()
|
|
for vault_query in [entry['vault_name'], entry['output']['vaultUuid']]:
|
|
self.assertEqual(field_value, op.get_field(query, field_name, vault=vault_query))
|
|
|
|
def test_onepassword_get_with_wrong_vault(self):
|
|
op = MockOnePass()
|
|
op.logged_in = True
|
|
dummy, query, dummy, field_name, dummy = get_one_mock_query()
|
|
self.assertEqual('', op.get_field(query, field_name, vault='a fake vault'))
|
|
|
|
def test_onepassword_get_diff_case(self):
|
|
op = MockOnePass()
|
|
op.logged_in = True
|
|
entry, query, section_title, field_name, field_value = get_one_mock_query()
|
|
self.assertEqual(
|
|
field_value,
|
|
op.get_field(
|
|
query,
|
|
field_name.upper(),
|
|
vault=entry['vault_name'].upper(),
|
|
section=section_title.upper()
|
|
)
|
|
)
|
|
|
|
|
|
@patch('ansible.plugins.lookup.onepassword.OnePass', MockOnePass)
|
|
class TestLookupModule(unittest.TestCase):
|
|
|
|
def test_onepassword_plugin_multiple(self):
|
|
lookup_plugin = LookupModule()
|
|
|
|
entry = MOCK_ENTRIES[0]
|
|
field = entry['output']['details']['sections'][0]['fields'][0]
|
|
|
|
self.assertEqual(
|
|
[field['v']] * len(entry['queries']),
|
|
lookup_plugin.run(entry['queries'], field=field['t'])
|
|
)
|
|
|
|
def test_onepassword_plugin_default_field(self):
|
|
lookup_plugin = LookupModule()
|
|
|
|
dummy, query, dummy, dummy, field_value = get_one_mock_query('password')
|
|
self.assertEqual([field_value], lookup_plugin.run([query]))
|
|
|
|
|
|
@patch('ansible.plugins.lookup.onepassword_raw.OnePass', MockOnePass)
|
|
class TestOnePasswordRawLookup(unittest.TestCase):
|
|
|
|
def test_onepassword_raw_plugin_multiple(self):
|
|
raw_lookup_plugin = OnePasswordRawLookup()
|
|
|
|
entry = MOCK_ENTRIES[0]
|
|
raw_value = entry['output']
|
|
|
|
self.assertEqual(
|
|
[raw_value] * len(entry['queries']),
|
|
raw_lookup_plugin.run(entry['queries'])
|
|
)
|