mirror of
https://github.com/correl/SleekXMPP.git
synced 2024-11-24 03:00:15 +00:00
240 lines
6 KiB
Python
240 lines
6 KiB
Python
try:
    from xml.etree import cElementTree as ET
except ImportError:  # cElementTree was removed in Python 3.9
    from xml.etree import ElementTree as ET
|
|
|
|
class JID(object):
    """Wrap a Jabber ID string of the form ``user@server/resource``.

    The raw string is stored in ``jid``.  The parts are computed on
    attribute access:

    * ``full``     -- the raw string, unchanged
    * ``user``     -- text before ``@`` in the bare JID ('' when absent)
    * ``server``   -- the bare JID without the user part
    * ``resource`` -- text after the first ``/`` ('' when absent)
    """

    def __init__(self, jid):
        self.jid = jid

    def __getattr__(self, name):
        """Compute one of the derived JID parts.

        Python only calls ``__getattr__`` after normal lookup has failed, so
        real instance attributes (``jid`` and anything a caller assigns)
        never reach this method.

        Raises:
            AttributeError: for any name that is not a derived part, so that
                ``hasattr()`` and typos behave as on an ordinary object
                instead of silently yielding ``None``.
        """
        if name == 'full':
            return self.jid
        if name in ('user', 'server', 'resource'):
            # Split the resource off first: a resource may itself contain
            # '@', which must not be mistaken for the user separator.
            bare, _, resource = self.jid.partition('/')
            if name == 'resource':
                return resource
            user, at_sign, server = bare.partition('@')
            if not at_sign:
                # No user part: the whole bare JID is the server.
                user, server = '', bare
            return user if name == 'user' else server
        raise AttributeError(
            "%r object has no attribute %r" % (type(self).__name__, name))

    def __str__(self):
        """Return the raw JID string."""
        return self.jid
|
|
|
|
class StanzaBase(object):
    """Dictionary-style wrapper around the ElementTree element of a stanza.

    Items named in ``interfaces`` can be read, assigned and deleted with
    ``stanza[key]``.  For each key the accessor first looks for a method
    named ``get<Key>`` / ``set<Key>`` / ``del<Key>``; failing that, keys
    listed in ``sub_interfaces`` map to the text of a child element and all
    other keys map to an XML attribute of the same name.

    Class attributes:
        name: local tag name used when a new element is created.
        namespace: XML namespace used for the element and its sub-elements.
        interfaces: keys accepted by the item accessors.
        types: values accepted by ``setType``.
        sub_interfaces: keys stored as child-element text.
        escape_quotes: whether ``xmlesc`` also replaces double quotes.
    """

    name = 'stanza'
    namespace = 'jabber:client'
    interfaces = set(('type', 'to', 'from', 'id', 'payload'))
    types = set(('get', 'set', 'error', None, 'unavailable', 'normal', 'chat'))
    sub_interfaces = tuple()
    # Attribute values are written between double quotes by __str__, so
    # quotes are escaped unless a subclass or instance turns this off.
    escape_quotes = True
    # Special character -> name of the XML entity that replaces it.
    _entity_names = {'&': 'amp', '<': 'lt', '>': 'gt', "'": 'apos', '"': 'quot'}

    def __init__(self, stream, xml=None, stype=None, sto=None, sfrom=None, sid=None):
        """Wrap ``xml``, or create an empty ``{namespace}name`` element.

        Args:
            stream: object providing ``default_ns``, ``namespace_map`` and
                ``sendRaw``; those are the only members this class uses.
            xml: existing element to wrap; a new one is created when None.
            stype, sto, sfrom, sid: optional initial values for the
                ``type``, ``to``, ``from`` and ``id`` items.

        Raises:
            ValueError: if ``stype`` is not one of ``types``.
        """
        self.stream = stream
        self.xml = xml
        if xml is None:
            self.xml = ET.Element("{%s}%s" % (self.namespace, self.name))
        if stype is not None:
            self['type'] = stype
        if sto is not None:
            self['to'] = sto
        if sfrom is not None:
            self['from'] = sfrom
        if sid is not None:
            # Previously accepted but silently dropped.
            self['id'] = sid
        self.tag = "{%s}%s" % (self.stream.default_ns, self.name)

    def match(self, xml):
        """Return True when ``xml`` has this stanza's qualified tag."""
        return xml.tag == self.tag

    def __getitem__(self, attrib):
        """Return the value of interface ``attrib`` ('' for unknown keys)."""
        if attrib not in self.interfaces:
            return ''
        getter = getattr(self, "get%s" % attrib.title(), None)
        if getter is not None:
            return getter()
        if attrib in self.sub_interfaces:
            return self._getSubText(attrib)
        return self._getAttr(attrib)

    def __setitem__(self, attrib, value):
        """Set interface ``attrib``; assigning None deletes it instead.

        Keys outside ``interfaces`` are ignored.
        """
        if attrib.lower() in self.interfaces:
            if value is None:
                self.__delitem__(attrib)
            else:
                setter = getattr(self, "set%s" % attrib.title(), None)
                if setter is not None:
                    setter(value)
                elif attrib in self.sub_interfaces:
                    return self._setSubText(attrib, text=value)
                else:
                    self._setAttr(attrib, value)
        return self

    def __delitem__(self, attrib):
        """Remove interface ``attrib``; keys outside ``interfaces`` are ignored."""
        if attrib.lower() in self.interfaces:
            deleter = getattr(self, "del%s" % attrib.title(), None)
            if deleter is not None:
                deleter()
            elif attrib in self.sub_interfaces:
                return self._delSub(attrib)
            else:
                self._delAttr(attrib)
        return self

    def setType(self, value):
        """Set the ``type`` attribute, or remove it when ``value`` is None.

        Raises:
            ValueError: if ``value`` is not in ``types``.
        """
        if value not in self.types:
            raise ValueError
        if value is None:
            self.xml.attrib.pop('type', None)
        else:
            self.xml.attrib['type'] = value
        return self

    def getPayload(self):
        """Return a list of the element's direct children."""
        # Element.getchildren() was removed in Python 3.9.
        return list(self.xml)

    def setPayload(self, value):
        """Append element ``value`` as a child (existing children are kept)."""
        self.xml.append(value)

    def delPayload(self):
        """Remove all child elements."""
        self.clear()

    def clear(self):
        """Remove every child element, leaving attributes and text in place."""
        # Iterate over a copy: removing from the element while iterating it
        # directly would skip children.
        for child in list(self.xml):
            self.xml.remove(child)

    def reply(self):
        """Swap ``to`` and ``from``, drop all children and return self."""
        new_from, new_to = str(self['to']), str(self['from'])
        # Assigning None deletes the attribute, so an address that was
        # absent does not come back as an empty to="" / from="".
        self['from'] = new_from or None
        self['to'] = new_to or None
        self.clear()
        return self

    def error(self):
        """Mark the stanza as an error by setting its type to 'error'."""
        self['type'] = 'error'

    def getTo(self):
        """Return the ``to`` attribute wrapped in a JID."""
        return JID(self._getAttr('to'))

    def setTo(self, value):
        """Store ``str(value)`` in the ``to`` attribute."""
        return self._setAttr('to', str(value))

    def getFrom(self):
        """Return the ``from`` attribute wrapped in a JID."""
        return JID(self._getAttr('from'))

    def setFrom(self, value):
        """Store ``str(value)`` in the ``from`` attribute."""
        return self._setAttr('from', str(value))

    def getValues(self):
        """Return a dict mapping every interface name to its current value."""
        return dict((interface, self[interface]) for interface in self.interfaces)

    def setValues(self, attrib):
        """Assign each entry of mapping ``attrib`` whose key is an interface."""
        for interface in attrib:
            if interface in self.interfaces:
                self[interface] = attrib[interface]

    def _setAttr(self, name, value):
        """Set XML attribute ``name`` to ``value``."""
        self.xml.attrib[name] = value

    def _delAttr(self, name):
        """Remove XML attribute ``name`` if it is present."""
        self.xml.attrib.pop(name, None)

    def _getAttr(self, name):
        """Return XML attribute ``name``, or '' when it is absent."""
        return self.xml.attrib.get(name, '')

    def _getSubText(self, name):
        """Return the text of the first ``{namespace}name`` child, or ''."""
        stanza = self.xml.find("{%s}%s" % (self.namespace, name))
        if stanza is None or stanza.text is None:
            return ''
        return stanza.text

    def _setSubText(self, name, attrib=None, text=None):
        """Find or create the ``{namespace}name`` child and set its text.

        Args:
            name: local tag name of the child element.
            attrib: attributes for a newly created child; not applied when
                the child already exists.
            text: new text content; left untouched when None.

        Returns:
            The child element.
        """
        tag = "{%s}%s" % (self.namespace, name)
        stanza = self.xml.find(tag)
        if stanza is None:
            stanza = ET.Element(tag, attrib or {})
            self.xml.append(stanza)
        if text is not None:
            stanza.text = text
        return stanza

    def _delSub(self, name):
        """Remove every direct child tagged ``{namespace}name``."""
        tag = "{%s}%s" % (self.namespace, name)
        # Copy first so that consecutive matching children are all removed.
        for child in list(self.xml):
            if child.tag == tag:
                self.xml.remove(child)

    def unhandled(self):
        """Hook with no behavior in this base class."""
        pass

    def send(self):
        """Serialize the stanza and pass it to ``stream.sendRaw``."""
        self.stream.sendRaw(str(self))

    def __str__(self, xml=None, xmlns='', stringbuffer=''):
        """Serialize ``xml`` (default: this stanza's element) to a string.

        Args:
            xml: element to serialize; ``self.xml`` when None.
            xmlns: namespace of the enclosing element; an ``xmlns``
                declaration is written when the element's namespace differs
                from both it and ``self.namespace``.
            stringbuffer: text placed in front of the output.
        """
        if xml is None:
            xml = self.xml
        newoutput = [stringbuffer]
        # TODO respect ET mapped namespaces
        ns_part, brace, local = xml.tag.partition('}')
        if brace:
            # "{namespace}tag": drop the leading '{' from the namespace.
            ixmlns, itag = ns_part[1:], local
        else:
            ixmlns, itag = '', xml.tag
        nsbuffer = ''
        if xmlns != ixmlns and ixmlns != '' and ixmlns != self.stream.default_ns:
            if ixmlns in self.stream.namespace_map:
                # A mapped namespace is written as a tag prefix; an empty
                # prefix leaves the tag bare.
                prefix = self.stream.namespace_map[ixmlns]
                if prefix != '':
                    itag = "%s:%s" % (prefix, itag)
            else:
                nsbuffer = ' xmlns="%s"' % ixmlns
        if ixmlns not in (xmlns, self.namespace):
            nsbuffer = ' xmlns="%s"' % ixmlns
        newoutput.append("<%s" % itag)
        newoutput.append(nsbuffer)
        for attrib in xml.attrib:
            # Namespace-qualified attributes ("{ns}name") are not written.
            if '{' not in attrib:
                newoutput.append(' %s="%s"' % (attrib, self.xmlesc(xml.attrib[attrib])))
        if len(xml) or xml.text or xml.tail:
            newoutput.append(">")
            if xml.text:
                newoutput.append(self.xmlesc(xml.text))
            for child in list(xml):
                newoutput.append(self.__str__(child, ixmlns))
            newoutput.append("</%s>" % (itag, ))
            if xml.tail:
                newoutput.append(self.xmlesc(xml.tail))
        else:
            newoutput.append(" />")
        return ''.join(newoutput)

    def xmlesc(self, text):
        """Return ``text`` with XML special characters replaced by entities.

        The ampersand, angle brackets and apostrophe are always replaced
        with their named entity references (amp, lt, gt, apos); the double
        quote is replaced with the quot entity only while ``escape_quotes``
        is true.
        """
        names = self._entity_names
        escaped = []
        for char in text:
            entity = names.get(char)
            if entity is None or (char == '"' and not self.escape_quotes):
                escaped.append(char)
            else:
                # Entity references are assembled from their names.
                escaped.append('&%s;' % entity)
        return ''.join(escaped)
|
|
|
|
|
|
if __name__ == '__main__':
    # Small manual demo: set addresses, swap them with reply(), drop one.

    class _DemoStream(object):
        """Stand-in stream exposing only the members StanzaBase uses."""
        default_ns = 'jabber:client'
        namespace_map = {}

        def sendRaw(self, data):
            print(data)

    # The class defined in this module is StanzaBase (there is no Stanza),
    # and its constructor requires a stream.
    x = StanzaBase(_DemoStream())
    x['from'] = 'you'
    x['to'] = 'me'
    print(x['from'], x['to'])
    x.reply()
    print(x['from'], x['to'])
    # Assigning None removes the attribute.
    x['from'] = None
    print(x['from'], x['to'])
    print(str(x))
|