- add non-blocking sleep to reconnect

- catch iostream.StreamClosedError
- short circuit write on a closed tcp stream
- flake8 formatting
This commit is contained in:
Dan g 2018-08-02 10:22:30 -04:00
parent 6414edf5ab
commit 6176ab654d
2 changed files with 28 additions and 19 deletions

View file

@ -4,7 +4,7 @@ import os
import socket
import time
from tornado import iostream
from tornado import gen, iostream
LOGGER = logging.getLogger(__name__)
@ -137,17 +137,18 @@ class StatsDCollector(object):
:rtype: iostream.IOStream
"""
sock = iostream.IOStream(socket.socket(
socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP))
try:
sock.connect(self._address, self._tcp_on_connected)
except (OSError, socket.error) as error:
LOGGER.error('Failed to connect via TCP: %s', error)
socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP))
sock.connect(self._address, self._tcp_on_connected)
sock.set_close_callback(self._tcp_on_closed)
return sock
@gen.engine
def _tcp_on_closed(self):
"""Invoked when the socket is closed."""
LOGGER.warning('Disconnected from statsd, reconnecting')
sleep = 5
LOGGER.warning('Not connected to statsd, connecting in %s seconds',
sleep)
yield gen.sleep(sleep)
self._sock = self._tcp_socket()
def _tcp_on_connected(self):
@ -172,9 +173,13 @@ class StatsDCollector(object):
try:
if self._tcp:
if self._sock.closed():
return
return self._sock.write(msg.encode('ascii'))
self._sock.sendto(msg.encode('ascii'), (self._host, self._port))
except iostream.StreamClosedError as error: # pragma: nocover
LOGGER.warning('Error sending TCP statsd metric: %s', error)
except (OSError, socket.error) as error: # pragma: nocover
LOGGER.exception('Error sending statsd metric: %s', error)

View file

@ -72,9 +72,10 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
path = ('foo', 'bar')
value = 500
metric_type = 'c'
expected = "{}:{}|{}\n".format('.'.join(itertools.chain((self.namespace, 'counters'), path)),
value,
metric_type)
expected = "{}:{}|{}\n".format('.'.join(
itertools.chain((self.namespace, 'counters'), path)),
value,
metric_type)
self.application.statsd.send(path, value, metric_type)
mock_sock.assert_called_once_with(expected.encode())
@ -84,9 +85,10 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
path = ('foo', 'bar')
value = 500
metric_type = 'ms'
expected = "{}:{}|{}\n".format('.'.join(itertools.chain((self.namespace, 'timers'), path)),
value,
metric_type)
expected = "{}:{}|{}\n".format('.'.join(
itertools.chain((self.namespace, 'timers'), path)),
value,
metric_type)
self.application.statsd.send(path, value, metric_type)
mock_sock.assert_called_once_with(expected.encode())
@ -212,9 +214,10 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
path = ('foo', 'bar')
value = 500
metric_type = 'c'
expected = "{}:{}|{}".format('.'.join(itertools.chain((self.namespace, 'counters'), path)),
value,
metric_type)
expected = "{}:{}|{}".format('.'.join(
itertools.chain((self.namespace, 'counters'), path)),
value,
metric_type)
self.application.statsd.send(path, value, metric_type)
mock_sock.assert_called_once_with(
@ -226,9 +229,10 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
path = ('foo', 'bar')
value = 500
metric_type = 'ms'
expected = "{}:{}|{}".format('.'.join(itertools.chain((self.namespace, 'timers'), path)),
value,
metric_type)
expected = "{}:{}|{}".format('.'.join(
itertools.chain((self.namespace, 'timers'), path)),
value,
metric_type)
self.application.statsd.send(path, value, metric_type)
mock_sock.assert_called_once_with(