Updated the suite of handler classes with documentation.

Updated XMLStream to return True or False from removeHandler to indicate if the handler
existed and was removed.

Waiter handlers now unregister themselves after timing out.
This commit is contained in:
Lance Stout 2010-08-27 16:42:26 -04:00
parent 906aa0bd68
commit 89fb15e896
9 changed files with 419 additions and 98 deletions

View file

@ -37,7 +37,7 @@ except ImportError:
#class PresenceStanzaType(object):
#
#
# def fromXML(self, xml):
# self.ptype = xml.get('type')
@ -69,24 +69,24 @@ class ClientXMPP(basexmpp, XMLStream):
self.bound = False
self.bindfail = False
self.is_component = False
self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True))
self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True))
self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures))
self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster))
#self.registerHandler(Callback('Roster Update', MatchXMLMask("<presence xmlns='%s' type='subscribe' />" % self.default_ns), self._handlePresenceSubscribe, thread=True))
self.registerFeature("<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls' />", self.handler_starttls, True)
self.registerFeature("<mechanisms xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_sasl_auth, True)
self.registerFeature("<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind' />", self.handler_bind_resource)
self.registerFeature("<session xmlns='urn:ietf:params:xml:ns:xmpp-session' />", self.handler_start_session)
#self.registerStanzaExtension('PresenceStanza', PresenceStanzaType)
#self.register_plugins()
def __getitem__(self, key):
if key in self.plugin:
return self.plugin[key]
else:
logging.warning("""Plugin "%s" is not loaded.""" % key)
return False
def get(self, key, default):
return self.plugin.get(key, default)
@ -104,7 +104,7 @@ class ClientXMPP(basexmpp, XMLStream):
logging.debug("No appropriate SRV record found. Using JID server name.")
else:
# pick a random answer, weighted by priority
# there are less verbose ways of doing this (random.choice() with answer * priority), but I chose this way anyway
# there are less verbose ways of doing this (random.choice() with answer * priority), but I chose this way anyway
# suggestions are welcome
addresses = {}
intmax = 0
@ -128,18 +128,18 @@ class ClientXMPP(basexmpp, XMLStream):
logging.warning("Failed to connect")
self.event("disconnected")
return result
# overriding reconnect and disconnect so that we can get some events
# should events be part of or required by xmlstream? Maybe that would be cleaner
def reconnect(self):
logging.info("Reconnecting")
self.event("disconnected")
XMLStream.reconnect(self)
def disconnect(self, init=True, close=False, reconnect=False):
self.event("disconnected")
XMLStream.disconnect(self, reconnect)
def registerFeature(self, mask, pointer, breaker = False):
"""Register a stream feature."""
self.registered_features.append((MatchXMLMask(mask), pointer, breaker))
@ -157,12 +157,12 @@ class ClientXMPP(basexmpp, XMLStream):
iq['type'] = 'set'
iq['roster']['items'] = {jid: {'subscription': 'remove'}}
return iq.send()['type'] == 'result'
def getRoster(self):
"""Request the roster be sent."""
iq = self.Iq().setStanzaValues({'type': 'get'}).enable('roster').send()
self._handleRoster(iq, request=True)
def _handleStreamFeatures(self, features):
self.features = []
for sub in features.xml:
@ -173,7 +173,7 @@ class ClientXMPP(basexmpp, XMLStream):
#if self.maskcmp(subelement, feature[0], True):
if feature[1](subelement) and feature[2]: #if breaker, don't continue
return True
def handler_starttls(self, xml):
if not self.authenticated and self.ssl_support:
self.add_handler("<proceed xmlns='urn:ietf:params:xml:ns:xmpp-tls' />", self.handler_tls_start, name='TLS Proceed', instream=True)
@ -187,7 +187,7 @@ class ClientXMPP(basexmpp, XMLStream):
logging.debug("Starting TLS")
if self.startTLS():
raise RestartStream()
def handler_sasl_auth(self, xml):
if '{urn:ietf:params:xml:ns:xmpp-tls}starttls' in self.features:
return False
@ -209,7 +209,7 @@ class ClientXMPP(basexmpp, XMLStream):
#if 'sasl:DIGEST-MD5' in self.features:
# self._auth_digestmd5()
return True
def handler_auth_success(self, xml):
self.authenticated = True
self.features = []
@ -219,7 +219,7 @@ class ClientXMPP(basexmpp, XMLStream):
logging.info("Authentication failed.")
self.disconnect()
self.event("failed_auth")
def handler_bind_resource(self, xml):
logging.debug("Requesting resource: %s" % self.resource)
xml.clear()
@ -238,7 +238,7 @@ class ClientXMPP(basexmpp, XMLStream):
logging.debug("Established Session")
self.sessionstarted = True
self.event("session_start")
def handler_start_session(self, xml):
if self.authenticated and self.bound:
iq = self.makeIqSet(xml)
@ -249,7 +249,7 @@ class ClientXMPP(basexmpp, XMLStream):
else:
#bind probably hasn't happened yet
self.bindfail = True
def _handleRoster(self, iq, request=False):
if iq['type'] == 'set' or (iq['type'] == 'result' and request):
for jid in iq['roster']['items']:

View file

@ -123,7 +123,7 @@ class basexmpp(object):
# threaded is no longer needed, but leaving it for backwards compatibility for now
if name is None:
name = 'add_handler_%s' % self.getNewId()
self.registerHandler(XMLCallback(name, MatchXMLMask(mask), pointer, threaded, disposable, instream))
self.registerHandler(XMLCallback(name, MatchXMLMask(mask), pointer, once=disposable, instream=instream))
def getId(self):
return "%x".upper() % self.id

View file

@ -6,23 +6,82 @@
See the file LICENSE for copying permission.
"""
class BaseHandler(object):
"""
Base class for stream handlers. Stream handlers are matched with
incoming stanzas so that the stanza may be processed in some way.
Stanzas may be matched with multiple handlers.
def __init__(self, name, matcher):
self.name = name
self._destroy = False
self._payload = None
self._matcher = matcher
def match(self, xml):
return self._matcher.match(xml)
def prerun(self, payload):
self._payload = payload
Handler execution may take place in two phases. The first is during
the stream processing itself. The second is after stream processing
and during SleekXMPP's main event loop. The prerun method is used
for execution during stream processing, and the run method is used
during the main event loop.
def run(self, payload):
self._payload = payload
def checkDelete(self):
return self._destroy
Attributes:
name -- The name of the handler.
stream -- The stream this handler is assigned to.
Methods:
match -- Compare a stanza with the handler's matcher.
prerun -- Handler execution during stream processing.
run -- Handler execution during the main event loop.
checkDelete -- Indicate if the handler may be removed from use.
"""
def __init__(self, name, matcher, stream=None):
"""
Create a new stream handler.
Arguments:
name -- The name of the handler.
matcher -- A matcher object from xmlstream.matcher that will be
used to determine if a stanza should be accepted by
this handler.
stream -- The XMLStream instance the handler should monitor.
"""
self.name = name
self.stream = stream
self._destroy = False
self._payload = None
self._matcher = matcher
if stream is not None:
stream.registerHandler(self)
def match(self, xml):
"""
Compare a stanza or XML object with the handler's matcher.
Arguments
xml -- An XML or stanza object.
"""
return self._matcher.match(xml)
def prerun(self, payload):
"""
Prepare the handler for execution while the XML stream is being
processed.
Arguments:
payload -- A stanza object.
"""
self._payload = payload
def run(self, payload):
"""
Execute the handler after XML stream processing and during the
main event loop.
Arguments:
payload -- A stanza object.
"""
self._payload = payload
def checkDelete(self):
"""
Check if the handler should be removed from the list of stream
handlers.
"""
return self._destroy

View file

@ -5,30 +5,80 @@
See the file LICENSE for copying permission.
"""
from . import base
import logging
class Callback(base.BaseHandler):
def __init__(self, name, matcher, pointer, thread=False, once=False, instream=False):
base.BaseHandler.__init__(self, name, matcher)
self._pointer = pointer
self._thread = thread
self._once = once
self._instream = instream
from sleekxmpp.xmlstream.handler.base import BaseHandler
def prerun(self, payload):
base.BaseHandler.prerun(self, payload)
if self._instream:
self.run(payload, True)
def run(self, payload, instream=False):
if not self._instream or instream:
base.BaseHandler.run(self, payload)
#if self._thread:
# x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,))
# x.start()
#else:
self._pointer(payload)
if self._once:
self._destroy = True
class Callback(BaseHandler):
"""
The Callback handler will execute a callback function with
matched stanzas.
The handler may execute the callback either during stream
processing or during the main event loop.
Callback functions are all executed in the same thread, so be
aware if you are executing functions that will block for extended
periods of time. Typically, you should signal your own events using the
SleekXMPP object's event() method to pass the stanza off to a threaded
event handler for further processing.
Methods:
prerun -- Overrides BaseHandler.prerun
run -- Overrides BaseHandler.run
"""
def __init__(self, name, matcher, pointer, thread=False,
once=False, instream=False, stream=None):
"""
Create a new callback handler.
Arguments:
name -- The name of the handler.
matcher -- A matcher object for matching stanza objects.
pointer -- The function to execute during callback.
threaded -- DEPRECATED. Remains only for backwards compatibility.
once -- Indicates if the handler should be used only
once. Defaults to False.
instream -- Indicates if the callback should be executed
during stream processing instead of in the
main event loop.
stream -- The XMLStream instance this handler should monitor.
"""
BaseHandler.__init__(self, name, matcher, stream)
self._pointer = pointer
self._once = once
self._instream = instream
def prerun(self, payload):
"""
Execute the callback during stream processing, if
the callback was created with instream=True.
Overrides BaseHandler.prerun
Arguments:
payload -- The matched stanza object.
"""
BaseHandler.prerun(self, payload)
if self._instream:
self.run(payload, True)
def run(self, payload, instream=False):
"""
Execute the callback function with the matched stanza payload.
Overrides BaseHandler.run
Arguments:
payload -- The matched stanza object.
instream -- Force the handler to execute during
stream processing. Used only by prerun.
Defaults to False.
"""
if not self._instream or instream:
BaseHandler.run(self, payload)
self._pointer(payload)
if self._once:
self._destroy = True

View file

@ -5,32 +5,86 @@
See the file LICENSE for copying permission.
"""
from . import base
try:
import queue
except ImportError:
import Queue as queue
import logging
from .. stanzabase import StanzaBase
try:
import queue
except ImportError:
import Queue as queue
class Waiter(base.BaseHandler):
def __init__(self, name, matcher):
base.BaseHandler.__init__(self, name, matcher)
self._payload = queue.Queue()
def prerun(self, payload):
self._payload.put(payload)
def run(self, payload):
pass
from sleekxmpp.xmlstream import StanzaBase, RESPONSE_TIMEOUT
from sleekxmpp.xmlstream.handler.base import BaseHandler
def wait(self, timeout=60):
try:
return self._payload.get(True, timeout)
except queue.Empty:
logging.warning("Timed out waiting for %s" % self.name)
return False
def checkDelete(self):
return True
class Waiter(BaseHandler):
"""
The Waiter handler allows an event handler to block
until a particular stanza has been received. The handler
will either be given the matched stanza, or False if the
waiter has timed out.
Methods:
checkDelete -- Overrides BaseHandler.checkDelete
prerun -- Overrides BaseHandler.prerun
run -- Overrides BaseHandler.run
wait -- Wait for a stanza to arrive and return it to
an event handler.
"""
def __init__(self, name, matcher, stream=None):
BaseHandler.__init__(self, name, matcher)
self._payload = queue.Queue()
def prerun(self, payload):
"""
Store the matched stanza.
Overrides BaseHandler.prerun
Arguments:
payload -- The matched stanza object.
"""
self._payload.put(payload)
def run(self, payload):
"""
Do not process this handler during the main event loop.
Overrides BaseHandler.run
Arguments:
payload -- The matched stanza object.
"""
pass
def wait(self, timeout=RESPONSE_TIMEOUT):
"""
Block an event handler while waiting for a stanza to arrive.
Be aware that this will impact performance if called from a
non-threaded event handler.
Will return either the received stanza, or False if the waiter
timed out.
Arguments:
timeout -- The number of seconds to wait for the stanza to
arrive. Defaults to the global default timeout
value sleekxmpp.xmlstream.RESPONSE_TIMEOUT.
"""
try:
stanza = self._payload.get(True, timeout)
except queue.Empty:
stanza = False
logging.warning("Timed out waiting for %s" % self.name)
self.stream.removeHandler(self.name)
return stanza
def checkDelete(self):
"""
Always remove waiters after use.
Overrides BaseHandler.checkDelete
"""
return True

View file

@ -5,10 +5,32 @@
See the file LICENSE for copying permission.
"""
import threading
from . callback import Callback
from sleekxmpp.xmlstream.handler import Callback
class XMLCallback(Callback):
def run(self, payload, instream=False):
Callback.run(self, payload.xml, instream)
"""
The XMLCallback class is identical to the normal Callback class,
except that XML contents of matched stanzas will be processed instead
of the stanza objects themselves.
Methods:
run -- Overrides Callback.run
"""
def run(self, payload, instream=False):
"""
Execute the callback function with the matched stanza's
XML contents, instead of the stanza itself.
Overrides BaseHandler.run
Arguments:
payload -- The matched stanza object.
instream -- Force the handler to execute during
stream processing. Used only by prerun.
Defaults to False.
"""
Callback.run(self, payload.xml, instream)

View file

@ -5,9 +5,29 @@
See the file LICENSE for copying permission.
"""
from . waiter import Waiter
from sleekxmpp.xmlstream.handler import Waiter
class XMLWaiter(Waiter):
def prerun(self, payload):
Waiter.prerun(self, payload.xml)
"""
The XMLWaiter class is identical to the normal Waiter class
except that it returns the XML contents of the stanza instead
of the full stanza object itself.
Methods:
prerun -- Overrides Waiter.prerun
"""
def prerun(self, payload):
"""
Store the XML contents of the stanza to return to the
waiting event handler.
Overrides Waiter.prerun
Arguments:
payload -- The matched stanza object.
"""
Waiter.prerun(self, payload.xml)

View file

@ -357,8 +357,10 @@ class XMLStream(object):
return False
def registerHandler(self, handler, before=None, after=None):
"Add handler with matcher class and parameters."
self.__handlers.append(handler)
"Add handler with matcher class and parameters."
if handler.stream is None:
self.__handlers.append(handler)
handler.stream = self
def removeHandler(self, name):
"Removes the handler."
@ -366,8 +368,10 @@ class XMLStream(object):
for handler in self.__handlers:
if handler.name == name:
self.__handlers.pop(idx)
return
return True
idx += 1
return False
def registerStanza(self, stanza_class):
"Adds stanza. If root stanzas build stanzas sent in events while non-root stanzas build substanza objects."

112
tests/test_handlers.py Normal file
View file

@ -0,0 +1,112 @@
from . sleektest import *
import sleekxmpp
from sleekxmpp.xmlstream.handler import *
from sleekxmpp.xmlstream.matcher import *
class TestHandlers(SleekTest):
"""
Test that we can simulate and test a stanza stream.
"""
def setUp(self):
self.streamStart()
def tearDown(self):
self.streamClose()
def testCallback(self):
"""Test using stream callback handlers."""
def callback_handler(stanza):
self.xmpp.sendRaw("""
<message>
<body>Success!</body>
</message>
""")
callback = Callback('Test Callback',
MatchXPath('{test}tester'),
callback_handler)
self.xmpp.registerHandler(callback)
self.streamRecv("""<tester xmlns="test" />""")
msg = self.Message()
msg['body'] = 'Success!'
self.streamSendMessage(msg)
def testWaiter(self):
"""Test using stream waiter handler."""
def waiter_handler(stanza):
iq = self.xmpp.Iq()
iq['id'] = 'test'
iq['type'] = 'set'
iq['query'] = 'test'
reply = iq.send(block=True)
if reply:
self.xmpp.sendRaw("""
<message>
<body>Successful: %s</body>
</message>
""" % reply['query'])
self.xmpp.add_event_handler('message', waiter_handler, threaded=True)
# Send message to trigger waiter_handler
self.streamRecv("""
<message>
<body>Testing</body>
</message>
""")
# Check that Iq was sent by waiter_handler
iq = self.Iq()
iq['id'] = 'test'
iq['type'] = 'set'
iq['query'] = 'test'
self.streamSendIq(iq)
# Send the reply Iq
self.streamRecv("""
<iq id="test" type="result">
<query xmlns="test" />
</iq>
""")
# Check that waiter_handler received the reply
msg = self.Message()
msg['body'] = 'Successful: test'
self.streamSendMessage(msg)
def testWaiterTimeout(self):
"""Test that waiter handler is removed after timeout."""
def waiter_handler(stanza):
iq = self.xmpp.Iq()
iq['id'] = 'test2'
iq['type'] = 'set'
iq['query'] = 'test2'
reply = iq.send(block=True, timeout=0)
self.xmpp.add_event_handler('message', waiter_handler, threaded=True)
# Start test by triggerig waiter_handler
self.streamRecv("""<message><body>Start Test</body></message>""")
# Check that Iq was sent to trigger start of timeout period
iq = self.Iq()
iq['id'] = 'test2'
iq['type'] = 'set'
iq['query'] = 'test2'
self.streamSendIq(iq)
# Check that the waiter is no longer registered
waiter_exists = self.xmpp.removeHandler('IqWait_test2')
self.failUnless(waiter_exists == False,
"Waiter handler was not removed.")
suite = unittest.TestLoader().loadTestsFromTestCase(TestHandlers)