Merge pull request #7 from dave-shawley/sprockets-http

Detect sprockets.http and make use of it.
This commit is contained in:
Gavin M. Roy 2021-05-10 10:24:33 -04:00 committed by GitHub
commit 0fbff9e19c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 234 additions and 28 deletions

View file

@ -1,6 +1,9 @@
:tag:`Next release <0.0.1...main>` :tag:`Next release <0.0.1...main>`
---------------------------------- ----------------------------------
- Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration - Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration
- Tornado application mixin automatically installs start/stop hooks if the application
quacks like a ``sprockets.http.app.Application``.
- Limit logging when disconnected from statsd
:tag:`0.0.1 <832f8af7...0.0.1>` (08-Apr-2021) :tag:`0.0.1 <832f8af7...0.0.1>` (08-Apr-2021)
--------------------------------------------- ---------------------------------------------

View file

@ -115,6 +115,42 @@ Metrics are sent by a ``asyncio.Task`` that is started by ``start_statsd``. The
metric data onto a ``asyncio.Queue`` that the task reads from. Metric data remains on the queue when the task is metric data onto a ``asyncio.Queue`` that the task reads from. Metric data remains on the queue when the task is
not connected to the server and will be sent in the order received when the task establishes the server connection. not connected to the server and will be sent in the order received when the task establishes the server connection.
Integration with sprockets.http
===============================
If you use `sprockets.http`_ in your application stack, then the Tornado integration will detect it and install the
initialization and shutdown hooks for you. The application will *just work* provided that the `$STATSD_HOST`
and `$STATSD_PREFIX` environment variables are set appropriately. The following snippet will produce the same result
as the Tornado example even without setting the prefix:
.. code-block:: python
class Application(sprockets_statsd.tornado.Application,
sprockets.http.app.Application):
def __init__(self, **settings):
statsd = settings.setdefault('statsd', {})
statsd.setdefault('host', os.environ['STATSD_HOST'])
statsd.setdefault('protocol', 'tcp')
settings.update({
'service': 'my-service',
'environment': os.environ.get('ENVIRONMENT', 'development'),
'statsd': statsd,
'version': getattr(__package__, 'version'),
})
super().__init__([web.url('/', MyHandler)], **settings)
if __name__ == '__main__':
sprockets.http.run(Application, log_config=...)
Definint the ``service`` and ``environment`` in `settings` as above will result in the prefix being set to::
applications.{self.settings["service"]}.{self.settings["environment"]}
The recommended usage is to:
#. define ``service``, ``environment``, and ``version`` in the settings
#. explicitly set the ``host`` and ``protocol`` settings in ``self.settings["statsd"]``
.. _sprockets.http: https://sprocketshttp.readthedocs.io/en/master/
.. _statsd: https://github.com/statsd/statsd/ .. _statsd: https://github.com/statsd/statsd/
.. _tornado: https://tornadoweb.org/ .. _tornado: https://tornadoweb.org/

View file

@ -41,6 +41,7 @@ dev =
flake8-import-order==0.18.1 flake8-import-order==0.18.1
sphinx==3.5.2 sphinx==3.5.2
sphinx-autodoc-typehints==1.11.1 sphinx-autodoc-typehints==1.11.1
sprockets.http==2.2.0
tornado>=5 tornado>=5
yapf==0.30.0 yapf==0.30.0
readthedocs = readthedocs =

View file

@ -4,6 +4,47 @@ import socket
import typing import typing
class ThrottleGuard:
"""Prevent code from executing repeatedly.
:param threshold: guarding threshold
This abstraction allows code to execute the first "threshold"
times and then only once per "threshold" times afterwards. Use
it to ensure that log statements are continuously written during
persistent error conditions. The goal is to provide regular
feedback while limiting the amount of log spam.
The following snippet will log the first 100 failures and then
once every 100 failures thereafter:
.. code-block:: python
executions = 0
guard = ThrottleGuard(100)
for _ in range(1000):
if guard.allow_execution():
executions += 1
logging.info('called %s times instead of %s times',
executions, guard.counter)
"""
def __init__(self, threshold: int):
self.counter = 0
self.threshold = threshold
def allow_execution(self) -> bool:
"""Should this execution be allowed?"""
self.counter += 1
allow = (self.counter < self.threshold
or (self.counter % self.threshold) == 0)
return allow
def reset(self) -> None:
"""Reset counter after error has resolved."""
self.counter = 0
class AbstractConnector: class AbstractConnector:
"""StatsD connector that does not send metrics or connect. """StatsD connector that does not send metrics or connect.
@ -137,6 +178,7 @@ class Connector(AbstractConnector):
self.logger = logging.getLogger(__package__).getChild('Connector') self.logger = logging.getLogger(__package__).getChild('Connector')
self.prefix = f'{prefix}.' if prefix else prefix self.prefix = f'{prefix}.' if prefix else prefix
self.processor = Processor(host=host, port=port, **kwargs) self.processor = Processor(host=host, port=port, **kwargs)
self._enqueue_log_guard = ThrottleGuard(100)
self._processor_task: typing.Optional[asyncio.Task[None]] = None self._processor_task: typing.Optional[asyncio.Task[None]] = None
async def start(self) -> None: async def start(self) -> None:
@ -174,7 +216,9 @@ class Connector(AbstractConnector):
payload = f'{self.prefix}{path}:{value}|{type_code}' payload = f'{self.prefix}{path}:{value}|{type_code}'
try: try:
self.processor.enqueue(payload.encode('utf-8')) self.processor.enqueue(payload.encode('utf-8'))
self._enqueue_log_guard.reset()
except asyncio.QueueFull: except asyncio.QueueFull:
if self._enqueue_log_guard.allow_execution():
self.logger.warning('statsd queue is full, discarding metric') self.logger.warning('statsd queue is full, discarding metric')
@ -389,6 +433,7 @@ class Processor:
self.host = host self.host = host
self.port = port self.port = port
self._ip_protocol = ip_protocol self._ip_protocol = ip_protocol
self._connect_log_guard = ThrottleGuard(100)
self._reconnect_sleep = reconnect_sleep self._reconnect_sleep = reconnect_sleep
self._wait_timeout = wait_timeout self._wait_timeout = wait_timeout
@ -479,14 +524,21 @@ class Processor:
buffered_data = b'' buffered_data = b''
if self.protocol is not None: if self.protocol is not None:
buffered_data = self.protocol.buffered_data buffered_data = self.protocol.buffered_data
t, p = await self._create_transport() # type: ignore[misc] t, p = await self._create_transport() # type: ignore[misc]
transport, self.protocol = t, p transport, self.protocol = t, p
self.protocol.buffered_data = buffered_data self.protocol.buffered_data = buffered_data
self.logger.info('connection established to %s', self.logger.info(
transport.get_extra_info('peername')) 'connection established to %s after %s attempts',
transport.get_extra_info('peername'),
self._connect_log_guard.counter)
self._connect_log_guard.reset()
except IOError as error: except IOError as error:
self.logger.warning('connection to %s:%s failed: %s', if self._connect_log_guard.allow_execution():
self.host, self.port, error) self.logger.warning(
'connection to %s:%s failed: %s (%s attempts)',
self.host, self.port, error,
self._connect_log_guard.counter)
await asyncio.sleep(self._reconnect_sleep) await asyncio.sleep(self._reconnect_sleep)
async def _process_metric(self) -> None: async def _process_metric(self) -> None:

View file

@ -1,10 +1,11 @@
import contextlib import contextlib
import logging
import os import os
import socket import socket
import time import time
import typing import typing
from tornado import web from tornado import ioloop, web
from sprockets_statsd import statsd from sprockets_statsd import statsd
@ -114,7 +115,13 @@ class Application(web.Application):
self.settings['statsd']['port'] = int(self.settings['statsd']['port']) self.settings['statsd']['port'] = int(self.settings['statsd']['port'])
self.statsd_connector = None self.statsd_connector = None
async def start_statsd(self) -> None: try:
self.on_start_callbacks.append(self.start_statsd)
self.on_shutdown_callbacks.append(self.stop_statsd)
except AttributeError:
pass
async def start_statsd(self, *_) -> None:
"""Start the connector during startup. """Start the connector during startup.
Call this method during application startup to enable the statsd Call this method during application startup to enable the statsd
@ -124,7 +131,9 @@ class Application(web.Application):
""" """
if self.statsd_connector is None: if self.statsd_connector is None:
logger = self.__get_logger()
if not self.settings['statsd']['enabled']: if not self.settings['statsd']['enabled']:
logger.info('statsd connector is disabled by configuration')
self.statsd_connector = statsd.AbstractConnector() self.statsd_connector = statsd.AbstractConnector()
else: else:
kwargs = self.settings['statsd'].copy() kwargs = self.settings['statsd'].copy()
@ -135,14 +144,20 @@ class Application(web.Application):
elif protocol == 'udp': elif protocol == 'udp':
kwargs['ip_protocol'] = socket.IPPROTO_UDP kwargs['ip_protocol'] = socket.IPPROTO_UDP
else: else:
raise RuntimeError( return self.__handle_fatal_error(
f'statsd configuration error: {protocol} is not ' f'statsd configuration error: {protocol} is not '
f'a valid protocol') f'a valid protocol')
logger.info('creating %s statsd connector', protocol.upper())
try:
self.statsd_connector = statsd.Connector(**kwargs) self.statsd_connector = statsd.Connector(**kwargs)
except RuntimeError as error:
return self.__handle_fatal_error(
'statsd.Connector failed to start', error)
await self.statsd_connector.start() await self.statsd_connector.start()
async def stop_statsd(self) -> None: async def stop_statsd(self, *_) -> None:
"""Stop the connector during shutdown. """Stop the connector during shutdown.
If the connector was started, then this method will gracefully If the connector was started, then this method will gracefully
@ -154,6 +169,26 @@ class Application(web.Application):
await self.statsd_connector.stop() await self.statsd_connector.stop()
self.statsd_connector = None self.statsd_connector = None
def __handle_fatal_error(self,
message: str,
exc: typing.Optional[Exception] = None):
logger = self.__get_logger()
if exc is not None:
logger.exception('%s', message)
else:
logger.error('%s', message)
if hasattr(self, 'stop'):
self.stop(ioloop.IOLoop.current())
else:
raise RuntimeError(message)
def __get_logger(self) -> logging.Logger:
try:
return getattr(self, 'logger')
except AttributeError:
return logging.getLogger(__package__).getChild(
'tornado.Application')
class RequestHandler(web.RequestHandler): class RequestHandler(web.RequestHandler):
"""Mix this into your handler to send metrics to a statsd server.""" """Mix this into your handler to send metrics to a statsd server."""

View file

@ -3,6 +3,7 @@ import logging
import socket import socket
import time import time
import typing import typing
import unittest.mock
import asynctest import asynctest
@ -203,6 +204,17 @@ class TCPProcessingTests(ProcessorTestCase):
self.processor.queue.put_nowait(b'counter:1|c') self.processor.queue.put_nowait(b'counter:1|c')
await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire())
async def test_that_disconnected_logging_is_throttled(self):
self.statsd_server.close()
await self.statsd_server.wait_closed()
self.processor.logger = unittest.mock.Mock()
self.processor._connect_log_guard.threshold = 10
self.processor._reconnect_sleep = 0
while self.processor._connect_log_guard.counter < (20 + 1):
await asyncio.sleep(0)
self.assertLess(self.processor.logger.warning.call_count, 20)
class UDPProcessingTests(ProcessorTestCase): class UDPProcessingTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_UDP ip_protocol = socket.IPPROTO_UDP
@ -354,6 +366,23 @@ class ConnectorTests(ProcessorTestCase):
self.assertEqual(f'counters.counter:{value}|c'.encode(), self.assertEqual(f'counters.counter:{value}|c'.encode(),
self.statsd_server.metrics.pop(0)) self.statsd_server.metrics.pop(0))
async def test_that_queue_full_logging_is_throttled(self):
await self.connector.processor.stop()
self.connector.logger = unittest.mock.Mock()
self.connector._enqueue_log_guard.threshold = 10
# fill up the queue
for _ in range(self.connector.processor.queue.maxsize):
self.connector.incr('counter')
# then overflow it a bunch of times
overflow_count = self.connector._enqueue_log_guard.threshold * 5
for _ in range(overflow_count):
self.connector.incr('counter')
self.assertLess(self.connector.logger.warning.call_count,
overflow_count)
class ConnectorOptionTests(ProcessorTestCase): class ConnectorOptionTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP ip_protocol = socket.IPPROTO_TCP

View file

@ -3,7 +3,10 @@ import os
import socket import socket
import time import time
import typing import typing
import unittest.mock
import sprockets.http.app
import sprockets.http.testing
from tornado import testing, web from tornado import testing, web
import sprockets_statsd.statsd import sprockets_statsd.statsd
@ -255,27 +258,27 @@ class ApplicationTests(AsyncTestCaseWithTimeout):
sprockets_statsd.statsd.AbstractConnector) sprockets_statsd.statsd.AbstractConnector)
class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase): class StatsdTestCase(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase):
def setUp(self): Application = web.Application
super().setUp()
self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP)
self.io_loop.spawn_callback(self.statsd_server.run)
self.run_coroutine(self.statsd_server.wait_running())
self.app.settings['statsd'].update({ def setUp(self):
'host': self.statsd_server.host, self.statsd_server = None
'port': self.statsd_server.port, super().setUp()
})
self.run_coroutine(self.app.start_statsd())
def tearDown(self): def tearDown(self):
self.run_coroutine(self.app.stop_statsd()) if self.statsd_server is not None:
self.statsd_server.close() self.statsd_server.close()
self.run_coroutine(self.statsd_server.wait_closed()) self.run_coroutine(self.statsd_server.wait_closed())
super().tearDown() super().tearDown()
def get_app(self): def get_app(self):
self.app = Application(statsd={ self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP)
self.io_loop.spawn_callback(self.statsd_server.run)
self.run_coroutine(self.statsd_server.wait_running())
self.app = self.Application(shutdown_limit=0.5,
statsd={
'host': self.statsd_server.host,
'port': self.statsd_server.port,
'prefix': 'applications.service', 'prefix': 'applications.service',
'protocol': 'tcp', 'protocol': 'tcp',
}) })
@ -310,6 +313,19 @@ class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase):
return self.parse_metric(line) return self.parse_metric(line)
self.fail(f'failed to find metric containing {needle!r}') self.fail(f'failed to find metric containing {needle!r}')
class RequestHandlerTests(StatsdTestCase, AsyncTestCaseWithTimeout,
testing.AsyncHTTPTestCase):
Application = Application
def setUp(self):
super().setUp()
self.run_coroutine(self.app.start_statsd())
def tearDown(self):
self.run_coroutine(self.app.stop_statsd())
super().tearDown()
def test_the_request_metric_is_sent_last(self): def test_the_request_metric_is_sent_last(self):
rsp = self.fetch('/') rsp = self.fetch('/')
self.assertEqual(200, rsp.code) self.assertEqual(200, rsp.code)
@ -343,3 +359,37 @@ class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase):
rsp = self.fetch('/') rsp = self.fetch('/')
self.assertEqual(200, rsp.code) self.assertEqual(200, rsp.code)
class SprocketsHttpInteropTests(StatsdTestCase, AsyncTestCaseWithTimeout,
sprockets.http.testing.SprocketsHttpTestCase):
class Application(sprockets_statsd.tornado.Application,
sprockets.http.app.Application):
def __init__(self, **settings):
super().__init__([web.url('/', Handler)], **settings)
def setUp(self):
super().setUp()
self.let_callbacks_run()
def let_callbacks_run(self):
self.io_loop.run_sync(lambda: asyncio.sleep(0))
def test_that_callbacks_are_installed(self):
self.assertIn(self.app.start_statsd, self.app.on_start_callbacks)
self.assertIn(self.app.stop_statsd, self.app.on_shutdown_callbacks)
def test_that_statsd_connector_is_enabled(self):
# verifies that the start callback actually runs correctly.
self.assertIsNotNone(self.app.statsd_connector,
'statsd_connecter was never created')
def test_that_misconfiguration_stops_application(self):
another_app = self.Application(statsd={
'host': '',
'prefix': 'whatever',
})
another_app.stop = unittest.mock.Mock(wraps=another_app.stop)
another_app.start(self.io_loop)
self.let_callbacks_run()
another_app.stop.assert_called_once_with(self.io_loop)