From dfb1c0647e907dd27509922da995932dcc6be8db Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Thu, 15 Sep 2016 16:55:54 -0500 Subject: [PATCH] Revert "Move queuing tasks to a background thread" This reverts commit b71957d6e6d666dc9594e798e4230e908c19b299. --- lib/ansible/executor/action_write_locks.py | 43 ------ lib/ansible/executor/module_common.py | 8 +- lib/ansible/executor/process/worker.py | 7 +- lib/ansible/executor/task_queue_manager.py | 128 ++---------------- lib/ansible/plugins/strategy/__init__.py | 116 ++++++++++++++-- lib/ansible/plugins/strategy/linear.py | 7 +- .../plugins/strategies/test_strategy_base.py | 69 +++++----- test/units/template/test_templar.py | 2 +- 8 files changed, 158 insertions(+), 222 deletions(-) delete mode 100644 lib/ansible/executor/action_write_locks.py diff --git a/lib/ansible/executor/action_write_locks.py b/lib/ansible/executor/action_write_locks.py deleted file mode 100644 index 413d56d9d7..0000000000 --- a/lib/ansible/executor/action_write_locks.py +++ /dev/null @@ -1,43 +0,0 @@ -# (c) 2016 - Red Hat, Inc. -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -from multiprocessing import Lock -from ansible.module_utils.facts import Facts - -if 'action_write_locks' not in globals(): - # Do not initialize this more than once because it seems to bash - # the existing one. multiprocessing must be reloading the module - # when it forks? - action_write_locks = dict() - - # Below is a Lock for use when we weren't expecting a named module. - # It gets used when an action plugin directly invokes a module instead - # of going through the strategies. Slightly less efficient as all - # processes with unexpected module names will wait on this lock - action_write_locks[None] = Lock() - - # These plugins are called directly by action plugins (not going through - # a strategy). We precreate them here as an optimization - mods = set(p['name'] for p in Facts.PKG_MGRS) - mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) - for mod_name in mods: - action_write_locks[mod_name] = Lock() - diff --git a/lib/ansible/executor/module_common.py b/lib/ansible/executor/module_common.py index 86549ea0b7..d7dac13de0 100644 --- a/lib/ansible/executor/module_common.py +++ b/lib/ansible/executor/module_common.py @@ -36,7 +36,7 @@ from ansible.module_utils._text import to_bytes, to_text # Must import strategy and use write_locks from there # If we import write_locks directly then we end up binding a # variable to the object and then it never gets updated. 
-from ansible.executor import action_write_locks +from ansible.plugins import strategy try: from __main__ import display @@ -605,16 +605,16 @@ def _find_snippet_imports(module_name, module_data, module_path, module_args, ta display.debug('ANSIBALLZ: using cached module: %s' % cached_module_filename) zipdata = open(cached_module_filename, 'rb').read() else: - if module_name in action_write_locks.action_write_locks: + if module_name in strategy.action_write_locks: display.debug('ANSIBALLZ: Using lock for %s' % module_name) - lock = action_write_locks.action_write_locks[module_name] + lock = strategy.action_write_locks[module_name] else: # If the action plugin directly invokes the module (instead of # going through a strategy) then we don't have a cross-process # Lock specifically for this module. Use the "unexpected # module" lock instead display.debug('ANSIBALLZ: Using generic lock for %s' % module_name) - lock = action_write_locks.action_write_locks[None] + lock = strategy.action_write_locks[None] display.debug('ANSIBALLZ: Acquiring lock') with lock: diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index d93de24ab3..0647c59b41 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -55,12 +55,12 @@ class WorkerProcess(multiprocessing.Process): for reading later. ''' - def __init__(self, rslt_q, play, host, task, task_vars, play_context, loader, variable_manager, shared_loader_obj): + def __init__(self, rslt_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj): super(WorkerProcess, self).__init__() # takes a task queue manager as the sole param: self._rslt_q = rslt_q - self._play = play + self._task_vars = task_vars self._host = host self._task = task self._play_context = play_context @@ -68,8 +68,6 @@ class WorkerProcess(multiprocessing.Process): self._variable_manager = variable_manager self._shared_loader_obj = shared_loader_obj - self._task_vars = task_vars - # dupe stdin, if we have one self._new_stdin = sys.stdin try: @@ -151,4 +149,3 @@ class WorkerProcess(multiprocessing.Process): #with open('worker_%06d.stats' % os.getpid(), 'w') as f: # f.write(s.getvalue()) - sys.exit(0) diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 6744367363..c003275306 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -22,22 +22,16 @@ __metaclass__ = type import multiprocessing import os import tempfile -import threading -import time - -from collections import deque from ansible import constants as C from ansible.compat.six import string_types from ansible.errors import AnsibleError -from ansible.executor import action_write_locks from ansible.executor.play_iterator import PlayIterator -from ansible.executor.process.worker import WorkerProcess from ansible.executor.stats import AggregateStats from ansible.module_utils._text import to_text from ansible.playbook.block import Block from ansible.playbook.play_context import PlayContext -from ansible.plugins import action_loader, callback_loader, connection_loader, filter_loader, lookup_loader, module_loader, strategy_loader, test_loader +from ansible.plugins import callback_loader, strategy_loader, module_loader from ansible.plugins.callback import CallbackBase from ansible.template import Templar from ansible.utils.helpers import pct_to_int @@ -52,23 +46,6 @@ except ImportError: __all__ = ['TaskQueueManager'] -# TODO: this 
should probably be in the plugins/__init__.py, with -# a smarter mechanism to set all of the attributes based on -# the loaders created there -class SharedPluginLoaderObj: - ''' - A simple object to make pass the various plugin loaders to - the forked processes over the queue easier - ''' - def __init__(self): - self.action_loader = action_loader - self.connection_loader = connection_loader - self.filter_loader = filter_loader - self.test_loader = test_loader - self.lookup_loader = lookup_loader - self.module_loader = module_loader - - class TaskQueueManager: ''' @@ -100,8 +77,6 @@ class TaskQueueManager: self._run_additional_callbacks = run_additional_callbacks self._run_tree = run_tree - self._iterator = None - self._callbacks_loaded = False self._callback_plugins = [] self._start_at_done = False @@ -123,86 +98,12 @@ class TaskQueueManager: self._failed_hosts = dict() self._unreachable_hosts = dict() - # the "queue" for the background thread to use - self._queued_tasks = deque() - self._queued_tasks_lock = threading.Lock() - - # the background queuing thread - self._queue_thread = None - - self._workers = [] self._final_q = multiprocessing.Queue() # A temporary file (opened pre-fork) used by connection # plugins for inter-process locking. self._connection_lockfile = tempfile.TemporaryFile() - def _queue_thread_main(self): - - # create a dummy object with plugin loaders set as an easier - # way to share them with the forked processes - shared_loader_obj = SharedPluginLoaderObj() - - display.debug("queuing thread starting") - while not self._terminated: - available_workers = [] - for idx, entry in enumerate(self._workers): - (worker_prc, _) = entry - if worker_prc is None or not worker_prc.is_alive(): - available_workers.append(idx) - - if len(available_workers) == 0: - time.sleep(0.01) - continue - - for worker_idx in available_workers: - try: - self._queued_tasks_lock.acquire() - (host, task, task_vars, play_context) = self._queued_tasks.pop() - except IndexError: - break - finally: - self._queued_tasks_lock.release() - - if task.action not in action_write_locks.action_write_locks: - display.debug('Creating lock for %s' % task.action) - action_write_locks.action_write_locks[task.action] = multiprocessing.Lock() - - try: - worker_prc = WorkerProcess( - self._final_q, - self._iterator._play, - host, - task, - task_vars, - play_context, - self._loader, - self._variable_manager, - shared_loader_obj, - ) - self._workers[worker_idx][0] = worker_prc - worker_prc.start() - display.debug("worker is %d (out of %d available)" % (worker_idx+1, len(self._workers))) - - except (EOFError, IOError, AssertionError) as e: - # most likely an abort - display.debug("got an error while queuing: %s" % e) - break - - display.debug("queuing thread exiting") - - def queue_task(self, host, task, task_vars, play_context): - self._queued_tasks_lock.acquire() - self._queued_tasks.append((host, task, task_vars, play_context)) - self._queued_tasks_lock.release() - - def queue_multiple_tasks(self, items, play_context): - for item in items: - (host, task, task_vars) = item - self._queued_tasks_lock.acquire() - self._queued_tasks.append((host, task, task_vars, play_context)) - self._queued_tasks_lock.release() - def _initialize_processes(self, num): self._workers = [] @@ -307,10 +208,6 @@ class TaskQueueManager: if not self._callbacks_loaded: self.load_callbacks() - if self._queue_thread is None: - self._queue_thread = threading.Thread(target=self._queue_thread_main) - self._queue_thread.start() - all_vars = 
self._variable_manager.get_vars(loader=self._loader, play=play) templar = Templar(loader=self._loader, variables=all_vars) @@ -356,7 +253,7 @@ class TaskQueueManager: raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds) # build the iterator - self._iterator = PlayIterator( + iterator = PlayIterator( inventory=self._inventory, play=new_play, play_context=play_context, @@ -371,7 +268,7 @@ class TaskQueueManager: # hosts so we know what failed this round. for host_name in self._failed_hosts.keys(): host = self._inventory.get_host(host_name) - self._iterator.mark_host_failed(host) + iterator.mark_host_failed(host) self.clear_failed_hosts() @@ -382,10 +279,10 @@ class TaskQueueManager: self._start_at_done = True # and run the play using the strategy and cleanup on way out - play_return = strategy.run(self._iterator, play_context) + play_return = strategy.run(iterator, play_context) # now re-save the hosts that failed from the iterator to our internal list - for host_name in self._iterator.get_failed_hosts(): + for host_name in iterator.get_failed_hosts(): self._failed_hosts[host_name] = True self._cleanup_processes() @@ -398,13 +295,14 @@ class TaskQueueManager: self._cleanup_processes() def _cleanup_processes(self): - for (worker_prc, rslt_q) in self._workers: - rslt_q.close() - if worker_prc and worker_prc.is_alive(): - try: - worker_prc.terminate() - except AttributeError: - pass + if hasattr(self, '_workers'): + for (worker_prc, rslt_q) in self._workers: + rslt_q.close() + if worker_prc and worker_prc.is_alive(): + try: + worker_prc.terminate() + except AttributeError: + pass def clear_failed_hosts(self): self._failed_hosts = dict() diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index d1036d4a66..aaa164ee75 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -19,19 +19,24 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type +import time + +from multiprocessing import Lock from jinja2.exceptions import UndefinedError from ansible.compat.six.moves import queue as Queue from ansible.compat.six import iteritems, string_types from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable +from ansible.executor.process.worker import WorkerProcess from ansible.executor.task_result import TaskResult from ansible.inventory.host import Host from ansible.inventory.group import Group +from ansible.module_utils.facts import Facts from ansible.playbook.helpers import load_list_of_blocks from ansible.playbook.included_file import IncludedFile from ansible.playbook.task_include import TaskInclude from ansible.playbook.role_include import IncludeRole -from ansible.plugins import action_loader +from ansible.plugins import action_loader, connection_loader, filter_loader, lookup_loader, module_loader, test_loader from ansible.template import Templar from ansible.vars import combine_vars, strip_internal_keys from ansible.module_utils._text import to_text @@ -45,6 +50,41 @@ except ImportError: __all__ = ['StrategyBase'] +if 'action_write_locks' not in globals(): + # Do not initialize this more than once because it seems to bash + # the existing one. multiprocessing must be reloading the module + # when it forks? + action_write_locks = dict() + + # Below is a Lock for use when we weren't expecting a named module. 
+ # It gets used when an action plugin directly invokes a module instead + # of going through the strategies. Slightly less efficient as all + # processes with unexpected module names will wait on this lock + action_write_locks[None] = Lock() + + # These plugins are called directly by action plugins (not going through + # a strategy). We precreate them here as an optimization + mods = set(p['name'] for p in Facts.PKG_MGRS) + mods.update(('copy', 'file', 'setup', 'slurp', 'stat')) + for mod_name in mods: + action_write_locks[mod_name] = Lock() + +# TODO: this should probably be in the plugins/__init__.py, with +# a smarter mechanism to set all of the attributes based on +# the loaders created there +class SharedPluginLoaderObj: + ''' + A simple object to make pass the various plugin loaders to + the forked processes over the queue easier + ''' + def __init__(self): + self.action_loader = action_loader + self.connection_loader = connection_loader + self.filter_loader = filter_loader + self.test_loader = test_loader + self.lookup_loader = lookup_loader + self.module_loader = module_loader + class StrategyBase: @@ -56,6 +96,7 @@ class StrategyBase: def __init__(self, tqm): self._tqm = tqm self._inventory = tqm.get_inventory() + self._workers = tqm.get_workers() self._notified_handlers = tqm._notified_handlers self._listening_handlers = tqm._listening_handlers self._variable_manager = tqm.get_variable_manager() @@ -68,6 +109,7 @@ class StrategyBase: # internal counters self._pending_results = 0 + self._cur_worker = 0 # this dictionary is used to keep track of hosts that have # outstanding tasks still in queue @@ -118,10 +160,58 @@ class StrategyBase: def _queue_task(self, host, task, task_vars, play_context): ''' handles queueing the task up to be sent to a worker ''' - self._tqm.queue_task(host, task, task_vars, play_context) - self._pending_results += 1 - def _process_pending_results(self, iterator, one_pass=False, timeout=0.001): + display.debug("entering _queue_task() for %s/%s" % (host.name, task.action)) + + # Add a write lock for tasks. + # Maybe this should be added somewhere further up the call stack but + # this is the earliest in the code where we have task (1) extracted + # into its own variable and (2) there's only a single code path + # leading to the module being run. This is called by three + # functions: __init__.py::_do_handler_run(), linear.py::run(), and + # free.py::run() so we'd have to add to all three to do it there. + # The next common higher level is __init__.py::run() and that has + # tasks inside of play_iterator so we'd have to extract them to do it + # there. 
+ + global action_write_locks + if task.action not in action_write_locks: + display.debug('Creating lock for %s' % task.action) + action_write_locks[task.action] = Lock() + + # and then queue the new task + try: + + # create a dummy object with plugin loaders set as an easier + # way to share them with the forked processes + shared_loader_obj = SharedPluginLoaderObj() + + queued = False + starting_worker = self._cur_worker + while True: + (worker_prc, rslt_q) = self._workers[self._cur_worker] + if worker_prc is None or not worker_prc.is_alive(): + worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) + self._workers[self._cur_worker][0] = worker_prc + worker_prc.start() + display.debug("worker is %d (out of %d available)" % (self._cur_worker+1, len(self._workers))) + queued = True + self._cur_worker += 1 + if self._cur_worker >= len(self._workers): + self._cur_worker = 0 + if queued: + break + elif self._cur_worker == starting_worker: + time.sleep(0.0001) + + self._pending_results += 1 + except (EOFError, IOError, AssertionError) as e: + # most likely an abort + display.debug("got an error while queuing: %s" % e) + return + display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action)) + + def _process_pending_results(self, iterator, one_pass=False): ''' Reads results off the final queue and takes appropriate action based on the result (executing callbacks, updating state, etc.). @@ -180,10 +270,10 @@ class StrategyBase: else: return False - passes = 1 - while not self._tqm._terminated and passes < 3: + passes = 0 + while not self._tqm._terminated: try: - task_result = self._final_q.get(timeout=timeout) + task_result = self._final_q.get(timeout=0.001) original_host = get_original_host(task_result._host) original_task = iterator.get_original_task(original_host, task_result._task) task_result._host = original_host @@ -396,6 +486,8 @@ class StrategyBase: except Queue.Empty: passes += 1 + if passes > 2: + break if one_pass: break @@ -411,18 +503,14 @@ class StrategyBase: ret_results = [] display.debug("waiting for pending results...") - dead_check = 10 while self._pending_results > 0 and not self._tqm._terminated: + if self._tqm.has_dead_workers(): + raise AnsibleError("A worker was found in a dead state") + results = self._process_pending_results(iterator) ret_results.extend(results) - dead_check -= 1 - if dead_check == 0: - if self._pending_results > 0 and self._tqm.has_dead_workers(): - raise AnsibleError("A worker was found in a dead state") - dead_check = 10 - display.debug("no more pending results, returning what we have") return ret_results diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py index 08b7a52c56..c6ef0a347b 100644 --- a/lib/ansible/plugins/strategy/linear.py +++ b/lib/ansible/plugins/strategy/linear.py @@ -182,9 +182,7 @@ class StrategyModule(StrategyBase): any_errors_fatal = False results = [] - items_to_queue = [] for (host, task) in host_tasks: - if not task: continue @@ -258,8 +256,7 @@ class StrategyModule(StrategyBase): display.debug("sending task start callback") self._blocked_hosts[host.get_name()] = True - items_to_queue.append((host, task, task_vars)) - self._pending_results += 1 + self._queue_task(host, task, task_vars, play_context) del task_vars # if we're bypassing the host loop, break out now @@ -271,8 +268,6 @@ class StrategyModule(StrategyBase): # queue for the main thread results += self._process_pending_results(iterator, 
one_pass=True) - self._tqm.queue_multiple_tasks(items_to_queue, play_context) - # go to next host/task group if skip_rest: continue diff --git a/test/units/plugins/strategies/test_strategy_base.py b/test/units/plugins/strategies/test_strategy_base.py index 2f7a13f1cc..49fd095bd4 100644 --- a/test/units/plugins/strategies/test_strategy_base.py +++ b/test/units/plugins/strategies/test_strategy_base.py @@ -121,44 +121,45 @@ class TestStrategyBase(unittest.TestCase): mock_tqm._unreachable_hosts = ["host02"] self.assertEqual(strategy_base.get_hosts_remaining(play=mock_play), mock_hosts[2:]) - #@patch.object(WorkerProcess, 'run') - #def test_strategy_base_queue_task(self, mock_worker): - # def fake_run(self): - # return + @patch.object(WorkerProcess, 'run') + def test_strategy_base_queue_task(self, mock_worker): + def fake_run(self): + return - # mock_worker.run.side_effect = fake_run + mock_worker.run.side_effect = fake_run - # fake_loader = DictDataLoader() - # mock_var_manager = MagicMock() - # mock_host = MagicMock() - # mock_host.has_hostkey = True - # mock_inventory = MagicMock() - # mock_options = MagicMock() - # mock_options.module_path = None + fake_loader = DictDataLoader() + mock_var_manager = MagicMock() + mock_host = MagicMock() + mock_host.has_hostkey = True + mock_inventory = MagicMock() + mock_options = MagicMock() + mock_options.module_path = None - # tqm = TaskQueueManager( - # inventory=mock_inventory, - # variable_manager=mock_var_manager, - # loader=fake_loader, - # options=mock_options, - # passwords=None, - # ) - # tqm._initialize_processes(3) - # tqm.hostvars = dict() + tqm = TaskQueueManager( + inventory=mock_inventory, + variable_manager=mock_var_manager, + loader=fake_loader, + options=mock_options, + passwords=None, + ) + tqm._initialize_processes(3) + tqm.hostvars = dict() - # try: - # strategy_base = StrategyBase(tqm=tqm) - # strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) - # self.assertEqual(strategy_base._cur_worker, 1) - # self.assertEqual(strategy_base._pending_results, 1) - # strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) - # self.assertEqual(strategy_base._cur_worker, 2) - # self.assertEqual(strategy_base._pending_results, 2) - # strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) - # self.assertEqual(strategy_base._cur_worker, 0) - # self.assertEqual(strategy_base._pending_results, 3) - # finally: - # tqm.cleanup() + try: + strategy_base = StrategyBase(tqm=tqm) + strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) + self.assertEqual(strategy_base._cur_worker, 1) + self.assertEqual(strategy_base._pending_results, 1) + strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) + self.assertEqual(strategy_base._cur_worker, 2) + self.assertEqual(strategy_base._pending_results, 2) + strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) + self.assertEqual(strategy_base._cur_worker, 0) + self.assertEqual(strategy_base._pending_results, 3) + finally: + tqm.cleanup() + def test_strategy_base_process_pending_results(self): mock_tqm = MagicMock() diff --git a/test/units/template/test_templar.py b/test/units/template/test_templar.py index 481dc3e8d5..2ec8f54e0c 100644 --- a/test/units/template/test_templar.py +++ b/test/units/template/test_templar.py @@ -25,7 
+25,7 @@ from ansible.compat.tests.mock import patch, MagicMock from ansible import constants as C from ansible.errors import * from ansible.plugins import filter_loader, lookup_loader, module_loader -from ansible.executor.task_queue_manager import SharedPluginLoaderObj +from ansible.plugins.strategy import SharedPluginLoaderObj from ansible.template import Templar from units.mock.loader import DictDataLoader
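
The behavior this revert restores is easiest to see in isolation: plugins/strategy/__init__.py again owns a module-level action_write_locks dict of multiprocessing Locks, and StrategyBase._queue_task() walks the worker slots round-robin, starting a WorkerProcess on the first free slot and sleeping briefly when every slot is busy. Below is a minimal, self-contained sketch of that pattern only, not the actual Ansible implementation; RoundRobinQueuer, fake_task() and queue_task() are hypothetical stand-ins.

# Illustrative sketch (assumed names, simplified logic), mimicking the pattern
# restored by this revert: per-action write locks plus round-robin worker slots.
import time
from multiprocessing import Lock, Process, Queue

# Fallback lock used when a module name was not precreated (the [None] entry
# in the real action_write_locks dict).
action_write_locks = {None: Lock()}

def fake_task(final_q, action, lock):
    # Stand-in for WorkerProcess.run(): serialize "module writes" per action.
    with lock:
        final_q.put('completed %s' % action)

class RoundRobinQueuer:
    def __init__(self, num_workers, final_q):
        self._workers = [None] * num_workers
        self._cur_worker = 0
        self._final_q = final_q

    def queue_task(self, action):
        # Lazily create a lock per action name, as _queue_task() does for task.action.
        if action not in action_write_locks:
            action_write_locks[action] = Lock()
        queued = False
        starting_worker = self._cur_worker
        while True:
            worker = self._workers[self._cur_worker]
            if worker is None or not worker.is_alive():
                # Free slot found: start a worker process for this task.
                worker = Process(target=fake_task,
                                 args=(self._final_q, action, action_write_locks[action]))
                self._workers[self._cur_worker] = worker
                worker.start()
                queued = True
            self._cur_worker = (self._cur_worker + 1) % len(self._workers)
            if queued:
                break
            elif self._cur_worker == starting_worker:
                # Every slot is busy: wait briefly, then rescan the slots.
                time.sleep(0.0001)

if __name__ == '__main__':
    q = Queue()
    rr = RoundRobinQueuer(2, q)
    for mod in ('copy', 'file', 'setup'):
        rr.queue_task(mod)
    for _ in range(3):
        print(q.get())

The third task in the demo has to wait for a worker slot to free up, which is the same back-pressure the strategy plugin relies on once queuing happens inline again instead of on a background thread.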