diff --git a/README.rst b/README.rst index f5968d3..31432fb 100644 --- a/README.rst +++ b/README.rst @@ -101,6 +101,10 @@ that any buffered metrics in the InfluxDB collector are written prior to shutting down. The method returns a :cls:`~tornado.concurrent.TracebackFuture` that should be waited on prior to shutting down. +To use authentication with InfluxDB, set the ``INFLUX_USER`` and the +``INFLUX_PASSWORD`` environment variables. Once installed, the +``INFLUX_PASSWORD`` value will be masked in the Python process. + Settings ^^^^^^^^ diff --git a/docs/history.rst b/docs/history.rst index 058d5a3..9e03d4b 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -3,6 +3,10 @@ Release History =============== +`2.1.0`_ (2-Aug-2016) +--------------------- +- Add authentication environment variables for InfluxDB + `2.0.1`_ (21-Mar-2016) ---------------------- - Make it possible to call methods (e.g., @@ -40,7 +44,8 @@ Release History - Add :class:`sprockets.mixins.metrics.InfluxDBMixin` - Add :class:`sprockets.mixins.metrics.influxdb.InfluxDBConnection` -.. _Next Release: https://github.com/sprockets/sprockets.mixins.metrics/compare/2.0.1...master +.. _Next Release: https://github.com/sprockets/sprockets.mixins.metrics/compare/2.1.0...master +.. _2.1.0: https://github.com/sprockets/sprockets.mixins.metrics/compare/2.0.1...2.1.0 .. _2.0.1: https://github.com/sprockets/sprockets.mixins.metrics/compare/2.0.0...2.0.1 .. _2.0.0: https://github.com/sprockets/sprockets.mixins.metrics/compare/1.1.1...2.0.0 .. _1.1.1: https://github.com/sprockets/sprockets.mixins.metrics/compare/1.1.0...1.1.1 diff --git a/sprockets/mixins/metrics/__init__.py b/sprockets/mixins/metrics/__init__.py index e44ce59..239a121 100644 --- a/sprockets/mixins/metrics/__init__.py +++ b/sprockets/mixins/metrics/__init__.py @@ -1,3 +1,3 @@ -version_info = (2, 0, 1) +version_info = (2, 1, 0) __version__ = '.'.join(str(v) for v in version_info) __all__ = ['__version__', 'version_info'] diff --git a/sprockets/mixins/metrics/influxdb.py b/sprockets/mixins/metrics/influxdb.py index e5a8f85..fb817ff 100644 --- a/sprockets/mixins/metrics/influxdb.py +++ b/sprockets/mixins/metrics/influxdb.py @@ -115,6 +115,8 @@ class InfluxDBCollector(object): :param max_batch_size: The number of measurements to be submitted in a single HTTP request. Default: ``1000`` :param dict tags: Default tags that are to be submitted with each metric. + :param str auth_username: Optional username for authenticated requests. + :param str auth_password: Optional password for authenticated requests. This class should be constructed using the :meth:`~sprockets.mixins.influxdb.install` method. When installed, it is @@ -129,7 +131,8 @@ class InfluxDBCollector(object): def __init__(self, url='http://localhost:8086', database='sprockets', io_loop=None, submission_interval=SUBMISSION_INTERVAL, - max_batch_size=MAX_BATCH_SIZE, tags=None): + max_batch_size=MAX_BATCH_SIZE, tags=None, + auth_username=None, auth_password=None): self._buffer = list() self._database = database self._influxdb_url = '{}?db={}'.format(url, database) @@ -139,9 +142,17 @@ class InfluxDBCollector(object): self._pending = 0 self._tags = tags or {} + # Configure the default + defaults = {'user_agent': _USER_AGENT} + if auth_username and auth_password: + LOGGER.debug('Adding authentication info to defaults (%s)', + auth_username) + defaults['auth_username'] = auth_username + defaults['auth_password'] = auth_password + self._client = httpclient.AsyncHTTPClient(force_instance=True, + defaults=defaults, io_loop=self._io_loop) - self._client.configure(None, defaults={'user_agent': _USER_AGENT}) # Add the periodic callback for submitting metrics LOGGER.info('Starting PeriodicCallback for writing InfluxDB metrics') @@ -296,6 +307,11 @@ def install(application, **kwargs): - **max_batch_size** The number of measurements to be submitted in a single HTTP request. Default: ``1000`` - **tags** Default tags that are to be submitted with each metric. + - **auth_username** A username to use for InfluxDB authentication + - **auth_password** A password to use for InfluxDB authentication + + If ``auth_password`` is specified as an environment variable, it will be + masked in the Python process. """ if getattr(application, 'influxdb', None) is not None: @@ -316,6 +332,16 @@ def install(application, **kwargs): tags.update(kwargs.get('tags', {})) kwargs['tags'] = tags + # Check if auth variables are set as env vars and set them if so + if os.environ.get('INFLUX_USER'): + kwargs.setdefault('auth_username', os.environ.get('INFLUX_USER')) + kwargs.setdefault('auth_password', + os.environ.get('INFLUX_PASSWORD', '')) + + # Don't leave the environment variable out there with the password + if os.environ.get('INFLUX_PASSWORD'): + os.environ['INFLUX_PASSWORD'] = 'X' * len(kwargs['auth_password']) + # Create and start the collector setattr(application, 'influxdb', InfluxDBCollector(**kwargs)) return True diff --git a/sprockets/mixins/metrics/testing.py b/sprockets/mixins/metrics/testing.py index 8ec76be..c66275d 100644 --- a/sprockets/mixins/metrics/testing.py +++ b/sprockets/mixins/metrics/testing.py @@ -139,7 +139,8 @@ class FakeInfluxHandler(web.RequestHandler): for line in payload.splitlines(): self.logger.debug('received "%s"', line) key, fields, timestamp = line.split() - self.application.influx_db[db].append((key, fields, timestamp)) + self.application.influx_db[db].append((key, fields, timestamp, + self.request.headers)) self.set_status(204) @staticmethod diff --git a/tests.py b/tests.py index eca1954..6fb328d 100644 --- a/tests.py +++ b/tests.py @@ -1,4 +1,6 @@ +import base64 import logging +import os import socket import time import uuid @@ -145,7 +147,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase): response = self.fetch('/') self.assertEqual(response.code, 204) - for key, fields, timestamp in self.influx_messages: + for key, fields, timestamp, _headers in self.influx_messages: if key.startswith('my-service,'): tag_dict = dict(a.split('=') for a in key.split(',')[1:]) self.assertEqual(tag_dict['handler'], @@ -169,7 +171,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase): response = self.fetch('/') self.assertEqual(response.code, 204) - for key, fields, timestamp in self.influx_messages: + for key, fields, timestamp, _headers in self.influx_messages: if key.startswith('my-service,'): value_dict = dict(a.split('=') for a in fields.split(',')) assert_between(0.25, float(value_dict['sleepytime']), 0.3) @@ -182,7 +184,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase): response = self.fetch('/') self.assertEqual(response.code, 204) - for key, fields, timestamp in self.influx_messages: + for key, fields, timestamp, _headers in self.influx_messages: if key.startswith('my-service,'): value_dict = dict(a.split('=') for a in fields.split(',')) self.assertEqual(int(value_dict['slept']), 42) @@ -204,7 +206,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase): response = self.fetch('/', headers={'Correlation-ID': cid}) self.assertEqual(response.code, 204) - for key, fields, timestamp in self.influx_messages: + for key, fields, timestamp, _headers in self.influx_messages: if key.startswith('my-service,'): tag_dict = dict(a.split('=') for a in key.split(',')[1:]) self.assertEqual(tag_dict['correlation_id'], cid) @@ -225,3 +227,54 @@ class InfluxDbTests(testing.AsyncHTTPTestCase): self.assertEqual(response.code, 204) with self.assertRaises(AssertionError): self.assertEqual(0, len(self.influx_messages)) + + +class InfluxDbAuthTests(testing.AsyncHTTPTestCase): + + def setUp(self): + self.application = None + self.username, self.password = str(uuid.uuid4()), str(uuid.uuid4()) + os.environ['INFLUX_USER'] = self.username + os.environ['INFLUX_PASSWORD'] = self.password + super(InfluxDbAuthTests, self).setUp() + self.application.settings[influxdb.SETTINGS_KEY] = { + 'measurement': 'my-service' + } + logging.getLogger(FakeInfluxHandler.__module__).setLevel(logging.DEBUG) + + @gen.coroutine + def tearDown(self): + yield influxdb.shutdown(self.application) + super(InfluxDbAuthTests, self).tearDown() + + @property + def influx_messages(self): + return FakeInfluxHandler.get_messages(self.application, self) + + def get_app(self): + self.application = web.Application([ + web.url(r'/', examples.influxdb.SimpleHandler), + web.url(r'/write', FakeInfluxHandler), + ]) + influxdb.install(self.application, **{'database': 'requests', + 'submission_interval': 1, + 'url': self.get_url('/write')}) + self.application.influx_db = {} + return self.application + + def test_that_authentication_header_was_sent(self): + print(os.environ) + response = self.fetch('/') + self.assertEqual(response.code, 204) + + for _key, _fields, _timestamp, headers in self.influx_messages: + self.assertIn('Authorization', headers) + scheme, value = headers['Authorization'].split(' ') + self.assertEqual(scheme, 'Basic') + temp = base64.b64decode(value.encode('utf-8')) + values = temp.decode('utf-8').split(':') + self.assertEqual(values[0], self.username) + self.assertEqual(values[1], self.password) + break + else: + self.fail('Did not have an Authorization header')