Added wait parameter to disconnect.

If wait=True, then the disconnect call will block until
the send queue has emptied.

WARNING: Using wait=True when more stanzas are being added to the
queue than can be processed such that the queue is never empty
will cause the disconnect call to block indefinitely without actually
disconnecting.
This commit is contained in:
Lance Stout 2011-07-04 18:47:57 -07:00
parent cccccdcc0a
commit 5efb170e1d

View file

@ -356,22 +356,34 @@ class XMLStream(object):
self.reconnect_delay = delay self.reconnect_delay = delay
return False return False
def disconnect(self, reconnect=False): def disconnect(self, reconnect=False, wait=False):
""" """
Terminate processing and close the XML streams. Terminate processing and close the XML streams.
Optionally, the connection may be reconnected and Optionally, the connection may be reconnected and
resume processing afterwards. resume processing afterwards.
If the disconnect should take place after all items
in the send queue have been sent, use wait=True. However,
take note: If you are constantly adding items to the queue
such that it is never empty, then the disconnect will
not occur and the call will continue to block.
Arguments: Arguments:
reconnect -- Flag indicating if the connection reconnect -- Flag indicating if the connection
and processing should be restarted. and processing should be restarted.
Defaults to False. Defaults to False.
wait -- Flag indicating if the send queue should
be emptied before disconnecting.
""" """
self.state.transition('connected', 'disconnected', wait=0.0, self.state.transition('connected', 'disconnected', wait=0.0,
func=self._disconnect, args=(reconnect,)) func=self._disconnect, args=(reconnect, wait))
def _disconnect(self, reconnect=False, wait=False):
# Wait for the send queue to empty.
if wait:
self.send_queue.join()
def _disconnect(self, reconnect=False):
# Send the end of stream marker. # Send the end of stream marker.
self.send_raw(self.stream_footer, now=True) self.send_raw(self.stream_footer, now=True)
self.session_started_event.clear() self.session_started_event.clear()
@ -1036,6 +1048,7 @@ class XMLStream(object):
log.debug("SEND: %s" % data) log.debug("SEND: %s" % data)
try: try:
self.socket.send(data.encode('utf-8')) self.socket.send(data.encode('utf-8'))
self.send_queue.task_done()
except Socket.error as serr: except Socket.error as serr:
self.event('socket_error', serr) self.event('socket_error', serr)
log.warning("Failed to send %s" % data) log.warning("Failed to send %s" % data)