mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
3033fd96b0
* Move ansible.compat.tests to test/units/compat/. * Fix unit test references to ansible.compat.tests. * Move builtins compat to separate file. * Fix classification of test/units/compat/ dir.
183 lines
6.4 KiB
Python
183 lines
6.4 KiB
Python
# Copyright (c) 2018 Ansible Project
|
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
|
|
# Make coding more python3-ish
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
|
|
from units.compat import unittest
|
|
from units.compat.mock import patch, MagicMock
|
|
|
|
from ansible.executor.play_iterator import PlayIterator
|
|
from ansible.playbook import Playbook
|
|
from ansible.playbook.play_context import PlayContext
|
|
from ansible.plugins.strategy.linear import StrategyModule
|
|
from ansible.executor.task_queue_manager import TaskQueueManager
|
|
|
|
from units.mock.loader import DictDataLoader
|
|
from units.mock.path import mock_unfrackpath_noop
|
|
|
|
|
|
class TestStrategyLinear(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
pass
|
|
|
|
def tearDown(self):
|
|
pass
|
|
|
|
@patch('ansible.playbook.role.definition.unfrackpath', mock_unfrackpath_noop)
|
|
def test_noop(self):
|
|
fake_loader = DictDataLoader({
|
|
"test_play.yml": """
|
|
- hosts: all
|
|
gather_facts: no
|
|
tasks:
|
|
- block:
|
|
- block:
|
|
- name: task1
|
|
debug: msg='task1'
|
|
failed_when: inventory_hostname == 'host01'
|
|
|
|
- name: task2
|
|
debug: msg='task2'
|
|
|
|
rescue:
|
|
- name: rescue1
|
|
debug: msg='rescue1'
|
|
|
|
- name: rescue2
|
|
debug: msg='rescue2'
|
|
""",
|
|
})
|
|
|
|
mock_var_manager = MagicMock()
|
|
mock_var_manager._fact_cache = dict()
|
|
mock_var_manager.get_vars.return_value = dict()
|
|
|
|
p = Playbook.load('test_play.yml', loader=fake_loader, variable_manager=mock_var_manager)
|
|
|
|
hosts = []
|
|
for i in range(0, 2):
|
|
host = MagicMock()
|
|
host.name = host.get_name.return_value = 'host%02d' % i
|
|
hosts.append(host)
|
|
|
|
mock_var_manager._fact_cache['host00'] = dict()
|
|
|
|
inventory = MagicMock()
|
|
inventory.get_hosts.return_value = hosts
|
|
inventory.filter_hosts.return_value = hosts
|
|
|
|
play_context = PlayContext(play=p._entries[0])
|
|
|
|
itr = PlayIterator(
|
|
inventory=inventory,
|
|
play=p._entries[0],
|
|
play_context=play_context,
|
|
variable_manager=mock_var_manager,
|
|
all_vars=dict(),
|
|
)
|
|
|
|
mock_options = MagicMock()
|
|
mock_options.module_path = None
|
|
|
|
tqm = TaskQueueManager(
|
|
inventory=inventory,
|
|
variable_manager=mock_var_manager,
|
|
loader=fake_loader,
|
|
options=mock_options,
|
|
passwords=None,
|
|
)
|
|
tqm._initialize_processes(3)
|
|
strategy = StrategyModule(tqm)
|
|
|
|
# implicit meta: flush_handlers
|
|
hosts_left = strategy.get_hosts_left(itr)
|
|
hosts_tasks = strategy._get_next_task_lockstep(hosts_left, itr)
|
|
host1_task = hosts_tasks[0][1]
|
|
host2_task = hosts_tasks[1][1]
|
|
self.assertIsNotNone(host1_task)
|
|
self.assertIsNotNone(host2_task)
|
|
self.assertEqual(host1_task.action, 'meta')
|
|
self.assertEqual(host2_task.action, 'meta')
|
|
|
|
# debug: task1, debug: task1
|
|
hosts_left = strategy.get_hosts_left(itr)
|
|
hosts_tasks = strategy._get_next_task_lockstep(hosts_left, itr)
|
|
host1_task = hosts_tasks[0][1]
|
|
host2_task = hosts_tasks[1][1]
|
|
self.assertIsNotNone(host1_task)
|
|
self.assertIsNotNone(host2_task)
|
|
self.assertEqual(host1_task.action, 'debug')
|
|
self.assertEqual(host2_task.action, 'debug')
|
|
self.assertEqual(host1_task.name, 'task1')
|
|
self.assertEqual(host2_task.name, 'task1')
|
|
|
|
# mark the second host failed
|
|
itr.mark_host_failed(hosts[1])
|
|
|
|
# debug: task2, meta: noop
|
|
hosts_left = strategy.get_hosts_left(itr)
|
|
hosts_tasks = strategy._get_next_task_lockstep(hosts_left, itr)
|
|
host1_task = hosts_tasks[0][1]
|
|
host2_task = hosts_tasks[1][1]
|
|
self.assertIsNotNone(host1_task)
|
|
self.assertIsNotNone(host2_task)
|
|
self.assertEqual(host1_task.action, 'debug')
|
|
self.assertEqual(host2_task.action, 'meta')
|
|
self.assertEqual(host1_task.name, 'task2')
|
|
self.assertEqual(host2_task.name, '')
|
|
|
|
# meta: noop, debug: rescue1
|
|
hosts_left = strategy.get_hosts_left(itr)
|
|
hosts_tasks = strategy._get_next_task_lockstep(hosts_left, itr)
|
|
host1_task = hosts_tasks[0][1]
|
|
host2_task = hosts_tasks[1][1]
|
|
self.assertIsNotNone(host1_task)
|
|
self.assertIsNotNone(host2_task)
|
|
self.assertEqual(host1_task.action, 'meta')
|
|
self.assertEqual(host2_task.action, 'debug')
|
|
self.assertEqual(host1_task.name, '')
|
|
self.assertEqual(host2_task.name, 'rescue1')
|
|
|
|
# meta: noop, debug: rescue2
|
|
hosts_left = strategy.get_hosts_left(itr)
|
|
hosts_tasks = strategy._get_next_task_lockstep(hosts_left, itr)
|
|
host1_task = hosts_tasks[0][1]
|
|
host2_task = hosts_tasks[1][1]
|
|
self.assertIsNotNone(host1_task)
|
|
self.assertIsNotNone(host2_task)
|
|
self.assertEqual(host1_task.action, 'meta')
|
|
self.assertEqual(host2_task.action, 'debug')
|
|
self.assertEqual(host1_task.name, '')
|
|
self.assertEqual(host2_task.name, 'rescue2')
|
|
|
|
# implicit meta: flush_handlers
|
|
hosts_left = strategy.get_hosts_left(itr)
|
|
hosts_tasks = strategy._get_next_task_lockstep(hosts_left, itr)
|
|
host1_task = hosts_tasks[0][1]
|
|
host2_task = hosts_tasks[1][1]
|
|
self.assertIsNotNone(host1_task)
|
|
self.assertIsNotNone(host2_task)
|
|
self.assertEqual(host1_task.action, 'meta')
|
|
self.assertEqual(host2_task.action, 'meta')
|
|
|
|
# implicit meta: flush_handlers
|
|
hosts_left = strategy.get_hosts_left(itr)
|
|
hosts_tasks = strategy._get_next_task_lockstep(hosts_left, itr)
|
|
host1_task = hosts_tasks[0][1]
|
|
host2_task = hosts_tasks[1][1]
|
|
self.assertIsNotNone(host1_task)
|
|
self.assertIsNotNone(host2_task)
|
|
self.assertEqual(host1_task.action, 'meta')
|
|
self.assertEqual(host2_task.action, 'meta')
|
|
|
|
# end of iteration
|
|
hosts_left = strategy.get_hosts_left(itr)
|
|
hosts_tasks = strategy._get_next_task_lockstep(hosts_left, itr)
|
|
host1_task = hosts_tasks[0][1]
|
|
host2_task = hosts_tasks[1][1]
|
|
self.assertIsNone(host1_task)
|
|
self.assertIsNone(host2_task)
|