[PyWebMail-Checkins] webmail checker.py,1.16,1.17
Status: Beta
Brought to you by:
dubnerm
|
From: Michael D. <du...@us...> - 2008-03-02 10:53:06
|
Update of /cvsroot/pywebmail/webmail In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv21338 Modified Files: checker.py Log Message: support verify folders, mail receive and delete Index: checker.py =================================================================== RCS file: /cvsroot/pywebmail/webmail/checker.py,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** checker.py 6 Dec 2007 10:12:44 -0000 1.16 --- checker.py 2 Mar 2008 10:53:08 -0000 1.17 *************** *** 28,31 **** --- 28,39 ---- unixfrom = 'From - Thu Jan 01 00:00:00 1970\n' + outmail_template = '''\ + From: %(from)s + To: %(to)s + Subject: %(subject)s + + %(body)s + ''' + class TestProcess: def __init__(self, test_name, main_config, test_config): *************** *** 48,51 **** --- 56,61 ---- mailbox_info = ([folder]+mailbox_name.split('#',1))[-2:] self.mailboxes.append(mailbox_info) + self.test_key = '%X' % (id(self),) + self.support_outbox = hasattr(self.parent, 'get_outbox_by_name') def _get_verify(self): *************** *** 92,96 **** def msg_prepare_literal(self, msg): msg.rewindbody() ! return ''.join(msg.headers)+'\n'+(msg.fp.read().rstrip('\n'))+'\n' def msg_prepare_relaxed(self, msg): --- 102,106 ---- def msg_prepare_literal(self, msg): msg.rewindbody() ! return ''.join(msg.headers).replace('\r\n','\n')+'\n'+(msg.fp.read().replace('\r\n','\n').rstrip('\n'))+'\n' def msg_prepare_relaxed(self, msg): *************** *** 147,164 **** log.info('Starting test %s...', self.name) try: getattr(self, 'perform_'+self.verify)() log.info('Test %s (%s) succeded!', self.name, self.verify) return 1,'Ok' except Exception, e: ! log.info('Test %s (%s) failed:', self.name, self.verify, exc_info=1) if type(e) not in (type(u''), type('')): try: e = unicode(e) ! except UnicodeEncodeError: ! e = str(e) if type(e) == type(u'') and self.parent.output_encoding is not None: e = e.encode(self.parent.output_encoding, 'replace') return 0,e def perform_references(self): tests = [] --- 157,349 ---- log.info('Starting test %s...', self.name) try: + test_step = 'parse' + want_delete = a2bool(self.config.get('verify_delete', '0')) + want_outbox = self.config.has_key('verify_outbox') + want_folders = self.config.has_key('verify_folders') + if want_folders: + test_step = 'folders' + self.perform_folders() + log.info('Folder list test %s succeded!', self.name) + test_step = self.verify getattr(self, 'perform_'+self.verify)() log.info('Test %s (%s) succeded!', self.name, self.verify) + if want_outbox: + if self.support_outbox: + test_step = 'outbox' + self.perform_outbox() + log.info('Sending test %s succeded!', self.name) + else: + want_delete = want_delete and self.config.has_key('delete_pattern') + if want_delete: + test_step = 'delete' + self.perform_delete() + log.info('Deleting test %s succeded!', self.name) return 1,'Ok' except Exception, e: ! log.info('Test %s (%s) failed:', self.name, test_step, exc_info=1) if type(e) not in (type(u''), type('')): try: e = unicode(e) ! except (UnicodeEncodeError,UnicodeDecodeError): ! try: ! e = unicode(e,'utf-8') ! except (TypeError,UnicodeEncodeError,UnicodeDecodeError): ! e = str(e) ! try: ! e = unicode(e,'utf-8') ! except (UnicodeEncodeError,UnicodeDecodeError): ! pass if type(e) == type(u'') and self.parent.output_encoding is not None: e = e.encode(self.parent.output_encoding, 'replace') return 0,e + def perform_folders(self): + encoding = self.config.get('folders_encoding') + compare_lower = a2bool(self.config.get('folders_compare_lower','0')) + inbox_name = self.config.get('folders_inbox',None) + folders = self.config['verify_folders'] + if encoding: + folders = folders.decode(encoding) + inbox_name = inbox_name.decode(encoding) + if compare_lower: + folders = folders.lower() + inbox_name = inbox_name.lower() + folders = folders.split(',') + folders.sort() + + mailbox_name = self.config.get('folders_mailbox') + if mailbox_name is None: + mailbox_name = self.config['mailbox'] + mailbox = self.parent.get_mailbox_by_name(mailbox_name) + + mailbox.open() + try: + if inbox_name is not None: + real_inbox_name = mailbox.get_inbox_name() + if compare_lower: + real_inbox_name = real_inbox_name.lower() + if real_inbox_name != inbox_name: + raise TestFailed, "Folder list test failed: inbox name doesn't match (%s!=%s)" % (real_inbox_name, inbox_name) + real_folders = mailbox.list() + if compare_lower: + real_folders = map(lambda s:s.lower(), real_folders) + real_folders.sort() + if real_folders != folders: + raise TestFailed, "Folder list test failed: %s!=%s" % (real_folders, folders) + finally: + mailbox.close() + + def perform_outbox(self): + max_attempts = int(self.config.get('outmail_attempts','10')) + outmail_sleep = int(self.config.get('outmail_wait','60')) + + outbox_name = self.config['verify_outbox'] + outbox = self.parent.get_outbox_by_name(outbox_name) + + mailbox_name = self.config['outbox_mailbox'] + mailbox = self.parent.get_mailbox_by_name(mailbox_name) + + subject = 'PyWebmail outbox test:'+self.test_key + outmail_dict = { + 'from':outbox.email, + 'to':mailbox.email, + 'subject':subject, + 'body':subject+'\n', + } + + outbox.send(outmail_template % outmail_dict) + + done = 0 + for attempt in xrange(1,max_attempts+1): + time.sleep(outmail_sleep) + log.info('Starting attempt %d/%d to get test mail', attempt, max_attempts) + try: + if not mailbox.closed: mailbox.close() + mailbox.open() + folder = None + try: + folder = mailbox.get() + for msginfo in folder.list(): + if msginfo.subject is None: + msginfo.read_details(1) + if msginfo.subject == subject: + log.info('Found test mail %s with subject %s', msginfo.uid, msginfo.subject) + log.debug('Found test mail %s: date=%s, sender=%s, recipients=%s, size=%s', + msginfo.uid, msginfo.date, msginfo.sender, msginfo.recipients, msginfo.size) + done = 1 + else: + log.debug('Skipping mail %s (%s!=%s): date=%s, sender=%s, recipients=%s, size=%s', + msginfo.uid, msginfo.subject, subject, msginfo.date, + msginfo.sender, msginfo.recipients, msginfo.size) + finally: + if folder is not None: + folder.close() + mailbox.close() + except Exception, e: + log.info('Get outmail attempt %d/%d failed:', attempt, max_attempts, exc_info=1) + if done: + log.debug('Outbox test succeeded after %d/%d attempts', attempt, max_attempts) + return + raise TestFailed, 'Outbox test failed to found mail after %d attempts' % (max_attempts,) + + def perform_delete(self): + allow_empty_delete = a2bool(self.config.get('allow_empty_delete','0')) + delete_check_list = a2bool(self.config.get('delete_check_list','1')) + has_outbox = self.config.has_key('verify_outbox') + if has_outbox and self.support_outbox: + mailbox_name = self.config.get('delete_mailbox', self.config['outbox_mailbox']) + subject_re = re.escape('PyWebmail outbox test:'+self.test_key) + folder_name = self.config.get('delete_folder') + else: + mailbox_name = self.config['delete_mailbox'] + subject_re = self.config['delete_pattern'] + folder_name = self.config.get('delete_folder', self.config.get('folder')) + subject_re = re.compile(subject_re) + mailbox = self.parent.get_mailbox_by_name(mailbox_name) + + mailbox.open() + folder_name = folder_name or mailbox.get_inbox_name() + folder = mailbox.get(folder_name) + found = [] + for msginfo in folder.list(): + if msginfo.subject is None: + msginfo.read_details(1) + if msginfo.subject and subject_re.match(msginfo.subject): + log.debug('Found mail %s: subject=%s, date=%s, sender=%s, recipients=%s, size=%s', + msginfo.uid, msginfo.subject, msginfo.date, + msginfo.sender, msginfo.recipients, msginfo.size) + found.append(msginfo.uid) + msginfo.delete() + else: + log.debug('Leaving mail %s: subject=%s, date=%s, sender=%s, recipients=%s, size=%s', + msginfo.uid, msginfo.subject, msginfo.date, + msginfo.sender, msginfo.recipients, msginfo.size) + folder.close() + mailbox.close() + if found: + still_here = [] + mailbox.open() + folder = mailbox.get(folder_name) + if delete_check_list: + folder_list = map(lambda msginfo:msginfo.uid, folder.list()) + for uid in found: + if uid in folder_list: + still_here.append(uid) + else: + for uid in found: + try: + msginfo = folder.get(uid) + still_here.append(uid) + except: + log.info('Get deleted mail failed - this is OK.', exc_info=1) + folder.close() + mailbox.close() + if still_here: + raise TestFailed, 'Deletion test: Mail still here: '+(', '.join(still_here)) + else: + log.warn("Deletion test can't proceed because no mail found for mask %s", repr(subject_re.pattern)) + if not allow_empty_delete: + raise TestFailed, 'Deletion test: No mail found (mask=%s)' % (subject_re.pattern,) + def perform_references(self): tests = [] *************** *** 175,178 **** --- 360,366 ---- assert res, 'Test %s failed' % (test.name,) + def perform_ok(self): + pass + def perform_simple(self): test_mbx_file0 = self.config.get('test_mbx', None) *************** *** 340,344 **** def perform_orderedmailboxes(self): orderby = self.config.get('orderby', 'none').lower().strip() ! assert orderby in ('none', 'uid', 'size', '!size', 'date', '!date', 'msghash') orderforce = orderby[:1] == '!' if orderforce: orderby = orderby[1:] --- 528,532 ---- def perform_orderedmailboxes(self): orderby = self.config.get('orderby', 'none').lower().strip() ! assert orderby in ('none', 'uid', 'size', '!size', 'date', '!date', 'mailid', 'headhash', 'msghash') orderforce = orderby[:1] == '!' if orderforce: orderby = orderby[1:] *************** *** 372,379 **** msginfo.read_details(1) l.sort(lambda a,b:cmp(a.date,b.date)) elif orderby == 'msghash': for msginfo in l: msg1 = msginfo.get() ! msginfo.msghash = hash(self.msg_prepare_literal(msg1)) l.sort(lambda a,b:cmp(a.msghash,b.msghash)) elif orderby != 'none': --- 560,580 ---- msginfo.read_details(1) l.sort(lambda a,b:cmp(a.date,b.date)) + elif orderby == 'mailid': + for msginfo in l: + msg1 = msginfo.get() + mailid = msg1.get('message-id') + if mailid is None: + mailid = hex(hash(''.join(msg1.headers)))+'@localhost' + msginfo.mailid = mailid + l.sort(lambda a,b:cmp(a.mailid,b.mailid)) + elif orderby == 'headhash': + for msginfo in l: + msg1 = msginfo.get() + msginfo.headhash = hex(hash(''.join(msg1.headers))) + l.sort(lambda a,b:cmp(a.headhash,b.headhash)) elif orderby == 'msghash': for msginfo in l: msg1 = msginfo.get() ! msginfo.msghash = hex(hash(self.msg_prepare_literal(msg1))) l.sort(lambda a,b:cmp(a.msghash,b.msghash)) elif orderby != 'none': *************** *** 455,459 **** mbx_file = self.config.get('mbx', None) assert mbx_file ! mbx = open(mbx_file).read() i = 0 for folder_name,mailbox_name in self.mailboxes: --- 656,660 ---- mbx_file = self.config.get('mbx', None) assert mbx_file ! mbx = open(mbx_file, 'rb').read() i = 0 for folder_name,mailbox_name in self.mailboxes: *************** *** 564,567 **** --- 765,773 ---- return webmail.settings.MailboxesListSetup.get_mailbox_by_name(self, name, self.mailbox_settings) + if hasattr(webmail.settings.MailboxesListSetup, 'get_outbox_by_name'): + def get_outbox_by_name(self, name): + log.debug("get_outbox_by_name(%s): mailbox_settings=%s", name, self.mailbox_settings) + return webmail.settings.MailboxesListSetup.get_outbox_by_name(self, name, self.mailbox_settings) + def get_mailbox(self, email, password): return webmail.settings.MailboxesListSetup.get_mailbox(self, email, password, self.mailbox_settings) |