From e1aa4d0b93f09b92506af8bb21e87c367905e194 Mon Sep 17 00:00:00 2001 From: Brian Beggs Date: Wed, 2 Jun 2010 19:34:43 +0800 Subject: [PATCH 01/10] Added .pydevproject to the .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 15997ce..8b41dac 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ .project build/ *.swp +.pydevproject From 184f7cb8a4d2835c317eca7db129fb06d92a336a Mon Sep 17 00:00:00 2001 From: Brian Beggs Date: Wed, 2 Jun 2010 20:28:49 +0800 Subject: [PATCH 02/10] moddified plugin loading so plugins located outside of the plugins directory in sleek may be loaded. Added optional argument pluginModule that is a string that represents the module the desired plugin should be loaded from. An exception on plugin loading now also will not cause the program to exit. The exception is caught and loading of other plugins contains. --- .gitignore | 1 + sleekxmpp/basexmpp.py | 17 ++++++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 8b41dac..6257bbf 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ build/ *.swp .pydevproject +.settings diff --git a/sleekxmpp/basexmpp.py b/sleekxmpp/basexmpp.py index a916fe8..5f33017 100644 --- a/sleekxmpp/basexmpp.py +++ b/sleekxmpp/basexmpp.py @@ -1,9 +1,9 @@ """ - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz - This file is part of SleekXMPP. + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. - See the file license.txt for copying permission. + See the file license.txt for copying permission. """ from __future__ import with_statement, unicode_literals @@ -91,15 +91,18 @@ class basexmpp(object): if not self.plugin[idx].post_inited: self.plugin[idx].post_init() return super(basexmpp, self).process(*args, **kwargs) - def registerPlugin(self, plugin, pconfig = {}): + def registerPlugin(self, plugin, pconfig = {}, pluginModule = None): """Register a plugin not in plugins.__init__.__all__ but in the plugins directory.""" # discover relative "path" to the plugins module from the main app, and import it. # TODO: # gross, this probably isn't necessary anymore, especially for an installed module - __import__("%s.%s" % (globals()['plugins'].__name__, plugin)) + if pluginModule: + module = __import__("%s.%s" % (pluginModule, plugin), globals(), locals(), [plugin]) + else: + module = __import__("%s.%s" % (globals()['plugins'].__name__, plugin), globals(), locals(), [plugin]) # init the plugin class - self.plugin[plugin] = getattr(getattr(plugins, plugin), plugin)(self, pconfig) # eek + self.plugin[plugin] = getattr(module, plugin)(self, pconfig) # eek # all of this for a nice debug? sure. xep = '' if hasattr(self.plugin[plugin], 'xep'): From 2a43f59a588af7206b0d1935e85fbb9577b3c200 Mon Sep 17 00:00:00 2001 From: Brian Beggs Date: Wed, 2 Jun 2010 20:45:42 +0800 Subject: [PATCH 03/10] added try/catch block to plugin loading --- sleekxmpp/basexmpp.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/sleekxmpp/basexmpp.py b/sleekxmpp/basexmpp.py index 5f33017..b78c255 100644 --- a/sleekxmpp/basexmpp.py +++ b/sleekxmpp/basexmpp.py @@ -97,18 +97,21 @@ class basexmpp(object): # discover relative "path" to the plugins module from the main app, and import it. # TODO: # gross, this probably isn't necessary anymore, especially for an installed module - if pluginModule: - module = __import__("%s.%s" % (pluginModule, plugin), globals(), locals(), [plugin]) - else: - module = __import__("%s.%s" % (globals()['plugins'].__name__, plugin), globals(), locals(), [plugin]) - # init the plugin class - self.plugin[plugin] = getattr(module, plugin)(self, pconfig) # eek - # all of this for a nice debug? sure. - xep = '' - if hasattr(self.plugin[plugin], 'xep'): - xep = "(XEP-%s) " % self.plugin[plugin].xep - logging.debug("Loaded Plugin %s%s" % (xep, self.plugin[plugin].description)) - + try: + if pluginModule: + module = __import__(pluginModule, globals(), locals(), [plugin]) + else: + module = __import__("%s.%s" % (globals()['plugins'].__name__, plugin), globals(), locals(), [plugin]) + # init the plugin class + self.plugin[plugin] = getattr(module, plugin)(self, pconfig) # eek + # all of this for a nice debug? sure. + xep = '' + if hasattr(self.plugin[plugin], 'xep'): + xep = "(XEP-%s) " % self.plugin[plugin].xep + logging.debug("Loaded Plugin %s%s" % (xep, self.plugin[plugin].description)) + except: + logging.error("Unable to load plugin: %s" %(plugin) ) + def register_plugins(self): """Initiates all plugins in the plugins/__init__.__all__""" if self.plugin_whitelist: From b03e6168a8131330390df35572345bfe07ba73fa Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Wed, 2 Jun 2010 12:40:52 +0800 Subject: [PATCH 04/10] if binding and session are advertised in the same go, do session first --- sleekxmpp/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index 6995f7c..9297ad3 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -69,6 +69,7 @@ class ClientXMPP(basexmpp, XMLStream): #TODO: Use stream state here self.authenticated = False self.sessionstarted = False + self.bound = False self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True)) self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True)) #self.registerHandler(Callback('Roster Update', MatchXMLMask("" % self.default_ns), self._handlePresenceSubscribe, thread=True)) @@ -248,6 +249,7 @@ class ClientXMPP(basexmpp, XMLStream): response = iq.send() #response = self.send(iq, self.Iq(sid=iq['id'])) self.set_jid(response.xml.find('{urn:ietf:params:xml:ns:xmpp-bind}bind/{urn:ietf:params:xml:ns:xmpp-bind}jid').text) + self.bound = True logging.info("Node set to: %s" % self.fulljid) if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features: logging.debug("Established Session") @@ -255,7 +257,7 @@ class ClientXMPP(basexmpp, XMLStream): self.event("session_start") def handler_start_session(self, xml): - if self.authenticated: + if self.authenticated and self.bound: iq = self.makeIqSet(xml) response = iq.send() logging.debug("Established Session") From 85a2715c7de1f436b5553a73f74c30cffa8bc04f Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Wed, 2 Jun 2010 12:44:54 +0800 Subject: [PATCH 05/10] hack fix for session before bind --- sleekxmpp/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index 9297ad3..23e8c9b 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -70,6 +70,7 @@ class ClientXMPP(basexmpp, XMLStream): self.authenticated = False self.sessionstarted = False self.bound = False + self.bindfail = False self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True)) self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True)) #self.registerHandler(Callback('Roster Update', MatchXMLMask("" % self.default_ns), self._handlePresenceSubscribe, thread=True)) @@ -251,7 +252,7 @@ class ClientXMPP(basexmpp, XMLStream): self.set_jid(response.xml.find('{urn:ietf:params:xml:ns:xmpp-bind}bind/{urn:ietf:params:xml:ns:xmpp-bind}jid').text) self.bound = True logging.info("Node set to: %s" % self.fulljid) - if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features: + if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features or self.bindfail: logging.debug("Established Session") self.sessionstarted = True self.event("session_start") @@ -263,6 +264,9 @@ class ClientXMPP(basexmpp, XMLStream): logging.debug("Established Session") self.sessionstarted = True self.event("session_start") + else: + #bind probably hasn't happened yet + self.bindfail = True def _handleRoster(self, iq, request=False): if iq['type'] == 'set' or (iq['type'] == 'result' and request): From 8227affd7f30a5fb8c6b215d00e42bc15df49d36 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Wed, 2 Jun 2010 14:17:36 -0400 Subject: [PATCH 06/10] removed unnecessary flags and arguments from disconnect method --- sleekxmpp/__init__.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index 6995f7c..56ba45e 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -146,13 +146,9 @@ class ClientXMPP(basexmpp, XMLStream): # overriding reconnect and disconnect so that we can get some events # should events be part of or required by xmlstream? Maybe that would be cleaner def reconnect(self): - logging.info("Reconnecting") - self.event("disconnected") - self.authenticated = False - self.sessionstarted = False - XMLStream.reconnect(self) + self.disconnect(reconnect=True) - def disconnect(self, init=True, close=False, reconnect=False): + def disconnect(self, reconnect=False): self.event("disconnected") self.authenticated = False self.sessionstarted = False From 4295a66c70e595d02e14f384432d8eee8dfef013 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Wed, 2 Jun 2010 14:18:09 -0400 Subject: [PATCH 07/10] reconnection quiesce logic --- sleekxmpp/xmlstream/xmlstream.py | 36 ++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 3bcb341..e0b3a23 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -16,6 +16,7 @@ from . stanzabase import StanzaBase from xml.etree import cElementTree from xml.parsers import expat import logging +import random import socket import threading import time @@ -46,6 +47,10 @@ class CloseStream(Exception): stanza_extensions = {} +RECONNECT_MAX_DELAY = 3600 +RECONNECT_QUIESCE_FACTOR = 1.6180339887498948 # Phi +RECONNECT_QUIESCE_JITTER = 0.11962656472 # molar Planck constant times c, joule meter/mole + class XMLStream(object): "A connection manager with XML events." @@ -101,7 +106,12 @@ class XMLStream(object): def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True): "Connect and create socket" - while reattempt and not self.state['connected']: # the self.state part is redundant. + + # Note that this is thread-safe by merit of being called solely from connect() which + # holds the state lock. + + delay = 1.0 # reconnection delay + while self.run: logging.debug('connecting....') try: if host and port: @@ -115,21 +125,35 @@ class XMLStream(object): else: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(None) #10) + if self.use_ssl and self.ssl_support: logging.debug("Socket Wrapped for SSL") self.socket = ssl.wrap_socket(self.socket,ca_certs=self.ca_certs) - except: - logging.exception("Connection error") - try: + self.socket.connect(self.address) self.filesocket = self.socket.makefile('rb', 0) + if not self.state.transition('connecting','connected'): logging.error( "State transition error!!!! Shouldn't have happened" ) logging.debug('connect complete.') return True + except socket.error as serr: - logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror)) - time.sleep(1) # TODO proper quiesce if connection attempt fails + logging.exception("Socket Error #%s: %s", serr.errno, serr.strerror) + if not reattempt: return False + except: + logging.exception("Connection error") + if not reattempt: return False + + # quiesce if rconnection fails: + # This code based loosely on Twisted internet.protocol + # http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310 + delay = min(delay * RECONNECT_QUIESCE_JITTER, RECONNECT_MAX_DELAY) + delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER) + logging.debug('Waiting %fs until next reconnect attempt...', delay) + time.sleep(delay) + + def connectUnix(self, filepath): "Connect to Unix file and create socket" From 1c32668e18c0cbf840694c66a6f724b2d3cb8f29 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Thu, 3 Jun 2010 07:47:27 -0400 Subject: [PATCH 08/10] fixed quiesce algorithm; state transition if connect fails; note about use_tls instance variable. --- sleekxmpp/xmlstream/xmlstream.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index e0b3a23..af95c2b 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -101,8 +101,16 @@ class XMLStream(object): def connect(self, host='', port=0, use_ssl=None, use_tls=None): "Link to connectTCP" - if self.state.transition('disconnected', 'connecting'): - return self.connectTCP(host, port, use_ssl, use_tls) + if not self.connectTCP(host, port, use_ssl, use_tls): + # return to the 'disconnected' state if connect failed: + # otherwise the connect method is not reentrant + if not self.state.transition('connecting','disconnected'): + logging.error("Couldn't transition to the 'disconnected' state!") + return False + return True + + # TODO currently a caller can't distinguish between "connection failed" and + # "we're already trying to connect from another thread" def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True): "Connect and create socket" @@ -119,6 +127,7 @@ class XMLStream(object): if use_ssl is not None: self.use_ssl = use_ssl if use_tls is not None: + # TODO this variable doesn't seem to be used for anything! self.use_tls = use_tls if sys.version_info < (3, 0): self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM) @@ -146,9 +155,9 @@ class XMLStream(object): if not reattempt: return False # quiesce if rconnection fails: - # This code based loosely on Twisted internet.protocol + # This algorithm based loosely on Twisted internet.protocol # http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310 - delay = min(delay * RECONNECT_QUIESCE_JITTER, RECONNECT_MAX_DELAY) + delay = min(delay * RECONNECT_QUIESCE_FACTOR, RECONNECT_MAX_DELAY) delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER) logging.debug('Waiting %fs until next reconnect attempt...', delay) time.sleep(delay) From 2f0f18a8c62db21fa3ba8cc87a6d190aca205c30 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Thu, 3 Jun 2010 08:07:56 -0400 Subject: [PATCH 09/10] added function to retrieve the current state --- sleekxmpp/xmlstream/statemachine.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sleekxmpp/xmlstream/statemachine.py b/sleekxmpp/xmlstream/statemachine.py index c5f5176..065d579 100644 --- a/sleekxmpp/xmlstream/statemachine.py +++ b/sleekxmpp/xmlstream/statemachine.py @@ -110,7 +110,14 @@ class StateMachine(object): def reset(self): # TODO need to lock before calling this? self.transition(self.__current_state, self._default_state) - + + + def current_state(self): + ''' + Return the current state name. + ''' + return self.__current_state + def __getitem__(self, state): ''' From da6e1e47dc81f5f9579201644d7c18dd85510368 Mon Sep 17 00:00:00 2001 From: Thom Nichols Date: Thu, 3 Jun 2010 08:09:09 -0400 Subject: [PATCH 10/10] whups, somehow I lost the 'connecting' lock in connect() --- sleekxmpp/xmlstream/xmlstream.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index af95c2b..76aecee 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -101,6 +101,10 @@ class XMLStream(object): def connect(self, host='', port=0, use_ssl=None, use_tls=None): "Link to connectTCP" + if not self.state.transition('disconnected','connecting'): + logging.warning("Can't connect now; Already in state %s", self.state.current_state()) + return False + if not self.connectTCP(host, port, use_ssl, use_tls): # return to the 'disconnected' state if connect failed: # otherwise the connect method is not reentrant