Reworked the Gmail notification plugin to use stanza objects and expose more information.

This commit is contained in:
Lance Stout 2010-07-11 22:01:51 -04:00
parent d0cb400c54
commit b1c997be1d
2 changed files with 228 additions and 51 deletions

View file

@ -1,57 +1,146 @@
""" """
SleekXMPP: The Sleek XMPP Library SleekXMPP: The Sleek XMPP Library
Copyright (C) 2007 Nathanael C. Fritz Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP. This file is part of SleekXMPP.
SleekXMPP is free software; you can redistribute it and/or modify See the file license.txt for copying permission.
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SleekXMPP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SleekXMPP; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
""" """
from __future__ import with_statement
from . import base
import logging import logging
from xml.etree import cElementTree as ET from . import base
import traceback from .. xmlstream.handler.callback import Callback
import time from .. xmlstream.matcher.xpath import MatchXPath
from .. xmlstream.stanzabase import ElementBase, ET, JID
from .. stanza.iq import Iq
class GmailQuery(ElementBase):
namespace = 'google:mail:notify'
name = 'query'
plugin_attrib = 'gmail'
interfaces = set(('newer-than-time', 'newer-than-tid', 'q', 'search'))
def getSearch(self):
return self['q']
def setSearch(self, search):
self['q'] = search
def delSearch(self):
del self['q']
class MailBox(ElementBase):
namespace = 'google:mail:notify'
name = 'mailbox'
plugin_attrib = 'mailbox'
interfaces = set(('result-time', 'total-matched', 'total-estimate',
'url', 'threads', 'matched', 'estimate'))
def getThreads(self):
threads = []
for threadXML in self.xml.findall('{%s}%s' % (MailThread.namespace,
MailThread.name)):
threads.append(MailThread(xml=threadXML, parent=None))
return threads
def getMatched(self):
return self['total-matched']
def getEstimate(self):
return self['total-estimate'] == '1'
class MailThread(ElementBase):
namespace = 'google:mail:notify'
name = 'mail-thread-info'
plugin_attrib = 'thread'
interfaces = set(('tid', 'participation', 'messages', 'date',
'senders', 'url', 'labels', 'subject', 'snippet'))
sub_interfaces = set(('labels', 'subject', 'snippet'))
def getSenders(self):
senders = []
sendersXML = self.xml.find('{%s}senders' % self.namespace)
if sendersXML is not None:
for senderXML in sendersXML.findall('{%s}sender' % self.namespace):
senders.append(MailSender(xml=senderXML, parent=None))
return senders
class MailSender(ElementBase):
namespace = 'google:mail:notify'
name = 'sender'
plugin_attrib = 'sender'
interfaces = set(('address', 'name', 'originator', 'unread'))
def getOriginator(self):
return self.xml.attrib.get('originator', '0') == '1'
def getUnread(self):
return self.xml.attrib.get('unread', '0') == '1'
class NewMail(ElementBase):
namespace = 'google:mail:notify'
name = 'new-mail'
plugin_attrib = 'new-mail'
class gmail_notify(base.base_plugin): class gmail_notify(base.base_plugin):
"""
def plugin_init(self): Google Talk: Gmail Notifications
self.description = 'Google Talk Gmail Notification' """
self.xmpp.add_event_handler('sent_presence', self.handler_gmailcheck, threaded=True)
self.emails = [] def plugin_init(self):
self.description = 'Google Talk: Gmail Notifications'
def handler_gmailcheck(self, payload):
#TODO XEP 30 should cache results and have getFeature self.xmpp.registerHandler(
result = self.xmpp['xep_0030'].getInfo(self.xmpp.server) Callback('Gmail Result',
features = [] MatchXPath('{%s}iq/{%s}%s' % (self.xmpp.default_ns,
for feature in result.findall('{http://jabber.org/protocol/disco#info}query/{http://jabber.org/protocol/disco#info}feature'): MailBox.namespace,
features.append(feature.get('var')) MailBox.name)),
if 'google:mail:notify' in features: self.handle_gmail))
logging.debug("Server supports Gmail Notify")
self.xmpp.add_handler("<iq type='set' xmlns='%s'><new-mail xmlns='google:mail:notify' /></iq>" % self.xmpp.default_ns, self.handler_notify) self.xmpp.registerHandler(
self.getEmail() Callback('Gmail New Mail',
MatchXPath('{%s}iq/{%s}%s' % (self.xmpp.default_ns,
def handler_notify(self, xml): NewMail.namespace,
logging.info("New Gmail recieved!") NewMail.name)),
self.xmpp.event('gmail_notify') self.handle_new_mail))
def getEmail(self): self.xmpp.stanzaPlugin(Iq, GmailQuery)
iq = self.xmpp.makeIqGet() self.xmpp.stanzaPlugin(Iq, MailBox)
iq.attrib['from'] = self.xmpp.fulljid self.xmpp.stanzaPlugin(Iq, NewMail)
iq.attrib['to'] = self.xmpp.jid
self.xmpp.makeIqQuery(iq, 'google:mail:notify') self.last_result_time = None
emails = iq.send()
mailbox = emails.find('{google:mail:notify}mailbox') def handle_gmail(self, iq):
total = int(mailbox.get('total-matched', 0)) mailbox = iq['mailbox']
logging.info("%s New Gmail Messages" % total) approx = ' approximately' if mailbox['estimated'] else ''
logging.info('Gmail: Received%s %s emails' % (approx, mailbox['total-matched']))
self.last_result_time = mailbox['result-time']
self.xmpp.event('gmail_messages', iq)
def handle_new_mail(self, iq):
logging.info("Gmail: New emails received!")
self.xmpp.event('gmail_notify')
self.checkEmail()
def getEmail(self, query=None):
return self.search(query)
def checkEmail(self):
return self.search(newer=self.last_result_time)
def search(self, query=None, newer=None):
if query is None:
logging.info("Gmail: Checking for new emails")
else:
logging.info('Gmail: Searching for emails matching: "%s"' % query)
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq['to'] = self.xmpp.jid
iq['gmail']['q'] = query
iq['gmail']['newer-than-time'] = newer
return iq.send()

88
tests/test_gmail.py Normal file
View file

@ -0,0 +1,88 @@
from sleektest import *
import sleekxmpp.plugins.gmail_notify as gmail
class TestGmail(SleekTest):
def setUp(self):
self.stanzaPlugin(Iq, gmail.GmailQuery)
self.stanzaPlugin(Iq, gmail.MailBox)
self.stanzaPlugin(Iq, gmail.NewMail)
def testCreateQuery(self):
"""Testing querying Gmail for emails."""
iq = self.Iq()
iq['type'] = 'get'
iq['gmail']['search'] = 'is:starred'
iq['gmail']['newer-than-time'] = '1140638252542'
iq['gmail']['newer-than-tid'] = '11134623426430234'
self.checkIq(iq, """
<iq type="get">
<query xmlns="google:mail:notify"
newer-than-time="1140638252542"
newer-than-tid="11134623426430234"
q="is:starred" />
</iq>
""")
def testMailBox(self):
"""Testing reading from Gmail mailbox result"""
# Use the example from Google's documentation at
# http://code.google.com/apis/talk/jep_extensions/gmail.html#notifications
xml = ET.fromstring("""
<iq type="result">
<mailbox xmlns="google:mail:notify"
result-time='1118012394209'
url='http://mail.google.com/mail'
total-matched='95'
total-estimate='0'>
<mail-thread-info tid='1172320964060972012'
participation='1'
messages='28'
date='1118012394209'
url='http://mail.google.com/mail?view=cv'>
<senders>
<sender name='Me' address='romeo@gmail.com' originator='1' />
<sender name='Benvolio' address='benvolio@gmail.com' />
<sender name='Mercutio' address='mercutio@gmail.com' unread='1'/>
</senders>
<labels>act1scene3</labels>
<subject>Put thy rapier up.</subject>
<snippet>Ay, ay, a scratch, a scratch; marry, 'tis enough.</snippet>
</mail-thread-info>
</mailbox>
</iq>
""")
iq = self.Iq(xml=xml)
mailbox = iq['mailbox']
self.failUnless(mailbox['result-time'] == '1118012394209', "result-time doesn't match")
self.failUnless(mailbox['url'] == 'http://mail.google.com/mail', "url doesn't match")
self.failUnless(mailbox['matched'] == '95', "total-matched incorrect")
self.failUnless(mailbox['estimate'] == False, "total-estimate incorrect")
self.failUnless(len(mailbox['threads']) == 1, "could not extract message threads")
thread = mailbox['threads'][0]
self.failUnless(thread['tid'] == '1172320964060972012', "thread tid doesn't match")
self.failUnless(thread['participation'] == '1', "thread participation incorrect")
self.failUnless(thread['messages'] == '28', "thread message count incorrect")
self.failUnless(thread['date'] == '1118012394209', "thread date doesn't match")
self.failUnless(thread['url'] == 'http://mail.google.com/mail?view=cv', "thread url doesn't match")
self.failUnless(thread['labels'] == 'act1scene3', "thread labels incorrect")
self.failUnless(thread['subject'] == 'Put thy rapier up.', "thread subject doesn't match")
self.failUnless(thread['snippet'] == "Ay, ay, a scratch, a scratch; marry, 'tis enough.", "snippet doesn't match")
self.failUnless(len(thread['senders']) == 3, "could not extract senders")
sender1 = thread['senders'][0]
self.failUnless(sender1['name'] == 'Me', "sender name doesn't match")
self.failUnless(sender1['address'] == 'romeo@gmail.com', "sender address doesn't match")
self.failUnless(sender1['originator'] == True, "sender originator incorrect")
self.failUnless(sender1['unread'] == False, "sender unread incorrectly True")
sender2 = thread['senders'][2]
self.failUnless(sender2['unread'] == True, "sender unread incorrectly False")
suite = unittest.TestLoader().loadTestsFromTestCase(TestGmail)