1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Checkpoint httpapi plugin (#49929)

* Add checkpoint httpapi plugin and access rule facts module

* WIP checkpoint_access_rule module

* Add publish and install policy, plus fix empty json object request for publish

* Refactor publish and install_policy onto module_utils

* Add update resource logic

* Add checkpoint_host_facts module

* Return code and response on get_acess_rule function

* Add checkpoint_host module

* Add checkpoint_run_script module

* Add checkpoint_task_facts module

* Show all tasks if no task id is passed

Note, this is only available on v1.3 of Checkpoint WS API

* Add update logic to checkpoint host

* Add full details on get task call

* Add checkpoint httpapi plugin

* Fix pep8

* Use auth instead of sid property and return False on handle_httperror method

* Fix version in docstring

* Remove constructor

* Remove Accept from base headers

* Do not override http error handler and assign Checkpoint sid to connection _auth

There is scaffolding in the base class to autoappend the token, given
it is assigned to connection _send

* Use new connection queue message method instead of display

* Remove unused display

* Catch ValueError, since it's a parent of JSONDecodeError

* Make static methods that are not used outside the class regular methods

* Add missing self to previously static methods

* Fix logout

Was carrying copy pasta from ftd plugin

* Remove send_auth_request

* Use BASE_HEADERS constant

* Simplify copyright header on httpapi plugin

* Remove access rule module

* Remove unused imports

* Add unit test

* Fix pep8

* Add test

* Add test

* Fix pep8
This commit is contained in:
Ricardo Carrillo Cruz 2019-01-07 14:02:29 +01:00 committed by GitHub
parent 82c95e07b5
commit f9079274e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 175 additions and 0 deletions

View file

@ -0,0 +1,36 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# (c) 2018 Red Hat Inc.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
def publish(module, connection):
connection.send_request('/web_api/publish', None)
def install_policy(module, connection):
payload = {'policy-package': 'standard'}
connection.send_request('/web_api/install-policy', payload)

View file

@ -0,0 +1,76 @@
# (c) 2018 Red Hat Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = """
---
author: Ansible Networking Team
httpapi : checkpoint
short_description: HttpApi Plugin for Checkpoint devices
description:
- This HttpApi plugin provides methods to connect to Checkpoint
devices over a HTTP(S)-based api.
version_added: "2.8"
"""
import json
from ansible.module_utils.basic import to_text
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.six.moves.urllib.error import HTTPError
from ansible.plugins.httpapi import HttpApiBase
from ansible.module_utils.connection import ConnectionError
BASE_HEADERS = {
'Content-Type': 'application/json',
}
class HttpApi(HttpApiBase):
def login(self, username, password):
if username and password:
payload = {'user': username, 'password': password}
url = '/web_api/login'
response, response_data = self.send_request(url, payload)
else:
raise AnsibleConnectionFailure('Username and password are required for login')
try:
self.connection._auth = {'X-chkp-sid': response_data['sid']}
except KeyError:
raise ConnectionError(
'Server returned response without token info during connection authentication: %s' % response)
def logout(self):
url = '/web_api/logout'
response, dummy = self.send_request(url, None)
def send_request(self, path, body_params):
data = json.dumps(body_params) if body_params else '{}'
try:
self._display_request()
response, response_data = self.connection.send(path, data, method='POST', headers=BASE_HEADERS)
value = self._get_response_value(response_data)
return response.getcode(), self._response_to_json(value)
except HTTPError as e:
error = json.loads(e.read())
return e.code, error
def _display_request(self):
self.connection.queue_message('vvvv', 'Web Services: %s %s' % ('POST', self.connection._url))
def _get_response_value(self, response_data):
return to_text(response_data.getvalue())
def _response_to_json(self, response_text):
try:
return json.loads(response_text) if response_text else {}
# JSONDecodeError only available on Python 3.5+
except ValueError:
raise ConnectionError('Invalid JSON response: %s' % response_text)

View file

@ -0,0 +1,63 @@
# (c) 2018 Red Hat Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
import json
from ansible.module_utils.six.moves.urllib.error import HTTPError
from units.compat import mock
from units.compat import unittest
from units.compat.builtins import BUILTINS
from units.compat.mock import mock_open, patch
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.connection import ConnectionError
from ansible.module_utils.six import BytesIO, StringIO
from ansible.plugins.httpapi.checkpoint import HttpApi
EXPECTED_BASE_HEADERS = {
'Content-Type': 'application/json'
}
class FakeCheckpointHttpApiPlugin(HttpApi):
def __init__(self, conn):
super(FakeCheckpointHttpApiPlugin, self).__init__(conn)
class TestCheckpointHttpApi(unittest.TestCase):
def setUp(self):
self.connection_mock = mock.Mock()
self.checkpoint_plugin = FakeCheckpointHttpApiPlugin(self.connection_mock)
self.checkpoint_plugin._load_name = 'httpapi'
def test_login_raises_exception_when_username_and_password_are_not_provided(self):
with self.assertRaises(AnsibleConnectionFailure) as res:
self.checkpoint_plugin.login(None, None)
assert 'Username and password are required' in str(res.exception)
def test_login_raises_exception_when_invalid_response(self):
self.connection_mock.send.return_value = self._connection_response(
{'NOSIDKEY': 'NOSIDVALUE'}
)
with self.assertRaises(ConnectionError) as res:
self.checkpoint_plugin.login('foo', 'bar')
assert 'Server returned response without token info during connection authentication' in str(res.exception)
def test_send_request_should_return_error_info_when_http_error_raises(self):
self.connection_mock.send.side_effect = HTTPError('http://testhost.com', 500, '', {},
StringIO('{"errorMessage": "ERROR"}'))
resp = self.checkpoint_plugin.send_request('/test', None)
assert resp == (500, {'errorMessage': 'ERROR'})
@staticmethod
def _connection_response(response, status=200):
response_mock = mock.Mock()
response_mock.getcode.return_value = status
response_text = json.dumps(response) if type(response) is dict else response
response_data = BytesIO(response_text.encode() if response_text else ''.encode())
return response_mock, response_data