From 6176ab654dccf03be6665b2d9db8449af9b1c4c2 Mon Sep 17 00:00:00 2001 From: Dan g Date: Thu, 2 Aug 2018 10:22:30 -0400 Subject: [PATCH] - add non-blocking sleep to reconnect - catch iostream.StreamClosedError - short circuit write on a closed tcp stream - flake8 formatting --- sprockets/mixins/metrics/statsd.py | 19 ++++++++++++------- tests.py | 28 ++++++++++++++++------------ 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/sprockets/mixins/metrics/statsd.py b/sprockets/mixins/metrics/statsd.py index ff187b8..2d3d972 100644 --- a/sprockets/mixins/metrics/statsd.py +++ b/sprockets/mixins/metrics/statsd.py @@ -4,7 +4,7 @@ import os import socket import time -from tornado import iostream +from tornado import gen, iostream LOGGER = logging.getLogger(__name__) @@ -137,17 +137,18 @@ class StatsDCollector(object): :rtype: iostream.IOStream """ sock = iostream.IOStream(socket.socket( - socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)) - try: - sock.connect(self._address, self._tcp_on_connected) - except (OSError, socket.error) as error: - LOGGER.error('Failed to connect via TCP: %s', error) + socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)) + sock.connect(self._address, self._tcp_on_connected) sock.set_close_callback(self._tcp_on_closed) return sock + @gen.engine def _tcp_on_closed(self): """Invoked when the socket is closed.""" - LOGGER.warning('Disconnected from statsd, reconnecting') + sleep = 5 + LOGGER.warning('Not connected to statsd, connecting in %s seconds', + sleep) + yield gen.sleep(sleep) self._sock = self._tcp_socket() def _tcp_on_connected(self): @@ -172,9 +173,13 @@ class StatsDCollector(object): try: if self._tcp: + if self._sock.closed(): + return return self._sock.write(msg.encode('ascii')) self._sock.sendto(msg.encode('ascii'), (self._host, self._port)) + except iostream.StreamClosedError as error: # pragma: nocover + LOGGER.warning('Error sending TCP statsd metric: %s', error) except (OSError, socket.error) as error: # pragma: nocover LOGGER.exception('Error sending statsd metric: %s', error) diff --git a/tests.py b/tests.py index 9265361..17c6d4e 100644 --- a/tests.py +++ b/tests.py @@ -72,9 +72,10 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'c' - expected = "{}:{}|{}\n".format('.'.join(itertools.chain((self.namespace, 'counters'), path)), - value, - metric_type) + expected = "{}:{}|{}\n".format('.'.join( + itertools.chain((self.namespace, 'counters'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with(expected.encode()) @@ -84,9 +85,10 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'ms' - expected = "{}:{}|{}\n".format('.'.join(itertools.chain((self.namespace, 'timers'), path)), - value, - metric_type) + expected = "{}:{}|{}\n".format('.'.join( + itertools.chain((self.namespace, 'timers'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with(expected.encode()) @@ -212,9 +214,10 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'c' - expected = "{}:{}|{}".format('.'.join(itertools.chain((self.namespace, 'counters'), path)), - value, - metric_type) + expected = "{}:{}|{}".format('.'.join( + itertools.chain((self.namespace, 'counters'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with( @@ -226,9 +229,10 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): path = ('foo', 'bar') value = 500 metric_type = 'ms' - expected = "{}:{}|{}".format('.'.join(itertools.chain((self.namespace, 'timers'), path)), - value, - metric_type) + expected = "{}:{}|{}".format('.'.join( + itertools.chain((self.namespace, 'timers'), path)), + value, + metric_type) self.application.statsd.send(path, value, metric_type) mock_sock.assert_called_once_with(