Remove stream feature handlers on session_start.

Based on profiling, using around 35 stream handlers quarters the number
of basic message stanzas that can be processed in a second, in
comparison to only using the bare minimum of four handlers.

To help, we can drop handlers for stream features once the session
has started. So that we can re-enable these handlers when a stream
must restart, the 'stream_start' event has been added which fires
whenever a stream header is received.

The 'stream_start' event is a more generic replacement for the
existing start_stream_handler() method.
This commit is contained in:
Lance Stout 2012-01-17 22:14:24 -08:00
parent a4b27ff031
commit 4274f49ada
5 changed files with 43 additions and 13 deletions

View file

@ -98,13 +98,11 @@ class ClientXMPP(BaseXMPP):
self.add_event_handler('connected', self._handle_connected) self.add_event_handler('connected', self._handle_connected)
self.add_event_handler('session_bind', self._handle_session_bind) self.add_event_handler('session_bind', self._handle_session_bind)
self.add_event_handler('stream_start', self._handle_stream_start)
self.add_event_handler('session_start', self._handle_session_start)
self.register_stanza(StreamFeatures) self.register_stanza(StreamFeatures)
self.register_handler(
Callback('Stream Features',
MatchXPath('{%s}features' % self.stream_ns),
self._handle_stream_features))
self.register_handler( self.register_handler(
Callback('Roster Update', Callback('Roster Update',
MatchXPath('{%s}iq/{%s}query' % ( MatchXPath('{%s}iq/{%s}query' % (
@ -119,6 +117,15 @@ class ClientXMPP(BaseXMPP):
self.register_plugin('feature_mechanisms', self.register_plugin('feature_mechanisms',
pconfig={'use_mech': sasl_mech} if sasl_mech else None) pconfig={'use_mech': sasl_mech} if sasl_mech else None)
def _handle_stream_start(self, root):
self.register_handler(
Callback('Stream Features',
MatchXPath('{%s}features' % self.stream_ns),
self._handle_stream_features))
def _handle_session_start(self, e):
self.remove_handler('Stream Features')
def connect(self, address=tuple(), reattempt=True, def connect(self, address=tuple(), reattempt=True,
use_tls=True, use_ssl=False): use_tls=True, use_ssl=False):
"""Connect to the XMPP server. """Connect to the XMPP server.

View file

@ -63,6 +63,17 @@ class feature_mechanisms(base_plugin):
self.xmpp.register_stanza(stanza.Challenge) self.xmpp.register_stanza(stanza.Challenge)
self.xmpp.register_stanza(stanza.Response) self.xmpp.register_stanza(stanza.Response)
self.xmpp.add_event_handler('stream_start',
self._handle_stream_start)
self.xmpp.add_event_handler('session_start',
self._handle_session_start)
self.xmpp.register_feature('mechanisms',
self._handle_sasl_auth,
restart=True,
order=self.config.get('order', 100))
def _handle_stream_start(self, root):
self.xmpp.register_handler( self.xmpp.register_handler(
Callback('SASL Success', Callback('SASL Success',
MatchXPath(stanza.Success.tag_name()), MatchXPath(stanza.Success.tag_name()),
@ -80,10 +91,10 @@ class feature_mechanisms(base_plugin):
MatchXPath(stanza.Challenge.tag_name()), MatchXPath(stanza.Challenge.tag_name()),
self._handle_challenge)) self._handle_challenge))
self.xmpp.register_feature('mechanisms', def _handle_session_start(self, e):
self._handle_sasl_auth, self.xmpp.remove_handler('SASL Success')
restart=True, self.xmpp.remove_handler('SASL Failure')
order=self.config.get('order', 100)) self.xmpp.remove_handler('SASL Challenge')
def _handle_sasl_auth(self, features): def _handle_sasl_auth(self, features):
""" """

View file

@ -27,20 +27,30 @@ class feature_starttls(base_plugin):
self.description = "STARTTLS Stream Feature" self.description = "STARTTLS Stream Feature"
self.stanza = stanza self.stanza = stanza
self.xmpp.register_handler(
Callback('STARTTLS Proceed',
MatchXPath(stanza.Proceed.tag_name()),
self._handle_starttls_proceed,
instream=True))
self.xmpp.register_feature('starttls', self.xmpp.register_feature('starttls',
self._handle_starttls, self._handle_starttls,
restart=True, restart=True,
order=self.config.get('order', 0)) order=self.config.get('order', 0))
self.xmpp.add_event_handler('stream_start',
self._handle_stream_start)
self.xmpp.add_event_handler('session_start',
self._handle_session_start)
self.xmpp.register_stanza(stanza.Proceed) self.xmpp.register_stanza(stanza.Proceed)
self.xmpp.register_stanza(stanza.Failure) self.xmpp.register_stanza(stanza.Failure)
register_stanza_plugin(StreamFeatures, stanza.STARTTLS) register_stanza_plugin(StreamFeatures, stanza.STARTTLS)
def _handle_stream_start(self, root):
self.xmpp.register_handler(
Callback('STARTTLS Proceed',
MatchXPath(stanza.Proceed.tag_name()),
self._handle_starttls_proceed,
instream=True))
def _handle_session_start(self, e):
self.xmpp.remove_handler('STARTTLS Proceed')
def _handle_starttls(self, features): def _handle_starttls(self, features):
""" """
Handle notification that the server supports TLS. Handle notification that the server supports TLS.

View file

@ -367,6 +367,7 @@ class SleekTest(unittest.TestCase):
if skip: if skip:
if socket != 'live': if socket != 'live':
# Mark send queue as usable # Mark send queue as usable
self.xmpp.event('session_start')
self.xmpp.session_started_event.set() self.xmpp.session_started_event.set()
# Clear startup stanzas # Clear startup stanzas
self.xmpp.socket.next_sent(timeout=1) self.xmpp.socket.next_sent(timeout=1)

View file

@ -1223,6 +1223,7 @@ class XMLStream(object):
# as handshakes. # as handshakes.
self.stream_end_event.clear() self.stream_end_event.clear()
self.start_stream_handler(root) self.start_stream_handler(root)
self.event('stream_start', root, direct=True)
depth += 1 depth += 1
if event == b'end': if event == b'end':
depth -= 1 depth -= 1