Merge remote branch 'tom/hacks'

This commit is contained in:
Brian Beggs 2010-06-03 09:54:48 -04:00
commit 71d72f431f
4 changed files with 64 additions and 18 deletions

View file

@ -69,6 +69,8 @@ class ClientXMPP(basexmpp, XMLStream):
#TODO: Use stream state here #TODO: Use stream state here
self.authenticated = False self.authenticated = False
self.sessionstarted = False self.sessionstarted = False
self.bound = False
self.bindfail = False
self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True)) self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True))
self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True)) self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True))
#self.registerHandler(Callback('Roster Update', MatchXMLMask("<presence xmlns='%s' type='subscribe' />" % self.default_ns), self._handlePresenceSubscribe, thread=True)) #self.registerHandler(Callback('Roster Update', MatchXMLMask("<presence xmlns='%s' type='subscribe' />" % self.default_ns), self._handlePresenceSubscribe, thread=True))
@ -146,13 +148,9 @@ class ClientXMPP(basexmpp, XMLStream):
# overriding reconnect and disconnect so that we can get some events # overriding reconnect and disconnect so that we can get some events
# should events be part of or required by xmlstream? Maybe that would be cleaner # should events be part of or required by xmlstream? Maybe that would be cleaner
def reconnect(self): def reconnect(self):
logging.info("Reconnecting") self.disconnect(reconnect=True)
self.event("disconnected")
self.authenticated = False
self.sessionstarted = False
XMLStream.reconnect(self)
def disconnect(self, init=True, close=False, reconnect=False): def disconnect(self, reconnect=False):
self.event("disconnected") self.event("disconnected")
self.authenticated = False self.authenticated = False
self.sessionstarted = False self.sessionstarted = False
@ -248,19 +246,23 @@ class ClientXMPP(basexmpp, XMLStream):
response = iq.send() response = iq.send()
#response = self.send(iq, self.Iq(sid=iq['id'])) #response = self.send(iq, self.Iq(sid=iq['id']))
self.set_jid(response.xml.find('{urn:ietf:params:xml:ns:xmpp-bind}bind/{urn:ietf:params:xml:ns:xmpp-bind}jid').text) self.set_jid(response.xml.find('{urn:ietf:params:xml:ns:xmpp-bind}bind/{urn:ietf:params:xml:ns:xmpp-bind}jid').text)
self.bound = True
logging.info("Node set to: %s" % self.fulljid) logging.info("Node set to: %s" % self.fulljid)
if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features: if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features or self.bindfail:
logging.debug("Established Session") logging.debug("Established Session")
self.sessionstarted = True self.sessionstarted = True
self.event("session_start") self.event("session_start")
def handler_start_session(self, xml): def handler_start_session(self, xml):
if self.authenticated: if self.authenticated and self.bound:
iq = self.makeIqSet(xml) iq = self.makeIqSet(xml)
response = iq.send() response = iq.send()
logging.debug("Established Session") logging.debug("Established Session")
self.sessionstarted = True self.sessionstarted = True
self.event("session_start") self.event("session_start")
else:
#bind probably hasn't happened yet
self.bindfail = True
def _handleRoster(self, iq, request=False): def _handleRoster(self, iq, request=False):
if iq['type'] == 'set' or (iq['type'] == 'result' and request): if iq['type'] == 'set' or (iq['type'] == 'result' and request):

View file

@ -76,7 +76,7 @@ class Scheduler(object):
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
except KeyboardInterrupt: except KeyboardInterrupt:
self.run = False self.run = False
logging.debug("Qutting Scheduler thread") logging.debug("Quitting Scheduler thread")
if self.parentqueue is not None: if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None)) self.parentqueue.put(('quit', None, None))

View file

@ -112,6 +112,13 @@ class StateMachine(object):
self.transition(self.__current_state, self._default_state) self.transition(self.__current_state, self._default_state)
def current_state(self):
'''
Return the current state name.
'''
return self.__current_state
def __getitem__(self, state): def __getitem__(self, state):
''' '''
Non-blocking, non-synchronized test to determine if we are in the given state. Non-blocking, non-synchronized test to determine if we are in the given state.

View file

@ -16,6 +16,7 @@ from . stanzabase import StanzaBase
from xml.etree import cElementTree from xml.etree import cElementTree
from xml.parsers import expat from xml.parsers import expat
import logging import logging
import random
import socket import socket
import threading import threading
import time import time
@ -46,6 +47,10 @@ class CloseStream(Exception):
stanza_extensions = {} stanza_extensions = {}
RECONNECT_MAX_DELAY = 3600
RECONNECT_QUIESCE_FACTOR = 1.6180339887498948 # Phi
RECONNECT_QUIESCE_JITTER = 0.11962656472 # molar Planck constant times c, joule meter/mole
class XMLStream(object): class XMLStream(object):
"A connection manager with XML events." "A connection manager with XML events."
@ -96,12 +101,29 @@ class XMLStream(object):
def connect(self, host='', port=0, use_ssl=None, use_tls=None): def connect(self, host='', port=0, use_ssl=None, use_tls=None):
"Link to connectTCP" "Link to connectTCP"
if self.state.transition('disconnected', 'connecting'): if not self.state.transition('disconnected','connecting'):
return self.connectTCP(host, port, use_ssl, use_tls) logging.warning("Can't connect now; Already in state %s", self.state.current_state())
return False
if not self.connectTCP(host, port, use_ssl, use_tls):
# return to the 'disconnected' state if connect failed:
# otherwise the connect method is not reentrant
if not self.state.transition('connecting','disconnected'):
logging.error("Couldn't transition to the 'disconnected' state!")
return False
return True
# TODO currently a caller can't distinguish between "connection failed" and
# "we're already trying to connect from another thread"
def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True): def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True):
"Connect and create socket" "Connect and create socket"
while reattempt and not self.state['connected']: # the self.state part is redundant.
# Note that this is thread-safe by merit of being called solely from connect() which
# holds the state lock.
delay = 1.0 # reconnection delay
while self.run:
logging.debug('connecting....') logging.debug('connecting....')
try: try:
if host and port: if host and port:
@ -109,27 +131,42 @@ class XMLStream(object):
if use_ssl is not None: if use_ssl is not None:
self.use_ssl = use_ssl self.use_ssl = use_ssl
if use_tls is not None: if use_tls is not None:
# TODO this variable doesn't seem to be used for anything!
self.use_tls = use_tls self.use_tls = use_tls
if sys.version_info < (3, 0): if sys.version_info < (3, 0):
self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM) self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM)
else: else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(None) #10) self.socket.settimeout(None) #10)
if self.use_ssl and self.ssl_support: if self.use_ssl and self.ssl_support:
logging.debug("Socket Wrapped for SSL") logging.debug("Socket Wrapped for SSL")
self.socket = ssl.wrap_socket(self.socket,ca_certs=self.ca_certs) self.socket = ssl.wrap_socket(self.socket,ca_certs=self.ca_certs)
except:
logging.exception("Connection error")
try:
self.socket.connect(self.address) self.socket.connect(self.address)
self.filesocket = self.socket.makefile('rb', 0) self.filesocket = self.socket.makefile('rb', 0)
if not self.state.transition('connecting','connected'): if not self.state.transition('connecting','connected'):
logging.error( "State transition error!!!! Shouldn't have happened" ) logging.error( "State transition error!!!! Shouldn't have happened" )
logging.debug('connect complete.') logging.debug('connect complete.')
return True return True
except socket.error as serr: except socket.error as serr:
logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror)) logging.exception("Socket Error #%s: %s", serr.errno, serr.strerror)
time.sleep(1) # TODO proper quiesce if connection attempt fails if not reattempt: return False
except:
logging.exception("Connection error")
if not reattempt: return False
# quiesce if rconnection fails:
# This algorithm based loosely on Twisted internet.protocol
# http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310
delay = min(delay * RECONNECT_QUIESCE_FACTOR, RECONNECT_MAX_DELAY)
delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER)
logging.debug('Waiting %fs until next reconnect attempt...', delay)
time.sleep(delay)
def connectUnix(self, filepath): def connectUnix(self, filepath):
"Connect to Unix file and create socket" "Connect to Unix file and create socket"