From ce9a8c9ffc9e049d9362e1a5703a8b9097676308 Mon Sep 17 00:00:00 2001 From: Jeroen Hoekx Date: Thu, 26 Apr 2012 20:34:49 +0200 Subject: [PATCH] Introduce Async API in Runner. --- lib/ansible/callbacks.py | 44 +++++++++++++++----- lib/ansible/runner.py | 86 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 10 deletions(-) diff --git a/lib/ansible/callbacks.py b/lib/ansible/callbacks.py index 62c506c8d3..0f9764b294 100644 --- a/lib/ansible/callbacks.py +++ b/lib/ansible/callbacks.py @@ -50,7 +50,7 @@ class AggregateStats(object): elif 'skipped' in value and bool(value['skipped']): self._increment('skipped', host) elif 'changed' in value and bool(value['changed']): - if not setup: + if not setup and not poll: self._increment('changed', host) self._increment('ok', host) else: @@ -98,6 +98,15 @@ class DefaultRunnerCallbacks(object): def on_no_hosts(self): pass + def on_async_poll(self, host, res, jid, clock): + pass + + def on_async_ok(self, host, res, jid): + pass + + def on_async_failed(self, host, res, jid): + pass + ######################################################################## class CliRunnerCallbacks(DefaultRunnerCallbacks): @@ -108,10 +117,14 @@ class CliRunnerCallbacks(DefaultRunnerCallbacks): self.options = None def on_failed(self, host, res): - self._on_any(host,res) + invocation = res.get('invocation','') + if not invocation.startswith('async_status'): + self._on_any(host,res) def on_ok(self, host, res): - self._on_any(host,res) + invocation = res.get('invocation','') + if not invocation.startswith('async_status'): + self._on_any(host,res) def on_unreachable(self, host, res): print "%s | FAILED => %s" % (host, res) @@ -127,6 +140,15 @@ class CliRunnerCallbacks(DefaultRunnerCallbacks): def on_no_hosts(self): print >>sys.stderr, "no hosts matched\n" + def on_async_poll(self, host, res, jid, clock): + print " polling on %s, %s remaining"%(jid, host, clock) + + def on_async_ok(self, host, res, jid): + print " finished on %s => %s"%(jid, host, utils.bigjson(res)) + + def on_async_failed(self, host, res, jid): + print " FAILED on %s => %s"%(jid, host, utils.bigjson(res)) + def _on_any(self, host, result): print utils.host_report_msg(host, self.options.module_name, result, self.options.one_line) if self.options.tree: @@ -168,6 +190,15 @@ class PlaybookRunnerCallbacks(DefaultRunnerCallbacks): def on_no_hosts(self): print "no hosts matched or remaining\n" + def on_async_poll(self, host, res, jid, clock): + print " polling on %s, %s remaining"%(jid, host, clock) + + def on_async_ok(self, host, res, jid): + print " finished on %s"%(jid, host) + + def on_async_failed(self, host, res, jid): + print " FAILED on %s"%(jid, host) + ######################################################################## class PlaybookCallbacks(object): @@ -205,10 +236,3 @@ class PlaybookCallbacks(object): def on_play_start(self, pattern): print "PLAY [%s] ****************************\n" % pattern - - def on_async_confused(self, msg): - print msg - - def on_async_poll(self, jid, host, clock, host_result): - print utils.async_poll_status(jid, host, clock, host_result) - diff --git a/lib/ansible/runner.py b/lib/ansible/runner.py index 8e10d5b1df..dce62cc6bc 100644 --- a/lib/ansible/runner.py +++ b/lib/ansible/runner.py @@ -26,6 +26,7 @@ import Queue import random import traceback import tempfile +import time import base64 import getpass @@ -759,4 +760,89 @@ class Runner(object): results = [ self._executor(h[1]) for h in hosts ] return self._partition_results(results) + def runAsync(self, time_limit): + ''' Run this module asynchronously and return a poller. ''' + self.background = time_limit + results = self.run() + return results, AsyncPoller(results, self) + +class AsyncPoller(object): + """ Manage asynchronous jobs. """ + + def __init__(self, results, runner): + self.runner = runner + + self.results = { 'contacted': {}, 'dark': {}} + self.hosts_to_poll = [] + self.completed = False + + # Get job id and which hosts to poll again in the future + jid = None + for (host, res) in results['contacted'].iteritems(): + if res.get('started', False): + self.hosts_to_poll.append(host) + jid = res.get('ansible_job_id', None) + else: + self.results['contacted'][host] = res + for (host, res) in results['dark'].iteritems(): + self.results['dark'][host] = res + + if jid is None: + raise errors.AnsibleError("unexpected error: unable to determine jid") + if len(self.hosts_to_poll)==0: + raise errors.AnsibleErrot("unexpected error: no hosts to poll") + self.jid = jid + + def poll(self): + """ Poll the job status. + + Returns the changes in this iteration.""" + self.runner.module_name = 'async_status' + self.runner.module_args = "jid=%s" % self.jid + self.runner.pattern = "*" + self.runner.background = 0 + + self.runner.inventory.restrict_to(self.hosts_to_poll) + results = self.runner.run() + self.runner.inventory.lift_restriction() + + hosts = [] + poll_results = { 'contacted': {}, 'dark': {}, 'polled': {}} + for (host, res) in results['contacted'].iteritems(): + if res.get('started',False): + hosts.append(host) + poll_results['polled'][host] = res + else: + self.results['contacted'][host] = res + poll_results['contacted'][host] = res + if 'failed' in res: + self.runner.callbacks.on_async_failed(host, res, self.jid) + else: + self.runner.callbacks.on_async_ok(host, res, self.jid) + for (host, res) in results['dark'].iteritems(): + self.results['dark'][host] = res + poll_results['dark'][host] = res + self.runner.callbacks.on_async_failed(host, res, self.jid) + + self.hosts_to_poll = hosts + if len(hosts)==0: + self.completed = True + + return poll_results + + def wait(self, seconds, poll_interval): + """ Wait a certain time for job completion, check status every poll_interval. """ + clock = seconds - poll_interval + while (clock >= 0 and not self.completed): + time.sleep(poll_interval) + + poll_results = self.poll() + + for (host, res) in poll_results['polled'].iteritems(): + if res.get('started'): + self.runner.callbacks.on_async_poll(host, res, self.jid, clock) + + clock = clock - poll_interval + + return self.results