2010-03-26 21:32:16 +00:00
"""
SleekXMPP : The Sleek XMPP Library
Copyright ( C ) 2010 Nathanael C . Fritz
This file is part of SleekXMPP .
2010-07-20 15:19:49 +00:00
See the file LICENSE for copying permission .
2010-03-26 21:32:16 +00:00
"""
2010-01-08 06:03:02 +00:00
from __future__ import with_statement , unicode_literals
try :
import queue
except ImportError :
import Queue as queue
2009-06-03 22:56:51 +00:00
from . import statemachine
from . stanzabase import StanzaBase
from xml . etree import cElementTree
from xml . parsers import expat
import logging
import socket
2009-08-31 22:46:31 +00:00
import threading
2009-06-03 22:56:51 +00:00
import time
import types
2010-06-04 02:42:11 +00:00
import copy
2009-06-03 22:56:51 +00:00
import xml . sax . saxutils
2010-05-27 01:32:28 +00:00
from . import scheduler
2010-08-06 00:26:41 +00:00
from sleekxmpp . xmlstream . tostring import tostring
2009-06-03 22:56:51 +00:00
2010-07-30 00:16:57 +00:00
RESPONSE_TIMEOUT = 10
2009-09-01 00:06:46 +00:00
HANDLER_THREADS = 1
2009-06-03 22:56:51 +00:00
ssl_support = True
2010-01-08 06:03:02 +00:00
#try:
import ssl
#except ImportError:
# ssl_support = False
import sys
if sys . version_info < ( 3 , 0 ) :
#monkey patch broken filesocket object
from . import filesocket
2010-01-25 18:40:44 +00:00
#socket._fileobject = filesocket.filesocket
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
class RestartStream ( Exception ) :
pass
class CloseStream ( Exception ) :
pass
stanza_extensions = { }
class XMLStream ( object ) :
" A connection manager with XML events. "
def __init__ ( self , socket = None , host = ' ' , port = 0 , escape_quotes = False ) :
global ssl_support
self . ssl_support = ssl_support
self . escape_quotes = escape_quotes
self . state = statemachine . StateMachine ( )
2010-04-14 02:35:47 +00:00
self . state . addStates ( { ' connected ' : False , ' is client ' : False , ' ssl ' : False , ' tls ' : False , ' reconnect ' : True , ' processing ' : False , ' disconnecting ' : False } ) #set initial states
2009-06-03 22:56:51 +00:00
self . setSocket ( socket )
self . address = ( host , int ( port ) )
self . __thread = { }
2009-12-10 01:23:03 +00:00
self . __root_stanza = [ ]
2009-06-03 22:56:51 +00:00
self . __stanza = { }
self . __stanza_extension = { }
self . __handlers = [ ]
self . __tls_socket = None
2009-08-31 22:46:31 +00:00
self . filesocket = None
2009-06-03 22:56:51 +00:00
self . use_ssl = False
self . use_tls = False
2010-08-06 03:11:22 +00:00
self . default_ns = ' '
2009-06-03 22:56:51 +00:00
self . stream_header = " <stream> "
self . stream_footer = " </stream> "
2009-08-31 22:46:31 +00:00
self . eventqueue = queue . Queue ( )
2010-02-15 10:13:44 +00:00
self . sendqueue = queue . Queue ( )
2010-05-29 02:19:28 +00:00
self . scheduler = scheduler . Scheduler ( self . eventqueue )
2009-08-31 22:46:31 +00:00
2009-06-03 22:56:51 +00:00
self . namespace_map = { }
2009-08-31 22:46:31 +00:00
self . run = True
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def setSocket ( self , socket ) :
" Set the socket "
self . socket = socket
if socket is not None :
self . filesocket = socket . makefile ( ' rb ' , 0 ) # ElementTree.iterparse requires a file. 0 buffer files have to be binary
self . state . set ( ' connected ' , True )
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def setFileSocket ( self , filesocket ) :
self . filesocket = filesocket
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def connect ( self , host = ' ' , port = 0 , use_ssl = False , use_tls = True ) :
" Link to connectTCP "
return self . connectTCP ( host , port , use_ssl , use_tls )
def connectTCP ( self , host = ' ' , port = 0 , use_ssl = None , use_tls = None , reattempt = True ) :
" Connect and create socket "
while reattempt and not self . state [ ' connected ' ] :
if host and port :
self . address = ( host , int ( port ) )
if use_ssl is not None :
self . use_ssl = use_ssl
if use_tls is not None :
self . use_tls = use_tls
self . state . set ( ' is client ' , True )
2010-01-25 18:40:44 +00:00
if sys . version_info < ( 3 , 0 ) :
self . socket = filesocket . Socket26 ( socket . AF_INET , socket . SOCK_STREAM )
else :
self . socket = socket . socket ( socket . AF_INET , socket . SOCK_STREAM )
2010-01-08 06:03:02 +00:00
self . socket . settimeout ( None )
2009-06-03 22:56:51 +00:00
if self . use_ssl and self . ssl_support :
logging . debug ( " Socket Wrapped for SSL " )
self . socket = ssl . wrap_socket ( self . socket )
try :
self . socket . connect ( self . address )
2010-01-08 06:03:02 +00:00
#self.filesocket = self.socket.makefile('rb', 0)
2009-08-31 22:46:31 +00:00
self . filesocket = self . socket . makefile ( ' rb ' , 0 )
2009-06-03 22:56:51 +00:00
self . state . set ( ' connected ' , True )
return True
2009-08-31 22:46:31 +00:00
except socket . error as serr :
logging . error ( " Could not connect. Socket Error # %s : %s " % ( serr . errno , serr . strerror ) )
2009-06-03 22:56:51 +00:00
time . sleep ( 1 )
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def connectUnix ( self , filepath ) :
" Connect to Unix file and create socket "
def startTLS ( self ) :
" Handshakes for TLS "
if self . ssl_support :
2009-08-31 22:46:31 +00:00
logging . info ( " Negotiating TLS " )
2009-06-03 22:56:51 +00:00
self . realsocket = self . socket
2009-08-31 22:46:31 +00:00
self . socket = ssl . wrap_socket ( self . socket , ssl_version = ssl . PROTOCOL_TLSv1 , do_handshake_on_connect = False )
self . socket . do_handshake ( )
2010-01-08 06:03:02 +00:00
if sys . version_info < ( 3 , 0 ) :
2010-08-27 15:29:48 +00:00
self . filesocket = filesocket . FileSocket ( self . socket )
2010-01-08 06:03:02 +00:00
else :
self . filesocket = self . socket . makefile ( ' rb ' , 0 )
2009-06-03 22:56:51 +00:00
return True
else :
2009-08-31 22:46:31 +00:00
logging . warning ( " Tried to enable TLS, but ssl module not found. " )
2009-06-03 22:56:51 +00:00
return False
raise RestartStream ( )
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def process ( self , threaded = True ) :
2010-05-27 01:32:28 +00:00
self . scheduler . process ( threaded = True )
2009-09-01 00:06:46 +00:00
for t in range ( 0 , HANDLER_THREADS ) :
2010-05-29 02:19:28 +00:00
logging . debug ( " Starting HANDLER THREAD " )
2009-09-01 00:06:46 +00:00
self . __thread [ ' eventhandle %s ' % t ] = threading . Thread ( name = ' eventhandle %s ' % t , target = self . _eventRunner )
self . __thread [ ' eventhandle %s ' % t ] . start ( )
2010-02-15 10:13:44 +00:00
self . __thread [ ' sendthread ' ] = threading . Thread ( name = ' sendthread ' , target = self . _sendThread )
self . __thread [ ' sendthread ' ] . start ( )
2009-06-03 22:56:51 +00:00
if threaded :
2009-08-31 22:46:31 +00:00
self . __thread [ ' process ' ] = threading . Thread ( name = ' process ' , target = self . _process )
self . __thread [ ' process ' ] . start ( )
2009-06-03 22:56:51 +00:00
else :
self . _process ( )
2010-08-06 00:26:41 +00:00
2010-05-27 01:32:28 +00:00
def schedule ( self , name , seconds , callback , args = None , kwargs = None , repeat = False ) :
self . scheduler . add ( name , seconds , callback , args , kwargs , repeat , qpointer = self . eventqueue )
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def _process ( self ) :
" Start processing the socket. "
firstrun = True
2010-02-27 02:02:08 +00:00
while self . run and ( firstrun or self . state [ ' reconnect ' ] ) :
2009-06-03 22:56:51 +00:00
self . state . set ( ' processing ' , True )
firstrun = False
try :
if self . state [ ' is client ' ] :
self . sendRaw ( self . stream_header )
2010-04-14 02:35:47 +00:00
while self . run and self . __readXML ( ) :
2009-06-03 22:56:51 +00:00
if self . state [ ' is client ' ] :
self . sendRaw ( self . stream_header )
except KeyboardInterrupt :
logging . debug ( " Keyboard Escape Detected " )
self . state . set ( ' processing ' , False )
2009-06-16 11:59:55 +00:00
self . state . set ( ' reconnect ' , False )
2009-06-03 22:56:51 +00:00
self . disconnect ( )
2009-08-31 22:46:31 +00:00
self . run = False
2010-05-27 11:58:57 +00:00
self . scheduler . run = False
2009-08-31 22:46:31 +00:00
self . eventqueue . put ( ( ' quit ' , None , None ) )
2009-06-25 06:49:58 +00:00
return
2009-06-16 11:59:55 +00:00
except CloseStream :
2009-06-25 06:49:58 +00:00
return
except SystemExit :
2009-08-31 22:46:31 +00:00
self . eventqueue . put ( ( ' quit ' , None , None ) )
2009-06-25 06:49:58 +00:00
return
2009-08-31 22:46:31 +00:00
except socket . error :
2009-06-25 06:49:58 +00:00
if not self . state . reconnect :
return
else :
self . state . set ( ' processing ' , False )
2010-07-27 01:02:25 +00:00
logging . exception ( ' Socket Error ' )
2009-06-25 06:49:58 +00:00
self . disconnect ( reconnect = True )
2009-06-03 22:56:51 +00:00
except :
2009-06-25 06:49:58 +00:00
if not self . state . reconnect :
return
else :
self . state . set ( ' processing ' , False )
2010-07-27 01:02:25 +00:00
logging . exception ( ' Connection error. Reconnecting. ' )
2009-06-25 06:49:58 +00:00
self . disconnect ( reconnect = True )
2009-06-03 22:56:51 +00:00
if self . state [ ' reconnect ' ] :
self . reconnect ( )
self . state . set ( ' processing ' , False )
2009-08-31 22:46:31 +00:00
self . eventqueue . put ( ( ' quit ' , None , None ) )
2009-06-03 22:56:51 +00:00
#self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML)
#self.__thread['readXML'].start()
#self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents)
#self.__thread['spawnEvents'].start()
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def __readXML ( self ) :
" Parses the incoming stream, adding to xmlin queue as it goes "
#build cElementTree object from expat was we go
2010-03-15 17:19:45 +00:00
#self.filesocket = self.socket.makefile('rb', 0)
2010-01-08 06:03:02 +00:00
#print self.filesocket.read(1024) #self.filesocket._sock.recv(1024)
2009-06-03 22:56:51 +00:00
edepth = 0
root = None
2009-08-31 22:46:31 +00:00
for ( event , xmlobj ) in cElementTree . iterparse ( self . filesocket , ( b ' end ' , b ' start ' ) ) :
2009-06-03 22:56:51 +00:00
if edepth == 0 : # and xmlobj.tag.split('}', 1)[-1] == self.basetag:
2009-08-31 22:46:31 +00:00
if event == b ' start ' :
2009-06-03 22:56:51 +00:00
root = xmlobj
self . start_stream_handler ( root )
2009-08-31 22:46:31 +00:00
if event == b ' end ' :
2009-06-03 22:56:51 +00:00
edepth + = - 1
2009-08-31 22:46:31 +00:00
if edepth == 0 and event == b ' end ' :
2010-04-14 02:35:47 +00:00
self . disconnect ( reconnect = self . state [ ' reconnect ' ] )
2010-05-27 11:58:57 +00:00
logging . debug ( " Ending readXML loop " )
2009-06-03 22:56:51 +00:00
return False
elif edepth == 1 :
#self.xmlin.put(xmlobj)
try :
self . __spawnEvent ( xmlobj )
except RestartStream :
return True
except CloseStream :
2010-05-27 11:58:57 +00:00
logging . debug ( " Ending readXML loop " )
2009-06-03 22:56:51 +00:00
return False
if root :
root . clear ( )
2009-08-31 22:46:31 +00:00
if event == b ' start ' :
2009-06-03 22:56:51 +00:00
edepth + = 1
2010-05-27 11:58:57 +00:00
logging . debug ( " Ending readXML loop " )
2010-08-06 00:26:41 +00:00
2010-02-15 10:13:44 +00:00
def _sendThread ( self ) :
2010-02-27 02:02:08 +00:00
while self . run :
2010-02-15 10:13:44 +00:00
data = self . sendqueue . get ( True )
logging . debug ( " SEND: %s " % data )
try :
self . socket . send ( data . encode ( ' utf-8 ' ) )
#self.socket.send(bytes(data, "utf-8"))
#except socket.error,(errno, strerror):
except :
2010-04-14 02:35:47 +00:00
logging . warning ( " Failed to send %s " % data )
2010-02-15 10:13:44 +00:00
self . state . set ( ' connected ' , False )
if self . state . reconnect :
2010-07-27 01:02:25 +00:00
logging . exception ( " Disconnected. Socket Error. " )
2010-02-15 10:13:44 +00:00
self . disconnect ( reconnect = True )
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def sendRaw ( self , data ) :
2010-02-15 10:13:44 +00:00
self . sendqueue . put ( data )
2009-06-03 22:56:51 +00:00
return True
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def disconnect ( self , reconnect = False ) :
self . state . set ( ' reconnect ' , reconnect )
2010-04-14 02:35:47 +00:00
if self . state [ ' disconnecting ' ] :
return
if not self . state [ ' reconnect ' ] :
logging . debug ( " Disconnecting... " )
self . state . set ( ' disconnecting ' , True )
self . run = False
2010-05-27 11:58:57 +00:00
self . scheduler . run = False
2009-06-03 22:56:51 +00:00
if self . state [ ' connected ' ] :
self . sendRaw ( self . stream_footer )
2010-04-14 02:35:47 +00:00
time . sleep ( 1 )
2009-06-03 22:56:51 +00:00
#send end of stream
#wait for end of stream back
try :
self . socket . close ( )
self . filesocket . close ( )
self . socket . shutdown ( socket . SHUT_RDWR )
2009-08-31 22:46:31 +00:00
except socket . error as serr :
2009-06-25 06:49:58 +00:00
#logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror))
#thread.exit_thread()
pass
2009-06-03 22:56:51 +00:00
if self . state [ ' processing ' ] :
2009-06-25 06:49:58 +00:00
#raise CloseStream
pass
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def reconnect ( self ) :
self . state . set ( ' tls ' , False )
self . state . set ( ' ssl ' , False )
time . sleep ( 1 )
self . connect ( )
2010-08-06 00:26:41 +00:00
2009-07-11 21:46:31 +00:00
def incoming_filter ( self , xmlobj ) :
return xmlobj
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def __spawnEvent ( self , xmlobj ) :
" watching xmlOut and processes handlers "
#convert XML into Stanza
2010-08-06 03:11:22 +00:00
logging . debug ( " RECV: %s " % tostring ( xmlobj , xmlns = self . default_ns , stream = self ) )
2009-07-11 21:46:31 +00:00
xmlobj = self . incoming_filter ( xmlobj )
2010-06-04 02:42:11 +00:00
stanza_type = StanzaBase
2009-06-03 22:56:51 +00:00
for stanza_class in self . __root_stanza :
2009-12-10 01:23:03 +00:00
if xmlobj . tag == " { %s } %s " % ( self . default_ns , stanza_class . name ) :
2010-06-04 02:42:11 +00:00
stanza_type = stanza_class
2009-06-03 22:56:51 +00:00
break
2009-12-22 10:05:53 +00:00
unhandled = True
2010-06-04 02:42:11 +00:00
stanza = stanza_type ( self , xmlobj )
2009-06-03 22:56:51 +00:00
for handler in self . __handlers :
2010-01-16 05:07:28 +00:00
if handler . match ( stanza ) :
2010-06-04 02:42:11 +00:00
stanza_copy = stanza_type ( self , copy . deepcopy ( xmlobj ) )
handler . prerun ( stanza_copy )
self . eventqueue . put ( ( ' stanza ' , handler , stanza_copy ) )
2009-06-03 22:56:51 +00:00
if handler . checkDelete ( ) : self . __handlers . pop ( self . __handlers . index ( handler ) )
2009-12-22 10:05:53 +00:00
unhandled = False
if unhandled :
stanza . unhandled ( )
2009-06-03 22:56:51 +00:00
#loop through handlers and test match
#spawn threads as necessary, call handlers, sending Stanza
2010-08-06 00:26:41 +00:00
2009-08-31 22:46:31 +00:00
def _eventRunner ( self ) :
logging . debug ( " Loading event runner " )
while self . run :
try :
event = self . eventqueue . get ( True , timeout = 5 )
except queue . Empty :
2009-09-01 00:06:46 +00:00
event = None
2010-05-29 02:19:28 +00:00
except KeyboardInterrupt :
self . run = False
self . scheduler . run = False
2009-08-31 22:46:31 +00:00
if event is not None :
2010-01-08 06:03:02 +00:00
etype = event [ 0 ]
handler = event [ 1 ]
args = event [ 2 : ]
#etype, handler, *args = event #python 3.x way
2009-08-31 22:46:31 +00:00
if etype == ' stanza ' :
2009-12-17 01:54:22 +00:00
try :
handler . run ( args [ 0 ] )
2010-01-05 21:56:48 +00:00
except Exception as e :
2010-07-27 01:02:25 +00:00
logging . exception ( ' Error processing event handler: %s ' % handler . name )
2010-01-05 21:56:48 +00:00
args [ 0 ] . exception ( e )
2010-05-27 01:32:28 +00:00
elif etype == ' schedule ' :
2009-12-17 01:54:22 +00:00
try :
2010-05-27 11:58:57 +00:00
logging . debug ( args )
handler ( * args [ 0 ] )
2009-12-17 01:54:22 +00:00
except :
2010-07-27 01:02:25 +00:00
logging . exception ( ' Error processing scheduled task ' )
2009-12-17 01:54:22 +00:00
elif etype == ' quit ' :
2009-08-31 22:46:31 +00:00
logging . debug ( " Quitting eventRunner thread " )
return False
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def registerHandler ( self , handler , before = None , after = None ) :
2010-08-27 20:42:26 +00:00
" Add handler with matcher class and parameters. "
if handler . stream is None :
self . __handlers . append ( handler )
handler . stream = self
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def removeHandler ( self , name ) :
" Removes the handler. "
idx = 0
for handler in self . __handlers :
if handler . name == name :
self . __handlers . pop ( idx )
2010-08-27 20:42:26 +00:00
return True
2009-06-03 22:56:51 +00:00
idx + = 1
2010-08-27 20:42:26 +00:00
return False
2010-08-06 00:26:41 +00:00
2009-12-10 01:23:03 +00:00
def registerStanza ( self , stanza_class ) :
2009-06-03 22:56:51 +00:00
" Adds stanza. If root stanzas build stanzas sent in events while non-root stanzas build substanza objects. "
2009-12-10 01:23:03 +00:00
self . __root_stanza . append ( stanza_class )
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def registerStanzaExtension ( self , stanza_class , stanza_extension ) :
if stanza_class not in stanza_extensions :
stanza_extensions [ stanza_class ] = [ stanza_extension ]
else :
stanza_extensions [ stanza_class ] . append ( stanza_extension )
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def removeStanza ( self , stanza_class , root = False ) :
" Removes the stanza ' s registration. "
if root :
del self . __root_stanza [ stanza_class ]
else :
del self . __stanza [ stanza_class ]
2010-08-06 00:26:41 +00:00
2009-06-03 22:56:51 +00:00
def removeStanzaExtension ( self , stanza_class , stanza_extension ) :
stanza_extension [ stanza_class ] . pop ( stanza_extension )
def start_stream_handler ( self , xml ) :
""" Meant to be overridden """
pass