diff --git a/sprockets/mixins/avro_publisher/__init__.py b/sprockets/mixins/avro_publisher/__init__.py index 8fdbedb..2561e79 100644 --- a/sprockets/mixins/avro_publisher/__init__.py +++ b/sprockets/mixins/avro_publisher/__init__.py @@ -15,10 +15,9 @@ import json import logging from sprockets.mixins import amqp, http -from tornado import gen import fastavro -__version__ = '2.1.0' +__version__ = '3.0.0' LOGGER = logging.getLogger(__name__) @@ -62,8 +61,7 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin): properties['type'] = message_type return self.amqp_publish(exchange, routing_key, data, properties) - @gen.coroutine - def amqp_publish(self, exchange, routing_key, body, properties=None): + async def amqp_publish(self, exchange, routing_key, body, properties=None): """Publish a message to RabbitMQ, serializing the payload data as an Avro datum if the message is to be sent as such. @@ -83,14 +81,13 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin): properties = properties or {} if properties.get('content_type') == DATUM_MIME_TYPE and \ isinstance(body, dict): - avro_schema = yield self._schema(properties['type']) + avro_schema = await self._schema(properties['type']) LOGGER.debug('Schema: %r', avro_schema) body = self._serialize(avro_schema, body) - yield super(PublishingMixin, self).amqp_publish( + await super(PublishingMixin, self).amqp_publish( exchange, routing_key, body, properties) - @gen.coroutine - def _schema(self, message_type): + async def _schema(self, message_type): """Fetch the Avro schema file from application cache or the remote URI. If the request for the schema from the remote URI fails, a :exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be @@ -104,12 +101,11 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin): global _SCHEMAS if message_type not in _SCHEMAS: - schema = yield self._fetch_schema(message_type) + schema = await self._fetch_schema(message_type) _SCHEMAS[message_type] = schema - raise gen.Return(_SCHEMAS[message_type]) + return _SCHEMAS[message_type] - @gen.coroutine - def _fetch_schema(self, message_type): + async def _fetch_schema(self, message_type): """Fetch the Avro schema for the given message type from a remote location, returning the schema JSON string. @@ -122,9 +118,9 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin): :raises: sprockets.mixins.avro_publisher.SchemaFetchError """ - response = yield self.http_fetch(self._schema_url(message_type)) + response = await self.http_fetch(self._schema_url(message_type)) if response.ok: - raise gen.Return(json.loads(response.raw.body.decode('utf-8'))) + return json.loads(response.raw.body.decode('utf-8')) raise SchemaFetchError() def _schema_url(self, message_type):