1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Refactors bigip device sshd (#33473)

* Refactors bigip device sshd

Fixes coding conventions for currect f5 conventions

* Fixes upstream errors
This commit is contained in:
Tim Rupp 2017-12-01 20:27:41 -08:00 committed by GitHub
parent a3b3dbe220
commit 76135a500e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 346 additions and 202 deletions

View file

@ -3,17 +3,20 @@
#
# Copyright (c) 2017 F5 Networks Inc.
# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = '''
DOCUMENTATION = r'''
---
module: bigip_device_sshd
short_description: Manage the SSHD settings of a BIG-IP
description:
- Manage the SSHD settings of a BIG-IP
- Manage the SSHD settings of a BIG-IP.
version_added: "2.2"
options:
allow:
@ -28,7 +31,6 @@ options:
banner:
description:
- Whether to enable the banner or not.
required: false
choices:
- enabled
- disabled
@ -36,12 +38,10 @@ options:
description:
- Specifies the text to include on the pre-login banner that displays
when a user attempts to login to the system using SSH.
required: false
inactivity_timeout:
description:
- Specifies the number of seconds before inactivity causes an SSH
session to log out.
required: false
log_level:
description:
- Specifies the minimum SSHD message level to include in the system log.
@ -62,11 +62,9 @@ options:
choices:
- enabled
- disabled
required: false
port:
description:
- Port that you want the SSH daemon to run on.
required: false
notes:
- Requires the f5-sdk Python package on the host This is as easy as pip
install f5-sdk.
@ -78,35 +76,35 @@ author:
- Tim Rupp (@caphrim007)
'''
EXAMPLES = '''
EXAMPLES = r'''
- name: Set the banner for the SSHD service from a string
bigip_device_sshd:
banner: "enabled"
banner_text: "banner text goes here"
password: "secret"
server: "lb.mydomain.com"
user: "admin"
banner: enabled
banner_text: banner text goes here
password: secret
server: lb.mydomain.com
user: admin
delegate_to: localhost
- name: Set the banner for the SSHD service from a file
bigip_device_sshd:
banner: "enabled"
banner: enabled
banner_text: "{{ lookup('file', '/path/to/file') }}"
password: "secret"
server: "lb.mydomain.com"
user: "admin"
password: secret
server: lb.mydomain.com
user: admin
delegate_to: localhost
- name: Set the SSHD service to run on port 2222
bigip_device_sshd:
password: "secret"
password: secret
port: 2222
server: "lb.mydomain.com"
user: "admin"
server: lb.mydomain.com
user: admin
delegate_to: localhost
'''
RETURN = '''
RETURN = r'''
allow:
description: >
Specifies, if you have enabled SSH access, the IP address or address
@ -114,31 +112,31 @@ allow:
system.
returned: changed
type: string
sample: "192.0.2.*"
sample: 192.0.2.*
banner:
description: Whether the banner is enabled or not.
returned: changed
type: string
sample: "true"
sample: true
banner_text:
description: >
Specifies the text included on the pre-login banner that
displays when a user attempts to login to the system using SSH.
returned: changed and success
type: string
sample: "This is a corporate device. Connecting to it without..."
sample: This is a corporate device. Connecting to it without...
inactivity_timeout:
description: >
The number of seconds before inactivity causes an SSH.
The number of seconds before inactivity causes an SSH
session to log out.
returned: changed
type: int
sample: "10"
sample: 10
log_level:
description: The minimum SSHD message level to include in the system log.
returned: changed
type: string
sample: "debug"
sample: debug
login:
description: Specifies that the system accepts SSH communications or not.
returned: changed
@ -151,186 +149,205 @@ port:
sample: 22
'''
from ansible.module_utils.f5_utils import AnsibleF5Client
from ansible.module_utils.f5_utils import AnsibleF5Parameters
from ansible.module_utils.f5_utils import HAS_F5SDK
from ansible.module_utils.f5_utils import F5ModuleError
try:
from f5.bigip import ManagementRoot
from icontrol.session import iControlUnexpectedHTTPError
HAS_F5SDK = True
from ansible.module_utils.f5_utils import iControlUnexpectedHTTPError
except ImportError:
HAS_F5SDK = False
CHOICES = ['enabled', 'disabled']
LEVELS = ['debug', 'debug1', 'debug2', 'debug3', 'error', 'fatal', 'info',
'quiet', 'verbose']
class Parameters(AnsibleF5Parameters):
api_map = {
'bannerText': 'banner_text',
'inactivityTimeout': 'inactivity_timeout',
'logLevel': 'log_level'
}
class BigIpDeviceSshd(object):
def __init__(self, *args, **kwargs):
if not HAS_F5SDK:
raise F5ModuleError("The python f5-sdk module is required")
api_attributes = [
'allow', 'banner', 'bannerText', 'inactivityTimeout', 'logLevel',
'login', 'port'
]
# The params that change in the module
self.cparams = dict()
updatables = [
'allow', 'banner', 'banner_text', 'inactivity_timeout', 'log_level',
'login', 'port'
]
# Stores the params that are sent to the module
self.params = kwargs
self.api = ManagementRoot(kwargs['server'],
kwargs['user'],
kwargs['password'],
port=kwargs['server_port'])
returnables = [
'allow', 'banner', 'banner_text', 'inactivity_timeout', 'log_level',
'login', 'port'
]
def update(self):
changed = False
current = self.read()
params = dict()
def to_return(self):
result = {}
for returnable in self.returnables:
result[returnable] = getattr(self, returnable)
result = self._filter_params(result)
return result
allow = self.params['allow']
banner = self.params['banner']
banner_text = self.params['banner_text']
timeout = self.params['inactivity_timeout']
log_level = self.params['log_level']
login = self.params['login']
port = self.params['port']
check_mode = self.params['check_mode']
if allow:
if 'allow' in current:
items = set(allow)
if items != current['allow']:
params['allow'] = list(items)
def api_params(self):
result = {}
for api_attribute in self.api_attributes:
if self.api_map is not None and api_attribute in self.api_map:
result[api_attribute] = getattr(self, self.api_map[api_attribute])
else:
params['allow'] = allow
result[api_attribute] = getattr(self, api_attribute)
result = self._filter_params(result)
return result
if banner:
if 'banner' in current:
if banner != current['banner']:
params['banner'] = banner
else:
params['banner'] = banner
@property
def inactivity_timeout(self):
if self._values['inactivity_timeout'] is None:
return None
return int(self._values['inactivity_timeout'])
if banner_text:
if 'banner_text' in current:
if banner_text != current['banner_text']:
params['bannerText'] = banner_text
else:
params['bannerText'] = banner_text
@property
def port(self):
if self._values['port'] is None:
return None
return int(self._values['port'])
if timeout:
if 'inactivity_timeout' in current:
if timeout != current['inactivity_timeout']:
params['inactivityTimeout'] = timeout
else:
params['inactivityTimeout'] = timeout
@property
def allow(self):
if self._values['allow'] is None:
return None
allow = self._values['allow']
return list(set([str(x) for x in allow]))
if log_level:
if 'log_level' in current:
if log_level != current['log_level']:
params['logLevel'] = log_level
else:
params['logLevel'] = log_level
if login:
if 'login' in current:
if login != current['login']:
params['login'] = login
else:
params['login'] = login
class ModuleManager(object):
def __init__(self, client):
self.client = client
self.have = None
self.want = Parameters(self.client.module.params)
self.changes = Parameters()
if port:
if 'port' in current:
if port != current['port']:
params['port'] = port
else:
params['port'] = port
def _update_changed_options(self):
changed = {}
for key in Parameters.updatables:
if getattr(self.want, key) is not None:
attr1 = getattr(self.want, key)
attr2 = getattr(self.have, key)
if attr1 != attr2:
changed[key] = attr1
if changed:
self.changes = Parameters(changed)
return True
return False
if params:
changed = True
if check_mode:
return changed
self.cparams = camel_dict_to_snake_dict(params)
else:
return changed
r = self.api.tm.sys.sshd.load()
r.update(**params)
r.refresh()
return changed
def read(self):
"""Read information and transform it
The values that are returned by BIG-IP in the f5-sdk can have encoding
attached to them as well as be completely missing in some cases.
Therefore, this method will transform the data from the BIG-IP into a
format that is more easily consumable by the rest of the class and the
parameters that are supported by the module.
"""
p = dict()
r = self.api.tm.sys.sshd.load()
if hasattr(r, 'allow'):
# Deliberately using sets to suppress duplicates
p['allow'] = set([str(x) for x in r.allow])
if hasattr(r, 'banner'):
p['banner'] = str(r.banner)
if hasattr(r, 'bannerText'):
p['banner_text'] = str(r.bannerText)
if hasattr(r, 'inactivityTimeout'):
p['inactivity_timeout'] = str(r.inactivityTimeout)
if hasattr(r, 'logLevel'):
p['log_level'] = str(r.logLevel)
if hasattr(r, 'login'):
p['login'] = str(r.login)
if hasattr(r, 'port'):
p['port'] = int(r.port)
return p
def flush(self):
def exec_module(self):
result = dict()
changed = False
try:
changed = self.update()
except iControlUnexpectedHTTPError as e:
raise F5ModuleError(str(e))
result.update(**self.cparams)
changes = self.changes.to_return()
result.update(**changes)
result.update(dict(changed=changed))
return result
def read_current_from_device(self):
resource = self.client.api.tm.sys.sshd.load()
result = resource.attrs
return Parameters(result)
def update(self):
self.have = self.read_current_from_device()
if not self.should_update():
return False
if self.client.check_mode:
return True
self.update_on_device()
return True
def should_update(self):
result = self._update_changed_options()
if result:
return True
return False
def update_on_device(self):
params = self.want.api_params()
resource = self.client.api.tm.sys.sshd.load()
resource.update(**params)
class ArgumentSpec(object):
def __init__(self):
self.choices = ['enabled', 'disabled']
self.levels = [
'debug', 'debug1', 'debug2', 'debug3', 'error', 'fatal', 'info',
'quiet', 'verbose'
]
self.supports_check_mode = True
self.argument_spec = dict(
allow=dict(
required=False,
default=None,
type='list'
),
banner=dict(
required=False,
default=None,
choices=self.choices
),
banner_text=dict(
required=False,
default=None
),
inactivity_timeout=dict(
required=False,
default=None,
type='int'
),
log_level=dict(
required=False,
default=None,
choices=self.levels
),
login=dict(
required=False,
default=None,
choices=self.choices
),
port=dict(
required=False,
default=None,
type='int'
),
state=dict(
default='present',
choices=['present']
)
)
self.f5_product_name = 'bigip'
def main():
argument_spec = f5_argument_spec()
if not HAS_F5SDK:
raise F5ModuleError("The python f5-sdk module is required")
meta_args = dict(
allow=dict(required=False, default=None, type='list'),
banner=dict(required=False, default=None, choices=CHOICES),
banner_text=dict(required=False, default=None),
inactivity_timeout=dict(required=False, default=None, type='int'),
log_level=dict(required=False, default=None, choices=LEVELS),
login=dict(required=False, default=None, choices=CHOICES),
port=dict(required=False, default=None, type='int'),
state=dict(default='present', choices=['present'])
)
argument_spec.update(meta_args)
spec = ArgumentSpec()
module = AnsibleModule(
argument_spec=argument_spec,
supports_check_mode=True
client = AnsibleF5Client(
argument_spec=spec.argument_spec,
supports_check_mode=spec.supports_check_mode,
f5_product_name=spec.f5_product_name
)
try:
obj = BigIpDeviceSshd(check_mode=module.check_mode, **module.params)
result = obj.flush()
module.exit_json(**result)
mm = ModuleManager(client)
results = mm.exec_module()
client.module.exit_json(**results)
except F5ModuleError as e:
module.fail_json(msg=str(e))
from ansible.module_utils.basic import *
from ansible.module_utils.ec2 import camel_dict_to_snake_dict
from ansible.module_utils.f5_utils import *
client.module.fail_json(msg=str(e))
if __name__ == '__main__':
main()

View file

@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 F5 Networks Inc.
# GNU General Public License v3.0 (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
import sys
from nose.plugins.skip import SkipTest
if sys.version_info < (2, 7):
raise SkipTest("F5 Ansible modules require Python >= 2.7")
from ansible.compat.tests import unittest
from ansible.compat.tests.mock import Mock
from ansible.compat.tests.mock import patch
from ansible.module_utils.f5_utils import AnsibleF5Client
try:
from library.bigip_device_sshd import Parameters
from library.bigip_device_sshd import ModuleManager
from library.bigip_device_sshd import ArgumentSpec
from ansible.module_utils.f5_utils import iControlUnexpectedHTTPError
from test.unit.modules.utils import set_module_args
except ImportError:
try:
from ansible.modules.network.f5.bigip_device_sshd import Parameters
from ansible.modules.network.f5.bigip_device_sshd import ModuleManager
from ansible.modules.network.f5.bigip_device_sshd import ArgumentSpec
from ansible.module_utils.f5_utils import iControlUnexpectedHTTPError
from units.modules.utils import set_module_args
except ImportError:
raise SkipTest("F5 Ansible modules require the f5-sdk Python library")
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def load_fixture(name):
path = os.path.join(fixture_path, name)
if path in fixture_data:
return fixture_data[path]
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except Exception:
pass
fixture_data[path] = data
return data
class TestParameters(unittest.TestCase):
def test_module_parameters(self):
args = dict(
allow=['all'],
banner='enabled',
banner_text='asdf',
inactivity_timeout='100',
log_level='debug',
login='enabled',
port=1010,
server='localhost',
user='admin',
password='password'
)
p = Parameters(args)
assert p.allow == ['all']
assert p.banner == 'enabled'
assert p.banner_text == 'asdf'
assert p.inactivity_timeout == 100
assert p.log_level == 'debug'
assert p.login == 'enabled'
assert p.port == 1010
class TestManager(unittest.TestCase):
def setUp(self):
self.spec = ArgumentSpec()
@patch('ansible.module_utils.f5_utils.AnsibleF5Client._get_mgmt_root',
return_value=True)
def test_update_settings(self, *args):
set_module_args(dict(
allow=['all'],
banner='enabled',
banner_text='asdf',
inactivity_timeout='100',
log_level='debug',
login='enabled',
port=1010,
server='localhost',
user='admin',
password='password'
))
# Configure the parameters that would be returned by querying the
# remote device
current = Parameters(
dict(
allow=['172.27.1.1']
)
)
client = AnsibleF5Client(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode,
f5_product_name=self.spec.f5_product_name
)
mm = ModuleManager(client)
# Override methods to force specific logic in the module to happen
mm.update_on_device = Mock(return_value=True)
mm.read_current_from_device = Mock(return_value=current)
results = mm.exec_module()
assert results['changed'] is True
assert results['allow'] == ['all']