diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml new file mode 100644 index 0000000..2b88174 --- /dev/null +++ b/.github/workflows/run-tests.yml @@ -0,0 +1,47 @@ +name: Testing +on: + push: + branches: ["*"] + tags-ignore: ["*"] + pull_request: + branches: [main] + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.7, 3.8, 3.9] + steps: + - uses: actions/checkout@v2 + - name: Install python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip setuptools + python -m pip install '.[dev]' + python -m pip install -e . + - name: Lint + run: | + flake8 sprockets_statsd tests + - name: Check format + run: | + yapf -dr sprockets_statsd tests + - name: Run tests + run: | + coverage run -m unittest + coverage report + coverage xml -o ./coverage.xml + - name: Generate documentation + run: | + sphinx-build -b html -W --no-color docs build/sphinx/html + - name: Upload coverage + uses: codecov/codecov-action@v1.0.2 + with: + token: ${{ secrets.CODECOV_TOKEN }} + file: ./coverage.xml + flags: unittests + env_vars: OS,PYTHON + fail_ci_if_error: true diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c90af8f..25c7b1c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,2 +1,3 @@ -Next Release ------------- +Initial release +--------------- +- support for sending counters & timers to statsd over a TCP or UDP socket diff --git a/MANIFEST.in b/MANIFEST.in index ba8e519..d2b04be 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,7 @@ graft docs graft tests -include LICENSE + include CHANGELOG.rst +include example.py +include LICENSE +include tox.ini diff --git a/README.rst b/README.rst index bbb7a36..aec5e05 100644 --- a/README.rst +++ b/README.rst @@ -1,5 +1,117 @@ -Report metrics from your tornado_ web application to a StatsD_ instance. +Asynchronously send metrics to a statsd_ instance. -.. _StatsD: https://github.com/statsd/statsd/ +This library provides connectors to send metrics to a statsd_ instance using either TCP or UDP. + +.. code-block:: python + + import asyncio + import time + + import sprockets_statsd.statsd + + statsd = sprockets_statsd.statsd.Connector( + host=os.environ.get('STATSD_HOST', '127.0.0.1')) + + async def do_stuff(): + start = time.time() + response = make_some_http_call() + statsd.timing(f'timers.http.something.{response.code}', + (time.time() - start)) + + async def main(): + await statsd.start() + try: + do_stuff() + finally: + await statsd.stop() + +The ``Connector`` instance maintains a resilient connection to the target StatsD instance, formats the metric data +into payloads, and sends them to the StatsD target. It defaults to using TCP as the transport but will use UDP if +the ``ip_protocol`` keyword is set to ``socket.IPPROTO_UDP``. The ``Connector.start`` method starts a background +``asyncio.Task`` that is responsible for maintaining the connection. The ``timing`` method enqueues a timing +metric to send and the task consumes the internal queue when it is connected. + +The following convenience methods are available. You can also call ``inject_metric`` for complete control over +the payload. + ++--------------+--------------------------------------+ +| ``incr`` | Increment a counter metric | ++--------------+--------------------------------------+ +| ``decr`` | Decrement a counter metric | ++--------------+--------------------------------------+ +| ``gauge`` | Adjust or set a gauge metric | ++--------------+--------------------------------------+ +| ``timing`` | Append a duration to a timer metric | ++--------------+--------------------------------------+ + +Tornado helpers +=============== +The ``sprockets_statsd.tornado`` module contains mix-in classes that make reporting metrics from your tornado_ web +application simple. You will need to install the ``sprockets_statsd[tornado]`` extra to ensure that the Tornado +requirements for this library are met. + +.. code-block:: python + + import asyncio + import logging + + from tornado import ioloop, web + + import sprockets_statsd.tornado + + + class MyHandler(sprockets_statsd.tornado.RequestHandler, + web.RequestHandler): + async def get(self): + with self.execution_timer('some-operation'): + await self.do_something() + self.set_status(204) + + async def do_something(self): + await asyncio.sleep(1) + + + class Application(sprockets_statsd.tornado.Application, web.Application): + def __init__(self, **settings): + settings['statsd'] = { + 'host': os.environ['STATSD_HOST'], + 'prefix': 'applications.my-service', + } + super().__init__([web.url('/', MyHandler)], **settings) + + async def on_start(self): + await self.start_statsd() + + async def on_stop(self): + await self.stop_statsd() + + + if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + app = Application() + app.listen(8888) + iol = ioloop.IOLoop.current() + try: + iol.add_callback(app.on_start) + iol.start() + except KeyboardInterrupt: + iol.add_future(asyncio.ensure_future(app.on_stop()), + lambda f: iol.stop()) + iol.start() + +This application will emit two timing metrics each time that the endpoint is invoked:: + + applications.my-service.timers.some-operation:1001.3449192047119|ms + applications.my-service.timers.MyHandler.GET.204:1002.4960041046143|ms + +You will need to set the ``$STATSD_HOST`` environment variable to enable the statsd processing inside of the +application. The ``RequestHandler`` class exposes methods that send counter and timing metrics to a statsd server. +The connection is managed by the ``Application`` provided that you call the ``start_statsd`` method during application +startup. + +Metrics are sent by a ``asyncio.Task`` that is started by ``start_statsd``. The request handler methods insert the +metric data onto a ``asyncio.Queue`` that the task reads from. Metric data remains on the queue when the task is +not connected to the server and will be sent in the order received when the task establishes the server connection. + +.. _statsd: https://github.com/statsd/statsd/ .. _tornado: https://tornadoweb.org/ - diff --git a/docs/index.rst b/docs/index.rst index 95ec2fb..7777077 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,9 +1,60 @@ -================ sprockets-statsd ================ .. include:: ../README.rst +Configuration +============= +The statsd connection is configured by the ``statsd`` application settings key. The default values can be set by +the following environment variables. + +.. envvar:: STATSD_HOST + + The host or IP address of the StatsD server to send metrics to. + +.. envvar:: STATSD_PORT + + The TCP port number that the StatsD server is listening on. This defaults to 8125 if it is not configured. + +.. envvar:: STATSD_PREFIX + + Optional prefix to use for metric paths. See the documentation for :class:`~sprockets_statsd.tornado.Application` + for addition notes on setting the path prefix when using the Tornado helpers. + +.. envvar:: STATSD_PROTOCOL + + The IP protocol to use when connecting to the StatsD server. You can specify either "tcp" or "udp". The + default is "tcp" if it not not configured. + +If you are using the Tornado helper clases, then you can fine tune the metric payloads and the connector by +setting additional values in the ``statsd`` key of :attr:`tornado.web.Application.settings`. See the +:class:`sprockets_statsd.tornado.Application` class documentation for a description of the supported settings. + +Reference +========= + +.. autoclass:: sprockets_statsd.statsd.Connector + :members: + +Tornado helpers +--------------- +.. autoclass:: sprockets_statsd.tornado.Application + :members: + +.. autoclass:: sprockets_statsd.tornado.RequestHandler + :members: + +Internals +--------- +.. autoclass:: sprockets_statsd.statsd.Processor + :members: + +.. autoclass:: sprockets_statsd.statsd.StatsdProtocol + :members: + +.. autoclass:: sprockets_statsd.statsd.TCPProtocol + :members: + Release history =============== diff --git a/example.py b/example.py new file mode 100644 index 0000000..c5c0cfb --- /dev/null +++ b/example.py @@ -0,0 +1,42 @@ +import asyncio +import logging + +from tornado import ioloop, web + +import sprockets_statsd.tornado + + +class MyHandler(sprockets_statsd.tornado.RequestHandler, + web.RequestHandler): + async def get(self): + with self.execution_timer('some-operation'): + await self.do_something() + self.set_status(204) + + async def do_something(self): + await asyncio.sleep(1) + + +class Application(sprockets_statsd.tornado.Application, web.Application): + def __init__(self, **settings): + super().__init__([web.url('/', MyHandler)], **settings) + + async def on_start(self): + await self.start_statsd() + + async def on_stop(self): + await self.stop_statsd() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + app = Application() + app.listen(8888) + iol = ioloop.IOLoop.current() + try: + iol.add_callback(app.on_start) + iol.start() + except KeyboardInterrupt: + iol.add_future(asyncio.ensure_future(app.on_stop()), + lambda f: iol.stop()) + iol.start() diff --git a/setup.cfg b/setup.cfg index 7eb0493..ded2401 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] name = sprockets-statsd version = attr: sprockets_statsd.version -description = Asynchronous Statsd connector. +description = Asynchronously send metrics to a statsd instance. long_description = file: README.rst license = BSD 3-Clause License url = https://sprockets-statsd.readthedocs.io/ @@ -29,17 +29,19 @@ classifiers = [options] packages = find: install_requires = - tornado [options.extras_require] +tornado = + tornado>=5 dev = - coverage - flake8 - flake8-import-order - sphinx - sphinx-autodoc-typehints - tox - yapf + asynctest==0.13.0 + coverage==5.5 + flake8==3.8.4 + flake8-import-order==0.18.1 + sphinx==3.5.2 + sphinx-autodoc-typehints==1.11.1 + tornado>=5 + yapf==0.30.0 [options.packages.find] exclude = @@ -51,6 +53,9 @@ nitpicky = 1 warning_is_error = 1 [coverage:report] +exclude_lines = + pragma: no cover + raise NotImplementedError fail_under = 100 show_missing = 1 @@ -59,10 +64,25 @@ branch = 1 source = sprockets_statsd [flake8] -application_import_names = statsd +application_import_names = sprockets_statsd,tests exclude = build,env,dist import_order_style = pycharm +[mypy] +cache_dir = build/mypy-cache +check_untyped_defs = true +show_error_codes = true +warn_no_return = true +warn_redundant_casts = true +warn_unused_configs = true +warn_unused_ignores = true + +[mypy-sprockets_statsd] +disallow_incomplete_defs = true +disallow_untyped_defs = true +no_implicit_optional = true +strict = true + [yapf] allow_split_before_dict_value = false indent_dictionary_value = true diff --git a/sprockets_statsd/statsd.py b/sprockets_statsd/statsd.py new file mode 100644 index 0000000..b35e605 --- /dev/null +++ b/sprockets_statsd/statsd.py @@ -0,0 +1,486 @@ +import asyncio +import logging +import socket +import typing + + +class Connector: + """Sends metrics to a statsd server. + + :param host: statsd server to send metrics to + :param port: socket port that the server is listening on + :keyword ip_protocol: IP protocol to use for the underlying + socket -- either ``socket.IPPROTO_TCP`` for TCP or + ``socket.IPPROTO_UDP`` for UDP sockets. + :keyword prefix: optional string to prepend to metric paths + :param kwargs: additional keyword parameters are passed + to the :class:`.Processor` initializer + + This class maintains a connection to a statsd server and + sends metric lines to it asynchronously. You must call the + :meth:`start` method when your application is starting. It + creates a :class:`~asyncio.Task` that manages the connection + to the statsd server. You must also call :meth:`.stop` before + terminating to ensure that all metrics are flushed to the + statsd server. + + Metrics are optionally prefixed with :attr:`prefix` before the + metric type prefix. This *should* be used to prevent metrics + from being overwritten when multiple applications share a StatsD + instance. Each metric type is also prefixed by one of the + following strings based on the metric type: + + +-------------------+---------------+-----------+ + | Method call | Prefix | Type code | + +-------------------+---------------+-----------+ + | :meth:`.incr` | ``counters.`` | ``c`` | + +-------------------+---------------+-----------+ + | :meth:`.decr` | ``counters.`` | ``c`` | + +-------------------+---------------+-----------+ + | :meth:`.gauge` | ``gauges.`` | ``g`` | + +-------------------+---------------+-----------+ + | :meth:`.timing` | ``timers.`` | ``ms`` | + +-------------------+---------------+-----------+ + + When the connector is *should_terminate*, metric payloads are + sent by calling the :meth:`.inject_metric` method. The payloads + are stored in an internal queue that is consumed whenever the + connection to the server is active. + + .. attribute:: prefix + :type: str + + String to prefix to all metrics *before* the metric type prefix. + + .. attribute:: processor + :type: Processor + + The statsd processor that maintains the connection and + sends the metric payloads. + + """ + logger: logging.Logger + prefix: str + processor: 'Processor' + + def __init__(self, + host: str, + port: int = 8125, + *, + prefix: str = '', + **kwargs: typing.Any) -> None: + self.logger = logging.getLogger(__package__).getChild('Connector') + self.prefix = f'{prefix}.' if prefix else prefix + self.processor = Processor(host=host, port=port, **kwargs) + self._processor_task: typing.Optional[asyncio.Task[None]] = None + + async def start(self) -> None: + """Start the processor in the background. + + This is a *blocking* method and does not return until the + processor task is actually running. + + """ + self._processor_task = asyncio.create_task(self.processor.run()) + await self.processor.running.wait() + + async def stop(self) -> None: + """Stop the background processor. + + Items that are currently in the queue will be flushed to + the statsd server if possible. This is a *blocking* method + and does not return until the background processor has + stopped. + + """ + await self.processor.stop() + + def incr(self, path: str, value: int = 1) -> None: + """Increment a counter metric. + + :param path: counter to increment + :param value: amount to increment the counter by + + """ + self.inject_metric(f'counters.{path}', str(value), 'c') + + def decr(self, path: str, value: int = 1) -> None: + """Decrement a counter metric. + + :param path: counter to decrement + :param value: amount to decrement the counter by + + This is equivalent to ``self.incr(path, -value)``. + + """ + self.inject_metric(f'counters.{path}', str(-value), 'c') + + def gauge(self, path: str, value: int, delta: bool = False) -> None: + """Manipulate a gauge metric. + + :param path: gauge to adjust + :param value: value to send + :param delta: is this an adjustment of the gauge? + + If the `delta` parameter is ``False`` (or omitted), then + `value` is the new value to set the gauge to. Otherwise, + `value` is an adjustment for the current gauge. + + """ + if delta: + payload = f'{value:+d}' + else: + payload = str(value) + self.inject_metric(f'gauges.{path}', payload, 'g') + + def timing(self, path: str, seconds: float) -> None: + """Send a timer metric. + + :param path: timer to append a value to + :param seconds: number of **seconds** to record + + """ + self.inject_metric(f'timers.{path}', str(seconds * 1000.0), 'ms') + + def inject_metric(self, path: str, value: str, type_code: str) -> None: + """Send a metric to the statsd server. + + :param path: formatted metric name + :param value: formatted metric value + :param type_code: type of the metric to send + + This method formats the payload and inserts it on the + internal queue for future processing. + + """ + payload = f'{self.prefix}{path}:{value}|{type_code}' + try: + self.processor.enqueue(payload.encode('utf-8')) + except asyncio.QueueFull: + self.logger.warning('statsd queue is full, discarding metric') + + +class StatsdProtocol(asyncio.BaseProtocol): + """Common interface for backend protocols/transports. + + UDP and TCP transports have different interfaces (sendto vs write) + so this class adapts them to a common protocol that our code + can depend on. + + .. attribute:: buffered_data + :type: bytes + + Bytes that are buffered due to low-level transport failures. + Since protocols & transports are created anew with each connect + attempt, the :class:`.Processor` instance ensures that data + buffered on a transport is copied over to the new transport + when creating a connection. + + .. attribute:: connected + :type: asyncio.Event + + Is the protocol currently connected? + + """ + buffered_data: bytes + ip_protocol: int = socket.IPPROTO_NONE + logger: logging.Logger + transport: typing.Optional[asyncio.BaseTransport] + + def __init__(self) -> None: + self.buffered_data = b'' + self.connected = asyncio.Event() + self.logger = logging.getLogger(__package__).getChild( + self.__class__.__name__) + self.transport = None + + def send(self, metric: bytes) -> None: + """Send a metric payload over the transport.""" + raise NotImplementedError() + + async def shutdown(self) -> None: + """Shutdown the transport and wait for it to close.""" + raise NotImplementedError() + + def connection_made(self, transport: asyncio.BaseTransport) -> None: + """Capture the new transport and set the connected event.""" + # NB - this will return a 4-part tuple in some cases + server, port = transport.get_extra_info('peername')[:2] + self.logger.info('connected to statsd %s:%s', server, port) + self.transport = transport + self.transport.set_protocol(self) + self.connected.set() + + def connection_lost(self, exc: typing.Optional[Exception]) -> None: + """Clear the connected event.""" + self.logger.warning('statsd server connection lost: %s', exc) + self.connected.clear() + + +class TCPProtocol(StatsdProtocol, asyncio.Protocol): + """StatsdProtocol implementation over a TCP/IP connection.""" + ip_protocol = socket.IPPROTO_TCP + transport: asyncio.WriteTransport + + def eof_received(self) -> None: + self.logger.warning('received EOF from statsd server') + self.connected.clear() + + def send(self, metric: bytes) -> None: + """Send `metric` to the server. + + If sending the metric fails, it will be saved in + ``self.buffered_data``. The processor will save and + restore the buffered data if it needs to create a + new protocol object. + + """ + if not self.buffered_data and not metric: + return + + self.buffered_data = self.buffered_data + metric + b'\n' + while (self.transport is not None and self.connected.is_set() + and self.buffered_data): + line, maybe_nl, rest = self.buffered_data.partition(b'\n') + line += maybe_nl + self.transport.write(line) + if self.transport.is_closing(): + self.logger.warning('transport closed during write') + break + self.buffered_data = rest + + async def shutdown(self) -> None: + """Close the transport after flushing any outstanding data.""" + self.logger.info('shutting down') + if self.connected.is_set(): + self.send(b'') # flush buffered data + self.transport.close() + while self.connected.is_set(): + await asyncio.sleep(0.1) + + +class UDPProtocol(StatsdProtocol, asyncio.DatagramProtocol): + """StatsdProtocol implementation over a UDP/IP connection.""" + ip_protocol = socket.IPPROTO_UDP + transport: asyncio.DatagramTransport + + def send(self, metric: bytes) -> None: + if metric: + self.transport.sendto(metric) + + async def shutdown(self) -> None: + self.logger.info('shutting down') + self.transport.close() + + +class Processor: + """Maintains the statsd connection and sends metric payloads. + + :param host: statsd server to send metrics to + :param port: TCP port that the server is listening on + :param max_queue_size: only allow this many elements to be + stored in the queue before discarding metrics + :param reconnect_sleep: number of seconds to sleep after socket + error occurs when connecting + :param wait_timeout: number os seconds to wait for a message to + arrive on the queue + + This class implements :class:`~asyncio.Protocol` for the statsd + TCP connection. The :meth:`.run` method is run as a background + :class:`~asyncio.Task` that consumes payloads from an internal + queue, connects to the TCP server as required, and sends the + already formatted payloads. + + .. attribute:: host + :type: str + + IP address or DNS name for the statsd server to send metrics to + + .. attribute:: port + :type: int + + TCP port number that the statsd server is listening on + + .. attribute:: should_terminate + :type: bool + + Flag that controls whether the background task is active or + not. This flag is set to :data:`False` when the task is started. + Setting it to :data:`True` will cause the task to shutdown in + an orderly fashion. + + .. attribute:: queue + :type: asyncio.Queue + + Formatted metric payloads to send to the statsd server. Enqueue + payloads to send them to the server. + + .. attribute:: running + :type: asyncio.Event + + Is the background task currently running? This is the event that + :meth:`.run` sets when it starts and it remains set until the task + exits. + + .. attribute:: stopped + :type: asyncio.Event + + Is the background task currently stopped? This is the event that + :meth:`.run` sets when it exits and that :meth:`.stop` blocks on + until the task stops. + + """ + + logger: logging.Logger + protocol: typing.Optional[StatsdProtocol] + queue: asyncio.Queue + _create_transport: typing.Callable[[], typing.Coroutine[ + typing.Any, typing.Any, typing.Tuple[asyncio.BaseTransport, + StatsdProtocol]]] + + def __init__(self, + *, + host: str, + port: int = 8125, + ip_protocol: int = socket.IPPROTO_TCP, + max_queue_size: int = 1000, + reconnect_sleep: float = 1.0, + wait_timeout: float = 0.1) -> None: + super().__init__() + if not host: + raise RuntimeError('host must be set') + try: + port = int(port) + if not port or port < 1: + raise RuntimeError( + f'port must be a positive integer: {port!r}') + except (TypeError, ValueError): + raise RuntimeError(f'port must be a positive integer: {port!r}') + + transport_creators = { + socket.IPPROTO_TCP: self._create_tcp_transport, + socket.IPPROTO_UDP: self._create_udp_transport, + } + try: + factory = transport_creators[ip_protocol] + except KeyError: + raise RuntimeError(f'ip_protocol {ip_protocol} is not supported') + else: + self._create_transport = factory # type: ignore + + self.host = host + self.port = port + self._ip_protocol = ip_protocol + self._reconnect_sleep = reconnect_sleep + self._wait_timeout = wait_timeout + + self.running = asyncio.Event() + self.stopped = asyncio.Event() + self.stopped.set() + self.logger = logging.getLogger(__package__).getChild('Processor') + self.should_terminate = False + self.protocol = None + self.queue = asyncio.Queue(maxsize=max_queue_size) + + @property + def connected(self) -> bool: + """Is the processor connected?""" + return self.protocol is not None and self.protocol.connected.is_set() + + def enqueue(self, metric: bytes) -> None: + self.queue.put_nowait(metric) + + async def run(self) -> None: + """Maintains the connection and processes metric payloads.""" + self.running.set() + self.stopped.clear() + self.should_terminate = False + while not self.should_terminate: + try: + await self._connect_if_necessary() + if self.connected: + await self._process_metric() + except asyncio.CancelledError: + self.logger.info('task cancelled, exiting') + self.should_terminate = True + except Exception as error: + self.logger.exception('unexpected error occurred: %s', error) + self.should_terminate = True + + self.should_terminate = True + self.logger.info('loop finished with %d metrics in the queue', + self.queue.qsize()) + if self.connected: + num_ready = max(self.queue.qsize(), 1) + self.logger.info('draining %d metrics', num_ready) + for _ in range(num_ready): + await self._process_metric() + self.logger.debug('closing transport') + if self.protocol is not None: + await self.protocol.shutdown() + + self.logger.info('processor is exiting') + self.running.clear() + self.stopped.set() + + async def stop(self) -> None: + """Stop the processor. + + This is an asynchronous but blocking method. It does not + return until enqueued metrics are flushed and the processor + connection is closed. + + """ + self.should_terminate = True + await self.stopped.wait() + + async def _create_tcp_transport( + self) -> typing.Tuple[asyncio.BaseTransport, StatsdProtocol]: + t, p = await asyncio.get_running_loop().create_connection( + protocol_factory=TCPProtocol, host=self.host, port=self.port) + return t, typing.cast(StatsdProtocol, p) + + async def _create_udp_transport( + self) -> typing.Tuple[asyncio.BaseTransport, StatsdProtocol]: + t, p = await asyncio.get_running_loop().create_datagram_endpoint( + protocol_factory=UDPProtocol, + remote_addr=(self.host, self.port), + reuse_port=True) + return t, typing.cast(StatsdProtocol, p) + + async def _connect_if_necessary(self) -> None: + if self.protocol is not None: + try: + await asyncio.wait_for(self.protocol.connected.wait(), + self._wait_timeout) + except asyncio.TimeoutError: + self.logger.debug('protocol is no longer connected') + + if not self.connected: + try: + buffered_data = b'' + if self.protocol is not None: + buffered_data = self.protocol.buffered_data + t, p = await self._create_transport() # type: ignore[misc] + transport, self.protocol = t, p + self.protocol.buffered_data = buffered_data + self.logger.info('connection established to %s', + transport.get_extra_info('peername')) + except IOError as error: + self.logger.warning('connection to %s:%s failed: %s', + self.host, self.port, error) + await asyncio.sleep(self._reconnect_sleep) + + async def _process_metric(self) -> None: + try: + metric = await asyncio.wait_for(self.queue.get(), + self._wait_timeout) + self.logger.debug('received %r from queue', metric) + self.queue.task_done() + except asyncio.TimeoutError: + # we still want to invoke the protocol send in case + # it has queued metrics to send + metric = b'' + + assert self.protocol is not None # AFAICT, this cannot happen + self.protocol.send(metric) diff --git a/sprockets_statsd/tornado.py b/sprockets_statsd/tornado.py new file mode 100644 index 0000000..5e368c6 --- /dev/null +++ b/sprockets_statsd/tornado.py @@ -0,0 +1,197 @@ +import contextlib +import os +import socket +import time +import typing + +from tornado import web + +from sprockets_statsd import statsd + + +class Application(web.Application): + """Mix this into your application to add a statsd connection. + + .. attribute:: statsd_connector + :type: sprockets_statsd.statsd.Connector + + Connection to the StatsD server that is set between calls + to :meth:`.start_statsd` and :meth:`.stop_statsd`. + + This mix-in is configured by the ``statsd`` settings key. The + value is a dictionary with the following keys. + + +-------------------+---------------------------------------------+ + | host | the statsd host to send metrics to | + +-------------------+---------------------------------------------+ + | port | port number that statsd is listening on | + +-------------------+---------------------------------------------+ + | prefix | segment to prefix to metrics | + +-------------------+---------------------------------------------+ + | protocol | "tcp" or "udp" | + +-------------------+---------------------------------------------+ + | reconnect_timeout | number of seconds to sleep after a statsd | + | | connection attempt fails | + +-------------------+---------------------------------------------+ + | wait_timeout | number of seconds to wait for a metric to | + | | arrive on the queue before verifying the | + | | connection | + +-------------------+---------------------------------------------+ + + **host** defaults to the :envvar:`STATSD_HOST` environment variable. + If this value is not set, then the statsd connector *WILL NOT* be + enabled. + + **port** defaults to the :envvar:`STATSD_PORT` environment variable + with a back up default of 8125 if the environment variable is not + set. + + **prefix** is prefixed to all metric paths. This provides a + namespace for metrics so that each applications metrics are maintained + in separate buckets. The default is to use the :envvar:`STATSD_PREFIX` + environment variable. If it is unset and the *service* and + *environment* keys are set in ``settings``, then the default is + ``applications..``. This is a convenient way to + maintain consistent metric paths when you are managing a larger number + of services. + + .. rubric:: Warning + + If you want to run without a prefix, then you are required to explicitly + set ``statsd.prefix`` to ``None``. This prevents accidentally polluting + the metric namespace with unqualified paths. + + **protocol** defaults to the :envvar:`STATSD_PROTOCOL` environment + variable with a back default of "tcp" if the environment variable + is not set. + + **reconnect_timeout** defaults to 1.0 seconds which limits the + aggressiveness of creating new TCP connections. + + **wait_timeout** defaults to 0.1 seconds which ensures that the + processor quickly responds to connection faults. + + """ + statsd_connector: typing.Optional[statsd.Connector] + + def __init__(self, *args: typing.Any, **settings: typing.Any): + statsd_settings = settings.setdefault('statsd', {}) + statsd_settings.setdefault('host', os.environ.get('STATSD_HOST')) + statsd_settings.setdefault('port', + os.environ.get('STATSD_PORT', '8125')) + statsd_settings.setdefault('protocol', + os.environ.get('STATSD_PROTOCOL', 'tcp')) + + if 'prefix' not in statsd_settings: + statsd_settings['prefix'] = os.environ.get('STATSD_PREFIX') + if not statsd_settings['prefix']: + try: + statsd_settings['prefix'] = '.'.join([ + 'applications', + settings['service'], + settings['environment'], + ]) + except KeyError: + raise RuntimeError( + 'statsd configuration error: prefix is not set. Set' + ' $STATSD_PREFIX or configure settings.statsd.prefix') + + super().__init__(*args, **settings) + + self.settings['statsd']['port'] = int(self.settings['statsd']['port']) + self.statsd_connector = None + + async def start_statsd(self) -> None: + """Start the connector during startup. + + Call this method during application startup to enable the statsd + connection. A new :class:`~sprockets_statsd.statsd.Connector` + instance will be created and started. This method does not return + until the connector is running. + + """ + if self.statsd_connector is None: + kwargs = self.settings['statsd'].copy() + protocol = kwargs.pop('protocol', None) + if protocol == 'tcp': + kwargs['ip_protocol'] = socket.IPPROTO_TCP + elif protocol == 'udp': + kwargs['ip_protocol'] = socket.IPPROTO_UDP + else: + raise RuntimeError(f'statsd configuration error: {protocol} ' + f'is not a valid protocol') + + self.statsd_connector = statsd.Connector(**kwargs) + await self.statsd_connector.start() + + async def stop_statsd(self) -> None: + """Stop the connector during shutdown. + + If the connector was started, then this method will gracefully + terminate it. The method does not return until after the + connector is stopped. + + """ + if self.statsd_connector is not None: + await self.statsd_connector.stop() + self.statsd_connector = None + + +class RequestHandler(web.RequestHandler): + """Mix this into your handler to send metrics to a statsd server.""" + statsd_connector: typing.Optional[statsd.Connector] + + def initialize(self, **kwargs: typing.Any) -> None: + super().initialize(**kwargs) + self.application: Application + self.statsd_connector = self.application.statsd_connector + + def __build_path(self, *path: typing.Any) -> str: + return '.'.join(str(c) for c in path) + + def record_timing(self, secs: float, *path: typing.Any) -> None: + """Record the duration. + + :param secs: number of seconds to record + :param path: path to record the duration under + + """ + if self.statsd_connector is not None: + self.statsd_connector.timing(self.__build_path(*path), secs) + + def increase_counter(self, *path: typing.Any, amount: int = 1) -> None: + """Adjust a counter. + + :param path: path of the counter to adjust + :param amount: amount to adjust the counter by. Defaults to + 1 and can be negative + + """ + if self.statsd_connector is not None: + self.statsd_connector.incr(self.__build_path(*path), amount) + + @contextlib.contextmanager + def execution_timer( + self, *path: typing.Any) -> typing.Generator[None, None, None]: + """Record the execution duration of a block of code. + + :param path: path to record the duration as + + """ + start = time.time() + try: + yield + finally: + self.record_timing(time.time() - start, *path) + + def on_finish(self) -> None: + """Extended to record the request time as a duration. + + This method extends :meth:`tornado.web.RequestHandler.on_finish` + to record ``self.request.request_time`` as a timing metric. + + """ + super().on_finish() + self.record_timing(self.request.request_time(), + self.__class__.__name__, self.request.method, + self.get_status()) diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 0000000..91a21ef --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,132 @@ +import asyncio +import io +import socket +import typing + + +class StatsdServer(asyncio.DatagramProtocol, asyncio.Protocol): + metrics: typing.List[bytes] + + def __init__(self, ip_protocol): + self.server = None + self.host = '127.0.0.1' + self.port = 0 + self.ip_protocol = ip_protocol + self.connections_made = 0 + self.connections_lost = 0 + self.message_counter = 0 + + self.metrics = [] + self.running = asyncio.Event() + self.client_connected = asyncio.Semaphore(value=0) + self.message_received = asyncio.Semaphore(value=0) + self.transports: typing.List[asyncio.BaseTransport] = [] + + self._buffer = io.BytesIO() + + async def run(self): + await self._reset() + + loop = asyncio.get_running_loop() + if self.ip_protocol == socket.IPPROTO_TCP: + server = await loop.create_server(lambda: self, + self.host, + self.port, + reuse_port=True) + self.server = server + listening_sock = typing.cast(typing.List[socket.socket], + server.sockets)[0] + self.host, self.port = listening_sock.getsockname() + self.running.set() + try: + await server.serve_forever() + except asyncio.CancelledError: + self.close() + await server.wait_closed() + except Exception as error: + raise error + finally: + self.running.clear() + + elif self.ip_protocol == socket.IPPROTO_UDP: + transport, protocol = await loop.create_datagram_endpoint( + lambda: self, + local_addr=(self.host, self.port), + reuse_port=True) + self.server = transport + self.host, self.port = transport.get_extra_info('sockname') + self.running.set() + try: + while not transport.is_closing(): + await asyncio.sleep(0.1) + finally: + self.running.clear() + + def close(self): + if self.server is not None: + self.server.close() + for connected_client in self.transports: + connected_client.close() + self.transports.clear() + + async def wait_running(self): + await self.running.wait() + + async def wait_closed(self): + while self.running.is_set(): + await asyncio.sleep(0.1) + + def connection_made(self, transport: asyncio.BaseTransport): + self.client_connected.release() + self.connections_made += 1 + self.transports.append(transport) + + def connection_lost(self, exc) -> None: + self.connections_lost += 1 + + def data_received(self, data: bytes): + self._buffer.write(data) + self._process_buffer() + + def datagram_received(self, data: bytes, _addr): + self._buffer.write(data + b'\n') + self._process_buffer() + + def _process_buffer(self): + buf = self._buffer.getvalue() + if b'\n' in buf: + buf_complete = buf[-1] == ord('\n') + if not buf_complete: + offset = buf.rfind(b'\n') + self._buffer = io.BytesIO(buf[offset:]) + buf = buf[:offset] + else: + self._buffer = io.BytesIO() + buf = buf[:-1] + + for metric in buf.split(b'\n'): + self.metrics.append(metric) + self.message_received.release() + self.message_counter += 1 + + async def _reset(self): + self._buffer = io.BytesIO() + self.connections_made = 0 + self.connections_lost = 0 + self.message_counter = 0 + self.metrics.clear() + for transport in self.transports: + transport.close() + self.transports.clear() + + self.running.clear() + await self._drain_semaphore(self.client_connected) + await self._drain_semaphore(self.message_received) + + @staticmethod + async def _drain_semaphore(semaphore: asyncio.Semaphore): + while not semaphore.locked(): + try: + await asyncio.wait_for(semaphore.acquire(), 0.1) + except asyncio.TimeoutError: + break diff --git a/tests/test_processor.py b/tests/test_processor.py new file mode 100644 index 0000000..4b802da --- /dev/null +++ b/tests/test_processor.py @@ -0,0 +1,403 @@ +import asyncio +import logging +import socket +import time +import typing + +import asynctest + +from sprockets_statsd import statsd +from tests import helpers + + +class ProcessorTestCase(asynctest.TestCase): + ip_protocol: int + + async def setUp(self): + self.test_timeout = 5.0 + super().setUp() + await self.asyncSetUp() + + async def tearDown(self): + await self.asyncTearDown() + super().tearDown() + + async def wait_for(self, fut): + try: + await asyncio.wait_for(fut, timeout=self.test_timeout) + except asyncio.TimeoutError: + self.fail('future took too long to resolve') + + async def asyncSetUp(self): + self.statsd_server = helpers.StatsdServer(self.ip_protocol) + self.statsd_task = asyncio.create_task(self.statsd_server.run()) + await self.statsd_server.wait_running() + + async def asyncTearDown(self): + self.statsd_server.close() + await self.statsd_server.wait_closed() + + +class ProcessorTests(ProcessorTestCase): + ip_protocol = socket.IPPROTO_TCP + + async def test_that_processor_connects_and_disconnects(self): + processor = statsd.Processor(host=self.statsd_server.host, + port=self.statsd_server.port) + asyncio.create_task(processor.run()) + await self.wait_for(self.statsd_server.client_connected.acquire()) + await self.wait_for(processor.stop()) + + self.assertEqual(1, self.statsd_server.connections_made) + self.assertEqual(1, self.statsd_server.connections_lost) + + async def test_that_processor_reconnects(self): + processor = statsd.Processor(host=self.statsd_server.host, + port=self.statsd_server.port) + asyncio.create_task(processor.run()) + await self.wait_for(self.statsd_server.client_connected.acquire()) + + # Now that the server is running and the client has connected, + # cancel the server and let it die off. + self.statsd_server.close() + await self.statsd_server.wait_closed() + until = time.time() + self.test_timeout + while processor.connected: + await asyncio.sleep(0.1) + if time.time() >= until: + self.fail('processor never disconnected') + + # Start the server on the same port and let the client reconnect. + self.statsd_task = asyncio.create_task(self.statsd_server.run()) + await self.wait_for(self.statsd_server.client_connected.acquire()) + self.assertTrue(processor.connected) + + await self.wait_for(processor.stop()) + + async def test_that_processor_can_be_cancelled(self): + processor = statsd.Processor(host=self.statsd_server.host, + port=self.statsd_server.port) + task = asyncio.create_task(processor.run()) + await self.wait_for(self.statsd_server.client_connected.acquire()) + + task.cancel() + await self.wait_for(processor.stopped.wait()) + + async def test_shutdown_when_disconnected(self): + processor = statsd.Processor(host=self.statsd_server.host, + port=self.statsd_server.port) + asyncio.create_task(processor.run()) + await self.wait_for(self.statsd_server.client_connected.acquire()) + + self.statsd_server.close() + await self.statsd_server.wait_closed() + + await self.wait_for(processor.stop()) + + async def test_socket_resets(self): + processor = statsd.Processor(host=self.statsd_server.host, + port=self.statsd_server.port) + asyncio.create_task(processor.run()) + await self.wait_for(self.statsd_server.client_connected.acquire()) + + self.statsd_server.transports[0].close() + await self.wait_for(self.statsd_server.client_connected.acquire()) + await self.wait_for(processor.stop()) + + async def test_that_stopping_when_not_running_is_safe(self): + processor = statsd.Processor(host=self.statsd_server.host, + port=self.statsd_server.port) + await self.wait_for(processor.stop()) + + def test_that_processor_fails_when_host_is_none(self): + with self.assertRaises(RuntimeError) as context: + statsd.Processor(host=None, port=12345) # type: ignore[arg-type] + self.assertIn('host', str(context.exception)) + + async def test_starting_and_stopping_without_connecting(self): + host, port = self.statsd_server.host, self.statsd_server.port + self.statsd_server.close() + await self.wait_for(self.statsd_server.wait_closed()) + processor = statsd.Processor(host=host, port=port) + asyncio.create_task(processor.run()) + await self.wait_for(processor.running.wait()) + await processor.stop() + + async def test_that_protocol_exceptions_are_logged(self): + processor = statsd.Processor(host=self.statsd_server.host, + port=self.statsd_server.port) + asyncio.create_task(processor.run()) + await self.wait_for(processor.running.wait()) + + with self.assertLogs(processor.logger, level=logging.ERROR) as cm: + processor.queue.put_nowait('not-bytes') # type: ignore[arg-type] + while processor.queue.qsize() > 0: + await asyncio.sleep(0.1) + + for record in cm.records: + if record.exc_info is not None and record.funcName == 'run': + break + else: + self.fail('Expected run to log exception') + + await processor.stop() + + +class TCPProcessingTests(ProcessorTestCase): + ip_protocol = socket.IPPROTO_TCP + + async def asyncSetUp(self): + await super().asyncSetUp() + self.processor = statsd.Processor(host=self.statsd_server.host, + port=self.statsd_server.port, + reconnect_sleep=0.25) + asyncio.create_task(self.processor.run()) + await self.wait_for(self.statsd_server.client_connected.acquire()) + + async def asyncTearDown(self): + await self.processor.stop() + await super().asyncTearDown() + + async def test_connection_failures(self): + # Change the port and close the transport, this will cause the + # processor to reconnect to the new port and fail. + self.processor.port = 1 + self.processor.protocol.transport.close() + + # Wait for the processor to be disconnected, then change the + # port back and let the processor reconnect. + while self.processor.connected: + await asyncio.sleep(0.1) + await asyncio.sleep(0.2) + self.processor.port = self.statsd_server.port + + await self.wait_for(self.statsd_server.client_connected.acquire()) + + async def test_socket_closure_while_processing_failed_event(self): + state = {'first_time': True} + real_process_metric = self.processor._process_metric + + async def fake_process_metric(): + if state['first_time']: + self.processor.protocol.buffered_data = b'counter:1|c\n' + self.processor.protocol.transport.close() + state['first_time'] = False + return await real_process_metric() + + self.processor._process_metric = fake_process_metric + + await self.wait_for(self.statsd_server.message_received.acquire()) + + async def test_socket_closure_while_sending(self): + state = {'first_time': True} + protocol = typing.cast(statsd.TCPProtocol, self.processor.protocol) + real_transport_write = protocol.transport.write + + def fake_transport_write(data): + if state['first_time']: + self.processor.protocol.transport.close() + state['first_time'] = False + return real_transport_write(data) + + protocol.transport.write = fake_transport_write + self.processor.queue.put_nowait(b'counter:1|c') + await self.wait_for(self.statsd_server.message_received.acquire()) + + +class UDPProcessingTests(ProcessorTestCase): + ip_protocol = socket.IPPROTO_UDP + + async def asyncSetUp(self): + await super().asyncSetUp() + self.connector = statsd.Connector(host=self.statsd_server.host, + port=self.statsd_server.port, + ip_protocol=self.ip_protocol, + reconnect_sleep=0.25) + await self.connector.start() + + async def asyncTearDown(self): + await self.connector.stop() + await super().asyncTearDown() + + async def test_sending_metrics(self): + self.connector.incr('counter') + self.connector.timing('timer', 0.001) + await self.wait_for(self.statsd_server.message_received.acquire()) + await self.wait_for(self.statsd_server.message_received.acquire()) + + self.assertEqual(self.statsd_server.metrics[0], + b'counters.counter:1|c') + self.assertEqual(self.statsd_server.metrics[1], b'timers.timer:1.0|ms') + + async def test_that_client_sends_to_new_server(self): + self.statsd_server.close() + await self.statsd_server.wait_closed() + + self.connector.incr('should.be.lost') + await asyncio.sleep(self.connector.processor._wait_timeout * 2) + + self.statsd_task = asyncio.create_task(self.statsd_server.run()) + await self.statsd_server.wait_running() + + self.connector.incr('should.be.recvd') + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assertEqual(self.statsd_server.metrics[0], + b'counters.should.be.recvd:1|c') + + async def test_that_client_handles_socket_closure(self): + self.connector.processor.protocol.transport.close() + await self.wait_for( + asyncio.sleep(self.connector.processor._reconnect_sleep)) + + self.connector.incr('should.be.recvd') + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assertEqual(self.statsd_server.metrics[0], + b'counters.should.be.recvd:1|c') + + +class ConnectorTests(ProcessorTestCase): + ip_protocol = socket.IPPROTO_TCP + + async def asyncSetUp(self): + await super().asyncSetUp() + self.connector = statsd.Connector(self.statsd_server.host, + self.statsd_server.port) + await self.connector.start() + await self.wait_for(self.statsd_server.client_connected.acquire()) + + async def asyncTearDown(self): + await self.wait_for(self.connector.stop()) + await super().asyncTearDown() + + def assert_metrics_equal(self, recvd: bytes, path, value, type_code): + decoded = recvd.decode('utf-8') + recvd_path, _, rest = decoded.partition(':') + recvd_value, _, recvd_code = rest.partition('|') + self.assertEqual(path, recvd_path, 'metric path mismatch') + self.assertEqual(recvd_value, str(value), 'metric value mismatch') + self.assertEqual(recvd_code, type_code, 'metric type mismatch') + + async def test_adjusting_counter(self): + self.connector.incr('simple.counter') + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[-1], + 'counters.simple.counter', 1, 'c') + + self.connector.incr('simple.counter', 10) + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[-1], + 'counters.simple.counter', 10, 'c') + + self.connector.decr('simple.counter') + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[-1], + 'counters.simple.counter', -1, 'c') + + self.connector.decr('simple.counter', 10) + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[-1], + 'counters.simple.counter', -10, 'c') + + async def test_adjusting_gauge(self): + self.connector.gauge('simple.gauge', 100) + self.connector.gauge('simple.gauge', -10, delta=True) + self.connector.gauge('simple.gauge', 10, delta=True) + for _ in range(3): + await self.wait_for(self.statsd_server.message_received.acquire()) + + self.assert_metrics_equal(self.statsd_server.metrics[0], + 'gauges.simple.gauge', '100', 'g') + self.assert_metrics_equal(self.statsd_server.metrics[1], + 'gauges.simple.gauge', '-10', 'g') + self.assert_metrics_equal(self.statsd_server.metrics[2], + 'gauges.simple.gauge', '+10', 'g') + + async def test_sending_timer(self): + secs = 12.34 + self.connector.timing('simple.timer', secs) + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assert_metrics_equal(self.statsd_server.metrics[0], + 'timers.simple.timer', 12340.0, 'ms') + + async def test_that_queued_metrics_are_drained(self): + # The easiest way to test that the internal metrics queue + # is drained when the processor is stopped is to monkey + # patch the "process metric" method to enqueue a few + # metrics and then terminate the processor. It will exit + # the run loop and drain the queue. + real_process_metric = self.connector.processor._process_metric + + async def fake_process_metric(): + if not self.connector.processor.should_terminate: + self.connector.incr('counter', 1) + self.connector.incr('counter', 2) + self.connector.incr('counter', 3) + self.connector.processor.should_terminate = True + return await real_process_metric() + + self.connector.processor._process_metric = fake_process_metric + await self.wait_for(self.statsd_server.message_received.acquire()) + await self.wait_for(self.statsd_server.message_received.acquire()) + await self.wait_for(self.statsd_server.message_received.acquire()) + + async def test_metrics_sent_while_disconnected_are_queued(self): + self.statsd_server.close() + await self.statsd_server.wait_closed() + + for value in range(50): + self.connector.incr('counter', value) + + asyncio.create_task(self.statsd_server.run()) + await self.wait_for(self.statsd_server.client_connected.acquire()) + for value in range(50): + await self.wait_for(self.statsd_server.message_received.acquire()) + self.assertEqual(f'counters.counter:{value}|c'.encode(), + self.statsd_server.metrics.pop(0)) + + +class ConnectorOptionTests(ProcessorTestCase): + ip_protocol = socket.IPPROTO_TCP + + def test_protocol_values(self): + connector = statsd.Connector(host=self.statsd_server.host, + port=self.statsd_server.port) + self.assertEqual(socket.IPPROTO_TCP, connector.processor._ip_protocol) + + connector = statsd.Connector(host=self.statsd_server.host, + port=self.statsd_server.port, + ip_protocol=socket.IPPROTO_UDP) + self.assertEqual(socket.IPPROTO_UDP, connector.processor._ip_protocol) + + with self.assertRaises(RuntimeError): + statsd.Connector(host=self.statsd_server.host, + port=self.statsd_server.port, + ip_protocol=socket.IPPROTO_GRE) + + def test_invalid_port_values(self): + for port in {None, 0, -1, 'not-a-number'}: + with self.assertRaises(RuntimeError) as context: + statsd.Connector(host=self.statsd_server.host, port=port) + self.assertIn('port', str(context.exception)) + self.assertIn(repr(port), str(context.exception)) + + async def test_that_metrics_are_dropped_when_queue_overflows(self): + connector = statsd.Connector(host=self.statsd_server.host, + port=1, + max_queue_size=10) + await connector.start() + self.addCleanup(connector.stop) + + # fill up the queue with incr's + for expected_size in range(1, connector.processor.queue.maxsize + 1): + connector.incr('counter') + self.assertEqual(connector.processor.queue.qsize(), expected_size) + + # the following decr's should be ignored + for _ in range(10): + connector.decr('counter') + self.assertEqual(connector.processor.queue.qsize(), 10) + + # make sure that only the incr's are in the queue + for _ in range(connector.processor.queue.qsize()): + metric = await connector.processor.queue.get() + self.assertEqual(metric, b'counters.counter:1|c') diff --git a/tests/test_tornado.py b/tests/test_tornado.py new file mode 100644 index 0000000..77559b8 --- /dev/null +++ b/tests/test_tornado.py @@ -0,0 +1,322 @@ +import asyncio +import os +import socket +import time +import typing + +from tornado import testing, web + +import sprockets_statsd.tornado +from tests import helpers + +ParsedMetric = typing.Tuple[str, float, str] + + +class Handler(sprockets_statsd.tornado.RequestHandler, web.RequestHandler): + async def get(self): + with self.execution_timer('execution-timer'): + await asyncio.sleep(0.1) + self.increase_counter('request-count') + self.write('true') + + +class Application(sprockets_statsd.tornado.Application, web.Application): + def __init__(self, **settings): + super().__init__([web.url('/', Handler)], **settings) + + +class AsyncTestCaseWithTimeout(testing.AsyncTestCase): + def run_coroutine(self, coro): + loop: asyncio.AbstractEventLoop = self.io_loop.asyncio_loop + try: + loop.run_until_complete( + asyncio.wait_for(coro, + timeout=testing.get_async_test_timeout())) + except asyncio.TimeoutError: + self.fail(f'coroutine {coro} took too long to complete') + + +class ApplicationTests(AsyncTestCaseWithTimeout): + def setUp(self): + super().setUp() + self._environ = {} + + def tearDown(self): + super().tearDown() + for name, value in self._environ.items(): + if value is not None: + os.environ[name] = value + else: + os.environ.pop(name, None) + + def setenv(self, name, value): + self._environ.setdefault(name, os.environ.pop(name, None)) + os.environ[name] = value + + def unsetenv(self, name): + self._environ.setdefault(name, os.environ.pop(name, None)) + + def test_statsd_setting_defaults(self): + self.unsetenv('STATSD_HOST') + self.unsetenv('STATSD_PORT') + self.unsetenv('STATSD_PREFIX') + self.unsetenv('STATSD_PROTOCOL') + + app = sprockets_statsd.tornado.Application(statsd={'prefix': ''}) + self.assertIn('statsd', app.settings) + self.assertIsNone(app.settings['statsd']['host'], + 'default host value should be None') + self.assertEqual(8125, app.settings['statsd']['port']) + self.assertEqual('', app.settings['statsd']['prefix']) + self.assertEqual('tcp', app.settings['statsd']['protocol']) + + def test_that_statsd_settings_read_from_environment(self): + self.setenv('STATSD_HOST', 'statsd') + self.setenv('STATSD_PORT', '5218') + self.setenv('STATSD_PREFIX', 'my-service') + self.setenv('STATSD_PROTOCOL', 'udp') + + app = sprockets_statsd.tornado.Application() + self.assertIn('statsd', app.settings) + self.assertEqual('statsd', app.settings['statsd']['host']) + self.assertEqual(5218, app.settings['statsd']['port']) + self.assertEqual('my-service', app.settings['statsd']['prefix']) + self.assertEqual('udp', app.settings['statsd']['protocol']) + + def test_prefix_when_only_service_is_set(self): + with self.assertRaises(RuntimeError): + sprockets_statsd.tornado.Application(service='blah') + + def test_prefix_when_only_environment_is_set(self): + with self.assertRaises(RuntimeError): + sprockets_statsd.tornado.Application(environment='whatever') + + def test_prefix_default_when_service_and_environment_are_set(self): + app = sprockets_statsd.tornado.Application(environment='development', + service='my-service') + self.assertIn('statsd', app.settings) + self.assertEqual('applications.my-service.development', + app.settings['statsd']['prefix']) + + def test_overridden_settings(self): + self.setenv('STATSD_HOST', 'statsd') + self.setenv('STATSD_PORT', '9999') + self.setenv('STATSD_PREFIX', 'service') + self.setenv('STATSD_PROTOCOL', 'tcp') + app = sprockets_statsd.tornado.Application( + statsd={ + 'host': 'statsd.example.com', + 'port': 5218, + 'prefix': 'myapp', + 'protocol': 'udp', + }) + self.assertEqual('statsd.example.com', app.settings['statsd']['host']) + self.assertEqual(5218, app.settings['statsd']['port']) + self.assertEqual('myapp', app.settings['statsd']['prefix']) + self.assertEqual('udp', app.settings['statsd']['protocol']) + + def test_that_starting_without_host_fails(self): + self.unsetenv('STATSD_HOST') + app = sprockets_statsd.tornado.Application(statsd={'prefix': 'app'}) + with self.assertRaises(RuntimeError): + self.run_coroutine(app.start_statsd()) + + def test_creating_without_prefix_on_purpose(self): + self.unsetenv('STATSD_PREFIX') + app = sprockets_statsd.tornado.Application(statsd={ + 'host': 'statsd.example.com', + 'protocol': 'udp', + 'prefix': None, + }) + self.assertEqual(None, app.settings['statsd']['prefix']) + + def test_starting_with_calculated_prefix(self): + self.unsetenv('STATSD_PREFIX') + app = sprockets_statsd.tornado.Application( + environment='development', + service='my-service', + statsd={ + 'host': 'statsd.example.com', + 'protocol': 'udp', + }) + try: + self.run_coroutine(app.start_statsd()) + self.assertEqual('applications.my-service.development', + app.settings['statsd']['prefix']) + finally: + self.run_coroutine(app.stop_statsd()) + + def test_starting_twice(self): + app = sprockets_statsd.tornado.Application(statsd={ + 'host': 'localhost', + 'port': '8125', + 'prefix': 'my-service', + }) + try: + self.run_coroutine(app.start_statsd()) + connector = app.statsd_connector + self.assertIsNotNone(connector, 'statsd.Connector not created') + + self.run_coroutine(app.start_statsd()) + self.assertIs(app.statsd_connector, connector, + 'statsd.Connector should not be recreated') + finally: + self.run_coroutine(app.stop_statsd()) + + def test_stopping_without_starting(self): + app = sprockets_statsd.tornado.Application(statsd={ + 'host': 'localhost', + 'port': '8125', + 'prefix': 'my-service', + }) + self.run_coroutine(app.stop_statsd()) + + def test_optional_parameters(self): + app = sprockets_statsd.tornado.Application( + statsd={ + 'host': 'localhost', + 'port': '8125', + 'prefix': 'my-service', + 'reconnect_sleep': 0.5, + 'wait_timeout': 0.25, + }) + self.run_coroutine(app.start_statsd()) + + processor = app.statsd_connector.processor + self.assertEqual(0.5, processor._reconnect_sleep) + self.assertEqual(0.25, processor._wait_timeout) + self.run_coroutine(app.stop_statsd()) + + def test_starting_with_invalid_protocol(self): + app = sprockets_statsd.tornado.Application(statsd={ + 'host': 'localhost', + 'prefix': 'my-service', + 'protocol': 'unknown' + }) + with self.assertRaises(RuntimeError): + self.run_coroutine(app.start_statsd()) + + def test_that_protocol_strings_are_translated(self): + app = sprockets_statsd.tornado.Application(statsd={ + 'host': 'localhost', + 'prefix': 'my-service', + 'protocol': 'tcp', + }) + self.run_coroutine(app.start_statsd()) + self.assertEqual(socket.IPPROTO_TCP, + app.statsd_connector.processor._ip_protocol) + self.run_coroutine(app.stop_statsd()) + + app = sprockets_statsd.tornado.Application(statsd={ + 'host': 'localhost', + 'prefix': 'my-service', + 'protocol': 'udp', + }) + self.run_coroutine(app.start_statsd()) + self.assertEqual(socket.IPPROTO_UDP, + app.statsd_connector.processor._ip_protocol) + self.run_coroutine(app.stop_statsd()) + + def test_disabling_statsd_prefix(self): + app = sprockets_statsd.tornado.Application( + service='my-service', + version='1.0.0', + statsd={ + 'host': 'localhost', + 'prefix': '', + 'protocol': 'udp', + }, + ) + self.run_coroutine(app.start_statsd()) + self.assertEqual(app.statsd_connector.prefix, '') + self.run_coroutine(app.stop_statsd()) + + +class RequestHandlerTests(AsyncTestCaseWithTimeout, testing.AsyncHTTPTestCase): + def setUp(self): + super().setUp() + self.statsd_server = helpers.StatsdServer(socket.IPPROTO_TCP) + self.io_loop.spawn_callback(self.statsd_server.run) + self.run_coroutine(self.statsd_server.wait_running()) + + self.app.settings['statsd'].update({ + 'host': self.statsd_server.host, + 'port': self.statsd_server.port, + }) + self.run_coroutine(self.app.start_statsd()) + + def tearDown(self): + self.run_coroutine(self.app.stop_statsd()) + self.statsd_server.close() + self.run_coroutine(self.statsd_server.wait_closed()) + super().tearDown() + + def get_app(self): + self.app = Application(statsd={ + 'prefix': 'applications.service', + 'protocol': 'tcp', + }) + return self.app + + def wait_for_metrics(self, metric_count=3): + timeout_remaining = testing.get_async_test_timeout() + for _ in range(metric_count): + start = time.time() + try: + self.io_loop.run_sync( + self.statsd_server.message_received.acquire, + timeout=timeout_remaining) + except TimeoutError: + self.fail() + timeout_remaining -= (time.time() - start) + + def parse_metric(self, metric_line: bytes) -> ParsedMetric: + decoded = metric_line.decode() + path, _, rest = decoded.partition(':') + value, _, type_code = rest.partition('|') + try: + parsed_value = float(value) + except ValueError: + self.fail(f'value of {path} is not a number: value={value!r}') + return path, parsed_value, type_code + + def find_metric(self, needle: str) -> ParsedMetric: + encoded = needle.encode() + for line in self.statsd_server.metrics: + if encoded in line: + return self.parse_metric(line) + self.fail(f'failed to find metric containing {needle!r}') + + def test_the_request_metric_is_sent_last(self): + rsp = self.fetch('/') + self.assertEqual(200, rsp.code) + self.wait_for_metrics() + + path, _, type_code = self.find_metric('Handler.GET.200') + self.assertEqual(path, 'applications.service.timers.Handler.GET.200') + self.assertEqual('ms', type_code) + + def test_execution_timer(self): + rsp = self.fetch('/') + self.assertEqual(200, rsp.code) + self.wait_for_metrics() + + path, _, type_code = self.find_metric('execution-timer') + self.assertEqual('applications.service.timers.execution-timer', path) + self.assertEqual('ms', type_code) + + def test_counter(self): + rsp = self.fetch('/') + self.assertEqual(200, rsp.code) + self.wait_for_metrics() + + path, value, type_code = self.find_metric('request-count') + self.assertEqual('applications.service.counters.request-count', path) + self.assertEqual(1.0, value) + self.assertEqual('c', type_code) + + def test_handling_request_without_statsd_configured(self): + self.io_loop.run_sync(self.app.stop_statsd) + + rsp = self.fetch('/') + self.assertEqual(200, rsp.code) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..64481a1 --- /dev/null +++ b/tox.ini @@ -0,0 +1,19 @@ +[tox] +envlist = lint,py37,py38,py39,tornado5 +toxworkdir = ./build/tox + +[testenv] +deps = + .[dev,tornado] +commands = + python -m unittest + +[testenv:lint] +commands = + flake8 sprockets_statsd tests + yapf -dr sprockets_statsd tests + +[testenv:tornado5] +deps = + tornado>=5,<6 + .[dev]