Expose UDP support in the mixins.

This commit is contained in:
Dave Shawley 2021-03-21 17:45:23 -04:00
parent c4c44f9864
commit c634c20906
No known key found for this signature in database
GPG key ID: 44A9C9992CCFAB82
4 changed files with 93 additions and 22 deletions

View file

@ -17,7 +17,12 @@ the following environment variables.
The TCP port number that the StatsD server is listening on. This defaults to 8125 if it is not configured.
You can fine tune the metric payloads and the connector by setting additional values in the ``stats`` key of
.. envvar:: STATSD_PROTOCOL
The IP protocol to use when connecting to the StatsD server. You can specify either "tcp" or "udp". The
default is "tcp" if it not not configured.
You can fine tune the metric payloads and the connector by setting additional values in the ``statsd`` key of
:attr:`tornado.web.Application.settings`. See the :class:`sprockets_statsd.mixins.Application` class
documentation for a description of the supported settings.

View file

@ -1,5 +1,6 @@
import contextlib
import os
import socket
import time
from tornado import web
@ -26,6 +27,8 @@ class Application(web.Application):
+-------------------+---------------------------------------------+
| prefix | segment to prefix to metrics. |
+-------------------+---------------------------------------------+
| protocol | "tcp" or "udp" |
+-------------------+---------------------------------------------+
| reconnect_timeout | number of seconds to sleep after a statsd |
| | connection attempt fails |
+-------------------+---------------------------------------------+
@ -51,6 +54,10 @@ class Application(web.Application):
This is a convenient way to maintain consistent metric paths when
you are managing a larger number of services.
**protocol** defaults to the :envvar:`STATSD_PROTOCOL` environment
variable with a back default of "tcp" if the environment variable
is not set.
**reconnect_timeout** defaults to 1.0 seconds which limits the
aggressiveness of creating new TCP connections.
@ -63,6 +70,8 @@ class Application(web.Application):
statsd_settings.setdefault('host', os.environ.get('STATSD_HOST'))
statsd_settings.setdefault('port',
os.environ.get('STATSD_PORT', '8125'))
statsd_settings.setdefault('protocol',
os.environ.get('STATSD_PROTOCOL', 'tcp'))
try:
prefix = '.'.join([
@ -98,6 +107,15 @@ class Application(web.Application):
kwargs['reconnect_sleep'] = statsd_settings['reconnect_sleep']
if 'wait_timeout' in statsd_settings:
kwargs['wait_timeout'] = statsd_settings['wait_timeout']
if statsd_settings['protocol'] == 'tcp':
kwargs['ip_protocol'] = socket.IPPROTO_TCP
elif statsd_settings['protocol'] == 'udp':
kwargs['ip_protocol'] = socket.IPPROTO_UDP
else:
raise RuntimeError(
f'statsd configuration error:'
f' {statsd_settings["protocol"]} is not a valid'
f' protocol')
self.statsd_connector = statsd.Connector(**kwargs)
await self.statsd_connector.start()

View file

@ -10,8 +10,8 @@ class Connector:
:param host: statsd server to send metrics to
:param port: socket port that the server is listening on
:keyword ip_protocol: IP protocol to use for the underlying
socket -- either :data:`socket.IPPROTO_TCP` for TCP or
:data:`socket.IPPROTO_UDP` for UDP sockets.
socket -- either ``socket.IPPROTO_TCP`` for TCP or
``socket.IPPROTO_UDP`` for UDP sockets.
:param kwargs: additional keyword parameters are passed
to the :class:`.Processor` initializer
@ -117,7 +117,8 @@ class StatsdProtocol:
def connection_made(self, transport: asyncio.Transport):
"""Capture the new transport and set the connected event."""
server, port = transport.get_extra_info('peername')
# NB - this will return a 4-part tuple in some cases
server, port = transport.get_extra_info('peername')[:2]
self.logger.info('connected to statsd %s:%s', server, port)
self.transport = transport
self.connected.set()

View file

@ -25,7 +25,18 @@ class Application(sprockets_statsd.mixins.Application, web.Application):
super().__init__([web.url('/', Handler)], **settings)
class ApplicationTests(testing.AsyncTestCase):
class AsyncTestCaseWithTimeout(testing.AsyncTestCase):
def run_coroutine(self, coro):
loop: asyncio.AbstractEventLoop = self.io_loop.asyncio_loop
try:
loop.run_until_complete(
asyncio.wait_for(coro,
timeout=testing.get_async_test_timeout()))
except asyncio.TimeoutError:
self.fail(f'coroutine {coro} took too long to complete')
class ApplicationTests(AsyncTestCaseWithTimeout):
def setUp(self):
super().setUp()
self._environ = {}
@ -40,6 +51,7 @@ class ApplicationTests(testing.AsyncTestCase):
def test_statsd_setting_defaults(self):
self.unsetenv('STATSD_HOST')
self.unsetenv('STATSD_PORT')
self.unsetenv('STATSD_PROTOCOL')
app = sprockets_statsd.mixins.Application()
self.assertIn('statsd', app.settings)
@ -47,15 +59,18 @@ class ApplicationTests(testing.AsyncTestCase):
'default host value should be None')
self.assertEqual(8125, app.settings['statsd']['port'])
self.assertEqual(None, app.settings['statsd']['prefix'])
self.assertEqual('tcp', app.settings['statsd']['protocol'])
def test_that_statsd_settings_read_from_environment(self):
self.setenv('STATSD_HOST', 'statsd')
self.setenv('STATSD_PORT', '5218')
self.setenv('STATSD_PROTOCOL', 'udp')
app = sprockets_statsd.mixins.Application()
self.assertIn('statsd', app.settings)
self.assertEqual('statsd', app.settings['statsd']['host'])
self.assertEqual(5218, app.settings['statsd']['port'])
self.assertEqual('udp', app.settings['statsd']['protocol'])
def test_prefix_when_only_service_is_set(self):
app = sprockets_statsd.mixins.Application(service='blah')
@ -77,20 +92,24 @@ class ApplicationTests(testing.AsyncTestCase):
def test_overridden_settings(self):
self.setenv('STATSD_HOST', 'statsd')
self.setenv('STATSD_PORT', '9999')
app = sprockets_statsd.mixins.Application(statsd={
'host': 'statsd.example.com',
'port': 5218,
'prefix': 'myapp',
})
self.setenv('STATSD_PROTOCOL', 'tcp')
app = sprockets_statsd.mixins.Application(
statsd={
'host': 'statsd.example.com',
'port': 5218,
'prefix': 'myapp',
'protocol': 'udp',
})
self.assertEqual('statsd.example.com', app.settings['statsd']['host'])
self.assertEqual(5218, app.settings['statsd']['port'])
self.assertEqual('myapp', app.settings['statsd']['prefix'])
self.assertEqual('udp', app.settings['statsd']['protocol'])
def test_that_starting_without_configuration_fails(self):
self.unsetenv('STATSD_HOST')
app = sprockets_statsd.mixins.Application()
with self.assertRaises(RuntimeError):
self.io_loop.run_sync(app.start_statsd)
self.run_coroutine(app.start_statsd())
def test_starting_twice(self):
app = sprockets_statsd.mixins.Application(statsd={
@ -98,22 +117,22 @@ class ApplicationTests(testing.AsyncTestCase):
'port': '8125',
})
try:
self.io_loop.run_sync(app.start_statsd)
self.run_coroutine(app.start_statsd())
connector = app.statsd_connector
self.assertIsNotNone(connector, 'statsd.Connector not created')
self.io_loop.run_sync(app.start_statsd)
self.run_coroutine(app.start_statsd())
self.assertIs(app.statsd_connector, connector,
'statsd.Connector should not be recreated')
finally:
self.io_loop.run_sync(app.stop_statsd)
self.run_coroutine(app.stop_statsd())
def test_stopping_without_starting(self):
app = sprockets_statsd.mixins.Application(statsd={
'host': 'localhost',
'port': '8125',
})
self.io_loop.run_sync(app.stop_statsd)
self.run_coroutine(app.stop_statsd())
def test_optional_parameters(self):
app = sprockets_statsd.mixins.Application(
@ -123,32 +142,60 @@ class ApplicationTests(testing.AsyncTestCase):
'reconnect_sleep': 0.5,
'wait_timeout': 0.25,
})
self.io_loop.run_sync(app.start_statsd)
self.run_coroutine(app.start_statsd())
processor = app.statsd_connector.processor
self.assertEqual(0.5, processor._reconnect_sleep)
self.assertEqual(0.25, processor._wait_timeout)
self.io_loop.run_sync(app.stop_statsd)
self.run_coroutine(app.stop_statsd())
def test_starting_with_invalid_protocol(self):
app = sprockets_statsd.mixins.Application(statsd={
'host': 'localhost',
'protocol': 'unknown'
})
with self.assertRaises(RuntimeError):
self.run_coroutine(app.start_statsd())
def test_that_protocol_strings_are_translated(self):
app = sprockets_statsd.mixins.Application(statsd={
'host': 'localhost',
'protocol': 'tcp',
})
self.run_coroutine(app.start_statsd())
self.assertEqual(socket.IPPROTO_TCP,
app.statsd_connector.processor._ip_protocol)
self.run_coroutine(app.stop_statsd())
app = sprockets_statsd.mixins.Application(statsd={
'host': 'localhost',
'protocol': 'udp',
})
self.run_coroutine(app.start_statsd())
self.assertEqual(socket.IPPROTO_UDP,
app.statsd_connector.processor._ip_protocol)
self.run_coroutine(app.stop_statsd())
class RequestHandlerTests(testing.AsyncHTTPTestCase):
class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase):
def setUp(self):
super().setUp()
self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP)
self.io_loop.spawn_callback(self.statsd_server.run)
self.io_loop.run_sync(self.statsd_server.wait_running)
self.run_coroutine(self.statsd_server.wait_running())
self.app.settings['statsd'].update({
'host': self.statsd_server.host,
'port': self.statsd_server.port,
'prefix': 'applications.service',
'protocol': 'tcp',
})
self.io_loop.run_sync(self.app.start_statsd)
self.run_coroutine(self.app.start_statsd())
def tearDown(self):
self.io_loop.run_sync(self.app.stop_statsd)
self.run_coroutine(self.app.stop_statsd())
self.statsd_server.close()
self.io_loop.run_sync(self.statsd_server.wait_closed)
self.run_coroutine(self.statsd_server.wait_closed())
super().tearDown()
def get_app(self):