Add some controls for operational tuning

- Add a flag to disable submission
- Add more environment variables for configuration
- Add a maximum buffer size that discards metrics when there are too many
- Remove correlation-id fields
This commit is contained in:
Gavin M. Roy 2016-10-13 18:52:17 -04:00
parent 3eea44dfef
commit ac7d886f11
4 changed files with 106 additions and 49 deletions

View file

@ -27,30 +27,38 @@ documentation.
The following table details the environment variable configuration options.
+-----------------------+-------------------------------------------------+---------------+
| Variable | Definition | Default |
+=======================+=================================================+===============+
| ``INFLUXDB_SCHEME`` | The URL request scheme for making HTTP requests | ``https`` |
+-----------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_HOST`` | The InfluxDB server hostname | ``localhost`` |
+-----------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_PORT`` | The InfluxDB server port | ``8086`` |
+-----------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_USER`` | The InfluxDB server username | |
+-----------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_PASSWORD`` | The InfluxDB server password | |
+-----------------------+-------------------------------------------------+---------------+
+-------------------------------+-------------------------------------------------+---------------+
| Variable | Definition | Default |
+===============================+=================================================+===============+
| ``INFLUXDB_SCHEME`` | The URL request scheme for making HTTP requests | ``https`` |
+-------------------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_HOST`` | The InfluxDB server hostname | ``localhost`` |
+-------------------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_PORT`` | The InfluxDB server port | ``8086`` |
+-------------------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_USER`` | The InfluxDB server username | |
+-------------------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_PASSWORD`` | The InfluxDB server password | |
+-------------------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_ENABLED`` | Set to ``false`` to disable InfluxDB support | ``true`` |
+-------------------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_INTERVAL`` | How often to submit measurements to InfluxDB in | ``5000`` |
| | milliseconds. | |
+-------------------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BATCH_SIZE`` | Max # of measurements to submit in a batch | ``5000`` |
+-------------------------------+-------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BUFFER_SIZE`` | Limit of measurements in a buffer before new | ``20000`` |
| | measurements are discarded. | |
+-------------------------------+-------------------------------------------------+---------------+
Mixin Configuration
^^^^^^^^^^^^^^^^^^^
The ``sprockets_influxdb.InfluxDBMixin`` class will automatically tag measurements
with the application/service name if the ``SERVICE`` environment variable is set. It will
also tag the measurement if the ``ENVIRONMENT`` environment variable is set with the environment
that the application is running in. Finally, if you are using the
The ``sprockets_influxdb.InfluxDBMixin`` class will automatically tag each measurement
with the environment that the application is running in when the ``ENVIRONMENT``
environment variable is set. Finally, if you are using the
`Sprockets Correlation Mixin <https://github.com/sprockets/sprockets.mixins.correlation>`_,
measurements will automatically be tagged with the correlation ID for a request.
Example
-------
In the following example, a measurement is added to the ``example`` InfluxDB database

View file

@ -95,6 +95,7 @@ Configuration Methods
.. autofunction:: sprockets_influxdb.set_base_url
.. autofunction:: sprockets_influxdb.set_io_loop
.. autofunction:: sprockets_influxdb.set_max_batch_size
.. autofunction:: sprockets_influxdb.set_max_buffer_size
.. autofunction:: sprockets_influxdb.set_clients
.. autofunction:: sprockets_influxdb.set_submission_interval

View file

@ -3,6 +3,13 @@
Release History
===============
`1.3.0`_ (12 Oct 2016)
----------------------
- Add a flag to disable submission
- Add more environment variables for configuration
- Add a maximum buffer size that discards metrics when there are too many
- Remove correlation-id fields
`1.2.0`_ (23 Sep 2016)
----------------------
- Make the timestamp for a measurement something that can be overridden
@ -49,7 +56,9 @@ Release History
----------------------
- Initial release
.. _Next Release: https://github.com/sprockets/sprockets-influxdb/compare/1.1.0...master
.. _Next Release: https://github.com/sprockets/sprockets-influxdb/compare/1.3.0...master
.. _1.3.0: https://github.com/sprockets/sprockets-influxdb/compare/1.2.0...1.3.0
.. _1.2.0: https://github.com/sprockets/sprockets-influxdb/compare/1.1.0...1.2.0
.. _1.1.0: https://github.com/sprockets/sprockets-influxdb/compare/1.0.7...1.1.0
.. _1.0.7: https://github.com/sprockets/sprockets-influxdb/compare/1.0.6...1.0.7
.. _1.0.6: https://github.com/sprockets/sprockets-influxdb/compare/1.0.5...1.0.6

View file

@ -20,7 +20,7 @@ except ImportError: # pragma: no cover
logging.critical('Could not import Tornado')
concurrent, httpclient, ioloop = None, None, None
version_info = (1, 2, 0)
version_info = (1, 3, 0)
__version__ = '.'.join(str(v) for v in version_info)
__all__ = ['__version__', 'version_info', 'add_measurement', 'flush',
'install', 'shutdown', 'Measurement']
@ -39,14 +39,17 @@ except NameError: # Python 2.7 compatibility
_base_tags = {}
_base_url = 'http://localhost:8086/write'
_buffer_size = 0
_credentials = None, None
_dirty = False
_enabled = True
_http_client = None
_installed = False
_io_loop = None
_last_warning = None
_measurements = {}
_max_batch_size = 5000
_max_buffer_size = 20000
_max_clients = 10
_periodic_callback = None
_periodic_future = None
@ -80,27 +83,28 @@ class InfluxDBMixin(object):
application.settings[REQUEST_DATABASE],
application.settings.get('service', 'request'))
super(InfluxDBMixin, self).__init__(application, request, **kwargs)
handler = '{}.{}'.format(self.__module__, self.__class__.__name__)
self.influxdb.set_tags({'handler': handler, 'method': request.method})
for host, handlers in application.handlers:
if not host.match(request.host):
continue
for handler in handlers:
match = handler.regex.match(request.path)
if match:
self.influxdb.set_tag('endpoint',
handler.regex.pattern.rstrip('$'))
break
if _enabled:
handler = '{}.{}'.format(self.__module__, self.__class__.__name__)
self.influxdb.set_tags({'handler': handler,
'method': request.method})
for host, handlers in application.handlers:
if not host.match(request.host):
continue
for handler in handlers:
match = handler.regex.match(request.path)
if match:
self.influxdb.set_tag('endpoint',
handler.regex.pattern.rstrip('$'))
break
def on_finish(self):
self.influxdb.set_field('content_length',
int(self._headers.get('Content-Length', 0)))
if hasattr(self, 'correlation_id'):
self.influxdb.set_field('correlation_id', self.correlation_id)
self.influxdb.set_field('duration', self.request.request_time())
self.influxdb.set_tag('status_code', self._status_code)
self.influxdb.set_tag('remote_ip', self.request.remote_ip)
add_measurement(self.influxdb)
if _enabled:
self.influxdb.set_field(
'content_length', int(self._headers.get('Content-Length', 0)))
self.influxdb.set_field('duration', self.request.request_time())
self.influxdb.set_tag('status_code', self._status_code)
self.influxdb.set_tag('remote_ip', self.request.remote_ip)
add_measurement(self.influxdb)
def add_measurement(measurement):
@ -123,11 +127,18 @@ def add_measurement(measurement):
measurement to add to the buffer for submission to InfluxDB.
"""
if not _enabled:
LOGGER.warning('Discarding measurement for %s while not enabled',
measurement.database)
if _stopping:
LOGGER.warning('Discarding measurement for %s while stopping',
measurement.database)
return
if _buffer_size > _max_buffer_size:
LOGGER.warning('Discarding measurement due to buffer size limit')
return
if not measurement.fields:
raise ValueError('Measurement does not contain a field')
@ -160,8 +171,8 @@ def flush():
def install(url=None, auth_username=None, auth_password=None, io_loop=None,
submission_interval=5000, max_batch_size=1000, max_clients=10,
base_tags=None):
submission_interval=None, max_batch_size=None, max_clients=10,
base_tags=None, max_buffer_size=None):
"""Call this to install/setup the InfluxDB client collector. All arguments
are optional.
@ -186,6 +197,8 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
may be made at any given time. Default: ``10``
:param dict base_tags: Default tags that are to be submitted with each
measurement. Default: ``None``
:param int max_buffer_size: The maximum number of pending measurements
in the buffer before new measurements are discarded.
:returns: :data:`True` if the client was installed by this call
and :data:`False` otherwise.
@ -193,8 +206,14 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
be masked in the Python process.
"""
global _base_tags, _base_url, _credentials, _installed, _io_loop, \
_max_batch_size, _max_clients, _periodic_callback
global _base_tags, _base_url, _credentials, _enabled, _installed, \
_io_loop, _max_batch_size, _max_buffer_size, _max_clients, \
_periodic_callback
_enabled = os.environ.get('INFLUXDB_ENABLED', 'true') == 'true'
if not _enabled:
LOGGER.warning('Disabling InfluxDB support')
return
if _installed:
LOGGER.warning('InfluxDB client already installed')
@ -215,10 +234,16 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
# Submission related values
_io_loop = io_loop or ioloop.IOLoop.current()
_max_batch_size = max_batch_size
interval = submission_interval or \
int(os.environ.get('INFLUXDB_INTERVAL', 5000))
_max_batch_size = max_batch_size or \
int(os.environ.get('INFLUXDB_MAX_BATCH_SIZE', 5000))
_max_clients = max_clients
_max_buffer_size = max_buffer_size or \
int(os.environ.get('INFLUXDB_MAX_BUFFER_SIZE', 20000))
_periodic_callback = ioloop.PeriodicCallback(
_on_periodic_callback, submission_interval, _io_loop)
_on_periodic_callback, interval, _io_loop)
# Set the base tags
_base_tags.setdefault('hostname', socket.gethostname())
@ -295,6 +320,19 @@ def set_max_batch_size(limit):
_max_batch_size = limit
def set_max_buffer_size(limit):
    """Set the maximum number of pending measurements allowed in the buffer
    before new measurements are discarded.

    :param int limit: The maximum number of pending measurements to hold in
        the buffer before new measurements are discarded

    """
    global _max_buffer_size

    LOGGER.debug('Setting maximum buffer size to %i', limit)
    _max_buffer_size = limit
def set_max_clients(limit):
"""Set the maximum number of simultaneous batch submission that can execute
in parallel.
@ -433,14 +471,15 @@ def _maybe_warn_about_buffer_size():
a warning has not been issued for more than 60 seconds.
"""
global _last_warning
global _buffer_size, _last_warning
if not _last_warning:
_last_warning = time.time()
count = _pending_measurements()
if count > _warn_threshold and (time.time() - _last_warning) > 60:
LOGGER.warning('InfluxDB measurement buffer has %i entries', count)
_buffer_size = _pending_measurements()
if _buffer_size > _warn_threshold and (time.time() - _last_warning) > 120:
LOGGER.warning('InfluxDB measurement buffer has %i entries',
_buffer_size)
def _on_5xx_error(batch, error, database, measurements):