Fix tests, implement more methods

This commit is contained in:
Gavin M. Roy 2016-10-14 17:49:25 -04:00
parent 8f53fef944
commit 4e25f08233
6 changed files with 381 additions and 313 deletions

View file

@ -12,4 +12,4 @@ cover-branches = 1
cover-erase = 1
cover-package = sprockets_dynamodb
with-coverage = 1
verbose = 1
verbosity = 2

View file

@ -4,31 +4,32 @@ Sprockets DynamoDB
"""
import logging
try:
from sprockets_dynamodb.client import Client
except ImportError: # pragma: nocover
Client = None
try:
from sprockets_dynamodb.mixin import DynamoDBMixin
except ImportError: # pragma: nocover
DynamoDBMixin = None
try:
from sprockets_dynamodb.exceptions import *
except ImportError: # pragma: nocover
pass
version_info = (2, 0, 0)
__version__ = '.'.join(str(v) for v in version_info)
logging.getLogger(__name__).addHandler(logging.NullHandler())
try:
from sprockets_dynamodb.client import Client
except ImportError:
Client = None
try:
from sprockets_dynamodb.mixin import DynamoDBMixin
except ImportError:
DynamoDBMixin = None
from sprockets_dynamodb.exceptions import *
# Response constants
# Table status constants
TABLE_ACTIVE = 'ACTIVE'
TABLE_CREATING = 'CREATING'
TABLE_DELETING = 'DELETING'
TABLE_DISABLED = 'DISABLED'
TABLE_UPDATING = 'UPDATING'
__all__ = [
'client',
'exceptions',

View file

@ -7,7 +7,7 @@ import collections
import json
import logging
import os
import select
import select as _select
import socket
import ssl
import time
@ -21,7 +21,7 @@ from sprockets_dynamodb import exceptions, utils
# Stub Python3 exceptions for Python 2.7
try:
ConnectionError
except NameError:
except NameError: # pragma: nocover
class ConnectionError(Exception):
pass
@ -771,24 +771,6 @@ class Client(object):
def batch_get_item(self):
"""Invoke the `BatchGetItem`_ function.
:raises:
:exc:`~sprockets_dynamodb.exceptions.DynamoDBException`
:exc:`~sprockets_dynamodb.exceptions.ConfigNotFound`
:exc:`~sprockets_dynamodb.exceptions.NoCredentialsError`
:exc:`~sprockets_dynamodb.exceptions.NoProfileError`
:exc:`~sprockets_dynamodb.exceptions.TimeoutException`
:exc:`~sprockets_dynamodb.exceptions.RequestException`
:exc:`~sprockets_dynamodb.exceptions.InternalFailure`
:exc:`~sprockets_dynamodb.exceptions.LimitExceeded`
:exc:`~sprockets_dynamodb.exceptions.MissingParameter`
:exc:`~sprockets_dynamodb.exceptions.OptInRequired`
:exc:`~sprockets_dynamodb.exceptions.ResourceInUse`
:exc:`~sprockets_dynamodb.exceptions.RequestExpired`
:exc:`~sprockets_dynamodb.exceptions.ResourceNotFound`
:exc:`~sprockets_dynamodb.exceptions.ServiceUnavailable`
:exc:`~sprockets_dynamodb.exceptions.ThroughputExceeded`
:exc:`~sprockets_dynamodb.exceptions.ValidationException`
.. _BatchGetItem: http://docs.aws.amazon.com/amazondynamodb/
latest/APIReference/API_BatchGetItem.html
@ -798,36 +780,25 @@ class Client(object):
def batch_write_item(self):
"""Invoke the `BatchWriteItem`_ function.
:raises:
:exc:`~sprockets_dynamodb.exceptions.DynamoDBException`
:exc:`~sprockets_dynamodb.exceptions.ConfigNotFound`
:exc:`~sprockets_dynamodb.exceptions.NoCredentialsError`
:exc:`~sprockets_dynamodb.exceptions.NoProfileError`
:exc:`~sprockets_dynamodb.exceptions.TimeoutException`
:exc:`~sprockets_dynamodb.exceptions.RequestException`
:exc:`~sprockets_dynamodb.exceptions.InternalFailure`
:exc:`~sprockets_dynamodb.exceptions.LimitExceeded`
:exc:`~sprockets_dynamodb.exceptions.MissingParameter`
:exc:`~sprockets_dynamodb.exceptions.OptInRequired`
:exc:`~sprockets_dynamodb.exceptions.ResourceInUse`
:exc:`~sprockets_dynamodb.exceptions.RequestExpired`
:exc:`~sprockets_dynamodb.exceptions.ResourceNotFound`
:exc:`~sprockets_dynamodb.exceptions.ServiceUnavailable`
:exc:`~sprockets_dynamodb.exceptions.ThroughputExceeded`
:exc:`~sprockets_dynamodb.exceptions.ValidationException`
.. _BatchWriteItem: http://docs.aws.amazon.com/amazondynamodb/
latest/APIReference/API_BatchWriteItem.html
"""
raise NotImplementedError
def query(self, table_name, consistent_read=False,
exclusive_start_key=None, expression_attribute_names=None,
expression_attribute_values=None, filter_expression=None,
projection_expression=None, index_name=None,limit=None,
return_consumed_capacity=None, scan_index_forward=True,
select=None):
def query(self, table_name,
index_name=None,
consistent_read=None,
key_condition_expression=None,
filter_expression=None,
expression_attribute_names=None,
expression_attribute_values=None,
projection_expression=None,
select=None,
exclusive_start_key=None,
limit=None,
scan_index_forward=True,
return_consumed_capacity=None):
"""A `Query`_ operation uses the primary key of a table or a secondary
index to directly access items from that table or index.
@ -861,7 +832,7 @@ class Client(object):
you query a global secondary index with ``consistent_read`` set to
``True``, you will receive a
:exc:`~sprockets_dynamodb.exceptions.ValidationException`.
:param str|bytes|int exclusive_start_key: The primary key of the first
:param dict exclusive_start_key: The primary key of the first
item that this operation will evaluate. Use the value that was
returned for ``LastEvaluatedKey`` in the previous operation. In a
parallel scan, a *Scan* request that includes
@ -948,35 +919,53 @@ class Client(object):
the matching items themselves.
:rtype: dict
:raises:
:exc:`~sprockets_dynamodb.exceptions.DynamoDBException`
:exc:`~sprockets_dynamodb.exceptions.ConfigNotFound`
:exc:`~sprockets_dynamodb.exceptions.NoCredentialsError`
:exc:`~sprockets_dynamodb.exceptions.NoProfileError`
:exc:`~sprockets_dynamodb.exceptions.TimeoutException`
:exc:`~sprockets_dynamodb.exceptions.RequestException`
:exc:`~sprockets_dynamodb.exceptions.InternalFailure`
:exc:`~sprockets_dynamodb.exceptions.LimitExceeded`
:exc:`~sprockets_dynamodb.exceptions.MissingParameter`
:exc:`~sprockets_dynamodb.exceptions.OptInRequired`
:exc:`~sprockets_dynamodb.exceptions.ResourceInUse`
:exc:`~sprockets_dynamodb.exceptions.RequestExpired`
:exc:`~sprockets_dynamodb.exceptions.ResourceNotFound`
:exc:`~sprockets_dynamodb.exceptions.ServiceUnavailable`
:exc:`~sprockets_dynamodb.exceptions.ThroughputExceeded`
:exc:`~sprockets_dynamodb.exceptions.ValidationException`
.. _Query: http://docs.aws.amazon.com/amazondynamodb/
latest/APIReference/API_Query.html
"""
raise NotImplementedError
payload = {'TableName': table_name,
'ScanIndexForward': scan_index_forward}
if index_name:
payload['IndexName'] = index_name
if consistent_read is not None:
payload['ConsistentRead'] = consistent_read
if key_condition_expression:
payload['KeyConditionExpression'] = key_condition_expression
if filter_expression:
payload['FilterExpression'] = filter_expression
if expression_attribute_names:
payload['ExpressionAttributeNames'] = expression_attribute_names
if expression_attribute_values:
payload['ExpressionAttributeValues'] = \
utils.marshall(expression_attribute_values)
if projection_expression:
payload['ProjectionExpression'] = projection_expression
if select:
_validate_select(select)
payload['Select'] = select
if exclusive_start_key:
payload['ExclusiveStartKey'] = utils.marshall(exclusive_start_key)
if limit:
payload['Limit'] = limit
if return_consumed_capacity:
_validate_return_consumed_capacity(return_consumed_capacity)
payload['ReturnConsumedCapacity'] = return_consumed_capacity
return self.execute('Query', payload)
def scan(self, table_name, consistent_read=False, exclusive_start_key=None,
expression_attribute_names=None, expression_attribute_values=None,
filter_expression=None, projection_expression=None,
index_name=None, limit=None, return_consumed_capacity=None,
segment=None, total_segments=None):
def scan(self,
table_name,
index_name=None,
consistent_read=None,
projection_expression=None,
filter_expression=None,
expression_attribute_names=None,
expression_attribute_values=None,
segment=None,
total_segments=None,
select=None,
limit=None,
exclusive_start_key=None,
return_consumed_capacity=None):
"""The `Scan`_ operation returns one or more items and item attributes
by accessing every item in a table or a secondary index.
@ -1000,141 +989,41 @@ class Client(object):
you need a consistent copy of the data, as of the time that the *Scan*
begins, you can set the ``consistent_read`` parameter to ``True``.
:param str table_name: The name of the table containing the requested
items; or, if you provide IndexName, the name of the table to which
that index belongs.
:param bool consistent_read: A Boolean value that determines the read
consistency model during the scan:
- If set to ``False``, then the data returned from *Scan* might not
contain the results from other recently completed write
operations (*PutItem*, *UpdateItem*, or *DeleteItem*).
- If set to ``True``, then all of the write operations that
completed before the Scan began are guaranteed to be contained in
the *Scan* response.
The default setting is ``False``.
This parameter is not supported on global secondary indexes. If you
scan a global secondary index and set ``consistent_read`` to
``true``, you will receive a
:exc:`~sprockets_dynamodb.exceptions.ValidationException`.
:param str|bytes|int exclusive_start_key: The primary key of the first
item that this operation will evaluate. Use the value that was
returned for ``LastEvaluatedKey`` in the previous operation.
In a parallel scan, a *Scan* request that includes
``exclusive_start_key`` must specify the same segment whose
previous *Scan* returned the corresponding value of
``LastEvaluatedKey``.
:param dict expression_attribute_names: One or more substitution tokens
for attribute names in an expression.
:param dict expression_attribute_values: One or more values that can be
substituted in an expression.
:param str filter_expression: A string that contains conditions that
DynamoDB applies after the Scan operation, but before the data is
returned to you. Items that do not satisfy the expression criteria
are not returned.
.. note:: A filter expression is applied after the items have
already been read; the process of filtering does not consume
any additional read capacity units.
For more information, see `Filter Expressions <http://docs.aws.
amazon.com/amazondynamodb/latest/developerguide/QueryAndScan.html
#FilteringResults>`_ in the Amazon DynamoDB Developer Guide.
:param str projection_expression: A string that identifies one or more
attributes to retrieve from the specified table or index. These
attributes can include scalars, sets, or elements of a JSON
document. The attributes in the expression must be separated by
commas.
If no attribute names are specified, then all attributes will be
returned. If any of the requested attributes are not found, they
will not appear in the result.
For more information, see `Accessing Item Attributes <http://docs.
aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.
AccessingItemAttributes.html>`_ in the Amazon DynamoDB Developer
Guide.
:param str index_name: The name of a secondary index to scan. This
index can be any local secondary index or global secondary index.
Note that if you use this parameter, you must also provide
``table_name``.
:param int limit: The maximum number of items to evaluate (not
necessarily the number of matching items). If DynamoDB processes
the number of items up to the limit while processing the results,
it stops the operation and returns the matching values up to that
point, and a key in ``LastEvaluatedKey`` to apply in a subsequent
operation, so that you can pick up where you left off. Also, if the
processed data set size exceeds 1 MB before DynamoDB reaches this
limit, it stops the operation and returns the matching values up to
the limit, and a key in ``LastEvaluatedKey`` to apply in a
subsequent operation to continue the operation. For more
information, see `Query and Scan <http://docs.aws.amazon.com/amazo
ndynamodb/latest/developerguide/QueryAndScan.html>`_ in the Amazon
DynamoDB Developer Guide.
:param str return_consumed_capacity: Determines the level of detail
about provisioned throughput consumption that is returned in the
response. Should be ``None`` or one of ``INDEXES`` or ``TOTAL``
:param int segment: For a parallel *Scan* request, ``segment``
identifies an individual segment to be scanned by an application
worker.
Segment IDs are zero-based, so the first segment is always ``0``.
For example, if you want to use four application threads to scan a
table or an index, then the first thread specifies a Segment value
of ``0``, the second thread specifies ``1``, and so on.
The value of ``LastEvaluatedKey`` returned from a parallel *Scan*
request must be used as ``ExclusiveStartKey`` with the same segment
ID in a subsequent *Scan* operation.
The value for ``segment`` must be greater than or equal to ``0``,
and less than the value provided for ``total_segments``.
If you provide ``segment``, you must also provide
``total_segments``.
:param int total_segments: For a parallel *Scan* request,
``total_segments`` represents the total number of segments into
which the *Scan* operation will be divided. The value of
``total_segments`` corresponds to the number of application workers
that will perform the parallel scan. For example, if you want to
use four application threads to scan a table or an index, specify a
``total_segments`` value of 4.
The value for ``total_segments`` must be greater than or equal to
``1``, and less than or equal to ``1000000``. If you specify a
``total_segments`` value of ``1``, the *Scan* operation will be
sequential rather than parallel.
If you specify ``total_segments``, you must also specify
``segments``.
:rtype: dict
:raises:
:exc:`~sprockets_dynamodb.exceptions.DynamoDBException`
:exc:`~sprockets_dynamodb.exceptions.ConfigNotFound`
:exc:`~sprockets_dynamodb.exceptions.NoCredentialsError`
:exc:`~sprockets_dynamodb.exceptions.NoProfileError`
:exc:`~sprockets_dynamodb.exceptions.TimeoutException`
:exc:`~sprockets_dynamodb.exceptions.RequestException`
:exc:`~sprockets_dynamodb.exceptions.InternalFailure`
:exc:`~sprockets_dynamodb.exceptions.LimitExceeded`
:exc:`~sprockets_dynamodb.exceptions.MissingParameter`
:exc:`~sprockets_dynamodb.exceptions.OptInRequired`
:exc:`~sprockets_dynamodb.exceptions.ResourceInUse`
:exc:`~sprockets_dynamodb.exceptions.RequestExpired`
:exc:`~sprockets_dynamodb.exceptions.ResourceNotFound`
:exc:`~sprockets_dynamodb.exceptions.ServiceUnavailable`
:exc:`~sprockets_dynamodb.exceptions.ThroughputExceeded`
:exc:`~sprockets_dynamodb.exceptions.ValidationException`
.. _Scan: http://docs.aws.amazon.com/amazondynamodb/
latest/APIReference/API_Scan.html
"""
raise NotImplementedError
payload = {'TableName': table_name}
if index_name:
payload['IndexName'] = index_name
if consistent_read is not None:
payload['ConsistentRead'] = consistent_read
if filter_expression:
payload['FilterExpression'] = filter_expression
if expression_attribute_names:
payload['ExpressionAttributeNames'] = expression_attribute_names
if expression_attribute_values:
payload['ExpressionAttributeValues'] = \
utils.marshall(expression_attribute_values)
if projection_expression:
payload['ProjectionExpression'] = projection_expression
if segment:
payload['Segment'] = segment
if total_segments:
payload['TotalSegments'] = total_segments
if select:
_validate_select(select)
payload['Select'] = select
if exclusive_start_key:
payload['ExclusiveStartKey'] = utils.marshall(exclusive_start_key)
if limit:
payload['Limit'] = limit
if return_consumed_capacity:
_validate_return_consumed_capacity(return_consumed_capacity)
payload['ReturnConsumedCapacity'] = return_consumed_capacity
return self.execute('Scan', payload)
@gen.coroutine
def execute(self, action, parameters):
@ -1171,6 +1060,7 @@ class Client(object):
"""
result = yield self._execute_action(action, parameters)
self.logger.debug('%s result: %r', action, result)
raise gen.Return(_unwrap_result(action, result))
@gen.coroutine
@ -1208,7 +1098,7 @@ class Client(object):
:param method callback: The method to invoke
"""
LOGGER.debug('Setting error callback: %r', callback)
self.logger.debug('Setting error callback: %r', callback)
self._on_error = callback
def set_instrumentation_callback(self, callback):
@ -1218,7 +1108,7 @@ class Client(object):
:param method callback: The method to invoke
"""
LOGGER.debug('Setting instrumentation callback: %r', callback)
self.logger.debug('Setting instrumentation callback: %r', callback)
self._instrumentation_callback = callback
def _execute(self, action, parameters, attempt, measurements):
@ -1248,7 +1138,6 @@ class Client(object):
:exc:`~sprockets_dynamodb.exceptions.ValidationException`
"""
LOGGER.debug('%s %r', action, parameters)
future = concurrent.TracebackFuture()
start = time.time()
@ -1274,7 +1163,7 @@ class Client(object):
except aws_exceptions.NoProfileError as error:
future.set_exception(exceptions.NoProfileError(str(error)))
except (ConnectionError, ConnectionResetError, OSError, ssl.SSLError,
select.error, ssl.socket_error, socket.gaierror) as error:
_select.error, ssl.socket_error, socket.gaierror) as error:
future.set_exception(exceptions.RequestException(str(error)))
except TimeoutError:
future.set_exception(exceptions.TimeoutException())
@ -1316,7 +1205,7 @@ class Client(object):
:param list measurements: The measurement accumulator
"""
self.logger.debug('processing %s(%s) #%i = %r',
self.logger.debug('%s on %s request #%i = %r',
action, table, attempt, response)
now, error, result = time.time(), None, None
try:
@ -1381,7 +1270,6 @@ def _unwrap_result(action, result):
:rtype: list or dict
"""
LOGGER.debug('%s Result: %r', action, result)
if not result:
return
elif action in ['DeleteItem', 'PutItem', 'UpdateItem']:
@ -1407,7 +1295,6 @@ def _unwrap_delete_put_update_item(result):
result['ItemCollectionMetrics'].get('SizeEstimateRangeGB',
[None]).pop()
}
LOGGER.debug('DELETE: %r', response)
return response
@ -1423,11 +1310,14 @@ def _unwrap_get_item(result):
def _unwrap_query_scan(result):
response = {
'Count': result.get('Count', 0),
'Items': [utils.unmarshall(i) for i in result.get('Items', [])]
'Items': [utils.unmarshall(i) for i in result.get('Items', [])],
'ScannedCount': result.get('ScannedCount', 0)
}
for key in ['ConsumedCapacity', 'LastEvaluatedKey', 'ScannedCount']:
if key in result:
response[key] = result[key]
if 'LastEvaluatedKey' in result:
response['LastEvaluatedKey'] = \
utils.unmarshall(result['LastEvaluatedKey'])
if 'ConsumedCapacity' in result:
response['ConsumedCapacity'] = result['ConsumedCapacity']
return response
@ -1445,3 +1335,9 @@ def _validate_return_values(value):
if value not in ['NONE', 'ALL_NEW', 'ALL_OLD',
'UPDATED_NEW', 'UPDATED_OLD']:
raise ValueError('Invalid return_values value')
def _validate_select(value):
if value not in ['ALL_ATTRIBUTES', 'ALL_PROJECTED_ATTRIBUTES', 'COUNT',
'SPECIFIC_ATTRIBUTES']:
raise ValueError('Invalid select value')

View file

@ -22,6 +22,19 @@ import sys
PYTHON3 = True if sys.version_info > (3, 0, 0) else False
TEXT_CHARS = bytearray({7, 8, 9, 10, 12, 13, 27} |
set(range(0x20, 0x100)) - {0x7f})
if PYTHON3: # pragma: nocover
unicode = str
def is_binary(value):
"""
Check to see if a string contains binary data in Python2
:param str|bytes value: The value to check
:rtype: bool
"""
return bool(value.translate(None, TEXT_CHARS))
def marshall(values):
@ -42,6 +55,31 @@ def marshall(values):
return serialized
def unmarshall(values):
"""
Transform a response payload from DynamoDB to a native dict
:param dict values: The response payload from DynamoDB
:rtype: dict
:raises ValueError: if an unsupported type code is encountered
"""
unmarshalled = {}
for key in values:
unmarshalled[key] = _unmarshall_dict(values[key])
return unmarshalled
def _encode_binary_set(value):
"""Base64 encode binary values in list of values.
:param list value: The list of binary values
:rtype: list
"""
return sorted([base64.b64encode(v).decode('ascii') for v in value])
def _marshall_value(value):
"""
Recursively transform `value` into an AttributeValue `dict`
@ -59,9 +97,11 @@ def _marshall_value(value):
elif PYTHON3 and isinstance(value, str):
return {'S': value}
elif not PYTHON3 and isinstance(value, str):
if _is_binary(value):
if is_binary(value):
return {'B': base64.b64encode(value).decode('ascii')}
return {'S': value}
elif not PYTHON3 and isinstance(value, unicode):
return {'S': value.encode('utf-8')}
elif isinstance(value, dict):
return {'M': marshall(value)}
elif isinstance(value, bool):
@ -82,10 +122,10 @@ def _marshall_value(value):
elif all([isinstance(v, (int, float)) for v in value]):
return {'NS': sorted([str(v) for v in value])}
elif not PYTHON3 and all([isinstance(v, str) for v in value]) and \
all([_is_binary(v) for v in value]):
all([is_binary(v) for v in value]):
return {'BS': _encode_binary_set(value)}
elif not PYTHON3 and all([isinstance(v, str) for v in value]) and \
all([_is_binary(v) is False for v in value]):
all([is_binary(v) is False for v in value]):
return {'SS': sorted(list(value))}
else:
raise ValueError('Can not mix types in a set')
@ -94,24 +134,15 @@ def _marshall_value(value):
raise ValueError('Unsupported type: %s' % type(value))
def _encode_binary_set(value):
return sorted([base64.b64encode(v).decode('ascii') for v in value])
def unmarshall(values):
def _to_number(value):
"""
Transform a response payload from DynamoDB to a native dict
Convert the string containing a number to a number
:param dict values: The response payload from DynamoDB
:rtype: dict
:raises ValueError: if an unsupported type code is encountered
:param str value: The value to convert
:rtype: float|int
"""
unmarshalled = {}
for key in values:
unmarshalled[key] = _unmarshall_dict(values[key])
return unmarshalled
return float(value) if '.' in value else int(value)
def _unmarshall_dict(value):
@ -148,23 +179,3 @@ def _unmarshall_dict(value):
raise ValueError('Unsupported value type: %s' % key)
def _to_number(value):
"""
Convert the string containing a number to a number
:param str value: The value to convert
:rtype: float|int
"""
return float(value) if '.' in value else int(value)
def _is_binary(value):
"""
Check to see if a string contains binary data in Python2
:param str value: The value to check
:rtype: bool
"""
return bool(value.translate(None, TEXT_CHARS))

View file

@ -1,5 +1,8 @@
import collections
import datetime
import logging
import os
import random
import socket
import sys
import uuid
@ -8,12 +11,18 @@ import unittest
import mock
from tornado import concurrent
from tornado import gen
from tornado import httpclient
from tornado import locks
from tornado import testing
from tornado_aws import exceptions as aws_exceptions
import sprockets_dynamodb as dynamodb
from sprockets_dynamodb import utils
LOGGER = logging.getLogger(__name__)
os.environ['ASYNC_TEST_TIMEOUT'] = '30'
class AsyncTestCase(testing.AsyncTestCase):
@ -24,6 +33,18 @@ class AsyncTestCase(testing.AsyncTestCase):
self.client.set_error_callback(None)
self.client.set_instrumentation_callback(None)
@gen.coroutine
def create_table(self, definition):
response = yield self.client.create_table(definition)
yield self.wait_on_table_creation(definition['TableName'], response)
@gen.coroutine
def wait_on_table_creation(self, table_name, response):
while not self._table_is_ready(response):
LOGGER.debug('Waiting on %s to become ready', table_name)
yield gen.sleep(1)
response = yield self.client.describe_table(table_name)
@property
def endpoint(self):
return os.getenv('DYNAMODB_ENDPOINT')
@ -44,6 +65,54 @@ class AsyncTestCase(testing.AsyncTestCase):
def get_client(self):
return dynamodb.Client(endpoint=self.endpoint)
@staticmethod
def _table_is_ready(response):
LOGGER.debug('Is table ready? %r', response)
if response['TableStatus'] != dynamodb.TABLE_ACTIVE:
return False
for index in response.get('GlobalSecondaryIndexes', {}):
if index['IndexStatus'] != dynamodb.TABLE_ACTIVE:
return False
return True
class AsyncItemTestCase(AsyncTestCase):
def setUp(self):
self.definition = self.generic_table_definition()
return super(AsyncItemTestCase, self).setUp()
def tearDown(self):
yield self.client.delete_table(self.definition['TableName'])
super(AsyncItemTestCase, self).tearDown()
def new_item_value(self):
return {
'id': str(uuid.uuid4()),
'created_at': datetime.datetime.utcnow(),
'value': str(uuid.uuid4())
}
def create_table(self, definition=None):
return super(AsyncItemTestCase, self).create_table(
definition or self.definition)
def delete_item(self, item_id):
return self.client.delete_item(self.definition['TableName'],
{'id': item_id},
return_consumed_capacity='TOTAL',
return_item_collection_metrics='SIZE',
return_values='ALL_OLD')
def get_item(self, item_id):
return self.client.get_item(self.definition['TableName'],
{'id': item_id})
def put_item(self, item):
return self.client.put_item(self.definition['TableName'], item,
return_consumed_capacity='TOTAL',
return_item_collection_metrics='SIZE',
return_values='ALL_OLD')
class AWSClientTests(AsyncTestCase):
@ -223,13 +292,10 @@ class CreateTableTests(AsyncTestCase):
@testing.gen_test
def test_double_create(self):
definition = self.generic_table_definition()
response = yield self.client.create_table(definition)
self.assertEqual(response['TableName'], definition['TableName'])
self.assertIn(response['TableStatus'],
[dynamodb.TABLE_ACTIVE,
dynamodb.TABLE_CREATING])
yield self.create_table(definition)
with self.assertRaises(dynamodb.ResourceInUse):
response = yield self.client.create_table(definition)
yield self.create_table(definition)
yield self.client.delete_table(definition['TableName'])
class DeleteTableTests(AsyncTestCase):
@ -237,8 +303,7 @@ class DeleteTableTests(AsyncTestCase):
@testing.gen_test
def test_delete_table(self):
definition = self.generic_table_definition()
response = yield self.client.create_table(definition)
self.assertEqual(response['TableName'], definition['TableName'])
yield self.create_table(definition)
yield self.client.delete_table(definition['TableName'])
with self.assertRaises(dynamodb.ResourceNotFound):
yield self.client.describe_table(definition['TableName'])
@ -264,6 +329,8 @@ class DescribeTableTests(AsyncTestCase):
self.assertEqual(response['TableName'], definition['TableName'])
self.assertEqual(response['TableStatus'],
dynamodb.TABLE_ACTIVE)
# Delete the table
yield self.client.delete_table(definition['TableName'])
@testing.gen_test
def test_table_not_found(self):
@ -289,62 +356,26 @@ class ListTableTests(AsyncTestCase):
self.assertNotIn(definition['TableName'], response['TableNames'])
class ItemLifecycleTests(AsyncTestCase):
def setUp(self):
self.definition = self.generic_table_definition()
return super(ItemLifecycleTests, self).setUp()
def tearDown(self):
yield self.client.delete_table(self.definition['TableName'])
return super(ItemLifecycleTests, self).setUp()
def _create_item(self):
return {
'id': str(uuid.uuid4()),
'created_at': datetime.datetime.utcnow(),
'value': str(uuid.uuid4())
}
def _create_table(self):
return self.client.create_table(self.definition)
def _delete_item(self, item_id):
return self.client.delete_item(self.definition['TableName'],
{'id': item_id},
return_consumed_capacity='TOTAL',
return_item_collection_metrics='SIZE',
return_values='ALL_OLD')
def _get_item(self, item_id):
return self.client.get_item(self.definition['TableName'],
{'id': item_id})
def _put_item(self, item):
return self.client.put_item(self.definition['TableName'], item,
return_consumed_capacity='TOTAL',
return_item_collection_metrics='SIZE',
return_values='ALL_OLD')
class ItemLifecycleTests(AsyncItemTestCase):
@testing.gen_test
def test_item_lifecycle(self):
yield self._create_table()
yield self.create_table()
item = self._create_item()
item = self.new_item_value()
response = yield self._put_item(item)
response = yield self.put_item(item)
self.assertIsNone(response)
response = yield self._get_item(item['id'])
response = yield self.get_item(item['id'])
self.assertEqual(response['Item']['id'], item['id'])
item['update_check'] = str(uuid.uuid4())
response = yield self._put_item(item)
response = yield self.put_item(item)
self.assertEqual(response['Attributes']['id'], item['id'])
response = yield self._get_item(item['id'])
response = yield self.get_item(item['id'])
self.assertEqual(response['Item']['id'], item['id'])
self.assertEqual(response['Item']['update_check'],
item['update_check'])
@ -367,9 +398,126 @@ class ItemLifecycleTests(AsyncTestCase):
self.assertEqual(response['Attributes']['update_check'],
item['update_check'])
response = yield self._delete_item(item['id'])
response = yield self.delete_item(item['id'])
self.assertEqual(response['Attributes']['id'], item['id'])
self.assertEqual(response['Attributes']['update_check'], update_check)
response = yield self._get_item(item['id'])
response = yield self.get_item(item['id'])
self.assertIsNone(response)
class TableQueryTests(AsyncItemTestCase):
def setUp(self):
super(TableQueryTests, self).setUp()
self.common_counts = collections.Counter()
self.common_keys = []
for iteration in range(0, 5):
self.common_keys.append(str(uuid.uuid4()))
def new_item_value(self):
offset = random.randint(0, len(self.common_keys) - 1)
common_key = self.common_keys[offset]
self.common_counts[common_key] += 1
return {
'id': str(uuid.uuid4()),
'created_at': datetime.datetime.utcnow(),
'value': str(uuid.uuid4()),
'common': common_key
}
@staticmethod
def generic_table_definition():
return {
'TableName': str(uuid.uuid4()),
'AttributeDefinitions': [{'AttributeName': 'id',
'AttributeType': 'S'},
{'AttributeName': 'common',
'AttributeType': 'S'}],
'KeySchema': [{'AttributeName': 'id', 'KeyType': 'HASH'}],
'ProvisionedThroughput': {
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
},
'GlobalSecondaryIndexes': [
{
'IndexName': 'common',
'KeySchema': [
{'AttributeName': 'common', 'KeyType': 'HASH'}
],
'ProvisionedThroughput': {
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
},
'Projection': {'ProjectionType': 'ALL'}
}
]
}
@testing.gen_test()
def test_query_on_common_key(self):
yield self.create_table()
for iteration in range(0, 10):
payload = {
'RequestItems': {
self.definition['TableName']: [{
'PutRequest': {
'Item': utils.marshall(self.new_item_value())
}
} for _row in range(0, 25)]
},
'ReturnConsumedCapacity': 'TOTAL',
'ReturnItemCollectionMetrics': 'SIZE'
}
yield self.client.execute('BatchWriteItem', payload)
for key in self.common_keys:
items = []
kwargs = {
'index_name': 'common',
'key_condition_expression': '#common = :common',
'expression_attribute_names': {'#common': 'common'},
'expression_attribute_values': {':common': key},
'limit': 5
}
while True:
result = yield self.client.query(self.definition['TableName'],
**kwargs)
items += result['Items']
if not result.get('LastEvaluatedKey'):
break
kwargs['exclusive_start_key'] = result['LastEvaluatedKey']
self.assertEqual(len(items), self.common_counts[key])
class TableScanTests(AsyncItemTestCase):
@testing.gen_test()
def test_query_on_common_key(self):
yield self.create_table()
for iteration in range(0, 10):
payload = {
'RequestItems': {
self.definition['TableName']: [{
'PutRequest': {
'Item': utils.marshall(self.new_item_value())
}
} for _row in range(0, 25)]
},
'ReturnConsumedCapacity': 'TOTAL',
'ReturnItemCollectionMetrics': 'SIZE'
}
yield self.client.execute('BatchWriteItem', payload)
items = []
kwargs = {
'limit': 5
}
while True:
result = yield self.client.scan(self.definition['TableName'],
**kwargs)
items += result['Items']
if not result.get('LastEvaluatedKey'):
break
kwargs['exclusive_start_key'] = result['LastEvaluatedKey']
self.assertEqual(len(items), 250)

View file

@ -1,5 +1,6 @@
import base64
import datetime
import sys
import unittest
import uuid
@ -17,6 +18,17 @@ class UTC(datetime.tzinfo):
return datetime.timedelta(0)
class IsBinaryTests(unittest.TestCase):
@unittest.skipIf(sys.version_info.major > 2, 'is_binary is Python2 only')
def test_is_binary_true(self):
self.assertTrue(utils.is_binary('\0x01\0x02\0x03'))
@unittest.skipIf(sys.version_info.major > 2, 'is_binary is Python2 only')
def test_is_binary_false(self):
self.assertFalse(utils.is_binary('This is ASCII'))
class MarshallTests(unittest.TestCase):
maxDiff = None