mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
8de1c0c205
* refactor and test
* require version >= 5.21.0
Prior to this version the status output was different
* python version compatibility
* use exception classes from utils
* modify monit to use 'status' output instead of 'summary' output
The summary output is a fixed width table which truncates the
contents and prevents us from parsing the actual status of the
program.
* add integration tests + fixes
* remove unused handlers in monit integration test
* fix lint
* add '__metaclass__ = type' to integration python files
* raise AttributeError
* simplify status
* lint: add type to parameter docs
* remove lint ignore
* move monit process config into main file
* specify path to monit PID file
* set config location based on os_family
* create required directories
* update aliases to set group and skips
* add changelog
* add author
* add types to docs
* add EPEL repo
* custom vars for centos-6
* uninstall EPEL
* support older versions
* wait for status to change before exiting
* use 'validate' to force status updates
* handle 'execution failed'
* better status output for errors
* add more context to failure + standardize
* don't check rc for validate
* legacy string format support
* add integration test for 'reloaded' and 'present'
* don't wait after reload
* lint
* Revert "uninstall EPEL"
This reverts commit 4d548718d0.
* make 'present' more robust
* Apply suggestions from code review
Co-authored-by: Andrew Klychkov <aaklychkov@mail.ru>
* add license header
* drop daemon.py and use python-daemon instead
* skip python2.6 which is not supported by python-daemon
* refactor test tasks for reuse
* cleanup files after test
* lint
* start process before enabling monit
This shouldn't be necessary but I'm adding it in the hopes
it will make tests more robust.
* retry task
* attempt to rescue the task on failure
* fix indentation
* ignore check if rescue ran
* restart monit instead of reload
Co-authored-by: Andrew Klychkov <aaklychkov@mail.ru>
140 lines
5.7 KiB
Python
140 lines
5.7 KiB
Python
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
import mock
|
|
import pytest
|
|
|
|
from ansible_collections.community.general.tests.unit.compat import unittest
|
|
from ansible_collections.community.general.plugins.modules.monitoring import monit
|
|
from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson
|
|
|
|
|
|
# Status-text template used by the parser tests in this file. It is filled
# with a 2-tuple: the first %s is the process name, the second %s is the
# status string to be parsed.
# NOTE(review): the bare `|` lines inside the literal look like page-scrape
# residue rather than real status output -- confirm against the upstream file.
TEST_OUTPUT = """
|
|
Process '%s'
|
|
status %s
|
|
monitoring status Not monitored
|
|
monitoring mode active
|
|
"""
|
|
|
|
|
|
class MonitTest(unittest.TestCase):
    """Unit tests for ``monit.Monit`` driven by a mocked Ansible module.

    The mocked module's ``exit_json`` and ``fail_json`` raise
    ``AnsibleExitJson`` and ``AnsibleFailJson`` respectively, so every test
    can assert on how a call terminated.  ``get_status`` is patched per test
    (see ``patch_status``) to replay a scripted sequence of statuses.
    """

    def setUp(self):
        """Create the mocked module and the ``Monit`` instance under test."""
        fake_module = mock.MagicMock()
        fake_module.exit_json.side_effect = AnsibleExitJson
        fake_module.fail_json.side_effect = AnsibleFailJson
        self.module = fake_module

        self.monit = monit.Monit(self.module, 'monit', 'processX', 1)
        self.monit._status_change_retry_count = 1

        # Replace time.sleep with a mock for the duration of each test
        # (presumably so waits inside Monit do not slow the suite down).
        sleep_patcher = mock.patch('time.sleep')
        sleep_patcher.start()
        self.addCleanup(sleep_patcher.stop)

    def patch_status(self, side_effect):
        """Return a patcher replacing ``self.monit.get_status`` with a mock.

        :param side_effect: list of values the mock yields on successive
            calls; any non-list value is wrapped in a one-element list.
        """
        effects = side_effect if isinstance(side_effect, list) else [side_effect]
        return mock.patch.object(self.monit, 'get_status', side_effect=effects)

    def test_change_state_success(self):
        """``stop()`` exits via ``exit_json`` when status goes OK -> NOT_MONITORED."""
        scripted = [monit.Status.OK, monit.Status.NOT_MONITORED]
        with self.patch_status(scripted):
            with self.assertRaises(AnsibleExitJson):
                self.monit.stop()
        self.module.fail_json.assert_not_called()
        self.module.run_command.assert_called_with('monit stop processX', check_rc=True)

    def test_change_state_fail(self):
        """``stop()`` fails when the status stays OK on every poll."""
        scripted = [monit.Status.OK] * 3
        with self.patch_status(scripted):
            with self.assertRaises(AnsibleFailJson):
                self.monit.stop()

    def test_reload_fail(self):
        """``reload()`` fails when the command returns a non-zero rc."""
        self.module.run_command.return_value = (1, 'stdout', 'stderr')
        with self.assertRaises(AnsibleFailJson):
            self.monit.reload()

    def test_reload(self):
        """``reload()`` exits via ``exit_json`` when rc is 0 and status is OK."""
        self.module.run_command.return_value = (0, '', '')
        with self.patch_status(monit.Status.OK):
            with self.assertRaises(AnsibleExitJson):
                self.monit.reload()

    def test_wait_for_status_to_stop_pending(self):
        """The wait loop consumes every scripted status up to the final OK."""
        scripted = [
            monit.Status.MISSING,
            monit.Status.DOES_NOT_EXIST,
            monit.Status.INITIALIZING,
            monit.Status.OK.pending(),
            monit.Status.OK,
        ]
        with self.patch_status(scripted) as status_mock:
            self.monit.wait_for_monit_to_stop_pending()
            self.assertEqual(status_mock.call_count, len(scripted))

    def test_wait_for_status_change(self):
        """``wait_for_status_change`` polls twice for NOT_MONITORED -> OK."""
        scripted = [monit.Status.NOT_MONITORED, monit.Status.OK]
        with self.patch_status(scripted) as status_mock:
            self.monit.wait_for_status_change(monit.Status.NOT_MONITORED)
            self.assertEqual(status_mock.call_count, 2)

    def test_wait_for_status_change_fail(self):
        """``wait_for_status_change`` fails when the status never leaves OK."""
        scripted = [monit.Status.OK] * 3
        with self.patch_status(scripted):
            with self.assertRaises(AnsibleFailJson):
                self.monit.wait_for_status_change(monit.Status.OK)

    def test_monitor(self):
        """``monitor()`` exits via ``exit_json`` once the status settles on OK."""
        scripted = [
            monit.Status.NOT_MONITORED,
            monit.Status.OK.pending(),
            monit.Status.OK,
        ]
        with self.patch_status(scripted):
            with self.assertRaises(AnsibleExitJson):
                self.monit.monitor()

    def test_monitor_fail(self):
        """``monitor()`` fails when the status stays NOT_MONITORED."""
        scripted = [monit.Status.NOT_MONITORED] * 3
        with self.patch_status(scripted):
            with self.assertRaises(AnsibleFailJson):
                self.monit.monitor()

    def test_timeout(self):
        """With ``timeout`` set to 0, a still-pending status makes the wait fail."""
        self.monit.timeout = 0
        still_pending = monit.Status.NOT_MONITORED.pending()
        with self.patch_status(still_pending):
            with self.assertRaises(AnsibleFailJson):
                self.monit.wait_for_monit_to_stop_pending()
|
|
|
|
|
|
@pytest.mark.parametrize('status_name', list(monit.StatusValue.ALL_STATUS))
def test_status_value(status_name):
    """Check that a ``StatusValue`` reports exactly its own ``is_<name>`` flag.

    For each name in ``StatusValue.ALL_STATUS`` a ``StatusValue`` is built
    from the class attribute of the same (upper-cased) name.  Its matching
    ``is_<name>`` attribute must be truthy and every other ``is_<name>``
    attribute must be falsy.
    """
    value = getattr(monit.StatusValue, status_name.upper())
    status = monit.StatusValue(value)
    assert getattr(status, 'is_%s' % status_name)

    # ``not any`` rather than ``not all``: the latter passes as soon as a
    # single other flag is falsy, so it would miss a second flag wrongly set.
    other_names = [name for name in monit.StatusValue.ALL_STATUS if name != status_name]
    assert not any(getattr(status, 'is_%s' % name) for name in other_names)
|
|
|
|
|
|
# One (status text, expected Status) pair per name in StatusValue.ALL_STATUS:
# the raw status name is rendered into TEST_OUTPUT for process 'processX' and
# paired with the monit.Status attribute of the same, upper-cased, name.
BASIC_OUTPUT_CASES = [
    (
        TEST_OUTPUT % ('processX', status_name),
        getattr(monit.Status, status_name.upper()),
    )
    for status_name in monit.StatusValue.ALL_STATUS
]
|
|
|
|
|
|
@pytest.mark.parametrize('output, expected', BASIC_OUTPUT_CASES + [
    # Empty output, or output for a different process name, parses as MISSING.
    ('', monit.Status.MISSING),
    (TEST_OUTPUT % ('processY', 'OK'), monit.Status.MISSING),
    # Status strings carrying a "- ... pending" suffix.
    (TEST_OUTPUT % ('processX', 'Not Monitored - start pending'), monit.Status.OK),
    (TEST_OUTPUT % ('processX', 'Monitored - stop pending'), monit.Status.NOT_MONITORED),
    (TEST_OUTPUT % ('processX', 'Monitored - restart pending'), monit.Status.OK),
    (TEST_OUTPUT % ('processX', 'Not Monitored - monitor pending'), monit.Status.OK),
    # Plain human-readable status strings.
    (TEST_OUTPUT % ('processX', 'Does not exist'), monit.Status.DOES_NOT_EXIST),
    (TEST_OUTPUT % ('processX', 'Not monitored'), monit.Status.NOT_MONITORED),
    (TEST_OUTPUT % ('processX', 'Running'), monit.Status.OK),
    (TEST_OUTPUT % ('processX', 'Execution failed | Does not exist'), monit.Status.EXECUTION_FAILED),
])
def test_parse_status(output, expected):
    """``Monit._parse_status`` maps status text for 'processX' to ``expected``.

    :param output: status text handed to ``_parse_status`` as its first argument
    :param expected: the ``monit.Status`` value the parse result must equal
    """
    parser = monit.Monit(None, '', 'processX', 0)
    parsed = parser._parse_status(output, '')
    assert parsed == expected
|
|
|
|
|
|
@pytest.mark.parametrize('output, expected', [
    ('This is monit version 5.18.1', '5.18.1'),
    ('This is monit version 12.18', '12.18'),
    ('This is monit version 5.1.12', '5.1.12'),
])
def test_parse_version(output, expected):
    """``Monit._get_monit_version`` extracts the raw version string.

    :param output: stdout returned by the mocked ``run_command``
    :param expected: version string expected as the first returned element
    """
    fake_module = mock.MagicMock()
    fake_module.run_command.return_value = (0, output, '')
    monit_under_test = monit.Monit(fake_module, '', 'processX', 0)
    # The second element of the returned pair is not checked by this test.
    raw_version, _version_tuple = monit_under_test._get_monit_version()
    assert raw_version == expected
|