Add compatibility with sprockets.http.

This commit is contained in:
Dave Shawley 2021-04-24 09:15:50 -04:00
parent b823d3a521
commit a48453b70e
No known key found for this signature in database
GPG key ID: F41A8A99298F8EED
5 changed files with 147 additions and 23 deletions

View file

@ -1,6 +1,8 @@
:tag:`Next release <0.0.1...main>`
----------------------------------
- Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration
- Tornado application mixin automatically installs start/stop hooks if the application
quacks like a ``sprockets.http.app.Application``.
:tag:`0.0.1 <832f8af7...0.0.1>` (08-Apr-2021)
---------------------------------------------

View file

@ -115,6 +115,42 @@ Metrics are sent by a ``asyncio.Task`` that is started by ``start_statsd``. The
metric data onto a ``asyncio.Queue`` that the task reads from. Metric data remains on the queue when the task is
not connected to the server and will be sent in the order received when the task establishes the server connection.
Integration with sprockets.http
===============================
If you use `sprockets.http`_ in your application stack, then the Tornado integration will detect it and install the
initialization and shutdown hooks for you. The application will *just work* provided that the `$STATSD_HOST`
and `$STATSD_PREFIX` environment variables are set appropriately. The following snippet will produce the same result
as the Tornado example even without setting the prefix:
.. code-block:: python
class Application(sprockets_statsd.tornado.Application,
sprockets.http.app.Application):
def __init__(self, **settings):
statsd = settings.setdefault('statsd', {})
statsd.setdefault('host', os.environ['STATSD_HOST'])
statsd.setdefault('protocol', 'tcp')
settings.update({
'service': 'my-service',
'environment': os.environ.get('ENVIRONMENT', 'development'),
'statsd': statsd,
'version': getattr(__package__, 'version'),
})
super().__init__([web.url('/', MyHandler)], **settings)
if __name__ == '__main__':
sprockets.http.run(Application, log_config=...)
Definint the ``service`` and ``environment`` in `settings` as above will result in the prefix being set to::
applications.{self.settings["service"]}.{self.settings["environment"]}
The recommended usage is to:
#. define ``service``, ``environment``, and ``version`` in the settings
#. explicitly set the ``host`` and ``protocol`` settings in ``self.settings["statsd"]``
.. _sprockets.http: https://sprocketshttp.readthedocs.io/en/master/
.. _statsd: https://github.com/statsd/statsd/
.. _tornado: https://tornadoweb.org/

View file

@ -41,6 +41,7 @@ dev =
flake8-import-order==0.18.1
sphinx==3.5.2
sphinx-autodoc-typehints==1.11.1
sprockets.http==2.2.0
tornado>=5
yapf==0.30.0
readthedocs =

View file

@ -1,10 +1,11 @@
import contextlib
import logging
import os
import socket
import time
import typing
from tornado import web
from tornado import ioloop, web
from sprockets_statsd import statsd
@ -114,7 +115,13 @@ class Application(web.Application):
self.settings['statsd']['port'] = int(self.settings['statsd']['port'])
self.statsd_connector = None
async def start_statsd(self) -> None:
try:
self.on_start_callbacks.append(self.start_statsd)
self.on_shutdown_callbacks.append(self.stop_statsd)
except AttributeError:
pass
async def start_statsd(self, *_) -> None:
"""Start the connector during startup.
Call this method during application startup to enable the statsd
@ -124,7 +131,9 @@ class Application(web.Application):
"""
if self.statsd_connector is None:
logger = self.__get_logger()
if not self.settings['statsd']['enabled']:
logger.info('statsd connector is disabled by configuration')
self.statsd_connector = statsd.AbstractConnector()
else:
kwargs = self.settings['statsd'].copy()
@ -135,14 +144,20 @@ class Application(web.Application):
elif protocol == 'udp':
kwargs['ip_protocol'] = socket.IPPROTO_UDP
else:
raise RuntimeError(
return self.__handle_fatal_error(
f'statsd configuration error: {protocol} is not '
f'a valid protocol')
self.statsd_connector = statsd.Connector(**kwargs)
logger.info('creating %s statsd connector', protocol.upper())
try:
self.statsd_connector = statsd.Connector(**kwargs)
except RuntimeError as error:
return self.__handle_fatal_error(
'statsd.Connector failed to start', error)
await self.statsd_connector.start()
async def stop_statsd(self) -> None:
async def stop_statsd(self, *_) -> None:
"""Stop the connector during shutdown.
If the connector was started, then this method will gracefully
@ -154,6 +169,26 @@ class Application(web.Application):
await self.statsd_connector.stop()
self.statsd_connector = None
def __handle_fatal_error(self,
message: str,
exc: typing.Optional[Exception] = None):
logger = self.__get_logger()
if exc is not None:
logger.exception('%s', message)
else:
logger.error('%s', message)
if hasattr(self, 'stop'):
self.stop(ioloop.IOLoop.current())
else:
raise RuntimeError(message)
def __get_logger(self) -> logging.Logger:
try:
return getattr(self, 'logger')
except AttributeError:
return logging.getLogger(__package__).getChild(
'tornado.Application')
class RequestHandler(web.RequestHandler):
"""Mix this into your handler to send metrics to a statsd server."""

View file

@ -3,7 +3,10 @@ import os
import socket
import time
import typing
import unittest.mock
import sprockets.http.app
import sprockets.http.testing
from tornado import testing, web
import sprockets_statsd.statsd
@ -255,30 +258,30 @@ class ApplicationTests(AsyncTestCaseWithTimeout):
sprockets_statsd.statsd.AbstractConnector)
class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase):
def setUp(self):
super().setUp()
self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP)
self.io_loop.spawn_callback(self.statsd_server.run)
self.run_coroutine(self.statsd_server.wait_running())
class StatsdTestCase(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase):
Application = web.Application
self.app.settings['statsd'].update({
'host': self.statsd_server.host,
'port': self.statsd_server.port,
})
self.run_coroutine(self.app.start_statsd())
def setUp(self):
self.statsd_server = None
super().setUp()
def tearDown(self):
self.run_coroutine(self.app.stop_statsd())
self.statsd_server.close()
self.run_coroutine(self.statsd_server.wait_closed())
if self.statsd_server is not None:
self.statsd_server.close()
self.run_coroutine(self.statsd_server.wait_closed())
super().tearDown()
def get_app(self):
self.app = Application(statsd={
'prefix': 'applications.service',
'protocol': 'tcp',
})
self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP)
self.io_loop.spawn_callback(self.statsd_server.run)
self.run_coroutine(self.statsd_server.wait_running())
self.app = self.Application(shutdown_limit=0.5,
statsd={
'host': self.statsd_server.host,
'port': self.statsd_server.port,
'prefix': 'applications.service',
'protocol': 'tcp',
})
return self.app
def wait_for_metrics(self, metric_count=3):
@ -310,6 +313,19 @@ class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase):
return self.parse_metric(line)
self.fail(f'failed to find metric containing {needle!r}')
class RequestHandlerTests(StatsdTestCase, AsyncTestCaseWithTimeout,
testing.AsyncHTTPTestCase):
Application = Application
def setUp(self):
super().setUp()
self.run_coroutine(self.app.start_statsd())
def tearDown(self):
self.run_coroutine(self.app.stop_statsd())
super().tearDown()
def test_the_request_metric_is_sent_last(self):
rsp = self.fetch('/')
self.assertEqual(200, rsp.code)
@ -343,3 +359,37 @@ class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase):
rsp = self.fetch('/')
self.assertEqual(200, rsp.code)
class SprocketsHttpInteropTests(StatsdTestCase, AsyncTestCaseWithTimeout,
sprockets.http.testing.SprocketsHttpTestCase):
class Application(sprockets_statsd.tornado.Application,
sprockets.http.app.Application):
def __init__(self, **settings):
super().__init__([web.url('/', Handler)], **settings)
def setUp(self):
super().setUp()
self.let_callbacks_run()
def let_callbacks_run(self):
self.io_loop.run_sync(lambda: asyncio.sleep(0))
def test_that_callbacks_are_installed(self):
self.assertIn(self.app.start_statsd, self.app.on_start_callbacks)
self.assertIn(self.app.stop_statsd, self.app.on_shutdown_callbacks)
def test_that_statsd_connector_is_enabled(self):
# verifies that the start callback actually runs correctly.
self.assertIsNotNone(self.app.statsd_connector,
'statsd_connecter was never created')
def test_that_misconfiguration_stops_application(self):
another_app = self.Application(statsd={
'host': '',
'prefix': 'whatever',
})
another_app.stop = unittest.mock.Mock(wraps=another_app.stop)
another_app.start(self.io_loop)
self.let_callbacks_run()
another_app.stop.assert_called_once_with(self.io_loop)