From eaa7714ff8c608e75a223b594882c273a9922b1c Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Sat, 3 Mar 2012 12:25:56 -0500 Subject: [PATCH] Laying the groundwork for async mode, async status script still needs to be done, and async_wrapper still needs to daemonize. Then, once done, playbook can be taught how to poll async within the timing window. --- bin/ansible | 3 ++ lib/ansible/runner.py | 69 ++++++++++++++++++++----------- library/async_wrapper | 96 +++++++++++++++++++++++++++++++++++++++++++ library/copy | 0 library/ping | 2 +- 5 files changed, 145 insertions(+), 25 deletions(-) create mode 100755 library/async_wrapper mode change 100644 => 100755 library/copy mode change 100644 => 100755 library/ping diff --git a/bin/ansible b/bin/ansible index 187ad6219e..3098e5295e 100755 --- a/bin/ansible +++ b/bin/ansible @@ -52,6 +52,8 @@ class Cli(object): parser = OptionParser(usage = 'ansible [options]') parser.add_option("-a", "--args", dest="module_args", help="module arguments", default=C.DEFAULT_MODULE_ARGS) + parser.add_option("-B", "--background", dest="seconds", default=0, + help="run asynchronously, failing after X seconds") parser.add_option('-f','--forks', dest='forks', default=C.DEFAULT_FORKS, type='int', help='number of parallel processes to use') parser.add_option("-i", "--inventory-file", dest="inventory", @@ -96,6 +98,7 @@ class Cli(object): host_list=options.inventory, timeout=options.timeout, forks=options.forks, + background=options.seconds, pattern=pattern, verbose=True, ).run() diff --git a/lib/ansible/runner.py b/lib/ansible/runner.py index aa7b7d5afb..e00470c188 100755 --- a/lib/ansible/runner.py +++ b/lib/ansible/runner.py @@ -30,6 +30,7 @@ import os import ansible.constants as C import Queue import paramiko +import random ################################################ @@ -56,6 +57,7 @@ class Runner(object): pattern=C.DEFAULT_PATTERN, remote_user=C.DEFAULT_REMOTE_USER, remote_pass=C.DEFAULT_REMOTE_PASS, + background=0, verbose=False): ''' @@ -68,6 +70,7 @@ class Runner(object): forks -------- how parallel should we be? 1 is extra debuggable. remote_user -- who to login as (default root) remote_pass -- provide only if you don't want to use keys or ssh-agent + background --- if non 0, run async, failing after X seconds, -1 == infinite ''' # save input values @@ -82,9 +85,14 @@ class Runner(object): self.verbose = verbose self.remote_user = remote_user self.remote_pass = remote_pass + self.background = background + # hosts in each group name in the inventory file self._tmp_paths = {} + random.seed() + self.generated_jid = str(random.randint(0, 999999999999)) + @classmethod def parse_hosts(cls, host_list): ''' @@ -162,7 +170,11 @@ class Runner(object): def _delete_remote_files(self, conn, files): ''' deletes one or more remote files ''' + if type(files) == str: + files = [ files ] for filename in files: + if not filename.startswith('/tmp/'): + raise Exception("not going to happen") self._exec_command(conn, "rm -rf %s" % filename) def _transfer_file(self, conn, source, dest): @@ -172,20 +184,21 @@ class Runner(object): sftp.put(source, dest) sftp.close() - def _transfer_module(self, conn, tmp): + def _transfer_module(self, conn, tmp, module): ''' transfers a module file to the remote side to execute it, but does not execute it yet ''' - outpath = self._copy_module(conn, tmp) + outpath = self._copy_module(conn, tmp, module) self._exec_command(conn, "chmod +x %s" % outpath) return outpath - def _execute_module(self, conn, outpath, tmp): + def _execute_module(self, conn, tmp, remote_module_path, module_args): ''' runs a module that has already been transferred ''' - cmd = self._command(outpath) + args = " ".join(module_args) + cmd = "%s %s" % (remote_module_path, args) result = self._exec_command(conn, cmd) self._delete_remote_files(conn, [ tmp ]) return result @@ -195,11 +208,23 @@ class Runner(object): transfer & execute a module that is not 'copy' or 'template' because those require extra work. ''' - module = self._transfer_module(conn, tmp) - result = self._execute_module(conn, module, tmp) + module = self._transfer_module(conn, tmp, self.module_name) + result = self._execute_module(conn, tmp, module, self.module_args) self._delete_remote_files(conn, tmp) - result = self._return_from_module(conn, host, result) - return result + return self._return_from_module(conn, host, result) + + def _execute_async_module(self, conn, host, tmp): + ''' + transfer the given module name, plus the async module + and then run the async module wrapping the other module + ''' + async = self._transfer_module(conn, tmp, 'async_wrapper') + module = self._transfer_module(conn, tmp, self.module_name) + new_args = [] + new_args = [ self.generated_jid, module ] + new_args.extend(self.module_args) + result = self._execute_module(conn, tmp, async, new_args) + return self._return_from_module(conn, host, result) def _parse_kv(self, args): ''' helper function to convert a string of key/value items to a dict ''' @@ -225,11 +250,11 @@ class Runner(object): # install the copy module self.module_name = 'copy' - module = self._transfer_module(conn) + module = self._transfer_module(conn, tmp, 'copy') # run the copy module - self.module_args = [ "src=%s" % tmp_src, "dest=%s" % dest ] - result = self._execute_module(conn, module, tmp) + args = [ "src=%s" % tmp_src, "dest=%s" % dest ] + result = self._execute_module(conn, tmp, module, args) self._delete_remote_files(conn, tmp_path) return self._return_from_module(conn, host, result) @@ -253,8 +278,8 @@ class Runner(object): module = self._transfer_module(conn, tmp) # run the template module - self.module_args = [ "src=%s" % temppath, "dest=%s" % dest, "metadata=%s" % metadata ] - result = self._execute_module(conn, module, tmp) + args = [ "src=%s" % temppath, "dest=%s" % dest, "metadata=%s" % metadata ] + result = self._execute_module(conn, tmp, module, args) self._delete_remote_files(conn, [ tpath ]) return self._return_from_module(conn, host, result) @@ -278,7 +303,10 @@ class Runner(object): tmp = self._get_tmp_path(conn) result = None if self.module_name not in [ 'copy', 'template' ]: - result = self._execute_normal_module(conn, host, tmp) + if self.background == 0: + result = self._execute_normal_module(conn, host, tmp) + else: + result = self._execute_async_module(conn, host, tmp) elif self.module_name == 'copy': result = self._execute_copy(conn, host, tmp) elif self.module_name == 'template': @@ -291,12 +319,6 @@ class Runner(object): conn.close() return result - - def _command(self, outpath): - ''' form up a command string for running over SSH ''' - cmd = "%s %s" % (outpath, " ".join(self.module_args)) - return cmd - def remote_log(self, conn, msg): ''' this is the function we use to log things ''' stdin, stdout, stderr = conn.exec_command('/usr/bin/logger -t ansible -p auth.info %r' % msg) @@ -315,12 +337,12 @@ class Runner(object): result = self._exec_command(conn, "mktemp -d /tmp/ansible.XXXXXX") return result.split("\n")[0] + '/' - def _copy_module(self, conn, tmp): + def _copy_module(self, conn, tmp, module): ''' transfer a module over SFTP, does not run it ''' in_path = os.path.expanduser( - os.path.join(self.module_path, self.module_name) + os.path.join(self.module_path, module) ) - out_path = tmp + self.module_name + out_path = tmp + module sftp = conn.open_sftp() sftp.put(in_path, out_path) sftp.close() @@ -359,7 +381,6 @@ class Runner(object): for worker in workers: worker.join() except KeyboardInterrupt: - print 'parent received ctrl-c' for worker in workers: worker.terminate() worker.join() diff --git a/library/async_wrapper b/library/async_wrapper new file mode 100755 index 0000000000..523011698b --- /dev/null +++ b/library/async_wrapper @@ -0,0 +1,96 @@ +#!/usr/bin/python + +# (c) 2012, Michael DeHaan , and others +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . +# + +try: + import json +except ImportError: + import simplejson as json +import shlex +import os +import subprocess +import sys +import datetime +import traceback + +if len(sys.argv) < 3: + print json.dumps({ + "failed" : True, + "msg" : "usage: async_wrapper . Humans, do not call directly!" + }) + sys.exit(1) + +jid = sys.argv[1] +wrapped_module = sys.argv[2] +args = sys.argv[3:] + +cmd = "%s %s" % (wrapped_module, " ".join(args)) + +# setup logging directory +logdir = os.path.expanduser("~/.ansible_async") +log_path = os.path.join(logdir, jid) + +if not os.path.exists(logdir): + try: + os.makedirs(logdir) + except: + print json.dumps({ + "failed" : 1, + "msg" : "could not create: %s" % logdir + }) + +def _run_command(wrapped_cmd, jid, log_path): + + logfile = open(log_path, "w+") + logfile.write(json.dumps({ "started" : 1, "ansible_job_id" : jid })) + result = {} + + try: + cmd = shlex.split(wrapped_cmd) + subprocess.call("/usr/bin/logger %s" % wrapped_cmd, shell=True) + script = subprocess.Popen(cmd, shell=False, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = script.communicate() + result = json.loads(out) + + except (OSError, IOError), e: + result = { + "failed": 1, + "msg": str(e), + } + except: + result = { + "failed" : 1, + "msg" : traceback.format_exc() + } + + result['ansible_job_id'] = jid + logfile = open(log_path, "w+") + logfile.write(json.dumps(result)) + logfile.close() + + # TEMPORARY: + print json.dumps(result) + +# TODO: daemonize this with time limits +# TODO: might be nice to keep timing data, eventually... + +_run_command(cmd, jid, log_path) + + diff --git a/library/copy b/library/copy old mode 100644 new mode 100755 diff --git a/library/ping b/library/ping old mode 100644 new mode 100755 index 4526406165..e5b068c8c0 --- a/library/ping +++ b/library/ping @@ -22,4 +22,4 @@ try: except ImportError: import simplejson as json -print json.dumps(1) +print json.dumps({ "ping" : "pong" })