From f2458140f65a08d9384ee8d2b484a98911e4b020 Mon Sep 17 00:00:00 2001 From: azenk Date: Thu, 22 Dec 2016 16:47:33 -0600 Subject: [PATCH] Lastpass lookup plugin (#16285) This plugin can be used with the lpass cli interface for lastpass. [lastpass-cli](https://github.com/lastpass/lastpass-cli) Example: Add a lookup to your playbooks/variables somewhere: ``` some_variable: "{{ lookup('lastpass','Some Lastpass entry name or ID', field='username') }}" ``` Usage: * start a lpass session prior to using ansible * run ansible * logout when finished ``` lpass login user@domain.com ansible-playbook foo.yml lpass logout ``` --- lib/ansible/plugins/lookup/lastpass.py | 79 +++++++++ test/units/plugins/lookup/test_lastpass.py | 192 +++++++++++++++++++++ 2 files changed, 271 insertions(+) create mode 100644 lib/ansible/plugins/lookup/lastpass.py create mode 100644 test/units/plugins/lookup/test_lastpass.py diff --git a/lib/ansible/plugins/lookup/lastpass.py b/lib/ansible/plugins/lookup/lastpass.py new file mode 100644 index 0000000000..198215ec45 --- /dev/null +++ b/lib/ansible/plugins/lookup/lastpass.py @@ -0,0 +1,79 @@ +# (c) 2016, Andrew Zenk +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from subprocess import Popen, PIPE + +from ansible.errors import AnsibleError +from ansible.plugins.lookup import LookupBase + + +class LPassException(AnsibleError): + pass + + +class LPass(object): + + def __init__(self, path='lpass'): + self._cli_path = path + + @property + def cli_path(self): + return self._cli_path + + @property + def logged_in(self): + out, err = self._run(self._build_args("logout"), stdin="n\n", expected_rc=1) + return err.startswith("Are you sure you would like to log out?") + + def _run(self, args, stdin=None, expected_rc=0): + p = Popen([self.cli_path] + args, stdout=PIPE, stderr=PIPE, stdin=PIPE) + out, err = p.communicate(stdin) + rc = p.wait() + if rc != expected_rc: + raise LPassException(err) + return out, err + + def _build_args(self, command, args=None): + if args is None: + args = [] + args = [command] + args + args += ["--color=never"] + return args + + def get_field(self, key, field): + if field in ['username', 'password', 'url', 'notes', 'id', 'name']: + out, err = self._run(self._build_args("show", ["--{0}".format(field), key])) + else: + out, err = self._run(self._build_args("show", ["--field={0}".format(field), key])) + return out.strip() + + +class LookupModule(LookupBase): + + def run(self, terms, variables=None, **kwargs): + lp = LPass() + + if not lp.logged_in: + raise AnsibleError("Not logged into lastpass: please run 'lpass login' first") + + field = kwargs.get('field', 'password') + values = [] + for term in terms: + values.append(lp.get_field(term, field)) + return values diff --git a/test/units/plugins/lookup/test_lastpass.py b/test/units/plugins/lookup/test_lastpass.py new file mode 100644 index 0000000000..bab974759a --- /dev/null +++ b/test/units/plugins/lookup/test_lastpass.py @@ -0,0 +1,192 @@ +# (c)2016 Andrew Zenk +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.compat.tests import unittest + +from ansible.plugins.lookup.lastpass import LookupModule, LPass, LPassException +from ansible.errors import AnsibleError +from argparse import ArgumentParser +import six + +from ansible.compat.tests.mock import patch + +MOCK_ENTRIES = [{'username': 'user', + 'name': 'Mock Entry', + 'password': 't0pS3cret passphrase entry!', + 'url': 'https://localhost/login', + 'notes': 'Test\nnote with multiple lines.\n', + 'id': '0123456789'}] + + +class MockLPass(LPass): + + _mock_logged_out = False + _mock_disconnected = False + + def _lookup_mock_entry(self, key): + for entry in MOCK_ENTRIES: + if key == entry['id'] or key == entry['name']: + return entry + + def _run(self, args, stdin=None, expected_rc=0): + # Mock behavior of lpass executable + base_options = ArgumentParser(add_help=False) + base_options.add_argument('--color', default="auto", choices=['auto', 'always', 'never']) + + p = ArgumentParser() + sp = p.add_subparsers(help='command', dest='subparser_name') + + logout_p = sp.add_parser('logout', parents=[base_options], help='logout') + show_p = sp.add_parser('show', parents=[base_options], help='show entry details') + + field_group = show_p.add_mutually_exclusive_group(required=True) + for field in MOCK_ENTRIES[0].keys(): + field_group.add_argument("--{0}".format(field), default=False, action='store_true') + field_group.add_argument('--field', default=None) + show_p.add_argument('selector', help='Unique Name or ID') + + args = p.parse_args(args) + + def mock_exit(output='', error='', rc=0): + if rc != expected_rc: + raise LPassException(error) + return output, error + + if args.color != 'never': + return mock_exit(error='Error: Mock only supports --color=never', rc=1) + + if args.subparser_name == 'logout': + if self._mock_logged_out: + return mock_exit(error='Error: Not currently logged in', rc=1) + + logged_in_error = 'Are you sure you would like to log out? [Y/n]' + if stdin and stdin.lower() == 'n\n': + return mock_exit(output='Log out: aborted.', error=logged_in_error, rc=1) + elif stdin and stdin.lower() == 'y\n': + return mock_exit(output='Log out: complete.', error=logged_in_error, rc=0) + else: + return mock_exit(error='Error: aborted response', rc=1) + + if args.subparser_name == 'show': + if self._mock_logged_out: + return mock_exit(error='Error: Could not find decryption key.' + + ' Perhaps you need to login with `lpass login`.', rc=1) + + if self._mock_disconnected: + return mock_exit(error='Error: Couldn\'t resolve host name.', rc=1) + + mock_entry = self._lookup_mock_entry(args.selector) + + if args.field: + return mock_exit(output=mock_entry.get(args.field, '')) + elif args.password: + return mock_exit(output=mock_entry.get('password', '')) + elif args.username: + return mock_exit(output=mock_entry.get('username', '')) + elif args.url: + return mock_exit(output=mock_entry.get('url', '')) + elif args.name: + return mock_exit(output=mock_entry.get('name', '')) + elif args.id: + return mock_exit(output=mock_entry.get('id', '')) + elif args.notes: + return mock_exit(output=mock_entry.get('notes', '')) + + raise LPassException('We should never get here') + + +class DisconnectedMockLPass(MockLPass): + + _mock_disconnected = True + + +class LoggedOutMockLPass(MockLPass): + + _mock_logged_out = True + + +class TestLPass(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_lastpass_cli_path(self): + lp = MockLPass(path='/dev/null') + self.assertEqual('/dev/null', lp.cli_path) + + def test_lastpass_build_args_logout(self): + lp = MockLPass() + self.assertEqual(['logout', '--color=never'], lp._build_args("logout")) + + def test_lastpass_logged_in_true(self): + lp = MockLPass() + self.assertTrue(lp.logged_in) + + def test_lastpass_logged_in_false(self): + lp = LoggedOutMockLPass() + self.assertFalse(lp.logged_in) + + def test_lastpass_show_disconnected(self): + lp = DisconnectedMockLPass() + + with self.assertRaises(LPassException): + lp.get_field('0123456789', 'username') + + def test_lastpass_show(self): + lp = MockLPass() + for entry in MOCK_ENTRIES: + entry_id = entry.get('id') + for k, v in six.iteritems(entry): + self.assertEqual(v.strip(), lp.get_field(entry_id, k)) + + +class TestLastpassPlugin(unittest.TestCase): + + @patch('ansible.plugins.lookup.lastpass.LPass', new=MockLPass) + def test_lastpass_plugin_normal(self): + lookup_plugin = LookupModule() + + for entry in MOCK_ENTRIES: + entry_id = entry.get('id') + for k, v in six.iteritems(entry): + self.assertEqual(v.strip(), + lookup_plugin.run([entry_id], field=k)[0]) + + @patch('ansible.plugins.lookup.lastpass.LPass', LoggedOutMockLPass) + def test_lastpass_plugin_logged_out(self): + lookup_plugin = LookupModule() + + entry = MOCK_ENTRIES[0] + entry_id = entry.get('id') + with self.assertRaises(AnsibleError): + lookup_plugin.run([entry_id], field='password') + + @patch('ansible.plugins.lookup.lastpass.LPass', DisconnectedMockLPass) + def test_lastpass_plugin_disconnected(self): + lookup_plugin = LookupModule() + + entry = MOCK_ENTRIES[0] + entry_id = entry.get('id') + with self.assertRaises(AnsibleError): + lookup_plugin.run([entry_id], field='password')