diff --git a/.gitignore b/.gitignore index 48a738f..6f8b4fb 100644 --- a/.gitignore +++ b/.gitignore @@ -54,3 +54,6 @@ coverage.xml # Sphinx documentation docs/_build/ +.idea +sprockets/mixins/amqp +sprockets/mixins/http diff --git a/.travis.yml b/.travis.yml index 6d6a031..e543ef1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,23 +1,25 @@ language: python python: -- 2.7 -- pypy -- 3.4 -- 3.5.1 + - 2.7 + - pypy + - 3.4 + - 3.5 + - 3.6 services: -- rabbitmq + - rabbitmq install: -- pip install -r requires/testing.txt -e . + - pip install -r requires/testing.txt -e . script: -- nosetests + - nosetests after_success: -- codecov + - codecov deploy: provider: pypi user: sprockets password: secure: TPpdlEbbxfjcFXR7KizRNiwqZDdB8C8/y0zE/nbFvlds1ZfdqRwCZJlvgsVwhUfiMrUfKiZ3sg1kSI6wN0Li8Z4DVe8RPyhnOXjml6DDao4iZ569LYEfqMdfWrp6NcN/+sMOpGlq5XuQMcdsy3P9uP8WGOUzRwjuQ0ny+2BN8yxD3TxY+TqgYo3FaCYwR0bp5u8l1pmX9gIbD8DhcbbC7EyO+/t8IZj4x5TxIQamIvhWyd8LIFpvR1FcCKRPbqu2x2fPZG4t6YwBHbcmLf8VnZx5xFGvOKEP9HaN4YkWtSIHQ/RhCuFslSPg4peHK3xgurDKMsXdvxnsV2AgSQBEQEaWt6ewACbM4nyW09K++LKK0F19U6keSzYgLZZK+Twsn02xhNpQf58k5kuAB0pJNm+EFxymUcJjR5g9gnVE2ln/Y3MkU1YhXRJGvo0hwdcytkDaPitIASuPC9buy/UQ8smzYPfA60PAF9Dl0/gq8lIWteq3u6PJQT+qtSobWwhR/nFYqTWDMk1sZu4sHVe1IPhSnJonlWccPe/AcS6qG8QNUt5n4fC1l5rerfsiplo+aSLH8gb6p5eiueBAXGwH/akZa6nb9hs1gFFZicHlCzMXlvA845qiLBHbj1ABNJri8jnRvtNxNcYdqBu73lkhExIHlsIG8sgRw93bN1ys73A= on: - python: 3.4 + python: 3.5 + distributions: sdist bdist_wheel tags: true all_branches: true diff --git a/docs/history.rst b/docs/history.rst index a505ee4..f6af608 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -1,6 +1,12 @@ Version History =============== +`2.1.0`_ May 3, 2017 +-------------------- +- Consolidate code +- Streamline and use sprockets.mixins.http as well +- Replace tests with integration tests modeled after sprockets.mixins.amqp + `2.0.0`_ Apr 26, 2017 --------------------- - Move Mixin to separate file @@ -20,7 +26,8 @@ Version History ---------------------- - Initial implementation -.. _Next Release: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.0.0...HEAD +.. _Next Release: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.1.0...HEAD +.. _2.1.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.0.0...2.1.0 .. _2.0.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/1.0.1...2.0.0 .. _1.0.1: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/1.0.0...1.0.1 .. _1.0.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/7324bea...1.0.0 diff --git a/requires/installation.txt b/requires/installation.txt index f7164f0..140e05b 100644 --- a/requires/installation.txt +++ b/requires/installation.txt @@ -1,3 +1,4 @@ -sprockets.mixins.amqp>=2.0.0,<3 +sprockets.mixins.amqp>=2.1.1,<3 +sprockets.mixins.http>=1.0.3,<2 fastavro>=0.10.1,<1.0.0 tornado>=4.2.0,<5.0.0 diff --git a/requires/testing.txt b/requires/testing.txt index b81929a..5a6f4a5 100644 --- a/requires/testing.txt +++ b/requires/testing.txt @@ -1,5 +1,6 @@ -coverage>=3.7,<4 +coverage>=4,<5 nose>=1.3.1,<2.0.0 mock>=2.0.0,<3 -wheel +flake8 +pylint -r installation.txt diff --git a/setup.cfg b/setup.cfg index e08b28e..7f8e60f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,6 +2,16 @@ universal = 1 [nosetests] -with-coverage = 1 +cover-branches = 1 cover-erase = 1 cover-package = sprockets.mixins.avro_publisher +with-coverage = 1 +verbosity = 2 +cover-html = 1 +cover-html-dir = build/coverage + +[upload_docs] +upload-dir = build/sphinx/html + +[flake8] +exclude = env,build diff --git a/setup.py b/setup.py index 6e76681..9c4856f 100644 --- a/setup.py +++ b/setup.py @@ -3,8 +3,6 @@ import os.path import setuptools -from sprockets.mixins.avro_publisher import __version__ - def read_requirements(name): requirements = [] @@ -25,7 +23,7 @@ def read_requirements(name): setuptools.setup( name='sprockets.mixins.avro-publisher', - version=__version__, + version='2.1.0', description='Mixin for publishing events to RabbitMQ as avro datums', long_description=open('README.rst').read(), url='https://github.com/sprockets/sprockets.mixins.avro-publisher', diff --git a/sprockets/mixins/avro_publisher/__init__.py b/sprockets/mixins/avro_publisher/__init__.py index 3189c18..ec119d6 100644 --- a/sprockets/mixins/avro_publisher/__init__.py +++ b/sprockets/mixins/avro_publisher/__init__.py @@ -1,4 +1,4 @@ -"""The AvroPublishingMixin adds Apache Avro serialization to the +"""The AvroPublishingMixin adds Apache Avro serialization to the RabbitMQ publishing capabilities in sprockets.mixins.amqp To configure the URL format for the avro schema, add a @@ -8,51 +8,152 @@ should be similar to the following: ``http://my-schema-repository/avro/%(name)s.avsc`` Take note also of the required configurations in sprockets.mixins.amqp + """ +import io import logging +from sprockets.mixins import amqp, http +from tornado import gen +import fastavro + +__version__ = '2.1.0' + LOGGER = logging.getLogger(__name__) -try: - from sprockets.mixins import amqp +DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum' +SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc' - from sprockets.mixins.avro_publisher.mixins import (PublishingMixin, - SchemaFetchError) - -except ImportError as error: - class PublishingMixin(object): - def __init__(self, *args, **kwargs): - raise error - - class SchemaFetchError(Exception): - def __init__(self, *args, **kwargs): - raise error - - class amqp(object): - - error = None - - @classmethod - def install(cls, *args, **kwargs): - raise cls.error - - amqp.error = error - -version_info = (2, 0, 0) -__version__ = '.'.join(str(v) for v in version_info) +_SCHEMAS = {} -def install(application, **kwargs): - """Call this to install avro publishing for the Tornado application. +def install(application, io_loop=None, **kwargs): # pragma: nocover + """Call this to install Avro publishing for the Tornado application. :rtype: bool """ - amqp.install(application, **kwargs) - + amqp.install(application, io_loop=io_loop, **kwargs) if 'avro_schema_uri_format' not in application.settings: LOGGER.warning('avro_schema_uri_format is not set, using default') - - setattr(application, 'avro_schemas', {}) - return True + + +class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin): + """Publish to Avro encoded datums to RabbitMQ""" + + def avro_amqp_publish(self, exchange, routing_key, message_type, + data, properties=None): + """Publish a message to RabbitMQ, serializing the payload data as an + Avro datum and creating the AMQP message properties. + + :param str exchange: The exchange to publish the message to. + :param str routing_key: The routing key to publish the message with. + :param str message_type: The message type for the Avro schema. + :param dict data: The message data to serialize. + :param dict properties: An optional dict of additional properties + to append. + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + + """ + properties = properties or {} + properties['content_type'] = DATUM_MIME_TYPE + properties['type'] = message_type + return self.amqp_publish(exchange, routing_key, data, properties) + + @gen.coroutine + def amqp_publish(self, exchange, routing_key, body, properties=None): + """Publish a message to RabbitMQ, serializing the payload data as an + Avro datum if the message is to be sent as such. + + :param str exchange: The exchange to publish the message to. + :param str routing_key: The routing key to publish the message with. + :param dict body: The message data to serialize. + :param dict properties: An optional dict of additional properties + to append. If publishing an Avro message, it + must contain the Avro message type at 'type' + and have a content type of + 'application/vnd.apache.avro.datum' + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + + """ + LOGGER.debug('Publishing Here: %r %r %r %r', + exchange, routing_key, body, properties) + properties = properties or {} + if properties.get('content_type') == DATUM_MIME_TYPE and \ + isinstance(body, dict): + avro_schema = yield self._schema(properties['type']) + LOGGER.debug('Schema: %r', avro_schema) + body = self._serialize(avro_schema, body) + yield super(PublishingMixin, self).amqp_publish( + exchange, routing_key, body, properties) + + @gen.coroutine + def _schema(self, message_type): + """Fetch the Avro schema file from application cache or the remote + URI. If the request for the schema from the remote URI fails, a + :exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be + raised. + + :param str message_type: The message type for the Avro schema. + :rtype: str + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + + """ + global _SCHEMAS + + if message_type not in _SCHEMAS: + schema = yield self._fetch_schema(message_type) + _SCHEMAS[message_type] = schema + raise gen.Return(_SCHEMAS[message_type]) + + @gen.coroutine + def _fetch_schema(self, message_type): + """Fetch the Avro schema for the given message type from a remote + location, returning the schema JSON string. + + If the schema can not be retrieved, a + :exc:`~sprockets.mixins.avro_publisher.SchemaFetchError` will be + raised. + + :param str message_type: The message type for the Avro schema. + :rtype: str + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + + """ + response = yield self.http_fetch(self._schema_url(message_type)) + if response.ok: + raise gen.Return(response.body) + raise SchemaFetchError() + + def _schema_url(self, message_type): + """Return the URL for the given message type for retrieving the Avro + schema from a remote location. + + :param str message_type: The message type for the Avro schema. + + :rtype: str + + """ + return self.application.settings.get( + 'avro_schema_uri_format', + SCHEMA_URI_FORMAT) % {'name': message_type} + + @staticmethod + def _serialize(schema, data): + """Serialize a data structure into an Avro datum. + + :param dict schema: The parsed Avro schema. + :param dict data: The value to turn into an Avro datum. + + :rtype: bytes + + """ + stream = io.BytesIO() + fastavro.schemaless_writer(stream, schema, data) + return stream.getvalue() + + +class SchemaFetchError(Exception): + """Raised when the Avro schema could not be fetched.""" + pass diff --git a/sprockets/mixins/avro_publisher/mixins.py b/sprockets/mixins/avro_publisher/mixins.py deleted file mode 100644 index 0c85ff5..0000000 --- a/sprockets/mixins/avro_publisher/mixins.py +++ /dev/null @@ -1,182 +0,0 @@ -import io -import json -import logging - -from sprockets.mixins import amqp -from tornado import gen, httpclient -import fastavro - -LOGGER = logging.getLogger(__name__) - - -class PublishingMixin(amqp.PublishingMixin): - """The request handler will connect to RabbitMQ on the first request, - blocking until the connection and channel are established. If RabbitMQ - closes its connection to the app at any point, a connection attempt will - be made on the next request. - - This class implements a pattern for the use of a single AMQP connection - to RabbitMQ. - """ - DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum' - DEFAULT_SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc' - DEFAULT_FETCH_RETRY_DELAY = 0.5 - - def initialize(self, *args, **kwargs): - self._schema_fetch_failed = False - - if not hasattr(self, '_http_client'): - self._http_client = httpclient.AsyncHTTPClient(force_instance=True) - - self._schema_uri_format = self.application.settings.get( - 'avro_schema_uri_format', - self.DEFAULT_SCHEMA_URI_FORMAT) - - self._fetch_retry_delay = self.application.settings.get( - 'avro_schema_fetch_retry_delay', - self.DEFAULT_FETCH_RETRY_DELAY) - - if hasattr(super(PublishingMixin, self), 'initialize'): - super(PublishingMixin, self).initialize(*args, **kwargs) - - @gen.coroutine - def avro_amqp_publish(self, exchange, routing_key, message_type, - data, properties=None, mandatory=False): - """Publish a message to RabbitMQ, serializing the payload data as an - Avro datum and creating the AMQP message properties. - - :param str exchange: The exchange to publish the message to. - :param str routing_key: The routing key to publish the message with. - :param str message_type: The message type for the Avro schema. - :param dict data: The message data to serialize. - :param dict properties: An optional dict of additional properties - to append. Will not override mandatory - properties: - content_type, type - :param bool mandatory: Whether to instruct the server to return an - unqueueable message - http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory - - :raises: sprockets.mixins.avro_publisher.SchemaFetchError - """ - # Set mandatory Avro-related properties - properties = properties or {} - properties['content_type'] = self.DATUM_MIME_TYPE - properties['type'] = message_type - - yield self.amqp_publish(exchange, routing_key, data, properties, - mandatory) - - @gen.coroutine - def amqp_publish(self, exchange, routing_key, body, properties, - mandatory=False): - """Publish a message to RabbitMQ, serializing the payload data as an - Avro datum if the message is to be sent as such. - - :param str exchange: The exchange to publish the message to. - :param str routing_key: The routing key to publish the message with. - :param dict body: The message data to serialize. - :param dict properties: A dict of additional properties - to append. If publishing an Avro message, it - must contain the Avro message type at 'type' - and have a content type of - 'application/vnd.apache.avro.datum' - :param bool mandatory: Whether to instruct the server to return an - unqueueable message - http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory - - :raises: sprockets.mixins.avro_publisher.SchemaFetchError - """ - if (('content_type' in properties and - properties['content_type']) == self.DATUM_MIME_TYPE): - avro_schema = yield self._schema(properties['type']) - body = self._serialize(avro_schema, body) - - yield super(PublishingMixin, self).amqp_publish( - exchange, - routing_key, - body, - properties, - mandatory - ) - - @gen.coroutine - def _schema(self, message_type): - """Fetch the Avro schema file from application cache or the remote URI. - If the request for the schema from the remote URI fails, a - :exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be raised. - - :param str message_type: The message type for the Avro schema. - - :rtype: str - - :raises: sprockets.mixins.avro_publisher.SchemaFetchError - - """ - if message_type not in self.application.avro_schemas: - schema = yield self._fetch_schema(message_type) - self.application.avro_schemas[message_type] = schema - raise gen.Return(self.application.avro_schemas[message_type]) - - @gen.coroutine - def _fetch_schema(self, message_type): - """Fetch the Avro schema for the given message type from a remote - location, returning the schema JSON string. - - If fetching the schema results in an ``tornado.httpclient.HTTPError``, - it will retry once then raise a SchemaFetchError if the retry fails. - - :param str message_type: The message type for the Avro schema. - - :rtype: str - - :raises: sprockets.mixins.avro_publisher.SchemaFetchError - - """ - url = self._schema_url(message_type) - LOGGER.debug('Loading schema for %s from %s', message_type, url) - try: - response = yield self._http_client.fetch(url) - except httpclient.HTTPError as error: - if self._schema_fetch_failed: - LOGGER.error('Could not fetch Avro schema for %s (%s)', - message_type, error) - raise SchemaFetchError(str(error)) - else: - self._schema_fetch_failed = True - yield gen.sleep(self._fetch_retry_delay) - yield self._fetch_schema(message_type) - - else: - self._schema_fetch_failed = False - raise gen.Return(json.loads(response.body.decode('utf-8'))) - - def _schema_url(self, message_type): - """Return the URL for the given message type for retrieving the Avro - schema from a remote location. - - :param str message_type: The message type for the Avro schema. - - :rtype: str - - """ - return self._schema_uri_format % {'name': message_type} - - @staticmethod - def _serialize(schema, data): - """Serialize a data structure into an Avro datum. - - :param dict schema: The parsed Avro schema. - :param dict data: The value to turn into an Avro datum. - - :rtype: bytes - - """ - stream = io.BytesIO() - fastavro.schemaless_writer(stream, schema, data) - return stream.getvalue() - - -class SchemaFetchError(ValueError): - """Raised when the Avro schema could not be fetched.""" - pass diff --git a/tests.py b/tests.py index b591302..cafd9e6 100644 --- a/tests.py +++ b/tests.py @@ -1,423 +1,183 @@ -import io import json +import io import logging -import os +import random import uuid -from pika import spec -import mock - -from tornado.concurrent import Future -from tornado.httpclient import HTTPError -from tornado import gen, locks, testing, web +from tornado import concurrent, gen, locks, testing, web import fastavro +from pika import spec -from sprockets.mixins import avro_publisher -from sprockets.mixins.avro_publisher.mixins import SchemaFetchError - -MESSAGE_TYPE = "example.avro.User" - -AVRO_SCHEMA = """ -{"namespace": "example.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]} - ] -} -""" - -# Set this URL to that of a running AMQP server before executing tests -if 'TEST_AMQP_URL' in os.environ: - AMQP_URL = os.environ['TEST_AMQP_URL'] -else: - AMQP_URL = 'amqp://guest:guest@localhost:5672/%2f' +from sprockets.mixins import amqp, avro_publisher LOGGER = logging.getLogger(__name__) +MESSAGE_TYPE = "example.avro.Test" -class TestRequestHandler(avro_publisher.PublishingMixin): - - def __init__(self, application): - self.application = application - self.correlation_id = str(uuid.uuid4()) - self.initialize() - - def set_client(self, client): - self._http_client = client +AVRO_SCHEMA = { + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]}]} -@mock.patch('tornado.httpclient.AsyncHTTPClient') -class MockHTTPClient: - - def fetch(self, *args, **kwargs): - with mock.patch('tornado.httpclient.HTTPResponse') as response_class: - response = response_class.return_value - response.body = AVRO_SCHEMA.encode() - - future = Future() - future.set_result(response) - return future +def deserialize(value): + return fastavro.schemaless_reader(io.BytesIO(value), AVRO_SCHEMA) -@mock.patch('tornado.httpclient.AsyncHTTPClient') -class MockHTTPClientError: +class Test1RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler): - def __init__(self): - self.times_called = 0 - - def fetch(self, *args, **kwargs): - self.times_called += 1 - raise HTTPError(500) - - -class BaseTestCase(testing.AsyncTestCase): + def initialize(self): + self.correlation_id = self.request.headers.get('Correlation-Id') + self.publish = self.amqp_publish @gen.coroutine + def get(self, *args, **kwargs): + LOGGER.debug('Handling Request %r', self.correlation_id) + parameters = self.parameters() + try: + yield self.publish(**parameters) + except amqp.AMQPException as error: + self.write({'error': str(error), + 'type': error.__class__.__name__, + 'parameters': parameters}) + else: + self.write(parameters) # Correlation-ID is added pass by reference + self.finish() + + def parameters(self): + return { + 'exchange': self.get_argument('exchange', str(uuid.uuid4())), + 'routing_key': self.get_argument('routing_key', str(uuid.uuid4())), + 'body': { + 'name': str(uuid.uuid4()), + 'favorite_number': random.randint(1, 1000), + 'favorite_color': str(uuid.uuid4()) + }, + 'properties': { + 'content_type': avro_publisher.DATUM_MIME_TYPE, + 'message_id': str(uuid.uuid4()), + 'type': MESSAGE_TYPE}} + +class Test2RequestHandler(Test1RequestHandler): + + def initialize(self): + self.correlation_id = self.request.headers.get('Correlation-Id') + self.publish = self.avro_amqp_publish + + def parameters(self): + return { + 'exchange': self.get_argument('exchange', str(uuid.uuid4())), + 'routing_key': self.get_argument('routing_key', str(uuid.uuid4())), + 'message_type': MESSAGE_TYPE, + 'data': { + 'name': str(uuid.uuid4()), + 'favorite_number': random.randint(1, 1000), + 'favorite_color': str(uuid.uuid4()) + }, + 'properties': {'message_id': str(uuid.uuid4())}} + + +class SchemaRequestHandler(web.RequestHandler): + def get(self, *args, **kwargs): + LOGGER.debug('Returning Schema for %r %r', args, kwargs) + self.finish(AVRO_SCHEMA) + + +def setUpModule(): + logging.getLogger('pika').setLevel(logging.INFO) + + +class AsyncHTTPTestCase(testing.AsyncHTTPTestCase): + CONFIRMATIONS = True + def setUp(self): - super(BaseTestCase, self).setUp() - - # make sure that our logging statements get executed - avro_publisher.mixins.LOGGER.enabled = True - avro_publisher.mixins.LOGGER.setLevel(logging.DEBUG) - - self.exchange = str(uuid.uuid4()) - self.queue = str(uuid.uuid4()) - self.routing_key = MESSAGE_TYPE + super(AsyncHTTPTestCase, self).setUp() self.correlation_id = str(uuid.uuid4()) - self.message = None - self.test_queue_bound = locks.Event() - self.get_response = locks.Event() - self.amqp_ready = locks.Event() - self.condition = locks.Condition() - self.config = { - "url": AMQP_URL, - "reconnect_delay": 1, - "timeout": 2, - "on_ready_callback": self.on_ready, - "on_unavailable_callback": self.on_unavailable, - "on_persistent_failure_callback": self.on_persistent_failure, - "on_message_returned_callback": self.on_message_returned, - "io_loop": self.io_loop, - } - self.app = web.Application() - self.app.settings = { - 'service': 'unit_tests', - 'version': '0.0', - } + self.exchange = str(uuid.uuid4()) + self.get_delivered_message = concurrent.Future() + self.get_returned_message = concurrent.Future() + self.queue = str(uuid.uuid4()) + self.routing_key = str(uuid.uuid4()) + self.ready = locks.Event() + avro_publisher.install(self._app, self.io_loop, **{ + 'on_ready_callback': self.on_amqp_ready, + 'enable_confirmations': self.CONFIRMATIONS, + 'on_return_callback': self.on_message_returned, + 'url': 'amqp://guest:guest@127.0.0.1:5672/%2f'}) + self.io_loop.start() - self.clear_event_tracking() + def get_app(self): + return web.Application( + [(r'/test1', Test1RequestHandler), + (r'/test2', Test2RequestHandler), + (r'/schema/(.*).avsc', SchemaRequestHandler)], + **{'avro_schema_uri_format': self.get_url('/schema/%(name)s.avsc'), + 'service': 'test', + 'version': avro_publisher.__version__}) - self.handler = TestRequestHandler(self.app) + def on_amqp_ready(self, _client): + LOGGER.debug('AMQP ready') + self._app.amqp.channel.exchange_declare( + self.on_exchange_declared, self.exchange, + durable=False, auto_delete=True) - self.create_http_client() + def on_exchange_declared(self, method): + LOGGER.debug('Exchange declared: %r', method) + self._app.amqp.channel.queue_declare( + self.on_queue_declared, self.queue, + arguments={'x-expires': 30000}, + auto_delete=True, durable=False) - avro_publisher.install(self.app, **self.config) - yield self.condition.wait(self.io_loop.time() + 5) + def on_queue_declared(self, method): + LOGGER.debug('Queue declared: %r', method) + self._app.amqp.channel.queue_bind( + self.on_queue_bound, self.queue, self.exchange, self.routing_key) - LOGGER.info('Connected to RabbitMQ, declaring exchange %s', - self.exchange) - self.app.amqp.channel.exchange_declare(self.on_exchange_declare_ok, - self.exchange, - auto_delete=True) + def on_queue_bound(self, method): + LOGGER.debug('Queue bound: %r', method) + self._app.amqp.channel.basic_consume( + self.on_message_delivered, self.queue) + self.io_loop.stop() - def create_http_client(self): - client = MockHTTPClient() - self.handler.set_client(client) + def on_message_delivered(self, _channel, method, properties, body): + self.get_delivered_message.set_result((method, properties, body)) - def create_http_error_client(self): - client = MockHTTPClientError() - self.handler.set_client(client) - - def on_exchange_declare_ok(self, _method): - LOGGER.info( - 'Exchange %s declared, declaring queue %s', - self.exchange, - self.queue - ) - self.app.amqp.channel.queue_declare(self.on_queue_declare_ok, - queue=self.queue, - auto_delete=True) - - def on_queue_declare_ok(self, _method): - LOGGER.info('Queue %s declared', self.queue) - self.app.amqp.channel.queue_bind(self.on_bind_ok, self.queue, - self.exchange, self.routing_key) - - def on_bind_ok(self, _method): - LOGGER.info('Queue %s bound to %s', self.queue, self.exchange) - self.app.amqp.channel.add_callback(self.on_get_response, - [spec.Basic.GetEmpty], False) - self.test_queue_bound.set() - - def on_get_response(self, channel, method, properties=None, body=None): - LOGGER.info('get_response: %r', method) - self.message = { - 'method': method, - 'properties': properties, - 'body': body, - } - self.get_response.set() - - def on_ready(self, caller): - LOGGER.info('on_ready called') - self.ready_called = True - self.amqp_ready.set() - - def on_unavailable(self, caller): - LOGGER.info('on_unavailable called') - self.unavailable_called = True - self.amqp_ready.clear() - - def on_persistent_failure(self, caller, exchange, - routing_key, body, properties): - LOGGER.info('on_persistent_failure called') - self.persistent_failure_called = True - self.failed_message = { - 'exchange': exchange, - 'routing_key': routing_key, - 'body': body, - 'properties': properties, - } - self.amqp_ready.clear() - - def on_message_returned(self, caller, method, properties, body): - LOGGER.info('on_message_returned called') - self.message_returned_called = True - self.message_returned_error = method.reply_text - self.returned_message = { - 'exchange': method.exchange, - 'routing_key': method.routing_key, - 'body': body, - 'properties': properties, - } - - def clear_event_tracking(self): - self.ready_called = False - self.unavailable_called = False - self.persistent_failure_called = False - self.message_returned_called = False - self.failed_message = { - 'exchange': None, - 'routing_key': None, - 'body': None, - 'properties': None, - } - self.returned_message = { - 'exchange': None, - 'routing_key': None, - 'body': None, - 'properties': None, - } - - @gen.coroutine - def get_message(self): - self.message = None - self.get_response.clear() - self.app.amqp.channel.basic_get(self.on_get_response, self.queue) - - LOGGER.info('Waiting on get') - yield self.get_response.wait() - if isinstance(self.message['method'], spec.Basic.GetEmpty): - raise ValueError('Basic.GetEmpty') - raise gen.Return(self.message) + def on_message_returned(self, method, properties, body): + self.get_returned_message.set_result((method, properties, body)) -class SettingsTests(testing.AsyncTestCase): - - @testing.gen_test(timeout=10) - def should_warn_when_no_uri_schema_set_test(self): - app = web.Application() - - with mock.patch('logging.Logger.warning') as mock_logger: - avro_publisher.install(app, url=AMQP_URL) - - mock_logger.assert_called_with( - 'avro_schema_uri_format is not set, using default') - - @testing.gen_test(timeout=10) - def should_set_default_settings_test(self): - app = web.Application() - - avro_publisher.install(app, url=AMQP_URL) - - handler = TestRequestHandler(app) +class PublisherConfirmationTestCase(AsyncHTTPTestCase): + @testing.gen_test + def test_amqp_publish(self): + response = yield self.http_client.fetch( + self.get_url('/test1?exchange={}&routing_key={}'.format( + self.exchange, self.routing_key)), + headers={'Correlation-Id': self.correlation_id}) + published = json.loads(response.body.decode('utf-8')) + delivered = yield self.get_delivered_message + self.assertIsInstance(delivered[0], spec.Basic.Deliver) + self.assertEqual(delivered[1].correlation_id, self.correlation_id) self.assertEqual( - handler._schema_uri_format, - avro_publisher.PublishingMixin.DEFAULT_SCHEMA_URI_FORMAT - ) + delivered[1].content_type, avro_publisher.DATUM_MIME_TYPE) + self.assertEqual(delivered[1].type, MESSAGE_TYPE) + self.assertEqual(deserialize(delivered[2]), published['body']) + + @testing.gen_test + def test_avro_amqp_publish(self): + response = yield self.http_client.fetch( + self.get_url('/test2?exchange={}&routing_key={}'.format( + self.exchange, self.routing_key)), + headers={'Correlation-Id': self.correlation_id}) + published = json.loads(response.body.decode('utf-8')) + delivered = yield self.get_delivered_message + self.assertIsInstance(delivered[0], spec.Basic.Deliver) + self.assertEqual(delivered[1].correlation_id, self.correlation_id) self.assertEqual( - handler._fetch_retry_delay, - avro_publisher.PublishingMixin.DEFAULT_FETCH_RETRY_DELAY - ) - - @testing.gen_test(timeout=10) - def should_override_default_with_configured_settings_test(self): - app = web.Application() - app.settings = { - 'avro_schema_uri_format': 'http://127.0.0.1/avro/%(name)s.avsc', - 'avro_schema_fetch_retry_delay': - avro_publisher.PublishingMixin.DEFAULT_FETCH_RETRY_DELAY + 1, - - } - - avro_publisher.install(app, url=AMQP_URL) - - handler = TestRequestHandler(app) - - self.assertEqual(handler._schema_uri_format, - app.settings['avro_schema_uri_format']) - self.assertEqual(handler._fetch_retry_delay, - app.settings['avro_schema_fetch_retry_delay']) - - -class AvroIntegrationTests(BaseTestCase): - - @testing.gen_test(timeout=10) - def should_publish_avro_message_test(self): - yield self.test_queue_bound.wait() - - LOGGER.info('Should be ready') - - message = { - "name": "testuser", - "favorite_number": 1, - "favorite_color": "green", - } - - yield self.handler.avro_amqp_publish( - self.exchange, - MESSAGE_TYPE, - self.routing_key, - message - ) - - stream = io.BytesIO() - fastavro.schemaless_writer(stream, json.loads(AVRO_SCHEMA), message) - serialized_message = stream.getvalue() - - LOGGER.info('Published') - - result = yield self.get_message() - - self.assertEqual(serialized_message, result['body']) - self.assertEqual(self.handler.app_id, result['properties'].app_id) - self.assertEqual(self.handler.correlation_id, - result['properties'].correlation_id) - self.assertEqual(avro_publisher.PublishingMixin.DATUM_MIME_TYPE, - result['properties'].content_type) - - @testing.gen_test(timeout=10) - def should_publish_other_format_amqp_message_test(self): - yield self.test_queue_bound.wait() - - LOGGER.info('Should be ready') - - message = bytes(bytearray(range(255, 0, -1))) - properties = {'content_type': 'application/octet-stream'} - - yield self.handler.amqp_publish( - self.exchange, - self.routing_key, - message, - properties - ) - - LOGGER.info('Published') - - result = yield self.get_message() - - self.assertEqual(message, result['body']) - self.assertEqual(self.handler.app_id, result['properties'].app_id) - self.assertEqual(self.handler.correlation_id, - result['properties'].correlation_id) - self.assertEqual(properties['content_type'], - result['properties'].content_type) - - @testing.gen_test(timeout=10) - def should_raise_schema_fetch_error_on_fetch_failure_test(self): - yield self.test_queue_bound.wait() - - LOGGER.info('Should be ready') - - self.create_http_error_client() - - with self.assertRaises(SchemaFetchError): - message = { - "name": "testuser", - "favorite_number": 1, - "favorite_color": "green", - } - - yield self.handler.avro_amqp_publish( - self.exchange, - MESSAGE_TYPE, - self.routing_key, - message - ) - - @testing.gen_test(timeout=10) - def should_retry_once_on_fetch_failure_test(self): - yield self.test_queue_bound.wait() - - LOGGER.info('Should be ready') - - self.create_http_error_client() - - with self.assertRaises(SchemaFetchError): - message = { - "name": "testuser", - "favorite_number": 1, - "favorite_color": "green", - } - - yield self.handler.avro_amqp_publish( - self.exchange, - MESSAGE_TYPE, - self.routing_key, - message - ) - - self.assertEqual(2, self.handler._http_client.times_called) - - @testing.gen_test(timeout=10) - def should_serialize_avro_message_when_amqp_publish_called_test(self): - yield self.test_queue_bound.wait() - - LOGGER.info('Should be ready') - - message = { - "name": "testuser", - "favorite_number": 1, - "favorite_color": "green", - } - - properties = { - 'content_type': avro_publisher.PublishingMixin.DATUM_MIME_TYPE, - 'type': MESSAGE_TYPE - } - - yield self.handler.amqp_publish( - self.exchange, - self.routing_key, - message, - properties - ) - - stream = io.BytesIO() - fastavro.schemaless_writer(stream, json.loads(AVRO_SCHEMA), message) - serialized_message = stream.getvalue() - - LOGGER.info('Published') - - result = yield self.get_message() - - self.assertEqual(serialized_message, result['body']) - self.assertEqual(self.handler.app_id, result['properties'].app_id) - self.assertEqual(self.handler.correlation_id, - result['properties'].correlation_id) - self.assertEqual(avro_publisher.PublishingMixin.DATUM_MIME_TYPE, - result['properties'].content_type) + delivered[1].content_type, avro_publisher.DATUM_MIME_TYPE) + self.assertEqual(delivered[1].type, MESSAGE_TYPE) + self.assertEqual(deserialize(delivered[2]), published['data'])