diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index ae90a29e93..d139e26acb 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -63,6 +63,7 @@ class TaskQueueManager: self._callbacks_loaded = False self._callback_plugins = [] self._start_at_done = False + self._result_prc = None # make sure the module path (if specified) is parsed and # added to the module_loader object @@ -92,17 +93,17 @@ class TaskQueueManager: # plugins for inter-process locking. self._connection_lockfile = tempfile.TemporaryFile() - #FIXME: should this move to 'run' and get serial and play pattern applied as limiter? - # Treat "forks" config parameter as max value. Only create number of workers - # equal to number of hosts in inventory if less than max value. - num_workers = min(self._options.forks, len(self._inventory.list_hosts())) - self._workers = [] - for i in range(num_workers): + + def _initialize_workers(self, num): + if num <= 0: + num = 1 + + for i in range(num): main_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue() - prc = WorkerProcess(self, main_q, rslt_q, loader) + prc = WorkerProcess(self, main_q, rslt_q, self._loader) prc.start() self._workers.append((prc, main_q, rslt_q)) @@ -178,6 +179,11 @@ class TaskQueueManager: are done with the current task). ''' + #FIXME: should this move to 'run' and get serial and play pattern applied as limiter? + # Treat "forks" config parameter as max value. Only create number of workers + # equal to number of hosts in inventory if less than max value. + self._initialize_workers(min( [ v for v in [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] if v is not None] )) + if not self._callbacks_loaded: self.load_callbacks() @@ -227,12 +233,14 @@ class TaskQueueManager: self.terminate() self._final_q.close() - self._result_prc.terminate() - for (worker_prc, main_q, rslt_q) in self._workers: - rslt_q.close() - main_q.close() - worker_prc.terminate() + if self._result_prc: + self._result_prc.terminate() + + for (worker_prc, main_q, rslt_q) in self._workers: + rslt_q.close() + main_q.close() + worker_prc.terminate() def clear_failed_hosts(self): self._failed_hosts = dict()