From 46f829fc4ac0b998f489ae62c6f81a1805056c50 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Mon, 6 Apr 2020 17:39:52 -0400 Subject: [PATCH 01/19] Initial WIP commit --- .editorconfig | 21 +++ .github/workflows/deploy.yaml | 20 +++ .github/workflows/testing.yaml | 66 +++++++++ LICENSE | 40 +++-- README.md | 1 - README.rst | 67 +++++++++ bootstrap | 75 ++++++++++ docker-compose.yml | 13 ++ setup.cfg | 77 ++++++++++ setup.py | 3 + sprockets_postgres/__init__.py | 238 ++++++++++++++++++++++++++++++ sprockets_postgres/__version__.py | 5 + tests/__init__.py | 0 tests/test_application.py | 158 ++++++++++++++++++++ 14 files changed, 761 insertions(+), 23 deletions(-) create mode 100644 .editorconfig create mode 100644 .github/workflows/deploy.yaml create mode 100644 .github/workflows/testing.yaml delete mode 100644 README.md create mode 100644 README.rst create mode 100755 bootstrap create mode 100644 docker-compose.yml create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 sprockets_postgres/__init__.py create mode 100644 sprockets_postgres/__version__.py create mode 100644 tests/__init__.py create mode 100644 tests/test_application.py diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..b5d8093 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,21 @@ +# top-most EditorConfig file +root = true + +# Unix-style newlines with a newline ending every file +[*] +end_of_line = lf +insert_final_newline = true + +# 4 space indentation +[*.py] +indent_style = space +indent_size = 4 + +# 2 space indentation +[*.yml] +indent_style = space +indent_size = 2 + +[bootstrap] +indent_style = space +indent_size = 2 diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml new file mode 100644 index 0000000..5ac31cd --- /dev/null +++ b/.github/workflows/deploy.yaml @@ -0,0 +1,20 @@ +name: Deployment +on: + push: + branches-ignore: ["*"] + tags: ["*"] +jobs: + deploy: + runs-on: ubuntu-latest + if: github.event_name == 'push' && 
startsWith(github.event.ref, 'refs/tags') + container: python:3.8-alpine + steps: + - name: Checkout repository + uses: actions/checkout@v1 + - name: Build package + run: python3 setup.py sdist + - name: Publish package + uses: pypa/gh-action-pypi-publish@master + with: + user: __token__ + password: ${{ secrets.PYPI_PASSWORD }} diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml new file mode 100644 index 0000000..ccbadd6 --- /dev/null +++ b/.github/workflows/testing.yaml @@ -0,0 +1,66 @@ +name: Testing +on: + push: + branches: ["*"] + paths-ignore: + - 'docs/**' + - 'setup.*' + - '*.md' + - '*.rst' + tags-ignore: ["*"] +jobs: + test: + runs-on: ubuntu-latest + timeout-minutes: 3 + services: + postgres: + image: postgres:12-alpine + options: >- + --health-cmd "pg_isready" + --health-interval 10s + --health-timeout 10s + --health-retries 5 + ports: + - 5432 + env: + POSTGRES_HOST_AUTH_METHOD: trust + strategy: + matrix: + python: [3.7, 3.8] + container: + image: python:${{ matrix.python }}-alpine + steps: + - name: Checkout repository + uses: actions/checkout@v1 + + - name: Install OS dependencies + run: apk --update add gcc make musl-dev linux-headers postgresql-dev + + - name: Install testing dependencies + run: pip3 --no-cache-dir install -e '.[testing]' + + - name: Create build directory + run: mkdir build + + - name: Create build/test-environment + run: echo "export POSTGRES_URL=postgresql://postgres@postgres:5432/postgres" > build/test-environment + + - name: Install UUID extension + run: psql -q -h postgres -U postgres -d postgres -c 'CREATE EXTENSION "uuid-ossp";' + + - name: Run flake8 tests + run: flake8 + + - name: Run tests + run: coverage run + + - name: Output coverage + run: coverage report && coverage xml + + - name: Upload Coverage + uses: codecov/codecov-action@v1.0.2 + with: + token: ${{secrets.CODECOV_TOKEN}} + file: build/coverage.xml + flags: unittests + fail_ci_if_error: true diff --git a/LICENSE b/LICENSE index 
3a7e87f..8c1fb0a 100644 --- a/LICENSE +++ b/LICENSE @@ -1,29 +1,25 @@ -BSD 3-Clause License - -Copyright (c) 2020, Sprockets +Copyright (c) 2020 AWeber Communications All rights reserved. -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this + * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + * Neither the name of the copyright holder nor the names of its contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. -3. Neither the name of the copyright holder nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md deleted file mode 100644 index f194828..0000000 --- a/README.md +++ /dev/null @@ -1 +0,0 @@ -# sprockets-postgres \ No newline at end of file diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..50a3087 --- /dev/null +++ b/README.rst @@ -0,0 +1,67 @@ +Sprockets Postgres +================== +An asynchronous Postgres client mixin for Tornado applications + +|Version| |Status| |Coverage| |License| + +Installation +------------ +``sprockets-postgres`` is available on the Python package index and is installable via pip: + +.. 
code:: bash + + pip install sprockets-postgres + +Documentation +------------- +Documentation is available at `sprockets-postgres.readthedocs.io `_. + +Configuration +------------- +The following table details the environment variable configuration options: + ++---------------------------------+--------------------------------------------------+---------------------------------+ +| Variable | Definition | Default | ++=================================+==================================================+=================================+ +| ``PGSQL_URL`` | The PostgreSQL URL to connect to | ``postgresql://localhost:5432`` | ++---------------------------------+--------------------------------------------------+---------------------------------+ +| ``POSTGRES_MAX_POOL_SIZE`` | Maximum connection count to Postgres per backend | ``0`` (Unlimited | ++---------------------------------+--------------------------------------------------+---------------------------------+ +| ``POSTGRES_MIN_POOL_SIZE`` | Minimum or starting pool size. | ``1`` | ++---------------------------------+--------------------------------------------------+---------------------------------+ +| ``POSTGRES_CONNECTION_TIMEOUT`` | The maximum time in seconds to spend attempting | ``10`` | +| | to create a new connection. | | ++---------------------------------+--------------------------------------------------+---------------------------------+ +| ``POSTGRES_CONNECTION_TTL`` | Time-to-life in seconds for a pooled connection. | ``300`` | ++---------------------------------+--------------------------------------------------+---------------------------------+ +| ``POSTGRES_QUERY_TIMEOUT`` | Maximum execution time for a query in seconds. | ``60`` | ++---------------------------------+--------------------------------------------------+---------------------------------+ +| ``POSTGRES_HSTORE`` | Enable HSTORE support in the client. 
| ``FALSE`` | ++---------------------------------+--------------------------------------------------+---------------------------------+ +| ``POSTGRES_JSON`` | Enable JSON support in the client. | ``FALSE`` | ++---------------------------------+--------------------------------------------------+---------------------------------+ +| ``POSTGRES_UUID`` | Enable UUID support in the client. | ``TRUE`` | ++---------------------------------+--------------------------------------------------+---------------------------------+ + + +Requirements +------------ +- `aiopg `_ +- `sprockets.http `_ +- `Tornado `_ + +Version History +--------------- +Available at https://sprockets-postgres.readthedocs.org/en/latest/history.html + +.. |Version| image:: https://img.shields.io/pypi/v/sprockets-postgres.svg? + :target: http://badge.fury.io/py/sprockets-postgres + +.. |Status| image:: https://img.shields.io/travis/sprockets/sprockets-postgres.svg? + :target: https://travis-ci.org/sprockets/sprockets-postgres + +.. |Coverage| image:: https://img.shields.io/codecov/c/github/sprockets/sprockets-postgres.svg? + :target: https://codecov.io/github/sprockets/sprockets-postgres?branch=master + +.. |License| image:: https://img.shields.io/pypi/l/sprockets-postgres.svg? + :target: https://sprockets-postgres.readthedocs.org diff --git a/bootstrap b/bootstrap new file mode 100755 index 0000000..d527a2a --- /dev/null +++ b/bootstrap @@ -0,0 +1,75 @@ +#!/usr/bin/env sh +set -e + +# Common constants +COLOR_RESET='\033[0m' +COLOR_GREEN='\033[0;32m' +COMPOSE_PROJECT_NAME="${COMPOSE_PROJECT_NAME:-${PWD##*/}}" +TEST_HOST="${TEST_HOST:-localhost}" + +echo "Integration test host: ${TEST_HOST}" + +get_exposed_port() { + if [ -z $3 ] + then + docker-compose port $1 $2 | cut -d: -f2 + else + docker-compose port --index=$3 $1 $2 | cut -d: -f2 + fi +} + +report_start() { + printf "Waiting for $1 ... 
" +} + +report_done() { + printf "${COLOR_GREEN}done${COLOR_RESET}\n" +} + +wait_for_healthy_containers() { + IDs=$(docker-compose ps -q | paste -sd " " -) + report_start "${1} containers to report healthy" + counter="0" + while true + do + if [ "$(docker inspect -f "{{.State.Health.Status}}" ${IDs} | grep -c healthy)" -eq "${1}" ]; then + break + fi + counter=$((++counter)) + if [ "${counter}" -eq 120 ]; then + echo " ERROR: containers failed to start" + exit 1 + fi + sleep 1 + done + report_done +} + +# Ensure Docker is Running +echo "Docker Information:" +echo "" +docker version +echo "" + +# Activate the virtual environment +if test -e env/bin/activate +then + . ./env/bin/activate +fi + +mkdir -p build + +# Stop any running instances and clean up after them, then pull images +docker-compose down --volumes --remove-orphans +docker-compose up -d --quiet-pull + +wait_for_healthy_containers 1 + +docker-compose exec postgres psql -q -o /dev/null -U postgres -d postgres -c 'CREATE EXTENSION "uuid-ossp";' + +cat > build/test-environment<=1.0.0,<2 + sprockets.http>=2.1.1,<3 + tornado>=6,<7 +packages = + sprockets_postgres +zip_safe = true + +[options.extras_require] +testing = + coverage + flake8 + flake8-comprehensions + flake8-deprecated + flake8-import-order + flake8-print + flake8-quotes + flake8-rst-docstrings + flake8-tuple + pygments + +[coverage:run] +branch = True +command_line = -m unittest discover tests --verbose +data_file = build/.coverage + +[coverage:report] +show_missing = True +include = + sprockets_postgres/*.py + +[coverage:html] +directory = build/coverage + +[coverage:xml] +output = build/coverage.xml + +[flake8] +application-import-names = sprockets_postgres,tests +exclude = build,docs,env +ignore = W503 +import-order-style = pycharm +rst-roles = attr,class,const,data,exc,func,meth,mod,obj,ref,yields diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..b908cbe --- /dev/null +++ b/setup.py @@ -0,0 +1,3 @@ +import setuptools + 
+setuptools.setup() diff --git a/sprockets_postgres/__init__.py b/sprockets_postgres/__init__.py new file mode 100644 index 0000000..ca07fa9 --- /dev/null +++ b/sprockets_postgres/__init__.py @@ -0,0 +1,238 @@ +""" +:class:`sprockets.http.app.Application` mixin for handling the connection to +Postgres and exporting functions for querying the database, getting the status, +and proving a cursor. + +Automatically creates and shuts down :class:`aio.pool.Pool` on startup +and shutdown. + +""" +import asyncio +import contextlib +import functools +import logging +import os +import typing +from distutils import util + +import aiopg +import psycopg2 +from aiopg import pool +from psycopg2 import errors, extras, extensions +from tornado import ioloop, web + +LOGGER = logging.getLogger('sprockets-postgres') + +DEFAULT_POSTGRES_CONNECTION_TIMEOUT = 10 +DEFAULT_POSTGRES_CONNECTION_TTL = 300 +DEFAULT_POSTGRES_HSTORE = 'FALSE' +DEFAULT_POSTGRES_JSON = 'FALSE' +DEFAULT_POSTGRES_MAX_POOL_SIZE = 0 +DEFAULT_POSTGRES_MIN_POOL_SIZE = 1 +DEFAULT_POSTGRES_QUERY_TIMEOUT = 120 +DEFAULT_POSTGRES_URL = 'postgresql://localhost:5432' +DEFAULT_POSTGRES_UUID = 'TRUE' + + +class ApplicationMixin: + """Application mixin for setting up the PostgreSQL client pool""" + + POSTGRES_STATUS_TIMEOUT = 3 + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._postgres_pool: typing.Optional[pool.Pool] = None + self.runner_callbacks['on_start'].append(self._postgres_setup) + self.runner_callbacks['shutdown'].append(self._postgres_shutdown) + + @contextlib.asynccontextmanager + async def postgres_cursor(self, + timeout: typing.Optional[int] = None, + raise_http_error: bool = True) \ + -> typing.AsyncContextManager[extensions.cursor]: + """Return a Postgres cursor for the pool""" + try: + async with self._postgres_pool.acquire() as conn: + async with conn.cursor( + cursor_factory=extras.RealDictCursor, + timeout=self._postgres_query_timeout(timeout)) as pgc: + yield pgc + except 
(asyncio.TimeoutError, + psycopg2.OperationalError, + psycopg2.Error) as error: + LOGGER.critical('Error connecting to Postgres: %s', error) + if raise_http_error: + raise web.HTTPError(503, 'Database Unavailable') + raise + + async def postgres_callproc(self, + name: str, + params: typing.Union[list, tuple, None] = None, + timeout: typing.Optional[int] = None) \ + -> typing.Union[dict, list, None]: + """Execute a stored procedure, specifying the name, SQL, passing in + optional parameters. + + :param name: The stored-proc to call + :param params: Optional parameters to pass into the function + :param timeout: Optional timeout to override the default query timeout + + """ + async with self.postgres_cursor(timeout) as cursor: + return await self._postgres_query( + cursor, cursor.callproc, name, name, params) + + async def postgres_execute(self, name: str, sql: str, + *args, + timeout: typing.Optional[int] = None) \ + -> typing.Union[dict, list, None]: + """Execute a query, specifying a name for the query, the SQL statement, + and optional positional arguments to pass in with the query. + + Parameters may be provided as sequence or mapping and will be + bound to variables in the operation. Variables are specified + either with positional ``%s`` or named ``%({name})s`` placeholders. + + :param name: The stored-proc to call + :param sql: The SQL statement to execute + :param timeout: Optional timeout to override the default query timeout + + """ + async with self.postgres_cursor(timeout) as cursor: + return await self._postgres_query( + cursor, cursor.execute, name, sql, args) + + async def postgres_status(self) -> dict: + """Invoke from the ``/status`` RequestHandler to check that there is + a Postgres connection handler available and return info about the + pool. 
+ + """ + available = True + try: + async with self.postgres_cursor( + self.POSTGRES_STATUS_TIMEOUT, False) as cursor: + await cursor.execute('SELECT 1') + except (asyncio.TimeoutError, psycopg2.OperationalError): + available = False + return { + 'available': available, + 'pool_size': self._postgres_pool.size, + 'pool_free': self._postgres_pool.freesize + } + + async def _postgres_query(self, + cursor: aiopg.Cursor, + method: typing.Callable, + name: str, + sql: str, + parameters: typing.Union[dict, list, tuple]) \ + -> typing.Union[dict, list, None]: + """Execute a query, specifying the name, SQL, passing in + + """ + try: + await method(sql, parameters) + except asyncio.TimeoutError as err: + LOGGER.error('Query timeout for %s: %s', + name, str(err).split('\n')[0]) + raise web.HTTPError(500, reason='Query Timeout') + except errors.UniqueViolation as err: + LOGGER.error('Database error for %s: %s', + name, str(err).split('\n')[0]) + raise web.HTTPError(409, reason='Unique Violation') + except psycopg2.Error as err: + LOGGER.error('Database error for %s: %s', + name, str(err).split('\n')[0]) + raise web.HTTPError(500, reason='Database Error') + try: + return await self._postgres_query_results(cursor) + except psycopg2.ProgrammingError: + return + + @staticmethod + async def _postgres_query_results(cursor: aiopg.Cursor) \ + -> typing.Union[dict, list, None]: + """Invoked by self.postgres_query to return all of the query results + as either a ``dict`` or ``list`` depending on the quantity of rows. + + This can raise a ``psycopg2.ProgrammingError`` for an INSERT/UPDATE + without RETURNING or a DELETE. That exception is caught by the caller. 
+ + :raises psycopg2.ProgrammingError: when there are no rows to fetch + even though the rowcount is > 0 + + """ + if cursor.rowcount == 1: + return await cursor.fetchone() + elif cursor.rowcount > 1: + return await cursor.fetchall() + return None + + @functools.lru_cache + def _postgres_query_timeout(self, + timeout: typing.Optional[int] = None) -> int: + """Return query timeout, either from the specified value or + ``POSTGRES_QUERY_TIMEOUT`` environment variable, if set. + + Defaults to sprockets_postgres.DEFAULT_POSTGRES_QUERY_TIMEOUT. + + """ + return timeout if timeout else int( + os.environ.get( + 'POSTGRES_QUERY_TIMEOUT', + DEFAULT_POSTGRES_QUERY_TIMEOUT)) + + async def _postgres_setup(self, + _app: web.Application, + _ioloop: ioloop.IOLoop) -> None: + """Setup the Postgres pool of connections and log if there is an error. + + This is invoked by the Application on start callback mechanism. + + """ + url = os.environ.get('POSTGRES_URL', DEFAULT_POSTGRES_URL) + LOGGER.debug('Connecting to PostgreSQL: %s', url) + self._postgres_pool = pool.Pool( + url, + minsize=int( + os.environ.get( + 'POSTGRES_MIN_POOL_SIZE', + DEFAULT_POSTGRES_MIN_POOL_SIZE)), + maxsize=int( + os.environ.get( + 'POSTGRES_MAX_POOL_SIZE', + DEFAULT_POSTGRES_MAX_POOL_SIZE)), + timeout=int( + os.environ.get( + 'POSTGRES_CONNECT_TIMEOUT', + DEFAULT_POSTGRES_CONNECTION_TIMEOUT)), + enable_hstore=util.strtobool( + os.environ.get( + 'POSTGRES_HSTORE', DEFAULT_POSTGRES_HSTORE)), + enable_json=util.strtobool( + os.environ.get('POSTGRES_JSON', DEFAULT_POSTGRES_JSON)), + enable_uuid=util.strtobool( + os.environ.get('POSTGRES_UUID', DEFAULT_POSTGRES_UUID)), + echo=False, + on_connect=None, + pool_recycle=int( + os.environ.get( + 'POSTGRES_CONNECTION_TTL', + DEFAULT_POSTGRES_CONNECTION_TTL))) + try: + async with self._postgres_pool._cond: + await self._postgres_pool._fill_free_pool(False) + except (psycopg2.OperationalError, + psycopg2.Error) as error: # pragma: nocover + LOGGER.warning('Error connecting 
to PostgreSQL on startup: %s', + error) + + async def _postgres_shutdown(self, _ioloop: ioloop.IOLoop) -> None: + """Shutdown the Postgres connections and wait for them to close. + + This is invoked by the Application shutdown callback mechanism. + + """ + self._postgres_pool.close() + await self._postgres_pool.wait_closed() diff --git a/sprockets_postgres/__version__.py b/sprockets_postgres/__version__.py new file mode 100644 index 0000000..a9c45c6 --- /dev/null +++ b/sprockets_postgres/__version__.py @@ -0,0 +1,5 @@ +""" +sprockets-postgres Version + +""" +version = '1.0.0a1' diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_application.py b/tests/test_application.py new file mode 100644 index 0000000..31013ac --- /dev/null +++ b/tests/test_application.py @@ -0,0 +1,158 @@ +import asyncio +import json +import os +from unittest import mock +import uuid + +import psycopg2 +from psycopg2 import errors +from sprockets.http import app, testing +from tornado import web + +import sprockets_postgres + + +class CallprocRequestHandler(web.RequestHandler): + + async def get(self): + result = await self.application.postgres_callproc('uuid_generate_v4') + await self.finish({'value': str(result['uuid_generate_v4'])}) + + +class ExecuteRequestHandler(web.RequestHandler): + + GET_SQL = 'SELECT %s::TEXT AS value;' + + async def get(self): + result = await self.application.postgres_execute( + 'get', self.GET_SQL, self.get_argument('value')) + await self.finish({'value': result['value'] if result else None}) + + +class MultiRowRequestHandler(web.RequestHandler): + + GET_SQL = 'SELECT * FROM information_schema.enabled_roles;' + + async def get(self): + rows = await self.application.postgres_execute('get', self.GET_SQL) + await self.finish({'rows': [row['role_name'] for row in rows]}) + + +class NoRowRequestHandler(web.RequestHandler): + + GET_SQL = """\ + SELECT * FROM information_schema.tables WHERE table_schema = 
'public';""" + + async def get(self): + rows = await self.application.postgres_execute('get', self.GET_SQL) + await self.finish({'rows': rows}) + + +class StatusRequestHandler(web.RequestHandler): + + async def get(self): + result = await self.application.postgres_status() + if not result['available']: + self.set_status(503, 'Database Unavailable') + await self.finish(dict(result)) + + +class Application(sprockets_postgres.ApplicationMixin, + app.Application): + pass + + +class ExecuteTestCase(testing.SprocketsHttpTestCase): + + @classmethod + def setUpClass(cls): + with open('build/test-environment') as f: + for line in f: + if line.startswith('export '): + line = line[7:] + name, _, value = line.strip().partition('=') + os.environ[name] = value + + def get_app(self): + self.app = Application(handlers=[ + web.url('/callproc', CallprocRequestHandler), + web.url('/execute', ExecuteRequestHandler), + web.url('/multi-row', MultiRowRequestHandler), + web.url('/no-row', NoRowRequestHandler), + web.url('/status', StatusRequestHandler) + ]) + return self.app + + + def test_postgres_status(self): + response = self.fetch('/status') + data = json.loads(response.body) + self.assertTrue(data['available']) + self.assertGreaterEqual(data['pool_size'], 1) + self.assertGreaterEqual(data['pool_free'], 1) + + @mock.patch('aiopg.cursor.Cursor.execute') + def test_postgres_status_error(self, execute): + execute.side_effect = asyncio.TimeoutError() + response = self.fetch('/status') + self.assertEqual(response.code, 503) + self.assertFalse(json.loads(response.body)['available']) + + def test_postgres_callproc(self): + response = self.fetch('/callproc') + self.assertEqual(response.code, 200) + self.assertIsInstance( + uuid.UUID(json.loads(response.body)['value']), uuid.UUID) + + def test_postgres_execute(self): + expectation = str(uuid.uuid4()) + response = self.fetch('/execute?value={}'.format(expectation)) + self.assertEqual(response.code, 200) + 
self.assertEqual(json.loads(response.body)['value'], expectation) + + def test_postgres_multirow(self): + response = self.fetch('/multi-row') + self.assertEqual(response.code, 200) + body = json.loads(response.body) + self.assertIsInstance(body['rows'], list) + self.assertIn('postgres', body['rows']) + + def test_postgres_norow(self): + response = self.fetch('/no-row') + self.assertEqual(response.code, 200) + body = json.loads(response.body) + self.assertIsNone(body['rows']) + + @mock.patch('aiopg.cursor.Cursor.execute') + def test_postgres_execute_timeout_error(self, execute): + execute.side_effect = asyncio.TimeoutError() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 500) + self.assertIn(b'Query Timeout', response.body) + + @mock.patch('aiopg.cursor.Cursor.execute') + def test_postgres_execute_unique_violation(self, execute): + execute.side_effect = errors.UniqueViolation() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 409) + self.assertIn(b'Unique Violation', response.body) + + @mock.patch('aiopg.cursor.Cursor.execute') + def test_postgres_execute_error(self, execute): + execute.side_effect = psycopg2.Error() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 500) + self.assertIn(b'Database Error', response.body) + + def test_postgres_programming_error(self): + with mock.patch.object(self.app, '_postgres_query_results') as pqr: + pqr.side_effect = psycopg2.ProgrammingError() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 200) + self.assertIsNone(json.loads(response.body)['value']) + + @mock.patch('aiopg.connection.Connection.cursor') + def test_postgres_cursor_raises(self, cursor): + cursor.side_effect = asyncio.TimeoutError() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 503) From 137c32c6e555b250e82a082c1561d072e51f3fc9 Mon Sep 17 00:00:00 2001 From: "Gavin M. 
Roy" Date: Mon, 6 Apr 2020 17:41:19 -0400 Subject: [PATCH 02/19] README updates --- README.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 50a3087..db62a24 100644 --- a/README.rst +++ b/README.rst @@ -25,7 +25,7 @@ The following table details the environment variable configuration options: +=================================+==================================================+=================================+ | ``PGSQL_URL`` | The PostgreSQL URL to connect to | ``postgresql://localhost:5432`` | +---------------------------------+--------------------------------------------------+---------------------------------+ -| ``POSTGRES_MAX_POOL_SIZE`` | Maximum connection count to Postgres per backend | ``0`` (Unlimited | +| ``POSTGRES_MAX_POOL_SIZE`` | Maximum connection count to Postgres per backend | ``0`` (Unlimited) | +---------------------------------+--------------------------------------------------+---------------------------------+ | ``POSTGRES_MIN_POOL_SIZE`` | Minimum or starting pool size. | ``1`` | +---------------------------------+--------------------------------------------------+---------------------------------+ @@ -43,7 +43,6 @@ The following table details the environment variable configuration options: | ``POSTGRES_UUID`` | Enable UUID support in the client. | ``TRUE`` | +---------------------------------+--------------------------------------------------+---------------------------------+ - Requirements ------------ - `aiopg `_ From ca5dc26de312af8fbefef34f7a3b2bdea6d4f0b7 Mon Sep 17 00:00:00 2001 From: "Gavin M. 
Roy" Date: Mon, 6 Apr 2020 17:47:50 -0400 Subject: [PATCH 03/19] Move from a package to a module --- VERSION | 1 + setup.cfg | 4 ++-- sprockets_postgres/__init__.py => sprockets_postgres.py | 0 sprockets_postgres/__version__.py | 5 ----- tests/test_application.py => tests.py | 0 tests/__init__.py | 0 6 files changed, 3 insertions(+), 7 deletions(-) create mode 100644 VERSION rename sprockets_postgres/__init__.py => sprockets_postgres.py (100%) delete mode 100644 sprockets_postgres/__version__.py rename tests/test_application.py => tests.py (100%) delete mode 100644 tests/__init__.py diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..1628379 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +1.0.0a1 diff --git a/setup.cfg b/setup.cfg index b2d632b..3d25199 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = sprockets-postgres -version = attr: sprockets_postgres.__version__.version +version = file: VERSION description = An asynchronous Postgres client and mixin for Tornado applications long_description = file: README.rst long_description_content_type = text/x-rst; charset=UTF-8 @@ -36,7 +36,7 @@ install_requires = aiopg>=1.0.0,<2 sprockets.http>=2.1.1,<3 tornado>=6,<7 -packages = +pymodules = sprockets_postgres zip_safe = true diff --git a/sprockets_postgres/__init__.py b/sprockets_postgres.py similarity index 100% rename from sprockets_postgres/__init__.py rename to sprockets_postgres.py diff --git a/sprockets_postgres/__version__.py b/sprockets_postgres/__version__.py deleted file mode 100644 index a9c45c6..0000000 --- a/sprockets_postgres/__version__.py +++ /dev/null @@ -1,5 +0,0 @@ -""" -sprockets-postgres Version - -""" -version = '1.0.0a1' diff --git a/tests/test_application.py b/tests.py similarity index 100% rename from tests/test_application.py rename to tests.py diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 From ff541b7c19ea0e719e68505e3bfebcbc5763573f Mon Sep 17 00:00:00 2001 
From: "Gavin M. Roy" Date: Mon, 6 Apr 2020 17:49:03 -0400 Subject: [PATCH 04/19] Include the VERSION file when packaging --- MANIFEST.in | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 MANIFEST.in diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..9340542 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,3 @@ +include README.rst +include LICENSE +include VERSION From 05cedb1d571325a2bdc510745b0e7fb6e5cdae8c Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Mon, 6 Apr 2020 17:51:28 -0400 Subject: [PATCH 05/19] install psql --- .github/workflows/testing.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml index ccbadd6..9ca7753 100644 --- a/.github/workflows/testing.yaml +++ b/.github/workflows/testing.yaml @@ -46,7 +46,9 @@ jobs: run: echo "export POSTGRES_URL=postgresql://postgres@postgres:5432/postgres" > build/test-environment - name: Install UUID extension - run: psql -q -h postgres -U postgres -d postgres -c 'CREATE EXTENSION "uuid-ossp";' + run: | + apt-get install -y postgresql-client \ + && psql -q -h postgres -U postgres -d postgres -c 'CREATE EXTENSION "uuid-ossp";' - name: Run flake8 tests run: flake8 From 705be45ff1cf3deb757bcf24756f31a84b0ee3f5 Mon Sep 17 00:00:00 2001 From: "Gavin M. 
Roy" Date: Mon, 6 Apr 2020 17:58:56 -0400 Subject: [PATCH 06/19] apk, not apt --- .github/workflows/testing.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml index 9ca7753..5d376d0 100644 --- a/.github/workflows/testing.yaml +++ b/.github/workflows/testing.yaml @@ -47,7 +47,7 @@ jobs: - name: Install UUID extension run: | - apt-get install -y postgresql-client \ + apk add postgresql-client \ && psql -q -h postgres -U postgres -d postgres -c 'CREATE EXTENSION "uuid-ossp";' - name: Run flake8 tests From b620e1048cc09c492cdc87d0666f1683fbd8abc8 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Mon, 6 Apr 2020 18:20:34 -0400 Subject: [PATCH 07/19] Address flake8 errors --- sprockets_postgres.py | 2 +- tests.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sprockets_postgres.py b/sprockets_postgres.py index ca07fa9..d7ab758 100644 --- a/sprockets_postgres.py +++ b/sprockets_postgres.py @@ -18,7 +18,7 @@ from distutils import util import aiopg import psycopg2 from aiopg import pool -from psycopg2 import errors, extras, extensions +from psycopg2 import errors, extensions, extras from tornado import ioloop, web LOGGER = logging.getLogger('sprockets-postgres') diff --git a/tests.py b/tests.py index 31013ac..2294ff8 100644 --- a/tests.py +++ b/tests.py @@ -1,8 +1,8 @@ import asyncio import json import os -from unittest import mock import uuid +from unittest import mock import psycopg2 from psycopg2 import errors @@ -83,7 +83,6 @@ class ExecuteTestCase(testing.SprocketsHttpTestCase): ]) return self.app - def test_postgres_status(self): response = self.fetch('/status') data = json.loads(response.body) From 780a428f9d51f783bd7b39929924c43f8460a180 Mon Sep 17 00:00:00 2001 From: "Gavin M. 
Roy" Date: Mon, 6 Apr 2020 18:23:53 -0400 Subject: [PATCH 08/19] Specify the max size of the lrucache for Python 3.7 --- sprockets_postgres.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sprockets_postgres.py b/sprockets_postgres.py index d7ab758..2c04970 100644 --- a/sprockets_postgres.py +++ b/sprockets_postgres.py @@ -169,7 +169,7 @@ class ApplicationMixin: return await cursor.fetchall() return None - @functools.lru_cache + @functools.lru_cache(2) def _postgres_query_timeout(self, timeout: typing.Optional[int] = None) -> int: """Return query timeout, either from the specified value or From ad92604caa6a013d8fef46b278741118fb125725 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Mon, 6 Apr 2020 18:24:38 -0400 Subject: [PATCH 09/19] Fix coverage report --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 3d25199..629b3d0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -61,7 +61,7 @@ data_file = build/.coverage [coverage:report] show_missing = True include = - sprockets_postgres/*.py + sprockets_postgres.py [coverage:html] directory = build/coverage From a7f8ef35c8fd818a96a67b94edc0f5850cf884f6 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Mon, 6 Apr 2020 18:31:29 -0400 Subject: [PATCH 10/19] Only upload coverage if on main repo --- .github/workflows/testing.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml index 5d376d0..86407f4 100644 --- a/.github/workflows/testing.yaml +++ b/.github/workflows/testing.yaml @@ -61,6 +61,7 @@ jobs: - name: Upload Coverage uses: codecov/codecov-action@v1.0.2 + if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags') && github.repository == 'sprockets/sprockets-postgres' with: token: ${{secrets.CODECOV_TOKEN}} file: build/coverage.xml From 324347e32865e7acb14a1cae224d8f455148d4ec Mon Sep 17 00:00:00 2001 From: "Gavin M. 
Roy" Date: Mon, 6 Apr 2020 18:32:18 -0400 Subject: [PATCH 11/19] Only deploy on main branch --- .github/workflows/deploy.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml index 5ac31cd..09c97f1 100644 --- a/.github/workflows/deploy.yaml +++ b/.github/workflows/deploy.yaml @@ -6,7 +6,7 @@ on: jobs: deploy: runs-on: ubuntu-latest - if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags') + if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags') && github.repository == 'sprockets/sprockets-postgres' container: python:3.8-alpine steps: - name: Checkout repository From b9c9495545c1be97b085f156faab38bcb9781897 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Mon, 6 Apr 2020 18:33:17 -0400 Subject: [PATCH 12/19] Better conditional --- .github/workflows/testing.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml index 86407f4..f1eba12 100644 --- a/.github/workflows/testing.yaml +++ b/.github/workflows/testing.yaml @@ -61,7 +61,7 @@ jobs: - name: Upload Coverage uses: codecov/codecov-action@v1.0.2 - if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags') && github.repository == 'sprockets/sprockets-postgres' + if: github.event_name == 'push' && github.repository == 'sprockets/sprockets-postgres' with: token: ${{secrets.CODECOV_TOKEN}} file: build/coverage.xml From 611dfd1ec7e181ef17b9bbc81bdfdfe81e49f6df Mon Sep 17 00:00:00 2001 From: "Gavin M. 
Roy" Date: Tue, 7 Apr 2020 13:42:02 -0400 Subject: [PATCH 13/19] WIP refactor --- sprockets_postgres.py | 361 ++++++++++++++++++++++++++---------------- tests.py | 75 ++++++--- 2 files changed, 277 insertions(+), 159 deletions(-) diff --git a/sprockets_postgres.py b/sprockets_postgres.py index 2c04970..9618843 100644 --- a/sprockets_postgres.py +++ b/sprockets_postgres.py @@ -1,24 +1,16 @@ -""" -:class:`sprockets.http.app.Application` mixin for handling the connection to -Postgres and exporting functions for querying the database, getting the status, -and proving a cursor. - -Automatically creates and shuts down :class:`aio.pool.Pool` on startup -and shutdown. - -""" import asyncio import contextlib -import functools +import dataclasses import logging import os +import time import typing from distutils import util import aiopg import psycopg2 from aiopg import pool -from psycopg2 import errors, extensions, extras +from psycopg2 import errors, extras from tornado import ioloop, web LOGGER = logging.getLogger('sprockets-postgres') @@ -30,13 +22,115 @@ DEFAULT_POSTGRES_JSON = 'FALSE' DEFAULT_POSTGRES_MAX_POOL_SIZE = 0 DEFAULT_POSTGRES_MIN_POOL_SIZE = 1 DEFAULT_POSTGRES_QUERY_TIMEOUT = 120 -DEFAULT_POSTGRES_URL = 'postgresql://localhost:5432' DEFAULT_POSTGRES_UUID = 'TRUE' +QueryParameters = typing.Union[list, tuple, None] +Timeout = typing.Union[int, float, None] + + +@dataclasses.dataclass +class QueryResult: + row_count: int + row: typing.Optional[dict] + rows: typing.Optional[typing.List[dict]] + + +class PostgresConnector: + + def __init__(self, + cursor: aiopg.Cursor, + on_error: typing.Callable, + record_duration: typing.Optional[typing.Callable] = None, + timeout: Timeout = None): + self.cursor = cursor + self._on_error = on_error + self._record_duration = record_duration + self._timeout = timeout or int( + os.environ.get( + 'POSTGRES_QUERY_TIMEOUT', + DEFAULT_POSTGRES_QUERY_TIMEOUT)) + + async def callproc(self, + name: str, + parameters: QueryParameters = 
None, + metric_name: str = '', + *, + timeout: Timeout = None) -> QueryResult: + return await self._query( + self.cursor.callproc, + metric_name, + procname=name, + parameters=parameters, + timeout=timeout) + + async def execute(self, + sql: str, + parameters: QueryParameters = None, + metric_name: str = '', + *, + timeout: Timeout = None) -> QueryResult: + return await self._query( + self.cursor.execute, + metric_name, + operation=sql, + parameters=parameters, + timeout=timeout) + + @contextlib.asynccontextmanager + async def transaction(self) \ + -> typing.AsyncContextManager['PostgresConnector']: + async with self.cursor.begin(): + yield self + + async def _query(self, + method: typing.Callable, + metric_name: str, + **kwargs): + if kwargs['timeout'] is None: + kwargs['timeout'] = self._timeout + start_time = time.monotonic() + try: + await method(**kwargs) + except (asyncio.TimeoutError, psycopg2.Error) as err: + LOGGER.error('Caught %r', err) + exc = self._on_error(metric_name, err) + if exc: + raise exc + finally: + if self._record_duration: + self._record_duration( + metric_name, time.monotonic() - start_time) + return await self._query_results() + + async def _query_results(self) -> QueryResult: + row, rows = None, None + if self.cursor.rowcount == 1: + try: + row = dict(await self.cursor.fetchone()) + except psycopg2.ProgrammingError: + pass + elif self.cursor.rowcount > 1: + try: + rows = [dict(row) for row in await self.cursor.fetchall()] + except psycopg2.ProgrammingError: + pass + return QueryResult(self.cursor.rowcount, row, rows) + + +class ConnectionException(Exception): + """Raised when the connection to Postgres can not be established""" + class ApplicationMixin: - """Application mixin for setting up the PostgreSQL client pool""" + """ + :class:`sprockets.http.app.Application` mixin for handling the connection + to Postgres and exporting functions for querying the database, + getting the status, and proving a cursor. 
+ Automatically creates and shuts down :class:`aio.pool.Pool` on startup + and shutdown. + + """ POSTGRES_STATUS_TIMEOUT = 3 def __init__(self, *args, **kwargs): @@ -46,155 +140,57 @@ class ApplicationMixin: self.runner_callbacks['shutdown'].append(self._postgres_shutdown) @contextlib.asynccontextmanager - async def postgres_cursor(self, - timeout: typing.Optional[int] = None, - raise_http_error: bool = True) \ - -> typing.AsyncContextManager[extensions.cursor]: - """Return a Postgres cursor for the pool""" + async def postgres_connector(self, + on_error: typing.Callable, + record_duration: typing.Optional[ + typing.Callable] = None, + timeout: Timeout = None) \ + -> typing.AsyncContextManager[PostgresConnector]: try: async with self._postgres_pool.acquire() as conn: async with conn.cursor( cursor_factory=extras.RealDictCursor, - timeout=self._postgres_query_timeout(timeout)) as pgc: - yield pgc - except (asyncio.TimeoutError, - psycopg2.OperationalError, - psycopg2.Error) as error: - LOGGER.critical('Error connecting to Postgres: %s', error) - if raise_http_error: - raise web.HTTPError(503, 'Database Unavailable') - raise - - async def postgres_callproc(self, - name: str, - params: typing.Union[list, tuple, None] = None, - timeout: typing.Optional[int] = None) \ - -> typing.Union[dict, list, None]: - """Execute a stored procedure, specifying the name, SQL, passing in - optional parameters. 
- - :param name: The stored-proc to call - :param params: Optional parameters to pass into the function - :param timeout: Optional timeout to override the default query timeout - - """ - async with self.postgres_cursor(timeout) as cursor: - return await self._postgres_query( - cursor, cursor.callproc, name, name, params) - - async def postgres_execute(self, name: str, sql: str, - *args, - timeout: typing.Optional[int] = None) \ - -> typing.Union[dict, list, None]: - """Execute a query, specifying a name for the query, the SQL statement, - and optional positional arguments to pass in with the query. - - Parameters may be provided as sequence or mapping and will be - bound to variables in the operation. Variables are specified - either with positional ``%s`` or named ``%({name})s`` placeholders. - - :param name: The stored-proc to call - :param sql: The SQL statement to execute - :param timeout: Optional timeout to override the default query timeout - - """ - async with self.postgres_cursor(timeout) as cursor: - return await self._postgres_query( - cursor, cursor.execute, name, sql, args) + timeout=timeout) as cursor: + yield PostgresConnector( + cursor, on_error, record_duration, timeout) + except (asyncio.TimeoutError, psycopg2.Error) as err: + on_error('postgres_connector', ConnectionException(str(err))) async def postgres_status(self) -> dict: """Invoke from the ``/status`` RequestHandler to check that there is a Postgres connection handler available and return info about the pool. 
- """ - available = True - try: - async with self.postgres_cursor( - self.POSTGRES_STATUS_TIMEOUT, False) as cursor: - await cursor.execute('SELECT 1') - except (asyncio.TimeoutError, psycopg2.OperationalError): - available = False + """ + query_error = asyncio.Event() + + def on_error(_metric_name, _exc) -> None: + query_error.set() + return None + + async with self.postgres_connector( + on_error, + timeout=self.POSTGRES_STATUS_TIMEOUT) as connector: + await connector.execute('SELECT 1') return { - 'available': available, + 'available': not query_error.is_set(), 'pool_size': self._postgres_pool.size, 'pool_free': self._postgres_pool.freesize } - async def _postgres_query(self, - cursor: aiopg.Cursor, - method: typing.Callable, - name: str, - sql: str, - parameters: typing.Union[dict, list, tuple]) \ - -> typing.Union[dict, list, None]: - """Execute a query, specifying the name, SQL, passing in - - """ - try: - await method(sql, parameters) - except asyncio.TimeoutError as err: - LOGGER.error('Query timeout for %s: %s', - name, str(err).split('\n')[0]) - raise web.HTTPError(500, reason='Query Timeout') - except errors.UniqueViolation as err: - LOGGER.error('Database error for %s: %s', - name, str(err).split('\n')[0]) - raise web.HTTPError(409, reason='Unique Violation') - except psycopg2.Error as err: - LOGGER.error('Database error for %s: %s', - name, str(err).split('\n')[0]) - raise web.HTTPError(500, reason='Database Error') - try: - return await self._postgres_query_results(cursor) - except psycopg2.ProgrammingError: - return - - @staticmethod - async def _postgres_query_results(cursor: aiopg.Cursor) \ - -> typing.Union[dict, list, None]: - """Invoked by self.postgres_query to return all of the query results - as either a ``dict`` or ``list`` depending on the quantity of rows. - - This can raise a ``psycopg2.ProgrammingError`` for an INSERT/UPDATE - without RETURNING or a DELETE. That exception is caught by the caller. 
- - :raises psycopg2.ProgrammingError: when there are no rows to fetch - even though the rowcount is > 0 - - """ - if cursor.rowcount == 1: - return await cursor.fetchone() - elif cursor.rowcount > 1: - return await cursor.fetchall() - return None - - @functools.lru_cache(2) - def _postgres_query_timeout(self, - timeout: typing.Optional[int] = None) -> int: - """Return query timeout, either from the specified value or - ``POSTGRES_QUERY_TIMEOUT`` environment variable, if set. - - Defaults to sprockets_postgres.DEFAULT_POSTGRES_QUERY_TIMEOUT. - - """ - return timeout if timeout else int( - os.environ.get( - 'POSTGRES_QUERY_TIMEOUT', - DEFAULT_POSTGRES_QUERY_TIMEOUT)) - async def _postgres_setup(self, _app: web.Application, - _ioloop: ioloop.IOLoop) -> None: + loop: ioloop.IOLoop) -> None: """Setup the Postgres pool of connections and log if there is an error. This is invoked by the Application on start callback mechanism. """ - url = os.environ.get('POSTGRES_URL', DEFAULT_POSTGRES_URL) - LOGGER.debug('Connecting to PostgreSQL: %s', url) + if 'POSTGRES_URL' not in os.environ: + LOGGER.critical('Missing POSTGRES_URL environment variable') + return self.stop(loop) self._postgres_pool = pool.Pool( - url, + os.environ['POSTGRES_URL'], minsize=int( os.environ.get( 'POSTGRES_MIN_POOL_SIZE', @@ -236,3 +232,90 @@ class ApplicationMixin: """ self._postgres_pool.close() await self._postgres_pool.wait_closed() + + +class RequestHandlerMixin: + """ + RequestHandler mixin class exposing functions for querying the database, + recording the duration to either `sprockets-influxdb` or + `sprockets.mixins.metrics`, and handling exceptions. 
+ + """ + async def postgres_callproc(self, + name: str, + parameters: QueryParameters = None, + metric_name: str = '', + *, + timeout: Timeout = None) -> QueryResult: + async with self._postgres_connector(timeout) as connector: + return await connector.callproc( + name, parameters, metric_name, timeout=timeout) + + async def postgres_execute(self, + sql: str, + parameters: QueryParameters = None, + metric_name: str = '', + *, + timeout: Timeout = None) -> QueryResult: + """Execute a query, specifying a name for the query, the SQL statement, + and optional positional arguments to pass in with the query. + + Parameters may be provided as sequence or mapping and will be + bound to variables in the operation. Variables are specified + either with positional ``%s`` or named ``%({name})s`` placeholders. + + """ + async with self._postgres_connector(timeout) as connector: + return await connector.execute( + sql, parameters, metric_name, timeout=timeout) + + @contextlib.asynccontextmanager + async def postgres_transaction(self, timeout: Timeout = None) \ + -> typing.AsyncContextManager[PostgresConnector]: + """Yields a :class:`PostgresConnector` instance in a transaction. + Will automatically commit or rollback based upon exception. 
+ + """ + async with self._postgres_connector(timeout) as connector: + async with connector.transaction(): + yield connector + + @contextlib.asynccontextmanager + async def _postgres_connector(self, timeout: Timeout = None) \ + -> typing.AsyncContextManager[PostgresConnector]: + async with self.application.postgres_connector( + self.__on_postgres_error, + self.__on_postgres_timing, + timeout) as connector: + yield connector + + def __on_postgres_error(self, + metric_name: str, + exc: Exception) -> typing.Optional[Exception]: + """Override for different error handling behaviors""" + LOGGER.error('%s in %s for %s (%s)', + exc.__class__.__name__, + self.__class__.__name__, + metric_name, + str(exc).split('\n')[0]) + if isinstance(exc, ConnectionException): + raise web.HTTPError(503, reason='Database Connection Error') + elif isinstance(exc, asyncio.TimeoutError): + raise web.HTTPError(500, reason='Query Timeout') + elif isinstance(exc, errors.UniqueViolation): + raise web.HTTPError(409, reason='Unique Violation') + elif isinstance(exc, psycopg2.Error): + raise web.HTTPError(500, reason='Database Error') + return exc + + def __on_postgres_timing(self, + metric_name: str, + duration: float) -> None: + """Override for custom metric recording""" + if hasattr(self, 'influxdb'): # sprockets-influxdb + self.influxdb.set_field(metric_name, duration) + elif hasattr(self, 'record_timing'): # sprockets.mixins.metrics + self.record_timing(metric_name, duration) + else: + LOGGER.debug('Postgres query %s duration: %s', + metric_name, duration) diff --git a/tests.py b/tests.py index 2294ff8..acfa9a2 100644 --- a/tests.py +++ b/tests.py @@ -12,40 +12,46 @@ from tornado import web import sprockets_postgres -class CallprocRequestHandler(web.RequestHandler): +class CallprocRequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): async def get(self): - result = await self.application.postgres_callproc('uuid_generate_v4') - await self.finish({'value': 
str(result['uuid_generate_v4'])}) + result = await self.postgres_callproc( + 'uuid_generate_v4', metric_name='uuid') + await self.finish({'value': str(result.row['uuid_generate_v4'])}) -class ExecuteRequestHandler(web.RequestHandler): +class ExecuteRequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): GET_SQL = 'SELECT %s::TEXT AS value;' async def get(self): - result = await self.application.postgres_execute( - 'get', self.GET_SQL, self.get_argument('value')) - await self.finish({'value': result['value'] if result else None}) + result = await self.postgres_execute( + self.GET_SQL, [self.get_argument('value')], 'get') + await self.finish({ + 'value': result.row['value'] if result.row else None}) -class MultiRowRequestHandler(web.RequestHandler): +class MultiRowRequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): GET_SQL = 'SELECT * FROM information_schema.enabled_roles;' async def get(self): - rows = await self.application.postgres_execute('get', self.GET_SQL) - await self.finish({'rows': [row['role_name'] for row in rows]}) + result = await self.postgres_execute(self.GET_SQL) + await self.finish({'rows': [row['role_name'] for row in result.rows]}) -class NoRowRequestHandler(web.RequestHandler): +class NoRowRequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): GET_SQL = """\ SELECT * FROM information_schema.tables WHERE table_schema = 'public';""" async def get(self): - rows = await self.application.postgres_execute('get', self.GET_SQL) - await self.finish({'rows': rows}) + result = await self.postgres_execute(self.GET_SQL) + await self.finish({'rows': result.rows}) class StatusRequestHandler(web.RequestHandler): @@ -62,7 +68,7 @@ class Application(sprockets_postgres.ApplicationMixin, pass -class ExecuteTestCase(testing.SprocketsHttpTestCase): +class TestCase(testing.SprocketsHttpTestCase): @classmethod def setUpClass(cls): @@ -143,15 +149,44 @@ class 
ExecuteTestCase(testing.SprocketsHttpTestCase): self.assertEqual(response.code, 500) self.assertIn(b'Database Error', response.body) - def test_postgres_programming_error(self): - with mock.patch.object(self.app, '_postgres_query_results') as pqr: - pqr.side_effect = psycopg2.ProgrammingError() - response = self.fetch('/execute?value=1') - self.assertEqual(response.code, 200) - self.assertIsNone(json.loads(response.body)['value']) + @mock.patch('aiopg.cursor.Cursor.fetchone') + def test_postgres_programming_error(self, fetchone): + fetchone.side_effect = psycopg2.ProgrammingError() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 200) + self.assertIsNone(json.loads(response.body)['value']) @mock.patch('aiopg.connection.Connection.cursor') def test_postgres_cursor_raises(self, cursor): cursor.side_effect = asyncio.TimeoutError() response = self.fetch('/execute?value=1') self.assertEqual(response.code, 503) + +""" +class MissingURLTestCase(testing.SprocketsHttpTestCase): + + @classmethod + def setUpClass(cls): + with open('build/test-environment') as f: + for line in f: + if line.startswith('export '): + line = line[7:] + name, _, value = line.strip().partition('=') + if name != 'POSTGRES_URL': + os.environ[name] = value + if 'POSTGRES_URL' in os.environ: + del os.environ['POSTGRES_URL'] + + def setUp(self): + self.stop_mock = None + super().setUp() + + def get_app(self): + self.app = Application() + self.stop_mock = mock.Mock( + wraps=self.app.stop, side_effect=RuntimeError) + return self.app + + def test_that_stop_is_invoked(self): + self.stop_mock.assert_called_once_with(self.io_loop) +""" From 603eb4d6dd3e2c4912b0c695c2d88a11994f5601 Mon Sep 17 00:00:00 2001 From: "Gavin M. 
Roy" Date: Tue, 7 Apr 2020 16:59:06 -0400 Subject: [PATCH 14/19] Testing update and fixes found in testing --- .github/workflows/testing.yaml | 2 +- bootstrap | 4 +- docker-compose.yml | 4 + fixtures/testing.sql | 30 ++++ sprockets_postgres.py | 51 ++++--- tests.py | 249 ++++++++++++++++++++++++++++++--- 6 files changed, 290 insertions(+), 50 deletions(-) create mode 100644 fixtures/testing.sql diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml index f1eba12..93a17bd 100644 --- a/.github/workflows/testing.yaml +++ b/.github/workflows/testing.yaml @@ -48,7 +48,7 @@ jobs: - name: Install UUID extension run: | apk add postgresql-client \ - && psql -q -h postgres -U postgres -d postgres -c 'CREATE EXTENSION "uuid-ossp";' + && psql -q -h postgres -U postgres -d postgres -f fixtures/testing.sql - name: Run flake8 tests run: flake8 diff --git a/bootstrap b/bootstrap index d527a2a..735a3fc 100755 --- a/bootstrap +++ b/bootstrap @@ -65,7 +65,9 @@ docker-compose up -d --quiet-pull wait_for_healthy_containers 1 -docker-compose exec postgres psql -q -o /dev/null -U postgres -d postgres -c 'CREATE EXTENSION "uuid-ossp";' +printf "Loading fixture data ... 
" +docker-compose exec postgres psql -q -o /dev/null -U postgres -d postgres -f /fixtures/testing.sql +report_done cat > build/test-environment< QueryResult: - row, rows = None, None + count, row, rows = self.cursor.rowcount, None, None if self.cursor.rowcount == 1: try: row = dict(await self.cursor.fetchone()) @@ -114,7 +113,7 @@ class PostgresConnector: rows = [dict(row) for row in await self.cursor.fetchall()] except psycopg2.ProgrammingError: pass - return QueryResult(self.cursor.rowcount, row, rows) + return QueryResult(count, row, rows) class ConnectionException(Exception): @@ -247,7 +246,10 @@ class RequestHandlerMixin: metric_name: str = '', *, timeout: Timeout = None) -> QueryResult: - async with self._postgres_connector(timeout) as connector: + async with self.application.postgres_connector( + self._on_postgres_error, + self._on_postgres_timing, + timeout) as connector: return await connector.callproc( name, parameters, metric_name, timeout=timeout) @@ -265,7 +267,10 @@ class RequestHandlerMixin: either with positional ``%s`` or named ``%({name})s`` placeholders. """ - async with self._postgres_connector(timeout) as connector: + async with self.application.postgres_connector( + self._on_postgres_error, + self._on_postgres_timing, + timeout) as connector: return await connector.execute( sql, parameters, metric_name, timeout=timeout) @@ -276,28 +281,20 @@ class RequestHandlerMixin: Will automatically commit or rollback based upon exception. 
""" - async with self._postgres_connector(timeout) as connector: + async with self.application.postgres_connector( + self._on_postgres_error, + self._on_postgres_timing, + timeout) as connector: async with connector.transaction(): yield connector - @contextlib.asynccontextmanager - async def _postgres_connector(self, timeout: Timeout = None) \ - -> typing.AsyncContextManager[PostgresConnector]: - async with self.application.postgres_connector( - self.__on_postgres_error, - self.__on_postgres_timing, - timeout) as connector: - yield connector - - def __on_postgres_error(self, - metric_name: str, - exc: Exception) -> typing.Optional[Exception]: + def _on_postgres_error(self, + metric_name: str, + exc: Exception) -> typing.Optional[Exception]: """Override for different error handling behaviors""" LOGGER.error('%s in %s for %s (%s)', - exc.__class__.__name__, - self.__class__.__name__, - metric_name, - str(exc).split('\n')[0]) + exc.__class__.__name__, self.__class__.__name__, + metric_name, str(exc).split('\n')[0]) if isinstance(exc, ConnectionException): raise web.HTTPError(503, reason='Database Connection Error') elif isinstance(exc, asyncio.TimeoutError): @@ -308,9 +305,9 @@ class RequestHandlerMixin: raise web.HTTPError(500, reason='Database Error') return exc - def __on_postgres_timing(self, - metric_name: str, - duration: float) -> None: + def _on_postgres_timing(self, + metric_name: str, + duration: float) -> None: """Override for custom metric recording""" if hasattr(self, 'influxdb'): # sprockets-influxdb self.influxdb.set_field(metric_name, duration) diff --git a/tests.py b/tests.py index acfa9a2..f676824 100644 --- a/tests.py +++ b/tests.py @@ -1,6 +1,7 @@ import asyncio import json import os +import typing import uuid from unittest import mock @@ -12,8 +13,20 @@ from tornado import web import sprockets_postgres -class CallprocRequestHandler(sprockets_postgres.RequestHandlerMixin, - web.RequestHandler): +class 
RequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): + """Base RequestHandler for test endpoints""" + + def cast_data(self, data: typing.Union[dict, list, None]) \ + -> typing.Union[dict, list, None]: + if data is None: + return None + elif isinstance(data, list): + return [self.cast_data(row) for row in data] + return {k: str(v) for k, v in data.items()} + + +class CallprocRequestHandler(RequestHandler): async def get(self): result = await self.postgres_callproc( @@ -21,51 +34,164 @@ class CallprocRequestHandler(sprockets_postgres.RequestHandlerMixin, await self.finish({'value': str(result.row['uuid_generate_v4'])}) -class ExecuteRequestHandler(sprockets_postgres.RequestHandlerMixin, - web.RequestHandler): +class CountRequestHandler(RequestHandler): + + GET_SQL = """\ + SELECT last_updated_at, count + FROM public.query_count + WHERE key = 'test';""" + + async def get(self): + result = await self.postgres_execute(self.GET_SQL) + await self.finish(self.cast_data(result.row)) + + +class ErrorRequestHandler(RequestHandler): + + GET_SQL = """\ + SELECT last_updated_at, count + FROM public.query_count + WHERE key = 'test';""" + + async def get(self): + await self.postgres_execute(self.GET_SQL) + self.set_status(204) + + def _on_postgres_error(self, + metric_name: str, + exc: Exception) -> typing.Optional[Exception]: + return RuntimeError() + + +class ExecuteRequestHandler(RequestHandler): GET_SQL = 'SELECT %s::TEXT AS value;' async def get(self): + timeout = self.get_argument('timeout', None) + if timeout is not None: + timeout = int(timeout) result = await self.postgres_execute( - self.GET_SQL, [self.get_argument('value')], 'get') + self.GET_SQL, [self.get_argument('value')], timeout=timeout) await self.finish({ 'value': result.row['value'] if result.row else None}) -class MultiRowRequestHandler(sprockets_postgres.RequestHandlerMixin, - web.RequestHandler): +class InfluxDBRequestHandler(ExecuteRequestHandler): - GET_SQL = 'SELECT * FROM 
information_schema.enabled_roles;' + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.influxdb = self.application.influxdb + self.influxdb.add_field = mock.Mock() + + +class MetricsMixinRequestHandler(ExecuteRequestHandler): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.record_timing = self.application.record_timing + + +class MultiRowRequestHandler(RequestHandler): + + GET_SQL = 'SELECT * FROM public.test_rows;' + + UPDATE_SQL = """\ + UPDATE public.test_rows + SET toggle = %(to_value)s, + last_modified_at = CURRENT_TIMESTAMP + WHERE toggle IS %(from_value)s""" async def get(self): result = await self.postgres_execute(self.GET_SQL) - await self.finish({'rows': [row['role_name'] for row in result.rows]}) + await self.finish({ + 'count': result.row_count, + 'rows': self.cast_data(result.rows)}) + + async def post(self): + body = json.loads(self.request.body.decode('utf-8')) + result = await self.postgres_execute( + self.UPDATE_SQL, { + 'to_value': body['value'], 'from_value': not body['value']}) + await self.finish({ + 'count': result.row_count, + 'rows': self.cast_data(result.rows)}) -class NoRowRequestHandler(sprockets_postgres.RequestHandlerMixin, - web.RequestHandler): +class NoRowRequestHandler(RequestHandler): GET_SQL = """\ - SELECT * FROM information_schema.tables WHERE table_schema = 'public';""" + SELECT * FROM information_schema.tables WHERE table_schema = 'foo';""" async def get(self): result = await self.postgres_execute(self.GET_SQL) - await self.finish({'rows': result.rows}) + await self.finish({ + 'count': result.row_count, + 'rows': self.cast_data(result.rows)}) -class StatusRequestHandler(web.RequestHandler): +class StatusRequestHandler(RequestHandler): async def get(self): - result = await self.application.postgres_status() - if not result['available']: + status = await self.application.postgres_status() + if not status['available']: self.set_status(503, 'Database Unavailable') - 
await self.finish(dict(result)) + await self.finish(status) + + +class TransactionRequestHandler(RequestHandler): + + GET_SQL = """\ + SELECT id, created_at, last_modified_at, value + FROM public.test + WHERE id = %(id)s;""" + + POST_SQL = """\ + INSERT INTO public.test (id, created_at, value) + VALUES (%(id)s, CURRENT_TIMESTAMP, %(value)s) + RETURNING id, created_at, value;""" + + UPDATE_COUNT_SQL = """\ + UPDATE public.query_count + SET count = count + 1, + last_updated_at = CURRENT_TIMESTAMP + WHERE key = 'test' + RETURNING last_updated_at, count;""" + + async def get(self, test_id): + result = await self.postgres_execute(self.GET_SQL, {'id': test_id}) + if not result.row_count: + raise web.HTTPError(404, 'Not Found') + await self.finish(self.cast_data(result.row)) + + async def post(self): + body = json.loads(self.request.body.decode('utf-8')) + async with self.postgres_transaction() as postgres: + + # This should roll back on the second call to this endopoint + self.application.first_txn = await postgres.execute( + self.POST_SQL, {'id': str(uuid.uuid4()), + 'value': str(uuid.uuid4())}) + + # This should roll back on the second call to this endopoint + count = await postgres.execute(self.UPDATE_COUNT_SQL) + + # This will trigger an error on the second call to this endpoint + user = await postgres.execute(self.POST_SQL, body) + + await self.finish({ + 'count': self.cast_data(count.row), + 'user': self.cast_data(user.row)}) class Application(sprockets_postgres.ApplicationMixin, app.Application): - pass + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.influxdb = mock.Mock() + self.record_timing = mock.Mock() + self.first_txn: typing.Optional[sprockets_postgres.QueryResult] = None class TestCase(testing.SprocketsHttpTestCase): @@ -82,13 +208,22 @@ class TestCase(testing.SprocketsHttpTestCase): def get_app(self): self.app = Application(handlers=[ web.url('/callproc', CallprocRequestHandler), + web.url('/count', 
CountRequestHandler), + web.url('/error', ErrorRequestHandler), web.url('/execute', ExecuteRequestHandler), + web.url('/influxdb', InfluxDBRequestHandler), + web.url('/metrics-mixin', MetricsMixinRequestHandler), web.url('/multi-row', MultiRowRequestHandler), web.url('/no-row', NoRowRequestHandler), - web.url('/status', StatusRequestHandler) + web.url('/status', StatusRequestHandler), + web.url('/transaction', TransactionRequestHandler), + web.url('/transaction/(?P.*)', TransactionRequestHandler) ]) return self.app + +class RequestHandlerMixinTestCase(TestCase): + def test_postgres_status(self): response = self.fetch('/status') data = json.loads(response.body) @@ -109,23 +244,63 @@ class TestCase(testing.SprocketsHttpTestCase): self.assertIsInstance( uuid.UUID(json.loads(response.body)['value']), uuid.UUID) + @mock.patch('aiopg.cursor.Cursor.execute') + def test_postgres_error_passthrough(self, execute): + execute.side_effect = asyncio.TimeoutError + response = self.fetch('/error') + self.assertEqual(response.code, 500) + self.assertIn(b'Internal Server Error', response.body) + def test_postgres_execute(self): expectation = str(uuid.uuid4()) response = self.fetch('/execute?value={}'.format(expectation)) self.assertEqual(response.code, 200) self.assertEqual(json.loads(response.body)['value'], expectation) - def test_postgres_multirow(self): + def test_postgres_execute_with_timeout(self): + expectation = str(uuid.uuid4()) + response = self.fetch( + '/execute?value={}&timeout=5'.format(expectation)) + self.assertEqual(response.code, 200) + self.assertEqual(json.loads(response.body)['value'], expectation) + + def test_postgres_influxdb(self): + expectation = str(uuid.uuid4()) + response = self.fetch( + '/influxdb?value={}'.format(expectation)) + self.assertEqual(response.code, 200) + self.assertEqual(json.loads(response.body)['value'], expectation) + self.app.influxdb.set_field.assert_called_once() + + def test_postgres_metrics_mixin(self): + expectation = 
str(uuid.uuid4()) + response = self.fetch( + '/metrics-mixin?value={}'.format(expectation)) + self.assertEqual(response.code, 200) + self.assertEqual(json.loads(response.body)['value'], expectation) + self.app.record_timing.assert_called_once() + + def test_postgres_multirow_get(self): response = self.fetch('/multi-row') self.assertEqual(response.code, 200) body = json.loads(response.body) + self.assertEqual(body['count'], 5) self.assertIsInstance(body['rows'], list) - self.assertIn('postgres', body['rows']) + + def test_postgres_multirow_no_data(self): + for value in [True, False]: + response = self.fetch( + '/multi-row', method='POST', body=json.dumps({'value': value})) + self.assertEqual(response.code, 200) + body = json.loads(response.body) + self.assertEqual(body['count'], 5) + self.assertIsNone(body['rows']) def test_postgres_norow(self): response = self.fetch('/no-row') self.assertEqual(response.code, 200) body = json.loads(response.body) + self.assertEqual(body['count'], 0) self.assertIsNone(body['rows']) @mock.patch('aiopg.cursor.Cursor.execute') @@ -162,6 +337,38 @@ class TestCase(testing.SprocketsHttpTestCase): response = self.fetch('/execute?value=1') self.assertEqual(response.code, 503) + +class TransactionTestCase(TestCase): + + def test_transactions(self): + test_body = { + 'id': str(uuid.uuid4()), + 'value': str(uuid.uuid4()) + } + response = self.fetch( + '/transaction', method='POST', body=json.dumps(test_body)) + self.assertEqual(response.code, 200) + record = json.loads(response.body.decode('utf-8')) + self.assertEqual(record['user']['id'], test_body['id']) + self.assertEqual(record['user']['value'], test_body['value']) + count = record['count']['count'] + last_updated = record['count']['last_updated_at'] + + response = self.fetch( + '/transaction', method='POST', body=json.dumps(test_body)) + self.assertEqual(response.code, 409) + + response = self.fetch( + '/transaction/{}'.format(self.app.first_txn.row['id'])) + 
self.assertEqual(response.code, 404) + + response = self.fetch('/count') + self.assertEqual(response.code, 200) + record = json.loads(response.body.decode('utf-8')) + self.assertEqual(record['count'], count) + self.assertEqual(record['last_updated_at'], last_updated) + + """ class MissingURLTestCase(testing.SprocketsHttpTestCase): From 58d349e7900ecbe71f0b872bc5b8c9edb7463482 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Wed, 8 Apr 2020 18:15:50 -0400 Subject: [PATCH 15/19] Add documentation --- README.rst | 47 ++++++++++++++++++++-------------------- docs/application.rst | 6 +++++ docs/conf.py | 41 +++++++++++++++++++++++++++++++++++ docs/configuration.rst | 27 +++++++++++++++++++++++ docs/connector.rst | 9 ++++++++ docs/example.rst | 42 +++++++++++++++++++++++++++++++++++ docs/exceptions.rst | 4 ++++ docs/genindex.rst | 2 ++ docs/index.rst | 31 ++++++++++++++++++++++++++ docs/query_result.rst | 9 ++++++++ docs/request_handler.rst | 11 ++++++++++ docs/requirements.txt | 5 +++++ docs/typing.rst | 6 +++++ 13 files changed, 217 insertions(+), 23 deletions(-) create mode 100644 docs/application.rst create mode 100644 docs/conf.py create mode 100644 docs/configuration.rst create mode 100644 docs/connector.rst create mode 100644 docs/example.rst create mode 100644 docs/exceptions.rst create mode 100644 docs/genindex.rst create mode 100644 docs/index.rst create mode 100644 docs/query_result.rst create mode 100644 docs/request_handler.rst create mode 100644 docs/requirements.txt create mode 100644 docs/typing.rst diff --git a/README.rst b/README.rst index db62a24..8d92f5a 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,7 @@ Sprockets Postgres ================== -An asynchronous Postgres client mixin for Tornado applications +A set of mixins and classes for interacting with PostgreSQL using asyncio in +Tornado / sprockets.http applications using aiopg. 
|Version| |Status| |Coverage| |License| @@ -20,28 +21,28 @@ Configuration ------------- The following table details the environment variable configuration options: -+---------------------------------+--------------------------------------------------+---------------------------------+ -| Variable | Definition | Default | -+=================================+==================================================+=================================+ -| ``PGSQL_URL`` | The PostgreSQL URL to connect to | ``postgresql://localhost:5432`` | -+---------------------------------+--------------------------------------------------+---------------------------------+ -| ``POSTGRES_MAX_POOL_SIZE`` | Maximum connection count to Postgres per backend | ``0`` (Unlimited) | -+---------------------------------+--------------------------------------------------+---------------------------------+ -| ``POSTGRES_MIN_POOL_SIZE`` | Minimum or starting pool size. | ``1`` | -+---------------------------------+--------------------------------------------------+---------------------------------+ -| ``POSTGRES_CONNECTION_TIMEOUT`` | The maximum time in seconds to spend attempting | ``10`` | -| | to create a new connection. | | -+---------------------------------+--------------------------------------------------+---------------------------------+ -| ``POSTGRES_CONNECTION_TTL`` | Time-to-life in seconds for a pooled connection. | ``300`` | -+---------------------------------+--------------------------------------------------+---------------------------------+ -| ``POSTGRES_QUERY_TIMEOUT`` | Maximum execution time for a query in seconds. | ``60`` | -+---------------------------------+--------------------------------------------------+---------------------------------+ -| ``POSTGRES_HSTORE`` | Enable HSTORE support in the client. 
| ``FALSE`` | -+---------------------------------+--------------------------------------------------+---------------------------------+ -| ``POSTGRES_JSON`` | Enable JSON support in the client. | ``FALSE`` | -+---------------------------------+--------------------------------------------------+---------------------------------+ -| ``POSTGRES_UUID`` | Enable UUID support in the client. | ``TRUE`` | -+---------------------------------+--------------------------------------------------+---------------------------------+ ++---------------------------------+--------------------------------------------------+-------------------+ +| Variable | Definition | Default | ++=================================+==================================================+===================+ +| ``POSTGRES_URL`` | The PostgreSQL URL to connect to | | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_MAX_POOL_SIZE`` | Maximum connection count to Postgres per backend | ``0`` (Unlimited) | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_MIN_POOL_SIZE`` | Minimum or starting pool size. | ``1`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_CONNECTION_TIMEOUT`` | The maximum time in seconds to spend attempting | ``10`` | +| | to create a new connection. | | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_CONNECTION_TTL`` | Time-to-life in seconds for a pooled connection. | ``300`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_QUERY_TIMEOUT`` | Maximum execution time for a query in seconds. 
| ``60`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_HSTORE`` | Enable HSTORE support in the client. | ``FALSE`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_JSON`` | Enable JSON support in the client. | ``FALSE`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_UUID`` | Enable UUID support in the client. | ``TRUE`` | ++---------------------------------+--------------------------------------------------+-------------------+ Requirements ------------ diff --git a/docs/application.rst b/docs/application.rst new file mode 100644 index 0000000..2f71fdc --- /dev/null +++ b/docs/application.rst @@ -0,0 +1,6 @@ +Application Mixin +================= + +.. autoclass:: sprockets_postgres.ApplicationMixin + :members: + :member-order: bysource diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..a2f4e60 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,41 @@ +import datetime + +import pkg_resources + +master_doc = 'index' +project = 'sprockets-postgres' +release = version = pkg_resources.get_distribution(project).version +copyright = '{}, AWeber Communications'.format(datetime.date.today().year) + +html_theme = 'sphinx_rtd_theme' +html_theme_path = ['_themes'] + +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.intersphinx', + 'sphinx_autodoc_typehints', + 'sphinx.ext.viewcode' +] + +set_type_checking_flag = True +typehints_fully_qualified = True +always_document_param_types = True +typehints_document_rtype = True + +templates_path = ['_templates'] +exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] +intersphinx_mapping = { + 'aiopg': ('https://aiopg.readthedocs.io/en/stable/', None), + 'psycopg2': ('http://initd.org/psycopg/docs/', None), + 'python': ('https://docs.python.org/3', None), + 'sprockets-http': ( + 
'https://sprocketshttp.readthedocs.io/en/master/', None), + 'sprockets-influxdb': ( + 'https://sprockets-influxdb.readthedocs.io/en/latest/', None), + 'sprockets.mixins.metrics': ( + 'https://sprocketsmixinsmetrics.readthedocs.io/en/latest/', None), + 'tornado': ('http://tornadoweb.org/en/latest/', None) +} + +autodoc_default_options = {'autodoc_typehints': 'description'} + diff --git a/docs/configuration.rst b/docs/configuration.rst new file mode 100644 index 0000000..15a47ac --- /dev/null +++ b/docs/configuration.rst @@ -0,0 +1,27 @@ +Configuration +============= +:py:mod:`sprockets-postgres ` is configured via environment variables. The following table +details the configuration options and their defaults. + ++---------------------------------+--------------------------------------------------+-------------------+ +| Variable | Definition | Default | ++=================================+==================================================+===================+ +| ``POSTGRES_URL`` | The PostgreSQL URL to connect to | | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_MAX_POOL_SIZE`` | Maximum connection count to Postgres per backend | ``0`` (Unlimited) | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_MIN_POOL_SIZE`` | Minimum or starting pool size. | ``1`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_CONNECTION_TIMEOUT`` | The maximum time in seconds to spend attempting | ``10`` | +| | to create a new connection. | | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_CONNECTION_TTL`` | Time-to-life in seconds for a pooled connection. 
| ``300`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_QUERY_TIMEOUT`` | Maximum execution time for a query in seconds. | ``60`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_HSTORE`` | Enable HSTORE support in the client. | ``FALSE`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_JSON`` | Enable JSON support in the client. | ``FALSE`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_UUID`` | Enable UUID support in the client. | ``TRUE`` | ++---------------------------------+--------------------------------------------------+-------------------+ diff --git a/docs/connector.rst b/docs/connector.rst new file mode 100644 index 0000000..40d4811 --- /dev/null +++ b/docs/connector.rst @@ -0,0 +1,9 @@ +PostgresConnector +================= +A :class:`~sprockets_postgres.PostgresConnector` instance contains a cursor for +a Postgres connection and methods to execute queries. + +.. autoclass:: sprockets_postgres.PostgresConnector + :members: + :undoc-members: + :member-order: bysource diff --git a/docs/example.rst b/docs/example.rst new file mode 100644 index 0000000..c6bbfe1 --- /dev/null +++ b/docs/example.rst @@ -0,0 +1,42 @@ +Example Web Application +======================= +The following code provides a simple example for using the + +.. 
code-block:: python + + import sprockets.http + import sprockets_postgres as postgres + from sprockets.http import app + + + class RequestHandler(postgres.RequestHandlerMixin, + web.RequestHandler): + + GET_SQL = """\ + SELECT foo_id, bar, baz, qux + FROM public.foo + WHERE foo_id = %(foo_id)s;""" + + async def get(self, foo_id: str) -> None: + result = await self.postgres_execute(self.GET_SQL, {'foo_id': foo_id}) + await self.finish(result.row) + + + class Application(postgres.ApplicationMixin, app.Application): + """ + The ``ApplicationMixin`` provides the foundation for the + ``RequestHandlerMixin`` to properly function and will automatically + setup the pool to connect to PostgreSQL and will shutdown the connections + cleanly when the application stops. + + """ + + + def make_app(**settings): + return Application([ + web.url(r'/foo/(?P<foo_id>.*)', FooRequestHandler) + ], **settings) + + + if __name__ == '__main__': + sprockets.http.run(make_app) diff --git a/docs/exceptions.rst b/docs/exceptions.rst new file mode 100644 index 0000000..e3d9324 --- /dev/null +++ b/docs/exceptions.rst @@ -0,0 +1,4 @@ +Exceptions +========== + +.. autoclass:: sprockets_postgres.ConnectionException diff --git a/docs/genindex.rst b/docs/genindex.rst new file mode 100644 index 0000000..9e530fa --- /dev/null +++ b/docs/genindex.rst @@ -0,0 +1,2 @@ +Index +===== diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..9a503c3 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,31 @@ +Sprockets Postgres +================== +A set of mixins and classes for interacting with PostgreSQL using :mod:`asyncio` +in :class:`Tornado ` / +:class:`sprockets.http ` applications using +:py:mod:`aiopg`. + +Installation +------------ +``sprockets-postgres`` is available on the Python package index and is installable via pip: + +.. code:: bash + + pip install sprockets-postgres + + +Documentation +------------- + +.. 
toctree:: + :maxdepth: 1 + + configuration + application + request_handler + query_result + connector + typing + exceptions + example + genindex diff --git a/docs/query_result.rst b/docs/query_result.rst new file mode 100644 index 0000000..623dcc2 --- /dev/null +++ b/docs/query_result.rst @@ -0,0 +1,9 @@ +QueryResult +=========== +A :class:`~sprockets_postgres.QueryResult` instance is created for every query +and contains the count of rows effected by the query and either the ``row`` as +a :class:`dict` or ``rows`` as a list of :class:`dict`. For queries that do +not return any data, both ``row`` and ``rows`` will be :const:`None`. + +.. autoclass:: sprockets_postgres.QueryResult + :members: diff --git a/docs/request_handler.rst b/docs/request_handler.rst new file mode 100644 index 0000000..4d9f19c --- /dev/null +++ b/docs/request_handler.rst @@ -0,0 +1,11 @@ +RequestHandler Mixin +==================== +The :class:`~sprockets_postgres.RequestHandlerMixin` is a Tornado +:class:`tornado.web.RequestHandler` mixin that provides easy to use +functionality for interacting with PostgreSQL. + +.. autoclass:: sprockets_postgres.RequestHandlerMixin + :members: + :undoc-members: + :private-members: + :member-order: bysource diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..7a7ed02 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,5 @@ +Sphinx==2.4.4 +sphinx-autodoc-typehints +sphinx_rtd_theme +typed_ast +typing_extensions diff --git a/docs/typing.rst b/docs/typing.rst new file mode 100644 index 0000000..0765676 --- /dev/null +++ b/docs/typing.rst @@ -0,0 +1,6 @@ +Type Annotations +================ + +.. autodata:: sprockets_postgres.QueryParameters + +.. autodata:: sprockets_postgres.Timeout From 75b8d49bed71a8f486bd37f5dd196786da3cfff8 Mon Sep 17 00:00:00 2001 From: "Gavin M. 
Roy" Date: Wed, 8 Apr 2020 18:22:24 -0400 Subject: [PATCH 16/19] Documentation updates --- docs/connector.rst | 4 ++-- docs/example.rst | 8 +++++--- docs/index.rst | 4 +++- docs/query_result.rst | 4 ++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/docs/connector.rst b/docs/connector.rst index 40d4811..95f062c 100644 --- a/docs/connector.rst +++ b/docs/connector.rst @@ -1,5 +1,5 @@ -PostgresConnector -================= +Postgres Connector +================== A :class:`~sprockets_postgres.PostgresConnector` instance contains a cursor for a Postgres connection and methods to execute queries. diff --git a/docs/example.rst b/docs/example.rst index c6bbfe1..d3c95bd 100644 --- a/docs/example.rst +++ b/docs/example.rst @@ -3,14 +3,13 @@ Example Web Application The following code provides a simple example for using the .. code-block:: python - + import sprockets.http import sprockets_postgres as postgres from sprockets.http import app - class RequestHandler(postgres.RequestHandlerMixin, - web.RequestHandler): + class RequestHandler(postgres.RequestHandlerMixin, web.RequestHandler): GET_SQL = """\ SELECT foo_id, bar, baz, qux @@ -29,6 +28,9 @@ The following code provides a simple example for using the setup the pool to connect to PostgreSQL and will shutdown the connections cleanly when the application stops. + It should be used in conjunction with ``sprockets.http.app.Application`` + and not directly with ``tornado.web.Application``. + """ diff --git a/docs/index.rst b/docs/index.rst index 9a503c3..087f454 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -5,13 +5,15 @@ in :class:`Tornado ` / :class:`sprockets.http ` applications using :py:mod:`aiopg`. +Python versions supported: 3.7+ + Installation ------------ ``sprockets-postgres`` is available on the Python package index and is installable via pip: .. 
code:: bash - pip install sprockets-postgres + pip3 install sprockets-postgres Documentation diff --git a/docs/query_result.rst b/docs/query_result.rst index 623dcc2..de1cb4f 100644 --- a/docs/query_result.rst +++ b/docs/query_result.rst @@ -1,5 +1,5 @@ -QueryResult -=========== +Query Result +============ A :class:`~sprockets_postgres.QueryResult` instance is created for every query and contains the count of rows effected by the query and either the ``row`` as a :class:`dict` or ``rows`` as a list of :class:`dict`. For queries that do From fb816ddca78c8e5b8a40ff7c5767c63388317559 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Wed, 8 Apr 2020 18:23:42 -0400 Subject: [PATCH 17/19] Add CONTRIBUTING info --- CONTRIBUTING.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 CONTRIBUTING.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..c74dfbc --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,23 @@ +# Contributing + +To get setup in the environment and run the tests, take the following steps: + +```bash +python3 -m venv env +source env/bin/activate +pip install -e '.[testing]' + +bootstrap + +flake8 +coverage run && coverage report +``` + +## Test Coverage + +Pull requests that make changes or additions that are not covered by tests +will likely be closed without review. + +In addition, all tests must pass the tests **AND** flake8 linter. If flake8 +exceptions are included, the reasoning for adding the exception must be included +in the pull request. From d057b140ae935a4584d203c6eac5666c2c76b096 Mon Sep 17 00:00:00 2001 From: "Gavin M. 
Roy" Date: Wed, 8 Apr 2020 18:43:51 -0400 Subject: [PATCH 18/19] Add docstrings, fix edge cases --- setup.cfg | 1 - sprockets_postgres.py | 317 ++++++++++++++++++++++++++++++++++++++---- tests.py | 64 ++++++--- 3 files changed, 340 insertions(+), 42 deletions(-) diff --git a/setup.cfg b/setup.cfg index 629b3d0..5000020 100644 --- a/setup.cfg +++ b/setup.cfg @@ -72,6 +72,5 @@ output = build/coverage.xml [flake8] application-import-names = sprockets_postgres,tests exclude = build,docs,env -ignore = W503 import-order-style = pycharm rst-roles = attr,class,const,data,exc,func,meth,mod,obj,ref,yields diff --git a/sprockets_postgres.py b/sprockets_postgres.py index 9651750..6747e28 100644 --- a/sprockets_postgres.py +++ b/sprockets_postgres.py @@ -25,26 +25,63 @@ DEFAULT_POSTGRES_QUERY_TIMEOUT = 120 DEFAULT_POSTGRES_UUID = 'TRUE' QueryParameters = typing.Union[dict, list, tuple, None] +"""Type annotation for query parameters""" + Timeout = typing.Union[int, float, None] +"""Type annotation for timeout values""" @dataclasses.dataclass class QueryResult: + """A :func:`Data Class ` that is generated as a + result of each query that is executed. + + :param row_count: The quantity of rows impacted by the query + :param row: If a single row is returned, the data for that row + :param rows: If more than one row is returned, this attribute is set as the + list of rows, in order. + + """ row_count: int row: typing.Optional[dict] rows: typing.Optional[typing.List[dict]] class PostgresConnector: + """Wraps a :class:`aiopg.Cursor` instance for creating explicit + transactions, calling stored procedures, and executing queries. + Unless the :meth:`~sprockets_postgres.PostgresConnector.transaction` + asynchronous :ref:`context-manager ` is used, + each call to :meth:`~sprockets_postgres.PostgresConnector.callproc` and + :meth:`~sprockets_postgres.PostgresConnector.execute` is an explicit + transaction. + + .. 
note:: :class:`PostgresConnector` instances are created by + :meth:`ApplicationMixin.postgres_connector + ` and should + not be created directly. + + :param cursor: The cursor to use in the connector + :type cursor: aiopg.Cursor + :param on_error: The callback to invoke when an exception is caught + :param on_duration: The callback to invoke when a query is complete and all + of the data has been returned. + :param timeout: A timeout value in seconds for executing queries. If + unspecified, defaults to the ``POSTGRES_QUERY_TIMEOUT`` environment + variable and if that is not specified, to the + :const:`DEFAULT_POSTGRES_QUERY_TIMEOUT` value of ``120`` + :type timeout: :data:`~sprockets_postgres.Timeout` + + """ def __init__(self, cursor: aiopg.Cursor, on_error: typing.Callable, - record_duration: typing.Optional[typing.Callable] = None, + on_duration: typing.Optional[typing.Callable] = None, timeout: Timeout = None): self.cursor = cursor self._on_error = on_error - self._record_duration = record_duration + self._on_duration = on_duration self._timeout = timeout or int( os.environ.get( 'POSTGRES_QUERY_TIMEOUT', @@ -56,6 +93,27 @@ class PostgresConnector: metric_name: str = '', *, timeout: Timeout = None) -> QueryResult: + """Execute a stored procedure / function + + :param name: The stored procedure / function name to call + :param parameters: Query parameters to pass when calling + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. 
note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + + """ return await self._query( self.cursor.callproc, metric_name, @@ -69,6 +127,32 @@ class PostgresConnector: metric_name: str = '', *, timeout: Timeout = None) -> QueryResult: + """Execute a query, specifying a name for the query, the SQL statement, + and optional positional arguments to pass in with the query. + + Parameters may be provided as sequence or mapping and will be + bound to variables in the operation. Variables are specified + either with positional ``%s`` or named ``%({name})s`` placeholders. + + :param sql: The SQL statement to execute + :param parameters: Query parameters to pass as part of the execution + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + + """ return await self._query( self.cursor.execute, metric_name, @@ -79,6 +163,42 @@ class PostgresConnector: @contextlib.asynccontextmanager async def transaction(self) \ -> typing.AsyncContextManager['PostgresConnector']: + """asynchronous :ref:`context-manager ` + function that implements full ``BEGIN``, ``COMMIT``, and ``ROLLBACK`` + semantics. 
If there is a :exc:`psycopg2.Error` raised during the + transaction, the entire transaction will be rolled back. + + If no exception is raised, the transaction will be committed when + exiting the context manager. + + .. note:: This method is provided for edge case usage. As a + generalization + :meth:`sprockets_postgres.RequestHandlerMixin.postgres_transaction` + should be used instead. + + *Usage Example* + + .. code-block:: + + class RequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): + + async def post(self): + async with self.postgres_transaction() as transaction: + result1 = await transaction.execute(QUERY_ONE) + result2 = await transaction.execute(QUERY_TWO) + result3 = await transaction.execute(QUERY_THREE) + + :raises asyncio.TimeoutError: when there is a query or network timeout + when starting the transaction + :raises psycopg2.Error: when there is an exception raised by Postgres + when starting the transaction + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + """ async with self.cursor.begin(): yield self @@ -96,10 +216,11 @@ class PostgresConnector: if exc: raise exc else: - if self._record_duration: - self._record_duration( + results = await self._query_results() + if self._on_duration: + self._on_duration( metric_name, time.monotonic() - start_time) - return await self._query_results() + return results async def _query_results(self) -> QueryResult: count, row, rows = self.cursor.rowcount, None, None @@ -122,12 +243,13 @@ class ConnectionException(Exception): class ApplicationMixin: """ - :class:`sprockets.http.app.Application` mixin for handling the connection - to Postgres and exporting functions for querying the database, - getting the status, and proving a cursor. 
+ :class:`sprockets.http.app.Application` / :class:`tornado.web.Application` + mixin for handling the connection to Postgres and exporting functions for + querying the database, getting the status, and providing a cursor. - Automatically creates and shuts down :class:`aio.pool.Pool` on startup - and shutdown. + Automatically creates and shuts down :class:`aiopg.Pool` on startup + and shutdown by installing `on_start` and `shutdown` callbacks into the + :class:`~sprockets.http.app.Application` instance. """ POSTGRES_STATUS_TIMEOUT = 3 @@ -141,24 +263,77 @@ class ApplicationMixin: @contextlib.asynccontextmanager async def postgres_connector(self, on_error: typing.Callable, - record_duration: typing.Optional[ + on_duration: typing.Optional[ typing.Callable] = None, timeout: Timeout = None) \ -> typing.AsyncContextManager[PostgresConnector]: + """Asynchronous :ref:`context-manager ` + that returns a :class:`~sprockets_postgres.PostgresConnector` instance + from the connection pool with a cursor. + + .. note:: This function is designed to work in conjunction with the + :class:`~sprockets_postgres.RequestHandlerMixin` and is generally + not invoked directly. + + :param on_error: A callback function that is invoked on exception. If + an exception is returned from that function, it will raise it. + :param on_duration: An optional callback function that is invoked after + a query has completed to record the duration that encompasses + both executing the query and retrieving the returned records, if + any. + :param timeout: Used to override the default query timeout. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when the request to retrieve a connection + from the pool times out. + :raises sprockets_postgres.ConnectionException: when the application + can not connect to the configured Postgres instance. + :raises psycopg2.Error: when Postgres raises an exception during the + creation of the cursor. + + .. 
note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + """ try: async with self._postgres_pool.acquire() as conn: async with conn.cursor( cursor_factory=extras.RealDictCursor, timeout=timeout) as cursor: yield PostgresConnector( - cursor, on_error, record_duration, timeout) + cursor, on_error, on_duration, timeout) except (asyncio.TimeoutError, psycopg2.Error) as err: - on_error('postgres_connector', ConnectionException(str(err))) + exc = on_error('postgres_connector', ConnectionException(str(err))) + if exc: + raise exc + else: # postgres_status.on_error does not return an exception + yield None async def postgres_status(self) -> dict: """Invoke from the ``/status`` RequestHandler to check that there is - a Postgres connection handler available and return info about the - pool. + a Postgres connection handler available and return info about the + pool. + + The ``available`` item in the dictionary indicates that the + application was able to perform a ``SELECT 1`` against the database + using a :class:`~sprockets_postgres.PostgresConnector` instance. + + The ``pool_size`` item indicates the current quantity of open + connections to Postgres. + + The ``pool_free`` item indicates the current number of idle + connections available to process queries. + + *Example return value* + + .. code-block:: python + + { + 'available': True, + 'pool_size': 10, + 'pool_free': 8 + } """ query_error = asyncio.Event() @@ -170,7 +345,9 @@ class ApplicationMixin: async with self.postgres_connector( on_error, timeout=self.POSTGRES_STATUS_TIMEOUT) as connector: - await connector.execute('SELECT 1') + if connector: + await connector.execute('SELECT 1') + return { 'available': not query_error.is_set(), 'pool_size': self._postgres_pool.size, @@ -229,15 +406,17 @@ class ApplicationMixin: This is invoked by the Application shutdown callback mechanism. 
""" - self._postgres_pool.close() - await self._postgres_pool.wait_closed() + if self._postgres_pool is not None: + self._postgres_pool.close() + await self._postgres_pool.wait_closed() class RequestHandlerMixin: """ - RequestHandler mixin class exposing functions for querying the database, - recording the duration to either `sprockets-influxdb` or - `sprockets.mixins.metrics`, and handling exceptions. + A RequestHandler mixin class exposing functions for querying the database, + recording the duration to either :mod:`sprockets-influxdb + ` or :mod:`sprockets.mixins.metrics`, and + handling exceptions. """ async def postgres_callproc(self, @@ -246,6 +425,27 @@ class RequestHandlerMixin: metric_name: str = '', *, timeout: Timeout = None) -> QueryResult: + """Execute a stored procedure / function + + :param name: The stored procedure / function name to call + :param parameters: Query parameters to pass when calling + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + + """ async with self.application.postgres_connector( self._on_postgres_error, self._on_postgres_timing, @@ -266,6 +466,24 @@ class RequestHandlerMixin: bound to variables in the operation. Variables are specified either with positional ``%s`` or named ``%({name})s`` placeholders. 
+ :param sql: The SQL statement to execute + :param parameters: Query parameters to pass as part of the execution + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + """ async with self.application.postgres_connector( self._on_postgres_error, @@ -277,8 +495,41 @@ class RequestHandlerMixin: @contextlib.asynccontextmanager async def postgres_transaction(self, timeout: Timeout = None) \ -> typing.AsyncContextManager[PostgresConnector]: - """Yields a :class:`PostgresConnector` instance in a transaction. - Will automatically commit or rollback based upon exception. + """asynchronous :ref:`context-manager ` + function that implements full ``BEGIN``, ``COMMIT``, and ``ROLLBACK`` + semantics. If there is a :exc:`psycopg2.Error` raised during the + transaction, the entire transaction will be rolled back. + + If no exception is raised, the transaction will be committed when + exiting the context manager. + + *Usage Example* + + .. 
code-block:: python + + class RequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): + + async def post(self): + async with self.postgres_transaction() as transaction: + result1 = await transaction.execute(QUERY_ONE) + result2 = await transaction.execute(QUERY_TWO) + result3 = await transaction.execute(QUERY_THREE) + + + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + when starting the transaction + :raises psycopg2.Error: when there is an exception raised by Postgres + when starting the transaction + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. """ async with self.application.postgres_connector( @@ -291,7 +542,12 @@ class RequestHandlerMixin: def _on_postgres_error(self, metric_name: str, exc: Exception) -> typing.Optional[Exception]: - """Override for different error handling behaviors""" + """Override for different error handling behaviors + + Return an exception if you would like for it to be raised, or swallow + it here. + + """ LOGGER.error('%s in %s for %s (%s)', exc.__class__.__name__, self.__class__.__name__, metric_name, str(exc).split('\n')[0]) @@ -308,7 +564,18 @@ class RequestHandlerMixin: def _on_postgres_timing(self, metric_name: str, duration: float) -> None: - """Override for custom metric recording""" + """Override for custom metric recording. As a default behavior it will + attempt to detect `sprockets-influxdb + `_ and + `sprockets.mixins.metrics + `_ and + record the metrics using them if they are available. If they are not + available, it will record the query duration to the `DEBUG` log. 
+ + :param metric_name: The name of the metric to record + :param duration: The duration to record for the metric + + """ if hasattr(self, 'influxdb'): # sprockets-influxdb self.influxdb.set_field(metric_name, duration) elif hasattr(self, 'record_timing'): # sprockets.mixins.metrics diff --git a/tests.py b/tests.py index f676824..67d0402 100644 --- a/tests.py +++ b/tests.py @@ -2,13 +2,14 @@ import asyncio import json import os import typing +import unittest import uuid from unittest import mock import psycopg2 from psycopg2 import errors from sprockets.http import app, testing -from tornado import web +from tornado import ioloop, web import sprockets_postgres @@ -63,6 +64,16 @@ class ErrorRequestHandler(RequestHandler): return RuntimeError() +class ErrorPassthroughRequestHandler(RequestHandler): + + async def get(self): + exc = self._on_postgres_error('test', RuntimeError()) + if isinstance(exc, RuntimeError): + self.set_status(204) + else: + raise web.HTTPError(500, 'Did not pass through') + + class ExecuteRequestHandler(RequestHandler): GET_SQL = 'SELECT %s::TEXT AS value;' @@ -118,6 +129,14 @@ class MultiRowRequestHandler(RequestHandler): 'rows': self.cast_data(result.rows)}) +class NoErrorRequestHandler(ErrorRequestHandler): + + def _on_postgres_error(self, + metric_name: str, + exc: Exception) -> typing.Optional[Exception]: + return None + + class NoRowRequestHandler(RequestHandler): GET_SQL = """\ @@ -210,10 +229,12 @@ class TestCase(testing.SprocketsHttpTestCase): web.url('/callproc', CallprocRequestHandler), web.url('/count', CountRequestHandler), web.url('/error', ErrorRequestHandler), + web.url('/error-passthrough', ErrorPassthroughRequestHandler), web.url('/execute', ExecuteRequestHandler), web.url('/influxdb', InfluxDBRequestHandler), web.url('/metrics-mixin', MetricsMixinRequestHandler), web.url('/multi-row', MultiRowRequestHandler), + web.url('/no-error', NoErrorRequestHandler), web.url('/no-row', NoRowRequestHandler), web.url('/status', 
StatusRequestHandler), web.url('/transaction', TransactionRequestHandler), @@ -231,6 +252,13 @@ class RequestHandlerMixinTestCase(TestCase): self.assertGreaterEqual(data['pool_size'], 1) self.assertGreaterEqual(data['pool_free'], 1) + @mock.patch('aiopg.pool.Pool.acquire') + def test_postgres_status_connect_error(self, acquire): + acquire.side_effect = asyncio.TimeoutError() + response = self.fetch('/status') + self.assertEqual(response.code, 503) + self.assertFalse(json.loads(response.body)['available']) + @mock.patch('aiopg.cursor.Cursor.execute') def test_postgres_status_error(self, execute): execute.side_effect = asyncio.TimeoutError() @@ -245,12 +273,23 @@ class RequestHandlerMixinTestCase(TestCase): uuid.UUID(json.loads(response.body)['value']), uuid.UUID) @mock.patch('aiopg.cursor.Cursor.execute') - def test_postgres_error_passthrough(self, execute): + def test_postgres_error(self, execute): execute.side_effect = asyncio.TimeoutError response = self.fetch('/error') self.assertEqual(response.code, 500) self.assertIn(b'Internal Server Error', response.body) + @mock.patch('aiopg.pool.Pool.acquire') + def test_postgres_error_on_connect(self, acquire): + acquire.side_effect = asyncio.TimeoutError + response = self.fetch('/error') + self.assertEqual(response.code, 500) + self.assertIn(b'Internal Server Error', response.body) + + def test_postgres_error_passthrough(self): + response = self.fetch('/error-passthrough') + self.assertEqual(response.code, 204) + def test_postgres_execute(self): expectation = str(uuid.uuid4()) response = self.fetch('/execute?value={}'.format(expectation)) @@ -369,8 +408,7 @@ class TransactionTestCase(TestCase): self.assertEqual(record['last_updated_at'], last_updated) -""" -class MissingURLTestCase(testing.SprocketsHttpTestCase): +class MissingURLTestCase(unittest.TestCase): @classmethod def setUpClass(cls): @@ -384,16 +422,10 @@ class MissingURLTestCase(testing.SprocketsHttpTestCase): if 'POSTGRES_URL' in os.environ: del 
os.environ['POSTGRES_URL'] - def setUp(self): - self.stop_mock = None - super().setUp() - - def get_app(self): - self.app = Application() - self.stop_mock = mock.Mock( - wraps=self.app.stop, side_effect=RuntimeError) - return self.app - def test_that_stop_is_invoked(self): - self.stop_mock.assert_called_once_with(self.io_loop) -""" + io_loop = ioloop.IOLoop.current() + obj = Application() + obj.stop = mock.Mock(wraps=obj.stop) + obj.start(io_loop) + io_loop.start() + obj.stop.assert_called_once() From a7b15254f4a65a20a2a6cfc9403040cc17cbe867 Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Wed, 8 Apr 2020 18:48:10 -0400 Subject: [PATCH 19/19] Bump the version --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 1628379..3eefcb9 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.0a1 +1.0.0