1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/tests/unit/plugins/module_utils/test_database.py
patchback[bot] 203747027e
Move licenses to LICENSES/, run add-license.py, add LICENSES/MIT.txt (#5065) (#5068)
* Move licenses to LICENSES/, run add-license.py, add LICENSES/MIT.txt.

* Replace 'Copyright:' with 'Copyright'

sed -i 's|Copyright:\(.*\)|Copyright\1|' $(rg -l 'Copyright:')

Co-authored-by: Maxwell G <gotmax@e.email>
(cherry picked from commit 123c7efe5e)

Co-authored-by: Felix Fontein <felix@fontein.de>
2022-08-05 13:17:19 +02:00

142 lines
5.8 KiB
Python

# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import pytest
from ansible_collections.community.general.plugins.module_utils.database import (
is_input_dangerous,
pg_quote_identifier,
SQLParseError,
)
# These are all valid strings
# The results are based on interpreting the identifier as a table name
VALID = {
# User quoted
'"public.table"': '"public.table"',
'"public"."table"': '"public"."table"',
'"schema test"."table test"': '"schema test"."table test"',
# We quote part
'public.table': '"public"."table"',
'"public".table': '"public"."table"',
'public."table"': '"public"."table"',
'schema test.table test': '"schema test"."table test"',
'"schema test".table test': '"schema test"."table test"',
'schema test."table test"': '"schema test"."table test"',
# Embedded double quotes
'table "test"': '"table ""test"""',
'public."table ""test"""': '"public"."table ""test"""',
'public.table "test"': '"public"."table ""test"""',
'schema "test".table': '"schema ""test"""."table"',
'"schema ""test""".table': '"schema ""test"""."table"',
'"""wat"""."""test"""': '"""wat"""."""test"""',
# Sigh, handle these as well:
'"no end quote': '"""no end quote"',
'schema."table': '"schema"."""table"',
'"schema.table': '"""schema"."table"',
'schema."table.something': '"schema"."""table"."something"',
# Embedded dots
'"schema.test"."table.test"': '"schema.test"."table.test"',
'"schema.".table': '"schema."."table"',
'"schema."."table"': '"schema."."table"',
'schema.".table"': '"schema".".table"',
'"schema".".table"': '"schema".".table"',
'"schema.".".table"': '"schema.".".table"',
# These are valid but maybe not what the user intended
'."table"': '".""table"""',
'table.': '"table."',
}
INVALID = {
('test.too.many.dots', 'table'): 'PostgreSQL does not support table with more than 3 dots',
('"test.too".many.dots', 'database'): 'PostgreSQL does not support database with more than 1 dots',
('test.too."many.dots"', 'database'): 'PostgreSQL does not support database with more than 1 dots',
('"test"."too"."many"."dots"', 'database'): "PostgreSQL does not support database with more than 1 dots",
('"test"."too"."many"."dots"', 'schema'): "PostgreSQL does not support schema with more than 2 dots",
('"test"."too"."many"."dots"', 'table'): "PostgreSQL does not support table with more than 3 dots",
('"test"."too"."many"."dots"."for"."column"', 'column'): "PostgreSQL does not support column with more than 4 dots",
('"table "invalid" double quote"', 'table'): 'User escaped identifiers must escape extra quotes',
('"schema "invalid"""."table "invalid"', 'table'): 'User escaped identifiers must escape extra quotes',
('"schema."table"', 'table'): 'User escaped identifiers must escape extra quotes',
('"schema".', 'table'): 'Identifier name unspecified or unquoted trailing dot',
}
HOW_MANY_DOTS = (
('role', 'role', '"role"',
'PostgreSQL does not support role with more than 1 dots'),
('db', 'database', '"db"',
'PostgreSQL does not support database with more than 1 dots'),
('db.schema', 'schema', '"db"."schema"',
'PostgreSQL does not support schema with more than 2 dots'),
('db.schema.table', 'table', '"db"."schema"."table"',
'PostgreSQL does not support table with more than 3 dots'),
('db.schema.table.column', 'column', '"db"."schema"."table"."column"',
'PostgreSQL does not support column with more than 4 dots'),
)
VALID_QUOTES = ((test, VALID[test]) for test in sorted(VALID))
INVALID_QUOTES = ((test[0], test[1], INVALID[test]) for test in sorted(INVALID))
IS_STRINGS_DANGEROUS = (
(u'', False),
(u' ', False),
(u'alternative database', False),
(u'backup of TRUNCATED table', False),
(u'bob.dropper', False),
(u'd\'artagnan', False),
(u'user_with_select_update_truncate_right', False),
(u';DROP DATABASE fluffy_pets_photos', True),
(u';drop DATABASE fluffy_pets_photos', True),
(u'; TRUNCATE TABLE his_valuable_table', True),
(u'; truncate TABLE his_valuable_table', True),
(u'\'--', True),
(u'"--', True),
(u'\' union select username, password from admin_credentials', True),
(u'\' UNION SELECT username, password from admin_credentials', True),
(u'\' intersect select', True),
(u'\' INTERSECT select', True),
(u'\' except select', True),
(u'\' EXCEPT select', True),
(u';ALTER TABLE prices', True),
(u';alter table prices', True),
(u"; UPDATE products SET price = '0'", True),
(u";update products SET price = '0'", True),
(u"; DELETE FROM products", True),
(u"; delete FROM products", True),
(u"; SELECT * FROM products", True),
(u" ; select * from products", True),
)
@pytest.mark.parametrize("identifier, quoted_identifier", VALID_QUOTES)
def test_valid_quotes(identifier, quoted_identifier):
assert pg_quote_identifier(identifier, 'table') == quoted_identifier
@pytest.mark.parametrize("identifier, id_type, msg", INVALID_QUOTES)
def test_invalid_quotes(identifier, id_type, msg):
with pytest.raises(SQLParseError) as ex:
pg_quote_identifier(identifier, id_type)
ex.match(msg)
@pytest.mark.parametrize("identifier, id_type, quoted_identifier, msg", HOW_MANY_DOTS)
def test_how_many_dots(identifier, id_type, quoted_identifier, msg):
assert pg_quote_identifier(identifier, id_type) == quoted_identifier
with pytest.raises(SQLParseError) as ex:
pg_quote_identifier('%s.more' % identifier, id_type)
ex.match(msg)
@pytest.mark.parametrize("string, result", IS_STRINGS_DANGEROUS)
def test_is_input_dangerous(string, result):
assert is_input_dangerous(string) == result