Always uses IOLoop.current()

This commit is contained in:
Andrew Rabert 2018-11-21 23:21:38 -05:00
parent 09a203df47
commit d9a1e83675
3 changed files with 24 additions and 61 deletions

View file

@ -62,7 +62,6 @@ _dirty = False
_enabled = True
_http_client = None
_installed = False
_io_loop = None
_last_warning = None
_measurements = {}
_max_batch_size = 10000
@ -233,7 +232,7 @@ def flush():
return flush_future
def install(url=None, auth_username=None, auth_password=None, io_loop=None,
def install(url=None, auth_username=None, auth_password=None,
submission_interval=None, max_batch_size=None, max_clients=10,
base_tags=None, max_buffer_size=None, trigger_size=None,
sample_probability=1.0):
@ -250,9 +249,6 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
:param str auth_password: A password to use for InfluxDB authentication. If
not specified, the ``INFLUXDB_PASSWORD`` environment variable will
be used. Default: ``None``
:param io_loop: A :class:`~tornado.ioloop.IOLoop` to use instead of the
version returned by :meth:`~tornado.ioloop.IOLoop.current`
:type io_loop: :class:`tornado.ioloop.IOLoop`
:param int submission_interval: The maximum number of milliseconds to wait
after the last batch submission before submitting a batch that is
smaller than ``trigger_size``. Default: ``60000``
@ -277,7 +273,7 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
"""
global _base_tags, _base_url, _credentials, _enabled, _installed, \
_io_loop, _max_batch_size, _max_buffer_size, _max_clients, \
_max_batch_size, _max_buffer_size, _max_clients, \
_sample_probability, _timeout, _timeout_interval, _trigger_size
_enabled = os.environ.get('INFLUXDB_ENABLED', 'true') == 'true'
@ -303,7 +299,6 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
'X' * len(os.environ['INFLUXDB_PASSWORD'])
# Submission related values
_io_loop = io_loop or ioloop.IOLoop.current()
_timeout_interval = submission_interval or \
int(os.environ.get('INFLUXDB_INTERVAL', _timeout_interval))
_max_batch_size = max_batch_size or \
@ -365,23 +360,6 @@ def set_base_url(url):
_dirty = True
def set_io_loop(io_loop):
"""Override the use of the default IOLoop.
:param tornado.ioloop.IOLoop io_loop: The IOLoop to use
:raises: ValueError
"""
global _dirty, _io_loop
if not isinstance(io_loop, ioloop.IOLoop):
raise ValueError('Invalid io_loop value')
LOGGER.debug('Overriding the default IOLoop, using %r', io_loop)
_dirty = True
_io_loop = io_loop
def set_max_batch_size(limit):
"""Set a limit to the number of measurements that are submitted in
a single batch that is submitted per databases.
@ -454,7 +432,7 @@ def set_timeout(milliseconds):
LOGGER.debug('Setting batch wait timeout to %i ms', milliseconds)
_timeout_interval = milliseconds
_maybe_stop_timeout()
_timeout = _io_loop.add_timeout(milliseconds, _on_timeout)
_timeout = ioloop.IOLoop.current().add_timeout(milliseconds, _on_timeout)
def set_trigger_size(limit):
@ -502,7 +480,7 @@ def _create_http_client():
defaults['auth_password'] = auth_password
_http_client = httpclient.AsyncHTTPClient(
force_instance=True, defaults=defaults, io_loop=_io_loop,
force_instance=True, defaults=defaults,
max_clients=_max_clients)
@ -522,8 +500,9 @@ def _flush_wait(flush_future, write_future):
return
else:
write_future = _write_measurements()
_io_loop.add_timeout(
_io_loop.time() + 0.25, _flush_wait, flush_future, write_future)
ioloop.IOLoop.current().add_timeout(
ioloop.IOLoop.current().time() + 0.25,
_flush_wait, flush_future, write_future)
def _futures_wait(wait_future, futures):
@ -564,13 +543,14 @@ def _futures_wait(wait_future, futures):
# If there are futures that remain, try again in 100ms.
if remaining:
return _io_loop.add_timeout(
_io_loop.time() + 0.1, _futures_wait, wait_future, remaining)
return ioloop.IOLoop.current().add_timeout(
ioloop.IOLoop.current().time() + 0.1,
_futures_wait, wait_future, remaining)
else: # Start the next timeout or trigger the next batch
_buffer_size = _pending_measurements()
LOGGER.debug('Batch submitted, %i measurements remain', _buffer_size)
if _buffer_size >= _trigger_size:
_io_loop.add_callback(_trigger_batch_write)
ioloop.IOLoop.current().add_callback(_trigger_batch_write)
elif _buffer_size:
_start_timeout()
@ -587,7 +567,7 @@ def _maybe_stop_timeout():
if _timeout is not None:
LOGGER.debug('Removing the pending timeout (%r)', _timeout)
_io_loop.remove_timeout(_timeout)
ioloop.IOLoop.current().remove_timeout(_timeout)
_timeout = None
@ -670,8 +650,9 @@ def _start_timeout():
LOGGER.debug('Adding a new timeout in %i ms', _timeout_interval)
_maybe_stop_timeout()
_timeout = _io_loop.add_timeout(
_io_loop.time() + _timeout_interval / 1000.0, _on_timeout)
_timeout = ioloop.IOLoop.current().add_timeout(
ioloop.IOLoop.current().time() + _timeout_interval / 1000.0,
_on_timeout)
def _trigger_batch_write():
@ -771,8 +752,10 @@ def _write_error_batch(batch, database, measurements):
url, method='POST', body=measurement.encode('utf-8'))
# Check in 25ms to see if it's done
_io_loop.add_timeout(_io_loop.time() + 0.025, _write_error_batch_wait,
future, batch, database, measurement, measurements)
ioloop.IOLoop.current().add_timeout(
ioloop.IOLoop.current().time() + 0.025,
_write_error_batch_wait, future, batch, database, measurement,
measurements)
def _write_error_batch_wait(future, batch, database, measurement,
@ -792,9 +775,10 @@ def _write_error_batch_wait(future, batch, database, measurement,
"""
if not future.done():
_io_loop.add_timeout(_io_loop.time() + 0.025, _write_error_batch_wait,
future, batch, database, measurement,
measurements)
ioloop.IOLoop.current().add_timeout(
ioloop.IOLoop.current().time() + 0.025,
_write_error_batch_wait, future, batch, database, measurement,
measurements)
return
error = future.exception()

View file

@ -38,7 +38,6 @@ def clear_influxdb_module():
influxdb._dirty = False
influxdb._http_client = None
influxdb._installed = False
influxdb._io_loop = None
influxdb._last_warning = None
influxdb._measurements = {}
influxdb._max_batch_size = 5000

View file

@ -41,10 +41,6 @@ class InstallDefaultsTestCase(base.TestCase):
self.assertEqual(influxdb._http_client.defaults['user_agent'],
influxdb.USER_AGENT)
def test_set_io_loop(self):
global_io_loop = ioloop.IOLoop.current()
self.assertEqual(influxdb._io_loop, global_io_loop)
def test_set_submission_interval(self):
self.assertEqual(influxdb._timeout_interval, 60000)
@ -103,21 +99,6 @@ class SetConfigurationTestCase(base.AsyncTestCase):
self.assertEqual(influxdb._base_url, expectation)
self.assertTrue(influxdb._dirty)
def test_set_io_loop_invalid_raises(self):
influxdb.install()
with self.assertRaises(ValueError):
influxdb.set_io_loop('bad value')
def test_set_io_loop(self):
influxdb.install()
previous = influxdb._io_loop
io_loop = self.get_new_ioloop()
influxdb.set_io_loop(io_loop)
self.assertEqual(influxdb._io_loop, io_loop)
self.assertNotEqual(io_loop, previous)
self.assertTrue(influxdb._dirty)
def test_set_max_batch_size(self):
influxdb.install()
expectation = random.randint(1000, 100000)
@ -133,8 +114,7 @@ class SetConfigurationTestCase(base.AsyncTestCase):
@testing.gen_test()
def test_set_timeout(self):
io_loop = self.get_new_ioloop()
influxdb.install(io_loop=io_loop)
influxdb.install()
expectation = random.randint(1000, 10000)
influxdb.set_timeout(expectation)
self.assertEqual(influxdb._timeout_interval, expectation)