diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index d2f5765..afb7d9d 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -37,7 +37,7 @@ except ImportError: #class PresenceStanzaType(object): -# +# # def fromXML(self, xml): # self.ptype = xml.get('type') @@ -69,24 +69,24 @@ class ClientXMPP(basexmpp, XMLStream): self.bound = False self.bindfail = False self.is_component = False - self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True)) - self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True)) + self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures)) + self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster)) #self.registerHandler(Callback('Roster Update', MatchXMLMask("" % self.default_ns), self._handlePresenceSubscribe, thread=True)) self.registerFeature("", self.handler_starttls, True) self.registerFeature("", self.handler_sasl_auth, True) self.registerFeature("", self.handler_bind_resource) self.registerFeature("", self.handler_start_session) - + #self.registerStanzaExtension('PresenceStanza', PresenceStanzaType) #self.register_plugins() - + def __getitem__(self, key): if key in self.plugin: return self.plugin[key] else: logging.warning("""Plugin "%s" is not loaded.""" % key) return False - + def get(self, key, default): return self.plugin.get(key, default) @@ -104,7 +104,7 @@ class ClientXMPP(basexmpp, XMLStream): logging.debug("No appropriate SRV record found. Using JID server name.") else: # pick a random answer, weighted by priority - # there are less verbose ways of doing this (random.choice() with answer * priority), but I chose this way anyway + # there are less verbose ways of doing this (random.choice() with answer * priority), but I chose this way anyway # suggestions are welcome addresses = {} intmax = 0 @@ -128,18 +128,18 @@ class ClientXMPP(basexmpp, XMLStream): logging.warning("Failed to connect") self.event("disconnected") return result - + # overriding reconnect and disconnect so that we can get some events # should events be part of or required by xmlstream? Maybe that would be cleaner def reconnect(self): logging.info("Reconnecting") self.event("disconnected") XMLStream.reconnect(self) - + def disconnect(self, init=True, close=False, reconnect=False): self.event("disconnected") XMLStream.disconnect(self, reconnect) - + def registerFeature(self, mask, pointer, breaker = False): """Register a stream feature.""" self.registered_features.append((MatchXMLMask(mask), pointer, breaker)) @@ -157,12 +157,12 @@ class ClientXMPP(basexmpp, XMLStream): iq['type'] = 'set' iq['roster']['items'] = {jid: {'subscription': 'remove'}} return iq.send()['type'] == 'result' - + def getRoster(self): """Request the roster be sent.""" iq = self.Iq().setStanzaValues({'type': 'get'}).enable('roster').send() self._handleRoster(iq, request=True) - + def _handleStreamFeatures(self, features): self.features = [] for sub in features.xml: @@ -173,7 +173,7 @@ class ClientXMPP(basexmpp, XMLStream): #if self.maskcmp(subelement, feature[0], True): if feature[1](subelement) and feature[2]: #if breaker, don't continue return True - + def handler_starttls(self, xml): if not self.authenticated and self.ssl_support: self.add_handler("", self.handler_tls_start, name='TLS Proceed', instream=True) @@ -187,7 +187,7 @@ class ClientXMPP(basexmpp, XMLStream): logging.debug("Starting TLS") if self.startTLS(): raise RestartStream() - + def handler_sasl_auth(self, xml): if '{urn:ietf:params:xml:ns:xmpp-tls}starttls' in self.features: return False @@ -209,7 +209,7 @@ class ClientXMPP(basexmpp, XMLStream): #if 'sasl:DIGEST-MD5' in self.features: # self._auth_digestmd5() return True - + def handler_auth_success(self, xml): self.authenticated = True self.features = [] @@ -219,7 +219,7 @@ class ClientXMPP(basexmpp, XMLStream): logging.info("Authentication failed.") self.disconnect() self.event("failed_auth") - + def handler_bind_resource(self, xml): logging.debug("Requesting resource: %s" % self.resource) xml.clear() @@ -238,7 +238,7 @@ class ClientXMPP(basexmpp, XMLStream): logging.debug("Established Session") self.sessionstarted = True self.event("session_start") - + def handler_start_session(self, xml): if self.authenticated and self.bound: iq = self.makeIqSet(xml) @@ -249,7 +249,7 @@ class ClientXMPP(basexmpp, XMLStream): else: #bind probably hasn't happened yet self.bindfail = True - + def _handleRoster(self, iq, request=False): if iq['type'] == 'set' or (iq['type'] == 'result' and request): for jid in iq['roster']['items']: diff --git a/sleekxmpp/basexmpp.py b/sleekxmpp/basexmpp.py index b7b605b..f83fc06 100644 --- a/sleekxmpp/basexmpp.py +++ b/sleekxmpp/basexmpp.py @@ -123,7 +123,7 @@ class basexmpp(object): # threaded is no longer needed, but leaving it for backwards compatibility for now if name is None: name = 'add_handler_%s' % self.getNewId() - self.registerHandler(XMLCallback(name, MatchXMLMask(mask), pointer, threaded, disposable, instream)) + self.registerHandler(XMLCallback(name, MatchXMLMask(mask), pointer, once=disposable, instream=instream)) def getId(self): return "%x".upper() % self.id diff --git a/sleekxmpp/xmlstream/handler/base.py b/sleekxmpp/xmlstream/handler/base.py index 720846d..3ae82a8 100644 --- a/sleekxmpp/xmlstream/handler/base.py +++ b/sleekxmpp/xmlstream/handler/base.py @@ -6,23 +6,82 @@ See the file LICENSE for copying permission. """ + class BaseHandler(object): + """ + Base class for stream handlers. Stream handlers are matched with + incoming stanzas so that the stanza may be processed in some way. + Stanzas may be matched with multiple handlers. - def __init__(self, name, matcher): - self.name = name - self._destroy = False - self._payload = None - self._matcher = matcher - - def match(self, xml): - return self._matcher.match(xml) - - def prerun(self, payload): - self._payload = payload + Handler execution may take place in two phases. The first is during + the stream processing itself. The second is after stream processing + and during SleekXMPP's main event loop. The prerun method is used + for execution during stream processing, and the run method is used + during the main event loop. - def run(self, payload): - self._payload = payload - - def checkDelete(self): - return self._destroy + Attributes: + name -- The name of the handler. + stream -- The stream this handler is assigned to. + + Methods: + match -- Compare a stanza with the handler's matcher. + prerun -- Handler execution during stream processing. + run -- Handler execution during the main event loop. + checkDelete -- Indicate if the handler may be removed from use. + """ + + def __init__(self, name, matcher, stream=None): + """ + Create a new stream handler. + + Arguments: + name -- The name of the handler. + matcher -- A matcher object from xmlstream.matcher that will be + used to determine if a stanza should be accepted by + this handler. + stream -- The XMLStream instance the handler should monitor. + """ + self.name = name + self.stream = stream + self._destroy = False + self._payload = None + self._matcher = matcher + if stream is not None: + stream.registerHandler(self) + + def match(self, xml): + """ + Compare a stanza or XML object with the handler's matcher. + + Arguments + xml -- An XML or stanza object. + """ + return self._matcher.match(xml) + + def prerun(self, payload): + """ + Prepare the handler for execution while the XML stream is being + processed. + + Arguments: + payload -- A stanza object. + """ + self._payload = payload + + def run(self, payload): + """ + Execute the handler after XML stream processing and during the + main event loop. + + Arguments: + payload -- A stanza object. + """ + self._payload = payload + + def checkDelete(self): + """ + Check if the handler should be removed from the list of stream + handlers. + """ + return self._destroy diff --git a/sleekxmpp/xmlstream/handler/callback.py b/sleekxmpp/xmlstream/handler/callback.py index 889b0aa..04a4eed 100644 --- a/sleekxmpp/xmlstream/handler/callback.py +++ b/sleekxmpp/xmlstream/handler/callback.py @@ -5,30 +5,80 @@ See the file LICENSE for copying permission. """ -from . import base -import logging -class Callback(base.BaseHandler): - - def __init__(self, name, matcher, pointer, thread=False, once=False, instream=False): - base.BaseHandler.__init__(self, name, matcher) - self._pointer = pointer - self._thread = thread - self._once = once - self._instream = instream +from sleekxmpp.xmlstream.handler.base import BaseHandler - def prerun(self, payload): - base.BaseHandler.prerun(self, payload) - if self._instream: - self.run(payload, True) - - def run(self, payload, instream=False): - if not self._instream or instream: - base.BaseHandler.run(self, payload) - #if self._thread: - # x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,)) - # x.start() - #else: - self._pointer(payload) - if self._once: - self._destroy = True + +class Callback(BaseHandler): + + """ + The Callback handler will execute a callback function with + matched stanzas. + + The handler may execute the callback either during stream + processing or during the main event loop. + + Callback functions are all executed in the same thread, so be + aware if you are executing functions that will block for extended + periods of time. Typically, you should signal your own events using the + SleekXMPP object's event() method to pass the stanza off to a threaded + event handler for further processing. + + Methods: + prerun -- Overrides BaseHandler.prerun + run -- Overrides BaseHandler.run + """ + + def __init__(self, name, matcher, pointer, thread=False, + once=False, instream=False, stream=None): + """ + Create a new callback handler. + + Arguments: + name -- The name of the handler. + matcher -- A matcher object for matching stanza objects. + pointer -- The function to execute during callback. + threaded -- DEPRECATED. Remains only for backwards compatibility. + once -- Indicates if the handler should be used only + once. Defaults to False. + instream -- Indicates if the callback should be executed + during stream processing instead of in the + main event loop. + stream -- The XMLStream instance this handler should monitor. + """ + BaseHandler.__init__(self, name, matcher, stream) + self._pointer = pointer + self._once = once + self._instream = instream + + def prerun(self, payload): + """ + Execute the callback during stream processing, if + the callback was created with instream=True. + + Overrides BaseHandler.prerun + + Arguments: + payload -- The matched stanza object. + """ + BaseHandler.prerun(self, payload) + if self._instream: + self.run(payload, True) + + def run(self, payload, instream=False): + """ + Execute the callback function with the matched stanza payload. + + Overrides BaseHandler.run + + Arguments: + payload -- The matched stanza object. + instream -- Force the handler to execute during + stream processing. Used only by prerun. + Defaults to False. + """ + if not self._instream or instream: + BaseHandler.run(self, payload) + self._pointer(payload) + if self._once: + self._destroy = True diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py index 7c4330a..0e5206b 100644 --- a/sleekxmpp/xmlstream/handler/waiter.py +++ b/sleekxmpp/xmlstream/handler/waiter.py @@ -5,32 +5,86 @@ See the file LICENSE for copying permission. """ -from . import base -try: - import queue -except ImportError: - import Queue as queue + import logging -from .. stanzabase import StanzaBase +try: + import queue +except ImportError: + import Queue as queue -class Waiter(base.BaseHandler): - - def __init__(self, name, matcher): - base.BaseHandler.__init__(self, name, matcher) - self._payload = queue.Queue() - - def prerun(self, payload): - self._payload.put(payload) - - def run(self, payload): - pass +from sleekxmpp.xmlstream import StanzaBase, RESPONSE_TIMEOUT +from sleekxmpp.xmlstream.handler.base import BaseHandler - def wait(self, timeout=60): - try: - return self._payload.get(True, timeout) - except queue.Empty: - logging.warning("Timed out waiting for %s" % self.name) - return False - - def checkDelete(self): - return True + +class Waiter(BaseHandler): + + """ + The Waiter handler allows an event handler to block + until a particular stanza has been received. The handler + will either be given the matched stanza, or False if the + waiter has timed out. + + Methods: + checkDelete -- Overrides BaseHandler.checkDelete + prerun -- Overrides BaseHandler.prerun + run -- Overrides BaseHandler.run + wait -- Wait for a stanza to arrive and return it to + an event handler. + """ + + def __init__(self, name, matcher, stream=None): + BaseHandler.__init__(self, name, matcher) + self._payload = queue.Queue() + + def prerun(self, payload): + """ + Store the matched stanza. + + Overrides BaseHandler.prerun + + Arguments: + payload -- The matched stanza object. + """ + self._payload.put(payload) + + def run(self, payload): + """ + Do not process this handler during the main event loop. + + Overrides BaseHandler.run + + Arguments: + payload -- The matched stanza object. + """ + pass + + def wait(self, timeout=RESPONSE_TIMEOUT): + """ + Block an event handler while waiting for a stanza to arrive. + + Be aware that this will impact performance if called from a + non-threaded event handler. + + Will return either the received stanza, or False if the waiter + timed out. + + Arguments: + timeout -- The number of seconds to wait for the stanza to + arrive. Defaults to the global default timeout + value sleekxmpp.xmlstream.RESPONSE_TIMEOUT. + """ + try: + stanza = self._payload.get(True, timeout) + except queue.Empty: + stanza = False + logging.warning("Timed out waiting for %s" % self.name) + self.stream.removeHandler(self.name) + return stanza + + def checkDelete(self): + """ + Always remove waiters after use. + + Overrides BaseHandler.checkDelete + """ + return True diff --git a/sleekxmpp/xmlstream/handler/xmlcallback.py b/sleekxmpp/xmlstream/handler/xmlcallback.py index 67879df..11607ff 100644 --- a/sleekxmpp/xmlstream/handler/xmlcallback.py +++ b/sleekxmpp/xmlstream/handler/xmlcallback.py @@ -5,10 +5,32 @@ See the file LICENSE for copying permission. """ -import threading -from . callback import Callback + +from sleekxmpp.xmlstream.handler import Callback + class XMLCallback(Callback): - - def run(self, payload, instream=False): - Callback.run(self, payload.xml, instream) + + """ + The XMLCallback class is identical to the normal Callback class, + except that XML contents of matched stanzas will be processed instead + of the stanza objects themselves. + + Methods: + run -- Overrides Callback.run + """ + + def run(self, payload, instream=False): + """ + Execute the callback function with the matched stanza's + XML contents, instead of the stanza itself. + + Overrides BaseHandler.run + + Arguments: + payload -- The matched stanza object. + instream -- Force the handler to execute during + stream processing. Used only by prerun. + Defaults to False. + """ + Callback.run(self, payload.xml, instream) diff --git a/sleekxmpp/xmlstream/handler/xmlwaiter.py b/sleekxmpp/xmlstream/handler/xmlwaiter.py index cf90751..5201caf 100644 --- a/sleekxmpp/xmlstream/handler/xmlwaiter.py +++ b/sleekxmpp/xmlstream/handler/xmlwaiter.py @@ -5,9 +5,29 @@ See the file LICENSE for copying permission. """ -from . waiter import Waiter + +from sleekxmpp.xmlstream.handler import Waiter + class XMLWaiter(Waiter): - - def prerun(self, payload): - Waiter.prerun(self, payload.xml) + + """ + The XMLWaiter class is identical to the normal Waiter class + except that it returns the XML contents of the stanza instead + of the full stanza object itself. + + Methods: + prerun -- Overrides Waiter.prerun + """ + + def prerun(self, payload): + """ + Store the XML contents of the stanza to return to the + waiting event handler. + + Overrides Waiter.prerun + + Arguments: + payload -- The matched stanza object. + """ + Waiter.prerun(self, payload.xml) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 415567e..28aee2b 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -357,8 +357,10 @@ class XMLStream(object): return False def registerHandler(self, handler, before=None, after=None): - "Add handler with matcher class and parameters." - self.__handlers.append(handler) + "Add handler with matcher class and parameters." + if handler.stream is None: + self.__handlers.append(handler) + handler.stream = self def removeHandler(self, name): "Removes the handler." @@ -366,8 +368,10 @@ class XMLStream(object): for handler in self.__handlers: if handler.name == name: self.__handlers.pop(idx) - return + return True idx += 1 + return False + def registerStanza(self, stanza_class): "Adds stanza. If root stanzas build stanzas sent in events while non-root stanzas build substanza objects." diff --git a/tests/test_handlers.py b/tests/test_handlers.py new file mode 100644 index 0000000..c6262c6 --- /dev/null +++ b/tests/test_handlers.py @@ -0,0 +1,112 @@ +from . sleektest import * +import sleekxmpp +from sleekxmpp.xmlstream.handler import * +from sleekxmpp.xmlstream.matcher import * + +class TestHandlers(SleekTest): + """ + Test that we can simulate and test a stanza stream. + """ + + def setUp(self): + self.streamStart() + + def tearDown(self): + self.streamClose() + + def testCallback(self): + """Test using stream callback handlers.""" + + def callback_handler(stanza): + self.xmpp.sendRaw(""" + + Success! + + """) + + callback = Callback('Test Callback', + MatchXPath('{test}tester'), + callback_handler) + + self.xmpp.registerHandler(callback) + + self.streamRecv("""""") + + msg = self.Message() + msg['body'] = 'Success!' + self.streamSendMessage(msg) + + def testWaiter(self): + """Test using stream waiter handler.""" + + def waiter_handler(stanza): + iq = self.xmpp.Iq() + iq['id'] = 'test' + iq['type'] = 'set' + iq['query'] = 'test' + reply = iq.send(block=True) + if reply: + self.xmpp.sendRaw(""" + + Successful: %s + + """ % reply['query']) + + self.xmpp.add_event_handler('message', waiter_handler, threaded=True) + + # Send message to trigger waiter_handler + self.streamRecv(""" + + Testing + + """) + + # Check that Iq was sent by waiter_handler + iq = self.Iq() + iq['id'] = 'test' + iq['type'] = 'set' + iq['query'] = 'test' + self.streamSendIq(iq) + + # Send the reply Iq + self.streamRecv(""" + + + + """) + + # Check that waiter_handler received the reply + msg = self.Message() + msg['body'] = 'Successful: test' + self.streamSendMessage(msg) + + def testWaiterTimeout(self): + """Test that waiter handler is removed after timeout.""" + + def waiter_handler(stanza): + iq = self.xmpp.Iq() + iq['id'] = 'test2' + iq['type'] = 'set' + iq['query'] = 'test2' + reply = iq.send(block=True, timeout=0) + + self.xmpp.add_event_handler('message', waiter_handler, threaded=True) + + # Start test by triggerig waiter_handler + self.streamRecv("""Start Test""") + + # Check that Iq was sent to trigger start of timeout period + iq = self.Iq() + iq['id'] = 'test2' + iq['type'] = 'set' + iq['query'] = 'test2' + self.streamSendIq(iq) + + # Check that the waiter is no longer registered + waiter_exists = self.xmpp.removeHandler('IqWait_test2') + + self.failUnless(waiter_exists == False, + "Waiter handler was not removed.") + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestHandlers)