# (C) 2021, Victor Martinez # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import (absolute_import, division, print_function) __metaclass__ = type from ansible.playbook.task import Task from ansible.executor.task_result import TaskResult from ansible_collections.community.general.tests.unit.compat import unittest from ansible_collections.community.general.tests.unit.compat.mock import patch, MagicMock, Mock from ansible_collections.community.general.plugins.callback.opentelemetry import OpenTelemetrySource, TaskData, CallbackModule from collections import OrderedDict import sys OPENTELEMETRY_MINIMUM_PYTHON_VERSION = (3, 7) class TestOpentelemetry(unittest.TestCase): @patch('ansible_collections.community.general.plugins.callback.opentelemetry.socket') def setUp(self, mock_socket): # TODO: this python version validation won't be needed as long as the _time_ns call is mocked. if sys.version_info < OPENTELEMETRY_MINIMUM_PYTHON_VERSION: self.skipTest("Python %s+ is needed for OpenTelemetry" % ",".join(map(str, OPENTELEMETRY_MINIMUM_PYTHON_VERSION))) mock_socket.gethostname.return_value = 'my-host' mock_socket.gethostbyname.return_value = '1.2.3.4' self.opentelemetry = OpenTelemetrySource(display=None) self.task_fields = {'args': {}} self.mock_host = Mock('MockHost') self.mock_host.name = 'myhost' self.mock_host._uuid = 'myhost_uuid' self.mock_task = Task() self.mock_task.action = 'myaction' self.mock_task.no_log = False self.mock_task._role = 'myrole' self.mock_task._uuid = 'myuuid' self.mock_task.args = {} self.mock_task.get_name = MagicMock(return_value='mytask') self.mock_task.get_path = MagicMock(return_value='/mypath') self.my_task = TaskData('myuuid', 'mytask', '/mypath', 'myplay', 'myaction', '') self.my_task_result = TaskResult(host=self.mock_host, task=self.mock_task, return_data={}, task_fields=self.task_fields) def test_start_task(self): tasks_data = OrderedDict() self.opentelemetry.start_task( tasks_data, False, 'myplay', self.mock_task ) task_data = tasks_data['myuuid'] self.assertEqual(task_data.uuid, 'myuuid') self.assertEqual(task_data.name, 'mytask') self.assertEqual(task_data.path, '/mypath') self.assertEqual(task_data.play, 'myplay') self.assertEqual(task_data.action, 'myaction') self.assertEqual(task_data.args, {}) def test_finish_task_with_a_host_match(self): tasks_data = OrderedDict() tasks_data['myuuid'] = self.my_task self.opentelemetry.finish_task( tasks_data, 'ok', self.my_task_result ) task_data = tasks_data['myuuid'] host_data = task_data.host_data['myhost_uuid'] self.assertEqual(host_data.uuid, 'myhost_uuid') self.assertEqual(host_data.name, 'myhost') self.assertEqual(host_data.status, 'ok') def test_finish_task_without_a_host_match(self): result = TaskResult(host=None, task=self.mock_task, return_data={}, task_fields=self.task_fields) tasks_data = OrderedDict() tasks_data['myuuid'] = self.my_task self.opentelemetry.finish_task( tasks_data, 'ok', result ) task_data = tasks_data['myuuid'] host_data = task_data.host_data['include'] self.assertEqual(host_data.uuid, 'include') self.assertEqual(host_data.name, 'include') self.assertEqual(host_data.status, 'ok') def test_get_error_message(self): test_cases = ( ('my-exception', 'my-msg', None, 'my-exception'), (None, 'my-msg', None, 'my-msg'), (None, None, None, 'failed'), ) for tc in test_cases: result = self.opentelemetry.get_error_message(generate_test_data(tc[0], tc[1], tc[2])) self.assertEqual(result, tc[3]) def test_get_error_message_from_results(self): test_cases = ( ('my-exception', 'my-msg', None, False, None), (None, 'my-msg', None, False, None), (None, None, None, False, None), ('my-exception', 'my-msg', None, True, 'shell(none) - my-exception'), (None, 'my-msg', None, True, 'shell(none) - my-msg'), (None, None, None, True, 'shell(none) - failed'), ) for tc in test_cases: result = self.opentelemetry.get_error_message_from_results([generate_test_data(tc[0], tc[1], tc[2], tc[3])], 'shell') self.assertEqual(result, tc[4]) def test_enrich_error_message(self): test_cases = ( ('my-exception', 'my-msg', 'my-stderr', 'message: "my-msg"\nexception: "my-exception"\nstderr: "my-stderr"'), ('my-exception', None, 'my-stderr', 'message: "failed"\nexception: "my-exception"\nstderr: "my-stderr"'), (None, 'my-msg', 'my-stderr', 'message: "my-msg"\nexception: "None"\nstderr: "my-stderr"'), ('my-exception', 'my-msg', None, 'message: "my-msg"\nexception: "my-exception"\nstderr: "None"'), ('my-exception', 'my-msg', '\nline1\nline2', 'message: "my-msg"\nexception: "my-exception"\nstderr: "\nline1\nline2"') ) for tc in test_cases: result = self.opentelemetry.enrich_error_message(generate_test_data(tc[0], tc[1], tc[2])) self.assertEqual(result, tc[3]) def test_enrich_error_message_from_results(self): test_cases = ( ('my-exception', 'my-msg', 'my-stderr', False, ''), ('my-exception', None, 'my-stderr', False, ''), (None, 'my-msg', 'my-stderr', False, ''), ('my-exception', 'my-msg', None, False, ''), ('my-exception', 'my-msg', '\nline1\nline2', False, ''), ('my-exception', 'my-msg', 'my-stderr', True, 'shell(none) - message: "my-msg"\nexception: "my-exception"\nstderr: "my-stderr"\n'), ('my-exception', None, 'my-stderr', True, 'shell(none) - message: "failed"\nexception: "my-exception"\nstderr: "my-stderr"\n'), (None, 'my-msg', 'my-stderr', True, 'shell(none) - message: "my-msg"\nexception: "None"\nstderr: "my-stderr"\n'), ('my-exception', 'my-msg', None, True, 'shell(none) - message: "my-msg"\nexception: "my-exception"\nstderr: "None"\n'), ('my-exception', 'my-msg', '\nline1\nline2', True, 'shell(none) - message: "my-msg"\nexception: "my-exception"\nstderr: "\nline1\nline2"\n') ) for tc in test_cases: result = self.opentelemetry.enrich_error_message_from_results([generate_test_data(tc[0], tc[1], tc[2], tc[3])], 'shell') self.assertEqual(result, tc[4]) def test_url_from_args(self): test_cases = ( ({}, ""), ({'url': 'my-url'}, 'my-url'), ({'url': 'my-url', 'api_url': 'my-api_url'}, 'my-url'), ({'api_url': 'my-api_url'}, 'my-api_url'), ({'api_url': 'my-api_url', 'chart_repo_url': 'my-chart_repo_url'}, 'my-api_url') ) for tc in test_cases: result = self.opentelemetry.url_from_args(tc[0]) self.assertEqual(result, tc[1]) def test_parse_and_redact_url_if_possible(self): test_cases = ( ({}, None), ({'url': 'wrong'}, None), ({'url': 'https://my-url'}, 'https://my-url'), ({'url': 'https://user:pass@my-url'}, 'https://my-url'), ({'url': 'https://my-url:{{ my_port }}'}, 'https://my-url:{{ my_port }}'), ({'url': 'https://{{ my_hostname }}:{{ my_port }}'}, None), ({'url': '{{my_schema}}{{ my_hostname }}:{{ my_port }}'}, None) ) for tc in test_cases: result = self.opentelemetry.parse_and_redact_url_if_possible(tc[0]) if tc[1]: self.assertEqual(result.geturl(), tc[1]) else: self.assertEqual(result, tc[1]) def generate_test_data(exception=None, msg=None, stderr=None, failed=False): res_data = OrderedDict() if exception: res_data['exception'] = exception if msg: res_data['msg'] = msg if stderr: res_data['stderr'] = stderr res_data['failed'] = failed return res_data