Updated, cleaned, and documented Iq stanza class. Also added unit tests.

This commit is contained in:
Lance Stout 2010-07-29 23:58:25 -04:00
parent 1da3e5b35e
commit cbed8029ba
2 changed files with 257 additions and 67 deletions

View file

@ -5,73 +5,170 @@
See the file LICENSE for copying permission.
"""
from .. xmlstream.stanzabase import StanzaBase
from xml.etree import cElementTree as ET
from . error import Error
from .. xmlstream.handler.waiter import Waiter
from .. xmlstream.matcher.id import MatcherId
from . rootstanza import RootStanza
from sleekxmpp.stanza import Error
from sleekxmpp.stanza.rootstanza import RootStanza
from sleekxmpp.xmlstream import RESPONSE_TIMEOUT
from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET
from sleekxmpp.xmlstream.handler import Waiter
from sleekxmpp.xmlstream.matcher import MatcherId
class Iq(RootStanza):
interfaces = set(('type', 'to', 'from', 'id','query'))
types = set(('get', 'result', 'set', 'error'))
name = 'iq'
plugin_attrib = name
namespace = 'jabber:client'
def __init__(self, *args, **kwargs):
StanzaBase.__init__(self, *args, **kwargs)
if self['id'] == '':
if self.stream is not None:
self['id'] = self.stream.getNewId()
else:
self['id'] = '0'
def unhandled(self):
if self['type'] in ('get', 'set'):
self.reply()
self['error']['condition'] = 'feature-not-implemented'
self['error']['text'] = 'No handlers registered for this request.'
self.send()
def setPayload(self, value):
self.clear()
StanzaBase.setPayload(self, value)
return self
def setQuery(self, value):
query = self.xml.find("{%s}query" % value)
if query is None and value:
self.clear()
query = ET.Element("{%s}query" % value)
self.xml.append(query)
return self
def getQuery(self):
for child in self.xml.getchildren():
if child.tag.endswith('query'):
ns =child.tag.split('}')[0]
if '{' in ns:
ns = ns[1:]
return ns
return ''
def reply(self):
self['type'] = 'result'
StanzaBase.reply(self)
return self
def delQuery(self):
for child in self.getchildren():
if child.tag.endswith('query'):
self.xml.remove(child)
return self
def send(self, block=True, timeout=10):
if block and self['type'] in ('get', 'set'):
waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id']))
self.stream.registerHandler(waitfor)
StanzaBase.send(self)
return waitfor.wait(timeout)
else:
return StanzaBase.send(self)
"""
XMPP <iq> stanzas, or info/query stanzas, are XMPP's method of
requesting and modifying information, similar to HTTP's GET and
POST methods.
Each <iq> stanza must have an 'id' value which associates the
stanza with the response stanza. XMPP entities must always
be given a response <iq> stanza with a type of 'result' after
sending a stanza of type 'get' or 'set'.
Most uses cases for <iq> stanzas will involve adding a <query>
element whose namespace indicates the type of information
desired. However, some custom XMPP applications use <iq> stanzas
as a carrier stanza for an application-specific protocol instead.
Example <iq> Stanzas:
<iq to="user@example.com" type="get" id="314">
<query xmlns="http://jabber.org/protocol/disco#items" />
</iq>
<iq to="user@localhost" type="result" id="17">
<query xmlns='jabber:iq:roster'>
<item jid='otheruser@example.net'
name='John Doe'
subscription='both'>
<group>Friends</group>
</item>
</query>
</iq>
Stanza Interface:
query -- The namespace of the <query> element if one exists.
Methods:
__init__ -- Overrides StanzaBase.__init__.
unhandled -- Send error if there are no handlers.
setPayload -- Overrides StanzaBase.setPayload.
setQuery -- Add or modify a <query> element.
getQuery -- Return the namespace of the <query> element.
delQuery -- Remove the <query> element.
reply -- Overrides StanzaBase.reply
send -- Overrides StanzaBase.send
"""
namespace = 'jabber:client'
name = 'iq'
interfaces = set(('type', 'to', 'from', 'id', 'query'))
types = set(('get', 'result', 'set', 'error'))
plugin_attrib = name
def __init__(self, *args, **kwargs):
"""
Initialize a new <iq> stanza with an 'id' value.
Overrides StanzaBase.__init__.
"""
StanzaBase.__init__(self, *args, **kwargs)
if self['id'] == '':
if self.stream is not None:
self['id'] = self.stream.getNewId()
else:
self['id'] = '0'
def unhandled(self):
"""
Send a feature-not-implemented error if the stanza is not handled.
Overrides StanzaBase.unhandled.
"""
if self['type'] in ('get', 'set'):
self.reply()
self['error']['condition'] = 'feature-not-implemented'
self['error']['text'] = 'No handlers registered for this request.'
self.send()
def setPayload(self, value):
"""
Set the XML contents of the <iq> stanza.
Arguments:
value -- An XML object to use as the <iq> stanza's contents
"""
self.clear()
StanzaBase.setPayload(self, value)
return self
def setQuery(self, value):
"""
Add or modify a <query> element.
Query elements are differentiated by their namespace.
Arguments:
value -- The namespace of the <query> element.
"""
query = self.xml.find("{%s}query" % value)
if query is None and value:
self.clear()
query = ET.Element("{%s}query" % value)
self.xml.append(query)
return self
def getQuery(self):
"""Return the namespace of the <query> element."""
for child in self.xml.getchildren():
if child.tag.endswith('query'):
ns = child.tag.split('}')[0]
if '{' in ns:
ns = ns[1:]
return ns
return ''
def delQuery(self):
"""Remove the <query> element."""
for child in self.xml.getchildren():
if child.tag.endswith('query'):
self.xml.remove(child)
return self
def reply(self):
"""
Send a reply <iq> stanza.
Overrides StanzaBase.reply
Sets the 'type' to 'result' in addition to the default
StanzaBase.reply behavior.
"""
self['type'] = 'result'
StanzaBase.reply(self)
return self
def send(self, block=True, timeout=RESPONSE_TIMEOUT):
"""
Send an <iq> stanza over the XML stream.
The send call can optionally block until a response is received or
a timeout occurs. Be aware that using blocking in non-threaded event
handlers can drastically impact performance.
Overrides StanzaBase.send
Arguments:
block -- Specify if the send call will block until a response
is received, or a timeout occurs. Defaults to True.
timeout -- The length of time (in seconds) to wait for a response
before exiting the send call if blocking is used.
Defaults to sleekxmpp.xmlstream.RESPONSE_TIMEOUT
"""
if block and self['type'] in ('get', 'set'):
waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id']))
self.stream.registerHandler(waitfor)
StanzaBase.send(self)
return waitfor.wait(timeout)
else:
return StanzaBase.send(self)

93
tests/test_iqstanzas.py Normal file
View file

@ -0,0 +1,93 @@
from sleektest import *
from sleekxmpp.xmlstream.stanzabase import ET
class TestIqStanzas(SleekTest):
def setUp(self):
"""Start XML stream for testing."""
self.streamStart()
def tearDown(self):
"""Shutdown the XML stream after testing."""
self.streamClose()
def testSetup(self):
"""Test initializing default Iq values."""
iq = self.Iq()
self.checkIq(iq, """
<iq id="0" />
""")
def testPayload(self):
"""Test setting Iq stanza payload."""
iq = self.Iq()
iq.setPayload(ET.Element('{test}tester'))
self.checkIq(iq, """
<iq id="0">
<tester xmlns="test" />
</iq>
""", use_values=False)
def testUnhandled(self):
"""Test behavior for Iq.unhandled."""
self.streamRecv("""
<iq id="test" type="get">
<query xmlns="test" />
</iq>
""")
iq = self.Iq()
iq['id'] = 'test'
iq['error']['condition'] = 'feature-not-implemented'
iq['error']['text'] = 'No handlers registered for this request.'
self.streamSendIq(iq, """
<iq id="test" type="error">
<error type="cancel">
<feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
<text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
No handlers registered for this request.
</text>
</error>
</iq>
""")
def testQuery(self):
"""Test modifying query element of Iq stanzas."""
iq = self.Iq()
iq['query'] = 'query_ns'
self.checkIq(iq, """
<iq id="0">
<query xmlns="query_ns" />
</iq>
""")
iq['query'] = 'query_ns2'
self.checkIq(iq, """
<iq id="0">
<query xmlns="query_ns2" />
</iq>
""")
self.failUnless(iq['query'] == 'query_ns2', "Query namespace doesn't match")
del iq['query']
self.checkIq(iq, """
<iq id="0" />
""")
def testReply(self):
"""Test setting proper result type in Iq replies."""
iq = self.Iq()
iq['to'] = 'user@localhost'
iq['type'] = 'get'
iq.reply()
self.checkIq(iq, """
<iq id="0" type="result" />
""")
suite = unittest.TestLoader().loadTestsFromTestCase(TestIqStanzas)