diff --git a/.travis.yml b/.travis.yml index 63db33b..6d6a031 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,15 +1,23 @@ language: python python: -- '3.4' +- 2.7 +- pypy +- 3.4 +- 3.5.1 +services: +- rabbitmq install: -- pip install -e . -script: '' +- pip install -r requires/testing.txt -e . +script: +- nosetests +after_success: +- codecov deploy: provider: pypi user: sprockets password: secure: TPpdlEbbxfjcFXR7KizRNiwqZDdB8C8/y0zE/nbFvlds1ZfdqRwCZJlvgsVwhUfiMrUfKiZ3sg1kSI6wN0Li8Z4DVe8RPyhnOXjml6DDao4iZ569LYEfqMdfWrp6NcN/+sMOpGlq5XuQMcdsy3P9uP8WGOUzRwjuQ0ny+2BN8yxD3TxY+TqgYo3FaCYwR0bp5u8l1pmX9gIbD8DhcbbC7EyO+/t8IZj4x5TxIQamIvhWyd8LIFpvR1FcCKRPbqu2x2fPZG4t6YwBHbcmLf8VnZx5xFGvOKEP9HaN4YkWtSIHQ/RhCuFslSPg4peHK3xgurDKMsXdvxnsV2AgSQBEQEaWt6ewACbM4nyW09K++LKK0F19U6keSzYgLZZK+Twsn02xhNpQf58k5kuAB0pJNm+EFxymUcJjR5g9gnVE2ln/Y3MkU1YhXRJGvo0hwdcytkDaPitIASuPC9buy/UQ8smzYPfA60PAF9Dl0/gq8lIWteq3u6PJQT+qtSobWwhR/nFYqTWDMk1sZu4sHVe1IPhSnJonlWccPe/AcS6qG8QNUt5n4fC1l5rerfsiplo+aSLH8gb6p5eiueBAXGwH/akZa6nb9hs1gFFZicHlCzMXlvA845qiLBHbj1ABNJri8jnRvtNxNcYdqBu73lkhExIHlsIG8sgRw93bN1ys73A= on: - tags: true python: 3.4 + tags: true all_branches: true diff --git a/MANIFEST.in b/MANIFEST.in index 43f5a43..e16af52 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ -include requires/python2.txt -include requires/python3.txt include LICENSE include README.rst +graft docs +graft requires diff --git a/README.rst b/README.rst index d54d845..eb5c69a 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,6 @@ sprockets.mixins.avro-publisher =============================== -AMQP Publishing Mixin for publishing messages as Avro datum +AMQP Publishing Mixin for publishing a message as an Avro datum. |Version| |Downloads| @@ -16,7 +16,9 @@ and can be installed via ``pip`` or ``easy_install``: Requirements ------------ -- sprockets.mixins.amqp>=1.0.0 +- sprockets.mixins.amqp>=2.0.0 +- fastavro>=0.10.1,<1.0.0 +- tornado>=4.2.0,<5.0.0 Example ------- @@ -44,14 +46,16 @@ This examples demonstrates the most basic usage of ``sprockets.mixins.avro-publi avro_publisher.install(application) return application - class RequestHandler(avro_publisher.AvroPublishingMixin, web.RequestHandler): + class RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler): @gen.coroutine def get(self, *args, **kwargs): body = {'request': self.request.path, 'args': args, 'kwargs': kwargs} - yield self.amqp_publish('exchange', 'routing.key', body, - {'content_type': avro_publisher.DATUM_MIME_TYPE, - 'type': 'avro-schema-name'}) + yield self.avro_amqp_publish( + 'exchange', + 'routing.key', + 'avro-schema-name' + body) if __name__ == "__main__": application = make_app() diff --git a/docs/history.rst b/docs/history.rst new file mode 100644 index 0000000..a505ee4 --- /dev/null +++ b/docs/history.rst @@ -0,0 +1,26 @@ +Version History +=============== + +`2.0.0`_ Apr 26, 2017 +--------------------- +- Move Mixin to separate file +- Replace code with latest internal version +- Rename AvroPublishingMixin to PublishingMixin +- Update setup.py and requires files to current standard +- Replace avro library with fastavro library +- Add avro_amqp_publish helper method +- Add retry when schema cannot be fetched + - Delay before retrying is configurable via application.settings: + - avro_schema_fetch_retry_delay (default 0.5 seconds) +- Separate HTTP client from common app-based pool to help avoid excessive locking on high load +- Add unit tests + - Test execution requires a running AMQP server, see tests.py + +`0.1.0`_ Sept 24, 2015 +---------------------- + - Initial implementation + +.. _Next Release: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.0.0...HEAD +.. _2.0.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/1.0.1...2.0.0 +.. _1.0.1: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/1.0.0...1.0.1 +.. _1.0.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/7324bea...1.0.0 diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..ecda180 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,10 @@ +.. include:: ../README.rst + +Issues +------ +Please report any issues to the Github project at `https://github.com/sprockets/sprockets.mixins.avro-publisher/issues `_ + +.. toctree:: + :hidden: + + history diff --git a/requires/installation.txt b/requires/installation.txt new file mode 100644 index 0000000..f7164f0 --- /dev/null +++ b/requires/installation.txt @@ -0,0 +1,3 @@ +sprockets.mixins.amqp>=2.0.0,<3 +fastavro>=0.10.1,<1.0.0 +tornado>=4.2.0,<5.0.0 diff --git a/requires/python2.txt b/requires/python2.txt deleted file mode 100644 index 8953202..0000000 --- a/requires/python2.txt +++ /dev/null @@ -1,2 +0,0 @@ -sprockets.mixins.amqp>=1.0.0,<2 -avro>=1.7.7,<2 diff --git a/requires/python3.txt b/requires/python3.txt deleted file mode 100644 index 8804715..0000000 --- a/requires/python3.txt +++ /dev/null @@ -1,2 +0,0 @@ -sprockets.mixins.amqp>=1.0.0,<2 -avro-python3>=1.7.7,<2 diff --git a/requires/testing.txt b/requires/testing.txt new file mode 100644 index 0000000..b81929a --- /dev/null +++ b/requires/testing.txt @@ -0,0 +1,5 @@ +coverage>=3.7,<4 +nose>=1.3.1,<2.0.0 +mock>=2.0.0,<3 +wheel +-r installation.txt diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..e08b28e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,7 @@ +[bdist_wheel] +universal = 1 + +[nosetests] +with-coverage = 1 +cover-erase = 1 +cover-package = sprockets.mixins.avro_publisher diff --git a/setup.py b/setup.py index 31a6453..6e76681 100644 --- a/setup.py +++ b/setup.py @@ -1,14 +1,31 @@ -import setuptools -import sys +#!/usr/bin/env python +import os.path -requires = 'requires/python{0}.txt'.format(sys.version_info[0]) -with open(requires) as handle: - requirements = [line.strip() for line in handle.readlines()] +import setuptools + +from sprockets.mixins.avro_publisher import __version__ + + +def read_requirements(name): + requirements = [] + try: + with open(os.path.join('requires', name)) as req_file: + for line in req_file: + if '#' in line: + line = line[:line.index('#')] + line = line.strip() + if line.startswith('-r'): + requirements.extend(read_requirements(line[2:].strip())) + elif line and not line.startswith('-'): + requirements.append(line) + except IOError: + pass + return requirements setuptools.setup( name='sprockets.mixins.avro-publisher', - version='1.0.1', + version=__version__, description='Mixin for publishing events to RabbitMQ as avro datums', long_description=open('README.rst').read(), url='https://github.com/sprockets/sprockets.mixins.avro-publisher', @@ -32,5 +49,5 @@ setuptools.setup( ], packages=setuptools.find_packages(), namespace_packages=['sprockets', 'sprockets.mixins'], - install_requires=requirements, + install_requires=read_requirements('installation.txt'), zip_safe=True) diff --git a/sprockets/mixins/avro_publisher/__init__.py b/sprockets/mixins/avro_publisher/__init__.py index 003bbd4..3189c18 100644 --- a/sprockets/mixins/avro_publisher/__init__.py +++ b/sprockets/mixins/avro_publisher/__init__.py @@ -1,15 +1,5 @@ -"""The AvroPublishingMixin wraps RabbitMQ use into a request handler, with -methods to speed the development of publishing RabbitMQ messages serialized -as Avro datums. - -RabbitMQ is configured using two environment variables: ``AMQP_URL`` and -``AMQP_TIMEOUT``. - -``AMQP_URL`` is the AMQP url to connect to, defaults to -``amqp://guest:guest@localhost:5672/%2f``. - -``AMQP_TIMEOUT`` is the number of seconds to wait until timing out when -connecting to RabbitMQ. +"""The AvroPublishingMixin adds Apache Avro serialization to the +RabbitMQ publishing capabilities in sprockets.mixins.amqp To configure the URL format for the avro schema, add a Tornado application setting called ``avro_schema_uri_format``. The format @@ -17,132 +7,52 @@ should be similar to the following: ``http://my-schema-repository/avro/%(name)s.avsc`` +Take note also of the required configurations in sprockets.mixins.amqp """ -import io import logging -import sys - -from sprockets.mixins import amqp -from tornado import gen -from tornado import httpclient -import avro.io -import avro.schema - -version_info = (1, 0, 1) -__version__ = '.'.join(str(v) for v in version_info) LOGGER = logging.getLogger(__name__) -PYTHON3 = True if sys.version_info > (3, 0, 0) else False +try: + from sprockets.mixins import amqp + from sprockets.mixins.avro_publisher.mixins import (PublishingMixin, + SchemaFetchError) -class AvroPublishingMixin(amqp.PublishingMixin): - """The request handler will connect to RabbitMQ on the first request, - blocking until the connection and channel are established. If RabbitMQ - closes it's connection to the app at any point, a connection attempt will - be made on the next request. +except ImportError as error: + class PublishingMixin(object): + def __init__(self, *args, **kwargs): + raise error - This class implements a pattern for the use of a single AMQP connection - to RabbitMQ. + class SchemaFetchError(Exception): + def __init__(self, *args, **kwargs): + raise error - Expects the :envvar:`AMQP_URL` environment variable to construct - :class:`pika.connection.URLParameters`. + class amqp(object): - """ - DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum' - DEFAULT_SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc' + error = None - @gen.coroutine - def amqp_publish(self, exchange, routing_key, body, properties): - """Publish the message to RabbitMQ + @classmethod + def install(cls, *args, **kwargs): + raise cls.error - :param str exchange: The exchange to publish to - :param str routing_key: The routing key to publish with - :param dict body: The message body - :param dict properties: The message properties + amqp.error = error - """ - if (('content_type' in properties and - properties['content_type']) == self.DATUM_MIME_TYPE): - body = yield self._avro_serialize(properties['type'], body) - yield self.application.amqp.publish(exchange, routing_key, body, - properties) - - @gen.coroutine - def _avro_fetch_schema(self, schema_name): - """Fetch the avro schema file from the remote HTTP endpoint - - :param str schema_name: The schema name - :rtype: str - - """ - http_client = httpclient.AsyncHTTPClient() - url = self._avro_schema_url(schema_name) - LOGGER.info('Loading schema for %s from %s', schema_name, url) - try: - response = yield http_client.fetch(url) - except httpclient.HTTPError as error: - LOGGER.error('Could not fetch Avro schema for %s (%s)', schema_name, - error) - raise ValueError('Error fetching avro schema') - raise gen.Return(response.body) - - @gen.coroutine - def _avro_schema(self, schema_name): - """Fetch the Avro schema file from cache or the filesystem. - - :param str schema_name: The avro schema name - :rtype: str - - """ - if schema_name not in self.application.avro: - schema = yield self._avro_fetch_schema(schema_name) - if PYTHON3: - schema = str(schema, 'utf-8') - self.application.avro[schema_name] = avro.schema.Parse(schema) - else: - self.application.avro[schema_name] = avro.schema.parse(schema) - raise gen.Return(self.application.avro[schema_name]) - - def _avro_schema_url(self, schema_name): - """Return the Avro schema URL for the specified schema name. - - :param str schema_name: The avro schema name - :rtype: str - - """ - if 'avro_schema_uri_format' in self.application.settings: - schema_format = self.application.settings['avro_schema_uri_format'] - else: - schema_format = self.DEFAULT_SCHEMA_URI_FORMAT - return schema_format % {'name': schema_name} - - @gen.coroutine - def _avro_serialize(self, schema_name, data): - """Serialize a data structure into an Avro datum - - :param str schema_name: The Avro schema name - :param dict data: The value to turn into an Avro datum - :rtype: str - - """ - schema = yield self._avro_schema(schema_name) - bytes_io = io.BytesIO() - encoder = avro.io.BinaryEncoder(bytes_io) - writer = avro.io.DatumWriter(schema) - try: - writer.write(data, encoder) - except avro.io.AvroTypeException as error: - raise ValueError(error) - raise gen.Return(bytes_io.getvalue()) +version_info = (2, 0, 0) +__version__ = '.'.join(str(v) for v in version_info) def install(application, **kwargs): - """Call this to install avro publishing for the Tornado application.""" + """Call this to install avro publishing for the Tornado application. + + :rtype: bool + + """ amqp.install(application, **kwargs) if 'avro_schema_uri_format' not in application.settings: LOGGER.warning('avro_schema_uri_format is not set, using default') - setattr(application, 'avro', {}) + setattr(application, 'avro_schemas', {}) + return True diff --git a/sprockets/mixins/avro_publisher/mixins.py b/sprockets/mixins/avro_publisher/mixins.py new file mode 100644 index 0000000..0c85ff5 --- /dev/null +++ b/sprockets/mixins/avro_publisher/mixins.py @@ -0,0 +1,182 @@ +import io +import json +import logging + +from sprockets.mixins import amqp +from tornado import gen, httpclient +import fastavro + +LOGGER = logging.getLogger(__name__) + + +class PublishingMixin(amqp.PublishingMixin): + """The request handler will connect to RabbitMQ on the first request, + blocking until the connection and channel are established. If RabbitMQ + closes its connection to the app at any point, a connection attempt will + be made on the next request. + + This class implements a pattern for the use of a single AMQP connection + to RabbitMQ. + """ + DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum' + DEFAULT_SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc' + DEFAULT_FETCH_RETRY_DELAY = 0.5 + + def initialize(self, *args, **kwargs): + self._schema_fetch_failed = False + + if not hasattr(self, '_http_client'): + self._http_client = httpclient.AsyncHTTPClient(force_instance=True) + + self._schema_uri_format = self.application.settings.get( + 'avro_schema_uri_format', + self.DEFAULT_SCHEMA_URI_FORMAT) + + self._fetch_retry_delay = self.application.settings.get( + 'avro_schema_fetch_retry_delay', + self.DEFAULT_FETCH_RETRY_DELAY) + + if hasattr(super(PublishingMixin, self), 'initialize'): + super(PublishingMixin, self).initialize(*args, **kwargs) + + @gen.coroutine + def avro_amqp_publish(self, exchange, routing_key, message_type, + data, properties=None, mandatory=False): + """Publish a message to RabbitMQ, serializing the payload data as an + Avro datum and creating the AMQP message properties. + + :param str exchange: The exchange to publish the message to. + :param str routing_key: The routing key to publish the message with. + :param str message_type: The message type for the Avro schema. + :param dict data: The message data to serialize. + :param dict properties: An optional dict of additional properties + to append. Will not override mandatory + properties: + content_type, type + :param bool mandatory: Whether to instruct the server to return an + unqueueable message + http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory + + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + """ + # Set mandatory Avro-related properties + properties = properties or {} + properties['content_type'] = self.DATUM_MIME_TYPE + properties['type'] = message_type + + yield self.amqp_publish(exchange, routing_key, data, properties, + mandatory) + + @gen.coroutine + def amqp_publish(self, exchange, routing_key, body, properties, + mandatory=False): + """Publish a message to RabbitMQ, serializing the payload data as an + Avro datum if the message is to be sent as such. + + :param str exchange: The exchange to publish the message to. + :param str routing_key: The routing key to publish the message with. + :param dict body: The message data to serialize. + :param dict properties: A dict of additional properties + to append. If publishing an Avro message, it + must contain the Avro message type at 'type' + and have a content type of + 'application/vnd.apache.avro.datum' + :param bool mandatory: Whether to instruct the server to return an + unqueueable message + http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory + + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + """ + if (('content_type' in properties and + properties['content_type']) == self.DATUM_MIME_TYPE): + avro_schema = yield self._schema(properties['type']) + body = self._serialize(avro_schema, body) + + yield super(PublishingMixin, self).amqp_publish( + exchange, + routing_key, + body, + properties, + mandatory + ) + + @gen.coroutine + def _schema(self, message_type): + """Fetch the Avro schema file from application cache or the remote URI. + If the request for the schema from the remote URI fails, a + :exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be raised. + + :param str message_type: The message type for the Avro schema. + + :rtype: str + + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + + """ + if message_type not in self.application.avro_schemas: + schema = yield self._fetch_schema(message_type) + self.application.avro_schemas[message_type] = schema + raise gen.Return(self.application.avro_schemas[message_type]) + + @gen.coroutine + def _fetch_schema(self, message_type): + """Fetch the Avro schema for the given message type from a remote + location, returning the schema JSON string. + + If fetching the schema results in an ``tornado.httpclient.HTTPError``, + it will retry once then raise a SchemaFetchError if the retry fails. + + :param str message_type: The message type for the Avro schema. + + :rtype: str + + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + + """ + url = self._schema_url(message_type) + LOGGER.debug('Loading schema for %s from %s', message_type, url) + try: + response = yield self._http_client.fetch(url) + except httpclient.HTTPError as error: + if self._schema_fetch_failed: + LOGGER.error('Could not fetch Avro schema for %s (%s)', + message_type, error) + raise SchemaFetchError(str(error)) + else: + self._schema_fetch_failed = True + yield gen.sleep(self._fetch_retry_delay) + yield self._fetch_schema(message_type) + + else: + self._schema_fetch_failed = False + raise gen.Return(json.loads(response.body.decode('utf-8'))) + + def _schema_url(self, message_type): + """Return the URL for the given message type for retrieving the Avro + schema from a remote location. + + :param str message_type: The message type for the Avro schema. + + :rtype: str + + """ + return self._schema_uri_format % {'name': message_type} + + @staticmethod + def _serialize(schema, data): + """Serialize a data structure into an Avro datum. + + :param dict schema: The parsed Avro schema. + :param dict data: The value to turn into an Avro datum. + + :rtype: bytes + + """ + stream = io.BytesIO() + fastavro.schemaless_writer(stream, schema, data) + return stream.getvalue() + + +class SchemaFetchError(ValueError): + """Raised when the Avro schema could not be fetched.""" + pass diff --git a/tests.py b/tests.py new file mode 100644 index 0000000..b591302 --- /dev/null +++ b/tests.py @@ -0,0 +1,423 @@ +import io +import json +import logging +import os +import uuid + +from pika import spec +import mock + +from tornado.concurrent import Future +from tornado.httpclient import HTTPError +from tornado import gen, locks, testing, web +import fastavro + +from sprockets.mixins import avro_publisher +from sprockets.mixins.avro_publisher.mixins import SchemaFetchError + +MESSAGE_TYPE = "example.avro.User" + +AVRO_SCHEMA = """ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] +} +""" + +# Set this URL to that of a running AMQP server before executing tests +if 'TEST_AMQP_URL' in os.environ: + AMQP_URL = os.environ['TEST_AMQP_URL'] +else: + AMQP_URL = 'amqp://guest:guest@localhost:5672/%2f' + +LOGGER = logging.getLogger(__name__) + + +class TestRequestHandler(avro_publisher.PublishingMixin): + + def __init__(self, application): + self.application = application + self.correlation_id = str(uuid.uuid4()) + self.initialize() + + def set_client(self, client): + self._http_client = client + + +@mock.patch('tornado.httpclient.AsyncHTTPClient') +class MockHTTPClient: + + def fetch(self, *args, **kwargs): + with mock.patch('tornado.httpclient.HTTPResponse') as response_class: + response = response_class.return_value + response.body = AVRO_SCHEMA.encode() + + future = Future() + future.set_result(response) + return future + + +@mock.patch('tornado.httpclient.AsyncHTTPClient') +class MockHTTPClientError: + + def __init__(self): + self.times_called = 0 + + def fetch(self, *args, **kwargs): + self.times_called += 1 + raise HTTPError(500) + + +class BaseTestCase(testing.AsyncTestCase): + + @gen.coroutine + def setUp(self): + super(BaseTestCase, self).setUp() + + # make sure that our logging statements get executed + avro_publisher.mixins.LOGGER.enabled = True + avro_publisher.mixins.LOGGER.setLevel(logging.DEBUG) + + self.exchange = str(uuid.uuid4()) + self.queue = str(uuid.uuid4()) + self.routing_key = MESSAGE_TYPE + self.correlation_id = str(uuid.uuid4()) + self.message = None + self.test_queue_bound = locks.Event() + self.get_response = locks.Event() + self.amqp_ready = locks.Event() + self.condition = locks.Condition() + self.config = { + "url": AMQP_URL, + "reconnect_delay": 1, + "timeout": 2, + "on_ready_callback": self.on_ready, + "on_unavailable_callback": self.on_unavailable, + "on_persistent_failure_callback": self.on_persistent_failure, + "on_message_returned_callback": self.on_message_returned, + "io_loop": self.io_loop, + } + self.app = web.Application() + self.app.settings = { + 'service': 'unit_tests', + 'version': '0.0', + } + + self.clear_event_tracking() + + self.handler = TestRequestHandler(self.app) + + self.create_http_client() + + avro_publisher.install(self.app, **self.config) + yield self.condition.wait(self.io_loop.time() + 5) + + LOGGER.info('Connected to RabbitMQ, declaring exchange %s', + self.exchange) + self.app.amqp.channel.exchange_declare(self.on_exchange_declare_ok, + self.exchange, + auto_delete=True) + + def create_http_client(self): + client = MockHTTPClient() + self.handler.set_client(client) + + def create_http_error_client(self): + client = MockHTTPClientError() + self.handler.set_client(client) + + def on_exchange_declare_ok(self, _method): + LOGGER.info( + 'Exchange %s declared, declaring queue %s', + self.exchange, + self.queue + ) + self.app.amqp.channel.queue_declare(self.on_queue_declare_ok, + queue=self.queue, + auto_delete=True) + + def on_queue_declare_ok(self, _method): + LOGGER.info('Queue %s declared', self.queue) + self.app.amqp.channel.queue_bind(self.on_bind_ok, self.queue, + self.exchange, self.routing_key) + + def on_bind_ok(self, _method): + LOGGER.info('Queue %s bound to %s', self.queue, self.exchange) + self.app.amqp.channel.add_callback(self.on_get_response, + [spec.Basic.GetEmpty], False) + self.test_queue_bound.set() + + def on_get_response(self, channel, method, properties=None, body=None): + LOGGER.info('get_response: %r', method) + self.message = { + 'method': method, + 'properties': properties, + 'body': body, + } + self.get_response.set() + + def on_ready(self, caller): + LOGGER.info('on_ready called') + self.ready_called = True + self.amqp_ready.set() + + def on_unavailable(self, caller): + LOGGER.info('on_unavailable called') + self.unavailable_called = True + self.amqp_ready.clear() + + def on_persistent_failure(self, caller, exchange, + routing_key, body, properties): + LOGGER.info('on_persistent_failure called') + self.persistent_failure_called = True + self.failed_message = { + 'exchange': exchange, + 'routing_key': routing_key, + 'body': body, + 'properties': properties, + } + self.amqp_ready.clear() + + def on_message_returned(self, caller, method, properties, body): + LOGGER.info('on_message_returned called') + self.message_returned_called = True + self.message_returned_error = method.reply_text + self.returned_message = { + 'exchange': method.exchange, + 'routing_key': method.routing_key, + 'body': body, + 'properties': properties, + } + + def clear_event_tracking(self): + self.ready_called = False + self.unavailable_called = False + self.persistent_failure_called = False + self.message_returned_called = False + self.failed_message = { + 'exchange': None, + 'routing_key': None, + 'body': None, + 'properties': None, + } + self.returned_message = { + 'exchange': None, + 'routing_key': None, + 'body': None, + 'properties': None, + } + + @gen.coroutine + def get_message(self): + self.message = None + self.get_response.clear() + self.app.amqp.channel.basic_get(self.on_get_response, self.queue) + + LOGGER.info('Waiting on get') + yield self.get_response.wait() + if isinstance(self.message['method'], spec.Basic.GetEmpty): + raise ValueError('Basic.GetEmpty') + raise gen.Return(self.message) + + +class SettingsTests(testing.AsyncTestCase): + + @testing.gen_test(timeout=10) + def should_warn_when_no_uri_schema_set_test(self): + app = web.Application() + + with mock.patch('logging.Logger.warning') as mock_logger: + avro_publisher.install(app, url=AMQP_URL) + + mock_logger.assert_called_with( + 'avro_schema_uri_format is not set, using default') + + @testing.gen_test(timeout=10) + def should_set_default_settings_test(self): + app = web.Application() + + avro_publisher.install(app, url=AMQP_URL) + + handler = TestRequestHandler(app) + + self.assertEqual( + handler._schema_uri_format, + avro_publisher.PublishingMixin.DEFAULT_SCHEMA_URI_FORMAT + ) + self.assertEqual( + handler._fetch_retry_delay, + avro_publisher.PublishingMixin.DEFAULT_FETCH_RETRY_DELAY + ) + + @testing.gen_test(timeout=10) + def should_override_default_with_configured_settings_test(self): + app = web.Application() + app.settings = { + 'avro_schema_uri_format': 'http://127.0.0.1/avro/%(name)s.avsc', + 'avro_schema_fetch_retry_delay': + avro_publisher.PublishingMixin.DEFAULT_FETCH_RETRY_DELAY + 1, + + } + + avro_publisher.install(app, url=AMQP_URL) + + handler = TestRequestHandler(app) + + self.assertEqual(handler._schema_uri_format, + app.settings['avro_schema_uri_format']) + self.assertEqual(handler._fetch_retry_delay, + app.settings['avro_schema_fetch_retry_delay']) + + +class AvroIntegrationTests(BaseTestCase): + + @testing.gen_test(timeout=10) + def should_publish_avro_message_test(self): + yield self.test_queue_bound.wait() + + LOGGER.info('Should be ready') + + message = { + "name": "testuser", + "favorite_number": 1, + "favorite_color": "green", + } + + yield self.handler.avro_amqp_publish( + self.exchange, + MESSAGE_TYPE, + self.routing_key, + message + ) + + stream = io.BytesIO() + fastavro.schemaless_writer(stream, json.loads(AVRO_SCHEMA), message) + serialized_message = stream.getvalue() + + LOGGER.info('Published') + + result = yield self.get_message() + + self.assertEqual(serialized_message, result['body']) + self.assertEqual(self.handler.app_id, result['properties'].app_id) + self.assertEqual(self.handler.correlation_id, + result['properties'].correlation_id) + self.assertEqual(avro_publisher.PublishingMixin.DATUM_MIME_TYPE, + result['properties'].content_type) + + @testing.gen_test(timeout=10) + def should_publish_other_format_amqp_message_test(self): + yield self.test_queue_bound.wait() + + LOGGER.info('Should be ready') + + message = bytes(bytearray(range(255, 0, -1))) + properties = {'content_type': 'application/octet-stream'} + + yield self.handler.amqp_publish( + self.exchange, + self.routing_key, + message, + properties + ) + + LOGGER.info('Published') + + result = yield self.get_message() + + self.assertEqual(message, result['body']) + self.assertEqual(self.handler.app_id, result['properties'].app_id) + self.assertEqual(self.handler.correlation_id, + result['properties'].correlation_id) + self.assertEqual(properties['content_type'], + result['properties'].content_type) + + @testing.gen_test(timeout=10) + def should_raise_schema_fetch_error_on_fetch_failure_test(self): + yield self.test_queue_bound.wait() + + LOGGER.info('Should be ready') + + self.create_http_error_client() + + with self.assertRaises(SchemaFetchError): + message = { + "name": "testuser", + "favorite_number": 1, + "favorite_color": "green", + } + + yield self.handler.avro_amqp_publish( + self.exchange, + MESSAGE_TYPE, + self.routing_key, + message + ) + + @testing.gen_test(timeout=10) + def should_retry_once_on_fetch_failure_test(self): + yield self.test_queue_bound.wait() + + LOGGER.info('Should be ready') + + self.create_http_error_client() + + with self.assertRaises(SchemaFetchError): + message = { + "name": "testuser", + "favorite_number": 1, + "favorite_color": "green", + } + + yield self.handler.avro_amqp_publish( + self.exchange, + MESSAGE_TYPE, + self.routing_key, + message + ) + + self.assertEqual(2, self.handler._http_client.times_called) + + @testing.gen_test(timeout=10) + def should_serialize_avro_message_when_amqp_publish_called_test(self): + yield self.test_queue_bound.wait() + + LOGGER.info('Should be ready') + + message = { + "name": "testuser", + "favorite_number": 1, + "favorite_color": "green", + } + + properties = { + 'content_type': avro_publisher.PublishingMixin.DATUM_MIME_TYPE, + 'type': MESSAGE_TYPE + } + + yield self.handler.amqp_publish( + self.exchange, + self.routing_key, + message, + properties + ) + + stream = io.BytesIO() + fastavro.schemaless_writer(stream, json.loads(AVRO_SCHEMA), message) + serialized_message = stream.getvalue() + + LOGGER.info('Published') + + result = yield self.get_message() + + self.assertEqual(serialized_message, result['body']) + self.assertEqual(self.handler.app_id, result['properties'].app_id) + self.assertEqual(self.handler.correlation_id, + result['properties'].correlation_id) + self.assertEqual(avro_publisher.PublishingMixin.DATUM_MIME_TYPE, + result['properties'].content_type)