From 6dda6f12dc8145590c5a41e023ceeafae85580be Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Sun, 25 Mar 2012 19:05:27 -0400 Subject: [PATCH] Applying callback model to runner, and using that in playbooks, so output can be more immediate in playbooks. (Runner still does not use callbacks for default output) --- bin/ansible | 7 +- bin/ansible-playbook | 28 ++++-- lib/ansible/callbacks.py | 102 ++++++++++++++++---- lib/ansible/playbook.py | 200 ++++++++++++++------------------------- lib/ansible/runner.py | 51 +++++++--- library/async_status | 1 - test/TestPlayBook.py | 45 +++++---- test/playbook1.events | 66 +++++++++++-- 8 files changed, 309 insertions(+), 191 deletions(-) diff --git a/bin/ansible b/bin/ansible index 480112abd4..5b64c6e712 100755 --- a/bin/ansible +++ b/bin/ansible @@ -29,6 +29,7 @@ import ansible.runner import ansible.constants as C from ansible import utils from ansible import errors +from ansible import callbacks ######################################################## @@ -38,7 +39,8 @@ class Cli(object): # ---------------------------------------------- def __init__(self): - pass + self.stats = callbacks.AggregateStats() + self.callbacks = callbacks.DefaultRunnerCallbacks() # ---------------------------------------------- @@ -98,6 +100,7 @@ class Cli(object): forks=options.forks, background=options.seconds, pattern=pattern, + callbacks=self.callbacks, verbose=True, ) return (runner, runner.run()) @@ -116,6 +119,7 @@ class Cli(object): timeout=old_runner.timeout, forks=old_runner.forks, pattern='*', + callbacks=self.callbacks, verbose=True, ) @@ -178,6 +182,7 @@ class Cli(object): utils.write_tree_file(options.tree, hostname, utils.bigjson(utils.contacted_host_result(results, hostname))) buf += msg + # TODO: remove, callbacks now do this if utils.has_dark_hosts(results): buf += utils.dark_hosts_msg(results) diff --git a/bin/ansible-playbook b/bin/ansible-playbook index 6adb6f9c99..9e90c3e2f5 100755 --- a/bin/ansible-playbook +++ b/bin/ansible-playbook @@ -28,14 +28,6 @@ from ansible import errors from ansible import utils from ansible import callbacks -def summarize(results): - ''' print out per host statistics ''' - - print "PLAY RECAP ******************************\n" - hosts = sorted(results.keys()) - for host in hosts: - print "%s : %s" % (host, utils.smjson(results[host])) - def main(args): ''' run ansible-playbook operations ''' @@ -70,6 +62,11 @@ def main(args): # run all playbooks specified on the command line for playbook in args: + + stats = callbacks.AggregateStats() + playbook_cb = callbacks.PlaybookCallbacks() + runner_cb = callbacks.PlaybookRunnerCallbacks(stats) + pb = ansible.playbook.PlayBook( playbook=playbook, host_list=options.inventory, @@ -77,13 +74,24 @@ def main(args): forks=options.forks, verbose=True, remote_pass=sshpass, - callbacks=callbacks.PlaybookCallbacks(), + callbacks=playbook_cb, + runner_callbacks=runner_cb, + stats=stats, timeout=options.timeout, override_hosts=override_hosts, ) try: + results = pb.run() - summarize(results) + hosts = sorted(pb.stats.processed.keys()) + print "\n\nPLAY RECAP **********************\n\n" + for h in hosts: + t = pb.stats.summarize(h) + print "%-30s : ok=%4s changed=%4s unreachable=%4s failed=%4s " % (h, + t['ok'], t['changed'], t['unreachable'], t['failures'] + ) + print "\n" + except errors.AnsibleError, e: print >>sys.stderr, "ERROR: %s" % e return 1 diff --git a/lib/ansible/callbacks.py b/lib/ansible/callbacks.py index f33d966227..a332d74047 100755 --- a/lib/ansible/callbacks.py +++ b/lib/ansible/callbacks.py @@ -23,25 +23,70 @@ import utils ####################################################### -class PlaybookCallbacks(object): - +class AggregateStats(object): + + def __init__(self): + self.processed = {} + self.failures = {} + self.ok = {} + self.dark = {} + self.changed = {} + self.skipped = {} + + def _increment(self, what, host): + self.processed[host] = 1 + prev = (getattr(self, what)).get(host, 0) + getattr(self, what)[host] = prev+1 + + def compute(self, runner_results, setup=False, poll=False): + + for (host, value) in runner_results.get('contacted', {}).iteritems(): + if ('failed' in value and bool(value['failed'])) or ('rc' in value and value['rc'] != 0): + self._increment('failures', host) + elif 'skipped' in value and bool(value['skipped']): + self._increment('skipped', host) + elif 'changed' in value and bool(value['changed']): + if not setup: + self._increment('changed', host) + self._increment('ok', host) + else: + if not poll or ('finished' in value and bool(value['finished'])): + self._increment('ok', host) + + for (host, value) in runner_results.get('dark', {}).iteritems(): + self._increment('dark', host) + + + def summarize(self, host): + return dict( + ok = self.ok.get(host, 0), + failures = self.failures.get(host, 0), + unreachable = self.dark.get(host,0), + changed = self.changed.get(host, 0), + skipped = self.skipped.get(host, 0) + ) + +class DefaultRunnerCallbacks(object): + def __init__(self): pass - def set_playbook(self, playbook): - self.playbook = playbook + def on_failed(self, host, res): + pass - def on_start(self): - print "\n" + def on_ok(self, host, res): + pass - def on_task_start(self, name, is_conditional): - print utils.task_start_msg(name, is_conditional) + def on_skipped(self, host): + pass - def on_setup_primary(self): - print "SETUP PHASE ****************************\n" - - def on_setup_secondary(self): - print "\nVARIABLE IMPORT PHASE ******************\n" + def on_unreachable(self, host, res): + pass + +class PlaybookRunnerCallbacks(DefaultRunnerCallbacks): + + def __init__(self, stats): + self.stats = stats def on_unreachable(self, host, msg): print "unreachable: [%s] => %s" % (host, msg) @@ -55,7 +100,9 @@ class PlaybookCallbacks(object): def on_ok(self, host, host_result): invocation = host_result.get('invocation',None) - if not invocation or invocation.startswith('setup ') or invocation.startswith('async_status '): + if invocation.startswith('async_status'): + pass + elif not invocation or invocation.startswith('setup '): print "ok: [%s]\n" % (host) else: print "ok: [%s] => %s\n" % (host, invocation) @@ -63,6 +110,30 @@ class PlaybookCallbacks(object): def on_skipped(self, host): print "skipping: [%s]\n" % host +class PlaybookCallbacks(object): + + def __init__(self): + pass + + # TOOD: -- remove this + def set_playbook(self, playbook): + self.playbook = playbook + + def on_start(self): + print "\n" + + def on_notify(self, host, handler): + pass + + def on_task_start(self, name, is_conditional): + print utils.task_start_msg(name, is_conditional) + + def on_setup_primary(self): + print "SETUP PHASE ****************************\n" + + def on_setup_secondary(self): + print "\nVARIABLE IMPORT PHASE ******************\n" + def on_import_for_host(self, host, imported_file): print "%s: importing %s" % (host, imported_file) @@ -78,6 +149,3 @@ class PlaybookCallbacks(object): def on_async_poll(self, jid, host, clock, host_result): print utils.async_poll_status(jid, host, clock, host_result) - def on_dark_host(self, host, msg): - print "exception: [%s] => %s" % (host, msg) - diff --git a/lib/ansible/playbook.py b/lib/ansible/playbook.py index 8197ba6a9e..25808cb782 100755 --- a/lib/ansible/playbook.py +++ b/lib/ansible/playbook.py @@ -47,47 +47,41 @@ class PlayBook(object): # ***************************************************** def __init__(self, - playbook = None, - host_list = C.DEFAULT_HOST_LIST, - module_path = C.DEFAULT_MODULE_PATH, - forks = C.DEFAULT_FORKS, - timeout = C.DEFAULT_TIMEOUT, - remote_user = C.DEFAULT_REMOTE_USER, - remote_pass = C.DEFAULT_REMOTE_PASS, - override_hosts = None, - verbose = False, - callbacks = None): + playbook = None, + host_list = C.DEFAULT_HOST_LIST, + module_path = C.DEFAULT_MODULE_PATH, + forks = C.DEFAULT_FORKS, + timeout = C.DEFAULT_TIMEOUT, + remote_user = C.DEFAULT_REMOTE_USER, + remote_pass = C.DEFAULT_REMOTE_PASS, + override_hosts = None, + verbose = False, + callbacks = None, + runner_callbacks = None, + stats = None): + + if playbook is None or callbacks is None or runner_callbacks is None or stats is None: + raise Exception('missing required arguments') + + self.host_list = host_list + self.module_path = module_path + self.forks = forks + self.timeout = timeout + self.remote_user = remote_user + self.remote_pass = remote_pass + self.verbose = verbose + self.callbacks = callbacks + self.runner_callbacks = runner_callbacks + self.override_hosts = override_hosts + self.stats = stats - self.host_list = host_list - self.module_path = module_path - self.forks = forks - self.timeout = timeout - self.remote_user = remote_user - self.remote_pass = remote_pass - self.verbose = verbose - self.callbacks = callbacks - self.override_hosts = override_hosts self.callbacks.set_playbook(self) - - # store the list of changes/invocations/failure counts - # as a dictionary of integers keyed off the hostname - - self.dark = {} - self.changed = {} - self.invocations = {} - self.failures = {} - self.skipped = {} - self.processed = {} - - # playbook file can be passed in as a path or - # as file contents (to support API usage) - self.basedir = os.path.dirname(playbook) self.playbook = self._parse_playbook(playbook) self.host_list, self.groups = ansible.runner.Runner.parse_hosts( host_list, override_hosts=self.override_hosts) - + # ***************************************************** def _get_vars(self, play, dirname): @@ -172,75 +166,32 @@ class PlayBook(object): # summarize the results results = {} - for host in self.processed.keys(): - results[host] = dict( - resources = self.invocations.get(host, 0), - changed = self.changed.get(host, 0), - dark = self.dark.get(host, 0), - failed = self.failures.get(host, 0), - skipped = self.skipped.get(host, 0) - ) + for host in self.stats.processed.keys(): + results[host] = self.stats.summarize(host) return results # ***************************************************** - def _prune_failed_hosts(self, host_list): - ''' given a host list, use the global failure information to trim the list ''' - - new_hosts = [] - for x in host_list: - if not x in self.failures and not x in self.dark: - new_hosts.append(x) - return new_hosts - - # ***************************************************** - def hosts_to_poll(self, results): ''' which hosts need more polling? ''' hosts = [] for (host, res) in results['contacted'].iteritems(): + if (host in self.stats.failures) or (host in self.stats.dark): + continue if not 'finished' in res and not 'skipped' in res and 'started' in res: hosts.append(host) return hosts - # **************************************************** - - def _compute_aggregrate_counts(self, results, poll=False, setup=False): - ''' prints results about playbook run + computes stats about per host changes ''' - - dark_hosts = results.get('dark',{}) - contacted_hosts = results.get('contacted',{}) - for (host, error) in dark_hosts.iteritems(): - self.processed[host] = 1 - self.callbacks.on_dark_host(host, error) - self.dark[host] = 1 - for (host, host_result) in contacted_hosts.iteritems(): - self.processed[host] = 1 - if 'failed' in host_result or (int(host_result.get('rc',0)) != 0): - self.callbacks.on_failed(host, host_result) - self.failures[host] = 1 - elif 'skipped' in host_result: - self.skipped[host] = self.skipped.get(host, 0) + 1 - self.callbacks.on_skipped(host) - elif poll: - continue - elif not setup and ('changed' in host_result): - self.invocations[host] = self.invocations.get(host, 0) + 1 - self.changed[host] = self.changed.get(host, 0) + 1 - self.callbacks.on_ok(host, host_result) - else: - self.invocations[host] = self.invocations.get(host, 0) + 1 - self.callbacks.on_ok(host, host_result) - # ***************************************************** - def _async_poll(self, runner, async_seconds, async_poll_interval, only_if): + def _async_poll(self, runner, hosts, async_seconds, async_poll_interval, only_if): ''' launch an async job, if poll_interval is set, wait for completion ''' + runner.host_list = hosts runner.background = async_seconds results = runner.run() - self._compute_aggregrate_counts(results, poll=True) + self.stats.compute(results, poll=True) if async_poll_interval <= 0: # if not polling, playbook requested fire and forget @@ -261,83 +212,75 @@ class PlayBook(object): return results clock = async_seconds - runner.hosts = self.hosts_to_poll(results) - runner.hosts = self._prune_failed_hosts(runner.hosts) + runner.host_list = self.hosts_to_poll(results) poll_results = results while (clock >= 0): # poll/loop until polling duration complete # FIXME: make a "get_async_runner" method like in /bin/ansible - runner.hosts = poll_hosts runner.module_args = [ "jid=%s" % jid ] runner.module_name = 'async_status' # FIXME: make it such that if you say 'async_status' you # can't background that op! runner.background = 0 runner.pattern = '*' - runner.hosts = self.hosts_to_poll(poll_results) poll_results = runner.run() + self.stats.compute(poll_results, poll=True) + runner.host_list = self.hosts_to_poll(poll_results) - if len(runner.hosts) == 0: + if len(runner.host_list) == 0: break if poll_results is None: break - self._compute_aggregrate_counts(poll_results, poll=True) - # mention which hosts we're going to poll again... for (host, host_result) in poll_results['contacted'].iteritems(): results['contacted'][host] = host_result - if not host in self.dark and not host in self.failures: + if not host in self.stats.dark and not host in self.stats.failures: self.callbacks.on_async_poll(jid, host, clock, host_result) # run down the clock clock = clock - async_poll_interval time.sleep(async_poll_interval) - # mark any hosts that are still listed as started as failed - # since these likely got killed by async_wrapper - for (host, host_result) in results['contacted'].iteritems(): - if 'started' in host_result: - results['contacted'][host] = { 'failed' : 1, 'rc' : None, 'msg' : 'timed out' } + # mark any hosts that are still listed as started as failed + # since these likely got killed by async_wrapper + for (host, host_result) in poll_results['contacted'].iteritems(): + if 'started' in host_result: + reason = { 'failed' : 1, 'rc' : None, 'msg' : 'timed out' } + self.runner_callbacks.on_failed(host, reason) + results['contacted'][host] = reason return results # ***************************************************** - def _run_module(self, pattern, module, args, hosts, remote_user, + def _run_module(self, pattern, host_list, module, args, remote_user, async_seconds, async_poll_interval, only_if): ''' run a particular module step in a playbook ''' + hosts = [ h for h in host_list if (h not in self.stats.failures) and (h not in self.stats.dark)] + runner = ansible.runner.Runner( pattern=pattern, groups=self.groups, module_name=module, module_args=args, host_list=hosts, forks=self.forks, remote_pass=self.remote_pass, module_path=self.module_path, timeout=self.timeout, remote_user=remote_user, setup_cache=SETUP_CACHE, basedir=self.basedir, - conditional=only_if + conditional=only_if, callbacks=self.runner_callbacks, ) if async_seconds == 0: - rc = runner.run() + return runner.run() else: - rc = self._async_poll(runner, async_seconds, async_poll_interval, only_if) - - dark_hosts = rc.get('dark',{}) - for (host, error) in dark_hosts.iteritems(): - self.callbacks.on_dark_host(host, error) - - return rc + return self._async_poll(runner, hosts, async_seconds, async_poll_interval, only_if) # ***************************************************** - def _run_task(self, pattern=None, task=None, host_list=None, + def _run_task(self, pattern=None, host_list=None, task=None, remote_user=None, handlers=None, conditional=False): ''' run a single task in the playbook and recursively run any subtasks. ''' - # do not continue to run tasks on hosts that have had failures - host_list = self._prune_failed_hosts(host_list) - # load the module name and parameters from the task entry name = task.get('name', None) action = task.get('action', None) @@ -362,17 +305,17 @@ class PlayBook(object): # load up an appropriate ansible runner to # run the task in parallel - results = self._run_module(pattern, module_name, - module_args, host_list, remote_user, + results = self._run_module(pattern, host_list, module_name, + module_args, remote_user, async_seconds, async_poll_interval, only_if) + self.stats.compute(results) + # if no hosts are matched, carry on, unlike /bin/ansible # which would warn you about this if results is None: results = {} - self._compute_aggregrate_counts(results) - # flag which notify handlers need to be run # this will be on a SUBSET of the actual host list. For instance # a file might need to be written on only half of the nodes so @@ -402,6 +345,7 @@ class PlayBook(object): if name is None: raise errors.AnsibleError('handler is missing a name') if match_name == name: + self.callbacks.on_notify(host, name) # flag the handler with the list of hosts it needs to be run on, it will be run later if not 'run' in x: x['run'] = [] @@ -454,7 +398,7 @@ class PlayBook(object): # ***************************************************** - def _do_setup_step(self, pattern, vars, user, host_list, vars_files=None): + def _do_setup_step(self, pattern, vars, user, vars_files=None): ''' push variables down to the systems and get variables+facts back up ''' # this enables conditional includes like $facter_os.yml and is only done @@ -463,7 +407,7 @@ class PlayBook(object): if vars_files is not None: self.callbacks.on_setup_secondary() - self._do_conditional_imports(vars_files, host_list) + self._do_conditional_imports(vars_files, self.host_list) else: self.callbacks.on_setup_primary() @@ -474,16 +418,18 @@ class PlayBook(object): for (k,v) in vars.iteritems(): push_var_str += "%s=\"%s\" " % (k,v) + host_list = [ h for h in self.host_list if not (h in self.stats.failures or h in self.stats.dark) ] + # push any variables down to the system setup_results = ansible.runner.Runner( pattern=pattern, groups=self.groups, module_name='setup', - module_args=push_var_str, host_list=self.host_list, + module_args=push_var_str, host_list=host_list, forks=self.forks, module_path=self.module_path, timeout=self.timeout, remote_user=user, - remote_pass=self.remote_pass, setup_cache=SETUP_CACHE + remote_pass=self.remote_pass, setup_cache=SETUP_CACHE, + callbacks=self.runner_callbacks, ).run() - - self._compute_aggregrate_counts(setup_results, setup=True) + self.stats.compute(setup_results, setup=True) # now for each result, load into the setup cache so we can # let runner template out future commands @@ -493,7 +439,6 @@ class PlayBook(object): for (host, result) in setup_ok.iteritems(): SETUP_CACHE[host] = result - host_list = self._prune_failed_hosts(host_list) return host_list # ***************************************************** @@ -517,11 +462,11 @@ class PlayBook(object): self.callbacks.on_play_start(pattern) # push any variables down to the system # and get facts/ohai/other data back up - self.host_list = self._do_setup_step(pattern, vars, user, self.host_list, None) + self._do_setup_step(pattern, vars, user, None) # now with that data, handle contentional variable file imports! if len(vars_files) > 0: - self.host_list = self._do_setup_step(pattern, vars, user, self.host_list, vars_files) + self._do_setup_step(pattern, vars, user, vars_files) # run all the top level tasks, these get run on every node for task in tasks: @@ -540,12 +485,13 @@ class PlayBook(object): # but Apache will only be restarted once (at the end). for task in handlers: - if type(task.get("run", None)) == list: + triggered_by = task.get('run', None) + if type(triggered_by) == list: self._run_task( pattern=pattern, task=task, - handlers=handlers, - host_list=task.get('run',[]), + handlers=[], + host_list=triggered_by, conditional=True, remote_user=user ) diff --git a/lib/ansible/runner.py b/lib/ansible/runner.py index c64fa38686..49f17bb4d3 100755 --- a/lib/ansible/runner.py +++ b/lib/ansible/runner.py @@ -32,6 +32,7 @@ import ansible.constants as C import ansible.connection from ansible import utils from ansible import errors +from ansible import callbacks as ans_callbacks ################################################ @@ -46,13 +47,9 @@ def _executor_hook(job_queue, result_queue): result_queue.put(runner._executor(host)) except Queue.Empty: pass - except errors.AnsibleError, ae: - result_queue.put([host, False, str(ae)]) - except Exception: - # probably should include the full trace - result_queue.put([host, False, traceback.format_exc()]) - - + except: + traceback.print_exc() + ################################################ class Runner(object): @@ -64,13 +61,17 @@ class Runner(object): forks=C.DEFAULT_FORKS, timeout=C.DEFAULT_TIMEOUT, pattern=C.DEFAULT_PATTERN, remote_user=C.DEFAULT_REMOTE_USER, remote_pass=C.DEFAULT_REMOTE_PASS, background=0, basedir=None, setup_cache=None, transport='paramiko', - conditional='True', groups={}, verbose=False): + conditional='True', groups={}, callbacks=None, verbose=False): if setup_cache is None: setup_cache = {} if basedir is None: basedir = os.getcwd() + if callbacks is None: + callbacks = ans_callbacks.DefaultRunnerCallbacks() + self.callbacks = callbacks + self.generated_jid = str(random.randint(0, 999999999999)) self.connector = ansible.connection.Connection(self, transport) @@ -492,6 +493,18 @@ class Runner(object): # ***************************************************** def _executor(self, host): + try: + return self._executor_internal(host) + except errors.AnsibleError, ae: + msg = str(ae) + self.callbacks.on_unreachable(host, msg) + return [host, False, msg] + except Exception: + msg = traceback.format_exc() + self.callbacks.on_unreachable(host, msg) + return [host, False, msg] + + def _executor_internal(self, host): ''' callback executed in parallel for each host. returns (hostname, connected_ok, extra) ''' ok, conn = self._connect(host) @@ -515,6 +528,18 @@ class Runner(object): self._delete_remote_files(conn, tmp) conn.close() + + (host, connect_ok, data) = result + if not connect_ok: + self.callbacks.on_unreachable(host, data) + else: + if 'failed' in data or 'rc' in data and str(data['rc']) != '0': + self.callbacks.on_failed(host, data) + elif 'skipped' in data: + self.callbacks.on_skipped(host) + else: + self.callbacks.on_ok(host, data) + return result # ***************************************************** @@ -566,10 +591,10 @@ class Runner(object): ''' handles mulitprocessing when more than 1 fork is required ''' job_queue = multiprocessing.Manager().Queue() - result_queue = multiprocessing.Manager().Queue() - [job_queue.put(i) for i in hosts] + result_queue = multiprocessing.Manager().Queue() + workers = [] for i in range(self.forks): prc = multiprocessing.Process(target=_executor_hook, @@ -597,6 +622,9 @@ class Runner(object): results2 = dict(contacted={}, dark={}) + if results is None: + return None + for result in results: (host, contacted_ok, result) = result if contacted_ok: @@ -622,10 +650,11 @@ class Runner(object): return dict(contacted={}, dark={}) hosts = [ (self,x) for x in hosts ] + results = None if self.forks > 1: results = self._parallel_exec(hosts) else: - results = [ x._executor(h) for (x,h) in hosts ] + results = [ self._executor(h[1]) for h in hosts ] return self._partition_results(results) diff --git a/library/async_status b/library/async_status index 74732b669b..bf5b0a82d3 100755 --- a/library/async_status +++ b/library/async_status @@ -87,7 +87,6 @@ except Exception, e: print json.dumps({ "results_file" : log_path, "ansible_job_id" : jid, - "traceback" : str(e), "started" : 1, }) else: diff --git a/test/TestPlayBook.py b/test/TestPlayBook.py index bedad5f83e..9fec3ef225 100644 --- a/test/TestPlayBook.py +++ b/test/TestPlayBook.py @@ -7,6 +7,7 @@ import unittest import getpass import ansible.playbook import ansible.utils as utils +import ansible.callbacks as ans_callbacks import os import shutil import time @@ -15,63 +16,69 @@ try: except: import simplejson as json +EVENTS = [] + class TestCallbacks(object): + # using same callbacks class for both runner and playbook def __init__(self): - self.events = [] + pass def set_playbook(self, playbook): self.playbook = playbook def on_start(self): - self.events.append('start') + EVENTS.append('start') def on_setup_primary(self): - self.events.append([ 'primary_setup' ]) + EVENTS.append([ 'primary_setup' ]) def on_setup_secondary(self): - self.events.append([ 'secondary_setup' ]) + EVENTS.append([ 'secondary_setup' ]) def on_skipped(self, host): - self.events.append([ 'skipped', [ host ]]) + EVENTS.append([ 'skipped', [ host ]]) def on_import_for_host(self, host, filename): - self.events.append([ 'import', [ host, filename ]]) + EVENTS.append([ 'import', [ host, filename ]]) def on_not_import_for_host(self, host, missing_filename): pass + def on_notify(self, host, handler): + EVENTS.append([ 'notify', [ host, handler ]]) + def on_task_start(self, name, is_conditional): - self.events.append([ 'task start', [ name, is_conditional ]]) + EVENTS.append([ 'task start', [ name, is_conditional ]]) def on_unreachable(self, host, msg): - self.events.append([ 'unreachable', [ host, msg ]]) + EVENTS.append([ 'unreachable', [ host, msg ]]) def on_failed(self, host, results): - self.events.append([ 'failed', [ host, results ]]) + EVENTS.append([ 'failed', [ host, results ]]) def on_ok(self, host, result): # delete certain info from host_result to make test comparisons easier host_result = result.copy() - for k in [ 'ansible_job_id', 'invocation', 'md5sum', 'delta', 'start', 'end' ]: + for k in [ 'ansible_job_id', 'results_file', 'invocation', 'md5sum', 'delta', 'start', 'end' ]: if k in host_result: del host_result[k] for k in host_result.keys(): if k.startswith('facter_') or k.startswith('ohai_'): del host_result[k] - self.events.append([ 'ok', [ host, host_result ]]) + EVENTS.append([ 'ok', [ host, host_result ]]) def on_play_start(self, pattern): - self.events.append([ 'play start', [ pattern ]]) + EVENTS.append([ 'play start', [ pattern ]]) def on_async_confused(self, msg): - self.events.append([ 'async confused', [ msg ]]) + EVENTS.append([ 'async confused', [ msg ]]) def on_async_poll(self, jid, host, clock, host_result): - self.events.append([ 'async poll', [ host ]]) + EVENTS.append([ 'async poll', [ host ]]) - def on_dark_host(self, host, msg): - self.events.append([ 'failed/dark', [ host, msg ]]) + def on_unreachable(self, host, msg): + EVENTS.append([ 'failed/dark', [ host, msg ]]) def on_setup_primary(self): pass @@ -125,12 +132,14 @@ class TestRunner(unittest.TestCase): remote_user = self.user, remote_pass = None, verbose = False, - callbacks = self.test_callbacks + stats = ans_callbacks.AggregateStats(), + callbacks = self.test_callbacks, + runner_callbacks = self.test_callbacks ) results = self.playbook.run() return dict( results = results, - events = self.test_callbacks.events, + events = EVENTS ) def test_one(self): diff --git a/test/playbook1.events b/test/playbook1.events index d4dd49dfbd..71d2f97f3e 100644 --- a/test/playbook1.events +++ b/test/playbook1.events @@ -151,6 +151,13 @@ } ] ], + [ + "notify", + [ + "127.0.0.1", + "on change 1" + ] + ], [ "task start", [ @@ -172,6 +179,20 @@ } ] ], + [ + "notify", + [ + "127.0.0.1", + "on change 1" + ] + ], + [ + "notify", + [ + "127.0.0.1", + "on change 2" + ] + ], [ "task start", [ @@ -180,9 +201,21 @@ ] ], [ - "async poll", + "ok", [ - "127.0.0.1" + "127.0.0.1", + { + "started": 1 + } + ] + ], + [ + "ok", + [ + "127.0.0.1", + { + "started": 1 + } ] ], [ @@ -191,6 +224,15 @@ "127.0.0.1" ] ], + [ + "ok", + [ + "127.0.0.1", + { + "started": 1 + } + ] + ], [ "async poll", [ @@ -230,6 +272,18 @@ true ] ], + [ + "ok", + [ + "127.0.0.1", + { + "cmd": "echo this should fire once ", + "rc": 0, + "stderr": "", + "stdout": "this should fire once" + } + ] + ], [ "ok", [ @@ -265,10 +319,10 @@ "results": { "127.0.0.1": { "changed": 2, - "dark": 0, - "failed": 0, - "resources": 11, - "skipped": 1 + "failures": 0, + "ok": 12, + "skipped": 1, + "unreachable": 0 } } }