sprockets.mixins.metrics/sprockets/mixins/metrics/influxdb.py
2016-03-10 15:45:50 -05:00

329 lines
12 KiB
Python

import contextlib
import logging
import os
import socket
import time
from tornado import concurrent, httpclient, ioloop
from sprockets.mixins.metrics import __version__
LOGGER = logging.getLogger(__name__)
SETTINGS_KEY = 'sprockets.mixins.metrics.influxdb'
"""``self.settings`` key that configures this mix-in."""
_USER_AGENT = 'sprockets.mixins.metrics/v{}'.format(__version__)
class InfluxDBMixin(object):
    """Mix this class in to record measurements to a InfluxDB server."""

    def __init__(self, application, request, **kwargs):
        super(InfluxDBMixin, self).__init__(application, request, **kwargs)
        # Buffered field values, flushed to InfluxDB in on_finish().
        self.__metrics = []
        handler_name = '{}.{}'.format(self.__module__,
                                      self.__class__.__name__)
        # Tags submitted alongside every measurement from this handler.
        self.__tags = {'handler': handler_name, 'method': request.method}

    def set_metric_tag(self, tag, value):
        """
        Add a tag to the measurement key.

        :param str tag: name of the tag to set
        :param str value: value to assign

        This will overwrite the current value assigned to a tag
        if one exists.
        """
        self.__tags[tag] = value

    def record_timing(self, duration, *path):
        """
        Record a timing.

        :param float duration: timing to record in seconds
        :param path: elements of the metric path to record

        A timing is a named duration value.
        """
        key = self.application.influxdb.escape_str('.'.join(path))
        self.__metrics.append('{}={}'.format(key, duration))

    def increase_counter(self, *path, **kwargs):
        """
        Increase a counter.

        :param path: elements of the path to record
        :keyword int amount: value to record.  If omitted, the counter
            value is one.

        Counters are simply values that are summed in a query.
        """
        amount = kwargs.get('amount', 1)
        key = self.application.influxdb.escape_str('.'.join(path))
        self.__metrics.append('{}={}'.format(key, amount))

    @contextlib.contextmanager
    def execution_timer(self, *path):
        """
        Record the time it takes to run an arbitrary code block.

        :param path: elements of the metric path to record

        This method returns a context manager that records the amount
        of time spent inside of the context and records a value
        named `path` using (:meth:`record_timing`).
        """
        started_at = time.time()
        try:
            yield
        finally:
            # max() guards against the wall clock stepping backwards,
            # which would otherwise record a negative duration.
            elapsed = max(time.time(), started_at) - started_at
            self.record_timing(elapsed, *path)

    def on_finish(self):
        super(InfluxDBMixin, self).on_finish()
        self.set_metric_tag('status_code', self._status_code)
        self.record_timing(self.request.request_time(), 'duration')
        self.application.influxdb.submit(
            self.settings[SETTINGS_KEY]['measurement'],
            self.__tags,
            self.__metrics)
class InfluxDBCollector(object):
    """Collects and submits stats to InfluxDB on a periodic callback.

    :param str url: The InfluxDB API URL
    :param str database: the database to write measurements into
    :param tornado.ioloop.IOLoop io_loop: the IOLoop to spawn callbacks on.
        If this parameter is :data:`None`, then the active IOLoop,
        as determined by :meth:`tornado.ioloop.IOLoop.current`,
        is used.
    :param int submission_interval: How often to submit metric batches in
        milliseconds. Default: ``5000``
    :param max_batch_size: The number of measurements to be submitted in a
        single HTTP request. Default: ``1000``
    :param dict tags: Default tags that are to be submitted with each metric.

    This class should be constructed using the
    :meth:`~sprockets.mixins.influxdb.install` method. When installed, it is
    attached to the :class:`~tornado.web.Application` instance for your web
    application and schedules a periodic callback to submit metrics to InfluxDB
    in batches.

    """
    SUBMISSION_INTERVAL = 5000
    MAX_BATCH_SIZE = 1000
    WARN_THRESHOLD = 25000

    def __init__(self, url='http://localhost:8086', database='sprockets',
                 io_loop=None, submission_interval=SUBMISSION_INTERVAL,
                 max_batch_size=MAX_BATCH_SIZE, tags=None):
        self._buffer = list()
        # Pass ``defaults`` to the constructor: AsyncHTTPClient.configure()
        # is a class-level configuration hook and does not affect an already
        # constructed ``force_instance`` client, so configuring afterwards
        # was a no-op.
        self._client = httpclient.AsyncHTTPClient(
            force_instance=True, defaults={'user_agent': _USER_AGENT})
        self._database = database
        self._influxdb_url = '{}?db={}'.format(url, database)
        self._interval = submission_interval or self.SUBMISSION_INTERVAL
        self._io_loop = io_loop or ioloop.IOLoop.current()
        self._max_batch_size = max_batch_size or self.MAX_BATCH_SIZE
        self._pending = 0  # count of in-flight HTTP submissions
        self._tags = tags or {}

        # Add the periodic callback for submitting metrics
        LOGGER.info('Starting PeriodicCallback for writing InfluxDB metrics')
        self._callback = ioloop.PeriodicCallback(self._write_metrics,
                                                 self._interval)
        self._callback.start()

    @staticmethod
    def escape_str(value):
        r"""Escape the value with InfluxDB's wonderful escaping logic:

        "Measurement names, tag keys, and tag values must escape any spaces or
        commas using a backslash (``\``). For example: ``\ `` and ``\,``. All
        tag values are stored as strings and should not be surrounded in
        quotes."

        :param str value: The value to be escaped
        :rtype: str

        """
        # Use doubled backslashes: a bare '\ ' or '\,' is an invalid escape
        # sequence in a non-raw string literal.
        return str(value).replace(' ', '\\ ').replace(',', '\\,')

    @property
    def database(self):
        """Return the configured database name.

        :rtype: str

        """
        return self._database

    def shutdown(self):
        """Invoke on shutdown of your application to stop the periodic
        callbacks and flush any remaining metrics.

        Returns a future that is complete when all pending metrics have been
        submitted.

        :rtype: :class:`~tornado.concurrent.TracebackFuture`

        """
        future = concurrent.TracebackFuture()
        self._callback.stop()
        self._write_metrics()
        self._shutdown_wait(future)
        return future

    def submit(self, measurement, tags, values):
        """Add a measurement to the buffer that will be submitted to InfluxDB
        on the next periodic callback for writing metrics.

        :param str measurement: The measurement name
        :param dict tags: The measurement tags
        :param list values: The recorded measurements

        """
        # InfluxDB line protocol: "measurement,tags values timestamp"
        # with the timestamp in nanoseconds since the epoch.
        self._buffer.append('{},{} {} {:d}'.format(
            self.escape_str(measurement),
            self._get_tag_string(tags),
            ','.join(values),
            int(time.time() * 1000000000)))
        if len(self._buffer) > self.WARN_THRESHOLD:
            LOGGER.warning('InfluxDB metric buffer is > %i (%i)',
                           self.WARN_THRESHOLD, len(self._buffer))

    def _get_tag_string(self, tags):
        """Return the tags to be submitted with a measurement combining the
        default tags that were passed in when constructing the class along
        with any measurement specific tags passed into the
        :meth:`~InfluxDBCollector.submit` method. Tags will be properly
        escaped and formatted for submission.

        :param dict tags: Measurement specific tags
        :rtype: str

        """
        # Measurement-specific tags override the collector defaults.
        values = dict(self._tags)
        values.update(tags)
        return ','.join(['{}={}'.format(self.escape_str(k), self.escape_str(v))
                         for k, v in values.items()])

    def _on_write_response(self, response):
        """This is invoked by the Tornado IOLoop when the HTTP request to
        InfluxDB has returned with a result.

        :param response: The response from InfluxDB
        :type response: :class:`~tornado.httpclient.HTTPResponse`

        """
        self._pending -= 1
        LOGGER.debug('InfluxDB batch response: %s', response.code)
        if response.error:
            LOGGER.error('InfluxDB batch submission error: %s', response.error)

    def _shutdown_wait(self, future):
        """Pause briefly allowing any pending metric writes to complete before
        shutting down.

        :param tornado.concurrent.TracebackFuture future: The future to
            resolve when the shutdown is complete.

        """
        if not self._pending:
            future.set_result(True)
            return
        LOGGER.debug('Waiting for pending metric writes')
        # IOLoop.add_timeout forwards extra positional args to the callback,
        # so pass the future itself -- wrapping it in a tuple would hand the
        # callback a tuple instead of the future.
        self._io_loop.add_timeout(self._io_loop.time() + 0.1,
                                  self._shutdown_wait,
                                  future)

    def _write_metrics(self):
        """Submit the metrics in the buffer to InfluxDB. This is invoked
        by the periodic callback scheduled when the class is created.

        It will submit batches until the buffer is empty.

        """
        if not self._buffer:
            return
        LOGGER.debug('InfluxDB buffer has %i items', len(self._buffer))
        while self._buffer:
            body = '\n'.join(self._buffer[:self._max_batch_size])
            self._buffer = self._buffer[self._max_batch_size:]
            self._pending += 1
            self._client.fetch(self._influxdb_url, method='POST',
                               body=body.encode('utf-8'),
                               raise_error=False,
                               callback=self._on_write_response)
        LOGGER.debug('Submitted all InfluxDB metrics for writing')
def install(application, **kwargs):
    """Call this to install the InfluxDB collector into a Tornado application.

    :param tornado.web.Application application: the application to
        install the collector into.
    :param kwargs: keyword parameters to pass to the
        :class:`InfluxDBCollector` initializer.
    :returns: :data:`True` if the client was installed by this call
        and :data:`False` otherwise.

    Optional configuration values:

    - **url** The InfluxDB API URL. If URL is not specified, the
      ``INFLUX_HOST`` and ``INFLUX_PORT`` environment variables will be used
      to construct the URL to pass into the :class:`InfluxDBCollector`,
      falling back to ``localhost`` and ``8086`` when they are not set.
    - **database** the database to write measurements into.
      The default is ``sprockets``.
    - **io_loop** A :class:`~tornado.ioloop.IOLoop` to use
    - **submission_interval** How often to submit metric batches in
      milliseconds. Default: ``5000``
    - **max_batch_size** The number of measurements to be submitted in a
      single HTTP request. Default: ``1000``
    - **tags** Default tags that are to be submitted with each metric.

    """
    if getattr(application, 'influxdb', None) is not None:
        LOGGER.warning('InfluxDBCollector is already installed')
        return False

    # Get config values.  Provide sane fallbacks so that unset environment
    # variables do not produce a bogus 'http://None:None/write' URL.
    url = 'http://{}:{}/write'.format(
        os.environ.get('INFLUX_HOST', 'localhost'),
        os.environ.get('INFLUX_PORT', 8086))
    kwargs.setdefault('url', url)

    # Build the full tag dict and replace what was passed in
    tags = {'hostname': socket.gethostname()}
    if os.environ.get('ENVIRONMENT'):
        tags['environment'] = os.environ.get('ENVIRONMENT')
    if os.environ.get('SERVICE'):
        tags['service'] = os.environ.get('SERVICE')
    tags.update(kwargs.get('tags', {}))
    kwargs['tags'] = tags

    # Create and start the collector
    setattr(application, 'influxdb', InfluxDBCollector(**kwargs))
    return True
def shutdown(application):
    """Invoke to shutdown the InfluxDB collector, writing any pending
    measurements to InfluxDB before stopping.

    :param tornado.web.Application application: the application the
        collector was installed into.
    :rtype: tornado.concurrent.TracebackFuture or None

    """
    collector = getattr(application, 'influxdb', None)
    # Nothing to do when no collector was ever installed.
    return collector.shutdown() if collector else None