mirror of
https://github.com/ansible-collections/community.general.git
synced 2024-09-14 20:13:21 +02:00
495 lines
18 KiB
Python
495 lines
18 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Ansible module to manage PaloAltoNetworks Firewall
|
|
# (c) 2016, techbizdev <techbizdev@paloaltonetworks.com>
|
|
#
|
|
# This file is part of Ansible
|
|
#
|
|
# Ansible is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Ansible is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
|
# limitations under the License.
|
|
|
|
ANSIBLE_METADATA = {'metadata_version': '1.1',
|
|
'status': ['deprecated'],
|
|
'supported_by': 'community'}
|
|
|
|
DOCUMENTATION = '''
|
|
---
|
|
module: panos_query_rules
|
|
short_description: PANOS module that allows search for security rules in PANW NGFW devices.
|
|
description: >
|
|
- Security policies allow you to enforce rules and take action, and can be as general or specific as needed. The
|
|
policy rules are compared against the incoming traffic in sequence, and because the first rule that matches the
|
|
traffic is applied, the more specific rules must precede the more general ones.
|
|
author: "Bob Hagen (@rnh556)"
|
|
requirements:
|
|
- pan-python can be obtained from PyPI U(https://pypi.org/project/pan-python/)
|
|
- pandevice can be obtained from PyPI U(https://pypi.org/project/pandevice/)
|
|
- xmltodict can be obtains from PyPI U(https://pypi.org/project/xmltodict/)
|
|
deprecated:
|
|
alternative: Use U(https://galaxy.ansible.com/PaloAltoNetworks/paloaltonetworks) instead.
|
|
removed_in: "2.12"
|
|
why: Consolidating code base.
|
|
notes:
|
|
- Checkmode is not supported.
|
|
- Panorama is supported.
|
|
options:
|
|
ip_address:
|
|
description:
|
|
- IP address (or hostname) of PAN-OS firewall or Panorama management console being queried.
|
|
required: true
|
|
username:
|
|
description:
|
|
- Username credentials to use for authentication.
|
|
default: "admin"
|
|
password:
|
|
description:
|
|
- Password credentials to use for authentication.
|
|
required: true
|
|
api_key:
|
|
description:
|
|
- API key that can be used instead of I(username)/I(password) credentials.
|
|
application:
|
|
description:
|
|
- Name of the application or application group to be queried.
|
|
source_zone:
|
|
description:
|
|
- Name of the source security zone to be queried.
|
|
source_ip:
|
|
description:
|
|
- The source IP address to be queried.
|
|
source_port:
|
|
description:
|
|
- The source port to be queried.
|
|
destination_zone:
|
|
description:
|
|
- Name of the destination security zone to be queried.
|
|
destination_ip:
|
|
description:
|
|
- The destination IP address to be queried.
|
|
destination_port:
|
|
description:
|
|
- The destination port to be queried.
|
|
protocol:
|
|
description:
|
|
- The protocol used to be queried. Must be either I(tcp) or I(udp).
|
|
choices:
|
|
- tcp
|
|
- udp
|
|
tag_name:
|
|
description:
|
|
- Name of the rule tag to be queried.
|
|
devicegroup:
|
|
description:
|
|
- The Panorama device group in which to conduct the query.
|
|
'''
|
|
|
|
EXAMPLES = '''
|
|
- name: search for rules with tcp/3306
|
|
panos_query_rules:
|
|
ip_address: '{{ ip_address }}'
|
|
username: '{{ username }}'
|
|
password: '{{ password }}'
|
|
source_zone: 'DevNet'
|
|
destination_zone: 'DevVPC'
|
|
destination_port: '3306'
|
|
protocol: 'tcp'
|
|
|
|
- name: search devicegroup for inbound rules to dmz host
|
|
panos_query_rules:
|
|
ip_address: '{{ ip_address }}'
|
|
api_key: '{{ api_key }}'
|
|
destination_zone: 'DMZ'
|
|
destination_ip: '10.100.42.18'
|
|
address: 'DeviceGroupA'
|
|
|
|
- name: search for rules containing a specified rule tag
|
|
panos_query_rules:
|
|
ip_address: '{{ ip_address }}'
|
|
username: '{{ username }}'
|
|
password: '{{ password }}'
|
|
tag_name: 'ProjectX'
|
|
'''
|
|
|
|
RETURN = '''
|
|
# Default return values
|
|
'''
|
|
|
|
from ansible.module_utils.basic import AnsibleModule
|
|
|
|
try:
|
|
import pan.xapi
|
|
from pan.xapi import PanXapiError
|
|
import pandevice
|
|
from pandevice import base
|
|
from pandevice import firewall
|
|
from pandevice import panorama
|
|
from pandevice import objects
|
|
from pandevice import policies
|
|
import ipaddress
|
|
import xmltodict
|
|
import json
|
|
HAS_LIB = True
|
|
except ImportError:
|
|
HAS_LIB = False
|
|
|
|
|
|
def get_devicegroup(device, devicegroup):
|
|
dg_list = device.refresh_devices()
|
|
for group in dg_list:
|
|
if isinstance(group, pandevice.panorama.DeviceGroup):
|
|
if group.name == devicegroup:
|
|
return group
|
|
return False
|
|
|
|
|
|
def get_rulebase(device, devicegroup):
|
|
# Build the rulebase
|
|
if isinstance(device, firewall.Firewall):
|
|
rulebase = policies.Rulebase()
|
|
device.add(rulebase)
|
|
elif isinstance(device, panorama.Panorama):
|
|
dg = panorama.DeviceGroup(devicegroup)
|
|
device.add(dg)
|
|
rulebase = policies.PreRulebase()
|
|
dg.add(rulebase)
|
|
else:
|
|
return False
|
|
policies.SecurityRule.refreshall(rulebase)
|
|
return rulebase
|
|
|
|
|
|
def get_object(device, dev_group, obj_name):
|
|
# Search global address objects
|
|
match = device.find(obj_name, objects.AddressObject)
|
|
if match:
|
|
return match
|
|
|
|
# Search global address groups
|
|
match = device.find(obj_name, objects.AddressGroup)
|
|
if match:
|
|
return match
|
|
|
|
# Search Panorama device group
|
|
if isinstance(device, pandevice.panorama.Panorama):
|
|
# Search device group address objects
|
|
match = dev_group.find(obj_name, objects.AddressObject)
|
|
if match:
|
|
return match
|
|
|
|
# Search device group address groups
|
|
match = dev_group.find(obj_name, objects.AddressGroup)
|
|
if match:
|
|
return match
|
|
return False
|
|
|
|
|
|
def addr_in_obj(addr, obj):
|
|
ip = ipaddress.ip_address(addr)
|
|
# Process address objects
|
|
if isinstance(obj, objects.AddressObject):
|
|
if obj.type == 'ip-netmask':
|
|
net = ipaddress.ip_network(obj.value)
|
|
if ip in net:
|
|
return True
|
|
if obj.type == 'ip-range':
|
|
ip_range = obj.value.split('-')
|
|
lower = ipaddress.ip_address(ip_range[0])
|
|
upper = ipaddress.ip_address(ip_range[1])
|
|
if lower < ip < upper:
|
|
return True
|
|
return False
|
|
|
|
|
|
def get_services(device, dev_group, svc_list, obj_list):
|
|
for svc in svc_list:
|
|
|
|
# Search global address objects
|
|
global_obj_match = device.find(svc, objects.ServiceObject)
|
|
if global_obj_match:
|
|
obj_list.append(global_obj_match)
|
|
|
|
# Search global address groups
|
|
global_grp_match = device.find(svc, objects.ServiceGroup)
|
|
if global_grp_match:
|
|
get_services(device, dev_group, global_grp_match.value, obj_list)
|
|
|
|
# Search Panorama device group
|
|
if isinstance(device, pandevice.panorama.Panorama):
|
|
|
|
# Search device group address objects
|
|
dg_obj_match = dev_group.find(svc, objects.ServiceObject)
|
|
if dg_obj_match:
|
|
obj_list.append(dg_obj_match)
|
|
|
|
# Search device group address groups
|
|
dg_grp_match = dev_group.find(svc, objects.ServiceGroup)
|
|
if dg_grp_match:
|
|
get_services(device, dev_group, dg_grp_match.value, obj_list)
|
|
|
|
return obj_list
|
|
|
|
|
|
def port_in_svc(orientation, port, protocol, obj):
|
|
# Process address objects
|
|
if orientation == 'source':
|
|
for x in obj.source_port.split(','):
|
|
if '-' in x:
|
|
port_range = x.split('-')
|
|
lower = int(port_range[0])
|
|
upper = int(port_range[1])
|
|
if (lower <= int(port) <= upper) and (obj.protocol == protocol):
|
|
return True
|
|
else:
|
|
if port == x and obj.protocol == protocol:
|
|
return True
|
|
elif orientation == 'destination':
|
|
for x in obj.destination_port.split(','):
|
|
if '-' in x:
|
|
port_range = x.split('-')
|
|
lower = int(port_range[0])
|
|
upper = int(port_range[1])
|
|
if (lower <= int(port) <= upper) and (obj.protocol == protocol):
|
|
return True
|
|
else:
|
|
if port == x and obj.protocol == protocol:
|
|
return True
|
|
return False
|
|
|
|
|
|
def get_tag(device, dev_group, tag_name):
|
|
# Search global address objects
|
|
match = device.find(tag_name, objects.Tag)
|
|
if match:
|
|
return match
|
|
# Search Panorama device group
|
|
if isinstance(device, panorama.Panorama):
|
|
# Search device group address objects
|
|
match = dev_group.find(tag_name, objects.Tag)
|
|
if match:
|
|
return match
|
|
return False
|
|
|
|
|
|
def main():
|
|
argument_spec = dict(
|
|
ip_address=dict(required=True),
|
|
password=dict(no_log=True),
|
|
username=dict(default='admin'),
|
|
api_key=dict(no_log=True),
|
|
application=dict(default=None),
|
|
source_zone=dict(default=None),
|
|
destination_zone=dict(default=None),
|
|
source_ip=dict(default=None),
|
|
destination_ip=dict(default=None),
|
|
source_port=dict(default=None),
|
|
destination_port=dict(default=None),
|
|
protocol=dict(default=None, choices=['tcp', 'udp']),
|
|
tag_name=dict(default=None),
|
|
devicegroup=dict(default=None)
|
|
)
|
|
module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=False,
|
|
required_one_of=[['api_key', 'password']]
|
|
)
|
|
if not HAS_LIB:
|
|
module.fail_json(msg='Missing required libraries.')
|
|
|
|
ip_address = module.params["ip_address"]
|
|
password = module.params["password"]
|
|
username = module.params['username']
|
|
api_key = module.params['api_key']
|
|
application = module.params['application']
|
|
source_zone = module.params['source_zone']
|
|
source_ip = module.params['source_ip']
|
|
source_port = module.params['source_port']
|
|
destination_zone = module.params['destination_zone']
|
|
destination_ip = module.params['destination_ip']
|
|
destination_port = module.params['destination_port']
|
|
protocol = module.params['protocol']
|
|
tag_name = module.params['tag_name']
|
|
devicegroup = module.params['devicegroup']
|
|
|
|
# Create the device with the appropriate pandevice type
|
|
device = base.PanDevice.create_from_device(ip_address, username, password, api_key=api_key)
|
|
|
|
# Grab the global objects
|
|
objects.AddressObject.refreshall(device)
|
|
objects.AddressGroup.refreshall(device)
|
|
objects.ServiceObject.refreshall(device)
|
|
objects.ServiceGroup.refreshall(device)
|
|
objects.Tag.refreshall(device)
|
|
|
|
# If Panorama, validate the devicegroup and grab the devicegroup objects
|
|
dev_group = None
|
|
if devicegroup and isinstance(device, panorama.Panorama):
|
|
dev_group = get_devicegroup(device, devicegroup)
|
|
if dev_group:
|
|
device.add(dev_group)
|
|
objects.AddressObject.refreshall(dev_group)
|
|
objects.AddressGroup.refreshall(dev_group)
|
|
objects.ServiceObject.refreshall(dev_group)
|
|
objects.ServiceGroup.refreshall(dev_group)
|
|
objects.Tag.refreshall(dev_group)
|
|
else:
|
|
module.fail_json(
|
|
failed=1,
|
|
msg='\'%s\' device group not found in Panorama. Is the name correct?' % devicegroup
|
|
)
|
|
|
|
# Build the rulebase and produce list
|
|
rulebase = get_rulebase(device, dev_group)
|
|
rulelist = rulebase.children
|
|
hitbase = policies.Rulebase()
|
|
loose_match = True
|
|
|
|
# Process each rule
|
|
for rule in rulelist:
|
|
hitlist = []
|
|
|
|
if source_zone:
|
|
source_zone_match = False
|
|
if loose_match and 'any' in rule.fromzone:
|
|
source_zone_match = True
|
|
else:
|
|
for object_string in rule.fromzone:
|
|
if object_string == source_zone:
|
|
source_zone_match = True
|
|
hitlist.append(source_zone_match)
|
|
|
|
if destination_zone:
|
|
destination_zone_match = False
|
|
if loose_match and 'any' in rule.tozone:
|
|
destination_zone_match = True
|
|
else:
|
|
for object_string in rule.tozone:
|
|
if object_string == destination_zone:
|
|
destination_zone_match = True
|
|
hitlist.append(destination_zone_match)
|
|
|
|
if source_ip:
|
|
source_ip_match = False
|
|
if loose_match and 'any' in rule.source:
|
|
source_ip_match = True
|
|
else:
|
|
for object_string in rule.source:
|
|
# Get a valid AddressObject or AddressGroup
|
|
obj = get_object(device, dev_group, object_string)
|
|
# Otherwise the object_string is not an object and should be handled differently
|
|
if obj is False:
|
|
if '-' in object_string:
|
|
obj = ipaddress.ip_address(source_ip)
|
|
source_range = object_string.split('-')
|
|
source_lower = ipaddress.ip_address(source_range[0])
|
|
source_upper = ipaddress.ip_address(source_range[1])
|
|
if source_lower <= obj <= source_upper:
|
|
source_ip_match = True
|
|
else:
|
|
if source_ip == object_string:
|
|
source_ip_match = True
|
|
if isinstance(obj, objects.AddressObject) and addr_in_obj(source_ip, obj):
|
|
source_ip_match = True
|
|
elif isinstance(obj, objects.AddressGroup) and obj.static_value:
|
|
for member_string in obj.static_value:
|
|
member = get_object(device, dev_group, member_string)
|
|
if addr_in_obj(source_ip, member):
|
|
source_ip_match = True
|
|
hitlist.append(source_ip_match)
|
|
|
|
if destination_ip:
|
|
destination_ip_match = False
|
|
if loose_match and 'any' in rule.destination:
|
|
destination_ip_match = True
|
|
else:
|
|
for object_string in rule.destination:
|
|
# Get a valid AddressObject or AddressGroup
|
|
obj = get_object(device, dev_group, object_string)
|
|
# Otherwise the object_string is not an object and should be handled differently
|
|
if obj is False:
|
|
if '-' in object_string:
|
|
obj = ipaddress.ip_address(destination_ip)
|
|
destination_range = object_string.split('-')
|
|
destination_lower = ipaddress.ip_address(destination_range[0])
|
|
destination_upper = ipaddress.ip_address(destination_range[1])
|
|
if destination_lower <= obj <= destination_upper:
|
|
destination_ip_match = True
|
|
else:
|
|
if destination_ip == object_string:
|
|
destination_ip_match = True
|
|
if isinstance(obj, objects.AddressObject) and addr_in_obj(destination_ip, obj):
|
|
destination_ip_match = True
|
|
elif isinstance(obj, objects.AddressGroup) and obj.static_value:
|
|
for member_string in obj.static_value:
|
|
member = get_object(device, dev_group, member_string)
|
|
if addr_in_obj(destination_ip, member):
|
|
destination_ip_match = True
|
|
hitlist.append(destination_ip_match)
|
|
|
|
if source_port:
|
|
source_port_match = False
|
|
orientation = 'source'
|
|
if loose_match and (rule.service[0] == 'any'):
|
|
source_port_match = True
|
|
elif rule.service[0] == 'application-default':
|
|
source_port_match = False # Fix this once apps are supported
|
|
else:
|
|
service_list = []
|
|
service_list = get_services(device, dev_group, rule.service, service_list)
|
|
for obj in service_list:
|
|
if port_in_svc(orientation, source_port, protocol, obj):
|
|
source_port_match = True
|
|
break
|
|
hitlist.append(source_port_match)
|
|
|
|
if destination_port:
|
|
destination_port_match = False
|
|
orientation = 'destination'
|
|
if loose_match and (rule.service[0] == 'any'):
|
|
destination_port_match = True
|
|
elif rule.service[0] == 'application-default':
|
|
destination_port_match = False # Fix this once apps are supported
|
|
else:
|
|
service_list = []
|
|
service_list = get_services(device, dev_group, rule.service, service_list)
|
|
for obj in service_list:
|
|
if port_in_svc(orientation, destination_port, protocol, obj):
|
|
destination_port_match = True
|
|
break
|
|
hitlist.append(destination_port_match)
|
|
|
|
if tag_name:
|
|
tag_match = False
|
|
if rule.tag:
|
|
for object_string in rule.tag:
|
|
obj = get_tag(device, dev_group, object_string)
|
|
if obj and (obj.name == tag_name):
|
|
tag_match = True
|
|
hitlist.append(tag_match)
|
|
|
|
# Add to hit rulebase
|
|
if False not in hitlist:
|
|
hitbase.add(rule)
|
|
|
|
# Dump the hit rulebase
|
|
if hitbase.children:
|
|
output_string = xmltodict.parse(hitbase.element_str())
|
|
module.exit_json(
|
|
stdout_lines=json.dumps(output_string, indent=2),
|
|
msg='%s of %s rules matched' % (hitbase.children.__len__(), rulebase.children.__len__())
|
|
)
|
|
else:
|
|
module.fail_json(msg='No matching rules found.')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|