Add ease of use imports and add integration tests

- Import in psycopg2 classes into the ``sprockets.clients.postgresql`` namespace to make it easier to work with
- Add integration tests that connect to a local postgresql instance if it's running
- Update documentation a bit
This commit is contained in:
Gavin M. Roy 2014-09-03 11:06:11 -04:00
parent 5c482052ce
commit f626d052c5
13 changed files with 196 additions and 55 deletions

View file

@ -9,11 +9,14 @@ python:
- 3.3
- 3.4
install:
- if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2; fi
- pip install -r requirements.txt
- pip install -r test-requirements.txt
- pip install -e .
script: nosetests
after_success:
- coveralls
addons:
postgresql: "9.3"
deploy:
provider: pypi
user: sprockets

View file

@ -1,7 +1,2 @@
include LICENSE
include README.rst
include *requirements.txt
graft docs
graft tests.py
global-exclude __pycache__
global-exclude *.pyc

View file

@ -1,6 +1,3 @@
-r requirements.txt
-r test-requirements.txt
flake8>=2.1,<3
sphinx>=1.2,<2
sphinx-rtd-theme>=0.1,<1.0
sphinxcontrib-httpdomain>=1.2,<2
sphinxcontrib-httpdomain>=1.2,<2

View file

@ -1,3 +1,12 @@
.. automodule:: sprockets.clients.postgresql
:members:
:inherited-members:
Session Classes
---------------
.. autoclass:: sprockets.clients.postgresql.Session
:members:
:inherited-members:
.. autoclass:: sprockets.clients.postgresql.TornadoSession
:members:
:inherited-members:

View file

@ -23,9 +23,8 @@ pygments_style = 'sphinx'
html_theme = 'sphinx_rtd_theme'
html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
intersphinx_mapping = {
'python': ('https://docs.python.org/', None),
'python': ('https://docs.python.org/2/', None),
'queries': ('https://queries.readthedocs.org/en/latest/', None),
'requests': ('https://requests.readthedocs.org/en/latest/', None),
'sprockets': ('https://sprockets.readthedocs.org/en/latest/', None),
'tornado': ('http://www.tornadoweb.org/en/stable/', None),
}

View file

@ -17,3 +17,31 @@ PostgreSQL on localhost to the ``postgres`` database and issues a query.
session = postgresql.Session('postgres')
result = session.query('SELECT 1')
print(repr(result))
The following example shows how to use the :py:class:`TornadoSession <sprockets.clients.postgresql.TornadoSession>`
class in a Tornado :py:class:`RequestHandler <tornado.web.RequestHandler>`.
.. code:: python
import os
from tornado import gen
from sprockets.clients import postgresql
from tornado import web
os.environ['POSTGRES_HOST'] = 'localhost'
os.environ['POSTGRES_USER'] = 'postgres'
os.environ['POSTGRES_PORT'] = 5432
os.environ['POSTGRES_DBNAME'] = 'postgres'
class RequestHandler(web.RequestHandler):
def initialize(self):
self.session = postgresql.TornadoSession('postgres')
@gen.coroutine
def get(self, *args, **kwargs):
result = yield self.session.query('SELECT 1')
self.write(result.as_dict())
result.free()

View file

@ -1,4 +1,7 @@
Version History
---------------
- 1.0.1 [2014-09-03]
- Expose psycopg2/queries exceptions, objects, etc from ``sprockets.queries.postgresql``
- Add integration testing with PostgreSQL
- 1.0.0 [2014-08-29]
- Initial release

View file

@ -1,3 +1,2 @@
queries>=1.2.1
sprockets>=0.1.1
tornado>=4.0.1
queries>=1.4.0
tornado>=4.0.1

View file

@ -7,6 +7,3 @@ build-dir = build/docs
with-coverage = 1
cover-package = sprockets.clients.postgresql
verbose = 1
[flake8]
exclude = build,dist,docs,env

View file

@ -1,32 +1,7 @@
import codecs
import sys
import setuptools
def read_requirements_file(req_name):
requirements = []
try:
with codecs.open(req_name, encoding='utf-8') as req_file:
for req_line in req_file:
if '#' in req_line:
req_line = req_line[0:req_line.find('#')].strip()
if req_line:
requirements.append(req_line.strip())
except IOError:
pass
return requirements
install_requires = read_requirements_file('requirements.txt')
setup_requires = read_requirements_file('setup-requirements.txt')
tests_require = read_requirements_file('test-requirements.txt')
if sys.version_info < (2, 7):
tests_require.append('unittest2')
if sys.version_info < (3, 0):
tests_require.append('mock')
setuptools.setup(
name='sprockets.clients.postgresql',
version='1.0.0',
@ -60,10 +35,6 @@ setuptools.setup(
'sprockets.clients.postgresql'],
package_data={'': ['LICENSE', 'README.rst']},
include_package_data=True,
namespace_packages=['sprockets',
'sprockets.clients'],
install_requires=install_requires,
setup_requires=setup_requires,
tests_require=tests_require,
test_suite='nose.collector',
namespace_packages=['sprockets', 'sprockets.clients'],
install_requires=['queries'],
zip_safe=False)

View file

@ -1,6 +1,6 @@
"""
PostgreSQL Session Classes
==========================
PostgreSQL Session API
======================
The Session classes wrap the Queries :py:class:`Session <queries.Session>` and
:py:class:`TornadoSession <queries.tornado_session.TornadoSession>` classes
providing environment variable based configuration.
@ -30,7 +30,7 @@ The uri ``postgresql://bar:baz@foodb:6000/foo`` will be used when creating the
instance of :py:class:`queries.Session`.
"""
version_info = (1, 0, 0)
version_info = (1, 0, 1)
__version__ = '.'.join(str(v) for v in version_info)
import logging
@ -45,6 +45,26 @@ _ARGUMENTS = ['host', 'port', 'dbname', 'user', 'password']
LOGGER = logging.getLogger(__name__)
# For ease of access to different cursor types
from queries import DictCursor
from queries import NamedTupleCursor
from queries import RealDictCursor
from queries import LoggingCursor
from queries import MinTimeLoggingCursor
# Expose exceptions so clients do not need to import queries as well
from queries import DataError
from queries import DatabaseError
from queries import IntegrityError
from queries import InterfaceError
from queries import InternalError
from queries import NotSupportedError
from queries import OperationalError
from queries import ProgrammingError
from queries import QueryCanceledError
from queries import TransactionRollbackError
def _get_uri(dbname):
"""Construct the URI for connecting to PostgreSQL by appending each
argument name to the dbname, delimited by an underscore and

View file

@ -1,3 +1,4 @@
coverage>=3.7,<4
coveralls>=0.4,<1
nose>=1.3,<2
nose>=1.3,<2
mock==1.0.1

119
tests.py
View file

@ -2,6 +2,7 @@
Tests for the sprockets.clients.postgresql package
"""
import datetime
import mock
import os
try:
@ -9,11 +10,18 @@ try:
except ImportError:
import unittest
from tornado import gen
from sprockets.clients import postgresql
import queries
from tornado import testing
class TestGetURI(unittest.TestCase):
def tearDown(self):
for key in ['HOST', 'PORT', 'DBNAME', 'USER', 'PASSWORD']:
del os.environ['TEST1_%s' % key]
def test_get_uri_returns_proper_values(self):
os.environ['TEST1_HOST'] = 'test1-host'
@ -38,6 +46,10 @@ class TestSession(unittest.TestCase):
os.environ['TEST2_PASSWORD'] = 'baz'
self.session = postgresql.Session('test2')
def tearDown(self):
for key in ['HOST', 'PORT', 'DBNAME', 'USER', 'PASSWORD']:
del os.environ['TEST2_%s' % key]
def test_session_invokes_queries_session(self):
self.assertTrue(self.mock_init.called)
@ -54,5 +66,112 @@ class TestTornadoSession(unittest.TestCase):
os.environ['TEST3_PASSWORD'] = 'baz'
self.session = postgresql.TornadoSession('test3')
def tearDown(self):
for key in ['HOST', 'PORT', 'DBNAME', 'USER', 'PASSWORD']:
del os.environ['TEST3_%s' % key]
def test_session_invokes_queries_session(self):
self.assertTrue(self.mock_init.called)
class SessionIntegrationTests(unittest.TestCase):
def setUp(self):
os.environ['TEST4_HOST'] = 'localhost'
os.environ['TEST4_PORT'] = '5432'
os.environ['TEST4_DBNAME'] = 'postgres'
os.environ['TEST4_USER'] = 'postgres'
try:
self.session = postgresql.Session('test', pool_max_size=10)
except postgresql.OperationalError as error:
raise unittest.SkipTest(str(error).split('\n')[0])
def tearDown(self):
for key in ['HOST', 'PORT', 'DBNAME', 'USER']:
del os.environ['TEST4_%s' % key]
def test_query_returns_results_object(self):
self.assertIsInstance(self.session.query('SELECT 1 AS value'),
queries.Results)
def test_query_result_value(self):
result = self.session.query('SELECT 1 AS value')
self.assertDictEqual(result.as_dict(), {'value': 1})
def test_query_multirow_result_has_at_least_three_rows(self):
result = self.session.query('SELECT * FROM pg_stat_database')
self.assertGreaterEqual(result.count(), 3)
def test_callproc_returns_results_object(self):
timestamp = int(datetime.datetime.now().strftime('%s'))
self.assertIsInstance(self.session.callproc('to_timestamp',
[timestamp]),
queries.Results)
def test_callproc_mod_result_value(self):
result = self.session.callproc('mod', [6, 4])
self.assertEqual(6 % 4, result[0]['mod'])
class TornadoSessionIntegrationTests(testing.AsyncTestCase):
def setUp(self):
super(TornadoSessionIntegrationTests, self).setUp()
os.environ['TEST5_HOST'] = 'localhost'
os.environ['TEST5_PORT'] = '5432'
os.environ['TEST5_DBNAME'] = 'postgres'
os.environ['TEST5_USER'] = 'postgres'
self.session = postgresql.TornadoSession('test',
pool_max_size=10,
io_loop=self.io_loop)
#def tearDown(self):
# for key in ['HOST', 'PORT', 'DBNAME', 'USER']:
# del os.environ['TEST5_%s' % key]
@testing.gen_test
def test_query_returns_results_object(self):
try:
result = yield self.session.query('SELECT 1 AS value')
except postgresql.OperationalError:
raise unittest.SkipTest('PostgreSQL is not running')
self.assertIsInstance(result, queries.Results)
result.free()
@testing.gen_test
def test_query_result_value(self):
try:
result = yield self.session.query('SELECT 1 AS value')
except postgresql.OperationalError:
raise unittest.SkipTest('PostgreSQL is not running')
self.assertDictEqual(result.as_dict(), {'value': 1})
result.free()
@testing.gen_test
def test_query_multirow_result_has_at_least_three_rows(self):
try:
result = yield self.session.query('SELECT * FROM pg_stat_database')
except postgresql.OperationalError:
raise unittest.SkipTest('PostgreSQL is not running')
self.assertGreaterEqual(result.count(), 3)
result.free()
@testing.gen_test
def test_callproc_returns_results_object(self):
timestamp = int(datetime.datetime.now().strftime('%s'))
try:
result = yield self.session.callproc('to_timestamp', [timestamp])
except postgresql.OperationalError:
raise unittest.SkipTest('PostgreSQL is not running')
self.assertIsInstance(result, queries.Results)
result.free()
@testing.gen_test
def test_callproc_mod_result_value(self):
try:
result = yield self.session.callproc('mod', [6, 4])
except postgresql.OperationalError:
raise unittest.SkipTest('PostgreSQL is not running')
self.assertEqual(6 % 4, result[0]['mod'])
result.free()