control-c fixes

This commit is contained in:
Nathan Fritz 2010-05-28 19:19:28 -07:00
parent bde1818400
commit 2f1ba368e2
3 changed files with 52 additions and 42 deletions

View file

@ -67,7 +67,7 @@ class TestPubsubServer(unittest.TestCase):
p = self.xmpp2.Presence() p = self.xmpp2.Presence()
p['to'] = self.pshost p['to'] = self.pshost
p.send() p.send()
self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='queue')) self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='job'))
def test005reconfigure(self): def test005reconfigure(self):
"""Retrieving node config and reconfiguring""" """Retrieving node config and reconfiguring"""
@ -86,31 +86,26 @@ class TestPubsubServer(unittest.TestCase):
item = ET.Element('{http://netflint.net/protocol/test}test') item = ET.Element('{http://netflint.net/protocol/test}test')
w = Waiter('wait publish', StanzaPath('message/pubsub_event/items')) w = Waiter('wait publish', StanzaPath('message/pubsub_event/items'))
self.xmpp2.registerHandler(w) self.xmpp2.registerHandler(w)
result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),)) #result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),))
result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test1', item)
msg = w.wait(5) # got to get a result in 5 seconds msg = w.wait(5) # got to get a result in 5 seconds
self.failUnless(msg != False, "Account #2 did not get message event") self.failUnless(msg != False, "Account #2 did not get message event")
result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),)) #result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),))
iq = self.xmpp2.Iq() result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test2', item)
iq['to'] = self.pshost w = Waiter('wait publish2', StanzaPath('message/pubsub_event/items'))
iq['type'] = 'set' self.xmpp2.registerHandler(w)
iq['psstate']['node'] = 'testnode2' self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test1')
iq['psstate']['item'] = 'test1' msg = w.wait(5) # got to get a result in 5 seconds
iq['psstate']['payload'] = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed') self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test2')
result = iq.send() self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test1')
time.sleep(10) self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test2')
iq = self.xmpp2.Iq() print result
iq['to'] = self.pshost
iq['type'] = 'set'
iq['psstate']['node'] = 'testnode2'
iq['psstate']['item'] = 'test2'
iq['psstate']['payload'] = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed')
result = iq.send()
self.failUnless(result['type'] == 'result')
#need to add check for update #need to add check for update
def test900cleanup(self): def test900cleanup(self):
"Cleaning up" "Cleaning up"
self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.") #self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.")
time.sleep(10)
if __name__ == '__main__': if __name__ == '__main__':
@ -141,10 +136,12 @@ if __name__ == '__main__':
xmpp1.registerPlugin('xep_0030') xmpp1.registerPlugin('xep_0030')
xmpp1.registerPlugin('xep_0060') xmpp1.registerPlugin('xep_0060')
xmpp1.registerPlugin('xep_0199') xmpp1.registerPlugin('xep_0199')
xmpp1.registerPlugin('jobs')
xmpp2.registerPlugin('xep_0004') xmpp2.registerPlugin('xep_0004')
xmpp2.registerPlugin('xep_0030') xmpp2.registerPlugin('xep_0030')
xmpp2.registerPlugin('xep_0060') xmpp2.registerPlugin('xep_0060')
xmpp2.registerPlugin('xep_0199') xmpp2.registerPlugin('xep_0199')
xmpp2.registerPlugin('jobs')
if not config.get('account1', 'server'): if not config.get('account1', 'server'):
# we don't know the server, but the lib can probably figure it out # we don't know the server, but the lib can probably figure it out

View file

@ -31,11 +31,12 @@ class Task(object):
class Scheduler(object): class Scheduler(object):
"""Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched""" """Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched"""
def __init__(self): def __init__(self, parentqueue=None):
self.addq = queue.Queue() self.addq = queue.Queue()
self.schedule = [] self.schedule = []
self.thread = None self.thread = None
self.run = False self.run = False
self.parentqueue = parentqueue
def process(self, threaded=True): def process(self, threaded=True):
if threaded: if threaded:
@ -47,29 +48,37 @@ class Scheduler(object):
def _process(self): def _process(self):
self.run = True self.run = True
while self.run: while self.run:
wait = 5
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try: try:
newtask = self.addq.get(True, wait) wait = 5
except queue.Empty: updated = False
cleanup = [] if self.schedule:
for task in self.schedule: wait = self.schedule[0].next - time.time()
if time.time() >= task.next: try:
updated = True if wait <= 0.0:
if not task.run(): newtask = self.addq.get(False)
cleanup.append(task)
else: else:
break newtask = self.addq.get(True, wait)
for task in cleanup: except queue.Empty:
x = self.schedule.pop(self.schedule.index(task)) cleanup = []
else: for task in self.schedule:
updated = True if time.time() >= task.next:
self.schedule.append(newtask) updated = True
finally: if not task.run():
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) cleanup.append(task)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
except KeyboardInterrupt:
self.run = False
logging.debug("Qutting Scheduler thread") logging.debug("Qutting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer)) self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer))

View file

@ -76,7 +76,7 @@ class XMLStream(object):
self.eventqueue = queue.Queue() self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue() self.sendqueue = queue.Queue()
self.scheduler = scheduler.Scheduler() self.scheduler = scheduler.Scheduler(self.eventqueue)
self.namespace_map = {} self.namespace_map = {}
@ -149,6 +149,7 @@ class XMLStream(object):
def process(self, threaded=True): def process(self, threaded=True):
self.scheduler.process(threaded=True) self.scheduler.process(threaded=True)
for t in range(0, HANDLER_THREADS): for t in range(0, HANDLER_THREADS):
logging.debug("Starting HANDLER THREAD")
self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
self.__thread['eventhandle%s' % t].start() self.__thread['eventhandle%s' % t].start()
self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread) self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread)
@ -331,6 +332,9 @@ class XMLStream(object):
event = self.eventqueue.get(True, timeout=5) event = self.eventqueue.get(True, timeout=5)
except queue.Empty: except queue.Empty:
event = None event = None
except KeyboardInterrupt:
self.run = False
self.scheduler.run = False
if event is not None: if event is not None:
etype = event[0] etype = event[0]
handler = event[1] handler = event[1]