diff --git a/sprockets_statsd/statsd.py b/sprockets_statsd/statsd.py index 11e0223..933f23b 100644 --- a/sprockets_statsd/statsd.py +++ b/sprockets_statsd/statsd.py @@ -155,7 +155,8 @@ class Processor(asyncio.Protocol): while not self.should_terminate: try: await self._connect_if_necessary() - await self._process_metric() + if self.connected.is_set(): + await self._process_metric() except asyncio.CancelledError: self.logger.info('task cancelled, exiting') break @@ -221,14 +222,11 @@ class Processor(asyncio.Protocol): async def _process_metric(self): processing_failed_send = False - if self._failed_sends: - self.logger.debug('using previous send attempt') - metric = self._failed_sends[0] - processing_failed_send = True - else: + if not self._failed_sends: try: metric = await asyncio.wait_for(self.queue.get(), 0.1) self.logger.debug('received %r from queue', metric) + self.queue.task_done() except asyncio.TimeoutError: return else: @@ -240,15 +238,17 @@ class Processor(asyncio.Protocol): self.logger.debug('preventing send on closed transport') self._failed_sends.append(metric) return + else: + self.logger.debug('using previous send attempt') + metric = self._failed_sends[0] + processing_failed_send = True self.transport.write(metric) - if self.transport.is_closing(): - # Writing to a transport does not raise exceptions, it - # will close the transport if a low-level error occurs. - self.logger.debug('transport closed by writing') - else: + if not self.transport.is_closing(): self.logger.debug('sent %r to statsd', metric) if processing_failed_send: self._failed_sends.pop(0) - else: - self.queue.task_done() + else: + # Writing to a transport does not raise exceptions, it + # will close the transport if a low-level error occurs. + self.logger.debug('transport closed by writing')