mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Get the ssh plugin working with python3 (#17234)
This commit is contained in:
parent
dbab23e68f
commit
bd68c324ce
3 changed files with 97 additions and 94 deletions
|
@ -240,27 +240,29 @@ class ConnectionBase(with_metaclass(ABCMeta, object)):
|
|||
"""Terminate the connection"""
|
||||
pass
|
||||
|
||||
def check_become_success(self, output):
|
||||
for line in output.splitlines(True):
|
||||
if self._play_context.success_key == line.rstrip():
|
||||
def check_become_success(self, b_output):
|
||||
b_success_key = to_bytes(self._play_context.success_key)
|
||||
for b_line in b_output.splitlines(True):
|
||||
if b_success_key == b_line.rstrip():
|
||||
return True
|
||||
return False
|
||||
|
||||
def check_password_prompt(self, output):
|
||||
def check_password_prompt(self, b_output):
|
||||
if self._play_context.prompt is None:
|
||||
return False
|
||||
elif isinstance(self._play_context.prompt, string_types):
|
||||
return output.startswith(self._play_context.prompt)
|
||||
b_prompt = to_bytes(self._play_context.prompt)
|
||||
return b_output.startswith(b_prompt)
|
||||
else:
|
||||
return self._play_context.prompt(output)
|
||||
|
||||
def check_incorrect_password(self, output):
|
||||
incorrect_password = gettext.dgettext(self._play_context.become_method, C.BECOME_ERROR_STRINGS[self._play_context.become_method])
|
||||
return incorrect_password and incorrect_password in output
|
||||
def check_incorrect_password(self, b_output):
|
||||
b_incorrect_password = to_bytes(gettext.dgettext(self._play_context.become_method, C.BECOME_ERROR_STRINGS[self._play_context.become_method]))
|
||||
return b_incorrect_password and b_incorrect_password in b_output
|
||||
|
||||
def check_missing_password(self, output):
|
||||
missing_password = gettext.dgettext(self._play_context.become_method, C.BECOME_MISSING_STRINGS[self._play_context.become_method])
|
||||
return missing_password and missing_password in output
|
||||
def check_missing_password(self, b_output):
|
||||
b_missing_password = to_bytes(gettext.dgettext(self._play_context.become_method, C.BECOME_MISSING_STRINGS[self._play_context.become_method]))
|
||||
return b_missing_password and b_missing_password in b_output
|
||||
|
||||
def connection_lock(self):
|
||||
f = self._play_context.connection_lockfd
|
||||
|
|
|
@ -107,7 +107,7 @@ class Connection(ConnectionBase):
|
|||
explanation of why they were added.
|
||||
"""
|
||||
self._command += args
|
||||
display.vvvvv('SSH: ' + explanation + ': (%s)' % ')('.join(args), host=self._play_context.remote_addr)
|
||||
display.vvvvv('SSH: ' + explanation + ': (%s)' % ')('.join(map(to_unicode, args)), host=self._play_context.remote_addr)
|
||||
|
||||
def _build_command(self, binary, *other_args):
|
||||
'''
|
||||
|
@ -217,15 +217,14 @@ class Connection(ConnectionBase):
|
|||
|
||||
if not controlpath:
|
||||
cpdir = unfrackpath('$HOME/.ansible/cp')
|
||||
b_cpdir = to_bytes(cpdir)
|
||||
|
||||
# The directory must exist and be writable.
|
||||
makedirs_safe(cpdir, 0o700)
|
||||
if not os.access(cpdir, os.W_OK):
|
||||
raise AnsibleError("Cannot write to ControlPath %s" % cpdir)
|
||||
makedirs_safe(b_cpdir, 0o700)
|
||||
if not os.access(b_cpdir, os.W_OK):
|
||||
raise AnsibleError("Cannot write to ControlPath %s" % to_str(cpdir))
|
||||
|
||||
args = ("-o", "ControlPath={0}".format(
|
||||
to_bytes(C.ANSIBLE_SSH_CONTROL_PATH % dict(directory=cpdir)))
|
||||
)
|
||||
args = ("-o", "ControlPath=" + C.ANSIBLE_SSH_CONTROL_PATH % dict(directory=cpdir))
|
||||
self._add_args("found only ControlPersist; added ControlPath", args)
|
||||
|
||||
## Finally, we add any caller-supplied extras.
|
||||
|
@ -233,7 +232,8 @@ class Connection(ConnectionBase):
|
|||
if other_args:
|
||||
self._command += other_args
|
||||
|
||||
return self._command
|
||||
cmd = [to_bytes(a) for a in self._command]
|
||||
return cmd
|
||||
|
||||
def _send_initial_data(self, fh, in_data):
|
||||
'''
|
||||
|
@ -245,7 +245,7 @@ class Connection(ConnectionBase):
|
|||
display.debug('Sending initial data')
|
||||
|
||||
try:
|
||||
fh.write(in_data)
|
||||
fh.write(to_bytes(in_data))
|
||||
fh.close()
|
||||
except (OSError, IOError):
|
||||
raise AnsibleConnectionFailure('SSH Error: data could not be sent to the remote host. Make sure this host can be reached over ssh')
|
||||
|
@ -263,7 +263,7 @@ class Connection(ConnectionBase):
|
|||
|
||||
# This is separate from _run() because we need to do the same thing for stdout
|
||||
# and stderr.
|
||||
def _examine_output(self, source, state, chunk, sudoable):
|
||||
def _examine_output(self, source, state, b_chunk, sudoable):
|
||||
'''
|
||||
Takes a string, extracts complete lines from it, tests to see if they
|
||||
are a prompt, error message, etc., and sets appropriate flags in self.
|
||||
|
@ -274,46 +274,47 @@ class Connection(ConnectionBase):
|
|||
'''
|
||||
|
||||
output = []
|
||||
for l in chunk.splitlines(True):
|
||||
for b_line in b_chunk.splitlines(True):
|
||||
display_line = to_unicode(b_line, errors='replace').rstrip('\r\n')
|
||||
suppress_output = False
|
||||
|
||||
#display.debug("Examining line (source=%s, state=%s): '%s'" % (source, state, l.rstrip('\r\n')))
|
||||
if self._play_context.prompt and self.check_password_prompt(l):
|
||||
display.debug("become_prompt: (source=%s, state=%s): '%s'" % (source, state, l.rstrip('\r\n')))
|
||||
#display.debug("Examining line (source=%s, state=%s): '%s'" % (source, state, display_line))
|
||||
if self._play_context.prompt and self.check_password_prompt(b_line):
|
||||
display.debug("become_prompt: (source=%s, state=%s): '%s'" % (source, state, display_line))
|
||||
self._flags['become_prompt'] = True
|
||||
suppress_output = True
|
||||
elif self._play_context.success_key and self.check_become_success(l):
|
||||
display.debug("become_success: (source=%s, state=%s): '%s'" % (source, state, l.rstrip('\r\n')))
|
||||
elif self._play_context.success_key and self.check_become_success(b_line):
|
||||
display.debug("become_success: (source=%s, state=%s): '%s'" % (source, state, display_line))
|
||||
self._flags['become_success'] = True
|
||||
suppress_output = True
|
||||
elif sudoable and self.check_incorrect_password(l):
|
||||
display.debug("become_error: (source=%s, state=%s): '%s'" % (source, state, l.rstrip('\r\n')))
|
||||
elif sudoable and self.check_incorrect_password(b_line):
|
||||
display.debug("become_error: (source=%s, state=%s): '%s'" % (source, state, display_line))
|
||||
self._flags['become_error'] = True
|
||||
elif sudoable and self.check_missing_password(l):
|
||||
display.debug("become_nopasswd_error: (source=%s, state=%s): '%s'" % (source, state, l.rstrip('\r\n')))
|
||||
elif sudoable and self.check_missing_password(b_line):
|
||||
display.debug("become_nopasswd_error: (source=%s, state=%s): '%s'" % (source, state, display_line))
|
||||
self._flags['become_nopasswd_error'] = True
|
||||
|
||||
if not suppress_output:
|
||||
output.append(l)
|
||||
output.append(b_line)
|
||||
|
||||
# The chunk we read was most likely a series of complete lines, but just
|
||||
# in case the last line was incomplete (and not a prompt, which we would
|
||||
# have removed from the output), we retain it to be processed with the
|
||||
# next chunk.
|
||||
|
||||
remainder = ''
|
||||
if output and not output[-1].endswith('\n'):
|
||||
remainder = b''
|
||||
if output and not output[-1].endswith(b'\n'):
|
||||
remainder = output[-1]
|
||||
output = output[:-1]
|
||||
|
||||
return ''.join(output), remainder
|
||||
return b''.join(output), remainder
|
||||
|
||||
def _run(self, cmd, in_data, sudoable=True):
|
||||
'''
|
||||
Starts the command and communicates with it until it ends.
|
||||
'''
|
||||
|
||||
display_cmd = map(to_unicode, map(pipes.quote, cmd))
|
||||
display_cmd = list(map(pipes.quote, map(to_unicode, cmd)))
|
||||
display.vvv(u'SSH: EXEC {0}'.format(u' '.join(display_cmd)), host=self.host)
|
||||
|
||||
# Start the given command. If we don't need to pipeline data, we can try
|
||||
|
@ -333,7 +334,7 @@ class Connection(ConnectionBase):
|
|||
# Make sure stdin is a proper pty to avoid tcgetattr errors
|
||||
master, slave = pty.openpty()
|
||||
p = subprocess.Popen(cmd, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
stdin = os.fdopen(master, 'w', 0)
|
||||
stdin = os.fdopen(master, 'wb', 0)
|
||||
os.close(slave)
|
||||
except (OSError, IOError):
|
||||
p = None
|
||||
|
@ -348,7 +349,7 @@ class Connection(ConnectionBase):
|
|||
if self._play_context.password:
|
||||
os.close(self.sshpass_pipe[0])
|
||||
try:
|
||||
os.write(self.sshpass_pipe[1], "{0}\n".format(to_bytes(self._play_context.password)))
|
||||
os.write(self.sshpass_pipe[1], to_bytes(self._play_context.password) + b'\n')
|
||||
except OSError as e:
|
||||
# Ignore broken pipe errors if the sshpass process has exited.
|
||||
if e.errno != errno.EPIPE or p.poll() is None:
|
||||
|
@ -375,12 +376,12 @@ class Connection(ConnectionBase):
|
|||
# We're requesting escalation with a password, so we have to
|
||||
# wait for a password prompt.
|
||||
state = states.index('awaiting_prompt')
|
||||
display.debug('Initial state: %s: %s' % (states[state], self._play_context.prompt))
|
||||
display.debug(u'Initial state: %s: %s' % (states[state], self._play_context.prompt))
|
||||
elif self._play_context.become and self._play_context.success_key:
|
||||
# We're requesting escalation without a password, so we have to
|
||||
# detect success/failure before sending any initial data.
|
||||
state = states.index('awaiting_escalation')
|
||||
display.debug('Initial state: %s: %s' % (states[state], self._play_context.success_key))
|
||||
display.debug(u'Initial state: %s: %s' % (states[state], self._play_context.success_key))
|
||||
|
||||
# We store accumulated stdout and stderr output from the process here,
|
||||
# but strip any privilege escalation prompt/confirmation lines first.
|
||||
|
@ -388,8 +389,8 @@ class Connection(ConnectionBase):
|
|||
# an array, then checked and removed or copied to stdout or stderr. We
|
||||
# set any flags based on examining the output in self._flags.
|
||||
|
||||
stdout = stderr = ''
|
||||
tmp_stdout = tmp_stderr = ''
|
||||
b_stdout = b_stderr = b''
|
||||
b_tmp_stdout = b_tmp_stderr = b''
|
||||
|
||||
self._flags = dict(
|
||||
become_prompt=False, become_success=False,
|
||||
|
@ -423,43 +424,43 @@ class Connection(ConnectionBase):
|
|||
if p.poll() is not None:
|
||||
break
|
||||
self._terminate_process(p)
|
||||
raise AnsibleError('Timeout (%ds) waiting for privilege escalation prompt: %s' % (timeout, stdout))
|
||||
raise AnsibleError('Timeout (%ds) waiting for privilege escalation prompt: %s' % (timeout, to_str(b_stdout)))
|
||||
|
||||
# Read whatever output is available on stdout and stderr, and stop
|
||||
# listening to the pipe if it's been closed.
|
||||
|
||||
if p.stdout in rfd:
|
||||
chunk = p.stdout.read()
|
||||
if chunk == '':
|
||||
b_chunk = p.stdout.read()
|
||||
if b_chunk == b'':
|
||||
rpipes.remove(p.stdout)
|
||||
tmp_stdout += chunk
|
||||
display.debug("stdout chunk (state=%s):\n>>>%s<<<\n" % (state, chunk))
|
||||
b_tmp_stdout += b_chunk
|
||||
display.debug("stdout chunk (state=%s):\n>>>%s<<<\n" % (state, to_unicode(b_chunk, errors='replace')))
|
||||
|
||||
if p.stderr in rfd:
|
||||
chunk = p.stderr.read()
|
||||
if chunk == '':
|
||||
b_chunk = p.stderr.read()
|
||||
if b_chunk == b'':
|
||||
rpipes.remove(p.stderr)
|
||||
tmp_stderr += chunk
|
||||
display.debug("stderr chunk (state=%s):\n>>>%s<<<\n" % (state, chunk))
|
||||
b_tmp_stderr += b_chunk
|
||||
display.debug("stderr chunk (state=%s):\n>>>%s<<<\n" % (state, to_unicode(b_chunk, errors='replace')))
|
||||
|
||||
# We examine the output line-by-line until we have negotiated any
|
||||
# privilege escalation prompt and subsequent success/error message.
|
||||
# Afterwards, we can accumulate output without looking at it.
|
||||
|
||||
if state < states.index('ready_to_send'):
|
||||
if tmp_stdout:
|
||||
output, unprocessed = self._examine_output('stdout', states[state], tmp_stdout, sudoable)
|
||||
stdout += output
|
||||
tmp_stdout = unprocessed
|
||||
if b_tmp_stdout:
|
||||
b_output, b_unprocessed = self._examine_output('stdout', states[state], b_tmp_stdout, sudoable)
|
||||
b_stdout += b_output
|
||||
b_tmp_stdout = b_unprocessed
|
||||
|
||||
if tmp_stderr:
|
||||
output, unprocessed = self._examine_output('stderr', states[state], tmp_stderr, sudoable)
|
||||
stderr += output
|
||||
tmp_stderr = unprocessed
|
||||
if b_tmp_stderr:
|
||||
b_output, b_unprocessed = self._examine_output('stderr', states[state], b_tmp_stderr, sudoable)
|
||||
b_stderr += b_output
|
||||
b_tmp_stderr = b_unprocessed
|
||||
else:
|
||||
stdout += tmp_stdout
|
||||
stderr += tmp_stderr
|
||||
tmp_stdout = tmp_stderr = ''
|
||||
b_stdout += b_tmp_stdout
|
||||
b_stderr += b_tmp_stderr
|
||||
b_tmp_stdout = b_tmp_stderr = b''
|
||||
|
||||
# If we see a privilege escalation prompt, we send the password.
|
||||
# (If we're expecting a prompt but the escalation succeeds, we
|
||||
|
@ -468,7 +469,7 @@ class Connection(ConnectionBase):
|
|||
if states[state] == 'awaiting_prompt':
|
||||
if self._flags['become_prompt']:
|
||||
display.debug('Sending become_pass in response to prompt')
|
||||
stdin.write('{0}\n'.format(to_bytes(self._play_context.become_pass )))
|
||||
stdin.write(to_bytes(self._play_context.become_pass) + b'\n')
|
||||
self._flags['become_prompt'] = False
|
||||
state += 1
|
||||
elif self._flags['become_success']:
|
||||
|
@ -546,14 +547,14 @@ class Connection(ConnectionBase):
|
|||
if cmd[0] == b"sshpass" and p.returncode == 6:
|
||||
raise AnsibleError('Using a SSH password instead of a key is not possible because Host Key checking is enabled and sshpass does not support this. Please add this host\'s fingerprint to your known_hosts file to manage this host.')
|
||||
|
||||
controlpersisterror = 'Bad configuration option: ControlPersist' in stderr or 'unknown configuration option: ControlPersist' in stderr
|
||||
controlpersisterror = b'Bad configuration option: ControlPersist' in b_stderr or b'unknown configuration option: ControlPersist' in b_stderr
|
||||
if p.returncode != 0 and controlpersisterror:
|
||||
raise AnsibleError('using -c ssh on certain older ssh versions may not support ControlPersist, set ANSIBLE_SSH_ARGS="" (or ssh_args in [ssh_connection] section of the config file) before running again')
|
||||
|
||||
if p.returncode == 255 and in_data:
|
||||
raise AnsibleConnectionFailure('SSH Error: data could not be sent to the remote host. Make sure this host can be reached over ssh')
|
||||
|
||||
return (p.returncode, stdout, stderr)
|
||||
return (p.returncode, b_stdout, b_stderr)
|
||||
|
||||
def _exec_command(self, cmd, in_data=None, sudoable=True):
|
||||
''' run a command on the remote host '''
|
||||
|
|
|
@ -121,42 +121,42 @@ class TestConnectionBaseClass(unittest.TestCase):
|
|||
|
||||
mock_select.side_effect = _mock_select
|
||||
|
||||
mock_popen_res.stdout.read.side_effect = ["some data", ""]
|
||||
mock_popen_res.stderr.read.side_effect = [""]
|
||||
mock_popen_res.stdout.read.side_effect = [b"some data", b""]
|
||||
mock_popen_res.stderr.read.side_effect = [b""]
|
||||
conn._run("ssh", "this is input data")
|
||||
|
||||
# test with a password set to trigger the sshpass write
|
||||
pc.password = '12345'
|
||||
mock_popen_res.stdout.read.side_effect = ["some data", "", ""]
|
||||
mock_popen_res.stderr.read.side_effect = [""]
|
||||
mock_popen_res.stdout.read.side_effect = [b"some data", b"", b""]
|
||||
mock_popen_res.stderr.read.side_effect = [b""]
|
||||
conn._run(["ssh", "is", "a", "cmd"], "this is more data")
|
||||
|
||||
# test with password prompting enabled
|
||||
pc.password = None
|
||||
pc.prompt = True
|
||||
mock_popen_res.stdout.read.side_effect = ["some data", "", ""]
|
||||
mock_popen_res.stderr.read.side_effect = [""]
|
||||
mock_popen_res.stdout.read.side_effect = [b"some data", b"", b""]
|
||||
mock_popen_res.stderr.read.side_effect = [b""]
|
||||
conn._run("ssh", "this is input data")
|
||||
|
||||
# test with some become settings
|
||||
pc.prompt = False
|
||||
pc.become = True
|
||||
pc.success_key = 'BECOME-SUCCESS-abcdefg'
|
||||
mock_popen_res.stdout.read.side_effect = ["some data", "", ""]
|
||||
mock_popen_res.stderr.read.side_effect = [""]
|
||||
mock_popen_res.stdout.read.side_effect = [b"some data", b"", b""]
|
||||
mock_popen_res.stderr.read.side_effect = [b""]
|
||||
conn._run("ssh", "this is input data")
|
||||
|
||||
# simulate no data input
|
||||
mock_openpty.return_value = (98, 99)
|
||||
mock_popen_res.stdout.read.side_effect = ["some data", "", ""]
|
||||
mock_popen_res.stderr.read.side_effect = [""]
|
||||
mock_popen_res.stdout.read.side_effect = [b"some data", b"", b""]
|
||||
mock_popen_res.stderr.read.side_effect = [b""]
|
||||
conn._run("ssh", "")
|
||||
|
||||
# simulate no data input but Popen using new pty's fails
|
||||
mock_Popen.return_value = None
|
||||
mock_Popen.side_effect = [OSError(), mock_popen_res]
|
||||
mock_popen_res.stdout.read.side_effect = ["some data", "", ""]
|
||||
mock_popen_res.stderr.read.side_effect = [""]
|
||||
mock_popen_res.stdout.read.side_effect = [b"some data", b"", b""]
|
||||
mock_popen_res.stderr.read.side_effect = [b""]
|
||||
conn._run("ssh", "")
|
||||
|
||||
def test_plugins_connection_ssh__examine_output(self):
|
||||
|
@ -171,22 +171,22 @@ class TestConnectionBaseClass(unittest.TestCase):
|
|||
conn.check_missing_password = MagicMock()
|
||||
|
||||
def _check_password_prompt(line):
|
||||
if 'foo' in line:
|
||||
if b'foo' in line:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _check_become_success(line):
|
||||
if 'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz' in line:
|
||||
if b'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz' in line:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _check_incorrect_password(line):
|
||||
if 'incorrect password' in line:
|
||||
if b'incorrect password' in line:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _check_missing_password(line):
|
||||
if 'bad password' in line:
|
||||
if b'bad password' in line:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
@ -204,9 +204,9 @@ class TestConnectionBaseClass(unittest.TestCase):
|
|||
)
|
||||
|
||||
pc.prompt = True
|
||||
output, unprocessed = conn._examine_output('source', 'state', 'line 1\nline 2\nfoo\nline 3\nthis should be the remainder', False)
|
||||
self.assertEqual(output, 'line 1\nline 2\nline 3\n')
|
||||
self.assertEqual(unprocessed, 'this should be the remainder')
|
||||
output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\nfoo\nline 3\nthis should be the remainder', False)
|
||||
self.assertEqual(output, b'line 1\nline 2\nline 3\n')
|
||||
self.assertEqual(unprocessed, b'this should be the remainder')
|
||||
self.assertTrue(conn._flags['become_prompt'])
|
||||
self.assertFalse(conn._flags['become_success'])
|
||||
self.assertFalse(conn._flags['become_error'])
|
||||
|
@ -221,10 +221,10 @@ class TestConnectionBaseClass(unittest.TestCase):
|
|||
)
|
||||
|
||||
pc.prompt = False
|
||||
pc.success_key = 'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz'
|
||||
output, unprocessed = conn._examine_output('source', 'state', 'line 1\nline 2\nBECOME-SUCCESS-abcdefghijklmnopqrstuvxyz\nline 3\n', False)
|
||||
self.assertEqual(output, 'line 1\nline 2\nline 3\n')
|
||||
self.assertEqual(unprocessed, '')
|
||||
pc.success_key = u'BECOME-SUCCESS-abcdefghijklmnopqrstuvxyz'
|
||||
output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\nBECOME-SUCCESS-abcdefghijklmnopqrstuvxyz\nline 3\n', False)
|
||||
self.assertEqual(output, b'line 1\nline 2\nline 3\n')
|
||||
self.assertEqual(unprocessed, b'')
|
||||
self.assertFalse(conn._flags['become_prompt'])
|
||||
self.assertTrue(conn._flags['become_success'])
|
||||
self.assertFalse(conn._flags['become_error'])
|
||||
|
@ -240,9 +240,9 @@ class TestConnectionBaseClass(unittest.TestCase):
|
|||
|
||||
pc.prompt = False
|
||||
pc.success_key = None
|
||||
output, unprocessed = conn._examine_output('source', 'state', 'line 1\nline 2\nincorrect password\n', True)
|
||||
self.assertEqual(output, 'line 1\nline 2\nincorrect password\n')
|
||||
self.assertEqual(unprocessed, '')
|
||||
output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nline 2\nincorrect password\n', True)
|
||||
self.assertEqual(output, b'line 1\nline 2\nincorrect password\n')
|
||||
self.assertEqual(unprocessed, b'')
|
||||
self.assertFalse(conn._flags['become_prompt'])
|
||||
self.assertFalse(conn._flags['become_success'])
|
||||
self.assertTrue(conn._flags['become_error'])
|
||||
|
@ -258,9 +258,9 @@ class TestConnectionBaseClass(unittest.TestCase):
|
|||
|
||||
pc.prompt = False
|
||||
pc.success_key = None
|
||||
output, unprocessed = conn._examine_output('source', 'state', 'line 1\nbad password\n', True)
|
||||
self.assertEqual(output, 'line 1\nbad password\n')
|
||||
self.assertEqual(unprocessed, '')
|
||||
output, unprocessed = conn._examine_output(u'source', u'state', b'line 1\nbad password\n', True)
|
||||
self.assertEqual(output, b'line 1\nbad password\n')
|
||||
self.assertEqual(unprocessed, b'')
|
||||
self.assertFalse(conn._flags['become_prompt'])
|
||||
self.assertFalse(conn._flags['become_success'])
|
||||
self.assertFalse(conn._flags['become_error'])
|
||||
|
|
Loading…
Reference in a new issue