From 63c47fb2715311a1a6258332221034585ea23593 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Wed, 4 Nov 2015 11:26:06 -0500 Subject: [PATCH] Fixing up performance --- lib/ansible/executor/process/worker.py | 22 ++++++++++++++++------ lib/ansible/executor/task_executor.py | 9 ++++++--- lib/ansible/executor/task_queue_manager.py | 2 +- lib/ansible/inventory/__init__.py | 9 +++++++-- lib/ansible/plugins/strategy/__init__.py | 16 ++++++++-------- lib/ansible/plugins/strategy/linear.py | 2 +- lib/ansible/template/__init__.py | 2 +- lib/ansible/template/vars.py | 5 ++++- lib/ansible/vars/__init__.py | 2 +- lib/ansible/vars/unsafe_proxy.py | 19 +++++++++++++++++-- 10 files changed, 62 insertions(+), 26 deletions(-) diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index a8bb3d8085..4754d76790 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -21,6 +21,7 @@ __metaclass__ = type from ansible.compat.six.moves import queue +import json import multiprocessing import os import signal @@ -43,6 +44,7 @@ from ansible.executor.task_executor import TaskExecutor from ansible.executor.task_result import TaskResult from ansible.playbook.handler import Handler from ansible.playbook.task import Task +from ansible.vars.unsafe_proxy import AnsibleJSONUnsafeDecoder from ansible.utils.debug import debug @@ -59,9 +61,9 @@ class WorkerProcess(multiprocessing.Process): def __init__(self, tqm, main_q, rslt_q, loader): # takes a task queue manager as the sole param: - self._main_q = main_q - self._rslt_q = rslt_q - self._loader = loader + self._main_q = main_q + self._rslt_q = rslt_q + self._loader = loader # dupe stdin, if we have one self._new_stdin = sys.stdin @@ -97,9 +99,9 @@ class WorkerProcess(multiprocessing.Process): while True: task = None try: + debug("waiting for a message...") (host, task, basedir, job_vars, play_context, shared_loader_obj) = self._main_q.get() - debug("there's work to be done!") - debug("got a task/handler to work on: %s" % task) + debug("there's work to be done! got a task/handler to work on: %s" % task) # because the task queue manager starts workers (forks) before the # playbook is loaded, set the basedir of the loader inherted by @@ -114,7 +116,15 @@ class WorkerProcess(multiprocessing.Process): # execute the task and build a TaskResult from the result debug("running TaskExecutor() for %s/%s" % (host, task)) - executor_result = TaskExecutor(host, task, job_vars, play_context, self._new_stdin, self._loader, shared_loader_obj).run() + executor_result = TaskExecutor( + host, + task, + job_vars, + play_context, + self._new_stdin, + self._loader, + shared_loader_obj, + ).run() debug("done running TaskExecutor() for %s/%s" % (host, task)) task_result = TaskResult(host, task, executor_result) diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py index c3c6821700..91dde77434 100644 --- a/lib/ansible/executor/task_executor.py +++ b/lib/ansible/executor/task_executor.py @@ -156,7 +156,8 @@ class TaskExecutor: # create a copy of the job vars here so that we can modify # them temporarily without changing them too early for other # parts of the code that might still need a pristine version - vars_copy = self._job_vars.copy() + #vars_copy = self._job_vars.copy() + vars_copy = self._job_vars # now we update them with the play context vars self._play_context.update_vars(vars_copy) @@ -196,7 +197,8 @@ class TaskExecutor: # make copies of the job vars and task so we can add the item to # the variables and re-validate the task with the item variable - task_vars = self._job_vars.copy() + #task_vars = self._job_vars.copy() + task_vars = self._job_vars items = self._squash_items(items, task_vars) for item in items: @@ -357,7 +359,8 @@ class TaskExecutor: # make a copy of the job vars here, in case we need to update them # with the registered variable value later on when testing conditions - vars_copy = variables.copy() + #vars_copy = variables.copy() + vars_copy = variables self._display.debug("starting attempt loop") result = None diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index 63cf5d2921..9dcb3769d9 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -96,7 +96,7 @@ class TaskQueueManager: self._workers = [] def _initialize_workers(self, num): - for i in range(num): + for i in xrange(num_workers): main_q = multiprocessing.Queue() rslt_q = multiprocessing.Queue() diff --git a/lib/ansible/inventory/__init__.py b/lib/ansible/inventory/__init__.py index a967553385..2c682402a1 100644 --- a/lib/ansible/inventory/__init__.py +++ b/lib/ansible/inventory/__init__.py @@ -77,6 +77,13 @@ class Inventory(object): self.parse_inventory(host_list) + def serialize(self): + data = dict() + return data + + def deserialize(self, data): + pass + def parse_inventory(self, host_list): if isinstance(host_list, string_types): @@ -686,8 +693,6 @@ class Inventory(object): basedirs = [self._playbook_basedir] for basedir in basedirs: - display.debug('getting vars from %s' % basedir) - # this can happen from particular API usages, particularly if not run # from /usr/bin/ansible-playbook if basedir in ('', None): diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index e5c43572c2..fdbfb70772 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -22,6 +22,9 @@ __metaclass__ = type from ansible.compat.six.moves import queue as Queue from ansible.compat.six import iteritems, text_type, string_types +import json +import pickle +import sys import time from jinja2.exceptions import UndefinedError @@ -37,7 +40,7 @@ from ansible.playbook.included_file import IncludedFile from ansible.playbook.role import hash_params from ansible.plugins import action_loader, connection_loader, filter_loader, lookup_loader, module_loader, test_loader from ansible.template import Templar -from ansible.vars.unsafe_proxy import wrap_var +from ansible.vars.unsafe_proxy import wrap_var, AnsibleJSONUnsafeEncoder try: from __main__ import display @@ -127,11 +130,8 @@ class StrategyBase: Base class method to add extra variables/information to the list of task vars sent through the executor engine regarding the task queue manager state. ''' - - new_vars = vars.copy() - new_vars['ansible_current_hosts'] = self.get_hosts_remaining(play) - new_vars['ansible_failed_hosts'] = self.get_failed_hosts(play) - return new_vars + vars['ansible_current_hosts'] = [h.name for h in self.get_hosts_remaining(play)] + vars['ansible_failed_hosts'] = [h.name for h in self.get_failed_hosts(play)] def _queue_task(self, host, task, task_vars, play_context): ''' handles queueing the task up to be sent to a worker ''' @@ -263,7 +263,7 @@ class StrategyBase: if task.delegate_to is not None: task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) - task_vars = self.add_tqm_variables(task_vars, play=iterator._play) + add_tqm_variables(task_vars, play=iterator._play) if item is not None: task_vars['item'] = item templar = Templar(loader=self._loader, variables=task_vars) @@ -516,7 +516,7 @@ class StrategyBase: for host in notified_hosts: if not handler.has_triggered(host) and (host.name not in self._tqm._failed_hosts or play_context.force_handlers): task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=handler) - task_vars = self.add_tqm_variables(task_vars, play=iterator._play) + self.add_tqm_variables(task_vars, play=iterator._play) self._queue_task(host, handler, task_vars, play_context) if run_once: break diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py index 22d2a708ed..321e5ced17 100644 --- a/lib/ansible/plugins/strategy/linear.py +++ b/lib/ansible/plugins/strategy/linear.py @@ -211,7 +211,7 @@ class StrategyModule(StrategyBase): self._display.debug("getting variables") task_vars = self._variable_manager.get_vars(loader=self._loader, play=iterator._play, host=host, task=task) - task_vars = self.add_tqm_variables(task_vars, play=iterator._play) + self.add_tqm_variables(task_vars, play=iterator._play) templar = Templar(loader=self._loader, variables=task_vars) self._display.debug("done getting variables") diff --git a/lib/ansible/template/__init__.py b/lib/ansible/template/__init__.py index 6226ee66ca..72dd462aa9 100644 --- a/lib/ansible/template/__init__.py +++ b/lib/ansible/template/__init__.py @@ -258,7 +258,7 @@ class Templar: ''' assert isinstance(variables, dict) - self._available_variables = variables.copy() + self._available_variables = variables def template(self, variable, convert_bare=False, preserve_trailing_newlines=True, escape_backslashes=True, fail_on_undefined=None, overrides=None, convert_data=True): ''' diff --git a/lib/ansible/template/vars.py b/lib/ansible/template/vars.py index d55169368a..b1b82d6d3e 100644 --- a/lib/ansible/template/vars.py +++ b/lib/ansible/template/vars.py @@ -83,7 +83,10 @@ class AnsibleJ2Vars: if isinstance(variable, dict) and varname == "vars" or isinstance(variable, HostVars): return variable else: - return self._templar.template(variable) + if self._templar._contains_vars(variable): + return self._templar.template(variable) + else: + return variable def add_locals(self, locals): ''' diff --git a/lib/ansible/vars/__init__.py b/lib/ansible/vars/__init__.py index a8972293d6..fd912ceaec 100644 --- a/lib/ansible/vars/__init__.py +++ b/lib/ansible/vars/__init__.py @@ -198,7 +198,7 @@ class VariableManager: debug("vars are cached, returning them now") return VARIABLE_CACHE[cache_entry] - all_vars = defaultdict(dict) + all_vars = dict() magic_variables = self._get_magic_variables( loader=loader, play=play, diff --git a/lib/ansible/vars/unsafe_proxy.py b/lib/ansible/vars/unsafe_proxy.py index 47b56db723..ac5cce24af 100644 --- a/lib/ansible/vars/unsafe_proxy.py +++ b/lib/ansible/vars/unsafe_proxy.py @@ -53,11 +53,12 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type +import json + from ansible.utils.unicode import to_unicode from ansible.compat.six import string_types, text_type -__all__ = ['UnsafeProxy', 'AnsibleUnsafe', 'wrap_var'] - +__all__ = ['UnsafeProxy', 'AnsibleUnsafe', 'AnsibleJSONUnsafeEncoder', 'AnsibleJSONUnsafeDecoder', 'wrap_var'] class AnsibleUnsafe(object): __UNSAFE__ = True @@ -76,6 +77,20 @@ class UnsafeProxy(object): return AnsibleUnsafeText(obj) return obj +class AnsibleJSONUnsafeEncoder(json.JSONEncoder): + def encode(self, obj): + if isinstance(obj, AnsibleUnsafe): + return super(AnsibleJSONUnsafeEncoder, self).encode(dict(__ansible_unsafe=True, value=unicode(obj))) + else: + return super(AnsibleJSONUnsafeEncoder, self).encode(obj) + +class AnsibleJSONUnsafeDecoder(json.JSONDecoder): + def decode(self, obj): + value = super(AnsibleJSONUnsafeDecoder, self).decode(obj) + if isinstance(value, dict) and '__ansible_unsafe' in value: + return UnsafeProxy(value.get('value', '')) + else: + return value def _wrap_dict(v): for k in v.keys():