Cleaned up the Scheduler.

This commit is contained in:
Lance Stout 2010-10-06 15:03:21 -04:00
parent 77b8f0f4bb
commit cbe76c8a70

View file

@ -1,87 +1,195 @@
try: """
import queue SleekXMPP: The Sleek XMPP Library
except ImportError: Copyright (C) 2010 Nathanael C. Fritz
import Queue as queue This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import time import time
import threading import threading
import logging import logging
try:
import queue
except ImportError:
import Queue as queue
class Task(object): class Task(object):
"""Task object for the Scheduler class"""
def __init__(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): """
self.name = name A scheduled task that will be executed by the scheduler
self.seconds = seconds after a given time interval has passed.
self.callback = callback
self.args = args or tuple() Attributes:
self.kwargs = kwargs or {} name -- The name of the task.
self.repeat = repeat seconds -- The number of seconds to wait before executing.
self.next = time.time() + self.seconds callback -- The function to execute.
self.qpointer = qpointer args -- The arguments to pass to the callback.
kwargs -- The keyword arguments to pass to the callback.
def run(self): repeat -- Indicates if the task should repeat.
if self.qpointer is not None: Defaults to False.
self.qpointer.put(('schedule', self.callback, self.args)) qpointer -- A pointer to an event queue for queuing callback
else: execution instead of executing immediately.
self.callback(*self.args, **self.kwargs)
self.reset() Methods:
return self.repeat run -- Either queue or execute the callback.
reset -- Reset the task's timer.
def reset(self): """
self.next = time.time() + self.seconds
def __init__(self, name, seconds, callback, args=None,
kwargs=None, repeat=False, qpointer=None):
"""
Create a new task.
Arguments:
name -- The name of the task.
seconds -- The number of seconds to wait before executing.
callback -- The function to execute.
args -- The arguments to pass to the callback.
kwargs -- The keyword arguments to pass to the callback.
repeat -- Indicates if the task should repeat.
Defaults to False.
qpointer -- A pointer to an event queue for queuing callback
execution instead of executing immediately.
"""
self.name = name
self.seconds = seconds
self.callback = callback
self.args = args or tuple()
self.kwargs = kwargs or {}
self.repeat = repeat
self.next = time.time() + self.seconds
self.qpointer = qpointer
def run(self):
"""
Execute the task's callback.
If an event queue was supplied, place the callback in the queue;
otherwise, execute the callback immediately.
"""
if self.qpointer is not None:
self.qpointer.put(('schedule', self.callback, self.args))
else:
self.callback(*self.args, **self.kwargs)
self.reset()
return self.repeat
def reset(self):
"""
Reset the task's timer so that it will repeat.
"""
self.next = time.time() + self.seconds
class Scheduler(object): class Scheduler(object):
"""Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched"""
def __init__(self, parentqueue=None):
self.addq = queue.Queue()
self.schedule = []
self.thread = None
self.run = False
self.parentqueue = parentqueue
def process(self, threaded=True):
if threaded:
self.thread = threading.Thread(name='shedulerprocess', target=self._process)
self.thread.start()
else:
self._process()
def _process(self): """
self.run = True A threaded scheduler that allows for updates mid-execution unlike the
while self.run: scheduler in the standard library.
try:
wait = 1
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
except KeyboardInterrupt:
self.run = False
logging.debug("Qutting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): http://docs.python.org/library/sched.html#module-sched
self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer))
Attributes:
def quit(self): addq -- A queue storing added tasks.
self.run = False schedule -- A list of tasks in order of execution times.
thread -- If threaded, the thread processing the schedule.
run -- Indicates if the scheduler is running.
parentqueue -- A parent event queue in control of this scheduler.
Methods:
add -- Add a new task to the schedule.
process -- Process and schedule tasks.
quit -- Stop the scheduler.
"""
def __init__(self, parentqueue=None):
"""
Create a new scheduler.
Arguments:
parentqueue -- A separate event queue controlling this scheduler.
"""
self.addq = queue.Queue()
self.schedule = []
self.thread = None
self.run = False
self.parentqueue = parentqueue
def process(self, threaded=True):
"""
Begin accepting and processing scheduled tasks.
Arguments:
threaded -- Indicates if the scheduler should execute in its own
thread. Defaults to True.
"""
if threaded:
self.thread = threading.Thread(name='sheduler_process',
target=self._process)
self.thread.start()
else:
self._process()
def _process(self):
"""Process scheduled tasks."""
self.run = True
while self.run:
try:
wait = 1
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated:
self.schedule = sorted(self.schedule,
key=lambda task: task.next)
except KeyboardInterrupt:
self.run = False
logging.debug("Qutting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None,
kwargs=None, repeat=False, qpointer=None):
"""
Schedule a new task.
Arguments:
name -- The name of the task.
seconds -- The number of seconds to wait before executing.
callback -- The function to execute.
args -- The arguments to pass to the callback.
kwargs -- The keyword arguments to pass to the callback.
repeat -- Indicates if the task should repeat.
Defaults to False.
qpointer -- A pointer to an event queue for queuing callback
execution instead of executing immediately.
"""
self.addq.put(Task(name, seconds, callback, args,
kwargs, repeat, qpointer))
def quit(self):
"""Shutdown the scheduler."""
self.run = False