Made first pass at cleaning up ClientXMPP.

Added self.stream_ns to BaseXMPP.
Moved connected/disconnected events and logging to XMLStream.
This commit is contained in:
Lance Stout 2010-10-06 14:03:19 -04:00
parent a7410f2146
commit e1866ab328
4 changed files with 358 additions and 217 deletions

View file

@ -1,5 +1,3 @@
#!/usr/bin/env python
""" """
SleekXMPP: The Sleek XMPP Library SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz Copyright (C) 2010 Nathanael C. Fritz
@ -7,245 +5,384 @@
See the file LICENSE for copying permission. See the file LICENSE for copying permission.
""" """
from __future__ import absolute_import, unicode_literals from __future__ import absolute_import, unicode_literals
from . basexmpp import BaseXMPP, basexmpp
from xml.etree import cElementTree as ET
from . xmlstream.xmlstream import XMLStream
from . xmlstream.xmlstream import RestartStream
from . xmlstream.matcher.xmlmask import MatchXMLMask
from . xmlstream.matcher.xpath import MatchXPath
from . xmlstream.matcher.many import MatchMany
from . xmlstream.handler.callback import Callback
from . xmlstream.stanzabase import StanzaBase
from . xmlstream import xmlstream as xmlstreammod
from . stanza.message import Message
from . stanza.iq import Iq
import time
import logging import logging
import base64 import base64
import sys import sys
import hashlib
import random import random
import copy
from . import plugins from sleekxmpp import plugins
#from . import stanza from sleekxmpp import stanza
srvsupport = True from sleekxmpp.basexmpp import BaseXMPP
from sleekxmpp.stanza import Message, Presence, Iq
from sleekxmpp.xmlstream import XMLStream, RestartStream
from sleekxmpp.xmlstream.matcher import *
from sleekxmpp.xmlstream.handler import *
from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET
# Flag indicating if DNS SRV records are available for use.
SRV_SUPPORT = True
try: try:
import dns.resolver import dns.resolver
except ImportError: except:
srvsupport = False SRV_SUPPORT = False
#class PresenceStanzaType(object):
#
# def fromXML(self, xml):
# self.ptype = xml.get('type')
class ClientXMPP(BaseXMPP): class ClientXMPP(BaseXMPP):
"""SleekXMPP's client class. Use only for good, not evil."""
def __init__(self, jid, password, ssl=False, plugin_config = {}, plugin_whitelist=[], escape_quotes=True): """
BaseXMPP.__init__(self, 'jabber:client') SleekXMPP's client class.
global srvsupport
self.default_ns = 'jabber:client'
self.plugin_config = plugin_config
self.escape_quotes = escape_quotes
self.set_jid(jid)
self.plugin_whitelist = plugin_whitelist
self.auto_reconnect = True
self.srvsupport = srvsupport
self.password = password
self.registered_features = []
self.stream_header = """<stream:stream to='%s' xmlns:stream='http://etherx.jabber.org/streams' xmlns='%s' version='1.0'>""" % (self.server,self.default_ns)
self.stream_footer = "</stream:stream>"
#self.map_namespace('http://etherx.jabber.org/streams', 'stream')
#self.map_namespace('jabber:client', '')
self.features = []
#TODO: Use stream state here
self.authenticated = False
self.sessionstarted = False
self.bound = False
self.bindfail = False
self.is_component = False
self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures))
self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster))
#self.registerHandler(Callback('Roster Update', MatchXMLMask("<presence xmlns='%s' type='subscribe' />" % self.default_ns), self._handlePresenceSubscribe, thread=True))
self.registerFeature("<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls' />", self.handler_starttls, True)
self.registerFeature("<mechanisms xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_sasl_auth, True)
self.registerFeature("<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind' />", self.handler_bind_resource)
self.registerFeature("<session xmlns='urn:ietf:params:xml:ns:xmpp-session' />", self.handler_start_session)
#self.registerStanzaExtension('PresenceStanza', PresenceStanzaType) Use only for good, not for evil.
#self.register_plugins()
def connect(self, address=tuple()): Attributes:
"""Connect to the Jabber Server. Attempts SRV lookup, and if it fails, uses
the JID server.""" Methods:
if not address or len(address) < 2: connect -- Overrides XMLStream.connect.
if not self.srvsupport: del_roster_item -- Delete a roster item.
logging.debug("Did not supply (address, port) to connect to and no SRV support is installed (http://www.dnspython.org). Continuing to attempt connection, using server hostname from JID.") get_roster -- Retrieve the roster from the server.
else: register_feature -- Register a stream feature.
logging.debug("Since no address is supplied, attempting SRV lookup.") update_roster -- Update a roster item.
try: """
answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.server, dns.rdatatype.SRV)
except dns.resolver.NXDOMAIN: def __init__(self, jid, password, ssl=False, plugin_config={},
logging.debug("No appropriate SRV record found. Using JID server name.") plugin_whitelist=[], escape_quotes=True):
else: """
# pick a random answer, weighted by priority Create a new SleekXMPP client.
# there are less verbose ways of doing this (random.choice() with answer * priority), but I chose this way anyway
# suggestions are welcome Arguments:
addresses = {} jid -- The JID of the XMPP user account.
intmax = 0 password -- The password for the XMPP user account.
priorities = [] ssl -- Deprecated.
for answer in answers: plugin_config -- A dictionary of plugin configurations.
intmax += answer.priority plugin_whitelist -- A list of approved plugins that will be loaded
addresses[intmax] = (answer.target.to_text()[:-1], answer.port) when calling register_plugins.
priorities.append(intmax) # sure, I could just do priorities = addresses.keys()\n priorities.sort() escape_quotes -- Deprecated.
picked = random.randint(0, intmax) """
for priority in priorities: BaseXMPP.__init__(self, 'jabber:client')
if picked <= priority:
address = addresses[priority] # To comply with PEP8, method names now use underscores.
break # Deprecated method names are re-mapped for backwards compatibility.
if not address: self.updateRoster = self.update_roster
# if all else fails take server from JID. self.delRosterItem = self.del_roster_item
address = (self.server, 5222) self.getRoster = self.get_roster
result = XMLStream.connect(self, address[0], address[1], use_tls=True) self.registerFeature = self.register_feature
if result:
self.event("connected") self.set_jid(jid)
self.password = password
self.escape_quotes = escape_quotes
self.plugin_config = plugin_config
self.plugin_whitelist = plugin_whitelist
self.srv_support = SRV_SUPPORT
self.stream_header = "<stream:stream to='%s' %s %s version='1.0'>" % (
self.server,
"xmlns:stream='%s'" % self.stream_ns,
"xmlns='%s'" % self.default_ns)
self.stream_footer = "</stream:stream>"
self.features = []
self.registered_features = []
#TODO: Use stream state here
self.authenticated = False
self.sessionstarted = False
self.bound = False
self.bindfail = False
self.registerHandler(
Callback('Stream Features',
MatchXPath('{%s}features' % self.stream_ns),
self._handle_stream_features))
self.registerHandler(
Callback('Roster Update',
MatchXPath('{%s}iq/{%s}query' % (
self.default_ns,
'jabber:iq:roster')),
self._handle_roster))
self.registerFeature(
"<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls' />",
self._handle_starttls, True)
self.registerFeature(
"<mechanisms xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />",
self._handle_sasl_auth, True)
self.registerFeature(
"<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind' />",
self._handle_bind_resource)
self.registerFeature(
"<session xmlns='urn:ietf:params:xml:ns:xmpp-session' />",
self._handle_start_session)
def connect(self, address=tuple()):
"""
Connect to the XMPP server.
When no address is given, a SRV lookup for the server will
be attempted. If that fails, the server user in the JID
will be used.
Arguments:
address -- A tuple containing the server's host and port.
"""
if not address or len(address) < 2:
if not self.srv_support:
logging.debug("Did not supply (address, port) to connect" + \
" to and no SRV support is installed" + \
" (http://www.dnspython.org)." + \
" Continuing to attempt connection, using" + \
" server hostname from JID.")
else:
logging.debug("Since no address is supplied," + \
"attempting SRV lookup.")
try:
xmpp_srv = "_xmpp-client._tcp.%s" % self.server
answers = dns.resolver.query(xmpp_srv, dns.rdatatype.SRV)
except dns.resolver.NXDOMAIN:
logging.debug("No appropriate SRV record found." + \
" Using JID server name.")
else: else:
logging.warning("Failed to connect") # Pick a random server, weighted by priority.
self.event("disconnected")
return result
# overriding reconnect and disconnect so that we can get some events addresses = {}
# should events be part of or required by xmlstream? Maybe that would be cleaner intmax = 0
def reconnect(self): for answer in answers:
logging.info("Reconnecting") intmax += answer.priority
self.event("disconnected") addresses[intmax] = (answer.target.to_text()[:-1],
XMLStream.reconnect(self) answer.port)
priorities = addresses.keys()
priorities.sort()
def disconnect(self, init=True, close=False, reconnect=False): picked = random.randint(0, intmax)
self.event("disconnected") for priority in priorities:
XMLStream.disconnect(self, reconnect) if picked <= priority:
address = addresses[priority]
break
def registerFeature(self, mask, pointer, breaker = False): if not address:
"""Register a stream feature.""" # If all else fails, use the server from the JID.
self.registered_features.append((MatchXMLMask(mask), pointer, breaker)) address = (self.server, 5222)
def updateRoster(self, jid, name=None, subscription=None, groups=[]): return XMLStream.connect(self, address[0], address[1], use_tls=True)
"""Add or change a roster item."""
iq = self.Iq().setStanzaValues({'type': 'set'})
iq['roster']['items'] = {jid: {'name': name, 'subscription': subscription, 'groups': groups}}
#self.send(iq, self.Iq().setValues({'id': iq['id']}))
r = iq.send()
return r['type'] == 'result'
def delRosterItem(self, jid): def register_feature(self, mask, pointer, breaker=False):
iq = self.Iq() """
iq['type'] = 'set' Register a stream feature.
iq['roster']['items'] = {jid: {'subscription': 'remove'}}
return iq.send()['type'] == 'result'
def getRoster(self): Arguments:
"""Request the roster be sent.""" mask -- An XML string matching the feature's element.
iq = self.Iq().setStanzaValues({'type': 'get'}).enable('roster').send() pointer -- The function to execute if the feature is received.
self._handleRoster(iq, request=True) breaker -- Indicates if feature processing should halt with
this feature. Defaults to False.
"""
self.registered_features.append((MatchXMLMask(mask),
pointer,
breaker))
def _handleStreamFeatures(self, features): def update_roster(self, jid, name=None, subscription=None, groups=[]):
self.features = [] """
for sub in features.xml: Add or change a roster item.
self.features.append(sub.tag)
for subelement in features.xml:
for feature in self.registered_features:
if feature[0].match(subelement):
#if self.maskcmp(subelement, feature[0], True):
if feature[1](subelement) and feature[2]: #if breaker, don't continue
return True
def handler_starttls(self, xml): Arguments:
if not self.authenticated and self.ssl_support: jid -- The JID of the entry to modify.
self.add_handler("<proceed xmlns='urn:ietf:params:xml:ns:xmpp-tls' />", self.handler_tls_start, name='TLS Proceed', instream=True) name -- The user's nickname for this JID.
self.sendXML(xml) subscription -- The subscription status. May be one of
'to', 'from', 'both', or 'none'. If set
to 'remove', the entry will be deleted.
groups -- The roster groups that contain this item.
"""
iq = self.Iq().setStanzaValues({'type': 'set'})
iq['roster']['items'] = {jid: {'name': name,
'subscription': subscription,
'groups': groups}}
resp = iq.send()
return resp['type'] == 'result'
def del_roster_item(self, jid):
"""
Remove an item from the roster by setting its subscription
status to 'remove'.
Arguments:
jid -- The JID of the item to remove.
"""
return self.update_roster(jid, subscription='remove')
def get_roster(self):
"""Request the roster from the server."""
iq = self.Iq().setStanzaValues({'type': 'get'}).enable('roster')
iq.send()
self._handle_roster(iq, request=True)
def _handle_stream_features(self, features):
"""
Process the received stream features.
Arguments:
features -- The features stanza.
"""
# Record all of the features.
self.features = []
for sub in features.xml:
self.features.append(sub.tag)
# Process the features.
for sub in features.xml:
for feature in self.registered_features:
mask, handler, halt = feature
if mask.match(sub):
if handler(sub) and halt:
# Don't continue if the feature was
# marked as a breaker.
return True return True
def _handle_starttls(self, xml):
if not self.authenticated and self.ssl_support:
tls_ns = 'urn:ietf:params:xml:ns:xmpp-tls'
self.add_handler("<proceed xmlns='%s' />" % tls_ns,
self._handle_tls_start,
name='TLS Proceed',
instream=True)
self.send_xml(xml)
return True
else:
logging.warning("The module tlslite is required to log in" +\
" to some servers, and has not been found.")
return False
def _handle_tls_start(self, xml):
logging.debug("Starting TLS")
if self.start_tls():
raise RestartStream()
def _handle_sasl_auth(self, xml):
if '{urn:ietf:params:xml:ns:xmpp-tls}starttls' in self.features:
return False
logging.debug("Starting SASL Auth")
sasl_ns = 'urn:ietf:params:xml:ns:xmpp-sasl'
self.add_handler("<success xmlns='%s' />" % sasl_ns,
self._handle_auth_success,
name='SASL Sucess',
instream=True)
self.add_handler("<failure xmlns='%s' />" % sasl_ns,
self._handle_auth_fail,
name='SASL Failure',
instream=True)
sasl_mechs = xml.findall('{%s}mechanism' % sasl_ns)
if sasl_mechs:
for sasl_mech in sasl_mechs:
self.features.append("sasl:%s" % sasl_mech.text)
if 'sasl:PLAIN' in self.features:
if sys.version_info < (3, 0):
user = bytes(self.username)
password = bytes(self.password)
else: else:
logging.warning("The module tlslite is required in to some servers, and has not been found.") user = bytes(self.username, 'utf-8')
return False password = bytes(self.password, 'utf-8')
def handler_tls_start(self, xml): auth = base64.b64encode(b'\x00' + user + \
logging.debug("Starting TLS") b'\x00' + password).decode('utf-8')
if self.startTLS():
raise RestartStream()
def handler_sasl_auth(self, xml): self.send("<auth xmlns='%s' mechanism='PLAIN'>%s</auth>" % (
if '{urn:ietf:params:xml:ns:xmpp-tls}starttls' in self.features: sasl_ns,
return False auth))
logging.debug("Starting SASL Auth") else:
self.add_handler("<success xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_auth_success, name='SASL Sucess', instream=True) logging.error("No appropriate login method.")
self.add_handler("<failure xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_auth_fail, name='SASL Failure', instream=True)
sasl_mechs = xml.findall('{urn:ietf:params:xml:ns:xmpp-sasl}mechanism')
if len(sasl_mechs):
for sasl_mech in sasl_mechs:
self.features.append("sasl:%s" % sasl_mech.text)
if 'sasl:PLAIN' in self.features:
if sys.version_info < (3,0):
self.send("""<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>""" % base64.b64encode(b'\x00' + bytes(self.username) + b'\x00' + bytes(self.password)).decode('utf-8'))
else:
self.send("""<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>""" % base64.b64encode(b'\x00' + bytes(self.username, 'utf-8') + b'\x00' + bytes(self.password, 'utf-8')).decode('utf-8'))
else:
logging.error("No appropriate login method.")
self.disconnect()
#if 'sasl:DIGEST-MD5' in self.features:
# self._auth_digestmd5()
return True
def handler_auth_success(self, xml):
self.authenticated = True
self.features = []
raise RestartStream()
def handler_auth_fail(self, xml):
logging.info("Authentication failed.")
self.disconnect() self.disconnect()
self.event("failed_auth") return True
def handler_bind_resource(self, xml): def _handle_auth_success(self, xml):
logging.debug("Requesting resource: %s" % self.resource) """
xml.clear() SASL authentication succeeded. Restart the stream.
iq = self.Iq(stype='set')
if self.resource:
res = ET.Element('resource')
res.text = self.resource
xml.append(res)
iq.append(xml)
response = iq.send()
#response = self.send(iq, self.Iq(sid=iq['id']))
self.set_jid(response.xml.find('{urn:ietf:params:xml:ns:xmpp-bind}bind/{urn:ietf:params:xml:ns:xmpp-bind}jid').text)
self.bound = True
logging.info("Node set to: %s" % self.fulljid)
if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features or self.bindfail:
logging.debug("Established Session")
self.sessionstarted = True
self.event("session_start")
def handler_start_session(self, xml): Arguments:
if self.authenticated and self.bound: xml -- The SASL authentication success element.
iq = self.makeIqSet(xml) """
response = iq.send() self.authenticated = True
logging.debug("Established Session") self.features = []
self.sessionstarted = True raise RestartStream()
self.event("session_start")
else:
#bind probably hasn't happened yet
self.bindfail = True
def _handleRoster(self, iq, request=False): def _handle_auth_fail(self, xml):
if iq['type'] == 'set' or (iq['type'] == 'result' and request): """
for jid in iq['roster']['items']: SASL authentication failed. Disconnect and shutdown.
if not jid in self.roster:
self.roster[jid] = {'groups': [], 'name': '', 'subscription': 'none', 'presence': {}, 'in_roster': True} Arguments:
self.roster[jid].update(iq['roster']['items'][jid]) xml -- The SASL authentication failure element.
if iq['type'] == 'set': """
self.send(self.Iq().setStanzaValues({'type': 'result', 'id': iq['id']}).enable('roster')) logging.info("Authentication failed.")
self.event("roster_update", iq) self.disconnect()
self.event("failed_auth")
def _handle_bind_resource(self, xml):
"""
Handle requesting a specific resource.
Arguments:
xml -- The bind feature element.
"""
logging.debug("Requesting resource: %s" % self.resource)
xml.clear()
iq = self.Iq(stype='set')
if self.resource:
res = ET.Element('resource')
res.text = self.resource
xml.append(res)
iq.append(xml)
response = iq.send()
bind_ns = 'urn:ietf:params:xml:ns:xmpp-bind'
self.set_jid(response.xml.find('{%s}bind/{%s}jid' % (bind_ns,
bind_ns)).text)
self.bound = True
logging.info("Node set to: %s" % self.fulljid)
session_ns = 'urn:ietf:params:xml:ns:xmpp-session'
if "{%s}session" % session_ns not in self.features or self.bindfail:
logging.debug("Established Session")
self.sessionstarted = True
self.event("session_start")
def _handle_start_session(self, xml):
"""
Handle the start of the session.
Arguments:
xml -- The session feature element.
"""
if self.authenticated and self.bound:
iq = self.makeIqSet(xml)
response = iq.send()
logging.debug("Established Session")
self.sessionstarted = True
self.event("session_start")
else:
# Bind probably hasn't happened yet.
self.bindfail = True
def _handle_roster(self, iq, request=False):
"""
Update the roster after receiving a roster stanza.
Arguments:
iq -- The roster stanza.
request -- Indicates if this stanza is a response
to a request for the roster.
"""
if iq['type'] == 'set' or (iq['type'] == 'result' and request):
for jid in iq['roster']['items']:
if not jid in self.roster:
self.roster[jid] = {'groups': [],
'name': '',
'subscription': 'none',
'presence': {},
'in_roster': True}
self.roster[jid].update(iq['roster']['items'][jid])
self.event("roster_update", iq)
if iq['type'] == 'set':
iq.reply()
iq.enable('roster')
iq.send()

View file

@ -111,6 +111,7 @@ class BaseXMPP(XMLStream):
self.sendPresenceSubscription = self.send_presence_subscription self.sendPresenceSubscription = self.send_presence_subscription
self.default_ns = default_ns self.default_ns = default_ns
self.stream_ns = 'http://etherx.jabber.org/streams'
self.jid = '' self.jid = ''
self.fulljid = '' self.fulljid = ''

View file

@ -60,7 +60,7 @@ class ComponentXMPP(BaseXMPP):
self.auto_authorize = None self.auto_authorize = None
self.stream_header = "<stream:stream %s %s to='%s'>" % ( self.stream_header = "<stream:stream %s %s to='%s'>" % (
'xmlns="jabber:component:accept"', 'xmlns="jabber:component:accept"',
'xmlns:stream="http://etherx.jabber.org/streams"', 'xmlns:stream="%s"' % self.stream_ns,
jid) jid)
self.stream_footer = "</stream:stream>" self.stream_footer = "</stream:stream>"
self.server_host = host self.server_host = host

View file

@ -269,6 +269,7 @@ class XMLStream(object):
and processing should be restarted. and processing should be restarted.
Defaults to False. Defaults to False.
""" """
self.event("disconnected")
self.state.set('reconnect', reconnect) self.state.set('reconnect', reconnect)
if self.state['disconnecting']: if self.state['disconnecting']:
return return
@ -294,6 +295,8 @@ class XMLStream(object):
""" """
Reset the stream's state and reconnect to the server. Reset the stream's state and reconnect to the server.
""" """
logging.info("Reconnecting")
self.event("disconnected")
self.state.set('tls', False) self.state.set('tls', False)
self.state.set('ssl', False) self.state.set('ssl', False)
time.sleep(1) time.sleep(1)