From 384e1a92b716250c168f5dedc1f9693111f81423 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 27 May 2011 11:01:30 -0700 Subject: [PATCH 1/6] Added support for testind disconnect errors. --- sleekxmpp/test/livesocket.py | 12 ++++++++++++ sleekxmpp/test/mocksocket.py | 14 ++++++++++++++ sleekxmpp/test/sleektest.py | 7 +++++++ sleekxmpp/xmlstream/xmlstream.py | 3 ++- tests/test_stream.py | 21 ++++++++++++++++++++- 5 files changed, 55 insertions(+), 2 deletions(-) diff --git a/sleekxmpp/test/livesocket.py b/sleekxmpp/test/livesocket.py index 3e0f213..7dd4693 100644 --- a/sleekxmpp/test/livesocket.py +++ b/sleekxmpp/test/livesocket.py @@ -58,6 +58,18 @@ class TestLiveSocket(object): # ------------------------------------------------------------------ # Testing Interface + def disconnect_errror(self): + """ + Used to simulate a socket disconnection error. + + Not used by live sockets. + """ + try: + self.socket.shutdown() + self.socket.close() + except: + pass + def next_sent(self, timeout=None): """ Get the next stanza that has been sent. diff --git a/sleekxmpp/test/mocksocket.py b/sleekxmpp/test/mocksocket.py index e3ddd70..a2af8d6 100644 --- a/sleekxmpp/test/mocksocket.py +++ b/sleekxmpp/test/mocksocket.py @@ -39,6 +39,7 @@ class TestSocket(object): self.recv_queue = queue.Queue() self.send_queue = queue.Queue() self.is_live = False + self.disconnected = False def __getattr__(self, name): """ @@ -89,6 +90,13 @@ class TestSocket(object): """ self.recv_queue.put(data) + def disconnect_error(self): + """ + Simulate a disconnect error by raising a socket.error exception + for any current or further socket operations. + """ + self.disconnected = True + # ------------------------------------------------------------------ # Socket Interface @@ -99,6 +107,8 @@ class TestSocket(object): Arguments: Placeholders. Same as for socket.Socket.recv. """ + if self.disconnected: + raise socket.error return self.read(block=True) def send(self, data): @@ -108,6 +118,8 @@ class TestSocket(object): Arguments: data -- String value to write. """ + if self.disconnected: + raise socket.error self.send_queue.put(data) # ------------------------------------------------------------------ @@ -132,6 +144,8 @@ class TestSocket(object): timeout -- Time in seconds a block should last before returning None. """ + if self.disconnected: + raise socket.error if timeout is not None: block = True try: diff --git a/sleekxmpp/test/sleektest.py b/sleekxmpp/test/sleektest.py index 24af1e7..8cf7b70 100644 --- a/sleekxmpp/test/sleektest.py +++ b/sleekxmpp/test/sleektest.py @@ -259,6 +259,13 @@ class SleekTest(unittest.TestCase): # ------------------------------------------------------------------ # Methods for simulating stanza streams. + def stream_disconnect(self): + """ + Simulate a stream disconnection. + """ + if self.xmpp: + self.xmpp.socket.disconnect_error() + def stream_start(self, mode='client', skip=True, header=None, socket='mock', jid='tester@localhost', password='test', server='localhost', diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1c16556..468db03 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -978,7 +978,8 @@ class XMLStream(object): log.debug("SEND: %s" % data) try: self.socket.send(data.encode('utf-8')) - except: + except Socket.error as serr: + self.event('socket_error', serr) log.warning("Failed to send %s" % data) self.disconnect(self.auto_reconnect) except KeyboardInterrupt: diff --git a/tests/test_stream.py b/tests/test_stream.py index f91f71f..deac24a 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -1,5 +1,5 @@ +import time from sleekxmpp.test import * -import sleekxmpp.plugins.xep_0033 as xep_0033 class TestStreamTester(SleekTest): @@ -57,4 +57,23 @@ class TestStreamTester(SleekTest): self.stream_start(mode='client', skip=False) self.send_header(sto='localhost') + def testStreamDisconnect(self): + """Test that the test socket can simulate disconnections.""" + self.stream_start() + events = set() + + def stream_error(event): + events.add('socket_error') + + self.xmpp.add_event_handler('socket_error', stream_error) + + self.stream_disconnect() + self.xmpp.send_raw(' ') + + time.sleep(.1) + + self.failUnless('socket_error' in events, + "Stream error event not raised: %s" % events) + + suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamTester) From b81ab979006956134e5d924640936fe8cc20dbf3 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 27 May 2011 14:42:40 -0700 Subject: [PATCH 2/6] Add exponential backoff to connection attempts. Delay will approximately double between attempts (random variation). See issue #67. --- sleekxmpp/xmlstream/xmlstream.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 468db03..6bf70fb 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -17,6 +17,7 @@ import sys import threading import time import types +import random try: import queue except ImportError: @@ -45,6 +46,9 @@ HANDLER_THREADS = 1 # Flag indicating if the SSL library is available for use. SSL_SUPPORT = True +# Maximum time to delay between connection attempts is one hour. +RECONNECT_MAX_DELAY = 3600 + log = logging.getLogger(__name__) @@ -104,7 +108,11 @@ class XMLStream(object): use_ssl -- Flag indicating if SSL should be used. use_tls -- Flag indicating if TLS should be used. stop -- threading Event used to stop all threads. - auto_reconnect-- Flag to determine whether we auto reconnect. + + auto_reconnect -- Flag to determine whether we auto reconnect. + reconnect_max_delay -- Maximum time to delay between connection + attempts. Defaults to RECONNECT_MAX_DELAY, + which is one hour. Methods: add_event_handler -- Add a handler for a custom event. @@ -155,6 +163,7 @@ class XMLStream(object): self.ca_certs = None self.response_timeout = RESPONSE_TIMEOUT + self.reconnect_max_delay = RECONNECT_MAX_DELAY self.state = StateMachine(('disconnected', 'connected')) self.state._set_state('disconnected') @@ -291,9 +300,14 @@ class XMLStream(object): # is established. connected = self.state.transition('disconnected', 'connected', func=self._connect) + delay = 1.0 while reattempt and not connected: connected = self.state.transition('disconnected', 'connected', func=self._connect) + delay = min(delay * 2, self.reconnect_max_delay) + delay = random.normalvariate(delay, delay * 0.1) + log.debug('Waiting %s seconds before reconnecting.' % delay) + time.sleep(delay) return connected def _connect(self): From 6997b2fbf87a080a12334b348653ed4cb30f9218 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 27 May 2011 16:39:45 -0700 Subject: [PATCH 3/6] Fix typo for SSL certificate use. --- sleekxmpp/xmlstream/xmlstream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 6bf70fb..1dc2d43 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -323,7 +323,7 @@ class XMLStream(object): ssl_socket = ssl.wrap_socket(self.socket, ca_certs=self.ca_certs, - certs_reqs=cert_policy) + cert_reqs=cert_policy) if hasattr(self.socket, 'socket'): # We are using a testing socket, so preserve the top From 1735c194cdf83b61850bba45044070db6c42d0ac Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 27 May 2011 16:59:52 -0700 Subject: [PATCH 4/6] Don't use the send queue for stream initialization. Use the parameter now=True to skip the queue when sending Iq stanzas, or using xmpp.send(). --- sleekxmpp/clientxmpp.py | 15 ++++---- sleekxmpp/componentxmpp.py | 1 + sleekxmpp/stanza/iq.py | 11 ++++-- sleekxmpp/test/sleektest.py | 2 + sleekxmpp/xmlstream/stanzabase.py | 12 ++++-- sleekxmpp/xmlstream/xmlstream.py | 62 ++++++++++++++++++++++--------- 6 files changed, 70 insertions(+), 33 deletions(-) diff --git a/sleekxmpp/clientxmpp.py b/sleekxmpp/clientxmpp.py index 92186e9..fb5b208 100644 --- a/sleekxmpp/clientxmpp.py +++ b/sleekxmpp/clientxmpp.py @@ -75,9 +75,6 @@ class ClientXMPP(BaseXMPP): self.plugin_whitelist = plugin_whitelist self.srv_support = SRV_SUPPORT - self.session_started_event = threading.Event() - self.session_started_event.clear() - self.stream_header = "" % ( self.boundjid.host, "xmlns:stream='%s'" % self.stream_ns, @@ -313,7 +310,7 @@ class ClientXMPP(BaseXMPP): self._handle_tls_start, name='TLS Proceed', instream=True) - self.send_xml(xml) + self.send_xml(xml, now=True) return True else: log.warning("The module tlslite is required to log in" +\ @@ -369,11 +366,13 @@ class ClientXMPP(BaseXMPP): self.send("%s" % ( sasl_ns, - auth)) + auth), + now=True) elif 'sasl:ANONYMOUS' in self.features and not self.boundjid.user: self.send("" % ( sasl_ns, - 'ANONYMOUS')) + 'ANONYMOUS'), + now=True) else: log.error("No appropriate login method.") self.disconnect() @@ -416,7 +415,7 @@ class ClientXMPP(BaseXMPP): res.text = self.boundjid.resource xml.append(res) iq.append(xml) - response = iq.send() + response = iq.send(now=True) bind_ns = 'urn:ietf:params:xml:ns:xmpp-bind' self.set_jid(response.xml.find('{%s}bind/{%s}jid' % (bind_ns, @@ -439,7 +438,7 @@ class ClientXMPP(BaseXMPP): """ if self.authenticated and self.bound: iq = self.makeIqSet(xml) - response = iq.send() + response = iq.send(now=True) log.debug("Established Session") self.sessionstarted = True self.session_started_event.set() diff --git a/sleekxmpp/componentxmpp.py b/sleekxmpp/componentxmpp.py index ae58c5f..8c380dd 100644 --- a/sleekxmpp/componentxmpp.py +++ b/sleekxmpp/componentxmpp.py @@ -138,4 +138,5 @@ class ComponentXMPP(BaseXMPP): Arguments: xml -- The reply handshake stanza. """ + self.session_started_event.set() self.event("session_start") diff --git a/sleekxmpp/stanza/iq.py b/sleekxmpp/stanza/iq.py index 82ab13e..4a12a87 100644 --- a/sleekxmpp/stanza/iq.py +++ b/sleekxmpp/stanza/iq.py @@ -154,7 +154,7 @@ class Iq(RootStanza): StanzaBase.reply(self, clear) return self - def send(self, block=True, timeout=None, callback=None): + def send(self, block=True, timeout=None, callback=None, now=False): """ Send an stanza over the XML stream. @@ -178,6 +178,9 @@ class Iq(RootStanza): Defaults to sleekxmpp.xmlstream.RESPONSE_TIMEOUT callback -- Optional reference to a stream handler function. Will be executed when a reply stanza is received. + now -- Indicates if the send queue should be skipped and send + the stanza immediately. Used during stream + initialization. Defaults to False. """ if timeout is None: timeout = self.stream.response_timeout @@ -188,15 +191,15 @@ class Iq(RootStanza): callback, once=True) self.stream.register_handler(handler) - StanzaBase.send(self) + StanzaBase.send(self, now=now) return handler_name elif block and self['type'] in ('get', 'set'): waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id'])) self.stream.register_handler(waitfor) - StanzaBase.send(self) + StanzaBase.send(self, now=now) return waitfor.wait(timeout) else: - return StanzaBase.send(self) + return StanzaBase.send(self, now=now) def _set_stanza_values(self, values): """ diff --git a/sleekxmpp/test/sleektest.py b/sleekxmpp/test/sleektest.py index 8cf7b70..7802a9b 100644 --- a/sleekxmpp/test/sleektest.py +++ b/sleekxmpp/test/sleektest.py @@ -334,6 +334,8 @@ class SleekTest(unittest.TestCase): self.xmpp.process(threaded=True) if skip: if socket != 'live': + # Mark send queue as usable + self.xmpp.session_started_event.set() # Clear startup stanzas self.xmpp.socket.next_sent(timeout=1) if mode == 'component': diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py index b8a7cea..d9a4636 100644 --- a/sleekxmpp/xmlstream/stanzabase.py +++ b/sleekxmpp/xmlstream/stanzabase.py @@ -1253,9 +1253,15 @@ class StanzaBase(ElementBase): log.exception('Error handling {%s}%s stanza' % (self.namespace, self.name)) - def send(self): - """Queue the stanza to be sent on the XML stream.""" - self.stream.sendRaw(self.__str__()) + def send(self, now=False): + """ + Queue the stanza to be sent on the XML stream. + Arguments: + now -- Indicates if the queue should be skipped and the + stanza sent immediately. Useful for stream + initialization. Defaults to False. + """ + self.stream.send_raw(self.__str__(), now=now) def __copy__(self): """ diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1dc2d43..9d00ee8 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -187,6 +187,8 @@ class XMLStream(object): self.stop = threading.Event() self.stream_end_event = threading.Event() self.stream_end_event.set() + self.session_started_event = threading.Event() + self.event_queue = queue.Queue() self.send_queue = queue.Queue() self.scheduler = Scheduler(self.event_queue, self.stop) @@ -364,7 +366,8 @@ class XMLStream(object): def _disconnect(self, reconnect=False): # Send the end of stream marker. - self.send_raw(self.stream_footer) + self.send_raw(self.stream_footer, now=True) + self.session_started_event.clear() # Wait for confirmation that the stream was # closed in the other direction. self.auto_reconnect = reconnect @@ -657,7 +660,7 @@ class XMLStream(object): """ return xml - def send(self, data, mask=None, timeout=None): + def send(self, data, mask=None, timeout=None, now=False): """ A wrapper for send_raw for sending stanza objects. @@ -671,10 +674,13 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - if hasattr(mask, 'xml'): mask = mask.xml data = str(data) @@ -683,21 +689,11 @@ class XMLStream(object): wait_for = Waiter("SendWait_%s" % self.new_id(), MatchXMLMask(mask)) self.register_handler(wait_for) - self.send_raw(data) + self.send_raw(data, now) if mask is not None: return wait_for.wait(timeout) - def send_raw(self, data): - """ - Send raw data across the stream. - - Arguments: - data -- Any string value. - """ - self.send_queue.put(data) - return True - - def send_xml(self, data, mask=None, timeout=None): + def send_xml(self, data, mask=None, timeout=None, now=False): """ Send an XML object on the stream, and optionally wait for a response. @@ -710,10 +706,39 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - return self.send(tostring(data), mask, timeout) + return self.send(tostring(data), mask, timeout, now) + + def send_raw(self, data, now=False, reconnect=None): + """ + Send raw data across the stream. + + Arguments: + data -- Any string value. + reconnect -- Indicates if the stream should be + restarted if there is an error sending + the stanza. Used mainly for testing. + Defaults to self.auto_reconnect. + """ + if now: + log.debug("SEND: %s" % data) + try: + self.socket.send(data.encode('utf-8')) + except Socket.error as serr: + self.event('socket_error', serr) + log.warning("Failed to send %s" % data) + if reconnect is None: + reconnect = self.auto_reconnect + self.disconnect(reconnect) + else: + self.send_queue.put(data) + return True def process(self, threaded=True): """ @@ -767,7 +792,7 @@ class XMLStream(object): firstrun = False try: if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) # The call to self.__read_xml will block and prevent # the body of the loop from running until a disconnect # occurs. After any reconnection, the stream header will @@ -776,7 +801,7 @@ class XMLStream(object): # Ensure the stream header is sent for any # new connections. if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) except KeyboardInterrupt: log.debug("Keyboard Escape Detected in _process") self.stop.set() @@ -985,6 +1010,7 @@ class XMLStream(object): """ try: while not self.stop.isSet(): + self.session_started_event.wait() try: data = self.send_queue.get(True, 1) except queue.Empty: From 8080b4cae2000ccd5be2eaa442b903d1b180273b Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Tue, 31 May 2011 10:23:05 -0700 Subject: [PATCH 5/6] Cleanup logging and exception handling. The syntax and attribute errors raised during a disconnect/reconnect attempt are now caught and produce nicer log messages. --- sleekxmpp/xmlstream/filesocket.py | 2 + sleekxmpp/xmlstream/xmlstream.py | 64 ++++++++++++++++--------------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/sleekxmpp/xmlstream/filesocket.py b/sleekxmpp/xmlstream/filesocket.py index 441ff87..fd81864 100644 --- a/sleekxmpp/xmlstream/filesocket.py +++ b/sleekxmpp/xmlstream/filesocket.py @@ -22,6 +22,8 @@ class FileSocket(_fileobject): def read(self, size=4096): """Read data from the socket as if it were a file.""" + if self._sock is None: + return None data = self._sock.recv(size) if data is not None: return data diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 9d00ee8..121e597 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -727,7 +727,7 @@ class XMLStream(object): Defaults to self.auto_reconnect. """ if now: - log.debug("SEND: %s" % data) + log.debug("SEND (IMMED): %s" % data) try: self.socket.send(data.encode('utf-8')) except Socket.error as serr: @@ -829,35 +829,39 @@ class XMLStream(object): """ depth = 0 root = None - for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')): - if event == b'start': - if depth == 0: - # We have received the start of the root element. - root = xml - # Perform any stream initialization actions, such - # as handshakes. - self.stream_end_event.clear() - self.start_stream_handler(root) - depth += 1 - if event == b'end': - depth -= 1 - if depth == 0: - # The stream's root element has closed, - # terminating the stream. - log.debug("End of stream recieved") - self.stream_end_event.set() - return False - elif depth == 1: - # We only raise events for stanzas that are direct - # children of the root element. - try: - self.__spawn_event(xml) - except RestartStream: - return True - if root: - # Keep the root element empty of children to - # save on memory use. - root.clear() + try: + for (event, xml) in ET.iterparse(self.filesocket, + (b'end', b'start')): + if event == b'start': + if depth == 0: + # We have received the start of the root element. + root = xml + # Perform any stream initialization actions, such + # as handshakes. + self.stream_end_event.clear() + self.start_stream_handler(root) + depth += 1 + if event == b'end': + depth -= 1 + if depth == 0: + # The stream's root element has closed, + # terminating the stream. + log.debug("End of stream recieved") + self.stream_end_event.set() + return False + elif depth == 1: + # We only raise events for stanzas that are direct + # children of the root element. + try: + self.__spawn_event(xml) + except RestartStream: + return True + if root: + # Keep the root element empty of children to + # save on memory use. + root.clear() + except SyntaxError: + log.error("Error reading from XML stream.") log.debug("Ending read XML loop") def _build_stanza(self, xml, default_ns=None): From a81162edd2434756e21d7f9a79d71d770a43db7b Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Tue, 31 May 2011 10:53:14 -0700 Subject: [PATCH 6/6] Apply connection backoff to reconnect attempts. Backoff was only being done for the initial connection attempt before. Now any reconnection will start with a minimum 1 sec delay which will approximately double between attempts. --- sleekxmpp/xmlstream/xmlstream.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 121e597..2d72de5 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -163,6 +163,7 @@ class XMLStream(object): self.ca_certs = None self.response_timeout = RESPONSE_TIMEOUT + self.reconnect_delay = None self.reconnect_max_delay = RECONNECT_MAX_DELAY self.state = StateMachine(('disconnected', 'connected')) @@ -302,20 +303,24 @@ class XMLStream(object): # is established. connected = self.state.transition('disconnected', 'connected', func=self._connect) - delay = 1.0 while reattempt and not connected: connected = self.state.transition('disconnected', 'connected', func=self._connect) - delay = min(delay * 2, self.reconnect_max_delay) - delay = random.normalvariate(delay, delay * 0.1) - log.debug('Waiting %s seconds before reconnecting.' % delay) - time.sleep(delay) return connected def _connect(self): self.stop.clear() self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM) self.socket.settimeout(None) + + if self.reconnect_delay is None: + delay = 1.0 + else: + delay = min(self.reconnect_delay * 2, self.reconnect_max_delay) + delay = random.normalvariate(delay, delay * 0.1) + log.debug('Waiting %s seconds before connecting.' % delay) + time.sleep(delay) + if self.use_ssl and self.ssl_support: log.debug("Socket Wrapped for SSL") if self.ca_certs is None: @@ -340,13 +345,14 @@ class XMLStream(object): self.set_socket(self.socket, ignore=True) #this event is where you should set your application state self.event("connected", direct=True) + self.reconnect_delay = 1.0 return True except Socket.error as serr: error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" self.event('socket_error', serr) log.error(error_msg % (self.address[0], self.address[1], serr.errno, serr.strerror)) - time.sleep(1) + self.reconnect_delay = delay return False def disconnect(self, reconnect=False):