Merge branch 'develop' into stream_features

Conflicts:
	sleekxmpp/clientxmpp.py
This commit is contained in:
Lance Stout 2011-05-31 11:05:54 -07:00
commit 83a73ac9b7
10 changed files with 180 additions and 67 deletions

View file

@ -77,9 +77,6 @@ class ClientXMPP(BaseXMPP):
self.plugin_whitelist = plugin_whitelist self.plugin_whitelist = plugin_whitelist
self.srv_support = SRV_SUPPORT self.srv_support = SRV_SUPPORT
self.session_started_event = threading.Event()
self.session_started_event.clear()
self.stream_header = "<stream:stream to='%s' %s %s version='1.0'>" % ( self.stream_header = "<stream:stream to='%s' %s %s version='1.0'>" % (
self.boundjid.host, self.boundjid.host,
"xmlns:stream='%s'" % self.stream_ns, "xmlns:stream='%s'" % self.stream_ns,
@ -369,7 +366,7 @@ class ClientXMPP(BaseXMPP):
MatchXPath(tls.Proceed.tag_name()), MatchXPath(tls.Proceed.tag_name()),
tls_proceed, tls_proceed,
instream=True)) instream=True))
self.send(features['starttls']) self.send(features['starttls'], now=True)
return True return True
else: else:
log.warning("The module tlslite is required to log in" +\ log.warning("The module tlslite is required to log in" +\
@ -446,8 +443,7 @@ class ClientXMPP(BaseXMPP):
resp = sasl.Auth(xmpp) resp = sasl.Auth(xmpp)
resp['mechanism'] = 'PLAIN' resp['mechanism'] = 'PLAIN'
resp['value'] = auth resp['value'] = auth
resp.send() resp.send(now=True)
return True return True
def _handle_sasl_anonymous(self, xmpp): def _handle_sasl_anonymous(self, xmpp):
@ -479,7 +475,7 @@ class ClientXMPP(BaseXMPP):
iq.enable('bind') iq.enable('bind')
if self.boundjid.resource: if self.boundjid.resource:
iq['bind']['resource'] = self.boundjid.resource iq['bind']['resource'] = self.boundjid.resource
response = iq.send() response = iq.send(now=True)
self.set_jid(response['bind']['jid']) self.set_jid(response['bind']['jid'])
self.bound = True self.bound = True
@ -502,7 +498,7 @@ class ClientXMPP(BaseXMPP):
iq = self.Iq() iq = self.Iq()
iq['type'] = 'set' iq['type'] = 'set'
iq.enable('session') iq.enable('session')
response = iq.send() response = iq.send(now=True)
log.debug("Established Session") log.debug("Established Session")
self.sessionstarted = True self.sessionstarted = True

View file

@ -138,4 +138,5 @@ class ComponentXMPP(BaseXMPP):
Arguments: Arguments:
xml -- The reply handshake stanza. xml -- The reply handshake stanza.
""" """
self.session_started_event.set()
self.event("session_start") self.event("session_start")

View file

@ -154,7 +154,7 @@ class Iq(RootStanza):
StanzaBase.reply(self, clear) StanzaBase.reply(self, clear)
return self return self
def send(self, block=True, timeout=None, callback=None): def send(self, block=True, timeout=None, callback=None, now=False):
""" """
Send an <iq> stanza over the XML stream. Send an <iq> stanza over the XML stream.
@ -178,6 +178,9 @@ class Iq(RootStanza):
Defaults to sleekxmpp.xmlstream.RESPONSE_TIMEOUT Defaults to sleekxmpp.xmlstream.RESPONSE_TIMEOUT
callback -- Optional reference to a stream handler function. Will callback -- Optional reference to a stream handler function. Will
be executed when a reply stanza is received. be executed when a reply stanza is received.
now -- Indicates if the send queue should be skipped and send
the stanza immediately. Used during stream
initialization. Defaults to False.
""" """
if timeout is None: if timeout is None:
timeout = self.stream.response_timeout timeout = self.stream.response_timeout
@ -188,15 +191,15 @@ class Iq(RootStanza):
callback, callback,
once=True) once=True)
self.stream.register_handler(handler) self.stream.register_handler(handler)
StanzaBase.send(self) StanzaBase.send(self, now=now)
return handler_name return handler_name
elif block and self['type'] in ('get', 'set'): elif block and self['type'] in ('get', 'set'):
waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id'])) waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id']))
self.stream.register_handler(waitfor) self.stream.register_handler(waitfor)
StanzaBase.send(self) StanzaBase.send(self, now=now)
return waitfor.wait(timeout) return waitfor.wait(timeout)
else: else:
return StanzaBase.send(self) return StanzaBase.send(self, now=now)
def _set_stanza_values(self, values): def _set_stanza_values(self, values):
""" """

View file

@ -58,6 +58,18 @@ class TestLiveSocket(object):
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Testing Interface # Testing Interface
def disconnect_errror(self):
"""
Used to simulate a socket disconnection error.
Not used by live sockets.
"""
try:
self.socket.shutdown()
self.socket.close()
except:
pass
def next_sent(self, timeout=None): def next_sent(self, timeout=None):
""" """
Get the next stanza that has been sent. Get the next stanza that has been sent.

View file

@ -39,6 +39,7 @@ class TestSocket(object):
self.recv_queue = queue.Queue() self.recv_queue = queue.Queue()
self.send_queue = queue.Queue() self.send_queue = queue.Queue()
self.is_live = False self.is_live = False
self.disconnected = False
def __getattr__(self, name): def __getattr__(self, name):
""" """
@ -89,6 +90,13 @@ class TestSocket(object):
""" """
self.recv_queue.put(data) self.recv_queue.put(data)
def disconnect_error(self):
"""
Simulate a disconnect error by raising a socket.error exception
for any current or further socket operations.
"""
self.disconnected = True
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Socket Interface # Socket Interface
@ -99,6 +107,8 @@ class TestSocket(object):
Arguments: Arguments:
Placeholders. Same as for socket.Socket.recv. Placeholders. Same as for socket.Socket.recv.
""" """
if self.disconnected:
raise socket.error
return self.read(block=True) return self.read(block=True)
def send(self, data): def send(self, data):
@ -108,6 +118,8 @@ class TestSocket(object):
Arguments: Arguments:
data -- String value to write. data -- String value to write.
""" """
if self.disconnected:
raise socket.error
self.send_queue.put(data) self.send_queue.put(data)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@ -132,6 +144,8 @@ class TestSocket(object):
timeout -- Time in seconds a block should last before timeout -- Time in seconds a block should last before
returning None. returning None.
""" """
if self.disconnected:
raise socket.error
if timeout is not None: if timeout is not None:
block = True block = True
try: try:

View file

@ -259,6 +259,13 @@ class SleekTest(unittest.TestCase):
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Methods for simulating stanza streams. # Methods for simulating stanza streams.
def stream_disconnect(self):
"""
Simulate a stream disconnection.
"""
if self.xmpp:
self.xmpp.socket.disconnect_error()
def stream_start(self, mode='client', skip=True, header=None, def stream_start(self, mode='client', skip=True, header=None,
socket='mock', jid='tester@localhost', socket='mock', jid='tester@localhost',
password='test', server='localhost', password='test', server='localhost',
@ -327,6 +334,8 @@ class SleekTest(unittest.TestCase):
self.xmpp.process(threaded=True) self.xmpp.process(threaded=True)
if skip: if skip:
if socket != 'live': if socket != 'live':
# Mark send queue as usable
self.xmpp.session_started_event.set()
# Clear startup stanzas # Clear startup stanzas
self.xmpp.socket.next_sent(timeout=1) self.xmpp.socket.next_sent(timeout=1)
if mode == 'component': if mode == 'component':

View file

@ -22,6 +22,8 @@ class FileSocket(_fileobject):
def read(self, size=4096): def read(self, size=4096):
"""Read data from the socket as if it were a file.""" """Read data from the socket as if it were a file."""
if self._sock is None:
return None
data = self._sock.recv(size) data = self._sock.recv(size)
if data is not None: if data is not None:
return data return data

View file

@ -1255,9 +1255,15 @@ class StanzaBase(ElementBase):
log.exception('Error handling {%s}%s stanza' % (self.namespace, log.exception('Error handling {%s}%s stanza' % (self.namespace,
self.name)) self.name))
def send(self): def send(self, now=False):
"""Queue the stanza to be sent on the XML stream.""" """
self.stream.sendRaw(self.__str__()) Queue the stanza to be sent on the XML stream.
Arguments:
now -- Indicates if the queue should be skipped and the
stanza sent immediately. Useful for stream
initialization. Defaults to False.
"""
self.stream.send_raw(self.__str__(), now=now)
def __copy__(self): def __copy__(self):
""" """

View file

@ -17,6 +17,7 @@ import sys
import threading import threading
import time import time
import types import types
import random
try: try:
import queue import queue
except ImportError: except ImportError:
@ -45,6 +46,9 @@ HANDLER_THREADS = 1
# Flag indicating if the SSL library is available for use. # Flag indicating if the SSL library is available for use.
SSL_SUPPORT = True SSL_SUPPORT = True
# Maximum time to delay between connection attempts is one hour.
RECONNECT_MAX_DELAY = 3600
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -104,7 +108,11 @@ class XMLStream(object):
use_ssl -- Flag indicating if SSL should be used. use_ssl -- Flag indicating if SSL should be used.
use_tls -- Flag indicating if TLS should be used. use_tls -- Flag indicating if TLS should be used.
stop -- threading Event used to stop all threads. stop -- threading Event used to stop all threads.
auto_reconnect-- Flag to determine whether we auto reconnect.
auto_reconnect -- Flag to determine whether we auto reconnect.
reconnect_max_delay -- Maximum time to delay between connection
attempts. Defaults to RECONNECT_MAX_DELAY,
which is one hour.
Methods: Methods:
add_event_handler -- Add a handler for a custom event. add_event_handler -- Add a handler for a custom event.
@ -155,6 +163,8 @@ class XMLStream(object):
self.ca_certs = None self.ca_certs = None
self.response_timeout = RESPONSE_TIMEOUT self.response_timeout = RESPONSE_TIMEOUT
self.reconnect_delay = None
self.reconnect_max_delay = RECONNECT_MAX_DELAY
self.state = StateMachine(('disconnected', 'connected')) self.state = StateMachine(('disconnected', 'connected'))
self.state._set_state('disconnected') self.state._set_state('disconnected')
@ -178,6 +188,8 @@ class XMLStream(object):
self.stop = threading.Event() self.stop = threading.Event()
self.stream_end_event = threading.Event() self.stream_end_event = threading.Event()
self.stream_end_event.set() self.stream_end_event.set()
self.session_started_event = threading.Event()
self.event_queue = queue.Queue() self.event_queue = queue.Queue()
self.send_queue = queue.Queue() self.send_queue = queue.Queue()
self.scheduler = Scheduler(self.event_queue, self.stop) self.scheduler = Scheduler(self.event_queue, self.stop)
@ -300,6 +312,15 @@ class XMLStream(object):
self.stop.clear() self.stop.clear()
self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM) self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM)
self.socket.settimeout(None) self.socket.settimeout(None)
if self.reconnect_delay is None:
delay = 1.0
else:
delay = min(self.reconnect_delay * 2, self.reconnect_max_delay)
delay = random.normalvariate(delay, delay * 0.1)
log.debug('Waiting %s seconds before connecting.' % delay)
time.sleep(delay)
if self.use_ssl and self.ssl_support: if self.use_ssl and self.ssl_support:
log.debug("Socket Wrapped for SSL") log.debug("Socket Wrapped for SSL")
if self.ca_certs is None: if self.ca_certs is None:
@ -309,7 +330,7 @@ class XMLStream(object):
ssl_socket = ssl.wrap_socket(self.socket, ssl_socket = ssl.wrap_socket(self.socket,
ca_certs=self.ca_certs, ca_certs=self.ca_certs,
certs_reqs=cert_policy) cert_reqs=cert_policy)
if hasattr(self.socket, 'socket'): if hasattr(self.socket, 'socket'):
# We are using a testing socket, so preserve the top # We are using a testing socket, so preserve the top
@ -324,13 +345,14 @@ class XMLStream(object):
self.set_socket(self.socket, ignore=True) self.set_socket(self.socket, ignore=True)
#this event is where you should set your application state #this event is where you should set your application state
self.event("connected", direct=True) self.event("connected", direct=True)
self.reconnect_delay = 1.0
return True return True
except Socket.error as serr: except Socket.error as serr:
error_msg = "Could not connect to %s:%s. Socket Error #%s: %s" error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
self.event('socket_error', serr) self.event('socket_error', serr)
log.error(error_msg % (self.address[0], self.address[1], log.error(error_msg % (self.address[0], self.address[1],
serr.errno, serr.strerror)) serr.errno, serr.strerror))
time.sleep(1) self.reconnect_delay = delay
return False return False
def disconnect(self, reconnect=False): def disconnect(self, reconnect=False):
@ -350,7 +372,8 @@ class XMLStream(object):
def _disconnect(self, reconnect=False): def _disconnect(self, reconnect=False):
# Send the end of stream marker. # Send the end of stream marker.
self.send_raw(self.stream_footer) self.send_raw(self.stream_footer, now=True)
self.session_started_event.clear()
# Wait for confirmation that the stream was # Wait for confirmation that the stream was
# closed in the other direction. # closed in the other direction.
self.auto_reconnect = reconnect self.auto_reconnect = reconnect
@ -643,7 +666,7 @@ class XMLStream(object):
""" """
return xml return xml
def send(self, data, mask=None, timeout=None): def send(self, data, mask=None, timeout=None, now=False):
""" """
A wrapper for send_raw for sending stanza objects. A wrapper for send_raw for sending stanza objects.
@ -657,10 +680,13 @@ class XMLStream(object):
or a timeout occurs. or a timeout occurs.
timeout -- Time in seconds to wait for a response before timeout -- Time in seconds to wait for a response before
continuing. Defaults to RESPONSE_TIMEOUT. continuing. Defaults to RESPONSE_TIMEOUT.
now -- Indicates if the send queue should be skipped,
sending the stanza immediately. Useful mainly
for stream initialization stanzas.
Defaults to False.
""" """
if timeout is None: if timeout is None:
timeout = self.response_timeout timeout = self.response_timeout
if hasattr(mask, 'xml'): if hasattr(mask, 'xml'):
mask = mask.xml mask = mask.xml
data = str(data) data = str(data)
@ -669,21 +695,11 @@ class XMLStream(object):
wait_for = Waiter("SendWait_%s" % self.new_id(), wait_for = Waiter("SendWait_%s" % self.new_id(),
MatchXMLMask(mask)) MatchXMLMask(mask))
self.register_handler(wait_for) self.register_handler(wait_for)
self.send_raw(data) self.send_raw(data, now)
if mask is not None: if mask is not None:
return wait_for.wait(timeout) return wait_for.wait(timeout)
def send_raw(self, data): def send_xml(self, data, mask=None, timeout=None, now=False):
"""
Send raw data across the stream.
Arguments:
data -- Any string value.
"""
self.send_queue.put(data)
return True
def send_xml(self, data, mask=None, timeout=None):
""" """
Send an XML object on the stream, and optionally wait Send an XML object on the stream, and optionally wait
for a response. for a response.
@ -696,10 +712,39 @@ class XMLStream(object):
or a timeout occurs. or a timeout occurs.
timeout -- Time in seconds to wait for a response before timeout -- Time in seconds to wait for a response before
continuing. Defaults to RESPONSE_TIMEOUT. continuing. Defaults to RESPONSE_TIMEOUT.
now -- Indicates if the send queue should be skipped,
sending the stanza immediately. Useful mainly
for stream initialization stanzas.
Defaults to False.
""" """
if timeout is None: if timeout is None:
timeout = self.response_timeout timeout = self.response_timeout
return self.send(tostring(data), mask, timeout) return self.send(tostring(data), mask, timeout, now)
def send_raw(self, data, now=False, reconnect=None):
"""
Send raw data across the stream.
Arguments:
data -- Any string value.
reconnect -- Indicates if the stream should be
restarted if there is an error sending
the stanza. Used mainly for testing.
Defaults to self.auto_reconnect.
"""
if now:
log.debug("SEND (IMMED): %s" % data)
try:
self.socket.send(data.encode('utf-8'))
except Socket.error as serr:
self.event('socket_error', serr)
log.warning("Failed to send %s" % data)
if reconnect is None:
reconnect = self.auto_reconnect
self.disconnect(reconnect)
else:
self.send_queue.put(data)
return True
def process(self, threaded=True): def process(self, threaded=True):
""" """
@ -753,7 +798,7 @@ class XMLStream(object):
firstrun = False firstrun = False
try: try:
if self.is_client: if self.is_client:
self.send_raw(self.stream_header) self.send_raw(self.stream_header, now=True)
# The call to self.__read_xml will block and prevent # The call to self.__read_xml will block and prevent
# the body of the loop from running until a disconnect # the body of the loop from running until a disconnect
# occurs. After any reconnection, the stream header will # occurs. After any reconnection, the stream header will
@ -762,7 +807,7 @@ class XMLStream(object):
# Ensure the stream header is sent for any # Ensure the stream header is sent for any
# new connections. # new connections.
if self.is_client: if self.is_client:
self.send_raw(self.stream_header) self.send_raw(self.stream_header, now=True)
except KeyboardInterrupt: except KeyboardInterrupt:
log.debug("Keyboard Escape Detected in _process") log.debug("Keyboard Escape Detected in _process")
self.stop.set() self.stop.set()
@ -790,35 +835,39 @@ class XMLStream(object):
""" """
depth = 0 depth = 0
root = None root = None
for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')): try:
if event == b'start': for (event, xml) in ET.iterparse(self.filesocket,
if depth == 0: (b'end', b'start')):
# We have received the start of the root element. if event == b'start':
root = xml if depth == 0:
# Perform any stream initialization actions, such # We have received the start of the root element.
# as handshakes. root = xml
self.stream_end_event.clear() # Perform any stream initialization actions, such
self.start_stream_handler(root) # as handshakes.
depth += 1 self.stream_end_event.clear()
if event == b'end': self.start_stream_handler(root)
depth -= 1 depth += 1
if depth == 0: if event == b'end':
# The stream's root element has closed, depth -= 1
# terminating the stream. if depth == 0:
log.debug("End of stream recieved") # The stream's root element has closed,
self.stream_end_event.set() # terminating the stream.
return False log.debug("End of stream recieved")
elif depth == 1: self.stream_end_event.set()
# We only raise events for stanzas that are direct return False
# children of the root element. elif depth == 1:
try: # We only raise events for stanzas that are direct
self.__spawn_event(xml) # children of the root element.
except RestartStream: try:
return True self.__spawn_event(xml)
if root: except RestartStream:
# Keep the root element empty of children to return True
# save on memory use. if root:
root.clear() # Keep the root element empty of children to
# save on memory use.
root.clear()
except SyntaxError:
log.error("Error reading from XML stream.")
log.debug("Ending read XML loop") log.debug("Ending read XML loop")
def _build_stanza(self, xml, default_ns=None): def _build_stanza(self, xml, default_ns=None):
@ -971,6 +1020,7 @@ class XMLStream(object):
""" """
try: try:
while not self.stop.isSet(): while not self.stop.isSet():
self.session_started_event.wait()
try: try:
data = self.send_queue.get(True, 1) data = self.send_queue.get(True, 1)
except queue.Empty: except queue.Empty:
@ -978,7 +1028,8 @@ class XMLStream(object):
log.debug("SEND: %s" % data) log.debug("SEND: %s" % data)
try: try:
self.socket.send(data.encode('utf-8')) self.socket.send(data.encode('utf-8'))
except: except Socket.error as serr:
self.event('socket_error', serr)
log.warning("Failed to send %s" % data) log.warning("Failed to send %s" % data)
self.disconnect(self.auto_reconnect) self.disconnect(self.auto_reconnect)
except KeyboardInterrupt: except KeyboardInterrupt:

View file

@ -1,5 +1,5 @@
import time
from sleekxmpp.test import * from sleekxmpp.test import *
import sleekxmpp.plugins.xep_0033 as xep_0033
class TestStreamTester(SleekTest): class TestStreamTester(SleekTest):
@ -57,4 +57,23 @@ class TestStreamTester(SleekTest):
self.stream_start(mode='client', skip=False) self.stream_start(mode='client', skip=False)
self.send_header(sto='localhost') self.send_header(sto='localhost')
def testStreamDisconnect(self):
"""Test that the test socket can simulate disconnections."""
self.stream_start()
events = set()
def stream_error(event):
events.add('socket_error')
self.xmpp.add_event_handler('socket_error', stream_error)
self.stream_disconnect()
self.xmpp.send_raw(' ')
time.sleep(.1)
self.failUnless('socket_error' in events,
"Stream error event not raised: %s" % events)
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamTester) suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamTester)