fixed some major reconnection errors

This commit is contained in:
Thom Nichols 2010-06-01 22:51:49 -04:00
parent 1780ca900a
commit 4eb210bff5
5 changed files with 138 additions and 124 deletions

View file

@ -94,6 +94,8 @@ class ClientXMPP(basexmpp, XMLStream):
"""Connect to the Jabber Server. Attempts SRV lookup, and if it fails, uses """Connect to the Jabber Server. Attempts SRV lookup, and if it fails, uses
the JID server.""" the JID server."""
if self.state['connected']: return True
if host: if host:
self.server = host self.server = host
if port is None: port = self.port if port is None: port = self.port
@ -174,6 +176,7 @@ class ClientXMPP(basexmpp, XMLStream):
self._handleRoster(iq, request=True) self._handleRoster(iq, request=True)
def _handleStreamFeatures(self, features): def _handleStreamFeatures(self, features):
logging.debug('handling stream features')
self.features = [] self.features = []
for sub in features.xml: for sub in features.xml:
self.features.append(sub.tag) self.features.append(sub.tag)
@ -181,12 +184,16 @@ class ClientXMPP(basexmpp, XMLStream):
for feature in self.registered_features: for feature in self.registered_features:
if feature[0].match(subelement): if feature[0].match(subelement):
#if self.maskcmp(subelement, feature[0], True): #if self.maskcmp(subelement, feature[0], True):
# This calls the feature handler & optionally breaks
if feature[1](subelement) and feature[2]: #if breaker, don't continue if feature[1](subelement) and feature[2]: #if breaker, don't continue
return True return True
def handler_starttls(self, xml): def handler_starttls(self, xml):
logging.debug( 'TLS start handler; SSL support: %s', self.ssl_support )
if not self.authenticated and self.ssl_support: if not self.authenticated and self.ssl_support:
self.add_handler("<proceed xmlns='urn:ietf:params:xml:ns:xmpp-tls' />", self.handler_tls_start, instream=True) _stanza = "<proceed xmlns='urn:ietf:params:xml:ns:xmpp-tls' />"
if not self.event_handlers.get(_stanza,None): # don't add handler > once
self.add_handler( _stanza, self.handler_tls_start, instream=True )
self.sendXML(xml) self.sendXML(xml)
return True return True
else: else:
@ -221,12 +228,13 @@ class ClientXMPP(basexmpp, XMLStream):
return True return True
def handler_auth_success(self, xml): def handler_auth_success(self, xml):
logging.debug("Authentication successful.")
self.authenticated = True self.authenticated = True
self.features = [] self.features = []
raise RestartStream() raise RestartStream()
def handler_auth_fail(self, xml): def handler_auth_fail(self, xml):
logging.info("Authentication failed.") logging.warning("Authentication failed.")
self.disconnect() self.disconnect()
self.event("failed_auth") self.event("failed_auth")

View file

@ -84,7 +84,7 @@ class basexmpp(object):
self.resource = self.getjidresource(jid) self.resource = self.getjidresource(jid)
self.jid = self.getjidbare(jid) self.jid = self.getjidbare(jid)
self.username = jid.split('@', 1)[0] self.username = jid.split('@', 1)[0]
self.server = jid.split('@',1)[-1].split('/', 1)[0] self.domain = jid.split('@',1)[-1].split('/', 1)[0]
def process(self, *args, **kwargs): def process(self, *args, **kwargs):
for idx in self.plugin: for idx in self.plugin:

View file

@ -18,7 +18,7 @@ class BaseHandler(object):
def match(self, xml): def match(self, xml):
return self._matcher.match(xml) return self._matcher.match(xml)
def prerun(self, payload): def prerun(self, payload): # what's the point of this if the payload is called again in run??
self._payload = payload self._payload = payload
def run(self, payload): def run(self, payload):

View file

@ -17,13 +17,15 @@ class Callback(base.BaseHandler):
self._once = once self._once = once
self._instream = instream self._instream = instream
def prerun(self, payload): def prerun(self, payload): # prerun actually calls run?!? WTF! Then it gets run AGAIN!
base.BaseHandler.prerun(self, payload) base.BaseHandler.prerun(self, payload)
if self._instream: if self._instream:
logging.debug('callback "%s" prerun', self.name)
self.run(payload, True) self.run(payload, True)
def run(self, payload, instream=False): def run(self, payload, instream=False):
if not self._instream or instream: if not self._instream or instream:
logging.debug('callback "%s" run', self.name)
base.BaseHandler.run(self, payload) base.BaseHandler.run(self, payload)
#if self._thread: #if self._thread:
# x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,)) # x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,))

View file

@ -54,7 +54,7 @@ class XMLStream(object):
self.ssl_support = ssl_support self.ssl_support = ssl_support
self.escape_quotes = escape_quotes self.escape_quotes = escape_quotes
self.state = statemachine.StateMachine() self.state = statemachine.StateMachine()
self.state.addStates({'connected':False, 'is client':False, 'ssl':False, 'tls':False, 'reconnect':True, 'processing':False, 'disconnecting':False}) #set initial states self.state.addStates({'connected':False, 'is client':False, 'ssl':False, 'tls':False, 'reconnect':True, 'processing':False}) #set initial states
self.setSocket(socket) self.setSocket(socket)
self.address = (host, int(port)) self.address = (host, int(port))
@ -101,30 +101,33 @@ class XMLStream(object):
def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True): def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True):
"Connect and create socket" "Connect and create socket"
while reattempt and not self.state['connected']: while reattempt and not self.state['connected']:
logging.debug('connecting....')
try:
if host and port: if host and port:
self.address = (host, int(port)) self.address = (host, int(port))
if use_ssl is not None: if use_ssl is not None:
self.use_ssl = use_ssl self.use_ssl = use_ssl
if use_tls is not None: if use_tls is not None:
self.use_tls = use_tls self.use_tls = use_tls
self.state.set('is client', True)
if sys.version_info < (3, 0): if sys.version_info < (3, 0):
self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM) self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM)
else: else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(None) self.socket.settimeout(None) #10)
if self.use_ssl and self.ssl_support: if self.use_ssl and self.ssl_support:
logging.debug("Socket Wrapped for SSL") logging.debug("Socket Wrapped for SSL")
self.socket = ssl.wrap_socket(self.socket,ca_certs=self.ca_certs) self.socket = ssl.wrap_socket(self.socket,ca_certs=self.ca_certs)
except:
logging.exception("Connection error")
try: try:
self.socket.connect(self.address) self.socket.connect(self.address)
#self.filesocket = self.socket.makefile('rb', 0)
self.filesocket = self.socket.makefile('rb', 0) self.filesocket = self.socket.makefile('rb', 0)
self.state.set('connected', True) self.state.set('connected', True)
logging.debug('connect complete.')
return True return True
except socket.error as serr: except socket.error as serr:
logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror)) logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror))
time.sleep(1) time.sleep(1) # TODO proper quiesce if connection attempt fails
def connectUnix(self, filepath): def connectUnix(self, filepath):
"Connect to Unix file and create socket" "Connect to Unix file and create socket"
@ -133,19 +136,19 @@ class XMLStream(object):
"Handshakes for TLS" "Handshakes for TLS"
if self.ssl_support: if self.ssl_support:
logging.info("Negotiating TLS") logging.info("Negotiating TLS")
self.realsocket = self.socket # self.realsocket = self.socket # NOT USED
self.socket = ssl.wrap_socket(self.socket, self.socket = ssl.wrap_socket(self.socket,
ssl_version=ssl.PROTOCOL_TLSv1, ssl_version=ssl.PROTOCOL_TLSv1,
do_handshake_on_connect=False, do_handshake_on_connect=False,
ca_certs=self.ca_certs) ca_certs=self.ca_certs)
print "doing handshake..."
self.socket.do_handshake() self.socket.do_handshake()
print "got handshake..."
if sys.version_info < (3,0): if sys.version_info < (3,0):
from . filesocket import filesocket from . filesocket import filesocket
self.filesocket = filesocket(self.socket) self.filesocket = filesocket(self.socket)
else: else:
self.filesocket = self.socket.makefile('rb', 0) self.filesocket = self.socket.makefile('rb', 0)
logging.debug("TLS negotitation successful")
return True return True
else: else:
logging.warning("Tried to enable TLS, but ssl module not found.") logging.warning("Tried to enable TLS, but ssl module not found.")
@ -154,8 +157,8 @@ class XMLStream(object):
def process(self, threaded=True): def process(self, threaded=True):
self.scheduler.process(threaded=True) self.scheduler.process(threaded=True)
self.run = True
for t in range(0, HANDLER_THREADS): for t in range(0, HANDLER_THREADS):
<<<<<<< HEAD
th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
th.setDaemon(True) th.setDaemon(True)
self.__thread['eventhandle%s' % t] = th self.__thread['eventhandle%s' % t] = th
@ -164,13 +167,6 @@ class XMLStream(object):
th.setDaemon(True) th.setDaemon(True)
self.__thread['sendthread'] = th self.__thread['sendthread'] = th
th.start() th.start()
=======
logging.debug("Starting HANDLER THREAD")
self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
self.__thread['eventhandle%s' % t].start()
self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread)
self.__thread['sendthread'].start()
>>>>>>> master
if threaded: if threaded:
th = threading.Thread(name='process', target=self._process) th = threading.Thread(name='process', target=self._process)
th.setDaemon(True) th.setDaemon(True)
@ -184,54 +180,54 @@ class XMLStream(object):
def _process(self): def _process(self):
"Start processing the socket." "Start processing the socket."
firstrun = True logging.debug('Process thread starting...')
while self.run and (firstrun or self.state['reconnect']): while self.run:
self.state.set('processing', True) self.state.set('processing', True)
firstrun = False
try: try:
if self.state['is client']:
self.sendRaw(self.stream_header)
while self.run and self.__readXML():
if self.state['is client']:
self.sendRaw(self.stream_header) self.sendRaw(self.stream_header)
while self.run and self.__readXML(): pass
except socket.timeout:
logging.debug('socket rcv timeout')
pass
except CloseStream:
# TODO warn that the listener thread is exiting!!!
pass
except RestartStream:
logging.debug("Restarting stream...")
continue # DON'T re-initialize the stream -- this exception is sent
# specifically when we've initialized TLS and need to re-send the <stream> header.
except KeyboardInterrupt: except KeyboardInterrupt:
logging.debug("Keyboard Escape Detected") logging.debug("Keyboard Escape Detected")
self.state.set('processing', False) self.state.set('processing', False)
self.state.set('reconnect', False) self.state.set('reconnect', False)
self.disconnect() self.disconnect()
self.run = False # TODO this is probably not necessary...
self.scheduler.run = False
self.eventqueue.put(('quit', None, None)) self.eventqueue.put(('quit', None, None))
return return
except CloseStream:
return
except SystemExit: except SystemExit:
# TODO shouldn't this be the same as KeyboardInterrupt????
self.eventqueue.put(('quit', None, None)) self.eventqueue.put(('quit', None, None))
return return
except socket.error:
if not self.state.reconnect:
return
else:
self.state.set('processing', False)
traceback.print_exc()
self.disconnect(reconnect=True)
except: except:
logging.exception('Unexpected error in RCV thread')
if not self.state.reconnect: if not self.state.reconnect:
return return
else: else:
logging.debug('reconnecting...')
self.state.set('processing', False) self.state.set('processing', False)
traceback.print_exc()
self.disconnect(reconnect=True) self.disconnect(reconnect=True)
if self.state['reconnect']: # TODO the individual exception handlers above already handle reconnect!
self.state.set('connected', False) # Why are we attempting to do it again down here???
# if self.state['reconnect']:
# self.state.set('connected', False)
self.state.set('processing', False) self.state.set('processing', False)
self.reconnect() # self.reconnect()
else: # else:
self.eventqueue.put(('quit', None, None)) # TODO I think this is getting queued, and when the eventRunner comes back online after
#self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML) # reconnect, it immediately processes a 'quit' event and exits again, meanwhile the
#self.__thread['readXML'].start() # rest of the client is just starting to connect and process the incoming event stream!!!
#self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents) # self.eventqueue.put(('quit', None, None))
#self.__thread['spawnEvents'].start() logging.debug('Quitting Process thread')
def __readXML(self): def __readXML(self):
"Parses the incoming stream, adding to xmlin queue as it goes" "Parses the incoming stream, adding to xmlin queue as it goes"
@ -244,41 +240,50 @@ class XMLStream(object):
if edepth == 0: # and xmlobj.tag.split('}', 1)[-1] == self.basetag: if edepth == 0: # and xmlobj.tag.split('}', 1)[-1] == self.basetag:
if event == b'start': if event == b'start':
root = xmlobj root = xmlobj
logging.debug('handling start stream')
self.start_stream_handler(root) self.start_stream_handler(root)
if event == b'end': if event == b'end':
edepth += -1 edepth += -1
if edepth == 0 and event == b'end': if edepth == 0 and event == b'end':
self.disconnect(reconnect=self.state['reconnect']) # what is this case exactly? Premature EOF?
#self.disconnect(reconnect=self.state['reconnect'])
logging.debug("Ending readXML loop") logging.debug("Ending readXML loop")
return False return False
elif edepth == 1: elif edepth == 1:
#self.xmlin.put(xmlobj) #self.xmlin.put(xmlobj)
try:
self.__spawnEvent(xmlobj) self.__spawnEvent(xmlobj)
except RestartStream: if root: root.clear()
return True
except CloseStream:
logging.debug("Ending readXML loop")
return False
if root:
root.clear()
if event == b'start': if event == b'start':
edepth += 1 edepth += 1
logging.debug("Ending readXML loop") logging.debug("Exiting readXML loop")
return False
def _sendThread(self): def _sendThread(self):
logging.debug('send thread starting...')
while self.run: while self.run:
data = self.sendqueue.get(True) if not self.state['connected']:
logging.debug("SEND: %s" % data) logging.warning("Not connected yet...")
time.sleep(1)
data = None
try: try:
self.socket.send(data.encode('utf-8')) data = self.sendqueue.get(True,10)
#self.socket.send(bytes(data, "utf-8")) logging.debug("SEND: %s" % data)
#except socket.error,(errno, strerror): self.socket.sendall(data.encode('utf-8'))
except queue.Empty:
logging.debug('nothing on send queue')
except socket.timeout:
# this is to prevent hanging
logging.debug('timeout sending packet data')
except: except:
logging.warning("Failed to send %s" % data) logging.warning("Failed to send %s" % data)
self.state.set('connected', False) logging.exception("Socket error in SEND thread")
# TODO it's somewhat unsafe for the sender thread to assume it can just
# re-intitialize the connection, since the receiver thread could be doing
# the same thing concurrently. Oops! The safer option would be to throw
# some sort of event that could be handled by a common thread or the reader
# thread to perform reconnect and then re-initialize the handler threads as well.
if self.state.reconnect: if self.state.reconnect:
logging.error("Disconnected. Socket Error.") logging.debug('Reconnecting...')
traceback.print_exc() traceback.print_exc()
self.disconnect(reconnect=True) self.disconnect(reconnect=True)
@ -288,35 +293,32 @@ class XMLStream(object):
def disconnect(self, reconnect=False): def disconnect(self, reconnect=False):
self.state.set('reconnect', reconnect) self.state.set('reconnect', reconnect)
if self.state['disconnecting']: if not self.state['connected']:
logging.warning("Already disconnected.")
return return
if not self.state['reconnect']:
logging.debug("Disconnecting...") logging.debug("Disconnecting...")
self.state.set('disconnecting', True)
self.run = False
self.scheduler.run = False
if self.state['connected']:
self.sendRaw(self.stream_footer) self.sendRaw(self.stream_footer)
time.sleep(1) time.sleep(5)
#send end of stream #send end of stream
#wait for end of stream back #wait for end of stream back
self.run = False
self.scheduler.run = False
try: try:
self.state.set('connected',False)
# self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close() self.socket.close()
except socket.error as (errno,strerror):
logging.exception("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror))
try:
self.filesocket.close() self.filesocket.close()
self.socket.shutdown(socket.SHUT_RDWR) except socket.error as (errno,strerror):
except socket.error as serr: logging.exception("Error closing filesocket.")
#logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror))
#thread.exit_thread()
pass
if self.state['processing']:
#raise CloseStream
pass
def reconnect(self): def reconnect(self):
self.state.set('tls',False) self.state.set('tls',False)
self.state.set('ssl',False) self.state.set('ssl',False)
time.sleep(1) time.sleep(1)
self.connect(self.server,self.port) self.connect()
def incoming_filter(self, xmlobj): def incoming_filter(self, xmlobj):
return xmlobj return xmlobj
@ -324,6 +326,7 @@ class XMLStream(object):
def __spawnEvent(self, xmlobj): def __spawnEvent(self, xmlobj):
"watching xmlOut and processes handlers" "watching xmlOut and processes handlers"
#convert XML into Stanza #convert XML into Stanza
# TODO surround this log statement with an if, it's expensive
logging.debug("RECV: %s" % cElementTree.tostring(xmlobj)) logging.debug("RECV: %s" % cElementTree.tostring(xmlobj))
xmlobj = self.incoming_filter(xmlobj) xmlobj = self.incoming_filter(xmlobj)
stanza = None stanza = None
@ -335,11 +338,15 @@ class XMLStream(object):
if stanza is None: if stanza is None:
stanza = StanzaBase(self, xmlobj) stanza = StanzaBase(self, xmlobj)
unhandled = True unhandled = True
# TODO inefficient linear search; performance might be improved by hashtable lookup
for handler in self.__handlers: for handler in self.__handlers:
if handler.match(stanza): if handler.match(stanza):
logging.debug('matched stanza to handler %s', handler.name)
handler.prerun(stanza) handler.prerun(stanza)
self.eventqueue.put(('stanza', handler, stanza)) self.eventqueue.put(('stanza', handler, stanza))
if handler.checkDelete(): self.__handlers.pop(self.__handlers.index(handler)) if handler.checkDelete():
logging.debug('deleting callback %s', handler.name)
self.__handlers.pop(self.__handlers.index(handler))
unhandled = False unhandled = False
if unhandled: if unhandled:
stanza.unhandled() stanza.unhandled()
@ -353,9 +360,6 @@ class XMLStream(object):
event = self.eventqueue.get(True, timeout=5) event = self.eventqueue.get(True, timeout=5)
except queue.Empty: except queue.Empty:
event = None event = None
except KeyboardInterrupt:
self.run = False
self.scheduler.run = False
if event is not None: if event is not None:
etype = event[0] etype = event[0]
handler = event[1] handler = event[1]
@ -365,12 +369,12 @@ class XMLStream(object):
try: try:
handler.run(args[0]) handler.run(args[0])
except Exception as e: except Exception as e:
traceback.print_exc() logging.exception("Exception in event handler")
args[0].exception(e) args[0].exception(e)
elif etype == 'schedule': elif etype == 'sched':
try: try:
logging.debug(args) #handler(*args[0])
handler(*args[0]) handler.run(*args)
except: except:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
elif etype == 'quit': elif etype == 'quit':
@ -466,4 +470,4 @@ class XMLStream(object):
def start_stream_handler(self, xml): def start_stream_handler(self, xml):
"""Meant to be overridden""" """Meant to be overridden"""
pass logging.warn("No start stream handler has been implemented.")