From f9c87868ac4f90af4ba89e5060bee77f7d314343 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Tue, 1 Oct 2013 15:19:21 -0500 Subject: [PATCH] Added keepalive packets to accelerate mode Commands will now be started up in a separate task from the main handler thread, so that it can be monitored for completeness while sending a keepalive packet back to the controller to avoid a socket receive timeout. --- .../runner/connection_plugins/accelerate.py | 21 ++- library/utilities/accelerate | 125 ++++++++++++------ 2 files changed, 97 insertions(+), 49 deletions(-) diff --git a/lib/ansible/runner/connection_plugins/accelerate.py b/lib/ansible/runner/connection_plugins/accelerate.py index d6ce7243f1..3eb08e7b80 100644 --- a/lib/ansible/runner/connection_plugins/accelerate.py +++ b/lib/ansible/runner/connection_plugins/accelerate.py @@ -180,11 +180,22 @@ class Connection(object): if self.send_data(data): raise errors.AnsibleError("Failed to send command to %s" % self.host) - response = self.recv_data() - if not response: - raise errors.AnsibleError("Failed to get a response from %s" % self.host) - response = utils.decrypt(self.key, response) - response = utils.parse_json(response) + while True: + # we loop here while waiting for the response, because a + # long running command may cause us to receive keepalive packets + # ({"pong":"true"}) rather than the response we want. + response = self.recv_data() + if not response: + raise errors.AnsibleError("Failed to get a response from %s" % self.host) + response = utils.decrypt(self.key, response) + response = utils.parse_json(response) + if "pong" in response: + # it's a keepalive, go back to waiting + vvvv("received a keepalive packet") + continue + else: + vvvv("received the response") + break return (response.get('rc',None), '', response.get('stdout',''), response.get('stderr','')) diff --git a/library/utilities/accelerate b/library/utilities/accelerate index da16d8395e..f7cdea2985 100644 --- a/library/utilities/accelerate +++ b/library/utilities/accelerate @@ -58,24 +58,25 @@ EXAMPLES = ''' - command: /usr/bin/anything ''' -import os -import os.path -import tempfile -import sys -import shutil -import socket -import struct -import time import base64 import getpass +import os +import os.path +import shutil +import signal +import socket +import struct +import sys import syslog -import signal +import tempfile import time -import signal import traceback import SocketServer +from datetime import datetime +from threading import Thread + syslog.openlog('ansible-%s' % os.path.basename(__file__)) PIDFILE = os.path.expanduser("~/.accelerate.pid") @@ -160,6 +161,19 @@ def daemonize_self(module, password, port, minutes): os.dup2(dev_null.fileno(), sys.stderr.fileno()) log("daemonizing successful") +class ThreadWithReturnValue(Thread): + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None): + Thread.__init__(self, group, target, name, args, kwargs, Verbose) + self._return = None + + def run(self): + if self._Thread__target is not None: + self._return = self._Thread__target(*self._Thread__args, + **self._Thread__kwargs) + def join(self,timeout=None): + Thread.join(self, timeout=timeout) + return self._return + class ThreadedTCPServer(SocketServer.ThreadingTCPServer): def __init__(self, server_address, RequestHandlerClass, module, password): self.module = module @@ -193,47 +207,70 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): vvv("received nothing, bailing out") return None data += d + vvvv("data received so far (expecting %d): %d" % (data_len,len(data))) vvvv("received all of the data, returning") return data def handle(self): - while True: - vvvv("waiting for data") - data = self.recv_data() - if not data: - vvvv("received nothing back from recv_data(), breaking out") - break - try: - vvvv("got data, decrypting") - data = self.server.key.Decrypt(data) - vvvv("decryption done") - except: - vv("bad decrypt, skipping...") - data2 = json.dumps(dict(rc=1)) + try: + while True: + vvvv("waiting for data") + data = self.recv_data() + if not data: + vvvv("received nothing back from recv_data(), breaking out") + break + try: + vvvv("got data, decrypting") + data = self.server.key.Decrypt(data) + vvvv("decryption done") + except: + vv("bad decrypt, skipping...") + data2 = json.dumps(dict(rc=1)) + data2 = self.server.key.Encrypt(data2) + send_data(client, data2) + return + + vvvv("loading json from the data") + data = json.loads(data) + + mode = data['mode'] + response = {} + last_pong = datetime.now() + if mode == 'command': + vvvv("received a command request, running it") + twrv = ThreadWithReturnValue(target=self.command, args=(data,)) + twrv.start() + response = None + while twrv.is_alive(): + if (datetime.now() - last_pong).seconds >= 15: + last_pong = datetime.now() + vvvv("command still running, sending keepalive packet") + data2 = json.dumps(dict(pong=True)) + data2 = self.server.key.Encrypt(data2) + self.send_data(data2) + time.sleep(0.1) + response = twrv._return + vvvv("thread is done, response from join was %s" % response) + elif mode == 'put': + vvvv("received a put request, putting it") + response = self.put(data) + elif mode == 'fetch': + vvvv("received a fetch request, getting it") + response = self.fetch(data) + + vvvv("response result is %s" % str(response)) + data2 = json.dumps(response) data2 = self.server.key.Encrypt(data2) - send_data(client, data2) - return - - vvvv("loading json from the data") - data = json.loads(data) - - mode = data['mode'] - response = {} - if mode == 'command': - vvvv("received a command request, running it") - response = self.command(data) - elif mode == 'put': - vvvv("received a put request, putting it") - response = self.put(data) - elif mode == 'fetch': - vvvv("received a fetch request, getting it") - response = self.fetch(data) - - data2 = json.dumps(response) + vvvv("sending the response back to the controller") + self.send_data(data2) + vvvv("done sending the response") + except: + tb = traceback.format_exc() + log("encountered an unhandled exception in the handle() function") + log("error was:\n%s" % tb) + data2 = json.dumps(dict(rc=1, failed=True, msg="unhandled error in the handle() function")) data2 = self.server.key.Encrypt(data2) - vvvv("sending the response back to the controller") self.send_data(data2) - vvvv("done sending the response") def command(self, data): if 'cmd' not in data: