Combine TCP & UDP server code into 1 class to prevent breaking changes

Updated tests to use the single class
This commit is contained in:
Dan g 2018-07-19 16:03:56 -04:00
parent df76061eec
commit ce352daf0e
2 changed files with 41 additions and 68 deletions

View file

@ -7,71 +7,12 @@ from tornado import gen, iostream, locks, tcpserver, testing, web
LOGGER = logging.getLogger(__name__)
class FakeTCPStatsdServer(tcpserver.TCPServer):
PATTERN = br'(?P<path>[^:]*):(?P<value>[^|]*)\|(?P<type>.*)$'
def __init__(self, iol, ssl_options=None, max_buffer_size=None,
read_chunk_size=None):
self.event = locks.Event()
self.datagrams = []
self.reconnect_receive = False
super(FakeTCPStatsdServer, self).__init__(
ssl_options, max_buffer_size, read_chunk_size)
self.sock, self.port = testing.bind_unused_port()
self.add_socket(self.sock)
self.sockaddr = self.sock.getsockname()
@gen.coroutine
def handle_stream(self, stream, address):
while True:
try:
result = yield stream.read_until_regex(self.PATTERN)
except iostream.StreamClosedError:
break
else:
self.event.set()
self.datagrams.append(result)
if b'reconnect' in result:
self.reconnect_receive = True
stream.close()
return
def find_metrics(self, prefix, metric_type):
"""
Yields captured datagrams that start with `prefix`.
:param str prefix: the metric prefix to search for
:param str metric_type: the statsd metric type (e.g., 'ms', 'c')
:returns: yields (path, value, metric_type) tuples for each
captured metric that matches
:raises AssertionError: if no metrics match.
"""
pattern = re.compile(
'(?P<path>{}[^:]*):(?P<value>[^|]*)\\|(?P<type>{})'.format(
re.escape(prefix), re.escape(metric_type)))
matched = False
for datagram in self.datagrams:
text_msg = datagram.decode('ascii')
match = pattern.match(text_msg)
if match:
yield match.groups()
matched = True
if not matched:
raise AssertionError(
'Expected metric starting with "{}" in {!r}'.format(
prefix, self.datagrams))
class FakeUDPStatsdServer(object):
class FakeStatsdServer(tcpserver.TCPServer):
"""
Implements something resembling a statsd server.
:param tornado.ioloop.IOLoop iol: the loop to attach to
:param bool tcp: whether the server implements TCP or UDP
Create an instance of this class in your asynchronous test case
attached to the IOLoop and configure your application to send
@ -89,12 +30,29 @@ class FakeUDPStatsdServer(object):
"""
def __init__(self, iol):
PATTERN = br'(?P<path>[^:]*):(?P<value>[^|]*)\|(?P<type>.*)$'
def __init__(self, iol, tcp=False):
self.datagrams = []
if tcp is False:
self.udp_server(iol)
else:
self.tcp_server()
def tcp_server(self):
self.event = locks.Event()
super(FakeStatsdServer, self).__init__()
sock, port = testing.bind_unused_port()
self.add_socket(sock)
self.sockaddr = sock.getsockname()
def udp_server(self, iol):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.IPPROTO_UDP)
self.socket.bind(('127.0.0.1', 0))
self.sockaddr = self.socket.getsockname()
self.datagrams = []
iol.add_handler(self.socket, self._handle_events, iol.READ)
self._iol = iol
@ -107,6 +65,21 @@ class FakeUDPStatsdServer(object):
self.socket.close()
self.socket = None
@gen.coroutine
def handle_stream(self, stream, address):
while True:
try:
result = yield stream.read_until_regex(self.PATTERN)
except iostream.StreamClosedError:
break
else:
self.event.set()
self.datagrams.append(result)
if b'reconnect' in result:
self.reconnect_receive = True
stream.close()
return
def _handle_events(self, fd, events):
if fd != self.socket:
return

View file

@ -11,7 +11,7 @@ import mock
from sprockets.mixins.metrics import influxdb, statsd
from sprockets.mixins.metrics.testing import (
FakeInfluxHandler, FakeUDPStatsdServer, FakeTCPStatsdServer)
FakeInfluxHandler, FakeStatsdServer)
import examples.influxdb
import examples.statsd
@ -55,7 +55,7 @@ class TCPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
def setUp(self):
self.application = None
super(TCPStatsdMetricCollectionTests, self).setUp()
self.statsd = FakeTCPStatsdServer(self.io_loop)
self.statsd = FakeStatsdServer(self.io_loop, tcp=True)
statsd.install(self.application, **{'namespace': 'testing',
'host': self.statsd.sockaddr[0],
@ -121,7 +121,7 @@ class TCPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
def setUp(self):
self.application = None
super(TCPStatsdConfigurationTests, self).setUp()
self.statsd = FakeTCPStatsdServer(self.io_loop)
self.statsd = FakeStatsdServer(self.io_loop, tcp=True)
statsd.install(self.application, **{'namespace': 'testing',
'host': self.statsd.sockaddr[0],
@ -159,7 +159,7 @@ class UDPStatsdMetricCollectionTests(testing.AsyncHTTPTestCase):
def setUp(self):
self.application = None
super(UDPStatsdMetricCollectionTests, self).setUp()
self.statsd = FakeUDPStatsdServer(self.io_loop)
self.statsd = FakeStatsdServer(self.io_loop, tcp=False)
statsd.install(self.application, **{'namespace': 'testing',
'host': self.statsd.sockaddr[0],
@ -229,7 +229,7 @@ class UDPStatsdConfigurationTests(testing.AsyncHTTPTestCase):
def setUp(self):
self.application = None
super(UDPStatsdConfigurationTests, self).setUp()
self.statsd = FakeUDPStatsdServer(self.io_loop)
self.statsd = FakeStatsdServer(self.io_loop, tcp=False)
statsd.install(self.application, **{'namespace': 'testing',
'host': self.statsd.sockaddr[0],