Fix infinite callback loop.

This commit is contained in:
Lance Stout 2012-02-03 16:03:46 +01:00
parent 021c57205f
commit 85dd005abc
4 changed files with 139 additions and 20 deletions

View file

@ -22,7 +22,7 @@ class xep_0047(base_plugin):
self.stanza = stanza self.stanza = stanza
self.streams = {} self.streams = {}
self.pending_streams = {} self.pending_streams = {3: 5}
self.pending_close_streams = {} self.pending_close_streams = {}
self._stream_lock = threading.Lock() self._stream_lock = threading.Lock()
@ -63,9 +63,6 @@ class xep_0047(base_plugin):
def open_stream(self, jid, block_size=4096, sid=None, window=1, def open_stream(self, jid, block_size=4096, sid=None, window=1,
ifrom=None, block=True, timeout=None, callback=None): ifrom=None, block=True, timeout=None, callback=None):
if not block and callback is not None:
callback = lambda resp: self._handle_opened_stream(resp, callback)
if sid is None: if sid is None:
sid = str(uuid.uuid4()) sid = str(uuid.uuid4())
@ -77,29 +74,40 @@ class xep_0047(base_plugin):
iq['ibb_open']['sid'] = sid iq['ibb_open']['sid'] = sid
iq['ibb_open']['stanza'] = 'iq' iq['ibb_open']['stanza'] = 'iq'
stream = IBBytestream(self.xmpp, sid, size, stream = IBBytestream(self.xmpp, sid, block_size,
iq['to'], iq['from'], window) iq['to'], iq['from'], window)
with self._stream_lock: with self._stream_lock:
self.pending_streams[iq['id']] = stream self.pending_streams[iq['id']] = stream
resp = iq.send(block=block, timeout=timeout, callback=callback) self.pending_streams[iq['id']] = stream
if block: if block:
resp = iq.send(timeout=timeout)
self._handle_opened_stream(resp) self._handle_opened_stream(resp)
return stream return stream
else:
cb = None
if callback is not None:
def chained(resp):
self._handle_opened_stream(resp)
callback(resp)
cb = chained
else:
cb = self._handle_opened_stream
return iq.send(block=block, timeout=timeout, callback=cb)
def _handle_opened_stream(self, iq, callback=None):
def _handle_opened_stream(self, iq):
if iq['type'] == 'result': if iq['type'] == 'result':
with self._stream_lock: with self._stream_lock:
stream = self.pending_streams[iq['id']] stream = self.pending_streams.get(iq['id'], None)
stream.sender = iq['to'] if stream is not None:
stream.receiver = iq['from'] stream.sender = iq['to']
stream.stream_started.set() stream.receiver = iq['from']
self.streams[stream.sid] = stream stream.stream_started.set()
self.xmpp.event('ibb_stream_start', stream) self.streams[stream.sid] = stream
self.xmpp.event('ibb_stream_start', stream)
if callback is not None:
callback(iq)
with self._stream_lock: with self._stream_lock:
if iq['id'] in self.pending_streams: if iq['id'] in self.pending_streams:
@ -114,7 +122,7 @@ class xep_0047(base_plugin):
if size > self.max_block_size: if size > self.max_block_size:
raise XMPPError('resource-constraint') raise XMPPError('resource-constraint')
stream = IBBytestream(self.xmpp, sid, size, stream = IBBytestream(self.xmpp, sid, size,
iq['from'], iq['to'], iq['from'], iq['to'],
self.window_size) self.window_size)
stream.stream_started.set() stream.stream_started.set()

View file

@ -53,7 +53,7 @@ class Data(ElementBase):
raise XMPPError('not-acceptable') raise XMPPError('not-acceptable')
def set_data(self, value): def set_data(self, value):
self.xml.text = to_64(value) self.xml.text = to_b64(value)
def del_data(self): def del_data(self):
self.xml.text = '' self.xml.text = ''

View file

@ -73,5 +73,18 @@ class TestIBB(SleekTest):
self.assertTrue(errored, "ABCD?EFGH did not raise base64 error") self.assertTrue(errored, "ABCD?EFGH did not raise base64 error")
def testConvertData(self):
"""Test that data is converted to base64"""
iq = Iq()
iq['type'] = 'set'
iq['ibb_data']['seq'] = 0
iq['ibb_data']['data'] = 'sleekxmpp'
self.check(iq, """
<iq type="set">
<data xmlns="http://jabber.org/protocol/ibb" seq="0">c2xlZWt4bXBw</data>
</iq>
""")
suite = unittest.TestLoader().loadTestsFromTestCase(TestIBB) suite = unittest.TestLoader().loadTestsFromTestCase(TestIBB)

View file

@ -0,0 +1,98 @@
import threading
import time
from sleekxmpp.test import *
class TestInBandByteStreams(SleekTest):
def setUp(self):
self.stream_start(plugins=['xep_0047', 'xep_0030'])
def tearDown(self):
self.stream_close()
def testOpenStream(self):
"""Test requesting a stream, successfully"""
events = []
def on_stream_start(stream):
events.append('ibb_stream_start')
self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
t = threading.Thread(name='open_stream',
target=self.xmpp['xep_0047'].open_stream,
args=('tester@localhost/receiver',),
kwargs={'sid': 'testing'})
t.start()
self.send("""
<iq type="set" to="tester@localhost/receiver" id="1">
<open xmlns="http://jabber.org/protocol/ibb"
sid="testing"
block-size="4096"
stanza="iq" />
</iq>
""")
self.recv("""
<iq type="result" id="1"
to="tester@localhost"
from="tester@localhost/receiver" />
""")
t.join()
time.sleep(0.2)
self.assertEqual(events, ['ibb_stream_start'])
def testAysncOpenStream(self):
"""Test requesting a stream, aysnc"""
events = set()
def on_stream_start(stream):
events.add('ibb_stream_start')
def stream_callback(iq):
events.add('callback')
self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
t = threading.Thread(name='open_stream',
target=self.xmpp['xep_0047'].open_stream,
args=('tester@localhost/receiver',),
kwargs={'sid': 'testing',
'block': False,
'callback': stream_callback})
t.start()
self.send("""
<iq type="set" to="tester@localhost/receiver" id="1">
<open xmlns="http://jabber.org/protocol/ibb"
sid="testing"
block-size="4096"
stanza="iq" />
</iq>
""")
self.recv("""
<iq type="result" id="1"
to="tester@localhost"
from="tester@localhost/receiver" />
""")
t.join()
time.sleep(0.2)
self.assertEqual(events, set(['ibb_stream_start', 'callback']))
suite = unittest.TestLoader().loadTestsFromTestCase(TestInBandByteStreams)