1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

implement manual multiprocessing pools for the runner.

this fixes the ctrl-c not-working problem.

implemented this solution: http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/#georges

also add hosts which do not get a chance to return results to the 'dark' results.
This commit is contained in:
Seth Vidal 2012-02-27 00:43:02 -05:00
parent 1a20b00d1f
commit bd7a71bb29

View file

@ -21,16 +21,23 @@
import fnmatch import fnmatch
import multiprocessing import multiprocessing
import signal
import os import os
import json import json
import traceback import traceback
import paramiko # non-core dependency import paramiko # non-core dependency
import ansible.constants as C import ansible.constants as C
def _executor_hook(x): def _executor_hook(job_queue, result_queue):
''' callback used by multiprocessing pool ''' ''' callback used by multiprocessing pool '''
(runner, host) = x signal.signal(signal.SIGINT, signal.SIG_IGN)
return runner._executor(host) while not job_queue.empty():
try:
job = job_queue.get(block=False)
runner, host = job
result_queue.put(runner._executor(host))
except Queue.Empty:
pass
class Runner(object): class Runner(object):
@ -288,7 +295,7 @@ class Runner(object):
def run(self): def run(self):
''' xfer & run module on all matched hosts ''' ''' xfer & run module on all matched hosts '''
# find hosts that match the pattern # find hosts that match the pattern
hosts = self.match_hosts(self.pattern) hosts = self.match_hosts(self.pattern)
@ -296,10 +303,34 @@ class Runner(object):
# _executor_hook does all of the work # _executor_hook does all of the work
hosts = [ (self,x) for x in hosts ] hosts = [ (self,x) for x in hosts ]
if self.forks > 1: if self.forks > 1:
pool = multiprocessing.Pool(self.forks) job_queue = multiprocessing.Queue()
results = pool.map(_executor_hook, hosts) result_queue = multiprocessing.Queue()
for i in hosts:
job_queue.put(i)
workers = []
for i in range(self.forks):
tmp = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue))
tmp.start()
workers.append(tmp)
try:
for worker in workers:
worker.join()
except KeyboardInterrupt:
print 'parent received ctrl-c'
for worker in workers:
worker.terminate()
worker.join()
results = []
while not result_queue.empty():
results.append(result_queue.get(block=False))
else: else:
results = [ _executor_hook(x) for x in hosts ] results = [ x._executor(h) for (x,h) in hosts ]
# sort hosts by ones we successfully contacted # sort hosts by ones we successfully contacted
# and ones we did not so that we can return a # and ones we did not so that we can return a
@ -309,13 +340,22 @@ class Runner(object):
"contacted" : {}, "contacted" : {},
"dark" : {} "dark" : {}
} }
hosts_with_results = []
for x in results: for x in results:
(host, is_ok, result) = x (host, is_ok, result) = x
hosts_with_results.append(host)
if not is_ok: if not is_ok:
results2["dark"][host] = result results2["dark"][host] = result
else: else:
results2["contacted"][host] = result results2["contacted"][host] = result
# hosts which were contacted but never got a chance
# to return a result before we exited/ctrl-c'd
# perhaps these shouldn't be 'dark' but I'm not sure if they fit
# anywhere else.
for host in self.match_hosts(self.pattern):
if host not in hosts_with_results:
results2["dark"][host] = {}
return results2 return results2