From 9a34c9a9a1cb4ee6e34089f9e2aa204f7ed8c0c0 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 1 Oct 2010 13:49:58 -0400 Subject: [PATCH] Modified event handling to use the event queue. Updated tests to match. (Needed to add a small wait to make sure the event got through the queue before checking the results.) --- sleekxmpp/xmlstream/xmlstream.py | 39 +++++++++----- tests/test_events.py | 90 +++++++++++++++++++++++--------- 2 files changed, 91 insertions(+), 38 deletions(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 401865c..948b19d 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -398,6 +398,11 @@ class XMLStream(object): stream processing and not during normal event processing. """ + # To prevent circular dependencies, we must load the matcher + # and handler classes here. + from sleekxmpp.xmlstream.matcher import MatchXMLMask + from sleekxmpp.xmlstream.handler import XMLCallback + if name is None: name = 'add_handler_%s' % self.getNewId() self.registerHandler(XMLCallback(name, MatchXMLMask(mask), pointer, @@ -478,19 +483,13 @@ class XMLStream(object): Defaults to an empty dictionary. """ for handler in self.__event_handlers.get(name, []): - func, threaded, disposable = handler - - handler_data = copy.copy(data) - if threaded: - x = threading.Thread(name="Event_%s" % str(func), - target=func, - args=(handler_data,)) - x.start() - else: - func(handler_data) - if disposable: + self.event_queue.put(('event', handler, copy.copy(data))) + if handler[2]: + # If the handler is disposable, we will go ahead and + # remove it now instead of waiting for it to be + # processed in the queue. with self.__event_handlers_lock: - handler_index = self.event_handlers[name].index(handler) + handler_index = self.__event_handlers[name].index(handler) self.__event_handlers[name].pop(handler_index) def schedule(self, name, seconds, callback, args=None, @@ -522,7 +521,7 @@ class XMLStream(object): """ return xml - def send(self, data, mask, timeout=RESPONSE_TIMEOUT): + def send(self, data, mask=None, timeout=RESPONSE_TIMEOUT): """ A wrapper for send_raw for sending stanza objects. @@ -772,7 +771,7 @@ class XMLStream(object): try: handler.run(args[0]) except Exception as e: - error_msg = 'Error processing event handler: %s' + error_msg = 'Error processing stream handler: %s' logging.exception(error_msg % handler.name) args[0].exception(e) elif etype == 'schedule': @@ -781,6 +780,18 @@ class XMLStream(object): handler(*args[0]) except: logging.exception('Error processing scheduled task') + elif etype == 'event': + func, threaded, disposable = handler + try: + if threaded: + x = threading.Thread(name="Event_%s" % str(func), + target=func, + args=args) + x.start() + else: + func(*args) + except: + logging.exception('Error processing event handler: %s') elif etype == 'quit': logging.debug("Quitting event runner thread") return False diff --git a/tests/test_events.py b/tests/test_events.py index bbc5832..df36969 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -1,31 +1,73 @@ import sleekxmpp +import time from . sleektest import * class TestEvents(SleekTest): - - def testEventHappening(self): - "Test handler working" - c = sleekxmpp.ClientXMPP('crap@wherever', 'password') - happened = [] - def handletestevent(event): - happened.append(True) - c.add_event_handler("test_event", handletestevent) - c.event("test_event", {}) - c.event("test_event", {}) - self.failUnless(happened == [True, True], "event did not get triggered twice") - - def testDelEvent(self): - "Test handler working, then deleted and not triggered" - c = sleekxmpp.ClientXMPP('crap@wherever', 'password') - happened = [] - def handletestevent(event): - happened.append(True) - c.add_event_handler("test_event", handletestevent) - c.event("test_event", {}) - c.del_event_handler("test_event", handletestevent) - c.event("test_event", {}) # should not trigger because it was deleted - self.failUnless(happened == [True], "event did not get triggered the correct number of times") - + + def setUp(self): + self.streamStart() + + def tearDown(self): + self.streamClose() + + def testEventHappening(self): + """Test handler working""" + happened = [] + + def handletestevent(event): + happened.append(True) + + self.xmpp.add_event_handler("test_event", handletestevent) + self.xmpp.event("test_event") + self.xmpp.event("test_event") + + # Give the event queue time to process. + time.sleep(0.1) + + msg = "Event was not triggered the correct number of times: %s" + self.failUnless(happened == [True, True], msg) + + def testDelEvent(self): + """Test handler working, then deleted and not triggered""" + happened = [] + + def handletestevent(event): + happened.append(True) + + self.xmpp.add_event_handler("test_event", handletestevent) + self.xmpp.event("test_event", {}) + + self.xmpp.del_event_handler("test_event", handletestevent) + + # Should not trigger because it was deleted + self.xmpp.event("test_event", {}) + + # Give the event queue time to process. + time.sleep(0.1) + + msg = "Event was not triggered the correct number of times: %s" + self.failUnless(happened == [True], msg % happened) + + def testDisposableEvent(self): + """Test disposable handler working, then not being triggered again.""" + happened = [] + + def handletestevent(event): + happened.append(True) + + self.xmpp.add_event_handler("test_event", handletestevent, + disposable=True) + self.xmpp.event("test_event", {}) + + # Should not trigger because it was deleted + self.xmpp.event("test_event", {}) + + # Give the event queue time to process. + time.sleep(0.1) + + msg = "Event was not triggered the correct number of times: %s" + self.failUnless(happened == [True], msg % happened) + suite = unittest.TestLoader().loadTestsFromTestCase(TestEvents)