mirror of
https://github.com/sprockets/sprockets.mixins.metrics.git
synced 2024-11-22 03:00:25 +00:00
Merge pull request #15 from sprockets/refactor
Implement collector/periodic callback
This commit is contained in:
commit
29ab41a1ee
8 changed files with 395 additions and 294 deletions
146
README.rst
146
README.rst
|
@ -1,14 +1,40 @@
|
|||
sprockets.mixins.metrics
|
||||
========================
|
||||
Adjust counter and timer metrics in InfluxDB or Graphite using the same API.
|
||||
Adjust counter and timer metrics in `InfluxDB`_ or `StatsD`_ using the same API.
|
||||
|
||||
The mix-in is configured through the ``tornado.web.Application`` settings
|
||||
property using a key defined by the specific mix-in.
|
||||
|
||||
Statsd Mixin
|
||||
------------
|
||||
|
||||
The following snippet configures the StatsD mix-in from common environment
|
||||
variables. This simple handler will emit a timer metric that identifies each
|
||||
call to the ``get`` method as well as a separate metric for the database query.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from sprockets.mixins import mediatype, metrics
|
||||
import os
|
||||
|
||||
from sprockets.mixins import mediatype
|
||||
from sprockets.mixins.metrics import statsd
|
||||
from tornado import gen, web
|
||||
import queries
|
||||
|
||||
class MyHandler(metrics.StatsdMixin, mediatype.ContentMixin,
|
||||
def make_application():
|
||||
settings = {
|
||||
statsd.SETTINGS_KEY: {
|
||||
'namespace': 'my-application',
|
||||
'host': os.environ.get('STATSD_HOST', '127.0.0.1'),
|
||||
'port': os.environ.get('STATSD_PORT', '8125'),
|
||||
}
|
||||
}
|
||||
return web.Application([
|
||||
# insert handlers here
|
||||
], **settings)
|
||||
|
||||
class MyHandler(statsd.StatsdMixin,
|
||||
mediatype.ContentMixin,
|
||||
web.RequestHandler):
|
||||
|
||||
def initialize(self):
|
||||
|
@ -22,38 +48,8 @@ Adjust counter and timer metrics in InfluxDB or Graphite using the same API.
|
|||
obj_id)
|
||||
self.send_response(result)
|
||||
|
||||
This simple handler will emit a timer metric that identifies each call to the
|
||||
``get`` method as well as a separate metric for the database query. Switching
|
||||
from using `statsd`_ to `InfluxDB`_ is simply a matter of switch from the
|
||||
``metrics.StatsdMixin`` to the ``metrics.InfluxDBMixin``.
|
||||
|
||||
The mix-in is configured through the ``tornado.web.Application`` settings
|
||||
property using a key defined by the specific mix-in.
|
||||
|
||||
Statsd Mixin
|
||||
------------
|
||||
|
||||
The following snippet configures the StatsD mix-in from common environment
|
||||
variables:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import os
|
||||
|
||||
from sprockets.mixins import metrics
|
||||
from tornado import web
|
||||
|
||||
def make_application():
|
||||
settings = {
|
||||
metrics.StatsdMixin.SETTINGS_KEY: {
|
||||
'namespace': 'my-application',
|
||||
'host': os.environ.get('STATSD_HOST', '127.0.0.1'),
|
||||
'port': os.environ.get('STATSD_PORT', '8125'),
|
||||
}
|
||||
}
|
||||
return web.Application([
|
||||
# insert handlers here
|
||||
], **settings)
|
||||
Settings
|
||||
^^^^^^^^
|
||||
|
||||
:namespace: The namespace for the measurements
|
||||
:host: The Statsd host
|
||||
|
@ -69,32 +65,55 @@ variables:
|
|||
|
||||
import os
|
||||
|
||||
from sprockets.mixins import metrics
|
||||
from tornado import web
|
||||
from sprockets.mixins.metrics import influxdb
|
||||
from sprockets.mixins import postgresql
|
||||
from tornado import gen, web
|
||||
|
||||
def make_application():
|
||||
settings = {
|
||||
metrics.InfluxDBMixin.SETTINGS_KEY: {
|
||||
'measurement': 'my-application',
|
||||
'database': 'services',
|
||||
'write_url': 'http://{}:{}/write'.format(
|
||||
os.environ.get('INFLUX_HOST', '127.0.0.1'),
|
||||
os.environ.get('INFLUX_PORT', 8086)),
|
||||
'max_buffer_time': 3,
|
||||
'max_buffer_length': 100
|
||||
}
|
||||
def make_app(**settings):
|
||||
settings[influxdb.SETTINGS_KEY] = {
|
||||
'measurement': 'rollup',
|
||||
}
|
||||
return web.Application([
|
||||
# insert handlers here
|
||||
], **settings)
|
||||
|
||||
:measurement: The InfluxDB measurement name
|
||||
:database: The InfluxDB database to write measurements into
|
||||
:write_url: the InfluxDB write URL to send HTTP requests to
|
||||
:max_buffer_time: The maximum elasped time measurements should remain in
|
||||
buffer before writing to InfluxDB.
|
||||
:max_buffer_length: The maximum number of measurements to
|
||||
buffer before writing to InfluxDB.
|
||||
application = web.Application(
|
||||
[
|
||||
web.url(r'/', RequestHandler),
|
||||
], **settings)
|
||||
|
||||
influxdb.install({'url': 'http://localhost:8086',
|
||||
'database': 'tornado-app'})
|
||||
return application
|
||||
|
||||
|
||||
class MyHandler(influxdb.InfluxDBMixin,
|
||||
postgresql.HandlerMixin,
|
||||
web.RequestHandler):
|
||||
|
||||
@gen.coroutine
|
||||
def get(self, obj_id):
|
||||
with self.execution_timer('dbquery', 'get'):
|
||||
result = yield self.postgresql_session.query(
|
||||
'SELECT * FROM foo WHERE id=%s', obj_id)
|
||||
self.send_response(result)
|
||||
|
||||
If your application handles signal handling for shutdowns, the
|
||||
:meth:`~sprockets.mixins.influxdb.shutdown` method will try to cleanly ensure
|
||||
that any buffered metrics in the InfluxDB collector are written prior to
|
||||
shutting down. The method returns a :cls:`~tornado.concurrent.TracebackFuture`
|
||||
that should be waited on prior to shutting down.
|
||||
|
||||
Settings
|
||||
^^^^^^^^
|
||||
|
||||
:url: The InfluxDB API URL
|
||||
:database: the database to write measurements into
|
||||
:submission_interval: How often to submit metric batches in
|
||||
milliseconds. Default: ``5000``
|
||||
:max_batch_size: The number of measurements to be submitted in a
|
||||
single HTTP request. Default: ``1000``
|
||||
:tags: Default tags that are to be submitted with each metric. The tag
|
||||
``hostname`` is added by default along with ``environment`` and ``service``
|
||||
if the corresponding ``ENVIRONMENT`` or ``SERVICE`` environment variables
|
||||
are set.
|
||||
|
||||
Development Quickstart
|
||||
----------------------
|
||||
|
@ -104,6 +123,13 @@ Development Quickstart
|
|||
$ . ./env/bin/activate
|
||||
(env)$ env/bin/pip install -r requires/development.txt
|
||||
(env)$ nosetests
|
||||
test_metrics_with_buffer_not_flush (tests.InfluxDbTests) ... ok
|
||||
test_that_cached_db_connection_is_used (tests.InfluxDbTests) ... ok
|
||||
test_that_counter_is_tracked (tests.InfluxDbTests) ... ok
|
||||
test_that_execution_timer_is_tracked (tests.InfluxDbTests) ... ok
|
||||
test_that_http_method_call_details_are_recorded (tests.InfluxDbTests) ... ok
|
||||
test_that_metric_tag_is_tracked (tests.InfluxDbTests) ... ok
|
||||
test_that_add_metric_tag_is_ignored (tests.StatsdMethodTimingTests) ... ok
|
||||
test_that_cached_socket_is_used (tests.StatsdMethodTimingTests) ... ok
|
||||
test_that_counter_accepts_increment_value (tests.StatsdMethodTimingTests) ... ok
|
||||
test_that_counter_increment_defaults_to_one (tests.StatsdMethodTimingTests) ... ok
|
||||
|
@ -112,12 +138,12 @@ Development Quickstart
|
|||
test_that_http_method_call_is_recorded (tests.StatsdMethodTimingTests) ... ok
|
||||
|
||||
----------------------------------------------------------------------
|
||||
Ran 6 tests in 1.089s
|
||||
Ran 13 tests in 3.572s
|
||||
|
||||
OK
|
||||
(env)$ ./setup.py build_sphinx -q
|
||||
running build_sphinx
|
||||
(env)$ open build/sphinx/html/index.html
|
||||
|
||||
.. _statsd: https://github.com/etsy/statsd
|
||||
.. _StatsD: https://github.com/etsy/statsd
|
||||
.. _InfluxDB: https://influxdata.com
|
||||
|
|
|
@ -1,11 +1,10 @@
|
|||
import os
|
||||
import signal
|
||||
|
||||
from sprockets.mixins import metrics
|
||||
from sprockets.mixins.metrics import influxdb
|
||||
from tornado import concurrent, gen, ioloop, web
|
||||
|
||||
|
||||
class SimpleHandler(metrics.InfluxDBMixin, web.RequestHandler):
|
||||
class SimpleHandler(influxdb.InfluxDBMixin, web.RequestHandler):
|
||||
"""
|
||||
Simply emits a few metrics around the GET method.
|
||||
|
||||
|
@ -65,17 +64,14 @@ def make_application():
|
|||
by the ``service`` setting.
|
||||
|
||||
"""
|
||||
influx_url = 'http://{}:{}/write'.format(
|
||||
os.environ.get('INFLUX_HOST', '127.0.0.1'),
|
||||
os.environ.get('INFLUX_PORT', 8086))
|
||||
settings = {
|
||||
metrics.InfluxDBMixin.SETTINGS_KEY: {
|
||||
'measurement': 'cli',
|
||||
'database': 'testing',
|
||||
'write_url': influx_url,
|
||||
influxdb.SETTINGS_KEY: {
|
||||
'measurement': 'example',
|
||||
}
|
||||
}
|
||||
return web.Application([web.url('/', SimpleHandler)], **settings)
|
||||
application = web.Application([web.url('/', SimpleHandler)], **settings)
|
||||
influxdb.install(application, **{'database': 'testing'})
|
||||
return application
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import signal
|
||||
|
||||
from sprockets.mixins import metrics
|
||||
from sprockets.mixins.metrics import statsd
|
||||
from tornado import concurrent, gen, ioloop, web
|
||||
|
||||
|
||||
class SimpleHandler(metrics.StatsdMixin, web.RequestHandler):
|
||||
class SimpleHandler(statsd.StatsdMixin, web.RequestHandler):
|
||||
"""
|
||||
Simply emits a timing metric around the method call.
|
||||
|
||||
|
@ -53,7 +53,7 @@ def make_application():
|
|||
|
||||
"""
|
||||
settings = {
|
||||
metrics.StatsdMixin.SETTINGS_KEY: {
|
||||
statsd.SETTINGS_KEY: {
|
||||
'namespace': 'webapps',
|
||||
'host': '127.0.0.1',
|
||||
'port': 8125,
|
||||
|
|
|
@ -1,13 +1,3 @@
|
|||
try:
|
||||
from .influxdb import InfluxDBMixin
|
||||
from .statsd import StatsdMixin
|
||||
except ImportError as error:
|
||||
def InfluxDBMixin(*args, **kwargs):
|
||||
raise error
|
||||
|
||||
def StatsdMixin(*args, **kwargs):
|
||||
raise error
|
||||
|
||||
version_info = (1, 1, 1)
|
||||
version_info = (2, 0, 0)
|
||||
__version__ = '.'.join(str(v) for v in version_info)
|
||||
__all__ = ['__version__', 'version_info', 'InfluxDBMixin', 'StatsdMixin']
|
||||
__all__ = ['__version__', 'version_info']
|
||||
|
|
|
@ -1,119 +1,31 @@
|
|||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
|
||||
from tornado import httpclient, ioloop
|
||||
from tornado import concurrent, httpclient, ioloop
|
||||
|
||||
from sprockets.mixins.metrics import __version__
|
||||
|
||||
class InfluxDBConnection(object):
|
||||
"""Connection to an InfluxDB instance.
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
:param str write_url: the URL to send HTTP requests to
|
||||
:param str database: the database to write measurements into
|
||||
:param tornado.ioloop.IOLoop: the IOLoop to spawn callbacks on.
|
||||
If this parameter is :data:`None`, then the active IOLoop,
|
||||
as determined by :meth:`tornado.ioloop.IOLoop.instance`,
|
||||
is used.
|
||||
:param int max_buffer_time: the maximum elasped time measurements
|
||||
should remain in buffer before writing to InfluxDB.
|
||||
:param int max_buffer_length: the maximum number of measurements to
|
||||
buffer before writing to InfluxDB.
|
||||
SETTINGS_KEY = 'sprockets.mixins.metrics.influxdb'
|
||||
"""``self.settings`` key that configures this mix-in."""
|
||||
|
||||
An instance of this class is stored in the application settings
|
||||
and used to asynchronously send measurements to InfluxDB instance.
|
||||
Each measurement is sent by spawning a context-free callback on
|
||||
the IOloop.
|
||||
|
||||
"""
|
||||
|
||||
MAX_BUFFER_TIME = 5
|
||||
MAX_BUFFER_LENGTH = 100
|
||||
|
||||
def __init__(self, write_url, database, io_loop=None,
|
||||
max_buffer_time=None, max_buffer_length=None):
|
||||
self.io_loop = ioloop.IOLoop.instance() if io_loop is None else io_loop
|
||||
self.client = httpclient.AsyncHTTPClient()
|
||||
self.write_url = '{}?db={}'.format(write_url, database)
|
||||
|
||||
self._buffer = []
|
||||
if max_buffer_time is None:
|
||||
max_buffer_time = self.MAX_BUFFER_TIME
|
||||
if max_buffer_length is None:
|
||||
max_buffer_length = self.MAX_BUFFER_LENGTH
|
||||
self._max_buffer_time = float(max_buffer_time)
|
||||
self._max_buffer_length = int(max_buffer_length)
|
||||
self._last_write = self.io_loop.time()
|
||||
|
||||
def submit(self, measurement, tags, values):
|
||||
"""Write the data using the HTTP API
|
||||
|
||||
:param str measurement: The required measurement name
|
||||
:param list tags: The measurement tags
|
||||
:param list values: The recorded measurements
|
||||
"""
|
||||
body = '{},{} {} {:d}'.format(measurement, ','.join(tags),
|
||||
','.join(values),
|
||||
int(time.time() * 1000000000))
|
||||
self._buffer.append(body)
|
||||
if self._should_write:
|
||||
self._write()
|
||||
|
||||
def _write(self):
|
||||
"""Write the measurement"""
|
||||
body = '\n'.join(self._buffer)
|
||||
request = httpclient.HTTPRequest(self.write_url, method='POST',
|
||||
body=body.encode('utf-8'))
|
||||
ioloop.IOLoop.current().spawn_callback(self.client.fetch, request)
|
||||
self._last_write = self.io_loop.time()
|
||||
del self._buffer[:]
|
||||
|
||||
@property
|
||||
def _should_write(self):
|
||||
"""Returns ``True`` if the buffered measurements should be sent"""
|
||||
if len(self._buffer) >= self._max_buffer_length:
|
||||
return True
|
||||
if self.io_loop.time() >= (self._last_write + self._max_buffer_time):
|
||||
return True
|
||||
return False
|
||||
_USER_AGENT = 'sprockets.mixins.metrics/v{}'.format(__version__)
|
||||
|
||||
|
||||
class InfluxDBMixin(object):
|
||||
"""
|
||||
Mix this class in to record measurements to a InfluxDB server.
|
||||
|
||||
**Configuration**
|
||||
|
||||
:database:
|
||||
InfluxDB database to write measurements to. This is passed
|
||||
as the ``db`` query parameter when writing to Influx.
|
||||
|
||||
https://docs.influxdata.com/influxdb/v0.9/guides/writing_data/
|
||||
|
||||
:write_url:
|
||||
The URL that the InfluxDB write endpoint is available on.
|
||||
This is used as-is to write data into Influx.
|
||||
|
||||
"""
|
||||
|
||||
SETTINGS_KEY = 'sprockets.mixins.metrics.influxdb'
|
||||
"""``self.settings`` key that configures this mix-in."""
|
||||
|
||||
def initialize(self):
|
||||
super(InfluxDBMixin, self).initialize()
|
||||
if self.SETTINGS_KEY in self.settings:
|
||||
settings = self.settings[self.SETTINGS_KEY]
|
||||
if 'db_connection' not in settings:
|
||||
settings['db_connection'] = InfluxDBConnection(
|
||||
settings['write_url'], settings['database'],
|
||||
max_buffer_time=settings.get('max_buffer_time'),
|
||||
max_buffer_length=settings.get('max_buffer_length'))
|
||||
"""Mix this class in to record measurements to a InfluxDB server."""
|
||||
|
||||
def __init__(self, application, request, **kwargs):
|
||||
super(InfluxDBMixin, self).__init__(application, request, **kwargs)
|
||||
self.__metrics = []
|
||||
self.__tags = {
|
||||
'host': socket.gethostname(),
|
||||
'handler': '{}.{}'.format(self.__module__,
|
||||
self.__class__.__name__),
|
||||
'method': self.request.method,
|
||||
'method': request.method,
|
||||
}
|
||||
|
||||
def set_metric_tag(self, tag, value):
|
||||
|
@ -139,7 +51,8 @@ class InfluxDBMixin(object):
|
|||
A timing is a named duration value.
|
||||
|
||||
"""
|
||||
self.__metrics.append('{}={}'.format('.'.join(path), duration))
|
||||
self.__metrics.append('{}={}'.format(
|
||||
self.application.influxdb.escape_str('.'.join(path)), duration))
|
||||
|
||||
def increase_counter(self, *path, **kwargs):
|
||||
"""
|
||||
|
@ -152,8 +65,9 @@ class InfluxDBMixin(object):
|
|||
Counters are simply values that are summed in a query.
|
||||
|
||||
"""
|
||||
self.__metrics.append('{}={}'.format('.'.join(path),
|
||||
kwargs.get('amount', 1)))
|
||||
self.__metrics.append('{}={}'.format(
|
||||
self.application.influxdb.escape_str('.'.join(path)),
|
||||
kwargs.get('amount', 1)))
|
||||
|
||||
@contextlib.contextmanager
|
||||
def execution_timer(self, *path):
|
||||
|
@ -171,15 +85,247 @@ class InfluxDBMixin(object):
|
|||
try:
|
||||
yield
|
||||
finally:
|
||||
fini = max(time.time(), start)
|
||||
self.record_timing(fini - start, *path)
|
||||
self.record_timing(max(time.time(), start) - start, *path)
|
||||
|
||||
def on_finish(self):
|
||||
super(InfluxDBMixin, self).on_finish()
|
||||
self.set_metric_tag('status_code', self._status_code)
|
||||
self.record_timing(self.request.request_time(), 'duration')
|
||||
self.settings[self.SETTINGS_KEY]['db_connection'].submit(
|
||||
self.settings[self.SETTINGS_KEY]['measurement'],
|
||||
('{}={}'.format(k, v) for k, v in self.__tags.items()),
|
||||
self.__metrics,
|
||||
)
|
||||
self.application.influxdb.submit(
|
||||
self.settings[SETTINGS_KEY]['measurement'],
|
||||
self.__tags,
|
||||
self.__metrics)
|
||||
|
||||
|
||||
class InfluxDBCollector(object):
|
||||
"""Collects and submits stats to InfluxDB on a periodic callback.
|
||||
|
||||
:param str url: The InfluxDB API URL
|
||||
:param str database: the database to write measurements into
|
||||
:param tornado.ioloop.IOLoop: the IOLoop to spawn callbacks on.
|
||||
If this parameter is :data:`None`, then the active IOLoop,
|
||||
as determined by :meth:`tornado.ioloop.IOLoop.instance`,
|
||||
is used.
|
||||
:param int submission_interval: How often to submit metric batches in
|
||||
milliseconds. Default: ``5000``
|
||||
:param max_batch_size: The number of measurements to be submitted in a
|
||||
single HTTP request. Default: ``1000``
|
||||
:param dict tags: Default tags that are to be submitted with each metric.
|
||||
|
||||
This class should be constructed using the
|
||||
:meth:`~sprockets.mixins.influxdb.install` method. When installed, it is
|
||||
attached to the :class:`~tornado.web.Application` instance for your web
|
||||
application and schedules a periodic callback to submit metrics to InfluxDB
|
||||
in batches.
|
||||
|
||||
"""
|
||||
SUBMISSION_INTERVAL = 5000
|
||||
MAX_BATCH_SIZE = 1000
|
||||
WARN_THRESHOLD = 25000
|
||||
|
||||
def __init__(self, url='http://localhost:8086', database='sprockets',
|
||||
io_loop=None, submission_interval=SUBMISSION_INTERVAL,
|
||||
max_batch_size=MAX_BATCH_SIZE, tags=None):
|
||||
self._buffer = list()
|
||||
self._database = database
|
||||
self._influxdb_url = '{}?db={}'.format(url, database)
|
||||
self._interval = submission_interval or self.SUBMISSION_INTERVAL
|
||||
self._io_loop = io_loop or ioloop.IOLoop.current()
|
||||
self._max_batch_size = max_batch_size or self.MAX_BATCH_SIZE
|
||||
self._pending = 0
|
||||
self._tags = tags or {}
|
||||
|
||||
self._client = httpclient.AsyncHTTPClient(force_instance=True,
|
||||
io_loop=self._io_loop)
|
||||
self._client.configure(None, defaults={'user_agent': _USER_AGENT})
|
||||
|
||||
# Add the periodic callback for submitting metrics
|
||||
LOGGER.info('Starting PeriodicCallback for writing InfluxDB metrics')
|
||||
self._callback = ioloop.PeriodicCallback(self._write_metrics,
|
||||
self._interval)
|
||||
self._callback.start()
|
||||
|
||||
@staticmethod
|
||||
def escape_str(value):
|
||||
"""Escape the value with InfluxDB's wonderful escaping logic:
|
||||
|
||||
"Measurement names, tag keys, and tag values must escape any spaces or
|
||||
commas using a backslash (\). For example: \ and \,. All tag values are
|
||||
stored as strings and should not be surrounded in quotes."
|
||||
|
||||
:param str value: The value to be escaped
|
||||
:rtype: str
|
||||
|
||||
"""
|
||||
return str(value).replace(' ', '\ ').replace(',', '\,')
|
||||
|
||||
@property
|
||||
def database(self):
|
||||
"""Return the configured database name.
|
||||
|
||||
:rtype: str
|
||||
|
||||
"""
|
||||
return self._database
|
||||
|
||||
def shutdown(self):
|
||||
"""Invoke on shutdown of your application to stop the periodic
|
||||
callbacks and flush any remaining metrics.
|
||||
|
||||
Returns a future that is complete when all pending metrics have been
|
||||
submitted.
|
||||
|
||||
:rtype: :class:`~tornado.concurrent.TracebackFuture()`
|
||||
|
||||
"""
|
||||
future = concurrent.TracebackFuture()
|
||||
self._callback.stop()
|
||||
self._write_metrics()
|
||||
self._shutdown_wait(future)
|
||||
return future
|
||||
|
||||
def submit(self, measurement, tags, values):
|
||||
"""Add a measurement to the buffer that will be submitted to InfluxDB
|
||||
on the next periodic callback for writing metrics.
|
||||
|
||||
:param str measurement: The measurement name
|
||||
:param dict tags: The measurement tags
|
||||
:param list values: The recorded measurements
|
||||
|
||||
"""
|
||||
self._buffer.append('{},{} {} {:d}'.format(
|
||||
self.escape_str(measurement),
|
||||
self._get_tag_string(tags),
|
||||
','.join(values),
|
||||
int(time.time() * 1000000000)))
|
||||
if len(self._buffer) > self.WARN_THRESHOLD:
|
||||
LOGGER.warning('InfluxDB metric buffer is > %i (%i)',
|
||||
self.WARN_THRESHOLD, len(self._buffer))
|
||||
|
||||
def _get_tag_string(self, tags):
|
||||
"""Return the tags to be submitted with a measurement combining the
|
||||
default tags that were passed in when constructing the class along
|
||||
with any measurement specific tags passed into the
|
||||
:meth:`~InfluxDBConnection.submit` method. Tags will be properly
|
||||
escaped and formatted for submission.
|
||||
|
||||
:param dict tags: Measurement specific tags
|
||||
:rtype: str
|
||||
|
||||
"""
|
||||
values = dict(self._tags)
|
||||
values.update(tags)
|
||||
return ','.join(['{}={}'.format(self.escape_str(k), self.escape_str(v))
|
||||
for k, v in values.items()])
|
||||
|
||||
def _on_write_response(self, response):
|
||||
"""This is invoked by the Tornado IOLoop when the HTTP request to
|
||||
InfluxDB has returned with a result.
|
||||
|
||||
:param response: The response from InfluxDB
|
||||
:type response: :class:`~tornado.httpclient.HTTPResponse`
|
||||
|
||||
"""
|
||||
self._pending -= 1
|
||||
LOGGER.debug('InfluxDB batch response: %s', response.code)
|
||||
if response.error:
|
||||
LOGGER.error('InfluxDB batch submission error: %s', response.error)
|
||||
|
||||
def _shutdown_wait(self, future):
|
||||
"""Pause briefly allowing any pending metric writes to complete before
|
||||
shutting down.
|
||||
|
||||
:param future tornado.concurrent.TracebackFuture: The future to resulve
|
||||
when the shutdown is complete.
|
||||
|
||||
"""
|
||||
if not self._pending:
|
||||
future.set_result(True)
|
||||
return
|
||||
LOGGER.debug('Waiting for pending metric writes')
|
||||
self._io_loop.add_timeout(self._io_loop.time() + 0.1,
|
||||
self._shutdown_wait,
|
||||
(future,))
|
||||
|
||||
def _write_metrics(self):
|
||||
"""Submit the metrics in the buffer to InfluxDB. This is invoked
|
||||
by the periodic callback scheduled when the class is created.
|
||||
|
||||
It will submit batches until the buffer is empty.
|
||||
|
||||
"""
|
||||
if not self._buffer:
|
||||
return
|
||||
LOGGER.debug('InfluxDB buffer has %i items', len(self._buffer))
|
||||
while self._buffer:
|
||||
body = '\n'.join(self._buffer[:self._max_batch_size])
|
||||
self._buffer = self._buffer[self._max_batch_size:]
|
||||
self._pending += 1
|
||||
self._client.fetch(self._influxdb_url, method='POST',
|
||||
body=body.encode('utf-8'),
|
||||
raise_error=False,
|
||||
callback=self._on_write_response)
|
||||
LOGGER.debug('Submitted all InfluxDB metrics for writing')
|
||||
|
||||
|
||||
def install(application, **kwargs):
|
||||
"""Call this to install the InfluxDB collector into a Tornado application.
|
||||
|
||||
:param tornado.web.Application application: the application to
|
||||
install the collector into.
|
||||
:param kwargs: keyword parameters to pass to the
|
||||
:class:`InfluxDBCollector` initializer.
|
||||
:returns: :data:`True` if the client was installed by this call
|
||||
and :data:`False` otherwise.
|
||||
|
||||
|
||||
Optional configuration values:
|
||||
|
||||
- **url** The InfluxDB API URL. If URL is not specified, the
|
||||
``INFLUX_HOST`` and ``INFLUX_PORT`` environment variables will be used
|
||||
to construct the URL to pass into the :class:`InfluxDBCollector`.
|
||||
- **database** the database to write measurements into.
|
||||
The default is ``sprockets``.
|
||||
- **io_loop** A :class:`~tornado.ioloop.IOLoop` to use
|
||||
- **submission_interval** How often to submit metric batches in
|
||||
milliseconds. Default: ``5000``
|
||||
- **max_batch_size** The number of measurements to be submitted in a
|
||||
single HTTP request. Default: ``1000``
|
||||
- **tags** Default tags that are to be submitted with each metric.
|
||||
|
||||
"""
|
||||
if getattr(application, 'influxdb', None) is not None:
|
||||
LOGGER.warning('InfluxDBCollector is already installed')
|
||||
return False
|
||||
|
||||
# Get config values
|
||||
url = 'http://{}:{}/write'.format(os.environ.get('INFLUX_HOST'),
|
||||
os.environ.get('INFLUX_PORT'))
|
||||
kwargs.setdefault('url', url)
|
||||
|
||||
# Build the full tag dict and replace what was passed in
|
||||
tags = {'hostname': socket.gethostname()}
|
||||
if os.environ.get('ENVIRONMENT'):
|
||||
tags['environment'] = os.environ.get('ENVIRONMENT')
|
||||
if os.environ.get('SERVICE'):
|
||||
tags['service'] = os.environ.get('SERVICE')
|
||||
tags.update(kwargs.get('tags', {}))
|
||||
kwargs['tags'] = tags
|
||||
|
||||
# Create and start the collector
|
||||
setattr(application, 'influxdb', InfluxDBCollector(**kwargs))
|
||||
return True
|
||||
|
||||
|
||||
def shutdown(application):
|
||||
"""Invoke to shutdown the InfluxDB collector, writing any pending
|
||||
measurements to InfluxDB before stopping.
|
||||
|
||||
:param tornado.web.Application application: the application to
|
||||
install the collector into.
|
||||
:rtype: tornado.concurrent.TracebackFuture or None
|
||||
|
||||
"""
|
||||
collector = getattr(application, 'influxdb', None)
|
||||
if collector:
|
||||
return collector.shutdown()
|
||||
|
|
|
@ -2,6 +2,9 @@ import contextlib
|
|||
import socket
|
||||
import time
|
||||
|
||||
SETTINGS_KEY = 'sprockets.mixins.metrics.statsd'
|
||||
"""``self.settings`` key that configures this mix-in."""
|
||||
|
||||
|
||||
class StatsdMixin(object):
|
||||
"""
|
||||
|
@ -22,13 +25,9 @@ class StatsdMixin(object):
|
|||
this defaults to ``8125``.
|
||||
|
||||
"""
|
||||
|
||||
SETTINGS_KEY = 'sprockets.mixins.metrics.statsd'
|
||||
"""``self.settings`` key that configures this mix-in."""
|
||||
|
||||
def initialize(self):
|
||||
super(StatsdMixin, self).initialize()
|
||||
settings = self.settings.setdefault(self.SETTINGS_KEY, {})
|
||||
settings = self.settings.setdefault(SETTINGS_KEY, {})
|
||||
if 'socket' not in settings:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
|
||||
settings['socket'] = sock
|
||||
|
@ -97,8 +96,7 @@ class StatsdMixin(object):
|
|||
try:
|
||||
yield
|
||||
finally:
|
||||
fini = max(start, time.time())
|
||||
self.record_timing(fini - start, *path)
|
||||
self.record_timing(max(start, time.time()) - start, *path)
|
||||
|
||||
def on_finish(self):
|
||||
"""
|
||||
|
@ -119,12 +117,12 @@ class StatsdMixin(object):
|
|||
|
||||
def _build_path(self, path):
|
||||
"""Return a normalized path."""
|
||||
return '{}.{}'.format(self.settings[self.SETTINGS_KEY]['namespace'],
|
||||
return '{}.{}'.format(self.settings[SETTINGS_KEY]['namespace'],
|
||||
'.'.join(str(p).replace('.', '-') for p in path))
|
||||
|
||||
def _send(self, path, value, stat_type):
|
||||
"""Send a metric to Statsd."""
|
||||
settings = self.settings[self.SETTINGS_KEY]
|
||||
settings = self.settings[SETTINGS_KEY]
|
||||
msg = '{0}:{1}|{2}'.format(path, value, stat_type)
|
||||
settings['socket'].sendto(msg.encode('ascii'),
|
||||
(settings['host'], int(settings['port'])))
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
import collections
|
||||
import logging
|
||||
import re
|
||||
import socket
|
||||
|
||||
from tornado import gen, web
|
||||
|
||||
from sprockets.mixins.metrics import influxdb
|
||||
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
STATS_PATTERN = re.compile(r'(?P<path>[^:]*):(?P<value>[^|]*)\|(?P<type>.*)$')
|
||||
|
@ -124,12 +125,13 @@ class FakeInfluxHandler(web.RequestHandler):
|
|||
# inspect measurements
|
||||
|
||||
"""
|
||||
|
||||
def initialize(self):
|
||||
super(FakeInfluxHandler, self).initialize()
|
||||
self.logger = LOGGER.getChild(__name__)
|
||||
if not hasattr(self.application, 'influx_db'):
|
||||
self.application.influx_db = collections.defaultdict(list)
|
||||
self.application.influx_db = {}
|
||||
if self.application.influxdb.database not in self.application.influx_db:
|
||||
self.application.influx_db[self.application.influxdb.database] = []
|
||||
|
||||
def post(self):
|
||||
db = self.get_query_argument('db')
|
||||
|
@ -141,7 +143,7 @@ class FakeInfluxHandler(web.RequestHandler):
|
|||
self.set_status(204)
|
||||
|
||||
@staticmethod
|
||||
def get_messages(application, database, test_case):
|
||||
def get_messages(application, test_case):
|
||||
"""
|
||||
Wait for measurements to show up and return them.
|
||||
|
||||
|
@ -161,10 +163,10 @@ class FakeInfluxHandler(web.RequestHandler):
|
|||
are not received in a reasonable number of runs.
|
||||
|
||||
"""
|
||||
for _ in range(0, 10):
|
||||
for _ in range(0, 15):
|
||||
if hasattr(application, 'influx_db'):
|
||||
if application.influx_db[database]:
|
||||
return application.influx_db[database]
|
||||
if application.influx_db.get(application.influxdb.database):
|
||||
return application.influx_db[application.influxdb.database]
|
||||
test_case.io_loop.add_future(gen.sleep(0.1),
|
||||
lambda _: test_case.stop())
|
||||
test_case.wait()
|
||||
|
|
107
tests.py
107
tests.py
|
@ -6,26 +6,23 @@ import uuid
|
|||
from tornado import gen, testing, web
|
||||
import mock
|
||||
|
||||
from sprockets.mixins import metrics
|
||||
from sprockets.mixins.metrics.testing import (
|
||||
FakeInfluxHandler, FakeStatsdServer)
|
||||
from sprockets.mixins.metrics import influxdb, statsd
|
||||
from sprockets.mixins.metrics.testing import FakeInfluxHandler, FakeStatsdServer
|
||||
import examples.influxdb
|
||||
import examples.statsd
|
||||
|
||||
|
||||
class CounterBumper(metrics.StatsdMixin, web.RequestHandler):
|
||||
class CounterBumper(statsd.StatsdMixin, web.RequestHandler):
|
||||
|
||||
@gen.coroutine
|
||||
def get(self, counter, time):
|
||||
path = counter.split('.')
|
||||
with self.execution_timer(*path):
|
||||
yield gen.sleep(float(time))
|
||||
def get(self, counter, value):
|
||||
with self.execution_timer(*counter.split('.')):
|
||||
yield gen.sleep(float(value))
|
||||
self.set_status(204)
|
||||
self.finish()
|
||||
|
||||
def post(self, counter, amount):
|
||||
path = counter.split('.')
|
||||
self.increase_counter(*path, amount=int(amount))
|
||||
self.increase_counter(*counter.split('.'), amount=int(amount))
|
||||
self.set_status(204)
|
||||
|
||||
|
||||
|
@ -48,7 +45,7 @@ class StatsdMethodTimingTests(testing.AsyncHTTPTestCase):
|
|||
self.application = None
|
||||
super(StatsdMethodTimingTests, self).setUp()
|
||||
self.statsd = FakeStatsdServer(self.io_loop)
|
||||
self.application.settings[metrics.StatsdMixin.SETTINGS_KEY] = {
|
||||
self.application.settings[statsd.SETTINGS_KEY] = {
|
||||
'host': self.statsd.sockaddr[0],
|
||||
'port': self.statsd.sockaddr[1],
|
||||
'namespace': 'testing',
|
||||
|
@ -60,7 +57,7 @@ class StatsdMethodTimingTests(testing.AsyncHTTPTestCase):
|
|||
|
||||
@property
|
||||
def settings(self):
|
||||
return self.application.settings[metrics.StatsdMixin.SETTINGS_KEY]
|
||||
return self.application.settings[statsd.SETTINGS_KEY]
|
||||
|
||||
def test_that_http_method_call_is_recorded(self):
|
||||
response = self.fetch('/')
|
||||
|
@ -120,23 +117,28 @@ class InfluxDbTests(testing.AsyncHTTPTestCase):
|
|||
web.url(r'/', examples.influxdb.SimpleHandler),
|
||||
web.url(r'/write', FakeInfluxHandler),
|
||||
])
|
||||
influxdb.install(self.application, **{'database': 'requests',
|
||||
'submission_interval': 1,
|
||||
'url': self.get_url('/write')})
|
||||
self.application.influx_db = {}
|
||||
return self.application
|
||||
|
||||
def setUp(self):
|
||||
self.application = None
|
||||
super(InfluxDbTests, self).setUp()
|
||||
self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = {
|
||||
'measurement': 'my-service',
|
||||
'write_url': self.get_url('/write'),
|
||||
'database': 'requests',
|
||||
'max_buffer_length': 0
|
||||
self.application.settings[influxdb.SETTINGS_KEY] = {
|
||||
'measurement': 'my-service'
|
||||
}
|
||||
logging.getLogger(FakeInfluxHandler.__module__).setLevel(logging.DEBUG)
|
||||
|
||||
@gen.coroutine
|
||||
def tearDown(self):
|
||||
yield influxdb.shutdown(self.application)
|
||||
super(InfluxDbTests, self).tearDown()
|
||||
|
||||
@property
|
||||
def influx_messages(self):
|
||||
return FakeInfluxHandler.get_messages(self.application,
|
||||
'requests', self)
|
||||
return FakeInfluxHandler.get_messages(self.application, self)
|
||||
|
||||
def test_that_http_method_call_details_are_recorded(self):
|
||||
start = int(time.time())
|
||||
|
@ -149,7 +151,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase):
|
|||
self.assertEqual(tag_dict['handler'],
|
||||
'examples.influxdb.SimpleHandler')
|
||||
self.assertEqual(tag_dict['method'], 'GET')
|
||||
self.assertEqual(tag_dict['host'], socket.gethostname())
|
||||
self.assertEqual(tag_dict['hostname'], socket.gethostname())
|
||||
self.assertEqual(tag_dict['status_code'], '204')
|
||||
|
||||
value_dict = dict(a.split('=') for a in fields.split(','))
|
||||
|
@ -190,7 +192,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase):
|
|||
list(self.application.influx_db['requests'])))
|
||||
|
||||
def test_that_cached_db_connection_is_used(self):
|
||||
cfg = self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY]
|
||||
cfg = self.application.settings[influxdb.SETTINGS_KEY]
|
||||
conn = mock.Mock()
|
||||
cfg['db_connection'] = conn
|
||||
response = self.fetch('/')
|
||||
|
@ -211,68 +213,9 @@ class InfluxDbTests(testing.AsyncHTTPTestCase):
|
|||
self.fail('Expected to find "request" metric in {!r}'.format(
|
||||
list(self.application.influx_db['requests'])))
|
||||
|
||||
def test_metrics_with_buffer_flush_on_max_time(self):
|
||||
max_buffer_time = 1
|
||||
self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = {
|
||||
'measurement': 'my-service',
|
||||
'write_url': self.get_url('/write'),
|
||||
'database': 'requests',
|
||||
'max_buffer_time': max_buffer_time
|
||||
}
|
||||
|
||||
# 2 requests
|
||||
response = self.fetch('/')
|
||||
self.assertEqual(response.code, 204)
|
||||
time.sleep(max_buffer_time+1)
|
||||
response = self.fetch('/')
|
||||
self.assertEqual(response.code, 204)
|
||||
self.assertEqual(2, len(self.influx_messages))
|
||||
|
||||
for key, fields, timestamp in self.influx_messages:
|
||||
if key.startswith('my-service,'):
|
||||
value_dict = dict(a.split('=') for a in fields.split(','))
|
||||
self.assertEqual(int(value_dict['slept']), 42)
|
||||
break
|
||||
else:
|
||||
self.fail('Expected to find "request" metric in {!r}'.format(
|
||||
list(self.application.influx_db['requests'])))
|
||||
|
||||
def test_metrics_with_buffer_flush_on_max_length(self):
|
||||
max_buffer_length = 2
|
||||
self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = {
|
||||
'measurement': 'my-service',
|
||||
'write_url': self.get_url('/write'),
|
||||
'database': 'requests',
|
||||
'max_buffer_time': 100,
|
||||
'max_buffer_length': max_buffer_length
|
||||
}
|
||||
|
||||
# 3 requests with buffer length = 2,
|
||||
# so only 2 metrics should be flushed
|
||||
response = self.fetch('/')
|
||||
self.assertEqual(response.code, 204)
|
||||
response = self.fetch('/')
|
||||
self.assertEqual(response.code, 204)
|
||||
response = self.fetch('/')
|
||||
self.assertEqual(response.code, 204)
|
||||
self.assertEqual(max_buffer_length, len(self.influx_messages))
|
||||
|
||||
for key, fields, timestamp in self.influx_messages:
|
||||
if key.startswith('my-service,'):
|
||||
value_dict = dict(a.split('=') for a in fields.split(','))
|
||||
self.assertEqual(int(value_dict['slept']), 42)
|
||||
break
|
||||
else:
|
||||
self.fail('Expected to find "request" metric in {!r}'.format(
|
||||
list(self.application.influx_db['requests'])))
|
||||
|
||||
def test_metrics_with_buffer_not_flush(self):
|
||||
self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = {
|
||||
'measurement': 'my-service',
|
||||
'write_url': self.get_url('/write'),
|
||||
'database': 'requests',
|
||||
'max_buffer_time': 100,
|
||||
'max_buffer_length': 100
|
||||
self.application.settings[influxdb] = {
|
||||
'measurement': 'my-service'
|
||||
}
|
||||
|
||||
# 2 requests
|
||||
|
|
Loading…
Reference in a new issue