mirror of
https://github.com/sprockets/sprockets.clients.dynamodb.git
synced 2024-11-30 11:19:54 +00:00
173 lines
6.1 KiB
Python
173 lines
6.1 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
|
|
from tornado import concurrent, ioloop
|
|
from tornado_aws import client
|
|
|
|
from . import utils
|
|
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class DynamoDB(object):
|
|
"""
|
|
Connects to a DynamoDB instance.
|
|
|
|
:keyword str region: AWS region to send requests to
|
|
:keyword str access_key: AWS access key. If unspecified, this
|
|
defaults to the :envvar:`AWS_ACCESS_KEY_ID` environment
|
|
variable and will fall back to using the AWS CLI credentials
|
|
file. See :class:`tornado_aws.client.AsyncAWSClient` for
|
|
more details.
|
|
:keyword str secret_key: AWS secret used to secure API calls.
|
|
If unspecified, this defaults to the :envvar:`AWS_SECRET_ACCESS_KEY`
|
|
environment variable and will fall back to using the AWS CLI
|
|
credentials as described in :class:`tornado_aws.client.AsyncAWSClient`.
|
|
:keyword str profile: optional profile to use in AWS API calls.
|
|
If unspecified, this defaults to the :envvar:`AWS_DEFAULT_PROFILE`
|
|
environment variable or ``default`` if unset.
|
|
:keyword str endpoint: DynamoDB endpoint to contact. If unspecified,
|
|
the default is determined by the region.
|
|
:keyword int max_clients: optional maximum number of HTTP requests
|
|
that may be performed in parallel.
|
|
|
|
Create an instance of this class to interact with a DynamoDB
|
|
server. A :class:`tornado_aws.client.AsyncAWSClient` instance
|
|
implements the AWS API wrapping and this class provides the
|
|
DynamoDB specifics.
|
|
|
|
"""
|
|
|
|
def __init__(self, **kwargs):
|
|
self.logger = LOGGER.getChild(self.__class__.__name__)
|
|
self._client = None
|
|
self._args = kwargs.copy()
|
|
if os.environ.get('DYNAMODB_ENDPOINT', None):
|
|
self._args.setdefault('endpoint', os.environ['DYNAMODB_ENDPOINT'])
|
|
|
|
@property
|
|
def client(self):
|
|
if self._client is None:
|
|
self._client = client.AsyncAWSClient('dynamodb', **self._args)
|
|
return self._client
|
|
|
|
def execute(self, function, body):
|
|
"""
|
|
Invoke a DynamoDB function.
|
|
|
|
:param str function: DynamoDB function to invoke
|
|
:param dict body: body to send with the function
|
|
:rtype: tornado.concurrent.Future
|
|
|
|
This method creates a future that will resolve to the result
|
|
of calling the specified DynamoDB function. It does it's best
|
|
to unwrap the response from the function to make life a little
|
|
easier for you. It does this for the ``GetItem`` and ``Query``
|
|
functions currrently.
|
|
|
|
"""
|
|
encoded = json.dumps(body).encode('utf-8')
|
|
headers = {
|
|
'x-amz-target': 'DynamoDB_20120810.{}'.format(function),
|
|
'Content-Type': 'application/x-amz-json-1.0',
|
|
}
|
|
future = concurrent.TracebackFuture()
|
|
|
|
def handle_response(f):
|
|
self.logger.debug('processing %s() = %r', function, f)
|
|
try:
|
|
response = f.result()
|
|
result = json.loads(response.body.decode('utf-8'))
|
|
future.set_result(_unwrap_result(function, result))
|
|
except Exception as exception:
|
|
future.set_exception(exception)
|
|
|
|
self.logger.debug('calling %s', function)
|
|
aws_response = self.client.fetch('POST', '/', body=encoded,
|
|
headers=headers)
|
|
ioloop.IOLoop.current().add_future(aws_response, handle_response)
|
|
|
|
return future
|
|
|
|
def create_table(self, table_definition):
|
|
"""
|
|
Invoke the ``CreateTable`` function.
|
|
|
|
:param dict table_definition: description of the table to
|
|
create according to `CreateTable`_
|
|
:rtype: tornado.concurrent.Future
|
|
|
|
.. _CreateTable: http://docs.aws.amazon.com/amazondynamodb/
|
|
latest/APIReference/API_CreateTable.html
|
|
|
|
"""
|
|
return self.execute('CreateTable', table_definition)
|
|
|
|
def describe_table(self, table_name):
|
|
"""
|
|
Invoke the `DescribeTable`_ function.
|
|
|
|
:param str table_name: name of the table to describe.
|
|
:rtype: tornado.concurrent.Future
|
|
|
|
.. _DescribeTable: http://docs.aws.amazon.com/amazondynamodb/
|
|
latest/APIReference/API_DescribeTable.html
|
|
|
|
"""
|
|
return self.execute('DescribeTable', {'TableName': table_name})
|
|
|
|
def delete_table(self, table_name):
|
|
"""
|
|
Invoke the `DeleteTable`_ function.
|
|
|
|
:param str table_name: name of the table to describe.
|
|
:rtype: tornado.concurrent.Future
|
|
|
|
.. _DeleteTable: http://docs.aws.amazon.com/amazondynamodb/
|
|
latest/APIReference/API_DeleteTable.html
|
|
|
|
"""
|
|
return self.execute('DeleteTable', {'TableName': table_name})
|
|
|
|
def put_item(self, table_name, item):
|
|
"""
|
|
Invoke the `PutItem`_ function.
|
|
|
|
:param str table_name: table to insert into
|
|
:param dict item: item to insert. This will be marshalled
|
|
for you so a native :class:`dict` of native items works.
|
|
:rtype: tornado.concurrent.Future
|
|
|
|
.. _PutItem: http://docs.aws.amazon.com/amazondynamodb/
|
|
latest/APIReference/API_PutItem.html
|
|
|
|
"""
|
|
return self.execute('PutItem', {'TableName': table_name,
|
|
'Item': utils.marshall(item)})
|
|
|
|
def get_item(self, table_name, key_dict):
|
|
"""
|
|
Invoke the `GetItem`_ function.
|
|
|
|
:param str table_name: table to retrieve the item from
|
|
:param dict key_dict: key to use for retrieval. This will
|
|
be marshalled for you so a native :class:`dict` works.
|
|
:rtype: tornado.concurrent.Future
|
|
|
|
.. _GetItem: http://docs.aws.amazon.com/amazondynamodb/
|
|
latest/APIReference/API_GetItem.html
|
|
|
|
"""
|
|
return self.execute('GetItem', {'TableName': table_name,
|
|
'Key': utils.marshall(key_dict)})
|
|
|
|
|
|
def _unwrap_result(function, result):
|
|
if result:
|
|
if function == 'GetItem':
|
|
return utils.unmarshall(result['Item'])
|
|
if function == 'Query':
|
|
return [utils.unmarshall(item) for item in result['Items']]
|
|
return result
|