Handle sending stanzas in chunks if the socket has poor performance.

This commit is contained in:
Lance Stout 2011-08-25 15:08:45 -07:00
parent d929e0deb2
commit b8a4ffece9
3 changed files with 26 additions and 13 deletions

View file

@ -138,7 +138,7 @@ class TestLiveSocket(object):
""" """
with self.send_queue_lock: with self.send_queue_lock:
self.send_queue.put(data) self.send_queue.put(data)
self.socket.send(data) return self.socket.send(data)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# File Socket # File Socket

View file

@ -121,6 +121,7 @@ class TestSocket(object):
if self.disconnected: if self.disconnected:
raise socket.error raise socket.error
self.send_queue.put(data) self.send_queue.put(data)
return len(data)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# File Socket # File Socket

View file

@ -915,7 +915,15 @@ class XMLStream(object):
if now: if now:
log.debug("SEND (IMMED): %s" % data) log.debug("SEND (IMMED): %s" % data)
try: try:
self.socket.send(data.encode('utf-8')) data = data.encode('utf-8')
total = len(data)
sent = 0
count = 0
while sent < total and not self.stop.is_set():
sent += self.socket.send(data[sent:])
count += 1
if count > 1:
log.debug('SENT: %d chunks' % count)
except Socket.error as serr: except Socket.error as serr:
self.event('socket_error', serr) self.event('socket_error', serr)
log.warning("Failed to send %s" % data) log.warning("Failed to send %s" % data)
@ -1222,7 +1230,7 @@ class XMLStream(object):
Extract stanzas from the send queue and send them on the stream. Extract stanzas from the send queue and send them on the stream.
""" """
try: try:
while not self.stop.isSet(): while not self.stop.is_set():
self.session_started_event.wait() self.session_started_event.wait()
if self.__failed_send_stanza is not None: if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza data = self.__failed_send_stanza
@ -1234,22 +1242,26 @@ class XMLStream(object):
continue continue
log.debug("SEND: %s" % data) log.debug("SEND: %s" % data)
try: try:
self.socket.send(data.encode('utf-8')) enc_data = data.encode('utf-8')
total = len(enc_data)
sent = 0
count = 0
while sent < total and not self.stop.is_set():
sent += self.socket.send(enc_data[sent:])
count += 1
if count > 1:
log.debug('SENT: %d chunks' % count)
self.send_queue.task_done() self.send_queue.task_done()
except Socket.error as serr: except Socket.error as serr:
self.event('socket_error', serr) self.event('socket_error', serr)
log.warning("Failed to send %s" % data) log.warning("Failed to send %s" % data)
self.__failed_send_stanza = data self.__failed_send_stanza = data
self.disconnect(self.auto_reconnect) self.disconnect(self.auto_reconnect)
except KeyboardInterrupt: except Exception as ex:
log.debug("Keyboard Escape Detected in _send_thread") log.exception('Unexpected error in send thread: %s' % ex)
self.event('killed', direct=True) self.exception(ex)
self.disconnect() if not self.stop.is_set():
return self.disconnect(self.auto_reconnect)
except SystemExit:
self.disconnect()
self.event_queue.put(('quit', None, None))
return
def exception(self, exception): def exception(self, exception):
""" """