From 6fd2cf40352517839b854b627f4b04c3a6551d19 Mon Sep 17 00:00:00 2001 From: Dan g Date: Fri, 27 Jul 2018 10:14:32 -0400 Subject: [PATCH 1/8] MOBILE-9220 added newline to tcp messages, added tests --- sprockets/mixins/metrics/statsd.py | 7 +++++++ tests.py | 13 +++++++++++++ 2 files changed, 20 insertions(+) diff --git a/sprockets/mixins/metrics/statsd.py b/sprockets/mixins/metrics/statsd.py index 3889e2c..41d3607 100644 --- a/sprockets/mixins/metrics/statsd.py +++ b/sprockets/mixins/metrics/statsd.py @@ -162,6 +162,7 @@ class StatsDCollector(object): """ msg = '{0}:{1}|{2}'.format( self._build_path(path, metric_type), value, metric_type) + msg = self._build_udp_or_tcp_message(msg) LOGGER.debug('Sending %s to %s:%s', msg.encode('ascii'), self._host, self._port) @@ -174,6 +175,12 @@ class StatsDCollector(object): except (OSError, socket.error) as error: # pragma: nocover LOGGER.exception('Error sending statsd metric: %s', error) + def _build_udp_or_tcp_message(self, msg): + if self._tcp is False: + return msg + + return msg + "\n" + def _build_path(self, path, metric_type): """Return a normalized path. diff --git a/tests.py b/tests.py index 4b0f2f2..cb2b024 100644 --- a/tests.py +++ b/tests.py @@ -63,6 +63,13 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): 'protocol': 'tcp', 'prepend_metric_type': True}) + def test_that_tcp_message_appends_a_newline(self): + orig = 'testing.timers.SimpleHandler.GET.204' + expected = 'testing.timers.SimpleHandler.GET.204\n' + + msg = self.application.statsd._build_udp_or_tcp_message(orig) + self.assertEqual(msg, expected) + def test_that_http_method_call_is_recorded(self): response = self.fetch('/') self.assertEqual(response.code, 204) @@ -171,6 +178,12 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): self.statsd.close() super(UDPStatsdMetricCollectionTests, self).tearDown() + def test_that_udp_message_is_unchanged(self): + expected = 'testing.timers.SimpleHandler.GET.204' + + msg = self.application.statsd._build_udp_or_tcp_message(expected) + self.assertEqual(msg, expected) + def test_that_http_method_call_is_recorded(self): response = self.fetch('/') self.assertEqual(response.code, 204) From 5599e9f73bf701c873c630cbceb981b7c915d69d Mon Sep 17 00:00:00 2001 From: Dan g Date: Fri, 27 Jul 2018 13:51:02 -0400 Subject: [PATCH 2/8] Set format strings ass attributes, update tests --- sprockets/mixins/metrics/statsd.py | 15 ++++++--------- tests.py | 17 ++++++----------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/sprockets/mixins/metrics/statsd.py b/sprockets/mixins/metrics/statsd.py index 41d3607..ff187b8 100644 --- a/sprockets/mixins/metrics/statsd.py +++ b/sprockets/mixins/metrics/statsd.py @@ -123,9 +123,11 @@ class StatsDCollector(object): if protocol == 'tcp': self._tcp = True + self._msg_format = '{path}:{value}|{metric_type}\n' self._sock = self._tcp_socket() elif protocol == 'udp': self._tcp = False + self._msg_format = '{path}:{value}|{metric_type}' self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) else: raise ValueError('Invalid protocol: {}'.format(protocol)) @@ -160,9 +162,10 @@ class StatsDCollector(object): :param str metric_type: The metric type """ - msg = '{0}:{1}|{2}'.format( - self._build_path(path, metric_type), value, metric_type) - msg = self._build_udp_or_tcp_message(msg) + msg = self._msg_format.format( + path=self._build_path(path, metric_type), + value=value, + metric_type=metric_type) LOGGER.debug('Sending %s to %s:%s', msg.encode('ascii'), self._host, self._port) @@ -175,12 +178,6 @@ class StatsDCollector(object): except (OSError, socket.error) as error: # pragma: nocover LOGGER.exception('Error sending statsd metric: %s', error) - def _build_udp_or_tcp_message(self, msg): - if self._tcp is False: - return msg - - return msg + "\n" - def _build_path(self, path, metric_type): """Return a normalized path. diff --git a/tests.py b/tests.py index cb2b024..9e5519a 100644 --- a/tests.py +++ b/tests.py @@ -63,12 +63,9 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): 'protocol': 'tcp', 'prepend_metric_type': True}) - def test_that_tcp_message_appends_a_newline(self): - orig = 'testing.timers.SimpleHandler.GET.204' - expected = 'testing.timers.SimpleHandler.GET.204\n' - - msg = self.application.statsd._build_udp_or_tcp_message(orig) - self.assertEqual(msg, expected) + def test_tcp_message_format(self): + expected = '{path}:{value}|{metric_type}\n' + self.assertEqual(self.application.statsd._msg_format, expected) def test_that_http_method_call_is_recorded(self): response = self.fetch('/') @@ -178,11 +175,9 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): self.statsd.close() super(UDPStatsdMetricCollectionTests, self).tearDown() - def test_that_udp_message_is_unchanged(self): - expected = 'testing.timers.SimpleHandler.GET.204' - - msg = self.application.statsd._build_udp_or_tcp_message(expected) - self.assertEqual(msg, expected) + def test_udp_message_format(self): + expected = '{path}:{value}|{metric_type}' + self.assertEqual(self.application.statsd._msg_format, expected) def test_that_http_method_call_is_recorded(self): response = self.fetch('/') From b24a3699a5be38a478d66ea9dde95ca1785ca2d2 Mon Sep 17 00:00:00 2001 From: Dan g Date: Fri, 27 Jul 2018 16:28:02 -0400 Subject: [PATCH 3/8] Fakeserver check for \n in tcp stream, const name changed to TCP_PATTERN --- sprockets/mixins/metrics/testing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sprockets/mixins/metrics/testing.py b/sprockets/mixins/metrics/testing.py index 838a3d4..f50acdb 100644 --- a/sprockets/mixins/metrics/testing.py +++ b/sprockets/mixins/metrics/testing.py @@ -30,7 +30,7 @@ class FakeStatsdServer(tcpserver.TCPServer): """ - PATTERN = br'(?P[^:]*):(?P[^|]*)\|(?P.*)$' + TCP_PATTERN = br'(?P[^:]*):(?P[^|]*)\|(?P.*)\n$' def __init__(self, iol, protocol='udp'): self.datagrams = [] @@ -71,7 +71,7 @@ class FakeStatsdServer(tcpserver.TCPServer): def handle_stream(self, stream, address): while True: try: - result = yield stream.read_until_regex(self.PATTERN) + result = yield stream.read_until_regex(self.TCP_PATTERN) except iostream.StreamClosedError: break else: From 924735c24ed9fde6b7b92c69c616575083c5b45f Mon Sep 17 00:00:00 2001 From: Dan g Date: Tue, 31 Jul 2018 14:27:16 -0400 Subject: [PATCH 4/8] Add tcp & udp tests for data sent --- tests.py | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 70 insertions(+), 5 deletions(-) diff --git a/tests.py b/tests.py index 9e5519a..b856bcd 100644 --- a/tests.py +++ b/tests.py @@ -4,9 +4,10 @@ import os import socket import time import unittest +from unittest.mock import patch import uuid -from tornado import gen, testing, web +from tornado import gen, iostream, testing, web import mock from sprockets.mixins.metrics import influxdb, statsd @@ -54,15 +55,43 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): def setUp(self): self.application = None + self.namespace = 'testing' + super(TCPStatsdMetricCollectionTests, self).setUp() self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp') - statsd.install(self.application, **{'namespace': 'testing', + statsd.install(self.application, **{'namespace': self.namespace, 'host': self.statsd.sockaddr[0], 'port': self.statsd.sockaddr[1], 'protocol': 'tcp', 'prepend_metric_type': True}) + @patch.object(iostream.IOStream, 'write') + def test_expected_counters_data_written(self, mock_sock): + path = ('foo', 'bar') + value = 500 + metric_type = 'c' + expected = "{}:{}|{}\n".format( + '.'.join((self.namespace, 'counters', *path)), + value, + metric_type) + + self.application.statsd.send(path, value, metric_type) + mock_sock.assert_called_once_with(expected.encode()) + + @patch.object(iostream.IOStream, 'write') + def test_expected_timers_data_written(self, mock_sock): + path = ('foo', 'bar') + value = 500 + metric_type = 'ms' + expected = "{}:{}|{}\n".format( + '.'.join((self.namespace, 'timers', *path)), + value, + metric_type) + + self.application.statsd.send(path, value, metric_type) + mock_sock.assert_called_once_with(expected.encode()) + def test_tcp_message_format(self): expected = '{path}:{value}|{metric_type}\n' self.assertEqual(self.application.statsd._msg_format, expected) @@ -124,10 +153,12 @@ class TCPStatsdConfigurationTests(testing.AsyncHTTPTestCase): def setUp(self): self.application = None + self.namespace = 'testing' + super(TCPStatsdConfigurationTests, self).setUp() self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp') - statsd.install(self.application, **{'namespace': 'testing', + statsd.install(self.application, **{'namespace': self.namespace, 'host': self.statsd.sockaddr[0], 'port': self.statsd.sockaddr[1], 'protocol': 'tcp', @@ -162,10 +193,12 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): def setUp(self): self.application = None + self.namespace = 'testing' + super(UDPStatsdMetricCollectionTests, self).setUp() self.statsd = FakeStatsdServer(self.io_loop, protocol='udp') - statsd.install(self.application, **{'namespace': 'testing', + statsd.install(self.application, **{'namespace': self.namespace, 'host': self.statsd.sockaddr[0], 'port': self.statsd.sockaddr[1], 'protocol': 'udp', @@ -175,6 +208,36 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): self.statsd.close() super(UDPStatsdMetricCollectionTests, self).tearDown() + @patch.object(socket.socket, 'sendto') + def test_expected_counters_data_written(self, mock_sock): + path = ('foo', 'bar') + value = 500 + metric_type = 'c' + expected = "{}:{}|{}".format( + '.'.join((self.namespace, 'counters', *path)), + value, + metric_type) + + self.application.statsd.send(path, value, metric_type) + mock_sock.assert_called_once_with( + expected.encode(), + (self.statsd.sockaddr[0], self.statsd.sockaddr[1])) + + @patch.object(socket.socket, 'sendto') + def test_expected_timers_data_written(self, mock_sock): + path = ('foo', 'bar') + value = 500 + metric_type = 'ms' + expected = "{}:{}|{}".format( + '.'.join((self.namespace, 'timers', *path)), + value, + metric_type) + + self.application.statsd.send(path, value, metric_type) + mock_sock.assert_called_once_with( + expected.encode(), + (self.statsd.sockaddr[0], self.statsd.sockaddr[1])) + def test_udp_message_format(self): expected = '{path}:{value}|{metric_type}' self.assertEqual(self.application.statsd._msg_format, expected) @@ -236,10 +299,12 @@ class UDPStatsdConfigurationTests(testing.AsyncHTTPTestCase): def setUp(self): self.application = None + self.namespace = 'testing' + super(UDPStatsdConfigurationTests, self).setUp() self.statsd = FakeStatsdServer(self.io_loop, protocol='udp') - statsd.install(self.application, **{'namespace': 'testing', + statsd.install(self.application, **{'namespace': self.namespace, 'host': self.statsd.sockaddr[0], 'port': self.statsd.sockaddr[1], 'protocol': 'udp', From 03ba3b7125fb71f27556ed6473fde1c607fd8935 Mon Sep 17 00:00:00 2001 From: Dan g Date: Tue, 31 Jul 2018 14:52:46 -0400 Subject: [PATCH 5/8] replace list dereferencing with itertools.chain for py2 --- tests.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/tests.py b/tests.py index b856bcd..fbe5b7b 100644 --- a/tests.py +++ b/tests.py @@ -1,4 +1,5 @@ import base64 +import itertools import logging import os import socket @@ -71,10 +72,9 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'c' - expected = "{}:{}|{}\n".format( - '.'.join((self.namespace, 'counters', *path)), - value, - metric_type) + expected = "{}:{}|{}\n".format('.'.join(itertools.chain((self.namespace, 'counters'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with(expected.encode()) @@ -84,10 +84,9 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'ms' - expected = "{}:{}|{}\n".format( - '.'.join((self.namespace, 'timers', *path)), - value, - metric_type) + expected = "{}:{}|{}\n".format('.'.join(itertools.chain((self.namespace, 'timers'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with(expected.encode()) @@ -213,10 +212,9 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'c' - expected = "{}:{}|{}".format( - '.'.join((self.namespace, 'counters', *path)), - value, - metric_type) + expected = "{}:{}|{}".format('.'.join(itertools.chain((self.namespace, 'counters'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with( @@ -228,10 +226,9 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'ms' - expected = "{}:{}|{}".format( - '.'.join((self.namespace, 'timers', *path)), - value, - metric_type) + expected = "{}:{}|{}".format('.'.join(itertools.chain((self.namespace, 'timers'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with( From 6414edf5ab3db67cc1e210f5e266a846fb7805a7 Mon Sep 17 00:00:00 2001 From: Dan g Date: Tue, 31 Jul 2018 15:16:33 -0400 Subject: [PATCH 6/8] import correct patch for py2 compatibility --- tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests.py b/tests.py index fbe5b7b..9265361 100644 --- a/tests.py +++ b/tests.py @@ -5,11 +5,11 @@ import os import socket import time import unittest -from unittest.mock import patch import uuid from tornado import gen, iostream, testing, web import mock +from mock import patch from sprockets.mixins.metrics import influxdb, statsd from sprockets.mixins.metrics.testing import ( From 6176ab654dccf03be6665b2d9db8449af9b1c4c2 Mon Sep 17 00:00:00 2001 From: Dan g Date: Thu, 2 Aug 2018 10:22:30 -0400 Subject: [PATCH 7/8] - add non-blocking sleep to reconnect - catch iostream.StreamClosedError - short circuit write on a closed tcp stream - flake8 formatting --- sprockets/mixins/metrics/statsd.py | 19 ++++++++++++------- tests.py | 28 ++++++++++++++++------------ 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/sprockets/mixins/metrics/statsd.py b/sprockets/mixins/metrics/statsd.py index ff187b8..2d3d972 100644 --- a/sprockets/mixins/metrics/statsd.py +++ b/sprockets/mixins/metrics/statsd.py @@ -4,7 +4,7 @@ import os import socket import time -from tornado import iostream +from tornado import gen, iostream LOGGER = logging.getLogger(__name__) @@ -137,17 +137,18 @@ class StatsDCollector(object): :rtype: iostream.IOStream """ sock = iostream.IOStream(socket.socket( - socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)) - try: - sock.connect(self._address, self._tcp_on_connected) - except (OSError, socket.error) as error: - LOGGER.error('Failed to connect via TCP: %s', error) + socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)) + sock.connect(self._address, self._tcp_on_connected) sock.set_close_callback(self._tcp_on_closed) return sock + @gen.engine def _tcp_on_closed(self): """Invoked when the socket is closed.""" - LOGGER.warning('Disconnected from statsd, reconnecting') + sleep = 5 + LOGGER.warning('Not connected to statsd, connecting in %s seconds', + sleep) + yield gen.sleep(sleep) self._sock = self._tcp_socket() def _tcp_on_connected(self): @@ -172,9 +173,13 @@ class StatsDCollector(object): try: if self._tcp: + if self._sock.closed(): + return return self._sock.write(msg.encode('ascii')) self._sock.sendto(msg.encode('ascii'), (self._host, self._port)) + except iostream.StreamClosedError as error: # pragma: nocover + LOGGER.warning('Error sending TCP statsd metric: %s', error) except (OSError, socket.error) as error: # pragma: nocover LOGGER.exception('Error sending statsd metric: %s', error) diff --git a/tests.py b/tests.py index 9265361..17c6d4e 100644 --- a/tests.py +++ b/tests.py @@ -72,9 +72,10 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'c' - expected = "{}:{}|{}\n".format('.'.join(itertools.chain((self.namespace, 'counters'), path)), - value, - metric_type) + expected = "{}:{}|{}\n".format('.'.join( + itertools.chain((self.namespace, 'counters'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with(expected.encode()) @@ -84,9 +85,10 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'ms' - expected = "{}:{}|{}\n".format('.'.join(itertools.chain((self.namespace, 'timers'), path)), - value, - metric_type) + expected = "{}:{}|{}\n".format('.'.join( + itertools.chain((self.namespace, 'timers'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with(expected.encode()) @@ -212,9 +214,10 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'c' - expected = "{}:{}|{}".format('.'.join(itertools.chain((self.namespace, 'counters'), path)), - value, - metric_type) + expected = "{}:{}|{}".format('.'.join( + itertools.chain((self.namespace, 'counters'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with( @@ -226,9 +229,10 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'ms' - expected = "{}:{}|{}".format('.'.join(itertools.chain((self.namespace, 'timers'), path)), - value, - metric_type) + expected = "{}:{}|{}".format('.'.join( + itertools.chain((self.namespace, 'timers'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with( From dc57f5c5d15269b56dc34d3d0e9cb40077d87c45 Mon Sep 17 00:00:00 2001 From: Dan g Date: Fri, 3 Aug 2018 10:56:14 -0400 Subject: [PATCH 8/8] Added statsd tests for 100% coverage, upgraded coverage pkg and conf --- requires/development.txt | 2 +- setup.cfg | 8 +++++++ sprockets/mixins/metrics/statsd.py | 6 +++--- tests.py | 34 ++++++++++++++++++++++++++++++ 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/requires/development.txt b/requires/development.txt index e8a3b99..2df164b 100644 --- a/requires/development.txt +++ b/requires/development.txt @@ -1,3 +1,3 @@ -r testing.txt -coverage>=3.7,<4.1 +coverage==4.5.1 Sphinx diff --git a/setup.cfg b/setup.cfg index d7cd6f9..f768660 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,3 +3,11 @@ universal = 1 [nosetests] cover-package = sprockets.mixins.metrics cover-branches = 1 +cover-erase = 1 +cover-html = 1 +cover-html-dir = build/coverage +cover-xml = 1 +match = ((?:^|[\b_.-])(:?[Tt]est|When|should|[Dd]escribe)) +verbosity = 2 +with-coverage = 1 +with-xunit = 1 diff --git a/sprockets/mixins/metrics/statsd.py b/sprockets/mixins/metrics/statsd.py index 2d3d972..f0e3817 100644 --- a/sprockets/mixins/metrics/statsd.py +++ b/sprockets/mixins/metrics/statsd.py @@ -120,6 +120,7 @@ class StatsDCollector(object): self._address = (self._host, self._port) self._namespace = namespace self._prepend_metric_type = prepend_metric_type + self._tcp_reconnect_sleep = 5 if protocol == 'tcp': self._tcp = True @@ -145,10 +146,9 @@ class StatsDCollector(object): @gen.engine def _tcp_on_closed(self): """Invoked when the socket is closed.""" - sleep = 5 LOGGER.warning('Not connected to statsd, connecting in %s seconds', - sleep) - yield gen.sleep(sleep) + self._tcp_reconnect_sleep) + yield gen.sleep(self._tcp_reconnect_sleep) self._sock = self._tcp_socket() def _tcp_on_connected(self): diff --git a/tests.py b/tests.py index 17c6d4e..2161d9e 100644 --- a/tests.py +++ b/tests.py @@ -44,6 +44,22 @@ def assert_between(low, value, high): value, low, high)) +class MisconfiguredStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): + + def get_app(self): + self.application = web.Application([ + web.url('/', examples.statsd.SimpleHandler), + web.url('/counters/(.*)/([.0-9]*)', CounterBumper), + web.url('/status_code', DefaultStatusCode), + ]) + + def test_bad_protocol_raises_ValueError(self): + with self.assertRaises(ValueError): + statsd.StatsDCollector(host='127.0.0.1', + port=8125, + protocol='bad_protocol') + + class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): def get_app(self): @@ -67,6 +83,24 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): 'protocol': 'tcp', 'prepend_metric_type': True}) + def test_tcp_reconnect_on_stream_close(self): + path_sleep = 'tornado.gen.sleep' + path_statsd = self.application.statsd + with mock.patch(path_sleep) as gen_sleep, \ + patch.object(path_statsd, '_tcp_socket') as mock_tcp_socket: + f = web.Future() + f.set_result(None) + gen_sleep.return_value = f + + self.application.statsd._tcp_on_closed() + mock_tcp_socket.assert_called_once_with() + + @patch.object(iostream.IOStream, 'write') + def test_write_not_executed_when_connection_is_closed(self, mock_write): + self.application.statsd._sock.close() + self.application.statsd.send('foo', 500, 'c') + mock_write.assert_not_called() + @patch.object(iostream.IOStream, 'write') def test_expected_counters_data_written(self, mock_sock): path = ('foo', 'bar')