Change batch submission logic

- Submit batches serially while the count of buffered measurements exceeds a configurable threshold (the minimum triggered batch size). Individual batches can be larger than this value, up to the maximum batch size.
- If there are measurements in the buffer but the threshold is not met after N seconds (60 by default), submit a batch that is smaller than the minimum triggered batch size.
- Rename sprockets_influxdb.set_submission_interval to sprockets_influxdb.set_timeout
- Add new sprockets_influxdb.set_trigger_size (a usage sketch of the new configuration API follows the commit metadata below)
- Move from a single periodic callback object to adding a timeout to the ioloop
- Update tests to work with the new structure
- Add more detailed debugging information
Gavin M. Roy 2016-11-09 15:50:27 -05:00
parent 6d639b436d
commit f671b7029c
4 changed files with 146 additions and 77 deletions
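
As a usage sketch (added for illustration, not part of this commit), here is how the reworked batching behaviour could be configured through the public API this diff introduces. The threshold and timeout values are examples, and the Measurement(database, name) constructor signature is assumed, since the diff only shows set_field/set_tag calls.

# --- usage sketch (illustration only, not part of this commit) ---
import sprockets_influxdb as influxdb

# Install the client: a batch is written once 5,000 measurements are buffered,
# or 60,000 ms after the last submission, whichever comes first.
influxdb.install(trigger_size=5000,
                 submission_interval=60000,
                 max_batch_size=10000)

# The same knobs can be adjusted after installation.
influxdb.set_trigger_size(2500)
influxdb.set_timeout(30000)

# Buffer a measurement; a write is triggered automatically once the buffer
# reaches the trigger size (constructor signature assumed, see note above).
measurement = influxdb.Measurement('example-db', 'request-duration')
measurement.set_tag('endpoint', '/users')
measurement.set_field('duration', 0.012)
influxdb.add_measurement(measurement)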


@@ -4,6 +4,15 @@ Sprockets InfluxDB
`sprockets_influxdb` includes both a buffering InfluxDB client and a Tornado
RequestHandler mixin.
Measurements will be sent in batches to InfluxDB when there are
``INFLUXDB_TRIGGER_SIZE`` measurements in the buffer or after
``INFLUXDB_INTERVAL`` milliseconds have passed since the last measurement was
added, whichever occurs first.
The timeout timer for submitting a buffer of < ``INFLUXDB_TRIGGER_SIZE``
measurements is only started when there isn't an active timer, there is not a
batch currently being written, and a measurement is added to the buffer.
"""
import contextlib
import logging
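
The environment variables named in the docstring above are read by install() later in this diff; as a hedged illustration, configuration could also be supplied entirely through the environment before install() is called. The values shown are examples only.

# --- usage sketch (illustration only, not part of this commit) ---
import os

os.environ['INFLUXDB_TRIGGER_SIZE'] = '5000'      # buffered count that triggers a write
os.environ['INFLUXDB_INTERVAL'] = '60000'         # ms to wait before writing a smaller batch
os.environ['INFLUXDB_MAX_BATCH_SIZE'] = '10000'   # cap on a single HTTP submission
os.environ['INFLUXDB_MAX_BUFFER_SIZE'] = '25000'  # new measurements are discarded beyond this

import sprockets_influxdb as influxdb
influxdb.install()  # picks up the values above when keyword arguments are omitted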
@@ -20,7 +29,7 @@ except ImportError: # pragma: no cover
logging.critical('Could not import Tornado')
concurrent, httpclient, ioloop = None, None, None
version_info = (1, 4, 0)
version_info = (2, 0, 0)
__version__ = '.'.join(str(v) for v in version_info)
__all__ = ['__version__', 'version_info', 'add_measurement', 'flush',
'install', 'shutdown', 'Measurement']
@@ -39,6 +48,7 @@ except NameError: # Python 2.7 compatibility
_base_tags = {}
_base_url = 'http://localhost:8086/write'
_batch_future = None
_buffer_size = 0
_credentials = None, None
_dirty = False
@@ -48,13 +58,14 @@ _installed = False
_io_loop = None
_last_warning = None
_measurements = {}
_max_batch_size = 5000
_max_buffer_size = 20000
_max_batch_size = 10000
_max_buffer_size = 25000
_max_clients = 10
_periodic_callback = None
_periodic_future = None
_stopping = False
_warn_threshold = 5000
_timeout_interval = 60000
_timeout = None
_trigger_size = 5000
_warn_threshold = 15000
_writing = False
@@ -93,8 +104,8 @@ class InfluxDBMixin(object):
for handler in handlers:
match = handler.regex.match(request.path)
if match:
self.influxdb.set_tag('endpoint',
handler.regex.pattern.rstrip('$'))
self.influxdb.set_tag(
'endpoint', handler.regex.pattern.rstrip('$'))
break
def on_finish(self):
@@ -127,6 +138,8 @@ def add_measurement(measurement):
measurement to add to the buffer for submission to InfluxDB.
"""
global _buffer_size
if not _enabled:
LOGGER.debug('Discarding measurement for %s while not enabled',
measurement.database)
@@ -149,7 +162,16 @@ def add_measurement(measurement):
value = measurement.marshall()
_measurements[measurement.database].append(value)
_maybe_warn_about_buffer_size()
# Ensure batches with len(measurements) < _trigger_size still get written
if not _timeout:
if (_batch_future and _batch_future.done()) or not _batch_future:
_start_timeout()
# Check to see if the batch should be triggered
_buffer_size = _pending_measurements()
if _buffer_size >= _trigger_size:
_trigger_batch_write()
def flush():
@@ -161,20 +183,20 @@ def flush():
:rtype: :class:`~tornado.concurrent.Future`
"""
LOGGER.debug('Flushing')
flush_future = concurrent.TracebackFuture()
if _periodic_future and not _periodic_future.done():
LOGGER.debug('Waiting on _periodic_future instead')
write_future = _periodic_future
if _batch_future and not _batch_future.done():
LOGGER.debug('Flush waiting on incomplete _batch_future')
_flush_wait(flush_future, _batch_future)
else:
write_future = _write_measurements()
_flush_wait(flush_future, write_future)
LOGGER.info('Flushing buffer with %i measurements to InfluxDB',
_pending_measurements())
_flush_wait(flush_future, _write_measurements())
return flush_future
def install(url=None, auth_username=None, auth_password=None, io_loop=None,
submission_interval=None, max_batch_size=None, max_clients=10,
base_tags=None, max_buffer_size=None):
base_tags=None, max_buffer_size=None, trigger_size=None):
"""Call this to install/setup the InfluxDB client collector. All arguments
are optional.
@@ -191,8 +213,9 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
:param io_loop: A :class:`~tornado.ioloop.IOLoop` to use instead of the
version returned by :meth:`~tornado.ioloop.IOLoop.current`
:type io_loop: :class:`tornado.ioloop.IOLoop`
:param int submission_interval: How often to submit metric batches in
milliseconds. Default: ``5000``
:param int submission_interval: The maximum number of milliseconds to wait
after the last batch submission before submitting a batch that is
smaller than ``trigger_size``. Default: ``60000``
:param int max_batch_size: The number of measurements to be submitted in a
single HTTP request. Default: ``1000``
:param int max_clients: The number of simultaneous batch submissions that
@@ -201,6 +224,8 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
measurement. Default: ``None``
:param int max_buffer_size: The maximum number of pending measurements
in the buffer before new measurements are discarded.
:param int trigger_size: The minimum number of measurements that
are in the buffer before a batch can be submitted.
:returns: :data:`True` if the client was installed by this call
and :data:`False` otherwise.
@@ -210,7 +235,7 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
"""
global _base_tags, _base_url, _credentials, _enabled, _installed, \
_io_loop, _max_batch_size, _max_buffer_size, _max_clients, \
_periodic_callback
_timeout, _timeout_interval, _trigger_size
_enabled = os.environ.get('INFLUXDB_ENABLED', 'true') == 'true'
if not _enabled:
@@ -236,16 +261,15 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
# Submission related values
_io_loop = io_loop or ioloop.IOLoop.current()
interval = submission_interval or \
int(os.environ.get('INFLUXDB_INTERVAL', 5000))
_timeout_interval = submission_interval or \
int(os.environ.get('INFLUXDB_INTERVAL', _timeout_interval))
_max_batch_size = max_batch_size or \
int(os.environ.get('INFLUXDB_MAX_BATCH_SIZE', 5000))
int(os.environ.get('INFLUXDB_MAX_BATCH_SIZE', _max_batch_size))
_max_clients = max_clients
_max_buffer_size = max_buffer_size or \
int(os.environ.get('INFLUXDB_MAX_BUFFER_SIZE', 20000))
_periodic_callback = ioloop.PeriodicCallback(
_on_periodic_callback, interval, _io_loop)
int(os.environ.get('INFLUXDB_MAX_BUFFER_SIZE', _max_buffer_size))
_trigger_size = trigger_size or \
int(os.environ.get('INFLUXDB_TRIGGER_SIZE', _trigger_size))
# Set the base tags
if os.environ.get('INFLUXDB_TAG_HOSTNAME', 'true') == 'true':
@@ -254,12 +278,12 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
_base_tags.setdefault('environment', os.environ['ENVIRONMENT'])
_base_tags.update(base_tags or {})
# Start the periodic callback on IOLoop start
_io_loop.add_callback(_periodic_callback.start)
# Don't let this run multiple times
_installed = True
LOGGER.info('sprockets_influxdb v%s installed; %i measurements or %.2f '
'seconds will trigger batch submission', __version__,
_trigger_size, _timeout_interval / 1000.0)
return True
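
A brief, hedged illustration of the base-tag handling shown above: the hostname tag and the ENVIRONMENT variable are applied automatically, and base_tags merges additional tags into every measurement. The 'service' tag name and 'staging' value are examples, not taken from the diff.

# --- usage sketch (illustration only, not part of this commit) ---
import os
os.environ['ENVIRONMENT'] = 'staging'   # becomes the 'environment' base tag

import sprockets_influxdb as influxdb
influxdb.install(base_tags={'service': 'example-api'})  # merged into the base tags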
@@ -350,21 +374,32 @@ def set_max_clients(limit):
_max_clients = limit
def set_submission_interval(seconds):
"""Override how often to submit measurements to InfluxDB.
def set_timeout(milliseconds):
"""Override the maximum duration to wait for submitting measurements to
InfluxDB.
:param int seconds: How often to wait in seconds
:param int milliseconds: Maximum wait in milliseconds
"""
global _periodic_callback
global _timeout, _timeout_interval
LOGGER.debug('Setting submission interval to %s seconds', seconds)
if _periodic_callback.is_running():
_periodic_callback.stop()
_periodic_callback = ioloop.PeriodicCallback(_on_periodic_callback,
seconds, _io_loop)
# Start the periodic callback on IOLoop start if it's not already started
_io_loop.add_callback(_periodic_callback.start)
LOGGER.debug('Setting batch wait timeout to %i ms', milliseconds)
_timeout_interval = milliseconds
_maybe_stop_timeout()
_timeout = _io_loop.add_timeout(milliseconds, _on_timeout)
def set_trigger_size(limit):
"""Set the number of pending measurements that trigger the writing of data
to InfluxDB
:param int limit: The minimum number of measurements to trigger a batch
"""
global _trigger_size
LOGGER.debug('Setting trigger buffer size to %i', limit)
_trigger_size = limit
def shutdown():
@@ -384,10 +419,7 @@ def shutdown():
return
_stopping = True
if _periodic_callback.is_running():
_periodic_callback.stop()
LOGGER.info('Stopped periodic measurement submission and writing current '
'buffer to InfluxDB')
_maybe_stop_timeout()
return flush()
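
shutdown() now removes any pending timeout and returns the flush() future, so a hedged sketch of application teardown in a Tornado coroutine looks like this (the stop() helper is illustrative):

# --- usage sketch (illustration only, not part of this commit) ---
from tornado import gen, ioloop

import sprockets_influxdb as influxdb


@gen.coroutine
def stop():
    yield influxdb.shutdown()        # waits for the final batch write to finish
    ioloop.IOLoop.current().stop()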
@@ -410,8 +442,10 @@ def _flush_wait(flush_future, write_future):
"""Pause briefly allowing any pending metric writes to complete before
shutting down.
:param future tornado.concurrent.TracebackFuture: The future to resolve
:param tornado.concurrent.Future flush_future: The future to resolve
when the shutdown is complete.
:param tornado.concurrent.Future write_future: The future that is for the
current batch write operation.
"""
if write_future.done():
@@ -435,7 +469,7 @@ def _futures_wait(wait_future, futures):
:param list futures: The list of futures to watch for completion
"""
global _writing
global _buffer_size, _writing
remaining = []
for (future, batch, database, measurements) in futures:
@@ -464,22 +498,41 @@ def _futures_wait(wait_future, futures):
if remaining:
return _io_loop.add_timeout(
_io_loop.time() + 0.1, _futures_wait, wait_future, remaining)
else: # Start the next timeout or trigger the next batch
_buffer_size = _pending_measurements()
LOGGER.debug('Batch submitted, %i measurements remain', _buffer_size)
if _buffer_size >= _trigger_size:
_io_loop.add_callback(_trigger_batch_write)
elif _buffer_size:
_start_timeout()
_writing = False
wait_future.set_result(True)
def _maybe_stop_timeout():
"""If there is a pending timeout, remove it from the IOLoop and set the
``_timeout`` global to None.
"""
global _timeout
if _timeout is not None:
LOGGER.debug('Removing the pending timeout (%r)', _timeout)
_io_loop.remove_timeout(_timeout)
_timeout = None
def _maybe_warn_about_buffer_size():
"""Check the buffer size and issue a warning if it's too large and
a warning has not been issued for more than 60 seconds.
"""
global _buffer_size, _last_warning
global _last_warning
if not _last_warning:
_last_warning = time.time()
_buffer_size = _pending_measurements()
if _buffer_size > _warn_threshold and (time.time() - _last_warning) > 120:
LOGGER.warning('InfluxDB measurement buffer has %i entries',
_buffer_size)
@@ -500,23 +553,21 @@ def _on_5xx_error(batch, error, database, measurements):
_measurements[database] = _measurements[database] + measurements
def _on_periodic_callback():
def _on_timeout():
"""Invoked periodically to ensure that metrics that have been collected
are submitted to InfluxDB. If metrics are still being written when it
is invoked, pass until the next time.
are submitted to InfluxDB.
:rtype: tornado.concurrent.Future
:rtype: tornado.concurrent.Future or None
"""
global _periodic_future
global _buffer_size
if isinstance(_periodic_future, concurrent.Future) \
and not _periodic_future.done():
LOGGER.warning('Metrics are currently being written, '
'skipping write interval')
return
_periodic_future = _write_measurements()
return _periodic_future
LOGGER.debug('No metrics submitted in the last %.2f seconds',
_timeout_interval / 1000.0)
_buffer_size = _pending_measurements()
if _buffer_size:
return _trigger_batch_write()
_start_timeout()
def _pending_measurements():
@@ -529,6 +580,28 @@ def _pending_measurements():
return sum([len(_measurements[dbname]) for dbname in _measurements])
def _start_timeout():
"""Stop a running timeout if it's there, then create a new one."""
global _timeout
LOGGER.debug('Adding a new timeout in %i ms', _timeout_interval)
_maybe_stop_timeout()
_timeout = _io_loop.add_timeout(_io_loop.time() + _timeout_interval / 1000,
_on_timeout)
def _trigger_batch_write():
"""Stop a timeout if it's running, and then write the measurements."""
global _batch_future
LOGGER.debug('Batch write triggered (%r/%r)',
_buffer_size, _trigger_size)
_maybe_stop_timeout()
_maybe_warn_about_buffer_size()
_batch_future = _write_measurements()
return _batch_future
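
For readers less familiar with the Tornado primitives used by _start_timeout() and _maybe_stop_timeout(), the following self-contained sketch (illustration only; names are not from the module) shows the underlying pattern: a one-shot IOLoop timeout that is cancelled with remove_timeout() and re-armed explicitly, in contrast to the PeriodicCallback this commit removes.

# --- illustration of the underlying Tornado pattern (not from this commit) ---
from tornado import ioloop

INTERVAL_MS = 60000
_handle = None  # opaque handle returned by add_timeout()


def _reschedule():
    """Cancel any pending timeout, then arm a new one-shot timeout."""
    global _handle
    loop = ioloop.IOLoop.current()
    if _handle is not None:
        loop.remove_timeout(_handle)
    _handle = loop.add_timeout(loop.time() + INTERVAL_MS / 1000.0, _fire)


def _fire():
    # Unlike PeriodicCallback, a one-shot timeout must be re-armed explicitly.
    print('timeout fired; do the batched work here')
    _reschedule()


if __name__ == '__main__':
    _reschedule()
    ioloop.IOLoop.current().start()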
def _write_measurements():
"""Write out all of the metrics in each of the databases,
returning a future that will indicate all metrics have been written
@@ -537,7 +610,7 @@ def _write_measurements():
:rtype: tornado.concurrent.Future
"""
global _writing
global _timeout, _writing
future = concurrent.TracebackFuture()
@@ -568,6 +641,8 @@ def _write_measurements():
_measurements[database] = _measurements[database][_max_batch_size:]
# Create the request future
LOGGER.debug('Submitting %r measurements to %r',
len(measurements), url)
request = _http_client.fetch(
url, method='POST', body='\n'.join(measurements).encode('utf-8'))


@@ -43,8 +43,7 @@ def clear_influxdb_module():
influxdb._measurements = {}
influxdb._max_batch_size = 5000
influxdb._max_clients = 10
influxdb._periodic_callback = None
influxdb._periodic_future = None
influxdb._timeout = None
influxdb._stopping = False
influxdb._warn_threshold = 5000
influxdb._writing = False
@@ -89,7 +88,7 @@ class AsyncServerTestCase(testing.AsyncHTTPTestCase):
sprockets.clients.influxdb are flushed to the server.
"""
future = influxdb._on_periodic_callback()
future = influxdb._on_timeout()
if future:
self.io_loop.add_future(future, self.stop)
self.wait()


@@ -131,7 +131,7 @@ class MeasurementTestCase(base.AsyncServerTestCase):
future = concurrent.Future()
future.set_exception(OSError())
fetch.return_value = future
influxdb._on_periodic_callback()
influxdb._on_timeout()
self.assertEqual(influxdb._pending_measurements(), 1)
self.flush()
result = self.get_measurement()
@@ -147,8 +147,8 @@ class MeasurementTestCase(base.AsyncServerTestCase):
measurement.set_field('test', test_value)
influxdb.add_measurement(measurement)
self.assertEqual(influxdb._pending_measurements(), 1)
future = influxdb._on_periodic_callback()
self.assertIsNone(influxdb._on_periodic_callback())
future = influxdb._on_timeout()
self.assertIsNone(influxdb._on_timeout())
self.assertEqual(influxdb._pending_measurements(), 0)
self.io_loop.add_future(future, self.stop)
self.wait()
@@ -165,7 +165,7 @@ class MeasurementTestCase(base.AsyncServerTestCase):
measurement.set_field('test', test_value)
influxdb.add_measurement(measurement)
self.assertEqual(influxdb._pending_measurements(), 1)
future = influxdb._on_periodic_callback()
future = influxdb._on_timeout()
second_write = influxdb._write_measurements()
self.assertTrue(concurrent.is_future(second_write))
self.assertTrue(second_write.done())


@@ -46,9 +46,7 @@ class InstallDefaultsTestCase(base.TestCase):
self.assertEqual(influxdb._io_loop, global_io_loop)
def test_set_submission_interval(self):
expectation = 5000
self.assertEqual(influxdb._periodic_callback.callback_time,
expectation)
self.assertEqual(influxdb._timeout_interval, 60000)
class InstallCredentialsTestCase(base.TestCase):
@@ -134,15 +132,12 @@ class SetConfigurationTestCase(base.AsyncTestCase):
self.assertTrue(influxdb._dirty)
@testing.gen_test()
def test_set_submission_interval(self):
def test_set_timeout(self):
io_loop = self.get_new_ioloop()
influxdb.install(io_loop=io_loop)
expectation = random.randint(1000, 10000)
previous = influxdb._periodic_callback
influxdb.set_submission_interval(expectation)
self.assertEqual(influxdb._periodic_callback.callback_time,
expectation)
self.assertNotEqual(influxdb._periodic_callback, previous)
influxdb.set_timeout(expectation)
self.assertEqual(influxdb._timeout_interval, expectation)
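
A hypothetical companion test (not part of this commit) could exercise the new set_trigger_size() in the same style as test_set_timeout above; it assumes the same base class and the random, testing and influxdb imports already used in this file.

# --- hypothetical test (illustration only, not in this commit) ---
class SetTriggerSizeTestCase(base.AsyncTestCase):

    @testing.gen_test()
    def test_set_trigger_size(self):
        io_loop = self.get_new_ioloop()
        influxdb.install(io_loop=io_loop)
        expectation = random.randint(1000, 10000)
        influxdb.set_trigger_size(expectation)
        self.assertEqual(influxdb._trigger_size, expectation)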
class MeasurementTests(unittest.TestCase):