From 6be04d8026130214423eed573bfb77702faf8a53 Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Wed, 25 May 2016 21:28:15 -0400 Subject: [PATCH] Initial pass at mixins module. Needs a little love, but the functionality is there. For some reason the round-trip test is not working. I would guess that it is not finding the dynamodb instance correctly. --- sprockets/clients/dynamodb/mixins.py | 76 +++++++++ tests/dynamomixin_tests.py | 224 +++++++++++++++++++++++++++ 2 files changed, 300 insertions(+) create mode 100644 sprockets/clients/dynamodb/mixins.py create mode 100644 tests/dynamomixin_tests.py diff --git a/sprockets/clients/dynamodb/mixins.py b/sprockets/clients/dynamodb/mixins.py new file mode 100644 index 0000000..6327dc2 --- /dev/null +++ b/sprockets/clients/dynamodb/mixins.py @@ -0,0 +1,76 @@ +import logging + +from sprockets.clients.dynamodb import exceptions +from tornado import gen, web + + +class DynamoMixin(web.RequestHandler): + + def initialize(self): + super(DynamoMixin, self).initialize() + dynamocfg = self.settings['dynamodb'] + self.__max_retries = int(dynamocfg.get('max_retries', '3')) + self.__rate_limit_retry = int(dynamocfg.get('retry_after', '3')) + self.dynamo = self.application.dynamo + if not hasattr(self, 'logger'): + self.logger = logging.getLogger(self.__class__.__name__) + + def _on_dynamodb_exception(self, error, attempt): + """ + :param exceptions.DynamoDBException error: + """ + if isinstance(error, exceptions.NoCredentialsError): + self.logger.error('DynamoDB credentials error: %s', error) + self.send_error(500, reason='Database Authentication') + elif isinstance(error, exceptions.RequestException): + if attempt >= self.__max_retries: + self.send_error(500, reason='Database Request Failure') + else: + self.logger.warning('DynamoDB request exception #%i, ' + 'retrying', attempt) + return + elif isinstance(error, exceptions.ThroughputExceeded): + self.logger.warning('DynamoDB throughput exceeded: %s', error) + self.set_status(429) + self.add_header('Retry-After', self.__rate_limit_retry) + if hasattr(self, 'send_response'): + self.send_response( + {'error': 'Database throughput exceeded. Retry after {} ' + 'seconds'.format(self.__rate_limit_retry)}) + self.finish() + elif isinstance(error, exceptions.TimeoutException): + self.logger.error('DynamoDB timeout') + self.send_error(500, reason='Database Timeout') + else: + self.logger.error('DynamoDB error: %s (%r)', error, error.args) + self.send_error(500, reason='Unexpected Error {}'.format(error)) + + raise error + + @gen.coroutine + def get_item(self, table_name, key): + for attempt in range(0, self.__max_retries + 1): + try: + self.logger.debug('get_item %d', attempt) + item = yield self.dynamo.get_item(table_name, key) + raise gen.Return(item) + except exceptions.DynamoDBException as error: + self._on_dynamodb_exception(error, attempt) + + @gen.coroutine + def put_item(self, table_name, item): + for attempt in range(0, self.__max_retries + 1): + try: + result = yield self.dynamo.put_item(table_name, item) + raise gen.Return(result) + except exceptions.DynamoDBException as error: + self._on_dynamodb_exception(error, attempt) + + @gen.coroutine + def dynamodb_execute(self, function, arg_dict): + for attempt in range(0, self.__max_retries + 1): + try: + results = yield self.dynamo.execute(function, arg_dict) + raise gen.Return(results) + except exceptions.DynamoDBException as error: + self._on_dynamodb_exception(error, attempt) diff --git a/tests/dynamomixin_tests.py b/tests/dynamomixin_tests.py new file mode 100644 index 0000000..0ca90e1 --- /dev/null +++ b/tests/dynamomixin_tests.py @@ -0,0 +1,224 @@ +import json +import unittest +import uuid + +try: + from unittest import mock +except ImportError: + import mock + + +from tornado import concurrent, gen, ioloop, testing, web + +from sprockets.clients.dynamodb import connector, mixins +from sprockets.clients.dynamodb import exceptions as dynamo_exceptions + + +class CRUDHandler(mixins.DynamoMixin): + + @gen.coroutine + def get(self, *args, **kwargs): + item = yield self.get_item(self.path_args[0], + {'id': self.path_args[1]}) + self.write(json.dumps(item).encode('utf-8')) + self.set_status(200) + self.finish() + + @gen.coroutine + def put(self, *args, **kwargs): + yield self.put_item(self.path_args[0], + json.loads(self.request.body.decode('utf-8'))) + self.set_status(200) + self.finish() + + @gen.coroutine + def post(self, *args, **kwargs): + yield self.dynamodb_execute( + self.path_args[0], + json.loads(self.request.body.decode('utf-8'))) + + +class _CommonOperationTests(testing.AsyncHTTPTestCase): + num_retries = 2 + retry_after = 3 + + def setUp(self): + self.application = None + super(_CommonOperationTests, self).setUp() + + def get_app(self): + if self.application is None: + self.application = web.Application( + [web.url('/(.*)/(.*)', CRUDHandler), + web.url('/(.*)', CRUDHandler)], + dynamodb={'max_retries': self.num_retries, + 'retry_after': self.retry_after}) + setattr(self.application, 'dynamo', mock.Mock()) + return self.application + + @property + def dynamo_mock(self): + raise NotImplementedError + + def perform_action(self): + raise NotImplementedError + + def test_that_operation_can_succeed(self): + future = concurrent.Future() + future.set_result({}) + self.dynamo_mock.return_value = future + response = self.perform_action() + self.assertEqual(response.code, 200) + self.assertEqual(self.dynamo_mock.call_count, 1) + + def test_that_operation_returns_500_on_dynamodb_failure(self): + self.dynamo_mock.side_effect = dynamo_exceptions.DynamoDBException + response = self.perform_action() + self.assertEqual(response.code, 500) + self.assertEqual(self.dynamo_mock.call_count, 1) + + def test_that_operation_retries_request_exceptions(self): + self.dynamo_mock.side_effect = dynamo_exceptions.RequestException + response = self.perform_action() + self.assertEqual(response.code, 500) + self.assertEqual(self.dynamo_mock.call_count, self.num_retries + 1) + + def test_that_operation_retry_can_succeed(self): + future = concurrent.Future() + future.set_result({}) + self.dynamo_mock.side_effect = [dynamo_exceptions.RequestException, + future] + response = self.perform_action() + self.assertEqual(response.code, 200) + self.assertEqual(self.dynamo_mock.call_count, 2) + + def test_that_operation_credentials_error_returns_500(self): + self.dynamo_mock.side_effect = dynamo_exceptions.NoCredentialsError + response = self.perform_action() + self.assertEqual(response.code, 500) + self.assertEqual(self.dynamo_mock.call_count, 1) + + def test_that_operation_timeout_returns_500(self): + self.dynamo_mock.side_effect = dynamo_exceptions.TimeoutException + response = self.perform_action() + self.assertEqual(response.code, 500) + self.assertEqual(self.dynamo_mock.call_count, 1) + + def test_that_operation_rate_limits(self): + self.dynamo_mock.side_effect = dynamo_exceptions.ThroughputExceeded + response = self.perform_action() + self.assertEqual(response.code, 429) + self.assertEqual(self.dynamo_mock.call_count, 1) + self.assertEqual(response.headers['Retry-After'], + str(self.retry_after)) + + +class GetItemTests(_CommonOperationTests): + + @property + def dynamo_mock(self): + return self.application.dynamo.get_item + + def perform_action(self): + return self.fetch('/table/key') + + +class PutItemTests(_CommonOperationTests): + + @property + def dynamo_mock(self): + return self.application.dynamo.put_item + + def perform_action(self): + return self.fetch('/table', method='PUT', body='{"one":2}', + headers={'Content-Type': 'application/json'}) + + +class DynamoDBExecuteTests(_CommonOperationTests): + + @property + def dynamo_mock(self): + return self.application.dynamo.execute + + def perform_action(self): + return self.fetch('/operation', method='POST', body='"something"', + headers={'Content-Type': 'application/json'}) + + +@gen.coroutine +def create_table(dynamodb, tabledef): + max_attempts = 10 + first_time = True + response = {} + + while not response.get('TableStatus') == 'Active': + try: + response = yield dynamodb.describe_table(tabledef['TableName']) + except dynamo_exceptions.ResourceNotFound: + if first_time: + first_time = False + response = yield dynamodb.create_table(tabledef) + yield gen.sleep(0.25) + + max_attempts -= 1 + if max_attempts <= 0: + raise RuntimeError('Failed to create table') + + +@unittest.skip('Currently broken in this project...') +class RoundTripTests(testing.AsyncHTTPTestCase): + + @classmethod + def setUpClass(cls): + super(RoundTripTests, cls).setUpClass() + cls.table_name = uuid.uuid4().hex + tabledef = { + 'TableName': cls.table_name, + 'AttributeDefinitions': [ + {'AttributeName': 'id', 'AttributeType': 'S'}], + 'KeySchema': [{'AttributeName': 'id', 'KeyType': 'HASH'}], + 'ProvisionedThroughput': {'ReadCapacityUnits': 1, + 'WriteCapacityUnits': 1}, + } + iol = ioloop.IOLoop.instance() + dynamo = connector.DynamoDB() + iol.add_future(create_table(dynamo, tabledef), + lambda f: iol.stop()) + iol.start() + + @classmethod + def tearDownClass(cls): + super(RoundTripTests, cls).tearDownClass() + iol = ioloop.IOLoop.instance() + dynamo = connector.DynamoDB() + iol.add_future(dynamo.delete_table(cls.table_name), + lambda f: iol.stop()) + iol.start() + + def setUp(self): + self.application = None + super(RoundTripTests, self).setUp() + + def get_app(self): + if self.application is None: + self.application = web.Application( + [web.url('/(.*)/(.*)', CRUDHandler), + web.url('/(.*)', CRUDHandler)], + dynamodb={}) + setattr(self.application, 'dynamo', connector.DynamoDB()) + return self.application + + def test_that_item_can_created_and_fetched(self): + object_id = str(uuid.uuid4()) + response = self.fetch( + '/{}'.format(self.table_name), method='PUT', + body=json.dumps({'id': object_id}).encode('utf-8'), + headers={'Content-Type': 'application/json'}) + self.assertEqual(response.code, 200) + + response = self.fetch( + '/{}/{}'.format(self.table_name, object_id), + headers={'Accept': 'application/json'}) + self.assertEqual(response.code, 200) + self.assertEqual(json.loads(response.body.decode('utf-8')), + {'id': object_id})