1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Fix password prompt matching (#24081)

* Fix password prompt matching

* Add some tests for check_password_prompt

* Prevent pep8 line ends with a space error
This commit is contained in:
Matt Martz 2017-05-01 10:18:15 -05:00 committed by Brian Coca
parent d8f76bed97
commit 040fb4435a
2 changed files with 83 additions and 4 deletions

View file

@ -254,10 +254,8 @@ class ConnectionBase(with_metaclass(ABCMeta, object)):
return False return False
elif isinstance(self._play_context.prompt, string_types): elif isinstance(self._play_context.prompt, string_types):
b_prompt = to_bytes(self._play_context.prompt).strip() b_prompt = to_bytes(self._play_context.prompt).strip()
b_lines = b_output.splitlines(True) b_lines = b_output.splitlines()
if not b_lines: return any(l.strip().startswith(b_prompt) for l in b_lines)
return False
return b_lines[-1].strip().endswith(b_prompt) or b_lines[0].strip().endswith(b_prompt)
else: else:
return self._play_context.prompt(b_output) return self._play_context.prompt(b_output)

View file

@ -45,6 +45,9 @@ class TestConnectionBaseClass(unittest.TestCase):
def setUp(self): def setUp(self):
self.play_context = PlayContext() self.play_context = PlayContext()
self.play_context.prompt = (
'[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: '
)
self.in_stream = StringIO() self.in_stream = StringIO()
def tearDown(self): def tearDown(self):
@ -132,3 +135,81 @@ class TestConnectionBaseClass(unittest.TestCase):
def test_network_cli_connection_module(self): def test_network_cli_connection_module(self):
self.assertIsInstance(NetworkCliConnection(self.play_context, self.in_stream), NetworkCliConnection) self.assertIsInstance(NetworkCliConnection(self.play_context, self.in_stream), NetworkCliConnection)
self.assertIsInstance(NetworkCliConnection(self.play_context, self.in_stream), ParamikoConnection) self.assertIsInstance(NetworkCliConnection(self.play_context, self.in_stream), ParamikoConnection)
def test_check_password_prompt(self):
local = (
b'[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: \n'
b'BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq\n'
)
ssh_pipelining_vvvv = b'''
debug3: mux_master_read_cb: channel 1 packet type 0x10000002 len 251
debug2: process_mux_new_session: channel 1: request tty 0, X 1, agent 1, subsys 0, term "xterm-256color", cmd "/bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: " -u root /bin/sh -c '"'"'echo BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq; /bin/true'"'"' && sleep 0'", env 0
debug3: process_mux_new_session: got fds stdin 9, stdout 10, stderr 11
debug2: client_session2_setup: id 2
debug1: Sending command: /bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: " -u root /bin/sh -c '"'"'echo BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq; /bin/true'"'"' && sleep 0'
debug2: channel 2: request exec confirm 1
debug2: channel 2: rcvd ext data 67
[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: debug2: channel 2: written 67 to efd 11
BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq
debug3: receive packet: type 98
''' # noqa
ssh_nopipelining_vvvv = b'''
debug3: mux_master_read_cb: channel 1 packet type 0x10000002 len 251
debug2: process_mux_new_session: channel 1: request tty 1, X 1, agent 1, subsys 0, term "xterm-256color", cmd "/bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: " -u root /bin/sh -c '"'"'echo BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq; /bin/true'"'"' && sleep 0'", env 0
debug3: mux_client_request_session: session request sent
debug3: send packet: type 98
debug1: Sending command: /bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: " -u root /bin/sh -c '"'"'echo BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq; /bin/true'"'"' && sleep 0'
debug2: channel 2: request exec confirm 1
debug2: exec request accepted on channel 2
[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: debug3: receive packet: type 2
debug3: Received SSH2_MSG_IGNORE
debug3: Received SSH2_MSG_IGNORE
BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq
debug3: receive packet: type 98
''' # noqa
ssh_novvvv = (
b'[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: \n'
b'BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq\n'
)
dns_issue = (
b'timeout waiting for privilege escalation password prompt:\n'
b'sudo: sudo: unable to resolve host tcloud014\n'
b'[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: \n'
b'BECOME-SUCCESS-ouzmdnewuhucvuaabtjmweasarviygqq\n'
)
nothing = b''
in_front = b'''
debug1: Sending command: /bin/sh -c 'sudo -H -S -p "[sudo via ansible, key=ouzmdnewuhucvuaabtjmweasarviygqq] password: " -u root /bin/sh -c '"'"'echo
'''
class ConnectionFoo(ConnectionBase):
@property
def transport(self):
pass
def _connect(self):
pass
def exec_command(self):
pass
def put_file(self):
pass
def fetch_file(self):
pass
def close(self):
pass
c = ConnectionFoo(self.play_context, self.in_stream)
self.assertTrue(c.check_password_prompt(local))
self.assertTrue(c.check_password_prompt(ssh_pipelining_vvvv))
self.assertTrue(c.check_password_prompt(ssh_nopipelining_vvvv))
self.assertTrue(c.check_password_prompt(ssh_novvvv))
self.assertTrue(c.check_password_prompt(dns_issue))
self.assertFalse(c.check_password_prompt(nothing))
self.assertFalse(c.check_password_prompt(in_front))