mirror of
https://github.com/sprockets/sprockets.mixins.avro-publisher.git
synced 2024-12-03 03:00:23 +00:00
Initial commit
This commit is contained in:
commit
7324bea384
14 changed files with 314 additions and 0 deletions
25
LICENSE
Normal file
25
LICENSE
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
Copyright (c) 2015 AWeber Communications
|
||||||
|
All rights reserved.
|
||||||
|
|
||||||
|
Redistribution and use in source and binary forms, with or without modification,
|
||||||
|
are permitted provided that the following conditions are met:
|
||||||
|
|
||||||
|
* Redistributions of source code must retain the above copyright notice, this
|
||||||
|
list of conditions and the following disclaimer.
|
||||||
|
* Redistributions in binary form must reproduce the above copyright notice,
|
||||||
|
this list of conditions and the following disclaimer in the documentation
|
||||||
|
and/or other materials provided with the distribution.
|
||||||
|
* Neither the name of Sprockets nor the names of its
|
||||||
|
contributors may be used to endorse or promote products derived from this
|
||||||
|
software without specific prior written permission.
|
||||||
|
|
||||||
|
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||||
|
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||||
|
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
||||||
|
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
|
||||||
|
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
||||||
|
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||||
|
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
||||||
|
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
|
||||||
|
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
|
||||||
|
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
4
MANIFEST.in
Normal file
4
MANIFEST.in
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
include requires/python2.txt
|
||||||
|
include requires/python3.txt
|
||||||
|
include LICENSE
|
||||||
|
include README.rst
|
84
README.rst
Normal file
84
README.rst
Normal file
|
@ -0,0 +1,84 @@
|
||||||
|
sprockets.mixins.avro_publisher
|
||||||
|
===============================
|
||||||
|
AMQP Publishing Mixin for publishing messages as Avro datum
|
||||||
|
|
||||||
|
|Version| |Downloads| |Travis| |CodeCov| |ReadTheDocs|
|
||||||
|
|
||||||
|
Installation
|
||||||
|
------------
|
||||||
|
``sprockets.mixins.avro_publisher`` is available on the
|
||||||
|
`Python Package Index <https://pypi.python.org/pypi/sprockets.mixins.avro_publisher>`_
|
||||||
|
and can be installed via ``pip`` or ``easy_install``:
|
||||||
|
|
||||||
|
.. code-block:: bash
|
||||||
|
|
||||||
|
pip install sprockets.mixins.avro_publisher
|
||||||
|
|
||||||
|
Requirements
|
||||||
|
------------
|
||||||
|
- sprockets.mixins.amqp>=0.1.1
|
||||||
|
|
||||||
|
Example
|
||||||
|
-------
|
||||||
|
This examples demonstrates the most basic usage of ``sprockets.mixins.avro_publisher``
|
||||||
|
|
||||||
|
.. code:: bash
|
||||||
|
|
||||||
|
export AMQP_URL="amqp://user:password@rabbitmq_host:5672/%2f"
|
||||||
|
python my-example-app.py
|
||||||
|
|
||||||
|
|
||||||
|
.. code:: python
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
from tornado import gen
|
||||||
|
from tornado import web
|
||||||
|
from sprockets.mixins import avro_publisher
|
||||||
|
|
||||||
|
class RequestHandler(avro_publisher.AvroPublishingMixin, web.RequestHandler):
|
||||||
|
|
||||||
|
@gen.coroutine
|
||||||
|
def get(self, *args, **kwargs):
|
||||||
|
body = {'request': self.request.path, 'args': args, 'kwargs': kwargs}
|
||||||
|
yield self.amqp_publish('exchange', 'routing.key', json.dumps(body),
|
||||||
|
{'content_type': avro_publisher.DATUM_MIME_TYPE,
|
||||||
|
'type': 'avro-schema-name'
|
||||||
|
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
|
settings = {'avro_schema_uri_format': 'http://my-schema-repository/%(name)s.avsc'}
|
||||||
|
application = web.Application([(r"/", RequestHandler),],
|
||||||
|
debug=True,
|
||||||
|
**settings)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
application.listen(8888)
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
ioloop.IOLoop.current().start()
|
||||||
|
|
||||||
|
|
||||||
|
Source
|
||||||
|
------
|
||||||
|
``sprockets.mixins.avro_publisher`` source is available on Github at `https://github.com/sprockets/sprockets.mixins.avro_publisher <https://github.com/sprockets/sprockets.mixins.avro_publisher>`_
|
||||||
|
|
||||||
|
License
|
||||||
|
-------
|
||||||
|
``sprockets.mixins.avro_publisher`` is released under the `3-Clause BSD license <https://github.com/sprockets/sprockets.mixins.avro_publisher/blob/master/LICENSE>`_.
|
||||||
|
|
||||||
|
.. |Version| image:: https://badge.fury.io/py/sprockets.mixins.avro_publisher.svg?
|
||||||
|
:target: http://badge.fury.io/py/sprockets.mixins.avro_publisher
|
||||||
|
|
||||||
|
.. |Travis| image:: https://travis-ci.org/sprockets/sprockets.mixins.avro_publisher.svg?branch=master
|
||||||
|
:target: https://travis-ci.org/sprockets/sprockets.mixins.avro_publisher
|
||||||
|
|
||||||
|
.. |CodeCov| image:: http://codecov.io/github/sprockets/sprockets.mixins.avro_publisher/coverage.svg?branch=master
|
||||||
|
:target: https://codecov.io/github/sprockets/sprockets.mixins.avro_publisher?branch=master
|
||||||
|
|
||||||
|
.. |Downloads| image:: https://pypip.in/d/sprockets.mixins.avro_publisher/badge.svg?
|
||||||
|
:target: https://pypi.python.org/pypi/sprockets.mixins.avro_publisher
|
||||||
|
|
||||||
|
.. |ReadTheDocs| image:: https://readthedocs.org/projects/sprocketsamqp/badge/
|
||||||
|
:target: https://sprocketsamqp.readthedocs.org
|
2
requires/python2.txt
Normal file
2
requires/python2.txt
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
sprockets.mixins.amqp>=0.1.1,<1
|
||||||
|
avro>=1.7.7,<2
|
2
requires/python3.txt
Normal file
2
requires/python3.txt
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
sprockets.mixins.amqp>=0.1.1,<1
|
||||||
|
avro-python3>=1.7.7,<2
|
4
requires/testing.txt
Normal file
4
requires/testing.txt
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
coverage>=3.7,<4
|
||||||
|
nose>=1.3.1,<2.0.0
|
||||||
|
wheel
|
||||||
|
-r installation.txt
|
7
setup.cfg
Normal file
7
setup.cfg
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
[bdist_wheel]
|
||||||
|
universal = 1
|
||||||
|
|
||||||
|
[nosetests]
|
||||||
|
with-coverage = 1
|
||||||
|
cover-erase = 1
|
||||||
|
cover-package = sprockets.mixins.amqp
|
36
setup.py
Normal file
36
setup.py
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
import setuptools
|
||||||
|
import sys
|
||||||
|
|
||||||
|
requires = 'requires/python{0}.txt'.format(sys.version_info[0])
|
||||||
|
print(requires)
|
||||||
|
with open(requires) as handle:
|
||||||
|
requirements = [line.strip() for line in handle.readlines()]
|
||||||
|
|
||||||
|
|
||||||
|
setuptools.setup(
|
||||||
|
name='sprockets.mixins.avro_publisher',
|
||||||
|
version='0.1.0',
|
||||||
|
description='Mixin for publishing events to RabbitMQ as avro datums',
|
||||||
|
long_description=open('README.rst').read(),
|
||||||
|
url='https://github.com/sprockets/sprockets.mixins.amqp',
|
||||||
|
author='AWeber Communications, Inc.',
|
||||||
|
author_email='api@aweber.com',
|
||||||
|
license='BSD',
|
||||||
|
classifiers=[
|
||||||
|
'Development Status :: 3 - Alpha', 'Intended Audience :: Developers',
|
||||||
|
'License :: OSI Approved :: BSD License', 'Natural Language :: English',
|
||||||
|
'Operating System :: OS Independent',
|
||||||
|
'Programming Language :: Python :: 2',
|
||||||
|
'Programming Language :: Python :: 2.7',
|
||||||
|
'Programming Language :: Python :: 3',
|
||||||
|
'Programming Language :: Python :: 3.4',
|
||||||
|
'Programming Language :: Python :: 3.5',
|
||||||
|
'Programming Language :: Python :: Implementation :: CPython',
|
||||||
|
'Programming Language :: Python :: Implementation :: PyPy',
|
||||||
|
'Topic :: Software Development :: Libraries',
|
||||||
|
'Topic :: Software Development :: Libraries :: Python Modules'
|
||||||
|
],
|
||||||
|
py_modules =['sprockets.mixins.avro_publisher'],
|
||||||
|
namespace_packages=['sprockets', 'sprockets.mixins'],
|
||||||
|
install_requires=requirements,
|
||||||
|
zip_safe=True)
|
1
sprockets/__init__.py
Normal file
1
sprockets/__init__.py
Normal file
|
@ -0,0 +1 @@
|
||||||
|
__import__('pkg_resources').declare_namespace(__name__)
|
BIN
sprockets/__init__.pyc
Normal file
BIN
sprockets/__init__.pyc
Normal file
Binary file not shown.
BIN
sprockets/__pycache__/__init__.cpython-35.pyc
Normal file
BIN
sprockets/__pycache__/__init__.cpython-35.pyc
Normal file
Binary file not shown.
1
sprockets/mixins/__init__.py
Normal file
1
sprockets/mixins/__init__.py
Normal file
|
@ -0,0 +1 @@
|
||||||
|
__import__('pkg_resources').declare_namespace(__name__)
|
BIN
sprockets/mixins/__init__.pyc
Normal file
BIN
sprockets/mixins/__init__.pyc
Normal file
Binary file not shown.
148
sprockets/mixins/avro_publisher.py
Normal file
148
sprockets/mixins/avro_publisher.py
Normal file
|
@ -0,0 +1,148 @@
|
||||||
|
"""The AvroPublishingMixin wraps RabbitMQ use into a request handler, with
|
||||||
|
methods to speed the development of publishing RabbitMQ messages serialized
|
||||||
|
as Avro datums.
|
||||||
|
|
||||||
|
RabbitMQ is configured using two environment variables: ``AMQP_URL`` and
|
||||||
|
``AMQP_TIMEOUT``.
|
||||||
|
|
||||||
|
``AMQP_URL`` is the AMQP url to connect to, defaults to
|
||||||
|
``amqp://guest:guest@localhost:5672/%2f``.
|
||||||
|
|
||||||
|
``AMQP_TIMEOUT`` is the number of seconds to wait until timing out when
|
||||||
|
connecting to RabbitMQ.
|
||||||
|
|
||||||
|
To configure the URL format for the avro schema, add a
|
||||||
|
Tornado application setting called ``avro_schema_uri_format``. The format
|
||||||
|
should be similar to the following:
|
||||||
|
|
||||||
|
``http://my-schema-repository/avro/%(name)s.avsc``
|
||||||
|
|
||||||
|
"""
|
||||||
|
import io
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from sprockets.mixins import amqp
|
||||||
|
from tornado import gen
|
||||||
|
from tornado import httpclient
|
||||||
|
import avro.io
|
||||||
|
import avro.schema
|
||||||
|
|
||||||
|
version_info = (0, 1, 0)
|
||||||
|
__version__ = '.'.join(str(v) for v in version_info)
|
||||||
|
|
||||||
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
PYTHON3 = True if sys.version_info > (3, 0, 0) else False
|
||||||
|
|
||||||
|
|
||||||
|
class AvroPublishingMixin(amqp.PublishingMixin):
|
||||||
|
"""The request handler will connect to RabbitMQ on the first request,
|
||||||
|
blocking until the connection and channel are established. If RabbitMQ
|
||||||
|
closes it's connection to the app at any point, a connection attempt will
|
||||||
|
be made on the next request.
|
||||||
|
|
||||||
|
This class implements a pattern for the use of a single AMQP connection
|
||||||
|
to RabbitMQ.
|
||||||
|
|
||||||
|
Expects the :envvar:`AMQP_URL` environment variable to construct
|
||||||
|
:class:`pika.connection.URLParameters`.
|
||||||
|
|
||||||
|
"""
|
||||||
|
DATUM_MIME_TYPE = 'application/vnd.apache.avro.datum'
|
||||||
|
DEFAULT_SCHEMA_URI_FORMAT = 'http://localhost/avro/%(name)s.avsc'
|
||||||
|
|
||||||
|
def initialize(self):
|
||||||
|
"""Initialize the RequestHandler ensuring there is an a dict for
|
||||||
|
caching avro schemas.
|
||||||
|
|
||||||
|
"""
|
||||||
|
super(AvroPublishingMixin, self).initialize()
|
||||||
|
if 'avro_schema_uri_format' not in self.application.settings:
|
||||||
|
LOGGER.warning('avro_schema_uri_format is not set, using default')
|
||||||
|
if not hasattr(self.application, 'avro'):
|
||||||
|
self.application.avro = {}
|
||||||
|
|
||||||
|
@gen.coroutine
|
||||||
|
def amqp_publish(self, exchange, routing_key, body, properties):
|
||||||
|
"""Publish the message to RabbitMQ
|
||||||
|
|
||||||
|
:param str exchange: The exchange to publish to
|
||||||
|
:param str routing_key: The routing key to publish with
|
||||||
|
:param dict body: The message body
|
||||||
|
:param dict properties: The message properties
|
||||||
|
|
||||||
|
"""
|
||||||
|
if ('content_type' in properties and
|
||||||
|
properties['content_type']) == self.DATUM_MIME_TYPE:
|
||||||
|
body = yield self._avro_serialize(properties['type'], body)
|
||||||
|
yield self.application.amqp.publish(exchange, routing_key, body,
|
||||||
|
properties)
|
||||||
|
|
||||||
|
@gen.coroutine
|
||||||
|
def _avro_fetch_schema(self, schema_name):
|
||||||
|
"""Fetch the avro schema file from the remote HTTP endpoint
|
||||||
|
|
||||||
|
:param str schema_name: The schema name
|
||||||
|
:rtype: str
|
||||||
|
|
||||||
|
"""
|
||||||
|
http_client = httpclient.AsyncHTTPClient()
|
||||||
|
url = self._avro_schema_url(schema_name)
|
||||||
|
LOGGER.info('Loading schema for %s from %s', schema_name, url)
|
||||||
|
try:
|
||||||
|
response = yield http_client.fetch(url)
|
||||||
|
except httpclient.HTTPError as error:
|
||||||
|
LOGGER.error('Could not fetch Avro schema for %s (%s)', schema_name,
|
||||||
|
error)
|
||||||
|
raise ValueError('Error fetching avro schema')
|
||||||
|
raise gen.Return(response.body)
|
||||||
|
|
||||||
|
@gen.coroutine
|
||||||
|
def _avro_schema(self, schema_name):
|
||||||
|
"""Fetch the Avro schema file from cache or the filesystem.
|
||||||
|
|
||||||
|
:param str schema_name: The avro schema name
|
||||||
|
:rtype: str
|
||||||
|
|
||||||
|
"""
|
||||||
|
if schema_name not in self.application.avro:
|
||||||
|
schema = yield self._avro_fetch_schema(schema_name)
|
||||||
|
if PYTHON3:
|
||||||
|
schema = str(schema, 'utf-8')
|
||||||
|
self.application.avro[schema_name] = avro.schema.Parse(schema)
|
||||||
|
else:
|
||||||
|
self.application.avro[schema_name] = avro.schema.parse(schema)
|
||||||
|
raise gen.Return(self.application.avro[schema_name])
|
||||||
|
|
||||||
|
def _avro_schema_url(self, schema_name):
|
||||||
|
"""Return the Avro schema URL for the specified schema name.
|
||||||
|
|
||||||
|
:param str schema_name: The avro schema name
|
||||||
|
:rtype: str
|
||||||
|
|
||||||
|
"""
|
||||||
|
if 'avro_schema_uri_format' in self.application.settings:
|
||||||
|
schema_format = self.application.settings['avro_schema_uri_format']
|
||||||
|
else:
|
||||||
|
schema_format = self.DEFAULT_SCHEMA_URI_FORMAT
|
||||||
|
return schema_format % {'name': schema_name}
|
||||||
|
|
||||||
|
@gen.coroutine
|
||||||
|
def _avro_serialize(self, schema_name, data):
|
||||||
|
"""Serialize a data structure into an Avro datum
|
||||||
|
|
||||||
|
:param str schema_name: The Avro schema name
|
||||||
|
:param dict data: The value to turn into an Avro datum
|
||||||
|
:rtype: str
|
||||||
|
|
||||||
|
"""
|
||||||
|
schema = yield self._avro_schema(schema_name)
|
||||||
|
bytes_io = io.BytesIO()
|
||||||
|
encoder = avro.io.BinaryEncoder(bytes_io)
|
||||||
|
writer = avro.io.DatumWriter(schema)
|
||||||
|
try:
|
||||||
|
writer.write(data, encoder)
|
||||||
|
except avro.io.AvroTypeException as error:
|
||||||
|
raise ValueError(error)
|
||||||
|
raise gen.Return(bytes_io.getvalue())
|
Loading…
Reference in a new issue