mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Merge pull request #13038 from bcoca/min_forks_to_play
use minimal number of forks
This commit is contained in:
commit
9753de35a7
1 changed files with 24 additions and 16 deletions
|
@ -63,6 +63,7 @@ class TaskQueueManager:
|
|||
self._callbacks_loaded = False
|
||||
self._callback_plugins = []
|
||||
self._start_at_done = False
|
||||
self._result_prc = None
|
||||
|
||||
# make sure the module path (if specified) is parsed and
|
||||
# added to the module_loader object
|
||||
|
@ -92,17 +93,14 @@ class TaskQueueManager:
|
|||
# plugins for inter-process locking.
|
||||
self._connection_lockfile = tempfile.TemporaryFile()
|
||||
|
||||
#FIXME: should this move to 'run' and get serial and play pattern applied as limiter?
|
||||
# Treat "forks" config parameter as max value. Only create number of workers
|
||||
# equal to number of hosts in inventory if less than max value.
|
||||
num_workers = min(self._options.forks, len(self._inventory.list_hosts()))
|
||||
|
||||
self._workers = []
|
||||
for i in range(num_workers):
|
||||
|
||||
def _initialize_workers(self, num):
|
||||
for i in range(num):
|
||||
main_q = multiprocessing.Queue()
|
||||
rslt_q = multiprocessing.Queue()
|
||||
|
||||
prc = WorkerProcess(self, main_q, rslt_q, loader)
|
||||
prc = WorkerProcess(self, main_q, rslt_q, self._loader)
|
||||
prc.start()
|
||||
|
||||
self._workers.append((prc, main_q, rslt_q))
|
||||
|
@ -178,6 +176,12 @@ class TaskQueueManager:
|
|||
are done with the current task).
|
||||
'''
|
||||
|
||||
# Treat "forks" config parameter as max value. Only create number of workers
|
||||
# equal to number of hosts in inventory if less than max value.
|
||||
contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))]
|
||||
contenders = [ v for v in contenders if v is not None and v > 0 ]
|
||||
self._initialize_workers(min( contenders ))
|
||||
|
||||
if not self._callbacks_loaded:
|
||||
self.load_callbacks()
|
||||
|
||||
|
@ -218,21 +222,25 @@ class TaskQueueManager:
|
|||
if getattr(self._options, 'start_at_task', None) is not None and play_context.start_at_task is None:
|
||||
self._start_at_done = True
|
||||
|
||||
# and run the play using the strategy
|
||||
return strategy.run(iterator, play_context)
|
||||
# and run the play using the strategy and cleanup on way out
|
||||
play_return = strategy.run(iterator, play_context)
|
||||
self._cleanup_workers()
|
||||
return play_return
|
||||
|
||||
def cleanup(self):
|
||||
self._display.debug("RUNNING CLEANUP")
|
||||
|
||||
self.terminate()
|
||||
|
||||
self._final_q.close()
|
||||
self._result_prc.terminate()
|
||||
self._cleanup_workers()
|
||||
|
||||
for (worker_prc, main_q, rslt_q) in self._workers:
|
||||
rslt_q.close()
|
||||
main_q.close()
|
||||
worker_prc.terminate()
|
||||
def _cleanup_workers(self):
|
||||
if self._result_prc:
|
||||
self._result_prc.terminate()
|
||||
|
||||
for (worker_prc, main_q, rslt_q) in self._workers:
|
||||
rslt_q.close()
|
||||
main_q.close()
|
||||
worker_prc.terminate()
|
||||
|
||||
def clear_failed_hosts(self):
|
||||
self._failed_hosts = dict()
|
||||
|
|
Loading…
Reference in a new issue