Add async shutdown support an on_start callbacks

This commit is contained in:
Gavin M. Roy 2016-03-10 17:10:31 -05:00
parent de4c434d38
commit d17c668f83
3 changed files with 120 additions and 17 deletions

View file

@ -2,7 +2,7 @@ import logging
import os import os
version_info = (1, 1, 2) version_info = (1, 2, 0)
__version__ = '.'.join(str(v) for v in version_info) __version__ = '.'.join(str(v) for v in version_info)
@ -69,8 +69,13 @@ def run(create_application, settings=None, log_config=None):
sub-processes are forked (if necessary) and before the IOLoop is sub-processes are forked (if necessary) and before the IOLoop is
started. started.
The *on_start* key contains functions that are invoked when the IOLoop
is started.
The *shutdown* key contains functions that are invoked when a stop The *shutdown* key contains functions that are invoked when a stop
signal is received *before* the IOLoop is stopped. signal is received *before* the IOLoop is stopped. These functions
can return a :class:`~tornado.concurrent.Future` to allow for asynchronous
processing of events during the shutdown phase.
""" """
from . import runner from . import runner

View file

@ -10,7 +10,7 @@ import logging
import signal import signal
import sys import sys
from tornado import httpserver, ioloop from tornado import concurrent, httpserver, ioloop
import sprockets.logging import sprockets.logging
@ -41,18 +41,34 @@ class Runner(object):
""" """
def __init__(self, application): def __init__(self, application, before_run=None, on_start=None,
shutdown=None):
"""Create a new instance of the runner.
:param application: The application instance to run
:type application: tornado.web.Application
:param list before_run: Callbacks to invoke before starting
:param list on_start: Callbacks to invoke after starting the IOLoop
:param list shutdown: Callbacks to invoke on shutdown
"""
self.application = application self.application = application
self.logger = logging.getLogger('Runner') self.logger = logging.getLogger('Runner')
self.server = None self.server = None
self.shutdown_limit = 5 self.shutdown_limit = 5
self._pending_callbacks = 0
try: try:
self.application.runner_callbacks.setdefault('shutdown', []) self.application.runner_callbacks.setdefault('before_run',
self.application.runner_callbacks.setdefault('before_run', []) before_run or [])
self.application.runner_callbacks.setdefault('on_start',
on_start or [])
self.application.runner_callbacks.setdefault('shutdown',
shutdown or [])
except AttributeError: except AttributeError:
setattr(self.application, 'runner_callbacks', { setattr(self.application, 'runner_callbacks', {
'shutdown': [], 'before_run': before_run or [],
'before_run': [], 'on_start': on_start or [],
'shutdown': shutdown or []
}) })
def start_server(self, port_number, number_of_procs=0): def start_server(self, port_number, number_of_procs=0):
@ -83,6 +99,10 @@ class Runner(object):
self.server.bind(port_number) self.server.bind(port_number)
self.server.start(number_of_procs) self.server.start(number_of_procs)
def stop_server(self):
"""Stop the HTTP Server"""
self.server.stop()
def run(self, port_number, number_of_procs=0): def run(self, port_number, number_of_procs=0):
""" """
Create the server and run the IOLoop. Create the server and run the IOLoop.
@ -98,9 +118,13 @@ class Runner(object):
raises an exception, then the application is terminated by calling raises an exception, then the application is terminated by calling
:func:`sys.exit`. :func:`sys.exit`.
If any ``on_start`` callbacks are registered, they will be added to
the Tornado IOLoop for execution after the IOLoop is started.
""" """
self.start_server(port_number, number_of_procs) self.start_server(port_number, number_of_procs)
iol = ioloop.IOLoop.instance() iol = ioloop.IOLoop.instance()
for callback in self.application.runner_callbacks['before_run']: for callback in self.application.runner_callbacks['before_run']:
try: try:
callback(self.application, iol) callback(self.application, iol)
@ -110,21 +134,29 @@ class Runner(object):
self._shutdown() self._shutdown()
sys.exit(70) sys.exit(70)
# Add any on start callbacks
for callback in self.application.runner_callbacks['on_start']:
iol.spawn_callback(callback, self.application, iol)
# Start the IOLoop and block
iol.start() iol.start()
def _on_signal(self, signo, frame): def _on_signal(self, signo, frame):
self.logger.info('signal %s received, stopping', signo) self.logger.info('signal %s received, stopping', signo)
ioloop.IOLoop.instance().add_callback_from_signal(self._shutdown) ioloop.IOLoop.instance().add_callback_from_signal(self._shutdown)
def _shutdown(self): def _on_shutdown_future_complete(self, response):
for callback in self.application.runner_callbacks['shutdown']: self._pending_callbacks -= 1
try: if response.exception():
callback(self.application) self.logger.warning('shutdown callback raised an exception',
except Exception: response.exception, exc_info=1)
self.logger.warning('shutdown callback %r raised an exception', else:
callback, exc_info=1) self.logger.debug('Future callback result: %r', response.result())
if not self._pending_callbacks:
self._on_shutdown_ready()
self.server.stop() def _on_shutdown_ready(self):
self.logger.debug('Stopping IOLoop')
iol = ioloop.IOLoop.instance() iol = ioloop.IOLoop.instance()
deadline = iol.time() + self.shutdown_limit deadline = iol.time() + self.shutdown_limit
@ -137,3 +169,26 @@ class Runner(object):
self.logger.info('stopping within %s seconds', self.shutdown_limit) self.logger.info('stopping within %s seconds', self.shutdown_limit)
maybe_stop() maybe_stop()
def _shutdown(self):
self.logger.debug('Shutting down')
# Ensure the HTTP server is stopped
self.stop_server()
iol = ioloop.IOLoop.instance()
# Iterate through the callbacks, dealing with futures when returned
for callback in self.application.runner_callbacks['shutdown']:
try:
response = callback(self.application)
if concurrent.is_future(response):
self._pending_callbacks += 1
iol.add_future(response, self._on_shutdown_future_complete)
except Exception:
self.logger.warning('shutdown callback %r raised an exception',
callback, exc_info=1)
# If no futures were return, invoke on shutdown ready
if not self._pending_callbacks:
self._on_shutdown_ready()

View file

@ -5,7 +5,7 @@ import json
import time import time
import unittest import unittest
from tornado import httputil, testing, web from tornado import concurrent, httputil, ioloop, testing, web
import mock import mock
import sprockets.http.mixins import sprockets.http.mixins
@ -396,6 +396,7 @@ class RunnerTests(MockHelper, unittest.TestCase):
application = web.Application() application = web.Application()
_ = sprockets.http.runner.Runner(application) _ = sprockets.http.runner.Runner(application)
self.assertEqual(application.runner_callbacks['before_run'], []) self.assertEqual(application.runner_callbacks['before_run'], [])
self.assertEqual(application.runner_callbacks['on_start'], [])
self.assertEqual(application.runner_callbacks['shutdown'], []) self.assertEqual(application.runner_callbacks['shutdown'], [])
def test_that_signal_handler_invokes_shutdown(self): def test_that_signal_handler_invokes_shutdown(self):
@ -450,3 +451,45 @@ class RunnerTests(MockHelper, unittest.TestCase):
runner._shutdown() runner._shutdown()
self.io_loop.stop.assert_called_once_with() self.io_loop.stop.assert_called_once_with()
self.assertNotEqual(self.io_loop._timeouts, []) self.assertNotEqual(self.io_loop._timeouts, [])
class AsyncRunTests(unittest.TestCase):
def test_that_on_start_callbacks_are_invoked(self):
future = concurrent.Future()
def on_started(*args, **kwargs):
with mock.patch('sprockets.http.runner.Runner.stop_server'):
runner._shutdown()
future.set_result(True)
application = web.Application()
with mock.patch('sprockets.http.runner.Runner.start_server'):
runner = sprockets.http.runner.Runner(application,
on_start=[on_started])
runner.run(8000)
self.assertTrue(future.result())
def test_that_shutdown_futures_are_waited_on(self):
future = concurrent.Future()
def on_started(*args, **kwargs):
with mock.patch('sprockets.http.runner.Runner.stop_server'):
runner._shutdown()
def on_shutdown(*args, **kwargs):
def shutdown_complete():
future.set_result(True)
ioloop.IOLoop.current().add_timeout(1, shutdown_complete)
return future
application = web.Application()
with mock.patch('sprockets.http.runner.Runner.start_server'):
runner = sprockets.http.runner.Runner(application,
on_start=[on_started],
shutdown=[on_shutdown])
runner.run(8000)
self.assertTrue(future.result())