diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml new file mode 100644 index 0000000..f75d195 --- /dev/null +++ b/.github/workflows/deploy.yaml @@ -0,0 +1,20 @@ +name: Deployment +on: + push: + branches-ignore: ["*"] + tags: ["*"] +jobs: + deploy: + runs-on: ubuntu-latest + if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags') && github.repository == 'sprockets/sprockets.mixins.avro-publisher' + container: python:3.8-alpine + steps: + - name: Checkout repository + uses: actions/checkout@v1 + - name: Build package + run: python3 setup.py sdist + - name: Publish package + uses: pypa/gh-action-pypi-publish@master + with: + user: __token__ + password: ${{ secrets.PYPI_PASSWORD }} diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml new file mode 100644 index 0000000..2d833d0 --- /dev/null +++ b/.github/workflows/testing.yaml @@ -0,0 +1,64 @@ +name: Testing +on: + push: + branches: ["*"] + paths-ignore: + - 'docs/**' + - 'setup.*' + - '*.md' + - '*.rst' + tags-ignore: ["*"] +jobs: + test: + runs-on: ubuntu-latest + services: + rabbitmq: + image: rabbitmq:3.8 + options: >- + --health-cmd "/opt/rabbitmq/sbin/rabbitmqctl node_health_check" + --health-interval 10s + --health-timeout 10s + --health-retries 5 + ports: + - 5672:5672 + - 15672:15672 + env: + AMQP_EXCHANGE: amq.topic + AMQP_URL: amqp://guest:guest@rabbitmq:5672/%2f + RABBIMQ_URL: http://guest:guest@rabbitmq:15672 + strategy: + matrix: + python: [3.7, 3.8] + container: + image: python:${{ matrix.python }}-alpine + steps: + - name: Checkout repository + uses: actions/checkout@v1 + + - name: Install GCC Libraries and Headers + run: apk add gcc linux-headers make musl-dev python3-dev + + - name: Install testing dependencies + run: pip3 install -r requires/testing.txt + + - name: Install library dependencies + run: python3 setup.py develop + + - name: Create build directory + run: mkdir -p build + + - name: Run flake8 tests + run: flake8 --output build/flake8.txt --tee + + - name: Run tests + run: coverage run + + - name: Generate reports + run: coverage xml && coverage report + + - name: Upload Coverage + uses: codecov/codecov-action@v1.0.2 + if: github.repository == 'sprockets/sprockets.mixins.avro-publisher' + with: + token: ${{secrets.CODECOV_TOKEN}} + file: build/coverage.xml diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 7d3ad93..0000000 --- a/.travis.yml +++ /dev/null @@ -1,23 +0,0 @@ -language: python -python: - - 2.7 - - pypy - - 3.4 -services: - - rabbitmq -install: - - pip install -r requires/testing.txt -e . -script: - - nosetests -after_success: - - codecov -deploy: - provider: pypi - user: sprockets - password: - secure: TPpdlEbbxfjcFXR7KizRNiwqZDdB8C8/y0zE/nbFvlds1ZfdqRwCZJlvgsVwhUfiMrUfKiZ3sg1kSI6wN0Li8Z4DVe8RPyhnOXjml6DDao4iZ569LYEfqMdfWrp6NcN/+sMOpGlq5XuQMcdsy3P9uP8WGOUzRwjuQ0ny+2BN8yxD3TxY+TqgYo3FaCYwR0bp5u8l1pmX9gIbD8DhcbbC7EyO+/t8IZj4x5TxIQamIvhWyd8LIFpvR1FcCKRPbqu2x2fPZG4t6YwBHbcmLf8VnZx5xFGvOKEP9HaN4YkWtSIHQ/RhCuFslSPg4peHK3xgurDKMsXdvxnsV2AgSQBEQEaWt6ewACbM4nyW09K++LKK0F19U6keSzYgLZZK+Twsn02xhNpQf58k5kuAB0pJNm+EFxymUcJjR5g9gnVE2ln/Y3MkU1YhXRJGvo0hwdcytkDaPitIASuPC9buy/UQ8smzYPfA60PAF9Dl0/gq8lIWteq3u6PJQT+qtSobWwhR/nFYqTWDMk1sZu4sHVe1IPhSnJonlWccPe/AcS6qG8QNUt5n4fC1l5rerfsiplo+aSLH8gb6p5eiueBAXGwH/akZa6nb9hs1gFFZicHlCzMXlvA845qiLBHbj1ABNJri8jnRvtNxNcYdqBu73lkhExIHlsIG8sgRw93bN1ys73A= - on: - python: 3.4 - distributions: sdist bdist_wheel - tags: true - all_branches: true diff --git a/LICENSE b/LICENSE index 3c72859..c896130 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2015 AWeber Communications +Copyright (c) 2015-2020 AWeber Communications All rights reserved. Redistribution and use in source and binary forms, with or without modification, diff --git a/README.rst b/README.rst index eb5c69a..698b167 100644 --- a/README.rst +++ b/README.rst @@ -16,9 +16,9 @@ and can be installed via ``pip`` or ``easy_install``: Requirements ------------ -- sprockets.mixins.amqp>=2.0.0 +- sprockets.mixins.amqp>=3.0.0 - fastavro>=0.10.1,<1.0.0 -- tornado>=4.2.0,<5.0.0 +- tornado>=6,<7 Example ------- @@ -48,10 +48,9 @@ This examples demonstrates the most basic usage of ``sprockets.mixins.avro-publi class RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler): - @gen.coroutine - def get(self, *args, **kwargs): + async def get(self, *args, **kwargs): body = {'request': self.request.path, 'args': args, 'kwargs': kwargs} - yield self.avro_amqp_publish( + await self.avro_amqp_publish( 'exchange', 'routing.key', 'avro-schema-name' diff --git a/requires/installation.txt b/requires/installation.txt index 140e05b..4cf80a6 100644 --- a/requires/installation.txt +++ b/requires/installation.txt @@ -1,4 +1,4 @@ -sprockets.mixins.amqp>=2.1.1,<3 -sprockets.mixins.http>=1.0.3,<2 +sprockets.mixins.amqp>=3.0.0,<4 +sprockets.mixins.http>=2.3.0,<3 fastavro>=0.10.1,<1.0.0 -tornado>=4.2.0,<5.0.0 +tornado>=6,<7 \ No newline at end of file diff --git a/requires/testing.txt b/requires/testing.txt index 5a19b4f..62e755a 100644 --- a/requires/testing.txt +++ b/requires/testing.txt @@ -1,7 +1,10 @@ -codecov coverage -nose -mock flake8 -pylint --r installation.txt +flake8-comprehensions +flake8-deprecated +flake8-import-order +flake8-print +flake8-quotes +flake8-rst-docstrings +flake8-tuple +-r installation.txt \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index da3855b..345ca3e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,15 +1,28 @@ [bdist_wheel] universal = 1 -[nosetests] -cover-branches = 1 -cover-erase = 1 -cover-package = sprockets.mixins.avro_publisher -with-coverage = 1 -verbosity = 2 +[coverage:run] +branch = True +command_line = -m unittest discover tests --verbose +data_file = build/.coverage + +[coverage:report] +show_missing = True +include = sprockets/mixins/avro_publisher/*.py +omit = + tests.py + +[coverage:html] +directory = build/coverage + +[coverage:xml] +output = build/coverage.xml [upload_docs] upload-dir = build/sphinx/html [flake8] -exclude = env,build +application-import-names = sprockets.mixins,tests +exclude = build,dist,docs,env +import-order-style = pycharm +rst-roles = attr,class,const,data,exc,func,meth,mod,obj diff --git a/setup.py b/setup.py index 9c4856f..63b039e 100644 --- a/setup.py +++ b/setup.py @@ -3,6 +3,8 @@ import os.path import setuptools +from sprockets.mixins.avro_publisher import __version__ + def read_requirements(name): requirements = [] @@ -23,7 +25,7 @@ def read_requirements(name): setuptools.setup( name='sprockets.mixins.avro-publisher', - version='2.1.0', + version=__version__, description='Mixin for publishing events to RabbitMQ as avro datums', long_description=open('README.rst').read(), url='https://github.com/sprockets/sprockets.mixins.avro-publisher', @@ -31,16 +33,13 @@ setuptools.setup( author_email='api@aweber.com', license='BSD', classifiers=[ - 'Development Status :: 4 - Beta', + 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Developers', 'License :: OSI Approved :: BSD License', 'Natural Language :: English', 'Operating System :: OS Independent', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: Implementation :: CPython', 'Topic :: Software Development :: Libraries', 'Topic :: Software Development :: Libraries :: Python Modules' diff --git a/sprockets/mixins/avro_publisher/__init__.py b/sprockets/mixins/avro_publisher/__init__.py index 8fdbedb..3dcb232 100644 --- a/sprockets/mixins/avro_publisher/__init__.py +++ b/sprockets/mixins/avro_publisher/__init__.py @@ -14,11 +14,11 @@ import io import json import logging -from sprockets.mixins import amqp, http -from tornado import gen import fastavro -__version__ = '2.1.0' +from sprockets.mixins import amqp, http + +__version__ = '3.0.0' LOGGER = logging.getLogger(__name__) @@ -62,8 +62,7 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin): properties['type'] = message_type return self.amqp_publish(exchange, routing_key, data, properties) - @gen.coroutine - def amqp_publish(self, exchange, routing_key, body, properties=None): + async def amqp_publish(self, exchange, routing_key, body, properties=None): """Publish a message to RabbitMQ, serializing the payload data as an Avro datum if the message is to be sent as such. @@ -83,14 +82,13 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin): properties = properties or {} if properties.get('content_type') == DATUM_MIME_TYPE and \ isinstance(body, dict): - avro_schema = yield self._schema(properties['type']) + avro_schema = await self._schema(properties['type']) LOGGER.debug('Schema: %r', avro_schema) body = self._serialize(avro_schema, body) - yield super(PublishingMixin, self).amqp_publish( + await super(PublishingMixin, self).amqp_publish( exchange, routing_key, body, properties) - @gen.coroutine - def _schema(self, message_type): + async def _schema(self, message_type): """Fetch the Avro schema file from application cache or the remote URI. If the request for the schema from the remote URI fails, a :exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be @@ -104,12 +102,11 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin): global _SCHEMAS if message_type not in _SCHEMAS: - schema = yield self._fetch_schema(message_type) + schema = await self._fetch_schema(message_type) _SCHEMAS[message_type] = schema - raise gen.Return(_SCHEMAS[message_type]) + return _SCHEMAS[message_type] - @gen.coroutine - def _fetch_schema(self, message_type): + async def _fetch_schema(self, message_type): """Fetch the Avro schema for the given message type from a remote location, returning the schema JSON string. @@ -122,9 +119,9 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin): :raises: sprockets.mixins.avro_publisher.SchemaFetchError """ - response = yield self.http_fetch(self._schema_url(message_type)) + response = await self.http_fetch(self._schema_url(message_type)) if response.ok: - raise gen.Return(json.loads(response.raw.body.decode('utf-8'))) + return json.loads(response.raw.body.decode('utf-8')) raise SchemaFetchError() def _schema_url(self, message_type): diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests.py b/tests/tests.py similarity index 80% rename from tests.py rename to tests/tests.py index 7f65546..37488b5 100644 --- a/tests.py +++ b/tests/tests.py @@ -1,27 +1,29 @@ -import json import io +import json import logging +import os import random +import sys import uuid -from tornado import concurrent, gen, locks, testing, web import fastavro from pika import spec +from tornado import concurrent, locks, testing, web from sprockets.mixins import amqp, avro_publisher LOGGER = logging.getLogger(__name__) -MESSAGE_TYPE = "example.avro.Test" +MESSAGE_TYPE = 'example.avro.Test' AVRO_SCHEMA = { - "namespace": "example.avro", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]}]} + 'namespace': 'example.avro', + 'type': 'record', + 'name': 'User', + 'fields': [ + {'name': 'name', 'type': 'string'}, + {'name': 'favorite_number', 'type': ['int', 'null']}, + {'name': 'favorite_color', 'type': ['string', 'null']}]} def deserialize(value): @@ -34,19 +36,17 @@ class Test1RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler): self.correlation_id = self.request.headers.get('Correlation-Id') self.publish = self.amqp_publish - @gen.coroutine - def get(self, *args, **kwargs): + async def get(self, *args, **kwargs): LOGGER.debug('Handling Request %r', self.correlation_id) parameters = self.parameters() try: - yield self.publish(**parameters) + await self.publish(**parameters) except amqp.AMQPException as error: self.write({'error': str(error), 'type': error.__class__.__name__, 'parameters': parameters}) else: self.write(parameters) # Correlation-ID is added pass by reference - self.finish() def parameters(self): return { @@ -108,17 +108,39 @@ class AsyncHTTPTestCase(testing.AsyncHTTPTestCase): 'on_ready_callback': self.on_amqp_ready, 'enable_confirmations': self.CONFIRMATIONS, 'on_return_callback': self.on_message_returned, - 'url': 'amqp://guest:guest@127.0.0.1:5672/%2f'}) + 'url': os.environ['AMQP_URL']}) + + def wait_on_ready(): + if self._app.amqp.ready: + self.io_loop.stop() + else: + self.io_loop.call_later(0.1, wait_on_ready) + + sys.stdout.flush() + self.io_loop.add_callback(wait_on_ready) self.io_loop.start() + def tearDown(self): + def shutdown(): + if self._app.amqp.closed: + self.io_loop.stop() + elif not self._app.amqp.closing: + self._app.amqp.close() + self.io_loop.call_later(0.1, shutdown) + + self.io_loop.add_callback(shutdown) + self.io_loop.start() + super().tearDown() + def get_app(self): - return web.Application( + application = web.Application( [(r'/test1', Test1RequestHandler), (r'/test2', Test2RequestHandler), (r'/schema/(.*).avsc', SchemaRequestHandler)], **{'avro_schema_uri_format': self.get_url('/schema/%(name)s.avsc'), 'service': 'test', 'version': avro_publisher.__version__}) + return application def on_amqp_ready(self, _client): LOGGER.debug('AMQP ready') @@ -154,13 +176,13 @@ class AsyncHTTPTestCase(testing.AsyncHTTPTestCase): class PublisherConfirmationTestCase(AsyncHTTPTestCase): @testing.gen_test - def test_amqp_publish(self): - response = yield self.http_client.fetch( + async def test_amqp_publish(self): + response = await self.http_client.fetch( self.get_url('/test1?exchange={}&routing_key={}'.format( self.exchange, self.routing_key)), headers={'Correlation-Id': self.correlation_id}) published = json.loads(response.body.decode('utf-8')) - delivered = yield self.get_delivered_message + delivered = await self.get_delivered_message self.assertIsInstance(delivered[0], spec.Basic.Deliver) self.assertEqual(delivered[1].correlation_id, self.correlation_id) self.assertEqual( @@ -169,13 +191,13 @@ class PublisherConfirmationTestCase(AsyncHTTPTestCase): self.assertEqual(deserialize(delivered[2]), published['body']) @testing.gen_test - def test_avro_amqp_publish(self): - response = yield self.http_client.fetch( + async def test_avro_amqp_publish(self): + response = await self.http_client.fetch( self.get_url('/test2?exchange={}&routing_key={}'.format( self.exchange, self.routing_key)), headers={'Correlation-Id': self.correlation_id}) published = json.loads(response.body.decode('utf-8')) - delivered = yield self.get_delivered_message + delivered = await self.get_delivered_message self.assertIsInstance(delivered[0], spec.Basic.Deliver) self.assertEqual(delivered[1].correlation_id, self.correlation_id) self.assertEqual(