Mirror of https://github.com/sprockets/sprockets.mixins.avro-publisher.git, synced 2024-12-28 19:29:22 +00:00
Merge pull request #6 from sprockets/amqp-mixin-updates

Refactor to reflect changes to sprockets.mixins.amqp

Commit bf3821ac50: 10 changed files with 326 additions and 626 deletions
.gitignore (3 changed lines)

@@ -54,3 +54,6 @@ coverage.xml
 
 # Sphinx documentation
 docs/_build/
+.idea
+sprockets/mixins/amqp
+sprockets/mixins/http
.travis.yml (16 changed lines)

@@ -1,17 +1,16 @@
 language: python
 python:
 - 2.7
 - pypy
 - 3.4
-- 3.5.1
 services:
 - rabbitmq
 install:
 - pip install -r requires/testing.txt -e .
 script:
 - nosetests
 after_success:
 - codecov
 deploy:
 provider: pypi
 user: sprockets
@@ -19,5 +18,6 @@ deploy:
 secure: TPpdlEbbxfjcFXR7KizRNiwqZDdB8C8/y0zE/nbFvlds1ZfdqRwCZJlvgsVwhUfiMrUfKiZ3sg1kSI6wN0Li8Z4DVe8RPyhnOXjml6DDao4iZ569LYEfqMdfWrp6NcN/+sMOpGlq5XuQMcdsy3P9uP8WGOUzRwjuQ0ny+2BN8yxD3TxY+TqgYo3FaCYwR0bp5u8l1pmX9gIbD8DhcbbC7EyO+/t8IZj4x5TxIQamIvhWyd8LIFpvR1FcCKRPbqu2x2fPZG4t6YwBHbcmLf8VnZx5xFGvOKEP9HaN4YkWtSIHQ/RhCuFslSPg4peHK3xgurDKMsXdvxnsV2AgSQBEQEaWt6ewACbM4nyW09K++LKK0F19U6keSzYgLZZK+Twsn02xhNpQf58k5kuAB0pJNm+EFxymUcJjR5g9gnVE2ln/Y3MkU1YhXRJGvo0hwdcytkDaPitIASuPC9buy/UQ8smzYPfA60PAF9Dl0/gq8lIWteq3u6PJQT+qtSobWwhR/nFYqTWDMk1sZu4sHVe1IPhSnJonlWccPe/AcS6qG8QNUt5n4fC1l5rerfsiplo+aSLH8gb6p5eiueBAXGwH/akZa6nb9hs1gFFZicHlCzMXlvA845qiLBHbj1ABNJri8jnRvtNxNcYdqBu73lkhExIHlsIG8sgRw93bN1ys73A=
 on:
 python: 3.4
+distributions: sdist bdist_wheel
 tags: true
 all_branches: true
@@ -1,6 +1,12 @@
 Version History
 ===============
 
+`2.1.0`_ May 3, 2017
+--------------------
+- Consolidate code
+- Streamline and use sprockets.mixins.http as well
+- Replace tests with integration tests modeled after sprockets.mixins.amqp
+
 `2.0.0`_ Apr 26, 2017
 ---------------------
 - Move Mixin to separate file
@@ -20,7 +26,8 @@ Version History
 ----------------------
 - Initial implementation
 
-.. _Next Release: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.0.0...HEAD
+.. _Next Release: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.1.0...HEAD
+.. _2.1.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/2.0.0...2.1.0
 .. _2.0.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/1.0.1...2.0.0
 .. _1.0.1: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/1.0.0...1.0.1
 .. _1.0.0: https://github.com/sprockets/sprockets.mixins.avro-publisher/compare/7324bea...1.0.0
@@ -1,3 +1,4 @@
-sprockets.mixins.amqp>=2.0.0,<3
+sprockets.mixins.amqp>=2.1.1,<3
+sprockets.mixins.http>=1.0.3,<2
 fastavro>=0.10.1,<1.0.0
 tornado>=4.2.0,<5.0.0
@@ -1,5 +1,7 @@
-coverage>=3.7,<4
-nose>=1.3.1,<2.0.0
-mock>=2.0.0,<3
-wheel
+codecov
+coverage
+nose
+mock
+flake8
+pylint
 -r installation.txt
setup.cfg (10 changed lines)

@@ -2,6 +2,14 @@
 universal = 1
 
 [nosetests]
-with-coverage = 1
+cover-branches = 1
 cover-erase = 1
 cover-package = sprockets.mixins.avro_publisher
+with-coverage = 1
+verbosity = 2
+
+[upload_docs]
+upload-dir = build/sphinx/html
+
+[flake8]
+exclude = env,build
setup.py (4 changed lines)

@@ -3,8 +3,6 @@ import os.path
 
 import setuptools
 
-from sprockets.mixins.avro_publisher import __version__
-
 
 def read_requirements(name):
     requirements = []
@@ -25,7 +23,7 @@ def read_requirements(name):
 
 setuptools.setup(
     name='sprockets.mixins.avro-publisher',
-    version=__version__,
+    version='2.1.0',
     description='Mixin for publishing events to RabbitMQ as avro datums',
     long_description=open('README.rst').read(),
     url='https://github.com/sprockets/sprockets.mixins.avro-publisher',
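For context, a hypothetical sketch of the read_requirements() helper that setup.py calls above; its real body is not part of this diff, and the parsing rules below (skipping comments and following "-r" includes such as testing.txt's "-r installation.txt") are assumptions rather than the repository's actual implementation.

    import os.path


    def read_requirements(name):
        # Read requires/<name> into a list for setuptools, following -r includes.
        requirements = []
        with open(os.path.join('requires', name)) as req_file:
            for line in req_file:
                line = line.strip()
                if line.startswith('-r '):
                    # Recurse into included files, e.g. testing.txt -> installation.txt
                    requirements.extend(read_requirements(line[3:].strip()))
                elif line and not line.startswith('#'):
                    requirements.append(line)
        return requirements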
@@ -8,51 +8,153 @@ should be similar to the following:
 ``http://my-schema-repository/avro/%(name)s.avsc``
 
 Take note also of the required configurations in sprockets.mixins.amqp
 
 """
+import io
+import json
 import logging
 
+from sprockets.mixins import amqp, http
+from tornado import gen
+import fastavro
+
+__version__ = '2.1.0'
+
 LOGGER = logging.getLogger(__name__)
 
-try:
-    from sprockets.mixins import amqp
-    from sprockets.mixins.avro_publisher.mixins import (PublishingMixin,
-                                                        SchemaFetchError)
-except ImportError as error:
-
-    class PublishingMixin(object):
-        def __init__(self, *args, **kwargs):
-            raise error
-
-    class SchemaFetchError(Exception):
-        def __init__(self, *args, **kwargs):
-            raise error
-
-    class amqp(object):
-
-        error = None
-
-        @classmethod
-        def install(cls, *args, **kwargs):
-            raise cls.error
-
-    amqp.error = error
-
-version_info = (2, 0, 0)
-__version__ = '.'.join(str(v) for v in version_info)
+DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'
+SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc'
+
+_SCHEMAS = {}
 
 
-def install(application, **kwargs):
-    """Call this to install avro publishing for the Tornado application.
+def install(application, io_loop=None, **kwargs):  # pragma: nocover
+    """Call this to install Avro publishing for the Tornado application.
 
     :rtype: bool
 
     """
-    amqp.install(application, **kwargs)
+    amqp.install(application, io_loop=io_loop, **kwargs)
 
     if 'avro_schema_uri_format' not in application.settings:
         LOGGER.warning('avro_schema_uri_format is not set, using default')
 
-    setattr(application, 'avro_schemas', {})
-
     return True
+
+
+class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin):
+    """Publish to Avro encoded datums to RabbitMQ"""
+
+    def avro_amqp_publish(self, exchange, routing_key, message_type,
+                          data, properties=None):
+        """Publish a message to RabbitMQ, serializing the payload data as an
+        Avro datum and creating the AMQP message properties.
+
+        :param str exchange: The exchange to publish the message to.
+        :param str routing_key: The routing key to publish the message with.
+        :param str message_type: The message type for the Avro schema.
+        :param dict data: The message data to serialize.
+        :param dict properties: An optional dict of additional properties
+                                to append.
+        :raises: sprockets.mixins.avro_publisher.SchemaFetchError
+
+        """
+        properties = properties or {}
+        properties['content_type'] = DATUM_MIME_TYPE
+        properties['type'] = message_type
+        return self.amqp_publish(exchange, routing_key, data, properties)
+
+    @gen.coroutine
+    def amqp_publish(self, exchange, routing_key, body, properties=None):
+        """Publish a message to RabbitMQ, serializing the payload data as an
+        Avro datum if the message is to be sent as such.
+
+        :param str exchange: The exchange to publish the message to.
+        :param str routing_key: The routing key to publish the message with.
+        :param dict body: The message data to serialize.
+        :param dict properties: An optional dict of additional properties
+                                to append. If publishing an Avro message, it
+                                must contain the Avro message type at 'type'
+                                and have a content type of
+                                'application/vnd.apache.avro.datum'
+        :raises: sprockets.mixins.avro_publisher.SchemaFetchError
+
+        """
+        LOGGER.debug('Publishing Here: %r %r %r %r',
+                     exchange, routing_key, body, properties)
+        properties = properties or {}
+        if properties.get('content_type') == DATUM_MIME_TYPE and \
+                isinstance(body, dict):
+            avro_schema = yield self._schema(properties['type'])
+            LOGGER.debug('Schema: %r', avro_schema)
+            body = self._serialize(avro_schema, body)
+        yield super(PublishingMixin, self).amqp_publish(
+            exchange, routing_key, body, properties)
+
+    @gen.coroutine
+    def _schema(self, message_type):
+        """Fetch the Avro schema file from application cache or the remote
+        URI. If the request for the schema from the remote URI fails, a
+        :exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be
+        raised.
+
+        :param str message_type: The message type for the Avro schema.
+        :rtype: str
+        :raises: sprockets.mixins.avro_publisher.SchemaFetchError
+
+        """
+        global _SCHEMAS
+
+        if message_type not in _SCHEMAS:
+            schema = yield self._fetch_schema(message_type)
+            _SCHEMAS[message_type] = schema
+        raise gen.Return(_SCHEMAS[message_type])
+
+    @gen.coroutine
+    def _fetch_schema(self, message_type):
+        """Fetch the Avro schema for the given message type from a remote
+        location, returning the schema JSON string.
+
+        If the schema can not be retrieved, a
+        :exc:`~sprockets.mixins.avro_publisher.SchemaFetchError` will be
+        raised.
+
+        :param str message_type: The message type for the Avro schema.
+        :rtype: str
+        :raises: sprockets.mixins.avro_publisher.SchemaFetchError
+
+        """
+        response = yield self.http_fetch(self._schema_url(message_type))
+        if response.ok:
+            raise gen.Return(json.loads(response.raw.body.decode('utf-8')))
+        raise SchemaFetchError()
+
+    def _schema_url(self, message_type):
+        """Return the URL for the given message type for retrieving the Avro
+        schema from a remote location.
+
+        :param str message_type: The message type for the Avro schema.
+
+        :rtype: str
+
+        """
+        return self.application.settings.get(
+            'avro_schema_uri_format',
+            SCHEMA_URI_FORMAT) % {'name': message_type}
+
+    @staticmethod
+    def _serialize(schema, data):
+        """Serialize a data structure into an Avro datum.
+
+        :param dict schema: The parsed Avro schema.
+        :param dict data: The value to turn into an Avro datum.
+
+        :rtype: bytes
+
+        """
+        stream = io.BytesIO()
+        fastavro.schemaless_writer(stream, schema, data)
+        return stream.getvalue()
+
+
+class SchemaFetchError(Exception):
+    """Raised when the Avro schema could not be fetched."""
+    pass
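Taken together, the new module installs the AMQP client on the application and exposes PublishingMixin for request handlers. A minimal usage sketch follows; it assumes a schema registry that serves %(name)s.avsc documents and a reachable RabbitMQ broker, and the handler name, exchange, routing key, settings and URLs below are illustrative rather than part of this commit.

    from tornado import gen, web

    from sprockets.mixins import avro_publisher


    class UserEventHandler(avro_publisher.PublishingMixin, web.RequestHandler):

        @gen.coroutine
        def post(self):
            # Serializes the dict as an example.avro.User datum and publishes it.
            yield self.avro_amqp_publish(
                exchange='events',
                routing_key='user.created',
                message_type='example.avro.User',
                data={'name': 'alice',
                      'favorite_number': 7,
                      'favorite_color': 'blue'})
            self.set_status(204)


    def make_app():
        # avro_schema_uri_format lands in application.settings, where
        # PublishingMixin._schema_url() looks it up.
        app = web.Application(
            [(r'/users', UserEventHandler)],
            avro_schema_uri_format='http://my-schema-repository/avro/%(name)s.avsc')
        avro_publisher.install(app, url='amqp://guest:guest@localhost:5672/%2f')
        return app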
@@ -1,182 +0,0 @@
-import io
-import json
-import logging
-
-from sprockets.mixins import amqp
-from tornado import gen, httpclient
-import fastavro
-
-LOGGER = logging.getLogger(__name__)
-
-
-class PublishingMixin(amqp.PublishingMixin):
-    """The request handler will connect to RabbitMQ on the first request,
-    blocking until the connection and channel are established. If RabbitMQ
-    closes its connection to the app at any point, a connection attempt will
-    be made on the next request.
-
-    This class implements a pattern for the use of a single AMQP connection
-    to RabbitMQ.
-    """
-    DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'
-    DEFAULT_SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc'
-    DEFAULT_FETCH_RETRY_DELAY = 0.5
-
-    def initialize(self, *args, **kwargs):
-        self._schema_fetch_failed = False
-
-        if not hasattr(self, '_http_client'):
-            self._http_client = httpclient.AsyncHTTPClient(force_instance=True)
-
-        self._schema_uri_format = self.application.settings.get(
-            'avro_schema_uri_format',
-            self.DEFAULT_SCHEMA_URI_FORMAT)
-
-        self._fetch_retry_delay = self.application.settings.get(
-            'avro_schema_fetch_retry_delay',
-            self.DEFAULT_FETCH_RETRY_DELAY)
-
-        if hasattr(super(PublishingMixin, self), 'initialize'):
-            super(PublishingMixin, self).initialize(*args, **kwargs)
-
-    @gen.coroutine
-    def avro_amqp_publish(self, exchange, routing_key, message_type,
-                          data, properties=None, mandatory=False):
-        """Publish a message to RabbitMQ, serializing the payload data as an
-        Avro datum and creating the AMQP message properties.
-
-        :param str exchange: The exchange to publish the message to.
-        :param str routing_key: The routing key to publish the message with.
-        :param str message_type: The message type for the Avro schema.
-        :param dict data: The message data to serialize.
-        :param dict properties: An optional dict of additional properties
-                                to append. Will not override mandatory
-                                properties:
-                                content_type, type
-        :param bool mandatory: Whether to instruct the server to return an
-                               unqueueable message
-                               http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory
-
-        :raises: sprockets.mixins.avro_publisher.SchemaFetchError
-        """
-        # Set mandatory Avro-related properties
-        properties = properties or {}
-        properties['content_type'] = self.DATUM_MIME_TYPE
-        properties['type'] = message_type
-
-        yield self.amqp_publish(exchange, routing_key, data, properties,
-                                mandatory)
-
-    @gen.coroutine
-    def amqp_publish(self, exchange, routing_key, body, properties,
-                     mandatory=False):
-        """Publish a message to RabbitMQ, serializing the payload data as an
-        Avro datum if the message is to be sent as such.
-
-        :param str exchange: The exchange to publish the message to.
-        :param str routing_key: The routing key to publish the message with.
-        :param dict body: The message data to serialize.
-        :param dict properties: A dict of additional properties
-                                to append. If publishing an Avro message, it
-                                must contain the Avro message type at 'type'
-                                and have a content type of
-                                'application/vnd.apache.avro.datum'
-        :param bool mandatory: Whether to instruct the server to return an
-                               unqueueable message
-                               http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish.mandatory
-
-        :raises: sprockets.mixins.avro_publisher.SchemaFetchError
-        """
-        if (('content_type' in properties and
-             properties['content_type']) == self.DATUM_MIME_TYPE):
-            avro_schema = yield self._schema(properties['type'])
-            body = self._serialize(avro_schema, body)
-
-        yield super(PublishingMixin, self).amqp_publish(
-            exchange,
-            routing_key,
-            body,
-            properties,
-            mandatory
-        )
-
-    @gen.coroutine
-    def _schema(self, message_type):
-        """Fetch the Avro schema file from application cache or the remote URI.
-        If the request for the schema from the remote URI fails, a
-        :exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be raised.
-
-        :param str message_type: The message type for the Avro schema.
-
-        :rtype: str
-
-        :raises: sprockets.mixins.avro_publisher.SchemaFetchError
-
-        """
-        if message_type not in self.application.avro_schemas:
-            schema = yield self._fetch_schema(message_type)
-            self.application.avro_schemas[message_type] = schema
-        raise gen.Return(self.application.avro_schemas[message_type])
-
-    @gen.coroutine
-    def _fetch_schema(self, message_type):
-        """Fetch the Avro schema for the given message type from a remote
-        location, returning the schema JSON string.
-
-        If fetching the schema results in an ``tornado.httpclient.HTTPError``,
-        it will retry once then raise a SchemaFetchError if the retry fails.
-
-        :param str message_type: The message type for the Avro schema.
-
-        :rtype: str
-
-        :raises: sprockets.mixins.avro_publisher.SchemaFetchError
-
-        """
-        url = self._schema_url(message_type)
-        LOGGER.debug('Loading schema for %s from %s', message_type, url)
-        try:
-            response = yield self._http_client.fetch(url)
-        except httpclient.HTTPError as error:
-            if self._schema_fetch_failed:
-                LOGGER.error('Could not fetch Avro schema for %s (%s)',
-                             message_type, error)
-                raise SchemaFetchError(str(error))
-            else:
-                self._schema_fetch_failed = True
-                yield gen.sleep(self._fetch_retry_delay)
-                yield self._fetch_schema(message_type)
-
-        else:
-            self._schema_fetch_failed = False
-            raise gen.Return(json.loads(response.body.decode('utf-8')))
-
-    def _schema_url(self, message_type):
-        """Return the URL for the given message type for retrieving the Avro
-        schema from a remote location.
-
-        :param str message_type: The message type for the Avro schema.
-
-        :rtype: str
-
-        """
-        return self._schema_uri_format % {'name': message_type}
-
-    @staticmethod
-    def _serialize(schema, data):
-        """Serialize a data structure into an Avro datum.
-
-        :param dict schema: The parsed Avro schema.
-        :param dict data: The value to turn into an Avro datum.
-
-        :rtype: bytes
-
-        """
-        stream = io.BytesIO()
-        fastavro.schemaless_writer(stream, schema, data)
-        return stream.getvalue()
-
-
-class SchemaFetchError(ValueError):
-    """Raised when the Avro schema could not be fetched."""
-    pass
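The deleted mixin differed from the new module-level implementation in two ways worth noting: schemas were cached per application (application.avro_schemas) rather than in the module-level _SCHEMAS dict, and a failed schema fetch was retried once before giving up. Below is a standalone sketch of that retry-once behavior, not part of the repository; client is assumed to be a tornado.httpclient.AsyncHTTPClient.

    import json

    from tornado import gen, httpclient


    @gen.coroutine
    def fetch_schema_with_retry(client, url, retry_delay=0.5):
        # Try the URL, sleep briefly and retry once on HTTPError, then give up;
        # the real mixin wrapped the final failure in SchemaFetchError.
        for attempt in range(2):
            try:
                response = yield client.fetch(url)
            except httpclient.HTTPError:
                if attempt == 1:
                    raise
                yield gen.sleep(retry_delay)
            else:
                raise gen.Return(json.loads(response.body.decode('utf-8')))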
tests.py (539 changed lines)

@@ -1,423 +1,184 @@
-import io
-import json
-import logging
-import os
-import uuid
-
-from pika import spec
-import mock
-
-from tornado.concurrent import Future
-from tornado.httpclient import HTTPError
-from tornado import gen, locks, testing, web
-import fastavro
-
-from sprockets.mixins import avro_publisher
-from sprockets.mixins.avro_publisher.mixins import SchemaFetchError
-
-MESSAGE_TYPE = "example.avro.User"
-
-AVRO_SCHEMA = """
-{"namespace": "example.avro",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number", "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
- ]
-}
-"""
-
-# Set this URL to that of a running AMQP server before executing tests
-if 'TEST_AMQP_URL' in os.environ:
-    AMQP_URL = os.environ['TEST_AMQP_URL']
-else:
-    AMQP_URL = 'amqp://guest:guest@localhost:5672/%2f'
-
-LOGGER = logging.getLogger(__name__)
-
-
-class TestRequestHandler(avro_publisher.PublishingMixin):
-
-    def __init__(self, application):
-        self.application = application
-        self.correlation_id = str(uuid.uuid4())
-        self.initialize()
-
-    def set_client(self, client):
-        self._http_client = client
-
-
-@mock.patch('tornado.httpclient.AsyncHTTPClient')
-class MockHTTPClient:
-
-    def fetch(self, *args, **kwargs):
-        with mock.patch('tornado.httpclient.HTTPResponse') as response_class:
-            response = response_class.return_value
-            response.body = AVRO_SCHEMA.encode()
-
-            future = Future()
-            future.set_result(response)
-            return future
-
-
-@mock.patch('tornado.httpclient.AsyncHTTPClient')
-class MockHTTPClientError:
-
-    def __init__(self):
-        self.times_called = 0
-
-    def fetch(self, *args, **kwargs):
-        self.times_called += 1
-        raise HTTPError(500)
-
-
-class BaseTestCase(testing.AsyncTestCase):
-
-    @gen.coroutine
-    def setUp(self):
-        super(BaseTestCase, self).setUp()
-
-        # make sure that our logging statements get executed
-        avro_publisher.mixins.LOGGER.enabled = True
-        avro_publisher.mixins.LOGGER.setLevel(logging.DEBUG)
-
-        self.exchange = str(uuid.uuid4())
-        self.queue = str(uuid.uuid4())
-        self.routing_key = MESSAGE_TYPE
-        self.correlation_id = str(uuid.uuid4())
-        self.message = None
-        self.test_queue_bound = locks.Event()
-        self.get_response = locks.Event()
-        self.amqp_ready = locks.Event()
-        self.condition = locks.Condition()
-        self.config = {
-            "url": AMQP_URL,
-            "reconnect_delay": 1,
-            "timeout": 2,
-            "on_ready_callback": self.on_ready,
-            "on_unavailable_callback": self.on_unavailable,
-            "on_persistent_failure_callback": self.on_persistent_failure,
-            "on_message_returned_callback": self.on_message_returned,
-            "io_loop": self.io_loop,
-        }
-        self.app = web.Application()
-        self.app.settings = {
-            'service': 'unit_tests',
-            'version': '0.0',
-        }
-
-        self.clear_event_tracking()
-
-        self.handler = TestRequestHandler(self.app)
-
-        self.create_http_client()
-
-        avro_publisher.install(self.app, **self.config)
-        yield self.condition.wait(self.io_loop.time() + 5)
-
-        LOGGER.info('Connected to RabbitMQ, declaring exchange %s',
-                    self.exchange)
-        self.app.amqp.channel.exchange_declare(self.on_exchange_declare_ok,
-                                               self.exchange,
-                                               auto_delete=True)
-
-    def create_http_client(self):
-        client = MockHTTPClient()
-        self.handler.set_client(client)
-
-    def create_http_error_client(self):
-        client = MockHTTPClientError()
-        self.handler.set_client(client)
-
-    def on_exchange_declare_ok(self, _method):
-        LOGGER.info(
-            'Exchange %s declared, declaring queue %s',
-            self.exchange,
-            self.queue
-        )
-        self.app.amqp.channel.queue_declare(self.on_queue_declare_ok,
-                                            queue=self.queue,
-                                            auto_delete=True)
-
-    def on_queue_declare_ok(self, _method):
-        LOGGER.info('Queue %s declared', self.queue)
-        self.app.amqp.channel.queue_bind(self.on_bind_ok, self.queue,
-                                         self.exchange, self.routing_key)
-
-    def on_bind_ok(self, _method):
-        LOGGER.info('Queue %s bound to %s', self.queue, self.exchange)
-        self.app.amqp.channel.add_callback(self.on_get_response,
-                                           [spec.Basic.GetEmpty], False)
-        self.test_queue_bound.set()
-
-    def on_get_response(self, channel, method, properties=None, body=None):
-        LOGGER.info('get_response: %r', method)
-        self.message = {
-            'method': method,
-            'properties': properties,
-            'body': body,
-        }
-        self.get_response.set()
-
-    def on_ready(self, caller):
-        LOGGER.info('on_ready called')
-        self.ready_called = True
-        self.amqp_ready.set()
-
-    def on_unavailable(self, caller):
-        LOGGER.info('on_unavailable called')
-        self.unavailable_called = True
-        self.amqp_ready.clear()
-
-    def on_persistent_failure(self, caller, exchange,
-                              routing_key, body, properties):
-        LOGGER.info('on_persistent_failure called')
-        self.persistent_failure_called = True
-        self.failed_message = {
-            'exchange': exchange,
-            'routing_key': routing_key,
-            'body': body,
-            'properties': properties,
-        }
-        self.amqp_ready.clear()
-
-    def on_message_returned(self, caller, method, properties, body):
-        LOGGER.info('on_message_returned called')
-        self.message_returned_called = True
-        self.message_returned_error = method.reply_text
-        self.returned_message = {
-            'exchange': method.exchange,
-            'routing_key': method.routing_key,
-            'body': body,
-            'properties': properties,
-        }
-
-    def clear_event_tracking(self):
-        self.ready_called = False
-        self.unavailable_called = False
-        self.persistent_failure_called = False
-        self.message_returned_called = False
-        self.failed_message = {
-            'exchange': None,
-            'routing_key': None,
-            'body': None,
-            'properties': None,
-        }
-        self.returned_message = {
-            'exchange': None,
-            'routing_key': None,
-            'body': None,
-            'properties': None,
-        }
-
-    @gen.coroutine
-    def get_message(self):
-        self.message = None
-        self.get_response.clear()
-        self.app.amqp.channel.basic_get(self.on_get_response, self.queue)
-
-        LOGGER.info('Waiting on get')
-        yield self.get_response.wait()
-        if isinstance(self.message['method'], spec.Basic.GetEmpty):
-            raise ValueError('Basic.GetEmpty')
-        raise gen.Return(self.message)
-
-
-class SettingsTests(testing.AsyncTestCase):
-
-    @testing.gen_test(timeout=10)
-    def should_warn_when_no_uri_schema_set_test(self):
-        app = web.Application()
-
-        with mock.patch('logging.Logger.warning') as mock_logger:
-            avro_publisher.install(app, url=AMQP_URL)
-
-            mock_logger.assert_called_with(
-                'avro_schema_uri_format is not set, using default')
-
-    @testing.gen_test(timeout=10)
-    def should_set_default_settings_test(self):
-        app = web.Application()
-
-        avro_publisher.install(app, url=AMQP_URL)
-
-        handler = TestRequestHandler(app)
-
-        self.assertEqual(
-            handler._schema_uri_format,
-            avro_publisher.PublishingMixin.DEFAULT_SCHEMA_URI_FORMAT
-        )
-        self.assertEqual(
-            handler._fetch_retry_delay,
-            avro_publisher.PublishingMixin.DEFAULT_FETCH_RETRY_DELAY
-        )
-
-    @testing.gen_test(timeout=10)
-    def should_override_default_with_configured_settings_test(self):
-        app = web.Application()
-        app.settings = {
-            'avro_schema_uri_format': 'http://127.0.0.1/avro/%(name)s.avsc',
-            'avro_schema_fetch_retry_delay':
-                avro_publisher.PublishingMixin.DEFAULT_FETCH_RETRY_DELAY + 1,
-        }
-
-        avro_publisher.install(app, url=AMQP_URL)
-
-        handler = TestRequestHandler(app)
-
-        self.assertEqual(handler._schema_uri_format,
-                         app.settings['avro_schema_uri_format'])
-        self.assertEqual(handler._fetch_retry_delay,
-                         app.settings['avro_schema_fetch_retry_delay'])
-
-
-class AvroIntegrationTests(BaseTestCase):
-
-    @testing.gen_test(timeout=10)
-    def should_publish_avro_message_test(self):
-        yield self.test_queue_bound.wait()
-
-        LOGGER.info('Should be ready')
-
-        message = {
-            "name": "testuser",
-            "favorite_number": 1,
-            "favorite_color": "green",
-        }
-
-        yield self.handler.avro_amqp_publish(
-            self.exchange,
-            MESSAGE_TYPE,
-            self.routing_key,
-            message
-        )
-
-        stream = io.BytesIO()
-        fastavro.schemaless_writer(stream, json.loads(AVRO_SCHEMA), message)
-        serialized_message = stream.getvalue()
-
-        LOGGER.info('Published')
-
-        result = yield self.get_message()
-
-        self.assertEqual(serialized_message, result['body'])
-        self.assertEqual(self.handler.app_id, result['properties'].app_id)
-        self.assertEqual(self.handler.correlation_id,
-                         result['properties'].correlation_id)
-        self.assertEqual(avro_publisher.PublishingMixin.DATUM_MIME_TYPE,
-                         result['properties'].content_type)
-
-    @testing.gen_test(timeout=10)
-    def should_publish_other_format_amqp_message_test(self):
-        yield self.test_queue_bound.wait()
-
-        LOGGER.info('Should be ready')
-
-        message = bytes(bytearray(range(255, 0, -1)))
-        properties = {'content_type': 'application/octet-stream'}
-
-        yield self.handler.amqp_publish(
-            self.exchange,
-            self.routing_key,
-            message,
-            properties
-        )
-
-        LOGGER.info('Published')
-
-        result = yield self.get_message()
-
-        self.assertEqual(message, result['body'])
-        self.assertEqual(self.handler.app_id, result['properties'].app_id)
-        self.assertEqual(self.handler.correlation_id,
-                         result['properties'].correlation_id)
-        self.assertEqual(properties['content_type'],
-                         result['properties'].content_type)
-
-    @testing.gen_test(timeout=10)
-    def should_raise_schema_fetch_error_on_fetch_failure_test(self):
-        yield self.test_queue_bound.wait()
-
-        LOGGER.info('Should be ready')
-
-        self.create_http_error_client()
-
-        with self.assertRaises(SchemaFetchError):
-            message = {
-                "name": "testuser",
-                "favorite_number": 1,
-                "favorite_color": "green",
-            }
-
-            yield self.handler.avro_amqp_publish(
-                self.exchange,
-                MESSAGE_TYPE,
-                self.routing_key,
-                message
-            )
-
-    @testing.gen_test(timeout=10)
-    def should_retry_once_on_fetch_failure_test(self):
-        yield self.test_queue_bound.wait()
-
-        LOGGER.info('Should be ready')
-
-        self.create_http_error_client()
-
-        with self.assertRaises(SchemaFetchError):
-            message = {
-                "name": "testuser",
-                "favorite_number": 1,
-                "favorite_color": "green",
-            }
-
-            yield self.handler.avro_amqp_publish(
-                self.exchange,
-                MESSAGE_TYPE,
-                self.routing_key,
-                message
-            )
-
-        self.assertEqual(2, self.handler._http_client.times_called)
-
-    @testing.gen_test(timeout=10)
-    def should_serialize_avro_message_when_amqp_publish_called_test(self):
-        yield self.test_queue_bound.wait()
-
-        LOGGER.info('Should be ready')
-
-        message = {
-            "name": "testuser",
-            "favorite_number": 1,
-            "favorite_color": "green",
-        }
-
-        properties = {
-            'content_type': avro_publisher.PublishingMixin.DATUM_MIME_TYPE,
-            'type': MESSAGE_TYPE
-        }
-
-        yield self.handler.amqp_publish(
-            self.exchange,
-            self.routing_key,
-            message,
-            properties
-        )
-
-        stream = io.BytesIO()
-        fastavro.schemaless_writer(stream, json.loads(AVRO_SCHEMA), message)
-        serialized_message = stream.getvalue()
-
-        LOGGER.info('Published')
-
-        result = yield self.get_message()
-
-        self.assertEqual(serialized_message, result['body'])
-        self.assertEqual(self.handler.app_id, result['properties'].app_id)
-        self.assertEqual(self.handler.correlation_id,
-                         result['properties'].correlation_id)
-        self.assertEqual(avro_publisher.PublishingMixin.DATUM_MIME_TYPE,
-                         result['properties'].content_type)
+import json
+import io
+import logging
+import random
+import uuid
+
+from tornado import concurrent, gen, locks, testing, web
+import fastavro
+from pika import spec
+
+from sprockets.mixins import amqp, avro_publisher
+
+LOGGER = logging.getLogger(__name__)
+
+MESSAGE_TYPE = "example.avro.Test"
+
+AVRO_SCHEMA = {
+    "namespace": "example.avro",
+    "type": "record",
+    "name": "User",
+    "fields": [
+        {"name": "name", "type": "string"},
+        {"name": "favorite_number", "type": ["int", "null"]},
+        {"name": "favorite_color", "type": ["string", "null"]}]}
+
+
+def deserialize(value):
+    return fastavro.schemaless_reader(io.BytesIO(value), AVRO_SCHEMA)
+
+
+class Test1RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler):
+
+    def initialize(self):
+        self.correlation_id = self.request.headers.get('Correlation-Id')
+        self.publish = self.amqp_publish
+
+    @gen.coroutine
+    def get(self, *args, **kwargs):
+        LOGGER.debug('Handling Request %r', self.correlation_id)
+        parameters = self.parameters()
+        try:
+            yield self.publish(**parameters)
+        except amqp.AMQPException as error:
+            self.write({'error': str(error),
+                        'type': error.__class__.__name__,
+                        'parameters': parameters})
+        else:
+            self.write(parameters)  # Correlation-ID is added pass by reference
+        self.finish()
+
+    def parameters(self):
+        return {
+            'exchange': self.get_argument('exchange', str(uuid.uuid4())),
+            'routing_key': self.get_argument('routing_key', str(uuid.uuid4())),
+            'body': {
+                'name': str(uuid.uuid4()),
+                'favorite_number': random.randint(1, 1000),
+                'favorite_color': str(uuid.uuid4())
+            },
+            'properties': {
+                'content_type': avro_publisher.DATUM_MIME_TYPE,
+                'message_id': str(uuid.uuid4()),
+                'type': MESSAGE_TYPE}}
+
+
+class Test2RequestHandler(Test1RequestHandler):
+
+    def initialize(self):
+        self.correlation_id = self.request.headers.get('Correlation-Id')
+        self.publish = self.avro_amqp_publish
+
+    def parameters(self):
+        return {
+            'exchange': self.get_argument('exchange', str(uuid.uuid4())),
+            'routing_key': self.get_argument('routing_key', str(uuid.uuid4())),
+            'message_type': MESSAGE_TYPE,
+            'data': {
+                'name': str(uuid.uuid4()),
+                'favorite_number': random.randint(1, 1000),
+                'favorite_color': str(uuid.uuid4())
+            },
+            'properties': {'message_id': str(uuid.uuid4())}}
+
+
+class SchemaRequestHandler(web.RequestHandler):
+
+    def get(self, *args, **kwargs):
+        LOGGER.debug('Returning Schema for %r %r', args, kwargs)
+        self.finish(AVRO_SCHEMA)
+
+
+def setUpModule():
+    logging.getLogger('pika').setLevel(logging.INFO)
+
+
+class AsyncHTTPTestCase(testing.AsyncHTTPTestCase):
+
+    CONFIRMATIONS = True
+
+    def setUp(self):
+        super(AsyncHTTPTestCase, self).setUp()
+        self.correlation_id = str(uuid.uuid4())
+        self.exchange = str(uuid.uuid4())
+        self.get_delivered_message = concurrent.Future()
+        self.get_returned_message = concurrent.Future()
+        self.queue = str(uuid.uuid4())
+        self.routing_key = str(uuid.uuid4())
+        self.ready = locks.Event()
+        avro_publisher.install(self._app, self.io_loop, **{
+            'on_ready_callback': self.on_amqp_ready,
+            'enable_confirmations': self.CONFIRMATIONS,
+            'on_return_callback': self.on_message_returned,
+            'url': 'amqp://guest:guest@127.0.0.1:5672/%2f'})
+        self.io_loop.start()
+
+    def get_app(self):
+        return web.Application(
+            [(r'/test1', Test1RequestHandler),
+             (r'/test2', Test2RequestHandler),
+             (r'/schema/(.*).avsc', SchemaRequestHandler)],
+            **{'avro_schema_uri_format': self.get_url('/schema/%(name)s.avsc'),
+               'service': 'test',
+               'version': avro_publisher.__version__})
+
+    def on_amqp_ready(self, _client):
+        LOGGER.debug('AMQP ready')
+        self._app.amqp.channel.exchange_declare(
+            self.on_exchange_declared, self.exchange,
+            durable=False, auto_delete=True)
+
+    def on_exchange_declared(self, method):
+        LOGGER.debug('Exchange declared: %r', method)
+        self._app.amqp.channel.queue_declare(
+            self.on_queue_declared, self.queue,
+            arguments={'x-expires': 30000},
+            auto_delete=True, durable=False)
+
+    def on_queue_declared(self, method):
+        LOGGER.debug('Queue declared: %r', method)
+        self._app.amqp.channel.queue_bind(
+            self.on_queue_bound, self.queue, self.exchange, self.routing_key)
+
+    def on_queue_bound(self, method):
+        LOGGER.debug('Queue bound: %r', method)
+        self._app.amqp.channel.basic_consume(
+            self.on_message_delivered, self.queue)
+        self.io_loop.stop()
+
+    def on_message_delivered(self, _channel, method, properties, body):
+        self.get_delivered_message.set_result((method, properties, body))
+
+    def on_message_returned(self, method, properties, body):
+        self.get_returned_message.set_result((method, properties, body))
+
+
+class PublisherConfirmationTestCase(AsyncHTTPTestCase):
+
+    @testing.gen_test
+    def test_amqp_publish(self):
+        response = yield self.http_client.fetch(
+            self.get_url('/test1?exchange={}&routing_key={}'.format(
+                self.exchange, self.routing_key)),
+            headers={'Correlation-Id': self.correlation_id})
+        published = json.loads(response.body.decode('utf-8'))
+        delivered = yield self.get_delivered_message
+        self.assertIsInstance(delivered[0], spec.Basic.Deliver)
+        self.assertEqual(delivered[1].correlation_id, self.correlation_id)
+        self.assertEqual(
+            delivered[1].content_type, avro_publisher.DATUM_MIME_TYPE)
+        self.assertEqual(delivered[1].type, MESSAGE_TYPE)
+        self.assertEqual(deserialize(delivered[2]), published['body'])
+
+    @testing.gen_test
+    def test_avro_amqp_publish(self):
+        response = yield self.http_client.fetch(
+            self.get_url('/test2?exchange={}&routing_key={}'.format(
+                self.exchange, self.routing_key)),
+            headers={'Correlation-Id': self.correlation_id})
+        published = json.loads(response.body.decode('utf-8'))
+        delivered = yield self.get_delivered_message
+        self.assertIsInstance(delivered[0], spec.Basic.Deliver)
+        self.assertEqual(delivered[1].correlation_id, self.correlation_id)
+        self.assertEqual(
+            delivered[1].content_type, avro_publisher.DATUM_MIME_TYPE)
+        self.assertEqual(delivered[1].type, MESSAGE_TYPE)
+        self.assertEqual(deserialize(delivered[2]), published['data'])
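The integration tests above rely on fastavro's schemaless round trip: PublishingMixin._serialize() writes a datum with fastavro.schemaless_writer, and the tests' deserialize() helper reads it back with fastavro.schemaless_reader against the same schema dict. A standalone sketch of that round trip follows; the record values are illustrative.

    import io

    import fastavro

    SCHEMA = {
        "namespace": "example.avro",
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_number", "type": ["int", "null"]},
            {"name": "favorite_color", "type": ["string", "null"]}]}

    record = {'name': 'testuser', 'favorite_number': 1, 'favorite_color': 'green'}

    # What _serialize() does: encode the record as a schemaless Avro datum.
    stream = io.BytesIO()
    fastavro.schemaless_writer(stream, SCHEMA, record)
    datum = stream.getvalue()  # bytes that become the AMQP message body

    # What the tests' deserialize() helper does: decode with the same schema.
    assert fastavro.schemaless_reader(io.BytesIO(datum), SCHEMA) == record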