From da95efa5446fe7ea418d7b763184cae62162df00 Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Tue, 30 Mar 2021 07:56:20 -0400 Subject: [PATCH] Move path prefixing into Connector. This makes the low-level code a little more opinionated. If there is a good reason for making `timers`, `counters`, etc. optional, then we can PR it into place. --- sprockets_statsd/statsd.py | 44 ++++++++++++++++++++++++++++++------- sprockets_statsd/tornado.py | 12 +++------- tests/test_processor.py | 31 ++++++++++++++------------ tests/test_tornado.py | 34 ++++++++++++++-------------- 4 files changed, 72 insertions(+), 49 deletions(-) diff --git a/sprockets_statsd/statsd.py b/sprockets_statsd/statsd.py index 6329e8b..43cd329 100644 --- a/sprockets_statsd/statsd.py +++ b/sprockets_statsd/statsd.py @@ -12,6 +12,7 @@ class Connector: :keyword ip_protocol: IP protocol to use for the underlying socket -- either ``socket.IPPROTO_TCP`` for TCP or ``socket.IPPROTO_UDP`` for UDP sockets. + :keyword prefix: optional string to prepend to metric paths :param kwargs: additional keyword parameters are passed to the :class:`.Processor` initializer @@ -23,11 +24,34 @@ class Connector: terminating to ensure that all metrics are flushed to the statsd server. - When the connector is *should_terminate*, metric payloads are sent by - calling the :meth:`.inject_metric` method. The payloads are - stored in an internal queue that is consumed whenever the + Metrics are optionally prefixed with :attr:`prefix` before the + metric type prefix. This *should* be used to prevent metrics + from being overwritten when multiple applications share a StatsD + instance. Each metric type is also prefixed by one of the + following strings based on the metric type: + + +-------------------+---------------+-----------+ + | Method call | Prefix | Type code | + +-------------------+---------------+-----------+ + | :meth:`.incr` | ``counters.`` | ``c`` | + +-------------------+---------------+-----------+ + | :meth:`.decr` | ``counters.`` | ``c`` | + +-------------------+---------------+-----------+ + | :meth:`.gauge` | ``gauges.`` | ``g`` | + +-------------------+---------------+-----------+ + | :meth:`.timing` | ``timers.`` | ``ms`` | + +-------------------+---------------+-----------+ + + When the connector is *should_terminate*, metric payloads are + sent by calling the :meth:`.inject_metric` method. The payloads + are stored in an internal queue that is consumed whenever the connection to the server is active. + .. attribute:: prefix + :type: str + + String to prefix to all metrics *before* the metric type prefix. + .. attribute:: processor :type: Processor @@ -36,13 +60,17 @@ class Connector: """ logger: logging.Logger + prefix: str processor: 'Processor' def __init__(self, host: str, port: int = 8125, + *, + prefix: str = '', **kwargs: typing.Any) -> None: self.logger = logging.getLogger(__package__).getChild('Connector') + self.prefix = f'{prefix}.' if prefix else prefix self.processor = Processor(host=host, port=port, **kwargs) self._processor_task: typing.Optional[asyncio.Task[None]] = None @@ -74,7 +102,7 @@ class Connector: :param value: amount to increment the counter by """ - self.inject_metric(path, str(value), 'c') + self.inject_metric(f'counters.{path}', str(value), 'c') def decr(self, path: str, value: int = 1) -> None: """Decrement a counter metric. @@ -85,7 +113,7 @@ class Connector: This is equivalent to ``self.incr(path, -value)``. """ - self.inject_metric(path, str(-value), 'c') + self.inject_metric(f'counters.{path}', str(-value), 'c') def gauge(self, path: str, value: int, delta: bool = False) -> None: """Manipulate a gauge metric. @@ -103,7 +131,7 @@ class Connector: payload = f'{value:+d}' else: payload = str(value) - self.inject_metric(path, payload, 'g') + self.inject_metric(f'gauges.{path}', payload, 'g') def timing(self, path: str, seconds: float) -> None: """Send a timer metric. @@ -112,7 +140,7 @@ class Connector: :param seconds: number of **seconds** to record """ - self.inject_metric(path, str(seconds * 1000.0), 'ms') + self.inject_metric(f'timers.{path}', str(seconds * 1000.0), 'ms') def inject_metric(self, path: str, value: str, type_code: str) -> None: """Send a metric to the statsd server. @@ -125,7 +153,7 @@ class Connector: internal queue for future processing. """ - payload = f'{path}:{value}|{type_code}' + payload = f'{self.prefix}{path}:{value}|{type_code}' try: self.processor.enqueue(payload.encode('utf-8')) except asyncio.QueueFull: diff --git a/sprockets_statsd/tornado.py b/sprockets_statsd/tornado.py index 0da1cbc..5e368c6 100644 --- a/sprockets_statsd/tornado.py +++ b/sprockets_statsd/tornado.py @@ -121,7 +121,6 @@ class Application(web.Application): raise RuntimeError(f'statsd configuration error: {protocol} ' f'is not a valid protocol') - kwargs.pop('prefix', None) # TODO move prefixing into Connector self.statsd_connector = statsd.Connector(**kwargs) await self.statsd_connector.start() @@ -148,10 +147,7 @@ class RequestHandler(web.RequestHandler): self.statsd_connector = self.application.statsd_connector def __build_path(self, *path: typing.Any) -> str: - full_path = '.'.join(str(c) for c in path) - if self.settings.get('statsd', {}).get('prefix', ''): - return f'{self.settings["statsd"]["prefix"]}.{full_path}' - return full_path + return '.'.join(str(c) for c in path) def record_timing(self, secs: float, *path: typing.Any) -> None: """Record the duration. @@ -161,8 +157,7 @@ class RequestHandler(web.RequestHandler): """ if self.statsd_connector is not None: - self.statsd_connector.timing(self.__build_path('timers', *path), - secs) + self.statsd_connector.timing(self.__build_path(*path), secs) def increase_counter(self, *path: typing.Any, amount: int = 1) -> None: """Adjust a counter. @@ -173,8 +168,7 @@ class RequestHandler(web.RequestHandler): """ if self.statsd_connector is not None: - self.statsd_connector.incr(self.__build_path('counters', *path), - amount) + self.statsd_connector.incr(self.__build_path(*path), amount) @contextlib.contextmanager def execution_timer( diff --git a/tests/test_processor.py b/tests/test_processor.py index 0825abd..4b802da 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -225,8 +225,9 @@ class UDPProcessingTests(ProcessorTestCase): await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire()) - self.assertEqual(self.statsd_server.metrics[0], b'counter:1|c') - self.assertEqual(self.statsd_server.metrics[1], b'timer:1.0|ms') + self.assertEqual(self.statsd_server.metrics[0], + b'counters.counter:1|c') + self.assertEqual(self.statsd_server.metrics[1], b'timers.timer:1.0|ms') async def test_that_client_sends_to_new_server(self): self.statsd_server.close() @@ -240,7 +241,8 @@ class UDPProcessingTests(ProcessorTestCase): self.connector.incr('should.be.recvd') await self.wait_for(self.statsd_server.message_received.acquire()) - self.assertEqual(self.statsd_server.metrics[0], b'should.be.recvd:1|c') + self.assertEqual(self.statsd_server.metrics[0], + b'counters.should.be.recvd:1|c') async def test_that_client_handles_socket_closure(self): self.connector.processor.protocol.transport.close() @@ -249,7 +251,8 @@ class UDPProcessingTests(ProcessorTestCase): self.connector.incr('should.be.recvd') await self.wait_for(self.statsd_server.message_received.acquire()) - self.assertEqual(self.statsd_server.metrics[0], b'should.be.recvd:1|c') + self.assertEqual(self.statsd_server.metrics[0], + b'counters.should.be.recvd:1|c') class ConnectorTests(ProcessorTestCase): @@ -278,22 +281,22 @@ class ConnectorTests(ProcessorTestCase): self.connector.incr('simple.counter') await self.wait_for(self.statsd_server.message_received.acquire()) self.assert_metrics_equal(self.statsd_server.metrics[-1], - 'simple.counter', 1, 'c') + 'counters.simple.counter', 1, 'c') self.connector.incr('simple.counter', 10) await self.wait_for(self.statsd_server.message_received.acquire()) self.assert_metrics_equal(self.statsd_server.metrics[-1], - 'simple.counter', 10, 'c') + 'counters.simple.counter', 10, 'c') self.connector.decr('simple.counter') await self.wait_for(self.statsd_server.message_received.acquire()) self.assert_metrics_equal(self.statsd_server.metrics[-1], - 'simple.counter', -1, 'c') + 'counters.simple.counter', -1, 'c') self.connector.decr('simple.counter', 10) await self.wait_for(self.statsd_server.message_received.acquire()) self.assert_metrics_equal(self.statsd_server.metrics[-1], - 'simple.counter', -10, 'c') + 'counters.simple.counter', -10, 'c') async def test_adjusting_gauge(self): self.connector.gauge('simple.gauge', 100) @@ -303,18 +306,18 @@ class ConnectorTests(ProcessorTestCase): await self.wait_for(self.statsd_server.message_received.acquire()) self.assert_metrics_equal(self.statsd_server.metrics[0], - 'simple.gauge', '100', 'g') + 'gauges.simple.gauge', '100', 'g') self.assert_metrics_equal(self.statsd_server.metrics[1], - 'simple.gauge', '-10', 'g') + 'gauges.simple.gauge', '-10', 'g') self.assert_metrics_equal(self.statsd_server.metrics[2], - 'simple.gauge', '+10', 'g') + 'gauges.simple.gauge', '+10', 'g') async def test_sending_timer(self): secs = 12.34 self.connector.timing('simple.timer', secs) await self.wait_for(self.statsd_server.message_received.acquire()) self.assert_metrics_equal(self.statsd_server.metrics[0], - 'simple.timer', 12340.0, 'ms') + 'timers.simple.timer', 12340.0, 'ms') async def test_that_queued_metrics_are_drained(self): # The easiest way to test that the internal metrics queue @@ -348,7 +351,7 @@ class ConnectorTests(ProcessorTestCase): await self.wait_for(self.statsd_server.client_connected.acquire()) for value in range(50): await self.wait_for(self.statsd_server.message_received.acquire()) - self.assertEqual(f'counter:{value}|c'.encode(), + self.assertEqual(f'counters.counter:{value}|c'.encode(), self.statsd_server.metrics.pop(0)) @@ -397,4 +400,4 @@ class ConnectorOptionTests(ProcessorTestCase): # make sure that only the incr's are in the queue for _ in range(connector.processor.queue.qsize()): metric = await connector.processor.queue.get() - self.assertEqual(metric, b'counter:1|c') + self.assertEqual(metric, b'counters.counter:1|c') diff --git a/tests/test_tornado.py b/tests/test_tornado.py index 8432a04..77559b8 100644 --- a/tests/test_tornado.py +++ b/tests/test_tornado.py @@ -62,12 +62,12 @@ class ApplicationTests(AsyncTestCaseWithTimeout): self.unsetenv('STATSD_PREFIX') self.unsetenv('STATSD_PROTOCOL') - app = sprockets_statsd.tornado.Application(statsd={'prefix': None}) + app = sprockets_statsd.tornado.Application(statsd={'prefix': ''}) self.assertIn('statsd', app.settings) self.assertIsNone(app.settings['statsd']['host'], 'default host value should be None') self.assertEqual(8125, app.settings['statsd']['port']) - self.assertEqual(None, app.settings['statsd']['prefix']) + self.assertEqual('', app.settings['statsd']['prefix']) self.assertEqual('tcp', app.settings['statsd']['protocol']) def test_that_statsd_settings_read_from_environment(self): @@ -217,6 +217,20 @@ class ApplicationTests(AsyncTestCaseWithTimeout): app.statsd_connector.processor._ip_protocol) self.run_coroutine(app.stop_statsd()) + def test_disabling_statsd_prefix(self): + app = sprockets_statsd.tornado.Application( + service='my-service', + version='1.0.0', + statsd={ + 'host': 'localhost', + 'prefix': '', + 'protocol': 'udp', + }, + ) + self.run_coroutine(app.start_statsd()) + self.assertEqual(app.statsd_connector.prefix, '') + self.run_coroutine(app.stop_statsd()) + class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase): def setUp(self): @@ -306,19 +320,3 @@ class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase): rsp = self.fetch('/') self.assertEqual(200, rsp.code) - - def test_handling_request_without_prefix(self): - self.app.settings['statsd']['prefix'] = '' - - rsp = self.fetch('/') - self.assertEqual(200, rsp.code) - self.wait_for_metrics() - - path, _, _ = self.find_metric('Handler.GET.200') - self.assertEqual('timers.Handler.GET.200', path) - - path, _, _ = self.find_metric('execution-timer') - self.assertEqual('timers.execution-timer', path) - - path, _, _ = self.find_metric('request-count') - self.assertEqual('counters.request-count', path)