mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
* Add elastic callback plugin
* Capture task failures
* Catch errors and add UTs
* Skip 3.5< python versions and install dependency
* fix lint
* Fix linting
* Fix linting
* Add botmeta
* Apply suggestions from code review
Co-authored-by: Felix Fontein <felix@fontein.de>
* It's not required
* As suggested in the code review OrderedDict has been added to the Python stdlib since version 2.7
* Update plugins/callback/elastic.py
Co-authored-by: Felix Fontein <felix@fontein.de>
Co-authored-by: Felix Fontein <felix@fontein.de>
(cherry picked from commit 905f4dcfa2
)
Co-authored-by: Victor Martinez <victormartinezrubio@gmail.com>
This commit is contained in:
parent
d7d1659e34
commit
16aa776c93
4 changed files with 505 additions and 0 deletions
3
.github/BOTMETA.yml
vendored
3
.github/BOTMETA.yml
vendored
|
@ -49,6 +49,9 @@ files:
|
|||
maintainers: dagwieers
|
||||
$callbacks/diy.py:
|
||||
maintainers: theque5t
|
||||
$callbacks/elastic.py:
|
||||
maintainers: v1v
|
||||
keywords: apm observability
|
||||
$callbacks/hipchat.py: {}
|
||||
$callbacks/jabber.py: {}
|
||||
$callbacks/loganalytics.py:
|
||||
|
|
408
plugins/callback/elastic.py
Normal file
408
plugins/callback/elastic.py
Normal file
|
@ -0,0 +1,408 @@
|
|||
# (C) 2021, Victor Martinez <VictorMartinezRubio@gmail.com>
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
DOCUMENTATION = '''
|
||||
author: Victor Martinez (@v1v) <VictorMartinezRubio@gmail.com>
|
||||
name: elastic
|
||||
type: notification
|
||||
short_description: Create distributed traces for each Ansible task in Elastic APM
|
||||
version_added: 3.8.0
|
||||
description:
|
||||
- This callback creates distributed traces for each Ansible task in Elastic APM.
|
||||
- You can configure the plugin with environment variables.
|
||||
- See U(https://www.elastic.co/guide/en/apm/agent/python/current/configuration.html).
|
||||
options:
|
||||
hide_task_arguments:
|
||||
default: false
|
||||
type: bool
|
||||
description:
|
||||
- Hide the arguments for a task.
|
||||
env:
|
||||
- name: ANSIBLE_OPENTELEMETRY_HIDE_TASK_ARGUMENTS
|
||||
apm_service_name:
|
||||
default: ansible
|
||||
type: str
|
||||
description:
|
||||
- The service name resource attribute.
|
||||
env:
|
||||
- name: ELASTIC_APM_SERVICE_NAME
|
||||
apm_server_url:
|
||||
type: str
|
||||
description:
|
||||
- Use the APM server and its environment variables.
|
||||
env:
|
||||
- name: ELASTIC_APM_SERVER_URL
|
||||
apm_secret_token:
|
||||
type: str
|
||||
description:
|
||||
- Use the APM server token
|
||||
env:
|
||||
- name: ELASTIC_APM_SECRET_TOKEN
|
||||
apm_api_key:
|
||||
type: str
|
||||
description:
|
||||
- Use the APM API key
|
||||
env:
|
||||
- name: ELASTIC_APM_API_KEY
|
||||
apm_verify_server_cert:
|
||||
default: true
|
||||
type: bool
|
||||
description:
|
||||
- Verifies the SSL certificate if an HTTPS connection.
|
||||
env:
|
||||
- name: ELASTIC_APM_VERIFY_SERVER_CERT
|
||||
traceparent:
|
||||
type: str
|
||||
description:
|
||||
- The L(W3C Trace Context header traceparent,https://www.w3.org/TR/trace-context-1/#traceparent-header).
|
||||
env:
|
||||
- name: TRACEPARENT
|
||||
requirements:
|
||||
- elastic-apm (Python library)
|
||||
'''
|
||||
|
||||
|
||||
EXAMPLES = '''
|
||||
examples: |
|
||||
Enable the plugin in ansible.cfg:
|
||||
[defaults]
|
||||
callbacks_enabled = community.general.elastic
|
||||
|
||||
Set the environment variable:
|
||||
export ELASTIC_APM_SERVER_URL=<your APM server URL)>
|
||||
export ELASTIC_APM_SERVICE_NAME=your_service_name
|
||||
export ELASTIC_APM_API_KEY=your_APM_API_KEY
|
||||
'''
|
||||
|
||||
import getpass
|
||||
import socket
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from collections import OrderedDict
|
||||
from os.path import basename
|
||||
|
||||
from ansible.errors import AnsibleError, AnsibleRuntimeError
|
||||
from ansible.module_utils.six import raise_from
|
||||
from ansible.plugins.callback import CallbackBase
|
||||
|
||||
try:
|
||||
from elasticapm import Client, capture_span, trace_parent_from_string, instrument, label
|
||||
except ImportError as imp_exc:
|
||||
ELASTIC_LIBRARY_IMPORT_ERROR = imp_exc
|
||||
else:
|
||||
ELASTIC_LIBRARY_IMPORT_ERROR = None
|
||||
|
||||
|
||||
class TaskData:
|
||||
"""
|
||||
Data about an individual task.
|
||||
"""
|
||||
|
||||
def __init__(self, uuid, name, path, play, action, args):
|
||||
self.uuid = uuid
|
||||
self.name = name
|
||||
self.path = path
|
||||
self.play = play
|
||||
self.host_data = OrderedDict()
|
||||
self.start = time.time()
|
||||
self.action = action
|
||||
self.args = args
|
||||
|
||||
def add_host(self, host):
|
||||
if host.uuid in self.host_data:
|
||||
if host.status == 'included':
|
||||
# concatenate task include output from multiple items
|
||||
host.result = '%s\n%s' % (self.host_data[host.uuid].result, host.result)
|
||||
else:
|
||||
return
|
||||
|
||||
self.host_data[host.uuid] = host
|
||||
|
||||
|
||||
class HostData:
|
||||
"""
|
||||
Data about an individual host.
|
||||
"""
|
||||
|
||||
def __init__(self, uuid, name, status, result):
|
||||
self.uuid = uuid
|
||||
self.name = name
|
||||
self.status = status
|
||||
self.result = result
|
||||
self.finish = time.time()
|
||||
|
||||
|
||||
class ElasticSource(object):
|
||||
def __init__(self, display):
|
||||
self.ansible_playbook = ""
|
||||
self.ansible_version = None
|
||||
self.session = str(uuid.uuid4())
|
||||
self.host = socket.gethostname()
|
||||
try:
|
||||
self.ip_address = socket.gethostbyname(socket.gethostname())
|
||||
except Exception as e:
|
||||
self.ip_address = None
|
||||
self.user = getpass.getuser()
|
||||
|
||||
self._display = display
|
||||
|
||||
def start_task(self, tasks_data, hide_task_arguments, play_name, task):
|
||||
""" record the start of a task for one or more hosts """
|
||||
|
||||
uuid = task._uuid
|
||||
|
||||
if uuid in tasks_data:
|
||||
return
|
||||
|
||||
name = task.get_name().strip()
|
||||
path = task.get_path()
|
||||
action = task.action
|
||||
args = None
|
||||
|
||||
if not task.no_log and not hide_task_arguments:
|
||||
args = ', '.join(('%s=%s' % a for a in task.args.items()))
|
||||
|
||||
tasks_data[uuid] = TaskData(uuid, name, path, play_name, action, args)
|
||||
|
||||
def finish_task(self, tasks_data, status, result):
|
||||
""" record the results of a task for a single host """
|
||||
|
||||
task_uuid = result._task._uuid
|
||||
|
||||
if hasattr(result, '_host') and result._host is not None:
|
||||
host_uuid = result._host._uuid
|
||||
host_name = result._host.name
|
||||
else:
|
||||
host_uuid = 'include'
|
||||
host_name = 'include'
|
||||
|
||||
task = tasks_data[task_uuid]
|
||||
|
||||
if self.ansible_version is None and result._task_fields['args'].get('_ansible_version'):
|
||||
self.ansible_version = result._task_fields['args'].get('_ansible_version')
|
||||
|
||||
task.add_host(HostData(host_uuid, host_name, status, result))
|
||||
|
||||
def generate_distributed_traces(self, tasks_data, status, end_time, traceparent, apm_service_name,
|
||||
apm_server_url, apm_verify_server_cert, apm_secret_token, apm_api_key):
|
||||
""" generate distributed traces from the collected TaskData and HostData """
|
||||
|
||||
tasks = []
|
||||
parent_start_time = None
|
||||
for task_uuid, task in tasks_data.items():
|
||||
if parent_start_time is None:
|
||||
parent_start_time = task.start
|
||||
tasks.append(task)
|
||||
|
||||
apm_cli = self.init_apm_client(apm_server_url, apm_service_name, apm_verify_server_cert, apm_secret_token, apm_api_key)
|
||||
if apm_cli:
|
||||
instrument() # Only call this once, as early as possible.
|
||||
if traceparent:
|
||||
parent = trace_parent_from_string(traceparent)
|
||||
apm_cli.begin_transaction("Session", trace_parent=parent, start=parent_start_time)
|
||||
else:
|
||||
apm_cli.begin_transaction("Session", start=parent_start_time)
|
||||
# Populate trace metadata attributes
|
||||
if self.ansible_version is not None:
|
||||
label(ansible_version=self.ansible_version)
|
||||
label(ansible_session=self.session, ansible_host_name=self.host, ansible_host_user=self.user)
|
||||
if self.ip_address is not None:
|
||||
label(ansible_host_ip=self.ip_address)
|
||||
|
||||
for task_data in tasks:
|
||||
for host_uuid, host_data in task_data.host_data.items():
|
||||
self.create_span_data(apm_cli, task_data, host_data)
|
||||
|
||||
apm_cli.end_transaction(name=__name__, result=status, duration=end_time - parent_start_time)
|
||||
|
||||
def create_span_data(self, apm_cli, task_data, host_data):
|
||||
""" create the span with the given TaskData and HostData """
|
||||
|
||||
name = '[%s] %s: %s' % (host_data.name, task_data.play, task_data.name)
|
||||
|
||||
message = "success"
|
||||
status = "success"
|
||||
if host_data.status == 'included':
|
||||
rc = 0
|
||||
else:
|
||||
res = host_data.result._result
|
||||
rc = res.get('rc', 0)
|
||||
if host_data.status == 'failed':
|
||||
if res.get('exception') is not None:
|
||||
message = res['exception'].strip().split('\n')[-1]
|
||||
elif 'msg' in res:
|
||||
message = res['msg']
|
||||
else:
|
||||
message = 'failed'
|
||||
status = "failure"
|
||||
elif host_data.status == 'skipped':
|
||||
if 'skip_reason' in res:
|
||||
message = res['skip_reason']
|
||||
else:
|
||||
message = 'skipped'
|
||||
status = "unknown"
|
||||
|
||||
with capture_span(task_data.name,
|
||||
start=task_data.start,
|
||||
span_type="ansible.task.run",
|
||||
duration=host_data.finish - task_data.start,
|
||||
labels={"ansible.task.args": task_data.args,
|
||||
"ansible.task.message": message,
|
||||
"ansible.task.module": task_data.action,
|
||||
"ansible.task.name": name,
|
||||
"ansible.task.result": rc,
|
||||
"ansible.task.host.name": host_data.name,
|
||||
"ansible.task.host.status": host_data.status}) as span:
|
||||
span.outcome = status
|
||||
if 'failure' in status:
|
||||
exception = AnsibleRuntimeError(message="{0}: {1} failed with error message {2}".format(task_data.action, name, message))
|
||||
apm_cli.capture_exception(exc_info=(type(exception), exception, exception.__traceback__), handled=True)
|
||||
|
||||
def init_apm_client(self, apm_server_url, apm_service_name, apm_verify_server_cert, apm_secret_token, apm_api_key):
|
||||
if apm_server_url:
|
||||
return Client(service_name=apm_service_name,
|
||||
server_url=apm_server_url,
|
||||
verify_server_cert=False,
|
||||
secret_token=apm_secret_token,
|
||||
api_key=apm_api_key,
|
||||
use_elastic_traceparent_header=True,
|
||||
debug=True)
|
||||
|
||||
|
||||
class CallbackModule(CallbackBase):
|
||||
"""
|
||||
This callback creates distributed traces with Elastic APM.
|
||||
"""
|
||||
|
||||
CALLBACK_VERSION = 2.0
|
||||
CALLBACK_TYPE = 'notification'
|
||||
CALLBACK_NAME = 'community.general.elastic'
|
||||
CALLBACK_NEEDS_ENABLED = True
|
||||
|
||||
def __init__(self, display=None):
|
||||
super(CallbackModule, self).__init__(display=display)
|
||||
self.hide_task_arguments = None
|
||||
self.apm_service_name = None
|
||||
self.ansible_playbook = None
|
||||
self.traceparent = False
|
||||
self.play_name = None
|
||||
self.tasks_data = None
|
||||
self.errors = 0
|
||||
self.disabled = False
|
||||
|
||||
if ELASTIC_LIBRARY_IMPORT_ERROR:
|
||||
raise_from(
|
||||
AnsibleError('The `elastic-apm` must be installed to use this plugin'),
|
||||
ELASTIC_LIBRARY_IMPORT_ERROR)
|
||||
|
||||
self.tasks_data = OrderedDict()
|
||||
|
||||
self.elastic = ElasticSource(display=self._display)
|
||||
|
||||
def set_options(self, task_keys=None, var_options=None, direct=None):
|
||||
super(CallbackModule, self).set_options(task_keys=task_keys,
|
||||
var_options=var_options,
|
||||
direct=direct)
|
||||
|
||||
self.hide_task_arguments = self.get_option('hide_task_arguments')
|
||||
|
||||
self.apm_service_name = self.get_option('apm_service_name')
|
||||
if not self.apm_service_name:
|
||||
self.apm_service_name = 'ansible'
|
||||
|
||||
self.apm_server_url = self.get_option('apm_server_url')
|
||||
self.apm_secret_token = self.get_option('apm_secret_token')
|
||||
self.apm_api_key = self.get_option('apm_api_key')
|
||||
self.apm_verify_server_cert = self.get_option('apm_verify_server_cert')
|
||||
self.traceparent = self.get_option('traceparent')
|
||||
|
||||
def v2_playbook_on_start(self, playbook):
|
||||
self.ansible_playbook = basename(playbook._file_name)
|
||||
|
||||
def v2_playbook_on_play_start(self, play):
|
||||
self.play_name = play.get_name()
|
||||
|
||||
def v2_runner_on_no_hosts(self, task):
|
||||
self.elastic.start_task(
|
||||
self.tasks_data,
|
||||
self.hide_task_arguments,
|
||||
self.play_name,
|
||||
task
|
||||
)
|
||||
|
||||
def v2_playbook_on_task_start(self, task, is_conditional):
|
||||
self.elastic.start_task(
|
||||
self.tasks_data,
|
||||
self.hide_task_arguments,
|
||||
self.play_name,
|
||||
task
|
||||
)
|
||||
|
||||
def v2_playbook_on_cleanup_task_start(self, task):
|
||||
self.elastic.start_task(
|
||||
self.tasks_data,
|
||||
self.hide_task_arguments,
|
||||
self.play_name,
|
||||
task
|
||||
)
|
||||
|
||||
def v2_playbook_on_handler_task_start(self, task):
|
||||
self.elastic.start_task(
|
||||
self.tasks_data,
|
||||
self.hide_task_arguments,
|
||||
self.play_name,
|
||||
task
|
||||
)
|
||||
|
||||
def v2_runner_on_failed(self, result, ignore_errors=False):
|
||||
self.errors += 1
|
||||
self.elastic.finish_task(
|
||||
self.tasks_data,
|
||||
'failed',
|
||||
result
|
||||
)
|
||||
|
||||
def v2_runner_on_ok(self, result):
|
||||
self.elastic.finish_task(
|
||||
self.tasks_data,
|
||||
'ok',
|
||||
result
|
||||
)
|
||||
|
||||
def v2_runner_on_skipped(self, result):
|
||||
self.elastic.finish_task(
|
||||
self.tasks_data,
|
||||
'skipped',
|
||||
result
|
||||
)
|
||||
|
||||
def v2_playbook_on_include(self, included_file):
|
||||
self.elastic.finish_task(
|
||||
self.tasks_data,
|
||||
'included',
|
||||
included_file
|
||||
)
|
||||
|
||||
def v2_playbook_on_stats(self, stats):
|
||||
if self.errors == 0:
|
||||
status = "success"
|
||||
else:
|
||||
status = "failure"
|
||||
self.elastic.generate_distributed_traces(
|
||||
self.tasks_data,
|
||||
status,
|
||||
time.time(),
|
||||
self.traceparent,
|
||||
self.apm_service_name,
|
||||
self.apm_server_url,
|
||||
self.apm_verify_server_cert,
|
||||
self.apm_secret_token,
|
||||
self.apm_api_key
|
||||
)
|
||||
|
||||
def v2_runner_on_async_failed(self, result, **kwargs):
|
||||
self.errors += 1
|
91
tests/unit/plugins/callback/test_elastic.py
Normal file
91
tests/unit/plugins/callback/test_elastic.py
Normal file
|
@ -0,0 +1,91 @@
|
|||
# (C) 2021, Victor Martinez <VictorMartinezRubio@gmail.com>
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
from ansible.playbook.task import Task
|
||||
from ansible.executor.task_result import TaskResult
|
||||
from ansible_collections.community.general.tests.unit.compat import unittest
|
||||
from ansible_collections.community.general.tests.unit.compat.mock import patch, MagicMock, Mock
|
||||
from ansible_collections.community.general.plugins.callback.elastic import ElasticSource, TaskData
|
||||
from collections import OrderedDict
|
||||
import sys
|
||||
|
||||
ELASTIC_MINIMUM_PYTHON_VERSION = (3, 6)
|
||||
|
||||
|
||||
class TestOpentelemetry(unittest.TestCase):
|
||||
@patch('ansible_collections.community.general.plugins.callback.elastic.socket')
|
||||
def setUp(self, mock_socket):
|
||||
if sys.version_info < ELASTIC_MINIMUM_PYTHON_VERSION:
|
||||
self.skipTest("Python %s+ is needed for Elastic" %
|
||||
",".join(map(str, ELASTIC_MINIMUM_PYTHON_VERSION)))
|
||||
mock_socket.gethostname.return_value = 'my-host'
|
||||
mock_socket.gethostbyname.return_value = '1.2.3.4'
|
||||
self.elastic = ElasticSource(display=None)
|
||||
self.task_fields = {'args': {}}
|
||||
self.mock_host = Mock('MockHost')
|
||||
self.mock_host.name = 'myhost'
|
||||
self.mock_host._uuid = 'myhost_uuid'
|
||||
self.mock_task = Task()
|
||||
self.mock_task.action = 'myaction'
|
||||
self.mock_task.no_log = False
|
||||
self.mock_task._role = 'myrole'
|
||||
self.mock_task._uuid = 'myuuid'
|
||||
self.mock_task.args = {}
|
||||
self.mock_task.get_name = MagicMock(return_value='mytask')
|
||||
self.mock_task.get_path = MagicMock(return_value='/mypath')
|
||||
self.my_task = TaskData('myuuid', 'mytask', '/mypath', 'myplay', 'myaction', '')
|
||||
self.my_task_result = TaskResult(host=self.mock_host, task=self.mock_task, return_data={}, task_fields=self.task_fields)
|
||||
|
||||
def test_start_task(self):
|
||||
tasks_data = OrderedDict()
|
||||
|
||||
self.elastic.start_task(
|
||||
tasks_data,
|
||||
False,
|
||||
'myplay',
|
||||
self.mock_task
|
||||
)
|
||||
|
||||
task_data = tasks_data['myuuid']
|
||||
self.assertEqual(task_data.uuid, 'myuuid')
|
||||
self.assertEqual(task_data.name, 'mytask')
|
||||
self.assertEqual(task_data.path, '/mypath')
|
||||
self.assertEqual(task_data.play, 'myplay')
|
||||
self.assertEqual(task_data.action, 'myaction')
|
||||
self.assertEqual(task_data.args, '')
|
||||
|
||||
def test_finish_task_with_a_host_match(self):
|
||||
tasks_data = OrderedDict()
|
||||
tasks_data['myuuid'] = self.my_task
|
||||
|
||||
self.elastic.finish_task(
|
||||
tasks_data,
|
||||
'ok',
|
||||
self.my_task_result
|
||||
)
|
||||
|
||||
task_data = tasks_data['myuuid']
|
||||
host_data = task_data.host_data['myhost_uuid']
|
||||
self.assertEqual(host_data.uuid, 'myhost_uuid')
|
||||
self.assertEqual(host_data.name, 'myhost')
|
||||
self.assertEqual(host_data.status, 'ok')
|
||||
|
||||
def test_finish_task_without_a_host_match(self):
|
||||
result = TaskResult(host=None, task=self.mock_task, return_data={}, task_fields=self.task_fields)
|
||||
tasks_data = OrderedDict()
|
||||
tasks_data['myuuid'] = self.my_task
|
||||
|
||||
self.elastic.finish_task(
|
||||
tasks_data,
|
||||
'ok',
|
||||
result
|
||||
)
|
||||
|
||||
task_data = tasks_data['myuuid']
|
||||
host_data = task_data.host_data['include']
|
||||
self.assertEqual(host_data.uuid, 'include')
|
||||
self.assertEqual(host_data.name, 'include')
|
||||
self.assertEqual(host_data.status, 'ok')
|
|
@ -33,3 +33,6 @@ dataclasses ; python_version == '3.6'
|
|||
opentelemetry-api ; python_version >= '3.6' and python_version < '3.10'
|
||||
opentelemetry-exporter-otlp ; python_version >= '3.6' and python_version < '3.10'
|
||||
opentelemetry-sdk ; python_version >= '3.6' and python_version < '3.10'
|
||||
|
||||
# requirement for the elastic callback plugin
|
||||
elastic-apm ; python_version >= '3.6'
|
||||
|
|
Loading…
Reference in a new issue