diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 11a3e27..198b6bd 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,10 @@ +Next release +------------ +- Added ``Connector.timer`` method (addresses :issue:`8`) +- Implement ``Timer`` abstraction from python-statsd +- ``Connector.timing`` now accepts :class:`datetime.timedelta` instances in addition + to :class:`float` instances + :tag:`0.1.0 <0.0.1...0.1.0>` (10-May-2021) ------------------------------------------ - Added :envvar:`STATSD_ENABLED` environment variable to disable the Tornado integration diff --git a/README.rst b/README.rst index 117fb64..5584c4c 100644 --- a/README.rst +++ b/README.rst @@ -36,15 +36,29 @@ metric to send and the task consumes the internal queue when it is connected. The following convenience methods are available. You can also call ``inject_metric`` for complete control over the payload. -+--------------+--------------------------------------+ -| ``incr`` | Increment a counter metric | -+--------------+--------------------------------------+ -| ``decr`` | Decrement a counter metric | -+--------------+--------------------------------------+ -| ``gauge`` | Adjust or set a gauge metric | -+--------------+--------------------------------------+ -| ``timing`` | Append a duration to a timer metric | -+--------------+--------------------------------------+ ++--------------+--------------------------------------------------------------+ +| ``incr`` | Increment a counter metric | ++--------------+--------------------------------------------------------------+ +| ``decr`` | Decrement a counter metric | ++--------------+--------------------------------------------------------------+ +| ``gauge`` | Adjust or set a gauge metric | ++--------------+--------------------------------------------------------------+ +| ``timer`` | Append a duration to a timer metric using a context manager | ++--------------+--------------------------------------------------------------+ +| ``timing`` | Append a duration to a timer metric | ++--------------+--------------------------------------------------------------+ + +If you are a `python-statsd`_ user, then the method names should look very familiar. That is quite intentional. +I like the interface and many others do as well. There is one very very important difference though -- the +``timing`` method takes the duration as the number of **seconds** as a :class:`float` instead of the number of +milliseconds. + +.. warning:: + + If you are accustomed to using `python-statsd`_, be aware that the ``timing`` method expects the number of + seconds as a :class:`float` instead of the number of milliseconds. + +.. _python-statsd: https://statsd.readthedocs.io/en/latest/ Tornado helpers =============== diff --git a/docs/conf.py b/docs/conf.py index 7f1cdb5..b0e18c7 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -19,6 +19,7 @@ intersphinx_mapping = { # https://www.sphinx-doc.org/en/master/usage/extensions/extlinks.html extensions.append('sphinx.ext.extlinks') extlinks = { + 'issue': ("https://github.com/sprockets/sprockets-statsd/issues/%s", "#%s"), 'tag': ("https://github.com/sprockets/sprockets-statsd/compare/%s", "%s"), } diff --git a/docs/index.rst b/docs/index.rst index 96657cc..58e62af 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -46,6 +46,7 @@ Reference .. autoclass:: sprockets_statsd.statsd.Connector :members: + :inherited-members: incr, decr, gauge, timer, timing Tornado helpers --------------- @@ -63,6 +64,9 @@ Internals .. autoclass:: sprockets_statsd.statsd.Processor :members: +.. autoclass:: sprockets_statsd.statsd.Timer + :members: + .. autoclass:: sprockets_statsd.statsd.StatsdProtocol :members: diff --git a/setup.cfg b/setup.cfg index 11b8123..d20f21e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -13,7 +13,7 @@ project_urls: author = Dave Shawley author_email = daveshawley@gmail.com classifiers = - Development Status :: 4 - Beta + Development Status :: 5 - Production/Stable Intended Audience :: Developers License :: OSI Approved :: BSD License Natural Language :: English @@ -38,16 +38,16 @@ tornado = dev = asynctest==0.13.0 coverage==5.5 - flake8==3.8.4 + flake8==3.9.2 flake8-import-order==0.18.1 - sphinx==3.5.2 - sphinx-autodoc-typehints==1.11.1 + sphinx==4.1.1 + sphinx-autodoc-typehints==1.12.0 sprockets.http==2.2.0 tornado>=5 - yapf==0.30.0 + yapf==0.31.0 readthedocs = - sphinx==3.5.2 - sphinx-autodoc-typehints==1.11.1 + sphinx==4.1.1 + sphinx-autodoc-typehints==1.12.1 tornado>=5 [options.packages.find] diff --git a/sprockets_statsd/statsd.py b/sprockets_statsd/statsd.py index 513803e..73d0393 100644 --- a/sprockets_statsd/statsd.py +++ b/sprockets_statsd/statsd.py @@ -1,6 +1,8 @@ import asyncio +import datetime import logging import socket +import time import typing @@ -45,6 +47,80 @@ class ThrottleGuard: self.counter = 0 +class Timer: + """Implement Timer interface from python-statsd. + + Instances of this class are returned from + :meth:`.AbstractConnector.timer` to maintain some compatibility + with `python-statsd`_. You should not create instances of this + class yourself. + + This implementation follows the careful protocol created by + python-statsd in that it raises :exc:`RuntimeError` in the + following cases: + + * :meth:`.stop` is called before calling :meth:`.start` + * :meth:`.send` is called before calling :meth:`.stop` + which requires a prior call to :meth:`.start` + + The call to :meth:`.send` clears the timing values so calling it + twice in a row will result in a :exc:`RuntimeError` as well. + + """ + def __init__(self, connector: 'AbstractConnector', path: str): + self._connector = connector + self._path = path + self._start_time, self._finish_time = None, None + + def start(self) -> 'Timer': + """Start the timer and return `self`.""" + self._start_time = time.time() + return self + + def stop(self, send: bool = True) -> 'Timer': + """Stop the timer and send the timing. + + :param send: immediately send the recorded timing to the + processor + + You can delay sending the timing by setting the `send` + parameter to :data:`False`. + + """ + if self._start_time is None: + raise RuntimeError('Timer.stop called before start') + self._finish_time = time.time() + if send: + self.send() + return self + + def send(self): + """Send the recorded timing to the connector. + + This method will raise a :exc:`RuntimeError` if a timing has + not been recorded by calling :meth:`start` and :meth:`stop` in + sequence. + + """ + if self._start_time is None: + raise RuntimeError('Timer.send called before start') + if self._finish_time is None: + raise RuntimeError('Timer.send called before stop') + + self._connector.timing( + self._path, + max(self._finish_time, self._start_time) - self._start_time) + self._start_time, self._finish_time = None, None + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + return False + + class AbstractConnector: """StatsD connector that does not send metrics or connect. @@ -99,15 +175,26 @@ class AbstractConnector: payload = str(value) self.inject_metric(f'gauges.{path}', payload, 'g') - def timing(self, path: str, seconds: float) -> None: + def timing(self, path: str, + seconds: typing.Union[float, datetime.timedelta]) -> None: """Send a timer metric. :param path: timer to append a value to :param seconds: number of **seconds** to record """ + if isinstance(seconds, datetime.timedelta): + seconds = seconds.total_seconds() self.inject_metric(f'timers.{path}', str(seconds * 1000.0), 'ms') + def timer(self, path) -> Timer: + """Send a timer metric using a context manager. + + :param path: timer to append the measured time to + + """ + return Timer(self, path) + class Connector(AbstractConnector): """Sends metrics to a statsd server. diff --git a/tests/test_processor.py b/tests/test_processor.py index ab69c5e..d7b03e3 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -1,4 +1,5 @@ import asyncio +import datetime import logging import socket import time @@ -286,7 +287,13 @@ class ConnectorTests(ProcessorTestCase): recvd_path, _, rest = decoded.partition(':') recvd_value, _, recvd_code = rest.partition('|') self.assertEqual(path, recvd_path, 'metric path mismatch') - self.assertEqual(recvd_value, str(value), 'metric value mismatch') + if type_code == 'ms': + self.assertAlmostEqual(float(recvd_value), + value, + places=3, + msg='metric value mismatch') + else: + self.assertEqual(recvd_value, str(value), 'metric value mismatch') self.assertEqual(recvd_code, type_code, 'metric type mismatch') async def test_adjusting_counter(self): @@ -331,6 +338,37 @@ class ConnectorTests(ProcessorTestCase): self.assert_metrics_equal(self.statsd_server.metrics[0], 'timers.simple.timer', 12340.0, 'ms') + async def test_sending_timer_using_timedelta(self): + secs = datetime.timedelta(seconds=12, milliseconds=340) + self.connector.timing('simple.timer', secs) + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[0], + 'timers.simple.timer', 12340.0, 'ms') + + async def test_timing_context_manager(self): + with unittest.mock.patch( + 'sprockets_statsd.statsd.time.time') as time_function: + time_function.side_effect = [10.0, 22.345] + with self.connector.timer('some.timer'): + pass # exercising context manager + self.assertEqual(2, time_function.call_count) + + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[0], + 'timers.some.timer', 12345.0, 'ms') + + async def test_timer_is_monotonic(self): + with unittest.mock.patch( + 'sprockets_statsd.statsd.time.time') as time_function: + time_function.side_effect = [10.001, 10.000] + with self.connector.timer('some.timer'): + pass # exercising context manager + self.assertEqual(2, time_function.call_count) + + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[0], + 'timers.some.timer', 0.0, 'ms') + async def test_that_queued_metrics_are_drained(self): # The easiest way to test that the internal metrics queue # is drained when the processor is stopped is to monkey @@ -430,3 +468,83 @@ class ConnectorOptionTests(ProcessorTestCase): for _ in range(connector.processor.queue.qsize()): metric = await connector.processor.queue.get() self.assertEqual(metric, b'counters.counter:1|c') + + +class ConnectorTimerTests(ProcessorTestCase): + ip_protocol = socket.IPPROTO_TCP + + async def asyncSetUp(self): + await super().asyncSetUp() + self.connector = statsd.Connector(self.statsd_server.host, + self.statsd_server.port) + await self.connector.start() + await self.wait_for(self.statsd_server.client_connected.acquire()) + + async def asyncTearDown(self): + await self.wait_for(self.connector.stop()) + await super().asyncTearDown() + + async def test_that_stop_raises_if_not_started(self): + timer = self.connector.timer('whatever') + with self.assertRaises(RuntimeError): + timer.stop() + + async def test_that_start_returns_instance(self): + timer = self.connector.timer('whatever') + self.assertIs(timer, timer.start()) + + async def test_that_stop_returns_instance(self): + timer = self.connector.timer('whatever') + timer.start() + self.assertIs(timer, timer.stop()) + + async def test_that_timing_is_sent_by_stop(self): + timer = self.connector.timer('whatever') + timer.start() + self.assertTrue(self.statsd_server.message_received.locked(), + 'timing sent to server unexpectedly') + timer.stop() + await self.wait_for(self.statsd_server.message_received.acquire()) + + async def test_that_timing_send_can_be_delayed(self): + timer = self.connector.timer('whatever') + timer.start() + self.assertTrue(self.statsd_server.message_received.locked(), + 'timing sent to server unexpectedly') + timer.stop(send=False) + self.assertTrue(self.statsd_server.message_received.locked(), + 'timing sent to server unexpectedly') + + timer.send() + await self.wait_for(self.statsd_server.message_received.acquire()) + + async def test_that_send_raises_when_already_sent(self): + timer = self.connector.timer('whatever') + timer.start() + timer.stop(send=False) + timer.send() + await self.wait_for(self.statsd_server.message_received.acquire()) + with self.assertRaises(RuntimeError): + timer.send() + + async def test_that_send_raises_when_not_started(self): + timer = self.connector.timer('whatever') + with self.assertRaises(RuntimeError): + timer.send() + + async def test_that_send_raises_when_not_stopped(self): + timer = self.connector.timer('whatever') + timer.start() + with self.assertRaises(RuntimeError): + timer.send() + + async def test_that_timer_can_be_reused(self): + timer = self.connector.timer('whatever') + with timer: + pass # exercising context manager + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assertTrue(self.statsd_server.message_received.locked()) + + with timer: + pass # exercising context manager + await self.wait_for(self.statsd_server.message_received.acquire())