diff --git a/.travis.yml b/.travis.yml index 40bf056..ecccc8a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,10 @@ language: python python: -- 2.6 - 2.7 - pypy -- 3.2 - 3.3 - 3.4 install: -- if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2; fi - pip install -r requirements.txt -r test-requirements.txt script: nosetests after_success: @@ -22,3 +19,5 @@ deploy: python: 2.7 tags: true all_branches: true +services: + - cassandra diff --git a/docs/api.rst b/docs/api.rst new file mode 100644 index 0000000..307c475 --- /dev/null +++ b/docs/api.rst @@ -0,0 +1,2 @@ +.. automodule:: sprockets.clients.cassandra + :members: diff --git a/requirements.txt b/requirements.txt index fd63dd2..bbaab97 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ -# Add dependencies that are required to install your package. These will -# be passed into `setup` as the `install_requires` keyword. -# +blist==1.3.6 +cassandra-driver==2.5.1 diff --git a/setup.py b/setup.py index fdc9edc..db64e44 100755 --- a/setup.py +++ b/setup.py @@ -2,51 +2,62 @@ import codecs import sys -from setuptools import setup, find_packages +import setuptools -import sprockets.clients.cassandra +def read_requirements_file(req_name): + requirements = [] + try: + with codecs.open(req_name, encoding='utf-8') as req_file: + for req_line in req_file: + if '#' in req_line: + req_line = req_line[0:req_line.find('#')].strip() + if req_line: + requirements.append(req_line.strip()) + except IOError: + pass + return requirements -def read_requirements_file(filename): - """Read pip-formatted requirements from a file.""" - with open(filename, 'r') as f: - return [line.strip() for line in f.readlines() - if not line.startswith('#')] +install_requires = read_requirements_file('requirements.txt') +setup_requires = read_requirements_file('setup-requirements.txt') +tests_require = read_requirements_file('test-requirements.txt') -requirements = read_requirements_file('requirements.txt') -test_requirements = read_requirements_file('test-requirements.txt') -if sys.version_info < (3, ): - requirements.append('six>=1.7,<2.0') - test_requirements.append('mock>=1.0.1,<2.0') -if sys.version_info < (2, 7): - test_requirements.append('unittest2==0.5.1') -setup( +setuptools.setup( name='sprockets.clients.cassandra', - description='Base functioanlity for accessing/modifying data in Cassandra', - version=sprockets.clients.cassandra.__version__, - packages=find_packages(exclude=['tests', 'tests.*']), - test_suite='nose.collector', - include_package_data=True, + version='0.0.0', + description='Base functionality for accessing/modifying data in Cassandra', long_description=codecs.open('README.rst', encoding='utf-8').read(), - install_requires=requirements, - tests_require=test_requirements, - author='AWeber Communications, Inc.', + url='https://github.com/sprockets/sprockets.clients.cassandra.git', + author='AWeber Communications', author_email='api@aweber.com', - url='https://github.com/aweber/sprockets.clients.cassandra', + license=codecs.open('LICENSE', encoding='utf-8').read(), classifiers=[ - 'Development Status :: 1 - Planning', + 'Development Status :: 4 - Beta', 'Intended Audience :: Developers', 'License :: OSI Approved :: BSD License', 'Natural Language :: English', 'Operating System :: OS Independent', 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.6', 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.2', 'Programming Language :: Python :: 3.3', 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: Implementation :: CPython', + 'Programming Language :: Python :: Implementation :: PyPy', + 'Topic :: Software Development :: Libraries', + 'Topic :: Software Development :: Libraries :: Python Modules' ], -) + packages=['sprockets', + 'sprockets.clients', + 'sprockets.clients.cassandra'], + package_data={'': ['LICENSE', 'README.md']}, + include_package_data=True, + namespace_packages=['sprockets', + 'sprockets.clients'], + install_requires=install_requires, + setup_requires=setup_requires, + tests_require=tests_require, + test_suite='nose.collector', + zip_safe=False) diff --git a/sprockets.clients.cassandra/__init__.py b/sprockets.clients.cassandra/__init__.py deleted file mode 100644 index 9afb70c..0000000 --- a/sprockets.clients.cassandra/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -*- coding: utf-8 -*- -version_info = (0, 0, 0) -__version__ = '.'.join(str(v) for v in version_info[:3]) diff --git a/sprockets/__init__.py b/sprockets/__init__.py new file mode 100644 index 0000000..de40ea7 --- /dev/null +++ b/sprockets/__init__.py @@ -0,0 +1 @@ +__import__('pkg_resources').declare_namespace(__name__) diff --git a/sprockets/clients/__init__.py b/sprockets/clients/__init__.py new file mode 100644 index 0000000..de40ea7 --- /dev/null +++ b/sprockets/clients/__init__.py @@ -0,0 +1 @@ +__import__('pkg_resources').declare_namespace(__name__) diff --git a/sprockets/clients/cassandra/__init__.py b/sprockets/clients/cassandra/__init__.py new file mode 100644 index 0000000..7c8b2ad --- /dev/null +++ b/sprockets/clients/cassandra/__init__.py @@ -0,0 +1,101 @@ +""" +clients.cassandra +================= + +Base functionality for accessing/modifying data in Cassandra. + +""" +import os +import socket + +from cassandra.cluster import Cluster +from tornado.concurrent import Future +from tornado.ioloop import IOLoop + +try: + from urllib.parse import urlsplit +except: + from urlparse import urlsplit + +version_info = (0, 0, 0) +__version__ = '.'.join(str(v) for v in version_info) + +DEFAULT_URI = 'cassandra://localhost' + + +class CassandraConnection(object): + """Maintain a connection to a Cassandra cluster. + + The Sprockets Cassandra client handles provides the glue + needed to join the Tornado async I/O module with the native + python async I/O used in the Cassandra driver. The + constructor of the function will grab the current handle + to the underlying Tornado I/O loop so that a Tornado future + result can be returned to the host application. + + Configuration parameters for the module are obtained from + environment variables. Currently, the only variable is + ``CASSANDRA_URI``, which takes the format "cassandra://hostname". + If not located, the hostname defaults to localhost. + + .. note:: + + The hostname in the ``CASSANDRA_URI`` will be resolved + to a list of IP addresses that will be passed to the + Cassandra driver as the contact points. + + """ + + def __init__(self, ioloop=None): + self._config = self._get_cassandra_config() + self._cluster = Cluster(self._config['contact_points']) + self._session = self._cluster.connect() + self._ioloop = IOLoop.current() + + def _get_cassandra_config(self): + """Retrieve a dict containing Cassandra client config params.""" + config = {} + parts = urlsplit(os.environ.get('CASSANDRA_URI', DEFAULT_URI)) + if parts.scheme != 'cassandra': + raise RuntimeError( + 'CASSANDRA_URI scheme is not "cassandra://"!') + + _, _, ip_addresses = socket.gethostbyname_ex(parts.hostname) + if not ip_addresses: + raise RuntimeError('Unable to find Cassandra in DNS!') + + config['contact_points'] = ip_addresses + return config + + def set_keyspace(self, keyspace): + """Set the keyspace used by the connection.""" + self._session.set_keyspace(keyspace) + + def shutdown(self): + """Shutdown the connection to the Cassandra cluster.""" + self._cluster.shutdown() + self._session = None + self._cluster = None + + def execute(self, query, *args, **kwargs): + """Asynchronously execute the specified CQL query. + + The execute command also takes optional parameters and trace + keyword arguments. See cassandra-python documentation for + definition of those parameters. + """ + + tornado_future = Future() + cassandra_future = self._session.execute_async( + query, *args, **kwargs) + self._ioloop.add_callback( + self._callback, cassandra_future, tornado_future) + return tornado_future + + def _callback(self, cassandra_future, tornado_future): + """Cassandra async I/O loop callback handler.""" + try: + result = cassandra_future.result() + except Exception as exc: + return tornado_future.set_exception(exc) + tornado_future.set_result(result) diff --git a/test-requirements.txt b/test-requirements.txt index 28aec1f..952e16a 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -9,4 +9,4 @@ # minimize breakage to our dev environment. coveralls>=0.4,<1.0 nose>=1.3.1,<2.0.0 -test-helpers>=1.5.1,<2.0.0 +tornado>=3.0.0,<4.0.0 diff --git a/tests.py b/tests.py new file mode 100644 index 0000000..eb83ff5 --- /dev/null +++ b/tests.py @@ -0,0 +1,68 @@ +""" +Tests for the sprockets.clients.cassandra package + +""" +import os +import socket +import time + +from cassandra.cluster import Cluster +from cassandra.protocol import SyntaxException +from tornado.testing import AsyncTestCase, gen_test + +from sprockets.clients.cassandra import CassandraConnection + + +class TestCassandraConnectionClass(AsyncTestCase): + + def setUp(self): + super(TestCassandraConnectionClass, self).setUp() + self.cluster = Cluster(self.find_cassandra()) + self.session = self.cluster.connect() + self.keyspace = 'sprocketstest{0}'.format(int(time.time()*10000)) + self.create_fixtures() + self.connection = CassandraConnection() + + def tearDown(self): + super(TestCassandraConnectionClass, self).tearDown() + self.session.execute("DROP KEYSPACE {0}".format(self.keyspace)) + self.connection.shutdown() + + def find_cassandra(self): + uri = os.environ.get('CASSANDRA_URI', 'cassandra://localhost') + hostname = uri[12:] + _, _, ips = socket.gethostbyname_ex(hostname) + return ips + + def create_fixtures(self): + self.session.execute( + "CREATE KEYSPACE IF NOT EXISTS {0} WITH REPLICATION = " + "{{'class': 'SimpleStrategy', " + "'replication_factor': 1}}".format(self.keyspace)) + self.session.execute("USE {0}".format(self.keyspace)) + self.session.execute( + "CREATE TABLE IF NOT EXISTS names (name text PRIMARY KEY)") + self.session.execute( + "INSERT INTO names (name) VALUES ('Peabody')") + + @gen_test + def test_several_queries(self): + futures = [] + count = 100 + for i in range(count): + futures.append(self.connection.execute( + "SELECT name FROM {0}.names".format(self.keyspace))) + results = 0 + for future in futures: + yield future + results += 1 + self.assertEqual(count, results) + + @gen_test + def test_bad_query(self): + with self.assertRaises(SyntaxException): + yield self.connection.execute('goobletygook') + + @gen_test + def test_set_keyspace(self): + self.connection.set_keyspace(self.keyspace) diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/tests/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/tests/acceptance/__init__.py b/tests/acceptance/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/tests/acceptance/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/tests/integration/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/tests/unit/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/tests/unit/test_example.py b/tests/unit/test_example.py deleted file mode 100644 index 8319b63..0000000 --- a/tests/unit/test_example.py +++ /dev/null @@ -1,58 +0,0 @@ -from test_helpers import bases -from test_helpers.compat import mock -from test_helpers.mixins import patch_mixin - - -class App(object): - """Quick example app for testing.""" - - def first(self, fail): - """Example method under test.""" - self.do_db_lookup('random') - if fail: - raise AttributeError - - def do_db_lookup(self, name): - """Method that reaches out to a database.""" - raise NotImplementedError - - -class _BaseFirstTestCase(patch_mixin.PatchMixin, bases.BaseTest): - """Example base test showing current test style.""" - - patch_prefix = 'tests.unit.test_example' - - @classmethod - def configure(cls): - cls.app = App() - - cls.do_db_lookup = cls.create_patch('App.do_db_lookup') - - @classmethod - def execute(cls): - try: - cls.app.first(cls.fail) - except AttributeError as exc: - cls.exception = exc - - def should_do_db_lookup(self): - self.do_db_lookup.assert_called_once_with(mock.ANY) - - -class WhenFirstAppSuccessful(_BaseFirstTestCase): - - @classmethod - def configure(cls): - cls.fail = False - super(WhenFirstAppSuccessful, cls).configure() - - -class WhenFirstAppFails(_BaseFirstTestCase): - - @classmethod - def configure(cls): - cls.fail = True - super(WhenFirstAppFails, cls).configure() - - def should_raise_AttributeError(self): - self.assertIsInstance(self.exception, AttributeError) diff --git a/tox.ini b/tox.ini index 3a25764..968d11c 100644 --- a/tox.ini +++ b/tox.ini @@ -1,16 +1,12 @@ [tox] -envlist = py26,py27,py32,py33,py34 +envlist = py27,py33,py34 [testenv] commands = nosetests [] -deps = -rtest-requirements.txt +deps = -rrequirements.txt + -rtest-requirements.txt [testenv:py27] deps = mock>=1.0.1,<2.0 {[testenv]deps} - -[testenv:py26] -deps = - unittest2==0.5.1 - {[testenv:py27]deps}