From aec6974bc7a0b291efce4cc91211aa52531ef820 Mon Sep 17 00:00:00 2001 From: Robin Klingsberg Date: Thu, 13 Apr 2017 17:13:22 -0400 Subject: [PATCH] Replace Avro publisher internals with latest internal version Move mixin to its own file Add helper method for Avro publishing Update setup.py and requires files to current standard Replace avro with fastavro Update MANIFEST.in with new requires files Add setup.cfg Add docs dir with index and history files Add unit tests Add unit tests to Travis CI config Add Python 2.7, pypy, and 3.5.1 to Travis CI config h/t to @gmr for the new internals --- .travis.yml | 16 +- MANIFEST.in | 4 +- README.rst | 16 +- docs/history.rst | 26 ++ docs/index.rst | 10 + requires/installation.txt | 3 + requires/python2.txt | 2 - requires/python3.txt | 2 - requires/testing.txt | 5 + setup.cfg | 7 + setup.py | 31 +- sprockets/mixins/avro_publisher/__init__.py | 148 ++----- sprockets/mixins/avro_publisher/mixins.py | 182 +++++++++ tests.py | 423 ++++++++++++++++++++ 14 files changed, 733 insertions(+), 142 deletions(-) create mode 100644 docs/history.rst create mode 100644 docs/index.rst create mode 100644 requires/installation.txt delete mode 100644 requires/python2.txt delete mode 100644 requires/python3.txt create mode 100644 requires/testing.txt create mode 100644 setup.cfg create mode 100644 sprockets/mixins/avro_publisher/mixins.py create mode 100644 tests.py diff --git a/.travis.yml b/.travis.yml index 63db33b..6d6a031 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,15 +1,23 @@ language: python python: -- '3.4' +- 2.7 +- pypy +- 3.4 +- 3.5.1 +services: +- rabbitmq install: -- pip install -e . -script: '' +- pip install -r requires/testing.txt -e . +script: +- nosetests +after_success: +- codecov deploy: provider: pypi user: sprockets password: secure: TPpdlEbbxfjcFXR7KizRNiwqZDdB8C8/y0zE/nbFvlds1ZfdqRwCZJlvgsVwhUfiMrUfKiZ3sg1kSI6wN0Li8Z4DVe8RPyhnOXjml6DDao4iZ569LYEfqMdfWrp6NcN/+sMOpGlq5XuQMcdsy3P9uP8WGOUzRwjuQ0ny+2BN8yxD3TxY+TqgYo3FaCYwR0bp5u8l1pmX9gIbD8DhcbbC7EyO+/t8IZj4x5TxIQamIvhWyd8LIFpvR1FcCKRPbqu2x2fPZG4t6YwBHbcmLf8VnZx5xFGvOKEP9HaN4YkWtSIHQ/RhCuFslSPg4peHK3xgurDKMsXdvxnsV2AgSQBEQEaWt6ewACbM4nyW09K++LKK0F19U6keSzYgLZZK+Twsn02xhNpQf58k5kuAB0pJNm+EFxymUcJjR5g9gnVE2ln/Y3MkU1YhXRJGvo0hwdcytkDaPitIASuPC9buy/UQ8smzYPfA60PAF9Dl0/gq8lIWteq3u6PJQT+qtSobWwhR/nFYqTWDMk1sZu4sHVe1IPhSnJonlWccPe/AcS6qG8QNUt5n4fC1l5rerfsiplo+aSLH8gb6p5eiueBAXGwH/akZa6nb9hs1gFFZicHlCzMXlvA845qiLBHbj1ABNJri8jnRvtNxNcYdqBu73lkhExIHlsIG8sgRw93bN1ys73A= on: - tags: true python: 3.4 + tags: true all_branches: true diff --git a/MANIFEST.in b/MANIFEST.in index 43f5a43..e16af52 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ -include requires/python2.txt -include requires/python3.txt include LICENSE include README.rst +graft docs +graft requires diff --git a/README.rst b/README.rst index d54d845..eb5c69a 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,6 @@ sprockets.mixins.avro-publisher =============================== -AMQP Publishing Mixin for publishing messages as Avro datum +AMQP Publishing Mixin for publishing a message as an Avro datum. |Version| |Downloads| @@ -16,7 +16,9 @@ and can be installed via ``pip`` or ``easy_install``: Requirements ------------ -- sprockets.mixins.amqp>=1.0.0 +- sprockets.mixins.amqp>=2.0.0 +- fastavro>=0.10.1,<1.0.0 +- tornado>=4.2.0,<5.0.0 Example ------- @@ -44,14 +46,16 @@ This examples demonstrates the most basic usage of ``sprockets.mixins.avro-publi avro_publisher.install(application) return application - class RequestHandler(avro_publisher.AvroPublishingMixin, web.RequestHandler): + class RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler): @gen.coroutine def get(self, *args, **kwargs): body = {'request': self.request.path, 'args': args, 'kwargs': kwargs} - yield self.amqp_publish('exchange', 'routing.key', body, - {'content_type': avro_publisher.DATUM_MIME_TYPE, - 'type': 'avro-schema-name'}) + yield self.avro_amqp_publish( + 'exchange', + 'routing.key', + 'avro-schema-name' + body) if __name__ == "__main__": application = make_app() diff --git a/docs/history.rst b/docs/history.rst new file mode 100644 index 0000000..a505ee4 --- /dev/null +++ b/docs/history.rst @@ -0,0 +1,26 @@ +Version History +=============== + +`2.0.0`_ Apr 26, 2017 +--------------------- +- Move Mixin to separate file +- Replace code with latest internal version +- Rename AvroPublishingMixin to PublishingMixin +- Update setup.py and requires files to current standard +- Replace avro library with fastavro library +- Add avro_amqp_publish helper method +- Add retry when schema cannot be fetched + - Delay before retrying is configurable via application.settings: + - avro_schema_fetch_retry_delay (default 0.5 seconds) +- Separate HTTP client from common app-based pool to help avoid excessive locking on high load +- Add unit tests + - Test execution requires a running AMQP server, see tests.py + +`0.1.0`_ Sept 24, 2015 +---------------------- + - Initial implementation + +.. _Next Release: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.0.0...HEAD +.. _2.0.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/1.0.1...2.0.0 +.. _1.0.1: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/1.0.0...1.0.1 +.. _1.0.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/7324bea...1.0.0 diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..ecda180 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,10 @@ +.. include:: ../README.rst + +Issues +------ +Please report any issues to the Github project at `https://github.com/sprockets/sprockets.mixins.avro-publisher/issues `_ + +.. toctree:: + :hidden: + + history diff --git a/requires/installation.txt b/requires/installation.txt new file mode 100644 index 0000000..f7164f0 --- /dev/null +++ b/requires/installation.txt @@ -0,0 +1,3 @@ +sprockets.mixins.amqp>=2.0.0,<3 +fastavro>=0.10.1,<1.0.0 +tornado>=4.2.0,<5.0.0 diff --git a/requires/python2.txt b/requires/python2.txt deleted file mode 100644 index 8953202..0000000 --- a/requires/python2.txt +++ /dev/null @@ -1,2 +0,0 @@ -sprockets.mixins.amqp>=1.0.0,<2 -avro>=1.7.7,<2 diff --git a/requires/python3.txt b/requires/python3.txt deleted file mode 100644 index 8804715..0000000 --- a/requires/python3.txt +++ /dev/null @@ -1,2 +0,0 @@ -sprockets.mixins.amqp>=1.0.0,<2 -avro-python3>=1.7.7,<2 diff --git a/requires/testing.txt b/requires/testing.txt new file mode 100644 index 0000000..b81929a --- /dev/null +++ b/requires/testing.txt @@ -0,0 +1,5 @@ +coverage>=3.7,<4 +nose>=1.3.1,<2.0.0 +mock>=2.0.0,<3 +wheel +-r installation.txt diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..e08b28e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,7 @@ +[bdist_wheel] +universal = 1 + +[nosetests] +with-coverage = 1 +cover-erase = 1 +cover-package = sprockets.mixins.avro_publisher diff --git a/setup.py b/setup.py index 31a6453..6e76681 100644 --- a/setup.py +++ b/setup.py @@ -1,14 +1,31 @@ -import setuptools -import sys +#!/usr/bin/env python +import os.path -requires = 'requires/python{0}.txt'.format(sys.version_info[0]) -with open(requires) as handle: - requirements = [line.strip() for line in handle.readlines()] +import setuptools + +from sprockets.mixins.avro_publisher import __version__ + + +def read_requirements(name): + requirements = [] + try: + with open(os.path.join('requires', name)) as req_file: + for line in req_file: + if '#' in line: + line = line[:line.index('#')] + line = line.strip() + if line.startswith('-r'): + requirements.extend(read_requirements(line[2:].strip())) + elif line and not line.startswith('-'): + requirements.append(line) + except IOError: + pass + return requirements setuptools.setup( name='sprockets.mixins.avro-publisher', - version='1.0.1', + version=__version__, description='Mixin for publishing events to RabbitMQ as avro datums', long_description=open('README.rst').read(), url='https://github.com/sprockets/sprockets.mixins.avro-publisher', @@ -32,5 +49,5 @@ setuptools.setup( ], packages=setuptools.find_packages(), namespace_packages=['sprockets', 'sprockets.mixins'], - install_requires=requirements, + install_requires=read_requirements('installation.txt'), zip_safe=True) diff --git a/sprockets/mixins/avro_publisher/__init__.py b/sprockets/mixins/avro_publisher/__init__.py index 003bbd4..3189c18 100644 --- a/sprockets/mixins/avro_publisher/__init__.py +++ b/sprockets/mixins/avro_publisher/__init__.py @@ -1,15 +1,5 @@ -"""The AvroPublishingMixin wraps RabbitMQ use into a request handler, with -methods to speed the development of publishing RabbitMQ messages serialized -as Avro datums. - -RabbitMQ is configured using two environment variables: ``AMQP_URL`` and -``AMQP_TIMEOUT``. - -``AMQP_URL`` is the AMQP url to connect to, defaults to -``amqp://guest:guest@localhost:5672/%2f``. - -``AMQP_TIMEOUT`` is the number of seconds to wait until timing out when -connecting to RabbitMQ. +"""The AvroPublishingMixin adds Apache Avro serialization to the +RabbitMQ publishing capabilities in sprockets.mixins.amqp To configure the URL format for the avro schema, add a Tornado application setting called ``avro_schema_uri_format``. The format @@ -17,132 +7,52 @@ should be similar to the following: ``http://my-schema-repository/avro/%(name)s.avsc`` +Take note also of the required configurations in sprockets.mixins.amqp """ -import io import logging -import sys - -from sprockets.mixins import amqp -from tornado import gen -from tornado import httpclient -import avro.io -import avro.schema - -version_info = (1, 0, 1) -__version__ = '.'.join(str(v) for v in version_info) LOGGER = logging.getLogger(__name__) -PYTHON3 = True if sys.version_info > (3, 0, 0) else False +try: + from sprockets.mixins import amqp + from sprockets.mixins.avro_publisher.mixins import (PublishingMixin, + SchemaFetchError) -class AvroPublishingMixin(amqp.PublishingMixin): - """The request handler will connect to RabbitMQ on the first request, - blocking until the connection and channel are established. If RabbitMQ - closes it's connection to the app at any point, a connection attempt will - be made on the next request. +except ImportError as error: + class PublishingMixin(object): + def __init__(self, *args, **kwargs): + raise error - This class implements a pattern for the use of a single AMQP connection - to RabbitMQ. + class SchemaFetchError(Exception): + def __init__(self, *args, **kwargs): + raise error - Expects the :envvar:`AMQP_URL` environment variable to construct - :class:`pika.connection.URLParameters`. + class amqp(object): - """ - DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum' - DEFAULT_SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc' + error = None - @gen.coroutine - def amqp_publish(self, exchange, routing_key, body, properties): - """Publish the message to RabbitMQ + @classmethod + def install(cls, *args, **kwargs): + raise cls.error - :param str exchange: The exchange to publish to - :param str routing_key: The routing key to publish with - :param dict body: The message body - :param dict properties: The message properties + amqp.error = error - """ - if (('content_type' in properties and - properties['content_type']) == self.DATUM_MIME_TYPE): - body = yield self._avro_serialize(properties['type'], body) - yield self.application.amqp.publish(exchange, routing_key, body, - properties) - - @gen.coroutine - def _avro_fetch_schema(self, schema_name): - """Fetch the avro schema file from the remote HTTP endpoint - - :param str schema_name: The schema name - :rtype: str - - """ - http_client = httpclient.AsyncHTTPClient() - url = self._avro_schema_url(schema_name) - LOGGER.info('Loading schema for %s from %s', schema_name, url) - try: - response = yield http_client.fetch(url) - except httpclient.HTTPError as error: - LOGGER.error('Could not fetch Avro schema for %s (%s)', schema_name, - error) - raise ValueError('Error fetching avro schema') - raise gen.Return(response.body) - - @gen.coroutine - def _avro_schema(self, schema_name): - """Fetch the Avro schema file from cache or the filesystem. - - :param str schema_name: The avro schema name - :rtype: str - - """ - if schema_name not in self.application.avro: - schema = yield self._avro_fetch_schema(schema_name) - if PYTHON3: - schema = str(schema, 'utf-8') - self.application.avro[schema_name] = avro.schema.Parse(schema) - else: - self.application.avro[schema_name] = avro.schema.parse(schema) - raise gen.Return(self.application.avro[schema_name]) - - def _avro_schema_url(self, schema_name): - """Return the Avro schema URL for the specified schema name. - - :param str schema_name: The avro schema name - :rtype: str - - """ - if 'avro_schema_uri_format' in self.application.settings: - schema_format = self.application.settings['avro_schema_uri_format'] - else: - schema_format = self.DEFAULT_SCHEMA_URI_FORMAT - return schema_format % {'name': schema_name} - - @gen.coroutine - def _avro_serialize(self, schema_name, data): - """Serialize a data structure into an Avro datum - - :param str schema_name: The Avro schema name - :param dict data: The value to turn into an Avro datum - :rtype: str - - """ - schema = yield self._avro_schema(schema_name) - bytes_io = io.BytesIO() - encoder = avro.io.BinaryEncoder(bytes_io) - writer = avro.io.DatumWriter(schema) - try: - writer.write(data, encoder) - except avro.io.AvroTypeException as error: - raise ValueError(error) - raise gen.Return(bytes_io.getvalue()) +version_info = (2, 0, 0) +__version__ = '.'.join(str(v) for v in version_info) def install(application, **kwargs): - """Call this to install avro publishing for the Tornado application.""" + """Call this to install avro publishing for the Tornado application. + + :rtype: bool + + """ amqp.install(application, **kwargs) if 'avro_schema_uri_format' not in application.settings: LOGGER.warning('avro_schema_uri_format is not set, using default') - setattr(application, 'avro', {}) + setattr(application, 'avro_schemas', {}) + return True diff --git a/sprockets/mixins/avro_publisher/mixins.py b/sprockets/mixins/avro_publisher/mixins.py new file mode 100644 index 0000000..0c85ff5 --- /dev/null +++ b/sprockets/mixins/avro_publisher/mixins.py @@ -0,0 +1,182 @@ +import io +import json +import logging + +from sprockets.mixins import amqp +from tornado import gen, httpclient +import fastavro + +LOGGER = logging.getLogger(__name__) + + +class PublishingMixin(amqp.PublishingMixin): + """The request handler will connect to RabbitMQ on the first request, + blocking until the connection and channel are established. If RabbitMQ + closes its connection to the app at any point, a connection attempt will + be made on the next request. + + This class implements a pattern for the use of a single AMQP connection + to RabbitMQ. + """ + DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum' + DEFAULT_SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc' + DEFAULT_FETCH_RETRY_DELAY = 0.5 + + def initialize(self, *args, **kwargs): + self._schema_fetch_failed = False + + if not hasattr(self, '_http_client'): + self._http_client = httpclient.AsyncHTTPClient(force_instance=True) + + self._schema_uri_format = self.application.settings.get( + 'avro_schema_uri_format', + self.DEFAULT_SCHEMA_URI_FORMAT) + + self._fetch_retry_delay = self.application.settings.get( + 'avro_schema_fetch_retry_delay', + self.DEFAULT_FETCH_RETRY_DELAY) + + if hasattr(super(PublishingMixin, self), 'initialize'): + super(PublishingMixin, self).initialize(*args, **kwargs) + + @gen.coroutine + def avro_amqp_publish(self, exchange, routing_key, message_type, + data, properties=None, mandatory=False): + """Publish a message to RabbitMQ, serializing the payload data as an + Avro datum and creating the AMQP message properties. + + :param str exchange: The exchange to publish the message to. + :param str routing_key: The routing key to publish the message with. + :param str message_type: The message type for the Avro schema. + :param dict data: The message data to serialize. + :param dict properties: An optional dict of additional properties + to append. Will not override mandatory + properties: + content_type, type + :param bool mandatory: Whether to instruct the server to return an + unqueueable message + http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory + + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + """ + # Set mandatory Avro-related properties + properties = properties or {} + properties['content_type'] = self.DATUM_MIME_TYPE + properties['type'] = message_type + + yield self.amqp_publish(exchange, routing_key, data, properties, + mandatory) + + @gen.coroutine + def amqp_publish(self, exchange, routing_key, body, properties, + mandatory=False): + """Publish a message to RabbitMQ, serializing the payload data as an + Avro datum if the message is to be sent as such. + + :param str exchange: The exchange to publish the message to. + :param str routing_key: The routing key to publish the message with. + :param dict body: The message data to serialize. + :param dict properties: A dict of additional properties + to append. If publishing an Avro message, it + must contain the Avro message type at 'type' + and have a content type of + 'application/vnd.apache.avro.datum' + :param bool mandatory: Whether to instruct the server to return an + unqueueable message + http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory + + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + """ + if (('content_type' in properties and + properties['content_type']) == self.DATUM_MIME_TYPE): + avro_schema = yield self._schema(properties['type']) + body = self._serialize(avro_schema, body) + + yield super(PublishingMixin, self).amqp_publish( + exchange, + routing_key, + body, + properties, + mandatory + ) + + @gen.coroutine + def _schema(self, message_type): + """Fetch the Avro schema file from application cache or the remote URI. + If the request for the schema from the remote URI fails, a + :exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be raised. + + :param str message_type: The message type for the Avro schema. + + :rtype: str + + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + + """ + if message_type not in self.application.avro_schemas: + schema = yield self._fetch_schema(message_type) + self.application.avro_schemas[message_type] = schema + raise gen.Return(self.application.avro_schemas[message_type]) + + @gen.coroutine + def _fetch_schema(self, message_type): + """Fetch the Avro schema for the given message type from a remote + location, returning the schema JSON string. + + If fetching the schema results in an ``tornado.httpclient.HTTPError``, + it will retry once then raise a SchemaFetchError if the retry fails. + + :param str message_type: The message type for the Avro schema. + + :rtype: str + + :raises: sprockets.mixins.avro_publisher.SchemaFetchError + + """ + url = self._schema_url(message_type) + LOGGER.debug('Loading schema for %s from %s', message_type, url) + try: + response = yield self._http_client.fetch(url) + except httpclient.HTTPError as error: + if self._schema_fetch_failed: + LOGGER.error('Could not fetch Avro schema for %s (%s)', + message_type, error) + raise SchemaFetchError(str(error)) + else: + self._schema_fetch_failed = True + yield gen.sleep(self._fetch_retry_delay) + yield self._fetch_schema(message_type) + + else: + self._schema_fetch_failed = False + raise gen.Return(json.loads(response.body.decode('utf-8'))) + + def _schema_url(self, message_type): + """Return the URL for the given message type for retrieving the Avro + schema from a remote location. + + :param str message_type: The message type for the Avro schema. + + :rtype: str + + """ + return self._schema_uri_format % {'name': message_type} + + @staticmethod + def _serialize(schema, data): + """Serialize a data structure into an Avro datum. + + :param dict schema: The parsed Avro schema. + :param dict data: The value to turn into an Avro datum. + + :rtype: bytes + + """ + stream = io.BytesIO() + fastavro.schemaless_writer(stream, schema, data) + return stream.getvalue() + + +class SchemaFetchError(ValueError): + """Raised when the Avro schema could not be fetched.""" + pass diff --git a/tests.py b/tests.py new file mode 100644 index 0000000..b591302 --- /dev/null +++ b/tests.py @@ -0,0 +1,423 @@ +import io +import json +import logging +import os +import uuid + +from pika import spec +import mock + +from tornado.concurrent import Future +from tornado.httpclient import HTTPError +from tornado import gen, locks, testing, web +import fastavro + +from sprockets.mixins import avro_publisher +from sprockets.mixins.avro_publisher.mixins import SchemaFetchError + +MESSAGE_TYPE = "example.avro.User" + +AVRO_SCHEMA = """ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] +} +""" + +# Set this URL to that of a running AMQP server before executing tests +if 'TEST_AMQP_URL' in os.environ: + AMQP_URL = os.environ['TEST_AMQP_URL'] +else: + AMQP_URL = 'amqp://guest:guest@localhost:5672/%2f' + +LOGGER = logging.getLogger(__name__) + + +class TestRequestHandler(avro_publisher.PublishingMixin): + + def __init__(self, application): + self.application = application + self.correlation_id = str(uuid.uuid4()) + self.initialize() + + def set_client(self, client): + self._http_client = client + + +@mock.patch('tornado.httpclient.AsyncHTTPClient') +class MockHTTPClient: + + def fetch(self, *args, **kwargs): + with mock.patch('tornado.httpclient.HTTPResponse') as response_class: + response = response_class.return_value + response.body = AVRO_SCHEMA.encode() + + future = Future() + future.set_result(response) + return future + + +@mock.patch('tornado.httpclient.AsyncHTTPClient') +class MockHTTPClientError: + + def __init__(self): + self.times_called = 0 + + def fetch(self, *args, **kwargs): + self.times_called += 1 + raise HTTPError(500) + + +class BaseTestCase(testing.AsyncTestCase): + + @gen.coroutine + def setUp(self): + super(BaseTestCase, self).setUp() + + # make sure that our logging statements get executed + avro_publisher.mixins.LOGGER.enabled = True + avro_publisher.mixins.LOGGER.setLevel(logging.DEBUG) + + self.exchange = str(uuid.uuid4()) + self.queue = str(uuid.uuid4()) + self.routing_key = MESSAGE_TYPE + self.correlation_id = str(uuid.uuid4()) + self.message = None + self.test_queue_bound = locks.Event() + self.get_response = locks.Event() + self.amqp_ready = locks.Event() + self.condition = locks.Condition() + self.config = { + "url": AMQP_URL, + "reconnect_delay": 1, + "timeout": 2, + "on_ready_callback": self.on_ready, + "on_unavailable_callback": self.on_unavailable, + "on_persistent_failure_callback": self.on_persistent_failure, + "on_message_returned_callback": self.on_message_returned, + "io_loop": self.io_loop, + } + self.app = web.Application() + self.app.settings = { + 'service': 'unit_tests', + 'version': '0.0', + } + + self.clear_event_tracking() + + self.handler = TestRequestHandler(self.app) + + self.create_http_client() + + avro_publisher.install(self.app, **self.config) + yield self.condition.wait(self.io_loop.time() + 5) + + LOGGER.info('Connected to RabbitMQ, declaring exchange %s', + self.exchange) + self.app.amqp.channel.exchange_declare(self.on_exchange_declare_ok, + self.exchange, + auto_delete=True) + + def create_http_client(self): + client = MockHTTPClient() + self.handler.set_client(client) + + def create_http_error_client(self): + client = MockHTTPClientError() + self.handler.set_client(client) + + def on_exchange_declare_ok(self, _method): + LOGGER.info( + 'Exchange %s declared, declaring queue %s', + self.exchange, + self.queue + ) + self.app.amqp.channel.queue_declare(self.on_queue_declare_ok, + queue=self.queue, + auto_delete=True) + + def on_queue_declare_ok(self, _method): + LOGGER.info('Queue %s declared', self.queue) + self.app.amqp.channel.queue_bind(self.on_bind_ok, self.queue, + self.exchange, self.routing_key) + + def on_bind_ok(self, _method): + LOGGER.info('Queue %s bound to %s', self.queue, self.exchange) + self.app.amqp.channel.add_callback(self.on_get_response, + [spec.Basic.GetEmpty], False) + self.test_queue_bound.set() + + def on_get_response(self, channel, method, properties=None, body=None): + LOGGER.info('get_response: %r', method) + self.message = { + 'method': method, + 'properties': properties, + 'body': body, + } + self.get_response.set() + + def on_ready(self, caller): + LOGGER.info('on_ready called') + self.ready_called = True + self.amqp_ready.set() + + def on_unavailable(self, caller): + LOGGER.info('on_unavailable called') + self.unavailable_called = True + self.amqp_ready.clear() + + def on_persistent_failure(self, caller, exchange, + routing_key, body, properties): + LOGGER.info('on_persistent_failure called') + self.persistent_failure_called = True + self.failed_message = { + 'exchange': exchange, + 'routing_key': routing_key, + 'body': body, + 'properties': properties, + } + self.amqp_ready.clear() + + def on_message_returned(self, caller, method, properties, body): + LOGGER.info('on_message_returned called') + self.message_returned_called = True + self.message_returned_error = method.reply_text + self.returned_message = { + 'exchange': method.exchange, + 'routing_key': method.routing_key, + 'body': body, + 'properties': properties, + } + + def clear_event_tracking(self): + self.ready_called = False + self.unavailable_called = False + self.persistent_failure_called = False + self.message_returned_called = False + self.failed_message = { + 'exchange': None, + 'routing_key': None, + 'body': None, + 'properties': None, + } + self.returned_message = { + 'exchange': None, + 'routing_key': None, + 'body': None, + 'properties': None, + } + + @gen.coroutine + def get_message(self): + self.message = None + self.get_response.clear() + self.app.amqp.channel.basic_get(self.on_get_response, self.queue) + + LOGGER.info('Waiting on get') + yield self.get_response.wait() + if isinstance(self.message['method'], spec.Basic.GetEmpty): + raise ValueError('Basic.GetEmpty') + raise gen.Return(self.message) + + +class SettingsTests(testing.AsyncTestCase): + + @testing.gen_test(timeout=10) + def should_warn_when_no_uri_schema_set_test(self): + app = web.Application() + + with mock.patch('logging.Logger.warning') as mock_logger: + avro_publisher.install(app, url=AMQP_URL) + + mock_logger.assert_called_with( + 'avro_schema_uri_format is not set, using default') + + @testing.gen_test(timeout=10) + def should_set_default_settings_test(self): + app = web.Application() + + avro_publisher.install(app, url=AMQP_URL) + + handler = TestRequestHandler(app) + + self.assertEqual( + handler._schema_uri_format, + avro_publisher.PublishingMixin.DEFAULT_SCHEMA_URI_FORMAT + ) + self.assertEqual( + handler._fetch_retry_delay, + avro_publisher.PublishingMixin.DEFAULT_FETCH_RETRY_DELAY + ) + + @testing.gen_test(timeout=10) + def should_override_default_with_configured_settings_test(self): + app = web.Application() + app.settings = { + 'avro_schema_uri_format': 'http://127.0.0.1/avro/%(name)s.avsc', + 'avro_schema_fetch_retry_delay': + avro_publisher.PublishingMixin.DEFAULT_FETCH_RETRY_DELAY + 1, + + } + + avro_publisher.install(app, url=AMQP_URL) + + handler = TestRequestHandler(app) + + self.assertEqual(handler._schema_uri_format, + app.settings['avro_schema_uri_format']) + self.assertEqual(handler._fetch_retry_delay, + app.settings['avro_schema_fetch_retry_delay']) + + +class AvroIntegrationTests(BaseTestCase): + + @testing.gen_test(timeout=10) + def should_publish_avro_message_test(self): + yield self.test_queue_bound.wait() + + LOGGER.info('Should be ready') + + message = { + "name": "testuser", + "favorite_number": 1, + "favorite_color": "green", + } + + yield self.handler.avro_amqp_publish( + self.exchange, + MESSAGE_TYPE, + self.routing_key, + message + ) + + stream = io.BytesIO() + fastavro.schemaless_writer(stream, json.loads(AVRO_SCHEMA), message) + serialized_message = stream.getvalue() + + LOGGER.info('Published') + + result = yield self.get_message() + + self.assertEqual(serialized_message, result['body']) + self.assertEqual(self.handler.app_id, result['properties'].app_id) + self.assertEqual(self.handler.correlation_id, + result['properties'].correlation_id) + self.assertEqual(avro_publisher.PublishingMixin.DATUM_MIME_TYPE, + result['properties'].content_type) + + @testing.gen_test(timeout=10) + def should_publish_other_format_amqp_message_test(self): + yield self.test_queue_bound.wait() + + LOGGER.info('Should be ready') + + message = bytes(bytearray(range(255, 0, -1))) + properties = {'content_type': 'application/octet-stream'} + + yield self.handler.amqp_publish( + self.exchange, + self.routing_key, + message, + properties + ) + + LOGGER.info('Published') + + result = yield self.get_message() + + self.assertEqual(message, result['body']) + self.assertEqual(self.handler.app_id, result['properties'].app_id) + self.assertEqual(self.handler.correlation_id, + result['properties'].correlation_id) + self.assertEqual(properties['content_type'], + result['properties'].content_type) + + @testing.gen_test(timeout=10) + def should_raise_schema_fetch_error_on_fetch_failure_test(self): + yield self.test_queue_bound.wait() + + LOGGER.info('Should be ready') + + self.create_http_error_client() + + with self.assertRaises(SchemaFetchError): + message = { + "name": "testuser", + "favorite_number": 1, + "favorite_color": "green", + } + + yield self.handler.avro_amqp_publish( + self.exchange, + MESSAGE_TYPE, + self.routing_key, + message + ) + + @testing.gen_test(timeout=10) + def should_retry_once_on_fetch_failure_test(self): + yield self.test_queue_bound.wait() + + LOGGER.info('Should be ready') + + self.create_http_error_client() + + with self.assertRaises(SchemaFetchError): + message = { + "name": "testuser", + "favorite_number": 1, + "favorite_color": "green", + } + + yield self.handler.avro_amqp_publish( + self.exchange, + MESSAGE_TYPE, + self.routing_key, + message + ) + + self.assertEqual(2, self.handler._http_client.times_called) + + @testing.gen_test(timeout=10) + def should_serialize_avro_message_when_amqp_publish_called_test(self): + yield self.test_queue_bound.wait() + + LOGGER.info('Should be ready') + + message = { + "name": "testuser", + "favorite_number": 1, + "favorite_color": "green", + } + + properties = { + 'content_type': avro_publisher.PublishingMixin.DATUM_MIME_TYPE, + 'type': MESSAGE_TYPE + } + + yield self.handler.amqp_publish( + self.exchange, + self.routing_key, + message, + properties + ) + + stream = io.BytesIO() + fastavro.schemaless_writer(stream, json.loads(AVRO_SCHEMA), message) + serialized_message = stream.getvalue() + + LOGGER.info('Published') + + result = yield self.get_message() + + self.assertEqual(serialized_message, result['body']) + self.assertEqual(self.handler.app_id, result['properties'].app_id) + self.assertEqual(self.handler.correlation_id, + result['properties'].correlation_id) + self.assertEqual(avro_publisher.PublishingMixin.DATUM_MIME_TYPE, + result['properties'].content_type)