diff --git a/lib/ansible/constants.py b/lib/ansible/constants.py index d37f0c58b5..7391424682 100644 --- a/lib/ansible/constants.py +++ b/lib/ansible/constants.py @@ -135,6 +135,7 @@ ANSIBLE_SSH_CONTROL_PATH = get_config(p, 'ssh_connection', 'control_path', PARAMIKO_RECORD_HOST_KEYS = get_config(p, 'paramiko_connection', 'record_host_keys', 'ANSIBLE_PARAMIKO_RECORD_HOST_KEYS', True, boolean=True) ZEROMQ_PORT = get_config(p, 'fireball_connection', 'zeromq_port', 'ANSIBLE_ZEROMQ_PORT', 5099, integer=True) ACCELERATE_PORT = get_config(p, 'accelerate', 'accelerate_port', 'ACCELERATE_PORT', 5099, integer=True) +ACCELERATE_TIMEOUT = int(get_config(p, 'accelerate', 'accelerate_timeout', 'ACCELERATE_TIMEOUT', 300)) DEFAULT_UNDEFINED_VAR_BEHAVIOR = get_config(p, DEFAULTS, 'error_on_undefined_vars', 'ANSIBLE_ERROR_ON_UNDEFINED_VARS', True, boolean=True) HOST_KEY_CHECKING = get_config(p, DEFAULTS, 'host_key_checking', 'ANSIBLE_HOST_KEY_CHECKING', True, boolean=True) diff --git a/lib/ansible/playbook/__init__.py b/lib/ansible/playbook/__init__.py index 216ddc6146..f350ff9aa4 100644 --- a/lib/ansible/playbook/__init__.py +++ b/lib/ansible/playbook/__init__.py @@ -314,6 +314,7 @@ class PlayBook(object): transport=task.transport, sudo_pass=task.sudo_pass, is_playbook=True, check=self.check, diff=self.diff, environment=task.environment, complex_args=task.args, accelerate=task.play.accelerate, accelerate_port=task.play.accelerate_port, + accelerate_timeout=task.play.accelerate_timeout, error_on_undefined_vars=C.DEFAULT_UNDEFINED_VAR_BEHAVIOR ) @@ -454,7 +455,8 @@ class PlayBook(object): setup_cache=self.SETUP_CACHE, callbacks=self.runner_callbacks, sudo=play.sudo, sudo_user=play.sudo_user, transport=play.transport, sudo_pass=self.sudo_pass, is_playbook=True, module_vars=play.vars, default_vars=play.default_vars, check=self.check, diff=self.diff, - accelerate=play.accelerate, accelerate_port=play.accelerate_port + accelerate=play.accelerate, accelerate_port=play.accelerate_port, + accelerate_timeout=play.accelerate_timeout ).run() self.stats.compute(setup_results, setup=True) diff --git a/lib/ansible/playbook/play.py b/lib/ansible/playbook/play.py index 0273be841c..1d617003ea 100644 --- a/lib/ansible/playbook/play.py +++ b/lib/ansible/playbook/play.py @@ -30,7 +30,7 @@ class Play(object): __slots__ = [ 'hosts', 'name', 'vars', 'default_vars', 'vars_prompt', 'vars_files', 'handlers', 'remote_user', 'remote_port', 'included_roles', 'accelerate', - 'accelerate_port', 'sudo', 'sudo_user', 'transport', 'playbook', + 'accelerate_port', 'accelerate_timeout', 'sudo', 'sudo_user', 'transport', 'playbook', 'tags', 'gather_facts', 'serial', '_ds', '_handlers', '_tasks', 'basedir', 'any_errors_fatal', 'roles', 'max_fail_pct' ] @@ -40,7 +40,7 @@ class Play(object): VALID_KEYS = [ 'hosts', 'name', 'vars', 'vars_prompt', 'vars_files', 'tasks', 'handlers', 'remote_user', 'user', 'port', 'include', 'accelerate', 'accelerate_port', - 'sudo', 'sudo_user', 'connection', 'tags', 'gather_facts', 'serial', + 'accelerate_timeout', 'sudo', 'sudo_user', 'connection', 'tags', 'gather_facts', 'serial', 'any_errors_fatal', 'roles', 'pre_tasks', 'post_tasks', 'max_fail_percentage' ] @@ -114,6 +114,7 @@ class Play(object): self.any_errors_fatal = utils.boolean(ds.get('any_errors_fatal', 'false')) self.accelerate = utils.boolean(ds.get('accelerate', 'false')) self.accelerate_port = ds.get('accelerate_port', None) + self.accelerate_timeout = int(ds.get('accelerate_timeout', 300)) self.max_fail_pct = int(ds.get('max_fail_percentage', 100)) load_vars = {} diff --git a/lib/ansible/runner/__init__.py b/lib/ansible/runner/__init__.py index ac22fb1399..e336a13da4 100644 --- a/lib/ansible/runner/__init__.py +++ b/lib/ansible/runner/__init__.py @@ -136,6 +136,7 @@ class Runner(object): error_on_undefined_vars=C.DEFAULT_UNDEFINED_VAR_BEHAVIOR, # ex. False accelerate=False, # use accelerated connection accelerate_port=None, # port to use with accelerated connection + accelerate_timeout=None, # number of seconds to wait for a response on the accelerated connection ): # used to lock multiprocess inputs and outputs at various levels @@ -179,6 +180,7 @@ class Runner(object): self.error_on_undefined_vars = error_on_undefined_vars self.accelerate = accelerate self.accelerate_port = accelerate_port + self.accelerate_timeout = accelerate_timeout self.callbacks.runner = self self.original_transport = self.transport @@ -581,6 +583,12 @@ class Runner(object): actual_transport = "accelerate" if not self.accelerate_port: self.accelerate_port = C.ACCELERATE_PORT + try: + if not self.accelerate_timeout: + self.accelerate_timeout = C.ACCELERATE_TIMEOUT + self.accelerate_timeout = int(self.accelerate_timeout) + except: + raise errors.AnsibleError("invalid value for the accelerate_timeout parameter") if actual_transport in [ 'paramiko', 'ssh', 'accelerate' ]: actual_port = inject.get('ansible_ssh_port', port) diff --git a/lib/ansible/runner/connection_plugins/accelerate.py b/lib/ansible/runner/connection_plugins/accelerate.py index 3eb08e7b80..0571fb25c7 100644 --- a/lib/ansible/runner/connection_plugins/accelerate.py +++ b/lib/ansible/runner/connection_plugins/accelerate.py @@ -103,7 +103,7 @@ class Connection(object): # TODO: make the timeout and retries configurable? tries = 3 self.conn = socket.socket() - self.conn.settimeout(300.0) + self.conn.settimeout(self.runner.accelerate_timeout) vvvv("attempting connection to %s via the accelerated port %d" % (self.host,self.accport)) while tries > 0: try: diff --git a/library/utilities/accelerate b/library/utilities/accelerate index f7cdea2985..420d78bdbe 100644 --- a/library/utilities/accelerate +++ b/library/utilities/accelerate @@ -35,6 +35,12 @@ options: required: false default: 5099 aliases: [] + timeout: + description: + - The number of seconds the socket will wait for data. If none is received when the timeout value is reached, the connection will be closed. + required: false + default: 300 + aliases: [] minutes: description: - The I(accelerate) listener daemon is started on nodes and will stay around for @@ -175,11 +181,11 @@ class ThreadWithReturnValue(Thread): return self._return class ThreadedTCPServer(SocketServer.ThreadingTCPServer): - def __init__(self, server_address, RequestHandlerClass, module, password): + def __init__(self, server_address, RequestHandlerClass, module, password, timeout): self.module = module self.key = AesKey.Read(password) self.allow_reuse_address = True - self.timeout = None + self.timeout = timeout SocketServer.ThreadingTCPServer.__init__(self, server_address, RequestHandlerClass) class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): @@ -384,7 +390,7 @@ class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler): return dict(failed=True, stdout="failed to chown the file via sudo") return dict() -def daemonize(module, password, port, minutes): +def daemonize(module, password, port, timeout, minutes): try: daemonize_self(module, password, port, minutes) @@ -394,7 +400,7 @@ def daemonize(module, password, port, minutes): signal.signal(signal.SIGALRM, catcher) signal.setitimer(signal.ITIMER_REAL, 60 * minutes) - server = ThreadedTCPServer(("0.0.0.0", port), ThreadedTCPRequestHandler, module, password) + server = ThreadedTCPServer(("0.0.0.0", port), ThreadedTCPRequestHandler, module, password, timeout) server.allow_reuse_address = True vv("serving!") @@ -409,6 +415,7 @@ def main(): module = AnsibleModule( argument_spec = dict( port=dict(required=False, default=5099), + timeout=dict(required=False, default=300), password=dict(required=True), minutes=dict(required=False, default=30), debug=dict(required=False, default=0, type='int') @@ -418,6 +425,7 @@ def main(): password = base64.b64decode(module.params['password']) port = int(module.params['port']) + timeout = int(module.params['timeout']) minutes = int(module.params['minutes']) debug = int(module.params['debug']) @@ -426,8 +434,7 @@ def main(): DEBUG_LEVEL=debug - daemonize(module, password, port, minutes) - + daemonize(module, password, port, timeout, minutes) # this is magic, see lib/ansible/module_common.py #<>