From 8641bebdca9baf05312e767bcaaad41f39603f6a Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Thu, 25 Feb 2016 13:35:22 -0500 Subject: [PATCH] Initial import of code from other projects. --- docs/api.rst | 3 +- docs/examples.rst | 6 + docs/index.rst | 1 + examples/__init__.py | 0 examples/create-table.py | 79 ++++++++++ setup.py | 2 +- sprockets/clients/dynamodb/__init__.py | 7 + sprockets/clients/dynamodb/connector.py | 173 ++++++++++++++++++++++ sprockets/clients/dynamodb/utils.py | 187 ++++++++++++++++++++++++ 9 files changed, 456 insertions(+), 2 deletions(-) create mode 100644 docs/examples.rst create mode 100644 examples/__init__.py create mode 100644 examples/create-table.py create mode 100644 sprockets/clients/dynamodb/connector.py create mode 100644 sprockets/clients/dynamodb/utils.py diff --git a/docs/api.rst b/docs/api.rst index f3e119f..b400c98 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,4 +1,5 @@ API Documentation ================= -.. automodule:: sprockets.clients.dynamodb +.. autoclass:: sprockets.clients.dynamodb.DynamoDB + :members: diff --git a/docs/examples.rst b/docs/examples.rst new file mode 100644 index 0000000..5d2ebfe --- /dev/null +++ b/docs/examples.rst @@ -0,0 +1,6 @@ +Examples +======== + +Creating a Table +---------------- +.. literalinclude:: ../examples/create-table.py diff --git a/docs/index.rst b/docs/index.rst index c9e8e47..05b4a05 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -4,5 +4,6 @@ :hidden: api + examples contributing history diff --git a/examples/__init__.py b/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/create-table.py b/examples/create-table.py new file mode 100644 index 0000000..58e858e --- /dev/null +++ b/examples/create-table.py @@ -0,0 +1,79 @@ +""" +Create the table described in `CreateTable`_. + +This example creates a table if it does not exist using chained +callbacks. + +.. _CreateTable: http://docs.aws.amazon.com/amazondynamodb/latest/ + APIReference/API_CreateTable.html + +""" +import logging +import sys + +from sprockets.clients import dynamodb +from tornado import ioloop + + +LOGGER = logging.getLogger('create-database') +TABLE_DEF = { + 'TableName': 'Thread', + 'AttributeDefinitions': [ + {'AttributeName': 'ForumName', 'AttributeType': 'S'}, + {'AttributeName': 'Subject', 'AttributeType': 'S'}, + {'AttributeName': 'LastPostDateTime', 'AttributeType': 'S'}, + ], + 'KeySchema': [ + {'AttributeName': 'ForumName', 'KeyType': 'HASH'}, + {'AttributeName': 'Subject', 'KeyType': 'RANGE'}, + ], + 'LocalSecondaryIndexes': [ + { + 'IndexName': 'LastPostIndex', + 'KeySchema': [ + {'AttributeName': 'ForumName', 'KeyType': 'HASH'}, + {'AttributeName': 'LastPostDateTime', 'KeyType': 'RANGE'}, + ], + 'Projection': { + 'ProjectionType': 'KEYS_ONLY', + }, + }, + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5, + } +} + +dynamo = dynamodb.DynamoDB() +iol = ioloop.IOLoop.instance() + + +def on_table_described(describe_response): + + def on_created(create_response): + try: + result = create_response.result() + LOGGER.info('table created - %r', result) + iol.add_callback(iol.stop) + except Exception: + LOGGER.exception('failed to create table') + sys.exit(-1) + + try: + result = describe_response.result() + LOGGER.info('found table %s, created %s', + result['Table']['TableName'], + result['Table']['CreationDateTime']) + iol.add_callback(iol.stop) + except Exception as error: + LOGGER.warn('table not found, attempting to create: %s', error) + next_future = dynamo.create_table(TABLE_DEF) + iol.add_future(next_future, on_created) + +logging.basicConfig(level=logging.DEBUG, + format='%(levelname)1.1s %(name)s: %(message)s') + +iol.add_future(dynamo.describe_table(TABLE_DEF['TableName']), + on_table_described) +iol.start() diff --git a/setup.py b/setup.py index 2ec9de3..5260213 100755 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ setuptools.setup( install_requires=read_requirements('installation.txt'), license='BSD', namespace_packages=['sprockets', 'sprockets.clients'], - packages=setuptools.find_packages(), + packages=setuptools.find_packages(exclude=['examples']), classifiers=[ 'Development Status :: 1 - Planning', 'Environment :: No Input/Output (Daemon)', diff --git a/sprockets/clients/dynamodb/__init__.py b/sprockets/clients/dynamodb/__init__.py index 2e4dd40..fa8267b 100644 --- a/sprockets/clients/dynamodb/__init__.py +++ b/sprockets/clients/dynamodb/__init__.py @@ -1,2 +1,9 @@ +try: + from .connector import DynamoDB +except ImportError as error: + def DynamoDB(*args, **kwargs): + raise error + version_info = (0, 0, 0) __version__ = '.'.join(str(v) for v in version_info) +__all__ = ['DynamoDB', 'version_info', '__version__'] diff --git a/sprockets/clients/dynamodb/connector.py b/sprockets/clients/dynamodb/connector.py new file mode 100644 index 0000000..c6fb848 --- /dev/null +++ b/sprockets/clients/dynamodb/connector.py @@ -0,0 +1,173 @@ +import json +import logging +import os + +from tornado import concurrent, ioloop +from tornado_aws import client + +from . import utils + + +LOGGER = logging.getLogger(__name__) + + +class DynamoDB(object): + """ + Connects to a DynamoDB instance. + + :keyword str region: AWS region to send requests to + :keyword str access_key: AWS access key. If unspecified, this + defaults to the :envvar:`AWS_ACCESS_KEY_ID` environment + variable and will fall back to using the AWS CLI credentials + file. See :class:`tornado_aws.client.AsyncAWSClient` for + more details. + :keyword str secret_key: AWS secret used to secure API calls. + If unspecified, this defaults to the :envvar:`AWS_SECRET_ACCESS_KEY` + environment variable and will fall back to using the AWS CLI + credentials as described in :class:`tornado_aws.client.AsyncAWSClient`. + :keyword str profile: optional profile to use in AWS API calls. + If unspecified, this defaults to the :envvar:`AWS_DEFAULT_PROFILE` + environment variable or ``default`` if unset. + :keyword str endpoint: DynamoDB endpoint to contact. If unspecified, + the default is determined by the region. + :keyword int max_clients: optional maximum number of HTTP requests + that may be performed in parallel. + + Create an instance of this class to interact with a DynamoDB + server. A :class:`tornado_aws.client.AsyncAWSClient` instance + implements the AWS API wrapping and this class provides the + DynamoDB specifics. + + """ + + def __init__(self, **kwargs): + self.logger = LOGGER.getChild(self.__class__.__name__) + self._client = None + self._args = kwargs.copy() + if os.environ.get('DYNAMODB_ENDPOINT', None): + self._args.setdefault('endpoint', os.environ['DYNAMODB_ENDPOINT']) + + @property + def client(self): + if self._client is None: + self._client = client.AsyncAWSClient('dynamodb', **self._args) + return self._client + + def execute(self, function, body): + """ + Invoke a DynamoDB function. + + :param str function: DynamoDB function to invoke + :param dict body: body to send with the function + :rtype: tornado.concurrent.Future + + This method creates a future that will resolve to the result + of calling the specified DynamoDB function. It does it's best + to unwrap the response from the function to make life a little + easier for you. It does this for the ``GetItem`` and ``Query`` + functions currrently. + + """ + encoded = json.dumps(body).encode('utf-8') + headers = { + 'x-amz-target': 'DynamoDB_20120810.{}'.format(function), + 'Content-Type': 'application/x-amz-json-1.0', + } + future = concurrent.TracebackFuture() + + def handle_response(f): + self.logger.debug('processing %s() = %r', function, f) + try: + response = f.result() + result = json.loads(response.body.decode('utf-8')) + future.set_result(_unwrap_result(function, result)) + except Exception as exception: + future.set_exception(exception) + + self.logger.debug('calling %s', function) + aws_response = self.client.fetch('POST', '/', body=encoded, + headers=headers) + ioloop.IOLoop.current().add_future(aws_response, handle_response) + + return future + + def create_table(self, table_definition): + """ + Invoke the ``CreateTable`` function. + + :param dict table_definition: description of the table to + create according to `CreateTable`_ + :rtype: tornado.concurrent.Future + + .. _CreateTable: http://docs.aws.amazon.com/amazondynamodb/ + latest/APIReference/API_CreateTable.html + + """ + return self.execute('CreateTable', table_definition) + + def describe_table(self, table_name): + """ + Invoke the `DescribeTable`_ function. + + :param str table_name: name of the table to describe. + :rtype: tornado.concurrent.Future + + .. _DescribeTable: http://docs.aws.amazon.com/amazondynamodb/ + latest/APIReference/API_DescribeTable.html + + """ + return self.execute('DescribeTable', {'TableName': table_name}) + + def delete_table(self, table_name): + """ + Invoke the `DeleteTable`_ function. + + :param str table_name: name of the table to describe. + :rtype: tornado.concurrent.Future + + .. _DeleteTable: http://docs.aws.amazon.com/amazondynamodb/ + latest/APIReference/API_DeleteTable.html + + """ + return self.execute('DeleteTable', {'TableName': table_name}) + + def put_item(self, table_name, item): + """ + Invoke the `PutItem`_ function. + + :param str table_name: table to insert into + :param dict item: item to insert. This will be marshalled + for you so a native :class:`dict` of native items works. + :rtype: tornado.concurrent.Future + + .. _PutItem: http://docs.aws.amazon.com/amazondynamodb/ + latest/APIReference/API_PutItem.html + + """ + return self.execute('PutItem', {'TableName': table_name, + 'Item': utils.marshall(item)}) + + def get_item(self, table_name, key_dict): + """ + Invoke the `GetItem`_ function. + + :param str table_name: table to retrieve the item from + :param dict key_dict: key to use for retrieval. This will + be marshalled for you so a native :class:`dict` works. + :rtype: tornado.concurrent.Future + + .. _GetItem: http://docs.aws.amazon.com/amazondynamodb/ + latest/APIReference/API_GetItem.html + + """ + return self.execute('GetItem', {'TableName': table_name, + 'Key': utils.marshall(key_dict)}) + + +def _unwrap_result(function, result): + if result: + if function == 'GetItem': + return utils.unmarshall(result['Item']) + if function == 'Query': + return [utils.unmarshall(item) for item in result['Items']] + return result diff --git a/sprockets/clients/dynamodb/utils.py b/sprockets/clients/dynamodb/utils.py new file mode 100644 index 0000000..91fdb26 --- /dev/null +++ b/sprockets/clients/dynamodb/utils.py @@ -0,0 +1,187 @@ +""" +Utilities for working with DynamoDB. + +- :func:`.marshall` +- :func:`.unmarshal` + +This module contains some helpers that make working with the +Amazon DynamoDB API a little less painful. Data is encoded as +`AttributeValue`_ structures in the JSON payloads and this module +defines functions that will handle the transcoding for you for +the vast majority of types that we use. + +.. _AttributeValue: http://docs.aws.amazon.com/amazondynamodb/latest/ + APIReference/API_AttributeValue.html + +""" +import datetime +import uuid +import sys +try: + import arrow +except ImportError: + arrow = None + + +PYTHON3 = True if sys.version_info > (3, 0, 0) else False +TEXTCHARS = bytearray({7,8,9,10,12,13,27} | set(range(0x20, 0x100)) - {0x7f}) + + +def marshall(values): + """ + Marshall a `dict` into something DynamoDB likes. + + :param dict values: The values to marshall + :rtype: dict + :raises ValueError: if an unsupported type is encountered + + Return the values in a nested dict structure that is required for + writing the values to DynamoDB. + + """ + serialized = {} + for key in values: + serialized[key] = _marshall_value(values[key]) + return serialized + + +def _marshall_value(value): + """ + Recursively transform `value` into an AttributeValue `dict` + + :param mixed value: The value to encode + :rtype: dict + :raises ValueError: for unsupported types + + Return the value as dict indicating the data type and transform or + recursively process the value if required. + + """ + if PYTHON3 and isinstance(value, bytes): + return {'B': value} + elif PYTHON3 and isinstance(value, str): + return {'S': value} + elif not PYTHON3 and isinstance(value, str): + if _is_binary(value): + return {'B': value} + return {'S': value} + elif isinstance(value, dict): + return {'M': marshall(value)} + elif isinstance(value, bool): + return {'BOOL': value} + elif isinstance(value, (int, float)): + return {'N': str(value)} + elif isinstance(value, datetime.datetime): + return {'S': value.isoformat()} + elif arrow is not None and isinstance(value, arrow.Arrow): + return {'S': value.isoformat()} + elif isinstance(value, uuid.UUID): + return {'S': str(value)} + elif isinstance(value, list): + return {'L': [_marshall_value(v) for v in value]} + elif isinstance(value, set): + if PYTHON3 and all([isinstance(v, bytes) for v in value]): + return {'BS': sorted(list(value))} + elif PYTHON3 and all([isinstance(v, str) for v in value]): + return {'SS': sorted(list(value))} + elif all([isinstance(v, (int, float)) for v in value]): + return {'NS': sorted([str(v) for v in value])} + elif not PYTHON3 and all([isinstance(v, str) for v in value]) and \ + all([_is_binary(v) for v in value]): + return {'BS': sorted(list(value))} + elif not PYTHON3 and all([isinstance(v, str) for v in value]) and \ + all([_is_binary(v) is False for v in value]): + return {'SS': sorted(list(value))} + else: + raise ValueError('Can not mix types in a set') + elif value is None: + return {'NULL': True} + raise ValueError('Unsupported type: %s' % type(value)) + + +def unmarshall(values): + """ + Transform a response payload from DynamoDB to a native dict + + :param dict values: The response payload from DynamoDB + :rtype: dict + :raises ValueError: if an unsupported type code is encountered + + """ + unmarshalled = {} + for key in values: + unmarshalled[key] = _unmarshall_dict(values[key]) + return unmarshalled + + +def _unmarshall_dict(value): + """Unmarshall a single dict value from a row that was returned from + DynamoDB, returning the value as a normal Python dict. + + :param dict value: The value to unmarshall + :rtype: mixed + :raises ValueError: if an unsupported type code is encountered + + """ + key = list(value.keys()).pop() + if key == 'B': + return bytes(value[key]) + elif key == 'BS': + return set([bytes(v) for v in value[key]]) + elif key == 'BOOL': + return value[key] + elif key == 'L': + return [_unmarshall_dict(v) for v in value[key]] + elif key == 'M': + return unmarshall(value[key]) + elif key == 'NULL': + return None + elif key == 'N': + return _to_number(value[key]) + elif key == 'NS': + return set([_to_number(v) for v in value[key]]) + elif key == 'S': + return _maybe_convert(value[key]) + elif key == 'SS': + return set([_maybe_convert(v) for v in value[key]]) + raise ValueError('Unsupported value type: %s' % key) + + +def _to_number(value): + """ + Convert the string containing a number to a number + + :param str value: The value to convert + :rtype: float|int + + """ + return float(value) if '.' in value else int(value) + + +def _maybe_convert(value): + """ + Try to convert a string into something useful. + + :param str value: The value to convert + :rtype: uuid.UUID|datetime.datetime|str + + Possibly convert the value to a :py:class:`uuid.UUID` or + :py:class:`datetime.datetime` if possible, otherwise just return + the value. + + """ + try: + return uuid.UUID(value) + except ValueError: + return value + + +def _is_binary(value): + """ + Check to see if a string contains binary data in Python2 + + :param str value: The value to check + :rtype: bool + + """ + return bool(value.translate(None, TEXTCHARS))