Merge pull request #8 from prashantkb/support-tornado-6

Support tornado 6
This commit is contained in:
Gavin M. Roy 2020-03-18 10:43:10 -04:00 committed by GitHub
commit bbf422f689
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 182 additions and 88 deletions

20
.github/workflows/deploy.yaml vendored Normal file
View file

@ -0,0 +1,20 @@
name: Deployment
on:
push:
branches-ignore: ["*"]
tags: ["*"]
jobs:
deploy:
runs-on: ubuntu-latest
if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags') && github.repository == 'sprockets/sprockets.mixins.avro-publisher'
container: python:3.8-alpine
steps:
- name: Checkout repository
uses: actions/checkout@v1
- name: Build package
run: python3 setup.py sdist
- name: Publish package
uses: pypa/gh-action-pypi-publish@master
with:
user: __token__
password: ${{ secrets.PYPI_PASSWORD }}

64
.github/workflows/testing.yaml vendored Normal file
View file

@ -0,0 +1,64 @@
name: Testing
on:
push:
branches: ["*"]
paths-ignore:
- 'docs/**'
- 'setup.*'
- '*.md'
- '*.rst'
tags-ignore: ["*"]
jobs:
test:
runs-on: ubuntu-latest
services:
rabbitmq:
image: rabbitmq:3.8
options: >-
--health-cmd "/opt/rabbitmq/sbin/rabbitmqctl node_health_check"
--health-interval 10s
--health-timeout 10s
--health-retries 5
ports:
- 5672:5672
- 15672:15672
env:
AMQP_EXCHANGE: amq.topic
AMQP_URL: amqp://guest:guest@rabbitmq:5672/%2f
RABBIMQ_URL: http://guest:guest@rabbitmq:15672
strategy:
matrix:
python: [3.7, 3.8]
container:
image: python:${{ matrix.python }}-alpine
steps:
- name: Checkout repository
uses: actions/checkout@v1
- name: Install GCC Libraries and Headers
run: apk add gcc linux-headers make musl-dev python3-dev
- name: Install testing dependencies
run: pip3 install -r requires/testing.txt
- name: Install library dependencies
run: python3 setup.py develop
- name: Create build directory
run: mkdir -p build
- name: Run flake8 tests
run: flake8 --output build/flake8.txt --tee
- name: Run tests
run: coverage run
- name: Generate reports
run: coverage xml && coverage report
- name: Upload Coverage
uses: codecov/codecov-action@v1.0.2
if: github.repository == 'sprockets/sprockets.mixins.avro-publisher'
with:
token: ${{secrets.CODECOV_TOKEN}}
file: build/coverage.xml

View file

@ -1,23 +0,0 @@
language: python
python:
- 2.7
- pypy
- 3.4
services:
- rabbitmq
install:
- pip install -r requires/testing.txt -e .
script:
- nosetests
after_success:
- codecov
deploy:
provider: pypi
user: sprockets
password:
secure: TPpdlEbbxfjcFXR7KizRNiwqZDdB8C8/y0zE/nbFvlds1ZfdqRwCZJlvgsVwhUfiMrUfKiZ3sg1kSI6wN0Li8Z4DVe8RPyhnOXjml6DDao4iZ569LYEfqMdfWrp6NcN/+sMOpGlq5XuQMcdsy3P9uP8WGOUzRwjuQ0ny+2BN8yxD3TxY+TqgYo3FaCYwR0bp5u8l1pmX9gIbD8DhcbbC7EyO+/t8IZj4x5TxIQamIvhWyd8LIFpvR1FcCKRPbqu2x2fPZG4t6YwBHbcmLf8VnZx5xFGvOKEP9HaN4YkWtSIHQ/RhCuFslSPg4peHK3xgurDKMsXdvxnsV2AgSQBEQEaWt6ewACbM4nyW09K++LKK0F19U6keSzYgLZZK+Twsn02xhNpQf58k5kuAB0pJNm+EFxymUcJjR5g9gnVE2ln/Y3MkU1YhXRJGvo0hwdcytkDaPitIASuPC9buy/UQ8smzYPfA60PAF9Dl0/gq8lIWteq3u6PJQT+qtSobWwhR/nFYqTWDMk1sZu4sHVe1IPhSnJonlWccPe/AcS6qG8QNUt5n4fC1l5rerfsiplo+aSLH8gb6p5eiueBAXGwH/akZa6nb9hs1gFFZicHlCzMXlvA845qiLBHbj1ABNJri8jnRvtNxNcYdqBu73lkhExIHlsIG8sgRw93bN1ys73A=
on:
python: 3.4
distributions: sdist bdist_wheel
tags: true
all_branches: true

View file

@ -1,4 +1,4 @@
Copyright (c) 2015 AWeber Communications Copyright (c) 2015-2020 AWeber Communications
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without modification, Redistribution and use in source and binary forms, with or without modification,

View file

@ -16,9 +16,9 @@ and can be installed via ``pip`` or ``easy_install``:
Requirements Requirements
------------ ------------
- sprockets.mixins.amqp>=2.0.0 - sprockets.mixins.amqp>=3.0.0
- fastavro>=0.10.1,<1.0.0 - fastavro>=0.10.1,<1.0.0
- tornado>=4.2.0,<5.0.0 - tornado>=6,<7
Example Example
------- -------
@ -48,10 +48,9 @@ This examples demonstrates the most basic usage of ``sprockets.mixins.avro-publi
class RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler): class RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler):
@gen.coroutine async def get(self, *args, **kwargs):
def get(self, *args, **kwargs):
body = {'request': self.request.path, 'args': args, 'kwargs': kwargs} body = {'request': self.request.path, 'args': args, 'kwargs': kwargs}
yield self.avro_amqp_publish( await self.avro_amqp_publish(
'exchange', 'exchange',
'routing.key', 'routing.key',
'avro-schema-name' 'avro-schema-name'

View file

@ -1,4 +1,4 @@
sprockets.mixins.amqp>=2.1.1,<3 sprockets.mixins.amqp>=3.0.0,<4
sprockets.mixins.http>=1.0.3,<2 sprockets.mixins.http>=2.3.0,<3
fastavro>=0.10.1,<1.0.0 fastavro>=0.10.1,<1.0.0
tornado>=4.2.0,<5.0.0 tornado>=6,<7

View file

@ -1,7 +1,10 @@
codecov
coverage coverage
nose
mock
flake8 flake8
pylint flake8-comprehensions
-r installation.txt flake8-deprecated
flake8-import-order
flake8-print
flake8-quotes
flake8-rst-docstrings
flake8-tuple
-r installation.txt

View file

@ -1,15 +1,28 @@
[bdist_wheel] [bdist_wheel]
universal = 1 universal = 1
[nosetests] [coverage:run]
cover-branches = 1 branch = True
cover-erase = 1 command_line = -m unittest discover tests --verbose
cover-package = sprockets.mixins.avro_publisher data_file = build/.coverage
with-coverage = 1
verbosity = 2 [coverage:report]
show_missing = True
include = sprockets/mixins/avro_publisher/*.py
omit =
tests.py
[coverage:html]
directory = build/coverage
[coverage:xml]
output = build/coverage.xml
[upload_docs] [upload_docs]
upload-dir = build/sphinx/html upload-dir = build/sphinx/html
[flake8] [flake8]
exclude = env,build application-import-names = sprockets.mixins,tests
exclude = build,dist,docs,env
import-order-style = pycharm
rst-roles = attr,class,const,data,exc,func,meth,mod,obj

View file

@ -3,6 +3,8 @@ import os.path
import setuptools import setuptools
from sprockets.mixins.avro_publisher import __version__
def read_requirements(name): def read_requirements(name):
requirements = [] requirements = []
@ -23,7 +25,7 @@ def read_requirements(name):
setuptools.setup( setuptools.setup(
name='sprockets.mixins.avro-publisher', name='sprockets.mixins.avro-publisher',
version='2.1.0', version=__version__,
description='Mixin for publishing events to RabbitMQ as avro datums', description='Mixin for publishing events to RabbitMQ as avro datums',
long_description=open('README.rst').read(), long_description=open('README.rst').read(),
url='https://github.com/sprockets/sprockets.mixins.avro-publisher', url='https://github.com/sprockets/sprockets.mixins.avro-publisher',
@ -31,16 +33,13 @@ setuptools.setup(
author_email='api@aweber.com', author_email='api@aweber.com',
license='BSD', license='BSD',
classifiers=[ classifiers=[
'Development Status :: 4 - Beta', 'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'License :: OSI Approved :: BSD License', 'License :: OSI Approved :: BSD License',
'Natural Language :: English', 'Natural Language :: English',
'Operating System :: OS Independent', 'Operating System :: OS Independent',
'Programming Language :: Python :: 2', 'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: Implementation :: CPython', 'Programming Language :: Python :: Implementation :: CPython',
'Topic :: Software Development :: Libraries', 'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules' 'Topic :: Software Development :: Libraries :: Python Modules'

View file

@ -14,11 +14,11 @@ import io
import json import json
import logging import logging
from sprockets.mixins import amqp, http
from tornado import gen
import fastavro import fastavro
__version__ = '2.1.0' from sprockets.mixins import amqp, http
__version__ = '3.0.0'
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
@ -62,8 +62,7 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin):
properties['type'] = message_type properties['type'] = message_type
return self.amqp_publish(exchange, routing_key, data, properties) return self.amqp_publish(exchange, routing_key, data, properties)
@gen.coroutine async def amqp_publish(self, exchange, routing_key, body, properties=None):
def amqp_publish(self, exchange, routing_key, body, properties=None):
"""Publish a message to RabbitMQ, serializing the payload data as an """Publish a message to RabbitMQ, serializing the payload data as an
Avro datum if the message is to be sent as such. Avro datum if the message is to be sent as such.
@ -83,14 +82,13 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin):
properties = properties or {} properties = properties or {}
if properties.get('content_type') == DATUM_MIME_TYPE and \ if properties.get('content_type') == DATUM_MIME_TYPE and \
isinstance(body, dict): isinstance(body, dict):
avro_schema = yield self._schema(properties['type']) avro_schema = await self._schema(properties['type'])
LOGGER.debug('Schema: %r', avro_schema) LOGGER.debug('Schema: %r', avro_schema)
body = self._serialize(avro_schema, body) body = self._serialize(avro_schema, body)
yield super(PublishingMixin, self).amqp_publish( await super(PublishingMixin, self).amqp_publish(
exchange, routing_key, body, properties) exchange, routing_key, body, properties)
@gen.coroutine async def _schema(self, message_type):
def _schema(self, message_type):
"""Fetch the Avro schema file from application cache or the remote """Fetch the Avro schema file from application cache or the remote
URI. If the request for the schema from the remote URI fails, a URI. If the request for the schema from the remote URI fails, a
:exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be :exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be
@ -104,12 +102,11 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin):
global _SCHEMAS global _SCHEMAS
if message_type not in _SCHEMAS: if message_type not in _SCHEMAS:
schema = yield self._fetch_schema(message_type) schema = await self._fetch_schema(message_type)
_SCHEMAS[message_type] = schema _SCHEMAS[message_type] = schema
raise gen.Return(_SCHEMAS[message_type]) return _SCHEMAS[message_type]
@gen.coroutine async def _fetch_schema(self, message_type):
def _fetch_schema(self, message_type):
"""Fetch the Avro schema for the given message type from a remote """Fetch the Avro schema for the given message type from a remote
location, returning the schema JSON string. location, returning the schema JSON string.
@ -122,9 +119,9 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin):
:raises: sprockets.mixins.avro_publisher.SchemaFetchError :raises: sprockets.mixins.avro_publisher.SchemaFetchError
""" """
response = yield self.http_fetch(self._schema_url(message_type)) response = await self.http_fetch(self._schema_url(message_type))
if response.ok: if response.ok:
raise gen.Return(json.loads(response.raw.body.decode('utf-8'))) return json.loads(response.raw.body.decode('utf-8'))
raise SchemaFetchError() raise SchemaFetchError()
def _schema_url(self, message_type): def _schema_url(self, message_type):

0
tests/__init__.py Normal file
View file

View file

@ -1,27 +1,29 @@
import json
import io import io
import json
import logging import logging
import os
import random import random
import sys
import uuid import uuid
from tornado import concurrent, gen, locks, testing, web
import fastavro import fastavro
from pika import spec from pika import spec
from tornado import concurrent, locks, testing, web
from sprockets.mixins import amqp, avro_publisher from sprockets.mixins import amqp, avro_publisher
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
MESSAGE_TYPE = "example.avro.Test" MESSAGE_TYPE = 'example.avro.Test'
AVRO_SCHEMA = { AVRO_SCHEMA = {
"namespace": "example.avro", 'namespace': 'example.avro',
"type": "record", 'type': 'record',
"name": "User", 'name': 'User',
"fields": [ 'fields': [
{"name": "name", "type": "string"}, {'name': 'name', 'type': 'string'},
{"name": "favorite_number", "type": ["int", "null"]}, {'name': 'favorite_number', 'type': ['int', 'null']},
{"name": "favorite_color", "type": ["string", "null"]}]} {'name': 'favorite_color', 'type': ['string', 'null']}]}
def deserialize(value): def deserialize(value):
@ -34,19 +36,17 @@ class Test1RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler):
self.correlation_id = self.request.headers.get('Correlation-Id') self.correlation_id = self.request.headers.get('Correlation-Id')
self.publish = self.amqp_publish self.publish = self.amqp_publish
@gen.coroutine async def get(self, *args, **kwargs):
def get(self, *args, **kwargs):
LOGGER.debug('Handling Request %r', self.correlation_id) LOGGER.debug('Handling Request %r', self.correlation_id)
parameters = self.parameters() parameters = self.parameters()
try: try:
yield self.publish(**parameters) await self.publish(**parameters)
except amqp.AMQPException as error: except amqp.AMQPException as error:
self.write({'error': str(error), self.write({'error': str(error),
'type': error.__class__.__name__, 'type': error.__class__.__name__,
'parameters': parameters}) 'parameters': parameters})
else: else:
self.write(parameters) # Correlation-ID is added pass by reference self.write(parameters) # Correlation-ID is added pass by reference
self.finish()
def parameters(self): def parameters(self):
return { return {
@ -108,17 +108,39 @@ class AsyncHTTPTestCase(testing.AsyncHTTPTestCase):
'on_ready_callback': self.on_amqp_ready, 'on_ready_callback': self.on_amqp_ready,
'enable_confirmations': self.CONFIRMATIONS, 'enable_confirmations': self.CONFIRMATIONS,
'on_return_callback': self.on_message_returned, 'on_return_callback': self.on_message_returned,
'url': 'amqp://guest:guest@127.0.0.1:5672/%2f'}) 'url': os.environ['AMQP_URL']})
def wait_on_ready():
if self._app.amqp.ready:
self.io_loop.stop()
else:
self.io_loop.call_later(0.1, wait_on_ready)
sys.stdout.flush()
self.io_loop.add_callback(wait_on_ready)
self.io_loop.start() self.io_loop.start()
def tearDown(self):
def shutdown():
if self._app.amqp.closed:
self.io_loop.stop()
elif not self._app.amqp.closing:
self._app.amqp.close()
self.io_loop.call_later(0.1, shutdown)
self.io_loop.add_callback(shutdown)
self.io_loop.start()
super().tearDown()
def get_app(self): def get_app(self):
return web.Application( application = web.Application(
[(r'/test1', Test1RequestHandler), [(r'/test1', Test1RequestHandler),
(r'/test2', Test2RequestHandler), (r'/test2', Test2RequestHandler),
(r'/schema/(.*).avsc', SchemaRequestHandler)], (r'/schema/(.*).avsc', SchemaRequestHandler)],
**{'avro_schema_uri_format': self.get_url('/schema/%(name)s.avsc'), **{'avro_schema_uri_format': self.get_url('/schema/%(name)s.avsc'),
'service': 'test', 'service': 'test',
'version': avro_publisher.__version__}) 'version': avro_publisher.__version__})
return application
def on_amqp_ready(self, _client): def on_amqp_ready(self, _client):
LOGGER.debug('AMQP ready') LOGGER.debug('AMQP ready')
@ -154,13 +176,13 @@ class AsyncHTTPTestCase(testing.AsyncHTTPTestCase):
class PublisherConfirmationTestCase(AsyncHTTPTestCase): class PublisherConfirmationTestCase(AsyncHTTPTestCase):
@testing.gen_test @testing.gen_test
def test_amqp_publish(self): async def test_amqp_publish(self):
response = yield self.http_client.fetch( response = await self.http_client.fetch(
self.get_url('/test1?exchange={}&routing_key={}'.format( self.get_url('/test1?exchange={}&routing_key={}'.format(
self.exchange, self.routing_key)), self.exchange, self.routing_key)),
headers={'Correlation-Id': self.correlation_id}) headers={'Correlation-Id': self.correlation_id})
published = json.loads(response.body.decode('utf-8')) published = json.loads(response.body.decode('utf-8'))
delivered = yield self.get_delivered_message delivered = await self.get_delivered_message
self.assertIsInstance(delivered[0], spec.Basic.Deliver) self.assertIsInstance(delivered[0], spec.Basic.Deliver)
self.assertEqual(delivered[1].correlation_id, self.correlation_id) self.assertEqual(delivered[1].correlation_id, self.correlation_id)
self.assertEqual( self.assertEqual(
@ -169,13 +191,13 @@ class PublisherConfirmationTestCase(AsyncHTTPTestCase):
self.assertEqual(deserialize(delivered[2]), published['body']) self.assertEqual(deserialize(delivered[2]), published['body'])
@testing.gen_test @testing.gen_test
def test_avro_amqp_publish(self): async def test_avro_amqp_publish(self):
response = yield self.http_client.fetch( response = await self.http_client.fetch(
self.get_url('/test2?exchange={}&routing_key={}'.format( self.get_url('/test2?exchange={}&routing_key={}'.format(
self.exchange, self.routing_key)), self.exchange, self.routing_key)),
headers={'Correlation-Id': self.correlation_id}) headers={'Correlation-Id': self.correlation_id})
published = json.loads(response.body.decode('utf-8')) published = json.loads(response.body.decode('utf-8'))
delivered = yield self.get_delivered_message delivered = await self.get_delivered_message
self.assertIsInstance(delivered[0], spec.Basic.Deliver) self.assertIsInstance(delivered[0], spec.Basic.Deliver)
self.assertEqual(delivered[1].correlation_id, self.correlation_id) self.assertEqual(delivered[1].correlation_id, self.correlation_id)
self.assertEqual( self.assertEqual(