Ensure that reconnection happens properly after connection loss.

Calling reconnect() simultaneously from multiple threads (like when
using XEP-0199 keepalive) could break because the connection state
can transition and break the state expectations in one of the
reconnect() calls.
This commit is contained in:
Lance Stout 2011-11-20 12:18:37 -08:00
parent fba60ffff1
commit 862a2a1440

View file

@ -343,7 +343,7 @@ class XMLStream(object):
# is established. # is established.
connected = self.state.transition('disconnected', 'connected', connected = self.state.transition('disconnected', 'connected',
func=self._connect) func=self._connect)
while reattempt and not connected: while reattempt and not connected and not self.stop.is_set():
connected = self.state.transition('disconnected', 'connected', connected = self.state.transition('disconnected', 'connected',
func=self._connect) func=self._connect)
return connected return connected
@ -439,7 +439,7 @@ class XMLStream(object):
self.socket.connect(address) self.socket.connect(address)
self.send_raw(headers, now=True) self.send_raw(headers, now=True)
resp = '' resp = ''
while '\r\n\r\n' not in resp: while '\r\n\r\n' not in resp and not self.stop.is_set():
resp += self.socket.recv(1024).decode('utf-8') resp += self.socket.recv(1024).decode('utf-8')
log.debug('RECV: %s', resp) log.debug('RECV: %s', resp)
@ -475,7 +475,6 @@ class XMLStream(object):
self.session_timeout, self.session_timeout,
_handle_session_timeout) _handle_session_timeout)
def disconnect(self, reconnect=False, wait=False): def disconnect(self, reconnect=False, wait=False):
""" """
Terminate processing and close the XML streams. Terminate processing and close the XML streams.
@ -496,7 +495,7 @@ class XMLStream(object):
wait -- Flag indicating if the send queue should wait -- Flag indicating if the send queue should
be emptied before disconnecting. be emptied before disconnecting.
""" """
self.state.transition('connected', 'disconnected', wait=0.0, self.state.transition('connected', 'disconnected',
func=self._disconnect, args=(reconnect, wait)) func=self._disconnect, args=(reconnect, wait))
def _disconnect(self, reconnect=False, wait=False): def _disconnect(self, reconnect=False, wait=False):
@ -526,20 +525,22 @@ class XMLStream(object):
self.event("disconnected", direct=True) self.event("disconnected", direct=True)
return True return True
def reconnect(self): def reconnect(self, reattempt=True):
""" """
Reset the stream's state and reconnect to the server. Reset the stream's state and reconnect to the server.
""" """
log.debug("reconnecting...") log.debug("reconnecting...")
self.state.transition('connected', 'disconnected', wait=2.0, if self.state.ensure('connected'):
func=self._disconnect, args=(True,)) self.state.transition('connected', 'disconnected', wait=2.0,
func=self._disconnect, args=(True,))
log.debug("connecting...") log.debug("connecting...")
connected = self.state.transition('disconnected', 'connected', connected = self.state.transition('disconnected', 'connected',
wait=2.0, func=self._connect) wait=2.0, func=self._connect)
while not connected: while reattempt and not connected and not self.stop.is_set():
connected = self.state.transition('disconnected', 'connected', connected = self.state.transition('disconnected', 'connected',
wait=2.0, func=self._connect) wait=2.0, func=self._connect)
connected = connected or self.state.ensure('connected')
return connected return connected
def set_socket(self, socket, ignore=False): def set_socket(self, socket, ignore=False):
@ -1207,7 +1208,10 @@ class XMLStream(object):
unhandled = True unhandled = True
matched_handlers = [h for h in self.__handlers if h.match(stanza)] matched_handlers = [h for h in self.__handlers if h.match(stanza)]
for handler in matched_handlers: for handler in matched_handlers:
stanza_copy = copy.copy(stanza) if len(matched_handlers) > 1 else stanza if len(matched_handlers) > 1:
stanza_copy = copy.copy(stanza)
else:
stanza_copy = stanza
handler.prerun(stanza_copy) handler.prerun(stanza_copy)
self.event_queue.put(('stanza', handler, stanza_copy)) self.event_queue.put(('stanza', handler, stanza_copy))
try: try: