Merge branch 'develop' of github.com:fritzy/SleekXMPP into develop

This commit is contained in:
Nathan Fritz 2010-06-01 21:45:15 -07:00
commit 18e27d65ce
13 changed files with 841 additions and 121 deletions

View file

@ -0,0 +1,171 @@
import logging
import sleekxmpp
from optparse import OptionParser
from xml.etree import cElementTree as ET
import os
import time
import sys
import unittest
import sleekxmpp.plugins.xep_0004
from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath
from sleekxmpp.xmlstream.handler.waiter import Waiter
try:
import configparser
except ImportError:
import ConfigParser as configparser
try:
import queue
except ImportError:
import Queue as queue
class TestClient(sleekxmpp.ClientXMPP):
def __init__(self, jid, password):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.add_event_handler("session_start", self.start)
#self.add_event_handler("message", self.message)
self.waitforstart = queue.Queue()
def start(self, event):
self.getRoster()
self.sendPresence()
self.waitforstart.put(True)
class TestPubsubServer(unittest.TestCase):
statev = {}
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)
def setUp(self):
pass
def test001getdefaultconfig(self):
"""Get the default node config"""
self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2')
self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode3')
self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode4')
self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode5')
result = self.xmpp1['xep_0060'].getNodeConfig(self.pshost)
self.statev['defaultconfig'] = result
self.failUnless(isinstance(result, sleekxmpp.plugins.xep_0004.Form))
def test002createdefaultnode(self):
"""Create a node without config"""
self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode1'))
def test003deletenode(self):
"""Delete recently created node"""
self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode1'))
def test004createnode(self):
"""Create a node with a config"""
self.statev['defaultconfig'].field['pubsub#access_model'].setValue('open')
self.statev['defaultconfig'].field['pubsub#notify_retract'].setValue(True)
self.statev['defaultconfig'].field['pubsub#persist_items'].setValue(True)
self.statev['defaultconfig'].field['pubsub#presence_based_delivery'].setValue(True)
p = self.xmpp2.Presence()
p['to'] = self.pshost
p.send()
self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='job'))
def test005reconfigure(self):
"""Retrieving node config and reconfiguring"""
nconfig = self.xmpp1['xep_0060'].getNodeConfig(self.pshost, 'testnode2')
self.failUnless(nconfig, "No configuration returned")
#print("\n%s ==\n %s" % (nconfig.getValues(), self.statev['defaultconfig'].getValues()))
self.failUnless(nconfig.getValues() == self.statev['defaultconfig'].getValues(), "Configuration does not match")
self.failUnless(self.xmpp1['xep_0060'].setNodeConfig(self.pshost, 'testnode2', nconfig))
def test006subscribetonode(self):
"""Subscribe to node from account 2"""
self.failUnless(self.xmpp2['xep_0060'].subscribe(self.pshost, "testnode2"))
def test007publishitem(self):
"""Publishing item"""
item = ET.Element('{http://netflint.net/protocol/test}test')
w = Waiter('wait publish', StanzaPath('message/pubsub_event/items'))
self.xmpp2.registerHandler(w)
#result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),))
result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test1', item)
msg = w.wait(5) # got to get a result in 5 seconds
self.failUnless(msg != False, "Account #2 did not get message event")
#result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),))
result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test2', item)
w = Waiter('wait publish2', StanzaPath('message/pubsub_event/items'))
self.xmpp2.registerHandler(w)
self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test1')
msg = w.wait(5) # got to get a result in 5 seconds
self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test2')
self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test1')
self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test2')
print result
#need to add check for update
def test900cleanup(self):
"Cleaning up"
#self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.")
time.sleep(10)
if __name__ == '__main__':
#parse command line arguements
optp = OptionParser()
optp.add_option('-q','--quiet', help='set logging to ERROR', action='store_const', dest='loglevel', const=logging.ERROR, default=logging.INFO)
optp.add_option('-d','--debug', help='set logging to DEBUG', action='store_const', dest='loglevel', const=logging.DEBUG, default=logging.INFO)
optp.add_option('-v','--verbose', help='set logging to COMM', action='store_const', dest='loglevel', const=5, default=logging.INFO)
optp.add_option("-c","--config", dest="configfile", default="config.xml", help="set config file to use")
optp.add_option("-n","--nodenum", dest="nodenum", default="1", help="set node number to use")
optp.add_option("-p","--pubsub", dest="pubsub", default="1", help="set pubsub host to use")
opts,args = optp.parse_args()
logging.basicConfig(level=opts.loglevel, format='%(levelname)-8s %(message)s')
#load xml config
logging.info("Loading config file: %s" % opts.configfile)
config = configparser.RawConfigParser()
config.read(opts.configfile)
#init
logging.info("Account 1 is %s" % config.get('account1', 'jid'))
xmpp1 = TestClient(config.get('account1','jid'), config.get('account1','pass'))
logging.info("Account 2 is %s" % config.get('account2', 'jid'))
xmpp2 = TestClient(config.get('account2','jid'), config.get('account2','pass'))
xmpp1.registerPlugin('xep_0004')
xmpp1.registerPlugin('xep_0030')
xmpp1.registerPlugin('xep_0060')
xmpp1.registerPlugin('xep_0199')
xmpp1.registerPlugin('jobs')
xmpp2.registerPlugin('xep_0004')
xmpp2.registerPlugin('xep_0030')
xmpp2.registerPlugin('xep_0060')
xmpp2.registerPlugin('xep_0199')
xmpp2.registerPlugin('jobs')
if not config.get('account1', 'server'):
# we don't know the server, but the lib can probably figure it out
xmpp1.connect()
else:
xmpp1.connect((config.get('account1', 'server'), 5222))
xmpp1.process(threaded=True)
#init
if not config.get('account2', 'server'):
# we don't know the server, but the lib can probably figure it out
xmpp2.connect()
else:
xmpp2.connect((config.get('account2', 'server'), 5222))
xmpp2.process(threaded=True)
TestPubsubServer.xmpp1 = xmpp1
TestPubsubServer.xmpp2 = xmpp2
TestPubsubServer.pshost = config.get('settings', 'pubsub')
xmpp1.waitforstart.get(True)
xmpp2.waitforstart.get(True)
testsuite = unittest.TestLoader().loadTestsFromTestCase(TestPubsubServer)
alltests_suite = unittest.TestSuite([testsuite])
result = unittest.TextTestRunner(verbosity=2).run(alltests_suite)
xmpp1.disconnect()
xmpp2.disconnect()

View file

@ -5,7 +5,6 @@ from xml.etree import cElementTree as ET
import os
import time
import sys
import thread
import unittest
import sleekxmpp.plugins.xep_0004
from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath

44
sleekxmpp/plugins/jobs.py Normal file
View file

@ -0,0 +1,44 @@
from . import base
import logging
from xml.etree import cElementTree as ET
class jobs(base.base_plugin):
def plugin_init(self):
self.xep = 'pubsubjob'
self.description = "Job distribution over Pubsub"
def post_init(self):
pass
#TODO add event
def createJobNode(self, host, jid, node, config=None):
pass
def createJob(self, host, node, jobid=None, payload=None):
return self.xmpp.plugin['xep_0060'].setItem(host, node, ((jobid, payload),))
def claimJob(self, host, node, jobid, ifrom=None):
return self._setState(host, node, jobid, ET.Element('{http://andyet.net/protocol/pubsubjob}claimed'))
def unclaimJob(self, jobid):
return self._setState(host, node, jobid, ET.Element('{http://andyet.net/protocol/pubsubjob}unclaimed'))
def finishJob(self, host, node, jobid, payload=None):
finished = ET.Element('{http://andyet.net/protocol/pubsubjob}finished')
if payload is not None:
finished.append(payload)
return self._setState(host, node, jobid, finished)
def _setState(self, host, node, jobid, state, ifrom=None):
iq = self.xmpp.Iq()
iq['to'] = host
if ifrom: iq['from'] = ifrom
iq['type'] = 'set'
iq['psstate']['node'] = node
iq['psstate']['item'] = jobid
iq['psstate']['payload'] = state
result = iq.send()
if result is None or result['type'] != 'result':
return False
return True

View file

@ -10,6 +10,39 @@ def stanzaPlugin(stanza, plugin):
stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin
stanza.plugin_tag_map["{%s}%s" % (plugin.namespace, plugin.name)] = plugin
class PubsubState(ElementBase):
namespace = 'http://jabber.org/protocol/psstate'
name = 'state'
plugin_attrib = 'psstate'
interfaces = set(('node', 'item', 'payload'))
plugin_attrib_map = {}
plugin_tag_map = {}
def setPayload(self, value):
self.xml.append(value)
def getPayload(self):
childs = self.xml.getchildren()
if len(childs) > 0:
return childs[0]
def delPayload(self):
for child in self.xml.getchildren():
self.xml.remove(child)
stanzaPlugin(Iq, PubsubState)
class PubsubStateEvent(ElementBase):
namespace = 'http://jabber.org/protocol/psstate#event'
name = 'event'
plugin_attrib = 'psstate_event'
intefaces = set(tuple())
plugin_attrib_map = {}
plugin_tag_map = {}
stanzaPlugin(Message, PubsubStateEvent)
stanzaPlugin(PubsubStateEvent, PubsubState)
class Pubsub(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub'
name = 'pubsub'
@ -321,18 +354,6 @@ class Options(ElementBase):
stanzaPlugin(Pubsub, Options)
stanzaPlugin(Subscribe, Options)
#iq = Iq()
#iq['pubsub']['defaultconfig']
#print(iq)
#from xml.etree import cElementTree as ET
#iq = Iq()
#item = Item()
#item['payload'] = ET.Element("{http://netflint.net/p/crap}stupidshit")
#item['id'] = 'aa11bbcc'
#iq['pubsub']['items'].append(item)
#print(iq)
class OwnerAffiliations(Affiliations):
namespace = 'http://jabber.org/protocol/pubsub#owner'
interfaces = set(('node'))

View file

@ -188,7 +188,6 @@ class Form(FieldContainer):
#def getXML(self, tostring = False):
def getXML(self, ftype=None):
logging.debug("creating form as %s" % ftype)
if ftype:
self.type = ftype
form = ET.Element('{jabber:x:data}x')

View file

@ -1,25 +1,184 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2007 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
SleekXMPP is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SleekXMPP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SleekXMPP; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
See the file license.txt for copying permissio
"""
from . import base
import logging
from xml.etree import cElementTree as ET
from . import base
from .. xmlstream.handler.callback import Callback
from .. xmlstream.matcher.xpath import MatchXPath
from .. xmlstream.stanzabase import ElementBase, ET, JID
from .. stanza.iq import Iq
class DiscoInfo(ElementBase):
namespace = 'http://jabber.org/protocol/disco#info'
name = 'query'
plugin_attrib = 'disco_info'
interfaces = set(('node', 'features', 'identities'))
def getFeatures(self):
features = []
featuresXML = self.xml.findall('{%s}feature' % self.namespace)
for feature in featuresXML:
features.append(feature.attrib['var'])
return features
def setFeatures(self, features):
self.delFeatures()
for name in features:
self.addFeature(name)
def delFeatures(self):
featuresXML = self.xml.findall('{%s}feature' % self.namespace)
for feature in featuresXML:
self.xml.remove(feature)
def addFeature(self, feature):
featureXML = ET.Element('{%s}feature' % self.namespace,
{'var': feature})
self.xml.append(featureXML)
def delFeature(self, feature):
featuresXML = self.xml.findall('{%s}feature' % self.namespace)
for featureXML in featuresXML:
if featureXML.attrib['var'] == feature:
self.xml.remove(featureXML)
def getIdentities(self):
ids = []
idsXML = self.xml.findall('{%s}identity' % self.namespace)
for idXML in idsXML:
idData = (idXML.attrib['category'],
idXML.attrib['type'],
idXML.attrib.get('name', ''))
ids.append(idData)
return ids
def setIdentities(self, ids):
self.delIdentities()
for idData in ids:
self.addIdentity(*idData)
def delIdentities(self):
idsXML = self.xml.findall('{%s}identity' % self.namespace)
for idXML in idsXML:
self.xml.remove(idXML)
def addIdentity(self, category, id_type, name=''):
idXML = ET.Element('{%s}identity' % self.namespace,
{'category': category,
'type': id_type,
'name': name})
self.xml.append(idXML)
def delIdentity(self, category, id_type, name=''):
idsXML = self.xml.findall('{%s}identity' % self.namespace)
for idXML in idsXML:
idData = (idXML.attrib['category'],
idXML.attrib['type'])
delId = (category, id_type)
if idData == delId:
self.xml.remove(idXML)
class DiscoItems(ElementBase):
namespace = 'http://jabber.org/protocol/disco#items'
name = 'query'
plugin_attrib = 'disco_items'
interfaces = set(('node', 'items'))
def getItems(self):
items = []
itemsXML = self.xml.findall('{%s}item' % self.namespace)
for item in itemsXML:
itemData = (item.attrib['jid'],
item.attrib.get('node'),
item.attrib.get('name'))
items.append(itemData)
return items
def setItems(self, items):
self.delItems()
for item in items:
self.addItem(*item)
def delItems(self):
itemsXML = self.xml.findall('{%s}item' % self.namespace)
for item in itemsXML:
self.xml.remove(item)
def addItem(self, jid, node='', name=''):
itemXML = ET.Element('{%s}item' % self.namespace, {'jid': jid})
if name:
itemXML.attrib['name'] = name
if node:
itemXML.attrib['node'] = node
self.xml.append(itemXML)
def delItem(self, jid, node=''):
itemsXML = self.xml.findall('{%s}item' % self.namespace)
for itemXML in itemsXML:
itemData = (itemXML.attrib['jid'],
itemXML.attrib.get('node', ''))
itemDel = (jid, node)
if itemData == itemDel:
self.xml.remove(itemXML)
class DiscoNode(object):
"""
Collection object for grouping info and item information
into nodes.
"""
def __init__(self, name):
self.name = name
self.info = DiscoInfo()
self.items = DiscoItems()
# This is a bit like poor man's inheritance, but
# to simplify adding information to the node we
# map node functions to either the info or items
# stanza objects.
#
# We don't want to make DiscoNode inherit from
# DiscoInfo and DiscoItems because DiscoNode is
# not an actual stanza, and doing so would create
# confusion and potential bugs.
self._map(self.items, 'items', ['get', 'set', 'del'])
self._map(self.items, 'item', ['add', 'del'])
self._map(self.info, 'identities', ['get', 'set', 'del'])
self._map(self.info, 'identity', ['add', 'del'])
self._map(self.info, 'features', ['get', 'set', 'del'])
self._map(self.info, 'feature', ['add', 'del'])
def isEmpty(self):
"""
Test if the node contains any information. Useful for
determining if a node can be deleted.
"""
ids = self.getIdentities()
features = self.getFeatures()
items = self.getItems()
if not ids and not features and not items:
return True
return False
def _map(self, obj, interface, access):
"""
Map functions of the form obj.accessInterface
to self.accessInterface for each given access type.
"""
interface = interface.title()
for access_type in access:
method = access_type + interface
if hasattr(obj, method):
setattr(self, method, getattr(obj, method))
class xep_0030(base.base_plugin):
"""
@ -29,85 +188,137 @@ class xep_0030(base.base_plugin):
def plugin_init(self):
self.xep = '0030'
self.description = 'Service Discovery'
self.features = {'main': ['http://jabber.org/protocol/disco#info', 'http://jabber.org/protocol/disco#items']}
self.identities = {'main': [{'category': 'client', 'type': 'pc', 'name': 'SleekXMPP'}]}
self.items = {'main': []}
self.xmpp.add_handler("<iq type='get' xmlns='%s'><query xmlns='http://jabber.org/protocol/disco#info' /></iq>" % self.xmpp.default_ns, self.info_handler)
self.xmpp.add_handler("<iq type='get' xmlns='%s'><query xmlns='http://jabber.org/protocol/disco#items' /></iq>" % self.xmpp.default_ns, self.item_handler)
self.xmpp.registerHandler(
Callback('Disco Items',
MatchXPath('{%s}iq/{%s}query' % (self.xmpp.default_ns,
DiscoItems.namespace)),
self.handle_item_query))
self.xmpp.registerHandler(
Callback('Disco Info',
MatchXPath('{%s}iq/{%s}query' % (self.xmpp.default_ns,
DiscoInfo.namespace)),
self.handle_info_query))
self.xmpp.stanzaPlugin(Iq, DiscoInfo)
self.xmpp.stanzaPlugin(Iq, DiscoItems)
self.xmpp.add_event_handler('disco_items_request', self.handle_disco_items)
self.xmpp.add_event_handler('disco_info_request', self.handle_disco_info)
self.nodes = {'main': DiscoNode('main')}
def add_node(self, node):
if node not in self.nodes:
self.nodes[node] = DiscoNode(node)
def del_node(self, node):
if node in self.nodes:
del self.nodes[node]
def handle_item_query(self, iq):
if iq['type'] == 'get':
logging.debug("Items requested by %s" % iq['from'])
self.xmpp.event('disco_items_request', iq)
elif iq['type'] == 'result':
logging.debug("Items result from %s" % iq['from'])
self.xmpp.event('disco_items', iq)
def handle_info_query(self, iq):
if iq['type'] == 'get':
logging.debug("Info requested by %s" % iq['from'])
self.xmpp.event('disco_info_request', iq)
elif iq['type'] == 'result':
logging.debug("Info result from %s" % iq['from'])
self.xmpp.event('disco_info', iq)
def handle_disco_info(self, iq, forwarded=False):
"""
A default handler for disco#info requests. If another
handler is registered, this one will defer and not run.
"""
handlers = self.xmpp.event_handlers['disco_info_request']
if not forwarded and len(handlers) > 1:
return
node_name = iq['disco_info']['node']
if not node_name:
node_name = 'main'
logging.debug("Using default handler for disco#info on node '%s'." % node_name)
if node_name in self.nodes:
node = self.nodes[node_name]
iq.reply().setPayload(node.info.xml).send()
else:
logging.debug("Node %s requested, but does not exist." % node_name)
iq.reply().error().setPayload(iq['disco_info'].xml)
iq['error']['code'] = '404'
iq['error']['type'] = 'cancel'
iq['error']['condition'] = 'item-not-found'
iq.send()
def handle_disco_items(self, iq, forwarded=False):
"""
A default handler for disco#items requests. If another
handler is registered, this one will defer and not run.
If this handler is called by your own custom handler with
forwarded set to True, then it will run as normal.
"""
handlers = self.xmpp.event_handlers['disco_items_request']
if not forwarded and len(handlers) > 1:
return
node_name = iq['disco_items']['node']
if not node_name:
node_name = 'main'
logging.debug("Using default handler for disco#items on node '%s'." % node_name)
if node_name in self.nodes:
node = self.nodes[node_name]
iq.reply().setPayload(node.items.xml).send()
else:
logging.debug("Node %s requested, but does not exist." % node_name)
iq.reply().error().setPayload(iq['disco_items'].xml)
iq['error']['code'] = '404'
iq['error']['type'] = 'cancel'
iq['error']['condition'] = 'item-not-found'
iq.send()
# Older interface methods for backwards compatibility
def getInfo(self, jid, node=''):
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq['to'] = jid
iq['from'] = self.xmpp.fulljid
iq['disco_info']['node'] = node
iq.send()
def getItems(self, jid, node=''):
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq['to'] = jid
iq['from'] = self.xmpp.fulljid
iq['disco_items']['node'] = node
iq.send()
def add_feature(self, feature, node='main'):
if not node in self.features:
self.features[node] = []
self.features[node].append(feature)
self.add_node(node)
self.nodes[node].addFeature(feature)
def add_identity(self, category=None, itype=None, name=None, node='main'):
if not node in self.identities:
self.identities[node] = []
self.identities[node].append({'category': category, 'type': itype, 'name': name})
def add_identity(self, category='', itype='', name='', node='main'):
self.add_node(node)
self.nodes[node].addIdentity(category=category,
id_type=itype,
name=name)
def add_item(self, jid=None, name=None, node='main', subnode=''):
if not node in self.items:
self.items[node] = []
self.items[node].append({'jid': jid, 'name': name, 'node': subnode})
def info_handler(self, xml):
logging.debug("Info request from %s" % xml.get('from', ''))
iq = self.xmpp.makeIqResult(xml.get('id', self.xmpp.getNewId()))
iq.attrib['from'] = xml.get('to')
iq.attrib['to'] = xml.get('from', self.xmpp.server)
query = xml.find('{http://jabber.org/protocol/disco#info}query')
node = query.get('node', 'main')
for identity in self.identities.get(node, []):
idxml = ET.Element('identity')
for attrib in identity:
if identity[attrib]:
idxml.attrib[attrib] = identity[attrib]
query.append(idxml)
for feature in self.features.get(node, []):
featxml = ET.Element('feature')
featxml.attrib['var'] = feature
query.append(featxml)
iq.append(query)
#print ET.tostring(iq)
self.xmpp.send(iq)
def item_handler(self, xml):
logging.debug("Item request from %s" % xml.get('from', ''))
iq = self.xmpp.makeIqResult(xml.get('id', self.xmpp.getNewId()))
iq.attrib['from'] = xml.get('to')
iq.attrib['to'] = xml.get('from', self.xmpp.server)
query = self.xmpp.makeIqQuery(iq, 'http://jabber.org/protocol/disco#items').find('{http://jabber.org/protocol/disco#items}query')
node = xml.find('{http://jabber.org/protocol/disco#items}query').get('node', 'main')
for item in self.items.get(node, []):
itemxml = ET.Element('item')
itemxml.attrib = item
if itemxml.attrib['jid'] is None:
itemxml.attrib['jid'] = xml.get('to')
query.append(itemxml)
self.xmpp.send(iq)
def getItems(self, jid, node=None):
iq = self.xmpp.makeIqGet()
iq.attrib['from'] = self.xmpp.fulljid
iq.attrib['to'] = jid
self.xmpp.makeIqQuery(iq, 'http://jabber.org/protocol/disco#items')
if node:
iq.find('{http://jabber.org/protocol/disco#items}query').attrib['node'] = node
return iq.send()
def getInfo(self, jid, node=None):
iq = self.xmpp.makeIqGet()
iq.attrib['from'] = self.xmpp.fulljid
iq.attrib['to'] = jid
self.xmpp.makeIqQuery(iq, 'http://jabber.org/protocol/disco#info')
if node:
iq.find('{http://jabber.org/protocol/disco#info}query').attrib['node'] = node
return iq.send()
def parseInfo(self, xml):
result = {'identity': {}, 'feature': []}
for identity in xml.findall('{http://jabber.org/protocol/disco#info}query/{{http://jabber.org/protocol/disco#info}identity'):
result['identity'][identity['name']] = identity.attrib
for feature in xml.findall('{http://jabber.org/protocol/disco#info}query/{{http://jabber.org/protocol/disco#info}feature'):
result['feature'].append(feature.get('var', '__unknown__'))
return result
def add_item(self, jid=None, name='', node='main', subnode=''):
self.add_node(node)
self.add_node(subnode)
if jid is None:
jid = self.xmpp.fulljid
self.nodes[node].addItem(jid=jid, name=name, node=subnode)

View file

@ -14,12 +14,14 @@ class xep_0060(base.base_plugin):
self.xep = '0060'
self.description = 'Publish-Subscribe'
def create_node(self, jid, node, config=None, collection=False):
def create_node(self, jid, node, config=None, collection=False, ntype=None):
pubsub = ET.Element('{http://jabber.org/protocol/pubsub}pubsub')
create = ET.Element('create')
create.set('node', node)
pubsub.append(create)
configure = ET.Element('configure')
if collection:
ntype = 'collection'
#if config is None:
# submitform = self.xmpp.plugin['xep_0004'].makeForm('submit')
#else:
@ -29,11 +31,11 @@ class xep_0060(base.base_plugin):
submitform.field['FORM_TYPE'].setValue('http://jabber.org/protocol/pubsub#node_config')
else:
submitform.addField('FORM_TYPE', 'hidden', value='http://jabber.org/protocol/pubsub#node_config')
if collection:
if ntype:
if 'pubsub#node_type' in submitform.field:
submitform.field['pubsub#node_type'].setValue('collection')
submitform.field['pubsub#node_type'].setValue(ntype)
else:
submitform.addField('pubsub#node_type', value='collection')
submitform.addField('pubsub#node_type', value=ntype)
else:
if 'pubsub#node_type' in submitform.field:
submitform.field['pubsub#node_type'].setValue('leaf')

View file

@ -11,7 +11,7 @@ class Error(ElementBase):
namespace = 'jabber:client'
name = 'error'
plugin_attrib = 'error'
conditions = set(('bad-request', 'conflict', 'feature-not-implemented', 'forbidden', 'gone', 'item-not-found', 'jid-malformed', 'not-acceptable', 'not-allowed', 'not-authorized', 'payment-required', 'recipient-unavailable', 'redirect', 'registration-required', 'remote-server-not-found', 'remote-server-timeout', 'service-unavailable', 'subscription-required', 'undefined-condition', 'unexpected-request'))
conditions = set(('bad-request', 'conflict', 'feature-not-implemented', 'forbidden', 'gone', 'internal-server-error', 'item-not-found', 'jid-malformed', 'not-acceptable', 'not-allowed', 'not-authorized', 'payment-required', 'recipient-unavailable', 'redirect', 'registration-required', 'remote-server-not-found', 'remote-server-timeout', 'resource-constraint', 'service-unavailable', 'subscription-required', 'undefined-condition', 'unexpected-request'))
interfaces = set(('code', 'condition', 'text', 'type'))
types = set(('cancel', 'continue', 'modify', 'auth', 'wait'))
sub_interfaces = set(('text',))

View file

@ -0,0 +1,87 @@
try:
import queue
except ImportError:
import Queue as queue
import time
import threading
import logging
class Task(object):
"""Task object for the Scheduler class"""
def __init__(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.name = name
self.seconds = seconds
self.callback = callback
self.args = args or tuple()
self.kwargs = kwargs or {}
self.repeat = repeat
self.next = time.time() + self.seconds
self.qpointer = qpointer
def run(self):
if self.qpointer is not None:
self.qpointer.put(('schedule', self.callback, self.args))
else:
self.callback(*self.args, **self.kwargs)
self.reset()
return self.repeat
def reset(self):
self.next = time.time() + self.seconds
class Scheduler(object):
"""Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched"""
def __init__(self, parentqueue=None):
self.addq = queue.Queue()
self.schedule = []
self.thread = None
self.run = False
self.parentqueue = parentqueue
def process(self, threaded=True):
if threaded:
self.thread = threading.Thread(name='shedulerprocess', target=self._process)
self.thread.start()
else:
self._process()
def _process(self):
self.run = True
while self.run:
try:
wait = 1
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
except KeyboardInterrupt:
self.run = False
logging.debug("Qutting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer))
def quit(self):
self.run = False

View file

@ -79,6 +79,9 @@ class ElementBase(tostring.ToString):
self.idx = 0
return self
def __bool__(self):
return True
def __next__(self):
self.idx += 1
if self.idx > len(self.iterables):
@ -319,6 +322,8 @@ class StanzaBase(ElementBase):
def __init__(self, stream=None, xml=None, stype=None, sto=None, sfrom=None, sid=None):
self.stream = stream
if stream is not None:
self.namespace = stream.default_ns
ElementBase.__init__(self, xml)
if stype is not None:
self['type'] = stype
@ -326,8 +331,6 @@ class StanzaBase(ElementBase):
self['to'] = sto
if sfrom is not None:
self['from'] = sfrom
if stream is not None:
self.namespace = stream.default_ns
self.tag = "{%s}%s" % (self.namespace, self.name)
def setType(self, value):

View file

@ -22,6 +22,7 @@ import time
import traceback
import types
import xml.sax.saxutils
from . import scheduler
HANDLER_THREADS = 1
@ -75,6 +76,7 @@ class XMLStream(object):
self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue()
self.scheduler = scheduler.Scheduler(self.eventqueue)
self.namespace_map = {}
@ -145,7 +147,9 @@ class XMLStream(object):
raise RestartStream()
def process(self, threaded=True):
self.scheduler.process(threaded=True)
for t in range(0, HANDLER_THREADS):
logging.debug("Starting HANDLER THREAD")
self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
self.__thread['eventhandle%s' % t].start()
self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread)
@ -156,8 +160,8 @@ class XMLStream(object):
else:
self._process()
def schedule(self, seconds, handler, args=None):
threading.Timer(seconds, handler, args).start()
def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False):
self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue)
def _process(self):
"Start processing the socket."
@ -177,6 +181,7 @@ class XMLStream(object):
self.state.set('reconnect', False)
self.disconnect()
self.run = False
self.scheduler.run = False
self.eventqueue.put(('quit', None, None))
return
except CloseStream:
@ -223,6 +228,7 @@ class XMLStream(object):
edepth += -1
if edepth == 0 and event == b'end':
self.disconnect(reconnect=self.state['reconnect'])
logging.debug("Ending readXML loop")
return False
elif edepth == 1:
#self.xmlin.put(xmlobj)
@ -231,11 +237,13 @@ class XMLStream(object):
except RestartStream:
return True
except CloseStream:
logging.debug("Ending readXML loop")
return False
if root:
root.clear()
if event == b'start':
edepth += 1
logging.debug("Ending readXML loop")
def _sendThread(self):
while self.run:
@ -265,6 +273,7 @@ class XMLStream(object):
logging.debug("Disconnecting...")
self.state.set('disconnecting', True)
self.run = False
self.scheduler.run = False
if self.state['connected']:
self.sendRaw(self.stream_footer)
time.sleep(1)
@ -323,6 +332,9 @@ class XMLStream(object):
event = self.eventqueue.get(True, timeout=5)
except queue.Empty:
event = None
except KeyboardInterrupt:
self.run = False
self.scheduler.run = False
if event is not None:
etype = event[0]
handler = event[1]
@ -334,9 +346,10 @@ class XMLStream(object):
except Exception as e:
traceback.print_exc()
args[0].exception(e)
elif etype == 'sched':
elif etype == 'schedule':
try:
handler.run(*args)
logging.debug(args)
handler(*args[0])
except:
logging.error(traceback.format_exc())
elif etype == 'quit':

155
tests/test_disco.py Normal file
View file

@ -0,0 +1,155 @@
import unittest
from xml.etree import cElementTree as ET
from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath
from . import xmlcompare
import sleekxmpp.plugins.xep_0030 as sd
def stanzaPlugin(stanza, plugin):
stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin
stanza.plugin_tag_map["{%s}%s" % (plugin.namespace, plugin.name)] = plugin
class testdisco(unittest.TestCase):
def setUp(self):
self.sd = sd
stanzaPlugin(self.sd.Iq, self.sd.DiscoInfo)
stanzaPlugin(self.sd.Iq, self.sd.DiscoItems)
def try3Methods(self, xmlstring, iq):
iq2 = self.sd.Iq(None, self.sd.ET.fromstring(xmlstring))
values = iq2.getValues()
iq3 = self.sd.Iq()
iq3.setValues(values)
self.failUnless(xmlstring == str(iq) == str(iq2) == str(iq3), str(iq)+"3 methods for creating stanza don't match")
def testCreateInfoQueryNoNode(self):
"""Testing disco#info query with no node."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = ''
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" /></iq>"""
self.try3Methods(xmlstring, iq)
def testCreateInfoQueryWithNode(self):
"""Testing disco#info query with a node."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = 'foo'
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" node="foo" /></iq>"""
self.try3Methods(xmlstring, iq)
def testCreateInfoQueryNoNode(self):
"""Testing disco#items query with no node."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_items']['node'] = ''
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#items" /></iq>"""
self.try3Methods(xmlstring, iq)
def testCreateItemsQueryWithNode(self):
"""Testing disco#items query with a node."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_items']['node'] = 'foo'
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#items" node="foo" /></iq>"""
self.try3Methods(xmlstring, iq)
def testInfoIdentities(self):
"""Testing adding identities to disco#info."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = 'foo'
iq['disco_info'].addIdentity('conference', 'text', 'Chatroom')
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" node="foo"><identity category="conference" type="text" name="Chatroom" /></query></iq>"""
self.try3Methods(xmlstring, iq)
def testInfoFeatures(self):
"""Testing adding features to disco#info."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = 'foo'
iq['disco_info'].addFeature('foo')
iq['disco_info'].addFeature('bar')
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" node="foo"><feature var="foo" /><feature var="bar" /></query></iq>"""
self.try3Methods(xmlstring, iq)
def testItems(self):
"""Testing adding features to disco#info."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_items']['node'] = 'foo'
iq['disco_items'].addItem('user@localhost')
iq['disco_items'].addItem('user@localhost', 'foo')
iq['disco_items'].addItem('user@localhost', 'bar', 'Testing')
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#items" node="foo"><item jid="user@localhost" /><item node="foo" jid="user@localhost" /><item node="bar" jid="user@localhost" name="Testing" /></query></iq>"""
self.try3Methods(xmlstring, iq)
def testAddRemoveIdentities(self):
"""Test adding and removing identities to disco#info stanza"""
ids = [('automation', 'commands', 'AdHoc'),
('conference', 'text', 'ChatRoom')]
info = self.sd.DiscoInfo()
info.addIdentity(*ids[0])
self.failUnless(info.getIdentities() == [ids[0]])
info.delIdentity('automation', 'commands')
self.failUnless(info.getIdentities() == [])
info.setIdentities(ids)
self.failUnless(info.getIdentities() == ids)
info.delIdentity('automation', 'commands')
self.failUnless(info.getIdentities() == [ids[1]])
info.delIdentities()
self.failUnless(info.getIdentities() == [])
def testAddRemoveFeatures(self):
"""Test adding and removing features to disco#info stanza"""
features = ['foo', 'bar', 'baz']
info = self.sd.DiscoInfo()
info.addFeature(features[0])
self.failUnless(info.getFeatures() == [features[0]])
info.delFeature('foo')
self.failUnless(info.getFeatures() == [])
info.setFeatures(features)
self.failUnless(info.getFeatures() == features)
info.delFeature('bar')
self.failUnless(info.getFeatures() == ['foo', 'baz'])
info.delFeatures()
self.failUnless(info.getFeatures() == [])
def testAddRemoveItems(self):
"""Test adding and removing items to disco#items stanza"""
items = [('user@localhost', None, None),
('user@localhost', 'foo', None),
('user@localhost', 'bar', 'Test')]
info = self.sd.DiscoItems()
self.failUnless(True, ""+str(items[0]))
info.addItem(*(items[0]))
self.failUnless(info.getItems() == [items[0]], info.getItems())
info.delItem('user@localhost')
self.failUnless(info.getItems() == [])
info.setItems(items)
self.failUnless(info.getItems() == items)
info.delItem('user@localhost', 'foo')
self.failUnless(info.getItems() == [items[0], items[2]])
info.delItems()
self.failUnless(info.getItems() == [])
suite = unittest.TestLoader().loadTestsFromTestCase(testdisco)

View file

@ -97,6 +97,21 @@ class testpubsubstanzas(unittest.TestCase):
iq3.setValues(values)
self.failUnless(xmlstring == str(iq) == str(iq2) == str(iq3))
def testState(self):
"Testing iq/psstate stanzas"
from sleekxmpp.plugins import xep_0004
iq = self.ps.Iq()
iq['psstate']['node']= 'mynode'
iq['psstate']['item']= 'myitem'
pl = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed')
iq['psstate']['payload'] = pl
xmlstring = """<iq id="0"><state xmlns="http://jabber.org/protocol/psstate" node="mynode" item="myitem"><claimed xmlns="http://andyet.net/protocol/pubsubqueue" /></state></iq>"""
iq2 = self.ps.Iq(None, self.ps.ET.fromstring(xmlstring))
iq3 = self.ps.Iq()
values = iq2.getValues()
iq3.setValues(values)
self.failUnless(xmlstring == str(iq) == str(iq2) == str(iq3))
def testDefault(self):
"Testing iq/pubsub_owner/default stanzas"
from sleekxmpp.plugins import xep_0004