Add docs, rename a few things.

This commit is contained in:
Dave Shawley 2021-03-08 07:27:40 -05:00
parent ed6e479e2a
commit 0d5b212efc
No known key found for this signature in database
GPG key ID: 44A9C9992CCFAB82
3 changed files with 175 additions and 43 deletions

View file

@ -4,6 +4,19 @@ sprockets-statsd
.. include:: ../README.rst .. include:: ../README.rst
Reference
=========
Connector
---------
.. autoclass:: sprockets_statsd.statsd.Connector
:members:
Processor internals
-------------------
.. autoclass:: sprockets_statsd.statsd.Processor
:members:
Release history Release history
=============== ===============

View file

@ -3,24 +3,136 @@ import logging
import typing import typing
class Connector:
"""Sends metrics to a statsd server.
:param host: statsd server to send metrics to
:param port: TCP port that the server is listening on
This class maintains a TCP connection to a statsd server and
sends metric lines to it asynchronously. You must call the
:meth:`start` method when your application is starting. It
creates a :class:`~asyncio.Task` that manages the connection
to the statsd server. You must also call :meth:`.stop` before
terminating to ensure that all metrics are flushed to the
statsd server.
When the connector is *should_terminate*, metric payloads are sent by
calling the :meth:`.inject_metric` method. The payloads are
stored in an internal queue that is consumed whenever the
connection to the server is active.
.. attribute:: processor
:type: Processor
The statsd processor that maintains the connection and
sends the metric payloads.
"""
def __init__(self, host: str, port: int = 8125):
self.processor = Processor(host=host, port=port)
self._processor_task = None
async def start(self):
"""Start the processor in the background."""
self._processor_task = asyncio.create_task(self.processor.run())
async def stop(self):
"""Stop the background processor.
Items that are currently in the queue will be flushed to
the statsd server if possible. This is a *blocking* method
and does not return until the background processor has
stopped.
"""
await self.processor.stop()
def inject_metric(self, path: str, value, type_code: str):
"""Send a metric to the statsd server.
:param path: formatted metric name
:param value: metric value as a number or a string. The
string form is required for relative gauges.
:param type_code: type of the metric to send
This method formats the payload and inserts it on the
internal queue for future processing.
"""
payload = f'{path}:{value}|{type_code}\n'
self.processor.queue.put_nowait(payload.encode('utf-8'))
class Processor(asyncio.Protocol): class Processor(asyncio.Protocol):
def __init__(self, *, host, port: int = 8125, **kwargs): """Maintains the statsd connection and sends metric payloads.
super().__init__(**kwargs)
:param host: statsd server to send metrics to
:param port: TCP port that the server is listening on
This class implements :class:`~asyncio.Protocol` for the statsd
TCP connection. The :meth:`.run` method is run as a background
:class:`~asyncio.Task` that consumes payloads from an internal
queue, connects to the TCP server as required, and sends the
already formatted payloads.
.. attribute:: host
:type: str
IP address or DNS name for the statsd server to send metrics to
.. attribute:: port
:type: int
TCP port number that the statsd server is listening on
.. attribute:: should_terminate
:type: bool
Flag that controls whether the background task is active or
not. This flag is set to :data:`False` when the task is started.
Setting it to :data:`True` will cause the task to shutdown in
an orderly fashion.
.. attribute:: queue
:type: asyncio.Queue
Formatted metric payloads to send to the statsd server. Enqueue
payloads to send them to the server.
.. attribute:: connected
:type: asyncio.Event
Is the TCP connection currently connected?
.. attribute:: stopped
:type: asyncio.Event
Is the background task currently stopped? This is the event that
:meth:`.run` sets when it exits and that :meth:`.stop` blocks on
until the task stops.
"""
def __init__(self, *, host, port: int = 8125):
super().__init__()
self.host = host self.host = host
self.port = port self.port = port
self.closed = asyncio.Event() self.stopped = asyncio.Event()
self.stopped.set()
self.connected = asyncio.Event() self.connected = asyncio.Event()
self.logger = logging.getLogger(__package__).getChild('Processor') self.logger = logging.getLogger(__package__).getChild('Processor')
self.running = False self.should_terminate = False
self.transport = None self.transport = None
self.queue = asyncio.Queue()
self._queue = asyncio.Queue()
self._failed_sends = [] self._failed_sends = []
async def run(self): async def run(self):
self.running = True """Maintains the connection and processes metric payloads."""
while self.running: self.stopped.clear()
self.should_terminate = False
while not self.should_terminate:
try: try:
await self._connect_if_necessary() await self._connect_if_necessary()
await self._process_metric() await self._process_metric()
@ -28,11 +140,11 @@ class Processor(asyncio.Protocol):
self.logger.info('task cancelled, exiting') self.logger.info('task cancelled, exiting')
break break
self.running = False self.should_terminate = True
self.logger.info('loop finished with %d metrics in the queue', self.logger.info('loop finished with %d metrics in the queue',
self._queue.qsize()) self.queue.qsize())
if self.connected.is_set(): if self.connected.is_set():
num_ready = self._queue.qsize() num_ready = self.queue.qsize()
self.logger.info('draining %d metrics', num_ready) self.logger.info('draining %d metrics', num_ready)
for _ in range(num_ready): for _ in range(num_ready):
await self._process_metric() await self._process_metric()
@ -44,16 +156,18 @@ class Processor(asyncio.Protocol):
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
self.logger.info('processor is exiting') self.logger.info('processor is exiting')
self.closed.set() self.stopped.set()
async def stop(self): async def stop(self):
self.running = False """Stop the processor.
await self.closed.wait()
def inject_metric(self, path: str, value: typing.Union[float, int, str], This is an asynchronous but blocking method. It does not
type_code: str): return until enqueued metrics are flushed and the processor
payload = f'{path}:{value}|{type_code}\n' connection is closed.
self._queue.put_nowait(payload.encode('utf-8'))
"""
self.should_terminate = True
await self.stopped.wait()
def eof_received(self): def eof_received(self):
self.logger.warning('received EOF from statsd server') self.logger.warning('received EOF from statsd server')
@ -92,7 +206,7 @@ class Processor(asyncio.Protocol):
processing_failed_send = True processing_failed_send = True
else: else:
try: try:
metric = await asyncio.wait_for(self._queue.get(), 0.1) metric = await asyncio.wait_for(self.queue.get(), 0.1)
self.logger.debug('received %r from queue', metric) self.logger.debug('received %r from queue', metric)
except asyncio.TimeoutError: except asyncio.TimeoutError:
return return
@ -116,4 +230,4 @@ class Processor(asyncio.Protocol):
if processing_failed_send: if processing_failed_send:
self._failed_sends.pop(0) self._failed_sends.pop(0)
else: else:
self._queue.task_done() self.queue.task_done()

View file

@ -71,7 +71,7 @@ class ProcessorTests(ProcessorTestCase):
await self.wait_for(self.statsd_server.client_connected.acquire()) await self.wait_for(self.statsd_server.client_connected.acquire())
task.cancel() task.cancel()
await self.wait_for(processor.closed.wait()) await self.wait_for(processor.stopped.wait())
async def test_shutdown_when_disconnected(self): async def test_shutdown_when_disconnected(self):
processor = statsd.Processor(host=self.statsd_server.host, processor = statsd.Processor(host=self.statsd_server.host,
@ -113,17 +113,22 @@ class ProcessorTests(ProcessorTestCase):
await self.wait_for(self.statsd_server.client_connected.acquire()) await self.wait_for(self.statsd_server.client_connected.acquire())
async def test_that_stopping_when_not_running_is_safe(self):
processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port)
await self.wait_for(processor.stop())
class MetricProcessingTests(ProcessorTestCase):
class ConnectorTests(ProcessorTestCase):
async def asyncSetUp(self): async def asyncSetUp(self):
await super().asyncSetUp() await super().asyncSetUp()
self.processor = statsd.Processor(host=self.statsd_server.host, self.connector = statsd.Connector(self.statsd_server.host,
port=self.statsd_server.port) self.statsd_server.port)
asyncio.create_task(self.processor.run()) await self.connector.start()
await self.wait_for(self.statsd_server.client_connected.acquire()) await self.wait_for(self.statsd_server.client_connected.acquire())
async def asyncTearDown(self): async def asyncTearDown(self):
await self.wait_for(self.processor.stop()) await self.wait_for(self.connector.stop())
await super().asyncTearDown() await super().asyncTearDown()
def assert_metrics_equal(self, recvd: bytes, path, value, type_code): def assert_metrics_equal(self, recvd: bytes, path, value, type_code):
@ -135,15 +140,15 @@ class MetricProcessingTests(ProcessorTestCase):
self.assertEqual(recvd_code, type_code, 'metric type mismatch') self.assertEqual(recvd_code, type_code, 'metric type mismatch')
async def test_sending_simple_counter(self): async def test_sending_simple_counter(self):
self.processor.inject_metric('simple.counter', 1000, 'c') self.connector.inject_metric('simple.counter', 1000, 'c')
await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[0], self.assert_metrics_equal(self.statsd_server.metrics[0],
'simple.counter', 1000, 'c') 'simple.counter', 1000, 'c')
async def test_adjusting_gauge(self): async def test_adjusting_gauge(self):
self.processor.inject_metric('simple.gauge', 100, 'g') self.connector.inject_metric('simple.gauge', 100, 'g')
self.processor.inject_metric('simple.gauge', -10, 'g') self.connector.inject_metric('simple.gauge', -10, 'g')
self.processor.inject_metric('simple.gauge', '+10', 'g') self.connector.inject_metric('simple.gauge', '+10', 'g')
for _ in range(3): for _ in range(3):
await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire())
@ -156,7 +161,7 @@ class MetricProcessingTests(ProcessorTestCase):
async def test_sending_timer(self): async def test_sending_timer(self):
secs = 12.34 secs = 12.34
self.processor.inject_metric('simple.timer', secs * 1000.0, 'ms') self.connector.inject_metric('simple.timer', secs * 1000.0, 'ms')
await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[0], self.assert_metrics_equal(self.statsd_server.metrics[0],
'simple.timer', 12340.0, 'ms') 'simple.timer', 12340.0, 'ms')
@ -165,19 +170,19 @@ class MetricProcessingTests(ProcessorTestCase):
# The easiest way to test that the internal metrics queue # The easiest way to test that the internal metrics queue
# is drained when the processor is stopped is to monkey # is drained when the processor is stopped is to monkey
# patch the "process metric" method to enqueue a few # patch the "process metric" method to enqueue a few
# metrics and then set running to false. It will exit # metrics and then terminate the processor. It will exit
# the run loop and drain the queue. # the run loop and drain the queue.
real_process_metric = self.processor._process_metric real_process_metric = self.connector.processor._process_metric
async def fake_process_metric(): async def fake_process_metric():
if self.processor.running: if not self.connector.processor.should_terminate:
self.processor.inject_metric('counter', 1, 'c') self.connector.inject_metric('counter', 1, 'c')
self.processor.inject_metric('counter', 2, 'c') self.connector.inject_metric('counter', 2, 'c')
self.processor.inject_metric('counter', 3, 'c') self.connector.inject_metric('counter', 3, 'c')
self.processor.running = False self.connector.processor.should_terminate = True
return await real_process_metric() return await real_process_metric()
self.processor._process_metric = fake_process_metric self.connector.processor._process_metric = fake_process_metric
await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire())
await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire())
await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire())
@ -187,7 +192,7 @@ class MetricProcessingTests(ProcessorTestCase):
await self.statsd_server.wait_closed() await self.statsd_server.wait_closed()
for value in range(50): for value in range(50):
self.processor.inject_metric('counter', value, 'c') self.connector.inject_metric('counter', value, 'c')
asyncio.create_task(self.statsd_server.run()) asyncio.create_task(self.statsd_server.run())
await self.wait_for(self.statsd_server.client_connected.acquire()) await self.wait_for(self.statsd_server.client_connected.acquire())
@ -198,15 +203,15 @@ class MetricProcessingTests(ProcessorTestCase):
async def test_socket_closure_while_processing_failed_event(self): async def test_socket_closure_while_processing_failed_event(self):
state = {'first_time': True} state = {'first_time': True}
real_process_metric = self.processor._process_metric real_process_metric = self.connector.processor._process_metric
async def fake_process_metric(): async def fake_process_metric():
if state['first_time']: if state['first_time']:
self.processor._failed_sends.append(b'counter:1|c\n') self.connector.processor._failed_sends.append(b'counter:1|c\n')
self.processor.transport.close() self.connector.processor.transport.close()
state['first_time'] = False state['first_time'] = False
return await real_process_metric() return await real_process_metric()
self.processor._process_metric = fake_process_metric self.connector.processor._process_metric = fake_process_metric
await self.wait_for(self.statsd_server.message_received.acquire()) await self.wait_for(self.statsd_server.message_received.acquire())