mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Added keepalive packets to accelerate mode
Commands will now be started up in a separate task from the main handler thread, so that it can be monitored for completeness while sending a keepalive packet back to the controller to avoid a socket receive timeout.
This commit is contained in:
parent
52a42bf607
commit
f9c87868ac
2 changed files with 97 additions and 49 deletions
|
@ -180,11 +180,22 @@ class Connection(object):
|
||||||
if self.send_data(data):
|
if self.send_data(data):
|
||||||
raise errors.AnsibleError("Failed to send command to %s" % self.host)
|
raise errors.AnsibleError("Failed to send command to %s" % self.host)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# we loop here while waiting for the response, because a
|
||||||
|
# long running command may cause us to receive keepalive packets
|
||||||
|
# ({"pong":"true"}) rather than the response we want.
|
||||||
response = self.recv_data()
|
response = self.recv_data()
|
||||||
if not response:
|
if not response:
|
||||||
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
|
raise errors.AnsibleError("Failed to get a response from %s" % self.host)
|
||||||
response = utils.decrypt(self.key, response)
|
response = utils.decrypt(self.key, response)
|
||||||
response = utils.parse_json(response)
|
response = utils.parse_json(response)
|
||||||
|
if "pong" in response:
|
||||||
|
# it's a keepalive, go back to waiting
|
||||||
|
vvvv("received a keepalive packet")
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
vvvv("received the response")
|
||||||
|
break
|
||||||
|
|
||||||
return (response.get('rc',None), '', response.get('stdout',''), response.get('stderr',''))
|
return (response.get('rc',None), '', response.get('stdout',''), response.get('stderr',''))
|
||||||
|
|
||||||
|
|
|
@ -58,24 +58,25 @@ EXAMPLES = '''
|
||||||
- command: /usr/bin/anything
|
- command: /usr/bin/anything
|
||||||
'''
|
'''
|
||||||
|
|
||||||
import os
|
|
||||||
import os.path
|
|
||||||
import tempfile
|
|
||||||
import sys
|
|
||||||
import shutil
|
|
||||||
import socket
|
|
||||||
import struct
|
|
||||||
import time
|
|
||||||
import base64
|
import base64
|
||||||
import getpass
|
import getpass
|
||||||
|
import os
|
||||||
|
import os.path
|
||||||
|
import shutil
|
||||||
|
import signal
|
||||||
|
import socket
|
||||||
|
import struct
|
||||||
|
import sys
|
||||||
import syslog
|
import syslog
|
||||||
import signal
|
import tempfile
|
||||||
import time
|
import time
|
||||||
import signal
|
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
import SocketServer
|
import SocketServer
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
syslog.openlog('ansible-%s' % os.path.basename(__file__))
|
syslog.openlog('ansible-%s' % os.path.basename(__file__))
|
||||||
PIDFILE = os.path.expanduser("~/.accelerate.pid")
|
PIDFILE = os.path.expanduser("~/.accelerate.pid")
|
||||||
|
|
||||||
|
@ -160,6 +161,19 @@ def daemonize_self(module, password, port, minutes):
|
||||||
os.dup2(dev_null.fileno(), sys.stderr.fileno())
|
os.dup2(dev_null.fileno(), sys.stderr.fileno())
|
||||||
log("daemonizing successful")
|
log("daemonizing successful")
|
||||||
|
|
||||||
|
class ThreadWithReturnValue(Thread):
|
||||||
|
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None):
|
||||||
|
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
|
||||||
|
self._return = None
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
if self._Thread__target is not None:
|
||||||
|
self._return = self._Thread__target(*self._Thread__args,
|
||||||
|
**self._Thread__kwargs)
|
||||||
|
def join(self,timeout=None):
|
||||||
|
Thread.join(self, timeout=timeout)
|
||||||
|
return self._return
|
||||||
|
|
||||||
class ThreadedTCPServer(SocketServer.ThreadingTCPServer):
|
class ThreadedTCPServer(SocketServer.ThreadingTCPServer):
|
||||||
def __init__(self, server_address, RequestHandlerClass, module, password):
|
def __init__(self, server_address, RequestHandlerClass, module, password):
|
||||||
self.module = module
|
self.module = module
|
||||||
|
@ -193,10 +207,12 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
|
||||||
vvv("received nothing, bailing out")
|
vvv("received nothing, bailing out")
|
||||||
return None
|
return None
|
||||||
data += d
|
data += d
|
||||||
|
vvvv("data received so far (expecting %d): %d" % (data_len,len(data)))
|
||||||
vvvv("received all of the data, returning")
|
vvvv("received all of the data, returning")
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def handle(self):
|
def handle(self):
|
||||||
|
try:
|
||||||
while True:
|
while True:
|
||||||
vvvv("waiting for data")
|
vvvv("waiting for data")
|
||||||
data = self.recv_data()
|
data = self.recv_data()
|
||||||
|
@ -219,9 +235,22 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
|
||||||
|
|
||||||
mode = data['mode']
|
mode = data['mode']
|
||||||
response = {}
|
response = {}
|
||||||
|
last_pong = datetime.now()
|
||||||
if mode == 'command':
|
if mode == 'command':
|
||||||
vvvv("received a command request, running it")
|
vvvv("received a command request, running it")
|
||||||
response = self.command(data)
|
twrv = ThreadWithReturnValue(target=self.command, args=(data,))
|
||||||
|
twrv.start()
|
||||||
|
response = None
|
||||||
|
while twrv.is_alive():
|
||||||
|
if (datetime.now() - last_pong).seconds >= 15:
|
||||||
|
last_pong = datetime.now()
|
||||||
|
vvvv("command still running, sending keepalive packet")
|
||||||
|
data2 = json.dumps(dict(pong=True))
|
||||||
|
data2 = self.server.key.Encrypt(data2)
|
||||||
|
self.send_data(data2)
|
||||||
|
time.sleep(0.1)
|
||||||
|
response = twrv._return
|
||||||
|
vvvv("thread is done, response from join was %s" % response)
|
||||||
elif mode == 'put':
|
elif mode == 'put':
|
||||||
vvvv("received a put request, putting it")
|
vvvv("received a put request, putting it")
|
||||||
response = self.put(data)
|
response = self.put(data)
|
||||||
|
@ -229,11 +258,19 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
|
||||||
vvvv("received a fetch request, getting it")
|
vvvv("received a fetch request, getting it")
|
||||||
response = self.fetch(data)
|
response = self.fetch(data)
|
||||||
|
|
||||||
|
vvvv("response result is %s" % str(response))
|
||||||
data2 = json.dumps(response)
|
data2 = json.dumps(response)
|
||||||
data2 = self.server.key.Encrypt(data2)
|
data2 = self.server.key.Encrypt(data2)
|
||||||
vvvv("sending the response back to the controller")
|
vvvv("sending the response back to the controller")
|
||||||
self.send_data(data2)
|
self.send_data(data2)
|
||||||
vvvv("done sending the response")
|
vvvv("done sending the response")
|
||||||
|
except:
|
||||||
|
tb = traceback.format_exc()
|
||||||
|
log("encountered an unhandled exception in the handle() function")
|
||||||
|
log("error was:\n%s" % tb)
|
||||||
|
data2 = json.dumps(dict(rc=1, failed=True, msg="unhandled error in the handle() function"))
|
||||||
|
data2 = self.server.key.Encrypt(data2)
|
||||||
|
self.send_data(data2)
|
||||||
|
|
||||||
def command(self, data):
|
def command(self, data):
|
||||||
if 'cmd' not in data:
|
if 'cmd' not in data:
|
||||||
|
|
Loading…
Reference in a new issue