From cbe76c8a70540d8890c2be97a908a00d93f5e597 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Wed, 6 Oct 2010 15:03:21 -0400 Subject: [PATCH] Cleaned up the Scheduler. --- sleekxmpp/xmlstream/scheduler.py | 264 ++++++++++++++++++++++--------- 1 file changed, 186 insertions(+), 78 deletions(-) diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index 945d9fa..0e3d30b 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -1,87 +1,195 @@ -try: - import queue -except ImportError: - import Queue as queue +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + import time import threading import logging +try: + import queue +except ImportError: + import Queue as queue + class Task(object): - """Task object for the Scheduler class""" - def __init__(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): - self.name = name - self.seconds = seconds - self.callback = callback - self.args = args or tuple() - self.kwargs = kwargs or {} - self.repeat = repeat - self.next = time.time() + self.seconds - self.qpointer = qpointer - - def run(self): - if self.qpointer is not None: - self.qpointer.put(('schedule', self.callback, self.args)) - else: - self.callback(*self.args, **self.kwargs) - self.reset() - return self.repeat - - def reset(self): - self.next = time.time() + self.seconds + + """ + A scheduled task that will be executed by the scheduler + after a given time interval has passed. + + Attributes: + name -- The name of the task. + seconds -- The number of seconds to wait before executing. + callback -- The function to execute. + args -- The arguments to pass to the callback. + kwargs -- The keyword arguments to pass to the callback. + repeat -- Indicates if the task should repeat. + Defaults to False. + qpointer -- A pointer to an event queue for queuing callback + execution instead of executing immediately. + + Methods: + run -- Either queue or execute the callback. + reset -- Reset the task's timer. + """ + + def __init__(self, name, seconds, callback, args=None, + kwargs=None, repeat=False, qpointer=None): + """ + Create a new task. + + Arguments: + name -- The name of the task. + seconds -- The number of seconds to wait before executing. + callback -- The function to execute. + args -- The arguments to pass to the callback. + kwargs -- The keyword arguments to pass to the callback. + repeat -- Indicates if the task should repeat. + Defaults to False. + qpointer -- A pointer to an event queue for queuing callback + execution instead of executing immediately. + """ + self.name = name + self.seconds = seconds + self.callback = callback + self.args = args or tuple() + self.kwargs = kwargs or {} + self.repeat = repeat + self.next = time.time() + self.seconds + self.qpointer = qpointer + + def run(self): + """ + Execute the task's callback. + + If an event queue was supplied, place the callback in the queue; + otherwise, execute the callback immediately. + """ + if self.qpointer is not None: + self.qpointer.put(('schedule', self.callback, self.args)) + else: + self.callback(*self.args, **self.kwargs) + self.reset() + return self.repeat + + def reset(self): + """ + Reset the task's timer so that it will repeat. + """ + self.next = time.time() + self.seconds + class Scheduler(object): - """Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched""" - def __init__(self, parentqueue=None): - self.addq = queue.Queue() - self.schedule = [] - self.thread = None - self.run = False - self.parentqueue = parentqueue - - def process(self, threaded=True): - if threaded: - self.thread = threading.Thread(name='shedulerprocess', target=self._process) - self.thread.start() - else: - self._process() - def _process(self): - self.run = True - while self.run: - try: - wait = 1 - updated = False - if self.schedule: - wait = self.schedule[0].next - time.time() - try: - if wait <= 0.0: - newtask = self.addq.get(False) - else: - newtask = self.addq.get(True, wait) - except queue.Empty: - cleanup = [] - for task in self.schedule: - if time.time() >= task.next: - updated = True - if not task.run(): - cleanup.append(task) - else: - break - for task in cleanup: - x = self.schedule.pop(self.schedule.index(task)) - else: - updated = True - self.schedule.append(newtask) - finally: - if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) - except KeyboardInterrupt: - self.run = False - logging.debug("Qutting Scheduler thread") - if self.parentqueue is not None: - self.parentqueue.put(('quit', None, None)) + """ + A threaded scheduler that allows for updates mid-execution unlike the + scheduler in the standard library. - def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): - self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer)) - - def quit(self): - self.run = False + http://docs.python.org/library/sched.html#module-sched + + Attributes: + addq -- A queue storing added tasks. + schedule -- A list of tasks in order of execution times. + thread -- If threaded, the thread processing the schedule. + run -- Indicates if the scheduler is running. + parentqueue -- A parent event queue in control of this scheduler. + + Methods: + add -- Add a new task to the schedule. + process -- Process and schedule tasks. + quit -- Stop the scheduler. + """ + + def __init__(self, parentqueue=None): + """ + Create a new scheduler. + + Arguments: + parentqueue -- A separate event queue controlling this scheduler. + """ + self.addq = queue.Queue() + self.schedule = [] + self.thread = None + self.run = False + self.parentqueue = parentqueue + + def process(self, threaded=True): + """ + Begin accepting and processing scheduled tasks. + + Arguments: + threaded -- Indicates if the scheduler should execute in its own + thread. Defaults to True. + """ + if threaded: + self.thread = threading.Thread(name='sheduler_process', + target=self._process) + self.thread.start() + else: + self._process() + + def _process(self): + """Process scheduled tasks.""" + self.run = True + while self.run: + try: + wait = 1 + updated = False + if self.schedule: + wait = self.schedule[0].next - time.time() + try: + if wait <= 0.0: + newtask = self.addq.get(False) + else: + newtask = self.addq.get(True, wait) + except queue.Empty: + cleanup = [] + for task in self.schedule: + if time.time() >= task.next: + updated = True + if not task.run(): + cleanup.append(task) + else: + break + for task in cleanup: + x = self.schedule.pop(self.schedule.index(task)) + else: + updated = True + self.schedule.append(newtask) + finally: + if updated: + self.schedule = sorted(self.schedule, + key=lambda task: task.next) + except KeyboardInterrupt: + self.run = False + + logging.debug("Qutting Scheduler thread") + if self.parentqueue is not None: + self.parentqueue.put(('quit', None, None)) + + def add(self, name, seconds, callback, args=None, + kwargs=None, repeat=False, qpointer=None): + """ + Schedule a new task. + + Arguments: + name -- The name of the task. + seconds -- The number of seconds to wait before executing. + callback -- The function to execute. + args -- The arguments to pass to the callback. + kwargs -- The keyword arguments to pass to the callback. + repeat -- Indicates if the task should repeat. + Defaults to False. + qpointer -- A pointer to an event queue for queuing callback + execution instead of executing immediately. + """ + self.addq.put(Task(name, seconds, callback, args, + kwargs, repeat, qpointer)) + + def quit(self): + """Shutdown the scheduler.""" + self.run = False