1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

use minimal number of forks taking serial, forks and play patterns into account

This commit is contained in:
Brian Coca 2015-11-04 15:16:20 -05:00
parent 3c35655b52
commit b44f1428d0

View file

@ -63,6 +63,7 @@ class TaskQueueManager:
self._callbacks_loaded = False self._callbacks_loaded = False
self._callback_plugins = [] self._callback_plugins = []
self._start_at_done = False self._start_at_done = False
self._result_prc = None
# make sure the module path (if specified) is parsed and # make sure the module path (if specified) is parsed and
# added to the module_loader object # added to the module_loader object
@ -92,17 +93,17 @@ class TaskQueueManager:
# plugins for inter-process locking. # plugins for inter-process locking.
self._connection_lockfile = tempfile.TemporaryFile() self._connection_lockfile = tempfile.TemporaryFile()
#FIXME: should this move to 'run' and get serial and play pattern applied as limiter?
# Treat "forks" config parameter as max value. Only create number of workers
# equal to number of hosts in inventory if less than max value.
num_workers = min(self._options.forks, len(self._inventory.list_hosts()))
self._workers = [] self._workers = []
for i in range(num_workers):
def _initialize_workers(self, num):
if num <= 0:
num = 1
for i in range(num):
main_q = multiprocessing.Queue() main_q = multiprocessing.Queue()
rslt_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue()
prc = WorkerProcess(self, main_q, rslt_q, loader) prc = WorkerProcess(self, main_q, rslt_q, self._loader)
prc.start() prc.start()
self._workers.append((prc, main_q, rslt_q)) self._workers.append((prc, main_q, rslt_q))
@ -178,6 +179,11 @@ class TaskQueueManager:
are done with the current task). are done with the current task).
''' '''
#FIXME: should this move to 'run' and get serial and play pattern applied as limiter?
# Treat "forks" config parameter as max value. Only create number of workers
# equal to number of hosts in inventory if less than max value.
self._initialize_workers(min( [ v for v in [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] if v is not None] ))
if not self._callbacks_loaded: if not self._callbacks_loaded:
self.load_callbacks() self.load_callbacks()
@ -227,12 +233,14 @@ class TaskQueueManager:
self.terminate() self.terminate()
self._final_q.close() self._final_q.close()
self._result_prc.terminate()
for (worker_prc, main_q, rslt_q) in self._workers: if self._result_prc:
rslt_q.close() self._result_prc.terminate()
main_q.close()
worker_prc.terminate() for (worker_prc, main_q, rslt_q) in self._workers:
rslt_q.close()
main_q.close()
worker_prc.terminate()
def clear_failed_hosts(self): def clear_failed_hosts(self):
self._failed_hosts = dict() self._failed_hosts = dict()