Merge pull request #27 from ezhidblr/MOBILE-9146-add-statsd-tcp

MOBILE-9220 added newline to tcp messages, added tests
This commit is contained in:
Brian Korty 2018-08-03 15:52:22 -04:00 committed by GitHub
commit 3491f853db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 141 additions and 16 deletions

View file

@ -4,3 +4,11 @@ universal = 1
[nosetests] [nosetests]
cover-package = sprockets.mixins.metrics cover-package = sprockets.mixins.metrics
cover-branches = 1 cover-branches = 1
cover-erase = 1
cover-html = 1
cover-html-dir = build/coverage
cover-xml = 1
match = ((?:^|[\b_.-])(:?[Tt]est|When|should|[Dd]escribe))
verbosity = 2
with-coverage = 1
with-xunit = 1

View file

@ -4,7 +4,7 @@ import os
import socket import socket
import time import time
from tornado import iostream from tornado import gen, iostream
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
@ -120,12 +120,15 @@ class StatsDCollector(object):
self._address = (self._host, self._port) self._address = (self._host, self._port)
self._namespace = namespace self._namespace = namespace
self._prepend_metric_type = prepend_metric_type self._prepend_metric_type = prepend_metric_type
self._tcp_reconnect_sleep = 5
if protocol == 'tcp': if protocol == 'tcp':
self._tcp = True self._tcp = True
self._msg_format = '{path}:{value}|{metric_type}\n'
self._sock = self._tcp_socket() self._sock = self._tcp_socket()
elif protocol == 'udp': elif protocol == 'udp':
self._tcp = False self._tcp = False
self._msg_format = '{path}:{value}|{metric_type}'
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
else: else:
raise ValueError('Invalid protocol: {}'.format(protocol)) raise ValueError('Invalid protocol: {}'.format(protocol))
@ -135,17 +138,17 @@ class StatsDCollector(object):
:rtype: iostream.IOStream :rtype: iostream.IOStream
""" """
sock = iostream.IOStream(socket.socket( sock = iostream.IOStream(socket.socket(
socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)) socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP))
try: sock.connect(self._address, self._tcp_on_connected)
sock.connect(self._address, self._tcp_on_connected)
except (OSError, socket.error) as error:
LOGGER.error('Failed to connect via TCP: %s', error)
sock.set_close_callback(self._tcp_on_closed) sock.set_close_callback(self._tcp_on_closed)
return sock return sock
@gen.engine
def _tcp_on_closed(self): def _tcp_on_closed(self):
"""Invoked when the socket is closed.""" """Invoked when the socket is closed."""
LOGGER.warning('Disconnected from statsd, reconnecting') LOGGER.warning('Not connected to statsd, connecting in %s seconds',
self._tcp_reconnect_sleep)
yield gen.sleep(self._tcp_reconnect_sleep)
self._sock = self._tcp_socket() self._sock = self._tcp_socket()
def _tcp_on_connected(self): def _tcp_on_connected(self):
@ -160,17 +163,23 @@ class StatsDCollector(object):
:param str metric_type: The metric type :param str metric_type: The metric type
""" """
msg = '{0}:{1}|{2}'.format( msg = self._msg_format.format(
self._build_path(path, metric_type), value, metric_type) path=self._build_path(path, metric_type),
value=value,
metric_type=metric_type)
LOGGER.debug('Sending %s to %s:%s', msg.encode('ascii'), LOGGER.debug('Sending %s to %s:%s', msg.encode('ascii'),
self._host, self._port) self._host, self._port)
try: try:
if self._tcp: if self._tcp:
if self._sock.closed():
return
return self._sock.write(msg.encode('ascii')) return self._sock.write(msg.encode('ascii'))
self._sock.sendto(msg.encode('ascii'), (self._host, self._port)) self._sock.sendto(msg.encode('ascii'), (self._host, self._port))
except iostream.StreamClosedError as error: # pragma: nocover
LOGGER.warning('Error sending TCP statsd metric: %s', error)
except (OSError, socket.error) as error: # pragma: nocover except (OSError, socket.error) as error: # pragma: nocover
LOGGER.exception('Error sending statsd metric: %s', error) LOGGER.exception('Error sending statsd metric: %s', error)

View file

@ -30,7 +30,7 @@ class FakeStatsdServer(tcpserver.TCPServer):
""" """
PATTERN = br'(?P<path>[^:]*):(?P<value>[^|]*)\|(?P<type>.*)$' TCP_PATTERN = br'(?P<path>[^:]*):(?P<value>[^|]*)\|(?P<type>.*)\n$'
def __init__(self, iol, protocol='udp'): def __init__(self, iol, protocol='udp'):
self.datagrams = [] self.datagrams = []
@ -71,7 +71,7 @@ class FakeStatsdServer(tcpserver.TCPServer):
def handle_stream(self, stream, address): def handle_stream(self, stream, address):
while True: while True:
try: try:
result = yield stream.read_until_regex(self.PATTERN) result = yield stream.read_until_regex(self.TCP_PATTERN)
except iostream.StreamClosedError: except iostream.StreamClosedError:
break break
else: else:

118
tests.py
View file

@ -1,4 +1,5 @@
import base64 import base64
import itertools
import logging import logging
import os import os
import socket import socket
@ -6,8 +7,9 @@ import time
import unittest import unittest
import uuid import uuid
from tornado import gen, testing, web from tornado import gen, iostream, testing, web
import mock import mock
from mock import patch
from sprockets.mixins.metrics import influxdb, statsd from sprockets.mixins.metrics import influxdb, statsd
from sprockets.mixins.metrics.testing import ( from sprockets.mixins.metrics.testing import (
@ -42,6 +44,22 @@ def assert_between(low, value, high):
value, low, high)) value, low, high))
class MisconfiguredStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
def get_app(self):
self.application = web.Application([
web.url('/', examples.statsd.SimpleHandler),
web.url('/counters/(.*)/([.0-9]*)', CounterBumper),
web.url('/status_code', DefaultStatusCode),
])
def test_bad_protocol_raises_ValueError(self):
with self.assertRaises(ValueError):
statsd.StatsDCollector(host='127.0.0.1',
port=8125,
protocol='bad_protocol')
class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase): class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
def get_app(self): def get_app(self):
@ -54,15 +72,65 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
def setUp(self): def setUp(self):
self.application = None self.application = None
self.namespace = 'testing'
super(TCPStatsdMetricCollectionTests, self).setUp() super(TCPStatsdMetricCollectionTests, self).setUp()
self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp') self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp')
statsd.install(self.application, **{'namespace': 'testing', statsd.install(self.application, **{'namespace': self.namespace,
'host': self.statsd.sockaddr[0], 'host': self.statsd.sockaddr[0],
'port': self.statsd.sockaddr[1], 'port': self.statsd.sockaddr[1],
'protocol': 'tcp', 'protocol': 'tcp',
'prepend_metric_type': True}) 'prepend_metric_type': True})
def test_tcp_reconnect_on_stream_close(self):
path_sleep = 'tornado.gen.sleep'
path_statsd = self.application.statsd
with mock.patch(path_sleep) as gen_sleep, \
patch.object(path_statsd, '_tcp_socket') as mock_tcp_socket:
f = web.Future()
f.set_result(None)
gen_sleep.return_value = f
self.application.statsd._tcp_on_closed()
mock_tcp_socket.assert_called_once_with()
@patch.object(iostream.IOStream, 'write')
def test_write_not_executed_when_connection_is_closed(self, mock_write):
self.application.statsd._sock.close()
self.application.statsd.send('foo', 500, 'c')
mock_write.assert_not_called()
@patch.object(iostream.IOStream, 'write')
def test_expected_counters_data_written(self, mock_sock):
path = ('foo', 'bar')
value = 500
metric_type = 'c'
expected = "{}:{}|{}\n".format('.'.join(
itertools.chain((self.namespace, 'counters'), path)),
value,
metric_type)
self.application.statsd.send(path, value, metric_type)
mock_sock.assert_called_once_with(expected.encode())
@patch.object(iostream.IOStream, 'write')
def test_expected_timers_data_written(self, mock_sock):
path = ('foo', 'bar')
value = 500
metric_type = 'ms'
expected = "{}:{}|{}\n".format('.'.join(
itertools.chain((self.namespace, 'timers'), path)),
value,
metric_type)
self.application.statsd.send(path, value, metric_type)
mock_sock.assert_called_once_with(expected.encode())
def test_tcp_message_format(self):
expected = '{path}:{value}|{metric_type}\n'
self.assertEqual(self.application.statsd._msg_format, expected)
def test_that_http_method_call_is_recorded(self): def test_that_http_method_call_is_recorded(self):
response = self.fetch('/') response = self.fetch('/')
self.assertEqual(response.code, 204) self.assertEqual(response.code, 204)
@ -120,10 +188,12 @@ class TCPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
def setUp(self): def setUp(self):
self.application = None self.application = None
self.namespace = 'testing'
super(TCPStatsdConfigurationTests, self).setUp() super(TCPStatsdConfigurationTests, self).setUp()
self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp') self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp')
statsd.install(self.application, **{'namespace': 'testing', statsd.install(self.application, **{'namespace': self.namespace,
'host': self.statsd.sockaddr[0], 'host': self.statsd.sockaddr[0],
'port': self.statsd.sockaddr[1], 'port': self.statsd.sockaddr[1],
'protocol': 'tcp', 'protocol': 'tcp',
@ -158,10 +228,12 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
def setUp(self): def setUp(self):
self.application = None self.application = None
self.namespace = 'testing'
super(UDPStatsdMetricCollectionTests, self).setUp() super(UDPStatsdMetricCollectionTests, self).setUp()
self.statsd = FakeStatsdServer(self.io_loop, protocol='udp') self.statsd = FakeStatsdServer(self.io_loop, protocol='udp')
statsd.install(self.application, **{'namespace': 'testing', statsd.install(self.application, **{'namespace': self.namespace,
'host': self.statsd.sockaddr[0], 'host': self.statsd.sockaddr[0],
'port': self.statsd.sockaddr[1], 'port': self.statsd.sockaddr[1],
'protocol': 'udp', 'protocol': 'udp',
@ -171,6 +243,40 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
self.statsd.close() self.statsd.close()
super(UDPStatsdMetricCollectionTests, self).tearDown() super(UDPStatsdMetricCollectionTests, self).tearDown()
@patch.object(socket.socket, 'sendto')
def test_expected_counters_data_written(self, mock_sock):
path = ('foo', 'bar')
value = 500
metric_type = 'c'
expected = "{}:{}|{}".format('.'.join(
itertools.chain((self.namespace, 'counters'), path)),
value,
metric_type)
self.application.statsd.send(path, value, metric_type)
mock_sock.assert_called_once_with(
expected.encode(),
(self.statsd.sockaddr[0], self.statsd.sockaddr[1]))
@patch.object(socket.socket, 'sendto')
def test_expected_timers_data_written(self, mock_sock):
path = ('foo', 'bar')
value = 500
metric_type = 'ms'
expected = "{}:{}|{}".format('.'.join(
itertools.chain((self.namespace, 'timers'), path)),
value,
metric_type)
self.application.statsd.send(path, value, metric_type)
mock_sock.assert_called_once_with(
expected.encode(),
(self.statsd.sockaddr[0], self.statsd.sockaddr[1]))
def test_udp_message_format(self):
expected = '{path}:{value}|{metric_type}'
self.assertEqual(self.application.statsd._msg_format, expected)
def test_that_http_method_call_is_recorded(self): def test_that_http_method_call_is_recorded(self):
response = self.fetch('/') response = self.fetch('/')
self.assertEqual(response.code, 204) self.assertEqual(response.code, 204)
@ -228,10 +334,12 @@ class UDPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
def setUp(self): def setUp(self):
self.application = None self.application = None
self.namespace = 'testing'
super(UDPStatsdConfigurationTests, self).setUp() super(UDPStatsdConfigurationTests, self).setUp()
self.statsd = FakeStatsdServer(self.io_loop, protocol='udp') self.statsd = FakeStatsdServer(self.io_loop, protocol='udp')
statsd.install(self.application, **{'namespace': 'testing', statsd.install(self.application, **{'namespace': self.namespace,
'host': self.statsd.sockaddr[0], 'host': self.statsd.sockaddr[0],
'port': self.statsd.sockaddr[1], 'port': self.statsd.sockaddr[1],
'protocol': 'udp', 'protocol': 'udp',