1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Allow persistent connection plugins to queue messages back to ansible-connection (#49977)

* Connections can queue messages to be returned from ansible-connection

* Provide fallback for invalid display level

* Strip display from plugins

* Route messages through helper method to try to avoid improper appends
This commit is contained in:
Nathaniel Case 2018-12-19 10:54:42 -05:00 committed by GitHub
parent 49993a55e5
commit 1829a72885
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 75 additions and 83 deletions

View file

@ -90,7 +90,7 @@ class ConnectionProcess(object):
messages = list()
result = {}
messages.append('control socket path is %s' % self.socket_path)
messages.append(('vvvv', 'control socket path is %s' % self.socket_path))
# If this is a relative path (~ gets expanded later) then plug the
# key's path on to the directory we originally came from, so we can
@ -100,17 +100,18 @@ class ConnectionProcess(object):
self.connection = connection_loader.get(self.play_context.connection, self.play_context, '/dev/null',
ansible_playbook_pid=self._ansible_playbook_pid)
self.connection.set_options(var_options=variables)
self.connection._connect()
self.connection._socket_path = self.socket_path
self.srv.register(self.connection)
messages.extend(sys.stdout.getvalue().splitlines())
messages.append('connection to remote device started successfully')
messages.extend([('vvvv', msg) for msg in sys.stdout.getvalue().splitlines()])
messages.append(('vvvv', 'connection to remote device started successfully'))
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.bind(self.socket_path)
self.sock.listen(1)
messages.append('local domain socket listeners started successfully')
messages.append(('vvvv', 'local domain socket listeners started successfully'))
except Exception as exc:
result['error'] = to_text(exc)
result['exception'] = traceback.format_exc()
@ -256,7 +257,7 @@ def main():
with file_lock(lock_path):
if not os.path.exists(socket_path):
messages.append('local domain socket does not exist, starting it')
messages.append(('vvvv', 'local domain socket does not exist, starting it'))
original_path = os.getcwd()
r, w = os.pipe()
pid = fork_process()
@ -268,7 +269,7 @@ def main():
process = ConnectionProcess(wfd, play_context, socket_path, original_path, ansible_playbook_pid)
process.start(variables)
except Exception:
messages.append(traceback.format_exc())
messages.append(('error', traceback.format_exc()))
rc = 1
if rc == 0:
@ -286,12 +287,12 @@ def main():
result.update(data)
else:
messages.append('found existing local domain socket, using it!')
messages.append(('vvvv', 'found existing local domain socket, using it!'))
conn = Connection(socket_path)
conn.set_options(var_options=variables)
pc_data = to_text(init_data)
try:
messages.extend(conn.update_play_context(pc_data))
conn.update_play_context(pc_data)
except Exception as exc:
# Only network_cli has update_play context, so missing this is
# not fatal e.g. netconf
@ -303,7 +304,8 @@ def main():
'exception': traceback.format_exc()
})
messages.append(sys.stdout.getvalue())
messages.extend(Connection(socket_path).pop_messages())
messages.append(('vvvv', sys.stdout.getvalue()))
result.update({
'messages': messages,
'socket_path': socket_path

View file

@ -1027,8 +1027,16 @@ class TaskExecutor:
result = {'error': to_text(stderr, errors='surrogate_then_replace')}
if 'messages' in result:
for msg in result.get('messages'):
display.vvvv('%s' % msg, host=self._play_context.remote_addr)
for level, message in result['messages']:
if level == 'log':
display.display(message, log_only=True)
elif level in ('debug', 'v', 'vv', 'vvv', 'vvvv', 'vvvvv', 'vvvvvv'):
getattr(display, level)(message, host=self._play_context.remote_addr)
else:
if hasattr(display, level):
getattr(display, level)(message)
else:
display.vvvv(message, host=self._play_context.remote_addr)
if 'error' in result:
if self._play_context.verbosity > 2:

View file

@ -25,7 +25,6 @@ from functools import wraps
from ansible.plugins import AnsiblePlugin
from ansible.errors import AnsibleError, AnsibleConnectionFailure
from ansible.module_utils._text import to_bytes, to_text
from ansible.utils.display import Display
try:
from scp import SCPClient
@ -33,8 +32,6 @@ try:
except ImportError:
HAS_SCP = False
display = Display()
def enable_mode(func):
@wraps(func)
@ -88,7 +85,7 @@ class CliconfBase(AnsiblePlugin):
def _alarm_handler(self, signum, frame):
"""Alarm handler raised in case of command timeout """
display.display('closing shell due to command timeout (%s seconds).' % self._connection._play_context.timeout, log_only=True)
self._connection.queue_message('log', 'closing shell due to command timeout (%s seconds).' % self._connection._play_context.timeout)
self.close()
def send_command(self, command=None, prompt=None, answer=None, sendonly=False, newline=True, prompt_retry_check=False, check_all=False):

View file

@ -27,9 +27,6 @@ from itertools import chain
from ansible.module_utils._text import to_bytes, to_text
from ansible.module_utils.network.common.utils import to_list
from ansible.plugins.cliconf import CliconfBase, enable_mode
from ansible.utils.display import Display
display = Display()
class Cliconf(CliconfBase):

View file

@ -291,6 +291,7 @@ class NetworkConnectionBase(ConnectionBase):
def __init__(self, play_context, new_stdin, *args, **kwargs):
super(NetworkConnectionBase, self).__init__(play_context, new_stdin, *args, **kwargs)
self._messages = []
self._network_os = self._play_context.network_os
@ -319,6 +320,20 @@ class NetworkConnectionBase(ConnectionBase):
def exec_command(self, cmd, in_data=None, sudoable=True):
return self._local.exec_command(cmd, in_data, sudoable)
def queue_message(self, level, message):
"""
Adds a message to the queue of messages waiting to be pushed back to the controller process.
:arg level: A string which can either be the name of a method in display, or 'log'. When
the messages are returned to task_executor, a value of log will correspond to
``display.display(message, log_only=True)``, while another value will call ``display.[level](message)``
"""
self._messages.append((level, message))
def pop_messages(self):
messages, self._messages = self._messages, []
return messages
def put_file(self, in_path, out_path):
"""Transfer a file from local to remote"""
return self._local.put_file(in_path, out_path)
@ -332,9 +347,9 @@ class NetworkConnectionBase(ConnectionBase):
Reset the connection
'''
if self._socket_path:
display.vvvv('resetting persistent connection for socket_path %s' % self._socket_path, host=self._play_context.remote_addr)
self.queue_message('vvvv', 'resetting persistent connection for socket_path %s' % self._socket_path)
self.close()
display.vvvv('reset call on connection instance', host=self._play_context.remote_addr)
self.queue_message('vvvv', 'reset call on connection instance')
def close(self):
if self._connected:

View file

@ -156,9 +156,6 @@ from ansible.module_utils.urls import open_url
from ansible.playbook.play_context import PlayContext
from ansible.plugins.loader import httpapi_loader
from ansible.plugins.connection import NetworkConnectionBase
from ansible.utils.display import Display
display = Display()
class Connection(NetworkConnectionBase):
@ -178,7 +175,7 @@ class Connection(NetworkConnectionBase):
self.httpapi = httpapi_loader.get(self._network_os, self)
if self.httpapi:
self._sub_plugin = {'type': 'httpapi', 'name': self._network_os, 'obj': self.httpapi}
display.vvvv('loaded API plugin for network_os %s' % self._network_os)
self.queue_message('vvvv', 'loaded API plugin for network_os %s' % self._network_os)
else:
raise AnsibleConnectionFailure('unable to load API plugin for network_os %s' % self._network_os)
@ -187,7 +184,7 @@ class Connection(NetworkConnectionBase):
'Unable to automatically determine host network os. Please '
'manually configure ansible_network_os value for this host'
)
display.display('network_os is set to %s' % self._network_os, log_only=True)
self.queue_message('log', 'network_os is set to %s' % self._network_os)
def update_play_context(self, pc_data):
"""Updates the play context information for the connection"""
@ -199,16 +196,15 @@ class Connection(NetworkConnectionBase):
play_context = PlayContext()
play_context.deserialize(pc_data)
messages = ['updating play_context for connection']
self.queue_message('vvvv', 'updating play_context for connection')
if self._play_context.become ^ play_context.become:
self.set_become(play_context)
if play_context.become is True:
messages.append('authorizing connection')
self.queue_message('vvvv', 'authorizing connection')
else:
messages.append('deauthorizing connection')
self.queue_message('vvvv', 'deauthorizing connection')
self._play_context = play_context
return messages
def _connect(self):
if not self.connected:
@ -228,7 +224,7 @@ class Connection(NetworkConnectionBase):
'''
# only close the connection if its connected.
if self._connected:
display.vvvv("closing http(s) connection to device", host=self._play_context.remote_addr)
self.queue_message('vvvv', "closing http(s) connection to device")
self.logout()
super(Connection, self).close()

View file

@ -131,7 +131,6 @@ options:
from ansible.errors import AnsibleConnectionFailure, AnsibleError
from ansible.plugins.connection import NetworkConnectionBase
from ansible.utils.display import Display
try:
from napalm import get_network_driver
@ -140,8 +139,6 @@ try:
except ImportError:
HAS_NAPALM = False
display = Display()
class Connection(NetworkConnectionBase):
"""Napalm connections"""
@ -168,7 +165,7 @@ class Connection(NetworkConnectionBase):
'Unable to automatically determine host network os. Please '
'manually configure ansible_network_os value for this host'
)
display.display('network_os is set to %s' % self._network_os, log_only=True)
self.queue_message('log', 'network_os is set to %s' % self._network_os)
try:
driver = get_network_driver(self._network_os)
@ -186,7 +183,7 @@ class Connection(NetworkConnectionBase):
self.napalm.open()
self._sub_plugin = {'type': 'external', 'name': 'napalm', 'obj': self.napalm}
display.vvvv('created napalm device for network_os %s' % self._network_os, host=host)
self.queue_message('vvvv', 'created napalm device for network_os %s' % self._network_os)
self._connected = True
def close(self):

View file

@ -181,7 +181,6 @@ from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.module_utils.parsing.convert_bool import BOOLEANS_TRUE, BOOLEANS_FALSE
from ansible.plugins.loader import netconf_loader
from ansible.plugins.connection import NetworkConnectionBase
from ansible.utils.display import Display
try:
from ncclient import manager
@ -192,8 +191,6 @@ try:
except ImportError:
HAS_NCCLIENT = False
display = Display()
logging.getLogger('ncclient').setLevel(logging.INFO)
NETWORK_OS_DEVICE_PARAM_MAP = {
@ -219,12 +216,12 @@ class Connection(NetworkConnectionBase):
netconf = netconf_loader.get(self._network_os, self)
if netconf:
self._sub_plugin = {'type': 'netconf', 'name': self._network_os, 'obj': netconf}
display.display('loaded netconf plugin for network_os %s' % self._network_os, log_only=True)
self.queue_message('log', 'loaded netconf plugin for network_os %s' % self._network_os)
else:
netconf = netconf_loader.get("default", self)
self._sub_plugin = {'type': 'netconf', 'name': 'default', 'obj': netconf}
display.display('unable to load netconf plugin for network_os %s, falling back to default plugin' % self._network_os)
display.display('network_os is set to %s' % self._network_os, log_only=True)
self.queue_message('display', 'unable to load netconf plugin for network_os %s, falling back to default plugin' % self._network_os)
self.queue_message('log', 'network_os is set to %s' % self._network_os)
self._manager = None
self.key_filename = None
@ -259,7 +256,7 @@ class Connection(NetworkConnectionBase):
'Please run pip install ncclient'
)
display.display('ssh connection done, starting ncclient', log_only=True)
self.queue_message('log', 'ssh connection done, starting ncclient')
allow_agent = True
if self._play_context.password is not None:
@ -274,7 +271,7 @@ class Connection(NetworkConnectionBase):
for cls in netconf_loader.all(class_only=True):
network_os = cls.guess_network_os(self)
if network_os:
display.display('discovered network_os %s' % network_os, log_only=True)
self.queue_message('log', 'discovered network_os %s' % network_os)
self._network_os = network_os
device_params = {'name': NETWORK_OS_DEVICE_PARAM_MAP.get(self._network_os) or self._network_os}
@ -307,7 +304,7 @@ class Connection(NetworkConnectionBase):
if not self._manager.connected:
return 1, b'', b'not connected'
display.display('ncclient manager object created successfully', log_only=True)
self.queue_message('log', 'ncclient manager object created successfully')
self._connected = True

View file

@ -192,9 +192,6 @@ from ansible.module_utils._text import to_bytes, to_text
from ansible.playbook.play_context import PlayContext
from ansible.plugins.connection import NetworkConnectionBase
from ansible.plugins.loader import cliconf_loader, terminal_loader, connection_loader
from ansible.utils.display import Display
display = Display()
class AnsibleCmdRespRecv(Exception):
@ -230,16 +227,16 @@ class Connection(NetworkConnectionBase):
self.cliconf = cliconf_loader.get(self._network_os, self)
if self.cliconf:
display.vvvv('loaded cliconf plugin for network_os %s' % self._network_os)
self.queue_message('vvvv', 'loaded cliconf plugin for network_os %s' % self._network_os)
self._sub_plugin = {'type': 'cliconf', 'name': self._network_os, 'obj': self.cliconf}
else:
display.vvvv('unable to load cliconf for network_os %s' % self._network_os)
self.queue_message('vvvv', 'unable to load cliconf for network_os %s' % self._network_os)
else:
raise AnsibleConnectionFailure(
'Unable to automatically determine host network os. Please '
'manually configure ansible_network_os value for this host'
)
display.display('network_os is set to %s' % self._network_os, log_only=True)
self.queue_message('log', 'network_os is set to %s' % self._network_os)
def _get_log_channel(self):
name = "p=%s u=%s | " % (os.getpid(), getpass.getuser())
@ -282,15 +279,15 @@ class Connection(NetworkConnectionBase):
play_context = PlayContext()
play_context.deserialize(pc_data)
messages = ['updating play_context for connection']
self.queue_message('vvvv', 'updating play_context for connection')
if self._play_context.become ^ play_context.become:
if play_context.become is True:
auth_pass = play_context.become_pass
self._terminal.on_become(passwd=auth_pass)
messages.append('authorizing connection')
self.queue_message('vvvv', 'authorizing connection')
else:
self._terminal.on_unbecome()
messages.append('deauthorizing connection')
self.queue_message('vvvv', 'deauthorizing connection')
self._play_context = play_context
@ -299,8 +296,6 @@ class Connection(NetworkConnectionBase):
if hasattr(self, 'disable_response_logging'):
self.disable_response_logging()
return messages
def _connect(self):
'''
Connects to the remote device and starts the terminal
@ -313,7 +308,7 @@ class Connection(NetworkConnectionBase):
ssh = self.paramiko_conn._connect()
host = self.get_option('host')
display.vvvv('ssh connection done, setting terminal', host=host)
self.queue_message('vvvv', 'ssh connection done, setting terminal')
self._ssh_shell = ssh.ssh.invoke_shell()
self._ssh_shell.settimeout(self.get_option('persistent_command_timeout'))
@ -322,20 +317,20 @@ class Connection(NetworkConnectionBase):
if not self._terminal:
raise AnsibleConnectionFailure('network os %s is not supported' % self._network_os)
display.vvvv('loaded terminal plugin for network_os %s' % self._network_os, host=host)
self.queue_message('vvvv', 'loaded terminal plugin for network_os %s' % self._network_os)
self.receive(prompts=self._terminal.terminal_initial_prompt, answer=self._terminal.terminal_initial_answer,
newline=self._terminal.terminal_inital_prompt_newline)
display.vvvv('firing event: on_open_shell()', host=host)
self.queue_message('vvvv', 'firing event: on_open_shell()')
self._terminal.on_open_shell()
if self._play_context.become and self._play_context.become_method == 'enable':
display.vvvv('firing event: on_become', host=host)
self.queue_message('vvvv', 'firing event: on_become')
auth_pass = self._play_context.become_pass
self._terminal.on_become(passwd=auth_pass)
display.vvvv('ssh connection has completed successfully', host=host)
self.queue_message('vvvv', 'ssh connection has completed successfully')
self._connected = True
return self
@ -346,17 +341,17 @@ class Connection(NetworkConnectionBase):
'''
# only close the connection if its connected.
if self._connected:
display.debug("closing ssh connection to device", host=self._play_context.remote_addr)
self.queue_message('debug', "closing ssh connection to device")
if self._ssh_shell:
display.debug("firing event: on_close_shell()")
self.queue_message('debug', "firing event: on_close_shell()")
self._terminal.on_close_shell()
self._ssh_shell.close()
self._ssh_shell = None
display.debug("cli session is now closed")
self.queue_message('debug', "cli session is now closed")
self.paramiko_conn.close()
self.paramiko_conn = None
display.debug("ssh connection has been closed successfully")
self.queue_message('debug', "ssh connection has been closed successfully")
super(Connection, self).close()
def receive(self, command=None, prompts=None, answer=None, newline=True, prompt_retry_check=False, check_all=False):
@ -451,19 +446,19 @@ class Connection(NetworkConnectionBase):
response = self.receive(command, prompt, answer, newline, prompt_retry_check, check_all)
return to_text(response, errors='surrogate_or_strict')
except (socket.timeout, AttributeError):
display.vvvv(traceback.format_exc(), host=self._play_context.remote_addr)
self.queue_message('error', traceback.format_exc())
raise AnsibleConnectionFailure("timeout value %s seconds reached while trying to send command: %s"
% (self._ssh_shell.gettimeout(), command.strip()))
def _handle_buffer_read_timeout(self, signum, frame):
display.vvvv("Response received, triggered 'persistent_buffer_read_timeout' timer of %s seconds"
% self.get_option('persistent_buffer_read_timeout'), host=self._play_context.remote_addr)
self.queue_message('vvvv', "Response received, triggered 'persistent_buffer_read_timeout' timer of %s seconds" %
self.get_option('persistent_buffer_read_timeout'))
raise AnsibleCmdRespRecv()
def _handle_command_timeout(self, signum, frame):
msg = 'command timeout triggered, timeout value is %s secs.\nSee the timeout setting options in the Network Debug and Troubleshooting Guide.'\
% self.get_option('persistent_command_timeout')
display.display(msg, log_only=True)
self.queue_message('log', msg)
raise AnsibleConnectionFailure(msg)
def _strip(self, data):

View file

@ -32,9 +32,6 @@ from ansible.module_utils._text import to_text
from ansible.module_utils.connection import ConnectionError
from ansible.module_utils.network.common.utils import to_list
from ansible.plugins.httpapi import HttpApiBase
from ansible.utils.display import Display
display = Display()
OPTIONS = {
@ -73,7 +70,7 @@ class HttpApi(HttpApiBase):
def send_request(self, data, **message_kwargs):
data = to_list(data)
if self._become:
display.vvvv('firing event: on_become')
self.connection.queue_message('vvvv', 'firing event: on_become')
data.insert(0, {"cmd": "enable", "input": self._become_pass})
output = message_kwargs.get('output', 'text')

View file

@ -60,7 +60,6 @@ from ansible.plugins.httpapi import HttpApiBase
from urllib3 import encode_multipart_formdata
from urllib3.fields import RequestField
from ansible.module_utils.connection import ConnectionError
from ansible.utils.display import Display
BASE_HEADERS = {
'Content-Type': 'application/json',
@ -70,8 +69,6 @@ BASE_HEADERS = {
TOKEN_EXPIRATION_STATUS_CODE = 408
UNAUTHORIZED_STATUS_CODE = 401
display = Display()
class HttpApi(HttpApiBase):
def __init__(self, connection):
@ -224,7 +221,7 @@ class HttpApi(HttpApiBase):
return None
def _display(self, http_method, title, msg=''):
display.vvvv('REST:%s:%s:%s\n%s' % (http_method, self.connection._url, title, msg))
self.connection.queue_message('vvvv', 'REST:%s:%s:%s\n%s' % (http_method, self.connection._url, title, msg))
@staticmethod
def _get_response_value(response_data):

View file

@ -22,9 +22,6 @@ from ansible.module_utils._text import to_text
from ansible.module_utils.connection import ConnectionError
from ansible.module_utils.network.common.utils import to_list
from ansible.plugins.httpapi import HttpApiBase
from ansible.utils.display import Display
display = Display()
OPTIONS = {
@ -75,7 +72,7 @@ class HttpApi(HttpApiBase):
def _run_queue(self, queue, output):
if self._become:
display.vvvv('firing event: on_become')
self.connection.queue_message('vvvv', 'firing event: on_become')
queue.insert(0, 'enable')
request = request_builder(queue, output)

View file

@ -27,7 +27,6 @@ from ansible.module_utils._text import to_text, to_bytes, to_native
from ansible.errors import AnsibleConnectionFailure, AnsibleError
from ansible.plugins.netconf import NetconfBase
from ansible.plugins.netconf import ensure_connected
from ansible.utils.display import Display
try:
from ncclient import manager
@ -37,8 +36,6 @@ try:
except ImportError:
raise AnsibleError("ncclient is not installed")
display = Display()
class Netconf(NetconfBase):