From e1aa4d0b93f09b92506af8bb21e87c367905e194 Mon Sep 17 00:00:00 2001 From: Brian Beggs Date: Wed, 2 Jun 2010 19:34:43 +0800 Subject: [PATCH 01/18] Added .pydevproject to the .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 15997ce..8b41dac 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ .project build/ *.swp +.pydevproject From 184f7cb8a4d2835c317eca7db129fb06d92a336a Mon Sep 17 00:00:00 2001 From: Brian Beggs Date: Wed, 2 Jun 2010 20:28:49 +0800 Subject: [PATCH 02/18] moddified plugin loading so plugins located outside of the plugins directory in sleek may be loaded. Added optional argument pluginModule that is a string that represents the module the desired plugin should be loaded from. An exception on plugin loading now also will not cause the program to exit. The exception is caught and loading of other plugins contains. --- .gitignore | 1 + sleekxmpp/basexmpp.py | 17 ++++++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 8b41dac..6257bbf 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ build/ *.swp .pydevproject +.settings diff --git a/sleekxmpp/basexmpp.py b/sleekxmpp/basexmpp.py index a916fe8..5f33017 100644 --- a/sleekxmpp/basexmpp.py +++ b/sleekxmpp/basexmpp.py @@ -1,9 +1,9 @@ """ - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of SleekXMPP. + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. - See the file license.txt for copying permission. + See the file license.txt for copying permission. """ from __future__ import with_statement, unicode_literals @@ -91,15 +91,18 @@ class basexmpp(object): if not self.plugin[idx].post_inited: self.plugin[idx].post_init() return super(basexmpp, self).process(*args, **kwargs) - def registerPlugin(self, plugin, pconfig = {}): + def registerPlugin(self, plugin, pconfig = {}, pluginModule = None): """Register a plugin not in plugins.__init__.__all__ but in the plugins directory.""" # discover relative "path" to the plugins module from the main app, and import it. # TODO: # gross, this probably isn't necessary anymore, especially for an installed module - __import__("%s.%s" % (globals()['plugins'].__name__, plugin)) + if pluginModule: + module = __import__("%s.%s" % (pluginModule, plugin), globals(), locals(), [plugin]) + else: + module = __import__("%s.%s" % (globals()['plugins'].__name__, plugin), globals(), locals(), [plugin]) # init the plugin class - self.plugin[plugin] = getattr(getattr(plugins, plugin), plugin)(self, pconfig) # eek + self.plugin[plugin] = getattr(module, plugin)(self, pconfig) # eek # all of this for a nice debug? sure. xep = '' if hasattr(self.plugin[plugin], 'xep'): From 2a43f59a588af7206b0d1935e85fbb9577b3c200 Mon Sep 17 00:00:00 2001 From: Brian Beggs Date: Wed, 2 Jun 2010 20:45:42 +0800 Subject: [PATCH 03/18] added try/catch block to plugin loading --- sleekxmpp/basexmpp.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/sleekxmpp/basexmpp.py b/sleekxmpp/basexmpp.py index 5f33017..b78c255 100644 --- a/sleekxmpp/basexmpp.py +++ b/sleekxmpp/basexmpp.py @@ -97,18 +97,21 @@ class basexmpp(object): # discover relative "path" to the plugins module from the main app, and import it. # TODO: # gross, this probably isn't necessary anymore, especially for an installed module - if pluginModule: - module = __import__("%s.%s" % (pluginModule, plugin), globals(), locals(), [plugin]) - else: - module = __import__("%s.%s" % (globals()['plugins'].__name__, plugin), globals(), locals(), [plugin]) - # init the plugin class - self.plugin[plugin] = getattr(module, plugin)(self, pconfig) # eek - # all of this for a nice debug? sure. - xep = '' - if hasattr(self.plugin[plugin], 'xep'): - xep = "(XEP-%s) " % self.plugin[plugin].xep - logging.debug("Loaded Plugin %s%s" % (xep, self.plugin[plugin].description)) - + try: + if pluginModule: + module = __import__(pluginModule, globals(), locals(), [plugin]) + else: + module = __import__("%s.%s" % (globals()['plugins'].__name__, plugin), globals(), locals(), [plugin]) + # init the plugin class + self.plugin[plugin] = getattr(module, plugin)(self, pconfig) # eek + # all of this for a nice debug? sure. + xep = '' + if hasattr(self.plugin[plugin], 'xep'): + xep = "(XEP-%s) " % self.plugin[plugin].xep + logging.debug("Loaded Plugin %s%s" % (xep, self.plugin[plugin].description)) + except: + logging.error("Unable to load plugin: %s" %(plugin) ) + def register_plugins(self): """Initiates all plugins in the plugins/__init__.__all__""" if self.plugin_whitelist: From b03e6168a8131330390df35572345bfe07ba73fa Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Wed, 2 Jun 2010 12:40:52 +0800 Subject: [PATCH 04/18] if binding and session are advertised in the same go, do session first --- sleekxmpp/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index 6995f7c..9297ad3 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -69,6 +69,7 @@ class ClientXMPP(basexmpp, XMLStream): #TODO: Use stream state here self.authenticated = False self.sessionstarted = False + self.bound = False self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True)) self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True)) #self.registerHandler(Callback('Roster Update', MatchXMLMask("" % self.default_ns), self._handlePresenceSubscribe, thread=True)) @@ -248,6 +249,7 @@ class ClientXMPP(basexmpp, XMLStream): response = iq.send() #response = self.send(iq, self.Iq(sid=iq['id'])) self.set_jid(response.xml.find('{urn:ietf:params:xml:ns:xmpp-bind}bind/{urn:ietf:params:xml:ns:xmpp-bind}jid').text) + self.bound = True logging.info("Node set to: %s" % self.fulljid) if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features: logging.debug("Established Session") @@ -255,7 +257,7 @@ class ClientXMPP(basexmpp, XMLStream): self.event("session_start") def handler_start_session(self, xml): - if self.authenticated: + if self.authenticated and self.bound: iq = self.makeIqSet(xml) response = iq.send() logging.debug("Established Session") From 85a2715c7de1f436b5553a73f74c30cffa8bc04f Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Wed, 2 Jun 2010 12:44:54 +0800 Subject: [PATCH 05/18] hack fix for session before bind --- sleekxmpp/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index 9297ad3..23e8c9b 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -70,6 +70,7 @@ class ClientXMPP(basexmpp, XMLStream): self.authenticated = False self.sessionstarted = False self.bound = False + self.bindfail = False self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True)) self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True)) #self.registerHandler(Callback('Roster Update', MatchXMLMask("" % self.default_ns), self._handlePresenceSubscribe, thread=True)) @@ -251,7 +252,7 @@ class ClientXMPP(basexmpp, XMLStream): self.set_jid(response.xml.find('{urn:ietf:params:xml:ns:xmpp-bind}bind/{urn:ietf:params:xml:ns:xmpp-bind}jid').text) self.bound = True logging.info("Node set to: %s" % self.fulljid) - if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features: + if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features or self.bindfail: logging.debug("Established Session") self.sessionstarted = True self.event("session_start") @@ -263,6 +264,9 @@ class ClientXMPP(basexmpp, XMLStream): logging.debug("Established Session") self.sessionstarted = True self.event("session_start") + else: + #bind probably hasn't happened yet + self.bindfail = True def _handleRoster(self, iq, request=False): if iq['type'] == 'set' or (iq['type'] == 'result' and request): From 8227affd7f30a5fb8c6b215d00e42bc15df49d36 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Wed, 2 Jun 2010 14:17:36 -0400 Subject: [PATCH 06/18] removed unnecessary flags and arguments from disconnect method --- sleekxmpp/__init__.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index 6995f7c..56ba45e 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -146,13 +146,9 @@ class ClientXMPP(basexmpp, XMLStream): # overriding reconnect and disconnect so that we can get some events # should events be part of or required by xmlstream? Maybe that would be cleaner def reconnect(self): - logging.info("Reconnecting") - self.event("disconnected") - self.authenticated = False - self.sessionstarted = False - XMLStream.reconnect(self) + self.disconnect(reconnect=True) - def disconnect(self, init=True, close=False, reconnect=False): + def disconnect(self, reconnect=False): self.event("disconnected") self.authenticated = False self.sessionstarted = False From 4295a66c70e595d02e14f384432d8eee8dfef013 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Wed, 2 Jun 2010 14:18:09 -0400 Subject: [PATCH 07/18] reconnection quiesce logic --- sleekxmpp/xmlstream/xmlstream.py | 36 ++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 3bcb341..e0b3a23 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -16,6 +16,7 @@ from . stanzabase import StanzaBase from xml.etree import cElementTree from xml.parsers import expat import logging +import random import socket import threading import time @@ -46,6 +47,10 @@ class CloseStream(Exception): stanza_extensions = {} +RECONNECT_MAX_DELAY = 3600 +RECONNECT_QUIESCE_FACTOR = 1.6180339887498948 # Phi +RECONNECT_QUIESCE_JITTER = 0.11962656472 # molar Planck constant times c, joule meter/mole + class XMLStream(object): "A connection manager with XML events." @@ -101,7 +106,12 @@ class XMLStream(object): def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True): "Connect and create socket" - while reattempt and not self.state['connected']: # the self.state part is redundant. + + # Note that this is thread-safe by merit of being called solely from connect() which + # holds the state lock. + + delay = 1.0 # reconnection delay + while self.run: logging.debug('connecting....') try: if host and port: @@ -115,21 +125,35 @@ class XMLStream(object): else: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(None) #10) + if self.use_ssl and self.ssl_support: logging.debug("Socket Wrapped for SSL") self.socket = ssl.wrap_socket(self.socket,ca_certs=self.ca_certs) - except: - logging.exception("Connection error") - try: + self.socket.connect(self.address) self.filesocket = self.socket.makefile('rb', 0) + if not self.state.transition('connecting','connected'): logging.error( "State transition error!!!! Shouldn't have happened" ) logging.debug('connect complete.') return True + except socket.error as serr: - logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror)) - time.sleep(1) # TODO proper quiesce if connection attempt fails + logging.exception("Socket Error #%s: %s", serr.errno, serr.strerror) + if not reattempt: return False + except: + logging.exception("Connection error") + if not reattempt: return False + + # quiesce if rconnection fails: + # This code based loosely on Twisted internet.protocol + # http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310 + delay = min(delay * RECONNECT_QUIESCE_JITTER, RECONNECT_MAX_DELAY) + delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER) + logging.debug('Waiting %fs until next reconnect attempt...', delay) + time.sleep(delay) + + def connectUnix(self, filepath): "Connect to Unix file and create socket" From 1c32668e18c0cbf840694c66a6f724b2d3cb8f29 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Thu, 3 Jun 2010 07:47:27 -0400 Subject: [PATCH 08/18] fixed quiesce algorithm; state transition if connect fails; note about use_tls instance variable. --- sleekxmpp/xmlstream/xmlstream.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index e0b3a23..af95c2b 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -101,8 +101,16 @@ class XMLStream(object): def connect(self, host='', port=0, use_ssl=None, use_tls=None): "Link to connectTCP" - if self.state.transition('disconnected', 'connecting'): - return self.connectTCP(host, port, use_ssl, use_tls) + if not self.connectTCP(host, port, use_ssl, use_tls): + # return to the 'disconnected' state if connect failed: + # otherwise the connect method is not reentrant + if not self.state.transition('connecting','disconnected'): + logging.error("Couldn't transition to the 'disconnected' state!") + return False + return True + + # TODO currently a caller can't distinguish between "connection failed" and + # "we're already trying to connect from another thread" def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True): "Connect and create socket" @@ -119,6 +127,7 @@ class XMLStream(object): if use_ssl is not None: self.use_ssl = use_ssl if use_tls is not None: + # TODO this variable doesn't seem to be used for anything! self.use_tls = use_tls if sys.version_info < (3, 0): self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM) @@ -146,9 +155,9 @@ class XMLStream(object): if not reattempt: return False # quiesce if rconnection fails: - # This code based loosely on Twisted internet.protocol + # This algorithm based loosely on Twisted internet.protocol # http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310 - delay = min(delay * RECONNECT_QUIESCE_JITTER, RECONNECT_MAX_DELAY) + delay = min(delay * RECONNECT_QUIESCE_FACTOR, RECONNECT_MAX_DELAY) delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER) logging.debug('Waiting %fs until next reconnect attempt...', delay) time.sleep(delay) From 2f0f18a8c62db21fa3ba8cc87a6d190aca205c30 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Thu, 3 Jun 2010 08:07:56 -0400 Subject: [PATCH 09/18] added function to retrieve the current state --- sleekxmpp/xmlstream/statemachine.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py index c5f5176..065d579 100644 --- a/sleekxmpp/xmlstream/statemachine.py +++ b/sleekxmpp/xmlstream/statemachine.py @@ -110,7 +110,14 @@ class StateMachine(object): def reset(self): # TODO need to lock before calling this? self.transition(self.__current_state, self._default_state) - + + + def current_state(self): + ''' + Return the current state name. + ''' + return self.__current_state + def __getitem__(self, state): ''' From da6e1e47dc81f5f9579201644d7c18dd85510368 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Thu, 3 Jun 2010 08:09:09 -0400 Subject: [PATCH 10/18] whups, somehow I lost the 'connecting' lock in connect() --- sleekxmpp/xmlstream/xmlstream.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index af95c2b..76aecee 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -101,6 +101,10 @@ class XMLStream(object): def connect(self, host='', port=0, use_ssl=None, use_tls=None): "Link to connectTCP" + if not self.state.transition('disconnected','connecting'): + logging.warning("Can't connect now; Already in state %s", self.state.current_state()) + return False + if not self.connectTCP(host, port, use_ssl, use_tls): # return to the 'disconnected' state if connect failed: # otherwise the connect method is not reentrant From d20cd6b3e6a7edacfef6010339d7fb7b587a9def Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Thu, 3 Jun 2010 13:51:11 -0400 Subject: [PATCH 11/18] added function execution on transition, and more unit tests. --- sleekxmpp/xmlstream/xmlstream.py | 3 +- tests/test_statemachine.py | 91 +++++++++++++++++++++++++++++++- 2 files changed, 91 insertions(+), 3 deletions(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 76aecee..c407a33 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -281,7 +281,7 @@ class XMLStream(object): data = None try: - data = self.sendqueue.get(True,10) + data = self.sendqueue.get(True,5) logging.debug("SEND: %s" % data) self.socket.sendall(data.encode('utf-8')) except queue.Empty: @@ -372,6 +372,7 @@ class XMLStream(object): try: event = self.eventqueue.get(True, timeout=5) except queue.Empty: + logging.debug('Nothing on event queue') event = None if event is not None: etype = event[0] diff --git a/tests/test_statemachine.py b/tests/test_statemachine.py index 6749c8d..fec3109 100644 --- a/tests/test_statemachine.py +++ b/tests/test_statemachine.py @@ -1,5 +1,5 @@ import unittest -import time, threading +import time, threading, random, functools if __name__ == '__main__': import sys, os @@ -83,12 +83,12 @@ class testStateMachine(unittest.TestCase): thread_state = {'ready': False, 'transitioned': False} def t1(): - # this will block until the main thread transitions to 'two' if s['two']: print 'thread has already transitioned!' self.fail() thread_state['ready'] = True print 'Thread is ready' + # this will block until the main thread transitions to 'two' self.assertTrue( s.transition('two','three', wait=20) ) print 'transitioned to three!' thread_state['transitioned'] = True @@ -109,6 +109,93 @@ class testStateMachine(unittest.TestCase): self.assertTrue( thread_state['transitioned'] ) + def testForRaceCondition(self): + """Attempt to allow two threads to perform the same transition; + only one should ever make it.""" + + s = sm.StateMachine(('one','two','three')) + + def t1(num): + while True: + if not trigger['go'] or thread_state[num] in (True,False): + time.sleep( random.random()/100 ) # < .01s + if thread_state[num] == 'quit': break + continue + + thread_state[num] = s.transition('one','two' ) +# print '-', + + thread_count = 20 + threads = [] + thread_state = {} + def reset(): + for c in range(thread_count): thread_state[c] = "reset" + trigger = {'go':False} # use of a plain boolean seems to be non-volatile between threads. + + for c in range(thread_count): + thread_state[c] = "reset" + thread = threading.Thread( target= functools.partial(t1,c) ) + threads.append( thread ) + thread.daemon = True + thread.start() + + for x in range(100): # this will take 10s to execute +# print "+", + trigger['go'] = True + time.sleep(.1) + trigger['go'] = False + winners = 0 + for (num, state) in thread_state.items(): + if state == True: winners = winners +1 + elif state != False: raise Exception( "!%d!%s!" % (num,state) ) + + self.assertEqual( 1, winners, "Expected one winner! %d" % winners ) + self.assertTrue( s.ensure('two') ) + self.assertTrue( s.transition('two','one') ) # return to the first state. + reset() + + # now let the threads quit gracefully: + for c in range(thread_count): thread_state[c] = 'quit' + time.sleep(2) + + + def testTransitionFunctions(self): + "test that a `func` argument allows or blocks the transition correctly." + + s = sm.StateMachine(('one','two','three')) + + def alwaysFalse(): return False + def alwaysTrue(): return True + + self.failIf( s.transition('one','two', func=alwaysFalse) ) + self.assertTrue(s['one']) + self.failIf(s['two']) + + self.assertTrue( s.transition('one','two', func=alwaysTrue) ) + self.failIf(s['one']) + self.assertTrue(s['two']) + + + def testTransitionFuncException(self): + "if a transition function throws an exeption, ensure we're in a sane state" + + s = sm.StateMachine(('one','two','three')) + + def alwaysException(): raise Exception('whups!') + + try: + self.failIf( s.transition('one','two', func=alwaysException) ) + self.fail("exception should have been thrown") + except: pass #expected exception + + self.assertTrue(s['one']) + self.failIf(s['two']) + + # ensure a subsequent attempt completes normally: + self.assertTrue( s.transition('one','two') ) + self.failIf(s['one']) + self.assertTrue(s['two']) + suite = unittest.TestLoader().loadTestsFromTestCase(testStateMachine) From f54501a3465b84f1be157c1ba99df854dbeaed1d Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Thu, 3 Jun 2010 14:12:06 -0400 Subject: [PATCH 12/18] added function execution on transition, and more unit tests. --- sleekxmpp/xmlstream/statemachine.py | 30 ++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py index 065d579..0c841a9 100644 --- a/sleekxmpp/xmlstream/statemachine.py +++ b/sleekxmpp/xmlstream/statemachine.py @@ -10,6 +10,7 @@ import threading import time import logging + class StateMachine(object): def __init__(self, states=[]): @@ -27,7 +28,7 @@ class StateMachine(object): self.__states.append( state ) - def transition(self, from_state, to_state, wait=0.0): + def transition(self, from_state, to_state, wait=0.0, func=None, args=[], kwargs={} ): ''' Transition from the given `from_state` to the given `to_state`. This method will return `True` if the state machine is now in `to_state`. It @@ -47,12 +48,22 @@ class StateMachine(object): if thread_should_exit: return # perform actions here after successful transition - This allows the thread to be interrupted by setting `thread_should_exit=True` + This allows the thread to be responsive by setting `thread_should_exit=True`. + + The optional `func` argument allows the user to pass a callable operation which occurs + within the context of the state transition (e.g. while the state machine is locked.) + If `func` returns a True value, the transition will occur. If `func` returns a non- + True value or if an exception is thrown, the transition will not occur. Any thrown + exception is not caught by the state machine and is the caller's responsibility to handle. + If `func` completes normally, this method will return the value returned by `func.` If + values for `args` and `kwargs` are provided, they are expanded and passed like so: + `func( *args, **kwargs )`. ''' - return self.transition_any( (from_state,), to_state, wait=wait ) + return self.transition_any( (from_state,), to_state, wait=wait, + func=func, args=args, kwargs=kwargs ) - def transition_any(self, from_states, to_state, wait=0.0): + def transition_any(self, from_states, to_state, wait=0.0, func=None, args=[], kwargs={} ): ''' Transition from any of the given `from_states` to the given `to_state`. ''' @@ -73,10 +84,19 @@ class StateMachine(object): self.lock.wait(wait) if self.__current_state in from_states: # should always be True due to lock + + return_val = True + # Note that func might throw an exception, but that's OK, it aborts the transition + if func is not None: return_val = func(*args,**kwargs) + + # some 'false' value returned from func, + # indicating that transition should not occur: + if not return_val: return return_val + logging.debug(' ==== TRANSITION %s -> %s', self.__current_state, to_state) self.__current_state = to_state self.lock.notifyAll() - return True + return return_val # some 'true' value returned by func or True if func was None else: logging.error( "StateMachine bug!! The lock should ensure this doesn't happen!" ) return False From 919c8c5633acd0d638f39c4a7ec8794fc5e94fff Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Thu, 3 Jun 2010 15:21:26 -0400 Subject: [PATCH 13/18] tweaked connectTCP call slightly to reduce possibility of 'connecting' state limbo --- sleekxmpp/xmlstream/xmlstream.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index c407a33..cc5e1ec 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -100,18 +100,20 @@ class XMLStream(object): self.filesocket = filesocket def connect(self, host='', port=0, use_ssl=None, use_tls=None): - "Link to connectTCP" + "Establish a socket connection to the given XMPP server." if not self.state.transition('disconnected','connecting'): logging.warning("Can't connect now; Already in state %s", self.state.current_state()) return False - if not self.connectTCP(host, port, use_ssl, use_tls): - # return to the 'disconnected' state if connect failed: - # otherwise the connect method is not reentrant - if not self.state.transition('connecting','disconnected'): - logging.error("Couldn't transition to the 'disconnected' state!") - return False - return True + try: + return self.connectTCP(host, port, use_ssl, use_tls) + finally: + # attempt to ensure once a connection attempt starts, we leave either in the + # 'connected' or 'disconnected' state. Otherwise the connect method is not reentrant + if self.state['connecting']: + if not self.state.transition('connecting','disconnected'): + logging.error("Couldn't return to the 'disconnected' state after connection failure!") + # TODO currently a caller can't distinguish between "connection failed" and # "we're already trying to connect from another thread" @@ -285,7 +287,8 @@ class XMLStream(object): logging.debug("SEND: %s" % data) self.socket.sendall(data.encode('utf-8')) except queue.Empty: - logging.debug('nothing on send queue') +# logging.debug('Nothing on send queue') + pass except socket.timeout: # this is to prevent a thread blocked indefinitely logging.debug('timeout sending packet data') @@ -372,7 +375,7 @@ class XMLStream(object): try: event = self.eventqueue.get(True, timeout=5) except queue.Empty: - logging.debug('Nothing on event queue') +# logging.debug('Nothing on event queue') event = None if event is not None: etype = event[0] From e7c37c4ec5a31402ba4d3fd95f2ee31d72183a83 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Fri, 4 Jun 2010 17:00:51 -0400 Subject: [PATCH 14/18] connect uses the new function-on-state-transition so when the connect method returns you are guaranteed to be either in the 'connected' or 'disconnected' state. Could remove the 'connecting' state except uses it. --- sleekxmpp/xmlstream/xmlstream.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index cc5e1ec..6dbe7b3 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -101,19 +101,16 @@ class XMLStream(object): def connect(self, host='', port=0, use_ssl=None, use_tls=None): "Establish a socket connection to the given XMPP server." - if not self.state.transition('disconnected','connecting'): - logging.warning("Can't connect now; Already in state %s", self.state.current_state()) + + if not self.state.transition('disconnected','connected', + func=self.connectTCP, args=[host, port, use_ssl, use_tls] ): + + if self.state['connected']: logging.debug('Already connected') + else: logging.warning("Connection failed" ) return False - try: - return self.connectTCP(host, port, use_ssl, use_tls) - finally: - # attempt to ensure once a connection attempt starts, we leave either in the - # 'connected' or 'disconnected' state. Otherwise the connect method is not reentrant - if self.state['connecting']: - if not self.state.transition('connecting','disconnected'): - logging.error("Couldn't return to the 'disconnected' state after connection failure!") - + logging.debug('Connection complete.') + return True # TODO currently a caller can't distinguish between "connection failed" and # "we're already trying to connect from another thread" @@ -148,9 +145,6 @@ class XMLStream(object): self.socket.connect(self.address) self.filesocket = self.socket.makefile('rb', 0) - if not self.state.transition('connecting','connected'): - logging.error( "State transition error!!!! Shouldn't have happened" ) - logging.debug('connect complete.') return True except socket.error as serr: From 66cf0c20218657622a5d98aee155df1ea37d9a89 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Mon, 7 Jun 2010 13:16:02 -0400 Subject: [PATCH 15/18] context manager is working but there's a fatal flaw: inside the body of the 'with' statement, there's no way to tell whether or not the transition occurred or timed out. --- sleekxmpp/xmlstream/statemachine.py | 83 ++++++++++++++++++++++------- tests/test_statemachine.py | 57 ++++++++++++++++++++ 2 files changed, 122 insertions(+), 18 deletions(-) diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py index 0c841a9..51b4aae 100644 --- a/sleekxmpp/xmlstream/statemachine.py +++ b/sleekxmpp/xmlstream/statemachine.py @@ -63,20 +63,22 @@ class StateMachine(object): return self.transition_any( (from_state,), to_state, wait=wait, func=func, args=args, kwargs=kwargs ) + def transition_any(self, from_states, to_state, wait=0.0, func=None, args=[], kwargs={} ): ''' Transition from any of the given `from_states` to the given `to_state`. ''' - with self.lock: - for state in from_states: - if isinstance(state,tuple) or isinstance(state,list): - raise ValueError( "State %s should be a string. Did you mean to call 'StateMachine.transition_any()?" % str(state) ) - if not state in self.__states: - raise ValueError( "StateMachine does not contain from_state %s." % state ) - if not to_state in self.__states: - raise ValueError( "StateMachine does not contain to_state %s." % to_state ) + if not (isinstance(from_states,tuple) or isinstance(from_states,list)): + raise ValueError( "from_states should be a list or tuple" ) + for state in from_states: + if not state in self.__states: + raise ValueError( "StateMachine does not contain from_state %s." % state ) + if not to_state in self.__states: + raise ValueError( "StateMachine does not contain to_state %s." % to_state ) + + with self.lock: start = time.time() while not self.__current_state in from_states: # detect timeout: @@ -102,23 +104,34 @@ class StateMachine(object): return False + def transition_ctx(self, from_state, to_state, wait=0.0): + if not from_state in self.__states: + raise ValueError( "StateMachine does not contain from_state %s." % state ) + if not to_state in self.__states: + raise ValueError( "StateMachine does not contain to_state %s." % to_state ) + + return _StateCtx(self, from_state, to_state, wait) + + def ensure(self, state, wait=0.0): ''' Ensure the state machine is currently in `state`, or wait until it enters `state`. ''' return self.ensure_any( (state,), wait=wait ) + def ensure_any(self, states, wait=0.0): ''' Ensure we are currently in one of the given `states` ''' - with self.lock: - for state in states: - if isinstance(state,tuple) or isinstance(state,list): - raise ValueError( "State %s should be a string. Did you mean to call 'StateMachine.transition_any()?" % str(state) ) - if not state in self.__states: - raise ValueError( "StateMachine does not contain state %s." % state ) + if not (isinstance(states,tuple) or isinstance(states,list)): + raise ValueError('states arg should be a tuple or list') + for state in states: + if not state in self.__states: + raise ValueError( "StateMachine does not contain state '%s'" % state ) + + with self.lock: start = time.time() while not self.__current_state in states: # detect timeout: @@ -132,6 +145,11 @@ class StateMachine(object): self.transition(self.__current_state, self._default_state) + def _set_state(self, state): #unsynchronized, only call internally after lock is acquired + self.__current_state = state + return state + + def current_state(self): ''' Return the current state name. @@ -146,12 +164,41 @@ class StateMachine(object): ''' return self.__current_state == state + +class _StateCtx: + + def __init__( self, state_machine, from_state, to_state, wait ): + self.state_machine = state_machine + self.from_state = from_state + self.to_state = to_state + self.wait = wait + self._timeout = False + def __enter__(self): - self.lock.acquire() - return self + self.state_machine.lock.acquire() + start = time.time() + while not self.state_machine[ self.from_state ]: + # detect timeout: + if time.time() >= start + self.wait: + logging.debug('StateMachine timeout while waiting for state: %s', self.from_state ) + self._timeout = True # to indicate we should not transition + break + self.state_machine.lock.wait(self.wait) + + logging.debug('StateMachine entered context in state: %s', + self.state_machine.current_state() ) + return self.state_machine def __exit__(self, exc_type, exc_val, exc_tb): - self.lock.nofityAll() - self.lock.release() + if exc_val is not None: + logging.exception( "StateMachine exception in context, remaining in state: %s\n%s:%s", + self.state_machine.current_state(), exc_type.__name__, exc_val ) + elif not self._timeout: + logging.debug(' ==== TRANSITION %s -> %s', + self.state_machine.current_state(), self.to_state) + self.state_machine._set_state( self.to_state ) + + self.state_machine.lock.notifyAll() + self.state_machine.lock.release() return False # re-raise any exception diff --git a/tests/test_statemachine.py b/tests/test_statemachine.py index fec3109..4cfb50d 100644 --- a/tests/test_statemachine.py +++ b/tests/test_statemachine.py @@ -197,6 +197,63 @@ class testStateMachine(unittest.TestCase): self.assertTrue(s['two']) + def testContextManager(self): + + s = sm.StateMachine(('one','two','three')) + + with s.transition_ctx('one','two'): + self.assertTrue( s['one'] ) + self.failIf( s['two'] ) + + #successful transition b/c no exception was thrown + self.assertTrue( s['two'] ) + self.failIf( s['one'] ) + + # failed transition because exception is thrown: + try: + with s.transition_ctx('two','three'): + raise Exception("boom!") + self.fail('exception expected') + except: pass + + self.failIf( s.current_state() in ('one','three') ) + self.assertTrue( s['two'] ) + + def testCtxManagerTransitionFailure(self): + + s = sm.StateMachine(('one','two','three')) + + with s.transition_ctx('two','three') as _s: + self.assertTrue( _s['one'] ) + self.failIf( _s.current_state in ('two','three') ) + + self.assertTrue( _s['one'] ) + + def r1(): + print 'thread 1 started' + self.assertTrue( s.transition('one','two') ) + print 'thread 1 transitioned' + + def r2(): + print 'thread 2 started' + self.failIf( s['two'] ) + with s.transition_ctx('two','three', 10) as _s: + self.assertTrue( _s['two'] ) + print 'thread 2 will transition on exit from the context manager...' + self.assertTrue( s['three'] ) + + t1 = threading.Thread(target=r1) + t2 = threading.Thread(target=r2) + + t2.start() # this should block until r1 goes + time.sleep(1) + t1.start() + + t1.join() + t2.join() + + self.assertTrue( s['three'] ) + suite = unittest.TestLoader().loadTestsFromTestCase(testStateMachine) From 47f1fb16909d3baaec5822b1dcbca89491d0d18c Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Mon, 7 Jun 2010 13:43:37 -0400 Subject: [PATCH 16/18] context manager now returns a boolean 'result' as the context variable to indicate whether the transition timed out or if you are actually locked when entering the context body --- sleekxmpp/xmlstream/statemachine.py | 4 ++-- tests/test_statemachine.py | 15 +++++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py index 51b4aae..b1ab757 100644 --- a/sleekxmpp/xmlstream/statemachine.py +++ b/sleekxmpp/xmlstream/statemachine.py @@ -182,12 +182,12 @@ class _StateCtx: if time.time() >= start + self.wait: logging.debug('StateMachine timeout while waiting for state: %s', self.from_state ) self._timeout = True # to indicate we should not transition - break + return False self.state_machine.lock.wait(self.wait) logging.debug('StateMachine entered context in state: %s', self.state_machine.current_state() ) - return self.state_machine + return True def __exit__(self, exc_type, exc_val, exc_tb): if exc_val is not None: diff --git a/tests/test_statemachine.py b/tests/test_statemachine.py index 4cfb50d..0173ff0 100644 --- a/tests/test_statemachine.py +++ b/tests/test_statemachine.py @@ -223,11 +223,12 @@ class testStateMachine(unittest.TestCase): s = sm.StateMachine(('one','two','three')) - with s.transition_ctx('two','three') as _s: - self.assertTrue( _s['one'] ) - self.failIf( _s.current_state in ('two','three') ) + with s.transition_ctx('two','three') as result: + self.failIf( result ) + self.assertTrue( s['one'] ) + self.failIf( s.current_state in ('two','three') ) - self.assertTrue( _s['one'] ) + self.assertTrue( s['one'] ) def r1(): print 'thread 1 started' @@ -237,10 +238,12 @@ class testStateMachine(unittest.TestCase): def r2(): print 'thread 2 started' self.failIf( s['two'] ) - with s.transition_ctx('two','three', 10) as _s: - self.assertTrue( _s['two'] ) + with s.transition_ctx('two','three', 10) as result: + self.assertTrue( result ) + self.assertTrue( s['two'] ) print 'thread 2 will transition on exit from the context manager...' self.assertTrue( s['three'] ) + print 'transitioned to %s' % s.current_state() t1 = threading.Thread(target=r1) t2 = threading.Thread(target=r2) From 9464736551311c015f1511e7535c762923838eaf Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Mon, 7 Jun 2010 13:58:15 -0400 Subject: [PATCH 17/18] added __str__ --- sleekxmpp/xmlstream/statemachine.py | 4 ++++ tests/test_statemachine.py | 7 +++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py index b1ab757..704cabd 100644 --- a/sleekxmpp/xmlstream/statemachine.py +++ b/sleekxmpp/xmlstream/statemachine.py @@ -163,6 +163,10 @@ class StateMachine(object): Use `StateMachine.ensure(state)` to wait until the machine enters a certain state. ''' return self.__current_state == state + + def __str__(self): + return "".join(( "StateMachine(", ','.join(self.__states), "): ", self.__current_state )) + class _StateCtx: diff --git a/tests/test_statemachine.py b/tests/test_statemachine.py index 0173ff0..00e4d3a 100644 --- a/tests/test_statemachine.py +++ b/tests/test_statemachine.py @@ -20,11 +20,14 @@ class testStateMachine(unittest.TestCase): # self.failIf(s.two) self.failIf(s['two']) try: - s.booga + s['booga'] self.fail('s.booga is an invalid state and should throw an exception!') except: pass #expected exception - + # just make sure __str__ works, no reason to test its exact value: + print str(s) + + def testTransitions(self): "Test ensure transitions occur correctly in a single thread" s = sm.StateMachine(('one','two','three')) From 34dc236126d272a16df0915b010dca0e2e38f0d4 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Mon, 7 Jun 2010 14:41:42 -0400 Subject: [PATCH 18/18] added documentation for transition_ctx and removed some superfluous comment lines --- sleekxmpp/xmlstream/statemachine.py | 26 ++++++++++++++++++++++++++ tests/test_statemachine.py | 5 ----- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py index 704cabd..c6e0ea4 100644 --- a/sleekxmpp/xmlstream/statemachine.py +++ b/sleekxmpp/xmlstream/statemachine.py @@ -105,6 +105,32 @@ class StateMachine(object): def transition_ctx(self, from_state, to_state, wait=0.0): + ''' + Use the state machine as a context manager. The transition occurs on /exit/ from + the `with` context, so long as no exception is thrown. For example: + + :: + + with state_machine.transition_ctx('one','two', wait=5) as locked: + if locked: + # the state machine is currently locked in state 'one', and will + # transition to 'two' when the 'with' statement ends, so long as + # no exception is thrown. + print 'Currently locked in state one: %s' % state_machine['one'] + + else: + # The 'wait' timed out, and no lock has been acquired + print 'Timed out before entering state "one"' + + print 'Since no exception was thrown, we are now in state "two": %s' % state_machine['two'] + + + The other main difference between this method and `transition()` is that the + state machine is locked for the duration of the `with` statement (normally, + after a `transition() occurs, the state machine is immediately unlocked and + available to another thread to call `transition()` again. + ''' + if not from_state in self.__states: raise ValueError( "StateMachine does not contain from_state %s." % state ) if not to_state in self.__states: diff --git a/tests/test_statemachine.py b/tests/test_statemachine.py index 00e4d3a..e44b8e4 100644 --- a/tests/test_statemachine.py +++ b/tests/test_statemachine.py @@ -15,9 +15,7 @@ class testStateMachine(unittest.TestCase): def testDefaults(self): "Test ensure transitions occur correctly in a single thread" s = sm.StateMachine(('one','two','three')) -# self.assertTrue(s.one) self.assertTrue(s['one']) -# self.failIf(s.two) self.failIf(s['two']) try: s['booga'] @@ -31,12 +29,9 @@ class testStateMachine(unittest.TestCase): def testTransitions(self): "Test ensure transitions occur correctly in a single thread" s = sm.StateMachine(('one','two','three')) -# self.assertTrue(s.one) self.assertTrue( s.transition('one', 'two') ) -# self.assertTrue( s.two ) self.assertTrue( s['two'] ) -# self.failIf( s.one ) self.failIf( s['one'] ) self.assertTrue( s.transition('two', 'three') )