This commit is contained in:
Dave Shawley 2016-02-19 07:32:37 -05:00
parent 48e46e0eb4
commit 3e4300e6b8

View file

@ -32,9 +32,45 @@ class InfluxDBConnection(object):
body = '{},{} {} {:d}'.format(measurement, ','.join(tags),
','.join(values),
int(time.time() * 1000000000))
ioloop.IOLoop.current().spawn_callback(self._send_measurement, body)
def _send_measurement(self, body):
def _handle_response(self, future):
pass
request = httpclient.HTTPRequest(self.write_url, method='POST',
body=body.encode('utf-8'))
ioloop.IOLoop.current().spawn_callback(self.client.fetch, request)
body=body)
future = self.client.fetch(request)
ioloop.IOLoop.current().add_future(future, _handle_response)
class BatchingInfluxDBConnection(InfluxDBConnection):
def __init__(self, *args, **kwargs):
self._buffer_time = kwargs.pop('buffer_time', 1.0)
super(BatchingInfluxDBConnection, self).__init__(*args, **kwargs)
self._measurements = []
self._last_send = self.io_loop.time()
def submit(self, measurement, tags, value):
body = '{},{} {} {:d}'.format(measurement, ','.join(tags),
','.join(values),
int(time.time() * 1000000000))
self._maybe_send(body)
def _maybe_send(self, measurement):
if self.io_loop.time() < (self._last_send + self._buffer_time):
self._measurements.append(measurement)
return
if self._measurements:
body = '\n'.join(self._measurements)
ioloop.IOLoop.current().spawn_callback(self._send_measurement,
boby)
del self._measurements[:]
self._last_send = self.io_loop.time()
class InfluxDBMixin(object):