1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

misc cleanup in the runner module, splitting some things out into utils, breaking up functions into smaller functions.

This commit is contained in:
Michael DeHaan 2012-03-21 23:39:09 -04:00
parent fef73393f0
commit 6a7aac38c5
3 changed files with 259 additions and 337 deletions

View file

@ -295,7 +295,7 @@ class PlayBook(object):
remote_user=remote_user, remote_user=remote_user,
setup_cache=SETUP_CACHE, setup_cache=SETUP_CACHE,
basedir=self.basedir, basedir=self.basedir,
conditionally_execute_if=only_if conditional=only_if
) )
if async_seconds == 0: if async_seconds == 0:

View file

@ -33,9 +33,6 @@ import ansible.connection
from ansible import utils from ansible import utils
from ansible import errors from ansible import errors
# should be True except in debug
CLEANUP_FILES = True
################################################ ################################################
def _executor_hook(job_queue, result_queue): def _executor_hook(job_queue, result_queue):
@ -62,43 +59,23 @@ class Runner(object):
_external_variable_script = None _external_variable_script = None
def __init__(self, def __init__(self, host_list=C.DEFAULT_HOST_LIST, module_path=C.DEFAULT_MODULE_PATH,
host_list=C.DEFAULT_HOST_LIST, module_name=C.DEFAULT_MODULE_NAME, module_args=C.DEFAULT_MODULE_ARGS,
module_path=C.DEFAULT_MODULE_PATH, forks=C.DEFAULT_FORKS, timeout=C.DEFAULT_TIMEOUT, pattern=C.DEFAULT_PATTERN,
module_name=C.DEFAULT_MODULE_NAME, remote_user=C.DEFAULT_REMOTE_USER, remote_pass=C.DEFAULT_REMOTE_PASS,
module_args=C.DEFAULT_MODULE_ARGS, background=0, basedir=None, setup_cache=None, transport='paramiko',
forks=C.DEFAULT_FORKS, conditional='True', verbose=False):
timeout=C.DEFAULT_TIMEOUT,
pattern=C.DEFAULT_PATTERN,
remote_user=C.DEFAULT_REMOTE_USER,
remote_pass=C.DEFAULT_REMOTE_PASS,
background=0,
basedir=None,
setup_cache=None,
transport='paramiko',
conditionally_execute_if='True',
verbose=False):
'''
Constructor
host_list -- file on disk listing hosts to manage, or an array of hostnames
pattern ------ a fnmatch pattern selecting some of the hosts in host_list
module_path -- location of ansible library on disk
module_name -- which module to run
module_args -- arguments to pass to module
forks -------- how parallel should we be? 1 is extra debuggable.
remote_user -- who to login as (default root)
remote_pass -- provide only if you don't want to use keys or ssh-agent
background --- if non 0, run async, failing after X seconds, -1 == infinite
setup_cache -- used only by playbook (complex explanation pending)
'''
if setup_cache is None: if setup_cache is None:
setup_cache = {} setup_cache = {}
self.setup_cache = setup_cache if basedir is None:
self.conditionally_execute_if = conditionally_execute_if basedir = os.getcwd()
self.generated_jid = str(random.randint(0, 999999999999))
self.connector = ansible.connection.Connection(self, transport)
self.host_list, self.groups = self.parse_hosts(host_list) self.host_list, self.groups = self.parse_hosts(host_list)
self.setup_cache = setup_cache
self.conditional = conditional
self.module_path = module_path self.module_path = module_path
self.module_name = module_name self.module_name = module_name
self.forks = int(forks) self.forks = int(forks)
@ -108,74 +85,73 @@ class Runner(object):
self.verbose = verbose self.verbose = verbose
self.remote_user = remote_user self.remote_user = remote_user
self.remote_pass = remote_pass self.remote_pass = remote_pass
self.background = background self.background = background
if basedir is None:
basedir = os.getcwd()
self.basedir = basedir self.basedir = basedir
# hosts in each group name in the inventory file
self._tmp_paths = {} self._tmp_paths = {}
random.seed() random.seed()
self.generated_jid = str(random.randint(0, 999999999999))
self.connector = ansible.connection.Connection(self, transport)
# *****************************************************
@classmethod
def parse_hosts_from_regular_file(cls, host_list, results, groups):
''' parse a textual host file '''
lines = file(host_list).read().split("\n")
group_name = 'ungrouped'
for item in lines:
item = item.lstrip().rstrip()
if item.startswith("#"):
# ignore commented out lines
continue
if item.startswith("["):
# looks like a group
group_name = item.replace("[","").replace("]","").lstrip().rstrip()
groups[group_name] = []
elif item != "":
# looks like a regular host
groups[group_name].append(item)
results.append(item)
# *****************************************************
@classmethod
def parse_hosts_from_script(cls, host_list, results, groups):
''' evaluate a script that returns list of hosts by groups '''
host_list = os.path.abspath(host_list)
cls._external_variable_script = host_list
cmd = subprocess.Popen([host_list], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
out, err = cmd.communicate()
try:
groups = utils.json_loads(out)
except:
raise errors.AnsibleError("invalid JSON response from script: %s" % host_list)
for (groupname, hostlist) in groups.iteritems():
for host in hostlist:
if host not in results:
results.append(host)
# ***************************************************** # *****************************************************
@classmethod @classmethod
def parse_hosts(cls, host_list): def parse_hosts(cls, host_list):
''' ''' parse the host inventory file, returns (hosts, groups) '''
parse the host inventory file, returns (hosts, groups)
[groupname]
host1
host2
'''
if type(host_list) == list: if type(host_list) == list:
return (host_list, {}) return (host_list, {})
host_list = os.path.expanduser(host_list) host_list = os.path.expanduser(host_list)
if not os.path.exists(host_list): if not os.path.exists(host_list):
raise errors.AnsibleFileNotFound("inventory file not found: %s" % host_list) raise errors.AnsibleFileNotFound("inventory file not found: %s" % host_list)
results = [] results = []
groups = { 'ungrouped' : [] } groups = dict(ungrouped=[])
if not os.access(host_list, os.X_OK): if not os.access(host_list, os.X_OK):
# it's a regular file Runner.parse_hosts_from_regular_file(host_list, results, groups)
lines = file(host_list).read().split("\n")
group_name = 'ungrouped'
results = []
for item in lines:
item = item.lstrip().rstrip()
if item.startswith("#"):
# ignore commented out lines
continue
if item.startswith("["):
# looks like a group
group_name = item.replace("[","").replace("]","").lstrip().rstrip()
groups[group_name] = []
elif item != "":
# looks like a regular host
groups[group_name].append(item)
results.append(item)
else: else:
host_list = os.path.abspath(host_list) Runner.parse_hosts_from_script(host_list, results, groups)
cls._external_variable_script = host_list
# it's a script -- expect a return of a JSON hash with group names keyed
# to lists of hosts
cmd = subprocess.Popen([host_list], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
out, err = cmd.communicate()
try:
groups = utils.json_loads(out)
except:
raise errors.AnsibleError("invalid JSON response from script: %s" % host_list)
for (groupname, hostlist) in groups.iteritems():
for host in hostlist:
if host not in results:
results.append(host)
return (results, groups) return (results, groups)
# ***************************************************** # *****************************************************
@ -192,13 +168,9 @@ class Runner(object):
pattern = pattern.replace(";",":") pattern = pattern.replace(";",":")
subpatterns = pattern.split(":") subpatterns = pattern.split(":")
for subpattern in subpatterns: for subpattern in subpatterns:
# the pattern could be a real glob if subpattern == 'all' or fnmatch.fnmatch(host_name, subpattern):
if subpattern == 'all':
return True return True
if fnmatch.fnmatch(host_name, subpattern): elif subpattern in self.groups:
return True
# or it could be a literal group name instead
if subpattern in self.groups:
if host_name in self.groups[subpattern]: if host_name in self.groups[subpattern]:
return True return True
return False return False
@ -206,11 +178,7 @@ class Runner(object):
# ***************************************************** # *****************************************************
def _connect(self, host): def _connect(self, host):
''' ''' connects to a host, returns (is_successful, connection_object OR traceback_string) '''
obtains a connection to the host.
on success, returns (True, connection)
on failure, returns (False, traceback str)
'''
try: try:
return [ True, self.connector.connect(host) ] return [ True, self.connector.connect(host) ]
@ -223,14 +191,11 @@ class Runner(object):
''' helper function to handle JSON parsing of results ''' ''' helper function to handle JSON parsing of results '''
try: try:
# try to parse the JSON response
result = utils.parse_json(result) result = utils.parse_json(result)
if executed is not None: if executed is not None:
result['invocation'] = executed result['invocation'] = executed
return [ host, True, result ] return [ host, True, result ]
except Exception, e: except Exception, e:
# it failed to parse, say so, but return the string anyway so
# it can be debugged
return [ host, False, "%s/%s/%s" % (str(e), result, executed) ] return [ host, False, "%s/%s/%s" % (str(e), result, executed) ]
# ***************************************************** # *****************************************************
@ -243,8 +208,7 @@ class Runner(object):
for filename in files: for filename in files:
if not filename.startswith('/tmp/'): if not filename.startswith('/tmp/'):
raise Exception("not going to happen") raise Exception("not going to happen")
if CLEANUP_FILES: self._exec_command(conn, "rm -rf %s" % filename)
self._exec_command(conn, "rm -rf %s" % filename)
# ***************************************************** # *****************************************************
@ -256,10 +220,7 @@ class Runner(object):
# ***************************************************** # *****************************************************
def _transfer_module(self, conn, tmp, module): def _transfer_module(self, conn, tmp, module):
''' ''' transfers a module file to the remote side to execute it, but does not execute it yet '''
transfers a module file to the remote side to execute it,
but does not execute it yet
'''
outpath = self._copy_module(conn, tmp, module) outpath = self._copy_module(conn, tmp, module)
self._exec_command(conn, "chmod +x %s" % outpath) self._exec_command(conn, "chmod +x %s" % outpath)
@ -268,93 +229,97 @@ class Runner(object):
# ***************************************************** # *****************************************************
def _transfer_argsfile(self, conn, tmp, args_str): def _transfer_argsfile(self, conn, tmp, args_str):
''' ''' transfer arguments as a single file to be fed to the module. '''
transfer arguments as a single file to be fed to the module.
this is to avoid various shell things being eaten by SSH
'''
args_fd, args_file = tempfile.mkstemp() args_fd, args_file = tempfile.mkstemp()
args_fo = os.fdopen(args_fd, 'w') args_fo = os.fdopen(args_fd, 'w')
args_fo.write(args_str) args_fo.write(args_str)
args_fo.flush() args_fo.flush()
args_fo.close() args_fo.close()
args_remote = os.path.join(tmp, 'arguments') args_remote = os.path.join(tmp, 'arguments')
self._transfer_file(conn, args_file, args_remote) self._transfer_file(conn, args_file, args_remote)
if CLEANUP_FILES: os.unlink(args_file)
os.unlink(args_file)
return args_remote return args_remote
# ***************************************************** # *****************************************************
def _execute_module(self, conn, tmp, remote_module_path, module_args, def _add_variables_from_script(self, conn, inject):
async_jid=None, async_module=None, async_limit=None): ''' support per system variabes from external variable scripts, see web docs '''
'''
runs a module that has already been transferred, but first
modifies the command using setup_cache variables (see playbook)
'''
host = conn.host
args = module_args cmd = subprocess.Popen([Runner._external_variable_script, host],
if type(args) == list: stdout=subprocess.PIPE,
if remote_module_path.endswith('setup'): stderr=subprocess.PIPE,
# briefly converting arguments to strings before file transfer shell=False
# causes some translation errors. This is a workaround only
# needed for the setup module
args = " ".join([ "\"%s\"" % str(x) for x in module_args ])
else:
args = " ".join([ str(x) for x in module_args ])
# by default the args to substitute in the action line are those from the setup cache
inject_vars = self.setup_cache.get(conn.host,{})
# see if we really need to run this or not...
# doubly templated so we can store a conditional expression in a variable!
conditional = utils.template(
utils.template(self.conditionally_execute_if, inject_vars),
inject_vars
) )
out, err = cmd.communicate()
if not eval(conditional):
return [ utils.smjson(dict(skipped=True)), 'skipped' ]
# if the host file was an external script, execute it with the hostname
# as a first parameter to get the variables to use for the host
inject2 = {} inject2 = {}
if Runner._external_variable_script is not None: try:
host = conn.host inject2 = utils.json_loads(out)
cmd = subprocess.Popen([Runner._external_variable_script, host], except:
stdout=subprocess.PIPE, raise errors.AnsibleError("%s returned invalid result when called with hostname %s" % (
stderr=subprocess.PIPE, Runner._external_variable_script,
shell=False host
) ))
out, err = cmd.communicate() # store injected variables in the templates
inject2 = {} inject.update(inject2)
try:
inject2 = utils.json_loads(out)
except:
raise errors.AnsibleError("%s returned invalid result when called with hostname %s" % (
Runner._external_variable_script,
host
))
# store injected variables in the templates
inject_vars.update(inject2)
if self.module_name == 'setup':
for (k,v) in inject_vars.iteritems():
if not k.startswith('facter_') and not k.startswith('ohai_'):
if str(v).find(" ") != -1:
v = "\"%s\"" % v
args += " %s=%s" % (k, str(v).replace(" ","~~~"))
# the metadata location for the setup module is transparently managed # *****************************************************
# since it's an 'internals' module, kind of a black box. See playbook
# other modules are not allowed to have this kind of handling def _add_setup_vars(self, inject, args):
if remote_module_path.endswith("/setup") and args.find("metadata=") == -1: ''' setup module variables need special handling '''
for (k,v) in inject.iteritems():
if not k.startswith('facter_') and not k.startswith('ohai_'):
if str(v).find(" ") != -1:
v = "\"%s\"" % v
args += " %s=%s" % (k, str(v).replace(" ","~~~"))
# *****************************************************
def _add_setup_metadata(self, args):
''' automatically determine where to store variables for the setup module '''
if args.find("metadata=") == -1:
if self.remote_user == 'root': if self.remote_user == 'root':
args = "%s metadata=/etc/ansible/setup" % args args = "%s metadata=/etc/ansible/setup" % args
else: else:
args = "%s metadata=~/.ansible/setup" % args args = "%s metadata=~/.ansible/setup" % args
# *****************************************************
args = utils.template(args, inject_vars) def _coerce_args_to_string(self, args, remote_module_path):
''' final arguments must always be made a string '''
if type(args) == list:
if remote_module_path.endswith('setup'):
# quote long strings so setup module gets them unscathed
args = " ".join([ "\"%s\"" % str(x) for x in args ])
else:
args = " ".join([ str(x) for x in args ])
return args
# *****************************************************
def _execute_module(self, conn, tmp, remote_module_path, args,
async_jid=None, async_module=None, async_limit=None):
''' runs a module that has already been transferred '''
args = self._coerce_args_to_string(args, remote_module_path)
inject = self.setup_cache.get(conn.host,{})
conditional = utils.double_template(self.conditional, inject)
if not eval(conditional):
return [ utils.smjson(dict(skipped=True)), 'skipped' ]
if Runner._external_variable_script is not None:
self._add_variables_from_script(conn, inject)
if self.module_name == 'setup':
self._add_setup_vars(inject, args)
self._add_setup_metadata(args)
args = utils.template(args, inject)
module_name_tail = remote_module_path.split("/")[-1] module_name_tail = remote_module_path.split("/")[-1]
client_executed_str = "%s %s" % (module_name_tail, args.strip()) client_executed_str = "%s %s" % (module_name_tail, args.strip())
@ -362,58 +327,51 @@ class Runner(object):
if async_jid is None: if async_jid is None:
cmd = "%s %s" % (remote_module_path, argsfile) cmd = "%s %s" % (remote_module_path, argsfile)
else: else:
args = [str(x) for x in [remote_module_path, async_jid, async_limit, async_module, argsfile]] cmd = " ".join([str(x) for x in [remote_module_path, async_jid, async_limit, async_module, argsfile]])
cmd = " ".join(args) return [ self._exec_command(conn, cmd), client_executed_str ]
result = self._exec_command(conn, cmd)
return [ result, client_executed_str ] # *****************************************************
def _add_result_to_setup_cache(self, conn, result):
''' allows discovered variables to be used in templates and action statements '''
host = conn.host
try:
var_result = utils.parse_json(result)
except:
var_result = {}
# note: do not allow variables from playbook to be stomped on
# by variables coming up from facter/ohai/etc. They
# should be prefixed anyway
if not host in self.setup_cache:
self.setup_cache[host] = {}
for (k, v) in var_result.iteritems():
if not k in self.setup_cache[host]:
self.setup_cache[host][k] = v
# ***************************************************** # *****************************************************
def _execute_normal_module(self, conn, host, tmp, module_name): def _execute_normal_module(self, conn, host, tmp, module_name):
''' ''' transfer & execute a module that is not 'copy' or 'template' '''
transfer & execute a module that is not 'copy' or 'template'
because those require extra work.
'''
# hack to make the 'shell' module keyword really be executed # shell and command are the same module
# by the command module
module_args = self.module_args
if module_name == 'shell': if module_name == 'shell':
module_name = 'command' module_name = 'command'
module_args.append("#USE_SHELL") self.module_args.append("#USE_SHELL")
module = self._transfer_module(conn, tmp, module_name) module = self._transfer_module(conn, tmp, module_name)
(result, executed) = self._execute_module(conn, tmp, module, module_args) (result, executed) = self._execute_module(conn, tmp, module, self.module_args)
# when running the setup module, which pushes vars to the host and ALSO
# returns them (+factoids), store the variables that were returned such that commands
# run AFTER setup use these variables for templating when executed
# from playbooks
if module_name == 'setup': if module_name == 'setup':
host = conn.host self._add_result_to_setup_cache(conn, result)
try:
var_result = utils.parse_json(result)
except:
var_result = {}
# note: do not allow variables from playbook to be stomped on
# by variables coming up from facter/ohai/etc. They
# should be prefixed anyway
if not host in self.setup_cache:
self.setup_cache[host] = {}
for (k, v) in var_result.iteritems():
if not k in self.setup_cache[host]:
self.setup_cache[host][k] = v
return self._return_from_module(conn, host, result, executed) return self._return_from_module(conn, host, result, executed)
# ***************************************************** # *****************************************************
def _execute_async_module(self, conn, host, tmp, module_name): def _execute_async_module(self, conn, host, tmp, module_name):
''' ''' transfer the given module name, plus the async module, then run it '''
transfer the given module name, plus the async module
and then run the async module wrapping the other module
'''
# hack to make the 'shell' module keyword really be executed # hack to make the 'shell' module keyword really be executed
# by the command module # by the command module
@ -434,30 +392,16 @@ class Runner(object):
# ***************************************************** # *****************************************************
def _parse_kv(self, args):
# FIXME: move to utils
''' helper function to convert a string of key/value items to a dict '''
options = {}
for x in args:
if x.find("=") != -1:
k, v = x.split("=")
options[k]=v
return options
# *****************************************************
def _execute_copy(self, conn, host, tmp): def _execute_copy(self, conn, host, tmp):
''' handler for file transfer operations ''' ''' handler for file transfer operations '''
# load up options # load up options
options = self._parse_kv(self.module_args) options = utils.parse_kv(self.module_args)
source = options['src'] source = options['src']
dest = options['dest'] dest = options['dest']
# transfer the file to a remote tmp location # transfer the file to a remote tmp location
tmp_path = tmp tmp_src = tmp + source.split('/')[-1]
tmp_src = tmp_path + source.split('/')[-1]
self._transfer_file(conn, utils.path_dwim(self.basedir, source), tmp_src) self._transfer_file(conn, utils.path_dwim(self.basedir, source), tmp_src)
# install the copy module # install the copy module
@ -467,34 +411,37 @@ class Runner(object):
# run the copy module # run the copy module
args = [ "src=%s" % tmp_src, "dest=%s" % dest ] args = [ "src=%s" % tmp_src, "dest=%s" % dest ]
(result1, executed) = self._execute_module(conn, tmp, module, args) (result1, executed) = self._execute_module(conn, tmp, module, args)
results1 = self._return_from_module(conn, host, result1, executed) (host, ok, data) = self._return_from_module(conn, host, result1, executed)
(host, ok, data) = results1
# magically chain into the file module
if ok: if ok:
# unless failed, run the file module to adjust file aspects return self._chain_file_module(conn, tmp, data, options, executed)
old_changed = data.get('changed', False)
module = self._transfer_module(conn, tmp, 'file')
args = [ "%s=%s" % (k,v) for (k,v) in options.items() ]
(result2, executed2) = self._execute_module(conn, tmp, module, args)
results2 = self._return_from_module(conn, host, result2, executed)
(host, ok, data2) = results2
new_changed = data2.get('changed', False)
data.update(data2)
if old_changed or new_changed:
data['changed'] = True
return (host, ok, data)
else: else:
# copy failed, return orig result without going through 'file' module
return results1 return results1
# ***************************************************** # *****************************************************
def _chain_file_module(self, conn, tmp, data, options, executed):
''' handles changing file attribs after copy/template operations '''
old_changed = data.get('changed', False)
module = self._transfer_module(conn, tmp, 'file')
args = [ "%s=%s" % (k,v) for (k,v) in options.items() ]
(result2, executed2) = self._execute_module(conn, tmp, module, args)
results2 = self._return_from_module(conn, conn.host, result2, executed)
(host, ok, data2) = results2
new_changed = data2.get('changed', False)
data.update(data2)
if old_changed or new_changed:
data['changed'] = True
return (host, ok, data)
# *****************************************************
def _execute_template(self, conn, host, tmp): def _execute_template(self, conn, host, tmp):
''' handler for template operations ''' ''' handler for template operations '''
# load up options # load up options
options = self._parse_kv(self.module_args) options = utils.parse_kv(self.module_args)
source = options['src'] source = options['src']
dest = options['dest'] dest = options['dest']
metadata = options.get('metadata', None) metadata = options.get('metadata', None)
@ -506,9 +453,7 @@ class Runner(object):
metadata = '~/.ansible/setup' metadata = '~/.ansible/setup'
# first copy the source template over # first copy the source template over
tpath = tmp temppath = tmp + os.path.split(source)[-1]
tempname = os.path.split(source)[-1]
temppath = tpath + tempname
self._transfer_file(conn, utils.path_dwim(self.basedir, source), temppath) self._transfer_file(conn, utils.path_dwim(self.basedir, source), temppath)
# install the template module # install the template module
@ -517,41 +462,17 @@ class Runner(object):
# run the template module # run the template module
args = [ "src=%s" % temppath, "dest=%s" % dest, "metadata=%s" % metadata ] args = [ "src=%s" % temppath, "dest=%s" % dest, "metadata=%s" % metadata ]
(result1, executed) = self._execute_module(conn, tmp, template_module, args) (result1, executed) = self._execute_module(conn, tmp, template_module, args)
results1 = self._return_from_module(conn, host, result1, executed) (host, ok, data) = self._return_from_module(conn, host, result1, executed)
(host, ok, data) = results1
# magically chain into the file module
if ok: if ok:
# unless failed, run the file module to adjust file aspects return self._chain_file_module(conn, tmp, data, options, executed)
old_changed = data.get('changed', False)
module = self._transfer_module(conn, tmp, 'file')
args = [ "%s=%s" % (k,v) for (k,v) in options.items() ]
(result2, executed2) = self._execute_module(conn, tmp, module, args)
results2 = self._return_from_module(conn, host, result2, executed)
(host, ok, data2) = results2
new_changed = data2.get('changed', False)
data.update(data2)
if old_changed or new_changed:
data['changed'] = True
return (host, ok, data)
else: else:
# copy failed, return orig result without going through 'file' module
return results1 return results1
# ***************************************************** # *****************************************************
def _executor(self, host): def _executor(self, host):
''' ''' callback executed in parallel for each host. returns (hostname, connected_ok, extra) '''
callback executed in parallel for each host.
returns (hostname, connected_ok, extra)
where extra is the result of a successful connect
or a traceback string
'''
# depending on whether it's a normal module,
# or a request to use the copy or template
# module, call the appropriate executor function
ok, conn = self._connect(host) ok, conn = self._connect(host)
if not ok: if not ok:
@ -562,24 +483,18 @@ class Runner(object):
tmp = self._get_tmp_path(conn) tmp = self._get_tmp_path(conn)
result = None result = None
if self.module_name not in [ 'copy', 'template' ]: if self.module_name == 'copy':
result = self._execute_copy(conn, host, tmp)
elif self.module_name == 'template':
result = self._execute_template(conn, host, tmp)
else:
if self.background == 0: if self.background == 0:
result = self._execute_normal_module(conn, host, tmp, module_name) result = self._execute_normal_module(conn, host, tmp, module_name)
else: else:
result = self._execute_async_module(conn, host, tmp, module_name) result = self._execute_async_module(conn, host, tmp, module_name)
elif self.module_name == 'copy':
result = self._execute_copy(conn, host, tmp)
elif self.module_name == 'template':
result = self._execute_template(conn, host, tmp)
else:
# this would be a coding error in THIS module
# shouldn't occur
raise Exception("???")
self._delete_remote_files(conn, tmp) self._delete_remote_files(conn, tmp)
conn.close() conn.close()
return result return result
# ***************************************************** # *****************************************************
@ -619,42 +534,28 @@ class Runner(object):
# ***************************************************** # *****************************************************
def match_hosts(self, pattern): def _match_hosts(self, pattern):
''' return all matched hosts fitting a pattern ''' ''' return all matched hosts fitting a pattern '''
return [ h for h in self.host_list if self._matches(h, pattern) ] return [ h for h in self.host_list if self._matches(h, pattern) ]
# ***************************************************** # *****************************************************
def run(self): def _parallel_exec(self, hosts):
''' xfer & run module on all matched hosts ''' ''' handles mulitprocessing when more than 1 fork is required '''
# find hosts that match the pattern
hosts = self.match_hosts(self.pattern)
if len(hosts) == 0:
return {
'contacted' : {},
'dark' : {}
}
# attack pool of hosts in N forks job_queue = multiprocessing.Manager().Queue()
# _executor_hook does all of the work result_queue = multiprocessing.Manager().Queue()
hosts = [ (self,x) for x in hosts ]
[job_queue.put(i) for i in hosts]
workers = []
for i in range(self.forks):
prc = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue))
prc.start()
workers.append(prc)
if self.forks > 1:
job_queue = multiprocessing.Manager().Queue()
result_queue = multiprocessing.Manager().Queue()
for i in hosts:
job_queue.put(i)
workers = []
for i in range(self.forks):
tmp = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue))
tmp.start()
workers.append(tmp)
try: try:
for worker in workers: for worker in workers:
worker.join() worker.join()
@ -662,38 +563,48 @@ class Runner(object):
for worker in workers: for worker in workers:
worker.terminate() worker.terminate()
worker.join() worker.join()
results = []
while not result_queue.empty():
results.append(result_queue.get(block=False))
else:
results = [ x._executor(h) for (x,h) in hosts ]
# sort hosts by ones we successfully contacted results = []
# and ones we did not so that we can return a while not result_queue.empty():
# dictionary containing results of everything results.append(result_queue.get(block=False))
return results
results2 = { # *****************************************************
"contacted" : {},
"dark" : {} def _partition_results(self, results):
} ''' seperate results by ones we contacted & ones we didn't '''
hosts_with_results = []
for x in results: results2 = dict(contacted={}, dark={})
(host, is_ok, result) = x
hosts_with_results.append(host) for result in results:
if not is_ok: (host, contacted_ok, result) = result
results2["dark"][host] = result if contacted_ok:
else:
results2["contacted"][host] = result results2["contacted"][host] = result
# hosts which were contacted but never got a chance else:
# to return a result before we exited/ctrl-c'd results2["dark"][host] = result
# perhaps these shouldn't be 'dark' but I'm not sure if they fit
# anywhere else. # hosts which were contacted but never got a chance to return
for host in self.match_hosts(self.pattern): for host in self._match_hosts(self.pattern):
if host not in hosts_with_results: if not (host in results2['dark'] or host in results2['contacted']):
results2["dark"][host] = {} results2["dark"][host] = {}
return results2 return results2
# *****************************************************
def run(self):
''' xfer & run module on all matched hosts '''
# find hosts that match the pattern
hosts = self._match_hosts(self.pattern)
if len(hosts) == 0:
return dict(contacted={}, dark={})
hosts = [ (self,x) for x in hosts ]
if self.forks > 1:
results = self._parallel_exec(hosts, results)
else:
results = [ x._executor(h) for (x,h) in hosts ]
return self._partition_results(results)

View file

@ -272,6 +272,9 @@ def template(text, vars):
template = jinja2.Template(text) template = jinja2.Template(text)
return template.render(vars) return template.render(vars)
def double_template(text, vars):
return template(template(text, vars), vars)
def template_from_file(path, vars): def template_from_file(path, vars):
''' run a file through the templating engine ''' ''' run a file through the templating engine '''
data = file(path).read() data = file(path).read()
@ -287,4 +290,12 @@ def parse_yaml_from_file(path):
raise errors.AnsibleError("file not found: %s" % path) raise errors.AnsibleError("file not found: %s" % path)
return parse_yaml(data) return parse_yaml(data)
def parse_kv(args):
''' convert a string of key/value items to a dict '''
options = {}
for x in args:
if x.find("=") != -1:
k, v = x.split("=")
options[k]=v
return options