Add docstrings, fix edge cases

This commit is contained in:
Gavin M. Roy 2020-04-08 18:43:51 -04:00
parent fb816ddca7
commit d057b140ae
3 changed files with 340 additions and 42 deletions

View file

@ -72,6 +72,5 @@ output = build/coverage.xml
[flake8]
application-import-names = sprockets_postgres,tests
exclude = build,docs,env
ignore = W503
import-order-style = pycharm
rst-roles = attr,class,const,data,exc,func,meth,mod,obj,ref,yields

View file

@ -25,26 +25,63 @@ DEFAULT_POSTGRES_QUERY_TIMEOUT = 120
DEFAULT_POSTGRES_UUID = 'TRUE'
QueryParameters = typing.Union[dict, list, tuple, None]
"""Type annotation for query parameters"""
Timeout = typing.Union[int, float, None]
"""Type annotation for timeout values"""
@dataclasses.dataclass
class QueryResult:
"""A :func:`Data Class <dataclasses.dataclass>` that is generated as a
result of each query that is executed.
:param row_count: The quantity of rows impacted by the query
:param row: If a single row is returned, the data for that row
:param rows: If more than one row is returned, this attribute is set as the
list of rows, in order.
"""
row_count: int
row: typing.Optional[dict]
rows: typing.Optional[typing.List[dict]]
class PostgresConnector:
"""Wraps a :class:`aiopg.Cursor` instance for creating explicit
transactions, calling stored procedures, and executing queries.
Unless the :meth:`~sprockets_postgres.PostgresConnector.transaction`
asynchronous :ref:`context-manager <python:typecontextmanager>` is used,
each call to :meth:`~sprockets_postgres.PostgresConnector.callproc` and
:meth:`~sprockets_postgres.PostgresConnector.execute` is an explicit
transaction.
.. note:: :class:`PostgresConnector` instances are created by
:meth:`ApplicationMixin.postgres_connector
<sprockets_postgres.ApplicationMixin.postgres_connector>` and should
not be created directly.
:param cursor: The cursor to use in the connector
:type cursor: aiopg.Cursor
:param on_error: The callback to invoke when an exception is caught
:param on_duration: The callback to invoke when a query is complete and all
of the data has been returned.
:param timeout: A timeout value in seconds for executing queries. If
unspecified, defaults to the ``POSTGRES_QUERY_TIMEOUT`` environment
variable and if that is not specified, to the
:const:`DEFAULT_POSTGRES_QUERY_TIMEOUT` value of ``120``
:type timeout: :data:`~sprockets_postgres.Timeout`
"""
def __init__(self,
cursor: aiopg.Cursor,
on_error: typing.Callable,
record_duration: typing.Optional[typing.Callable] = None,
on_duration: typing.Optional[typing.Callable] = None,
timeout: Timeout = None):
self.cursor = cursor
self._on_error = on_error
self._record_duration = record_duration
self._on_duration = on_duration
self._timeout = timeout or int(
os.environ.get(
'POSTGRES_QUERY_TIMEOUT',
@ -56,6 +93,27 @@ class PostgresConnector:
metric_name: str = '',
*,
timeout: Timeout = None) -> QueryResult:
"""Execute a stored procedure / function
:param name: The stored procedure / function name to call
:param parameters: Query parameters to pass when calling
:type parameters: :data:`~sprockets_postgres.QueryParameters`
:param metric_name: The metric name for duration recording and logging
:param timeout: Timeout value to override the default or the value
specified when creating the
:class:`~sprockets_postgres.PostgresConnector`.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when there is a query or network timeout
:raises psycopg2.Error: when there is an exception raised by Postgres
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
:rtype: :class:`~sprockets_postgres.QueryResult`
"""
return await self._query(
self.cursor.callproc,
metric_name,
@ -69,6 +127,32 @@ class PostgresConnector:
metric_name: str = '',
*,
timeout: Timeout = None) -> QueryResult:
"""Execute a query, specifying a name for the query, the SQL statement,
and optional positional arguments to pass in with the query.
Parameters may be provided as sequence or mapping and will be
bound to variables in the operation. Variables are specified
either with positional ``%s`` or named ``%({name})s`` placeholders.
:param sql: The SQL statement to execute
:param parameters: Query parameters to pass as part of the execution
:type parameters: :data:`~sprockets_postgres.QueryParameters`
:param metric_name: The metric name for duration recording and logging
:param timeout: Timeout value to override the default or the value
specified when creating the
:class:`~sprockets_postgres.PostgresConnector`.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when there is a query or network timeout
:raises psycopg2.Error: when there is an exception raised by Postgres
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
:rtype: :class:`~sprockets_postgres.QueryResult`
"""
return await self._query(
self.cursor.execute,
metric_name,
@ -79,6 +163,42 @@ class PostgresConnector:
@contextlib.asynccontextmanager
async def transaction(self) \
-> typing.AsyncContextManager['PostgresConnector']:
"""asynchronous :ref:`context-manager <python:typecontextmanager>`
function that implements full ``BEGIN``, ``COMMIT``, and ``ROLLBACK``
semantics. If there is a :exc:`psycopg2.Error` raised during the
transaction, the entire transaction will be rolled back.
If no exception is raised, the transaction will be committed when
exiting the context manager.
.. note:: This method is provided for edge case usage. As a
generalization
:meth:`sprockets_postgres.RequestHandlerMixin.postgres_transaction`
should be used instead.
*Usage Example*
.. code-block::
class RequestHandler(sprockets_postgres.RequestHandlerMixin,
web.RequestHandler):
async def post(self):
async with self.postgres_transaction() as transaction:
result1 = await transaction.execute(QUERY_ONE)
result2 = await transaction.execute(QUERY_TWO)
result3 = await transaction.execute(QUERY_THREE)
:raises asyncio.TimeoutError: when there is a query or network timeout
when starting the transaction
:raises psycopg2.Error: when there is an exception raised by Postgres
when starting the transaction
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
"""
async with self.cursor.begin():
yield self
@ -96,10 +216,11 @@ class PostgresConnector:
if exc:
raise exc
else:
if self._record_duration:
self._record_duration(
results = await self._query_results()
if self._on_duration:
self._on_duration(
metric_name, time.monotonic() - start_time)
return await self._query_results()
return results
async def _query_results(self) -> QueryResult:
count, row, rows = self.cursor.rowcount, None, None
@ -122,12 +243,13 @@ class ConnectionException(Exception):
class ApplicationMixin:
"""
:class:`sprockets.http.app.Application` mixin for handling the connection
to Postgres and exporting functions for querying the database,
getting the status, and proving a cursor.
:class:`sprockets.http.app.Application` / :class:`tornado.web.Application`
mixin for handling the connection to Postgres and exporting functions for
querying the database, getting the status, and proving a cursor.
Automatically creates and shuts down :class:`aio.pool.Pool` on startup
and shutdown.
Automatically creates and shuts down :class:`aiopg.Pool` on startup
and shutdown by installing `on_start` and `shutdown` callbacks into the
:class:`~sprockets.http.app.Application` instance.
"""
POSTGRES_STATUS_TIMEOUT = 3
@ -141,24 +263,77 @@ class ApplicationMixin:
@contextlib.asynccontextmanager
async def postgres_connector(self,
on_error: typing.Callable,
record_duration: typing.Optional[
on_duration: typing.Optional[
typing.Callable] = None,
timeout: Timeout = None) \
-> typing.AsyncContextManager[PostgresConnector]:
"""Asynchronous :ref:`context-manager <python:typecontextmanager>`
that returns a :class:`~sprockets_postgres.PostgresConnector` instance
from the connection pool with a cursor.
.. note:: This function is designed to work in conjunction with the
:class:`~sprockets_postgres.RequestHandlerMixin` and is generally
not invoked directly.
:param on_error: A callback function that is invoked on exception. If
an exception is returned from that function, it will raise it.
:param on_duration: An optional callback function that is invoked after
a query has completed to record the duration that encompasses
both executing the query and retrieving the returned records, if
any.
:param timeout: Used to override the default query timeout.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when the request to retrieve a connection
from the pool times out.
:raises sprockets_postgres.ConnectionException: when the application
can not connect to the configured Postgres instance.
:raises psycopg2.Error: when Postgres raises an exception during the
creation of the cursor.
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
"""
try:
async with self._postgres_pool.acquire() as conn:
async with conn.cursor(
cursor_factory=extras.RealDictCursor,
timeout=timeout) as cursor:
yield PostgresConnector(
cursor, on_error, record_duration, timeout)
cursor, on_error, on_duration, timeout)
except (asyncio.TimeoutError, psycopg2.Error) as err:
on_error('postgres_connector', ConnectionException(str(err)))
exc = on_error('postgres_connector', ConnectionException(str(err)))
if exc:
raise exc
else: # postgres_status.on_error does not return an exception
yield None
async def postgres_status(self) -> dict:
"""Invoke from the ``/status`` RequestHandler to check that there is
a Postgres connection handler available and return info about the
pool.
a Postgres connection handler available and return info about the
pool.
The ``available`` item in the dictionary indicates that the
application was able to perform a ``SELECT 1`` against the database
using a :class:`~sprockets_postgres.PostgresConnector` instance.
The ``pool_size`` item indicates the current quantity of open
connections to Postgres.
The ``pool_free`` item indicates the current number of idle
connections available to process queries.
*Example return value*
.. code-block:: python
{
'available': True,
'pool_size': 10,
'pool_free': 8
}
"""
query_error = asyncio.Event()
@ -170,7 +345,9 @@ class ApplicationMixin:
async with self.postgres_connector(
on_error,
timeout=self.POSTGRES_STATUS_TIMEOUT) as connector:
await connector.execute('SELECT 1')
if connector:
await connector.execute('SELECT 1')
return {
'available': not query_error.is_set(),
'pool_size': self._postgres_pool.size,
@ -229,15 +406,17 @@ class ApplicationMixin:
This is invoked by the Application shutdown callback mechanism.
"""
self._postgres_pool.close()
await self._postgres_pool.wait_closed()
if self._postgres_pool is not None:
self._postgres_pool.close()
await self._postgres_pool.wait_closed()
class RequestHandlerMixin:
"""
RequestHandler mixin class exposing functions for querying the database,
recording the duration to either `sprockets-influxdb` or
`sprockets.mixins.metrics`, and handling exceptions.
A RequestHandler mixin class exposing functions for querying the database,
recording the duration to either :mod:`sprockets-influxdb
<sprockets_influxdb>` or :mod:`sprockets.mixins.metrics`, and
handling exceptions.
"""
async def postgres_callproc(self,
@ -246,6 +425,27 @@ class RequestHandlerMixin:
metric_name: str = '',
*,
timeout: Timeout = None) -> QueryResult:
"""Execute a stored procedure / function
:param name: The stored procedure / function name to call
:param parameters: Query parameters to pass when calling
:type parameters: :data:`~sprockets_postgres.QueryParameters`
:param metric_name: The metric name for duration recording and logging
:param timeout: Timeout value to override the default or the value
specified when creating the
:class:`~sprockets_postgres.PostgresConnector`.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when there is a query or network timeout
:raises psycopg2.Error: when there is an exception raised by Postgres
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
:rtype: :class:`~sprockets_postgres.QueryResult`
"""
async with self.application.postgres_connector(
self._on_postgres_error,
self._on_postgres_timing,
@ -266,6 +466,24 @@ class RequestHandlerMixin:
bound to variables in the operation. Variables are specified
either with positional ``%s`` or named ``%({name})s`` placeholders.
:param sql: The SQL statement to execute
:param parameters: Query parameters to pass as part of the execution
:type parameters: :data:`~sprockets_postgres.QueryParameters`
:param metric_name: The metric name for duration recording and logging
:param timeout: Timeout value to override the default or the value
specified when creating the
:class:`~sprockets_postgres.PostgresConnector`.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when there is a query or network timeout
:raises psycopg2.Error: when there is an exception raised by Postgres
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
:rtype: :class:`~sprockets_postgres.QueryResult`
"""
async with self.application.postgres_connector(
self._on_postgres_error,
@ -277,8 +495,41 @@ class RequestHandlerMixin:
@contextlib.asynccontextmanager
async def postgres_transaction(self, timeout: Timeout = None) \
-> typing.AsyncContextManager[PostgresConnector]:
"""Yields a :class:`PostgresConnector` instance in a transaction.
Will automatically commit or rollback based upon exception.
"""asynchronous :ref:`context-manager <python:typecontextmanager>`
function that implements full ``BEGIN``, ``COMMIT``, and ``ROLLBACK``
semantics. If there is a :exc:`psycopg2.Error` raised during the
transaction, the entire transaction will be rolled back.
If no exception is raised, the transaction will be committed when
exiting the context manager.
*Usage Example*
.. code-block:: python
class RequestHandler(sprockets_postgres.RequestHandlerMixin,
web.RequestHandler):
async def post(self):
async with self.postgres_transaction() as transaction:
result1 = await transaction.execute(QUERY_ONE)
result2 = await transaction.execute(QUERY_TWO)
result3 = await transaction.execute(QUERY_THREE)
:param timeout: Timeout value to override the default or the value
specified when creating the
:class:`~sprockets_postgres.PostgresConnector`.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when there is a query or network timeout
when starting the transaction
:raises psycopg2.Error: when there is an exception raised by Postgres
when starting the transaction
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
"""
async with self.application.postgres_connector(
@ -291,7 +542,12 @@ class RequestHandlerMixin:
def _on_postgres_error(self,
metric_name: str,
exc: Exception) -> typing.Optional[Exception]:
"""Override for different error handling behaviors"""
"""Override for different error handling behaviors
Return an exception if you would like for it to be raised, or swallow
it here.
"""
LOGGER.error('%s in %s for %s (%s)',
exc.__class__.__name__, self.__class__.__name__,
metric_name, str(exc).split('\n')[0])
@ -308,7 +564,18 @@ class RequestHandlerMixin:
def _on_postgres_timing(self,
metric_name: str,
duration: float) -> None:
"""Override for custom metric recording"""
"""Override for custom metric recording. As a default behavior it will
attempt to detect `sprockets-influxdb
<https://sprockets-influxdb.readthedocs.io/>`_ and
`sprockets.mixins.metrics
<https://sprocketsmixinsmetrics.readthedocs.io/en/latest/>`_ and
record the metrics using them if they are available. If they are not
available, it will record the query duration to the `DEBUG` log.
:param metric_name: The name of the metric to record
:param duration: The duration to record for the metric
"""
if hasattr(self, 'influxdb'): # sprockets-influxdb
self.influxdb.set_field(metric_name, duration)
elif hasattr(self, 'record_timing'): # sprockets.mixins.metrics

View file

@ -2,13 +2,14 @@ import asyncio
import json
import os
import typing
import unittest
import uuid
from unittest import mock
import psycopg2
from psycopg2 import errors
from sprockets.http import app, testing
from tornado import web
from tornado import ioloop, web
import sprockets_postgres
@ -63,6 +64,16 @@ class ErrorRequestHandler(RequestHandler):
return RuntimeError()
class ErrorPassthroughRequestHandler(RequestHandler):
async def get(self):
exc = self._on_postgres_error('test', RuntimeError())
if isinstance(exc, RuntimeError):
self.set_status(204)
else:
raise web.HTTPError(500, 'Did not pass through')
class ExecuteRequestHandler(RequestHandler):
GET_SQL = 'SELECT %s::TEXT AS value;'
@ -118,6 +129,14 @@ class MultiRowRequestHandler(RequestHandler):
'rows': self.cast_data(result.rows)})
class NoErrorRequestHandler(ErrorRequestHandler):
def _on_postgres_error(self,
metric_name: str,
exc: Exception) -> typing.Optional[Exception]:
return None
class NoRowRequestHandler(RequestHandler):
GET_SQL = """\
@ -210,10 +229,12 @@ class TestCase(testing.SprocketsHttpTestCase):
web.url('/callproc', CallprocRequestHandler),
web.url('/count', CountRequestHandler),
web.url('/error', ErrorRequestHandler),
web.url('/error-passthrough', ErrorPassthroughRequestHandler),
web.url('/execute', ExecuteRequestHandler),
web.url('/influxdb', InfluxDBRequestHandler),
web.url('/metrics-mixin', MetricsMixinRequestHandler),
web.url('/multi-row', MultiRowRequestHandler),
web.url('/no-error', NoErrorRequestHandler),
web.url('/no-row', NoRowRequestHandler),
web.url('/status', StatusRequestHandler),
web.url('/transaction', TransactionRequestHandler),
@ -231,6 +252,13 @@ class RequestHandlerMixinTestCase(TestCase):
self.assertGreaterEqual(data['pool_size'], 1)
self.assertGreaterEqual(data['pool_free'], 1)
@mock.patch('aiopg.pool.Pool.acquire')
def test_postgres_status_connect_error(self, acquire):
acquire.side_effect = asyncio.TimeoutError()
response = self.fetch('/status')
self.assertEqual(response.code, 503)
self.assertFalse(json.loads(response.body)['available'])
@mock.patch('aiopg.cursor.Cursor.execute')
def test_postgres_status_error(self, execute):
execute.side_effect = asyncio.TimeoutError()
@ -245,12 +273,23 @@ class RequestHandlerMixinTestCase(TestCase):
uuid.UUID(json.loads(response.body)['value']), uuid.UUID)
@mock.patch('aiopg.cursor.Cursor.execute')
def test_postgres_error_passthrough(self, execute):
def test_postgres_error(self, execute):
execute.side_effect = asyncio.TimeoutError
response = self.fetch('/error')
self.assertEqual(response.code, 500)
self.assertIn(b'Internal Server Error', response.body)
@mock.patch('aiopg.pool.Pool.acquire')
def test_postgres_error_on_connect(self, acquire):
acquire.side_effect = asyncio.TimeoutError
response = self.fetch('/error')
self.assertEqual(response.code, 500)
self.assertIn(b'Internal Server Error', response.body)
def test_postgres_error_passthrough(self):
response = self.fetch('/error-passthrough')
self.assertEqual(response.code, 204)
def test_postgres_execute(self):
expectation = str(uuid.uuid4())
response = self.fetch('/execute?value={}'.format(expectation))
@ -369,8 +408,7 @@ class TransactionTestCase(TestCase):
self.assertEqual(record['last_updated_at'], last_updated)
"""
class MissingURLTestCase(testing.SprocketsHttpTestCase):
class MissingURLTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
@ -384,16 +422,10 @@ class MissingURLTestCase(testing.SprocketsHttpTestCase):
if 'POSTGRES_URL' in os.environ:
del os.environ['POSTGRES_URL']
def setUp(self):
self.stop_mock = None
super().setUp()
def get_app(self):
self.app = Application()
self.stop_mock = mock.Mock(
wraps=self.app.stop, side_effect=RuntimeError)
return self.app
def test_that_stop_is_invoked(self):
self.stop_mock.assert_called_once_with(self.io_loop)
"""
io_loop = ioloop.IOLoop.current()
obj = Application()
obj.stop = mock.Mock(wraps=obj.stop)
obj.start(io_loop)
io_loop.start()
obj.stop.assert_called_once()