Merge pull request #2 from sprockets/2.0

Change batch submission logic
This commit is contained in:
dave-shawley 2016-11-09 16:12:41 -05:00 committed by GitHub
commit cb705b847c
10 changed files with 204 additions and 110 deletions

View file

@ -6,6 +6,7 @@ python:
- 3.5
install:
- pip install -r requires/testing.txt
- pip install wheel
script: nosetests --with-coverage
after_success:
- codecov

View file

@ -27,31 +27,35 @@ documentation.
The following table details the environment variable configuration options.
+-------------------------------+--------------------------------------------------+---------------+
| Variable | Definition | Default |
+===============================+==================================================+===============+
| ``INFLUXDB_SCHEME`` | The URL request scheme for making HTTP requests | ``https`` |
+-------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_HOST`` | The InfluxDB server hostname | ``localhost`` |
+-------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PORT`` | The InfluxDB server port | ``8086`` |
+-------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_USER`` | The InfluxDB server username | |
+-------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PASSWORD`` | The InfluxDB server password | |
+-------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_ENABLED`` | Set to ``false`` to disable InfluxDB support | ``true`` |
+-------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_INTERVAL`` | How often to submit measurements to InfluxDB in | ``5000`` |
| | milliseconds. | |
+-------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BATCH_SIZE`` | Max # of measurements to submit in a batch | ``5000`` |
+-------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BUFFER_SIZE`` | Limit of measurements in a buffer before new | ``20000`` |
| | measurements are discarded. | |
+-------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TAG_HOSTNAME`` | Include the hostname as a tag in the measurement | ``true`` |
+-------------------------------+--------------------------------------------------+---------------+
+------------------------------+--------------------------------------------------+---------------+
| Variable | Definition | Default |
+==============================+==================================================+===============+
| ``INFLUXDB_SCHEME`` | The URL request scheme for making HTTP requests | ``https`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_HOST`` | The InfluxDB server hostname | ``localhost`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PORT`` | The InfluxDB server port | ``8086`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_USER`` | The InfluxDB server username | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PASSWORD`` | The InfluxDB server password | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_ENABLED`` | Set to ``false`` to disable InfluxDB support | ``true`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_INTERVAL`` | How many milliseconds to wait before submitting | ``60000`` |
| | measurements when the buffer has fewer than | |
| | ``INFLUXDB_TRIGGER_SIZE`` measurements. | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BATCH_SIZE`` | Max # of measurements to submit in a batch | ``10000`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BUFFER_SIZE`` | Limit of measurements in a buffer before new | ``25000`` |
| | measurements are discarded. | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TRIGGER_SIZE`` | The number of metrics in the buffer to trigger | ``60000`` |
| | the submission of a batch. | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TAG_HOSTNAME`` | Include the hostname as a tag in the measurement | ``true`` |
+------------------------------+--------------------------------------------------+---------------+
Mixin Configuration
^^^^^^^^^^^^^^^^^^^

View file

@ -10,6 +10,15 @@ is added to the ``example`` InfluxDB database with the measurement name of
which calls :meth:`~sprockets_influxdb.shutdown`. :meth:`~sprockets_influxdb.shutdown`
ensures that all of the buffered metrics are written before the IOLoop is stopped.
Measurements will be sent in batches to InfluxDB when there are
``INFLUXDB_TRIGGER_SIZE`` measurements in the buffer or after ``INFLUXDB_INTERVAL``
milliseconds have passed since the last measurement was added,
which ever occurs first.
The timeout timer for submitting a buffer of < ``INFLUXDB_TRIGGER_SIZE``
measurements is only started when there isn't an active timer, there is not a
batch currently being written, and a measurement is added to the buffer.
.. code:: python
import logging
@ -97,7 +106,8 @@ Configuration Methods
.. autofunction:: sprockets_influxdb.set_max_batch_size
.. autofunction:: sprockets_influxdb.set_max_buffer_size
.. autofunction:: sprockets_influxdb.set_clients
.. autofunction:: sprockets_influxdb.set_submission_interval
.. autofunction:: sprockets_influxdb.set_timeout
.. autofunction:: sprockets_influxdb.set_trigger_size
Request Handler Mixin
---------------------

View file

@ -3,6 +3,10 @@
Release History
===============
`2.0.0`_ (09 Nov 2016)
----------------------
- Change the way the buffered submission of measurements is handled
`1.4.0`_ (12 Oct 2016)
----------------------
- Make the hostname tag optional
@ -60,7 +64,9 @@ Release History
----------------------
- Initial release
.. _Next Release: https://github.com/sprockets/sprockets-influxdb/compare/1.3.0...master
.. _Next Release: https://github.com/sprockets/sprockets-influxdb/compare/2.0.0...master
.. _2.0.0: https://github.com/sprockets/sprockets-influxdb/compare/1.4.0...2.0.0
.. _1.4.0: https://github.com/sprockets/sprockets-influxdb/compare/1.3.0...1.4.0
.. _1.3.0: https://github.com/sprockets/sprockets-influxdb/compare/1.2.0...1.3.0
.. _1.2.0: https://github.com/sprockets/sprockets-influxdb/compare/1.1.0...1.2.0
.. _1.1.0: https://github.com/sprockets/sprockets-influxdb/compare/1.0.7...1.1.0

View file

@ -1 +1 @@
tornado>=4.0,<4.3
tornado>4.0

View file

@ -11,4 +11,5 @@ exclude = env,build
cover-branches = 1
cover-erase = 1
cover-package = sprockets_influxdb
verbose = 1
verbosity = 2
with-coverage = 1

View file

@ -4,6 +4,15 @@ Sprockets InfluxDB
`sprockets_influxdb` includes both a buffering InfluxDB client and a Tornado
RequestHandler mixin.
Measurements will be sent in batches to InfluxDB when there are
``INFLUXDB_TRIGGER_SIZE`` measurements in the buffer or after
``INFLUXDB_INTERVAL`` milliseconds have passed since the last measurement was
added, which ever occurs first.
The timeout timer for submitting a buffer of < ``INFLUXDB_TRIGGER_SIZE``
measurements is only started when there isn't an active timer, there is not a
batch currently being written, and a measurement is added to the buffer.
"""
import contextlib
import logging
@ -20,7 +29,7 @@ except ImportError: # pragma: no cover
logging.critical('Could not import Tornado')
concurrent, httpclient, ioloop = None, None, None
version_info = (1, 4, 0)
version_info = (2, 0, 0)
__version__ = '.'.join(str(v) for v in version_info)
__all__ = ['__version__', 'version_info', 'add_measurement', 'flush',
'install', 'shutdown', 'Measurement']
@ -39,6 +48,7 @@ except NameError: # Python 2.7 compatibility
_base_tags = {}
_base_url = 'http://localhost:8086/write'
_batch_future = None
_buffer_size = 0
_credentials = None, None
_dirty = False
@ -48,13 +58,14 @@ _installed = False
_io_loop = None
_last_warning = None
_measurements = {}
_max_batch_size = 5000
_max_buffer_size = 20000
_max_batch_size = 10000
_max_buffer_size = 25000
_max_clients = 10
_periodic_callback = None
_periodic_future = None
_stopping = False
_warn_threshold = 5000
_timeout_interval = 60000
_timeout = None
_trigger_size = 5000
_warn_threshold = 15000
_writing = False
@ -93,8 +104,8 @@ class InfluxDBMixin(object):
for handler in handlers:
match = handler.regex.match(request.path)
if match:
self.influxdb.set_tag('endpoint',
handler.regex.pattern.rstrip('$'))
self.influxdb.set_tag(
'endpoint', handler.regex.pattern.rstrip('$'))
break
def on_finish(self):
@ -127,6 +138,8 @@ def add_measurement(measurement):
measurement to add to the buffer for submission to InfluxDB.
"""
global _buffer_size
if not _enabled:
LOGGER.debug('Discarding measurement for %s while not enabled',
measurement.database)
@ -149,7 +162,16 @@ def add_measurement(measurement):
value = measurement.marshall()
_measurements[measurement.database].append(value)
_maybe_warn_about_buffer_size()
# Ensure that len(measurements) < _trigger_size are written
if not _timeout:
if (_batch_future and _batch_future.done()) or not _batch_future:
_start_timeout()
# Check to see if the batch should be triggered
_buffer_size = _pending_measurements()
if _buffer_size >= _trigger_size:
_trigger_batch_write()
def flush():
@ -161,20 +183,20 @@ def flush():
:rtype: :class:`~tornado.concurrent.Future`
"""
LOGGER.debug('Flushing')
flush_future = concurrent.TracebackFuture()
if _periodic_future and not _periodic_future.done():
LOGGER.debug('Waiting on _periodic_future instead')
write_future = _periodic_future
if _batch_future and not _batch_future.done():
LOGGER.debug('Flush waiting on incomplete _batch_future')
_flush_wait(flush_future, _batch_future)
else:
write_future = _write_measurements()
_flush_wait(flush_future, write_future)
LOGGER.info('Flushing buffer with %i measurements to InfluxDB',
_pending_measurements())
_flush_wait(flush_future, _write_measurements())
return flush_future
def install(url=None, auth_username=None, auth_password=None, io_loop=None,
submission_interval=None, max_batch_size=None, max_clients=10,
base_tags=None, max_buffer_size=None):
base_tags=None, max_buffer_size=None, trigger_size=None):
"""Call this to install/setup the InfluxDB client collector. All arguments
are optional.
@ -191,16 +213,20 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
:param io_loop: A :class:`~tornado.ioloop.IOLoop` to use instead of the
version returned by :meth:`~tornado.ioloop.IOLoop.current`
:type io_loop: :class:`tornado.ioloop.IOLoop`
:param int submission_interval: How often to submit metric batches in
milliseconds. Default: ``5000``
:param int submission_interval: The maximum number of milliseconds to wait
after the last batch submission before submitting a batch that is
smaller than ``trigger_size``. Default: ``60000``
:param int max_batch_size: The number of measurements to be submitted in a
single HTTP request. Default: ``1000``
single HTTP request. Default: ``10000``
:param int max_clients: The number of simultaneous batch submissions that
may be made at any given time. Default: ``10``
:param dict base_tags: Default tags that are to be submitted with each
measurement. Default: ``None``
:param int max_buffer_size: The maximum number of pending measurements
in the buffer before new measurements are discarded.
Default: ``25000``
:param int trigger_size: The minimum number of measurements that
are in the buffer before a batch can be submitted. Default: ``5000``
:returns: :data:`True` if the client was installed by this call
and :data:`False` otherwise.
@ -210,7 +236,7 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
"""
global _base_tags, _base_url, _credentials, _enabled, _installed, \
_io_loop, _max_batch_size, _max_buffer_size, _max_clients, \
_periodic_callback
_timeout, _timeout_interval, _trigger_size
_enabled = os.environ.get('INFLUXDB_ENABLED', 'true') == 'true'
if not _enabled:
@ -236,16 +262,15 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
# Submission related values
_io_loop = io_loop or ioloop.IOLoop.current()
interval = submission_interval or \
int(os.environ.get('INFLUXDB_INTERVAL', 5000))
_timeout_interval = submission_interval or \
int(os.environ.get('INFLUXDB_INTERVAL', _timeout_interval))
_max_batch_size = max_batch_size or \
int(os.environ.get('INFLUXDB_MAX_BATCH_SIZE', 5000))
int(os.environ.get('INFLUXDB_MAX_BATCH_SIZE', _max_batch_size))
_max_clients = max_clients
_max_buffer_size = max_buffer_size or \
int(os.environ.get('INFLUXDB_MAX_BUFFER_SIZE', 20000))
_periodic_callback = ioloop.PeriodicCallback(
_on_periodic_callback, interval, _io_loop)
int(os.environ.get('INFLUXDB_MAX_BUFFER_SIZE', _max_buffer_size))
_trigger_size = trigger_size or \
int(os.environ.get('INFLUXDB_TRIGGER_SIZE', _trigger_size))
# Set the base tags
if os.environ.get('INFLUXDB_TAG_HOSTNAME', 'true') == 'true':
@ -254,12 +279,12 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
_base_tags.setdefault('environment', os.environ['ENVIRONMENT'])
_base_tags.update(base_tags or {})
# Start the periodic callback on IOLoop start
_io_loop.add_callback(_periodic_callback.start)
# Don't let this run multiple times
_installed = True
LOGGER.info('sprockets_influxdb v%s installed; %i measurements or %.2f '
'seconds will trigger batch submission', __version__,
_trigger_size, _timeout_interval / 1000.0)
return True
@ -350,21 +375,32 @@ def set_max_clients(limit):
_max_clients = limit
def set_submission_interval(seconds):
"""Override how often to submit measurements to InfluxDB.
def set_timeout(milliseconds):
"""Override the maximum duration to wait for submitting measurements to
InfluxDB.
:param int seconds: How often to wait in seconds
:param int milliseconds: Maximum wait in milliseconds
"""
global _periodic_callback
global _timeout, _timeout_interval
LOGGER.debug('Setting submission interval to %s seconds', seconds)
if _periodic_callback.is_running():
_periodic_callback.stop()
_periodic_callback = ioloop.PeriodicCallback(_on_periodic_callback,
seconds, _io_loop)
# Start the periodic callback on IOLoop start if it's not already started
_io_loop.add_callback(_periodic_callback.start)
LOGGER.debug('Setting batch wait timeout to %i ms', milliseconds)
_timeout_interval = milliseconds
_maybe_stop_timeout()
_timeout = _io_loop.add_timeout(milliseconds, _on_timeout)
def set_trigger_size(limit):
"""Set the number of pending measurements that trigger the writing of data
to InfluxDB
:param int limit: The minimum number of measurements to trigger a batch
"""
global _trigger_size
LOGGER.debug('Setting trigger buffer size to %i', limit)
_trigger_size = limit
def shutdown():
@ -384,10 +420,7 @@ def shutdown():
return
_stopping = True
if _periodic_callback.is_running():
_periodic_callback.stop()
LOGGER.info('Stopped periodic measurement submission and writing current '
'buffer to InfluxDB')
_maybe_stop_timeout()
return flush()
@ -410,8 +443,10 @@ def _flush_wait(flush_future, write_future):
"""Pause briefly allowing any pending metric writes to complete before
shutting down.
:param future tornado.concurrent.TracebackFuture: The future to resolve
:param tornado.concurrent.Future flush_future: The future to resolve
when the shutdown is complete.
:param tornado.concurrent.Future write_future: The future that is for the
current batch write operation.
"""
if write_future.done():
@ -435,7 +470,7 @@ def _futures_wait(wait_future, futures):
:param list futures: The list of futures to watch for completion
"""
global _writing
global _buffer_size, _writing
remaining = []
for (future, batch, database, measurements) in futures:
@ -464,22 +499,41 @@ def _futures_wait(wait_future, futures):
if remaining:
return _io_loop.add_timeout(
_io_loop.time() + 0.1, _futures_wait, wait_future, remaining)
else: # Start the next timeout or trigger the next batch
_buffer_size = _pending_measurements()
LOGGER.debug('Batch submitted, %i measurements remain', _buffer_size)
if _buffer_size >= _trigger_size:
_io_loop.add_callback(_trigger_batch_write)
elif _buffer_size:
_start_timeout()
_writing = False
wait_future.set_result(True)
def _maybe_stop_timeout():
"""If there is a pending timeout, remove it from the IOLoop and set the
``_timeout`` global to None.
"""
global _timeout
if _timeout is not None:
LOGGER.debug('Removing the pending timeout (%r)', _timeout)
_io_loop.remove_timeout(_timeout)
_timeout = None
def _maybe_warn_about_buffer_size():
"""Check the buffer size and issue a warning if it's too large and
a warning has not been issued for more than 60 seconds.
"""
global _buffer_size, _last_warning
global _last_warning
if not _last_warning:
_last_warning = time.time()
_buffer_size = _pending_measurements()
if _buffer_size > _warn_threshold and (time.time() - _last_warning) > 120:
LOGGER.warning('InfluxDB measurement buffer has %i entries',
_buffer_size)
@ -500,23 +554,21 @@ def _on_5xx_error(batch, error, database, measurements):
_measurements[database] = _measurements[database] + measurements
def _on_periodic_callback():
def _on_timeout():
"""Invoked periodically to ensure that metrics that have been collected
are submitted to InfluxDB. If metrics are still being written when it
is invoked, pass until the next time.
are submitted to InfluxDB.
:rtype: tornado.concurrent.Future
:rtype: tornado.concurrent.Future or None
"""
global _periodic_future
global _buffer_size
if isinstance(_periodic_future, concurrent.Future) \
and not _periodic_future.done():
LOGGER.warning('Metrics are currently being written, '
'skipping write interval')
return
_periodic_future = _write_measurements()
return _periodic_future
LOGGER.debug('No metrics submitted in the last %.2f seconds',
_timeout_interval / 1000.0)
_buffer_size = _pending_measurements()
if _buffer_size:
return _trigger_batch_write()
_start_timeout()
def _pending_measurements():
@ -529,6 +581,28 @@ def _pending_measurements():
return sum([len(_measurements[dbname]) for dbname in _measurements])
def _start_timeout():
"""Stop a running timeout if it's there, then create a new one."""
global _timeout
LOGGER.debug('Adding a new timeout in %i ms', _timeout_interval)
_maybe_stop_timeout()
_timeout = _io_loop.add_timeout(
_io_loop.time() + _timeout_interval / 1000.0, _on_timeout)
def _trigger_batch_write():
"""Stop a timeout if it's running, and then write the measurements."""
global _batch_future
LOGGER.debug('Batch write triggered (%r/%r)',
_buffer_size, _trigger_size)
_maybe_stop_timeout()
_maybe_warn_about_buffer_size()
_batch_future = _write_measurements()
return _batch_future
def _write_measurements():
"""Write out all of the metrics in each of the databases,
returning a future that will indicate all metrics have been written
@ -537,7 +611,7 @@ def _write_measurements():
:rtype: tornado.concurrent.Future
"""
global _writing
global _timeout, _writing
future = concurrent.TracebackFuture()
@ -568,6 +642,8 @@ def _write_measurements():
_measurements[database] = _measurements[database][_max_batch_size:]
# Create the request future
LOGGER.debug('Submitting %r measurements to %r',
len(measurements), url)
request = _http_client.fetch(
url, method='POST', body='\n'.join(measurements).encode('utf-8'))
@ -613,7 +689,8 @@ def _write_error_batch(batch, database, measurements):
future, batch, database, measurement, measurements)
def _write_error_batch_wait(future, batch, database, measurement, measurements):
def _write_error_batch_wait(future, batch, database, measurement,
measurements):
"""Invoked by the IOLoop, this method checks if the HTTP request future
created by :meth:`_write_error_batch` is done. If it's done it will
evaluate the result, logging any error and moving on to the next
@ -643,8 +720,9 @@ def _write_error_batch_wait(future, batch, database, measurement, measurements):
LOGGER.info('Bad %s measurement from batch %s: %s',
database, batch, measurement)
else:
LOGGER.error('Error submitting individual metric for %s from batch '
'%s to InfluxDB (%s): %s', database, batch, error.code)
LOGGER.error('Error submitting individual metric for %s from '
'batch %s to InfluxDB (%s): %s',
database, batch, error.code)
measurements = measurements + [measurement]
elif isinstance(error, (TimeoutError, OSError, socket.error,
select.error, ssl.socket_error)):

View file

@ -43,8 +43,7 @@ def clear_influxdb_module():
influxdb._measurements = {}
influxdb._max_batch_size = 5000
influxdb._max_clients = 10
influxdb._periodic_callback = None
influxdb._periodic_future = None
influxdb._timeout = None
influxdb._stopping = False
influxdb._warn_threshold = 5000
influxdb._writing = False
@ -89,7 +88,7 @@ class AsyncServerTestCase(testing.AsyncHTTPTestCase):
sprockets.clients.influxdb are flushed to the server.
"""
future = influxdb._on_periodic_callback()
future = influxdb._on_timeout()
if future:
self.io_loop.add_future(future, self.stop)
self.wait()

View file

@ -131,7 +131,7 @@ class MeasurementTestCase(base.AsyncServerTestCase):
future = concurrent.Future()
future.set_exception(OSError())
fetch.return_value = future
influxdb._on_periodic_callback()
influxdb._on_timeout()
self.assertEqual(influxdb._pending_measurements(), 1)
self.flush()
result = self.get_measurement()
@ -147,8 +147,8 @@ class MeasurementTestCase(base.AsyncServerTestCase):
measurement.set_field('test', test_value)
influxdb.add_measurement(measurement)
self.assertEqual(influxdb._pending_measurements(), 1)
future = influxdb._on_periodic_callback()
self.assertIsNone(influxdb._on_periodic_callback())
future = influxdb._on_timeout()
self.assertIsNone(influxdb._on_timeout())
self.assertEqual(influxdb._pending_measurements(), 0)
self.io_loop.add_future(future, self.stop)
self.wait()
@ -165,7 +165,7 @@ class MeasurementTestCase(base.AsyncServerTestCase):
measurement.set_field('test', test_value)
influxdb.add_measurement(measurement)
self.assertEqual(influxdb._pending_measurements(), 1)
future = influxdb._on_periodic_callback()
future = influxdb._on_timeout()
second_write = influxdb._write_measurements()
self.assertTrue(concurrent.is_future(second_write))
self.assertTrue(second_write.done())

View file

@ -46,9 +46,7 @@ class InstallDefaultsTestCase(base.TestCase):
self.assertEqual(influxdb._io_loop, global_io_loop)
def test_set_submission_interval(self):
expectation = 5000
self.assertEqual(influxdb._periodic_callback.callback_time,
expectation)
self.assertEqual(influxdb._timeout_interval, 60000)
class InstallCredentialsTestCase(base.TestCase):
@ -134,15 +132,12 @@ class SetConfigurationTestCase(base.AsyncTestCase):
self.assertTrue(influxdb._dirty)
@testing.gen_test()
def test_set_submission_interval(self):
def test_set_timeout(self):
io_loop = self.get_new_ioloop()
influxdb.install(io_loop=io_loop)
expectation = random.randint(1000, 10000)
previous = influxdb._periodic_callback
influxdb.set_submission_interval(expectation)
self.assertEqual(influxdb._periodic_callback.callback_time,
expectation)
self.assertNotEqual(influxdb._periodic_callback, previous)
influxdb.set_timeout(expectation)
self.assertEqual(influxdb._timeout_interval, expectation)
class MeasurementTests(unittest.TestCase):