Made a first pass at cleaning up XMLStream.

A few extra methods are mentioned in the docs, but those have not
been moved to XMLStream from BaseXMPP yet.
This commit is contained in:
Lance Stout 2010-09-30 12:56:22 -04:00
parent c258d2f19d
commit 7c10ff16fb
2 changed files with 590 additions and 348 deletions

View file

@ -7,5 +7,8 @@
""" """
from sleekxmpp.xmlstream.jid import JID from sleekxmpp.xmlstream.jid import JID
from sleekxmpp.xmlstream.stanzabase import StanzaBase, ElementBase from sleekxmpp.xmlstream.scheduler import Scheduler
from sleekxmpp.xmlstream.stanzabase import StanzaBase, ElementBase, ET
from sleekxmpp.xmlstream.statemachine import StateMachine
from sleekxmpp.xmlstream.tostring import tostring
from sleekxmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT from sleekxmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT

View file

@ -7,392 +7,631 @@
""" """
from __future__ import with_statement, unicode_literals from __future__ import with_statement, unicode_literals
try:
import queue import copy
except ImportError:
import Queue as queue
from . import statemachine
from . stanzabase import StanzaBase
from xml.etree import cElementTree
from xml.parsers import expat
import logging import logging
import socket import socket
import ssl
import sys
import threading import threading
import time import time
import types import types
import copy try:
import xml.sax.saxutils import queue
from . import scheduler except ImportError:
from sleekxmpp.xmlstream.tostring import tostring import Queue as queue
from sleekxmpp.xmlstream import StateMachine, Scheduler, tostring
from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET
# In Python 2.x, file socket objects are broken. A patched socket
# wrapper is provided for this case in filesocket.py.
if sys.version_info < (3, 0):
from sleekxmpp.xmlstream.filesocket import FileSocket, Socket26
# The time in seconds to wait before timing out waiting for response stanzas.
RESPONSE_TIMEOUT = 10 RESPONSE_TIMEOUT = 10
# The number of threads to use to handle XML stream events. This is not the
# same as the number of custom event handling threads. HANDLER_THREADS must
# be at least 1.
HANDLER_THREADS = 1 HANDLER_THREADS = 1
ssl_support = True # Flag indicating if the SSL library is available for use.
#try: SSL_SUPPORT = True
import ssl
#except ImportError:
# ssl_support = False
import sys
if sys.version_info < (3, 0):
#monkey patch broken filesocket object
from . import filesocket
#socket._fileobject = filesocket.filesocket
class RestartStream(Exception): class RestartStream(Exception):
pass """
Exception to restart stream processing, including
resending the stream header.
"""
class CloseStream(Exception):
pass
stanza_extensions = {}
class XMLStream(object): class XMLStream(object):
"A connection manager with XML events." """
An XML stream connection manager and event dispatcher.
def __init__(self, socket=None, host='', port=0, escape_quotes=False): The XMLStream class abstracts away the issues of establishing a
global ssl_support connection with a server and sending and receiving XML "stanzas".
self.ssl_support = ssl_support A stanza is a complete XML element that is a direct child of a root
self.escape_quotes = escape_quotes document element. Two streams are used, one for each communication
self.state = statemachine.StateMachine() direction, over the same socket. Once the connection is closed, both
self.state.addStates({'connected':False, 'is client':False, 'ssl':False, 'tls':False, 'reconnect':True, 'processing':False, 'disconnecting':False}) #set initial states streams should be complete and valid XML documents.
self.setSocket(socket) Three types of events are provided to manage the stream:
self.address = (host, int(port)) Stream -- Triggered based on received stanzas, similar in concept
to events in a SAX XML parser.
self.__thread = {} Custom -- Triggered manually.
Scheduled -- Triggered based on time delays.
self.__root_stanza = []
self.__stanza = {} Typically, stanzas are first processed by a stream event handler which
self.__stanza_extension = {} will then trigger custom events to continue further processing,
self.__handlers = [] especially since custom event handlers may run in individual threads.
self.__tls_socket = None
self.filesocket = None
self.use_ssl = False
self.use_tls = False
self.default_ns = ''
self.stream_header = "<stream>"
self.stream_footer = "</stream>"
self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue()
self.scheduler = scheduler.Scheduler(self.eventqueue)
self.namespace_map = {}
self.run = True
def setSocket(self, socket):
"Set the socket"
self.socket = socket
if socket is not None:
self.filesocket = socket.makefile('rb', 0) # ElementTree.iterparse requires a file. 0 buffer files have to be binary
self.state.set('connected', True)
def setFileSocket(self, filesocket): Attributes:
self.filesocket = filesocket address -- The hostname and port of the server.
default_ns -- The default XML namespace that will be applied
to all non-namespaced stanzas.
event_queue -- A queue of stream, custom, and scheduled
events to be processed.
filesocket -- A filesocket created from the main connection socket.
Required for ElementTree.iterparse.
namespace_map -- Optional mapping of namespaces to namespace prefixes.
scheduler -- A scheduler object for triggering events
after a given period of time.
send_queue -- A queue of stanzas to be sent on the stream.
socket -- The connection to the server.
ssl_support -- Indicates if a SSL library is available for use.
state -- A state machine for managing the stream's
connection state.
stream_footer -- The start tag and any attributes for the stream's
root element.
stream_header -- The closing tag of the stream's root element.
use_ssl -- Flag indicating if SSL should be used.
use_tls -- Flag indicating if TLS should be used.
def connect(self, host='', port=0, use_ssl=False, use_tls=True): Methods:
"Link to connectTCP" add_event_handler -- Add a handler for a custom event.
return self.connectTCP(host, port, use_ssl, use_tls) add_handler -- Shortcut method for registerHandler.
connect -- Connect to the given server.
del_event_handler -- Remove a handler for a custom event.
disconnect -- Disconnect from the server and terminate
processing.
event -- Trigger a custom event.
incoming_filter -- Optionally filter stanzas before processing.
process -- Read XML stanzas from the stream and apply
matching stream handlers.
reconnect -- Reestablish a connection to the server.
register_handler -- Add a handler for a stream event.
register_stanza -- Add a new stanza object type that may appear
as a direct child of the stream's root.
remove_handler -- Remove a stream handler.
remove_stanza -- Remove a stanza object type.
schedule -- Schedule an event handler to execute after a
given delay.
send -- Send a stanza object on the stream.
send_raw -- Send a raw string on the stream.
send_xml -- Send an XML string on the stream.
set_socket -- Set the stream's socket and generate a new
filesocket.
start_stream_handler -- Meant to be overridden.
start_tls -- Establish a TLS connection and restart
the stream.
"""
def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True): def __init__(self, socket=None, host='', port=0):
"Connect and create socket" """
while reattempt and not self.state['connected']: Establish a new XML stream.
if host and port:
self.address = (host, int(port))
if use_ssl is not None:
self.use_ssl = use_ssl
if use_tls is not None:
self.use_tls = use_tls
self.state.set('is client', True)
if sys.version_info < (3, 0):
self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM)
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(None)
if self.use_ssl and self.ssl_support:
logging.debug("Socket Wrapped for SSL")
self.socket = ssl.wrap_socket(self.socket)
try:
self.socket.connect(self.address)
#self.filesocket = self.socket.makefile('rb', 0)
self.filesocket = self.socket.makefile('rb', 0)
self.state.set('connected', True)
return True
except socket.error as serr:
logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror))
time.sleep(1)
def connectUnix(self, filepath): Arguments:
"Connect to Unix file and create socket" socket -- Use an existing socket for the stream.
Defaults to None to generate a new socket.
host -- The name of the target server.
Defaults to the empty string.
port -- The port to use for the connection.
Defaults to 0.
"""
# To comply with PEP8, method names now use underscores.
# Deprecated method names are re-mapped for backwards compatibility.
self.startTLS = self.start_tls
self.registerStanza = self.register_stanza
self.removeStanza = self.remove_stanza
self.registerHandler = self.register_handler
self.removeHandler = self.remove_handler
self.setSocket = self.set_socket
self.sendRaw = self.send_raw
def startTLS(self): self.ssl_support = SSL_SUPPORT
"Handshakes for TLS"
if self.ssl_support:
logging.info("Negotiating TLS")
self.realsocket = self.socket
self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False)
self.socket.do_handshake()
if sys.version_info < (3,0):
self.filesocket = filesocket.FileSocket(self.socket)
else:
self.filesocket = self.socket.makefile('rb', 0)
return True
else:
logging.warning("Tried to enable TLS, but ssl module not found.")
return False
raise RestartStream()
def process(self, threaded=True): # TODO: Integrate the new state machine.
self.scheduler.process(threaded=True) self.state = StateMachine()
for t in range(0, HANDLER_THREADS): self.state.addStates({'connected': False,
logging.debug("Starting HANDLER THREAD") 'is client': False,
self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) 'ssl': False,
self.__thread['eventhandle%s' % t].start() 'tls': False,
self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread) 'reconnect': True,
self.__thread['sendthread'].start() 'processing': False,
if threaded: 'disconnecting': False})
self.__thread['process'] = threading.Thread(name='process', target=self._process)
self.__thread['process'].start()
else:
self._process()
def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False): self.address = (host, int(port))
self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue) self.filesocket = None
self.set_socket(socket)
def _process(self): self.use_ssl = False
"Start processing the socket." self.use_tls = False
firstrun = True
while self.run and (firstrun or self.state['reconnect']):
self.state.set('processing', True)
firstrun = False
try:
if self.state['is client']:
self.sendRaw(self.stream_header)
while self.run and self.__readXML():
if self.state['is client']:
self.sendRaw(self.stream_header)
except KeyboardInterrupt:
logging.debug("Keyboard Escape Detected")
self.state.set('processing', False)
self.state.set('reconnect', False)
self.disconnect()
self.run = False
self.scheduler.run = False
self.eventqueue.put(('quit', None, None))
return
except CloseStream:
return
except SystemExit:
self.eventqueue.put(('quit', None, None))
return
except socket.error:
if not self.state.reconnect:
return
else:
self.state.set('processing', False)
logging.exception('Socket Error')
self.disconnect(reconnect=True)
except:
if not self.state.reconnect:
return
else:
self.state.set('processing', False)
logging.exception('Connection error. Reconnecting.')
self.disconnect(reconnect=True)
if self.state['reconnect']:
self.reconnect()
self.state.set('processing', False)
self.eventqueue.put(('quit', None, None))
#self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML)
#self.__thread['readXML'].start()
#self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents)
#self.__thread['spawnEvents'].start()
def __readXML(self): self.default_ns = ''
"Parses the incoming stream, adding to xmlin queue as it goes" self.stream_header = "<stream>"
#build cElementTree object from expat was we go self.stream_footer = "</stream>"
#self.filesocket = self.socket.makefile('rb', 0)
#print self.filesocket.read(1024) #self.filesocket._sock.recv(1024)
edepth = 0
root = None
for (event, xmlobj) in cElementTree.iterparse(self.filesocket, (b'end', b'start')):
if edepth == 0: # and xmlobj.tag.split('}', 1)[-1] == self.basetag:
if event == b'start':
root = xmlobj
self.start_stream_handler(root)
if event == b'end':
edepth += -1
if edepth == 0 and event == b'end':
self.disconnect(reconnect=self.state['reconnect'])
logging.debug("Ending readXML loop")
return False
elif edepth == 1:
#self.xmlin.put(xmlobj)
try:
self.__spawnEvent(xmlobj)
except RestartStream:
return True
except CloseStream:
logging.debug("Ending readXML loop")
return False
if root:
root.clear()
if event == b'start':
edepth += 1
logging.debug("Ending readXML loop")
def _sendThread(self): self.event_queue = queue.Queue()
while self.run: self.send_queue = queue.Queue()
data = self.sendqueue.get(True) self.scheduler = Scheduler(self.event_queue)
logging.debug("SEND: %s" % data)
try: self.namespace_map = {}
self.socket.send(data.encode('utf-8'))
#self.socket.send(bytes(data, "utf-8"))
#except socket.error,(errno, strerror):
except:
logging.warning("Failed to send %s" % data)
self.state.set('connected', False)
if self.state.reconnect:
logging.exception("Disconnected. Socket Error.")
self.disconnect(reconnect=True)
def sendRaw(self, data): self.__thread = {}
self.sendqueue.put(data) self.__root_stanza = []
return True self.__handlers = []
def disconnect(self, reconnect=False): self.run = True
self.state.set('reconnect', reconnect)
if self.state['disconnecting']:
return
if not self.state['reconnect']:
logging.debug("Disconnecting...")
self.state.set('disconnecting', True)
self.run = False
self.scheduler.run = False
if self.state['connected']:
self.sendRaw(self.stream_footer)
time.sleep(1)
#send end of stream
#wait for end of stream back
try:
self.socket.close()
self.filesocket.close()
self.socket.shutdown(socket.SHUT_RDWR)
except socket.error as serr:
#logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror))
#thread.exit_thread()
pass
if self.state['processing']:
#raise CloseStream
pass
def reconnect(self): def connect(self, host='', port=0, use_ssl=False,
self.state.set('tls',False) use_tls=True, reattempt=True):
self.state.set('ssl',False) """
time.sleep(1) Create a new socket and connect to the server.
self.connect()
def incoming_filter(self, xmlobj): Setting reattempt to True will cause connection attempts to be made
return xmlobj every second until a successful connection is established.
def __spawnEvent(self, xmlobj): Arguments:
"watching xmlOut and processes handlers" host -- The name of the desired server for the connection.
#convert XML into Stanza port -- Port to connect to on the server.
logging.debug("RECV: %s" % tostring(xmlobj, xmlns=self.default_ns, stream=self)) use_ssl -- Flag indicating if SSL should be used.
xmlobj = self.incoming_filter(xmlobj) use_tls -- Flag indicating if TLS should be used.
stanza_type = StanzaBase reattempt -- Flag indicating if the socket should reconnect
for stanza_class in self.__root_stanza: after disconnections.
if xmlobj.tag == "{%s}%s" % (self.default_ns, stanza_class.name): """
stanza_type = stanza_class if host and port:
break self.address = (host, int(port))
unhandled = True
stanza = stanza_type(self, xmlobj)
for handler in self.__handlers:
if handler.match(stanza):
stanza_copy = stanza_type(self, copy.deepcopy(xmlobj))
handler.prerun(stanza_copy)
self.eventqueue.put(('stanza', handler, stanza_copy))
if handler.checkDelete(): self.__handlers.pop(self.__handlers.index(handler))
unhandled = False
if unhandled:
stanza.unhandled()
#loop through handlers and test match
#spawn threads as necessary, call handlers, sending Stanza
def _eventRunner(self): # Respect previous SSL and TLS usage directives.
logging.debug("Loading event runner") if use_ssl is not None:
while self.run: self.use_ssl = use_ssl
try: if use_tls is not None:
event = self.eventqueue.get(True, timeout=5) self.use_tls = use_tls
except queue.Empty:
event = None
except KeyboardInterrupt:
self.run = False
self.scheduler.run = False
if event is not None:
etype = event[0]
handler = event[1]
args = event[2:]
#etype, handler, *args = event #python 3.x way
if etype == 'stanza':
try:
handler.run(args[0])
except Exception as e:
logging.exception('Error processing event handler: %s' % handler.name)
args[0].exception(e)
elif etype == 'schedule':
try:
logging.debug(args)
handler(*args[0])
except:
logging.exception('Error processing scheduled task')
elif etype == 'quit':
logging.debug("Quitting eventRunner thread")
return False
def registerHandler(self, handler, before=None, after=None): self.state.set('is client', True)
"Add handler with matcher class and parameters."
if handler.stream is None:
self.__handlers.append(handler)
handler.stream = self
def removeHandler(self, name): # Repeatedly attempt to connect until a successful connection
"Removes the handler." # is established.
idx = 0 while reattempt and not self.state['connected']:
for handler in self.__handlers: if sys.version_info < (3, 0):
if handler.name == name: self.socket = Socket26(socket.AF_INET, socket.SOCK_STREAM)
self.__handlers.pop(idx) else:
return True self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
idx += 1 self.socket.settimeout(None)
return False if self.use_ssl and self.ssl_support:
logging.debug("Socket Wrapped for SSL")
self.socket = ssl.wrap_socket(self.socket)
try:
self.socket.connect(self.address)
self.set_socket(self.socket)
self.state.set('connected', True)
return True
except socket.error as serr:
error_msg = "Could not connect. Socket Error #%s: %s"
logging.error(error_msg % (serr.errno, serr.strerror))
time.sleep(1)
def registerStanza(self, stanza_class): def disconnect(self, reconnect=False):
"Adds stanza. If root stanzas build stanzas sent in events while non-root stanzas build substanza objects." """
self.__root_stanza.append(stanza_class) Terminate processing and close the XML streams.
def registerStanzaExtension(self, stanza_class, stanza_extension): Optionally, the connection may be reconnected and
if stanza_class not in stanza_extensions: resume processing afterwards.
stanza_extensions[stanza_class] = [stanza_extension]
else:
stanza_extensions[stanza_class].append(stanza_extension)
def removeStanza(self, stanza_class, root=False): Arguments:
"Removes the stanza's registration." reconnect -- Flag indicating if the connection
if root: and processing should be restarted.
del self.__root_stanza[stanza_class] Defaults to False.
else: """
del self.__stanza[stanza_class] self.state.set('reconnect', reconnect)
if self.state['disconnecting']:
return
if not self.state['reconnect']:
logging.debug("Disconnecting...")
self.state.set('disconnecting', True)
self.run = False
self.scheduler.run = False
if self.state['connected']:
# Send the end of stream marker.
self.send_raw(self.stream_footer)
# Wait for confirmation that the stream was
# closed in the other direction.
time.sleep(1)
try:
self.socket.close()
self.filesocket.close()
self.socket.shutdown(socket.SHUT_RDWR)
except socket.error as serr:
pass
def removeStanzaExtension(self, stanza_class, stanza_extension): def reconnect(self):
stanza_extension[stanza_class].pop(stanza_extension) """
Reset the stream's state and reconnect to the server.
"""
self.state.set('tls',False)
self.state.set('ssl',False)
time.sleep(1)
self.connect()
def start_stream_handler(self, xml): def set_socket(self, socket):
"""Meant to be overridden""" """
pass Set the socket to use for the stream.
The filesocket will be recreated as well.
Arguments:
socket -- The new socket to use.
"""
self.socket = socket
if socket is not None:
# ElementTree.iterparse requires a file.
# 0 buffer files have to be binary.
# Use the correct fileobject type based on the Python
# version to work around a broken implementation in
# Python 2.x.
if sys.version_info < (3, 0):
self.filesocket = FileSocket(self.socket)
else:
self.filesocket = self.socket.makefile('rb', 0)
self.state.set('connected', True)
def start_tls(self):
"""
Perform handshakes for TLS.
If the handshake is successful, the XML stream will need
to be restarted.
"""
if self.ssl_support:
logging.info("Negotiating TLS")
self.socket = ssl.wrap_socket(self.socket,
ssl_version=ssl.PROTOCOL_TLSv1,
do_handshake_on_connect=False)
self.socket.do_handshake()
self.set_socket(self.socket)
return True
else:
logging.warning("Tried to enable TLS, but ssl module not found.")
return False
def start_stream_handler(self, xml):
"""Meant to be overridden"""
pass
def register_stanza(self, stanza_class):
"""
Add a stanza object class as a known root stanza. A root stanza is
one that appears as a direct child of the stream's root element.
Stanzas that appear as substanzas of a root stanza do not need to
be registered here. That is done using registerStanzaPlugin() from
sleekxmpp.xmlstream.stanzabase.
Stanzas that are not registered will not be converted into
stanza objects, but may still be processed using handlers and
matchers.
Arguments:
stanza_class -- The top-level stanza object's class.
"""
self.__root_stanza.append(stanza_class)
def remove_stanza(self, stanza_class):
"""
Remove a stanza from being a known root stanza. A root stanza is
one that appears as a direct child of the stream's root element.
Stanzas that are not registered will not be converted into
stanza objects, but may still be processed using handlers and
matchers.
"""
del self.__root_stanza[stanza_class]
def register_handler(self, handler, before=None, after=None):
"""
Add a stream event handler that will be executed when a matching
stanza is received.
Arguments:
handler -- The handler object to execute.
"""
if handler.stream is None:
self.__handlers.append(handler)
handler.stream = self
def remove_handler(self, name):
"""
Remove any stream event handlers with the given name.
Arguments:
name -- The name of the handler.
"""
idx = 0
for handler in self.__handlers:
if handler.name == name:
self.__handlers.pop(idx)
return True
idx += 1
return False
def schedule(self, name, seconds, callback, args=None,
kwargs=None, repeat=False):
"""
Schedule a callback function to execute after a given delay.
Arguments:
name -- A unique name for the scheduled callback.
seconds -- The time in seconds to wait before executing.
callback -- A pointer to the function to execute.
args -- A tuple of arguments to pass to the function.
kwargs -- A dictionary of keyword arguments to pass to
the function.
repeat -- Flag indicating if the scheduled event should
be reset and repeat after executing.
"""
self.scheduler.add(name, seconds, callback, args, kwargs,
repeat, qpointer=self.event_queue)
def incoming_filter(self, xml):
"""
Filter incoming XML objects before they are processed.
Possible uses include remapping namespaces, or correcting elements
from sources with incorrect behavior.
Meant to be overridden.
"""
return xml
def send_raw(self, data):
"""
Send raw data across the stream.
Arguments:
data -- Any string value.
"""
self.send_queue.put(data)
return True
def process(self, threaded=True):
"""
Initialize the XML streams and begin processing events.
The number of threads used for processing stream events is determined
by HANDLER_THREADS.
Arguments:
threaded -- If threaded=True then event dispatcher will run
in a separate thread, allowing for the stream to be used
in the background for another application. Defaults
to True.
Event handlers and the send queue will be threaded
regardless of this parameter's value.
"""
self.scheduler.process(threaded=True)
def start_thread(name, target):
self.__thread[name] = threading.Thread(name=name, target=target)
self.__thread[name].start()
for t in range(0, HANDLER_THREADS):
logging.debug("Starting HANDLER THREAD")
start_thread('stream_event_handler_%s' % t, self._event_runner)
start_thread('send_thread', self._send_thread)
if threaded:
# Run the XML stream in the background for another application.
start_thread('process', self._process)
else:
self._process()
def _process(self):
"""
Start processing the XML streams.
Processing will continue after any recoverable errors
if reconnections are allowed.
"""
firstrun = True
# The body of this loop will only execute once per connection.
# Additional passes will be made only if an error occurs and
# reconnecting is permitted.
while self.run and (firstrun or self.state['reconnect']):
self.state.set('processing', True)
firstrun = False
try:
if self.state['is client']:
self.send_raw(self.stream_header)
# The call to self.__read_xml will block and prevent
# the body of the loop from running until a diconnect
# occurs. After any reconnection, the stream header will
# be resent and processing will resume.
while self.run and self.__read_xml():
# Ensure the stream header is sent for any
# new connections.
if self.state['is client']:
self.send_raw(self.stream_header)
except KeyboardInterrupt:
logging.debug("Keyboard Escape Detected")
self.state.set('processing', False)
self.state.set('reconnect', False)
self.disconnect()
self.run = False
self.scheduler.run = False
self.event_queue.put(('quit', None, None))
return
except SystemExit:
self.event_queue.put(('quit', None, None))
return
except socket.error:
if not self.state.reconnect:
return
self.state.set('processing', False)
logging.exception('Socket Error')
self.disconnect(reconnect=True)
except:
if not self.state.reconnect:
return
self.state.set('processing', False)
logging.exception('Connection error. Reconnecting.')
self.disconnect(reconnect=True)
if self.state['reconnect']:
self.reconnect()
self.state.set('processing', False)
self.event_queue.put(('quit', None, None))
def __read_xml(self):
"""
Parse the incoming XML stream, raising stream events for
each received stanza.
"""
depth = 0
root = None
for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')):
if event == b'start':
if depth == 0:
# We have received the start of the root element.
root = xml
# Perform any stream initialization actions, such
# as handshakes.
self.start_stream_handler(root)
depth += 1
if event == b'end':
depth -= 1
if depth == 0:
# The stream's root element has closed,
# terminating the stream.
self.disconnect(reconnect=self.state['reconnect'])
logging.debug("Ending read XML loop")
return False
elif depth == 1:
# We only raise events for stanzas that are direct
# children of the root element.
try:
self.__spawn_event(xml)
except RestartStream:
return True
if root:
# Keep the root element empty of children to
# save on memory use.
root.clear()
logging.debug("Ending read XML loop")
def __spawn_event(self, xml):
"""
Analyze incoming XML stanzas and convert them into stanza
objects if applicable and queue stream events to be processed
by matching handlers.
Arguments:
xml -- The XML stanza to analyze.
"""
logging.debug("RECV: %s" % tostring(xml,
xmlns=self.default_ns,
stream=self))
# Apply any preprocessing filters.
xml = self.incoming_filter(xml)
# Convert the raw XML object into a stanza object. If no registered
# stanza type applies, a generic StanzaBase stanza will be used.
stanza_type = StanzaBase
for stanza_class in self.__root_stanza:
if xml.tag == "{%s}%s" % (self.default_ns, stanza_class.name):
stanza_type = stanza_class
break
stanza = stanza_type(self, xml)
# Match the stanza against registered handlers. Handlers marked
# to run "in stream" will be executed immediately; the rest will
# be queued.
unhandled = True
for handler in self.__handlers:
if handler.match(stanza):
stanza_copy = stanza_type(self, copy.deepcopy(xml))
handler.prerun(stanza_copy)
self.event_queue.put(('stanza', handler, stanza_copy))
if handler.checkDelete():
self.__handlers.pop(self.__handlers.index(handler))
unhandled = False
# Some stanzas require responses, such as Iq queries. A default
# handler will be executed immediately for this case.
if unhandled:
stanza.unhandled()
def _event_runner(self):
"""
Process the event queue and execute handlers.
The number of event runner threads is controlled by HANDLER_THREADS.
Stream event handlers will all execute in this thread. Custom event
handlers may be spawned in individual threads.
"""
logging.debug("Loading event runner")
while self.run:
try:
event = self.event_queue.get(True, timeout=5)
except queue.Empty:
event = None
except KeyboardInterrupt:
self.run = False
self.scheduler.run = False
if event is None:
continue
etype, handler = event[0:2]
args = event[2:]
if etype == 'stanza':
try:
handler.run(args[0])
except Exception as e:
logging.exception('Error processing event handler: %s' % handler.name)
args[0].exception(e)
elif etype == 'schedule':
try:
logging.debug(args)
handler(*args[0])
except:
logging.exception('Error processing scheduled task')
elif etype == 'quit':
logging.debug("Quitting event runner thread")
return False
def _send_thread(self):
"""
Extract stanzas from the send queue and send them on the stream.
"""
while self.run:
data = self.send_queue.get(True)
logging.debug("SEND: %s" % data)
try:
self.socket.send(data.encode('utf-8'))
except:
logging.warning("Failed to send %s" % data)
self.state.set('connected', False)
if self.state.reconnect:
logging.exception("Disconnected. Socket Error.")
self.disconnect(reconnect=True)