1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/test/units/plugins/strategies/test_strategy_base.py
jctanner 238c6461f6 Add a function to check for killed processes in all strategies (#16684)
* Add a function to check for killed processes so that if any
threads are sigkilled or sigtermed, the entire playbook execution is aborted.
2016-07-14 16:37:35 -04:00

427 lines
17 KiB
Python

# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import patch, MagicMock
from ansible.errors import AnsibleError, AnsibleParserError
from ansible.plugins.strategy import StrategyBase
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.executor.task_result import TaskResult
from ansible.playbook.handler import Handler
from ansible.inventory.host import Host
from six.moves import queue as Queue
from units.mock.loader import DictDataLoader
class TestStrategyBase(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_strategy_base_init(self):
mock_tqm = MagicMock(TaskQueueManager)
mock_tqm._final_q = MagicMock()
mock_tqm._options = MagicMock()
mock_tqm._notified_handlers = {}
mock_tqm._listening_handlers = {}
strategy_base = StrategyBase(tqm=mock_tqm)
def test_strategy_base_run(self):
mock_tqm = MagicMock(TaskQueueManager)
mock_tqm._final_q = MagicMock()
mock_tqm._stats = MagicMock()
mock_tqm._notified_handlers = {}
mock_tqm._listening_handlers = {}
mock_tqm.send_callback.return_value = None
mock_iterator = MagicMock()
mock_iterator._play = MagicMock()
mock_iterator._play.handlers = []
mock_play_context = MagicMock()
mock_tqm._failed_hosts = dict()
mock_tqm._unreachable_hosts = dict()
mock_tqm._options = MagicMock()
mock_tqm._notified_handlers = {}
mock_tqm._listening_handlers = {}
strategy_base = StrategyBase(tqm=mock_tqm)
mock_host = MagicMock()
mock_host.name = 'host1'
self.assertEqual(strategy_base.run(iterator=mock_iterator, play_context=mock_play_context), mock_tqm.RUN_OK)
self.assertEqual(strategy_base.run(iterator=mock_iterator, play_context=mock_play_context, result=False), mock_tqm.RUN_ERROR)
mock_tqm._failed_hosts = dict(host1=True)
mock_iterator.get_failed_hosts.return_value = [mock_host]
self.assertEqual(strategy_base.run(iterator=mock_iterator, play_context=mock_play_context, result=False), mock_tqm.RUN_FAILED_HOSTS)
mock_tqm._unreachable_hosts = dict(host1=True)
mock_iterator.get_failed_hosts.return_value = []
self.assertEqual(strategy_base.run(iterator=mock_iterator, play_context=mock_play_context, result=False), mock_tqm.RUN_UNREACHABLE_HOSTS)
def test_strategy_base_get_hosts(self):
mock_hosts = []
for i in range(0, 5):
mock_host = MagicMock()
mock_host.name = "host%02d" % (i+1)
mock_host.has_hostkey = True
mock_hosts.append(mock_host)
mock_inventory = MagicMock()
mock_inventory.get_hosts.return_value = mock_hosts
mock_tqm = MagicMock()
mock_tqm._final_q = MagicMock()
mock_tqm._notified_handlers = {}
mock_tqm._listening_handlers = {}
mock_tqm.get_inventory.return_value = mock_inventory
mock_play = MagicMock()
mock_play.hosts = ["host%02d" % (i+1) for i in range(0, 5)]
strategy_base = StrategyBase(tqm=mock_tqm)
mock_tqm._failed_hosts = []
mock_tqm._unreachable_hosts = []
self.assertEqual(strategy_base.get_hosts_remaining(play=mock_play), mock_hosts)
mock_tqm._failed_hosts = ["host01"]
self.assertEqual(strategy_base.get_hosts_remaining(play=mock_play), mock_hosts[1:])
self.assertEqual(strategy_base.get_failed_hosts(play=mock_play), [mock_hosts[0]])
mock_tqm._unreachable_hosts = ["host02"]
self.assertEqual(strategy_base.get_hosts_remaining(play=mock_play), mock_hosts[2:])
@patch.object(WorkerProcess, 'run')
def test_strategy_base_queue_task(self, mock_worker):
def fake_run(self):
return
mock_worker.run.side_effect = fake_run
fake_loader = DictDataLoader()
mock_var_manager = MagicMock()
mock_host = MagicMock()
mock_host.has_hostkey = True
mock_inventory = MagicMock()
mock_options = MagicMock()
mock_options.module_path = None
tqm = TaskQueueManager(
inventory=mock_inventory,
variable_manager=mock_var_manager,
loader=fake_loader,
options=mock_options,
passwords=None,
)
tqm._initialize_processes(3)
tqm.hostvars = dict()
try:
strategy_base = StrategyBase(tqm=tqm)
strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock())
self.assertEqual(strategy_base._cur_worker, 1)
self.assertEqual(strategy_base._pending_results, 1)
strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock())
self.assertEqual(strategy_base._cur_worker, 2)
self.assertEqual(strategy_base._pending_results, 2)
strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock())
self.assertEqual(strategy_base._cur_worker, 0)
self.assertEqual(strategy_base._pending_results, 3)
finally:
tqm.cleanup()
def test_strategy_base_process_pending_results(self):
mock_tqm = MagicMock()
mock_tqm._terminated = False
mock_tqm._failed_hosts = dict()
mock_tqm._unreachable_hosts = dict()
mock_tqm.send_callback.return_value = None
mock_tqm._notified_handlers = {}
mock_tqm._listening_handlers = {}
queue_items = []
def _queue_empty(*args, **kwargs):
return len(queue_items) == 0
def _queue_get(*args, **kwargs):
if len(queue_items) == 0:
raise Queue.Empty
else:
return queue_items.pop()
mock_queue = MagicMock()
mock_queue.empty.side_effect = _queue_empty
mock_queue.get.side_effect = _queue_get
mock_tqm._final_q = mock_queue
mock_tqm._stats = MagicMock()
mock_tqm._stats.increment.return_value = None
mock_play = MagicMock()
mock_iterator = MagicMock()
mock_iterator._play = mock_play
mock_iterator.mark_host_failed.return_value = None
mock_iterator.get_next_task_for_host.return_value = (None, None)
mock_host = MagicMock()
mock_host.name = 'test01'
mock_host.vars = dict()
mock_host.has_hostkey = True
mock_task = MagicMock()
mock_task._role = None
mock_task.ignore_errors = False
mock_handler_task = MagicMock(Handler)
mock_handler_task.name = 'test handler'
mock_handler_task.action = 'foo'
mock_handler_task.get_name.return_value = "test handler"
mock_handler_task.has_triggered.return_value = False
mock_handler_block = MagicMock()
mock_handler_block.block = [mock_handler_task]
mock_play.handlers = [mock_handler_block]
mock_tqm._notified_handlers = {mock_handler_task: []}
mock_tqm._listening_handlers = {}
mock_group = MagicMock()
mock_group.add_host.return_value = None
def _get_host(host_name):
if host_name == 'test01':
return mock_host
return None
def _get_group(group_name):
if group_name in ('all', 'foo'):
return mock_group
return None
mock_inventory = MagicMock()
mock_inventory._hosts_cache = dict()
mock_inventory.get_host.side_effect = _get_host
mock_inventory.get_group.side_effect = _get_group
mock_inventory.clear_pattern_cache.return_value = None
mock_inventory.get_host_vars.return_value = {}
mock_var_mgr = MagicMock()
mock_var_mgr.set_host_variable.return_value = None
mock_var_mgr.set_host_facts.return_value = None
strategy_base = StrategyBase(tqm=mock_tqm)
strategy_base._inventory = mock_inventory
strategy_base._variable_manager = mock_var_mgr
strategy_base._blocked_hosts = dict()
def _has_dead_workers():
return False
strategy_base._tqm.has_dead_workers = _has_dead_workers
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0)
task_result = TaskResult(host=mock_host, task=mock_task, return_data=dict(changed=True))
queue_items.append(('host_task_ok', task_result))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts)
task_result = TaskResult(host=mock_host, task=mock_task, return_data='{"failed":true}')
queue_items.append(('host_task_failed', task_result))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts)
self.assertIn('test01', mock_tqm._failed_hosts)
del mock_tqm._failed_hosts['test01']
task_result = TaskResult(host=mock_host, task=mock_task, return_data='{}')
queue_items.append(('host_unreachable', task_result))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts)
self.assertIn('test01', mock_tqm._unreachable_hosts)
del mock_tqm._unreachable_hosts['test01']
task_result = TaskResult(host=mock_host, task=mock_task, return_data='{}')
queue_items.append(('host_task_skipped', task_result))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
self.assertEqual(strategy_base._pending_results, 0)
self.assertNotIn('test01', strategy_base._blocked_hosts)
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
queue_items.append(('add_host', dict(add_host=dict(host_name='newhost01', new_groups=['foo']))))
results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0)
self.assertEqual(strategy_base._pending_results, 1)
self.assertIn('test01', strategy_base._blocked_hosts)
queue_items.append(('add_group', mock_host, dict(add_group=dict(group_name='foo'))))
results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0)
self.assertEqual(strategy_base._pending_results, 1)
self.assertIn('test01', strategy_base._blocked_hosts)
task_result = TaskResult(host=mock_host, task=mock_task, return_data=dict(changed=True))
queue_items.append(('notify_handler', task_result, 'test handler'))
results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0)
self.assertEqual(strategy_base._pending_results, 1)
self.assertIn('test01', strategy_base._blocked_hosts)
self.assertIn(mock_handler_task, strategy_base._notified_handlers)
self.assertIn(mock_host, strategy_base._notified_handlers[mock_handler_task])
queue_items.append(('set_host_var', mock_host, mock_task, None, 'foo', 'bar'))
results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0)
self.assertEqual(strategy_base._pending_results, 1)
queue_items.append(('set_host_facts', mock_host, mock_task, None, 'foo', dict()))
results = strategy_base._process_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 0)
self.assertEqual(strategy_base._pending_results, 1)
queue_items.append(('bad'))
self.assertRaises(AnsibleError, strategy_base._process_pending_results, iterator=mock_iterator)
def test_strategy_base_load_included_file(self):
fake_loader = DictDataLoader({
"test.yml": """
- debug: msg='foo'
""",
"bad.yml": """
""",
})
mock_tqm = MagicMock()
mock_tqm._final_q = MagicMock()
mock_tqm._notified_handlers = {}
mock_tqm._listening_handlers = {}
strategy_base = StrategyBase(tqm=mock_tqm)
strategy_base._loader = fake_loader
mock_play = MagicMock()
mock_block = MagicMock()
mock_block._play = mock_play
mock_block.vars = dict()
mock_task = MagicMock()
mock_task._block = mock_block
mock_task._role = None
mock_iterator = MagicMock()
mock_iterator.mark_host_failed.return_value = None
mock_inc_file = MagicMock()
mock_inc_file._task = mock_task
mock_inc_file._filename = "test.yml"
res = strategy_base._load_included_file(included_file=mock_inc_file, iterator=mock_iterator)
mock_inc_file._filename = "bad.yml"
res = strategy_base._load_included_file(included_file=mock_inc_file, iterator=mock_iterator)
self.assertEqual(res, [])
@patch.object(WorkerProcess, 'run')
def test_strategy_base_run_handlers(self, mock_worker):
def fake_run(*args):
return
mock_worker.side_effect = fake_run
mock_play_context = MagicMock()
mock_handler_task = MagicMock(Handler)
mock_handler_task.action = 'foo'
mock_handler_task.get_name.return_value = "test handler"
mock_handler_task.has_triggered.return_value = False
mock_handler = MagicMock()
mock_handler.block = [mock_handler_task]
mock_handler.flag_for_host.return_value = False
mock_play = MagicMock()
mock_play.handlers = [mock_handler]
mock_host = MagicMock(Host)
mock_host.name = "test01"
mock_host.has_hostkey = True
mock_inventory = MagicMock()
mock_inventory.get_hosts.return_value = [mock_host]
mock_var_mgr = MagicMock()
mock_var_mgr.get_vars.return_value = dict()
mock_iterator = MagicMock
mock_iterator._play = mock_play
fake_loader = DictDataLoader()
mock_options = MagicMock()
mock_options.module_path = None
tqm = TaskQueueManager(
inventory=mock_inventory,
variable_manager=mock_var_mgr,
loader=fake_loader,
options=mock_options,
passwords=None,
)
tqm._initialize_processes(3)
tqm._initialize_notified_handlers(mock_play)
tqm.hostvars = dict()
try:
strategy_base = StrategyBase(tqm=tqm)
strategy_base._inventory = mock_inventory
strategy_base._notified_handlers = {mock_handler_task: [mock_host]}
task_result = TaskResult(Host('host01'), Handler(), dict(changed=False))
tqm._final_q.put(('host_task_ok', task_result))
result = strategy_base.run_handlers(iterator=mock_iterator, play_context=mock_play_context)
finally:
tqm.cleanup()