fixed logic error in state machine

This commit is contained in:
Nathan Fritz 2010-10-20 16:57:47 -07:00
parent 6e34b2cfdd
commit 11a6e6d2e0

View file

@ -14,265 +14,270 @@ log = logging.getLogger(__name__)
class StateMachine(object): class StateMachine(object):
def __init__(self, states=[]): def __init__(self, states=[]):
self.lock = threading.Lock() self.lock = threading.Lock()
self.notifier = threading.Event() self.notifier = threading.Event()
self.__states= [] self.__states= []
self.addStates(states) self.addStates(states)
self.__default_state = self.__states[0] self.__default_state = self.__states[0]
self.__current_state = self.__default_state self.__current_state = self.__default_state
def addStates(self, states): def addStates(self, states):
self.lock.acquire() self.lock.acquire()
try: try:
for state in states: for state in states:
if state in self.__states: if state in self.__states:
raise IndexError("The state '%s' is already in the StateMachine." % state) raise IndexError("The state '%s' is already in the StateMachine." % state)
self.__states.append( state ) self.__states.append( state )
finally: self.lock.release() finally: self.lock.release()
def transition(self, from_state, to_state, wait=0.0, func=None, args=[], kwargs={} ): def transition(self, from_state, to_state, wait=0.0, func=None, args=[], kwargs={} ):
''' '''
Transition from the given `from_state` to the given `to_state`. Transition from the given `from_state` to the given `to_state`.
This method will return `True` if the state machine is now in `to_state`. It This method will return `True` if the state machine is now in `to_state`. It
will return `False` if a timeout occurred the transition did not occur. will return `False` if a timeout occurred the transition did not occur.
If `wait` is 0 (the default,) this method returns immediately if the state machine If `wait` is 0 (the default,) this method returns immediately if the state machine
is not in `from_state`. is not in `from_state`.
If you want the thread to block and transition once the state machine to enters If you want the thread to block and transition once the state machine to enters
`from_state`, set `wait` to a non-negative value. Note there is no 'block `from_state`, set `wait` to a non-negative value. Note there is no 'block
indefinitely' flag since this leads to deadlock. If you want to wait indefinitely, indefinitely' flag since this leads to deadlock. If you want to wait indefinitely,
choose a reasonable value for `wait` (e.g. 20 seconds) and do so in a while loop like so: choose a reasonable value for `wait` (e.g. 20 seconds) and do so in a while loop like so:
:: ::
while not thread_should_exit and not state_machine.transition('disconnected', 'connecting', wait=20 ): while not thread_should_exit and not state_machine.transition('disconnected', 'connecting', wait=20 ):
pass # timeout will occur every 20s unless transition occurs pass # timeout will occur every 20s unless transition occurs
if thread_should_exit: return if thread_should_exit: return
# perform actions here after successful transition # perform actions here after successful transition
This allows the thread to be responsive by setting `thread_should_exit=True`. This allows the thread to be responsive by setting `thread_should_exit=True`.
The optional `func` argument allows the user to pass a callable operation which occurs The optional `func` argument allows the user to pass a callable operation which occurs
within the context of the state transition (e.g. while the state machine is locked.) within the context of the state transition (e.g. while the state machine is locked.)
If `func` returns a True value, the transition will occur. If `func` returns a non- If `func` returns a True value, the transition will occur. If `func` returns a non-
True value or if an exception is thrown, the transition will not occur. Any thrown True value or if an exception is thrown, the transition will not occur. Any thrown
exception is not caught by the state machine and is the caller's responsibility to handle. exception is not caught by the state machine and is the caller's responsibility to handle.
If `func` completes normally, this method will return the value returned by `func.` If If `func` completes normally, this method will return the value returned by `func.` If
values for `args` and `kwargs` are provided, they are expanded and passed like so: values for `args` and `kwargs` are provided, they are expanded and passed like so:
`func( *args, **kwargs )`. `func( *args, **kwargs )`.
''' '''
return self.transition_any( (from_state,), to_state, wait=wait, return self.transition_any( (from_state,), to_state, wait=wait,
func=func, args=args, kwargs=kwargs ) func=func, args=args, kwargs=kwargs )
def transition_any(self, from_states, to_state, wait=0.0, func=None, args=[], kwargs={} ): def transition_any(self, from_states, to_state, wait=0.0, func=None, args=[], kwargs={} ):
''' '''
Transition from any of the given `from_states` to the given `to_state`. Transition from any of the given `from_states` to the given `to_state`.
''' '''
if not (isinstance(from_states,tuple) or isinstance(from_states,list)): if not (isinstance(from_states,tuple) or isinstance(from_states,list)):
raise ValueError( "from_states should be a list or tuple" ) raise ValueError( "from_states should be a list or tuple" )
for state in from_states: for state in from_states:
if not state in self.__states: if not state in self.__states:
raise ValueError( "StateMachine does not contain from_state %s." % state ) raise ValueError( "StateMachine does not contain from_state %s." % state )
if not to_state in self.__states: if not to_state in self.__states:
raise ValueError( "StateMachine does not contain to_state %s." % to_state ) raise ValueError( "StateMachine does not contain to_state %s." % to_state )
start = time.time() start = time.time()
while not self.__current_state in from_states or not self.lock.acquire(False): while not self.lock.acquire(False):
# detect timeout: time.sleep(.001)
remainder = start + wait - time.time() if (start + wait - time.time()) <= 0.0:
if remainder > 0: self.notifier.wait(remainder) return False
else: return False
try: # lock is acquired; all other threads will return false or wait until notify/timeout while not self.__current_state in from_states:
if self.__current_state in from_states: # should always be True due to lock # detect timeout:
remainder = start + wait - time.time()
if remainder > 0: self.notifier.wait(remainder)
else: return False
# Note that func might throw an exception, but that's OK, it aborts the transition try: # lock is acquired; all other threads will return false or wait until notify/timeout
return_val = func(*args,**kwargs) if func is not None else True if self.__current_state in from_states: # should always be True due to lock
# some 'false' value returned from func, # Note that func might throw an exception, but that's OK, it aborts the transition
# indicating that transition should not occur: return_val = func(*args,**kwargs) if func is not None else True
if not return_val: return return_val
log.debug(' ==== TRANSITION %s -> %s', self.__current_state, to_state) # some 'false' value returned from func,
self._set_state( to_state ) # indicating that transition should not occur:
return return_val # some 'true' value returned by func or True if func was None if not return_val: return return_val
else:
log.error( "StateMachine bug!! The lock should ensure this doesn't happen!" ) log.debug(' ==== TRANSITION %s -> %s', self.__current_state, to_state)
return False self._set_state( to_state )
finally: return return_val # some 'true' value returned by func or True if func was None
self.notifier.set() # notify any waiting threads that the state has changed. else:
self.notifier.clear() log.error( "StateMachine bug!! The lock should ensure this doesn't happen!" )
self.lock.release() return False
finally:
self.notifier.set() # notify any waiting threads that the state has changed.
self.notifier.clear()
self.lock.release()
def transition_ctx(self, from_state, to_state, wait=0.0): def transition_ctx(self, from_state, to_state, wait=0.0):
''' '''
Use the state machine as a context manager. The transition occurs on /exit/ from Use the state machine as a context manager. The transition occurs on /exit/ from
the `with` context, so long as no exception is thrown. For example: the `with` context, so long as no exception is thrown. For example:
:: ::
with state_machine.transition_ctx('one','two', wait=5) as locked: with state_machine.transition_ctx('one','two', wait=5) as locked:
if locked: if locked:
# the state machine is currently locked in state 'one', and will # the state machine is currently locked in state 'one', and will
# transition to 'two' when the 'with' statement ends, so long as # transition to 'two' when the 'with' statement ends, so long as
# no exception is thrown. # no exception is thrown.
print 'Currently locked in state one: %s' % state_machine['one'] print 'Currently locked in state one: %s' % state_machine['one']
else: else:
# The 'wait' timed out, and no lock has been acquired # The 'wait' timed out, and no lock has been acquired
print 'Timed out before entering state "one"' print 'Timed out before entering state "one"'
print 'Since no exception was thrown, we are now in state "two": %s' % state_machine['two'] print 'Since no exception was thrown, we are now in state "two": %s' % state_machine['two']
The other main difference between this method and `transition()` is that the The other main difference between this method and `transition()` is that the
state machine is locked for the duration of the `with` statement. Normally, state machine is locked for the duration of the `with` statement. Normally,
after a `transition()` occurs, the state machine is immediately unlocked and after a `transition()` occurs, the state machine is immediately unlocked and
available to another thread to call `transition()` again. available to another thread to call `transition()` again.
''' '''
if not from_state in self.__states: if not from_state in self.__states:
raise ValueError( "StateMachine does not contain from_state %s." % from_state ) raise ValueError( "StateMachine does not contain from_state %s." % from_state )
if not to_state in self.__states: if not to_state in self.__states:
raise ValueError( "StateMachine does not contain to_state %s." % to_state ) raise ValueError( "StateMachine does not contain to_state %s." % to_state )
return _StateCtx(self, from_state, to_state, wait) return _StateCtx(self, from_state, to_state, wait)
def ensure(self, state, wait=0.0, block_on_transition=False ): def ensure(self, state, wait=0.0, block_on_transition=False ):
''' '''
Ensure the state machine is currently in `state`, or wait until it enters `state`. Ensure the state machine is currently in `state`, or wait until it enters `state`.
''' '''
return self.ensure_any( (state,), wait=wait, block_on_transition=block_on_transition ) return self.ensure_any( (state,), wait=wait, block_on_transition=block_on_transition )
def ensure_any(self, states, wait=0.0, block_on_transition=False): def ensure_any(self, states, wait=0.0, block_on_transition=False):
''' '''
Ensure we are currently in one of the given `states` or wait until Ensure we are currently in one of the given `states` or wait until
we enter one of those states. we enter one of those states.
Note that due to the nature of the function, you cannot guarantee that Note that due to the nature of the function, you cannot guarantee that
the entirety of some operation completes while you remain in a given the entirety of some operation completes while you remain in a given
state. That would require acquiring and holding a lock, which state. That would require acquiring and holding a lock, which
would mean no other threads could do the same. (You'd essentially would mean no other threads could do the same. (You'd essentially
be serializing all of the threads that are 'ensuring' their tasks be serializing all of the threads that are 'ensuring' their tasks
occurred in some state. occurred in some state.
''' '''
if not (isinstance(states,tuple) or isinstance(states,list)): if not (isinstance(states,tuple) or isinstance(states,list)):
raise ValueError('states arg should be a tuple or list') raise ValueError('states arg should be a tuple or list')
for state in states: for state in states:
if not state in self.__states: if not state in self.__states:
raise ValueError( "StateMachine does not contain state '%s'" % state ) raise ValueError( "StateMachine does not contain state '%s'" % state )
# if we're in the middle of a transition, determine whether we should # if we're in the middle of a transition, determine whether we should
# 'fall back' to the 'current' state, or wait for the new state, in order to # 'fall back' to the 'current' state, or wait for the new state, in order to
# avoid an operation occurring in the wrong state. # avoid an operation occurring in the wrong state.
# TODO another option would be an ensure_ctx that uses a semaphore to allow # TODO another option would be an ensure_ctx that uses a semaphore to allow
# threads to indicate they want to remain in a particular state. # threads to indicate they want to remain in a particular state.
# will return immediately if no transition is in process. # will return immediately if no transition is in process.
if block_on_transition: if block_on_transition:
# we're not in the middle of a transition; don't hold the lock # we're not in the middle of a transition; don't hold the lock
if self.lock.acquire(False): self.lock.release() if self.lock.acquire(False): self.lock.release()
# wait for the transition to complete # wait for the transition to complete
else: self.notifier.wait() else: self.notifier.wait()
start = time.time() start = time.time()
while not self.__current_state in states: while not self.__current_state in states:
# detect timeout: # detect timeout:
remainder = start + wait - time.time() remainder = start + wait - time.time()
if remainder > 0: self.notifier.wait(remainder) if remainder > 0: self.notifier.wait(remainder)
else: return False else: return False
return True return True
def reset(self): def reset(self):
# TODO need to lock before calling this? # TODO need to lock before calling this?
self.transition(self.__current_state, self.__default_state) self.transition(self.__current_state, self.__default_state)
def _set_state(self, state): #unsynchronized, only call internally after lock is acquired def _set_state(self, state): #unsynchronized, only call internally after lock is acquired
self.__current_state = state self.__current_state = state
return state return state
def current_state(self): def current_state(self):
''' '''
Return the current state name. Return the current state name.
''' '''
return self.__current_state return self.__current_state
def __getitem__(self, state): def __getitem__(self, state):
''' '''
Non-blocking, non-synchronized test to determine if we are in the given state. Non-blocking, non-synchronized test to determine if we are in the given state.
Use `StateMachine.ensure(state)` to wait until the machine enters a certain state. Use `StateMachine.ensure(state)` to wait until the machine enters a certain state.
''' '''
return self.__current_state == state return self.__current_state == state
def __str__(self): def __str__(self):
return "".join(( "StateMachine(", ','.join(self.__states), "): ", self.__current_state )) return "".join(( "StateMachine(", ','.join(self.__states), "): ", self.__current_state ))
class _StateCtx: class _StateCtx:
def __init__( self, state_machine, from_state, to_state, wait ): def __init__( self, state_machine, from_state, to_state, wait ):
self.state_machine = state_machine self.state_machine = state_machine
self.from_state = from_state self.from_state = from_state
self.to_state = to_state self.to_state = to_state
self.wait = wait self.wait = wait
self._locked = False self._locked = False
def __enter__(self): def __enter__(self):
start = time.time() start = time.time()
while not self.state_machine[ self.from_state ] or not self.state_machine.lock.acquire(False): while not self.state_machine[ self.from_state ] or not self.state_machine.lock.acquire(False):
# detect timeout: # detect timeout:
remainder = start + self.wait - time.time() remainder = start + self.wait - time.time()
if remainder > 0: self.state_machine.notifier.wait(remainder) if remainder > 0: self.state_machine.notifier.wait(remainder)
else: else:
log.debug('StateMachine timeout while waiting for state: %s', self.from_state ) log.debug('StateMachine timeout while waiting for state: %s', self.from_state )
return False return False
self._locked = True # lock has been acquired at this point self._locked = True # lock has been acquired at this point
self.state_machine.notifier.clear() self.state_machine.notifier.clear()
log.debug('StateMachine entered context in state: %s', log.debug('StateMachine entered context in state: %s',
self.state_machine.current_state() ) self.state_machine.current_state() )
return True return True
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is not None: if exc_val is not None:
log.exception( "StateMachine exception in context, remaining in state: %s\n%s:%s", log.exception( "StateMachine exception in context, remaining in state: %s\n%s:%s",
self.state_machine.current_state(), exc_type.__name__, exc_val ) self.state_machine.current_state(), exc_type.__name__, exc_val )
if self._locked: if self._locked:
if exc_val is None: if exc_val is None:
log.debug(' ==== TRANSITION %s -> %s', log.debug(' ==== TRANSITION %s -> %s',
self.state_machine.current_state(), self.to_state) self.state_machine.current_state(), self.to_state)
self.state_machine._set_state( self.to_state ) self.state_machine._set_state( self.to_state )
self.state_machine.notifier.set() self.state_machine.notifier.set()
self.state_machine.lock.release() self.state_machine.lock.release()
return False # re-raise any exception return False # re-raise any exception
if __name__ == '__main__': if __name__ == '__main__':
def callback(s, s2): def callback(s, s2):
print((1, s.transition('on', 'off', wait=0.0, func=callback, args=[s,s2]))) print((1, s.transition('on', 'off', wait=0.0, func=callback, args=[s,s2])))
print((2, s2.transition('off', 'on', func=callback, args=[s,s2]))) print((2, s2.transition('off', 'on', func=callback, args=[s,s2])))
return True return True
s = StateMachine(('off', 'on')) s = StateMachine(('off', 'on'))
s2 = StateMachine(('off', 'on')) s2 = StateMachine(('off', 'on'))
print((3, s.transition('off', 'on', wait=0.0, func=callback, args=[s,s2]),)) print((3, s.transition('off', 'on', wait=0.0, func=callback, args=[s,s2]),))
print((s.current_state(), s2.current_state())) print((s.current_state(), s2.current_state()))