diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index d139e26acb..b027d487ea 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -96,9 +96,6 @@ class TaskQueueManager: self._workers = [] def _initialize_workers(self, num): - if num <= 0: - num = 1 - for i in range(num): main_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue() @@ -179,10 +176,11 @@ class TaskQueueManager: are done with the current task). ''' - #FIXME: should this move to 'run' and get serial and play pattern applied as limiter? # Treat "forks" config parameter as max value. Only create number of workers # equal to number of hosts in inventory if less than max value. - self._initialize_workers(min( [ v for v in [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] if v is not None] )) + contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] + contenders = [ v for v in contenders if v is not None and v > 0 ] + self._initialize_workers(min( contenders )) if not self._callbacks_loaded: self.load_callbacks() @@ -224,16 +222,18 @@ class TaskQueueManager: if getattr(self._options, 'start_at_task', None) is not None and play_context.start_at_task is None: self._start_at_done = True - # and run the play using the strategy - return strategy.run(iterator, play_context) + # and run the play using the strategy and cleanup on way out + play_return = strategy.run(iterator, play_context) + self._cleanup_workers() + return play_return def cleanup(self): self._display.debug("RUNNING CLEANUP") - self.terminate() - self._final_q.close() + self._cleanup_workers() + def _cleanup_workers(self): if self._result_prc: self._result_prc.terminate()