import asyncio import logging import sys import warnings from tornado import concurrent, web class _ShutdownHandler: """Keeps track of the application state during shutdown.""" def __init__(self, io_loop, shutdown_limit, wait_timeout): self.io_loop = io_loop self.logger = logging.getLogger(self.__class__.__name__) self.pending_callbacks = 0 self.shutdown_limit = shutdown_limit self.wait_timeout = wait_timeout self.__deadline = None def add_future(self, future): self.pending_callbacks += 1 self.io_loop.add_future(future, self.on_shutdown_future_complete) def on_shutdown_future_complete(self, future): self.pending_callbacks -= 1 if future.exception(): if any(sys.exc_info()): self.logger.exception('shutdown callback raised exception') else: self.logger.warning('shutdown callback raised exception: %r', exc_info=(None, future.exception(), None)) else: self.logger.debug('shutdown future completed: %r, %d pending', future.result(), self.pending_callbacks) if not self.pending_callbacks: self.on_shutdown_ready() def on_shutdown_ready(self): self.logger.info('starting IOLoop shutdown process') self.__deadline = self.io_loop.time() + self.shutdown_limit self._maybe_stop() def _maybe_stop(self): all_tasks = self._all_tasks() now = self.io_loop.time() if now < self.__deadline and all_tasks: self.io_loop.add_timeout(now + self.wait_timeout, self._maybe_stop) else: self.io_loop.stop() self.logger.info('stopped IOLoop') def _all_tasks(self): if hasattr(asyncio, 'all_tasks'): return asyncio.all_tasks(self.io_loop.asyncio_loop) return asyncio.Task.all_tasks(self.io_loop.asyncio_loop) class CallbackManager: """ Application state management. This is where the core of the application wrapper actually lives. It is responsible for managing and calling the various application callbacks. Sub-classes are responsible for gluing in the actual :class:`tornado.web.Application` object and the :mod:`sprockets.http.runner` module is responsible for starting up the HTTP stack and calling the :meth:`.start` and :meth:`.stop` methods. .. attribute:: runner_callbacks :class:`dict` of lists of callback functions to call at certain points in the application lifecycle. See :attr:`.before_run_callbacks`, :attr:`.on_start_callbacks`, and :attr:`on_shutdown_callbacks`. .. deprecated:: 1.4 Use the property callbacks instead of this dictionary. It will be going away in a future release. """ def __init__(self, tornado_application, *args, **kwargs): self.runner_callbacks = kwargs.pop('runner_callbacks', {}) super().__init__(*args, **kwargs) self._tornado_application = tornado_application self.logger = logging.getLogger(self.__class__.__name__) self.runner_callbacks.setdefault('before_run', []) self.runner_callbacks.setdefault('on_start', []) self.runner_callbacks.setdefault('shutdown', []) def start(self, io_loop): """ Run the ``before_run`` callbacks and queue to ``on_start`` callbacks. :param tornado.ioloop.IOLoop io_loop: loop to start the app on. """ for callback in self.before_run_callbacks: try: callback(self.tornado_application, io_loop) except Exception: self.logger.error('before_run callback %r cancelled start', callback, exc_info=1) self.stop(io_loop) raise for callback in self.on_start_callbacks: io_loop.spawn_callback(callback, self.tornado_application, io_loop) def stop(self, io_loop, shutdown_limit=5.0, wait_timeout=1.0): """ Asynchronously stop the application. :param tornado.ioloop.IOLoop io_loop: loop to run until all callbacks, timeouts, and queued calls are complete :param float shutdown_limit: maximum number of seconds to wait before terminating :param float wait_timeout: number of seconds to wait between checks for pending callbacks & timers Call this method to start the application shutdown process. The IOLoop will be stopped once the application is completely shut down or after `shutdown_limit` seconds. """ running_async = False shutdown = _ShutdownHandler(io_loop, shutdown_limit, wait_timeout) for callback in self.on_shutdown_callbacks: try: maybe_future = callback(self.tornado_application) if asyncio.iscoroutine(maybe_future): maybe_future = io_loop.asyncio_loop.create_task( maybe_future) if concurrent.is_future(maybe_future): shutdown.add_future(maybe_future) running_async = True except Exception as error: self.logger.warning('exception raised from shutdown ' 'callback %r, ignored: %s', callback, error, exc_info=1) if not running_async: shutdown.on_shutdown_ready() @property def before_run_callbacks(self): """ List of synchronous functions called before the IOLoop is started. The *before_run* callbacks are called after the IOLoop is created and before it is started. The callbacks are run synchronously and the application will exit if a callback raises an exception. **Signature**: callback(application, io_loop) """ return self.runner_callbacks['before_run'] @property def on_start_callbacks(self): """ List of asynchronous functions spawned before the IOLoop is started. The *on_start* callbacks are spawned after the IOLoop is created and before it is started. The callbacks are run asynchronously via :meth:`tornado.ioloop.IOLoop.spawn_callback` as soon as the IOLoop is started. **Signature**: callback(application, io_loop) """ return self.runner_callbacks['on_start'] @property def on_shutdown_callbacks(self): """ List of functions when the application is shutting down. The *on_shutdown* callbacks are called after the HTTP server has been stopped. If a callback returns a :class:`tornado.concurrent.Future` instance, then the future is added to the IOLoop. **Signature**: callback(application) """ return self.runner_callbacks['shutdown'] @property def tornado_application(self): """The underlying :class:`tornado.web.Application` instance.""" return self._tornado_application class Application(CallbackManager, web.Application): """ Callback-aware version of :class:`tornado.web.Application`. Using this class instead of the vanilla Tornado ``Application`` class provides a clean way to customize application-level constructs such as connection pools. Note that much of the functionality is implemented in :class:`.CallbackManager`. """ def __init__(self, *args, **kwargs): super().__init__(self, *args, **kwargs) class _ApplicationAdapter(CallbackManager): """ Simple adapter for a :class:`tornado.web.Application` instance. This class adapts/wraps a :class:`~tornado.web.Application` instance and adds callback management in a backwards compatible manner. .. warning:: Do not use this class directly. Either switch to using :class:`.Application` explicitly or call :func:`.wrap_application` to wrap your current ``Application`` instance. """ def __init__(self, application): self._application = application self.settings = self._application.settings super().__init__( self._application, runner_callbacks=getattr(application, 'runner_callbacks', {})) setattr(self._application, 'runner_callbacks', self.runner_callbacks) def wrap_application(application, before_run, on_start, shutdown): """ Wrap a tornado application in a callback-aware wrapper. :param tornado.web.Application application: application to wrap. :param list|NoneType before_run: optional list of callbacks to invoke before the IOLoop is started. :param list|NoneType on_start: optional list of callbacks to register with :meth:`~tornado.IOLoop.spawn_callback`. :param list|NoneType shutdown: optional list of callbacks to invoke before stopping the IOLoop :return: a wrapped application object :rtype: sprockets.http.app.Application """ before_run = [] if before_run is None else before_run on_start = [] if on_start is None else on_start shutdown = [] if shutdown is None else shutdown if not isinstance(application, Application): warnings.warn( 'sprockets.http.run is only going to accept ' 'sprockets.app.Application instances in 3.0, ' 'was called with {}'.format(type(application).__name__), category=DeprecationWarning) application = _ApplicationAdapter(application) application.before_run_callbacks.extend(before_run) application.on_start_callbacks.extend(on_start) application.on_shutdown_callbacks.extend(shutdown) return application