Add support for UDP sockets.

This commit is contained in:
Dave Shawley 2021-03-21 10:22:55 -04:00
parent 38a5e3f566
commit c4c44f9864
No known key found for this signature in database
GPG key ID: 44A9C9992CCFAB82
7 changed files with 236 additions and 63 deletions

View file

@ -1,3 +1,3 @@
Initial release
---------------
- support for sending counters & timers to statsd over a TCP socket
- support for sending counters & timers to statsd over a TCP or UDP socket

View file

@ -1,4 +1,40 @@
Report metrics from your tornado_ web application to a statsd_ instance.
Asynchronously send metrics to a statsd_ instance.
This library provides connectors to send metrics to a statsd_ instance using either TCP or UDP.
.. code-block:: python
import asyncio
import time
import sprockets_statsd.statsd
statsd = sprockets_statsd.statsd.Connector(
host=os.environ.get('STATSD_HOST', '127.0.0.1'))
async def do_stuff():
start = time.time()
response = make_some_http_call()
statsd.inject_metric(f'timers.http.something.{response.code}',
(time.time() - start) * 1000.0, 'ms')
async def main():
await statsd.start()
try:
do_stuff()
finally:
await statsd.stop()
The ``Connector`` instance maintains a resilient connection to the target StatsD instance, formats the metric data
into payloads, and sends them to the StatsD target. It defaults to using TCP as the transport but will use UDP if
the ``ip_protocol`` keyword is set to ``socket.IPPROTO_UDP``. The ``Connector.start`` method starts a background
``asyncio.Task`` that is responsible for maintaining the connection. The ``inject_metric`` method enqueues metric
data to send and the task consumes the internal queue when it is connected.
Tornado helpers
===============
The ``sprockets_statsd.mixins`` module contains mix-in classes that make reporting metrics from your tornado_ web
application simple.
.. code-block:: python
@ -61,4 +97,3 @@ not connected to the server and will be sent in the order received when the task
.. _statsd: https://github.com/statsd/statsd/
.. _tornado: https://tornadoweb.org/

View file

@ -1,7 +1,7 @@
[metadata]
name = sprockets-statsd
version = attr: sprockets_statsd.version
description = Asynchronous Statsd connector.
description = Asynchronously send metrics to a statsd instance.
long_description = file: README.rst
license = BSD 3-Clause License
url = https://sprockets-statsd.readthedocs.io/

View file

@ -1,5 +1,6 @@
import asyncio
import logging
import socket
import typing
@ -7,11 +8,14 @@ class Connector:
"""Sends metrics to a statsd server.
:param host: statsd server to send metrics to
:param port: TCP port that the server is listening on
:param port: socket port that the server is listening on
:keyword ip_protocol: IP protocol to use for the underlying
socket -- either :data:`socket.IPPROTO_TCP` for TCP or
:data:`socket.IPPROTO_UDP` for UDP sockets.
:param kwargs: additional keyword parameters are passed
to the :class:`.Processor` initializer
This class maintains a TCP connection to a statsd server and
This class maintains a connection to a statsd server and
sends metric lines to it asynchronously. You must call the
:meth:`start` method when your application is starting. It
creates a :class:`~asyncio.Task` that manages the connection
@ -165,6 +169,19 @@ class TCPProtocol(StatsdProtocol, asyncio.Protocol):
await asyncio.sleep(0.1)
class UDPProtocol(StatsdProtocol, asyncio.DatagramProtocol):
"""StatsdProtocol implementation over a UDP/IP connection."""
transport: asyncio.DatagramTransport
def send(self, metric: bytes) -> None:
if metric:
self.transport.sendto(metric)
async def shutdown(self) -> None:
self.logger.info('shutting down')
self.transport.close()
class Processor:
"""Maintains the statsd connection and sends metric payloads.
@ -222,12 +239,16 @@ class Processor:
"""
protocol: typing.Union[StatsdProtocol, None]
_create_transport: typing.Callable[[], typing.Coroutine[
typing.Any, typing.Any, typing.Tuple[asyncio.BaseTransport,
StatsdProtocol]]]
def __init__(self,
*,
host,
port: int = 8125,
reconnect_sleep: float = 1.0,
ip_protocol: int = socket.IPPROTO_TCP,
wait_timeout: float = 0.1):
super().__init__()
if not host:
@ -237,11 +258,21 @@ class Processor:
if not port or port < 1:
raise RuntimeError(
f'port must be a positive integer: {port!r}')
except TypeError:
except (TypeError, ValueError):
raise RuntimeError(f'port must be a positive integer: {port!r}')
transport_creators = {
socket.IPPROTO_TCP: self._create_tcp_transport,
socket.IPPROTO_UDP: self._create_udp_transport,
}
try:
self._create_transport = transport_creators[ip_protocol]
except KeyError:
raise RuntimeError(f'ip_protocol {ip_protocol} is not supported')
self.host = host
self.port = port
self._ip_protocol = ip_protocol
self._reconnect_sleep = reconnect_sleep
self._wait_timeout = wait_timeout
@ -274,7 +305,10 @@ class Processor:
await self._process_metric()
except asyncio.CancelledError:
self.logger.info('task cancelled, exiting')
break
self.should_terminate = True
except Exception as error:
self.logger.exception('unexpected error occurred: %s', error)
self.should_terminate = True
self.should_terminate = True
self.logger.info('loop finished with %d metrics in the queue',
@ -303,6 +337,20 @@ class Processor:
self.should_terminate = True
await self.stopped.wait()
async def _create_tcp_transport(
self) -> typing.Tuple[asyncio.BaseTransport, StatsdProtocol]:
t, p = await asyncio.get_running_loop().create_connection(
protocol_factory=TCPProtocol, host=self.host, port=self.port)
return t, typing.cast(StatsdProtocol, p)
async def _create_udp_transport(
self) -> typing.Tuple[asyncio.BaseTransport, StatsdProtocol]:
t, p = await asyncio.get_running_loop().create_datagram_endpoint(
protocol_factory=UDPProtocol,
remote_addr=(self.host, self.port),
reuse_port=True)
return t, typing.cast(StatsdProtocol, p)
async def _connect_if_necessary(self):
if self.protocol is not None:
try:
@ -316,12 +364,7 @@ class Processor:
buffered_data = b''
if self.protocol is not None:
buffered_data = self.protocol.buffered_data
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_connection(
protocol_factory=TCPProtocol,
host=self.host,
port=self.port)
self.protocol = typing.cast(TCPProtocol, protocol)
transport, self.protocol = await self._create_transport()
self.protocol.buffered_data = buffered_data
self.logger.info('connection established to %s',
transport.get_extra_info('peername'))
@ -341,8 +384,4 @@ class Processor:
# it has queued metrics to send
metric = b''
try:
self.protocol.send(metric)
except Exception as error:
self.logger.exception('exception occurred when sending metric: %s',
error)
self.protocol.send(metric)

View file

@ -1,15 +1,23 @@
import asyncio
import io
import socket
import typing
class StatsdServer(asyncio.Protocol):
metrics: typing.List[bytes]
class SupportsClose(typing.Protocol):
def close(self) -> None:
...
def __init__(self):
self.service = None
class StatsdServer(asyncio.DatagramProtocol, asyncio.Protocol):
metrics: typing.List[bytes]
server: typing.Union[SupportsClose, None]
def __init__(self, ip_protocol):
self.server = None
self.host = '127.0.0.1'
self.port = 0
self.ip_protocol = ip_protocol
self.connections_made = 0
self.connections_lost = 0
self.message_counter = 0
@ -26,25 +34,41 @@ class StatsdServer(asyncio.Protocol):
await self._reset()
loop = asyncio.get_running_loop()
self.service = await loop.create_server(lambda: self,
self.host,
self.port,
reuse_port=True)
listening_sock = self.service.sockets[0]
self.host, self.port = listening_sock.getsockname()
self.running.set()
try:
await self.service.serve_forever()
self.running.clear()
except asyncio.CancelledError:
self.close()
await self.service.wait_closed()
except Exception as error:
raise error
if self.ip_protocol == socket.IPPROTO_TCP:
server = await loop.create_server(lambda: self,
self.host,
self.port,
reuse_port=True)
self.server = server
listening_sock = server.sockets[0]
self.host, self.port = listening_sock.getsockname()
self.running.set()
try:
await server.serve_forever()
except asyncio.CancelledError:
self.close()
await server.wait_closed()
except Exception as error:
raise error
finally:
self.running.clear()
elif self.ip_protocol == socket.IPPROTO_UDP:
transport, protocol = await loop.create_datagram_endpoint(
lambda: self,
local_addr=(self.host, self.port),
reuse_port=True)
self.server = transport
self.host, self.port = transport.get_extra_info('sockname')
self.running.set()
try:
while not transport.is_closing():
await asyncio.sleep(0.1)
finally:
self.running.clear()
def close(self):
self.running.clear()
self.service.close()
self.server.close()
for connected_client in self.transports:
connected_client.close()
self.transports.clear()
@ -53,9 +77,6 @@ class StatsdServer(asyncio.Protocol):
await self.running.wait()
async def wait_closed(self):
if self.service.is_serving():
self.close()
await self.service.wait_closed()
while self.running.is_set():
await asyncio.sleep(0.1)
@ -69,6 +90,13 @@ class StatsdServer(asyncio.Protocol):
def data_received(self, data: bytes):
self._buffer.write(data)
self._process_buffer()
def datagram_received(self, data: bytes, _addr):
self._buffer.write(data + b'\n')
self._process_buffer()
def _process_buffer(self):
buf = self._buffer.getvalue()
if b'\n' in buf:
buf_complete = buf[-1] == ord('\n')

View file

@ -1,5 +1,6 @@
import asyncio
import os
import socket
import time
import typing
@ -133,7 +134,7 @@ class ApplicationTests(testing.AsyncTestCase):
class RequestHandlerTests(testing.AsyncHTTPTestCase):
def setUp(self):
super().setUp()
self.statsd_server = helpers.StatsdServer()
self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP)
self.io_loop.spawn_callback(self.statsd_server.run)
self.io_loop.run_sync(self.statsd_server.wait_running)

View file

@ -1,5 +1,6 @@
import asyncio
import logging
import socket
import time
import asynctest
@ -9,6 +10,8 @@ from tests import helpers
class ProcessorTestCase(asynctest.TestCase):
ip_protocol: int
async def setUp(self):
self.test_timeout = 5.0
super().setUp()
@ -25,16 +28,18 @@ class ProcessorTestCase(asynctest.TestCase):
self.fail('future took too long to resolve')
async def asyncSetUp(self):
self.statsd_server = helpers.StatsdServer()
self.statsd_server = helpers.StatsdServer(self.ip_protocol)
self.statsd_task = asyncio.create_task(self.statsd_server.run())
await self.statsd_server.wait_running()
async def asyncTearDown(self):
self.statsd_task.cancel()
self.statsd_server.close()
await self.statsd_server.wait_closed()
class ProcessorTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP
async def test_that_processor_connects_and_disconnects(self):
processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port)
@ -108,19 +113,6 @@ class ProcessorTests(ProcessorTestCase):
statsd.Processor(host=None, port=12345)
self.assertIn('host', str(context.exception))
def test_that_processor_fails_when_port_is_invalid(self):
with self.assertRaises(RuntimeError) as context:
statsd.Processor(host='localhost', port=None)
self.assertIn('port', str(context.exception))
with self.assertRaises(RuntimeError) as context:
statsd.Processor(host='localhost', port=0)
self.assertIn('port', str(context.exception))
with self.assertRaises(RuntimeError) as context:
statsd.Processor(host='localhost', port=-1)
self.assertIn('port', str(context.exception))
async def test_starting_and_stopping_without_connecting(self):
host, port = self.statsd_server.host, self.statsd_server.port
self.statsd_server.close()
@ -142,20 +134,22 @@ class ProcessorTests(ProcessorTestCase):
await asyncio.sleep(0.1)
for record in cm.records:
if (record.exc_info is not None
and record.funcName == '_process_metric'):
if record.exc_info is not None and record.funcName == 'run':
break
else:
self.fail('Expected _process_metric to log exception')
self.fail('Expected run to log exception')
await processor.stop()
class TCPProcessingTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP
async def asyncSetUp(self):
await super().asyncSetUp()
self.processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port)
port=self.statsd_server.port,
reconnect_sleep=0.25)
asyncio.create_task(self.processor.run())
await self.wait_for(self.statsd_server.client_connected.acquire())
@ -208,7 +202,57 @@ class TCPProcessingTests(ProcessorTestCase):
await self.wait_for(self.statsd_server.message_received.acquire())
class UDPProcessingTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_UDP
async def asyncSetUp(self):
await super().asyncSetUp()
self.connector = statsd.Connector(host=self.statsd_server.host,
port=self.statsd_server.port,
ip_protocol=self.ip_protocol,
reconnect_sleep=0.25)
await self.connector.start()
async def asyncTearDown(self):
await self.connector.stop()
await super().asyncTearDown()
async def test_sending_metrics(self):
self.connector.inject_metric('counter', 1, 'c')
self.connector.inject_metric('timer', 1.0, 'ms')
await self.wait_for(self.statsd_server.message_received.acquire())
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertEqual(self.statsd_server.metrics[0], b'counter:1|c')
self.assertEqual(self.statsd_server.metrics[1], b'timer:1.0|ms')
async def test_that_client_sends_to_new_server(self):
self.statsd_server.close()
await self.statsd_server.wait_closed()
self.connector.inject_metric('should.be.lost', 1, 'c')
await asyncio.sleep(self.connector.processor._wait_timeout * 2)
self.statsd_task = asyncio.create_task(self.statsd_server.run())
await self.statsd_server.wait_running()
self.connector.inject_metric('should.be.recvd', 1, 'c')
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertEqual(self.statsd_server.metrics[0], b'should.be.recvd:1|c')
async def test_that_client_handles_socket_closure(self):
self.connector.processor.protocol.transport.close()
await self.wait_for(
asyncio.sleep(self.connector.processor._reconnect_sleep))
self.connector.inject_metric('should.be.recvd', 1, 'c')
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertEqual(self.statsd_server.metrics[0], b'should.be.recvd:1|c')
class ConnectorTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP
async def asyncSetUp(self):
await super().asyncSetUp()
self.connector = statsd.Connector(self.statsd_server.host,
@ -289,3 +333,29 @@ class ConnectorTests(ProcessorTestCase):
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertEqual(f'counter:{value}|c'.encode(),
self.statsd_server.metrics.pop(0))
class ConnectorOptionTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP
def test_protocol_values(self):
connector = statsd.Connector(host=self.statsd_server.host,
port=self.statsd_server.port)
self.assertEqual(socket.IPPROTO_TCP, connector.processor._ip_protocol)
connector = statsd.Connector(host=self.statsd_server.host,
port=self.statsd_server.port,
ip_protocol=socket.IPPROTO_UDP)
self.assertEqual(socket.IPPROTO_UDP, connector.processor._ip_protocol)
with self.assertRaises(RuntimeError):
statsd.Connector(host=self.statsd_server.host,
port=self.statsd_server.port,
ip_protocol=socket.IPPROTO_GRE)
def test_invalid_port_values(self):
for port in {None, 0, -1, 'not-a-number'}:
with self.assertRaises(RuntimeError) as context:
statsd.Connector(host=self.statsd_server.host, port=port)
self.assertIn('port', str(context.exception))
self.assertIn(repr(port), str(context.exception))