reconnect if session isn't established within 15 seconds

This commit is contained in:
Nathan Fritz 2010-10-20 19:18:27 -07:00
parent 11264fe0a8
commit 77eab6544f
4 changed files with 26 additions and 7 deletions

View file

@ -13,6 +13,7 @@ import base64
import sys import sys
import hashlib import hashlib
import random import random
import threading
from sleekxmpp import plugins from sleekxmpp import plugins
from sleekxmpp import stanza from sleekxmpp import stanza
@ -78,6 +79,9 @@ class ClientXMPP(BaseXMPP):
self.plugin_whitelist = plugin_whitelist self.plugin_whitelist = plugin_whitelist
self.srv_support = SRV_SUPPORT self.srv_support = SRV_SUPPORT
self.session_started_event = threading.Event()
self.session_started_event.clear()
self.stream_header = "<stream:stream to='%s' %s %s version='1.0'>" % ( self.stream_header = "<stream:stream to='%s' %s %s version='1.0'>" % (
self.boundjid.host, self.boundjid.host,
"xmlns:stream='%s'" % self.stream_ns, "xmlns:stream='%s'" % self.stream_ns,
@ -124,6 +128,12 @@ class ClientXMPP(BaseXMPP):
self.sessionstarted = False self.sessionstarted = False
self.bound = False self.bound = False
self.bindfail = False self.bindfail = False
self.schedule("session timeout checker", 15, self._session_timeout_check)
def _session_timeout_check(self):
if not self.session_started_event.isSet():
logging.debug("Session start has taken more than 15 seconds")
self.disconnect(reconnect=self.auto_reconnect)
def connect(self, address=tuple()): def connect(self, address=tuple()):
""" """
@ -136,6 +146,7 @@ class ClientXMPP(BaseXMPP):
Arguments: Arguments:
address -- A tuple containing the server's host and port. address -- A tuple containing the server's host and port.
""" """
self.session_started_event.clear()
if not address or len(address) < 2: if not address or len(address) < 2:
if not self.srv_support: if not self.srv_support:
logging.debug("Did not supply (address, port) to connect" + \ logging.debug("Did not supply (address, port) to connect" + \
@ -374,6 +385,7 @@ class ClientXMPP(BaseXMPP):
if "{%s}session" % session_ns not in self.features or self.bindfail: if "{%s}session" % session_ns not in self.features or self.bindfail:
logging.debug("Established Session") logging.debug("Established Session")
self.sessionstarted = True self.sessionstarted = True
self.session_started_event.set()
self.event("session_start") self.event("session_start")
def _handle_start_session(self, xml): def _handle_start_session(self, xml):
@ -388,6 +400,7 @@ class ClientXMPP(BaseXMPP):
response = iq.send() response = iq.send()
logging.debug("Established Session") logging.debug("Established Session")
self.sessionstarted = True self.sessionstarted = True
self.session_started_event.set()
self.event("session_start") self.event("session_start")
else: else:
# Bind probably hasn't happened yet. # Bind probably hasn't happened yet.

View file

@ -82,18 +82,22 @@ class StateMachine(object):
if not to_state in self.__states: if not to_state in self.__states:
raise ValueError( "StateMachine does not contain to_state %s." % to_state ) raise ValueError( "StateMachine does not contain to_state %s." % to_state )
start = time.time() start = time.time()
while not self.lock.acquire(False): while not self.lock.acquire(False):
time.sleep(.001) time.sleep(.001)
if (start + wait - time.time()) <= 0.0: if (start + wait - time.time()) <= 0.0:
logging.debug("Could not acquire lock")
return False return False
while not self.__current_state in from_states: while not self.__current_state in from_states:
# detect timeout: # detect timeout:
remainder = start + wait - time.time() remainder = start + wait - time.time()
if remainder > 0: self.notifier.wait(remainder) if remainder > 0:
else: return False self.notifier.wait(remainder)
else:
logging.debug("State was not ready")
self.lock.release()
return False
try: # lock is acquired; all other threads will return false or wait until notify/timeout try: # lock is acquired; all other threads will return false or wait until notify/timeout
if self.__current_state in from_states: # should always be True due to lock if self.__current_state in from_states: # should always be True due to lock

View file

@ -137,7 +137,7 @@ class Scheduler(object):
"""Process scheduled tasks.""" """Process scheduled tasks."""
self.run = True self.run = True
try: try:
while self.run: while self.run and (self.parentstop is None or not self.parentstop.isSet()):
wait = 1 wait = 1
updated = False updated = False
if self.schedule: if self.schedule:
@ -168,6 +168,7 @@ class Scheduler(object):
except KeyboardInterrupt: except KeyboardInterrupt:
self.run = False self.run = False
if self.parentstop is not None: if self.parentstop is not None:
logging.debug("stopping parent")
self.parentstop.set() self.parentstop.set()
except SystemExit: except SystemExit:
self.run = False self.run = False

View file

@ -275,6 +275,7 @@ class XMLStream(object):
self.socket = ssl_socket self.socket = ssl_socket
try: try:
logging.debug("Connecting to %s:%s" % self.address)
self.socket.connect(self.address) self.socket.connect(self.address)
self.set_socket(self.socket, ignore=True) self.set_socket(self.socket, ignore=True)
#this event is where you should set your application state #this event is where you should set your application state
@ -328,10 +329,10 @@ class XMLStream(object):
Reset the stream's state and reconnect to the server. Reset the stream's state and reconnect to the server.
""" """
logging.debug("reconnecting...") logging.debug("reconnecting...")
self.state.transition('connected', 'disconnected', wait=0.0, self.state.transition('connected', 'disconnected', wait=2.0,
func=self._disconnect, args=(True,)) func=self._disconnect, args=(True,))
return self.state.transition('disconnected', 'connected', return self.state.transition('disconnected', 'connected',
wait=0.0, func=self._connect) wait=2.0, func=self._connect)
def set_socket(self, socket, ignore=False): def set_socket(self, socket, ignore=False):
""" """
@ -669,7 +670,7 @@ class XMLStream(object):
# The body of this loop will only execute once per connection. # The body of this loop will only execute once per connection.
# Additional passes will be made only if an error occurs and # Additional passes will be made only if an error occurs and
# reconnecting is permitted. # reconnecting is permitted.
while not self.stop.isSet() and firstrun or self.auto_reconnect: while firstrun or (self.auto_reconnect and not self.stop.isSet()):
firstrun = False firstrun = False
try: try:
if self.is_client: if self.is_client: