From ed67689fe2ad25946008381bd2c66827a2516edb Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Sun, 9 May 2021 15:48:18 -0400 Subject: [PATCH] Limit logging when disconnected. Instead of logging a warning every time that the connection fails, only log the first 100 of them, then log every 100th time thereafter. --- CHANGELOG.rst | 1 + sprockets_statsd/statsd.py | 62 +++++++++++++++++++++++++++++++++++--- tests/test_processor.py | 29 ++++++++++++++++++ 3 files changed, 87 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 8f08110..425699c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,7 @@ - Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration - Tornado application mixin automatically installs start/stop hooks if the application quacks like a ``sprockets.http.app.Application``. +- Limit logging when disconnected from statsd :tag:`0.0.1 <832f8af7...0.0.1>` (08-Apr-2021) --------------------------------------------- diff --git a/sprockets_statsd/statsd.py b/sprockets_statsd/statsd.py index d8439af..513803e 100644 --- a/sprockets_statsd/statsd.py +++ b/sprockets_statsd/statsd.py @@ -4,6 +4,47 @@ import socket import typing +class ThrottleGuard: + """Prevent code from executing repeatedly. + + :param threshold: guarding threshold + + This abstraction allows code to execute the first "threshold" + times and then only once per "threshold" times afterwards. Use + it to ensure that log statements are continuously written during + persistent error conditions. The goal is to provide regular + feedback while limiting the amount of log spam. + + The following snippet will log the first 100 failures and then + once every 100 failures thereafter: + + .. code-block:: python + + executions = 0 + guard = ThrottleGuard(100) + for _ in range(1000): + if guard.allow_execution(): + executions += 1 + logging.info('called %s times instead of %s times', + executions, guard.counter) + + """ + def __init__(self, threshold: int): + self.counter = 0 + self.threshold = threshold + + def allow_execution(self) -> bool: + """Should this execution be allowed?""" + self.counter += 1 + allow = (self.counter < self.threshold + or (self.counter % self.threshold) == 0) + return allow + + def reset(self) -> None: + """Reset counter after error has resolved.""" + self.counter = 0 + + class AbstractConnector: """StatsD connector that does not send metrics or connect. @@ -137,6 +178,7 @@ class Connector(AbstractConnector): self.logger = logging.getLogger(__package__).getChild('Connector') self.prefix = f'{prefix}.' if prefix else prefix self.processor = Processor(host=host, port=port, **kwargs) + self._enqueue_log_guard = ThrottleGuard(100) self._processor_task: typing.Optional[asyncio.Task[None]] = None async def start(self) -> None: @@ -174,8 +216,10 @@ class Connector(AbstractConnector): payload = f'{self.prefix}{path}:{value}|{type_code}' try: self.processor.enqueue(payload.encode('utf-8')) + self._enqueue_log_guard.reset() except asyncio.QueueFull: - self.logger.warning('statsd queue is full, discarding metric') + if self._enqueue_log_guard.allow_execution(): + self.logger.warning('statsd queue is full, discarding metric') class StatsdProtocol(asyncio.BaseProtocol): @@ -389,6 +433,7 @@ class Processor: self.host = host self.port = port self._ip_protocol = ip_protocol + self._connect_log_guard = ThrottleGuard(100) self._reconnect_sleep = reconnect_sleep self._wait_timeout = wait_timeout @@ -479,14 +524,21 @@ class Processor: buffered_data = b'' if self.protocol is not None: buffered_data = self.protocol.buffered_data + t, p = await self._create_transport() # type: ignore[misc] transport, self.protocol = t, p self.protocol.buffered_data = buffered_data - self.logger.info('connection established to %s', - transport.get_extra_info('peername')) + self.logger.info( + 'connection established to %s after %s attempts', + transport.get_extra_info('peername'), + self._connect_log_guard.counter) + self._connect_log_guard.reset() except IOError as error: - self.logger.warning('connection to %s:%s failed: %s', - self.host, self.port, error) + if self._connect_log_guard.allow_execution(): + self.logger.warning( + 'connection to %s:%s failed: %s (%s attempts)', + self.host, self.port, error, + self._connect_log_guard.counter) await asyncio.sleep(self._reconnect_sleep) async def _process_metric(self) -> None: diff --git a/tests/test_processor.py b/tests/test_processor.py index 4b802da..fa0b1a8 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -3,6 +3,7 @@ import logging import socket import time import typing +import unittest.mock import asynctest @@ -203,6 +204,17 @@ class TCPProcessingTests(ProcessorTestCase): self.processor.queue.put_nowait(b'counter:1|c') await self.wait_for(self.statsd_server.message_received.acquire()) + async def test_that_disconnected_logging_is_throttled(self): + self.statsd_server.close() + await self.statsd_server.wait_closed() + + self.processor.logger = unittest.mock.Mock() + self.processor._connect_log_guard.threshold = 10 + self.processor._reconnect_sleep = 0 + while self.processor._connect_log_guard.counter < (20 + 1): + await asyncio.sleep(0) + self.assertLess(self.processor.logger.warning.call_count, 20) + class UDPProcessingTests(ProcessorTestCase): ip_protocol = socket.IPPROTO_UDP @@ -354,6 +366,23 @@ class ConnectorTests(ProcessorTestCase): self.assertEqual(f'counters.counter:{value}|c'.encode(), self.statsd_server.metrics.pop(0)) + async def test_that_queue_full_logging_is_throttled(self): + await self.connector.processor.stop() + + self.connector.logger = unittest.mock.Mock() + self.connector._enqueue_log_guard.threshold = 10 + + # fill up the queue + for _ in range(self.connector.processor.queue.maxsize): + self.connector.incr('counter') + + # then overflow it a bunch of times + overflow_count = self.connector._enqueue_log_guard.threshold * 5 + for value in range(overflow_count): + self.connector.incr('counter') + self.assertLess(self.connector.logger.warning.call_count, + overflow_count) + class ConnectorOptionTests(ProcessorTestCase): ip_protocol = socket.IPPROTO_TCP