1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

tss_lookup_plugin - Refactor and decoupling (#3252) (#3264)

* Initial commit

* Adding changelog fragment

* Applying initial review suggestions

* Increasing unit coverage

* Removed unneccessary constant

* Improving test readability

* Cleanup constants

(cherry picked from commit cbcb942b0e)

Co-authored-by: Ajpantuso <ajpantuso@gmail.com>
This commit is contained in:
patchback[bot] 2021-08-25 07:05:33 +02:00 committed by GitHub
parent e5bdc028c4
commit e01d014c36
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 187 additions and 80 deletions

View file

@ -0,0 +1,4 @@
---
minor_changes:
- tss lookup plugin - refactored to decouple the supporting third-party library (``python-tss-sdk``)
(https://github.com/ansible-collections/community.general/pull/3252).

View file

@ -47,7 +47,9 @@ options:
required: true required: true
domain: domain:
default: "" default: ""
description: The domain with which to request the OAuth2 Access Grant. description:
- The domain with which to request the OAuth2 Access Grant.
- Requires C(python-tss-sdk) version 1.0.0 or greater.
env: env:
- name: TSS_DOMAIN - name: TSS_DOMAIN
ini: ini:
@ -122,100 +124,125 @@ EXAMPLES = r"""
- ansible.builtin.debug: - ansible.builtin.debug:
msg: the password is {{ secret_password }} msg: the password is {{ secret_password }}
""" """
from distutils.version import LooseVersion
from ansible.errors import AnsibleError, AnsibleOptionsError
sdk_is_missing = False import abc
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.module_utils import six
from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display
try: try:
from thycotic.secrets.server import SecretServer, SecretServerError from thycotic.secrets.server import SecretServer, SecretServerError
except ImportError:
sdk_is_missing = True
# Added for backwards compatability - See issue #3192 HAS_TSS_SDK = True
# https://github.com/ansible-collections/community.general/issues/3192
try:
from thycotic import __version__ as sdk_version
except ImportError: except ImportError:
sdk_version = "0.0.5" SecretServer = None
SecretServerError = None
HAS_TSS_SDK = False
try: try:
from thycotic.secrets.server import PasswordGrantAuthorizer, DomainPasswordGrantAuthorizer from thycotic.secrets.server import PasswordGrantAuthorizer, DomainPasswordGrantAuthorizer
sdK_version_below_v1 = False HAS_TSS_AUTHORIZER = True
except ImportError: except ImportError:
sdK_version_below_v1 = True PasswordGrantAuthorizer = None
DomainPasswordGrantAuthorizer = None
HAS_TSS_AUTHORIZER = False
from ansible.utils.display import Display
from ansible.plugins.lookup import LookupBase
display = Display() display = Display()
class LookupModule(LookupBase): @six.add_metaclass(abc.ABCMeta)
@staticmethod class TSSClient(object):
def Client(server_parameters): def __init__(self):
self._client = None
if LooseVersion(sdk_version) < LooseVersion('1.0.0') or sdK_version_below_v1: @staticmethod
return SecretServer( def from_params(**server_parameters):
if HAS_TSS_AUTHORIZER:
return TSSClientV1(**server_parameters)
else:
return TSSClientV0(**server_parameters)
def get_secret(self, term):
display.debug("tss_lookup term: %s" % term)
secret_id = self._term_to_secret_id(term)
display.vvv(u"Secret Server lookup of Secret with ID %d" % secret_id)
return self._client.get_secret_json(secret_id)
@staticmethod
def _term_to_secret_id(term):
try:
return int(term)
except ValueError:
raise AnsibleOptionsError("Secret ID must be an integer")
class TSSClientV0(TSSClient):
def __init__(self, **server_parameters):
super(TSSClientV0, self).__init__()
if server_parameters.get("domain"):
raise AnsibleError("The 'domain' option requires 'python-tss-sdk' version 1.0.0 or greater")
self._client = SecretServer(
server_parameters["base_url"],
server_parameters["username"],
server_parameters["password"],
server_parameters["api_path_uri"],
server_parameters["token_path_uri"],
)
class TSSClientV1(TSSClient):
def __init__(self, **server_parameters):
super(TSSClientV1, self).__init__()
authorizer = self._get_authorizer(**server_parameters)
self._client = SecretServer(
server_parameters["base_url"], authorizer, server_parameters["api_path_uri"]
)
@staticmethod
def _get_authorizer(**server_parameters):
if server_parameters.get("domain"):
return DomainPasswordGrantAuthorizer(
server_parameters["base_url"], server_parameters["base_url"],
server_parameters["username"], server_parameters["username"],
server_parameters["domain"],
server_parameters["password"], server_parameters["password"],
server_parameters["api_path_uri"],
server_parameters["token_path_uri"], server_parameters["token_path_uri"],
) )
else:
# The Password Authorizer and Domain Password Authorizer
# became available in v1.0.0 and beyond.
# Import only if sdk_version requires it.
# from thycotic.secrets.server import PasswordGrantAuthorizer
if server_parameters["domain"]: return PasswordGrantAuthorizer(
authorizer = DomainPasswordGrantAuthorizer( server_parameters["base_url"],
server_parameters["base_url"], server_parameters["username"],
server_parameters["username"], server_parameters["password"],
server_parameters["domain"], server_parameters["token_path_uri"],
server_parameters["password"], )
server_parameters["token_path_uri"],
)
else:
authorizer = PasswordGrantAuthorizer(
server_parameters["base_url"],
server_parameters["username"],
server_parameters["password"],
server_parameters["token_path_uri"],
)
return SecretServer(
server_parameters["base_url"], authorizer, server_parameters["api_path_uri"]
)
class LookupModule(LookupBase):
def run(self, terms, variables, **kwargs): def run(self, terms, variables, **kwargs):
if sdk_is_missing: if not HAS_TSS_SDK:
raise AnsibleError("python-tss-sdk must be installed to use this plugin") raise AnsibleError("python-tss-sdk must be installed to use this plugin")
self.set_options(var_options=variables, direct=kwargs) self.set_options(var_options=variables, direct=kwargs)
secret_server = LookupModule.Client( tss = TSSClient.from_params(
{ base_url=self.get_option("base_url"),
"base_url": self.get_option("base_url"), username=self.get_option("username"),
"username": self.get_option("username"), password=self.get_option("password"),
"password": self.get_option("password"), domain=self.get_option("domain"),
"domain": self.get_option("domain"), api_path_uri=self.get_option("api_path_uri"),
"api_path_uri": self.get_option("api_path_uri"), token_path_uri=self.get_option("token_path_uri"),
"token_path_uri": self.get_option("token_path_uri"),
}
) )
result = []
for term in terms: try:
display.debug("tss_lookup term: %s" % term) return [tss.get_secret(term) for term in terms]
try: except SecretServerError as error:
id = int(term) raise AnsibleError("Secret Server lookup failure: %s" % error.message)
display.vvv(u"Secret Server lookup of Secret with ID %d" % id)
result.append(secret_server.get_secret_json(id))
except ValueError:
raise AnsibleOptionsError("Secret ID must be an integer")
except SecretServerError as error:
raise AnsibleError("Secret Server lookup failure: %s" % error.message)
return result

View file

@ -10,12 +10,25 @@ __metaclass__ = type
from ansible_collections.community.general.tests.unit.compat.unittest import TestCase from ansible_collections.community.general.tests.unit.compat.unittest import TestCase
from ansible_collections.community.general.tests.unit.compat.mock import ( from ansible_collections.community.general.tests.unit.compat.mock import (
patch, patch,
DEFAULT,
MagicMock, MagicMock,
) )
from ansible_collections.community.general.plugins.lookup import tss from ansible_collections.community.general.plugins.lookup import tss
from ansible.plugins.loader import lookup_loader from ansible.plugins.loader import lookup_loader
TSS_IMPORT_PATH = 'ansible_collections.community.general.plugins.lookup.tss'
def make_absolute(name):
return '.'.join([TSS_IMPORT_PATH, name])
class SecretServerError(Exception):
def __init__(self):
self.message = ''
class MockSecretServer(MagicMock): class MockSecretServer(MagicMock):
RESPONSE = '{"foo": "bar"}' RESPONSE = '{"foo": "bar"}'
@ -23,21 +36,84 @@ class MockSecretServer(MagicMock):
return self.RESPONSE return self.RESPONSE
class TestLookupModule(TestCase): class MockFaultySecretServer(MagicMock):
def get_secret_json(self, path):
raise SecretServerError
@patch(make_absolute('SecretServer'), MockSecretServer())
class TestTSSClient(TestCase):
def setUp(self):
self.server_params = {
'base_url': '',
'username': '',
'domain': '',
'password': '',
'api_path_uri': '',
'token_path_uri': '',
}
def test_from_params(self):
with patch(make_absolute('HAS_TSS_AUTHORIZER'), False):
self.assert_client_version('v0')
with patch.dict(self.server_params, {'domain': 'foo'}):
with self.assertRaises(tss.AnsibleError):
self._get_client()
with patch.multiple(TSS_IMPORT_PATH,
HAS_TSS_AUTHORIZER=True,
PasswordGrantAuthorizer=DEFAULT,
DomainPasswordGrantAuthorizer=DEFAULT):
self.assert_client_version('v1')
with patch.dict(self.server_params, {'domain': 'foo'}):
self.assert_client_version('v1')
def assert_client_version(self, version):
version_to_class = {
'v0': tss.TSSClientV0,
'v1': tss.TSSClientV1
}
client = self._get_client()
self.assertIsInstance(client, version_to_class[version])
def _get_client(self):
return tss.TSSClient.from_params(**self.server_params)
class TestLookupModule(TestCase):
VALID_TERMS = [1]
INVALID_TERMS = ['foo']
def setUp(self): def setUp(self):
tss.sdk_is_missing = False
self.lookup = lookup_loader.get("community.general.tss") self.lookup = lookup_loader.get("community.general.tss")
@patch( @patch.multiple(TSS_IMPORT_PATH,
"ansible_collections.community.general.plugins.lookup.tss.LookupModule.Client", HAS_TSS_SDK=False,
MockSecretServer(), SecretServer=MockSecretServer)
) def test_missing_sdk(self):
with self.assertRaises(tss.AnsibleError):
self._run_lookup(self.VALID_TERMS)
@patch.multiple(TSS_IMPORT_PATH,
HAS_TSS_SDK=True,
SecretServerError=SecretServerError)
def test_get_secret_json(self): def test_get_secret_json(self):
self.assertListEqual( with patch(make_absolute('SecretServer'), MockSecretServer):
[MockSecretServer.RESPONSE], self.assertListEqual([MockSecretServer.RESPONSE], self._run_lookup(self.VALID_TERMS))
self.lookup.run(
[1], with self.assertRaises(tss.AnsibleOptionsError):
[], self._run_lookup(self.INVALID_TERMS)
**{"base_url": "dummy", "username": "dummy", "password": "dummy", }
), with patch(make_absolute('SecretServer'), MockFaultySecretServer):
) with self.assertRaises(tss.AnsibleError):
self._run_lookup(self.VALID_TERMS)
def _run_lookup(self, terms, variables=None, **kwargs):
variables = variables or []
kwargs = kwargs or {"base_url": "dummy", "username": "dummy", "password": "dummy"}
return self.lookup.run(terms, variables, **kwargs)