First pass at a new XEP-0030 plugin.

Now with dynamic node handling goodness.

Some things are not quite working yet, in particular:
    set_items
    set_info
    set_identities
    set_features

And still need more unit tests to round things out.
This commit is contained in:
Lance Stout 2010-12-09 18:57:27 -05:00
parent 8d4e77aba6
commit f4451fe6b7
10 changed files with 1741 additions and 449 deletions

View file

@ -1,356 +0,0 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import logging
from . import base
from .. xmlstream.handler.callback import Callback
from .. xmlstream.matcher.xpath import MatchXPath
from .. xmlstream.stanzabase import registerStanzaPlugin, ElementBase, ET, JID
from .. stanza.iq import Iq
log = logging.getLogger(__name__)
class DiscoInfo(ElementBase):
namespace = 'http://jabber.org/protocol/disco#info'
name = 'query'
plugin_attrib = 'disco_info'
interfaces = set(('node', 'features', 'identities'))
def getFeatures(self):
features = []
featuresXML = self.xml.findall('{%s}feature' % self.namespace)
for feature in featuresXML:
features.append(feature.attrib['var'])
return features
def setFeatures(self, features):
self.delFeatures()
for name in features:
self.addFeature(name)
def delFeatures(self):
featuresXML = self.xml.findall('{%s}feature' % self.namespace)
for feature in featuresXML:
self.xml.remove(feature)
def addFeature(self, feature):
featureXML = ET.Element('{%s}feature' % self.namespace,
{'var': feature})
self.xml.append(featureXML)
def delFeature(self, feature):
featuresXML = self.xml.findall('{%s}feature' % self.namespace)
for featureXML in featuresXML:
if featureXML.attrib['var'] == feature:
self.xml.remove(featureXML)
def getIdentities(self):
ids = []
idsXML = self.xml.findall('{%s}identity' % self.namespace)
for idXML in idsXML:
idData = (idXML.attrib['category'],
idXML.attrib['type'],
idXML.attrib.get('name', ''))
ids.append(idData)
return ids
def setIdentities(self, ids):
self.delIdentities()
for idData in ids:
self.addIdentity(*idData)
def delIdentities(self):
idsXML = self.xml.findall('{%s}identity' % self.namespace)
for idXML in idsXML:
self.xml.remove(idXML)
def addIdentity(self, category, itype, name=''):
idXML = ET.Element('{%s}identity' % self.namespace)
idXML.attrib['category'] = category
idXML.attrib['type'] = itype
if name:
idXML.attrib['name'] = name
self.xml.append(idXML)
def delIdentity(self, category, id_type, name=''):
idsXML = self.xml.findall('{%s}identity' % self.namespace)
for idXML in idsXML:
idData = (idXML.attrib['category'],
idXML.attrib['type'])
delId = (category, id_type)
if idData == delId:
self.xml.remove(idXML)
class DiscoItems(ElementBase):
namespace = 'http://jabber.org/protocol/disco#items'
name = 'query'
plugin_attrib = 'disco_items'
interfaces = set(('node', 'items'))
def getItems(self):
items = []
itemsXML = self.xml.findall('{%s}item' % self.namespace)
for item in itemsXML:
itemData = (item.attrib['jid'],
item.attrib.get('node'),
item.attrib.get('name'))
items.append(itemData)
return items
def setItems(self, items):
self.delItems()
for item in items:
self.addItem(*item)
def delItems(self):
itemsXML = self.xml.findall('{%s}item' % self.namespace)
for item in itemsXML:
self.xml.remove(item)
def addItem(self, jid, node='', name=''):
itemXML = ET.Element('{%s}item' % self.namespace, {'jid': jid})
if name:
itemXML.attrib['name'] = name
if node:
itemXML.attrib['node'] = node
self.xml.append(itemXML)
def delItem(self, jid, node=''):
itemsXML = self.xml.findall('{%s}item' % self.namespace)
for itemXML in itemsXML:
itemData = (itemXML.attrib['jid'],
itemXML.attrib.get('node', ''))
itemDel = (jid, node)
if itemData == itemDel:
self.xml.remove(itemXML)
class DiscoNode(object):
"""
Collection object for grouping info and item information
into nodes.
"""
def __init__(self, name):
self.name = name
self.info = DiscoInfo()
self.items = DiscoItems()
self.info['node'] = name
self.items['node'] = name
# This is a bit like poor man's inheritance, but
# to simplify adding information to the node we
# map node functions to either the info or items
# stanza objects.
#
# We don't want to make DiscoNode inherit from
# DiscoInfo and DiscoItems because DiscoNode is
# not an actual stanza, and doing so would create
# confusion and potential bugs.
self._map(self.items, 'items', ['get', 'set', 'del'])
self._map(self.items, 'item', ['add', 'del'])
self._map(self.info, 'identities', ['get', 'set', 'del'])
self._map(self.info, 'identity', ['add', 'del'])
self._map(self.info, 'features', ['get', 'set', 'del'])
self._map(self.info, 'feature', ['add', 'del'])
def isEmpty(self):
"""
Test if the node contains any information. Useful for
determining if a node can be deleted.
"""
ids = self.getIdentities()
features = self.getFeatures()
items = self.getItems()
if not ids and not features and not items:
return True
return False
def _map(self, obj, interface, access):
"""
Map functions of the form obj.accessInterface
to self.accessInterface for each given access type.
"""
interface = interface.title()
for access_type in access:
method = access_type + interface
if hasattr(obj, method):
setattr(self, method, getattr(obj, method))
class xep_0030(base.base_plugin):
"""
XEP-0030 Service Discovery
"""
def plugin_init(self):
self.xep = '0030'
self.description = 'Service Discovery'
self.xmpp.registerHandler(
Callback('Disco Items',
MatchXPath('{%s}iq/{%s}query' % (self.xmpp.default_ns,
DiscoItems.namespace)),
self.handle_item_query))
self.xmpp.registerHandler(
Callback('Disco Info',
MatchXPath('{%s}iq/{%s}query' % (self.xmpp.default_ns,
DiscoInfo.namespace)),
self.handle_info_query))
registerStanzaPlugin(Iq, DiscoInfo)
registerStanzaPlugin(Iq, DiscoItems)
self.xmpp.add_event_handler('disco_items_request', self.handle_disco_items)
self.xmpp.add_event_handler('disco_info_request', self.handle_disco_info)
self.nodes = {}
self.add_node('')
self.add_feature('http://jabber.org/protocol/disco#info', node='')
def add_node(self, node):
if node not in self.nodes:
self.nodes[node] = DiscoNode(node)
def del_node(self, node):
if node in self.nodes:
del self.nodes[node]
def rename_node(self, node, new_name):
if new_name not in self.nodes and node in self.nodes:
self.nodes[new_name] = self.nodes[node]
self.nodes[new_name].name = new_name
self.nodes[new_name].info['node'] = new_name
self.nodes[new_name].items['node'] = new_name
self.del_node(node)
def handle_item_query(self, iq):
if iq['type'] == 'get':
log.debug("Items requested by %s" % iq['from'])
self.xmpp.event('disco_items_request', iq)
elif iq['type'] == 'result':
log.debug("Items result from %s" % iq['from'])
self.xmpp.event('disco_items', iq)
def handle_info_query(self, iq):
if iq['type'] == 'get':
log.debug("Info requested by %s" % iq['from'])
self.xmpp.event('disco_info_request', iq)
elif iq['type'] == 'result':
log.debug("Info result from %s" % iq['from'])
self.xmpp.event('disco_info', iq)
def handle_disco_info(self, iq, forwarded=False):
"""
A default handler for disco#info requests. If another
handler is registered, this one will defer and not run.
"""
if not forwarded and \
self.xmpp.event_handled('disco_info_request') > 1:
return
node_name = iq['disco_info']['node']
log.debug("Using default handler for disco#info on node '%s'." % node_name)
if node_name in self.nodes:
node = self.nodes[node_name]
iq.reply()
iq['disco_info']['node'] = node_name
identities = node.info['identities']
if identities:
iq['disco_info']['identities'] = identities
else:
if self.xmpp.is_component:
iq['disco_info'].addIdentity(
category='component',
itype='generic')
else:
iq['disco_info'].addIdentity(
category='client',
itype='bot')
log.info("No identity found for node '%'," + \
"using default, generic identity")
iq['disco_info']['features'] = node.info['features']
iq.send()
else:
log.debug("Node %s requested, but does not exist." % node_name)
iq.reply().error().setPayload(iq['disco_info'].xml)
iq['error']['code'] = '404'
iq['error']['type'] = 'cancel'
iq['error']['condition'] = 'item-not-found'
iq.send()
def handle_disco_items(self, iq, forwarded=False):
"""
A default handler for disco#items requests. If another
handler is registered, this one will defer and not run.
If this handler is called by your own custom handler with
forwarded set to True, then it will run as normal.
"""
if not forwarded and \
self.xmpp.event_handled('disco_items_request') > 1:
return
node_name = iq['disco_items']['node']
log.debug("Using default handler for disco#items on node: '%s'." % node_name)
if node_name in self.nodes:
node = self.nodes[node_name]
iq.reply().setPayload(node.items.xml).send()
else:
log.debug("Node %s requested, but does not exist." % node_name)
iq.reply().error().setPayload(iq['disco_items'].xml)
iq['error']['code'] = '404'
iq['error']['type'] = 'cancel'
iq['error']['condition'] = 'item-not-found'
iq.send()
# Older interface methods for backwards compatibility
def getInfo(self, jid, node='', dfrom=None):
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq['to'] = jid
iq['from'] = dfrom
iq['disco_info']['node'] = node
return iq.send()
def getItems(self, jid, node='', dfrom=None):
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq['to'] = jid
iq['from'] = dfrom
iq['disco_items']['node'] = node
return iq.send()
def add_feature(self, feature, node=''):
self.add_node(node)
self.nodes[node].addFeature(feature)
def add_identity(self, category='', itype='', name='', node=''):
self.add_node(node)
self.nodes[node].addIdentity(category=category,
itype=itype,
name=name)
def add_item(self, jid=None, name='', node='', subnode=''):
self.add_node(node)
self.add_node(subnode)
if jid is None:
jid = self.xmpp.fulljid
self.nodes[node].addItem(jid=jid, name=name, node=subnode)

View file

@ -0,0 +1,12 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.plugins.xep_0030 import stanza
from sleekxmpp.plugins.xep_0030.stanza import DiscoInfo, DiscoItems
from sleekxmpp.plugins.xep_0030.static import StaticDisco
from sleekxmpp.plugins.xep_0030.disco import xep_0030

View file

@ -0,0 +1,314 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import logging
import sleekxmpp
from sleekxmpp import Iq
from sleekxmpp.exceptions import XMPPError
from sleekxmpp.plugins.base import base_plugin
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID
from sleekxmpp.plugins.xep_0030 import DiscoInfo, DiscoItems, StaticDisco
log = logging.getLogger(__name__)
class xep_0030(base_plugin):
"""
XEP-0030: Service Discovery
Stream Handlers:
Disco Info --
Disco Items --
Events:
disco_info --
disco_items --
disco_info_query --
disco_items_query --
Methods:
set_node_handler --
del_node_handler --
add_identity --
del_identity --
add_feature --
del_feature --
add_item --
del_item --
get_info --
get_items --
"""
def plugin_init(self):
self.xep = '0030'
self.description = 'Service Discovery'
self.stanza = sleekxmpp.plugins.xep_0030.stanza
self.xmpp.register_handler(
Callback('Disco Info',
StanzaPath('iq/disco_info'),
self._handle_disco_info))
self.xmpp.register_handler(
Callback('Disco Items',
StanzaPath('iq/disco_items'),
self._handle_disco_items))
register_stanza_plugin(Iq, DiscoInfo)
register_stanza_plugin(Iq, DiscoItems)
self.static = StaticDisco(self.xmpp)
self._disco_ops = ['get_info', 'set_identities', 'set_features',
'del_info', 'get_items', 'set_items', 'del_items',
'add_identity', 'del_identity', 'add_feature',
'del_feature', 'add_item', 'del_item']
self.handlers = {}
for op in self._disco_ops:
self.handlers[op] = {'global': getattr(self.static, op),
'jid': {},
'node': {}}
def set_node_handler(self, htype, jid=None, node=None, handler=None):
"""
Arguments:
htype
jid
node
handler
"""
if htype not in self._disco_ops:
return
if jid is None and node is None:
self.handlers[htype]['global'] = handler
elif node is None:
self.handlers[htype]['jid'][jid] = handler
elif jid is None:
jid = self.xmpp.boundjid.full
self.handlers[htype]['node'][(jid, node)] = handler
else:
self.handlers[htype]['node'][(jid, node)] = handler
def del_node_handler(self, htype, jid, node):
"""
Arguments:
htype
jid
node
"""
self.set_node_handler(htype, jid, node, None)
def make_static(self, jid=None, node=None, handlers=None):
"""
Change all of a node's handlers to the default static
handlers. Useful for manually overriding the contents
of a node that would otherwise be handled by a JID level
or global level dynamic handler.
Arguments:
jid -- The JID owning the node to modify.
node -- The node to change to using static handlers.
handlers -- Optional list of handlers to change to the
static version. If provided, only these
handlers will be changed. Otherwise, all
handlers will use the static version.
"""
if handlers is None:
handlers = self._disco_ops
for op in handlers:
self.del_node_handler(op, jid, node)
self.set_node_handler(op, jid, node, getattr(self.static, op))
def get_info(self, jid=None, node=None, local=False, **kwargs):
"""
Arguments:
jid --
node --
local --
dfrom --
block --
timeout --
callback --
"""
if local or jid is None:
log.debug("Looking up local disco#info data " + \
"for %s, node %s." % (jid, node))
info = self._run_node_handler('get_info', jid, node, kwargs)
return self._fix_default_info(info)
iq = self.xmpp.Iq()
iq['from'] = kwargs.get('dfrom', '')
iq['to'] = jid
iq['type'] = 'get'
iq['disco_info']['node'] = node if node else ''
return iq.send(timeout=kwargs.get('timeout', None),
block=kwargs.get('block', None),
callback=kwargs.get('callback', None))
def get_items(self, jid=None, node=None, local=False, **kwargs):
"""
Arguments:
jid --
node --
local --
dfrom --
block --
timeout --
callback --
"""
if local or jid is None:
return self._run_node_handler('get_items', jid, node, kwargs)
iq = self.xmpp.Iq()
iq['from'] = kwargs.get('dfrom', '')
iq['to'] = jid
iq['type'] = 'get'
iq['disco_items']['node'] = node if node else ''
return iq.send(timeout=kwargs.get('timeout', None),
block=kwargs.get('block', None),
callback=kwargs.get('callback', None))
def set_info(self, jid=None, node=None, **kwargs):
self._run_node_handler('set_info', jid, node, kwargs)
def del_info(self, jid=None, node=None, **kwargs):
self._run_node_handler('del_info', jid, node, kwargs)
def set_items(self, jid=None, node=None, **kwargs):
self._run_node_handler('set_items', jid, node, kwargs)
def del_items(self, jid=None, node=None, **kwargs):
self._run_node_handler('del_items', jid, node, kwargs)
def add_identity(self, jid=None, node=None, **kwargs):
self._run_node_handler('add_identity', jid, node, kwargs)
def add_feature(self, jid=None, node=None, **kwargs):
self._run_node_handler('add_feature', jid, node, kwargs)
def del_identity(self, jid=None, node=None, **kwargs):
self._run_node_handler('del_identity', jid, node, kwargs)
def del_feature(self, jid=None, node=None, **kwargs):
self._run_node_handler('del_feature', jid, node, kwargs)
def add_item(self, jid=None, node=None, **kwargs):
self._run_node_handler('add_item', jid, node, kwargs)
def del_item(self, jid=None, node=None, **kwargs):
self._run_node_handler('del_item', jid, node, kwargs)
def _run_node_handler(self, htype, jid, node, data=None):
"""
Execute the most specific node handler for the given
JID/node combination.
Arguments:
htype -- The handler type to execute.
jid -- The JID requested.
node -- The node requested.
dat -- Optional, custom data to pass to the handler.
"""
if jid is None:
jid = self.xmpp.boundjid.full
if node is None:
node = ''
if self.handlers[htype]['node'].get((jid, node), False):
return self.handlers[htype]['node'][(jid, node)](jid, node, data)
elif self.handlers[htype]['jid'].get(jid, False):
return self.handlers[htype]['jid'][jid](jid, node, data)
elif self.handlers[htype]['global']:
return self.handlers[htype]['global'](jid, node, data)
else:
return None
def _handle_disco_info(self, iq):
"""
Process an incoming disco#info stanza. If it is a get
request, find and return the appropriate identities
and features. If it is an info result, fire the
disco_info event.
Arguments:
iq -- The incoming disco#items stanza.
"""
if iq['type'] == 'get':
log.debug("Received disco info query from " + \
"<%s> to <%s>." % (iq['from'], iq['to']))
info = self._run_node_handler('get_info',
iq['to'].full,
iq['disco_info']['node'],
iq)
iq.reply()
if info:
info = self._fix_default_info(info)
iq.set_payload(info.xml)
iq.send()
elif iq['type'] == 'result':
log.debug("Received disco info result from" + \
"%s to %s." % (iq['from'], iq['to']))
self.xmpp.event('disco_info', iq)
def _handle_disco_items(self, iq):
"""
Process an incoming disco#items stanza. If it is a get
request, find and return the appropriate items. If it
is an items result, fire the disco_items event.
Arguments:
iq -- The incoming disco#items stanza.
"""
if iq['type'] == 'get':
log.debug("Received disco items query from " + \
"<%s> to <%s>." % (iq['from'], iq['to']))
items = self._run_node_handler('get_items',
iq['to'].full,
iq['disco_items']['node'])
iq.reply()
if items:
iq.set_payload(items.xml)
iq.send()
elif iq['type'] == 'result':
log.debug("Received disco items result from" + \
"%s to %s." % (iq['from'], iq['to']))
self.xmpp.event('disco_items', iq)
def _fix_default_info(self, info):
"""
Disco#info results for a JID are required to include at least
one identity and feature. As a default, if no other identity is
provided, SleekXMPP will use either the generic component or the
bot client identity. A the standard disco#info feature will also be
added if no features are provided.
Arguments:
info -- The disco#info quest (not the full Iq stanza) to modify.
"""
if not info['node']:
if not info['identities']:
if self.xmpp.is_component:
log.debug("No identity found for this entity." + \
"Using default component identity.")
info.add_identity('component', 'generic')
else:
log.debug("No identity found for this entity." + \
"Using default client identity.")
info.add_identity('client', 'bot')
if not info['features']:
log.debug("No features found for this entity." + \
"Using default disco#info feature.")
info.add_feature(info.namespace)
return info

View file

@ -0,0 +1,10 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.plugins.xep_0030.stanza.info import DiscoInfo
from sleekxmpp.plugins.xep_0030.stanza.items import DiscoItems

View file

@ -0,0 +1,262 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.xmlstream import ElementBase, ET
class DiscoInfo(ElementBase):
"""
XMPP allows for users and agents to find the identities and features
supported by other entities in the XMPP network through service discovery,
or "disco". In particular, the "disco#info" query type for <iq> stanzas is
used to request the list of identities and features offered by a JID.
An identity is a combination of a category and type, such as the 'client'
category with a type of 'pc' to indicate the agent is a human operated
client with a GUI, or a category of 'gateway' with a type of 'aim' to
identify the agent as a gateway for the legacy AIM protocol. See
<http://xmpp.org/registrar/disco-categories.html> for a full list of
accepted category and type combinations.
Features are simply a set of the namespaces that identify the supported
features. For example, a client that supports service discovery will
include the feature 'http://jabber.org/protocol/disco#info'.
Since clients and components may operate in several roles at once, identity
and feature information may be grouped into "nodes". If one were to write
all of the identities and features used by a client, then node names would
be like section headings.
Example disco#info stanzas:
<iq type="get">
<query xmlns="http://jabber.org/protocol/disco#info" />
</iq>
<iq type="result">
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="client" type="bot" name="SleekXMPP Bot" />
<feature var="http://jabber.org/protocol/disco#info" />
<feature var="jabber:x:data" />
<feature var="urn:xmpp:ping" />
</query>
</iq>
Stanza Interface:
node -- The name of the node to either
query or return info from.
identities -- A set of 4-tuples, where each tuple contains
the category, type, xml:lang, and name
of an identity.
features -- A set of namespaces for features.
Methods:
add_identity -- Add a new, single identity.
del_identity -- Remove a single identity.
get_identities -- Return all identities in tuple form.
set_identities -- Use multiple identities, each given in tuple form.
del_identities -- Remove all identities.
add_feature -- Add a single feature.
del_feature -- Remove a single feature.
get_features -- Return a list of all features.
set_features -- Use a given list of features.
del_features -- Remove all features.
"""
name = 'query'
namespace = 'http://jabber.org/protocol/disco#info'
plugin_attrib = 'disco_info'
interfaces = set(('node', 'features', 'identities'))
lang_interfaces = set(('identities',))
# Cache identities and features
_identities = set()
_features = set()
def setup(self, xml=None):
"""
Populate the stanza object using an optional XML object.
Overrides ElementBase.setup
Caches identity and feature information.
Arguments:
xml -- Use an existing XML object for the stanza's values.
"""
ElementBase.setup(self, xml)
self._identities = set([id[0:3] for id in self['identities']])
self._features = self['features']
def add_identity(self, category, itype, name=None, lang=None):
"""
Add a new identity element. Each identity must be unique
in terms of all four identity components.
Multiple, identical category/type pairs are allowed only
if the xml:lang values are different. Likewise, multiple
category/type/xml:lang pairs are allowed so long as the names
are different. In any case, a category and type are required.
Arguments:
category -- The general category to which the agent belongs.
itype -- A more specific designation with the category.
name -- Optional human readable name for this identity.
lang -- Optional standard xml:lang value.
"""
identity = (category, itype, lang)
if identity not in self._identities:
self._identities.add(identity)
id_xml = ET.Element('{%s}identity' % self.namespace)
id_xml.attrib['category'] = category
id_xml.attrib['type'] = itype
if lang:
id_xml.attrib['{%s}lang' % self.xml_ns] = lang
if name:
id_xml.attrib['name'] = name
self.xml.append(id_xml)
return True
return False
def del_identity(self, category, itype, name=None, lang=None):
"""
Remove a given identity.
Arguments:
category -- The general category to which the agent belonged.
itype -- A more specific designation with the category.
name -- Optional human readable name for this identity.
lang -- Optional, standard xml:lang value.
"""
identity = (category, itype, lang)
if identity in self._identities:
self._identities.remove(identity)
for id_xml in self.findall('{%s}identity' % self.namespace):
id = (id_xml.attrib['category'],
id_xml.attrib['type'],
id_xml.attrib.get('{%s}lang' % self.xml_ns, None))
if id == identity:
self.xml.remove(id_xml)
return True
return False
def get_identities(self, lang=None):
"""
Return a set of all identities in tuple form as so:
(category, type, lang, name)
If a language was specified, only return identities using
that language.
Arguments:
lang -- Optional, standard xml:lang value.
"""
identities = set()
for id_xml in self.findall('{%s}identity' % self.namespace):
xml_lang = id_xml.attrib.get('{%s}lang' % self.xml_ns, None)
if lang is None or xml_lang == lang:
identities.add((
id_xml.attrib['category'],
id_xml.attrib['type'],
id_xml.attrib.get('{%s}lang' % self.xml_ns, None),
id_xml.attrib.get('name', None)))
return identities
def set_identities(self, identities, lang=None):
"""
Add or replace all identities. The identities must be a in set
where each identity is a tuple of the form:
(category, type, lang, name)
If a language is specifified, any identities using that language
will be removed to be replaced with the given identities.
NOTE: An identity's language will not be changed regardless of
the value of lang.
Arguments:
identities -- A set of identities in tuple form.
lang -- Optional, standard xml:lang value.
"""
self.del_identities(lang)
for identity in identities:
category, itype, lang, name = identity
self.add_identity(category, itype, name, lang)
def del_identities(self, lang=None):
"""
Remove all identities. If a language was specified, only
remove identities using that language.
Arguments:
lang -- Optional, standard xml:lang value.
"""
for id_xml in self.findall('{%s}identity' % self.namespace):
if lang is None:
self.xml.remove(id_xml)
elif id_xml.attrib.get('{%s}lang' % self.xml_ns, None) == lang:
self._identities.remove((
id_xml.attrib['category'],
id_xml.attrib['type'],
id_xml.attrib.get('{%s}lang' % self.xml_ns, None)))
self.xml.remove(id_xml)
def add_feature(self, feature):
"""
Add a single, new feature.
Arguments:
feature -- The namespace of the supported feature.
"""
if feature not in self._features:
self._features.add(feature)
feature_xml = ET.Element('{%s}feature' % self.namespace)
feature_xml.attrib['var'] = feature
self.xml.append(feature_xml)
return True
return False
def del_feature(self, feature):
"""
Remove a single feature.
Arguments:
feature -- The namespace of the removed feature.
"""
if feature in self._features:
self._features.remove(feature)
for feature_xml in self.findall('{%s}feature' % self.namespace):
if feature_xml.attrib['var'] == feature:
self.xml.remove(feature_xml)
return True
return False
def get_features(self):
"""Return the set of all supported features."""
features = set()
for feature_xml in self.findall('{%s}feature' % self.namespace):
features.add(feature_xml.attrib['var'])
return features
def set_features(self, features):
"""
Add or replace the set of supported features.
Arguments:
features -- The new set of supported features.
"""
self.del_features()
for feature in features:
self.add_feature(feature)
def del_features(self):
"""Remove all features."""
self._features = set()
for feature_xml in self.findall('{%s}feature' % self.namespace):
self.xml.remove(feature_xml)

View file

@ -0,0 +1,138 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.xmlstream import ElementBase, ET
class DiscoItems(ElementBase):
"""
Example disco#items stanzas:
<iq type="get">
<query xmlns="http://jabber.org/protocol/disco#items" />
</iq>
<iq type="result">
<query xmlns="http://jabber.org/protocol/disco#items">
<item jid="chat.example.com"
node="xmppdev"
name="XMPP Dev" />
<item jid="chat.example.com"
node="sleekdev"
name="SleekXMPP Dev" />
</query>
</iq>
Stanza Interface:
node -- The name of the node to either
query or return info from.
items -- A list of 3-tuples, where each tuple contains
the JID, node, and name of an item.
Methods:
add_item -- Add a single new item.
del_item -- Remove a single item.
get_items -- Return all items.
set_items -- Set or replace all items.
del_items -- Remove all items.
"""
name = 'query'
namespace = 'http://jabber.org/protocol/disco#items'
plugin_attrib = 'disco_items'
interfaces = set(('node', 'items'))
# Cache items
_items = set()
def setup(self, xml=None):
"""
Populate the stanza object using an optional XML object.
Overrides ElementBase.setup
Caches item information.
Arguments:
xml -- Use an existing XML object for the stanza's values.
"""
ElementBase.setup(self, xml)
self._items = set([item[0:2] for item in self['items']])
def add_item(self, jid, node=None, name=None):
"""
Add a new item element. Each item is required to have a
JID, but may also specify a node value to reference
non-addressable entitities.
Arguments:
jid -- The JID for the item.
node -- Optional additional information to reference
non-addressable items.
name -- Optional human readable name for the item.
"""
if (jid, node) not in self._items:
self._items.add((jid, node))
item_xml = ET.Element('{%s}item' % self.namespace)
item_xml.attrib['jid'] = jid
if name:
item_xml.attrib['name'] = name
if node:
item_xml.attrib['node'] = node
self.xml.append(item_xml)
return True
return False
def del_item(self, jid, node=None):
"""
Remove a single item.
Arguments:
jid -- JID of the item to remove.
node -- Optional extra identifying information.
"""
if (jid, node) in self._items:
for item_xml in self.findall('{%s}item' % self.namespace):
item = (item_xml.attrib['jid'],
item_xml.attrib.get('node', None))
if item == (jid, node):
self.xml.remove(item_xml)
return True
return False
def get_items(self):
"""Return all items."""
items = set()
for item_xml in self.findall('{%s}item' % self.namespace):
item = (item_xml.attrib['jid'],
item_xml.attrib.get('node'),
item_xml.attrib.get('name'))
items.add(item)
return items
def set_items(self, items):
"""
Set or replace all items. The given items must be in a
list or set where each item is a tuple of the form:
(jid, node, name)
Arguments:
items -- A series of items in tuple format.
"""
self.del_items()
for item in items:
jid, node, name = item
self.add_item(jid, node, name)
def del_items(self):
"""Remove all items."""
self._items = set()
for item_xml in self.findall('{%s}item' % self.namespace):
self.xml.remove(item_xml)

View file

@ -0,0 +1,127 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import logging
import sleekxmpp
from sleekxmpp import Iq
from sleekxmpp.exceptions import XMPPError
from sleekxmpp.plugins.base import base_plugin
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID
from sleekxmpp.plugins.xep_0030 import DiscoInfo, DiscoItems
log = logging.getLogger(__name__)
class StaticDisco(object):
"""
While components will likely require fully dynamic handling
of service discovery information, most clients and simple bots
only need to manage a few disco nodes that will remain mostly
static.
StaticDisco provides a set of node handlers that will store
static sets of disco info and items in memory.
"""
def __init__(self, xmpp):
"""
Arguments:
xmpp -- The main SleekXMPP object.
"""
self.nodes = {}
self.xmpp = xmpp
def add_node(self, jid=None, node=None):
if jid is None:
jid = self.xmpp.boundjid.full
if node is None:
node = ''
if (jid, node) not in self.nodes:
self.nodes[(jid, node)] = {'info': DiscoInfo(),
'items': DiscoItems()}
self.nodes[(jid, node)]['info']['node'] = node
self.nodes[(jid, node)]['items']['node'] = node
def get_info(self, jid, node, data=None):
if (jid, node) not in self.nodes:
if not node:
return DiscoInfo()
else:
raise XMPPError(condition='item-not-found')
else:
return self.nodes[(jid, node)]['info']
def del_info(self, jid, node, data=None):
if (jid, node) in self.nodes:
self.nodes[(jid, node)]['info'] = DiscoInfo()
def get_items(self, jid, node, data=None):
if (jid, node) not in self.nodes:
if not node:
return DiscoInfo()
else:
raise XMPPError(condition='item-not-found')
else:
return self.nodes[(jid, node)]['items']
def set_items(self, jid, node, data=None):
pass
def del_items(self, jid, node, data=None):
if (jid, node) in self.nodes:
self.nodes[(jid, node)]['items'] = DiscoItems()
def add_identity(self, jid, node, data={}):
self.add_node(jid, node)
self.nodes[(jid, node)]['info'].add_identity(
data.get('category', ''),
data.get('itype', ''),
data.get('name', None),
data.get('lang', None))
def set_identities(self, jid, node, data=None):
pass
def del_identity(self, jid, node, data=None):
if (jid, node) not in self.nodes:
return
self.nodes[(jid, node)]['info'].del_identity(
data.get('category', ''),
data.get('itype', ''),
data.get('name', None),
data.get('lang', None))
def add_feature(self, jid, node, data=None):
self.add_node(jid, node)
self.nodes[(jid, node)]['info'].add_feature(data.get('feature', ''))
def set_features(self, jid, node, data=None):
pass
def del_feature(self, jid, node, data=None):
if (jid, node) not in self.nodes:
return
self.nodes[(jid, node)]['info'].del_feature(data.get('feature', ''))
def add_item(self, jid, node, data=None):
self.add_node(jid, node)
self.nodes[(jid, node)]['items'].add_item(
data.get('ijid', ''),
node=data.get('inode', None),
name=data.get('name', None))
def del_item(self, jid, node, data=None):
if (jid, node) in self.nodes:
self.nodes[(jid, node)]['items'].del_item(**data)

View file

@ -4,6 +4,11 @@ import sleekxmpp.plugins.xep_0030 as xep_0030
class TestDisco(SleekTest):
"""
Test creating and manipulating the disco#info and
disco#items stanzas from the XEP-0030 plugin.
"""
def setUp(self):
register_stanza_plugin(Iq, xep_0030.DiscoInfo)
register_stanza_plugin(Iq, xep_0030.DiscoItems)
@ -11,11 +16,10 @@ class TestDisco(SleekTest):
def testCreateInfoQueryNoNode(self):
"""Testing disco#info query with no node."""
iq = self.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = ''
self.check(iq, """
<iq id="0">
<iq>
<query xmlns="http://jabber.org/protocol/disco#info" />
</iq>
""")
@ -23,23 +27,22 @@ class TestDisco(SleekTest):
def testCreateInfoQueryWithNode(self):
"""Testing disco#info query with a node."""
iq = self.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = 'foo'
self.check(iq, """
<iq id="0">
<query xmlns="http://jabber.org/protocol/disco#info" node="foo" />
<iq>
<query xmlns="http://jabber.org/protocol/disco#info"
node="foo" />
</iq>
""")
def testCreateInfoQueryNoNode(self):
def testCreateItemsQueryNoNode(self):
"""Testing disco#items query with no node."""
iq = self.Iq()
iq['id'] = "0"
iq['disco_items']['node'] = ''
self.check(iq, """
<iq id="0">
<iq>
<query xmlns="http://jabber.org/protocol/disco#items" />
</iq>
""")
@ -47,130 +50,467 @@ class TestDisco(SleekTest):
def testCreateItemsQueryWithNode(self):
"""Testing disco#items query with a node."""
iq = self.Iq()
iq['id'] = "0"
iq['disco_items']['node'] = 'foo'
self.check(iq, """
<iq id="0">
<query xmlns="http://jabber.org/protocol/disco#items" node="foo" />
<iq>
<query xmlns="http://jabber.org/protocol/disco#items"
node="foo" />
</iq>
""")
def testInfoIdentities(self):
def testIdentities(self):
"""Testing adding identities to disco#info."""
iq = self.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = 'foo'
iq['disco_info'].addIdentity('conference', 'text', 'Chatroom')
iq['disco_info'].add_identity('conference', 'text',
name='Chatroom',
lang='en')
self.check(iq, """
<iq id="0">
<query xmlns="http://jabber.org/protocol/disco#info" node="foo">
<identity category="conference" type="text" name="Chatroom" />
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="conference"
type="text"
name="Chatroom"
xml:lang="en" />
</query>
</iq>
""")
def testInfoFeatures(self):
"""Testing adding features to disco#info."""
def testDuplicateIdentities(self):
"""
Test adding multiple copies of the same category
and type combination. Only the first identity should
be kept.
"""
iq = self.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = 'foo'
iq['disco_info'].addFeature('foo')
iq['disco_info'].addFeature('bar')
iq['disco_info'].add_identity('conference', 'text',
name='Chatroom')
iq['disco_info'].add_identity('conference', 'text',
name='MUC')
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="conference"
type="text"
name="Chatroom" />
</query>
</iq>
""")
def testDuplicateIdentitiesWithLangs(self):
"""
Test adding multiple copies of the same category,
type, and language combination. Only the first identity
should be kept.
"""
iq = self.Iq()
iq['disco_info'].add_identity('conference', 'text',
name='Chatroom',
lang='en')
iq['disco_info'].add_identity('conference', 'text',
name='MUC',
lang='en')
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="conference"
type="text"
name="Chatroom"
xml:lang="en" />
</query>
</iq>
""")
def testRemoveIdentitiesNoLang(self):
"""Test removing identities from a disco#info stanza."""
iq = self.Iq()
iq['disco_info'].add_identity('client', 'pc')
iq['disco_info'].add_identity('client', 'bot')
iq['disco_info'].del_identity('client', 'pc')
self.check(iq, """
<iq id="0">
<query xmlns="http://jabber.org/protocol/disco#info" node="foo">
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="client" type="bot" />
</query>
</iq>
""")
def testRemoveIdentitiesWithLang(self):
"""Test removing identities from a disco#info stanza."""
iq = self.Iq()
iq['disco_info'].add_identity('client', 'pc')
iq['disco_info'].add_identity('client', 'bot')
iq['disco_info'].add_identity('client', 'pc', lang='no')
iq['disco_info'].del_identity('client', 'pc')
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="client" type="bot" />
<identity category="client"
type="pc"
xml:lang="no" />
</query>
</iq>
""")
def testRemoveAllIdentitiesNoLang(self):
"""Test removing all identities from a disco#info stanza."""
iq = self.Iq()
iq['disco_info'].add_identity('client', 'bot', name='Bot')
iq['disco_info'].add_identity('client', 'bot', lang='no')
iq['disco_info'].add_identity('client', 'console')
del iq['disco_info']['identities']
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info" />
</iq>
""")
def testRemoveAllIdentitiesWithLang(self):
"""Test removing all identities from a disco#info stanza."""
iq = self.Iq()
iq['disco_info'].add_identity('client', 'bot', name='Bot')
iq['disco_info'].add_identity('client', 'bot', lang='no')
iq['disco_info'].add_identity('client', 'console')
iq['disco_info'].del_identities(lang='no')
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="client" type="bot" name="Bot" />
<identity category="client" type="console" />
</query>
</iq>
""")
def testAddBatchIdentitiesNoLang(self):
"""Test adding multiple identities at once to a disco#info stanza."""
iq = self.Iq()
identities = [('client', 'pc', 'no', 'PC Client'),
('client', 'bot', None, 'Bot'),
('client', 'console', None, None)]
iq['disco_info']['identities'] = identities
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="client"
type="pc"
xml:lang="no"
name="PC Client" />
<identity category="client" type="bot" name="Bot" />
<identity category="client" type="console" />
</query>
</iq>
""")
def testAddBatchIdentitiesWithLang(self):
"""Test selectively replacing identities based on language."""
iq = self.Iq()
iq['disco_info'].add_identity('client', 'pc', lang='no')
iq['disco_info'].add_identity('client', 'pc', lang='en')
iq['disco_info'].add_identity('client', 'pc', lang='fr')
identities = [('client', 'bot', 'fr', 'Bot'),
('client', 'bot', 'en', 'Bot')]
iq['disco_info'].set_identities(identities, lang='fr')
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<identity category="client" type="pc" xml:lang="no" />
<identity category="client" type="pc" xml:lang="en" />
<identity category="client"
type="bot"
xml:lang="fr"
name="Bot" />
<identity category="client"
type="bot"
xml:lang="en"
name="Bot" />
</query>
</iq>
""")
def testGetIdentitiesNoLang(self):
"""Test getting all identities from a disco#info stanza."""
iq = self.Iq()
iq['disco_info'].add_identity('client', 'pc')
iq['disco_info'].add_identity('client', 'pc', lang='no')
iq['disco_info'].add_identity('client', 'pc', lang='en')
iq['disco_info'].add_identity('client', 'pc', lang='fr')
expected = set([('client', 'pc', None, None),
('client', 'pc', 'no', None),
('client', 'pc', 'en', None),
('client', 'pc', 'fr', None)])
self.failUnless(iq['disco_info']['identities'] == expected,
"Identities do not match:\n%s\n%s" % (
expected,
iq['disco_info']['identities']))
def testGetIdentitiesWithLang(self):
"""
Test getting all identities of a given
lang from a disco#info stanza.
"""
iq = self.Iq()
iq['disco_info'].add_identity('client', 'pc')
iq['disco_info'].add_identity('client', 'pc', lang='no')
iq['disco_info'].add_identity('client', 'pc', lang='en')
iq['disco_info'].add_identity('client', 'pc', lang='fr')
expected = set([('client', 'pc', 'no', None)])
result = iq['disco_info'].get_identities(lang='no')
self.failUnless(result == expected,
"Identities do not match:\n%s\n%s" % (
expected, result))
def testFeatures(self):
"""Testing adding features to disco#info."""
iq = self.Iq()
iq['disco_info'].add_feature('foo')
iq['disco_info'].add_feature('bar')
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<feature var="foo" />
<feature var="bar" />
</query>
</iq>
""")
def testFeaturesDuplicate(self):
"""Test adding duplicate features to disco#info."""
iq = self.Iq()
iq['disco_info'].add_feature('foo')
iq['disco_info'].add_feature('bar')
iq['disco_info'].add_feature('foo')
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<feature var="foo" />
<feature var="bar" />
</query>
</iq>
""")
def testRemoveFeature(self):
"""Test removing a feature from disco#info."""
iq = self.Iq()
iq['disco_info'].add_feature('foo')
iq['disco_info'].add_feature('bar')
iq['disco_info'].add_feature('baz')
iq['disco_info'].del_feature('foo')
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<feature var="bar" />
<feature var="baz" />
</query>
</iq>
""")
def testGetFeatures(self):
"""Test getting all features from a disco#info stanza."""
iq = self.Iq()
iq['disco_info'].add_feature('foo')
iq['disco_info'].add_feature('bar')
iq['disco_info'].add_feature('baz')
expected = set(['foo', 'bar', 'baz'])
self.failUnless(iq['disco_info']['features'] == expected,
"Features do not match:\n%s\n%s" % (
expected,
iq['disco_info']['features']))
def testRemoveAllFeatures(self):
"""Test removing all features from a disco#info stanza."""
iq = self.Iq()
iq['disco_info'].add_feature('foo')
iq['disco_info'].add_feature('bar')
iq['disco_info'].add_feature('baz')
del iq['disco_info']['features']
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info" />
</iq>
""")
def testAddBatchFeatures(self):
"""Test adding multiple features at once to a disco#info stanza."""
iq = self.Iq()
features = ['foo', 'bar', 'baz']
iq['disco_info']['features'] = features
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#info">
<feature var="foo" />
<feature var="bar" />
<feature var="baz" />
</query>
</iq>
""")
def testItems(self):
"""Testing adding features to disco#info."""
iq = self.Iq()
iq['id'] = "0"
iq['disco_items']['node'] = 'foo'
iq['disco_items'].addItem('user@localhost')
iq['disco_items'].addItem('user@localhost', 'foo')
iq['disco_items'].addItem('user@localhost', 'bar', 'Testing')
iq['disco_items'].add_item('user@localhost')
iq['disco_items'].add_item('user@localhost', 'foo')
iq['disco_items'].add_item('user@localhost', 'bar', name='Testing')
self.check(iq, """
<iq id="0">
<query xmlns="http://jabber.org/protocol/disco#items" node="foo">
<iq>
<query xmlns="http://jabber.org/protocol/disco#items">
<item jid="user@localhost" />
<item node="foo" jid="user@localhost" />
<item node="bar" jid="user@localhost" name="Testing" />
<item jid="user@localhost"
node="foo" />
<item jid="user@localhost"
node="bar"
name="Testing" />
</query>
</iq>
""")
def testAddRemoveIdentities(self):
"""Test adding and removing identities to disco#info stanza"""
ids = [('automation', 'commands', 'AdHoc'),
('conference', 'text', 'ChatRoom')]
def testDuplicateItems(self):
"""Test adding items with the same JID without any nodes."""
iq = self.Iq()
iq['disco_items'].add_item('user@localhost', name='First')
iq['disco_items'].add_item('user@localhost', name='Second')
info = xep_0030.DiscoInfo()
info.addIdentity(*ids[0])
self.failUnless(info.getIdentities() == [ids[0]])
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#items">
<item jid="user@localhost" name="First" />
</query>
</iq>
""")
info.delIdentity('automation', 'commands')
self.failUnless(info.getIdentities() == [])
info.setIdentities(ids)
self.failUnless(info.getIdentities() == ids)
def testDuplicateItemsWithNodes(self):
"""Test adding items with the same JID/node combination."""
iq = self.Iq()
iq['disco_items'].add_item('user@localhost',
node='foo',
name='First')
iq['disco_items'].add_item('user@localhost',
node='foo',
name='Second')
info.delIdentity('automation', 'commands')
self.failUnless(info.getIdentities() == [ids[1]])
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#items">
<item jid="user@localhost" node="foo" name="First" />
</query>
</iq>
""")
info.delIdentities()
self.failUnless(info.getIdentities() == [])
def testRemoveItemsNoNode(self):
"""Test removing items without nodes from a disco#items stanza."""
iq = self.Iq()
iq['disco_items'].add_item('user@localhost')
iq['disco_items'].add_item('user@localhost', node='foo')
iq['disco_items'].add_item('test@localhost')
def testAddRemoveFeatures(self):
"""Test adding and removing features to disco#info stanza"""
features = ['foo', 'bar', 'baz']
iq['disco_items'].del_item('user@localhost')
info = xep_0030.DiscoInfo()
info.addFeature(features[0])
self.failUnless(info.getFeatures() == [features[0]])
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#items">
<item jid="user@localhost" node="foo" />
<item jid="test@localhost" />
</query>
</iq>
""")
info.delFeature('foo')
self.failUnless(info.getFeatures() == [])
def testRemoveItemsWithNode(self):
"""Test removing items with nodes from a disco#items stanza."""
iq = self.Iq()
iq['disco_items'].add_item('user@localhost')
iq['disco_items'].add_item('user@localhost', node='foo')
iq['disco_items'].add_item('test@localhost')
info.setFeatures(features)
self.failUnless(info.getFeatures() == features)
iq['disco_items'].del_item('user@localhost', node='foo')
info.delFeature('bar')
self.failUnless(info.getFeatures() == ['foo', 'baz'])
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#items">
<item jid="user@localhost" />
<item jid="test@localhost" />
</query>
</iq>
""")
info.delFeatures()
self.failUnless(info.getFeatures() == [])
def testGetItems(self):
"""Test retrieving items from disco#items stanza."""
iq = self.Iq()
iq['disco_items'].add_item('user@localhost')
iq['disco_items'].add_item('user@localhost', node='foo')
iq['disco_items'].add_item('test@localhost',
node='bar',
name='Tester')
def testAddRemoveItems(self):
"""Test adding and removing items to disco#items stanza"""
items = [('user@localhost', None, None),
expected = set([('user@localhost', None, None),
('user@localhost', 'foo', None),
('user@localhost', 'bar', 'Test')]
('test@localhost', 'bar', 'Tester')])
self.failUnless(iq['disco_items']['items'] == expected,
"Items do not match:\n%s\n%s" % (
expected,
iq['disco_items']['items']))
info = xep_0030.DiscoItems()
self.failUnless(True, ""+str(items[0]))
def testRemoveAllItems(self):
"""Test removing all items from a disco#items stanza."""
iq = self.Iq()
iq['disco_items'].add_item('user@localhost')
iq['disco_items'].add_item('user@localhost', node='foo')
iq['disco_items'].add_item('test@localhost',
node='bar',
name='Tester')
info.addItem(*(items[0]))
self.failUnless(info.getItems() == [items[0]], info.getItems())
del iq['disco_items']['items']
info.delItem('user@localhost')
self.failUnless(info.getItems() == [])
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#items" />
</iq>
""")
info.setItems(items)
self.failUnless(info.getItems() == items)
def testAddBatchItems(self):
"""Test adding multiple items to a disco#items stanza."""
iq = self.Iq()
items = [('user@localhost', 'foo', 'Test'),
('test@localhost', None, None),
('other@localhost', None, 'Other')]
info.delItem('user@localhost', 'foo')
self.failUnless(info.getItems() == [items[0], items[2]])
info.delItems()
self.failUnless(info.getItems() == [])
iq['disco_items']['items'] = items
self.check(iq, """
<iq>
<query xmlns="http://jabber.org/protocol/disco#items">
<item jid="user@localhost" node="foo" name="Test" />
<item jid="test@localhost" />
<item jid="other@localhost" name="Other" />
</query>
</iq>
""")
suite = unittest.TestLoader().loadTestsFromTestCase(TestDisco)

View file

@ -1,8 +1,11 @@
import time
import threading
from sleekxmpp.test import *
class TestStreamDisco(SleekTest):
"""
Test using the XEP-0030 plugin.
"""
@ -10,15 +13,16 @@ class TestStreamDisco(SleekTest):
def tearDown(self):
self.stream_close()
def testInfoEmptyNode(self):
def testInfoEmptyDefaultNode(self):
"""
Info queries to a node MUST have at least one identity
Info query result from an entity MUST have at least one identity
and feature, namely http://jabber.org/protocol/disco#info.
Since the XEP-0030 plugin is loaded, a disco response should
be generated and not an error result.
"""
self.stream_start(plugins=['xep_0030'])
self.stream_start(mode='client',
plugins=['xep_0030'])
self.recv("""
<iq type="get" id="test">
@ -32,13 +36,15 @@ class TestStreamDisco(SleekTest):
<identity category="client" type="bot" />
<feature var="http://jabber.org/protocol/disco#info" />
</query>
</iq>""")
</iq>
""")
def testInfoEmptyNodeComponent(self):
def testInfoEmptyDefaultNodeComponent(self):
"""
Test requesting an empty node using a Component.
Test requesting an empty, default node using a Component.
"""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
self.recv("""
@ -53,19 +59,22 @@ class TestStreamDisco(SleekTest):
<identity category="component" type="generic" />
<feature var="http://jabber.org/protocol/disco#info" />
</query>
</iq>""")
</iq>
""")
def testInfoIncludeNode(self):
"""
Results for info queries directed to a particular node MUST
include the node in the query response.
"""
self.stream_start(plugins=['xep_0030'])
self.stream_start(mode='client',
plugins=['xep_0030'])
self.xmpp['xep_0030'].add_node('testing')
self.xmpp['xep_0030'].static.add_node(node='testing')
self.recv("""
<iq type="get" id="test">
<iq to="tester@localhost" type="get" id="test">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
@ -79,5 +88,441 @@ class TestStreamDisco(SleekTest):
</iq>""",
method='mask')
def testItemsIncludeNode(self):
"""
Results for items queries directed to a particular node MUST
include the node in the query response.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
self.xmpp['xep_0030'].static.add_node(node='testing')
self.recv("""
<iq to="tester@localhost" type="get" id="test">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing">
</query>
</iq>""",
method='mask')
def testDynamicInfoJID(self):
"""
Test using a dynamic info handler for a particular JID.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('client', 'console', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
jid='tester@localhost',
handler=dynamic_jid)
self.recv("""
<iq type="get" id="test" to="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing">
<identity category="client"
type="console"
name="Dynamic Info" />
</query>
</iq>
""")
def testDynamicInfoGlobal(self):
"""
Test using a dynamic info handler for all requests.
"""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('component', 'generic', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
handler=dynamic_global)
self.recv("""
<iq type="get" id="test"
to="user@tester.localhost"
from="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test"
to="tester@localhost"
from="user@tester.localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing">
<identity category="component"
type="generic"
name="Dynamic Info" />
</query>
</iq>
""")
def testOverrideJIDInfoHandler(self):
"""Test overriding a JID info handler."""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('client', 'console', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
jid='tester@localhost',
handler=dynamic_jid)
self.xmpp['xep_0030'].make_static(jid='tester@localhost',
node='testing')
self.xmpp['xep_0030'].add_identity(jid='tester@localhost',
node='testing',
category='automation',
itype='command-list')
self.recv("""
<iq type="get" id="test" to="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing">
<identity category="automation"
type="command-list" />
</query>
</iq>
""")
def testOverrideGlobalInfoHandler(self):
"""Test overriding the global JID info handler."""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoInfo()
result['node'] = node
result.add_identity('component', 'generic', name='Dynamic Info')
return result
self.xmpp['xep_0030'].set_node_handler('get_info',
handler=dynamic_global)
self.xmpp['xep_0030'].make_static(jid='user@tester.localhost',
node='testing')
self.xmpp['xep_0030'].add_feature(jid='user@tester.localhost',
node='testing',
feature='urn:xmpp:ping')
self.recv("""
<iq type="get" id="test"
to="user@tester.localhost"
from="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test"
to="tester@localhost"
from="user@tester.localhost">
<query xmlns="http://jabber.org/protocol/disco#info"
node="testing">
<feature var="urn:xmpp:ping" />
</query>
</iq>
""")
def testGetInfoRemote(self):
"""
Test sending a disco#info query to another entity
and receiving the result.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
events = set()
def handle_disco_info(iq):
events.add('disco_info')
self.xmpp.add_event_handler('disco_info', handle_disco_info)
t = threading.Thread(name="get_info",
target=self.xmpp['xep_0030'].get_info,
args=('user@localhost', 'foo'))
t.start()
self.send("""
<iq type="get" to="user@localhost" id="1">
<query xmlns="http://jabber.org/protocol/disco#info"
node="foo" />
</iq>
""")
self.recv("""
<iq type="result" to="tester@localhost" id="1">
<query xmlns="http://jabber.org/protocol/disco#info"
node="foo">
<identity category="client" type="bot" />
<feature var="urn:xmpp:ping" />
</query>
</iq>
""")
# Wait for disco#info request to be received.
t.join()
time.sleep(0.1)
self.assertEqual(events, set(('disco_info',)),
"Disco info event was not triggered: %s" % events)
def testDynamicItemsJID(self):
"""
Test using a dynamic items handler for a particular JID.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester@localhost', node='foo', name='JID')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
jid='tester@localhost',
handler=dynamic_jid)
self.recv("""
<iq type="get" id="test" to="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing">
<item jid="tester@localhost" node="foo" name="JID" />
</query>
</iq>
""")
def testDynamicItemsGlobal(self):
"""
Test using a dynamic items handler for all requests.
"""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester@localhost', node='foo', name='Global')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
handler=dynamic_global)
self.recv("""
<iq type="get" id="test"
to="user@tester.localhost"
from="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test"
to="tester@localhost"
from="user@tester.localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing">
<item jid="tester@localhost" node="foo" name="Global" />
</query>
</iq>
""")
def testOverrideJIDItemsHandler(self):
"""Test overriding a JID items handler."""
self.stream_start(mode='client',
plugins=['xep_0030'])
def dynamic_jid(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester@localhost', node='foo', name='Global')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
jid='tester@localhost',
handler=dynamic_jid)
self.xmpp['xep_0030'].make_static(jid='tester@localhost',
node='testing')
self.xmpp['xep_0030'].add_item(jid='tester@localhost',
node='testing',
ijid='tester@localhost',
inode='foo',
name='Test')
self.recv("""
<iq type="get" id="test" to="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing">
<item jid="tester@localhost" node="foo" name="Test" />
</query>
</iq>
""")
def testOverrideGlobalItemsHandler(self):
"""Test overriding the global JID items handler."""
self.stream_start(mode='component',
jid='tester.localhost',
plugins=['xep_0030'])
def dynamic_global(jid, node, iq):
result = self.xmpp['xep_0030'].stanza.DiscoItems()
result['node'] = node
result.add_item('tester.localhost', node='foo', name='Global')
return result
self.xmpp['xep_0030'].set_node_handler('get_items',
handler=dynamic_global)
self.xmpp['xep_0030'].make_static(jid='user@tester.localhost',
node='testing')
self.xmpp['xep_0030'].add_item(jid='user@tester.localhost',
node='testing',
ijid='user@tester.localhost',
inode='foo',
name='Test')
self.recv("""
<iq type="get" id="test"
to="user@tester.localhost"
from="tester@localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing" />
</iq>
""")
self.send("""
<iq type="result" id="test"
to="tester@localhost"
from="user@tester.localhost">
<query xmlns="http://jabber.org/protocol/disco#items"
node="testing">
<item jid="user@tester.localhost" node="foo" name="Test" />
</query>
</iq>
""")
def testGetItemsRemote(self):
"""
Test sending a disco#items query to another entity
and receiving the result.
"""
self.stream_start(mode='client',
plugins=['xep_0030'])
events = set()
results = set()
def handle_disco_items(iq):
events.add('disco_items')
results.update(iq['disco_items']['items'])
self.xmpp.add_event_handler('disco_items', handle_disco_items)
t = threading.Thread(name="get_items",
target=self.xmpp['xep_0030'].get_items,
args=('user@localhost', 'foo'))
t.start()
self.send("""
<iq type="get" to="user@localhost" id="1">
<query xmlns="http://jabber.org/protocol/disco#items"
node="foo" />
</iq>
""")
self.recv("""
<iq type="result" to="tester@localhost" id="1">
<query xmlns="http://jabber.org/protocol/disco#items"
node="foo">
<item jid="user@localhost" node="bar" name="Test" />
<item jid="user@localhost" node="baz" name="Test 2" />
</query>
</iq>
""")
# Wait for disco#items request to be received.
t.join()
time.sleep(0.1)
items = set([('user@localhost', 'bar', 'Test'),
('user@localhost', 'baz', 'Test 2')])
self.assertEqual(events, set(('disco_items',)),
"Disco items event was not triggered: %s" % events)
self.assertEqual(results, items,
"Unexpected items: %s" % results)
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamDisco)