diff --git a/lib/ansible/executor/process/result.py b/lib/ansible/executor/process/result.py index 2d13aa44cd..cdc8875631 100644 --- a/lib/ansible/executor/process/result.py +++ b/lib/ansible/executor/process/result.py @@ -58,7 +58,7 @@ class ResultProcess(multiprocessing.Process): def _send_result(self, result): debug(u"sending result: %s" % ([text_type(x) for x in result],)) - self._final_q.put(result, block=False) + self._final_q.put(result) debug("done sending result") def _read_worker_result(self): @@ -73,7 +73,7 @@ class ResultProcess(multiprocessing.Process): try: if not rslt_q.empty(): debug("worker %d has data to read" % self._cur_worker) - result = rslt_q.get(block=False) + result = rslt_q.get() debug("got a result from worker %d: %s" % (self._cur_worker, result)) break except queue.Empty: @@ -101,7 +101,7 @@ class ResultProcess(multiprocessing.Process): try: result = self._read_worker_result() if result is None: - time.sleep(0.01) + time.sleep(0.0001) continue clean_copy = strip_internal_keys(result._result) diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index 1cc1f7df43..a1a83a5dda 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -59,11 +59,13 @@ class WorkerProcess(multiprocessing.Process): for reading later. ''' - def __init__(self, tqm, main_q, rslt_q, loader): + def __init__(self, tqm, main_q, rslt_q, hostvars_manager, loader): + super(WorkerProcess, self).__init__() # takes a task queue manager as the sole param: self._main_q = main_q self._rslt_q = rslt_q + self._hostvars = hostvars_manager self._loader = loader # dupe stdin, if we have one @@ -82,8 +84,6 @@ class WorkerProcess(multiprocessing.Process): # couldn't get stdin's fileno, so we just carry on pass - super(WorkerProcess, self).__init__() - def run(self): ''' Called when the process is started, and loops indefinitely @@ -100,14 +100,15 @@ class WorkerProcess(multiprocessing.Process): while True: task = None try: - debug("waiting for a message...") - (host, task, basedir, zip_vars, hostvars, compressed_vars, play_context, shared_loader_obj) = self._main_q.get() + #debug("waiting for work") + (host, task, basedir, zip_vars, compressed_vars, play_context, shared_loader_obj) = self._main_q.get(block=False) if compressed_vars: job_vars = json.loads(zlib.decompress(zip_vars)) else: job_vars = zip_vars - job_vars['hostvars'] = hostvars + + job_vars['hostvars'] = self._hostvars.hostvars() debug("there's work to be done! got a task/handler to work on: %s" % task) @@ -142,7 +143,7 @@ class WorkerProcess(multiprocessing.Process): debug("done sending task result") except queue.Empty: - pass + time.sleep(0.0001) except AnsibleConnectionFailure: try: if task: diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 001d71e9e0..3e62cb3c99 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -19,6 +19,7 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type +from multiprocessing.managers import SyncManager, DictProxy import multiprocessing import os import tempfile @@ -32,6 +33,7 @@ from ansible.executor.stats import AggregateStats from ansible.playbook.play_context import PlayContext from ansible.plugins import callback_loader, strategy_loader, module_loader from ansible.template import Templar +from ansible.vars.hostvars import HostVars try: from __main__ import display @@ -98,7 +100,7 @@ class TaskQueueManager: main_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue() - prc = WorkerProcess(self, main_q, rslt_q, self._loader) + prc = WorkerProcess(self, main_q, rslt_q, self._hostvars_manager, self._loader) prc.start() self._workers.append((prc, main_q, rslt_q)) @@ -173,11 +175,6 @@ class TaskQueueManager: are done with the current task). ''' - # Fork # of forks, # of hosts or serial, whichever is lowest - contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))] - contenders = [ v for v in contenders if v is not None and v > 0 ] - self._initialize_processes(min(contenders)) - if not self._callbacks_loaded: self.load_callbacks() @@ -187,6 +184,34 @@ class TaskQueueManager: new_play = play.copy() new_play.post_validate(templar) + class HostVarsManager(SyncManager): + pass + + hostvars = HostVars( + play=new_play, + inventory=self._inventory, + variable_manager=self._variable_manager, + loader=self._loader, + ) + + HostVarsManager.register( + 'hostvars', + callable=lambda: hostvars, + # FIXME: this is the list of exposed methods to the DictProxy object, plus our + # one special one (set_variable_manager). There's probably a better way + # to do this with a proper BaseProxy/DictProxy derivative + exposed=('set_variable_manager', '__contains__', '__delitem__', '__getitem__', + '__len__', '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items', + 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'), + ) + self._hostvars_manager = HostVarsManager() + self._hostvars_manager.start() + + # Fork # of forks, # of hosts or serial, whichever is lowest + contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(new_play.hosts))] + contenders = [ v for v in contenders if v is not None and v > 0 ] + self._initialize_processes(min(contenders)) + play_context = PlayContext(new_play, self._options, self.passwords, self._connection_lockfile.fileno()) for callback_plugin in self._callback_plugins: if hasattr(callback_plugin, 'set_play_context'): @@ -221,6 +246,7 @@ class TaskQueueManager: # and run the play using the strategy and cleanup on way out play_return = strategy.run(iterator, play_context) self._cleanup_processes() + self._hostvars_manager.shutdown() return play_return def cleanup(self): diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index 3cdec5b573..f1f4650529 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -158,7 +158,6 @@ class StrategyBase: # hostvars out of the task variables right now, due to the fact # that they're not JSON serializable compressed_vars = False - hostvars = task_vars.pop('hostvars', None) if C.DEFAULT_VAR_COMPRESSION_LEVEL > 0: zip_vars = zlib.compress(json.dumps(task_vars), C.DEFAULT_VAR_COMPRESSION_LEVEL) compressed_vars = True @@ -170,10 +169,7 @@ class StrategyBase: zip_vars = task_vars # noqa (pyflakes false positive because task_vars is deleted in the conditional above) # and queue the task - main_q.put((host, task, self._loader.get_basedir(), zip_vars, hostvars, compressed_vars, play_context, shared_loader_obj), block=False) - - # nuke the hostvars object too, as its no longer needed - del hostvars + main_q.put((host, task, self._loader.get_basedir(), zip_vars, compressed_vars, play_context, shared_loader_obj)) self._pending_results += 1 except (EOFError, IOError, AssertionError) as e: @@ -192,7 +188,7 @@ class StrategyBase: while not self._final_q.empty() and not self._tqm._terminated: try: - result = self._final_q.get(block=False) + result = self._final_q.get() display.debug("got result from result worker: %s" % ([text_type(x) for x in result],)) # all host status messages contain 2 entries: (msg, task_result) @@ -277,6 +273,7 @@ class StrategyBase: var_value = wrap_var(result[3]) self._variable_manager.set_nonpersistent_facts(host, {var_name: var_value}) + self._tqm._hostvars_manager.hostvars().set_variable_manager(self._variable_manager) elif result[0] in ('set_host_var', 'set_host_facts'): host = result[1] @@ -307,11 +304,12 @@ class StrategyBase: self._variable_manager.set_nonpersistent_facts(target_host, facts) else: self._variable_manager.set_host_facts(target_host, facts) + self._tqm._hostvars_manager.hostvars().set_variable_manager(self._variable_manager) else: raise AnsibleError("unknown result message received: %s" % result[0]) except Queue.Empty: - time.sleep(0.01) + time.sleep(0.0001) return ret_results @@ -327,7 +325,7 @@ class StrategyBase: while self._pending_results > 0 and not self._tqm._terminated: results = self._process_pending_results(iterator) ret_results.extend(results) - time.sleep(0.01) + time.sleep(0.0001) display.debug("no more pending results, returning what we have") return ret_results diff --git a/lib/ansible/vars/__init__.py b/lib/ansible/vars/__init__.py index 26f52adfb0..c895b59f5f 100644 --- a/lib/ansible/vars/__init__.py +++ b/lib/ansible/vars/__init__.py @@ -359,15 +359,15 @@ class VariableManager: for (group_name, group) in iteritems(self._inventory.groups): variables['groups'][group_name] = [h.name for h in group.get_hosts()] - if include_hostvars: - hostvars_cache_entry = self._get_cache_entry(play=play) - if hostvars_cache_entry in HOSTVARS_CACHE: - hostvars = HOSTVARS_CACHE[hostvars_cache_entry] - else: - hostvars = HostVars(play=play, inventory=self._inventory, loader=loader, variable_manager=self) - HOSTVARS_CACHE[hostvars_cache_entry] = hostvars - variables['hostvars'] = hostvars - variables['vars'] = hostvars[host.get_name()] + #if include_hostvars: + # hostvars_cache_entry = self._get_cache_entry(play=play) + # if hostvars_cache_entry in HOSTVARS_CACHE: + # hostvars = HOSTVARS_CACHE[hostvars_cache_entry] + # else: + # hostvars = HostVars(play=play, inventory=self._inventory, loader=loader, variable_manager=self) + # HOSTVARS_CACHE[hostvars_cache_entry] = hostvars + # variables['hostvars'] = hostvars + # variables['vars'] = hostvars[host.get_name()] if play: variables['role_names'] = [r._role_name for r in play.roles] diff --git a/lib/ansible/vars/hostvars.py b/lib/ansible/vars/hostvars.py index de27984039..246b2c7812 100644 --- a/lib/ansible/vars/hostvars.py +++ b/lib/ansible/vars/hostvars.py @@ -48,74 +48,50 @@ class HostVars(collections.Mapping): def __init__(self, play, inventory, variable_manager, loader): self._lookup = dict() + self._inventory = inventory self._loader = loader self._play = play self._variable_manager = variable_manager self._cached_result = dict() - hosts = inventory.get_hosts(ignore_limits_and_restrictions=True) + def set_variable_manager(self, variable_manager): + self._variable_manager = variable_manager - # check to see if localhost is in the hosts list, as we - # may have it referenced via hostvars but if created implicitly - # it doesn't sow up in the hosts list - has_localhost = False - for host in hosts: - if host.name in C.LOCALHOST: - has_localhost = True - break - - if not has_localhost: - new_host = Host(name='localhost') - new_host.set_variable("ansible_python_interpreter", sys.executable) - new_host.set_variable("ansible_connection", "local") - new_host.address = '127.0.0.1' - hosts.append(new_host) - - for host in hosts: - self._lookup[host.name] = host + def _find_host(self, host_name): + return self._inventory.get_host(host_name) def __getitem__(self, host_name): + host = self._find_host(host_name) + if host is None: + return j2undefined - if host_name not in self._lookup: - return j2undefined() - - host = self._lookup.get(host_name) data = self._variable_manager.get_vars(loader=self._loader, host=host, play=self._play, include_hostvars=False) + #**************************************************** + # TESTING REMOVAL OF THIS + #**************************************************** + # Since we template much later now in 2.0, it may be completely unrequired to do + # a full template of the vars returned above, which is quite costly in time when + # the result is large. # Using cache in order to avoid template call - sha1_hash = sha1(str(data).encode('utf-8')).hexdigest() - if sha1_hash in self._cached_result: - result = self._cached_result[sha1_hash] - else: - templar = Templar(variables=data, loader=self._loader) - result = templar.template(data, fail_on_undefined=False, static_vars=STATIC_VARS) - self._cached_result[sha1_hash] = result - return result + #sha1_hash = sha1(str(data).encode('utf-8')).hexdigest() + #if sha1_hash in self._cached_result: + # result = self._cached_result[sha1_hash] + #else: + # templar = Templar(variables=data, loader=self._loader) + # result = templar.template(data, fail_on_undefined=False, static_vars=STATIC_VARS) + # self._cached_result[sha1_hash] = result + #return result + #**************************************************** + return data def __contains__(self, host_name): - item = self.get(host_name) - if item and item is not j2undefined: - return True - return False + return self._find_host(host_name) is not None def __iter__(self): - for host in self._lookup: + for host in self._inventory.get_hosts(ignore_limits_and_restrictions=True): yield host def __len__(self): - return len(self._lookup) + return len(self._inventory.get_hosts(ignore_limits_and_restrictions=True)) - def __getstate__(self): - return dict( - loader=self._loader, - lookup=self._lookup, - play=self._play, - var_manager=self._variable_manager, - ) - - def __setstate__(self, data): - self._play = data.get('play') - self._loader = data.get('loader') - self._lookup = data.get('lookup') - self._variable_manager = data.get('var_manager') - self._cached_result = dict()