1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

awall: revamp the module

This commit is contained in:
Alexei Znamensky 2023-04-28 10:22:35 +12:00
parent eddd1ba4f2
commit f5c3b50550

View file

@ -70,68 +70,13 @@ EXAMPLES = r'''
RETURN = ''' # '''
import re
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.general.plugins.module_utils.cmd_runner import CmdRunner, cmd_runner_fmt
from ansible_collections.community.general.plugins.module_utils.module_helper import StateModuleHelper
from ansible_collections.community.general.plugins.module_utils.mh.deco import check_mode_skip
def activate(module):
cmd = "%s activate --force" % (AWALL_PATH)
rc, stdout, stderr = module.run_command(cmd)
if rc == 0:
return True
else:
module.fail_json(msg="could not activate new rules", stdout=stdout, stderr=stderr)
def is_policy_enabled(module, name):
cmd = "%s list" % (AWALL_PATH)
rc, stdout, stderr = module.run_command(cmd)
if re.search(r"^%s\s+enabled" % name, stdout, re.MULTILINE):
return True
return False
def enable_policy(module, names, act):
policies = []
for name in names:
if not is_policy_enabled(module, name):
policies.append(name)
if not policies:
module.exit_json(changed=False, msg="policy(ies) already enabled")
names = " ".join(policies)
if module.check_mode:
cmd = "%s list" % (AWALL_PATH)
else:
cmd = "%s enable %s" % (AWALL_PATH, names)
rc, stdout, stderr = module.run_command(cmd)
if rc != 0:
module.fail_json(msg="failed to enable %s" % names, stdout=stdout, stderr=stderr)
if act and not module.check_mode:
activate(module)
module.exit_json(changed=True, msg="enabled awall policy(ies): %s" % names)
def disable_policy(module, names, act):
policies = []
for name in names:
if is_policy_enabled(module, name):
policies.append(name)
if not policies:
module.exit_json(changed=False, msg="policy(ies) already disabled")
names = " ".join(policies)
if module.check_mode:
cmd = "%s list" % (AWALL_PATH)
else:
cmd = "%s disable %s" % (AWALL_PATH, names)
rc, stdout, stderr = module.run_command(cmd)
if rc != 0:
module.fail_json(msg="failed to disable %s" % names, stdout=stdout, stderr=stderr)
if act and not module.check_mode:
activate(module)
module.exit_json(changed=True, msg="disabled awall policy(ies): %s" % names)
def main():
module = AnsibleModule(
class AWall(StateModuleHelper):
module = dict(
argument_spec=dict(
state=dict(type='str', default='enabled', choices=['disabled', 'enabled']),
name=dict(type='list', elements='str'),
@ -141,23 +86,83 @@ def main():
supports_check_mode=True,
)
global AWALL_PATH
AWALL_PATH = module.get_bin_path('awall', required=True)
def __init_module__(self):
state_map = dict(
enabled="enable",
disabled="disable",
)
p = module.params
self.runner = CmdRunner(
self.module,
command="awall",
arg_formats=dict(
activate=cmd_runner_fmt.as_fixed(["activate", "--force"]),
list=cmd_runner_fmt.as_fixed("list"),
state=cmd_runner_fmt.as_map(state_map),
name=cmd_runner_fmt.as_list(),
),
required_one_of=[["name", "activate"]],
check_rc=True,
)
if p['name']:
if p['state'] == 'enabled':
enable_policy(module, p['name'], p['activate'])
elif p['state'] == 'disabled':
disable_policy(module, p['name'], p['activate'])
self.vars.set("changed_policies", [], output=False, change=True)
if p['activate']:
if not module.check_mode:
activate(module)
module.exit_json(changed=True, msg="activated awall rules")
@check_mode_skip
def activate(self):
with self.runner("activate") as ctx:
ctx.run()
return True
module.fail_json(msg="no action defined")
def is_policy_enabled(self, name):
def process(rc, out, err):
return bool(re.search(r"^%s\s+enabled" % name, out, re.MULTILINE))
with self.runner("list", output_process=process) as ctx:
return ctx.run()
def state_enabled(self):
self.vars.changed_policies = [name
for name in self.vars.names
if not self.is_policy_enabled(name)]
if not self.vars.changed_policies:
return
if self.check_mode:
with self.runner("list") as ctx:
ctx.run()
else:
with self.runner("state name") as ctx:
ctx.run(name=self.vars.changed_policies)
def state_disabled(self):
self.vars.changed_policies = [name
for name in self.vars.names
if self.is_policy_enabled(name)]
if not self.vars.changed_policies:
return
if self.check_mode:
with self.runner("list") as ctx:
ctx.run()
else:
with self.runner("state name") as ctx:
ctx.run(name=self.vars.changed_policies)
def __state_fallback__(self):
# if state is not passed
pass
def __quit_module__(self):
if self.vars.activate:
self.activate()
if self.vars.changed_policies:
self.vars.meta("changed_policies", output=True)
def main():
AWall.execute()
if __name__ == '__main__':