From 029fe1273cd33e06591de7086b8db6df8838f2e2 Mon Sep 17 00:00:00 2001 From: Jeroen Hoekx Date: Thu, 26 Apr 2012 20:35:19 +0200 Subject: [PATCH] Modify /usr/bin/ansible and playbooks to use the new async API. --- bin/ansible | 70 ++++++--------------------------- lib/ansible/playbook.py | 86 ++++++----------------------------------- lib/ansible/utils.py | 8 ---- test/TestPlayBook.py | 9 +++-- 4 files changed, 30 insertions(+), 143 deletions(-) diff --git a/bin/ansible b/bin/ansible index b18aeb2488..9175417b9f 100755 --- a/bin/ansible +++ b/bin/ansible @@ -21,7 +21,6 @@ import sys import getpass -import time import ansible.runner import ansible.constants as C @@ -40,7 +39,6 @@ class Cli(object): def __init__(self): self.stats = callbacks.AggregateStats() self.callbacks = callbacks.CliRunnerCallbacks() - self.silent_callbacks = callbacks.DefaultRunnerCallbacks() # ---------------------------------------------- @@ -85,8 +83,6 @@ class Cli(object): if options.tree: utils.prepare_writeable_dir(options.tree) - if options.seconds: - print "background launch...\n\n" runner = ansible.runner.Runner( module_name=options.module_name, module_path=options.module_path, @@ -94,72 +90,32 @@ class Cli(object): remote_user=options.remote_user, remote_pass=sshpass, inventory=inventory_manager, timeout=options.timeout, forks=options.forks, - background=options.seconds, pattern=pattern, + pattern=pattern, callbacks=self.callbacks, sudo=options.sudo, sudo_pass=sudopass, transport=options.connection, debug=options.debug ) - return (runner, runner.run()) + if options.seconds: + print "background launch...\n\n" + results, poller = runner.runAsync(options.seconds) + results = self.poll_while_needed(poller, options) + else: + results = runner.run() - # ---------------------------------------------- - - def get_polling_runner(self, old_runner, jid): - return ansible.runner.Runner( - module_name='async_status', module_path=old_runner.module_path, - module_args="jid=%s" % jid, remote_user=old_runner.remote_user, - remote_pass=old_runner.remote_pass, inventory=old_runner.inventory, - timeout=old_runner.timeout, forks=old_runner.forks, - pattern='*', callbacks=self.silent_callbacks, - ) - - # ---------------------------------------------- - - def hosts_to_poll(self, results): - hosts = [] - for (host, res) in results['contacted'].iteritems(): - if res.get('started',False): - hosts.append(host) - return hosts + return (runner, results) # ---------------------------------------------- - def poll_if_needed(self, runner, results, options, args): + def poll_while_needed(self, poller, options): ''' summarize results from Runner ''' - if results is None: - exit("No hosts matched") - # BACKGROUND POLL LOGIC when -B and -P are specified - # FIXME: refactor if options.seconds and options.poll_interval > 0: - poll_hosts = results['contacted'].keys() - if len(poll_hosts) == 0: - exit("no jobs were launched successfully") - ahost = poll_hosts[0] - jid = results['contacted'][ahost].get('ansible_job_id', None) - if jid is None: - exit("unexpected error: unable to determine jid") + poller.wait(options.seconds, options.poll_interval) + + return poller.results - clock = options.seconds - while (clock >= 0): - runner.inventory.restrict_to(poll_hosts) - polling_runner = self.get_polling_runner(runner, jid) - poll_results = polling_runner.run() - runner.inventory.lift_restriction() - if poll_results is None: - break - for (host, host_result) in poll_results['contacted'].iteritems(): - # override last result with current status result for report - results['contacted'][host] = host_result - print utils.async_poll_status(jid, host, clock, host_result) - for (host, host_result) in poll_results['dark'].iteritems(): - print "FAILED: %s => %s" % (host, host_result) - clock = clock - options.poll_interval - time.sleep(options.poll_interval) - poll_hosts = self.hosts_to_poll(poll_results) - if len(poll_hosts)==0: - break ######################################################## @@ -172,6 +128,4 @@ if __name__ == '__main__': # Generic handler for ansible specific errors print "ERROR: %s" % str(e) sys.exit(1) - else: - cli.poll_if_needed(runner, results, options, args) diff --git a/lib/ansible/playbook.py b/lib/ansible/playbook.py index 038e5e5cee..507db625f1 100644 --- a/lib/ansible/playbook.py +++ b/lib/ansible/playbook.py @@ -23,7 +23,6 @@ import ansible.constants as C from ansible import utils from ansible import errors import os -import time # used to transfer variables to Runner SETUP_CACHE={ } @@ -254,83 +253,17 @@ class PlayBook(object): # ***************************************************** - def hosts_to_poll(self, results): - ''' which hosts need more polling? ''' - - hosts = [] - for (host, res) in results['contacted'].iteritems(): - if (host in self.stats.failures) or (host in self.stats.dark): - continue - if not 'finished' in res and not 'skipped' in res and 'started' in res: - hosts.append(host) - return hosts - - # ***************************************************** - - def _async_poll(self, runner, hosts, async_seconds, async_poll_interval, only_if): + def _async_poll(self, poller, async_seconds, async_poll_interval): ''' launch an async job, if poll_interval is set, wait for completion ''' - runner.background = async_seconds - results = runner.run() - self.stats.compute(results, poll=True) - - if async_poll_interval <= 0: - # if not polling, playbook requested fire and forget - # trust the user wanted that and return immediately - return results - - poll_hosts = results['contacted'].keys() - if len(poll_hosts) == 0: - # no hosts launched ok, return that. - return results - ahost = poll_hosts[0] - - jid = results['contacted'][ahost].get('ansible_job_id', None) - - if jid is None: - # note: this really shouldn't happen, ever - self.callbacks.on_async_confused("unexpected error: unable to determine jid") - return results - - clock = async_seconds - host_list = self.hosts_to_poll(results) - - poll_results = results - while (clock >= 0): - - # poll/loop until polling duration complete - runner.module_args = "jid=%s" % jid - runner.module_name = 'async_status' - runner.background = 0 - runner.pattern = '*' - self.inventory.restrict_to(host_list) - poll_results = runner.run() - self.stats.compute(poll_results, poll=True) - host_list = self.hosts_to_poll(poll_results) - self.inventory.lift_restriction() - - if len(host_list) == 0: - break - if poll_results is None: - break - - # mention which hosts we're going to poll again... - for (host, host_result) in poll_results['contacted'].iteritems(): - results['contacted'][host] = host_result - if not host in self.stats.dark and not host in self.stats.failures: - self.callbacks.on_async_poll(jid, host, clock, host_result) - - # run down the clock - clock = clock - async_poll_interval - time.sleep(async_poll_interval) + results = poller.wait(async_seconds, async_poll_interval) # mark any hosts that are still listed as started as failed # since these likely got killed by async_wrapper - for (host, host_result) in poll_results['contacted'].iteritems(): - if 'started' in host_result: - reason = { 'failed' : 1, 'rc' : None, 'msg' : 'timed out' } - self.runner_callbacks.on_failed(host, reason) - results['contacted'][host] = reason + for host in poller.hosts_to_poll: + reason = { 'failed' : 1, 'rc' : None, 'msg' : 'timed out' } + self.runner_callbacks.on_failed(host, reason) + results['contacted'][host] = reason return results @@ -361,7 +294,12 @@ class PlayBook(object): if async_seconds == 0: results = runner.run() else: - results = self._async_poll(runner, hosts, async_seconds, async_poll_interval, only_if) + results, poller = runner.runAsync(async_seconds) + self.stats.compute(results) + if async_poll_interval > 0: + # if not polling, playbook requested fire and forget + # trust the user wanted that and return immediately + results = self._async_poll(poller, async_seconds, async_poll_interval) self.inventory.lift_restriction() return results diff --git a/lib/ansible/utils.py b/lib/ansible/utils.py index cc6269895c..a44f62d0e7 100644 --- a/lib/ansible/utils.py +++ b/lib/ansible/utils.py @@ -170,14 +170,6 @@ def path_dwim(basedir, given): else: return os.path.join(basedir, given) -def async_poll_status(jid, host, clock, result): - if 'finished' in result: - return " finished on %s" % (jid, host) - elif 'failed' in result: - return " FAILED on %s" % (jid, host) - else: - return " polling on %s, %s remaining" % (jid, host, clock) - def json_loads(data): return json.loads(data) diff --git a/test/TestPlayBook.py b/test/TestPlayBook.py index 05a486ca00..c8c2eea410 100644 --- a/test/TestPlayBook.py +++ b/test/TestPlayBook.py @@ -74,12 +74,15 @@ class TestCallbacks(object): def on_play_start(self, pattern): EVENTS.append([ 'play start', [ pattern ]]) - def on_async_confused(self, msg): - EVENTS.append([ 'async confused', [ msg ]]) + def on_async_ok(self, host, res, jid): + EVENTS.append([ 'async ok', [ host ]]) - def on_async_poll(self, jid, host, clock, host_result): + def on_async_poll(self, host, res, jid, clock): EVENTS.append([ 'async poll', [ host ]]) + def on_async_failed(self, host, res, jid): + EVENTS.append([ 'async failed', [ host ]]) + def on_unreachable(self, host, msg): EVENTS.append([ 'failed/dark', [ host, msg ]])