From b44f1428d04e682c300ae8a743a264bc8b53b8e6 Mon Sep 17 00:00:00 2001 From: Brian Coca Date: Wed, 4 Nov 2015 15:16:20 -0500 Subject: [PATCH 1/2] use minimal number of forks taking serial, forks and play patterns into account --- lib/ansible/executor/task_queue_manager.py | 32 ++++++++++++++-------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index ae90a29e93..d139e26acb 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -63,6 +63,7 @@ class TaskQueueManager: self._callbacks_loaded = False self._callback_plugins = [] self._start_at_done = False + self._result_prc = None # make sure the module path (if specified) is parsed and # added to the module_loader object @@ -92,17 +93,17 @@ class TaskQueueManager: # plugins for inter-process locking. self._connection_lockfile = tempfile.TemporaryFile() - #FIXME: should this move to 'run' and get serial and play pattern applied as limiter? - # Treat "forks" config parameter as max value. Only create number of workers - # equal to number of hosts in inventory if less than max value. - num_workers = min(self._options.forks, len(self._inventory.list_hosts())) - self._workers = [] - for i in range(num_workers): + + def _initialize_workers(self, num): + if num <= 0: + num = 1 + + for i in range(num): main_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue() - prc = WorkerProcess(self, main_q, rslt_q, loader) + prc = WorkerProcess(self, main_q, rslt_q, self._loader) prc.start() self._workers.append((prc, main_q, rslt_q)) @@ -178,6 +179,11 @@ class TaskQueueManager: are done with the current task). ''' + #FIXME: should this move to 'run' and get serial and play pattern applied as limiter? + # Treat "forks" config parameter as max value. Only create number of workers + # equal to number of hosts in inventory if less than max value. + self._initialize_workers(min( [ v for v in [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] if v is not None] )) + if not self._callbacks_loaded: self.load_callbacks() @@ -227,12 +233,14 @@ class TaskQueueManager: self.terminate() self._final_q.close() - self._result_prc.terminate() - for (worker_prc, main_q, rslt_q) in self._workers: - rslt_q.close() - main_q.close() - worker_prc.terminate() + if self._result_prc: + self._result_prc.terminate() + + for (worker_prc, main_q, rslt_q) in self._workers: + rslt_q.close() + main_q.close() + worker_prc.terminate() def clear_failed_hosts(self): self._failed_hosts = dict() From 931eb16e55b364878973f8e891cd021462aaa74e Mon Sep 17 00:00:00 2001 From: Brian Coca Date: Wed, 4 Nov 2015 17:28:08 -0500 Subject: [PATCH 2/2] clearer selection of min value cleanup workers at each run to avoid stragglers --- lib/ansible/executor/task_queue_manager.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index d139e26acb..b027d487ea 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -96,9 +96,6 @@ class TaskQueueManager: self._workers = [] def _initialize_workers(self, num): - if num <= 0: - num = 1 - for i in range(num): main_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue() @@ -179,10 +176,11 @@ class TaskQueueManager: are done with the current task). ''' - #FIXME: should this move to 'run' and get serial and play pattern applied as limiter? # Treat "forks" config parameter as max value. Only create number of workers # equal to number of hosts in inventory if less than max value. - self._initialize_workers(min( [ v for v in [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] if v is not None] )) + contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] + contenders = [ v for v in contenders if v is not None and v > 0 ] + self._initialize_workers(min( contenders )) if not self._callbacks_loaded: self.load_callbacks() @@ -224,16 +222,18 @@ class TaskQueueManager: if getattr(self._options, 'start_at_task', None) is not None and play_context.start_at_task is None: self._start_at_done = True - # and run the play using the strategy - return strategy.run(iterator, play_context) + # and run the play using the strategy and cleanup on way out + play_return = strategy.run(iterator, play_context) + self._cleanup_workers() + return play_return def cleanup(self): self._display.debug("RUNNING CLEANUP") - self.terminate() - self._final_q.close() + self._cleanup_workers() + def _cleanup_workers(self): if self._result_prc: self._result_prc.terminate()