From adb1719fe8bd61b2537e84ffdc7b5ffac2f68620 Mon Sep 17 00:00:00 2001 From: Michael DeHaan Date: Fri, 7 Feb 2014 16:53:18 -0500 Subject: [PATCH] Add the fork manager code (related to previous patch). --- lib/ansible/runner/foon.py | 134 +++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 lib/ansible/runner/foon.py diff --git a/lib/ansible/runner/foon.py b/lib/ansible/runner/foon.py new file mode 100644 index 0000000000..f084ba726e --- /dev/null +++ b/lib/ansible/runner/foon.py @@ -0,0 +1,134 @@ +# (c) 2012-2014, Michael DeHaan +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + + + + +import multiprocessing.pool as mpool +import multiprocessing +import sys +import os + +POOL = None +OLD_SIZE = 0 + +class MyPool(mpool.Pool): + + ''' + What is Foon? + A Foon is another name for a Spork, which is a Fork plus a Spoon + this class is a wrapper around multiprocessing in Python + which deals with parallelism via Forks. + there is no Spoon. + + Two things we do differently over stock multiprocessing: + * intercept exceptions + * duplicate stdin per host to enable the process to ask questions about host key checking + + That's it. This class is specific to Ansible's runner forking and is not meant to be generic. + ''' + + + def __init__(self, *args, **kwargs): + super(MyPool, self).__init__(*args, **kwargs) + + # overriding map_async to catch exceptions and be extensible + + def map_async(self, func, iterable, chunksize=None, callback=None): + ''' + Asynchronous equivalent of `map()` builtin + ''' + + mapstar = mpool.mapstar + + if not hasattr(iterable, '__len__'): + iterable = list(iterable) + + if chunksize is None: + chunksize, extra = divmod(len(iterable), len(self._pool) * 4) + if extra: + chunksize += 1 + + task_batches = MyPool._get_tasks(func, iterable, chunksize) + + new_batches = [] + stdins = [] + for x in task_batches: + # make sure each batch has a different stdin + new_stdin = os.fdopen(os.dup(sys.stdin.fileno())) + (function, data_list) = x + new_data_list = [] + for host_name in data_list: + new_data_list.append((host_name, new_stdin)) + stdins.append(new_stdin) + new_batches.append((function, new_data_list)) + + result = mpool.MapResult(self._cache, chunksize, len(iterable), callback) + + #for i, x in enumerate(task_batches): + # print "%s => %s" % (i,x) + + self._taskqueue.put( + ( + ( + (result._job, i, mapstar, (x,), {}) for i, x in enumerate(new_batches) # task_batches) + ), None) + ) + + for x in stdins: + x.close() + + return result + + + + +class Foon(object): + + def __init__(self): + self.set_size(0) + + def make_pool(self, processes=None, initializer=None, initargs=()): + ''' + Returns a process pool object + ''' + return MyPool(processes, initializer, initargs) + + def set_size(self, size): + + global OLD_SIZE + global POOL + + if size > OLD_SIZE or POOL is None: + OLD_SIZE = size + POOL = self.make_pool() + + + def map(self, function, data_list): + + global POOL + try: + return POOL.map(function, data_list) + except KeyboardInterrupt: + print "KEYBOARD INTERRUPT!" + sys.exit(1) + + + + + +