diff --git a/sleekxmpp/clientxmpp.py b/sleekxmpp/clientxmpp.py index a93afe5..3a5f41b 100644 --- a/sleekxmpp/clientxmpp.py +++ b/sleekxmpp/clientxmpp.py @@ -77,9 +77,6 @@ class ClientXMPP(BaseXMPP): self.plugin_whitelist = plugin_whitelist self.srv_support = SRV_SUPPORT - self.session_started_event = threading.Event() - self.session_started_event.clear() - self.stream_header = "" % ( self.boundjid.host, "xmlns:stream='%s'" % self.stream_ns, @@ -369,7 +366,7 @@ class ClientXMPP(BaseXMPP): MatchXPath(tls.Proceed.tag_name()), tls_proceed, instream=True)) - self.send(features['starttls']) + self.send(features['starttls'], now=True) return True else: log.warning("The module tlslite is required to log in" +\ @@ -446,8 +443,7 @@ class ClientXMPP(BaseXMPP): resp = sasl.Auth(xmpp) resp['mechanism'] = 'PLAIN' resp['value'] = auth - resp.send() - + resp.send(now=True) return True def _handle_sasl_anonymous(self, xmpp): @@ -479,7 +475,7 @@ class ClientXMPP(BaseXMPP): iq.enable('bind') if self.boundjid.resource: iq['bind']['resource'] = self.boundjid.resource - response = iq.send() + response = iq.send(now=True) self.set_jid(response['bind']['jid']) self.bound = True @@ -502,7 +498,7 @@ class ClientXMPP(BaseXMPP): iq = self.Iq() iq['type'] = 'set' iq.enable('session') - response = iq.send() + response = iq.send(now=True) log.debug("Established Session") self.sessionstarted = True diff --git a/sleekxmpp/componentxmpp.py b/sleekxmpp/componentxmpp.py index ae58c5f..8c380dd 100644 --- a/sleekxmpp/componentxmpp.py +++ b/sleekxmpp/componentxmpp.py @@ -138,4 +138,5 @@ class ComponentXMPP(BaseXMPP): Arguments: xml -- The reply handshake stanza. """ + self.session_started_event.set() self.event("session_start") diff --git a/sleekxmpp/stanza/iq.py b/sleekxmpp/stanza/iq.py index 82ab13e..4a12a87 100644 --- a/sleekxmpp/stanza/iq.py +++ b/sleekxmpp/stanza/iq.py @@ -154,7 +154,7 @@ class Iq(RootStanza): StanzaBase.reply(self, clear) return self - def send(self, block=True, timeout=None, callback=None): + def send(self, block=True, timeout=None, callback=None, now=False): """ Send an stanza over the XML stream. @@ -178,6 +178,9 @@ class Iq(RootStanza): Defaults to sleekxmpp.xmlstream.RESPONSE_TIMEOUT callback -- Optional reference to a stream handler function. Will be executed when a reply stanza is received. + now -- Indicates if the send queue should be skipped and send + the stanza immediately. Used during stream + initialization. Defaults to False. """ if timeout is None: timeout = self.stream.response_timeout @@ -188,15 +191,15 @@ class Iq(RootStanza): callback, once=True) self.stream.register_handler(handler) - StanzaBase.send(self) + StanzaBase.send(self, now=now) return handler_name elif block and self['type'] in ('get', 'set'): waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id'])) self.stream.register_handler(waitfor) - StanzaBase.send(self) + StanzaBase.send(self, now=now) return waitfor.wait(timeout) else: - return StanzaBase.send(self) + return StanzaBase.send(self, now=now) def _set_stanza_values(self, values): """ diff --git a/sleekxmpp/test/livesocket.py b/sleekxmpp/test/livesocket.py index 3e0f213..7dd4693 100644 --- a/sleekxmpp/test/livesocket.py +++ b/sleekxmpp/test/livesocket.py @@ -58,6 +58,18 @@ class TestLiveSocket(object): # ------------------------------------------------------------------ # Testing Interface + def disconnect_errror(self): + """ + Used to simulate a socket disconnection error. + + Not used by live sockets. + """ + try: + self.socket.shutdown() + self.socket.close() + except: + pass + def next_sent(self, timeout=None): """ Get the next stanza that has been sent. diff --git a/sleekxmpp/test/mocksocket.py b/sleekxmpp/test/mocksocket.py index e3ddd70..a2af8d6 100644 --- a/sleekxmpp/test/mocksocket.py +++ b/sleekxmpp/test/mocksocket.py @@ -39,6 +39,7 @@ class TestSocket(object): self.recv_queue = queue.Queue() self.send_queue = queue.Queue() self.is_live = False + self.disconnected = False def __getattr__(self, name): """ @@ -89,6 +90,13 @@ class TestSocket(object): """ self.recv_queue.put(data) + def disconnect_error(self): + """ + Simulate a disconnect error by raising a socket.error exception + for any current or further socket operations. + """ + self.disconnected = True + # ------------------------------------------------------------------ # Socket Interface @@ -99,6 +107,8 @@ class TestSocket(object): Arguments: Placeholders. Same as for socket.Socket.recv. """ + if self.disconnected: + raise socket.error return self.read(block=True) def send(self, data): @@ -108,6 +118,8 @@ class TestSocket(object): Arguments: data -- String value to write. """ + if self.disconnected: + raise socket.error self.send_queue.put(data) # ------------------------------------------------------------------ @@ -132,6 +144,8 @@ class TestSocket(object): timeout -- Time in seconds a block should last before returning None. """ + if self.disconnected: + raise socket.error if timeout is not None: block = True try: diff --git a/sleekxmpp/test/sleektest.py b/sleekxmpp/test/sleektest.py index 24af1e7..7802a9b 100644 --- a/sleekxmpp/test/sleektest.py +++ b/sleekxmpp/test/sleektest.py @@ -259,6 +259,13 @@ class SleekTest(unittest.TestCase): # ------------------------------------------------------------------ # Methods for simulating stanza streams. + def stream_disconnect(self): + """ + Simulate a stream disconnection. + """ + if self.xmpp: + self.xmpp.socket.disconnect_error() + def stream_start(self, mode='client', skip=True, header=None, socket='mock', jid='tester@localhost', password='test', server='localhost', @@ -327,6 +334,8 @@ class SleekTest(unittest.TestCase): self.xmpp.process(threaded=True) if skip: if socket != 'live': + # Mark send queue as usable + self.xmpp.session_started_event.set() # Clear startup stanzas self.xmpp.socket.next_sent(timeout=1) if mode == 'component': diff --git a/sleekxmpp/xmlstream/filesocket.py b/sleekxmpp/xmlstream/filesocket.py index 441ff87..fd81864 100644 --- a/sleekxmpp/xmlstream/filesocket.py +++ b/sleekxmpp/xmlstream/filesocket.py @@ -22,6 +22,8 @@ class FileSocket(_fileobject): def read(self, size=4096): """Read data from the socket as if it were a file.""" + if self._sock is None: + return None data = self._sock.recv(size) if data is not None: return data diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py index 28f78f3..f1a9e1f 100644 --- a/sleekxmpp/xmlstream/stanzabase.py +++ b/sleekxmpp/xmlstream/stanzabase.py @@ -1255,9 +1255,15 @@ class StanzaBase(ElementBase): log.exception('Error handling {%s}%s stanza' % (self.namespace, self.name)) - def send(self): - """Queue the stanza to be sent on the XML stream.""" - self.stream.sendRaw(self.__str__()) + def send(self, now=False): + """ + Queue the stanza to be sent on the XML stream. + Arguments: + now -- Indicates if the queue should be skipped and the + stanza sent immediately. Useful for stream + initialization. Defaults to False. + """ + self.stream.send_raw(self.__str__(), now=now) def __copy__(self): """ diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 1c16556..2d72de5 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -17,6 +17,7 @@ import sys import threading import time import types +import random try: import queue except ImportError: @@ -45,6 +46,9 @@ HANDLER_THREADS = 1 # Flag indicating if the SSL library is available for use. SSL_SUPPORT = True +# Maximum time to delay between connection attempts is one hour. +RECONNECT_MAX_DELAY = 3600 + log = logging.getLogger(__name__) @@ -104,7 +108,11 @@ class XMLStream(object): use_ssl -- Flag indicating if SSL should be used. use_tls -- Flag indicating if TLS should be used. stop -- threading Event used to stop all threads. - auto_reconnect-- Flag to determine whether we auto reconnect. + + auto_reconnect -- Flag to determine whether we auto reconnect. + reconnect_max_delay -- Maximum time to delay between connection + attempts. Defaults to RECONNECT_MAX_DELAY, + which is one hour. Methods: add_event_handler -- Add a handler for a custom event. @@ -155,6 +163,8 @@ class XMLStream(object): self.ca_certs = None self.response_timeout = RESPONSE_TIMEOUT + self.reconnect_delay = None + self.reconnect_max_delay = RECONNECT_MAX_DELAY self.state = StateMachine(('disconnected', 'connected')) self.state._set_state('disconnected') @@ -178,6 +188,8 @@ class XMLStream(object): self.stop = threading.Event() self.stream_end_event = threading.Event() self.stream_end_event.set() + self.session_started_event = threading.Event() + self.event_queue = queue.Queue() self.send_queue = queue.Queue() self.scheduler = Scheduler(self.event_queue, self.stop) @@ -300,6 +312,15 @@ class XMLStream(object): self.stop.clear() self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM) self.socket.settimeout(None) + + if self.reconnect_delay is None: + delay = 1.0 + else: + delay = min(self.reconnect_delay * 2, self.reconnect_max_delay) + delay = random.normalvariate(delay, delay * 0.1) + log.debug('Waiting %s seconds before connecting.' % delay) + time.sleep(delay) + if self.use_ssl and self.ssl_support: log.debug("Socket Wrapped for SSL") if self.ca_certs is None: @@ -309,7 +330,7 @@ class XMLStream(object): ssl_socket = ssl.wrap_socket(self.socket, ca_certs=self.ca_certs, - certs_reqs=cert_policy) + cert_reqs=cert_policy) if hasattr(self.socket, 'socket'): # We are using a testing socket, so preserve the top @@ -324,13 +345,14 @@ class XMLStream(object): self.set_socket(self.socket, ignore=True) #this event is where you should set your application state self.event("connected", direct=True) + self.reconnect_delay = 1.0 return True except Socket.error as serr: error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" self.event('socket_error', serr) log.error(error_msg % (self.address[0], self.address[1], serr.errno, serr.strerror)) - time.sleep(1) + self.reconnect_delay = delay return False def disconnect(self, reconnect=False): @@ -350,7 +372,8 @@ class XMLStream(object): def _disconnect(self, reconnect=False): # Send the end of stream marker. - self.send_raw(self.stream_footer) + self.send_raw(self.stream_footer, now=True) + self.session_started_event.clear() # Wait for confirmation that the stream was # closed in the other direction. self.auto_reconnect = reconnect @@ -643,7 +666,7 @@ class XMLStream(object): """ return xml - def send(self, data, mask=None, timeout=None): + def send(self, data, mask=None, timeout=None, now=False): """ A wrapper for send_raw for sending stanza objects. @@ -657,10 +680,13 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - if hasattr(mask, 'xml'): mask = mask.xml data = str(data) @@ -669,21 +695,11 @@ class XMLStream(object): wait_for = Waiter("SendWait_%s" % self.new_id(), MatchXMLMask(mask)) self.register_handler(wait_for) - self.send_raw(data) + self.send_raw(data, now) if mask is not None: return wait_for.wait(timeout) - def send_raw(self, data): - """ - Send raw data across the stream. - - Arguments: - data -- Any string value. - """ - self.send_queue.put(data) - return True - - def send_xml(self, data, mask=None, timeout=None): + def send_xml(self, data, mask=None, timeout=None, now=False): """ Send an XML object on the stream, and optionally wait for a response. @@ -696,10 +712,39 @@ class XMLStream(object): or a timeout occurs. timeout -- Time in seconds to wait for a response before continuing. Defaults to RESPONSE_TIMEOUT. + now -- Indicates if the send queue should be skipped, + sending the stanza immediately. Useful mainly + for stream initialization stanzas. + Defaults to False. """ if timeout is None: timeout = self.response_timeout - return self.send(tostring(data), mask, timeout) + return self.send(tostring(data), mask, timeout, now) + + def send_raw(self, data, now=False, reconnect=None): + """ + Send raw data across the stream. + + Arguments: + data -- Any string value. + reconnect -- Indicates if the stream should be + restarted if there is an error sending + the stanza. Used mainly for testing. + Defaults to self.auto_reconnect. + """ + if now: + log.debug("SEND (IMMED): %s" % data) + try: + self.socket.send(data.encode('utf-8')) + except Socket.error as serr: + self.event('socket_error', serr) + log.warning("Failed to send %s" % data) + if reconnect is None: + reconnect = self.auto_reconnect + self.disconnect(reconnect) + else: + self.send_queue.put(data) + return True def process(self, threaded=True): """ @@ -753,7 +798,7 @@ class XMLStream(object): firstrun = False try: if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) # The call to self.__read_xml will block and prevent # the body of the loop from running until a disconnect # occurs. After any reconnection, the stream header will @@ -762,7 +807,7 @@ class XMLStream(object): # Ensure the stream header is sent for any # new connections. if self.is_client: - self.send_raw(self.stream_header) + self.send_raw(self.stream_header, now=True) except KeyboardInterrupt: log.debug("Keyboard Escape Detected in _process") self.stop.set() @@ -790,35 +835,39 @@ class XMLStream(object): """ depth = 0 root = None - for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')): - if event == b'start': - if depth == 0: - # We have received the start of the root element. - root = xml - # Perform any stream initialization actions, such - # as handshakes. - self.stream_end_event.clear() - self.start_stream_handler(root) - depth += 1 - if event == b'end': - depth -= 1 - if depth == 0: - # The stream's root element has closed, - # terminating the stream. - log.debug("End of stream recieved") - self.stream_end_event.set() - return False - elif depth == 1: - # We only raise events for stanzas that are direct - # children of the root element. - try: - self.__spawn_event(xml) - except RestartStream: - return True - if root: - # Keep the root element empty of children to - # save on memory use. - root.clear() + try: + for (event, xml) in ET.iterparse(self.filesocket, + (b'end', b'start')): + if event == b'start': + if depth == 0: + # We have received the start of the root element. + root = xml + # Perform any stream initialization actions, such + # as handshakes. + self.stream_end_event.clear() + self.start_stream_handler(root) + depth += 1 + if event == b'end': + depth -= 1 + if depth == 0: + # The stream's root element has closed, + # terminating the stream. + log.debug("End of stream recieved") + self.stream_end_event.set() + return False + elif depth == 1: + # We only raise events for stanzas that are direct + # children of the root element. + try: + self.__spawn_event(xml) + except RestartStream: + return True + if root: + # Keep the root element empty of children to + # save on memory use. + root.clear() + except SyntaxError: + log.error("Error reading from XML stream.") log.debug("Ending read XML loop") def _build_stanza(self, xml, default_ns=None): @@ -971,6 +1020,7 @@ class XMLStream(object): """ try: while not self.stop.isSet(): + self.session_started_event.wait() try: data = self.send_queue.get(True, 1) except queue.Empty: @@ -978,7 +1028,8 @@ class XMLStream(object): log.debug("SEND: %s" % data) try: self.socket.send(data.encode('utf-8')) - except: + except Socket.error as serr: + self.event('socket_error', serr) log.warning("Failed to send %s" % data) self.disconnect(self.auto_reconnect) except KeyboardInterrupt: diff --git a/tests/test_stream.py b/tests/test_stream.py index f91f71f..deac24a 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -1,5 +1,5 @@ +import time from sleekxmpp.test import * -import sleekxmpp.plugins.xep_0033 as xep_0033 class TestStreamTester(SleekTest): @@ -57,4 +57,23 @@ class TestStreamTester(SleekTest): self.stream_start(mode='client', skip=False) self.send_header(sto='localhost') + def testStreamDisconnect(self): + """Test that the test socket can simulate disconnections.""" + self.stream_start() + events = set() + + def stream_error(event): + events.add('socket_error') + + self.xmpp.add_event_handler('socket_error', stream_error) + + self.stream_disconnect() + self.xmpp.send_raw(' ') + + time.sleep(.1) + + self.failUnless('socket_error' in events, + "Stream error event not raised: %s" % events) + + suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamTester)