From d17c668f835558fc0c28421f77866bdc792928cb Mon Sep 17 00:00:00 2001 From: "Gavin M. Roy" Date: Thu, 10 Mar 2016 17:10:31 -0500 Subject: [PATCH] Add async shutdown support an on_start callbacks --- sprockets/http/__init__.py | 9 ++++- sprockets/http/runner.py | 83 +++++++++++++++++++++++++++++++------- tests.py | 45 ++++++++++++++++++++- 3 files changed, 120 insertions(+), 17 deletions(-) diff --git a/sprockets/http/__init__.py b/sprockets/http/__init__.py index 968bd9b..63620fb 100644 --- a/sprockets/http/__init__.py +++ b/sprockets/http/__init__.py @@ -2,7 +2,7 @@ import logging import os -version_info = (1, 1, 2) +version_info = (1, 2, 0) __version__ = '.'.join(str(v) for v in version_info) @@ -69,8 +69,13 @@ def run(create_application, settings=None, log_config=None): sub-processes are forked (if necessary) and before the IOLoop is started. + The *on_start* key contains functions that are invoked when the IOLoop + is started. + The *shutdown* key contains functions that are invoked when a stop - signal is received *before* the IOLoop is stopped. + signal is received *before* the IOLoop is stopped. These functions + can return a :class:`~tornado.concurrent.Future` to allow for asynchronous + processing of events during the shutdown phase. """ from . import runner diff --git a/sprockets/http/runner.py b/sprockets/http/runner.py index 7110eac..afa1641 100644 --- a/sprockets/http/runner.py +++ b/sprockets/http/runner.py @@ -10,7 +10,7 @@ import logging import signal import sys -from tornado import httpserver, ioloop +from tornado import concurrent, httpserver, ioloop import sprockets.logging @@ -41,18 +41,34 @@ class Runner(object): """ - def __init__(self, application): + def __init__(self, application, before_run=None, on_start=None, + shutdown=None): + """Create a new instance of the runner. + + :param application: The application instance to run + :type application: tornado.web.Application + :param list before_run: Callbacks to invoke before starting + :param list on_start: Callbacks to invoke after starting the IOLoop + :param list shutdown: Callbacks to invoke on shutdown + + """ self.application = application self.logger = logging.getLogger('Runner') self.server = None self.shutdown_limit = 5 + self._pending_callbacks = 0 try: - self.application.runner_callbacks.setdefault('shutdown', []) - self.application.runner_callbacks.setdefault('before_run', []) + self.application.runner_callbacks.setdefault('before_run', + before_run or []) + self.application.runner_callbacks.setdefault('on_start', + on_start or []) + self.application.runner_callbacks.setdefault('shutdown', + shutdown or []) except AttributeError: setattr(self.application, 'runner_callbacks', { - 'shutdown': [], - 'before_run': [], + 'before_run': before_run or [], + 'on_start': on_start or [], + 'shutdown': shutdown or [] }) def start_server(self, port_number, number_of_procs=0): @@ -83,6 +99,10 @@ class Runner(object): self.server.bind(port_number) self.server.start(number_of_procs) + def stop_server(self): + """Stop the HTTP Server""" + self.server.stop() + def run(self, port_number, number_of_procs=0): """ Create the server and run the IOLoop. @@ -98,9 +118,13 @@ class Runner(object): raises an exception, then the application is terminated by calling :func:`sys.exit`. + If any ``on_start`` callbacks are registered, they will be added to + the Tornado IOLoop for execution after the IOLoop is started. + """ self.start_server(port_number, number_of_procs) iol = ioloop.IOLoop.instance() + for callback in self.application.runner_callbacks['before_run']: try: callback(self.application, iol) @@ -110,21 +134,29 @@ class Runner(object): self._shutdown() sys.exit(70) + # Add any on start callbacks + for callback in self.application.runner_callbacks['on_start']: + iol.spawn_callback(callback, self.application, iol) + + # Start the IOLoop and block iol.start() def _on_signal(self, signo, frame): self.logger.info('signal %s received, stopping', signo) ioloop.IOLoop.instance().add_callback_from_signal(self._shutdown) - def _shutdown(self): - for callback in self.application.runner_callbacks['shutdown']: - try: - callback(self.application) - except Exception: - self.logger.warning('shutdown callback %r raised an exception', - callback, exc_info=1) + def _on_shutdown_future_complete(self, response): + self._pending_callbacks -= 1 + if response.exception(): + self.logger.warning('shutdown callback raised an exception', + response.exception, exc_info=1) + else: + self.logger.debug('Future callback result: %r', response.result()) + if not self._pending_callbacks: + self._on_shutdown_ready() - self.server.stop() + def _on_shutdown_ready(self): + self.logger.debug('Stopping IOLoop') iol = ioloop.IOLoop.instance() deadline = iol.time() + self.shutdown_limit @@ -137,3 +169,26 @@ class Runner(object): self.logger.info('stopping within %s seconds', self.shutdown_limit) maybe_stop() + + def _shutdown(self): + self.logger.debug('Shutting down') + + # Ensure the HTTP server is stopped + self.stop_server() + + iol = ioloop.IOLoop.instance() + + # Iterate through the callbacks, dealing with futures when returned + for callback in self.application.runner_callbacks['shutdown']: + try: + response = callback(self.application) + if concurrent.is_future(response): + self._pending_callbacks += 1 + iol.add_future(response, self._on_shutdown_future_complete) + except Exception: + self.logger.warning('shutdown callback %r raised an exception', + callback, exc_info=1) + + # If no futures were return, invoke on shutdown ready + if not self._pending_callbacks: + self._on_shutdown_ready() diff --git a/tests.py b/tests.py index 00f000a..b9da0e9 100644 --- a/tests.py +++ b/tests.py @@ -5,7 +5,7 @@ import json import time import unittest -from tornado import httputil, testing, web +from tornado import concurrent, httputil, ioloop, testing, web import mock import sprockets.http.mixins @@ -396,6 +396,7 @@ class RunnerTests(MockHelper, unittest.TestCase): application = web.Application() _ = sprockets.http.runner.Runner(application) self.assertEqual(application.runner_callbacks['before_run'], []) + self.assertEqual(application.runner_callbacks['on_start'], []) self.assertEqual(application.runner_callbacks['shutdown'], []) def test_that_signal_handler_invokes_shutdown(self): @@ -450,3 +451,45 @@ class RunnerTests(MockHelper, unittest.TestCase): runner._shutdown() self.io_loop.stop.assert_called_once_with() self.assertNotEqual(self.io_loop._timeouts, []) + + +class AsyncRunTests(unittest.TestCase): + + def test_that_on_start_callbacks_are_invoked(self): + future = concurrent.Future() + + def on_started(*args, **kwargs): + with mock.patch('sprockets.http.runner.Runner.stop_server'): + runner._shutdown() + future.set_result(True) + + application = web.Application() + with mock.patch('sprockets.http.runner.Runner.start_server'): + runner = sprockets.http.runner.Runner(application, + on_start=[on_started]) + runner.run(8000) + self.assertTrue(future.result()) + + + def test_that_shutdown_futures_are_waited_on(self): + future = concurrent.Future() + + def on_started(*args, **kwargs): + with mock.patch('sprockets.http.runner.Runner.stop_server'): + runner._shutdown() + + def on_shutdown(*args, **kwargs): + def shutdown_complete(): + future.set_result(True) + + ioloop.IOLoop.current().add_timeout(1, shutdown_complete) + return future + + application = web.Application() + with mock.patch('sprockets.http.runner.Runner.start_server'): + runner = sprockets.http.runner.Runner(application, + on_start=[on_started], + shutdown=[on_shutdown]) + runner.run(8000) + + self.assertTrue(future.result())