SleekXMPP/tests/test_stream_xep_0030.py
Lance Stout 2a2ac73845 So using sys.excepthook to catch errors only works once.
The error bubbles through the event processing loop, breaking it and
hanging the application.

Instead, there is now a .exception(e) method on XMLStream which may
be overridden or reassigned that will receive all unhandled exceptions
(read: not XMPPError) from event and stream handlers.
2011-07-01 15:18:10 -07:00

576 lines
19 KiB
Python

import sys
import time
import threading
from sleekxmpp.test import *
class TestStreamDisco(SleekTest):
"""
Test using the XEP-0030 plugin.
"""
def tearDown(self):
self.stream_close()
def testInfoEmptyDefaultNode(self):
"""
Info query result from an entity MUST have at least one identity
and feature, namely http://jabber.org/protocol/disco#info.
Since the XEP-0030 plugin is loaded, a disco response should
be generated and not an error result.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
self.recv("""
<iq type="get" id="test">
<query xmlns="http://jabber.org/protocol/disco#info" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="client" type="bot" />
<feature var="http://jabber.org/protocol/disco#info" />
</query>
</iq>
""")
def testInfoEmptyDefaultNodeComponent(self):
"""
Test requesting an empty, default node using a Component.
"""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
self.recv("""
<iq type="get" id="test">
<query xmlns="http://jabber.org/protocol/disco#info" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="component" type="generic" />
<feature var="http://jabber.org/protocol/disco#info" />
</query>
</iq>
""")
def testInfoIncludeNode(self):
"""
Results for info queries directed to a particular node MUST
include the node in the query response.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
self.xmpp['xep_0030'].static.add_node(node='testing')
self.recv("""
<iq to="tester@localhost" type="get" id="test">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing">
</query>
</iq>""",
method='mask')
def testItemsIncludeNode(self):
"""
Results for items queries directed to a particular node MUST
include the node in the query response.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
self.xmpp['xep_0030'].static.add_node(node='testing')
self.recv("""
<iq to="tester@localhost" type="get" id="test">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing">
</query>
</iq>""",
method='mask')
def testDynamicInfoJID(self):
"""
Test using a dynamic info handler for a particular JID.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('client', 'console', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
jid='tester@localhost',
handler=dynamic_jid)
self.recv("""
<iq type="get" id="test" to="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing">
<identity category="client"
type="console"
name="Dynamic Info" />
</query>
</iq>
""")
def testDynamicInfoGlobal(self):
"""
Test using a dynamic info handler for all requests.
"""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('component', 'generic', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
handler=dynamic_global)
self.recv("""
<iq type="get" id="test"
to="user@tester.localhost"
from="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test"
to="tester@localhost"
from="user@tester.localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing">
<identity category="component"
type="generic"
name="Dynamic Info" />
</query>
</iq>
""")
def testOverrideJIDInfoHandler(self):
"""Test overriding a JID info handler."""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('client', 'console', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
jid='tester@localhost',
handler=dynamic_jid)
self.xmpp['xep_0030'].make_static(jid='tester@localhost',
node='testing')
self.xmpp['xep_0030'].add_identity(jid='tester@localhost',
node='testing',
category='automation',
itype='command-list')
self.recv("""
<iq type="get" id="test" to="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing">
<identity category="automation"
type="command-list" />
</query>
</iq>
""")
def testOverrideGlobalInfoHandler(self):
"""Test overriding the global JID info handler."""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('component', 'generic', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
handler=dynamic_global)
self.xmpp['xep_0030'].make_static(jid='user@tester.localhost',
node='testing')
self.xmpp['xep_0030'].add_feature(jid='user@tester.localhost',
node='testing',
feature='urn:xmpp:ping')
self.recv("""
<iq type="get" id="test"
to="user@tester.localhost"
from="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test"
to="tester@localhost"
from="user@tester.localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing">
<feature var="urn:xmpp:ping" />
</query>
</iq>
""")
def testGetInfoRemote(self):
"""
Test sending a disco#info query to another entity
and receiving the result.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
events = set()
def handle_disco_info(iq):
events.add('disco_info')
self.xmpp.add_event_handler('disco_info', handle_disco_info)
t = threading.Thread(name="get_info",
target=self.xmpp['xep_0030'].get_info,
args=('user@localhost', 'foo'))
t.start()
self.send("""
<iq type="get" to="user@localhost" id="1">
<query xmlns="http://jabber.org/protocol/disco#info"
node="foo" />
</iq>
""")
self.recv("""
<iq type="result" to="tester@localhost" id="1">
<query xmlns="http://jabber.org/protocol/disco#info"
node="foo">
<identity category="client" type="bot" />
<feature var="urn:xmpp:ping" />
</query>
</iq>
""")
# Wait for disco#info request to be received.
t.join()
time.sleep(0.1)
self.assertEqual(events, set(('disco_info',)),
"Disco info event was not triggered: %s" % events)
def testDynamicItemsJID(self):
"""
Test using a dynamic items handler for a particular JID.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester@localhost', node='foo', name='JID')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
jid='tester@localhost',
handler=dynamic_jid)
self.recv("""
<iq type="get" id="test" to="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing">
<item jid="tester@localhost" node="foo" name="JID" />
</query>
</iq>
""")
def testDynamicItemsGlobal(self):
"""
Test using a dynamic items handler for all requests.
"""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester@localhost', node='foo', name='Global')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
handler=dynamic_global)
self.recv("""
<iq type="get" id="test"
to="user@tester.localhost"
from="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test"
to="tester@localhost"
from="user@tester.localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing">
<item jid="tester@localhost" node="foo" name="Global" />
</query>
</iq>
""")
def testOverrideJIDItemsHandler(self):
"""Test overriding a JID items handler."""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester@localhost', node='foo', name='Global')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
jid='tester@localhost',
handler=dynamic_jid)
self.xmpp['xep_0030'].make_static(jid='tester@localhost',
node='testing')
self.xmpp['xep_0030'].add_item(ijid='tester@localhost',
node='testing',
jid='tester@localhost',
subnode='foo',
name='Test')
self.recv("""
<iq type="get" id="test" to="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing">
<item jid="tester@localhost" node="foo" name="Test" />
</query>
</iq>
""")
def testOverrideGlobalItemsHandler(self):
"""Test overriding the global JID items handler."""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester.localhost', node='foo', name='Global')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
handler=dynamic_global)
self.xmpp['xep_0030'].make_static(jid='user@tester.localhost',
node='testing')
self.xmpp['xep_0030'].add_item(ijid='user@tester.localhost',
node='testing',
jid='user@tester.localhost',
subnode='foo',
name='Test')
self.recv("""
<iq type="get" id="test"
to="user@tester.localhost"
from="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test"
to="tester@localhost"
from="user@tester.localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing">
<item jid="user@tester.localhost" node="foo" name="Test" />
</query>
</iq>
""")
def testGetItemsRemote(self):
"""
Test sending a disco#items query to another entity
and receiving the result.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
events = set()
results = set()
def handle_disco_items(iq):
events.add('disco_items')
results.update(iq['disco_items']['items'])
self.xmpp.add_event_handler('disco_items', handle_disco_items)
t = threading.Thread(name="get_items",
target=self.xmpp['xep_0030'].get_items,
args=('user@localhost', 'foo'))
t.start()
self.send("""
<iq type="get" to="user@localhost" id="1">
<query xmlns="http://jabber.org/protocol/disco#items"
node="foo" />
</iq>
""")
self.recv("""
<iq type="result" to="tester@localhost" id="1">
<query xmlns="http://jabber.org/protocol/disco#items"
node="foo">
<item jid="user@localhost" node="bar" name="Test" />
<item jid="user@localhost" node="baz" name="Test 2" />
</query>
</iq>
""")
# Wait for disco#items request to be received.
t.join()
time.sleep(0.1)
items = set([('user@localhost', 'bar', 'Test'),
('user@localhost', 'baz', 'Test 2')])
self.assertEqual(events, set(('disco_items',)),
"Disco items event was not triggered: %s" % events)
self.assertEqual(results, items,
"Unexpected items: %s" % results)
def testGetItemsIterator(self):
"""Test interaction between XEP-0030 and XEP-0059 plugins."""
raised_exceptions = []
self.stream_start(mode='client',
plugins=['xep_0030', 'xep_0059'])
results = self.xmpp['xep_0030'].get_items(jid='foo@localhost',
node='bar',
iterator=True)
results.amount = 10
def run_test():
try:
results.next()
except StopIteration:
raised_exceptions.append(True)
t = threading.Thread(name="get_items_iterator",
target=run_test)
t.start()
self.send("""
<iq id="2" type="get" to="foo@localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="bar">
<set xmlns="http://jabber.org/protocol/rsm">
<max>10</max>
</set>
</query>
</iq>
""")
self.recv("""
<iq id="2" type="result" to="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#items">
<set xmlns="http://jabber.org/protocol/rsm">
</set>
</query>
</iq>
""")
t.join()
self.assertEqual(raised_exceptions, [True],
"StopIteration was not raised: %s" % raised_exceptions)
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco)