From 85277c8aae4e65690a4acf864822bcae0e7f99ca Mon Sep 17 00:00:00 2001 From: Jimmy Tang Date: Thu, 24 Sep 2015 16:51:51 +0100 Subject: [PATCH 1/5] Initial add of logentries callback plugin This callback plugin will generate json objects to be sent to the logentries service for auditing/debugging purposes. To use: Add this to your ansible.cfg file in the defaults block [defaults] callback_plugins = ./callback_plugins callback_stdout = logentries callback_whitelist = logentries Copy the callback plugin into the callback_plugings directory Either set the environment variables export LOGENTRIES_API=data.logentries.com export LOGENTRIES_PORT=10000 export LOGENTRIES_ANSIBLE_TOKEN=dd21fc88-f00a-43ff-b977-e3a4233c53af Or create a logentries.ini config file that sites next to the plugin with the following contents [logentries] api = data.logentries.com port = 10000 tls_port = 20000 use_tls = no token = dd21fc88-f00a-43ff-b977-e3a4233c53af --- lib/ansible/plugins/callback/logentries.py | 336 +++++++++++++++++++++ 1 file changed, 336 insertions(+) create mode 100644 lib/ansible/plugins/callback/logentries.py diff --git a/lib/ansible/plugins/callback/logentries.py b/lib/ansible/plugins/callback/logentries.py new file mode 100644 index 0000000000..3d5346952f --- /dev/null +++ b/lib/ansible/plugins/callback/logentries.py @@ -0,0 +1,336 @@ +""" +(c) 2015, Logentries.com +Author: Jimmy Tang + +This callback plugin will generate json objects to be sent to logentries +for auditing/debugging purposes. + +Todo: + +* Better formatting of output before sending out to logentries data/api nodes. + +To use: + +Add this to your ansible.cfg file in the defaults block + + [defaults] + callback_plugins = ./callback_plugins + callback_stdout = logentries + callback_whitelist = logentries + +Copy the callback plugin into the callback_plugings directory + +Either set the environment variables + + export LOGENTRIES_API=data.logentries.com + export LOGENTRIES_PORT=10000 + export LOGENTRIES_ANSIBLE_TOKEN=dd21fc88-f00a-43ff-b977-e3a4233c53af + +Or create a logentries.ini config file that sites next to the plugin with the following contents + + [logentries] + api = data.logentries.com + port = 10000 + tls_port = 20000 + use_tls = no + token = dd21fc88-f00a-43ff-b977-e3a4233c53af + + +""" + +import os +import threading +import socket +import random +import time +import codecs +import Queue +import ConfigParser +import uuid +try: + import certifi +except ImportError: + print("please do 'pip install certifi'") + +try: + import flatdict +except ImportError: + print("please do 'pip install flatdict'") + +from ansible.plugins.callback import CallbackBase + + +def to_unicode(ch): + return codecs.unicode_escape_decode(ch)[0] + + +def is_unicode(ch): + return isinstance(ch, unicode) + + +def create_unicode(ch): + return unicode(ch, 'utf-8') + + +class PlainTextSocketAppender(threading.Thread): + def __init__(self, + verbose=True, + LE_API='data.logentries.com', + LE_PORT=80, + LE_TLS_PORT=443): + threading.Thread.__init__(self) + + self.QUEUE_SIZE = 32768 + self.LE_API = LE_API + self.LE_PORT = LE_PORT + self.LE_TLS_PORT = LE_TLS_PORT + self.MIN_DELAY = 0.1 + self.MAX_DELAY = 10 + # Error message displayed when an incorrect Token has been detected + self.INVALID_TOKEN = ("\n\nIt appears the LOGENTRIES_TOKEN " + "parameter you entered is incorrect!\n\n") + # Unicode Line separator character \u2028 + self.LINE_SEP = to_unicode('\u2028') + + self.daemon = True + self.verbose = verbose + self._conn = None + self._queue = Queue.Queue(self.QUEUE_SIZE) + + def empty(self): + return self._queue.empty() + + def open_connection(self): + self._conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._conn.connect((self.LE_API, self.LE_PORT)) + + def reopen_connection(self): + self.close_connection() + + root_delay = self.MIN_DELAY + while True: + try: + self.open_connection() + return + except Exception: + if self.verbose: + print("Unable to connect to Logentries") + + root_delay *= 2 + if (root_delay > self.MAX_DELAY): + root_delay = self.MAX_DELAY + + wait_for = root_delay + random.uniform(0, root_delay) + + try: + time.sleep(wait_for) + except KeyboardInterrupt: + raise + + def close_connection(self): + if self._conn is not None: + self._conn.close() + + def run(self): + try: + # Open connection + self.reopen_connection() + + # Send data in queue + while True: + # Take data from queue + data = self._queue.get(block=True) + + # Replace newlines with Unicode line separator + # for multi-line events + if not is_unicode(data): + multiline = create_unicode(data).replace( + '\n', self.LINE_SEP) + else: + multiline = data.replace('\n', self.LINE_SEP) + multiline += "\n" + # Send data, reconnect if needed + while True: + try: + self._conn.send(multiline.encode('utf-8')) + except socket.error: + self.reopen_connection() + continue + break + except KeyboardInterrupt: + if self.verbose: + print("Logentries asynchronous socket client interrupted") + + self.close_connection() + + +try: + import ssl +except ImportError: # for systems without TLS support. + SocketAppender = PlainTextSocketAppender + print("Unable to import ssl module. Will send over port 80.") +else: + + class TLSSocketAppender(PlainTextSocketAppender): + def open_connection(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = ssl.wrap_socket( + sock=sock, + keyfile=None, + certfile=None, + server_side=False, + cert_reqs=ssl.CERT_REQUIRED, + ssl_version=getattr( + ssl, 'PROTOCOL_TLSv1_2', ssl.PROTOCOL_TLSv1), + ca_certs=certifi.where(), + do_handshake_on_connect=True, + suppress_ragged_eofs=True, ) + sock.connect((self.LE_API, self.LE_TLS_PORT)) + self._conn = sock + + SocketAppender = TLSSocketAppender + + +class CallbackModule(CallbackBase): + CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = 'notification' + CALLBACK_NAME = 'logentries' + + def __init__(self, display): + super(CallbackModule, self).__init__(display) + + config_path = os.path.abspath(os.path.dirname(__file__)) + config = ConfigParser.ConfigParser() + try: + config.readfp(open(os.path.join(config_path, 'logentries.ini'))) + if config.has_option('logentries', 'api'): + self.api_uri = config.get('logentries', 'api') + if config.has_option('logentries', 'port'): + self.api_port = config.getint('logentries', 'port') + if config.has_option('logentries', 'tls_port'): + self.api_tls_port = config.getint('logentries', 'tls_port') + if config.has_option('logentries', 'use_tls'): + self.use_tls = config.getboolean('logentries', 'use_tls') + if config.has_option('logentries', 'token'): + self.token = config.get('logentries', 'token') + except: + self.api_uri = os.getenv('LOGENTRIES_API') + if self.api_uri is None: + self.api_uri = 'data.logentries.com' + + try: + self.api_port = int(os.getenv('LOGENTRIES_PORT')) + if self.api_port is None: + self.api_port = 80 + except TypeError: + self.api_port = 80 + + try: + self.api_tls_port = int(os.getenv('LOGENTRIES_TLS_PORT')) + if self.api_tls_port is None: + self.api_tls_port = 443 + except TypeError: + self.api_tls_port = 443 + + # this just needs to be set to use TLS + self.use_tls = os.getenv('LOGENTRIES_USE_TLS') + if self.use_tls is None: + self.use_tls = False + elif self.use_tls.lower() in ['yes', 'true']: + self.use_tls = True + + self.token = os.getenv('LOGENTRIES_ANSIBLE_TOKEN') + if self.token is None: + self.disabled = True + self._display.warning( + 'Logentries token could not be loaded. The logentries token can be provided using the `LOGENTRIES_TOKEN` environment variable') + + self.verbose = False + self.timeout = 10 + self.le_jobid = str(uuid.uuid4()) + + if self.use_tls: + self._thread = TLSSocketAppender(verbose=self.verbose, + LE_API=self.api_uri, + LE_TLS_PORT=self.api_tls_port) + else: + self._thread = PlainTextSocketAppender(verbose=self.verbose, + LE_API=self.api_uri, + LE_PORT=self.api_port) + + def emit(self, record): + if not self._thread.is_alive(): + try: + self._thread.start() + if self.verbose: + print("Starting Logentries Asynchronous Socket Appender") + except RuntimeError: # It's already started. + if not self._thread.is_alive(): + raise + + msg = record.rstrip('\n') + msg = "{} {}".format(self.token, msg) + self._thread._queue.put(msg) + + def runner_on_ok(self, host, res): + results = {} + results['le_jobid'] = self.le_jobid + results['hostname'] = host + results['results'] = res + results['status'] = 'OK' + results = flatdict.FlatDict(results) + self.emit(self._dump_results(results)) + + def runner_on_failed(self, host, res, ignore_errors=False): + results = {} + results['le_jobid'] = self.le_jobid + results['hostname'] = host + results['results'] = res + results['status'] = 'FAILED' + results = flatdict.FlatDict(results) + self.emit(self._dump_results(results)) + + def runner_on_skipped(self, host, item=None): + results = {} + results['le_jobid'] = self.le_jobid + results['hostname'] = host + results['status'] = 'SKIPPED' + results = flatdict.FlatDict(results) + self.emit(self._dump_results(results)) + + def runner_on_unreachable(self, host, res): + results = {} + results['le_jobid'] = self.le_jobid + results['hostname'] = host + results['results'] = res + results['status'] = 'UNREACHABLE' + results = flatdict.FlatDict(results) + self.emit(self._dump_results(results)) + + def runner_on_async_failed(self, host, res, jid): + results = {} + results['le_jobid'] = self.le_jobid + results['hostname'] = host + results['results'] = res + results['jid'] = jid + results['status'] = 'ASYNC_FAILED' + results = flatdict.FlatDict(results) + self.emit(self._dump_results(results)) + + def v2_playbook_on_play_start(self, play): + results = {} + results['le_jobid'] = self.le_jobid + results['started_by'] = os.getlogin() + if play.name: + results['play'] = play.name + results['hosts'] = play.hosts + results = flatdict.FlatDict(results) + self.emit(self._dump_results(results)) + + def playbook_on_stats(self, stats): + """ flush out queue of messages """ + now = time.time() + while not self._thread.empty(): + time.sleep(0.2) + if time.time() - now > self.timeout: + break From c02ceb8f123c19caefecaa178cef9e2be7ab687a Mon Sep 17 00:00:00 2001 From: Jimmy Tang Date: Wed, 28 Oct 2015 16:31:37 +0000 Subject: [PATCH 2/5] Remove threading and queues. Added license information and cleaned up callback. --- lib/ansible/plugins/callback/logentries.py | 144 ++++++++++----------- 1 file changed, 67 insertions(+), 77 deletions(-) diff --git a/lib/ansible/plugins/callback/logentries.py b/lib/ansible/plugins/callback/logentries.py index 3d5346952f..746c9e08ba 100644 --- a/lib/ansible/plugins/callback/logentries.py +++ b/lib/ansible/plugins/callback/logentries.py @@ -1,6 +1,19 @@ -""" -(c) 2015, Logentries.com -Author: Jimmy Tang +""" (c) 2015, Logentries.com, Jimmy Tang + +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . This callback plugin will generate json objects to be sent to logentries for auditing/debugging purposes. @@ -18,7 +31,7 @@ Add this to your ansible.cfg file in the defaults block callback_stdout = logentries callback_whitelist = logentries -Copy the callback plugin into the callback_plugings directory +Copy the callback plugin into the callback_plugins directory Either set the environment variables @@ -34,17 +47,16 @@ Or create a logentries.ini config file that sites next to the plugin with the fo tls_port = 20000 use_tls = no token = dd21fc88-f00a-43ff-b977-e3a4233c53af + flatten = False """ import os -import threading import socket import random import time import codecs -import Queue import ConfigParser import uuid try: @@ -72,15 +84,13 @@ def create_unicode(ch): return unicode(ch, 'utf-8') -class PlainTextSocketAppender(threading.Thread): +class PlainTextSocketAppender(object): def __init__(self, verbose=True, LE_API='data.logentries.com', LE_PORT=80, LE_TLS_PORT=443): - threading.Thread.__init__(self) - self.QUEUE_SIZE = 32768 self.LE_API = LE_API self.LE_PORT = LE_PORT self.LE_TLS_PORT = LE_TLS_PORT @@ -92,13 +102,8 @@ class PlainTextSocketAppender(threading.Thread): # Unicode Line separator character \u2028 self.LINE_SEP = to_unicode('\u2028') - self.daemon = True self.verbose = verbose self._conn = None - self._queue = Queue.Queue(self.QUEUE_SIZE) - - def empty(self): - return self._queue.empty() def open_connection(self): self._conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -131,35 +136,22 @@ class PlainTextSocketAppender(threading.Thread): if self._conn is not None: self._conn.close() - def run(self): - try: - # Open connection - self.reopen_connection() - - # Send data in queue - while True: - # Take data from queue - data = self._queue.get(block=True) - - # Replace newlines with Unicode line separator - # for multi-line events - if not is_unicode(data): - multiline = create_unicode(data).replace( - '\n', self.LINE_SEP) - else: - multiline = data.replace('\n', self.LINE_SEP) - multiline += "\n" - # Send data, reconnect if needed - while True: - try: - self._conn.send(multiline.encode('utf-8')) - except socket.error: - self.reopen_connection() - continue - break - except KeyboardInterrupt: - if self.verbose: - print("Logentries asynchronous socket client interrupted") + def put(self, data): + # Replace newlines with Unicode line separator + # for multi-line events + if not is_unicode(data): + multiline = create_unicode(data).replace('\n', self.LINE_SEP) + else: + multiline = data.replace('\n', self.LINE_SEP) + multiline += "\n" + # Send data, reconnect if needed + while True: + try: + self._conn.send(multiline.encode('utf-8')) + except socket.error: + self.reopen_connection() + continue + break self.close_connection() @@ -213,6 +205,9 @@ class CallbackModule(CallbackBase): self.use_tls = config.getboolean('logentries', 'use_tls') if config.has_option('logentries', 'token'): self.token = config.get('logentries', 'token') + if config.has_option('logentries', 'flatten'): + self.flatten = config.getboolean('logentries', 'flatten') + except: self.api_uri = os.getenv('LOGENTRIES_API') if self.api_uri is None: @@ -245,32 +240,37 @@ class CallbackModule(CallbackBase): self._display.warning( 'Logentries token could not be loaded. The logentries token can be provided using the `LOGENTRIES_TOKEN` environment variable') + self.flatten = os.getenv('LOGENTRIES_FLATTEN') + if self.flatten is None: + self.flatten = False + elif self.flatten.lower() in ['yes', 'true']: + self.flatten = True + self.verbose = False self.timeout = 10 self.le_jobid = str(uuid.uuid4()) if self.use_tls: - self._thread = TLSSocketAppender(verbose=self.verbose, - LE_API=self.api_uri, - LE_TLS_PORT=self.api_tls_port) + self._appender = TLSSocketAppender(verbose=self.verbose, + LE_API=self.api_uri, + LE_TLS_PORT=self.api_tls_port) else: - self._thread = PlainTextSocketAppender(verbose=self.verbose, - LE_API=self.api_uri, - LE_PORT=self.api_port) + self._appender = PlainTextSocketAppender(verbose=self.verbose, + LE_API=self.api_uri, + LE_PORT=self.api_port) + self._appender.reopen_connection() + + def emit_formatted(self, record): + if self.flatten: + results = flatdict.FlatDict(record) + self.emit(self._dump_results(results)) + else: + self.emit(self._dump_results(record)) def emit(self, record): - if not self._thread.is_alive(): - try: - self._thread.start() - if self.verbose: - print("Starting Logentries Asynchronous Socket Appender") - except RuntimeError: # It's already started. - if not self._thread.is_alive(): - raise - msg = record.rstrip('\n') msg = "{} {}".format(self.token, msg) - self._thread._queue.put(msg) + self._appender.put(msg) def runner_on_ok(self, host, res): results = {} @@ -278,8 +278,7 @@ class CallbackModule(CallbackBase): results['hostname'] = host results['results'] = res results['status'] = 'OK' - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def runner_on_failed(self, host, res, ignore_errors=False): results = {} @@ -287,16 +286,14 @@ class CallbackModule(CallbackBase): results['hostname'] = host results['results'] = res results['status'] = 'FAILED' - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def runner_on_skipped(self, host, item=None): results = {} results['le_jobid'] = self.le_jobid results['hostname'] = host results['status'] = 'SKIPPED' - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def runner_on_unreachable(self, host, res): results = {} @@ -304,8 +301,7 @@ class CallbackModule(CallbackBase): results['hostname'] = host results['results'] = res results['status'] = 'UNREACHABLE' - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def runner_on_async_failed(self, host, res, jid): results = {} @@ -314,8 +310,7 @@ class CallbackModule(CallbackBase): results['results'] = res results['jid'] = jid results['status'] = 'ASYNC_FAILED' - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def v2_playbook_on_play_start(self, play): results = {} @@ -324,13 +319,8 @@ class CallbackModule(CallbackBase): if play.name: results['play'] = play.name results['hosts'] = play.hosts - results = flatdict.FlatDict(results) - self.emit(self._dump_results(results)) + self.emit_formatted(results) def playbook_on_stats(self, stats): - """ flush out queue of messages """ - now = time.time() - while not self._thread.empty(): - time.sleep(0.2) - if time.time() - now > self.timeout: - break + """ close connection """ + self._appender.close_connection() From 5f2f5e2b59608cec99a62e15c7a9b4fb5a63a74a Mon Sep 17 00:00:00 2001 From: Jimmy Tang Date: Sat, 14 Nov 2015 08:53:40 +0000 Subject: [PATCH 3/5] Add boilerplate and fix initialisation to match what 2.0 expects --- lib/ansible/plugins/callback/logentries.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/ansible/plugins/callback/logentries.py b/lib/ansible/plugins/callback/logentries.py index 746c9e08ba..7195cca6f1 100644 --- a/lib/ansible/plugins/callback/logentries.py +++ b/lib/ansible/plugins/callback/logentries.py @@ -52,6 +52,9 @@ Or create a logentries.ini config file that sites next to the plugin with the fo """ +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + import os import socket import random @@ -188,8 +191,8 @@ class CallbackModule(CallbackBase): CALLBACK_TYPE = 'notification' CALLBACK_NAME = 'logentries' - def __init__(self, display): - super(CallbackModule, self).__init__(display) + def __init__(self): + super(CallbackModule, self).__init__() config_path = os.path.abspath(os.path.dirname(__file__)) config = ConfigParser.ConfigParser() From 125370ab482a3d0179b2f8a5c473550e17daa4e0 Mon Sep 17 00:00:00 2001 From: Jimmy Tang Date: Sat, 14 Nov 2015 19:46:00 +0000 Subject: [PATCH 4/5] Run when whitelisted --- lib/ansible/plugins/callback/logentries.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/ansible/plugins/callback/logentries.py b/lib/ansible/plugins/callback/logentries.py index 7195cca6f1..22980e1e4d 100644 --- a/lib/ansible/plugins/callback/logentries.py +++ b/lib/ansible/plugins/callback/logentries.py @@ -190,6 +190,7 @@ class CallbackModule(CallbackBase): CALLBACK_VERSION = 2.0 CALLBACK_TYPE = 'notification' CALLBACK_NAME = 'logentries' + CALLBACK_NEEDS_WHITELIST = True def __init__(self): super(CallbackModule, self).__init__() From 19ba54c9fd7f51475101213f0440397e3d673e7f Mon Sep 17 00:00:00 2001 From: Jimmy Tang Date: Sat, 14 Nov 2015 19:53:26 +0000 Subject: [PATCH 5/5] Don't be fatal on import errors so plays don't fail if plugin doesn't have required dependencies --- lib/ansible/plugins/callback/logentries.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/ansible/plugins/callback/logentries.py b/lib/ansible/plugins/callback/logentries.py index 22980e1e4d..bf168d68a9 100644 --- a/lib/ansible/plugins/callback/logentries.py +++ b/lib/ansible/plugins/callback/logentries.py @@ -64,13 +64,15 @@ import ConfigParser import uuid try: import certifi + HAS_CERTIFI = True except ImportError: - print("please do 'pip install certifi'") + HAS_CERTIFI = False try: import flatdict + HAS_FLATDICT = True except ImportError: - print("please do 'pip install flatdict'") + HAS_FLATDICT = False from ansible.plugins.callback import CallbackBase @@ -195,6 +197,16 @@ class CallbackModule(CallbackBase): def __init__(self): super(CallbackModule, self).__init__() + if not HAS_CERTIFI: + self.disabled =True + self.display.warning('The `certifi` python module is not installed. ' + 'Disabling the Logentries callback plugin.') + + if not HAS_FLATDICT: + self.disabled =True + self.display.warning('The `flatdict` python module is not installed. ' + 'Disabling the Logentries callback plugin.') + config_path = os.path.abspath(os.path.dirname(__file__)) config = ConfigParser.ConfigParser() try: