adding scheduler

This commit is contained in:
Nathan Fritz 2010-05-26 18:32:28 -07:00
parent 1e3a6e1b5f
commit 0bda5fd3f2
2 changed files with 82 additions and 3 deletions

View file

@ -0,0 +1,76 @@
try:
import queue
except ImportError:
import Queue as queue
import time
import threading
class Task(object):
"""Task object for the Scheduler class"""
def __init__(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.name = name
self.seconds = seconds
self.callback = callback
self.args = args or tuple()
self.kwargs = kwargs or {}
self.repeat = repeat
self.next = time.time() + self.seconds
self.qpointer = qpointer
def run(self):
if self.qpointer is not None:
self.qpointer.put(('schedule', self.callback, self.args))
else:
self.callback(*self.args, **self.kwargs)
self.reset()
return self.repeat
def reset(self):
self.next = time.time() + self.seconds
class Scheduler(object):
"""Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched"""
def __init__(self):
self.addq = queue.Queue()
self.schedule = []
self.thread = None
self.run = True
def process(self, threaded=True):
if threaded:
self.thread = threading.Thread(name='shedulerprocess', target=self._process)
self.thread.start()
else:
self._process()
def _process(self):
while self.run:
wait = 5
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try:
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
print [x.name for x in self.schedule]
def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer))
def quit(self):
self.run = False

View file

@ -22,6 +22,7 @@ import time
import traceback import traceback
import types import types
import xml.sax.saxutils import xml.sax.saxutils
from . import scheduler
HANDLER_THREADS = 1 HANDLER_THREADS = 1
@ -75,6 +76,7 @@ class XMLStream(object):
self.eventqueue = queue.Queue() self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue() self.sendqueue = queue.Queue()
self.scheduler = scheduler.Scheduler()
self.namespace_map = {} self.namespace_map = {}
@ -145,6 +147,7 @@ class XMLStream(object):
raise RestartStream() raise RestartStream()
def process(self, threaded=True): def process(self, threaded=True):
self.scheduler.process(threaded=True)
for t in range(0, HANDLER_THREADS): for t in range(0, HANDLER_THREADS):
self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
self.__thread['eventhandle%s' % t].start() self.__thread['eventhandle%s' % t].start()
@ -156,8 +159,8 @@ class XMLStream(object):
else: else:
self._process() self._process()
def schedule(self, seconds, handler, args=None): def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False):
threading.Timer(seconds, handler, args).start() self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue)
def _process(self): def _process(self):
"Start processing the socket." "Start processing the socket."
@ -334,7 +337,7 @@ class XMLStream(object):
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
args[0].exception(e) args[0].exception(e)
elif etype == 'sched': elif etype == 'schedule':
try: try:
handler.run(*args) handler.run(*args)
except: except: