mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Modify /usr/bin/ansible and playbooks to use the new async API.
This commit is contained in:
parent
ce9a8c9ffc
commit
029fe1273c
4 changed files with 30 additions and 143 deletions
70
bin/ansible
70
bin/ansible
|
@ -21,7 +21,6 @@
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
import getpass
|
import getpass
|
||||||
import time
|
|
||||||
|
|
||||||
import ansible.runner
|
import ansible.runner
|
||||||
import ansible.constants as C
|
import ansible.constants as C
|
||||||
|
@ -40,7 +39,6 @@ class Cli(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.stats = callbacks.AggregateStats()
|
self.stats = callbacks.AggregateStats()
|
||||||
self.callbacks = callbacks.CliRunnerCallbacks()
|
self.callbacks = callbacks.CliRunnerCallbacks()
|
||||||
self.silent_callbacks = callbacks.DefaultRunnerCallbacks()
|
|
||||||
|
|
||||||
# ----------------------------------------------
|
# ----------------------------------------------
|
||||||
|
|
||||||
|
@ -85,8 +83,6 @@ class Cli(object):
|
||||||
|
|
||||||
if options.tree:
|
if options.tree:
|
||||||
utils.prepare_writeable_dir(options.tree)
|
utils.prepare_writeable_dir(options.tree)
|
||||||
if options.seconds:
|
|
||||||
print "background launch...\n\n"
|
|
||||||
|
|
||||||
runner = ansible.runner.Runner(
|
runner = ansible.runner.Runner(
|
||||||
module_name=options.module_name, module_path=options.module_path,
|
module_name=options.module_name, module_path=options.module_path,
|
||||||
|
@ -94,72 +90,32 @@ class Cli(object):
|
||||||
remote_user=options.remote_user, remote_pass=sshpass,
|
remote_user=options.remote_user, remote_pass=sshpass,
|
||||||
inventory=inventory_manager, timeout=options.timeout,
|
inventory=inventory_manager, timeout=options.timeout,
|
||||||
forks=options.forks,
|
forks=options.forks,
|
||||||
background=options.seconds, pattern=pattern,
|
pattern=pattern,
|
||||||
callbacks=self.callbacks, sudo=options.sudo,
|
callbacks=self.callbacks, sudo=options.sudo,
|
||||||
sudo_pass=sudopass,
|
sudo_pass=sudopass,
|
||||||
transport=options.connection, debug=options.debug
|
transport=options.connection, debug=options.debug
|
||||||
)
|
)
|
||||||
return (runner, runner.run())
|
|
||||||
|
|
||||||
|
if options.seconds:
|
||||||
|
print "background launch...\n\n"
|
||||||
|
results, poller = runner.runAsync(options.seconds)
|
||||||
|
results = self.poll_while_needed(poller, options)
|
||||||
|
else:
|
||||||
|
results = runner.run()
|
||||||
|
|
||||||
# ----------------------------------------------
|
return (runner, results)
|
||||||
|
|
||||||
def get_polling_runner(self, old_runner, jid):
|
|
||||||
return ansible.runner.Runner(
|
|
||||||
module_name='async_status', module_path=old_runner.module_path,
|
|
||||||
module_args="jid=%s" % jid, remote_user=old_runner.remote_user,
|
|
||||||
remote_pass=old_runner.remote_pass, inventory=old_runner.inventory,
|
|
||||||
timeout=old_runner.timeout, forks=old_runner.forks,
|
|
||||||
pattern='*', callbacks=self.silent_callbacks,
|
|
||||||
)
|
|
||||||
|
|
||||||
# ----------------------------------------------
|
|
||||||
|
|
||||||
def hosts_to_poll(self, results):
|
|
||||||
hosts = []
|
|
||||||
for (host, res) in results['contacted'].iteritems():
|
|
||||||
if res.get('started',False):
|
|
||||||
hosts.append(host)
|
|
||||||
return hosts
|
|
||||||
|
|
||||||
# ----------------------------------------------
|
# ----------------------------------------------
|
||||||
|
|
||||||
def poll_if_needed(self, runner, results, options, args):
|
def poll_while_needed(self, poller, options):
|
||||||
''' summarize results from Runner '''
|
''' summarize results from Runner '''
|
||||||
|
|
||||||
if results is None:
|
|
||||||
exit("No hosts matched")
|
|
||||||
|
|
||||||
# BACKGROUND POLL LOGIC when -B and -P are specified
|
# BACKGROUND POLL LOGIC when -B and -P are specified
|
||||||
# FIXME: refactor
|
|
||||||
if options.seconds and options.poll_interval > 0:
|
if options.seconds and options.poll_interval > 0:
|
||||||
poll_hosts = results['contacted'].keys()
|
poller.wait(options.seconds, options.poll_interval)
|
||||||
if len(poll_hosts) == 0:
|
|
||||||
exit("no jobs were launched successfully")
|
return poller.results
|
||||||
ahost = poll_hosts[0]
|
|
||||||
jid = results['contacted'][ahost].get('ansible_job_id', None)
|
|
||||||
if jid is None:
|
|
||||||
exit("unexpected error: unable to determine jid")
|
|
||||||
|
|
||||||
clock = options.seconds
|
|
||||||
while (clock >= 0):
|
|
||||||
runner.inventory.restrict_to(poll_hosts)
|
|
||||||
polling_runner = self.get_polling_runner(runner, jid)
|
|
||||||
poll_results = polling_runner.run()
|
|
||||||
runner.inventory.lift_restriction()
|
|
||||||
if poll_results is None:
|
|
||||||
break
|
|
||||||
for (host, host_result) in poll_results['contacted'].iteritems():
|
|
||||||
# override last result with current status result for report
|
|
||||||
results['contacted'][host] = host_result
|
|
||||||
print utils.async_poll_status(jid, host, clock, host_result)
|
|
||||||
for (host, host_result) in poll_results['dark'].iteritems():
|
|
||||||
print "FAILED: %s => %s" % (host, host_result)
|
|
||||||
clock = clock - options.poll_interval
|
|
||||||
time.sleep(options.poll_interval)
|
|
||||||
poll_hosts = self.hosts_to_poll(poll_results)
|
|
||||||
if len(poll_hosts)==0:
|
|
||||||
break
|
|
||||||
|
|
||||||
########################################################
|
########################################################
|
||||||
|
|
||||||
|
@ -172,6 +128,4 @@ if __name__ == '__main__':
|
||||||
# Generic handler for ansible specific errors
|
# Generic handler for ansible specific errors
|
||||||
print "ERROR: %s" % str(e)
|
print "ERROR: %s" % str(e)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
else:
|
|
||||||
cli.poll_if_needed(runner, results, options, args)
|
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,6 @@ import ansible.constants as C
|
||||||
from ansible import utils
|
from ansible import utils
|
||||||
from ansible import errors
|
from ansible import errors
|
||||||
import os
|
import os
|
||||||
import time
|
|
||||||
|
|
||||||
# used to transfer variables to Runner
|
# used to transfer variables to Runner
|
||||||
SETUP_CACHE={ }
|
SETUP_CACHE={ }
|
||||||
|
@ -254,83 +253,17 @@ class PlayBook(object):
|
||||||
|
|
||||||
# *****************************************************
|
# *****************************************************
|
||||||
|
|
||||||
def hosts_to_poll(self, results):
|
def _async_poll(self, poller, async_seconds, async_poll_interval):
|
||||||
''' which hosts need more polling? '''
|
|
||||||
|
|
||||||
hosts = []
|
|
||||||
for (host, res) in results['contacted'].iteritems():
|
|
||||||
if (host in self.stats.failures) or (host in self.stats.dark):
|
|
||||||
continue
|
|
||||||
if not 'finished' in res and not 'skipped' in res and 'started' in res:
|
|
||||||
hosts.append(host)
|
|
||||||
return hosts
|
|
||||||
|
|
||||||
# *****************************************************
|
|
||||||
|
|
||||||
def _async_poll(self, runner, hosts, async_seconds, async_poll_interval, only_if):
|
|
||||||
''' launch an async job, if poll_interval is set, wait for completion '''
|
''' launch an async job, if poll_interval is set, wait for completion '''
|
||||||
|
|
||||||
runner.background = async_seconds
|
results = poller.wait(async_seconds, async_poll_interval)
|
||||||
results = runner.run()
|
|
||||||
self.stats.compute(results, poll=True)
|
|
||||||
|
|
||||||
if async_poll_interval <= 0:
|
|
||||||
# if not polling, playbook requested fire and forget
|
|
||||||
# trust the user wanted that and return immediately
|
|
||||||
return results
|
|
||||||
|
|
||||||
poll_hosts = results['contacted'].keys()
|
|
||||||
if len(poll_hosts) == 0:
|
|
||||||
# no hosts launched ok, return that.
|
|
||||||
return results
|
|
||||||
ahost = poll_hosts[0]
|
|
||||||
|
|
||||||
jid = results['contacted'][ahost].get('ansible_job_id', None)
|
|
||||||
|
|
||||||
if jid is None:
|
|
||||||
# note: this really shouldn't happen, ever
|
|
||||||
self.callbacks.on_async_confused("unexpected error: unable to determine jid")
|
|
||||||
return results
|
|
||||||
|
|
||||||
clock = async_seconds
|
|
||||||
host_list = self.hosts_to_poll(results)
|
|
||||||
|
|
||||||
poll_results = results
|
|
||||||
while (clock >= 0):
|
|
||||||
|
|
||||||
# poll/loop until polling duration complete
|
|
||||||
runner.module_args = "jid=%s" % jid
|
|
||||||
runner.module_name = 'async_status'
|
|
||||||
runner.background = 0
|
|
||||||
runner.pattern = '*'
|
|
||||||
self.inventory.restrict_to(host_list)
|
|
||||||
poll_results = runner.run()
|
|
||||||
self.stats.compute(poll_results, poll=True)
|
|
||||||
host_list = self.hosts_to_poll(poll_results)
|
|
||||||
self.inventory.lift_restriction()
|
|
||||||
|
|
||||||
if len(host_list) == 0:
|
|
||||||
break
|
|
||||||
if poll_results is None:
|
|
||||||
break
|
|
||||||
|
|
||||||
# mention which hosts we're going to poll again...
|
|
||||||
for (host, host_result) in poll_results['contacted'].iteritems():
|
|
||||||
results['contacted'][host] = host_result
|
|
||||||
if not host in self.stats.dark and not host in self.stats.failures:
|
|
||||||
self.callbacks.on_async_poll(jid, host, clock, host_result)
|
|
||||||
|
|
||||||
# run down the clock
|
|
||||||
clock = clock - async_poll_interval
|
|
||||||
time.sleep(async_poll_interval)
|
|
||||||
|
|
||||||
# mark any hosts that are still listed as started as failed
|
# mark any hosts that are still listed as started as failed
|
||||||
# since these likely got killed by async_wrapper
|
# since these likely got killed by async_wrapper
|
||||||
for (host, host_result) in poll_results['contacted'].iteritems():
|
for host in poller.hosts_to_poll:
|
||||||
if 'started' in host_result:
|
reason = { 'failed' : 1, 'rc' : None, 'msg' : 'timed out' }
|
||||||
reason = { 'failed' : 1, 'rc' : None, 'msg' : 'timed out' }
|
self.runner_callbacks.on_failed(host, reason)
|
||||||
self.runner_callbacks.on_failed(host, reason)
|
results['contacted'][host] = reason
|
||||||
results['contacted'][host] = reason
|
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
@ -361,7 +294,12 @@ class PlayBook(object):
|
||||||
if async_seconds == 0:
|
if async_seconds == 0:
|
||||||
results = runner.run()
|
results = runner.run()
|
||||||
else:
|
else:
|
||||||
results = self._async_poll(runner, hosts, async_seconds, async_poll_interval, only_if)
|
results, poller = runner.runAsync(async_seconds)
|
||||||
|
self.stats.compute(results)
|
||||||
|
if async_poll_interval > 0:
|
||||||
|
# if not polling, playbook requested fire and forget
|
||||||
|
# trust the user wanted that and return immediately
|
||||||
|
results = self._async_poll(poller, async_seconds, async_poll_interval)
|
||||||
|
|
||||||
self.inventory.lift_restriction()
|
self.inventory.lift_restriction()
|
||||||
return results
|
return results
|
||||||
|
|
|
@ -170,14 +170,6 @@ def path_dwim(basedir, given):
|
||||||
else:
|
else:
|
||||||
return os.path.join(basedir, given)
|
return os.path.join(basedir, given)
|
||||||
|
|
||||||
def async_poll_status(jid, host, clock, result):
|
|
||||||
if 'finished' in result:
|
|
||||||
return "<job %s> finished on %s" % (jid, host)
|
|
||||||
elif 'failed' in result:
|
|
||||||
return "<job %s> FAILED on %s" % (jid, host)
|
|
||||||
else:
|
|
||||||
return "<job %s> polling on %s, %s remaining" % (jid, host, clock)
|
|
||||||
|
|
||||||
def json_loads(data):
|
def json_loads(data):
|
||||||
return json.loads(data)
|
return json.loads(data)
|
||||||
|
|
||||||
|
|
|
@ -74,12 +74,15 @@ class TestCallbacks(object):
|
||||||
def on_play_start(self, pattern):
|
def on_play_start(self, pattern):
|
||||||
EVENTS.append([ 'play start', [ pattern ]])
|
EVENTS.append([ 'play start', [ pattern ]])
|
||||||
|
|
||||||
def on_async_confused(self, msg):
|
def on_async_ok(self, host, res, jid):
|
||||||
EVENTS.append([ 'async confused', [ msg ]])
|
EVENTS.append([ 'async ok', [ host ]])
|
||||||
|
|
||||||
def on_async_poll(self, jid, host, clock, host_result):
|
def on_async_poll(self, host, res, jid, clock):
|
||||||
EVENTS.append([ 'async poll', [ host ]])
|
EVENTS.append([ 'async poll', [ host ]])
|
||||||
|
|
||||||
|
def on_async_failed(self, host, res, jid):
|
||||||
|
EVENTS.append([ 'async failed', [ host ]])
|
||||||
|
|
||||||
def on_unreachable(self, host, msg):
|
def on_unreachable(self, host, msg):
|
||||||
EVENTS.append([ 'failed/dark', [ host, msg ]])
|
EVENTS.append([ 'failed/dark', [ host, msg ]])
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue