From d057b140ae935a4584d203c6eac5666c2c76b096 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Wed, 8 Apr 2020 18:43:51 -0400 Subject: [PATCH] Add docstrings, fix edge cases --- setup.cfg | 1 - sprockets_postgres.py | 317 ++++++++++++++++++++++++++++++++++++++---- tests.py | 64 ++++++--- 3 files changed, 340 insertions(+), 42 deletions(-) diff --git a/setup.cfg b/setup.cfg index 629b3d0..5000020 100644 --- a/setup.cfg +++ b/setup.cfg @@ -72,6 +72,5 @@ output = build/coverage.xml [flake8] application-import-names = sprockets_postgres,tests exclude = build,docs,env -ignore = W503 import-order-style = pycharm rst-roles = attr,class,const,data,exc,func,meth,mod,obj,ref,yields diff --git a/sprockets_postgres.py b/sprockets_postgres.py index 9651750..6747e28 100644 --- a/sprockets_postgres.py +++ b/sprockets_postgres.py @@ -25,26 +25,63 @@ DEFAULT_POSTGRES_QUERY_TIMEOUT = 120 DEFAULT_POSTGRES_UUID = 'TRUE' QueryParameters = typing.Union[dict, list, tuple, None] +"""Type annotation for query parameters""" + Timeout = typing.Union[int, float, None] +"""Type annotation for timeout values""" @dataclasses.dataclass class QueryResult: + """A :func:`Data Class ` that is generated as a + result of each query that is executed. + + :param row_count: The quantity of rows impacted by the query + :param row: If a single row is returned, the data for that row + :param rows: If more than one row is returned, this attribute is set as the + list of rows, in order. + + """ row_count: int row: typing.Optional[dict] rows: typing.Optional[typing.List[dict]] class PostgresConnector: + """Wraps a :class:`aiopg.Cursor` instance for creating explicit + transactions, calling stored procedures, and executing queries. + Unless the :meth:`~sprockets_postgres.PostgresConnector.transaction` + asynchronous :ref:`context-manager ` is used, + each call to :meth:`~sprockets_postgres.PostgresConnector.callproc` and + :meth:`~sprockets_postgres.PostgresConnector.execute` is an explicit + transaction. + + .. note:: :class:`PostgresConnector` instances are created by + :meth:`ApplicationMixin.postgres_connector + ` and should + not be created directly. + + :param cursor: The cursor to use in the connector + :type cursor: aiopg.Cursor + :param on_error: The callback to invoke when an exception is caught + :param on_duration: The callback to invoke when a query is complete and all + of the data has been returned. + :param timeout: A timeout value in seconds for executing queries. If + unspecified, defaults to the ``POSTGRES_QUERY_TIMEOUT`` environment + variable and if that is not specified, to the + :const:`DEFAULT_POSTGRES_QUERY_TIMEOUT` value of ``120`` + :type timeout: :data:`~sprockets_postgres.Timeout` + + """ def __init__(self, cursor: aiopg.Cursor, on_error: typing.Callable, - record_duration: typing.Optional[typing.Callable] = None, + on_duration: typing.Optional[typing.Callable] = None, timeout: Timeout = None): self.cursor = cursor self._on_error = on_error - self._record_duration = record_duration + self._on_duration = on_duration self._timeout = timeout or int( os.environ.get( 'POSTGRES_QUERY_TIMEOUT', @@ -56,6 +93,27 @@ class PostgresConnector: metric_name: str = '', *, timeout: Timeout = None) -> QueryResult: + """Execute a stored procedure / function + + :param name: The stored procedure / function name to call + :param parameters: Query parameters to pass when calling + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + + """ return await self._query( self.cursor.callproc, metric_name, @@ -69,6 +127,32 @@ class PostgresConnector: metric_name: str = '', *, timeout: Timeout = None) -> QueryResult: + """Execute a query, specifying a name for the query, the SQL statement, + and optional positional arguments to pass in with the query. + + Parameters may be provided as sequence or mapping and will be + bound to variables in the operation. Variables are specified + either with positional ``%s`` or named ``%({name})s`` placeholders. + + :param sql: The SQL statement to execute + :param parameters: Query parameters to pass as part of the execution + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + + """ return await self._query( self.cursor.execute, metric_name, @@ -79,6 +163,42 @@ class PostgresConnector: @contextlib.asynccontextmanager async def transaction(self) \ -> typing.AsyncContextManager['PostgresConnector']: + """asynchronous :ref:`context-manager ` + function that implements full ``BEGIN``, ``COMMIT``, and ``ROLLBACK`` + semantics. If there is a :exc:`psycopg2.Error` raised during the + transaction, the entire transaction will be rolled back. + + If no exception is raised, the transaction will be committed when + exiting the context manager. + + .. note:: This method is provided for edge case usage. As a + generalization + :meth:`sprockets_postgres.RequestHandlerMixin.postgres_transaction` + should be used instead. + + *Usage Example* + + .. code-block:: + + class RequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): + + async def post(self): + async with self.postgres_transaction() as transaction: + result1 = await transaction.execute(QUERY_ONE) + result2 = await transaction.execute(QUERY_TWO) + result3 = await transaction.execute(QUERY_THREE) + + :raises asyncio.TimeoutError: when there is a query or network timeout + when starting the transaction + :raises psycopg2.Error: when there is an exception raised by Postgres + when starting the transaction + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + """ async with self.cursor.begin(): yield self @@ -96,10 +216,11 @@ class PostgresConnector: if exc: raise exc else: - if self._record_duration: - self._record_duration( + results = await self._query_results() + if self._on_duration: + self._on_duration( metric_name, time.monotonic() - start_time) - return await self._query_results() + return results async def _query_results(self) -> QueryResult: count, row, rows = self.cursor.rowcount, None, None @@ -122,12 +243,13 @@ class ConnectionException(Exception): class ApplicationMixin: """ - :class:`sprockets.http.app.Application` mixin for handling the connection - to Postgres and exporting functions for querying the database, - getting the status, and proving a cursor. + :class:`sprockets.http.app.Application` / :class:`tornado.web.Application` + mixin for handling the connection to Postgres and exporting functions for + querying the database, getting the status, and proving a cursor. - Automatically creates and shuts down :class:`aio.pool.Pool` on startup - and shutdown. + Automatically creates and shuts down :class:`aiopg.Pool` on startup + and shutdown by installing `on_start` and `shutdown` callbacks into the + :class:`~sprockets.http.app.Application` instance. """ POSTGRES_STATUS_TIMEOUT = 3 @@ -141,24 +263,77 @@ class ApplicationMixin: @contextlib.asynccontextmanager async def postgres_connector(self, on_error: typing.Callable, - record_duration: typing.Optional[ + on_duration: typing.Optional[ typing.Callable] = None, timeout: Timeout = None) \ -> typing.AsyncContextManager[PostgresConnector]: + """Asynchronous :ref:`context-manager ` + that returns a :class:`~sprockets_postgres.PostgresConnector` instance + from the connection pool with a cursor. + + .. note:: This function is designed to work in conjunction with the + :class:`~sprockets_postgres.RequestHandlerMixin` and is generally + not invoked directly. + + :param on_error: A callback function that is invoked on exception. If + an exception is returned from that function, it will raise it. + :param on_duration: An optional callback function that is invoked after + a query has completed to record the duration that encompasses + both executing the query and retrieving the returned records, if + any. + :param timeout: Used to override the default query timeout. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when the request to retrieve a connection + from the pool times out. + :raises sprockets_postgres.ConnectionException: when the application + can not connect to the configured Postgres instance. + :raises psycopg2.Error: when Postgres raises an exception during the + creation of the cursor. + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + """ try: async with self._postgres_pool.acquire() as conn: async with conn.cursor( cursor_factory=extras.RealDictCursor, timeout=timeout) as cursor: yield PostgresConnector( - cursor, on_error, record_duration, timeout) + cursor, on_error, on_duration, timeout) except (asyncio.TimeoutError, psycopg2.Error) as err: - on_error('postgres_connector', ConnectionException(str(err))) + exc = on_error('postgres_connector', ConnectionException(str(err))) + if exc: + raise exc + else: # postgres_status.on_error does not return an exception + yield None async def postgres_status(self) -> dict: """Invoke from the ``/status`` RequestHandler to check that there is - a Postgres connection handler available and return info about the - pool. + a Postgres connection handler available and return info about the + pool. + + The ``available`` item in the dictionary indicates that the + application was able to perform a ``SELECT 1`` against the database + using a :class:`~sprockets_postgres.PostgresConnector` instance. + + The ``pool_size`` item indicates the current quantity of open + connections to Postgres. + + The ``pool_free`` item indicates the current number of idle + connections available to process queries. + + *Example return value* + + .. code-block:: python + + { + 'available': True, + 'pool_size': 10, + 'pool_free': 8 + } """ query_error = asyncio.Event() @@ -170,7 +345,9 @@ class ApplicationMixin: async with self.postgres_connector( on_error, timeout=self.POSTGRES_STATUS_TIMEOUT) as connector: - await connector.execute('SELECT 1') + if connector: + await connector.execute('SELECT 1') + return { 'available': not query_error.is_set(), 'pool_size': self._postgres_pool.size, @@ -229,15 +406,17 @@ class ApplicationMixin: This is invoked by the Application shutdown callback mechanism. """ - self._postgres_pool.close() - await self._postgres_pool.wait_closed() + if self._postgres_pool is not None: + self._postgres_pool.close() + await self._postgres_pool.wait_closed() class RequestHandlerMixin: """ - RequestHandler mixin class exposing functions for querying the database, - recording the duration to either `sprockets-influxdb` or - `sprockets.mixins.metrics`, and handling exceptions. + A RequestHandler mixin class exposing functions for querying the database, + recording the duration to either :mod:`sprockets-influxdb + ` or :mod:`sprockets.mixins.metrics`, and + handling exceptions. """ async def postgres_callproc(self, @@ -246,6 +425,27 @@ class RequestHandlerMixin: metric_name: str = '', *, timeout: Timeout = None) -> QueryResult: + """Execute a stored procedure / function + + :param name: The stored procedure / function name to call + :param parameters: Query parameters to pass when calling + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + + """ async with self.application.postgres_connector( self._on_postgres_error, self._on_postgres_timing, @@ -266,6 +466,24 @@ class RequestHandlerMixin: bound to variables in the operation. Variables are specified either with positional ``%s`` or named ``%({name})s`` placeholders. + :param sql: The SQL statement to execute + :param parameters: Query parameters to pass as part of the execution + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + """ async with self.application.postgres_connector( self._on_postgres_error, @@ -277,8 +495,41 @@ class RequestHandlerMixin: @contextlib.asynccontextmanager async def postgres_transaction(self, timeout: Timeout = None) \ -> typing.AsyncContextManager[PostgresConnector]: - """Yields a :class:`PostgresConnector` instance in a transaction. - Will automatically commit or rollback based upon exception. + """asynchronous :ref:`context-manager ` + function that implements full ``BEGIN``, ``COMMIT``, and ``ROLLBACK`` + semantics. If there is a :exc:`psycopg2.Error` raised during the + transaction, the entire transaction will be rolled back. + + If no exception is raised, the transaction will be committed when + exiting the context manager. + + *Usage Example* + + .. code-block:: python + + class RequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): + + async def post(self): + async with self.postgres_transaction() as transaction: + result1 = await transaction.execute(QUERY_ONE) + result2 = await transaction.execute(QUERY_TWO) + result3 = await transaction.execute(QUERY_THREE) + + + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + when starting the transaction + :raises psycopg2.Error: when there is an exception raised by Postgres + when starting the transaction + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. """ async with self.application.postgres_connector( @@ -291,7 +542,12 @@ class RequestHandlerMixin: def _on_postgres_error(self, metric_name: str, exc: Exception) -> typing.Optional[Exception]: - """Override for different error handling behaviors""" + """Override for different error handling behaviors + + Return an exception if you would like for it to be raised, or swallow + it here. + + """ LOGGER.error('%s in %s for %s (%s)', exc.__class__.__name__, self.__class__.__name__, metric_name, str(exc).split('\n')[0]) @@ -308,7 +564,18 @@ class RequestHandlerMixin: def _on_postgres_timing(self, metric_name: str, duration: float) -> None: - """Override for custom metric recording""" + """Override for custom metric recording. As a default behavior it will + attempt to detect `sprockets-influxdb + `_ and + `sprockets.mixins.metrics + `_ and + record the metrics using them if they are available. If they are not + available, it will record the query duration to the `DEBUG` log. + + :param metric_name: The name of the metric to record + :param duration: The duration to record for the metric + + """ if hasattr(self, 'influxdb'): # sprockets-influxdb self.influxdb.set_field(metric_name, duration) elif hasattr(self, 'record_timing'): # sprockets.mixins.metrics diff --git a/tests.py b/tests.py index f676824..67d0402 100644 --- a/tests.py +++ b/tests.py @@ -2,13 +2,14 @@ import asyncio import json import os import typing +import unittest import uuid from unittest import mock import psycopg2 from psycopg2 import errors from sprockets.http import app, testing -from tornado import web +from tornado import ioloop, web import sprockets_postgres @@ -63,6 +64,16 @@ class ErrorRequestHandler(RequestHandler): return RuntimeError() +class ErrorPassthroughRequestHandler(RequestHandler): + + async def get(self): + exc = self._on_postgres_error('test', RuntimeError()) + if isinstance(exc, RuntimeError): + self.set_status(204) + else: + raise web.HTTPError(500, 'Did not pass through') + + class ExecuteRequestHandler(RequestHandler): GET_SQL = 'SELECT %s::TEXT AS value;' @@ -118,6 +129,14 @@ class MultiRowRequestHandler(RequestHandler): 'rows': self.cast_data(result.rows)}) +class NoErrorRequestHandler(ErrorRequestHandler): + + def _on_postgres_error(self, + metric_name: str, + exc: Exception) -> typing.Optional[Exception]: + return None + + class NoRowRequestHandler(RequestHandler): GET_SQL = """\ @@ -210,10 +229,12 @@ class TestCase(testing.SprocketsHttpTestCase): web.url('/callproc', CallprocRequestHandler), web.url('/count', CountRequestHandler), web.url('/error', ErrorRequestHandler), + web.url('/error-passthrough', ErrorPassthroughRequestHandler), web.url('/execute', ExecuteRequestHandler), web.url('/influxdb', InfluxDBRequestHandler), web.url('/metrics-mixin', MetricsMixinRequestHandler), web.url('/multi-row', MultiRowRequestHandler), + web.url('/no-error', NoErrorRequestHandler), web.url('/no-row', NoRowRequestHandler), web.url('/status', StatusRequestHandler), web.url('/transaction', TransactionRequestHandler), @@ -231,6 +252,13 @@ class RequestHandlerMixinTestCase(TestCase): self.assertGreaterEqual(data['pool_size'], 1) self.assertGreaterEqual(data['pool_free'], 1) + @mock.patch('aiopg.pool.Pool.acquire') + def test_postgres_status_connect_error(self, acquire): + acquire.side_effect = asyncio.TimeoutError() + response = self.fetch('/status') + self.assertEqual(response.code, 503) + self.assertFalse(json.loads(response.body)['available']) + @mock.patch('aiopg.cursor.Cursor.execute') def test_postgres_status_error(self, execute): execute.side_effect = asyncio.TimeoutError() @@ -245,12 +273,23 @@ class RequestHandlerMixinTestCase(TestCase): uuid.UUID(json.loads(response.body)['value']), uuid.UUID) @mock.patch('aiopg.cursor.Cursor.execute') - def test_postgres_error_passthrough(self, execute): + def test_postgres_error(self, execute): execute.side_effect = asyncio.TimeoutError response = self.fetch('/error') self.assertEqual(response.code, 500) self.assertIn(b'Internal Server Error', response.body) + @mock.patch('aiopg.pool.Pool.acquire') + def test_postgres_error_on_connect(self, acquire): + acquire.side_effect = asyncio.TimeoutError + response = self.fetch('/error') + self.assertEqual(response.code, 500) + self.assertIn(b'Internal Server Error', response.body) + + def test_postgres_error_passthrough(self): + response = self.fetch('/error-passthrough') + self.assertEqual(response.code, 204) + def test_postgres_execute(self): expectation = str(uuid.uuid4()) response = self.fetch('/execute?value={}'.format(expectation)) @@ -369,8 +408,7 @@ class TransactionTestCase(TestCase): self.assertEqual(record['last_updated_at'], last_updated) -""" -class MissingURLTestCase(testing.SprocketsHttpTestCase): +class MissingURLTestCase(unittest.TestCase): @classmethod def setUpClass(cls): @@ -384,16 +422,10 @@ class MissingURLTestCase(testing.SprocketsHttpTestCase): if 'POSTGRES_URL' in os.environ: del os.environ['POSTGRES_URL'] - def setUp(self): - self.stop_mock = None - super().setUp() - - def get_app(self): - self.app = Application() - self.stop_mock = mock.Mock( - wraps=self.app.stop, side_effect=RuntimeError) - return self.app - def test_that_stop_is_invoked(self): - self.stop_mock.assert_called_once_with(self.io_loop) -""" + io_loop = ioloop.IOLoop.current() + obj = Application() + obj.stop = mock.Mock(wraps=obj.stop) + obj.start(io_loop) + io_loop.start() + obj.stop.assert_called_once()