Added option for iq.send to accept a callhandler.

The callback will be a stream level handler, and will not
execute in its own thread. If you must have a thread, have the
callback function raise a custom event, which can be processed
by another event handler, which may run in an individual thread,
like so:

def handle_reply(self, iq):
    self.event('custom_event', iq)

def do_long_operation_in_thread(self, iq):
    ...

self.add_event_handler('custom_event', self.do_long_operation_in_thread)

...take out already prepared iq stanza...
iq.send(callback=self.handle_reply)
This commit is contained in:
Lance Stout 2010-12-07 17:19:39 -05:00
parent 8ead33fc3b
commit 5f2fc67c40
3 changed files with 70 additions and 13 deletions

View file

@ -9,7 +9,7 @@
from sleekxmpp.stanza import Error
from sleekxmpp.stanza.rootstanza import RootStanza
from sleekxmpp.xmlstream import StanzaBase, ET
from sleekxmpp.xmlstream.handler import Waiter
from sleekxmpp.xmlstream.handler import Waiter, Callback
from sleekxmpp.xmlstream.matcher import MatcherId
@ -157,28 +157,43 @@ class Iq(RootStanza):
StanzaBase.reply(self)
return self
def send(self, block=True, timeout=None):
def send(self, block=True, timeout=None, callback=None):
"""
Send an <iq> stanza over the XML stream.
The send call can optionally block until a response is received or
a timeout occurs. Be aware that using blocking in non-threaded event
handlers can drastically impact performance.
handlers can drastically impact performance. Otherwise, a callback
handler can be provided that will be executed when the Iq stanza's
result reply is received. Be aware though that that the callback
handler will not be executed in its own thread.
Using both block and callback is not recommended, and only the
callback argument will be used in that case.
Overrides StanzaBase.send
Arguments:
block -- Specify if the send call will block until a response
is received, or a timeout occurs. Defaults to True.
timeout -- The length of time (in seconds) to wait for a response
before exiting the send call if blocking is used.
Defaults to sleekxmpp.xmlstream.RESPONSE_TIMEOUT
block -- Specify if the send call will block until a response
is received, or a timeout occurs. Defaults to True.
timeout -- The length of time (in seconds) to wait for a response
before exiting the send call if blocking is used.
Defaults to sleekxmpp.xmlstream.RESPONSE_TIMEOUT
callback -- Optional reference to a stream handler function. Will
be executed when a reply stanza is received.
"""
if timeout is None:
timeout = self.stream.response_timeout
if block and self['type'] in ('get', 'set'):
if callback is not None and self['type'] in ('get', 'set'):
handler = Callback('IqCallback_%s' % self['id'],
MatcherId(self['id']),
callback,
once=True)
self.stream.register_handler(handler)
return None
elif block and self['type'] in ('get', 'set'):
waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id']))
self.stream.registerHandler(waitfor)
self.stream.register_handler(waitfor)
StanzaBase.send(self)
return waitfor.wait(timeout)
else:

View file

@ -52,6 +52,10 @@ class SleekTest(unittest.TestCase):
compare -- Compare XML objects against each other.
"""
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)
self.xmpp = None
def runTest(self):
pass
@ -86,7 +90,7 @@ class SleekTest(unittest.TestCase):
Arguments:
xml -- An XML object to use for the Message's values.
"""
return Message(None, *args, **kwargs)
return Message(self.xmpp, *args, **kwargs)
def Iq(self, *args, **kwargs):
"""
@ -97,7 +101,7 @@ class SleekTest(unittest.TestCase):
Arguments:
xml -- An XML object to use for the Iq's values.
"""
return Iq(None, *args, **kwargs)
return Iq(self.xmpp, *args, **kwargs)
def Presence(self, *args, **kwargs):
"""
@ -108,7 +112,7 @@ class SleekTest(unittest.TestCase):
Arguments:
xml -- An XML object to use for the Iq's values.
"""
return Presence(None, *args, **kwargs)
return Presence(self.xmpp, *args, **kwargs)
def check_jid(self, jid, user=None, domain=None, resource=None,
bare=None, full=None, string=None):

View file

@ -1,3 +1,5 @@
import time
from sleekxmpp.test import *
from sleekxmpp.xmlstream.handler import *
from sleekxmpp.xmlstream.matcher import *
@ -108,5 +110,41 @@ class TestHandlers(SleekTest):
self.failUnless(waiter_exists == False,
"Waiter handler was not removed.")
def testIqCallback(self):
"""Test that iq.send(callback=handle_foo) works."""
events = []
def handle_foo(iq):
events.append('foo')
iq = self.Iq()
iq['type'] = 'get'
iq['id'] = 'test-foo'
iq['to'] = 'user@localhost'
iq['query'] = 'foo'
iq.send(callback=handle_foo)
self.send("""
<iq type="get" id="test-foo" to="user@localhost">
<query xmlns="foo" />
</iq>
""")
self.recv("""
<iq type="result" id="test-foo"
to="test@localhost"
from="user@localhost">
<query xmlns="foo">
<data />
</query>
</iq>
""")
# Give event queue time to process
time.sleep(0.1)
self.failUnless(events == ['foo'],
"Iq callback was not executed: %s" % events)
suite = unittest.TestLoader().loadTestsFromTestCase(TestHandlers)