mirror of
https://github.com/sprockets/sprockets.clients.dynamodb.git
synced 2024-11-27 11:19:54 +00:00
225 lines
7.4 KiB
Python
225 lines
7.4 KiB
Python
|
import json
|
||
|
import unittest
|
||
|
import uuid
|
||
|
|
||
|
try:
|
||
|
from unittest import mock
|
||
|
except ImportError:
|
||
|
import mock
|
||
|
|
||
|
|
||
|
from tornado import concurrent, gen, ioloop, testing, web
|
||
|
|
||
|
from sprockets.clients.dynamodb import connector, mixins
|
||
|
from sprockets.clients.dynamodb import exceptions as dynamo_exceptions
|
||
|
|
||
|
|
||
|
class CRUDHandler(mixins.DynamoMixin):
|
||
|
|
||
|
@gen.coroutine
|
||
|
def get(self, *args, **kwargs):
|
||
|
item = yield self.get_item(self.path_args[0],
|
||
|
{'id': self.path_args[1]})
|
||
|
self.write(json.dumps(item).encode('utf-8'))
|
||
|
self.set_status(200)
|
||
|
self.finish()
|
||
|
|
||
|
@gen.coroutine
|
||
|
def put(self, *args, **kwargs):
|
||
|
yield self.put_item(self.path_args[0],
|
||
|
json.loads(self.request.body.decode('utf-8')))
|
||
|
self.set_status(200)
|
||
|
self.finish()
|
||
|
|
||
|
@gen.coroutine
|
||
|
def post(self, *args, **kwargs):
|
||
|
yield self.dynamodb_execute(
|
||
|
self.path_args[0],
|
||
|
json.loads(self.request.body.decode('utf-8')))
|
||
|
|
||
|
|
||
|
class _CommonOperationTests(testing.AsyncHTTPTestCase):
|
||
|
num_retries = 2
|
||
|
retry_after = 3
|
||
|
|
||
|
def setUp(self):
|
||
|
self.application = None
|
||
|
super(_CommonOperationTests, self).setUp()
|
||
|
|
||
|
def get_app(self):
|
||
|
if self.application is None:
|
||
|
self.application = web.Application(
|
||
|
[web.url('/(.*)/(.*)', CRUDHandler),
|
||
|
web.url('/(.*)', CRUDHandler)],
|
||
|
dynamodb={'max_retries': self.num_retries,
|
||
|
'retry_after': self.retry_after})
|
||
|
setattr(self.application, 'dynamo', mock.Mock())
|
||
|
return self.application
|
||
|
|
||
|
@property
|
||
|
def dynamo_mock(self):
|
||
|
raise NotImplementedError
|
||
|
|
||
|
def perform_action(self):
|
||
|
raise NotImplementedError
|
||
|
|
||
|
def test_that_operation_can_succeed(self):
|
||
|
future = concurrent.Future()
|
||
|
future.set_result({})
|
||
|
self.dynamo_mock.return_value = future
|
||
|
response = self.perform_action()
|
||
|
self.assertEqual(response.code, 200)
|
||
|
self.assertEqual(self.dynamo_mock.call_count, 1)
|
||
|
|
||
|
def test_that_operation_returns_500_on_dynamodb_failure(self):
|
||
|
self.dynamo_mock.side_effect = dynamo_exceptions.DynamoDBException
|
||
|
response = self.perform_action()
|
||
|
self.assertEqual(response.code, 500)
|
||
|
self.assertEqual(self.dynamo_mock.call_count, 1)
|
||
|
|
||
|
def test_that_operation_retries_request_exceptions(self):
|
||
|
self.dynamo_mock.side_effect = dynamo_exceptions.RequestException
|
||
|
response = self.perform_action()
|
||
|
self.assertEqual(response.code, 500)
|
||
|
self.assertEqual(self.dynamo_mock.call_count, self.num_retries + 1)
|
||
|
|
||
|
def test_that_operation_retry_can_succeed(self):
|
||
|
future = concurrent.Future()
|
||
|
future.set_result({})
|
||
|
self.dynamo_mock.side_effect = [dynamo_exceptions.RequestException,
|
||
|
future]
|
||
|
response = self.perform_action()
|
||
|
self.assertEqual(response.code, 200)
|
||
|
self.assertEqual(self.dynamo_mock.call_count, 2)
|
||
|
|
||
|
def test_that_operation_credentials_error_returns_500(self):
|
||
|
self.dynamo_mock.side_effect = dynamo_exceptions.NoCredentialsError
|
||
|
response = self.perform_action()
|
||
|
self.assertEqual(response.code, 500)
|
||
|
self.assertEqual(self.dynamo_mock.call_count, 1)
|
||
|
|
||
|
def test_that_operation_timeout_returns_500(self):
|
||
|
self.dynamo_mock.side_effect = dynamo_exceptions.TimeoutException
|
||
|
response = self.perform_action()
|
||
|
self.assertEqual(response.code, 500)
|
||
|
self.assertEqual(self.dynamo_mock.call_count, 1)
|
||
|
|
||
|
def test_that_operation_rate_limits(self):
|
||
|
self.dynamo_mock.side_effect = dynamo_exceptions.ThroughputExceeded
|
||
|
response = self.perform_action()
|
||
|
self.assertEqual(response.code, 429)
|
||
|
self.assertEqual(self.dynamo_mock.call_count, 1)
|
||
|
self.assertEqual(response.headers['Retry-After'],
|
||
|
str(self.retry_after))
|
||
|
|
||
|
|
||
|
class GetItemTests(_CommonOperationTests):
|
||
|
|
||
|
@property
|
||
|
def dynamo_mock(self):
|
||
|
return self.application.dynamo.get_item
|
||
|
|
||
|
def perform_action(self):
|
||
|
return self.fetch('/table/key')
|
||
|
|
||
|
|
||
|
class PutItemTests(_CommonOperationTests):
|
||
|
|
||
|
@property
|
||
|
def dynamo_mock(self):
|
||
|
return self.application.dynamo.put_item
|
||
|
|
||
|
def perform_action(self):
|
||
|
return self.fetch('/table', method='PUT', body='{"one":2}',
|
||
|
headers={'Content-Type': 'application/json'})
|
||
|
|
||
|
|
||
|
class DynamoDBExecuteTests(_CommonOperationTests):
|
||
|
|
||
|
@property
|
||
|
def dynamo_mock(self):
|
||
|
return self.application.dynamo.execute
|
||
|
|
||
|
def perform_action(self):
|
||
|
return self.fetch('/operation', method='POST', body='"something"',
|
||
|
headers={'Content-Type': 'application/json'})
|
||
|
|
||
|
|
||
|
@gen.coroutine
|
||
|
def create_table(dynamodb, tabledef):
|
||
|
max_attempts = 10
|
||
|
first_time = True
|
||
|
response = {}
|
||
|
|
||
|
while not response.get('TableStatus') == 'Active':
|
||
|
try:
|
||
|
response = yield dynamodb.describe_table(tabledef['TableName'])
|
||
|
except dynamo_exceptions.ResourceNotFound:
|
||
|
if first_time:
|
||
|
first_time = False
|
||
|
response = yield dynamodb.create_table(tabledef)
|
||
|
yield gen.sleep(0.25)
|
||
|
|
||
|
max_attempts -= 1
|
||
|
if max_attempts <= 0:
|
||
|
raise RuntimeError('Failed to create table')
|
||
|
|
||
|
|
||
|
@unittest.skip('Currently broken in this project...')
|
||
|
class RoundTripTests(testing.AsyncHTTPTestCase):
|
||
|
|
||
|
@classmethod
|
||
|
def setUpClass(cls):
|
||
|
super(RoundTripTests, cls).setUpClass()
|
||
|
cls.table_name = uuid.uuid4().hex
|
||
|
tabledef = {
|
||
|
'TableName': cls.table_name,
|
||
|
'AttributeDefinitions': [
|
||
|
{'AttributeName': 'id', 'AttributeType': 'S'}],
|
||
|
'KeySchema': [{'AttributeName': 'id', 'KeyType': 'HASH'}],
|
||
|
'ProvisionedThroughput': {'ReadCapacityUnits': 1,
|
||
|
'WriteCapacityUnits': 1},
|
||
|
}
|
||
|
iol = ioloop.IOLoop.instance()
|
||
|
dynamo = connector.DynamoDB()
|
||
|
iol.add_future(create_table(dynamo, tabledef),
|
||
|
lambda f: iol.stop())
|
||
|
iol.start()
|
||
|
|
||
|
@classmethod
|
||
|
def tearDownClass(cls):
|
||
|
super(RoundTripTests, cls).tearDownClass()
|
||
|
iol = ioloop.IOLoop.instance()
|
||
|
dynamo = connector.DynamoDB()
|
||
|
iol.add_future(dynamo.delete_table(cls.table_name),
|
||
|
lambda f: iol.stop())
|
||
|
iol.start()
|
||
|
|
||
|
def setUp(self):
|
||
|
self.application = None
|
||
|
super(RoundTripTests, self).setUp()
|
||
|
|
||
|
def get_app(self):
|
||
|
if self.application is None:
|
||
|
self.application = web.Application(
|
||
|
[web.url('/(.*)/(.*)', CRUDHandler),
|
||
|
web.url('/(.*)', CRUDHandler)],
|
||
|
dynamodb={})
|
||
|
setattr(self.application, 'dynamo', connector.DynamoDB())
|
||
|
return self.application
|
||
|
|
||
|
def test_that_item_can_created_and_fetched(self):
|
||
|
object_id = str(uuid.uuid4())
|
||
|
response = self.fetch(
|
||
|
'/{}'.format(self.table_name), method='PUT',
|
||
|
body=json.dumps({'id': object_id}).encode('utf-8'),
|
||
|
headers={'Content-Type': 'application/json'})
|
||
|
self.assertEqual(response.code, 200)
|
||
|
|
||
|
response = self.fetch(
|
||
|
'/{}/{}'.format(self.table_name, object_id),
|
||
|
headers={'Accept': 'application/json'})
|
||
|
self.assertEqual(response.code, 200)
|
||
|
self.assertEqual(json.loads(response.body.decode('utf-8')),
|
||
|
{'id': object_id})
|