Refactor to reflect changes to sprockets.mixins.amqp

This commit is contained in:
Gavin M. Roy 2017-05-03 22:44:37 -04:00
parent 6730f3a244
commit bd479da7cb
10 changed files with 326 additions and 625 deletions

3
.gitignore vendored
View file

@ -54,3 +54,6 @@ coverage.xml
# Sphinx documentation
docs/_build/
.idea
sprockets/mixins/amqp
sprockets/mixins/http

View file

@ -1,23 +1,25 @@
language: python
python:
- 2.7
- pypy
- 3.4
- 3.5.1
- 2.7
- pypy
- 3.4
- 3.5
- 3.6
services:
- rabbitmq
- rabbitmq
install:
- pip install -r requires/testing.txt -e .
- pip install -r requires/testing.txt -e .
script:
- nosetests
- nosetests
after_success:
- codecov
- codecov
deploy:
provider: pypi
user: sprockets
password:
secure: TPpdlEbbxfjcFXR7KizRNiwqZDdB8C8/y0zE/nbFvlds1ZfdqRwCZJlvgsVwhUfiMrUfKiZ3sg1kSI6wN0Li8Z4DVe8RPyhnOXjml6DDao4iZ569LYEfqMdfWrp6NcN/+sMOpGlq5XuQMcdsy3P9uP8WGOUzRwjuQ0ny+2BN8yxD3TxY+TqgYo3FaCYwR0bp5u8l1pmX9gIbD8DhcbbC7EyO+/t8IZj4x5TxIQamIvhWyd8LIFpvR1FcCKRPbqu2x2fPZG4t6YwBHbcmLf8VnZx5xFGvOKEP9HaN4YkWtSIHQ/RhCuFslSPg4peHK3xgurDKMsXdvxnsV2AgSQBEQEaWt6ewACbM4nyW09K++LKK0F19U6keSzYgLZZK+Twsn02xhNpQf58k5kuAB0pJNm+EFxymUcJjR5g9gnVE2ln/Y3MkU1YhXRJGvo0hwdcytkDaPitIASuPC9buy/UQ8smzYPfA60PAF9Dl0/gq8lIWteq3u6PJQT+qtSobWwhR/nFYqTWDMk1sZu4sHVe1IPhSnJonlWccPe/AcS6qG8QNUt5n4fC1l5rerfsiplo+aSLH8gb6p5eiueBAXGwH/akZa6nb9hs1gFFZicHlCzMXlvA845qiLBHbj1ABNJri8jnRvtNxNcYdqBu73lkhExIHlsIG8sgRw93bN1ys73A=
on:
python: 3.4
python: 3.5
distributions: sdist bdist_wheel
tags: true
all_branches: true

View file

@ -1,6 +1,12 @@
Version History
===============
`2.1.0`_ May 3, 2017
--------------------
- Consolidate code
- Streamline and use sprockets.mixins.http as well
- Replace tests with integration tests modeled after sprockets.mixins.amqp
`2.0.0`_ Apr 26, 2017
---------------------
- Move Mixin to separate file
@ -20,7 +26,8 @@ Version History
----------------------
- Initial implementation
.. _Next Release: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.0.0...HEAD
.. _Next Release: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.1.0...HEAD
.. _2.1.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.0.0...2.1.0
.. _2.0.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/1.0.1...2.0.0
.. _1.0.1: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/1.0.0...1.0.1
.. _1.0.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/7324bea...1.0.0

View file

@ -1,3 +1,4 @@
sprockets.mixins.amqp>=2.0.0,<3
sprockets.mixins.amqp>=2.1.1,<3
sprockets.mixins.http>=1.0.3,<2
fastavro>=0.10.1,<1.0.0
tornado>=4.2.0,<5.0.0

View file

@ -1,5 +1,6 @@
coverage>=3.7,<4
coverage>=4,<5
nose>=1.3.1,<2.0.0
mock>=2.0.0,<3
wheel
flake8
pylint
-r installation.txt

View file

@ -2,6 +2,16 @@
universal = 1
[nosetests]
with-coverage = 1
cover-branches = 1
cover-erase = 1
cover-package = sprockets.mixins.avro_publisher
with-coverage = 1
verbosity = 2
cover-html = 1
cover-html-dir = build/coverage
[upload_docs]
upload-dir = build/sphinx/html
[flake8]
exclude = env,build

View file

@ -3,8 +3,6 @@ import os.path
import setuptools
from sprockets.mixins.avro_publisher import __version__
def read_requirements(name):
requirements = []
@ -25,7 +23,7 @@ def read_requirements(name):
setuptools.setup(
name='sprockets.mixins.avro-publisher',
version=__version__,
version='2.1.0',
description='Mixin for publishing events to RabbitMQ as avro datums',
long_description=open('README.rst').read(),
url='https://github.com/sprockets/sprockets.mixins.avro-publisher',

View file

@ -8,51 +8,152 @@ should be similar to the following:
``http://my-schema-repository/avro/%(name)s.avsc``
Take note also of the required configurations in sprockets.mixins.amqp
"""
import io
import logging
from sprockets.mixins import amqp, http
from tornado import gen
import fastavro
__version__ = '2.1.0'
LOGGER = logging.getLogger(__name__)
try:
from sprockets.mixins import amqp
DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'
SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc'
from sprockets.mixins.avro_publisher.mixins import (PublishingMixin,
SchemaFetchError)
except ImportError as error:
class PublishingMixin(object):
def __init__(self, *args, **kwargs):
raise error
class SchemaFetchError(Exception):
def __init__(self, *args, **kwargs):
raise error
class amqp(object):
error = None
@classmethod
def install(cls, *args, **kwargs):
raise cls.error
amqp.error = error
version_info = (2, 0, 0)
__version__ = '.'.join(str(v) for v in version_info)
_SCHEMAS = {}
def install(application, **kwargs):
"""Call this to install avro publishing for the Tornado application.
def install(application, io_loop=None, **kwargs): # pragma: nocover
"""Call this to install Avro publishing for the Tornado application.
:rtype: bool
"""
amqp.install(application, **kwargs)
amqp.install(application, io_loop=io_loop, **kwargs)
if 'avro_schema_uri_format' not in application.settings:
LOGGER.warning('avro_schema_uri_format is not set, using default')
setattr(application, 'avro_schemas', {})
return True
class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin):
"""Publish to Avro encoded datums to RabbitMQ"""
def avro_amqp_publish(self, exchange, routing_key, message_type,
data, properties=None):
"""Publish a message to RabbitMQ, serializing the payload data as an
Avro datum and creating the AMQP message properties.
:param str exchange: The exchange to publish the message to.
:param str routing_key: The routing key to publish the message with.
:param str message_type: The message type for the Avro schema.
:param dict data: The message data to serialize.
:param dict properties: An optional dict of additional properties
to append.
:raises: sprockets.mixins.avro_publisher.SchemaFetchError
"""
properties = properties or {}
properties['content_type'] = DATUM_MIME_TYPE
properties['type'] = message_type
return self.amqp_publish(exchange, routing_key, data, properties)
@gen.coroutine
def amqp_publish(self, exchange, routing_key, body, properties=None):
"""Publish a message to RabbitMQ, serializing the payload data as an
Avro datum if the message is to be sent as such.
:param str exchange: The exchange to publish the message to.
:param str routing_key: The routing key to publish the message with.
:param dict body: The message data to serialize.
:param dict properties: An optional dict of additional properties
to append. If publishing an Avro message, it
must contain the Avro message type at 'type'
and have a content type of
'application/vnd.apache.avro.datum'
:raises: sprockets.mixins.avro_publisher.SchemaFetchError
"""
LOGGER.debug('Publishing Here: %r %r %r %r',
exchange, routing_key, body, properties)
properties = properties or {}
if properties.get('content_type') == DATUM_MIME_TYPE and \
isinstance(body, dict):
avro_schema = yield self._schema(properties['type'])
LOGGER.debug('Schema: %r', avro_schema)
body = self._serialize(avro_schema, body)
yield super(PublishingMixin, self).amqp_publish(
exchange, routing_key, body, properties)
@gen.coroutine
def _schema(self, message_type):
"""Fetch the Avro schema file from application cache or the remote
URI. If the request for the schema from the remote URI fails, a
:exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be
raised.
:param str message_type: The message type for the Avro schema.
:rtype: str
:raises: sprockets.mixins.avro_publisher.SchemaFetchError
"""
global _SCHEMAS
if message_type not in _SCHEMAS:
schema = yield self._fetch_schema(message_type)
_SCHEMAS[message_type] = schema
raise gen.Return(_SCHEMAS[message_type])
@gen.coroutine
def _fetch_schema(self, message_type):
"""Fetch the Avro schema for the given message type from a remote
location, returning the schema JSON string.
If the schema can not be retrieved, a
:exc:`~sprockets.mixins.avro_publisher.SchemaFetchError` will be
raised.
:param str message_type: The message type for the Avro schema.
:rtype: str
:raises: sprockets.mixins.avro_publisher.SchemaFetchError
"""
response = yield self.http_fetch(self._schema_url(message_type))
if response.ok:
raise gen.Return(response.body)
raise SchemaFetchError()
def _schema_url(self, message_type):
"""Return the URL for the given message type for retrieving the Avro
schema from a remote location.
:param str message_type: The message type for the Avro schema.
:rtype: str
"""
return self.application.settings.get(
'avro_schema_uri_format',
SCHEMA_URI_FORMAT) % {'name': message_type}
@staticmethod
def _serialize(schema, data):
"""Serialize a data structure into an Avro datum.
:param dict schema: The parsed Avro schema.
:param dict data: The value to turn into an Avro datum.
:rtype: bytes
"""
stream = io.BytesIO()
fastavro.schemaless_writer(stream, schema, data)
return stream.getvalue()
class SchemaFetchError(Exception):
"""Raised when the Avro schema could not be fetched."""
pass

View file

@ -1,182 +0,0 @@
import io
import json
import logging
from sprockets.mixins import amqp
from tornado import gen, httpclient
import fastavro
LOGGER = logging.getLogger(__name__)
class PublishingMixin(amqp.PublishingMixin):
"""The request handler will connect to RabbitMQ on the first request,
blocking until the connection and channel are established. If RabbitMQ
closes its connection to the app at any point, a connection attempt will
be made on the next request.
This class implements a pattern for the use of a single AMQP connection
to RabbitMQ.
"""
DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'
DEFAULT_SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc'
DEFAULT_FETCH_RETRY_DELAY = 0.5
def initialize(self, *args, **kwargs):
self._schema_fetch_failed = False
if not hasattr(self, '_http_client'):
self._http_client = httpclient.AsyncHTTPClient(force_instance=True)
self._schema_uri_format = self.application.settings.get(
'avro_schema_uri_format',
self.DEFAULT_SCHEMA_URI_FORMAT)
self._fetch_retry_delay = self.application.settings.get(
'avro_schema_fetch_retry_delay',
self.DEFAULT_FETCH_RETRY_DELAY)
if hasattr(super(PublishingMixin, self), 'initialize'):
super(PublishingMixin, self).initialize(*args, **kwargs)
@gen.coroutine
def avro_amqp_publish(self, exchange, routing_key, message_type,
data, properties=None, mandatory=False):
"""Publish a message to RabbitMQ, serializing the payload data as an
Avro datum and creating the AMQP message properties.
:param str exchange: The exchange to publish the message to.
:param str routing_key: The routing key to publish the message with.
:param str message_type: The message type for the Avro schema.
:param dict data: The message data to serialize.
:param dict properties: An optional dict of additional properties
to append. Will not override mandatory
properties:
content_type, type
:param bool mandatory: Whether to instruct the server to return an
unqueueable message
http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory
:raises: sprockets.mixins.avro_publisher.SchemaFetchError
"""
# Set mandatory Avro-related properties
properties = properties or {}
properties['content_type'] = self.DATUM_MIME_TYPE
properties['type'] = message_type
yield self.amqp_publish(exchange, routing_key, data, properties,
mandatory)
@gen.coroutine
def amqp_publish(self, exchange, routing_key, body, properties,
mandatory=False):
"""Publish a message to RabbitMQ, serializing the payload data as an
Avro datum if the message is to be sent as such.
:param str exchange: The exchange to publish the message to.
:param str routing_key: The routing key to publish the message with.
:param dict body: The message data to serialize.
:param dict properties: A dict of additional properties
to append. If publishing an Avro message, it
must contain the Avro message type at 'type'
and have a content type of
'application/vnd.apache.avro.datum'
:param bool mandatory: Whether to instruct the server to return an
unqueueable message
http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory
:raises: sprockets.mixins.avro_publisher.SchemaFetchError
"""
if (('content_type' in properties and
properties['content_type']) == self.DATUM_MIME_TYPE):
avro_schema = yield self._schema(properties['type'])
body = self._serialize(avro_schema, body)
yield super(PublishingMixin, self).amqp_publish(
exchange,
routing_key,
body,
properties,
mandatory
)
@gen.coroutine
def _schema(self, message_type):
"""Fetch the Avro schema file from application cache or the remote URI.
If the request for the schema from the remote URI fails, a
:exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be raised.
:param str message_type: The message type for the Avro schema.
:rtype: str
:raises: sprockets.mixins.avro_publisher.SchemaFetchError
"""
if message_type not in self.application.avro_schemas:
schema = yield self._fetch_schema(message_type)
self.application.avro_schemas[message_type] = schema
raise gen.Return(self.application.avro_schemas[message_type])
@gen.coroutine
def _fetch_schema(self, message_type):
"""Fetch the Avro schema for the given message type from a remote
location, returning the schema JSON string.
If fetching the schema results in an ``tornado.httpclient.HTTPError``,
it will retry once then raise a SchemaFetchError if the retry fails.
:param str message_type: The message type for the Avro schema.
:rtype: str
:raises: sprockets.mixins.avro_publisher.SchemaFetchError
"""
url = self._schema_url(message_type)
LOGGER.debug('Loading schema for %s from %s', message_type, url)
try:
response = yield self._http_client.fetch(url)
except httpclient.HTTPError as error:
if self._schema_fetch_failed:
LOGGER.error('Could not fetch Avro schema for %s (%s)',
message_type, error)
raise SchemaFetchError(str(error))
else:
self._schema_fetch_failed = True
yield gen.sleep(self._fetch_retry_delay)
yield self._fetch_schema(message_type)
else:
self._schema_fetch_failed = False
raise gen.Return(json.loads(response.body.decode('utf-8')))
def _schema_url(self, message_type):
"""Return the URL for the given message type for retrieving the Avro
schema from a remote location.
:param str message_type: The message type for the Avro schema.
:rtype: str
"""
return self._schema_uri_format % {'name': message_type}
@staticmethod
def _serialize(schema, data):
"""Serialize a data structure into an Avro datum.
:param dict schema: The parsed Avro schema.
:param dict data: The value to turn into an Avro datum.
:rtype: bytes
"""
stream = io.BytesIO()
fastavro.schemaless_writer(stream, schema, data)
return stream.getvalue()
class SchemaFetchError(ValueError):
"""Raised when the Avro schema could not be fetched."""
pass

544
tests.py
View file

@ -1,423 +1,183 @@
import io
import json
import io
import logging
import os
import random
import uuid
from pika import spec
import mock
from tornado.concurrent import Future
from tornado.httpclient import HTTPError
from tornado import gen, locks, testing, web
from tornado import concurrent, gen, locks, testing, web
import fastavro
from pika import spec
from sprockets.mixins import avro_publisher
from sprockets.mixins.avro_publisher.mixins import SchemaFetchError
MESSAGE_TYPE = "example.avro.User"
AVRO_SCHEMA = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
"""
# Set this URL to that of a running AMQP server before executing tests
if 'TEST_AMQP_URL' in os.environ:
AMQP_URL = os.environ['TEST_AMQP_URL']
else:
AMQP_URL = 'amqp://guest:guest@localhost:5672/%2f'
from sprockets.mixins import amqp, avro_publisher
LOGGER = logging.getLogger(__name__)
MESSAGE_TYPE = "example.avro.Test"
class TestRequestHandler(avro_publisher.PublishingMixin):
def __init__(self, application):
self.application = application
self.correlation_id = str(uuid.uuid4())
self.initialize()
def set_client(self, client):
self._http_client = client
AVRO_SCHEMA = {
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}]}
@mock.patch('tornado.httpclient.AsyncHTTPClient')
class MockHTTPClient:
def fetch(self, *args, **kwargs):
with mock.patch('tornado.httpclient.HTTPResponse') as response_class:
response = response_class.return_value
response.body = AVRO_SCHEMA.encode()
future = Future()
future.set_result(response)
return future
def deserialize(value):
return fastavro.schemaless_reader(io.BytesIO(value), AVRO_SCHEMA)
@mock.patch('tornado.httpclient.AsyncHTTPClient')
class MockHTTPClientError:
class Test1RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler):
def __init__(self):
self.times_called = 0
def fetch(self, *args, **kwargs):
self.times_called += 1
raise HTTPError(500)
class BaseTestCase(testing.AsyncTestCase):
def initialize(self):
self.correlation_id = self.request.headers.get('Correlation-Id')
self.publish = self.amqp_publish
@gen.coroutine
def get(self, *args, **kwargs):
LOGGER.debug('Handling Request %r', self.correlation_id)
parameters = self.parameters()
try:
yield self.publish(**parameters)
except amqp.AMQPException as error:
self.write({'error': str(error),
'type': error.__class__.__name__,
'parameters': parameters})
else:
self.write(parameters) # Correlation-ID is added pass by reference
self.finish()
def parameters(self):
return {
'exchange': self.get_argument('exchange', str(uuid.uuid4())),
'routing_key': self.get_argument('routing_key', str(uuid.uuid4())),
'body': {
'name': str(uuid.uuid4()),
'favorite_number': random.randint(1, 1000),
'favorite_color': str(uuid.uuid4())
},
'properties': {
'content_type': avro_publisher.DATUM_MIME_TYPE,
'message_id': str(uuid.uuid4()),
'type': MESSAGE_TYPE}}
class Test2RequestHandler(Test1RequestHandler):
def initialize(self):
self.correlation_id = self.request.headers.get('Correlation-Id')
self.publish = self.avro_amqp_publish
def parameters(self):
return {
'exchange': self.get_argument('exchange', str(uuid.uuid4())),
'routing_key': self.get_argument('routing_key', str(uuid.uuid4())),
'message_type': MESSAGE_TYPE,
'data': {
'name': str(uuid.uuid4()),
'favorite_number': random.randint(1, 1000),
'favorite_color': str(uuid.uuid4())
},
'properties': {'message_id': str(uuid.uuid4())}}
class SchemaRequestHandler(web.RequestHandler):
def get(self, *args, **kwargs):
LOGGER.debug('Returning Schema for %r %r', args, kwargs)
self.finish(AVRO_SCHEMA)
def setUpModule():
logging.getLogger('pika').setLevel(logging.INFO)
class AsyncHTTPTestCase(testing.AsyncHTTPTestCase):
CONFIRMATIONS = True
def setUp(self):
super(BaseTestCase, self).setUp()
# make sure that our logging statements get executed
avro_publisher.mixins.LOGGER.enabled = True
avro_publisher.mixins.LOGGER.setLevel(logging.DEBUG)
self.exchange = str(uuid.uuid4())
self.queue = str(uuid.uuid4())
self.routing_key = MESSAGE_TYPE
super(AsyncHTTPTestCase, self).setUp()
self.correlation_id = str(uuid.uuid4())
self.message = None
self.test_queue_bound = locks.Event()
self.get_response = locks.Event()
self.amqp_ready = locks.Event()
self.condition = locks.Condition()
self.config = {
"url": AMQP_URL,
"reconnect_delay": 1,
"timeout": 2,
"on_ready_callback": self.on_ready,
"on_unavailable_callback": self.on_unavailable,
"on_persistent_failure_callback": self.on_persistent_failure,
"on_message_returned_callback": self.on_message_returned,
"io_loop": self.io_loop,
}
self.app = web.Application()
self.app.settings = {
'service': 'unit_tests',
'version': '0.0',
}
self.exchange = str(uuid.uuid4())
self.get_delivered_message = concurrent.Future()
self.get_returned_message = concurrent.Future()
self.queue = str(uuid.uuid4())
self.routing_key = str(uuid.uuid4())
self.ready = locks.Event()
avro_publisher.install(self._app, self.io_loop, **{
'on_ready_callback': self.on_amqp_ready,
'enable_confirmations': self.CONFIRMATIONS,
'on_return_callback': self.on_message_returned,
'url': 'amqp://guest:guest@127.0.0.1:5672/%2f'})
self.io_loop.start()
self.clear_event_tracking()
def get_app(self):
return web.Application(
[(r'/test1', Test1RequestHandler),
(r'/test2', Test2RequestHandler),
(r'/schema/(.*).avsc', SchemaRequestHandler)],
**{'avro_schema_uri_format': self.get_url('/schema/%(name)s.avsc'),
'service': 'test',
'version': avro_publisher.__version__})
self.handler = TestRequestHandler(self.app)
def on_amqp_ready(self, _client):
LOGGER.debug('AMQP ready')
self._app.amqp.channel.exchange_declare(
self.on_exchange_declared, self.exchange,
durable=False, auto_delete=True)
self.create_http_client()
def on_exchange_declared(self, method):
LOGGER.debug('Exchange declared: %r', method)
self._app.amqp.channel.queue_declare(
self.on_queue_declared, self.queue,
arguments={'x-expires': 30000},
auto_delete=True, durable=False)
avro_publisher.install(self.app, **self.config)
yield self.condition.wait(self.io_loop.time() + 5)
def on_queue_declared(self, method):
LOGGER.debug('Queue declared: %r', method)
self._app.amqp.channel.queue_bind(
self.on_queue_bound, self.queue, self.exchange, self.routing_key)
LOGGER.info('Connected to RabbitMQ, declaring exchange %s',
self.exchange)
self.app.amqp.channel.exchange_declare(self.on_exchange_declare_ok,
self.exchange,
auto_delete=True)
def on_queue_bound(self, method):
LOGGER.debug('Queue bound: %r', method)
self._app.amqp.channel.basic_consume(
self.on_message_delivered, self.queue)
self.io_loop.stop()
def create_http_client(self):
client = MockHTTPClient()
self.handler.set_client(client)
def on_message_delivered(self, _channel, method, properties, body):
self.get_delivered_message.set_result((method, properties, body))
def create_http_error_client(self):
client = MockHTTPClientError()
self.handler.set_client(client)
def on_exchange_declare_ok(self, _method):
LOGGER.info(
'Exchange %s declared, declaring queue %s',
self.exchange,
self.queue
)
self.app.amqp.channel.queue_declare(self.on_queue_declare_ok,
queue=self.queue,
auto_delete=True)
def on_queue_declare_ok(self, _method):
LOGGER.info('Queue %s declared', self.queue)
self.app.amqp.channel.queue_bind(self.on_bind_ok, self.queue,
self.exchange, self.routing_key)
def on_bind_ok(self, _method):
LOGGER.info('Queue %s bound to %s', self.queue, self.exchange)
self.app.amqp.channel.add_callback(self.on_get_response,
[spec.Basic.GetEmpty], False)
self.test_queue_bound.set()
def on_get_response(self, channel, method, properties=None, body=None):
LOGGER.info('get_response: %r', method)
self.message = {
'method': method,
'properties': properties,
'body': body,
}
self.get_response.set()
def on_ready(self, caller):
LOGGER.info('on_ready called')
self.ready_called = True
self.amqp_ready.set()
def on_unavailable(self, caller):
LOGGER.info('on_unavailable called')
self.unavailable_called = True
self.amqp_ready.clear()
def on_persistent_failure(self, caller, exchange,
routing_key, body, properties):
LOGGER.info('on_persistent_failure called')
self.persistent_failure_called = True
self.failed_message = {
'exchange': exchange,
'routing_key': routing_key,
'body': body,
'properties': properties,
}
self.amqp_ready.clear()
def on_message_returned(self, caller, method, properties, body):
LOGGER.info('on_message_returned called')
self.message_returned_called = True
self.message_returned_error = method.reply_text
self.returned_message = {
'exchange': method.exchange,
'routing_key': method.routing_key,
'body': body,
'properties': properties,
}
def clear_event_tracking(self):
self.ready_called = False
self.unavailable_called = False
self.persistent_failure_called = False
self.message_returned_called = False
self.failed_message = {
'exchange': None,
'routing_key': None,
'body': None,
'properties': None,
}
self.returned_message = {
'exchange': None,
'routing_key': None,
'body': None,
'properties': None,
}
@gen.coroutine
def get_message(self):
self.message = None
self.get_response.clear()
self.app.amqp.channel.basic_get(self.on_get_response, self.queue)
LOGGER.info('Waiting on get')
yield self.get_response.wait()
if isinstance(self.message['method'], spec.Basic.GetEmpty):
raise ValueError('Basic.GetEmpty')
raise gen.Return(self.message)
def on_message_returned(self, method, properties, body):
self.get_returned_message.set_result((method, properties, body))
class SettingsTests(testing.AsyncTestCase):
@testing.gen_test(timeout=10)
def should_warn_when_no_uri_schema_set_test(self):
app = web.Application()
with mock.patch('logging.Logger.warning') as mock_logger:
avro_publisher.install(app, url=AMQP_URL)
mock_logger.assert_called_with(
'avro_schema_uri_format is not set, using default')
@testing.gen_test(timeout=10)
def should_set_default_settings_test(self):
app = web.Application()
avro_publisher.install(app, url=AMQP_URL)
handler = TestRequestHandler(app)
class PublisherConfirmationTestCase(AsyncHTTPTestCase):
@testing.gen_test
def test_amqp_publish(self):
response = yield self.http_client.fetch(
self.get_url('/test1?exchange={}&routing_key={}'.format(
self.exchange, self.routing_key)),
headers={'Correlation-Id': self.correlation_id})
published = json.loads(response.body.decode('utf-8'))
delivered = yield self.get_delivered_message
self.assertIsInstance(delivered[0], spec.Basic.Deliver)
self.assertEqual(delivered[1].correlation_id, self.correlation_id)
self.assertEqual(
handler._schema_uri_format,
avro_publisher.PublishingMixin.DEFAULT_SCHEMA_URI_FORMAT
)
delivered[1].content_type, avro_publisher.DATUM_MIME_TYPE)
self.assertEqual(delivered[1].type, MESSAGE_TYPE)
self.assertEqual(deserialize(delivered[2]), published['body'])
@testing.gen_test
def test_avro_amqp_publish(self):
response = yield self.http_client.fetch(
self.get_url('/test2?exchange={}&routing_key={}'.format(
self.exchange, self.routing_key)),
headers={'Correlation-Id': self.correlation_id})
published = json.loads(response.body.decode('utf-8'))
delivered = yield self.get_delivered_message
self.assertIsInstance(delivered[0], spec.Basic.Deliver)
self.assertEqual(delivered[1].correlation_id, self.correlation_id)
self.assertEqual(
handler._fetch_retry_delay,
avro_publisher.PublishingMixin.DEFAULT_FETCH_RETRY_DELAY
)
@testing.gen_test(timeout=10)
def should_override_default_with_configured_settings_test(self):
app = web.Application()
app.settings = {
'avro_schema_uri_format': 'http://127.0.0.1/avro/%(name)s.avsc',
'avro_schema_fetch_retry_delay':
avro_publisher.PublishingMixin.DEFAULT_FETCH_RETRY_DELAY + 1,
}
avro_publisher.install(app, url=AMQP_URL)
handler = TestRequestHandler(app)
self.assertEqual(handler._schema_uri_format,
app.settings['avro_schema_uri_format'])
self.assertEqual(handler._fetch_retry_delay,
app.settings['avro_schema_fetch_retry_delay'])
class AvroIntegrationTests(BaseTestCase):
@testing.gen_test(timeout=10)
def should_publish_avro_message_test(self):
yield self.test_queue_bound.wait()
LOGGER.info('Should be ready')
message = {
"name": "testuser",
"favorite_number": 1,
"favorite_color": "green",
}
yield self.handler.avro_amqp_publish(
self.exchange,
MESSAGE_TYPE,
self.routing_key,
message
)
stream = io.BytesIO()
fastavro.schemaless_writer(stream, json.loads(AVRO_SCHEMA), message)
serialized_message = stream.getvalue()
LOGGER.info('Published')
result = yield self.get_message()
self.assertEqual(serialized_message, result['body'])
self.assertEqual(self.handler.app_id, result['properties'].app_id)
self.assertEqual(self.handler.correlation_id,
result['properties'].correlation_id)
self.assertEqual(avro_publisher.PublishingMixin.DATUM_MIME_TYPE,
result['properties'].content_type)
@testing.gen_test(timeout=10)
def should_publish_other_format_amqp_message_test(self):
yield self.test_queue_bound.wait()
LOGGER.info('Should be ready')
message = bytes(bytearray(range(255, 0, -1)))
properties = {'content_type': 'application/octet-stream'}
yield self.handler.amqp_publish(
self.exchange,
self.routing_key,
message,
properties
)
LOGGER.info('Published')
result = yield self.get_message()
self.assertEqual(message, result['body'])
self.assertEqual(self.handler.app_id, result['properties'].app_id)
self.assertEqual(self.handler.correlation_id,
result['properties'].correlation_id)
self.assertEqual(properties['content_type'],
result['properties'].content_type)
@testing.gen_test(timeout=10)
def should_raise_schema_fetch_error_on_fetch_failure_test(self):
yield self.test_queue_bound.wait()
LOGGER.info('Should be ready')
self.create_http_error_client()
with self.assertRaises(SchemaFetchError):
message = {
"name": "testuser",
"favorite_number": 1,
"favorite_color": "green",
}
yield self.handler.avro_amqp_publish(
self.exchange,
MESSAGE_TYPE,
self.routing_key,
message
)
@testing.gen_test(timeout=10)
def should_retry_once_on_fetch_failure_test(self):
yield self.test_queue_bound.wait()
LOGGER.info('Should be ready')
self.create_http_error_client()
with self.assertRaises(SchemaFetchError):
message = {
"name": "testuser",
"favorite_number": 1,
"favorite_color": "green",
}
yield self.handler.avro_amqp_publish(
self.exchange,
MESSAGE_TYPE,
self.routing_key,
message
)
self.assertEqual(2, self.handler._http_client.times_called)
@testing.gen_test(timeout=10)
def should_serialize_avro_message_when_amqp_publish_called_test(self):
yield self.test_queue_bound.wait()
LOGGER.info('Should be ready')
message = {
"name": "testuser",
"favorite_number": 1,
"favorite_color": "green",
}
properties = {
'content_type': avro_publisher.PublishingMixin.DATUM_MIME_TYPE,
'type': MESSAGE_TYPE
}
yield self.handler.amqp_publish(
self.exchange,
self.routing_key,
message,
properties
)
stream = io.BytesIO()
fastavro.schemaless_writer(stream, json.loads(AVRO_SCHEMA), message)
serialized_message = stream.getvalue()
LOGGER.info('Published')
result = yield self.get_message()
self.assertEqual(serialized_message, result['body'])
self.assertEqual(self.handler.app_id, result['properties'].app_id)
self.assertEqual(self.handler.correlation_id,
result['properties'].correlation_id)
self.assertEqual(avro_publisher.PublishingMixin.DATUM_MIME_TYPE,
result['properties'].content_type)
delivered[1].content_type, avro_publisher.DATUM_MIME_TYPE)
self.assertEqual(delivered[1].type, MESSAGE_TYPE)
self.assertEqual(deserialize(delivered[2]), published['data'])