From 9bfd61e1179f48f012fea31c09c4406f2330ac99 Mon Sep 17 00:00:00 2001 From: "patchback[bot]" <45432694+patchback[bot]@users.noreply.github.com> Date: Sun, 21 Mar 2021 11:46:33 +0100 Subject: [PATCH] New module: Add Pritunl VPN user module (net_tools/pritunl/) (#803) (#2071) (cherry picked from commit 68fc48cd1f77945a4cdbad5f1984d062c8ee998f) Co-authored-by: Florian Dambrine --- plugins/doc_fragments/pritunl.py | 43 ++ .../net_tools/pritunl/__init__.py | 0 plugins/module_utils/net_tools/pritunl/api.py | 300 ++++++++++ .../modules/net_tools/pritunl/pritunl_user.py | 343 +++++++++++ .../net_tools/pritunl/pritunl_user_info.py | 171 ++++++ plugins/modules/pritunl_user.py | 1 + plugins/modules/pritunl_user_info.py | 1 + .../net_tools/pritunl/__init__.py | 0 .../net_tools/pritunl/test_api.py | 541 ++++++++++++++++++ .../net_tools/pritunl/test_pritunl_user.py | 208 +++++++ .../pritunl/test_pritunl_user_info.py | 160 ++++++ 11 files changed, 1768 insertions(+) create mode 100644 plugins/doc_fragments/pritunl.py create mode 100644 plugins/module_utils/net_tools/pritunl/__init__.py create mode 100644 plugins/module_utils/net_tools/pritunl/api.py create mode 100644 plugins/modules/net_tools/pritunl/pritunl_user.py create mode 100644 plugins/modules/net_tools/pritunl/pritunl_user_info.py create mode 120000 plugins/modules/pritunl_user.py create mode 120000 plugins/modules/pritunl_user_info.py create mode 100644 tests/unit/plugins/module_utils/net_tools/pritunl/__init__.py create mode 100644 tests/unit/plugins/module_utils/net_tools/pritunl/test_api.py create mode 100644 tests/unit/plugins/modules/net_tools/pritunl/test_pritunl_user.py create mode 100644 tests/unit/plugins/modules/net_tools/pritunl/test_pritunl_user_info.py diff --git a/plugins/doc_fragments/pritunl.py b/plugins/doc_fragments/pritunl.py new file mode 100644 index 0000000000..e2eaff2889 --- /dev/null +++ b/plugins/doc_fragments/pritunl.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2021, Florian Dambrine +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + + +class ModuleDocFragment(object): + + DOCUMENTATION = r""" +options: + pritunl_url: + type: str + required: true + description: + - URL and port of the Pritunl server on which the API is enabled. + + pritunl_api_token: + type: str + required: true + description: + - API Token of a Pritunl admin user. + - It needs to be enabled in Administrators > USERNAME > Enable Token Authentication. + + pritunl_api_secret: + type: str + required: true + description: + - API Secret found in Administrators > USERNAME > API Secret. + + validate_certs: + type: bool + required: false + default: true + description: + - If certificates should be validated or not. + - This should never be set to C(false), except if you are very sure that + your connection to the server can not be subject to a Man In The Middle + attack. +""" diff --git a/plugins/module_utils/net_tools/pritunl/__init__.py b/plugins/module_utils/net_tools/pritunl/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/plugins/module_utils/net_tools/pritunl/api.py b/plugins/module_utils/net_tools/pritunl/api.py new file mode 100644 index 0000000000..e78f1848eb --- /dev/null +++ b/plugins/module_utils/net_tools/pritunl/api.py @@ -0,0 +1,300 @@ +# -*- coding: utf-8 -*- +# Copyright: (c) 2021, Florian Dambrine +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +""" +Pritunl API that offers CRUD operations on Pritunl Organizations and Users +""" + +from __future__ import absolute_import, division, print_function + +import base64 +import hashlib +import hmac +import json +import time +import uuid + +from ansible.module_utils.six import iteritems +from ansible.module_utils.urls import open_url + +__metaclass__ = type + + +class PritunlException(Exception): + pass + + +def pritunl_argument_spec(): + return dict( + pritunl_url=dict(required=True, type="str"), + pritunl_api_token=dict(required=True, type="str", no_log=False), + pritunl_api_secret=dict(required=True, type="str", no_log=True), + validate_certs=dict(required=False, type="bool", default=True), + ) + + +def get_pritunl_settings(module): + """ + Helper function to set required Pritunl request params from module arguments. + """ + return { + "api_token": module.params.get("pritunl_api_token"), + "api_secret": module.params.get("pritunl_api_secret"), + "base_url": module.params.get("pritunl_url"), + "validate_certs": module.params.get("validate_certs"), + } + + +def _get_pritunl_organizations(api_token, api_secret, base_url, validate_certs=True): + return pritunl_auth_request( + base_url=base_url, + api_token=api_token, + api_secret=api_secret, + method="GET", + path="/organization", + validate_certs=validate_certs, + ) + + +def _get_pritunl_users( + api_token, api_secret, base_url, organization_id, validate_certs=True +): + return pritunl_auth_request( + api_token=api_token, + api_secret=api_secret, + base_url=base_url, + method="GET", + path="/user/%s" % organization_id, + validate_certs=validate_certs, + ) + + +def _delete_pritunl_user( + api_token, api_secret, base_url, organization_id, user_id, validate_certs=True +): + return pritunl_auth_request( + api_token=api_token, + api_secret=api_secret, + base_url=base_url, + method="DELETE", + path="/user/%s/%s" % (organization_id, user_id), + validate_certs=validate_certs, + ) + + +def _post_pritunl_user( + api_token, api_secret, base_url, organization_id, user_data, validate_certs=True +): + return pritunl_auth_request( + api_token=api_token, + api_secret=api_secret, + base_url=base_url, + method="POST", + path="/user/%s" % organization_id, + headers={"Content-Type": "application/json"}, + data=json.dumps(user_data), + validate_certs=validate_certs, + ) + + +def _put_pritunl_user( + api_token, + api_secret, + base_url, + organization_id, + user_id, + user_data, + validate_certs=True, +): + return pritunl_auth_request( + api_token=api_token, + api_secret=api_secret, + base_url=base_url, + method="PUT", + path="/user/%s/%s" % (organization_id, user_id), + headers={"Content-Type": "application/json"}, + data=json.dumps(user_data), + validate_certs=validate_certs, + ) + + +def list_pritunl_organizations( + api_token, api_secret, base_url, validate_certs=True, filters=None +): + orgs = [] + + response = _get_pritunl_organizations( + api_token=api_token, + api_secret=api_secret, + base_url=base_url, + validate_certs=validate_certs, + ) + + if response.getcode() != 200: + raise PritunlException("Could not retrieve organizations from Pritunl") + else: + for org in json.loads(response.read()): + # No filtering + if filters is None: + orgs.append(org) + else: + if not any( + filter_val != org[filter_key] + for filter_key, filter_val in iteritems(filters) + ): + orgs.append(org) + + return orgs + + +def list_pritunl_users( + api_token, api_secret, base_url, organization_id, validate_certs=True, filters=None +): + users = [] + + response = _get_pritunl_users( + api_token=api_token, + api_secret=api_secret, + base_url=base_url, + validate_certs=validate_certs, + organization_id=organization_id, + ) + + if response.getcode() != 200: + raise PritunlException("Could not retrieve users from Pritunl") + else: + for user in json.loads(response.read()): + # No filtering + if filters is None: + users.append(user) + + else: + if not any( + filter_val != user[filter_key] + for filter_key, filter_val in iteritems(filters) + ): + users.append(user) + + return users + + +def post_pritunl_user( + api_token, + api_secret, + base_url, + organization_id, + user_data, + user_id=None, + validate_certs=True, +): + # If user_id is provided will do PUT otherwise will do POST + if user_id is None: + response = _post_pritunl_user( + api_token=api_token, + api_secret=api_secret, + base_url=base_url, + organization_id=organization_id, + user_data=user_data, + validate_certs=True, + ) + + if response.getcode() != 200: + raise PritunlException( + "Could not remove user %s from organization %s from Pritunl" + % (user_id, organization_id) + ) + # user POST request returns an array of a single item, + # so return this item instead of the list + return json.loads(response.read())[0] + else: + response = _put_pritunl_user( + api_token=api_token, + api_secret=api_secret, + base_url=base_url, + organization_id=organization_id, + user_data=user_data, + user_id=user_id, + validate_certs=True, + ) + + if response.getcode() != 200: + raise PritunlException( + "Could not update user %s from organization %s from Pritunl" + % (user_id, organization_id) + ) + # The user PUT request returns the updated user object + return json.loads(response.read()) + + +def delete_pritunl_user( + api_token, api_secret, base_url, organization_id, user_id, validate_certs=True +): + response = _delete_pritunl_user( + api_token=api_token, + api_secret=api_secret, + base_url=base_url, + organization_id=organization_id, + user_id=user_id, + validate_certs=True, + ) + + if response.getcode() != 200: + raise PritunlException( + "Could not remove user %s from organization %s from Pritunl" + % (user_id, organization_id) + ) + + return json.loads(response.read()) + + +def pritunl_auth_request( + api_token, + api_secret, + base_url, + method, + path, + validate_certs=True, + headers=None, + data=None, +): + """ + Send an API call to a Pritunl server. + Taken from https://pritunl.com/api and adaped work with Ansible open_url + """ + auth_timestamp = str(int(time.time())) + auth_nonce = uuid.uuid4().hex + + auth_string = "&".join( + [api_token, auth_timestamp, auth_nonce, method.upper(), path] + + ([data] if data else []) + ) + + auth_signature = base64.b64encode( + hmac.new( + api_secret.encode("utf-8"), auth_string.encode("utf-8"), hashlib.sha256 + ).digest() + ) + + auth_headers = { + "Auth-Token": api_token, + "Auth-Timestamp": auth_timestamp, + "Auth-Nonce": auth_nonce, + "Auth-Signature": auth_signature, + } + + if headers: + auth_headers.update(headers) + + try: + uri = "%s%s" % (base_url, path) + + return open_url( + uri, + method=method.upper(), + headers=auth_headers, + data=data, + validate_certs=validate_certs, + ) + except Exception as e: + raise PritunlException(e) diff --git a/plugins/modules/net_tools/pritunl/pritunl_user.py b/plugins/modules/net_tools/pritunl/pritunl_user.py new file mode 100644 index 0000000000..3d1c7f338f --- /dev/null +++ b/plugins/modules/net_tools/pritunl/pritunl_user.py @@ -0,0 +1,343 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# Copyright: (c) 2021, Florian Dambrine +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +DOCUMENTATION = """ +--- +module: pritunl_user +author: "Florian Dambrine (@Lowess)" +version_added: 2.3.0 +short_description: Manage Pritunl Users using the Pritunl API +description: + - A module to manage Pritunl users using the Pritunl API. +extends_documentation_fragment: + - community.general.pritunl +options: + organization: + type: str + required: true + aliases: + - org + description: + - The name of the organization the user is part of. + + state: + type: str + default: 'present' + choices: + - present + - absent + description: + - If C(present), the module adds user I(user_name) to + the Pritunl I(organization). If C(absent), removes the user + I(user_name) from the Pritunl I(organization). + + user_name: + type: str + required: true + default: null + description: + - Name of the user to create or delete from Pritunl. + + user_email: + type: str + required: false + default: null + description: + - Email address associated with the user I(user_name). + + user_type: + type: str + required: false + default: client + choices: + - client + - server + description: + - Type of the user I(user_name). + + user_groups: + type: list + elements: str + required: false + default: null + description: + - List of groups associated with the user I(user_name). + + user_disabled: + type: bool + required: false + default: null + description: + - Enable/Disable the user I(user_name). + + user_gravatar: + type: bool + required: false + default: null + description: + - Enable/Disable Gravatar usage for the user I(user_name). +""" + +EXAMPLES = """ +- name: Create the user Foo with email address foo@bar.com in MyOrg + community.general.pritunl_user: + state: present + name: MyOrg + user_name: Foo + user_email: foo@bar.com + +- name: Disable the user Foo but keep it in Pritunl + community.general.pritunl_user: + state: present + name: MyOrg + user_name: Foo + user_email: foo@bar.com + user_disabled: yes + +- name: Make sure the user Foo is not part of MyOrg anymore + community.general.pritunl_user: + state: absent + name: MyOrg + user_name: Foo +""" + +RETURN = """ +response: + description: JSON representation of Pritunl Users. + returned: success + type: dict + sample: + { + "audit": false, + "auth_type": "google", + "bypass_secondary": false, + "client_to_client": false, + "disabled": false, + "dns_mapping": null, + "dns_servers": null, + "dns_suffix": null, + "email": "foo@bar.com", + "gravatar": true, + "groups": [ + "foo", "bar" + ], + "id": "5d070dafe63q3b2e6s472c3b", + "name": "foo@acme.com", + "network_links": [], + "organization": "58070daee6sf342e6e4s2c36", + "organization_name": "Acme", + "otp_auth": true, + "otp_secret": "35H5EJA3XB2$4CWG", + "pin": false, + "port_forwarding": [], + "servers": [], + } +""" + + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils._text import to_native +from ansible.module_utils.common.dict_transformations import dict_merge +from ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api import ( + PritunlException, + delete_pritunl_user, + get_pritunl_settings, + list_pritunl_organizations, + list_pritunl_users, + post_pritunl_user, + pritunl_argument_spec, +) + + +def add_or_update_pritunl_user(module): + result = {} + + org_name = module.params.get("organization") + user_name = module.params.get("user_name") + + user_params = { + "name": user_name, + "email": module.params.get("user_email"), + "groups": module.params.get("user_groups"), + "disabled": module.params.get("user_disabled"), + "gravatar": module.params.get("user_gravatar"), + "type": module.params.get("user_type"), + } + + org_obj_list = list_pritunl_organizations( + **dict_merge( + get_pritunl_settings(module), + {"filters": {"name": org_name}}, + ) + ) + + if len(org_obj_list) == 0: + module.fail_json( + msg="Can not add user to organization '%s' which does not exist" % org_name + ) + + org_id = org_obj_list[0]["id"] + + # Grab existing users from this org + users = list_pritunl_users( + **dict_merge( + get_pritunl_settings(module), + { + "organization_id": org_id, + "filters": {"name": user_name}, + }, + ) + ) + + # Check if the pritunl user already exists + if len(users) > 0: + # Compare remote user params with local user_params and trigger update if needed + user_params_changed = False + for key in user_params.keys(): + # When a param is not specified grab existing ones to prevent from changing it with the PUT request + if user_params[key] is None: + user_params[key] = users[0][key] + + # 'groups' is a list comparison + if key == "groups": + if set(users[0][key]) != set(user_params[key]): + user_params_changed = True + + # otherwise it is either a boolean or a string + else: + if users[0][key] != user_params[key]: + user_params_changed = True + + # Trigger a PUT on the API to update the current user if settings have changed + if user_params_changed: + response = post_pritunl_user( + **dict_merge( + get_pritunl_settings(module), + { + "organization_id": org_id, + "user_id": users[0]["id"], + "user_data": user_params, + }, + ) + ) + + result["changed"] = True + result["response"] = response + else: + result["changed"] = False + result["response"] = users + else: + response = post_pritunl_user( + **dict_merge( + get_pritunl_settings(module), + { + "organization_id": org_id, + "user_data": user_params, + }, + ) + ) + result["changed"] = True + result["response"] = response + + module.exit_json(**result) + + +def remove_pritunl_user(module): + result = {} + + org_name = module.params.get("organization") + user_name = module.params.get("user_name") + + org_obj_list = [] + + org_obj_list = list_pritunl_organizations( + **dict_merge( + get_pritunl_settings(module), + { + "filters": {"name": org_name}, + }, + ) + ) + + if len(org_obj_list) == 0: + module.fail_json( + msg="Can not remove user '%s' from a non existing organization '%s'" + % (user_name, org_name) + ) + + org_id = org_obj_list[0]["id"] + + # Grab existing users from this org + users = list_pritunl_users( + **dict_merge( + get_pritunl_settings(module), + { + "organization_id": org_id, + "filters": {"name": user_name}, + }, + ) + ) + + # Check if the pritunl user exists, if not, do nothing + if len(users) == 0: + result["changed"] = False + result["response"] = {} + + # Otherwise remove the org from Pritunl + else: + response = delete_pritunl_user( + **dict_merge( + get_pritunl_settings(module), + { + "organization_id": org_id, + "user_id": users[0]["id"], + }, + ) + ) + result["changed"] = True + result["response"] = response + + module.exit_json(**result) + + +def main(): + argument_spec = pritunl_argument_spec() + + argument_spec.update( + dict( + organization=dict(required=True, type="str", aliases=["org"]), + state=dict( + required=False, choices=["present", "absent"], default="present" + ), + user_name=dict(required=True, type="str"), + user_type=dict( + required=False, choices=["client", "server"], default="client" + ), + user_email=dict(required=False, type="str", default=None), + user_groups=dict(required=False, type="list", elements="str", default=None), + user_disabled=dict(required=False, type="bool", default=None), + user_gravatar=dict(required=False, type="bool", default=None), + ) + ), + + module = AnsibleModule(argument_spec=argument_spec) + + state = module.params.get("state") + + try: + if state == "present": + add_or_update_pritunl_user(module) + elif state == "absent": + remove_pritunl_user(module) + except PritunlException as e: + module.fail_json(msg=to_native(e)) + + +if __name__ == "__main__": + main() diff --git a/plugins/modules/net_tools/pritunl/pritunl_user_info.py b/plugins/modules/net_tools/pritunl/pritunl_user_info.py new file mode 100644 index 0000000000..c00da6dc23 --- /dev/null +++ b/plugins/modules/net_tools/pritunl/pritunl_user_info.py @@ -0,0 +1,171 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# Copyright: (c) 2021, Florian Dambrine +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +DOCUMENTATION = """ +--- +module: pritunl_user_info +author: "Florian Dambrine (@Lowess)" +version_added: 2.3.0 +short_description: List Pritunl Users using the Pritunl API +description: + - A module to list Pritunl users using the Pritunl API. +extends_documentation_fragment: + - community.general.pritunl +options: + organization: + type: str + required: true + aliases: + - org + description: + - The name of the organization the user is part of. + + user_name: + type: str + required: false + description: + - Name of the user to filter on Pritunl. + + user_type: + type: str + required: false + default: client + choices: + - client + - server + description: + - Type of the user I(user_name). +""" + +EXAMPLES = """ +- name: List all existing users part of the organization MyOrg + community.general.pritunl_user_info: + state: list + organization: MyOrg + +- name: Search for the user named Florian part of the organization MyOrg + community.general.pritunl_user_info: + state: list + organization: MyOrg + user_name: Florian +""" + +RETURN = """ +users: + description: List of Pritunl users. + returned: success + type: list + elements: dict + sample: + [ + { + "audit": false, + "auth_type": "google", + "bypass_secondary": false, + "client_to_client": false, + "disabled": false, + "dns_mapping": null, + "dns_servers": null, + "dns_suffix": null, + "email": "foo@bar.com", + "gravatar": true, + "groups": [ + "foo", "bar" + ], + "id": "5d070dafe63q3b2e6s472c3b", + "name": "foo@acme.com", + "network_links": [], + "organization": "58070daee6sf342e6e4s2c36", + "organization_name": "Acme", + "otp_auth": true, + "otp_secret": "35H5EJA3XB2$4CWG", + "pin": false, + "port_forwarding": [], + "servers": [], + } + ] +""" + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils._text import to_native +from ansible.module_utils.common.dict_transformations import dict_merge +from ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api import ( + PritunlException, + get_pritunl_settings, + list_pritunl_organizations, + list_pritunl_users, + pritunl_argument_spec, +) + + +def get_pritunl_user(module): + user_name = module.params.get("user_name") + user_type = module.params.get("user_type") + org_name = module.params.get("organization") + + org_obj_list = [] + + org_obj_list = list_pritunl_organizations( + **dict_merge(get_pritunl_settings(module), {"filters": {"name": org_name}}) + ) + + if len(org_obj_list) == 0: + module.fail_json( + msg="Can not list users from the organization '%s' which does not exist" + % org_name + ) + + org_id = org_obj_list[0]["id"] + + users = list_pritunl_users( + **dict_merge( + get_pritunl_settings(module), + { + "organization_id": org_id, + "filters": ( + {"type": user_type} + if user_name is None + else {"name": user_name, "type": user_type} + ), + }, + ) + ) + + result = {} + result["changed"] = False + result["users"] = users + + module.exit_json(**result) + + +def main(): + argument_spec = pritunl_argument_spec() + + argument_spec.update( + dict( + organization=dict(required=True, type="str", aliases=["org"]), + user_name=dict(required=False, type="str", default=None), + user_type=dict( + required=False, + choices=["client", "server"], + default="client", + ), + ) + ), + + module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True) + + try: + get_pritunl_user(module) + except PritunlException as e: + module.fail_json(msg=to_native(e)) + + +if __name__ == "__main__": + main() diff --git a/plugins/modules/pritunl_user.py b/plugins/modules/pritunl_user.py new file mode 120000 index 0000000000..25a91db66b --- /dev/null +++ b/plugins/modules/pritunl_user.py @@ -0,0 +1 @@ +./net_tools/pritunl/pritunl_user.py \ No newline at end of file diff --git a/plugins/modules/pritunl_user_info.py b/plugins/modules/pritunl_user_info.py new file mode 120000 index 0000000000..bfabbe0c8c --- /dev/null +++ b/plugins/modules/pritunl_user_info.py @@ -0,0 +1 @@ +net_tools/pritunl/pritunl_user_info.py \ No newline at end of file diff --git a/tests/unit/plugins/module_utils/net_tools/pritunl/__init__.py b/tests/unit/plugins/module_utils/net_tools/pritunl/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/plugins/module_utils/net_tools/pritunl/test_api.py b/tests/unit/plugins/module_utils/net_tools/pritunl/test_api.py new file mode 100644 index 0000000000..1d78a6b555 --- /dev/null +++ b/tests/unit/plugins/module_utils/net_tools/pritunl/test_api.py @@ -0,0 +1,541 @@ +# -*- coding: utf-8 -*- +# Copyright: (c) 2021, Florian Dambrine +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +import json + +import pytest +from ansible.module_utils.common.dict_transformations import dict_merge +from ansible.module_utils.six import iteritems +from ansible_collections.community.general.plugins.module_utils.net_tools.pritunl import api +from mock import MagicMock + +__metaclass__ = type + + +# Pritunl Mocks + + +class PritunlListOrganizationMock(MagicMock): + """Pritunl API Mock for organization GET API calls.""" + + def getcode(self): + return 200 + + def read(self): + return json.dumps( + [ + { + "auth_api": False, + "name": "Foo", + "auth_token": None, + "user_count": 0, + "auth_secret": None, + "id": "csftwlu6uhralzi2dpmhekz3", + }, + { + "auth_api": False, + "name": "GumGum", + "auth_token": None, + "user_count": 3, + "auth_secret": None, + "id": "58070daee63f3b2e6e472c36", + }, + { + "auth_api": False, + "name": "Bar", + "auth_token": None, + "user_count": 0, + "auth_secret": None, + "id": "v1sncsxxybnsylc8gpqg85pg", + }, + ] + ) + + +class PritunlListUserMock(MagicMock): + """Pritunl API Mock for user GET API calls.""" + + def getcode(self): + return 200 + + def read(self): + return json.dumps( + [ + { + "auth_type": "google", + "dns_servers": None, + "pin": True, + "dns_suffix": None, + "servers": [ + { + "status": False, + "platform": None, + "server_id": "580711322bb66c1d59b9568f", + "virt_address6": "fd00:c0a8: 9700: 0: 192: 168: 101: 27", + "virt_address": "192.168.101.27", + "name": "vpn-A", + "real_address": None, + "connected_since": None, + "id": "580711322bb66c1d59b9568f", + "device_name": None, + }, + { + "status": False, + "platform": None, + "server_id": "5dad2cc6e63f3b3f4a6dfea5", + "virt_address6": "fd00:c0a8:f200: 0: 192: 168: 201: 37", + "virt_address": "192.168.201.37", + "name": "vpn-B", + "real_address": None, + "connected_since": None, + "id": "5dad2cc6e63f3b3f4a6dfea5", + "device_name": None, + }, + ], + "disabled": False, + "network_links": [], + "port_forwarding": [], + "id": "58070dafe63f3b2e6e472c3b", + "organization_name": "GumGum", + "type": "server", + "email": "bot@company.com", + "status": True, + "dns_mapping": None, + "otp_secret": "123456789ABCDEFG", + "client_to_client": False, + "sso": "google", + "bypass_secondary": False, + "groups": ["admin", "multiregion"], + "audit": False, + "name": "bot", + "gravatar": True, + "otp_auth": True, + "organization": "58070daee63f3b2e6e472c36", + }, + { + "auth_type": "google", + "dns_servers": None, + "pin": True, + "dns_suffix": None, + "servers": [ + { + "status": False, + "platform": None, + "server_id": "580711322bb66c1d59b9568f", + "virt_address6": "fd00:c0a8: 9700: 0: 192: 168: 101: 27", + "virt_address": "192.168.101.27", + "name": "vpn-A", + "real_address": None, + "connected_since": None, + "id": "580711322bb66c1d59b9568f", + "device_name": None, + }, + { + "status": False, + "platform": None, + "server_id": "5dad2cc6e63f3b3f4a6dfea5", + "virt_address6": "fd00:c0a8:f200: 0: 192: 168: 201: 37", + "virt_address": "192.168.201.37", + "name": "vpn-B", + "real_address": None, + "connected_since": None, + "id": "5dad2cc6e63f3b3f4a6dfea5", + "device_name": None, + }, + ], + "disabled": False, + "network_links": [], + "port_forwarding": [], + "id": "58070dafe63f3b2e6e472c3b", + "organization_name": "GumGum", + "type": "client", + "email": "florian@company.com", + "status": True, + "dns_mapping": None, + "otp_secret": "123456789ABCDEFG", + "client_to_client": False, + "sso": "google", + "bypass_secondary": False, + "groups": ["web", "database"], + "audit": False, + "name": "florian", + "gravatar": True, + "otp_auth": True, + "organization": "58070daee63f3b2e6e472c36", + }, + { + "auth_type": "google", + "dns_servers": None, + "pin": True, + "dns_suffix": None, + "servers": [ + { + "status": False, + "platform": None, + "server_id": "580711322bb66c1d59b9568f", + "virt_address6": "fd00:c0a8: 9700: 0: 192: 168: 101: 27", + "virt_address": "192.168.101.27", + "name": "vpn-A", + "real_address": None, + "connected_since": None, + "id": "580711322bb66c1d59b9568f", + "device_name": None, + }, + { + "status": False, + "platform": None, + "server_id": "5dad2cc6e63f3b3f4a6dfea5", + "virt_address6": "fd00:c0a8:f200: 0: 192: 168: 201: 37", + "virt_address": "192.168.201.37", + "name": "vpn-B", + "real_address": None, + "connected_since": None, + "id": "5dad2cc6e63f3b3f4a6dfea5", + "device_name": None, + }, + ], + "disabled": False, + "network_links": [], + "port_forwarding": [], + "id": "58070dafe63f3b2e6e472c3b", + "organization_name": "GumGum", + "type": "server", + "email": "ops@company.com", + "status": True, + "dns_mapping": None, + "otp_secret": "123456789ABCDEFG", + "client_to_client": False, + "sso": "google", + "bypass_secondary": False, + "groups": ["web", "database"], + "audit": False, + "name": "ops", + "gravatar": True, + "otp_auth": True, + "organization": "58070daee63f3b2e6e472c36", + }, + ] + ) + + +class PritunlErrorMock(MagicMock): + """Pritunl API Mock for API call failures.""" + + def getcode(self): + return 500 + + def read(self): + return "{}" + + +class PritunlPostUserMock(MagicMock): + """Pritunl API Mock for POST API calls.""" + + def getcode(self): + return 200 + + def read(self): + return json.dumps( + [ + { + "auth_type": "local", + "disabled": False, + "dns_servers": None, + "otp_secret": "6M4UWP2BCJBSYZAT", + "name": "alice", + "pin": False, + "dns_suffix": None, + "client_to_client": False, + "email": "alice@company.com", + "organization_name": "GumGum", + "bypass_secondary": False, + "groups": ["a", "b"], + "organization": "58070daee63f3b2e6e472c36", + "port_forwarding": [], + "type": "client", + "id": "590add71e63f3b72d8bb951a", + } + ] + ) + + +class PritunlPutUserMock(MagicMock): + """Pritunl API Mock for PUT API calls.""" + + def getcode(self): + return 200 + + def read(self): + return json.dumps( + { + "auth_type": "local", + "disabled": True, + "dns_servers": None, + "otp_secret": "WEJANJYMF3Q2QSLG", + "name": "bob", + "pin": False, + "dns_suffix": False, + "client_to_client": False, + "email": "bob@company.com", + "organization_name": "GumGum", + "bypass_secondary": False, + "groups": ["c", "d"], + "organization": "58070daee63f3b2e6e472c36", + "port_forwarding": [], + "type": "client", + "id": "590add71e63f3b72d8bb951a", + } + ) + + +class PritunlDeleteUserMock(MagicMock): + """Pritunl API Mock for DELETE API calls.""" + + def getcode(self): + return 200 + + def read(self): + return "{}" + + +# Ansible Module Mock and Pytest mock fixtures + + +class ModuleFailException(Exception): + def __init__(self, msg, **kwargs): + super(ModuleFailException, self).__init__(msg) + self.fail_msg = msg + self.fail_kwargs = kwargs + + +@pytest.fixture +def pritunl_settings(): + return { + "api_token": "token", + "api_secret": "secret", + "base_url": "https://pritunl.domain.com", + "validate_certs": True, + } + + +@pytest.fixture +def pritunl_user_data(): + return { + "name": "alice", + "email": "alice@company.com", + "groups": ["a", "b"], + "disabled": False, + "type": "client", + } + + +@pytest.fixture +def get_pritunl_organization_mock(): + return PritunlListOrganizationMock() + + +@pytest.fixture +def get_pritunl_user_mock(): + return PritunlListUserMock() + + +@pytest.fixture +def get_pritunl_error_mock(): + return PritunlErrorMock() + + +@pytest.fixture +def post_pritunl_user_mock(): + return PritunlPostUserMock() + + +@pytest.fixture +def put_pritunl_user_mock(): + return PritunlPutUserMock() + + +@pytest.fixture +def delete_pritunl_user_mock(): + return PritunlDeleteUserMock() + + +class TestPritunlApi: + """ + Test class to validate CRUD operations on Pritunl. + """ + + # Test for GET / list operation on Pritunl API + @pytest.mark.parametrize( + "org_id,org_user_count", + [ + ("58070daee63f3b2e6e472c36", 3), + ("v1sncsxxybnsylc8gpqg85pg", 0), + ], + ) + def test_list_all_pritunl_organization( + self, + pritunl_settings, + get_pritunl_organization_mock, + org_id, + org_user_count, + ): + api._get_pritunl_organizations = get_pritunl_organization_mock() + + response = api.list_pritunl_organizations(**pritunl_settings) + + assert len(response) == 3 + + for org in response: + if org["id"] == org_id: + org["user_count"] == org_user_count + + @pytest.mark.parametrize( + "org_filters,org_expected", + [ + ({"id": "58070daee63f3b2e6e472c36"}, "GumGum"), + ({"name": "GumGum"}, "GumGum"), + ], + ) + def test_list_filtered_pritunl_organization( + self, + pritunl_settings, + get_pritunl_organization_mock, + org_filters, + org_expected, + ): + api._get_pritunl_organizations = get_pritunl_organization_mock() + + response = api.list_pritunl_organizations( + **dict_merge(pritunl_settings, {"filters": org_filters}) + ) + + assert len(response) == 1 + assert response[0]["name"] == org_expected + + @pytest.mark.parametrize( + "org_id,org_user_count", + [("58070daee63f3b2e6e472c36", 3)], + ) + def test_list_all_pritunl_user( + self, pritunl_settings, get_pritunl_user_mock, org_id, org_user_count + ): + api._get_pritunl_users = get_pritunl_user_mock() + + response = api.list_pritunl_users( + **dict_merge(pritunl_settings, {"organization_id": org_id}) + ) + + assert len(response) == org_user_count + + @pytest.mark.parametrize( + "org_id,user_filters,user_expected", + [ + ("58070daee63f3b2e6e472c36", {"email": "bot@company.com"}, "bot"), + ("58070daee63f3b2e6e472c36", {"name": "florian"}, "florian"), + ], + ) + def test_list_filtered_pritunl_user( + self, + pritunl_settings, + get_pritunl_user_mock, + org_id, + user_filters, + user_expected, + ): + api._get_pritunl_users = get_pritunl_user_mock() + + response = api.list_pritunl_users( + **dict_merge( + pritunl_settings, {"organization_id": org_id, "filters": user_filters} + ) + ) + + assert len(response) > 0 + + for user in response: + assert user["organization"] == org_id + assert user["name"] == user_expected + + # Test for POST operation on Pritunl API + @pytest.mark.parametrize("org_id", [("58070daee63f3b2e6e472c36")]) + def test_add_and_update_pritunl_user( + self, + pritunl_settings, + pritunl_user_data, + post_pritunl_user_mock, + put_pritunl_user_mock, + org_id, + ): + api._post_pritunl_user = post_pritunl_user_mock() + api._put_pritunl_user = put_pritunl_user_mock() + + create_response = api.post_pritunl_user( + **dict_merge( + pritunl_settings, + { + "organization_id": org_id, + "user_data": pritunl_user_data, + }, + ) + ) + + # Ensure provided settings match with the ones returned by Pritunl + for k, v in iteritems(pritunl_user_data): + assert create_response[k] == v + + # Update the newly created user to ensure only certain settings are changed + + user_updates = { + "name": "bob", + "email": "bob@company.com", + "disabled": True, + } + + update_response = api.post_pritunl_user( + **dict_merge( + pritunl_settings, + { + "organization_id": org_id, + "user_id": create_response["id"], + "user_data": dict_merge(pritunl_user_data, user_updates), + }, + ) + ) + + # Ensure only certain settings changed and the rest remained untouched. + for k, v in iteritems(update_response): + if k in update_response: + assert update_response[k] == v + else: + assert update_response[k] == create_response[k] + + # Test for DELETE operation on Pritunl API + @pytest.mark.parametrize( + "org_id,user_id", [("58070daee63f3b2e6e472c36", "590add71e63f3b72d8bb951a")] + ) + def test_delete_pritunl_user( + self, pritunl_settings, org_id, user_id, delete_pritunl_user_mock + ): + api._delete_pritunl_user = delete_pritunl_user_mock() + + response = api.delete_pritunl_user( + **dict_merge( + pritunl_settings, + { + "organization_id": org_id, + "user_id": user_id, + }, + ) + ) + + assert response == {} + + # Test API call errors + def test_pritunl_error(self, pritunl_settings, get_pritunl_error_mock): + api.pritunl_auth_request = get_pritunl_error_mock() + + with pytest.raises(api.PritunlException): + response = api.list_pritunl_organizations(**pritunl_settings) diff --git a/tests/unit/plugins/modules/net_tools/pritunl/test_pritunl_user.py b/tests/unit/plugins/modules/net_tools/pritunl/test_pritunl_user.py new file mode 100644 index 0000000000..114fe8a81a --- /dev/null +++ b/tests/unit/plugins/modules/net_tools/pritunl/test_pritunl_user.py @@ -0,0 +1,208 @@ +# -*- coding: utf-8 -*- +# (c) 2021 Florian Dambrine +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +import sys + +from ansible.module_utils.common.dict_transformations import dict_merge +from ansible.module_utils.six import iteritems +from ansible_collections.community.general.plugins.modules.net_tools.pritunl import ( + pritunl_user, +) +from ansible_collections.community.general.tests.unit.compat.mock import patch +from ansible_collections.community.general.tests.unit.plugins.module_utils.net_tools.pritunl.test_api import ( + PritunlDeleteUserMock, + PritunlListOrganizationMock, + PritunlListUserMock, + PritunlPostUserMock, + PritunlPutUserMock, +) +from ansible_collections.community.general.tests.unit.plugins.modules.utils import ( + AnsibleExitJson, + AnsibleFailJson, + ModuleTestCase, + set_module_args, +) + +__metaclass__ = type + + +def mock_pritunl_api(func, **kwargs): + def wrapped(self=None): + with self.patch_get_pritunl_organizations( + side_effect=PritunlListOrganizationMock + ): + with self.patch_get_pritunl_users(side_effect=PritunlListUserMock): + with self.patch_add_pritunl_users(side_effect=PritunlPostUserMock): + with self.patch_delete_pritunl_users( + side_effect=PritunlDeleteUserMock + ): + func(self, **kwargs) + + return wrapped + + +class TestPritunlUser(ModuleTestCase): + def setUp(self): + super(TestPritunlUser, self).setUp() + self.module = pritunl_user + + # Add backward compatibility + if sys.version_info < (3, 2): + self.assertRegex = self.assertRegexpMatches + + def tearDown(self): + super(TestPritunlUser, self).tearDown() + + def patch_get_pritunl_users(self, **kwds): + return patch( + "ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._get_pritunl_users", + autospec=True, + **kwds + ) + + def patch_add_pritunl_users(self, **kwds): + return patch( + "ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._post_pritunl_user", + autospec=True, + **kwds + ) + + def patch_update_pritunl_users(self, **kwds): + return patch( + "ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._put_pritunl_user", + autospec=True, + **kwds + ) + + def patch_delete_pritunl_users(self, **kwds): + return patch( + "ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._delete_pritunl_user", + autospec=True, + **kwds + ) + + def patch_get_pritunl_organizations(self, **kwds): + return patch( + "ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._get_pritunl_organizations", + autospec=True, + **kwds + ) + + def test_without_parameters(self): + """Test without parameters""" + set_module_args({}) + with self.assertRaises(AnsibleFailJson): + self.module.main() + + @mock_pritunl_api + def test_present(self): + """Test Pritunl user creation and update.""" + user_params = { + "user_name": "alice", + "user_email": "alice@company.com", + } + set_module_args( + dict_merge( + { + "pritunl_api_token": "token", + "pritunl_api_secret": "secret", + "pritunl_url": "https://pritunl.domain.com", + "organization": "GumGum", + }, + user_params, + ) + ) + + with self.patch_update_pritunl_users( + side_effect=PritunlPostUserMock + ) as post_mock: + with self.assertRaises(AnsibleExitJson) as create_result: + self.module.main() + + create_exc = create_result.exception.args[0] + + self.assertTrue(create_exc["changed"]) + self.assertEqual(create_exc["response"]["name"], user_params["user_name"]) + self.assertEqual(create_exc["response"]["email"], user_params["user_email"]) + self.assertFalse(create_exc["response"]["disabled"]) + + # Changing user from alice to bob should update certain fields only + + new_user_params = { + "user_name": "bob", + "user_email": "bob@company.com", + "user_disabled": True, + } + set_module_args( + dict_merge( + { + "pritunl_api_token": "token", + "pritunl_api_secret": "secret", + "pritunl_url": "https://pritunl.domain.com", + "organization": "GumGum", + }, + new_user_params, + ) + ) + + with self.patch_update_pritunl_users( + side_effect=PritunlPutUserMock + ) as put_mock: + + with self.assertRaises(AnsibleExitJson) as update_result: + self.module.main() + + update_exc = update_result.exception.args[0] + + # Ensure only certain settings changed and the rest remained untouched. + for k, v in iteritems(update_exc): + if k in new_user_params: + assert update_exc[k] == v + else: + assert update_exc[k] == create_exc[k] + + @mock_pritunl_api + def test_absent(self): + """Test user removal from Pritunl.""" + set_module_args( + { + "state": "absent", + "pritunl_api_token": "token", + "pritunl_api_secret": "secret", + "pritunl_url": "https://pritunl.domain.com", + "organization": "GumGum", + "user_name": "florian", + } + ) + + with self.assertRaises(AnsibleExitJson) as result: + self.module.main() + + exc = result.exception.args[0] + + self.assertTrue(exc["changed"]) + self.assertEqual(exc["response"], {}) + + @mock_pritunl_api + def test_absent_failure(self): + """Test user removal from a non existing organization.""" + set_module_args( + { + "state": "absent", + "pritunl_api_token": "token", + "pritunl_api_secret": "secret", + "pritunl_url": "https://pritunl.domain.com", + "organization": "Unknown", + "user_name": "floria@company.com", + } + ) + + with self.assertRaises(AnsibleFailJson) as result: + self.module.main() + + exc = result.exception.args[0] + + self.assertRegex(exc["msg"], "Can not remove user") diff --git a/tests/unit/plugins/modules/net_tools/pritunl/test_pritunl_user_info.py b/tests/unit/plugins/modules/net_tools/pritunl/test_pritunl_user_info.py new file mode 100644 index 0000000000..b253dc27ec --- /dev/null +++ b/tests/unit/plugins/modules/net_tools/pritunl/test_pritunl_user_info.py @@ -0,0 +1,160 @@ +# -*- coding: utf-8 -*- +# Copyright: (c) 2021, Florian Dambrine +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +import sys + +from ansible_collections.community.general.plugins.modules.net_tools.pritunl import ( + pritunl_user_info, +) +from ansible_collections.community.general.tests.unit.compat.mock import patch +from ansible_collections.community.general.tests.unit.plugins.module_utils.net_tools.pritunl.test_api import ( + PritunlListOrganizationMock, + PritunlListUserMock, +) +from ansible_collections.community.general.tests.unit.plugins.modules.utils import ( + AnsibleExitJson, + AnsibleFailJson, + ModuleTestCase, + set_module_args, +) + +__metaclass__ = type + + +class TestPritunlUserInfo(ModuleTestCase): + def setUp(self): + super(TestPritunlUserInfo, self).setUp() + self.module = pritunl_user_info + + # Add backward compatibility + if sys.version_info < (3, 2): + self.assertRegex = self.assertRegexpMatches + + def tearDown(self): + super(TestPritunlUserInfo, self).tearDown() + + def patch_get_pritunl_users(self, **kwds): + return patch( + "ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._get_pritunl_users", + autospec=True, + **kwds + ) + + def patch_get_pritunl_organizations(self, **kwds): + return patch( + "ansible_collections.community.general.plugins.module_utils.net_tools.pritunl.api._get_pritunl_organizations", + autospec=True, + **kwds + ) + + def test_without_parameters(self): + """Test without parameters""" + with self.patch_get_pritunl_organizations( + side_effect=PritunlListOrganizationMock + ) as org_mock: + with self.patch_get_pritunl_users( + side_effect=PritunlListUserMock + ) as user_mock: + set_module_args({}) + with self.assertRaises(AnsibleFailJson): + self.module.main() + + self.assertEqual(org_mock.call_count, 0) + self.assertEqual(user_mock.call_count, 0) + + def test_missing_organization(self): + """Failure must occur when the requested organization is not found.""" + with self.patch_get_pritunl_organizations( + side_effect=PritunlListOrganizationMock + ) as org_mock: + with self.patch_get_pritunl_users( + side_effect=PritunlListUserMock + ) as user_mock: + with self.assertRaises(AnsibleFailJson) as result: + set_module_args( + { + "pritunl_api_token": "token", + "pritunl_api_secret": "secret", + "pritunl_url": "https://pritunl.domain.com", + "organization": "Unknown", + } + ) + self.module.main() + + self.assertEqual(org_mock.call_count, 1) + self.assertEqual(user_mock.call_count, 0) + + exc = result.exception.args[0] + self.assertRegex(exc["msg"], "Can not list users from the organization") + + def test_get_all_client_users_from_organization(self): + """ + The list of all Pritunl client users from the organization must be returned when no user specified. + """ + expected_user_type = "client" + with self.patch_get_pritunl_organizations( + side_effect=PritunlListOrganizationMock + ) as org_mock: + with self.patch_get_pritunl_users( + side_effect=PritunlListUserMock + ) as user_mock: + with self.assertRaises(AnsibleExitJson) as result: + set_module_args( + { + "pritunl_api_token": "token", + "pritunl_api_secret": "secret", + "pritunl_url": "https://pritunl.domain.com", + "organization": "GumGum", + } + ) + self.module.main() + + self.assertEqual(org_mock.call_count, 1) + self.assertEqual(user_mock.call_count, 1) + + exc = result.exception.args[0] + # module should not report changes + self.assertFalse(exc["changed"]) + # user_type when not provided is set client and should only return client user type + self.assertEqual(len(exc["users"]), 1) + for user in exc["users"]: + self.assertEqual(user["type"], expected_user_type) + + def test_get_specific_server_user_from_organization(self): + """ + Retrieving a specific user from the organization must return a single record. + """ + expected_user_type = "server" + expected_user_name = "ops" + with self.patch_get_pritunl_organizations( + side_effect=PritunlListOrganizationMock + ) as org_mock: + with self.patch_get_pritunl_users( + side_effect=PritunlListUserMock + ) as user_mock: + with self.assertRaises(AnsibleExitJson) as result: + set_module_args( + { + "pritunl_api_token": "token", + "pritunl_api_secret": "secret", + "pritunl_url": "https://pritunl.domain.com", + "organization": "GumGum", + "user_name": expected_user_name, + "user_type": expected_user_type, + } + ) + self.module.main() + + self.assertEqual(org_mock.call_count, 1) + self.assertEqual(user_mock.call_count, 1) + + exc = result.exception.args[0] + # module should not report changes + self.assertFalse(exc["changed"]) + self.assertEqual(len(exc["users"]), 1) + for user in exc["users"]: + self.assertEqual(user["type"], expected_user_type) + self.assertEqual(user["name"], expected_user_name)