mirror of
https://github.com/sprockets/sprockets-statsd.git
synced 2024-11-22 03:00:20 +00:00
395 lines
14 KiB
Python
395 lines
14 KiB
Python
import asyncio
|
|
import os
|
|
import socket
|
|
import time
|
|
import typing
|
|
import unittest.mock
|
|
|
|
import sprockets.http.app
|
|
import sprockets.http.testing
|
|
from tornado import testing, web
|
|
|
|
import sprockets_statsd.statsd
|
|
import sprockets_statsd.tornado
|
|
from tests import helpers
|
|
|
|
ParsedMetric = typing.Tuple[str, float, str]
|
|
|
|
|
|
class Handler(sprockets_statsd.tornado.RequestHandler, web.RequestHandler):
|
|
async def get(self):
|
|
with self.execution_timer('execution-timer'):
|
|
await asyncio.sleep(0.1)
|
|
self.increase_counter('request-count')
|
|
self.write('true')
|
|
|
|
|
|
class Application(sprockets_statsd.tornado.Application, web.Application):
|
|
def __init__(self, **settings):
|
|
super().__init__([web.url('/', Handler)], **settings)
|
|
|
|
|
|
class AsyncTestCaseWithTimeout(testing.AsyncTestCase):
|
|
def run_coroutine(self, coro):
|
|
loop: asyncio.AbstractEventLoop = self.io_loop.asyncio_loop
|
|
try:
|
|
loop.run_until_complete(
|
|
asyncio.wait_for(coro,
|
|
timeout=testing.get_async_test_timeout()))
|
|
except asyncio.TimeoutError:
|
|
self.fail(f'coroutine {coro} took too long to complete')
|
|
|
|
|
|
class ApplicationTests(AsyncTestCaseWithTimeout):
|
|
def setUp(self):
|
|
super().setUp()
|
|
self._environ = {}
|
|
|
|
def tearDown(self):
|
|
super().tearDown()
|
|
for name, value in self._environ.items():
|
|
if value is not None:
|
|
os.environ[name] = value
|
|
else:
|
|
os.environ.pop(name, None)
|
|
|
|
def setenv(self, **variables):
|
|
for name, value in variables.items():
|
|
self._environ.setdefault(name, os.environ.pop(name, None))
|
|
os.environ[name] = value
|
|
|
|
def unsetenv(self, *names):
|
|
for name in names:
|
|
self._environ.setdefault(name, os.environ.pop(name, None))
|
|
|
|
def test_statsd_setting_defaults(self):
|
|
self.unsetenv('STATSD_ENABLED', 'STATSD_HOST', 'STATSD_PORT',
|
|
'STATSD_PREFIX', 'STATSD_PROTOCOL')
|
|
|
|
app = sprockets_statsd.tornado.Application(statsd={'prefix': ''})
|
|
self.assertIn('statsd', app.settings)
|
|
self.assertTrue(app.settings['statsd']['enabled'])
|
|
self.assertIsNone(app.settings['statsd']['host'],
|
|
'default host value should be None')
|
|
self.assertEqual(8125, app.settings['statsd']['port'])
|
|
self.assertEqual('', app.settings['statsd']['prefix'])
|
|
self.assertEqual('tcp', app.settings['statsd']['protocol'])
|
|
|
|
def test_that_statsd_settings_read_from_environment(self):
|
|
self.setenv(STATSD_ENABLED='no',
|
|
STATSD_HOST='statsd',
|
|
STATSD_PORT='5218',
|
|
STATSD_PREFIX='my-service',
|
|
STATSD_PROTOCOL='udp')
|
|
|
|
app = sprockets_statsd.tornado.Application()
|
|
self.assertIn('statsd', app.settings)
|
|
self.assertFalse(app.settings['statsd']['enabled'])
|
|
self.assertEqual('statsd', app.settings['statsd']['host'])
|
|
self.assertEqual(5218, app.settings['statsd']['port'])
|
|
self.assertEqual('my-service', app.settings['statsd']['prefix'])
|
|
self.assertEqual('udp', app.settings['statsd']['protocol'])
|
|
|
|
def test_prefix_when_only_service_is_set(self):
|
|
with self.assertRaises(RuntimeError):
|
|
sprockets_statsd.tornado.Application(service='blah')
|
|
|
|
def test_prefix_when_only_environment_is_set(self):
|
|
with self.assertRaises(RuntimeError):
|
|
sprockets_statsd.tornado.Application(environment='whatever')
|
|
|
|
def test_prefix_default_when_service_and_environment_are_set(self):
|
|
app = sprockets_statsd.tornado.Application(environment='development',
|
|
service='my-service')
|
|
self.assertIn('statsd', app.settings)
|
|
self.assertEqual('applications.my-service.development',
|
|
app.settings['statsd']['prefix'])
|
|
|
|
def test_overridden_settings(self):
|
|
self.setenv(STATSD_ENABLED='0',
|
|
STATSD_HOST='statsd',
|
|
STATSD_PORT='9999',
|
|
STATSD_PREFIX='service',
|
|
STATSD_PROTOCOL='tcp')
|
|
|
|
app = sprockets_statsd.tornado.Application(
|
|
statsd={
|
|
'enabled': 'true',
|
|
'host': 'statsd.example.com',
|
|
'port': 5218,
|
|
'prefix': 'myapp',
|
|
'protocol': 'udp',
|
|
})
|
|
self.assertEqual('statsd.example.com', app.settings['statsd']['host'])
|
|
self.assertTrue(app.settings['statsd']['enabled'])
|
|
self.assertEqual(5218, app.settings['statsd']['port'])
|
|
self.assertEqual('myapp', app.settings['statsd']['prefix'])
|
|
self.assertEqual('udp', app.settings['statsd']['protocol'])
|
|
|
|
def test_that_starting_without_host_fails(self):
|
|
self.unsetenv('STATSD_HOST')
|
|
app = sprockets_statsd.tornado.Application(statsd={'prefix': 'app'})
|
|
with self.assertRaises(RuntimeError):
|
|
self.run_coroutine(app.start_statsd())
|
|
|
|
def test_creating_without_prefix_on_purpose(self):
|
|
self.unsetenv('STATSD_PREFIX')
|
|
app = sprockets_statsd.tornado.Application(statsd={
|
|
'host': 'statsd.example.com',
|
|
'protocol': 'udp',
|
|
'prefix': None,
|
|
})
|
|
self.assertEqual(None, app.settings['statsd']['prefix'])
|
|
|
|
def test_starting_with_calculated_prefix(self):
|
|
self.unsetenv('STATSD_PREFIX')
|
|
app = sprockets_statsd.tornado.Application(
|
|
environment='development',
|
|
service='my-service',
|
|
statsd={
|
|
'host': 'statsd.example.com',
|
|
'protocol': 'udp',
|
|
})
|
|
try:
|
|
self.run_coroutine(app.start_statsd())
|
|
self.assertEqual('applications.my-service.development',
|
|
app.settings['statsd']['prefix'])
|
|
finally:
|
|
self.run_coroutine(app.stop_statsd())
|
|
|
|
def test_starting_twice(self):
|
|
app = sprockets_statsd.tornado.Application(statsd={
|
|
'host': 'localhost',
|
|
'port': '8125',
|
|
'prefix': 'my-service',
|
|
})
|
|
try:
|
|
self.run_coroutine(app.start_statsd())
|
|
connector = app.statsd_connector
|
|
self.assertIsNotNone(connector, 'statsd.Connector not created')
|
|
|
|
self.run_coroutine(app.start_statsd())
|
|
self.assertIs(app.statsd_connector, connector,
|
|
'statsd.Connector should not be recreated')
|
|
finally:
|
|
self.run_coroutine(app.stop_statsd())
|
|
|
|
def test_stopping_without_starting(self):
|
|
app = sprockets_statsd.tornado.Application(statsd={
|
|
'host': 'localhost',
|
|
'port': '8125',
|
|
'prefix': 'my-service',
|
|
})
|
|
self.run_coroutine(app.stop_statsd())
|
|
|
|
def test_optional_parameters(self):
|
|
app = sprockets_statsd.tornado.Application(
|
|
statsd={
|
|
'host': 'localhost',
|
|
'port': '8125',
|
|
'prefix': 'my-service',
|
|
'reconnect_sleep': 0.5,
|
|
'wait_timeout': 0.25,
|
|
})
|
|
self.run_coroutine(app.start_statsd())
|
|
|
|
processor = app.statsd_connector.processor
|
|
self.assertEqual(0.5, processor._reconnect_sleep)
|
|
self.assertEqual(0.25, processor._wait_timeout)
|
|
self.run_coroutine(app.stop_statsd())
|
|
|
|
def test_starting_with_invalid_protocol(self):
|
|
app = sprockets_statsd.tornado.Application(statsd={
|
|
'host': 'localhost',
|
|
'prefix': 'my-service',
|
|
'protocol': 'unknown'
|
|
})
|
|
with self.assertRaises(RuntimeError):
|
|
self.run_coroutine(app.start_statsd())
|
|
|
|
def test_that_protocol_strings_are_translated(self):
|
|
app = sprockets_statsd.tornado.Application(statsd={
|
|
'host': 'localhost',
|
|
'prefix': 'my-service',
|
|
'protocol': 'tcp',
|
|
})
|
|
self.run_coroutine(app.start_statsd())
|
|
self.assertEqual(socket.IPPROTO_TCP,
|
|
app.statsd_connector.processor._ip_protocol)
|
|
self.run_coroutine(app.stop_statsd())
|
|
|
|
app = sprockets_statsd.tornado.Application(statsd={
|
|
'host': 'localhost',
|
|
'prefix': 'my-service',
|
|
'protocol': 'udp',
|
|
})
|
|
self.run_coroutine(app.start_statsd())
|
|
self.assertEqual(socket.IPPROTO_UDP,
|
|
app.statsd_connector.processor._ip_protocol)
|
|
self.run_coroutine(app.stop_statsd())
|
|
|
|
def test_disabling_statsd_prefix(self):
|
|
app = sprockets_statsd.tornado.Application(
|
|
service='my-service',
|
|
version='1.0.0',
|
|
statsd={
|
|
'host': 'localhost',
|
|
'prefix': '',
|
|
'protocol': 'udp',
|
|
},
|
|
)
|
|
self.run_coroutine(app.start_statsd())
|
|
self.assertEqual(app.statsd_connector.prefix, '')
|
|
self.run_coroutine(app.stop_statsd())
|
|
|
|
def test_disabling_statsd_connector(self):
|
|
app = sprockets_statsd.tornado.Application(
|
|
environment='development',
|
|
service='my-service',
|
|
version='1.0.0',
|
|
statsd={
|
|
'enabled': 'no',
|
|
'host': 'localhost',
|
|
},
|
|
)
|
|
self.run_coroutine(app.start_statsd())
|
|
self.assertIsNotNone(app.statsd_connector)
|
|
self.assertIsInstance(app.statsd_connector,
|
|
sprockets_statsd.statsd.AbstractConnector)
|
|
|
|
|
|
class StatsdTestCase(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase):
|
|
Application = web.Application
|
|
|
|
def setUp(self):
|
|
self.statsd_server = None
|
|
super().setUp()
|
|
|
|
def tearDown(self):
|
|
if self.statsd_server is not None:
|
|
self.statsd_server.close()
|
|
self.run_coroutine(self.statsd_server.wait_closed())
|
|
super().tearDown()
|
|
|
|
def get_app(self):
|
|
self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP)
|
|
self.io_loop.spawn_callback(self.statsd_server.run)
|
|
self.run_coroutine(self.statsd_server.wait_running())
|
|
self.app = self.Application(shutdown_limit=0.5,
|
|
statsd={
|
|
'host': self.statsd_server.host,
|
|
'port': self.statsd_server.port,
|
|
'prefix': 'applications.service',
|
|
'protocol': 'tcp',
|
|
})
|
|
return self.app
|
|
|
|
def wait_for_metrics(self, metric_count=3):
|
|
timeout_remaining = testing.get_async_test_timeout()
|
|
for _ in range(metric_count):
|
|
start = time.time()
|
|
try:
|
|
self.io_loop.run_sync(
|
|
self.statsd_server.message_received.acquire,
|
|
timeout=timeout_remaining)
|
|
except TimeoutError:
|
|
self.fail()
|
|
timeout_remaining -= (time.time() - start)
|
|
|
|
def parse_metric(self, metric_line: bytes) -> ParsedMetric:
|
|
decoded = metric_line.decode()
|
|
path, _, rest = decoded.partition(':')
|
|
value, _, type_code = rest.partition('|')
|
|
try:
|
|
parsed_value = float(value)
|
|
except ValueError:
|
|
self.fail(f'value of {path} is not a number: value={value!r}')
|
|
return path, parsed_value, type_code
|
|
|
|
def find_metric(self, needle: str) -> ParsedMetric:
|
|
encoded = needle.encode()
|
|
for line in self.statsd_server.metrics:
|
|
if encoded in line:
|
|
return self.parse_metric(line)
|
|
self.fail(f'failed to find metric containing {needle!r}')
|
|
|
|
|
|
class RequestHandlerTests(StatsdTestCase, AsyncTestCaseWithTimeout,
|
|
testing.AsyncHTTPTestCase):
|
|
Application = Application
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.run_coroutine(self.app.start_statsd())
|
|
|
|
def tearDown(self):
|
|
self.run_coroutine(self.app.stop_statsd())
|
|
super().tearDown()
|
|
|
|
def test_the_request_metric_is_sent_last(self):
|
|
rsp = self.fetch('/')
|
|
self.assertEqual(200, rsp.code)
|
|
self.wait_for_metrics()
|
|
|
|
path, _, type_code = self.find_metric('Handler.GET.200')
|
|
self.assertEqual(path, 'applications.service.timers.Handler.GET.200')
|
|
self.assertEqual('ms', type_code)
|
|
|
|
def test_execution_timer(self):
|
|
rsp = self.fetch('/')
|
|
self.assertEqual(200, rsp.code)
|
|
self.wait_for_metrics()
|
|
|
|
path, _, type_code = self.find_metric('execution-timer')
|
|
self.assertEqual('applications.service.timers.execution-timer', path)
|
|
self.assertEqual('ms', type_code)
|
|
|
|
def test_counter(self):
|
|
rsp = self.fetch('/')
|
|
self.assertEqual(200, rsp.code)
|
|
self.wait_for_metrics()
|
|
|
|
path, value, type_code = self.find_metric('request-count')
|
|
self.assertEqual('applications.service.counters.request-count', path)
|
|
self.assertEqual(1.0, value)
|
|
self.assertEqual('c', type_code)
|
|
|
|
def test_handling_request_without_statsd_configured(self):
|
|
self.io_loop.run_sync(self.app.stop_statsd)
|
|
|
|
rsp = self.fetch('/')
|
|
self.assertEqual(200, rsp.code)
|
|
|
|
|
|
class SprocketsHttpInteropTests(StatsdTestCase, AsyncTestCaseWithTimeout,
|
|
sprockets.http.testing.SprocketsHttpTestCase):
|
|
class Application(sprockets_statsd.tornado.Application,
|
|
sprockets.http.app.Application):
|
|
def __init__(self, **settings):
|
|
super().__init__([web.url('/', Handler)], **settings)
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.let_callbacks_run()
|
|
|
|
def let_callbacks_run(self):
|
|
self.io_loop.run_sync(lambda: asyncio.sleep(0))
|
|
|
|
def test_that_callbacks_are_installed(self):
|
|
self.assertIn(self.app.start_statsd, self.app.on_start_callbacks)
|
|
self.assertIn(self.app.stop_statsd, self.app.on_shutdown_callbacks)
|
|
|
|
def test_that_statsd_connector_is_enabled(self):
|
|
# verifies that the start callback actually runs correctly.
|
|
self.assertIsNotNone(self.app.statsd_connector,
|
|
'statsd_connecter was never created')
|
|
|
|
def test_that_misconfiguration_stops_application(self):
|
|
another_app = self.Application(statsd={
|
|
'host': '',
|
|
'prefix': 'whatever',
|
|
})
|
|
another_app.stop = unittest.mock.Mock(wraps=another_app.stop)
|
|
another_app.start(self.io_loop)
|
|
self.let_callbacks_run()
|
|
another_app.stop.assert_called_once_with(self.io_loop)
|