diff --git a/lib/ansible/executor/play_iterator.py b/lib/ansible/executor/play_iterator.py index 99b7285182..e9f6a0c47b 100644 --- a/lib/ansible/executor/play_iterator.py +++ b/lib/ansible/executor/play_iterator.py @@ -153,8 +153,6 @@ class PlayIterator: self._blocks = [] self._variable_manager = variable_manager - self._task_uuid_cache = dict() - # Default options to gather gather_subset = play_context.gather_subset gather_timeout = play_context.gather_timeout @@ -242,16 +240,10 @@ class PlayIterator: return self._host_states[host.name].copy() def cache_block_tasks(self, block): - def _cache_portion(p): - for t in p: - if isinstance(t, Block): - self.cache_block_tasks(t) - elif t._uuid not in self._task_uuid_cache: - self._task_uuid_cache[t._uuid] = t - - for portion in (block.block, block.rescue, block.always): - if portion is not None: - _cache_portion(portion) + # now a noop, we've changed the way we do caching and finding of + # original task entries, but just in case any 3rd party strategies + # are using this we're leaving it here for now + return def get_next_task_for_host(self, host, peek=False): @@ -520,19 +512,8 @@ class PlayIterator: return self._check_failed_state(s) def get_original_task(self, host, task): - ''' - Finds the task in the task list which matches the UUID of the given task. - The executor engine serializes/deserializes objects as they are passed through - the different processes, and not all data structures are preserved. This method - allows us to find the original task passed into the executor engine. - ''' - - if isinstance(task, Task): - the_uuid = task._uuid - else: - the_uuid = task - - return self._task_uuid_cache.get(the_uuid, None) + # now a noop because we've changed the way we do caching + return (None, None) def _insert_tasks_into_state(self, state, task_list): # if we've failed at all, or if the task list is empty, just return the current state @@ -569,6 +550,4 @@ class PlayIterator: return state def add_tasks(self, host, task_list): - for b in task_list: - self.cache_block_tasks(b) self._host_states[host.name] = self._insert_tasks_into_state(self.get_host_state(host), task_list) diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index 4eb08901b8..7b5d3348a8 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -120,9 +120,9 @@ def debug_closure(func): for result in results: task = result._task host = result._host - _queue_task_args = self._queue_task_args.pop('%s%s' % (host.name, task._uuid)) - task_vars = _queue_task_args['task_vars'] - play_context = _queue_task_args['play_context'] + _queued_task_args = self._queued_task_cache.pop((host.name, task._uuid), None) + task_vars = _queued_task_args['task_vars'] + play_context = _queued_task_args['play_context'] # Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state try: prev_host_state = prev_host_states[host.name] @@ -179,7 +179,11 @@ class StrategyBase: self._final_q = tqm._final_q self._step = getattr(tqm._options, 'step', False) self._diff = getattr(tqm._options, 'diff', False) - self._queue_task_args = {} + + # the task cache is a dictionary of tuples of (host.name, task._uuid) + # used to find the original task object of in-flight tasks and to store + # the task args/vars and play context info used to queue the task. + self._queued_task_cache = {} # Backwards compat: self._display isn't really needed, just import the global display and use that. self._display = display @@ -270,13 +274,6 @@ class StrategyBase: def _queue_task(self, host, task, task_vars, play_context): ''' handles queueing the task up to be sent to a worker ''' - self._queue_task_args['%s%s' % (host.name, task._uuid)] = { - 'host': host, - 'task': task, - 'task_vars': task_vars, - 'play_context': play_context - } - display.debug("entering _queue_task() for %s/%s" % (host.name, task.action)) # Add a write lock for tasks. @@ -306,6 +303,13 @@ class StrategyBase: while True: (worker_prc, rslt_q) = self._workers[self._cur_worker] if worker_prc is None or not worker_prc.is_alive(): + self._queued_task_cache[(host.name, task._uuid)] = { + 'host': host, + 'task': task, + 'task_vars': task_vars, + 'play_context': play_context + } + worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) self._workers[self._cur_worker][0] = worker_prc worker_prc.start() @@ -425,7 +429,8 @@ class StrategyBase: # get the original host and task. We then assign them to the TaskResult for use in callbacks/etc. original_host = get_original_host(task_result._host) - found_task = iterator.get_original_task(original_host, task_result._task) + queue_cache_entry = (original_host.name, task_result._task) + found_task = self._queued_task_cache.get(queue_cache_entry)['task'] original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) original_task._parent = found_task._parent original_task.from_attrs(task_result._task_fields) @@ -854,8 +859,6 @@ class StrategyBase: host_results = [] for host in notified_hosts: if not handler.has_triggered(host) and (not iterator.is_failed(host) or play_context.force_handlers): - if handler._uuid not in iterator._task_uuid_cache: - iterator._task_uuid_cache[handler._uuid] = handler task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=handler) self.add_tqm_variables(task_vars, play=iterator._play) self._queue_task(host, handler, task_vars, play_context) diff --git a/test/units/executor/test_play_iterator.py b/test/units/executor/test_play_iterator.py index 7e920ea132..525944185d 100644 --- a/test/units/executor/test_play_iterator.py +++ b/test/units/executor/test_play_iterator.py @@ -155,16 +155,6 @@ class TestPlayIterator(unittest.TestCase): all_vars=dict(), ) - # lookup up an original task - target_task = p._entries[0].tasks[0].block[0] - task_copy = target_task.copy(exclude_parent=True) - found_task = itr.get_original_task(hosts[0], task_copy) - self.assertEqual(target_task, found_task) - - bad_task = Task() - found_task = itr.get_original_task(hosts[0], bad_task) - self.assertIsNone(found_task) - # pre task (host_state, task) = itr.get_next_task_for_host(hosts[0]) self.assertIsNotNone(task) diff --git a/test/units/plugins/strategy/test_strategy_base.py b/test/units/plugins/strategy/test_strategy_base.py index 5c194e3e26..8f3781e98d 100644 --- a/test/units/plugins/strategy/test_strategy_base.py +++ b/test/units/plugins/strategy/test_strategy_base.py @@ -20,6 +20,7 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type from units.mock.loader import DictDataLoader +from copy import deepcopy import uuid from ansible.compat.tests import unittest @@ -207,15 +208,18 @@ class TestStrategyBase(unittest.TestCase): tqm._initialize_processes(3) tqm.hostvars = dict() + mock_task = MagicMock() + mock_task._uuid = 'abcd' + try: strategy_base = StrategyBase(tqm=tqm) - strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) + strategy_base._queue_task(host=mock_host, task=mock_task, task_vars=dict(), play_context=MagicMock()) self.assertEqual(strategy_base._cur_worker, 1) self.assertEqual(strategy_base._pending_results, 1) - strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) + strategy_base._queue_task(host=mock_host, task=mock_task, task_vars=dict(), play_context=MagicMock()) self.assertEqual(strategy_base._cur_worker, 2) self.assertEqual(strategy_base._pending_results, 2) - strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) + strategy_base._queue_task(host=mock_host, task=mock_task, task_vars=dict(), play_context=MagicMock()) self.assertEqual(strategy_base._cur_worker, 0) self.assertEqual(strategy_base._pending_results, 3) finally: @@ -282,7 +286,6 @@ class TestStrategyBase(unittest.TestCase): mock_iterator._play = mock_play mock_iterator.mark_host_failed.return_value = None mock_iterator.get_next_task_for_host.return_value = (None, None) - mock_iterator.get_original_task.return_value = mock_task mock_handler_block = MagicMock() mock_handler_block.block = [mock_handler_task] @@ -337,12 +340,16 @@ class TestStrategyBase(unittest.TestCase): strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 - strategy_base._queue_task_args = MagicMock() - strategy_base._queue_task_args.pop.return_value = { - 'task_vars': {}, - 'play_context': {}, + mock_queued_task_cache = { + (mock_host.name, mock_task._uuid): { + 'task': mock_task, + 'host': mock_host, + 'task_vars': {}, + 'play_context': {}, + } } + strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) @@ -354,6 +361,7 @@ class TestStrategyBase(unittest.TestCase): strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 mock_iterator.is_failed.return_value = True + strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) @@ -367,6 +375,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(task_result) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 + strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) @@ -379,6 +388,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(task_result) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 + strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(results[0], task_result) @@ -388,6 +398,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo'])))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 + strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) @@ -396,6 +407,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo')))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 + strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) @@ -404,6 +416,7 @@ class TestStrategyBase(unittest.TestCase): queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler']))) strategy_base._blocked_hosts['test01'] = True strategy_base._pending_results = 1 + strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache) results = strategy_base._wait_on_pending_results(iterator=mock_iterator) self.assertEqual(len(results), 1) self.assertEqual(strategy_base._pending_results, 0) @@ -501,6 +514,7 @@ class TestStrategyBase(unittest.TestCase): mock_handler_task._role = None mock_handler_task._parent = None mock_handler_task._uuid = 'xxxxxxxxxxxxxxxx' + mock_handler_task.copy.return_value = mock_handler_task mock_handler = MagicMock() mock_handler.block = [mock_handler_task] @@ -516,13 +530,13 @@ class TestStrategyBase(unittest.TestCase): mock_inventory = MagicMock() mock_inventory.get_hosts.return_value = [mock_host] mock_inventory.get.return_value = mock_host + mock_inventory.get_host.return_value = mock_host mock_var_mgr = MagicMock() mock_var_mgr.get_vars.return_value = dict() mock_iterator = MagicMock() mock_iterator._play = mock_play - mock_iterator.get_original_task.return_value = mock_handler_task fake_loader = DictDataLoader() mock_options = MagicMock() @@ -545,9 +559,11 @@ class TestStrategyBase(unittest.TestCase): strategy_base._inventory = mock_inventory strategy_base._notified_handlers = {mock_handler_task._uuid: [mock_host]} - task_result = TaskResult(Host('host01'), Handler(), dict(changed=False)) - strategy_base._queue_task_args = MagicMock() - strategy_base._queue_task_args.pop.return_value = { + task_result = TaskResult(mock_host.name, mock_handler_task._uuid, dict(changed=False)) + strategy_base._queued_task_cache = dict() + strategy_base._queued_task_cache[(mock_host.name, mock_handler_task._uuid)] = { + 'task': mock_handler_task, + 'host': mock_host, 'task_vars': {}, 'play_context': mock_play_context }