mirror of
https://github.com/sprockets/sprockets.mixins.metrics.git
synced 2024-11-22 03:00:25 +00:00
Merge pull request #25 from ezhidblr/MOBILE-9146-add-statsd-tcp
Adding TCP support for statsd
This commit is contained in:
commit
6e049cc1da
3 changed files with 207 additions and 23 deletions
|
@ -4,6 +4,8 @@ import os
|
|||
import socket
|
||||
import time
|
||||
|
||||
from tornado import iostream
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
SETTINGS_KEY = 'sprockets.mixins.metrics.statsd'
|
||||
|
@ -93,7 +95,7 @@ class StatsdMixin(object):
|
|||
|
||||
|
||||
class StatsDCollector(object):
|
||||
"""Collects and submits stats to StatsD via UDP socket.
|
||||
"""Collects and submits stats to StatsD.
|
||||
|
||||
This class should be constructed using the
|
||||
:meth:`~sprockets.mixins.statsd.install` method. When installed,
|
||||
|
@ -102,6 +104,7 @@ class StatsDCollector(object):
|
|||
|
||||
:param str host: The StatsD host
|
||||
:param str port: The StatsD port
|
||||
:param str protocol: The StatsD protocol. May be either ``udp`` or ``tcp``.
|
||||
:param str namespace: The StatsD bucket to write metrics into.
|
||||
:param bool prepend_metric_type: Optional flag to prepend bucket path
|
||||
with the StatsD metric type
|
||||
|
@ -110,13 +113,44 @@ class StatsDCollector(object):
|
|||
METRIC_TYPES = {'c': 'counters',
|
||||
'ms': 'timers'}
|
||||
|
||||
def __init__(self, host, port, namespace='sprockets',
|
||||
def __init__(self, host, port, protocol='udp', namespace='sprockets',
|
||||
prepend_metric_type=True):
|
||||
self._host = host
|
||||
self._port = int(port)
|
||||
self._address = (self._host, self._port)
|
||||
self._namespace = namespace
|
||||
self._prepend_metric_type = prepend_metric_type
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
|
||||
|
||||
if protocol == 'tcp':
|
||||
self._tcp = True
|
||||
self._sock = self._tcp_socket()
|
||||
elif protocol == 'udp':
|
||||
self._tcp = False
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
|
||||
else:
|
||||
raise ValueError('Invalid protocol: {}'.format(protocol))
|
||||
|
||||
def _tcp_socket(self):
|
||||
"""Connect to statsd via TCP and return the IOStream handle.
|
||||
:rtype: iostream.IOStream
|
||||
"""
|
||||
sock = iostream.IOStream(socket.socket(
|
||||
socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP))
|
||||
try:
|
||||
sock.connect(self._address, self._tcp_on_connected)
|
||||
except (OSError, socket.error) as error:
|
||||
LOGGER.error('Failed to connect via TCP: %s', error)
|
||||
sock.set_close_callback(self._tcp_on_closed)
|
||||
return sock
|
||||
|
||||
def _tcp_on_closed(self):
|
||||
"""Invoked when the socket is closed."""
|
||||
LOGGER.warning('Disconnected from statsd, reconnecting')
|
||||
self._sock = self._tcp_socket()
|
||||
|
||||
def _tcp_on_connected(self):
|
||||
"""Invoked when the IOStream is connected"""
|
||||
LOGGER.debug('Connected to statsd at %s via TCP', self._address)
|
||||
|
||||
def send(self, path, value, metric_type):
|
||||
"""Send a metric to Statsd.
|
||||
|
@ -128,12 +162,17 @@ class StatsDCollector(object):
|
|||
"""
|
||||
msg = '{0}:{1}|{2}'.format(
|
||||
self._build_path(path, metric_type), value, metric_type)
|
||||
|
||||
LOGGER.debug('Sending %s to %s:%s', msg.encode('ascii'),
|
||||
self._host, self._port)
|
||||
|
||||
try:
|
||||
LOGGER.debug('Sending %s to %s:%s', msg.encode('ascii'),
|
||||
self._host, self._port)
|
||||
if self._tcp:
|
||||
return self._sock.write(msg.encode('ascii'))
|
||||
|
||||
self._sock.sendto(msg.encode('ascii'), (self._host, self._port))
|
||||
except socket.error:
|
||||
LOGGER.exception('Error sending StatsD metrics')
|
||||
except (OSError, socket.error) as error: # pragma: nocover
|
||||
LOGGER.exception('Error sending statsd metric: %s', error)
|
||||
|
||||
def _build_path(self, path, metric_type):
|
||||
"""Return a normalized path.
|
||||
|
@ -191,5 +230,8 @@ def install(application, **kwargs):
|
|||
if 'port' not in kwargs:
|
||||
kwargs['port'] = os.environ.get('STATSD_PORT', '8125')
|
||||
|
||||
if 'protocol' not in kwargs:
|
||||
kwargs['protocol'] = os.environ.get('STATSD_PROTOCOL', 'udp')
|
||||
|
||||
setattr(application, 'statsd', StatsDCollector(**kwargs))
|
||||
return True
|
||||
|
|
|
@ -2,17 +2,17 @@ import logging
|
|||
import re
|
||||
import socket
|
||||
|
||||
from tornado import gen, web
|
||||
from tornado import gen, iostream, locks, tcpserver, testing, web
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
STATS_PATTERN = re.compile(r'(?P<path>[^:]*):(?P<value>[^|]*)\|(?P<type>.*)$')
|
||||
|
||||
|
||||
class FakeStatsdServer(object):
|
||||
class FakeStatsdServer(tcpserver.TCPServer):
|
||||
"""
|
||||
Implements something resembling a statsd server.
|
||||
|
||||
:param tornado.ioloop.IOLoop iol: the loop to attach to
|
||||
:param str protocol: The StatsD protocol. May be either ``udp`` or ``tcp``.
|
||||
|
||||
Create an instance of this class in your asynchronous test case
|
||||
attached to the IOLoop and configure your application to send
|
||||
|
@ -30,12 +30,31 @@ class FakeStatsdServer(object):
|
|||
|
||||
"""
|
||||
|
||||
def __init__(self, iol):
|
||||
PATTERN = br'(?P<path>[^:]*):(?P<value>[^|]*)\|(?P<type>.*)$'
|
||||
|
||||
def __init__(self, iol, protocol='udp'):
|
||||
self.datagrams = []
|
||||
|
||||
if protocol == 'tcp':
|
||||
self.tcp_server()
|
||||
elif protocol == 'udp':
|
||||
self.udp_server(iol)
|
||||
else:
|
||||
raise ValueError('Invalid protocol: {}'.format(protocol))
|
||||
|
||||
def tcp_server(self):
|
||||
self.event = locks.Event()
|
||||
super(FakeStatsdServer, self).__init__()
|
||||
|
||||
sock, port = testing.bind_unused_port()
|
||||
self.add_socket(sock)
|
||||
self.sockaddr = sock.getsockname()
|
||||
|
||||
def udp_server(self, iol):
|
||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
|
||||
socket.IPPROTO_UDP)
|
||||
self.socket.bind(('127.0.0.1', 0))
|
||||
self.sockaddr = self.socket.getsockname()
|
||||
self.datagrams = []
|
||||
|
||||
iol.add_handler(self.socket, self._handle_events, iol.READ)
|
||||
self._iol = iol
|
||||
|
@ -48,6 +67,21 @@ class FakeStatsdServer(object):
|
|||
self.socket.close()
|
||||
self.socket = None
|
||||
|
||||
@gen.coroutine
|
||||
def handle_stream(self, stream, address):
|
||||
while True:
|
||||
try:
|
||||
result = yield stream.read_until_regex(self.PATTERN)
|
||||
except iostream.StreamClosedError:
|
||||
break
|
||||
else:
|
||||
self.event.set()
|
||||
self.datagrams.append(result)
|
||||
if b'reconnect' in result:
|
||||
self.reconnect_receive = True
|
||||
stream.close()
|
||||
return
|
||||
|
||||
def _handle_events(self, fd, events):
|
||||
if fd != self.socket:
|
||||
return
|
||||
|
@ -73,6 +107,7 @@ class FakeStatsdServer(object):
|
|||
'(?P<path>{}[^:]*):(?P<value>[^|]*)\\|(?P<type>{})'.format(
|
||||
re.escape(prefix), re.escape(metric_type)))
|
||||
matched = False
|
||||
|
||||
for datagram in self.datagrams:
|
||||
text_msg = datagram.decode('ascii')
|
||||
match = pattern.match(text_msg)
|
||||
|
|
129
tests.py
129
tests.py
|
@ -42,7 +42,7 @@ def assert_between(low, value, high):
|
|||
value, low, high))
|
||||
|
||||
|
||||
class StatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||
class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||
|
||||
def get_app(self):
|
||||
self.application = web.Application([
|
||||
|
@ -54,17 +54,15 @@ class StatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
|||
|
||||
def setUp(self):
|
||||
self.application = None
|
||||
super(StatsdMetricCollectionTests, self).setUp()
|
||||
self.statsd = FakeStatsdServer(self.io_loop)
|
||||
super(TCPStatsdMetricCollectionTests, self).setUp()
|
||||
self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp')
|
||||
|
||||
statsd.install(self.application, **{'namespace': 'testing',
|
||||
'host': self.statsd.sockaddr[0],
|
||||
'port': self.statsd.sockaddr[1],
|
||||
'protocol': 'tcp',
|
||||
'prepend_metric_type': True})
|
||||
|
||||
def tearDown(self):
|
||||
self.statsd.close()
|
||||
super(StatsdMetricCollectionTests, self).tearDown()
|
||||
|
||||
def test_that_http_method_call_is_recorded(self):
|
||||
response = self.fetch('/')
|
||||
self.assertEqual(response.code, 204)
|
||||
|
@ -111,7 +109,7 @@ class StatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
|||
list(self.statsd.find_metrics(expected, 'ms'))[0][0])
|
||||
|
||||
|
||||
class StatsdConfigurationTests(testing.AsyncHTTPTestCase):
|
||||
class TCPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
|
||||
|
||||
def get_app(self):
|
||||
self.application = web.Application([
|
||||
|
@ -122,17 +120,126 @@ class StatsdConfigurationTests(testing.AsyncHTTPTestCase):
|
|||
|
||||
def setUp(self):
|
||||
self.application = None
|
||||
super(StatsdConfigurationTests, self).setUp()
|
||||
self.statsd = FakeStatsdServer(self.io_loop)
|
||||
super(TCPStatsdConfigurationTests, self).setUp()
|
||||
self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp')
|
||||
|
||||
statsd.install(self.application, **{'namespace': 'testing',
|
||||
'host': self.statsd.sockaddr[0],
|
||||
'port': self.statsd.sockaddr[1],
|
||||
'protocol': 'tcp',
|
||||
'prepend_metric_type': False})
|
||||
|
||||
def test_that_http_method_call_is_recorded(self):
|
||||
response = self.fetch('/')
|
||||
self.assertEqual(response.code, 204)
|
||||
|
||||
expected = 'testing.SimpleHandler.GET.204'
|
||||
for path, value, stat_type in self.statsd.find_metrics(expected, 'ms'):
|
||||
assert_between(250.0, float(value), 500.0)
|
||||
|
||||
def test_that_counter_accepts_increment_value(self):
|
||||
response = self.fetch('/counters/path/5', method='POST', body='')
|
||||
self.assertEqual(response.code, 204)
|
||||
|
||||
prefix = 'testing.path'
|
||||
for path, value, stat_type in self.statsd.find_metrics(prefix, 'c'):
|
||||
self.assertEqual(int(value), 5)
|
||||
|
||||
|
||||
class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||
|
||||
def get_app(self):
|
||||
self.application = web.Application([
|
||||
web.url('/', examples.statsd.SimpleHandler),
|
||||
web.url('/counters/(.*)/([.0-9]*)', CounterBumper),
|
||||
web.url('/status_code', DefaultStatusCode),
|
||||
])
|
||||
return self.application
|
||||
|
||||
def setUp(self):
|
||||
self.application = None
|
||||
super(UDPStatsdMetricCollectionTests, self).setUp()
|
||||
self.statsd = FakeStatsdServer(self.io_loop, protocol='udp')
|
||||
|
||||
statsd.install(self.application, **{'namespace': 'testing',
|
||||
'host': self.statsd.sockaddr[0],
|
||||
'port': self.statsd.sockaddr[1],
|
||||
'protocol': 'udp',
|
||||
'prepend_metric_type': True})
|
||||
|
||||
def tearDown(self):
|
||||
self.statsd.close()
|
||||
super(UDPStatsdMetricCollectionTests, self).tearDown()
|
||||
|
||||
def test_that_http_method_call_is_recorded(self):
|
||||
response = self.fetch('/')
|
||||
self.assertEqual(response.code, 204)
|
||||
|
||||
expected = 'testing.timers.SimpleHandler.GET.204'
|
||||
for path, value, stat_type in self.statsd.find_metrics(expected, 'ms'):
|
||||
assert_between(250.0, float(value), 500.0)
|
||||
|
||||
def test_that_counter_increment_defaults_to_one(self):
|
||||
response = self.fetch('/', method='POST', body='')
|
||||
self.assertEqual(response.code, 204)
|
||||
|
||||
prefix = 'testing.counters.request.path'
|
||||
for path, value, stat_type in self.statsd.find_metrics(prefix, 'c'):
|
||||
self.assertEqual(int(value), 1)
|
||||
|
||||
def test_that_counter_accepts_increment_value(self):
|
||||
response = self.fetch('/counters/path/5', method='POST', body='')
|
||||
self.assertEqual(response.code, 204)
|
||||
|
||||
prefix = 'testing.counters.path'
|
||||
for path, value, stat_type in self.statsd.find_metrics(prefix, 'c'):
|
||||
self.assertEqual(int(value), 5)
|
||||
|
||||
def test_that_execution_timer_records_time_spent(self):
|
||||
response = self.fetch('/counters/one.two.three/0.25')
|
||||
self.assertEqual(response.code, 204)
|
||||
|
||||
prefix = 'testing.timers.one.two.three'
|
||||
for path, value, stat_type in self.statsd.find_metrics(prefix, 'ms'):
|
||||
assert_between(250.0, float(value), 300.0)
|
||||
|
||||
def test_that_add_metric_tag_is_ignored(self):
|
||||
response = self.fetch('/',
|
||||
headers={'Correlation-ID': 'does not matter'})
|
||||
self.assertEqual(response.code, 204)
|
||||
|
||||
def test_that_status_code_is_used_when_not_explicitly_set(self):
|
||||
response = self.fetch('/status_code')
|
||||
self.assertEqual(response.code, 200)
|
||||
|
||||
expected = 'testing.timers.DefaultStatusCode.GET.200'
|
||||
self.assertEqual(expected,
|
||||
list(self.statsd.find_metrics(expected, 'ms'))[0][0])
|
||||
|
||||
|
||||
class UDPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
|
||||
|
||||
def get_app(self):
|
||||
self.application = web.Application([
|
||||
web.url('/', examples.statsd.SimpleHandler),
|
||||
web.url('/counters/(.*)/([.0-9]*)', CounterBumper),
|
||||
])
|
||||
return self.application
|
||||
|
||||
def setUp(self):
|
||||
self.application = None
|
||||
super(UDPStatsdConfigurationTests, self).setUp()
|
||||
self.statsd = FakeStatsdServer(self.io_loop, protocol='udp')
|
||||
|
||||
statsd.install(self.application, **{'namespace': 'testing',
|
||||
'host': self.statsd.sockaddr[0],
|
||||
'port': self.statsd.sockaddr[1],
|
||||
'protocol': 'udp',
|
||||
'prepend_metric_type': False})
|
||||
|
||||
def tearDown(self):
|
||||
self.statsd.close()
|
||||
super(StatsdConfigurationTests, self).tearDown()
|
||||
super(UDPStatsdConfigurationTests, self).tearDown()
|
||||
|
||||
def test_that_http_method_call_is_recorded(self):
|
||||
response = self.fetch('/')
|
||||
|
|
Loading…
Reference in a new issue