diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index 22c6d60740..f7aae7f272 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -49,6 +49,9 @@ files: maintainers: dagwieers $callbacks/diy.py: maintainers: theque5t + $callbacks/elastic.py: + maintainers: v1v + keywords: apm observability $callbacks/hipchat.py: {} $callbacks/jabber.py: {} $callbacks/loganalytics.py: diff --git a/plugins/callback/elastic.py b/plugins/callback/elastic.py new file mode 100644 index 0000000000..188f168f9b --- /dev/null +++ b/plugins/callback/elastic.py @@ -0,0 +1,408 @@ +# (C) 2021, Victor Martinez +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' + author: Victor Martinez (@v1v) + name: elastic + type: notification + short_description: Create distributed traces for each Ansible task in Elastic APM + version_added: 3.8.0 + description: + - This callback creates distributed traces for each Ansible task in Elastic APM. + - You can configure the plugin with environment variables. + - See U(https://www.elastic.co/guide/en/apm/agent/python/current/configuration.html). + options: + hide_task_arguments: + default: false + type: bool + description: + - Hide the arguments for a task. + env: + - name: ANSIBLE_OPENTELEMETRY_HIDE_TASK_ARGUMENTS + apm_service_name: + default: ansible + type: str + description: + - The service name resource attribute. + env: + - name: ELASTIC_APM_SERVICE_NAME + apm_server_url: + type: str + description: + - Use the APM server and its environment variables. + env: + - name: ELASTIC_APM_SERVER_URL + apm_secret_token: + type: str + description: + - Use the APM server token + env: + - name: ELASTIC_APM_SECRET_TOKEN + apm_api_key: + type: str + description: + - Use the APM API key + env: + - name: ELASTIC_APM_API_KEY + apm_verify_server_cert: + default: true + type: bool + description: + - Verifies the SSL certificate if an HTTPS connection. + env: + - name: ELASTIC_APM_VERIFY_SERVER_CERT + traceparent: + type: str + description: + - The L(W3C Trace Context header traceparent,https://www.w3.org/TR/trace-context-1/#traceparent-header). + env: + - name: TRACEPARENT + requirements: + - elastic-apm (Python library) +''' + + +EXAMPLES = ''' +examples: | + Enable the plugin in ansible.cfg: + [defaults] + callbacks_enabled = community.general.elastic + + Set the environment variable: + export ELASTIC_APM_SERVER_URL= + export ELASTIC_APM_SERVICE_NAME=your_service_name + export ELASTIC_APM_API_KEY=your_APM_API_KEY +''' + +import getpass +import socket +import time +import uuid + +from collections import OrderedDict +from os.path import basename + +from ansible.errors import AnsibleError, AnsibleRuntimeError +from ansible.module_utils.six import raise_from +from ansible.plugins.callback import CallbackBase + +try: + from elasticapm import Client, capture_span, trace_parent_from_string, instrument, label +except ImportError as imp_exc: + ELASTIC_LIBRARY_IMPORT_ERROR = imp_exc +else: + ELASTIC_LIBRARY_IMPORT_ERROR = None + + +class TaskData: + """ + Data about an individual task. + """ + + def __init__(self, uuid, name, path, play, action, args): + self.uuid = uuid + self.name = name + self.path = path + self.play = play + self.host_data = OrderedDict() + self.start = time.time() + self.action = action + self.args = args + + def add_host(self, host): + if host.uuid in self.host_data: + if host.status == 'included': + # concatenate task include output from multiple items + host.result = '%s\n%s' % (self.host_data[host.uuid].result, host.result) + else: + return + + self.host_data[host.uuid] = host + + +class HostData: + """ + Data about an individual host. + """ + + def __init__(self, uuid, name, status, result): + self.uuid = uuid + self.name = name + self.status = status + self.result = result + self.finish = time.time() + + +class ElasticSource(object): + def __init__(self, display): + self.ansible_playbook = "" + self.ansible_version = None + self.session = str(uuid.uuid4()) + self.host = socket.gethostname() + try: + self.ip_address = socket.gethostbyname(socket.gethostname()) + except Exception as e: + self.ip_address = None + self.user = getpass.getuser() + + self._display = display + + def start_task(self, tasks_data, hide_task_arguments, play_name, task): + """ record the start of a task for one or more hosts """ + + uuid = task._uuid + + if uuid in tasks_data: + return + + name = task.get_name().strip() + path = task.get_path() + action = task.action + args = None + + if not task.no_log and not hide_task_arguments: + args = ', '.join(('%s=%s' % a for a in task.args.items())) + + tasks_data[uuid] = TaskData(uuid, name, path, play_name, action, args) + + def finish_task(self, tasks_data, status, result): + """ record the results of a task for a single host """ + + task_uuid = result._task._uuid + + if hasattr(result, '_host') and result._host is not None: + host_uuid = result._host._uuid + host_name = result._host.name + else: + host_uuid = 'include' + host_name = 'include' + + task = tasks_data[task_uuid] + + if self.ansible_version is None and result._task_fields['args'].get('_ansible_version'): + self.ansible_version = result._task_fields['args'].get('_ansible_version') + + task.add_host(HostData(host_uuid, host_name, status, result)) + + def generate_distributed_traces(self, tasks_data, status, end_time, traceparent, apm_service_name, + apm_server_url, apm_verify_server_cert, apm_secret_token, apm_api_key): + """ generate distributed traces from the collected TaskData and HostData """ + + tasks = [] + parent_start_time = None + for task_uuid, task in tasks_data.items(): + if parent_start_time is None: + parent_start_time = task.start + tasks.append(task) + + apm_cli = self.init_apm_client(apm_server_url, apm_service_name, apm_verify_server_cert, apm_secret_token, apm_api_key) + if apm_cli: + instrument() # Only call this once, as early as possible. + if traceparent: + parent = trace_parent_from_string(traceparent) + apm_cli.begin_transaction("Session", trace_parent=parent, start=parent_start_time) + else: + apm_cli.begin_transaction("Session", start=parent_start_time) + # Populate trace metadata attributes + if self.ansible_version is not None: + label(ansible_version=self.ansible_version) + label(ansible_session=self.session, ansible_host_name=self.host, ansible_host_user=self.user) + if self.ip_address is not None: + label(ansible_host_ip=self.ip_address) + + for task_data in tasks: + for host_uuid, host_data in task_data.host_data.items(): + self.create_span_data(apm_cli, task_data, host_data) + + apm_cli.end_transaction(name=__name__, result=status, duration=end_time - parent_start_time) + + def create_span_data(self, apm_cli, task_data, host_data): + """ create the span with the given TaskData and HostData """ + + name = '[%s] %s: %s' % (host_data.name, task_data.play, task_data.name) + + message = "success" + status = "success" + if host_data.status == 'included': + rc = 0 + else: + res = host_data.result._result + rc = res.get('rc', 0) + if host_data.status == 'failed': + if res.get('exception') is not None: + message = res['exception'].strip().split('\n')[-1] + elif 'msg' in res: + message = res['msg'] + else: + message = 'failed' + status = "failure" + elif host_data.status == 'skipped': + if 'skip_reason' in res: + message = res['skip_reason'] + else: + message = 'skipped' + status = "unknown" + + with capture_span(task_data.name, + start=task_data.start, + span_type="ansible.task.run", + duration=host_data.finish - task_data.start, + labels={"ansible.task.args": task_data.args, + "ansible.task.message": message, + "ansible.task.module": task_data.action, + "ansible.task.name": name, + "ansible.task.result": rc, + "ansible.task.host.name": host_data.name, + "ansible.task.host.status": host_data.status}) as span: + span.outcome = status + if 'failure' in status: + exception = AnsibleRuntimeError(message="{0}: {1} failed with error message {2}".format(task_data.action, name, message)) + apm_cli.capture_exception(exc_info=(type(exception), exception, exception.__traceback__), handled=True) + + def init_apm_client(self, apm_server_url, apm_service_name, apm_verify_server_cert, apm_secret_token, apm_api_key): + if apm_server_url: + return Client(service_name=apm_service_name, + server_url=apm_server_url, + verify_server_cert=False, + secret_token=apm_secret_token, + api_key=apm_api_key, + use_elastic_traceparent_header=True, + debug=True) + + +class CallbackModule(CallbackBase): + """ + This callback creates distributed traces with Elastic APM. + """ + + CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = 'notification' + CALLBACK_NAME = 'community.general.elastic' + CALLBACK_NEEDS_ENABLED = True + + def __init__(self, display=None): + super(CallbackModule, self).__init__(display=display) + self.hide_task_arguments = None + self.apm_service_name = None + self.ansible_playbook = None + self.traceparent = False + self.play_name = None + self.tasks_data = None + self.errors = 0 + self.disabled = False + + if ELASTIC_LIBRARY_IMPORT_ERROR: + raise_from( + AnsibleError('The `elastic-apm` must be installed to use this plugin'), + ELASTIC_LIBRARY_IMPORT_ERROR) + + self.tasks_data = OrderedDict() + + self.elastic = ElasticSource(display=self._display) + + def set_options(self, task_keys=None, var_options=None, direct=None): + super(CallbackModule, self).set_options(task_keys=task_keys, + var_options=var_options, + direct=direct) + + self.hide_task_arguments = self.get_option('hide_task_arguments') + + self.apm_service_name = self.get_option('apm_service_name') + if not self.apm_service_name: + self.apm_service_name = 'ansible' + + self.apm_server_url = self.get_option('apm_server_url') + self.apm_secret_token = self.get_option('apm_secret_token') + self.apm_api_key = self.get_option('apm_api_key') + self.apm_verify_server_cert = self.get_option('apm_verify_server_cert') + self.traceparent = self.get_option('traceparent') + + def v2_playbook_on_start(self, playbook): + self.ansible_playbook = basename(playbook._file_name) + + def v2_playbook_on_play_start(self, play): + self.play_name = play.get_name() + + def v2_runner_on_no_hosts(self, task): + self.elastic.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_playbook_on_task_start(self, task, is_conditional): + self.elastic.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_playbook_on_cleanup_task_start(self, task): + self.elastic.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_playbook_on_handler_task_start(self, task): + self.elastic.start_task( + self.tasks_data, + self.hide_task_arguments, + self.play_name, + task + ) + + def v2_runner_on_failed(self, result, ignore_errors=False): + self.errors += 1 + self.elastic.finish_task( + self.tasks_data, + 'failed', + result + ) + + def v2_runner_on_ok(self, result): + self.elastic.finish_task( + self.tasks_data, + 'ok', + result + ) + + def v2_runner_on_skipped(self, result): + self.elastic.finish_task( + self.tasks_data, + 'skipped', + result + ) + + def v2_playbook_on_include(self, included_file): + self.elastic.finish_task( + self.tasks_data, + 'included', + included_file + ) + + def v2_playbook_on_stats(self, stats): + if self.errors == 0: + status = "success" + else: + status = "failure" + self.elastic.generate_distributed_traces( + self.tasks_data, + status, + time.time(), + self.traceparent, + self.apm_service_name, + self.apm_server_url, + self.apm_verify_server_cert, + self.apm_secret_token, + self.apm_api_key + ) + + def v2_runner_on_async_failed(self, result, **kwargs): + self.errors += 1 diff --git a/tests/unit/plugins/callback/test_elastic.py b/tests/unit/plugins/callback/test_elastic.py new file mode 100644 index 0000000000..cba41bd546 --- /dev/null +++ b/tests/unit/plugins/callback/test_elastic.py @@ -0,0 +1,91 @@ +# (C) 2021, Victor Martinez +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +from ansible.playbook.task import Task +from ansible.executor.task_result import TaskResult +from ansible_collections.community.general.tests.unit.compat import unittest +from ansible_collections.community.general.tests.unit.compat.mock import patch, MagicMock, Mock +from ansible_collections.community.general.plugins.callback.elastic import ElasticSource, TaskData +from collections import OrderedDict +import sys + +ELASTIC_MINIMUM_PYTHON_VERSION = (3, 6) + + +class TestOpentelemetry(unittest.TestCase): + @patch('ansible_collections.community.general.plugins.callback.elastic.socket') + def setUp(self, mock_socket): + if sys.version_info < ELASTIC_MINIMUM_PYTHON_VERSION: + self.skipTest("Python %s+ is needed for Elastic" % + ",".join(map(str, ELASTIC_MINIMUM_PYTHON_VERSION))) + mock_socket.gethostname.return_value = 'my-host' + mock_socket.gethostbyname.return_value = '1.2.3.4' + self.elastic = ElasticSource(display=None) + self.task_fields = {'args': {}} + self.mock_host = Mock('MockHost') + self.mock_host.name = 'myhost' + self.mock_host._uuid = 'myhost_uuid' + self.mock_task = Task() + self.mock_task.action = 'myaction' + self.mock_task.no_log = False + self.mock_task._role = 'myrole' + self.mock_task._uuid = 'myuuid' + self.mock_task.args = {} + self.mock_task.get_name = MagicMock(return_value='mytask') + self.mock_task.get_path = MagicMock(return_value='/mypath') + self.my_task = TaskData('myuuid', 'mytask', '/mypath', 'myplay', 'myaction', '') + self.my_task_result = TaskResult(host=self.mock_host, task=self.mock_task, return_data={}, task_fields=self.task_fields) + + def test_start_task(self): + tasks_data = OrderedDict() + + self.elastic.start_task( + tasks_data, + False, + 'myplay', + self.mock_task + ) + + task_data = tasks_data['myuuid'] + self.assertEqual(task_data.uuid, 'myuuid') + self.assertEqual(task_data.name, 'mytask') + self.assertEqual(task_data.path, '/mypath') + self.assertEqual(task_data.play, 'myplay') + self.assertEqual(task_data.action, 'myaction') + self.assertEqual(task_data.args, '') + + def test_finish_task_with_a_host_match(self): + tasks_data = OrderedDict() + tasks_data['myuuid'] = self.my_task + + self.elastic.finish_task( + tasks_data, + 'ok', + self.my_task_result + ) + + task_data = tasks_data['myuuid'] + host_data = task_data.host_data['myhost_uuid'] + self.assertEqual(host_data.uuid, 'myhost_uuid') + self.assertEqual(host_data.name, 'myhost') + self.assertEqual(host_data.status, 'ok') + + def test_finish_task_without_a_host_match(self): + result = TaskResult(host=None, task=self.mock_task, return_data={}, task_fields=self.task_fields) + tasks_data = OrderedDict() + tasks_data['myuuid'] = self.my_task + + self.elastic.finish_task( + tasks_data, + 'ok', + result + ) + + task_data = tasks_data['myuuid'] + host_data = task_data.host_data['include'] + self.assertEqual(host_data.uuid, 'include') + self.assertEqual(host_data.name, 'include') + self.assertEqual(host_data.status, 'ok') diff --git a/tests/unit/requirements.txt b/tests/unit/requirements.txt index 904395d0a9..59ea49ed90 100644 --- a/tests/unit/requirements.txt +++ b/tests/unit/requirements.txt @@ -33,3 +33,6 @@ dataclasses ; python_version == '3.6' opentelemetry-api ; python_version >= '3.6' and python_version < '3.10' opentelemetry-exporter-otlp ; python_version >= '3.6' and python_version < '3.10' opentelemetry-sdk ; python_version >= '3.6' and python_version < '3.10' + +# requirement for the elastic callback plugin +elastic-apm ; python_version >= '3.6'