1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2024-09-14 20:13:21 +02:00
community.general/lib/ansible/modules/cloud/amazon/data_pipeline.py
Sloane Hertel dbbad16385 [cloud] New module: Add module for managing AWS Datapipelines (cloud/amazon/data_pipeline) (#22878)
* New module for managing AWS Datapipelines

* Supports create/activate/deactivate and deletion
* Handles idempotent creation by embedding the version in the
uniqueId field
* Waits for requested state to be reached, as Botocore doesn't
have waiters yet for datapipelines

* rename module, fix imports, add tags option, improve exit_json results, fix a couple bugs, add a TODO so I don't forget

fix pep8

allow timeout to be used for pipeline creation

make .format syntax uniform

fix pep8

fix exception handling

allow pipeline to be modified, refactor, add some comments, remove unnecessary imports

pipeline activation may not be in the activated state long

remove datapipeline version option

change a loop to a list comprehension

create idempotence by hashing the options given to the module minus the objects (which can be modified)

small bugfix

* data_pipeline unittests

make unittests pep8

fix bug in unittests

* remove exception handling that serves no purpose

* Fix python3 incompatibilities in datapipeline tests and add placebo fixture maybe_sleep for faster tests

Fix python3 incompatibilities in data_pipeline build_unique_id()

Don't delete a pipeline in diff_pipeline() because it's unexpected

Don't use time.time() because it causes an issue with placebo testing

re-recorded tests

fix pep8 in data_pipeline

Remove disable_rollback from tests

Make sure unique identifier is a string

re-record tests

* improve documentation and add another example

* use a placebo fixture instead of redundant code in tests

fix tests for PLACEBO_RECORD=false

* Fix data_pipeline docs

use isinstance instead of type()

fix documentation

* fix documentation

* Remove use of undefined variable from data_pipeline module and fix license

* fix copyright header
2017-08-03 15:04:10 -04:00

604 lines
20 KiB
Python

#!/usr/bin/python
#
# Copyright (c) 2017 Ansible Project
#
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
ANSIBLE_METADATA = {'status': ['preview'],
'supported_by': 'community',
'metadata_version': '1.0'}
DOCUMENTATION = '''
---
module: data_pipeline
version_added: "2.4"
author:
- Raghu Udiyar <raghusiddarth@gmail.com> (@raags)
- Sloane Hertel <shertel@redhat.com>
requirements: [ "boto3" ]
short_description: Create and manage AWS Datapipelines
extends_documentation_fragment:
- aws
- ec2
description:
- Create and manage AWS Datapipelines. Creation is not idempotent in AWS,
so the uniqueId is created by hashing the options (minus objects) given to the datapipeline.
The pipeline definition must be in the format given here
U(http://docs.aws.amazon.com/datapipeline/latest/APIReference/API_PutPipelineDefinition.html#API_PutPipelineDefinition_RequestSyntax).
Also operations will wait for a configurable amount
of time to ensure the pipeline is in the requested state.
options:
name:
description:
- The name of the Datapipeline to create/modify/delete.
required: true
description:
description:
- An optional description for the pipeline being created.
default: ''
objects:
description:
- A list of pipeline object definitions, each of which is a dict that takes the keys C(id), C(name) and C(fields).
suboptions:
id:
description:
- The ID of the object.
name:
description:
- The name of the object.
fields:
description:
- A list of dicts that take the keys C(key) and C(stringValue)/C(refValue).
The value is specified as a reference to another object C(refValue) or as a string value C(stringValue)
but not as both.
parameters:
description:
- A list of parameter objects (dicts) in the pipeline definition.
suboptions:
id:
description:
- The ID of the parameter object.
attributes:
description:
- A list of attributes (dicts) of the parameter object. Each attribute takes the keys C(key) and C(stringValue) both
of which are strings.
values:
description:
- A list of parameter values (dicts) in the pipeline definition. Each dict takes the keys C(id) and C(stringValue) both
of which are strings.
timeout:
description:
- Time in seconds to wait for the pipeline to transition to the requested state, fail otherwise.
default: 300
state:
description:
- The requested state of the pipeline.
choices: ['present', 'absent', 'active', 'inactive']
default: present
tags:
description:
- A dict of key:value pair(s) to add to the pipeline.
default: null
'''
EXAMPLES = '''
# Note: These examples do not set authentication details, see the AWS Guide for details.
# Create pipeline
- data_pipeline:
name: test-dp
region: us-west-2
objects: "{{pipelineObjects}}"
parameters: "{{pipelineParameters}}"
values: "{{pipelineValues}}"
tags:
key1: val1
key2: val2
state: present
# Example populating and activating a pipeline that demonstrates two ways of providing pipeline objects
- data_pipeline:
name: test-dp
objects:
- "id": "DefaultSchedule"
"name": "Every 1 day"
"fields":
- "key": "period"
"stringValue": "1 days"
- "key": "type"
"stringValue": "Schedule"
- "key": "startAt"
"stringValue": "FIRST_ACTIVATION_DATE_TIME"
- "id": "Default"
"name": "Default"
"fields": [ { "key": "resourceRole", "stringValue": "my_resource_role" },
{ "key": "role", "stringValue": "DataPipelineDefaultRole" },
{ "key": "pipelineLogUri", "stringValue": "s3://my_s3_log.txt" },
{ "key": "scheduleType", "stringValue": "cron" },
{ "key": "schedule", "refValue": "DefaultSchedule" },
{ "key": "failureAndRerunMode", "stringValue": "CASCADE" } ]
state: active
# Activate pipeline
- data_pipeline:
name: test-dp
region: us-west-2
state: active
# Delete pipeline
- data_pipeline:
name: test-dp
region: us-west-2
state: absent
'''
RETURN = '''
changed:
description: whether the data pipeline has been modified
type: bool
returned: always
sample:
changed: true
result:
description:
- Contains the data pipeline data (data_pipeline) and a return message (msg).
If the data pipeline exists data_pipeline will contain the keys description, name,
pipeline_id, state, tags, and unique_id. If the data pipeline does not exist then
data_pipeline will be an empty dict. The msg describes the status of the operation.
returned: always
type: dict
'''
import hashlib
import traceback
import re
import json
import time
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.ec2 import ec2_argument_spec, get_aws_connection_info, boto3_conn, camel_dict_to_snake_dict
from ansible.module_utils._text import to_bytes, to_text
try:
    import boto3
    from botocore.exceptions import ClientError
    HAS_BOTO3 = True
except ImportError:
    # Absence is reported to the user via module.fail_json() in main().
    HAS_BOTO3 = False

# Pipeline states that this module treats as "active" / "inactive".
DP_ACTIVE_STATES = ['ACTIVE', 'SCHEDULED']
DP_INACTIVE_STATES = ['INACTIVE', 'PENDING', 'FINISHED', 'DELETING']
DP_ACTIVATING_STATE = 'ACTIVATING'
DP_DEACTIVATING_STATE = 'DEACTIVATING'
# Regex template for the "pipeline does not exist" error text; also passed as a
# sentinel "state" when waiting for deletion (see _delete_dp_with_check).
PIPELINE_DOESNT_EXIST = '^.*Pipeline with id: {0} does not exist$'
class DataPipelineNotFound(Exception):
    """Raised when the requested data pipeline does not exist."""
    pass
class TimeOutException(Exception):
    """Raised when a requested state is not reached within the timeout."""
    pass
def pipeline_id(client, name):
    """Return the pipeline id for the given pipeline name.

    Follows list_pipelines pagination (hasMoreResults/marker), so pipelines
    beyond the first page of results are also found.

    :param object client: boto3 datapipeline client
    :param string name: pipeline name
    :returns: pipeline id
    :raises: DataPipelineNotFound
    """
    marker = None
    while True:
        if marker:
            pipelines = client.list_pipelines(marker=marker)
        else:
            pipelines = client.list_pipelines()
        for dp in pipelines['pipelineIdList']:
            if dp['name'] == name:
                return dp['id']
        # Stop once the API reports no further pages.
        if not pipelines.get('hasMoreResults'):
            raise DataPipelineNotFound
        marker = pipelines['marker']
def pipeline_description(client, dp_id):
    """Return the describe_pipelines response for a single pipeline.

    :param object client: boto3 datapipeline client
    :param string dp_id: pipeline id
    :returns: pipeline description dictionary
    :raises: DataPipelineNotFound
    """
    try:
        return client.describe_pipelines(pipelineIds=[dp_id])
    except ClientError:
        # Any client error (typically a not-found/deleted pipeline) is
        # treated as "pipeline does not exist"; the exception was unused.
        raise DataPipelineNotFound
def pipeline_field(client, dp_id, field):
    """Look up a single field from the pipeline description.

    The available fields are listed in describe_pipelines output.

    :param object client: boto3 datapipeline client
    :param string dp_id: pipeline id
    :param string field: pipeline description field
    :returns: pipeline field information
    :raises: KeyError if the field is not present
    """
    description = pipeline_description(client, dp_id)
    entries = description['pipelineDescriptionList'][0]['fields']
    matches = [entry['stringValue'] for entry in entries if entry['key'] == field]
    if not matches:
        raise KeyError("Field key {0} not found!".format(field))
    return matches[0]
def run_with_timeout(timeout, func, *func_args, **func_kwargs):
    """Run func with the provided args and kwargs, and wait until
    timeout for a truthy return value.

    :param int timeout: time in seconds to wait for status
    :param function func: function to run, should return True or False
    :param args func_args: function args to pass to func
    :param kwargs func_kwargs: function key word args
    :returns: True if func returns truthy within timeout
    :raises: TimeOutException
    """
    # Poll every 10 seconds, but always make at least one attempt so a
    # timeout smaller than the poll interval doesn't fail without ever
    # calling func (the old range(timeout // 10) loop was empty then).
    attempts = max(1, timeout // 10)
    for attempt in range(attempts):
        if func(*func_args, **func_kwargs):
            return True
        # Don't sleep after the final failed attempt.
        if attempt < attempts - 1:
            time.sleep(10)
    raise TimeOutException
def check_dp_exists(client, dp_id):
    """Check whether the datapipeline exists.

    :param object client: boto3 datapipeline client
    :param string dp_id: pipeline id
    :returns: True or False
    """
    try:
        # pipeline_description raises DataPipelineNotFound for missing pipelines.
        return bool(pipeline_description(client, dp_id))
    except DataPipelineNotFound:
        return False
def check_dp_status(client, dp_id, status):
    """Check whether the pipeline state is one of the states in status.

    :param object client: boto3 datapipeline client
    :param string dp_id: pipeline id
    :param list status: list of states to check against
    :returns: True or False
    :raises: TypeError if status is not a list/tuple of states
    """
    # Explicit validation instead of `assert`, which is stripped under -O.
    if not isinstance(status, (list, tuple)):
        raise TypeError("status must be a list of pipeline states")
    return pipeline_field(client, dp_id, field="@pipelineState") in status
def pipeline_status_timeout(client, dp_id, status, timeout):
    """Wait until the pipeline reaches one of the given states, up to timeout seconds."""
    return run_with_timeout(timeout, check_dp_status, client, dp_id, status)
def pipeline_exists_timeout(client, dp_id, timeout):
    """Wait until the pipeline exists, up to timeout seconds."""
    return run_with_timeout(timeout, check_dp_exists, client, dp_id)
def activate_pipeline(client, module):
    """Activate the named pipeline and wait for it to reach an active state.

    :param object client: boto3 datapipeline client
    :param AnsibleModule module: the module instance
    :returns: (changed, result) tuple
    """
    dp_name = module.params.get('name')
    timeout = module.params.get('timeout')

    try:
        dp_id = pipeline_id(client, dp_name)
    except DataPipelineNotFound:
        module.fail_json(msg='Data Pipeline {0} not found'.format(dp_name))

    if pipeline_field(client, dp_id, field="@pipelineState") in DP_ACTIVE_STATES:
        changed = False
    else:
        try:
            client.activate_pipeline(pipelineId=dp_id)
        except ClientError as e:
            if e.response["Error"]["Code"] == "InvalidRequestException":
                module.fail_json(msg="You need to populate your pipeline before activation.")
            else:
                # Previously other client errors were silently swallowed and the
                # failure only surfaced later as a misleading timeout message.
                module.fail_json(msg="Failed to activate Data Pipeline {0}.".format(dp_name),
                                 exception=traceback.format_exc())
        try:
            pipeline_status_timeout(client, dp_id, status=DP_ACTIVE_STATES,
                                    timeout=timeout)
        except TimeOutException:
            if pipeline_field(client, dp_id, field="@pipelineState") == "FINISHED":
                # Activated but completed more rapidly than it was checked.
                pass
            else:
                module.fail_json(msg=('Data Pipeline {0} failed to activate '
                                      'within timeout {1} seconds').format(dp_name, timeout))
        changed = True

    data_pipeline = get_result(client, dp_id)
    result = {'data_pipeline': data_pipeline,
              'msg': 'Data Pipeline {0} activated.'.format(dp_name)}
    return (changed, result)
def deactivate_pipeline(client, module):
    """Deactivate the named pipeline and wait for it to reach an inactive state.

    :param object client: boto3 datapipeline client
    :param AnsibleModule module: the module instance
    :returns: (changed, result) tuple
    """
    dp_name = module.params.get('name')
    timeout = module.params.get('timeout')

    try:
        dp_id = pipeline_id(client, dp_name)
    except DataPipelineNotFound:
        module.fail_json(msg='Data Pipeline {0} not found'.format(dp_name))

    if pipeline_field(client, dp_id, field="@pipelineState") in DP_INACTIVE_STATES:
        changed = False
    else:
        client.deactivate_pipeline(pipelineId=dp_id)
        try:
            pipeline_status_timeout(client, dp_id, status=DP_INACTIVE_STATES,
                                    timeout=timeout)
        except TimeOutException:
            # Fixed: the two implicitly-concatenated fragments previously
            # joined without a space ("deactivatewithin").
            module.fail_json(msg=('Data Pipeline {0} failed to deactivate '
                                  'within timeout {1} seconds').format(dp_name, timeout))
        changed = True

    data_pipeline = get_result(client, dp_id)
    result = {'data_pipeline': data_pipeline,
              'msg': 'Data Pipeline {0} deactivated.'.format(dp_name)}
    return (changed, result)
def _delete_dp_with_check(dp_id, client, timeout):
    # Issue the delete and wait for the pipeline to disappear.
    # PIPELINE_DOESNT_EXIST never matches a real @pipelineState value; once the
    # pipeline is gone, pipeline_description() raises DataPipelineNotFound,
    # which propagates out of pipeline_status_timeout and is caught here to
    # signal successful deletion. On timeout, TimeOutException propagates to
    # the caller instead.
    client.delete_pipeline(pipelineId=dp_id)
    try:
        pipeline_status_timeout(client=client, dp_id=dp_id, status=[PIPELINE_DOESNT_EXIST], timeout=timeout)
    except DataPipelineNotFound:
        return True
def delete_pipeline(client, module):
    """Delete the named pipeline if it exists.

    :param object client: boto3 datapipeline client
    :param AnsibleModule module: the module instance
    :returns: (changed, result) tuple
    """
    dp_name = module.params.get('name')
    timeout = module.params.get('timeout')

    try:
        dp_id = pipeline_id(client, dp_name)
        _delete_dp_with_check(dp_id, client, timeout)
        changed = True
    except DataPipelineNotFound:
        # Already gone - nothing to do.
        changed = False
    except TimeOutException:
        # Fixed: the two implicitly-concatenated fragments previously
        # joined without a space ("deletewithin").
        module.fail_json(msg=('Data Pipeline {0} failed to delete '
                              'within timeout {1} seconds').format(dp_name, timeout))
    result = {'data_pipeline': {},
              'msg': 'Data Pipeline {0} deleted'.format(dp_name)}
    return (changed, result)
def build_unique_id(module):
    """Build a deterministic uniqueId hash from the module parameters.

    ``objects`` and ``timeout`` are excluded from the hash so the pipeline
    definition can be updated (or the timeout changed) after creation without
    the pipeline being treated as a new one.

    :param AnsibleModule module: the module instance
    :returns: hex md5 digest (string) of the remaining parameters
    """
    data = dict(module.params)
    # Plain loop instead of a side-effecting list comprehension.
    for key in ('objects', 'timeout'):
        data.pop(key, None)
    json_data = json.dumps(data, sort_keys=True).encode("utf-8")
    return hashlib.md5(json_data).hexdigest()
def format_tags(tags):
    """Reformat a tag dict into the list-of-dicts shape the API expects.

    :param dict tags: dict of data pipeline tags (e.g. {key1: val1, key2: val2, key3: val3})
    :returns: list of dicts (e.g. [{key: key1, value: val1}, {key: key2, value: val2}, {key: key3, value: val3}])
    """
    formatted = []
    for tag_key, tag_value in tags.items():
        formatted.append({'key': tag_key, 'value': tag_value})
    return formatted
def get_result(client, dp_id):
    """Get the current state of the data pipeline and reformat it to snake_case for exit_json.

    :param object client: boto3 datapipeline client
    :param string dp_id: pipeline id
    :returns: reformatted dict of pipeline description
    """
    # pipelineDescriptionList has exactly one entry for a single id; it is a
    # dict with keys "description", "fields", "name", "pipelineId" and "tags".
    description = pipeline_description(client, dp_id)['pipelineDescriptionList'][0]

    # Surface uniqueId and @pipelineState (normally buried inside "fields")
    # as top-level keys in the exit_json result.
    description["unique_id"] = pipeline_field(client, dp_id, field="uniqueId")
    description["pipeline_state"] = pipeline_field(client, dp_id, field="@pipelineState")

    # Drop "fields": a list can't be snake_cased and most of it is redundant.
    del description["fields"]

    # "tags" is already in the desired format, so no extra handling is needed.
    return camel_dict_to_snake_dict(description)
def diff_pipeline(client, module, objects, unique_id, dp_name):
    """Check for an existing pipeline with the same unique_id and whether it needs updating.

    :param object client: boto3 datapipeline client
    :param AnsibleModule module: the module instance
    :param list objects: pipeline object definitions from the task
    :param string unique_id: hash of the module parameters (minus objects/timeout)
    :param string dp_name: pipeline name
    :returns: (create_dp, changed, result) tuple
    """
    result = {}
    changed = False
    create_dp = False
    # Fixed: the unique_id parameter was previously ignored and redundantly
    # recomputed here with build_unique_id(module).
    try:
        dp_id = pipeline_id(client, dp_name)
        dp_unique_id = to_text(pipeline_field(client, dp_id, field="uniqueId"))
        if dp_unique_id != unique_id:
            # A change is expected but not determined. Updated to a bool in create_pipeline().
            changed = "NEW_VERSION"
            create_dp = True
        # Unique ids are the same - check if pipeline needs modification
        else:
            dp_objects = client.get_pipeline_definition(pipelineId=dp_id)['pipelineObjects']
            # Definition needs to be updated
            if dp_objects != objects:
                changed, msg = define_pipeline(client, module, objects, dp_id)
            # No changes
            else:
                msg = 'Data Pipeline {0} is present'.format(dp_name)
            data_pipeline = get_result(client, dp_id)
            result = {'data_pipeline': data_pipeline,
                      'msg': msg}
    except DataPipelineNotFound:
        create_dp = True

    return create_dp, changed, result
def define_pipeline(client, module, objects, dp_id):
    """Put the pipeline definition (objects, parameters and values).

    :param object client: boto3 datapipeline client
    :param AnsibleModule module: the module instance
    :param list objects: pipeline object definitions from the task
    :param string dp_id: pipeline id
    :returns: (changed, msg) tuple
    """
    dp_name = module.params.get('name')

    if pipeline_field(client, dp_id, field="@pipelineState") == "FINISHED":
        msg = 'Data Pipeline {0} is unable to be updated while in state FINISHED.'.format(dp_name)
        changed = False
    elif objects:
        parameters = module.params.get('parameters')
        values = module.params.get('values')
        try:
            client.put_pipeline_definition(pipelineId=dp_id,
                                           pipelineObjects=objects,
                                           parameterObjects=parameters,
                                           parameterValues=values)
            msg = 'Data Pipeline {0} has been updated.'.format(dp_name)
            changed = True
        except ClientError:
            # Fixed: the implicitly-concatenated fragments previously joined
            # without spaces ("fieldsare", "allowedobjects").
            module.fail_json(msg="Failed to put the definition for pipeline {0}. Check that string/reference fields "
                                 "are not empty and that the number of objects in the pipeline does not exceed maximum allowed "
                                 "objects".format(dp_name), exception=traceback.format_exc())
    else:
        changed = False
        msg = ""
    return changed, msg
def create_pipeline(client, module):
    """Create the datapipeline if needed. Uses uniqueId to achieve idempotency.

    :param object client: boto3 datapipeline client
    :param AnsibleModule module: the module instance
    :returns: (changed, result) tuple
    """
    dp_name = module.params.get('name')
    objects = module.params.get('objects', None)
    description = module.params.get('description', '')
    tags = module.params.get('tags')
    timeout = module.params.get('timeout')

    unique_id = build_unique_id(module)
    create_dp, changed, result = diff_pipeline(client, module, objects, unique_id, dp_name)
    if changed == "NEW_VERSION":
        # delete old version
        changed, _ = delete_pipeline(client, module)

    # There isn't a pipeline or it has different parameters than the pipeline in existence.
    if create_dp:
        # Fixed: tags defaults to None in the argument spec, and
        # format_tags(None) raised an uncaught AttributeError.
        tags = format_tags(tags or {})
        # Make pipeline
        try:
            dp = client.create_pipeline(name=dp_name,
                                        uniqueId=unique_id,
                                        description=description,
                                        tags=tags)
            dp_id = dp['pipelineId']
            pipeline_exists_timeout(client, dp_id, timeout)
        except ClientError:
            module.fail_json(msg="Failed to create the data pipeline {0}.".format(dp_name), exception=traceback.format_exc())
        except TimeOutException:
            # Fixed: message fragments previously joined without a space.
            module.fail_json(msg=('Data Pipeline {0} failed to create '
                                  'within timeout {1} seconds').format(dp_name, timeout))
        # Put pipeline definition
        _, msg = define_pipeline(client, module, objects, dp_id)

        changed = True
        data_pipeline = get_result(client, dp_id)
        result = {'data_pipeline': data_pipeline,
                  'msg': 'Data Pipeline {0} created.'.format(dp_name) + msg}

    return (changed, result)
def main():
    """Module entry point: build the argument spec, connect, and dispatch on state."""
    argument_spec = ec2_argument_spec()
    argument_spec.update(
        dict(
            name=dict(required=True),
            version=dict(required=False),
            description=dict(required=False, default=''),
            objects=dict(required=False, type='list', default=[]),
            parameters=dict(required=False, type='list', default=[]),
            # Fixed: "values" is documented and read by define_pipeline() but
            # was missing from the spec, so it always resolved to None.
            values=dict(required=False, type='list', default=[]),
            timeout=dict(required=False, type='int', default=300),
            state=dict(default='present', choices=['present', 'absent',
                                                   'active', 'inactive']),
            tags=dict(required=False, type='dict')
        )
    )

    module = AnsibleModule(argument_spec, supports_check_mode=False)

    if not HAS_BOTO3:
        module.fail_json(msg='boto3 is required for the datapipeline module!')

    try:
        region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True)
        if not region:
            module.fail_json(msg="Region must be specified as a parameter, in EC2_REGION or AWS_REGION environment variables or in boto configuration file")
        client = boto3_conn(module, conn_type='client',
                            resource='datapipeline', region=region,
                            endpoint=ec2_url, **aws_connect_kwargs)
    except ClientError as e:
        module.fail_json(msg="Can't authorize connection - " + str(e))

    state = module.params.get('state')
    if state == 'present':
        changed, result = create_pipeline(client, module)
    elif state == 'absent':
        changed, result = delete_pipeline(client, module)
    elif state == 'active':
        changed, result = activate_pipeline(client, module)
    elif state == 'inactive':
        changed, result = deactivate_pipeline(client, module)

    module.exit_json(result=result, changed=changed)


if __name__ == '__main__':
    main()