2010-03-26 21:32:16 +00:00
"""
SleekXMPP : The Sleek XMPP Library
Copyright ( C ) 2010 Nathanael C . Fritz
This file is part of SleekXMPP .
2010-07-20 15:19:49 +00:00
See the file LICENSE for copying permission .
2010-03-26 21:32:16 +00:00
"""
2010-01-08 06:03:02 +00:00
from __future__ import with_statement , unicode_literals
try :
import queue
except ImportError :
import Queue as queue
2009-06-03 22:56:51 +00:00
from . import statemachine
from . stanzabase import StanzaBase
from xml . etree import cElementTree
from xml . parsers import expat
import logging
import socket
2009-08-31 22:46:31 +00:00
import threading
2009-06-03 22:56:51 +00:00
import time
import traceback
import types
2010-06-04 02:42:11 +00:00
import copy
2009-06-03 22:56:51 +00:00
import xml . sax . saxutils
2010-05-27 01:32:28 +00:00
from . import scheduler
2009-06-03 22:56:51 +00:00
2009-09-01 00:06:46 +00:00
HANDLER_THREADS = 1
2009-06-03 22:56:51 +00:00
ssl_support = True
2010-01-08 06:03:02 +00:00
#try:
import ssl
#except ImportError:
# ssl_support = False
import sys
if sys . version_info < ( 3 , 0 ) :
#monkey patch broken filesocket object
from . import filesocket
2010-01-25 18:40:44 +00:00
#socket._fileobject = filesocket.filesocket
2009-06-03 22:56:51 +00:00
class RestartStream ( Exception ) :
pass
class CloseStream ( Exception ) :
pass
stanza_extensions = { }
class XMLStream ( object ) :
" A connection manager with XML events. "
def __init__ ( self , socket = None , host = ' ' , port = 0 , escape_quotes = False ) :
global ssl_support
self . ssl_support = ssl_support
self . escape_quotes = escape_quotes
self . state = statemachine . StateMachine ( )
2010-04-14 02:35:47 +00:00
self . state . addStates ( { ' connected ' : False , ' is client ' : False , ' ssl ' : False , ' tls ' : False , ' reconnect ' : True , ' processing ' : False , ' disconnecting ' : False } ) #set initial states
2009-06-03 22:56:51 +00:00
self . setSocket ( socket )
self . address = ( host , int ( port ) )
self . __thread = { }
2009-12-10 01:23:03 +00:00
self . __root_stanza = [ ]
2009-06-03 22:56:51 +00:00
self . __stanza = { }
self . __stanza_extension = { }
self . __handlers = [ ]
self . __tls_socket = None
2009-08-31 22:46:31 +00:00
self . filesocket = None
2009-06-03 22:56:51 +00:00
self . use_ssl = False
self . use_tls = False
self . stream_header = " <stream> "
self . stream_footer = " </stream> "
2009-08-31 22:46:31 +00:00
self . eventqueue = queue . Queue ( )
2010-02-15 10:13:44 +00:00
self . sendqueue = queue . Queue ( )
2010-05-29 02:19:28 +00:00
self . scheduler = scheduler . Scheduler ( self . eventqueue )
2009-08-31 22:46:31 +00:00
2009-06-03 22:56:51 +00:00
self . namespace_map = { }
2009-08-31 22:46:31 +00:00
self . run = True
2009-06-03 22:56:51 +00:00
def setSocket ( self , socket ) :
" Set the socket "
self . socket = socket
if socket is not None :
self . filesocket = socket . makefile ( ' rb ' , 0 ) # ElementTree.iterparse requires a file. 0 buffer files have to be binary
self . state . set ( ' connected ' , True )
def setFileSocket ( self , filesocket ) :
self . filesocket = filesocket
def connect ( self , host = ' ' , port = 0 , use_ssl = False , use_tls = True ) :
" Link to connectTCP "
return self . connectTCP ( host , port , use_ssl , use_tls )
def connectTCP ( self , host = ' ' , port = 0 , use_ssl = None , use_tls = None , reattempt = True ) :
" Connect and create socket "
while reattempt and not self . state [ ' connected ' ] :
if host and port :
self . address = ( host , int ( port ) )
if use_ssl is not None :
self . use_ssl = use_ssl
if use_tls is not None :
self . use_tls = use_tls
self . state . set ( ' is client ' , True )
2010-01-25 18:40:44 +00:00
if sys . version_info < ( 3 , 0 ) :
self . socket = filesocket . Socket26 ( socket . AF_INET , socket . SOCK_STREAM )
else :
self . socket = socket . socket ( socket . AF_INET , socket . SOCK_STREAM )
2010-01-08 06:03:02 +00:00
self . socket . settimeout ( None )
2009-06-03 22:56:51 +00:00
if self . use_ssl and self . ssl_support :
logging . debug ( " Socket Wrapped for SSL " )
self . socket = ssl . wrap_socket ( self . socket )
try :
self . socket . connect ( self . address )
2010-01-08 06:03:02 +00:00
#self.filesocket = self.socket.makefile('rb', 0)
2009-08-31 22:46:31 +00:00
self . filesocket = self . socket . makefile ( ' rb ' , 0 )
2009-06-03 22:56:51 +00:00
self . state . set ( ' connected ' , True )
return True
2009-08-31 22:46:31 +00:00
except socket . error as serr :
logging . error ( " Could not connect. Socket Error # %s : %s " % ( serr . errno , serr . strerror ) )
2009-06-03 22:56:51 +00:00
time . sleep ( 1 )
def connectUnix ( self , filepath ) :
" Connect to Unix file and create socket "
def startTLS ( self ) :
" Handshakes for TLS "
if self . ssl_support :
2009-08-31 22:46:31 +00:00
logging . info ( " Negotiating TLS " )
2009-06-03 22:56:51 +00:00
self . realsocket = self . socket
2009-08-31 22:46:31 +00:00
self . socket = ssl . wrap_socket ( self . socket , ssl_version = ssl . PROTOCOL_TLSv1 , do_handshake_on_connect = False )
self . socket . do_handshake ( )
2010-01-08 06:03:02 +00:00
if sys . version_info < ( 3 , 0 ) :
from . filesocket import filesocket
self . filesocket = filesocket ( self . socket )
else :
self . filesocket = self . socket . makefile ( ' rb ' , 0 )
2009-06-03 22:56:51 +00:00
return True
else :
2009-08-31 22:46:31 +00:00
logging . warning ( " Tried to enable TLS, but ssl module not found. " )
2009-06-03 22:56:51 +00:00
return False
raise RestartStream ( )
def process ( self , threaded = True ) :
2010-05-27 01:32:28 +00:00
self . scheduler . process ( threaded = True )
2009-09-01 00:06:46 +00:00
for t in range ( 0 , HANDLER_THREADS ) :
2010-05-29 02:19:28 +00:00
logging . debug ( " Starting HANDLER THREAD " )
2009-09-01 00:06:46 +00:00
self . __thread [ ' eventhandle %s ' % t ] = threading . Thread ( name = ' eventhandle %s ' % t , target = self . _eventRunner )
self . __thread [ ' eventhandle %s ' % t ] . start ( )
2010-02-15 10:13:44 +00:00
self . __thread [ ' sendthread ' ] = threading . Thread ( name = ' sendthread ' , target = self . _sendThread )
self . __thread [ ' sendthread ' ] . start ( )
2009-06-03 22:56:51 +00:00
if threaded :
2009-08-31 22:46:31 +00:00
self . __thread [ ' process ' ] = threading . Thread ( name = ' process ' , target = self . _process )
self . __thread [ ' process ' ] . start ( )
2009-06-03 22:56:51 +00:00
else :
self . _process ( )
2010-05-27 01:32:28 +00:00
def schedule ( self , name , seconds , callback , args = None , kwargs = None , repeat = False ) :
self . scheduler . add ( name , seconds , callback , args , kwargs , repeat , qpointer = self . eventqueue )
2010-02-15 10:13:44 +00:00
2009-06-03 22:56:51 +00:00
def _process ( self ) :
" Start processing the socket. "
firstrun = True
2010-02-27 02:02:08 +00:00
while self . run and ( firstrun or self . state [ ' reconnect ' ] ) :
2009-06-03 22:56:51 +00:00
self . state . set ( ' processing ' , True )
firstrun = False
try :
if self . state [ ' is client ' ] :
self . sendRaw ( self . stream_header )
2010-04-14 02:35:47 +00:00
while self . run and self . __readXML ( ) :
2009-06-03 22:56:51 +00:00
if self . state [ ' is client ' ] :
self . sendRaw ( self . stream_header )
except KeyboardInterrupt :
logging . debug ( " Keyboard Escape Detected " )
self . state . set ( ' processing ' , False )
2009-06-16 11:59:55 +00:00
self . state . set ( ' reconnect ' , False )
2009-06-03 22:56:51 +00:00
self . disconnect ( )
2009-08-31 22:46:31 +00:00
self . run = False
2010-05-27 11:58:57 +00:00
self . scheduler . run = False
2009-08-31 22:46:31 +00:00
self . eventqueue . put ( ( ' quit ' , None , None ) )
2009-06-25 06:49:58 +00:00
return
2009-06-16 11:59:55 +00:00
except CloseStream :
2009-06-25 06:49:58 +00:00
return
except SystemExit :
2009-08-31 22:46:31 +00:00
self . eventqueue . put ( ( ' quit ' , None , None ) )
2009-06-25 06:49:58 +00:00
return
2009-08-31 22:46:31 +00:00
except socket . error :
2009-06-25 06:49:58 +00:00
if not self . state . reconnect :
return
else :
self . state . set ( ' processing ' , False )
traceback . print_exc ( )
self . disconnect ( reconnect = True )
2009-06-03 22:56:51 +00:00
except :
2009-06-25 06:49:58 +00:00
if not self . state . reconnect :
return
else :
self . state . set ( ' processing ' , False )
traceback . print_exc ( )
self . disconnect ( reconnect = True )
2009-06-03 22:56:51 +00:00
if self . state [ ' reconnect ' ] :
self . reconnect ( )
self . state . set ( ' processing ' , False )
2009-08-31 22:46:31 +00:00
self . eventqueue . put ( ( ' quit ' , None , None ) )
2009-06-03 22:56:51 +00:00
#self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML)
#self.__thread['readXML'].start()
#self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents)
#self.__thread['spawnEvents'].start()
def __readXML ( self ) :
" Parses the incoming stream, adding to xmlin queue as it goes "
#build cElementTree object from expat was we go
2010-03-15 17:19:45 +00:00
#self.filesocket = self.socket.makefile('rb', 0)
2010-01-08 06:03:02 +00:00
#print self.filesocket.read(1024) #self.filesocket._sock.recv(1024)
2009-06-03 22:56:51 +00:00
edepth = 0
root = None
2009-08-31 22:46:31 +00:00
for ( event , xmlobj ) in cElementTree . iterparse ( self . filesocket , ( b ' end ' , b ' start ' ) ) :
2009-06-03 22:56:51 +00:00
if edepth == 0 : # and xmlobj.tag.split('}', 1)[-1] == self.basetag:
2009-08-31 22:46:31 +00:00
if event == b ' start ' :
2009-06-03 22:56:51 +00:00
root = xmlobj
self . start_stream_handler ( root )
2009-08-31 22:46:31 +00:00
if event == b ' end ' :
2009-06-03 22:56:51 +00:00
edepth + = - 1
2009-08-31 22:46:31 +00:00
if edepth == 0 and event == b ' end ' :
2010-04-14 02:35:47 +00:00
self . disconnect ( reconnect = self . state [ ' reconnect ' ] )
2010-05-27 11:58:57 +00:00
logging . debug ( " Ending readXML loop " )
2009-06-03 22:56:51 +00:00
return False
elif edepth == 1 :
#self.xmlin.put(xmlobj)
try :
self . __spawnEvent ( xmlobj )
except RestartStream :
return True
except CloseStream :
2010-05-27 11:58:57 +00:00
logging . debug ( " Ending readXML loop " )
2009-06-03 22:56:51 +00:00
return False
if root :
root . clear ( )
2009-08-31 22:46:31 +00:00
if event == b ' start ' :
2009-06-03 22:56:51 +00:00
edepth + = 1
2010-05-27 11:58:57 +00:00
logging . debug ( " Ending readXML loop " )
2009-06-03 22:56:51 +00:00
2010-02-15 10:13:44 +00:00
def _sendThread ( self ) :
2010-02-27 02:02:08 +00:00
while self . run :
2010-02-15 10:13:44 +00:00
data = self . sendqueue . get ( True )
logging . debug ( " SEND: %s " % data )
try :
self . socket . send ( data . encode ( ' utf-8 ' ) )
#self.socket.send(bytes(data, "utf-8"))
#except socket.error,(errno, strerror):
except :
2010-04-14 02:35:47 +00:00
logging . warning ( " Failed to send %s " % data )
2010-02-15 10:13:44 +00:00
self . state . set ( ' connected ' , False )
if self . state . reconnect :
logging . error ( " Disconnected. Socket Error. " )
traceback . print_exc ( )
self . disconnect ( reconnect = True )
2009-06-03 22:56:51 +00:00
def sendRaw ( self , data ) :
2010-02-15 10:13:44 +00:00
self . sendqueue . put ( data )
2009-06-03 22:56:51 +00:00
return True
def disconnect ( self , reconnect = False ) :
self . state . set ( ' reconnect ' , reconnect )
2010-04-14 02:35:47 +00:00
if self . state [ ' disconnecting ' ] :
return
if not self . state [ ' reconnect ' ] :
logging . debug ( " Disconnecting... " )
self . state . set ( ' disconnecting ' , True )
self . run = False
2010-05-27 11:58:57 +00:00
self . scheduler . run = False
2009-06-03 22:56:51 +00:00
if self . state [ ' connected ' ] :
self . sendRaw ( self . stream_footer )
2010-04-14 02:35:47 +00:00
time . sleep ( 1 )
2009-06-03 22:56:51 +00:00
#send end of stream
#wait for end of stream back
try :
self . socket . close ( )
self . filesocket . close ( )
self . socket . shutdown ( socket . SHUT_RDWR )
2009-08-31 22:46:31 +00:00
except socket . error as serr :
2009-06-25 06:49:58 +00:00
#logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror))
#thread.exit_thread()
pass
2009-06-03 22:56:51 +00:00
if self . state [ ' processing ' ] :
2009-06-25 06:49:58 +00:00
#raise CloseStream
pass
2009-06-03 22:56:51 +00:00
def reconnect ( self ) :
self . state . set ( ' tls ' , False )
self . state . set ( ' ssl ' , False )
time . sleep ( 1 )
self . connect ( )
2009-07-11 21:46:31 +00:00
def incoming_filter ( self , xmlobj ) :
return xmlobj
2009-06-03 22:56:51 +00:00
def __spawnEvent ( self , xmlobj ) :
" watching xmlOut and processes handlers "
#convert XML into Stanza
2009-09-05 07:38:29 +00:00
logging . debug ( " RECV: %s " % cElementTree . tostring ( xmlobj ) )
2009-07-11 21:46:31 +00:00
xmlobj = self . incoming_filter ( xmlobj )
2010-06-04 02:42:11 +00:00
stanza_type = StanzaBase
2009-06-03 22:56:51 +00:00
for stanza_class in self . __root_stanza :
2009-12-10 01:23:03 +00:00
if xmlobj . tag == " { %s } %s " % ( self . default_ns , stanza_class . name ) :
2010-06-04 02:42:11 +00:00
stanza_type = stanza_class
2009-06-03 22:56:51 +00:00
break
2009-12-22 10:05:53 +00:00
unhandled = True
2010-06-04 02:42:11 +00:00
stanza = stanza_type ( self , xmlobj )
2009-06-03 22:56:51 +00:00
for handler in self . __handlers :
2010-01-16 05:07:28 +00:00
if handler . match ( stanza ) :
2010-06-04 02:42:11 +00:00
stanza_copy = stanza_type ( self , copy . deepcopy ( xmlobj ) )
handler . prerun ( stanza_copy )
self . eventqueue . put ( ( ' stanza ' , handler , stanza_copy ) )
2009-06-03 22:56:51 +00:00
if handler . checkDelete ( ) : self . __handlers . pop ( self . __handlers . index ( handler ) )
2009-12-22 10:05:53 +00:00
unhandled = False
if unhandled :
stanza . unhandled ( )
2009-06-03 22:56:51 +00:00
#loop through handlers and test match
#spawn threads as necessary, call handlers, sending Stanza
2009-08-31 22:46:31 +00:00
def _eventRunner ( self ) :
logging . debug ( " Loading event runner " )
while self . run :
try :
event = self . eventqueue . get ( True , timeout = 5 )
except queue . Empty :
2009-09-01 00:06:46 +00:00
event = None
2010-05-29 02:19:28 +00:00
except KeyboardInterrupt :
self . run = False
self . scheduler . run = False
2009-08-31 22:46:31 +00:00
if event is not None :
2010-01-08 06:03:02 +00:00
etype = event [ 0 ]
handler = event [ 1 ]
args = event [ 2 : ]
#etype, handler, *args = event #python 3.x way
2009-08-31 22:46:31 +00:00
if etype == ' stanza ' :
2009-12-17 01:54:22 +00:00
try :
handler . run ( args [ 0 ] )
2010-01-05 21:56:48 +00:00
except Exception as e :
2010-01-25 18:40:44 +00:00
traceback . print_exc ( )
2010-01-05 21:56:48 +00:00
args [ 0 ] . exception ( e )
2010-05-27 01:32:28 +00:00
elif etype == ' schedule ' :
2009-12-17 01:54:22 +00:00
try :
2010-05-27 11:58:57 +00:00
logging . debug ( args )
handler ( * args [ 0 ] )
2009-12-17 01:54:22 +00:00
except :
logging . error ( traceback . format_exc ( ) )
elif etype == ' quit ' :
2009-08-31 22:46:31 +00:00
logging . debug ( " Quitting eventRunner thread " )
return False
2009-06-03 22:56:51 +00:00
def registerHandler ( self , handler , before = None , after = None ) :
" Add handler with matcher class and parameters. "
self . __handlers . append ( handler )
def removeHandler ( self , name ) :
" Removes the handler. "
idx = 0
for handler in self . __handlers :
if handler . name == name :
self . __handlers . pop ( idx )
return
idx + = 1
2009-12-10 01:23:03 +00:00
def registerStanza ( self , stanza_class ) :
2009-06-03 22:56:51 +00:00
" Adds stanza. If root stanzas build stanzas sent in events while non-root stanzas build substanza objects. "
2009-12-10 01:23:03 +00:00
self . __root_stanza . append ( stanza_class )
2009-06-03 22:56:51 +00:00
def registerStanzaExtension ( self , stanza_class , stanza_extension ) :
if stanza_class not in stanza_extensions :
stanza_extensions [ stanza_class ] = [ stanza_extension ]
else :
stanza_extensions [ stanza_class ] . append ( stanza_extension )
def removeStanza ( self , stanza_class , root = False ) :
" Removes the stanza ' s registration. "
if root :
del self . __root_stanza [ stanza_class ]
else :
del self . __stanza [ stanza_class ]
def removeStanzaExtension ( self , stanza_class , stanza_extension ) :
stanza_extension [ stanza_class ] . pop ( stanza_extension )
def tostring ( self , xml , xmlns = ' ' , stringbuffer = ' ' ) :
newoutput = [ stringbuffer ]
#TODO respect ET mapped namespaces
itag = xml . tag . split ( ' } ' , 1 ) [ - 1 ]
if ' } ' in xml . tag :
ixmlns = xml . tag . split ( ' } ' , 1 ) [ 0 ] [ 1 : ]
else :
ixmlns = ' '
nsbuffer = ' '
if xmlns != ixmlns and ixmlns != ' ' :
if ixmlns in self . namespace_map :
if self . namespace_map [ ixmlns ] != ' ' :
itag = " %s : %s " % ( self . namespace_map [ ixmlns ] , itag )
else :
nsbuffer = """ xmlns= " %s \" """ % ixmlns
newoutput . append ( " < %s " % itag )
newoutput . append ( nsbuffer )
for attrib in xml . attrib :
newoutput . append ( """ %s = " %s \" """ % ( attrib , self . xmlesc ( xml . attrib [ attrib ] ) ) )
if len ( xml ) or xml . text or xml . tail :
newoutput . append ( " > " )
if xml . text :
newoutput . append ( self . xmlesc ( xml . text ) )
if len ( xml ) :
for child in xml . getchildren ( ) :
newoutput . append ( self . tostring ( child , ixmlns ) )
newoutput . append ( " </ %s > " % ( itag , ) )
if xml . tail :
newoutput . append ( self . xmlesc ( xml . tail ) )
elif xml . text :
newoutput . append ( " > %s </ %s > " % ( self . xmlesc ( xml . text ) , itag ) )
else :
newoutput . append ( " /> " )
return ' ' . join ( newoutput )
def xmlesc ( self , text ) :
2009-08-31 22:46:31 +00:00
text = list ( text )
2009-06-03 22:56:51 +00:00
cc = 0
matches = ( ' & ' , ' < ' , ' " ' , ' > ' , " ' " )
for c in text :
if c in matches :
if c == ' & ' :
2009-08-31 22:46:31 +00:00
text [ cc ] = ' & '
2009-06-03 22:56:51 +00:00
elif c == ' < ' :
2009-08-31 22:46:31 +00:00
text [ cc ] = ' < '
2009-06-03 22:56:51 +00:00
elif c == ' > ' :
2009-08-31 22:46:31 +00:00
text [ cc ] = ' > '
2009-06-03 22:56:51 +00:00
elif c == " ' " :
2009-08-31 22:46:31 +00:00
text [ cc ] = ' ' '
2009-06-03 22:56:51 +00:00
elif self . escape_quotes :
2009-08-31 22:46:31 +00:00
text [ cc ] = ' " '
2009-06-03 22:56:51 +00:00
cc + = 1
return ' ' . join ( text )
def start_stream_handler ( self , xml ) :
""" Meant to be overridden """
pass