1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

clearer selection of min value

cleanup workers at each run to avoid stragglers
This commit is contained in:
Brian Coca 2015-11-04 17:28:08 -05:00
parent b44f1428d0
commit 931eb16e55

View file

@ -96,9 +96,6 @@ class TaskQueueManager:
self._workers = [] self._workers = []
def _initialize_workers(self, num): def _initialize_workers(self, num):
if num <= 0:
num = 1
for i in range(num): for i in range(num):
main_q = multiprocessing.Queue() main_q = multiprocessing.Queue()
rslt_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue()
@ -179,10 +176,11 @@ class TaskQueueManager:
are done with the current task). are done with the current task).
''' '''
#FIXME: should this move to 'run' and get serial and play pattern applied as limiter?
# Treat "forks" config parameter as max value. Only create number of workers # Treat "forks" config parameter as max value. Only create number of workers
# equal to number of hosts in inventory if less than max value. # equal to number of hosts in inventory if less than max value.
self._initialize_workers(min( [ v for v in [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] if v is not None] )) contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))]
contenders = [ v for v in contenders if v is not None and v > 0 ]
self._initialize_workers(min( contenders ))
if not self._callbacks_loaded: if not self._callbacks_loaded:
self.load_callbacks() self.load_callbacks()
@ -224,16 +222,18 @@ class TaskQueueManager:
if getattr(self._options, 'start_at_task', None) is not None and play_context.start_at_task is None: if getattr(self._options, 'start_at_task', None) is not None and play_context.start_at_task is None:
self._start_at_done = True self._start_at_done = True
# and run the play using the strategy # and run the play using the strategy and cleanup on way out
return strategy.run(iterator, play_context) play_return = strategy.run(iterator, play_context)
self._cleanup_workers()
return play_return
def cleanup(self): def cleanup(self):
self._display.debug("RUNNING CLEANUP") self._display.debug("RUNNING CLEANUP")
self.terminate() self.terminate()
self._final_q.close() self._final_q.close()
self._cleanup_workers()
def _cleanup_workers(self):
if self._result_prc: if self._result_prc:
self._result_prc.terminate() self._result_prc.terminate()