Limit logging when disconnected.

Instead of logging a warning every time that the connection fails, only
log the first 100 of them, then log every 100th time thereafter.
This commit is contained in:
Dave Shawley 2021-05-09 15:48:18 -04:00
parent a48453b70e
commit ed67689fe2
No known key found for this signature in database
GPG key ID: F41A8A99298F8EED
3 changed files with 87 additions and 5 deletions

View file

@ -3,6 +3,7 @@
- Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration - Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration
- Tornado application mixin automatically installs start/stop hooks if the application - Tornado application mixin automatically installs start/stop hooks if the application
quacks like a ``sprockets.http.app.Application``. quacks like a ``sprockets.http.app.Application``.
- Limit logging when disconnected from statsd
:tag:`0.0.1 <832f8af7...0.0.1>` (08-Apr-2021) :tag:`0.0.1 <832f8af7...0.0.1>` (08-Apr-2021)
--------------------------------------------- ---------------------------------------------

View file

@ -4,6 +4,47 @@ import socket
import typing import typing
class ThrottleGuard:
"""Prevent code from executing repeatedly.
:param threshold: guarding threshold
This abstraction allows code to execute the first "threshold"
times and then only once per "threshold" times afterwards. Use
it to ensure that log statements are continuously written during
persistent error conditions. The goal is to provide regular
feedback while limiting the amount of log spam.
The following snippet will log the first 100 failures and then
once every 100 failures thereafter:
.. code-block:: python
executions = 0
guard = ThrottleGuard(100)
for _ in range(1000):
if guard.allow_execution():
executions += 1
logging.info('called %s times instead of %s times',
executions, guard.counter)
"""
def __init__(self, threshold: int):
self.counter = 0
self.threshold = threshold
def allow_execution(self) -> bool:
"""Should this execution be allowed?"""
self.counter += 1
allow = (self.counter < self.threshold
or (self.counter % self.threshold) == 0)
return allow
def reset(self) -> None:
"""Reset counter after error has resolved."""
self.counter = 0
class AbstractConnector: class AbstractConnector:
"""StatsD connector that does not send metrics or connect. """StatsD connector that does not send metrics or connect.
@ -137,6 +178,7 @@ class Connector(AbstractConnector):
self.logger = logging.getLogger(__package__).getChild('Connector') self.logger = logging.getLogger(__package__).getChild('Connector')
self.prefix = f'{prefix}.' if prefix else prefix self.prefix = f'{prefix}.' if prefix else prefix
self.processor = Processor(host=host, port=port, **kwargs) self.processor = Processor(host=host, port=port, **kwargs)
self._enqueue_log_guard = ThrottleGuard(100)
self._processor_task: typing.Optional[asyncio.Task[None]] = None self._processor_task: typing.Optional[asyncio.Task[None]] = None
async def start(self) -> None: async def start(self) -> None:
@ -174,7 +216,9 @@ class Connector(AbstractConnector):
payload = f'{self.prefix}{path}:{value}|{type_code}' payload = f'{self.prefix}{path}:{value}|{type_code}'
try: try:
self.processor.enqueue(payload.encode('utf-8')) self.processor.enqueue(payload.encode('utf-8'))
self._enqueue_log_guard.reset()
except asyncio.QueueFull: except asyncio.QueueFull:
if self._enqueue_log_guard.allow_execution():
self.logger.warning('statsd queue is full, discarding metric') self.logger.warning('statsd queue is full, discarding metric')
@ -389,6 +433,7 @@ class Processor:
self.host = host self.host = host
self.port = port self.port = port
self._ip_protocol = ip_protocol self._ip_protocol = ip_protocol
self._connect_log_guard = ThrottleGuard(100)
self._reconnect_sleep = reconnect_sleep self._reconnect_sleep = reconnect_sleep
self._wait_timeout = wait_timeout self._wait_timeout = wait_timeout
@ -479,14 +524,21 @@ class Processor:
buffered_data = b'' buffered_data = b''
if self.protocol is not None: if self.protocol is not None:
buffered_data = self.protocol.buffered_data buffered_data = self.protocol.buffered_data
t, p = await self._create_transport() # type: ignore[misc] t, p = await self._create_transport() # type: ignore[misc]
transport, self.protocol = t, p transport, self.protocol = t, p
self.protocol.buffered_data = buffered_data self.protocol.buffered_data = buffered_data
self.logger.info('connection established to %s', self.logger.info(
transport.get_extra_info('peername')) 'connection established to %s after %s attempts',
transport.get_extra_info('peername'),
self._connect_log_guard.counter)
self._connect_log_guard.reset()
except IOError as error: except IOError as error:
self.logger.warning('connection to %s:%s failed: %s', if self._connect_log_guard.allow_execution():
self.host, self.port, error) self.logger.warning(
'connection to %s:%s failed: %s (%s attempts)',
self.host, self.port, error,
self._connect_log_guard.counter)
await asyncio.sleep(self._reconnect_sleep) await asyncio.sleep(self._reconnect_sleep)
async def _process_metric(self) -> None: async def _process_metric(self) -> None:

View file

@ -3,6 +3,7 @@ import logging
import socket import socket
import time import time
import typing import typing
import unittest.mock
import asynctest import asynctest
@ -203,6 +204,17 @@ class TCPProcessingTests(ProcessorTestCase):
self.processor.queue.put_nowait(b'counter:1|c') self.processor.queue.put_nowait(b'counter:1|c')
await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire())
async def test_that_disconnected_logging_is_throttled(self):
self.statsd_server.close()
await self.statsd_server.wait_closed()
self.processor.logger = unittest.mock.Mock()
self.processor._connect_log_guard.threshold = 10
self.processor._reconnect_sleep = 0
while self.processor._connect_log_guard.counter < (20 + 1):
await asyncio.sleep(0)
self.assertLess(self.processor.logger.warning.call_count, 20)
class UDPProcessingTests(ProcessorTestCase): class UDPProcessingTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_UDP ip_protocol = socket.IPPROTO_UDP
@ -354,6 +366,23 @@ class ConnectorTests(ProcessorTestCase):
self.assertEqual(f'counters.counter:{value}|c'.encode(), self.assertEqual(f'counters.counter:{value}|c'.encode(),
self.statsd_server.metrics.pop(0)) self.statsd_server.metrics.pop(0))
async def test_that_queue_full_logging_is_throttled(self):
await self.connector.processor.stop()
self.connector.logger = unittest.mock.Mock()
self.connector._enqueue_log_guard.threshold = 10
# fill up the queue
for _ in range(self.connector.processor.queue.maxsize):
self.connector.incr('counter')
# then overflow it a bunch of times
overflow_count = self.connector._enqueue_log_guard.threshold * 5
for value in range(overflow_count):
self.connector.incr('counter')
self.assertLess(self.connector.logger.warning.call_count,
overflow_count)
class ConnectorOptionTests(ProcessorTestCase): class ConnectorOptionTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP ip_protocol = socket.IPPROTO_TCP