mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Initial add of logentries callback plugin
This callback plugin will generate json objects to be sent to the logentries service for auditing/debugging purposes. To use: Add this to your ansible.cfg file in the defaults block [defaults] callback_plugins = ./callback_plugins callback_stdout = logentries callback_whitelist = logentries Copy the callback plugin into the callback_plugings directory Either set the environment variables export LOGENTRIES_API=data.logentries.com export LOGENTRIES_PORT=10000 export LOGENTRIES_ANSIBLE_TOKEN=dd21fc88-f00a-43ff-b977-e3a4233c53af Or create a logentries.ini config file that sites next to the plugin with the following contents [logentries] api = data.logentries.com port = 10000 tls_port = 20000 use_tls = no token = dd21fc88-f00a-43ff-b977-e3a4233c53af
This commit is contained in:
parent
a766044b26
commit
85277c8aae
1 changed files with 336 additions and 0 deletions
336
lib/ansible/plugins/callback/logentries.py
Normal file
336
lib/ansible/plugins/callback/logentries.py
Normal file
|
@ -0,0 +1,336 @@
|
|||
"""
|
||||
(c) 2015, Logentries.com
|
||||
Author: Jimmy Tang <jimmy.tang@logentries.com>
|
||||
|
||||
This callback plugin will generate json objects to be sent to logentries
|
||||
for auditing/debugging purposes.
|
||||
|
||||
Todo:
|
||||
|
||||
* Better formatting of output before sending out to logentries data/api nodes.
|
||||
|
||||
To use:
|
||||
|
||||
Add this to your ansible.cfg file in the defaults block
|
||||
|
||||
[defaults]
|
||||
callback_plugins = ./callback_plugins
|
||||
callback_stdout = logentries
|
||||
callback_whitelist = logentries
|
||||
|
||||
Copy the callback plugin into the callback_plugings directory
|
||||
|
||||
Either set the environment variables
|
||||
|
||||
export LOGENTRIES_API=data.logentries.com
|
||||
export LOGENTRIES_PORT=10000
|
||||
export LOGENTRIES_ANSIBLE_TOKEN=dd21fc88-f00a-43ff-b977-e3a4233c53af
|
||||
|
||||
Or create a logentries.ini config file that sites next to the plugin with the following contents
|
||||
|
||||
[logentries]
|
||||
api = data.logentries.com
|
||||
port = 10000
|
||||
tls_port = 20000
|
||||
use_tls = no
|
||||
token = dd21fc88-f00a-43ff-b977-e3a4233c53af
|
||||
|
||||
|
||||
"""
|
||||
|
||||
import os
|
||||
import threading
|
||||
import socket
|
||||
import random
|
||||
import time
|
||||
import codecs
|
||||
import Queue
|
||||
import ConfigParser
|
||||
import uuid
|
||||
try:
|
||||
import certifi
|
||||
except ImportError:
|
||||
print("please do 'pip install certifi'")
|
||||
|
||||
try:
|
||||
import flatdict
|
||||
except ImportError:
|
||||
print("please do 'pip install flatdict'")
|
||||
|
||||
from ansible.plugins.callback import CallbackBase
|
||||
|
||||
|
||||
def to_unicode(ch):
|
||||
return codecs.unicode_escape_decode(ch)[0]
|
||||
|
||||
|
||||
def is_unicode(ch):
|
||||
return isinstance(ch, unicode)
|
||||
|
||||
|
||||
def create_unicode(ch):
|
||||
return unicode(ch, 'utf-8')
|
||||
|
||||
|
||||
class PlainTextSocketAppender(threading.Thread):
|
||||
def __init__(self,
|
||||
verbose=True,
|
||||
LE_API='data.logentries.com',
|
||||
LE_PORT=80,
|
||||
LE_TLS_PORT=443):
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
self.QUEUE_SIZE = 32768
|
||||
self.LE_API = LE_API
|
||||
self.LE_PORT = LE_PORT
|
||||
self.LE_TLS_PORT = LE_TLS_PORT
|
||||
self.MIN_DELAY = 0.1
|
||||
self.MAX_DELAY = 10
|
||||
# Error message displayed when an incorrect Token has been detected
|
||||
self.INVALID_TOKEN = ("\n\nIt appears the LOGENTRIES_TOKEN "
|
||||
"parameter you entered is incorrect!\n\n")
|
||||
# Unicode Line separator character \u2028
|
||||
self.LINE_SEP = to_unicode('\u2028')
|
||||
|
||||
self.daemon = True
|
||||
self.verbose = verbose
|
||||
self._conn = None
|
||||
self._queue = Queue.Queue(self.QUEUE_SIZE)
|
||||
|
||||
def empty(self):
|
||||
return self._queue.empty()
|
||||
|
||||
def open_connection(self):
|
||||
self._conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._conn.connect((self.LE_API, self.LE_PORT))
|
||||
|
||||
def reopen_connection(self):
|
||||
self.close_connection()
|
||||
|
||||
root_delay = self.MIN_DELAY
|
||||
while True:
|
||||
try:
|
||||
self.open_connection()
|
||||
return
|
||||
except Exception:
|
||||
if self.verbose:
|
||||
print("Unable to connect to Logentries")
|
||||
|
||||
root_delay *= 2
|
||||
if (root_delay > self.MAX_DELAY):
|
||||
root_delay = self.MAX_DELAY
|
||||
|
||||
wait_for = root_delay + random.uniform(0, root_delay)
|
||||
|
||||
try:
|
||||
time.sleep(wait_for)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
|
||||
def close_connection(self):
|
||||
if self._conn is not None:
|
||||
self._conn.close()
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
# Open connection
|
||||
self.reopen_connection()
|
||||
|
||||
# Send data in queue
|
||||
while True:
|
||||
# Take data from queue
|
||||
data = self._queue.get(block=True)
|
||||
|
||||
# Replace newlines with Unicode line separator
|
||||
# for multi-line events
|
||||
if not is_unicode(data):
|
||||
multiline = create_unicode(data).replace(
|
||||
'\n', self.LINE_SEP)
|
||||
else:
|
||||
multiline = data.replace('\n', self.LINE_SEP)
|
||||
multiline += "\n"
|
||||
# Send data, reconnect if needed
|
||||
while True:
|
||||
try:
|
||||
self._conn.send(multiline.encode('utf-8'))
|
||||
except socket.error:
|
||||
self.reopen_connection()
|
||||
continue
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
if self.verbose:
|
||||
print("Logentries asynchronous socket client interrupted")
|
||||
|
||||
self.close_connection()
|
||||
|
||||
|
||||
try:
|
||||
import ssl
|
||||
except ImportError: # for systems without TLS support.
|
||||
SocketAppender = PlainTextSocketAppender
|
||||
print("Unable to import ssl module. Will send over port 80.")
|
||||
else:
|
||||
|
||||
class TLSSocketAppender(PlainTextSocketAppender):
|
||||
def open_connection(self):
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock = ssl.wrap_socket(
|
||||
sock=sock,
|
||||
keyfile=None,
|
||||
certfile=None,
|
||||
server_side=False,
|
||||
cert_reqs=ssl.CERT_REQUIRED,
|
||||
ssl_version=getattr(
|
||||
ssl, 'PROTOCOL_TLSv1_2', ssl.PROTOCOL_TLSv1),
|
||||
ca_certs=certifi.where(),
|
||||
do_handshake_on_connect=True,
|
||||
suppress_ragged_eofs=True, )
|
||||
sock.connect((self.LE_API, self.LE_TLS_PORT))
|
||||
self._conn = sock
|
||||
|
||||
SocketAppender = TLSSocketAppender
|
||||
|
||||
|
||||
class CallbackModule(CallbackBase):
|
||||
CALLBACK_VERSION = 2.0
|
||||
CALLBACK_TYPE = 'notification'
|
||||
CALLBACK_NAME = 'logentries'
|
||||
|
||||
def __init__(self, display):
|
||||
super(CallbackModule, self).__init__(display)
|
||||
|
||||
config_path = os.path.abspath(os.path.dirname(__file__))
|
||||
config = ConfigParser.ConfigParser()
|
||||
try:
|
||||
config.readfp(open(os.path.join(config_path, 'logentries.ini')))
|
||||
if config.has_option('logentries', 'api'):
|
||||
self.api_uri = config.get('logentries', 'api')
|
||||
if config.has_option('logentries', 'port'):
|
||||
self.api_port = config.getint('logentries', 'port')
|
||||
if config.has_option('logentries', 'tls_port'):
|
||||
self.api_tls_port = config.getint('logentries', 'tls_port')
|
||||
if config.has_option('logentries', 'use_tls'):
|
||||
self.use_tls = config.getboolean('logentries', 'use_tls')
|
||||
if config.has_option('logentries', 'token'):
|
||||
self.token = config.get('logentries', 'token')
|
||||
except:
|
||||
self.api_uri = os.getenv('LOGENTRIES_API')
|
||||
if self.api_uri is None:
|
||||
self.api_uri = 'data.logentries.com'
|
||||
|
||||
try:
|
||||
self.api_port = int(os.getenv('LOGENTRIES_PORT'))
|
||||
if self.api_port is None:
|
||||
self.api_port = 80
|
||||
except TypeError:
|
||||
self.api_port = 80
|
||||
|
||||
try:
|
||||
self.api_tls_port = int(os.getenv('LOGENTRIES_TLS_PORT'))
|
||||
if self.api_tls_port is None:
|
||||
self.api_tls_port = 443
|
||||
except TypeError:
|
||||
self.api_tls_port = 443
|
||||
|
||||
# this just needs to be set to use TLS
|
||||
self.use_tls = os.getenv('LOGENTRIES_USE_TLS')
|
||||
if self.use_tls is None:
|
||||
self.use_tls = False
|
||||
elif self.use_tls.lower() in ['yes', 'true']:
|
||||
self.use_tls = True
|
||||
|
||||
self.token = os.getenv('LOGENTRIES_ANSIBLE_TOKEN')
|
||||
if self.token is None:
|
||||
self.disabled = True
|
||||
self._display.warning(
|
||||
'Logentries token could not be loaded. The logentries token can be provided using the `LOGENTRIES_TOKEN` environment variable')
|
||||
|
||||
self.verbose = False
|
||||
self.timeout = 10
|
||||
self.le_jobid = str(uuid.uuid4())
|
||||
|
||||
if self.use_tls:
|
||||
self._thread = TLSSocketAppender(verbose=self.verbose,
|
||||
LE_API=self.api_uri,
|
||||
LE_TLS_PORT=self.api_tls_port)
|
||||
else:
|
||||
self._thread = PlainTextSocketAppender(verbose=self.verbose,
|
||||
LE_API=self.api_uri,
|
||||
LE_PORT=self.api_port)
|
||||
|
||||
def emit(self, record):
|
||||
if not self._thread.is_alive():
|
||||
try:
|
||||
self._thread.start()
|
||||
if self.verbose:
|
||||
print("Starting Logentries Asynchronous Socket Appender")
|
||||
except RuntimeError: # It's already started.
|
||||
if not self._thread.is_alive():
|
||||
raise
|
||||
|
||||
msg = record.rstrip('\n')
|
||||
msg = "{} {}".format(self.token, msg)
|
||||
self._thread._queue.put(msg)
|
||||
|
||||
def runner_on_ok(self, host, res):
|
||||
results = {}
|
||||
results['le_jobid'] = self.le_jobid
|
||||
results['hostname'] = host
|
||||
results['results'] = res
|
||||
results['status'] = 'OK'
|
||||
results = flatdict.FlatDict(results)
|
||||
self.emit(self._dump_results(results))
|
||||
|
||||
def runner_on_failed(self, host, res, ignore_errors=False):
|
||||
results = {}
|
||||
results['le_jobid'] = self.le_jobid
|
||||
results['hostname'] = host
|
||||
results['results'] = res
|
||||
results['status'] = 'FAILED'
|
||||
results = flatdict.FlatDict(results)
|
||||
self.emit(self._dump_results(results))
|
||||
|
||||
def runner_on_skipped(self, host, item=None):
|
||||
results = {}
|
||||
results['le_jobid'] = self.le_jobid
|
||||
results['hostname'] = host
|
||||
results['status'] = 'SKIPPED'
|
||||
results = flatdict.FlatDict(results)
|
||||
self.emit(self._dump_results(results))
|
||||
|
||||
def runner_on_unreachable(self, host, res):
|
||||
results = {}
|
||||
results['le_jobid'] = self.le_jobid
|
||||
results['hostname'] = host
|
||||
results['results'] = res
|
||||
results['status'] = 'UNREACHABLE'
|
||||
results = flatdict.FlatDict(results)
|
||||
self.emit(self._dump_results(results))
|
||||
|
||||
def runner_on_async_failed(self, host, res, jid):
|
||||
results = {}
|
||||
results['le_jobid'] = self.le_jobid
|
||||
results['hostname'] = host
|
||||
results['results'] = res
|
||||
results['jid'] = jid
|
||||
results['status'] = 'ASYNC_FAILED'
|
||||
results = flatdict.FlatDict(results)
|
||||
self.emit(self._dump_results(results))
|
||||
|
||||
def v2_playbook_on_play_start(self, play):
|
||||
results = {}
|
||||
results['le_jobid'] = self.le_jobid
|
||||
results['started_by'] = os.getlogin()
|
||||
if play.name:
|
||||
results['play'] = play.name
|
||||
results['hosts'] = play.hosts
|
||||
results = flatdict.FlatDict(results)
|
||||
self.emit(self._dump_results(results))
|
||||
|
||||
def playbook_on_stats(self, stats):
|
||||
""" flush out queue of messages """
|
||||
now = time.time()
|
||||
while not self._thread.empty():
|
||||
time.sleep(0.2)
|
||||
if time.time() - now > self.timeout:
|
||||
break
|
Loading…
Reference in a new issue