mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
e64989beb4
Also making PlayContext a child class of the Playbook Base class, which gives it access to all of the FieldAttribute code to ensure field values are correctly typed after post_validation Fixes #11381
175 lines
4.8 KiB
Python
175 lines
4.8 KiB
Python
#!/usr/bin/env python
|
|
|
|
import sys
|
|
import time
|
|
import Queue
|
|
import traceback
|
|
import multiprocessing
|
|
|
|
from ansible.inventory import Inventory
|
|
from ansible.inventory.host import Host
|
|
from ansible.playbook.play import Play
|
|
from ansible.playbook.play_context import PlayContext
|
|
from ansible.playbook.task import Task
|
|
from ansible.executor.task_executor import TaskExecutor
|
|
from ansible.executor.task_result import TaskResult
|
|
from ansible.parsing import DataLoader
|
|
from ansible.vars import VariableManager
|
|
|
|
from ansible.utils.debug import debug
|
|
|
|
NUM_WORKERS = 20
|
|
NUM_HOSTS = 1778
|
|
NUM_TASKS = 1
|
|
|
|
def results(final_q, workers):
|
|
cur_worker = 0
|
|
def _read_worker_result(cur_worker):
|
|
result = None
|
|
starting_point = cur_worker
|
|
while True:
|
|
(worker_prc, main_q, res_q) = workers[cur_worker]
|
|
cur_worker += 1
|
|
if cur_worker >= len(workers):
|
|
cur_worker = 0
|
|
|
|
try:
|
|
if not res_q.empty():
|
|
debug("worker %d has data to read" % cur_worker)
|
|
result = res_q.get()
|
|
debug("got a result from worker %d: %s" % (cur_worker, result))
|
|
break
|
|
except:
|
|
pass
|
|
|
|
if cur_worker == starting_point:
|
|
break
|
|
|
|
return (result, cur_worker)
|
|
|
|
while True:
|
|
result = None
|
|
try:
|
|
(result, cur_worker) = _read_worker_result(cur_worker)
|
|
if result is None:
|
|
time.sleep(0.01)
|
|
continue
|
|
final_q.put(result, block=False)
|
|
except (IOError, EOFError, KeyboardInterrupt) as e:
|
|
debug("got a breaking error: %s" % e)
|
|
break
|
|
except Exception as e:
|
|
debug("EXCEPTION DURING RESULTS PROCESSING: %s" % e)
|
|
traceback.print_exc()
|
|
break
|
|
|
|
def worker(main_q, res_q, loader):
|
|
while True:
|
|
task = None
|
|
try:
|
|
if not main_q.empty():
|
|
(host, task, task_vars, conn_info) = main_q.get(block=False)
|
|
executor_result = TaskExecutor(host, task, task_vars, conn_info, loader).run()
|
|
debug("executor result: %s" % executor_result)
|
|
task_result = TaskResult(host, task, executor_result)
|
|
res_q.put(task_result)
|
|
else:
|
|
time.sleep(0.01)
|
|
except Queue.Empty:
|
|
pass
|
|
except (IOError, EOFError, KeyboardInterrupt) as e:
|
|
debug("got a breaking error: %s" % e)
|
|
break
|
|
except Exception as e:
|
|
debug("EXCEPTION DURING WORKER PROCESSING: %s" % e)
|
|
traceback.print_exc()
|
|
break
|
|
|
|
loader = DataLoader()
|
|
|
|
workers = []
|
|
for i in range(NUM_WORKERS):
|
|
main_q = multiprocessing.Queue()
|
|
res_q = multiprocessing.Queue()
|
|
worker_p = multiprocessing.Process(target=worker, args=(main_q, res_q, loader))
|
|
worker_p.start()
|
|
workers.append((worker_p, main_q, res_q))
|
|
|
|
res_q = multiprocessing.Queue()
|
|
res_p = multiprocessing.Process(target=results, args=(res_q, workers))
|
|
res_p.start()
|
|
|
|
def send_data(obj):
|
|
global cur_worker
|
|
global workers
|
|
global pending_results
|
|
|
|
(w_proc, main_q, wrkr_q) = workers[cur_worker]
|
|
cur_worker += 1
|
|
if cur_worker >= len(workers):
|
|
cur_worker = 0
|
|
|
|
pending_results += 1
|
|
main_q.put(obj, block=False)
|
|
|
|
def _process_pending_results():
|
|
global res_q
|
|
global pending_results
|
|
|
|
while not res_q.empty():
|
|
try:
|
|
result = res_q.get(block=False)
|
|
debug("got final result: %s" % (result,))
|
|
pending_results -= 1
|
|
except Queue.Empty:
|
|
pass
|
|
|
|
def _wait_on_pending_results():
|
|
global pending_results
|
|
while pending_results > 0:
|
|
debug("waiting for pending results (%d left)" % pending_results)
|
|
_process_pending_results()
|
|
time.sleep(0.01)
|
|
|
|
|
|
debug("starting")
|
|
cur_worker = 0
|
|
pending_results = 0
|
|
|
|
|
|
var_manager = VariableManager()
|
|
|
|
debug("loading inventory")
|
|
inventory = Inventory(host_list='/tmp/med_inventory', loader=loader, variable_manager=var_manager)
|
|
hosts = inventory.get_hosts()[:]
|
|
debug("done loading inventory")
|
|
|
|
play_context = PlayContext()
|
|
play_context.connection = 'local'
|
|
|
|
for i in range(NUM_TASKS):
|
|
#for j in range(NUM_HOSTS):
|
|
for h in hosts:
|
|
debug("queuing %s %d" % (h, i))
|
|
#h = Host(name="host%06d" % j)
|
|
t = Task().load(dict(name="task %d" % (i,), debug="msg='hello from %s, %d'" % (h,i)))
|
|
#t = Task().load(dict(name="task %d" % (i,), ping=""))
|
|
#task_vars = var_manager.get_vars(loader=loader, host=h, task=t)
|
|
task_vars = dict()
|
|
new_t = t.copy()
|
|
new_t.post_validate(task_vars)
|
|
send_data((h, t, task_vars, play_context))
|
|
debug("done queuing %s %d" % (h, i))
|
|
_process_pending_results()
|
|
debug("waiting for the results to drain...")
|
|
_wait_on_pending_results()
|
|
|
|
res_q.close()
|
|
res_p.terminate()
|
|
|
|
for (w_p, main_q, wrkr_q) in workers:
|
|
main_q.close()
|
|
wrkr_q.close()
|
|
w_p.terminate()
|
|
|
|
debug("done")
|