mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
ACI module_utils library for ACI modules (#27070)
* ACI module_utils library for ACI modules This PR includes: - the ACI argument_spec - an aci_login function - an experimental aci_request function - an aci_response function - included the ACI team * New prototype using ACIModule This PR includes: - A new ACIModule object with various useful methods
This commit is contained in:
parent
82a0a05406
commit
97aaf103e8
6 changed files with 287 additions and 152 deletions
4
.github/BOTMETA.yml
vendored
4
.github/BOTMETA.yml
vendored
|
@ -27,6 +27,7 @@
|
|||
#
|
||||
|
||||
files:
|
||||
$module_utils/aci.py: dagwieers schunduri
|
||||
$modules/cloud/amazon/:
|
||||
notify:
|
||||
- willthames
|
||||
|
@ -433,7 +434,7 @@ files:
|
|||
$modules/net_tools/omapi_host.py: nerzhul
|
||||
$modules/net_tools/snmp_facts.py: ogenstad
|
||||
$modules/network/a10/: ericchou1 mischapeters
|
||||
$modules/network/aci/: dagwieers jedelman8 schunduri
|
||||
$modules/network/aci/: dagwieers schunduri
|
||||
$modules/network/aos/: dgarros jeremyschulman
|
||||
$modules/network/asa/asa_acl.py: gundalow ogenstad
|
||||
$modules/network/asa/asa_command.py: gundalow ogenstad privateip
|
||||
|
@ -899,6 +900,7 @@ files:
|
|||
keywords:
|
||||
- validate-modules
|
||||
macros:
|
||||
module_utils: lib/ansible/module_utils
|
||||
modules: lib/ansible/modules
|
||||
team_ansible: []
|
||||
team_avi: ericsysmin grastogi23 khaltore
|
||||
|
|
224
lib/ansible/module_utils/aci.py
Normal file
224
lib/ansible/module_utils/aci.py
Normal file
|
@ -0,0 +1,224 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2017 Dag Wieers <dag@wieers.com>
|
||||
# Copyright 2017 Swetha Chunduri (@schunduri)
|
||||
|
||||
# This file is part of Ansible by Red Hat
|
||||
#
|
||||
# Ansible is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Ansible is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import json
|
||||
|
||||
from ansible.module_utils.basic import get_exception
|
||||
from ansible.module_utils.urls import fetch_url
|
||||
from ansible.module_utils._text import to_bytes
|
||||
|
||||
# Optional, only used for XML payload
|
||||
try:
|
||||
import lxml.etree
|
||||
HAS_LXML_ETREE = True
|
||||
except ImportError:
|
||||
HAS_LXML_ETREE = False
|
||||
|
||||
# Optional, only used for XML payload
|
||||
try:
|
||||
from xmljson import cobra
|
||||
HAS_XMLJSON_COBRA = True
|
||||
except ImportError:
|
||||
HAS_XMLJSON_COBRA = False
|
||||
|
||||
|
||||
aci_argument_spec = dict(
|
||||
hostname=dict(type='str', required=True, aliases=['host']),
|
||||
username=dict(type='str', default='admin', aliases=['user']),
|
||||
password=dict(type='str', required=True, no_log=True),
|
||||
protocol=dict(type='str', removed_in_version='2.6'), # Deprecated in v2.6
|
||||
timeout=dict(type='int', default=30),
|
||||
use_ssl=dict(type='bool', default=True),
|
||||
validate_certs=dict(type='bool', default=True),
|
||||
)
|
||||
|
||||
|
||||
def aci_response_error(result):
|
||||
''' Set error information when found '''
|
||||
result['error_code'] = 0
|
||||
result['error_text'] = 'Success'
|
||||
# Handle possible APIC error information
|
||||
if result['totalCount'] != '0':
|
||||
try:
|
||||
result['error_code'] = result['imdata'][0]['error']['attributes']['code']
|
||||
result['error_text'] = result['imdata'][0]['error']['attributes']['text']
|
||||
except (KeyError, IndexError):
|
||||
pass
|
||||
|
||||
|
||||
def aci_response_json(result, rawoutput):
|
||||
''' Handle APIC JSON response output '''
|
||||
try:
|
||||
result.update(json.loads(rawoutput))
|
||||
except:
|
||||
e = get_exception()
|
||||
# Expose RAW output for troubleshooting
|
||||
result.update(raw=rawoutput, error_code=-1, error_text="Unable to parse output as JSON, see 'raw' output. %s" % e)
|
||||
return
|
||||
|
||||
# Handle possible APIC error information
|
||||
aci_response_error(result)
|
||||
|
||||
|
||||
def aci_response_xml(result, rawoutput):
|
||||
''' Handle APIC XML response output '''
|
||||
|
||||
# NOTE: The XML-to-JSON conversion is using the "Cobra" convention
|
||||
try:
|
||||
xml = lxml.etree.fromstring(to_bytes(rawoutput))
|
||||
xmldata = cobra.data(xml)
|
||||
except:
|
||||
e = get_exception()
|
||||
# Expose RAW output for troubleshooting
|
||||
result.update(raw=rawoutput, error_code=-1, error_text="Unable to parse output as XML, see 'raw' output. %s" % e)
|
||||
return
|
||||
|
||||
# Reformat as ACI does for JSON API output
|
||||
try:
|
||||
result.update(imdata=xmldata['imdata']['children'])
|
||||
except KeyError:
|
||||
result['imdata'] = dict()
|
||||
result['totalCount'] = xmldata['imdata']['attributes']['totalCount']
|
||||
|
||||
# Handle possible APIC error information
|
||||
aci_response_error(result)
|
||||
|
||||
|
||||
class ACIModule(object):
|
||||
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
self.params = module.params
|
||||
self.result = dict(changed=False)
|
||||
self.headers = None
|
||||
|
||||
self.login()
|
||||
|
||||
def define_protocol(self):
|
||||
''' Set protocol based on use_ssl parameter '''
|
||||
|
||||
# Set protocol for further use
|
||||
if self.params['protocol'] in ('http', 'https'):
|
||||
self.module.deprecate("Parameter 'protocol' is deprecated, please use 'use_ssl' instead.", '2.6')
|
||||
elif self.params['protocol'] is None:
|
||||
self.params['protocol'] = 'https' if self.params.get('use_ssl', True) else 'http'
|
||||
else:
|
||||
self.module.fail_json(msg="Parameter 'protocol' needs to be one of ( http, https )")
|
||||
|
||||
def define_method(self):
|
||||
''' Set method based on state parameter '''
|
||||
|
||||
# Handle deprecated method/action parameter
|
||||
if self.params['method']:
|
||||
self.module.deprecate("Parameter 'method' or 'action' is deprecated, please use 'state' instead", '2.6')
|
||||
method_map = dict(delete='absent', get='query', post='present')
|
||||
self.params['state'] = method_map[self.params['method']]
|
||||
else:
|
||||
state_map = dict(absent='delete', present='post', query='get')
|
||||
self.params['method'] = state_map[self.params['state']]
|
||||
|
||||
def login(self):
|
||||
''' Log in to APIC '''
|
||||
|
||||
# Ensure protocol is set (only do this once)
|
||||
self.define_protocol()
|
||||
|
||||
# Perform login request
|
||||
url = '%(protocol)s://%(hostname)s/api/aaaLogin.json' % self.params
|
||||
payload = {'aaaUser': {'attributes': {'name': self.params['username'], 'pwd': self.params['password']}}}
|
||||
resp, auth = fetch_url(self.module, url, data=json.dumps(payload), method='POST', timeout=self.params['timeout'])
|
||||
|
||||
# Handle APIC response
|
||||
if auth['status'] != 200:
|
||||
self.result['response'] = auth['msg']
|
||||
self.result['status'] = auth['status']
|
||||
try:
|
||||
# APIC error
|
||||
aci_response_json(self.result, auth['body'])
|
||||
self.module.fail_json(msg='Authentication failed: %(error_code)s %(error_text)s' % self.result, **self.result)
|
||||
except KeyError:
|
||||
# Connection error
|
||||
self.module.fail_json(msg='Authentication failed for %(url)s. %(msg)s' % auth)
|
||||
|
||||
# Retain cookie for later use
|
||||
self.headers = dict(Cookie=resp.headers['Set-Cookie'])
|
||||
|
||||
def request(self, path, payload=None):
|
||||
''' Perform a REST request '''
|
||||
|
||||
# Ensure method is set (only do this once)
|
||||
self.define_method()
|
||||
|
||||
# Perform request
|
||||
self.result['url'] = '%(protocol)s://%(hostname)s/' % self.params + path.lstrip('/')
|
||||
resp, info = fetch_url(self.module,
|
||||
url=self.result['url'],
|
||||
data=payload,
|
||||
method=self.params['method'].upper(),
|
||||
timeout=self.params['timeout'],
|
||||
headers=self.headers)
|
||||
self.result['response'] = info['msg']
|
||||
self.result['status'] = info['status']
|
||||
|
||||
# Handle APIC response
|
||||
if info['status'] != 200:
|
||||
try:
|
||||
# APIC error
|
||||
aci_response_json(self.result, info['body'])
|
||||
self.module.fail_json(msg='Request failed: %(error_code)s %(error_text)s' % self.result, **self.result)
|
||||
except KeyError:
|
||||
# Connection error
|
||||
self.module.fail_json(msg='Request failed for %(url)s. %(msg)s' % info)
|
||||
|
||||
aci_response_json(self.result, resp.read())
|
||||
|
||||
def request_diff(self, path, payload=None):
|
||||
''' Perform a request, including a proper diff output '''
|
||||
self.result['diff'] = dict()
|
||||
self.result['diff']['before'] = self.query()
|
||||
self.request(path, payload=payload)
|
||||
# TODO: Check if we can use the request output for the 'after' diff
|
||||
self.result['diff']['after'] = self.query()
|
||||
|
||||
if self.result['diff']['before'] != self.result['diff']['after']:
|
||||
self.result['changed'] = True
|
||||
|
||||
def query(self, path):
|
||||
''' Perform a query with no payload '''
|
||||
url = '%(protocol)s://%(hostname)s/' % self.params + path.lstrip('/')
|
||||
resp, query = fetch_url(self.module, url=url, data=None, method='GET',
|
||||
timeout=self.params['timeout'],
|
||||
headers=self.headers)
|
||||
|
||||
# Handle APIC response
|
||||
if query['status'] != 200:
|
||||
result['response'] = query['msg']
|
||||
result['status'] = query['status']
|
||||
try:
|
||||
# APIC error
|
||||
aci_response_json(self.result, query['body'])
|
||||
module.fail_json(msg='Query failed: %(error_code)s %(error_text)s' % self.result, **self.result)
|
||||
except KeyError:
|
||||
# Connection error
|
||||
module.fail_json(msg='Query failed for %(url)s. %(msg)s' % query)
|
||||
|
||||
query = json.loads(resp.read())
|
||||
|
||||
return json.dumps(query['imdata'], sort_keys=True, indent=2) + '\n'
|
|
@ -1,8 +1,8 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2015 Jason Edelman <jason@networktocode.com>, Network to Code, LLC
|
||||
# Copyright 2017 Dag Wieers <dag@wieers.com>
|
||||
# Copyright 2017 Swetha Chunduri (@schunduri)
|
||||
|
||||
# This file is part of Ansible by Red Hat
|
||||
#
|
||||
|
@ -31,8 +31,8 @@ description:
|
|||
- More information regarding the Cisco APIC REST API is available from
|
||||
U(http://www.cisco.com/c/en/us/td/docs/switches/datacenter/aci/apic/sw/2-x/rest_cfg/2_1_x/b_Cisco_APIC_REST_API_Configuration_Guide.html).
|
||||
author:
|
||||
- Jason Edelman (@jedelman8)
|
||||
- Dag Wieers (@dagwieers)
|
||||
- Swetha Chunduri (@schunduri)
|
||||
version_added: '2.4'
|
||||
requirements:
|
||||
- lxml (when using XML content)
|
||||
|
@ -46,7 +46,7 @@ options:
|
|||
- Using C(delete) is typically used for deleting objects.
|
||||
- Using C(get) is typically used for querying objects.
|
||||
- Using C(post) is typically used for modifying objects.
|
||||
required: true
|
||||
required: yes
|
||||
default: get
|
||||
choices: [ delete, get, post ]
|
||||
aliases: [ action ]
|
||||
|
@ -54,7 +54,7 @@ options:
|
|||
description:
|
||||
- URI being used to execute API calls.
|
||||
- Must end in C(.xml) or C(.json).
|
||||
required: true
|
||||
required: yes
|
||||
aliases: [ uri ]
|
||||
content:
|
||||
description:
|
||||
|
@ -192,131 +192,42 @@ try:
|
|||
except ImportError:
|
||||
HAS_XMLJSON_COBRA = False
|
||||
|
||||
# from ansible.module_utils.aci import aci_login
|
||||
from ansible.module_utils.basic import AnsibleModule, get_exception
|
||||
from ansible.module_utils.aci import ACIModule, aci_argument_spec, aci_response_json, aci_response_xml
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils.urls import fetch_url
|
||||
from ansible.module_utils._text import to_bytes
|
||||
|
||||
|
||||
aci_argument_spec = dict(
|
||||
hostname=dict(type='str', required=True, aliases=['host']),
|
||||
username=dict(type='str', default='admin', aliases=['user']),
|
||||
password=dict(type='str', required=True, no_log=True),
|
||||
protocol=dict(type='str'), # Deprecated in v2.8
|
||||
timeout=dict(type='int', default=30),
|
||||
use_ssl=dict(type='bool', default=True),
|
||||
validate_certs=dict(type='bool', default=True),
|
||||
)
|
||||
|
||||
|
||||
def aci_login(module, result=dict()):
|
||||
''' Log in to APIC '''
|
||||
|
||||
# Set protocol based on use_ssl parameter
|
||||
if module.params['protocol'] is None:
|
||||
module.params['protocol'] = 'https' if module.params.get('use_ssl', True) else 'http'
|
||||
|
||||
# Perform login request
|
||||
url = '%(protocol)s://%(hostname)s/api/aaaLogin.json' % module.params
|
||||
data = {'aaaUser': {'attributes': {'name': module.params['username'], 'pwd': module.params['password']}}}
|
||||
resp, auth = fetch_url(module, url, data=json.dumps(data), method="POST", timeout=module.params['timeout'])
|
||||
|
||||
# Handle APIC response
|
||||
if auth['status'] != 200:
|
||||
try:
|
||||
result.update(aci_response(auth['body'], 'json'))
|
||||
result['msg'] = 'Authentication failed: %(error_code)s %(error_text)s' % result
|
||||
except KeyError:
|
||||
result['msg'] = '%(msg)s for %(url)s' % auth
|
||||
result['response'] = auth['msg']
|
||||
result['status'] = auth['status']
|
||||
module.fail_json(**result)
|
||||
|
||||
return resp
|
||||
|
||||
|
||||
def aci_response(rawoutput, rest_type='xml'):
|
||||
def aci_response(result, rawoutput, rest_type='xml'):
|
||||
''' Handle APIC response output '''
|
||||
result = dict()
|
||||
|
||||
if rest_type == 'json':
|
||||
# Use APIC response as module output
|
||||
try:
|
||||
result = json.loads(rawoutput)
|
||||
except:
|
||||
e = get_exception()
|
||||
# Expose RAW output for troubleshooting
|
||||
result['error_code'] = -1
|
||||
result['error_text'] = "Unable to parse output as JSON, see 'raw' output. %s" % e
|
||||
result['raw'] = rawoutput
|
||||
return result
|
||||
else:
|
||||
# NOTE: The XML-to-JSON conversion is using the "Cobra" convention
|
||||
xmldata = None
|
||||
try:
|
||||
xml = lxml.etree.fromstring(to_bytes(rawoutput))
|
||||
xmldata = cobra.data(xml)
|
||||
except:
|
||||
e = get_exception()
|
||||
# Expose RAW output for troubleshooting
|
||||
result['error_code'] = -1
|
||||
result['error_text'] = "Unable to parse output as XML, see 'raw' output. %s" % e
|
||||
result['raw'] = rawoutput
|
||||
return result
|
||||
aci_response_json(result, rawoutput)
|
||||
|
||||
# Reformat as ACI does for JSON API output
|
||||
if xmldata and 'imdata' in xmldata:
|
||||
if 'children' in xmldata['imdata']:
|
||||
result['imdata'] = xmldata['imdata']['children']
|
||||
else:
|
||||
result['imdata'] = dict()
|
||||
result['totalCount'] = xmldata['imdata']['attributes']['totalCount']
|
||||
|
||||
# Handle possible APIC error information
|
||||
try:
|
||||
result['error_code'] = result['imdata'][0]['error']['attributes']['code']
|
||||
result['error_text'] = result['imdata'][0]['error']['attributes']['text']
|
||||
except KeyError:
|
||||
result['error_code'] = 0
|
||||
result['error_text'] = 'Success'
|
||||
|
||||
return result
|
||||
aci_response_xml(result, rawoutput)
|
||||
|
||||
|
||||
def main():
|
||||
argument_spec = dict(
|
||||
argument_spec = aci_argument_spec
|
||||
argument_spec.update(
|
||||
path=dict(type='str', required=True, aliases=['uri']),
|
||||
method=dict(type='str', default='get', choices=['delete', 'get', 'post'], aliases=['action']),
|
||||
src=dict(type='path', aliases=['config_file']),
|
||||
content=dict(type='str'),
|
||||
)
|
||||
|
||||
argument_spec.update(aci_argument_spec)
|
||||
|
||||
module = AnsibleModule(
|
||||
argument_spec=argument_spec,
|
||||
mutually_exclusive=[['content', 'src']],
|
||||
supports_check_mode=True,
|
||||
)
|
||||
|
||||
hostname = module.params['hostname']
|
||||
username = module.params['username']
|
||||
password = module.params['password']
|
||||
|
||||
path = module.params['path']
|
||||
content = module.params['content']
|
||||
src = module.params['src']
|
||||
|
||||
protocol = module.params['protocol']
|
||||
use_ssl = module.params['use_ssl']
|
||||
method = module.params['method']
|
||||
timeout = module.params['timeout']
|
||||
|
||||
result = dict(
|
||||
changed=False,
|
||||
payload='',
|
||||
)
|
||||
|
||||
# Report missing file
|
||||
file_exists = False
|
||||
if src:
|
||||
|
@ -337,56 +248,45 @@ def main():
|
|||
else:
|
||||
module.fail_json(msg='Failed to find REST API content type (neither .xml nor .json).')
|
||||
|
||||
# Set protocol for further use
|
||||
if protocol is None:
|
||||
protocol = 'https' if use_ssl else 'http'
|
||||
else:
|
||||
module.deprecate("Parameter 'protocol' is deprecated, please use 'use_ssl' instead.", 2.8)
|
||||
aci = ACIModule(module)
|
||||
|
||||
# Perform login first
|
||||
auth = aci_login(module, result)
|
||||
if method == 'get':
|
||||
aci.request()
|
||||
module.exit_json(**aci.result)
|
||||
elif module.check_mode:
|
||||
# In check_mode we assume it works, but we don't actually perform the requested change
|
||||
# TODO: Could we turn this request in a GET instead ?
|
||||
aci.result['changed'] = True
|
||||
module.exit_json(response='OK (Check mode)', status=200, **aci.result)
|
||||
|
||||
# Prepare request data
|
||||
if content:
|
||||
# We include the payload as it may be templated
|
||||
result['payload'] = content
|
||||
payload = content
|
||||
elif file_exists:
|
||||
with open(src, 'r') as config_object:
|
||||
# TODO: Would be nice to template this, requires action-plugin
|
||||
result['payload'] = config_object.read()
|
||||
payload = config_object.read()
|
||||
|
||||
# Ensure changes are reported
|
||||
if method in ('delete', 'post'):
|
||||
# FIXME: Hardcoding changed is not idempotent
|
||||
result['changed'] = True
|
||||
# Perform actual request using auth cookie (Same as aci_request,but also supports XML)
|
||||
url = '%(protocol)s://%(hostname)s/' % aci.params + path.lstrip('/')
|
||||
|
||||
# In check_mode we assume it works, but we don't actually perform the requested change
|
||||
# TODO: Could we turn this request in a GET instead ?
|
||||
if module.check_mode:
|
||||
module.exit_json(response='OK (Check mode)', status=200, **result)
|
||||
else:
|
||||
result['changed'] = False
|
||||
|
||||
# Perform actual request using auth cookie
|
||||
url = '%s://%s/%s' % (protocol, hostname, path.lstrip('/'))
|
||||
headers = dict(Cookie=auth.headers['Set-Cookie'])
|
||||
|
||||
resp, info = fetch_url(module, url, data=result['payload'], method=method.upper(), timeout=timeout, headers=headers)
|
||||
result['response'] = info['msg']
|
||||
result['status'] = info['status']
|
||||
resp, info = fetch_url(module, url, data=payload, method=method.upper(), timeout=timeout, headers=aci.headers)
|
||||
aci.result['response'] = info['msg']
|
||||
aci.result['status'] = info['status']
|
||||
|
||||
# Report failure
|
||||
if info['status'] != 200:
|
||||
try:
|
||||
result.update(aci_response(info['body'], rest_type))
|
||||
result['msg'] = 'Task failed: %(error_code)s %(error_text)s' % result
|
||||
aci_response(aci.result, info['body'], rest_type)
|
||||
module.fail_json(msg='Request failed: %(error_code)s %(error_text)s' % aci.result, **aci.result)
|
||||
except KeyError:
|
||||
result['msg'] = '%(msg)s for %(url)s' % info
|
||||
module.fail_json(**result)
|
||||
module.fail_json(msg='Request failed for %(url)s. %(msg)s' % info, **aci.result)
|
||||
|
||||
aci_response(aci.result, resp.read(), rest_type)
|
||||
|
||||
# Report success
|
||||
result.update(aci_response(resp.read(), rest_type))
|
||||
module.exit_json(**result)
|
||||
module.exit_json(**aci.result)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2017 Dag Wieers <dag@wieers.com>
|
||||
# Copyright 2017 Swetha Chunduri (@schunduri)
|
||||
|
||||
# This file is part of Ansible by Red Hat
|
||||
#
|
||||
|
@ -25,18 +26,18 @@ options:
|
|||
hostname:
|
||||
description:
|
||||
- IP Address or hostname of APIC resolvable by Ansible control host.
|
||||
required: true
|
||||
required: yes
|
||||
aliases: [ host ]
|
||||
username:
|
||||
description:
|
||||
- The username to use for authentication.
|
||||
required: true
|
||||
required: yes
|
||||
default: admin
|
||||
aliases: [ user ]
|
||||
password:
|
||||
description:
|
||||
- The password to use for authentication.
|
||||
required: true
|
||||
required: yes
|
||||
timeout:
|
||||
description:
|
||||
- The socket level timeout in seconds.
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
import sys
|
||||
|
||||
from ansible.compat.tests import unittest
|
||||
from ansible.modules.network.aci.aci_rest import aci_response
|
||||
from ansible.module_utils.aci import aci_response_json, aci_response_xml
|
||||
|
||||
from nose.plugins.skip import SkipTest
|
||||
|
||||
|
@ -53,7 +53,8 @@ class AciRest(unittest.TestCase):
|
|||
}
|
||||
|
||||
json_response = '{"totalCount":"1","imdata":[{"error":{"attributes":{"code":"401","text":"Username or password is incorrect - FAILED local authentication"}}}]}' # NOQA
|
||||
json_result = aci_response(json_response, 'json')
|
||||
json_result = dict()
|
||||
aci_response_json(json_result, json_response)
|
||||
self.assertEqual(expected_result, json_result)
|
||||
|
||||
# Python 2.7+ is needed for xmljson
|
||||
|
@ -64,7 +65,8 @@ class AciRest(unittest.TestCase):
|
|||
<error code="401" text="Username or password is incorrect - FAILED local authentication"/>
|
||||
</imdata>
|
||||
'''
|
||||
xml_result = aci_response(xml_response, 'xml')
|
||||
xml_result = dict()
|
||||
aci_response_xml(xml_result, xml_response)
|
||||
self.assertEqual(json_result, xml_result)
|
||||
|
||||
def test_valid_aci_login(self):
|
||||
|
@ -149,16 +151,17 @@ class AciRest(unittest.TestCase):
|
|||
}
|
||||
|
||||
json_response = '{"totalCount":"1","imdata":[{"aaaLogin":{"attributes":{"token":"ZldYAsoO9d0FfAQM8xaEVWvQPSOYwpnqzhwpIC1r4MaToknJjlIuAt9+TvXqrZ8lWYIGPj6VnZkWiS8nJfaiaX/AyrdD35jsSxiP3zydh+849xym7ALCw/fFNsc7b5ik1HaMuSUtdrN8fmCEUy7Pq/QNpGEqkE8m7HaxAuHpmvXgtdW1bA+KKJu2zY1c/tem","siteFingerprint":"NdxD72K/uXaUK0wn","refreshTimeoutSeconds":"600","maximumLifetimeSeconds":"86400","guiIdleTimeoutSeconds":"1200","restTimeoutSeconds":"90","creationTime":"1500134817","firstLoginTime":"1500134817","userName":"admin","remoteUser":"false","unixUserId":"15374","sessionId":"o7hObsqNTfCmDGcZI5c4ng==","lastName":"","firstName":"","version":"2.0(2f)","buildTime":"Sat Aug 20 23:07:07 PDT 2016","node":"topology/pod-1/node-1"},"children":[{"aaaUserDomain":{"attributes":{"name":"all","rolesR":"admin","rolesW":"admin"},"children":[{"aaaReadRoles":{"attributes":{}}},{"aaaWriteRoles":{"attributes":{},"children":[{"role":{"attributes":{"name":"admin"}}}]}}]}},{"DnDomainMapEntry":{"attributes":{"dn":"uni/tn-common","readPrivileges":"admin","writePrivileges":"admin"}}},{"DnDomainMapEntry":{"attributes":{"dn":"uni/tn-infra","readPrivileges":"admin","writePrivileges":"admin"}}},{"DnDomainMapEntry":{"attributes":{"dn":"uni/tn-mgmt","readPrivileges":"admin","writePrivileges":"admin"}}}]}}]}' # NOQA
|
||||
json_result = aci_response(json_response, 'json')
|
||||
json_result = dict()
|
||||
aci_response_json(json_result, json_response)
|
||||
self.assertEqual(expected_result, json_result)
|
||||
|
||||
# Python 2.7+ is needed for xmljson
|
||||
if sys.version_info < (2, 7):
|
||||
return
|
||||
|
||||
xml_response = '<?xml version="1.0" encoding="UTF-8"?><imdata totalCount="1">\n<aaaLogin token="ZldYAsoO9d0FfAQM8xaEVWvQPSOYwpnqzhwpIC1r4MaToknJjlIuAt9+TvXqrZ8lWYIGPj6VnZkWiS8nJfaiaX/AyrdD35jsSxiP3zydh+849xym7ALCw/fFNsc7b5ik1HaMuSUtdrN8fmCEUy7Pq/QNpGEqkE8m7HaxAuHpmvXgtdW1bA+KKJu2zY1c/tem" siteFingerprint="NdxD72K/uXaUK0wn" refreshTimeoutSeconds="600" maximumLifetimeSeconds="86400" guiIdleTimeoutSeconds="1200" restTimeoutSeconds="90" creationTime="1500134817" firstLoginTime="1500134817" userName="admin" remoteUser="false" unixUserId="15374" sessionId="o7hObsqNTfCmDGcZI5c4ng==" lastName="" firstName="" version="2.0(2f)" buildTime="Sat Aug 20 23:07:07 PDT 2016" node="topology/pod-1/node-1">\n<aaaUserDomain name="all" rolesR="admin" rolesW="admin">\n<aaaReadRoles/>\n<aaaWriteRoles>\n<role name="admin"/>\n</aaaWriteRoles>\n</aaaUserDomain>\n<DnDomainMapEntry dn="uni/tn-common" readPrivileges="admin" writePrivileges="admin"/>\n<DnDomainMapEntry dn="uni/tn-infra" readPrivileges="admin" writePrivileges="admin"/>\n<DnDomainMapEntry dn="uni/tn-mgmt" readPrivileges="admin" writePrivileges="admin"/>\n</aaaLogin></imdata>\n''' # NOQA
|
||||
xml_result = aci_response(xml_response, 'xml')
|
||||
|
||||
self.assertEqual(expected_result, json_result)
|
||||
xml_result = dict()
|
||||
aci_response_xml(xml_result, xml_response)
|
||||
self.assertEqual(json_result, xml_result)
|
||||
|
||||
def test_invalid_input(self):
|
||||
|
@ -179,7 +182,9 @@ class AciRest(unittest.TestCase):
|
|||
}
|
||||
|
||||
json_response = '{"totalCount":"1","imdata":[{"error":{"attributes":{"code":"401","text":"Username or password is incorrect - FAILED local authentication"}}}]}' # NOQA
|
||||
json_result = aci_response(json_response, 'json')
|
||||
json_result = dict()
|
||||
aci_response_json(json_result, json_response)
|
||||
self.assertEqual(expected_result, json_result)
|
||||
|
||||
# Python 2.7+ is needed for xmljson
|
||||
if sys.version_info < (2, 7):
|
||||
|
@ -189,9 +194,8 @@ class AciRest(unittest.TestCase):
|
|||
<error code="401" text="Username or password is incorrect - FAILED local authentication"/>
|
||||
</imdata>
|
||||
'''
|
||||
xml_result = aci_response(xml_response, 'xml')
|
||||
|
||||
self.assertEqual(expected_result, json_result)
|
||||
xml_result = dict()
|
||||
aci_response_xml(xml_result, xml_response)
|
||||
self.assertEqual(json_result, xml_result)
|
||||
|
||||
def test_empty_response(self):
|
||||
|
@ -212,7 +216,8 @@ class AciRest(unittest.TestCase):
|
|||
}
|
||||
|
||||
json_response = ''
|
||||
json_result = aci_response(json_response, 'json')
|
||||
json_result = dict()
|
||||
aci_response_json(json_result, json_response)
|
||||
self.assertEqual(expected_json_result, json_result)
|
||||
|
||||
# Python 2.7+ is needed for xmljson
|
||||
|
@ -241,7 +246,8 @@ class AciRest(unittest.TestCase):
|
|||
}
|
||||
|
||||
xml_response = ''
|
||||
xml_result = aci_response(xml_response, 'xml')
|
||||
xml_result = dict()
|
||||
aci_response_xml(xml_result, xml_response)
|
||||
self.assertEqual(expected_xml_result, xml_result)
|
||||
|
||||
def test_invalid_response(self):
|
||||
|
@ -269,7 +275,8 @@ class AciRest(unittest.TestCase):
|
|||
}
|
||||
|
||||
json_response = '{ "aaa":'
|
||||
json_result = aci_response(json_response, 'json')
|
||||
json_result = dict()
|
||||
aci_response_json(json_result, json_response)
|
||||
self.assertEqual(expected_json_result, json_result)
|
||||
|
||||
# Python 2.7+ is needed for xmljson
|
||||
|
@ -298,5 +305,6 @@ class AciRest(unittest.TestCase):
|
|||
}
|
||||
|
||||
xml_response = '<aaa '
|
||||
xml_result = aci_response(xml_response, 'xml')
|
||||
xml_result = dict()
|
||||
aci_response_xml(xml_result, xml_response)
|
||||
self.assertEqual(expected_xml_result, xml_result)
|
Loading…
Reference in a new issue