Merge pull request #1 from gmr/master

Initial Version
This commit is contained in:
Gavin M. Roy 2020-04-10 15:38:28 -04:00 committed by GitHub
commit 9056d9b8b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 1638 additions and 23 deletions

21
.editorconfig Normal file
View file

@ -0,0 +1,21 @@
# top-most EditorConfig file
root = true
# Unix-style newlines with a newline ending every file
[*]
end_of_line = lf
insert_final_newline = true
# 4 space indentation
[*.py]
indent_style = space
indent_size = 4
# 2 space indentation
[*.yml]
indent_style = space
indent_size = 2
[bootstrap]
indent_style = space
indent_size = 2

20
.github/workflows/deploy.yaml vendored Normal file
View file

@ -0,0 +1,20 @@
name: Deployment
on:
push:
branches-ignore: ["*"]
tags: ["*"]
jobs:
deploy:
runs-on: ubuntu-latest
if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags') && github.repository == 'sprockets/sprockets-postgres'
container: python:3.8-alpine
steps:
- name: Checkout repository
uses: actions/checkout@v1
- name: Build package
run: python3 setup.py sdist
- name: Publish package
uses: pypa/gh-action-pypi-publish@master
with:
user: __token__
password: ${{ secrets.PYPI_PASSWORD }}

69
.github/workflows/testing.yaml vendored Normal file
View file

@ -0,0 +1,69 @@
name: Testing
on:
push:
branches: ["*"]
paths-ignore:
- 'docs/**'
- 'setup.*'
- '*.md'
- '*.rst'
tags-ignore: ["*"]
jobs:
test:
runs-on: ubuntu-latest
timeout-minutes: 3
services:
postgres:
image: postgres:12-alpine
options: >-
--health-cmd "pg_isready"
--health-interval 10s
--health-timeout 10s
--health-retries 5
ports:
- 5432
env:
POSTGRES_HOST_AUTH_METHOD: trust
strategy:
matrix:
python: [3.7, 3.8]
container:
image: python:${{ matrix.python }}-alpine
steps:
- name: Checkout repository
uses: actions/checkout@v1
- name: Install OS dependencies
run: apk --update add gcc make musl-dev linux-headers postgresql-dev
- name: Install testing dependencies
run: pip3 --no-cache-dir install -e '.[testing]'
- name: Create build directory
run: mkdir build
- name: Create build/test-environment
run: echo "export POSTGRES_URL=postgresql://postgres@postgres:5432/postgres" > build/test-environment
- name: Install UUID extension
run: |
apk add postgresql-client \
&& psql -q -h postgres -U postgres -d postgres -f fixtures/testing.sql
- name: Run flake8 tests
run: flake8
- name: Run tests
run: coverage run
- name: Output coverage
run: coverage report && coverage xml
- name: Upload Coverage
uses: codecov/codecov-action@v1.0.2
if: github.event_name == 'push' && github.repository == 'sprockets/sprockets-postgres'
with:
token: ${{secrets.CODECOV_TOKEN}}
file: build/coverage.xml
flags: unittests
fail_ci_if_error: true

23
CONTRIBUTING.md Normal file
View file

@ -0,0 +1,23 @@
# Contributing
To get setup in the environment and run the tests, take the following steps:
```bash
python3 -m venv env
source env/bin/activate
pip install -e '.[testing]'
bootstrap
flake8
coverage run && coverage report
```
## Test Coverage
Pull requests that make changes or additions that are not covered by tests
will likely be closed without review.
In addition, all tests must pass the tests **AND** flake8 linter. If flake8
exceptions are included, the reasoning for adding the exception must be included
in the pull request.

40
LICENSE
View file

@ -1,29 +1,25 @@
BSD 3-Clause License
Copyright (c) 2020, Sprockets
Copyright (c) 2020 AWeber Communications
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its contributors may
be used to endorse or promote products derived from this software without
specific prior written permission.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

3
MANIFEST.in Normal file
View file

@ -0,0 +1,3 @@
include README.rst
include LICENSE
include VERSION

View file

@ -1 +0,0 @@
# sprockets-postgres

67
README.rst Normal file
View file

@ -0,0 +1,67 @@
Sprockets Postgres
==================
An set of mixins and classes for interacting with PostgreSQL using asyncio in
Tornado / sprockets.http applications using aiopg.
|Version| |Status| |Coverage| |License|
Installation
------------
``sprockets-postgres`` is available on the Python package index and is installable via pip:
.. code:: bash
pip install sprockets-postgres
Documentation
-------------
Documentation is available at `sprockets-postgres.readthedocs.io <https://sprockets-postgres.readthedocs.io>`_.
Configuration
-------------
The following table details the environment variable configuration options:
+---------------------------------+--------------------------------------------------+-------------------+
| Variable | Definition | Default |
+=================================+==================================================+===================+
| ``POSTGRES_URL`` | The PostgreSQL URL to connect to | |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_MAX_POOL_SIZE`` | Maximum connection count to Postgres per backend | ``0`` (Unlimited) |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_MIN_POOL_SIZE`` | Minimum or starting pool size. | ``1`` |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_CONNECTION_TIMEOUT`` | The maximum time in seconds to spend attempting | ``10`` |
| | to create a new connection. | |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_CONNECTION_TTL`` | Time-to-life in seconds for a pooled connection. | ``300`` |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_QUERY_TIMEOUT`` | Maximum execution time for a query in seconds. | ``60`` |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_HSTORE`` | Enable HSTORE support in the client. | ``FALSE`` |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_JSON`` | Enable JSON support in the client. | ``FALSE`` |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_UUID`` | Enable UUID support in the client. | ``TRUE`` |
+---------------------------------+--------------------------------------------------+-------------------+
Requirements
------------
- `aiopg <https://aioboto3.readthedocs.io/en/latest/>`_
- `sprockets.http <https://sprocketshttp.readthedocs.io/en/master/>`_
- `Tornado <https://tornadoweb.org>`_
Version History
---------------
Available at https://sprockets-postgres.readthedocs.org/en/latest/history.html
.. |Version| image:: https://img.shields.io/pypi/v/sprockets-postgres.svg?
:target: http://badge.fury.io/py/sprockets-postgres
.. |Status| image:: https://img.shields.io/travis/sprockets/sprockets-postgres.svg?
:target: https://travis-ci.org/sprockets/sprockets-postgres
.. |Coverage| image:: https://img.shields.io/codecov/c/github/sprockets/sprockets-postgres.svg?
:target: https://codecov.io/github/sprockets/sprockets-postgres?branch=master
.. |License| image:: https://img.shields.io/pypi/l/sprockets-postgres.svg?
:target: https://sprockets-postgres.readthedocs.org

1
VERSION Normal file
View file

@ -0,0 +1 @@
1.0.0

77
bootstrap Executable file
View file

@ -0,0 +1,77 @@
#!/usr/bin/env sh
set -e
# Common constants
COLOR_RESET='\033[0m'
COLOR_GREEN='\033[0;32m'
COMPOSE_PROJECT_NAME="${COMPOSE_PROJECT_NAME:-${PWD##*/}}"
TEST_HOST="${TEST_HOST:-localhost}"
echo "Integration test host: ${TEST_HOST}"
get_exposed_port() {
if [ -z $3 ]
then
docker-compose port $1 $2 | cut -d: -f2
else
docker-compose port --index=$3 $1 $2 | cut -d: -f2
fi
}
report_start() {
printf "Waiting for $1 ... "
}
report_done() {
printf "${COLOR_GREEN}done${COLOR_RESET}\n"
}
wait_for_healthy_containers() {
IDs=$(docker-compose ps -q | paste -sd " " -)
report_start "${1} containers to report healthy"
counter="0"
while true
do
if [ "$(docker inspect -f "{{.State.Health.Status}}" ${IDs} | grep -c healthy)" -eq "${1}" ]; then
break
fi
counter=$((++counter))
if [ "${counter}" -eq 120 ]; then
echo " ERROR: containers failed to start"
exit 1
fi
sleep 1
done
report_done
}
# Ensure Docker is Running
echo "Docker Information:"
echo ""
docker version
echo ""
# Activate the virtual environment
if test -e env/bin/activate
then
. ./env/bin/activate
fi
mkdir -p build
# Stop any running instances and clean up after them, then pull images
docker-compose down --volumes --remove-orphans
docker-compose up -d --quiet-pull
wait_for_healthy_containers 1
printf "Loading fixture data ... "
docker-compose exec postgres psql -q -o /dev/null -U postgres -d postgres -f /fixtures/testing.sql
report_done
cat > build/test-environment<<EOF
export ASYNC_TEST_TIMEOUT=5
export POSTGRES_URL=postgresql://postgres@${TEST_HOST}:$(get_exposed_port postgres 5432)/postgres
EOF
printf "\nBootstrap complete\n\nDon't forget to \"source build/test-environment\"\n"

17
docker-compose.yml Normal file
View file

@ -0,0 +1,17 @@
version: '3.3'
services:
postgres:
image: postgres:12-alpine
environment:
POSTGRES_HOST_AUTH_METHOD: trust
healthcheck:
test: pg_isready
interval: 30s
timeout: 10s
retries: 3
ports:
- 5432
volumes:
- type: bind
source: ./fixtures
target: /fixtures

6
docs/application.rst Normal file
View file

@ -0,0 +1,6 @@
Application Mixin
=================
.. autoclass:: sprockets_postgres.ApplicationMixin
:members:
:member-order: bysource

41
docs/conf.py Normal file
View file

@ -0,0 +1,41 @@
import datetime
import pkg_resources
master_doc = 'index'
project = 'sprockets-postgres'
release = version = pkg_resources.get_distribution(project).version
copyright = '{}, AWeber Communications'.format(datetime.date.today().year)
html_theme = 'sphinx_rtd_theme'
html_theme_path = ['_themes']
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx_autodoc_typehints',
'sphinx.ext.viewcode'
]
set_type_checking_flag = True
typehints_fully_qualified = True
always_document_param_types = True
typehints_document_rtype = True
templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
intersphinx_mapping = {
'aiopg': ('https://aiopg.readthedocs.io/en/stable/', None),
'psycopg2': ('http://initd.org/psycopg/docs/', None),
'python': ('https://docs.python.org/3', None),
'sprockets-http': (
'https://sprocketshttp.readthedocs.io/en/master/', None),
'sprockets-influxdb': (
'https://sprockets-influxdb.readthedocs.io/en/latest/', None),
'sprockets.mixins.metrics': (
'https://sprocketsmixinsmetrics.readthedocs.io/en/latest/', None),
'tornado': ('http://tornadoweb.org/en/latest/', None)
}
autodoc_default_options = {'autodoc_typehints': 'description'}

27
docs/configuration.rst Normal file
View file

@ -0,0 +1,27 @@
Configuration
=============
:py:mod:`sprockets-postgres <sprockets_postgres>` is configured via environment variables. The following table
details the configuration options and their defaults.
+---------------------------------+--------------------------------------------------+-------------------+
| Variable | Definition | Default |
+=================================+==================================================+===================+
| ``POSTGRES_URL`` | The PostgreSQL URL to connect to | |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_MAX_POOL_SIZE`` | Maximum connection count to Postgres per backend | ``0`` (Unlimited) |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_MIN_POOL_SIZE`` | Minimum or starting pool size. | ``1`` |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_CONNECTION_TIMEOUT`` | The maximum time in seconds to spend attempting | ``10`` |
| | to create a new connection. | |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_CONNECTION_TTL`` | Time-to-life in seconds for a pooled connection. | ``300`` |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_QUERY_TIMEOUT`` | Maximum execution time for a query in seconds. | ``60`` |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_HSTORE`` | Enable HSTORE support in the client. | ``FALSE`` |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_JSON`` | Enable JSON support in the client. | ``FALSE`` |
+---------------------------------+--------------------------------------------------+-------------------+
| ``POSTGRES_UUID`` | Enable UUID support in the client. | ``TRUE`` |
+---------------------------------+--------------------------------------------------+-------------------+

9
docs/connector.rst Normal file
View file

@ -0,0 +1,9 @@
Postgres Connector
==================
A :class:`~sprockets_postgres.PostgresConnector` instance contains a cursor for
a Postgres connection and methods to execute queries.
.. autoclass:: sprockets_postgres.PostgresConnector
:members:
:undoc-members:
:member-order: bysource

44
docs/example.rst Normal file
View file

@ -0,0 +1,44 @@
Example Web Application
=======================
The following code provides a simple example for using the
.. code-block:: python
import sprockets.http
import sprockets_postgres as postgres
from sprockets.http import app
class RequestHandler(postgres.RequestHandlerMixin, web.RequestHandler):
GET_SQL = """\
SELECT foo_id, bar, baz, qux
FROM public.foo
WHERE foo_id = %(foo_id)s;"""
async def get(self, foo_id: str) -> None:
result = await self.postgres_execute(self.GET_SQL, {'foo_id': foo_id})
await self.finish(result.row)
class Application(postgres.ApplicationMixin, app.Application):
"""
The ``ApplicationMixin`` provides the foundation for the
``RequestHandlerMixin`` to properly function and will automatically
setup the pool to connect to PostgreSQL and will shutdown the connections
cleanly when the application stops.
It should be used in conjunction with ``sprockets.http.app.Application``
and not directly with ``tornado.web.Application``.
"""
def make_app(**settings):
return Application([
web.url(r'/foo/(?P<foo_id>.*)', FooRequestHandler)
], **settings)
if __name__ == '__main__':
sprockets.http.run(make_app)

4
docs/exceptions.rst Normal file
View file

@ -0,0 +1,4 @@
Exceptions
==========
.. autoclass:: sprockets_postgres.ConnectionException

2
docs/genindex.rst Normal file
View file

@ -0,0 +1,2 @@
Index
=====

33
docs/index.rst Normal file
View file

@ -0,0 +1,33 @@
Sprockets Postgres
==================
An set of mixins and classes for interacting with PostgreSQL using :mod:`asyncio`
in :class:`Tornado <tornado.web.Application>` /
:class:`sprockets.http <sprockets.http.app.Application>` applications using
:py:mod:`aiopg`.
Python versions supported: 3.7+
Installation
------------
``sprockets-postgres`` is available on the Python package index and is installable via pip:
.. code:: bash
pip3 install sprockets-postgres
Documentation
-------------
.. toctree::
:maxdepth: 1
configuration
application
request_handler
query_result
connector
typing
exceptions
example
genindex

9
docs/query_result.rst Normal file
View file

@ -0,0 +1,9 @@
Query Result
============
A :class:`~sprockets_postgres.QueryResult` instance is created for every query
and contains the count of rows effected by the query and either the ``row`` as
a :class:`dict` or ``rows`` as a list of :class:`dict`. For queries that do
not return any data, both ``row`` and ``rows`` will be :const:`None`.
.. autoclass:: sprockets_postgres.QueryResult
:members:

11
docs/request_handler.rst Normal file
View file

@ -0,0 +1,11 @@
RequestHandler Mixin
====================
The :class:`~sprockets_postgres.RequestHandlerMixin` is a Tornado
:class:`tornado.web.RequestHandler` mixin that provides easy to use
functionality for interacting with PostgreSQL.
.. autoclass:: sprockets_postgres.RequestHandlerMixin
:members:
:undoc-members:
:private-members:
:member-order: bysource

5
docs/requirements.txt Normal file
View file

@ -0,0 +1,5 @@
Sphinx==2.4.4
sphinx-autodoc-typehints
sphinx_rtd_theme
typed_ast
typing_extensions

6
docs/typing.rst Normal file
View file

@ -0,0 +1,6 @@
Type Annotations
================
.. autodata:: sprockets_postgres.QueryParameters
.. autodata:: sprockets_postgres.Timeout

30
fixtures/testing.sql Normal file
View file

@ -0,0 +1,30 @@
CREATE EXTENSION "uuid-ossp";
CREATE TABLE public.test (
id UUID NOT NULL PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_modified_at TIMESTAMP WITH TIME ZONE,
value TEXT NOT NULL
);
CREATE TABLE public.query_count(
key TEXT NOT NULL PRIMARY KEY,
last_updated_at TIMESTAMP WITH TIME ZONE,
count INTEGER
);
INSERT INTO public.query_count (key, last_updated_at, count)
VALUES ('test', CURRENT_TIMESTAMP, 0);
CREATE TABLE public.test_rows (
id INTEGER NOT NULL PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_modified_at TIMESTAMP WITH TIME ZONE,
toggle BOOLEAN NOT NULL
);
INSERT INTO public.test_rows (toggle) VALUES (FALSE);
INSERT INTO public.test_rows (toggle) VALUES (FALSE);
INSERT INTO public.test_rows (toggle) VALUES (FALSE);
INSERT INTO public.test_rows (toggle) VALUES (FALSE);
INSERT INTO public.test_rows (toggle) VALUES (FALSE);

76
setup.cfg Normal file
View file

@ -0,0 +1,76 @@
[metadata]
name = sprockets-postgres
version = file: VERSION
description = An asynchronous Postgres client and mixin for Tornado applications
long_description = file: README.rst
long_description_content_type = text/x-rst; charset=UTF-8
license = BSD 3-Clause License
license-file = LICENSE
home-page = https://github.com/sprockets/sprockets-postgres
project_urls =
Bug Tracker = https://github.com/sprockets/sprockets-postgres/issues
Documentation = https://sprockets-postgres.readthedocs.io
Source Code = https://github.com/sprockets/sprockets-postgres/
classifiers =
Development Status :: 3 - Alpha
Intended Audience :: Developers
License :: OSI Approved :: BSD License
Natural Language :: English
Operating System :: OS Independent
Programming Language :: Python :: 3
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Topic :: Communications
Topic :: Internet
Topic :: Software Development
Typing :: Typed
requires-dist = setuptools
keywords =
postgres
python3
tornado
[options]
include_package_data = True
install_requires =
aiopg>=1.0.0,<2
sprockets.http>=2.1.1,<3
tornado>=6,<7
pymodules =
sprockets_postgres
zip_safe = true
[options.extras_require]
testing =
coverage
flake8
flake8-comprehensions
flake8-deprecated
flake8-import-order
flake8-print
flake8-quotes
flake8-rst-docstrings
flake8-tuple
pygments
[coverage:run]
branch = True
command_line = -m unittest discover tests --verbose
data_file = build/.coverage
[coverage:report]
show_missing = True
include =
sprockets_postgres.py
[coverage:html]
directory = build/coverage
[coverage:xml]
output = build/coverage.xml
[flake8]
application-import-names = sprockets_postgres,tests
exclude = build,docs,env
import-order-style = pycharm
rst-roles = attr,class,const,data,exc,func,meth,mod,obj,ref,yields

3
setup.py Normal file
View file

@ -0,0 +1,3 @@
import setuptools
setuptools.setup()

585
sprockets_postgres.py Normal file
View file

@ -0,0 +1,585 @@
import asyncio
import contextlib
import dataclasses
import logging
import os
import time
import typing
from distutils import util
import aiopg
import psycopg2
from aiopg import pool
from psycopg2 import errors, extras
from tornado import ioloop, web
LOGGER = logging.getLogger('sprockets-postgres')
DEFAULT_POSTGRES_CONNECTION_TIMEOUT = 10
DEFAULT_POSTGRES_CONNECTION_TTL = 300
DEFAULT_POSTGRES_HSTORE = 'FALSE'
DEFAULT_POSTGRES_JSON = 'FALSE'
DEFAULT_POSTGRES_MAX_POOL_SIZE = 0
DEFAULT_POSTGRES_MIN_POOL_SIZE = 1
DEFAULT_POSTGRES_QUERY_TIMEOUT = 120
DEFAULT_POSTGRES_UUID = 'TRUE'
QueryParameters = typing.Union[dict, list, tuple, None]
"""Type annotation for query parameters"""
Timeout = typing.Union[int, float, None]
"""Type annotation for timeout values"""
@dataclasses.dataclass
class QueryResult:
"""A :func:`Data Class <dataclasses.dataclass>` that is generated as a
result of each query that is executed.
:param row_count: The quantity of rows impacted by the query
:param row: If a single row is returned, the data for that row
:param rows: If more than one row is returned, this attribute is set as the
list of rows, in order.
"""
row_count: int
row: typing.Optional[dict]
rows: typing.Optional[typing.List[dict]]
class PostgresConnector:
"""Wraps a :class:`aiopg.Cursor` instance for creating explicit
transactions, calling stored procedures, and executing queries.
Unless the :meth:`~sprockets_postgres.PostgresConnector.transaction`
asynchronous :ref:`context-manager <python:typecontextmanager>` is used,
each call to :meth:`~sprockets_postgres.PostgresConnector.callproc` and
:meth:`~sprockets_postgres.PostgresConnector.execute` is an explicit
transaction.
.. note:: :class:`PostgresConnector` instances are created by
:meth:`ApplicationMixin.postgres_connector
<sprockets_postgres.ApplicationMixin.postgres_connector>` and should
not be created directly.
:param cursor: The cursor to use in the connector
:type cursor: aiopg.Cursor
:param on_error: The callback to invoke when an exception is caught
:param on_duration: The callback to invoke when a query is complete and all
of the data has been returned.
:param timeout: A timeout value in seconds for executing queries. If
unspecified, defaults to the ``POSTGRES_QUERY_TIMEOUT`` environment
variable and if that is not specified, to the
:const:`DEFAULT_POSTGRES_QUERY_TIMEOUT` value of ``120``
:type timeout: :data:`~sprockets_postgres.Timeout`
"""
def __init__(self,
cursor: aiopg.Cursor,
on_error: typing.Callable,
on_duration: typing.Optional[typing.Callable] = None,
timeout: Timeout = None):
self.cursor = cursor
self._on_error = on_error
self._on_duration = on_duration
self._timeout = timeout or int(
os.environ.get(
'POSTGRES_QUERY_TIMEOUT',
DEFAULT_POSTGRES_QUERY_TIMEOUT))
async def callproc(self,
name: str,
parameters: QueryParameters = None,
metric_name: str = '',
*,
timeout: Timeout = None) -> QueryResult:
"""Execute a stored procedure / function
:param name: The stored procedure / function name to call
:param parameters: Query parameters to pass when calling
:type parameters: :data:`~sprockets_postgres.QueryParameters`
:param metric_name: The metric name for duration recording and logging
:param timeout: Timeout value to override the default or the value
specified when creating the
:class:`~sprockets_postgres.PostgresConnector`.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when there is a query or network timeout
:raises psycopg2.Error: when there is an exception raised by Postgres
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
:rtype: :class:`~sprockets_postgres.QueryResult`
"""
return await self._query(
self.cursor.callproc,
metric_name,
procname=name,
parameters=parameters,
timeout=timeout)
async def execute(self,
sql: str,
parameters: QueryParameters = None,
metric_name: str = '',
*,
timeout: Timeout = None) -> QueryResult:
"""Execute a query, specifying a name for the query, the SQL statement,
and optional positional arguments to pass in with the query.
Parameters may be provided as sequence or mapping and will be
bound to variables in the operation. Variables are specified
either with positional ``%s`` or named ``%({name})s`` placeholders.
:param sql: The SQL statement to execute
:param parameters: Query parameters to pass as part of the execution
:type parameters: :data:`~sprockets_postgres.QueryParameters`
:param metric_name: The metric name for duration recording and logging
:param timeout: Timeout value to override the default or the value
specified when creating the
:class:`~sprockets_postgres.PostgresConnector`.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when there is a query or network timeout
:raises psycopg2.Error: when there is an exception raised by Postgres
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
:rtype: :class:`~sprockets_postgres.QueryResult`
"""
return await self._query(
self.cursor.execute,
metric_name,
operation=sql,
parameters=parameters,
timeout=timeout)
@contextlib.asynccontextmanager
async def transaction(self) \
-> typing.AsyncContextManager['PostgresConnector']:
"""asynchronous :ref:`context-manager <python:typecontextmanager>`
function that implements full ``BEGIN``, ``COMMIT``, and ``ROLLBACK``
semantics. If there is a :exc:`psycopg2.Error` raised during the
transaction, the entire transaction will be rolled back.
If no exception is raised, the transaction will be committed when
exiting the context manager.
.. note:: This method is provided for edge case usage. As a
generalization
:meth:`sprockets_postgres.RequestHandlerMixin.postgres_transaction`
should be used instead.
*Usage Example*
.. code-block::
class RequestHandler(sprockets_postgres.RequestHandlerMixin,
web.RequestHandler):
async def post(self):
async with self.postgres_transaction() as transaction:
result1 = await transaction.execute(QUERY_ONE)
result2 = await transaction.execute(QUERY_TWO)
result3 = await transaction.execute(QUERY_THREE)
:raises asyncio.TimeoutError: when there is a query or network timeout
when starting the transaction
:raises psycopg2.Error: when there is an exception raised by Postgres
when starting the transaction
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
"""
async with self.cursor.begin():
yield self
async def _query(self,
method: typing.Callable,
metric_name: str,
**kwargs):
if kwargs['timeout'] is None:
kwargs['timeout'] = self._timeout
start_time = time.monotonic()
try:
await method(**kwargs)
except (asyncio.TimeoutError, psycopg2.Error) as err:
exc = self._on_error(metric_name, err)
if exc:
raise exc
else:
results = await self._query_results()
if self._on_duration:
self._on_duration(
metric_name, time.monotonic() - start_time)
return results
async def _query_results(self) -> QueryResult:
count, row, rows = self.cursor.rowcount, None, None
if self.cursor.rowcount == 1:
try:
row = dict(await self.cursor.fetchone())
except psycopg2.ProgrammingError:
pass
elif self.cursor.rowcount > 1:
try:
rows = [dict(row) for row in await self.cursor.fetchall()]
except psycopg2.ProgrammingError:
pass
return QueryResult(count, row, rows)
class ConnectionException(Exception):
"""Raised when the connection to Postgres can not be established"""
class ApplicationMixin:
"""
:class:`sprockets.http.app.Application` / :class:`tornado.web.Application`
mixin for handling the connection to Postgres and exporting functions for
querying the database, getting the status, and proving a cursor.
Automatically creates and shuts down :class:`aiopg.Pool` on startup
and shutdown by installing `on_start` and `shutdown` callbacks into the
:class:`~sprockets.http.app.Application` instance.
"""
POSTGRES_STATUS_TIMEOUT = 3
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._postgres_pool: typing.Optional[pool.Pool] = None
self.runner_callbacks['on_start'].append(self._postgres_setup)
self.runner_callbacks['shutdown'].append(self._postgres_shutdown)
@contextlib.asynccontextmanager
async def postgres_connector(self,
on_error: typing.Callable,
on_duration: typing.Optional[
typing.Callable] = None,
timeout: Timeout = None) \
-> typing.AsyncContextManager[PostgresConnector]:
"""Asynchronous :ref:`context-manager <python:typecontextmanager>`
that returns a :class:`~sprockets_postgres.PostgresConnector` instance
from the connection pool with a cursor.
.. note:: This function is designed to work in conjunction with the
:class:`~sprockets_postgres.RequestHandlerMixin` and is generally
not invoked directly.
:param on_error: A callback function that is invoked on exception. If
an exception is returned from that function, it will raise it.
:param on_duration: An optional callback function that is invoked after
a query has completed to record the duration that encompasses
both executing the query and retrieving the returned records, if
any.
:param timeout: Used to override the default query timeout.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when the request to retrieve a connection
from the pool times out.
:raises sprockets_postgres.ConnectionException: when the application
can not connect to the configured Postgres instance.
:raises psycopg2.Error: when Postgres raises an exception during the
creation of the cursor.
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
"""
try:
async with self._postgres_pool.acquire() as conn:
async with conn.cursor(
cursor_factory=extras.RealDictCursor,
timeout=timeout) as cursor:
yield PostgresConnector(
cursor, on_error, on_duration, timeout)
except (asyncio.TimeoutError, psycopg2.Error) as err:
exc = on_error('postgres_connector', ConnectionException(str(err)))
if exc:
raise exc
else: # postgres_status.on_error does not return an exception
yield None
async def postgres_status(self) -> dict:
"""Invoke from the ``/status`` RequestHandler to check that there is
a Postgres connection handler available and return info about the
pool.
The ``available`` item in the dictionary indicates that the
application was able to perform a ``SELECT 1`` against the database
using a :class:`~sprockets_postgres.PostgresConnector` instance.
The ``pool_size`` item indicates the current quantity of open
connections to Postgres.
The ``pool_free`` item indicates the current number of idle
connections available to process queries.
*Example return value*
.. code-block:: python
{
'available': True,
'pool_size': 10,
'pool_free': 8
}
"""
query_error = asyncio.Event()
def on_error(_metric_name, _exc) -> None:
query_error.set()
return None
async with self.postgres_connector(
on_error,
timeout=self.POSTGRES_STATUS_TIMEOUT) as connector:
if connector:
await connector.execute('SELECT 1')
return {
'available': not query_error.is_set(),
'pool_size': self._postgres_pool.size,
'pool_free': self._postgres_pool.freesize
}
async def _postgres_setup(self,
_app: web.Application,
loop: ioloop.IOLoop) -> None:
"""Setup the Postgres pool of connections and log if there is an error.
This is invoked by the Application on start callback mechanism.
"""
if 'POSTGRES_URL' not in os.environ:
LOGGER.critical('Missing POSTGRES_URL environment variable')
return self.stop(loop)
self._postgres_pool = pool.Pool(
os.environ['POSTGRES_URL'],
minsize=int(
os.environ.get(
'POSTGRES_MIN_POOL_SIZE',
DEFAULT_POSTGRES_MIN_POOL_SIZE)),
maxsize=int(
os.environ.get(
'POSTGRES_MAX_POOL_SIZE',
DEFAULT_POSTGRES_MAX_POOL_SIZE)),
timeout=int(
os.environ.get(
'POSTGRES_CONNECT_TIMEOUT',
DEFAULT_POSTGRES_CONNECTION_TIMEOUT)),
enable_hstore=util.strtobool(
os.environ.get(
'POSTGRES_HSTORE', DEFAULT_POSTGRES_HSTORE)),
enable_json=util.strtobool(
os.environ.get('POSTGRES_JSON', DEFAULT_POSTGRES_JSON)),
enable_uuid=util.strtobool(
os.environ.get('POSTGRES_UUID', DEFAULT_POSTGRES_UUID)),
echo=False,
on_connect=None,
pool_recycle=int(
os.environ.get(
'POSTGRES_CONNECTION_TTL',
DEFAULT_POSTGRES_CONNECTION_TTL)))
try:
async with self._postgres_pool._cond:
await self._postgres_pool._fill_free_pool(False)
except (psycopg2.OperationalError,
psycopg2.Error) as error: # pragma: nocover
LOGGER.warning('Error connecting to PostgreSQL on startup: %s',
error)
async def _postgres_shutdown(self, _ioloop: ioloop.IOLoop) -> None:
"""Shutdown the Postgres connections and wait for them to close.
This is invoked by the Application shutdown callback mechanism.
"""
if self._postgres_pool is not None:
self._postgres_pool.close()
await self._postgres_pool.wait_closed()
class RequestHandlerMixin:
"""
A RequestHandler mixin class exposing functions for querying the database,
recording the duration to either :mod:`sprockets-influxdb
<sprockets_influxdb>` or :mod:`sprockets.mixins.metrics`, and
handling exceptions.
"""
async def postgres_callproc(self,
name: str,
parameters: QueryParameters = None,
metric_name: str = '',
*,
timeout: Timeout = None) -> QueryResult:
"""Execute a stored procedure / function
:param name: The stored procedure / function name to call
:param parameters: Query parameters to pass when calling
:type parameters: :data:`~sprockets_postgres.QueryParameters`
:param metric_name: The metric name for duration recording and logging
:param timeout: Timeout value to override the default or the value
specified when creating the
:class:`~sprockets_postgres.PostgresConnector`.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when there is a query or network timeout
:raises psycopg2.Error: when there is an exception raised by Postgres
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
:rtype: :class:`~sprockets_postgres.QueryResult`
"""
async with self.application.postgres_connector(
self._on_postgres_error,
self._on_postgres_timing,
timeout) as connector:
return await connector.callproc(
name, parameters, metric_name, timeout=timeout)
async def postgres_execute(self,
sql: str,
parameters: QueryParameters = None,
metric_name: str = '',
*,
timeout: Timeout = None) -> QueryResult:
"""Execute a query, specifying a name for the query, the SQL statement,
and optional positional arguments to pass in with the query.
Parameters may be provided as sequence or mapping and will be
bound to variables in the operation. Variables are specified
either with positional ``%s`` or named ``%({name})s`` placeholders.
:param sql: The SQL statement to execute
:param parameters: Query parameters to pass as part of the execution
:type parameters: :data:`~sprockets_postgres.QueryParameters`
:param metric_name: The metric name for duration recording and logging
:param timeout: Timeout value to override the default or the value
specified when creating the
:class:`~sprockets_postgres.PostgresConnector`.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when there is a query or network timeout
:raises psycopg2.Error: when there is an exception raised by Postgres
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
:rtype: :class:`~sprockets_postgres.QueryResult`
"""
async with self.application.postgres_connector(
self._on_postgres_error,
self._on_postgres_timing,
timeout) as connector:
return await connector.execute(
sql, parameters, metric_name, timeout=timeout)
@contextlib.asynccontextmanager
async def postgres_transaction(self, timeout: Timeout = None) \
-> typing.AsyncContextManager[PostgresConnector]:
"""asynchronous :ref:`context-manager <python:typecontextmanager>`
function that implements full ``BEGIN``, ``COMMIT``, and ``ROLLBACK``
semantics. If there is a :exc:`psycopg2.Error` raised during the
transaction, the entire transaction will be rolled back.
If no exception is raised, the transaction will be committed when
exiting the context manager.
*Usage Example*
.. code-block:: python
class RequestHandler(sprockets_postgres.RequestHandlerMixin,
web.RequestHandler):
async def post(self):
async with self.postgres_transaction() as transaction:
result1 = await transaction.execute(QUERY_ONE)
result2 = await transaction.execute(QUERY_TWO)
result3 = await transaction.execute(QUERY_THREE)
:param timeout: Timeout value to override the default or the value
specified when creating the
:class:`~sprockets_postgres.PostgresConnector`.
:type timeout: :data:`~sprockets_postgres.Timeout`
:raises asyncio.TimeoutError: when there is a query or network timeout
when starting the transaction
:raises psycopg2.Error: when there is an exception raised by Postgres
when starting the transaction
.. note: :exc:`psycopg2.Error` is the base exception for all
:mod:`psycopg2` exceptions and the actual exception raised will
likely be more specific.
"""
async with self.application.postgres_connector(
self._on_postgres_error,
self._on_postgres_timing,
timeout) as connector:
async with connector.transaction():
yield connector
def _on_postgres_error(self,
metric_name: str,
exc: Exception) -> typing.Optional[Exception]:
"""Override for different error handling behaviors
Return an exception if you would like for it to be raised, or swallow
it here.
"""
LOGGER.error('%s in %s for %s (%s)',
exc.__class__.__name__, self.__class__.__name__,
metric_name, str(exc).split('\n')[0])
if isinstance(exc, ConnectionException):
raise web.HTTPError(503, reason='Database Connection Error')
elif isinstance(exc, asyncio.TimeoutError):
raise web.HTTPError(500, reason='Query Timeout')
elif isinstance(exc, errors.UniqueViolation):
raise web.HTTPError(409, reason='Unique Violation')
elif isinstance(exc, psycopg2.Error):
raise web.HTTPError(500, reason='Database Error')
return exc
def _on_postgres_timing(self,
metric_name: str,
duration: float) -> None:
"""Override for custom metric recording. As a default behavior it will
attempt to detect `sprockets-influxdb
<https://sprockets-influxdb.readthedocs.io/>`_ and
`sprockets.mixins.metrics
<https://sprocketsmixinsmetrics.readthedocs.io/en/latest/>`_ and
record the metrics using them if they are available. If they are not
available, it will record the query duration to the `DEBUG` log.
:param metric_name: The name of the metric to record
:param duration: The duration to record for the metric
"""
if hasattr(self, 'influxdb'): # sprockets-influxdb
self.influxdb.set_field(metric_name, duration)
elif hasattr(self, 'record_timing'): # sprockets.mixins.metrics
self.record_timing(metric_name, duration)
else:
LOGGER.debug('Postgres query %s duration: %s',
metric_name, duration)

431
tests.py Normal file
View file

@ -0,0 +1,431 @@
import asyncio
import json
import os
import typing
import unittest
import uuid
from unittest import mock
import psycopg2
from psycopg2 import errors
from sprockets.http import app, testing
from tornado import ioloop, web
import sprockets_postgres
class RequestHandler(sprockets_postgres.RequestHandlerMixin,
web.RequestHandler):
"""Base RequestHandler for test endpoints"""
def cast_data(self, data: typing.Union[dict, list, None]) \
-> typing.Union[dict, list, None]:
if data is None:
return None
elif isinstance(data, list):
return [self.cast_data(row) for row in data]
return {k: str(v) for k, v in data.items()}
class CallprocRequestHandler(RequestHandler):
async def get(self):
result = await self.postgres_callproc(
'uuid_generate_v4', metric_name='uuid')
await self.finish({'value': str(result.row['uuid_generate_v4'])})
class CountRequestHandler(RequestHandler):
GET_SQL = """\
SELECT last_updated_at, count
FROM public.query_count
WHERE key = 'test';"""
async def get(self):
result = await self.postgres_execute(self.GET_SQL)
await self.finish(self.cast_data(result.row))
class ErrorRequestHandler(RequestHandler):
GET_SQL = """\
SELECT last_updated_at, count
FROM public.query_count
WHERE key = 'test';"""
async def get(self):
await self.postgres_execute(self.GET_SQL)
self.set_status(204)
def _on_postgres_error(self,
metric_name: str,
exc: Exception) -> typing.Optional[Exception]:
return RuntimeError()
class ErrorPassthroughRequestHandler(RequestHandler):
async def get(self):
exc = self._on_postgres_error('test', RuntimeError())
if isinstance(exc, RuntimeError):
self.set_status(204)
else:
raise web.HTTPError(500, 'Did not pass through')
class ExecuteRequestHandler(RequestHandler):
GET_SQL = 'SELECT %s::TEXT AS value;'
async def get(self):
timeout = self.get_argument('timeout', None)
if timeout is not None:
timeout = int(timeout)
result = await self.postgres_execute(
self.GET_SQL, [self.get_argument('value')], timeout=timeout)
await self.finish({
'value': result.row['value'] if result.row else None})
class InfluxDBRequestHandler(ExecuteRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.influxdb = self.application.influxdb
self.influxdb.add_field = mock.Mock()
class MetricsMixinRequestHandler(ExecuteRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.record_timing = self.application.record_timing
class MultiRowRequestHandler(RequestHandler):
GET_SQL = 'SELECT * FROM public.test_rows;'
UPDATE_SQL = """\
UPDATE public.test_rows
SET toggle = %(to_value)s,
last_modified_at = CURRENT_TIMESTAMP
WHERE toggle IS %(from_value)s"""
async def get(self):
result = await self.postgres_execute(self.GET_SQL)
await self.finish({
'count': result.row_count,
'rows': self.cast_data(result.rows)})
async def post(self):
body = json.loads(self.request.body.decode('utf-8'))
result = await self.postgres_execute(
self.UPDATE_SQL, {
'to_value': body['value'], 'from_value': not body['value']})
await self.finish({
'count': result.row_count,
'rows': self.cast_data(result.rows)})
class NoErrorRequestHandler(ErrorRequestHandler):
def _on_postgres_error(self,
metric_name: str,
exc: Exception) -> typing.Optional[Exception]:
return None
class NoRowRequestHandler(RequestHandler):
GET_SQL = """\
SELECT * FROM information_schema.tables WHERE table_schema = 'foo';"""
async def get(self):
result = await self.postgres_execute(self.GET_SQL)
await self.finish({
'count': result.row_count,
'rows': self.cast_data(result.rows)})
class StatusRequestHandler(RequestHandler):
async def get(self):
status = await self.application.postgres_status()
if not status['available']:
self.set_status(503, 'Database Unavailable')
await self.finish(status)
class TransactionRequestHandler(RequestHandler):
GET_SQL = """\
SELECT id, created_at, last_modified_at, value
FROM public.test
WHERE id = %(id)s;"""
POST_SQL = """\
INSERT INTO public.test (id, created_at, value)
VALUES (%(id)s, CURRENT_TIMESTAMP, %(value)s)
RETURNING id, created_at, value;"""
UPDATE_COUNT_SQL = """\
UPDATE public.query_count
SET count = count + 1,
last_updated_at = CURRENT_TIMESTAMP
WHERE key = 'test'
RETURNING last_updated_at, count;"""
async def get(self, test_id):
result = await self.postgres_execute(self.GET_SQL, {'id': test_id})
if not result.row_count:
raise web.HTTPError(404, 'Not Found')
await self.finish(self.cast_data(result.row))
async def post(self):
body = json.loads(self.request.body.decode('utf-8'))
async with self.postgres_transaction() as postgres:
# This should roll back on the second call to this endopoint
self.application.first_txn = await postgres.execute(
self.POST_SQL, {'id': str(uuid.uuid4()),
'value': str(uuid.uuid4())})
# This should roll back on the second call to this endopoint
count = await postgres.execute(self.UPDATE_COUNT_SQL)
# This will trigger an error on the second call to this endpoint
user = await postgres.execute(self.POST_SQL, body)
await self.finish({
'count': self.cast_data(count.row),
'user': self.cast_data(user.row)})
class Application(sprockets_postgres.ApplicationMixin,
app.Application):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.influxdb = mock.Mock()
self.record_timing = mock.Mock()
self.first_txn: typing.Optional[sprockets_postgres.QueryResult] = None
class TestCase(testing.SprocketsHttpTestCase):
@classmethod
def setUpClass(cls):
with open('build/test-environment') as f:
for line in f:
if line.startswith('export '):
line = line[7:]
name, _, value = line.strip().partition('=')
os.environ[name] = value
def get_app(self):
self.app = Application(handlers=[
web.url('/callproc', CallprocRequestHandler),
web.url('/count', CountRequestHandler),
web.url('/error', ErrorRequestHandler),
web.url('/error-passthrough', ErrorPassthroughRequestHandler),
web.url('/execute', ExecuteRequestHandler),
web.url('/influxdb', InfluxDBRequestHandler),
web.url('/metrics-mixin', MetricsMixinRequestHandler),
web.url('/multi-row', MultiRowRequestHandler),
web.url('/no-error', NoErrorRequestHandler),
web.url('/no-row', NoRowRequestHandler),
web.url('/status', StatusRequestHandler),
web.url('/transaction', TransactionRequestHandler),
web.url('/transaction/(?P<test_id>.*)', TransactionRequestHandler)
])
return self.app
class RequestHandlerMixinTestCase(TestCase):
def test_postgres_status(self):
response = self.fetch('/status')
data = json.loads(response.body)
self.assertTrue(data['available'])
self.assertGreaterEqual(data['pool_size'], 1)
self.assertGreaterEqual(data['pool_free'], 1)
@mock.patch('aiopg.pool.Pool.acquire')
def test_postgres_status_connect_error(self, acquire):
acquire.side_effect = asyncio.TimeoutError()
response = self.fetch('/status')
self.assertEqual(response.code, 503)
self.assertFalse(json.loads(response.body)['available'])
@mock.patch('aiopg.cursor.Cursor.execute')
def test_postgres_status_error(self, execute):
execute.side_effect = asyncio.TimeoutError()
response = self.fetch('/status')
self.assertEqual(response.code, 503)
self.assertFalse(json.loads(response.body)['available'])
def test_postgres_callproc(self):
response = self.fetch('/callproc')
self.assertEqual(response.code, 200)
self.assertIsInstance(
uuid.UUID(json.loads(response.body)['value']), uuid.UUID)
@mock.patch('aiopg.cursor.Cursor.execute')
def test_postgres_error(self, execute):
execute.side_effect = asyncio.TimeoutError
response = self.fetch('/error')
self.assertEqual(response.code, 500)
self.assertIn(b'Internal Server Error', response.body)
@mock.patch('aiopg.pool.Pool.acquire')
def test_postgres_error_on_connect(self, acquire):
acquire.side_effect = asyncio.TimeoutError
response = self.fetch('/error')
self.assertEqual(response.code, 500)
self.assertIn(b'Internal Server Error', response.body)
def test_postgres_error_passthrough(self):
response = self.fetch('/error-passthrough')
self.assertEqual(response.code, 204)
def test_postgres_execute(self):
expectation = str(uuid.uuid4())
response = self.fetch('/execute?value={}'.format(expectation))
self.assertEqual(response.code, 200)
self.assertEqual(json.loads(response.body)['value'], expectation)
def test_postgres_execute_with_timeout(self):
expectation = str(uuid.uuid4())
response = self.fetch(
'/execute?value={}&timeout=5'.format(expectation))
self.assertEqual(response.code, 200)
self.assertEqual(json.loads(response.body)['value'], expectation)
def test_postgres_influxdb(self):
expectation = str(uuid.uuid4())
response = self.fetch(
'/influxdb?value={}'.format(expectation))
self.assertEqual(response.code, 200)
self.assertEqual(json.loads(response.body)['value'], expectation)
self.app.influxdb.set_field.assert_called_once()
def test_postgres_metrics_mixin(self):
expectation = str(uuid.uuid4())
response = self.fetch(
'/metrics-mixin?value={}'.format(expectation))
self.assertEqual(response.code, 200)
self.assertEqual(json.loads(response.body)['value'], expectation)
self.app.record_timing.assert_called_once()
def test_postgres_multirow_get(self):
response = self.fetch('/multi-row')
self.assertEqual(response.code, 200)
body = json.loads(response.body)
self.assertEqual(body['count'], 5)
self.assertIsInstance(body['rows'], list)
def test_postgres_multirow_no_data(self):
for value in [True, False]:
response = self.fetch(
'/multi-row', method='POST', body=json.dumps({'value': value}))
self.assertEqual(response.code, 200)
body = json.loads(response.body)
self.assertEqual(body['count'], 5)
self.assertIsNone(body['rows'])
def test_postgres_norow(self):
response = self.fetch('/no-row')
self.assertEqual(response.code, 200)
body = json.loads(response.body)
self.assertEqual(body['count'], 0)
self.assertIsNone(body['rows'])
@mock.patch('aiopg.cursor.Cursor.execute')
def test_postgres_execute_timeout_error(self, execute):
execute.side_effect = asyncio.TimeoutError()
response = self.fetch('/execute?value=1')
self.assertEqual(response.code, 500)
self.assertIn(b'Query Timeout', response.body)
@mock.patch('aiopg.cursor.Cursor.execute')
def test_postgres_execute_unique_violation(self, execute):
execute.side_effect = errors.UniqueViolation()
response = self.fetch('/execute?value=1')
self.assertEqual(response.code, 409)
self.assertIn(b'Unique Violation', response.body)
@mock.patch('aiopg.cursor.Cursor.execute')
def test_postgres_execute_error(self, execute):
execute.side_effect = psycopg2.Error()
response = self.fetch('/execute?value=1')
self.assertEqual(response.code, 500)
self.assertIn(b'Database Error', response.body)
@mock.patch('aiopg.cursor.Cursor.fetchone')
def test_postgres_programming_error(self, fetchone):
fetchone.side_effect = psycopg2.ProgrammingError()
response = self.fetch('/execute?value=1')
self.assertEqual(response.code, 200)
self.assertIsNone(json.loads(response.body)['value'])
@mock.patch('aiopg.connection.Connection.cursor')
def test_postgres_cursor_raises(self, cursor):
cursor.side_effect = asyncio.TimeoutError()
response = self.fetch('/execute?value=1')
self.assertEqual(response.code, 503)
class TransactionTestCase(TestCase):
def test_transactions(self):
test_body = {
'id': str(uuid.uuid4()),
'value': str(uuid.uuid4())
}
response = self.fetch(
'/transaction', method='POST', body=json.dumps(test_body))
self.assertEqual(response.code, 200)
record = json.loads(response.body.decode('utf-8'))
self.assertEqual(record['user']['id'], test_body['id'])
self.assertEqual(record['user']['value'], test_body['value'])
count = record['count']['count']
last_updated = record['count']['last_updated_at']
response = self.fetch(
'/transaction', method='POST', body=json.dumps(test_body))
self.assertEqual(response.code, 409)
response = self.fetch(
'/transaction/{}'.format(self.app.first_txn.row['id']))
self.assertEqual(response.code, 404)
response = self.fetch('/count')
self.assertEqual(response.code, 200)
record = json.loads(response.body.decode('utf-8'))
self.assertEqual(record['count'], count)
self.assertEqual(record['last_updated_at'], last_updated)
class MissingURLTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
with open('build/test-environment') as f:
for line in f:
if line.startswith('export '):
line = line[7:]
name, _, value = line.strip().partition('=')
if name != 'POSTGRES_URL':
os.environ[name] = value
if 'POSTGRES_URL' in os.environ:
del os.environ['POSTGRES_URL']
def test_that_stop_is_invoked(self):
io_loop = ioloop.IOLoop.current()
obj = Application()
obj.stop = mock.Mock(wraps=obj.stop)
obj.start(io_loop)
io_loop.start()
obj.stop.assert_called_once()