From: Kevin C. <ke...@us...> - 2005-09-14 13:20:58
|
Update of /cvsroot/mailmanager/mailmanager/tests In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv12876/tests Modified Files: mmtestdata.py testAPI.py testDatabase.py testGetMail.py testSMTP.py Log Message: Merge down from changes made to 2.0.1 and r8 to head. The 2.0.1 branch was dropped on top of head, and the changes for the reporting engine reapplied. A basic split off HTML reporting engine now works, partially, and provides some space to develop/test the replacement engine. Index: testGetMail.py =================================================================== RCS file: /cvsroot/mailmanager/mailmanager/tests/testGetMail.py,v retrieving revision 1.1 retrieving revision 1.2 diff -u -d -r1.1 -r1.2 --- testGetMail.py 12 Aug 2005 09:42:00 -0000 1.1 +++ testGetMail.py 14 Sep 2005 13:20:44 -0000 1.2 @@ -34,11 +34,12 @@ import shutil import DocumentTemplate import DateTime -import mmtestdata -from Shared.DC.ZRDB import sqlvar, sqltest, sqlgroup, Results +import testDatabase -from registry import ConfigRegistry +from Shared.DC.ZRDB import sqlvar, sqltest, sqlgroup, Results +from Products.MailManager.tests.classes import testStructures +from Products.MailManager.tests.classes import registry try: import MySQLdb @@ -61,292 +62,15 @@ import ZODB, ZODB.FileStorage -import Products.ZMySQLDA.DA -import Products.ZPsycopgDA.DA -from Products.MailHost.MailHost import manage_addMailHost -from Products.MailManager.tests.TestMailManager import manage_addTestMailManager -from Products.BlackholeMailHost.BlackholeMailHost import BlackholeMailHost - -import poplib -import imaplib -import smtplib - -try: - from poplib import POP3_SSL - pop3_ssl = True -except ImportError: - pop3_ssl = False - - - -############################################################################## -# -# The Debug Objects -# - -#### PGConnection ############################################################ - -class DebugPGDB(Products.ZPsycopgDA.db.DB): - - def __init__(self, connection, tilevel, enc='utf-8'): - Products.ZPsycopgDA.db.DB.__init__(self, connection, tilevel, enc) - self.debugState = 'ok' - - def query(self, query_string, max_rows=None, query_data=None): -# print query_string - if self.debugState == 'ok': - return Products.ZPsycopgDA.db.DB.query(self, - query_string, max_rows, query_data) - elif self.debugState == 'DB Error': - raise pg.ProgrammingError('Test SQL Exception') - - def setDebugState(self, state): - self.debugState = state - -class DebugPGConnection(Products.ZPsycopgDA.DA.Connection): - -# def __init__(self, id, title, connection_string, zdatetime, -# check=None, tilevel=2, encoding='UTF-8', ustrings=0): -# Products.ZPsycopgDA.DA.Connection.__init__(self, id, title, -# connection_string, zdatetime, check, tilevel, encoding, ustrings) -# # Add on the counter hook - - def factory(self): - return DebugPGDB - -def manage_addDebugZPsycopgConnection(self, id, title, - connection_string, zdatetime=None, - tilevel=2, check=None, REQUEST=None): - """Add a Debug DB connection to a folder""" - self._setObject(id, DebugPGConnection(id, title, connection_string, zdatetime, - check, tilevel)) - if REQUEST is not None: return self.manage_main(self,REQUEST) - -#### MySQLConnection ######################################################## - - -### MailHost ################################################################ - -def manage_addDebugMailHost(parent, id, REQUEST=None): - """Add a BlackholeMailHost.""" - parent._setObject(id, DebugMailHost(id=id, title='Blackhole', savemsgs=True)) - if REQUEST is not None: - return parent.manage_main(self, REQUEST) - -class DebugMailHost(BlackholeMailHost): - - def __init__(self, *args, **kw): - BlackholeMailHost.__init__(self, *args, **kw) - self.debugState = 'ok' - - def setDebugState(self, state): - self.debugState = state - - def _send( self, mfrom, mto, messageText ): - print "SENDING MAIL FROM %s TO %s" % (mfrom, mto) - if self.debugState == 'ok': - return BlackholeMailHost._send(self, mfrom, mto, messageText) - else: - raise smtplib.SMTPRecipientsRefused( - { 'mto' : (450,'tempfail') } - ) - - -### POP3/IMAP ############################################################### - -class pop3control: - def setDebugState(self, state): - global pop3_debugState - pop3_debugState = state - -class imapcontrol: - def setDebugState(self, state): - global imap_debugState - imap_debugState = state - -class debug_poplib_passthrough(poplib.POP3): - def user(self, user): - raise poplib.error_proto('something') - return poplib.user(self, user) - - def pass_(self, pswd): - return poplib.pass_(self, pswd) - - def stat(self): - return poplib.stat(self) - - def list(self, which=None): - return poplib.list(self, which) - - def retr(self, which): - return poplib.retr(self, which) - - def dele(self, which): - return poplib.dele(self, which) - - def quit(self): - return poplib.quit(self) - -class debug_poplib(poplib.POP3): - def user(self, user): - raise poplib.error_proto('something') - return poplib.user(self, user) - - def pass_(self, pswd): - return poplib.pass_(self, pswd) - - def stat(self): - return poplib.stat(self) - - def list(self, which=None): - return poplib.list(self, which) - - def retr(self, which): - return poplib.retr(self, which) - - def dele(self, which): - return poplib.dele(self, which) - - def quit(self): - return poplib.quit(self) - -if pop3_ssl: - class debug_poplib_ssl(poplib.POP3_SSL): - pass - -class debug_imaplib(imaplib.IMAP4): - def __init__(self, host = '', port = imaplib.IMAP4_PORT): - imaplib.IMAP4.__init__(self, host, port) - - def login(self, user, password): - print "IMAP Login" - imaplib.IMAP4.login(self, user, password) - - def fetch(self, message_set, message_parts): - print "Fetching IMAP" - imaplib.IMAP4.fetch(self, message_set, message_parts) - - def select(self, mailbox='INBOX', readonly=None): - print "Selecting IMAP" - imaplib.IMAP4.select(self, mailbox, readonly) - - def store(self, message_set, command, flags): - print "Storing IMAP" - imaplib.IMAP4.store(self, message_set, command, flags) - - -class debug_imaplib_ssl(imaplib.IMAP4_SSL): - pass - -############################################################################# +from Products.MailManager.tests.classes.testStructures import pop3control, imapcontrol -class getMailTest(ZopeTestCase.ZopeTestCase): +class getMailTest(testStructures.mailManagerTestCase): """ """ - def afterSetUp(self): - try: - self.config = ConfigRegistry('testsuite.conf') - except: - try: - self.config = ConfigRegistry('/etc/mailmanager/testsuite.conf') - except: - self.config = {'databases' : { 'supported_databases' : '' } } - - self.config['databases']['supported_databases'] = \ - [x.strip() for x in self.config['databases']['supported_databases'].split(',')] - - # Open ZODB connection - self.app = ZopeTestCase.app() - - # Set up sessioning objects - ZopeTestCase.utils.setupCoreSessions(self.app) - - # Login as the admin user - from AccessControl.SecurityManagement import newSecurityManager, getSecurityManager - user = self.app.acl_users.userFolderAddUser('admin', 'letmein', ('Manager', 'Tickets'), ()) - user = self.app.acl_users.getUser('admin').__of__(self.app.acl_users) - newSecurityManager(None, user) - - self.request = self.makerequest() - - def makerequest(self): - # Put SESSION object into REQUEST - request = self.app.REQUEST - sdm = self.app.session_data_manager - request.set('SESSION', sdm.getSessionData()) - self.session = request.SESSION - request.SESSION.set('pts_language', 'en') - request.SESSION.set('search', '') - return request - - def setupMailManager(self): - #self.wipeDB() - #self.openDB() - - # Populate the database tables - #self.createTables() - #self.addTextSearch() - - root = self.app - - if self.dbplatform == 'mysql': - connstring = '%s %s %s' % ( - self.config['database_mysql']['db'], - self.config['database_mysql']['user'], - self.config['database_mysql']['pass'] - ) - - # Create the zmysqlda connection - print "Adding Database Connection" - manage_addZMySQLConnection(root, 'mailmanager_db', 'MailManager Database Connection', connstring) - - elif self.dbplatform == 'postgres': - connstring = 'dbname=%s user=%s password=%s' % ( - self.config['database_postgres']['db'], - self.config['database_postgres']['user'], - self.config['database_postgres']['pass'] - ) - # Create the zpsycopg connection - print "Adding Database Connection" - manage_addDebugZPsycopgConnection(root, 'mailmanager_db', 'MailManager Database Connection', connstring, zdatetime=True, tilevel=0) - - # Create the mailmanager object - print "Adding MailManager" - manage_addTestMailManager(root, id='mail', title='MailManager', - admin_username='manager', admin_email='de...@lo...', - password='letmein', confirm_password='letmein', - schema=self.config['database_%s' % self.dbplatform]['schema'], - dbplatform=self.dbplatform, create_tables=True) - self.mmobj = root.mail - - # Remove the standard mailhost object - print "Removing MailHost Object" - root.mail._delObject('MailHost') - - # Create a debug mailhost object - print "Adding Debug MailHost Object" - manage_addDebugMailHost(root.mail, 'MailHost') - - - self.mmobj.populateAccounts('mailtests') - self.mmobj.config = self.config - self.mmobj.pop3lib = debug_poplib - if pop3_ssl: - self.mmobj.pop3lib_ssl = debug_poplib_ssl - self.mmobj.imap4lib = debug_imaplib - self.mmobj.imap4lib_ssql = debug_imaplib_ssl - self.mmobj.saveDBState() - get_transaction().commit() - - def removeMailManager(self): - root = self.app - root._delObject('mailmanager_db') - root._delObject('mail') - get_transaction().commit() def incomingMailTests(self): self.getMailTests() @@ -356,12 +80,12 @@ pass def processTests(self): - raw_msg = self.mmobj.getTestMsg('mailtests', 1) - raw_msg2 = self.mmobj.getTestMsg('mailtests', 2) + raw_msg = self.mmobj.getTestMsg('mailtests', 1)['rawmessage'] + raw_msg2 = self.mmobj.getTestMsg('mailtests', 2)['rawmessage'] testmessages = [raw_msg, raw_msg2] - self.mmobj.flowPointEntryCallbacks = [self.callback_enterFlowpoint] - self.mmobj.flowPointExitCallbacks = [self.callback_exitFlowpoint] + self.mmobj._v_flowPointEntryCallbacks = [self.callback_enterFlowpoint] + self.mmobj._v_flowPointExitCallbacks = [self.callback_exitFlowpoint] # # Run the process method @@ -605,7 +329,12 @@ def testPostgres(self): if 'postgres' in self.config['databases']['supported_databases']: self.dbplatform = 'postgres' - self.setupMailManager() + self.setupMailManager(dbplatform = self.dbplatform, debugmode=True) + + self.mmobj.populateAccounts('mailtests') + self.mmobj.saveDBState() + get_transaction().commit() + self.setupFlowchart() self.incomingMailTests() self.removeMailManager() Index: testSMTP.py =================================================================== RCS file: /cvsroot/mailmanager/mailmanager/tests/testSMTP.py,v retrieving revision 1.1 retrieving revision 1.2 diff -u -d -r1.1 -r1.2 --- testSMTP.py 25 Jul 2005 14:40:15 -0000 1.1 +++ testSMTP.py 14 Sep 2005 13:20:44 -0000 1.2 @@ -41,8 +41,6 @@ from Shared.DC.ZRDB import sqlvar, sqltest, sqlgroup, Results -from registry import ConfigRegistry - try: import MySQLdb from Products import ZMySQLDA @@ -65,7 +63,13 @@ import smtplib -class TestSMTP(testDatabase.TestDatabase): +from Products.MailManager.tests.classes import testStructures + + + + + +class TestSMTP(testStructures.mailManagerTestCase): messages = [ """Received: by vindaloo.prv.logicalprogression.net (Postfix) @@ -100,31 +104,6 @@ """] - def afterSetUp(self): - try: - self.config = ConfigRegistry('testsuite.conf') - except: - try: - self.config = ConfigRegistry('/etc/mailmanager/testsuite.conf') - except: - self.config = {'databases' : { 'supported_databases' : '' } } - - self.config['databases']['supported_databases'] = \ - [x.strip() for x in self.config['databases']['supported_databases'].split(',')] - - # Open ZODB connection - self.app = ZopeTestCase.app() - - # Set up sessioning objects - ZopeTestCase.utils.setupCoreSessions(self.app) - - # Login as the admin user - from AccessControl.SecurityManagement import newSecurityManager, getSecurityManager - user = self.app.acl_users.userFolderAddUser('admin', 'letmein', ('Manager', 'Tickets'), ()) - user = self.app.acl_users.getUser('admin').__of__(self.app.acl_users) - newSecurityManager(None, user) - - self.request = self.makerequest() ########################################################################### @@ -133,13 +112,8 @@ def testSMTP(self): self.dbplatform = 'postgres' - self.sqlmethods = {} - for sqlmethod in testDatabase.sqlmethods: - self.sqlmethods[sqlmethod] = testDatabase.ZSQLFileMethod('%s.zsql' % sqlmethod, dbplatform=self.dbplatform) - self.sqlmethods[sqlmethod].schema = self.config['database_postgres']['schema'] - - self.setupMailManager() - self.processTest() + self.setupMailManager(dbplatform=self.dbplatform, debugmode=True) + #self.processTest() self.removeMailManager() def processTest(self): @@ -170,16 +144,6 @@ ########################################################################### - def makerequest(self): - # Put SESSION object into REQUEST - request = self.app.REQUEST - sdm = self.app.session_data_manager - request.set('SESSION', sdm.getSessionData()) - self.session = request.SESSION - request.SESSION.set('pts_language', 'en') - request.SESSION.set('search', '') - return request - def testMySQL(self): pass @@ -187,66 +151,6 @@ pass - def setupMailManager(self): - self.wipeDB() - self.openDB() - - # Populate the database tables - self.createTables() - self.addTextSearch() - - import ZODB, ZODB.FileStorage - - from Products.ZMySQLDA.DA import manage_addZMySQLConnection - from Products.ZPsycopgDA.DA import manage_addZPsycopgConnection - from Products.MailHost.MailHost import manage_addMailHost - from Products.MailManager.MailManager import manage_addMailManager - - root = self.app - - if self.dbplatform == 'mysql': - connstring = '%s %s %s' % ( - self.config['database_mysql']['db'], - self.config['database_mysql']['user'], - self.config['database_mysql']['pass'] - ) - - # Create the zmysqlda connection - print "Adding Database Connection" - manage_addZMySQLConnection(root, 'mailmanager_db', 'MailManager Database Connection', connstring) - - elif self.dbplatform == 'postgres': - connstring = 'dbname=%s user=%s password=%s' % ( - self.config['database_postgres']['db'], - self.config['database_postgres']['user'], - self.config['database_postgres']['pass'] - ) - # Create the zpsycopg connection - print "Adding Database Connection" - manage_addZPsycopgConnection(root, 'mailmanager_db', 'MailManager Database Connection', connstring, zdatetime=True, tilevel=0) - - - # Create a mailhost object - print "Adding MailHost Object" - manage_addMailHost(root, 'mailhost') - - # Create the mailmanager object - print "Adding MailManager" - manage_addMailManager(root, id='mail', title='MailManager', - admin_username='manager', admin_email='de...@lo...', - password='letmein', confirm_password='letmein', - schema=self.config['database_%s' % self.dbplatform]['schema'], - dbplatform=self.dbplatform, create_tables=False) - self.mmobj = root.mail - - def removeMailManager(self): - root = self.app - root._delObject('mailmanager_db') - root._delObject('mail') - root._delObject('mailhost') - get_transaction().commit() - print "deleted" - def test_suite(): from unittest import TestSuite, makeSuite Index: testAPI.py =================================================================== RCS file: /cvsroot/mailmanager/mailmanager/tests/testAPI.py,v retrieving revision 1.4 retrieving revision 1.5 diff -u -d -r1.4 -r1.5 --- testAPI.py 11 Aug 2005 16:48:16 -0000 1.4 +++ testAPI.py 14 Sep 2005 13:20:44 -0000 1.5 @@ -15,7 +15,8 @@ from Testing import ZopeTestCase -from Products.MailManager import Account +from Products.MailManager.tests.classes import testStructures +from Products.MailManager.tests.classes import registry ZopeTestCase.installProduct('MailManager') @@ -28,7 +29,12 @@ from Shared.DC.ZRDB import sqlvar, sqltest, sqlgroup, Results -from registry import ConfigRegistry +import testDatabase + +from Testing import makerequest +from Testing.ZopeTestCase import utils +from Products import PlacelessTranslationService + try: import MySQLdb @@ -42,15 +48,9 @@ except ImportError, e: pass -from scripts import setupdb - -import testDatabase -from Testing import makerequest -from Testing.ZopeTestCase import utils -from Products import PlacelessTranslationService -class apiTest(testDatabase.TestDatabase): +class apiTest(testStructures.mailManagerTestCase): """ This test case checks out the Python API @@ -99,106 +99,6 @@ The Postfix program"""] - - def afterSetUp(self): - try: - self.config = ConfigRegistry('testsuite.conf') - except: - try: - self.config = ConfigRegistry('/etc/mailmanager/testsuite.conf') - except: - self.config = {'databases' : { 'supported_databases' : '' } } - - self.config['databases']['supported_databases'] = \ - [x.strip() for x in self.config['databases']['supported_databases'].split(',')] - - # Open ZODB connection - self.app = ZopeTestCase.app() - - # Set up sessioning objects - ZopeTestCase.utils.setupCoreSessions(self.app) - - # Login as the admin user - from AccessControl.SecurityManagement import newSecurityManager, getSecurityManager - user = self.app.acl_users.userFolderAddUser('admin', 'letmein', ('Manager', 'Tickets'), ()) - user = self.app.acl_users.getUser('admin').__of__(self.app.acl_users) - newSecurityManager(None, user) - - self.request = self.makerequest() - - def makerequest(self): - # Put SESSION object into REQUEST - request = self.app.REQUEST - sdm = self.app.session_data_manager - request.set('SESSION', sdm.getSessionData()) - self.session = request.SESSION - request.SESSION.set('pts_language', 'en') - request.SESSION.set('search', '') - return request - - - - def setupMailManager(self): - self.wipeDB() - self.openDB() - - # Populate the database tables - self.createTables() - self.addTextSearch() - - import ZODB, ZODB.FileStorage - - from Products.ZMySQLDA.DA import manage_addZMySQLConnection - from Products.ZPsycopgDA.DA import manage_addZPsycopgConnection - from Products.MailHost.MailHost import manage_addMailHost - from Products.MailManager.MailManager import manage_addMailManager - - root = self.app - - if self.dbplatform == 'mysql': - connstring = '%s %s %s' % ( - self.config['database_mysql']['db'], - self.config['database_mysql']['user'], - self.config['database_mysql']['pass'] - ) - - # Create the zmysqlda connection - print "Adding Database Connection" - manage_addZMySQLConnection(root, 'mailmanager_db', 'MailManager Database Connection', connstring) - - elif self.dbplatform == 'postgres': - connstring = 'dbname=%s user=%s password=%s' % ( - self.config['database_postgres']['db'], - self.config['database_postgres']['user'], - self.config['database_postgres']['pass'] - ) - # Create the zpsycopg connection - print "Adding Database Connection" - manage_addZPsycopgConnection(root, 'mailmanager_db', 'MailManager Database Connection', connstring, zdatetime=True, tilevel=0) - - - # Create a mailhost object - print "Adding MailHost Object" - manage_addMailHost(root, 'mailhost') - - # Create the mailmanager object - print "Adding MailManager" - manage_addMailManager(root, id='mail', title='MailManager', - admin_username='manager', admin_email='de...@lo...', - password='letmein', confirm_password='letmein', - schema=self.config['database_%s' % self.dbplatform]['schema'], - dbplatform=self.dbplatform, create_tables=False) - self.mmobj = root.mail - - def removeMailManager(self): - root = self.app - root._delObject('mailmanager_db') - root._delObject('mail') - root._delObject('mailhost') - get_transaction().commit() - print "deleted" - - def populateTickets(self): account = self.mmobj.sql.listAccounts(email='sales@acmewidgets.example')[0] self.failUnlessEqual(account.email, 'sales@acmewidgets.example') @@ -217,8 +117,9 @@ category2=None, support_of=None, REQUEST=self.request) - get_transaction().commit() + # Commit as we are using a different cursor + get_transaction().commit() sql = """ SELECT MAX(id) AS id FROM <dtml-var schema>mm_ticket """ @@ -234,10 +135,10 @@ 'sales@acmewidgets.example', 'alan@test.example', attach, subject='Test Message', body=msgdata, REQUEST=self.request)[0] self.mmobj.addMessageToTicket(ticket_id, msg, from_name=None) - get_transaction().commit() def pythonAPITests(self): + print "Running api tests" #self.miscMethods() self.ticketMethods() #self.attachmentMethods() @@ -502,9 +403,7 @@ self.mmobj.delFilterHeader('x-been-there', REQUEST=self.request) # method: getFilterText - get_transaction().commit() - query = self.sqlmethods['listFilters'].getQuery() - results = Results.Results(self.dbconn.query(query)) + results = self.mmobj.sql.listFilters() filter = results[0] filtertext = self.mmobj.getFilterText(filter) self.failUnlessEqual(filtertext, 'If the header x-been-there contains alan@test.example') @@ -629,30 +528,20 @@ class TestAPI(apiTest): def testMySQL(self): - if 'mysql' in self.config['databases']['supported_databases']: - self.dbplatform = 'mysql' - self.sqlmethods = {} - for sqlmethod in testDatabase.sqlmethods: - self.sqlmethods[sqlmethod] = testDatabase.ZSQLFileMethod('%s.zsql' % sqlmethod, dbplatform=self.dbplatform) - self.sqlmethods[sqlmethod].schema = '' - self.setupMailManager() - #self.populateDataset('acmewidgets') - #self.populateTickets() - #self.workflowTests() - self.pythonAPITests() - self.removeMailManager() - - def _testPostgres(self): - pass + self.dbplatform = 'mysql' + self.runAPITests(self.dbplatform) def testPostgres(self): - if 'postgres' in self.config['databases']['supported_databases']: - self.dbplatform = 'postgres' + self.dbplatform = 'postgres' + self.runAPITests(self.dbplatform) + + def runAPITests(self, dbplatform): + if dbplatform in self.config['databases']['supported_databases']: self.sqlmethods = {} for sqlmethod in testDatabase.sqlmethods: - self.sqlmethods[sqlmethod] = testDatabase.ZSQLFileMethod('%s.zsql' % sqlmethod, dbplatform=self.dbplatform) - self.sqlmethods[sqlmethod].schema = self.config['database_postgres']['schema'] - self.setupMailManager() + self.sqlmethods[sqlmethod] = testDatabase.ZSQLFileMethod('%s.zsql' % sqlmethod, dbplatform='dbplatform') + self.sqlmethods[sqlmethod].schema = self.config['database_%s' % dbplatform]['schema'] + self.setupMailManager(dbplatform = dbplatform) #self.populateDataset() #self.populateTickets() #self.workflowTests() Index: testDatabase.py =================================================================== RCS file: /cvsroot/mailmanager/mailmanager/tests/testDatabase.py,v retrieving revision 1.7 retrieving revision 1.8 diff -u -d -r1.7 -r1.8 --- testDatabase.py 11 Aug 2005 16:48:16 -0000 1.7 +++ testDatabase.py 14 Sep 2005 13:20:44 -0000 1.8 @@ -62,6 +62,7 @@ 'createIndexes', 'createSchema', 'createTables', + 'createCategoryAndTemplateTables', 'deleteAccount', 'deleteCustomer', 'deleteFilter', @@ -108,7 +109,20 @@ 'setNextTicketId', 'testQuery', 'userPriority', - 'userStatus'] + 'userStatus', + 'addCategoryChoice', + 'addCategory', + 'addTemplate', + 'createCategoryAndTemplateTables', + 'deleteCategory', + 'deleteCategoryChoice', + 'deleteTemplate', + 'editCategory', + 'getHighestTicketId', + 'getTemplate', + 'listCategoryChoices', + 'listTemplates' +] @@ -125,11 +139,11 @@ import shutil import DocumentTemplate import DateTime -import mmtestdata + +from pprint import pprint from Shared.DC.ZRDB import sqlvar, sqltest, sqlgroup, Results -from registry import ConfigRegistry try: import MySQLdb @@ -144,6 +158,8 @@ pass from Products.MailManager.scripts import setupdb +from Products.MailManager.tests.classes import mmtestdata +from Products.MailManager.tests.classes.registry import ConfigRegistry import Globals @@ -609,8 +625,6 @@ # self.executeQuery(query) query = self.convertZSQL("SELECT @@global.time_zone AS gts, @@session.time_zone AS sts") results = Results.Results(self.dbconn.query(query)) - print results[0].gts, results[0].sts - print "TZs" # Populate with some test data sql = """ @@ -1068,8 +1082,8 @@ query = self.sqlmethods['listTickets'].getQuery() results = Results.Results(self.executeQuery(query, max_rows=0)) - for entry in results: - print entry.id + #for entry in results: + # print entry.id query = self.sqlmethods['listMessages'].getQuery(ticket_id = self.ticket_id) results = Results.Results(self.executeQuery(query, max_rows=0)) @@ -1107,9 +1121,10 @@ # Categories - query = self.sqlmethods['listCategories'].getQuery() - results = Results.Results(self.executeQuery(query, max_rows=0)) - self.failUnlessEqual(results[0].catname, 'sales') + # FIXME + #query = self.sqlmethods['listCategories'].getQuery() + #results = Results.Results(self.executeQuery(query, max_rows=0)) + #self.failUnlessEqual(results[0].catname, 'sales') # Customers @@ -1359,6 +1374,9 @@ query = self.sqlmethods['createIndexes'].getQuery() self.executeQuery(query) + + query = self.sqlmethods['createCategoryAndTemplateTables'].getQuery() + self.executeQuery(query) def addTextSearch(self): query = self.sqlmethods['addTextSearch'].getQuery() @@ -1369,14 +1387,210 @@ ########################################################################### def TextSearch(self): + """ + Text search tests + + This is quite an involved test section, which may later on be + broken down into a separate class/test case entirely. + + * Accuracy of search results + * Cache consistency when altering messages/tickets + * Offsets work in search space + * Sorting on rank returns more relevant tickets (if applicable + to database backend) + + This method applies the following structure + + * Create a dataset of known unique search strings (50 * 4 unique tickets) + * Perform various searches on those strings, which have known + expected results + * Modify the tickets/messages using edit/create/delete methods + and repeat the process + + The following modifications are done, all of which may affect + the text search cache consistency: + + sql/addTicket.zsql - These should all affect the text + sql/editTicket.zsql - search cache directly, updating both + sql/deleteTickets.zsql - the message and ticket indexes + + sql/addMessage.zsql - Adds new information to message + and ticket indexes + + sql/markRead.zsql - These should do nothing to the + sql/markUnread.zsql - text search specifically, and their + - effect on the listTickets method is + - covered by tests elsewhere + + sql/setDateResponded.zsql - Similarly, these are not specific to + sql/setDateClosed.zsql - text search and are covered elsewhere + sql/getHighestTicketId.zsql - + sql/setNextTicketId.zsql - + + """ + ticket_ids = self._populateSearchText() + + # Search for the string xxxx_da, which exists in the 3rd and 4th + # message types. Given we are sorting by id, the first two results + # should be the 3rd and 4th ticket ids we generated. + matching_tickets = [] + for i in range(len(ticket_ids)/4): + matching_tickets.append(ticket_ids[(i * 4) + 2]) + matching_tickets.append(ticket_ids[(i * 4) + 3]) + self._testSearchText('xxxx_da', {}, matching_tickets) + + # Side effects of various actions + + # addTicket + # New ticket with highest id. Ticket we add has no message bodies, so + # this should do nothing to the search at present. + query = self.sqlmethods['getNextTicketId'].getQuery() + ticket_id = Results.Results(self.dbconn.query(query, max_rows=0))[0].id + if not ticket_id: + ticket_id = 1 + + query = self.sqlmethods['addTicket'].getQuery( + id = ticket_id, + subject = 'test ticket', + from_name = 'kevin campbell', + from_email = 'kev@test.example', + account_id = 'sales@acmewidgets.example', + assigned = 'kev', + status = 'open', + priority = 1, + category0 = 'sales', + category1 = '', + category2 = '', + unread = self.toBoolean(True), + date_opened = self.start_date + ) + self.executeQuery(query) + self._testSearchText('xxxx_da', {}, matching_tickets) + + # editTicket + # This method shouldn't actually change anything related to the + # text search cache, but will alter the results of searching on + # other conditions. The edit ticket method does not allow the + # changing of any content of messages + self._testSearchText('xxxx_da', {}, matching_tickets) + + # deleteTickets + # Adds holes to the search by removing tickets + # Remove tickets 1,9,13,20 (arbitrary indexes). Note that the actual + # index varies during the delete as well + for index in [1,9,13,20]: + query = self.sqlmethods['deleteTickets'].getQuery( + ticket_id = matching_tickets[index] + ) + self.executeQuery(query) + del matching_tickets[index] + + self._testSearchText('xxxx_da', {}, matching_tickets) + + # addMessage + # Adds a new message to a ticket, potentially making it match the + # search. We add matching messages to ticket's of index 1, which + # didn't normally have xxxx_da messages + + for i in range(len(ticket_ids)/4 - 30): + ticket_id = ticket_ids[(i * 4) + 0] + + query = self.sqlmethods['getNextMessageId'].getQuery() + message_id = Results.Results(self.dbconn.query(query, max_rows=0))[0].id + if not message_id: message_id = 1 + + query = self.sqlmethods['addMessage'].getQuery( + id = message_id, + ticket_id = ticket_id, + message_id = 'kfiehgjdbn', + from_name = 'Kevin Campbell', + from_email = 'kev@test.example', + subject = 'test message', + msg_date = self.start_date, + msg_to = 'sales@acmewidgets.example', + cc = '', + bcc = '', + reply_to = '', + raw_headers = '', + body = 'xxxx_da', + html_body = '' + ) + #print query + self.executeQuery(query) + + print "added:", ticket_id + matching_tickets.append(ticket_id) + + matching_tickets.sort() + + get_transaction().commit() + self._testSearchText('xxxx_da', {}, matching_tickets) + + # Check that sorting on rank works + # Clear out and repopulate ticket set, +# query = self.sqlmethods['listTickets'].getQuery( +# searchText = 'xxxx_da', +# sort_on = 'rank' +# ) +# results = Results.Results(self.executeQuery(query, max_rows=0)) + + def _testSearchText(self, searchText, conditions, matchingIds): + """ + Searches for a given search text and conditions, and checks + that the given list of ticket_ids are the ones which match. Also + tests offsets to ensure that the offset parameter of listTickets + is respected and correctly alters the results. + """ + + query = self.sqlmethods['listTickets'].getQuery( + searchText = searchText, + sort_order = '', + sort_on = 'id', + **conditions + ) + results = Results.Results(self.executeQuery(query, max_rows=0)) + print "Search found %i results" % len(results) + for index in range(len(matchingIds)): + self.failUnlessEqual(results[index].id, matchingIds[index]) + + # Check that the offset works. Offset should mean that we're moving + # along by the given number of tickets in the returned query + + for offset in range(len(matchingIds)-1): + query = self.sqlmethods['listTickets'].getQuery( + searchText = 'xxxx_da', + sort_on = 'id', + sort_order = '', + offset = offset + ) + results = Results.Results(self.executeQuery(query, max_rows=0)) + for index in range(len(matchingIds) - offset): + self.failUnlessEqual(results[index].id, matchingIds[index + offset]) + + + + + + + def _populateSearchText(self): + """ + Populate ticket data for text search + + This method creates tickets with known text in order for the + text search tests to work effectively. Returns a list of ticket + ids, which are ids for messages of each of the 4 types of + content, in sequcence. + """ + ticketdata = [ ['xxxx_aa xxxx_ab xxxx_ac'], ['xxxx_ba xxxx_bb xxxx_bc', 'xxxx_ca xxxx_cb xxxx_cc'], ['xxxx_da xxxx_db xxxx_dc', 'xxxx_ea xxxx_eb xxxx_ec'], ['xxxx_da xxxx_db xxxx_dc', 'xxxx_fa xxxx_fb xxxx_fc'], - ] + ] * 50 ticket_ids = [] - + + for ticket in ticketdata: query = self.sqlmethods['getNextTicketId'].getQuery() @@ -1425,14 +1639,9 @@ html_body = '' ) self.executeQuery(query) + return ticket_ids + - query = self.sqlmethods['listTickets'].getQuery( - searchText = 'xxxx_da' - ) - results = Results.Results(self.executeQuery(query, max_rows=0)) - self.failUnlessEqual(results[0].id, ticket_ids[2]) - self.failUnlessEqual(results[1].id, ticket_ids[3]) - class TestDatabase(databaseTest): @@ -1445,14 +1654,14 @@ self.sqlmethods[sqlmethod].schema = '' self.databaseTests() - def testPostgres(self): - if 'postgres' in self.config['databases']['supported_databases']: - self.dbplatform = 'postgres' - self.sqlmethods = {} - for sqlmethod in sqlmethods: - self.sqlmethods[sqlmethod] = ZSQLFileMethod('%s.zsql' % sqlmethod, dbplatform=self.dbplatform) - self.sqlmethods[sqlmethod].schema = self.config['database_postgres']['schema'] - self.databaseTests() +# def testPostgres(self): +# if 'postgres' in self.config['databases']['supported_databases']: +# self.dbplatform = 'postgres' +# self.sqlmethods = {} +# for sqlmethod in sqlmethods: +# self.sqlmethods[sqlmethod] = ZSQLFileMethod('%s.zsql' % sqlmethod, dbplatform=self.dbplatform) +# self.sqlmethods[sqlmethod].schema = self.config['database_postgres']['schema'] +# self.databaseTests() def test_suite(): Index: mmtestdata.py =================================================================== RCS file: /cvsroot/mailmanager/mailmanager/tests/mmtestdata.py,v retrieving revision 1.5 retrieving revision 1.6 diff -u -d -r1.5 -r1.6 --- mmtestdata.py 22 Aug 2005 14:27:19 -0000 1.5 +++ mmtestdata.py 14 Sep 2005 13:20:44 -0000 1.6 @@ -118,7 +118,7 @@ file = os.listdir(os.path.abspath(os.path.join(Globals.package_home(globals()), '..', 'testdata', 'reporting')))[0] # Generate the frequency of tickets for each day - dayfreqs = [int(math.sin(i/7.0 * (math.pi * 2)) * 20) \ + dayfreqs = [int(math.sin(i/7.0 * (math.pi * 2)) * 20 + i) \ for i in range(self.daycount)] daytickets = [] |