From bd7a71bb29d0fc26ed6fbae802018d06ddefc346 Mon Sep 17 00:00:00 2001 From: Seth Vidal Date: Mon, 27 Feb 2012 00:43:02 -0500 Subject: [PATCH] implement manual multiprocessing pools for the runner. this fixes the ctrl-c not-working problem. implemented this solution: http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/#georges also add hosts which do not get a chance to return results to the 'dark' results. --- lib/ansible/runner.py | 56 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/lib/ansible/runner.py b/lib/ansible/runner.py index 1dd3b65f13..f3a49b696d 100755 --- a/lib/ansible/runner.py +++ b/lib/ansible/runner.py @@ -21,16 +21,23 @@ import fnmatch import multiprocessing +import signal import os import json import traceback import paramiko # non-core dependency import ansible.constants as C -def _executor_hook(x): +def _executor_hook(job_queue, result_queue): ''' callback used by multiprocessing pool ''' - (runner, host) = x - return runner._executor(host) + signal.signal(signal.SIGINT, signal.SIG_IGN) + while not job_queue.empty(): + try: + job = job_queue.get(block=False) + runner, host = job + result_queue.put(runner._executor(host)) + except Queue.Empty: + pass class Runner(object): @@ -288,7 +295,7 @@ class Runner(object): def run(self): ''' xfer & run module on all matched hosts ''' - + # find hosts that match the pattern hosts = self.match_hosts(self.pattern) @@ -296,10 +303,34 @@ class Runner(object): # _executor_hook does all of the work hosts = [ (self,x) for x in hosts ] if self.forks > 1: - pool = multiprocessing.Pool(self.forks) - results = pool.map(_executor_hook, hosts) + job_queue = multiprocessing.Queue() + result_queue = multiprocessing.Queue() + + for i in hosts: + job_queue.put(i) + + workers = [] + for i in range(self.forks): + tmp = multiprocessing.Process(target=_executor_hook, + args=(job_queue, result_queue)) + tmp.start() + workers.append(tmp) + + try: + for worker in workers: + worker.join() + except KeyboardInterrupt: + print 'parent received ctrl-c' + for worker in workers: + worker.terminate() + worker.join() + + results = [] + while not result_queue.empty(): + results.append(result_queue.get(block=False)) + else: - results = [ _executor_hook(x) for x in hosts ] + results = [ x._executor(h) for (x,h) in hosts ] # sort hosts by ones we successfully contacted # and ones we did not so that we can return a @@ -309,13 +340,22 @@ class Runner(object): "contacted" : {}, "dark" : {} } + hosts_with_results = [] for x in results: (host, is_ok, result) = x + hosts_with_results.append(host) if not is_ok: results2["dark"][host] = result else: results2["contacted"][host] = result - + # hosts which were contacted but never got a chance + # to return a result before we exited/ctrl-c'd + # perhaps these shouldn't be 'dark' but I'm not sure if they fit + # anywhere else. + for host in self.match_hosts(self.pattern): + if host not in hosts_with_results: + results2["dark"][host] = {} + return results2