mirror of
https://github.com/sprockets/sprockets-influxdb.git
synced 2024-11-15 03:00:24 +00:00
commit
5e8dbcdc50
5 changed files with 35 additions and 64 deletions
|
@ -1 +1 @@
|
|||
tornado>4.0,<5
|
||||
tornado>4.0,<6
|
||||
|
|
|
@ -62,7 +62,6 @@ _dirty = False
|
|||
_enabled = True
|
||||
_http_client = None
|
||||
_installed = False
|
||||
_io_loop = None
|
||||
_last_warning = None
|
||||
_measurements = {}
|
||||
_max_batch_size = 10000
|
||||
|
@ -222,7 +221,7 @@ def flush():
|
|||
:rtype: :class:`~tornado.concurrent.Future`
|
||||
|
||||
"""
|
||||
flush_future = concurrent.TracebackFuture()
|
||||
flush_future = concurrent.Future()
|
||||
if _batch_future and not _batch_future.done():
|
||||
LOGGER.debug('Flush waiting on incomplete _batch_future')
|
||||
_flush_wait(flush_future, _batch_future)
|
||||
|
@ -233,7 +232,7 @@ def flush():
|
|||
return flush_future
|
||||
|
||||
|
||||
def install(url=None, auth_username=None, auth_password=None, io_loop=None,
|
||||
def install(url=None, auth_username=None, auth_password=None,
|
||||
submission_interval=None, max_batch_size=None, max_clients=10,
|
||||
base_tags=None, max_buffer_size=None, trigger_size=None,
|
||||
sample_probability=1.0):
|
||||
|
@ -250,9 +249,6 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
|
|||
:param str auth_password: A password to use for InfluxDB authentication. If
|
||||
not specified, the ``INFLUXDB_PASSWORD`` environment variable will
|
||||
be used. Default: ``None``
|
||||
:param io_loop: A :class:`~tornado.ioloop.IOLoop` to use instead of the
|
||||
version returned by :meth:`~tornado.ioloop.IOLoop.current`
|
||||
:type io_loop: :class:`tornado.ioloop.IOLoop`
|
||||
:param int submission_interval: The maximum number of milliseconds to wait
|
||||
after the last batch submission before submitting a batch that is
|
||||
smaller than ``trigger_size``. Default: ``60000``
|
||||
|
@ -277,7 +273,7 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
|
|||
|
||||
"""
|
||||
global _base_tags, _base_url, _credentials, _enabled, _installed, \
|
||||
_io_loop, _max_batch_size, _max_buffer_size, _max_clients, \
|
||||
_max_batch_size, _max_buffer_size, _max_clients, \
|
||||
_sample_probability, _timeout, _timeout_interval, _trigger_size
|
||||
|
||||
_enabled = os.environ.get('INFLUXDB_ENABLED', 'true') == 'true'
|
||||
|
@ -303,7 +299,6 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
|
|||
'X' * len(os.environ['INFLUXDB_PASSWORD'])
|
||||
|
||||
# Submission related values
|
||||
_io_loop = io_loop or ioloop.IOLoop.current()
|
||||
_timeout_interval = submission_interval or \
|
||||
int(os.environ.get('INFLUXDB_INTERVAL', _timeout_interval))
|
||||
_max_batch_size = max_batch_size or \
|
||||
|
@ -365,23 +360,6 @@ def set_base_url(url):
|
|||
_dirty = True
|
||||
|
||||
|
||||
def set_io_loop(io_loop):
|
||||
"""Override the use of the default IOLoop.
|
||||
|
||||
:param tornado.ioloop.IOLoop io_loop: The IOLoop to use
|
||||
:raises: ValueError
|
||||
|
||||
"""
|
||||
global _dirty, _io_loop
|
||||
|
||||
if not isinstance(io_loop, ioloop.IOLoop):
|
||||
raise ValueError('Invalid io_loop value')
|
||||
|
||||
LOGGER.debug('Overriding the default IOLoop, using %r', io_loop)
|
||||
_dirty = True
|
||||
_io_loop = io_loop
|
||||
|
||||
|
||||
def set_max_batch_size(limit):
|
||||
"""Set a limit to the number of measurements that are submitted in
|
||||
a single batch that is submitted per databases.
|
||||
|
@ -454,7 +432,7 @@ def set_timeout(milliseconds):
|
|||
LOGGER.debug('Setting batch wait timeout to %i ms', milliseconds)
|
||||
_timeout_interval = milliseconds
|
||||
_maybe_stop_timeout()
|
||||
_timeout = _io_loop.add_timeout(milliseconds, _on_timeout)
|
||||
_timeout = ioloop.IOLoop.current().add_timeout(milliseconds, _on_timeout)
|
||||
|
||||
|
||||
def set_trigger_size(limit):
|
||||
|
@ -502,7 +480,7 @@ def _create_http_client():
|
|||
defaults['auth_password'] = auth_password
|
||||
|
||||
_http_client = httpclient.AsyncHTTPClient(
|
||||
force_instance=True, defaults=defaults, io_loop=_io_loop,
|
||||
force_instance=True, defaults=defaults,
|
||||
max_clients=_max_clients)
|
||||
|
||||
|
||||
|
@ -522,8 +500,9 @@ def _flush_wait(flush_future, write_future):
|
|||
return
|
||||
else:
|
||||
write_future = _write_measurements()
|
||||
_io_loop.add_timeout(
|
||||
_io_loop.time() + 0.25, _flush_wait, flush_future, write_future)
|
||||
ioloop.IOLoop.current().add_timeout(
|
||||
ioloop.IOLoop.current().time() + 0.25,
|
||||
_flush_wait, flush_future, write_future)
|
||||
|
||||
|
||||
def _futures_wait(wait_future, futures):
|
||||
|
@ -564,13 +543,14 @@ def _futures_wait(wait_future, futures):
|
|||
|
||||
# If there are futures that remain, try again in 100ms.
|
||||
if remaining:
|
||||
return _io_loop.add_timeout(
|
||||
_io_loop.time() + 0.1, _futures_wait, wait_future, remaining)
|
||||
return ioloop.IOLoop.current().add_timeout(
|
||||
ioloop.IOLoop.current().time() + 0.1,
|
||||
_futures_wait, wait_future, remaining)
|
||||
else: # Start the next timeout or trigger the next batch
|
||||
_buffer_size = _pending_measurements()
|
||||
LOGGER.debug('Batch submitted, %i measurements remain', _buffer_size)
|
||||
if _buffer_size >= _trigger_size:
|
||||
_io_loop.add_callback(_trigger_batch_write)
|
||||
ioloop.IOLoop.current().add_callback(_trigger_batch_write)
|
||||
elif _buffer_size:
|
||||
_start_timeout()
|
||||
|
||||
|
@ -587,7 +567,7 @@ def _maybe_stop_timeout():
|
|||
|
||||
if _timeout is not None:
|
||||
LOGGER.debug('Removing the pending timeout (%r)', _timeout)
|
||||
_io_loop.remove_timeout(_timeout)
|
||||
ioloop.IOLoop.current().remove_timeout(_timeout)
|
||||
_timeout = None
|
||||
|
||||
|
||||
|
@ -670,8 +650,9 @@ def _start_timeout():
|
|||
|
||||
LOGGER.debug('Adding a new timeout in %i ms', _timeout_interval)
|
||||
_maybe_stop_timeout()
|
||||
_timeout = _io_loop.add_timeout(
|
||||
_io_loop.time() + _timeout_interval / 1000.0, _on_timeout)
|
||||
_timeout = ioloop.IOLoop.current().add_timeout(
|
||||
ioloop.IOLoop.current().time() + _timeout_interval / 1000.0,
|
||||
_on_timeout)
|
||||
|
||||
|
||||
def _trigger_batch_write():
|
||||
|
@ -696,7 +677,7 @@ def _write_measurements():
|
|||
"""
|
||||
global _timeout, _writing
|
||||
|
||||
future = concurrent.TracebackFuture()
|
||||
future = concurrent.Future()
|
||||
|
||||
if _writing:
|
||||
LOGGER.warning('Currently writing measurements, skipping write')
|
||||
|
@ -771,8 +752,10 @@ def _write_error_batch(batch, database, measurements):
|
|||
url, method='POST', body=measurement.encode('utf-8'))
|
||||
|
||||
# Check in 25ms to see if it's done
|
||||
_io_loop.add_timeout(_io_loop.time() + 0.025, _write_error_batch_wait,
|
||||
future, batch, database, measurement, measurements)
|
||||
ioloop.IOLoop.current().add_timeout(
|
||||
ioloop.IOLoop.current().time() + 0.025,
|
||||
_write_error_batch_wait, future, batch, database, measurement,
|
||||
measurements)
|
||||
|
||||
|
||||
def _write_error_batch_wait(future, batch, database, measurement,
|
||||
|
@ -792,9 +775,10 @@ def _write_error_batch_wait(future, batch, database, measurement,
|
|||
|
||||
"""
|
||||
if not future.done():
|
||||
_io_loop.add_timeout(_io_loop.time() + 0.025, _write_error_batch_wait,
|
||||
future, batch, database, measurement,
|
||||
measurements)
|
||||
ioloop.IOLoop.current().add_timeout(
|
||||
ioloop.IOLoop.current().time() + 0.025,
|
||||
_write_error_batch_wait, future, batch, database, measurement,
|
||||
measurements)
|
||||
return
|
||||
|
||||
error = future.exception()
|
||||
|
|
|
@ -38,7 +38,6 @@ def clear_influxdb_module():
|
|||
influxdb._dirty = False
|
||||
influxdb._http_client = None
|
||||
influxdb._installed = False
|
||||
influxdb._io_loop = None
|
||||
influxdb._last_warning = None
|
||||
influxdb._measurements = {}
|
||||
influxdb._max_batch_size = 5000
|
||||
|
|
|
@ -41,10 +41,6 @@ class InstallDefaultsTestCase(base.TestCase):
|
|||
self.assertEqual(influxdb._http_client.defaults['user_agent'],
|
||||
influxdb.USER_AGENT)
|
||||
|
||||
def test_set_io_loop(self):
|
||||
global_io_loop = ioloop.IOLoop.current()
|
||||
self.assertEqual(influxdb._io_loop, global_io_loop)
|
||||
|
||||
def test_set_submission_interval(self):
|
||||
self.assertEqual(influxdb._timeout_interval, 60000)
|
||||
|
||||
|
@ -103,21 +99,6 @@ class SetConfigurationTestCase(base.AsyncTestCase):
|
|||
self.assertEqual(influxdb._base_url, expectation)
|
||||
self.assertTrue(influxdb._dirty)
|
||||
|
||||
def test_set_io_loop_invalid_raises(self):
|
||||
influxdb.install()
|
||||
with self.assertRaises(ValueError):
|
||||
influxdb.set_io_loop('bad value')
|
||||
|
||||
def test_set_io_loop(self):
|
||||
influxdb.install()
|
||||
previous = influxdb._io_loop
|
||||
io_loop = self.get_new_ioloop()
|
||||
|
||||
influxdb.set_io_loop(io_loop)
|
||||
self.assertEqual(influxdb._io_loop, io_loop)
|
||||
self.assertNotEqual(io_loop, previous)
|
||||
self.assertTrue(influxdb._dirty)
|
||||
|
||||
def test_set_max_batch_size(self):
|
||||
influxdb.install()
|
||||
expectation = random.randint(1000, 100000)
|
||||
|
@ -133,8 +114,7 @@ class SetConfigurationTestCase(base.AsyncTestCase):
|
|||
|
||||
@testing.gen_test()
|
||||
def test_set_timeout(self):
|
||||
io_loop = self.get_new_ioloop()
|
||||
influxdb.install(io_loop=io_loop)
|
||||
influxdb.install()
|
||||
expectation = random.randint(1000, 10000)
|
||||
influxdb.set_timeout(expectation)
|
||||
self.assertEqual(influxdb._timeout_interval, expectation)
|
||||
|
|
8
tox.ini
8
tox.ini
|
@ -2,6 +2,7 @@
|
|||
envlist =
|
||||
tornado42
|
||||
tornado45
|
||||
tornado51
|
||||
|
||||
[testenv]
|
||||
setenv =
|
||||
|
@ -22,3 +23,10 @@ deps =
|
|||
mock
|
||||
nose
|
||||
tornado==4.5.3
|
||||
|
||||
[testenv:tornado51]
|
||||
deps =
|
||||
coverage
|
||||
mock
|
||||
nose
|
||||
tornado==5.1.1
|
||||
|
|
Loading…
Reference in a new issue