Rework line protocol marshalling and more

- Rework how the line protocol is marshalled and support the various data types.
- Remove the accept tag
- Strip down content_type to the ``type/subtype`` format only
- Make ``correlation_id`` a field value and not tag
- Change the precision to second precision, per the InfluxDB docs (use the
    coarsest precision for better compression)
This commit is contained in:
Gavin M. Roy 2016-09-14 00:27:05 -04:00
parent 7fee1a1c83
commit b73679c57a
7 changed files with 128 additions and 91 deletions

View file

@ -3,6 +3,16 @@
Release History
===============
`1.0.4`_ (14 Sep 2016)
----------------------
- Rework how the line protocol is marshalled and support the various data types.
- Remove the accept tag
- Strip down content_type to the ``type/subtype`` format only
- Make ``correlation_id`` a field value and not tag
- Change the precision to second precision, per the InfluxDB docs (use the
coarsest precision for better compression)
`1.0.3`_ (13 Sep 2016)
----------------------
- Add a response ``content_length`` field, an ``accept`` tag (if set in request
@ -20,8 +30,9 @@ Release History
----------------------
- Initial release
.. _1.0.4: https://github.com/sprockets/sprockets-influxdb/compare/1.0.3...1.0.4
.. _1.0.3: https://github.com/sprockets/sprockets-influxdb/compare/1.0.2...1.0.3
.. _1.0.2: https://github.com/sprockets/sprockets-influxdb/compare/1.0.1...1.0.2
.. _1.0.1: https://github.com/sprockets/sprockets-influxdb/compare/1.0.0...1.0.1
.. _1.0.0: https://github.com/sprockets/sprockets-influxdb/compare/0.0.0...1.0.0
.. _Next Release: https://github.com/sprockets/sprockets-influxdb/compare/1.0.3...master
.. _Next Release: https://github.com/sprockets/sprockets-influxdb/compare/1.0.4...master

View file

@ -1 +1,2 @@
ietfparse>=1.3.0,<2
tornado>=4.0,<4.3

View file

@ -12,12 +12,13 @@ import socket
import time
try:
from ietfparse import headers
from tornado import concurrent, httpclient, ioloop
except ImportError: # pragma: no cover
logging.critical('Could not import Tornado')
concurrent, httpclient, ioloop = None, None, None
headers, concurrent, httpclient, ioloop = None, None, None, None
version_info = (1, 0, 3)
version_info = (1, 0, 4)
__version__ = '.'.join(str(v) for v in version_info)
__all__ = ['__version__', 'version_info', 'add_measurement', 'flush',
'install', 'shutdown', 'Measurement']
@ -72,9 +73,6 @@ class InfluxDBMixin(object):
super(InfluxDBMixin, self).__init__(application, request, **kwargs)
handler = '{}.{}'.format(self.__module__, self.__class__.__name__)
self.influxdb.set_tags({'handler': handler, 'method': request.method})
if request.headers.get('Accept'):
accept = request.headers.get('Accept').split(';')[0].strip()
self.influxdb.set_tag('accept', accept)
for host, handlers in application.handlers:
if not host.match(request.host):
continue
@ -86,18 +84,16 @@ class InfluxDBMixin(object):
break
def on_finish(self):
if hasattr(self, 'correlation_id'):
self.influxdb.set_tag('correlation_id', self.correlation_id)
self.influxdb.set_tag('status_code', self._status_code)
self.influxdb.set_field('duration', self.request.request_time())
self.influxdb.set_field('content_length',
int(self._headers['Content-Length']))
try:
ctype = self._headers['Content-Type'].decode('utf-8')
except (AttributeError, TypeError):
pass
else:
self.influxdb.set_tag('content_type', ctype.split(';')[0].strip())
if hasattr(self, 'correlation_id'):
self.influxdb.set_field('correlation_id', self.correlation_id)
self.influxdb.set_field('duration', self.request.request_time())
parsed = headers.parse_content_type(self._headers['Content-Type'])
self.influxdb.set_tag(
'content_type', '{}/{}'.format(
parsed.content_type, parsed.content_subtype))
self.influxdb.set_tag('status_code', self._status_code)
add_measurement(self.influxdb)
@ -132,16 +128,8 @@ def add_measurement(measurement):
if measurement.database not in _measurements:
_measurements[measurement.database] = []
tags = ','.join(['{}={}'.format(k, v)
for k, v in measurement.tags.items()])
fields = ' '.join(['{}={}'.format(k, v)
for k, v in measurement.fields.items()])
LOGGER.debug('Appending measurement to %s', measurement.database)
_measurements[measurement.database].append(
'{},{} {} {:d}'.format(
measurement.name, tags, fields, int(time.time() * 1000000000)))
value = measurement.marshall()
_measurements[measurement.database].append(value)
_maybe_warn_about_buffer_size()
@ -230,8 +218,6 @@ def install(url=None, auth_username=None, auth_password=None, io_loop=None,
_base_tags.setdefault('hostname', socket.gethostname())
if os.environ.get('ENVIRONMENT'):
_base_tags.setdefault('environment', os.environ['ENVIRONMENT'])
if os.environ.get('SERVICE'):
_base_tags.setdefault('service', os.environ['SERVICE'])
_base_tags.update(base_tags or {})
# Start the periodic callback on IOLoop start
@ -373,20 +359,6 @@ def _create_http_client():
max_clients=_max_clients)
def _escape_str(value):
    r"""Escape the value with InfluxDB's wonderful escaping logic:

    "Measurement names, tag keys, and tag values must escape any spaces or
    commas using a backslash (\). For example: \ and \,. All tag values are
    stored as strings and should not be surrounded in quotes."

    :param str value: The value to be escaped
    :rtype: str

    """
    # Use explicit doubled backslashes: the original '\ ' and '\,'
    # literals only worked because Python keeps unrecognized escape
    # sequences verbatim, which raises DeprecationWarning/SyntaxWarning
    # on modern interpreters. Behavior is byte-identical.
    return str(value).replace(' ', '\\ ').replace(',', '\\,')
def _flush_wait(flush_future, write_future):
"""Pause briefly allowing any pending metric writes to complete before
shutting down.
@ -517,7 +489,6 @@ def _write_measurements():
LOGGER.warning('Currently writing measurements, skipping write')
future.set_result(False)
elif not _pending_measurements():
LOGGER.debug('No pending measurements, skipping write')
future.set_result(True)
# Exit early if there's an error condition
@ -532,7 +503,7 @@ def _write_measurements():
# Submit a batch for each database
for database in _measurements:
url = '{}?db={}'.format(_base_url, database)
url = '{}?db={}&precision=seconds'.format(_base_url, database)
# Get the measurements to submit
measurements = _measurements[database][:_max_batch_size]
@ -577,7 +548,7 @@ class Measurement(object):
"""
def __init__(self, database, name):
self.database = database
self.name = _escape_str(name)
self.name = name
self.fields = {}
self.tags = dict(_base_tags)
@ -598,17 +569,31 @@ class Measurement(object):
finally:
self.set_field(name, max(time.time(), start) - start)
def marshall(self):
    """Serialize the measurement into InfluxDB line protocol format.

    :rtype: str

    """
    tag_pairs = []
    for key, value in self.tags.items():
        tag_pairs.append(
            '{}={}'.format(self._escape(key), self._escape(value)))
    # Second precision: the write URL requests the coarsest precision
    # for better compression on the InfluxDB side.
    timestamp = int(time.time())
    return '{},{} {} {}'.format(self._escape(self.name),
                                ','.join(tag_pairs),
                                self._marshall_fields(),
                                timestamp)
def set_field(self, name, value):
"""Set the value of a field in the measurement.
:param str name: The name of the field to set the value for
:param int|float value: The value of the field
:param int|float|bool|str value: The value of the field
:raises: ValueError
"""
if not isinstance(value, int) and not isinstance(value, float):
raise ValueError('Value must be an integer or float')
self.fields[_escape_str(name)] = str(value)
if not any([isinstance(value, t) for t in {int, float, bool, str}]):
LOGGER.debug('Invalid field value: %r', value)
raise ValueError('Value must be a str, bool, integer, or float')
self.fields[name] = value
def set_tag(self, name, value):
"""Set a tag on the measurement.
@ -620,7 +605,7 @@ class Measurement(object):
if one exists.
"""
self.tags[_escape_str(name)] = _escape_str(value)
self.tags[name] = value
def set_tags(self, tags):
"""Set multiple tags for the measurement.
@ -633,3 +618,38 @@ class Measurement(object):
"""
for key, value in tags.items():
self.set_tag(key, value)
@staticmethod
def _escape(value):
    """Escape a string (key or value) for InfluxDB's line protocol.

    Spaces, commas, and double quotes are each prefixed with a
    backslash.

    :param str|int|float|bool value: The value to be escaped
    :rtype: str

    """
    value = str(value)
    # Prefix each special character with a literal backslash. The
    # original mapping used '\"' as the replacement for '"', which is
    # just '"' itself -- making the double-quote replacement a no-op,
    # so quotes were never actually escaped. '\\' + char produces the
    # intended backslash-prefixed sequence for all three characters.
    for char in (' ', ',', '"'):
        value = value.replace(char, '\\' + char)
    return value
def _marshall_fields(self):
    """Convert the field dict into the string segment of field key/value
    pairs for the line protocol.

    :rtype: str

    """
    values = {}
    for key, value in self.fields.items():
        # bool MUST be tested before int: isinstance(True, int) is
        # True in Python, so the previous ordering routed booleans
        # into the integer branch and emitted the invalid literals
        # 'Truei' / 'Falsei', leaving the bool branch unreachable.
        if isinstance(value, bool):
            values[key] = self._escape(value)
        elif (isinstance(value, int) or
                (isinstance(value, str) and value.isdigit() and
                 '.' not in value)):
            # Integers (and all-digit strings) carry InfluxDB's
            # trailing 'i' integer type suffix.
            values[key] = '{}i'.format(value)
        elif isinstance(value, float):
            values[key] = '{}'.format(value)
        elif isinstance(value, str):
            # String field values are double-quoted, with their
            # contents escaped.
            values[key] = '"{}"'.format(self._escape(value))
    return ','.join(['{}={}'.format(self._escape(k), v)
                     for k, v in values.items()])

View file

@ -1,6 +1,7 @@
import collections
import logging
import os
import re
import unittest
import uuid
@ -9,6 +10,17 @@ from tornado import gen, testing, web
import sprockets_influxdb as influxdb
LOGGER = logging.getLogger(__name__)
LINE_PATTERN = re.compile(r'^([\w\-\\, ]+)(?<!\\),([\\\w=\-_.,/"\(\)<>?\+]+)(?'
r'<!\\) ([\w=\-/\\_.,;" ]+)(?<!\\) (\d+)$')
TAG_PATTERN = re.compile(r'(?P<name>(?:(?:\\ )|(?:\\,)|(?:[\w\-_.]+))+)=(?P<va'
r'lue>(?:(?:\\ )|(?:\\,)|(?:[\\\w\-_./\(\)<>?\+]+))+)')
FIELD_PATTERN = re.compile(r'(?P<key>(?:(?:\\ )|(?:\\,)|(?:\\")|(?:[\w\-_./]+)'
r')+)=(?P<value>(?:(?:(?:(?:\\ )|(?:\\,)|(?:\\")|(?'
r':[\w\-_./\;]+))+)|(?:"(?:(?:\\ )|(?:\\,)|(?:\\")|'
r'(?:[\w\-_./\;=]+))+")))')
Measurement = collections.namedtuple(
'measurement', ['db', 'timestamp', 'name', 'tags', 'fields', 'headers'])
@ -39,9 +51,8 @@ def clear_influxdb_module():
def _strip_backslashes(line):
if '\\ ' in line or '\\,' in line:
line = line.replace('\\ ', ' ')
line = line.replace('\\,', ',')
for sequence in {'\\ ', '\\,', '\\"'}:
line = line.replace(sequence, sequence[-1])
return line
@ -144,19 +155,28 @@ class FakeInfluxDBHandler(web.RequestHandler):
db = self.get_query_argument('db')
payload = self.request.body.decode('utf-8')
for line in payload.splitlines():
values = _strip_backslashes(line.split())
key = values[0]
fields = values[1:-1]
timestamp = values[-1]
name = key.split(',')[0]
tags = dict([a.split('=') for a in key.split(',')[1:]])
fields = dict([a.split('=') for a in fields])
LOGGER.debug('Line: %r', line)
parts = LINE_PATTERN.match(line.encode('utf-8'))
name, tags_str, fields_str, timestamp = parts.groups()
matches = TAG_PATTERN.findall(tags_str)
tags = dict([(_strip_backslashes(k), _strip_backslashes(v))
for k, v in matches])
matches = FIELD_PATTERN.findall(fields_str)
fields = dict([(_strip_backslashes(k), _strip_backslashes(v))
for k, v in matches])
for key, value in fields.items():
if '.' in value:
fields[key] = float(value)
if value[-1] == 'i' and value[:-1].isdigit():
fields[key] = int(value[:-1])
elif value[0] == '"' and value[-1] == '"':
fields[key] = value[1:-1]
elif value.lower() in {'t', 'true', 'f', 'false'}:
fields[key] = value.lower() in {'t', 'true'}
else:
fields[key] = int(value)
fields[key] = float(value)
measurements.append(
Measurement(db, timestamp, name, tags, fields,
Measurement(db, int(timestamp), name, tags, fields,
self.request.headers))
self.set_status(204)

View file

@ -85,7 +85,7 @@ class MeasurementTestCase(base.AsyncServerTestCase):
database = str(uuid.uuid4())
name = str(uuid.uuid4())
measurement = influxdb.Measurement(database, name)
measurement.set_field('foo', 'bar')
measurement.set_field('foo', ['bar'])
def test_missing_value_raises_value_error(self):
with self.assertRaises(ValueError):

View file

@ -10,19 +10,11 @@ import sprockets_influxdb as influxdb
from . import base
class EscapeStrTestCase(unittest.TestCase):
def test_escape_str(self):
expectation = 'foo\ bar\,\ baz'
self.assertEqual(influxdb._escape_str('foo bar, baz'), expectation)
class InstallDefaultsTestCase(base.TestCase):
def setUp(self):
super(InstallDefaultsTestCase, self).setUp()
os.environ['ENVIRONMENT'] = str(uuid.uuid4())
os.environ['SERVICE'] = str(uuid.uuid4())
influxdb.install()
def test_calling_install_again_returns_false(self):
@ -37,7 +29,6 @@ class InstallDefaultsTestCase(base.TestCase):
def test_default_tags(self):
expectation = {
'environment': os.environ['ENVIRONMENT'],
'service': os.environ['SERVICE'],
'hostname': socket.gethostname()
}
self.assertDictEqual(influxdb._base_tags, expectation)

View file

@ -22,20 +22,18 @@ class MeasurementTestCase(base.AsyncServerTestCase):
self.assertEqual(measurement.name, 'my-service')
self.assertEqual(measurement.tags['status_code'], '200')
self.assertEqual(measurement.tags['method'], 'GET')
self.assertEqual(
measurement.tags['handler'], 'tests.base.RequestHandler')
self.assertEqual(measurement.tags['handler'],
'tests.base.RequestHandler')
self.assertEqual(measurement.tags['endpoint'], '/')
self.assertEqual(measurement.tags['hostname'], socket.gethostname())
self.assertEqual(measurement.tags['content_type'], 'application/json')
self.assertEqual(measurement.fields['content_length'], 16)
self.assertGreater(float(measurement.fields['duration']), 0.001)
self.assertLess(float(measurement.fields['duration']), 0.1)
self.assertEqual(measurement.tags['accept'], 'application/json')
self.assertEqual(measurement.tags['content_type'], 'application/json')
nanos_since_epoch = int(measurement.timestamp)
then = nanos_since_epoch / 1000000000
self.assertGreaterEqual(then, int(start_time))
self.assertLessEqual(then, time.time())
self.assertGreaterEqual(measurement.timestamp, int(start_time))
self.assertLessEqual(measurement.timestamp, time.time())
def test_measurement_with_named_endpoint(self):
start_time = time.time()
@ -49,22 +47,18 @@ class MeasurementTestCase(base.AsyncServerTestCase):
self.assertEqual(measurement.tags['method'], 'GET')
self.assertEqual(measurement.tags['endpoint'], '/named')
self.assertEqual(measurement.tags['content_type'], 'application/json')
self.assertEqual(
measurement.tags['correlation_id'],
base.NamedRequestHandler.correlation_id)
self.assertEqual(
measurement.tags['handler'], 'tests.base.NamedRequestHandler')
self.assertEqual(measurement.tags['hostname'], socket.gethostname())
self.assertEqual(measurement.fields['content_length'], 16)
self.assertNotIn('accept', measurement.tags)
self.assertEqual(
measurement.fields['correlation_id'],
base.NamedRequestHandler.correlation_id)
self.assertGreater(float(measurement.fields['duration']), 0.001)
self.assertLess(float(measurement.fields['duration']), 0.1)
nanos_since_epoch = int(measurement.timestamp)
then = nanos_since_epoch / 1000000000
self.assertGreaterEqual(then, int(start_time))
self.assertLessEqual(then, time.time())
self.assertGreaterEqual(measurement.timestamp, int(start_time))
self.assertLessEqual(measurement.timestamp, time.time())
def test_measurement_with_param_endpoint(self):
result = self.fetch('/param/100')