diff --git a/lib/ansible/modules/cloud/amazon/kinesis_stream.py b/lib/ansible/modules/cloud/amazon/kinesis_stream.py index 693d0444b9..4753c4a91d 100644 --- a/lib/ansible/modules/cloud/amazon/kinesis_stream.py +++ b/lib/ansible/modules/cloud/amazon/kinesis_stream.py @@ -36,8 +36,7 @@ options: required: true shards: description: - - "The number of shards you want to have with this stream. This can not - be modified after being created." + - "The number of shards you want to have with this stream." - "This is required when state == present" required: false default: None @@ -334,9 +333,15 @@ def find_stream(client, stream_name, check_mode=False): shards.extend(results.pop('Shards')) has_more_shards = results['HasMoreShards'] results['Shards'] = shards + num_closed_shards = len([s for s in shards if 'EndingSequenceNumber' in s['SequenceNumberRange']]) + results['OpenShardsCount'] = len(shards) - num_closed_shards + results['ClosedShardsCount'] = num_closed_shards results['ShardsCount'] = len(shards) else: results = { + 'OpenShardsCount': 5, + 'ClosedShardsCount': 0, + 'ShardsCount': 5, 'HasMoreShards': True, 'RetentionPeriodHours': 24, 'StreamName': stream_name, @@ -634,10 +639,10 @@ def stream_action(client, stream_name, shard_count=1, action='create', def retention_action(client, stream_name, retention_period=24, action='increase', check_mode=False): - """Increase or Decreaste the retention of messages in the Kinesis stream. + """Increase or Decrease the retention of messages in the Kinesis stream. Args: client (botocore.client.EC2): Boto3 client. - stream_name (str): The + stream_name (str): The name of the kinesis stream. Kwargs: retention_period (int): This is how long messages will be kept before @@ -652,7 +657,7 @@ def retention_action(client, stream_name, retention_period=24, >>> client = boto3.client('kinesis') >>> stream_name = 'test-stream' >>> retention_period = 48 - >>> stream_action(client, stream_name, retention_period, action='create') + >>> retention_action(client, stream_name, retention_period, action='increase') Returns: Tuple (bool, str) @@ -696,7 +701,44 @@ def retention_action(client, stream_name, retention_period=24, return success, err_msg -def update(client, current_stream, stream_name, retention_period=None, +def update_shard_count(client, stream_name, number_of_shards=1, check_mode=False): + """Increase or Decrease the number of shards in the Kinesis stream. + Args: + client (botocore.client.EC2): Boto3 client. + stream_name (str): The name of the kinesis stream. + + Kwargs: + number_of_shards (int): Number of shards this stream will use. + default=1 + check_mode (bool): This will pass DryRun as one of the parameters to the aws api. + default=False + + Basic Usage: + >>> client = boto3.client('kinesis') + >>> stream_name = 'test-stream' + >>> number_of_shards = 3 + >>> update_shard_count(client, stream_name, number_of_shards) + + Returns: + Tuple (bool, str) + """ + success = True + err_msg = '' + params = { + 'StreamName': stream_name, + 'ScalingType': 'UNIFORM_SCALING' + } + if not check_mode: + params['TargetShardCount'] = number_of_shards + try: + client.update_shard_count(**params) + except botocore.exceptions.ClientError as e: + return False, str(e) + + return success, err_msg + + +def update(client, current_stream, stream_name, number_of_shards=1, retention_period=None, tags=None, wait=False, wait_timeout=300, check_mode=False): """Update an Amazon Kinesis Stream. Args: @@ -704,6 +746,8 @@ def update(client, current_stream, stream_name, retention_period=None, stream_name (str): The name of the kinesis stream. Kwargs: + number_of_shards (int): Number of shards this stream will use. + default=1 retention_period (int): This is how long messages will be kept before they are discarded. This can not be less than 24 hours. tags (dict): The tags you want applied. @@ -717,6 +761,7 @@ def update(client, current_stream, stream_name, retention_period=None, Basic Usage: >>> client = boto3.client('kinesis') >>> current_stream = { + 'ShardCount': 3, 'HasMoreShards': True, 'RetentionPeriodHours': 24, 'StreamName': 'test-stream', @@ -725,8 +770,9 @@ def update(client, current_stream, stream_name, retention_period=None, } >>> stream_name = 'test-stream' >>> retention_period = 48 - >>> stream_action(client, current_stream, stream_name, - retention_period, action='create' ) + >>> number_of_shards = 10 + >>> update(client, current_stream, stream_name, + number_of_shards, retention_period ) Returns: Tuple (bool, bool, str) @@ -805,6 +851,36 @@ def update(client, current_stream, stream_name, retention_period=None, ) return success, changed, err_msg + if current_stream['OpenShardsCount'] != number_of_shards: + success, err_msg = ( + update_shard_count(client, stream_name, number_of_shards, check_mode=check_mode) + ) + + if not success: + return success, changed, err_msg + + changed = True + + if wait: + wait_success, wait_msg, current_stream = ( + wait_for_status( + client, stream_name, 'ACTIVE', wait_timeout, + check_mode=check_mode + ) + ) + if not wait_success: + return wait_success, changed, wait_msg + else: + stream_found, stream_msg, current_stream = ( + find_stream(client, stream_name, check_mode=check_mode) + ) + if stream_found and current_stream['StreamStatus'] != 'ACTIVE': + err_msg = ( + 'Number of shards for {0} is in the process of updating' + .format(stream_name) + ) + return success, changed, err_msg + if tags: _, _, err_msg = ( update_tags(client, stream_name, tags, check_mode=check_mode) @@ -863,6 +939,7 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None stream_found, stream_msg, current_stream = ( find_stream(client, stream_name, check_mode=check_mode) ) + if stream_found and current_stream.get('StreamStatus') == 'DELETING' and wait: wait_success, wait_msg, current_stream = ( wait_for_status( @@ -878,8 +955,8 @@ def create_stream(client, stream_name, number_of_shards=1, retention_period=None if stream_found and current_stream.get('StreamStatus') != 'DELETING': success, changed, err_msg = update( - client, current_stream, stream_name, retention_period, tags, - wait, wait_timeout, check_mode=check_mode + client, current_stream, stream_name, number_of_shards, + retention_period, tags, wait, wait_timeout, check_mode=check_mode ) else: create_success, create_msg = ( diff --git a/test/units/modules/cloud/amazon/test_kinesis_stream.py b/test/units/modules/cloud/amazon/test_kinesis_stream.py index 5c908caf23..bc6e08a7d1 100644 --- a/test/units/modules/cloud/amazon/test_kinesis_stream.py +++ b/test/units/modules/cloud/amazon/test_kinesis_stream.py @@ -98,6 +98,9 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): kinesis_stream.find_stream(client, 'test', check_mode=True) ) should_return = { + 'OpenShardsCount': 5, + 'ClosedShardsCount': 0, + 'ShardsCount': 5, 'HasMoreShards': True, 'RetentionPeriodHours': 24, 'StreamName': 'test', @@ -115,6 +118,9 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): ) ) should_return = { + 'OpenShardsCount': 5, + 'ClosedShardsCount': 0, + 'ShardsCount': 5, 'HasMoreShards': True, 'RetentionPeriodHours': 24, 'StreamName': 'test', @@ -230,9 +236,21 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): ) self.assertFalse(success) + def test_update_shard_count(self): + client = boto3.client('kinesis', region_name=aws_region) + success, err_msg = ( + kinesis_stream.update_shard_count( + client, 'test', 5, check_mode=True + ) + ) + self.assertTrue(success) + def test_update(self): client = boto3.client('kinesis', region_name=aws_region) current_stream = { + 'OpenShardsCount': 5, + 'ClosedShardsCount': 0, + 'ShardsCount': 1, 'HasMoreShards': True, 'RetentionPeriodHours': 24, 'StreamName': 'test', @@ -245,7 +263,7 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): } success, changed, err_msg = ( kinesis_stream.update( - client, current_stream, 'test', retention_period=48, + client, current_stream, 'test', number_of_shards=2, retention_period=48, tags=tags, check_mode=True ) ) @@ -266,6 +284,9 @@ class AnsibleKinesisStreamFunctions(unittest.TestCase): ) ) should_return = { + 'open_shards_count': 5, + 'closed_shards_count': 0, + 'shards_count': 5, 'has_more_shards': True, 'retention_period_hours': 24, 'stream_name': 'test',