Move path prefixing into Connector.
This makes the low-level code a little more opinionated.  If there is a
good reason for making `timers`, `counters`, etc. optional, then we can
PR it into place.
2021-03-30 07:56:20 -04:00

import asyncio
import logging
import socket
import typing
class Connector:
    """Sends metrics to a statsd server.

    :param host: statsd server to send metrics to
    :param port: socket port that the server is listening on
    :keyword ip_protocol: IP protocol to use for the underlying
        socket -- either ``socket.IPPROTO_TCP`` for TCP or
        ``socket.IPPROTO_UDP`` for UDP sockets.
    :keyword prefix: optional string to prepend to metric paths
    :param kwargs: additional keyword parameters are passed
        to the :class:`.Processor` initializer

    This class maintains a connection to a statsd server and
    sends metric lines to it asynchronously.  You must call the
    :meth:`start` method when your application is starting.  It
    creates a :class:`~asyncio.Task` that manages the connection
    to the statsd server.  You must also call :meth:`.stop` before
    terminating to ensure that all metrics are flushed to the
    statsd server.

    Metrics are optionally prefixed with :attr:`prefix` before the
    metric type prefix.  This *should* be used to prevent metrics
    from being overwritten when multiple applications share a StatsD
    instance.  Each metric type is also prefixed by one of the
    following strings based on the metric type:

    +-------------------+---------------+-----------+
    | Method call       | Prefix        | Type code |
    +===================+===============+===========+
    | :meth:`.incr`     | ``counters.`` | ``c``     |
    +-------------------+---------------+-----------+
    | :meth:`.decr`     | ``counters.`` | ``c``     |
    +-------------------+---------------+-----------+
    | :meth:`.gauge`    | ``gauges.``   | ``g``     |
    +-------------------+---------------+-----------+
    | :meth:`.timing`   | ``timers.``   | ``ms``    |
    +-------------------+---------------+-----------+

    Once the connector has been started, metric payloads are
    sent by calling the :meth:`.inject_metric` method.  The payloads
    are stored in an internal queue that is consumed whenever the
    connection to the server is active.

    .. attribute:: prefix
       :type: str

       String to prefix to all metrics *before* the metric type prefix.

    .. attribute:: processor
       :type: Processor

       The statsd processor that maintains the connection and
       sends the metric payloads.

    """

    logger: logging.Logger
    prefix: str
    processor: 'Processor'

    def __init__(self,
                 host: str,
                 port: int = 8125,
                 prefix: str = '',
                 **kwargs: typing.Any) -> None:
        self.logger = logging.getLogger(__package__).getChild('Connector')
        # A non-empty prefix is stored with its trailing separator so that
        # it can be concatenated directly in front of the metric path.
        self.prefix = f'{prefix}.' if prefix else prefix
        self.processor = Processor(host=host, port=port, **kwargs)
        self._processor_task: typing.Optional[asyncio.Task[None]] = None

    async def start(self) -> None:
        """Start the processor in the background.

        This is a *blocking* method and does not return until the
        processor task is actually running.

        """
        self._processor_task = asyncio.create_task(self.processor.run())
        await self.processor.running.wait()

    async def stop(self) -> None:
        """Stop the background processor.

        Items that are currently in the queue will be flushed to
        the statsd server if possible.  This is a *blocking* method
        and does not return until the background processor has
        stopped.

        """
        await self.processor.stop()

    def incr(self, path: str, value: int = 1) -> None:
        """Increment a counter metric.

        :param path: counter to increment
        :param value: amount to increment the counter by

        """
        self.inject_metric(f'counters.{path}', str(value), 'c')

    def decr(self, path: str, value: int = 1) -> None:
        """Decrement a counter metric.

        :param path: counter to decrement
        :param value: amount to decrement the counter by

        This is equivalent to ``self.incr(path, -value)``.

        """
        self.inject_metric(f'counters.{path}', str(-value), 'c')

    def gauge(self, path: str, value: int, delta: bool = False) -> None:
        """Manipulate a gauge metric.

        :param path: gauge to adjust
        :param value: value to send
        :param delta: is this an adjustment of the gauge?

        If the `delta` parameter is ``False`` (or omitted), then
        `value` is the new value to set the gauge to.  Otherwise,
        `value` is an adjustment for the current gauge.

        """
        if delta:
            # an explicit sign marks the payload as a relative adjustment
            payload = f'{value:+d}'
        else:
            payload = str(value)
        self.inject_metric(f'gauges.{path}', payload, 'g')

    def timing(self, path: str, seconds: float) -> None:
        """Send a timer metric.

        :param path: timer to append a value to
        :param seconds: number of **seconds** to record

        The value is converted to milliseconds before it is sent.

        """
        self.inject_metric(f'timers.{path}', str(seconds * 1000.0), 'ms')

    def inject_metric(self, path: str, value: str, type_code: str) -> None:
        """Send a metric to the statsd server.

        :param path: formatted metric name
        :param value: formatted metric value
        :param type_code: type of the metric to send

        This method formats the payload and inserts it on the
        internal queue for future processing.  If the queue is
        full, the metric is discarded and a warning is logged.

        """
        payload = f'{self.prefix}{path}:{value}|{type_code}'
        try:
            self.processor.enqueue(payload.encode('utf-8'))
        except asyncio.QueueFull:
            self.logger.warning('statsd queue is full, discarding metric')
class StatsdProtocol(asyncio.BaseProtocol):
    """Common interface for backend protocols/transports.

    UDP and TCP transports have different interfaces (sendto vs write)
    so this class adapts them to a common protocol that our code
    can depend on.

    .. attribute:: buffered_data
       :type: bytes

       Bytes that are buffered due to low-level transport failures.
       Since protocols & transports are created anew with each connect
       attempt, the :class:`.Processor` instance ensures that data
       buffered on a transport is copied over to the new transport
       when creating a connection.

    .. attribute:: connected
       :type: asyncio.Event

       Is the protocol currently connected?

    """

    buffered_data: bytes
    ip_protocol: int = socket.IPPROTO_NONE
    logger: logging.Logger
    transport: typing.Optional[asyncio.BaseTransport]

    def __init__(self) -> None:
        self.buffered_data = b''
        self.connected = asyncio.Event()
        self.logger = logging.getLogger(__package__).getChild(
            'StatsdProtocol')
        self.transport = None

    def send(self, metric: bytes) -> None:
        """Send a metric payload over the transport."""
        raise NotImplementedError()

    async def shutdown(self) -> None:
        """Shutdown the transport and wait for it to close."""
        raise NotImplementedError()

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        """Capture the new transport and set the connected event."""
        # NB - this will return a 4-part tuple in some cases
        server, port = transport.get_extra_info('peername')[:2]
        self.logger.info('connected to statsd %s:%s', server, port)
        self.transport = transport
        self.connected.set()

    def connection_lost(self, exc: typing.Optional[Exception]) -> None:
        """Clear the connected event."""
        self.logger.warning('statsd server connection lost: %s', exc)
        self.connected.clear()
class TCPProtocol(StatsdProtocol, asyncio.Protocol):
    """StatsdProtocol implementation over a TCP/IP connection."""

    ip_protocol = socket.IPPROTO_TCP
    transport: asyncio.WriteTransport

    def eof_received(self) -> None:
        """Log that the server closed its side of the connection."""
        self.logger.warning('received EOF from statsd server')

    def send(self, metric: bytes) -> None:
        """Send `metric` to the server.

        :param metric: formatted metric payload without the line
            terminator; an empty value only flushes buffered data

        If sending the metric fails, it will be saved in
        ``self.buffered_data``.  The processor will save and
        restore the buffered data if it needs to create a
        new protocol object.

        """
        if not self.buffered_data and not metric:
            return  # nothing new and nothing buffered

        self.buffered_data = self.buffered_data + metric + b'\n'
        while (self.transport is not None and self.connected.is_set()
               and self.buffered_data):
            line, maybe_nl, rest = self.buffered_data.partition(b'\n')
            line += maybe_nl
            self.transport.write(line)
            if self.transport.is_closing():
                # keep the line buffered so that it can be resent
                # over the next connection
                self.logger.warning('transport closed during write')
                break
            self.buffered_data = rest

    async def shutdown(self) -> None:
        """Close the transport after flushing any outstanding data."""
        self.logger.info('shutting down')
        if self.connected.is_set():
            self.send(b'')  # flush buffered data
            self.transport.close()
            # ``connection_lost`` clears the event once the close completes
            while self.connected.is_set():
                await asyncio.sleep(0.1)
class UDPProtocol(StatsdProtocol, asyncio.DatagramProtocol):
    """StatsdProtocol implementation over a UDP/IP connection."""

    ip_protocol = socket.IPPROTO_UDP
    transport: asyncio.DatagramTransport

    def send(self, metric: bytes) -> None:
        """Send `metric` as a single datagram.

        Empty payloads are ignored.

        """
        if metric:
            self.transport.sendto(metric)

    async def shutdown(self) -> None:
        """Close the datagram transport."""
        self.logger.info('shutting down')
        self.transport.close()
class Processor:
    """Maintains the statsd connection and sends metric payloads.

    :param host: statsd server to send metrics to
    :param port: port that the server is listening on
    :param ip_protocol: ``socket.IPPROTO_TCP`` or
        ``socket.IPPROTO_UDP``
    :param max_queue_size: only allow this many elements to be
        stored in the queue before discarding metrics
    :param reconnect_sleep: number of seconds to sleep after socket
        error occurs when connecting
    :param wait_timeout: number of seconds to wait for a message to
        arrive on the queue
    :raises RuntimeError: if `host` is empty, `port` is not a
        positive integer, or `ip_protocol` is not supported

    The :meth:`.run` method is run as a background
    :class:`~asyncio.Task` that consumes payloads from an internal
    queue, connects to the server as required, and sends the
    already formatted payloads through a :class:`.StatsdProtocol`.

    .. attribute:: host
       :type: str

       IP address or DNS name for the statsd server to send metrics to

    .. attribute:: port
       :type: int

       Port number that the statsd server is listening on

    .. attribute:: should_terminate
       :type: bool

       Flag that controls whether the background task is active or
       not.  This flag is set to :data:`False` when the task is started.
       Setting it to :data:`True` will cause the task to shutdown in
       an orderly fashion.

    .. attribute:: queue
       :type: asyncio.Queue

       Formatted metric payloads to send to the statsd server.  Enqueue
       payloads to send them to the server.

    .. attribute:: running
       :type: asyncio.Event

       Is the background task currently running?  This is the event that
       :meth:`.run` sets when it starts and it remains set until the task
       exits.

    .. attribute:: stopped
       :type: asyncio.Event

       Is the background task currently stopped?  This is the event that
       :meth:`.run` sets when it exits and that :meth:`.stop` blocks on
       until the task stops.

    """

    logger: logging.Logger
    protocol: typing.Optional['StatsdProtocol']
    queue: asyncio.Queue[bytes]
    _create_transport: typing.Callable[[], typing.Coroutine[
        typing.Any, typing.Any, typing.Tuple[asyncio.BaseTransport,
                                             'StatsdProtocol']]]

    def __init__(self,
                 host: str,
                 port: int = 8125,
                 ip_protocol: int = socket.IPPROTO_TCP,
                 max_queue_size: int = 1000,
                 reconnect_sleep: float = 1.0,
                 wait_timeout: float = 0.1) -> None:
        if not host:
            raise RuntimeError('host must be set')

        try:
            port = int(port)
        except (TypeError, ValueError):
            raise RuntimeError(
                f'port must be a positive integer: {port!r}') from None
        if port < 1:
            raise RuntimeError(f'port must be a positive integer: {port!r}')

        transport_creators = {
            socket.IPPROTO_TCP: self._create_tcp_transport,
            socket.IPPROTO_UDP: self._create_udp_transport,
        }
        try:
            factory = transport_creators[ip_protocol]
        except KeyError:
            raise RuntimeError(
                f'ip_protocol {ip_protocol} is not supported') from None
        self._create_transport = factory  # type: ignore

        self.host = host
        self.port = port
        self._ip_protocol = ip_protocol
        self._reconnect_sleep = reconnect_sleep
        self._wait_timeout = wait_timeout

        self.running = asyncio.Event()
        self.stopped = asyncio.Event()
        self.logger = logging.getLogger(__package__).getChild('Processor')
        self.should_terminate = False
        self.protocol = None
        self.queue = asyncio.Queue(maxsize=max_queue_size)

    @property
    def connected(self) -> bool:
        """Is the processor connected?"""
        return self.protocol is not None and self.protocol.connected.is_set()

    def enqueue(self, metric: bytes) -> None:
        """Add a formatted payload to the queue without blocking.

        :param metric: formatted metric payload
        :raises asyncio.QueueFull: if the queue already holds
            the maximum number of payloads

        """
        self.queue.put_nowait(metric)

    async def run(self) -> None:
        """Maintains the connection and processes metric payloads."""
        self.running.set()
        self.stopped.clear()
        self.should_terminate = False
        try:
            while not self.should_terminate:
                try:
                    await self._connect_if_necessary()
                    if self.connected:
                        await self._process_metric()
                except asyncio.CancelledError:
                    self.logger.info('task cancelled, exiting')
                    self.should_terminate = True
                except Exception as error:
                    self.logger.exception('unexpected error occurred: %s',
                                          error)
                    self.should_terminate = True

            self.should_terminate = True
            self.logger.info('loop finished with %d metrics in the queue',
                             self.queue.qsize())
            if self.connected:
                # always process at least once so that data buffered in
                # the protocol is flushed even when the queue is empty
                num_ready = max(self.queue.qsize(), 1)
                self.logger.info('draining %d metrics', num_ready)
                for _ in range(num_ready):
                    await self._process_metric()
            self.logger.debug('closing transport')
            if self.protocol is not None:
                await self.protocol.shutdown()
            self.logger.info('processor is exiting')
        finally:
            # signal the exit even if draining or shutdown failed so
            # that ``stop`` cannot block forever
            self.running.clear()
            self.stopped.set()

    async def stop(self) -> None:
        """Stop the processor.

        This is an asynchronous but blocking method.  It does not
        return until enqueued metrics are flushed and the processor
        connection is closed.

        """
        self.should_terminate = True
        await self.stopped.wait()

    async def _create_tcp_transport(
            self) -> typing.Tuple[asyncio.BaseTransport, 'StatsdProtocol']:
        """Open a TCP connection to the configured server."""
        t, p = await asyncio.get_running_loop().create_connection(
            protocol_factory=TCPProtocol, host=self.host, port=self.port)
        return t, typing.cast(StatsdProtocol, p)

    async def _create_udp_transport(
            self) -> typing.Tuple[asyncio.BaseTransport, 'StatsdProtocol']:
        """Create a UDP endpoint addressed to the configured server."""
        t, p = await asyncio.get_running_loop().create_datagram_endpoint(
            protocol_factory=UDPProtocol,
            remote_addr=(self.host, self.port))
        return t, typing.cast(StatsdProtocol, p)

    async def _connect_if_necessary(self) -> None:
        """Create a new transport when the current one is not connected.

        Data buffered on the previous protocol is carried over to the
        new protocol.  A failed connection attempt is logged and followed
        by a sleep of ``reconnect_sleep`` seconds.

        """
        if self.protocol is not None:
            # give an existing protocol a moment to (re)establish itself
            try:
                await asyncio.wait_for(self.protocol.connected.wait(),
                                       self._wait_timeout)
            except asyncio.TimeoutError:
                self.logger.debug('protocol is no longer connected')

        if not self.connected:
            try:
                buffered_data = b''
                if self.protocol is not None:
                    buffered_data = self.protocol.buffered_data
                t, p = await self._create_transport()  # type: ignore[misc]
                transport, self.protocol = t, p
                self.protocol.buffered_data = buffered_data
                self.logger.info('connection established to %s',
                                 transport.get_extra_info('peername'))
            except IOError as error:
                self.logger.warning('connection to %s:%s failed: %s',
                                    self.host, self.port, error)
                await asyncio.sleep(self._reconnect_sleep)

    async def _process_metric(self) -> None:
        """Wait briefly for a queued payload and hand it to the protocol."""
        try:
            metric = await asyncio.wait_for(self.queue.get(),
                                            self._wait_timeout)
            self.logger.debug('received %r from queue', metric)
        except asyncio.TimeoutError:
            # we still want to invoke the protocol send in case
            # it has queued metrics to send
            metric = b''

        assert self.protocol is not None  # AFAICT, this cannot happen
        self.protocol.send(metric)