diff --git a/lib/ansible/module_utils/rabbitmq.py b/lib/ansible/module_utils/rabbitmq.py index 9d4f6a5869..21f784f7e1 100644 --- a/lib/ansible/module_utils/rabbitmq.py +++ b/lib/ansible/module_utils/rabbitmq.py @@ -1,9 +1,27 @@ # -*- coding: utf-8 -*- # # Copyright: (c) 2016, Jorge Rodriguez +# Copyright: (c) 2018, John Imison # # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from ansible.module_utils._text import to_native, to_text +from ansible.module_utils.basic import env_fallback +from mimetypes import MimeTypes + +import json +import os + +try: + import pika + from pika import spec + HAS_PIKA = True +except ImportError: + HAS_PIKA = False + def rabbitmq_argument_spec(): return dict( @@ -17,3 +35,161 @@ def rabbitmq_argument_spec(): key=dict(required=False, type='path', default=None), vhost=dict(default='/', type='str'), ) + + +# notification/rabbitmq_basic_publish.py +class RabbitClient(): + def __init__(self, module): + self.module = module + self.params = module.params + self.check_required_library() + self.check_host_params() + self.url = self.params['url'] + self.proto = self.params['proto'] + self.username = self.params['username'] + self.password = self.params['password'] + self.host = self.params['host'] + self.port = self.params['port'] + self.vhost = self.params['vhost'] + self.queue = self.params['queue'] + self.headers = self.params['headers'] + + if self.host is not None: + self.build_url() + + self.connect_to_rabbitmq() + + def check_required_library(self): + if not HAS_PIKA: + self.module.fail_json(msg="Unable to find 'pika' Python library which is required.") + + def check_host_params(self): + # Fail if url is specified and other conflicting parameters have been specified + if self.params['url'] is not None and any(self.params[k] is not None for k in ['proto', 'host', 'port', 'password', 'username', 'vhost']): + self.module.fail_json(msg="url and proto, host, port, vhost, username or password cannot be specified at the same time.") + + # Fail if url not specified and there is a missing parameter to build the url + if self.params['url'] is None and any(self.params[k] is None for k in ['proto', 'host', 'port', 'password', 'username', 'vhost']): + self.module.fail_json(msg="Connection parameters must be passed via url, or, proto, host, port, vhost, username or password.") + + @staticmethod + def rabbitmq_argument_spec(): + return dict( + url=dict(default=None, type='str'), + proto=dict(default=None, type='str', choices=['amqps', 'amqp']), + host=dict(default=None, type='str'), + port=dict(default=None, type='int'), + username=dict(default=None, type='str'), + password=dict(default=None, type='str', no_log=True), + vhost=dict(default=None, type='str'), + queue=dict(default=None, type='str') + ) + + ''' Consider some file size limits here ''' + def _read_file(self, path): + try: + fh = open(path, "rb").read() + except IOError as e: + self.module.fail_json(msg="Unable to open file %s: %s" % (path, to_native(e))) + + return fh + + @staticmethod + def _check_file_mime_type(path): + mime = MimeTypes() + return mime.guess_type(path) + + def build_url(self): + self.url = '{0}://{1}:{2}@{3}:{4}/{5}'.format(self.proto, + self.username, + self.password, + self.host, + self.port, + self.vhost) + + def connect_to_rabbitmq(self): + """ + Function to connect to rabbitmq using username and password + """ + try: + parameters = pika.URLParameters(self.url) + except Exception as e: + self.module.fail_json(msg="URL malformed: %s" % to_native(e)) + + try: + self.connection = pika.BlockingConnection(parameters) + except Exception as e: + self.module.fail_json(msg="Connection issue: %s" % to_native(e)) + + try: + self.conn_channel = self.connection.channel() + except pika.exceptions.AMQPChannelError as e: + self.close_connection() + self.module.fail_json(msg="Channel issue: %s" % to_native(e)) + + def close_connection(self): + try: + self.connection.close() + except pika.exceptions.AMQPConnectionError: + pass + + def basic_publish(self): + self.content_type = self.params.get("content_type") + + if self.params.get("body") is not None: + args = dict( + body=self.params.get("body"), + exchange=self.params.get("exchange"), + routing_key=self.params.get("routing_key"), + properties=pika.BasicProperties(content_type=self.content_type, delivery_mode=1, headers=self.headers)) + + # If src (file) is defined and content_type is left as default, do a mime lookup on the file + if self.params.get("src") is not None and self.content_type == 'text/plain': + self.content_type = RabbitClient._check_file_mime_type(self.params.get("src"))[0] + self.headers.update( + filename=os.path.basename(self.params.get("src")) + ) + + args = dict( + body=self._read_file(self.params.get("src")), + exchange=self.params.get("exchange"), + routing_key=self.params.get("routing_key"), + properties=pika.BasicProperties(content_type=self.content_type, + delivery_mode=1, + headers=self.headers + )) + elif self.params.get("src") is not None: + args = dict( + body=self._read_file(self.params.get("src")), + exchange=self.params.get("exchange"), + routing_key=self.params.get("routing_key"), + properties=pika.BasicProperties(content_type=self.content_type, + delivery_mode=1, + headers=self.headers + )) + + try: + # If queue is not defined, RabbitMQ will return the queue name of the automatically generated queue. + if self.queue is None: + result = self.conn_channel.queue_declare(durable=self.params.get("durable"), + exclusive=self.params.get("exclusive"), + auto_delete=self.params.get("auto_delete")) + self.conn_channel.confirm_delivery() + self.queue = result.method.queue + else: + self.conn_channel.queue_declare(queue=self.queue, + durable=self.params.get("durable"), + exclusive=self.params.get("exclusive"), + auto_delete=self.params.get("auto_delete")) + self.conn_channel.confirm_delivery() + except Exception as e: + self.module.fail_json(msg="Queue declare issue: %s" % to_native(e)) + + # https://github.com/ansible/ansible/blob/devel/lib/ansible/module_utils/cloudstack.py#L150 + if args['routing_key'] is None: + args['routing_key'] = self.queue + + if args['exchange'] is None: + args['exchange'] = '' + + return self.conn_channel.basic_publish(**args) diff --git a/lib/ansible/modules/notification/rabbitmq_publish.py b/lib/ansible/modules/notification/rabbitmq_publish.py new file mode 100644 index 0000000000..5a4aaa6605 --- /dev/null +++ b/lib/ansible/modules/notification/rabbitmq_publish.py @@ -0,0 +1,179 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# (c) 2018, John Imison +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +ANSIBLE_METADATA = {'metadata_version': '1.1', + 'status': ['preview'], + 'supported_by': 'community'} + + +DOCUMENTATION = ''' +--- +module: rabbitmq_publish +short_description: Publish a message to a RabbitMQ queue. +version_added: "2.8" +description: + - Publish a message on a RabbitMQ queue using a blocking connection. +options: + url: + description: + - An URL connection string to connect to the RabbitMQ server. + - I(url) and I(host)/I(port)/I(user)/I(pass)/I(vhost) are mutually exclusive, use either or but not both. + proto: + description: + - The protocol to use. + choices: [amqps, amqp] + host: + description: + - The RabbitMQ server hostname or IP. + port: + description: + - The RabbitMQ server port. + username: + description: + - The RabbitMQ username. + password: + description: + - The RabbitMQ password. + vhost: + description: + - The virtual host to target. + - If default vhost is required, use C('%2F'). + queue: + description: + - The queue to publish a message to. If no queue is specified, RabbitMQ will return a random queue name. + exchange: + description: + - The exchange to publish a message to. + routing_key: + description: + - The routing key. + body: + description: + - The body of the message. + - A C(body) cannot be provided if a C(src) is specified. + src: + description: + - A file to upload to the queue. Automatic mime type detection is attempted if content_type is not defined (left as default). + - A C(src) cannot be provided if a C(body) is specified. + - The filename is added to the headers of the posted message to RabbitMQ. Key being the C(filename), value is the filename. + aliases: ['file'] + content_type: + description: + - The content type of the body. + default: text/plain + durable: + description: + - Set the queue to be durable. + default: False + type: bool + exclusive: + description: + - Set the queue to be exclusive. + default: False + type: bool + auto_delete: + description: + - Set the queue to auto delete. + default: False + type: bool + headers: + description: + - A dictionary of headers to post with the message. + default: {} + type: dict + + +requirements: [ pika ] +notes: + - This module requires the pika python library U(https://pika.readthedocs.io/). + - Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library. + - This plugin is tested against RabbitMQ. Other AMQP 0.9.1 protocol based servers may work but not tested/guaranteed. +author: "John Imison (@Im0)" +''' + +EXAMPLES = ''' +- name: Publish a message to a queue with headers + rabbitmq_publish: + url: "amqp://guest:guest@192.168.0.32:5672/%2F" + queue: 'test' + body: "Hello world from ansible module rabbitmq_publish" + content_type: "text/plain" + headers: + myHeader: myHeaderValue + + +- name: Publish a file to a queue + rabbitmq_publish: + url: "amqp://guest:guest@192.168.0.32:5672/%2F" + queue: 'images' + file: 'path/to/logo.gif' + +- name: RabbitMQ auto generated queue + rabbitmq_publish: + url: "amqp://guest:guest@192.168.0.32:5672/%2F" + body: "Hello world random queue from ansible module rabbitmq_publish" + content_type: "text/plain" +''' + +RETURN = ''' +result: + description: + - Contains the status I(msg), content type I(content_type) and the queue name I(queue). + returned: success + type: dict + sample: | + 'result': { 'content_type': 'text/plain', 'msg': 'Successfully published to queue test', 'queue': 'test' } +''' + +try: + import pika + HAS_PIKA = True +except ImportError: + HAS_PIKA = False + + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils._text import to_native, to_text +from ansible.module_utils.rabbitmq import RabbitClient + + +def main(): + argument_spec = RabbitClient.rabbitmq_argument_spec() + argument_spec.update( + exchange=dict(type='str', default=''), + routing_key=dict(type='str', required=False), + body=dict(type='str', required=False), + src=dict(aliases=['file'], type='path', required=False), + content_type=dict(default="text/plain", type='str'), + durable=dict(default=False, type='bool'), + exclusive=dict(default=False, type='bool'), + auto_delete=dict(default=False, type='bool'), + headers=dict(default={}, type='dict') + ) + module = AnsibleModule( + argument_spec=argument_spec, + mutually_exclusive=[['body', 'src']], + supports_check_mode=False + ) + + rabbitmq = RabbitClient(module) + + if rabbitmq.basic_publish(): + rabbitmq.close_connection() + module.exit_json(changed=True, result={"msg": "Successfully published to queue %s" % rabbitmq.queue, + "queue": rabbitmq.queue, + "content_type": rabbitmq.content_type}) + else: + rabbitmq.close_connection() + module.fail_json(changed=False, msg="Unsuccessful publishing to queue %s" % rabbitmq.queue) + + +if __name__ == '__main__': + main() diff --git a/test/integration/targets/rabbitmq_publish/aliases b/test/integration/targets/rabbitmq_publish/aliases new file mode 100644 index 0000000000..3d0091e7a9 --- /dev/null +++ b/test/integration/targets/rabbitmq_publish/aliases @@ -0,0 +1,5 @@ +destructive +shippable/posix/group1 +skip/osx +skip/freebsd +skip/rhel diff --git a/test/integration/targets/rabbitmq_publish/files/image.gif b/test/integration/targets/rabbitmq_publish/files/image.gif new file mode 100644 index 0000000000..0589d2082b Binary files /dev/null and b/test/integration/targets/rabbitmq_publish/files/image.gif differ diff --git a/test/integration/targets/rabbitmq_publish/meta/main.yml b/test/integration/targets/rabbitmq_publish/meta/main.yml new file mode 100644 index 0000000000..05ab59000b --- /dev/null +++ b/test/integration/targets/rabbitmq_publish/meta/main.yml @@ -0,0 +1,2 @@ +dependencies: + - setup_rabbitmq diff --git a/test/integration/targets/rabbitmq_publish/tasks/main.yml b/test/integration/targets/rabbitmq_publish/tasks/main.yml new file mode 100644 index 0000000000..740f899805 --- /dev/null +++ b/test/integration/targets/rabbitmq_publish/tasks/main.yml @@ -0,0 +1,5 @@ +# Rabbitmq lookup +- include: ubuntu.yml + when: + - ansible_distribution == 'Ubuntu' + - ansible_distribution_release != 'trusty' diff --git a/test/integration/targets/rabbitmq_publish/tasks/ubuntu.yml b/test/integration/targets/rabbitmq_publish/tasks/ubuntu.yml new file mode 100644 index 0000000000..fe8e9d000c --- /dev/null +++ b/test/integration/targets/rabbitmq_publish/tasks/ubuntu.yml @@ -0,0 +1,166 @@ +- name: Install requests and pika + pip: + name: requests,pika + state: present + +- name: RabbitMQ basic publish test + rabbitmq_publish: + url: "amqp://guest:guest@localhost:5672/%2F" + queue: 'publish_test' + body: "Hello world from ansible module rabbitmq_publish" + content_type: "text/plain" + register: rabbit_basic_output1 + +- assert: + that: + - "rabbit_basic_output1 is not failed" + - "'publish_test' in rabbit_basic_output1.result.msg" + - "'publish_test' in rabbit_basic_output1.result.queue" + - "'text/plain' in rabbit_basic_output1.result.content_type" + + +# Testing random queue +- name: Publish to random queue + rabbitmq_publish: + url: "amqp://guest:guest@localhost:5672/%2F" + body: "RANDOM QUEUE POST" + content_type: "text/plain" + register: rabbit_random_queue_output + +- assert: + that: + - "rabbit_random_queue_output is not failed" + - "'amq.gen' in rabbit_random_queue_output.result.msg" + - "'amq.gen' in rabbit_random_queue_output.result.queue" + - "'text/plain' in rabbit_random_queue_output.result.content_type" + +- name: Publish binary to a queue + rabbitmq_publish: + url: "amqp://guest:guest@localhost:5672/%2F" + queue: publish_test + src: "{{ role_path }}/files/image.gif" + register: rabbitmq_publish_file + +- assert: + that: + - "rabbitmq_publish_file is not failed" + - "'publish_test' in rabbitmq_publish_file.result.queue" + - "'image/gif' in rabbitmq_publish_file.result.content_type" + +- name: Raise error for src and body defined + rabbitmq_publish: + url: "amqp://guest:guest@localhost:5672/%2F" + queue: 'publish_test' + src: "{{ role_path }}/files/image.gif" + body: blah + register: rabbit_basic_fail_output1 + ignore_errors: yes + +- assert: + that: + - "rabbit_basic_fail_output1 is failed" + - "'parameters are mutually exclusive' in rabbit_basic_fail_output1.msg" + +- name: Publish a file that does not exist + rabbitmq_publish: + url: "amqp://guest:guest@localhost:5672/%2F" + queue: 'publish_test' + src: 'aaaaaaajax-loader.gif' + register: file_missing_fail + ignore_errors: yes + +- assert: + that: + - "file_missing_fail is failed" + - "'Unable to open file' in file_missing_fail.msg" + +- name: Publish with proto/host/port/user/pass + rabbitmq_publish: + proto: amqp + host: localhost + port: 5672 + username: guest + password: guest + vhost: '%2F' + queue: publish_test + body: Testing with proto/host/port/username/password/vhost + register: host_port_output + +- assert: + that: + - "host_port_output is not failed" + +- name: Publish with host/port/user but missing proto + rabbitmq_publish: + host: localhost + port: 5672 + username: guest + password: guest + vhost: '%2F' + queue: publish_test + body: Testing with proto/host/port/username/password/vhost + ignore_errors: yes + register: host_port_missing_proto_output + +- assert: + that: + - "host_port_missing_proto_output is failed" + - "'Connection parameters must be passed via' in host_port_missing_proto_output.msg" + +- name: Publish with proto/host/port/user and url + rabbitmq_publish: + url: "amqp://guest:guest@localhost:5672/%2F" + proto: amqp + host: localhost + port: 5672 + username: guest + password: guest + vhost: '%2F' + queue: publish_test + body: Testing with proto/host/port/username/password/vhost + ignore_errors: yes + register: host_and_url_output + +- assert: + that: + - "host_and_url_output is failed" + - "'cannot be specified at the same time' in host_and_url_output.msg" + +- name: Publish headers to queue + rabbitmq_publish: + url: "amqp://guest:guest@localhost:5672/%2F" + queue: 'publish_test' + body: blah + headers: + myHeader: Value1 + secondHeader: Value2 + register: test_headers1 + ignore_errors: yes + +- name: Publish headers with file + rabbitmq_publish: + url: "amqp://guest:guest@localhost:5672/%2F" + queue: 'publish_test' + src: "{{ role_path }}/files/image.gif" + headers: + myHeader: Value1 + secondHeader: Value2 + register: test_headers2 + ignore_errors: yes + +- name: Collect all messages off the publish queue + set_fact: + messages: "{{ lookup('rabbitmq', url='amqp://guest:guest@localhost:5672/%2F', queue='publish_test') }}" + +- name: Check contents of published messages + assert: + that: + - messages|length == 5 + - "'Hello world from ansible module rabbitmq_publish' in messages[0]['msg']" + - "'text/plain' in messages[0]['content_type']" + - "'image/gif' in messages[1]['content_type']" + - "'image.gif' in messages[1]['headers']['filename']" + - "'Testing with proto/host/port/username/password/vhost' in messages[2]['msg']" +# - messages[3]['headers']['myHeader'] is defined +# - messages[4]['headers']['filename'] is defined +# - messages[4]['headers']['secondHeader'] is defined