diff --git a/sleekxmpp/xmlstream/__init__.py b/sleekxmpp/xmlstream/__init__.py
index c82ab34..8533ca2 100644
--- a/sleekxmpp/xmlstream/__init__.py
+++ b/sleekxmpp/xmlstream/__init__.py
@@ -7,5 +7,8 @@
"""
from sleekxmpp.xmlstream.jid import JID
-from sleekxmpp.xmlstream.stanzabase import StanzaBase, ElementBase
+from sleekxmpp.xmlstream.scheduler import Scheduler
+from sleekxmpp.xmlstream.stanzabase import StanzaBase, ElementBase, ET
+from sleekxmpp.xmlstream.statemachine import StateMachine
+from sleekxmpp.xmlstream.tostring import tostring
from sleekxmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 28aee2b..97d977e 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -7,392 +7,631 @@
"""
from __future__ import with_statement, unicode_literals
-try:
- import queue
-except ImportError:
- import Queue as queue
-from . import statemachine
-from . stanzabase import StanzaBase
-from xml.etree import cElementTree
-from xml.parsers import expat
+
+import copy
import logging
import socket
+import ssl
+import sys
import threading
import time
import types
-import copy
-import xml.sax.saxutils
-from . import scheduler
-from sleekxmpp.xmlstream.tostring import tostring
+try:
+ import queue
+except ImportError:
+ import Queue as queue
+from sleekxmpp.xmlstream import StateMachine, Scheduler, tostring
+from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET
+
+# In Python 2.x, file socket objects are broken. A patched socket
+# wrapper is provided for this case in filesocket.py.
+if sys.version_info < (3, 0):
+ from sleekxmpp.xmlstream.filesocket import FileSocket, Socket26
+
+
+# The time in seconds to wait before timing out waiting for response stanzas.
RESPONSE_TIMEOUT = 10
+
+# The number of threads to use to handle XML stream events. This is not the
+# same as the number of custom event handling threads. HANDLER_THREADS must
+# be at least 1.
HANDLER_THREADS = 1
-ssl_support = True
-#try:
-import ssl
-#except ImportError:
-# ssl_support = False
-import sys
-if sys.version_info < (3, 0):
- #monkey patch broken filesocket object
- from . import filesocket
- #socket._fileobject = filesocket.filesocket
+# Flag indicating if the SSL library is available for use.
+SSL_SUPPORT = True
class RestartStream(Exception):
- pass
+ """
+ Exception to restart stream processing, including
+ resending the stream header.
+ """
-class CloseStream(Exception):
- pass
-
-stanza_extensions = {}
class XMLStream(object):
- "A connection manager with XML events."
+ """
+ An XML stream connection manager and event dispatcher.
- def __init__(self, socket=None, host='', port=0, escape_quotes=False):
- global ssl_support
- self.ssl_support = ssl_support
- self.escape_quotes = escape_quotes
- self.state = statemachine.StateMachine()
- self.state.addStates({'connected':False, 'is client':False, 'ssl':False, 'tls':False, 'reconnect':True, 'processing':False, 'disconnecting':False}) #set initial states
+ The XMLStream class abstracts away the issues of establishing a
+ connection with a server and sending and receiving XML "stanzas".
+ A stanza is a complete XML element that is a direct child of a root
+ document element. Two streams are used, one for each communication
+ direction, over the same socket. Once the connection is closed, both
+ streams should be complete and valid XML documents.
- self.setSocket(socket)
- self.address = (host, int(port))
-
- self.__thread = {}
-
- self.__root_stanza = []
- self.__stanza = {}
- self.__stanza_extension = {}
- self.__handlers = []
-
- self.__tls_socket = None
- self.filesocket = None
- self.use_ssl = False
- self.use_tls = False
-
- self.default_ns = ''
- self.stream_header = ""
- self.stream_footer = ""
-
- self.eventqueue = queue.Queue()
- self.sendqueue = queue.Queue()
- self.scheduler = scheduler.Scheduler(self.eventqueue)
-
- self.namespace_map = {}
-
- self.run = True
-
- def setSocket(self, socket):
- "Set the socket"
- self.socket = socket
- if socket is not None:
- self.filesocket = socket.makefile('rb', 0) # ElementTree.iterparse requires a file. 0 buffer files have to be binary
- self.state.set('connected', True)
+ Three types of events are provided to manage the stream:
+ Stream -- Triggered based on received stanzas, similar in concept
+ to events in a SAX XML parser.
+ Custom -- Triggered manually.
+ Scheduled -- Triggered based on time delays.
+
+ Typically, stanzas are first processed by a stream event handler which
+ will then trigger custom events to continue further processing,
+ especially since custom event handlers may run in individual threads.
- def setFileSocket(self, filesocket):
- self.filesocket = filesocket
+ Attributes:
+ address -- The hostname and port of the server.
+ default_ns -- The default XML namespace that will be applied
+ to all non-namespaced stanzas.
+ event_queue -- A queue of stream, custom, and scheduled
+ events to be processed.
+ filesocket -- A filesocket created from the main connection socket.
+ Required for ElementTree.iterparse.
+ namespace_map -- Optional mapping of namespaces to namespace prefixes.
+ scheduler -- A scheduler object for triggering events
+ after a given period of time.
+ send_queue -- A queue of stanzas to be sent on the stream.
+ socket -- The connection to the server.
+ ssl_support -- Indicates if a SSL library is available for use.
+ state -- A state machine for managing the stream's
+ connection state.
+ stream_footer -- The start tag and any attributes for the stream's
+ root element.
+ stream_header -- The closing tag of the stream's root element.
+ use_ssl -- Flag indicating if SSL should be used.
+ use_tls -- Flag indicating if TLS should be used.
- def connect(self, host='', port=0, use_ssl=False, use_tls=True):
- "Link to connectTCP"
- return self.connectTCP(host, port, use_ssl, use_tls)
+ Methods:
+ add_event_handler -- Add a handler for a custom event.
+ add_handler -- Shortcut method for registerHandler.
+ connect -- Connect to the given server.
+ del_event_handler -- Remove a handler for a custom event.
+ disconnect -- Disconnect from the server and terminate
+ processing.
+ event -- Trigger a custom event.
+ incoming_filter -- Optionally filter stanzas before processing.
+ process -- Read XML stanzas from the stream and apply
+ matching stream handlers.
+ reconnect -- Reestablish a connection to the server.
+ register_handler -- Add a handler for a stream event.
+ register_stanza -- Add a new stanza object type that may appear
+ as a direct child of the stream's root.
+ remove_handler -- Remove a stream handler.
+ remove_stanza -- Remove a stanza object type.
+ schedule -- Schedule an event handler to execute after a
+ given delay.
+ send -- Send a stanza object on the stream.
+ send_raw -- Send a raw string on the stream.
+ send_xml -- Send an XML string on the stream.
+ set_socket -- Set the stream's socket and generate a new
+ filesocket.
+ start_stream_handler -- Meant to be overridden.
+ start_tls -- Establish a TLS connection and restart
+ the stream.
+ """
- def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True):
- "Connect and create socket"
- while reattempt and not self.state['connected']:
- if host and port:
- self.address = (host, int(port))
- if use_ssl is not None:
- self.use_ssl = use_ssl
- if use_tls is not None:
- self.use_tls = use_tls
- self.state.set('is client', True)
- if sys.version_info < (3, 0):
- self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM)
- else:
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.settimeout(None)
- if self.use_ssl and self.ssl_support:
- logging.debug("Socket Wrapped for SSL")
- self.socket = ssl.wrap_socket(self.socket)
- try:
- self.socket.connect(self.address)
- #self.filesocket = self.socket.makefile('rb', 0)
- self.filesocket = self.socket.makefile('rb', 0)
- self.state.set('connected', True)
- return True
- except socket.error as serr:
- logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror))
- time.sleep(1)
+ def __init__(self, socket=None, host='', port=0):
+ """
+ Establish a new XML stream.
- def connectUnix(self, filepath):
- "Connect to Unix file and create socket"
+ Arguments:
+ socket -- Use an existing socket for the stream.
+ Defaults to None to generate a new socket.
+ host -- The name of the target server.
+ Defaults to the empty string.
+ port -- The port to use for the connection.
+ Defaults to 0.
+ """
+ # To comply with PEP8, method names now use underscores.
+ # Deprecated method names are re-mapped for backwards compatibility.
+ self.startTLS = self.start_tls
+ self.registerStanza = self.register_stanza
+ self.removeStanza = self.remove_stanza
+ self.registerHandler = self.register_handler
+ self.removeHandler = self.remove_handler
+ self.setSocket = self.set_socket
+ self.sendRaw = self.send_raw
- def startTLS(self):
- "Handshakes for TLS"
- if self.ssl_support:
- logging.info("Negotiating TLS")
- self.realsocket = self.socket
- self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False)
- self.socket.do_handshake()
- if sys.version_info < (3,0):
- self.filesocket = filesocket.FileSocket(self.socket)
- else:
- self.filesocket = self.socket.makefile('rb', 0)
- return True
- else:
- logging.warning("Tried to enable TLS, but ssl module not found.")
- return False
- raise RestartStream()
+ self.ssl_support = SSL_SUPPORT
- def process(self, threaded=True):
- self.scheduler.process(threaded=True)
- for t in range(0, HANDLER_THREADS):
- logging.debug("Starting HANDLER THREAD")
- self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
- self.__thread['eventhandle%s' % t].start()
- self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread)
- self.__thread['sendthread'].start()
- if threaded:
- self.__thread['process'] = threading.Thread(name='process', target=self._process)
- self.__thread['process'].start()
- else:
- self._process()
+ # TODO: Integrate the new state machine.
+ self.state = StateMachine()
+ self.state.addStates({'connected': False,
+ 'is client': False,
+ 'ssl': False,
+ 'tls': False,
+ 'reconnect': True,
+ 'processing': False,
+ 'disconnecting': False})
- def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False):
- self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue)
+ self.address = (host, int(port))
+ self.filesocket = None
+ self.set_socket(socket)
- def _process(self):
- "Start processing the socket."
- firstrun = True
- while self.run and (firstrun or self.state['reconnect']):
- self.state.set('processing', True)
- firstrun = False
- try:
- if self.state['is client']:
- self.sendRaw(self.stream_header)
- while self.run and self.__readXML():
- if self.state['is client']:
- self.sendRaw(self.stream_header)
- except KeyboardInterrupt:
- logging.debug("Keyboard Escape Detected")
- self.state.set('processing', False)
- self.state.set('reconnect', False)
- self.disconnect()
- self.run = False
- self.scheduler.run = False
- self.eventqueue.put(('quit', None, None))
- return
- except CloseStream:
- return
- except SystemExit:
- self.eventqueue.put(('quit', None, None))
- return
- except socket.error:
- if not self.state.reconnect:
- return
- else:
- self.state.set('processing', False)
- logging.exception('Socket Error')
- self.disconnect(reconnect=True)
- except:
- if not self.state.reconnect:
- return
- else:
- self.state.set('processing', False)
- logging.exception('Connection error. Reconnecting.')
- self.disconnect(reconnect=True)
- if self.state['reconnect']:
- self.reconnect()
- self.state.set('processing', False)
- self.eventqueue.put(('quit', None, None))
- #self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML)
- #self.__thread['readXML'].start()
- #self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents)
- #self.__thread['spawnEvents'].start()
+ self.use_ssl = False
+ self.use_tls = False
- def __readXML(self):
- "Parses the incoming stream, adding to xmlin queue as it goes"
- #build cElementTree object from expat was we go
- #self.filesocket = self.socket.makefile('rb', 0)
- #print self.filesocket.read(1024) #self.filesocket._sock.recv(1024)
- edepth = 0
- root = None
- for (event, xmlobj) in cElementTree.iterparse(self.filesocket, (b'end', b'start')):
- if edepth == 0: # and xmlobj.tag.split('}', 1)[-1] == self.basetag:
- if event == b'start':
- root = xmlobj
- self.start_stream_handler(root)
- if event == b'end':
- edepth += -1
- if edepth == 0 and event == b'end':
- self.disconnect(reconnect=self.state['reconnect'])
- logging.debug("Ending readXML loop")
- return False
- elif edepth == 1:
- #self.xmlin.put(xmlobj)
- try:
- self.__spawnEvent(xmlobj)
- except RestartStream:
- return True
- except CloseStream:
- logging.debug("Ending readXML loop")
- return False
- if root:
- root.clear()
- if event == b'start':
- edepth += 1
- logging.debug("Ending readXML loop")
+ self.default_ns = ''
+ self.stream_header = ""
+ self.stream_footer = ""
- def _sendThread(self):
- while self.run:
- data = self.sendqueue.get(True)
- logging.debug("SEND: %s" % data)
- try:
- self.socket.send(data.encode('utf-8'))
- #self.socket.send(bytes(data, "utf-8"))
- #except socket.error,(errno, strerror):
- except:
- logging.warning("Failed to send %s" % data)
- self.state.set('connected', False)
- if self.state.reconnect:
- logging.exception("Disconnected. Socket Error.")
- self.disconnect(reconnect=True)
+ self.event_queue = queue.Queue()
+ self.send_queue = queue.Queue()
+ self.scheduler = Scheduler(self.event_queue)
+
+ self.namespace_map = {}
- def sendRaw(self, data):
- self.sendqueue.put(data)
- return True
+ self.__thread = {}
+ self.__root_stanza = []
+ self.__handlers = []
- def disconnect(self, reconnect=False):
- self.state.set('reconnect', reconnect)
- if self.state['disconnecting']:
- return
- if not self.state['reconnect']:
- logging.debug("Disconnecting...")
- self.state.set('disconnecting', True)
- self.run = False
- self.scheduler.run = False
- if self.state['connected']:
- self.sendRaw(self.stream_footer)
- time.sleep(1)
- #send end of stream
- #wait for end of stream back
- try:
- self.socket.close()
- self.filesocket.close()
- self.socket.shutdown(socket.SHUT_RDWR)
- except socket.error as serr:
- #logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror))
- #thread.exit_thread()
- pass
- if self.state['processing']:
- #raise CloseStream
- pass
+ self.run = True
- def reconnect(self):
- self.state.set('tls',False)
- self.state.set('ssl',False)
- time.sleep(1)
- self.connect()
+ def connect(self, host='', port=0, use_ssl=False,
+ use_tls=True, reattempt=True):
+ """
+ Create a new socket and connect to the server.
- def incoming_filter(self, xmlobj):
- return xmlobj
+ Setting reattempt to True will cause connection attempts to be made
+ every second until a successful connection is established.
- def __spawnEvent(self, xmlobj):
- "watching xmlOut and processes handlers"
- #convert XML into Stanza
- logging.debug("RECV: %s" % tostring(xmlobj, xmlns=self.default_ns, stream=self))
- xmlobj = self.incoming_filter(xmlobj)
- stanza_type = StanzaBase
- for stanza_class in self.__root_stanza:
- if xmlobj.tag == "{%s}%s" % (self.default_ns, stanza_class.name):
- stanza_type = stanza_class
- break
- unhandled = True
- stanza = stanza_type(self, xmlobj)
- for handler in self.__handlers:
- if handler.match(stanza):
- stanza_copy = stanza_type(self, copy.deepcopy(xmlobj))
- handler.prerun(stanza_copy)
- self.eventqueue.put(('stanza', handler, stanza_copy))
- if handler.checkDelete(): self.__handlers.pop(self.__handlers.index(handler))
- unhandled = False
- if unhandled:
- stanza.unhandled()
- #loop through handlers and test match
- #spawn threads as necessary, call handlers, sending Stanza
+ Arguments:
+ host -- The name of the desired server for the connection.
+ port -- Port to connect to on the server.
+ use_ssl -- Flag indicating if SSL should be used.
+ use_tls -- Flag indicating if TLS should be used.
+ reattempt -- Flag indicating if the socket should reconnect
+ after disconnections.
+ """
+ if host and port:
+ self.address = (host, int(port))
- def _eventRunner(self):
- logging.debug("Loading event runner")
- while self.run:
- try:
- event = self.eventqueue.get(True, timeout=5)
- except queue.Empty:
- event = None
- except KeyboardInterrupt:
- self.run = False
- self.scheduler.run = False
- if event is not None:
- etype = event[0]
- handler = event[1]
- args = event[2:]
- #etype, handler, *args = event #python 3.x way
- if etype == 'stanza':
- try:
- handler.run(args[0])
- except Exception as e:
- logging.exception('Error processing event handler: %s' % handler.name)
- args[0].exception(e)
- elif etype == 'schedule':
- try:
- logging.debug(args)
- handler(*args[0])
- except:
- logging.exception('Error processing scheduled task')
- elif etype == 'quit':
- logging.debug("Quitting eventRunner thread")
- return False
+ # Respect previous SSL and TLS usage directives.
+ if use_ssl is not None:
+ self.use_ssl = use_ssl
+ if use_tls is not None:
+ self.use_tls = use_tls
- def registerHandler(self, handler, before=None, after=None):
- "Add handler with matcher class and parameters."
- if handler.stream is None:
- self.__handlers.append(handler)
- handler.stream = self
+ self.state.set('is client', True)
- def removeHandler(self, name):
- "Removes the handler."
- idx = 0
- for handler in self.__handlers:
- if handler.name == name:
- self.__handlers.pop(idx)
- return True
- idx += 1
- return False
+ # Repeatedly attempt to connect until a successful connection
+ # is established.
+ while reattempt and not self.state['connected']:
+ if sys.version_info < (3, 0):
+ self.socket = Socket26(socket.AF_INET, socket.SOCK_STREAM)
+ else:
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.settimeout(None)
+ if self.use_ssl and self.ssl_support:
+ logging.debug("Socket Wrapped for SSL")
+ self.socket = ssl.wrap_socket(self.socket)
+ try:
+ self.socket.connect(self.address)
+ self.set_socket(self.socket)
+ self.state.set('connected', True)
+ return True
+ except socket.error as serr:
+ error_msg = "Could not connect. Socket Error #%s: %s"
+ logging.error(error_msg % (serr.errno, serr.strerror))
+ time.sleep(1)
- def registerStanza(self, stanza_class):
- "Adds stanza. If root stanzas build stanzas sent in events while non-root stanzas build substanza objects."
- self.__root_stanza.append(stanza_class)
+ def disconnect(self, reconnect=False):
+ """
+ Terminate processing and close the XML streams.
- def registerStanzaExtension(self, stanza_class, stanza_extension):
- if stanza_class not in stanza_extensions:
- stanza_extensions[stanza_class] = [stanza_extension]
- else:
- stanza_extensions[stanza_class].append(stanza_extension)
+ Optionally, the connection may be reconnected and
+ resume processing afterwards.
- def removeStanza(self, stanza_class, root=False):
- "Removes the stanza's registration."
- if root:
- del self.__root_stanza[stanza_class]
- else:
- del self.__stanza[stanza_class]
+ Arguments:
+ reconnect -- Flag indicating if the connection
+ and processing should be restarted.
+ Defaults to False.
+ """
+ self.state.set('reconnect', reconnect)
+ if self.state['disconnecting']:
+ return
+ if not self.state['reconnect']:
+ logging.debug("Disconnecting...")
+ self.state.set('disconnecting', True)
+ self.run = False
+ self.scheduler.run = False
+ if self.state['connected']:
+ # Send the end of stream marker.
+ self.send_raw(self.stream_footer)
+ # Wait for confirmation that the stream was
+ # closed in the other direction.
+ time.sleep(1)
+ try:
+ self.socket.close()
+ self.filesocket.close()
+ self.socket.shutdown(socket.SHUT_RDWR)
+ except socket.error as serr:
+ pass
- def removeStanzaExtension(self, stanza_class, stanza_extension):
- stanza_extension[stanza_class].pop(stanza_extension)
+ def reconnect(self):
+ """
+ Reset the stream's state and reconnect to the server.
+ """
+ self.state.set('tls',False)
+ self.state.set('ssl',False)
+ time.sleep(1)
+ self.connect()
- def start_stream_handler(self, xml):
- """Meant to be overridden"""
- pass
+ def set_socket(self, socket):
+ """
+ Set the socket to use for the stream.
+
+ The filesocket will be recreated as well.
+
+ Arguments:
+ socket -- The new socket to use.
+ """
+ self.socket = socket
+ if socket is not None:
+ # ElementTree.iterparse requires a file.
+ # 0 buffer files have to be binary.
+
+ # Use the correct fileobject type based on the Python
+ # version to work around a broken implementation in
+ # Python 2.x.
+ if sys.version_info < (3, 0):
+ self.filesocket = FileSocket(self.socket)
+ else:
+ self.filesocket = self.socket.makefile('rb', 0)
+ self.state.set('connected', True)
+
+ def start_tls(self):
+ """
+ Perform handshakes for TLS.
+
+ If the handshake is successful, the XML stream will need
+ to be restarted.
+ """
+ if self.ssl_support:
+ logging.info("Negotiating TLS")
+ self.socket = ssl.wrap_socket(self.socket,
+ ssl_version=ssl.PROTOCOL_TLSv1,
+ do_handshake_on_connect=False)
+ self.socket.do_handshake()
+ self.set_socket(self.socket)
+ return True
+ else:
+ logging.warning("Tried to enable TLS, but ssl module not found.")
+ return False
+
+ def start_stream_handler(self, xml):
+ """Meant to be overridden"""
+ pass
+
+ def register_stanza(self, stanza_class):
+ """
+ Add a stanza object class as a known root stanza. A root stanza is
+ one that appears as a direct child of the stream's root element.
+
+ Stanzas that appear as substanzas of a root stanza do not need to
+ be registered here. That is done using registerStanzaPlugin() from
+ sleekxmpp.xmlstream.stanzabase.
+
+ Stanzas that are not registered will not be converted into
+ stanza objects, but may still be processed using handlers and
+ matchers.
+
+ Arguments:
+ stanza_class -- The top-level stanza object's class.
+ """
+ self.__root_stanza.append(stanza_class)
+
+ def remove_stanza(self, stanza_class):
+ """
+ Remove a stanza from being a known root stanza. A root stanza is
+ one that appears as a direct child of the stream's root element.
+
+ Stanzas that are not registered will not be converted into
+ stanza objects, but may still be processed using handlers and
+ matchers.
+ """
+ del self.__root_stanza[stanza_class]
+
+ def register_handler(self, handler, before=None, after=None):
+ """
+ Add a stream event handler that will be executed when a matching
+ stanza is received.
+
+ Arguments:
+ handler -- The handler object to execute.
+ """
+ if handler.stream is None:
+ self.__handlers.append(handler)
+ handler.stream = self
+
+ def remove_handler(self, name):
+ """
+ Remove any stream event handlers with the given name.
+
+ Arguments:
+ name -- The name of the handler.
+ """
+ idx = 0
+ for handler in self.__handlers:
+ if handler.name == name:
+ self.__handlers.pop(idx)
+ return True
+ idx += 1
+ return False
+
+ def schedule(self, name, seconds, callback, args=None,
+ kwargs=None, repeat=False):
+ """
+ Schedule a callback function to execute after a given delay.
+
+ Arguments:
+ name -- A unique name for the scheduled callback.
+ seconds -- The time in seconds to wait before executing.
+ callback -- A pointer to the function to execute.
+ args -- A tuple of arguments to pass to the function.
+ kwargs -- A dictionary of keyword arguments to pass to
+ the function.
+ repeat -- Flag indicating if the scheduled event should
+ be reset and repeat after executing.
+ """
+ self.scheduler.add(name, seconds, callback, args, kwargs,
+ repeat, qpointer=self.event_queue)
+
+ def incoming_filter(self, xml):
+ """
+ Filter incoming XML objects before they are processed.
+
+ Possible uses include remapping namespaces, or correcting elements
+ from sources with incorrect behavior.
+
+ Meant to be overridden.
+ """
+ return xml
+
+ def send_raw(self, data):
+ """
+ Send raw data across the stream.
+
+ Arguments:
+ data -- Any string value.
+ """
+ self.send_queue.put(data)
+ return True
+
+ def process(self, threaded=True):
+ """
+ Initialize the XML streams and begin processing events.
+
+ The number of threads used for processing stream events is determined
+ by HANDLER_THREADS.
+
+ Arguments:
+ threaded -- If threaded=True then event dispatcher will run
+ in a separate thread, allowing for the stream to be used
+ in the background for another application. Defaults
+ to True.
+
+ Event handlers and the send queue will be threaded
+ regardless of this parameter's value.
+ """
+ self.scheduler.process(threaded=True)
+
+ def start_thread(name, target):
+ self.__thread[name] = threading.Thread(name=name, target=target)
+ self.__thread[name].start()
+
+ for t in range(0, HANDLER_THREADS):
+ logging.debug("Starting HANDLER THREAD")
+ start_thread('stream_event_handler_%s' % t, self._event_runner)
+
+ start_thread('send_thread', self._send_thread)
+
+ if threaded:
+ # Run the XML stream in the background for another application.
+ start_thread('process', self._process)
+ else:
+ self._process()
+
+ def _process(self):
+ """
+ Start processing the XML streams.
+
+ Processing will continue after any recoverable errors
+ if reconnections are allowed.
+ """
+ firstrun = True
+
+ # The body of this loop will only execute once per connection.
+ # Additional passes will be made only if an error occurs and
+ # reconnecting is permitted.
+ while self.run and (firstrun or self.state['reconnect']):
+ self.state.set('processing', True)
+ firstrun = False
+ try:
+ if self.state['is client']:
+ self.send_raw(self.stream_header)
+ # The call to self.__read_xml will block and prevent
+ # the body of the loop from running until a diconnect
+ # occurs. After any reconnection, the stream header will
+ # be resent and processing will resume.
+ while self.run and self.__read_xml():
+ # Ensure the stream header is sent for any
+ # new connections.
+ if self.state['is client']:
+ self.send_raw(self.stream_header)
+ except KeyboardInterrupt:
+ logging.debug("Keyboard Escape Detected")
+ self.state.set('processing', False)
+ self.state.set('reconnect', False)
+ self.disconnect()
+ self.run = False
+ self.scheduler.run = False
+ self.event_queue.put(('quit', None, None))
+ return
+ except SystemExit:
+ self.event_queue.put(('quit', None, None))
+ return
+ except socket.error:
+ if not self.state.reconnect:
+ return
+ self.state.set('processing', False)
+ logging.exception('Socket Error')
+ self.disconnect(reconnect=True)
+ except:
+ if not self.state.reconnect:
+ return
+ self.state.set('processing', False)
+ logging.exception('Connection error. Reconnecting.')
+ self.disconnect(reconnect=True)
+ if self.state['reconnect']:
+ self.reconnect()
+ self.state.set('processing', False)
+ self.event_queue.put(('quit', None, None))
+
+ def __read_xml(self):
+ """
+ Parse the incoming XML stream, raising stream events for
+ each received stanza.
+ """
+ depth = 0
+ root = None
+ for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')):
+ if event == b'start':
+ if depth == 0:
+ # We have received the start of the root element.
+ root = xml
+ # Perform any stream initialization actions, such
+ # as handshakes.
+ self.start_stream_handler(root)
+ depth += 1
+ if event == b'end':
+ depth -= 1
+ if depth == 0:
+ # The stream's root element has closed,
+ # terminating the stream.
+ self.disconnect(reconnect=self.state['reconnect'])
+ logging.debug("Ending read XML loop")
+ return False
+ elif depth == 1:
+ # We only raise events for stanzas that are direct
+ # children of the root element.
+ try:
+ self.__spawn_event(xml)
+ except RestartStream:
+ return True
+ if root:
+ # Keep the root element empty of children to
+ # save on memory use.
+ root.clear()
+ logging.debug("Ending read XML loop")
+
+ def __spawn_event(self, xml):
+ """
+ Analyze incoming XML stanzas and convert them into stanza
+ objects if applicable and queue stream events to be processed
+ by matching handlers.
+
+ Arguments:
+ xml -- The XML stanza to analyze.
+ """
+ logging.debug("RECV: %s" % tostring(xml,
+ xmlns=self.default_ns,
+ stream=self))
+ # Apply any preprocessing filters.
+ xml = self.incoming_filter(xml)
+
+ # Convert the raw XML object into a stanza object. If no registered
+ # stanza type applies, a generic StanzaBase stanza will be used.
+ stanza_type = StanzaBase
+ for stanza_class in self.__root_stanza:
+ if xml.tag == "{%s}%s" % (self.default_ns, stanza_class.name):
+ stanza_type = stanza_class
+ break
+ stanza = stanza_type(self, xml)
+
+ # Match the stanza against registered handlers. Handlers marked
+ # to run "in stream" will be executed immediately; the rest will
+ # be queued.
+ unhandled = True
+ for handler in self.__handlers:
+ if handler.match(stanza):
+ stanza_copy = stanza_type(self, copy.deepcopy(xml))
+ handler.prerun(stanza_copy)
+ self.event_queue.put(('stanza', handler, stanza_copy))
+ if handler.checkDelete():
+ self.__handlers.pop(self.__handlers.index(handler))
+ unhandled = False
+
+ # Some stanzas require responses, such as Iq queries. A default
+ # handler will be executed immediately for this case.
+ if unhandled:
+ stanza.unhandled()
+
+ def _event_runner(self):
+ """
+ Process the event queue and execute handlers.
+
+ The number of event runner threads is controlled by HANDLER_THREADS.
+
+ Stream event handlers will all execute in this thread. Custom event
+ handlers may be spawned in individual threads.
+ """
+ logging.debug("Loading event runner")
+ while self.run:
+ try:
+ event = self.event_queue.get(True, timeout=5)
+ except queue.Empty:
+ event = None
+ except KeyboardInterrupt:
+ self.run = False
+ self.scheduler.run = False
+ if event is None:
+ continue
+
+ etype, handler = event[0:2]
+ args = event[2:]
+
+ if etype == 'stanza':
+ try:
+ handler.run(args[0])
+ except Exception as e:
+ logging.exception('Error processing event handler: %s' % handler.name)
+ args[0].exception(e)
+ elif etype == 'schedule':
+ try:
+ logging.debug(args)
+ handler(*args[0])
+ except:
+ logging.exception('Error processing scheduled task')
+ elif etype == 'quit':
+ logging.debug("Quitting event runner thread")
+ return False
+
+ def _send_thread(self):
+ """
+ Extract stanzas from the send queue and send them on the stream.
+ """
+ while self.run:
+ data = self.send_queue.get(True)
+ logging.debug("SEND: %s" % data)
+ try:
+ self.socket.send(data.encode('utf-8'))
+ except:
+ logging.warning("Failed to send %s" % data)
+ self.state.set('connected', False)
+ if self.state.reconnect:
+ logging.exception("Disconnected. Socket Error.")
+ self.disconnect(reconnect=True)