sprockets.clients.cassandra/sprockets/clients/cassandra/__init__.py
Brian Korty b391aab881 Implement initial client.
Implement the initial client. Add support for getting hostname via
environment variable.
2015-05-11 11:12:54 -04:00

103 lines
3.3 KiB
Python

"""
clients.cassandra
=================
Base functionality for accessing/modifying data in Cassandra.
"""
import os
import socket
from cassandra.cluster import Cluster
from tornado.concurrent import Future
# ``urlsplit`` moved between Python 2 and 3; only a failed import should
# trigger the fallback, so catch ImportError rather than everything.
try:
    from urllib.parse import urlsplit
except ImportError:
    from urlparse import urlsplit

# Package version, exposed both as a tuple and as a dotted string.
version_info = (0, 0, 0)
__version__ = '.'.join(str(v) for v in version_info)

# URI used when the CASSANDRA_URI environment variable is not set.
DEFAULT_URI = 'cassandra://localhost'
class CassandraConnection(object):
    """Maintain a connection to a Cassandra cluster.

    The Sprockets Cassandra client provides the glue needed to join
    the Tornado async I/O module with the native Python async I/O used
    in the Cassandra driver. When instantiating the client, a reference
    to the Tornado I/O loop needs to be passed via the constructor so
    that Tornado future results can be returned to the host application.

    Configuration parameters for the module are obtained from
    environment variables. Currently, the only variable is
    ``CASSANDRA_URI``, which takes the format "cassandra://hostname".
    If the variable is not set, the hostname defaults to localhost.

    .. note::

       The hostname in the ``CASSANDRA_URI`` will be resolved
       to a list of IP addresses that will be passed to the
       Cassandra driver as the contact points.

    """

    def __init__(self, ioloop):
        """Resolve the configuration and connect to the cluster.

        The cluster session is opened during construction.

        :param ioloop: Tornado I/O loop on which query results are
            delivered to the futures returned by :meth:`execute`.

        """
        self._config = self._get_cassandra_config()
        self._cluster = Cluster(self._config['contact_points'])
        self._session = self._cluster.connect()
        self._ioloop = ioloop

    def _get_cassandra_config(self):
        """Retrieve a dict containing Cassandra client config params.

        :returns: dict with a ``contact_points`` key holding the list
            of IP addresses the configured hostname resolves to.
        :raises RuntimeError: if the URI scheme is not ``cassandra``,
            the URI has no hostname, or the hostname resolves to no
            addresses.

        """
        parts = urlsplit(os.environ.get('CASSANDRA_URI', DEFAULT_URI))
        if parts.scheme != 'cassandra':
            raise RuntimeError(
                'CASSANDRA_URI scheme is not "cassandra://"!')

        # A URI such as "cassandra://" parses with hostname None; fail
        # with a clear configuration error instead of passing None on
        # to the resolver.
        if not parts.hostname:
            raise RuntimeError('CASSANDRA_URI does not contain a hostname!')

        _, _, ip_addresses = socket.gethostbyname_ex(parts.hostname)
        if not ip_addresses:
            raise RuntimeError('Unable to find Cassandra in DNS!')

        return {'contact_points': ip_addresses}

    def set_keyspace(self, keyspace):
        """Set the keyspace used by the connection."""
        self._session.set_keyspace(keyspace)

    def shutdown(self):
        """Shutdown the connection to the Cassandra cluster.

        Calling this more than once is harmless; later calls are no-ops.

        """
        if self._cluster is not None:
            self._cluster.shutdown()
        self._session = None
        self._cluster = None

    def execute(self, query, *args, **kwargs):
        """Asynchronously execute the specified CQL query.

        The execute command also takes optional parameters and trace
        keyword arguments. See cassandra-python documentation for
        definition of those parameters.

        :returns: a Tornado future that is resolved on the I/O loop
            with the query result, or with the exception raised by the
            driver.

        """
        tornado_future = Future()
        cassandra_future = self._session.execute_async(
            query, *args, **kwargs)

        def on_complete(_outcome):
            # Invoked by the driver once the query has finished (with
            # the rows or the exception, both ignored here). Hop back
            # onto the I/O loop before touching the Tornado future;
            # by then ``result()`` in ``_callback`` returns at once.
            self._ioloop.add_callback(
                self._callback, cassandra_future, tornado_future)

        # Wait for completion via driver callbacks rather than calling
        # ``result()`` straight away, which would block the I/O loop
        # for the duration of the query.
        cassandra_future.add_callbacks(
            callback=on_complete, errback=on_complete)
        return tornado_future

    def _callback(self, cassandra_future, tornado_future):
        """Copy the outcome of a driver future onto the Tornado future.

        :param cassandra_future: future returned by ``execute_async``.
        :param tornado_future: future handed back to the caller of
            :meth:`execute`.

        """
        try:
            result = cassandra_future.result()
        except Exception as exc:
            tornado_future.set_exception(exc)
        else:
            tornado_future.set_result(result)