Add batch submission probability

This commit is contained in:
Gavin M. Roy 2017-04-05 14:01:08 -04:00
parent cb705b847c
commit b94f14bd66
4 changed files with 126 additions and 32 deletions

View file

@ -27,35 +27,39 @@ documentation.
The following table details the environment variable configuration options.
+------------------------------+--------------------------------------------------+---------------+
| Variable | Definition | Default |
+==============================+==================================================+===============+
| ``INFLUXDB_SCHEME`` | The URL request scheme for making HTTP requests | ``https`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_HOST`` | The InfluxDB server hostname | ``localhost`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PORT`` | The InfluxDB server port | ``8086`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_USER`` | The InfluxDB server username | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PASSWORD`` | The InfluxDB server password | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_ENABLED`` | Set to ``false`` to disable InfluxDB support | ``true`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_INTERVAL`` | How many milliseconds to wait before submitting | ``60000`` |
| | measurements when the buffer has fewer than | |
| | ``INFLUXDB_TRIGGER_SIZE`` measurements. | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BATCH_SIZE`` | Max # of measurements to submit in a batch | ``10000`` |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BUFFER_SIZE`` | Limit of measurements in a buffer before new | ``25000`` |
| | measurements are discarded. | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TRIGGER_SIZE`` | The number of metrics in the buffer to trigger | ``60000`` |
| | the submission of a batch. | |
+------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TAG_HOSTNAME`` | Include the hostname as a tag in the measurement | ``true`` |
+------------------------------+--------------------------------------------------+---------------+
+---------------------------------+--------------------------------------------------+---------------+
| Variable | Definition | Default |
+=================================+==================================================+===============+
| ``INFLUXDB_SCHEME`` | The URL request scheme for making HTTP requests | ``https`` |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_HOST`` | The InfluxDB server hostname | ``localhost`` |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PORT`` | The InfluxDB server port | ``8086`` |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_USER`` | The InfluxDB server username | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_PASSWORD`` | The InfluxDB server password | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_ENABLED`` | Set to ``false`` to disable InfluxDB support | ``true`` |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_INTERVAL`` | How many milliseconds to wait before submitting | ``60000`` |
| | measurements when the buffer has fewer than | |
| | ``INFLUXDB_TRIGGER_SIZE`` measurements. | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BATCH_SIZE`` | Max # of measurements to submit in a batch | ``10000`` |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_MAX_BUFFER_SIZE`` | Limit of measurements in a buffer before new | ``25000`` |
| | measurements are discarded. | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_SAMPLE_PROBABILITY`` | A value that is >= 0 and <= 1.0 that specifies | ``1.0`` |
| | the probability that a batch will be submitted | |
| | to InfluxDB or dropped. | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TRIGGER_SIZE`` | The number of metrics in the buffer to trigger | ``60000`` |
| | the submission of a batch. | |
+---------------------------------+--------------------------------------------------+---------------+
| ``INFLUXDB_TAG_HOSTNAME`` | Include the hostname as a tag in the measurement | ``true`` |
+---------------------------------+--------------------------------------------------+---------------+
Mixin Configuration
^^^^^^^^^^^^^^^^^^^

View file

@ -17,6 +17,7 @@ batch currently being written, and a measurement is added to the buffer.
import contextlib
import logging
import os
import random
import select
import socket
import ssl
@ -29,7 +30,7 @@ except ImportError: # pragma: no cover
logging.critical('Could not import Tornado')
concurrent, httpclient, ioloop = None, None, None
version_info = (2, 0, 0)
version_info = (2, 1, 0)
__version__ = '.'.join(str(v) for v in version_info)
__all__ = ['__version__', 'version_info', 'add_measurement', 'flush',
'install', 'shutdown', 'Measurement']
@ -61,6 +62,7 @@ _measurements = {}
_max_batch_size = 10000
_max_buffer_size = 25000
_max_clients = 10
_sample_probability = 1.0
_stopping = False
_timeout_interval = 60000
_timeout = None
@ -196,7 +198,8 @@ def flush():
def install(url=None, auth_username=None, auth_password=None, io_loop=None,
submission_interval=None, max_batch_size=None, max_clients=10,
base_tags=None, max_buffer_size=None, trigger_size=None):
base_tags=None, max_buffer_size=None, trigger_size=None,
sample_probability=1.0):
"""Call this to install/setup the InfluxDB client collector. All arguments
are optional.
@ -227,6 +230,8 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
Default: ``25000``
:param int trigger_size: The minimum number of measurements that
are in the buffer before a batch can be submitted. Default: ``5000``
:param float sample_probability: Value between 0 and 1.0 specifying the
probability that a batch will be submitted (0.25 == 25%)
:returns: :data:`True` if the client was installed by this call
and :data:`False` otherwise.
@ -236,7 +241,7 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
"""
global _base_tags, _base_url, _credentials, _enabled, _installed, \
_io_loop, _max_batch_size, _max_buffer_size, _max_clients, \
_timeout, _timeout_interval, _trigger_size
_sample_probability, _timeout, _timeout_interval, _trigger_size
_enabled = os.environ.get('INFLUXDB_ENABLED', 'true') == 'true'
if not _enabled:
@ -269,6 +274,9 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
_max_clients = max_clients
_max_buffer_size = max_buffer_size or \
int(os.environ.get('INFLUXDB_MAX_BUFFER_SIZE', _max_buffer_size))
_sample_probability = sample_probability or \
float(os.environ.get('INFLUXDB_SAMPLE_PROBABILITY',
_sample_probability))
_trigger_size = trigger_size or \
int(os.environ.get('INFLUXDB_TRIGGER_SIZE', _trigger_size))
@ -279,6 +287,9 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
_base_tags.setdefault('environment', os.environ['ENVIRONMENT'])
_base_tags.update(base_tags or {})
# Seed the random number generator for batch sampling
random.seed()
# Don't let this run multiple times
_installed = True
@ -375,6 +386,25 @@ def set_max_clients(limit):
_max_clients = limit
def set_sample_probability(probability):
    """Set the probability that a batch will be submitted to the InfluxDB
    server. This should be a value that is greater than or equal to ``0`` and
    less than or equal to ``1.0``. A value of ``0.25`` would represent a
    probability of 25% that a batch would be written to InfluxDB.

    The value is coerced to :class:`float` before it is range checked, so
    the value that is validated is the same value that is stored.

    :param float probability: The value between 0 and 1.0 that represents the
        probability that a batch will be submitted to the InfluxDB server.
    :raises ValueError: if the coerced value is not within ``0`` to ``1.0``
        (``NaN`` fails the range check as well) or if a string that is not
        a number is passed in.

    """
    global _sample_probability

    # Coerce before comparing: on Python 3 comparing a str such as '0.5'
    # against a float raises TypeError, even though float('0.5') is valid.
    value = float(probability)
    if not 0.0 <= value <= 1.0:
        raise ValueError('Invalid probability value')
    LOGGER.debug('Setting sample probability to %.2f', value)
    _sample_probability = value
def set_timeout(milliseconds):
"""Override the maximum duration to wait for submitting measurements to
InfluxDB.
@ -581,6 +611,22 @@ def _pending_measurements():
return sum([len(_measurements[dbname]) for dbname in _measurements])
def _sample_batch():
    """Decide whether the pending batch should be submitted.

    When the batch is sampled out, up to ``_max_batch_size`` measurements
    are dropped from the front of the buffer of every database.

    :rtype: bool

    """
    # A probability of exactly 1.0 short-circuits without drawing a number
    submit = _sample_probability == 1.0
    if not submit:
        submit = random.random() < _sample_probability

    if not submit:
        # Rebind each buffer to a new list that excludes the dropped batch
        for database, pending in _measurements.items():
            _measurements[database] = pending[_max_batch_size:]
    return submit
def _start_timeout():
"""Stop a running timeout if it's there, then create a new one."""
global _timeout
@ -620,6 +666,9 @@ def _write_measurements():
future.set_result(False)
elif not _pending_measurements():
future.set_result(True)
elif not _sample_batch():
LOGGER.debug('Skipping batch submission due to sampling')
future.set_result(True)
# Exit early if there's an error condition
if future.done():

View file

@ -193,3 +193,32 @@ class MeasurementTestCase(base.AsyncServerTestCase):
self.flush()
value = self.get_measurement()
self.assertAlmostEqual(float(value.fields['duration-test']), 0.1, 1)
class SampleProbabilityTestCase(base.AsyncServerTestCase):
    """Exercise ``influxdb._sample_batch`` with probabilities 0.0 and 1.0."""

    @staticmethod
    def setup_batch():
        """Set the max batch size to 100 and add 1000 measurements that all
        share one randomly generated database and measurement name.

        """
        influxdb.set_max_batch_size(100)
        database, name = str(uuid.uuid4()), str(uuid.uuid4())
        for _ in range(1000):
            point = influxdb.Measurement(database, name)
            point.set_field('test', random.randint(1000, 2000))
            influxdb.add_measurement(point)

    def test_sample_batch_false(self):
        """With a probability of 0.0 the call returns False and the pending
        count drops by the max batch size (1000 -> 900).

        """
        influxdb.set_sample_probability(0.0)
        self.setup_batch()
        self.assertEqual(influxdb._pending_measurements(), 1000)
        self.assertFalse(influxdb._sample_batch())
        self.assertEqual(influxdb._pending_measurements(), 900)

    def test_sample_batch_true(self):
        """With a probability of 1.0 the call returns True and the pending
        count is unchanged.

        """
        influxdb.set_sample_probability(1.0)
        self.setup_batch()
        self.assertEqual(influxdb._pending_measurements(), 1000)
        self.assertTrue(influxdb._sample_batch())
        self.assertEqual(influxdb._pending_measurements(), 1000)

View file

@ -139,6 +139,18 @@ class SetConfigurationTestCase(base.AsyncTestCase):
influxdb.set_timeout(expectation)
self.assertEqual(influxdb._timeout_interval, expectation)
def test_set_sample_probability(self):
    """A randomly chosen probability passed to the setter is the value
    found in ``influxdb._sample_probability`` afterwards.

    """
    influxdb.install()
    probability = random.random()
    influxdb.set_sample_probability(probability)
    self.assertEqual(influxdb._sample_probability, probability)
def test_set_invalid_sample_probability(self):
    """Values above 1.0 and below 0 must each be rejected with
    ``ValueError``.

    """
    influxdb.install()
    # Each value needs its own assertRaises context: the context exits at
    # the first exception, so a second call in the same block never runs.
    for invalid in (2.0, -1.0):
        with self.assertRaises(ValueError):
            influxdb.set_sample_probability(invalid)
class MeasurementTests(unittest.TestCase):