mirror of
https://github.com/sprockets/sprockets.mixins.metrics.git
synced 2024-11-21 19:28:34 +00:00
Use native async
Also remove a test that isn't that useful
This commit is contained in:
parent
8cc566acfd
commit
ad19d64911
5 changed files with 18 additions and 32 deletions
|
@ -26,7 +26,7 @@ call to the ``get`` method as well as a separate metric for the database query.
|
||||||
|
|
||||||
from sprockets.mixins import mediatype
|
from sprockets.mixins import mediatype
|
||||||
from sprockets.mixins.metrics import statsd
|
from sprockets.mixins.metrics import statsd
|
||||||
from tornado import gen, web
|
from tornado import web
|
||||||
import queries
|
import queries
|
||||||
|
|
||||||
def make_application():
|
def make_application():
|
||||||
|
@ -47,10 +47,9 @@ call to the ``get`` method as well as a separate metric for the database query.
|
||||||
super(MyHandler, self).initialize()
|
super(MyHandler, self).initialize()
|
||||||
self.db = queries.TornadoSession(os.environ['MY_PGSQL_DSN'])
|
self.db = queries.TornadoSession(os.environ['MY_PGSQL_DSN'])
|
||||||
|
|
||||||
@gen.coroutine
|
async def get(self, obj_id):
|
||||||
def get(self, obj_id):
|
|
||||||
with self.execution_timer('dbquery', 'get'):
|
with self.execution_timer('dbquery', 'get'):
|
||||||
result = yield self.db.query('SELECT * FROM foo WHERE id=%s',
|
result = await self.db.query('SELECT * FROM foo WHERE id=%s',
|
||||||
obj_id)
|
obj_id)
|
||||||
self.send_response(result)
|
self.send_response(result)
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
|
import asyncio
|
||||||
import signal
|
import signal
|
||||||
|
|
||||||
from sprockets.mixins.metrics import statsd
|
from sprockets.mixins.metrics import statsd
|
||||||
from tornado import concurrent, gen, ioloop, web
|
from tornado import ioloop, web
|
||||||
|
|
||||||
|
|
||||||
class SimpleHandler(statsd.StatsdMixin, web.RequestHandler):
|
class SimpleHandler(statsd.StatsdMixin, web.RequestHandler):
|
||||||
|
@ -14,9 +15,8 @@ class SimpleHandler(statsd.StatsdMixin, web.RequestHandler):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@gen.coroutine
|
async def get(self):
|
||||||
def get(self):
|
await asyncio.sleep(0.25)
|
||||||
yield gen.sleep(0.25)
|
|
||||||
self.set_status(204)
|
self.set_status(204)
|
||||||
self.finish()
|
self.finish()
|
||||||
|
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
|
import asyncio
|
||||||
import contextlib
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import socket
|
import socket
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from tornado import gen, iostream
|
from tornado import iostream
|
||||||
|
|
||||||
LOGGER = logging.getLogger(__name__)
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -134,12 +135,11 @@ class StatsDCollector:
|
||||||
sock.set_close_callback(self._tcp_on_closed)
|
sock.set_close_callback(self._tcp_on_closed)
|
||||||
return sock
|
return sock
|
||||||
|
|
||||||
@gen.engine
|
async def _tcp_on_closed(self):
|
||||||
def _tcp_on_closed(self):
|
|
||||||
"""Invoked when the socket is closed."""
|
"""Invoked when the socket is closed."""
|
||||||
LOGGER.warning('Not connected to statsd, connecting in %s seconds',
|
LOGGER.warning('Not connected to statsd, connecting in %s seconds',
|
||||||
self._tcp_reconnect_sleep)
|
self._tcp_reconnect_sleep)
|
||||||
yield gen.sleep(self._tcp_reconnect_sleep)
|
await asyncio.sleep(self._tcp_reconnect_sleep)
|
||||||
self._sock = self._tcp_socket()
|
self._sock = self._tcp_socket()
|
||||||
|
|
||||||
def _tcp_on_connected(self):
|
def _tcp_on_connected(self):
|
||||||
|
|
|
@ -2,7 +2,7 @@ import logging
|
||||||
import re
|
import re
|
||||||
import socket
|
import socket
|
||||||
|
|
||||||
from tornado import gen, iostream, locks, tcpserver, testing
|
from tornado import iostream, locks, tcpserver, testing
|
||||||
|
|
||||||
LOGGER = logging.getLogger(__name__)
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -67,11 +67,10 @@ class FakeStatsdServer(tcpserver.TCPServer):
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
self.socket = None
|
self.socket = None
|
||||||
|
|
||||||
@gen.coroutine
|
async def handle_stream(self, stream, address):
|
||||||
def handle_stream(self, stream, address):
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
result = yield stream.read_until_regex(self.TCP_PATTERN)
|
result = await stream.read_until_regex(self.TCP_PATTERN)
|
||||||
except iostream.StreamClosedError:
|
except iostream.StreamClosedError:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
|
|
20
tests.py
20
tests.py
|
@ -1,9 +1,10 @@
|
||||||
|
import asyncio
|
||||||
import itertools
|
import itertools
|
||||||
import socket
|
import socket
|
||||||
import unittest
|
import unittest
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from tornado import gen, iostream, testing, web
|
from tornado import iostream, testing, web
|
||||||
|
|
||||||
from sprockets.mixins.metrics import statsd
|
from sprockets.mixins.metrics import statsd
|
||||||
from sprockets.mixins.metrics.testing import FakeStatsdServer
|
from sprockets.mixins.metrics.testing import FakeStatsdServer
|
||||||
|
@ -12,10 +13,9 @@ import examples.statsd
|
||||||
|
|
||||||
class CounterBumper(statsd.StatsdMixin, web.RequestHandler):
|
class CounterBumper(statsd.StatsdMixin, web.RequestHandler):
|
||||||
|
|
||||||
@gen.coroutine
|
async def get(self, counter, value):
|
||||||
def get(self, counter, value):
|
|
||||||
with self.execution_timer(*counter.split('.')):
|
with self.execution_timer(*counter.split('.')):
|
||||||
yield gen.sleep(float(value))
|
await asyncio.sleep(float(value))
|
||||||
self.set_status(204)
|
self.set_status(204)
|
||||||
self.finish()
|
self.finish()
|
||||||
|
|
||||||
|
@ -75,18 +75,6 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
|
||||||
'protocol': 'tcp',
|
'protocol': 'tcp',
|
||||||
'prepend_metric_type': True})
|
'prepend_metric_type': True})
|
||||||
|
|
||||||
def test_tcp_reconnect_on_stream_close(self):
|
|
||||||
path_sleep = 'tornado.gen.sleep'
|
|
||||||
path_statsd = self.application.statsd
|
|
||||||
with mock.patch(path_sleep) as gen_sleep, \
|
|
||||||
mock.patch.object(path_statsd, '_tcp_socket') as mock_tcp_socket:
|
|
||||||
f = web.Future()
|
|
||||||
f.set_result(None)
|
|
||||||
gen_sleep.return_value = f
|
|
||||||
|
|
||||||
self.application.statsd._tcp_on_closed()
|
|
||||||
mock_tcp_socket.assert_called_once_with()
|
|
||||||
|
|
||||||
@mock.patch.object(iostream.IOStream, 'write')
|
@mock.patch.object(iostream.IOStream, 'write')
|
||||||
def test_write_not_executed_when_connection_is_closed(self, mock_write):
|
def test_write_not_executed_when_connection_is_closed(self, mock_write):
|
||||||
self.application.statsd._sock.close()
|
self.application.statsd._sock.close()
|
||||||
|
|
Loading…
Reference in a new issue