From d0cfcdbc4d9430fedc11ac2d2dc9e59b61bb96e4 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Tue, 3 Mar 2015 14:59:23 -0600 Subject: [PATCH] Getting dynamic includes working a bit better on v2 --- v2/ansible/executor/play_iterator.py | 49 ++++++++----- v2/ansible/inventory/host.py | 3 + v2/ansible/plugins/strategies/__init__.py | 30 +++++--- v2/ansible/plugins/strategies/linear.py | 83 ++++++++++++++++++++--- v2/samples/common_include.yml | 1 + v2/samples/l1_include.yml | 3 + v2/samples/l2_include.yml | 4 ++ v2/samples/l3_include.yml | 4 ++ v2/samples/test_include.yml | 9 ++- 9 files changed, 147 insertions(+), 39 deletions(-) create mode 100644 v2/samples/common_include.yml create mode 100644 v2/samples/l1_include.yml create mode 100644 v2/samples/l2_include.yml create mode 100644 v2/samples/l3_include.yml diff --git a/v2/ansible/executor/play_iterator.py b/v2/ansible/executor/play_iterator.py index 8936a216be..0461fc87f2 100644 --- a/v2/ansible/executor/play_iterator.py +++ b/v2/ansible/executor/play_iterator.py @@ -39,6 +39,18 @@ class HostState: self.fail_state = PlayIterator.FAILED_NONE self.pending_setup = False + def __repr__(self): + return "HOST STATE: block=%d, task=%d, rescue=%d, always=%d, role=%s, run_state=%d, fail_state=%d, pending_setup=%s" % ( + self.cur_block, + self.cur_regular_task, + self.cur_rescue_task, + self.cur_always_task, + self.cur_role, + self.run_state, + self.fail_state, + self.pending_setup, + ) + def get_current_block(self): return self._blocks[self.cur_block] @@ -55,6 +67,7 @@ class HostState: return new_state class PlayIterator: + # the primary running states for the play iteration ITERATING_SETUP = 0 ITERATING_TASKS = 1 @@ -193,7 +206,8 @@ class PlayIterator: the different processes, and not all data structures are preserved. This method allows us to find the original task passed into the executor engine. ''' - for block in self._blocks: + s = self.get_host_state(host) + for block in s._blocks: if block.block: for t in block.block: if t._uuid == task._uuid: @@ -208,20 +222,23 @@ class PlayIterator: return t return None - def add_tasks(self, task_list): - if self._run_state == self.ITERATING_TASKS: - before = self._task_list[:self._cur_task_pos + self._tasks_added] - after = self._task_list[self._cur_task_pos + self._tasks_added:] - self._task_list = before + task_list + after - elif self._run_state == self.ITERATING_RESCUE: - before = self._cur_block.rescue[:self._cur_rescue_pos + self._tasks_added] - after = self._cur_block.rescue[self._cur_rescue_pos + self._tasks_added:] - self._cur_block.rescue = before + task_list + after - elif self._run_state == self.ITERATING_ALWAYS: - before = self._cur_block.always[:self._cur_always_pos + self._tasks_added] - after = self._cur_block.always[self._cur_always_pos + self._tasks_added:] - self._cur_block.always = before + task_list + after + def add_tasks(self, host, task_list): + s = self.get_host_state(host) + target_block = s._blocks[s.cur_block].copy() - # set this internal flag now so we know if - self._tasks_added += len(task_list) + if s.run_state == self.ITERATING_TASKS: + before = target_block.block[:s.cur_regular_task] + after = target_block.block[s.cur_regular_task:] + target_block.block = before + task_list + after + elif s.run_state == self.ITERATING_RESCUE: + before = target_block.rescue[:s.cur_rescue_task] + after = target_block.rescue[s.cur_rescue_task:] + target_block.rescue = before + task_list + after + elif s.run_state == self.ITERATING_ALWAYS: + before = target_block.always[:s.cur_always_task] + after = target_block.always[s.cur_always_task:] + target_block.always = before + task_list + after + + s._blocks[s.cur_block] = target_block + self._host_states[host.name] = s diff --git a/v2/ansible/inventory/host.py b/v2/ansible/inventory/host.py index 414ec34b96..29d6afd991 100644 --- a/v2/ansible/inventory/host.py +++ b/v2/ansible/inventory/host.py @@ -36,6 +36,9 @@ class Host: def __setstate__(self, data): return self.deserialize(data) + def __eq__(self, other): + return self.name == other.name + def serialize(self): groups = [] for group in self.groups: diff --git a/v2/ansible/plugins/strategies/__init__.py b/v2/ansible/plugins/strategies/__init__.py index a6f382289b..196868ba96 100644 --- a/v2/ansible/plugins/strategies/__init__.py +++ b/v2/ansible/plugins/strategies/__init__.py @@ -118,6 +118,8 @@ class StrategyBase: based on the result (executing callbacks, updating state, etc.). ''' + ret_results = [] + while not self._final_q.empty() and not self._tqm._terminated: try: result = self._final_q.get(block=False) @@ -156,6 +158,8 @@ class StrategyBase: if entry == hashed_entry : role_obj._had_task_run = True + ret_results.append(task_result) + #elif result[0] == 'include': # host = result[1] # task = result[2] @@ -211,19 +215,26 @@ class StrategyBase: except Queue.Empty: pass + return ret_results + def _wait_on_pending_results(self, iterator): ''' Wait for the shared counter to drop to zero, using a short sleep between checks to ensure we don't spin lock ''' + ret_results = [] + while self._pending_results > 0 and not self._tqm._terminated: debug("waiting for pending results (%d left)" % self._pending_results) - self._process_pending_results(iterator) + results = self._process_pending_results(iterator) + ret_results.extend(results) if self._tqm._terminated: break time.sleep(0.01) + return ret_results + def _add_host(self, host_info): ''' Helper function to add a new host to inventory based on a task result. @@ -292,22 +303,21 @@ class StrategyBase: # and add the host to the group new_group.add_host(actual_host) - def _load_included_file(self, task, include_file, include_vars): + def _load_included_file(self, included_file): ''' Loads an included YAML file of tasks, applying the optional set of variables. ''' - data = self._loader.load_from_file(include_file) + data = self._loader.load_from_file(included_file._filename) if not isinstance(data, list): - raise AnsibleParsingError("included task files must contain a list of tasks", obj=ds) - - is_handler = isinstance(task, Handler) + raise AnsibleParsingError("included task files must contain a list of tasks", obj=included_file._task._ds) + is_handler = isinstance(included_file._task, Handler) block_list = load_list_of_blocks( data, - parent_block=task._block, - task_include=task, - role=task._role, + parent_block=included_file._task._block, + task_include=included_file._task, + role=included_file._task._role, use_handlers=is_handler, loader=self._loader ) @@ -317,7 +327,7 @@ class StrategyBase: # set the vars for this task from those specified as params to the include for t in task_list: - t.vars = include_vars.copy() + t.vars = included_file._args.copy() return task_list diff --git a/v2/ansible/plugins/strategies/linear.py b/v2/ansible/plugins/strategies/linear.py index e4f55be86a..37a7ddf0fe 100644 --- a/v2/ansible/plugins/strategies/linear.py +++ b/v2/ansible/plugins/strategies/linear.py @@ -139,6 +139,7 @@ class StrategyModule(StrategyBase): callback_sent = False work_to_do = False + host_results = [] host_tasks = self._get_next_task_lockstep(hosts_left, iterator) for (host, task) in host_tasks: if not task: @@ -151,9 +152,14 @@ class StrategyModule(StrategyBase): # sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we # will only send this task to the first host in the list. - action = action_loader.get(task.action, class_only=True) - if task.run_once or getattr(action, 'BYPASS_HOST_LOOP', False): - run_once = True + try: + action = action_loader.get(task.action, class_only=True) + if task.run_once or getattr(action, 'BYPASS_HOST_LOOP', False): + run_once = True + except KeyError: + # we don't care here, because the action may simply not have a + # corresponding action plugin + pass debug("getting variables") task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) @@ -178,6 +184,7 @@ class StrategyModule(StrategyBase): meta_action = task.args.get('_raw_params') if meta_action == 'noop': # FIXME: issue a callback for the noop here? + print("%s => NOOP" % host) continue elif meta_action == 'flush_handlers': self.run_handlers(iterator, connection_info) @@ -191,18 +198,78 @@ class StrategyModule(StrategyBase): self._blocked_hosts[host.get_name()] = True self._queue_task(host, task, task_vars, connection_info) - self._process_pending_results(iterator) + results = self._process_pending_results(iterator) + host_results.extend(results) # if we're bypassing the host loop, break out now if run_once: break debug("done queuing things up, now waiting for results queue to drain") - self._wait_on_pending_results(iterator) + results = self._wait_on_pending_results(iterator) + host_results.extend(results) - # FIXME: MAKE PENDING RESULTS RETURN RESULTS PROCESSED AND USE THEM - # TO TAKE ACTION, ie. FOR INCLUDE STATEMENTS TO PRESERVE THE - # LOCK STEP OPERATION + class IncludedFile: + def __init__(self, filename, args, task): + self._filename = filename + self._args = args + self._task = task + self._hosts = [] + def add_host(self, host): + if host not in self._hosts: + self._hosts.append(host) + def __eq__(self, other): + return other._filename == self._filename and other._args == self._args + def __repr__(self): + return "%s (%s): %s" % (self._filename, self._args, self._hosts) + + included_files = [] + for res in host_results: + if res._task.action == 'include': + if res._task.loop: + include_results = res._result['results'] + else: + include_results = [ res._result ] + + for include_result in include_results: + original_task = iterator.get_original_task(res._host, res._task) + if original_task and original_task._role: + include_file = self._loader.path_dwim_relative(original_task._role._role_path, 'tasks', include_file) + else: + include_file = self._loader.path_dwim(res._task.args.get('_raw_params')) + + include_variables = include_result.get('include_variables', dict()) + + inc_file = IncludedFile(include_file, include_variables, original_task) + + try: + pos = included_files.index(inc_file) + inc_file = included_files[pos] + except ValueError: + included_files.append(inc_file) + + inc_file.add_host(res._host) + + if len(included_files) > 0: + noop_task = Task() + noop_task.action = 'meta' + noop_task.args['_raw_params'] = 'noop' + noop_task.set_loader(iterator._play._loader) + + all_tasks = dict((host, []) for host in hosts_left) + for included_file in included_files: + # included hosts get the task list while those excluded get an equal-length + # list of noop tasks, to make sure that they continue running in lock-step + new_tasks = self._load_included_file(included_file) + noop_tasks = [noop_task for t in new_tasks] + for host in hosts_left: + if host in included_file._hosts: + all_tasks[host].extend(new_tasks) + else: + all_tasks[host].extend(noop_tasks) + + for host in hosts_left: + iterator.add_tasks(host, all_tasks[host]) debug("results queue empty") except (IOError, EOFError), e: diff --git a/v2/samples/common_include.yml b/v2/samples/common_include.yml new file mode 100644 index 0000000000..c3fb79f5a1 --- /dev/null +++ b/v2/samples/common_include.yml @@ -0,0 +1 @@ +- debug: msg="this is the common include" diff --git a/v2/samples/l1_include.yml b/v2/samples/l1_include.yml new file mode 100644 index 0000000000..39438a63fe --- /dev/null +++ b/v2/samples/l1_include.yml @@ -0,0 +1,3 @@ +- debug: msg="this is the l1 include" +- include: common_include.yml + diff --git a/v2/samples/l2_include.yml b/v2/samples/l2_include.yml new file mode 100644 index 0000000000..ae931631e6 --- /dev/null +++ b/v2/samples/l2_include.yml @@ -0,0 +1,4 @@ +- debug: msg="this is the l2 include" +- debug: msg="a second task for l2" +- include: common_include.yml + diff --git a/v2/samples/l3_include.yml b/v2/samples/l3_include.yml new file mode 100644 index 0000000000..389adf1b3b --- /dev/null +++ b/v2/samples/l3_include.yml @@ -0,0 +1,4 @@ +- debug: msg="this is the l3 include" +- debug: msg="a second task for l3" +- debug: msg="a third task for l3" +- include: common_include.yml diff --git a/v2/samples/test_include.yml b/v2/samples/test_include.yml index 6a47f275f4..b47b4fc8cb 100644 --- a/v2/samples/test_include.yml +++ b/v2/samples/test_include.yml @@ -1,9 +1,7 @@ -- hosts: localhost +- hosts: all gather_facts: no tasks: - block: - - include: include.yml - when: 1 == 2 - include: include.yml a=1 when: 1 == 1 notify: foo @@ -12,10 +10,11 @@ - foo - bar - bam + - include: "{{inventory_hostname}}_include.yml" - fail: - #rescue: - #- include: include.yml a=rescue + rescue: + - include: include.yml a=rescue always: - include: include.yml a=always