From 65df48042158bd78bee3ec83b0d1f94ba07d931b Mon Sep 17 00:00:00 2001 From: Dave Shawley Date: Wed, 24 Mar 2021 07:36:16 -0400 Subject: [PATCH] Expose a more usable interface on the `Connector`. @nvllsvm likes the interface that https://statsd.readthedocs.io/ exposes and I agree. I mimicked this interface on the lower-level Connector. I left the sprockets.mixins.metrics style on the tornado helpers for compat reasons. You can always use `self.statsd_connector.incr()` in your request handlers if you prefer the other interface. --- README.rst | 21 +++++++++++++--- sprockets_statsd/statsd.py | 47 +++++++++++++++++++++++++++++++++++ sprockets_statsd/tornado.py | 8 +++--- tests/test_processor.py | 49 ++++++++++++++++++++++++------------- 4 files changed, 100 insertions(+), 25 deletions(-) diff --git a/README.rst b/README.rst index c5aa4a4..b072e31 100644 --- a/README.rst +++ b/README.rst @@ -15,8 +15,8 @@ This library provides connectors to send metrics to a statsd_ instance using eit async def do_stuff(): start = time.time() response = make_some_http_call() - statsd.inject_metric(f'timers.http.something.{response.code}', - (time.time() - start) * 1000.0, 'ms') + statsd.timing(f'timers.http.something.{response.code}', + (time.time() - start)) async def main(): await statsd.start() @@ -28,8 +28,21 @@ This library provides connectors to send metrics to a statsd_ instance using eit The ``Connector`` instance maintains a resilient connection to the target StatsD instance, formats the metric data into payloads, and sends them to the StatsD target. It defaults to using TCP as the transport but will use UDP if the ``ip_protocol`` keyword is set to ``socket.IPPROTO_UDP``. The ``Connector.start`` method starts a background -``asyncio.Task`` that is responsible for maintaining the connection. The ``inject_metric`` method enqueues metric -data to send and the task consumes the internal queue when it is connected. +``asyncio.Task`` that is responsible for maintaining the connection. The ``timing`` method enqueues a timing +metric to send and the task consumes the internal queue when it is connected. + +The following convenience methods are available. You can also call ``inject_metric`` for complete control over +the payload. + ++--------------+--------------------------------------+ +| ``incr`` | Increment a counter metric | ++--------------+--------------------------------------+ +| ``decr`` | Decrement a counter metric | ++--------------+--------------------------------------+ +| ``gauge`` | Adjust or set a gauge metric | ++--------------+--------------------------------------+ +| ``timing`` | Append a duration to a timer metric | ++--------------+--------------------------------------+ Tornado helpers =============== diff --git a/sprockets_statsd/statsd.py b/sprockets_statsd/statsd.py index ac2eabe..1ef1957 100644 --- a/sprockets_statsd/statsd.py +++ b/sprockets_statsd/statsd.py @@ -60,6 +60,53 @@ class Connector: """ await self.processor.stop() + def incr(self, path: str, value: int = 1): + """Increment a counter metric. + + :param path: counter to increment + :param value: amount to increment the counter by + + """ + self.inject_metric(path, str(value), 'c') + + def decr(self, path: str, value: int = 1): + """Decrement a counter metric. + + :param path: counter to decrement + :param value: amount to decrement the counter by + + This is equivalent to ``self.incr(path, -value)``. + + """ + self.inject_metric(path, str(-value), 'c') + + def gauge(self, path: str, value: int, delta: bool = False): + """Manipulate a gauge metric. + + :param path: gauge to adjust + :param value: value to send + :param delta: is this an adjustment of the gauge? + + If the `delta` parameter is ``False`` (or omitted), then + `value` is the new value to set the gauge to. Otherwise, + `value` is an adjustment for the current gauge. + + """ + if delta: + value = f'{value:+d}' + else: + value = str(value) + self.inject_metric(path, value, 'g') + + def timing(self, path: str, seconds: float): + """Send a timer metric. + + :param path: timer to append a value to + :param seconds: number of **seconds** to record + + """ + self.inject_metric(path, seconds * 1000.0, 'ms') + def inject_metric(self, path: str, value, type_code: str): """Send a metric to the statsd server. diff --git a/sprockets_statsd/tornado.py b/sprockets_statsd/tornado.py index a21635a..56ab888 100644 --- a/sprockets_statsd/tornado.py +++ b/sprockets_statsd/tornado.py @@ -170,8 +170,8 @@ class RequestHandler(web.RequestHandler): """ if self.statsd_connector is not None: - self.statsd_connector.inject_metric( - self.__build_path('timers', *path), secs * 1000.0, 'ms') + self.statsd_connector.timing(self.__build_path('timers', *path), + secs) def increase_counter(self, *path, amount: int = 1): """Adjust a counter. @@ -182,8 +182,8 @@ class RequestHandler(web.RequestHandler): """ if self.statsd_connector is not None: - self.statsd_connector.inject_metric( - self.__build_path('counters', *path), amount, 'c') + self.statsd_connector.incr(self.__build_path('counters', *path), + amount) @contextlib.contextmanager def execution_timer(self, *path): diff --git a/tests/test_processor.py b/tests/test_processor.py index fec1bb7..59df78f 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -218,8 +218,8 @@ class UDPProcessingTests(ProcessorTestCase): await super().asyncTearDown() async def test_sending_metrics(self): - self.connector.inject_metric('counter', 1, 'c') - self.connector.inject_metric('timer', 1.0, 'ms') + self.connector.incr('counter') + self.connector.timing('timer', 0.001) await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire()) @@ -230,13 +230,13 @@ class UDPProcessingTests(ProcessorTestCase): self.statsd_server.close() await self.statsd_server.wait_closed() - self.connector.inject_metric('should.be.lost', 1, 'c') + self.connector.incr('should.be.lost') await asyncio.sleep(self.connector.processor._wait_timeout * 2) self.statsd_task = asyncio.create_task(self.statsd_server.run()) await self.statsd_server.wait_running() - self.connector.inject_metric('should.be.recvd', 1, 'c') + self.connector.incr('should.be.recvd') await self.wait_for(self.statsd_server.message_received.acquire()) self.assertEqual(self.statsd_server.metrics[0], b'should.be.recvd:1|c') @@ -245,7 +245,7 @@ class UDPProcessingTests(ProcessorTestCase): await self.wait_for( asyncio.sleep(self.connector.processor._reconnect_sleep)) - self.connector.inject_metric('should.be.recvd', 1, 'c') + self.connector.incr('should.be.recvd') await self.wait_for(self.statsd_server.message_received.acquire()) self.assertEqual(self.statsd_server.metrics[0], b'should.be.recvd:1|c') @@ -272,16 +272,31 @@ class ConnectorTests(ProcessorTestCase): self.assertEqual(recvd_value, str(value), 'metric value mismatch') self.assertEqual(recvd_code, type_code, 'metric type mismatch') - async def test_sending_simple_counter(self): - self.connector.inject_metric('simple.counter', 1000, 'c') + async def test_adjusting_counter(self): + self.connector.incr('simple.counter') await self.wait_for(self.statsd_server.message_received.acquire()) - self.assert_metrics_equal(self.statsd_server.metrics[0], - 'simple.counter', 1000, 'c') + self.assert_metrics_equal(self.statsd_server.metrics[-1], + 'simple.counter', 1, 'c') + + self.connector.incr('simple.counter', 10) + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[-1], + 'simple.counter', 10, 'c') + + self.connector.decr('simple.counter') + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[-1], + 'simple.counter', -1, 'c') + + self.connector.decr('simple.counter', 10) + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[-1], + 'simple.counter', -10, 'c') async def test_adjusting_gauge(self): - self.connector.inject_metric('simple.gauge', 100, 'g') - self.connector.inject_metric('simple.gauge', -10, 'g') - self.connector.inject_metric('simple.gauge', '+10', 'g') + self.connector.gauge('simple.gauge', 100) + self.connector.gauge('simple.gauge', -10, delta=True) + self.connector.gauge('simple.gauge', 10, delta=True) for _ in range(3): await self.wait_for(self.statsd_server.message_received.acquire()) @@ -294,7 +309,7 @@ class ConnectorTests(ProcessorTestCase): async def test_sending_timer(self): secs = 12.34 - self.connector.inject_metric('simple.timer', secs * 1000.0, 'ms') + self.connector.timing('simple.timer', secs) await self.wait_for(self.statsd_server.message_received.acquire()) self.assert_metrics_equal(self.statsd_server.metrics[0], 'simple.timer', 12340.0, 'ms') @@ -309,9 +324,9 @@ class ConnectorTests(ProcessorTestCase): async def fake_process_metric(): if not self.connector.processor.should_terminate: - self.connector.inject_metric('counter', 1, 'c') - self.connector.inject_metric('counter', 2, 'c') - self.connector.inject_metric('counter', 3, 'c') + self.connector.incr('counter', 1) + self.connector.incr('counter', 2) + self.connector.incr('counter', 3) self.connector.processor.should_terminate = True return await real_process_metric() @@ -325,7 +340,7 @@ class ConnectorTests(ProcessorTestCase): await self.statsd_server.wait_closed() for value in range(50): - self.connector.inject_metric('counter', value, 'c') + self.connector.incr('counter', value) asyncio.create_task(self.statsd_server.run()) await self.wait_for(self.statsd_server.client_connected.acquire())