diff --git a/README.rst b/README.rst index 9e7bffe..c62cf6f 100644 --- a/README.rst +++ b/README.rst @@ -3,7 +3,7 @@ sprockets.mixins.metrics |Version| |Status| |Coverage| |License| -Adjust counter and timer metrics in `InfluxDB`_ or `StatsD`_ using the same API. +Adjust counter and timer metrics in `StatsD`_ using the same API. The mix-in is configured through the ``tornado.web.Application`` settings property using a key defined by the specific mix-in. @@ -63,76 +63,6 @@ Settings :prepend_metric_type: Optional flag to prepend bucket path with the StatsD metric type -InfluxDB Mixin --------------- - -The following snippet configures the InfluxDB mix-in from common environment -variables: - -.. code-block:: python - - import os - - from sprockets.mixins.metrics import influxdb - from sprockets.mixins import postgresql - from tornado import gen, web - - def make_app(**settings): - settings[influxdb.SETTINGS_KEY] = { - 'measurement': 'rollup', - } - - application = web.Application( - [ - web.url(r'/', MyHandler), - ], **settings) - - influxdb.install({'url': 'http://localhost:8086', - 'database': 'tornado-app'}) - return application - - - class MyHandler(influxdb.InfluxDBMixin, - postgresql.HandlerMixin, - web.RequestHandler): - - @gen.coroutine - def get(self, obj_id): - with self.execution_timer('dbquery', 'get'): - result = yield self.postgresql_session.query( - 'SELECT * FROM foo WHERE id=%s', obj_id) - self.send_response(result) - -If your application handles signal handling for shutdowns, the -:meth:`~sprockets.mixins.influxdb.shutdown` method will try to cleanly ensure -that any buffered metrics in the InfluxDB collector are written prior to -shutting down. The method returns a :class:`~tornado.concurrent.TracebackFuture` -that should be waited on prior to shutting down. - -For environment variable based configuration, use the ``INFLUX_SCHEME``, -``INFLUX_HOST``, and ``INFLUX_PORT`` environment variables. The defaults are -``https``, ``localhost``, and ``8086`` respectively. - -To use authentication with InfluxDB, set the ``INFLUX_USER`` and the -``INFLUX_PASSWORD`` environment variables. Once installed, the -``INFLUX_PASSWORD`` value will be masked in the Python process. - -Settings -^^^^^^^^ - -:url: The InfluxDB API URL -:database: the database to write measurements into -:submission_interval: How often to submit metric batches in - milliseconds. Default: ``5000`` -:max_batch_size: The number of measurements to be submitted in a - single HTTP request. Default: ``1000`` -:tags: Default tags that are to be submitted with each metric. The tag - ``hostname`` is added by default along with ``environment`` and ``service`` - if the corresponding ``ENVIRONMENT`` or ``SERVICE`` environment variables - are set. -:auth_username: A username to use for InfluxDB authentication, if desired. -:auth_password: A password to use for InfluxDB authentication, if desired. - Development Quickstart ---------------------- .. code-block:: bash @@ -164,7 +94,6 @@ Development Quickstart (env)$ open build/sphinx/html/index.html .. _StatsD: https://github.com/etsy/statsd -.. _InfluxDB: https://influxdata.com .. |Version| image:: https://img.shields.io/pypi/v/sprockets_mixins_metrics.svg diff --git a/docs/api.rst b/docs/api.rst index 618040a..c68183d 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -43,30 +43,12 @@ implements the same interface: with self.execution_timer('db', 'query', 'foo'): rows = yield self.session.query('SELECT * FROM foo') - .. method:: set_metric_tag(tag, value) - :noindex: - - :param str tag: the tag to set - :param str value: the value to assign to the tag - - This method stores a tag and value pair to be reported with - metrics. It is only implemented on back-ends that support - tagging metrics (e.g., :class:`sprockets.mixins.metrics.InfluxDBMixin`) - Statsd Implementation --------------------- .. autoclass:: sprockets.mixins.metrics.statsd.StatsdMixin :members: -InfluxDB Implementation ------------------------ -.. autoclass:: sprockets.mixins.metrics.influxdb.InfluxDBMixin - :members: - -.. autoclass:: sprockets.mixins.metrics.influxdb.InfluxDBCollector - :members: - Testing Helpers --------------- *So who actually tests that their metrics are emitted as they expect?* @@ -76,6 +58,3 @@ contains some helper that make testing a little easier. .. autoclass:: sprockets.mixins.metrics.testing.FakeStatsdServer :members: - -.. autoclass:: sprockets.mixins.metrics.testing.FakeInfluxHandler - :members: diff --git a/docs/examples.rst b/docs/examples.rst index 3c2e649..42c8992 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -16,22 +16,3 @@ as a base class. .. literalinclude:: ../examples/statsd.py :pyobject: SimpleHandler - -Sending measurements to InfluxDB --------------------------------- -This simple application sends per-request measurements to an InfluxDB -server listening on ``localhost``. The mix-in is configured by passing -a ``sprockets.mixins.metrics.influxdb`` key into the application settings -as shown below. - -.. literalinclude:: ../examples/influxdb.py - :pyobject: make_application - -The InfluxDB database and measurement name are also configured in the -application settings object. The request handler is responsible for -providing the tag and value portions of the measurement. The standard -:class:`Metric Mixin API` is used to set -tagged values. - -.. literalinclude:: ../examples/influxdb.py - :pyobject: SimpleHandler diff --git a/examples/influxdb.py b/examples/influxdb.py deleted file mode 100644 index b794002..0000000 --- a/examples/influxdb.py +++ /dev/null @@ -1,86 +0,0 @@ -import signal - -from sprockets.mixins.metrics import influxdb -from tornado import concurrent, gen, ioloop, web - - -class SimpleHandler(influxdb.InfluxDBMixin, web.RequestHandler): - """ - Simply emits a few metrics around the GET method. - - The ``InfluxDBMixin`` sends all of the metrics gathered during - the processing of a request as a single measurement when the - request is finished. Each request of this sample will result - in a single measurement using the service name as the key. - - The following tag keys are defined by default: - - handler="examples.influxdb.SimpleHandler" - host="$HOSTNAME" - method="GET" - - and the following values are written: - - duration=0.2573668956756592 - sleepytime=0.255108118057251 - slept=42 - status_code=204 - - The duration and status_code values are handled by the mix-in - and the slept and sleepytime values are added in the method. - - """ - - def initialize(self): - super(SimpleHandler, self).initialize() - self.set_metric_tag('environment', 'testing') - - @gen.coroutine - def prepare(self): - maybe_future = super(SimpleHandler, self).prepare() - if concurrent.is_future(maybe_future): - yield maybe_future - - if 'Correlation-ID' in self.request.headers: - self.set_metric_tag('correlation_id', - self.request.headers['Correlation-ID']) - - @gen.coroutine - def get(self): - with self.execution_timer('sleepytime'): - yield gen.sleep(0.25) - self.increase_counter('slept', amount=42) - self.set_status(204) - self.finish() - - -def _sig_handler(*args_): - iol = ioloop.IOLoop.instance() - iol.add_callback_from_signal(iol.stop) - - -def make_application(): - """ - Create a application configured to send metrics. - - Measurements will be sent to the ``testing`` database on the - configured InfluxDB instance. The measurement name is set - by the ``service`` setting. - - """ - settings = { - influxdb.SETTINGS_KEY: { - 'measurement': 'example', - } - } - application = web.Application([web.url('/', SimpleHandler)], **settings) - influxdb.install(application, **{'database': 'testing'}) - return application - - -if __name__ == '__main__': - app = make_application() - app.listen(8000) - signal.signal(signal.SIGINT, _sig_handler) - signal.signal(signal.SIGTERM, _sig_handler) - ioloop.IOLoop.instance().start() diff --git a/examples/statsd.py b/examples/statsd.py index 0278984..27cca9a 100644 --- a/examples/statsd.py +++ b/examples/statsd.py @@ -14,16 +14,6 @@ class SimpleHandler(statsd.StatsdMixin, web.RequestHandler): """ - @gen.coroutine - def prepare(self): - maybe_future = super(SimpleHandler, self).prepare() - if concurrent.is_future(maybe_future): - yield maybe_future - - if 'Correlation-ID' in self.request.headers: - self.set_metric_tag('correlation_id', - self.request.headers['Correlation-ID']) - @gen.coroutine def get(self): yield gen.sleep(0.25) diff --git a/requires/testing.txt b/requires/testing.txt index 0ec0feb..fd00fa2 100644 --- a/requires/testing.txt +++ b/requires/testing.txt @@ -1,3 +1,2 @@ -mock>=2,<3 nose>=1.3,<2 tornado>=4.2,<4.3 diff --git a/sprockets/mixins/metrics/influxdb.py b/sprockets/mixins/metrics/influxdb.py deleted file mode 100644 index 57fba7a..0000000 --- a/sprockets/mixins/metrics/influxdb.py +++ /dev/null @@ -1,362 +0,0 @@ -import contextlib -import logging -import os -import socket -import time - -from tornado import concurrent, httpclient, ioloop - -from sprockets.mixins.metrics import __version__ - -LOGGER = logging.getLogger(__name__) - -SETTINGS_KEY = 'sprockets.mixins.metrics.influxdb' -"""``self.settings`` key that configures this mix-in.""" - -_USER_AGENT = 'sprockets.mixins.metrics/v{}'.format(__version__) - - -class InfluxDBMixin(object): - """Mix this class in to record measurements to a InfluxDB server.""" - - def __init__(self, application, request, **kwargs): - self.__metrics = [] - self.__tags = { - 'handler': '{}.{}'.format(self.__module__, - self.__class__.__name__), - 'method': request.method, - } - - # Call to super().__init__() needs to be *AFTER* we create our - # properties since it calls initialize() which may want to call - # methods like ``set_metric_tag`` - super(InfluxDBMixin, self).__init__(application, request, **kwargs) - - def set_metric_tag(self, tag, value): - """ - Add a tag to the measurement key. - - :param str tag: name of the tag to set - :param str value: value to assign - - This will overwrite the current value assigned to a tag - if one exists. - - """ - self.__tags[tag] = value - - def record_timing(self, duration, *path): - """ - Record a timing. - - :param float duration: timing to record in seconds - :param path: elements of the metric path to record - - A timing is a named duration value. - - """ - self.__metrics.append('{}={}'.format( - self.application.influxdb.escape_str('.'.join(path)), duration)) - - def increase_counter(self, *path, **kwargs): - """ - Increase a counter. - - :param path: elements of the path to record - :keyword int amount: value to record. If omitted, the counter - value is one. - - Counters are simply values that are summed in a query. - - """ - self.__metrics.append('{}={}'.format( - self.application.influxdb.escape_str('.'.join(path)), - kwargs.get('amount', 1))) - - @contextlib.contextmanager - def execution_timer(self, *path): - """ - Record the time it takes to run an arbitrary code block. - - :param path: elements of the metric path to record - - This method returns a context manager that records the amount - of time spent inside of the context and records a value - named `path` using (:meth:`record_timing`). - - """ - start = time.time() - try: - yield - finally: - self.record_timing(max(time.time(), start) - start, *path) - - def on_finish(self): - super(InfluxDBMixin, self).on_finish() - self.set_metric_tag('status_code', self.get_status()) - self.record_timing(self.request.request_time(), 'duration') - self.application.influxdb.submit( - self.settings[SETTINGS_KEY]['measurement'], - self.__tags, - self.__metrics) - - -class InfluxDBCollector(object): - """Collects and submits stats to InfluxDB on a periodic callback. - - :param str url: The InfluxDB API URL - :param str database: the database to write measurements into - :param tornado.ioloop.IOLoop: the IOLoop to spawn callbacks on. - If this parameter is :data:`None`, then the active IOLoop, - as determined by :meth:`tornado.ioloop.IOLoop.instance`, - is used. - :param int submission_interval: How often to submit metric batches in - milliseconds. Default: ``5000`` - :param max_batch_size: The number of measurements to be submitted in a - single HTTP request. Default: ``1000`` - :param dict tags: Default tags that are to be submitted with each metric. - :param str auth_username: Optional username for authenticated requests. - :param str auth_password: Optional password for authenticated requests. - - This class should be constructed using the - :meth:`~sprockets.mixins.influxdb.install` method. When installed, it is - attached to the :class:`~tornado.web.Application` instance for your web - application and schedules a periodic callback to submit metrics to InfluxDB - in batches. - - """ - SUBMISSION_INTERVAL = 5000 - MAX_BATCH_SIZE = 1000 - WARN_THRESHOLD = 25000 - - def __init__(self, url='http://localhost:8086', database='sprockets', - io_loop=None, submission_interval=SUBMISSION_INTERVAL, - max_batch_size=MAX_BATCH_SIZE, tags=None, - auth_username=None, auth_password=None): - self._buffer = list() - self._database = database - self._influxdb_url = '{}?db={}'.format(url, database) - self._interval = submission_interval or self.SUBMISSION_INTERVAL - self._io_loop = io_loop or ioloop.IOLoop.current() - self._max_batch_size = max_batch_size or self.MAX_BATCH_SIZE - self._pending = 0 - self._tags = tags or {} - - # Configure the default - defaults = {'user_agent': _USER_AGENT} - if auth_username and auth_password: - LOGGER.debug('Adding authentication info to defaults (%s)', - auth_username) - defaults['auth_username'] = auth_username - defaults['auth_password'] = auth_password - - self._client = httpclient.AsyncHTTPClient(force_instance=True, - defaults=defaults, - io_loop=self._io_loop) - - # Add the periodic callback for submitting metrics - LOGGER.info('Starting PeriodicCallback for writing InfluxDB metrics') - self._callback = ioloop.PeriodicCallback(self._write_metrics, - self._interval) - self._callback.start() - - @staticmethod - def escape_str(value): - """Escape the value with InfluxDB's wonderful escaping logic: - - "Measurement names, tag keys, and tag values must escape any spaces or - commas using a backslash (\). For example: \ and \,. All tag values are - stored as strings and should not be surrounded in quotes." - - :param str value: The value to be escaped - :rtype: str - - """ - return str(value).replace(' ', '\ ').replace(',', '\,') - - @property - def database(self): - """Return the configured database name. - - :rtype: str - - """ - return self._database - - def shutdown(self): - """Invoke on shutdown of your application to stop the periodic - callbacks and flush any remaining metrics. - - Returns a future that is complete when all pending metrics have been - submitted. - - :rtype: :class:`~tornado.concurrent.TracebackFuture()` - - """ - future = concurrent.TracebackFuture() - self._callback.stop() - self._write_metrics() - self._shutdown_wait(future) - return future - - def submit(self, measurement, tags, values): - """Add a measurement to the buffer that will be submitted to InfluxDB - on the next periodic callback for writing metrics. - - :param str measurement: The measurement name - :param dict tags: The measurement tags - :param list values: The recorded measurements - - """ - self._buffer.append('{},{} {} {:d}'.format( - self.escape_str(measurement), - self._get_tag_string(tags), - ','.join(values), - int(time.time() * 1000000000))) - if len(self._buffer) > self.WARN_THRESHOLD: - LOGGER.warning('InfluxDB metric buffer is > %i (%i)', - self.WARN_THRESHOLD, len(self._buffer)) - - def _get_tag_string(self, tags): - """Return the tags to be submitted with a measurement combining the - default tags that were passed in when constructing the class along - with any measurement specific tags passed into the - :meth:`~InfluxDBConnection.submit` method. Tags will be properly - escaped and formatted for submission. - - :param dict tags: Measurement specific tags - :rtype: str - - """ - values = dict(self._tags) - values.update(tags) - return ','.join(['{}={}'.format(self.escape_str(k), self.escape_str(v)) - for k, v in values.items()]) - - def _on_write_response(self, response): - """This is invoked by the Tornado IOLoop when the HTTP request to - InfluxDB has returned with a result. - - :param response: The response from InfluxDB - :type response: :class:`~tornado.httpclient.HTTPResponse` - - """ - self._pending -= 1 - LOGGER.debug('InfluxDB batch response: %s', response.code) - if response.error: - LOGGER.error('InfluxDB batch submission error: %s', response.error) - - def _shutdown_wait(self, future): - """Pause briefly allowing any pending metric writes to complete before - shutting down. - - :param future tornado.concurrent.TracebackFuture: The future to resulve - when the shutdown is complete. - - """ - if not self._pending: - future.set_result(True) - return - LOGGER.debug('Waiting for pending metric writes') - self._io_loop.add_timeout(self._io_loop.time() + 0.1, - self._shutdown_wait, - (future,)) - - def _write_metrics(self): - """Submit the metrics in the buffer to InfluxDB. This is invoked - by the periodic callback scheduled when the class is created. - - It will submit batches until the buffer is empty. - - """ - if not self._buffer: - return - LOGGER.debug('InfluxDB buffer has %i items', len(self._buffer)) - while self._buffer: - body = '\n'.join(self._buffer[:self._max_batch_size]) - self._buffer = self._buffer[self._max_batch_size:] - self._pending += 1 - self._client.fetch(self._influxdb_url, method='POST', - body=body.encode('utf-8'), - raise_error=False, - callback=self._on_write_response) - LOGGER.debug('Submitted all InfluxDB metrics for writing') - - -def install(application, **kwargs): - """Call this to install the InfluxDB collector into a Tornado application. - - :param tornado.web.Application application: the application to - install the collector into. - :param kwargs: keyword parameters to pass to the - :class:`InfluxDBCollector` initializer. - :returns: :data:`True` if the client was installed by this call - and :data:`False` otherwise. - - - Optional configuration values: - - - **url** The InfluxDB API URL. If URL is not specified, the - ``INFLUX_HOST`` and ``INFLUX_PORT`` environment variables will be used - to construct the URL to pass into the :class:`InfluxDBCollector`. - - **database** the database to write measurements into. - The default is ``sprockets``. - - **io_loop** A :class:`~tornado.ioloop.IOLoop` to use - - **submission_interval** How often to submit metric batches in - milliseconds. Default: ``5000`` - - **max_batch_size** The number of measurements to be submitted in a - single HTTP request. Default: ``1000`` - - **tags** Default tags that are to be submitted with each metric. - - **auth_username** A username to use for InfluxDB authentication - - **auth_password** A password to use for InfluxDB authentication - - If ``auth_password`` is specified as an environment variable, it will be - masked in the Python process. - - """ - if getattr(application, 'influxdb', None) is not None: - LOGGER.warning('InfluxDBCollector is already installed') - return False - - # Get config values - url = '{}://{}:{}/write'.format(os.environ.get('INFLUX_SCHEME', 'http'), - os.environ.get('INFLUX_HOST', 'localhost'), - os.environ.get('INFLUX_PORT', 8086)) - kwargs.setdefault('url', url) - - # Build the full tag dict and replace what was passed in - tags = {'hostname': socket.gethostname()} - if os.environ.get('ENVIRONMENT'): - tags['environment'] = os.environ.get('ENVIRONMENT') - if os.environ.get('SERVICE'): - tags['service'] = os.environ.get('SERVICE') - tags.update(kwargs.get('tags', {})) - kwargs['tags'] = tags - - # Check if auth variables are set as env vars and set them if so - if os.environ.get('INFLUX_USER'): - kwargs.setdefault('auth_username', os.environ.get('INFLUX_USER')) - kwargs.setdefault('auth_password', - os.environ.get('INFLUX_PASSWORD', '')) - - # Don't leave the environment variable out there with the password - if os.environ.get('INFLUX_PASSWORD'): - os.environ['INFLUX_PASSWORD'] = 'X' * len(kwargs['auth_password']) - - # Create and start the collector - setattr(application, 'influxdb', InfluxDBCollector(**kwargs)) - return True - - -def shutdown(application): - """Invoke to shutdown the InfluxDB collector, writing any pending - measurements to InfluxDB before stopping. - - :param tornado.web.Application application: the application to - install the collector into. - :rtype: tornado.concurrent.TracebackFuture or None - - """ - collector = getattr(application, 'influxdb', None) - if collector: - return collector.shutdown() diff --git a/sprockets/mixins/metrics/statsd.py b/sprockets/mixins/metrics/statsd.py index f0e3817..7cc5477 100644 --- a/sprockets/mixins/metrics/statsd.py +++ b/sprockets/mixins/metrics/statsd.py @@ -18,15 +18,6 @@ class StatsdMixin(object): def initialize(self): super(StatsdMixin, self).initialize() - def set_metric_tag(self, tag, value): - """Ignored for statsd since it does not support tagging. - - :param str tag: name of the tag to set - :param str value: value to assign - - """ - pass - def record_timing(self, duration, *path): """Record a timing. diff --git a/sprockets/mixins/metrics/testing.py b/sprockets/mixins/metrics/testing.py index f50acdb..2b486af 100644 --- a/sprockets/mixins/metrics/testing.py +++ b/sprockets/mixins/metrics/testing.py @@ -2,7 +2,7 @@ import logging import re import socket -from tornado import gen, iostream, locks, tcpserver, testing, web +from tornado import gen, iostream, locks, tcpserver, testing LOGGER = logging.getLogger(__name__) @@ -119,89 +119,3 @@ class FakeStatsdServer(tcpserver.TCPServer): raise AssertionError( 'Expected metric starting with "{}" in {!r}'.format( prefix, self.datagrams)) - - -class FakeInfluxHandler(web.RequestHandler): - """ - Request handler that mimics the InfluxDB write endpoint. - - Install this handler into your testing application and configure - the metrics plugin to write to it. After running a test, you can - examine the received measurements by iterating over the - ``influx_db`` list in the application object. - - .. code-block:: python - - class TestThatMyStuffWorks(testing.AsyncHTTPTestCase): - - def get_app(self): - self.app = web.Application([ - web.url('/', HandlerUnderTest), - web.url('/write', metrics.testing.FakeInfluxHandler), - ]) - return self.app - - def setUp(self): - super(TestThatMyStuffWorks, self).setUp() - self.app.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = { - 'measurement': 'stuff', - 'write_url': self.get_url('/write'), - 'database': 'requests', - } - - def test_that_measurements_are_emitted(self): - self.fetch('/') # invokes handler under test - measurements = metrics.testing.FakeInfluxHandler( - self.app, 'requests', self) - for key, fields, timestamp in measurements: - # inspect measurements - - """ - def initialize(self): - super(FakeInfluxHandler, self).initialize() - self.logger = LOGGER.getChild(__name__) - if not hasattr(self.application, 'influx_db'): - self.application.influx_db = {} - if self.application.influxdb.database not in self.application.influx_db: - self.application.influx_db[self.application.influxdb.database] = [] - - def post(self): - db = self.get_query_argument('db') - payload = self.request.body.decode('utf-8') - for line in payload.splitlines(): - self.logger.debug('received "%s"', line) - key, fields, timestamp = line.split() - self.application.influx_db[db].append((key, fields, timestamp, - self.request.headers)) - self.set_status(204) - - @staticmethod - def get_messages(application, test_case): - """ - Wait for measurements to show up and return them. - - :param tornado.web.Application application: application that - :class:`.FakeInfluxHandler` is writing to - :param str database: database to retrieve - :param tornado.testing.AsyncTestCase test_case: test case - that is being executed - :return: measurements received as a :class:`list` of - (key, fields, timestamp) tuples - - Since measurements are sent asynchronously from within the - ``on_finish`` handler they are usually not sent by the time - that the test case has stopped the IOloop. This method accounts - for this by running the IOloop until measurements have been - received. It will raise an assertion error if measurements - are not received in a reasonable number of runs. - - """ - for _ in range(0, 15): - if hasattr(application, 'influx_db'): - if application.influx_db.get(application.influxdb.database): - return application.influx_db[application.influxdb.database] - test_case.io_loop.add_future(gen.sleep(0.1), - lambda _: test_case.stop()) - test_case.wait() - else: - test_case.fail('Message not published to InfluxDB before timeout') diff --git a/tests.py b/tests.py index 2161d9e..6c890fa 100644 --- a/tests.py +++ b/tests.py @@ -1,20 +1,13 @@ -import base64 import itertools -import logging -import os import socket -import time import unittest -import uuid from tornado import gen, iostream, testing, web import mock from mock import patch -from sprockets.mixins.metrics import influxdb, statsd -from sprockets.mixins.metrics.testing import ( - FakeInfluxHandler, FakeStatsdServer) -import examples.influxdb +from sprockets.mixins.metrics import statsd +from sprockets.mixins.metrics.testing import FakeStatsdServer import examples.statsd @@ -389,172 +382,3 @@ class StatsdInstallationTests(unittest.TestCase): statsd.install(self.application, **{'namespace': 'testing'}) self.assertEqual(self.application.statsd._host, '127.0.0.1') self.assertEqual(self.application.statsd._port, 8125) - - -class InfluxDbTests(testing.AsyncHTTPTestCase): - - def get_app(self): - self.application = web.Application([ - web.url(r'/', examples.influxdb.SimpleHandler), - web.url(r'/write', FakeInfluxHandler), - ]) - influxdb.install(self.application, **{'database': 'requests', - 'submission_interval': 1, - 'url': self.get_url('/write')}) - self.application.influx_db = {} - return self.application - - def setUp(self): - self.application = None - super(InfluxDbTests, self).setUp() - self.application.settings[influxdb.SETTINGS_KEY] = { - 'measurement': 'my-service' - } - logging.getLogger(FakeInfluxHandler.__module__).setLevel(logging.DEBUG) - - @gen.coroutine - def tearDown(self): - yield influxdb.shutdown(self.application) - super(InfluxDbTests, self).tearDown() - - @property - def influx_messages(self): - return FakeInfluxHandler.get_messages(self.application, self) - - def test_that_http_method_call_details_are_recorded(self): - start = int(time.time()) - response = self.fetch('/') - self.assertEqual(response.code, 204) - - for key, fields, timestamp, _headers in self.influx_messages: - if key.startswith('my-service,'): - tag_dict = dict(a.split('=') for a in key.split(',')[1:]) - self.assertEqual(tag_dict['handler'], - 'examples.influxdb.SimpleHandler') - self.assertEqual(tag_dict['method'], 'GET') - self.assertEqual(tag_dict['hostname'], socket.gethostname()) - self.assertEqual(tag_dict['status_code'], '204') - - value_dict = dict(a.split('=') for a in fields.split(',')) - self.assertIn('duration', value_dict) - self.assertTrue(float(value_dict['duration']) > 0) - - nanos_since_epoch = int(timestamp) - then = nanos_since_epoch / 1000000000 - assert_between(start, then, time.time()) - break - else: - self.fail('Expected to find "request" metric in {!r}'.format( - list(self.application.influx_db['requests']))) - - def test_that_execution_timer_is_tracked(self): - response = self.fetch('/') - self.assertEqual(response.code, 204) - - for key, fields, timestamp, _headers in self.influx_messages: - if key.startswith('my-service,'): - value_dict = dict(a.split('=') for a in fields.split(',')) - assert_between(0.25, float(value_dict['sleepytime']), 0.3) - break - else: - self.fail('Expected to find "request" metric in {!r}'.format( - list(self.application.influx_db['requests']))) - - def test_that_counter_is_tracked(self): - response = self.fetch('/') - self.assertEqual(response.code, 204) - - for key, fields, timestamp, _headers in self.influx_messages: - if key.startswith('my-service,'): - value_dict = dict(a.split('=') for a in fields.split(',')) - self.assertEqual(int(value_dict['slept']), 42) - break - else: - self.fail('Expected to find "request" metric in {!r}'.format( - list(self.application.influx_db['requests']))) - - def test_that_cached_db_connection_is_used(self): - cfg = self.application.settings[influxdb.SETTINGS_KEY] - conn = mock.Mock() - cfg['db_connection'] = conn - response = self.fetch('/') - self.assertEqual(response.code, 204) - self.assertIs(cfg['db_connection'], conn) - - def test_that_metric_tag_is_tracked(self): - cid = str(uuid.uuid4()) - response = self.fetch('/', headers={'Correlation-ID': cid}) - self.assertEqual(response.code, 204) - - for key, fields, timestamp, _headers in self.influx_messages: - if key.startswith('my-service,'): - tag_dict = dict(a.split('=') for a in key.split(',')[1:]) - self.assertEqual(tag_dict['correlation_id'], cid) - break - else: - self.fail('Expected to find "request" metric in {!r}'.format( - list(self.application.influx_db['requests']))) - - def test_metrics_with_buffer_not_flush(self): - self.application.settings[influxdb] = { - 'measurement': 'my-service' - } - - # 2 requests - response = self.fetch('/') - self.assertEqual(response.code, 204) - response = self.fetch('/') - self.assertEqual(response.code, 204) - with self.assertRaises(AssertionError): - self.assertEqual(0, len(self.influx_messages)) - - -class InfluxDbAuthTests(testing.AsyncHTTPTestCase): - - def setUp(self): - self.application = None - self.username, self.password = str(uuid.uuid4()), str(uuid.uuid4()) - os.environ['INFLUX_USER'] = self.username - os.environ['INFLUX_PASSWORD'] = self.password - super(InfluxDbAuthTests, self).setUp() - self.application.settings[influxdb.SETTINGS_KEY] = { - 'measurement': 'my-service' - } - logging.getLogger(FakeInfluxHandler.__module__).setLevel(logging.DEBUG) - - @gen.coroutine - def tearDown(self): - yield influxdb.shutdown(self.application) - super(InfluxDbAuthTests, self).tearDown() - - @property - def influx_messages(self): - return FakeInfluxHandler.get_messages(self.application, self) - - def get_app(self): - self.application = web.Application([ - web.url(r'/', examples.influxdb.SimpleHandler), - web.url(r'/write', FakeInfluxHandler), - ]) - influxdb.install(self.application, **{'database': 'requests', - 'submission_interval': 1, - 'url': self.get_url('/write')}) - self.application.influx_db = {} - return self.application - - def test_that_authentication_header_was_sent(self): - print(os.environ) - response = self.fetch('/') - self.assertEqual(response.code, 204) - - for _key, _fields, _timestamp, headers in self.influx_messages: - self.assertIn('Authorization', headers) - scheme, value = headers['Authorization'].split(' ') - self.assertEqual(scheme, 'Basic') - temp = base64.b64decode(value.encode('utf-8')) - values = temp.decode('utf-8').split(':') - self.assertEqual(values[0], self.username) - self.assertEqual(values[1], self.password) - break - else: - self.fail('Did not have an Authorization header')