1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Add opentelemetry callback plugin (#3091)

* Add opentelemetry callback plugin

* Apply suggestions from code review

Co-authored-by: Felix Fontein <felix@fontein.de>

* Formatting (text), booleans and renamed env variables

* This should be done in a future release

* Remove insecure in favour of the OTEL env variable. Add descriptions

* Use OpenTelemetrySource

* Move generate_distributed_traces

* Move update_span_data and set_span_attribute

* Move finish_task

* Move start_task

* Refactor to support UTs

* Add first UT

* Fix codestyle

* opentelemetry callback entry in the botmeta

* Fix linting

* Fix signature

* Mock methods

* Use MagicMock

* Mock the methods

* UT for transform_to_boolean_or_default

* Fix linting

* Set test data

* Mock _time_ns

* Exclude tests for python <= 3.6

* Remove obsoleted setup task type configuration

* Remove unused docs

* Apply suggestions from code review

Co-authored-by: Felix Fontein <felix@fontein.de>

* Fix docs

* unrequired logic that was originally took from https://github.com/ansible/ansible/blob/devel/lib/ansible/plugins/callback/junit.py\#L226

* Use raise_from for the required dependencies

* Fix linting

* Add requirements for the UTs

* add missing dependency for the opentelemetry plugin in the UTs

* Add ANSIBLE_ prefix for the ansible specific options

* Add more context in the docs and remove duplicated docs

* As suggested in the code review

* Verify if the OTEL env variables for the endpoint were set

* Fix docs typo

* Fix linting

* Revert "Fix linting"

This reverts commit 3a54c827c5472553a6baf5598bc76a0f63f020c1.

* Revert "Verify if the OTEL env variables for the endpoint were set"

This reverts commit cab9d8648899c28c0345745690c4ec7a41f7e680.

* Remove console_output as suggested

* Apply suggestions from code review

Co-authored-by: flowerysong <junk+github@flowerysong.com>

* Delegate the definition of OTEL_EXPORTER_OTLP_INSECURE to the user

* Move definitions above, close to the class that uses them

Co-authored-by: Felix Fontein <felix@fontein.de>
Co-authored-by: flowerysong <junk+github@flowerysong.com>
This commit is contained in:
Victor Martinez 2021-09-14 20:05:02 +01:00 committed by GitHub
parent dc8d076a25
commit 517570a64f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 502 additions and 0 deletions

3
.github/BOTMETA.yml vendored
View file

@ -62,6 +62,9 @@ files:
$callbacks/nrdp.py:
maintainers: rverchere
$callbacks/null.py: {}
$callbacks/opentelemetry.py:
maintainers: v1v
keywords: opentelemetry observability
$callbacks/say.py:
notify: chris-short
maintainers: $team_macos

View file

@ -0,0 +1,401 @@
# (C) 2021, Victor Martinez <VictorMartinezRubio@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = '''
author: Victor Martinez (@v1v) <VictorMartinezRubio@gmail.com>
name: opentelemetry
type: notification
short_description: Create distributed traces with OpenTelemetry
version_added: 3.7.0
description:
- This callback creates distributed traces for each Ansible task with OpenTelemetry.
- You can configure the OpenTelemetry exporter and SDK with environment variables.
- See U(https://opentelemetry-python.readthedocs.io/en/latest/exporter/otlp/otlp.html).
- See U(https://opentelemetry-python.readthedocs.io/en/latest/sdk/environment_variables.html#opentelemetry-sdk-environment-variables).
options:
hide_task_arguments:
default: false
type: bool
description:
- Hide the arguments for a task.
env:
- name: ANSIBLE_OPENTELEMETRY_HIDE_TASK_ARGUMENTS
otel_service_name:
default: ansible
type: str
description:
- The service name resource attribute.
env:
- name: OTEL_SERVICE_NAME
requirements:
- opentelemetry-api (python lib)
- opentelemetry-exporter-otlp (python lib)
- opentelemetry-sdk (python lib)
'''
EXAMPLES = '''
examples: |
Enable the plugin in ansible.cfg:
[defaults]
callbacks_enabled = community.general.opentelemetry
Set the environment variable:
export OTEL_EXPORTER_OTLP_ENDPOINT=<your endpoint (OTLP/HTTP)>
export OTEL_EXPORTER_OTLP_HEADERS="authorization=Bearer your_otel_token"
export OTEL_SERVICE_NAME=your_service_name
'''
import getpass
import os
import socket
import sys
import time
import uuid
from os.path import basename
from ansible.errors import AnsibleError
from ansible.module_utils.six import raise_from
from ansible.plugins.callback import CallbackBase
try:
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleSpanProcessor,
BatchSpanProcessor
)
from opentelemetry.util._time import _time_ns
except ImportError as imp_exc:
OTEL_LIBRARY_IMPORT_ERROR = imp_exc
else:
OTEL_LIBRARY_IMPORT_ERROR = None
try:
from collections import OrderedDict
except ImportError:
try:
from ordereddict import OrderedDict
except ImportError as imp_exc:
ORDER_LIBRARY_IMPORT_ERROR = imp_exc
else:
ORDER_LIBRARY_IMPORT_ERROR = None
else:
ORDER_LIBRARY_IMPORT_ERROR = None
class TaskData:
"""
Data about an individual task.
"""
def __init__(self, uuid, name, path, play, action, args):
self.uuid = uuid
self.name = name
self.path = path
self.play = play
self.host_data = OrderedDict()
if sys.version_info >= (3, 7):
self.start = time.time_ns()
else:
self.start = _time_ns()
self.action = action
self.args = args
def add_host(self, host):
if host.uuid in self.host_data:
if host.status == 'included':
# concatenate task include output from multiple items
host.result = '%s\n%s' % (self.host_data[host.uuid].result, host.result)
else:
return
self.host_data[host.uuid] = host
class HostData:
"""
Data about an individual host.
"""
def __init__(self, uuid, name, status, result):
self.uuid = uuid
self.name = name
self.status = status
self.result = result
if sys.version_info >= (3, 7):
self.finish = time.time_ns()
else:
self.finish = _time_ns()
class OpenTelemetrySource(object):
def __init__(self, display):
self.ansible_playbook = ""
self.ansible_version = None
self.session = str(uuid.uuid4())
self.host = socket.gethostname()
try:
self.ip_address = socket.gethostbyname(socket.gethostname())
except Exception as e:
self.ip_address = None
self.user = getpass.getuser()
self._display = display
def start_task(self, tasks_data, hide_task_arguments, play_name, task):
""" record the start of a task for one or more hosts """
uuid = task._uuid
if uuid in tasks_data:
return
name = task.get_name().strip()
path = task.get_path()
action = task.action
args = None
if not task.no_log and not hide_task_arguments:
args = ', '.join(('%s=%s' % a for a in task.args.items()))
tasks_data[uuid] = TaskData(uuid, name, path, play_name, action, args)
def finish_task(self, tasks_data, status, result):
""" record the results of a task for a single host """
task_uuid = result._task._uuid
if hasattr(result, '_host') and result._host is not None:
host_uuid = result._host._uuid
host_name = result._host.name
else:
host_uuid = 'include'
host_name = 'include'
task = tasks_data[task_uuid]
if self.ansible_version is None and result._task_fields['args'].get('_ansible_version'):
self.ansible_version = result._task_fields['args'].get('_ansible_version')
task.add_host(HostData(host_uuid, host_name, status, result))
def generate_distributed_traces(self, otel_service_name, ansible_playbook, tasks_data, status):
""" generate distributed traces from the collected TaskData and HostData """
tasks = []
parent_start_time = None
for task_uuid, task in tasks_data.items():
if parent_start_time is None:
parent_start_time = task.start
tasks.append(task)
trace.set_tracer_provider(
TracerProvider(
resource=Resource.create({SERVICE_NAME: otel_service_name})
)
)
processor = BatchSpanProcessor(OTLPSpanExporter())
trace.get_tracer_provider().add_span_processor(processor)
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(ansible_playbook, start_time=parent_start_time) as parent:
parent.set_status(status)
# Populate trace metadata attributes
if self.ansible_version is not None:
parent.set_attribute("ansible.version", self.ansible_version)
parent.set_attribute("ansible.session", self.session)
parent.set_attribute("ansible.host.name", self.host)
if self.ip_address is not None:
parent.set_attribute("ansible.host.ip", self.ip_address)
parent.set_attribute("ansible.host.user", self.user)
for task in tasks:
for host_uuid, host_data in task.host_data.items():
with tracer.start_as_current_span(task.name, start_time=task.start, end_on_exit=False) as span:
self.update_span_data(task, host_data, span)
def update_span_data(self, task_data, host_data, span):
""" update the span with the given TaskData and HostData """
name = '[%s] %s: %s' % (host_data.name, task_data.play, task_data.name)
message = 'success'
status = Status(status_code=StatusCode.OK)
if host_data.status == 'included':
rc = 0
else:
res = host_data.result._result
rc = res.get('rc', 0)
if host_data.status == 'failed':
if 'exception' in res:
message = res['exception'].strip().split('\n')[-1]
elif 'msg' in res:
message = res['msg']
else:
message = 'failed'
status = Status(status_code=StatusCode.ERROR)
elif host_data.status == 'skipped':
if 'skip_reason' in res:
message = res['skip_reason']
else:
message = 'skipped'
status = Status(status_code=StatusCode.UNSET)
span.set_status(status)
self.set_span_attribute(span, "ansible.task.args", task_data.args)
self.set_span_attribute(span, "ansible.task.module", task_data.action)
self.set_span_attribute(span, "ansible.task.message", message)
self.set_span_attribute(span, "ansible.task.name", name)
self.set_span_attribute(span, "ansible.task.result", rc)
self.set_span_attribute(span, "ansible.task.host.name", host_data.name)
self.set_span_attribute(span, "ansible.task.host.status", host_data.status)
span.end(end_time=host_data.finish)
def set_span_attribute(self, span, attributeName, attributeValue):
""" update the span attribute with the given attribute and value if not None """
if span is None and self._display is not None:
self._display.warning('span object is None. Please double check if that is expected.')
else:
if attributeValue is not None:
span.set_attribute(attributeName, attributeValue)
class CallbackModule(CallbackBase):
"""
This callback creates distributed traces.
"""
CALLBACK_VERSION = 2.0
CALLBACK_TYPE = 'notification'
CALLBACK_NAME = 'community.general.opentelemetry'
CALLBACK_NEEDS_ENABLED = True
def __init__(self, display=None):
super(CallbackModule, self).__init__(display=display)
self.hide_task_arguments = None
self.otel_service_name = None
self.ansible_playbook = None
self.play_name = None
self.tasks_data = None
self.errors = 0
self.disabled = False
if OTEL_LIBRARY_IMPORT_ERROR:
raise_from(
AnsibleError('The `opentelemetry-api`, `opentelemetry-exporter-otlp` or `opentelemetry-sdk` must be installed to use this plugin'),
OTEL_LIBRARY_IMPORT_ERROR)
if ORDER_LIBRARY_IMPORT_ERROR:
raise_from(
AnsibleError('The `ordereddict` must be installed to use this plugin'),
ORDER_LIBRARY_IMPORT_ERROR)
else:
self.tasks_data = OrderedDict()
self.opentelemetry = OpenTelemetrySource(display=self._display)
def set_options(self, task_keys=None, var_options=None, direct=None):
super(CallbackModule, self).set_options(task_keys=task_keys,
var_options=var_options,
direct=direct)
self.hide_task_arguments = self.get_option('hide_task_arguments')
self.otel_service_name = self.get_option('otel_service_name')
if not self.otel_service_name:
self.otel_service_name = 'ansible'
def v2_playbook_on_start(self, playbook):
self.ansible_playbook = basename(playbook._file_name)
def v2_playbook_on_play_start(self, play):
self.play_name = play.get_name()
def v2_runner_on_no_hosts(self, task):
self.opentelemetry.start_task(
self.tasks_data,
self.hide_task_arguments,
self.play_name,
task
)
def v2_playbook_on_task_start(self, task, is_conditional):
self.opentelemetry.start_task(
self.tasks_data,
self.hide_task_arguments,
self.play_name,
task
)
def v2_playbook_on_cleanup_task_start(self, task):
self.opentelemetry.start_task(
self.tasks_data,
self.hide_task_arguments,
self.play_name,
task
)
def v2_playbook_on_handler_task_start(self, task):
self.opentelemetry.start_task(
self.tasks_data,
self.hide_task_arguments,
self.play_name,
task
)
def v2_runner_on_failed(self, result, ignore_errors=False):
self.errors += 1
self.opentelemetry.finish_task(
self.tasks_data,
'failed',
result
)
def v2_runner_on_ok(self, result):
self.opentelemetry.finish_task(
self.tasks_data,
'ok',
result
)
def v2_runner_on_skipped(self, result):
self.opentelemetry.finish_task(
self.tasks_data,
'skipped',
result
)
def v2_playbook_on_include(self, included_file):
self.opentelemetry.finish_task(
self.tasks_data,
'included',
included_file
)
def v2_playbook_on_stats(self, stats):
if self.errors == 0:
status = Status(status_code=StatusCode.OK)
else:
status = Status(status_code=StatusCode.ERROR)
self.opentelemetry.generate_distributed_traces(
self.otel_service_name,
self.ansible_playbook,
self.tasks_data,
status
)
def v2_runner_on_async_failed(self, result, **kwargs):
self.errors += 1

View file

@ -0,0 +1,93 @@
# (C) 2021, Victor Martinez <VictorMartinezRubio@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.playbook.task import Task
from ansible.executor.task_result import TaskResult
from ansible_collections.community.general.tests.unit.compat import unittest
from ansible_collections.community.general.tests.unit.compat.mock import patch, MagicMock, Mock
from ansible_collections.community.general.plugins.callback.opentelemetry import OpenTelemetrySource, TaskData, CallbackModule
from collections import OrderedDict
import sys
OPENTELEMETRY_MINIMUM_PYTHON_VERSION = (3, 7)
class TestOpentelemetry(unittest.TestCase):
@patch('ansible_collections.community.general.plugins.callback.opentelemetry.socket')
def setUp(self, mock_socket):
# TODO: this python version validation won't be needed as long as the _time_ns call is mocked.
if sys.version_info < OPENTELEMETRY_MINIMUM_PYTHON_VERSION:
self.skipTest("Python %s+ is needed for OpenTelemetry" %
",".join(map(str, OPENTELEMETRY_MINIMUM_PYTHON_VERSION)))
mock_socket.gethostname.return_value = 'my-host'
mock_socket.gethostbyname.return_value = '1.2.3.4'
self.opentelemetry = OpenTelemetrySource(display=None)
self.task_fields = {'args': {}}
self.mock_host = Mock('MockHost')
self.mock_host.name = 'myhost'
self.mock_host._uuid = 'myhost_uuid'
self.mock_task = Task()
self.mock_task.action = 'myaction'
self.mock_task.no_log = False
self.mock_task._role = 'myrole'
self.mock_task._uuid = 'myuuid'
self.mock_task.args = {}
self.mock_task.get_name = MagicMock(return_value='mytask')
self.mock_task.get_path = MagicMock(return_value='/mypath')
self.my_task = TaskData('myuuid', 'mytask', '/mypath', 'myplay', 'myaction', '')
self.my_task_result = TaskResult(host=self.mock_host, task=self.mock_task, return_data={}, task_fields=self.task_fields)
def test_start_task(self):
tasks_data = OrderedDict()
self.opentelemetry.start_task(
tasks_data,
False,
'myplay',
self.mock_task
)
task_data = tasks_data['myuuid']
self.assertEqual(task_data.uuid, 'myuuid')
self.assertEqual(task_data.name, 'mytask')
self.assertEqual(task_data.path, '/mypath')
self.assertEqual(task_data.play, 'myplay')
self.assertEqual(task_data.action, 'myaction')
self.assertEqual(task_data.args, '')
def test_finish_task_with_a_host_match(self):
tasks_data = OrderedDict()
tasks_data['myuuid'] = self.my_task
self.opentelemetry.finish_task(
tasks_data,
'ok',
self.my_task_result
)
task_data = tasks_data['myuuid']
host_data = task_data.host_data['myhost_uuid']
self.assertEqual(host_data.uuid, 'myhost_uuid')
self.assertEqual(host_data.name, 'myhost')
self.assertEqual(host_data.status, 'ok')
def test_finish_task_without_a_host_match(self):
result = TaskResult(host=None, task=self.mock_task, return_data={}, task_fields=self.task_fields)
tasks_data = OrderedDict()
tasks_data['myuuid'] = self.my_task
self.opentelemetry.finish_task(
tasks_data,
'ok',
result
)
task_data = tasks_data['myuuid']
host_data = task_data.host_data['include']
self.assertEqual(host_data.uuid, 'include')
self.assertEqual(host_data.name, 'include')
self.assertEqual(host_data.status, 'ok')

View file

@ -26,3 +26,8 @@ datadog-api-client >= 1.0.0b3 ; python_version >= '3.6'
# requirement for dnsimple module
dnsimple >= 2 ; python_version >= '3.6'
dataclasses ; python_version == '3.6'
# requirement for the opentelemetry callback plugin
opentelemetry-api ; python_version >= '3.6'
opentelemetry-exporter-otlp ; python_version >= '3.6'
opentelemetry-sdk ; python_version >= '3.6'