diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index f4d26caf2a..2cf9bbb45a 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -57,11 +57,11 @@ class WorkerProcess(multiprocessing.Process): for reading later. ''' - def __init__(self, rslt_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj): + def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj): super(WorkerProcess, self).__init__() # takes a task queue manager as the sole param: - self._rslt_q = rslt_q + self._final_q = final_q self._task_vars = task_vars self._host = host self._task = task @@ -115,7 +115,7 @@ class WorkerProcess(multiprocessing.Process): self._new_stdin, self._loader, self._shared_loader_obj, - self._rslt_q + self._final_q ).run() display.debug("done running TaskExecutor() for %s/%s [%s]" % (self._host, self._task, self._task._uuid)) @@ -130,7 +130,7 @@ class WorkerProcess(multiprocessing.Process): # put the result on the result queue display.debug("sending task result for task %s" % self._task._uuid) - self._rslt_q.put(task_result) + self._final_q.put(task_result) display.debug("done sending task result for task %s" % self._task._uuid) except AnsibleConnectionFailure: @@ -142,7 +142,7 @@ class WorkerProcess(multiprocessing.Process): dict(unreachable=True), task_fields=self._task.dump_attrs(), ) - self._rslt_q.put(task_result, block=False) + self._final_q.put(task_result, block=False) except Exception as e: if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound): @@ -155,7 +155,7 @@ class WorkerProcess(multiprocessing.Process): dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''), task_fields=self._task.dump_attrs(), ) - self._rslt_q.put(task_result, block=False) + self._final_q.put(task_result, block=False) except: display.debug(u"WORKER EXCEPTION: %s" % to_text(e)) display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc())) diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py index a60d208221..baeaeecafe 100644 --- a/lib/ansible/executor/task_executor.py +++ b/lib/ansible/executor/task_executor.py @@ -68,7 +68,7 @@ class TaskExecutor: # the module SQUASH_ACTIONS = frozenset(C.DEFAULT_SQUASH_ACTIONS) - def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, rslt_q): + def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q): self._host = host self._task = task self._job_vars = job_vars @@ -77,7 +77,7 @@ class TaskExecutor: self._loader = loader self._shared_loader_obj = shared_loader_obj self._connection = None - self._rslt_q = rslt_q + self._final_q = final_q self._loop_eval_error = None self._task.squash() @@ -348,7 +348,7 @@ class TaskExecutor: # gets templated here unlike rest of loop_control fields, depends on loop_var above res['_ansible_item_label'] = templar.template(label, cache=False) - self._rslt_q.put( + self._final_q.put( TaskResult( self._host.name, self._task._uuid, @@ -669,7 +669,7 @@ class TaskExecutor: result['_ansible_retry'] = True result['retries'] = retries display.debug('Retrying task, attempt %d of %d' % (attempt, retries)) - self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs()), block=False) + self._final_q.put(TaskResult(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs()), block=False) time.sleep(delay) else: if retries > 1: diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index d6f3029cf6..f45c5de4b3 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -114,8 +114,7 @@ class TaskQueueManager: self._workers = [] for i in range(num): - rslt_q = multiprocessing.Queue() - self._workers.append([None, rslt_q]) + self._workers.append(None) def _initialize_notified_handlers(self, play): ''' @@ -307,8 +306,7 @@ class TaskQueueManager: def _cleanup_processes(self): if hasattr(self, '_workers'): - for (worker_prc, rslt_q) in self._workers: - rslt_q.close() + for worker_prc in self._workers: if worker_prc and worker_prc.is_alive(): try: worker_prc.terminate() @@ -340,8 +338,8 @@ class TaskQueueManager: defunct = False for (idx, x) in enumerate(self._workers): - if hasattr(x[0], 'exitcode'): - if x[0].exitcode in [-9, -11, -15]: + if hasattr(x, 'exitcode'): + if x.exitcode in [-9, -11, -15]: defunct = True return defunct diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index 97be5bf3a3..480bb3ddd8 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -302,7 +302,7 @@ class StrategyBase: queued = False starting_worker = self._cur_worker while True: - (worker_prc, rslt_q) = self._workers[self._cur_worker] + worker_prc = self._workers[self._cur_worker] if worker_prc is None or not worker_prc.is_alive(): self._queued_task_cache[(host.name, task._uuid)] = { 'host': host, @@ -312,7 +312,7 @@ class StrategyBase: } worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) - self._workers[self._cur_worker][0] = worker_prc + self._workers[self._cur_worker] = worker_prc worker_prc.start() display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers))) queued = True diff --git a/test/units/executor/test_task_executor.py b/test/units/executor/test_task_executor.py index 99fcb03d93..7870d1f847 100644 --- a/test/units/executor/test_task_executor.py +++ b/test/units/executor/test_task_executor.py @@ -55,7 +55,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin=new_stdin, loader=fake_loader, shared_loader_obj=mock_shared_loader, - rslt_q=mock_queue, + final_q=mock_queue, ) def test_task_executor_run(self): @@ -82,7 +82,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin=new_stdin, loader=fake_loader, shared_loader_obj=mock_shared_loader, - rslt_q=mock_queue, + final_q=mock_queue, ) te._get_loop_items = MagicMock(return_value=None) @@ -126,7 +126,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin=new_stdin, loader=fake_loader, shared_loader_obj=mock_shared_loader, - rslt_q=mock_queue, + final_q=mock_queue, ) items = te._get_loop_items() @@ -162,7 +162,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin=new_stdin, loader=fake_loader, shared_loader_obj=mock_shared_loader, - rslt_q=mock_queue, + final_q=mock_queue, ) def _execute(variables): @@ -208,7 +208,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin=new_stdin, loader=fake_loader, shared_loader_obj=mock_shared_loader, - rslt_q=mock_queue, + final_q=mock_queue, ) # No replacement @@ -400,7 +400,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin=new_stdin, loader=fake_loader, shared_loader_obj=shared_loader, - rslt_q=mock_queue, + final_q=mock_queue, ) te._get_connection = MagicMock(return_value=mock_connection) @@ -455,7 +455,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin=new_stdin, loader=fake_loader, shared_loader_obj=shared_loader, - rslt_q=mock_queue, + final_q=mock_queue, ) te._connection = MagicMock()