1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00

Add Bitbucket Pipelines SSH key pair module (#54241)

* Add module to manage Bitbucket Pipelines key pairs

* Rename module bitbucket_pipelines_ssh_key to bitbucket_pipeline_key_pair

* Update `version_added` documentation field

* Cosmetic changes

* Apply suggestions from code review

Co-Authored-By: catcombo <evgeniy.krysanov@gmail.com>
This commit is contained in:
Evgeniy Krysanov 2019-03-29 16:13:59 +03:00 committed by Dag Wieers
parent c914df354b
commit a6a73594ec
2 changed files with 405 additions and 0 deletions

View file

@ -0,0 +1,213 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2019, Evgeniy Krysanov <evgeniy.krysanov@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {
'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community',
}
DOCUMENTATION = r'''
---
module: bitbucket_pipeline_key_pair
short_description: Manages Bitbucket pipeline SSH key pair
description:
- Manages Bitbucket pipeline SSH key pair.
version_added: "2.8"
author:
- Evgeniy Krysanov (@catcombo)
options:
client_id:
description:
- OAuth consumer key.
- If not set the environment variable C(BITBUCKET_CLIENT_ID) will be used.
type: str
client_secret:
description:
- OAuth consumer secret.
- If not set the environment variable C(BITBUCKET_CLIENT_SECRET) will be used.
type: str
repository:
description:
- The repository name.
type: str
required: true
username:
description:
- The repository owner.
type: str
required: true
public_key:
description:
- The public key.
type: str
private_key:
description:
- The private key.
type: str
state:
description:
- Indicates desired state of the key pair.
type: str
required: true
choices: [ absent, present ]
notes:
- Bitbucket OAuth consumer key and secret can be obtained from Bitbucket profile -> Settings -> Access Management -> OAuth.
- Check mode is supported.
'''
EXAMPLES = r'''
- name: Create or update SSH key pair
bitbucket_pipeline_key_pair:
repository: 'bitbucket-repo'
username: bitbucket_username
public_key: '{{lookup("file", "bitbucket.pub") }}'
private_key: '{{lookup("file", "bitbucket") }}'
state: present
- name: Remove SSH key pair
bitbucket_pipeline_key_pair:
repository: bitbucket-repo
username: bitbucket_username
state: absent
'''
RETURN = r''' # '''
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.source_control.bitbucket import BitbucketHelper
error_messages = {
'invalid_params': 'Account, repository or SSH key pair was not found',
'required_keys': '`public_key` and `private_key` are required when the `state` is `present`',
}
BITBUCKET_API_ENDPOINTS = {
'ssh-key-pair': '%s/2.0/repositories/{username}/{repo_slug}/pipelines_config/ssh/key_pair' % BitbucketHelper.BITBUCKET_API_URL,
}
def get_existing_ssh_key_pair(module, bitbucket):
"""
Retrieves an existing ssh key pair from repository
specified in module param `repository`
:param module: instance of the :class:`AnsibleModule`
:param bitbucket: instance of the :class:`BitbucketHelper`
:return: existing key pair or None if not found
:rtype: dict or None
Return example::
{
"public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ...2E8HAeT",
"type": "pipeline_ssh_key_pair"
}
"""
api_url = BITBUCKET_API_ENDPOINTS['ssh-key-pair'].format(
username=module.params['username'],
repo_slug=module.params['repository'],
)
info, content = bitbucket.request(
api_url=api_url,
method='GET',
)
if info['status'] == 404:
# Account, repository or SSH key pair was not found.
return None
return content
def update_ssh_key_pair(module, bitbucket):
info, content = bitbucket.request(
api_url=BITBUCKET_API_ENDPOINTS['ssh-key-pair'].format(
username=module.params['username'],
repo_slug=module.params['repository'],
),
method='PUT',
data={
'private_key': module.params['private_key'],
'public_key': module.params['public_key'],
},
)
if info['status'] == 404:
module.fail_json(msg=error_messages['invalid_params'])
if info['status'] != 200:
module.fail_json(msg='Failed to create or update pipeline ssh key pair : {0}'.format(info))
def delete_ssh_key_pair(module, bitbucket):
info, content = bitbucket.request(
api_url=BITBUCKET_API_ENDPOINTS['ssh-key-pair'].format(
username=module.params['username'],
repo_slug=module.params['repository'],
),
method='DELETE',
)
if info['status'] == 404:
module.fail_json(msg=error_messages['invalid_params'])
if info['status'] != 204:
module.fail_json(msg='Failed to delete pipeline ssh key pair: {0}'.format(info))
def main():
argument_spec = BitbucketHelper.bitbucket_argument_spec()
argument_spec.update(
repository=dict(type='str', required=True),
username=dict(type='str', required=True),
public_key=dict(type='str'),
private_key=dict(type='str', no_log=True),
state=dict(type='str', choices=['present', 'absent'], required=True),
)
module = AnsibleModule(
argument_spec=argument_spec,
supports_check_mode=True,
)
bitbucket = BitbucketHelper(module)
state = module.params['state']
public_key = module.params['public_key']
private_key = module.params['private_key']
# Check parameters
if ((public_key is None) or (private_key is None)) and (state == 'present'):
module.fail_json(msg=error_messages['required_keys'])
# Retrieve access token for authorized API requests
bitbucket.fetch_access_token()
# Retrieve existing ssh key
key_pair = get_existing_ssh_key_pair(module, bitbucket)
changed = False
# Create or update key pair
if (not key_pair or (key_pair.get('public_key') != public_key)) and (state == 'present'):
if not module.check_mode:
update_ssh_key_pair(module, bitbucket)
changed = True
# Delete key pair
elif key_pair and (state == 'absent'):
if not module.check_mode:
delete_ssh_key_pair(module, bitbucket)
changed = True
module.exit_json(changed=changed)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,192 @@
from ansible.module_utils.source_control.bitbucket import BitbucketHelper
from ansible.modules.source_control.bitbucket import bitbucket_pipeline_key_pair
from units.compat import unittest
from units.compat.mock import patch
from units.modules.utils import AnsibleFailJson, AnsibleExitJson, ModuleTestCase, set_module_args
class TestBucketPipelineKeyPairModule(ModuleTestCase):
def setUp(self):
super(TestBucketPipelineKeyPairModule, self).setUp()
self.module = bitbucket_pipeline_key_pair
def test_missing_keys_with_present_state(self):
with self.assertRaises(AnsibleFailJson) as exec_info:
set_module_args({
'client_id': 'ABC',
'client_secret': 'XXX',
'username': 'name',
'repository': 'repo',
'state': 'present',
})
self.module.main()
self.assertEqual(exec_info.exception.args[0]['msg'], self.module.error_messages['required_keys'])
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_key_pair, 'get_existing_ssh_key_pair', return_value=None)
def test_create_keys(self, *args):
with patch.object(self.module, 'update_ssh_key_pair') as update_ssh_key_pair_mock:
with self.assertRaises(AnsibleExitJson) as exec_info:
set_module_args({
'client_id': 'ABC',
'client_secret': 'XXX',
'username': 'name',
'repository': 'repo',
'public_key': 'public',
'private_key': 'PRIVATE',
'state': 'present',
})
self.module.main()
self.assertEqual(update_ssh_key_pair_mock.call_count, 1)
self.assertEqual(exec_info.exception.args[0]['changed'], True)
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_key_pair, 'get_existing_ssh_key_pair', return_value=None)
def test_create_keys_check_mode(self, *args):
with patch.object(self.module, 'update_ssh_key_pair') as update_ssh_key_pair_mock:
with self.assertRaises(AnsibleExitJson) as exec_info:
set_module_args({
'client_id': 'ABC',
'client_secret': 'XXX',
'username': 'name',
'repository': 'repo',
'public_key': 'public',
'private_key': 'PRIVATE',
'state': 'present',
'_ansible_check_mode': True,
})
self.module.main()
self.assertEqual(update_ssh_key_pair_mock.call_count, 0)
self.assertEqual(exec_info.exception.args[0]['changed'], True)
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_key_pair, 'get_existing_ssh_key_pair', return_value={
'public_key': 'unknown',
'type': 'pipeline_ssh_key_pair',
})
def test_update_keys(self, *args):
with patch.object(self.module, 'update_ssh_key_pair') as update_ssh_key_pair_mock:
with self.assertRaises(AnsibleExitJson) as exec_info:
set_module_args({
'client_id': 'ABC',
'client_secret': 'XXX',
'username': 'name',
'repository': 'repo',
'public_key': 'public',
'private_key': 'PRIVATE',
'state': 'present',
})
self.module.main()
self.assertEqual(update_ssh_key_pair_mock.call_count, 1)
self.assertEqual(exec_info.exception.args[0]['changed'], True)
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_key_pair, 'get_existing_ssh_key_pair', return_value={
'public_key': 'public',
'type': 'pipeline_ssh_key_pair',
})
def test_dont_update_same_key(self, *args):
with patch.object(self.module, 'update_ssh_key_pair') as update_ssh_key_pair_mock:
with self.assertRaises(AnsibleExitJson) as exec_info:
set_module_args({
'client_id': 'ABC',
'client_secret': 'XXX',
'username': 'name',
'repository': 'repo',
'public_key': 'public',
'private_key': 'PRIVATE',
'state': 'present',
})
self.module.main()
self.assertEqual(update_ssh_key_pair_mock.call_count, 0)
self.assertEqual(exec_info.exception.args[0]['changed'], False)
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_key_pair, 'get_existing_ssh_key_pair', return_value={
'public_key': 'unknown',
'type': 'pipeline_ssh_key_pair',
})
def test_update_keys_check_mode(self, *args):
with patch.object(self.module, 'update_ssh_key_pair') as update_ssh_key_pair_mock:
with self.assertRaises(AnsibleExitJson) as exec_info:
set_module_args({
'client_id': 'ABC',
'client_secret': 'XXX',
'username': 'name',
'repository': 'repo',
'public_key': 'public',
'private_key': 'PRIVATE',
'state': 'present',
'_ansible_check_mode': True,
})
self.module.main()
self.assertEqual(update_ssh_key_pair_mock.call_count, 0)
self.assertEqual(exec_info.exception.args[0]['changed'], True)
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_key_pair, 'get_existing_ssh_key_pair', return_value={
'public_key': 'public',
'type': 'pipeline_ssh_key_pair',
})
def test_delete_keys(self, *args):
with patch.object(self.module, 'delete_ssh_key_pair') as delete_ssh_key_pair_mock:
with self.assertRaises(AnsibleExitJson) as exec_info:
set_module_args({
'client_id': 'ABC',
'client_secret': 'XXX',
'username': 'name',
'repository': 'repo',
'state': 'absent',
})
self.module.main()
self.assertEqual(delete_ssh_key_pair_mock.call_count, 1)
self.assertEqual(exec_info.exception.args[0]['changed'], True)
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_key_pair, 'get_existing_ssh_key_pair', return_value=None)
def test_delete_absent_keys(self, *args):
with patch.object(self.module, 'delete_ssh_key_pair') as delete_ssh_key_pair_mock:
with self.assertRaises(AnsibleExitJson) as exec_info:
set_module_args({
'client_id': 'ABC',
'client_secret': 'XXX',
'username': 'name',
'repository': 'repo',
'state': 'absent',
})
self.module.main()
self.assertEqual(delete_ssh_key_pair_mock.call_count, 0)
self.assertEqual(exec_info.exception.args[0]['changed'], False)
@patch.object(BitbucketHelper, 'fetch_access_token', return_value='token')
@patch.object(bitbucket_pipeline_key_pair, 'get_existing_ssh_key_pair', return_value={
'public_key': 'public',
'type': 'pipeline_ssh_key_pair',
})
def test_delete_keys_check_mode(self, *args):
with patch.object(self.module, 'delete_ssh_key_pair') as delete_ssh_key_pair_mock:
with self.assertRaises(AnsibleExitJson) as exec_info:
set_module_args({
'client_id': 'ABC',
'client_secret': 'XXX',
'username': 'name',
'repository': 'repo',
'state': 'absent',
'_ansible_check_mode': True,
})
self.module.main()
self.assertEqual(delete_ssh_key_pair_mock.call_count, 0)
self.assertEqual(exec_info.exception.args[0]['changed'], True)
if __name__ == '__main__':
unittest.main()