2021-03-06 14:50:29 +00:00
|
|
|
import asyncio
|
2021-03-19 22:19:03 +00:00
|
|
|
import logging
|
2021-03-21 14:22:55 +00:00
|
|
|
import socket
|
2021-03-06 14:50:29 +00:00
|
|
|
import time
|
2021-03-09 20:41:18 +00:00
|
|
|
|
|
|
|
import asynctest
|
2021-03-06 14:50:29 +00:00
|
|
|
|
|
|
|
from sprockets_statsd import statsd
|
|
|
|
from tests import helpers
|
|
|
|
|
|
|
|
|
2021-03-09 20:41:18 +00:00
|
|
|
class ProcessorTestCase(asynctest.TestCase):
    """Base fixture that runs a ``helpers.StatsdServer`` around each test.

    Subclasses set ``ip_protocol``; its value is handed to the
    ``helpers.StatsdServer`` constructor in :meth:`asyncSetUp`.
    """

    ip_protocol: int

    async def setUp(self):
        """Set the per-test timeout and run the asynchronous setup hook."""
        self.test_timeout = 5.0
        super().setUp()
        await self.asyncSetUp()

    async def tearDown(self):
        """Run the asynchronous teardown hook before the base teardown."""
        await self.asyncTearDown()
        super().tearDown()

    async def wait_for(self, fut):
        """Await *fut*, failing the test if it exceeds the test timeout.

        :param fut: awaitable to wait on
        :returns: the value that *fut* resolved to

        The wait is limited to ``self.test_timeout`` seconds.

        """
        try:
            return await asyncio.wait_for(fut, timeout=self.test_timeout)
        except asyncio.TimeoutError:
            self.fail('future took too long to resolve')

    async def asyncSetUp(self):
        """Create the server, schedule ``run()`` and wait until it runs."""
        self.statsd_server = helpers.StatsdServer(self.ip_protocol)
        # Keep the task on the instance so that it stays referenced for
        # the duration of the test.
        self.statsd_task = asyncio.create_task(self.statsd_server.run())
        await self.statsd_server.wait_running()

    async def asyncTearDown(self):
        """Close the server and wait for the close to complete."""
        self.statsd_server.close()
        await self.statsd_server.wait_closed()
|
|
|
|
|
|
|
|
|
2021-03-07 19:35:42 +00:00
|
|
|
class ProcessorTests(ProcessorTestCase):
    """Lifecycle tests for ``statsd.Processor``.

    The fixture server is created with ``socket.IPPROTO_TCP``.
    """

    ip_protocol = socket.IPPROTO_TCP

    async def _wait_until(self, predicate, failure_message):
        """Poll *predicate* until it is true or the test timeout elapses.

        :param predicate: zero-argument callable returning a truthy
            value once the awaited condition holds
        :param failure_message: message passed to ``self.fail`` when
            ``self.test_timeout`` seconds pass without success

        """
        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + self.test_timeout
        while not predicate():
            if time.monotonic() >= deadline:
                self.fail(failure_message)
            await asyncio.sleep(0.1)

    async def test_that_processor_connects_and_disconnects(self):
        processor = statsd.Processor(host=self.statsd_server.host,
                                     port=self.statsd_server.port)
        # Hold a reference so the task stays alive for the whole test.
        self.processor_task = asyncio.create_task(processor.run())
        await self.wait_for(self.statsd_server.client_connected.acquire())
        await self.wait_for(processor.stop())

        self.assertEqual(1, self.statsd_server.connections_made)
        self.assertEqual(1, self.statsd_server.connections_lost)

    async def test_that_processor_reconnects(self):
        processor = statsd.Processor(host=self.statsd_server.host,
                                     port=self.statsd_server.port)
        self.processor_task = asyncio.create_task(processor.run())
        await self.wait_for(self.statsd_server.client_connected.acquire())

        # Now that the server is running and the client has connected,
        # cancel the server and let it die off.
        self.statsd_server.close()
        await self.statsd_server.wait_closed()
        await self._wait_until(lambda: not processor.connected,
                               'processor never disconnected')

        # Start the server on the same port and let the client reconnect.
        self.statsd_task = asyncio.create_task(self.statsd_server.run())
        await self.wait_for(self.statsd_server.client_connected.acquire())
        self.assertTrue(processor.connected)

        await self.wait_for(processor.stop())

    async def test_that_processor_can_be_cancelled(self):
        processor = statsd.Processor(host=self.statsd_server.host,
                                     port=self.statsd_server.port)
        task = asyncio.create_task(processor.run())
        await self.wait_for(self.statsd_server.client_connected.acquire())

        task.cancel()
        await self.wait_for(processor.stopped.wait())

    async def test_shutdown_when_disconnected(self):
        processor = statsd.Processor(host=self.statsd_server.host,
                                     port=self.statsd_server.port)
        self.processor_task = asyncio.create_task(processor.run())
        await self.wait_for(self.statsd_server.client_connected.acquire())

        self.statsd_server.close()
        await self.statsd_server.wait_closed()

        await self.wait_for(processor.stop())

    async def test_socket_resets(self):
        processor = statsd.Processor(host=self.statsd_server.host,
                                     port=self.statsd_server.port)
        self.processor_task = asyncio.create_task(processor.run())
        await self.wait_for(self.statsd_server.client_connected.acquire())

        # Drop the connection from the server side; the second acquire
        # only succeeds if a client connects again.
        self.statsd_server.transports[0].close()
        await self.wait_for(self.statsd_server.client_connected.acquire())
        await self.wait_for(processor.stop())

    async def test_that_stopping_when_not_running_is_safe(self):
        processor = statsd.Processor(host=self.statsd_server.host,
                                     port=self.statsd_server.port)
        await self.wait_for(processor.stop())

    def test_that_processor_fails_when_host_is_none(self):
        with self.assertRaises(RuntimeError) as context:
            statsd.Processor(host=None, port=12345)
        self.assertIn('host', str(context.exception))

    async def test_starting_and_stopping_without_connecting(self):
        host, port = self.statsd_server.host, self.statsd_server.port
        self.statsd_server.close()
        await self.wait_for(self.statsd_server.wait_closed())
        processor = statsd.Processor(host=host, port=port)
        self.processor_task = asyncio.create_task(processor.run())
        await self.wait_for(processor.running.wait())
        # Bounded like every other stop() in this class so that a hung
        # shutdown fails the test instead of stalling the run.
        await self.wait_for(processor.stop())

    async def test_that_protocol_exceptions_are_logged(self):
        processor = statsd.Processor(host=self.statsd_server.host,
                                     port=self.statsd_server.port)
        self.processor_task = asyncio.create_task(processor.run())
        await self.wait_for(processor.running.wait())

        with self.assertLogs(processor.logger, level=logging.ERROR) as cm:
            # A str payload instead of bytes is what should provoke the
            # logged exception.
            processor.queue.put_nowait('not-bytes')
            await self._wait_until(lambda: processor.queue.qsize() == 0,
                                   'processor never drained its queue')

        for record in cm.records:
            if record.exc_info is not None and record.funcName == 'run':
                break
        else:
            self.fail('Expected run to log exception')

        await self.wait_for(processor.stop())
|
|
|
|
|
|
|
|
|
|
|
|
class TCPProcessingTests(ProcessorTestCase):
    """Tests run against a ``statsd.Processor`` that is already connected.

    The fixture server is created with ``socket.IPPROTO_TCP`` and the
    processor is started in :meth:`asyncSetUp`.
    """

    ip_protocol = socket.IPPROTO_TCP

    async def asyncSetUp(self):
        """Start a processor and wait for the server to see its client."""
        await super().asyncSetUp()
        self.processor = statsd.Processor(host=self.statsd_server.host,
                                          port=self.statsd_server.port,
                                          reconnect_sleep=0.25)
        # Hold a reference so the task stays alive for the whole test.
        self.processor_task = asyncio.create_task(self.processor.run())
        await self.wait_for(self.statsd_server.client_connected.acquire())

    async def asyncTearDown(self):
        """Stop the processor, then always tear down the server."""
        try:
            # Bounded so a hung shutdown fails instead of blocking the run.
            await self.wait_for(self.processor.stop())
        finally:
            await super().asyncTearDown()

    async def test_connection_failures(self):
        # Change the port and close the transport, this will cause the
        # processor to reconnect to the new port and fail.
        self.processor.port = 1
        self.processor.protocol.transport.close()

        # Wait for the processor to be disconnected, then change the
        # port back and let the processor reconnect.
        deadline = time.monotonic() + self.test_timeout
        while self.processor.connected:
            if time.monotonic() >= deadline:
                self.fail('processor never disconnected')
            await asyncio.sleep(0.1)
        await asyncio.sleep(0.2)
        self.processor.port = self.statsd_server.port

        await self.wait_for(self.statsd_server.client_connected.acquire())

    async def test_socket_closure_while_processing_failed_event(self):
        state = {'first_time': True}
        real_process_metric = self.processor._process_metric

        async def fake_process_metric():
            # Only on the first call: plant buffered data on the
            # protocol and close the transport, then defer to the real
            # implementation.
            if state['first_time']:
                self.processor.protocol.buffered_data = b'counter:1|c\n'
                self.processor.protocol.transport.close()
                state['first_time'] = False
            return await real_process_metric()

        self.processor._process_metric = fake_process_metric

        await self.wait_for(self.statsd_server.message_received.acquire())

    async def test_socket_closure_while_sending(self):
        state = {'first_time': True}
        real_transport_write = self.processor.protocol.transport.write

        def fake_transport_write(buffer):
            # Close the transport right before the first write happens.
            if state['first_time']:
                self.processor.protocol.transport.close()
                state['first_time'] = False
            return real_transport_write(buffer)

        self.processor.protocol.transport.write = fake_transport_write
        self.processor.queue.put_nowait(b'counter:1|c')
        await self.wait_for(self.statsd_server.message_received.acquire())
|
|
|
|
|
2021-03-07 19:35:42 +00:00
|
|
|
|
2021-03-21 14:22:55 +00:00
|
|
|
class UDPProcessingTests(ProcessorTestCase):
    """Tests for a ``statsd.Connector`` created with ``IPPROTO_UDP``."""

    ip_protocol = socket.IPPROTO_UDP

    async def asyncSetUp(self):
        """Start the fixture server and a connector pointed at it."""
        await super().asyncSetUp()
        self.connector = statsd.Connector(host=self.statsd_server.host,
                                          port=self.statsd_server.port,
                                          ip_protocol=self.ip_protocol,
                                          reconnect_sleep=0.25)
        await self.connector.start()

    async def asyncTearDown(self):
        """Stop the connector, then always tear down the server."""
        try:
            # Bounded so a hung shutdown fails instead of blocking the run.
            await self.wait_for(self.connector.stop())
        finally:
            await super().asyncTearDown()

    async def test_sending_metrics(self):
        self.connector.inject_metric('counter', 1, 'c')
        self.connector.inject_metric('timer', 1.0, 'ms')
        await self.wait_for(self.statsd_server.message_received.acquire())
        await self.wait_for(self.statsd_server.message_received.acquire())

        self.assertEqual(self.statsd_server.metrics[0], b'counter:1|c')
        self.assertEqual(self.statsd_server.metrics[1], b'timer:1.0|ms')

    async def test_that_client_sends_to_new_server(self):
        self.statsd_server.close()
        await self.statsd_server.wait_closed()

        # Injected while the server is down; the final assertion
        # requires that this metric is not the first one received.
        self.connector.inject_metric('should.be.lost', 1, 'c')
        await asyncio.sleep(self.connector.processor._wait_timeout * 2)

        self.statsd_task = asyncio.create_task(self.statsd_server.run())
        await self.statsd_server.wait_running()

        self.connector.inject_metric('should.be.recvd', 1, 'c')
        await self.wait_for(self.statsd_server.message_received.acquire())
        self.assertEqual(self.statsd_server.metrics[0], b'should.be.recvd:1|c')

    async def test_that_client_handles_socket_closure(self):
        self.connector.processor.protocol.transport.close()
        # Pause for one reconnect interval before sending again.
        await self.wait_for(
            asyncio.sleep(self.connector.processor._reconnect_sleep))

        self.connector.inject_metric('should.be.recvd', 1, 'c')
        await self.wait_for(self.statsd_server.message_received.acquire())
        self.assertEqual(self.statsd_server.metrics[0], b'should.be.recvd:1|c')
|
|
|
|
|
|
|
|
|
2021-03-08 12:27:40 +00:00
|
|
|
class ConnectorTests(ProcessorTestCase):
    """Tests for metrics sent through a started ``statsd.Connector``.

    The fixture server is created with ``socket.IPPROTO_TCP``.
    """

    ip_protocol = socket.IPPROTO_TCP

    async def asyncSetUp(self):
        """Start a connector and wait for the server to see its client."""
        await super().asyncSetUp()
        self.connector = statsd.Connector(self.statsd_server.host,
                                          self.statsd_server.port)
        await self.connector.start()
        await self.wait_for(self.statsd_server.client_connected.acquire())

    async def asyncTearDown(self):
        """Stop the connector, then always tear down the server."""
        try:
            await self.wait_for(self.connector.stop())
        finally:
            await super().asyncTearDown()

    def assert_metrics_equal(self, recvd: bytes, path, value, type_code):
        """Assert that a received ``path:value|code`` line matches.

        :param recvd: raw metric line, decoded here as UTF-8
        :param path: expected text before the first ``:``
        :param value: expected value; compared as ``str(value)``
        :param type_code: expected text after the first ``|``

        """
        text = recvd.decode('utf-8')
        recvd_path, _, rest = text.partition(':')
        recvd_value, _, recvd_code = rest.partition('|')
        self.assertEqual(path, recvd_path, 'metric path mismatch')
        self.assertEqual(str(value), recvd_value, 'metric value mismatch')
        self.assertEqual(type_code, recvd_code, 'metric type mismatch')

    async def test_sending_simple_counter(self):
        self.connector.inject_metric('simple.counter', 1000, 'c')
        await self.wait_for(self.statsd_server.message_received.acquire())
        self.assert_metrics_equal(self.statsd_server.metrics[0],
                                  'simple.counter', 1000, 'c')

    async def test_adjusting_gauge(self):
        self.connector.inject_metric('simple.gauge', 100, 'g')
        self.connector.inject_metric('simple.gauge', -10, 'g')
        self.connector.inject_metric('simple.gauge', '+10', 'g')
        for _ in range(3):
            await self.wait_for(self.statsd_server.message_received.acquire())

        self.assert_metrics_equal(self.statsd_server.metrics[0],
                                  'simple.gauge', '100', 'g')
        self.assert_metrics_equal(self.statsd_server.metrics[1],
                                  'simple.gauge', '-10', 'g')
        self.assert_metrics_equal(self.statsd_server.metrics[2],
                                  'simple.gauge', '+10', 'g')

    async def test_sending_timer(self):
        secs = 12.34
        self.connector.inject_metric('simple.timer', secs * 1000.0, 'ms')
        await self.wait_for(self.statsd_server.message_received.acquire())
        self.assert_metrics_equal(self.statsd_server.metrics[0],
                                  'simple.timer', 12340.0, 'ms')

    async def test_that_queued_metrics_are_drained(self):
        # The easiest way to test that the internal metrics queue
        # is drained when the processor is stopped is to monkey
        # patch the "process metric" method to enqueue a few
        # metrics and then terminate the processor. It will exit
        # the run loop and drain the queue.
        real_process_metric = self.connector.processor._process_metric

        async def fake_process_metric():
            if not self.connector.processor.should_terminate:
                self.connector.inject_metric('counter', 1, 'c')
                self.connector.inject_metric('counter', 2, 'c')
                self.connector.inject_metric('counter', 3, 'c')
                self.connector.processor.should_terminate = True
            return await real_process_metric()

        self.connector.processor._process_metric = fake_process_metric
        await self.wait_for(self.statsd_server.message_received.acquire())
        await self.wait_for(self.statsd_server.message_received.acquire())
        await self.wait_for(self.statsd_server.message_received.acquire())

    async def test_metrics_sent_while_disconnected_are_queued(self):
        self.statsd_server.close()
        await self.statsd_server.wait_closed()

        for value in range(50):
            self.connector.inject_metric('counter', value, 'c')

        # Keep the restarted server task on the instance, as the other
        # restart sites do, so that it stays referenced while running.
        self.statsd_task = asyncio.create_task(self.statsd_server.run())
        await self.wait_for(self.statsd_server.client_connected.acquire())
        for value in range(50):
            await self.wait_for(self.statsd_server.message_received.acquire())
            self.assertEqual(f'counter:{value}|c'.encode(),
                             self.statsd_server.metrics.pop(0))
|
2021-03-21 14:22:55 +00:00
|
|
|
|
|
|
|
|
|
|
|
class ConnectorOptionTests(ProcessorTestCase):
    """Tests for how ``statsd.Connector`` handles constructor options."""

    ip_protocol = socket.IPPROTO_TCP

    def test_protocol_values(self):
        # No ip_protocol given: the processor is expected to use TCP.
        connector = statsd.Connector(host=self.statsd_server.host,
                                     port=self.statsd_server.port)
        self.assertEqual(socket.IPPROTO_TCP, connector.processor._ip_protocol)

        connector = statsd.Connector(host=self.statsd_server.host,
                                     port=self.statsd_server.port,
                                     ip_protocol=socket.IPPROTO_UDP)
        self.assertEqual(socket.IPPROTO_UDP, connector.processor._ip_protocol)

        with self.assertRaises(RuntimeError):
            statsd.Connector(host=self.statsd_server.host,
                             port=self.statsd_server.port,
                             ip_protocol=socket.IPPROTO_GRE)

    def test_invalid_port_values(self):
        # A tuple keeps the iteration order fixed between runs, and
        # subTest reports which port failed without hiding the others.
        for port in (None, 0, -1, 'not-a-number'):
            with self.subTest(port=port):
                with self.assertRaises(RuntimeError) as context:
                    statsd.Connector(host=self.statsd_server.host, port=port)
                self.assertIn('port', str(context.exception))
                self.assertIn(repr(port), str(context.exception))
|