Merge pull request #9 from dave-shawley/support-timer

Add support for python-statsd alike timer method.
This commit is contained in:
Andrew Rabert 2021-07-20 10:22:25 -04:00 committed by GitHub
commit a47f121f6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 249 additions and 18 deletions

View file

@ -1,3 +1,10 @@
Next release
------------
- Added ``Connector.timer`` method (addresses :issue:`8`)
- Implement ``Timer`` abstraction from python-statsd
- ``Connector.timing`` now accepts :class:`datetime.timedelta` instances in addition
to :class:`float` instances
:tag:`0.1.0 <0.0.1...0.1.0>` (10-May-2021) :tag:`0.1.0 <0.0.1...0.1.0>` (10-May-2021)
------------------------------------------ ------------------------------------------
- Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration - Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration

View file

@ -36,15 +36,29 @@ metric to send and the task consumes the internal queue when it is connected.
The following convenience methods are available. You can also call ``inject_metric`` for complete control over The following convenience methods are available. You can also call ``inject_metric`` for complete control over
the payload. the payload.
+--------------+--------------------------------------+ +--------------+--------------------------------------------------------------+
| ``incr`` | Increment a counter metric | | ``incr`` | Increment a counter metric |
+--------------+--------------------------------------+ +--------------+--------------------------------------------------------------+
| ``decr`` | Decrement a counter metric | | ``decr`` | Decrement a counter metric |
+--------------+--------------------------------------+ +--------------+--------------------------------------------------------------+
| ``gauge`` | Adjust or set a gauge metric | | ``gauge`` | Adjust or set a gauge metric |
+--------------+--------------------------------------+ +--------------+--------------------------------------------------------------+
| ``timing`` | Append a duration to a timer metric | | ``timer`` | Append a duration to a timer metric using a context manager |
+--------------+--------------------------------------+ +--------------+--------------------------------------------------------------+
| ``timing`` | Append a duration to a timer metric |
+--------------+--------------------------------------------------------------+
If you are a `python-statsd`_ user, then the method names should look very familiar. That is quite intentional.
I like the interface and many others do as well. There is one very very important difference though -- the
``timing`` method takes the duration as the number of **seconds** as a :class:`float` instead of the number of
milliseconds.
.. warning::
If you are accustomed to using `python-statsd`_, be aware that the ``timing`` method expects the number of
seconds as a :class:`float` instead of the number of milliseconds.
.. _python-statsd: https://statsd.readthedocs.io/en/latest/
Tornado helpers Tornado helpers
=============== ===============

View file

@ -19,6 +19,7 @@ intersphinx_mapping = {
# https://www.sphinx-doc.org/en/master/usage/extensions/extlinks.html # https://www.sphinx-doc.org/en/master/usage/extensions/extlinks.html
extensions.append('sphinx.ext.extlinks') extensions.append('sphinx.ext.extlinks')
extlinks = { extlinks = {
'issue': ("https://github.com/sprockets/sprockets-statsd/issues/%s", "#%s"),
'tag': ("https://github.com/sprockets/sprockets-statsd/compare/%s", "%s"), 'tag': ("https://github.com/sprockets/sprockets-statsd/compare/%s", "%s"),
} }

View file

@ -46,6 +46,7 @@ Reference
.. autoclass:: sprockets_statsd.statsd.Connector .. autoclass:: sprockets_statsd.statsd.Connector
:members: :members:
:inherited-members: incr, decr, gauge, timer, timing
Tornado helpers Tornado helpers
--------------- ---------------
@ -63,6 +64,9 @@ Internals
.. autoclass:: sprockets_statsd.statsd.Processor .. autoclass:: sprockets_statsd.statsd.Processor
:members: :members:
.. autoclass:: sprockets_statsd.statsd.Timer
:members:
.. autoclass:: sprockets_statsd.statsd.StatsdProtocol .. autoclass:: sprockets_statsd.statsd.StatsdProtocol
:members: :members:

View file

@ -13,7 +13,7 @@ project_urls:
author = Dave Shawley author = Dave Shawley
author_email = daveshawley@gmail.com author_email = daveshawley@gmail.com
classifiers = classifiers =
Development Status :: 4 - Beta Development Status :: 5 - Production/Stable
Intended Audience :: Developers Intended Audience :: Developers
License :: OSI Approved :: BSD License License :: OSI Approved :: BSD License
Natural Language :: English Natural Language :: English
@ -38,16 +38,16 @@ tornado =
dev = dev =
asynctest==0.13.0 asynctest==0.13.0
coverage==5.5 coverage==5.5
flake8==3.8.4 flake8==3.9.2
flake8-import-order==0.18.1 flake8-import-order==0.18.1
sphinx==3.5.2 sphinx==4.1.1
sphinx-autodoc-typehints==1.11.1 sphinx-autodoc-typehints==1.12.0
sprockets.http==2.2.0 sprockets.http==2.2.0
tornado>=5 tornado>=5
yapf==0.30.0 yapf==0.31.0
readthedocs = readthedocs =
sphinx==3.5.2 sphinx==4.1.1
sphinx-autodoc-typehints==1.11.1 sphinx-autodoc-typehints==1.12.1
tornado>=5 tornado>=5
[options.packages.find] [options.packages.find]

View file

@ -1,6 +1,8 @@
import asyncio import asyncio
import datetime
import logging import logging
import socket import socket
import time
import typing import typing
@ -45,6 +47,80 @@ class ThrottleGuard:
self.counter = 0 self.counter = 0
class Timer:
"""Implement Timer interface from python-statsd.
Instances of this class are returned from
:meth:`.AbstractConnector.timer` to maintain some compatibility
with `python-statsd`_. You should not create instances of this
class yourself.
This implementation follows the careful protocol created by
python-statsd in that it raises :exc:`RuntimeError` in the
following cases:
* :meth:`.stop` is called before calling :meth:`.start`
* :meth:`.send` is called before calling :meth:`.stop`
which requires a prior call to :meth:`.start`
The call to :meth:`.send` clears the timing values so calling it
twice in a row will result in a :exc:`RuntimeError` as well.
"""
def __init__(self, connector: 'AbstractConnector', path: str):
self._connector = connector
self._path = path
self._start_time, self._finish_time = None, None
def start(self) -> 'Timer':
"""Start the timer and return `self`."""
self._start_time = time.time()
return self
def stop(self, send: bool = True) -> 'Timer':
"""Stop the timer and send the timing.
:param send: immediately send the recorded timing to the
processor
You can delay sending the timing by setting the `send`
parameter to :data:`False`.
"""
if self._start_time is None:
raise RuntimeError('Timer.stop called before start')
self._finish_time = time.time()
if send:
self.send()
return self
def send(self):
"""Send the recorded timing to the connector.
This method will raise a :exc:`RuntimeError` if a timing has
not been recorded by calling :meth:`start` and :meth:`stop` in
sequence.
"""
if self._start_time is None:
raise RuntimeError('Timer.send called before start')
if self._finish_time is None:
raise RuntimeError('Timer.send called before stop')
self._connector.timing(
self._path,
max(self._finish_time, self._start_time) - self._start_time)
self._start_time, self._finish_time = None, None
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
return False
class AbstractConnector: class AbstractConnector:
"""StatsD connector that does not send metrics or connect. """StatsD connector that does not send metrics or connect.
@ -99,15 +175,26 @@ class AbstractConnector:
payload = str(value) payload = str(value)
self.inject_metric(f'gauges.{path}', payload, 'g') self.inject_metric(f'gauges.{path}', payload, 'g')
def timing(self, path: str, seconds: float) -> None: def timing(self, path: str,
seconds: typing.Union[float, datetime.timedelta]) -> None:
"""Send a timer metric. """Send a timer metric.
:param path: timer to append a value to :param path: timer to append a value to
:param seconds: number of **seconds** to record :param seconds: number of **seconds** to record
""" """
if isinstance(seconds, datetime.timedelta):
seconds = seconds.total_seconds()
self.inject_metric(f'timers.{path}', str(seconds * 1000.0), 'ms') self.inject_metric(f'timers.{path}', str(seconds * 1000.0), 'ms')
def timer(self, path) -> Timer:
"""Send a timer metric using a context manager.
:param path: timer to append the measured time to
"""
return Timer(self, path)
class Connector(AbstractConnector): class Connector(AbstractConnector):
"""Sends metrics to a statsd server. """Sends metrics to a statsd server.

View file

@ -1,4 +1,5 @@
import asyncio import asyncio
import datetime
import logging import logging
import socket import socket
import time import time
@ -286,7 +287,13 @@ class ConnectorTests(ProcessorTestCase):
recvd_path, _, rest = decoded.partition(':') recvd_path, _, rest = decoded.partition(':')
recvd_value, _, recvd_code = rest.partition('|') recvd_value, _, recvd_code = rest.partition('|')
self.assertEqual(path, recvd_path, 'metric path mismatch') self.assertEqual(path, recvd_path, 'metric path mismatch')
self.assertEqual(recvd_value, str(value), 'metric value mismatch') if type_code == 'ms':
self.assertAlmostEqual(float(recvd_value),
value,
places=3,
msg='metric value mismatch')
else:
self.assertEqual(recvd_value, str(value), 'metric value mismatch')
self.assertEqual(recvd_code, type_code, 'metric type mismatch') self.assertEqual(recvd_code, type_code, 'metric type mismatch')
async def test_adjusting_counter(self): async def test_adjusting_counter(self):
@ -331,6 +338,37 @@ class ConnectorTests(ProcessorTestCase):
self.assert_metrics_equal(self.statsd_server.metrics[0], self.assert_metrics_equal(self.statsd_server.metrics[0],
'timers.simple.timer', 12340.0, 'ms') 'timers.simple.timer', 12340.0, 'ms')
async def test_sending_timer_using_timedelta(self):
secs = datetime.timedelta(seconds=12, milliseconds=340)
self.connector.timing('simple.timer', secs)
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[0],
'timers.simple.timer', 12340.0, 'ms')
async def test_timing_context_manager(self):
with unittest.mock.patch(
'sprockets_statsd.statsd.time.time') as time_function:
time_function.side_effect = [10.0, 22.345]
with self.connector.timer('some.timer'):
pass # exercising context manager
self.assertEqual(2, time_function.call_count)
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[0],
'timers.some.timer', 12345.0, 'ms')
async def test_timer_is_monotonic(self):
with unittest.mock.patch(
'sprockets_statsd.statsd.time.time') as time_function:
time_function.side_effect = [10.001, 10.000]
with self.connector.timer('some.timer'):
pass # exercising context manager
self.assertEqual(2, time_function.call_count)
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[0],
'timers.some.timer', 0.0, 'ms')
async def test_that_queued_metrics_are_drained(self): async def test_that_queued_metrics_are_drained(self):
# The easiest way to test that the internal metrics queue # The easiest way to test that the internal metrics queue
# is drained when the processor is stopped is to monkey # is drained when the processor is stopped is to monkey
@ -430,3 +468,83 @@ class ConnectorOptionTests(ProcessorTestCase):
for _ in range(connector.processor.queue.qsize()): for _ in range(connector.processor.queue.qsize()):
metric = await connector.processor.queue.get() metric = await connector.processor.queue.get()
self.assertEqual(metric, b'counters.counter:1|c') self.assertEqual(metric, b'counters.counter:1|c')
class ConnectorTimerTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP
async def asyncSetUp(self):
await super().asyncSetUp()
self.connector = statsd.Connector(self.statsd_server.host,
self.statsd_server.port)
await self.connector.start()
await self.wait_for(self.statsd_server.client_connected.acquire())
async def asyncTearDown(self):
await self.wait_for(self.connector.stop())
await super().asyncTearDown()
async def test_that_stop_raises_if_not_started(self):
timer = self.connector.timer('whatever')
with self.assertRaises(RuntimeError):
timer.stop()
async def test_that_start_returns_instance(self):
timer = self.connector.timer('whatever')
self.assertIs(timer, timer.start())
async def test_that_stop_returns_instance(self):
timer = self.connector.timer('whatever')
timer.start()
self.assertIs(timer, timer.stop())
async def test_that_timing_is_sent_by_stop(self):
timer = self.connector.timer('whatever')
timer.start()
self.assertTrue(self.statsd_server.message_received.locked(),
'timing sent to server unexpectedly')
timer.stop()
await self.wait_for(self.statsd_server.message_received.acquire())
async def test_that_timing_send_can_be_delayed(self):
timer = self.connector.timer('whatever')
timer.start()
self.assertTrue(self.statsd_server.message_received.locked(),
'timing sent to server unexpectedly')
timer.stop(send=False)
self.assertTrue(self.statsd_server.message_received.locked(),
'timing sent to server unexpectedly')
timer.send()
await self.wait_for(self.statsd_server.message_received.acquire())
async def test_that_send_raises_when_already_sent(self):
timer = self.connector.timer('whatever')
timer.start()
timer.stop(send=False)
timer.send()
await self.wait_for(self.statsd_server.message_received.acquire())
with self.assertRaises(RuntimeError):
timer.send()
async def test_that_send_raises_when_not_started(self):
timer = self.connector.timer('whatever')
with self.assertRaises(RuntimeError):
timer.send()
async def test_that_send_raises_when_not_stopped(self):
timer = self.connector.timer('whatever')
timer.start()
with self.assertRaises(RuntimeError):
timer.send()
async def test_that_timer_can_be_reused(self):
timer = self.connector.timer('whatever')
with timer:
pass # exercising context manager
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertTrue(self.statsd_server.message_received.locked())
with timer:
pass # exercising context manager
await self.wait_for(self.statsd_server.message_received.acquire())