From 661a5986c7fa3d6f6c6510d43813f370e8a05f7e Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Wed, 14 Apr 2021 15:59:13 -0400 Subject: [PATCH] Attempt to fix the reconnection race condition 1. Add new property function in the ApplicationMixin to indicate if Postgres is connected 2. Add a new guard in the RequestHandlerMixin to check that Postgres is connected prior to executing a query, raising a 503 if it is not 3. Catch OperationalError in RequestHandlerMixin and return 503 for it 4. Timeout when waiting on the connection when attempting to reconnect 5. Log when we're creating a new pool 6. Add debug logging to trace when connections open 7. Add tests that ensure reconnect logic works as expected --- VERSION | 2 +- bootstrap | 2 +- sprockets_postgres.py | 64 ++++++++++++++++++++++++++++++++++--------- tests.py | 30 ++++++++++++++++++++ 4 files changed, 83 insertions(+), 15 deletions(-) diff --git a/VERSION b/VERSION index bd8bf88..27f9cd3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.7.0 +1.8.0 diff --git a/bootstrap b/bootstrap index 735a3fc..3d641a3 100755 --- a/bootstrap +++ b/bootstrap @@ -71,7 +71,7 @@ report_done cat > build/test-environment< bool: + """Returns `True` if Postgres is currently connected""" + return self._postgres_connected is not None \ + and self._postgres_connected.is_set() + async def postgres_status(self) -> dict: """Invoke from the ``/status`` RequestHandler to check that there is a Postgres connection handler available and return info about the @@ -408,8 +423,7 @@ class ApplicationMixin: } """ - if not self._postgres_connected or \ - not self._postgres_connected.is_set(): + if not self.postgres_is_connected: return { 'available': False, 'pool_size': 0, @@ -497,12 +511,12 @@ class ApplicationMixin: else: url = self._postgres_settings['url'] - if self._postgres_pool: - self._postgres_pool.close() - safe_url = self._obscure_url_password(url) LOGGER.debug('Connecting to %s', safe_url) + if self._postgres_pool and not 
self._postgres_pool.closed: + self._postgres_pool.close() + try: self._postgres_pool = await pool.Pool.from_pool_fill( url, @@ -513,7 +527,7 @@ class ApplicationMixin: enable_json=self._postgres_settings['enable_json'], enable_uuid=self._postgres_settings['enable_uuid'], echo=False, - on_connect=None, + on_connect=self._on_postgres_connect, pool_recycle=self._postgres_settings['connection_ttl']) except psycopg2.Error as error: # pragma: nocover LOGGER.warning( @@ -535,6 +549,9 @@ class ApplicationMixin: url = parse.urlunparse(parsed._replace(netloc=netloc)) return url + async def _on_postgres_connect(self, conn): + LOGGER.debug('New postgres connection %s', conn) + async def _postgres_on_start(self, _app: web.Application, loop: ioloop.IOLoop): @@ -640,6 +657,7 @@ class RequestHandlerMixin: :rtype: :class:`~sprockets_postgres.QueryResult` """ + self._postgres_connection_check() async with self.application.postgres_connector( self.on_postgres_error, self.on_postgres_timing, @@ -679,6 +697,7 @@ class RequestHandlerMixin: :rtype: :class:`~sprockets_postgres.QueryResult` """ + self._postgres_connection_check() async with self.application.postgres_connector( self.on_postgres_error, self.on_postgres_timing, @@ -726,6 +745,7 @@ class RequestHandlerMixin: likely be more specific. 
""" + self._postgres_connection_check() async with self.application.postgres_connector( self.on_postgres_error, self.on_postgres_timing, @@ -771,6 +791,11 @@ class RequestHandlerMixin: raise problemdetails.Problem( status_code=409, title='Unique Violation') raise web.HTTPError(409, reason='Unique Violation') + elif isinstance(exc, psycopg2.OperationalError): + if problemdetails: + raise problemdetails.Problem( + status_code=503, title='Database Error') + raise web.HTTPError(503, reason='Database Error') elif isinstance(exc, psycopg2.Error): if problemdetails: raise problemdetails.Problem( @@ -801,6 +826,19 @@ class RequestHandlerMixin: LOGGER.debug('Postgres query %s duration: %s', metric_name, duration) + def _postgres_connection_check(self): + """Ensures Postgres is connected, exiting the request in error if not + + :raises: problemdetails.Problem + :raises: web.HTTPError + + """ + if not self.application.postgres_is_connected: + if problemdetails: + raise problemdetails.Problem( + status_code=503, title='Database Connection Error') + raise web.HTTPError(503, reason='Database Connection Error') + class StatusRequestHandler(web.RequestHandler): """A RequestHandler that can be used to expose API health or status""" diff --git a/tests.py b/tests.py index ee2bfd4..3aff85b 100644 --- a/tests.py +++ b/tests.py @@ -361,8 +361,38 @@ class PostgresStatusTestCase(asynctest.TestCase): 'pool_free': 0}) +class ReconnectionTestCast(TestCase): + + @ttesting.gen_test + async def test_postgres_reconnect(self): + response = await self.http_client.fetch(self.get_url('/callproc')) + self.assertEqual(response.code, 200) + self.assertIsInstance( + uuid.UUID(json.loads(response.body)['value']), uuid.UUID) + + # Force close all open connections for tests + conn = await aiopg.connect(os.environ['POSTGRES_URL'].split('?')[0]) + cursor = await conn.cursor() + await cursor.execute( + 'SELECT pg_terminate_backend(pid)' + ' FROM pg_stat_activity' + " WHERE application_name = 
'sprockets_postgres'") + await cursor.fetchall() + await asyncio.sleep(1) + response = await self.http_client.fetch( + self.get_url('/callproc'), raise_error=False) + self.assertEqual(response.code, 200) + conn.close() + + class RequestHandlerMixinTestCase(TestCase): + def test_postgres_connected(self): + response = self.fetch('/status') + data = json.loads(response.body) + self.assertEqual(data['status'], 'ok') + self.assertTrue(self.app.postgres_is_connected) + def test_postgres_status(self): response = self.fetch('/status') data = json.loads(response.body)