mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
3fba006207
This will keep us from accidentally catching program-exiting exceptions like KeyboardInterupt and SystemExit.
168 lines
6 KiB
Python
168 lines
6 KiB
Python
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
|
|
#
|
|
# This file is part of Ansible
|
|
#
|
|
# Ansible is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Ansible is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
# Make coding more python3-ish
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
import multiprocessing
|
|
import os
|
|
import sys
|
|
import traceback
|
|
|
|
from jinja2.exceptions import TemplateNotFound
|
|
|
|
HAS_PYCRYPTO_ATFORK = False
|
|
try:
|
|
from Crypto.Random import atfork
|
|
HAS_PYCRYPTO_ATFORK = True
|
|
except Exception:
|
|
# We only need to call atfork if pycrypto is used because it will need to
|
|
# reinitialize its RNG. Since old paramiko could be using pycrypto, we
|
|
# need to take charge of calling it.
|
|
pass
|
|
|
|
from ansible.errors import AnsibleConnectionFailure
|
|
from ansible.executor.task_executor import TaskExecutor
|
|
from ansible.executor.task_result import TaskResult
|
|
from ansible.module_utils._text import to_text
|
|
from ansible.utils.display import Display
|
|
|
|
__all__ = ['WorkerProcess']
|
|
|
|
display = Display()
|
|
|
|
|
|
class WorkerProcess(multiprocessing.Process):
|
|
'''
|
|
The worker thread class, which uses TaskExecutor to run tasks
|
|
read from a job queue and pushes results into a results queue
|
|
for reading later.
|
|
'''
|
|
|
|
def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj):
|
|
|
|
super(WorkerProcess, self).__init__()
|
|
# takes a task queue manager as the sole param:
|
|
self._final_q = final_q
|
|
self._task_vars = task_vars
|
|
self._host = host
|
|
self._task = task
|
|
self._play_context = play_context
|
|
self._loader = loader
|
|
self._variable_manager = variable_manager
|
|
self._shared_loader_obj = shared_loader_obj
|
|
|
|
if sys.stdin.isatty():
|
|
# dupe stdin, if we have one
|
|
self._new_stdin = sys.stdin
|
|
try:
|
|
fileno = sys.stdin.fileno()
|
|
if fileno is not None:
|
|
try:
|
|
self._new_stdin = os.fdopen(os.dup(fileno))
|
|
except OSError:
|
|
# couldn't dupe stdin, most likely because it's
|
|
# not a valid file descriptor, so we just rely on
|
|
# using the one that was passed in
|
|
pass
|
|
except (AttributeError, ValueError):
|
|
# couldn't get stdin's fileno, so we just carry on
|
|
pass
|
|
else:
|
|
# set to /dev/null
|
|
self._new_stdin = os.devnull
|
|
|
|
def run(self):
|
|
'''
|
|
Called when the process is started. Pushes the result onto the
|
|
results queue. We also remove the host from the blocked hosts list, to
|
|
signify that they are ready for their next task.
|
|
'''
|
|
|
|
# import cProfile, pstats, StringIO
|
|
# pr = cProfile.Profile()
|
|
# pr.enable()
|
|
|
|
if HAS_PYCRYPTO_ATFORK:
|
|
atfork()
|
|
|
|
try:
|
|
# execute the task and build a TaskResult from the result
|
|
display.debug("running TaskExecutor() for %s/%s" % (self._host, self._task))
|
|
executor_result = TaskExecutor(
|
|
self._host,
|
|
self._task,
|
|
self._task_vars,
|
|
self._play_context,
|
|
self._new_stdin,
|
|
self._loader,
|
|
self._shared_loader_obj,
|
|
self._final_q
|
|
).run()
|
|
|
|
display.debug("done running TaskExecutor() for %s/%s [%s]" % (self._host, self._task, self._task._uuid))
|
|
self._host.vars = dict()
|
|
self._host.groups = []
|
|
task_result = TaskResult(
|
|
self._host.name,
|
|
self._task._uuid,
|
|
executor_result,
|
|
task_fields=self._task.dump_attrs(),
|
|
)
|
|
|
|
# put the result on the result queue
|
|
display.debug("sending task result for task %s" % self._task._uuid)
|
|
self._final_q.put(task_result)
|
|
display.debug("done sending task result for task %s" % self._task._uuid)
|
|
|
|
except AnsibleConnectionFailure:
|
|
self._host.vars = dict()
|
|
self._host.groups = []
|
|
task_result = TaskResult(
|
|
self._host.name,
|
|
self._task._uuid,
|
|
dict(unreachable=True),
|
|
task_fields=self._task.dump_attrs(),
|
|
)
|
|
self._final_q.put(task_result, block=False)
|
|
|
|
except Exception as e:
|
|
if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound):
|
|
try:
|
|
self._host.vars = dict()
|
|
self._host.groups = []
|
|
task_result = TaskResult(
|
|
self._host.name,
|
|
self._task._uuid,
|
|
dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''),
|
|
task_fields=self._task.dump_attrs(),
|
|
)
|
|
self._final_q.put(task_result, block=False)
|
|
except Exception:
|
|
display.debug(u"WORKER EXCEPTION: %s" % to_text(e))
|
|
display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc()))
|
|
|
|
display.debug("WORKER PROCESS EXITING")
|
|
|
|
# pr.disable()
|
|
# s = StringIO.StringIO()
|
|
# sortby = 'time'
|
|
# ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
|
|
# ps.print_stats()
|
|
# with open('worker_%06d.stats' % os.getpid(), 'w') as f:
|
|
# f.write(s.getvalue())
|