mirror of
https://github.com/sprockets/sprockets-statsd.git
synced 2024-11-24 19:29:52 +00:00
2ecdee61c4
This almost makes mypy happy. I did manage to step on a defect that there really isn't a great workaround for so I resorted to disabling typing there. https://github.com/python/mypy/issues/2427
450 lines
16 KiB
Python
450 lines
16 KiB
Python
import asyncio
|
|
import logging
|
|
import socket
|
|
import typing
|
|
|
|
|
|
class Connector:
    """Sends metrics to a statsd server.

    :param host: statsd server to send metrics to
    :param port: socket port that the server is listening on
    :keyword ip_protocol: IP protocol to use for the underlying
        socket -- either ``socket.IPPROTO_TCP`` for TCP or
        ``socket.IPPROTO_UDP`` for UDP sockets.
    :param kwargs: additional keyword parameters are passed
        to the :class:`.Processor` initializer

    This class maintains a connection to a statsd server and
    sends metric lines to it asynchronously.  You must call the
    :meth:`start` method when your application is starting.  It
    creates a :class:`~asyncio.Task` that manages the connection
    to the statsd server.  You must also call :meth:`.stop` before
    terminating to ensure that all metrics are flushed to the
    statsd server.

    When the connector is running, metric payloads are sent by
    calling the :meth:`.inject_metric` method.  The payloads are
    stored in an internal queue that is consumed whenever the
    connection to the server is active.

    .. attribute:: processor
       :type: Processor

       The statsd processor that maintains the connection and
       sends the metric payloads.

    """
    processor: 'Processor'

    def __init__(self,
                 host: str,
                 port: int = 8125,
                 **kwargs: typing.Any) -> None:
        self.processor = Processor(host=host, port=port, **kwargs)
        # Handle on the background task created by start(); None until then.
        self._processor_task: typing.Optional[asyncio.Task[None]] = None

    async def start(self) -> None:
        """Start the processor in the background.

        This is a *blocking* method and does not return until the
        processor task is actually running.

        """
        self._processor_task = asyncio.create_task(self.processor.run())
        await self.processor.running.wait()

    async def stop(self) -> None:
        """Stop the background processor.

        Items that are currently in the queue will be flushed to
        the statsd server if possible.  This is a *blocking* method
        and does not return until the background processor has
        stopped.

        """
        await self.processor.stop()

    def incr(self, path: str, value: int = 1) -> None:
        """Increment a counter metric.

        :param path: counter to increment
        :param value: amount to increment the counter by

        """
        self.inject_metric(path, str(value), 'c')

    def decr(self, path: str, value: int = 1) -> None:
        """Decrement a counter metric.

        :param path: counter to decrement
        :param value: amount to decrement the counter by

        This is equivalent to ``self.incr(path, -value)``.

        """
        self.inject_metric(path, str(-value), 'c')

    def gauge(self, path: str, value: int, delta: bool = False) -> None:
        """Manipulate a gauge metric.

        :param path: gauge to adjust
        :param value: value to send
        :param delta: is this an adjustment of the gauge?

        If the `delta` parameter is ``False`` (or omitted), then
        `value` is the new value to set the gauge to.  Otherwise,
        `value` is an adjustment for the current gauge.

        """
        if delta:
            # An explicit sign is what marks the value as an adjustment.
            payload = f'{value:+d}'
        else:
            payload = str(value)
        self.inject_metric(path, payload, 'g')

    def timing(self, path: str, seconds: float) -> None:
        """Send a timer metric.

        :param path: timer to append a value to
        :param seconds: number of **seconds** to record

        The value is converted to milliseconds before it is sent.

        """
        self.inject_metric(path, str(seconds * 1000.0), 'ms')

    def inject_metric(self, path: str, value: str, type_code: str) -> None:
        """Send a metric to the statsd server.

        :param path: formatted metric name
        :param value: formatted metric value
        :param type_code: type of the metric to send

        This method formats the payload as ``path:value|type_code``,
        encodes it as UTF-8, and inserts it on the internal queue
        for future processing.

        """
        payload = f'{path}:{value}|{type_code}'
        self.processor.enqueue(payload.encode('utf-8'))
|
|
|
|
|
|
class StatsdProtocol(asyncio.BaseProtocol):
    """Common interface for backend protocols/transports.

    UDP and TCP transports have different interfaces (sendto vs write)
    so this class adapts them to a common protocol that our code
    can depend on.

    .. attribute:: buffered_data
       :type: bytes

       Bytes that are buffered due to low-level transport failures.
       Since protocols & transports are created anew with each connect
       attempt, the :class:`.Processor` instance ensures that data
       buffered on a transport is copied over to the new transport
       when creating a connection.

    .. attribute:: connected
       :type: asyncio.Event

       Is the protocol currently connected?

    """
    buffered_data: bytes
    connected: asyncio.Event
    ip_protocol: int = socket.IPPROTO_NONE
    logger: logging.Logger
    transport: typing.Optional[asyncio.BaseTransport]

    def __init__(self) -> None:
        self.buffered_data = b''
        self.connected = asyncio.Event()
        self.logger = logging.getLogger(__package__).getChild(
            self.__class__.__name__)
        self.transport = None

    def send(self, metric: bytes) -> None:
        """Send a metric payload over the transport.

        Subclasses must override this method.

        """
        raise NotImplementedError()

    async def shutdown(self) -> None:
        """Shutdown the transport and wait for it to close.

        Subclasses must override this method.

        """
        raise NotImplementedError()

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        """Capture the new transport and set the connected event.

        :param transport: transport that was just connected

        The peer address is only used for logging.  A transport that
        cannot report one must not prevent the connection from being
        recorded, so a missing or unexpected ``peername`` is tolerated.

        """
        peername = transport.get_extra_info('peername')
        if isinstance(peername, tuple) and len(peername) >= 2:
            # NB - this will be a 4-part tuple in some cases
            server, port = peername[:2]
            self.logger.info('connected to statsd %s:%s', server, port)
        else:
            self.logger.info('connected to statsd %s', peername)
        self.transport = transport
        self.transport.set_protocol(self)
        self.connected.set()

    def connection_lost(self, exc: typing.Optional[Exception]) -> None:
        """Clear the connected event.

        :param exc: error that caused the connection to drop, if any

        """
        self.logger.warning('statsd server connection lost: %s', exc)
        self.connected.clear()
|
|
|
|
|
|
class TCPProtocol(StatsdProtocol, asyncio.Protocol):
    """StatsdProtocol implementation over a TCP/IP connection."""

    ip_protocol = socket.IPPROTO_TCP
    transport: asyncio.WriteTransport

    def eof_received(self) -> None:
        """Mark the protocol as disconnected when the server sends EOF."""
        self.logger.warning('received EOF from statsd server')
        self.connected.clear()

    def send(self, metric: bytes) -> None:
        """Send `metric` to the server.

        :param metric: formatted metric payload without the trailing
            newline; pass ``b''`` to only flush buffered data

        If sending the metric fails, it will be saved in
        ``self.buffered_data``.  The processor will save and
        restore the buffered data if it needs to create a
        new protocol object.

        """
        if metric:
            # Only real metrics get a line terminator.  A flush request
            # (empty payload) must not add a blank line to the stream.
            self.buffered_data = self.buffered_data + metric + b'\n'

        while (self.transport is not None and self.connected.is_set()
               and self.buffered_data):
            line, maybe_nl, rest = self.buffered_data.partition(b'\n')
            self.transport.write(line + maybe_nl)
            if self.transport.is_closing():
                # Keep the line buffered so that it is sent again once
                # a new connection is available.
                self.logger.warning('transport closed during write')
                break
            self.buffered_data = rest

    async def shutdown(self) -> None:
        """Close the transport after flushing any outstanding data."""
        self.logger.info('shutting down')
        if self.connected.is_set():
            self.send(b'')  # flush buffered data
            self.transport.close()
            # connection_lost() clears the event once the close completes
            while self.connected.is_set():
                await asyncio.sleep(0.1)
|
|
|
|
|
|
class UDPProtocol(StatsdProtocol, asyncio.DatagramProtocol):
    """StatsdProtocol implementation over a UDP/IP connection."""

    ip_protocol = socket.IPPROTO_UDP
    transport: asyncio.DatagramTransport

    def send(self, metric: bytes) -> None:
        """Send `metric` as a single datagram.

        :param metric: formatted metric payload; an empty payload
            is ignored

        """
        if not metric:
            return
        self.transport.sendto(metric)

    async def shutdown(self) -> None:
        """Log the shutdown and close the datagram transport."""
        self.logger.info('shutting down')
        self.transport.close()
|
|
|
|
|
|
class Processor:
    """Maintains the statsd connection and sends metric payloads.

    :param host: statsd server to send metrics to
    :param port: port that the server is listening on
    :param reconnect_sleep: number of seconds to sleep after socket
        error occurs when connecting
    :param ip_protocol: IP protocol to use for the underlying
        socket -- either ``socket.IPPROTO_TCP`` or
        ``socket.IPPROTO_UDP``
    :param wait_timeout: number of seconds to wait for a message to
        arrive on the queue
    :raises RuntimeError: if `host` is empty, `port` is not a
        positive integer, or `ip_protocol` is not supported

    The :meth:`.run` method is run as a background
    :class:`~asyncio.Task` that consumes payloads from an internal
    queue, connects to the server as required, and sends the
    already formatted payloads through a :class:`.StatsdProtocol`
    instance.

    .. attribute:: host
       :type: str

       IP address or DNS name for the statsd server to send metrics to

    .. attribute:: port
       :type: int

       Port number that the statsd server is listening on

    .. attribute:: should_terminate
       :type: bool

       Flag that controls whether the background task is active or
       not.  This flag is set to :data:`False` when the task is started.
       Setting it to :data:`True` will cause the task to shutdown in
       an orderly fashion.

    .. attribute:: queue
       :type: asyncio.Queue

       Formatted metric payloads to send to the statsd server.  Enqueue
       payloads to send them to the server.

    .. attribute:: running
       :type: asyncio.Event

       Is the background task currently running?  This is the event that
       :meth:`.run` sets when it starts and it remains set until the task
       exits.

    .. attribute:: stopped
       :type: asyncio.Event

       Is the background task currently stopped?  This is the event that
       :meth:`.run` sets when it exits and that :meth:`.stop` blocks on
       until the task stops.

    """

    logger: logging.Logger
    protocol: typing.Optional[StatsdProtocol]
    queue: asyncio.Queue[bytes]
    _create_transport: typing.Callable[[], typing.Coroutine[
        typing.Any, typing.Any, typing.Tuple[asyncio.BaseTransport,
                                             StatsdProtocol]]]

    def __init__(self,
                 *,
                 host: str,
                 port: int = 8125,
                 reconnect_sleep: float = 1.0,
                 ip_protocol: int = socket.IPPROTO_TCP,
                 wait_timeout: float = 0.1) -> None:
        super().__init__()
        if not host:
            raise RuntimeError('host must be set')
        try:
            port = int(port)
        except (TypeError, ValueError):
            # the conversion error adds nothing for the caller
            raise RuntimeError(
                f'port must be a positive integer: {port!r}') from None
        if port < 1:
            raise RuntimeError(f'port must be a positive integer: {port!r}')

        transport_creators = {
            socket.IPPROTO_TCP: self._create_tcp_transport,
            socket.IPPROTO_UDP: self._create_udp_transport,
        }
        try:
            factory = transport_creators[ip_protocol]
        except KeyError:
            raise RuntimeError(
                f'ip_protocol {ip_protocol} is not supported') from None
        self._create_transport = factory  # type: ignore

        self.host = host
        self.port = port
        self._ip_protocol = ip_protocol
        self._reconnect_sleep = reconnect_sleep
        self._wait_timeout = wait_timeout

        self.running = asyncio.Event()
        self.stopped = asyncio.Event()
        # nothing is running yet, so stop() must return immediately
        self.stopped.set()
        self.logger = logging.getLogger(__package__).getChild('Processor')
        self.should_terminate = False
        self.protocol = None
        self.queue = asyncio.Queue()

    @property
    def connected(self) -> bool:
        """Is the processor connected?"""
        return self.protocol is not None and self.protocol.connected.is_set()

    def enqueue(self, metric: bytes) -> None:
        """Place a formatted metric payload on the internal queue.

        :param metric: encoded metric line without a trailing newline

        """
        self.queue.put_nowait(metric)

    async def run(self) -> None:
        """Maintains the connection and processes metric payloads.

        The method loops until :attr:`should_terminate` is set, the
        task is cancelled, or an unexpected error occurs.  It then
        drains the queue if the connection is still up and shuts
        the protocol down.  :attr:`running` is cleared and
        :attr:`stopped` is set on every exit path so that
        :meth:`.stop` cannot block forever.

        """
        self.running.set()
        self.stopped.clear()
        self.should_terminate = False
        try:
            while not self.should_terminate:
                try:
                    await self._connect_if_necessary()
                    if self.connected:
                        await self._process_metric()
                except asyncio.CancelledError:
                    self.logger.info('task cancelled, exiting')
                    self.should_terminate = True
                except Exception as error:
                    self.logger.exception('unexpected error occurred: %s',
                                          error)
                    self.should_terminate = True

            self.should_terminate = True
            self.logger.info('loop finished with %d metrics in the queue',
                             self.queue.qsize())
            if self.connected:
                # always run at least once so that data buffered in
                # the protocol is flushed
                num_ready = max(self.queue.qsize(), 1)
                self.logger.info('draining %d metrics', num_ready)
                for _ in range(num_ready):
                    await self._process_metric()
            self.logger.debug('closing transport')
            if self.protocol is not None:
                await self.protocol.shutdown()

            self.logger.info('processor is exiting')
        finally:
            # A failure while draining or shutting down must still
            # release anyone blocked in stop().
            self.running.clear()
            self.stopped.set()

    async def stop(self) -> None:
        """Stop the processor.

        This is an asynchronous but blocking method.  It does not
        return until enqueued metrics are flushed and the processor
        connection is closed.

        """
        self.should_terminate = True
        await self.stopped.wait()

    async def _create_tcp_transport(
            self) -> typing.Tuple[asyncio.BaseTransport, StatsdProtocol]:
        """Open a TCP connection to the server using :class:`TCPProtocol`."""
        t, p = await asyncio.get_running_loop().create_connection(
            protocol_factory=TCPProtocol, host=self.host, port=self.port)
        return t, typing.cast(StatsdProtocol, p)

    async def _create_udp_transport(
            self) -> typing.Tuple[asyncio.BaseTransport, StatsdProtocol]:
        """Create a UDP endpoint for the server using :class:`UDPProtocol`."""
        t, p = await asyncio.get_running_loop().create_datagram_endpoint(
            protocol_factory=UDPProtocol,
            remote_addr=(self.host, self.port),
            reuse_port=True)
        return t, typing.cast(StatsdProtocol, p)

    async def _connect_if_necessary(self) -> None:
        """Ensure that a connected protocol exists.

        An existing protocol is given up to ``wait_timeout`` seconds
        to report that it is connected.  If there is still no
        connection, a new transport is created and any data buffered
        on the previous protocol is carried over.  Connection
        failures are logged and followed by a ``reconnect_sleep``
        pause.

        """
        if self.protocol is not None:
            try:
                await asyncio.wait_for(self.protocol.connected.wait(),
                                       self._wait_timeout)
            except asyncio.TimeoutError:
                self.logger.debug('protocol is no longer connected')

        if not self.connected:
            try:
                buffered_data = b''
                if self.protocol is not None:
                    buffered_data = self.protocol.buffered_data
                t, p = await self._create_transport()  # type: ignore[misc]
                transport, self.protocol = t, p
                self.protocol.buffered_data = buffered_data
                self.logger.info('connection established to %s',
                                 transport.get_extra_info('peername'))
            except IOError as error:
                self.logger.warning('connection to %s:%s failed: %s',
                                    self.host, self.port, error)
                await asyncio.sleep(self._reconnect_sleep)

    async def _process_metric(self) -> None:
        """Send the next queued metric, waiting up to ``wait_timeout``."""
        try:
            metric = await asyncio.wait_for(self.queue.get(),
                                            self._wait_timeout)
            self.logger.debug('received %r from queue', metric)
            self.queue.task_done()
        except asyncio.TimeoutError:
            # we still want to invoke the protocol send in case
            # it has queued metrics to send
            metric = b''

        assert self.protocol is not None  # AFAICT, this cannot happen
        self.protocol.send(metric)
|