Merge pull request from sprockets/add-statsd-install

Add install for statsd mixin
This commit is contained in:
Brian Korty 2016-12-12 13:15:36 -05:00 committed by GitHub
commit 603e8dd46a
5 changed files with 242 additions and 118 deletions
README.rst
examples
sprockets/mixins/metrics
tests.py

View file

@ -22,17 +22,15 @@ call to the ``get`` method as well as a separate metric for the database query.
import queries import queries
def make_application(): def make_application():
settings = { application = web.Application([
statsd.SETTINGS_KEY: { web.url(r'/', MyHandler),
'namespace': 'my-application',
'host': os.environ.get('STATSD_HOST', '127.0.0.1'),
'port': os.environ.get('STATSD_PORT', '8125'),
}
}
return web.Application([
# insert handlers here
], **settings) ], **settings)
statsd.install({'namespace': 'my-application',
'host': os.environ.get('STATSD_HOST', '127.0.0.1'),
'port': os.environ.get('STATSD_PORT', '8125')})
return application
class MyHandler(statsd.StatsdMixin, class MyHandler(statsd.StatsdMixin,
mediatype.ContentMixin, mediatype.ContentMixin,
web.RequestHandler): web.RequestHandler):
@ -54,6 +52,8 @@ Settings
:namespace: The namespace for the measurements :namespace: The namespace for the measurements
:host: The Statsd host :host: The Statsd host
:port: The Statsd port :port: The Statsd port
:prepend_metric_type: Optional flag to prepend bucket path with the StatsD
metric type
InfluxDB Mixin InfluxDB Mixin
-------------- --------------
@ -76,7 +76,7 @@ variables:
application = web.Application( application = web.Application(
[ [
web.url(r'/', RequestHandler), web.url(r'/', MyHandler),
], **settings) ], **settings)
influxdb.install({'url': 'http://localhost:8086', influxdb.install({'url': 'http://localhost:8086',

View file

@ -52,14 +52,10 @@ def make_application():
webapps.SimpleHandler.GET.204:255.24497032165527|ms webapps.SimpleHandler.GET.204:255.24497032165527|ms
""" """
settings = { settings = {}
statsd.SETTINGS_KEY: { application = web.Application([web.url('/', SimpleHandler)], **settings)
'namespace': 'webapps', statsd.install(application, **{'namespace': 'testing'})
'host': '127.0.0.1', return application
'port': 8125,
}
}
return web.Application([web.url('/', SimpleHandler)], **settings)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -1,58 +1,33 @@
import contextlib import contextlib
import logging
import os
import socket import socket
import time import time
LOGGER = logging.getLogger(__name__)
SETTINGS_KEY = 'sprockets.mixins.metrics.statsd' SETTINGS_KEY = 'sprockets.mixins.metrics.statsd'
"""``self.settings`` key that configures this mix-in.""" """``self.settings`` key that configures this mix-in."""
class StatsdMixin(object): class StatsdMixin(object):
""" """Mix this class in to record metrics to a Statsd server."""
Mix this class in to record metrics to a Statsd server.
**Configuration**
:namespace:
Path to prefix metrics with. If undefined, this defaults to
``applications`` + ``self.__class__.__module__``
:host:
Host name of the StatsD server to send metrics to. If undefined,
this defaults to ``127.0.0.1``.
:port:
Port number that the StatsD server is listening on. If undefined,
this defaults to ``8125``.
"""
def initialize(self): def initialize(self):
super(StatsdMixin, self).initialize() super(StatsdMixin, self).initialize()
settings = self.settings.setdefault(SETTINGS_KEY, {})
if 'socket' not in settings:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
settings['socket'] = sock
if 'namespace' not in settings:
settings['namespace'] = 'applications.{}'.format(
self.__class__.__module__)
settings.setdefault('host', '127.0.0.1')
settings.setdefault('port', '8125')
self.__status_code = None self.__status_code = None
def set_metric_tag(self, tag, value): def set_metric_tag(self, tag, value):
"""Ignored for statsd since it does not support tagging.""" """Ignored for statsd since it does not support tagging.
def set_status(self, status_code, reason=None): :param str tag: name of the tag to set
# Extended to track status code to avoid referencing the :param str value: value to assign
# _status internal variable
self.__status_code = status_code """
super(StatsdMixin, self).set_status(status_code, reason=reason) pass
def record_timing(self, duration, *path): def record_timing(self, duration, *path):
""" """Record a timing.
Record a timing.
:param float duration: timing to record in seconds
:param path: elements of the metric path to record
This method records a timing to the application's namespace This method records a timing to the application's namespace
followed by a calculated path. Each element of `path` is followed by a calculated path. Each element of `path` is
@ -60,16 +35,14 @@ class StatsdMixin(object):
elements by periods. The normalization process is little elements by periods. The normalization process is little
more than replacing periods with dashes. more than replacing periods with dashes.
:param float duration: timing to record in seconds
:param path: elements of the metric path to record
""" """
self._send(self._build_path(path), duration * 1000.0, 'ms') self.application.statsd.send(path, duration * 1000.0, 'ms')
def increase_counter(self, *path, **kwargs): def increase_counter(self, *path, **kwargs):
""" """Increase a counter.
Increase a counter.
:param path: elements of the metric path to incr
:keyword int amount: amount to increase the counter by. If
omitted, the counter is increased by one.
This method increases a counter within the application's This method increases a counter within the application's
namespace. Each element of `path` is converted to a string namespace. Each element of `path` is converted to a string
@ -77,8 +50,12 @@ class StatsdMixin(object):
normalization process is little more than replacing periods normalization process is little more than replacing periods
with dashes. with dashes.
:param path: elements of the metric path to incr
:keyword int amount: amount to increase the counter by. If
omitted, the counter is increased by one.
""" """
self._send(self._build_path(path), kwargs.get('amount', '1'), 'c') self.application.statsd.send(path, kwargs.get('amount', '1'), 'c')
@contextlib.contextmanager @contextlib.contextmanager
def execution_timer(self, *path): def execution_timer(self, *path):
@ -115,14 +92,119 @@ class StatsdMixin(object):
self.__class__.__name__, self.request.method, self.__class__.__name__, self.request.method,
self.__status_code) self.__status_code)
def _build_path(self, path): def set_status(self, status_code, reason=None):
"""Return a normalized path.""" """Extend tornado `set_status` method to track status code
return '{}.{}'.format(self.settings[SETTINGS_KEY]['namespace'], to avoid referencing the _status internal variable
:param int status_code: Response status code. If ``reason``
is ``None``, it must be present in `httplib.responses
<http.client.responses>`.
:param string reason: Human-readable reason phrase describing
the status code. If ``None``, it will be filled in from
`httplib.responses <http.client.responses>`.
"""
self.__status_code = status_code
super(StatsdMixin, self).set_status(status_code, reason=reason)
class StatsDCollector(object):
"""Collects and submits stats to StatsD via UDP socket.
This class should be constructed using the
:meth:`~sprockets.mixins.statsd.install` method. When installed,
it is attached to the :class:`~tornado.web.Application` instance
for your web application.
:param str host: The StatsD host
:param str port: The StatsD port
:param str namespace: The StatsD bucket to write metrics into.
:param bool prepend_metric_type: Optional flag to prepend bucket path
with the StatsD metric type
"""
METRIC_TYPES = {'c': 'counters',
'ms': 'timers'}
def __init__(self, host, port, namespace='sprockets',
prepend_metric_type=True):
self._host = host
self._port = int(port)
self._namespace = namespace
self._prepend_metric_type = prepend_metric_type
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
def send(self, path, value, metric_type):
"""Send a metric to Statsd.
:param list path: The metric path to record
:param mixed value: The value to record
:param str metric_type: The metric type
"""
msg = '{0}:{1}|{2}'.format(
self._build_path(path, metric_type), value, metric_type)
try:
LOGGER.debug('Sending %s to %s:%s', msg.encode('ascii'),
self._host, self._port)
self._sock.sendto(msg.encode('ascii'), (self._host, self._port))
except socket.error:
LOGGER.exception('Error sending StatsD metrics')
def _build_path(self, path, metric_type):
"""Return a normalized path.
:param list path: elements of the metric path to record
:param str metric_type: The metric type
:rtype: str
"""
path = self._get_prefixes(metric_type) + list(path)
return '{}.{}'.format(self._namespace,
'.'.join(str(p).replace('.', '-') for p in path)) '.'.join(str(p).replace('.', '-') for p in path))
def _send(self, path, value, stat_type): def _get_prefixes(self, metric_type):
"""Send a metric to Statsd.""" """Get prefixes where applicable
settings = self.settings[SETTINGS_KEY]
msg = '{0}:{1}|{2}'.format(path, value, stat_type) Add metric prefix counters, timers respectively if
settings['socket'].sendto(msg.encode('ascii'), :attr:`prepend_metric_type` flag is True.
(settings['host'], int(settings['port'])))
:param str metric_type: The metric type
:rtype: list
"""
prefixes = []
if self._prepend_metric_type:
prefixes.append(self.METRIC_TYPES[metric_type])
return prefixes
def install(application, **kwargs):
"""Call this to install StatsD for the Tornado application.
:param tornado.web.Application application: the application to
install the collector into.
:param kwargs: keyword parameters to pass to the
:class:`StatsDCollector` initializer.
:returns: :data:`True` if the client was installed successfully,
or :data:`False` otherwise.
- **host** The StatsD host. If host is not specified, the
``STATSD_HOST`` environment variable, or default `127.0.0.1`,
will be pass into the :class:`StatsDCollector`.
- **port** The StatsD port. If port is not specified, the
``STATSD_PORT`` environment variable, or default `8125`,
will be pass into the :class:`StatsDCollector`.
- **namespace** The StatsD bucket to write metrics into.
"""
if getattr(application, 'statsd', None) is not None:
LOGGER.warning('Statsd collector is already installed')
return False
if 'host' not in kwargs:
kwargs['host'] = os.environ.get('STATSD_HOST', '127.0.0.1')
if 'port' not in kwargs:
kwargs['port'] = os.environ.get('STATSD_PORT', '8125')
setattr(application, 'statsd', StatsDCollector(**kwargs))
return True

View file

@ -4,9 +4,6 @@ import socket
from tornado import gen, web from tornado import gen, web
from sprockets.mixins.metrics import influxdb
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
STATS_PATTERN = re.compile(r'(?P<path>[^:]*):(?P<value>[^|]*)\|(?P<type>.*)$') STATS_PATTERN = re.compile(r'(?P<path>[^:]*):(?P<value>[^|]*)\|(?P<type>.*)$')

135
tests.py
View file

@ -3,13 +3,15 @@ import logging
import os import os
import socket import socket
import time import time
import unittest
import uuid import uuid
from tornado import gen, testing, web from tornado import gen, testing, web
import mock import mock
from sprockets.mixins.metrics import influxdb, statsd from sprockets.mixins.metrics import influxdb, statsd
from sprockets.mixins.metrics.testing import FakeInfluxHandler, FakeStatsdServer from sprockets.mixins.metrics.testing import (
FakeInfluxHandler, FakeStatsdServer)
import examples.influxdb import examples.influxdb
import examples.statsd import examples.statsd
@ -34,7 +36,7 @@ def assert_between(low, value, high):
value, low, high)) value, low, high))
class StatsdMethodTimingTests(testing.AsyncHTTPTestCase): class StatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
def get_app(self): def get_app(self):
self.application = web.Application([ self.application = web.Application([
@ -45,21 +47,77 @@ class StatsdMethodTimingTests(testing.AsyncHTTPTestCase):
def setUp(self): def setUp(self):
self.application = None self.application = None
super(StatsdMethodTimingTests, self).setUp() super(StatsdMetricCollectionTests, self).setUp()
self.statsd = FakeStatsdServer(self.io_loop) self.statsd = FakeStatsdServer(self.io_loop)
self.application.settings[statsd.SETTINGS_KEY] = { statsd.install(self.application, **{'namespace': 'testing',
'host': self.statsd.sockaddr[0], 'host': self.statsd.sockaddr[0],
'port': self.statsd.sockaddr[1], 'port': self.statsd.sockaddr[1],
'namespace': 'testing', 'prepend_metric_type': True})
}
def tearDown(self): def tearDown(self):
self.statsd.close() self.statsd.close()
super(StatsdMethodTimingTests, self).tearDown() super(StatsdMetricCollectionTests, self).tearDown()
@property def test_that_http_method_call_is_recorded(self):
def settings(self): response = self.fetch('/')
return self.application.settings[statsd.SETTINGS_KEY] self.assertEqual(response.code, 204)
expected = 'testing.timers.SimpleHandler.GET.204'
for path, value, stat_type in self.statsd.find_metrics(expected, 'ms'):
assert_between(250.0, float(value), 500.0)
def test_that_counter_increment_defaults_to_one(self):
response = self.fetch('/', method='POST', body='')
self.assertEqual(response.code, 204)
prefix = 'testing.counters.request.path'
for path, value, stat_type in self.statsd.find_metrics(prefix, 'c'):
self.assertEqual(int(value), 1)
def test_that_counter_accepts_increment_value(self):
response = self.fetch('/counters/path/5', method='POST', body='')
self.assertEqual(response.code, 204)
prefix = 'testing.counters.path'
for path, value, stat_type in self.statsd.find_metrics(prefix, 'c'):
self.assertEqual(int(value), 5)
def test_that_execution_timer_records_time_spent(self):
response = self.fetch('/counters/one.two.three/0.25')
self.assertEqual(response.code, 204)
prefix = 'testing.timers.one.two.three'
for path, value, stat_type in self.statsd.find_metrics(prefix, 'ms'):
assert_between(250.0, float(value), 300.0)
def test_that_add_metric_tag_is_ignored(self):
response = self.fetch('/',
headers={'Correlation-ID': 'does not matter'})
self.assertEqual(response.code, 204)
class StatsdConfigurationTests(testing.AsyncHTTPTestCase):
def get_app(self):
self.application = web.Application([
web.url('/', examples.statsd.SimpleHandler),
web.url('/counters/(.*)/([.0-9]*)', CounterBumper),
])
return self.application
def setUp(self):
self.application = None
super(StatsdConfigurationTests, self).setUp()
self.statsd = FakeStatsdServer(self.io_loop)
statsd.install(self.application, **{'namespace': 'testing',
'host': self.statsd.sockaddr[0],
'port': self.statsd.sockaddr[1],
'prepend_metric_type': False})
def tearDown(self):
self.statsd.close()
super(StatsdConfigurationTests, self).tearDown()
def test_that_http_method_call_is_recorded(self): def test_that_http_method_call_is_recorded(self):
response = self.fetch('/') response = self.fetch('/')
@ -69,27 +127,6 @@ class StatsdMethodTimingTests(testing.AsyncHTTPTestCase):
for path, value, stat_type in self.statsd.find_metrics(expected, 'ms'): for path, value, stat_type in self.statsd.find_metrics(expected, 'ms'):
assert_between(250.0, float(value), 500.0) assert_between(250.0, float(value), 500.0)
def test_that_cached_socket_is_used(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
self.settings['socket'] = sock
self.fetch('/')
self.assertIs(self.settings['socket'], sock)
def test_that_default_prefix_is_stored(self):
del self.settings['namespace']
self.fetch('/')
self.assertEqual(
self.settings['namespace'],
'applications.' + examples.statsd.SimpleHandler.__module__)
def test_that_counter_increment_defaults_to_one(self):
response = self.fetch('/', method='POST', body='')
self.assertEqual(response.code, 204)
prefix = 'testing.request.path'
for path, value, stat_type in self.statsd.find_metrics(prefix, 'c'):
self.assertEqual(int(value), 1)
def test_that_counter_accepts_increment_value(self): def test_that_counter_accepts_increment_value(self):
response = self.fetch('/counters/path/5', method='POST', body='') response = self.fetch('/counters/path/5', method='POST', body='')
self.assertEqual(response.code, 204) self.assertEqual(response.code, 204)
@ -98,18 +135,30 @@ class StatsdMethodTimingTests(testing.AsyncHTTPTestCase):
for path, value, stat_type in self.statsd.find_metrics(prefix, 'c'): for path, value, stat_type in self.statsd.find_metrics(prefix, 'c'):
self.assertEqual(int(value), 5) self.assertEqual(int(value), 5)
def test_that_execution_timer_records_time_spent(self):
response = self.fetch('/counters/one.two.three/0.25')
self.assertEqual(response.code, 204)
prefix = 'testing.one.two.three' class StatsdInstallationTests(unittest.TestCase):
for path, value, stat_type in self.statsd.find_metrics(prefix, 'ms'):
assert_between(250.0, float(value), 300.0)
def test_that_add_metric_tag_is_ignored(self): def setUp(self):
response = self.fetch('/', self.application = web.Application([
headers={'Correlation-ID': 'does not matter'}) web.url('/', examples.statsd.SimpleHandler),
self.assertEqual(response.code, 204) ])
def test_collecter_is_not_reinstalled(self):
self.assertTrue(statsd.install(self.application))
self.assertFalse(statsd.install(self.application))
def test_host_is_used(self):
statsd.install(self.application, **{'host': 'example.com'})
self.assertEqual(self.application.statsd._host, 'example.com')
def test_port_is_used(self):
statsd.install(self.application, **{'port': '8888'})
self.assertEqual(self.application.statsd._port, 8888)
def test_default_host_and_port_is_used(self):
statsd.install(self.application, **{'namespace': 'testing'})
self.assertEqual(self.application.statsd._host, '127.0.0.1')
self.assertEqual(self.application.statsd._port, 8125)
class InfluxDbTests(testing.AsyncHTTPTestCase): class InfluxDbTests(testing.AsyncHTTPTestCase):