mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Add requests.Session like class (#37622)
* Adds requests.Session like class * py2 syntax fix * Add a few examples to the Request docstrings * Add helper methods and docs * Fix test failures * Switch tests to test Request instead of open_url, add simple open_url test to validate funcitonality * Fix filename in replace-urlopen code smell test
This commit is contained in:
parent
c86fd6e2df
commit
d4930e6692
3 changed files with 440 additions and 172 deletions
|
@ -816,6 +816,311 @@ def maybe_add_ssl_handler(url, validate_certs):
|
||||||
return SSLValidationHandler(hostname, port)
|
return SSLValidationHandler(hostname, port)
|
||||||
|
|
||||||
|
|
||||||
|
class Request:
|
||||||
|
def __init__(self, headers=None, use_proxy=True, force=False, timeout=10, validate_certs=True,
|
||||||
|
url_username=None, url_password=None, http_agent=None, force_basic_auth=False,
|
||||||
|
follow_redirects='urllib2', client_cert=None, client_key=None, cookies=None):
|
||||||
|
"""This class works somewhat similarly to the ``Session`` class of from requests
|
||||||
|
by defining a cookiejar that an be used across requests as well as cascaded defaults that
|
||||||
|
can apply to repeated requests
|
||||||
|
|
||||||
|
For documentation of params, see ``Request.open``
|
||||||
|
|
||||||
|
>>> from ansible.module_utils.urls import Request
|
||||||
|
>>> r = Request()
|
||||||
|
>>> r.open('GET', 'http://httpbin.org/cookies/set?k1=v1').read()
|
||||||
|
'{\n "cookies": {\n "k1": "v1"\n }\n}\n'
|
||||||
|
>>> r = Request(url_username='user', url_password='passwd')
|
||||||
|
>>> r.open('GET', 'http://httpbin.org/basic-auth/user/passwd').read()
|
||||||
|
'{\n "authenticated": true, \n "user": "user"\n}\n'
|
||||||
|
>>> r = Request(headers=dict(foo='bar'))
|
||||||
|
>>> r.open('GET', 'http://httpbin.org/get', headers=dict(baz='qux')).read()
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.headers = headers or {}
|
||||||
|
if not isinstance(self.headers, dict):
|
||||||
|
raise ValueError("headers must be a dict")
|
||||||
|
self.use_proxy = use_proxy
|
||||||
|
self.force = force
|
||||||
|
self.timeout = timeout
|
||||||
|
self.validate_certs = validate_certs
|
||||||
|
self.url_username = url_username
|
||||||
|
self.url_password = url_password
|
||||||
|
self.http_agent = http_agent
|
||||||
|
self.force_basic_auth = force_basic_auth
|
||||||
|
self.follow_redirects = follow_redirects
|
||||||
|
self.client_cert = client_cert
|
||||||
|
self.client_key = client_key
|
||||||
|
if isinstance(cookies, cookiejar.CookieJar):
|
||||||
|
self.cookies = cookies
|
||||||
|
else:
|
||||||
|
self.cookies = cookiejar.CookieJar()
|
||||||
|
|
||||||
|
def _fallback(self, value, fallback):
|
||||||
|
if value is None:
|
||||||
|
return fallback
|
||||||
|
return value
|
||||||
|
|
||||||
|
def open(self, method, url, data=None, headers=None, use_proxy=None,
|
||||||
|
force=None, last_mod_time=None, timeout=None, validate_certs=None,
|
||||||
|
url_username=None, url_password=None, http_agent=None,
|
||||||
|
force_basic_auth=None, follow_redirects=None,
|
||||||
|
client_cert=None, client_key=None, cookies=None):
|
||||||
|
"""
|
||||||
|
Sends a request via HTTP(S) or FTP using urllib2 (Python2) or urllib (Python3)
|
||||||
|
|
||||||
|
Does not require the module environment
|
||||||
|
|
||||||
|
Returns :class:`HTTPResponse` object.
|
||||||
|
|
||||||
|
:arg method: method for the request
|
||||||
|
:arg url: URL to request
|
||||||
|
|
||||||
|
:kwarg data: (optional) bytes, or file-like object to send
|
||||||
|
in the body of the request
|
||||||
|
:kwarg headers: (optional) Dictionary of HTTP Headers to send with the
|
||||||
|
request
|
||||||
|
:kwarg use_proxy: (optional) Boolean of whether or not to use proxy
|
||||||
|
:kwarg force: (optional) Boolean of whether or not to set `cache-control: no-cache` header
|
||||||
|
:kwarg last_mod_time: (optional) Datetime object to use when setting If-Modified-Since header
|
||||||
|
:kwarg timeout: (optional) How long to wait for the server to send
|
||||||
|
data before giving up, as a float
|
||||||
|
:kwarg validate_certs: (optional) Booleani that controls whether we verify
|
||||||
|
the server's TLS certificate
|
||||||
|
:kwarg url_username: (optional) String of the user to use when authenticating
|
||||||
|
:kwarg url_password: (optional) String of the password to use when authenticating
|
||||||
|
:kwarg http_agent: (optional) String of the User-Agent to use in the request
|
||||||
|
:kwarg force_basic_auth: (optional) Boolean determining if auth header should be sent in the initial request
|
||||||
|
:kwarg follow_redirects: (optional) String of urllib2, all/yes, safe, none to determine how redirects are
|
||||||
|
followed, see RedirectHandlerFactory for more information
|
||||||
|
:kwarg client_cert: (optional) PEM formatted certificate chain file to be used for SSL client authentication.
|
||||||
|
This file can also include the key as well, and if the key is included, client_key is not required
|
||||||
|
:kwarg client_key: (optional) PEM formatted file that contains your private key to be used for SSL client
|
||||||
|
authentication. If client_cert contains both the certificate and key, this option is not required
|
||||||
|
:kwarg cookies: (optional) CookieJar object to send with the
|
||||||
|
request
|
||||||
|
:returns: HTTPResponse
|
||||||
|
"""
|
||||||
|
|
||||||
|
method = method.upper()
|
||||||
|
|
||||||
|
if headers is None:
|
||||||
|
headers = {}
|
||||||
|
elif not isinstance(headers, dict):
|
||||||
|
raise ValueError("headers must be a dict")
|
||||||
|
headers = dict(self.headers, **headers)
|
||||||
|
|
||||||
|
use_proxy = self._fallback(use_proxy, self.use_proxy)
|
||||||
|
force = self._fallback(force, self.force)
|
||||||
|
timeout = self._fallback(timeout, self.timeout)
|
||||||
|
validate_certs = self._fallback(validate_certs, self.validate_certs)
|
||||||
|
url_username = self._fallback(url_username, self.url_username)
|
||||||
|
url_password = self._fallback(url_password, self.url_password)
|
||||||
|
http_agent = self._fallback(http_agent, self.http_agent)
|
||||||
|
force_basic_auth = self._fallback(force_basic_auth, self.force_basic_auth)
|
||||||
|
follow_redirects = self._fallback(follow_redirects, self.follow_redirects)
|
||||||
|
client_cert = self._fallback(client_cert, self.client_cert)
|
||||||
|
client_key = self._fallback(client_key, self.client_key)
|
||||||
|
cookies = self._fallback(cookies, self.cookies)
|
||||||
|
|
||||||
|
handlers = []
|
||||||
|
ssl_handler = maybe_add_ssl_handler(url, validate_certs)
|
||||||
|
if ssl_handler:
|
||||||
|
handlers.append(ssl_handler)
|
||||||
|
|
||||||
|
parsed = generic_urlparse(urlparse(url))
|
||||||
|
if parsed.scheme != 'ftp':
|
||||||
|
username = url_username
|
||||||
|
|
||||||
|
if username:
|
||||||
|
password = url_password
|
||||||
|
netloc = parsed.netloc
|
||||||
|
elif '@' in parsed.netloc:
|
||||||
|
credentials, netloc = parsed.netloc.split('@', 1)
|
||||||
|
if ':' in credentials:
|
||||||
|
username, password = credentials.split(':', 1)
|
||||||
|
else:
|
||||||
|
username = credentials
|
||||||
|
password = ''
|
||||||
|
|
||||||
|
parsed_list = parsed.as_list()
|
||||||
|
parsed_list[1] = netloc
|
||||||
|
|
||||||
|
# reconstruct url without credentials
|
||||||
|
url = urlunparse(parsed_list)
|
||||||
|
|
||||||
|
if username and not force_basic_auth:
|
||||||
|
passman = urllib_request.HTTPPasswordMgrWithDefaultRealm()
|
||||||
|
|
||||||
|
# this creates a password manager
|
||||||
|
passman.add_password(None, netloc, username, password)
|
||||||
|
|
||||||
|
# because we have put None at the start it will always
|
||||||
|
# use this username/password combination for urls
|
||||||
|
# for which `theurl` is a super-url
|
||||||
|
authhandler = urllib_request.HTTPBasicAuthHandler(passman)
|
||||||
|
digest_authhandler = urllib_request.HTTPDigestAuthHandler(passman)
|
||||||
|
|
||||||
|
# create the AuthHandler
|
||||||
|
handlers.append(authhandler)
|
||||||
|
handlers.append(digest_authhandler)
|
||||||
|
|
||||||
|
elif username and force_basic_auth:
|
||||||
|
headers["Authorization"] = basic_auth_header(username, password)
|
||||||
|
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
rc = netrc.netrc(os.environ.get('NETRC'))
|
||||||
|
login = rc.authenticators(parsed.hostname)
|
||||||
|
except IOError:
|
||||||
|
login = None
|
||||||
|
|
||||||
|
if login:
|
||||||
|
username, _, password = login
|
||||||
|
if username and password:
|
||||||
|
headers["Authorization"] = basic_auth_header(username, password)
|
||||||
|
|
||||||
|
if not use_proxy:
|
||||||
|
proxyhandler = urllib_request.ProxyHandler({})
|
||||||
|
handlers.append(proxyhandler)
|
||||||
|
|
||||||
|
if HAS_SSLCONTEXT and not validate_certs:
|
||||||
|
# In 2.7.9, the default context validates certificates
|
||||||
|
context = SSLContext(ssl.PROTOCOL_SSLv23)
|
||||||
|
if ssl.OP_NO_SSLv2:
|
||||||
|
context.options |= ssl.OP_NO_SSLv2
|
||||||
|
context.options |= ssl.OP_NO_SSLv3
|
||||||
|
context.verify_mode = ssl.CERT_NONE
|
||||||
|
context.check_hostname = False
|
||||||
|
handlers.append(HTTPSClientAuthHandler(client_cert=client_cert,
|
||||||
|
client_key=client_key,
|
||||||
|
context=context))
|
||||||
|
elif client_cert:
|
||||||
|
handlers.append(HTTPSClientAuthHandler(client_cert=client_cert,
|
||||||
|
client_key=client_key))
|
||||||
|
|
||||||
|
# pre-2.6 versions of python cannot use the custom https
|
||||||
|
# handler, since the socket class is lacking create_connection.
|
||||||
|
# Some python builds lack HTTPS support.
|
||||||
|
if hasattr(socket, 'create_connection') and CustomHTTPSHandler:
|
||||||
|
handlers.append(CustomHTTPSHandler)
|
||||||
|
|
||||||
|
handlers.append(RedirectHandlerFactory(follow_redirects, validate_certs))
|
||||||
|
|
||||||
|
# add some nicer cookie handling
|
||||||
|
if cookies is not None:
|
||||||
|
handlers.append(urllib_request.HTTPCookieProcessor(cookies))
|
||||||
|
|
||||||
|
opener = urllib_request.build_opener(*handlers)
|
||||||
|
urllib_request.install_opener(opener)
|
||||||
|
|
||||||
|
data = to_bytes(data, nonstring='passthru')
|
||||||
|
if method not in ('OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT', 'PATCH'):
|
||||||
|
raise ConnectionError('invalid HTTP request method; %s' % method)
|
||||||
|
request = RequestWithMethod(url, method, data)
|
||||||
|
|
||||||
|
# add the custom agent header, to help prevent issues
|
||||||
|
# with sites that block the default urllib agent string
|
||||||
|
if http_agent:
|
||||||
|
request.add_header('User-agent', http_agent)
|
||||||
|
|
||||||
|
# Cache control
|
||||||
|
# Either we directly force a cache refresh
|
||||||
|
if force:
|
||||||
|
request.add_header('cache-control', 'no-cache')
|
||||||
|
# or we do it if the original is more recent than our copy
|
||||||
|
elif last_mod_time:
|
||||||
|
tstamp = last_mod_time.strftime('%a, %d %b %Y %H:%M:%S +0000')
|
||||||
|
request.add_header('If-Modified-Since', tstamp)
|
||||||
|
|
||||||
|
# user defined headers now, which may override things we've set above
|
||||||
|
for header in headers:
|
||||||
|
request.add_header(header, headers[header])
|
||||||
|
|
||||||
|
urlopen_args = [request, None]
|
||||||
|
if sys.version_info >= (2, 6, 0):
|
||||||
|
# urlopen in python prior to 2.6.0 did not
|
||||||
|
# have a timeout parameter
|
||||||
|
urlopen_args.append(timeout)
|
||||||
|
|
||||||
|
r = urllib_request.urlopen(*urlopen_args)
|
||||||
|
return r
|
||||||
|
|
||||||
|
def get(self, url, **kwargs):
|
||||||
|
r"""Sends a GET request. Returns :class:`HTTPResponse` object.
|
||||||
|
|
||||||
|
:arg url: URL to request
|
||||||
|
:kwarg \*\*kwargs: Optional arguments that ``open`` takes.
|
||||||
|
:returns: HTTPResponse
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.open('GET', url, **kwargs)
|
||||||
|
|
||||||
|
def options(self, url, **kwargs):
|
||||||
|
r"""Sends a OPTIONS request. Returns :class:`HTTPResponse` object.
|
||||||
|
|
||||||
|
:arg url: URL to request
|
||||||
|
:kwarg \*\*kwargs: Optional arguments that ``open`` takes.
|
||||||
|
:returns: HTTPResponse
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.open('OPTIONS', url, **kwargs)
|
||||||
|
|
||||||
|
def head(self, url, **kwargs):
|
||||||
|
r"""Sends a HEAD request. Returns :class:`HTTPResponse` object.
|
||||||
|
|
||||||
|
:arg url: URL to request
|
||||||
|
:kwarg \*\*kwargs: Optional arguments that ``open`` takes.
|
||||||
|
:returns: HTTPResponse
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.open('HEAD', url, **kwargs)
|
||||||
|
|
||||||
|
def post(self, url, data=None, **kwargs):
|
||||||
|
r"""Sends a POST request. Returns :class:`HTTPResponse` object.
|
||||||
|
|
||||||
|
:arg url: URL to request.
|
||||||
|
:kwarg data: (optional) bytes, or file-like object to send in the body of the request.
|
||||||
|
:kwarg \*\*kwargs: Optional arguments that ``open`` takes.
|
||||||
|
:returns: HTTPResponse
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.open('POST', url, data=data, **kwargs)
|
||||||
|
|
||||||
|
def put(self, url, data=None, **kwargs):
|
||||||
|
r"""Sends a PUT request. Returns :class:`HTTPResponse` object.
|
||||||
|
|
||||||
|
:arg url: URL to request.
|
||||||
|
:kwarg data: (optional) bytes, or file-like object to send in the body of the request.
|
||||||
|
:kwarg \*\*kwargs: Optional arguments that ``open`` takes.
|
||||||
|
:returns: HTTPResponse
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.open('PUT', url, data=data, **kwargs)
|
||||||
|
|
||||||
|
def patch(self, url, data=None, **kwargs):
|
||||||
|
r"""Sends a PATCH request. Returns :class:`HTTPResponse` object.
|
||||||
|
|
||||||
|
:arg url: URL to request.
|
||||||
|
:kwarg data: (optional) bytes, or file-like object to send in the body of the request.
|
||||||
|
:kwarg \*\*kwargs: Optional arguments that ``open`` takes.
|
||||||
|
:returns: HTTPResponse
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.open('PATCH', url, data=data, **kwargs)
|
||||||
|
|
||||||
|
def delete(self, url, **kwargs):
|
||||||
|
r"""Sends a DELETE request. Returns :class:`HTTPResponse` object.
|
||||||
|
|
||||||
|
:arg url: URL to request
|
||||||
|
:kwargs \*\*kwargs: Optional arguments that ``open`` takes.
|
||||||
|
:returns: HTTPResponse
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.open('DELETE', url, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def open_url(url, data=None, headers=None, method=None, use_proxy=True,
|
def open_url(url, data=None, headers=None, method=None, use_proxy=True,
|
||||||
force=False, last_mod_time=None, timeout=10, validate_certs=True,
|
force=False, last_mod_time=None, timeout=10, validate_certs=True,
|
||||||
url_username=None, url_password=None, http_agent=None,
|
url_username=None, url_password=None, http_agent=None,
|
||||||
|
@ -826,137 +1131,13 @@ def open_url(url, data=None, headers=None, method=None, use_proxy=True,
|
||||||
|
|
||||||
Does not require the module environment
|
Does not require the module environment
|
||||||
'''
|
'''
|
||||||
handlers = []
|
method = method or 'GET'
|
||||||
ssl_handler = maybe_add_ssl_handler(url, validate_certs)
|
return Request().open(method, url, data=data, headers=headers, use_proxy=use_proxy,
|
||||||
if ssl_handler:
|
force=force, last_mod_time=last_mod_time, timeout=timeout, validate_certs=validate_certs,
|
||||||
handlers.append(ssl_handler)
|
url_username=url_username, url_password=url_password, http_agent=http_agent,
|
||||||
|
force_basic_auth=force_basic_auth, follow_redirects=follow_redirects,
|
||||||
|
client_cert=client_cert, client_key=client_key, cookies=cookies)
|
||||||
|
|
||||||
parsed = generic_urlparse(urlparse(url))
|
|
||||||
if parsed.scheme != 'ftp':
|
|
||||||
username = url_username
|
|
||||||
|
|
||||||
if headers is None:
|
|
||||||
headers = {}
|
|
||||||
|
|
||||||
if username:
|
|
||||||
password = url_password
|
|
||||||
netloc = parsed.netloc
|
|
||||||
elif '@' in parsed.netloc:
|
|
||||||
credentials, netloc = parsed.netloc.split('@', 1)
|
|
||||||
if ':' in credentials:
|
|
||||||
username, password = credentials.split(':', 1)
|
|
||||||
else:
|
|
||||||
username = credentials
|
|
||||||
password = ''
|
|
||||||
|
|
||||||
parsed_list = parsed.as_list()
|
|
||||||
parsed_list[1] = netloc
|
|
||||||
|
|
||||||
# reconstruct url without credentials
|
|
||||||
url = urlunparse(parsed_list)
|
|
||||||
|
|
||||||
if username and not force_basic_auth:
|
|
||||||
passman = urllib_request.HTTPPasswordMgrWithDefaultRealm()
|
|
||||||
|
|
||||||
# this creates a password manager
|
|
||||||
passman.add_password(None, netloc, username, password)
|
|
||||||
|
|
||||||
# because we have put None at the start it will always
|
|
||||||
# use this username/password combination for urls
|
|
||||||
# for which `theurl` is a super-url
|
|
||||||
authhandler = urllib_request.HTTPBasicAuthHandler(passman)
|
|
||||||
digest_authhandler = urllib_request.HTTPDigestAuthHandler(passman)
|
|
||||||
|
|
||||||
# create the AuthHandler
|
|
||||||
handlers.append(authhandler)
|
|
||||||
handlers.append(digest_authhandler)
|
|
||||||
|
|
||||||
elif username and force_basic_auth:
|
|
||||||
headers["Authorization"] = basic_auth_header(username, password)
|
|
||||||
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
rc = netrc.netrc(os.environ.get('NETRC'))
|
|
||||||
login = rc.authenticators(parsed.hostname)
|
|
||||||
except IOError:
|
|
||||||
login = None
|
|
||||||
|
|
||||||
if login:
|
|
||||||
username, _, password = login
|
|
||||||
if username and password:
|
|
||||||
headers["Authorization"] = basic_auth_header(username, password)
|
|
||||||
|
|
||||||
if not use_proxy:
|
|
||||||
proxyhandler = urllib_request.ProxyHandler({})
|
|
||||||
handlers.append(proxyhandler)
|
|
||||||
|
|
||||||
if HAS_SSLCONTEXT and not validate_certs:
|
|
||||||
# In 2.7.9, the default context validates certificates
|
|
||||||
context = SSLContext(ssl.PROTOCOL_SSLv23)
|
|
||||||
if ssl.OP_NO_SSLv2:
|
|
||||||
context.options |= ssl.OP_NO_SSLv2
|
|
||||||
context.options |= ssl.OP_NO_SSLv3
|
|
||||||
context.verify_mode = ssl.CERT_NONE
|
|
||||||
context.check_hostname = False
|
|
||||||
handlers.append(HTTPSClientAuthHandler(client_cert=client_cert,
|
|
||||||
client_key=client_key,
|
|
||||||
context=context))
|
|
||||||
elif client_cert:
|
|
||||||
handlers.append(HTTPSClientAuthHandler(client_cert=client_cert,
|
|
||||||
client_key=client_key))
|
|
||||||
|
|
||||||
# pre-2.6 versions of python cannot use the custom https
|
|
||||||
# handler, since the socket class is lacking create_connection.
|
|
||||||
# Some python builds lack HTTPS support.
|
|
||||||
if hasattr(socket, 'create_connection') and CustomHTTPSHandler:
|
|
||||||
handlers.append(CustomHTTPSHandler)
|
|
||||||
|
|
||||||
handlers.append(RedirectHandlerFactory(follow_redirects, validate_certs))
|
|
||||||
|
|
||||||
# add some nicer cookie handling
|
|
||||||
if cookies is not None:
|
|
||||||
handlers.append(urllib_request.HTTPCookieProcessor(cookies))
|
|
||||||
|
|
||||||
opener = urllib_request.build_opener(*handlers)
|
|
||||||
urllib_request.install_opener(opener)
|
|
||||||
|
|
||||||
data = to_bytes(data, nonstring='passthru')
|
|
||||||
if method:
|
|
||||||
if method.upper() not in ('OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT', 'PATCH'):
|
|
||||||
raise ConnectionError('invalid HTTP request method; %s' % method.upper())
|
|
||||||
request = RequestWithMethod(url, method.upper(), data)
|
|
||||||
else:
|
|
||||||
request = urllib_request.Request(url, data)
|
|
||||||
|
|
||||||
# add the custom agent header, to help prevent issues
|
|
||||||
# with sites that block the default urllib agent string
|
|
||||||
if http_agent:
|
|
||||||
request.add_header('User-agent', http_agent)
|
|
||||||
|
|
||||||
# Cache control
|
|
||||||
# Either we directly force a cache refresh
|
|
||||||
if force:
|
|
||||||
request.add_header('cache-control', 'no-cache')
|
|
||||||
# or we do it if the original is more recent than our copy
|
|
||||||
elif last_mod_time:
|
|
||||||
tstamp = last_mod_time.strftime('%a, %d %b %Y %H:%M:%S +0000')
|
|
||||||
request.add_header('If-Modified-Since', tstamp)
|
|
||||||
|
|
||||||
# user defined headers now, which may override things we've set above
|
|
||||||
if headers:
|
|
||||||
if not isinstance(headers, dict):
|
|
||||||
raise ValueError("headers provided to open_url() must be a dict")
|
|
||||||
for header in headers:
|
|
||||||
request.add_header(header, headers[header])
|
|
||||||
|
|
||||||
urlopen_args = [request, None]
|
|
||||||
if sys.version_info >= (2, 6, 0):
|
|
||||||
# urlopen in python prior to 2.6.0 did not
|
|
||||||
# have a timeout parameter
|
|
||||||
urlopen_args.append(timeout)
|
|
||||||
|
|
||||||
r = urllib_request.urlopen(*urlopen_args)
|
|
||||||
return r
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# Module-related functions
|
# Module-related functions
|
||||||
|
|
|
@ -10,7 +10,7 @@ def main():
|
||||||
'test/sanity/code-smell/%s' % os.path.basename(__file__),
|
'test/sanity/code-smell/%s' % os.path.basename(__file__),
|
||||||
'lib/ansible/module_utils/six/__init__.py',
|
'lib/ansible/module_utils/six/__init__.py',
|
||||||
'lib/ansible/module_utils/urls.py',
|
'lib/ansible/module_utils/urls.py',
|
||||||
'test/units/module_utils/urls/test_open_url.py',
|
'test/units/module_utils/urls/test_Request.py',
|
||||||
'test/units/module_utils/urls/test_fetch_url.py',
|
'test/units/module_utils/urls/test_fetch_url.py',
|
||||||
])
|
])
|
||||||
|
|
||||||
|
|
|
@ -8,10 +8,11 @@ __metaclass__ = type
|
||||||
import datetime
|
import datetime
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from ansible.module_utils.urls import open_url, urllib_request, HAS_SSLCONTEXT, cookiejar, ConnectionError, RequestWithMethod
|
from ansible.module_utils.urls import Request, open_url, urllib_request, HAS_SSLCONTEXT, cookiejar, ConnectionError, RequestWithMethod
|
||||||
from ansible.module_utils.urls import SSLValidationHandler, HTTPSClientAuthHandler, RedirectHandlerFactory
|
from ansible.module_utils.urls import SSLValidationHandler, HTTPSClientAuthHandler, RedirectHandlerFactory
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from mock import call
|
||||||
|
|
||||||
|
|
||||||
if HAS_SSLCONTEXT:
|
if HAS_SSLCONTEXT:
|
||||||
|
@ -28,8 +29,62 @@ def install_opener_mock(mocker):
|
||||||
return mocker.patch('ansible.module_utils.urls.urllib_request.install_opener')
|
return mocker.patch('ansible.module_utils.urls.urllib_request.install_opener')
|
||||||
|
|
||||||
|
|
||||||
def test_open_url(urlopen_mock, install_opener_mock):
|
def test_Request_fallback(urlopen_mock, install_opener_mock, mocker):
|
||||||
r = open_url('https://ansible.com/')
|
cookies = cookiejar.CookieJar()
|
||||||
|
request = Request(
|
||||||
|
headers={'foo': 'bar'},
|
||||||
|
use_proxy=False,
|
||||||
|
force=True,
|
||||||
|
timeout=100,
|
||||||
|
validate_certs=False,
|
||||||
|
url_username='user',
|
||||||
|
url_password='passwd',
|
||||||
|
http_agent='ansible-tests',
|
||||||
|
force_basic_auth=True,
|
||||||
|
follow_redirects='all',
|
||||||
|
client_cert='/tmp/client.pem',
|
||||||
|
client_key='/tmp/client.key',
|
||||||
|
cookies=cookies,
|
||||||
|
)
|
||||||
|
fallback_mock = mocker.spy(request, '_fallback')
|
||||||
|
|
||||||
|
r = request.open('GET', 'https://ansible.com')
|
||||||
|
|
||||||
|
calls = [
|
||||||
|
call(None, False), # use_proxy
|
||||||
|
call(None, True), # force
|
||||||
|
call(None, 100), # timeout
|
||||||
|
call(None, False), # validate_certs
|
||||||
|
call(None, 'user'), # url_username
|
||||||
|
call(None, 'passwd'), # url_password
|
||||||
|
call(None, 'ansible-tests'), # http_agent
|
||||||
|
call(None, True), # force_basic_auth
|
||||||
|
call(None, 'all'), # follow_redirects
|
||||||
|
call(None, '/tmp/client.pem'), # client_cert
|
||||||
|
call(None, '/tmp/client.key'), # client_key
|
||||||
|
call(None, cookies), # cookies
|
||||||
|
]
|
||||||
|
fallback_mock.assert_has_calls(calls)
|
||||||
|
|
||||||
|
assert fallback_mock.call_count == 12 # All but headers use fallback
|
||||||
|
|
||||||
|
args = urlopen_mock.call_args[0]
|
||||||
|
assert args[1] is None # data, this is handled in the Request not urlopen
|
||||||
|
assert args[2] == 100 # timeout
|
||||||
|
|
||||||
|
req = args[0]
|
||||||
|
assert req.headers == {
|
||||||
|
'Authorization': b'Basic dXNlcjpwYXNzd2Q=',
|
||||||
|
'Cache-control': 'no-cache',
|
||||||
|
'Foo': 'bar',
|
||||||
|
'User-agent': 'ansible-tests'
|
||||||
|
}
|
||||||
|
assert req.data is None
|
||||||
|
assert req.get_method() == 'GET'
|
||||||
|
|
||||||
|
|
||||||
|
def test_Request_open(urlopen_mock, install_opener_mock):
|
||||||
|
r = Request().open('GET', 'https://ansible.com/')
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
assert args[1] is None # data, this is handled in the Request not urlopen
|
assert args[1] is None # data, this is handled in the Request not urlopen
|
||||||
assert args[2] == 10 # timeout
|
assert args[2] == 10 # timeout
|
||||||
|
@ -55,8 +110,8 @@ def test_open_url(urlopen_mock, install_opener_mock):
|
||||||
assert len(found_handlers) == 2
|
assert len(found_handlers) == 2
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_http(urlopen_mock, install_opener_mock):
|
def test_Request_open_http(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('http://ansible.com/')
|
r = Request().open('GET', 'http://ansible.com/')
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
|
|
||||||
opener = install_opener_mock.call_args[0][0]
|
opener = install_opener_mock.call_args[0][0]
|
||||||
|
@ -70,22 +125,22 @@ def test_open_url_http(urlopen_mock, install_opener_mock):
|
||||||
assert len(found_handlers) == 0
|
assert len(found_handlers) == 0
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_ftp(urlopen_mock, install_opener_mock, mocker):
|
def test_Request_open_ftp(urlopen_mock, install_opener_mock, mocker):
|
||||||
mocker.patch('ansible.module_utils.urls.ParseResultDottedDict.as_list', side_effect=AssertionError)
|
mocker.patch('ansible.module_utils.urls.ParseResultDottedDict.as_list', side_effect=AssertionError)
|
||||||
|
|
||||||
# Using ftp scheme should prevent the AssertionError side effect to fire
|
# Using ftp scheme should prevent the AssertionError side effect to fire
|
||||||
r = open_url('ftp://foo@ansible.com/')
|
r = Request().open('GET', 'ftp://foo@ansible.com/')
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_headers(urlopen_mock, install_opener_mock):
|
def test_Request_open_headers(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('http://ansible.com/', headers={'Foo': 'bar'})
|
r = Request().open('GET', 'http://ansible.com/', headers={'Foo': 'bar'})
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
req = args[0]
|
req = args[0]
|
||||||
assert req.headers == {'Foo': 'bar'}
|
assert req.headers == {'Foo': 'bar'}
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_username(urlopen_mock, install_opener_mock):
|
def test_Request_open_username(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('http://ansible.com/', url_username='user')
|
r = Request().open('GET', 'http://ansible.com/', url_username='user')
|
||||||
|
|
||||||
opener = install_opener_mock.call_args[0][0]
|
opener = install_opener_mock.call_args[0][0]
|
||||||
handlers = opener.handlers
|
handlers = opener.handlers
|
||||||
|
@ -103,8 +158,8 @@ def test_open_url_username(urlopen_mock, install_opener_mock):
|
||||||
assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user', None)}
|
assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user', None)}
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_username_in_url(urlopen_mock, install_opener_mock):
|
def test_Request_open_username_in_url(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('http://user2@ansible.com/')
|
r = Request().open('GET', 'http://user2@ansible.com/')
|
||||||
|
|
||||||
opener = install_opener_mock.call_args[0][0]
|
opener = install_opener_mock.call_args[0][0]
|
||||||
handlers = opener.handlers
|
handlers = opener.handlers
|
||||||
|
@ -121,8 +176,8 @@ def test_open_url_username_in_url(urlopen_mock, install_opener_mock):
|
||||||
assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user2', '')}
|
assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user2', '')}
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_username_force_basic(urlopen_mock, install_opener_mock):
|
def test_Request_open_username_force_basic(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('http://ansible.com/', url_username='user', url_password='passwd', force_basic_auth=True)
|
r = Request().open('GET', 'http://ansible.com/', url_username='user', url_password='passwd', force_basic_auth=True)
|
||||||
|
|
||||||
opener = install_opener_mock.call_args[0][0]
|
opener = install_opener_mock.call_args[0][0]
|
||||||
handlers = opener.handlers
|
handlers = opener.handlers
|
||||||
|
@ -144,8 +199,8 @@ def test_open_url_username_force_basic(urlopen_mock, install_opener_mock):
|
||||||
assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='
|
assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_auth_in_netloc(urlopen_mock, install_opener_mock):
|
def test_Request_open_auth_in_netloc(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('http://user:passwd@ansible.com/')
|
r = Request().open('GET', 'http://user:passwd@ansible.com/')
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
req = args[0]
|
req = args[0]
|
||||||
assert req.get_full_url() == 'http://ansible.com/'
|
assert req.get_full_url() == 'http://ansible.com/'
|
||||||
|
@ -166,31 +221,31 @@ def test_open_url_auth_in_netloc(urlopen_mock, install_opener_mock):
|
||||||
assert len(found_handlers) == 2
|
assert len(found_handlers) == 2
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_netrc(urlopen_mock, install_opener_mock, monkeypatch):
|
def test_Request_open_netrc(urlopen_mock, install_opener_mock, monkeypatch):
|
||||||
here = os.path.dirname(__file__)
|
here = os.path.dirname(__file__)
|
||||||
|
|
||||||
monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc'))
|
monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc'))
|
||||||
r = open_url('http://ansible.com/')
|
r = Request().open('GET', 'http://ansible.com/')
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
req = args[0]
|
req = args[0]
|
||||||
assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='
|
assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='
|
||||||
|
|
||||||
r = open_url('http://foo.ansible.com/')
|
r = Request().open('GET', 'http://foo.ansible.com/')
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
req = args[0]
|
req = args[0]
|
||||||
assert 'Authorization' not in req.headers
|
assert 'Authorization' not in req.headers
|
||||||
|
|
||||||
monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc.nonexistant'))
|
monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc.nonexistant'))
|
||||||
r = open_url('http://ansible.com/')
|
r = Request().open('GET', 'http://ansible.com/')
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
req = args[0]
|
req = args[0]
|
||||||
assert 'Authorization' not in req.headers
|
assert 'Authorization' not in req.headers
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_no_proxy(urlopen_mock, install_opener_mock, mocker):
|
def test_Request_open_no_proxy(urlopen_mock, install_opener_mock, mocker):
|
||||||
build_opener_mock = mocker.patch('ansible.module_utils.urls.urllib_request.build_opener')
|
build_opener_mock = mocker.patch('ansible.module_utils.urls.urllib_request.build_opener')
|
||||||
|
|
||||||
r = open_url('http://ansible.com/', use_proxy=False)
|
r = Request().open('GET', 'http://ansible.com/', use_proxy=False)
|
||||||
|
|
||||||
handlers = build_opener_mock.call_args[0]
|
handlers = build_opener_mock.call_args[0]
|
||||||
found_handlers = []
|
found_handlers = []
|
||||||
|
@ -202,8 +257,8 @@ def test_open_url_no_proxy(urlopen_mock, install_opener_mock, mocker):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not HAS_SSLCONTEXT, reason="requires SSLContext")
|
@pytest.mark.skipif(not HAS_SSLCONTEXT, reason="requires SSLContext")
|
||||||
def test_open_url_no_validate_certs(urlopen_mock, install_opener_mock):
|
def test_Request_open_no_validate_certs(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('https://ansible.com/', validate_certs=False)
|
r = Request().open('GET', 'https://ansible.com/', validate_certs=False)
|
||||||
|
|
||||||
opener = install_opener_mock.call_args[0][0]
|
opener = install_opener_mock.call_args[0][0]
|
||||||
handlers = opener.handlers
|
handlers = opener.handlers
|
||||||
|
@ -224,13 +279,13 @@ def test_open_url_no_validate_certs(urlopen_mock, install_opener_mock):
|
||||||
assert context.check_hostname is False
|
assert context.check_hostname is False
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_client_cert(urlopen_mock, install_opener_mock):
|
def test_Request_open_client_cert(urlopen_mock, install_opener_mock):
|
||||||
here = os.path.dirname(__file__)
|
here = os.path.dirname(__file__)
|
||||||
|
|
||||||
client_cert = os.path.join(here, 'fixtures/client.pem')
|
client_cert = os.path.join(here, 'fixtures/client.pem')
|
||||||
client_key = os.path.join(here, 'fixtures/client.key')
|
client_key = os.path.join(here, 'fixtures/client.key')
|
||||||
|
|
||||||
r = open_url('https://ansible.com/', client_cert=client_cert, client_key=client_key)
|
r = Request().open('GET', 'https://ansible.com/', client_cert=client_cert, client_key=client_key)
|
||||||
|
|
||||||
opener = install_opener_mock.call_args[0][0]
|
opener = install_opener_mock.call_args[0][0]
|
||||||
handlers = opener.handlers
|
handlers = opener.handlers
|
||||||
|
@ -252,8 +307,8 @@ def test_open_url_client_cert(urlopen_mock, install_opener_mock):
|
||||||
assert https_connection.cert_file == client_cert
|
assert https_connection.cert_file == client_cert
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_cookies(urlopen_mock, install_opener_mock):
|
def test_Request_open_cookies(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('https://ansible.com/', cookies=cookiejar.CookieJar())
|
r = Request().open('GET', 'https://ansible.com/', cookies=cookiejar.CookieJar())
|
||||||
|
|
||||||
opener = install_opener_mock.call_args[0][0]
|
opener = install_opener_mock.call_args[0][0]
|
||||||
handlers = opener.handlers
|
handlers = opener.handlers
|
||||||
|
@ -267,13 +322,13 @@ def test_open_url_cookies(urlopen_mock, install_opener_mock):
|
||||||
assert cookies_handler is not None
|
assert cookies_handler is not None
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_invalid_method(urlopen_mock, install_opener_mock):
|
def test_Request_open_invalid_method(urlopen_mock, install_opener_mock):
|
||||||
with pytest.raises(ConnectionError):
|
with pytest.raises(ConnectionError):
|
||||||
r = open_url('https://ansible.com/', method='BOGUS')
|
r = Request().open('BOGUS', 'https://ansible.com/')
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_custom_method(urlopen_mock, install_opener_mock):
|
def test_Request_open_custom_method(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('https://ansible.com/', method='DELETE')
|
r = Request().open('DELETE', 'https://ansible.com/')
|
||||||
|
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
req = args[0]
|
req = args[0]
|
||||||
|
@ -281,8 +336,8 @@ def test_open_url_custom_method(urlopen_mock, install_opener_mock):
|
||||||
assert isinstance(req, RequestWithMethod)
|
assert isinstance(req, RequestWithMethod)
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_user_agent(urlopen_mock, install_opener_mock):
|
def test_Request_open_user_agent(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('https://ansible.com/', http_agent='ansible-tests')
|
r = Request().open('GET', 'https://ansible.com/', http_agent='ansible-tests')
|
||||||
|
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
req = args[0]
|
req = args[0]
|
||||||
|
@ -290,8 +345,8 @@ def test_open_url_user_agent(urlopen_mock, install_opener_mock):
|
||||||
assert req.headers.get('User-agent') == 'ansible-tests'
|
assert req.headers.get('User-agent') == 'ansible-tests'
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_force(urlopen_mock, install_opener_mock):
|
def test_Request_open_force(urlopen_mock, install_opener_mock):
|
||||||
r = open_url('https://ansible.com/', force=True, last_mod_time=datetime.datetime.now())
|
r = Request().open('GET', 'https://ansible.com/', force=True, last_mod_time=datetime.datetime.now())
|
||||||
|
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
req = args[0]
|
req = args[0]
|
||||||
|
@ -300,9 +355,9 @@ def test_open_url_force(urlopen_mock, install_opener_mock):
|
||||||
assert 'If-modified-since' not in req.headers
|
assert 'If-modified-since' not in req.headers
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_last_mod(urlopen_mock, install_opener_mock):
|
def test_Request_open_last_mod(urlopen_mock, install_opener_mock):
|
||||||
now = datetime.datetime.now()
|
now = datetime.datetime.now()
|
||||||
r = open_url('https://ansible.com/', last_mod_time=now)
|
r = Request().open('GET', 'https://ansible.com/', last_mod_time=now)
|
||||||
|
|
||||||
args = urlopen_mock.call_args[0]
|
args = urlopen_mock.call_args[0]
|
||||||
req = args[0]
|
req = args[0]
|
||||||
|
@ -310,6 +365,38 @@ def test_open_url_last_mod(urlopen_mock, install_opener_mock):
|
||||||
assert req.headers.get('If-modified-since') == now.strftime('%a, %d %b %Y %H:%M:%S +0000')
|
assert req.headers.get('If-modified-since') == now.strftime('%a, %d %b %Y %H:%M:%S +0000')
|
||||||
|
|
||||||
|
|
||||||
def test_open_url_headers_not_dict(urlopen_mock, install_opener_mock):
|
def test_Request_open_headers_not_dict(urlopen_mock, install_opener_mock):
|
||||||
with pytest.raises(ValueError):
|
with pytest.raises(ValueError):
|
||||||
r = open_url('https://ansible.com/', headers=['bob'])
|
Request().open('GET', 'https://ansible.com/', headers=['bob'])
|
||||||
|
|
||||||
|
|
||||||
|
def test_Request_init_headers_not_dict(urlopen_mock, install_opener_mock):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
Request(headers=['bob'])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('method,kwargs', [
|
||||||
|
('get', {}),
|
||||||
|
('options', {}),
|
||||||
|
('head', {}),
|
||||||
|
('post', {'data': None}),
|
||||||
|
('put', {'data': None}),
|
||||||
|
('patch', {'data': None}),
|
||||||
|
('delete', {}),
|
||||||
|
])
|
||||||
|
def test_methods(method, kwargs, mocker):
|
||||||
|
expected = method.upper()
|
||||||
|
open_mock = mocker.patch('ansible.module_utils.urls.Request.open')
|
||||||
|
request = Request()
|
||||||
|
getattr(request, method)('https://ansible.com')
|
||||||
|
open_mock.assert_called_once_with(expected, 'https://ansible.com', **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def test_open_url(urlopen_mock, install_opener_mock, mocker):
|
||||||
|
req_mock = mocker.patch('ansible.module_utils.urls.Request.open')
|
||||||
|
open_url('https://ansible.com/')
|
||||||
|
req_mock.assert_called_once_with('GET', 'https://ansible.com/', data=None, headers=None, use_proxy=True,
|
||||||
|
force=False, last_mod_time=None, timeout=10, validate_certs=True,
|
||||||
|
url_username=None, url_password=None, http_agent=None,
|
||||||
|
force_basic_auth=False, follow_redirects='urllib2',
|
||||||
|
client_cert=None, client_key=None, cookies=None)
|
Loading…
Reference in a new issue