sprockets.mixins.avro-publi.../sprockets/mixins/avro_publisher.py

149 lines
5.1 KiB
Python
Raw Normal View History

2015-09-24 18:29:18 +00:00
"""The AvroPublishingMixin wraps RabbitMQ use into a request handler, with
methods to speed the development of publishing RabbitMQ messages serialized
as Avro datums.
RabbitMQ is configured using two environment variables: ``AMQP_URL`` and
``AMQP_TIMEOUT``.
``AMQP_URL`` is the AMQP url to connect to, defaults to
``amqp://guest:guest@localhost:5672/%2f``.
``AMQP_TIMEOUT`` is the number of seconds to wait until timing out when
connecting to RabbitMQ.
To configure the URL format for the avro schema, add a
Tornado application setting called ``avro_schema_uri_format``. The format
should be similar to the following:
``http://my-schema-repository/avro/%(name)s.avsc``
"""
import io
import logging
import sys
from sprockets.mixins import amqp
from tornado import gen
from tornado import httpclient
import avro.io
import avro.schema
version_info = (0, 1, 0)
__version__ = '.'.join(str(v) for v in version_info)
LOGGER = logging.getLogger(__name__)
PYTHON3 = True if sys.version_info > (3, 0, 0) else False
class AvroPublishingMixin(amqp.PublishingMixin):
"""The request handler will connect to RabbitMQ on the first request,
blocking until the connection and channel are established. If RabbitMQ
closes it's connection to the app at any point, a connection attempt will
be made on the next request.
This class implements a pattern for the use of a single AMQP connection
to RabbitMQ.
Expects the :envvar:`AMQP_URL` environment variable to construct
:class:`pika.connection.URLParameters`.
"""
DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'
DEFAULT_SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc'
def initialize(self):
"""Initialize the RequestHandler ensuring there is an a dict for
caching avro schemas.
"""
super(AvroPublishingMixin, self).initialize()
if 'avro_schema_uri_format' not in self.application.settings:
LOGGER.warning('avro_schema_uri_format is not set, using default')
if not hasattr(self.application, 'avro'):
self.application.avro = {}
@gen.coroutine
def amqp_publish(self, exchange, routing_key, body, properties):
"""Publish the message to RabbitMQ
:param str exchange: The exchange to publish to
:param str routing_key: The routing key to publish with
:param dict body: The message body
:param dict properties: The message properties
"""
if ('content_type' in properties and
properties['content_type']) == self.DATUM_MIME_TYPE:
body = yield self._avro_serialize(properties['type'], body)
yield self.application.amqp.publish(exchange, routing_key, body,
properties)
@gen.coroutine
def _avro_fetch_schema(self, schema_name):
"""Fetch the avro schema file from the remote HTTP endpoint
:param str schema_name: The schema name
:rtype: str
"""
http_client = httpclient.AsyncHTTPClient()
url = self._avro_schema_url(schema_name)
LOGGER.info('Loading schema for %s from %s', schema_name, url)
try:
response = yield http_client.fetch(url)
except httpclient.HTTPError as error:
LOGGER.error('Could not fetch Avro schema for %s (%s)', schema_name,
error)
raise ValueError('Error fetching avro schema')
raise gen.Return(response.body)
@gen.coroutine
def _avro_schema(self, schema_name):
"""Fetch the Avro schema file from cache or the filesystem.
:param str schema_name: The avro schema name
:rtype: str
"""
if schema_name not in self.application.avro:
schema = yield self._avro_fetch_schema(schema_name)
if PYTHON3:
schema = str(schema, 'utf-8')
self.application.avro[schema_name] = avro.schema.Parse(schema)
else:
self.application.avro[schema_name] = avro.schema.parse(schema)
raise gen.Return(self.application.avro[schema_name])
def _avro_schema_url(self, schema_name):
"""Return the Avro schema URL for the specified schema name.
:param str schema_name: The avro schema name
:rtype: str
"""
if 'avro_schema_uri_format' in self.application.settings:
schema_format = self.application.settings['avro_schema_uri_format']
else:
schema_format = self.DEFAULT_SCHEMA_URI_FORMAT
return schema_format % {'name': schema_name}
@gen.coroutine
def _avro_serialize(self, schema_name, data):
"""Serialize a data structure into an Avro datum
:param str schema_name: The Avro schema name
:param dict data: The value to turn into an Avro datum
:rtype: str
"""
schema = yield self._avro_schema(schema_name)
bytes_io = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_io)
writer = avro.io.DatumWriter(schema)
try:
writer.write(data, encoder)
except avro.io.AvroTypeException as error:
raise ValueError(error)
raise gen.Return(bytes_io.getvalue())