mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Initial version of source/testing report tool.
This commit is contained in:
parent
bf48364c72
commit
1c1527c73f
1 changed files with 244 additions and 0 deletions
244
hacking/report.py
Executable file
244
hacking/report.py
Executable file
|
@ -0,0 +1,244 @@
|
|||
#!/usr/bin/env python
|
||||
# PYTHON_ARGCOMPLETE_OK
|
||||
"""A tool to aggregate data about Ansible source and testing into a sqlite DB for reporting."""
|
||||
|
||||
from __future__ import (absolute_import, print_function)
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import requests
|
||||
import sqlite3
|
||||
import sys
|
||||
|
||||
DATABASE_PATH = os.path.expanduser('~/.ansible/report.db')
|
||||
BASE_PATH = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) + '/'
|
||||
ANSIBLE_PATH = os.path.join(BASE_PATH, 'lib')
|
||||
ANSIBLE_TEST_PATH = os.path.join(BASE_PATH, 'test/runner')
|
||||
|
||||
if ANSIBLE_PATH not in sys.path:
|
||||
sys.path.insert(0, ANSIBLE_PATH)
|
||||
|
||||
if ANSIBLE_TEST_PATH not in sys.path:
|
||||
sys.path.insert(0, ANSIBLE_TEST_PATH)
|
||||
|
||||
from ansible.parsing.metadata import extract_metadata
|
||||
from lib.target import walk_integration_targets
|
||||
|
||||
|
||||
def main():
|
||||
os.chdir(BASE_PATH)
|
||||
|
||||
args = parse_args()
|
||||
args.func()
|
||||
|
||||
|
||||
def parse_args():
|
||||
try:
|
||||
import argcomplete
|
||||
except ImportError:
|
||||
argcomplete = None
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
subparsers = parser.add_subparsers(metavar='COMMAND')
|
||||
subparsers.required = True # work-around for python 3 bug which makes subparsers optional
|
||||
|
||||
populate = subparsers.add_parser('populate',
|
||||
help='populate report database')
|
||||
|
||||
populate.set_defaults(func=populate_database)
|
||||
|
||||
query = subparsers.add_parser('query',
|
||||
help='query report database')
|
||||
|
||||
query.set_defaults(func=query_database)
|
||||
|
||||
if argcomplete:
|
||||
argcomplete.autocomplete(parser)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def query_database():
|
||||
if not os.path.exists(DATABASE_PATH):
|
||||
exit('error: Database not found. Did you run `report.py populate` first?')
|
||||
|
||||
os.execvp('sqlite3', ('sqlite3', DATABASE_PATH))
|
||||
|
||||
|
||||
def populate_database():
|
||||
populate_modules()
|
||||
populate_coverage()
|
||||
populate_integration_targets()
|
||||
|
||||
|
||||
def populate_modules():
|
||||
module_dir = os.path.join(BASE_PATH, 'lib/ansible/modules/')
|
||||
|
||||
modules_rows = []
|
||||
module_statuses_rows = []
|
||||
|
||||
for root, dir_names, file_names in os.walk(module_dir):
|
||||
for file_name in file_names:
|
||||
module, extension = os.path.splitext(file_name)
|
||||
|
||||
if module == '__init__' or extension != '.py':
|
||||
continue
|
||||
|
||||
if module.startswith('_'):
|
||||
module = module[1:]
|
||||
|
||||
namespace = os.path.join(root.replace(module_dir, '')).replace('/', '.')
|
||||
|
||||
path = os.path.join(root, file_name)
|
||||
|
||||
with open(path, 'rb') as module_fd:
|
||||
module_data = module_fd.read()
|
||||
|
||||
result = extract_metadata(module_data=module_data)
|
||||
|
||||
metadata = result[0]
|
||||
|
||||
if not metadata:
|
||||
if module == 'async_wrapper':
|
||||
continue
|
||||
|
||||
raise Exception('no metadata for: %s' % path)
|
||||
|
||||
modules_rows.append(dict(
|
||||
module=module,
|
||||
namespace=namespace,
|
||||
path=path.replace(BASE_PATH, ''),
|
||||
supported_by=metadata['supported_by'],
|
||||
))
|
||||
|
||||
for status in metadata['status']:
|
||||
module_statuses_rows.append(dict(
|
||||
module=module,
|
||||
status=status,
|
||||
))
|
||||
|
||||
populate_data(dict(
|
||||
modules=dict(
|
||||
rows=modules_rows,
|
||||
schema=(
|
||||
('module', 'TEXT'),
|
||||
('namespace', 'TEXT'),
|
||||
('path', 'TEXT'),
|
||||
('supported_by', 'TEXT'),
|
||||
)),
|
||||
module_statuses=dict(
|
||||
rows=module_statuses_rows,
|
||||
schema=(
|
||||
('module', 'TEXT'),
|
||||
('status', 'TEXT'),
|
||||
)),
|
||||
))
|
||||
|
||||
|
||||
def populate_coverage():
|
||||
response = requests.get('https://codecov.io/api/gh/ansible/ansible/tree/devel/?src=extension')
|
||||
data = response.json()
|
||||
files = data['commit']['report']['files']
|
||||
coverage_rows = []
|
||||
|
||||
for path, data in files.items():
|
||||
report = data['t']
|
||||
coverage_rows.append(dict(
|
||||
path=path,
|
||||
coverage=float(report['c']),
|
||||
lines=report['n'],
|
||||
hit=report['h'],
|
||||
partial=report['p'],
|
||||
missed=report['m'],
|
||||
))
|
||||
|
||||
populate_data(dict(
|
||||
coverage=dict(
|
||||
rows=coverage_rows,
|
||||
schema=(
|
||||
('path', 'TEXT'),
|
||||
('coverage', 'REAL'),
|
||||
('lines', 'INTEGER'),
|
||||
('hit', 'INTEGER'),
|
||||
('partial', 'INTEGER'),
|
||||
('missed', 'INTEGER'),
|
||||
)),
|
||||
))
|
||||
|
||||
|
||||
def populate_integration_targets():
|
||||
targets = list(walk_integration_targets())
|
||||
|
||||
integration_targets_rows = [dict(
|
||||
target=target.name,
|
||||
type=target.type,
|
||||
path=target.path,
|
||||
script_path=target.script_path,
|
||||
) for target in targets]
|
||||
|
||||
integration_target_aliases_rows = [dict(
|
||||
target=target.name,
|
||||
alias=alias,
|
||||
) for target in targets for alias in target.aliases]
|
||||
|
||||
integration_target_modules_rows = [dict(
|
||||
target=target.name,
|
||||
module=module,
|
||||
) for target in targets for module in target.modules]
|
||||
|
||||
populate_data(dict(
|
||||
integration_targets=dict(
|
||||
rows=integration_targets_rows,
|
||||
schema=(
|
||||
('target', 'TEXT'),
|
||||
('type', 'TEXT'),
|
||||
('path', 'TEXT'),
|
||||
('script_path', 'TEXT'),
|
||||
)),
|
||||
integration_target_aliases=dict(
|
||||
rows=integration_target_aliases_rows,
|
||||
schema=(
|
||||
('target', 'TEXT'),
|
||||
('alias', 'TEXT'),
|
||||
)),
|
||||
integration_target_modules=dict(
|
||||
rows=integration_target_modules_rows,
|
||||
schema=(
|
||||
('target', 'TEXT'),
|
||||
('module', 'TEXT'),
|
||||
)),
|
||||
))
|
||||
|
||||
|
||||
def create_table(cursor, name, columns):
|
||||
schema = ', '.join('%s %s' % column for column in columns)
|
||||
|
||||
cursor.execute('DROP TABLE IF EXISTS %s' % name)
|
||||
cursor.execute('CREATE TABLE %s (%s)' % (name, schema))
|
||||
|
||||
|
||||
def populate_table(cursor, rows, name, columns):
|
||||
create_table(cursor, name, columns)
|
||||
|
||||
values = ', '.join([':%s' % column[0] for column in columns])
|
||||
|
||||
for row in rows:
|
||||
cursor.execute('INSERT INTO %s VALUES (%s)' % (name, values), row)
|
||||
|
||||
|
||||
def populate_data(data):
|
||||
connection = sqlite3.connect(DATABASE_PATH)
|
||||
cursor = connection.cursor()
|
||||
|
||||
for table in data:
|
||||
populate_table(cursor, data[table]['rows'], table, data[table]['schema'])
|
||||
|
||||
connection.commit()
|
||||
connection.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in a new issue