
Performance improvements

* Speed up serialization of hostvars by simply using the internal
  dictionary used for cached lookups
* Use blocking gets/puts on queues instead of spin locking
* Merge sequential implicitly created blocks
James Cammarata 2015-08-17 13:45:07 -04:00
parent 1d402ae390
commit e7b2308b66
5 changed files with 57 additions and 42 deletions
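
The second bullet is the heart of the change to the two executor processes below: rather than polling a queue with empty()/get(block=False) and sleeping between probes, a worker now simply blocks on get() and is woken as soon as data arrives. The sketch below is illustrative only (it is not part of the commit, and the function names are invented); it contrasts the two read patterns on a plain multiprocessing.Queue:

    import multiprocessing
    import queue
    import time

    def read_spinlocking(q):
        # Old pattern: probe the queue without blocking and sleep between probes.
        # Every idle pass costs a wakeup, and each item can sit in the queue for
        # up to the full sleep interval before it is noticed.
        while True:
            try:
                if not q.empty():
                    return q.get(block=False)
            except queue.Empty:
                pass
            time.sleep(0.1)

    def read_blocking(q):
        # New pattern: let the queue primitive do the waiting. The process sleeps
        # until an item is available and wakes immediately when one is put.
        return q.get()

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        q.put('a task')
        print(read_blocking(q))   # returns as soon as the item is available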


@@ -72,9 +72,9 @@ class ResultProcess(multiprocessing.Process):
                 self._cur_worker = 0
 
             try:
-                if not rslt_q.empty():
+                if rslt_q.qsize() > 0:
                     debug("worker %d has data to read" % self._cur_worker)
-                    result = rslt_q.get(block=False)
+                    result = rslt_q.get()
                     debug("got a result from worker %d: %s" % (self._cur_worker, result))
                     break
             except queue.Empty:
@@ -102,7 +102,7 @@ class ResultProcess(multiprocessing.Process):
             try:
                 result = self._read_worker_result()
                 if result is None:
-                    time.sleep(0.1)
+                    time.sleep(0.01)
                     continue
 
                 # if this task is registering a result, do it now

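The result reader above cannot go fully blocking, because it cycles over one result queue per worker process and must not get stuck waiting on a single idle worker; the commit therefore keeps the polling loop, probes with qsize() instead of empty(), and trims the idle sleep from 0.1s to 0.01s. A simplified, hypothetical multiplexer in the same spirit (not the actual ResultProcess code) might look like this:

    import queue
    import time

    def read_any(result_queues, idle_sleep=0.01):
        # Probe each per-worker queue in turn and block on a queue only once it
        # reports pending data; sleep briefly only when every queue looks idle.
        # multiprocessing.Queue.qsize() is just a hint (and is unavailable on
        # macOS, where it raises NotImplementedError), so the defensive
        # queue.Empty catch from the original loop is kept here as well.
        while True:
            for rslt_q in result_queues:
                try:
                    if rslt_q.qsize() > 0:
                        return rslt_q.get()
                except queue.Empty:
                    pass
            time.sleep(idle_sleep)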

@@ -92,40 +92,36 @@ class WorkerProcess(multiprocessing.Process):
         while True:
             task = None
             try:
-                if not self._main_q.empty():
-                    debug("there's work to be done!")
-                    (host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get(block=False)
-                    debug("got a task/handler to work on: %s" % task)
+                (host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get()
+                debug("there's work to be done!")
+                debug("got a task/handler to work on: %s" % task)
 
-                    # because the task queue manager starts workers (forks) before the
-                    # playbook is loaded, set the basedir of the loader inherited by
-                    # this fork now so that we can find files correctly
-                    self._loader.set_basedir(basedir)
+                # because the task queue manager starts workers (forks) before the
+                # playbook is loaded, set the basedir of the loader inherited by
+                # this fork now so that we can find files correctly
+                self._loader.set_basedir(basedir)
 
-                    # Serializing/deserializing tasks does not preserve the loader attribute,
-                    # since it is passed to the worker during the forking of the process and
-                    # would be wasteful to serialize. So we set it here on the task now, and
-                    # the task handles updating parent/child objects as needed.
-                    task.set_loader(self._loader)
+                # Serializing/deserializing tasks does not preserve the loader attribute,
+                # since it is passed to the worker during the forking of the process and
+                # would be wasteful to serialize. So we set it here on the task now, and
+                # the task handles updating parent/child objects as needed.
+                task.set_loader(self._loader)
 
-                    # apply the given task's information to the connection info,
-                    # which may override some fields already set by the play or
-                    # the options specified on the command line
-                    new_play_context = play_context.set_task_and_variable_override(task=task, variables=job_vars)
+                # apply the given task's information to the connection info,
+                # which may override some fields already set by the play or
+                # the options specified on the command line
+                new_play_context = play_context.set_task_and_variable_override(task=task, variables=job_vars)
 
-                    # execute the task and build a TaskResult from the result
-                    debug("running TaskExecutor() for %s/%s" % (host, task))
-                    executor_result = TaskExecutor(host, task, job_vars, new_play_context, self._new_stdin, self._loader, shared_loader_obj).run()
-                    debug("done running TaskExecutor() for %s/%s" % (host, task))
-                    task_result = TaskResult(host, task, executor_result)
+                # execute the task and build a TaskResult from the result
+                debug("running TaskExecutor() for %s/%s" % (host, task))
+                executor_result = TaskExecutor(host, task, job_vars, new_play_context, self._new_stdin, self._loader, shared_loader_obj).run()
+                debug("done running TaskExecutor() for %s/%s" % (host, task))
+                task_result = TaskResult(host, task, executor_result)
 
-                    # put the result on the result queue
-                    debug("sending task result")
-                    self._rslt_q.put(task_result, block=False)
-                    debug("done sending task result")
-                else:
-                    time.sleep(0.1)
+                # put the result on the result queue
+                debug("sending task result")
+                self._rslt_q.put(task_result)
+                debug("done sending task result")
 
             except queue.Empty:
                 pass

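The same hunk also drops block=False from the result put. On an unbounded queue the two calls behave almost identically, but a non-blocking put raises queue.Full the instant a bounded queue fills up, while a blocking put simply waits for room. A tiny, self-contained illustration (not taken from the commit):

    import multiprocessing
    import queue

    if __name__ == '__main__':
        q = multiprocessing.Queue(maxsize=1)
        q.put('first')                    # fits; the queue is now full

        try:
            q.put('second', block=False)  # non-blocking put on a full queue
        except queue.Full:
            print('put(block=False) raised queue.Full')

        print(q.get())                    # drain the queue; prints 'first'
        # A plain q.put('second') at that point would simply have blocked until
        # a consumer called q.get() and made room.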

@@ -37,12 +37,13 @@ class Block(Base, Become, Conditional, Taggable):
     # similar to the 'else' clause for exceptions
     #_otherwise = FieldAttribute(isa='list')
 
-    def __init__(self, play=None, parent_block=None, role=None, task_include=None, use_handlers=False):
+    def __init__(self, play=None, parent_block=None, role=None, task_include=None, use_handlers=False, implicit=False):
         self._play = play
         self._role = role
         self._task_include = task_include
         self._parent_block = parent_block
         self._use_handlers = use_handlers
+        self._implicit = implicit
         self._dep_chain = []
 
         super(Block, self).__init__()
@@ -66,22 +67,27 @@ class Block(Base, Become, Conditional, Taggable):
     @staticmethod
     def load(data, play=None, parent_block=None, role=None, task_include=None, use_handlers=False, variable_manager=None, loader=None):
-        b = Block(play=play, parent_block=parent_block, role=role, task_include=task_include, use_handlers=use_handlers)
+        implicit = not Block.is_block(data)
+        b = Block(play=play, parent_block=parent_block, role=role, task_include=task_include, use_handlers=use_handlers, implicit=implicit)
         return b.load_data(data, variable_manager=variable_manager, loader=loader)
 
+    @staticmethod
+    def is_block(ds):
+        is_block = False
+        if isinstance(ds, dict):
+            for attr in ('block', 'rescue', 'always'):
+                if attr in ds:
+                    is_block = True
+                    break
+        return is_block
+
     def preprocess_data(self, ds):
         '''
         If a simple task is given, an implicit block for that single task
         is created, which goes in the main portion of the block
         '''
 
-        is_block = False
-        for attr in ('block', 'rescue', 'always'):
-            if attr in ds:
-                is_block = True
-                break
-
-        if not is_block:
+        if not Block.is_block(ds):
             if isinstance(ds, list):
                 return super(Block, self).preprocess_data(dict(block=ds))
             else:

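The new Block.is_block() helper centralizes the shape check that preprocess_data used to inline: only a dict carrying a block, rescue, or always key counts as an explicit block, and anything else (a bare task dict, or a plain list of tasks) gets wrapped in an implicit block. A standalone sketch of the same idea (the function name is invented; this is not the Ansible code itself):

    def looks_like_block(ds):
        # Only a dict with one of the block-section keywords is an explicit
        # block; everything else will be wrapped into an implicit one.
        return isinstance(ds, dict) and any(
            attr in ds for attr in ('block', 'rescue', 'always')
        )

    print(looks_like_block({'block': [{'debug': {'msg': 'hi'}}]}))  # True
    print(looks_like_block({'debug': {'msg': 'hi'}}))               # False: bare task
    print(looks_like_block([{'debug': {'msg': 'hi'}}]))             # False: list of tasks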

@@ -52,7 +52,13 @@ def load_list_of_blocks(ds, play, parent_block=None, role=None, task_include=None
                 variable_manager=variable_manager,
                 loader=loader
             )
-            block_list.append(b)
+            # Implicit blocks are created by bare tasks listed in a play without
+            # an explicit block statement. If we have two implicit blocks in a row,
+            # squash them down to a single block to save processing time later.
+            if b._implicit and len(block_list) > 0 and block_list[-1]._implicit:
+                block_list[-1].block.extend(b.block)
+            else:
+                block_list.append(b)
 
     return block_list

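The merge in load_list_of_blocks boils down to: while building the list, if the block just loaded and the previous one are both implicit, fold the new block's tasks into the previous block instead of appending another single-task container, so later per-block processing touches far fewer objects. A simplified, self-contained sketch of that squashing (the dict layout here is invented for illustration):

    def append_squashed(block_list, new_block):
        # Each "block" here is just a dict with an 'implicit' flag and a list of
        # tasks. Two implicit blocks in a row collapse into a single container.
        if new_block['implicit'] and block_list and block_list[-1]['implicit']:
            block_list[-1]['tasks'].extend(new_block['tasks'])
        else:
            block_list.append(new_block)
        return block_list

    blocks = []
    append_squashed(blocks, {'implicit': True, 'tasks': ['task1']})
    append_squashed(blocks, {'implicit': True, 'tasks': ['task2']})   # merged into the first
    append_squashed(blocks, {'implicit': False, 'tasks': ['task3']})  # kept as its own block
    print(len(blocks))  # 2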

@@ -54,8 +54,15 @@ class HostVars(collections.Mapping):
             if item and item is not j2undefined:
                 return True
         return False
 
     def __iter__(self):
         raise NotImplementedError('HostVars does not support iteration as hosts are discovered on an as needed basis.')
 
     def __len__(self):
         raise NotImplementedError('HostVars does not support len. hosts entries are discovered dynamically as needed')
+
+    def __getstate__(self):
+        return self._lookup
+
+    def __setstate__(self, data):
+        self._lookup = data
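
The HostVars hunk is what the first bullet of the commit message refers to: by defining __getstate__/__setstate__, the object pickles as nothing more than its internal lookup cache, so none of the heavier machinery it holds has to cross the process boundary. A generic sketch of the same technique on a made-up class (not HostVars itself):

    import pickle

    class CachedMapping(object):
        # Hypothetical stand-in for a HostVars-like object: it owns some heavy
        # machinery plus a small cache of already-resolved entries.
        def __init__(self, heavy_backend=None):
            self._backend = heavy_backend   # expensive; not worth serializing
            self._lookup = {}               # small cache of resolved values

        def __getstate__(self):
            # Serialize only the cache; the backend is deliberately dropped.
            return self._lookup

        def __setstate__(self, data):
            self._lookup = data
            self._backend = None

    m = CachedMapping(heavy_backend=object())
    m._lookup['host1'] = {'ansible_host': '10.0.0.1'}

    clone = pickle.loads(pickle.dumps(m))
    print(clone._lookup['host1'])   # the cached entry survives the round trip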