1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Remote management modules for OCAPI-based devices. (#5754)

* Remote management modules for OCAPI-based devices.

    Open Composable API (OCAPI) is a REST-based API designed for data center composability. For more information, see https://www.opencompute.org/documents/open-composable-api-for-ocp-2019-06-24-pdf

    This PR introduces ocapi_command and ocapi_info modules.  These are based on the existing redfish_command and redfish_info modules and follow similar patterns.  This initial implementation includes support for the folowing operations:

    - Indicator LED toggling
    - Power state toggling
    - Enclosure reset (reboot)
    - Firmware upload
    - Firmware update
    - Firmware activate
    - Job deletion
    - Job status

    These modules have been tested against Western Digital OpenFlex(tm) Data24 storage enclosures. API reference is at https://documents.westerndigital.com/content/dam/doc-library/en_us/assets/public/western-digital/product/platforms/openflex/reference-architecture-open-composable-api.pdf

* Fix licensing issue for ocapi_utils.py

* PR Feedback

* Apply suggestions from code review

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/module_utils/ocapi_utils.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/ocapi_info.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Apply suggestions from code review

Co-authored-by: Felix Fontein <felix@fontein.de>

* PR Feedback

Use six module for urlparse

* Apply suggestions from code review

Documentation fixes.

Co-authored-by: Felix Fontein <felix@fontein.de>

* Fix sanity test line too long error.

Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
Mike Moerk 2023-01-22 09:10:36 -07:00 committed by GitHub
parent c4b18361b9
commit 59a9d34250
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 1927 additions and 0 deletions

4
.github/BOTMETA.yml vendored
View file

@ -828,6 +828,10 @@ files:
maintainers: shane-walker xcambar maintainers: shane-walker xcambar
$modules/nsupdate.py: $modules/nsupdate.py:
maintainers: nerzhul maintainers: nerzhul
$modules/ocapi_command.py:
maintainers: $team_wdc
$modules/ocapi_info.py:
maintainers: $team_wdc
$modules/oci_vcn.py: $modules/oci_vcn.py:
maintainers: $team_oracle rohitChaware maintainers: $team_oracle rohitChaware
$modules/odbc.py: $modules/odbc.py:

View file

@ -0,0 +1,502 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Western Digital Corporation
# GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import json
import os
import uuid
from ansible.module_utils.urls import open_url
from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.six.moves.urllib.error import URLError, HTTPError
from ansible.module_utils.six.moves.urllib.parse import urlparse
GET_HEADERS = {'accept': 'application/json'}
PUT_HEADERS = {'content-type': 'application/json', 'accept': 'application/json'}
POST_HEADERS = {'content-type': 'application/json', 'accept': 'application/json'}
DELETE_HEADERS = {'accept': 'application/json'}
HEALTH_OK = 5
class OcapiUtils(object):
def __init__(self, creds, base_uri, proxy_slot_number, timeout, module):
self.root_uri = base_uri
self.proxy_slot_number = proxy_slot_number
self.creds = creds
self.timeout = timeout
self.module = module
def _auth_params(self):
"""
Return tuple of required authentication params based on the username and password.
:return: tuple of username, password
"""
username = self.creds['user']
password = self.creds['pswd']
force_basic_auth = True
return username, password, force_basic_auth
def get_request(self, uri):
req_headers = dict(GET_HEADERS)
username, password, basic_auth = self._auth_params()
try:
resp = open_url(uri, method="GET", headers=req_headers,
url_username=username, url_password=password,
force_basic_auth=basic_auth, validate_certs=False,
follow_redirects='all',
use_proxy=True, timeout=self.timeout)
data = json.loads(to_native(resp.read()))
headers = dict((k.lower(), v) for (k, v) in resp.info().items())
except HTTPError as e:
return {'ret': False,
'msg': "HTTP Error %s on GET request to '%s'"
% (e.code, uri),
'status': e.code}
except URLError as e:
return {'ret': False, 'msg': "URL Error on GET request to '%s': '%s'"
% (uri, e.reason)}
# Almost all errors should be caught above, but just in case
except Exception as e:
return {'ret': False,
'msg': "Failed GET request to '%s': '%s'" % (uri, to_text(e))}
return {'ret': True, 'data': data, 'headers': headers}
def delete_request(self, uri, etag=None):
req_headers = dict(DELETE_HEADERS)
if etag is not None:
req_headers['If-Match'] = etag
username, password, basic_auth = self._auth_params()
try:
resp = open_url(uri, method="DELETE", headers=req_headers,
url_username=username, url_password=password,
force_basic_auth=basic_auth, validate_certs=False,
follow_redirects='all',
use_proxy=True, timeout=self.timeout)
if resp.status != 204:
data = json.loads(to_native(resp.read()))
else:
data = ""
headers = dict((k.lower(), v) for (k, v) in resp.info().items())
except HTTPError as e:
return {'ret': False,
'msg': "HTTP Error %s on DELETE request to '%s'"
% (e.code, uri),
'status': e.code}
except URLError as e:
return {'ret': False, 'msg': "URL Error on DELETE request to '%s': '%s'"
% (uri, e.reason)}
# Almost all errors should be caught above, but just in case
except Exception as e:
return {'ret': False,
'msg': "Failed DELETE request to '%s': '%s'" % (uri, to_text(e))}
return {'ret': True, 'data': data, 'headers': headers}
def put_request(self, uri, payload, etag=None):
req_headers = dict(PUT_HEADERS)
if etag is not None:
req_headers['If-Match'] = etag
username, password, basic_auth = self._auth_params()
try:
resp = open_url(uri, data=json.dumps(payload),
headers=req_headers, method="PUT",
url_username=username, url_password=password,
force_basic_auth=basic_auth, validate_certs=False,
follow_redirects='all',
use_proxy=True, timeout=self.timeout)
headers = dict((k.lower(), v) for (k, v) in resp.info().items())
except HTTPError as e:
return {'ret': False,
'msg': "HTTP Error %s on PUT request to '%s'"
% (e.code, uri),
'status': e.code}
except URLError as e:
return {'ret': False, 'msg': "URL Error on PUT request to '%s': '%s'"
% (uri, e.reason)}
# Almost all errors should be caught above, but just in case
except Exception as e:
return {'ret': False,
'msg': "Failed PUT request to '%s': '%s'" % (uri, to_text(e))}
return {'ret': True, 'headers': headers, 'resp': resp}
def post_request(self, uri, payload, content_type="application/json", timeout=None):
req_headers = dict(POST_HEADERS)
if content_type != "application/json":
req_headers["content-type"] = content_type
username, password, basic_auth = self._auth_params()
if content_type == "application/json":
request_data = json.dumps(payload)
else:
request_data = payload
try:
resp = open_url(uri, data=request_data,
headers=req_headers, method="POST",
url_username=username, url_password=password,
force_basic_auth=basic_auth, validate_certs=False,
follow_redirects='all',
use_proxy=True, timeout=self.timeout if timeout is None else timeout)
headers = dict((k.lower(), v) for (k, v) in resp.info().items())
except HTTPError as e:
return {'ret': False,
'msg': "HTTP Error %s on POST request to '%s'"
% (e.code, uri),
'status': e.code}
except URLError as e:
return {'ret': False, 'msg': "URL Error on POST request to '%s': '%s'"
% (uri, e.reason)}
# Almost all errors should be caught above, but just in case
except Exception as e:
return {'ret': False,
'msg': "Failed POST request to '%s': '%s'" % (uri, to_text(e))}
return {'ret': True, 'headers': headers, 'resp': resp}
def get_uri_with_slot_number_query_param(self, uri):
"""Return the URI with proxy slot number added as a query param, if there is one.
If a proxy slot number is provided, to access it, we must append it as a query parameter.
This method returns the given URI with the slotnumber query param added, if there is one.
If there is not a proxy slot number, it just returns the URI as it was passed in.
"""
if self.proxy_slot_number is not None:
parsed_url = urlparse(uri)
return parsed_url._replace(query="slotnumber=" + str(self.proxy_slot_number)).geturl()
else:
return uri
def manage_system_power(self, command):
"""Process a command to manage the system power.
:param str command: The Ansible command being processed.
"""
if command == "PowerGracefulRestart":
resource_uri = self.root_uri
resource_uri = self.get_uri_with_slot_number_query_param(resource_uri)
# Get the resource so that we have the Etag
response = self.get_request(resource_uri)
if 'etag' not in response['headers']:
return {'ret': False, 'msg': 'Etag not found in response.'}
etag = response['headers']['etag']
if response['ret'] is False:
return response
# Issue the PUT to do the reboot (unless we are in check mode)
if self.module.check_mode:
return {
'ret': True,
'changed': True,
'msg': 'Update not performed in check mode.'
}
payload = {'Reboot': True}
response = self.put_request(resource_uri, payload, etag)
if response['ret'] is False:
return response
elif command.startswith("PowerMode"):
return self.manage_power_mode(command)
else:
return {'ret': False, 'msg': 'Invalid command: ' + command}
return {'ret': True}
def manage_chassis_indicator_led(self, command):
"""Process a command to manage the chassis indicator LED.
:param string command: The Ansible command being processed.
"""
return self.manage_indicator_led(command, self.root_uri)
def manage_indicator_led(self, command, resource_uri=None):
"""Process a command to manage an indicator LED.
:param string command: The Ansible command being processed.
:param string resource_uri: URI of the resource whose indicator LED is being managed.
"""
key = "IndicatorLED"
if resource_uri is None:
resource_uri = self.root_uri
resource_uri = self.get_uri_with_slot_number_query_param(resource_uri)
payloads = {
'IndicatorLedOn': {
'ID': 2
},
'IndicatorLedOff': {
'ID': 4
}
}
response = self.get_request(resource_uri)
if 'etag' not in response['headers']:
return {'ret': False, 'msg': 'Etag not found in response.'}
etag = response['headers']['etag']
if response['ret'] is False:
return response
data = response['data']
if key not in data:
return {'ret': False, 'msg': "Key %s not found" % key}
if 'ID' not in data[key]:
return {'ret': False, 'msg': 'IndicatorLED for resource has no ID.'}
if command in payloads.keys():
# See if the LED is already set as requested.
current_led_status = data[key]['ID']
if current_led_status == payloads[command]['ID']:
return {'ret': True, 'changed': False}
# Set the LED (unless we are in check mode)
if self.module.check_mode:
return {
'ret': True,
'changed': True,
'msg': 'Update not performed in check mode.'
}
payload = {'IndicatorLED': payloads[command]}
response = self.put_request(resource_uri, payload, etag)
if response['ret'] is False:
return response
else:
return {'ret': False, 'msg': 'Invalid command'}
return {'ret': True}
def manage_power_mode(self, command):
key = "PowerState"
resource_uri = self.get_uri_with_slot_number_query_param(self.root_uri)
payloads = {
"PowerModeNormal": 2,
"PowerModeLow": 4
}
response = self.get_request(resource_uri)
if 'etag' not in response['headers']:
return {'ret': False, 'msg': 'Etag not found in response.'}
etag = response['headers']['etag']
if response['ret'] is False:
return response
data = response['data']
if key not in data:
return {'ret': False, 'msg': "Key %s not found" % key}
if 'ID' not in data[key]:
return {'ret': False, 'msg': 'PowerState for resource has no ID.'}
if command in payloads.keys():
# See if the PowerState is already set as requested.
current_power_state = data[key]['ID']
if current_power_state == payloads[command]:
return {'ret': True, 'changed': False}
# Set the Power State (unless we are in check mode)
if self.module.check_mode:
return {
'ret': True,
'changed': True,
'msg': 'Update not performed in check mode.'
}
payload = {'PowerState': {"ID": payloads[command]}}
response = self.put_request(resource_uri, payload, etag)
if response['ret'] is False:
return response
else:
return {'ret': False, 'msg': 'Invalid command: ' + command}
return {'ret': True}
def prepare_multipart_firmware_upload(self, filename):
"""Prepare a multipart/form-data body for OCAPI firmware upload.
:arg filename: The name of the file to upload.
:returns: tuple of (content_type, body) where ``content_type`` is
the ``multipart/form-data`` ``Content-Type`` header including
``boundary`` and ``body`` is the prepared bytestring body
Prepares the body to include "FirmwareFile" field with the contents of the file.
Because some OCAPI targets do not support Base-64 encoding for multipart/form-data,
this method sends the file as binary.
"""
boundary = str(uuid.uuid4()) # Generate a random boundary
body = "--" + boundary + '\r\n'
body += 'Content-Disposition: form-data; name="FirmwareFile"; filename="%s"\r\n' % to_native(os.path.basename(filename))
body += 'Content-Type: application/octet-stream\r\n\r\n'
body_bytes = bytearray(body, 'utf-8')
with open(filename, 'rb') as f:
body_bytes += f.read()
body_bytes += bytearray("\r\n--%s--" % boundary, 'utf-8')
return ("multipart/form-data; boundary=%s" % boundary,
body_bytes)
def upload_firmware_image(self, update_image_path):
"""Perform Firmware Upload to the OCAPI storage device.
:param str update_image_path: The path/filename of the firmware image, on the local filesystem.
"""
if not (os.path.exists(update_image_path) and os.path.isfile(update_image_path)):
return {'ret': False, 'msg': 'File does not exist.'}
url = self.root_uri + "OperatingSystem"
url = self.get_uri_with_slot_number_query_param(url)
content_type, b_form_data = self.prepare_multipart_firmware_upload(update_image_path)
# Post the firmware (unless we are in check mode)
if self.module.check_mode:
return {
'ret': True,
'changed': True,
'msg': 'Update not performed in check mode.'
}
result = self.post_request(url, b_form_data, content_type=content_type, timeout=300)
if result['ret'] is False:
return result
return {'ret': True}
def update_firmware_image(self):
"""Perform a Firmware Update on the OCAPI storage device."""
resource_uri = self.root_uri
resource_uri = self.get_uri_with_slot_number_query_param(resource_uri)
# We have to do a GET to obtain the Etag. It's required on the PUT.
response = self.get_request(resource_uri)
if response['ret'] is False:
return response
if 'etag' not in response['headers']:
return {'ret': False, 'msg': 'Etag not found in response.'}
etag = response['headers']['etag']
# Issue the PUT (unless we are in check mode)
if self.module.check_mode:
return {
'ret': True,
'changed': True,
'msg': 'Update not performed in check mode.'
}
payload = {'FirmwareUpdate': True}
response = self.put_request(resource_uri, payload, etag)
if response['ret'] is False:
return response
return {'ret': True, 'jobUri': response["headers"]["location"]}
def activate_firmware_image(self):
"""Perform a Firmware Activate on the OCAPI storage device."""
resource_uri = self.root_uri
resource_uri = self.get_uri_with_slot_number_query_param(resource_uri)
# We have to do a GET to obtain the Etag. It's required on the PUT.
response = self.get_request(resource_uri)
if 'etag' not in response['headers']:
return {'ret': False, 'msg': 'Etag not found in response.'}
etag = response['headers']['etag']
if response['ret'] is False:
return response
# Issue the PUT (unless we are in check mode)
if self.module.check_mode:
return {
'ret': True,
'changed': True,
'msg': 'Update not performed in check mode.'
}
payload = {'FirmwareActivate': True}
response = self.put_request(resource_uri, payload, etag)
if response['ret'] is False:
return response
return {'ret': True, 'jobUri': response["headers"]["location"]}
def get_job_status(self, job_uri):
"""Get the status of a job.
:param str job_uri: The URI of the job's status monitor.
"""
job_uri = self.get_uri_with_slot_number_query_param(job_uri)
response = self.get_request(job_uri)
if response['ret'] is False:
if response.get('status') == 404:
# Job not found -- assume 0%
return {
"ret": True,
"percentComplete": 0,
"operationStatus": "Not Available",
"operationStatusId": 1,
"operationHealth": None,
"operationHealthId": None,
"details": "Job does not exist.",
"jobExists": False
}
else:
return response
details = response["data"]["Status"].get("Details")
if type(details) is str:
details = [details]
health_list = response["data"]["Status"]["Health"]
return_value = {
"ret": True,
"percentComplete": response["data"]["PercentComplete"],
"operationStatus": response["data"]["Status"]["State"]["Name"],
"operationStatusId": response["data"]["Status"]["State"]["ID"],
"operationHealth": health_list[0]["Name"] if len(health_list) > 0 else None,
"operationHealthId": health_list[0]["ID"] if len(health_list) > 0 else None,
"details": details,
"jobExists": True
}
return return_value
def delete_job(self, job_uri):
"""Delete the OCAPI job referenced by the specified job_uri."""
job_uri = self.get_uri_with_slot_number_query_param(job_uri)
# We have to do a GET to obtain the Etag. It's required on the DELETE.
response = self.get_request(job_uri)
if response['ret'] is True:
if 'etag' not in response['headers']:
return {'ret': False, 'msg': 'Etag not found in response.'}
else:
etag = response['headers']['etag']
if response['data']['PercentComplete'] != 100:
return {
'ret': False,
'changed': False,
'msg': 'Cannot delete job because it is in progress.'
}
if response['ret'] is False:
if response['status'] == 404:
return {
'ret': True,
'changed': False,
'msg': 'Job already deleted.'
}
return response
if self.module.check_mode:
return {
'ret': True,
'changed': True,
'msg': 'Update not performed in check mode.'
}
# Do the DELETE (unless we are in check mode)
response = self.delete_request(job_uri, etag)
if response['ret'] is False:
if response['status'] == 404:
return {
'ret': True,
'changed': False
}
elif response['status'] == 409:
return {
'ret': False,
'changed': False,
'msg': 'Cannot delete job because it is in progress.'
}
return response
return {
'ret': True,
'changed': True
}

View file

@ -0,0 +1,267 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Western Digital Corporation
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = '''
---
module: ocapi_command
version_added: 6.3.0
short_description: Manages Out-Of-Band controllers using Open Composable API (OCAPI)
description:
- Builds OCAPI URIs locally and sends them to remote OOB controllers to
perform an action.
- Manages OOB controller such as Indicator LED, Reboot, Power Mode, Firmware Update.
options:
category:
required: true
description:
- Category to execute on OOB controller.
type: str
command:
required: true
description:
- Command to execute on OOB controller.
type: str
baseuri:
required: true
description:
- Base URI of OOB controller.
type: str
proxy_slot_number:
description: For proxied inband requests, the slot number of the IOM. Only applies if I(baseuri) is a proxy server.
type: int
update_image_path:
required: false
description:
- For C(FWUpload), the path on the local filesystem of the firmware update image.
type: str
job_name:
required: false
description:
- For C(DeleteJob) command, the name of the job to delete.
type: str
username:
required: true
description:
- Username for authenticating to OOB controller.
type: str
password:
required: true
description:
- Password for authenticating to OOB controller.
type: str
timeout:
description:
- Timeout in seconds for URL requests to OOB controller.
default: 10
type: int
author: "Mike Moerk (@mikemoerk)"
'''
EXAMPLES = '''
- name: Set the power state to low
community.general.ocapi_command:
category: Chassis
command: PowerModeLow
baseuri: "{{ baseuri }}"
username: "{{ username }}"
password: "{{ password }}"
- name: Set the power state to normal
community.general.ocapi_command:
category: Chassis
command: PowerModeNormal
baseuri: "{{ baseuri }}"
username: "{{ username }}"
password: "{{ password }}"
- name: Set chassis indicator LED to on
community.general.ocapi_command:
category: Chassis
command: IndicatorLedOn
baseuri: "{{ baseuri }}"
proxy_slot_number: 2
username: "{{ username }}"
password: "{{ password }}"
- name: Set chassis indicator LED to off
community.general.ocapi_command:
category: Chassis
command: IndicatorLedOff
baseuri: "{{ baseuri }}"
proxy_slot_number: 2
username: "{{ username }}"
password: "{{ password }}"
- name: Reset Enclosure
community.general.ocapi_command:
category: Systems
command: PowerGracefulRestart
baseuri: "{{ baseuri }}"
proxy_slot_number: 2
username: "{{ username }}"
password: "{{ password }}"
- name: Firmware Upload
community.general.ocapi_command:
category: Update
command: FWUpload
baseuri: "iom1.wdc.com"
proxy_slot_number: 2
username: "{{ username }}"
password: "{{ password }}"
update_image_path: "/path/to/firmware.tar.gz"
- name: Firmware Update
community.general.ocapi_command:
category: Update
command: FWUpdate
baseuri: "iom1.wdc.com"
proxy_slot_number: 2
username: "{{ username }}"
password: "{{ password }}"
- name: Firmware Activate
community.general.ocapi_command:
category: Update
command: FWActivate
baseuri: "iom1.wdc.com"
proxy_slot_number: 2
username: "{{ username }}"
password: "{{ password }}"
- name: Delete Job
community.general.ocapi_command:
category: Jobs
command: DeleteJob
job_name: FirmwareUpdate
baseuri: "{{ baseuri }}"
proxy_slot_number: 2
username: "{{ username }}"
password: "{{ password }}"
'''
RETURN = '''
msg:
description: Message with action result or error description.
returned: always
type: str
sample: "Action was successful"
jobUri:
description: URI to use to monitor status of the operation. Returned for async commands such as Firmware Update, Firmware Activate.
returned: when supported
type: str
sample: "https://ioma.wdc.com/Storage/Devices/openflex-data24-usalp03020qb0003/Jobs/FirmwareUpdate/"
operationStatusId:
description: OCAPI State ID (see OCAPI documentation for possible values).
returned: when supported
type: int
sample: 2
'''
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.general.plugins.module_utils.ocapi_utils import OcapiUtils
from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.six.moves.urllib.parse import quote_plus, urljoin
# More will be added as module features are expanded
CATEGORY_COMMANDS_ALL = {
"Chassis": ["IndicatorLedOn", "IndicatorLedOff", "PowerModeLow", "PowerModeNormal"],
"Systems": ["PowerGracefulRestart"],
"Update": ["FWUpload", "FWUpdate", "FWActivate"],
"Jobs": ["DeleteJob"]
}
def main():
result = {}
module = AnsibleModule(
argument_spec=dict(
category=dict(required=True),
command=dict(required=True, type='str'),
job_name=dict(type='str'),
baseuri=dict(required=True, type='str'),
proxy_slot_number=dict(type='int'),
update_image_path=dict(type='str'),
username=dict(required=True),
password=dict(required=True, no_log=True),
timeout=dict(type='int', default=10)
),
supports_check_mode=True
)
category = module.params['category']
command = module.params['command']
# admin credentials used for authentication
creds = {
'user': module.params['username'],
'pswd': module.params['password']
}
# timeout
timeout = module.params['timeout']
base_uri = "https://" + module.params["baseuri"]
proxy_slot_number = module.params.get("proxy_slot_number")
ocapi_utils = OcapiUtils(creds, base_uri, proxy_slot_number, timeout, module)
# Check that Category is valid
if category not in CATEGORY_COMMANDS_ALL:
module.fail_json(msg=to_native("Invalid Category '%s'. Valid Categories = %s" % (category, list(CATEGORY_COMMANDS_ALL.keys()))))
# Check that the command is valid
if command not in CATEGORY_COMMANDS_ALL[category]:
module.fail_json(msg=to_native("Invalid Command '%s'. Valid Commands = %s" % (command, CATEGORY_COMMANDS_ALL[category])))
# Organize by Categories / Commands
if category == "Chassis":
if command.startswith("IndicatorLed"):
result = ocapi_utils.manage_chassis_indicator_led(command)
elif command.startswith("PowerMode"):
result = ocapi_utils.manage_system_power(command)
elif category == "Systems":
if command.startswith("Power"):
result = ocapi_utils.manage_system_power(command)
elif category == "Update":
if command == "FWUpload":
update_image_path = module.params.get("update_image_path")
if update_image_path is None:
module.fail_json(msg=to_native("Missing update_image_path."))
result = ocapi_utils.upload_firmware_image(update_image_path)
elif command == "FWUpdate":
result = ocapi_utils.update_firmware_image()
elif command == "FWActivate":
result = ocapi_utils.activate_firmware_image()
elif category == "Jobs":
if command == "DeleteJob":
job_name = module.params.get("job_name")
if job_name is None:
module.fail_json("Missing job_name")
job_uri = urljoin(base_uri, "Jobs/" + job_name)
result = ocapi_utils.delete_job(job_uri)
if result['ret'] is False:
module.fail_json(msg=to_native(result['msg']))
else:
del result['ret']
changed = result.get('changed', True)
session = result.get('session', dict())
kwargs = {
"changed": changed,
"session": session,
"msg": "Action was successful." if not module.check_mode else result.get(
"msg", "No action performed in check mode."
)
}
result_keys = [result_key for result_key in result if result_key not in kwargs]
for result_key in result_keys:
kwargs[result_key] = result[result_key]
module.exit_json(**kwargs)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,221 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2022 Western Digital Corporation
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = '''
---
module: ocapi_info
version_added: 6.3.0
short_description: Manages Out-Of-Band controllers using Open Composable API (OCAPI)
description:
- Builds OCAPI URIs locally and sends them to remote OOB controllers to
get information back.
options:
category:
required: true
description:
- Category to execute on OOB controller.
type: str
command:
required: true
description:
- Command to execute on OOB controller.
type: str
baseuri:
required: true
description:
- Base URI of OOB controller.
type: str
proxy_slot_number:
description: For proxied inband requests, the slot number of the IOM. Only applies if I(baseuri) is a proxy server.
type: int
username:
required: true
description:
- Username for authenticating to OOB controller.
type: str
password:
required: true
description:
- Password for authenticating to OOB controller.
type: str
timeout:
description:
- Timeout in seconds for URL requests to OOB controller.
default: 10
type: int
job_name:
description:
- Name of job for fetching status.
type: str
author: "Mike Moerk (@mikemoerk)"
'''
EXAMPLES = '''
- name: Get job status
community.general.ocapi_info:
category: Status
command: JobStatus
baseuri: "http://iom1.wdc.com"
jobName: FirmwareUpdate
username: "{{ username }}"
password: "{{ password }}"
'''
RETURN = '''
msg:
description: Message with action result or error description.
returned: always
type: str
sample: "Action was successful"
percentComplete:
description: Percent complete of the relevant operation. Applies to C(JobStatus) command.
returned: when supported
type: int
sample: 99
operationStatus:
description: Status of the relevant operation. Applies to C(JobStatus) command. See OCAPI documentation for details.
returned: when supported
type: str
sample: "Activate needed"
operationStatusId:
description: Integer value of status (corresponds to operationStatus). Applies to C(JobStatus) command. See OCAPI documentation for details.
returned: when supported
type: int
sample: 65540
operationHealth:
description: Health of the operation. Applies to C(JobStatus) command. See OCAPI documentation for details.
returned: when supported
type: str
sample: "OK"
operationHealthId:
description: >
Integer value for health of the operation (corresponds to C(operationHealth)). Applies to C(JobStatus) command.
See OCAPI documentation for details.
returned: when supported
type: str
sample: "OK"
details:
description: Details of the relevant operation. Applies to C(JobStatus) command.
returned: when supported
type: list
elements: str
status:
description: Dict containing status information. See OCAPI documentation for details.
returned: when supported
type: dict
sample: {
"Details": [
"None"
],
"Health": [
{
"ID": 5,
"Name": "OK"
}
],
"State": {
"ID": 16,
"Name": "In service"
}
}
'''
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.general.plugins.module_utils.ocapi_utils import OcapiUtils
from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.six.moves.urllib.parse import quote_plus, urljoin
# More will be added as module features are expanded
CATEGORY_COMMANDS_ALL = {
"Jobs": ["JobStatus"]
}
def main():
result = {}
module = AnsibleModule(
argument_spec=dict(
category=dict(required=True),
command=dict(required=True, type='str'),
job_name=dict(type='str'),
baseuri=dict(required=True, type='str'),
proxy_slot_number=dict(type='int'),
username=dict(required=True),
password=dict(required=True, no_log=True),
timeout=dict(type='int', default=10)
),
supports_check_mode=True
)
category = module.params['category']
command = module.params['command']
# admin credentials used for authentication
creds = {
'user': module.params['username'],
'pswd': module.params['password']
}
# timeout
timeout = module.params['timeout']
base_uri = "https://" + module.params["baseuri"]
proxy_slot_number = module.params.get("proxy_slot_number")
ocapi_utils = OcapiUtils(creds, base_uri, proxy_slot_number, timeout, module)
# Check that Category is valid
if category not in CATEGORY_COMMANDS_ALL:
module.fail_json(msg=to_native("Invalid Category '%s'. Valid Categories = %s" % (category, list(CATEGORY_COMMANDS_ALL.keys()))))
# Check that the command is valid
if command not in CATEGORY_COMMANDS_ALL[category]:
module.fail_json(msg=to_native("Invalid Command '%s'. Valid Commands = %s" % (command, CATEGORY_COMMANDS_ALL[category])))
# Organize by Categories / Commands
if category == "Jobs":
if command == "JobStatus":
if module.params.get("job_name") is None:
module.fail_json(msg=to_native(
"job_name required for JobStatus command."))
job_uri = urljoin(base_uri, 'Jobs/' + module.params["job_name"])
result = ocapi_utils.get_job_status(job_uri)
if result['ret'] is False:
module.fail_json(msg=to_native(result['msg']))
else:
del result['ret']
changed = False
session = result.get('session', dict())
kwargs = {
"changed": changed,
"session": session,
"msg": "Action was successful." if not module.check_mode else result.get(
"msg", "No action performed in check mode."
)
}
result_keys = [result_key for result_key in result if result_key not in kwargs]
for result_key in result_keys:
kwargs[result_key] = result[result_key]
module.exit_json(**kwargs)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# Copyright (c) Ansible project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import re
import shutil
import tempfile
from ansible_collections.community.general.tests.unit.compat import unittest
from ansible_collections.community.general.plugins.module_utils.ocapi_utils import OcapiUtils
class TestOcapiUtils(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.utils = OcapiUtils(creds={"user": "a_user", "pswd": "a_password"},
base_uri="fakeUri",
proxy_slot_number=None,
timeout=30,
module=None)
def tearDown(self):
shutil.rmtree(self.tempdir)
def test_prepare_multipart_firmware_upload(self):
# Generate a binary file and save it
filename = "fake_firmware.bin"
filepath = os.path.join(self.tempdir, filename)
file_contents = b'\x00\x01\x02\x03\x04'
with open(filepath, 'wb+') as f:
f.write(file_contents)
# Call prepare_mutipart_firmware_upload
content_type, b_form_data = self.utils.prepare_multipart_firmware_upload(filepath)
# Check the returned content-type
content_type_pattern = r"multipart/form-data; boundary=(.*)"
m = re.match(content_type_pattern, content_type)
self.assertIsNotNone(m)
# Check the returned binary data
boundary = m.group(1)
expected_content_text = '--%s\r\n' % boundary
expected_content_text += 'Content-Disposition: form-data; name="FirmwareFile"; filename="%s"\r\n' % filename
expected_content_text += 'Content-Type: application/octet-stream\r\n\r\n'
expected_content_bytes = bytearray(expected_content_text, 'utf-8')
expected_content_bytes += file_contents
expected_content_bytes += bytearray('\r\n--%s--' % boundary, 'utf-8')
self.assertEqual(expected_content_bytes, b_form_data)

View file

@ -0,0 +1,639 @@
# -*- coding: utf-8 -*-
# Copyright (c) Ansible project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import shutil
import tempfile
from ansible_collections.community.general.tests.unit.compat.mock import patch
from ansible_collections.community.general.tests.unit.compat import unittest
from ansible.module_utils import basic
import ansible_collections.community.general.plugins.modules.ocapi_command as module
from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson
from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args, exit_json, fail_json
from ansible.module_utils.six.moves.urllib.parse import quote_plus, urljoin
MOCK_BASE_URI = "mockBaseUri/"
OPERATING_SYSTEM_URI = "OperatingSystem"
MOCK_JOB_NAME = "MockJob"
ACTION_WAS_SUCCESSFUL = "Action was successful."
UPDATE_NOT_PERFORMED_IN_CHECK_MODE = "Update not performed in check mode."
NO_ACTION_PERFORMED_IN_CHECK_MODE = "No action performed in check mode."
MOCK_SUCCESSFUL_HTTP_RESPONSE_LED_INDICATOR_OFF_WITH_ETAG = {
"ret": True,
"data": {
"IndicatorLED": {
"ID": 4,
"Name": "Off"
},
"PowerState": {
"ID": 2,
"Name": "On"
}
},
"headers": {"etag": "MockETag"}
}
MOCK_SUCCESSFUL_HTTP_RESPONSE = {
"ret": True,
"data": {}
}
MOCK_404_RESPONSE = {
"ret": False,
"status": 404
}
MOCK_SUCCESSFUL_HTTP_RESPONSE_WITH_LOCATION_HEADER = {
"ret": True,
"data": {},
"headers": {"location": "mock_location"}
}
MOCK_HTTP_RESPONSE_CONFLICT = {
"ret": False,
"msg": "Conflict",
"status": 409
}
MOCK_HTTP_RESPONSE_JOB_IN_PROGRESS = {
"ret": True,
"data": {
"PercentComplete": 99
},
"headers": {
"etag": "12345"
}
}
MOCK_HTTP_RESPONSE_JOB_COMPLETE = {
"ret": True,
"data": {
"PercentComplete": 100
},
"headers": {
"etag": "12345"
}
}
def get_bin_path(self, arg, required=False):
"""Mock AnsibleModule.get_bin_path"""
return arg
def get_exception_message(ansible_exit_json):
"""From an AnsibleExitJson exception, get the message string."""
return ansible_exit_json.exception.args[0]["msg"]
def is_changed(ansible_exit_json):
"""From an AnsibleExitJson exception, return the value of the changed flag"""
return ansible_exit_json.exception.args[0]["changed"]
def mock_get_request(*args, **kwargs):
"""Mock for get_request."""
url = args[1]
if url == 'https://' + MOCK_BASE_URI:
return MOCK_SUCCESSFUL_HTTP_RESPONSE_LED_INDICATOR_OFF_WITH_ETAG
elif url == "mock_location":
return MOCK_SUCCESSFUL_HTTP_RESPONSE
raise RuntimeError("Illegal call to get_request in test: " + args[1])
def mock_get_request_job_does_not_exist(*args, **kwargs):
"""Mock for get_request."""
url = args[1]
if url == 'https://' + MOCK_BASE_URI:
return MOCK_SUCCESSFUL_HTTP_RESPONSE_LED_INDICATOR_OFF_WITH_ETAG
elif url == urljoin('https://' + MOCK_BASE_URI, "Jobs/" + MOCK_JOB_NAME):
return MOCK_404_RESPONSE
raise RuntimeError("Illegal call to get_request in test: " + args[1])
def mock_get_request_job_in_progress(*args, **kwargs):
url = args[1]
if url == 'https://' + MOCK_BASE_URI:
return MOCK_SUCCESSFUL_HTTP_RESPONSE_LED_INDICATOR_OFF_WITH_ETAG
elif url == urljoin('https://' + MOCK_BASE_URI, "Jobs/" + MOCK_JOB_NAME):
return MOCK_HTTP_RESPONSE_JOB_IN_PROGRESS
raise RuntimeError("Illegal call to get_request in test: " + args[1])
def mock_get_request_job_complete(*args, **kwargs):
url = args[1]
if url == 'https://' + MOCK_BASE_URI:
return MOCK_SUCCESSFUL_HTTP_RESPONSE_LED_INDICATOR_OFF_WITH_ETAG
elif url == urljoin('https://' + MOCK_BASE_URI, "Jobs/" + MOCK_JOB_NAME):
return MOCK_HTTP_RESPONSE_JOB_COMPLETE
raise RuntimeError("Illegal call to get_request in test: " + args[1])
def mock_put_request(*args, **kwargs):
"""Mock put_request."""
url = args[1]
if url == 'https://' + MOCK_BASE_URI:
return MOCK_SUCCESSFUL_HTTP_RESPONSE_WITH_LOCATION_HEADER
raise RuntimeError("Illegal PUT call to: " + args[1])
def mock_delete_request(*args, **kwargs):
"""Mock delete request."""
url = args[1]
if url == urljoin('https://' + MOCK_BASE_URI, 'Jobs/' + MOCK_JOB_NAME):
return MOCK_SUCCESSFUL_HTTP_RESPONSE
raise RuntimeError("Illegal DELETE call to: " + args[1])
def mock_post_request(*args, **kwargs):
"""Mock post_request."""
url = args[1]
if url == urljoin('https://' + MOCK_BASE_URI, OPERATING_SYSTEM_URI):
return MOCK_SUCCESSFUL_HTTP_RESPONSE
raise RuntimeError("Illegal POST call to: " + args[1])
def mock_http_request_conflict(*args, **kwargs):
"""Mock to make an HTTP request return 409 Conflict"""
return MOCK_HTTP_RESPONSE_CONFLICT
def mock_invalid_http_request(*args, **kwargs):
"""Mock to make an HTTP request invalid. Raises an exception."""
raise RuntimeError("Illegal HTTP call to " + args[1])
class TestOcapiCommand(unittest.TestCase):
def setUp(self):
self.mock_module_helper = patch.multiple(basic.AnsibleModule,
exit_json=exit_json,
fail_json=fail_json,
get_bin_path=get_bin_path)
self.mock_module_helper.start()
self.addCleanup(self.mock_module_helper.stop)
self.tempdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tempdir)
def test_module_fail_when_required_args_missing(self):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({})
module.main()
self.assertIn("missing required arguments:", get_exception_message(ansible_fail_json))
def test_module_fail_when_unknown_category(self):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({
'category': 'unknown',
'command': 'IndicatorLedOn',
'username': 'USERID',
'password': 'PASSW0RD=21',
'baseuri': MOCK_BASE_URI
})
module.main()
self.assertIn("Invalid Category 'unknown", get_exception_message(ansible_fail_json))
def test_set_power_mode(self):
"""Test that we can set chassis power mode"""
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Chassis',
'command': 'PowerModeLow',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_set_chassis_led_indicator(self):
"""Test that we can set chassis LED indicator."""
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Chassis',
'command': 'IndicatorLedOn',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_set_power_mode_already_set(self):
"""Test that if we set Power Mode to normal when it's already normal, we get changed=False."""
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Chassis',
'command': 'PowerModeNormal',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
self.assertFalse(is_changed(ansible_exit_json))
def test_set_power_mode_check_mode(self):
"""Test check mode when setting chassis Power Mode."""
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Chassis',
'command': 'IndicatorLedOn',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21',
'_ansible_check_mode': True
})
module.main()
self.assertEqual(UPDATE_NOT_PERFORMED_IN_CHECK_MODE, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_set_chassis_led_indicator_check_mode(self):
"""Test check mode when setting chassis LED indicator"""
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Chassis',
'command': 'IndicatorLedOn',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21',
'_ansible_check_mode': True
})
module.main()
self.assertEqual(UPDATE_NOT_PERFORMED_IN_CHECK_MODE, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_set_chassis_led_indicator_already_set(self):
"""Test that if we set LED Indicator to off when it's already off, we get changed=False."""
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Chassis',
'command': 'IndicatorLedOff',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
self.assertFalse(is_changed(ansible_exit_json))
def test_set_chassis_led_indicator_already_set_check_mode(self):
"""Test that if we set LED Indicator to off when it's already off, we get changed=False even in check mode."""
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Chassis',
'command': 'IndicatorLedOff',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21',
"_ansible_check_mode": True
})
module.main()
self.assertEqual(NO_ACTION_PERFORMED_IN_CHECK_MODE, get_exception_message(ansible_exit_json))
self.assertFalse(is_changed(ansible_exit_json))
def test_set_chassis_invalid_indicator_command(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({
'category': 'Chassis',
'command': 'IndicatorLedBright',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertIn("Invalid Command", get_exception_message(ansible_fail_json))
def test_reset_enclosure(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Systems',
'command': 'PowerGracefulRestart',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_reset_enclosure_check_mode(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Systems',
'command': 'PowerGracefulRestart',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21',
"_ansible_check_mode": True
})
module.main()
self.assertEqual(UPDATE_NOT_PERFORMED_IN_CHECK_MODE, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_firmware_upload_missing_update_image_path(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({
'category': 'Update',
'command': 'FWUpload',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual("Missing update_image_path.", get_exception_message(ansible_fail_json))
def test_firmware_upload_file_not_found(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_invalid_http_request):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({
'category': 'Update',
'command': 'FWUpload',
'update_image_path': 'nonexistentfile.bin',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual("File does not exist.", get_exception_message(ansible_fail_json))
def test_firmware_upload(self):
filename = "fake_firmware.bin"
filepath = os.path.join(self.tempdir, filename)
file_contents = b'\x00\x01\x02\x03\x04'
with open(filepath, 'wb+') as f:
f.write(file_contents)
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request,
post_request=mock_post_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Update',
'command': 'FWUpload',
'update_image_path': filepath,
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_firmware_upload_check_mode(self):
filename = "fake_firmware.bin"
filepath = os.path.join(self.tempdir, filename)
file_contents = b'\x00\x01\x02\x03\x04'
with open(filepath, 'wb+') as f:
f.write(file_contents)
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Update',
'command': 'FWUpload',
'update_image_path': filepath,
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21',
"_ansible_check_mode": True
})
module.main()
self.assertEqual(UPDATE_NOT_PERFORMED_IN_CHECK_MODE, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_firmware_update(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Update',
'command': 'FWUpdate',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_firmware_update_check_mode(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_invalid_http_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Update',
'command': 'FWUpdate',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21',
"_ansible_check_mode": True
})
module.main()
self.assertEqual(UPDATE_NOT_PERFORMED_IN_CHECK_MODE, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_firmware_activate(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Update',
'command': 'FWActivate',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_firmware_activate_check_mode(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_invalid_http_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Update',
'command': 'FWActivate',
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21',
"_ansible_check_mode": True
})
module.main()
self.assertEqual(UPDATE_NOT_PERFORMED_IN_CHECK_MODE, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_delete_job(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request_job_complete,
delete_request=mock_delete_request,
put_request=mock_invalid_http_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Jobs',
'command': 'DeleteJob',
'baseuri': MOCK_BASE_URI,
'job_name': MOCK_JOB_NAME,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_delete_job_in_progress(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request_job_in_progress,
delete_request=mock_invalid_http_request,
put_request=mock_invalid_http_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({
'category': 'Jobs',
'command': 'DeleteJob',
'baseuri': MOCK_BASE_URI,
'job_name': MOCK_JOB_NAME,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual("Cannot delete job because it is in progress.", get_exception_message(ansible_fail_json))
def test_delete_job_in_progress_only_on_delete(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request_job_complete,
delete_request=mock_http_request_conflict,
put_request=mock_invalid_http_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({
'category': 'Jobs',
'command': 'DeleteJob',
'baseuri': MOCK_BASE_URI,
'job_name': MOCK_JOB_NAME,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual("Cannot delete job because it is in progress.", get_exception_message(ansible_fail_json))
def test_delete_job_check_mode(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request_job_complete,
delete_request=mock_delete_request,
put_request=mock_invalid_http_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Jobs',
'command': 'DeleteJob',
'baseuri': MOCK_BASE_URI,
'job_name': MOCK_JOB_NAME,
'username': 'USERID',
'password': 'PASSWORD=21',
'_ansible_check_mode': True
})
module.main()
self.assertEqual(UPDATE_NOT_PERFORMED_IN_CHECK_MODE, get_exception_message(ansible_exit_json))
self.assertTrue(is_changed(ansible_exit_json))
def test_delete_job_check_mode_job_not_found(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request_job_does_not_exist,
delete_request=mock_delete_request,
put_request=mock_invalid_http_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Jobs',
'command': 'DeleteJob',
'baseuri': MOCK_BASE_URI,
'job_name': MOCK_JOB_NAME,
'username': 'USERID',
'password': 'PASSWORD=21',
'_ansible_check_mode': True
})
module.main()
self.assertEqual("Job already deleted.", get_exception_message(ansible_exit_json))
self.assertFalse(is_changed(ansible_exit_json))
def test_delete_job_check_mode_job_in_progress(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request_job_in_progress,
delete_request=mock_delete_request,
put_request=mock_invalid_http_request,
post_request=mock_invalid_http_request):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({
'category': 'Jobs',
'command': 'DeleteJob',
'baseuri': MOCK_BASE_URI,
'job_name': MOCK_JOB_NAME,
'username': 'USERID',
'password': 'PASSWORD=21',
'_ansible_check_mode': True
})
module.main()
self.assertEqual("Cannot delete job because it is in progress.", get_exception_message(ansible_fail_json))

View file

@ -0,0 +1,240 @@
# -*- coding: utf-8 -*-
# Copyright (c) Ansible project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible_collections.community.general.tests.unit.compat.mock import patch
from ansible_collections.community.general.tests.unit.compat import unittest
from ansible.module_utils import basic
import ansible_collections.community.general.plugins.modules.ocapi_info as module
from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson
from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args, exit_json, fail_json
MOCK_BASE_URI = "mockBaseUri"
MOCK_JOB_NAME_IN_PROGRESS = "MockJobInProgress"
MOCK_JOB_NAME_COMPLETE = "MockJobComplete"
MOCK_JOB_NAME_DOES_NOT_EXIST = "MockJobDoesNotExist"
ACTION_WAS_SUCCESSFUL = "Action was successful."
MOCK_SUCCESSFUL_HTTP_RESPONSE = {
"ret": True,
"data": {}
}
MOCK_404_RESPONSE = {
"ret": False,
"status": 404
}
MOCK_HTTP_RESPONSE_JOB_IN_PROGRESS = {
"ret": True,
"data": {
"Self": "https://openflex-data24-usalp02120qo0012-iomb:443/Storage/Devices/openflex-data24-usalp02120qo0012/Jobs/FirmwareUpdate/",
"ID": MOCK_JOB_NAME_IN_PROGRESS,
"PercentComplete": 10,
"Status": {
"State": {
"ID": 16,
"Name": "In service"
},
"Health": [
{
"ID": 5,
"Name": "OK"
}
]
}
}
}
MOCK_HTTP_RESPONSE_JOB_COMPLETE = {
"ret": True,
"data": {
"Self": "https://openflex-data24-usalp02120qo0012-iomb:443/Storage/Devices/openflex-data24-usalp02120qo0012/Jobs/FirmwareUpdate/",
"ID": MOCK_JOB_NAME_COMPLETE,
"PercentComplete": 100,
"Status": {
"State": {
"ID": 65540,
"Name": "Activate needed"
},
"Health": [
{
"ID": 5,
"Name": "OK"
}
],
"Details": [
"Completed."
]
}
}
}
def get_bin_path(self, arg, required=False):
"""Mock AnsibleModule.get_bin_path"""
return arg
def get_exception_message(ansible_exit_json):
"""From an AnsibleExitJson exception, get the message string."""
return ansible_exit_json.exception.args[0]["msg"]
def mock_get_request(*args, **kwargs):
"""Mock for get_request."""
url = args[1]
if url == "https://" + MOCK_BASE_URI:
return MOCK_SUCCESSFUL_HTTP_RESPONSE
elif url == "https://" + MOCK_BASE_URI + '/Jobs/' + MOCK_JOB_NAME_IN_PROGRESS:
return MOCK_HTTP_RESPONSE_JOB_IN_PROGRESS
elif url == "https://" + MOCK_BASE_URI + '/Jobs/' + MOCK_JOB_NAME_COMPLETE:
return MOCK_HTTP_RESPONSE_JOB_COMPLETE
elif url == "https://" + MOCK_BASE_URI + '/Jobs/' + MOCK_JOB_NAME_DOES_NOT_EXIST:
return MOCK_404_RESPONSE
else:
raise RuntimeError("Illegal GET call to: " + args[1])
def mock_put_request(*args, **kwargs):
"""Mock put_request. PUT should never happen so it will raise an error."""
raise RuntimeError("Illegal PUT call to: " + args[1])
def mock_delete_request(*args, **kwargs):
"""Mock delete request. DELETE should never happen so it will raise an error."""
raise RuntimeError("Illegal DELETE call to: " + args[1])
def mock_post_request(*args, **kwargs):
"""Mock post_request. POST should never happen so it will raise an error."""
raise RuntimeError("Illegal POST call to: " + args[1])
class TestOcapiInfo(unittest.TestCase):
def setUp(self):
self.mock_module_helper = patch.multiple(basic.AnsibleModule,
exit_json=exit_json,
fail_json=fail_json,
get_bin_path=get_bin_path)
self.mock_module_helper.start()
self.addCleanup(self.mock_module_helper.stop)
def test_module_fail_when_required_args_missing(self):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({})
module.main()
self.assertIn("missing required arguments:", get_exception_message(ansible_fail_json))
def test_module_fail_when_unknown_category(self):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({
'category': 'unknown',
'command': 'JobStatus',
'username': 'USERID',
'password': 'PASSW0RD=21',
'baseuri': MOCK_BASE_URI
})
module.main()
self.assertIn("Invalid Category 'unknown", get_exception_message(ansible_fail_json))
def test_module_fail_when_unknown_command(self):
with self.assertRaises(AnsibleFailJson) as ansible_fail_json:
set_module_args({
'category': 'Jobs',
'command': 'unknown',
'username': 'USERID',
'password': 'PASSW0RD=21',
'baseuri': MOCK_BASE_URI
})
module.main()
self.assertIn("Invalid Command 'unknown", get_exception_message(ansible_fail_json))
def test_job_status_in_progress(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request,
delete_request=mock_delete_request,
post_request=mock_post_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Jobs',
'command': 'JobStatus',
'job_name': MOCK_JOB_NAME_IN_PROGRESS,
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
response_data = ansible_exit_json.exception.args[0]
self.assertEqual(MOCK_HTTP_RESPONSE_JOB_IN_PROGRESS["data"]["PercentComplete"], response_data["percentComplete"])
self.assertEqual(MOCK_HTTP_RESPONSE_JOB_IN_PROGRESS["data"]["Status"]["State"]["ID"], response_data["operationStatusId"])
self.assertEqual(MOCK_HTTP_RESPONSE_JOB_IN_PROGRESS["data"]["Status"]["State"]["Name"], response_data["operationStatus"])
self.assertEqual(MOCK_HTTP_RESPONSE_JOB_IN_PROGRESS["data"]["Status"]["Health"][0]["Name"], response_data["operationHealth"])
self.assertEqual(MOCK_HTTP_RESPONSE_JOB_IN_PROGRESS["data"]["Status"]["Health"][0]["ID"], response_data["operationHealthId"])
self.assertTrue(response_data["jobExists"])
self.assertFalse(response_data["changed"])
self.assertEqual(ACTION_WAS_SUCCESSFUL, response_data["msg"])
self.assertIsNone(response_data["details"])
def test_job_status_complete(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request,
delete_request=mock_delete_request,
post_request=mock_post_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Jobs',
'command': 'JobStatus',
'job_name': MOCK_JOB_NAME_COMPLETE,
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
response_data = ansible_exit_json.exception.args[0]
self.assertEqual(MOCK_HTTP_RESPONSE_JOB_COMPLETE["data"]["PercentComplete"], response_data["percentComplete"])
self.assertEqual(MOCK_HTTP_RESPONSE_JOB_COMPLETE["data"]["Status"]["State"]["ID"], response_data["operationStatusId"])
self.assertEqual(MOCK_HTTP_RESPONSE_JOB_COMPLETE["data"]["Status"]["State"]["Name"], response_data["operationStatus"])
self.assertEqual(MOCK_HTTP_RESPONSE_JOB_COMPLETE["data"]["Status"]["Health"][0]["Name"], response_data["operationHealth"])
self.assertEqual(MOCK_HTTP_RESPONSE_JOB_COMPLETE["data"]["Status"]["Health"][0]["ID"], response_data["operationHealthId"])
self.assertTrue(response_data["jobExists"])
self.assertFalse(response_data["changed"])
self.assertEqual(ACTION_WAS_SUCCESSFUL, response_data["msg"])
self.assertEqual(["Completed."], response_data["details"])
def test_job_status_not_found(self):
with patch.multiple("ansible_collections.community.general.plugins.module_utils.ocapi_utils.OcapiUtils",
get_request=mock_get_request,
put_request=mock_put_request,
delete_request=mock_delete_request,
post_request=mock_post_request):
with self.assertRaises(AnsibleExitJson) as ansible_exit_json:
set_module_args({
'category': 'Jobs',
'command': 'JobStatus',
'job_name': MOCK_JOB_NAME_DOES_NOT_EXIST,
'baseuri': MOCK_BASE_URI,
'username': 'USERID',
'password': 'PASSWORD=21'
})
module.main()
self.assertEqual(ACTION_WAS_SUCCESSFUL, get_exception_message(ansible_exit_json))
response_data = ansible_exit_json.exception.args[0]
self.assertFalse(response_data["jobExists"])
self.assertEqual(0, response_data["percentComplete"])
self.assertEqual(1, response_data["operationStatusId"])
self.assertEqual("Not Available", response_data["operationStatus"])
self.assertIsNone(response_data["operationHealth"])
self.assertIsNone(response_data["operationHealthId"])
self.assertFalse(response_data["changed"])
self.assertEqual(ACTION_WAS_SUCCESSFUL, response_data["msg"])
self.assertEqual("Job does not exist.", response_data["details"])