diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index 8e664a7..422b50f 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -138,14 +138,14 @@ class ClientXMPP(basexmpp, XMLStream): if result: self.event("connected") else: - print "** Failed to connect -- disconnected" + logging.warning("Failed to connect") self.event("disconnected") return result # overriding reconnect and disconnect so that we can get some events # should events be part of or required by xmlstream? Maybe that would be cleaner def reconnect(self): - print "** Reconnect -- disconnected" + logging.info("Reconnecting") self.event("disconnected") XMLStream.reconnect(self) @@ -192,7 +192,7 @@ class ClientXMPP(basexmpp, XMLStream): def handler_starttls(self, xml): if self.ssl_support: - self.add_handler("", self.handler_tls_start) + self.add_handler("", self.handler_tls_start, instream=True) self.send(xml) return True else: @@ -206,14 +206,14 @@ class ClientXMPP(basexmpp, XMLStream): def handler_sasl_auth(self, xml): logging.debug("Starting SASL Auth") - self.add_handler("", self.handler_auth_success) - self.add_handler("", self.handler_auth_fail) + self.add_handler("", self.handler_auth_success, instream=True) + self.add_handler("", self.handler_auth_fail, instream=True) sasl_mechs = xml.findall('{urn:ietf:params:xml:ns:xmpp-sasl}mechanism') if len(sasl_mechs): for sasl_mech in sasl_mechs: self.features.append("sasl:%s" % sasl_mech.text) if 'sasl:PLAIN' in self.features: - self.send("""%s""" % str(base64.b64encode('\x00' + self.username + '\x00' + self.password))) + self.send("""%s""" % base64.b64encode(b'\x00' + bytes(self.username, 'utf-8') + b'\x00' + bytes(self.password, 'utf-8')).decode('utf-8')) else: logging.error("No appropriate login method.") self.disconnect() diff --git a/sleekxmpp/basexmpp.py b/sleekxmpp/basexmpp.py index 87b6049..e120a71 100644 --- a/sleekxmpp/basexmpp.py +++ b/sleekxmpp/basexmpp.py @@ -137,9 +137,9 @@ class basexmpp(object): self.id += 1 return self.getId() - def add_handler(self, mask, pointer, disposable=False, threaded=False, filter=False): + def add_handler(self, mask, pointer, disposable=False, threaded=False, filter=False, instream=False): #logging.warning("Deprecated add_handler used for %s: %s." % (mask, pointer)) - self.registerHandler(XMLCallback('add_handler_%s' % self.getNewId(), MatchXMLMask(mask), pointer, threaded, disposable)) + self.registerHandler(XMLCallback('add_handler_%s' % self.getNewId(), MatchXMLMask(mask), pointer, threaded, disposable, instream)) def getId(self): return "%x".upper() % self.id diff --git a/sleekxmpp/component_example.py b/sleekxmpp/component_example.py index 0480237..37b7e96 100644 --- a/sleekxmpp/component_example.py +++ b/sleekxmpp/component_example.py @@ -6,14 +6,14 @@ import time class Example(sleekxmpp.componentxmpp.ComponentXMPP): def __init__(self, jid, password): - sleekxmpp.componentxmpp.ComponentXMPP.__init__(self, jid, password, 'localhost', 5060) + sleekxmpp.componentxmpp.ComponentXMPP.__init__(self, jid, password, 'vm1', 5230) self.add_event_handler("session_start", self.start) self.add_event_handler("message", self.message) def start(self, event): #self.getRoster() #self.sendPresence(pto='admin@tigase.netflint.net/sarkozy') - self.sendPresence(pto='tigase.netflint.net') + #self.sendPresence(pto='tigase.netflint.net') pass def message(self, event): @@ -30,7 +30,7 @@ if __name__ == '__main__': opts,args = optp.parse_args() logging.basicConfig(level=opts.loglevel, format='%(levelname)-8s %(message)s') - xmpp = Example('component.server.tld', 'asdfasdf') + xmpp = Example('component.vm1', 'secreteating') xmpp.registerPlugin('xep_0004') xmpp.registerPlugin('xep_0030') xmpp.registerPlugin('xep_0060') diff --git a/sleekxmpp/plugins/xep_0009.py b/sleekxmpp/plugins/xep_0009.py index c6b7b5d..e0da829 100644 --- a/sleekxmpp/plugins/xep_0009.py +++ b/sleekxmpp/plugins/xep_0009.py @@ -2,7 +2,7 @@ XEP-0009 XMPP Remote Procedure Calls """ from __future__ import with_statement -import base +from . import base import logging from xml.etree import cElementTree as ET import copy diff --git a/sleekxmpp/plugins/xep_0030.py b/sleekxmpp/plugins/xep_0030.py index d379530..fc92102 100644 --- a/sleekxmpp/plugins/xep_0030.py +++ b/sleekxmpp/plugins/xep_0030.py @@ -17,11 +17,9 @@ along with SleekXMPP; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ -from __future__ import absolute_import, with_statement from . import base import logging from xml.etree import cElementTree as ET -import thread class xep_0030(base.base_plugin): """ @@ -36,13 +34,11 @@ class xep_0030(base.base_plugin): self.items = {'main': []} self.xmpp.add_handler("" % self.xmpp.default_ns, self.info_handler) self.xmpp.add_handler("" % self.xmpp.default_ns, self.item_handler) - self.lock = thread.allocate_lock() def add_feature(self, feature, node='main'): - with self.lock: - if not self.features.has_key(node): - self.features[node] = [] - self.features[node].append(feature) + if not self.features.has_key(node): + self.features[node] = [] + self.features[node].append(feature) def add_identity(self, category=None, itype=None, name=None, node='main'): if not self.identities.has_key(node): diff --git a/sleekxmpp/plugins/xep_0045.py b/sleekxmpp/plugins/xep_0045.py index a85bfec..b052375 100644 --- a/sleekxmpp/plugins/xep_0045.py +++ b/sleekxmpp/plugins/xep_0045.py @@ -18,7 +18,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ from __future__ import with_statement -import base +from . import base import logging from xml.etree import cElementTree as ET diff --git a/sleekxmpp/plugins/xep_0050.py b/sleekxmpp/plugins/xep_0050.py index 80025c8..20e1057 100644 --- a/sleekxmpp/plugins/xep_0050.py +++ b/sleekxmpp/plugins/xep_0050.py @@ -18,12 +18,11 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ from __future__ import with_statement -import base +from . import base import logging from xml.etree import cElementTree as ET import traceback import time -import thread class xep_0050(base.base_plugin): """ diff --git a/sleekxmpp/plugins/xep_0060.py b/sleekxmpp/plugins/xep_0060.py index b5e338a..68c391f 100644 --- a/sleekxmpp/plugins/xep_0060.py +++ b/sleekxmpp/plugins/xep_0060.py @@ -190,7 +190,6 @@ class xep_0060(base.base_plugin): id = iq.get('id') result = self.xmpp.send(iq, "" % id) if result is None or result.get('type') == 'error': - print "---------- returning false, apparently" return False return True diff --git a/sleekxmpp/plugins/xep_0078.py b/sleekxmpp/plugins/xep_0078.py index 28aaeb2..24afc87 100644 --- a/sleekxmpp/plugins/xep_0078.py +++ b/sleekxmpp/plugins/xep_0078.py @@ -20,8 +20,8 @@ from __future__ import with_statement from xml.etree import cElementTree as ET import logging -import sha -import base +import hashlib +from . import base class xep_0078(base.base_plugin): @@ -66,7 +66,7 @@ class xep_0078(base.base_plugin): else: logging.debug("Authenticating via jabber:iq:auth Digest") digest = ET.Element('digest') - digest.text = sha.sha("%s%s" % (self.streamid, self.xmpp.password)).hexdigest() + digest.text = hashlib.sha1(b"%s%s" % (self.streamid, self.xmpp.password)).hexdigest() query.append(digest) attempt.append(query) result = self.xmpp.send(attempt, self.xmpp.makeIq(self.xmpp.id)) diff --git a/sleekxmpp/plugins/xep_0086.py b/sleekxmpp/plugins/xep_0086.py index 6871ef3..e6c18c7 100644 --- a/sleekxmpp/plugins/xep_0086.py +++ b/sleekxmpp/plugins/xep_0086.py @@ -1,6 +1,6 @@ from __future__ import with_statement -import base +from . import base import logging from xml.etree import cElementTree as ET import copy diff --git a/sleekxmpp/plugins/xep_0199.py b/sleekxmpp/plugins/xep_0199.py index cab84ac..57d56c0 100644 --- a/sleekxmpp/plugins/xep_0199.py +++ b/sleekxmpp/plugins/xep_0199.py @@ -31,8 +31,8 @@ class xep_0199(base.base_plugin): self.xep = "0199" self.xmpp.add_handler("" % self.xmpp.default_ns, self.handler_ping) self.running = False - if self.config.get('keepalive', True): - self.xmpp.add_event_handler('session_start', self.handler_pingserver, threaded=True) + #if self.config.get('keepalive', True): + #self.xmpp.add_event_handler('session_start', self.handler_pingserver, threaded=True) def post_init(self): self.xmpp['xep_0030'].add_feature('http://www.xmpp.org/extensions/xep-0199.html#ns') diff --git a/sleekxmpp/xmlstream/handler/base.py b/sleekxmpp/xmlstream/handler/base.py index 810aac9..9a95119 100644 --- a/sleekxmpp/xmlstream/handler/base.py +++ b/sleekxmpp/xmlstream/handler/base.py @@ -11,6 +11,9 @@ class BaseHandler(object): def match(self, xml): return self._matcher.match(xml) + def prerun(self, payload): + self._payload = payload + def run(self, payload): self._payload = payload diff --git a/sleekxmpp/xmlstream/handler/callback.py b/sleekxmpp/xmlstream/handler/callback.py index e3ef8cc..c618b71 100644 --- a/sleekxmpp/xmlstream/handler/callback.py +++ b/sleekxmpp/xmlstream/handler/callback.py @@ -1,20 +1,26 @@ from . import base -import threading class Callback(base.BaseHandler): - def __init__(self, name, matcher, pointer, thread=False, once=False): + def __init__(self, name, matcher, pointer, thread=False, once=False, instream=False): base.BaseHandler.__init__(self, name, matcher) self._pointer = pointer self._thread = thread self._once = once + self._instream = instream + + def prerun(self, payload): + base.BaseHandler.prerun(self, payload) + if self._instream: + self.run(payload, True) - def run(self, payload): - base.BaseHandler.run(self, payload) - if self._thread: - x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,)) - x.start() - else: + def run(self, payload, instream=False): + if not self._instream or instream: + base.BaseHandler.run(self, payload) + #if self._thread: + # x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,)) + # x.start() + #else: self._pointer(payload) - if self._once: - self._destroy = True + if self._once: + self._destroy = True diff --git a/sleekxmpp/xmlstream/handler/waiter.py b/sleekxmpp/xmlstream/handler/waiter.py index 7c06ddf..e62f330 100644 --- a/sleekxmpp/xmlstream/handler/waiter.py +++ b/sleekxmpp/xmlstream/handler/waiter.py @@ -1,20 +1,23 @@ from . import base -import Queue +import queue import logging class Waiter(base.BaseHandler): def __init__(self, name, matcher): base.BaseHandler.__init__(self, name, matcher) - self._payload = Queue.Queue() + self._payload = queue.Queue() + + def prerun(self, payload): + self._payload.put(payload) def run(self, payload): - self._payload.put(payload) + pass def wait(self, timeout=60): try: return self._payload.get(True, timeout) - except Queue.Empty: + except queue.Empty: return False def checkDelete(self): diff --git a/sleekxmpp/xmlstream/handler/xmlcallback.py b/sleekxmpp/xmlstream/handler/xmlcallback.py index 50d3d5f..ae288ff 100644 --- a/sleekxmpp/xmlstream/handler/xmlcallback.py +++ b/sleekxmpp/xmlstream/handler/xmlcallback.py @@ -3,5 +3,5 @@ from . callback import Callback class XMLCallback(Callback): - def run(self, payload): - Callback.run(self, payload.xml) + def run(self, payload, instream=False): + Callback.run(self, payload.xml, instream) diff --git a/sleekxmpp/xmlstream/handler/xmlwaiter.py b/sleekxmpp/xmlstream/handler/xmlwaiter.py index 9b2b339..1e524b0 100644 --- a/sleekxmpp/xmlstream/handler/xmlwaiter.py +++ b/sleekxmpp/xmlstream/handler/xmlwaiter.py @@ -2,5 +2,5 @@ from . waiter import Waiter class XMLWaiter(Waiter): - def run(self, payload): - Waiter.run(self, payload.xml) + def prerun(self, payload): + Waiter.prerun(self, payload.xml) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index a3f3d2e..958a3b6 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -1,12 +1,12 @@ from __future__ import with_statement -import Queue +import queue from . import statemachine from . stanzabase import StanzaBase from xml.etree import cElementTree from xml.parsers import expat import logging import socket -import thread +import threading import time import traceback import types @@ -14,7 +14,7 @@ import xml.sax.saxutils ssl_support = True try: - from tlslite.api import * + import ssl except ImportError: ssl_support = False @@ -27,66 +27,6 @@ class CloseStream(Exception): stanza_extensions = {} -class _fileobject(object): # we still need this because Socket.makefile is broken in python2.5 (but it works fine in 3.0) - - def __init__(self, sock, mode='rb', bufsize=-1): - self._sock = sock - if bufsize <= 0: - bufsize = 1024 - self.bufsize = bufsize - self.softspace = False - - def read(self, size=-1): - if size <= 0: - size = sys.maxint - blocks = [] - #while size > 0: - # b = self._sock.recv(min(size, self.bufsize)) - # size -= len(b) - # if not b: - # break - # blocks.append(b) - # print size - #return "".join(blocks) - buff = self._sock.recv(self.bufsize) - logging.debug("RECV: %s" % buff) - return buff - - def readline(self, size=-1): - return self.read(size) - if size < 0: - size = sys.maxint - blocks = [] - read_size = min(20, size) - found = 0 - while size and not found: - b = self._sock.recv(read_size, MSG_PEEK) - if not b: - break - found = b.find('\n') + 1 - length = found or len(b) - size -= length - blocks.append(self._sock.recv(length)) - read_size = min(read_size * 2, size, self.bufsize) - return "".join(blocks) - - def write(self, data): - self._sock.sendall(str(data)) - - def writelines(self, lines): - # This version mimics the current writelines, which calls - # str() on each line, but comments that we should reject - # non-string non-buffers. Let's omit the next line. - lines = [str(s) for s in lines] - self._sock.sendall(''.join(lines)) - - def flush(self): - pass - - def close(self): - self._sock.close() - - class XMLStream(object): "A connection manager with XML events." @@ -108,13 +48,18 @@ class XMLStream(object): self.__handlers = [] self.__tls_socket = None + self.filesocket = None self.use_ssl = False self.use_tls = False self.stream_header = "" self.stream_footer = "" + self.eventqueue = queue.Queue() + self.namespace_map = {} + + self.run = True def setSocket(self, socket): "Set the socket" @@ -147,10 +92,12 @@ class XMLStream(object): self.socket = ssl.wrap_socket(self.socket) try: self.socket.connect(self.address) + logging.info("creating filesocket") + self.filesocket = self.socket.makefile('rb', 0) self.state.set('connected', True) return True - except socket.error,(errno, strerror): - logging.error("Could not connect. Socket Error #%s: %s" % (errno, strerror)) + except socket.error as serr: + logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror)) time.sleep(1) def connectUnix(self, filepath): @@ -158,24 +105,24 @@ class XMLStream(object): def startTLS(self): "Handshakes for TLS" - #self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False) - #self.socket.do_handshake() if self.ssl_support: + logging.info("Negotiating TLS") self.realsocket = self.socket - self.socket = TLSConnection(self.socket) - self.socket.handshakeClientCert() - self.file = _fileobject(self.socket) + self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False) + self.socket.do_handshake() + self.filesocket = self.socket.makefile('rb', 0) return True else: - logging.warning("Tried to enable TLS, but tlslite module not found.") + logging.warning("Tried to enable TLS, but ssl module not found.") return False raise RestartStream() def process(self, threaded=True): - #self.__thread['process'] = threading.Thread(name='process', target=self._process) - #self.__thread['process'].start() + self.__thread['eventhandle'] = threading.Thread(name='eventhandle', target=self._eventRunner) + self.__thread['eventhandle'].start() if threaded: - thread.start_new(self._process, tuple()) + self.__thread['process'] = threading.Thread(name='process', target=self._process) + self.__thread['process'].start() else: self._process() @@ -196,12 +143,15 @@ class XMLStream(object): self.state.set('processing', False) self.state.set('reconnect', False) self.disconnect() + self.run = False + self.eventqueue.put(('quit', None, None)) return except CloseStream: return except SystemExit: + self.eventqueue.put(('quit', None, None)) return - except socket.EBADF: + except socket.error: if not self.state.reconnect: return else: @@ -218,6 +168,7 @@ class XMLStream(object): if self.state['reconnect']: self.reconnect() self.state.set('processing', False) + self.eventqueue.put(('quit', None, None)) #self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML) #self.__thread['readXML'].start() #self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents) @@ -226,18 +177,17 @@ class XMLStream(object): def __readXML(self): "Parses the incoming stream, adding to xmlin queue as it goes" #build cElementTree object from expat was we go - #self.filesocket = self.socket.makefile('rb',0) #this is broken in python2.5, but works in python3.0 - self.filesocket = _fileobject(self.socket) + #self.filesocket = self.socket.makefile('rb', 0) edepth = 0 root = None - for (event, xmlobj) in cElementTree.iterparse(self.filesocket, ('end', 'start')): + for (event, xmlobj) in cElementTree.iterparse(self.filesocket, (b'end', b'start')): if edepth == 0: # and xmlobj.tag.split('}', 1)[-1] == self.basetag: - if event == 'start': + if event == b'start': root = xmlobj self.start_stream_handler(root) - if event == 'end': + if event == b'end': edepth += -1 - if edepth == 0 and event == 'end': + if edepth == 0 and event == b'end': return False elif edepth == 1: #self.xmlin.put(xmlobj) @@ -249,15 +199,13 @@ class XMLStream(object): return False if root: root.clear() - if event == 'start': + if event == b'start': edepth += 1 def sendRaw(self, data): logging.debug("SEND: %s" % data) - if type(data) == type(u''): - data = data.encode('utf-8') try: - self.socket.send(data) + self.socket.send(bytes(data, "utf-8")) #except socket.error,(errno, strerror): except: self.state.set('connected', False) @@ -277,7 +225,7 @@ class XMLStream(object): self.socket.close() self.filesocket.close() self.socket.shutdown(socket.SHUT_RDWR) - except socket.error,(errno,strerror): + except socket.error as serr: #logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror)) #thread.exit_thread() pass @@ -308,12 +256,28 @@ class XMLStream(object): stanza = StanzaBase(self, xmlobj) for handler in self.__handlers: if handler.match(xmlobj): - handler.run(stanza) + handler.prerun(stanza) + self.eventqueue.put(('stanza', handler, stanza)) if handler.checkDelete(): self.__handlers.pop(self.__handlers.index(handler)) #loop through handlers and test match #spawn threads as necessary, call handlers, sending Stanza + def _eventRunner(self): + logging.debug("Loading event runner") + while self.run: + try: + event = self.eventqueue.get(True, timeout=5) + except queue.Empty: + even = None + if event is not None: + etype, handler, stanza = event + if etype == 'stanza': + handler.run(stanza) + if etype == 'quit': + logging.debug("Quitting eventRunner thread") + return False + def registerHandler(self, handler, before=None, after=None): "Add handler with matcher class and parameters." self.__handlers.append(handler) @@ -386,24 +350,21 @@ class XMLStream(object): return ''.join(newoutput) def xmlesc(self, text): - if type(text) != types.UnicodeType: - text = list(unicode(text, 'utf-8', 'ignore')) - else: - text = list(text) + text = list(text) cc = 0 matches = ('&', '<', '"', '>', "'") for c in text: if c in matches: if c == '&': - text[cc] = u'&' + text[cc] = '&' elif c == '<': - text[cc] = u'<' + text[cc] = '<' elif c == '>': - text[cc] = u'>' + text[cc] = '>' elif c == "'": - text[cc] = u''' + text[cc] = ''' elif self.escape_quotes: - text[cc] = u'"' + text[cc] = '"' cc += 1 return ''.join(text)