Merge pull request #1 from sprockets/syn

Initial implementation
This commit is contained in:
Ryan Mclean 2021-04-05 14:06:12 -04:00 committed by GitHub
commit a0e2ef1c19
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 1852 additions and 17 deletions

.github/workflows/run-tests.yml vendored Normal file

@@ -0,0 +1,47 @@
name: Testing
on:
  push:
    branches: ["*"]
    tags-ignore: ["*"]
  pull_request:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [3.7, 3.8, 3.9]
    steps:
      - uses: actions/checkout@v2
      - name: Install python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip setuptools
          python -m pip install '.[dev]'
          python -m pip install -e .
      - name: Lint
        run: |
          flake8 sprockets_statsd tests
      - name: Check format
        run: |
          yapf -dr sprockets_statsd tests
      - name: Run tests
        run: |
          coverage run -m unittest
          coverage report
          coverage xml -o ./coverage.xml
      - name: Generate documentation
        run: |
          sphinx-build -b html -W --no-color docs build/sphinx/html
      - name: Upload coverage
        uses: codecov/codecov-action@v1.0.2
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          file: ./coverage.xml
          flags: unittests
          env_vars: OS,PYTHON
          fail_ci_if_error: true

CHANGELOG.rst

@@ -1,2 +1,3 @@
Initial release
---------------
- support for sending counters & timers to statsd over a TCP or UDP socket

MANIFEST.in

@@ -1,4 +1,7 @@
graft docs
graft tests
include CHANGELOG.rst
include example.py
include LICENSE
include tox.ini

README.rst

@@ -1,5 +1,117 @@
Asynchronously send metrics to a statsd_ instance.

This library provides connectors to send metrics to a statsd_ instance using either TCP or UDP.
.. code-block:: python

   import asyncio
   import os
   import time

   import sprockets_statsd.statsd

   statsd = sprockets_statsd.statsd.Connector(
       host=os.environ.get('STATSD_HOST', '127.0.0.1'))

   async def do_stuff():
       start = time.time()
       response = make_some_http_call()
       statsd.timing(f'timers.http.something.{response.code}',
                     (time.time() - start))

   async def main():
       await statsd.start()
       try:
           await do_stuff()
       finally:
           await statsd.stop()
The ``Connector`` instance maintains a resilient connection to the target StatsD instance, formats the metric data
into payloads, and sends them to the StatsD target. It defaults to using TCP as the transport but will use UDP if
the ``ip_protocol`` keyword is set to ``socket.IPPROTO_UDP``. The ``Connector.start`` method starts a background
``asyncio.Task`` that is responsible for maintaining the connection. The ``timing`` method enqueues a timing
metric to send and the task consumes the internal queue when it is connected.
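The start/consume/stop lifecycle described above can be sketched with stdlib ``asyncio`` alone. Everything below (``processor``, ``stopping``, the list of sent payloads) is illustrative naming, not the library's API:

```python
import asyncio


async def processor(queue: asyncio.Queue, stopping: asyncio.Event) -> list:
    """Consume payloads until asked to stop, then drain the queue."""
    sent = []
    while not stopping.is_set() or not queue.empty():
        try:
            # The real connector would write the payload to the statsd
            # socket here instead of collecting it in a list.
            sent.append(await asyncio.wait_for(queue.get(), timeout=0.1))
        except asyncio.TimeoutError:
            continue
    return sent


async def main() -> int:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
    stopping = asyncio.Event()
    task = asyncio.create_task(processor(queue, stopping))
    for code in (200, 200, 503):
        queue.put_nowait(f'timers.http.something.{code}:12.5|ms'.encode())
    stopping.set()  # ask the task to flush remaining payloads and exit
    return len(await task)


print(asyncio.run(main()))  # → 3
```

Because the payloads sit in a queue, metrics recorded while the connection is down are not lost; they are sent once the background task reconnects.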
The following convenience methods are available. You can also call ``inject_metric`` for complete control over
the payload.
+--------------+--------------------------------------+
| ``incr`` | Increment a counter metric |
+--------------+--------------------------------------+
| ``decr`` | Decrement a counter metric |
+--------------+--------------------------------------+
| ``gauge`` | Adjust or set a gauge metric |
+--------------+--------------------------------------+
| ``timing`` | Append a duration to a timer metric |
+--------------+--------------------------------------+
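Each of these methods ultimately produces a plain statsd line of the form ``<prefix.><path>:<value>|<type-code>``. A stdlib-only sketch of that formatting (``format_metric`` is a hypothetical helper written for illustration, not part of this library):

```python
def format_metric(prefix: str, path: str, value: str, type_code: str) -> str:
    # Mirror the payload layout: an optional prefix, then the
    # type-prefixed metric path, the value, and the statsd type code.
    head = f'{prefix}.' if prefix else ''
    return f'{head}{path}:{value}|{type_code}'


print(format_metric('my-app', 'counters.requests', '1', 'c'))    # incr
print(format_metric('my-app', 'counters.requests', '-1', 'c'))   # decr
print(format_metric('my-app', 'gauges.queue-depth', '+5', 'g'))  # gauge(delta=True)
print(format_metric('my-app', 'timers.fetch', '250.0', 'ms'))    # timing(0.25)
```

Note that a gauge adjustment carries an explicit sign (``+5``/``-5``), while setting a gauge sends the bare value.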
Tornado helpers
===============
The ``sprockets_statsd.tornado`` module contains mix-in classes that make reporting metrics from your tornado_ web
application simple. You will need to install the ``sprockets_statsd[tornado]`` extra to ensure that the Tornado
requirements for this library are met.
.. code-block:: python

   import asyncio
   import logging
   import os

   from tornado import ioloop, web

   import sprockets_statsd.tornado


   class MyHandler(sprockets_statsd.tornado.RequestHandler,
                   web.RequestHandler):
       async def get(self):
           with self.execution_timer('some-operation'):
               await self.do_something()
           self.set_status(204)

       async def do_something(self):
           await asyncio.sleep(1)


   class Application(sprockets_statsd.tornado.Application, web.Application):
       def __init__(self, **settings):
           settings['statsd'] = {
               'host': os.environ['STATSD_HOST'],
               'prefix': 'applications.my-service',
           }
           super().__init__([web.url('/', MyHandler)], **settings)

       async def on_start(self):
           await self.start_statsd()

       async def on_stop(self):
           await self.stop_statsd()


   if __name__ == '__main__':
       logging.basicConfig(level=logging.DEBUG)
       app = Application()
       app.listen(8888)
       iol = ioloop.IOLoop.current()
       try:
           iol.add_callback(app.on_start)
           iol.start()
       except KeyboardInterrupt:
           iol.add_future(asyncio.ensure_future(app.on_stop()),
                          lambda f: iol.stop())
           iol.start()
This application will emit two timing metrics each time that the endpoint is invoked::

   applications.my-service.timers.some-operation:1001.3449192047119|ms
   applications.my-service.timers.MyHandler.GET.204:1002.4960041046143|ms
You will need to set the ``$STATSD_HOST`` environment variable to enable the statsd processing inside of the
application. The ``RequestHandler`` class exposes methods that send counter and timing metrics to a statsd server.
The connection is managed by the ``Application`` provided that you call the ``start_statsd`` method during application
startup.
Metrics are sent by an ``asyncio.Task`` that is started by ``start_statsd``. The request handler methods insert the
metric data onto an ``asyncio.Queue`` that the task reads from. Metric data remains on the queue when the task is
not connected to the server and will be sent in the order received when the task establishes the server connection.
.. _statsd: https://github.com/statsd/statsd/
.. _tornado: https://tornadoweb.org/

docs/index.rst

@@ -1,9 +1,60 @@
================
sprockets-statsd
================

.. include:: ../README.rst
Configuration
=============
The statsd connection is configured by the ``statsd`` application settings key. The default values can be set by
the following environment variables.
.. envvar:: STATSD_HOST

   The host or IP address of the StatsD server to send metrics to.

.. envvar:: STATSD_PORT

   The TCP port number that the StatsD server is listening on. This defaults to 8125 if it is not configured.

.. envvar:: STATSD_PREFIX

   Optional prefix to use for metric paths. See the documentation for :class:`~sprockets_statsd.tornado.Application`
   for additional notes on setting the path prefix when using the Tornado helpers.

.. envvar:: STATSD_PROTOCOL

   The IP protocol to use when connecting to the StatsD server. You can specify either "tcp" or "udp". The
   default is "tcp" if it is not configured.
If you are using the Tornado helper classes, then you can fine-tune the metric payloads and the connector by
setting additional values in the ``statsd`` key of :attr:`tornado.web.Application.settings`. See the
:class:`sprockets_statsd.tornado.Application` class documentation for a description of the supported settings.
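How the documented defaults resolve can be sketched as follows. ``statsd_defaults`` is an illustrative helper written for this example, not part of the library:

```python
import typing


def statsd_defaults(environ: typing.Mapping[str, str]) -> dict:
    # Apply the defaults described above: port 8125 and protocol "tcp"
    # are used when the corresponding environment variables are unset;
    # host and prefix have no built-in default.
    return {
        'host': environ.get('STATSD_HOST'),
        'port': int(environ.get('STATSD_PORT', '8125')),
        'prefix': environ.get('STATSD_PREFIX'),
        'protocol': environ.get('STATSD_PROTOCOL', 'tcp'),
    }


settings = statsd_defaults({'STATSD_HOST': '127.0.0.1'})
print(settings['port'], settings['protocol'])  # → 8125 tcp
```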
Reference
=========

.. autoclass:: sprockets_statsd.statsd.Connector
   :members:

Tornado helpers
---------------

.. autoclass:: sprockets_statsd.tornado.Application
   :members:

.. autoclass:: sprockets_statsd.tornado.RequestHandler
   :members:

Internals
---------

.. autoclass:: sprockets_statsd.statsd.Processor
   :members:

.. autoclass:: sprockets_statsd.statsd.StatsdProtocol
   :members:

.. autoclass:: sprockets_statsd.statsd.TCPProtocol
   :members:
Release history
===============

example.py Normal file

@@ -0,0 +1,42 @@
import asyncio
import logging
from tornado import ioloop, web
import sprockets_statsd.tornado
class MyHandler(sprockets_statsd.tornado.RequestHandler,
web.RequestHandler):
async def get(self):
with self.execution_timer('some-operation'):
await self.do_something()
self.set_status(204)
async def do_something(self):
await asyncio.sleep(1)
class Application(sprockets_statsd.tornado.Application, web.Application):
def __init__(self, **settings):
super().__init__([web.url('/', MyHandler)], **settings)
async def on_start(self):
await self.start_statsd()
async def on_stop(self):
await self.stop_statsd()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
app = Application()
app.listen(8888)
iol = ioloop.IOLoop.current()
try:
iol.add_callback(app.on_start)
iol.start()
except KeyboardInterrupt:
iol.add_future(asyncio.ensure_future(app.on_stop()),
lambda f: iol.stop())
iol.start()

setup.cfg

@@ -1,7 +1,7 @@
[metadata]
name = sprockets-statsd
version = attr: sprockets_statsd.version
description = Asynchronously send metrics to a statsd instance.
long_description = file: README.rst
license = BSD 3-Clause License
url = https://sprockets-statsd.readthedocs.io/
@@ -29,17 +29,19 @@ classifiers =
[options]
packages = find:
install_requires =

[options.extras_require]
tornado =
    tornado>=5
dev =
    asynctest==0.13.0
    coverage==5.5
    flake8==3.8.4
    flake8-import-order==0.18.1
    sphinx==3.5.2
    sphinx-autodoc-typehints==1.11.1
    tornado>=5
    yapf==0.30.0
[options.packages.find]
exclude =

@@ -51,6 +53,9 @@ nitpicky = 1
warning_is_error = 1

[coverage:report]
exclude_lines =
    pragma: no cover
    raise NotImplementedError
fail_under = 100
show_missing = 1
@@ -59,10 +64,25 @@ branch = 1
source = sprockets_statsd

[flake8]
application_import_names = sprockets_statsd,tests
exclude = build,env,dist
import_order_style = pycharm
[mypy]
cache_dir = build/mypy-cache
check_untyped_defs = true
show_error_codes = true
warn_no_return = true
warn_redundant_casts = true
warn_unused_configs = true
warn_unused_ignores = true
[mypy-sprockets_statsd]
disallow_incomplete_defs = true
disallow_untyped_defs = true
no_implicit_optional = true
strict = true
[yapf]
allow_split_before_dict_value = false
indent_dictionary_value = true

sprockets_statsd/statsd.py Normal file

@@ -0,0 +1,486 @@
import asyncio
import logging
import socket
import typing
class Connector:
"""Sends metrics to a statsd server.
:param host: statsd server to send metrics to
:param port: socket port that the server is listening on
:keyword ip_protocol: IP protocol to use for the underlying
socket -- either ``socket.IPPROTO_TCP`` for TCP or
``socket.IPPROTO_UDP`` for UDP sockets.
:keyword prefix: optional string to prepend to metric paths
:param kwargs: additional keyword parameters are passed
to the :class:`.Processor` initializer
This class maintains a connection to a statsd server and
sends metric lines to it asynchronously. You must call the
:meth:`start` method when your application is starting. It
creates a :class:`~asyncio.Task` that manages the connection
to the statsd server. You must also call :meth:`.stop` before
terminating to ensure that all metrics are flushed to the
statsd server.
Metrics are optionally prefixed with :attr:`prefix` before the
metric type prefix. This *should* be used to prevent metrics
from being overwritten when multiple applications share a StatsD
instance. Each metric type is also prefixed by one of the
following strings based on the metric type:
+-------------------+---------------+-----------+
| Method call | Prefix | Type code |
+-------------------+---------------+-----------+
| :meth:`.incr` | ``counters.`` | ``c`` |
+-------------------+---------------+-----------+
| :meth:`.decr` | ``counters.`` | ``c`` |
+-------------------+---------------+-----------+
| :meth:`.gauge` | ``gauges.`` | ``g`` |
+-------------------+---------------+-----------+
| :meth:`.timing` | ``timers.`` | ``ms`` |
+-------------------+---------------+-----------+
When the connector is running, metric payloads are
sent by calling the :meth:`.inject_metric` method. The payloads
are stored in an internal queue that is consumed whenever the
connection to the server is active.
.. attribute:: prefix
:type: str
String to prefix to all metrics *before* the metric type prefix.
.. attribute:: processor
:type: Processor
The statsd processor that maintains the connection and
sends the metric payloads.
"""
logger: logging.Logger
prefix: str
processor: 'Processor'
def __init__(self,
host: str,
port: int = 8125,
*,
prefix: str = '',
**kwargs: typing.Any) -> None:
self.logger = logging.getLogger(__package__).getChild('Connector')
self.prefix = f'{prefix}.' if prefix else prefix
self.processor = Processor(host=host, port=port, **kwargs)
self._processor_task: typing.Optional[asyncio.Task[None]] = None
async def start(self) -> None:
"""Start the processor in the background.
This is a *blocking* method and does not return until the
processor task is actually running.
"""
self._processor_task = asyncio.create_task(self.processor.run())
await self.processor.running.wait()
async def stop(self) -> None:
"""Stop the background processor.
Items that are currently in the queue will be flushed to
the statsd server if possible. This is a *blocking* method
and does not return until the background processor has
stopped.
"""
await self.processor.stop()
def incr(self, path: str, value: int = 1) -> None:
"""Increment a counter metric.
:param path: counter to increment
:param value: amount to increment the counter by
"""
self.inject_metric(f'counters.{path}', str(value), 'c')
def decr(self, path: str, value: int = 1) -> None:
"""Decrement a counter metric.
:param path: counter to decrement
:param value: amount to decrement the counter by
This is equivalent to ``self.incr(path, -value)``.
"""
self.inject_metric(f'counters.{path}', str(-value), 'c')
def gauge(self, path: str, value: int, delta: bool = False) -> None:
"""Manipulate a gauge metric.
:param path: gauge to adjust
:param value: value to send
:param delta: is this an adjustment of the gauge?
If the `delta` parameter is ``False`` (or omitted), then
`value` is the new value to set the gauge to. Otherwise,
`value` is an adjustment for the current gauge.
"""
if delta:
payload = f'{value:+d}'
else:
payload = str(value)
self.inject_metric(f'gauges.{path}', payload, 'g')
def timing(self, path: str, seconds: float) -> None:
"""Send a timer metric.
:param path: timer to append a value to
:param seconds: number of **seconds** to record
"""
self.inject_metric(f'timers.{path}', str(seconds * 1000.0), 'ms')
def inject_metric(self, path: str, value: str, type_code: str) -> None:
"""Send a metric to the statsd server.
:param path: formatted metric name
:param value: formatted metric value
:param type_code: type of the metric to send
This method formats the payload and inserts it on the
internal queue for future processing.
"""
payload = f'{self.prefix}{path}:{value}|{type_code}'
try:
self.processor.enqueue(payload.encode('utf-8'))
except asyncio.QueueFull:
self.logger.warning('statsd queue is full, discarding metric')
class StatsdProtocol(asyncio.BaseProtocol):
"""Common interface for backend protocols/transports.
UDP and TCP transports have different interfaces (sendto vs write)
so this class adapts them to a common protocol that our code
can depend on.
.. attribute:: buffered_data
:type: bytes
Bytes that are buffered due to low-level transport failures.
Since protocols & transports are created anew with each connect
attempt, the :class:`.Processor` instance ensures that data
buffered on a transport is copied over to the new transport
when creating a connection.
.. attribute:: connected
:type: asyncio.Event
Is the protocol currently connected?
"""
buffered_data: bytes
ip_protocol: int = socket.IPPROTO_NONE
logger: logging.Logger
transport: typing.Optional[asyncio.BaseTransport]
def __init__(self) -> None:
self.buffered_data = b''
self.connected = asyncio.Event()
self.logger = logging.getLogger(__package__).getChild(
self.__class__.__name__)
self.transport = None
def send(self, metric: bytes) -> None:
"""Send a metric payload over the transport."""
raise NotImplementedError()
async def shutdown(self) -> None:
"""Shutdown the transport and wait for it to close."""
raise NotImplementedError()
def connection_made(self, transport: asyncio.BaseTransport) -> None:
"""Capture the new transport and set the connected event."""
# NB - this will return a 4-part tuple in some cases
server, port = transport.get_extra_info('peername')[:2]
self.logger.info('connected to statsd %s:%s', server, port)
self.transport = transport
self.transport.set_protocol(self)
self.connected.set()
def connection_lost(self, exc: typing.Optional[Exception]) -> None:
"""Clear the connected event."""
self.logger.warning('statsd server connection lost: %s', exc)
self.connected.clear()
class TCPProtocol(StatsdProtocol, asyncio.Protocol):
"""StatsdProtocol implementation over a TCP/IP connection."""
ip_protocol = socket.IPPROTO_TCP
transport: asyncio.WriteTransport
def eof_received(self) -> None:
self.logger.warning('received EOF from statsd server')
self.connected.clear()
def send(self, metric: bytes) -> None:
"""Send `metric` to the server.
If sending the metric fails, it will be saved in
``self.buffered_data``. The processor will save and
restore the buffered data if it needs to create a
new protocol object.
"""
if not self.buffered_data and not metric:
return
self.buffered_data = self.buffered_data + metric + b'\n'
while (self.transport is not None and self.connected.is_set()
and self.buffered_data):
line, maybe_nl, rest = self.buffered_data.partition(b'\n')
line += maybe_nl
self.transport.write(line)
if self.transport.is_closing():
self.logger.warning('transport closed during write')
break
self.buffered_data = rest
async def shutdown(self) -> None:
"""Close the transport after flushing any outstanding data."""
self.logger.info('shutting down')
if self.connected.is_set():
self.send(b'') # flush buffered data
self.transport.close()
while self.connected.is_set():
await asyncio.sleep(0.1)
class UDPProtocol(StatsdProtocol, asyncio.DatagramProtocol):
"""StatsdProtocol implementation over a UDP/IP connection."""
ip_protocol = socket.IPPROTO_UDP
transport: asyncio.DatagramTransport
def send(self, metric: bytes) -> None:
if metric:
self.transport.sendto(metric)
async def shutdown(self) -> None:
self.logger.info('shutting down')
self.transport.close()
class Processor:
"""Maintains the statsd connection and sends metric payloads.
:param host: statsd server to send metrics to
:param port: TCP port that the server is listening on
:param max_queue_size: only allow this many elements to be
stored in the queue before discarding metrics
:param reconnect_sleep: number of seconds to sleep after socket
error occurs when connecting
:param wait_timeout: number of seconds to wait for a message to
arrive on the queue
This class implements :class:`~asyncio.Protocol` for the statsd
TCP connection. The :meth:`.run` method is run as a background
:class:`~asyncio.Task` that consumes payloads from an internal
queue, connects to the TCP server as required, and sends the
already formatted payloads.
.. attribute:: host
:type: str
IP address or DNS name for the statsd server to send metrics to
.. attribute:: port
:type: int
TCP port number that the statsd server is listening on
.. attribute:: should_terminate
:type: bool
Flag that controls whether the background task is active or
not. This flag is set to :data:`False` when the task is started.
Setting it to :data:`True` will cause the task to shutdown in
an orderly fashion.
.. attribute:: queue
:type: asyncio.Queue
Formatted metric payloads to send to the statsd server. Enqueue
payloads to send them to the server.
.. attribute:: running
:type: asyncio.Event
Is the background task currently running? This is the event that
:meth:`.run` sets when it starts and it remains set until the task
exits.
.. attribute:: stopped
:type: asyncio.Event
Is the background task currently stopped? This is the event that
:meth:`.run` sets when it exits and that :meth:`.stop` blocks on
until the task stops.
"""
logger: logging.Logger
protocol: typing.Optional[StatsdProtocol]
queue: asyncio.Queue
_create_transport: typing.Callable[[], typing.Coroutine[
typing.Any, typing.Any, typing.Tuple[asyncio.BaseTransport,
StatsdProtocol]]]
def __init__(self,
*,
host: str,
port: int = 8125,
ip_protocol: int = socket.IPPROTO_TCP,
max_queue_size: int = 1000,
reconnect_sleep: float = 1.0,
wait_timeout: float = 0.1) -> None:
super().__init__()
if not host:
raise RuntimeError('host must be set')
try:
port = int(port)
if not port or port < 1:
raise RuntimeError(
f'port must be a positive integer: {port!r}')
except (TypeError, ValueError):
raise RuntimeError(f'port must be a positive integer: {port!r}')
transport_creators = {
socket.IPPROTO_TCP: self._create_tcp_transport,
socket.IPPROTO_UDP: self._create_udp_transport,
}
try:
factory = transport_creators[ip_protocol]
except KeyError:
raise RuntimeError(f'ip_protocol {ip_protocol} is not supported')
else:
self._create_transport = factory # type: ignore
self.host = host
self.port = port
self._ip_protocol = ip_protocol
self._reconnect_sleep = reconnect_sleep
self._wait_timeout = wait_timeout
self.running = asyncio.Event()
self.stopped = asyncio.Event()
self.stopped.set()
self.logger = logging.getLogger(__package__).getChild('Processor')
self.should_terminate = False
self.protocol = None
self.queue = asyncio.Queue(maxsize=max_queue_size)
@property
def connected(self) -> bool:
"""Is the processor connected?"""
return self.protocol is not None and self.protocol.connected.is_set()
def enqueue(self, metric: bytes) -> None:
self.queue.put_nowait(metric)
async def run(self) -> None:
"""Maintains the connection and processes metric payloads."""
self.running.set()
self.stopped.clear()
self.should_terminate = False
while not self.should_terminate:
try:
await self._connect_if_necessary()
if self.connected:
await self._process_metric()
except asyncio.CancelledError:
self.logger.info('task cancelled, exiting')
self.should_terminate = True
except Exception as error:
self.logger.exception('unexpected error occurred: %s', error)
self.should_terminate = True
self.should_terminate = True
self.logger.info('loop finished with %d metrics in the queue',
self.queue.qsize())
if self.connected:
num_ready = max(self.queue.qsize(), 1)
self.logger.info('draining %d metrics', num_ready)
for _ in range(num_ready):
await self._process_metric()
self.logger.debug('closing transport')
if self.protocol is not None:
await self.protocol.shutdown()
self.logger.info('processor is exiting')
self.running.clear()
self.stopped.set()
async def stop(self) -> None:
"""Stop the processor.
This is an asynchronous but blocking method. It does not
return until enqueued metrics are flushed and the processor
connection is closed.
"""
self.should_terminate = True
await self.stopped.wait()
async def _create_tcp_transport(
self) -> typing.Tuple[asyncio.BaseTransport, StatsdProtocol]:
t, p = await asyncio.get_running_loop().create_connection(
protocol_factory=TCPProtocol, host=self.host, port=self.port)
return t, typing.cast(StatsdProtocol, p)
async def _create_udp_transport(
self) -> typing.Tuple[asyncio.BaseTransport, StatsdProtocol]:
t, p = await asyncio.get_running_loop().create_datagram_endpoint(
protocol_factory=UDPProtocol,
remote_addr=(self.host, self.port),
reuse_port=True)
return t, typing.cast(StatsdProtocol, p)
async def _connect_if_necessary(self) -> None:
if self.protocol is not None:
try:
await asyncio.wait_for(self.protocol.connected.wait(),
self._wait_timeout)
except asyncio.TimeoutError:
self.logger.debug('protocol is no longer connected')
if not self.connected:
try:
buffered_data = b''
if self.protocol is not None:
buffered_data = self.protocol.buffered_data
t, p = await self._create_transport() # type: ignore[misc]
transport, self.protocol = t, p
self.protocol.buffered_data = buffered_data
self.logger.info('connection established to %s',
transport.get_extra_info('peername'))
except IOError as error:
self.logger.warning('connection to %s:%s failed: %s',
self.host, self.port, error)
await asyncio.sleep(self._reconnect_sleep)
async def _process_metric(self) -> None:
try:
metric = await asyncio.wait_for(self.queue.get(),
self._wait_timeout)
self.logger.debug('received %r from queue', metric)
self.queue.task_done()
except asyncio.TimeoutError:
# we still want to invoke the protocol send in case
# it has queued metrics to send
metric = b''
assert self.protocol is not None # AFAICT, this cannot happen
self.protocol.send(metric)

sprockets_statsd/tornado.py Normal file

@@ -0,0 +1,197 @@
import contextlib
import os
import socket
import time
import typing
from tornado import web
from sprockets_statsd import statsd
class Application(web.Application):
"""Mix this into your application to add a statsd connection.
.. attribute:: statsd_connector
:type: sprockets_statsd.statsd.Connector
Connection to the StatsD server that is set between calls
to :meth:`.start_statsd` and :meth:`.stop_statsd`.
This mix-in is configured by the ``statsd`` settings key. The
value is a dictionary with the following keys.
+-------------------+---------------------------------------------+
| host | the statsd host to send metrics to |
+-------------------+---------------------------------------------+
| port | port number that statsd is listening on |
+-------------------+---------------------------------------------+
| prefix | segment to prefix to metrics |
+-------------------+---------------------------------------------+
| protocol | "tcp" or "udp" |
+-------------------+---------------------------------------------+
| reconnect_timeout | number of seconds to sleep after a statsd |
| | connection attempt fails |
+-------------------+---------------------------------------------+
| wait_timeout | number of seconds to wait for a metric to |
| | arrive on the queue before verifying the |
| | connection |
+-------------------+---------------------------------------------+
**host** defaults to the :envvar:`STATSD_HOST` environment variable.
If this value is not set, then the statsd connector *WILL NOT* be
enabled.
**port** defaults to the :envvar:`STATSD_PORT` environment variable
with a backup default of 8125 if the environment variable is not
set.
**prefix** is prefixed to all metric paths. This provides a
namespace for metrics so that each application's metrics are maintained
in separate buckets. The default is to use the :envvar:`STATSD_PREFIX`
environment variable. If it is unset and the *service* and
*environment* keys are set in ``settings``, then the default is
``applications.<service>.<environment>``. This is a convenient way to
maintain consistent metric paths when you are managing a larger number
of services.
.. rubric:: Warning
If you want to run without a prefix, then you are required to explicitly
set ``statsd.prefix`` to ``None``. This prevents accidentally polluting
the metric namespace with unqualified paths.
**protocol** defaults to the :envvar:`STATSD_PROTOCOL` environment
variable with a backup default of "tcp" if the environment variable
is not set.
**reconnect_timeout** defaults to 1.0 seconds which limits the
aggressiveness of creating new TCP connections.
**wait_timeout** defaults to 0.1 seconds which ensures that the
processor quickly responds to connection faults.
"""
statsd_connector: typing.Optional[statsd.Connector]
def __init__(self, *args: typing.Any, **settings: typing.Any):
statsd_settings = settings.setdefault('statsd', {})
statsd_settings.setdefault('host', os.environ.get('STATSD_HOST'))
statsd_settings.setdefault('port',
os.environ.get('STATSD_PORT', '8125'))
statsd_settings.setdefault('protocol',
os.environ.get('STATSD_PROTOCOL', 'tcp'))
if 'prefix' not in statsd_settings:
statsd_settings['prefix'] = os.environ.get('STATSD_PREFIX')
if not statsd_settings['prefix']:
try:
statsd_settings['prefix'] = '.'.join([
'applications',
settings['service'],
settings['environment'],
])
except KeyError:
raise RuntimeError(
'statsd configuration error: prefix is not set. Set'
' $STATSD_PREFIX or configure settings.statsd.prefix')
super().__init__(*args, **settings)
self.settings['statsd']['port'] = int(self.settings['statsd']['port'])
self.statsd_connector = None
async def start_statsd(self) -> None:
"""Start the connector during startup.
Call this method during application startup to enable the statsd
connection. A new :class:`~sprockets_statsd.statsd.Connector`
instance will be created and started. This method does not return
until the connector is running.
"""
if self.statsd_connector is None:
kwargs = self.settings['statsd'].copy()
protocol = kwargs.pop('protocol', None)
if protocol == 'tcp':
kwargs['ip_protocol'] = socket.IPPROTO_TCP
elif protocol == 'udp':
kwargs['ip_protocol'] = socket.IPPROTO_UDP
else:
raise RuntimeError(f'statsd configuration error: {protocol} '
f'is not a valid protocol')
self.statsd_connector = statsd.Connector(**kwargs)
await self.statsd_connector.start()
async def stop_statsd(self) -> None:
"""Stop the connector during shutdown.
If the connector was started, then this method will gracefully
terminate it. The method does not return until after the
connector is stopped.
"""
if self.statsd_connector is not None:
await self.statsd_connector.stop()
self.statsd_connector = None
class RequestHandler(web.RequestHandler):
"""Mix this into your handler to send metrics to a statsd server."""
statsd_connector: typing.Optional[statsd.Connector]
def initialize(self, **kwargs: typing.Any) -> None:
super().initialize(**kwargs)
self.application: Application
self.statsd_connector = self.application.statsd_connector
def __build_path(self, *path: typing.Any) -> str:
return '.'.join(str(c) for c in path)
def record_timing(self, secs: float, *path: typing.Any) -> None:
"""Record the duration.
:param secs: number of seconds to record
:param path: path to record the duration under
"""
if self.statsd_connector is not None:
self.statsd_connector.timing(self.__build_path(*path), secs)
def increase_counter(self, *path: typing.Any, amount: int = 1) -> None:
"""Adjust a counter.
:param path: path of the counter to adjust
:param amount: amount to adjust the counter by. Defaults to
1 and can be negative
"""
if self.statsd_connector is not None:
self.statsd_connector.incr(self.__build_path(*path), amount)
@contextlib.contextmanager
def execution_timer(
self, *path: typing.Any) -> typing.Generator[None, None, None]:
"""Record the execution duration of a block of code.
:param path: path to record the duration as
"""
start = time.time()
try:
yield
finally:
self.record_timing(time.time() - start, *path)
def on_finish(self) -> None:
"""Extended to record the request time as a duration.
This method extends :meth:`tornado.web.RequestHandler.on_finish`
to record ``self.request.request_time`` as a timing metric.
"""
super().on_finish()
self.record_timing(self.request.request_time(),
self.__class__.__name__, self.request.method,
self.get_status())

tests/helpers.py Normal file

@@ -0,0 +1,132 @@
import asyncio
import io
import socket
import typing
class StatsdServer(asyncio.DatagramProtocol, asyncio.Protocol):
metrics: typing.List[bytes]
def __init__(self, ip_protocol):
self.server = None
self.host = '127.0.0.1'
self.port = 0
self.ip_protocol = ip_protocol
self.connections_made = 0
self.connections_lost = 0
self.message_counter = 0
self.metrics = []
self.running = asyncio.Event()
self.client_connected = asyncio.Semaphore(value=0)
self.message_received = asyncio.Semaphore(value=0)
self.transports: typing.List[asyncio.BaseTransport] = []
self._buffer = io.BytesIO()
async def run(self):
await self._reset()
loop = asyncio.get_running_loop()
if self.ip_protocol == socket.IPPROTO_TCP:
server = await loop.create_server(lambda: self,
self.host,
self.port,
reuse_port=True)
self.server = server
listening_sock = typing.cast(typing.List[socket.socket],
server.sockets)[0]
self.host, self.port = listening_sock.getsockname()
self.running.set()
try:
await server.serve_forever()
except asyncio.CancelledError:
self.close()
await server.wait_closed()
            except Exception:
                raise
finally:
self.running.clear()
elif self.ip_protocol == socket.IPPROTO_UDP:
transport, protocol = await loop.create_datagram_endpoint(
lambda: self,
local_addr=(self.host, self.port),
reuse_port=True)
self.server = transport
self.host, self.port = transport.get_extra_info('sockname')
self.running.set()
try:
while not transport.is_closing():
await asyncio.sleep(0.1)
finally:
self.running.clear()
def close(self):
if self.server is not None:
self.server.close()
for connected_client in self.transports:
connected_client.close()
self.transports.clear()
async def wait_running(self):
await self.running.wait()
async def wait_closed(self):
while self.running.is_set():
await asyncio.sleep(0.1)
def connection_made(self, transport: asyncio.BaseTransport):
self.client_connected.release()
self.connections_made += 1
self.transports.append(transport)
def connection_lost(self, exc) -> None:
self.connections_lost += 1
def data_received(self, data: bytes):
self._buffer.write(data)
self._process_buffer()
def datagram_received(self, data: bytes, _addr):
self._buffer.write(data + b'\n')
self._process_buffer()
def _process_buffer(self):
buf = self._buffer.getvalue()
if b'\n' in buf:
buf_complete = buf[-1] == ord('\n')
            if not buf_complete:
                offset = buf.rfind(b'\n')
                # keep only the bytes after the final newline as the
                # partial metric; recreate the buffer positioned at its
                # end so the next write appends rather than overwriting
                self._buffer = io.BytesIO()
                self._buffer.write(buf[offset + 1:])
                buf = buf[:offset]
else:
self._buffer = io.BytesIO()
buf = buf[:-1]
for metric in buf.split(b'\n'):
self.metrics.append(metric)
self.message_received.release()
self.message_counter += 1
async def _reset(self):
self._buffer = io.BytesIO()
self.connections_made = 0
self.connections_lost = 0
self.message_counter = 0
self.metrics.clear()
for transport in self.transports:
transport.close()
self.transports.clear()
self.running.clear()
await self._drain_semaphore(self.client_connected)
await self._drain_semaphore(self.message_received)
@staticmethod
async def _drain_semaphore(semaphore: asyncio.Semaphore):
while not semaphore.locked():
try:
await asyncio.wait_for(semaphore.acquire(), 0.1)
except asyncio.TimeoutError:
break
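`_process_buffer` above re-frames a TCP byte stream into newline-terminated metrics: data may arrive split at arbitrary boundaries, so complete lines are emitted and a trailing partial line is carried over to the next read. A minimal standalone sketch of that framing rule (function name and sample metrics are illustrative):

```python
import io
import typing


def split_metrics(buffer: io.BytesIO, data: bytes) -> typing.List[bytes]:
    # append the new chunk, then emit only complete '\n'-terminated
    # metrics; the trailing partial line stays buffered for next time
    buffer.write(data)
    buf = buffer.getvalue()
    if b'\n' not in buf:
        return []
    complete, _, partial = buf.rpartition(b'\n')
    buffer.seek(0)
    buffer.truncate()
    buffer.write(partial)
    return complete.split(b'\n')


buf = io.BytesIO()
print(split_metrics(buf, b'counter:1|c\ntim'))  # [b'counter:1|c']
print(split_metrics(buf, b'er:12|ms\n'))        # [b'timer:12|ms']
```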

tests/test_processor.py (new file)

@@ -0,0 +1,403 @@
import asyncio
import logging
import socket
import time
import typing
import asynctest
from sprockets_statsd import statsd
from tests import helpers
class ProcessorTestCase(asynctest.TestCase):
ip_protocol: int
async def setUp(self):
self.test_timeout = 5.0
super().setUp()
await self.asyncSetUp()
async def tearDown(self):
await self.asyncTearDown()
super().tearDown()
async def wait_for(self, fut):
try:
await asyncio.wait_for(fut, timeout=self.test_timeout)
except asyncio.TimeoutError:
self.fail('future took too long to resolve')
async def asyncSetUp(self):
self.statsd_server = helpers.StatsdServer(self.ip_protocol)
self.statsd_task = asyncio.create_task(self.statsd_server.run())
await self.statsd_server.wait_running()
async def asyncTearDown(self):
self.statsd_server.close()
await self.statsd_server.wait_closed()
class ProcessorTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP
async def test_that_processor_connects_and_disconnects(self):
processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port)
asyncio.create_task(processor.run())
await self.wait_for(self.statsd_server.client_connected.acquire())
await self.wait_for(processor.stop())
self.assertEqual(1, self.statsd_server.connections_made)
self.assertEqual(1, self.statsd_server.connections_lost)
async def test_that_processor_reconnects(self):
processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port)
asyncio.create_task(processor.run())
await self.wait_for(self.statsd_server.client_connected.acquire())
# Now that the server is running and the client has connected,
# cancel the server and let it die off.
self.statsd_server.close()
await self.statsd_server.wait_closed()
until = time.time() + self.test_timeout
while processor.connected:
await asyncio.sleep(0.1)
if time.time() >= until:
self.fail('processor never disconnected')
# Start the server on the same port and let the client reconnect.
self.statsd_task = asyncio.create_task(self.statsd_server.run())
await self.wait_for(self.statsd_server.client_connected.acquire())
self.assertTrue(processor.connected)
await self.wait_for(processor.stop())
async def test_that_processor_can_be_cancelled(self):
processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port)
task = asyncio.create_task(processor.run())
await self.wait_for(self.statsd_server.client_connected.acquire())
task.cancel()
await self.wait_for(processor.stopped.wait())
async def test_shutdown_when_disconnected(self):
processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port)
asyncio.create_task(processor.run())
await self.wait_for(self.statsd_server.client_connected.acquire())
self.statsd_server.close()
await self.statsd_server.wait_closed()
await self.wait_for(processor.stop())
async def test_socket_resets(self):
processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port)
asyncio.create_task(processor.run())
await self.wait_for(self.statsd_server.client_connected.acquire())
self.statsd_server.transports[0].close()
await self.wait_for(self.statsd_server.client_connected.acquire())
await self.wait_for(processor.stop())
async def test_that_stopping_when_not_running_is_safe(self):
processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port)
await self.wait_for(processor.stop())
def test_that_processor_fails_when_host_is_none(self):
with self.assertRaises(RuntimeError) as context:
statsd.Processor(host=None, port=12345) # type: ignore[arg-type]
self.assertIn('host', str(context.exception))
async def test_starting_and_stopping_without_connecting(self):
host, port = self.statsd_server.host, self.statsd_server.port
self.statsd_server.close()
await self.wait_for(self.statsd_server.wait_closed())
processor = statsd.Processor(host=host, port=port)
asyncio.create_task(processor.run())
await self.wait_for(processor.running.wait())
await processor.stop()
async def test_that_protocol_exceptions_are_logged(self):
processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port)
asyncio.create_task(processor.run())
await self.wait_for(processor.running.wait())
with self.assertLogs(processor.logger, level=logging.ERROR) as cm:
processor.queue.put_nowait('not-bytes') # type: ignore[arg-type]
while processor.queue.qsize() > 0:
await asyncio.sleep(0.1)
for record in cm.records:
if record.exc_info is not None and record.funcName == 'run':
break
else:
self.fail('Expected run to log exception')
await processor.stop()
class TCPProcessingTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP
async def asyncSetUp(self):
await super().asyncSetUp()
self.processor = statsd.Processor(host=self.statsd_server.host,
port=self.statsd_server.port,
reconnect_sleep=0.25)
asyncio.create_task(self.processor.run())
await self.wait_for(self.statsd_server.client_connected.acquire())
async def asyncTearDown(self):
await self.processor.stop()
await super().asyncTearDown()
async def test_connection_failures(self):
        # Change the port and close the transport; this forces the
        # processor to reconnect to the new port and fail.
self.processor.port = 1
self.processor.protocol.transport.close()
# Wait for the processor to be disconnected, then change the
# port back and let the processor reconnect.
while self.processor.connected:
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
self.processor.port = self.statsd_server.port
await self.wait_for(self.statsd_server.client_connected.acquire())
async def test_socket_closure_while_processing_failed_event(self):
state = {'first_time': True}
real_process_metric = self.processor._process_metric
async def fake_process_metric():
if state['first_time']:
self.processor.protocol.buffered_data = b'counter:1|c\n'
self.processor.protocol.transport.close()
state['first_time'] = False
return await real_process_metric()
self.processor._process_metric = fake_process_metric
await self.wait_for(self.statsd_server.message_received.acquire())
async def test_socket_closure_while_sending(self):
state = {'first_time': True}
protocol = typing.cast(statsd.TCPProtocol, self.processor.protocol)
real_transport_write = protocol.transport.write
def fake_transport_write(data):
if state['first_time']:
self.processor.protocol.transport.close()
state['first_time'] = False
return real_transport_write(data)
protocol.transport.write = fake_transport_write
self.processor.queue.put_nowait(b'counter:1|c')
await self.wait_for(self.statsd_server.message_received.acquire())
class UDPProcessingTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_UDP
async def asyncSetUp(self):
await super().asyncSetUp()
self.connector = statsd.Connector(host=self.statsd_server.host,
port=self.statsd_server.port,
ip_protocol=self.ip_protocol,
reconnect_sleep=0.25)
await self.connector.start()
async def asyncTearDown(self):
await self.connector.stop()
await super().asyncTearDown()
async def test_sending_metrics(self):
self.connector.incr('counter')
self.connector.timing('timer', 0.001)
await self.wait_for(self.statsd_server.message_received.acquire())
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertEqual(self.statsd_server.metrics[0],
b'counters.counter:1|c')
self.assertEqual(self.statsd_server.metrics[1], b'timers.timer:1.0|ms')
async def test_that_client_sends_to_new_server(self):
self.statsd_server.close()
await self.statsd_server.wait_closed()
self.connector.incr('should.be.lost')
await asyncio.sleep(self.connector.processor._wait_timeout * 2)
self.statsd_task = asyncio.create_task(self.statsd_server.run())
await self.statsd_server.wait_running()
self.connector.incr('should.be.recvd')
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertEqual(self.statsd_server.metrics[0],
b'counters.should.be.recvd:1|c')
async def test_that_client_handles_socket_closure(self):
self.connector.processor.protocol.transport.close()
await self.wait_for(
asyncio.sleep(self.connector.processor._reconnect_sleep))
self.connector.incr('should.be.recvd')
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertEqual(self.statsd_server.metrics[0],
b'counters.should.be.recvd:1|c')
class ConnectorTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP
async def asyncSetUp(self):
await super().asyncSetUp()
self.connector = statsd.Connector(self.statsd_server.host,
self.statsd_server.port)
await self.connector.start()
await self.wait_for(self.statsd_server.client_connected.acquire())
async def asyncTearDown(self):
await self.wait_for(self.connector.stop())
await super().asyncTearDown()
def assert_metrics_equal(self, recvd: bytes, path, value, type_code):
decoded = recvd.decode('utf-8')
recvd_path, _, rest = decoded.partition(':')
recvd_value, _, recvd_code = rest.partition('|')
self.assertEqual(path, recvd_path, 'metric path mismatch')
self.assertEqual(recvd_value, str(value), 'metric value mismatch')
self.assertEqual(recvd_code, type_code, 'metric type mismatch')
async def test_adjusting_counter(self):
self.connector.incr('simple.counter')
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[-1],
'counters.simple.counter', 1, 'c')
self.connector.incr('simple.counter', 10)
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[-1],
'counters.simple.counter', 10, 'c')
self.connector.decr('simple.counter')
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[-1],
'counters.simple.counter', -1, 'c')
self.connector.decr('simple.counter', 10)
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[-1],
'counters.simple.counter', -10, 'c')
async def test_adjusting_gauge(self):
self.connector.gauge('simple.gauge', 100)
self.connector.gauge('simple.gauge', -10, delta=True)
self.connector.gauge('simple.gauge', 10, delta=True)
for _ in range(3):
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[0],
'gauges.simple.gauge', '100', 'g')
self.assert_metrics_equal(self.statsd_server.metrics[1],
'gauges.simple.gauge', '-10', 'g')
self.assert_metrics_equal(self.statsd_server.metrics[2],
'gauges.simple.gauge', '+10', 'g')
async def test_sending_timer(self):
secs = 12.34
self.connector.timing('simple.timer', secs)
await self.wait_for(self.statsd_server.message_received.acquire())
self.assert_metrics_equal(self.statsd_server.metrics[0],
'timers.simple.timer', 12340.0, 'ms')
async def test_that_queued_metrics_are_drained(self):
# The easiest way to test that the internal metrics queue
# is drained when the processor is stopped is to monkey
# patch the "process metric" method to enqueue a few
# metrics and then terminate the processor. It will exit
# the run loop and drain the queue.
real_process_metric = self.connector.processor._process_metric
async def fake_process_metric():
if not self.connector.processor.should_terminate:
self.connector.incr('counter', 1)
self.connector.incr('counter', 2)
self.connector.incr('counter', 3)
self.connector.processor.should_terminate = True
return await real_process_metric()
self.connector.processor._process_metric = fake_process_metric
await self.wait_for(self.statsd_server.message_received.acquire())
await self.wait_for(self.statsd_server.message_received.acquire())
await self.wait_for(self.statsd_server.message_received.acquire())
async def test_metrics_sent_while_disconnected_are_queued(self):
self.statsd_server.close()
await self.statsd_server.wait_closed()
for value in range(50):
self.connector.incr('counter', value)
asyncio.create_task(self.statsd_server.run())
await self.wait_for(self.statsd_server.client_connected.acquire())
for value in range(50):
await self.wait_for(self.statsd_server.message_received.acquire())
self.assertEqual(f'counters.counter:{value}|c'.encode(),
self.statsd_server.metrics.pop(0))
class ConnectorOptionTests(ProcessorTestCase):
ip_protocol = socket.IPPROTO_TCP
def test_protocol_values(self):
connector = statsd.Connector(host=self.statsd_server.host,
port=self.statsd_server.port)
self.assertEqual(socket.IPPROTO_TCP, connector.processor._ip_protocol)
connector = statsd.Connector(host=self.statsd_server.host,
port=self.statsd_server.port,
ip_protocol=socket.IPPROTO_UDP)
self.assertEqual(socket.IPPROTO_UDP, connector.processor._ip_protocol)
with self.assertRaises(RuntimeError):
statsd.Connector(host=self.statsd_server.host,
port=self.statsd_server.port,
ip_protocol=socket.IPPROTO_GRE)
def test_invalid_port_values(self):
for port in {None, 0, -1, 'not-a-number'}:
with self.assertRaises(RuntimeError) as context:
statsd.Connector(host=self.statsd_server.host, port=port)
self.assertIn('port', str(context.exception))
self.assertIn(repr(port), str(context.exception))
async def test_that_metrics_are_dropped_when_queue_overflows(self):
connector = statsd.Connector(host=self.statsd_server.host,
port=1,
max_queue_size=10)
await connector.start()
self.addCleanup(connector.stop)
# fill up the queue with incr's
for expected_size in range(1, connector.processor.queue.maxsize + 1):
connector.incr('counter')
self.assertEqual(connector.processor.queue.qsize(), expected_size)
# the following decr's should be ignored
for _ in range(10):
connector.decr('counter')
self.assertEqual(connector.processor.queue.qsize(), 10)
# make sure that only the incr's are in the queue
for _ in range(connector.processor.queue.qsize()):
metric = await connector.processor.queue.get()
self.assertEqual(metric, b'counters.counter:1|c')
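`test_that_metrics_are_dropped_when_queue_overflows` above pins down the connector's back-pressure policy. A sketch of that policy in isolation (an assumed shape, not the library's code): a bounded `asyncio.Queue` rejects `put_nowait` with `QueueFull`, and the metric is silently discarded rather than blocking the caller:

```python
import asyncio


def enqueue_metric(queue: 'asyncio.Queue[bytes]', metric: bytes) -> bool:
    # drop-on-overflow: never block the code that emits the metric
    try:
        queue.put_nowait(metric)
        return True
    except asyncio.QueueFull:
        return False  # metric dropped


queue: 'asyncio.Queue[bytes]' = asyncio.Queue(maxsize=10)
accepted = sum(enqueue_metric(queue, b'counters.counter:1|c')
               for _ in range(15))
print(accepted, queue.qsize())  # 10 10
```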

tests/test_tornado.py (new file)

@@ -0,0 +1,322 @@
import asyncio
import os
import socket
import time
import typing
from tornado import testing, web
import sprockets_statsd.tornado
from tests import helpers
ParsedMetric = typing.Tuple[str, float, str]
class Handler(sprockets_statsd.tornado.RequestHandler, web.RequestHandler):
async def get(self):
with self.execution_timer('execution-timer'):
await asyncio.sleep(0.1)
self.increase_counter('request-count')
self.write('true')
class Application(sprockets_statsd.tornado.Application, web.Application):
def __init__(self, **settings):
super().__init__([web.url('/', Handler)], **settings)
class AsyncTestCaseWithTimeout(testing.AsyncTestCase):
def run_coroutine(self, coro):
loop: asyncio.AbstractEventLoop = self.io_loop.asyncio_loop
try:
loop.run_until_complete(
asyncio.wait_for(coro,
timeout=testing.get_async_test_timeout()))
except asyncio.TimeoutError:
self.fail(f'coroutine {coro} took too long to complete')
class ApplicationTests(AsyncTestCaseWithTimeout):
def setUp(self):
super().setUp()
self._environ = {}
def tearDown(self):
super().tearDown()
for name, value in self._environ.items():
if value is not None:
os.environ[name] = value
else:
os.environ.pop(name, None)
def setenv(self, name, value):
self._environ.setdefault(name, os.environ.pop(name, None))
os.environ[name] = value
def unsetenv(self, name):
self._environ.setdefault(name, os.environ.pop(name, None))
def test_statsd_setting_defaults(self):
self.unsetenv('STATSD_HOST')
self.unsetenv('STATSD_PORT')
self.unsetenv('STATSD_PREFIX')
self.unsetenv('STATSD_PROTOCOL')
app = sprockets_statsd.tornado.Application(statsd={'prefix': ''})
self.assertIn('statsd', app.settings)
self.assertIsNone(app.settings['statsd']['host'],
'default host value should be None')
self.assertEqual(8125, app.settings['statsd']['port'])
self.assertEqual('', app.settings['statsd']['prefix'])
self.assertEqual('tcp', app.settings['statsd']['protocol'])
def test_that_statsd_settings_read_from_environment(self):
self.setenv('STATSD_HOST', 'statsd')
self.setenv('STATSD_PORT', '5218')
self.setenv('STATSD_PREFIX', 'my-service')
self.setenv('STATSD_PROTOCOL', 'udp')
app = sprockets_statsd.tornado.Application()
self.assertIn('statsd', app.settings)
self.assertEqual('statsd', app.settings['statsd']['host'])
self.assertEqual(5218, app.settings['statsd']['port'])
self.assertEqual('my-service', app.settings['statsd']['prefix'])
self.assertEqual('udp', app.settings['statsd']['protocol'])
def test_prefix_when_only_service_is_set(self):
with self.assertRaises(RuntimeError):
sprockets_statsd.tornado.Application(service='blah')
def test_prefix_when_only_environment_is_set(self):
with self.assertRaises(RuntimeError):
sprockets_statsd.tornado.Application(environment='whatever')
def test_prefix_default_when_service_and_environment_are_set(self):
app = sprockets_statsd.tornado.Application(environment='development',
service='my-service')
self.assertIn('statsd', app.settings)
self.assertEqual('applications.my-service.development',
app.settings['statsd']['prefix'])
def test_overridden_settings(self):
self.setenv('STATSD_HOST', 'statsd')
self.setenv('STATSD_PORT', '9999')
self.setenv('STATSD_PREFIX', 'service')
self.setenv('STATSD_PROTOCOL', 'tcp')
app = sprockets_statsd.tornado.Application(
statsd={
'host': 'statsd.example.com',
'port': 5218,
'prefix': 'myapp',
'protocol': 'udp',
})
self.assertEqual('statsd.example.com', app.settings['statsd']['host'])
self.assertEqual(5218, app.settings['statsd']['port'])
self.assertEqual('myapp', app.settings['statsd']['prefix'])
self.assertEqual('udp', app.settings['statsd']['protocol'])
def test_that_starting_without_host_fails(self):
self.unsetenv('STATSD_HOST')
app = sprockets_statsd.tornado.Application(statsd={'prefix': 'app'})
with self.assertRaises(RuntimeError):
self.run_coroutine(app.start_statsd())
def test_creating_without_prefix_on_purpose(self):
self.unsetenv('STATSD_PREFIX')
app = sprockets_statsd.tornado.Application(statsd={
'host': 'statsd.example.com',
'protocol': 'udp',
'prefix': None,
})
self.assertEqual(None, app.settings['statsd']['prefix'])
def test_starting_with_calculated_prefix(self):
self.unsetenv('STATSD_PREFIX')
app = sprockets_statsd.tornado.Application(
environment='development',
service='my-service',
statsd={
'host': 'statsd.example.com',
'protocol': 'udp',
})
try:
self.run_coroutine(app.start_statsd())
self.assertEqual('applications.my-service.development',
app.settings['statsd']['prefix'])
finally:
self.run_coroutine(app.stop_statsd())
def test_starting_twice(self):
app = sprockets_statsd.tornado.Application(statsd={
'host': 'localhost',
'port': '8125',
'prefix': 'my-service',
})
try:
self.run_coroutine(app.start_statsd())
connector = app.statsd_connector
self.assertIsNotNone(connector, 'statsd.Connector not created')
self.run_coroutine(app.start_statsd())
self.assertIs(app.statsd_connector, connector,
'statsd.Connector should not be recreated')
finally:
self.run_coroutine(app.stop_statsd())
def test_stopping_without_starting(self):
app = sprockets_statsd.tornado.Application(statsd={
'host': 'localhost',
'port': '8125',
'prefix': 'my-service',
})
self.run_coroutine(app.stop_statsd())
def test_optional_parameters(self):
app = sprockets_statsd.tornado.Application(
statsd={
'host': 'localhost',
'port': '8125',
'prefix': 'my-service',
'reconnect_sleep': 0.5,
'wait_timeout': 0.25,
})
self.run_coroutine(app.start_statsd())
processor = app.statsd_connector.processor
self.assertEqual(0.5, processor._reconnect_sleep)
self.assertEqual(0.25, processor._wait_timeout)
self.run_coroutine(app.stop_statsd())
def test_starting_with_invalid_protocol(self):
app = sprockets_statsd.tornado.Application(statsd={
'host': 'localhost',
'prefix': 'my-service',
'protocol': 'unknown'
})
with self.assertRaises(RuntimeError):
self.run_coroutine(app.start_statsd())
def test_that_protocol_strings_are_translated(self):
app = sprockets_statsd.tornado.Application(statsd={
'host': 'localhost',
'prefix': 'my-service',
'protocol': 'tcp',
})
self.run_coroutine(app.start_statsd())
self.assertEqual(socket.IPPROTO_TCP,
app.statsd_connector.processor._ip_protocol)
self.run_coroutine(app.stop_statsd())
app = sprockets_statsd.tornado.Application(statsd={
'host': 'localhost',
'prefix': 'my-service',
'protocol': 'udp',
})
self.run_coroutine(app.start_statsd())
self.assertEqual(socket.IPPROTO_UDP,
app.statsd_connector.processor._ip_protocol)
self.run_coroutine(app.stop_statsd())
def test_disabling_statsd_prefix(self):
app = sprockets_statsd.tornado.Application(
service='my-service',
version='1.0.0',
statsd={
'host': 'localhost',
'prefix': '',
'protocol': 'udp',
},
)
self.run_coroutine(app.start_statsd())
self.assertEqual(app.statsd_connector.prefix, '')
self.run_coroutine(app.stop_statsd())
class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase):
def setUp(self):
super().setUp()
self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP)
self.io_loop.spawn_callback(self.statsd_server.run)
self.run_coroutine(self.statsd_server.wait_running())
self.app.settings['statsd'].update({
'host': self.statsd_server.host,
'port': self.statsd_server.port,
})
self.run_coroutine(self.app.start_statsd())
def tearDown(self):
self.run_coroutine(self.app.stop_statsd())
self.statsd_server.close()
self.run_coroutine(self.statsd_server.wait_closed())
super().tearDown()
def get_app(self):
self.app = Application(statsd={
'prefix': 'applications.service',
'protocol': 'tcp',
})
return self.app
def wait_for_metrics(self, metric_count=3):
timeout_remaining = testing.get_async_test_timeout()
for _ in range(metric_count):
start = time.time()
try:
self.io_loop.run_sync(
self.statsd_server.message_received.acquire,
timeout=timeout_remaining)
            except TimeoutError:
                self.fail('timed out waiting for statsd metrics')
timeout_remaining -= (time.time() - start)
def parse_metric(self, metric_line: bytes) -> ParsedMetric:
decoded = metric_line.decode()
path, _, rest = decoded.partition(':')
value, _, type_code = rest.partition('|')
try:
parsed_value = float(value)
except ValueError:
self.fail(f'value of {path} is not a number: value={value!r}')
return path, parsed_value, type_code
def find_metric(self, needle: str) -> ParsedMetric:
encoded = needle.encode()
for line in self.statsd_server.metrics:
if encoded in line:
return self.parse_metric(line)
self.fail(f'failed to find metric containing {needle!r}')
def test_the_request_metric_is_sent_last(self):
rsp = self.fetch('/')
self.assertEqual(200, rsp.code)
self.wait_for_metrics()
path, _, type_code = self.find_metric('Handler.GET.200')
self.assertEqual(path, 'applications.service.timers.Handler.GET.200')
self.assertEqual('ms', type_code)
def test_execution_timer(self):
rsp = self.fetch('/')
self.assertEqual(200, rsp.code)
self.wait_for_metrics()
path, _, type_code = self.find_metric('execution-timer')
self.assertEqual('applications.service.timers.execution-timer', path)
self.assertEqual('ms', type_code)
def test_counter(self):
rsp = self.fetch('/')
self.assertEqual(200, rsp.code)
self.wait_for_metrics()
path, value, type_code = self.find_metric('request-count')
self.assertEqual('applications.service.counters.request-count', path)
self.assertEqual(1.0, value)
self.assertEqual('c', type_code)
def test_handling_request_without_statsd_configured(self):
self.io_loop.run_sync(self.app.stop_statsd)
rsp = self.fetch('/')
self.assertEqual(200, rsp.code)
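The application tests above pin down the default metric prefix: when `service` and `environment` are both provided, the prefix is `applications.<service>.<environment>`. A one-line sketch of that convention (the helper name is illustrative):

```python
def default_prefix(service: str, environment: str) -> str:
    # convention verified by
    # test_prefix_default_when_service_and_environment_are_set
    return f'applications.{service}.{environment}'


print(default_prefix('my-service', 'development'))
# applications.my-service.development
```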

tox.ini (new file)

@@ -0,0 +1,19 @@
[tox]
envlist = lint,py37,py38,py39,tornado5
toxworkdir = ./build/tox
[testenv]
deps =
.[dev,tornado]
commands =
python -m unittest
[testenv:lint]
commands =
flake8 sprockets_statsd tests
yapf -dr sprockets_statsd tests
[testenv:tornado5]
deps =
tornado>=5,<6
.[dev]