mirror of
https://github.com/sprockets/sprockets.mixins.metrics.git
synced 2024-11-22 03:00:25 +00:00
Merge pull request #35 from dave-shawley/add-close
Add StatsDCollector.close and some docs
This commit is contained in:
commit
54fc1ff50b
8 changed files with 117 additions and 28 deletions
12
docs/api.rst
12
docs/api.rst
|
@ -49,6 +49,18 @@ Statsd Implementation
|
||||||
.. autoclass:: sprockets.mixins.metrics.statsd.StatsdMixin
|
.. autoclass:: sprockets.mixins.metrics.statsd.StatsdMixin
|
||||||
:members:
|
:members:
|
||||||
|
|
||||||
|
.. autoclass:: sprockets.mixins.metrics.statsd.StatsDCollector
|
||||||
|
:members:
|
||||||
|
|
||||||
|
Application Functions
|
||||||
|
---------------------
|
||||||
|
Before you can use the mixin, you have to install the client by calling
|
||||||
|
the ``install`` function on your application instance.
|
||||||
|
|
||||||
|
.. autofunction:: sprockets.mixins.metrics.statsd.install
|
||||||
|
|
||||||
|
.. autofunction:: sprockets.mixins.metrics.statsd.get_client
|
||||||
|
|
||||||
Testing Helpers
|
Testing Helpers
|
||||||
---------------
|
---------------
|
||||||
*So who actually tests that their metrics are emitted as they expect?*
|
*So who actually tests that their metrics are emitted as they expect?*
|
||||||
|
|
|
@ -7,6 +7,8 @@ Release History
|
||||||
---------------
|
---------------
|
||||||
- Add configuration documentation
|
- Add configuration documentation
|
||||||
- Exclude Tornado >6 (as-yet-unreleased version)
|
- Exclude Tornado >6 (as-yet-unreleased version)
|
||||||
|
- Add :func:`sprockets.mixins.metrics.statsd.get_client` function
|
||||||
|
- Add :meth:`sprockets.mixins.metrics.statsd.StatsDCollector.close` method
|
||||||
|
|
||||||
`4.0.0`_ (06-Feb-2019)
|
`4.0.0`_ (06-Feb-2019)
|
||||||
----------------------
|
----------------------
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
coverage==4.5.2
|
coverage==4.5.4
|
||||||
flake8==3.6.0
|
flake8==3.7.8
|
||||||
nose==1.3.7
|
nose==1.3.7
|
||||||
|
|
|
@ -8,6 +8,9 @@ warning-is-error = 1
|
||||||
[check]
|
[check]
|
||||||
strict = 1
|
strict = 1
|
||||||
|
|
||||||
|
[coverage:report]
|
||||||
|
show_missing = 1
|
||||||
|
|
||||||
[nosetests]
|
[nosetests]
|
||||||
cover-package = sprockets.mixins.metrics
|
cover-package = sprockets.mixins.metrics
|
||||||
cover-branches = 1
|
cover-branches = 1
|
||||||
|
|
1
setup.py
1
setup.py
|
@ -32,6 +32,7 @@ setuptools.setup(
|
||||||
'Natural Language :: English',
|
'Natural Language :: English',
|
||||||
'Operating System :: OS Independent',
|
'Operating System :: OS Independent',
|
||||||
'Programming Language :: Python :: 3.7',
|
'Programming Language :: Python :: 3.7',
|
||||||
|
'Programming Language :: Python :: 3.8',
|
||||||
'Programming Language :: Python :: Implementation :: CPython',
|
'Programming Language :: Python :: Implementation :: CPython',
|
||||||
'Topic :: Internet :: WWW/HTTP',
|
'Topic :: Internet :: WWW/HTTP',
|
||||||
'Topic :: Software Development :: Libraries',
|
'Topic :: Software Development :: Libraries',
|
||||||
|
|
|
@ -29,7 +29,9 @@ class StatsdMixin:
|
||||||
:param path: elements of the metric path to record
|
:param path: elements of the metric path to record
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self.application.statsd.send(path, duration * 1000.0, 'ms')
|
client = get_client(self.application)
|
||||||
|
if client is not None:
|
||||||
|
client.send(path, duration * 1000.0, 'ms')
|
||||||
|
|
||||||
def increase_counter(self, *path, **kwargs):
|
def increase_counter(self, *path, **kwargs):
|
||||||
"""Increase a counter.
|
"""Increase a counter.
|
||||||
|
@ -45,7 +47,9 @@ class StatsdMixin:
|
||||||
omitted, the counter is increased by one.
|
omitted, the counter is increased by one.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self.application.statsd.send(path, kwargs.get('amount', '1'), 'c')
|
client = get_client(self.application)
|
||||||
|
if client is not None:
|
||||||
|
client.send(path, kwargs.get('amount', '1'), 'c')
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def execution_timer(self, *path):
|
def execution_timer(self, *path):
|
||||||
|
@ -86,10 +90,9 @@ class StatsdMixin:
|
||||||
class StatsDCollector:
|
class StatsDCollector:
|
||||||
"""Collects and submits stats to StatsD.
|
"""Collects and submits stats to StatsD.
|
||||||
|
|
||||||
This class should be constructed using the
|
This class should be constructed using the :func:`.install` function.
|
||||||
:meth:`~sprockets.mixins.statsd.install` method. When installed,
|
When installed, it is attached to the :class:`~tornado.web.Application`
|
||||||
it is attached to the :class:`~tornado.web.Application` instance
|
instance for your web application.
|
||||||
for your web application.
|
|
||||||
|
|
||||||
:param str host: The StatsD host
|
:param str host: The StatsD host
|
||||||
:param str port: The StatsD port
|
:param str port: The StatsD port
|
||||||
|
@ -110,6 +113,7 @@ class StatsDCollector:
|
||||||
self._namespace = namespace
|
self._namespace = namespace
|
||||||
self._prepend_metric_type = prepend_metric_type
|
self._prepend_metric_type = prepend_metric_type
|
||||||
self._tcp_reconnect_sleep = 5
|
self._tcp_reconnect_sleep = 5
|
||||||
|
self._closing = False
|
||||||
|
|
||||||
if protocol == 'tcp':
|
if protocol == 'tcp':
|
||||||
self._tcp = True
|
self._tcp = True
|
||||||
|
@ -128,20 +132,25 @@ class StatsDCollector:
|
||||||
"""
|
"""
|
||||||
sock = iostream.IOStream(socket.socket(
|
sock = iostream.IOStream(socket.socket(
|
||||||
socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP))
|
socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP))
|
||||||
sock.connect(self._address, self._tcp_on_connected)
|
sock.connect(self._address)
|
||||||
sock.set_close_callback(self._tcp_on_closed)
|
sock.set_close_callback(self._tcp_on_closed)
|
||||||
return sock
|
return sock
|
||||||
|
|
||||||
async def _tcp_on_closed(self):
|
async def _tcp_on_closed(self):
|
||||||
"""Invoked when the socket is closed."""
|
"""Invoked when the socket is closed."""
|
||||||
LOGGER.warning('Not connected to statsd, connecting in %s seconds',
|
if self._closing:
|
||||||
self._tcp_reconnect_sleep)
|
LOGGER.info('Statsd socket closed')
|
||||||
await asyncio.sleep(self._tcp_reconnect_sleep)
|
else:
|
||||||
self._sock = self._tcp_socket()
|
LOGGER.warning('Not connected to statsd, connecting in %s seconds',
|
||||||
|
self._tcp_reconnect_sleep)
|
||||||
|
await asyncio.sleep(self._tcp_reconnect_sleep)
|
||||||
|
self._sock = self._tcp_socket()
|
||||||
|
|
||||||
def _tcp_on_connected(self):
|
def close(self):
|
||||||
"""Invoked when the IOStream is connected"""
|
"""Gracefully close the socket."""
|
||||||
LOGGER.debug('Connected to statsd at %s via TCP', self._address)
|
if not self._closing:
|
||||||
|
self._closing = True
|
||||||
|
self._sock.close()
|
||||||
|
|
||||||
def send(self, path, value, metric_type):
|
def send(self, path, value, metric_type):
|
||||||
"""Send a metric to Statsd.
|
"""Send a metric to Statsd.
|
||||||
|
@ -205,16 +214,16 @@ def install(application, **kwargs):
|
||||||
:param tornado.web.Application application: the application to
|
:param tornado.web.Application application: the application to
|
||||||
install the collector into.
|
install the collector into.
|
||||||
:param kwargs: keyword parameters to pass to the
|
:param kwargs: keyword parameters to pass to the
|
||||||
:class:`StatsDCollector` initializer.
|
:class:`.StatsDCollector` initializer.
|
||||||
:returns: :data:`True` if the client was installed successfully,
|
:returns: :data:`True` if the client was installed successfully,
|
||||||
or :data:`False` otherwise.
|
or :data:`False` otherwise.
|
||||||
|
|
||||||
- **host** The StatsD host. If host is not specified, the
|
- **host** The StatsD host. If host is not specified, the
|
||||||
``STATSD_HOST`` environment variable, or default `127.0.0.1`,
|
``STATSD_HOST`` environment variable, or default `127.0.0.1`,
|
||||||
will be pass into the :class:`StatsDCollector`.
|
will be pass into the :class:`.StatsDCollector`.
|
||||||
- **port** The StatsD port. If port is not specified, the
|
- **port** The StatsD port. If port is not specified, the
|
||||||
``STATSD_PORT`` environment variable, or default `8125`,
|
``STATSD_PORT`` environment variable, or default `8125`,
|
||||||
will be pass into the :class:`StatsDCollector`.
|
will be pass into the :class:`.StatsDCollector`.
|
||||||
- **namespace** The StatsD bucket to write metrics into.
|
- **namespace** The StatsD bucket to write metrics into.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@ -232,3 +241,12 @@ def install(application, **kwargs):
|
||||||
|
|
||||||
setattr(application, 'statsd', StatsDCollector(**kwargs))
|
setattr(application, 'statsd', StatsDCollector(**kwargs))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def get_client(application):
|
||||||
|
"""Fetch the statsd client if it is installed.
|
||||||
|
|
||||||
|
:rtype: .StatsDCollector
|
||||||
|
|
||||||
|
"""
|
||||||
|
return getattr(application, 'statsd', None)
|
||||||
|
|
59
tests.py
59
tests.py
|
@ -1,8 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import itertools
|
import itertools
|
||||||
import socket
|
import socket
|
||||||
import unittest
|
import unittest.mock
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
from tornado import iostream, testing, web
|
from tornado import iostream, testing, web
|
||||||
|
|
||||||
|
@ -48,7 +47,7 @@ class MisconfiguredStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||||
def test_bad_protocol_raises_ValueError(self):
|
def test_bad_protocol_raises_ValueError(self):
|
||||||
with self.assertRaises(ValueError):
|
with self.assertRaises(ValueError):
|
||||||
statsd.StatsDCollector(host='127.0.0.1',
|
statsd.StatsDCollector(host='127.0.0.1',
|
||||||
port=8125,
|
port='8125',
|
||||||
protocol='bad_protocol')
|
protocol='bad_protocol')
|
||||||
|
|
||||||
|
|
||||||
|
@ -75,13 +74,13 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||||
'protocol': 'tcp',
|
'protocol': 'tcp',
|
||||||
'prepend_metric_type': True})
|
'prepend_metric_type': True})
|
||||||
|
|
||||||
@mock.patch.object(iostream.IOStream, 'write')
|
@unittest.mock.patch.object(iostream.IOStream, 'write')
|
||||||
def test_write_not_executed_when_connection_is_closed(self, mock_write):
|
def test_write_not_executed_when_connection_is_closed(self, mock_write):
|
||||||
self.application.statsd._sock.close()
|
self.application.statsd._sock.close()
|
||||||
self.application.statsd.send('foo', 500, 'c')
|
self.application.statsd.send('foo', 500, 'c')
|
||||||
mock_write.assert_not_called()
|
mock_write.assert_not_called()
|
||||||
|
|
||||||
@mock.patch.object(iostream.IOStream, 'write')
|
@unittest.mock.patch.object(iostream.IOStream, 'write')
|
||||||
def test_expected_counters_data_written(self, mock_sock):
|
def test_expected_counters_data_written(self, mock_sock):
|
||||||
path = ('foo', 'bar')
|
path = ('foo', 'bar')
|
||||||
value = 500
|
value = 500
|
||||||
|
@ -94,7 +93,7 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||||
self.application.statsd.send(path, value, metric_type)
|
self.application.statsd.send(path, value, metric_type)
|
||||||
mock_sock.assert_called_once_with(expected.encode())
|
mock_sock.assert_called_once_with(expected.encode())
|
||||||
|
|
||||||
@mock.patch.object(iostream.IOStream, 'write')
|
@unittest.mock.patch.object(iostream.IOStream, 'write')
|
||||||
def test_expected_timers_data_written(self, mock_sock):
|
def test_expected_timers_data_written(self, mock_sock):
|
||||||
path = ('foo', 'bar')
|
path = ('foo', 'bar')
|
||||||
value = 500
|
value = 500
|
||||||
|
@ -156,6 +155,43 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||||
self.assertEqual(expected,
|
self.assertEqual(expected,
|
||||||
list(self.statsd.find_metrics(expected, 'ms'))[0][0])
|
list(self.statsd.find_metrics(expected, 'ms'))[0][0])
|
||||||
|
|
||||||
|
def test_reconnect_logic(self):
|
||||||
|
self.application.statsd._tcp_reconnect_sleep = 0.05
|
||||||
|
self.application.statsd._sock.close()
|
||||||
|
asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.075))
|
||||||
|
response = self.fetch('/status_code')
|
||||||
|
self.assertEqual(response.code, 200)
|
||||||
|
|
||||||
|
def test_that_mixin_works_without_client(self):
|
||||||
|
self.application.statsd.close()
|
||||||
|
delattr(self.application, 'statsd')
|
||||||
|
|
||||||
|
response = self.fetch('/', method='POST', body='')
|
||||||
|
self.assertEqual(response.code, 204)
|
||||||
|
|
||||||
|
def test_that_client_closes_socket(self):
|
||||||
|
response = self.fetch('/status_code')
|
||||||
|
self.assertEqual(response.code, 200)
|
||||||
|
|
||||||
|
self.application.statsd.close()
|
||||||
|
response = self.fetch('/status_code')
|
||||||
|
self.assertEqual(response.code, 200)
|
||||||
|
self.assertTrue(self.application.statsd._sock.closed())
|
||||||
|
|
||||||
|
def test_that_client_can_be_closed_multiple_times(self):
|
||||||
|
response = self.fetch('/status_code')
|
||||||
|
self.assertEqual(response.code, 200)
|
||||||
|
|
||||||
|
self.application.statsd.close()
|
||||||
|
response = self.fetch('/status_code')
|
||||||
|
self.assertEqual(response.code, 200)
|
||||||
|
self.assertTrue(self.application.statsd._sock.closed())
|
||||||
|
|
||||||
|
self.application.statsd.close()
|
||||||
|
response = self.fetch('/status_code')
|
||||||
|
self.assertEqual(response.code, 200)
|
||||||
|
self.assertTrue(self.application.statsd._sock.closed())
|
||||||
|
|
||||||
|
|
||||||
class TCPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
|
class TCPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
|
||||||
|
|
||||||
|
@ -223,7 +259,7 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||||
self.statsd.close()
|
self.statsd.close()
|
||||||
super().tearDown()
|
super().tearDown()
|
||||||
|
|
||||||
@mock.patch.object(socket.socket, 'sendto')
|
@unittest.mock.patch.object(socket.socket, 'sendto')
|
||||||
def test_expected_counters_data_written(self, mock_sock):
|
def test_expected_counters_data_written(self, mock_sock):
|
||||||
path = ('foo', 'bar')
|
path = ('foo', 'bar')
|
||||||
value = 500
|
value = 500
|
||||||
|
@ -238,7 +274,7 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||||
expected.encode(),
|
expected.encode(),
|
||||||
(self.statsd.sockaddr[0], self.statsd.sockaddr[1]))
|
(self.statsd.sockaddr[0], self.statsd.sockaddr[1]))
|
||||||
|
|
||||||
@mock.patch.object(socket.socket, 'sendto')
|
@unittest.mock.patch.object(socket.socket, 'sendto')
|
||||||
def test_expected_timers_data_written(self, mock_sock):
|
def test_expected_timers_data_written(self, mock_sock):
|
||||||
path = ('foo', 'bar')
|
path = ('foo', 'bar')
|
||||||
value = 500
|
value = 500
|
||||||
|
@ -302,6 +338,13 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||||
self.assertEqual(expected,
|
self.assertEqual(expected,
|
||||||
list(self.statsd.find_metrics(expected, 'ms'))[0][0])
|
list(self.statsd.find_metrics(expected, 'ms'))[0][0])
|
||||||
|
|
||||||
|
def test_that_mixin_works_without_client(self):
|
||||||
|
self.application.statsd.close()
|
||||||
|
delattr(self.application, 'statsd')
|
||||||
|
|
||||||
|
response = self.fetch('/', method='POST', body='')
|
||||||
|
self.assertEqual(response.code, 204)
|
||||||
|
|
||||||
|
|
||||||
class UDPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
|
class UDPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
|
||||||
|
|
||||||
|
|
12
tox.ini
12
tox.ini
|
@ -1,8 +1,18 @@
|
||||||
[tox]
|
[tox]
|
||||||
envlist = py37
|
envlist = py37,tornado5,tornado6
|
||||||
toxworkdir = build/tox
|
toxworkdir = build/tox
|
||||||
skip_missing_interpreters = True
|
skip_missing_interpreters = True
|
||||||
|
|
||||||
[testenv]
|
[testenv]
|
||||||
deps = -r requires/testing.txt
|
deps = -r requires/testing.txt
|
||||||
commands = nosetests
|
commands = nosetests
|
||||||
|
|
||||||
|
[testenv:tornado5]
|
||||||
|
deps =
|
||||||
|
tornado>=5,<6
|
||||||
|
-r requires/testing.txt
|
||||||
|
|
||||||
|
[testenv:tornado6]
|
||||||
|
deps =
|
||||||
|
tornado>=6,<7
|
||||||
|
-r requires/testing.txt
|
||||||
|
|
Loading…
Reference in a new issue