Merge pull request #30 from nvllsvm/tornewdo

Drop InfluxDB, Add Tornado 5, Add Python 3.7
Andrew Rabert 2019-01-10 16:56:36 -05:00 committed by GitHub
commit 3c0d4a0ed6
19 changed files with 79 additions and 958 deletions

.travis.yml

@@ -1,18 +1,14 @@
language: python
dist: xenial
python:
- 2.7
- 3.4
- 3.5
- 3.6
- 3.7-dev
- pypy
- pypy3
before_install:
- pip install nose coverage codecov
- pip install -r requires/testing.txt
- 3.7
install:
- pip install -e .
script: nosetests --with-coverage
- pip install -r requires/development.txt
script:
- nosetests --with-coverage
- python setup.py build_sphinx
- python setup.py check
- flake8
after_success:
- codecov
sudo: false
@@ -25,4 +21,4 @@ deploy:
tags: true
distributions: sdist bdist_wheel
all_branches: true
python: 3.6
python: 3.7

README.rst

@@ -3,7 +3,7 @@ sprockets.mixins.metrics
|Version| |Status| |Coverage| |License|
Adjust counter and timer metrics in `InfluxDB`_ or `StatsD`_ using the same API.
Adjust counter and timer metrics in `StatsD`_ using the same API.
The mix-in is configured through the ``tornado.web.Application`` settings
property using a key defined by the specific mix-in.
@@ -26,7 +26,7 @@ call to the ``get`` method as well as a separate metric for the database query.
from sprockets.mixins import mediatype
from sprockets.mixins.metrics import statsd
from tornado import gen, web
from tornado import web
import queries
def make_application():
@@ -47,10 +47,9 @@ call to the ``get`` method as well as a separate metric for the database query.
super(MyHandler, self).initialize()
self.db = queries.TornadoSession(os.environ['MY_PGSQL_DSN'])
@gen.coroutine
def get(self, obj_id):
async def get(self, obj_id):
with self.execution_timer('dbquery', 'get'):
result = yield self.db.query('SELECT * FROM foo WHERE id=%s',
result = await self.db.query('SELECT * FROM foo WHERE id=%s',
obj_id)
self.send_response(result)
@@ -63,76 +62,6 @@ Settings
:prepend_metric_type: Optional flag to prepend bucket path with the StatsD
metric type
InfluxDB Mixin
--------------
The following snippet configures the InfluxDB mix-in from common environment
variables:
.. code-block:: python
import os
from sprockets.mixins.metrics import influxdb
from sprockets.mixins import postgresql
from tornado import gen, web
def make_app(**settings):
settings[influxdb.SETTINGS_KEY] = {
'measurement': 'rollup',
}
application = web.Application(
[
web.url(r'/', MyHandler),
], **settings)
influxdb.install({'url': 'http://localhost:8086',
'database': 'tornado-app'})
return application
class MyHandler(influxdb.InfluxDBMixin,
postgresql.HandlerMixin,
web.RequestHandler):
@gen.coroutine
def get(self, obj_id):
with self.execution_timer('dbquery', 'get'):
result = yield self.postgresql_session.query(
'SELECT * FROM foo WHERE id=%s', obj_id)
self.send_response(result)
If your application handles signals for shutdown, the
:meth:`~sprockets.mixins.influxdb.shutdown` method ensures that any buffered
metrics in the InfluxDB collector are written before the process exits. The
method returns a :class:`~tornado.concurrent.TracebackFuture`
that should be waited on prior to shutting down.
For environment variable based configuration, use the ``INFLUX_SCHEME``,
``INFLUX_HOST``, and ``INFLUX_PORT`` environment variables. The defaults are
``https``, ``localhost``, and ``8086`` respectively.
To use authentication with InfluxDB, set the ``INFLUX_USER`` and the
``INFLUX_PASSWORD`` environment variables. Once installed, the
``INFLUX_PASSWORD`` value will be masked in the Python process.
Settings
^^^^^^^^
:url: The InfluxDB API URL
:database: the database to write measurements into
:submission_interval: How often to submit metric batches in
milliseconds. Default: ``5000``
:max_batch_size: The number of measurements to be submitted in a
single HTTP request. Default: ``1000``
:tags: Default tags that are to be submitted with each metric. The tag
``hostname`` is added by default along with ``environment`` and ``service``
if the corresponding ``ENVIRONMENT`` or ``SERVICE`` environment variables
are set.
:auth_username: A username to use for InfluxDB authentication, if desired.
:auth_password: A password to use for InfluxDB authentication, if desired.
Development Quickstart
----------------------
.. code-block:: bash
@@ -164,7 +93,6 @@ Development Quickstart
(env)$ open build/sphinx/html/index.html
.. _StatsD: https://github.com/etsy/statsd
.. _InfluxDB: https://influxdata.com
.. |Version| image:: https://img.shields.io/pypi/v/sprockets_mixins_metrics.svg
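Taken together, the README now documents a pure-StatsD, Tornado 5 API. A minimal end-to-end sketch of the post-change usage follows; the handler name, namespace, host, and port are illustrative assumptions, not part of this commit:

from sprockets.mixins.metrics import statsd
from tornado import ioloop, web


class MyHandler(statsd.StatsdMixin, web.RequestHandler):

    async def get(self):
        # counters and timers share the same API as before the change
        self.increase_counter('requests')
        self.set_status(204)


def make_application():
    application = web.Application([web.url(r'/', MyHandler)])
    # install() attaches the StatsD collector to the application;
    # these keys mirror the ones exercised in tests.py further down
    statsd.install(application, **{'namespace': 'my-service',
                                   'host': '127.0.0.1', 'port': 8125})
    return application


if __name__ == '__main__':
    make_application().listen(8000)
    ioloop.IOLoop.current().start()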

docs/api.rst

@@ -43,30 +43,12 @@ implements the same interface:
with self.execution_timer('db', 'query', 'foo'):
rows = yield self.session.query('SELECT * FROM foo')
.. method:: set_metric_tag(tag, value)
:noindex:
:param str tag: the tag to set
:param str value: the value to assign to the tag
This method stores a tag and value pair to be reported with
metrics. It is only implemented on back-ends that support
tagging metrics (e.g., :class:`sprockets.mixins.metrics.InfluxDBMixin`)
Statsd Implementation
---------------------
.. autoclass:: sprockets.mixins.metrics.statsd.StatsdMixin
:members:
InfluxDB Implementation
-----------------------
.. autoclass:: sprockets.mixins.metrics.influxdb.InfluxDBMixin
:members:
.. autoclass:: sprockets.mixins.metrics.influxdb.InfluxDBCollector
:members:
Testing Helpers
---------------
*So who actually tests that their metrics are emitted as they expect?*
@@ -76,6 +58,3 @@ contains some helpers that make testing a little easier.
.. autoclass:: sprockets.mixins.metrics.testing.FakeStatsdServer
:members:
.. autoclass:: sprockets.mixins.metrics.testing.FakeInfluxHandler
:members:
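The interface that survives this page's trimming is the shared one shown above: ``record_timing``, ``increase_counter``, and ``execution_timer``. A short sketch of how the context-manager form relates to an explicit ``record_timing`` call; the handler and its query method are hypothetical:

import time

from sprockets.mixins.metrics import statsd
from tornado import web


class WidgetHandler(statsd.StatsdMixin, web.RequestHandler):

    async def get(self):
        # context-manager form documented above
        with self.execution_timer('db', 'query', 'foo'):
            await self.run_query()

        # equivalent explicit form using record_timing directly
        start = time.time()
        await self.run_query()
        self.record_timing(time.time() - start, 'db', 'query', 'foo')

        self.set_status(204)

    async def run_query(self):
        """Placeholder for real database work."""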

docs/conf.py

@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
import alabaster
from sprockets.mixins import metrics
project = 'sprockets.mixins.metrics'
@@ -7,23 +5,15 @@ copyright = 'AWeber Communications, Inc.'
version = metrics.__version__
release = '.'.join(str(v) for v in metrics.version_info[0:2])
needs_sphinx = '1.0'
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.intersphinx',
'sphinx.ext.viewcode',
]
templates_path = []
source_suffix = '.rst'
source_encoding = 'utf-8-sig'
master_doc = 'index'
exclude_patterns = []
pygments_style = 'sphinx'
html_style = 'custom.css'
html_static_path = ['_static']
html_theme = 'alabaster'
html_theme_path = [alabaster.get_path()]
html_sidebars = {
'**': ['about.html', 'navigation.html'],
}

docs/examples.rst

@@ -16,22 +16,3 @@ as a base class.
.. literalinclude:: ../examples/statsd.py
:pyobject: SimpleHandler
Sending measurements to InfluxDB
--------------------------------
This simple application sends per-request measurements to an InfluxDB
server listening on ``localhost``. The mix-in is configured by passing
a ``sprockets.mixins.metrics.influxdb`` key into the application settings
as shown below.
.. literalinclude:: ../examples/influxdb.py
:pyobject: make_application
The InfluxDB database and measurement name are also configured in the
application settings object. The request handler is responsible for
providing the tag and value portions of the measurement. The standard
:class:`Metric Mixin API<sprockets.mixins.metrics.Mixin>` is used to set
tagged values.
.. literalinclude:: ../examples/influxdb.py
:pyobject: SimpleHandler

docs/history.rst

@@ -3,6 +3,15 @@
Release History
===============
`Next`_
-------
- Add support for Tornado 5
- Remove support for Tornado < 5
- Remove support for Python < 3.7
- Remove InfluxDB support (use `sprockets-influxdb`_)
.. _sprockets-influxdb: https://github.com/sprockets/sprockets-influxdb
`3.1.1`_ (07-Aug-2018)
----------------------
- Fixed bad formatted TCP StatsD messages by appending a newline
@@ -79,7 +88,8 @@ Release History
- Add :class:`sprockets.mixins.metrics.InfluxDBMixin`
- Add :class:`sprockets.mixins.metrics.influxdb.InfluxDBConnection`
.. _Next Release: https://github.com/sprockets/sprockets.mixins.metrics/compare/3.1.0...master
.. _Next: https://github.com/sprockets/sprockets.mixins.metrics/compare/3.1.1...master
.. _3.1.1: https://github.com/sprockets/sprockets.mixins.metrics/compare/3.1.0...3.1.1
.. _3.1.0: https://github.com/sprockets/sprockets.mixins.metrics/compare/3.0.4...3.1.0
.. _3.0.4: https://github.com/sprockets/sprockets.mixins.metrics/compare/3.0.3...3.0.4
.. _3.0.3: https://github.com/sprockets/sprockets.mixins.metrics/compare/3.0.2...3.0.3

examples/influxdb.py (deleted)

@@ -1,86 +0,0 @@
import signal
from sprockets.mixins.metrics import influxdb
from tornado import concurrent, gen, ioloop, web
class SimpleHandler(influxdb.InfluxDBMixin, web.RequestHandler):
"""
Simply emits a few metrics around the GET method.
The ``InfluxDBMixin`` sends all of the metrics gathered during
the processing of a request as a single measurement when the
request is finished. Each request of this sample will result
in a single measurement using the service name as the key.
The following tag keys are defined by default:
handler="examples.influxdb.SimpleHandler"
host="$HOSTNAME"
method="GET"
and the following values are written:
duration=0.2573668956756592
sleepytime=0.255108118057251
slept=42
status_code=204
The duration and status_code values are handled by the mix-in
and the slept and sleepytime values are added in the method.
"""
def initialize(self):
super(SimpleHandler, self).initialize()
self.set_metric_tag('environment', 'testing')
@gen.coroutine
def prepare(self):
maybe_future = super(SimpleHandler, self).prepare()
if concurrent.is_future(maybe_future):
yield maybe_future
if 'Correlation-ID' in self.request.headers:
self.set_metric_tag('correlation_id',
self.request.headers['Correlation-ID'])
@gen.coroutine
def get(self):
with self.execution_timer('sleepytime'):
yield gen.sleep(0.25)
self.increase_counter('slept', amount=42)
self.set_status(204)
self.finish()
def _sig_handler(*args_):
iol = ioloop.IOLoop.instance()
iol.add_callback_from_signal(iol.stop)
def make_application():
"""
Create an application configured to send metrics.
Measurements will be sent to the ``testing`` database on the
configured InfluxDB instance. The measurement name is set
by the ``service`` setting.
"""
settings = {
influxdb.SETTINGS_KEY: {
'measurement': 'example',
}
}
application = web.Application([web.url('/', SimpleHandler)], **settings)
influxdb.install(application, **{'database': 'testing'})
return application
if __name__ == '__main__':
app = make_application()
app.listen(8000)
signal.signal(signal.SIGINT, _sig_handler)
signal.signal(signal.SIGTERM, _sig_handler)
ioloop.IOLoop.instance().start()

examples/statsd.py

@@ -1,7 +1,8 @@
import asyncio
import signal
from sprockets.mixins.metrics import statsd
from tornado import concurrent, gen, ioloop, web
from tornado import ioloop, web
class SimpleHandler(statsd.StatsdMixin, web.RequestHandler):
@@ -14,19 +15,8 @@ class SimpleHandler(statsd.StatsdMixin, web.RequestHandler):
"""
@gen.coroutine
def prepare(self):
maybe_future = super(SimpleHandler, self).prepare()
if concurrent.is_future(maybe_future):
yield maybe_future
if 'Correlation-ID' in self.request.headers:
self.set_metric_tag('correlation_id',
self.request.headers['Correlation-ID'])
@gen.coroutine
def get(self):
yield gen.sleep(0.25)
async def get(self):
await asyncio.sleep(0.25)
self.set_status(204)
self.finish()

requires/development.txt

@@ -1,3 +1,3 @@
-e .
-r docs.txt
-r testing.txt
coverage>=4.5,<5
Sphinx

requires/docs.txt (new file)

@@ -0,0 +1 @@
sphinx==1.8.2

requires/installation.txt

@@ -1 +1 @@
tornado>=4.0,<4.5
tornado>=5

requires/testing.txt

@@ -1,3 +1,3 @@
mock>=2,<3
nose>=1.3,<2
tornado>=4.2,<4.3
coverage==4.5.2
flake8==3.6.0
nose==1.3.7

setup.cfg

@@ -1,6 +1,13 @@
[bdist_wheel]
universal = 1
[build_sphinx]
fresh-env = 1
warning-is-error = 1
[check]
strict = 1
[nosetests]
cover-package = sprockets.mixins.metrics
cover-branches = 1

setup.py

@@ -1,28 +1,11 @@
#!/usr/bin/env python
#
import os.path
import pathlib
import setuptools
from sprockets.mixins import metrics
def read_requirements(name):
requirements = []
try:
with open(os.path.join('requires', name)) as req_file:
for line in req_file:
if '#' in line:
line = line[:line.index('#')]
line = line.strip()
if line.startswith('-r'):
requirements.extend(read_requirements(line[2:].strip()))
elif line and not line.startswith('-'):
requirements.append(line)
except IOError:
pass
return requirements
REPO = pathlib.Path(__file__).parent
setuptools.setup(
@@ -34,8 +17,8 @@ setuptools.setup(
author_email='api@aweber.com',
license='BSD',
url='https://github.com/sprockets/sprockets.mixins.metrics',
install_requires=read_requirements('installation.txt'),
tests_require=read_requirements('testing.txt'),
install_requires=REPO.joinpath('requires/installation.txt').read_text(),
tests_require=REPO.joinpath('requires/testing.txt').read_text(),
packages=setuptools.find_packages(exclude=['examples.']),
namespace_packages=['sprockets', 'sprockets.mixins'],
classifiers=[
@@ -45,18 +28,12 @@ setuptools.setup(
'License :: OSI Approved :: BSD License',
'Natural Language :: English',
'Operating System :: OS Independent',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules'],
test_suite='nose.collector',
python_requires='>=3.7',
zip_safe=True,
)
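The ``pathlib`` replacement reads each requirements file verbatim, so ``install_requires`` receives the raw file contents. A minimal sketch of the same pattern; splitting into a list of one specifier per line is an extra safeguard, not something this commit does:

import pathlib

REPO = pathlib.Path(__file__).parent

# read_text() returns the whole file as a single string; splitting it
# into one requirement specifier per line avoids relying on setuptools
# to parse the raw string
install_requires = (
    REPO.joinpath('requires/installation.txt').read_text().splitlines()
)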

sprockets/mixins/metrics/influxdb.py (deleted)

@@ -1,362 +0,0 @@
import contextlib
import logging
import os
import socket
import time
from tornado import concurrent, httpclient, ioloop
from sprockets.mixins.metrics import __version__
LOGGER = logging.getLogger(__name__)
SETTINGS_KEY = 'sprockets.mixins.metrics.influxdb'
"""``self.settings`` key that configures this mix-in."""
_USER_AGENT = 'sprockets.mixins.metrics/v{}'.format(__version__)
class InfluxDBMixin(object):
"""Mix this class in to record measurements to a InfluxDB server."""
def __init__(self, application, request, **kwargs):
self.__metrics = []
self.__tags = {
'handler': '{}.{}'.format(self.__module__,
self.__class__.__name__),
'method': request.method,
}
# Call to super().__init__() needs to be *AFTER* we create our
# properties since it calls initialize() which may want to call
# methods like ``set_metric_tag``
super(InfluxDBMixin, self).__init__(application, request, **kwargs)
def set_metric_tag(self, tag, value):
"""
Add a tag to the measurement key.
:param str tag: name of the tag to set
:param str value: value to assign
This will overwrite the current value assigned to a tag
if one exists.
"""
self.__tags[tag] = value
def record_timing(self, duration, *path):
"""
Record a timing.
:param float duration: timing to record in seconds
:param path: elements of the metric path to record
A timing is a named duration value.
"""
self.__metrics.append('{}={}'.format(
self.application.influxdb.escape_str('.'.join(path)), duration))
def increase_counter(self, *path, **kwargs):
"""
Increase a counter.
:param path: elements of the path to record
:keyword int amount: value to record. If omitted, the counter
value is one.
Counters are simply values that are summed in a query.
"""
self.__metrics.append('{}={}'.format(
self.application.influxdb.escape_str('.'.join(path)),
kwargs.get('amount', 1)))
@contextlib.contextmanager
def execution_timer(self, *path):
"""
Record the time it takes to run an arbitrary code block.
:param path: elements of the metric path to record
This method returns a context manager that records the amount
of time spent inside of the context and records a value
named `path` using (:meth:`record_timing`).
"""
start = time.time()
try:
yield
finally:
self.record_timing(max(time.time(), start) - start, *path)
def on_finish(self):
super(InfluxDBMixin, self).on_finish()
self.set_metric_tag('status_code', self.get_status())
self.record_timing(self.request.request_time(), 'duration')
self.application.influxdb.submit(
self.settings[SETTINGS_KEY]['measurement'],
self.__tags,
self.__metrics)
class InfluxDBCollector(object):
"""Collects and submits stats to InfluxDB on a periodic callback.
:param str url: The InfluxDB API URL
:param str database: the database to write measurements into
:param tornado.ioloop.IOLoop io_loop: the IOLoop to spawn callbacks on.
If this parameter is :data:`None`, then the active IOLoop,
as determined by :meth:`tornado.ioloop.IOLoop.instance`,
is used.
:param int submission_interval: How often to submit metric batches in
milliseconds. Default: ``5000``
:param max_batch_size: The number of measurements to be submitted in a
single HTTP request. Default: ``1000``
:param dict tags: Default tags that are to be submitted with each metric.
:param str auth_username: Optional username for authenticated requests.
:param str auth_password: Optional password for authenticated requests.
This class should be constructed using the
:meth:`~sprockets.mixins.influxdb.install` method. When installed, it is
attached to the :class:`~tornado.web.Application` instance for your web
application and schedules a periodic callback to submit metrics to InfluxDB
in batches.
"""
SUBMISSION_INTERVAL = 5000
MAX_BATCH_SIZE = 1000
WARN_THRESHOLD = 25000
def __init__(self, url='http://localhost:8086', database='sprockets',
io_loop=None, submission_interval=SUBMISSION_INTERVAL,
max_batch_size=MAX_BATCH_SIZE, tags=None,
auth_username=None, auth_password=None):
self._buffer = list()
self._database = database
self._influxdb_url = '{}?db={}'.format(url, database)
self._interval = submission_interval or self.SUBMISSION_INTERVAL
self._io_loop = io_loop or ioloop.IOLoop.current()
self._max_batch_size = max_batch_size or self.MAX_BATCH_SIZE
self._pending = 0
self._tags = tags or {}
# Configure the default
defaults = {'user_agent': _USER_AGENT}
if auth_username and auth_password:
LOGGER.debug('Adding authentication info to defaults (%s)',
auth_username)
defaults['auth_username'] = auth_username
defaults['auth_password'] = auth_password
self._client = httpclient.AsyncHTTPClient(force_instance=True,
defaults=defaults,
io_loop=self._io_loop)
# Add the periodic callback for submitting metrics
LOGGER.info('Starting PeriodicCallback for writing InfluxDB metrics')
self._callback = ioloop.PeriodicCallback(self._write_metrics,
self._interval)
self._callback.start()
@staticmethod
def escape_str(value):
"""Escape the value with InfluxDB's wonderful escaping logic:
"Measurement names, tag keys, and tag values must escape any spaces or
commas using a backslash (\). For example: \ and \,. All tag values are
stored as strings and should not be surrounded in quotes."
:param str value: The value to be escaped
:rtype: str
"""
return str(value).replace(' ', '\ ').replace(',', '\,')
@property
def database(self):
"""Return the configured database name.
:rtype: str
"""
return self._database
def shutdown(self):
"""Invoke on shutdown of your application to stop the periodic
callbacks and flush any remaining metrics.
Returns a future that is complete when all pending metrics have been
submitted.
:rtype: :class:`~tornado.concurrent.TracebackFuture()`
"""
future = concurrent.TracebackFuture()
self._callback.stop()
self._write_metrics()
self._shutdown_wait(future)
return future
def submit(self, measurement, tags, values):
"""Add a measurement to the buffer that will be submitted to InfluxDB
on the next periodic callback for writing metrics.
:param str measurement: The measurement name
:param dict tags: The measurement tags
:param list values: The recorded measurements
"""
self._buffer.append('{},{} {} {:d}'.format(
self.escape_str(measurement),
self._get_tag_string(tags),
','.join(values),
int(time.time() * 1000000000)))
if len(self._buffer) > self.WARN_THRESHOLD:
LOGGER.warning('InfluxDB metric buffer is > %i (%i)',
self.WARN_THRESHOLD, len(self._buffer))
def _get_tag_string(self, tags):
"""Return the tags to be submitted with a measurement combining the
default tags that were passed in when constructing the class along
with any measurement specific tags passed into the
:meth:`~InfluxDBConnection.submit` method. Tags will be properly
escaped and formatted for submission.
:param dict tags: Measurement specific tags
:rtype: str
"""
values = dict(self._tags)
values.update(tags)
return ','.join(['{}={}'.format(self.escape_str(k), self.escape_str(v))
for k, v in values.items()])
def _on_write_response(self, response):
"""This is invoked by the Tornado IOLoop when the HTTP request to
InfluxDB has returned with a result.
:param response: The response from InfluxDB
:type response: :class:`~tornado.httpclient.HTTPResponse`
"""
self._pending -= 1
LOGGER.debug('InfluxDB batch response: %s', response.code)
if response.error:
LOGGER.error('InfluxDB batch submission error: %s', response.error)
def _shutdown_wait(self, future):
"""Pause briefly allowing any pending metric writes to complete before
shutting down.
:param tornado.concurrent.TracebackFuture future: The future to resolve
when the shutdown is complete.
"""
if not self._pending:
future.set_result(True)
return
LOGGER.debug('Waiting for pending metric writes')
self._io_loop.add_timeout(self._io_loop.time() + 0.1,
self._shutdown_wait,
(future,))
def _write_metrics(self):
"""Submit the metrics in the buffer to InfluxDB. This is invoked
by the periodic callback scheduled when the class is created.
It will submit batches until the buffer is empty.
"""
if not self._buffer:
return
LOGGER.debug('InfluxDB buffer has %i items', len(self._buffer))
while self._buffer:
body = '\n'.join(self._buffer[:self._max_batch_size])
self._buffer = self._buffer[self._max_batch_size:]
self._pending += 1
self._client.fetch(self._influxdb_url, method='POST',
body=body.encode('utf-8'),
raise_error=False,
callback=self._on_write_response)
LOGGER.debug('Submitted all InfluxDB metrics for writing')
def install(application, **kwargs):
"""Call this to install the InfluxDB collector into a Tornado application.
:param tornado.web.Application application: the application to
install the collector into.
:param kwargs: keyword parameters to pass to the
:class:`InfluxDBCollector` initializer.
:returns: :data:`True` if the client was installed by this call
and :data:`False` otherwise.
Optional configuration values:
- **url** The InfluxDB API URL. If URL is not specified, the
``INFLUX_HOST`` and ``INFLUX_PORT`` environment variables will be used
to construct the URL to pass into the :class:`InfluxDBCollector`.
- **database** the database to write measurements into.
The default is ``sprockets``.
- **io_loop** A :class:`~tornado.ioloop.IOLoop` to use
- **submission_interval** How often to submit metric batches in
milliseconds. Default: ``5000``
- **max_batch_size** The number of measurements to be submitted in a
single HTTP request. Default: ``1000``
- **tags** Default tags that are to be submitted with each metric.
- **auth_username** A username to use for InfluxDB authentication
- **auth_password** A password to use for InfluxDB authentication
If ``auth_password`` is specified as an environment variable, it will be
masked in the Python process.
"""
if getattr(application, 'influxdb', None) is not None:
LOGGER.warning('InfluxDBCollector is already installed')
return False
# Get config values
url = '{}://{}:{}/write'.format(os.environ.get('INFLUX_SCHEME', 'http'),
os.environ.get('INFLUX_HOST', 'localhost'),
os.environ.get('INFLUX_PORT', 8086))
kwargs.setdefault('url', url)
# Build the full tag dict and replace what was passed in
tags = {'hostname': socket.gethostname()}
if os.environ.get('ENVIRONMENT'):
tags['environment'] = os.environ.get('ENVIRONMENT')
if os.environ.get('SERVICE'):
tags['service'] = os.environ.get('SERVICE')
tags.update(kwargs.get('tags', {}))
kwargs['tags'] = tags
# Check if auth variables are set as env vars and set them if so
if os.environ.get('INFLUX_USER'):
kwargs.setdefault('auth_username', os.environ.get('INFLUX_USER'))
kwargs.setdefault('auth_password',
os.environ.get('INFLUX_PASSWORD', ''))
# Don't leave the environment variable out there with the password
if os.environ.get('INFLUX_PASSWORD'):
os.environ['INFLUX_PASSWORD'] = 'X' * len(kwargs['auth_password'])
# Create and start the collector
setattr(application, 'influxdb', InfluxDBCollector(**kwargs))
return True
def shutdown(application):
"""Invoke to shutdown the InfluxDB collector, writing any pending
measurements to InfluxDB before stopping.
:param tornado.web.Application application: the application the
collector was installed on.
:rtype: tornado.concurrent.TracebackFuture or None
"""
collector = getattr(application, 'influxdb', None)
if collector:
return collector.shutdown()
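For readers auditing the removal: ``submit()`` buffered InfluxDB line-protocol records built from a measurement name, a tag string, and comma-joined values. A standalone sketch of that formatting, with the class's escaping rule inlined (sorting the tags is added here for deterministic output; the original did not sort):

import time


def escape_str(value):
    # the deleted collector escaped spaces and commas with backslashes
    return str(value).replace(' ', '\\ ').replace(',', '\\,')


def format_line(measurement, tags, values):
    # measurement,tag=value,... field=value,... timestamp-in-nanoseconds
    tag_string = ','.join('{}={}'.format(escape_str(k), escape_str(v))
                          for k, v in sorted(tags.items()))
    return '{},{} {} {:d}'.format(escape_str(measurement), tag_string,
                                  ','.join(values),
                                  int(time.time() * 1000000000))


print(format_line('example', {'method': 'GET', 'status_code': 204},
                  ['duration=0.2573', 'slept=42']))
# -> example,method=GET,status_code=204 duration=0.2573,slept=42 15471...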

sprockets/mixins/metrics/statsd.py

@@ -1,10 +1,11 @@
import asyncio
import contextlib
import logging
import os
import socket
import time
from tornado import gen, iostream
from tornado import iostream
LOGGER = logging.getLogger(__name__)
@@ -12,21 +13,9 @@ SETTINGS_KEY = 'sprockets.mixins.metrics.statsd'
"""``self.settings`` key that configures this mix-in."""
class StatsdMixin(object):
class StatsdMixin:
"""Mix this class in to record metrics to a Statsd server."""
def initialize(self):
super(StatsdMixin, self).initialize()
def set_metric_tag(self, tag, value):
"""Ignored for statsd since it does not support tagging.
:param str tag: name of the tag to set
:param str value: value to assign
"""
pass
def record_timing(self, duration, *path):
"""Record a timing.
@@ -88,13 +77,13 @@ class StatsdMixin(object):
to send the metric, so the configured namespace is used as well.
"""
super(StatsdMixin, self).on_finish()
super().on_finish()
self.record_timing(self.request.request_time(),
self.__class__.__name__, self.request.method,
self.get_status())
class StatsDCollector(object):
class StatsDCollector:
"""Collects and submits stats to StatsD.
This class should be constructed using the
@@ -143,12 +132,11 @@ class StatsDCollector(object):
sock.set_close_callback(self._tcp_on_closed)
return sock
@gen.engine
def _tcp_on_closed(self):
async def _tcp_on_closed(self):
"""Invoked when the socket is closed."""
LOGGER.warning('Not connected to statsd, connecting in %s seconds',
self._tcp_reconnect_sleep)
yield gen.sleep(self._tcp_reconnect_sleep)
await asyncio.sleep(self._tcp_reconnect_sleep)
self._sock = self._tcp_socket()
def _tcp_on_connected(self):
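The change above is the general Tornado 5 migration pattern in this commit: the IOLoop is now backed by asyncio, so decorated generator coroutines become native coroutines. A side-by-side sketch, where ``collector`` stands in for a ``StatsDCollector`` instance:

import asyncio

from tornado import gen


# Tornado 4 style: a generator coroutine driven by the gen module
@gen.coroutine
def tcp_on_closed_old(collector):
    yield gen.sleep(collector._tcp_reconnect_sleep)
    collector._sock = collector._tcp_socket()


# Tornado 5 style: a native coroutine awaiting an asyncio primitive,
# which works because Tornado 5's IOLoop wraps the asyncio event loop
async def tcp_on_closed_new(collector):
    await asyncio.sleep(collector._tcp_reconnect_sleep)
    collector._sock = collector._tcp_socket()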

sprockets/mixins/metrics/testing.py

@@ -2,7 +2,7 @@ import logging
import re
import socket
from tornado import gen, iostream, locks, tcpserver, testing, web
from tornado import iostream, locks, tcpserver, testing
LOGGER = logging.getLogger(__name__)
@@ -44,7 +44,7 @@ class FakeStatsdServer(tcpserver.TCPServer):
def tcp_server(self):
self.event = locks.Event()
super(FakeStatsdServer, self).__init__()
super().__init__()
sock, port = testing.bind_unused_port()
self.add_socket(sock)
@@ -67,11 +67,10 @@ class FakeStatsdServer(tcpserver.TCPServer):
self.socket.close()
self.socket = None
@gen.coroutine
def handle_stream(self, stream, address):
async def handle_stream(self, stream, address):
while True:
try:
result = yield stream.read_until_regex(self.TCP_PATTERN)
result = await stream.read_until_regex(self.TCP_PATTERN)
except iostream.StreamClosedError:
break
else:
@@ -119,89 +118,3 @@
raise AssertionError(
'Expected metric starting with "{}" in {!r}'.format(
prefix, self.datagrams))
class FakeInfluxHandler(web.RequestHandler):
"""
Request handler that mimics the InfluxDB write endpoint.
Install this handler into your testing application and configure
the metrics plugin to write to it. After running a test, you can
examine the received measurements by iterating over the
``influx_db`` list in the application object.
.. code-block:: python
class TestThatMyStuffWorks(testing.AsyncHTTPTestCase):
def get_app(self):
self.app = web.Application([
web.url('/', HandlerUnderTest),
web.url('/write', metrics.testing.FakeInfluxHandler),
])
return self.app
def setUp(self):
super(TestThatMyStuffWorks, self).setUp()
self.app.settings[metrics.InfluxDBMixin.SETTINGS_KEY] = {
'measurement': 'stuff',
'write_url': self.get_url('/write'),
'database': 'requests',
}
def test_that_measurements_are_emitted(self):
self.fetch('/') # invokes handler under test
measurements = metrics.testing.FakeInfluxHandler(
self.app, 'requests', self)
for key, fields, timestamp in measurements:
# inspect measurements
"""
def initialize(self):
super(FakeInfluxHandler, self).initialize()
self.logger = LOGGER.getChild(__name__)
if not hasattr(self.application, 'influx_db'):
self.application.influx_db = {}
if self.application.influxdb.database not in self.application.influx_db:
self.application.influx_db[self.application.influxdb.database] = []
def post(self):
db = self.get_query_argument('db')
payload = self.request.body.decode('utf-8')
for line in payload.splitlines():
self.logger.debug('received "%s"', line)
key, fields, timestamp = line.split()
self.application.influx_db[db].append((key, fields, timestamp,
self.request.headers))
self.set_status(204)
@staticmethod
def get_messages(application, test_case):
"""
Wait for measurements to show up and return them.
:param tornado.web.Application application: application that
:class:`.FakeInfluxHandler` is writing to
:param str database: database to retrieve
:param tornado.testing.AsyncTestCase test_case: test case
that is being executed
:return: measurements received as a :class:`list` of
(key, fields, timestamp) tuples
Since measurements are sent asynchronously from within the
``on_finish`` handler they are usually not sent by the time
that the test case has stopped the IOLoop. This method accounts
for this by running the IOloop until measurements have been
received. It will raise an assertion error if measurements
are not received in a reasonable number of runs.
"""
for _ in range(0, 15):
if hasattr(application, 'influx_db'):
if application.influx_db.get(application.influxdb.database):
return application.influx_db[application.influxdb.database]
test_case.io_loop.add_future(gen.sleep(0.1),
lambda _: test_case.stop())
test_case.wait()
else:
test_case.fail('Message not published to InfluxDB before timeout')
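With ``FakeInfluxHandler`` gone, metric assertions go through ``FakeStatsdServer``, as in tests.py below. A condensed sketch; ``HandlerUnderTest`` is a placeholder and the ``install()`` keys mirror the repository's tests:

from sprockets.mixins.metrics import statsd
from sprockets.mixins.metrics.testing import FakeStatsdServer
from tornado import testing, web


class HandlerUnderTest(statsd.StatsdMixin, web.RequestHandler):

    async def get(self):
        self.set_status(204)


class MetricsEmissionTests(testing.AsyncHTTPTestCase):

    def get_app(self):
        self.app = web.Application([web.url(r'/', HandlerUnderTest)])
        return self.app

    def setUp(self):
        super().setUp()
        # point the collector at the fake server's ephemeral socket
        self.statsd = FakeStatsdServer(self.io_loop, protocol='udp')
        statsd.install(self.app, **{'namespace': 'testing',
                                    'host': self.statsd.sockaddr[0],
                                    'port': self.statsd.sockaddr[1],
                                    'protocol': 'udp'})

    def test_request_is_measured(self):
        response = self.fetch('/')
        self.assertEqual(response.code, 204)
        # the datagrams received by self.statsd can now be inspected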

tests.py

@@ -1,29 +1,21 @@
import base64
import asyncio
import itertools
import logging
import os
import socket
import time
import unittest
import uuid
from unittest import mock
from tornado import gen, iostream, testing, web
import mock
from mock import patch
from tornado import iostream, testing, web
from sprockets.mixins.metrics import influxdb, statsd
from sprockets.mixins.metrics.testing import (
FakeInfluxHandler, FakeStatsdServer)
import examples.influxdb
from sprockets.mixins.metrics import statsd
from sprockets.mixins.metrics.testing import FakeStatsdServer
import examples.statsd
class CounterBumper(statsd.StatsdMixin, web.RequestHandler):
@gen.coroutine
def get(self, counter, value):
async def get(self, counter, value):
with self.execution_timer(*counter.split('.')):
yield gen.sleep(float(value))
await asyncio.sleep(float(value))
self.set_status(204)
self.finish()
@@ -74,7 +66,7 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
self.application = None
self.namespace = 'testing'
super(TCPStatsdMetricCollectionTests, self).setUp()
super().setUp()
self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp')
statsd.install(self.application, **{'namespace': self.namespace,
@@ -83,25 +75,13 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
'protocol': 'tcp',
'prepend_metric_type': True})
def test_tcp_reconnect_on_stream_close(self):
path_sleep = 'tornado.gen.sleep'
path_statsd = self.application.statsd
with mock.patch(path_sleep) as gen_sleep, \
patch.object(path_statsd, '_tcp_socket') as mock_tcp_socket:
f = web.Future()
f.set_result(None)
gen_sleep.return_value = f
self.application.statsd._tcp_on_closed()
mock_tcp_socket.assert_called_once_with()
@patch.object(iostream.IOStream, 'write')
@mock.patch.object(iostream.IOStream, 'write')
def test_write_not_executed_when_connection_is_closed(self, mock_write):
self.application.statsd._sock.close()
self.application.statsd.send('foo', 500, 'c')
mock_write.assert_not_called()
@patch.object(iostream.IOStream, 'write')
@mock.patch.object(iostream.IOStream, 'write')
def test_expected_counters_data_written(self, mock_sock):
path = ('foo', 'bar')
value = 500
@@ -114,7 +94,7 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
self.application.statsd.send(path, value, metric_type)
mock_sock.assert_called_once_with(expected.encode())
@patch.object(iostream.IOStream, 'write')
@mock.patch.object(iostream.IOStream, 'write')
def test_expected_timers_data_written(self, mock_sock):
path = ('foo', 'bar')
value = 500
@@ -190,7 +170,7 @@ class TCPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
self.application = None
self.namespace = 'testing'
super(TCPStatsdConfigurationTests, self).setUp()
super().setUp()
self.statsd = FakeStatsdServer(self.io_loop, protocol='tcp')
statsd.install(self.application, **{'namespace': self.namespace,
@@ -230,7 +210,7 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
self.application = None
self.namespace = 'testing'
super(UDPStatsdMetricCollectionTests, self).setUp()
super().setUp()
self.statsd = FakeStatsdServer(self.io_loop, protocol='udp')
statsd.install(self.application, **{'namespace': self.namespace,
@@ -241,9 +221,9 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
def tearDown(self):
self.statsd.close()
super(UDPStatsdMetricCollectionTests, self).tearDown()
super().tearDown()
@patch.object(socket.socket, 'sendto')
@mock.patch.object(socket.socket, 'sendto')
def test_expected_counters_data_written(self, mock_sock):
path = ('foo', 'bar')
value = 500
@@ -258,7 +238,7 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
expected.encode(),
(self.statsd.sockaddr[0], self.statsd.sockaddr[1]))
@patch.object(socket.socket, 'sendto')
@mock.patch.object(socket.socket, 'sendto')
def test_expected_timers_data_written(self, mock_sock):
path = ('foo', 'bar')
value = 500
@@ -336,7 +316,7 @@ class UDPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
self.application = None
self.namespace = 'testing'
super(UDPStatsdConfigurationTests, self).setUp()
super().setUp()
self.statsd = FakeStatsdServer(self.io_loop, protocol='udp')
statsd.install(self.application, **{'namespace': self.namespace,
@@ -347,7 +327,7 @@ class UDPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
def tearDown(self):
self.statsd.close()
super(UDPStatsdConfigurationTests, self).tearDown()
super().tearDown()
def test_that_http_method_call_is_recorded(self):
response = self.fetch('/')
@@ -389,172 +369,3 @@ class StatsdInstallationTests(unittest.TestCase):
statsd.install(self.application, **{'namespace': 'testing'})
self.assertEqual(self.application.statsd._host, '127.0.0.1')
self.assertEqual(self.application.statsd._port, 8125)
class InfluxDbTests(testing.AsyncHTTPTestCase):
def get_app(self):
self.application = web.Application([
web.url(r'/', examples.influxdb.SimpleHandler),
web.url(r'/write', FakeInfluxHandler),
])
influxdb.install(self.application, **{'database': 'requests',
'submission_interval': 1,
'url': self.get_url('/write')})
self.application.influx_db = {}
return self.application
def setUp(self):
self.application = None
super(InfluxDbTests, self).setUp()
self.application.settings[influxdb.SETTINGS_KEY] = {
'measurement': 'my-service'
}
logging.getLogger(FakeInfluxHandler.__module__).setLevel(logging.DEBUG)
@gen.coroutine
def tearDown(self):
yield influxdb.shutdown(self.application)
super(InfluxDbTests, self).tearDown()
@property
def influx_messages(self):
return FakeInfluxHandler.get_messages(self.application, self)
def test_that_http_method_call_details_are_recorded(self):
start = int(time.time())
response = self.fetch('/')
self.assertEqual(response.code, 204)
for key, fields, timestamp, _headers in self.influx_messages:
if key.startswith('my-service,'):
tag_dict = dict(a.split('=') for a in key.split(',')[1:])
self.assertEqual(tag_dict['handler'],
'examples.influxdb.SimpleHandler')
self.assertEqual(tag_dict['method'], 'GET')
self.assertEqual(tag_dict['hostname'], socket.gethostname())
self.assertEqual(tag_dict['status_code'], '204')
value_dict = dict(a.split('=') for a in fields.split(','))
self.assertIn('duration', value_dict)
self.assertTrue(float(value_dict['duration']) > 0)
nanos_since_epoch = int(timestamp)
then = nanos_since_epoch / 1000000000
assert_between(start, then, time.time())
break
else:
self.fail('Expected to find "request" metric in {!r}'.format(
list(self.application.influx_db['requests'])))
def test_that_execution_timer_is_tracked(self):
response = self.fetch('/')
self.assertEqual(response.code, 204)
for key, fields, timestamp, _headers in self.influx_messages:
if key.startswith('my-service,'):
value_dict = dict(a.split('=') for a in fields.split(','))
assert_between(0.25, float(value_dict['sleepytime']), 0.3)
break
else:
self.fail('Expected to find "request" metric in {!r}'.format(
list(self.application.influx_db['requests'])))
def test_that_counter_is_tracked(self):
response = self.fetch('/')
self.assertEqual(response.code, 204)
for key, fields, timestamp, _headers in self.influx_messages:
if key.startswith('my-service,'):
value_dict = dict(a.split('=') for a in fields.split(','))
self.assertEqual(int(value_dict['slept']), 42)
break
else:
self.fail('Expected to find "request" metric in {!r}'.format(
list(self.application.influx_db['requests'])))
def test_that_cached_db_connection_is_used(self):
cfg = self.application.settings[influxdb.SETTINGS_KEY]
conn = mock.Mock()
cfg['db_connection'] = conn
response = self.fetch('/')
self.assertEqual(response.code, 204)
self.assertIs(cfg['db_connection'], conn)
def test_that_metric_tag_is_tracked(self):
cid = str(uuid.uuid4())
response = self.fetch('/', headers={'Correlation-ID': cid})
self.assertEqual(response.code, 204)
for key, fields, timestamp, _headers in self.influx_messages:
if key.startswith('my-service,'):
tag_dict = dict(a.split('=') for a in key.split(',')[1:])
self.assertEqual(tag_dict['correlation_id'], cid)
break
else:
self.fail('Expected to find "request" metric in {!r}'.format(
list(self.application.influx_db['requests'])))
def test_metrics_with_buffer_not_flush(self):
self.application.settings[influxdb] = {
'measurement': 'my-service'
}
# 2 requests
response = self.fetch('/')
self.assertEqual(response.code, 204)
response = self.fetch('/')
self.assertEqual(response.code, 204)
with self.assertRaises(AssertionError):
self.assertEqual(0, len(self.influx_messages))
class InfluxDbAuthTests(testing.AsyncHTTPTestCase):
def setUp(self):
self.application = None
self.username, self.password = str(uuid.uuid4()), str(uuid.uuid4())
os.environ['INFLUX_USER'] = self.username
os.environ['INFLUX_PASSWORD'] = self.password
super(InfluxDbAuthTests, self).setUp()
self.application.settings[influxdb.SETTINGS_KEY] = {
'measurement': 'my-service'
}
logging.getLogger(FakeInfluxHandler.__module__).setLevel(logging.DEBUG)
@gen.coroutine
def tearDown(self):
yield influxdb.shutdown(self.application)
super(InfluxDbAuthTests, self).tearDown()
@property
def influx_messages(self):
return FakeInfluxHandler.get_messages(self.application, self)
def get_app(self):
self.application = web.Application([
web.url(r'/', examples.influxdb.SimpleHandler),
web.url(r'/write', FakeInfluxHandler),
])
influxdb.install(self.application, **{'database': 'requests',
'submission_interval': 1,
'url': self.get_url('/write')})
self.application.influx_db = {}
return self.application
def test_that_authentication_header_was_sent(self):
print(os.environ)
response = self.fetch('/')
self.assertEqual(response.code, 204)
for _key, _fields, _timestamp, headers in self.influx_messages:
self.assertIn('Authorization', headers)
scheme, value = headers['Authorization'].split(' ')
self.assertEqual(scheme, 'Basic')
temp = base64.b64decode(value.encode('utf-8'))
values = temp.decode('utf-8').split(':')
self.assertEqual(values[0], self.username)
self.assertEqual(values[1], self.password)
break
else:
self.fail('Did not have an Authorization header')

tox.ini

@@ -1,10 +1,8 @@
[tox]
envlist = py27,py34,py35,py36,py37,pypy2,pypy3
indexserver =
default = https://pypi.python.org/simple
envlist = py37
toxworkdir = build/tox
skip_missing_interpreters = True
[testenv]
deps = -rrequires/testing.txt
commands = nosetests []
deps = -r requires/testing.txt
commands = nosetests