mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
Added modules ipa_otpconfig and ipa_otptoken (#2122)
* Added module for ipa_otpconfig * Make no_log=False explicit. * Updated inputs to be int type instead of strings to align to expected inputs. Updated output message * Add changelog fragment * Remove changelog fragment as this is a new module * Update plugins/modules/identity/ipa/ipa_otpconfig.py Add version_added field to module description. Co-authored-by: Felix Fontein <felix@fontein.de> * Updated punctuation in examples * Add unit test for ipa_otpconfig * Add ipa_otptoken module with unit test * Updated documentation in unit test * Update plugins/modules/identity/ipa/ipa_otpconfig.py Co-authored-by: Felix Fontein <felix@fontein.de> * Update plugins/modules/identity/ipa/ipa_otpconfig.py Co-authored-by: Felix Fontein <felix@fontein.de> * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein <felix@fontein.de> * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein <felix@fontein.de> * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein <felix@fontein.de> * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein <felix@fontein.de> * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein <felix@fontein.de> * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein <felix@fontein.de> * Added some documentation updates to make it conform to ansible standards * Update plugins/modules/identity/ipa/ipa_otptoken.py Co-authored-by: Felix Fontein <felix@fontein.de> * Address review comments Co-authored-by: Chris Costa <chris.costa@compellingtech.com> Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
parent
fa13826273
commit
31645ded11
7 changed files with 1604 additions and 2 deletions
|
@ -119,9 +119,9 @@ class IPAClient(object):
|
|||
data = dict(method=method)
|
||||
|
||||
# TODO: We should probably handle this a little better.
|
||||
if method in ('ping', 'config_show'):
|
||||
if method in ('ping', 'config_show', 'otpconfig_show'):
|
||||
data['params'] = [[], {}]
|
||||
elif method == 'config_mod':
|
||||
elif method in ('config_mod', 'otpconfig_mod'):
|
||||
data['params'] = [[], item]
|
||||
else:
|
||||
data['params'] = [[name], item]
|
||||
|
|
172
plugins/modules/identity/ipa/ipa_otpconfig.py
Normal file
172
plugins/modules/identity/ipa/ipa_otpconfig.py
Normal file
|
@ -0,0 +1,172 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright: (c) 2021, Ansible Project
|
||||
# Heavily influenced from Fran Fitzpatrick <francis.x.fitzpatrick@gmail.com> ipa_config module
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
DOCUMENTATION = r'''
|
||||
---
|
||||
module: ipa_otpconfig
|
||||
author: justchris1 (@justchris1)
|
||||
short_description: Manage FreeIPA OTP Configuration Settings
|
||||
version_added: 2.5.0
|
||||
description:
|
||||
- Modify global configuration settings of a FreeIPA Server with respect to OTP (One Time Passwords).
|
||||
options:
|
||||
ipatokentotpauthwindow:
|
||||
description: TOTP authentication window in seconds.
|
||||
aliases: ["totpauthwindow"]
|
||||
type: int
|
||||
ipatokentotpsyncwindow:
|
||||
description: TOTP synchronization window in seconds.
|
||||
aliases: ["totpsyncwindow"]
|
||||
type: int
|
||||
ipatokenhotpauthwindow:
|
||||
description: HOTP authentication window in number of hops.
|
||||
aliases: ["hotpauthwindow"]
|
||||
type: int
|
||||
ipatokenhotpsyncwindow:
|
||||
description: HOTP synchronization window in hops.
|
||||
aliases: ["hotpsyncwindow"]
|
||||
type: int
|
||||
extends_documentation_fragment:
|
||||
- community.general.ipa.documentation
|
||||
|
||||
'''
|
||||
|
||||
EXAMPLES = r'''
|
||||
- name: Ensure the TOTP authentication window is set to 300 seconds
|
||||
community.general.ipa_otpconfig:
|
||||
ipatokentotpauthwindow: '300'
|
||||
ipa_host: localhost
|
||||
ipa_user: admin
|
||||
ipa_pass: supersecret
|
||||
|
||||
- name: Ensure the TOTP syncronization window is set to 86400 seconds
|
||||
community.general.ipa_otpconfig:
|
||||
ipatokentotpsyncwindow: '86400'
|
||||
ipa_host: localhost
|
||||
ipa_user: admin
|
||||
ipa_pass: supersecret
|
||||
|
||||
- name: Ensure the HOTP authentication window is set to 10 hops
|
||||
community.general.ipa_otpconfig:
|
||||
ipatokenhotpauthwindow: '10'
|
||||
ipa_host: localhost
|
||||
ipa_user: admin
|
||||
ipa_pass: supersecret
|
||||
|
||||
- name: Ensure the HOTP syncronization window is set to 100 hops
|
||||
community.general.ipa_otpconfig:
|
||||
ipatokenhotpsyncwindow: '100'
|
||||
ipa_host: localhost
|
||||
ipa_user: admin
|
||||
ipa_pass: supersecret
|
||||
'''
|
||||
|
||||
RETURN = r'''
|
||||
otpconfig:
|
||||
description: OTP configuration as returned by IPA API.
|
||||
returned: always
|
||||
type: dict
|
||||
'''
|
||||
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible_collections.community.general.plugins.module_utils.ipa import IPAClient, ipa_argument_spec
|
||||
from ansible.module_utils._text import to_native
|
||||
|
||||
|
||||
class OTPConfigIPAClient(IPAClient):
|
||||
def __init__(self, module, host, port, protocol):
|
||||
super(OTPConfigIPAClient, self).__init__(module, host, port, protocol)
|
||||
|
||||
def otpconfig_show(self):
|
||||
return self._post_json(method='otpconfig_show', name=None)
|
||||
|
||||
def otpconfig_mod(self, name, item):
|
||||
return self._post_json(method='otpconfig_mod', name=name, item=item)
|
||||
|
||||
|
||||
def get_otpconfig_dict(ipatokentotpauthwindow=None, ipatokentotpsyncwindow=None,
|
||||
ipatokenhotpauthwindow=None, ipatokenhotpsyncwindow=None):
|
||||
|
||||
config = {}
|
||||
if ipatokentotpauthwindow is not None:
|
||||
config['ipatokentotpauthwindow'] = str(ipatokentotpauthwindow)
|
||||
if ipatokentotpsyncwindow is not None:
|
||||
config['ipatokentotpsyncwindow'] = str(ipatokentotpsyncwindow)
|
||||
if ipatokenhotpauthwindow is not None:
|
||||
config['ipatokenhotpauthwindow'] = str(ipatokenhotpauthwindow)
|
||||
if ipatokenhotpsyncwindow is not None:
|
||||
config['ipatokenhotpsyncwindow'] = str(ipatokenhotpsyncwindow)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def get_otpconfig_diff(client, ipa_config, module_config):
|
||||
return client.get_diff(ipa_data=ipa_config, module_data=module_config)
|
||||
|
||||
|
||||
def ensure(module, client):
|
||||
module_otpconfig = get_otpconfig_dict(
|
||||
ipatokentotpauthwindow=module.params.get('ipatokentotpauthwindow'),
|
||||
ipatokentotpsyncwindow=module.params.get('ipatokentotpsyncwindow'),
|
||||
ipatokenhotpauthwindow=module.params.get('ipatokenhotpauthwindow'),
|
||||
ipatokenhotpsyncwindow=module.params.get('ipatokenhotpsyncwindow'),
|
||||
)
|
||||
ipa_otpconfig = client.otpconfig_show()
|
||||
diff = get_otpconfig_diff(client, ipa_otpconfig, module_otpconfig)
|
||||
|
||||
changed = False
|
||||
new_otpconfig = {}
|
||||
for module_key in diff:
|
||||
if module_otpconfig.get(module_key) != ipa_otpconfig.get(module_key, None):
|
||||
changed = True
|
||||
new_otpconfig.update({module_key: module_otpconfig.get(module_key)})
|
||||
|
||||
if changed and not module.check_mode:
|
||||
client.otpconfig_mod(name=None, item=new_otpconfig)
|
||||
|
||||
return changed, client.otpconfig_show()
|
||||
|
||||
|
||||
def main():
|
||||
argument_spec = ipa_argument_spec()
|
||||
argument_spec.update(
|
||||
ipatokentotpauthwindow=dict(type='int', aliases=['totpauthwindow'], no_log=False),
|
||||
ipatokentotpsyncwindow=dict(type='int', aliases=['totpsyncwindow'], no_log=False),
|
||||
ipatokenhotpauthwindow=dict(type='int', aliases=['hotpauthwindow'], no_log=False),
|
||||
ipatokenhotpsyncwindow=dict(type='int', aliases=['hotpsyncwindow'], no_log=False),
|
||||
)
|
||||
|
||||
module = AnsibleModule(
|
||||
argument_spec=argument_spec,
|
||||
supports_check_mode=True
|
||||
)
|
||||
|
||||
client = OTPConfigIPAClient(
|
||||
module=module,
|
||||
host=module.params['ipa_host'],
|
||||
port=module.params['ipa_port'],
|
||||
protocol=module.params['ipa_prot']
|
||||
)
|
||||
|
||||
try:
|
||||
client.login(
|
||||
username=module.params['ipa_user'],
|
||||
password=module.params['ipa_pass']
|
||||
)
|
||||
changed, otpconfig = ensure(module, client)
|
||||
except Exception as e:
|
||||
module.fail_json(msg=to_native(e), exception=traceback.format_exc())
|
||||
|
||||
module.exit_json(changed=changed, otpconfig=otpconfig)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
527
plugins/modules/identity/ipa/ipa_otptoken.py
Normal file
527
plugins/modules/identity/ipa/ipa_otptoken.py
Normal file
|
@ -0,0 +1,527 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright: (c) 2017, Ansible Project
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
DOCUMENTATION = r'''
|
||||
---
|
||||
module: ipa_otptoken
|
||||
author: justchris1 (@justchris1)
|
||||
short_description: Manage FreeIPA OTPs
|
||||
version_added: 2.5.0
|
||||
description:
|
||||
- Add, modify, and delete One Time Passwords in IPA.
|
||||
options:
|
||||
uniqueid:
|
||||
description: Unique ID of the token in IPA.
|
||||
required: true
|
||||
aliases: ["name"]
|
||||
type: str
|
||||
newuniqueid:
|
||||
description: If specified, the unique id specified will be changed to this.
|
||||
type: str
|
||||
otptype:
|
||||
description:
|
||||
- Type of OTP.
|
||||
- "B(Note:) Cannot be modified after OTP is created."
|
||||
type: str
|
||||
choices: [ totp, hotp ]
|
||||
secretkey:
|
||||
description:
|
||||
- Token secret (Base64).
|
||||
- If OTP is created and this is not specified, a random secret will be generated by IPA.
|
||||
- "B(Note:) Cannot be modified after OTP is created."
|
||||
type: str
|
||||
description:
|
||||
description: Description of the token (informational only).
|
||||
type: str
|
||||
owner:
|
||||
description: Assigned user of the token.
|
||||
type: str
|
||||
enabled:
|
||||
description: Mark the token as enabled (default C(true)).
|
||||
default: true
|
||||
type: bool
|
||||
notbefore:
|
||||
description:
|
||||
- First date/time the token can be used.
|
||||
- In the format C(YYYYMMddHHmmss).
|
||||
- For example, C(20180121182022) will allow the token to be used starting on 21 January 2018 at 18:20:22.
|
||||
type: str
|
||||
notafter:
|
||||
description:
|
||||
- Last date/time the token can be used.
|
||||
- In the format C(YYYYMMddHHmmss).
|
||||
- For example, C(20200121182022) will allow the token to be used until 21 January 2020 at 18:20:22.
|
||||
type: str
|
||||
vendor:
|
||||
description: Token vendor name (informational only).
|
||||
type: str
|
||||
model:
|
||||
description: Token model (informational only).
|
||||
type: str
|
||||
serial:
|
||||
description: Token serial (informational only).
|
||||
type: str
|
||||
state:
|
||||
description: State to ensure.
|
||||
choices: ['present', 'absent']
|
||||
default: 'present'
|
||||
type: str
|
||||
algorithm:
|
||||
description:
|
||||
- Token hash algorithm.
|
||||
- "B(Note:) Cannot be modified after OTP is created."
|
||||
choices: ['sha1', 'sha256', 'sha384', 'sha512']
|
||||
type: str
|
||||
digits:
|
||||
description:
|
||||
- Number of digits each token code will have.
|
||||
- "B(Note:) Cannot be modified after OTP is created."
|
||||
choices: [ 6, 8 ]
|
||||
type: int
|
||||
offset:
|
||||
description:
|
||||
- TOTP token / IPA server time difference.
|
||||
- "B(Note:) Cannot be modified after OTP is created."
|
||||
type: int
|
||||
interval:
|
||||
description:
|
||||
- Length of TOTP token code validity in seconds.
|
||||
- "B(Note:) Cannot be modified after OTP is created."
|
||||
type: int
|
||||
counter:
|
||||
description:
|
||||
- Initial counter for the HOTP token.
|
||||
- "B(Note:) Cannot be modified after OTP is created."
|
||||
type: int
|
||||
extends_documentation_fragment:
|
||||
- community.general.ipa.documentation
|
||||
'''
|
||||
|
||||
EXAMPLES = r'''
|
||||
- name: Create a totp for pinky, allowing the IPA server to generate using defaults
|
||||
community.general.ipa_otptoken:
|
||||
uniqueid: Token123
|
||||
otptype: totp
|
||||
owner: pinky
|
||||
ipa_host: ipa.example.com
|
||||
ipa_user: admin
|
||||
ipa_pass: topsecret
|
||||
|
||||
- name: Create a 8 digit hotp for pinky with sha256 with specified validity times
|
||||
community.general.ipa_otptoken:
|
||||
uniqueid: Token123
|
||||
enabled: true
|
||||
otptype: hotp
|
||||
digits: 8
|
||||
secretkey: UMKSIER00zT2T2tWMUlTRmNlekRCbFQvWFBVZUh2dElHWGR6T3VUR3IzK2xjaFk9
|
||||
algorithm: sha256
|
||||
notbefore: 20180121182123
|
||||
notafter: 20220121182123
|
||||
owner: pinky
|
||||
ipa_host: ipa.example.com
|
||||
ipa_user: admin
|
||||
ipa_pass: topsecret
|
||||
|
||||
- name: Update Token123 to indicate a vendor, model, serial number (info only), and description
|
||||
community.general.ipa_otptoken:
|
||||
uniqueid: Token123
|
||||
vendor: Acme
|
||||
model: acme101
|
||||
serial: SerialNumber1
|
||||
description: Acme OTP device
|
||||
ipa_host: ipa.example.com
|
||||
ipa_user: admin
|
||||
ipa_pass: topsecret
|
||||
|
||||
- name: Disable Token123
|
||||
community.general.ipa_otptoken:
|
||||
uniqueid: Token123
|
||||
enabled: false
|
||||
ipa_host: ipa.example.com
|
||||
ipa_user: admin
|
||||
ipa_pass: topsecret
|
||||
|
||||
- name: Rename Token123 to TokenABC and enable it
|
||||
community.general.ipa_otptoken:
|
||||
uniqueid: Token123
|
||||
newuniqueid: TokenABC
|
||||
enabled: true
|
||||
ipa_host: ipa.example.com
|
||||
ipa_user: admin
|
||||
ipa_pass: topsecret
|
||||
'''
|
||||
|
||||
RETURN = r'''
|
||||
otptoken:
|
||||
description: OTP Token as returned by IPA API
|
||||
returned: always
|
||||
type: dict
|
||||
'''
|
||||
|
||||
import base64
|
||||
import traceback
|
||||
|
||||
from ansible.module_utils.basic import AnsibleModule, sanitize_keys
|
||||
from ansible_collections.community.general.plugins.module_utils.ipa import IPAClient, ipa_argument_spec
|
||||
from ansible.module_utils._text import to_native
|
||||
|
||||
|
||||
class OTPTokenIPAClient(IPAClient):
|
||||
def __init__(self, module, host, port, protocol):
|
||||
super(OTPTokenIPAClient, self).__init__(module, host, port, protocol)
|
||||
|
||||
def otptoken_find(self, name):
|
||||
return self._post_json(method='otptoken_find', name=None, item={'all': True,
|
||||
'ipatokenuniqueid': name,
|
||||
'timelimit': '0',
|
||||
'sizelimit': '0'})
|
||||
|
||||
def otptoken_add(self, name, item):
|
||||
return self._post_json(method='otptoken_add', name=name, item=item)
|
||||
|
||||
def otptoken_mod(self, name, item):
|
||||
return self._post_json(method='otptoken_mod', name=name, item=item)
|
||||
|
||||
def otptoken_del(self, name):
|
||||
return self._post_json(method='otptoken_del', name=name)
|
||||
|
||||
|
||||
def base64_to_base32(base64_string):
|
||||
"""Converts base64 string to base32 string"""
|
||||
b32_string = base64.b32encode(base64.b64decode(base64_string)).decode('ascii')
|
||||
return b32_string
|
||||
|
||||
|
||||
def base32_to_base64(base32_string):
|
||||
"""Converts base32 string to base64 string"""
|
||||
b64_string = base64.b64encode(base64.b32decode(base32_string)).decode('ascii')
|
||||
return b64_string
|
||||
|
||||
|
||||
def get_otptoken_dict(ansible_to_ipa, uniqueid=None, newuniqueid=None, otptype=None, secretkey=None, description=None, owner=None,
|
||||
enabled=None, notbefore=None, notafter=None, vendor=None,
|
||||
model=None, serial=None, algorithm=None, digits=None, offset=None,
|
||||
interval=None, counter=None):
|
||||
"""Create the dictionary of settings passed in"""
|
||||
|
||||
otptoken = {}
|
||||
if uniqueid is not None:
|
||||
otptoken[ansible_to_ipa['uniqueid']] = uniqueid
|
||||
if newuniqueid is not None:
|
||||
otptoken[ansible_to_ipa['newuniqueid']] = newuniqueid
|
||||
if otptype is not None:
|
||||
otptoken[ansible_to_ipa['otptype']] = otptype.upper()
|
||||
if secretkey is not None:
|
||||
# For some unknown reason, while IPA returns the secret in base64,
|
||||
# it wants the secret passed in as base32. This makes it more difficult
|
||||
# for comparison (does 'current' equal to 'new'). Moreover, this may
|
||||
# cause some subtle issue in a playbook as the output is encoded
|
||||
# in a different way than if it was passed in as a parameter. For
|
||||
# these reasons, have the module standardize on base64 input (as parameter)
|
||||
# and output (from IPA).
|
||||
otptoken[ansible_to_ipa['secretkey']] = base64_to_base32(secretkey)
|
||||
if description is not None:
|
||||
otptoken[ansible_to_ipa['description']] = description
|
||||
if owner is not None:
|
||||
otptoken[ansible_to_ipa['owner']] = owner
|
||||
if enabled is not None:
|
||||
otptoken[ansible_to_ipa['enabled']] = 'FALSE' if enabled else 'TRUE'
|
||||
if notbefore is not None:
|
||||
otptoken[ansible_to_ipa['notbefore']] = notbefore + 'Z'
|
||||
if notafter is not None:
|
||||
otptoken[ansible_to_ipa['notafter']] = notafter + 'Z'
|
||||
if vendor is not None:
|
||||
otptoken[ansible_to_ipa['vendor']] = vendor
|
||||
if model is not None:
|
||||
otptoken[ansible_to_ipa['model']] = model
|
||||
if serial is not None:
|
||||
otptoken[ansible_to_ipa['serial']] = serial
|
||||
if algorithm is not None:
|
||||
otptoken[ansible_to_ipa['algorithm']] = algorithm
|
||||
if digits is not None:
|
||||
otptoken[ansible_to_ipa['digits']] = str(digits)
|
||||
if offset is not None:
|
||||
otptoken[ansible_to_ipa['offset']] = str(offset)
|
||||
if interval is not None:
|
||||
otptoken[ansible_to_ipa['interval']] = str(interval)
|
||||
if counter is not None:
|
||||
otptoken[ansible_to_ipa['counter']] = str(counter)
|
||||
|
||||
return otptoken
|
||||
|
||||
|
||||
def transform_output(ipa_otptoken, ansible_to_ipa, ipa_to_ansible):
|
||||
"""Transform the output received by IPA to a format more friendly
|
||||
before it is returned to the user. IPA returns even simple
|
||||
strings as a list of strings. It also returns bools and
|
||||
int as string. This function cleans that up before return.
|
||||
"""
|
||||
updated_otptoken = ipa_otptoken
|
||||
|
||||
# Used to hold values that will be sanitized from output as no_log.
|
||||
# For the case where secretkey is not specified at the module, but
|
||||
# is passed back from IPA.
|
||||
sanitize_strings = set()
|
||||
|
||||
# Rename the IPA parameters to the more friendly ansible module names for them
|
||||
for ipa_parameter in ipa_to_ansible:
|
||||
if ipa_parameter in ipa_otptoken:
|
||||
updated_otptoken[ipa_to_ansible[ipa_parameter]] = ipa_otptoken[ipa_parameter]
|
||||
updated_otptoken.pop(ipa_parameter)
|
||||
|
||||
# Change the type from IPA's list of string to the appropriate return value type
|
||||
# based on field. By default, assume they should be strings.
|
||||
for ansible_parameter in ansible_to_ipa:
|
||||
if ansible_parameter in updated_otptoken:
|
||||
if isinstance(updated_otptoken[ansible_parameter], list) and len(updated_otptoken[ansible_parameter]) == 1:
|
||||
if ansible_parameter in ['digits', 'offset', 'interval', 'counter']:
|
||||
updated_otptoken[ansible_parameter] = int(updated_otptoken[ansible_parameter][0])
|
||||
elif ansible_parameter == 'enabled':
|
||||
updated_otptoken[ansible_parameter] = bool(updated_otptoken[ansible_parameter][0])
|
||||
else:
|
||||
updated_otptoken[ansible_parameter] = updated_otptoken[ansible_parameter][0]
|
||||
|
||||
if 'secretkey' in updated_otptoken:
|
||||
if isinstance(updated_otptoken['secretkey'], dict):
|
||||
if '__base64__' in updated_otptoken['secretkey']:
|
||||
sanitize_strings.add(updated_otptoken['secretkey']['__base64__'])
|
||||
b64key = updated_otptoken['secretkey']['__base64__']
|
||||
updated_otptoken.pop('secretkey')
|
||||
updated_otptoken['secretkey'] = b64key
|
||||
sanitize_strings.add(b64key)
|
||||
elif '__base32__' in updated_otptoken['secretkey']:
|
||||
sanitize_strings.add(updated_otptoken['secretkey']['__base32__'])
|
||||
b32key = updated_otptoken['secretkey']['__base32__']
|
||||
b64key = base32_to_base64(b32key)
|
||||
updated_otptoken.pop('secretkey')
|
||||
updated_otptoken['secretkey'] = b64key
|
||||
sanitize_strings.add(b32key)
|
||||
sanitize_strings.add(b64key)
|
||||
|
||||
return updated_otptoken, sanitize_strings
|
||||
|
||||
|
||||
def validate_modifications(ansible_to_ipa, module, ipa_otptoken,
|
||||
module_otptoken, unmodifiable_after_creation):
|
||||
"""Checks to see if the requested modifications are valid. Some elements
|
||||
cannot be modified after initial creation. However, we still want to
|
||||
validate arguments that are specified, but are not different than what
|
||||
is currently set on the server.
|
||||
"""
|
||||
|
||||
modifications_valid = True
|
||||
|
||||
for parameter in unmodifiable_after_creation:
|
||||
if ansible_to_ipa[parameter] in module_otptoken and ansible_to_ipa[parameter] in ipa_otptoken:
|
||||
mod_value = module_otptoken[ansible_to_ipa[parameter]]
|
||||
|
||||
# For someone unknown reason, the returns from IPA put almost all
|
||||
# values in a list, even though passing them in a list (even of
|
||||
# length 1) will be rejected. The module values for all elements
|
||||
# other than type (totp or hotp) have this happen.
|
||||
if parameter == 'otptype':
|
||||
ipa_value = ipa_otptoken[ansible_to_ipa[parameter]]
|
||||
else:
|
||||
if len(ipa_otptoken[ansible_to_ipa[parameter]]) != 1:
|
||||
module.fail_json(msg=("Invariant fail: Return value from IPA is not a list " +
|
||||
"of length 1. Please open a bug report for the module."))
|
||||
if parameter == 'secretkey':
|
||||
# We stored the secret key in base32 since we had assumed that would need to
|
||||
# be the format if we were contacting IPA to create it. However, we are
|
||||
# now comparing it against what is already set in the IPA server, so convert
|
||||
# back to base64 for comparison.
|
||||
mod_value = base32_to_base64(mod_value)
|
||||
|
||||
# For the secret key, it is even more specific in that the key is returned
|
||||
# in a dict, in the list, as the __base64__ entry for the IPA response.
|
||||
ipa_value = ipa_otptoken[ansible_to_ipa[parameter]][0]['__base64__']
|
||||
if '__base64__' in ipa_otptoken[ansible_to_ipa[parameter]][0]:
|
||||
ipa_value = ipa_otptoken[ansible_to_ipa[parameter]][0]['__base64__']
|
||||
elif '__base32__' in ipa_otptoken[ansible_to_ipa[parameter]][0]:
|
||||
b32key = ipa_otptoken[ansible_to_ipa[parameter]][0]['__base32__']
|
||||
b64key = base32_to_base64(b32key)
|
||||
ipa_value = b64key
|
||||
else:
|
||||
ipa_value = None
|
||||
else:
|
||||
ipa_value = ipa_otptoken[ansible_to_ipa[parameter]][0]
|
||||
|
||||
if mod_value != ipa_value:
|
||||
modifications_valid = False
|
||||
fail_message = ("Parameter '" + parameter + "' cannot be changed once " +
|
||||
"the OTP is created and the requested value specified here (" +
|
||||
str(mod_value) +
|
||||
") differs from what is set in the IPA server ("
|
||||
+ str(ipa_value) + ")")
|
||||
module.fail_json(msg=fail_message)
|
||||
|
||||
return modifications_valid
|
||||
|
||||
|
||||
def ensure(module, client):
|
||||
# dict to map from ansible parameter names to attribute names
|
||||
# used by IPA (which are not so friendly).
|
||||
ansible_to_ipa = {'uniqueid': 'ipatokenuniqueid',
|
||||
'newuniqueid': 'rename',
|
||||
'otptype': 'type',
|
||||
'secretkey': 'ipatokenotpkey',
|
||||
'description': 'description',
|
||||
'owner': 'ipatokenowner',
|
||||
'enabled': 'ipatokendisabled',
|
||||
'notbefore': 'ipatokennotbefore',
|
||||
'notafter': 'ipatokennotafter',
|
||||
'vendor': 'ipatokenvendor',
|
||||
'model': 'ipatokenmodel',
|
||||
'serial': 'ipatokenserial',
|
||||
'algorithm': 'ipatokenotpalgorithm',
|
||||
'digits': 'ipatokenotpdigits',
|
||||
'offset': 'ipatokentotpclockoffset',
|
||||
'interval': 'ipatokentotptimestep',
|
||||
'counter': 'ipatokenhotpcounter'}
|
||||
|
||||
# Create inverse dictionary for mapping return values
|
||||
ipa_to_ansible = {}
|
||||
for (k, v) in ansible_to_ipa.items():
|
||||
ipa_to_ansible[v] = k
|
||||
|
||||
unmodifiable_after_creation = ['otptype', 'secretkey', 'algorithm',
|
||||
'digits', 'offset', 'interval', 'counter']
|
||||
state = module.params['state']
|
||||
uniqueid = module.params['uniqueid']
|
||||
|
||||
module_otptoken = get_otptoken_dict(ansible_to_ipa=ansible_to_ipa,
|
||||
uniqueid=module.params.get('uniqueid'),
|
||||
newuniqueid=module.params.get('newuniqueid'),
|
||||
otptype=module.params.get('otptype'),
|
||||
secretkey=module.params.get('secretkey'),
|
||||
description=module.params.get('description'),
|
||||
owner=module.params.get('owner'),
|
||||
enabled=module.params.get('enabled'),
|
||||
notbefore=module.params.get('notbefore'),
|
||||
notafter=module.params.get('notafter'),
|
||||
vendor=module.params.get('vendor'),
|
||||
model=module.params.get('model'),
|
||||
serial=module.params.get('serial'),
|
||||
algorithm=module.params.get('algorithm'),
|
||||
digits=module.params.get('digits'),
|
||||
offset=module.params.get('offset'),
|
||||
interval=module.params.get('interval'),
|
||||
counter=module.params.get('counter'))
|
||||
|
||||
ipa_otptoken = client.otptoken_find(name=uniqueid)
|
||||
|
||||
if ansible_to_ipa['newuniqueid'] in module_otptoken:
|
||||
# Check to see if the new unique id is already taken in use
|
||||
ipa_otptoken_new = client.otptoken_find(name=module_otptoken[ansible_to_ipa['newuniqueid']])
|
||||
if ipa_otptoken_new:
|
||||
module.fail_json(msg=("Requested rename through newuniqueid to " +
|
||||
module_otptoken[ansible_to_ipa['newuniqueid']] +
|
||||
" failed because the new unique id is already in use"))
|
||||
|
||||
changed = False
|
||||
if state == 'present':
|
||||
if not ipa_otptoken:
|
||||
changed = True
|
||||
if not module.check_mode:
|
||||
# It would not make sense to have a rename after creation, so if the user
|
||||
# specified a newuniqueid, just replace the uniqueid with the updated one
|
||||
# before creation
|
||||
if ansible_to_ipa['newuniqueid'] in module_otptoken:
|
||||
module_otptoken[ansible_to_ipa['uniqueid']] = module_otptoken[ansible_to_ipa['newuniqueid']]
|
||||
uniqueid = module_otptoken[ansible_to_ipa['newuniqueid']]
|
||||
module_otptoken.pop(ansible_to_ipa['newuniqueid'])
|
||||
|
||||
# IPA wants the unique id in the first position and not as a key/value pair.
|
||||
# Get rid of it from the otptoken dict and just specify it in the name field
|
||||
# for otptoken_add.
|
||||
if ansible_to_ipa['uniqueid'] in module_otptoken:
|
||||
module_otptoken.pop(ansible_to_ipa['uniqueid'])
|
||||
|
||||
module_otptoken['all'] = True
|
||||
ipa_otptoken = client.otptoken_add(name=uniqueid, item=module_otptoken)
|
||||
else:
|
||||
if not(validate_modifications(ansible_to_ipa, module, ipa_otptoken,
|
||||
module_otptoken, unmodifiable_after_creation)):
|
||||
module.fail_json(msg="Modifications requested in module are not valid")
|
||||
|
||||
# IPA will reject 'modifications' that do not actually modify anything
|
||||
# if any of the unmodifiable elements are specified. Explicitly
|
||||
# get rid of them here. They were not different or else the
|
||||
# we would have failed out in validate_modifications.
|
||||
for x in unmodifiable_after_creation:
|
||||
if ansible_to_ipa[x] in module_otptoken:
|
||||
module_otptoken.pop(ansible_to_ipa[x])
|
||||
|
||||
diff = client.get_diff(ipa_data=ipa_otptoken, module_data=module_otptoken)
|
||||
if len(diff) > 0:
|
||||
changed = True
|
||||
if not module.check_mode:
|
||||
|
||||
# IPA wants the unique id in the first position and not as a key/value pair.
|
||||
# Get rid of it from the otptoken dict and just specify it in the name field
|
||||
# for otptoken_mod.
|
||||
if ansible_to_ipa['uniqueid'] in module_otptoken:
|
||||
module_otptoken.pop(ansible_to_ipa['uniqueid'])
|
||||
|
||||
module_otptoken['all'] = True
|
||||
ipa_otptoken = client.otptoken_mod(name=uniqueid, item=module_otptoken)
|
||||
else:
|
||||
if ipa_otptoken:
|
||||
changed = True
|
||||
if not module.check_mode:
|
||||
client.otptoken_del(name=uniqueid)
|
||||
|
||||
# Transform the output to use ansible keywords (not the IPA keywords) and
|
||||
# sanitize any key values in the output.
|
||||
ipa_otptoken, sanitize_strings = transform_output(ipa_otptoken, ansible_to_ipa, ipa_to_ansible)
|
||||
module.no_log_values = module.no_log_values.union(sanitize_strings)
|
||||
sanitized_otptoken = sanitize_keys(obj=ipa_otptoken, no_log_strings=module.no_log_values)
|
||||
return changed, sanitized_otptoken
|
||||
|
||||
|
||||
def main():
|
||||
argument_spec = ipa_argument_spec()
|
||||
argument_spec.update(uniqueid=dict(type='str', aliases=['name'], required=True),
|
||||
newuniqueid=dict(type='str'),
|
||||
otptype=dict(type='str', choices=['totp', 'hotp']),
|
||||
secretkey=dict(type='str', no_log=True),
|
||||
description=dict(type='str'),
|
||||
owner=dict(type='str'),
|
||||
enabled=dict(type='bool', default=True),
|
||||
notbefore=dict(type='str'),
|
||||
notafter=dict(type='str'),
|
||||
vendor=dict(type='str'),
|
||||
model=dict(type='str'),
|
||||
serial=dict(type='str'),
|
||||
state=dict(type='str', choices=['present', 'absent'], default='present'),
|
||||
algorithm=dict(type='str', choices=['sha1', 'sha256', 'sha384', 'sha512']),
|
||||
digits=dict(type='int', choices=[6, 8]),
|
||||
offset=dict(type='int'),
|
||||
interval=dict(type='int'),
|
||||
counter=dict(type='int'))
|
||||
|
||||
module = AnsibleModule(argument_spec=argument_spec,
|
||||
supports_check_mode=True)
|
||||
|
||||
client = OTPTokenIPAClient(module=module,
|
||||
host=module.params['ipa_host'],
|
||||
port=module.params['ipa_port'],
|
||||
protocol=module.params['ipa_prot'])
|
||||
|
||||
try:
|
||||
client.login(username=module.params['ipa_user'],
|
||||
password=module.params['ipa_pass'])
|
||||
changed, otptoken = ensure(module, client)
|
||||
except Exception as e:
|
||||
module.fail_json(msg=to_native(e), exception=traceback.format_exc())
|
||||
|
||||
module.exit_json(changed=changed, otptoken=otptoken)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
1
plugins/modules/ipa_otpconfig.py
Symbolic link
1
plugins/modules/ipa_otpconfig.py
Symbolic link
|
@ -0,0 +1 @@
|
|||
./identity/ipa/ipa_otpconfig.py
|
1
plugins/modules/ipa_otptoken.py
Symbolic link
1
plugins/modules/ipa_otptoken.py
Symbolic link
|
@ -0,0 +1 @@
|
|||
./identity/ipa/ipa_otptoken.py
|
406
tests/unit/plugins/modules/identity/ipa/test_ipa_otpconfig.py
Normal file
406
tests/unit/plugins/modules/identity/ipa/test_ipa_otpconfig.py
Normal file
|
@ -0,0 +1,406 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright: (c) 2020, Ansible Project
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
from contextlib import contextmanager
|
||||
|
||||
from ansible_collections.community.general.tests.unit.compat import unittest
|
||||
from ansible_collections.community.general.tests.unit.compat.mock import call, patch
|
||||
from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
|
||||
|
||||
from ansible_collections.community.general.plugins.modules.identity.ipa import ipa_otpconfig
|
||||
|
||||
|
||||
@contextmanager
|
||||
def patch_ipa(**kwargs):
|
||||
"""Mock context manager for patching the methods in OTPConfigIPAClient that contact the IPA server
|
||||
|
||||
Patches the `login` and `_post_json` methods
|
||||
|
||||
Keyword arguments are passed to the mock object that patches `_post_json`
|
||||
|
||||
No arguments are passed to the mock object that patches `login` because no tests require it
|
||||
|
||||
Example::
|
||||
|
||||
with patch_ipa(return_value={}) as (mock_login, mock_post):
|
||||
...
|
||||
"""
|
||||
obj = ipa_otpconfig.OTPConfigIPAClient
|
||||
with patch.object(obj, 'login') as mock_login:
|
||||
with patch.object(obj, '_post_json', **kwargs) as mock_post:
|
||||
yield mock_login, mock_post
|
||||
|
||||
|
||||
class TestIPAOTPConfig(ModuleTestCase):
|
||||
def setUp(self):
|
||||
super(TestIPAOTPConfig, self).setUp()
|
||||
self.module = ipa_otpconfig
|
||||
|
||||
def _test_base(self, module_args, return_value, mock_calls, changed):
|
||||
"""Base function that's called by all the other test functions
|
||||
|
||||
module_args (dict):
|
||||
Arguments passed to the module
|
||||
|
||||
return_value (dict):
|
||||
Mocked return value of OTPConfigIPAClient.otpconfig_show, as returned by the IPA API.
|
||||
This should be set to the current state. It will be changed to the desired state using the above arguments.
|
||||
(Technically, this is the return value of _post_json, but it's only checked by otpconfig_show).
|
||||
|
||||
mock_calls (list/tuple of dicts):
|
||||
List of calls made to OTPConfigIPAClient._post_json, in order.
|
||||
_post_json is called by all of the otpconfig_* methods of the class.
|
||||
Pass an empty list if no calls are expected.
|
||||
|
||||
changed (bool):
|
||||
Whether or not the module is supposed to be marked as changed
|
||||
"""
|
||||
set_module_args(module_args)
|
||||
|
||||
# Run the module
|
||||
with patch_ipa(return_value=return_value) as (mock_login, mock_post):
|
||||
with self.assertRaises(AnsibleExitJson) as exec_info:
|
||||
self.module.main()
|
||||
|
||||
# Verify that the calls to _post_json match what is expected
|
||||
expected_call_count = len(mock_calls)
|
||||
if expected_call_count > 1:
|
||||
# Convert the call dicts to unittest.mock.call instances because `assert_has_calls` only accepts them
|
||||
converted_calls = []
|
||||
for call_dict in mock_calls:
|
||||
converted_calls.append(call(**call_dict))
|
||||
|
||||
mock_post.assert_has_calls(converted_calls)
|
||||
self.assertEqual(len(mock_post.mock_calls), expected_call_count)
|
||||
elif expected_call_count == 1:
|
||||
mock_post.assert_called_once_with(**mock_calls[0])
|
||||
else: # expected_call_count is 0
|
||||
mock_post.assert_not_called()
|
||||
|
||||
# Verify that the module's changed status matches what is expected
|
||||
self.assertIs(exec_info.exception.args[0]['changed'], changed)
|
||||
|
||||
def test_set_all_no_adjustment(self):
|
||||
"""Set values requiring no adjustment"""
|
||||
module_args = {
|
||||
'ipatokentotpauthwindow': 11,
|
||||
'ipatokentotpsyncwindow': 12,
|
||||
'ipatokenhotpauthwindow': 13,
|
||||
'ipatokenhotpsyncwindow': 14
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['11'],
|
||||
'ipatokentotpsyncwindow': ['12'],
|
||||
'ipatokenhotpauthwindow': ['13'],
|
||||
'ipatokenhotpsyncwindow': ['14']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = False
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_set_all_aliases_no_adjustment(self):
|
||||
"""Set values requiring no adjustment on all using aliases values"""
|
||||
module_args = {
|
||||
'totpauthwindow': 11,
|
||||
'totpsyncwindow': 12,
|
||||
'hotpauthwindow': 13,
|
||||
'hotpsyncwindow': 14
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['11'],
|
||||
'ipatokentotpsyncwindow': ['12'],
|
||||
'ipatokenhotpauthwindow': ['13'],
|
||||
'ipatokenhotpsyncwindow': ['14']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = False
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_set_totp_auth_window_no_adjustment(self):
|
||||
"""Set values requiring no adjustment on totpauthwindow"""
|
||||
module_args = {
|
||||
'totpauthwindow': 11
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['11'],
|
||||
'ipatokentotpsyncwindow': ['12'],
|
||||
'ipatokenhotpauthwindow': ['13'],
|
||||
'ipatokenhotpsyncwindow': ['14']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = False
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_set_totp_sync_window_no_adjustment(self):
|
||||
"""Set values requiring no adjustment on totpsyncwindow"""
|
||||
module_args = {
|
||||
'totpsyncwindow': 12
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['11'],
|
||||
'ipatokentotpsyncwindow': ['12'],
|
||||
'ipatokenhotpauthwindow': ['13'],
|
||||
'ipatokenhotpsyncwindow': ['14']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = False
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_set_hotp_auth_window_no_adjustment(self):
|
||||
"""Set values requiring no adjustment on hotpauthwindow"""
|
||||
module_args = {
|
||||
'hotpauthwindow': 13
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['11'],
|
||||
'ipatokentotpsyncwindow': ['12'],
|
||||
'ipatokenhotpauthwindow': ['13'],
|
||||
'ipatokenhotpsyncwindow': ['14']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = False
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_set_hotp_sync_window_no_adjustment(self):
|
||||
"""Set values requiring no adjustment on hotpsyncwindow"""
|
||||
module_args = {
|
||||
'hotpsyncwindow': 14
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['11'],
|
||||
'ipatokentotpsyncwindow': ['12'],
|
||||
'ipatokenhotpauthwindow': ['13'],
|
||||
'ipatokenhotpsyncwindow': ['14']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = False
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_set_totp_auth_window(self):
|
||||
"""Set values requiring adjustment on totpauthwindow"""
|
||||
module_args = {
|
||||
'totpauthwindow': 10
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['11'],
|
||||
'ipatokentotpsyncwindow': ['12'],
|
||||
'ipatokenhotpauthwindow': ['13'],
|
||||
'ipatokenhotpsyncwindow': ['14']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_mod',
|
||||
'name': None,
|
||||
'item': {'ipatokentotpauthwindow': '10'}
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_set_totp_sync_window(self):
|
||||
"""Set values requiring adjustment on totpsyncwindow"""
|
||||
module_args = {
|
||||
'totpsyncwindow': 10
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['11'],
|
||||
'ipatokentotpsyncwindow': ['12'],
|
||||
'ipatokenhotpauthwindow': ['13'],
|
||||
'ipatokenhotpsyncwindow': ['14']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_mod',
|
||||
'name': None,
|
||||
'item': {'ipatokentotpsyncwindow': '10'}
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_set_hotp_auth_window(self):
|
||||
"""Set values requiring adjustment on hotpauthwindow"""
|
||||
module_args = {
|
||||
'hotpauthwindow': 10
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['11'],
|
||||
'ipatokentotpsyncwindow': ['12'],
|
||||
'ipatokenhotpauthwindow': ['13'],
|
||||
'ipatokenhotpsyncwindow': ['14']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_mod',
|
||||
'name': None,
|
||||
'item': {'ipatokenhotpauthwindow': '10'}
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_set_hotp_sync_window(self):
|
||||
"""Set values requiring adjustment on hotpsyncwindow"""
|
||||
module_args = {
|
||||
'hotpsyncwindow': 10
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['11'],
|
||||
'ipatokentotpsyncwindow': ['12'],
|
||||
'ipatokenhotpauthwindow': ['13'],
|
||||
'ipatokenhotpsyncwindow': ['14']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_mod',
|
||||
'name': None,
|
||||
'item': {'ipatokenhotpsyncwindow': '10'}
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_set_all(self):
|
||||
"""Set values requiring adjustment on all"""
|
||||
module_args = {
|
||||
'ipatokentotpauthwindow': 11,
|
||||
'ipatokentotpsyncwindow': 12,
|
||||
'ipatokenhotpauthwindow': 13,
|
||||
'ipatokenhotpsyncwindow': 14
|
||||
}
|
||||
return_value = {
|
||||
'ipatokentotpauthwindow': ['1'],
|
||||
'ipatokentotpsyncwindow': ['2'],
|
||||
'ipatokenhotpauthwindow': ['3'],
|
||||
'ipatokenhotpsyncwindow': ['4']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_mod',
|
||||
'name': None,
|
||||
'item': {'ipatokentotpauthwindow': '11',
|
||||
'ipatokentotpsyncwindow': '12',
|
||||
'ipatokenhotpauthwindow': '13',
|
||||
'ipatokenhotpsyncwindow': '14'}
|
||||
},
|
||||
{
|
||||
'method': 'otpconfig_show',
|
||||
'name': None
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_fail_post(self):
|
||||
"""Fail due to an exception raised from _post_json"""
|
||||
set_module_args({
|
||||
'ipatokentotpauthwindow': 11,
|
||||
'ipatokentotpsyncwindow': 12,
|
||||
'ipatokenhotpauthwindow': 13,
|
||||
'ipatokenhotpsyncwindow': 14
|
||||
})
|
||||
|
||||
with patch_ipa(side_effect=Exception('ERROR MESSAGE')) as (mock_login, mock_post):
|
||||
with self.assertRaises(AnsibleFailJson) as exec_info:
|
||||
self.module.main()
|
||||
|
||||
self.assertEqual(exec_info.exception.args[0]['msg'], 'ERROR MESSAGE')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
495
tests/unit/plugins/modules/identity/ipa/test_ipa_otptoken.py
Normal file
495
tests/unit/plugins/modules/identity/ipa/test_ipa_otptoken.py
Normal file
|
@ -0,0 +1,495 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright: (c) 2020, Ansible Project
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
from contextlib import contextmanager
|
||||
|
||||
from ansible_collections.community.general.tests.unit.compat import unittest
|
||||
from ansible_collections.community.general.tests.unit.compat.mock import call, patch
|
||||
from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
|
||||
|
||||
from ansible_collections.community.general.plugins.modules.identity.ipa import ipa_otptoken
|
||||
|
||||
|
||||
@contextmanager
|
||||
def patch_ipa(**kwargs):
|
||||
"""Mock context manager for patching the methods in OTPTokenIPAClient that contact the IPA server
|
||||
|
||||
Patches the `login` and `_post_json` methods
|
||||
|
||||
Keyword arguments are passed to the mock object that patches `_post_json`
|
||||
|
||||
No arguments are passed to the mock object that patches `login` because no tests require it
|
||||
|
||||
Example::
|
||||
|
||||
with patch_ipa(return_value={}) as (mock_login, mock_post):
|
||||
...
|
||||
"""
|
||||
obj = ipa_otptoken.OTPTokenIPAClient
|
||||
with patch.object(obj, 'login') as mock_login:
|
||||
with patch.object(obj, '_post_json', **kwargs) as mock_post:
|
||||
yield mock_login, mock_post
|
||||
|
||||
|
||||
class TestIPAOTPToken(ModuleTestCase):
|
||||
def setUp(self):
|
||||
super(TestIPAOTPToken, self).setUp()
|
||||
self.module = ipa_otptoken
|
||||
|
||||
def _test_base(self, module_args, return_value, mock_calls, changed):
|
||||
"""Base function that's called by all the other test functions
|
||||
|
||||
module_args (dict):
|
||||
Arguments passed to the module
|
||||
|
||||
return_value (dict):
|
||||
Mocked return value of OTPTokenIPAClient.otptoken_show, as returned by the IPA API.
|
||||
This should be set to the current state. It will be changed to the desired state using the above arguments.
|
||||
(Technically, this is the return value of _post_json, but it's only checked by otptoken_show).
|
||||
|
||||
mock_calls (list/tuple of dicts):
|
||||
List of calls made to OTPTokenIPAClient._post_json, in order.
|
||||
_post_json is called by all of the otptoken_* methods of the class.
|
||||
Pass an empty list if no calls are expected.
|
||||
|
||||
changed (bool):
|
||||
Whether or not the module is supposed to be marked as changed
|
||||
"""
|
||||
set_module_args(module_args)
|
||||
|
||||
# Run the module
|
||||
with patch_ipa(return_value=return_value) as (mock_login, mock_post):
|
||||
with self.assertRaises(AnsibleExitJson) as exec_info:
|
||||
self.module.main()
|
||||
|
||||
# Verify that the calls to _post_json match what is expected
|
||||
expected_call_count = len(mock_calls)
|
||||
if expected_call_count > 1:
|
||||
# Convert the call dicts to unittest.mock.call instances because `assert_has_calls` only accepts them
|
||||
converted_calls = []
|
||||
for call_dict in mock_calls:
|
||||
converted_calls.append(call(**call_dict))
|
||||
|
||||
mock_post.assert_has_calls(converted_calls)
|
||||
self.assertEqual(len(mock_post.mock_calls), expected_call_count)
|
||||
elif expected_call_count == 1:
|
||||
mock_post.assert_called_once_with(**mock_calls[0])
|
||||
else: # expected_call_count is 0
|
||||
mock_post.assert_not_called()
|
||||
|
||||
# Verify that the module's changed status matches what is expected
|
||||
self.assertIs(exec_info.exception.args[0]['changed'], changed)
|
||||
|
||||
def test_add_new_all_default(self):
|
||||
"""Add a new OTP with all default values"""
|
||||
module_args = {
|
||||
'uniqueid': 'NewToken1'
|
||||
}
|
||||
return_value = {}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otptoken_find',
|
||||
'name': None,
|
||||
'item': {'all': True,
|
||||
'ipatokenuniqueid': 'NewToken1',
|
||||
'timelimit': '0',
|
||||
'sizelimit': '0'}
|
||||
},
|
||||
{
|
||||
'method': 'otptoken_add',
|
||||
'name': 'NewToken1',
|
||||
'item': {'ipatokendisabled': 'FALSE',
|
||||
'all': True}
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_add_new_all_default_with_aliases(self):
|
||||
"""Add a new OTP with all default values using alias values"""
|
||||
module_args = {
|
||||
'name': 'NewToken1'
|
||||
}
|
||||
return_value = {}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otptoken_find',
|
||||
'name': None,
|
||||
'item': {'all': True,
|
||||
'ipatokenuniqueid': 'NewToken1',
|
||||
'timelimit': '0',
|
||||
'sizelimit': '0'}
|
||||
},
|
||||
{
|
||||
'method': 'otptoken_add',
|
||||
'name': 'NewToken1',
|
||||
'item': {'ipatokendisabled': 'FALSE',
|
||||
'all': True}
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_add_new_all_specified(self):
|
||||
"""Add a new OTP with all default values"""
|
||||
module_args = {
|
||||
'uniqueid': 'NewToken1',
|
||||
'otptype': 'hotp',
|
||||
'secretkey': 'VGVzdFNlY3JldDE=',
|
||||
'description': 'Test description',
|
||||
'owner': 'pinky',
|
||||
'enabled': True,
|
||||
'notbefore': '20200101010101',
|
||||
'notafter': '20900101010101',
|
||||
'vendor': 'Acme',
|
||||
'model': 'ModelT',
|
||||
'serial': 'Number1',
|
||||
'state': 'present',
|
||||
'algorithm': 'sha256',
|
||||
'digits': 6,
|
||||
'offset': 10,
|
||||
'interval': 30,
|
||||
'counter': 30,
|
||||
}
|
||||
return_value = {}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otptoken_find',
|
||||
'name': None,
|
||||
'item': {'all': True,
|
||||
'ipatokenuniqueid': 'NewToken1',
|
||||
'timelimit': '0',
|
||||
'sizelimit': '0'}
|
||||
},
|
||||
{
|
||||
'method': 'otptoken_add',
|
||||
'name': 'NewToken1',
|
||||
'item': {'type': 'HOTP',
|
||||
'ipatokenotpkey': 'KRSXG5CTMVRXEZLUGE======',
|
||||
'description': 'Test description',
|
||||
'ipatokenowner': 'pinky',
|
||||
'ipatokendisabled': 'FALSE',
|
||||
'ipatokennotbefore': '20200101010101Z',
|
||||
'ipatokennotafter': '20900101010101Z',
|
||||
'ipatokenvendor': 'Acme',
|
||||
'ipatokenmodel': 'ModelT',
|
||||
'ipatokenserial': 'Number1',
|
||||
'ipatokenotpalgorithm': 'sha256',
|
||||
'ipatokenotpdigits': '6',
|
||||
'ipatokentotpclockoffset': '10',
|
||||
'ipatokentotptimestep': '30',
|
||||
'ipatokenhotpcounter': '30',
|
||||
'all': True}
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_already_existing_no_change_all_specified(self):
|
||||
"""Add a new OTP with all values specified but needing no change"""
|
||||
module_args = {
|
||||
'uniqueid': 'NewToken1',
|
||||
'otptype': 'hotp',
|
||||
'secretkey': 'VGVzdFNlY3JldDE=',
|
||||
'description': 'Test description',
|
||||
'owner': 'pinky',
|
||||
'enabled': True,
|
||||
'notbefore': '20200101010101',
|
||||
'notafter': '20900101010101',
|
||||
'vendor': 'Acme',
|
||||
'model': 'ModelT',
|
||||
'serial': 'Number1',
|
||||
'state': 'present',
|
||||
'algorithm': 'sha256',
|
||||
'digits': 6,
|
||||
'offset': 10,
|
||||
'interval': 30,
|
||||
'counter': 30,
|
||||
}
|
||||
return_value = {'ipatokenuniqueid': 'NewToken1',
|
||||
'type': 'HOTP',
|
||||
'ipatokenotpkey': [{'__base64__': 'VGVzdFNlY3JldDE='}],
|
||||
'description': ['Test description'],
|
||||
'ipatokenowner': ['pinky'],
|
||||
'ipatokendisabled': ['FALSE'],
|
||||
'ipatokennotbefore': ['20200101010101Z'],
|
||||
'ipatokennotafter': ['20900101010101Z'],
|
||||
'ipatokenvendor': ['Acme'],
|
||||
'ipatokenmodel': ['ModelT'],
|
||||
'ipatokenserial': ['Number1'],
|
||||
'ipatokenotpalgorithm': ['sha256'],
|
||||
'ipatokenotpdigits': ['6'],
|
||||
'ipatokentotpclockoffset': ['10'],
|
||||
'ipatokentotptimestep': ['30'],
|
||||
'ipatokenhotpcounter': ['30']}
|
||||
mock_calls = [
|
||||
{
|
||||
'method': 'otptoken_find',
|
||||
'name': None,
|
||||
'item': {'all': True,
|
||||
'ipatokenuniqueid': 'NewToken1',
|
||||
'timelimit': '0',
|
||||
'sizelimit': '0'}
|
||||
}
|
||||
]
|
||||
changed = False
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_already_existing_one_change_all_specified(self):
|
||||
"""Modify an existing OTP with one value specified needing change"""
|
||||
module_args = {
|
||||
'uniqueid': 'NewToken1',
|
||||
'otptype': 'hotp',
|
||||
'secretkey': 'VGVzdFNlY3JldDE=',
|
||||
'description': 'Test description',
|
||||
'owner': 'brain',
|
||||
'enabled': True,
|
||||
'notbefore': '20200101010101',
|
||||
'notafter': '20900101010101',
|
||||
'vendor': 'Acme',
|
||||
'model': 'ModelT',
|
||||
'serial': 'Number1',
|
||||
'state': 'present',
|
||||
'algorithm': 'sha256',
|
||||
'digits': 6,
|
||||
'offset': 10,
|
||||
'interval': 30,
|
||||
'counter': 30,
|
||||
}
|
||||
return_value = {'ipatokenuniqueid': 'NewToken1',
|
||||
'type': 'HOTP',
|
||||
'ipatokenotpkey': [{'__base64__': 'VGVzdFNlY3JldDE='}],
|
||||
'description': ['Test description'],
|
||||
'ipatokenowner': ['pinky'],
|
||||
'ipatokendisabled': ['FALSE'],
|
||||
'ipatokennotbefore': ['20200101010101Z'],
|
||||
'ipatokennotafter': ['20900101010101Z'],
|
||||
'ipatokenvendor': ['Acme'],
|
||||
'ipatokenmodel': ['ModelT'],
|
||||
'ipatokenserial': ['Number1'],
|
||||
'ipatokenotpalgorithm': ['sha256'],
|
||||
'ipatokenotpdigits': ['6'],
|
||||
'ipatokentotpclockoffset': ['10'],
|
||||
'ipatokentotptimestep': ['30'],
|
||||
'ipatokenhotpcounter': ['30']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otptoken_find',
|
||||
'name': None,
|
||||
'item': {'all': True,
|
||||
'ipatokenuniqueid': 'NewToken1',
|
||||
'timelimit': '0',
|
||||
'sizelimit': '0'}
|
||||
},
|
||||
{
|
||||
'method': 'otptoken_mod',
|
||||
'name': 'NewToken1',
|
||||
'item': {'description': 'Test description',
|
||||
'ipatokenowner': 'brain',
|
||||
'ipatokendisabled': 'FALSE',
|
||||
'ipatokennotbefore': '20200101010101Z',
|
||||
'ipatokennotafter': '20900101010101Z',
|
||||
'ipatokenvendor': 'Acme',
|
||||
'ipatokenmodel': 'ModelT',
|
||||
'ipatokenserial': 'Number1',
|
||||
'all': True}
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_already_existing_all_valid_change_all_specified(self):
|
||||
"""Modify an existing OTP with all valid values specified needing change"""
|
||||
module_args = {
|
||||
'uniqueid': 'NewToken1',
|
||||
'otptype': 'hotp',
|
||||
'secretkey': 'VGVzdFNlY3JldDE=',
|
||||
'description': 'New Test description',
|
||||
'owner': 'pinky',
|
||||
'enabled': False,
|
||||
'notbefore': '20200101010102',
|
||||
'notafter': '20900101010102',
|
||||
'vendor': 'NewAcme',
|
||||
'model': 'NewModelT',
|
||||
'serial': 'Number2',
|
||||
'state': 'present',
|
||||
'algorithm': 'sha256',
|
||||
'digits': 6,
|
||||
'offset': 10,
|
||||
'interval': 30,
|
||||
'counter': 30,
|
||||
}
|
||||
return_value = {'ipatokenuniqueid': 'NewToken1',
|
||||
'type': 'HOTP',
|
||||
'ipatokenotpkey': [{'__base64__': 'VGVzdFNlY3JldDE='}],
|
||||
'description': ['Test description'],
|
||||
'ipatokenowner': ['pinky'],
|
||||
'ipatokendisabled': ['FALSE'],
|
||||
'ipatokennotbefore': ['20200101010101Z'],
|
||||
'ipatokennotafter': ['20900101010101Z'],
|
||||
'ipatokenvendor': ['Acme'],
|
||||
'ipatokenmodel': ['ModelT'],
|
||||
'ipatokenserial': ['Number1'],
|
||||
'ipatokenotpalgorithm': ['sha256'],
|
||||
'ipatokenotpdigits': ['6'],
|
||||
'ipatokentotpclockoffset': ['10'],
|
||||
'ipatokentotptimestep': ['30'],
|
||||
'ipatokenhotpcounter': ['30']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otptoken_find',
|
||||
'name': None,
|
||||
'item': {'all': True,
|
||||
'ipatokenuniqueid': 'NewToken1',
|
||||
'timelimit': '0',
|
||||
'sizelimit': '0'}
|
||||
},
|
||||
{
|
||||
'method': 'otptoken_mod',
|
||||
'name': 'NewToken1',
|
||||
'item': {'description': 'New Test description',
|
||||
'ipatokenowner': 'pinky',
|
||||
'ipatokendisabled': 'TRUE',
|
||||
'ipatokennotbefore': '20200101010102Z',
|
||||
'ipatokennotafter': '20900101010102Z',
|
||||
'ipatokenvendor': 'NewAcme',
|
||||
'ipatokenmodel': 'NewModelT',
|
||||
'ipatokenserial': 'Number2',
|
||||
'all': True}
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_delete_existing_token(self):
|
||||
"""Delete an existing OTP"""
|
||||
module_args = {
|
||||
'uniqueid': 'NewToken1',
|
||||
'state': 'absent'
|
||||
}
|
||||
return_value = {'ipatokenuniqueid': 'NewToken1',
|
||||
'type': 'HOTP',
|
||||
'ipatokenotpkey': [{'__base64__': 'KRSXG5CTMVRXEZLUGE======'}],
|
||||
'description': ['Test description'],
|
||||
'ipatokenowner': ['pinky'],
|
||||
'ipatokendisabled': ['FALSE'],
|
||||
'ipatokennotbefore': ['20200101010101Z'],
|
||||
'ipatokennotafter': ['20900101010101Z'],
|
||||
'ipatokenvendor': ['Acme'],
|
||||
'ipatokenmodel': ['ModelT'],
|
||||
'ipatokenserial': ['Number1'],
|
||||
'ipatokenotpalgorithm': ['sha256'],
|
||||
'ipatokenotpdigits': ['6'],
|
||||
'ipatokentotpclockoffset': ['10'],
|
||||
'ipatokentotptimestep': ['30'],
|
||||
'ipatokenhotpcounter': ['30']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otptoken_find',
|
||||
'name': None,
|
||||
'item': {'all': True,
|
||||
'ipatokenuniqueid': 'NewToken1',
|
||||
'timelimit': '0',
|
||||
'sizelimit': '0'}
|
||||
},
|
||||
{
|
||||
'method': 'otptoken_del',
|
||||
'name': 'NewToken1'
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_disable_existing_token(self):
|
||||
"""Disable an existing OTP"""
|
||||
module_args = {
|
||||
'uniqueid': 'NewToken1',
|
||||
'otptype': 'hotp',
|
||||
'enabled': False
|
||||
}
|
||||
return_value = {'ipatokenuniqueid': 'NewToken1',
|
||||
'type': 'HOTP',
|
||||
'ipatokenotpkey': [{'__base64__': 'KRSXG5CTMVRXEZLUGE======'}],
|
||||
'description': ['Test description'],
|
||||
'ipatokenowner': ['pinky'],
|
||||
'ipatokendisabled': ['FALSE'],
|
||||
'ipatokennotbefore': ['20200101010101Z'],
|
||||
'ipatokennotafter': ['20900101010101Z'],
|
||||
'ipatokenvendor': ['Acme'],
|
||||
'ipatokenmodel': ['ModelT'],
|
||||
'ipatokenserial': ['Number1'],
|
||||
'ipatokenotpalgorithm': ['sha256'],
|
||||
'ipatokenotpdigits': ['6'],
|
||||
'ipatokentotpclockoffset': ['10'],
|
||||
'ipatokentotptimestep': ['30'],
|
||||
'ipatokenhotpcounter': ['30']}
|
||||
mock_calls = (
|
||||
{
|
||||
'method': 'otptoken_find',
|
||||
'name': None,
|
||||
'item': {'all': True,
|
||||
'ipatokenuniqueid': 'NewToken1',
|
||||
'timelimit': '0',
|
||||
'sizelimit': '0'}
|
||||
},
|
||||
{
|
||||
'method': 'otptoken_mod',
|
||||
'name': 'NewToken1',
|
||||
'item': {'ipatokendisabled': 'TRUE',
|
||||
'all': True}
|
||||
}
|
||||
)
|
||||
changed = True
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_delete_not_existing_token(self):
|
||||
"""Delete a OTP that does not exist"""
|
||||
module_args = {
|
||||
'uniqueid': 'NewToken1',
|
||||
'state': 'absent'
|
||||
}
|
||||
return_value = {}
|
||||
|
||||
mock_calls = [
|
||||
{
|
||||
'method': 'otptoken_find',
|
||||
'name': None,
|
||||
'item': {'all': True,
|
||||
'ipatokenuniqueid': 'NewToken1',
|
||||
'timelimit': '0',
|
||||
'sizelimit': '0'}
|
||||
}
|
||||
]
|
||||
|
||||
changed = False
|
||||
|
||||
self._test_base(module_args, return_value, mock_calls, changed)
|
||||
|
||||
def test_fail_post(self):
|
||||
"""Fail due to an exception raised from _post_json"""
|
||||
set_module_args({
|
||||
'uniqueid': 'NewToken1'
|
||||
})
|
||||
|
||||
with patch_ipa(side_effect=Exception('ERROR MESSAGE')) as (mock_login, mock_post):
|
||||
with self.assertRaises(AnsibleFailJson) as exec_info:
|
||||
self.module.main()
|
||||
|
||||
self.assertEqual(exec_info.exception.args[0]['msg'], 'ERROR MESSAGE')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in a new issue