SleekXMPP/sleekxmpp/xmlstream/xmlstream.py

444 lines
13 KiB
Python
Raw Normal View History

2010-03-26 21:32:16 +00:00
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
"""
2010-01-08 06:03:02 +00:00
from __future__ import with_statement, unicode_literals
try:
import queue
except ImportError:
import Queue as queue
2009-06-03 22:56:51 +00:00
from . import statemachine
from . stanzabase import StanzaBase
from xml.etree import cElementTree
from xml.parsers import expat
import logging
import socket
import threading
2009-06-03 22:56:51 +00:00
import time
import traceback
import types
import xml.sax.saxutils
HANDLER_THREADS = 1
2009-06-03 22:56:51 +00:00
ssl_support = True
2010-01-08 06:03:02 +00:00
#try:
import ssl
#except ImportError:
# ssl_support = False
import sys
if sys.version_info < (3, 0):
#monkey patch broken filesocket object
from . import filesocket
#socket._fileobject = filesocket.filesocket
2009-06-03 22:56:51 +00:00
class RestartStream(Exception):
    """Signal that the XML stream must be restarted.

    When a stanza handler raises this while XMLStream is dispatching a
    stanza, the read loop stops parsing and reports that the stream should
    continue, which makes the processing loop send the stream header again.
    """
class CloseStream(Exception):
    """Signal that the XML stream must be closed.

    When raised while XMLStream is dispatching a stanza, the read loop
    stops and reports that the stream is finished; when it reaches the
    processing loop itself, that loop returns immediately.
    """
# Module-level registry mapping a stanza class to the list of extension
# classes registered for it. Being a module global, it is shared by every
# XMLStream instance in the process.
stanza_extensions = {}
class XMLStream(object):
    """A connection manager with XML events.

    The stream owns a socket, parses the incoming XML document
    incrementally and turns every direct child of the stream root into a
    stanza object. Stanzas are matched against the registered handlers and
    queued on ``eventqueue``; outgoing data is queued on ``sendqueue``.

    ``process()`` starts the worker threads: ``HANDLER_THREADS`` event
    runners, one send thread and (optionally) one thread running the read
    loop.

    Attributes such as ``default_ns``, ``server`` and ``port`` are read by
    some methods but never assigned here; they are presumably provided by
    subclasses -- TODO confirm against the subclasses.
    """

    def __init__(self, socket=None, host='', port=0, escape_quotes=False):
        """Create the stream, optionally around an already-open socket.

        Args:
            socket: an existing socket object, or None to connect later.
                (The name shadows the ``socket`` module inside this method.)
            host: remote host name used by the connect methods.
            port: remote port; converted with ``int()``.
            escape_quotes: when true, ``xmlesc`` also escapes ``"``.
        """
        self.ssl_support = ssl_support
        self.escape_quotes = escape_quotes
        self.state = statemachine.StateMachine()
        # Set initial states.
        self.state.addStates({'connected': False, 'is client': False,
                              'ssl': False, 'tls': False, 'reconnect': True,
                              'processing': False, 'disconnecting': False})

        # filesocket must be initialised *before* setSocket(): setSocket()
        # assigns the real file object when a socket is supplied, and
        # resetting it afterwards would throw that object away.
        self.filesocket = None
        self.setSocket(socket)
        self.address = (host, int(port))

        self.__thread = {}            # thread name -> Thread object
        self.__root_stanza = []       # stanza classes built for root children
        self.__stanza = {}
        self.__stanza_extension = {}
        self.__handlers = []          # registered stanza handlers
        self.__tls_socket = None

        self.use_ssl = False
        self.use_tls = False

        self.stream_header = "<stream>"
        self.stream_footer = "</stream>"

        self.eventqueue = queue.Queue()   # (etype, handler, *args) tuples
        self.sendqueue = queue.Queue()    # text waiting to be written

        self.namespace_map = {}           # namespace URI -> prefix for tostring()

        self.run = True                   # cleared to stop the worker loops

    def setSocket(self, socket):
        """Set the socket; a non-None socket marks the stream connected."""
        self.socket = socket
        if socket is not None:
            # ElementTree.iterparse requires a file.
            # 0 buffer files have to be binary.
            self.filesocket = socket.makefile('rb', 0)
            self.state.set('connected', True)

    def setFileSocket(self, filesocket):
        """Replace the file object the XML parser reads from."""
        self.filesocket = filesocket

    def connect(self, host='', port=0, use_ssl=False, use_tls=True):
        """Link to connectTCP; returns whatever connectTCP returns."""
        return self.connectTCP(host, port, use_ssl, use_tls)

    def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True):
        """Connect and create the socket.

        Args:
            host, port: remote address; when both are truthy they replace
                ``self.address``.
            use_ssl, use_tls: when not None they replace the stored flags.
            reattempt: when true, retry once per second until connected;
                when false, make exactly one attempt.

        Returns:
            True once a connection is established. False when the single
            attempt failed or when the stream was already connected (in
            which case nothing is done).
        """
        attempted = False
        # The original condition (`reattempt and not connected`) skipped the
        # loop entirely for reattempt=False, so no connection was ever tried.
        while not self.state['connected'] and (reattempt or not attempted):
            attempted = True
            if host and port:
                self.address = (host, int(port))
            if use_ssl is not None:
                self.use_ssl = use_ssl
            if use_tls is not None:
                self.use_tls = use_tls
            self.state.set('is client', True)
            if sys.version_info < (3, 0):
                self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM)
            else:
                self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.settimeout(None)
            if self.use_ssl and self.ssl_support:
                logging.debug("Socket Wrapped for SSL")
                # NOTE(review): ssl.wrap_socket no longer exists on
                # Python 3.12+; confirm the supported interpreter range.
                self.socket = ssl.wrap_socket(self.socket)
            try:
                self.socket.connect(self.address)
                self.filesocket = self.socket.makefile('rb', 0)
                self.state.set('connected', True)
                return True
            except socket.error as serr:
                logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror))
                if reattempt:
                    time.sleep(1)
        return False

    def connectUnix(self, filepath):
        """Connect to Unix file and create socket (not implemented)."""

    def startTLS(self):
        """Handshakes for TLS.

        Wraps the current socket, performs the handshake and rebuilds the
        file object the parser reads from.

        Returns:
            True when TLS was negotiated, False when ssl support is off.
        """
        if not self.ssl_support:
            logging.warning("Tried to enable TLS, but ssl module not found.")
            return False
        logging.info("Negotiating TLS")
        self.realsocket = self.socket
        self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False)
        self.socket.do_handshake()
        if sys.version_info < (3, 0):
            # Python 2 uses the project's file wrapper around the socket.
            from . filesocket import filesocket
            self.filesocket = filesocket(self.socket)
        else:
            self.filesocket = self.socket.makefile('rb', 0)
        # The original had `raise RestartStream()` after both returns; it
        # could never execute and has been dropped.
        return True

    def __startThread(self, name, target):
        """Create, register and start a daemon thread running *target*."""
        th = threading.Thread(name=name, target=target)
        th.daemon = True
        self.__thread[name] = th
        th.start()
        return th

    def process(self, threaded=True):
        """Start the worker threads and the read loop.

        Args:
            threaded: when true the read loop runs in its own daemon
                thread and this call returns; otherwise the read loop runs
                in the calling thread and this call blocks until it ends.
        """
        for t in range(0, HANDLER_THREADS):
            self.__startThread('eventhandle%s' % t, self._eventRunner)
        self.__startThread('sendthread', self._sendThread)
        if threaded:
            self.__startThread('process', self._process)
        else:
            self._process()

    def schedule(self, seconds, handler, args=None):
        """Call ``handler(*args)`` after *seconds* on a timer thread."""
        threading.Timer(seconds, handler, args).start()

    def _process(self):
        """Start processing the socket.

        Runs the read loop once, and again after each reconnect for as
        long as ``run`` is set and the 'reconnect' state is true. A 'quit'
        event is queued for the event runner when processing ends without
        a reconnect.
        """
        firstrun = True
        while self.run and (firstrun or self.state['reconnect']):
            self.state.set('processing', True)
            firstrun = False
            try:
                if self.state['is client']:
                    self.sendRaw(self.stream_header)
                # __readXML returns true when the stream was restarted, in
                # which case a client has to send its header again.
                while self.run and self.__readXML():
                    if self.state['is client']:
                        self.sendRaw(self.stream_header)
            except KeyboardInterrupt:
                logging.debug("Keyboard Escape Detected")
                self.state.set('processing', False)
                self.state.set('reconnect', False)
                self.disconnect()
                self.run = False
                self.eventqueue.put(('quit', None, None))
                return
            except CloseStream:
                return
            except SystemExit:
                self.eventqueue.put(('quit', None, None))
                return
            except Exception:
                # Covers socket errors and any other failure of the read
                # loop (the original had two identical handlers for these).
                if not self.state['reconnect']:
                    return
                self.state.set('processing', False)
                traceback.print_exc()
                self.disconnect(reconnect=True)
            if self.state['reconnect']:
                self.state.set('connected', False)
                self.state.set('processing', False)
                self.reconnect()
            else:
                self.eventqueue.put(('quit', None, None))

    def __readXML(self):
        """Parse the incoming stream, dispatching stanzas as it goes.

        Returns:
            True when a handler requested a stream restart; a false value
            when the stream root was closed, a handler requested a close,
            or the parser ran out of input.
        """
        # Build the element tree from the parser as we go.
        edepth = 0
        root = None
        for (event, xmlobj) in cElementTree.iterparse(self.filesocket, (b'end', b'start')):
            if edepth == 0 and event == b'start':
                # First start tag: this is the stream root.
                root = xmlobj
                self.start_stream_handler(root)
            if event == b'end':
                edepth -= 1
                if edepth == 0:
                    # The stream root itself was closed by the peer.
                    self.disconnect(reconnect=self.state['reconnect'])
                    return False
                elif edepth == 1:
                    # A direct child of the root is complete: one stanza.
                    try:
                        self.__spawnEvent(xmlobj)
                    except RestartStream:
                        return True
                    except CloseStream:
                        return False
                    if root is not None:
                        # Drop processed children so the tree does not grow.
                        root.clear()
            if event == b'start':
                edepth += 1
        return False

    def _sendThread(self):
        """Write queued data to the socket until ``run`` is cleared."""
        while self.run:
            data = self.sendqueue.get(True)
            logging.debug("SEND: %s" % data)
            try:
                # sendall, not send: send() may write only part of the data.
                self.socket.sendall(data.encode('utf-8'))
            except Exception:
                logging.warning("Failed to send %s" % data)
                self.state.set('connected', False)
                if self.state['reconnect']:
                    logging.error("Disconnected. Socket Error.")
                    traceback.print_exc()
                    self.disconnect(reconnect=True)

    def sendRaw(self, data):
        """Queue *data* for the send thread; always returns True."""
        self.sendqueue.put(data)
        return True

    def disconnect(self, reconnect=False):
        """Close the stream.

        Args:
            reconnect: stored in the 'reconnect' state; when false the
                worker loops are told to stop as well.
        """
        self.state.set('reconnect', reconnect)
        if self.state['disconnecting']:
            return
        if not self.state['reconnect']:
            logging.debug("Disconnecting...")
            self.state.set('disconnecting', True)
            self.run = False
        if self.state['connected']:
            # Send end of stream, then give the peer a moment to answer.
            self.sendRaw(self.stream_footer)
            time.sleep(1)
        if self.socket is None:
            return
        # shutdown() has to come before close(); the original order closed
        # first, so the shutdown always failed and was silently swallowed.
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass  # best effort: the peer may already have gone away
        try:
            if self.filesocket is not None:
                self.filesocket.close()
            self.socket.close()
        except socket.error:
            pass  # best effort, as above

    def reconnect(self):
        """Reset the TLS/SSL state and connect again after one second."""
        self.state.set('tls', False)
        self.state.set('ssl', False)
        time.sleep(1)
        # `server` and `port` are not set by this class; fall back to the
        # stored address when they are missing.
        host = getattr(self, 'server', self.address[0])
        port = getattr(self, 'port', self.address[1])
        self.connect(host, port)

    def incoming_filter(self, xmlobj):
        """Hook to rewrite incoming XML before dispatch; identity here."""
        return xmlobj

    def __spawnEvent(self, xmlobj):
        """Convert XML into a stanza and queue it for matching handlers."""
        logging.debug("RECV: %s" % cElementTree.tostring(xmlobj))
        xmlobj = self.incoming_filter(xmlobj)
        stanza = None
        for stanza_class in self.__root_stanza:
            if xmlobj.tag == "{%s}%s" % (self.default_ns, stanza_class.name):
                stanza = stanza_class(self, xmlobj)
                break
        if stanza is None:
            stanza = StanzaBase(self, xmlobj)
        unhandled = True
        # Iterate over a snapshot: expired handlers are removed below, and
        # removing from the list being iterated skips the next handler.
        for handler in list(self.__handlers):
            if handler.match(stanza):
                handler.prerun(stanza)
                self.eventqueue.put(('stanza', handler, stanza))
                if handler.checkDelete() and handler in self.__handlers:
                    self.__handlers.remove(handler)
                unhandled = False
        if unhandled:
            stanza.unhandled()

    def _eventRunner(self):
        """Run queued events until 'quit' arrives or ``run`` is cleared.

        Returns:
            False when a 'quit' event was received, None otherwise.
        """
        logging.debug("Loading event runner")
        while self.run:
            try:
                event = self.eventqueue.get(True, timeout=5)
            except queue.Empty:
                # Wake up periodically so a cleared `run` flag is noticed.
                continue
            etype = event[0]
            handler = event[1]
            args = event[2:]
            if etype == 'stanza':
                try:
                    handler.run(args[0])
                except Exception as e:
                    traceback.print_exc()
                    args[0].exception(e)
            elif etype == 'sched':
                try:
                    handler.run(*args)
                except Exception:
                    logging.error(traceback.format_exc())
            elif etype == 'quit':
                logging.debug("Quitting eventRunner thread")
                return False

    def registerHandler(self, handler, before=None, after=None):
        """Add a handler; *before* and *after* are currently ignored."""
        self.__handlers.append(handler)

    def removeHandler(self, name):
        """Remove the first handler whose ``name`` equals *name*."""
        for idx, handler in enumerate(self.__handlers):
            if handler.name == name:
                self.__handlers.pop(idx)
                return

    def registerStanza(self, stanza_class):
        """Add a root stanza class used to build incoming stanzas."""
        self.__root_stanza.append(stanza_class)

    def registerStanzaExtension(self, stanza_class, stanza_extension):
        """Record *stanza_extension* for *stanza_class* in the module registry."""
        stanza_extensions.setdefault(stanza_class, []).append(stanza_extension)

    def removeStanza(self, stanza_class, root=False):
        """Remove the stanza's registration.

        Raises:
            ValueError: *root* is true and the class is not registered.
            KeyError: *root* is false and the class is not registered.
        """
        if root:
            # The root registry is a list, so `del list[class]` could never
            # work; remove by value.
            self.__root_stanza.remove(stanza_class)
        else:
            del self.__stanza[stanza_class]

    def removeStanzaExtension(self, stanza_class, stanza_extension):
        """Remove *stanza_extension* from the registry for *stanza_class*.

        Raises:
            KeyError: nothing is registered for *stanza_class*.
            ValueError: the extension is not registered for that class.
        """
        stanza_extensions[stanza_class].remove(stanza_extension)

    def tostring(self, xml, xmlns='', stringbuffer=''):
        """Serialize an element (and its children) to a string.

        Args:
            xml: the element to serialize.
            xmlns: namespace already in effect; an ``xmlns`` attribute or a
                mapped prefix is emitted only when the element's namespace
                differs from it.
            stringbuffer: text placed in front of the output.

        Returns:
            The serialized XML text.
        """
        output = [stringbuffer]
        # TODO respect ET mapped namespaces
        tag_name = xml.tag.split('}', 1)[-1]
        if '}' in xml.tag:
            tag_xmlns = xml.tag.split('}', 1)[0][1:]
        else:
            tag_xmlns = ''
        nsbuffer = ''
        if xmlns != tag_xmlns and tag_xmlns != '':
            if tag_xmlns in self.namespace_map:
                if self.namespace_map[tag_xmlns] != '':
                    tag_name = "%s:%s" % (self.namespace_map[tag_xmlns], tag_name)
            else:
                nsbuffer = ' xmlns="%s"' % tag_xmlns
        output.append("<%s" % tag_name)
        output.append(nsbuffer)
        for attrib in xml.attrib:
            # Values are wrapped in double quotes, so a literal " must be
            # escaped even when escape_quotes is off.
            value = self.xmlesc(xml.attrib[attrib]).replace('"', '&quot;')
            output.append(' %s="%s"' % (attrib, value))
        if len(xml) or xml.text or xml.tail:
            output.append(">")
            if xml.text:
                output.append(self.xmlesc(xml.text))
            # Iterate the element directly; getchildren() was removed in
            # Python 3.9.
            for child in xml:
                output.append(self.tostring(child, tag_xmlns))
            output.append("</%s>" % (tag_name, ))
            if xml.tail:
                output.append(self.xmlesc(xml.tail))
        else:
            output.append(" />")
        return ''.join(output)

    def xmlesc(self, text):
        """Escape ``& < > '`` in *text*, and ``"`` if escape_quotes is set."""
        entities = {'&': '&amp;', '<': '&lt;', '>': '&gt;', "'": '&apos;'}
        if self.escape_quotes:
            entities['"'] = '&quot;'
        return ''.join([entities.get(char, char) for char in text])

    def start_stream_handler(self, xml):
        """Called with the stream root element. Meant to be overridden."""
        pass