1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/plugins/module_utils/network/ftd/configuration.py
Ansible Core Team aebc1b03fd Initial commit
2020-03-09 09:11:07 +00:00

565 lines
24 KiB
Python

# Copyright (c) 2018 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
import copy
from functools import partial
from ansible_collections.community.general.plugins.module_utils.network.ftd.common import HTTPMethod, equal_objects, FtdConfigurationError, \
FtdServerError, ResponseParams, copy_identity_properties, FtdUnexpectedResponse
from ansible_collections.community.general.plugins.module_utils.network.ftd.fdm_swagger_client import OperationField, ValidationError
from ansible.module_utils.six import iteritems
DEFAULT_PAGE_SIZE = 10
DEFAULT_OFFSET = 0
UNPROCESSABLE_ENTITY_STATUS = 422
INVALID_UUID_ERROR_MESSAGE = "Validation failed due to an invalid UUID"
DUPLICATE_NAME_ERROR_MESSAGE = "Validation failed due to a duplicate name"
MULTIPLE_DUPLICATES_FOUND_ERROR = (
"Multiple objects matching specified filters are found. "
"Please, define filters more precisely to match one object exactly."
)
DUPLICATE_ERROR = (
"Cannot add a new object. "
"An object with the same name but different parameters already exists."
)
ADD_OPERATION_NOT_SUPPORTED_ERROR = (
"Cannot add a new object while executing an upsert request. "
"Creation of objects with this type is not supported."
)
PATH_PARAMS_FOR_DEFAULT_OBJ = {'objId': 'default'}
class OperationNamePrefix:
ADD = 'add'
EDIT = 'edit'
GET = 'get'
DELETE = 'delete'
UPSERT = 'upsert'
class QueryParams:
FILTER = 'filter'
class ParamName:
QUERY_PARAMS = 'query_params'
PATH_PARAMS = 'path_params'
DATA = 'data'
FILTERS = 'filters'
class CheckModeException(Exception):
pass
class FtdInvalidOperationNameError(Exception):
def __init__(self, operation_name):
super(FtdInvalidOperationNameError, self).__init__(operation_name)
self.operation_name = operation_name
class OperationChecker(object):
@classmethod
def is_add_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is add object operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is add object operation, otherwise False
:rtype: bool
"""
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith(OperationNamePrefix.ADD) and is_post_request(operation_spec)
@classmethod
def is_edit_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is edit object operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is edit object operation, otherwise False
:rtype: bool
"""
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith(OperationNamePrefix.EDIT) and is_put_request(operation_spec)
@classmethod
def is_delete_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is delete object operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is delete object operation, otherwise False
:rtype: bool
"""
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith(OperationNamePrefix.DELETE) \
and operation_spec[OperationField.METHOD] == HTTPMethod.DELETE
@classmethod
def is_get_list_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is get list of objects operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is get a list of objects operation, otherwise False
:rtype: bool
"""
return operation_spec[OperationField.METHOD] == HTTPMethod.GET \
and operation_spec[OperationField.RETURN_MULTIPLE_ITEMS]
@classmethod
def is_get_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is get objects operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is get object operation, otherwise False
:rtype: bool
"""
return operation_spec[OperationField.METHOD] == HTTPMethod.GET \
and not operation_spec[OperationField.RETURN_MULTIPLE_ITEMS]
@classmethod
def is_upsert_operation(cls, operation_name):
"""
Check if operation defined with 'operation_name' is upsert objects operation according to 'operation_name'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:return: True if the called operation is upsert object operation, otherwise False
:rtype: bool
"""
return operation_name.startswith(OperationNamePrefix.UPSERT)
@classmethod
def is_find_by_filter_operation(cls, operation_name, params, operation_spec):
"""
Checks whether the called operation is 'find by filter'. This operation fetches all objects and finds
the matching ones by the given filter. As filtering is done on the client side, this operation should be used
only when selected filters are not implemented on the server side.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:param params: params - params should contain 'filters'
:return: True if the called operation is find by filter, otherwise False
:rtype: bool
"""
is_get_list = cls.is_get_list_operation(operation_name, operation_spec)
return is_get_list and ParamName.FILTERS in params and params[ParamName.FILTERS]
@classmethod
def is_upsert_operation_supported(cls, operations):
"""
Checks if all operations required for upsert object operation are defined in 'operations'.
:param operations: specification of the operations supported by model
:type operations: dict
:return: True if all criteria required to provide requested called operation are satisfied, otherwise False
:rtype: bool
"""
has_edit_op = next((name for name, spec in iteritems(operations) if cls.is_edit_operation(name, spec)), None)
has_get_list_op = next((name for name, spec in iteritems(operations)
if cls.is_get_list_operation(name, spec)), None)
return has_edit_op and has_get_list_op
class BaseConfigurationResource(object):
def __init__(self, conn, check_mode=False):
self._conn = conn
self.config_changed = False
self._operation_spec_cache = {}
self._models_operations_specs_cache = {}
self._check_mode = check_mode
self._operation_checker = OperationChecker
self._system_info = None
def execute_operation(self, op_name, params):
"""
Allow user request execution of simple operations(natively supported by API provider) as well as complex
operations(operations that are implemented as a set of simple operations).
:param op_name: name of the operation being called by the user
:type op_name: str
:param params: definition of the params that operation should be executed with
:type params: dict
:return: Result of the operation being executed
:rtype: dict
"""
if self._operation_checker.is_upsert_operation(op_name):
return self.upsert_object(op_name, params)
else:
return self.crud_operation(op_name, params)
def crud_operation(self, op_name, params):
"""
Allow user request execution of simple operations(natively supported by API provider) only.
:param op_name: name of the operation being called by the user
:type op_name: str
:param params: definition of the params that operation should be executed with
:type params: dict
:return: Result of the operation being executed
:rtype: dict
"""
op_spec = self.get_operation_spec(op_name)
if op_spec is None:
raise FtdInvalidOperationNameError(op_name)
if self._operation_checker.is_add_operation(op_name, op_spec):
resp = self.add_object(op_name, params)
elif self._operation_checker.is_edit_operation(op_name, op_spec):
resp = self.edit_object(op_name, params)
elif self._operation_checker.is_delete_operation(op_name, op_spec):
resp = self.delete_object(op_name, params)
elif self._operation_checker.is_find_by_filter_operation(op_name, params, op_spec):
resp = list(self.get_objects_by_filter(op_name, params))
else:
resp = self.send_general_request(op_name, params)
return resp
def get_operation_spec(self, operation_name):
if operation_name not in self._operation_spec_cache:
self._operation_spec_cache[operation_name] = self._conn.get_operation_spec(operation_name)
return self._operation_spec_cache[operation_name]
def get_operation_specs_by_model_name(self, model_name):
if model_name not in self._models_operations_specs_cache:
model_op_specs = self._conn.get_operation_specs_by_model_name(model_name)
self._models_operations_specs_cache[model_name] = model_op_specs
for op_name, op_spec in iteritems(model_op_specs):
self._operation_spec_cache.setdefault(op_name, op_spec)
return self._models_operations_specs_cache[model_name]
def get_objects_by_filter(self, operation_name, params):
def match_filters(filter_params, obj):
for k, v in iteritems(filter_params):
if k not in obj or obj[k] != v:
return False
return True
dummy, query_params, path_params = _get_user_params(params)
# copy required params to avoid mutation of passed `params` dict
url_params = {ParamName.QUERY_PARAMS: dict(query_params), ParamName.PATH_PARAMS: dict(path_params)}
filters = params.get(ParamName.FILTERS) or {}
if QueryParams.FILTER not in url_params[ParamName.QUERY_PARAMS] and 'name' in filters:
# most endpoints only support filtering by name, so remaining `filters` are applied on returned objects
url_params[ParamName.QUERY_PARAMS][QueryParams.FILTER] = self._stringify_name_filter(filters)
item_generator = iterate_over_pageable_resource(
partial(self.send_general_request, operation_name=operation_name), url_params
)
return (i for i in item_generator if match_filters(filters, i))
def _stringify_name_filter(self, filters):
build_version = self.get_build_version()
if build_version >= '6.4.0':
return "fts~%s" % filters['name']
return "name:%s" % filters['name']
def _fetch_system_info(self):
if not self._system_info:
params = {ParamName.PATH_PARAMS: PATH_PARAMS_FOR_DEFAULT_OBJ}
self._system_info = self.send_general_request('getSystemInformation', params)
return self._system_info
def get_build_version(self):
system_info = self._fetch_system_info()
return system_info['databaseInfo']['buildVersion']
def add_object(self, operation_name, params):
def is_duplicate_name_error(err):
return err.code == UNPROCESSABLE_ENTITY_STATUS and DUPLICATE_NAME_ERROR_MESSAGE in str(err)
try:
return self.send_general_request(operation_name, params)
except FtdServerError as e:
if is_duplicate_name_error(e):
return self._check_equality_with_existing_object(operation_name, params, e)
else:
raise e
def _check_equality_with_existing_object(self, operation_name, params, e):
"""
Looks for an existing object that caused "object duplicate" error and
checks whether it corresponds to the one specified in `params`.
In case a single object is found and it is equal to one we are trying
to create, the existing object is returned.
When the existing object is not equal to the object being created or
several objects are returned, an exception is raised.
"""
model_name = self.get_operation_spec(operation_name)[OperationField.MODEL_NAME]
existing_obj = self._find_object_matching_params(model_name, params)
if existing_obj is not None:
if equal_objects(existing_obj, params[ParamName.DATA]):
return existing_obj
else:
raise FtdConfigurationError(DUPLICATE_ERROR, existing_obj)
raise e
def _find_object_matching_params(self, model_name, params):
get_list_operation = self._find_get_list_operation(model_name)
if not get_list_operation:
return None
data = params[ParamName.DATA]
if not params.get(ParamName.FILTERS):
params[ParamName.FILTERS] = {'name': data['name']}
obj = None
filtered_objs = self.get_objects_by_filter(get_list_operation, params)
for i, obj in enumerate(filtered_objs):
if i > 0:
raise FtdConfigurationError(MULTIPLE_DUPLICATES_FOUND_ERROR)
obj = obj
return obj
def _find_get_list_operation(self, model_name):
operations = self.get_operation_specs_by_model_name(model_name) or {}
return next((
op for op, op_spec in operations.items()
if self._operation_checker.is_get_list_operation(op, op_spec)), None)
def _find_get_operation(self, model_name):
operations = self.get_operation_specs_by_model_name(model_name) or {}
return next((
op for op, op_spec in operations.items()
if self._operation_checker.is_get_operation(op, op_spec)), None)
def delete_object(self, operation_name, params):
def is_invalid_uuid_error(err):
return err.code == UNPROCESSABLE_ENTITY_STATUS and INVALID_UUID_ERROR_MESSAGE in str(err)
try:
return self.send_general_request(operation_name, params)
except FtdServerError as e:
if is_invalid_uuid_error(e):
return {'status': 'Referenced object does not exist'}
else:
raise e
def edit_object(self, operation_name, params):
data, dummy, path_params = _get_user_params(params)
model_name = self.get_operation_spec(operation_name)[OperationField.MODEL_NAME]
get_operation = self._find_get_operation(model_name)
if get_operation:
existing_object = self.send_general_request(get_operation, {ParamName.PATH_PARAMS: path_params})
if not existing_object:
raise FtdConfigurationError('Referenced object does not exist')
elif equal_objects(existing_object, data):
return existing_object
return self.send_general_request(operation_name, params)
def send_general_request(self, operation_name, params):
def stop_if_check_mode():
if self._check_mode:
raise CheckModeException()
self.validate_params(operation_name, params)
stop_if_check_mode()
data, query_params, path_params = _get_user_params(params)
op_spec = self.get_operation_spec(operation_name)
url, method = op_spec[OperationField.URL], op_spec[OperationField.METHOD]
return self._send_request(url, method, data, path_params, query_params)
def _send_request(self, url_path, http_method, body_params=None, path_params=None, query_params=None):
def raise_for_failure(resp):
if not resp[ResponseParams.SUCCESS]:
raise FtdServerError(resp[ResponseParams.RESPONSE], resp[ResponseParams.STATUS_CODE])
response = self._conn.send_request(url_path=url_path, http_method=http_method, body_params=body_params,
path_params=path_params, query_params=query_params)
raise_for_failure(response)
if http_method != HTTPMethod.GET:
self.config_changed = True
return response[ResponseParams.RESPONSE]
def validate_params(self, operation_name, params):
report = {}
op_spec = self.get_operation_spec(operation_name)
data, query_params, path_params = _get_user_params(params)
def validate(validation_method, field_name, user_params):
key = 'Invalid %s provided' % field_name
try:
is_valid, validation_report = validation_method(operation_name, user_params)
if not is_valid:
report[key] = validation_report
except Exception as e:
report[key] = str(e)
return report
validate(self._conn.validate_query_params, ParamName.QUERY_PARAMS, query_params)
validate(self._conn.validate_path_params, ParamName.PATH_PARAMS, path_params)
if is_post_request(op_spec) or is_put_request(op_spec):
validate(self._conn.validate_data, ParamName.DATA, data)
if report:
raise ValidationError(report)
@staticmethod
def _get_operation_name(checker, operations):
return next((op_name for op_name, op_spec in iteritems(operations) if checker(op_name, op_spec)), None)
def _add_upserted_object(self, model_operations, params):
add_op_name = self._get_operation_name(self._operation_checker.is_add_operation, model_operations)
if not add_op_name:
raise FtdConfigurationError(ADD_OPERATION_NOT_SUPPORTED_ERROR)
return self.add_object(add_op_name, params)
def _edit_upserted_object(self, model_operations, existing_object, params):
edit_op_name = self._get_operation_name(self._operation_checker.is_edit_operation, model_operations)
_set_default(params, 'path_params', {})
_set_default(params, 'data', {})
params['path_params']['objId'] = existing_object['id']
copy_identity_properties(existing_object, params['data'])
return self.edit_object(edit_op_name, params)
def upsert_object(self, op_name, params):
"""
Updates an object if it already exists, or tries to create a new one if there is no
such object. If multiple objects match filter criteria, or add operation is not supported,
the exception is raised.
:param op_name: upsert operation name
:type op_name: str
:param params: params that upsert operation should be executed with
:type params: dict
:return: upserted object representation
:rtype: dict
"""
def extract_and_validate_model():
model = op_name[len(OperationNamePrefix.UPSERT):]
if not self._conn.get_model_spec(model):
raise FtdInvalidOperationNameError(op_name)
return model
model_name = extract_and_validate_model()
model_operations = self.get_operation_specs_by_model_name(model_name)
if not self._operation_checker.is_upsert_operation_supported(model_operations):
raise FtdInvalidOperationNameError(op_name)
existing_obj = self._find_object_matching_params(model_name, params)
if existing_obj:
equal_to_existing_obj = equal_objects(existing_obj, params[ParamName.DATA])
return existing_obj if equal_to_existing_obj \
else self._edit_upserted_object(model_operations, existing_obj, params)
else:
return self._add_upserted_object(model_operations, params)
def _set_default(params, field_name, value):
if field_name not in params or params[field_name] is None:
params[field_name] = value
def is_post_request(operation_spec):
return operation_spec[OperationField.METHOD] == HTTPMethod.POST
def is_put_request(operation_spec):
return operation_spec[OperationField.METHOD] == HTTPMethod.PUT
def _get_user_params(params):
return params.get(ParamName.DATA) or {}, params.get(ParamName.QUERY_PARAMS) or {}, params.get(
ParamName.PATH_PARAMS) or {}
def iterate_over_pageable_resource(resource_func, params):
"""
A generator function that iterates over a resource that supports pagination and lazily returns present items
one by one.
:param resource_func: function that receives `params` argument and returns a page of objects
:type resource_func: callable
:param params: initial dictionary of parameters that will be passed to the resource_func.
Should contain `query_params` inside.
:type params: dict
:return: an iterator containing returned items
:rtype: iterator of dict
"""
# creating a copy not to mutate passed dict
params = copy.deepcopy(params)
params[ParamName.QUERY_PARAMS].setdefault('limit', DEFAULT_PAGE_SIZE)
params[ParamName.QUERY_PARAMS].setdefault('offset', DEFAULT_OFFSET)
limit = int(params[ParamName.QUERY_PARAMS]['limit'])
def received_less_items_than_requested(items_in_response, items_expected):
if items_in_response == items_expected:
return False
elif items_in_response < items_expected:
return True
raise FtdUnexpectedResponse(
"Get List of Objects Response from the server contains more objects than requested. "
"There are {0} item(s) in the response while {1} was(ere) requested".format(
items_in_response, items_expected)
)
while True:
result = resource_func(params=params)
for item in result['items']:
yield item
if received_less_items_than_requested(len(result['items']), limit):
break
# creating a copy not to mutate existing dict
params = copy.deepcopy(params)
query_params = params[ParamName.QUERY_PARAMS]
query_params['offset'] = int(query_params['offset']) + limit