Add authentication support for InfluxDB

This commit is contained in:
Gavin M. Roy 2016-08-02 17:45:35 -04:00
parent cfb9157387
commit 56caca7c46
6 changed files with 98 additions and 9 deletions

View file

@ -101,6 +101,10 @@ that any buffered metrics in the InfluxDB collector are written prior to
shutting down. The method returns a :cls:`~tornado.concurrent.TracebackFuture`
that should be waited on prior to shutting down.
To use authentication with InfluxDB, set the ``INFLUX_USER`` and the
``INFLUX_PASSWORD`` environment variables. Once installed, the
``INFLUX_PASSWORD`` value will be masked in the Python process.
Settings
^^^^^^^^

View file

@ -3,6 +3,10 @@
Release History
===============
`2.1.0`_ (2-Aug-2016)
---------------------
- Add authentication environment variables for InfluxDB
`2.0.1`_ (21-Mar-2016)
----------------------
- Make it possible to call methods (e.g.,
@ -40,7 +44,8 @@ Release History
- Add :class:`sprockets.mixins.metrics.InfluxDBMixin`
- Add :class:`sprockets.mixins.metrics.influxdb.InfluxDBConnection`
.. _Next Release: https://github.com/sprockets/sprockets.mixins.metrics/compare/2.0.1...master
.. _Next Release: https://github.com/sprockets/sprockets.mixins.metrics/compare/2.1.0...master
.. _2.1.0: https://github.com/sprockets/sprockets.mixins.metrics/compare/2.0.1...2.1.0
.. _2.0.1: https://github.com/sprockets/sprockets.mixins.metrics/compare/2.0.0...2.0.1
.. _2.0.0: https://github.com/sprockets/sprockets.mixins.metrics/compare/1.1.1...2.0.0
.. _1.1.1: https://github.com/sprockets/sprockets.mixins.metrics/compare/1.1.0...1.1.1

View file

@ -1,3 +1,3 @@
version_info = (2, 0, 1)
version_info = (2, 1, 0)
__version__ = '.'.join(str(v) for v in version_info)
__all__ = ['__version__', 'version_info']

View file

@ -115,6 +115,8 @@ class InfluxDBCollector(object):
:param max_batch_size: The number of measurements to be submitted in a
single HTTP request. Default: ``1000``
:param dict tags: Default tags that are to be submitted with each metric.
:param str auth_username: Optional username for authenticated requests.
:param str auth_password: Optional password for authenticated requests.
This class should be constructed using the
:meth:`~sprockets.mixins.influxdb.install` method. When installed, it is
@ -129,7 +131,8 @@ class InfluxDBCollector(object):
def __init__(self, url='http://localhost:8086', database='sprockets',
io_loop=None, submission_interval=SUBMISSION_INTERVAL,
max_batch_size=MAX_BATCH_SIZE, tags=None):
max_batch_size=MAX_BATCH_SIZE, tags=None,
auth_username=None, auth_password=None):
self._buffer = list()
self._database = database
self._influxdb_url = '{}?db={}'.format(url, database)
@ -139,9 +142,17 @@ class InfluxDBCollector(object):
self._pending = 0
self._tags = tags or {}
# Configure the default
defaults = {'user_agent': _USER_AGENT}
if auth_username and auth_password:
LOGGER.debug('Adding authentication info to defaults (%s)',
auth_username)
defaults['auth_username'] = auth_username
defaults['auth_password'] = auth_password
self._client = httpclient.AsyncHTTPClient(force_instance=True,
defaults=defaults,
io_loop=self._io_loop)
self._client.configure(None, defaults={'user_agent': _USER_AGENT})
# Add the periodic callback for submitting metrics
LOGGER.info('Starting PeriodicCallback for writing InfluxDB metrics')
@ -296,6 +307,11 @@ def install(application, **kwargs):
- **max_batch_size** The number of measurements to be submitted in a
single HTTP request. Default: ``1000``
- **tags** Default tags that are to be submitted with each metric.
- **auth_username** A username to use for InfluxDB authentication
- **auth_password** A password to use for InfluxDB authentication
If ``auth_password`` is specified as an environment variable, it will be
masked in the Python process.
"""
if getattr(application, 'influxdb', None) is not None:
@ -316,6 +332,16 @@ def install(application, **kwargs):
tags.update(kwargs.get('tags', {}))
kwargs['tags'] = tags
# Check if auth variables are set as env vars and set them if so
if os.environ.get('INFLUX_USER'):
kwargs.setdefault('auth_username', os.environ.get('INFLUX_USER'))
kwargs.setdefault('auth_password',
os.environ.get('INFLUX_PASSWORD', ''))
# Don't leave the environment variable out there with the password
if os.environ.get('INFLUX_PASSWORD'):
os.environ['INFLUX_PASSWORD'] = 'X' * len(kwargs['auth_password'])
# Create and start the collector
setattr(application, 'influxdb', InfluxDBCollector(**kwargs))
return True

View file

@ -139,7 +139,8 @@ class FakeInfluxHandler(web.RequestHandler):
for line in payload.splitlines():
self.logger.debug('received "%s"', line)
key, fields, timestamp = line.split()
self.application.influx_db[db].append((key, fields, timestamp))
self.application.influx_db[db].append((key, fields, timestamp,
self.request.headers))
self.set_status(204)
@staticmethod

View file

@ -1,4 +1,6 @@
import base64
import logging
import os
import socket
import time
import uuid
@ -145,7 +147,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase):
response = self.fetch('/')
self.assertEqual(response.code, 204)
for key, fields, timestamp in self.influx_messages:
for key, fields, timestamp, _headers in self.influx_messages:
if key.startswith('my-service,'):
tag_dict = dict(a.split('=') for a in key.split(',')[1:])
self.assertEqual(tag_dict['handler'],
@ -169,7 +171,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase):
response = self.fetch('/')
self.assertEqual(response.code, 204)
for key, fields, timestamp in self.influx_messages:
for key, fields, timestamp, _headers in self.influx_messages:
if key.startswith('my-service,'):
value_dict = dict(a.split('=') for a in fields.split(','))
assert_between(0.25, float(value_dict['sleepytime']), 0.3)
@ -182,7 +184,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase):
response = self.fetch('/')
self.assertEqual(response.code, 204)
for key, fields, timestamp in self.influx_messages:
for key, fields, timestamp, _headers in self.influx_messages:
if key.startswith('my-service,'):
value_dict = dict(a.split('=') for a in fields.split(','))
self.assertEqual(int(value_dict['slept']), 42)
@ -204,7 +206,7 @@ class InfluxDbTests(testing.AsyncHTTPTestCase):
response = self.fetch('/', headers={'Correlation-ID': cid})
self.assertEqual(response.code, 204)
for key, fields, timestamp in self.influx_messages:
for key, fields, timestamp, _headers in self.influx_messages:
if key.startswith('my-service,'):
tag_dict = dict(a.split('=') for a in key.split(',')[1:])
self.assertEqual(tag_dict['correlation_id'], cid)
@ -225,3 +227,54 @@ class InfluxDbTests(testing.AsyncHTTPTestCase):
self.assertEqual(response.code, 204)
with self.assertRaises(AssertionError):
self.assertEqual(0, len(self.influx_messages))
class InfluxDbAuthTests(testing.AsyncHTTPTestCase):
def setUp(self):
self.application = None
self.username, self.password = str(uuid.uuid4()), str(uuid.uuid4())
os.environ['INFLUX_USER'] = self.username
os.environ['INFLUX_PASSWORD'] = self.password
super(InfluxDbAuthTests, self).setUp()
self.application.settings[influxdb.SETTINGS_KEY] = {
'measurement': 'my-service'
}
logging.getLogger(FakeInfluxHandler.__module__).setLevel(logging.DEBUG)
@gen.coroutine
def tearDown(self):
yield influxdb.shutdown(self.application)
super(InfluxDbAuthTests, self).tearDown()
@property
def influx_messages(self):
return FakeInfluxHandler.get_messages(self.application, self)
def get_app(self):
self.application = web.Application([
web.url(r'/', examples.influxdb.SimpleHandler),
web.url(r'/write', FakeInfluxHandler),
])
influxdb.install(self.application, **{'database': 'requests',
'submission_interval': 1,
'url': self.get_url('/write')})
self.application.influx_db = {}
return self.application
def test_that_authentication_header_was_sent(self):
print(os.environ)
response = self.fetch('/')
self.assertEqual(response.code, 204)
for _key, _fields, _timestamp, headers in self.influx_messages:
self.assertIn('Authorization', headers)
scheme, value = headers['Authorization'].split(' ')
self.assertEqual(scheme, 'Basic')
temp = base64.b64decode(value.encode('utf-8'))
values = temp.decode('utf-8').split(':')
self.assertEqual(values[0], self.username)
self.assertEqual(values[1], self.password)
break
else:
self.fail('Did not have an Authorization header')