Allow sending stanzas on session_end.

May set self.disconnect_wait=True so that all disconnect
calls wait for the send queue to empty, unless explicitly
overridden with wait=False.

The session_end now fires before closing the socket so
that final stanzas may be sent, such as unavailable presences
for components.
This commit is contained in:
Lance Stout 2011-12-09 23:56:39 -08:00
parent 65dbddb6b6
commit efe1b9f5a9

View file

@ -276,6 +276,13 @@ class XMLStream(object):
#: the stream will be restarted in the event of an error. #: the stream will be restarted in the event of an error.
self.auto_reconnect = True self.auto_reconnect = True
#: The :attr:`disconnect_wait` setting is the default value
#: for controlling if the system waits for the send queue to
#: empty before ending the stream. This may be overridden by
#: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`.
#: The default :attr:`disconnect_wait` value is ``False``.
self.disconnect_wait = False
#: A list of DNS results that have not yet been tried. #: A list of DNS results that have not yet been tried.
self.dns_answers = [] self.dns_answers = []
@ -402,6 +409,7 @@ class XMLStream(object):
try: try:
while elapsed < delay and not self.stop.is_set(): while elapsed < delay and not self.stop.is_set():
time.sleep(0.1) time.sleep(0.1)
elapsed += 0.1
except KeyboardInterrupt: except KeyboardInterrupt:
self.stop.set() self.stop.set()
return False return False
@ -519,7 +527,7 @@ class XMLStream(object):
self.session_timeout, self.session_timeout,
_handle_session_timeout) _handle_session_timeout)
def disconnect(self, reconnect=False, wait=False): def disconnect(self, reconnect=False, wait=None):
"""Terminate processing and close the XML streams. """Terminate processing and close the XML streams.
Optionally, the connection may be reconnected and Optionally, the connection may be reconnected and
@ -538,15 +546,21 @@ class XMLStream(object):
and processing should be restarted. and processing should be restarted.
Defaults to ``False``. Defaults to ``False``.
:param wait: Flag indicating if the send queue should :param wait: Flag indicating if the send queue should
be emptied before disconnecting. be emptied before disconnecting, overriding
:attr:`disconnect_wait`.
""" """
self.state.transition('connected', 'disconnected', self.state.transition('connected', 'disconnected',
func=self._disconnect, args=(reconnect, wait)) func=self._disconnect, args=(reconnect, wait))
def _disconnect(self, reconnect=False, wait=False): def _disconnect(self, reconnect=False, wait=None):
self.event('session_end', direct=True)
# Wait for the send queue to empty. # Wait for the send queue to empty.
if wait is not None:
if wait: if wait:
self.send_queue.join() self.send_queue.join()
elif self.disconnect_wait:
self.send_queue.join()
# Send the end of stream marker. # Send the end of stream marker.
self.send_raw(self.stream_footer, now=True) self.send_raw(self.stream_footer, now=True)
@ -566,7 +580,6 @@ class XMLStream(object):
self.event('socket_error', serr) self.event('socket_error', serr)
finally: finally:
#clear your application state #clear your application state
self.event('session_end', direct=True)
self.event("disconnected", direct=True) self.event("disconnected", direct=True)
return True return True
@ -1119,6 +1132,7 @@ class XMLStream(object):
# Additional passes will be made only if an error occurs and # Additional passes will be made only if an error occurs and
# reconnecting is permitted. # reconnecting is permitted.
while True: while True:
shutdown = False
try: try:
# The call to self.__read_xml will block and prevent # The call to self.__read_xml will block and prevent
# the body of the loop from running until a disconnect # the body of the loop from running until a disconnect
@ -1136,16 +1150,16 @@ class XMLStream(object):
if not self.__read_xml(): if not self.__read_xml():
# If the server terminated the stream, end processing # If the server terminated the stream, end processing
break break
except KeyboardInterrupt:
log.debug("Keyboard Escape Detected in _process")
self.event('killed', direct=True)
shutdown = True
except SystemExit:
log.debug("SystemExit in _process")
shutdown = True
except SyntaxError as e: except SyntaxError as e:
log.error("Error reading from XML stream.") log.error("Error reading from XML stream.")
self.exception(e) self.exception(e)
except KeyboardInterrupt:
log.debug("Keyboard Escape Detected in _process")
self.stop.set()
except SystemExit:
log.debug("SystemExit in _process")
self.stop.set()
self.scheduler.quit()
except Socket.error as serr: except Socket.error as serr:
self.event('socket_error', serr) self.event('socket_error', serr)
log.exception('Socket Error') log.exception('Socket Error')
@ -1154,7 +1168,8 @@ class XMLStream(object):
log.exception('Connection error.') log.exception('Connection error.')
self.exception(e) self.exception(e)
if not self.stop.is_set() and self.auto_reconnect: if not shutdown and not self.stop.is_set() \
and self.auto_reconnect:
self.reconnect() self.reconnect()
else: else:
self.disconnect() self.disconnect()