mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Performance improvements for HostVars and some bugfixes
This commit is contained in:
parent
9d23acf302
commit
5cbeab5a3c
6 changed files with 85 additions and 84 deletions
|
@ -58,7 +58,7 @@ class ResultProcess(multiprocessing.Process):
|
||||||
|
|
||||||
def _send_result(self, result):
|
def _send_result(self, result):
|
||||||
debug(u"sending result: %s" % ([text_type(x) for x in result],))
|
debug(u"sending result: %s" % ([text_type(x) for x in result],))
|
||||||
self._final_q.put(result, block=False)
|
self._final_q.put(result)
|
||||||
debug("done sending result")
|
debug("done sending result")
|
||||||
|
|
||||||
def _read_worker_result(self):
|
def _read_worker_result(self):
|
||||||
|
@ -73,7 +73,7 @@ class ResultProcess(multiprocessing.Process):
|
||||||
try:
|
try:
|
||||||
if not rslt_q.empty():
|
if not rslt_q.empty():
|
||||||
debug("worker %d has data to read" % self._cur_worker)
|
debug("worker %d has data to read" % self._cur_worker)
|
||||||
result = rslt_q.get(block=False)
|
result = rslt_q.get()
|
||||||
debug("got a result from worker %d: %s" % (self._cur_worker, result))
|
debug("got a result from worker %d: %s" % (self._cur_worker, result))
|
||||||
break
|
break
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
|
@ -101,7 +101,7 @@ class ResultProcess(multiprocessing.Process):
|
||||||
try:
|
try:
|
||||||
result = self._read_worker_result()
|
result = self._read_worker_result()
|
||||||
if result is None:
|
if result is None:
|
||||||
time.sleep(0.01)
|
time.sleep(0.0001)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
clean_copy = strip_internal_keys(result._result)
|
clean_copy = strip_internal_keys(result._result)
|
||||||
|
|
|
@ -59,11 +59,13 @@ class WorkerProcess(multiprocessing.Process):
|
||||||
for reading later.
|
for reading later.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
def __init__(self, tqm, main_q, rslt_q, loader):
|
def __init__(self, tqm, main_q, rslt_q, hostvars_manager, loader):
|
||||||
|
|
||||||
|
super(WorkerProcess, self).__init__()
|
||||||
# takes a task queue manager as the sole param:
|
# takes a task queue manager as the sole param:
|
||||||
self._main_q = main_q
|
self._main_q = main_q
|
||||||
self._rslt_q = rslt_q
|
self._rslt_q = rslt_q
|
||||||
|
self._hostvars = hostvars_manager
|
||||||
self._loader = loader
|
self._loader = loader
|
||||||
|
|
||||||
# dupe stdin, if we have one
|
# dupe stdin, if we have one
|
||||||
|
@ -82,8 +84,6 @@ class WorkerProcess(multiprocessing.Process):
|
||||||
# couldn't get stdin's fileno, so we just carry on
|
# couldn't get stdin's fileno, so we just carry on
|
||||||
pass
|
pass
|
||||||
|
|
||||||
super(WorkerProcess, self).__init__()
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
'''
|
'''
|
||||||
Called when the process is started, and loops indefinitely
|
Called when the process is started, and loops indefinitely
|
||||||
|
@ -100,14 +100,15 @@ class WorkerProcess(multiprocessing.Process):
|
||||||
while True:
|
while True:
|
||||||
task = None
|
task = None
|
||||||
try:
|
try:
|
||||||
debug("waiting for a message...")
|
#debug("waiting for work")
|
||||||
(host, task, basedir, zip_vars, hostvars, compressed_vars, play_context, shared_loader_obj) = self._main_q.get()
|
(host, task, basedir, zip_vars, compressed_vars, play_context, shared_loader_obj) = self._main_q.get(block=False)
|
||||||
|
|
||||||
if compressed_vars:
|
if compressed_vars:
|
||||||
job_vars = json.loads(zlib.decompress(zip_vars))
|
job_vars = json.loads(zlib.decompress(zip_vars))
|
||||||
else:
|
else:
|
||||||
job_vars = zip_vars
|
job_vars = zip_vars
|
||||||
job_vars['hostvars'] = hostvars
|
|
||||||
|
job_vars['hostvars'] = self._hostvars.hostvars()
|
||||||
|
|
||||||
debug("there's work to be done! got a task/handler to work on: %s" % task)
|
debug("there's work to be done! got a task/handler to work on: %s" % task)
|
||||||
|
|
||||||
|
@ -142,7 +143,7 @@ class WorkerProcess(multiprocessing.Process):
|
||||||
debug("done sending task result")
|
debug("done sending task result")
|
||||||
|
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
pass
|
time.sleep(0.0001)
|
||||||
except AnsibleConnectionFailure:
|
except AnsibleConnectionFailure:
|
||||||
try:
|
try:
|
||||||
if task:
|
if task:
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
from __future__ import (absolute_import, division, print_function)
|
from __future__ import (absolute_import, division, print_function)
|
||||||
__metaclass__ = type
|
__metaclass__ = type
|
||||||
|
|
||||||
|
from multiprocessing.managers import SyncManager, DictProxy
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
|
@ -32,6 +33,7 @@ from ansible.executor.stats import AggregateStats
|
||||||
from ansible.playbook.play_context import PlayContext
|
from ansible.playbook.play_context import PlayContext
|
||||||
from ansible.plugins import callback_loader, strategy_loader, module_loader
|
from ansible.plugins import callback_loader, strategy_loader, module_loader
|
||||||
from ansible.template import Templar
|
from ansible.template import Templar
|
||||||
|
from ansible.vars.hostvars import HostVars
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from __main__ import display
|
from __main__ import display
|
||||||
|
@ -98,7 +100,7 @@ class TaskQueueManager:
|
||||||
main_q = multiprocessing.Queue()
|
main_q = multiprocessing.Queue()
|
||||||
rslt_q = multiprocessing.Queue()
|
rslt_q = multiprocessing.Queue()
|
||||||
|
|
||||||
prc = WorkerProcess(self, main_q, rslt_q, self._loader)
|
prc = WorkerProcess(self, main_q, rslt_q, self._hostvars_manager, self._loader)
|
||||||
prc.start()
|
prc.start()
|
||||||
|
|
||||||
self._workers.append((prc, main_q, rslt_q))
|
self._workers.append((prc, main_q, rslt_q))
|
||||||
|
@ -173,11 +175,6 @@ class TaskQueueManager:
|
||||||
are done with the current task).
|
are done with the current task).
|
||||||
'''
|
'''
|
||||||
|
|
||||||
# Fork # of forks, # of hosts or serial, whichever is lowest
|
|
||||||
contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(play.hosts))]
|
|
||||||
contenders = [ v for v in contenders if v is not None and v > 0 ]
|
|
||||||
self._initialize_processes(min(contenders))
|
|
||||||
|
|
||||||
if not self._callbacks_loaded:
|
if not self._callbacks_loaded:
|
||||||
self.load_callbacks()
|
self.load_callbacks()
|
||||||
|
|
||||||
|
@ -187,6 +184,34 @@ class TaskQueueManager:
|
||||||
new_play = play.copy()
|
new_play = play.copy()
|
||||||
new_play.post_validate(templar)
|
new_play.post_validate(templar)
|
||||||
|
|
||||||
|
class HostVarsManager(SyncManager):
|
||||||
|
pass
|
||||||
|
|
||||||
|
hostvars = HostVars(
|
||||||
|
play=new_play,
|
||||||
|
inventory=self._inventory,
|
||||||
|
variable_manager=self._variable_manager,
|
||||||
|
loader=self._loader,
|
||||||
|
)
|
||||||
|
|
||||||
|
HostVarsManager.register(
|
||||||
|
'hostvars',
|
||||||
|
callable=lambda: hostvars,
|
||||||
|
# FIXME: this is the list of exposed methods to the DictProxy object, plus our
|
||||||
|
# one special one (set_variable_manager). There's probably a better way
|
||||||
|
# to do this with a proper BaseProxy/DictProxy derivative
|
||||||
|
exposed=('set_variable_manager', '__contains__', '__delitem__', '__getitem__',
|
||||||
|
'__len__', '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
|
||||||
|
'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'),
|
||||||
|
)
|
||||||
|
self._hostvars_manager = HostVarsManager()
|
||||||
|
self._hostvars_manager.start()
|
||||||
|
|
||||||
|
# Fork # of forks, # of hosts or serial, whichever is lowest
|
||||||
|
contenders = [self._options.forks, play.serial, len(self._inventory.get_hosts(new_play.hosts))]
|
||||||
|
contenders = [ v for v in contenders if v is not None and v > 0 ]
|
||||||
|
self._initialize_processes(min(contenders))
|
||||||
|
|
||||||
play_context = PlayContext(new_play, self._options, self.passwords, self._connection_lockfile.fileno())
|
play_context = PlayContext(new_play, self._options, self.passwords, self._connection_lockfile.fileno())
|
||||||
for callback_plugin in self._callback_plugins:
|
for callback_plugin in self._callback_plugins:
|
||||||
if hasattr(callback_plugin, 'set_play_context'):
|
if hasattr(callback_plugin, 'set_play_context'):
|
||||||
|
@ -221,6 +246,7 @@ class TaskQueueManager:
|
||||||
# and run the play using the strategy and cleanup on way out
|
# and run the play using the strategy and cleanup on way out
|
||||||
play_return = strategy.run(iterator, play_context)
|
play_return = strategy.run(iterator, play_context)
|
||||||
self._cleanup_processes()
|
self._cleanup_processes()
|
||||||
|
self._hostvars_manager.shutdown()
|
||||||
return play_return
|
return play_return
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
|
|
|
@ -158,7 +158,6 @@ class StrategyBase:
|
||||||
# hostvars out of the task variables right now, due to the fact
|
# hostvars out of the task variables right now, due to the fact
|
||||||
# that they're not JSON serializable
|
# that they're not JSON serializable
|
||||||
compressed_vars = False
|
compressed_vars = False
|
||||||
hostvars = task_vars.pop('hostvars', None)
|
|
||||||
if C.DEFAULT_VAR_COMPRESSION_LEVEL > 0:
|
if C.DEFAULT_VAR_COMPRESSION_LEVEL > 0:
|
||||||
zip_vars = zlib.compress(json.dumps(task_vars), C.DEFAULT_VAR_COMPRESSION_LEVEL)
|
zip_vars = zlib.compress(json.dumps(task_vars), C.DEFAULT_VAR_COMPRESSION_LEVEL)
|
||||||
compressed_vars = True
|
compressed_vars = True
|
||||||
|
@ -170,10 +169,7 @@ class StrategyBase:
|
||||||
zip_vars = task_vars # noqa (pyflakes false positive because task_vars is deleted in the conditional above)
|
zip_vars = task_vars # noqa (pyflakes false positive because task_vars is deleted in the conditional above)
|
||||||
|
|
||||||
# and queue the task
|
# and queue the task
|
||||||
main_q.put((host, task, self._loader.get_basedir(), zip_vars, hostvars, compressed_vars, play_context, shared_loader_obj), block=False)
|
main_q.put((host, task, self._loader.get_basedir(), zip_vars, compressed_vars, play_context, shared_loader_obj))
|
||||||
|
|
||||||
# nuke the hostvars object too, as its no longer needed
|
|
||||||
del hostvars
|
|
||||||
|
|
||||||
self._pending_results += 1
|
self._pending_results += 1
|
||||||
except (EOFError, IOError, AssertionError) as e:
|
except (EOFError, IOError, AssertionError) as e:
|
||||||
|
@ -192,7 +188,7 @@ class StrategyBase:
|
||||||
|
|
||||||
while not self._final_q.empty() and not self._tqm._terminated:
|
while not self._final_q.empty() and not self._tqm._terminated:
|
||||||
try:
|
try:
|
||||||
result = self._final_q.get(block=False)
|
result = self._final_q.get()
|
||||||
display.debug("got result from result worker: %s" % ([text_type(x) for x in result],))
|
display.debug("got result from result worker: %s" % ([text_type(x) for x in result],))
|
||||||
|
|
||||||
# all host status messages contain 2 entries: (msg, task_result)
|
# all host status messages contain 2 entries: (msg, task_result)
|
||||||
|
@ -277,6 +273,7 @@ class StrategyBase:
|
||||||
var_value = wrap_var(result[3])
|
var_value = wrap_var(result[3])
|
||||||
|
|
||||||
self._variable_manager.set_nonpersistent_facts(host, {var_name: var_value})
|
self._variable_manager.set_nonpersistent_facts(host, {var_name: var_value})
|
||||||
|
self._tqm._hostvars_manager.hostvars().set_variable_manager(self._variable_manager)
|
||||||
|
|
||||||
elif result[0] in ('set_host_var', 'set_host_facts'):
|
elif result[0] in ('set_host_var', 'set_host_facts'):
|
||||||
host = result[1]
|
host = result[1]
|
||||||
|
@ -307,11 +304,12 @@ class StrategyBase:
|
||||||
self._variable_manager.set_nonpersistent_facts(target_host, facts)
|
self._variable_manager.set_nonpersistent_facts(target_host, facts)
|
||||||
else:
|
else:
|
||||||
self._variable_manager.set_host_facts(target_host, facts)
|
self._variable_manager.set_host_facts(target_host, facts)
|
||||||
|
self._tqm._hostvars_manager.hostvars().set_variable_manager(self._variable_manager)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise AnsibleError("unknown result message received: %s" % result[0])
|
raise AnsibleError("unknown result message received: %s" % result[0])
|
||||||
except Queue.Empty:
|
except Queue.Empty:
|
||||||
time.sleep(0.01)
|
time.sleep(0.0001)
|
||||||
|
|
||||||
return ret_results
|
return ret_results
|
||||||
|
|
||||||
|
@ -327,7 +325,7 @@ class StrategyBase:
|
||||||
while self._pending_results > 0 and not self._tqm._terminated:
|
while self._pending_results > 0 and not self._tqm._terminated:
|
||||||
results = self._process_pending_results(iterator)
|
results = self._process_pending_results(iterator)
|
||||||
ret_results.extend(results)
|
ret_results.extend(results)
|
||||||
time.sleep(0.01)
|
time.sleep(0.0001)
|
||||||
display.debug("no more pending results, returning what we have")
|
display.debug("no more pending results, returning what we have")
|
||||||
|
|
||||||
return ret_results
|
return ret_results
|
||||||
|
|
|
@ -359,15 +359,15 @@ class VariableManager:
|
||||||
for (group_name, group) in iteritems(self._inventory.groups):
|
for (group_name, group) in iteritems(self._inventory.groups):
|
||||||
variables['groups'][group_name] = [h.name for h in group.get_hosts()]
|
variables['groups'][group_name] = [h.name for h in group.get_hosts()]
|
||||||
|
|
||||||
if include_hostvars:
|
#if include_hostvars:
|
||||||
hostvars_cache_entry = self._get_cache_entry(play=play)
|
# hostvars_cache_entry = self._get_cache_entry(play=play)
|
||||||
if hostvars_cache_entry in HOSTVARS_CACHE:
|
# if hostvars_cache_entry in HOSTVARS_CACHE:
|
||||||
hostvars = HOSTVARS_CACHE[hostvars_cache_entry]
|
# hostvars = HOSTVARS_CACHE[hostvars_cache_entry]
|
||||||
else:
|
# else:
|
||||||
hostvars = HostVars(play=play, inventory=self._inventory, loader=loader, variable_manager=self)
|
# hostvars = HostVars(play=play, inventory=self._inventory, loader=loader, variable_manager=self)
|
||||||
HOSTVARS_CACHE[hostvars_cache_entry] = hostvars
|
# HOSTVARS_CACHE[hostvars_cache_entry] = hostvars
|
||||||
variables['hostvars'] = hostvars
|
# variables['hostvars'] = hostvars
|
||||||
variables['vars'] = hostvars[host.get_name()]
|
# variables['vars'] = hostvars[host.get_name()]
|
||||||
|
|
||||||
if play:
|
if play:
|
||||||
variables['role_names'] = [r._role_name for r in play.roles]
|
variables['role_names'] = [r._role_name for r in play.roles]
|
||||||
|
|
|
@ -48,74 +48,50 @@ class HostVars(collections.Mapping):
|
||||||
|
|
||||||
def __init__(self, play, inventory, variable_manager, loader):
|
def __init__(self, play, inventory, variable_manager, loader):
|
||||||
self._lookup = dict()
|
self._lookup = dict()
|
||||||
|
self._inventory = inventory
|
||||||
self._loader = loader
|
self._loader = loader
|
||||||
self._play = play
|
self._play = play
|
||||||
self._variable_manager = variable_manager
|
self._variable_manager = variable_manager
|
||||||
self._cached_result = dict()
|
self._cached_result = dict()
|
||||||
|
|
||||||
hosts = inventory.get_hosts(ignore_limits_and_restrictions=True)
|
def set_variable_manager(self, variable_manager):
|
||||||
|
self._variable_manager = variable_manager
|
||||||
|
|
||||||
# check to see if localhost is in the hosts list, as we
|
def _find_host(self, host_name):
|
||||||
# may have it referenced via hostvars but if created implicitly
|
return self._inventory.get_host(host_name)
|
||||||
# it doesn't sow up in the hosts list
|
|
||||||
has_localhost = False
|
|
||||||
for host in hosts:
|
|
||||||
if host.name in C.LOCALHOST:
|
|
||||||
has_localhost = True
|
|
||||||
break
|
|
||||||
|
|
||||||
if not has_localhost:
|
|
||||||
new_host = Host(name='localhost')
|
|
||||||
new_host.set_variable("ansible_python_interpreter", sys.executable)
|
|
||||||
new_host.set_variable("ansible_connection", "local")
|
|
||||||
new_host.address = '127.0.0.1'
|
|
||||||
hosts.append(new_host)
|
|
||||||
|
|
||||||
for host in hosts:
|
|
||||||
self._lookup[host.name] = host
|
|
||||||
|
|
||||||
def __getitem__(self, host_name):
|
def __getitem__(self, host_name):
|
||||||
|
host = self._find_host(host_name)
|
||||||
|
if host is None:
|
||||||
|
return j2undefined
|
||||||
|
|
||||||
if host_name not in self._lookup:
|
|
||||||
return j2undefined()
|
|
||||||
|
|
||||||
host = self._lookup.get(host_name)
|
|
||||||
data = self._variable_manager.get_vars(loader=self._loader, host=host, play=self._play, include_hostvars=False)
|
data = self._variable_manager.get_vars(loader=self._loader, host=host, play=self._play, include_hostvars=False)
|
||||||
|
|
||||||
|
#****************************************************
|
||||||
|
# TESTING REMOVAL OF THIS
|
||||||
|
#****************************************************
|
||||||
|
# Since we template much later now in 2.0, it may be completely unrequired to do
|
||||||
|
# a full template of the vars returned above, which is quite costly in time when
|
||||||
|
# the result is large.
|
||||||
# Using cache in order to avoid template call
|
# Using cache in order to avoid template call
|
||||||
sha1_hash = sha1(str(data).encode('utf-8')).hexdigest()
|
#sha1_hash = sha1(str(data).encode('utf-8')).hexdigest()
|
||||||
if sha1_hash in self._cached_result:
|
#if sha1_hash in self._cached_result:
|
||||||
result = self._cached_result[sha1_hash]
|
# result = self._cached_result[sha1_hash]
|
||||||
else:
|
#else:
|
||||||
templar = Templar(variables=data, loader=self._loader)
|
# templar = Templar(variables=data, loader=self._loader)
|
||||||
result = templar.template(data, fail_on_undefined=False, static_vars=STATIC_VARS)
|
# result = templar.template(data, fail_on_undefined=False, static_vars=STATIC_VARS)
|
||||||
self._cached_result[sha1_hash] = result
|
# self._cached_result[sha1_hash] = result
|
||||||
return result
|
#return result
|
||||||
|
#****************************************************
|
||||||
|
return data
|
||||||
|
|
||||||
def __contains__(self, host_name):
|
def __contains__(self, host_name):
|
||||||
item = self.get(host_name)
|
return self._find_host(host_name) is not None
|
||||||
if item and item is not j2undefined:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
for host in self._lookup:
|
for host in self._inventory.get_hosts(ignore_limits_and_restrictions=True):
|
||||||
yield host
|
yield host
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
return len(self._lookup)
|
return len(self._inventory.get_hosts(ignore_limits_and_restrictions=True))
|
||||||
|
|
||||||
def __getstate__(self):
|
|
||||||
return dict(
|
|
||||||
loader=self._loader,
|
|
||||||
lookup=self._lookup,
|
|
||||||
play=self._play,
|
|
||||||
var_manager=self._variable_manager,
|
|
||||||
)
|
|
||||||
|
|
||||||
def __setstate__(self, data):
|
|
||||||
self._play = data.get('play')
|
|
||||||
self._loader = data.get('loader')
|
|
||||||
self._lookup = data.get('lookup')
|
|
||||||
self._variable_manager = data.get('var_manager')
|
|
||||||
self._cached_result = dict()
|
|
||||||
|
|
Loading…
Reference in a new issue