diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..b5d8093 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,21 @@ +# top-most EditorConfig file +root = true + +# Unix-style newlines with a newline ending every file +[*] +end_of_line = lf +insert_final_newline = true + +# 4 space indentation +[*.py] +indent_style = space +indent_size = 4 + +# 2 space indentation +[*.yml] +indent_style = space +indent_size = 2 + +[bootstrap] +indent_style = space +indent_size = 2 diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml new file mode 100644 index 0000000..09c97f1 --- /dev/null +++ b/.github/workflows/deploy.yaml @@ -0,0 +1,20 @@ +name: Deployment +on: + push: + branches-ignore: ["*"] + tags: ["*"] +jobs: + deploy: + runs-on: ubuntu-latest + if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags') && github.repository == 'sprockets/sprockets-postgres' + container: python:3.8-alpine + steps: + - name: Checkout repository + uses: actions/checkout@v1 + - name: Build package + run: python3 setup.py sdist + - name: Publish package + uses: pypa/gh-action-pypi-publish@master + with: + user: __token__ + password: ${{ secrets.PYPI_PASSWORD }} diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml new file mode 100644 index 0000000..93a17bd --- /dev/null +++ b/.github/workflows/testing.yaml @@ -0,0 +1,69 @@ +name: Testing +on: + push: + branches: ["*"] + paths-ignore: + - 'docs/**' + - 'setup.*' + - '*.md' + - '*.rst' + tags-ignore: ["*"] +jobs: + test: + runs-on: ubuntu-latest + timeout-minutes: 3 + services: + postgres: + image: postgres:12-alpine + options: >- + --health-cmd "pg_isready" + --health-interval 10s + --health-timeout 10s + --health-retries 5 + ports: + - 5432 + env: + POSTGRES_HOST_AUTH_METHOD: trust + strategy: + matrix: + python: [3.7, 3.8] + container: + image: python:${{ matrix.python }}-alpine + steps: + - name: Checkout repository + uses: actions/checkout@v1 + + - name: Install OS dependencies + run: apk --update add gcc make musl-dev linux-headers postgresql-dev + + - name: Install testing dependencies + run: pip3 --no-cache-dir install -e '.[testing]' + + - name: Create build directory + run: mkdir build + + - name: Create build/test-environment + run: echo "export POSTGRES_URL=postgresql://postgres@postgres:5432/postgres" > build/test-environment + + - name: Install UUID extension + run: | + apk add postgresql-client \ + && psql -q -h postgres -U postgres -d postgres -f fixtures/testing.sql + + - name: Run flake8 tests + run: flake8 + + - name: Run tests + run: coverage run + + - name: Output coverage + run: coverage report && coverage xml + + - name: Upload Coverage + uses: codecov/codecov-action@v1.0.2 + if: github.event_name == 'push' && github.repository == 'sprockets/sprockets-postgres' + with: + token: ${{secrets.CODECOV_TOKEN}} + file: build/coverage.xml + flags: unittests + fail_ci_if_error: true diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..c74dfbc --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,23 @@ +# Contributing + +To get setup in the environment and run the tests, take the following steps: + +```bash +python3 -m venv env +source env/bin/activate +pip install -e '.[testing]' + +bootstrap + +flake8 +coverage run && coverage report +``` + +## Test Coverage + +Pull requests that make changes or additions that are not covered by tests +will likely be closed without review. + +In addition, all tests must pass the tests **AND** flake8 linter. If flake8 +exceptions are included, the reasoning for adding the exception must be included +in the pull request. diff --git a/LICENSE b/LICENSE index 3a7e87f..8c1fb0a 100644 --- a/LICENSE +++ b/LICENSE @@ -1,29 +1,25 @@ -BSD 3-Clause License - -Copyright (c) 2020, Sprockets +Copyright (c) 2020 AWeber Communications All rights reserved. -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this + * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, + * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + * Neither the name of the copyright holder nor the names of its contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. -3. Neither the name of the copyright holder nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..9340542 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,3 @@ +include README.rst +include LICENSE +include VERSION diff --git a/README.md b/README.md deleted file mode 100644 index f194828..0000000 --- a/README.md +++ /dev/null @@ -1 +0,0 @@ -# sprockets-postgres \ No newline at end of file diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..8d92f5a --- /dev/null +++ b/README.rst @@ -0,0 +1,67 @@ +Sprockets Postgres +================== +An set of mixins and classes for interacting with PostgreSQL using asyncio in +Tornado / sprockets.http applications using aiopg. + +|Version| |Status| |Coverage| |License| + +Installation +------------ +``sprockets-postgres`` is available on the Python package index and is installable via pip: + +.. code:: bash + + pip install sprockets-postgres + +Documentation +------------- +Documentation is available at `sprockets-postgres.readthedocs.io `_. + +Configuration +------------- +The following table details the environment variable configuration options: + ++---------------------------------+--------------------------------------------------+-------------------+ +| Variable | Definition | Default | ++=================================+==================================================+===================+ +| ``POSTGRES_URL`` | The PostgreSQL URL to connect to | | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_MAX_POOL_SIZE`` | Maximum connection count to Postgres per backend | ``0`` (Unlimited) | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_MIN_POOL_SIZE`` | Minimum or starting pool size. | ``1`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_CONNECTION_TIMEOUT`` | The maximum time in seconds to spend attempting | ``10`` | +| | to create a new connection. | | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_CONNECTION_TTL`` | Time-to-life in seconds for a pooled connection. | ``300`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_QUERY_TIMEOUT`` | Maximum execution time for a query in seconds. | ``60`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_HSTORE`` | Enable HSTORE support in the client. | ``FALSE`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_JSON`` | Enable JSON support in the client. | ``FALSE`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_UUID`` | Enable UUID support in the client. | ``TRUE`` | ++---------------------------------+--------------------------------------------------+-------------------+ + +Requirements +------------ +- `aiopg `_ +- `sprockets.http `_ +- `Tornado `_ + +Version History +--------------- +Available at https://sprockets-postgres.readthedocs.org/en/latest/history.html + +.. |Version| image:: https://img.shields.io/pypi/v/sprockets-postgres.svg? + :target: http://badge.fury.io/py/sprockets-postgres + +.. |Status| image:: https://img.shields.io/travis/sprockets/sprockets-postgres.svg? + :target: https://travis-ci.org/sprockets/sprockets-postgres + +.. |Coverage| image:: https://img.shields.io/codecov/c/github/sprockets/sprockets-postgres.svg? + :target: https://codecov.io/github/sprockets/sprockets-postgres?branch=master + +.. |License| image:: https://img.shields.io/pypi/l/sprockets-postgres.svg? + :target: https://sprockets-postgres.readthedocs.org diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..3eefcb9 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +1.0.0 diff --git a/bootstrap b/bootstrap new file mode 100755 index 0000000..735a3fc --- /dev/null +++ b/bootstrap @@ -0,0 +1,77 @@ +#!/usr/bin/env sh +set -e + +# Common constants +COLOR_RESET='\033[0m' +COLOR_GREEN='\033[0;32m' +COMPOSE_PROJECT_NAME="${COMPOSE_PROJECT_NAME:-${PWD##*/}}" +TEST_HOST="${TEST_HOST:-localhost}" + +echo "Integration test host: ${TEST_HOST}" + +get_exposed_port() { + if [ -z $3 ] + then + docker-compose port $1 $2 | cut -d: -f2 + else + docker-compose port --index=$3 $1 $2 | cut -d: -f2 + fi +} + +report_start() { + printf "Waiting for $1 ... " +} + +report_done() { + printf "${COLOR_GREEN}done${COLOR_RESET}\n" +} + +wait_for_healthy_containers() { + IDs=$(docker-compose ps -q | paste -sd " " -) + report_start "${1} containers to report healthy" + counter="0" + while true + do + if [ "$(docker inspect -f "{{.State.Health.Status}}" ${IDs} | grep -c healthy)" -eq "${1}" ]; then + break + fi + counter=$((++counter)) + if [ "${counter}" -eq 120 ]; then + echo " ERROR: containers failed to start" + exit 1 + fi + sleep 1 + done + report_done +} + +# Ensure Docker is Running +echo "Docker Information:" +echo "" +docker version +echo "" + +# Activate the virtual environment +if test -e env/bin/activate +then + . ./env/bin/activate +fi + +mkdir -p build + +# Stop any running instances and clean up after them, then pull images +docker-compose down --volumes --remove-orphans +docker-compose up -d --quiet-pull + +wait_for_healthy_containers 1 + +printf "Loading fixture data ... " +docker-compose exec postgres psql -q -o /dev/null -U postgres -d postgres -f /fixtures/testing.sql +report_done + +cat > build/test-environment<` is configured via environment variables. The following table +details the configuration options and their defaults. + ++---------------------------------+--------------------------------------------------+-------------------+ +| Variable | Definition | Default | ++=================================+==================================================+===================+ +| ``POSTGRES_URL`` | The PostgreSQL URL to connect to | | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_MAX_POOL_SIZE`` | Maximum connection count to Postgres per backend | ``0`` (Unlimited) | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_MIN_POOL_SIZE`` | Minimum or starting pool size. | ``1`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_CONNECTION_TIMEOUT`` | The maximum time in seconds to spend attempting | ``10`` | +| | to create a new connection. | | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_CONNECTION_TTL`` | Time-to-life in seconds for a pooled connection. | ``300`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_QUERY_TIMEOUT`` | Maximum execution time for a query in seconds. | ``60`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_HSTORE`` | Enable HSTORE support in the client. | ``FALSE`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_JSON`` | Enable JSON support in the client. | ``FALSE`` | ++---------------------------------+--------------------------------------------------+-------------------+ +| ``POSTGRES_UUID`` | Enable UUID support in the client. | ``TRUE`` | ++---------------------------------+--------------------------------------------------+-------------------+ diff --git a/docs/connector.rst b/docs/connector.rst new file mode 100644 index 0000000..95f062c --- /dev/null +++ b/docs/connector.rst @@ -0,0 +1,9 @@ +Postgres Connector +================== +A :class:`~sprockets_postgres.PostgresConnector` instance contains a cursor for +a Postgres connection and methods to execute queries. + +.. autoclass:: sprockets_postgres.PostgresConnector + :members: + :undoc-members: + :member-order: bysource diff --git a/docs/example.rst b/docs/example.rst new file mode 100644 index 0000000..d3c95bd --- /dev/null +++ b/docs/example.rst @@ -0,0 +1,44 @@ +Example Web Application +======================= +The following code provides a simple example for using the + +.. code-block:: python + + import sprockets.http + import sprockets_postgres as postgres + from sprockets.http import app + + + class RequestHandler(postgres.RequestHandlerMixin, web.RequestHandler): + + GET_SQL = """\ + SELECT foo_id, bar, baz, qux + FROM public.foo + WHERE foo_id = %(foo_id)s;""" + + async def get(self, foo_id: str) -> None: + result = await self.postgres_execute(self.GET_SQL, {'foo_id': foo_id}) + await self.finish(result.row) + + + class Application(postgres.ApplicationMixin, app.Application): + """ + The ``ApplicationMixin`` provides the foundation for the + ``RequestHandlerMixin`` to properly function and will automatically + setup the pool to connect to PostgreSQL and will shutdown the connections + cleanly when the application stops. + + It should be used in conjunction with ``sprockets.http.app.Application`` + and not directly with ``tornado.web.Application``. + + """ + + + def make_app(**settings): + return Application([ + web.url(r'/foo/(?P.*)', FooRequestHandler) + ], **settings) + + + if __name__ == '__main__': + sprockets.http.run(make_app) diff --git a/docs/exceptions.rst b/docs/exceptions.rst new file mode 100644 index 0000000..e3d9324 --- /dev/null +++ b/docs/exceptions.rst @@ -0,0 +1,4 @@ +Exceptions +========== + +.. autoclass:: sprockets_postgres.ConnectionException diff --git a/docs/genindex.rst b/docs/genindex.rst new file mode 100644 index 0000000..9e530fa --- /dev/null +++ b/docs/genindex.rst @@ -0,0 +1,2 @@ +Index +===== diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..087f454 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,33 @@ +Sprockets Postgres +================== +An set of mixins and classes for interacting with PostgreSQL using :mod:`asyncio` +in :class:`Tornado ` / +:class:`sprockets.http ` applications using +:py:mod:`aiopg`. + +Python versions supported: 3.7+ + +Installation +------------ +``sprockets-postgres`` is available on the Python package index and is installable via pip: + +.. code:: bash + + pip3 install sprockets-postgres + + +Documentation +------------- + +.. toctree:: + :maxdepth: 1 + + configuration + application + request_handler + query_result + connector + typing + exceptions + example + genindex diff --git a/docs/query_result.rst b/docs/query_result.rst new file mode 100644 index 0000000..de1cb4f --- /dev/null +++ b/docs/query_result.rst @@ -0,0 +1,9 @@ +Query Result +============ +A :class:`~sprockets_postgres.QueryResult` instance is created for every query +and contains the count of rows effected by the query and either the ``row`` as +a :class:`dict` or ``rows`` as a list of :class:`dict`. For queries that do +not return any data, both ``row`` and ``rows`` will be :const:`None`. + +.. autoclass:: sprockets_postgres.QueryResult + :members: diff --git a/docs/request_handler.rst b/docs/request_handler.rst new file mode 100644 index 0000000..4d9f19c --- /dev/null +++ b/docs/request_handler.rst @@ -0,0 +1,11 @@ +RequestHandler Mixin +==================== +The :class:`~sprockets_postgres.RequestHandlerMixin` is a Tornado +:class:`tornado.web.RequestHandler` mixin that provides easy to use +functionality for interacting with PostgreSQL. + +.. autoclass:: sprockets_postgres.RequestHandlerMixin + :members: + :undoc-members: + :private-members: + :member-order: bysource diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..7a7ed02 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,5 @@ +Sphinx==2.4.4 +sphinx-autodoc-typehints +sphinx_rtd_theme +typed_ast +typing_extensions diff --git a/docs/typing.rst b/docs/typing.rst new file mode 100644 index 0000000..0765676 --- /dev/null +++ b/docs/typing.rst @@ -0,0 +1,6 @@ +Type Annotations +================ + +.. autodata:: sprockets_postgres.QueryParameters + +.. autodata:: sprockets_postgres.Timeout diff --git a/fixtures/testing.sql b/fixtures/testing.sql new file mode 100644 index 0000000..b6b3ce0 --- /dev/null +++ b/fixtures/testing.sql @@ -0,0 +1,30 @@ +CREATE EXTENSION "uuid-ossp"; + +CREATE TABLE public.test ( + id UUID NOT NULL PRIMARY KEY, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_modified_at TIMESTAMP WITH TIME ZONE, + value TEXT NOT NULL +); + +CREATE TABLE public.query_count( + key TEXT NOT NULL PRIMARY KEY, + last_updated_at TIMESTAMP WITH TIME ZONE, + count INTEGER +); + +INSERT INTO public.query_count (key, last_updated_at, count) + VALUES ('test', CURRENT_TIMESTAMP, 0); + +CREATE TABLE public.test_rows ( + id INTEGER NOT NULL PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_modified_at TIMESTAMP WITH TIME ZONE, + toggle BOOLEAN NOT NULL +); + +INSERT INTO public.test_rows (toggle) VALUES (FALSE); +INSERT INTO public.test_rows (toggle) VALUES (FALSE); +INSERT INTO public.test_rows (toggle) VALUES (FALSE); +INSERT INTO public.test_rows (toggle) VALUES (FALSE); +INSERT INTO public.test_rows (toggle) VALUES (FALSE); diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..5000020 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,76 @@ +[metadata] +name = sprockets-postgres +version = file: VERSION +description = An asynchronous Postgres client and mixin for Tornado applications +long_description = file: README.rst +long_description_content_type = text/x-rst; charset=UTF-8 +license = BSD 3-Clause License +license-file = LICENSE +home-page = https://github.com/sprockets/sprockets-postgres +project_urls = + Bug Tracker = https://github.com/sprockets/sprockets-postgres/issues + Documentation = https://sprockets-postgres.readthedocs.io + Source Code = https://github.com/sprockets/sprockets-postgres/ +classifiers = + Development Status :: 3 - Alpha + Intended Audience :: Developers + License :: OSI Approved :: BSD License + Natural Language :: English + Operating System :: OS Independent + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Topic :: Communications + Topic :: Internet + Topic :: Software Development + Typing :: Typed +requires-dist = setuptools +keywords = + postgres + python3 + tornado + +[options] +include_package_data = True +install_requires = + aiopg>=1.0.0,<2 + sprockets.http>=2.1.1,<3 + tornado>=6,<7 +pymodules = + sprockets_postgres +zip_safe = true + +[options.extras_require] +testing = + coverage + flake8 + flake8-comprehensions + flake8-deprecated + flake8-import-order + flake8-print + flake8-quotes + flake8-rst-docstrings + flake8-tuple + pygments + +[coverage:run] +branch = True +command_line = -m unittest discover tests --verbose +data_file = build/.coverage + +[coverage:report] +show_missing = True +include = + sprockets_postgres.py + +[coverage:html] +directory = build/coverage + +[coverage:xml] +output = build/coverage.xml + +[flake8] +application-import-names = sprockets_postgres,tests +exclude = build,docs,env +import-order-style = pycharm +rst-roles = attr,class,const,data,exc,func,meth,mod,obj,ref,yields diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..b908cbe --- /dev/null +++ b/setup.py @@ -0,0 +1,3 @@ +import setuptools + +setuptools.setup() diff --git a/sprockets_postgres.py b/sprockets_postgres.py new file mode 100644 index 0000000..6747e28 --- /dev/null +++ b/sprockets_postgres.py @@ -0,0 +1,585 @@ +import asyncio +import contextlib +import dataclasses +import logging +import os +import time +import typing +from distutils import util + +import aiopg +import psycopg2 +from aiopg import pool +from psycopg2 import errors, extras +from tornado import ioloop, web + +LOGGER = logging.getLogger('sprockets-postgres') + +DEFAULT_POSTGRES_CONNECTION_TIMEOUT = 10 +DEFAULT_POSTGRES_CONNECTION_TTL = 300 +DEFAULT_POSTGRES_HSTORE = 'FALSE' +DEFAULT_POSTGRES_JSON = 'FALSE' +DEFAULT_POSTGRES_MAX_POOL_SIZE = 0 +DEFAULT_POSTGRES_MIN_POOL_SIZE = 1 +DEFAULT_POSTGRES_QUERY_TIMEOUT = 120 +DEFAULT_POSTGRES_UUID = 'TRUE' + +QueryParameters = typing.Union[dict, list, tuple, None] +"""Type annotation for query parameters""" + +Timeout = typing.Union[int, float, None] +"""Type annotation for timeout values""" + + +@dataclasses.dataclass +class QueryResult: + """A :func:`Data Class ` that is generated as a + result of each query that is executed. + + :param row_count: The quantity of rows impacted by the query + :param row: If a single row is returned, the data for that row + :param rows: If more than one row is returned, this attribute is set as the + list of rows, in order. + + """ + row_count: int + row: typing.Optional[dict] + rows: typing.Optional[typing.List[dict]] + + +class PostgresConnector: + """Wraps a :class:`aiopg.Cursor` instance for creating explicit + transactions, calling stored procedures, and executing queries. + + Unless the :meth:`~sprockets_postgres.PostgresConnector.transaction` + asynchronous :ref:`context-manager ` is used, + each call to :meth:`~sprockets_postgres.PostgresConnector.callproc` and + :meth:`~sprockets_postgres.PostgresConnector.execute` is an explicit + transaction. + + .. note:: :class:`PostgresConnector` instances are created by + :meth:`ApplicationMixin.postgres_connector + ` and should + not be created directly. + + :param cursor: The cursor to use in the connector + :type cursor: aiopg.Cursor + :param on_error: The callback to invoke when an exception is caught + :param on_duration: The callback to invoke when a query is complete and all + of the data has been returned. + :param timeout: A timeout value in seconds for executing queries. If + unspecified, defaults to the ``POSTGRES_QUERY_TIMEOUT`` environment + variable and if that is not specified, to the + :const:`DEFAULT_POSTGRES_QUERY_TIMEOUT` value of ``120`` + :type timeout: :data:`~sprockets_postgres.Timeout` + + """ + def __init__(self, + cursor: aiopg.Cursor, + on_error: typing.Callable, + on_duration: typing.Optional[typing.Callable] = None, + timeout: Timeout = None): + self.cursor = cursor + self._on_error = on_error + self._on_duration = on_duration + self._timeout = timeout or int( + os.environ.get( + 'POSTGRES_QUERY_TIMEOUT', + DEFAULT_POSTGRES_QUERY_TIMEOUT)) + + async def callproc(self, + name: str, + parameters: QueryParameters = None, + metric_name: str = '', + *, + timeout: Timeout = None) -> QueryResult: + """Execute a stored procedure / function + + :param name: The stored procedure / function name to call + :param parameters: Query parameters to pass when calling + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + + """ + return await self._query( + self.cursor.callproc, + metric_name, + procname=name, + parameters=parameters, + timeout=timeout) + + async def execute(self, + sql: str, + parameters: QueryParameters = None, + metric_name: str = '', + *, + timeout: Timeout = None) -> QueryResult: + """Execute a query, specifying a name for the query, the SQL statement, + and optional positional arguments to pass in with the query. + + Parameters may be provided as sequence or mapping and will be + bound to variables in the operation. Variables are specified + either with positional ``%s`` or named ``%({name})s`` placeholders. + + :param sql: The SQL statement to execute + :param parameters: Query parameters to pass as part of the execution + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + + """ + return await self._query( + self.cursor.execute, + metric_name, + operation=sql, + parameters=parameters, + timeout=timeout) + + @contextlib.asynccontextmanager + async def transaction(self) \ + -> typing.AsyncContextManager['PostgresConnector']: + """asynchronous :ref:`context-manager ` + function that implements full ``BEGIN``, ``COMMIT``, and ``ROLLBACK`` + semantics. If there is a :exc:`psycopg2.Error` raised during the + transaction, the entire transaction will be rolled back. + + If no exception is raised, the transaction will be committed when + exiting the context manager. + + .. note:: This method is provided for edge case usage. As a + generalization + :meth:`sprockets_postgres.RequestHandlerMixin.postgres_transaction` + should be used instead. + + *Usage Example* + + .. code-block:: + + class RequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): + + async def post(self): + async with self.postgres_transaction() as transaction: + result1 = await transaction.execute(QUERY_ONE) + result2 = await transaction.execute(QUERY_TWO) + result3 = await transaction.execute(QUERY_THREE) + + :raises asyncio.TimeoutError: when there is a query or network timeout + when starting the transaction + :raises psycopg2.Error: when there is an exception raised by Postgres + when starting the transaction + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + """ + async with self.cursor.begin(): + yield self + + async def _query(self, + method: typing.Callable, + metric_name: str, + **kwargs): + if kwargs['timeout'] is None: + kwargs['timeout'] = self._timeout + start_time = time.monotonic() + try: + await method(**kwargs) + except (asyncio.TimeoutError, psycopg2.Error) as err: + exc = self._on_error(metric_name, err) + if exc: + raise exc + else: + results = await self._query_results() + if self._on_duration: + self._on_duration( + metric_name, time.monotonic() - start_time) + return results + + async def _query_results(self) -> QueryResult: + count, row, rows = self.cursor.rowcount, None, None + if self.cursor.rowcount == 1: + try: + row = dict(await self.cursor.fetchone()) + except psycopg2.ProgrammingError: + pass + elif self.cursor.rowcount > 1: + try: + rows = [dict(row) for row in await self.cursor.fetchall()] + except psycopg2.ProgrammingError: + pass + return QueryResult(count, row, rows) + + +class ConnectionException(Exception): + """Raised when the connection to Postgres can not be established""" + + +class ApplicationMixin: + """ + :class:`sprockets.http.app.Application` / :class:`tornado.web.Application` + mixin for handling the connection to Postgres and exporting functions for + querying the database, getting the status, and proving a cursor. + + Automatically creates and shuts down :class:`aiopg.Pool` on startup + and shutdown by installing `on_start` and `shutdown` callbacks into the + :class:`~sprockets.http.app.Application` instance. + + """ + POSTGRES_STATUS_TIMEOUT = 3 + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._postgres_pool: typing.Optional[pool.Pool] = None + self.runner_callbacks['on_start'].append(self._postgres_setup) + self.runner_callbacks['shutdown'].append(self._postgres_shutdown) + + @contextlib.asynccontextmanager + async def postgres_connector(self, + on_error: typing.Callable, + on_duration: typing.Optional[ + typing.Callable] = None, + timeout: Timeout = None) \ + -> typing.AsyncContextManager[PostgresConnector]: + """Asynchronous :ref:`context-manager ` + that returns a :class:`~sprockets_postgres.PostgresConnector` instance + from the connection pool with a cursor. + + .. note:: This function is designed to work in conjunction with the + :class:`~sprockets_postgres.RequestHandlerMixin` and is generally + not invoked directly. + + :param on_error: A callback function that is invoked on exception. If + an exception is returned from that function, it will raise it. + :param on_duration: An optional callback function that is invoked after + a query has completed to record the duration that encompasses + both executing the query and retrieving the returned records, if + any. + :param timeout: Used to override the default query timeout. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when the request to retrieve a connection + from the pool times out. + :raises sprockets_postgres.ConnectionException: when the application + can not connect to the configured Postgres instance. + :raises psycopg2.Error: when Postgres raises an exception during the + creation of the cursor. + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + """ + try: + async with self._postgres_pool.acquire() as conn: + async with conn.cursor( + cursor_factory=extras.RealDictCursor, + timeout=timeout) as cursor: + yield PostgresConnector( + cursor, on_error, on_duration, timeout) + except (asyncio.TimeoutError, psycopg2.Error) as err: + exc = on_error('postgres_connector', ConnectionException(str(err))) + if exc: + raise exc + else: # postgres_status.on_error does not return an exception + yield None + + async def postgres_status(self) -> dict: + """Invoke from the ``/status`` RequestHandler to check that there is + a Postgres connection handler available and return info about the + pool. + + The ``available`` item in the dictionary indicates that the + application was able to perform a ``SELECT 1`` against the database + using a :class:`~sprockets_postgres.PostgresConnector` instance. + + The ``pool_size`` item indicates the current quantity of open + connections to Postgres. + + The ``pool_free`` item indicates the current number of idle + connections available to process queries. + + *Example return value* + + .. code-block:: python + + { + 'available': True, + 'pool_size': 10, + 'pool_free': 8 + } + + """ + query_error = asyncio.Event() + + def on_error(_metric_name, _exc) -> None: + query_error.set() + return None + + async with self.postgres_connector( + on_error, + timeout=self.POSTGRES_STATUS_TIMEOUT) as connector: + if connector: + await connector.execute('SELECT 1') + + return { + 'available': not query_error.is_set(), + 'pool_size': self._postgres_pool.size, + 'pool_free': self._postgres_pool.freesize + } + + async def _postgres_setup(self, + _app: web.Application, + loop: ioloop.IOLoop) -> None: + """Setup the Postgres pool of connections and log if there is an error. + + This is invoked by the Application on start callback mechanism. + + """ + if 'POSTGRES_URL' not in os.environ: + LOGGER.critical('Missing POSTGRES_URL environment variable') + return self.stop(loop) + self._postgres_pool = pool.Pool( + os.environ['POSTGRES_URL'], + minsize=int( + os.environ.get( + 'POSTGRES_MIN_POOL_SIZE', + DEFAULT_POSTGRES_MIN_POOL_SIZE)), + maxsize=int( + os.environ.get( + 'POSTGRES_MAX_POOL_SIZE', + DEFAULT_POSTGRES_MAX_POOL_SIZE)), + timeout=int( + os.environ.get( + 'POSTGRES_CONNECT_TIMEOUT', + DEFAULT_POSTGRES_CONNECTION_TIMEOUT)), + enable_hstore=util.strtobool( + os.environ.get( + 'POSTGRES_HSTORE', DEFAULT_POSTGRES_HSTORE)), + enable_json=util.strtobool( + os.environ.get('POSTGRES_JSON', DEFAULT_POSTGRES_JSON)), + enable_uuid=util.strtobool( + os.environ.get('POSTGRES_UUID', DEFAULT_POSTGRES_UUID)), + echo=False, + on_connect=None, + pool_recycle=int( + os.environ.get( + 'POSTGRES_CONNECTION_TTL', + DEFAULT_POSTGRES_CONNECTION_TTL))) + try: + async with self._postgres_pool._cond: + await self._postgres_pool._fill_free_pool(False) + except (psycopg2.OperationalError, + psycopg2.Error) as error: # pragma: nocover + LOGGER.warning('Error connecting to PostgreSQL on startup: %s', + error) + + async def _postgres_shutdown(self, _ioloop: ioloop.IOLoop) -> None: + """Shutdown the Postgres connections and wait for them to close. + + This is invoked by the Application shutdown callback mechanism. + + """ + if self._postgres_pool is not None: + self._postgres_pool.close() + await self._postgres_pool.wait_closed() + + +class RequestHandlerMixin: + """ + A RequestHandler mixin class exposing functions for querying the database, + recording the duration to either :mod:`sprockets-influxdb + ` or :mod:`sprockets.mixins.metrics`, and + handling exceptions. + + """ + async def postgres_callproc(self, + name: str, + parameters: QueryParameters = None, + metric_name: str = '', + *, + timeout: Timeout = None) -> QueryResult: + """Execute a stored procedure / function + + :param name: The stored procedure / function name to call + :param parameters: Query parameters to pass when calling + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + + """ + async with self.application.postgres_connector( + self._on_postgres_error, + self._on_postgres_timing, + timeout) as connector: + return await connector.callproc( + name, parameters, metric_name, timeout=timeout) + + async def postgres_execute(self, + sql: str, + parameters: QueryParameters = None, + metric_name: str = '', + *, + timeout: Timeout = None) -> QueryResult: + """Execute a query, specifying a name for the query, the SQL statement, + and optional positional arguments to pass in with the query. + + Parameters may be provided as sequence or mapping and will be + bound to variables in the operation. Variables are specified + either with positional ``%s`` or named ``%({name})s`` placeholders. + + :param sql: The SQL statement to execute + :param parameters: Query parameters to pass as part of the execution + :type parameters: :data:`~sprockets_postgres.QueryParameters` + :param metric_name: The metric name for duration recording and logging + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + :raises psycopg2.Error: when there is an exception raised by Postgres + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + :rtype: :class:`~sprockets_postgres.QueryResult` + + """ + async with self.application.postgres_connector( + self._on_postgres_error, + self._on_postgres_timing, + timeout) as connector: + return await connector.execute( + sql, parameters, metric_name, timeout=timeout) + + @contextlib.asynccontextmanager + async def postgres_transaction(self, timeout: Timeout = None) \ + -> typing.AsyncContextManager[PostgresConnector]: + """asynchronous :ref:`context-manager ` + function that implements full ``BEGIN``, ``COMMIT``, and ``ROLLBACK`` + semantics. If there is a :exc:`psycopg2.Error` raised during the + transaction, the entire transaction will be rolled back. + + If no exception is raised, the transaction will be committed when + exiting the context manager. + + *Usage Example* + + .. code-block:: python + + class RequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): + + async def post(self): + async with self.postgres_transaction() as transaction: + result1 = await transaction.execute(QUERY_ONE) + result2 = await transaction.execute(QUERY_TWO) + result3 = await transaction.execute(QUERY_THREE) + + + :param timeout: Timeout value to override the default or the value + specified when creating the + :class:`~sprockets_postgres.PostgresConnector`. + :type timeout: :data:`~sprockets_postgres.Timeout` + + :raises asyncio.TimeoutError: when there is a query or network timeout + when starting the transaction + :raises psycopg2.Error: when there is an exception raised by Postgres + when starting the transaction + + .. note: :exc:`psycopg2.Error` is the base exception for all + :mod:`psycopg2` exceptions and the actual exception raised will + likely be more specific. + + """ + async with self.application.postgres_connector( + self._on_postgres_error, + self._on_postgres_timing, + timeout) as connector: + async with connector.transaction(): + yield connector + + def _on_postgres_error(self, + metric_name: str, + exc: Exception) -> typing.Optional[Exception]: + """Override for different error handling behaviors + + Return an exception if you would like for it to be raised, or swallow + it here. + + """ + LOGGER.error('%s in %s for %s (%s)', + exc.__class__.__name__, self.__class__.__name__, + metric_name, str(exc).split('\n')[0]) + if isinstance(exc, ConnectionException): + raise web.HTTPError(503, reason='Database Connection Error') + elif isinstance(exc, asyncio.TimeoutError): + raise web.HTTPError(500, reason='Query Timeout') + elif isinstance(exc, errors.UniqueViolation): + raise web.HTTPError(409, reason='Unique Violation') + elif isinstance(exc, psycopg2.Error): + raise web.HTTPError(500, reason='Database Error') + return exc + + def _on_postgres_timing(self, + metric_name: str, + duration: float) -> None: + """Override for custom metric recording. As a default behavior it will + attempt to detect `sprockets-influxdb + `_ and + `sprockets.mixins.metrics + `_ and + record the metrics using them if they are available. If they are not + available, it will record the query duration to the `DEBUG` log. + + :param metric_name: The name of the metric to record + :param duration: The duration to record for the metric + + """ + if hasattr(self, 'influxdb'): # sprockets-influxdb + self.influxdb.set_field(metric_name, duration) + elif hasattr(self, 'record_timing'): # sprockets.mixins.metrics + self.record_timing(metric_name, duration) + else: + LOGGER.debug('Postgres query %s duration: %s', + metric_name, duration) diff --git a/tests.py b/tests.py new file mode 100644 index 0000000..67d0402 --- /dev/null +++ b/tests.py @@ -0,0 +1,431 @@ +import asyncio +import json +import os +import typing +import unittest +import uuid +from unittest import mock + +import psycopg2 +from psycopg2 import errors +from sprockets.http import app, testing +from tornado import ioloop, web + +import sprockets_postgres + + +class RequestHandler(sprockets_postgres.RequestHandlerMixin, + web.RequestHandler): + """Base RequestHandler for test endpoints""" + + def cast_data(self, data: typing.Union[dict, list, None]) \ + -> typing.Union[dict, list, None]: + if data is None: + return None + elif isinstance(data, list): + return [self.cast_data(row) for row in data] + return {k: str(v) for k, v in data.items()} + + +class CallprocRequestHandler(RequestHandler): + + async def get(self): + result = await self.postgres_callproc( + 'uuid_generate_v4', metric_name='uuid') + await self.finish({'value': str(result.row['uuid_generate_v4'])}) + + +class CountRequestHandler(RequestHandler): + + GET_SQL = """\ + SELECT last_updated_at, count + FROM public.query_count + WHERE key = 'test';""" + + async def get(self): + result = await self.postgres_execute(self.GET_SQL) + await self.finish(self.cast_data(result.row)) + + +class ErrorRequestHandler(RequestHandler): + + GET_SQL = """\ + SELECT last_updated_at, count + FROM public.query_count + WHERE key = 'test';""" + + async def get(self): + await self.postgres_execute(self.GET_SQL) + self.set_status(204) + + def _on_postgres_error(self, + metric_name: str, + exc: Exception) -> typing.Optional[Exception]: + return RuntimeError() + + +class ErrorPassthroughRequestHandler(RequestHandler): + + async def get(self): + exc = self._on_postgres_error('test', RuntimeError()) + if isinstance(exc, RuntimeError): + self.set_status(204) + else: + raise web.HTTPError(500, 'Did not pass through') + + +class ExecuteRequestHandler(RequestHandler): + + GET_SQL = 'SELECT %s::TEXT AS value;' + + async def get(self): + timeout = self.get_argument('timeout', None) + if timeout is not None: + timeout = int(timeout) + result = await self.postgres_execute( + self.GET_SQL, [self.get_argument('value')], timeout=timeout) + await self.finish({ + 'value': result.row['value'] if result.row else None}) + + +class InfluxDBRequestHandler(ExecuteRequestHandler): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.influxdb = self.application.influxdb + self.influxdb.add_field = mock.Mock() + + +class MetricsMixinRequestHandler(ExecuteRequestHandler): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.record_timing = self.application.record_timing + + +class MultiRowRequestHandler(RequestHandler): + + GET_SQL = 'SELECT * FROM public.test_rows;' + + UPDATE_SQL = """\ + UPDATE public.test_rows + SET toggle = %(to_value)s, + last_modified_at = CURRENT_TIMESTAMP + WHERE toggle IS %(from_value)s""" + + async def get(self): + result = await self.postgres_execute(self.GET_SQL) + await self.finish({ + 'count': result.row_count, + 'rows': self.cast_data(result.rows)}) + + async def post(self): + body = json.loads(self.request.body.decode('utf-8')) + result = await self.postgres_execute( + self.UPDATE_SQL, { + 'to_value': body['value'], 'from_value': not body['value']}) + await self.finish({ + 'count': result.row_count, + 'rows': self.cast_data(result.rows)}) + + +class NoErrorRequestHandler(ErrorRequestHandler): + + def _on_postgres_error(self, + metric_name: str, + exc: Exception) -> typing.Optional[Exception]: + return None + + +class NoRowRequestHandler(RequestHandler): + + GET_SQL = """\ + SELECT * FROM information_schema.tables WHERE table_schema = 'foo';""" + + async def get(self): + result = await self.postgres_execute(self.GET_SQL) + await self.finish({ + 'count': result.row_count, + 'rows': self.cast_data(result.rows)}) + + +class StatusRequestHandler(RequestHandler): + + async def get(self): + status = await self.application.postgres_status() + if not status['available']: + self.set_status(503, 'Database Unavailable') + await self.finish(status) + + +class TransactionRequestHandler(RequestHandler): + + GET_SQL = """\ + SELECT id, created_at, last_modified_at, value + FROM public.test + WHERE id = %(id)s;""" + + POST_SQL = """\ + INSERT INTO public.test (id, created_at, value) + VALUES (%(id)s, CURRENT_TIMESTAMP, %(value)s) + RETURNING id, created_at, value;""" + + UPDATE_COUNT_SQL = """\ + UPDATE public.query_count + SET count = count + 1, + last_updated_at = CURRENT_TIMESTAMP + WHERE key = 'test' + RETURNING last_updated_at, count;""" + + async def get(self, test_id): + result = await self.postgres_execute(self.GET_SQL, {'id': test_id}) + if not result.row_count: + raise web.HTTPError(404, 'Not Found') + await self.finish(self.cast_data(result.row)) + + async def post(self): + body = json.loads(self.request.body.decode('utf-8')) + async with self.postgres_transaction() as postgres: + + # This should roll back on the second call to this endopoint + self.application.first_txn = await postgres.execute( + self.POST_SQL, {'id': str(uuid.uuid4()), + 'value': str(uuid.uuid4())}) + + # This should roll back on the second call to this endopoint + count = await postgres.execute(self.UPDATE_COUNT_SQL) + + # This will trigger an error on the second call to this endpoint + user = await postgres.execute(self.POST_SQL, body) + + await self.finish({ + 'count': self.cast_data(count.row), + 'user': self.cast_data(user.row)}) + + +class Application(sprockets_postgres.ApplicationMixin, + app.Application): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.influxdb = mock.Mock() + self.record_timing = mock.Mock() + self.first_txn: typing.Optional[sprockets_postgres.QueryResult] = None + + +class TestCase(testing.SprocketsHttpTestCase): + + @classmethod + def setUpClass(cls): + with open('build/test-environment') as f: + for line in f: + if line.startswith('export '): + line = line[7:] + name, _, value = line.strip().partition('=') + os.environ[name] = value + + def get_app(self): + self.app = Application(handlers=[ + web.url('/callproc', CallprocRequestHandler), + web.url('/count', CountRequestHandler), + web.url('/error', ErrorRequestHandler), + web.url('/error-passthrough', ErrorPassthroughRequestHandler), + web.url('/execute', ExecuteRequestHandler), + web.url('/influxdb', InfluxDBRequestHandler), + web.url('/metrics-mixin', MetricsMixinRequestHandler), + web.url('/multi-row', MultiRowRequestHandler), + web.url('/no-error', NoErrorRequestHandler), + web.url('/no-row', NoRowRequestHandler), + web.url('/status', StatusRequestHandler), + web.url('/transaction', TransactionRequestHandler), + web.url('/transaction/(?P.*)', TransactionRequestHandler) + ]) + return self.app + + +class RequestHandlerMixinTestCase(TestCase): + + def test_postgres_status(self): + response = self.fetch('/status') + data = json.loads(response.body) + self.assertTrue(data['available']) + self.assertGreaterEqual(data['pool_size'], 1) + self.assertGreaterEqual(data['pool_free'], 1) + + @mock.patch('aiopg.pool.Pool.acquire') + def test_postgres_status_connect_error(self, acquire): + acquire.side_effect = asyncio.TimeoutError() + response = self.fetch('/status') + self.assertEqual(response.code, 503) + self.assertFalse(json.loads(response.body)['available']) + + @mock.patch('aiopg.cursor.Cursor.execute') + def test_postgres_status_error(self, execute): + execute.side_effect = asyncio.TimeoutError() + response = self.fetch('/status') + self.assertEqual(response.code, 503) + self.assertFalse(json.loads(response.body)['available']) + + def test_postgres_callproc(self): + response = self.fetch('/callproc') + self.assertEqual(response.code, 200) + self.assertIsInstance( + uuid.UUID(json.loads(response.body)['value']), uuid.UUID) + + @mock.patch('aiopg.cursor.Cursor.execute') + def test_postgres_error(self, execute): + execute.side_effect = asyncio.TimeoutError + response = self.fetch('/error') + self.assertEqual(response.code, 500) + self.assertIn(b'Internal Server Error', response.body) + + @mock.patch('aiopg.pool.Pool.acquire') + def test_postgres_error_on_connect(self, acquire): + acquire.side_effect = asyncio.TimeoutError + response = self.fetch('/error') + self.assertEqual(response.code, 500) + self.assertIn(b'Internal Server Error', response.body) + + def test_postgres_error_passthrough(self): + response = self.fetch('/error-passthrough') + self.assertEqual(response.code, 204) + + def test_postgres_execute(self): + expectation = str(uuid.uuid4()) + response = self.fetch('/execute?value={}'.format(expectation)) + self.assertEqual(response.code, 200) + self.assertEqual(json.loads(response.body)['value'], expectation) + + def test_postgres_execute_with_timeout(self): + expectation = str(uuid.uuid4()) + response = self.fetch( + '/execute?value={}&timeout=5'.format(expectation)) + self.assertEqual(response.code, 200) + self.assertEqual(json.loads(response.body)['value'], expectation) + + def test_postgres_influxdb(self): + expectation = str(uuid.uuid4()) + response = self.fetch( + '/influxdb?value={}'.format(expectation)) + self.assertEqual(response.code, 200) + self.assertEqual(json.loads(response.body)['value'], expectation) + self.app.influxdb.set_field.assert_called_once() + + def test_postgres_metrics_mixin(self): + expectation = str(uuid.uuid4()) + response = self.fetch( + '/metrics-mixin?value={}'.format(expectation)) + self.assertEqual(response.code, 200) + self.assertEqual(json.loads(response.body)['value'], expectation) + self.app.record_timing.assert_called_once() + + def test_postgres_multirow_get(self): + response = self.fetch('/multi-row') + self.assertEqual(response.code, 200) + body = json.loads(response.body) + self.assertEqual(body['count'], 5) + self.assertIsInstance(body['rows'], list) + + def test_postgres_multirow_no_data(self): + for value in [True, False]: + response = self.fetch( + '/multi-row', method='POST', body=json.dumps({'value': value})) + self.assertEqual(response.code, 200) + body = json.loads(response.body) + self.assertEqual(body['count'], 5) + self.assertIsNone(body['rows']) + + def test_postgres_norow(self): + response = self.fetch('/no-row') + self.assertEqual(response.code, 200) + body = json.loads(response.body) + self.assertEqual(body['count'], 0) + self.assertIsNone(body['rows']) + + @mock.patch('aiopg.cursor.Cursor.execute') + def test_postgres_execute_timeout_error(self, execute): + execute.side_effect = asyncio.TimeoutError() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 500) + self.assertIn(b'Query Timeout', response.body) + + @mock.patch('aiopg.cursor.Cursor.execute') + def test_postgres_execute_unique_violation(self, execute): + execute.side_effect = errors.UniqueViolation() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 409) + self.assertIn(b'Unique Violation', response.body) + + @mock.patch('aiopg.cursor.Cursor.execute') + def test_postgres_execute_error(self, execute): + execute.side_effect = psycopg2.Error() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 500) + self.assertIn(b'Database Error', response.body) + + @mock.patch('aiopg.cursor.Cursor.fetchone') + def test_postgres_programming_error(self, fetchone): + fetchone.side_effect = psycopg2.ProgrammingError() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 200) + self.assertIsNone(json.loads(response.body)['value']) + + @mock.patch('aiopg.connection.Connection.cursor') + def test_postgres_cursor_raises(self, cursor): + cursor.side_effect = asyncio.TimeoutError() + response = self.fetch('/execute?value=1') + self.assertEqual(response.code, 503) + + +class TransactionTestCase(TestCase): + + def test_transactions(self): + test_body = { + 'id': str(uuid.uuid4()), + 'value': str(uuid.uuid4()) + } + response = self.fetch( + '/transaction', method='POST', body=json.dumps(test_body)) + self.assertEqual(response.code, 200) + record = json.loads(response.body.decode('utf-8')) + self.assertEqual(record['user']['id'], test_body['id']) + self.assertEqual(record['user']['value'], test_body['value']) + count = record['count']['count'] + last_updated = record['count']['last_updated_at'] + + response = self.fetch( + '/transaction', method='POST', body=json.dumps(test_body)) + self.assertEqual(response.code, 409) + + response = self.fetch( + '/transaction/{}'.format(self.app.first_txn.row['id'])) + self.assertEqual(response.code, 404) + + response = self.fetch('/count') + self.assertEqual(response.code, 200) + record = json.loads(response.body.decode('utf-8')) + self.assertEqual(record['count'], count) + self.assertEqual(record['last_updated_at'], last_updated) + + +class MissingURLTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + with open('build/test-environment') as f: + for line in f: + if line.startswith('export '): + line = line[7:] + name, _, value = line.strip().partition('=') + if name != 'POSTGRES_URL': + os.environ[name] = value + if 'POSTGRES_URL' in os.environ: + del os.environ['POSTGRES_URL'] + + def test_that_stop_is_invoked(self): + io_loop = ioloop.IOLoop.current() + obj = Application() + obj.stop = mock.Mock(wraps=obj.stop) + obj.start(io_loop) + io_loop.start() + obj.stop.assert_called_once()