mirror of
https://github.com/sprockets/sprockets-statsd.git
synced 2024-11-15 03:00:25 +00:00
Send metrics asynchronously.
This commit is contained in:
parent
9febb7e7e8
commit
ff6f13591c
3 changed files with 144 additions and 12 deletions
|
@ -15,18 +15,26 @@ class Processor(asyncio.Protocol):
|
|||
self.running = False
|
||||
self.transport = None
|
||||
|
||||
self._queue = asyncio.Queue()
|
||||
|
||||
async def run(self):
|
||||
self.running = True
|
||||
while self.running:
|
||||
try:
|
||||
await self._connect_if_necessary()
|
||||
await asyncio.sleep(0.1)
|
||||
await self._process_metric()
|
||||
except asyncio.CancelledError:
|
||||
self.logger.info('task cancelled, exiting')
|
||||
break
|
||||
|
||||
self.running = False
|
||||
self.logger.info('loop finished with %d metrics in the queue',
|
||||
self._queue.qsize())
|
||||
if self.connected.is_set():
|
||||
num_ready = self._queue.qsize()
|
||||
self.logger.info('draining %d metrics', num_ready)
|
||||
for _ in range(num_ready):
|
||||
await self._process_metric()
|
||||
self.logger.debug('closing transport')
|
||||
self.transport.close()
|
||||
|
||||
|
@ -34,13 +42,18 @@ class Processor(asyncio.Protocol):
|
|||
self.logger.debug('waiting on transport to close')
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
self.logger.info('processing is exiting')
|
||||
self.logger.info('processor is exiting')
|
||||
self.closed.set()
|
||||
|
||||
async def stop(self):
|
||||
self.running = False
|
||||
await self.closed.wait()
|
||||
|
||||
def inject_metric(self, path: str, value: typing.Union[float, int, str],
|
||||
type_code: str):
|
||||
payload = f'{path}:{value}|{type_code}\n'
|
||||
self._queue.put_nowait(payload.encode('utf-8'))
|
||||
|
||||
def eof_received(self):
|
||||
self.logger.warning('received EOF from statsd server')
|
||||
self.connected.clear()
|
||||
|
@ -69,3 +82,12 @@ class Processor(asyncio.Protocol):
|
|||
except IOError as error:
|
||||
self.logger.warning('connection to %s:%s failed: %s',
|
||||
self.host, self.port, error)
|
||||
|
||||
async def _process_metric(self):
|
||||
try:
|
||||
metric = await asyncio.wait_for(self._queue.get(), 0.1)
|
||||
except asyncio.TimeoutError:
|
||||
return # nothing to do
|
||||
else:
|
||||
self.transport.write(metric)
|
||||
self._queue.task_done()
|
||||
|
|
|
@ -11,13 +11,17 @@ class StatsdServer(asyncio.Protocol):
|
|||
self.connections_lost = 0
|
||||
self.message_counter = 0
|
||||
|
||||
self.buffer = io.BytesIO()
|
||||
self.metrics = []
|
||||
self.running = asyncio.Event()
|
||||
self.client_connected = asyncio.Semaphore(value=0)
|
||||
self.message_received = asyncio.Semaphore(value=0)
|
||||
self.transports: list[asyncio.Transport] = []
|
||||
|
||||
self._buffer = io.BytesIO()
|
||||
|
||||
async def run(self):
|
||||
await self._reset()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
self.service = await loop.create_server(lambda: self,
|
||||
self.host,
|
||||
|
@ -61,6 +65,41 @@ class StatsdServer(asyncio.Protocol):
|
|||
self.connections_lost += 1
|
||||
|
||||
def data_received(self, data: bytes):
|
||||
self.buffer.write(data)
|
||||
self.message_received.release()
|
||||
self.message_counter += 1
|
||||
self._buffer.write(data)
|
||||
buf = self._buffer.getvalue()
|
||||
if b'\n' in buf:
|
||||
buf_complete = buf[-1] == ord('\n')
|
||||
if not buf_complete:
|
||||
offset = buf.rfind(b'\n')
|
||||
self._buffer = io.BytesIO(buf[offset:])
|
||||
buf = buf[:offset]
|
||||
else:
|
||||
self._buffer = io.BytesIO()
|
||||
buf = buf[:-1]
|
||||
|
||||
for metric in buf.split(b'\n'):
|
||||
self.metrics.append(metric)
|
||||
self.message_received.release()
|
||||
self.message_counter += 1
|
||||
|
||||
async def _reset(self):
|
||||
self._buffer = io.BytesIO()
|
||||
self.connections_made = 0
|
||||
self.connections_lost = 0
|
||||
self.message_counter = 0
|
||||
self.metrics.clear()
|
||||
for transport in self.transports:
|
||||
transport.close()
|
||||
self.transports.clear()
|
||||
|
||||
self.running.clear()
|
||||
await self._drain_semaphore(self.client_connected)
|
||||
await self._drain_semaphore(self.message_received)
|
||||
|
||||
@staticmethod
|
||||
async def _drain_semaphore(semaphore: asyncio.Semaphore):
|
||||
while not semaphore.locked():
|
||||
try:
|
||||
await asyncio.wait_for(semaphore.acquire(), 0.1)
|
||||
except asyncio.TimeoutError:
|
||||
break
|
||||
|
|
|
@ -7,11 +7,17 @@ from sprockets_statsd import statsd
|
|||
from tests import helpers
|
||||
|
||||
|
||||
class ProcessorTests(unittest.IsolatedAsyncioTestCase):
|
||||
class ProcessorTestCase(unittest.IsolatedAsyncioTestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.test_timeout = 5.0
|
||||
|
||||
async def wait_for(self, fut):
|
||||
try:
|
||||
await asyncio.wait_for(fut, timeout=self.test_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
self.fail('future took too long to resolve')
|
||||
|
||||
async def asyncSetUp(self):
|
||||
await super().asyncSetUp()
|
||||
self.statsd_server = helpers.StatsdServer()
|
||||
|
@ -23,12 +29,8 @@ class ProcessorTests(unittest.IsolatedAsyncioTestCase):
|
|||
await self.statsd_server.wait_closed()
|
||||
await super().asyncTearDown()
|
||||
|
||||
async def wait_for(self, fut):
|
||||
try:
|
||||
await asyncio.wait_for(fut, timeout=self.test_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
self.fail('future took too long to resolve')
|
||||
|
||||
class ProcessorTests(ProcessorTestCase):
|
||||
async def test_that_processor_connects_and_disconnects(self):
|
||||
processor = statsd.Processor(host=self.statsd_server.host,
|
||||
port=self.statsd_server.port)
|
||||
|
@ -110,3 +112,72 @@ class ProcessorTests(unittest.IsolatedAsyncioTestCase):
|
|||
processor.port = self.statsd_server.port
|
||||
|
||||
await self.wait_for(self.statsd_server.client_connected.acquire())
|
||||
|
||||
|
||||
class MetricProcessingTests(ProcessorTestCase):
|
||||
async def asyncSetUp(self):
|
||||
await super().asyncSetUp()
|
||||
self.processor = statsd.Processor(host=self.statsd_server.host,
|
||||
port=self.statsd_server.port)
|
||||
asyncio.create_task(self.processor.run())
|
||||
await self.wait_for(self.statsd_server.client_connected.acquire())
|
||||
|
||||
async def asyncTearDown(self):
|
||||
await self.wait_for(self.processor.stop())
|
||||
await super().asyncTearDown()
|
||||
|
||||
def assert_metrics_equal(self, recvd: bytes, path, value, type_code):
|
||||
recvd = recvd.decode('utf-8')
|
||||
recvd_path, _, rest = recvd.partition(':')
|
||||
recvd_value, _, recvd_code = rest.partition('|')
|
||||
self.assertEqual(path, recvd_path, 'metric path mismatch')
|
||||
self.assertEqual(recvd_value, str(value), 'metric value mismatch')
|
||||
self.assertEqual(recvd_code, type_code, 'metric type mismatch')
|
||||
|
||||
async def test_sending_simple_counter(self):
|
||||
self.processor.inject_metric('simple.counter', 1000, 'c')
|
||||
await self.wait_for(self.statsd_server.message_received.acquire())
|
||||
self.assert_metrics_equal(self.statsd_server.metrics[0],
|
||||
'simple.counter', 1000, 'c')
|
||||
|
||||
async def test_adjusting_gauge(self):
|
||||
self.processor.inject_metric('simple.gauge', 100, 'g')
|
||||
self.processor.inject_metric('simple.gauge', -10, 'g')
|
||||
self.processor.inject_metric('simple.gauge', '+10', 'g')
|
||||
for _ in range(3):
|
||||
await self.wait_for(self.statsd_server.message_received.acquire())
|
||||
|
||||
self.assert_metrics_equal(self.statsd_server.metrics[0],
|
||||
'simple.gauge', '100', 'g')
|
||||
self.assert_metrics_equal(self.statsd_server.metrics[1],
|
||||
'simple.gauge', '-10', 'g')
|
||||
self.assert_metrics_equal(self.statsd_server.metrics[2],
|
||||
'simple.gauge', '+10', 'g')
|
||||
|
||||
async def test_sending_timer(self):
|
||||
secs = 12.34
|
||||
self.processor.inject_metric('simple.timer', secs * 1000.0, 'ms')
|
||||
await self.wait_for(self.statsd_server.message_received.acquire())
|
||||
self.assert_metrics_equal(self.statsd_server.metrics[0],
|
||||
'simple.timer', 12340.0, 'ms')
|
||||
|
||||
async def test_that_queued_metrics_are_drained(self):
|
||||
# The easiest way to test that the internal metrics queue
|
||||
# is drained when the processor is stopped is to monkey
|
||||
# patch the "process metric" method to enqueue a few
|
||||
# metrics and then set running to false. It will exit
|
||||
# the run loop and drain the queue.
|
||||
real_process_metric = self.processor._process_metric
|
||||
|
||||
async def fake_process_metric():
|
||||
if self.processor.running:
|
||||
self.processor.inject_metric('counter', 1, 'c')
|
||||
self.processor.inject_metric('counter', 2, 'c')
|
||||
self.processor.inject_metric('counter', 3, 'c')
|
||||
self.processor.running = False
|
||||
return await real_process_metric()
|
||||
|
||||
self.processor._process_metric = fake_process_metric
|
||||
await self.wait_for(self.statsd_server.message_received.acquire())
|
||||
await self.wait_for(self.statsd_server.message_received.acquire())
|
||||
await self.wait_for(self.statsd_server.message_received.acquire())
|
||||
|
|
Loading…
Reference in a new issue