Mirror of https://github.com/ansible-collections/community.general.git, synced 2024-09-14 20:13:21 +02:00.

Commit d8ae4dfbf2:
* Adding aliases for field attributes and renaming the async attribute. As of Python 3.7, the use of async raises an error, whereas before the reserved word was simply ignored. This adds an alias field for field attributes so that both async and async_val (internally) work, which keeps us backwards-compatible with third-party plugins that may still reference Task.async while letting the core engine run on Python 3.7+.
* Remove files fixed for 'async' usage from the Python 3.7 skip list.
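In rough terms, the alias works like this (a hypothetical sketch for illustration, not Ansible's actual FieldAttribute implementation): the value itself is stored as async_val, and 'async' is still resolved as an attribute alias so older callers keep working.

    class AliasedTask(object):
        def __init__(self, async_val=None):
            self.async_val = async_val

        def __getattr__(self, name):
            # 'async' is a reserved word on Python 3.7+, so legacy plugins reach it
            # via getattr(task, 'async'); route that alias to the real attribute.
            if name == 'async':
                return self.async_val
            raise AttributeError(name)

    task = AliasedTask(async_val=10)
    assert getattr(task, 'async') == task.async_val == 10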
243 lines · 7.8 KiB · Python

'''
(Epdb) pprint(DeepDiff(self.final_task_vars, out_task_vars), indent=2)
{ 'dic_item_added': set([u"root['ansible_python_interpreter']"]),
  'dic_item_removed': set([ u"root['hostvars']['127.0.0.1']",
                            u"root['hostvars']['::1']",
                            u"root['hostvars']['localhost']"]),
  'iterable_item_added': { u"root['hostvars']['el6host']['groups']['all'][1]": u'::1',
                           u"root['hostvars']['el6host']['groups']['ungrouped'][1]": u'::1',
                           u"root['vars']['hostvars']['el6host']['groups']['all'][1]": u'::1',
                           u"root['vars']['hostvars']['el6host']['groups']['ungrouped'][1]": u'::1'}}
'''

import json
import os
import sys
import unittest
import yaml

from pprint import pprint

import ansible.plugins
from ansible.compat.tests.mock import patch, MagicMock
from ansible.plugins.action.synchronize import ActionModule


# Getting the incoming and outgoing task vars from the plugin's run method

'''
import copy
safe_vars = {}
for k, v in task_vars.items():
    if k not in ['vars', 'hostvars']:
        safe_vars[k] = copy.deepcopy(v)
    else:
        sdata = str(v)
        newv = eval(sdata)
        safe_vars[k] = newv

import json
with open('task_vars.json', 'w') as f:
    f.write(json.dumps(safe_vars, indent=2))
'''
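

# Lightweight stand-ins for the objects the synchronize action plugin expects
# (task, connection, play context and plugin loaders); they expose just the
# attributes these tests exercise.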
class TaskMock(object):
    args = {'src': u'/tmp/deleteme',
            'dest': '/tmp/deleteme',
            'rsync_path': 'rsync'}
    async_val = None
    become = None
    become_user = None
    become_method = None


class StdinMock(object):
    shell = None


class ConnectionMock(object):
    ismock = True
    _play_context = None
    # transport = 'ssh'
    transport = None
    _new_stdin = StdinMock()


class PlayContextMock(object):
    shell = None
    private_key_file = None
    become = False
    become_user = 'root'
    become_method = None
    check_mode = False
    no_log = None
    diff = None
    remote_addr = None
    remote_user = None
    password = None


class ModuleLoaderMock(object):
    def find_plugin(self, module_name, mod_type):
        pass


class SharedLoaderMock(object):
    module_loader = ModuleLoaderMock()


class SynchronizeTester(object):

    ''' A wrapper for mocking out synchronize environments '''

    task = TaskMock()
    connection = ConnectionMock()
    _play_context = PlayContextMock()
    loader = None
    templar = None
    shared_loader_obj = SharedLoaderMock()

    final_task_vars = None
    execute_called = False

    def _execute_module(self, module_name, module_args=None, task_vars=None):
        self.execute_called = True
        self.final_module_args = module_args
        self.final_task_vars = task_vars
        return {}
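
    # Each fixture directory holds a meta.yaml describing the scenario
    # (_play_context, _task, task_args, connection, hostvars and asserts,
    # plus optional fixture file names) and a pair of JSON files,
    # taskvars_in.json / taskvars_out.json, with the task vars fed into the
    # plugin and the vars expected back from it.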
    def runtest(self, fixturepath='fixtures/synchronize/basic'):

        metapath = os.path.join(fixturepath, 'meta.yaml')
        with open(metapath, 'rb') as f:
            fdata = f.read()
        test_meta = yaml.safe_load(fdata)

        # load initial play context vars
        if '_play_context' in test_meta:
            if test_meta['_play_context']:
                self.task.args = {}
                for (k, v) in test_meta['_play_context'].items():
                    if v == 'None':
                        v = None
                    setattr(self._play_context, k, v)

        # load initial task context vars
        if '_task' in test_meta:
            if test_meta['_task']:
                self.task.args = {}
                for (k, v) in test_meta['_task'].items():
                    # import epdb; epdb.st()
                    if v == 'None':
                        v = None
                    setattr(self.task, k, v)

        # load initial task args
        if 'task_args' in test_meta:
            if test_meta['task_args']:
                self.task.args = {}
                for (k, v) in test_meta['task_args'].items():
                    self.task.args[k] = v

        # load initial task vars
        invarspath = os.path.join(fixturepath, test_meta.get('fixtures', {}).get('taskvars_in', 'taskvars_in.json'))
        with open(invarspath, 'rb') as f:
            fdata = f.read()
        fdata = fdata.decode("utf-8")
        in_task_vars = json.loads(fdata)

        # load expected final task vars
        outvarspath = os.path.join(fixturepath, test_meta.get('fixtures', {}).get('taskvars_out', 'taskvars_out.json'))
        with open(outvarspath, 'rb') as f:
            fdata = f.read()
        fdata = fdata.decode("utf-8")
        out_task_vars = json.loads(fdata)

        # fixup the connection
        for (k, v) in test_meta['connection'].items():
            setattr(self.connection, k, v)

        # fixup the hostvars
        if test_meta['hostvars']:
            for (k, v) in test_meta['hostvars'].items():
                in_task_vars['hostvars'][k] = v

        # initialize and run the module
        SAM = ActionModule(self.task, self.connection, self._play_context,
                           self.loader, self.templar, self.shared_loader_obj)
        SAM._execute_module = self._execute_module
        result = SAM.run(task_vars=in_task_vars)

        # run assertions
        for check in test_meta['asserts']:
            value = eval(check)
            # if not value:
            #     print(check, value)
            #     import epdb; epdb.st()
            assert value, check
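

# Stands in for ansible.plugins.action.synchronize.connection_loader (see the
# @patch decorators below) so the plugin gets a ConnectionMock back instead of
# building a real transport.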
class FakePluginLoader(object):
    mocked = True

    @staticmethod
    def get(transport, play_context, new_stdin):
        conn = ConnectionMock()
        conn.transport = transport
        conn._play_context = play_context
        conn._new_stdin = new_stdin
        return conn
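

# Each test below points SynchronizeTester at one fixture directory under
# fixtures/synchronize/ and relies on that fixture's 'asserts' entries for the
# actual pass/fail checks.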
class TestSynchronizeAction(unittest.TestCase):

    fixturedir = os.path.dirname(__file__)
    fixturedir = os.path.join(fixturedir, 'fixtures', 'synchronize')
    # print(fixturedir)

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic(self):
        x = SynchronizeTester()
        x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic'))

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic_become(self):
        x = SynchronizeTester()
        x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic_become'))

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic_become_cli(self):
        # --become on the cli sets _play_context.become
        x = SynchronizeTester()
        x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic_become_cli'))

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic_vagrant(self):
        # simple vagrant example
        x = SynchronizeTester()
        x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic_vagrant'))

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic_vagrant_sudo(self):
        # vagrant plus sudo
        x = SynchronizeTester()
        x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic_vagrant_sudo'))

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_basic_vagrant_become_cli(self):
        # vagrant plus --become on the cli
        x = SynchronizeTester()
        x.runtest(fixturepath=os.path.join(self.fixturedir, 'basic_vagrant_become_cli'))

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_delegate_remote(self):
        # delegate to other remote host
        x = SynchronizeTester()
        x.runtest(fixturepath=os.path.join(self.fixturedir, 'delegate_remote'))

    @patch('ansible.plugins.action.synchronize.connection_loader', FakePluginLoader)
    def test_delegate_remote_su(self):
        # delegate to other remote host with su enabled
        x = SynchronizeTester()
        x.runtest(fixturepath=os.path.join(self.fixturedir, 'delegate_remote_su'))
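

# Allow running this module directly; under a test runner (pytest, nose,
# etc.) this guard is simply ignored.
if __name__ == '__main__':
    unittest.main()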