Migrate to async and await for avro_pubsliher __init__ file

This commit is contained in:
prashantk 2020-03-16 16:45:01 +05:30
parent af82ea0b5f
commit eb5e3be8e7

View file

@ -15,10 +15,9 @@ import json
import logging import logging
from sprockets.mixins import amqp, http from sprockets.mixins import amqp, http
from tornado import gen
import fastavro import fastavro
__version__ = '2.1.0' __version__ = '3.0.0'
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
@ -62,8 +61,7 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin):
properties['type'] = message_type properties['type'] = message_type
return self.amqp_publish(exchange, routing_key, data, properties) return self.amqp_publish(exchange, routing_key, data, properties)
@gen.coroutine async def amqp_publish(self, exchange, routing_key, body, properties=None):
def amqp_publish(self, exchange, routing_key, body, properties=None):
"""Publish a message to RabbitMQ, serializing the payload data as an """Publish a message to RabbitMQ, serializing the payload data as an
Avro datum if the message is to be sent as such. Avro datum if the message is to be sent as such.
@ -83,14 +81,13 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin):
properties = properties or {} properties = properties or {}
if properties.get('content_type') == DATUM_MIME_TYPE and \ if properties.get('content_type') == DATUM_MIME_TYPE and \
isinstance(body, dict): isinstance(body, dict):
avro_schema = yield self._schema(properties['type']) avro_schema = await self._schema(properties['type'])
LOGGER.debug('Schema: %r', avro_schema) LOGGER.debug('Schema: %r', avro_schema)
body = self._serialize(avro_schema, body) body = self._serialize(avro_schema, body)
yield super(PublishingMixin, self).amqp_publish( await super(PublishingMixin, self).amqp_publish(
exchange, routing_key, body, properties) exchange, routing_key, body, properties)
@gen.coroutine async def _schema(self, message_type):
def _schema(self, message_type):
"""Fetch the Avro schema file from application cache or the remote """Fetch the Avro schema file from application cache or the remote
URI. If the request for the schema from the remote URI fails, a URI. If the request for the schema from the remote URI fails, a
:exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be :exc:`sprockets.mixins.avro_publisher.SchemaFetchError` will be
@ -104,12 +101,11 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin):
global _SCHEMAS global _SCHEMAS
if message_type not in _SCHEMAS: if message_type not in _SCHEMAS:
schema = yield self._fetch_schema(message_type) schema = await self._fetch_schema(message_type)
_SCHEMAS[message_type] = schema _SCHEMAS[message_type] = schema
raise gen.Return(_SCHEMAS[message_type]) return _SCHEMAS[message_type]
@gen.coroutine async def _fetch_schema(self, message_type):
def _fetch_schema(self, message_type):
"""Fetch the Avro schema for the given message type from a remote """Fetch the Avro schema for the given message type from a remote
location, returning the schema JSON string. location, returning the schema JSON string.
@ -122,9 +118,9 @@ class PublishingMixin(amqp.PublishingMixin, http.HTTPClientMixin):
:raises: sprockets.mixins.avro_publisher.SchemaFetchError :raises: sprockets.mixins.avro_publisher.SchemaFetchError
""" """
response = yield self.http_fetch(self._schema_url(message_type)) response = await self.http_fetch(self._schema_url(message_type))
if response.ok: if response.ok:
raise gen.Return(json.loads(response.raw.body.decode('utf-8'))) return json.loads(response.raw.body.decode('utf-8'))
raise SchemaFetchError() raise SchemaFetchError()
def _schema_url(self, message_type): def _schema_url(self, message_type):