Update scheduler with locks and ability to remove tasks.

Scheduled tasks must have a unique name.
This commit is contained in:
Lance Stout 2011-08-25 13:34:30 -07:00
parent 63b8444abe
commit 4c08c9c524
2 changed files with 53 additions and 24 deletions

View file

@ -73,7 +73,8 @@ class Task(object):
otherwise, execute the callback immediately. otherwise, execute the callback immediately.
""" """
if self.qpointer is not None: if self.qpointer is not None:
self.qpointer.put(('schedule', self.callback, self.args)) self.qpointer.put(('schedule', self.callback,
self.args, self.name))
else: else:
self.callback(*self.args, **self.kwargs) self.callback(*self.args, **self.kwargs)
self.reset() self.reset()
@ -95,31 +96,32 @@ class Scheduler(object):
http://docs.python.org/library/sched.html#module-sched http://docs.python.org/library/sched.html#module-sched
Attributes: Attributes:
addq -- A queue storing added tasks. addq -- A queue storing added tasks.
schedule -- A list of tasks in order of execution times. schedule -- A list of tasks in order of execution times.
thread -- If threaded, the thread processing the schedule. thread -- If threaded, the thread processing the schedule.
run -- Indicates if the scheduler is running. run -- Indicates if the scheduler is running.
parentqueue -- A parent event queue in control of this scheduler. stop -- Threading event indicating if the main process
has been stopped.
Methods: Methods:
add -- Add a new task to the schedule. add -- Add a new task to the schedule.
process -- Process and schedule tasks. process -- Process and schedule tasks.
quit -- Stop the scheduler. quit -- Stop the scheduler.
""" """
def __init__(self, parentqueue=None, parentstop=None): def __init__(self, parentstop=None):
""" """
Create a new scheduler. Create a new scheduler.
Arguments: Arguments:
parentqueue -- A separate event queue controlling this scheduler. parentstop -- A threading event indicating if the main process has
been stopped.
""" """
self.addq = queue.Queue() self.addq = queue.Queue()
self.schedule = [] self.schedule = []
self.thread = None self.thread = None
self.run = False self.run = False
self.parentqueue = parentqueue self.stop = parentstop
self.parentstop = parentstop self.schedule_lock = threading.RLock()
def process(self, threaded=True): def process(self, threaded=True):
""" """
@ -141,8 +143,7 @@ class Scheduler(object):
"""Process scheduled tasks.""" """Process scheduled tasks."""
self.run = True self.run = True
try: try:
while self.run and (self.parentstop is None or \ while self.run and not self.stop.isSet():
not self.parentstop.isSet()):
wait = 1 wait = 1
updated = False updated = False
if self.schedule: if self.schedule:
@ -156,6 +157,7 @@ class Scheduler(object):
newtask = self.addq.get(True, wait) newtask = self.addq.get(True, wait)
except queue.Empty: except queue.Empty:
cleanup = [] cleanup = []
self.schedule_lock.acquire()
for task in self.schedule: for task in self.schedule:
if time.time() >= task.next: if time.time() >= task.next:
updated = True updated = True
@ -167,23 +169,18 @@ class Scheduler(object):
x = self.schedule.pop(self.schedule.index(task)) x = self.schedule.pop(self.schedule.index(task))
else: else:
updated = True updated = True
self.schedule_lock.acquire()
self.schedule.append(newtask) self.schedule.append(newtask)
finally: finally:
if updated: if updated:
self.schedule = sorted(self.schedule, self.schedule = sorted(self.schedule,
key=lambda task: task.next) key=lambda task: task.next)
self.schedule_lock.release()
except KeyboardInterrupt: except KeyboardInterrupt:
self.run = False self.run = False
if self.parentstop is not None:
log.debug("stopping parent")
self.parentstop.set()
except SystemExit: except SystemExit:
self.run = False self.run = False
if self.parentstop is not None:
self.parentstop.set()
log.debug("Quitting Scheduler thread") log.debug("Quitting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None, def add(self, name, seconds, callback, args=None,
kwargs=None, repeat=False, qpointer=None): kwargs=None, repeat=False, qpointer=None):
@ -201,8 +198,39 @@ class Scheduler(object):
qpointer -- A pointer to an event queue for queuing callback qpointer -- A pointer to an event queue for queuing callback
execution instead of executing immediately. execution instead of executing immediately.
""" """
self.addq.put(Task(name, seconds, callback, args, try:
kwargs, repeat, qpointer)) self.schedule_lock.acquire()
for task in self.schedule:
if task.name == name:
raise ValueError("Key %s already exists" % name)
self.addq.put(Task(name, seconds, callback, args,
kwargs, repeat, qpointer))
except:
raise
finally:
self.schedule_lock.release()
def remove(self, name):
"""
Remove a scheduled task ahead of schedule, and without
executing it.
Arguments:
name -- The name of the task to remove.
"""
try:
self.schedule_lock.acquire()
the_task = None
for task in self.schedule:
if task.name == name:
the_task = task
if the_task is not None:
self.schedule.remove(the_task)
except:
raise
finally:
self.schedule_lock.release()
def quit(self): def quit(self):
"""Shutdown the scheduler.""" """Shutdown the scheduler."""

View file

@ -215,7 +215,7 @@ class XMLStream(object):
self.event_queue = queue.Queue() self.event_queue = queue.Queue()
self.send_queue = queue.Queue() self.send_queue = queue.Queue()
self.__failed_send_stanza = None self.__failed_send_stanza = None
self.scheduler = Scheduler(self.event_queue, self.stop) self.scheduler = Scheduler(self.stop)
self.namespace_map = {StanzaBase.xml_ns: 'xml'} self.namespace_map = {StanzaBase.xml_ns: 'xml'}
@ -1178,8 +1178,9 @@ class XMLStream(object):
log.exception(error_msg % handler.name) log.exception(error_msg % handler.name)
orig.exception(e) orig.exception(e)
elif etype == 'schedule': elif etype == 'schedule':
name = args[1]
try: try:
log.debug('Scheduled event: %s' % args) log.debug('Scheduled event: %s: %s' % (name, args[0]))
handler(*args[0]) handler(*args[0])
except Exception as e: except Exception as e:
log.exception('Error processing scheduled task') log.exception('Error processing scheduled task')