1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

This patch makes Ansible reuse fork allocation between seperate instantations of the runner API, therefore the overhead of recreating forks

between tasks in a playbook is avoided.  The fork pool will be regenerated when a second play comes along and needs more hosts.
This commit is contained in:
Michael DeHaan 2014-02-07 16:52:17 -05:00
parent 66967bde14
commit 85d66b9a0c

View file

@ -49,6 +49,7 @@ from ansible.module_common import ModuleReplacer
module_replacer = ModuleReplacer(strip_comments=False) module_replacer = ModuleReplacer(strip_comments=False)
NEED_ATFORK=False
HAS_ATFORK=True HAS_ATFORK=True
try: try:
from Crypto.Random import atfork from Crypto.Random import atfork
@ -60,30 +61,28 @@ multiprocessing_runner = None
OUTPUT_LOCKFILE = tempfile.TemporaryFile() OUTPUT_LOCKFILE = tempfile.TemporaryFile()
PROCESS_LOCKFILE = tempfile.TemporaryFile() PROCESS_LOCKFILE = tempfile.TemporaryFile()
from foon import Foon
FOON = Foon()
################################################ ################################################
def _executor_hook(job_queue, result_queue, new_stdin): class KeyboardInterruptError(Exception):
pass
def _executor_hook(params):
(host, my_stdin) = params
# attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17 # attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17
# this function also not present in CentOS 6 # this function also not present in CentOS 6
if HAS_ATFORK: if HAS_ATFORK and NEED_ATFORK:
atfork() atfork()
signal.signal(signal.SIGINT, signal.SIG_IGN) try:
while not job_queue.empty(): return multiprocessing_runner._executor(host, my_stdin)
try: except KeyboardInterrupt:
host = job_queue.get(block=False) raise KeyboardInterruptError()
return_data = multiprocessing_runner._executor(host, new_stdin)
result_queue.put(return_data)
if 'LEGACY_TEMPLATE_WARNING' in return_data.flags:
# pass data back up across the multiprocessing fork boundary
template.Flags.LEGACY_TEMPLATE_WARNING = True
except Queue.Empty:
pass
except:
traceback.print_exc()
class HostVars(dict): class HostVars(dict):
''' A special view of setup_cache that adds values from the inventory when needed. ''' ''' A special view of setup_cache that adds values from the inventory when needed. '''
@ -209,6 +208,9 @@ class Runner(object):
else: else:
self.transport = "ssh" self.transport = "ssh"
if self.transport == "paramiko":
global NEED_ATFORK
NEED_ATFORK=True
# misc housekeeping # misc housekeeping
if subset and self.inventory._subset is None: if subset and self.inventory._subset is None:
@ -1056,39 +1058,11 @@ class Runner(object):
# ***************************************************** # *****************************************************
def _parallel_exec(self, hosts): def _parallel_exec(self, hosts):
''' handles mulitprocessing when more than 1 fork is required ''' ''' handles mulitprocessing when more than 1 fork is required '''
manager = multiprocessing.Manager() FOON.set_size(self.forks)
job_queue = manager.Queue() return FOON.map(_executor_hook, hosts)
for host in hosts:
job_queue.put(host)
result_queue = manager.Queue()
workers = []
for i in range(self.forks):
new_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
prc = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue, new_stdin))
prc.start()
workers.append(prc)
try:
for worker in workers:
worker.join()
except KeyboardInterrupt:
for worker in workers:
worker.terminate()
worker.join()
results = []
try:
while not result_queue.empty():
results.append(result_queue.get(block=False))
except socket.error:
raise errors.AnsibleError("<interrupted>")
return results
# ***************************************************** # *****************************************************