Submit measurements one at a time for a rejected batch

Better edge case exception handling
Gavin M. Roy 2016-09-23 11:32:28 -04:00
parent 1affb16eb6
commit ead27c3942
4 changed files with 124 additions and 32 deletions
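
In outline, the commit tags each batched write with a UUID and, when InfluxDB rejects the batch with a 400, resubmits the measurements one at a time so the offending line can be identified and logged; 5xx and socket-level errors put the measurements back on the stack for the next flush. Below is a minimal synchronous sketch of that strategy, assuming a hypothetical ``post`` helper; the real implementation in the diff is asynchronous and uses Tornado's AsyncHTTPClient.

    import logging
    import uuid

    LOGGER = logging.getLogger(__name__)


    def submit_batch(post, database, measurements):
        """Submit a batch; on a 400 rejection, retry one measurement at a time.

        ``post`` is a hypothetical callable returning an object with a ``code``
        attribute -- a stand-in for the module's HTTP client.
        """
        batch = str(uuid.uuid4())  # batch ID used to correlate log lines
        response = post(database, '\n'.join(measurements))
        if response.code == 400:  # InfluxDB rejected the whole batch
            for line in measurements:
                if post(database, line).code == 400:
                    LOGGER.error('Bad measurement in batch %s: %s', batch, line)
            return []  # bad lines are logged and dropped, the rest were written
        if response.code >= 500:  # transient server error: requeue for next flush
            return measurements
        return []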

View file

@@ -3,6 +3,10 @@
 Release History
 ===============
 
+`1.1.0`_ (23 Sep 2016)
+----------------------
+- Submit measurements one at a time for a rejected batch, logging error responses
+
 `1.0.7`_ (14 Sep 2016)
 ----------------------
 - Have a default content length for responses without one
@@ -24,7 +28,6 @@ Release History
 - Change the precision to second precision, per the InfluxDB docs (use the most
   coarse precision for better compression)
 `1.0.3`_ (13 Sep 2016)
 ----------------------
 - Add a response ``content_length`` field, an ``accept`` tag (if set in request
@@ -42,6 +45,8 @@ Release History
 ----------------------
 - Initial release
 
+.. _Next Release: https://github.com/sprockets/sprockets-influxdb/compare/1.1.0...master
+.. _1.1.0: https://github.com/sprockets/sprockets-influxdb/compare/1.0.7...1.1.0
 .. _1.0.7: https://github.com/sprockets/sprockets-influxdb/compare/1.0.6...1.0.7
 .. _1.0.6: https://github.com/sprockets/sprockets-influxdb/compare/1.0.5...1.0.6
 .. _1.0.5: https://github.com/sprockets/sprockets-influxdb/compare/1.0.4...1.0.5
@@ -50,4 +55,3 @@ Release History
 .. _1.0.2: https://github.com/sprockets/sprockets-influxdb/compare/1.0.1...1.0.2
 .. _1.0.1: https://github.com/sprockets/sprockets-influxdb/compare/1.0.0...1.0.1
 .. _1.0.0: https://github.com/sprockets/sprockets-influxdb/compare/0.0.0...1.0.0
-.. _Next Release: https://github.com/sprockets/sprockets-influxdb/compare/1.0.7...master

View file

@@ -11,3 +11,4 @@ exclude = env,build
 cover-branches = 1
 cover-erase = 1
 cover-package = sprockets_influxdb
+verbose = 1

View file

@@ -8,8 +8,11 @@ RequestHandler mixin.
 import contextlib
 import logging
 import os
+import select
 import socket
+import ssl
 import time
+import uuid
 
 try:
     from tornado import concurrent, httpclient, ioloop
@@ -17,7 +20,7 @@ except ImportError:  # pragma: no cover
     logging.critical('Could not import Tornado')
     concurrent, httpclient, ioloop = None, None, None
 
-version_info = (1, 0, 7)
+version_info = (1, 1, 0)
 __version__ = '.'.join(str(v) for v in version_info)
 __all__ = ['__version__', 'version_info', 'add_measurement', 'flush',
            'install', 'shutdown', 'Measurement']
@@ -387,21 +390,27 @@ def _futures_wait(wait_future, futures):
     global _writing
 
     remaining = []
-    for (future, database, measurements) in futures:
+    for (future, batch, database, measurements) in futures:
 
         # If the future hasn't completed, add it to the remaining stack
         if not future.done():
-            remaining.append((future, database, measurements))
+            remaining.append((future, batch, database, measurements))
            continue
 
         # Get the result of the HTTP request, processing any errors
-        try:
-            result = future.result()
-        except (httpclient.HTTPError, OSError, socket.error) as error:
-            _on_request_error(error, database, measurements)
-        else:
-            if result.code >= 400:
-                _on_request_error(result.code, database, measurements)
+        error = future.exception()
+        if isinstance(error, httpclient.HTTPError):
+            if error.code == 400:
+                _write_error_batch(batch, database, measurements)
+            elif error.code >= 500:
+                _on_5xx_error(batch, error, database, measurements)
+            else:
+                LOGGER.error('Error submitting %s batch %s to InfluxDB (%s): '
+                             '%s', database, batch, error.code,
+                             error.response.body)
+        elif isinstance(error, (TimeoutError, OSError, socket.error,
+                                select.error, ssl.socket_error)):
+            _on_5xx_error(batch, error, database, measurements)
 
     # If there are futures that remain, try again in 100ms.
     if remaining:
@@ -427,6 +436,21 @@ def _maybe_warn_about_buffer_size():
         LOGGER.warning('InfluxDB measurement buffer has %i entries', count)
 
 
+def _on_5xx_error(batch, error, database, measurements):
+    """Handle a batch submission error, logging the problem and adding the
+    measurements back to the stack.
+
+    :param str batch: The batch ID
+    :param mixed error: The error that was returned
+    :param str database: The database the submission failed for
+    :param list measurements: The measurements to add back to the stack
+
+    """
+    LOGGER.info('Appending %s measurements to stack due to batch %s %r',
+                database, batch, error)
+    _measurements[database] = _measurements[database] + measurements
+
+
 def _on_periodic_callback():
     """Invoked periodically to ensure that metrics that have been collected
     are submitted to InfluxDB. If metrics are still being written when it
@@ -446,19 +470,6 @@ def _on_periodic_callback():
     return _periodic_future
 
 
-def _on_request_error(error, database, measurements):
-    """Handle a batch submission error, logging the problem and adding the
-    measurements back to the stack.
-
-    :param mixed error: The error that was returned
-    :param str database: The database the submission failed for
-    :param list measurements: The measurements to add back to the stack
-
-    """
-    LOGGER.error('Error submitting batch to %s: %r', database, error)
-    _measurements[database] = measurements + _measurements[database]
-
-
 def _pending_measurements():
     """Return the number of measurements that have not been submitted to
     InfluxDB.
@@ -512,7 +523,7 @@ def _write_measurements():
            url, method='POST', body='\n'.join(measurements).encode('utf-8'))
 
         # Keep track of each request in our future stack
-        futures.append((request, database, measurements))
+        futures.append((request, str(uuid.uuid4()), database, measurements))
 
     # Start the wait cycle for all the requests to complete
     _writing = True
@@ -521,6 +532,87 @@ def _write_measurements():
     return future
 
 
+def _write_error_batch(batch, database, measurements):
+    """Invoked when a batch submission fails, this method will submit one
+    measurement to InfluxDB. It then adds a timeout to the IOLoop which will
+    invoke :meth:`_write_error_batch_wait` which will evaluate the result and
+    then determine what to do next.
+
+    :param str batch: The batch ID for correlation purposes
+    :param str database: The database name for the measurements
+    :param list measurements: The measurements that failed to write as a batch
+
+    """
+    if not measurements:
+        LOGGER.info('All %s measurements from batch %s processed',
+                    database, batch)
+        return
+
+    LOGGER.debug('Processing batch %s for %s by measurement, %i left',
+                 batch, database, len(measurements))
+
+    url = '{}?db={}&precision=ms'.format(_base_url, database)
+    measurement = measurements.pop(0)
+
+    # Create the request future
+    future = _http_client.fetch(
+        url, method='POST', body=measurement.encode('utf-8'))
+
+    # Check in 25ms to see if it's done
+    _io_loop.add_timeout(_io_loop.time() + 0.025, _write_error_batch_wait,
+                         future, batch, database, measurement, measurements)
+
+
+def _write_error_batch_wait(future, batch, database, measurement, measurements):
+    """Invoked by the IOLoop, this method checks if the HTTP request future
+    created by :meth:`_write_error_batch` is done. If it's done, it will
+    evaluate the result, logging any error and moving on to the next
+    measurement. If there are no measurements left in the ``measurements``
+    argument, it will consider the batch complete.
+
+    :param tornado.concurrent.Future future: The AsyncHTTPClient request future
+    :param str batch: The batch ID
+    :param str database: The database name for the measurements
+    :param str measurement: The measurement the future is for
+    :param list measurements: The measurements that failed to write as a batch
+
+    """
+    if not future.done():
+        _io_loop.add_timeout(_io_loop.time() + 0.025, _write_error_batch_wait,
+                             future, batch, database, measurement,
+                             measurements)
+        return
+
+    error = future.exception()
+    if isinstance(error, httpclient.HTTPError):
+        if error.code == 400:
+            LOGGER.error('Error writing %s measurement from batch %s to '
+                         'InfluxDB (%s): %s', database, batch, error.code,
+                         error.response.body)
+            LOGGER.info('Bad %s measurement from batch %s: %s',
+                        database, batch, measurement)
+        else:
+            LOGGER.error('Error submitting individual metric for %s from '
+                         'batch %s to InfluxDB (%s)', database, batch,
+                         error.code)
+            measurements = measurements + [measurement]
+    elif isinstance(error, (TimeoutError, OSError, socket.error,
+                            select.error, ssl.socket_error)):
+        LOGGER.error('Error submitting individual metric for %s from batch '
+                     '%s to InfluxDB (%s)', database, batch, error)
+        measurements = measurements + [measurement]
+
+    if not measurements:
+        LOGGER.info('All %s measurements from batch %s processed',
+                    database, batch)
+        return
+
+    # Continue writing measurements
+    _write_error_batch(batch, database, measurements)
+
+
 class Measurement(object):
     """The :class:`Measurement` class represents what will become a single row
     in an InfluxDB database. Measurements are added to InfluxDB via the
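
For context, the retry path above is driven entirely through the module's public API (the names exported in ``__all__``: ``install``, ``add_measurement``, ``flush``, ``shutdown``, ``Measurement``). A rough usage sketch follows; the ``Measurement`` constructor arguments and the ``set_tag`` / ``set_field`` calls are assumptions here, not taken from this diff.

    import sprockets_influxdb as influxdb
    from tornado import ioloop, web


    class PingHandler(web.RequestHandler):

        def get(self):
            # Buffered measurements are flushed to InfluxDB in batches; the
            # per-measurement retry above only runs when a batch gets a 400.
            measurement = influxdb.Measurement('example', 'request')  # (database, name) assumed
            measurement.set_tag('handler', 'PingHandler')
            measurement.set_field('duration', self.request.request_time())
            influxdb.add_measurement(measurement)
            self.finish('pong')


    if __name__ == '__main__':
        influxdb.install()  # wires the periodic flush into the IOLoop
        web.Application([('/ping', PingHandler)]).listen(8888)
        ioloop.IOLoop.current().start()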

View file

@@ -1,7 +1,6 @@
 import base64
 import random
 import mock
-import time
 import uuid
 
 from tornado import concurrent, gen, httpclient
@@ -110,11 +109,7 @@ class MeasurementTestCase(base.AsyncServerTestCase):
         with mock.patch('tornado.httpclient.AsyncHTTPClient.fetch') as fetch:
             future = concurrent.Future()
             fetch.return_value = future
-            request = httpclient.HTTPRequest('http://localhost/write?db')
-            future.set_result(
-                httpclient.HTTPResponse(
-                    request, 599, error=OSError(),
-                    request_time=time.time() - request.start_time))
+            future.set_exception(httpclient.HTTPError(599, 'TestError'))
             self.flush()
             self.assertEqual(influxdb._pending_measurements(), 1)
             self.flush()
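
The commit does not add a test for the new 400 path; a hypothetical sketch in the same harness might look like the following, assuming ``self.flush()`` drives the IOLoop far enough to resolve the mocked write future, just as it does in the 599 test above.

    def test_http_400_routes_batch_to_error_handler(self):
        # Hypothetical sketch, not part of this commit: a 400 rejection should
        # be handed to _write_error_batch rather than re-queued wholesale.
        with mock.patch('sprockets_influxdb._write_error_batch') as error_batch:
            with mock.patch('tornado.httpclient.AsyncHTTPClient.fetch') as fetch:
                future = concurrent.Future()
                fetch.return_value = future
                future.set_exception(httpclient.HTTPError(400, 'Bad Request'))
                self.flush()
                self.assertTrue(error_batch.called)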