#!/usr/bin/python # -*- coding: utf-8 -*- # # Copyright (c) 2021, Tyler Gates <tgates81@gmail.com> # # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import (absolute_import, division, print_function) __metaclass__ = type DOCUMENTATION = r''' --- module: spectrum_model_attrs short_description: Enforce a model's attributes in CA Spectrum. description: - This module can be used to enforce a model's attributes in CA Spectrum. version_added: 2.5.0 author: - Tyler Gates (@tgates81) notes: - Tested on CA Spectrum version 10.4.2.0.189. - Model creation and deletion are not possible with this module. For that use M(community.general.spectrum_device) instead. requirements: - 'python >= 2.7' options: url: description: - URL of OneClick server. type: str required: true url_username: description: - OneClick username. type: str required: true aliases: [username] url_password: description: - OneClick password. type: str required: true aliases: [password] use_proxy: description: - if C(false), it will not use a proxy, even if one is defined in an environment variable on the target hosts. default: true required: false type: bool name: description: - Model name. type: str required: true type: description: - Model type. type: str required: true validate_certs: description: - Validate SSL certificates. Only change this to C(false) if you can guarantee that you are talking to the correct endpoint and there is no man-in-the-middle attack happening. type: bool default: true required: false attributes: description: - A list of attribute names and values to enforce. - All values and parameters are case sensitive and must be provided as strings only. required: true type: list elements: dict suboptions: name: description: - Attribute name OR hex ID. - 'Currently defined names are:' - ' C(App_Manufacturer) (C(0x230683))' - ' C(CollectionsModelNameString) (C(0x12adb))' - ' C(Condition) (C(0x1000a))' - ' C(Criticality) (C(0x1290c))' - ' C(DeviceType) (C(0x23000e))' - ' C(isManaged) (C(0x1295d))' - ' C(Model_Class) (C(0x11ee8))' - ' C(Model_Handle) (C(0x129fa))' - ' C(Model_Name) (C(0x1006e))' - ' C(Modeltype_Handle) (C(0x10001))' - ' C(Modeltype_Name) (C(0x10000))' - ' C(Network_Address) (C(0x12d7f))' - ' C(Notes) (C(0x11564))' - ' C(ServiceDesk_Asset_ID) (C(0x12db9))' - ' C(TopologyModelNameString) (C(0x129e7))' - ' C(sysDescr) (C(0x10052))' - ' C(sysName) (C(0x10b5b))' - ' C(Vendor_Name) (C(0x11570))' - ' C(Description) (C(0x230017))' - Hex IDs are the direct identifiers in Spectrum and will always work. - 'To lookup hex IDs go to the UI: Locator -> Devices -> By Model Name -> <enter any model> -> Attributes tab.' type: str required: true value: description: - Attribute value. Empty strings should be C("") or C(null). type: str required: true ''' EXAMPLES = r''' - name: Enforce maintenance mode for modelxyz01 with a note about why community.general.spectrum_model_attrs: url: "http://oneclick.url.com" username: "{{ oneclick_username }}" password: "{{ oneclick_password }}" name: "modelxyz01" type: "Host_Device" validate_certs: true attributes: - name: "isManaged" value: "false" - name: "Notes" value: "MM set on {{ ansible_date_time.iso8601 }} via CO {{ CO }} by {{ tower_user_name | default(ansible_user_id) }}" delegate_to: localhost register: spectrum_model_attrs_status ''' RETURN = r''' msg: description: Informational message on the job result. type: str returned: always sample: 'Success' changed_attrs: description: Dictionary of changed name or hex IDs (whichever was specified) to their new corresponding values. type: dict returned: always sample: { "Notes": "MM set on 2021-02-03T22:04:02Z via CO CO9999 by tgates", "isManaged": "true" } ''' from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_native from ansible.module_utils.urls import fetch_url from ansible.module_utils.six.moves.urllib.parse import quote import json import re import xml.etree.ElementTree as ET class spectrum_model_attrs: def __init__(self, module): self.module = module self.url = module.params['url'] # If the user did not define a full path to the restul space in url: # params, add what we believe it to be. if not re.search('\\/.+', self.url.split('://')[1]): self.url = "%s/spectrum/restful" % self.url.rstrip('/') # Align these with what is defined in OneClick's UI under: # Locator -> Devices -> By Model Name -> <enter any model> -> # Attributes tab. self.attr_map = dict(App_Manufacturer=hex(0x230683), CollectionsModelNameString=hex(0x12adb), Condition=hex(0x1000a), Criticality=hex(0x1290c), DeviceType=hex(0x23000e), isManaged=hex(0x1295d), Model_Class=hex(0x11ee8), Model_Handle=hex(0x129fa), Model_Name=hex(0x1006e), Modeltype_Handle=hex(0x10001), Modeltype_Name=hex(0x10000), Network_Address=hex(0x12d7f), Notes=hex(0x11564), ServiceDesk_Asset_ID=hex(0x12db9), TopologyModelNameString=hex(0x129e7), sysDescr=hex(0x10052), sysName=hex(0x10b5b), Vendor_Name=hex(0x11570), Description=hex(0x230017)) self.search_qualifiers = [ "and", "or", "not", "greater-than", "greater-than-or-equals", "less-than", "less-than-or-equals", "equals", "equals-ignore-case", "does-not-equal", "does-not-equal-ignore-case", "has-prefix", "does-not-have-prefix", "has-prefix-ignore-case", "does-not-have-prefix-ignore-case", "has-substring", "does-not-have-substring", "has-substring-ignore-case", "does-not-have-substring-ignore-case", "has-suffix", "does-not-have-suffix", "has-suffix-ignore-case", "does-not-have-suffix-ignore-case", "has-pcre", "has-pcre-ignore-case", "has-wildcard", "has-wildcard-ignore-case", "is-derived-from", "not-is-derived-from"] self.resp_namespace = dict(ca="http://www.ca.com/spectrum/restful/schema/response") self.result = dict(msg="", changed_attrs=dict()) self.success_msg = "Success" def build_url(self, path): """ Build a sane Spectrum restful API URL :param path: The path to append to the restful base :type path: str :returns: Complete restful API URL :rtype: str """ return "%s/%s" % (self.url.rstrip('/'), path.lstrip('/')) def attr_id(self, name): """ Get attribute hex ID :param name: The name of the attribute to retrieve the hex ID for :type name: str :returns: Translated hex ID of name, or None if no translation found :rtype: str or None """ try: return self.attr_map[name] except KeyError: return None def attr_name(self, _id): """ Get attribute name from hex ID :param _id: The hex ID to lookup a name for :type _id: str :returns: Translated name of hex ID, or None if no translation found :rtype: str or None """ for name, m_id in list(self.attr_map.items()): if _id == m_id: return name return None def urlencode(self, string): """ URL Encode a string :param: string: The string to URL encode :type string: str :returns: URL encode version of supplied string :rtype: str """ return quote(string, "<>%-_.!*'():?#/@&+,;=") def update_model(self, model_handle, attrs): """ Update a model's attributes :param model_handle: The model's handle ID :type model_handle: str :param attrs: Model's attributes to update. {'<name/id>': '<attr>'} :type attrs: dict :returns: Nothing; exits on error or updates self.results :rtype: None """ # Build the update URL update_url = self.build_url("/model/%s?" % model_handle) for name, val in list(attrs.items()): if val is None: # None values should be converted to empty strings val = "" val = self.urlencode(str(val)) if not update_url.endswith('?'): update_url += "&" update_url += "attr=%s&val=%s" % (self.attr_id(name) or name, val) # POST to /model to update the attributes, or fail. resp, info = fetch_url(self.module, update_url, method="PUT", headers={"Content-Type": "application/json", "Accept": "application/json"}, use_proxy=self.module.params['use_proxy']) status_code = info["status"] if status_code >= 400: body = info['body'] else: body = "" if resp is None else resp.read() if status_code != 200: self.result['msg'] = "HTTP PUT error %s: %s: %s" % (status_code, update_url, body) self.module.fail_json(**self.result) # Load and parse the JSON response and either fail or set results. json_resp = json.loads(body) """ Example success response: {'model-update-response-list':{'model-responses':{'model':{'@error':'Success','@mh':'0x1010e76','attribute':{'@error':'Success','@id':'0x1295d'}}}}}" Example failure response: {'model-update-response-list': {'model-responses': {'model': {'@error': 'PartialFailure', '@mh': '0x1010e76', 'attribute': {'@error-message': 'brn0vlappua001: You do not have permission to set attribute Network_Address for this model.', '@error': 'Error', '@id': '0x12d7f'}}}}} """ # noqa model_resp = json_resp['model-update-response-list']['model-responses']['model'] if model_resp['@error'] != "Success": # I'm not 100% confident on the expected failure structure so just # dump all of ['attribute']. self.result['msg'] = str(model_resp['attribute']) self.module.fail_json(**self.result) # Should be OK if we get to here, set results. self.result['msg'] = self.success_msg self.result['changed_attrs'].update(attrs) self.result['changed'] = True def find_model(self, search_criteria, ret_attrs=None): """ Search for a model in /models :param search_criteria: The XML <rs:search-criteria> :type search_criteria: str :param ret_attrs: List of attributes by name or ID to return back (default is Model_Handle) :type ret_attrs: list returns: Dictionary mapping of ret_attrs to values: {ret_attr: ret_val} rtype: dict """ # If no return attributes were asked for, return Model_Handle. if ret_attrs is None: ret_attrs = ['Model_Handle'] # Set the XML <rs:requested-attribute id=<id>> tags. If no hex ID # is found for the name, assume it is already in hex. {name: hex ID} rqstd_attrs = "" for ra in ret_attrs: _id = self.attr_id(ra) or ra rqstd_attrs += '<rs:requested-attribute id="%s" />' % (self.attr_id(ra) or ra) # Build the complete XML search query for HTTP POST. xml = """<?xml version="1.0" encoding="UTF-8"?> <rs:model-request throttlesize="5" xmlns:rs="http://www.ca.com/spectrum/restful/schema/request" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.ca.com/spectrum/restful/schema/request ../../../xsd/Request.xsd"> <rs:target-models> <rs:models-search> <rs:search-criteria xmlns="http://www.ca.com/spectrum/restful/schema/filter"> {0} </rs:search-criteria> </rs:models-search> </rs:target-models> {1} </rs:model-request> """.format(search_criteria, rqstd_attrs) # POST to /models and fail on errors. url = self.build_url("/models") resp, info = fetch_url(self.module, url, data=xml, method="POST", use_proxy=self.module.params['use_proxy'], headers={"Content-Type": "application/xml", "Accept": "application/xml"}) status_code = info["status"] if status_code >= 400: body = info['body'] else: body = "" if resp is None else resp.read() if status_code != 200: self.result['msg'] = "HTTP POST error %s: %s: %s" % (status_code, url, body) self.module.fail_json(**self.result) # Parse through the XML response and fail on any detected errors. root = ET.fromstring(body) total_models = int(root.attrib['total-models']) error = root.attrib['error'] model_responses = root.find('ca:model-responses', self.resp_namespace) if total_models < 1: self.result['msg'] = "No models found matching search criteria `%s'" % search_criteria self.module.fail_json(**self.result) elif total_models > 1: self.result['msg'] = "More than one model found (%s): `%s'" % (total_models, ET.tostring(model_responses, encoding='unicode')) self.module.fail_json(**self.result) if error != "EndOfResults": self.result['msg'] = "Unexpected search response `%s': %s" % (error, ET.tostring(model_responses, encoding='unicode')) self.module.fail_json(**self.result) model = model_responses.find('ca:model', self.resp_namespace) attrs = model.findall('ca:attribute', self.resp_namespace) if not attrs: self.result['msg'] = "No attributes returned." self.module.fail_json(**self.result) # XML response should be successful. Iterate and set each returned # attribute ID/name and value for return. ret = dict() for attr in attrs: attr_id = attr.get('id') attr_name = self.attr_name(attr_id) # Note: all values except empty strings (None) are strings only! attr_val = attr.text key = attr_name if attr_name in ret_attrs else attr_id ret[key] = attr_val ret_attrs.remove(key) return ret def find_model_by_name_type(self, mname, mtype, ret_attrs=None): """ Find a model by name and type :param mname: Model name :type mname: str :param mtype: Model type :type mtype: str :param ret_attrs: List of attributes by name or ID to return back (default is Model_Handle) :type ret_attrs: list returns: find_model(): Dictionary mapping of ret_attrs to values: {ret_attr: ret_val} rtype: dict """ # If no return attributes were asked for, return Model_Handle. if ret_attrs is None: ret_attrs = ['Model_Handle'] """This is basically as follows: <filtered-models> <and> <equals> <attribute id=...> <value>...</value> </attribute> </equals> <equals> <attribute...> </equals> </and> </filtered-models> """ # Parent filter tag filtered_models = ET.Element('filtered-models') # Logically and _and = ET.SubElement(filtered_models, 'and') # Model Name MN_equals = ET.SubElement(_and, 'equals') Model_Name = ET.SubElement(MN_equals, 'attribute', {'id': self.attr_map['Model_Name']}) MN_value = ET.SubElement(Model_Name, 'value') MN_value.text = mname # Model Type Name MTN_equals = ET.SubElement(_and, 'equals') Modeltype_Name = ET.SubElement(MTN_equals, 'attribute', {'id': self.attr_map['Modeltype_Name']}) MTN_value = ET.SubElement(Modeltype_Name, 'value') MTN_value.text = mtype return self.find_model(ET.tostring(filtered_models, encoding='unicode'), ret_attrs) def ensure_model_attrs(self): # Get a list of all requested attribute names/IDs plus Model_Handle and # use them to query the values currently set. Store finding in a # dictionary. req_attrs = [] for attr in self.module.params['attributes']: req_attrs.append(attr['name']) if 'Model_Handle' not in req_attrs: req_attrs.append('Model_Handle') # Survey attributes currently set and store in a dict. cur_attrs = self.find_model_by_name_type(self.module.params['name'], self.module.params['type'], req_attrs) # Iterate through the requested attributes names/IDs values pair and # compare with those currently set. If different, attempt to change. Model_Handle = cur_attrs.pop("Model_Handle") for attr in self.module.params['attributes']: req_name = attr['name'] req_val = attr['value'] if req_val == "": # The API will return None on empty string req_val = None if cur_attrs[req_name] != req_val: if self.module.check_mode: self.result['changed_attrs'][req_name] = req_val self.result['msg'] = self.success_msg self.result['changed'] = True continue resp = self.update_model(Model_Handle, {req_name: req_val}) self.module.exit_json(**self.result) def run_module(): argument_spec = dict( url=dict(type='str', required=True), url_username=dict(type='str', required=True, aliases=['username']), url_password=dict(type='str', required=True, aliases=['password'], no_log=True), validate_certs=dict(type='bool', default=True), use_proxy=dict(type='bool', default=True), name=dict(type='str', required=True), type=dict(type='str', required=True), attributes=dict(type='list', required=True, elements='dict', options=dict( name=dict(type='str', required=True), value=dict(type='str', required=True) )), ) module = AnsibleModule( supports_check_mode=True, argument_spec=argument_spec, ) try: sm = spectrum_model_attrs(module) sm.ensure_model_attrs() except Exception as e: module.fail_json(msg="Failed to ensure attribute(s) on `%s' with " "exception: %s" % (module.params['name'], to_native(e))) def main(): run_module() if __name__ == "__main__": main()