1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/ansible_testing/modules.py
2016-10-13 14:22:34 +01:00

637 lines
22 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 Matt Martz <matt@sivel.net>
# Copyright (C) 2015 Rackspace US, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import abc
import argparse
import ast
import os
import re
import sys
from distutils.version import StrictVersion
from fnmatch import fnmatch
from ansible import __version__ as ansible_version
from ansible.executor.module_common import REPLACER_WINDOWS
from ansible.module_utils import basic as module_utils_basic
from ansible.plugins import module_loader
from ansible.utils.module_docs import BLACKLIST_MODULES, get_docstring
from schema import doc_schema, option_schema
from utils import CaptureStd, find_globals
import yaml
BLACKLIST_DIRS = frozenset(('.git', 'test', '.github'))
INDENT_REGEX = re.compile(r'([\t]*)')
BASIC_RESERVED = frozenset((r for r in dir(module_utils_basic) if r[0] != '_'))
BLACKLIST_IMPORTS = {
'requests': ('requests import found, should use '
'ansible.module_utils.urls instead'),
}
class Validator(object):
"""Validator instances are intended to be run on a single object. if you
are scanning multiple objects for problems, you'll want to have a separate
Validator for each one."""
__metaclass__ = abc.ABCMeta
def __init__(self):
self.reset()
def reset(self):
"""Reset the test results"""
self.errors = []
self.warnings = []
self.traces = []
@abc.abstractproperty
def object_name(self):
"""Name of the object we validated"""
pass
@abc.abstractproperty
def object_path(self):
"""Path of the object we validated"""
pass
@abc.abstractmethod
def validate(self, reset=True):
"""Run this method to generate the test results"""
if reset:
self.reset()
def report(self, warnings=False):
"""Print out the test results"""
if self.errors or (warnings and self.warnings):
print('=' * 76)
print(self.object_path)
print('=' * 76)
ret = []
for trace in self.traces:
print('TRACE:')
print('\n '.join((' %s' % trace).splitlines()))
for error in self.errors:
print('ERROR: %s' % error)
ret.append(1)
if warnings:
for warning in self.warnings:
print('WARNING: %s' % warning)
# ret.append(1) # Don't incrememt exit status for warnings
if self.errors or (warnings and self.warnings):
print()
return len(ret)
class ModuleValidator(Validator):
BLACKLIST_PATTERNS = ('.git*', '*.pyc', '*.pyo', '.*', '*.md', '*.txt')
BLACKLIST_FILES = frozenset(('.git', '.gitignore', '.travis.yml',
'.gitattributes', '.gitmodules', 'COPYING',
'__init__.py', 'VERSION', 'test-docs.sh'))
BLACKLIST = BLACKLIST_FILES.union(BLACKLIST_MODULES)
PS_DOC_BLACKLIST = frozenset((
'slurp.ps1',
'setup.ps1'
))
def __init__(self, path):
super(ModuleValidator, self).__init__()
self.path = path
self.basename = os.path.basename(self.path)
self.name, _ = os.path.splitext(self.basename)
self._python_module_override = False
with open(path) as f:
self.text = f.read()
self.length = len(self.text.splitlines())
try:
self.ast = ast.parse(self.text)
except:
self.ast = None
@property
def object_name(self):
return self.basename
@property
def object_path(self):
return self.path
def _python_module(self):
if self.path.endswith('.py') or self._python_module_override:
return True
return False
def _powershell_module(self):
if self.path.endswith('.ps1'):
return True
return False
def _just_docs(self):
try:
for child in self.ast.body:
if not isinstance(child, ast.Assign):
return False
return True
except AttributeError:
return False
def _is_new_module(self):
return not module_loader.has_plugin(self.name)
def _check_interpreter(self, powershell=False):
if powershell:
if not self.text.startswith('#!powershell\n'):
self.errors.append('Interpreter line is not "#!powershell"')
return
if not self.text.startswith('#!/usr/bin/python'):
self.errors.append('Interpreter line is not "#!/usr/bin/python"')
def _check_for_sys_exit(self):
if 'sys.exit(' in self.text:
self.errors.append('sys.exit() call found. Should be '
'exit_json/fail_json')
def _check_for_gpl3_header(self):
if ('GNU General Public License' not in self.text and
'version 3' not in self.text):
self.errors.append('GPLv3 license header not found')
def _check_for_tabs(self):
for line_no, line in enumerate(self.text.splitlines()):
indent = INDENT_REGEX.search(line)
if indent and '\t' in line:
index = line.index('\t')
self.errors.append('indentation contains tabs. line %d '
'column %d' % (line_no + 1, index))
def _find_blacklist_imports(self):
for child in self.ast.body:
if isinstance(child, ast.Import):
for name in child.names:
if name.name in BLACKLIST_IMPORTS:
self.errors.append(BLACKLIST_IMPORTS[name.name])
elif isinstance(child, ast.TryExcept):
bodies = child.body
for handler in child.handlers:
bodies.extend(handler.body)
for grandchild in bodies:
if isinstance(grandchild, ast.Import):
for name in grandchild.names:
if name.name in BLACKLIST_IMPORTS:
self.errors.append(
BLACKLIST_IMPORTS[name.name]
)
def _find_module_utils(self, main):
linenos = []
for child in self.ast.body:
found_module_utils_import = False
if isinstance(child, ast.ImportFrom):
if child.module.startswith('ansible.module_utils.'):
found_module_utils_import = True
linenos.append(child.lineno)
if not child.names:
self.errors.append('%s: not a "from" import"' %
child.module)
found_alias = False
for name in child.names:
if isinstance(name, ast.alias):
found_alias = True
if name.asname or name.name != '*':
self.errors.append('%s: did not import "*"' %
child.module)
if found_module_utils_import and not found_alias:
self.errors.append('%s: did not import "*"' % child.module)
if not linenos:
self.errors.append('Did not find a module_utils import')
return linenos
def _find_main_call(self):
lineno = False
if_bodies = []
for child in self.ast.body:
if isinstance(child, ast.If):
try:
if child.test.left.id == '__name__':
if_bodies.extend(child.body)
except AttributeError:
pass
bodies = self.ast.body
bodies.extend(if_bodies)
for child in bodies:
if isinstance(child, ast.Expr):
if isinstance(child.value, ast.Call):
if (isinstance(child.value.func, ast.Name) and
child.value.func.id == 'main'):
lineno = child.lineno
if lineno < self.length - 1:
self.errors.append('Call to main() not the last '
'line')
if not lineno:
self.errors.append('Did not find a call to main')
return lineno or 0
def _find_has_import(self):
for child in self.ast.body:
found_try_except_import = False
found_has = False
if isinstance(child, ast.TryExcept):
bodies = child.body
for handler in child.handlers:
bodies.extend(handler.body)
for grandchild in bodies:
if isinstance(grandchild, ast.Import):
found_try_except_import = True
if isinstance(grandchild, ast.Assign):
for target in grandchild.targets:
if target.id.lower().startswith('has_'):
found_has = True
if found_try_except_import and not found_has:
self.warnings.append('Found Try/Except block without HAS_ '
'assginment')
def _find_ps_replacers(self):
if 'WANT_JSON' not in self.text:
self.errors.append('WANT_JSON not found in module')
if REPLACER_WINDOWS not in self.text:
self.errors.append('"%s" not found in module' % REPLACER_WINDOWS)
def _find_ps_docs_py_file(self):
if self.object_name in self.PS_DOC_BLACKLIST:
return
py_path = self.path.replace('.ps1', '.py')
if not os.path.isfile(py_path):
self.errors.append('Missing python documentation file')
def _get_docs(self):
docs = {
'DOCUMENTATION': {
'value': None,
'lineno': 0
},
'EXAMPLES': {
'value': None,
'lineno': 0
},
'RETURN': {
'value': None,
'lineno': 0
}
}
for child in self.ast.body:
if isinstance(child, ast.Assign):
for grandchild in child.targets:
if grandchild.id == 'DOCUMENTATION':
docs['DOCUMENTATION']['value'] = child.value.s
docs['DOCUMENTATION']['lineno'] = child.lineno
elif grandchild.id == 'EXAMPLES':
docs['EXAMPLES']['value'] = child.value.s[1:]
docs['EXAMPLES']['lineno'] = child.lineno
elif grandchild.id == 'RETURN':
docs['RETURN']['value'] = child.value.s
docs['RETURN']['lineno'] = child.lineno
return docs
def _validate_docs_schema(self, doc):
errors = []
try:
doc_schema(doc)
except Exception as e:
errors.extend(e.errors)
for key, option in doc.get('options', {}).iteritems():
try:
option_schema(option)
except Exception as e:
for error in e.errors:
error.path[:0] = ['options', key]
errors.extend(e.errors)
for error in errors:
path = [str(p) for p in error.path]
self.errors.append('DOCUMENTATION.%s: %s' %
('.'.join(path), error.error_message))
def _validate_docs(self):
doc_info = self._get_docs()
try:
doc = yaml.safe_load(doc_info['DOCUMENTATION']['value'])
except yaml.YAMLError as e:
doc = None
# This offsets the error line number to where the
# DOCUMENTATION starts so we can just go to that line in the
# module
e.problem_mark.line += (
doc_info['DOCUMENTATION']['lineno'] - 1
)
e.problem_mark.name = '%s.DOCUMENTATION' % self.name
self.traces.append(e)
self.errors.append('DOCUMENTATION is not valid YAML. Line %d '
'column %d' %
(e.problem_mark.line + 1,
e.problem_mark.column + 1))
except AttributeError:
self.errors.append('No DOCUMENTATION provided')
else:
with CaptureStd():
try:
get_docstring(self.path, verbose=True)
except AssertionError:
fragment = doc['extends_documentation_fragment']
self.errors.append('DOCUMENTATION fragment missing: %s' %
fragment)
except Exception as e:
self.traces.append(e)
self.errors.append('Unknown DOCUMENTATION error, see '
'TRACE')
self._validate_docs_schema(doc)
self._check_version_added(doc)
self._check_for_new_args(doc)
if not bool(doc_info['EXAMPLES']['value']):
self.errors.append('No EXAMPLES provided')
if not bool(doc_info['RETURN']['value']):
if self._is_new_module():
self.errors.append('No RETURN documentation provided')
else:
self.warnings.append('No RETURN provided')
else:
try:
yaml.safe_load(doc_info['RETURN']['value'])
except yaml.YAMLError as e:
e.problem_mark.line += (
doc_info['RETURN']['lineno'] - 1
)
e.problem_mark.name = '%s.RETURN' % self.name
self.errors.append('RETURN is not valid YAML. Line %d '
'column %d' %
(e.problem_mark.line + 1,
e.problem_mark.column + 1))
self.traces.append(e)
def _find_redeclarations(self):
g = set()
find_globals(g, self.ast.body)
redeclared = BASIC_RESERVED.intersection(g)
if redeclared:
self.warnings.append('Redeclared basic.py variable or '
'function: %s' % ', '.join(redeclared))
def _check_version_added(self, doc):
if not self._is_new_module():
return
try:
version_added = StrictVersion(str(doc.get('version_added', '0.0')))
except ValueError:
version_added = doc.get('version_added', '0.0')
self.errors.append('version_added is not a valid version '
'number: %r' % version_added)
return
should_be = '.'.join(ansible_version.split('.')[:2])
strict_ansible_version = StrictVersion(should_be)
if (version_added < strict_ansible_version or
strict_ansible_version < version_added):
self.errors.append('version_added should be %s. Currently %s' %
(should_be, version_added))
def _check_for_new_args(self, doc):
if self._is_new_module():
return
with CaptureStd():
try:
existing = module_loader.find_plugin(self.name, mod_type='.py')
existing_doc, _, _ = get_docstring(existing, verbose=True)
existing_options = existing_doc.get('options', {})
except AssertionError:
fragment = doc['extends_documentation_fragment']
self.errors.append('Existing DOCUMENTATION fragment missing: '
'%s' % fragment)
return
except Exception as e:
self.traces.append(e)
self.errors.append('Unknown existing DOCUMENTATION error, see '
'TRACE')
return
try:
mod_version_added = StrictVersion(
str(existing_doc.get('version_added', '0.0'))
)
except ValueError:
mod_version_added = StrictVersion('0.0')
options = doc.get('options', {})
should_be = '.'.join(ansible_version.split('.')[:2])
strict_ansible_version = StrictVersion(should_be)
for option, details in options.iteritems():
new = not bool(existing_options.get(option))
if not new:
continue
try:
version_added = StrictVersion(
str(details.get('version_added', '0.0'))
)
except ValueError:
version_added = details.get('version_added', '0.0')
self.errors.append('version_added for new option (%s) '
'is not a valid version number: %r' %
(option, version_added))
continue
if (strict_ansible_version != mod_version_added and
(version_added < strict_ansible_version or
strict_ansible_version < version_added)):
self.errors.append('version_added for new option (%s) should '
'be %s. Currently %s' %
(option, should_be, version_added))
def validate(self):
super(ModuleValidator, self).validate()
# Blacklists -- these files are not checked
if not frozenset((self.basename,
self.name)).isdisjoint(self.BLACKLIST):
return
for pat in self.BLACKLIST_PATTERNS:
if fnmatch(self.basename, pat):
return
# if self._powershell_module():
# self.warnings.append('Cannot check powershell modules at this '
# 'time. Skipping')
# return
if not self._python_module() and not self._powershell_module():
self.errors.append('Official Ansible modules must have a .py '
'extension for python modules or a .ps1 '
'for powershell modules')
self._python_module_override = True
if self._python_module() and self.ast is None:
self.errors.append('Python SyntaxError while parsing module')
try:
compile(self.text, self.path, 'exec')
except Exception as e:
self.traces.append(e)
return
if self._python_module():
self._validate_docs()
if self._python_module() and not self._just_docs():
self._check_for_sys_exit()
self._find_blacklist_imports()
main = self._find_main_call()
self._find_module_utils(main)
self._find_has_import()
self._check_for_tabs()
self._find_redeclarations()
if self._powershell_module():
self._find_ps_replacers()
self._find_ps_docs_py_file()
self._check_for_gpl3_header()
if not self._just_docs():
self._check_interpreter(powershell=self._powershell_module())
class PythonPackageValidator(Validator):
def __init__(self, path):
super(PythonPackageValidator, self).__init__()
self.path = path
self.basename = os.path.basename(path)
@property
def object_name(self):
return self.basename
@property
def object_path(self):
return self.path
def validate(self):
super(PythonPackageValidator, self).validate()
init_file = os.path.join(self.path, '__init__.py')
if not os.path.exists(init_file):
self.errors.append('Ansible module subdirectories must contain an '
'__init__.py')
def re_compile(value):
"""
Argparse expects things to raise TypeError, re.compile raises an re.error
exception
This function is a shorthand to convert the re.error exception to a
TypeError
"""
try:
return re.compile(value)
except re.error as e:
raise TypeError(e)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('modules', nargs='+',
help='Path to module or module directory')
parser.add_argument('-w', '--warnings', help='Show warnings',
action='store_true')
parser.add_argument('--exclude', help='RegEx exclusion pattern',
type=re_compile)
args = parser.parse_args()
args.modules[:] = [m.rstrip('/') for m in args.modules]
exit = []
for module in args.modules:
if os.path.isfile(module):
path = module
if args.exclude and args.exclude.search(path):
sys.exit(0)
mv = ModuleValidator(path)
mv.validate()
exit.append(mv.report(args.warnings))
for root, dirs, files in os.walk(module):
basedir = root[len(module)+1:].split('/', 1)[0]
if basedir in BLACKLIST_DIRS:
continue
for dirname in dirs:
if root == module and dirname in BLACKLIST_DIRS:
continue
path = os.path.join(root, dirname)
if args.exclude and args.exclude.search(path):
continue
pv = PythonPackageValidator(path)
pv.validate()
exit.append(pv.report(args.warnings))
for filename in files:
path = os.path.join(root, filename)
if args.exclude and args.exclude.search(path):
continue
mv = ModuleValidator(path)
mv.validate()
exit.append(mv.report(args.warnings))
sys.exit(sum(exit))
if __name__ == '__main__':
main()