1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Keycloak: add authentication management (#2456) (#2864)

* Allow keycloak_group.py to take token as parameter for the authentification

Refactor get_token to pass module.params + Documentation

Fix unit test and add new one for token as param

Fix identation

Update plugins/modules/identity/keycloak/keycloak_client.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/modules/identity/keycloak/keycloak_clienttemplate.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Allow keycloak_group.py to take token as parameter for the authentification

Refactor get_token to pass module.params + Documentation

* Update plugins/module_utils/identity/keycloak/keycloak.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Check if base_url is None before to check format

Update plugins/module_utils/identity/keycloak/keycloak.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/modules/identity/keycloak/keycloak_client.py

Co-authored-by: Amin Vakil <info@aminvakil.com>

Update plugins/modules/identity/keycloak/keycloak_clienttemplate.py

Co-authored-by: Amin Vakil <info@aminvakil.com>

Switch to modern syntax for the documentation (e.g. community.general.keycloak_client)

Update keycloak_client.py

Update keycloak_clienttemplate.py

Add keycloak_authentication module to manage authentication

Minor fixex

Fix indent

* Update plugins/modules/identity/keycloak/keycloak_authentication.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/modules/identity/keycloak/keycloak_authentication.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/modules/identity/keycloak/keycloak_authentication.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/modules/identity/keycloak/keycloak_authentication.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/modules/identity/keycloak/keycloak_authentication.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Removing variable ANSIBLE_METADATA from beginning of file

Minor fix

Refactoring create_or_update_executions :add change_execution_priority function

Refactoring create_or_update_executions :add create_execution function

Refactoring create_or_update_executions: add create_subflow

Refactoring create_or_update_executions: add update_authentication_executions function

Minor fix

* Using FQCN for the examples

Minor fix

Update plugins/module_utils/identity/keycloak/keycloak.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/module_utils/identity/keycloak/keycloak.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/module_utils/identity/keycloak/keycloak.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/module_utils/identity/keycloak/keycloak.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/module_utils/identity/keycloak/keycloak.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/module_utils/identity/keycloak/keycloak.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/modules/identity/keycloak/keycloak_authentication.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Update plugins/modules/identity/keycloak/keycloak_authentication.py

Co-authored-by: Felix Fontein <felix@fontein.de>

Refactoring: rename isDictEquals into is_dict_equals

Refactoring: rename variable as authentication_flow

Refactoring: rename variable as new_name

Refactoring: rename variable as flow_list

Refactoring: rename variable as new_flow

Refactoring: changing construction of dict newAuthenticationRepresentation and renaming as new_auth_repr

Minor fix

* Refactoring: rename variables with correct Python syntax (auth_repr, exec_repr)

Move create_or_update_executions function from keycloak.py to keycloak_authentication.py

Minor fix

Remove mock_create_or_update_executions not needed anymore

Fix unit test

Update plugins/module_utils/identity/keycloak/keycloak.py

is_dict_equals function return True if value1 empty

Update plugins/module_utils/identity/keycloak/keycloak.py

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>

Rename is_dict_equal as is_struct_included and rename params as struct1 and struct2

Rename variables according to Python naming conventions

Refactoring: add find_exec_in_executions function in keycloak_authentication to remove code duplication

typo

Add blank line

Add required parameter, either creds or token

Typo

try/except only surround for loop containing struct2[key]

Add sub-options to meta_args

assigment of result['changed'] after if-elif-else block

Fix CI error: parameter-type-not-in-doc

Fix unit test: none value excluded from comparison

Minor fix

Simplify is_struct_included function

Replace 'type(..) is' by isinstance(..)

Remove redundant required=True and redundant parenthesis

Add check_mode, check if value is None (None value added by argument spec checker)

Apply suggestions from code review

Update plugins/modules/identity/keycloak/keycloak_authentication.py

* Update plugins/modules/identity/keycloak/keycloak_authentication.py

* Add index paramter to configure the priority order of the execution

* Minor fix: authenticationConfig dict instead of str

Co-authored-by: Felix Fontein <felix@fontein.de>
(cherry picked from commit 24c5d4320f)

Co-authored-by: Gaetan2907 <48204380+Gaetan2907@users.noreply.github.com>
This commit is contained in:
patchback[bot] 2021-06-24 19:14:46 +02:00 committed by GitHub
parent 82225e5850
commit 2322937a4a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 1323 additions and 2 deletions

View file

@ -33,9 +33,9 @@ import json
import traceback
from ansible.module_utils.urls import open_url
from ansible.module_utils.six.moves.urllib.parse import urlencode
from ansible.module_utils.six.moves.urllib.parse import urlencode, quote
from ansible.module_utils.six.moves.urllib.error import HTTPError
from ansible.module_utils._text import to_native
from ansible.module_utils._text import to_native, to_text
URL_REALMS = "{url}/admin/realms"
URL_REALM = "{url}/admin/realms/{realm}"
@ -51,6 +51,17 @@ URL_CLIENTTEMPLATES = "{url}/admin/realms/{realm}/client-templates"
URL_GROUPS = "{url}/admin/realms/{realm}/groups"
URL_GROUP = "{url}/admin/realms/{realm}/groups/{groupid}"
URL_AUTHENTICATION_FLOWS = "{url}/admin/realms/{realm}/authentication/flows"
URL_AUTHENTICATION_FLOW = "{url}/admin/realms/{realm}/authentication/flows/{id}"
URL_AUTHENTICATION_FLOW_COPY = "{url}/admin/realms/{realm}/authentication/flows/{copyfrom}/copy"
URL_AUTHENTICATION_FLOW_EXECUTIONS = "{url}/admin/realms/{realm}/authentication/flows/{flowalias}/executions"
URL_AUTHENTICATION_FLOW_EXECUTIONS_EXECUTION = "{url}/admin/realms/{realm}/authentication/flows/{flowalias}/executions/execution"
URL_AUTHENTICATION_FLOW_EXECUTIONS_FLOW = "{url}/admin/realms/{realm}/authentication/flows/{flowalias}/executions/flow"
URL_AUTHENTICATION_EXECUTION_CONFIG = "{url}/admin/realms/{realm}/authentication/executions/{id}/config"
URL_AUTHENTICATION_EXECUTION_RAISE_PRIORITY = "{url}/admin/realms/{realm}/authentication/executions/{id}/raise-priority"
URL_AUTHENTICATION_EXECUTION_LOWER_PRIORITY = "{url}/admin/realms/{realm}/authentication/executions/{id}/lower-priority"
URL_AUTHENTICATION_CONFIG = "{url}/admin/realms/{realm}/authentication/config/{id}"
def keycloak_argument_spec():
"""
@ -132,6 +143,59 @@ def get_token(module_params):
}
def is_struct_included(struct1, struct2, exclude=None):
"""
This function compare if the first parameter structure is included in the second.
The function use every elements of struct1 and validates they are present in the struct2 structure.
The two structure does not need to be equals for that function to return true.
Each elements are compared recursively.
:param struct1:
type:
dict for the initial call, can be dict, list, bool, int or str for recursive calls
description:
reference structure
:param struct2:
type:
dict for the initial call, can be dict, list, bool, int or str for recursive calls
description:
structure to compare with first parameter.
:param exclude:
type:
list
description:
Key to exclude from the comparison.
default: None
:return:
type:
bool
description:
Return True if all element of dict 1 are present in dict 2, return false otherwise.
"""
if isinstance(struct1, list) and isinstance(struct2, list):
for item1 in struct1:
if isinstance(item1, (list, dict)):
for item2 in struct2:
if not is_struct_included(item1, item2, exclude):
return False
else:
if item1 not in struct2:
return False
return True
elif isinstance(struct1, dict) and isinstance(struct2, dict):
try:
for key in struct1:
if not (exclude and key in exclude):
if not is_struct_included(struct1[key], struct2[key], exclude):
return False
return True
except KeyError:
return False
elif isinstance(struct1, bool) and isinstance(struct2, bool):
return struct1 == struct2
else:
return to_text(struct1, 'utf-8') == to_text(struct2, 'utf-8')
class KeycloakAPI(object):
""" Keycloak API access; Keycloak uses OAuth 2.0 to protect its API, an access token for which
is obtained through OpenID connect
@ -571,3 +635,254 @@ class KeycloakAPI(object):
except Exception as e:
self.module.fail_json(msg="Unable to delete group %s: %s" % (groupid, str(e)))
def get_authentication_flow_by_alias(self, alias, realm='master'):
"""
Get an authentication flow by it's alias
:param alias: Alias of the authentication flow to get.
:param realm: Realm.
:return: Authentication flow representation.
"""
try:
authentication_flow = {}
# Check if the authentication flow exists on the Keycloak serveraders
authentications = json.load(open_url(URL_AUTHENTICATION_FLOWS.format(url=self.baseurl, realm=realm), method='GET', headers=self.restheaders))
for authentication in authentications:
if authentication["alias"] == alias:
authentication_flow = authentication
break
return authentication_flow
except Exception as e:
self.module.fail_json(msg="Unable get authentication flow %s: %s" % (alias, str(e)))
def delete_authentication_flow_by_id(self, id, realm='master'):
"""
Delete an authentication flow from Keycloak
:param id: id of authentication flow to be deleted
:param realm: realm of client to be deleted
:return: HTTPResponse object on success
"""
flow_url = URL_AUTHENTICATION_FLOW.format(url=self.baseurl, realm=realm, id=id)
try:
return open_url(flow_url, method='DELETE', headers=self.restheaders,
validate_certs=self.validate_certs)
except Exception as e:
self.module.fail_json(msg='Could not delete authentication flow %s in realm %s: %s'
% (id, realm, str(e)))
def copy_auth_flow(self, config, realm='master'):
"""
Create a new authentication flow from a copy of another.
:param config: Representation of the authentication flow to create.
:param realm: Realm.
:return: Representation of the new authentication flow.
"""
try:
new_name = dict(
newName=config["alias"]
)
open_url(
URL_AUTHENTICATION_FLOW_COPY.format(
url=self.baseurl,
realm=realm,
copyfrom=quote(config["copyFrom"])),
method='POST',
headers=self.restheaders,
data=json.dumps(new_name))
flow_list = json.load(
open_url(
URL_AUTHENTICATION_FLOWS.format(url=self.baseurl,
realm=realm),
method='GET',
headers=self.restheaders))
for flow in flow_list:
if flow["alias"] == config["alias"]:
return flow
return None
except Exception as e:
self.module.fail_json(msg='Could not copy authentication flow %s in realm %s: %s'
% (config["alias"], realm, str(e)))
def create_empty_auth_flow(self, config, realm='master'):
"""
Create a new empty authentication flow.
:param config: Representation of the authentication flow to create.
:param realm: Realm.
:return: Representation of the new authentication flow.
"""
try:
new_flow = dict(
alias=config["alias"],
providerId=config["providerId"],
description=config["description"],
topLevel=True
)
open_url(
URL_AUTHENTICATION_FLOWS.format(
url=self.baseurl,
realm=realm),
method='POST',
headers=self.restheaders,
data=json.dumps(new_flow))
flow_list = json.load(
open_url(
URL_AUTHENTICATION_FLOWS.format(
url=self.baseurl,
realm=realm),
method='GET',
headers=self.restheaders))
for flow in flow_list:
if flow["alias"] == config["alias"]:
return flow
return None
except Exception as e:
self.module.fail_json(msg='Could not create empty authentication flow %s in realm %s: %s'
% (config["alias"], realm, str(e)))
def update_authentication_executions(self, flowAlias, updatedExec, realm='master'):
""" Update authentication executions
:param flowAlias: name of the parent flow
:param updatedExec: JSON containing updated execution
:return: HTTPResponse object on success
"""
try:
open_url(
URL_AUTHENTICATION_FLOW_EXECUTIONS.format(
url=self.baseurl,
realm=realm,
flowalias=quote(flowAlias)),
method='PUT',
headers=self.restheaders,
data=json.dumps(updatedExec))
except Exception as e:
self.module.fail_json(msg="Unable to update executions %s: %s" % (updatedExec, str(e)))
def add_authenticationConfig_to_execution(self, executionId, authenticationConfig, realm='master'):
""" Add autenticatorConfig to the execution
:param executionId: id of execution
:param authenticationConfig: config to add to the execution
:return: HTTPResponse object on success
"""
try:
open_url(
URL_AUTHENTICATION_EXECUTION_CONFIG.format(
url=self.baseurl,
realm=realm,
id=executionId),
method='POST',
headers=self.restheaders,
data=json.dumps(authenticationConfig))
except Exception as e:
self.module.fail_json(msg="Unable to add authenticationConfig %s: %s" % (executionId, str(e)))
def create_subflow(self, subflowName, flowAlias, realm='master'):
""" Create new sublow on the flow
:param subflowName: name of the subflow to create
:param flowAlias: name of the parent flow
:return: HTTPResponse object on success
"""
try:
newSubFlow = {}
newSubFlow["alias"] = subflowName
newSubFlow["provider"] = "registration-page-form"
newSubFlow["type"] = "basic-flow"
open_url(
URL_AUTHENTICATION_FLOW_EXECUTIONS_FLOW.format(
url=self.baseurl,
realm=realm,
flowalias=quote(flowAlias)),
method='POST',
headers=self.restheaders,
data=json.dumps(newSubFlow))
except Exception as e:
self.module.fail_json(msg="Unable to create new subflow %s: %s" % (subflowName, str(e)))
def create_execution(self, execution, flowAlias, realm='master'):
""" Create new execution on the flow
:param execution: name of execution to create
:param flowAlias: name of the parent flow
:return: HTTPResponse object on success
"""
try:
newExec = {}
newExec["provider"] = execution["providerId"]
newExec["requirement"] = execution["requirement"]
open_url(
URL_AUTHENTICATION_FLOW_EXECUTIONS_EXECUTION.format(
url=self.baseurl,
realm=realm,
flowalias=quote(flowAlias)),
method='POST',
headers=self.restheaders,
data=json.dumps(newExec))
except Exception as e:
self.module.fail_json(msg="Unable to create new execution %s: %s" % (execution["provider"], str(e)))
def change_execution_priority(self, executionId, diff, realm='master'):
""" Raise or lower execution priority of diff time
:param executionId: id of execution to lower priority
:param realm: realm the client is in
:param diff: Integer number, raise of diff time if positive lower of diff time if negative
:return: HTTPResponse object on success
"""
try:
if diff > 0:
for i in range(diff):
open_url(
URL_AUTHENTICATION_EXECUTION_RAISE_PRIORITY.format(
url=self.baseurl,
realm=realm,
id=executionId),
method='POST',
headers=self.restheaders)
elif diff < 0:
for i in range(-diff):
open_url(
URL_AUTHENTICATION_EXECUTION_LOWER_PRIORITY.format(
url=self.baseurl,
realm=realm,
id=executionId),
method='POST',
headers=self.restheaders)
except Exception as e:
self.module.fail_json(msg="Unable to change execution priority %s: %s" % (executionId, str(e)))
def get_executions_representation(self, config, realm='master'):
"""
Get a representation of the executions for an authentication flow.
:param config: Representation of the authentication flow
:param realm: Realm
:return: Representation of the executions
"""
try:
# Get executions created
executions = json.load(
open_url(
URL_AUTHENTICATION_FLOW_EXECUTIONS.format(
url=self.baseurl,
realm=realm,
flowalias=quote(config["alias"])),
method='GET',
headers=self.restheaders))
for execution in executions:
if "authenticationConfig" in execution:
execConfigId = execution["authenticationConfig"]
execConfig = json.load(
open_url(
URL_AUTHENTICATION_CONFIG.format(
url=self.baseurl,
realm=realm,
id=execConfigId),
method='GET',
headers=self.restheaders))
execution["authenticationConfig"] = execConfig
return executions
except Exception as e:
self.module.fail_json(msg='Could not get executions for authentication flow %s in realm %s: %s'
% (config["alias"], realm, str(e)))

View file

@ -0,0 +1,383 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2019, INSPQ <philippe.gauthier@inspq.qc.ca>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = '''
---
module: keycloak_authentication
short_description: Configure authentication in Keycloak
description:
- This module actually can only make a copy of an existing authentication flow, add an execution to it and configure it.
- It can also delete the flow.
version_added: "3.3.0"
options:
realm:
description:
- The name of the realm in which is the authentication.
required: true
type: str
alias:
description:
- Alias for the authentication flow.
required: true
type: str
description:
description:
- Description of the flow.
type: str
providerId:
description:
- C(providerId) for the new flow when not copied from an existing flow.
type: str
copyFrom:
description:
- C(flowAlias) of the authentication flow to use for the copy.
type: str
authenticationExecutions:
description:
- Configuration structure for the executions.
type: list
elements: dict
suboptions:
providerId:
description:
- C(providerID) for the new flow when not copied from an existing flow.
type: str
displayName:
description:
- Name of the execution or subflow to create or update.
type: str
requirement:
description:
- Control status of the subflow or execution.
choices: [ "REQUIRED", "ALTERNATIVE", "DISABLED", "CONDITIONAL" ]
type: str
flowAlias:
description:
- Alias of parent flow.
type: str
authenticationConfig:
description:
- Describe the config of the authentication.
type: dict
index:
description:
- Priority order of the execution.
type: int
state:
description:
- Control if the authentication flow must exists or not.
choices: [ "present", "absent" ]
default: present
type: str
force:
type: bool
default: false
description:
- If C(true), allows to remove the authentication flow and recreate it.
extends_documentation_fragment:
- community.general.keycloak
author:
- Philippe Gauthier (@elfelip)
- Gaëtan Daubresse (@Gaetan2907)
'''
EXAMPLES = '''
- name: Create an authentication flow from first broker login and add an execution to it.
community.general.keycloak_authentication:
auth_keycloak_url: http://localhost:8080/auth
auth_realm: master
auth_username: admin
auth_password: password
realm: master
alias: "Copy of first broker login"
copyFrom: "first broker login"
authenticationExecutions:
- providerId: "test-execution1"
requirement: "REQUIRED"
authenticationConfig:
alias: "test.execution1.property"
config:
test1.property: "value"
- providerId: "test-execution2"
requirement: "REQUIRED"
authenticationConfig:
alias: "test.execution2.property"
config:
test2.property: "value"
state: present
- name: Re-create the authentication flow
community.general.keycloak_authentication:
auth_keycloak_url: http://localhost:8080/auth
auth_realm: master
auth_username: admin
auth_password: password
realm: master
alias: "Copy of first broker login"
copyFrom: "first broker login"
authenticationExecutions:
- providerId: "test-provisioning"
requirement: "REQUIRED"
authenticationConfig:
alias: "test.provisioning.property"
config:
test.provisioning.property: "value"
state: present
force: true
- name: Create an authentication flow with subflow containing an execution.
community.general.keycloak_authentication:
auth_keycloak_url: http://localhost:8080/auth
auth_realm: master
auth_username: admin
auth_password: password
realm: master
alias: "Copy of first broker login"
copyFrom: "first broker login"
authenticationExecutions:
- providerId: "test-execution1"
requirement: "REQUIRED"
- displayName: "New Subflow"
requirement: "REQUIRED"
- providerId: "auth-cookie"
requirement: "REQUIRED"
flowAlias: "New Sublow"
state: present
- name: Remove authentication.
community.general.keycloak_authentication:
auth_keycloak_url: http://localhost:8080/auth
auth_realm: master
auth_username: admin
auth_password: password
realm: master
alias: "Copy of first broker login"
state: absent
'''
RETURN = '''
flow:
description: JSON representation for the authentication.
returned: on success
type: dict
'''
from ansible_collections.community.general.plugins.module_utils.identity.keycloak.keycloak \
import KeycloakAPI, camel, keycloak_argument_spec, get_token, KeycloakError, is_struct_included
from ansible.module_utils.basic import AnsibleModule
def find_exec_in_executions(searched_exec, executions):
"""
Search if exec is contained in the executions.
:param searched_exec: Execution to search for.
:param executions: List of executions.
:return: Index of the execution, -1 if not found..
"""
for i, existing_exec in enumerate(executions, start=0):
if ("providerId" in existing_exec and "providerId" in searched_exec and
existing_exec["providerId"] == searched_exec["providerId"] or
"displayName" in existing_exec and "displayName" in searched_exec and
existing_exec["displayName"] == searched_exec["displayName"]):
return i
return -1
def create_or_update_executions(kc, config, realm='master'):
"""
Create or update executions for an authentication flow.
:param kc: Keycloak API access.
:param config: Representation of the authentication flow including it's executions.
:param realm: Realm
:return: True if executions have been modified. False otherwise.
"""
try:
changed = False
if "authenticationExecutions" in config:
for new_exec_index, new_exec in enumerate(config["authenticationExecutions"], start=0):
if new_exec["index"] is not None:
new_exec_index = new_exec["index"]
# Get existing executions on the Keycloak server for this alias
existing_executions = kc.get_executions_representation(config, realm=realm)
exec_found = False
# Get flowalias parent if given
if new_exec["flowAlias"] is not None:
flow_alias_parent = new_exec["flowAlias"]
else:
flow_alias_parent = config["alias"]
# Check if same providerId or displayName name between existing and new execution
exec_index = find_exec_in_executions(new_exec, existing_executions)
if exec_index != -1:
# Remove key that doesn't need to be compared with existing_exec
exclude_key = ["flowAlias"]
for index_key, key in enumerate(new_exec, start=0):
if new_exec[key] is None:
exclude_key.append(key)
# Compare the executions to see if it need changes
if not is_struct_included(new_exec, existing_executions[exec_index], exclude_key) or exec_index != new_exec_index:
changed = True
elif new_exec["providerId"] is not None:
kc.create_execution(new_exec, flowAlias=flow_alias_parent, realm=realm)
changed = True
elif new_exec["displayName"] is not None:
kc.create_subflow(new_exec["displayName"], flow_alias_parent, realm=realm)
changed = True
if changed:
# Get existing executions on the Keycloak server for this alias
existing_executions = kc.get_executions_representation(config, realm=realm)
exec_index = find_exec_in_executions(new_exec, existing_executions)
if exec_index != -1:
# Update the existing execution
updated_exec = {
"id": existing_executions[exec_index]["id"]
}
# add the execution configuration
if new_exec["authenticationConfig"] is not None:
kc.add_authenticationConfig_to_execution(updated_exec["id"], new_exec["authenticationConfig"], realm=realm)
for key in new_exec:
# remove unwanted key for the next API call
if key != "flowAlias" and key != "authenticationConfig":
updated_exec[key] = new_exec[key]
if new_exec["requirement"] is not None:
kc.update_authentication_executions(flow_alias_parent, updated_exec, realm=realm)
diff = exec_index - new_exec_index
kc.change_execution_priority(updated_exec["id"], diff, realm=realm)
return changed
except Exception as e:
kc.module.fail_json(msg='Could not create or update executions for authentication flow %s in realm %s: %s'
% (config["alias"], realm, str(e)))
def main():
"""
Module execution
:return:
"""
argument_spec = keycloak_argument_spec()
meta_args = dict(
realm=dict(type='str', required=True),
alias=dict(type='str', required=True),
providerId=dict(type='str'),
description=dict(type='str'),
copyFrom=dict(type='str'),
authenticationExecutions=dict(type='list', elements='dict',
options=dict(
providerId=dict(type='str'),
displayName=dict(type='str'),
requirement=dict(choices=["REQUIRED", "ALTERNATIVE", "DISABLED", "CONDITIONAL"], type='str'),
flowAlias=dict(type='str'),
authenticationConfig=dict(type='dict'),
index=dict(type='int'),
)),
state=dict(choices=["absent", "present"], default='present'),
force=dict(type='bool', default=False),
)
argument_spec.update(meta_args)
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True,
required_one_of=([['token', 'auth_realm', 'auth_username', 'auth_password']]),
required_together=([['auth_realm', 'auth_username', 'auth_password']])
)
result = dict(changed=False, msg='', flow={})
# Obtain access token, initialize API
try:
connection_header = get_token(module.params)
except KeycloakError as e:
module.fail_json(msg=str(e))
kc = KeycloakAPI(module, connection_header)
realm = module.params.get('realm')
state = module.params.get('state')
force = module.params.get('force')
new_auth_repr = {
"alias": module.params.get("alias"),
"copyFrom": module.params.get("copyFrom"),
"providerId": module.params.get("providerId"),
"authenticationExecutions": module.params.get("authenticationExecutions"),
"description": module.params.get("description"),
"builtIn": module.params.get("builtIn"),
"subflow": module.params.get("subflow"),
}
auth_repr = kc.get_authentication_flow_by_alias(alias=new_auth_repr["alias"], realm=realm)
if auth_repr == {}: # Authentication flow does not exist
if state == 'present': # If desired state is present
result['changed'] = True
if module._diff:
result['diff'] = dict(before='', after=new_auth_repr)
if module.check_mode:
module.exit_json(**result)
# If copyFrom is defined, create authentication flow from a copy
if "copyFrom" in new_auth_repr and new_auth_repr["copyFrom"] is not None:
auth_repr = kc.copy_auth_flow(config=new_auth_repr, realm=realm)
else: # Create an empty authentication flow
auth_repr = kc.create_empty_auth_flow(config=new_auth_repr, realm=realm)
# If the authentication still not exist on the server, raise an exception.
if auth_repr is None:
result['msg'] = "Authentication just created not found: " + str(new_auth_repr)
module.fail_json(**result)
# Configure the executions for the flow
create_or_update_executions(kc=kc, config=new_auth_repr, realm=realm)
# Get executions created
exec_repr = kc.get_executions_representation(config=new_auth_repr, realm=realm)
if exec_repr is not None:
auth_repr["authenticationExecutions"] = exec_repr
result['flow'] = auth_repr
elif state == 'absent': # If desired state is absent.
if module._diff:
result['diff'] = dict(before='', after='')
result['msg'] = new_auth_repr["alias"] + ' absent'
else: # The authentication flow already exist
if state == 'present': # if desired state is present
if force: # If force option is true
# Delete the actual authentication flow
result['changed'] = True
if module._diff:
result['diff'] = dict(before=auth_repr, after=new_auth_repr)
if module.check_mode:
module.exit_json(**result)
kc.delete_authentication_flow_by_id(id=auth_repr["id"], realm=realm)
# If copyFrom is defined, create authentication flow from a copy
if "copyFrom" in new_auth_repr and new_auth_repr["copyFrom"] is not None:
auth_repr = kc.copy_auth_flow(config=new_auth_repr, realm=realm)
else: # Create an empty authentication flow
auth_repr = kc.create_empty_auth_flow(config=new_auth_repr, realm=realm)
# If the authentication still not exist on the server, raise an exception.
if auth_repr is None:
result['msg'] = "Authentication just created not found: " + str(new_auth_repr)
module.fail_json(**result)
# Configure the executions for the flow
if module.check_mode:
module.exit_json(**result)
if create_or_update_executions(kc=kc, config=new_auth_repr, realm=realm):
result['changed'] = True
# Get executions created
exec_repr = kc.get_executions_representation(config=new_auth_repr, realm=realm)
if exec_repr is not None:
auth_repr["authenticationExecutions"] = exec_repr
result['flow'] = auth_repr
elif state == 'absent': # If desired state is absent
result['changed'] = True
# Delete the authentication flow alias.
if module._diff:
result['diff'] = dict(before=auth_repr, after='')
if module.check_mode:
module.exit_json(**result)
kc.delete_authentication_flow_by_id(id=auth_repr["id"], realm=realm)
result['msg'] = 'Authentication flow: {alias} id: {id} is deleted'.format(alias=new_auth_repr['alias'],
id=auth_repr["id"])
module.exit_json(**result)
if __name__ == '__main__':
main()

View file

@ -0,0 +1 @@
./identity/keycloak/keycloak_authentication.py

View file

@ -0,0 +1,622 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from contextlib import contextmanager
from ansible_collections.community.general.tests.unit.compat import unittest
from ansible_collections.community.general.tests.unit.compat.mock import call, patch
from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
from ansible_collections.community.general.plugins.modules.identity.keycloak import keycloak_authentication
from itertools import count
from ansible.module_utils.six import StringIO
@contextmanager
def patch_keycloak_api(get_authentication_flow_by_alias=None, copy_auth_flow=None, create_empty_auth_flow=None,
get_executions_representation=None, delete_authentication_flow_by_id=None):
"""Mock context manager for patching the methods in PwPolicyIPAClient that contact the IPA server
Patches the `login` and `_post_json` methods
Keyword arguments are passed to the mock object that patches `_post_json`
No arguments are passed to the mock object that patches `login` because no tests require it
Example::
with patch_ipa(return_value={}) as (mock_login, mock_post):
...
"""
obj = keycloak_authentication.KeycloakAPI
with patch.object(obj, 'get_authentication_flow_by_alias', side_effect=get_authentication_flow_by_alias) \
as mock_get_authentication_flow_by_alias:
with patch.object(obj, 'copy_auth_flow', side_effect=copy_auth_flow) \
as mock_copy_auth_flow:
with patch.object(obj, 'create_empty_auth_flow', side_effect=create_empty_auth_flow) \
as mock_create_empty_auth_flow:
with patch.object(obj, 'get_executions_representation', return_value=get_executions_representation) \
as mock_get_executions_representation:
with patch.object(obj, 'delete_authentication_flow_by_id', side_effect=delete_authentication_flow_by_id) \
as mock_delete_authentication_flow_by_id:
yield mock_get_authentication_flow_by_alias, mock_copy_auth_flow, mock_create_empty_auth_flow, \
mock_get_executions_representation, mock_delete_authentication_flow_by_id
def get_response(object_with_future_response, method, get_id_call_count):
if callable(object_with_future_response):
return object_with_future_response()
if isinstance(object_with_future_response, dict):
return get_response(
object_with_future_response[method], method, get_id_call_count)
if isinstance(object_with_future_response, list):
call_number = next(get_id_call_count)
return get_response(
object_with_future_response[call_number], method, get_id_call_count)
return object_with_future_response
def build_mocked_request(get_id_user_count, response_dict):
def _mocked_requests(*args, **kwargs):
url = args[0]
method = kwargs['method']
future_response = response_dict.get(url, None)
return get_response(future_response, method, get_id_user_count)
return _mocked_requests
def create_wrapper(text_as_string):
"""Allow to mock many times a call to one address.
Without this function, the StringIO is empty for the second call.
"""
def _create_wrapper():
return StringIO(text_as_string)
return _create_wrapper
def mock_good_connection():
token_response = {
'http://keycloak.url/auth/realms/master/protocol/openid-connect/token': create_wrapper('{"access_token": "alongtoken"}'), }
return patch(
'ansible_collections.community.general.plugins.module_utils.identity.keycloak.keycloak.open_url',
side_effect=build_mocked_request(count(), token_response),
autospec=True
)
class TestKeycloakAuthentication(ModuleTestCase):
def setUp(self):
super(TestKeycloakAuthentication, self).setUp()
self.module = keycloak_authentication
def test_create_auth_flow_from_copy(self):
"""Add a new authentication flow from copy of an other flow"""
module_args = {
'auth_keycloak_url': 'http://keycloak.url/auth',
'auth_username': 'admin',
'auth_password': 'admin',
'auth_realm': 'master',
'realm': 'realm-name',
'alias': 'Test create authentication flow copy',
'copyFrom': 'first broker login',
'authenticationExecutions': [
{
'providerId': 'identity-provider-redirector',
'requirement': 'ALTERNATIVE',
},
],
'state': 'present',
}
return_value_auth_flow_before = [{}]
return_value_copied = [{
'id': '2ac059fc-c548-414f-9c9e-84d42bd4944e',
'alias': 'first broker login',
'description': 'browser based authentication',
'providerId': 'basic-flow',
'topLevel': True,
'builtIn': False,
'authenticationExecutions': [
{
'authenticator': 'auth-cookie',
'requirement': 'ALTERNATIVE',
'priority': 10,
'userSetupAllowed': False,
'autheticatorFlow': False
},
],
}]
return_value_executions_after = [
{
'id': 'b678e30c-8469-40a7-8c21-8d0cda76a591',
'requirement': 'ALTERNATIVE',
'displayName': 'Identity Provider Redirector',
'requirementChoices': ['REQUIRED', 'DISABLED'],
'configurable': True,
'providerId': 'identity-provider-redirector',
'level': 0,
'index': 0
},
{
'id': 'fdc208e9-c292-48b7-b7d1-1d98315ee893',
'requirement': 'ALTERNATIVE',
'displayName': 'Cookie',
'requirementChoices': [
'REQUIRED',
'ALTERNATIVE',
'DISABLED'
],
'configurable': False,
'providerId': 'auth-cookie',
'level': 0,
'index': 1
},
]
changed = True
set_module_args(module_args)
# Run the module
with mock_good_connection():
with patch_keycloak_api(get_authentication_flow_by_alias=return_value_auth_flow_before, copy_auth_flow=return_value_copied,
get_executions_representation=return_value_executions_after) \
as (mock_get_authentication_flow_by_alias, mock_copy_auth_flow, mock_create_empty_auth_flow,
mock_get_executions_representation, mock_delete_authentication_flow_by_id):
with self.assertRaises(AnsibleExitJson) as exec_info:
self.module.main()
# Verify number of call on each mock
self.assertEqual(len(mock_get_authentication_flow_by_alias.mock_calls), 1)
self.assertEqual(len(mock_copy_auth_flow.mock_calls), 1)
self.assertEqual(len(mock_create_empty_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_get_executions_representation.mock_calls), 2)
self.assertEqual(len(mock_delete_authentication_flow_by_id.mock_calls), 0)
# Verify that the module's changed status matches what is expected
self.assertIs(exec_info.exception.args[0]['changed'], changed)
def test_create_auth_flow_from_copy_idempotency(self):
"""Add an already existing authentication flow from copy of an other flow to test idempotency"""
module_args = {
'auth_keycloak_url': 'http://keycloak.url/auth',
'auth_username': 'admin',
'auth_password': 'admin',
'auth_realm': 'master',
'realm': 'realm-name',
'alias': 'Test create authentication flow copy',
'copyFrom': 'first broker login',
'authenticationExecutions': [
{
'providerId': 'identity-provider-redirector',
'requirement': 'ALTERNATIVE',
},
],
'state': 'present',
}
return_value_auth_flow_before = [{
'id': '71275d5e-e11f-4be4-b119-0abfa87987a4',
'alias': 'Test create authentication flow copy',
'description': '',
'providerId': 'basic-flow',
'topLevel': True,
'builtIn': False,
'authenticationExecutions': [
{
'authenticator': 'identity-provider-redirector',
'requirement': 'ALTERNATIVE',
'priority': 0,
'userSetupAllowed': False,
'autheticatorFlow': False
},
{
'authenticator': 'auth-cookie',
'requirement': 'ALTERNATIVE',
'priority': 0,
'userSetupAllowed': False,
'autheticatorFlow': False
},
],
}]
return_value_executions_after = [
{
'id': 'b678e30c-8469-40a7-8c21-8d0cda76a591',
'requirement': 'ALTERNATIVE',
'displayName': 'Identity Provider Redirector',
'requirementChoices': ['REQUIRED', 'DISABLED'],
'configurable': True,
'providerId': 'identity-provider-redirector',
'level': 0,
'index': 0
},
{
'id': 'fdc208e9-c292-48b7-b7d1-1d98315ee893',
'requirement': 'ALTERNATIVE',
'displayName': 'Cookie',
'requirementChoices': [
'REQUIRED',
'ALTERNATIVE',
'DISABLED'
],
'configurable': False,
'providerId': 'auth-cookie',
'level': 0,
'index': 1
},
]
changed = False
set_module_args(module_args)
# Run the module
with mock_good_connection():
with patch_keycloak_api(get_authentication_flow_by_alias=return_value_auth_flow_before,
get_executions_representation=return_value_executions_after) \
as (mock_get_authentication_flow_by_alias, mock_copy_auth_flow, mock_create_empty_auth_flow,
mock_get_executions_representation, mock_delete_authentication_flow_by_id):
with self.assertRaises(AnsibleExitJson) as exec_info:
self.module.main()
# Verify number of call on each mock
self.assertEqual(len(mock_get_authentication_flow_by_alias.mock_calls), 1)
self.assertEqual(len(mock_copy_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_create_empty_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_get_executions_representation.mock_calls), 2)
self.assertEqual(len(mock_delete_authentication_flow_by_id.mock_calls), 0)
# Verify that the module's changed status matches what is expected
self.assertIs(exec_info.exception.args[0]['changed'], changed)
def test_create_auth_flow_without_copy(self):
"""Add authentication without copy"""
module_args = {
'auth_keycloak_url': 'http://keycloak.url/auth',
'auth_username': 'admin',
'auth_password': 'admin',
'auth_realm': 'master',
'realm': 'realm-name',
'alias': 'Test create authentication flow copy',
'authenticationExecutions': [
{
'providerId': 'identity-provider-redirector',
'requirement': 'ALTERNATIVE',
'authenticationConfig': {
'alias': 'name',
'config': {
'defaultProvider': 'value'
},
},
},
],
'state': 'present',
}
return_value_auth_flow_before = [{}]
return_value_created_empty_flow = [
{
"alias": "Test of the keycloak_auth module",
"authenticationExecutions": [],
"builtIn": False,
"description": "",
"id": "513f5baa-cc42-47bf-b4b6-1d23ccc0a67f",
"providerId": "basic-flow",
"topLevel": True
},
]
return_value_executions_after = [
{
'id': 'b678e30c-8469-40a7-8c21-8d0cda76a591',
'requirement': 'ALTERNATIVE',
'displayName': 'Identity Provider Redirector',
'requirementChoices': ['REQUIRED', 'DISABLED'],
'configurable': True,
'providerId': 'identity-provider-redirector',
'level': 0,
'index': 0
},
]
changed = True
set_module_args(module_args)
# Run the module
with mock_good_connection():
with patch_keycloak_api(get_authentication_flow_by_alias=return_value_auth_flow_before,
get_executions_representation=return_value_executions_after, create_empty_auth_flow=return_value_created_empty_flow) \
as (mock_get_authentication_flow_by_alias, mock_copy_auth_flow, mock_create_empty_auth_flow,
mock_get_executions_representation, mock_delete_authentication_flow_by_id):
with self.assertRaises(AnsibleExitJson) as exec_info:
self.module.main()
# Verify number of call on each mock
self.assertEqual(len(mock_get_authentication_flow_by_alias.mock_calls), 1)
self.assertEqual(len(mock_copy_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_create_empty_auth_flow.mock_calls), 1)
self.assertEqual(len(mock_get_executions_representation.mock_calls), 3)
self.assertEqual(len(mock_delete_authentication_flow_by_id.mock_calls), 0)
# Verify that the module's changed status matches what is expected
self.assertIs(exec_info.exception.args[0]['changed'], changed)
def test_update_auth_flow_adding_exec(self):
"""Update authentication flow by adding execution"""
module_args = {
'auth_keycloak_url': 'http://keycloak.url/auth',
'auth_username': 'admin',
'auth_password': 'admin',
'auth_realm': 'master',
'realm': 'realm-name',
'alias': 'Test create authentication flow copy',
'authenticationExecutions': [
{
'providerId': 'identity-provider-redirector',
'requirement': 'ALTERNATIVE',
'authenticationConfig': {
'alias': 'name',
'config': {
'defaultProvider': 'value'
},
},
},
],
'state': 'present',
}
return_value_auth_flow_before = [{
'id': '71275d5e-e11f-4be4-b119-0abfa87987a4',
'alias': 'Test create authentication flow copy',
'description': '',
'providerId': 'basic-flow',
'topLevel': True,
'builtIn': False,
'authenticationExecutions': [
{
'authenticator': 'auth-cookie',
'requirement': 'ALTERNATIVE',
'priority': 0,
'userSetupAllowed': False,
'autheticatorFlow': False
},
],
}]
return_value_executions_after = [
{
'id': 'b678e30c-8469-40a7-8c21-8d0cda76a591',
'requirement': 'DISABLED',
'displayName': 'Identity Provider Redirector',
'requirementChoices': ['REQUIRED', 'DISABLED'],
'configurable': True,
'providerId': 'identity-provider-redirector',
'level': 0,
'index': 0
},
{
'id': 'fdc208e9-c292-48b7-b7d1-1d98315ee893',
'requirement': 'ALTERNATIVE',
'displayName': 'Cookie',
'requirementChoices': [
'REQUIRED',
'ALTERNATIVE',
'DISABLED'
],
'configurable': False,
'providerId': 'auth-cookie',
'level': 0,
'index': 1
},
]
changed = True
set_module_args(module_args)
# Run the module
with mock_good_connection():
with patch_keycloak_api(get_authentication_flow_by_alias=return_value_auth_flow_before,
get_executions_representation=return_value_executions_after) \
as (mock_get_authentication_flow_by_alias, mock_copy_auth_flow, mock_create_empty_auth_flow,
mock_get_executions_representation, mock_delete_authentication_flow_by_id):
with self.assertRaises(AnsibleExitJson) as exec_info:
self.module.main()
# Verify number of call on each mock
self.assertEqual(len(mock_get_authentication_flow_by_alias.mock_calls), 1)
self.assertEqual(len(mock_copy_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_create_empty_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_get_executions_representation.mock_calls), 3)
self.assertEqual(len(mock_delete_authentication_flow_by_id.mock_calls), 0)
# Verify that the module's changed status matches what is expected
self.assertIs(exec_info.exception.args[0]['changed'], changed)
def test_delete_auth_flow(self):
"""Delete authentication flow"""
module_args = {
'auth_keycloak_url': 'http://keycloak.url/auth',
'auth_username': 'admin',
'auth_password': 'admin',
'auth_realm': 'master',
'realm': 'realm-name',
'alias': 'Test create authentication flow copy',
'state': 'absent',
}
return_value_auth_flow_before = [{
'id': '71275d5e-e11f-4be4-b119-0abfa87987a4',
'alias': 'Test create authentication flow copy',
'description': '',
'providerId': 'basic-flow',
'topLevel': True,
'builtIn': False,
'authenticationExecutions': [
{
'authenticator': 'auth-cookie',
'requirement': 'ALTERNATIVE',
'priority': 0,
'userSetupAllowed': False,
'autheticatorFlow': False
},
],
}]
changed = True
set_module_args(module_args)
# Run the module
with mock_good_connection():
with patch_keycloak_api(get_authentication_flow_by_alias=return_value_auth_flow_before) \
as (mock_get_authentication_flow_by_alias, mock_copy_auth_flow, mock_create_empty_auth_flow,
mock_get_executions_representation, mock_delete_authentication_flow_by_id):
with self.assertRaises(AnsibleExitJson) as exec_info:
self.module.main()
# Verify number of call on each mock
self.assertEqual(len(mock_get_authentication_flow_by_alias.mock_calls), 1)
self.assertEqual(len(mock_copy_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_create_empty_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_get_executions_representation.mock_calls), 0)
self.assertEqual(len(mock_delete_authentication_flow_by_id.mock_calls), 1)
# Verify that the module's changed status matches what is expected
self.assertIs(exec_info.exception.args[0]['changed'], changed)
def test_delete_auth_flow_idempotency(self):
"""Delete second time authentication flow to test idempotency"""
module_args = {
'auth_keycloak_url': 'http://keycloak.url/auth',
'auth_username': 'admin',
'auth_password': 'admin',
'auth_realm': 'master',
'realm': 'realm-name',
'alias': 'Test create authentication flow copy',
'state': 'absent',
}
return_value_auth_flow_before = [{}]
changed = False
set_module_args(module_args)
# Run the module
with mock_good_connection():
with patch_keycloak_api(get_authentication_flow_by_alias=return_value_auth_flow_before) \
as (mock_get_authentication_flow_by_alias, mock_copy_auth_flow, mock_create_empty_auth_flow,
mock_get_executions_representation, mock_delete_authentication_flow_by_id):
with self.assertRaises(AnsibleExitJson) as exec_info:
self.module.main()
# Verify number of call on each mock
self.assertEqual(len(mock_get_authentication_flow_by_alias.mock_calls), 1)
self.assertEqual(len(mock_copy_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_create_empty_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_get_executions_representation.mock_calls), 0)
self.assertEqual(len(mock_delete_authentication_flow_by_id.mock_calls), 0)
# Verify that the module's changed status matches what is expected
self.assertIs(exec_info.exception.args[0]['changed'], changed)
def test_force_update_auth_flow(self):
"""Delete authentication flow and create new one"""
module_args = {
'auth_keycloak_url': 'http://keycloak.url/auth',
'auth_username': 'admin',
'auth_password': 'admin',
'auth_realm': 'master',
'realm': 'realm-name',
'alias': 'Test create authentication flow copy',
'authenticationExecutions': [
{
'providerId': 'identity-provider-redirector',
'requirement': 'ALTERNATIVE',
'authenticationConfig': {
'alias': 'name',
'config': {
'defaultProvider': 'value'
},
},
},
],
'state': 'present',
'force': 'yes',
}
return_value_auth_flow_before = [{
'id': '71275d5e-e11f-4be4-b119-0abfa87987a4',
'alias': 'Test create authentication flow copy',
'description': '',
'providerId': 'basic-flow',
'topLevel': True,
'builtIn': False,
'authenticationExecutions': [
{
'authenticator': 'auth-cookie',
'requirement': 'ALTERNATIVE',
'priority': 0,
'userSetupAllowed': False,
'autheticatorFlow': False
},
],
}]
return_value_created_empty_flow = [
{
"alias": "Test of the keycloak_auth module",
"authenticationExecutions": [],
"builtIn": False,
"description": "",
"id": "513f5baa-cc42-47bf-b4b6-1d23ccc0a67f",
"providerId": "basic-flow",
"topLevel": True
},
]
return_value_executions_after = [
{
'id': 'b678e30c-8469-40a7-8c21-8d0cda76a591',
'requirement': 'DISABLED',
'displayName': 'Identity Provider Redirector',
'requirementChoices': ['REQUIRED', 'DISABLED'],
'configurable': True,
'providerId': 'identity-provider-redirector',
'level': 0,
'index': 0
},
]
changed = True
set_module_args(module_args)
# Run the module
with mock_good_connection():
with patch_keycloak_api(get_authentication_flow_by_alias=return_value_auth_flow_before,
get_executions_representation=return_value_executions_after, create_empty_auth_flow=return_value_created_empty_flow) \
as (mock_get_authentication_flow_by_alias, mock_copy_auth_flow, mock_create_empty_auth_flow,
mock_get_executions_representation, mock_delete_authentication_flow_by_id):
with self.assertRaises(AnsibleExitJson) as exec_info:
self.module.main()
# Verify number of call on each mock
self.assertEqual(len(mock_get_authentication_flow_by_alias.mock_calls), 1)
self.assertEqual(len(mock_copy_auth_flow.mock_calls), 0)
self.assertEqual(len(mock_create_empty_auth_flow.mock_calls), 1)
self.assertEqual(len(mock_get_executions_representation.mock_calls), 3)
self.assertEqual(len(mock_delete_authentication_flow_by_id.mock_calls), 1)
# Verify that the module's changed status matches what is expected
self.assertIs(exec_info.exception.args[0]['changed'], changed)
if __name__ == '__main__':
unittest.main()