diff --git a/README.rst b/README.rst index 7ec521e..f5968d3 100644 --- a/README.rst +++ b/README.rst @@ -1,14 +1,40 @@ sprockets.mixins.metrics ======================== -Adjust counter and timer metrics in InfluxDB or Graphite using the same API. +Adjust counter and timer metrics in `InfluxDB`_ or `StatsD`_ using the same API. + +The mix-in is configured through the ``tornado.web.Application`` settings +property using a key defined by the specific mix-in. + +Statsd Mixin +------------ + +The following snippet configures the StatsD mix-in from common environment +variables. This simple handler will emit a timer metric that identifies each +call to the ``get`` method as well as a separate metric for the database query. .. code-block:: python - from sprockets.mixins import mediatype, metrics + import os + + from sprockets.mixins import mediatype + from sprockets.mixins.metrics import statsd from tornado import gen, web import queries - class MyHandler(metrics.StatsdMixin, mediatype.ContentMixin, + def make_application(): + settings = { + statsd.SETTINGS_KEY: { + 'namespace': 'my-application', + 'host': os.environ.get('STATSD_HOST', '127.0.0.1'), + 'port': os.environ.get('STATSD_PORT', '8125'), + } + } + return web.Application([ + # insert handlers here + ], **settings) + + class MyHandler(statsd.StatsdMixin, + mediatype.ContentMixin, web.RequestHandler): def initialize(self): @@ -22,38 +48,8 @@ Adjust counter and timer metrics in InfluxDB or Graphite using the same API. obj_id) self.send_response(result) -This simple handler will emit a timer metric that identifies each call to the -``get`` method as well as a separate metric for the database query. Switching -from using `statsd`_ to `InfluxDB`_ is simply a matter of switch from the -``metrics.StatsdMixin`` to the ``metrics.InfluxDBMixin``. - -The mix-in is configured through the ``tornado.web.Application`` settings -property using a key defined by the specific mix-in. 
- -Statsd Mixin ------------- - -The following snippet configures the StatsD mix-in from common environment -variables: - -.. code-block:: python - - import os - - from sprockets.mixins import metrics - from tornado import web - - def make_application(): - settings = { - metrics.StatsdMixin.SETTINGS_KEY: { - 'namespace': 'my-application', - 'host': os.environ.get('STATSD_HOST', '127.0.0.1'), - 'port': os.environ.get('STATSD_PORT', '8125'), - } - } - return web.Application([ - # insert handlers here - ], **settings) +Settings +^^^^^^^^ :namespace: The namespace for the measurements :host: The Statsd host @@ -69,32 +65,55 @@ variables: import os - from sprockets.mixins import metrics - from tornado import web + from sprockets.mixins.metrics import influxdb + from sprockets.mixins import postgresql + from tornado import gen, web - def make_application(): - settings = { - metrics.InfluxDBMixin.SETTINGS_KEY: { - 'measurement': 'my-application', - 'database': 'services', - 'write_url': 'http://{}:{}/write'.format( - os.environ.get('INFLUX_HOST', '127.0.0.1'), - os.environ.get('INFLUX_PORT', 8086)), - 'max_buffer_time': 3, - 'max_buffer_length': 100 - } + def make_app(**settings): + settings[influxdb.SETTINGS_KEY] = { + 'measurement': 'rollup', } - return web.Application([ - # insert handlers here - ], **settings) -:measurement: The InfluxDB measurement name -:database: The InfluxDB database to write measurements into -:write_url: the InfluxDB write URL to send HTTP requests to -:max_buffer_time: The maximum elasped time measurements should remain in - buffer before writing to InfluxDB. -:max_buffer_length: The maximum number of measurements to - buffer before writing to InfluxDB. 
+    application = web.Application(
+        [
+            web.url(r'/', RequestHandler),
+        ], **settings)
+
+    influxdb.install(application, **{'url': 'http://localhost:8086',
+                                     'database': 'tornado-app'})
+    return application
+
+
+    class MyHandler(influxdb.InfluxDBMixin,
+                    postgresql.HandlerMixin,
+                    web.RequestHandler):
+
+        @gen.coroutine
+        def get(self, obj_id):
+            with self.execution_timer('dbquery', 'get'):
+                result = yield self.postgresql_session.query(
+                    'SELECT * FROM foo WHERE id=%s', obj_id)
+            self.send_response(result)
+
+If your application handles signal handling for shutdowns, the
+:meth:`~sprockets.mixins.metrics.influxdb.shutdown` method will try to cleanly ensure
+that any buffered metrics in the InfluxDB collector are written prior to
+shutting down. The method returns a :class:`~tornado.concurrent.TracebackFuture`
+that should be waited on prior to shutting down.
+
+Settings
+^^^^^^^^
+
+:url: The InfluxDB API URL
+:database: the database to write measurements into
+:submission_interval: How often to submit metric batches in
+   milliseconds. Default: ``5000``
+:max_batch_size: The number of measurements to be submitted in a
+   single HTTP request. Default: ``1000``
+:tags: Default tags that are to be submitted with each metric. The tag
+   ``hostname`` is added by default along with ``environment`` and ``service``
+   if the corresponding ``ENVIRONMENT`` or ``SERVICE`` environment variables
+   are set.
 
 Development Quickstart
 ----------------------
@@ -104,6 +123,13 @@ Development Quickstart
     $ . ./env/bin/activate
     (env)$ env/bin/pip install -r requires/development.txt
     (env)$ nosetests
+    test_metrics_with_buffer_not_flush (tests.InfluxDbTests) ... ok
+    test_that_cached_db_connection_is_used (tests.InfluxDbTests) ... ok
+    test_that_counter_is_tracked (tests.InfluxDbTests) ... ok
+    test_that_execution_timer_is_tracked (tests.InfluxDbTests) ... ok
+    test_that_http_method_call_details_are_recorded (tests.InfluxDbTests) ... ok
+    test_that_metric_tag_is_tracked (tests.InfluxDbTests) ... 
ok + test_that_add_metric_tag_is_ignored (tests.StatsdMethodTimingTests) ... ok test_that_cached_socket_is_used (tests.StatsdMethodTimingTests) ... ok test_that_counter_accepts_increment_value (tests.StatsdMethodTimingTests) ... ok test_that_counter_increment_defaults_to_one (tests.StatsdMethodTimingTests) ... ok @@ -112,12 +138,12 @@ Development Quickstart test_that_http_method_call_is_recorded (tests.StatsdMethodTimingTests) ... ok ---------------------------------------------------------------------- - Ran 6 tests in 1.089s + Ran 13 tests in 3.572s OK (env)$ ./setup.py build_sphinx -q running build_sphinx (env)$ open build/sphinx/html/index.html -.. _statsd: https://github.com/etsy/statsd +.. _StatsD: https://github.com/etsy/statsd .. _InfluxDB: https://influxdata.com diff --git a/examples/influxdb.py b/examples/influxdb.py index f6db3a9..697047c 100644 --- a/examples/influxdb.py +++ b/examples/influxdb.py @@ -1,11 +1,10 @@ -import os import signal -from sprockets.mixins import metrics +from sprockets.mixins.metrics import influxdb from tornado import concurrent, gen, ioloop, web -class SimpleHandler(metrics.InfluxDBMixin, web.RequestHandler): +class SimpleHandler(influxdb.InfluxDBMixin, web.RequestHandler): """ Simply emits a few metrics around the GET method. @@ -65,17 +64,14 @@ def make_application(): by the ``service`` setting. 
""" - influx_url = 'http://{}:{}/write'.format( - os.environ.get('INFLUX_HOST', '127.0.0.1'), - os.environ.get('INFLUX_PORT', 8086)) settings = { - metrics.InfluxDBMixin.SETTINGS_KEY: { - 'measurement': 'cli', - 'database': 'testing', - 'write_url': influx_url, + influxdb.SETTINGS_KEY: { + 'measurement': 'example', } } - return web.Application([web.url('/', SimpleHandler)], **settings) + application = web.Application([web.url('/', SimpleHandler)], **settings) + influxdb.install(application, **{'database': 'testing'}) + return application if __name__ == '__main__': diff --git a/examples/statsd.py b/examples/statsd.py index 432a7fa..99e8571 100644 --- a/examples/statsd.py +++ b/examples/statsd.py @@ -1,10 +1,10 @@ import signal -from sprockets.mixins import metrics +from sprockets.mixins.metrics import statsd from tornado import concurrent, gen, ioloop, web -class SimpleHandler(metrics.StatsdMixin, web.RequestHandler): +class SimpleHandler(statsd.StatsdMixin, web.RequestHandler): """ Simply emits a timing metric around the method call. 
@@ -53,7 +53,7 @@ def make_application(): """ settings = { - metrics.StatsdMixin.SETTINGS_KEY: { + statsd.SETTINGS_KEY: { 'namespace': 'webapps', 'host': '127.0.0.1', 'port': 8125, diff --git a/sprockets/mixins/metrics/__init__.py b/sprockets/mixins/metrics/__init__.py index 8d919ab..7c63015 100644 --- a/sprockets/mixins/metrics/__init__.py +++ b/sprockets/mixins/metrics/__init__.py @@ -1,13 +1,3 @@ -try: - from .influxdb import InfluxDBMixin - from .statsd import StatsdMixin -except ImportError as error: - def InfluxDBMixin(*args, **kwargs): - raise error - - def StatsdMixin(*args, **kwargs): - raise error - -version_info = (1, 1, 1) +version_info = (2, 0, 0) __version__ = '.'.join(str(v) for v in version_info) -__all__ = ['__version__', 'version_info', 'InfluxDBMixin', 'StatsdMixin'] +__all__ = ['__version__', 'version_info'] diff --git a/sprockets/mixins/metrics/influxdb.py b/sprockets/mixins/metrics/influxdb.py index 6dc01e4..a6a25a1 100644 --- a/sprockets/mixins/metrics/influxdb.py +++ b/sprockets/mixins/metrics/influxdb.py @@ -1,119 +1,31 @@ import contextlib +import logging +import os import socket import time -from tornado import httpclient, ioloop +from tornado import concurrent, httpclient, ioloop +from sprockets.mixins.metrics import __version__ -class InfluxDBConnection(object): - """Connection to an InfluxDB instance. +LOGGER = logging.getLogger(__name__) - :param str write_url: the URL to send HTTP requests to - :param str database: the database to write measurements into - :param tornado.ioloop.IOLoop: the IOLoop to spawn callbacks on. - If this parameter is :data:`None`, then the active IOLoop, - as determined by :meth:`tornado.ioloop.IOLoop.instance`, - is used. - :param int max_buffer_time: the maximum elasped time measurements - should remain in buffer before writing to InfluxDB. - :param int max_buffer_length: the maximum number of measurements to - buffer before writing to InfluxDB. 
+SETTINGS_KEY = 'sprockets.mixins.metrics.influxdb' +"""``self.settings`` key that configures this mix-in.""" - An instance of this class is stored in the application settings - and used to asynchronously send measurements to InfluxDB instance. - Each measurement is sent by spawning a context-free callback on - the IOloop. - - """ - - MAX_BUFFER_TIME = 5 - MAX_BUFFER_LENGTH = 100 - - def __init__(self, write_url, database, io_loop=None, - max_buffer_time=None, max_buffer_length=None): - self.io_loop = ioloop.IOLoop.instance() if io_loop is None else io_loop - self.client = httpclient.AsyncHTTPClient() - self.write_url = '{}?db={}'.format(write_url, database) - - self._buffer = [] - if max_buffer_time is None: - max_buffer_time = self.MAX_BUFFER_TIME - if max_buffer_length is None: - max_buffer_length = self.MAX_BUFFER_LENGTH - self._max_buffer_time = float(max_buffer_time) - self._max_buffer_length = int(max_buffer_length) - self._last_write = self.io_loop.time() - - def submit(self, measurement, tags, values): - """Write the data using the HTTP API - - :param str measurement: The required measurement name - :param list tags: The measurement tags - :param list values: The recorded measurements - """ - body = '{},{} {} {:d}'.format(measurement, ','.join(tags), - ','.join(values), - int(time.time() * 1000000000)) - self._buffer.append(body) - if self._should_write: - self._write() - - def _write(self): - """Write the measurement""" - body = '\n'.join(self._buffer) - request = httpclient.HTTPRequest(self.write_url, method='POST', - body=body.encode('utf-8')) - ioloop.IOLoop.current().spawn_callback(self.client.fetch, request) - self._last_write = self.io_loop.time() - del self._buffer[:] - - @property - def _should_write(self): - """Returns ``True`` if the buffered measurements should be sent""" - if len(self._buffer) >= self._max_buffer_length: - return True - if self.io_loop.time() >= (self._last_write + self._max_buffer_time): - return True - return False 
+_USER_AGENT = 'sprockets.mixins.metrics/v{}'.format(__version__) class InfluxDBMixin(object): - """ - Mix this class in to record measurements to a InfluxDB server. - - **Configuration** - - :database: - InfluxDB database to write measurements to. This is passed - as the ``db`` query parameter when writing to Influx. - - https://docs.influxdata.com/influxdb/v0.9/guides/writing_data/ - - :write_url: - The URL that the InfluxDB write endpoint is available on. - This is used as-is to write data into Influx. - - """ - - SETTINGS_KEY = 'sprockets.mixins.metrics.influxdb' - """``self.settings`` key that configures this mix-in.""" - - def initialize(self): - super(InfluxDBMixin, self).initialize() - if self.SETTINGS_KEY in self.settings: - settings = self.settings[self.SETTINGS_KEY] - if 'db_connection' not in settings: - settings['db_connection'] = InfluxDBConnection( - settings['write_url'], settings['database'], - max_buffer_time=settings.get('max_buffer_time'), - max_buffer_length=settings.get('max_buffer_length')) + """Mix this class in to record measurements to a InfluxDB server.""" + def __init__(self, application, request, **kwargs): + super(InfluxDBMixin, self).__init__(application, request, **kwargs) self.__metrics = [] self.__tags = { - 'host': socket.gethostname(), 'handler': '{}.{}'.format(self.__module__, self.__class__.__name__), - 'method': self.request.method, + 'method': request.method, } def set_metric_tag(self, tag, value): @@ -139,7 +51,8 @@ class InfluxDBMixin(object): A timing is a named duration value. """ - self.__metrics.append('{}={}'.format('.'.join(path), duration)) + self.__metrics.append('{}={}'.format( + self.application.influxdb.escape_str('.'.join(path)), duration)) def increase_counter(self, *path, **kwargs): """ @@ -152,8 +65,9 @@ class InfluxDBMixin(object): Counters are simply values that are summed in a query. 
""" - self.__metrics.append('{}={}'.format('.'.join(path), - kwargs.get('amount', 1))) + self.__metrics.append('{}={}'.format( + self.application.influxdb.escape_str('.'.join(path)), + kwargs.get('amount', 1))) @contextlib.contextmanager def execution_timer(self, *path): @@ -171,15 +85,247 @@ class InfluxDBMixin(object): try: yield finally: - fini = max(time.time(), start) - self.record_timing(fini - start, *path) + self.record_timing(max(time.time(), start) - start, *path) def on_finish(self): super(InfluxDBMixin, self).on_finish() self.set_metric_tag('status_code', self._status_code) self.record_timing(self.request.request_time(), 'duration') - self.settings[self.SETTINGS_KEY]['db_connection'].submit( - self.settings[self.SETTINGS_KEY]['measurement'], - ('{}={}'.format(k, v) for k, v in self.__tags.items()), - self.__metrics, - ) + self.application.influxdb.submit( + self.settings[SETTINGS_KEY]['measurement'], + self.__tags, + self.__metrics) + + +class InfluxDBCollector(object): + """Collects and submits stats to InfluxDB on a periodic callback. + + :param str url: The InfluxDB API URL + :param str database: the database to write measurements into + :param tornado.ioloop.IOLoop: the IOLoop to spawn callbacks on. + If this parameter is :data:`None`, then the active IOLoop, + as determined by :meth:`tornado.ioloop.IOLoop.instance`, + is used. + :param int submission_interval: How often to submit metric batches in + milliseconds. Default: ``5000`` + :param max_batch_size: The number of measurements to be submitted in a + single HTTP request. Default: ``1000`` + :param dict tags: Default tags that are to be submitted with each metric. + + This class should be constructed using the + :meth:`~sprockets.mixins.influxdb.install` method. When installed, it is + attached to the :class:`~tornado.web.Application` instance for your web + application and schedules a periodic callback to submit metrics to InfluxDB + in batches. 
+ + """ + SUBMISSION_INTERVAL = 5000 + MAX_BATCH_SIZE = 1000 + WARN_THRESHOLD = 25000 + + def __init__(self, url='http://localhost:8086', database='sprockets', + io_loop=None, submission_interval=SUBMISSION_INTERVAL, + max_batch_size=MAX_BATCH_SIZE, tags=None): + self._buffer = list() + self._database = database + self._influxdb_url = '{}?db={}'.format(url, database) + self._interval = submission_interval or self.SUBMISSION_INTERVAL + self._io_loop = io_loop or ioloop.IOLoop.current() + self._max_batch_size = max_batch_size or self.MAX_BATCH_SIZE + self._pending = 0 + self._tags = tags or {} + + self._client = httpclient.AsyncHTTPClient(force_instance=True, + io_loop=self._io_loop) + self._client.configure(None, defaults={'user_agent': _USER_AGENT}) + + # Add the periodic callback for submitting metrics + LOGGER.info('Starting PeriodicCallback for writing InfluxDB metrics') + self._callback = ioloop.PeriodicCallback(self._write_metrics, + self._interval) + self._callback.start() + + @staticmethod + def escape_str(value): + """Escape the value with InfluxDB's wonderful escaping logic: + + "Measurement names, tag keys, and tag values must escape any spaces or + commas using a backslash (\). For example: \ and \,. All tag values are + stored as strings and should not be surrounded in quotes." + + :param str value: The value to be escaped + :rtype: str + + """ + return str(value).replace(' ', '\ ').replace(',', '\,') + + @property + def database(self): + """Return the configured database name. + + :rtype: str + + """ + return self._database + + def shutdown(self): + """Invoke on shutdown of your application to stop the periodic + callbacks and flush any remaining metrics. + + Returns a future that is complete when all pending metrics have been + submitted. 
+ + :rtype: :class:`~tornado.concurrent.TracebackFuture()` + + """ + future = concurrent.TracebackFuture() + self._callback.stop() + self._write_metrics() + self._shutdown_wait(future) + return future + + def submit(self, measurement, tags, values): + """Add a measurement to the buffer that will be submitted to InfluxDB + on the next periodic callback for writing metrics. + + :param str measurement: The measurement name + :param dict tags: The measurement tags + :param list values: The recorded measurements + + """ + self._buffer.append('{},{} {} {:d}'.format( + self.escape_str(measurement), + self._get_tag_string(tags), + ','.join(values), + int(time.time() * 1000000000))) + if len(self._buffer) > self.WARN_THRESHOLD: + LOGGER.warning('InfluxDB metric buffer is > %i (%i)', + self.WARN_THRESHOLD, len(self._buffer)) + + def _get_tag_string(self, tags): + """Return the tags to be submitted with a measurement combining the + default tags that were passed in when constructing the class along + with any measurement specific tags passed into the + :meth:`~InfluxDBConnection.submit` method. Tags will be properly + escaped and formatted for submission. + + :param dict tags: Measurement specific tags + :rtype: str + + """ + values = dict(self._tags) + values.update(tags) + return ','.join(['{}={}'.format(self.escape_str(k), self.escape_str(v)) + for k, v in values.items()]) + + def _on_write_response(self, response): + """This is invoked by the Tornado IOLoop when the HTTP request to + InfluxDB has returned with a result. + + :param response: The response from InfluxDB + :type response: :class:`~tornado.httpclient.HTTPResponse` + + """ + self._pending -= 1 + LOGGER.debug('InfluxDB batch response: %s', response.code) + if response.error: + LOGGER.error('InfluxDB batch submission error: %s', response.error) + + def _shutdown_wait(self, future): + """Pause briefly allowing any pending metric writes to complete before + shutting down. 
+
+        :param tornado.concurrent.TracebackFuture future: The future to resolve
+            when the shutdown is complete.
+
+        """
+        if not self._pending:
+            future.set_result(True)
+            return
+        LOGGER.debug('Waiting for pending metric writes')
+        self._io_loop.add_timeout(self._io_loop.time() + 0.1,
+                                  self._shutdown_wait,
+                                  future)
+
+    def _write_metrics(self):
+        """Submit the metrics in the buffer to InfluxDB. This is invoked
+        by the periodic callback scheduled when the class is created.
+
+        It will submit batches until the buffer is empty.
+
+        """
+        if not self._buffer:
+            return
+        LOGGER.debug('InfluxDB buffer has %i items', len(self._buffer))
+        while self._buffer:
+            body = '\n'.join(self._buffer[:self._max_batch_size])
+            self._buffer = self._buffer[self._max_batch_size:]
+            self._pending += 1
+            self._client.fetch(self._influxdb_url, method='POST',
+                               body=body.encode('utf-8'),
+                               raise_error=False,
+                               callback=self._on_write_response)
+        LOGGER.debug('Submitted all InfluxDB metrics for writing')
+
+
+def install(application, **kwargs):
+    """Call this to install the InfluxDB collector into a Tornado application.
+
+    :param tornado.web.Application application: the application to
+        install the collector into.
+    :param kwargs: keyword parameters to pass to the
+        :class:`InfluxDBCollector` initializer.
+    :returns: :data:`True` if the client was installed by this call
+        and :data:`False` otherwise.
+
+
+    Optional configuration values:
+
+    - **url** The InfluxDB API URL. If URL is not specified, the
+      ``INFLUX_HOST`` and ``INFLUX_PORT`` environment variables will be used
+      to construct the URL to pass into the :class:`InfluxDBCollector`.
+    - **database** the database to write measurements into.
+      The default is ``sprockets``.
+    - **io_loop** A :class:`~tornado.ioloop.IOLoop` to use
+    - **submission_interval** How often to submit metric batches in
+      milliseconds. Default: ``5000``
+    - **max_batch_size** The number of measurements to be submitted in a
+      single HTTP request. Default: ``1000``
+    - **tags** Default tags that are to be submitted with each metric.
+
+    """
+    if getattr(application, 'influxdb', None) is not None:
+        LOGGER.warning('InfluxDBCollector is already installed')
+        return False
+
+    # Get config values
+    url = 'http://{}:{}/write'.format(os.environ.get('INFLUX_HOST', 'localhost'),
+                                      os.environ.get('INFLUX_PORT', 8086))
+    kwargs.setdefault('url', url)
+
+    # Build the full tag dict and replace what was passed in
+    tags = {'hostname': socket.gethostname()}
+    if os.environ.get('ENVIRONMENT'):
+        tags['environment'] = os.environ.get('ENVIRONMENT')
+    if os.environ.get('SERVICE'):
+        tags['service'] = os.environ.get('SERVICE')
+    tags.update(kwargs.get('tags', {}))
+    kwargs['tags'] = tags
+
+    # Create and start the collector
+    setattr(application, 'influxdb', InfluxDBCollector(**kwargs))
+    return True
+
+
+def shutdown(application):
+    """Invoke to shutdown the InfluxDB collector, writing any pending
+    measurements to InfluxDB before stopping.
+
+    :param tornado.web.Application application: the application the
+        collector was installed into.
+    :rtype: tornado.concurrent.TracebackFuture or None
+
+    """
+    collector = getattr(application, 'influxdb', None)
+    if collector:
+        return collector.shutdown()
diff --git a/sprockets/mixins/metrics/statsd.py b/sprockets/mixins/metrics/statsd.py
index 480e2cf..9c31c95 100644
--- a/sprockets/mixins/metrics/statsd.py
+++ b/sprockets/mixins/metrics/statsd.py
@@ -2,6 +2,9 @@ import contextlib
 import socket
 import time
 
+SETTINGS_KEY = 'sprockets.mixins.metrics.statsd'
+"""``self.settings`` key that configures this mix-in."""
+
 
 class StatsdMixin(object):
     """
@@ -22,13 +25,9 @@ class StatsdMixin(object):
         this defaults to ``8125``.
""" - - SETTINGS_KEY = 'sprockets.mixins.metrics.statsd' - """``self.settings`` key that configures this mix-in.""" - def initialize(self): super(StatsdMixin, self).initialize() - settings = self.settings.setdefault(self.SETTINGS_KEY, {}) + settings = self.settings.setdefault(SETTINGS_KEY, {}) if 'socket' not in settings: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) settings['socket'] = sock @@ -97,8 +96,7 @@ class StatsdMixin(object): try: yield finally: - fini = max(start, time.time()) - self.record_timing(fini - start, *path) + self.record_timing(max(start, time.time()) - start, *path) def on_finish(self): """ @@ -119,12 +117,12 @@ class StatsdMixin(object): def _build_path(self, path): """Return a normalized path.""" - return '{}.{}'.format(self.settings[self.SETTINGS_KEY]['namespace'], + return '{}.{}'.format(self.settings[SETTINGS_KEY]['namespace'], '.'.join(str(p).replace('.', '-') for p in path)) def _send(self, path, value, stat_type): """Send a metric to Statsd.""" - settings = self.settings[self.SETTINGS_KEY] + settings = self.settings[SETTINGS_KEY] msg = '{0}:{1}|{2}'.format(path, value, stat_type) settings['socket'].sendto(msg.encode('ascii'), (settings['host'], int(settings['port']))) diff --git a/sprockets/mixins/metrics/testing.py b/sprockets/mixins/metrics/testing.py index c1b0f39..8ec76be 100644 --- a/sprockets/mixins/metrics/testing.py +++ b/sprockets/mixins/metrics/testing.py @@ -1,10 +1,11 @@ -import collections import logging import re import socket from tornado import gen, web +from sprockets.mixins.metrics import influxdb + LOGGER = logging.getLogger(__name__) STATS_PATTERN = re.compile(r'(?P[^:]*):(?P[^|]*)\|(?P.*)$') @@ -124,12 +125,13 @@ class FakeInfluxHandler(web.RequestHandler): # inspect measurements """ - def initialize(self): super(FakeInfluxHandler, self).initialize() self.logger = LOGGER.getChild(__name__) if not hasattr(self.application, 'influx_db'): - self.application.influx_db = collections.defaultdict(list) + 
self.application.influx_db = {} + if self.application.influxdb.database not in self.application.influx_db: + self.application.influx_db[self.application.influxdb.database] = [] def post(self): db = self.get_query_argument('db') @@ -141,7 +143,7 @@ class FakeInfluxHandler(web.RequestHandler): self.set_status(204) @staticmethod - def get_messages(application, database, test_case): + def get_messages(application, test_case): """ Wait for measurements to show up and return them. @@ -161,10 +163,10 @@ class FakeInfluxHandler(web.RequestHandler): are not received in a reasonable number of runs. """ - for _ in range(0, 10): + for _ in range(0, 15): if hasattr(application, 'influx_db'): - if application.influx_db[database]: - return application.influx_db[database] + if application.influx_db.get(application.influxdb.database): + return application.influx_db[application.influxdb.database] test_case.io_loop.add_future(gen.sleep(0.1), lambda _: test_case.stop()) test_case.wait() diff --git a/tests.py b/tests.py index ad1aa67..eca1954 100644 --- a/tests.py +++ b/tests.py @@ -6,26 +6,23 @@ import uuid from tornado import gen, testing, web import mock -from sprockets.mixins import metrics -from sprockets.mixins.metrics.testing import ( - FakeInfluxHandler, FakeStatsdServer) +from sprockets.mixins.metrics import influxdb, statsd +from sprockets.mixins.metrics.testing import FakeInfluxHandler, FakeStatsdServer import examples.influxdb import examples.statsd -class CounterBumper(metrics.StatsdMixin, web.RequestHandler): +class CounterBumper(statsd.StatsdMixin, web.RequestHandler): @gen.coroutine - def get(self, counter, time): - path = counter.split('.') - with self.execution_timer(*path): - yield gen.sleep(float(time)) + def get(self, counter, value): + with self.execution_timer(*counter.split('.')): + yield gen.sleep(float(value)) self.set_status(204) self.finish() def post(self, counter, amount): - path = counter.split('.') - self.increase_counter(*path, amount=int(amount)) + 
self.increase_counter(*counter.split('.'), amount=int(amount)) self.set_status(204) @@ -48,7 +45,7 @@ class StatsdMethodTimingTests(testing.AsyncHTTPTestCase): self.application = None super(StatsdMethodTimingTests, self).setUp() self.statsd = FakeStatsdServer(self.io_loop) - self.application.settings[metrics.StatsdMixin.SETTINGS_KEY] = { + self.application.settings[statsd.SETTINGS_KEY] = { 'host': self.statsd.sockaddr[0], 'port': self.statsd.sockaddr[1], 'namespace': 'testing', @@ -60,7 +57,7 @@ class StatsdMethodTimingTests(testing.AsyncHTTPTestCase): @property def settings(self): - return self.application.settings[metrics.StatsdMixin.SETTINGS_KEY] + return self.application.settings[statsd.SETTINGS_KEY] def test_that_http_method_call_is_recorded(self): response = self.fetch('/') @@ -120,23 +117,28 @@ class InfluxDbTests(testing.AsyncHTTPTestCase): web.url(r'/', examples.influxdb.SimpleHandler), web.url(r'/write', FakeInfluxHandler), ]) + influxdb.install(self.application, **{'database': 'requests', + 'submission_interval': 1, + 'url': self.get_url('/write')}) + self.application.influx_db = {} return self.application def setUp(self): self.application = None super(InfluxDbTests, self).setUp() - self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = { - 'measurement': 'my-service', - 'write_url': self.get_url('/write'), - 'database': 'requests', - 'max_buffer_length': 0 + self.application.settings[influxdb.SETTINGS_KEY] = { + 'measurement': 'my-service' } logging.getLogger(FakeInfluxHandler.__module__).setLevel(logging.DEBUG) + @gen.coroutine + def tearDown(self): + yield influxdb.shutdown(self.application) + super(InfluxDbTests, self).tearDown() + @property def influx_messages(self): - return FakeInfluxHandler.get_messages(self.application, - 'requests', self) + return FakeInfluxHandler.get_messages(self.application, self) def test_that_http_method_call_details_are_recorded(self): start = int(time.time()) @@ -149,7 +151,7 @@ class 
InfluxDbTests(testing.AsyncHTTPTestCase): self.assertEqual(tag_dict['handler'], 'examples.influxdb.SimpleHandler') self.assertEqual(tag_dict['method'], 'GET') - self.assertEqual(tag_dict['host'], socket.gethostname()) + self.assertEqual(tag_dict['hostname'], socket.gethostname()) self.assertEqual(tag_dict['status_code'], '204') value_dict = dict(a.split('=') for a in fields.split(',')) @@ -190,7 +192,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase): list(self.application.influx_db['requests']))) def test_that_cached_db_connection_is_used(self): - cfg = self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY] + cfg = self.application.settings[influxdb.SETTINGS_KEY] conn = mock.Mock() cfg['db_connection'] = conn response = self.fetch('/') @@ -211,68 +213,9 @@ class InfluxDbTests(testing.AsyncHTTPTestCase): self.fail('Expected to find "request" metric in {!r}'.format( list(self.application.influx_db['requests']))) - def test_metrics_with_buffer_flush_on_max_time(self): - max_buffer_time = 1 - self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = { - 'measurement': 'my-service', - 'write_url': self.get_url('/write'), - 'database': 'requests', - 'max_buffer_time': max_buffer_time - } - - # 2 requests - response = self.fetch('/') - self.assertEqual(response.code, 204) - time.sleep(max_buffer_time+1) - response = self.fetch('/') - self.assertEqual(response.code, 204) - self.assertEqual(2, len(self.influx_messages)) - - for key, fields, timestamp in self.influx_messages: - if key.startswith('my-service,'): - value_dict = dict(a.split('=') for a in fields.split(',')) - self.assertEqual(int(value_dict['slept']), 42) - break - else: - self.fail('Expected to find "request" metric in {!r}'.format( - list(self.application.influx_db['requests']))) - - def test_metrics_with_buffer_flush_on_max_length(self): - max_buffer_length = 2 - self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = { - 'measurement': 'my-service', - 'write_url': 
self.get_url('/write'),
-            'database': 'requests',
-            'max_buffer_time': 100,
-            'max_buffer_length': max_buffer_length
-        }
-
-        # 3 requests with buffer length = 2,
-        # so only 2 metrics should be flushed
-        response = self.fetch('/')
-        self.assertEqual(response.code, 204)
-        response = self.fetch('/')
-        self.assertEqual(response.code, 204)
-        response = self.fetch('/')
-        self.assertEqual(response.code, 204)
-        self.assertEqual(max_buffer_length, len(self.influx_messages))
-
-        for key, fields, timestamp in self.influx_messages:
-            if key.startswith('my-service,'):
-                value_dict = dict(a.split('=') for a in fields.split(','))
-                self.assertEqual(int(value_dict['slept']), 42)
-                break
-        else:
-            self.fail('Expected to find "request" metric in {!r}'.format(
-                list(self.application.influx_db['requests'])))
-
     def test_metrics_with_buffer_not_flush(self):
-        self.application.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = {
-            'measurement': 'my-service',
-            'write_url': self.get_url('/write'),
-            'database': 'requests',
-            'max_buffer_time': 100,
-            'max_buffer_length': 100
+        self.application.settings[influxdb.SETTINGS_KEY] = {
+            'measurement': 'my-service'
         }
 
         # 2 requests