sprockets.mixins.avro-publi.../tests/tests.py
2020-03-16 16:47:18 +05:30

206 lines
7.4 KiB
Python

import json
import io
import os
import sys
import logging
import random
import uuid
from tornado import concurrent, locks, testing, web
import fastavro
from pika import spec
from sprockets.mixins import amqp, avro_publisher
# Module-level logger shared by the handlers and the test harness below.
LOGGER = logging.getLogger(__name__)

# Value used for the AMQP ``type`` property / ``message_type`` argument.
MESSAGE_TYPE = "example.avro.Test"

# Avro record schema used to decode published messages; it is also the
# document returned by the schema endpoint defined in this module.
AVRO_SCHEMA = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]},
    ],
}
def deserialize(value):
    """Decode a schemaless Avro datum using ``AVRO_SCHEMA``.

    :param value: bytes-like Avro payload; it is wrapped in ``io.BytesIO``
    :return: whatever ``fastavro.schemaless_reader`` yields for the payload

    """
    stream = io.BytesIO(value)
    return fastavro.schemaless_reader(stream, AVRO_SCHEMA)
class Test1RequestHandler(avro_publisher.PublishingMixin, web.RequestHandler):
    """Handler that publishes a random record through ``amqp_publish``."""

    def initialize(self):
        """Remember the request's Correlation-Id and pick the publisher."""
        request_headers = self.request.headers
        self.correlation_id = request_headers.get('Correlation-Id')
        self.publish = self.amqp_publish

    async def get(self, *args, **kwargs):
        """Publish one message and write the outcome as the response.

        On success the publish arguments are echoed back. When the publish
        raises ``amqp.AMQPException`` the error text, the exception class
        name and the arguments are written instead.

        """
        LOGGER.debug('Handling Request %r', self.correlation_id)
        arguments = self.parameters()
        # The publisher receives these very dict objects, so anything it
        # adds in place is echoed as well (the original note here names the
        # Correlation-ID as such a value).
        outcome = arguments
        try:
            await self.publish(**arguments)
        except amqp.AMQPException as error:
            outcome = {
                'error': str(error),
                'type': type(error).__name__,
                'parameters': arguments,
            }
        self.write(outcome)

    def parameters(self):
        """Build the keyword arguments passed to ``self.publish``.

        The exchange and routing key come from the query string, falling
        back to fresh UUIDs; the record fields and message id are random.

        """
        record = {
            'name': str(uuid.uuid4()),
            'favorite_number': random.randint(1, 1000),
            'favorite_color': str(uuid.uuid4()),
        }
        message_properties = {
            'content_type': avro_publisher.DATUM_MIME_TYPE,
            'message_id': str(uuid.uuid4()),
            'type': MESSAGE_TYPE,
        }
        return {
            'exchange': self.get_argument('exchange', str(uuid.uuid4())),
            'routing_key': self.get_argument(
                'routing_key', str(uuid.uuid4())),
            'body': record,
            'properties': message_properties,
        }
class Test2RequestHandler(Test1RequestHandler):
    """Same flow as the parent handler, but via ``avro_amqp_publish``."""

    def initialize(self):
        """Record the Correlation-Id, then switch to the Avro publisher."""
        # The parent stores the Correlation-Id header; only the publish
        # callable differs for this handler.
        super().initialize()
        self.publish = self.avro_amqp_publish

    def parameters(self):
        """Build the keyword arguments passed to ``avro_amqp_publish``.

        Unlike the parent, the record is passed as ``data`` together with a
        ``message_type``, and only a message id is supplied as a property.

        """
        record = {
            'name': str(uuid.uuid4()),
            'favorite_number': random.randint(1, 1000),
            'favorite_color': str(uuid.uuid4()),
        }
        return {
            'exchange': self.get_argument('exchange', str(uuid.uuid4())),
            'routing_key': self.get_argument(
                'routing_key', str(uuid.uuid4())),
            'message_type': MESSAGE_TYPE,
            'data': record,
            'properties': {'message_id': str(uuid.uuid4())},
        }
class SchemaRequestHandler(web.RequestHandler):
    """Serve ``AVRO_SCHEMA`` regardless of which schema name is requested."""

    def get(self, *args, **kwargs):
        """Log the captured route arguments and finish with the schema."""
        LOGGER.debug('Returning Schema for %r %r', args, kwargs)
        schema = AVRO_SCHEMA
        self.finish(schema)
def setUpModule():
    """Set the ``pika`` logger to INFO before the module's tests run."""
    pika_logger = logging.getLogger('pika')
    pika_logger.setLevel(logging.INFO)
class AsyncHTTPTestCase(testing.AsyncHTTPTestCase):
    """Base test case that connects the application to an AMQP broker.

    ``setUp`` installs the publisher on the application, then declares a
    uniquely named exchange and queue, binds them with a unique routing key
    and starts consuming. Tests await ``get_delivered_message`` (or
    ``get_returned_message``) to inspect what the broker handed back. The
    broker URL is read from the ``AMQP_URL`` environment variable.

    """

    CONFIRMATIONS = True

    def setUp(self):
        """Install the AMQP client and block until the consumer is ready."""
        super().setUp()
        self.correlation_id = str(uuid.uuid4())
        self.exchange = str(uuid.uuid4())
        self.get_delivered_message = concurrent.Future()
        self.get_returned_message = concurrent.Future()
        self.queue = str(uuid.uuid4())
        self.routing_key = str(uuid.uuid4())
        # Set by on_queue_bound once the consumer has been registered.
        self.ready = locks.Event()
        avro_publisher.install(
            self._app, self.io_loop,
            on_ready_callback=self.on_amqp_ready,
            enable_confirmations=self.CONFIRMATIONS,
            on_return_callback=self.on_message_returned,
            url=os.environ['AMQP_URL'])

        def wait_on_ready():
            # This poller is the only place that stops the set-up loop, and
            # it does so only when the client is ready AND the queue is
            # bound. A second, independent stop() would otherwise fire
            # later and end the loop while a test is running.
            if self._app.amqp.ready and self.ready.is_set():
                self.io_loop.stop()
            else:
                self.io_loop.call_later(0.1, wait_on_ready)

        sys.stdout.flush()
        self.io_loop.add_callback(wait_on_ready)
        self.io_loop.start()

    def tearDown(self):
        """Close the AMQP client and run the loop until it reports closed."""
        def shutdown():
            client = self._app.amqp
            if client.closed:
                # Done: stop the loop and do not schedule another poll.
                self.io_loop.stop()
                return
            if not client.closing:
                client.close()
            self.io_loop.call_later(0.1, shutdown)

        self.io_loop.add_callback(shutdown)
        self.io_loop.start()
        super().tearDown()

    def get_app(self):
        """Return the application exposing the publish and schema routes."""
        routes = [
            (r'/test1', Test1RequestHandler),
            (r'/test2', Test2RequestHandler),
            # The dot is escaped so only a literal ".avsc" suffix matches.
            (r'/schema/(.*)\.avsc', SchemaRequestHandler),
        ]
        settings = {
            'avro_schema_uri_format': self.get_url('/schema/%(name)s.avsc'),
            'service': 'test',
            'version': avro_publisher.__version__,
        }
        return web.Application(routes, **settings)

    def on_amqp_ready(self, _client):
        """Start the declare/bind/consume chain by declaring the exchange."""
        LOGGER.debug('AMQP ready')
        self._app.amqp.channel.exchange_declare(
            self.on_exchange_declared, self.exchange,
            durable=False, auto_delete=True)

    def on_exchange_declared(self, method):
        """Declare the test queue once the exchange exists."""
        LOGGER.debug('Exchange declared: %r', method)
        self._app.amqp.channel.queue_declare(
            self.on_queue_declared, self.queue,
            arguments={'x-expires': 30000},
            auto_delete=True, durable=False)

    def on_queue_declared(self, method):
        """Bind the queue to the exchange with the test's routing key."""
        LOGGER.debug('Queue declared: %r', method)
        self._app.amqp.channel.queue_bind(
            self.on_queue_bound, self.queue, self.exchange, self.routing_key)

    def on_queue_bound(self, method):
        """Start consuming and flag that the harness is ready for tests."""
        LOGGER.debug('Queue bound: %r', method)
        self._app.amqp.channel.basic_consume(
            self.on_message_delivered, self.queue)
        # Signal the poller in setUp rather than stopping the loop here, so
        # the loop started by setUp is stopped exactly once.
        self.ready.set()

    def on_message_delivered(self, _channel, method, properties, body):
        """Resolve ``get_delivered_message`` with the consumed message."""
        self.get_delivered_message.set_result((method, properties, body))

    def on_message_returned(self, method, properties, body):
        """Resolve ``get_returned_message`` with the returned message."""
        self.get_returned_message.set_result((method, properties, body))
class PublisherConfirmationTestCase(AsyncHTTPTestCase):
    """Publish through each HTTP endpoint and inspect the delivery."""

    async def _round_trip(self, path_template, payload_key):
        """Request a publish and verify the message that was consumed.

        :param str path_template: request path with ``{}`` placeholders for
            the exchange and the routing key, in that order
        :param str payload_key: key of the JSON response holding the record
            that the decoded message body must equal

        """
        path = path_template.format(self.exchange, self.routing_key)
        response = await self.http_client.fetch(
            self.get_url(path),
            headers={'Correlation-Id': self.correlation_id})
        published = json.loads(response.body.decode('utf-8'))

        method, properties, body = await self.get_delivered_message
        self.assertIsInstance(method, spec.Basic.Deliver)
        self.assertEqual(properties.correlation_id, self.correlation_id)
        self.assertEqual(
            properties.content_type, avro_publisher.DATUM_MIME_TYPE)
        self.assertEqual(properties.type, MESSAGE_TYPE)
        self.assertEqual(deserialize(body), published[payload_key])

    @testing.gen_test
    async def test_amqp_publish(self):
        await self._round_trip('/test1?exchange={}&routing_key={}', 'body')

    @testing.gen_test
    async def test_avro_amqp_publish(self):
        await self._round_trip('/test2?exchange={}&routing_key={}', 'data')