1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Added modules ipa_otpconfig and ipa_otptoken (#2122) (#2219)

* Added module for ipa_otpconfig

* Make no_log=False explicit.

* Updated inputs to be int type instead of strings to align to expected inputs.  Updated output message

* Add changelog fragment

* Remove changelog fragment as this is a new module

* Update plugins/modules/identity/ipa/ipa_otpconfig.py

Add version_added field to module description.

Co-authored-by: Felix Fontein <felix@fontein.de>

* Updated punctuation in examples

* Add unit test for ipa_otpconfig

* Add ipa_otptoken module with unit test

* Updated documentation in unit test

* Update plugins/modules/identity/ipa/ipa_otpconfig.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/identity/ipa/ipa_otpconfig.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/identity/ipa/ipa_otptoken.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/identity/ipa/ipa_otptoken.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/identity/ipa/ipa_otptoken.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/identity/ipa/ipa_otptoken.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/identity/ipa/ipa_otptoken.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/identity/ipa/ipa_otptoken.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Added some documentation updates to make it conform to ansible standards

* Update plugins/modules/identity/ipa/ipa_otptoken.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Address review comments

Co-authored-by: Chris Costa <chris.costa@compellingtech.com>
Co-authored-by: Felix Fontein <felix@fontein.de>
(cherry picked from commit 31645ded11)

Co-authored-by: justchris1 <30219018+justchris1@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2021-04-11 15:53:33 +02:00 committed by GitHub
parent b701b5893f
commit f64ace97af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 1604 additions and 2 deletions

View file

@ -119,9 +119,9 @@ class IPAClient(object):
data = dict(method=method)
# TODO: We should probably handle this a little better.
if method in ('ping', 'config_show'):
if method in ('ping', 'config_show', 'otpconfig_show'):
data['params'] = [[], {}]
elif method == 'config_mod':
elif method in ('config_mod', 'otpconfig_mod'):
data['params'] = [[], item]
else:
data['params'] = [[name], item]

View file

@ -0,0 +1,172 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Ansible Project
# Heavily influenced from Fran Fitzpatrick <francis.x.fitzpatrick@gmail.com> ipa_config module
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = r'''
---
module: ipa_otpconfig
author: justchris1 (@justchris1)
short_description: Manage FreeIPA OTP Configuration Settings
version_added: 2.5.0
description:
- Modify global configuration settings of a FreeIPA Server with respect to OTP (One Time Passwords).
options:
ipatokentotpauthwindow:
description: TOTP authentication window in seconds.
aliases: ["totpauthwindow"]
type: int
ipatokentotpsyncwindow:
description: TOTP synchronization window in seconds.
aliases: ["totpsyncwindow"]
type: int
ipatokenhotpauthwindow:
description: HOTP authentication window in number of hops.
aliases: ["hotpauthwindow"]
type: int
ipatokenhotpsyncwindow:
description: HOTP synchronization window in hops.
aliases: ["hotpsyncwindow"]
type: int
extends_documentation_fragment:
- community.general.ipa.documentation
'''
EXAMPLES = r'''
- name: Ensure the TOTP authentication window is set to 300 seconds
community.general.ipa_otpconfig:
ipatokentotpauthwindow: '300'
ipa_host: localhost
ipa_user: admin
ipa_pass: supersecret
- name: Ensure the TOTP syncronization window is set to 86400 seconds
community.general.ipa_otpconfig:
ipatokentotpsyncwindow: '86400'
ipa_host: localhost
ipa_user: admin
ipa_pass: supersecret
- name: Ensure the HOTP authentication window is set to 10 hops
community.general.ipa_otpconfig:
ipatokenhotpauthwindow: '10'
ipa_host: localhost
ipa_user: admin
ipa_pass: supersecret
- name: Ensure the HOTP syncronization window is set to 100 hops
community.general.ipa_otpconfig:
ipatokenhotpsyncwindow: '100'
ipa_host: localhost
ipa_user: admin
ipa_pass: supersecret
'''
RETURN = r'''
otpconfig:
description: OTP configuration as returned by IPA API.
returned: always
type: dict
'''
import traceback
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.general.plugins.module_utils.ipa import IPAClient, ipa_argument_spec
from ansible.module_utils._text import to_native
class OTPConfigIPAClient(IPAClient):
def __init__(self, module, host, port, protocol):
super(OTPConfigIPAClient, self).__init__(module, host, port, protocol)
def otpconfig_show(self):
return self._post_json(method='otpconfig_show', name=None)
def otpconfig_mod(self, name, item):
return self._post_json(method='otpconfig_mod', name=name, item=item)
def get_otpconfig_dict(ipatokentotpauthwindow=None, ipatokentotpsyncwindow=None,
ipatokenhotpauthwindow=None, ipatokenhotpsyncwindow=None):
config = {}
if ipatokentotpauthwindow is not None:
config['ipatokentotpauthwindow'] = str(ipatokentotpauthwindow)
if ipatokentotpsyncwindow is not None:
config['ipatokentotpsyncwindow'] = str(ipatokentotpsyncwindow)
if ipatokenhotpauthwindow is not None:
config['ipatokenhotpauthwindow'] = str(ipatokenhotpauthwindow)
if ipatokenhotpsyncwindow is not None:
config['ipatokenhotpsyncwindow'] = str(ipatokenhotpsyncwindow)
return config
def get_otpconfig_diff(client, ipa_config, module_config):
return client.get_diff(ipa_data=ipa_config, module_data=module_config)
def ensure(module, client):
module_otpconfig = get_otpconfig_dict(
ipatokentotpauthwindow=module.params.get('ipatokentotpauthwindow'),
ipatokentotpsyncwindow=module.params.get('ipatokentotpsyncwindow'),
ipatokenhotpauthwindow=module.params.get('ipatokenhotpauthwindow'),
ipatokenhotpsyncwindow=module.params.get('ipatokenhotpsyncwindow'),
)
ipa_otpconfig = client.otpconfig_show()
diff = get_otpconfig_diff(client, ipa_otpconfig, module_otpconfig)
changed = False
new_otpconfig = {}
for module_key in diff:
if module_otpconfig.get(module_key) != ipa_otpconfig.get(module_key, None):
changed = True
new_otpconfig.update({module_key: module_otpconfig.get(module_key)})
if changed and not module.check_mode:
client.otpconfig_mod(name=None, item=new_otpconfig)
return changed, client.otpconfig_show()
def main():
argument_spec = ipa_argument_spec()
argument_spec.update(
ipatokentotpauthwindow=dict(type='int', aliases=['totpauthwindow'], no_log=False),
ipatokentotpsyncwindow=dict(type='int', aliases=['totpsyncwindow'], no_log=False),
ipatokenhotpauthwindow=dict(type='int', aliases=['hotpauthwindow'], no_log=False),
ipatokenhotpsyncwindow=dict(type='int', aliases=['hotpsyncwindow'], no_log=False),
)
module = AnsibleModule(
argument_spec=argument_spec,
supports_check_mode=True
)
client = OTPConfigIPAClient(
module=module,
host=module.params['ipa_host'],
port=module.params['ipa_port'],
protocol=module.params['ipa_prot']
)
try:
client.login(
username=module.params['ipa_user'],
password=module.params['ipa_pass']
)
changed, otpconfig = ensure(module, client)
except Exception as e:
module.fail_json(msg=to_native(e), exception=traceback.format_exc())
module.exit_json(changed=changed, otpconfig=otpconfig)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,527 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2017, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = r'''
---
module: ipa_otptoken
author: justchris1 (@justchris1)
short_description: Manage FreeIPA OTPs
version_added: 2.5.0
description:
- Add, modify, and delete One Time Passwords in IPA.
options:
uniqueid:
description: Unique ID of the token in IPA.
required: true
aliases: ["name"]
type: str
newuniqueid:
description: If specified, the unique id specified will be changed to this.
type: str
otptype:
description:
- Type of OTP.
- "B(Note:) Cannot be modified after OTP is created."
type: str
choices: [ totp, hotp ]
secretkey:
description:
- Token secret (Base64).
- If OTP is created and this is not specified, a random secret will be generated by IPA.
- "B(Note:) Cannot be modified after OTP is created."
type: str
description:
description: Description of the token (informational only).
type: str
owner:
description: Assigned user of the token.
type: str
enabled:
description: Mark the token as enabled (default C(true)).
default: true
type: bool
notbefore:
description:
- First date/time the token can be used.
- In the format C(YYYYMMddHHmmss).
- For example, C(20180121182022) will allow the token to be used starting on 21 January 2018 at 18:20:22.
type: str
notafter:
description:
- Last date/time the token can be used.
- In the format C(YYYYMMddHHmmss).
- For example, C(20200121182022) will allow the token to be used until 21 January 2020 at 18:20:22.
type: str
vendor:
description: Token vendor name (informational only).
type: str
model:
description: Token model (informational only).
type: str
serial:
description: Token serial (informational only).
type: str
state:
description: State to ensure.
choices: ['present', 'absent']
default: 'present'
type: str
algorithm:
description:
- Token hash algorithm.
- "B(Note:) Cannot be modified after OTP is created."
choices: ['sha1', 'sha256', 'sha384', 'sha512']
type: str
digits:
description:
- Number of digits each token code will have.
- "B(Note:) Cannot be modified after OTP is created."
choices: [ 6, 8 ]
type: int
offset:
description:
- TOTP token / IPA server time difference.
- "B(Note:) Cannot be modified after OTP is created."
type: int
interval:
description:
- Length of TOTP token code validity in seconds.
- "B(Note:) Cannot be modified after OTP is created."
type: int
counter:
description:
- Initial counter for the HOTP token.
- "B(Note:) Cannot be modified after OTP is created."
type: int
extends_documentation_fragment:
- community.general.ipa.documentation
'''
EXAMPLES = r'''
- name: Create a totp for pinky, allowing the IPA server to generate using defaults
community.general.ipa_otptoken:
uniqueid: Token123
otptype: totp
owner: pinky
ipa_host: ipa.example.com
ipa_user: admin
ipa_pass: topsecret
- name: Create a 8 digit hotp for pinky with sha256 with specified validity times
community.general.ipa_otptoken:
uniqueid: Token123
enabled: true
otptype: hotp
digits: 8
secretkey: UMKSIER00zT2T2tWMUlTRmNlekRCbFQvWFBVZUh2dElHWGR6T3VUR3IzK2xjaFk9
algorithm: sha256
notbefore: 20180121182123
notafter: 20220121182123
owner: pinky
ipa_host: ipa.example.com
ipa_user: admin
ipa_pass: topsecret
- name: Update Token123 to indicate a vendor, model, serial number (info only), and description
community.general.ipa_otptoken:
uniqueid: Token123
vendor: Acme
model: acme101
serial: SerialNumber1
description: Acme OTP device
ipa_host: ipa.example.com
ipa_user: admin
ipa_pass: topsecret
- name: Disable Token123
community.general.ipa_otptoken:
uniqueid: Token123
enabled: false
ipa_host: ipa.example.com
ipa_user: admin
ipa_pass: topsecret
- name: Rename Token123 to TokenABC and enable it
community.general.ipa_otptoken:
uniqueid: Token123
newuniqueid: TokenABC
enabled: true
ipa_host: ipa.example.com
ipa_user: admin
ipa_pass: topsecret
'''
RETURN = r'''
otptoken:
description: OTP Token as returned by IPA API
returned: always
type: dict
'''
import base64
import traceback
from ansible.module_utils.basic import AnsibleModule, sanitize_keys
from ansible_collections.community.general.plugins.module_utils.ipa import IPAClient, ipa_argument_spec
from ansible.module_utils._text import to_native
class OTPTokenIPAClient(IPAClient):
def __init__(self, module, host, port, protocol):
super(OTPTokenIPAClient, self).__init__(module, host, port, protocol)
def otptoken_find(self, name):
return self._post_json(method='otptoken_find', name=None, item={'all': True,
'ipatokenuniqueid': name,
'timelimit': '0',
'sizelimit': '0'})
def otptoken_add(self, name, item):
return self._post_json(method='otptoken_add', name=name, item=item)
def otptoken_mod(self, name, item):
return self._post_json(method='otptoken_mod', name=name, item=item)
def otptoken_del(self, name):
return self._post_json(method='otptoken_del', name=name)
def base64_to_base32(base64_string):
"""Converts base64 string to base32 string"""
b32_string = base64.b32encode(base64.b64decode(base64_string)).decode('ascii')
return b32_string
def base32_to_base64(base32_string):
"""Converts base32 string to base64 string"""
b64_string = base64.b64encode(base64.b32decode(base32_string)).decode('ascii')
return b64_string
def get_otptoken_dict(ansible_to_ipa, uniqueid=None, newuniqueid=None, otptype=None, secretkey=None, description=None, owner=None,
enabled=None, notbefore=None, notafter=None, vendor=None,
model=None, serial=None, algorithm=None, digits=None, offset=None,
interval=None, counter=None):
"""Create the dictionary of settings passed in"""
otptoken = {}
if uniqueid is not None:
otptoken[ansible_to_ipa['uniqueid']] = uniqueid
if newuniqueid is not None:
otptoken[ansible_to_ipa['newuniqueid']] = newuniqueid
if otptype is not None:
otptoken[ansible_to_ipa['otptype']] = otptype.upper()
if secretkey is not None:
# For some unknown reason, while IPA returns the secret in base64,
# it wants the secret passed in as base32. This makes it more difficult
# for comparison (does 'current' equal to 'new'). Moreover, this may
# cause some subtle issue in a playbook as the output is encoded
# in a different way than if it was passed in as a parameter. For
# these reasons, have the module standardize on base64 input (as parameter)
# and output (from IPA).
otptoken[ansible_to_ipa['secretkey']] = base64_to_base32(secretkey)
if description is not None:
otptoken[ansible_to_ipa['description']] = description
if owner is not None:
otptoken[ansible_to_ipa['owner']] = owner
if enabled is not None:
otptoken[ansible_to_ipa['enabled']] = 'FALSE' if enabled else 'TRUE'
if notbefore is not None:
otptoken[ansible_to_ipa['notbefore']] = notbefore + 'Z'
if notafter is not None:
otptoken[ansible_to_ipa['notafter']] = notafter + 'Z'
if vendor is not None:
otptoken[ansible_to_ipa['vendor']] = vendor
if model is not None:
otptoken[ansible_to_ipa['model']] = model
if serial is not None:
otptoken[ansible_to_ipa['serial']] = serial
if algorithm is not None:
otptoken[ansible_to_ipa['algorithm']] = algorithm
if digits is not None:
otptoken[ansible_to_ipa['digits']] = str(digits)
if offset is not None:
otptoken[ansible_to_ipa['offset']] = str(offset)
if interval is not None:
otptoken[ansible_to_ipa['interval']] = str(interval)
if counter is not None:
otptoken[ansible_to_ipa['counter']] = str(counter)
return otptoken
def transform_output(ipa_otptoken, ansible_to_ipa, ipa_to_ansible):
"""Transform the output received by IPA to a format more friendly
before it is returned to the user. IPA returns even simple
strings as a list of strings. It also returns bools and
int as string. This function cleans that up before return.
"""
updated_otptoken = ipa_otptoken
# Used to hold values that will be sanitized from output as no_log.
# For the case where secretkey is not specified at the module, but
# is passed back from IPA.
sanitize_strings = set()
# Rename the IPA parameters to the more friendly ansible module names for them
for ipa_parameter in ipa_to_ansible:
if ipa_parameter in ipa_otptoken:
updated_otptoken[ipa_to_ansible[ipa_parameter]] = ipa_otptoken[ipa_parameter]
updated_otptoken.pop(ipa_parameter)
# Change the type from IPA's list of string to the appropriate return value type
# based on field. By default, assume they should be strings.
for ansible_parameter in ansible_to_ipa:
if ansible_parameter in updated_otptoken:
if isinstance(updated_otptoken[ansible_parameter], list) and len(updated_otptoken[ansible_parameter]) == 1:
if ansible_parameter in ['digits', 'offset', 'interval', 'counter']:
updated_otptoken[ansible_parameter] = int(updated_otptoken[ansible_parameter][0])
elif ansible_parameter == 'enabled':
updated_otptoken[ansible_parameter] = bool(updated_otptoken[ansible_parameter][0])
else:
updated_otptoken[ansible_parameter] = updated_otptoken[ansible_parameter][0]
if 'secretkey' in updated_otptoken:
if isinstance(updated_otptoken['secretkey'], dict):
if '__base64__' in updated_otptoken['secretkey']:
sanitize_strings.add(updated_otptoken['secretkey']['__base64__'])
b64key = updated_otptoken['secretkey']['__base64__']
updated_otptoken.pop('secretkey')
updated_otptoken['secretkey'] = b64key
sanitize_strings.add(b64key)
elif '__base32__' in updated_otptoken['secretkey']:
sanitize_strings.add(updated_otptoken['secretkey']['__base32__'])
b32key = updated_otptoken['secretkey']['__base32__']
b64key = base32_to_base64(b32key)
updated_otptoken.pop('secretkey')
updated_otptoken['secretkey'] = b64key
sanitize_strings.add(b32key)
sanitize_strings.add(b64key)
return updated_otptoken, sanitize_strings
def validate_modifications(ansible_to_ipa, module, ipa_otptoken,
module_otptoken, unmodifiable_after_creation):
"""Checks to see if the requested modifications are valid. Some elements
cannot be modified after initial creation. However, we still want to
validate arguments that are specified, but are not different than what
is currently set on the server.
"""
modifications_valid = True
for parameter in unmodifiable_after_creation:
if ansible_to_ipa[parameter] in module_otptoken and ansible_to_ipa[parameter] in ipa_otptoken:
mod_value = module_otptoken[ansible_to_ipa[parameter]]
# For someone unknown reason, the returns from IPA put almost all
# values in a list, even though passing them in a list (even of
# length 1) will be rejected. The module values for all elements
# other than type (totp or hotp) have this happen.
if parameter == 'otptype':
ipa_value = ipa_otptoken[ansible_to_ipa[parameter]]
else:
if len(ipa_otptoken[ansible_to_ipa[parameter]]) != 1:
module.fail_json(msg=("Invariant fail: Return value from IPA is not a list " +
"of length 1. Please open a bug report for the module."))
if parameter == 'secretkey':
# We stored the secret key in base32 since we had assumed that would need to
# be the format if we were contacting IPA to create it. However, we are
# now comparing it against what is already set in the IPA server, so convert
# back to base64 for comparison.
mod_value = base32_to_base64(mod_value)
# For the secret key, it is even more specific in that the key is returned
# in a dict, in the list, as the __base64__ entry for the IPA response.
ipa_value = ipa_otptoken[ansible_to_ipa[parameter]][0]['__base64__']
if '__base64__' in ipa_otptoken[ansible_to_ipa[parameter]][0]:
ipa_value = ipa_otptoken[ansible_to_ipa[parameter]][0]['__base64__']
elif '__base32__' in ipa_otptoken[ansible_to_ipa[parameter]][0]:
b32key = ipa_otptoken[ansible_to_ipa[parameter]][0]['__base32__']
b64key = base32_to_base64(b32key)
ipa_value = b64key
else:
ipa_value = None
else:
ipa_value = ipa_otptoken[ansible_to_ipa[parameter]][0]
if mod_value != ipa_value:
modifications_valid = False
fail_message = ("Parameter '" + parameter + "' cannot be changed once " +
"the OTP is created and the requested value specified here (" +
str(mod_value) +
") differs from what is set in the IPA server ("
+ str(ipa_value) + ")")
module.fail_json(msg=fail_message)
return modifications_valid
def ensure(module, client):
# dict to map from ansible parameter names to attribute names
# used by IPA (which are not so friendly).
ansible_to_ipa = {'uniqueid': 'ipatokenuniqueid',
'newuniqueid': 'rename',
'otptype': 'type',
'secretkey': 'ipatokenotpkey',
'description': 'description',
'owner': 'ipatokenowner',
'enabled': 'ipatokendisabled',
'notbefore': 'ipatokennotbefore',
'notafter': 'ipatokennotafter',
'vendor': 'ipatokenvendor',
'model': 'ipatokenmodel',
'serial': 'ipatokenserial',
'algorithm': 'ipatokenotpalgorithm',
'digits': 'ipatokenotpdigits',
'offset': 'ipatokentotpclockoffset',
'interval': 'ipatokentotptimestep',
'counter': 'ipatokenhotpcounter'}
# Create inverse dictionary for mapping return values
ipa_to_ansible = {}
for (k, v) in ansible_to_ipa.items():
ipa_to_ansible[v] = k
unmodifiable_after_creation = ['otptype', 'secretkey', 'algorithm',
'digits', 'offset', 'interval', 'counter']
state = module.params['state']
uniqueid = module.params['uniqueid']
module_otptoken = get_otptoken_dict(ansible_to_ipa=ansible_to_ipa,
uniqueid=module.params.get('uniqueid'),
newuniqueid=module.params.get('newuniqueid'),
otptype=module.params.get('otptype'),
secretkey=module.params.get('secretkey'),
description=module.params.get('description'),
owner=module.params.get('owner'),
enabled=module.params.get('enabled'),
notbefore=module.params.get('notbefore'),
notafter=module.params.get('notafter'),
vendor=module.params.get('vendor'),
model=module.params.get('model'),
serial=module.params.get('serial'),
algorithm=module.params.get('algorithm'),
digits=module.params.get('digits'),
offset=module.params.get('offset'),
interval=module.params.get('interval'),
counter=module.params.get('counter'))
ipa_otptoken = client.otptoken_find(name=uniqueid)
if ansible_to_ipa['newuniqueid'] in module_otptoken:
# Check to see if the new unique id is already taken in use
ipa_otptoken_new = client.otptoken_find(name=module_otptoken[ansible_to_ipa['newuniqueid']])
if ipa_otptoken_new:
module.fail_json(msg=("Requested rename through newuniqueid to " +
module_otptoken[ansible_to_ipa['newuniqueid']] +
" failed because the new unique id is already in use"))
changed = False
if state == 'present':
if not ipa_otptoken:
changed = True
if not module.check_mode:
# It would not make sense to have a rename after creation, so if the user
# specified a newuniqueid, just replace the uniqueid with the updated one
# before creation
if ansible_to_ipa['newuniqueid'] in module_otptoken:
module_otptoken[ansible_to_ipa['uniqueid']] = module_otptoken[ansible_to_ipa['newuniqueid']]
uniqueid = module_otptoken[ansible_to_ipa['newuniqueid']]
module_otptoken.pop(ansible_to_ipa['newuniqueid'])
# IPA wants the unique id in the first position and not as a key/value pair.
# Get rid of it from the otptoken dict and just specify it in the name field
# for otptoken_add.
if ansible_to_ipa['uniqueid'] in module_otptoken:
module_otptoken.pop(ansible_to_ipa['uniqueid'])
module_otptoken['all'] = True
ipa_otptoken = client.otptoken_add(name=uniqueid, item=module_otptoken)
else:
if not(validate_modifications(ansible_to_ipa, module, ipa_otptoken,
module_otptoken, unmodifiable_after_creation)):
module.fail_json(msg="Modifications requested in module are not valid")
# IPA will reject 'modifications' that do not actually modify anything
# if any of the unmodifiable elements are specified. Explicitly
# get rid of them here. They were not different or else the
# we would have failed out in validate_modifications.
for x in unmodifiable_after_creation:
if ansible_to_ipa[x] in module_otptoken:
module_otptoken.pop(ansible_to_ipa[x])
diff = client.get_diff(ipa_data=ipa_otptoken, module_data=module_otptoken)
if len(diff) > 0:
changed = True
if not module.check_mode:
# IPA wants the unique id in the first position and not as a key/value pair.
# Get rid of it from the otptoken dict and just specify it in the name field
# for otptoken_mod.
if ansible_to_ipa['uniqueid'] in module_otptoken:
module_otptoken.pop(ansible_to_ipa['uniqueid'])
module_otptoken['all'] = True
ipa_otptoken = client.otptoken_mod(name=uniqueid, item=module_otptoken)
else:
if ipa_otptoken:
changed = True
if not module.check_mode:
client.otptoken_del(name=uniqueid)
# Transform the output to use ansible keywords (not the IPA keywords) and
# sanitize any key values in the output.
ipa_otptoken, sanitize_strings = transform_output(ipa_otptoken, ansible_to_ipa, ipa_to_ansible)
module.no_log_values = module.no_log_values.union(sanitize_strings)
sanitized_otptoken = sanitize_keys(obj=ipa_otptoken, no_log_strings=module.no_log_values)
return changed, sanitized_otptoken
def main():
argument_spec = ipa_argument_spec()
argument_spec.update(uniqueid=dict(type='str', aliases=['name'], required=True),
newuniqueid=dict(type='str'),
otptype=dict(type='str', choices=['totp', 'hotp']),
secretkey=dict(type='str', no_log=True),
description=dict(type='str'),
owner=dict(type='str'),
enabled=dict(type='bool', default=True),
notbefore=dict(type='str'),
notafter=dict(type='str'),
vendor=dict(type='str'),
model=dict(type='str'),
serial=dict(type='str'),
state=dict(type='str', choices=['present', 'absent'], default='present'),
algorithm=dict(type='str', choices=['sha1', 'sha256', 'sha384', 'sha512']),
digits=dict(type='int', choices=[6, 8]),
offset=dict(type='int'),
interval=dict(type='int'),
counter=dict(type='int'))
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
client = OTPTokenIPAClient(module=module,
host=module.params['ipa_host'],
port=module.params['ipa_port'],
protocol=module.params['ipa_prot'])
try:
client.login(username=module.params['ipa_user'],
password=module.params['ipa_pass'])
changed, otptoken = ensure(module, client)
except Exception as e:
module.fail_json(msg=to_native(e), exception=traceback.format_exc())
module.exit_json(changed=changed, otptoken=otptoken)
if __name__ == '__main__':
main()

View file

@ -0,0 +1 @@
./identity/ipa/ipa_otpconfig.py

View file

@ -0,0 +1 @@
./identity/ipa/ipa_otptoken.py

View file

@ -0,0 +1,406 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2020, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from contextlib import contextmanager
from ansible_collections.community.general.tests.unit.compat import unittest
from ansible_collections.community.general.tests.unit.compat.mock import call, patch
from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
from ansible_collections.community.general.plugins.modules.identity.ipa import ipa_otpconfig
@contextmanager
def patch_ipa(**kwargs):
"""Mock context manager for patching the methods in OTPConfigIPAClient that contact the IPA server
Patches the `login` and `_post_json` methods
Keyword arguments are passed to the mock object that patches `_post_json`
No arguments are passed to the mock object that patches `login` because no tests require it
Example::
with patch_ipa(return_value={}) as (mock_login, mock_post):
...
"""
obj = ipa_otpconfig.OTPConfigIPAClient
with patch.object(obj, 'login') as mock_login:
with patch.object(obj, '_post_json', **kwargs) as mock_post:
yield mock_login, mock_post
class TestIPAOTPConfig(ModuleTestCase):
def setUp(self):
super(TestIPAOTPConfig, self).setUp()
self.module = ipa_otpconfig
def _test_base(self, module_args, return_value, mock_calls, changed):
"""Base function that's called by all the other test functions
module_args (dict):
Arguments passed to the module
return_value (dict):
Mocked return value of OTPConfigIPAClient.otpconfig_show, as returned by the IPA API.
This should be set to the current state. It will be changed to the desired state using the above arguments.
(Technically, this is the return value of _post_json, but it's only checked by otpconfig_show).
mock_calls (list/tuple of dicts):
List of calls made to OTPConfigIPAClient._post_json, in order.
_post_json is called by all of the otpconfig_* methods of the class.
Pass an empty list if no calls are expected.
changed (bool):
Whether or not the module is supposed to be marked as changed
"""
set_module_args(module_args)
# Run the module
with patch_ipa(return_value=return_value) as (mock_login, mock_post):
with self.assertRaises(AnsibleExitJson) as exec_info:
self.module.main()
# Verify that the calls to _post_json match what is expected
expected_call_count = len(mock_calls)
if expected_call_count > 1:
# Convert the call dicts to unittest.mock.call instances because `assert_has_calls` only accepts them
converted_calls = []
for call_dict in mock_calls:
converted_calls.append(call(**call_dict))
mock_post.assert_has_calls(converted_calls)
self.assertEqual(len(mock_post.mock_calls), expected_call_count)
elif expected_call_count == 1:
mock_post.assert_called_once_with(**mock_calls[0])
else: # expected_call_count is 0
mock_post.assert_not_called()
# Verify that the module's changed status matches what is expected
self.assertIs(exec_info.exception.args[0]['changed'], changed)
def test_set_all_no_adjustment(self):
"""Set values requiring no adjustment"""
module_args = {
'ipatokentotpauthwindow': 11,
'ipatokentotpsyncwindow': 12,
'ipatokenhotpauthwindow': 13,
'ipatokenhotpsyncwindow': 14
}
return_value = {
'ipatokentotpauthwindow': ['11'],
'ipatokentotpsyncwindow': ['12'],
'ipatokenhotpauthwindow': ['13'],
'ipatokenhotpsyncwindow': ['14']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = False
self._test_base(module_args, return_value, mock_calls, changed)
def test_set_all_aliases_no_adjustment(self):
"""Set values requiring no adjustment on all using aliases values"""
module_args = {
'totpauthwindow': 11,
'totpsyncwindow': 12,
'hotpauthwindow': 13,
'hotpsyncwindow': 14
}
return_value = {
'ipatokentotpauthwindow': ['11'],
'ipatokentotpsyncwindow': ['12'],
'ipatokenhotpauthwindow': ['13'],
'ipatokenhotpsyncwindow': ['14']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = False
self._test_base(module_args, return_value, mock_calls, changed)
def test_set_totp_auth_window_no_adjustment(self):
"""Set values requiring no adjustment on totpauthwindow"""
module_args = {
'totpauthwindow': 11
}
return_value = {
'ipatokentotpauthwindow': ['11'],
'ipatokentotpsyncwindow': ['12'],
'ipatokenhotpauthwindow': ['13'],
'ipatokenhotpsyncwindow': ['14']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = False
self._test_base(module_args, return_value, mock_calls, changed)
def test_set_totp_sync_window_no_adjustment(self):
"""Set values requiring no adjustment on totpsyncwindow"""
module_args = {
'totpsyncwindow': 12
}
return_value = {
'ipatokentotpauthwindow': ['11'],
'ipatokentotpsyncwindow': ['12'],
'ipatokenhotpauthwindow': ['13'],
'ipatokenhotpsyncwindow': ['14']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = False
self._test_base(module_args, return_value, mock_calls, changed)
def test_set_hotp_auth_window_no_adjustment(self):
"""Set values requiring no adjustment on hotpauthwindow"""
module_args = {
'hotpauthwindow': 13
}
return_value = {
'ipatokentotpauthwindow': ['11'],
'ipatokentotpsyncwindow': ['12'],
'ipatokenhotpauthwindow': ['13'],
'ipatokenhotpsyncwindow': ['14']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = False
self._test_base(module_args, return_value, mock_calls, changed)
def test_set_hotp_sync_window_no_adjustment(self):
"""Set values requiring no adjustment on hotpsyncwindow"""
module_args = {
'hotpsyncwindow': 14
}
return_value = {
'ipatokentotpauthwindow': ['11'],
'ipatokentotpsyncwindow': ['12'],
'ipatokenhotpauthwindow': ['13'],
'ipatokenhotpsyncwindow': ['14']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = False
self._test_base(module_args, return_value, mock_calls, changed)
def test_set_totp_auth_window(self):
"""Set values requiring adjustment on totpauthwindow"""
module_args = {
'totpauthwindow': 10
}
return_value = {
'ipatokentotpauthwindow': ['11'],
'ipatokentotpsyncwindow': ['12'],
'ipatokenhotpauthwindow': ['13'],
'ipatokenhotpsyncwindow': ['14']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_mod',
'name': None,
'item': {'ipatokentotpauthwindow': '10'}
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_set_totp_sync_window(self):
"""Set values requiring adjustment on totpsyncwindow"""
module_args = {
'totpsyncwindow': 10
}
return_value = {
'ipatokentotpauthwindow': ['11'],
'ipatokentotpsyncwindow': ['12'],
'ipatokenhotpauthwindow': ['13'],
'ipatokenhotpsyncwindow': ['14']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_mod',
'name': None,
'item': {'ipatokentotpsyncwindow': '10'}
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_set_hotp_auth_window(self):
"""Set values requiring adjustment on hotpauthwindow"""
module_args = {
'hotpauthwindow': 10
}
return_value = {
'ipatokentotpauthwindow': ['11'],
'ipatokentotpsyncwindow': ['12'],
'ipatokenhotpauthwindow': ['13'],
'ipatokenhotpsyncwindow': ['14']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_mod',
'name': None,
'item': {'ipatokenhotpauthwindow': '10'}
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_set_hotp_sync_window(self):
"""Set values requiring adjustment on hotpsyncwindow"""
module_args = {
'hotpsyncwindow': 10
}
return_value = {
'ipatokentotpauthwindow': ['11'],
'ipatokentotpsyncwindow': ['12'],
'ipatokenhotpauthwindow': ['13'],
'ipatokenhotpsyncwindow': ['14']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_mod',
'name': None,
'item': {'ipatokenhotpsyncwindow': '10'}
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_set_all(self):
"""Set values requiring adjustment on all"""
module_args = {
'ipatokentotpauthwindow': 11,
'ipatokentotpsyncwindow': 12,
'ipatokenhotpauthwindow': 13,
'ipatokenhotpsyncwindow': 14
}
return_value = {
'ipatokentotpauthwindow': ['1'],
'ipatokentotpsyncwindow': ['2'],
'ipatokenhotpauthwindow': ['3'],
'ipatokenhotpsyncwindow': ['4']}
mock_calls = (
{
'method': 'otpconfig_show',
'name': None
},
{
'method': 'otpconfig_mod',
'name': None,
'item': {'ipatokentotpauthwindow': '11',
'ipatokentotpsyncwindow': '12',
'ipatokenhotpauthwindow': '13',
'ipatokenhotpsyncwindow': '14'}
},
{
'method': 'otpconfig_show',
'name': None
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_fail_post(self):
"""Fail due to an exception raised from _post_json"""
set_module_args({
'ipatokentotpauthwindow': 11,
'ipatokentotpsyncwindow': 12,
'ipatokenhotpauthwindow': 13,
'ipatokenhotpsyncwindow': 14
})
with patch_ipa(side_effect=Exception('ERROR MESSAGE')) as (mock_login, mock_post):
with self.assertRaises(AnsibleFailJson) as exec_info:
self.module.main()
self.assertEqual(exec_info.exception.args[0]['msg'], 'ERROR MESSAGE')
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,495 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2020, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from contextlib import contextmanager
from ansible_collections.community.general.tests.unit.compat import unittest
from ansible_collections.community.general.tests.unit.compat.mock import call, patch
from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
from ansible_collections.community.general.plugins.modules.identity.ipa import ipa_otptoken
@contextmanager
def patch_ipa(**kwargs):
"""Mock context manager for patching the methods in OTPTokenIPAClient that contact the IPA server
Patches the `login` and `_post_json` methods
Keyword arguments are passed to the mock object that patches `_post_json`
No arguments are passed to the mock object that patches `login` because no tests require it
Example::
with patch_ipa(return_value={}) as (mock_login, mock_post):
...
"""
obj = ipa_otptoken.OTPTokenIPAClient
with patch.object(obj, 'login') as mock_login:
with patch.object(obj, '_post_json', **kwargs) as mock_post:
yield mock_login, mock_post
class TestIPAOTPToken(ModuleTestCase):
def setUp(self):
super(TestIPAOTPToken, self).setUp()
self.module = ipa_otptoken
def _test_base(self, module_args, return_value, mock_calls, changed):
"""Base function that's called by all the other test functions
module_args (dict):
Arguments passed to the module
return_value (dict):
Mocked return value of OTPTokenIPAClient.otptoken_show, as returned by the IPA API.
This should be set to the current state. It will be changed to the desired state using the above arguments.
(Technically, this is the return value of _post_json, but it's only checked by otptoken_show).
mock_calls (list/tuple of dicts):
List of calls made to OTPTokenIPAClient._post_json, in order.
_post_json is called by all of the otptoken_* methods of the class.
Pass an empty list if no calls are expected.
changed (bool):
Whether or not the module is supposed to be marked as changed
"""
set_module_args(module_args)
# Run the module
with patch_ipa(return_value=return_value) as (mock_login, mock_post):
with self.assertRaises(AnsibleExitJson) as exec_info:
self.module.main()
# Verify that the calls to _post_json match what is expected
expected_call_count = len(mock_calls)
if expected_call_count > 1:
# Convert the call dicts to unittest.mock.call instances because `assert_has_calls` only accepts them
converted_calls = []
for call_dict in mock_calls:
converted_calls.append(call(**call_dict))
mock_post.assert_has_calls(converted_calls)
self.assertEqual(len(mock_post.mock_calls), expected_call_count)
elif expected_call_count == 1:
mock_post.assert_called_once_with(**mock_calls[0])
else: # expected_call_count is 0
mock_post.assert_not_called()
# Verify that the module's changed status matches what is expected
self.assertIs(exec_info.exception.args[0]['changed'], changed)
def test_add_new_all_default(self):
"""Add a new OTP with all default values"""
module_args = {
'uniqueid': 'NewToken1'
}
return_value = {}
mock_calls = (
{
'method': 'otptoken_find',
'name': None,
'item': {'all': True,
'ipatokenuniqueid': 'NewToken1',
'timelimit': '0',
'sizelimit': '0'}
},
{
'method': 'otptoken_add',
'name': 'NewToken1',
'item': {'ipatokendisabled': 'FALSE',
'all': True}
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_add_new_all_default_with_aliases(self):
"""Add a new OTP with all default values using alias values"""
module_args = {
'name': 'NewToken1'
}
return_value = {}
mock_calls = (
{
'method': 'otptoken_find',
'name': None,
'item': {'all': True,
'ipatokenuniqueid': 'NewToken1',
'timelimit': '0',
'sizelimit': '0'}
},
{
'method': 'otptoken_add',
'name': 'NewToken1',
'item': {'ipatokendisabled': 'FALSE',
'all': True}
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_add_new_all_specified(self):
"""Add a new OTP with all default values"""
module_args = {
'uniqueid': 'NewToken1',
'otptype': 'hotp',
'secretkey': 'VGVzdFNlY3JldDE=',
'description': 'Test description',
'owner': 'pinky',
'enabled': True,
'notbefore': '20200101010101',
'notafter': '20900101010101',
'vendor': 'Acme',
'model': 'ModelT',
'serial': 'Number1',
'state': 'present',
'algorithm': 'sha256',
'digits': 6,
'offset': 10,
'interval': 30,
'counter': 30,
}
return_value = {}
mock_calls = (
{
'method': 'otptoken_find',
'name': None,
'item': {'all': True,
'ipatokenuniqueid': 'NewToken1',
'timelimit': '0',
'sizelimit': '0'}
},
{
'method': 'otptoken_add',
'name': 'NewToken1',
'item': {'type': 'HOTP',
'ipatokenotpkey': 'KRSXG5CTMVRXEZLUGE======',
'description': 'Test description',
'ipatokenowner': 'pinky',
'ipatokendisabled': 'FALSE',
'ipatokennotbefore': '20200101010101Z',
'ipatokennotafter': '20900101010101Z',
'ipatokenvendor': 'Acme',
'ipatokenmodel': 'ModelT',
'ipatokenserial': 'Number1',
'ipatokenotpalgorithm': 'sha256',
'ipatokenotpdigits': '6',
'ipatokentotpclockoffset': '10',
'ipatokentotptimestep': '30',
'ipatokenhotpcounter': '30',
'all': True}
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_already_existing_no_change_all_specified(self):
"""Add a new OTP with all values specified but needing no change"""
module_args = {
'uniqueid': 'NewToken1',
'otptype': 'hotp',
'secretkey': 'VGVzdFNlY3JldDE=',
'description': 'Test description',
'owner': 'pinky',
'enabled': True,
'notbefore': '20200101010101',
'notafter': '20900101010101',
'vendor': 'Acme',
'model': 'ModelT',
'serial': 'Number1',
'state': 'present',
'algorithm': 'sha256',
'digits': 6,
'offset': 10,
'interval': 30,
'counter': 30,
}
return_value = {'ipatokenuniqueid': 'NewToken1',
'type': 'HOTP',
'ipatokenotpkey': [{'__base64__': 'VGVzdFNlY3JldDE='}],
'description': ['Test description'],
'ipatokenowner': ['pinky'],
'ipatokendisabled': ['FALSE'],
'ipatokennotbefore': ['20200101010101Z'],
'ipatokennotafter': ['20900101010101Z'],
'ipatokenvendor': ['Acme'],
'ipatokenmodel': ['ModelT'],
'ipatokenserial': ['Number1'],
'ipatokenotpalgorithm': ['sha256'],
'ipatokenotpdigits': ['6'],
'ipatokentotpclockoffset': ['10'],
'ipatokentotptimestep': ['30'],
'ipatokenhotpcounter': ['30']}
mock_calls = [
{
'method': 'otptoken_find',
'name': None,
'item': {'all': True,
'ipatokenuniqueid': 'NewToken1',
'timelimit': '0',
'sizelimit': '0'}
}
]
changed = False
self._test_base(module_args, return_value, mock_calls, changed)
def test_already_existing_one_change_all_specified(self):
"""Modify an existing OTP with one value specified needing change"""
module_args = {
'uniqueid': 'NewToken1',
'otptype': 'hotp',
'secretkey': 'VGVzdFNlY3JldDE=',
'description': 'Test description',
'owner': 'brain',
'enabled': True,
'notbefore': '20200101010101',
'notafter': '20900101010101',
'vendor': 'Acme',
'model': 'ModelT',
'serial': 'Number1',
'state': 'present',
'algorithm': 'sha256',
'digits': 6,
'offset': 10,
'interval': 30,
'counter': 30,
}
return_value = {'ipatokenuniqueid': 'NewToken1',
'type': 'HOTP',
'ipatokenotpkey': [{'__base64__': 'VGVzdFNlY3JldDE='}],
'description': ['Test description'],
'ipatokenowner': ['pinky'],
'ipatokendisabled': ['FALSE'],
'ipatokennotbefore': ['20200101010101Z'],
'ipatokennotafter': ['20900101010101Z'],
'ipatokenvendor': ['Acme'],
'ipatokenmodel': ['ModelT'],
'ipatokenserial': ['Number1'],
'ipatokenotpalgorithm': ['sha256'],
'ipatokenotpdigits': ['6'],
'ipatokentotpclockoffset': ['10'],
'ipatokentotptimestep': ['30'],
'ipatokenhotpcounter': ['30']}
mock_calls = (
{
'method': 'otptoken_find',
'name': None,
'item': {'all': True,
'ipatokenuniqueid': 'NewToken1',
'timelimit': '0',
'sizelimit': '0'}
},
{
'method': 'otptoken_mod',
'name': 'NewToken1',
'item': {'description': 'Test description',
'ipatokenowner': 'brain',
'ipatokendisabled': 'FALSE',
'ipatokennotbefore': '20200101010101Z',
'ipatokennotafter': '20900101010101Z',
'ipatokenvendor': 'Acme',
'ipatokenmodel': 'ModelT',
'ipatokenserial': 'Number1',
'all': True}
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_already_existing_all_valid_change_all_specified(self):
"""Modify an existing OTP with all valid values specified needing change"""
module_args = {
'uniqueid': 'NewToken1',
'otptype': 'hotp',
'secretkey': 'VGVzdFNlY3JldDE=',
'description': 'New Test description',
'owner': 'pinky',
'enabled': False,
'notbefore': '20200101010102',
'notafter': '20900101010102',
'vendor': 'NewAcme',
'model': 'NewModelT',
'serial': 'Number2',
'state': 'present',
'algorithm': 'sha256',
'digits': 6,
'offset': 10,
'interval': 30,
'counter': 30,
}
return_value = {'ipatokenuniqueid': 'NewToken1',
'type': 'HOTP',
'ipatokenotpkey': [{'__base64__': 'VGVzdFNlY3JldDE='}],
'description': ['Test description'],
'ipatokenowner': ['pinky'],
'ipatokendisabled': ['FALSE'],
'ipatokennotbefore': ['20200101010101Z'],
'ipatokennotafter': ['20900101010101Z'],
'ipatokenvendor': ['Acme'],
'ipatokenmodel': ['ModelT'],
'ipatokenserial': ['Number1'],
'ipatokenotpalgorithm': ['sha256'],
'ipatokenotpdigits': ['6'],
'ipatokentotpclockoffset': ['10'],
'ipatokentotptimestep': ['30'],
'ipatokenhotpcounter': ['30']}
mock_calls = (
{
'method': 'otptoken_find',
'name': None,
'item': {'all': True,
'ipatokenuniqueid': 'NewToken1',
'timelimit': '0',
'sizelimit': '0'}
},
{
'method': 'otptoken_mod',
'name': 'NewToken1',
'item': {'description': 'New Test description',
'ipatokenowner': 'pinky',
'ipatokendisabled': 'TRUE',
'ipatokennotbefore': '20200101010102Z',
'ipatokennotafter': '20900101010102Z',
'ipatokenvendor': 'NewAcme',
'ipatokenmodel': 'NewModelT',
'ipatokenserial': 'Number2',
'all': True}
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_delete_existing_token(self):
"""Delete an existing OTP"""
module_args = {
'uniqueid': 'NewToken1',
'state': 'absent'
}
return_value = {'ipatokenuniqueid': 'NewToken1',
'type': 'HOTP',
'ipatokenotpkey': [{'__base64__': 'KRSXG5CTMVRXEZLUGE======'}],
'description': ['Test description'],
'ipatokenowner': ['pinky'],
'ipatokendisabled': ['FALSE'],
'ipatokennotbefore': ['20200101010101Z'],
'ipatokennotafter': ['20900101010101Z'],
'ipatokenvendor': ['Acme'],
'ipatokenmodel': ['ModelT'],
'ipatokenserial': ['Number1'],
'ipatokenotpalgorithm': ['sha256'],
'ipatokenotpdigits': ['6'],
'ipatokentotpclockoffset': ['10'],
'ipatokentotptimestep': ['30'],
'ipatokenhotpcounter': ['30']}
mock_calls = (
{
'method': 'otptoken_find',
'name': None,
'item': {'all': True,
'ipatokenuniqueid': 'NewToken1',
'timelimit': '0',
'sizelimit': '0'}
},
{
'method': 'otptoken_del',
'name': 'NewToken1'
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_disable_existing_token(self):
"""Disable an existing OTP"""
module_args = {
'uniqueid': 'NewToken1',
'otptype': 'hotp',
'enabled': False
}
return_value = {'ipatokenuniqueid': 'NewToken1',
'type': 'HOTP',
'ipatokenotpkey': [{'__base64__': 'KRSXG5CTMVRXEZLUGE======'}],
'description': ['Test description'],
'ipatokenowner': ['pinky'],
'ipatokendisabled': ['FALSE'],
'ipatokennotbefore': ['20200101010101Z'],
'ipatokennotafter': ['20900101010101Z'],
'ipatokenvendor': ['Acme'],
'ipatokenmodel': ['ModelT'],
'ipatokenserial': ['Number1'],
'ipatokenotpalgorithm': ['sha256'],
'ipatokenotpdigits': ['6'],
'ipatokentotpclockoffset': ['10'],
'ipatokentotptimestep': ['30'],
'ipatokenhotpcounter': ['30']}
mock_calls = (
{
'method': 'otptoken_find',
'name': None,
'item': {'all': True,
'ipatokenuniqueid': 'NewToken1',
'timelimit': '0',
'sizelimit': '0'}
},
{
'method': 'otptoken_mod',
'name': 'NewToken1',
'item': {'ipatokendisabled': 'TRUE',
'all': True}
}
)
changed = True
self._test_base(module_args, return_value, mock_calls, changed)
def test_delete_not_existing_token(self):
"""Delete a OTP that does not exist"""
module_args = {
'uniqueid': 'NewToken1',
'state': 'absent'
}
return_value = {}
mock_calls = [
{
'method': 'otptoken_find',
'name': None,
'item': {'all': True,
'ipatokenuniqueid': 'NewToken1',
'timelimit': '0',
'sizelimit': '0'}
}
]
changed = False
self._test_base(module_args, return_value, mock_calls, changed)
def test_fail_post(self):
"""Fail due to an exception raised from _post_json"""
set_module_args({
'uniqueid': 'NewToken1'
})
with patch_ipa(side_effect=Exception('ERROR MESSAGE')) as (mock_login, mock_post):
with self.assertRaises(AnsibleFailJson) as exec_info:
self.module.main()
self.assertEqual(exec_info.exception.args[0]['msg'], 'ERROR MESSAGE')
if __name__ == '__main__':
unittest.main()