From d293a46994f6faafad005cfa6f31455cc75d2ac9 Mon Sep 17 00:00:00 2001 From: Toshio Kuratomi Date: Thu, 4 Sep 2014 18:03:49 -0700 Subject: [PATCH] Unittests to detect speed regressions in password obfuscation and that the passwords are correctly hidden in the output. --- test/units/TestModuleUtilsBasic.py | 145 +++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/test/units/TestModuleUtilsBasic.py b/test/units/TestModuleUtilsBasic.py index d1ee95a1a0..1eeff8058c 100644 --- a/test/units/TestModuleUtilsBasic.py +++ b/test/units/TestModuleUtilsBasic.py @@ -3,6 +3,7 @@ import tempfile import unittest from nose.tools import raises +from nose.tools import timed from ansible import errors from ansible.module_common import ModuleReplacer @@ -159,3 +160,147 @@ class TestModuleUtilsBasic(unittest.TestCase): self.cleanup_temp_dir(tmp_path) +class TestModuleUtilsBasicHelpers(unittest.TestCase): + ''' Test some implementation details of AnsibleModule + + Some pieces of AnsibleModule are implementation details but they have + potential cornercases that we need to check. Go ahead and test at + this level that the functions are behaving even though their API may + change and we'd have to rewrite these tests so that we know that we + need to check for those problems in any rewrite. + + In the future we might want to restructure higher level code to be + friendlier to unittests so that we can test at the level that the public + is interacting with the APIs. + ''' + + MANY_RECORDS = 7000 + URL_SECRET = 'http://username:pas:word@foo.com/data' + SSH_SECRET = 'username:pas:word@foo.com/data' + + def cleanup_temp_file(self, fd, path): + try: + os.close(fd) + os.remove(path) + except: + pass + + def cleanup_temp_dir(self, path): + try: + os.rmdir(path) + except: + pass + + def _gen_data(self, records, per_rec, top_level, secret_text): + hostvars = {'hostvars': {}} + for i in range(1, records, 1): + host_facts = {'host%s' % i: + {'pstack': + {'running': '875.1', + 'symlinked': '880.0', + 'tars': [], + 'versions': ['885.0']}, + }} + + if per_rec: + host_facts['host%s' % i]['secret'] = secret_text + hostvars['hostvars'].update(host_facts) + if top_level: + hostvars['secret'] = secret_text + return hostvars + + def setUp(self): + self.many_url = repr(self._gen_data(self.MANY_RECORDS, True, True, + self.URL_SECRET)) + self.many_ssh = repr(self._gen_data(self.MANY_RECORDS, True, True, + self.SSH_SECRET)) + self.one_url = repr(self._gen_data(self.MANY_RECORDS, False, True, + self.URL_SECRET)) + self.one_ssh = repr(self._gen_data(self.MANY_RECORDS, False, True, + self.SSH_SECRET)) + self.zero_secrets = repr(self._gen_data(self.MANY_RECORDS, False, + False, '')) + self.few_url = repr(self._gen_data(2, True, True, self.URL_SECRET)) + self.few_ssh = repr(self._gen_data(2, True, True, self.SSH_SECRET)) + + # create a temporary file for the test module + # we're about to generate + self.tmp_fd, self.tmp_path = tempfile.mkstemp() + os.write(self.tmp_fd, TEST_MODULE_DATA) + + # template the module code and eval it + module_data, module_style, shebang = ModuleReplacer().modify_module(self.tmp_path, {}, "", {}) + + d = {} + exec(module_data, d, d) + self.module = d['get_module']() + + # module_utils/basic.py screws with CWD, let's save it and reset + self.cwd = os.getcwd() + + def tearDown(self): + self.cleanup_temp_file(self.tmp_fd, self.tmp_path) + # Reset CWD back to what it was before basic.py changed it + os.chdir(self.cwd) + + + ################################################################################# + + # + # Several speed tests -- previously, the obfuscation of passwords carried + # an unreasonable speed penalty for some module parameters. We want to be + # sure we don't regress to that state. + # + + @timed(5) + def test_log_sanitize_speed_many_url(self): + self.module._heuristic_log_sanitize(self.many_url) + + @timed(5) + def test_log_sanitize_speed_many_ssh(self): + self.module._heuristic_log_sanitize(self.many_ssh) + + @timed(5) + def test_log_sanitize_speed_one_url(self): + self.module._heuristic_log_sanitize(self.one_url) + + @timed(5) + def test_log_sanitize_speed_one_ssh(self): + self.module._heuristic_log_sanitize(self.one_ssh) + + @timed(5) + def test_log_sanitize_speed_zero_secrets(self): + self.module._heuristic_log_sanitize(self.zero_secrets) + + # + # Test that the password obfuscation sanitizes somewhat cleanly. + # + + def test_log_sanitize_correctness(self): + url_data = repr(self._gen_data(3, True, True, self.URL_SECRET)) + ssh_data = repr(self._gen_data(3, True, True, self.SSH_SECRET)) + + url_output = self.module._heuristic_log_sanitize(url_data) + ssh_output = self.module._heuristic_log_sanitize(ssh_data) + + # Basic functionality: Successfully hid the password + self.assertNotIn('pas:word', url_output) + self.assertNotIn('pas:word', ssh_output) + + # Slightly more advanced, we hid all of the password despite the ":" + self.assertNotIn('pas', url_output) + self.assertNotIn('pas', ssh_output) + + # In this implementation we replace the password with 8 "*" which is + # also the length of our password. The url fields should be able to + # accurately detect where the password ends so the length should be + # the same: + self.assertEqual(len(url_output), len(url_data)) + + # ssh checking is somewhat harder as the heuristic is overzealous in + # most cases. Since the input will have at least one ":" present + # before the password we can tell some things about the beginning and + # end of the data, though: + self.assertTrue(ssh_output.startswith("{'")) + self.assertTrue(ssh_output.endswith("'}}}}")) + self.assertIn(":********@foo.com/data',", ssh_output)