diff --git a/changelogs/fragments/3252-tss_lookup_plugin-refactor.yml b/changelogs/fragments/3252-tss_lookup_plugin-refactor.yml new file mode 100644 index 0000000000..6e8ccb29f8 --- /dev/null +++ b/changelogs/fragments/3252-tss_lookup_plugin-refactor.yml @@ -0,0 +1,4 @@ +--- +minor_changes: + - tss lookup plugin - refactored to decouple the supporting third-party library (``python-tss-sdk``) + (https://github.com/ansible-collections/community.general/pull/3252). diff --git a/plugins/lookup/tss.py b/plugins/lookup/tss.py index ecc3fd6c8b..fe6042e130 100644 --- a/plugins/lookup/tss.py +++ b/plugins/lookup/tss.py @@ -47,7 +47,9 @@ options: required: true domain: default: "" - description: The domain with which to request the OAuth2 Access Grant. + description: + - The domain with which to request the OAuth2 Access Grant. + - Requires C(python-tss-sdk) version 1.0.0 or greater. env: - name: TSS_DOMAIN ini: @@ -122,100 +124,125 @@ EXAMPLES = r""" - ansible.builtin.debug: msg: the password is {{ secret_password }} """ -from distutils.version import LooseVersion -from ansible.errors import AnsibleError, AnsibleOptionsError -sdk_is_missing = False +import abc + +from ansible.errors import AnsibleError, AnsibleOptionsError +from ansible.module_utils import six +from ansible.plugins.lookup import LookupBase +from ansible.utils.display import Display try: from thycotic.secrets.server import SecretServer, SecretServerError -except ImportError: - sdk_is_missing = True -# Added for backwards compatability - See issue #3192 -# https://github.com/ansible-collections/community.general/issues/3192 -try: - from thycotic import __version__ as sdk_version + HAS_TSS_SDK = True except ImportError: - sdk_version = "0.0.5" + SecretServer = None + SecretServerError = None + HAS_TSS_SDK = False try: from thycotic.secrets.server import PasswordGrantAuthorizer, DomainPasswordGrantAuthorizer - sdK_version_below_v1 = False + HAS_TSS_AUTHORIZER = True except ImportError: - sdK_version_below_v1 = True + PasswordGrantAuthorizer = None + DomainPasswordGrantAuthorizer = None + HAS_TSS_AUTHORIZER = False -from ansible.utils.display import Display -from ansible.plugins.lookup import LookupBase display = Display() -class LookupModule(LookupBase): - @staticmethod - def Client(server_parameters): +@six.add_metaclass(abc.ABCMeta) +class TSSClient(object): + def __init__(self): + self._client = None - if LooseVersion(sdk_version) < LooseVersion('1.0.0') or sdK_version_below_v1: - return SecretServer( + @staticmethod + def from_params(**server_parameters): + if HAS_TSS_AUTHORIZER: + return TSSClientV1(**server_parameters) + else: + return TSSClientV0(**server_parameters) + + def get_secret(self, term): + display.debug("tss_lookup term: %s" % term) + + secret_id = self._term_to_secret_id(term) + display.vvv(u"Secret Server lookup of Secret with ID %d" % secret_id) + + return self._client.get_secret_json(secret_id) + + @staticmethod + def _term_to_secret_id(term): + try: + return int(term) + except ValueError: + raise AnsibleOptionsError("Secret ID must be an integer") + + +class TSSClientV0(TSSClient): + def __init__(self, **server_parameters): + super(TSSClientV0, self).__init__() + + if server_parameters.get("domain"): + raise AnsibleError("The 'domain' option requires 'python-tss-sdk' version 1.0.0 or greater") + + self._client = SecretServer( + server_parameters["base_url"], + server_parameters["username"], + server_parameters["password"], + server_parameters["api_path_uri"], + server_parameters["token_path_uri"], + ) + + +class TSSClientV1(TSSClient): + def __init__(self, **server_parameters): + super(TSSClientV1, self).__init__() + + authorizer = self._get_authorizer(**server_parameters) + self._client = SecretServer( + server_parameters["base_url"], authorizer, server_parameters["api_path_uri"] + ) + + @staticmethod + def _get_authorizer(**server_parameters): + if server_parameters.get("domain"): + return DomainPasswordGrantAuthorizer( server_parameters["base_url"], server_parameters["username"], + server_parameters["domain"], server_parameters["password"], - server_parameters["api_path_uri"], server_parameters["token_path_uri"], ) - else: - # The Password Authorizer and Domain Password Authorizer - # became available in v1.0.0 and beyond. - # Import only if sdk_version requires it. - # from thycotic.secrets.server import PasswordGrantAuthorizer - if server_parameters["domain"]: - authorizer = DomainPasswordGrantAuthorizer( - server_parameters["base_url"], - server_parameters["username"], - server_parameters["domain"], - server_parameters["password"], - server_parameters["token_path_uri"], - ) - else: - authorizer = PasswordGrantAuthorizer( - server_parameters["base_url"], - server_parameters["username"], - server_parameters["password"], - server_parameters["token_path_uri"], - ) + return PasswordGrantAuthorizer( + server_parameters["base_url"], + server_parameters["username"], + server_parameters["password"], + server_parameters["token_path_uri"], + ) - return SecretServer( - server_parameters["base_url"], authorizer, server_parameters["api_path_uri"] - ) +class LookupModule(LookupBase): def run(self, terms, variables, **kwargs): - if sdk_is_missing: + if not HAS_TSS_SDK: raise AnsibleError("python-tss-sdk must be installed to use this plugin") self.set_options(var_options=variables, direct=kwargs) - secret_server = LookupModule.Client( - { - "base_url": self.get_option("base_url"), - "username": self.get_option("username"), - "password": self.get_option("password"), - "domain": self.get_option("domain"), - "api_path_uri": self.get_option("api_path_uri"), - "token_path_uri": self.get_option("token_path_uri"), - } + tss = TSSClient.from_params( + base_url=self.get_option("base_url"), + username=self.get_option("username"), + password=self.get_option("password"), + domain=self.get_option("domain"), + api_path_uri=self.get_option("api_path_uri"), + token_path_uri=self.get_option("token_path_uri"), ) - result = [] - for term in terms: - display.debug("tss_lookup term: %s" % term) - try: - id = int(term) - display.vvv(u"Secret Server lookup of Secret with ID %d" % id) - result.append(secret_server.get_secret_json(id)) - except ValueError: - raise AnsibleOptionsError("Secret ID must be an integer") - except SecretServerError as error: - raise AnsibleError("Secret Server lookup failure: %s" % error.message) - return result + try: + return [tss.get_secret(term) for term in terms] + except SecretServerError as error: + raise AnsibleError("Secret Server lookup failure: %s" % error.message) diff --git a/tests/unit/plugins/lookup/test_tss.py b/tests/unit/plugins/lookup/test_tss.py index cca2f6ff5f..97073d34be 100644 --- a/tests/unit/plugins/lookup/test_tss.py +++ b/tests/unit/plugins/lookup/test_tss.py @@ -10,12 +10,25 @@ __metaclass__ = type from ansible_collections.community.general.tests.unit.compat.unittest import TestCase from ansible_collections.community.general.tests.unit.compat.mock import ( patch, + DEFAULT, MagicMock, ) from ansible_collections.community.general.plugins.lookup import tss from ansible.plugins.loader import lookup_loader +TSS_IMPORT_PATH = 'ansible_collections.community.general.plugins.lookup.tss' + + +def make_absolute(name): + return '.'.join([TSS_IMPORT_PATH, name]) + + +class SecretServerError(Exception): + def __init__(self): + self.message = '' + + class MockSecretServer(MagicMock): RESPONSE = '{"foo": "bar"}' @@ -23,21 +36,84 @@ class MockSecretServer(MagicMock): return self.RESPONSE -class TestLookupModule(TestCase): +class MockFaultySecretServer(MagicMock): + def get_secret_json(self, path): + raise SecretServerError + + +@patch(make_absolute('SecretServer'), MockSecretServer()) +class TestTSSClient(TestCase): + def setUp(self): + self.server_params = { + 'base_url': '', + 'username': '', + 'domain': '', + 'password': '', + 'api_path_uri': '', + 'token_path_uri': '', + } + + def test_from_params(self): + with patch(make_absolute('HAS_TSS_AUTHORIZER'), False): + self.assert_client_version('v0') + + with patch.dict(self.server_params, {'domain': 'foo'}): + with self.assertRaises(tss.AnsibleError): + self._get_client() + + with patch.multiple(TSS_IMPORT_PATH, + HAS_TSS_AUTHORIZER=True, + PasswordGrantAuthorizer=DEFAULT, + DomainPasswordGrantAuthorizer=DEFAULT): + + self.assert_client_version('v1') + + with patch.dict(self.server_params, {'domain': 'foo'}): + self.assert_client_version('v1') + + def assert_client_version(self, version): + version_to_class = { + 'v0': tss.TSSClientV0, + 'v1': tss.TSSClientV1 + } + + client = self._get_client() + self.assertIsInstance(client, version_to_class[version]) + + def _get_client(self): + return tss.TSSClient.from_params(**self.server_params) + + +class TestLookupModule(TestCase): + VALID_TERMS = [1] + INVALID_TERMS = ['foo'] + def setUp(self): - tss.sdk_is_missing = False self.lookup = lookup_loader.get("community.general.tss") - @patch( - "ansible_collections.community.general.plugins.lookup.tss.LookupModule.Client", - MockSecretServer(), - ) + @patch.multiple(TSS_IMPORT_PATH, + HAS_TSS_SDK=False, + SecretServer=MockSecretServer) + def test_missing_sdk(self): + with self.assertRaises(tss.AnsibleError): + self._run_lookup(self.VALID_TERMS) + + @patch.multiple(TSS_IMPORT_PATH, + HAS_TSS_SDK=True, + SecretServerError=SecretServerError) def test_get_secret_json(self): - self.assertListEqual( - [MockSecretServer.RESPONSE], - self.lookup.run( - [1], - [], - **{"base_url": "dummy", "username": "dummy", "password": "dummy", } - ), - ) + with patch(make_absolute('SecretServer'), MockSecretServer): + self.assertListEqual([MockSecretServer.RESPONSE], self._run_lookup(self.VALID_TERMS)) + + with self.assertRaises(tss.AnsibleOptionsError): + self._run_lookup(self.INVALID_TERMS) + + with patch(make_absolute('SecretServer'), MockFaultySecretServer): + with self.assertRaises(tss.AnsibleError): + self._run_lookup(self.VALID_TERMS) + + def _run_lookup(self, terms, variables=None, **kwargs): + variables = variables or [] + kwargs = kwargs or {"base_url": "dummy", "username": "dummy", "password": "dummy"} + + return self.lookup.run(terms, variables, **kwargs)