diff --git a/setup.cfg b/setup.cfg index ed40628..cbab8e8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,3 +4,11 @@ universal = 1 [nosetests] cover-package = sprockets.mixins.metrics cover-branches = 1 +cover-erase = 1 +cover-html = 1 +cover-html-dir = build/coverage +cover-xml = 1 +match = ((?:^|[\b_.-])(:?[Tt]est|When|should|[Dd]escribe)) +verbosity = 2 +with-coverage = 1 +with-xunit = 1 diff --git a/sprockets/mixins/metrics/statsd.py b/sprockets/mixins/metrics/statsd.py index 3889e2c..f0e3817 100644 --- a/sprockets/mixins/metrics/statsd.py +++ b/sprockets/mixins/metrics/statsd.py @@ -4,7 +4,7 @@ import os import socket import time -from tornado import iostream +from tornado import gen, iostream LOGGER = logging.getLogger(__name__) @@ -120,12 +120,15 @@ class StatsDCollector(object): self._address = (self._host, self._port) self._namespace = namespace self._prepend_metric_type = prepend_metric_type + self._tcp_reconnect_sleep = 5 if protocol == 'tcp': self._tcp = True + self._msg_format = '{path}:{value}|{metric_type}\n' self._sock = self._tcp_socket() elif protocol == 'udp': self._tcp = False + self._msg_format = '{path}:{value}|{metric_type}' self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) else: raise ValueError('Invalid protocol: {}'.format(protocol)) @@ -135,17 +138,17 @@ class StatsDCollector(object): :rtype: iostream.IOStream """ sock = iostream.IOStream(socket.socket( - socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)) - try: - sock.connect(self._address, self._tcp_on_connected) - except (OSError, socket.error) as error: - LOGGER.error('Failed to connect via TCP: %s', error) + socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)) + sock.connect(self._address, self._tcp_on_connected) sock.set_close_callback(self._tcp_on_closed) return sock + @gen.engine def _tcp_on_closed(self): """Invoked when the socket is closed.""" - LOGGER.warning('Disconnected from statsd, reconnecting') + LOGGER.warning('Not connected to statsd, connecting in %s seconds', + self._tcp_reconnect_sleep) + yield gen.sleep(self._tcp_reconnect_sleep) self._sock = self._tcp_socket() def _tcp_on_connected(self): @@ -160,17 +163,23 @@ class StatsDCollector(object): :param str metric_type: The metric type """ - msg = '{0}:{1}|{2}'.format( - self._build_path(path, metric_type), value, metric_type) + msg = self._msg_format.format( + path=self._build_path(path, metric_type), + value=value, + metric_type=metric_type) LOGGER.debug('Sending %s to %s:%s', msg.encode('ascii'), self._host, self._port) try: if self._tcp: + if self._sock.closed(): + return return self._sock.write(msg.encode('ascii')) self._sock.sendto(msg.encode('ascii'), (self._host, self._port)) + except iostream.StreamClosedError as error: # pragma: nocover + LOGGER.warning('Error sending TCP statsd metric: %s', error) except (OSError, socket.error) as error: # pragma: nocover LOGGER.exception('Error sending statsd metric: %s', error) diff --git a/sprockets/mixins/metrics/testing.py b/sprockets/mixins/metrics/testing.py index 838a3d4..f50acdb 100644 --- a/sprockets/mixins/metrics/testing.py +++ b/sprockets/mixins/metrics/testing.py @@ -30,7 +30,7 @@ class FakeStatsdServer(tcpserver.TCPServer): """ - PATTERN = br'(?P[^:]*):(?P[^|]*)\|(?P.*)$' + TCP_PATTERN = br'(?P[^:]*):(?P[^|]*)\|(?P.*)\n$' def __init__(self, iol, protocol='udp'): self.datagrams = [] @@ -71,7 +71,7 @@ class FakeStatsdServer(tcpserver.TCPServer): def handle_stream(self, stream, address): while True: try: - result = yield stream.read_until_regex(self.PATTERN) + result = yield stream.read_until_regex(self.TCP_PATTERN) except iostream.StreamClosedError: break else: diff --git a/tests.py b/tests.py index 4b0f2f2..2161d9e 100644 --- a/tests.py +++ b/tests.py @@ -1,4 +1,5 @@ import base64 +import itertools import logging import os import socket @@ -6,8 +7,9 @@ import time import unittest import uuid -from tornado import gen, testing, web +from tornado import gen, iostream, testing, web import mock +from mock import patch from sprockets.mixins.metrics import influxdb, statsd from sprockets.mixins.metrics.testing import ( @@ -42,6 +44,22 @@ def assert_between(low, value, high): value, low, high)) +class MisconfiguredStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): + + def get_app(self): + self.application = web.Application([ + web.url('/', examples.statsd.SimpleHandler), + web.url('/counters/(.*)/([.0-9]*)', CounterBumper), + web.url('/status_code', DefaultStatusCode), + ]) + + def test_bad_protocol_raises_ValueError(self): + with self.assertRaises(ValueError): + statsd.StatsDCollector(host='127.0.0.1', + port=8125, + protocol='bad_protocol') + + class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): def get_app(self): @@ -54,15 +72,65 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): def setUp(self): self.application = None + self.namespace = 'testing' + super(TCPStatsdMetricCollectionTests, self).setUp() self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp') - statsd.install(self.application, **{'namespace': 'testing', + statsd.install(self.application, **{'namespace': self.namespace, 'host': self.statsd.sockaddr[0], 'port': self.statsd.sockaddr[1], 'protocol': 'tcp', 'prepend_metric_type': True}) + def test_tcp_reconnect_on_stream_close(self): + path_sleep = 'tornado.gen.sleep' + path_statsd = self.application.statsd + with mock.patch(path_sleep) as gen_sleep, \ + patch.object(path_statsd, '_tcp_socket') as mock_tcp_socket: + f = web.Future() + f.set_result(None) + gen_sleep.return_value = f + + self.application.statsd._tcp_on_closed() + mock_tcp_socket.assert_called_once_with() + + @patch.object(iostream.IOStream, 'write') + def test_write_not_executed_when_connection_is_closed(self, mock_write): + self.application.statsd._sock.close() + self.application.statsd.send('foo', 500, 'c') + mock_write.assert_not_called() + + @patch.object(iostream.IOStream, 'write') + def test_expected_counters_data_written(self, mock_sock): + path = ('foo', 'bar') + value = 500 + metric_type = 'c' + expected = "{}:{}|{}\n".format('.'.join( + itertools.chain((self.namespace, 'counters'), path)), + value, + metric_type) + + self.application.statsd.send(path, value, metric_type) + mock_sock.assert_called_once_with(expected.encode()) + + @patch.object(iostream.IOStream, 'write') + def test_expected_timers_data_written(self, mock_sock): + path = ('foo', 'bar') + value = 500 + metric_type = 'ms' + expected = "{}:{}|{}\n".format('.'.join( + itertools.chain((self.namespace, 'timers'), path)), + value, + metric_type) + + self.application.statsd.send(path, value, metric_type) + mock_sock.assert_called_once_with(expected.encode()) + + def test_tcp_message_format(self): + expected = '{path}:{value}|{metric_type}\n' + self.assertEqual(self.application.statsd._msg_format, expected) + def test_that_http_method_call_is_recorded(self): response = self.fetch('/') self.assertEqual(response.code, 204) @@ -120,10 +188,12 @@ class TCPStatsdConfigurationTests(testing.AsyncHTTPTestCase): def setUp(self): self.application = None + self.namespace = 'testing' + super(TCPStatsdConfigurationTests, self).setUp() self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp') - statsd.install(self.application, **{'namespace': 'testing', + statsd.install(self.application, **{'namespace': self.namespace, 'host': self.statsd.sockaddr[0], 'port': self.statsd.sockaddr[1], 'protocol': 'tcp', @@ -158,10 +228,12 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): def setUp(self): self.application = None + self.namespace = 'testing' + super(UDPStatsdMetricCollectionTests, self).setUp() self.statsd = FakeStatsdServer(self.io_loop, protocol='udp') - statsd.install(self.application, **{'namespace': 'testing', + statsd.install(self.application, **{'namespace': self.namespace, 'host': self.statsd.sockaddr[0], 'port': self.statsd.sockaddr[1], 'protocol': 'udp', @@ -171,6 +243,40 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): self.statsd.close() super(UDPStatsdMetricCollectionTests, self).tearDown() + @patch.object(socket.socket, 'sendto') + def test_expected_counters_data_written(self, mock_sock): + path = ('foo', 'bar') + value = 500 + metric_type = 'c' + expected = "{}:{}|{}".format('.'.join( + itertools.chain((self.namespace, 'counters'), path)), + value, + metric_type) + + self.application.statsd.send(path, value, metric_type) + mock_sock.assert_called_once_with( + expected.encode(), + (self.statsd.sockaddr[0], self.statsd.sockaddr[1])) + + @patch.object(socket.socket, 'sendto') + def test_expected_timers_data_written(self, mock_sock): + path = ('foo', 'bar') + value = 500 + metric_type = 'ms' + expected = "{}:{}|{}".format('.'.join( + itertools.chain((self.namespace, 'timers'), path)), + value, + metric_type) + + self.application.statsd.send(path, value, metric_type) + mock_sock.assert_called_once_with( + expected.encode(), + (self.statsd.sockaddr[0], self.statsd.sockaddr[1])) + + def test_udp_message_format(self): + expected = '{path}:{value}|{metric_type}' + self.assertEqual(self.application.statsd._msg_format, expected) + def test_that_http_method_call_is_recorded(self): response = self.fetch('/') self.assertEqual(response.code, 204) @@ -228,10 +334,12 @@ class UDPStatsdConfigurationTests(testing.AsyncHTTPTestCase): def setUp(self): self.application = None + self.namespace = 'testing' + super(UDPStatsdConfigurationTests, self).setUp() self.statsd = FakeStatsdServer(self.io_loop, protocol='udp') - statsd.install(self.application, **{'namespace': 'testing', + statsd.install(self.application, **{'namespace': self.namespace, 'host': self.statsd.sockaddr[0], 'port': self.statsd.sockaddr[1], 'protocol': 'udp',