diff --git a/setup.cfg b/setup.cfg index 2d00bbb..3fa065b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,4 +12,4 @@ cover-branches = 1 cover-erase = 1 cover-package = sprockets_dynamodb with-coverage = 1 -verbose = 1 +verbosity = 2 diff --git a/sprockets_dynamodb/__init__.py b/sprockets_dynamodb/__init__.py index f72ce27..f6fe8d6 100644 --- a/sprockets_dynamodb/__init__.py +++ b/sprockets_dynamodb/__init__.py @@ -4,31 +4,32 @@ Sprockets DynamoDB """ import logging +try: + from sprockets_dynamodb.client import Client +except ImportError: # pragma: nocover + Client = None +try: + from sprockets_dynamodb.mixin import DynamoDBMixin +except ImportError: # pragma: nocover + DynamoDBMixin = None + +try: + from sprockets_dynamodb.exceptions import * +except ImportError: # pragma: nocover + pass version_info = (2, 0, 0) __version__ = '.'.join(str(v) for v in version_info) logging.getLogger(__name__).addHandler(logging.NullHandler()) -try: - from sprockets_dynamodb.client import Client -except ImportError: - Client = None -try: - from sprockets_dynamodb.mixin import DynamoDBMixin -except ImportError: - DynamoDBMixin = None - -from sprockets_dynamodb.exceptions import * - -# Response constants +# Table status constants TABLE_ACTIVE = 'ACTIVE' TABLE_CREATING = 'CREATING' TABLE_DELETING = 'DELETING' TABLE_DISABLED = 'DISABLED' TABLE_UPDATING = 'UPDATING' - __all__ = [ 'client', 'exceptions', diff --git a/sprockets_dynamodb/client.py b/sprockets_dynamodb/client.py index 2737f75..ddad996 100644 --- a/sprockets_dynamodb/client.py +++ b/sprockets_dynamodb/client.py @@ -7,7 +7,7 @@ import collections import json import logging import os -import select +import select as _select import socket import ssl import time @@ -21,7 +21,7 @@ from sprockets_dynamodb import exceptions, utils # Stub Python3 exceptions for Python 2.7 try: ConnectionError -except NameError: +except NameError: # pragma: nocover class ConnectionError(Exception): pass @@ -771,24 +771,6 @@ class Client(object): def batch_get_item(self): """Invoke the `BatchGetItem`_ function. - :raises: - :exc:`~sprockets_dynamodb.exceptions.DynamoDBException` - :exc:`~sprockets_dynamodb.exceptions.ConfigNotFound` - :exc:`~sprockets_dynamodb.exceptions.NoCredentialsError` - :exc:`~sprockets_dynamodb.exceptions.NoProfileError` - :exc:`~sprockets_dynamodb.exceptions.TimeoutException` - :exc:`~sprockets_dynamodb.exceptions.RequestException` - :exc:`~sprockets_dynamodb.exceptions.InternalFailure` - :exc:`~sprockets_dynamodb.exceptions.LimitExceeded` - :exc:`~sprockets_dynamodb.exceptions.MissingParameter` - :exc:`~sprockets_dynamodb.exceptions.OptInRequired` - :exc:`~sprockets_dynamodb.exceptions.ResourceInUse` - :exc:`~sprockets_dynamodb.exceptions.RequestExpired` - :exc:`~sprockets_dynamodb.exceptions.ResourceNotFound` - :exc:`~sprockets_dynamodb.exceptions.ServiceUnavailable` - :exc:`~sprockets_dynamodb.exceptions.ThroughputExceeded` - :exc:`~sprockets_dynamodb.exceptions.ValidationException` - .. _BatchGetItem: http://docs.aws.amazon.com/amazondynamodb/ latest/APIReference/API_BatchGetItem.html @@ -798,36 +780,25 @@ class Client(object): def batch_write_item(self): """Invoke the `BatchWriteItem`_ function. - :raises: - :exc:`~sprockets_dynamodb.exceptions.DynamoDBException` - :exc:`~sprockets_dynamodb.exceptions.ConfigNotFound` - :exc:`~sprockets_dynamodb.exceptions.NoCredentialsError` - :exc:`~sprockets_dynamodb.exceptions.NoProfileError` - :exc:`~sprockets_dynamodb.exceptions.TimeoutException` - :exc:`~sprockets_dynamodb.exceptions.RequestException` - :exc:`~sprockets_dynamodb.exceptions.InternalFailure` - :exc:`~sprockets_dynamodb.exceptions.LimitExceeded` - :exc:`~sprockets_dynamodb.exceptions.MissingParameter` - :exc:`~sprockets_dynamodb.exceptions.OptInRequired` - :exc:`~sprockets_dynamodb.exceptions.ResourceInUse` - :exc:`~sprockets_dynamodb.exceptions.RequestExpired` - :exc:`~sprockets_dynamodb.exceptions.ResourceNotFound` - :exc:`~sprockets_dynamodb.exceptions.ServiceUnavailable` - :exc:`~sprockets_dynamodb.exceptions.ThroughputExceeded` - :exc:`~sprockets_dynamodb.exceptions.ValidationException` - .. _BatchWriteItem: http://docs.aws.amazon.com/amazondynamodb/ latest/APIReference/API_BatchWriteItem.html """ raise NotImplementedError - def query(self, table_name, consistent_read=False, - exclusive_start_key=None, expression_attribute_names=None, - expression_attribute_values=None, filter_expression=None, - projection_expression=None, index_name=None,limit=None, - return_consumed_capacity=None, scan_index_forward=True, - select=None): + def query(self, table_name, + index_name=None, + consistent_read=None, + key_condition_expression=None, + filter_expression=None, + expression_attribute_names=None, + expression_attribute_values=None, + projection_expression=None, + select=None, + exclusive_start_key=None, + limit=None, + scan_index_forward=True, + return_consumed_capacity=None): """A `Query`_ operation uses the primary key of a table or a secondary index to directly access items from that table or index. @@ -861,7 +832,7 @@ class Client(object): you query a global secondary index with ``consistent_read`` set to ``True``, you will receive a :exc:`~sprockets_dynamodb.exceptions.ValidationException`. - :param str|bytes|int exclusive_start_key: The primary key of the first + :param dict exclusive_start_key: The primary key of the first item that this operation will evaluate. Use the value that was returned for ``LastEvaluatedKey`` in the previous operation. In a parallel scan, a *Scan* request that includes @@ -948,35 +919,53 @@ class Client(object): the matching items themselves. :rtype: dict - :raises: - :exc:`~sprockets_dynamodb.exceptions.DynamoDBException` - :exc:`~sprockets_dynamodb.exceptions.ConfigNotFound` - :exc:`~sprockets_dynamodb.exceptions.NoCredentialsError` - :exc:`~sprockets_dynamodb.exceptions.NoProfileError` - :exc:`~sprockets_dynamodb.exceptions.TimeoutException` - :exc:`~sprockets_dynamodb.exceptions.RequestException` - :exc:`~sprockets_dynamodb.exceptions.InternalFailure` - :exc:`~sprockets_dynamodb.exceptions.LimitExceeded` - :exc:`~sprockets_dynamodb.exceptions.MissingParameter` - :exc:`~sprockets_dynamodb.exceptions.OptInRequired` - :exc:`~sprockets_dynamodb.exceptions.ResourceInUse` - :exc:`~sprockets_dynamodb.exceptions.RequestExpired` - :exc:`~sprockets_dynamodb.exceptions.ResourceNotFound` - :exc:`~sprockets_dynamodb.exceptions.ServiceUnavailable` - :exc:`~sprockets_dynamodb.exceptions.ThroughputExceeded` - :exc:`~sprockets_dynamodb.exceptions.ValidationException` - .. _Query: http://docs.aws.amazon.com/amazondynamodb/ latest/APIReference/API_Query.html """ - raise NotImplementedError + payload = {'TableName': table_name, + 'ScanIndexForward': scan_index_forward} + if index_name: + payload['IndexName'] = index_name + if consistent_read is not None: + payload['ConsistentRead'] = consistent_read + if key_condition_expression: + payload['KeyConditionExpression'] = key_condition_expression + if filter_expression: + payload['FilterExpression'] = filter_expression + if expression_attribute_names: + payload['ExpressionAttributeNames'] = expression_attribute_names + if expression_attribute_values: + payload['ExpressionAttributeValues'] = \ + utils.marshall(expression_attribute_values) + if projection_expression: + payload['ProjectionExpression'] = projection_expression + if select: + _validate_select(select) + payload['Select'] = select + if exclusive_start_key: + payload['ExclusiveStartKey'] = utils.marshall(exclusive_start_key) + if limit: + payload['Limit'] = limit + if return_consumed_capacity: + _validate_return_consumed_capacity(return_consumed_capacity) + payload['ReturnConsumedCapacity'] = return_consumed_capacity + return self.execute('Query', payload) - def scan(self, table_name, consistent_read=False, exclusive_start_key=None, - expression_attribute_names=None, expression_attribute_values=None, - filter_expression=None, projection_expression=None, - index_name=None, limit=None, return_consumed_capacity=None, - segment=None, total_segments=None): + def scan(self, + table_name, + index_name=None, + consistent_read=None, + projection_expression=None, + filter_expression=None, + expression_attribute_names=None, + expression_attribute_values=None, + segment=None, + total_segments=None, + select=None, + limit=None, + exclusive_start_key=None, + return_consumed_capacity=None): """The `Scan`_ operation returns one or more items and item attributes by accessing every item in a table or a secondary index. @@ -1000,141 +989,41 @@ class Client(object): you need a consistent copy of the data, as of the time that the *Scan* begins, you can set the ``consistent_read`` parameter to ``True``. - :param str table_name: The name of the table containing the requested - items; or, if you provide IndexName, the name of the table to which - that index belongs. - :param bool consistent_read: A Boolean value that determines the read - consistency model during the scan: - - - If set to ``False``, then the data returned from *Scan* might not - contain the results from other recently completed write - operations (*PutItem*, *UpdateItem*, or *DeleteItem*). - - If set to ``True``, then all of the write operations that - completed before the Scan began are guaranteed to be contained in - the *Scan* response. - - The default setting is ``False``. - - This parameter is not supported on global secondary indexes. If you - scan a global secondary index and set ``consistent_read`` to - ``true``, you will receive a - :exc:`~sprockets_dynamodb.exceptions.ValidationException`. - :param str|bytes|int exclusive_start_key: The primary key of the first - item that this operation will evaluate. Use the value that was - returned for ``LastEvaluatedKey`` in the previous operation. - - In a parallel scan, a *Scan* request that includes - ``exclusive_start_key`` must specify the same segment whose - previous *Scan* returned the corresponding value of - ``LastEvaluatedKey``. - :param dict expression_attribute_names: One or more substitution tokens - for attribute names in an expression. - :param dict expression_attribute_values: One or more values that can be - substituted in an expression. - :param str filter_expression: A string that contains conditions that - DynamoDB applies after the Scan operation, but before the data is - returned to you. Items that do not satisfy the expression criteria - are not returned. - - .. note:: A filter expression is applied after the items have - already been read; the process of filtering does not consume - any additional read capacity units. - - For more information, see `Filter Expressions `_ in the Amazon DynamoDB Developer Guide. - :param str projection_expression: A string that identifies one or more - attributes to retrieve from the specified table or index. These - attributes can include scalars, sets, or elements of a JSON - document. The attributes in the expression must be separated by - commas. - - If no attribute names are specified, then all attributes will be - returned. If any of the requested attributes are not found, they - will not appear in the result. - - For more information, see `Accessing Item Attributes `_ in the Amazon DynamoDB Developer - Guide. - :param str index_name: The name of a secondary index to scan. This - index can be any local secondary index or global secondary index. - Note that if you use this parameter, you must also provide - ``table_name``. - :param int limit: The maximum number of items to evaluate (not - necessarily the number of matching items). If DynamoDB processes - the number of items up to the limit while processing the results, - it stops the operation and returns the matching values up to that - point, and a key in ``LastEvaluatedKey`` to apply in a subsequent - operation, so that you can pick up where you left off. Also, if the - processed data set size exceeds 1 MB before DynamoDB reaches this - limit, it stops the operation and returns the matching values up to - the limit, and a key in ``LastEvaluatedKey`` to apply in a - subsequent operation to continue the operation. For more - information, see `Query and Scan `_ in the Amazon - DynamoDB Developer Guide. - :param str return_consumed_capacity: Determines the level of detail - about provisioned throughput consumption that is returned in the - response. Should be ``None`` or one of ``INDEXES`` or ``TOTAL`` - :param int segment: For a parallel *Scan* request, ``segment`` - identifies an individual segment to be scanned by an application - worker. - - Segment IDs are zero-based, so the first segment is always ``0``. - For example, if you want to use four application threads to scan a - table or an index, then the first thread specifies a Segment value - of ``0``, the second thread specifies ``1``, and so on. - - The value of ``LastEvaluatedKey`` returned from a parallel *Scan* - request must be used as ``ExclusiveStartKey`` with the same segment - ID in a subsequent *Scan* operation. - - The value for ``segment`` must be greater than or equal to ``0``, - and less than the value provided for ``total_segments``. - - If you provide ``segment``, you must also provide - ``total_segments``. - :param int total_segments: For a parallel *Scan* request, - ``total_segments`` represents the total number of segments into - which the *Scan* operation will be divided. The value of - ``total_segments`` corresponds to the number of application workers - that will perform the parallel scan. For example, if you want to - use four application threads to scan a table or an index, specify a - ``total_segments`` value of 4. - - The value for ``total_segments`` must be greater than or equal to - ``1``, and less than or equal to ``1000000``. If you specify a - ``total_segments`` value of ``1``, the *Scan* operation will be - sequential rather than parallel. - - If you specify ``total_segments``, you must also specify - ``segments``. :rtype: dict - :raises: - :exc:`~sprockets_dynamodb.exceptions.DynamoDBException` - :exc:`~sprockets_dynamodb.exceptions.ConfigNotFound` - :exc:`~sprockets_dynamodb.exceptions.NoCredentialsError` - :exc:`~sprockets_dynamodb.exceptions.NoProfileError` - :exc:`~sprockets_dynamodb.exceptions.TimeoutException` - :exc:`~sprockets_dynamodb.exceptions.RequestException` - :exc:`~sprockets_dynamodb.exceptions.InternalFailure` - :exc:`~sprockets_dynamodb.exceptions.LimitExceeded` - :exc:`~sprockets_dynamodb.exceptions.MissingParameter` - :exc:`~sprockets_dynamodb.exceptions.OptInRequired` - :exc:`~sprockets_dynamodb.exceptions.ResourceInUse` - :exc:`~sprockets_dynamodb.exceptions.RequestExpired` - :exc:`~sprockets_dynamodb.exceptions.ResourceNotFound` - :exc:`~sprockets_dynamodb.exceptions.ServiceUnavailable` - :exc:`~sprockets_dynamodb.exceptions.ThroughputExceeded` - :exc:`~sprockets_dynamodb.exceptions.ValidationException` - .. _Scan: http://docs.aws.amazon.com/amazondynamodb/ latest/APIReference/API_Scan.html """ - raise NotImplementedError + payload = {'TableName': table_name} + if index_name: + payload['IndexName'] = index_name + if consistent_read is not None: + payload['ConsistentRead'] = consistent_read + if filter_expression: + payload['FilterExpression'] = filter_expression + if expression_attribute_names: + payload['ExpressionAttributeNames'] = expression_attribute_names + if expression_attribute_values: + payload['ExpressionAttributeValues'] = \ + utils.marshall(expression_attribute_values) + if projection_expression: + payload['ProjectionExpression'] = projection_expression + if segment: + payload['Segment'] = segment + if total_segments: + payload['TotalSegments'] = total_segments + if select: + _validate_select(select) + payload['Select'] = select + if exclusive_start_key: + payload['ExclusiveStartKey'] = utils.marshall(exclusive_start_key) + if limit: + payload['Limit'] = limit + if return_consumed_capacity: + _validate_return_consumed_capacity(return_consumed_capacity) + payload['ReturnConsumedCapacity'] = return_consumed_capacity + return self.execute('Scan', payload) @gen.coroutine def execute(self, action, parameters): @@ -1171,6 +1060,7 @@ class Client(object): """ result = yield self._execute_action(action, parameters) + self.logger.debug('%s result: %r', action, result) raise gen.Return(_unwrap_result(action, result)) @gen.coroutine @@ -1208,7 +1098,7 @@ class Client(object): :param method callback: The method to invoke """ - LOGGER.debug('Setting error callback: %r', callback) + self.logger.debug('Setting error callback: %r', callback) self._on_error = callback def set_instrumentation_callback(self, callback): @@ -1218,7 +1108,7 @@ class Client(object): :param method callback: The method to invoke """ - LOGGER.debug('Setting instrumentation callback: %r', callback) + self.logger.debug('Setting instrumentation callback: %r', callback) self._instrumentation_callback = callback def _execute(self, action, parameters, attempt, measurements): @@ -1248,7 +1138,6 @@ class Client(object): :exc:`~sprockets_dynamodb.exceptions.ValidationException` """ - LOGGER.debug('%s %r', action, parameters) future = concurrent.TracebackFuture() start = time.time() @@ -1274,7 +1163,7 @@ class Client(object): except aws_exceptions.NoProfileError as error: future.set_exception(exceptions.NoProfileError(str(error))) except (ConnectionError, ConnectionResetError, OSError, ssl.SSLError, - select.error, ssl.socket_error, socket.gaierror) as error: + _select.error, ssl.socket_error, socket.gaierror) as error: future.set_exception(exceptions.RequestException(str(error))) except TimeoutError: future.set_exception(exceptions.TimeoutException()) @@ -1316,7 +1205,7 @@ class Client(object): :param list measurements: The measurement accumulator """ - self.logger.debug('processing %s(%s) #%i = %r', + self.logger.debug('%s on %s request #%i = %r', action, table, attempt, response) now, error, result = time.time(), None, None try: @@ -1381,7 +1270,6 @@ def _unwrap_result(action, result): :rtype: list or dict """ - LOGGER.debug('%s Result: %r', action, result) if not result: return elif action in ['DeleteItem', 'PutItem', 'UpdateItem']: @@ -1407,7 +1295,6 @@ def _unwrap_delete_put_update_item(result): result['ItemCollectionMetrics'].get('SizeEstimateRangeGB', [None]).pop() } - LOGGER.debug('DELETE: %r', response) return response @@ -1423,11 +1310,14 @@ def _unwrap_get_item(result): def _unwrap_query_scan(result): response = { 'Count': result.get('Count', 0), - 'Items': [utils.unmarshall(i) for i in result.get('Items', [])] + 'Items': [utils.unmarshall(i) for i in result.get('Items', [])], + 'ScannedCount': result.get('ScannedCount', 0) } - for key in ['ConsumedCapacity', 'LastEvaluatedKey', 'ScannedCount']: - if key in result: - response[key] = result[key] + if 'LastEvaluatedKey' in result: + response['LastEvaluatedKey'] = \ + utils.unmarshall(result['LastEvaluatedKey']) + if 'ConsumedCapacity' in result: + response['ConsumedCapacity'] = result['ConsumedCapacity'] return response @@ -1445,3 +1335,9 @@ def _validate_return_values(value): if value not in ['NONE', 'ALL_NEW', 'ALL_OLD', 'UPDATED_NEW', 'UPDATED_OLD']: raise ValueError('Invalid return_values value') + + +def _validate_select(value): + if value not in ['ALL_ATTRIBUTES', 'ALL_PROJECTED_ATTRIBUTES', 'COUNT', + 'SPECIFIC_ATTRIBUTES']: + raise ValueError('Invalid select value') diff --git a/sprockets_dynamodb/utils.py b/sprockets_dynamodb/utils.py index b8ed322..97607fe 100644 --- a/sprockets_dynamodb/utils.py +++ b/sprockets_dynamodb/utils.py @@ -22,6 +22,19 @@ import sys PYTHON3 = True if sys.version_info > (3, 0, 0) else False TEXT_CHARS = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7f}) +if PYTHON3: # pragma: nocover + unicode = str + + +def is_binary(value): + """ + Check to see if a string contains binary data in Python2 + + :param str|bytes value: The value to check + :rtype: bool + + """ + return bool(value.translate(None, TEXT_CHARS)) def marshall(values): @@ -42,6 +55,31 @@ def marshall(values): return serialized +def unmarshall(values): + """ + Transform a response payload from DynamoDB to a native dict + + :param dict values: The response payload from DynamoDB + :rtype: dict + :raises ValueError: if an unsupported type code is encountered + + """ + unmarshalled = {} + for key in values: + unmarshalled[key] = _unmarshall_dict(values[key]) + return unmarshalled + + +def _encode_binary_set(value): + """Base64 encode binary values in list of values. + + :param list value: The list of binary values + :rtype: list + + """ + return sorted([base64.b64encode(v).decode('ascii') for v in value]) + + def _marshall_value(value): """ Recursively transform `value` into an AttributeValue `dict` @@ -59,9 +97,11 @@ def _marshall_value(value): elif PYTHON3 and isinstance(value, str): return {'S': value} elif not PYTHON3 and isinstance(value, str): - if _is_binary(value): + if is_binary(value): return {'B': base64.b64encode(value).decode('ascii')} return {'S': value} + elif not PYTHON3 and isinstance(value, unicode): + return {'S': value.encode('utf-8')} elif isinstance(value, dict): return {'M': marshall(value)} elif isinstance(value, bool): @@ -82,10 +122,10 @@ def _marshall_value(value): elif all([isinstance(v, (int, float)) for v in value]): return {'NS': sorted([str(v) for v in value])} elif not PYTHON3 and all([isinstance(v, str) for v in value]) and \ - all([_is_binary(v) for v in value]): + all([is_binary(v) for v in value]): return {'BS': _encode_binary_set(value)} elif not PYTHON3 and all([isinstance(v, str) for v in value]) and \ - all([_is_binary(v) is False for v in value]): + all([is_binary(v) is False for v in value]): return {'SS': sorted(list(value))} else: raise ValueError('Can not mix types in a set') @@ -94,24 +134,15 @@ def _marshall_value(value): raise ValueError('Unsupported type: %s' % type(value)) -def _encode_binary_set(value): - return sorted([base64.b64encode(v).decode('ascii') for v in value]) - - - -def unmarshall(values): +def _to_number(value): """ - Transform a response payload from DynamoDB to a native dict + Convert the string containing a number to a number - :param dict values: The response payload from DynamoDB - :rtype: dict - :raises ValueError: if an unsupported type code is encountered + :param str value: The value to convert + :rtype: float|int """ - unmarshalled = {} - for key in values: - unmarshalled[key] = _unmarshall_dict(values[key]) - return unmarshalled + return float(value) if '.' in value else int(value) def _unmarshall_dict(value): @@ -148,23 +179,3 @@ def _unmarshall_dict(value): raise ValueError('Unsupported value type: %s' % key) -def _to_number(value): - """ - Convert the string containing a number to a number - - :param str value: The value to convert - :rtype: float|int - - """ - return float(value) if '.' in value else int(value) - - -def _is_binary(value): - """ - Check to see if a string contains binary data in Python2 - - :param str value: The value to check - :rtype: bool - - """ - return bool(value.translate(None, TEXT_CHARS)) diff --git a/tests/api_tests.py b/tests/api_tests.py index 18addc2..14a327c 100644 --- a/tests/api_tests.py +++ b/tests/api_tests.py @@ -1,5 +1,8 @@ +import collections import datetime +import logging import os +import random import socket import sys import uuid @@ -8,12 +11,18 @@ import unittest import mock from tornado import concurrent +from tornado import gen from tornado import httpclient from tornado import locks from tornado import testing from tornado_aws import exceptions as aws_exceptions import sprockets_dynamodb as dynamodb +from sprockets_dynamodb import utils + +LOGGER = logging.getLogger(__name__) + +os.environ['ASYNC_TEST_TIMEOUT'] = '30' class AsyncTestCase(testing.AsyncTestCase): @@ -24,6 +33,18 @@ class AsyncTestCase(testing.AsyncTestCase): self.client.set_error_callback(None) self.client.set_instrumentation_callback(None) + @gen.coroutine + def create_table(self, definition): + response = yield self.client.create_table(definition) + yield self.wait_on_table_creation(definition['TableName'], response) + + @gen.coroutine + def wait_on_table_creation(self, table_name, response): + while not self._table_is_ready(response): + LOGGER.debug('Waiting on %s to become ready', table_name) + yield gen.sleep(1) + response = yield self.client.describe_table(table_name) + @property def endpoint(self): return os.getenv('DYNAMODB_ENDPOINT') @@ -44,6 +65,54 @@ class AsyncTestCase(testing.AsyncTestCase): def get_client(self): return dynamodb.Client(endpoint=self.endpoint) + @staticmethod + def _table_is_ready(response): + LOGGER.debug('Is table ready? %r', response) + if response['TableStatus'] != dynamodb.TABLE_ACTIVE: + return False + for index in response.get('GlobalSecondaryIndexes', {}): + if index['IndexStatus'] != dynamodb.TABLE_ACTIVE: + return False + return True + +class AsyncItemTestCase(AsyncTestCase): + + def setUp(self): + self.definition = self.generic_table_definition() + return super(AsyncItemTestCase, self).setUp() + + def tearDown(self): + yield self.client.delete_table(self.definition['TableName']) + super(AsyncItemTestCase, self).tearDown() + + def new_item_value(self): + return { + 'id': str(uuid.uuid4()), + 'created_at': datetime.datetime.utcnow(), + 'value': str(uuid.uuid4()) + } + + def create_table(self, definition=None): + return super(AsyncItemTestCase, self).create_table( + definition or self.definition) + + def delete_item(self, item_id): + return self.client.delete_item(self.definition['TableName'], + {'id': item_id}, + return_consumed_capacity='TOTAL', + return_item_collection_metrics='SIZE', + return_values='ALL_OLD') + + def get_item(self, item_id): + return self.client.get_item(self.definition['TableName'], + {'id': item_id}) + + def put_item(self, item): + return self.client.put_item(self.definition['TableName'], item, + return_consumed_capacity='TOTAL', + return_item_collection_metrics='SIZE', + return_values='ALL_OLD') + class AWSClientTests(AsyncTestCase): @@ -223,13 +292,10 @@ class CreateTableTests(AsyncTestCase): @testing.gen_test def test_double_create(self): definition = self.generic_table_definition() - response = yield self.client.create_table(definition) - self.assertEqual(response['TableName'], definition['TableName']) - self.assertIn(response['TableStatus'], - [dynamodb.TABLE_ACTIVE, - dynamodb.TABLE_CREATING]) + yield self.create_table(definition) with self.assertRaises(dynamodb.ResourceInUse): - response = yield self.client.create_table(definition) + yield self.create_table(definition) + yield self.client.delete_table(definition['TableName']) class DeleteTableTests(AsyncTestCase): @@ -237,8 +303,7 @@ class DeleteTableTests(AsyncTestCase): @testing.gen_test def test_delete_table(self): definition = self.generic_table_definition() - response = yield self.client.create_table(definition) - self.assertEqual(response['TableName'], definition['TableName']) + yield self.create_table(definition) yield self.client.delete_table(definition['TableName']) with self.assertRaises(dynamodb.ResourceNotFound): yield self.client.describe_table(definition['TableName']) @@ -264,6 +329,8 @@ class DescribeTableTests(AsyncTestCase): self.assertEqual(response['TableName'], definition['TableName']) self.assertEqual(response['TableStatus'], dynamodb.TABLE_ACTIVE) + # Delete the table + yield self.client.delete_table(definition['TableName']) @testing.gen_test def test_table_not_found(self): @@ -289,62 +356,26 @@ class ListTableTests(AsyncTestCase): self.assertNotIn(definition['TableName'], response['TableNames']) -class ItemLifecycleTests(AsyncTestCase): - - def setUp(self): - self.definition = self.generic_table_definition() - return super(ItemLifecycleTests, self).setUp() - - def tearDown(self): - yield self.client.delete_table(self.definition['TableName']) - return super(ItemLifecycleTests, self).setUp() - - - def _create_item(self): - return { - 'id': str(uuid.uuid4()), - 'created_at': datetime.datetime.utcnow(), - 'value': str(uuid.uuid4()) - } - - def _create_table(self): - return self.client.create_table(self.definition) - - def _delete_item(self, item_id): - return self.client.delete_item(self.definition['TableName'], - {'id': item_id}, - return_consumed_capacity='TOTAL', - return_item_collection_metrics='SIZE', - return_values='ALL_OLD') - - def _get_item(self, item_id): - return self.client.get_item(self.definition['TableName'], - {'id': item_id}) - - def _put_item(self, item): - return self.client.put_item(self.definition['TableName'], item, - return_consumed_capacity='TOTAL', - return_item_collection_metrics='SIZE', - return_values='ALL_OLD') +class ItemLifecycleTests(AsyncItemTestCase): @testing.gen_test def test_item_lifecycle(self): - yield self._create_table() + yield self.create_table() - item = self._create_item() + item = self.new_item_value() - response = yield self._put_item(item) + response = yield self.put_item(item) self.assertIsNone(response) - response = yield self._get_item(item['id']) + response = yield self.get_item(item['id']) self.assertEqual(response['Item']['id'], item['id']) item['update_check'] = str(uuid.uuid4()) - response = yield self._put_item(item) + response = yield self.put_item(item) self.assertEqual(response['Attributes']['id'], item['id']) - response = yield self._get_item(item['id']) + response = yield self.get_item(item['id']) self.assertEqual(response['Item']['id'], item['id']) self.assertEqual(response['Item']['update_check'], item['update_check']) @@ -367,9 +398,126 @@ class ItemLifecycleTests(AsyncTestCase): self.assertEqual(response['Attributes']['update_check'], item['update_check']) - response = yield self._delete_item(item['id']) + response = yield self.delete_item(item['id']) self.assertEqual(response['Attributes']['id'], item['id']) self.assertEqual(response['Attributes']['update_check'], update_check) - response = yield self._get_item(item['id']) + response = yield self.get_item(item['id']) self.assertIsNone(response) + + +class TableQueryTests(AsyncItemTestCase): + + def setUp(self): + super(TableQueryTests, self).setUp() + self.common_counts = collections.Counter() + self.common_keys = [] + for iteration in range(0, 5): + self.common_keys.append(str(uuid.uuid4())) + + def new_item_value(self): + offset = random.randint(0, len(self.common_keys) - 1) + common_key = self.common_keys[offset] + self.common_counts[common_key] += 1 + return { + 'id': str(uuid.uuid4()), + 'created_at': datetime.datetime.utcnow(), + 'value': str(uuid.uuid4()), + 'common': common_key + } + + @staticmethod + def generic_table_definition(): + return { + 'TableName': str(uuid.uuid4()), + 'AttributeDefinitions': [{'AttributeName': 'id', + 'AttributeType': 'S'}, + {'AttributeName': 'common', + 'AttributeType': 'S'}], + 'KeySchema': [{'AttributeName': 'id', 'KeyType': 'HASH'}], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + }, + 'GlobalSecondaryIndexes': [ + { + 'IndexName': 'common', + 'KeySchema': [ + {'AttributeName': 'common', 'KeyType': 'HASH'} + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 1, + 'WriteCapacityUnits': 1 + }, + 'Projection': {'ProjectionType': 'ALL'} + } + ] + } + + @testing.gen_test() + def test_query_on_common_key(self): + yield self.create_table() + for iteration in range(0, 10): + payload = { + 'RequestItems': { + self.definition['TableName']: [{ + 'PutRequest': { + 'Item': utils.marshall(self.new_item_value()) + } + } for _row in range(0, 25)] + }, + 'ReturnConsumedCapacity': 'TOTAL', + 'ReturnItemCollectionMetrics': 'SIZE' + } + yield self.client.execute('BatchWriteItem', payload) + + for key in self.common_keys: + items = [] + kwargs = { + 'index_name': 'common', + 'key_condition_expression': '#common = :common', + 'expression_attribute_names': {'#common': 'common'}, + 'expression_attribute_values': {':common': key}, + 'limit': 5 + } + while True: + result = yield self.client.query(self.definition['TableName'], + **kwargs) + items += result['Items'] + if not result.get('LastEvaluatedKey'): + break + kwargs['exclusive_start_key'] = result['LastEvaluatedKey'] + self.assertEqual(len(items), self.common_counts[key]) + + +class TableScanTests(AsyncItemTestCase): + + @testing.gen_test() + def test_query_on_common_key(self): + yield self.create_table() + for iteration in range(0, 10): + payload = { + 'RequestItems': { + self.definition['TableName']: [{ + 'PutRequest': { + 'Item': utils.marshall(self.new_item_value()) + } + } for _row in range(0, 25)] + }, + 'ReturnConsumedCapacity': 'TOTAL', + 'ReturnItemCollectionMetrics': 'SIZE' + } + yield self.client.execute('BatchWriteItem', payload) + + items = [] + kwargs = { + 'limit': 5 + } + while True: + result = yield self.client.scan(self.definition['TableName'], + **kwargs) + items += result['Items'] + if not result.get('LastEvaluatedKey'): + break + kwargs['exclusive_start_key'] = result['LastEvaluatedKey'] + self.assertEqual(len(items), 250) diff --git a/tests/utils_tests.py b/tests/utils_tests.py index e20293d..a383e6a 100644 --- a/tests/utils_tests.py +++ b/tests/utils_tests.py @@ -1,5 +1,6 @@ import base64 import datetime +import sys import unittest import uuid @@ -17,6 +18,17 @@ class UTC(datetime.tzinfo): return datetime.timedelta(0) +class IsBinaryTests(unittest.TestCase): + + @unittest.skipIf(sys.version_info.major > 2, 'is_binary is Python2 only') + def test_is_binary_true(self): + self.assertTrue(utils.is_binary('\0x01\0x02\0x03')) + + @unittest.skipIf(sys.version_info.major > 2, 'is_binary is Python2 only') + def test_is_binary_false(self): + self.assertFalse(utils.is_binary('This is ASCII')) + + class MarshallTests(unittest.TestCase): maxDiff = None