Don't use the send queue for stream initialization.

Use the parameter now=True to skip the queue when
sending Iq stanzas, or using xmpp.send().
This commit is contained in:
Lance Stout 2011-05-27 16:59:52 -07:00
parent 6997b2fbf8
commit 1735c194cd
6 changed files with 70 additions and 33 deletions

View file

@ -75,9 +75,6 @@ class ClientXMPP(BaseXMPP):
self.plugin_whitelist = plugin_whitelist self.plugin_whitelist = plugin_whitelist
self.srv_support = SRV_SUPPORT self.srv_support = SRV_SUPPORT
self.session_started_event = threading.Event()
self.session_started_event.clear()
self.stream_header = "<stream:stream to='%s' %s %s version='1.0'>" % ( self.stream_header = "<stream:stream to='%s' %s %s version='1.0'>" % (
self.boundjid.host, self.boundjid.host,
"xmlns:stream='%s'" % self.stream_ns, "xmlns:stream='%s'" % self.stream_ns,
@ -313,7 +310,7 @@ class ClientXMPP(BaseXMPP):
self._handle_tls_start, self._handle_tls_start,
name='TLS Proceed', name='TLS Proceed',
instream=True) instream=True)
self.send_xml(xml) self.send_xml(xml, now=True)
return True return True
else: else:
log.warning("The module tlslite is required to log in" +\ log.warning("The module tlslite is required to log in" +\
@ -369,11 +366,13 @@ class ClientXMPP(BaseXMPP):
self.send("<auth xmlns='%s' mechanism='PLAIN'>%s</auth>" % ( self.send("<auth xmlns='%s' mechanism='PLAIN'>%s</auth>" % (
sasl_ns, sasl_ns,
auth)) auth),
now=True)
elif 'sasl:ANONYMOUS' in self.features and not self.boundjid.user: elif 'sasl:ANONYMOUS' in self.features and not self.boundjid.user:
self.send("<auth xmlns='%s' mechanism='%s' />" % ( self.send("<auth xmlns='%s' mechanism='%s' />" % (
sasl_ns, sasl_ns,
'ANONYMOUS')) 'ANONYMOUS'),
now=True)
else: else:
log.error("No appropriate login method.") log.error("No appropriate login method.")
self.disconnect() self.disconnect()
@ -416,7 +415,7 @@ class ClientXMPP(BaseXMPP):
res.text = self.boundjid.resource res.text = self.boundjid.resource
xml.append(res) xml.append(res)
iq.append(xml) iq.append(xml)
response = iq.send() response = iq.send(now=True)
bind_ns = 'urn:ietf:params:xml:ns:xmpp-bind' bind_ns = 'urn:ietf:params:xml:ns:xmpp-bind'
self.set_jid(response.xml.find('{%s}bind/{%s}jid' % (bind_ns, self.set_jid(response.xml.find('{%s}bind/{%s}jid' % (bind_ns,
@ -439,7 +438,7 @@ class ClientXMPP(BaseXMPP):
""" """
if self.authenticated and self.bound: if self.authenticated and self.bound:
iq = self.makeIqSet(xml) iq = self.makeIqSet(xml)
response = iq.send() response = iq.send(now=True)
log.debug("Established Session") log.debug("Established Session")
self.sessionstarted = True self.sessionstarted = True
self.session_started_event.set() self.session_started_event.set()

View file

@ -138,4 +138,5 @@ class ComponentXMPP(BaseXMPP):
Arguments: Arguments:
xml -- The reply handshake stanza. xml -- The reply handshake stanza.
""" """
self.session_started_event.set()
self.event("session_start") self.event("session_start")

View file

@ -154,7 +154,7 @@ class Iq(RootStanza):
StanzaBase.reply(self, clear) StanzaBase.reply(self, clear)
return self return self
def send(self, block=True, timeout=None, callback=None): def send(self, block=True, timeout=None, callback=None, now=False):
""" """
Send an <iq> stanza over the XML stream. Send an <iq> stanza over the XML stream.
@ -178,6 +178,9 @@ class Iq(RootStanza):
Defaults to sleekxmpp.xmlstream.RESPONSE_TIMEOUT Defaults to sleekxmpp.xmlstream.RESPONSE_TIMEOUT
callback -- Optional reference to a stream handler function. Will callback -- Optional reference to a stream handler function. Will
be executed when a reply stanza is received. be executed when a reply stanza is received.
now -- Indicates if the send queue should be skipped and send
the stanza immediately. Used during stream
initialization. Defaults to False.
""" """
if timeout is None: if timeout is None:
timeout = self.stream.response_timeout timeout = self.stream.response_timeout
@ -188,15 +191,15 @@ class Iq(RootStanza):
callback, callback,
once=True) once=True)
self.stream.register_handler(handler) self.stream.register_handler(handler)
StanzaBase.send(self) StanzaBase.send(self, now=now)
return handler_name return handler_name
elif block and self['type'] in ('get', 'set'): elif block and self['type'] in ('get', 'set'):
waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id'])) waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id']))
self.stream.register_handler(waitfor) self.stream.register_handler(waitfor)
StanzaBase.send(self) StanzaBase.send(self, now=now)
return waitfor.wait(timeout) return waitfor.wait(timeout)
else: else:
return StanzaBase.send(self) return StanzaBase.send(self, now=now)
def _set_stanza_values(self, values): def _set_stanza_values(self, values):
""" """

View file

@ -334,6 +334,8 @@ class SleekTest(unittest.TestCase):
self.xmpp.process(threaded=True) self.xmpp.process(threaded=True)
if skip: if skip:
if socket != 'live': if socket != 'live':
# Mark send queue as usable
self.xmpp.session_started_event.set()
# Clear startup stanzas # Clear startup stanzas
self.xmpp.socket.next_sent(timeout=1) self.xmpp.socket.next_sent(timeout=1)
if mode == 'component': if mode == 'component':

View file

@ -1253,9 +1253,15 @@ class StanzaBase(ElementBase):
log.exception('Error handling {%s}%s stanza' % (self.namespace, log.exception('Error handling {%s}%s stanza' % (self.namespace,
self.name)) self.name))
def send(self): def send(self, now=False):
"""Queue the stanza to be sent on the XML stream.""" """
self.stream.sendRaw(self.__str__()) Queue the stanza to be sent on the XML stream.
Arguments:
now -- Indicates if the queue should be skipped and the
stanza sent immediately. Useful for stream
initialization. Defaults to False.
"""
self.stream.send_raw(self.__str__(), now=now)
def __copy__(self): def __copy__(self):
""" """

View file

@ -187,6 +187,8 @@ class XMLStream(object):
self.stop = threading.Event() self.stop = threading.Event()
self.stream_end_event = threading.Event() self.stream_end_event = threading.Event()
self.stream_end_event.set() self.stream_end_event.set()
self.session_started_event = threading.Event()
self.event_queue = queue.Queue() self.event_queue = queue.Queue()
self.send_queue = queue.Queue() self.send_queue = queue.Queue()
self.scheduler = Scheduler(self.event_queue, self.stop) self.scheduler = Scheduler(self.event_queue, self.stop)
@ -364,7 +366,8 @@ class XMLStream(object):
def _disconnect(self, reconnect=False): def _disconnect(self, reconnect=False):
# Send the end of stream marker. # Send the end of stream marker.
self.send_raw(self.stream_footer) self.send_raw(self.stream_footer, now=True)
self.session_started_event.clear()
# Wait for confirmation that the stream was # Wait for confirmation that the stream was
# closed in the other direction. # closed in the other direction.
self.auto_reconnect = reconnect self.auto_reconnect = reconnect
@ -657,7 +660,7 @@ class XMLStream(object):
""" """
return xml return xml
def send(self, data, mask=None, timeout=None): def send(self, data, mask=None, timeout=None, now=False):
""" """
A wrapper for send_raw for sending stanza objects. A wrapper for send_raw for sending stanza objects.
@ -671,10 +674,13 @@ class XMLStream(object):
or a timeout occurs. or a timeout occurs.
timeout -- Time in seconds to wait for a response before timeout -- Time in seconds to wait for a response before
continuing. Defaults to RESPONSE_TIMEOUT. continuing. Defaults to RESPONSE_TIMEOUT.
now -- Indicates if the send queue should be skipped,
sending the stanza immediately. Useful mainly
for stream initialization stanzas.
Defaults to False.
""" """
if timeout is None: if timeout is None:
timeout = self.response_timeout timeout = self.response_timeout
if hasattr(mask, 'xml'): if hasattr(mask, 'xml'):
mask = mask.xml mask = mask.xml
data = str(data) data = str(data)
@ -683,21 +689,11 @@ class XMLStream(object):
wait_for = Waiter("SendWait_%s" % self.new_id(), wait_for = Waiter("SendWait_%s" % self.new_id(),
MatchXMLMask(mask)) MatchXMLMask(mask))
self.register_handler(wait_for) self.register_handler(wait_for)
self.send_raw(data) self.send_raw(data, now)
if mask is not None: if mask is not None:
return wait_for.wait(timeout) return wait_for.wait(timeout)
def send_raw(self, data): def send_xml(self, data, mask=None, timeout=None, now=False):
"""
Send raw data across the stream.
Arguments:
data -- Any string value.
"""
self.send_queue.put(data)
return True
def send_xml(self, data, mask=None, timeout=None):
""" """
Send an XML object on the stream, and optionally wait Send an XML object on the stream, and optionally wait
for a response. for a response.
@ -710,10 +706,39 @@ class XMLStream(object):
or a timeout occurs. or a timeout occurs.
timeout -- Time in seconds to wait for a response before timeout -- Time in seconds to wait for a response before
continuing. Defaults to RESPONSE_TIMEOUT. continuing. Defaults to RESPONSE_TIMEOUT.
now -- Indicates if the send queue should be skipped,
sending the stanza immediately. Useful mainly
for stream initialization stanzas.
Defaults to False.
""" """
if timeout is None: if timeout is None:
timeout = self.response_timeout timeout = self.response_timeout
return self.send(tostring(data), mask, timeout) return self.send(tostring(data), mask, timeout, now)
def send_raw(self, data, now=False, reconnect=None):
"""
Send raw data across the stream.
Arguments:
data -- Any string value.
reconnect -- Indicates if the stream should be
restarted if there is an error sending
the stanza. Used mainly for testing.
Defaults to self.auto_reconnect.
"""
if now:
log.debug("SEND: %s" % data)
try:
self.socket.send(data.encode('utf-8'))
except Socket.error as serr:
self.event('socket_error', serr)
log.warning("Failed to send %s" % data)
if reconnect is None:
reconnect = self.auto_reconnect
self.disconnect(reconnect)
else:
self.send_queue.put(data)
return True
def process(self, threaded=True): def process(self, threaded=True):
""" """
@ -767,7 +792,7 @@ class XMLStream(object):
firstrun = False firstrun = False
try: try:
if self.is_client: if self.is_client:
self.send_raw(self.stream_header) self.send_raw(self.stream_header, now=True)
# The call to self.__read_xml will block and prevent # The call to self.__read_xml will block and prevent
# the body of the loop from running until a disconnect # the body of the loop from running until a disconnect
# occurs. After any reconnection, the stream header will # occurs. After any reconnection, the stream header will
@ -776,7 +801,7 @@ class XMLStream(object):
# Ensure the stream header is sent for any # Ensure the stream header is sent for any
# new connections. # new connections.
if self.is_client: if self.is_client:
self.send_raw(self.stream_header) self.send_raw(self.stream_header, now=True)
except KeyboardInterrupt: except KeyboardInterrupt:
log.debug("Keyboard Escape Detected in _process") log.debug("Keyboard Escape Detected in _process")
self.stop.set() self.stop.set()
@ -985,6 +1010,7 @@ class XMLStream(object):
""" """
try: try:
while not self.stop.isSet(): while not self.stop.isSet():
self.session_started_event.wait()
try: try:
data = self.send_queue.get(True, 1) data = self.send_queue.get(True, 1)
except queue.Empty: except queue.Empty: