1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Revert "This patch makes Ansible reuse fork allocation between seperate instantations of the runner API, therefore the overhead of recreating forks"

This reverts commit 85d66b9a0c.
This commit is contained in:
Michael DeHaan 2014-02-07 18:10:38 -05:00
parent c53538dc77
commit 6685b4989e

View file

@ -49,7 +49,6 @@ from ansible.module_common import ModuleReplacer
module_replacer = ModuleReplacer(strip_comments=False) module_replacer = ModuleReplacer(strip_comments=False)
NEED_ATFORK=False
HAS_ATFORK=True HAS_ATFORK=True
try: try:
from Crypto.Random import atfork from Crypto.Random import atfork
@ -61,28 +60,30 @@ multiprocessing_runner = None
OUTPUT_LOCKFILE = tempfile.TemporaryFile() OUTPUT_LOCKFILE = tempfile.TemporaryFile()
PROCESS_LOCKFILE = tempfile.TemporaryFile() PROCESS_LOCKFILE = tempfile.TemporaryFile()
from foon import Foon
FOON = Foon()
################################################ ################################################
class KeyboardInterruptError(Exception): def _executor_hook(job_queue, result_queue, new_stdin):
pass
def _executor_hook(params):
(host, my_stdin) = params
# attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17 # attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17
# this function also not present in CentOS 6 # this function also not present in CentOS 6
if HAS_ATFORK and NEED_ATFORK: if HAS_ATFORK:
atfork() atfork()
signal.signal(signal.SIGINT, signal.SIG_IGN)
while not job_queue.empty():
try: try:
return multiprocessing_runner._executor(host, my_stdin) host = job_queue.get(block=False)
except KeyboardInterrupt: return_data = multiprocessing_runner._executor(host, new_stdin)
raise KeyboardInterruptError() result_queue.put(return_data)
if 'LEGACY_TEMPLATE_WARNING' in return_data.flags:
# pass data back up across the multiprocessing fork boundary
template.Flags.LEGACY_TEMPLATE_WARNING = True
except Queue.Empty:
pass
except:
traceback.print_exc()
class HostVars(dict): class HostVars(dict):
''' A special view of setup_cache that adds values from the inventory when needed. ''' ''' A special view of setup_cache that adds values from the inventory when needed. '''
@ -208,9 +209,6 @@ class Runner(object):
else: else:
self.transport = "ssh" self.transport = "ssh"
if self.transport == "paramiko":
global NEED_ATFORK
NEED_ATFORK=True
# misc housekeeping # misc housekeeping
if subset and self.inventory._subset is None: if subset and self.inventory._subset is None:
@ -1058,11 +1056,39 @@ class Runner(object):
# ***************************************************** # *****************************************************
def _parallel_exec(self, hosts): def _parallel_exec(self, hosts):
''' handles mulitprocessing when more than 1 fork is required ''' ''' handles mulitprocessing when more than 1 fork is required '''
FOON.set_size(self.forks) manager = multiprocessing.Manager()
return FOON.map(_executor_hook, hosts) job_queue = manager.Queue()
for host in hosts:
job_queue.put(host)
result_queue = manager.Queue()
workers = []
for i in range(self.forks):
new_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
prc = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue, new_stdin))
prc.start()
workers.append(prc)
try:
for worker in workers:
worker.join()
except KeyboardInterrupt:
for worker in workers:
worker.terminate()
worker.join()
results = []
try:
while not result_queue.empty():
results.append(result_queue.get(block=False))
except socket.error:
raise errors.AnsibleError("<interrupted>")
return results
# ***************************************************** # *****************************************************