From a48453b70e6ce6995f876ba2b4fda5447258eedb Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Sat, 24 Apr 2021 09:15:50 -0400 Subject: [PATCH 1/5] Add compatibility with sprockets.http. --- CHANGELOG.rst | 2 + README.rst | 36 ++++++++++++++++ setup.cfg | 1 + sprockets_statsd/tornado.py | 45 ++++++++++++++++--- tests/test_tornado.py | 86 +++++++++++++++++++++++++++++-------- 5 files changed, 147 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bcb20c5..8f08110 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,8 @@ :tag:`Next release <0.0.1...main>` ---------------------------------- - Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration +- Tornado application mixin automatically installs start/stop hooks if the application + quacks like a ``sprockets.http.app.Application``. :tag:`0.0.1 <832f8af7...0.0.1>` (08-Apr-2021) --------------------------------------------- diff --git a/README.rst b/README.rst index 2e380d7..117fb64 100644 --- a/README.rst +++ b/README.rst @@ -115,6 +115,42 @@ Metrics are sent by a ``asyncio.Task`` that is started by ``start_statsd``. The metric data onto a ``asyncio.Queue`` that the task reads from. Metric data remains on the queue when the task is not connected to the server and will be sent in the order received when the task establishes the server connection. +Integration with sprockets.http +=============================== +If you use `sprockets.http`_ in your application stack, then the Tornado integration will detect it and install the +initialization and shutdown hooks for you. The application will *just work* provided that the `$STATSD_HOST` +and `$STATSD_PREFIX` environment variables are set appropriately. The following snippet will produce the same result +as the Tornado example even without setting the prefix: + +.. 
code-block:: python + + class Application(sprockets_statsd.tornado.Application, + sprockets.http.app.Application): + def __init__(self, **settings): + statsd = settings.setdefault('statsd', {}) + statsd.setdefault('host', os.environ['STATSD_HOST']) + statsd.setdefault('protocol', 'tcp') + settings.update({ + 'service': 'my-service', + 'environment': os.environ.get('ENVIRONMENT', 'development'), + 'statsd': statsd, + 'version': getattr(__package__, 'version'), + }) + super().__init__([web.url('/', MyHandler)], **settings) + + if __name__ == '__main__': + sprockets.http.run(Application, log_config=...) + +Defining the ``service`` and ``environment`` in ``settings`` as above will result in the prefix being set to:: + + applications.{self.settings["service"]}.{self.settings["environment"]} + +The recommended usage is to: + +#. define ``service``, ``environment``, and ``version`` in the settings +#. explicitly set the ``host`` and ``protocol`` settings in ``self.settings["statsd"]`` + +.. _sprockets.http: https://sprocketshttp.readthedocs.io/en/master/ .. _statsd: https://github.com/statsd/statsd/ .. 
_tornado: https://tornadoweb.org/ diff --git a/setup.cfg b/setup.cfg index 5218f4c..ec84465 100644 --- a/setup.cfg +++ b/setup.cfg @@ -41,6 +41,7 @@ dev = flake8-import-order==0.18.1 sphinx==3.5.2 sphinx-autodoc-typehints==1.11.1 + sprockets.http==2.2.0 tornado>=5 yapf==0.30.0 readthedocs = diff --git a/sprockets_statsd/tornado.py b/sprockets_statsd/tornado.py index 22e5586..316dd7d 100644 --- a/sprockets_statsd/tornado.py +++ b/sprockets_statsd/tornado.py @@ -1,10 +1,11 @@ import contextlib +import logging import os import socket import time import typing -from tornado import web +from tornado import ioloop, web from sprockets_statsd import statsd @@ -114,7 +115,13 @@ class Application(web.Application): self.settings['statsd']['port'] = int(self.settings['statsd']['port']) self.statsd_connector = None - async def start_statsd(self) -> None: + try: + self.on_start_callbacks.append(self.start_statsd) + self.on_shutdown_callbacks.append(self.stop_statsd) + except AttributeError: + pass + + async def start_statsd(self, *_) -> None: """Start the connector during startup. 
Call this method during application startup to enable the statsd @@ -124,7 +131,9 @@ class Application(web.Application): """ if self.statsd_connector is None: + logger = self.__get_logger() if not self.settings['statsd']['enabled']: + logger.info('statsd connector is disabled by configuration') self.statsd_connector = statsd.AbstractConnector() else: kwargs = self.settings['statsd'].copy() @@ -135,14 +144,20 @@ class Application(web.Application): elif protocol == 'udp': kwargs['ip_protocol'] = socket.IPPROTO_UDP else: - raise RuntimeError( + return self.__handle_fatal_error( f'statsd configuration error: {protocol} is not ' f'a valid protocol') - self.statsd_connector = statsd.Connector(**kwargs) + + logger.info('creating %s statsd connector', protocol.upper()) + try: + self.statsd_connector = statsd.Connector(**kwargs) + except RuntimeError as error: + return self.__handle_fatal_error( + 'statsd.Connector failed to start', error) await self.statsd_connector.start() - async def stop_statsd(self) -> None: + async def stop_statsd(self, *_) -> None: """Stop the connector during shutdown. 
If the connector was started, then this method will gracefully @@ -154,6 +169,26 @@ class Application(web.Application): await self.statsd_connector.stop() self.statsd_connector = None + def __handle_fatal_error(self, + message: str, + exc: typing.Optional[Exception] = None): + logger = self.__get_logger() + if exc is not None: + logger.exception('%s', message) + else: + logger.error('%s', message) + if hasattr(self, 'stop'): + self.stop(ioloop.IOLoop.current()) + else: + raise RuntimeError(message) + + def __get_logger(self) -> logging.Logger: + try: + return getattr(self, 'logger') + except AttributeError: + return logging.getLogger(__package__).getChild( + 'tornado.Application') + class RequestHandler(web.RequestHandler): """Mix this into your handler to send metrics to a statsd server.""" diff --git a/tests/test_tornado.py b/tests/test_tornado.py index 14ba997..108f05a 100644 --- a/tests/test_tornado.py +++ b/tests/test_tornado.py @@ -3,7 +3,10 @@ import os import socket import time import typing +import unittest.mock +import sprockets.http.app +import sprockets.http.testing from tornado import testing, web import sprockets_statsd.statsd @@ -255,30 +258,30 @@ class ApplicationTests(AsyncTestCaseWithTimeout): sprockets_statsd.statsd.AbstractConnector) -class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase): - def setUp(self): - super().setUp() - self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP) - self.io_loop.spawn_callback(self.statsd_server.run) - self.run_coroutine(self.statsd_server.wait_running()) +class StatsdTestCase(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase): + Application = web.Application - self.app.settings['statsd'].update({ - 'host': self.statsd_server.host, - 'port': self.statsd_server.port, - }) - self.run_coroutine(self.app.start_statsd()) + def setUp(self): + self.statsd_server = None + super().setUp() def tearDown(self): - self.run_coroutine(self.app.stop_statsd()) - self.statsd_server.close() - 
self.run_coroutine(self.statsd_server.wait_closed()) + if self.statsd_server is not None: + self.statsd_server.close() + self.run_coroutine(self.statsd_server.wait_closed()) super().tearDown() def get_app(self): - self.app = Application(statsd={ - 'prefix': 'applications.service', - 'protocol': 'tcp', - }) + self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP) + self.io_loop.spawn_callback(self.statsd_server.run) + self.run_coroutine(self.statsd_server.wait_running()) + self.app = self.Application(shutdown_limit=0.5, + statsd={ + 'host': self.statsd_server.host, + 'port': self.statsd_server.port, + 'prefix': 'applications.service', + 'protocol': 'tcp', + }) return self.app def wait_for_metrics(self, metric_count=3): @@ -310,6 +313,19 @@ class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase): return self.parse_metric(line) self.fail(f'failed to find metric containing {needle!r}') + +class RequestHandlerTests(StatsdTestCase, AsyncTestCaseWithTimeout, + testing.AsyncHTTPTestCase): + Application = Application + + def setUp(self): + super().setUp() + self.run_coroutine(self.app.start_statsd()) + + def tearDown(self): + self.run_coroutine(self.app.stop_statsd()) + super().tearDown() + def test_the_request_metric_is_sent_last(self): rsp = self.fetch('/') self.assertEqual(200, rsp.code) @@ -343,3 +359,37 @@ class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase): rsp = self.fetch('/') self.assertEqual(200, rsp.code) + + +class SprocketsHttpInteropTests(StatsdTestCase, AsyncTestCaseWithTimeout, + sprockets.http.testing.SprocketsHttpTestCase): + class Application(sprockets_statsd.tornado.Application, + sprockets.http.app.Application): + def __init__(self, **settings): + super().__init__([web.url('/', Handler)], **settings) + + def setUp(self): + super().setUp() + self.let_callbacks_run() + + def let_callbacks_run(self): + self.io_loop.run_sync(lambda: asyncio.sleep(0)) + + def test_that_callbacks_are_installed(self): + 
self.assertIn(self.app.start_statsd, self.app.on_start_callbacks) + self.assertIn(self.app.stop_statsd, self.app.on_shutdown_callbacks) + + def test_that_statsd_connector_is_enabled(self): + # verifies that the start callback actually runs correctly. + self.assertIsNotNone(self.app.statsd_connector, + 'statsd_connecter was never created') + + def test_that_misconfiguration_stops_application(self): + another_app = self.Application(statsd={ + 'host': '', + 'prefix': 'whatever', + }) + another_app.stop = unittest.mock.Mock(wraps=another_app.stop) + another_app.start(self.io_loop) + self.let_callbacks_run() + another_app.stop.assert_called_once_with(self.io_loop) From ed67689fe2ad25946008381bd2c66827a2516edb Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Sun, 9 May 2021 15:48:18 -0400 Subject: [PATCH 2/5] Limit logging when disconnected. Instead of logging a warning every time that the connection fails, only log the first 100 of them, then log every 100th time thereafter. --- CHANGELOG.rst | 1 + sprockets_statsd/statsd.py | 62 +++++++++++++++++++++++++++++++++++--- tests/test_processor.py | 29 ++++++++++++++++++ 3 files changed, 87 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 8f08110..425699c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,7 @@ - Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration - Tornado application mixin automatically installs start/stop hooks if the application quacks like a ``sprockets.http.app.Application``. +- Limit logging when disconnected from statsd :tag:`0.0.1 <832f8af7...0.0.1>` (08-Apr-2021) --------------------------------------------- diff --git a/sprockets_statsd/statsd.py b/sprockets_statsd/statsd.py index d8439af..513803e 100644 --- a/sprockets_statsd/statsd.py +++ b/sprockets_statsd/statsd.py @@ -4,6 +4,47 @@ import socket import typing +class ThrottleGuard: + """Prevent code from executing repeatedly. 
+ + :param threshold: guarding threshold + + This abstraction allows code to execute the first "threshold" + times and then only once per "threshold" times afterwards. Use + it to ensure that log statements are continuously written during + persistent error conditions. The goal is to provide regular + feedback while limiting the amount of log spam. + + The following snippet will log the first 100 failures and then + once every 100 failures thereafter: + + .. code-block:: python + + executions = 0 + guard = ThrottleGuard(100) + for _ in range(1000): + if guard.allow_execution(): + executions += 1 + logging.info('called %s times instead of %s times', + executions, guard.counter) + + """ + def __init__(self, threshold: int): + self.counter = 0 + self.threshold = threshold + + def allow_execution(self) -> bool: + """Should this execution be allowed?""" + self.counter += 1 + allow = (self.counter < self.threshold + or (self.counter % self.threshold) == 0) + return allow + + def reset(self) -> None: + """Reset counter after error has resolved.""" + self.counter = 0 + + class AbstractConnector: """StatsD connector that does not send metrics or connect. @@ -137,6 +178,7 @@ class Connector(AbstractConnector): self.logger = logging.getLogger(__package__).getChild('Connector') self.prefix = f'{prefix}.' 
if prefix else prefix self.processor = Processor(host=host, port=port, **kwargs) + self._enqueue_log_guard = ThrottleGuard(100) self._processor_task: typing.Optional[asyncio.Task[None]] = None async def start(self) -> None: @@ -174,8 +216,10 @@ class Connector(AbstractConnector): payload = f'{self.prefix}{path}:{value}|{type_code}' try: self.processor.enqueue(payload.encode('utf-8')) + self._enqueue_log_guard.reset() except asyncio.QueueFull: - self.logger.warning('statsd queue is full, discarding metric') + if self._enqueue_log_guard.allow_execution(): + self.logger.warning('statsd queue is full, discarding metric') class StatsdProtocol(asyncio.BaseProtocol): @@ -389,6 +433,7 @@ class Processor: self.host = host self.port = port self._ip_protocol = ip_protocol + self._connect_log_guard = ThrottleGuard(100) self._reconnect_sleep = reconnect_sleep self._wait_timeout = wait_timeout @@ -479,14 +524,21 @@ class Processor: buffered_data = b'' if self.protocol is not None: buffered_data = self.protocol.buffered_data + t, p = await self._create_transport() # type: ignore[misc] transport, self.protocol = t, p self.protocol.buffered_data = buffered_data - self.logger.info('connection established to %s', - transport.get_extra_info('peername')) + self.logger.info( + 'connection established to %s after %s attempts', + transport.get_extra_info('peername'), + self._connect_log_guard.counter) + self._connect_log_guard.reset() except IOError as error: - self.logger.warning('connection to %s:%s failed: %s', - self.host, self.port, error) + if self._connect_log_guard.allow_execution(): + self.logger.warning( + 'connection to %s:%s failed: %s (%s attempts)', + self.host, self.port, error, + self._connect_log_guard.counter) await asyncio.sleep(self._reconnect_sleep) async def _process_metric(self) -> None: diff --git a/tests/test_processor.py b/tests/test_processor.py index 4b802da..fa0b1a8 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -3,6 +3,7 @@ import 
logging import socket import time import typing +import unittest.mock import asynctest @@ -203,6 +204,17 @@ class TCPProcessingTests(ProcessorTestCase): self.processor.queue.put_nowait(b'counter:1|c') await self.wait_for(self.statsd_server.message_received.acquire()) + async def test_that_disconnected_logging_is_throttled(self): + self.statsd_server.close() + await self.statsd_server.wait_closed() + + self.processor.logger = unittest.mock.Mock() + self.processor._connect_log_guard.threshold = 10 + self.processor._reconnect_sleep = 0 + while self.processor._connect_log_guard.counter < (20 + 1): + await asyncio.sleep(0) + self.assertLess(self.processor.logger.warning.call_count, 20) + class UDPProcessingTests(ProcessorTestCase): ip_protocol = socket.IPPROTO_UDP @@ -354,6 +366,23 @@ class ConnectorTests(ProcessorTestCase): self.assertEqual(f'counters.counter:{value}|c'.encode(), self.statsd_server.metrics.pop(0)) + async def test_that_queue_full_logging_is_throttled(self): + await self.connector.processor.stop() + + self.connector.logger = unittest.mock.Mock() + self.connector._enqueue_log_guard.threshold = 10 + + # fill up the queue + for _ in range(self.connector.processor.queue.maxsize): + self.connector.incr('counter') + + # then overflow it a bunch of times + overflow_count = self.connector._enqueue_log_guard.threshold * 5 + for value in range(overflow_count): + self.connector.incr('counter') + self.assertLess(self.connector.logger.warning.call_count, + overflow_count) + class ConnectorOptionTests(ProcessorTestCase): ip_protocol = socket.IPPROTO_TCP From 89d20a5f89bee184348396246698793dcab6826c Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Mon, 10 May 2021 06:45:19 -0400 Subject: [PATCH 3/5] Remove unused variable in test. 
--- tests/test_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_processor.py b/tests/test_processor.py index fa0b1a8..ab69c5e 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -378,7 +378,7 @@ class ConnectorTests(ProcessorTestCase): # then overflow it a bunch of times overflow_count = self.connector._enqueue_log_guard.threshold * 5 - for value in range(overflow_count): + for _ in range(overflow_count): self.connector.incr('counter') self.assertLess(self.connector.logger.warning.call_count, overflow_count) From ea97d07341f41fa67fcc29c0cd065c2ebf21cd3b Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Mon, 10 May 2021 07:22:18 -0400 Subject: [PATCH 4/5] Switch to manual sonarqube analysis. AFAICT the automatic analysis does not wait for coverage reports to be generated. --- .github/workflows/run-tests.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index bf21fd4..e749d7d 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -15,6 +15,8 @@ jobs: python-version: [3.7, 3.8, 3.9] steps: - uses: actions/checkout@v2 + with: + fetch-depth: 0 # sonar wants a "deep" clone - name: Install python ${{ matrix.python-version }} uses: actions/setup-python@v2 with: @@ -42,6 +44,12 @@ jobs: file: ./coverage.xml flags: unittests fail_ci_if_error: true + - name: Run sonarqube scan + if: ${{ matrix.python-version == '3.9' }} + uses: SonarSource/sonarcloud-github-action@v1.3 + env: + GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}} + SONAR_TOKEN: ${{secrets.SONAR_TOKEN}} - name: Save coverage report if: ${{ matrix.python-version == '3.9' }} uses: actions/upload-artifact@v2 From c7a3a834c028648e32b0e77b49611d7ae26b3bb5 Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Mon, 10 May 2021 07:47:56 -0400 Subject: [PATCH 5/5] Remove explicit sonar scan of PR. 
I cannot for the life of me figure out how to get the sonar token to be available in the PR. Setting it in the project doesn't work for PRs since tokens are not available to PRs run from forks. Setting it in my fork didn't work either. It is available via codecov so /shrug --- .github/workflows/run-tests.yml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index e749d7d..bf21fd4 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -15,8 +15,6 @@ jobs: python-version: [3.7, 3.8, 3.9] steps: - uses: actions/checkout@v2 - with: - fetch-depth: 0 # sonar wants a "deep" clone - name: Install python ${{ matrix.python-version }} uses: actions/setup-python@v2 with: @@ -44,12 +42,6 @@ jobs: file: ./coverage.xml flags: unittests fail_ci_if_error: true - - name: Run sonarqube scan - if: ${{ matrix.python-version == '3.9' }} - uses: SonarSource/sonarcloud-github-action@v1.3 - env: - GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}} - SONAR_TOKEN: ${{secrets.SONAR_TOKEN}} - name: Save coverage report if: ${{ matrix.python-version == '3.9' }} uses: actions/upload-artifact@v2