1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/plugins/modules/cloud/scaleway/scaleway_security_group_rule.py
Felix Fontein c34dc24d3a
Replace ansible.module_utils._text by ansible.module_utils.common.text.converters (#2877) (#2882)
* Replace ansible.module_utils._text by ansible.module_utils.common.text.converters.

* Also adjust tests.

(cherry picked from commit fafabed9e6)
2021-06-27 09:39:18 +02:00

263 lines
7.3 KiB
Python

#!/usr/bin/python
#
# Scaleway Security Group Rule management module
#
# Copyright (C) 2018 Antoine Barbare (antoinebarbare@gmail.com).
#
# GNU General Public License v3.0+ (see COPYING or
# https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = '''
---
module: scaleway_security_group_rule
short_description: Scaleway Security Group Rule management module
author: Antoine Barbare (@abarbare)
description:
- This module manages Security Group Rule on Scaleway account
U(https://developer.scaleway.com)
extends_documentation_fragment:
- community.general.scaleway
options:
state:
type: str
description:
- Indicate desired state of the Security Group Rule.
default: present
choices:
- present
- absent
region:
type: str
description:
- Scaleway region to use (for example C(par1)).
required: true
choices:
- ams1
- EMEA-NL-EVS
- par1
- EMEA-FR-PAR1
- par2
- EMEA-FR-PAR2
- waw1
- EMEA-PL-WAW1
protocol:
type: str
description:
- Network protocol to use
choices:
- TCP
- UDP
- ICMP
required: true
port:
description:
- Port related to the rule, null value for all the ports
required: true
type: int
ip_range:
type: str
description:
- IPV4 CIDR notation to apply to the rule
default: 0.0.0.0/0
direction:
type: str
description:
- Rule direction
choices:
- inbound
- outbound
required: true
action:
type: str
description:
- Rule action
choices:
- accept
- drop
required: true
security_group:
type: str
description:
- Security Group unique identifier
required: true
'''
EXAMPLES = '''
- name: Create a Security Group Rule
community.general.scaleway_security_group_rule:
state: present
region: par1
protocol: TCP
port: 80
ip_range: 0.0.0.0/0
direction: inbound
action: accept
security_group: b57210ee-1281-4820-a6db-329f78596ecb
register: security_group_rule_creation_task
'''
RETURN = '''
data:
description: This is only present when C(state=present)
returned: when C(state=present)
type: dict
sample: {
"scaleway_security_group_rule": {
"direction": "inbound",
"protocol": "TCP",
"ip_range": "0.0.0.0/0",
"dest_port_from": 80,
"action": "accept",
"position": 2,
"dest_port_to": null,
"editable": null,
"id": "10cb0b9a-80f6-4830-abd7-a31cd828b5e9"
}
}
'''
from ansible_collections.community.general.plugins.module_utils.scaleway import SCALEWAY_LOCATION, scaleway_argument_spec, Scaleway, payload_from_object
from ansible_collections.community.general.plugins.module_utils.compat.ipaddress import ip_network
from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.basic import AnsibleModule
def get_sgr_from_api(security_group_rules, security_group_rule):
""" Check if a security_group_rule specs are present in security_group_rules
Return None if no rules match the specs
Return the rule if found
"""
for sgr in security_group_rules:
if (sgr['ip_range'] == security_group_rule['ip_range'] and sgr['dest_port_from'] == security_group_rule['dest_port_from'] and
sgr['direction'] == security_group_rule['direction'] and sgr['action'] == security_group_rule['action'] and
sgr['protocol'] == security_group_rule['protocol']):
return sgr
return None
def present_strategy(api, security_group_id, security_group_rule):
ret = {'changed': False}
response = api.get('security_groups/%s/rules' % security_group_id)
if not response.ok:
api.module.fail_json(
msg='Error getting security group rules "%s": "%s" (%s)' %
(response.info['msg'], response.json['message'], response.json))
existing_rule = get_sgr_from_api(
response.json['rules'], security_group_rule)
if not existing_rule:
ret['changed'] = True
if api.module.check_mode:
return ret
# Create Security Group Rule
response = api.post('/security_groups/%s/rules' % security_group_id,
data=payload_from_object(security_group_rule))
if not response.ok:
api.module.fail_json(
msg='Error during security group rule creation: "%s": "%s" (%s)' %
(response.info['msg'], response.json['message'], response.json))
ret['scaleway_security_group_rule'] = response.json['rule']
else:
ret['scaleway_security_group_rule'] = existing_rule
return ret
def absent_strategy(api, security_group_id, security_group_rule):
ret = {'changed': False}
response = api.get('security_groups/%s/rules' % security_group_id)
if not response.ok:
api.module.fail_json(
msg='Error getting security group rules "%s": "%s" (%s)' %
(response.info['msg'], response.json['message'], response.json))
existing_rule = get_sgr_from_api(
response.json['rules'], security_group_rule)
if not existing_rule:
return ret
ret['changed'] = True
if api.module.check_mode:
return ret
response = api.delete(
'/security_groups/%s/rules/%s' %
(security_group_id, existing_rule['id']))
if not response.ok:
api.module.fail_json(
msg='Error deleting security group rule "%s": "%s" (%s)' %
(response.info['msg'], response.json['message'], response.json))
return ret
def core(module):
api = Scaleway(module=module)
security_group_rule = {
'protocol': module.params['protocol'],
'dest_port_from': module.params['port'],
'ip_range': module.params['ip_range'],
'direction': module.params['direction'],
'action': module.params['action'],
}
region = module.params['region']
module.params['api_url'] = SCALEWAY_LOCATION[region]['api_endpoint']
if module.params['state'] == 'present':
summary = present_strategy(
api=api,
security_group_id=module.params['security_group'],
security_group_rule=security_group_rule)
else:
summary = absent_strategy(
api=api,
security_group_id=module.params['security_group'],
security_group_rule=security_group_rule)
module.exit_json(**summary)
def main():
argument_spec = scaleway_argument_spec()
argument_spec.update(
state=dict(type='str', default='present', choices=['absent', 'present']),
region=dict(type='str', required=True, choices=list(SCALEWAY_LOCATION.keys())),
protocol=dict(type='str', required=True, choices=['TCP', 'UDP', 'ICMP']),
port=dict(type='int', required=True),
ip_range=dict(type='str', default='0.0.0.0/0'),
direction=dict(type='str', required=True, choices=['inbound', 'outbound']),
action=dict(type='str', required=True, choices=['accept', 'drop']),
security_group=dict(type='str', required=True),
)
module = AnsibleModule(
argument_spec=argument_spec,
supports_check_mode=True,
)
core(module)
if __name__ == '__main__':
main()