Expose a more usable interface on the Connector.

@nvllsvm likes the interface that https://statsd.readthedocs.io/ exposes
and I agree.  I mimicked this interface on the lower-level Connector.  I
left the sprockets.mixins.metrics style on the tornado helpers for
compat reasons.  You can always use `self.statsd_connector.incr()` in
your request handlers if you prefer the other interface.
This commit is contained in:
Dave Shawley 2021-03-24 07:36:16 -04:00
parent e5c343e577
commit 65df480421
No known key found for this signature in database
GPG key ID: 44A9C9992CCFAB82
4 changed files with 100 additions and 25 deletions

View file

@ -15,8 +15,8 @@ This library provides connectors to send metrics to a statsd_ instance using eit
async def do_stuff():
start = time.time()
response = make_some_http_call()
statsd.inject_metric(f'timers.http.something.{response.code}',
(time.time() - start) * 1000.0, 'ms')
statsd.timing(f'timers.http.something.{response.code}',
(time.time() - start))
async def main():
await statsd.start()
@ -28,8 +28,21 @@ This library provides connectors to send metrics to a statsd_ instance using eit
The ``Connector`` instance maintains a resilient connection to the target StatsD instance, formats the metric data
into payloads, and sends them to the StatsD target. It defaults to using TCP as the transport but will use UDP if
the ``ip_protocol`` keyword is set to ``socket.IPPROTO_UDP``. The ``Connector.start`` method starts a background
``asyncio.Task`` that is responsible for maintaining the connection. The ``inject_metric`` method enqueues metric
data to send and the task consumes the internal queue when it is connected.
``asyncio.Task`` that is responsible for maintaining the connection. The ``timing`` method enqueues a timing
metric to send and the task consumes the internal queue when it is connected.
The following convenience methods are available. You can also call ``inject_metric`` for complete control over
the payload.
+--------------+--------------------------------------+
| ``incr`` | Increment a counter metric |
+--------------+--------------------------------------+
| ``decr`` | Decrement a counter metric |
+--------------+--------------------------------------+
| ``gauge`` | Adjust or set a gauge metric |
+--------------+--------------------------------------+
| ``timing`` | Append a duration to a timer metric |
+--------------+--------------------------------------+
Tornado helpers
===============

View file

@ -60,6 +60,53 @@ class Connector:
"""
await self.processor.stop()
def incr(self, path: str, value: int = 1):
"""Increment a counter metric.
:param path: counter to increment
:param value: amount to increment the counter by
"""
self.inject_metric(path, str(value), 'c')
def decr(self, path: str, value: int = 1):
"""Decrement a counter metric.
:param path: counter to decrement
:param value: amount to decrement the counter by
This is equivalent to ``self.incr(path, -value)``.
"""
self.inject_metric(path, str(-value), 'c')
def gauge(self, path: str, value: int, delta: bool = False):
"""Manipulate a gauge metric.
:param path: gauge to adjust
:param value: value to send
:param delta: is this an adjustment of the gauge?
If the `delta` parameter is ``False`` (or omitted), then
`value` is the new value to set the gauge to. Otherwise,
`value` is an adjustment for the current gauge.
"""
if delta:
value = f'{value:+d}'
else:
value = str(value)
self.inject_metric(path, value, 'g')
def timing(self, path: str, seconds: float):
"""Send a timer metric.
:param path: timer to append a value to
:param seconds: number of **seconds** to record
"""
self.inject_metric(path, seconds * 1000.0, 'ms')
def inject_metric(self, path: str, value, type_code: str):
"""Send a metric to the statsd server.

View file

@ -170,8 +170,8 @@ class RequestHandler(web.RequestHandler):
"""
if self.statsd_connector is not None:
self.statsd_connector.inject_metric(
self.__build_path('timers', *path), secs * 1000.0, 'ms')
self.statsd_connector.timing(self.__build_path('timers', *path),
secs)
def increase_counter(self, *path, amount: int = 1):
"""Adjust a counter.
@ -182,8 +182,8 @@ class RequestHandler(web.RequestHandler):
"""
if self.statsd_connector is not None:
self.statsd_connector.inject_metric(
self.__build_path('counters', *path), amount, 'c')
self.statsd_connector.incr(self.__build_path('counters', *path),
amount)
@contextlib.contextmanager
def execution_timer(self, *path):

View file

@ -218,8 +218,8 @@ class UDPProcessingTests(ProcessorTestCase):
await super().asyncTearDown()
async def test_sending_metrics(self):
self.connector.inject_metric('counter', 1, 'c')
self.connector.inject_metric('timer', 1.0, 'ms')
self.connector.incr('counter')
self.connector.timing('timer', 0.001)
await self.wait_for(self.statsd_server.message_received.acquire())
await self.wait_for(self.statsd_server.message_received.acquire())
@ -230,13 +230,13 @@ class UDPProcessingTests(ProcessorTestCase):
self.statsd_server.close()
await self.statsd_server.wait_closed()
self.connector.inject_metric('should.be.lost', 1, 'c')
self.connector.incr('should.be.lost')
await asyncio.sleep(self.connector.processor._wait_timeout * 2)
self.statsd_task = asyncio.create_task(self.statsd_server.run())
await self.statsd_server.wait_running()
self.connector.inject_metric('should.be.recvd', 1, 'c')
self.connector.incr('should.be.recvd')
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertEqual(self.statsd_server.metrics[0], b'should.be.recvd:1|c')
@ -245,7 +245,7 @@ class UDPProcessingTests(ProcessorTestCase):
await self.wait_for(
asyncio.sleep(self.connector.processor._reconnect_sleep))
self.connector.inject_metric('should.be.recvd', 1, 'c')
self.connector.incr('should.be.recvd')
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertEqual(self.statsd_server.metrics[0], b'should.be.recvd:1|c')
@ -272,16 +272,31 @@ class ConnectorTests(ProcessorTestCase):
self.assertEqual(recvd_value, str(value), 'metric value mismatch')
self.assertEqual(recvd_code, type_code, 'metric type mismatch')
async def test_sending_simple_counter(self):
self.connector.inject_metric('simple.counter', 1000, 'c')
async def test_adjusting_counter(self):
self.connector.incr('simple.counter')
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[0],
'simple.counter', 1000, 'c')
self.assert_metrics_equal(self.statsd_server.metrics[-1],
'simple.counter', 1, 'c')
self.connector.incr('simple.counter', 10)
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[-1],
'simple.counter', 10, 'c')
self.connector.decr('simple.counter')
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[-1],
'simple.counter', -1, 'c')
self.connector.decr('simple.counter', 10)
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[-1],
'simple.counter', -10, 'c')
async def test_adjusting_gauge(self):
self.connector.inject_metric('simple.gauge', 100, 'g')
self.connector.inject_metric('simple.gauge', -10, 'g')
self.connector.inject_metric('simple.gauge', '+10', 'g')
self.connector.gauge('simple.gauge', 100)
self.connector.gauge('simple.gauge', -10, delta=True)
self.connector.gauge('simple.gauge', 10, delta=True)
for _ in range(3):
await self.wait_for(self.statsd_server.message_received.acquire())
@ -294,7 +309,7 @@ class ConnectorTests(ProcessorTestCase):
async def test_sending_timer(self):
secs = 12.34
self.connector.inject_metric('simple.timer', secs * 1000.0, 'ms')
self.connector.timing('simple.timer', secs)
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[0],
'simple.timer', 12340.0, 'ms')
@ -309,9 +324,9 @@ class ConnectorTests(ProcessorTestCase):
async def fake_process_metric():
if not self.connector.processor.should_terminate:
self.connector.inject_metric('counter', 1, 'c')
self.connector.inject_metric('counter', 2, 'c')
self.connector.inject_metric('counter', 3, 'c')
self.connector.incr('counter', 1)
self.connector.incr('counter', 2)
self.connector.incr('counter', 3)
self.connector.processor.should_terminate = True
return await real_process_metric()
@ -325,7 +340,7 @@ class ConnectorTests(ProcessorTestCase):
await self.statsd_server.wait_closed()
for value in range(50):
self.connector.inject_metric('counter', value, 'c')
self.connector.incr('counter', value)
asyncio.create_task(self.statsd_server.run())
await self.wait_for(self.statsd_server.client_connected.acquire())