sprockets.mixins.metrics/sprockets/mixins/metrics/statsd.py

235 lines
8.3 KiB
Python
Raw Normal View History

import asyncio
2016-01-19 16:02:04 +00:00
import contextlib
import logging
import os
2016-01-19 12:51:09 +00:00
import socket
2016-01-19 16:02:04 +00:00
import time
2016-01-19 12:51:09 +00:00
from tornado import iostream
2018-07-17 21:01:32 +00:00
LOGGER = logging.getLogger(__name__)
2016-03-10 20:45:50 +00:00
SETTINGS_KEY = 'sprockets.mixins.metrics.statsd'
"""``self.settings`` key that configures this mix-in."""
2016-01-19 12:51:09 +00:00
2018-12-13 22:30:42 +00:00
class StatsdMixin:
"""Mix this class in to record metrics to a Statsd server."""
2016-01-19 12:51:09 +00:00
def record_timing(self, duration, *path):
"""Record a timing.
2016-01-19 12:51:09 +00:00
This method records a timing to the application's namespace
followed by a calculated path. Each element of `path` is
converted to a string and normalized before joining the
elements by periods. The normalization process is little
more than replacing periods with dashes.
:param float duration: timing to record in seconds
:param path: elements of the metric path to record
2016-01-19 15:43:02 +00:00
"""
self.application.statsd.send(path, duration * 1000.0, 'ms')
2016-01-19 15:43:02 +00:00
def increase_counter(self, *path, **kwargs):
"""Increase a counter.
2016-01-19 15:43:02 +00:00
This method increases a counter within the application's
namespace. Each element of `path` is converted to a string
and normalized before joining the elements by periods. The
normalization process is little more than replacing periods
with dashes.
:param path: elements of the metric path to incr
:keyword int amount: amount to increase the counter by. If
omitted, the counter is increased by one.
2016-01-19 15:43:02 +00:00
"""
self.application.statsd.send(path, kwargs.get('amount', '1'), 'c')
2016-01-19 12:51:09 +00:00
2016-01-19 16:02:04 +00:00
@contextlib.contextmanager
def execution_timer(self, *path):
"""
Record the time it takes to perform an arbitrary code block.
:param path: elements of the metric path to record
This method returns a context manager that records the amount
of time spent inside of the context and submits a timing metric
to the specified `path` using (:meth:`record_timing`).
"""
start = time.time()
try:
yield
finally:
2016-03-10 20:45:50 +00:00
self.record_timing(max(start, time.time()) - start, *path)
2016-01-19 16:02:04 +00:00
2016-01-19 12:51:09 +00:00
def on_finish(self):
"""
Records the time taken to process the request.
This method records the amount of time taken to process the request
(as reported by
:meth:`~tornado.httputil.HTTPServerRequest.request_time`) under the
2016-01-19 12:51:09 +00:00
path defined by the class's module, it's name, the request method,
and the status code. The :meth:`.record_timing` method is used
to send the metric, so the configured namespace is used as well.
"""
2018-12-13 22:30:42 +00:00
super().on_finish()
self.record_timing(self.request.request_time(),
2016-01-19 12:51:09 +00:00
self.__class__.__name__, self.request.method,
2017-03-24 18:47:31 +00:00
self.get_status())
2018-12-13 22:30:42 +00:00
class StatsDCollector:
2018-07-18 19:01:06 +00:00
"""Collects and submits stats to StatsD.
This class should be constructed using the
:meth:`~sprockets.mixins.statsd.install` method. When installed,
it is attached to the :class:`~tornado.web.Application` instance
for your web application.
:param str host: The StatsD host
:param str port: The StatsD port
2018-07-18 19:01:06 +00:00
:param str protocol: The StatsD protocol. May be either ``udp`` or ``tcp``.
:param str namespace: The StatsD bucket to write metrics into.
:param bool prepend_metric_type: Optional flag to prepend bucket path
with the StatsD metric type
"""
METRIC_TYPES = {'c': 'counters',
'ms': 'timers'}
2018-07-18 15:05:27 +00:00
def __init__(self, host, port, protocol='udp', namespace='sprockets',
2016-12-08 20:48:32 +00:00
prepend_metric_type=True):
self._host = host
self._port = int(port)
2018-07-17 21:01:32 +00:00
self._address = (self._host, self._port)
self._namespace = namespace
self._prepend_metric_type = prepend_metric_type
self._tcp_reconnect_sleep = 5
2018-07-17 21:01:32 +00:00
2018-07-18 15:05:27 +00:00
if protocol == 'tcp':
self._tcp = True
self._msg_format = '{path}:{value}|{metric_type}\n'
2018-07-18 15:05:27 +00:00
self._sock = self._tcp_socket()
elif protocol == 'udp':
2018-07-17 21:01:32 +00:00
self._tcp = False
self._msg_format = '{path}:{value}|{metric_type}'
2018-07-17 21:01:32 +00:00
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
else:
2018-07-18 15:05:27 +00:00
raise ValueError('Invalid protocol: {}'.format(protocol))
2018-07-17 21:01:32 +00:00
def _tcp_socket(self):
"""Connect to statsd via TCP and return the IOStream handle.
:rtype: iostream.IOStream
"""
sock = iostream.IOStream(socket.socket(
socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP))
sock.connect(self._address, self._tcp_on_connected)
2018-07-17 21:01:32 +00:00
sock.set_close_callback(self._tcp_on_closed)
return sock
async def _tcp_on_closed(self):
2018-07-17 21:01:32 +00:00
"""Invoked when the socket is closed."""
LOGGER.warning('Not connected to statsd, connecting in %s seconds',
self._tcp_reconnect_sleep)
await asyncio.sleep(self._tcp_reconnect_sleep)
2018-07-17 21:01:32 +00:00
self._sock = self._tcp_socket()
def _tcp_on_connected(self):
"""Invoked when the IOStream is connected"""
LOGGER.debug('Connected to statsd at %s via TCP', self._address)
def send(self, path, value, metric_type):
"""Send a metric to Statsd.
:param list path: The metric path to record
:param mixed value: The value to record
:param str metric_type: The metric type
"""
msg = self._msg_format.format(
path=self._build_path(path, metric_type),
value=value,
metric_type=metric_type)
2018-07-17 21:01:32 +00:00
2018-07-18 15:43:25 +00:00
LOGGER.debug('Sending %s to %s:%s', msg.encode('ascii'),
self._host, self._port)
try:
2018-07-17 21:01:32 +00:00
if self._tcp:
if self._sock.closed():
return
2018-07-18 15:05:27 +00:00
return self._sock.write(msg.encode('ascii'))
self._sock.sendto(msg.encode('ascii'), (self._host, self._port))
except iostream.StreamClosedError as error: # pragma: nocover
LOGGER.warning('Error sending TCP statsd metric: %s', error)
2018-07-18 15:05:27 +00:00
except (OSError, socket.error) as error: # pragma: nocover
LOGGER.exception('Error sending statsd metric: %s', error)
def _build_path(self, path, metric_type):
"""Return a normalized path.
:param list path: elements of the metric path to record
:param str metric_type: The metric type
:rtype: str
"""
path = self._get_prefixes(metric_type) + list(path)
return '{}.{}'.format(self._namespace,
'.'.join(str(p).replace('.', '-') for p in path))
def _get_prefixes(self, metric_type):
"""Get prefixes where applicable
Add metric prefix counters, timers respectively if
2016-12-08 20:48:32 +00:00
:attr:`prepend_metric_type` flag is True.
:param str metric_type: The metric type
:rtype: list
"""
prefixes = []
if self._prepend_metric_type:
prefixes.append(self.METRIC_TYPES[metric_type])
return prefixes
def install(application, **kwargs):
"""Call this to install StatsD for the Tornado application.
:param tornado.web.Application application: the application to
install the collector into.
:param kwargs: keyword parameters to pass to the
:class:`StatsDCollector` initializer.
2016-12-08 20:48:32 +00:00
:returns: :data:`True` if the client was installed successfully,
or :data:`False` otherwise.
- **host** The StatsD host. If host is not specified, the
2016-08-02 21:31:32 +00:00
``STATSD_HOST`` environment variable, or default `127.0.0.1`,
will be pass into the :class:`StatsDCollector`.
- **port** The StatsD port. If port is not specified, the
``STATSD_PORT`` environment variable, or default `8125`,
will be pass into the :class:`StatsDCollector`.
- **namespace** The StatsD bucket to write metrics into.
"""
if getattr(application, 'statsd', None) is not None:
LOGGER.warning('Statsd collector is already installed')
return False
if 'host' not in kwargs:
kwargs['host'] = os.environ.get('STATSD_HOST', '127.0.0.1')
if 'port' not in kwargs:
kwargs['port'] = os.environ.get('STATSD_PORT', '8125')
2018-07-18 15:05:27 +00:00
if 'protocol' not in kwargs:
kwargs['protocol'] = os.environ.get('STATSD_PROTOCOL', 'udp')
2018-07-17 21:01:32 +00:00
setattr(application, 'statsd', StatsDCollector(**kwargs))
return True