Add sprockets.mixins.metrics.InfluxDBMixin.

Dave Shawley 2016-01-21 15:49:35 -05:00
parent 0f486b7ef3
commit 005ad9d7ba
6 changed files with 253 additions and 9 deletions


@@ -47,6 +47,14 @@ Statsd Implementation
.. autoclass:: sprockets.mixins.metrics.StatsdMixin
   :members:

InfluxDB Implementation
-----------------------

.. autoclass:: sprockets.mixins.metrics.InfluxDBMixin
   :members:

.. autoclass:: sprockets.mixins.metrics.influxdb.InfluxDBConnection
   :members:

Testing Helpers
---------------

*So who actually tests that their metrics are emitted as they expect?*


@@ -8,5 +8,7 @@ Release History
- Add :class:`sprockets.mixins.metrics.StatsdMixin`
- Add :class:`sprockets.mixins.metrics.testing.FakeStatsdServer`
- Add :class:`sprockets.mixins.metrics.testing.FakeInfluxHandler`
- Add :class:`sprockets.mixins.metrics.InfluxDBMixin`
- Add :class:`sprockets.mixins.metrics.influxdb.InfluxDBConnection`
.. _Next Release: https://github.com/sprockets/sprockets.mixins.metrics/compare/0.0.0...master


@@ -1,2 +1,3 @@
mock>=1.0.1,<2
nose>=1.3,<2
tornado>=4.2,<4.3


@@ -1,5 +1,6 @@
from .influxdb import InfluxDBMixin
from .statsd import StatsdMixin
version_info = (0, 0, 0)
__version__ = '.'.join(str(v) for v in version_info)
__all__ = ['StatsdMixin']
__all__ = ['InfluxDBMixin', 'StatsdMixin']
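
With InfluxDBMixin now exported from the package, a handler opts in by mixing it
into a tornado.web.RequestHandler and calling the recording methods added below.
The following sketch is illustrative only (loosely modelled on the
examples.influxdb.SimpleHandler exercised in the tests) and is not part of this
commit:

    from tornado import gen, web

    from sprockets.mixins import metrics


    class SimpleHandler(metrics.InfluxDBMixin, web.RequestHandler):
        """Hypothetical handler showing the recording API."""

        @gen.coroutine
        def get(self):
            with self.execution_timer('sleepytime'):    # timed code block
                yield gen.sleep(0.25)
            self.increase_counter('slept', amount=42)   # counter field
            self.set_metric_tag('environment', 'testing')  # extra tag on the point
            self.set_status(204)
            self.finish()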


@@ -0,0 +1,142 @@
import contextlib
import socket
import time
from tornado import httpclient, ioloop
class InfluxDBConnection(object):
"""
Connection to an InfluxDB instance.
:param str write_url: the URL to send HTTP requests to
:param str database: the database to write measurements into
    :param tornado.ioloop.IOLoop io_loop: the IOLoop to spawn callbacks on.
        If this parameter is :data:`None`, then the active IOLoop,
        as determined by :meth:`tornado.ioloop.IOLoop.instance`,
        is used.

    An instance of this class is stored in the application settings
    and used to asynchronously send measurements to an InfluxDB instance.
    Each measurement is sent by spawning a context-free callback on
    the IOLoop.
"""
def __init__(self, write_url, database, io_loop=None):
self.io_loop = ioloop.IOLoop.instance() if io_loop is None else io_loop
self.client = httpclient.AsyncHTTPClient()
self.write_url = '{}?db={}'.format(write_url, database)
def submit(self, measurement, tags, values):
body = '{},{} {} {:d}'.format(measurement, ','.join(tags),
','.join(values),
int(time.time() * 1000000000))
request = httpclient.HTTPRequest(self.write_url, method='POST',
body=body.encode('utf-8'))
        # fire-and-forget: spawn the fetch on the configured IOLoop
        self.io_loop.spawn_callback(self.client.fetch, request)
class InfluxDBMixin(object):
"""
    Mix this class in to record measurements to an InfluxDB server.
**Configuration**
:database:
InfluxDB database to write measurements to. This is passed
as the ``db`` query parameter when writing to Influx.
https://docs.influxdata.com/influxdb/v0.9/guides/writing_data/
:write_url:
The URL that the InfluxDB write endpoint is available on.
This is used as-is to write data into Influx.
"""
SETTINGS_KEY = 'sprockets.mixins.metrics.influxdb'
"""``self.settings`` key that configures this mix-in."""
def initialize(self):
self.__tags = {
'host': socket.gethostname(),
'handler': '{}.{}'.format(self.__module__,
self.__class__.__name__),
'method': self.request.method,
}
super(InfluxDBMixin, self).initialize()
settings = self.settings.setdefault(self.SETTINGS_KEY, {})
if 'db_connection' not in settings:
settings['db_connection'] = InfluxDBConnection(
settings['write_url'], settings['database'])
self.__metrics = []
def set_metric_tag(self, tag, value):
"""
Add a tag to the measurement key.
:param str tag: name of the tag to set
:param str value: value to assign
This will overwrite the current value assigned to a tag
if one exists.
"""
self.__tags[tag] = value
def record_timing(self, duration, *path):
"""
Record a timing.
:param float duration: timing to record in seconds
:param path: elements of the metric path to record
A timing is a named duration value.
"""
self.__metrics.append('{}={}'.format('.'.join(path), duration))
def increase_counter(self, *path, **kwargs):
"""
Increase a counter.
:param path: elements of the path to record
:keyword int amount: value to record. If omitted, the counter
value is one.
Counters are simply values that are summed in a query.
"""
self.__metrics.append('{}={}'.format('.'.join(path),
kwargs.get('amount', 1)))
@contextlib.contextmanager
def execution_timer(self, *path):
"""
Record the time it takes to run an arbitrary code block.
:param path: elements of the metric path to record
This method returns a context manager that records the amount
of time spent inside of the context and records a value
        named `path` using :meth:`record_timing`.
"""
start = time.time()
try:
yield
finally:
fini = max(time.time(), start)
self.record_timing(fini - start, *path)
def on_finish(self):
super(InfluxDBMixin, self).on_finish()
self.__metrics.append('status_code={}'.format(self._status_code))
self.record_timing(self.request.request_time(), 'duration')
self.settings[self.SETTINGS_KEY]['db_connection'].submit(
self.settings[self.SETTINGS_KEY]['measurement'],
('{}={}'.format(k, v) for k, v in self.__tags.items()),
self.__metrics,
)
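
The mixin reads its configuration from self.settings[InfluxDBMixin.SETTINGS_KEY]:
write_url and database feed the shared InfluxDBConnection created in initialize(),
and measurement names the series written out in on_finish(). A minimal application
setup might look like the sketch below (the URL and names are placeholders, and
SimpleHandler is the illustrative handler shown earlier):

    from tornado import web

    from sprockets.mixins import metrics


    def make_app():
        app = web.Application([web.url(r'/', SimpleHandler)])
        app.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = {
            'measurement': 'my-service',                 # line-protocol measurement
            'write_url': 'http://localhost:8086/write',  # InfluxDB write endpoint
            'database': 'requests',                      # sent as the db query argument
        }
        return app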

tests.py

@@ -1,9 +1,14 @@
import logging
import socket
import time
from tornado import gen, testing, web
import mock
from sprockets.mixins import metrics
from sprockets.mixins.metrics.testing import FakeStatsdServer
from sprockets.mixins.metrics.testing import (
FakeInfluxHandler, FakeStatsdServer)
import examples.influxdb
import examples.statsd
@@ -23,6 +28,12 @@ class CounterBumper(metrics.StatsdMixin, web.RequestHandler):
self.set_status(204)
def assert_between(low, value, high):
if not (low <= value < high):
raise AssertionError('Expected {} to be between {} and {}'.format(
value, low, high))
class StatsdMethodTimingTests(testing.AsyncHTTPTestCase):
def get_app(self):
@@ -50,18 +61,13 @@ class StatsdMethodTimingTests(testing.AsyncHTTPTestCase):
def settings(self):
return self.application.settings[metrics.StatsdMixin.SETTINGS_KEY]
def assert_between(self, low, value, high):
self.assertTrue(
low <= value < high,
'Expected {} to be between {} and {}'.format(value, low, high))
def test_that_http_method_call_is_recorded(self):
response = self.fetch('/')
self.assertEqual(response.code, 204)
expected = 'testing.SimpleHandler.GET.204'
for path, value, stat_type in self.statsd.find_metrics(expected, 'ms'):
self.assert_between(250.0, float(value), 500.0)
assert_between(250.0, float(value), 500.0)
def test_that_cached_socket_is_used(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
@@ -98,4 +104,88 @@ class StatsdMethodTimingTests(testing.AsyncHTTPTestCase):
prefix = 'testing.one.two.three'
for path, value, stat_type in self.statsd.find_metrics(prefix, 'ms'):
self.assert_between(250.0, float(value), 300.0)
assert_between(250.0, float(value), 300.0)
class InfluxDbTests(testing.AsyncHTTPTestCase):
def get_app(self):
self.application = web.Application([
web.url(r'/', examples.influxdb.SimpleHandler),
web.url(r'/write', FakeInfluxHandler),
])
return self.application
def setUp(self):
self.application = None
super(InfluxDbTests, self).setUp()
self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = {
'measurement': 'my-service',
'write_url': self.get_url('/write'),
'database': 'requests',
}
logging.getLogger(FakeInfluxHandler.__module__).setLevel(logging.DEBUG)
@property
def influx_messages(self):
return FakeInfluxHandler.get_messages(self.application,
'requests', self)
def test_that_http_method_call_details_are_recorded(self):
start = int(time.time())
response = self.fetch('/')
self.assertEqual(response.code, 204)
for key, fields, timestamp in self.influx_messages:
if key.startswith('my-service,'):
tag_dict = dict(a.split('=') for a in key.split(',')[1:])
self.assertEqual(tag_dict['handler'],
'examples.influxdb.SimpleHandler')
self.assertEqual(tag_dict['method'], 'GET')
self.assertEqual(tag_dict['host'], socket.gethostname())
value_dict = dict(a.split('=') for a in fields.split(','))
assert_between(0.25, float(value_dict['duration']), 0.3)
self.assertEqual(value_dict['status_code'], '204')
nanos_since_epoch = int(timestamp)
then = nanos_since_epoch / 1000000000
assert_between(start, then, time.time())
break
else:
self.fail('Expected to find "request" metric in {!r}'.format(
list(self.application.influx_db['requests'])))
def test_that_execution_timer_is_tracked(self):
response = self.fetch('/')
self.assertEqual(response.code, 204)
for key, fields, timestamp in self.influx_messages:
if key.startswith('my-service,'):
value_dict = dict(a.split('=') for a in fields.split(','))
assert_between(0.25, float(value_dict['sleepytime']), 0.3)
break
else:
self.fail('Expected to find "request" metric in {!r}'.format(
list(self.application.influx_db['requests'])))
def test_that_counter_is_tracked(self):
response = self.fetch('/')
self.assertEqual(response.code, 204)
for key, fields, timestamp in self.influx_messages:
if key.startswith('my-service,'):
value_dict = dict(a.split('=') for a in fields.split(','))
self.assertEqual(int(value_dict['slept']), 42)
break
else:
self.fail('Expected to find "request" metric in {!r}'.format(
list(self.application.influx_db['requests'])))
def test_that_cached_db_connection_is_used(self):
cfg = self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY]
conn = mock.Mock()
cfg['db_connection'] = conn
response = self.fetch('/')
self.assertEqual(response.code, 204)
self.assertIs(cfg['db_connection'], conn)
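
For reference, FakeInfluxHandler.get_messages() hands back (key, fields, timestamp)
tuples split out of the line-protocol body that InfluxDBConnection.submit() posts,
i.e. <measurement>,<tag>=<value>,... <field>=<value>,... <timestamp in nanoseconds>.
A point captured from the handler in these tests would look roughly like the
following (tag order and numeric values are illustrative):

    my-service,host=web01,handler=examples.influxdb.SimpleHandler,method=GET sleepytime=0.2503,slept=42,status_code=204,duration=0.2547 1453412975000000000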