sprockets.mixins.metrics/sprockets/mixins/metrics/influxdb.py

179 lines
6.1 KiB
Python
Raw Normal View History

import contextlib
import socket
import time
from tornado import httpclient, ioloop
class InfluxDBConnection(object):
"""Connection to an InfluxDB instance.
:param str write_url: the URL to send HTTP requests to
:param str database: the database to write measurements into
:param tornado.ioloop.IOLoop: the IOLoop to spawn callbacks on.
If this parameter is :data:`None`, then the active IOLoop,
as determined by :meth:`tornado.ioloop.IOLoop.instance`,
is used.
:param int max_buffer_time: the maximum elasped time measurements
should remain in buffer before writing to InfluxDB.
:param int max_buffer_length: the maximum number of measurements to
buffer before writing to InfluxDB.
An instance of this class is stored in the application settings
and used to asynchronously send measurements to InfluxDB instance.
Each measurement is sent by spawning a context-free callback on
the IOloop.
"""
def __init__(self, write_url, database, io_loop=None,
max_buffer_time=5, max_buffer_length=100):
self.io_loop = ioloop.IOLoop.instance() if io_loop is None else io_loop
self.client = httpclient.AsyncHTTPClient()
self.write_url = '{}?db={}'.format(write_url, database)
self._buffer = []
self._max_buffer_time = float(max_buffer_time)
self._max_buffer_length = int(max_buffer_length)
self._last_write = self.io_loop.time()
2016-02-19 12:32:37 +00:00
def submit(self, measurement, tags, values):
"""Write the data using the HTTP API
2016-02-19 12:32:37 +00:00
:param str measurement: The required measurement name
:param list tags: The measurement tags
:param list values: The recorded measurements
"""
2016-02-19 12:32:37 +00:00
body = '{},{} {} {:d}'.format(measurement, ','.join(tags),
','.join(values),
int(time.time() * 1000000000))
self._buffer.append(body)
if self._should_write:
self._write()
2016-02-19 12:32:37 +00:00
def _write(self):
"""Write the measurement"""
body = '\n'.join(self._buffer)
request = httpclient.HTTPRequest(self.write_url, method='POST',
body=body.encode('utf-8'))
ioloop.IOLoop.current().spawn_callback(self.client.fetch, request)
self._last_write = self.io_loop.time()
del self._buffer[:]
2016-02-19 12:32:37 +00:00
@property
def _should_write(self):
"""Returns ``True`` if the buffered measurements should be sent"""
if len(self._buffer) >= self._max_buffer_length:
return True
if self.io_loop.time() >= (self._last_write + self._max_buffer_time):
return True
return False
class InfluxDBMixin(object):
"""
Mix this class in to record measurements to a InfluxDB server.
**Configuration**
:database:
InfluxDB database to write measurements to. This is passed
as the ``db`` query parameter when writing to Influx.
https://docs.influxdata.com/influxdb/v0.9/guides/writing_data/
:write_url:
The URL that the InfluxDB write endpoint is available on.
This is used as-is to write data into Influx.
"""
SETTINGS_KEY = 'sprockets.mixins.metrics.influxdb'
"""``self.settings`` key that configures this mix-in."""
def initialize(self):
super(InfluxDBMixin, self).initialize()
if self.SETTINGS_KEY in self.settings:
settings = self.settings[self.SETTINGS_KEY]
if 'db_connection' not in settings:
keys = ('max_buffer_time', 'max_buffer_length')
kwargs = {k: settings[k] for k in keys if k in settings}
settings['db_connection'] = InfluxDBConnection(
settings['write_url'], settings['database'], **kwargs)
self.__metrics = []
self.__tags = {
'host': socket.gethostname(),
'handler': '{}.{}'.format(self.__module__,
self.__class__.__name__),
'method': self.request.method,
}
def set_metric_tag(self, tag, value):
"""
Add a tag to the measurement key.
:param str tag: name of the tag to set
:param str value: value to assign
This will overwrite the current value assigned to a tag
if one exists.
"""
self.__tags[tag] = value
def record_timing(self, duration, *path):
"""
Record a timing.
:param float duration: timing to record in seconds
:param path: elements of the metric path to record
A timing is a named duration value.
"""
self.__metrics.append('{}={}'.format('.'.join(path), duration))
def increase_counter(self, *path, **kwargs):
"""
Increase a counter.
:param path: elements of the path to record
:keyword int amount: value to record. If omitted, the counter
value is one.
Counters are simply values that are summed in a query.
"""
self.__metrics.append('{}={}'.format('.'.join(path),
kwargs.get('amount', 1)))
@contextlib.contextmanager
def execution_timer(self, *path):
"""
Record the time it takes to run an arbitrary code block.
:param path: elements of the metric path to record
This method returns a context manager that records the amount
of time spent inside of the context and records a value
named `path` using (:meth:`record_timing`).
"""
start = time.time()
try:
yield
finally:
fini = max(time.time(), start)
self.record_timing(fini - start, *path)
def on_finish(self):
super(InfluxDBMixin, self).on_finish()
self.set_metric_tag('status_code', self._status_code)
self.record_timing(self.request.request_time(), 'duration')
self.settings[self.SETTINGS_KEY]['db_connection'].submit(
self.settings[self.SETTINGS_KEY]['measurement'],
('{}={}'.format(k, v) for k, v in self.__tags.items()),
self.__metrics,
)