mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
tss_lookup_plugin - Refactor and decoupling (#3252)
* Initial commit * Adding changelog fragment * Applying initial review suggestions * Increasing unit coverage * Removed unneccessary constant * Improving test readability * Cleanup constants
This commit is contained in:
parent
f2fa56b485
commit
cbcb942b0e
3 changed files with 187 additions and 80 deletions
4
changelogs/fragments/3252-tss_lookup_plugin-refactor.yml
Normal file
4
changelogs/fragments/3252-tss_lookup_plugin-refactor.yml
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
---
|
||||||
|
minor_changes:
|
||||||
|
- tss lookup plugin - refactored to decouple the supporting third-party library (``python-tss-sdk``)
|
||||||
|
(https://github.com/ansible-collections/community.general/pull/3252).
|
|
@ -47,7 +47,9 @@ options:
|
||||||
required: true
|
required: true
|
||||||
domain:
|
domain:
|
||||||
default: ""
|
default: ""
|
||||||
description: The domain with which to request the OAuth2 Access Grant.
|
description:
|
||||||
|
- The domain with which to request the OAuth2 Access Grant.
|
||||||
|
- Requires C(python-tss-sdk) version 1.0.0 or greater.
|
||||||
env:
|
env:
|
||||||
- name: TSS_DOMAIN
|
- name: TSS_DOMAIN
|
||||||
ini:
|
ini:
|
||||||
|
@ -122,100 +124,125 @@ EXAMPLES = r"""
|
||||||
- ansible.builtin.debug:
|
- ansible.builtin.debug:
|
||||||
msg: the password is {{ secret_password }}
|
msg: the password is {{ secret_password }}
|
||||||
"""
|
"""
|
||||||
from distutils.version import LooseVersion
|
|
||||||
from ansible.errors import AnsibleError, AnsibleOptionsError
|
|
||||||
|
|
||||||
sdk_is_missing = False
|
import abc
|
||||||
|
|
||||||
|
from ansible.errors import AnsibleError, AnsibleOptionsError
|
||||||
|
from ansible.module_utils import six
|
||||||
|
from ansible.plugins.lookup import LookupBase
|
||||||
|
from ansible.utils.display import Display
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from thycotic.secrets.server import SecretServer, SecretServerError
|
from thycotic.secrets.server import SecretServer, SecretServerError
|
||||||
except ImportError:
|
|
||||||
sdk_is_missing = True
|
|
||||||
|
|
||||||
# Added for backwards compatability - See issue #3192
|
HAS_TSS_SDK = True
|
||||||
# https://github.com/ansible-collections/community.general/issues/3192
|
|
||||||
try:
|
|
||||||
from thycotic import __version__ as sdk_version
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
sdk_version = "0.0.5"
|
SecretServer = None
|
||||||
|
SecretServerError = None
|
||||||
|
HAS_TSS_SDK = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from thycotic.secrets.server import PasswordGrantAuthorizer, DomainPasswordGrantAuthorizer
|
from thycotic.secrets.server import PasswordGrantAuthorizer, DomainPasswordGrantAuthorizer
|
||||||
|
|
||||||
sdK_version_below_v1 = False
|
HAS_TSS_AUTHORIZER = True
|
||||||
except ImportError:
|
except ImportError:
|
||||||
sdK_version_below_v1 = True
|
PasswordGrantAuthorizer = None
|
||||||
|
DomainPasswordGrantAuthorizer = None
|
||||||
|
HAS_TSS_AUTHORIZER = False
|
||||||
|
|
||||||
from ansible.utils.display import Display
|
|
||||||
from ansible.plugins.lookup import LookupBase
|
|
||||||
|
|
||||||
display = Display()
|
display = Display()
|
||||||
|
|
||||||
|
|
||||||
class LookupModule(LookupBase):
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
@staticmethod
|
class TSSClient(object):
|
||||||
def Client(server_parameters):
|
def __init__(self):
|
||||||
|
self._client = None
|
||||||
|
|
||||||
if LooseVersion(sdk_version) < LooseVersion('1.0.0') or sdK_version_below_v1:
|
@staticmethod
|
||||||
return SecretServer(
|
def from_params(**server_parameters):
|
||||||
|
if HAS_TSS_AUTHORIZER:
|
||||||
|
return TSSClientV1(**server_parameters)
|
||||||
|
else:
|
||||||
|
return TSSClientV0(**server_parameters)
|
||||||
|
|
||||||
|
def get_secret(self, term):
|
||||||
|
display.debug("tss_lookup term: %s" % term)
|
||||||
|
|
||||||
|
secret_id = self._term_to_secret_id(term)
|
||||||
|
display.vvv(u"Secret Server lookup of Secret with ID %d" % secret_id)
|
||||||
|
|
||||||
|
return self._client.get_secret_json(secret_id)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _term_to_secret_id(term):
|
||||||
|
try:
|
||||||
|
return int(term)
|
||||||
|
except ValueError:
|
||||||
|
raise AnsibleOptionsError("Secret ID must be an integer")
|
||||||
|
|
||||||
|
|
||||||
|
class TSSClientV0(TSSClient):
|
||||||
|
def __init__(self, **server_parameters):
|
||||||
|
super(TSSClientV0, self).__init__()
|
||||||
|
|
||||||
|
if server_parameters.get("domain"):
|
||||||
|
raise AnsibleError("The 'domain' option requires 'python-tss-sdk' version 1.0.0 or greater")
|
||||||
|
|
||||||
|
self._client = SecretServer(
|
||||||
server_parameters["base_url"],
|
server_parameters["base_url"],
|
||||||
server_parameters["username"],
|
server_parameters["username"],
|
||||||
server_parameters["password"],
|
server_parameters["password"],
|
||||||
server_parameters["api_path_uri"],
|
server_parameters["api_path_uri"],
|
||||||
server_parameters["token_path_uri"],
|
server_parameters["token_path_uri"],
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
# The Password Authorizer and Domain Password Authorizer
|
|
||||||
# became available in v1.0.0 and beyond.
|
|
||||||
# Import only if sdk_version requires it.
|
|
||||||
# from thycotic.secrets.server import PasswordGrantAuthorizer
|
|
||||||
|
|
||||||
if server_parameters["domain"]:
|
|
||||||
authorizer = DomainPasswordGrantAuthorizer(
|
class TSSClientV1(TSSClient):
|
||||||
|
def __init__(self, **server_parameters):
|
||||||
|
super(TSSClientV1, self).__init__()
|
||||||
|
|
||||||
|
authorizer = self._get_authorizer(**server_parameters)
|
||||||
|
self._client = SecretServer(
|
||||||
|
server_parameters["base_url"], authorizer, server_parameters["api_path_uri"]
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_authorizer(**server_parameters):
|
||||||
|
if server_parameters.get("domain"):
|
||||||
|
return DomainPasswordGrantAuthorizer(
|
||||||
server_parameters["base_url"],
|
server_parameters["base_url"],
|
||||||
server_parameters["username"],
|
server_parameters["username"],
|
||||||
server_parameters["domain"],
|
server_parameters["domain"],
|
||||||
server_parameters["password"],
|
server_parameters["password"],
|
||||||
server_parameters["token_path_uri"],
|
server_parameters["token_path_uri"],
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
authorizer = PasswordGrantAuthorizer(
|
return PasswordGrantAuthorizer(
|
||||||
server_parameters["base_url"],
|
server_parameters["base_url"],
|
||||||
server_parameters["username"],
|
server_parameters["username"],
|
||||||
server_parameters["password"],
|
server_parameters["password"],
|
||||||
server_parameters["token_path_uri"],
|
server_parameters["token_path_uri"],
|
||||||
)
|
)
|
||||||
|
|
||||||
return SecretServer(
|
|
||||||
server_parameters["base_url"], authorizer, server_parameters["api_path_uri"]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
class LookupModule(LookupBase):
|
||||||
def run(self, terms, variables, **kwargs):
|
def run(self, terms, variables, **kwargs):
|
||||||
if sdk_is_missing:
|
if not HAS_TSS_SDK:
|
||||||
raise AnsibleError("python-tss-sdk must be installed to use this plugin")
|
raise AnsibleError("python-tss-sdk must be installed to use this plugin")
|
||||||
|
|
||||||
self.set_options(var_options=variables, direct=kwargs)
|
self.set_options(var_options=variables, direct=kwargs)
|
||||||
|
|
||||||
secret_server = LookupModule.Client(
|
tss = TSSClient.from_params(
|
||||||
{
|
base_url=self.get_option("base_url"),
|
||||||
"base_url": self.get_option("base_url"),
|
username=self.get_option("username"),
|
||||||
"username": self.get_option("username"),
|
password=self.get_option("password"),
|
||||||
"password": self.get_option("password"),
|
domain=self.get_option("domain"),
|
||||||
"domain": self.get_option("domain"),
|
api_path_uri=self.get_option("api_path_uri"),
|
||||||
"api_path_uri": self.get_option("api_path_uri"),
|
token_path_uri=self.get_option("token_path_uri"),
|
||||||
"token_path_uri": self.get_option("token_path_uri"),
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
result = []
|
|
||||||
|
|
||||||
for term in terms:
|
|
||||||
display.debug("tss_lookup term: %s" % term)
|
|
||||||
try:
|
try:
|
||||||
id = int(term)
|
return [tss.get_secret(term) for term in terms]
|
||||||
display.vvv(u"Secret Server lookup of Secret with ID %d" % id)
|
|
||||||
result.append(secret_server.get_secret_json(id))
|
|
||||||
except ValueError:
|
|
||||||
raise AnsibleOptionsError("Secret ID must be an integer")
|
|
||||||
except SecretServerError as error:
|
except SecretServerError as error:
|
||||||
raise AnsibleError("Secret Server lookup failure: %s" % error.message)
|
raise AnsibleError("Secret Server lookup failure: %s" % error.message)
|
||||||
return result
|
|
||||||
|
|
|
@ -10,12 +10,25 @@ __metaclass__ = type
|
||||||
from ansible_collections.community.general.tests.unit.compat.unittest import TestCase
|
from ansible_collections.community.general.tests.unit.compat.unittest import TestCase
|
||||||
from ansible_collections.community.general.tests.unit.compat.mock import (
|
from ansible_collections.community.general.tests.unit.compat.mock import (
|
||||||
patch,
|
patch,
|
||||||
|
DEFAULT,
|
||||||
MagicMock,
|
MagicMock,
|
||||||
)
|
)
|
||||||
from ansible_collections.community.general.plugins.lookup import tss
|
from ansible_collections.community.general.plugins.lookup import tss
|
||||||
from ansible.plugins.loader import lookup_loader
|
from ansible.plugins.loader import lookup_loader
|
||||||
|
|
||||||
|
|
||||||
|
TSS_IMPORT_PATH = 'ansible_collections.community.general.plugins.lookup.tss'
|
||||||
|
|
||||||
|
|
||||||
|
def make_absolute(name):
|
||||||
|
return '.'.join([TSS_IMPORT_PATH, name])
|
||||||
|
|
||||||
|
|
||||||
|
class SecretServerError(Exception):
|
||||||
|
def __init__(self):
|
||||||
|
self.message = ''
|
||||||
|
|
||||||
|
|
||||||
class MockSecretServer(MagicMock):
|
class MockSecretServer(MagicMock):
|
||||||
RESPONSE = '{"foo": "bar"}'
|
RESPONSE = '{"foo": "bar"}'
|
||||||
|
|
||||||
|
@ -23,21 +36,84 @@ class MockSecretServer(MagicMock):
|
||||||
return self.RESPONSE
|
return self.RESPONSE
|
||||||
|
|
||||||
|
|
||||||
class TestLookupModule(TestCase):
|
class MockFaultySecretServer(MagicMock):
|
||||||
|
def get_secret_json(self, path):
|
||||||
|
raise SecretServerError
|
||||||
|
|
||||||
|
|
||||||
|
@patch(make_absolute('SecretServer'), MockSecretServer())
|
||||||
|
class TestTSSClient(TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.server_params = {
|
||||||
|
'base_url': '',
|
||||||
|
'username': '',
|
||||||
|
'domain': '',
|
||||||
|
'password': '',
|
||||||
|
'api_path_uri': '',
|
||||||
|
'token_path_uri': '',
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_from_params(self):
|
||||||
|
with patch(make_absolute('HAS_TSS_AUTHORIZER'), False):
|
||||||
|
self.assert_client_version('v0')
|
||||||
|
|
||||||
|
with patch.dict(self.server_params, {'domain': 'foo'}):
|
||||||
|
with self.assertRaises(tss.AnsibleError):
|
||||||
|
self._get_client()
|
||||||
|
|
||||||
|
with patch.multiple(TSS_IMPORT_PATH,
|
||||||
|
HAS_TSS_AUTHORIZER=True,
|
||||||
|
PasswordGrantAuthorizer=DEFAULT,
|
||||||
|
DomainPasswordGrantAuthorizer=DEFAULT):
|
||||||
|
|
||||||
|
self.assert_client_version('v1')
|
||||||
|
|
||||||
|
with patch.dict(self.server_params, {'domain': 'foo'}):
|
||||||
|
self.assert_client_version('v1')
|
||||||
|
|
||||||
|
def assert_client_version(self, version):
|
||||||
|
version_to_class = {
|
||||||
|
'v0': tss.TSSClientV0,
|
||||||
|
'v1': tss.TSSClientV1
|
||||||
|
}
|
||||||
|
|
||||||
|
client = self._get_client()
|
||||||
|
self.assertIsInstance(client, version_to_class[version])
|
||||||
|
|
||||||
|
def _get_client(self):
|
||||||
|
return tss.TSSClient.from_params(**self.server_params)
|
||||||
|
|
||||||
|
|
||||||
|
class TestLookupModule(TestCase):
|
||||||
|
VALID_TERMS = [1]
|
||||||
|
INVALID_TERMS = ['foo']
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
tss.sdk_is_missing = False
|
|
||||||
self.lookup = lookup_loader.get("community.general.tss")
|
self.lookup = lookup_loader.get("community.general.tss")
|
||||||
|
|
||||||
@patch(
|
@patch.multiple(TSS_IMPORT_PATH,
|
||||||
"ansible_collections.community.general.plugins.lookup.tss.LookupModule.Client",
|
HAS_TSS_SDK=False,
|
||||||
MockSecretServer(),
|
SecretServer=MockSecretServer)
|
||||||
)
|
def test_missing_sdk(self):
|
||||||
|
with self.assertRaises(tss.AnsibleError):
|
||||||
|
self._run_lookup(self.VALID_TERMS)
|
||||||
|
|
||||||
|
@patch.multiple(TSS_IMPORT_PATH,
|
||||||
|
HAS_TSS_SDK=True,
|
||||||
|
SecretServerError=SecretServerError)
|
||||||
def test_get_secret_json(self):
|
def test_get_secret_json(self):
|
||||||
self.assertListEqual(
|
with patch(make_absolute('SecretServer'), MockSecretServer):
|
||||||
[MockSecretServer.RESPONSE],
|
self.assertListEqual([MockSecretServer.RESPONSE], self._run_lookup(self.VALID_TERMS))
|
||||||
self.lookup.run(
|
|
||||||
[1],
|
with self.assertRaises(tss.AnsibleOptionsError):
|
||||||
[],
|
self._run_lookup(self.INVALID_TERMS)
|
||||||
**{"base_url": "dummy", "username": "dummy", "password": "dummy", }
|
|
||||||
),
|
with patch(make_absolute('SecretServer'), MockFaultySecretServer):
|
||||||
)
|
with self.assertRaises(tss.AnsibleError):
|
||||||
|
self._run_lookup(self.VALID_TERMS)
|
||||||
|
|
||||||
|
def _run_lookup(self, terms, variables=None, **kwargs):
|
||||||
|
variables = variables or []
|
||||||
|
kwargs = kwargs or {"base_url": "dummy", "username": "dummy", "password": "dummy"}
|
||||||
|
|
||||||
|
return self.lookup.run(terms, variables, **kwargs)
|
||||||
|
|
Loading…
Reference in a new issue